""" Queue Worker — single-threaded background worker for report generation. Polls PostgreSQL report_queue table, claims tasks with SELECT FOR UPDATE SKIP LOCKED, runs scraping + enrichment inline, and marks tasks COMPLETED or FAILED. """ import os import socket import threading import time from datetime import datetime, timedelta from typing import Optional from sqlalchemy import text from logging_config import get_logger log = get_logger("queue_worker") class QueueWorker: def __init__(self, poll_interval: float = 2.0): self.poll_interval = poll_interval self._running = False self._thread: Optional[threading.Thread] = None self.worker_id = f"{socket.gethostname()}_{os.getpid()}" def start(self): self._running = True self._thread = threading.Thread( target=self._run_loop, daemon=True, name="queue-worker" ) self._thread.start() log.info(f"Queue worker started: {self.worker_id}") def stop(self): self._running = False log.info(f"Queue worker stopping: {self.worker_id}") def _run_loop(self): # Wait a few seconds for app startup to complete time.sleep(3) self._recover_stuck_tasks() while self._running: try: claimed = self._claim_next_task() if claimed: self._process_task(claimed) else: time.sleep(self.poll_interval) except Exception as e: log.error(f"Queue worker loop error: {e}", exc_info=True) time.sleep(5) def _claim_next_task(self) -> Optional[int]: """Claim the oldest PENDING task using SELECT FOR UPDATE SKIP LOCKED.""" from database import SessionLocal db = SessionLocal() try: result = db.execute(text(""" UPDATE report_queue SET status = 'PROCESSING', worker_id = :worker_id, started_at = NOW(), updated_at = NOW() WHERE id = ( SELECT id FROM report_queue WHERE status = 'PENDING' ORDER BY created_at ASC LIMIT 1 FOR UPDATE SKIP LOCKED ) RETURNING id """), {"worker_id": self.worker_id}) db.commit() row = result.fetchone() if row: queue_id = row[0] log.info(f"Claimed task {queue_id}") return queue_id return None except Exception as e: db.rollback() log.error(f"Failed to claim task: {e}") return None finally: db.close() def _process_task(self, queue_id: int): """Process a single queue task: scrape, save report, enrich.""" from database import SessionLocal, Report, ReportQueue import json as json_module db = SessionLocal() try: task = db.query(ReportQueue).filter(ReportQueue.id == queue_id).first() if not task: log.warning(f"Task {queue_id} not found") return report_name = task.report_name category_id = task.category_id log.info(f"Processing task {queue_id}: name={report_name}, category={category_id}") self._update_progress(db, queue_id, 5, f"Kategori agaci yukleniyor...") # Load category tree from main import _load_category_tree tree = _load_category_tree() tree_by_id = {c["id"]: c for c in tree} main_cat = tree_by_id.get(category_id) if not main_cat: self._mark_failed(db, queue_id, f"Category {category_id} not found") return # Find leaf categories def find_leaves(parent_id): children = [c for c in tree if c.get("parentId") == parent_id] if not children: cat = tree_by_id.get(parent_id) if cat: path_model = (cat.get("url") or "").lstrip("/") or None return [(path_model, cat["name"], cat["id"])] return [] leaves = [] for child in children: leaves.extend(find_leaves(child["id"])) return leaves categories_to_scrape = find_leaves(category_id) if not categories_to_scrape: self._mark_failed(db, queue_id, "No scrapable categories found") return main_cat_name = main_cat["name"] self._update_progress( db, queue_id, 8, f"{main_cat_name}: {len(categories_to_scrape)} alt kategori bulundu" ) # Import scrapers and rate limiter from scraper import TrendyolSearchScraper, TrendyolScraper from main import _trendyol_limiter, CATEGORIES_DIR, REPORTS_DIR results = { "successful": 0, "failed": 0, "total_products": 0, "details": [] } # Scrape each leaf category for idx, (path_model, cat_name, cat_id) in enumerate(categories_to_scrape, 1): progress = int((idx / len(categories_to_scrape)) * 80) + 10 self._update_progress( db, queue_id, progress, f"[{idx}/{len(categories_to_scrape)}] {cat_name} cekiliyor..." ) try: products = [] if path_model: _trendyol_limiter.wait() scraper = TrendyolSearchScraper(path_model) products = scraper.fetch_all_products() # Enrich with socialProofs from Top Rankings API if products and cat_id and not any(p.get("socialProofs") for p in products): try: _trendyol_limiter.wait() top_scraper = TrendyolScraper(cat_id, page_size=20) top_products = top_scraper.fetch_all_products(delay=0.5, max_pages=5) if top_products: social_map = {} for tp in top_products: tid = tp.get("id") or tp.get("contentId") sp = tp.get("socialProofs", []) if tid and sp: social_map[int(tid)] = sp if social_map: for p in products: pid = p.get("id") if pid and int(pid) in social_map: p["socialProofs"] = social_map[int(pid)] log.info(f"Enriched {len(social_map)} products with socialProofs") except Exception as e: log.warning(f"Top Rankings enrichment failed: {e}") elif cat_id: _trendyol_limiter.wait() scraper = TrendyolScraper(cat_id) products = scraper.fetch_all_products() if products: os.makedirs(CATEGORIES_DIR, exist_ok=True) file_id = cat_id if cat_id else path_model.replace("/", "_") filename = f"{CATEGORIES_DIR}/{cat_name.replace(' ', '_')}_{file_id}.json" data = { "category_id": cat_id, "path_model": path_model, "category_name": cat_name, "total_products": len(products), "scraped_at": datetime.now().isoformat(), "products": products } with open(filename, 'w', encoding='utf-8') as f: json_module.dump(data, f, ensure_ascii=False, indent=2) results["successful"] += 1 results["total_products"] += len(products) results["details"].append({ "category_id": cat_id, "path_model": path_model, "category_name": cat_name, "success": True, "total_products": len(products), "file_path": filename }) else: results["failed"] += 1 results["details"].append({ "category_id": cat_id, "path_model": path_model, "category_name": cat_name, "success": False, "total_products": 0, "file_path": None }) except Exception as e: results["failed"] += 1 results["details"].append({ "category_id": cat_id, "path_model": path_model, "category_name": cat_name, "success": False, "total_products": 0, "file_path": None }) log.warning(f"Scrape failed for {cat_name}: {e}") # Rate limit between categories time.sleep(1.5) # Generate combined report JSON self._update_progress(db, queue_id, 88, "Rapor dosyasi olusturuluyor...") timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") safe_name = report_name.lower().replace(" ", "_") for old, new in [("ı", "i"), ("ş", "s"), ("ğ", "g"), ("ü", "u"), ("ö", "o"), ("ç", "c")]: safe_name = safe_name.replace(old, new) os.makedirs(REPORTS_DIR, exist_ok=True) json_filename = f"{REPORTS_DIR}/{safe_name}_{timestamp}.json" combined_data = { "report_name": report_name, "category": main_cat_name, "created_at": datetime.now().isoformat(), "total_subcategories": len(categories_to_scrape), "total_products": results["total_products"], "details": results["details"] } with open(json_filename, 'w', encoding='utf-8') as f: json_module.dump(combined_data, f, ensure_ascii=False, indent=2) # Save to DB self._update_progress(db, queue_id, 93, "Veritabanina kaydediliyor...") new_report = Report( name=report_name, category_id=category_id, total_products=results["total_products"], total_subcategories=len(categories_to_scrape), json_file_path=json_filename, html_file_path=None, created_at=datetime.now() ) try: db.add(new_report) db.commit() db.refresh(new_report) except Exception as db_err: db.rollback() log.warning(f"DB save failed (non-critical): {db_err}") self._mark_failed(db, queue_id, f"DB save failed: {db_err}") return report_id = new_report.id # Run enrichment INLINE (not daemon thread) self._update_progress(db, queue_id, 95, "Sosyal kanit verileri toplanıyor...") try: from main import _enrich_report_task _enrich_report_task(report_id) log.info(f"Enrichment completed for report {report_id}") except Exception as e: log.warning(f"Enrichment failed (non-critical): {e}") # Mark COMPLETED self._mark_completed(db, queue_id, report_id) log.info(f"Task {queue_id} completed: report_id={report_id}, products={results['total_products']}") except Exception as e: log.error(f"Task {queue_id} failed: {e}", exc_info=True) try: self._mark_failed(db, queue_id, str(e)[:500]) except Exception: pass finally: db.close() def _update_progress(self, db, queue_id: int, progress: int, message: str): try: db.execute(text(""" UPDATE report_queue SET progress = :progress, message = :message, updated_at = NOW() WHERE id = :queue_id """), {"progress": progress, "message": message, "queue_id": queue_id}) db.commit() except Exception as e: db.rollback() log.debug(f"Progress update failed for {queue_id}: {e}") def _mark_completed(self, db, queue_id: int, report_id: int): try: db.execute(text(""" UPDATE report_queue SET status = 'COMPLETED', result_report_id = :report_id, progress = 100, message = 'Rapor tamamlandi', completed_at = NOW(), updated_at = NOW() WHERE id = :queue_id """), {"report_id": report_id, "queue_id": queue_id}) db.commit() except Exception as e: db.rollback() log.error(f"Failed to mark completed for {queue_id}: {e}") def _mark_failed(self, db, queue_id: int, error: str): try: db.execute(text(""" UPDATE report_queue SET status = 'FAILED', error = :error, completed_at = NOW(), updated_at = NOW() WHERE id = :queue_id """), {"error": error[:1000], "queue_id": queue_id}) db.commit() except Exception as e: db.rollback() log.error(f"Failed to mark failed for {queue_id}: {e}") def _recover_stuck_tasks(self): """Reset tasks that were PROCESSING for more than 15 minutes back to PENDING.""" from database import SessionLocal db = SessionLocal() try: result = db.execute(text(""" UPDATE report_queue SET status = 'PENDING', worker_id = NULL, started_at = NULL, updated_at = NOW() WHERE status = 'PROCESSING' AND updated_at < NOW() - INTERVAL '15 minutes' RETURNING id """)) db.commit() recovered = result.fetchall() if recovered: ids = [r[0] for r in recovered] log.warning(f"Recovered {len(ids)} stuck tasks: {ids}") else: log.info("No stuck tasks found on startup") except Exception as e: db.rollback() log.error(f"Stuck task recovery failed: {e}") finally: db.close()