mirror of
https://github.com/nethunterzist/trendyol-analiz
synced 2026-07-01 17:37:04 +00:00
Ne yaptık:
- queue_worker.py find_leaves: kategori URL'sine -x-c{id} suffix'i ekledik
(ör: makyaj-cantasi -> makyaj-cantasi-x-c1110)
- scraper.py fetch_all_products: total değeri str gelirse int'e cast
ediyoruz (roughTotal "0" string dönüyordu, '/' operatörü patlıyordu)
Neden yaptık:
- Trendyol Search API path formatı değişti, artık suffix'siz çağrı 0
ürün döndürüyor (örnek test: makyaj-cantasi=0, makyaj-cantasi-x-c1110=15.712)
- Yan etki olarak 'total' int 0 falsy olduğu için 'or' zinciri
roughTotal'a düşüyor, bu da string olarak dönüyor ve math.ceil(str/int)
TypeError fırlatıyordu. Tüm 35 alt kategori bu yüzden patladı
(rapor 62 "Makyaj Analizi" sıfır ürün).
- Fix iki seviyeli: asıl çözüm suffix (artık doğru data çeker); defansif
int() cast ise gelecekte API'nin başka şekilde tutarsız dönmesine karşı
güvenlik ağı.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
398 lines
16 KiB
Python
"""
Queue Worker — single-threaded background worker for report generation.

Polls the PostgreSQL ``report_queue`` table, claims tasks with
``SELECT ... FOR UPDATE SKIP LOCKED``, runs scraping + enrichment inline, and
marks each task COMPLETED or FAILED. On startup, tasks left in PROCESSING
without an update for 15 minutes are reset to PENDING.
"""
|
||
import os
|
||
import socket
|
||
import threading
|
||
import time
|
||
from datetime import datetime, timedelta
|
||
from typing import Optional
|
||
|
||
from sqlalchemy import text
|
||
from logging_config import get_logger
|
||
|
||
# Module-level logger for this worker, obtained from the project's logging_config.
log = get_logger("queue_worker")
|
||
|
||
|
||
class QueueWorker:
    """Single-threaded background worker that drains the ``report_queue`` table.

    One daemon thread polls PostgreSQL for PENDING tasks and claims one at a
    time. For each task it scrapes every leaf category under the task's
    category, writes per-category and combined JSON files, saves a ``Report``
    row, runs enrichment inline, and finally marks the task COMPLETED or FAILED.
    """

    # Turkish lowercase letters mapped to ASCII for report file names. "\u0307"
    # is the combining dot that str.lower() leaves behind for "İ"
    # ("İ".lower() == "i\u0307"); dropping it yields a plain "i".
    _TR_TRANSLIT = str.maketrans(
        {"ı": "i", "ş": "s", "ğ": "g", "ü": "u", "ö": "o", "ç": "c", "\u0307": ""}
    )
    # Characters that would create sub-directories (path traversal) or are
    # illegal in file names on common filesystems. Each is replaced with "_".
    _UNSAFE_FILENAME_CHARS = str.maketrans({ch: "_" for ch in '/\\:*?"<>|'})

    def __init__(self, poll_interval: float = 2.0):
        """Create an idle worker.

        Args:
            poll_interval: Seconds to sleep between polls when no task is pending.
        """
        self.poll_interval = poll_interval
        self._running = False
        self._thread: Optional[threading.Thread] = None
        # Identifies this process in report_queue.worker_id.
        self.worker_id = f"{socket.gethostname()}_{os.getpid()}"

    def start(self):
        """Start the polling loop in a daemon thread."""
        self._running = True
        self._thread = threading.Thread(
            target=self._run_loop, daemon=True, name="queue-worker"
        )
        self._thread.start()
        log.info(f"Queue worker started: {self.worker_id}")

    def stop(self):
        """Ask the loop to exit after its current iteration (does not join)."""
        self._running = False
        log.info(f"Queue worker stopping: {self.worker_id}")

    def _run_loop(self):
        """Recover stuck tasks once, then claim and process tasks until stopped."""
        # Wait a few seconds for app startup to complete
        time.sleep(3)
        self._recover_stuck_tasks()

        while self._running:
            try:
                claimed = self._claim_next_task()
                if claimed:
                    self._process_task(claimed)
                else:
                    time.sleep(self.poll_interval)
            except Exception as e:
                log.error(f"Queue worker loop error: {e}", exc_info=True)
                time.sleep(5)

    def _claim_next_task(self) -> Optional[int]:
        """Claim the oldest PENDING task using SELECT FOR UPDATE SKIP LOCKED.

        The claim is a single UPDATE whose sub-select skips rows locked by other
        workers, so two workers never claim the same task.

        Returns:
            The claimed ``report_queue.id``, or None when nothing is pending or
            the claim failed.
        """
        from database import SessionLocal

        db = SessionLocal()
        try:
            result = db.execute(text("""
                UPDATE report_queue
                SET status = 'PROCESSING',
                    worker_id = :worker_id,
                    started_at = NOW(),
                    updated_at = NOW()
                WHERE id = (
                    SELECT id FROM report_queue
                    WHERE status = 'PENDING'
                    ORDER BY created_at ASC
                    LIMIT 1
                    FOR UPDATE SKIP LOCKED
                )
                RETURNING id
            """), {"worker_id": self.worker_id})
            # Read the RETURNING row BEFORE commit: the result is tied to the
            # session's connection, which commit() releases. Fetching first
            # means a fetch error rolls the claim back instead of leaving a
            # PROCESSING row nobody is working on.
            row = result.fetchone()
            db.commit()

            if row:
                queue_id = row[0]
                log.info(f"Claimed task {queue_id}")
                return queue_id
            return None
        except Exception as e:
            db.rollback()
            log.error(f"Failed to claim task: {e}")
            return None
        finally:
            db.close()

    @staticmethod
    def _find_leaf_categories(tree, category_id):
        """Return ``(path_model, name, id)`` for every leaf under *category_id*.

        Leaves are returned in depth-first tree order. ``path_model`` is the
        category's URL slug with the ``-x-c{id}`` suffix that the Trendyol
        Search API requires, or None when the category has no URL. A
        *category_id* with no children is itself treated as a leaf. An unknown
        id yields an empty list.
        """
        by_id = {c["id"]: c for c in tree}
        # Index children once instead of rescanning the whole tree per node.
        children_by_parent = {}
        for c in tree:
            children_by_parent.setdefault(c.get("parentId"), []).append(c)

        def walk(node_id):
            children = children_by_parent.get(node_id)
            if not children:
                cat = by_id.get(node_id)
                if not cat:
                    return []
                url_slug = (cat.get("url") or "").lstrip("/")
                # Trendyol Search API now requires -x-c{id} suffix on pathModel
                path_model = f"{url_slug}-x-c{cat['id']}" if url_slug else None
                return [(path_model, cat["name"], cat["id"])]
            leaves = []
            for child in children:
                leaves.extend(walk(child["id"]))
            return leaves

        return walk(category_id)

    @staticmethod
    def _scrape_progress(idx: int, total: int) -> int:
        """Progress percentage shown before scraping category *idx* of *total*.

        Maps 1..total onto [10, 88) so the value never exceeds the 88% step
        ("Rapor dosyasi olusturuluyor...") that follows the scrape loop.
        """
        if total <= 0:
            return 10
        return 10 + ((idx - 1) * 78) // total

    @staticmethod
    def _merge_social_proofs(products, top_products) -> int:
        """Copy ``socialProofs`` from *top_products* onto matching *products*.

        Top-ranking items are keyed by ``id`` (falling back to ``contentId``).
        Items whose id is missing or not an integer are skipped instead of
        aborting the whole merge. *products* is modified in place.

        Returns:
            The number of products that received socialProofs.
        """
        social_map = {}
        for tp in top_products or []:
            tid = tp.get("id") or tp.get("contentId")
            sp = tp.get("socialProofs", [])
            if not (tid and sp):
                continue
            try:
                social_map[int(tid)] = sp
            except (TypeError, ValueError):
                continue
        if not social_map:
            return 0

        matched = 0
        for p in products:
            pid = p.get("id")
            if not pid:
                continue
            try:
                key = int(pid)
            except (TypeError, ValueError):
                continue
            if key in social_map:
                p["socialProofs"] = social_map[key]
                matched += 1
        return matched

    @classmethod
    def _safe_filename_part(cls, name: Optional[str], lowercase: bool = False) -> str:
        """Make *name* safe to use as one file-name component.

        Spaces become "_" and path separators or filesystem-illegal characters
        become "_", so user-supplied names cannot escape the target directory.
        With *lowercase*, the name is also lower-cased and Turkish letters are
        transliterated to ASCII. None is treated as "".
        """
        value = name or ""
        if lowercase:
            value = value.lower().translate(cls._TR_TRANSLIT)
        return value.replace(" ", "_").translate(cls._UNSAFE_FILENAME_CHARS)

    def _process_task(self, queue_id: int):
        """Process a single queue task: scrape, save report, enrich.

        Per-category scrape errors are recorded in the report details. Any
        other unexpected exception marks the task FAILED.
        """
        from database import SessionLocal, Report, ReportQueue
        import json as json_module

        db = SessionLocal()
        try:
            task = db.query(ReportQueue).filter(ReportQueue.id == queue_id).first()
            if not task:
                log.warning(f"Task {queue_id} not found")
                return

            report_name = task.report_name
            category_id = task.category_id
            log.info(f"Processing task {queue_id}: name={report_name}, category={category_id}")

            self._update_progress(db, queue_id, 5, "Kategori agaci yukleniyor...")

            # Load category tree
            from main import _load_category_tree
            tree = _load_category_tree()
            tree_by_id = {c["id"]: c for c in tree}

            main_cat = tree_by_id.get(category_id)
            if not main_cat:
                self._mark_failed(db, queue_id, f"Category {category_id} not found")
                return

            categories_to_scrape = self._find_leaf_categories(tree, category_id)
            if not categories_to_scrape:
                self._mark_failed(db, queue_id, "No scrapable categories found")
                return

            main_cat_name = main_cat["name"]
            total_cats = len(categories_to_scrape)
            self._update_progress(
                db, queue_id, 8,
                f"{main_cat_name}: {total_cats} alt kategori bulundu"
            )

            # Import scrapers and rate limiter
            from scraper import TrendyolSearchScraper, TrendyolScraper
            from main import _trendyol_limiter, CATEGORIES_DIR, REPORTS_DIR

            results = {
                "successful": 0,
                "failed": 0,
                "total_products": 0,
                "details": []
            }

            # Scrape each leaf category
            for idx, (path_model, cat_name, cat_id) in enumerate(categories_to_scrape, 1):
                self._update_progress(
                    db, queue_id, self._scrape_progress(idx, total_cats),
                    f"[{idx}/{total_cats}] {cat_name} cekiliyor..."
                )

                try:
                    products = []
                    if path_model:
                        _trendyol_limiter.wait()
                        scraper = TrendyolSearchScraper(path_model)
                        products = scraper.fetch_all_products()

                        # Search API returned 0 — fallback to Top Rankings API
                        used_fallback = False
                        if not products and cat_id:
                            log.warning(f"Search API 0 ürün döndürdü ({path_model}), Top Rankings API ile fallback deneniyor...")
                            _trendyol_limiter.wait()
                            fallback_scraper = TrendyolScraper(cat_id, page_size=20)
                            products = fallback_scraper.fetch_all_products(delay=0.5, max_pages=5)
                            used_fallback = True
                            if products:
                                log.info(f"Fallback başarılı: {len(products)} ürün bulundu (cat_id={cat_id})")

                        # Enrich with socialProofs from Top Rankings API. Skipped
                        # after a fallback: those products already came from the
                        # same endpoint with identical parameters, so a second
                        # fetch cannot add anything.
                        if (products and cat_id and not used_fallback
                                and not any(p.get("socialProofs") for p in products)):
                            try:
                                _trendyol_limiter.wait()
                                top_scraper = TrendyolScraper(cat_id, page_size=20)
                                top_products = top_scraper.fetch_all_products(delay=0.5, max_pages=5)
                                enriched = self._merge_social_proofs(products, top_products)
                                if enriched:
                                    log.info(f"Enriched {enriched} products with socialProofs")
                            except Exception as e:
                                log.warning(f"Top Rankings enrichment failed: {e}")

                    elif cat_id:
                        _trendyol_limiter.wait()
                        scraper = TrendyolScraper(cat_id)
                        products = scraper.fetch_all_products()

                    if products:
                        os.makedirs(CATEGORIES_DIR, exist_ok=True)
                        file_id = cat_id if cat_id else path_model.replace("/", "_")
                        # Category names may contain "/" etc.; keep them one path component.
                        safe_cat_name = self._safe_filename_part(cat_name)
                        filename = f"{CATEGORIES_DIR}/{safe_cat_name}_{file_id}.json"

                        data = {
                            "category_id": cat_id,
                            "path_model": path_model,
                            "category_name": cat_name,
                            "total_products": len(products),
                            "scraped_at": datetime.now().isoformat(),
                            "products": products
                        }
                        with open(filename, 'w', encoding='utf-8') as f:
                            json_module.dump(data, f, ensure_ascii=False, indent=2)

                        results["successful"] += 1
                        results["total_products"] += len(products)
                        results["details"].append({
                            "category_id": cat_id,
                            "path_model": path_model,
                            "category_name": cat_name,
                            "success": True,
                            "total_products": len(products),
                            "file_path": filename
                        })
                    else:
                        results["failed"] += 1
                        results["details"].append({
                            "category_id": cat_id,
                            "path_model": path_model,
                            "category_name": cat_name,
                            "success": False,
                            "total_products": 0,
                            "file_path": None
                        })

                except Exception as e:
                    results["failed"] += 1
                    results["details"].append({
                        "category_id": cat_id,
                        "path_model": path_model,
                        "category_name": cat_name,
                        "success": False,
                        "total_products": 0,
                        "file_path": None
                    })
                    log.warning(f"Scrape failed for {cat_name}: {e}")

                # Rate limit between categories
                time.sleep(1.5)

            # Generate combined report JSON
            self._update_progress(db, queue_id, 88, "Rapor dosyasi olusturuluyor...")

            timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
            # report_name is user-supplied: sanitize so it cannot escape REPORTS_DIR.
            safe_name = self._safe_filename_part(report_name, lowercase=True)

            os.makedirs(REPORTS_DIR, exist_ok=True)
            json_filename = f"{REPORTS_DIR}/{safe_name}_{timestamp}.json"

            combined_data = {
                "report_name": report_name,
                "category": main_cat_name,
                "created_at": datetime.now().isoformat(),
                "total_subcategories": total_cats,
                "total_products": results["total_products"],
                "details": results["details"]
            }
            with open(json_filename, 'w', encoding='utf-8') as f:
                json_module.dump(combined_data, f, ensure_ascii=False, indent=2)

            # Save to DB
            self._update_progress(db, queue_id, 93, "Veritabanina kaydediliyor...")

            new_report = Report(
                name=report_name,
                category_id=category_id,
                total_products=results["total_products"],
                total_subcategories=total_cats,
                json_file_path=json_filename,
                html_file_path=None,
                created_at=datetime.now()
            )
            try:
                db.add(new_report)
                db.commit()
                db.refresh(new_report)
            except Exception as db_err:
                db.rollback()
                # Critical: without a Report row the task cannot complete.
                log.error(f"DB save failed for task {queue_id}: {db_err}")
                self._mark_failed(db, queue_id, f"DB save failed: {db_err}")
                return

            report_id = new_report.id

            # Run enrichment INLINE (not daemon thread)
            self._update_progress(db, queue_id, 95, "Sosyal kanit verileri toplanıyor...")
            try:
                from main import _enrich_report_task
                _enrich_report_task(report_id)
                log.info(f"Enrichment completed for report {report_id}")
            except Exception as e:
                log.warning(f"Enrichment failed (non-critical): {e}")

            # Mark COMPLETED
            self._mark_completed(db, queue_id, report_id)
            log.info(f"Task {queue_id} completed: report_id={report_id}, products={results['total_products']}")

        except Exception as e:
            log.error(f"Task {queue_id} failed: {e}", exc_info=True)
            # The failure may have left the session's transaction aborted; roll
            # back first so the FAILED update below can actually run. Otherwise
            # the task would stay PROCESSING until a restart recovers it.
            try:
                db.rollback()
            except Exception as rb_err:
                log.error(f"Rollback failed for task {queue_id}: {rb_err}")
            try:
                self._mark_failed(db, queue_id, str(e)[:500])
            except Exception as mark_err:
                log.error(f"Could not mark task {queue_id} failed: {mark_err}")
        finally:
            db.close()

    def _update_progress(self, db, queue_id: int, progress: int, message: str):
        """Best-effort update of a task's progress/message; errors are logged at debug level."""
        try:
            db.execute(text("""
                UPDATE report_queue
                SET progress = :progress, message = :message, updated_at = NOW()
                WHERE id = :queue_id
            """), {"progress": progress, "message": message, "queue_id": queue_id})
            db.commit()
        except Exception as e:
            db.rollback()
            log.debug(f"Progress update failed for {queue_id}: {e}")

    def _mark_completed(self, db, queue_id: int, report_id: int):
        """Mark a task COMPLETED at 100% and link it to *report_id*."""
        try:
            db.execute(text("""
                UPDATE report_queue
                SET status = 'COMPLETED',
                    result_report_id = :report_id,
                    progress = 100,
                    message = 'Rapor tamamlandi',
                    completed_at = NOW(),
                    updated_at = NOW()
                WHERE id = :queue_id
            """), {"report_id": report_id, "queue_id": queue_id})
            db.commit()
        except Exception as e:
            db.rollback()
            log.error(f"Failed to mark completed for {queue_id}: {e}")

    def _mark_failed(self, db, queue_id: int, error: str):
        """Mark a task FAILED, storing at most 1000 characters of *error*."""
        try:
            db.execute(text("""
                UPDATE report_queue
                SET status = 'FAILED',
                    error = :error,
                    completed_at = NOW(),
                    updated_at = NOW()
                WHERE id = :queue_id
            """), {"error": error[:1000], "queue_id": queue_id})
            db.commit()
        except Exception as e:
            db.rollback()
            log.error(f"Failed to mark failed for {queue_id}: {e}")

    def _recover_stuck_tasks(self):
        """Reset tasks that were PROCESSING for more than 15 minutes back to PENDING."""
        from database import SessionLocal

        db = SessionLocal()
        try:
            result = db.execute(text("""
                UPDATE report_queue
                SET status = 'PENDING', worker_id = NULL, started_at = NULL, updated_at = NOW()
                WHERE status = 'PROCESSING'
                  AND updated_at < NOW() - INTERVAL '15 minutes'
                RETURNING id
            """))
            # Consume RETURNING rows before commit() releases the connection.
            recovered = result.fetchall()
            db.commit()

            if recovered:
                ids = [r[0] for r in recovered]
                log.warning(f"Recovered {len(ids)} stuck tasks: {ids}")
            else:
                log.info("No stuck tasks found on startup")
        except Exception as e:
            db.rollback()
            log.error(f"Stuck task recovery failed: {e}")
        finally:
            db.close()
|