Files
trendyol-analiz/backend/queue_worker.py
furkanyigit34 706f957ba2 fix(scraper): pathModel -x-c{id} suffix + defensive int cast
Ne yaptık:
- queue_worker.py find_leaves: kategori URL'sine -x-c{id} suffix ekledik
  (ör: makyaj-cantasi -> makyaj-cantasi-x-c1110)
- scraper.py fetch_all_products: total degeri str gelirse int'e cast
  ediyoruz (roughTotal "0" string donuyordu, '/' operatoru patliyordu)

Neden yaptık:
- Trendyol Search API path formati degisti, artik suffix'siz cagri 0
  urun donduruyor (ornek test: makyaj-cantasi=0, makyaj-cantasi-x-c1110=15.712)
- Yan etki olarak 'total' int 0 falsy oldugu icin 'or' chain
  roughTotal'a dusuyor, bu da string olarak donuyor ve math.ceil(str/int)
  TypeError firlatiyordu. Tum 35 alt kategori bu yuzden patladi
  (rapor 62 "Makyaj Analizi" sifir urun).
- Fix iki seviyeli: asil cozum suffix (artik dogru data ceker), defansif
  int() cast gelecekte API'nin baska sekilde tutarsiz donmesine karsi
  guvenlik agi.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-04-25 16:38:13 +03:00

