mirror of
https://github.com/nethunterzist/trendyol-analiz
synced 2026-07-01 01:17:04 +00:00
Ne yaptik:
- ReportQueue modeli + Alembic migration (report_queue tablosu)
- QueueWorker: SELECT FOR UPDATE SKIP LOCKED ile tek worker polling
- 3 yeni endpoint: POST /api/queue/submit, GET /api/queue/{id}/status, GET /api/queue/active
- Startup hook ile worker otomatik basliyor, shutdown'da duruyor
- Stuck task recovery: 15 dk'dan eski PROCESSING tasklar PENDING'e donuyor
Neden yaptik:
- Esanli SSE rapor isteklerinde IP ban riski ve veri kaybi vardi
- Tek worker ile sirayla isleniyor, rate limit korunuyor
- Sifir yeni dependency: sadece PostgreSQL + mevcut scraper'lar
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
122 lines
4.4 KiB
Python
122 lines
4.4 KiB
Python
"""
|
||
Database setup and models - PostgreSQL
|
||
"""
|
||
from sqlalchemy import create_engine, Column, Integer, String, DateTime, Boolean, ForeignKey, Text
|
||
from sqlalchemy.ext.declarative import declarative_base
|
||
from sqlalchemy.orm import sessionmaker, relationship
|
||
from datetime import datetime
|
||
import os
|
||
from logging_config import get_logger
|
||
|
||
log = get_logger("db")
|
||
|
||
# PostgreSQL database - configurable via environment variable
|
||
# Default: Local PostgreSQL for development
|
||
DATABASE_URL = os.getenv("DATABASE_URL", "postgresql://postgres@localhost:5433/trendyol_db")
|
||
|
||
engine = create_engine(DATABASE_URL, pool_pre_ping=True)
|
||
SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)
|
||
Base = declarative_base()
|
||
|
||
|
||
class Category(Base):
|
||
"""Category model - hierarchical structure"""
|
||
__tablename__ = "categories"
|
||
|
||
id = Column(Integer, primary_key=True, index=True)
|
||
name = Column(String, nullable=False)
|
||
parent_id = Column(Integer, ForeignKey('categories.id'), nullable=True)
|
||
trendyol_category_id = Column(Integer, nullable=True)
|
||
trendyol_url = Column(String, nullable=True)
|
||
path_model = Column(String, nullable=True) # URL slug for search API (e.g. "elbise-x-c56")
|
||
is_active = Column(Boolean, default=True)
|
||
created_at = Column(DateTime, default=datetime.utcnow)
|
||
|
||
# Relationships
|
||
children = relationship("Category", backref="parent", remote_side=[id])
|
||
snapshots = relationship("Snapshot", back_populates="category")
|
||
|
||
|
||
class Snapshot(Base):
|
||
"""Snapshot model - monthly data captures"""
|
||
__tablename__ = "snapshots"
|
||
|
||
id = Column(Integer, primary_key=True, index=True)
|
||
category_id = Column(Integer, ForeignKey('categories.id'), nullable=False, index=True)
|
||
snapshot_month = Column(String, nullable=False, index=True) # "2024-11", "2024-12"
|
||
total_products = Column(Integer, default=0)
|
||
avg_price = Column(Integer, default=0)
|
||
json_file_path = Column(String, nullable=True)
|
||
scraped_at = Column(DateTime, default=datetime.utcnow)
|
||
|
||
# Relationships
|
||
category = relationship("Category", back_populates="snapshots")
|
||
|
||
|
||
class Report(Base):
|
||
"""Report model - saved dashboard reports"""
|
||
__tablename__ = "reports"
|
||
|
||
id = Column(Integer, primary_key=True, index=True)
|
||
name = Column(String, nullable=False) # "Kasım Ayı Kozmetik Raporu"
|
||
category_id = Column(Integer, nullable=True, index=True) # Trendyol API category ID (no FK)
|
||
total_products = Column(Integer, default=0)
|
||
total_subcategories = Column(Integer, default=0)
|
||
json_file_path = Column(String, nullable=True)
|
||
html_file_path = Column(String, nullable=True)
|
||
created_at = Column(DateTime, default=datetime.utcnow)
|
||
|
||
|
||
class EnrichmentError(Base):
|
||
"""Persistent log for external enrichment errors"""
|
||
__tablename__ = "enrichment_errors"
|
||
|
||
id = Column(Integer, primary_key=True, index=True)
|
||
report_id = Column(Integer, nullable=True)
|
||
product_id = Column(Integer, nullable=True)
|
||
merchant_id = Column(Integer, nullable=True)
|
||
endpoint = Column(String, nullable=False) # reviews | social | questions | similar | followers
|
||
error_type = Column(String, nullable=True) # timeout | dns | reset | http | other
|
||
message = Column(String, nullable=True)
|
||
status_code = Column(Integer, nullable=True)
|
||
attempt = Column(Integer, default=1)
|
||
created_at = Column(DateTime, default=datetime.utcnow)
|
||
|
||
|
||
class ReportQueue(Base):
|
||
"""Queue for report generation tasks"""
|
||
__tablename__ = "report_queue"
|
||
|
||
id = Column(Integer, primary_key=True, index=True)
|
||
report_name = Column(String, nullable=False)
|
||
category_id = Column(Integer, nullable=False)
|
||
status = Column(String, nullable=False, default="PENDING")
|
||
progress = Column(Integer, nullable=False, default=0)
|
||
message = Column(Text, nullable=True)
|
||
result_report_id = Column(Integer, nullable=True)
|
||
error = Column(Text, nullable=True)
|
||
worker_id = Column(String, nullable=True)
|
||
created_at = Column(DateTime, default=datetime.utcnow)
|
||
started_at = Column(DateTime, nullable=True)
|
||
completed_at = Column(DateTime, nullable=True)
|
||
updated_at = Column(DateTime, default=datetime.utcnow, onupdate=datetime.utcnow)
|
||
|
||
|
||
def init_db():
|
||
"""Initialize database - create tables"""
|
||
Base.metadata.create_all(bind=engine)
|
||
log.info("Database initialized successfully")
|
||
|
||
|
||
def get_db():
|
||
"""Get database session"""
|
||
db = SessionLocal()
|
||
try:
|
||
yield db
|
||
finally:
|
||
db.close()
|
||
|
||
|
||
if __name__ == "__main__":
|
||
init_db()
|