Files
trendyol-analiz/backend/queue_worker.py
furkanyigit34 942c8d1244 fix: scraper Search API fallback + logging + auth
Ne yaptık:
- queue_worker.py: TrendyolSearchScraper 0 ürün döndürdüğünde TrendyolScraper
  (Top Rankings API) ile fallback yap — abiye gibi kategoriler için kritik
- logging_config.py: varsayılan log dizinini /tmp/logs olarak değiştir,
  container restart'ta /logs permission hatası düzeldi
- main.py: API_KEY env var yoksa auth'u gerçekten atla (uyarıyla uyumlu hale getir)

Neden yaptık:
- TrendyolSearchScraper pathModel ile bazı kategoriler (abiye-elbise gibi)
  0 ürün döndürüyor; eski Top Rankings API categoryId ile çalışıyor
- /logs dizini container restart'ta izin hatası veriyordu
- API_KEY yoksa tüm istekler 401 dönüyordu (yorum ile çelişki)
2026-04-15 01:53:45 +03:00

396 lines
16 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
Queue Worker — single-threaded background worker for report generation.
Polls PostgreSQL report_queue table, claims tasks with SELECT FOR UPDATE SKIP LOCKED,
runs scraping + enrichment inline, and marks tasks COMPLETED or FAILED.
"""
import os
import socket
import threading
import time
from datetime import datetime, timedelta
from typing import Optional
from sqlalchemy import text
from logging_config import get_logger
log = get_logger("queue_worker")
class QueueWorker:
    """Single-threaded background worker that builds reports from queued requests.

    The worker polls the ``report_queue`` table, claims one PENDING row at a
    time, scrapes every leaf category under the requested category, writes the
    per-category and combined JSON files, stores a ``Report`` row, runs the
    enrichment step inline and finally marks the queue row COMPLETED or FAILED.

    Project modules (``database``, ``main``, ``scraper``) are imported inside
    the methods that need them rather than at module import time.
    """

    def __init__(self, poll_interval: float = 2.0):
        """Create an idle worker.

        Args:
            poll_interval: Seconds to sleep between polls when the queue is empty.
        """
        self.poll_interval = poll_interval
        self._running = False
        self._thread: Optional[threading.Thread] = None
        # hostname + pid; written to report_queue.worker_id when a task is claimed.
        self.worker_id = f"{socket.gethostname()}_{os.getpid()}"

    def start(self):
        """Start the polling loop in a daemon thread."""
        self._running = True
        worker_thread = threading.Thread(
            target=self._run_loop, daemon=True, name="queue-worker"
        )
        self._thread = worker_thread
        worker_thread.start()
        log.info(f"Queue worker started: {self.worker_id}")

    def stop(self):
        """Ask the polling loop to exit; the flag is checked between tasks."""
        self._running = False
        log.info(f"Queue worker stopping: {self.worker_id}")

    def _run_loop(self):
        """Recover stale tasks once, then claim and process tasks until stopped."""
        # Wait a few seconds for app startup to complete
        time.sleep(3)
        self._recover_stuck_tasks()
        while self._running:
            try:
                claimed = self._claim_next_task()
                if not claimed:
                    time.sleep(self.poll_interval)
                    continue
                self._process_task(claimed)
            except Exception as e:
                # Keep the loop alive on unexpected errors; back off briefly.
                log.error(f"Queue worker loop error: {e}", exc_info=True)
                time.sleep(5)

    def _claim_next_task(self) -> Optional[int]:
        """Claim the oldest PENDING task using SELECT FOR UPDATE SKIP LOCKED.

        Returns:
            The id of the claimed queue row, or ``None`` when nothing is
            pending or the claim statement failed.
        """
        from database import SessionLocal

        db = SessionLocal()
        try:
            result = db.execute(text("""
                UPDATE report_queue
                SET status = 'PROCESSING',
                    worker_id = :worker_id,
                    started_at = NOW(),
                    updated_at = NOW()
                WHERE id = (
                    SELECT id FROM report_queue
                    WHERE status = 'PENDING'
                    ORDER BY created_at ASC
                    LIMIT 1
                    FOR UPDATE SKIP LOCKED
                )
                RETURNING id
            """), {"worker_id": self.worker_id})
            # Read the RETURNING row while the transaction is still open; whether
            # a result is readable after commit() depends on the driver.
            row = result.fetchone()
            db.commit()
        except Exception as e:
            db.rollback()
            log.error(f"Failed to claim task: {e}")
            return None
        finally:
            db.close()

        if row is None:
            return None
        queue_id = row[0]
        log.info(f"Claimed task {queue_id}")
        return queue_id

    @staticmethod
    def _find_leaf_categories(tree: list, root_id) -> list:
        """Return ``(path_model, name, id)`` for every leaf category under *root_id*.

        A leaf is a category that no other entry names as its ``parentId``; if
        *root_id* has no children it is itself the only leaf. ``path_model`` is
        the category ``url`` without leading slashes, or ``None`` when the url
        is missing or empty. Leaves come back in depth-first order following
        the order of *tree*.

        The children index is built in one pass, and each node is visited at
        most once so a malformed tree (self-parented or cyclic entries) cannot
        loop forever.
        """
        by_id = {}
        children_of = {}
        for cat in tree:
            by_id[cat["id"]] = cat
            children_of.setdefault(cat.get("parentId"), []).append(cat["id"])

        leaves = []
        seen = set()
        stack = [root_id]
        while stack:
            node_id = stack.pop()
            if node_id in seen:
                continue
            seen.add(node_id)
            child_ids = children_of.get(node_id)
            if child_ids:
                # Reversed so the first child is popped (and visited) first.
                stack.extend(reversed(child_ids))
                continue
            cat = by_id.get(node_id)
            if cat:
                path_model = (cat.get("url") or "").lstrip("/") or None
                leaves.append((path_model, cat["name"], cat["id"]))
        return leaves

    @staticmethod
    def _scrape_progress(idx: int, total: int) -> int:
        """Map the 1-based category index to a progress value in the 10..88 band.

        The step that follows scraping reports 88, so the scrape phase must not
        go past it; otherwise the reported progress would move backwards.
        """
        if total <= 0:
            return 10
        return 10 + int((idx / total) * 78)

    @staticmethod
    def _safe_report_slug(report_name: str) -> str:
        """Turn a report name into a lowercase, ASCII-leaning file-name stem."""
        slug = report_name.lower().replace(" ", "_")
        for old, new in [("ı", "i"), ("ş", "s"), ("ğ", "g"), ("ü", "u"), ("ö", "o"), ("ç", "c")]:
            slug = slug.replace(old, new)
        # "İ".lower() is "i" followed by U+0307 (combining dot above); drop the mark.
        slug = slug.replace("\u0307", "")
        # Path separators would be read as directories when the file is opened.
        return slug.replace("/", "_").replace("\\", "_")

    @staticmethod
    def _category_filename(categories_dir: str, cat_name: str, cat_id, path_model) -> str:
        """Build the JSON path for one scraped category.

        The category id is used as the file suffix when present, otherwise the
        path model with ``/`` replaced. Path separators in the category name
        are replaced as well so the name cannot point into a subdirectory.
        """
        file_id = cat_id if cat_id else (path_model or "").replace("/", "_")
        safe_cat = cat_name.replace(" ", "_").replace("/", "_").replace("\\", "_")
        return f"{categories_dir}/{safe_cat}_{file_id}.json"

    def _merge_social_proofs(self, products: list, cat_id, rankings_scraper_cls, limiter):
        """Copy ``socialProofs`` from Top Rankings results onto matching *products*.

        Products are matched by integer ``id`` (Top Rankings entries fall back
        to ``contentId``). This is best-effort: any error is logged and the
        products are left as they were.
        """
        try:
            limiter.wait()
            top_scraper = rankings_scraper_cls(cat_id, page_size=20)
            top_products = top_scraper.fetch_all_products(delay=0.5, max_pages=5)
            if not top_products:
                return
            social_map = {}
            for top in top_products:
                top_id = top.get("id") or top.get("contentId")
                proofs = top.get("socialProofs", [])
                if top_id and proofs:
                    social_map[int(top_id)] = proofs
            if not social_map:
                return
            matched = 0
            for product in products:
                pid = product.get("id")
                if pid and int(pid) in social_map:
                    product["socialProofs"] = social_map[int(pid)]
                    matched += 1
            # Report how many products were actually updated, not the map size.
            log.info(f"Enriched {matched} products with socialProofs")
        except Exception as e:
            log.warning(f"Top Rankings enrichment failed: {e}")

    def _fetch_products(self, path_model, cat_id, search_scraper_cls, rankings_scraper_cls, limiter):
        """Fetch the products of one leaf category.

        With a *path_model* the Search API scraper is tried first; if it yields
        nothing and a *cat_id* is known, the Top Rankings scraper is used as a
        fallback. Without a *path_model* only the Top Rankings scraper is used.
        Returns whatever the scraper returned (possibly empty).
        """
        if not path_model:
            if not cat_id:
                return []
            limiter.wait()
            return rankings_scraper_cls(cat_id).fetch_all_products()

        limiter.wait()
        products = search_scraper_cls(path_model).fetch_all_products()

        used_fallback = False
        # Search API returned 0 — fallback to Top Rankings API
        if not products and cat_id:
            log.warning(f"Search API 0 ürün döndürdü ({path_model}), Top Rankings API ile fallback deneniyor...")
            limiter.wait()
            fallback_scraper = rankings_scraper_cls(cat_id, page_size=20)
            products = fallback_scraper.fetch_all_products(delay=0.5, max_pages=5)
            used_fallback = True
            if products:
                log.info(f"Fallback başarılı: {len(products)} ürün bulundu (cat_id={cat_id})")

        # Enrich with socialProofs from Top Rankings API. Skipped after a
        # fallback: those products already came from that same endpoint, so a
        # second fetch would only spend rate-limited requests.
        if (
            products
            and cat_id
            and not used_fallback
            and not any(p.get("socialProofs") for p in products)
        ):
            self._merge_social_proofs(products, cat_id, rankings_scraper_cls, limiter)
        return products

    def _scrape_leaf(self, leaf, categories_dir, search_scraper_cls, rankings_scraper_cls, limiter) -> dict:
        """Scrape one leaf category and write its JSON file.

        Args:
            leaf: ``(path_model, category_name, category_id)`` tuple.
            categories_dir: Directory that receives the per-category file.

        Returns:
            The detail record for the combined report. ``success`` is False
            (with ``total_products`` 0 and ``file_path`` None) when nothing was
            found or any step raised; errors are logged, never propagated.
        """
        import json

        path_model, cat_name, cat_id = leaf
        detail = {
            "category_id": cat_id,
            "path_model": path_model,
            "category_name": cat_name,
            "success": False,
            "total_products": 0,
            "file_path": None,
        }
        try:
            products = self._fetch_products(
                path_model, cat_id, search_scraper_cls, rankings_scraper_cls, limiter
            )
            if not products:
                return detail
            os.makedirs(categories_dir, exist_ok=True)
            filename = self._category_filename(categories_dir, cat_name, cat_id, path_model)
            payload = {
                "category_id": cat_id,
                "path_model": path_model,
                "category_name": cat_name,
                "total_products": len(products),
                "scraped_at": datetime.now().isoformat(),
                "products": products,
            }
            with open(filename, "w", encoding="utf-8") as f:
                json.dump(payload, f, ensure_ascii=False, indent=2)
        except Exception as e:
            log.warning(f"Scrape failed for {cat_name}: {e}")
            return detail

        detail["success"] = True
        detail["total_products"] = len(products)
        detail["file_path"] = filename
        return detail

    def _process_task(self, queue_id: int):
        """Process a single queue task: scrape, save report, enrich.

        Any unexpected error marks the task FAILED; the DB session opened here
        is always closed.
        """
        from database import SessionLocal, Report, ReportQueue
        import json

        db = SessionLocal()
        try:
            task = db.query(ReportQueue).filter(ReportQueue.id == queue_id).first()
            if not task:
                log.warning(f"Task {queue_id} not found")
                return
            report_name = task.report_name
            category_id = task.category_id
            log.info(f"Processing task {queue_id}: name={report_name}, category={category_id}")
            self._update_progress(db, queue_id, 5, "Kategori agaci yukleniyor...")

            # Load category tree
            from main import _load_category_tree
            tree = _load_category_tree()
            tree_by_id = {c["id"]: c for c in tree}
            main_cat = tree_by_id.get(category_id)
            if not main_cat:
                self._mark_failed(db, queue_id, f"Category {category_id} not found")
                return

            # Find leaf categories
            categories_to_scrape = self._find_leaf_categories(tree, category_id)
            if not categories_to_scrape:
                self._mark_failed(db, queue_id, "No scrapable categories found")
                return
            main_cat_name = main_cat["name"]
            total = len(categories_to_scrape)
            self._update_progress(
                db, queue_id, 8,
                f"{main_cat_name}: {total} alt kategori bulundu"
            )

            # Import scrapers and rate limiter up front so a broken import
            # fails the whole task instead of every category individually.
            from scraper import TrendyolSearchScraper, TrendyolScraper
            from main import _trendyol_limiter, CATEGORIES_DIR, REPORTS_DIR

            # Scrape each leaf category
            details = []
            total_products = 0
            for idx, leaf in enumerate(categories_to_scrape, 1):
                self._update_progress(
                    db, queue_id, self._scrape_progress(idx, total),
                    f"[{idx}/{total}] {leaf[1]} cekiliyor..."
                )
                detail = self._scrape_leaf(
                    leaf, CATEGORIES_DIR,
                    TrendyolSearchScraper, TrendyolScraper, _trendyol_limiter,
                )
                details.append(detail)
                total_products += detail["total_products"]
                # Rate limit between categories (nothing to wait for after the last one)
                if idx < total:
                    time.sleep(1.5)

            # Generate combined report JSON
            self._update_progress(db, queue_id, 88, "Rapor dosyasi olusturuluyor...")
            now = datetime.now()
            timestamp = now.strftime("%Y%m%d_%H%M%S")
            safe_name = self._safe_report_slug(report_name)
            os.makedirs(REPORTS_DIR, exist_ok=True)
            json_filename = f"{REPORTS_DIR}/{safe_name}_{timestamp}.json"
            combined_data = {
                "report_name": report_name,
                "category": main_cat_name,
                "created_at": now.isoformat(),
                "total_subcategories": total,
                "total_products": total_products,
                "details": details,
            }
            with open(json_filename, "w", encoding="utf-8") as f:
                json.dump(combined_data, f, ensure_ascii=False, indent=2)

            # Save to DB
            self._update_progress(db, queue_id, 93, "Veritabanina kaydediliyor...")
            new_report = Report(
                name=report_name,
                category_id=category_id,
                total_products=total_products,
                total_subcategories=total,
                json_file_path=json_filename,
                html_file_path=None,
                created_at=now,
            )
            try:
                db.add(new_report)
                db.commit()
                db.refresh(new_report)
            except Exception as db_err:
                db.rollback()
                # Without a Report row there is nothing to link the task to,
                # so this failure ends the task.
                log.error(f"DB save failed for task {queue_id}: {db_err}")
                self._mark_failed(db, queue_id, f"DB save failed: {db_err}")
                return
            report_id = new_report.id

            # Run enrichment INLINE (not daemon thread)
            self._update_progress(db, queue_id, 95, "Sosyal kanit verileri toplanıyor...")
            try:
                from main import _enrich_report_task
                _enrich_report_task(report_id)
                log.info(f"Enrichment completed for report {report_id}")
            except Exception as e:
                log.warning(f"Enrichment failed (non-critical): {e}")

            # Mark COMPLETED
            self._mark_completed(db, queue_id, report_id)
            log.info(f"Task {queue_id} completed: report_id={report_id}, products={total_products}")
        except Exception as e:
            log.error(f"Task {queue_id} failed: {e}", exc_info=True)
            try:
                # Clear any failed transaction first; otherwise the status
                # update below cannot run and the row would stay PROCESSING.
                db.rollback()
                self._mark_failed(db, queue_id, str(e)[:500])
            except Exception as mark_err:
                log.error(f"Could not mark task {queue_id} as failed: {mark_err}")
        finally:
            db.close()

    def _update_progress(self, db, queue_id: int, progress: int, message: str):
        """Store progress and a status message on the queue row (best-effort)."""
        params = {"progress": progress, "message": message, "queue_id": queue_id}
        try:
            db.execute(text("""
                UPDATE report_queue
                SET progress = :progress, message = :message, updated_at = NOW()
                WHERE id = :queue_id
            """), params)
            db.commit()
        except Exception as e:
            db.rollback()
            log.debug(f"Progress update failed for {queue_id}: {e}")

    def _mark_completed(self, db, queue_id: int, report_id: int):
        """Set the queue row to COMPLETED and link it to the created report."""
        params = {"report_id": report_id, "queue_id": queue_id}
        try:
            db.execute(text("""
                UPDATE report_queue
                SET status = 'COMPLETED',
                    result_report_id = :report_id,
                    progress = 100,
                    message = 'Rapor tamamlandi',
                    completed_at = NOW(),
                    updated_at = NOW()
                WHERE id = :queue_id
            """), params)
            db.commit()
        except Exception as e:
            db.rollback()
            log.error(f"Failed to mark completed for {queue_id}: {e}")

    def _mark_failed(self, db, queue_id: int, error: str):
        """Set the queue row to FAILED, storing at most 1000 characters of *error*."""
        try:
            db.execute(text("""
                UPDATE report_queue
                SET status = 'FAILED',
                    error = :error,
                    completed_at = NOW(),
                    updated_at = NOW()
                WHERE id = :queue_id
            """), {"error": error[:1000], "queue_id": queue_id})
            db.commit()
        except Exception as e:
            db.rollback()
            log.error(f"Failed to mark failed for {queue_id}: {e}")

    def _recover_stuck_tasks(self):
        """Reset tasks that were PROCESSING for more than 15 minutes back to PENDING."""
        from database import SessionLocal

        db = SessionLocal()
        try:
            result = db.execute(text("""
                UPDATE report_queue
                SET status = 'PENDING', worker_id = NULL, started_at = NULL, updated_at = NOW()
                WHERE status = 'PROCESSING'
                  AND updated_at < NOW() - INTERVAL '15 minutes'
                RETURNING id
            """))
            # Read the RETURNING rows before commit(); see _claim_next_task.
            recovered = result.fetchall()
            db.commit()
            if recovered:
                ids = [r[0] for r in recovered]
                log.warning(f"Recovered {len(ids)} stuck tasks: {ids}")
            else:
                log.info("No stuck tasks found on startup")
        except Exception as e:
            db.rollback()
            log.error(f"Stuck task recovery failed: {e}")
        finally:
            db.close()