398 lines
16 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
Queue Worker — single-threaded background worker for report generation.
Polls PostgreSQL report_queue table, claims tasks with SELECT FOR UPDATE SKIP LOCKED,
runs scraping + enrichment inline, and marks tasks COMPLETED or FAILED.
"""
import os
import socket
import threading
import time
from datetime import datetime, timedelta
from typing import Optional
from sqlalchemy import text
from logging_config import get_logger
# Module-level logger used by every QueueWorker method; get_logger comes from
# the project's logging_config module.
log = get_logger("queue_worker")
class QueueWorker:
    """Background worker that drains the PostgreSQL ``report_queue`` table.

    A single daemon thread polls for PENDING rows and claims one at a time
    with ``SELECT ... FOR UPDATE SKIP LOCKED``, so rows locked by another
    claimer are skipped rather than waited on. For each claimed row it runs
    scraping, report saving and enrichment inline, then marks the row
    COMPLETED or FAILED.
    """

    # Folds a lower-cased report name into an ASCII, path-safe file stem.
    # Turkish letters become ASCII, spaces and path separators become "_",
    # and the combining dot (U+0307) that "İ".lower() leaves behind is dropped.
    _SAFE_NAME_TABLE = str.maketrans({
        "ı": "i", "ş": "s", "ğ": "g", "ü": "u", "ö": "o", "ç": "c",
        "\u0307": None,
        " ": "_", "/": "_", "\\": "_",
    })

    def __init__(self, poll_interval: float = 2.0):
        """Create an idle worker; call start() to begin polling.

        Args:
            poll_interval: Seconds to sleep between polls when no task is pending.
        """
        self.poll_interval = poll_interval
        self._running = False
        self._thread: Optional[threading.Thread] = None
        # Written to report_queue.worker_id for every task this process claims.
        self.worker_id = f"{socket.gethostname()}_{os.getpid()}"

    def start(self):
        """Start the polling loop on a daemon thread named "queue-worker"."""
        self._running = True
        self._thread = threading.Thread(
            target=self._run_loop, daemon=True, name="queue-worker"
        )
        self._thread.start()
        log.info(f"Queue worker started: {self.worker_id}")

    def stop(self):
        """Ask the polling loop to exit.

        This only clears the running flag. It does not join the thread, and a
        task that is already being processed runs to completion first.
        """
        self._running = False
        log.info(f"Queue worker stopping: {self.worker_id}")

    def _run_loop(self):
        """Thread body: recover stale tasks once, then claim and process until stopped."""
        # Wait a few seconds for app startup to complete
        time.sleep(3)
        self._recover_stuck_tasks()
        while self._running:
            try:
                claimed = self._claim_next_task()
                if claimed:
                    self._process_task(claimed)
                else:
                    time.sleep(self.poll_interval)
            except Exception as e:
                log.error(f"Queue worker loop error: {e}", exc_info=True)
                time.sleep(5)

    def _claim_next_task(self) -> Optional[int]:
        """Atomically move the oldest PENDING task to PROCESSING for this worker.

        Returns:
            The claimed task id, or None when nothing is pending or the claim
            failed (the failure is logged and rolled back).
        """
        from database import SessionLocal
        db = SessionLocal()
        try:
            result = db.execute(text("""
                UPDATE report_queue
                SET status = 'PROCESSING',
                    worker_id = :worker_id,
                    started_at = NOW(),
                    updated_at = NOW()
                WHERE id = (
                    SELECT id FROM report_queue
                    WHERE status = 'PENDING'
                    ORDER BY created_at ASC
                    LIMIT 1
                    FOR UPDATE SKIP LOCKED
                )
                RETURNING id
            """), {"worker_id": self.worker_id})
            # Consume the RETURNING row *before* committing. If reading it
            # failed after the commit, the row would stay PROCESSING with
            # nobody working on it; failing here rolls the claim back instead.
            row = result.fetchone()
            db.commit()
        except Exception as e:
            db.rollback()
            log.error(f"Failed to claim task: {e}")
            return None
        finally:
            db.close()
        if row is None:
            return None
        queue_id = row[0]
        log.info(f"Claimed task {queue_id}")
        return queue_id

    def _process_task(self, queue_id: int):
        """Run one claimed task end to end.

        Steps: load the category tree, find the leaf categories under the
        task's category, scrape each leaf to its own JSON file, write a
        combined report JSON, insert a Report row, run enrichment inline,
        and mark the queue row COMPLETED. Any unexpected error marks the row
        FAILED. Per-leaf scrape errors only count that leaf as failed.

        Args:
            queue_id: Id of a report_queue row already claimed by this worker.
        """
        from database import SessionLocal, Report, ReportQueue
        import json as json_module

        db = SessionLocal()
        try:
            task = db.query(ReportQueue).filter(ReportQueue.id == queue_id).first()
            if not task:
                log.warning(f"Task {queue_id} not found")
                return
            report_name = task.report_name
            category_id = task.category_id
            log.info(f"Processing task {queue_id}: name={report_name}, category={category_id}")
            self._update_progress(db, queue_id, 5, "Kategori agaci yukleniyor...")

            # Load category tree
            from main import _load_category_tree
            tree = _load_category_tree()
            tree_by_id = {c["id"]: c for c in tree}
            main_cat = tree_by_id.get(category_id)
            if not main_cat:
                self._mark_failed(db, queue_id, f"Category {category_id} not found")
                return

            categories_to_scrape = self._find_leaf_categories(tree, category_id)
            if not categories_to_scrape:
                self._mark_failed(db, queue_id, "No scrapable categories found")
                return
            main_cat_name = main_cat["name"]
            total_leaves = len(categories_to_scrape)
            self._update_progress(
                db, queue_id, 8,
                f"{main_cat_name}: {total_leaves} alt kategori bulundu"
            )

            # Import scrapers and rate limiter (failures here fail the whole task)
            from scraper import TrendyolSearchScraper, TrendyolScraper
            from main import _trendyol_limiter, CATEGORIES_DIR, REPORTS_DIR

            results = {
                "successful": 0,
                "failed": 0,
                "total_products": 0,
                "details": []
            }

            # Scrape each leaf category
            for idx, (path_model, cat_name, cat_id) in enumerate(categories_to_scrape, 1):
                self._update_progress(
                    db, queue_id, self._scrape_progress(idx, total_leaves),
                    f"[{idx}/{total_leaves}] {cat_name} cekiliyor..."
                )
                try:
                    products = []
                    if path_model:
                        _trendyol_limiter.wait()
                        scraper = TrendyolSearchScraper(path_model)
                        products = scraper.fetch_all_products()
                        used_fallback = False
                        # Search API returned 0 — fall back to the Top Rankings API.
                        if not products and cat_id:
                            log.warning(f"Search API 0 ürün döndürdü ({path_model}), Top Rankings API ile fallback deneniyor...")
                            _trendyol_limiter.wait()
                            fallback_scraper = TrendyolScraper(cat_id, page_size=20)
                            products = fallback_scraper.fetch_all_products(delay=0.5, max_pages=5)
                            used_fallback = True
                            if products:
                                log.info(f"Fallback başarılı: {len(products)} ürün bulundu (cat_id={cat_id})")
                        # Enrich with socialProofs from the Top Rankings API. This is
                        # skipped after a fallback: those products already came from
                        # the identical Top Rankings request, so repeating it would
                        # only spend rate-limit budget.
                        if (products and cat_id and not used_fallback
                                and not any(p.get("socialProofs") for p in products)):
                            try:
                                _trendyol_limiter.wait()
                                top_scraper = TrendyolScraper(cat_id, page_size=20)
                                top_products = top_scraper.fetch_all_products(delay=0.5, max_pages=5)
                                enriched = self._merge_social_proofs(products, top_products or [])
                                if enriched:
                                    log.info(f"Enriched {enriched} products with socialProofs")
                            except Exception as e:
                                log.warning(f"Top Rankings enrichment failed: {e}")
                    elif cat_id:
                        _trendyol_limiter.wait()
                        scraper = TrendyolScraper(cat_id)
                        products = scraper.fetch_all_products()

                    if products:
                        os.makedirs(CATEGORIES_DIR, exist_ok=True)
                        file_id = cat_id if cat_id else path_model.replace("/", "_")
                        filename = f"{CATEGORIES_DIR}/{self._category_file_stem(cat_name)}_{file_id}.json"
                        data = {
                            "category_id": cat_id,
                            "path_model": path_model,
                            "category_name": cat_name,
                            "total_products": len(products),
                            "scraped_at": datetime.now().isoformat(),
                            "products": products
                        }
                        with open(filename, 'w', encoding='utf-8') as f:
                            json_module.dump(data, f, ensure_ascii=False, indent=2)
                        results["successful"] += 1
                        results["total_products"] += len(products)
                        results["details"].append(
                            self._detail(cat_id, path_model, cat_name, len(products), filename)
                        )
                    else:
                        results["failed"] += 1
                        results["details"].append(self._detail(cat_id, path_model, cat_name))
                except Exception as e:
                    results["failed"] += 1
                    results["details"].append(self._detail(cat_id, path_model, cat_name))
                    log.warning(f"Scrape failed for {cat_name}: {e}")
                # Rate limit between categories
                time.sleep(1.5)

            # Generate combined report JSON
            self._update_progress(db, queue_id, 88, "Rapor dosyasi olusturuluyor...")
            timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
            safe_name = self._report_file_stem(report_name)
            os.makedirs(REPORTS_DIR, exist_ok=True)
            json_filename = f"{REPORTS_DIR}/{safe_name}_{timestamp}.json"
            combined_data = {
                "report_name": report_name,
                "category": main_cat_name,
                "created_at": datetime.now().isoformat(),
                "total_subcategories": total_leaves,
                "total_products": results["total_products"],
                "details": results["details"]
            }
            with open(json_filename, 'w', encoding='utf-8') as f:
                json_module.dump(combined_data, f, ensure_ascii=False, indent=2)

            # Save to DB
            self._update_progress(db, queue_id, 93, "Veritabanina kaydediliyor...")
            new_report = Report(
                name=report_name,
                category_id=category_id,
                total_products=results["total_products"],
                total_subcategories=total_leaves,
                json_file_path=json_filename,
                html_file_path=None,
                created_at=datetime.now()
            )
            try:
                db.add(new_report)
                db.commit()
                db.refresh(new_report)
            except Exception as db_err:
                db.rollback()
                # This fails the task, so it is not "non-critical".
                log.error(f"DB save failed: {db_err}")
                self._mark_failed(db, queue_id, f"DB save failed: {db_err}")
                return
            report_id = new_report.id

            # Run enrichment INLINE (not daemon thread)
            self._update_progress(db, queue_id, 95, "Sosyal kanit verileri toplanıyor...")
            try:
                from main import _enrich_report_task
                _enrich_report_task(report_id)
                log.info(f"Enrichment completed for report {report_id}")
            except Exception as e:
                log.warning(f"Enrichment failed (non-critical): {e}")

            # Mark COMPLETED
            self._mark_completed(db, queue_id, report_id)
            log.info(f"Task {queue_id} completed: report_id={report_id}, products={results['total_products']}")
        except Exception as e:
            log.error(f"Task {queue_id} failed: {e}", exc_info=True)
            # The session may be in a failed-transaction state. Without a
            # rollback, the FAILED update below would be rejected and the task
            # would stay PROCESSING.
            try:
                db.rollback()
            except Exception as rb_err:
                log.debug(f"Rollback before marking {queue_id} failed raised: {rb_err}")
            try:
                self._mark_failed(db, queue_id, str(e)[:500])
            except Exception as mark_err:
                log.error(f"Could not mark task {queue_id} as failed: {mark_err}")
        finally:
            db.close()

    @staticmethod
    def _find_leaf_categories(tree, root_id):
        """List the leaf categories at or below root_id, depth-first in tree order.

        A category with no children is its own leaf.

        Args:
            tree: Category dicts carrying "id", "name", optional "parentId" and
                optional "url".
            root_id: Id of the category to expand.

        Returns:
            A list of (path_model, name, id) tuples. path_model is the URL slug
            plus the ``-x-c{id}`` suffix that the Trendyol Search API now
            requires on pathModel, or None when the category has no URL. The
            list is empty when root_id is unknown.
        """
        by_id = {c["id"]: c for c in tree}
        # Build the parent -> children index once instead of rescanning the
        # whole tree at every recursion step.
        children_of = {}
        for c in tree:
            children_of.setdefault(c.get("parentId"), []).append(c)

        def walk(parent_id):
            children = children_of.get(parent_id)
            if not children:
                cat = by_id.get(parent_id)
                if not cat:
                    return []
                url_slug = (cat.get("url") or "").lstrip("/")
                # Trendyol Search API now requires -x-c{id} suffix on pathModel
                path_model = f"{url_slug}-x-c{cat['id']}" if url_slug else None
                return [(path_model, cat["name"], cat["id"])]
            leaves = []
            for child in children:
                leaves.extend(walk(child["id"]))
            return leaves

        return walk(root_id)

    @staticmethod
    def _scrape_progress(idx, total):
        """Progress percentage to show while leaf idx (1-based) of total is scraped.

        The value starts at 10 and never reaches 88. It therefore stays
        monotonic with the fixed 8% step before it and the 88% step after it.
        """
        return 10 + (idx - 1) * 78 // total

    @staticmethod
    def _merge_social_proofs(products, top_products):
        """Copy "socialProofs" from top_products onto products that share an id.

        Top-ranking items are keyed by int("id" or "contentId"); items without
        an id or with empty social proofs are ignored. products is modified
        in place.

        Returns:
            The number of products that received social proofs.
        """
        social_map = {}
        for tp in top_products:
            tid = tp.get("id") or tp.get("contentId")
            sp = tp.get("socialProofs", [])
            if tid and sp:
                social_map[int(tid)] = sp
        updated = 0
        if social_map:
            for p in products:
                pid = p.get("id")
                if pid and int(pid) in social_map:
                    p["socialProofs"] = social_map[int(pid)]
                    updated += 1
        return updated

    @staticmethod
    def _detail(cat_id, path_model, cat_name, total_products=0, file_path=None):
        """Build the per-category entry for the report "details" list.

        The entry counts as a success exactly when a category file was written
        (file_path is not None).
        """
        return {
            "category_id": cat_id,
            "path_model": path_model,
            "category_name": cat_name,
            "success": file_path is not None,
            "total_products": total_products,
            "file_path": file_path,
        }

    @staticmethod
    def _category_file_stem(cat_name):
        """Make a category name usable as a single file-name component.

        Spaces become "_", as before. Path separators also become "_", so a
        name such as "Tişört/Atlet" cannot redirect the write into another
        directory.
        """
        return cat_name.replace(" ", "_").replace("/", "_").replace("\\", "_")

    @classmethod
    def _report_file_stem(cls, report_name):
        """Lower-case, ASCII-fold and path-sanitize a user-supplied report name.

        Path separators are replaced, so a name such as "../../x" cannot
        escape REPORTS_DIR.
        """
        return report_name.lower().translate(cls._SAFE_NAME_TABLE)

    def _update_progress(self, db, queue_id: int, progress: int, message: str):
        """Best-effort write of progress/message to the queue row (failures logged at debug)."""
        try:
            db.execute(text("""
                UPDATE report_queue
                SET progress = :progress, message = :message, updated_at = NOW()
                WHERE id = :queue_id
            """), {"progress": progress, "message": message, "queue_id": queue_id})
            db.commit()
        except Exception as e:
            db.rollback()
            log.debug(f"Progress update failed for {queue_id}: {e}")

    def _mark_completed(self, db, queue_id: int, report_id: int):
        """Mark the queue row COMPLETED at 100% and link it to report_id."""
        try:
            db.execute(text("""
                UPDATE report_queue
                SET status = 'COMPLETED',
                    result_report_id = :report_id,
                    progress = 100,
                    message = 'Rapor tamamlandi',
                    completed_at = NOW(),
                    updated_at = NOW()
                WHERE id = :queue_id
            """), {"report_id": report_id, "queue_id": queue_id})
            db.commit()
        except Exception as e:
            db.rollback()
            log.error(f"Failed to mark completed for {queue_id}: {e}")

    def _mark_failed(self, db, queue_id: int, error: str):
        """Mark the queue row FAILED, storing at most 1000 chars of error."""
        try:
            db.execute(text("""
                UPDATE report_queue
                SET status = 'FAILED',
                    error = :error,
                    completed_at = NOW(),
                    updated_at = NOW()
                WHERE id = :queue_id
            """), {"error": error[:1000], "queue_id": queue_id})
            db.commit()
        except Exception as e:
            db.rollback()
            log.error(f"Failed to mark failed for {queue_id}: {e}")

    def _recover_stuck_tasks(self):
        """Reset PROCESSING tasks with no update for 15 minutes back to PENDING.

        This runs once, when the loop starts. updated_at is bumped by every
        progress update, so the criterion is "no progress for 15 minutes".
        NOTE(review): this does not filter on worker_id, so a long inline
        enrichment on another live worker could be reset too. Confirm that
        this is acceptable.
        """
        from database import SessionLocal
        db = SessionLocal()
        try:
            result = db.execute(text("""
                UPDATE report_queue
                SET status = 'PENDING', worker_id = NULL, started_at = NULL, updated_at = NOW()
                WHERE status = 'PROCESSING'
                  AND updated_at < NOW() - INTERVAL '15 minutes'
                RETURNING id
            """))
            # Read the RETURNING rows before committing, while the result is open.
            recovered = result.fetchall()
            db.commit()
            if recovered:
                ids = [r[0] for r in recovered]
                log.warning(f"Recovered {len(ids)} stuck tasks: {ids}")
            else:
                log.info("No stuck tasks found on startup")
        except Exception as e:
            db.rollback()
            log.error(f"Stuck task recovery failed: {e}")
        finally:
            db.close()