
Ingestion Pipeline

DocumentProcessor — 9-Step Pipeline

DocumentProcessor in engine/ingestion/processor.py runs every incoming document through a fixed nine-step pipeline.

Any step failure is caught, logged, and recorded in ingestion_jobs.errors (JSONB array). Partial failures result in job status = 'partial'.

Text Cleaning Strategies

| Strategy | Operations |
| --- | --- |
| minimal | Strip leading/trailing whitespace |
| standard | Minimal + remove excessive blank lines, normalize Unicode whitespace |
| aggressive | Standard + remove HTML tags, collapse multiple spaces, remove control characters |

The strategy is controlled per-resource via the config.auto_clean flag, with the INGESTION_CLEANING_STRATEGY environment variable providing the default.
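The three strategies are cumulative, so they can be sketched as one function that falls through from minimal to aggressive. This is an illustrative sketch, not the project's implementation; the function name `clean_text` and the exact regexes are assumptions:

```python
import re
import unicodedata

def clean_text(text: str, strategy: str = "standard") -> str:
    """Hypothetical sketch of the cumulative cleaning strategies."""
    # minimal: strip leading/trailing whitespace
    cleaned = text.strip()
    if strategy == "minimal":
        return cleaned
    # standard: also normalize Unicode (NFKC folds non-breaking spaces etc.)
    # and collapse runs of 3+ newlines into a single blank line
    cleaned = unicodedata.normalize("NFKC", cleaned)
    cleaned = re.sub(r"\n{3,}", "\n\n", cleaned)
    if strategy == "standard":
        return cleaned
    # aggressive: also strip HTML tags, collapse multiple spaces,
    # and drop control characters (keeping newlines)
    cleaned = re.sub(r"<[^>]+>", "", cleaned)
    cleaned = re.sub(r"[ \t]{2,}", " ", cleaned)
    cleaned = "".join(
        ch for ch in cleaned
        if ch == "\n" or unicodedata.category(ch)[0] != "C"
    )
    return cleaned
```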

TextChunker (engine/ingestion/chunker.py)

```python
@dataclass
class TextChunk:
    content: str
    index: int        # sequential chunk index
    start_char: int   # character offset in source
    end_char: int
    metadata: dict    # inherited from document + chunk-level

class TextChunker:
    def __init__(self, chunk_size=512, chunk_overlap=50, separator="\n\n"): ...

    def chunk(self, text: str, metadata: dict | None = None) -> list[TextChunk]:
        # Split by paragraph separator ("\n\n")
        # If a paragraph > chunk_size, split by sentence
        # Maintain an overlap of chunk_overlap chars between consecutive chunks
        # Validate: no chunk > 10MB, no empty chunks
        ...
```

Algorithm:

  1. Split on separator ("\n\n" = paragraph breaks)
  2. If any paragraph exceeds chunk_size, split further by sentence boundary
  3. Slide a window of chunk_size chars with chunk_overlap stride
  4. Track start_char / end_char for source attribution

Constraints:

  • chunk_size default: 512 characters
  • chunk_overlap default: 50 characters
  • Maximum content length enforced before chunking: INGESTION_MAX_CONTENT_LENGTH = 1,000,000 chars
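The sliding-window step (step 3) can be sketched in isolation. This is a simplified illustration, assuming the defaults above; it omits the paragraph/sentence pre-splitting of steps 1–2 and the `TextChunk` wrapper, returning bare `(start_char, end_char, content)` triples instead:

```python
def chunk_text(text: str, chunk_size: int = 512,
               chunk_overlap: int = 50) -> list[tuple[int, int, str]]:
    """Sketch of the sliding window: each chunk starts chunk_overlap
    characters before the end of the previous chunk, and start/end
    offsets are tracked for source attribution."""
    chunks = []
    start = 0
    while start < len(text):
        end = min(start + chunk_size, len(text))
        chunks.append((start, end, text[start:end]))
        if end == len(text):
            break
        start = end - chunk_overlap  # overlap with the previous chunk
    return chunks
```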

EmbeddingModel (engine/embedding/models.py)

```python
class EmbeddingModel:
    model_name: str = "sentence-transformers/all-MiniLM-L6-v2"
    dimensions: int = 384
    _cache: dict[str, np.ndarray]  # bounded in-memory cache, max 10,000 entries

    def __init__(self):
        self.device = "cuda" if torch.cuda.is_available() else "cpu"
        self.model = SentenceTransformer(self.model_name, device=self.device)
        if self.device == "cuda":
            self.model.half()  # FP16 on GPU for throughput
        self._cache = {}

    def encode(
        self,
        texts: list[str],
        batch_size: int = 32,
        show_progress: bool = False,
    ) -> np.ndarray:
        # Check cache first
        # Encode uncached texts in batches of batch_size
        # L2-normalize if config.normalize = True
        # Store results in _cache
        # Return numpy array of shape (len(texts), 384)
        ...
```

Cache behavior: The in-memory dict cache maps text → embedding vector. Cache size is bounded by EMBEDDING_CACHE_SIZE = 10,000. No eviction policy is specified — the cache grows until the bound is reached, at which point caching stops (not LRU; effectively a “fill then bypass” strategy).

Batch Embedding Pipeline (engine/embedding/batch.py)

```python
async def batch_generate_embeddings(
    chunks: list[TextChunk],
    model: EmbeddingModel,
    batch_size: int = 32,
) -> list[np.ndarray]:
    # Process in batches to avoid OOM on large documents
    # Returns a list of 384-dim vectors, one per chunk
    ...
```
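The batching loop might look like the following sketch. Running the synchronous `encode` via `asyncio.to_thread` is an assumption about how the batch stays off the event loop; the actual implementation may differ:

```python
import asyncio

async def batch_generate_embeddings(chunks, model, batch_size: int = 32):
    """Sketch: encode chunk contents in fixed-size batches so a large
    document never sends one huge batch to the model (avoiding OOM)."""
    vectors = []
    for i in range(0, len(chunks), batch_size):
        batch = [c.content for c in chunks[i:i + batch_size]]
        # model.encode is synchronous; run it off the event loop
        embeddings = await asyncio.to_thread(model.encode, batch)
        vectors.extend(embeddings)
    return vectors
```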

EmbeddingStorage (engine/embedding/storage.py)

Persists vectors to PostgreSQL:

```sql
INSERT INTO document_chunks (document_id, content, chunk_index, embedding, metadata)
VALUES ($1, $2, $3, $4::vector, $5)
```

The $4::vector cast stores the numpy array as a pgvector vector(384) column, enabling HNSW index searches.
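One way to bind the `$4::vector` parameter is to pass pgvector's text literal form, `'[v1,v2,...]'`. The helper below is an illustrative assumption about how the storage layer might format the vector, not the project's actual code:

```python
INSERT_CHUNK_SQL = (
    "INSERT INTO document_chunks "
    "(document_id, content, chunk_index, embedding, metadata) "
    "VALUES ($1, $2, $3, $4::vector, $5)"
)

def to_pgvector_literal(embedding) -> str:
    """Format a float sequence in pgvector's text form, e.g. '[0.1,0.2]',
    which the ::vector cast parses into a vector(384) value."""
    return "[" + ",".join(f"{v:g}" for v in embedding) + "]"
```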

Web Scraping Subsystem (engine/ingestion/scrapers/)

For ResourceType.WEBSITE, the ingestion pipeline uses a multi-stage scraping architecture that handles everything from static documents to complex single-page applications:

  1. BrowserScraper (Playwright): A headless Chromium-based scraper that handles JavaScript-heavy pages and SPAs using improved navigation logic and modern content extraction APIs.
  2. HTTPX scraper: A lightweight, high-performance fetcher for static HTML content.
  3. WebCrawler (engine/ingestion/crawler.py): A robust discovery engine with optimized link finding and depth management, designed for comprehensive site indexing while respecting rate limits.
  4. Parsing: Uses BeautifulSoup4 with lxml to extract clean text before converting it to Markdown for the chunking pipeline.
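The extraction step (item 4) can be sketched as follows. The docs name BeautifulSoup4 with lxml; this sketch substitutes the stdlib `html.parser` backend for portability, and the function name and tag list are assumptions:

```python
from bs4 import BeautifulSoup

def extract_text(html: str) -> str:
    """Sketch: strip non-content tags, then pull clean text out of the DOM."""
    # "lxml" is the documented parser; "html.parser" is a stdlib fallback
    soup = BeautifulSoup(html, "html.parser")
    for tag in soup(["script", "style", "noscript"]):
        tag.decompose()
    lines = (line.strip() for line in soup.get_text("\n").split("\n"))
    return "\n".join(line for line in lines if line)
```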

Configuration is centralized in ScrapingConfig:

  • timeout: 30s
  • max_retries: 3
  • user_agent: "OpenTier Intelligence Bot/1.0"
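As a dataclass, the centralized configuration might look like this sketch; the field names and types are assumptions, while the default values are the documented ones:

```python
from dataclasses import dataclass

@dataclass
class ScrapingConfig:
    """Sketch of the centralized scraping configuration."""
    timeout: float = 30.0  # seconds per request
    max_retries: int = 3
    user_agent: str = "OpenTier Intelligence Bot/1.0"
```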

Testing & Validation

End-to-End Ingestion Test (server/intelligence/script/full_ingestion.py)

A comprehensive test script that validates the entire pipeline from URL scraping to vector storage. It can be run with custom user IDs and persistence options to verify dashboard visibility.

Usage:

```shell
uv run script/full_ingestion.py "https://example.com" --user "YOUR_USER_ID" --persist
```
  • --user: Associate the ingestion with a specific user ID.
  • --persist: Prevents the script from deleting the resource after the test, allowing it to be viewed in the dashboard.

GitHub Integration (engine/ingestion/scrapers/)

For ResourceType.CODE with a GitHub URL, PyGithub is used to:

  1. Fetch repository tree
  2. Enumerate files by extension
  3. Fetch file contents
  4. Feed into the ingestion pipeline as CODE type documents
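The four steps above can be sketched with PyGithub. The library calls (`get_repo`, `get_git_tree`, `get_contents`) are PyGithub's real API, but the orchestration, function names, and default extension list here are illustrative assumptions:

```python
def select_code_paths(tree_entries, extensions=(".py",)):
    """Pure helper for step 2: pick blob paths matching wanted extensions."""
    return [
        e["path"] for e in tree_entries
        if e["type"] == "blob" and e["path"].endswith(tuple(extensions))
    ]

def fetch_repo_documents(repo_full_name: str, token: str, extensions=(".py",)):
    """Sketch of steps 1-4: tree -> filter -> contents -> CODE documents."""
    from github import Github  # PyGithub

    gh = Github(token)
    repo = gh.get_repo(repo_full_name)
    tree = repo.get_git_tree(repo.default_branch, recursive=True)   # 1. repo tree
    paths = select_code_paths(
        [{"path": e.path, "type": e.type} for e in tree.tree],
        extensions,
    )                                                               # 2. filter
    docs = []
    for path in paths:
        blob = repo.get_contents(path)                              # 3. contents
        docs.append({
            "path": path,
            "content": blob.decoded_content.decode("utf-8", "replace"),
            "type": "CODE",                                         # 4. CODE document
        })
    return docs
```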