fix: move blocking file I/O to threads to prevent event loop stall

Ne yaptık:
- json.dump/json.load ve db.commit çağrılarını asyncio.to_thread() ile sardık
- 5 ayrı blocking I/O noktası: kategori scraping yazma, kategori okuma (sosyal kanıt için), rapor JSON yazma, DB kaydetme, sosyal kanıt JSON yazma
- _write_json, _read_json, _db_save helper fonksiyonları eklendi

Neden yaptık:
- Büyük raporlarda (15K+ ürün) sosyal kanıt tamamlandıktan sonra rapor kaydetme aşamasında senkron I/O event loop'u blokluyordu
- Backend health check fail oluyordu, container "unhealthy" durumuna düşüyordu
- "Tamamlandı" modalı frontend'e ulaşamıyordu çünkü SSE stream donuyordu

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
furkanyigit34
2026-03-27 18:01:40 +03:00
parent dc1f8fcfb2
commit 1baaa6fbce

View File

@@ -1469,6 +1469,20 @@ async def create_report(
# Generate unique task ID
task_id = str(uuid.uuid4())
# Helper functions for non-blocking I/O
def _write_json(path, data):
with open(path, 'w', encoding='utf-8') as f:
json.dump(data, f, ensure_ascii=False, indent=2)
def _read_json(path):
with open(path, 'r', encoding='utf-8') as f:
return json.load(f)
def _db_save(db, report):
db.add(report)
db.commit()
db.refresh(report)
# Stream progress with SSE
async def progress_stream():
"""Generator that yields real-time progress events"""
@@ -1528,8 +1542,7 @@ async def create_report(
"products": products
}
with open(filename, 'w', encoding='utf-8') as f:
json.dump(data, f, ensure_ascii=False, indent=2)
await asyncio.to_thread(_write_json, filename, data)
results["successful"] += 1
results["total_products"] += len(products)
@@ -1586,41 +1599,40 @@ async def create_report(
if detail["success"] and detail["file_path"]:
category_name = detail.get("category_name", "Bilinmeyen Kategori")
try:
with open(detail["file_path"], 'r', encoding='utf-8') as f:
cat_data = json.load(f)
products = cat_data.get("products", [])
# print(f"🔍 DEBUG: {detail['file_path']} dosyasından {len(products)} ürün bulundu")
for product in products:
product_id = product.get("id")
if product_id:
all_product_ids.append(int(product_id))
# Extract rating data
rating_score_obj = product.get("ratingScore", {})
rating = rating_score_obj.get("averageRating", 0) if isinstance(rating_score_obj, dict) else 0
rating_count = rating_score_obj.get("totalCount", 0) if isinstance(rating_score_obj, dict) else 0
cat_data = await asyncio.to_thread(_read_json, detail["file_path"])
products = cat_data.get("products", [])
# print(f"🔍 DEBUG: {detail['file_path']} dosyasından {len(products)} ürün bulundu")
for product in products:
product_id = product.get("id")
if product_id:
all_product_ids.append(int(product_id))
# Extract rating data
rating_score_obj = product.get("ratingScore", {})
rating = rating_score_obj.get("averageRating", 0) if isinstance(rating_score_obj, dict) else 0
rating_count = rating_score_obj.get("totalCount", 0) if isinstance(rating_score_obj, dict) else 0
# Extract barcode from first variant
barcode = ""
merchant_listings = product.get("merchantListings", [])
if merchant_listings and len(merchant_listings) > 0:
variants = merchant_listings[0].get("variants", [])
if variants and len(variants) > 0:
barcode = variants[0].get("barcode", "")
# Extract barcode from first variant
barcode = ""
merchant_listings = product.get("merchantListings", [])
if merchant_listings and len(merchant_listings) > 0:
variants = merchant_listings[0].get("variants", [])
if variants and len(variants) > 0:
barcode = variants[0].get("barcode", "")
# Store product info with category, brand, price, rating, and barcode
product_info_map[str(product_id)] = {
"name": product.get("name", ""),
"imageUrl": product.get("imageUrl", ""),
"url": product.get("url", ""),
"category": category_name,
"brand": product.get("brand", {}).get("name", "Bilinmeyen Marka"),
"price": product.get("price", {}).get("sellingPrice", 0),
"rating": round(rating, 2) if rating else 0,
"rating_count": rating_count,
"barcode": barcode,
"barcode_country": get_country_from_barcode(barcode), # Extract country from barcode prefix
"origin_country": "Bilinmeyen" # Not available in product data
}
# Store product info with category, brand, price, rating, and barcode
product_info_map[str(product_id)] = {
"name": product.get("name", ""),
"imageUrl": product.get("imageUrl", ""),
"url": product.get("url", ""),
"category": category_name,
"brand": product.get("brand", {}).get("name", "Bilinmeyen Marka"),
"price": product.get("price", {}).get("sellingPrice", 0),
"rating": round(rating, 2) if rating else 0,
"rating_count": rating_count,
"barcode": barcode,
"barcode_country": get_country_from_barcode(barcode), # Extract country from barcode prefix
"origin_country": "Bilinmeyen" # Not available in product data
}
except Exception as e:
pass
# print(f"⚠️ DEBUG: Dosya okuma hatası {detail['file_path']}: {str(e)}")
@@ -1710,8 +1722,7 @@ async def create_report(
"details": results["details"]
}
with open(json_filename, 'w', encoding='utf-8') as f:
json.dump(combined_data, f, ensure_ascii=False, indent=2)
await asyncio.to_thread(_write_json, json_filename, combined_data)
# Save to database
yield f"data: {json_module.dumps({'type': 'info', 'message': '💾 Veritabanına kaydediliyor...', 'progress': 95})}\n\n"
@@ -1727,9 +1738,7 @@ async def create_report(
created_at=datetime.now()
)
db.add(new_report)
db.commit()
db.refresh(new_report)
await asyncio.to_thread(_db_save, db, new_report)
# Save social proof data to persistent cache
# print(f"\n🔍 DEBUG: Sosyal kanıt kaydetme bölümü - social_proof_data uzunluğu: {len(social_proof_data)}")
@@ -1752,8 +1761,7 @@ async def create_report(
# print(f"✅ DEBUG: Sosyal kanıt dosyası kaydediliyor: {social_file}")
# print(f"🔍 DEBUG: Toplam metrikler: {social_output['total']}")
with open(social_file, 'w', encoding='utf-8') as f:
json.dump(social_output, f, ensure_ascii=False, indent=2)
await asyncio.to_thread(_write_json, social_file, social_output)
# print(f"✅ DEBUG: Sosyal kanıt dosyası başarıyla kaydedildi")
else:
pass