Ingestion Pipeline
DocumentProcessor — 9-Step Pipeline
engine/ingestion/processor.py runs every incoming document through a fixed nine-step pipeline.
Any step failure is caught, logged, and recorded in ingestion_jobs.errors (JSONB array). Partial failures result in job status = 'partial'.
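The failure-capture behavior above can be sketched as follows. This is a minimal illustration, not the actual processor: the step names, the runner function, and the returned dict shape are assumptions; only the errors-as-JSON-array and the 'partial' status come from the description above.

```python
# Sketch: run each pipeline step, recording failures instead of aborting.
# run_pipeline and the dict shapes are illustrative, not the real processor.
from typing import Callable

def run_pipeline(document: dict, steps: list[tuple[str, Callable]]) -> dict:
    errors = []  # mirrors the ingestion_jobs.errors JSONB array
    for name, step in steps:
        try:
            document = step(document)
        except Exception as exc:
            # A failed step is logged and recorded; later steps still run.
            errors.append({"step": name, "error": str(exc)})
    status = "completed" if not errors else "partial"
    return {"status": status, "errors": errors, "document": document}
```

A job finishes 'partial' whenever at least one step recorded an error but the run itself completed.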
Text Cleaning Strategies
| Strategy | Operations |
|---|---|
| minimal | Strip leading/trailing whitespace |
| standard | Minimal + remove excessive blank lines, normalize Unicode whitespace |
| aggressive | Standard + remove HTML tags, collapse multiple spaces, remove control characters |
The strategy is selected per resource via the config.auto_clean flag, with INGESTION_CLEANING_STRATEGY as the environment-level default.
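A hypothetical sketch of the three strategies from the table above; the real implementation may use different regexes and Unicode handling, and clean_text is an illustrative name.

```python
# Illustrative cleaning strategies: minimal -> standard -> aggressive,
# each layering on the previous one as described in the table.
import re
import unicodedata

def clean_text(text: str, strategy: str = "standard") -> str:
    cleaned = text.strip()  # minimal: strip leading/trailing whitespace
    if strategy in ("standard", "aggressive"):
        # Normalize Unicode whitespace to plain spaces (newlines preserved),
        # then collapse runs of 3+ newlines into a single blank line.
        cleaned = "".join(" " if ch != "\n" and ch.isspace() else ch
                          for ch in cleaned)
        cleaned = re.sub(r"\n{3,}", "\n\n", cleaned)
    if strategy == "aggressive":
        cleaned = re.sub(r"<[^>]+>", "", cleaned)    # strip HTML tags
        cleaned = re.sub(r"[ \t]{2,}", " ", cleaned)  # collapse spaces
        cleaned = "".join(ch for ch in cleaned
                          if unicodedata.category(ch) != "Cc" or ch == "\n")
    return cleaned
```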
TextChunker (engine/ingestion/chunker.py)
```python
@dataclass
class TextChunk:
    content: str
    index: int         # sequential chunk index
    start_char: int    # character offset in source
    end_char: int
    metadata: dict     # inherited from document + chunk-level

class TextChunker:
    def __init__(self, chunk_size=512, chunk_overlap=50, separator="\n\n"):
        ...

    def chunk(self, text: str, metadata: dict | None = None) -> list[TextChunk]:
        # Split by paragraph separator ("\n\n")
        # If a paragraph > chunk_size, split by sentence
        # Maintain an overlap of chunk_overlap chars between consecutive chunks
        # Validate: no chunk > 10MB, no empty chunks
        ...
```

Algorithm:
- Split on separator ("\n\n" = paragraph breaks)
- If any paragraph exceeds chunk_size, split further by sentence boundary
- Slide a window of chunk_size chars with chunk_overlap stride
- Track start_char/end_char for source attribution
Constraints:
- chunk_size default: 512 characters
- chunk_overlap default: 50 characters
- Maximum content length enforced before chunking: INGESTION_MAX_CONTENT_LENGTH = 1,000,000 chars
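The sliding-window part of the algorithm can be sketched as below. This is a simplified illustration: it skips the paragraph and sentence splitting the real TextChunker performs first, and chunk_text is an illustrative function name.

```python
# Simplified sliding-window chunker: windows of chunk_size characters with
# chunk_overlap characters shared between consecutive chunks.
from dataclasses import dataclass, field

@dataclass
class TextChunk:
    content: str
    index: int
    start_char: int
    end_char: int
    metadata: dict = field(default_factory=dict)

def chunk_text(text: str, chunk_size: int = 512,
               chunk_overlap: int = 50) -> list[TextChunk]:
    stride = chunk_size - chunk_overlap
    if stride <= 0:
        raise ValueError("chunk_overlap must be smaller than chunk_size")
    chunks: list[TextChunk] = []
    pos = 0
    while pos < len(text):
        piece = text[pos:pos + chunk_size]
        if piece.strip():  # validate: no empty chunks
            chunks.append(TextChunk(piece, len(chunks), pos, pos + len(piece)))
        if pos + chunk_size >= len(text):
            break
        pos += stride
    return chunks
```

Tracking start_char/end_char on each chunk is what enables source attribution later in retrieval.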
EmbeddingModel (engine/embedding/models.py)
```python
class EmbeddingModel:
    model_name: str = "sentence-transformers/all-MiniLM-L6-v2"
    dimensions: int = 384
    _cache: dict[str, np.ndarray]  # in-memory, bounded at 10,000 entries

    def __init__(self):
        self.device = "cuda" if torch.cuda.is_available() else "cpu"
        self.model = SentenceTransformer(self.model_name, device=self.device)
        if self.device == "cuda":
            self.model.half()  # FP16 on GPU for throughput
        self._cache = {}

    def encode(
        self,
        texts: list[str],
        batch_size: int = 32,
        show_progress: bool = False,
    ) -> np.ndarray:
        # Check cache first
        # Encode uncached texts in batches of batch_size
        # L2-normalize if config.normalize = True
        # Store results in _cache
        # Return numpy array of shape (len(texts), 384)
        ...
```

Cache behavior: The in-memory dict cache maps text → embedding vector. Cache size is bounded by EMBEDDING_CACHE_SIZE = 10,000. No eviction policy is specified — the cache grows until the bound is reached, at which point caching stops (not LRU; effectively a "fill then bypass" strategy).
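The "fill then bypass" behavior described above can be isolated into a small sketch. The class and method names here are illustrative, not the actual implementation; only the bound-then-stop-caching semantics come from the description.

```python
# Sketch of a bounded "fill then bypass" cache: entries are added until the
# bound is reached, after which new values are computed but never stored.
EMBEDDING_CACHE_SIZE = 10_000

class FillThenBypassCache:
    def __init__(self, max_size: int = EMBEDDING_CACHE_SIZE):
        self._cache: dict[str, list[float]] = {}
        self._max_size = max_size

    def get_or_compute(self, text: str, compute) -> list[float]:
        if text in self._cache:
            return self._cache[text]
        vector = compute(text)
        if len(self._cache) < self._max_size:
            self._cache[text] = vector  # still filling
        # else: bound reached, result is returned but not cached
        return vector
```

Note the consequence: once the bound is hit, frequently repeated texts are re-encoded on every call, unlike an LRU which would keep hot entries.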
Batch Embedding Pipeline (engine/embedding/batch.py)
```python
async def batch_generate_embeddings(
    chunks: list[TextChunk],
    model: EmbeddingModel,
    batch_size: int = 32
) -> list[np.ndarray]:
    # Process in batches to avoid OOM on large documents
    # Returns list of 384-dim vectors, one per chunk
    ...
```

EmbeddingStorage (engine/embedding/storage.py)
Persists vectors to PostgreSQL:
```sql
INSERT INTO document_chunks (document_id, content, chunk_index, embedding, metadata)
VALUES ($1, $2, $3, $4::vector, $5)
```

The $4::vector cast stores the numpy array as a pgvector vector(384) column, enabling HNSW index searches.
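For the cast to work, the embedding must arrive as a pgvector text literal of the form [x1,x2,...]. A minimal sketch of that serialization step, assuming the driver passes the value as a string for the $4::vector parameter (to_pgvector_literal is an illustrative helper name):

```python
# Render a float sequence (e.g. a numpy array via .tolist()) as a pgvector
# text literal such as "[0.25,-1.0,2.0]", suitable for a ::vector cast.
from typing import Sequence

def to_pgvector_literal(embedding: Sequence[float]) -> str:
    return "[" + ",".join(repr(float(x)) for x in embedding) + "]"
```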
Web Scraping Subsystem (engine/ingestion/scrapers/)
For ResourceType.WEBSITE, the ingestion pipeline uses a multi-stage scraping architecture that handles everything from static documents to complex single-page applications:
- BrowserScraper (Playwright): A headless Chromium-based scraper that handles JavaScript-heavy pages and SPAs using improved navigation logic and modern content extraction APIs.
- HTTPX Scraper: A lightweight, high-performance fetcher for static HTML content.
- WebCrawler (engine/ingestion/crawler.py): A robust discovery engine with optimized link finding and depth management, designed for comprehensive site indexing while respecting rate limits.
- Parsing: Uses BeautifulSoup4 with lxml to extract clean text before converting it to Markdown for the chunking pipeline.
Configuration is centralized in ScrapingConfig:
- timeout: 30s
- max_retries: 3
- user_agent: "OpenTier Intelligence Bot/1.0"
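An illustrative dataclass mirroring the values listed above; the field names beyond these three, and the frozen/default choices, are assumptions rather than the actual ScrapingConfig definition.

```python
# Sketch of a centralized scraping configuration with the documented defaults.
from dataclasses import dataclass

@dataclass(frozen=True)
class ScrapingConfig:
    timeout: float = 30.0  # seconds per request
    max_retries: int = 3
    user_agent: str = "OpenTier Intelligence Bot/1.0"
```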
Testing & Validation
End-to-End Ingestion Test (server/intelligence/script/full_ingestion.py)
A comprehensive test script that validates the entire pipeline from URL scraping to vector storage. It can be run with custom user IDs and persistence options to verify dashboard visibility.
Usage:
```shell
uv run script/full_ingestion.py "https://example.com" --user "YOUR_USER_ID" --persist
```

- --user: Associate the ingestion with a specific user ID.
- --persist: Prevents the script from deleting the resource after the test, allowing it to be viewed in the dashboard.
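A hypothetical sketch of that CLI surface using argparse; the actual script may parse its arguments differently, and build_parser is an illustrative name.

```python
# Illustrative argument parser for the end-to-end ingestion test script.
import argparse

def build_parser() -> argparse.ArgumentParser:
    parser = argparse.ArgumentParser(description="End-to-end ingestion test")
    parser.add_argument("url", help="URL to scrape and ingest")
    parser.add_argument("--user", help="user ID to associate with the ingestion")
    parser.add_argument("--persist", action="store_true",
                        help="keep the resource after the test for dashboard review")
    return parser
```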
GitHub Integration (engine/ingestion/scrapers/)
For ResourceType.CODE with a GitHub URL, PyGithub is used to:
- Fetch repository tree
- Enumerate files by extension
- Fetch file contents
- Feed into the ingestion pipeline as CODE-type documents
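The enumerate-by-extension step can be sketched as a pure filter over tree entries. Assumptions: in the real flow the entries come from PyGithub (e.g. a recursive git tree), whereas here they are modeled as (path, type) tuples, and both select_code_files and the default extension set are illustrative.

```python
# Filter a repository tree listing down to code-file blob paths by extension,
# before their contents are fetched and fed into the ingestion pipeline.
CODE_EXTENSIONS = frozenset({".py", ".ts", ".rs"})  # example set, not exhaustive

def select_code_files(tree_entries: list[tuple[str, str]],
                      extensions: frozenset = CODE_EXTENSIONS) -> list[str]:
    return [
        path for path, entry_type in tree_entries
        if entry_type == "blob"  # skip directories ("tree" entries)
        and any(path.endswith(ext) for ext in extensions)
    ]
```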