"""Add report_queue table for background report generation

Revision ID: 003
Revises: 38207dbbac44
Create Date: 2026-04-14
"""
from typing import Sequence, Union

from alembic import op
import sqlalchemy as sa


# revision identifiers, used by Alembic.
revision: str = '003'
down_revision: Union[str, None] = '38207dbbac44'
branch_labels: Union[str, Sequence[str], None] = None
depends_on: Union[str, Sequence[str], None] = None

# Table and secondary indexes managed by this revision. The indexes are
# listed in creation order; downgrade() drops them in reverse.
_TABLE = 'report_queue'
_INDEXES = (
    ('idx_report_queue_status', ['status']),
    ('idx_report_queue_created_at', ['created_at']),
)


def upgrade() -> None:
    """Create the report_queue table and its status / created_at indexes."""
    columns = [
        sa.Column('id', sa.Integer(), primary_key=True, index=True),
        sa.Column('report_name', sa.String(), nullable=False),
        sa.Column('category_id', sa.Integer(), nullable=False),
        sa.Column('status', sa.String(), nullable=False, server_default='PENDING'),
        sa.Column('progress', sa.Integer(), nullable=False, server_default='0'),
        sa.Column('message', sa.Text(), nullable=True),
        sa.Column('result_report_id', sa.Integer(), nullable=True),
        sa.Column('error', sa.Text(), nullable=True),
        sa.Column('worker_id', sa.String(), nullable=True),
        sa.Column('created_at', sa.DateTime(), server_default=sa.func.now()),
        sa.Column('started_at', sa.DateTime(), nullable=True),
        sa.Column('completed_at', sa.DateTime(), nullable=True),
        sa.Column('updated_at', sa.DateTime(), server_default=sa.func.now()),
    ]
    op.create_table(_TABLE, *columns)

    for index_name, index_columns in _INDEXES:
        op.create_index(index_name, _TABLE, index_columns)


def downgrade() -> None:
    """Drop the indexes (newest first) and then the report_queue table."""
    for index_name, _ in reversed(_INDEXES):
        op.drop_index(index_name, table_name=_TABLE)
    op.drop_table(_TABLE)
class ReportQueue(Base):
    """Queue for report generation tasks.

    One row is one requested report. The row is inserted with status
    "PENDING" and progress 0; the queue worker later rewrites ``status``,
    ``progress``, ``message`` and the timestamp columns with raw SQL.

    NOTE(review): the Alembic revision 003 for this table additionally
    declares server-side defaults (status, progress, created_at, updated_at)
    and the indexes ``idx_report_queue_status`` / ``idx_report_queue_created_at``;
    none of those are declared on this model, so a table created through
    ``Base.metadata.create_all`` will not have them.
    """
    __tablename__ = "report_queue"

    id = Column(Integer, primary_key=True, index=True)
    # Human-readable report title supplied by the submitter.
    report_name = Column(String, nullable=False)
    # Root category of the report (an id from the category tree).
    category_id = Column(Integer, nullable=False)
    # Lifecycle state; the worker writes PROCESSING, COMPLETED and FAILED.
    status = Column(String, nullable=False, default="PENDING")
    # Completion percentage; the worker sets it to 100 on completion.
    progress = Column(Integer, nullable=False, default=0)
    # Latest progress message written by the worker.
    message = Column(Text, nullable=True)
    # Id of the Report row produced by a completed task.
    result_report_id = Column(Integer, nullable=True)
    # Failure description written when the task is marked FAILED.
    error = Column(Text, nullable=True)
    # Identifier of the worker that claimed the task ("<hostname>_<pid>").
    worker_id = Column(String, nullable=True)
    # Python-side default (datetime.utcnow), whereas the worker's SQL uses NOW().
    created_at = Column(DateTime, default=datetime.utcnow)
    started_at = Column(DateTime, nullable=True)
    completed_at = Column(DateTime, nullable=True)
    # onupdate only fires for ORM updates; the worker's raw SQL sets it explicitly.
    updated_at = Column(DateTime, default=datetime.utcnow, onupdate=datetime.utcnow)
