DB - pipeline_runs

abstract

Tracks each execution of the multi-agent LLM pipeline — one row per analysis run, linking the trigger (account + symbol + timeframe) to its outcome (signal, trade, or hold).

Table Info

PropertyValue
Table Namepipeline_runs
SQLAlchemy Modelbackend/db/models.py :: PipelineRun
Pydantic Schemabackend/api/routes/pipeline.py
Migration Filealembic/versions/
TimescaleDB HypertableNo
Partition Column

Columns

ColumnSQLAlchemy TypeNullableDefaultDescription
idIntegerNoautoPrimary key
account_idInteger FKNoFK → accounts.id (indexed)
symbolString(20)NoSymbol analyzed (indexed)
timeframeString(10)NoTimeframe of analysis (e.g., M15)
task_typeString(20)No"signal"signal | maintenance
statusString(20)No"running"running | completed | hold | skipped | failed
final_actionString(15)YesnullFinal decision: BUY, SELL, HOLD, etc.
total_duration_msIntegerYesnullTotal pipeline wall-clock time in ms
journal_idInteger FKYesnullFK → ai_journal.id (the resulting signal)
trade_idInteger FKYesnullFK → trades.id (the resulting trade, if any)
strategy_idInteger FKYesnullFK → strategies.id (SET NULL on delete)
created_atDateTime(timezone=True)Nodatetime.now(UTC)Run start timestamp (indexed)

Constraints & Indexes

NameTypeColumnsPurpose
pk_pipeline_runsPRIMARY KEYidRow uniqueness
idx_pipeline_accountINDEXaccount_idFilter by account
idx_pipeline_symbolINDEXsymbolFilter by symbol
idx_pipeline_createdINDEXcreated_atTime-range queries

Entity Relationships

SQLAlchemy Model (reference snapshot)

class PipelineRun(Base):
    __tablename__ = "pipeline_runs"
 
    id: Mapped[int] = mapped_column(Integer, primary_key=True)
    account_id: Mapped[int] = mapped_column(Integer, ForeignKey("accounts.id"), index=True)
    symbol: Mapped[str] = mapped_column(String(20), index=True)
    timeframe: Mapped[str] = mapped_column(String(10))
    task_type: Mapped[str] = mapped_column(String(20), default="signal")
    status: Mapped[str] = mapped_column(String(20), default="running")
    final_action: Mapped[str | None] = mapped_column(String(15), nullable=True)
    total_duration_ms: Mapped[int | None] = mapped_column(Integer, nullable=True)
    journal_id: Mapped[int | None] = mapped_column(Integer, ForeignKey("ai_journal.id"), nullable=True)
    trade_id: Mapped[int | None] = mapped_column(Integer, ForeignKey("trades.id"), nullable=True)
    strategy_id: Mapped[int | None] = mapped_column(
        Integer, ForeignKey("strategies.id", ondelete="SET NULL"), nullable=True
    )
    created_at: Mapped[datetime] = mapped_column(DateTime(timezone=True), default=lambda: datetime.now(UTC), index=True)
 
    steps: Mapped[list["PipelineStep"]] = relationship(
        "PipelineStep", back_populates="run", cascade="all, delete-orphan",
        order_by="PipelineStep.seq",
    )

Service Layer

LayerFilePurpose
Routerbackend/api/routes/pipeline.pyList runs, stream status
AI Pipelinebackend/ai/Creates and updates runs during execution
RoleLink
API EndpointAPI-GET-v1-Pipeline
Frontend PagePage - Agent Workflow
Related TableDB - pipeline_steps
Related TableDB - ai_journal
Related TableDB - llm_calls