Storage Architecture
All file I/O in be-podcast-etl goes through a storage abstraction layer. No stage or script accesses the filesystem directly.
Storage ABC
The Storage base class defines the interface:
from abc import ABC
from typing import Iterator

class Storage(ABC):
    def read_json(self, key: str) -> dict | None: ...
    def write_json(self, key: str, data: dict) -> None: ...
    def read_jsonl(self, key: str) -> list[dict] | None: ...
    def write_text(self, key: str, content: str) -> None: ...
    def read_bytes(self, key: str) -> bytes | None: ...
    def write_bytes(self, key: str, data: bytes, content_type: str) -> None: ...
    def exists(self, key: str) -> bool: ...
    def delete(self, key: str) -> None: ...
    def list_prefix(self, prefix: str) -> list[str]: ...

    # Batch operations (with default implementations)
    def read_json_batch(self, keys: list[str]) -> dict[str, dict | None]: ...
    def exists_batch(self, keys: list[str]) -> dict[str, bool]: ...
    def count_prefix(self, prefix: str) -> int: ...
    def iter_prefix(self, prefix: str, batch_size: int = 100) -> Iterator[dict]: ...
Keys are path-like strings (e.g., beliefs/a7/b_a7f3c2e1.json). The backend resolves them to the appropriate storage location.
Backends
LocalStorage (development)
Reads and writes to the local filesystem under LOCAL_ROOT (default: ./data).
STORAGE_BACKEND=local
LOCAL_ROOT=./data
SupabaseStorage (remote)
Reads and writes to a Supabase storage bucket. Used for Docker deployments without local disk.
STORAGE_BACKEND=supabase
SUPABASE_URL=https://xxx.supabase.co
SUPABASE_ETL_KEY=eyJ...
SUPABASE_ETL_BUCKET=podcast-etl-data
DualStorage (recommended)
Writes to both local and Supabase simultaneously. Reads from local only (fast). Supabase failures are logged but don't block the pipeline.
STORAGE_BACKEND=dual
LOCAL_ROOT=./data
SUPABASE_URL=https://xxx.supabase.co
SUPABASE_ETL_KEY=eyJ...
SUPABASE_ETL_BUCKET=podcast-etl-data
This gives you local speed with automatic cloud backup.
Data Layout
data/
├── raw/podcasts/{podcast}/episodes/{episode}/
│   └── transcript.json          # Raw transcription (from be-flow-dtd)
├── beliefs/{shard}/{id}.json    # Extracted beliefs (sharded by ID prefix)
├── embeddings/{shard}/          # Embedding cache (separate from beliefs)
├── matrices/{shard}/            # Domain scores and weights
├── persons/{person-slug}/
│   ├── profile.json             # Wikipedia metadata
│   ├── sprite.png               # 8-bit avatar
│   ├── matrix.json              # Aggregated beliefs
│   ├── beliefs.jsonl            # All beliefs (JSONL)
│   ├── embedding.json           # Person embedding
│   └── similarities.json        # Similar persons
├── search/
│   └── index.json               # Person search index
├── runs/manifests/              # Pipeline state tracking
│   └── {podcast}/{episode}.json # Per-episode manifest
└── podcasts.db                  # SQLite for feed tracking
Sharding
Beliefs and embeddings are sharded by the first two hex characters of the content hash (the portion of the ID after the b_ prefix):
beliefs/a7/b_a7f3c2e1.json
beliefs/3c/b_3c8d1a2f.json
This prevents any single directory from growing too large.
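The key construction can be sketched as a small helper. The `b_` prefix convention is taken from the examples above; the real code may build keys differently:

```python
def shard_key(belief_id: str) -> str:
    """Build a storage key from a content-hash ID like 'b_a7f3c2e1'.

    The shard is the first two hex characters of the hash (the part
    after the 'b_' prefix), which keeps each directory small.
    """
    hash_part = belief_id.split("_", 1)[1]
    return f"beliefs/{hash_part[:2]}/{belief_id}.json"
```

With 256 possible two-hex-character shards, a million beliefs average out to roughly 4,000 files per directory instead of a million in one.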
Manifest-Based State
Pipeline progress is tracked via JSON manifests in data/runs/manifests/. Each episode has one manifest file that records which stages have completed.
{
  "podcast_slug": "lex-fridman",
  "episode_slug": "naval-ravikant-2023",
  "stages": {
    "speakers": {"status": "complete", "completed_at": "2026-02-24T14:30:00Z"},
    "ads": {"status": "complete", "completed_at": "2026-02-24T14:30:05Z"},
    "extract": {"status": "complete", "completed_at": "2026-02-24T14:31:22Z"},
    "abstract": {"status": "pending"},
    "embed": {"status": "pending"}
  }
}
The backfill scripts scan manifests to find incomplete episodes and resume from the last completed stage.
Backup & Recovery
With DualStorage
DualStorage automatically backs up every write to Supabase. If local data is lost:
python scripts/restore_from_supabase.py --prefix persons/
python scripts/restore_from_supabase.py --prefix beliefs/
Manual Backup
For local-only deployments, use the backup scripts:
# Push local data to Supabase
python scripts/backup_to_supabase.py --prefix beliefs/ persons/
# Restore from Supabase to local
python scripts/restore_from_supabase.py --prefix persons/
Circuit Breakers
External service calls (OpenAI, Supabase, Qdrant) are wrapped in circuit breakers to prevent cascading failures from API rate limits or outages:
from src.core.circuits import openai_breaker

async with openai_breaker:
    response = await client.chat.completions.create(...)
If a service fails repeatedly, the breaker opens and short-circuits further calls until the service recovers.
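A minimal sketch of this breaker pattern follows. The actual src.core.circuits implementation may differ; the failure threshold, cooldown, and exception type here are illustrative:

```python
import time


class CircuitBreaker:
    """Sketch: open after N consecutive failures, retry after a cooldown."""

    def __init__(self, max_failures: int = 5, reset_after: float = 30.0):
        self.max_failures = max_failures
        self.reset_after = reset_after
        self.failures = 0
        self.opened_at: float | None = None

    async def __aenter__(self):
        if self.opened_at is not None:
            if time.monotonic() - self.opened_at < self.reset_after:
                # Breaker is open: refuse the call without touching the service.
                raise RuntimeError("circuit open: short-circuiting call")
            # Cooldown elapsed: half-open, let one trial call through.
            self.opened_at = None
        return self

    async def __aexit__(self, exc_type, exc, tb):
        if exc_type is None:
            # Any success resets the consecutive-failure count.
            self.failures = 0
        else:
            self.failures += 1
            if self.failures >= self.max_failures:
                self.opened_at = time.monotonic()
        return False  # never swallow the underlying exception
```

While the breaker is open, callers fail fast instead of piling retries onto a service that is rate-limiting or down, which is what prevents the cascade.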