async def verify_api_key(request: Request, api_key: Optional[str] = Security(API_KEY_HEADER)):
    """Global dependency that validates the ``X-API-Key`` request header.

    Behaviour:
      * Requests whose path is listed in ``PUBLIC_PATHS`` are never checked.
      * When the ``API_KEY`` environment variable is not configured,
        authentication is skipped unless ``ENV`` is ``production`` (this is
        what the startup warning announces); in production every request is
        rejected, because there is no key that could legitimately match.
      * Otherwise the header must be present and equal to ``API_KEY``.

    Args:
        request: Incoming request; only ``request.url.path`` is inspected.
        api_key: Value of the ``X-API-Key`` header, or ``None`` when absent.

    Raises:
        HTTPException: 401 when the key is missing or does not match.
    """
    # Function-scope import keeps this block self-contained.
    import secrets

    if request.url.path in PUBLIC_PATHS:
        return

    if not API_KEY:
        # Previously `api_key != API_KEY` was always true with API_KEY unset,
        # so every request got a 401 even though the startup warning says
        # authentication is disabled in development.
        if os.getenv("ENV", "development") != "production":
            return
        raise HTTPException(
            status_code=401,
            detail="Invalid or missing API key"
        )

    # Compare as bytes: compare_digest() rejects non-ASCII str arguments, and
    # a constant-time comparison avoids leaking key prefixes through timing.
    if not api_key or not secrets.compare_digest(
        api_key.encode("utf-8"), API_KEY.encode("utf-8")
    ):
        raise HTTPException(
            status_code=401,
            detail="Invalid or missing API key"
        )
@app.post("/api/queue/submit", status_code=202)
def submit_to_queue(req: QueueSubmitRequest, db: Session = Depends(get_db)):
    """Enqueue a report generation task and answer 202 Accepted.

    Responds 404 when ``req.category_id`` is not an id in the category tree.
    On success a PENDING ``ReportQueue`` row is stored and the response
    carries its id together with its position among the PENDING rows.
    """
    # Only membership matters here, so a set of ids is enough.
    known_category_ids = {category["id"] for category in _load_category_tree()}
    if req.category_id not in known_category_ids:
        raise HTTPException(status_code=404, detail=f"Category {req.category_id} not found")

    queue_item = ReportQueue(
        report_name=req.name,
        category_id=req.category_id,
        status="PENDING",
        progress=0,
    )
    db.add(queue_item)
    db.commit()
    db.refresh(queue_item)

    # Position = number of PENDING rows created no later than this one
    # (the new row itself is included in the count).
    ahead_or_equal = (
        db.query(func.count(ReportQueue.id))
        .filter(
            ReportQueue.status == "PENDING",
            ReportQueue.created_at <= queue_item.created_at,
        )
        .scalar()
    )
    position = ahead_or_equal if ahead_or_equal else 1

    log_api.info(f"Queue submit: id={queue_item.id}, name={req.name}, category={req.category_id}, position={position}")
    return {"queue_id": queue_item.id, "status": "PENDING", "position": position}
class QueueWorker:
    """Single-threaded background worker that processes ``report_queue`` rows.

    The worker polls PostgreSQL for ``PENDING`` rows, claims one at a time
    with ``SELECT ... FOR UPDATE SKIP LOCKED``, scrapes every leaf category
    below the requested category, writes the combined report JSON to disk,
    stores a ``Report`` row, runs enrichment inline and finally marks the
    queue row ``COMPLETED`` or ``FAILED``.
    """

    # ASCII folding applied to report names before they become file names.
    _TURKISH_TO_ASCII = str.maketrans("ışğüöç", "isguoc")
    # Characters that must never reach a file name built from queue data.
    _PATH_UNSAFE = str.maketrans({"/": "_", "\\": "_", "\0": "_"})

    def __init__(self, poll_interval: float = 2.0):
        """Create a stopped worker.

        Args:
            poll_interval: Seconds to sleep when no PENDING task was found.
        """
        self.poll_interval = poll_interval
        self._running = False
        self._thread: Optional[threading.Thread] = None
        # Written to report_queue.worker_id when a task is claimed.
        self.worker_id = f"{socket.gethostname()}_{os.getpid()}"

    def start(self):
        """Start the polling loop in a daemon thread."""
        self._running = True
        self._thread = threading.Thread(
            target=self._run_loop, daemon=True, name="queue-worker"
        )
        self._thread.start()
        log.info(f"Queue worker started: {self.worker_id}")

    def stop(self):
        """Ask the loop to exit; a task that is already running is not interrupted."""
        self._running = False
        log.info(f"Queue worker stopping: {self.worker_id}")

    def _run_loop(self):
        """Recover stale tasks once, then claim and process tasks until stopped."""
        # Wait a few seconds for app startup to complete
        time.sleep(3)
        self._recover_stuck_tasks()

        while self._running:
            try:
                claimed = self._claim_next_task()
                if claimed:
                    self._process_task(claimed)
                else:
                    time.sleep(self.poll_interval)
            except Exception as e:
                # Keep the worker alive; back off a little before retrying.
                log.error(f"Queue worker loop error: {e}", exc_info=True)
                time.sleep(5)

    def _claim_next_task(self) -> Optional[int]:
        """Claim the oldest PENDING task using SELECT FOR UPDATE SKIP LOCKED.

        Returns:
            The id of the claimed queue row, or ``None`` when the queue is
            empty or the claim failed (the failure is logged).
        """
        from database import SessionLocal

        db = SessionLocal()
        try:
            result = db.execute(text("""
                UPDATE report_queue
                SET status = 'PROCESSING',
                    worker_id = :worker_id,
                    started_at = NOW(),
                    updated_at = NOW()
                WHERE id = (
                    SELECT id FROM report_queue
                    WHERE status = 'PENDING'
                    ORDER BY created_at ASC
                    LIMIT 1
                    FOR UPDATE SKIP LOCKED
                )
                RETURNING id
            """), {"worker_id": self.worker_id})
            # Read the RETURNING row before committing: the result belongs to
            # the open transaction and must not be consumed after it ended.
            row = result.fetchone()
            db.commit()

            if row:
                queue_id = row[0]
                log.info(f"Claimed task {queue_id}")
                return queue_id
            return None
        except Exception as e:
            db.rollback()
            log.error(f"Failed to claim task: {e}")
            return None
        finally:
            db.close()

    @classmethod
    def _safe_report_name(cls, report_name: str) -> str:
        """Turn a report name into a single, separator-free file name part.

        Lower-cases, replaces spaces with underscores and folds Turkish
        letters to ASCII (the historic behaviour), then neutralises path
        separators and NUL so a submitted name cannot escape the reports
        directory.
        """
        folded = report_name.lower().replace(" ", "_").translate(cls._TURKISH_TO_ASCII)
        return folded.translate(cls._PATH_UNSAFE)

    @staticmethod
    def _find_leaf_categories(tree, tree_by_id, parent_id):
        """Return ``(path_model, name, id)`` for every leaf below ``parent_id``.

        A category without children is its own leaf. ``path_model`` is the
        category's ``url`` without leading slashes, or ``None`` when absent.
        Unknown ids yield an empty list.
        """
        # Index children once instead of rescanning the whole tree per node.
        children_by_parent = {}
        for cat in tree:
            children_by_parent.setdefault(cat.get("parentId"), []).append(cat)

        def walk(node_id):
            children = children_by_parent.get(node_id)
            if not children:
                cat = tree_by_id.get(node_id)
                if not cat:
                    return []
                path_model = (cat.get("url") or "").lstrip("/") or None
                return [(path_model, cat["name"], cat["id"])]
            leaves = []
            for child in children:
                leaves.extend(walk(child["id"]))
            return leaves

        return walk(parent_id)

    @staticmethod
    def _merge_social_proofs(products, top_products) -> int:
        """Copy ``socialProofs`` from ``top_products`` onto matching ``products``.

        Products are matched on integer ``id`` (``contentId`` is accepted as
        a fallback key on the top-ranking side). ``products`` is modified in
        place.

        Returns:
            Number of products that received social proofs.
        """
        social_map = {}
        for top in top_products or []:
            top_id = top.get("id") or top.get("contentId")
            proofs = top.get("socialProofs", [])
            if top_id and proofs:
                social_map[int(top_id)] = proofs
        if not social_map:
            return 0

        updated = 0
        for product in products:
            product_id = product.get("id")
            if product_id and int(product_id) in social_map:
                product["socialProofs"] = social_map[int(product_id)]
                updated += 1
        return updated

    @staticmethod
    def _category_detail(cat_id, path_model, cat_name, total_products, file_path):
        """Build one entry of the per-category ``details`` list of a report."""
        return {
            "category_id": cat_id,
            "path_model": path_model,
            "category_name": cat_name,
            "success": file_path is not None,
            "total_products": total_products,
            "file_path": file_path,
        }

    def _scrape_categories(self, db, queue_id: int, categories):
        """Scrape every leaf category and persist one JSON file per category.

        A failure in one category is logged and counted but never aborts the
        task. Progress is reported between 10 and 90 percent.

        Returns:
            Dict with ``successful``, ``failed``, ``total_products`` and the
            per-category ``details`` list.
        """
        import json as json_module
        # Imported lazily (as the original code did); an import failure here
        # propagates and fails the whole task.
        from scraper import TrendyolSearchScraper, TrendyolScraper
        from main import _trendyol_limiter, CATEGORIES_DIR

        results = {
            "successful": 0,
            "failed": 0,
            "total_products": 0,
            "details": []
        }
        total = len(categories)

        for idx, (path_model, cat_name, cat_id) in enumerate(categories, 1):
            progress = int((idx / total) * 80) + 10
            self._update_progress(
                db, queue_id, progress,
                f"[{idx}/{total}] {cat_name} cekiliyor..."
            )

            try:
                products = []
                if path_model:
                    _trendyol_limiter.wait()
                    products = TrendyolSearchScraper(path_model).fetch_all_products()

                    # Enrich with socialProofs from Top Rankings API
                    if products and cat_id and not any(p.get("socialProofs") for p in products):
                        try:
                            _trendyol_limiter.wait()
                            top_scraper = TrendyolScraper(cat_id, page_size=20)
                            top_products = top_scraper.fetch_all_products(delay=0.5, max_pages=5)
                            enriched = self._merge_social_proofs(products, top_products)
                            if enriched:
                                log.info(f"Enriched {enriched} products with socialProofs")
                        except Exception as e:
                            # Best effort: the search results are still usable.
                            log.warning(f"Top Rankings enrichment failed: {e}")
                elif cat_id:
                    _trendyol_limiter.wait()
                    products = TrendyolScraper(cat_id).fetch_all_products()

                if products:
                    os.makedirs(CATEGORIES_DIR, exist_ok=True)
                    file_id = cat_id if cat_id else path_model.replace("/", "_")
                    # A separator inside the category name would otherwise
                    # point into a directory that does not exist.
                    name_part = cat_name.replace(' ', '_').translate(self._PATH_UNSAFE)
                    filename = f"{CATEGORIES_DIR}/{name_part}_{file_id}.json"

                    data = {
                        "category_id": cat_id,
                        "path_model": path_model,
                        "category_name": cat_name,
                        "total_products": len(products),
                        "scraped_at": datetime.now().isoformat(),
                        "products": products
                    }
                    with open(filename, 'w', encoding='utf-8') as f:
                        json_module.dump(data, f, ensure_ascii=False, indent=2)

                    results["successful"] += 1
                    results["total_products"] += len(products)
                    results["details"].append(
                        self._category_detail(cat_id, path_model, cat_name, len(products), filename)
                    )
                else:
                    results["failed"] += 1
                    results["details"].append(
                        self._category_detail(cat_id, path_model, cat_name, 0, None)
                    )
            except Exception as e:
                results["failed"] += 1
                results["details"].append(
                    self._category_detail(cat_id, path_model, cat_name, 0, None)
                )
                log.warning(f"Scrape failed for {cat_name}: {e}")

            # Rate limit between categories (nothing follows the last one).
            if idx < total:
                time.sleep(1.5)

        return results

    def _write_report_file(self, report_name, main_cat_name, total_subcategories, results) -> str:
        """Write the combined report JSON and return its path."""
        import json as json_module
        from main import REPORTS_DIR

        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        safe_name = self._safe_report_name(report_name)

        os.makedirs(REPORTS_DIR, exist_ok=True)
        json_filename = f"{REPORTS_DIR}/{safe_name}_{timestamp}.json"

        combined_data = {
            "report_name": report_name,
            "category": main_cat_name,
            "created_at": datetime.now().isoformat(),
            "total_subcategories": total_subcategories,
            "total_products": results["total_products"],
            "details": results["details"]
        }
        with open(json_filename, 'w', encoding='utf-8') as f:
            json_module.dump(combined_data, f, ensure_ascii=False, indent=2)
        return json_filename

    def _process_task(self, queue_id: int):
        """Process a single queue task: scrape, save report, enrich.

        Every outcome is recorded on the queue row: COMPLETED with the new
        report id, or FAILED with an error text. Exceptions never propagate
        to the caller.
        """
        from database import SessionLocal, Report, ReportQueue

        db = SessionLocal()
        try:
            task = db.query(ReportQueue).filter(ReportQueue.id == queue_id).first()
            if not task:
                log.warning(f"Task {queue_id} not found")
                return

            # Copy to locals; the session commits many times below.
            report_name = task.report_name
            category_id = task.category_id
            log.info(f"Processing task {queue_id}: name={report_name}, category={category_id}")

            self._update_progress(db, queue_id, 5, "Kategori agaci yukleniyor...")

            # Load category tree
            from main import _load_category_tree
            tree = _load_category_tree()
            tree_by_id = {c["id"]: c for c in tree}

            main_cat = tree_by_id.get(category_id)
            if not main_cat:
                self._mark_failed(db, queue_id, f"Category {category_id} not found")
                return

            categories_to_scrape = self._find_leaf_categories(tree, tree_by_id, category_id)
            if not categories_to_scrape:
                self._mark_failed(db, queue_id, "No scrapable categories found")
                return

            main_cat_name = main_cat["name"]
            self._update_progress(
                db, queue_id, 8,
                f"{main_cat_name}: {len(categories_to_scrape)} alt kategori bulundu"
            )

            results = self._scrape_categories(db, queue_id, categories_to_scrape)

            # Generate combined report JSON
            self._update_progress(db, queue_id, 88, "Rapor dosyasi olusturuluyor...")
            json_filename = self._write_report_file(
                report_name, main_cat_name, len(categories_to_scrape), results
            )

            # Save to DB
            self._update_progress(db, queue_id, 93, "Veritabanina kaydediliyor...")

            new_report = Report(
                name=report_name,
                category_id=category_id,
                total_products=results["total_products"],
                total_subcategories=len(categories_to_scrape),
                json_file_path=json_filename,
                html_file_path=None,
                created_at=datetime.now()
            )
            try:
                db.add(new_report)
                db.commit()
                db.refresh(new_report)
            except Exception as db_err:
                db.rollback()
                log.warning(f"DB save failed (non-critical): {db_err}")
                self._mark_failed(db, queue_id, f"DB save failed: {db_err}")
                return

            report_id = new_report.id

            # Run enrichment INLINE (not daemon thread)
            self._update_progress(db, queue_id, 95, "Sosyal kanit verileri toplanıyor...")
            try:
                from main import _enrich_report_task
                _enrich_report_task(report_id)
                log.info(f"Enrichment completed for report {report_id}")
            except Exception as e:
                # The report itself is already stored; enrichment is optional.
                log.warning(f"Enrichment failed (non-critical): {e}")

            # Mark COMPLETED
            self._mark_completed(db, queue_id, report_id)
            log.info(f"Task {queue_id} completed: report_id={report_id}, products={results['total_products']}")

        except Exception as e:
            log.error(f"Task {queue_id} failed: {e}", exc_info=True)
            try:
                # The failure may have left the transaction aborted; reset the
                # session first so the FAILED update can actually be written.
                db.rollback()
                self._mark_failed(db, queue_id, str(e)[:500])
            except Exception as mark_err:
                log.error(f"Could not mark task {queue_id} as failed: {mark_err}")
        finally:
            db.close()

    def _update_progress(self, db, queue_id: int, progress: int, message: str):
        """Store progress and message on the queue row; failures are only logged."""
        try:
            db.execute(text("""
                UPDATE report_queue
                SET progress = :progress, message = :message, updated_at = NOW()
                WHERE id = :queue_id
            """), {"progress": progress, "message": message, "queue_id": queue_id})
            db.commit()
        except Exception as e:
            db.rollback()
            log.debug(f"Progress update failed for {queue_id}: {e}")

    def _mark_completed(self, db, queue_id: int, report_id: int):
        """Set the queue row to COMPLETED (progress 100) and link the report."""
        try:
            db.execute(text("""
                UPDATE report_queue
                SET status = 'COMPLETED',
                    result_report_id = :report_id,
                    progress = 100,
                    message = 'Rapor tamamlandi',
                    completed_at = NOW(),
                    updated_at = NOW()
                WHERE id = :queue_id
            """), {"report_id": report_id, "queue_id": queue_id})
            db.commit()
        except Exception as e:
            db.rollback()
            log.error(f"Failed to mark completed for {queue_id}: {e}")

    def _mark_failed(self, db, queue_id: int, error: str):
        """Set the queue row to FAILED with ``error`` truncated to 1000 characters."""
        try:
            db.execute(text("""
                UPDATE report_queue
                SET status = 'FAILED',
                    error = :error,
                    completed_at = NOW(),
                    updated_at = NOW()
                WHERE id = :queue_id
            """), {"error": error[:1000], "queue_id": queue_id})
            db.commit()
        except Exception as e:
            db.rollback()
            log.error(f"Failed to mark failed for {queue_id}: {e}")

    def _recover_stuck_tasks(self):
        """Reset tasks that were PROCESSING for more than 15 minutes back to PENDING.

        Called once from ``_run_loop`` before polling starts. Staleness is
        judged by ``updated_at``, which every progress update refreshes.
        """
        from database import SessionLocal

        db = SessionLocal()
        try:
            result = db.execute(text("""
                UPDATE report_queue
                SET status = 'PENDING', worker_id = NULL, started_at = NULL, updated_at = NOW()
                WHERE status = 'PROCESSING'
                  AND updated_at < NOW() - INTERVAL '15 minutes'
                RETURNING id
            """))
            # Consume the RETURNING rows while the transaction is still open.
            recovered = result.fetchall()
            db.commit()
            if recovered:
                ids = [r[0] for r in recovered]
                log.warning(f"Recovered {len(ids)} stuck tasks: {ids}")
            else:
                log.info("No stuck tasks found on startup")
        except Exception as e:
            db.rollback()
            log.error(f"Stuck task recovery failed: {e}")
        finally:
            db.close()