gRPC Server Interface

Service Registration

The Intelligence service registers three gRPC servicers on the shared grpc.aio server instance:

    health_pb2_grpc.add_HealthServicer_to_server(HealthService(), server)
    intelligence_pb2_grpc.add_ChatServicer_to_server(ChatService(engine), server)
    intelligence_pb2_grpc.add_ResourceServiceServicer_to_server(ResourceService(engine), server)

Health Service

| RPC | Input | Output | Notes |
| --- | --- | --- | --- |
| Check | HealthCheckRequest {} | HealthCheckResponse { status, version, uptime_seconds } | Always returns "healthy" if the process is alive |
| Ready | ReadyCheckRequest {} | ReadyCheckResponse { ready, dependencies, dependency_status } | Checks DB + embedding model |
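
The Ready check can be pictured as a set of dependency probes run concurrently and folded into one response. The sketch below is illustrative only: the `Probe` type, the `check_ready` helper, the 2-second probe timeout, and the status strings are assumptions, and `dependency_status` is assumed to be a name-to-status map.

```python
import asyncio
from typing import Awaitable, Callable, Dict, Tuple

# Hypothetical probe signature: resolves to True when the dependency is usable.
Probe = Callable[[], Awaitable[bool]]


async def check_ready(probes: Dict[str, Probe], timeout_s: float = 2.0) -> dict:
    """Run every probe concurrently and summarise the outcome."""

    async def run(name: str, probe: Probe) -> Tuple[str, str]:
        try:
            ok = await asyncio.wait_for(probe(), timeout=timeout_s)
            return name, "ok" if ok else "unavailable"
        except asyncio.TimeoutError:
            return name, "timeout"
        except Exception as exc:  # a failing probe must not crash the RPC
            return name, f"error: {exc}"

    results = await asyncio.gather(*(run(n, p) for n, p in probes.items()))
    status = dict(results)
    return {
        "ready": all(value == "ok" for value in status.values()),
        "dependencies": list(status),
        "dependency_status": status,
    }
```

A caller would pass one probe per dependency, for example `{"database": ping_db, "embedding_model": ping_model}`, where both probe functions are hypothetical.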

Chat Service

| RPC | Input | Output | Notes |
| --- | --- | --- | --- |
| SendMessage | ChatRequest | ChatResponse | Unary; timeout 1200s |
| StreamChat | ChatRequest | stream ChatStreamChunk | Server-streaming; timeout 300s |
| GetConversation | GetConversationRequest { user_id, conversation_id } | ConversationResponse { messages[] } | Fetches from chat_messages |
| DeleteConversation | DeleteConversationRequest { user_id, conversation_id } | DeleteConversationResponse { success } | Checks ownership |
| GenerateTitle | GenerateTitleRequest { user_id, conversation_id, user_message, assistant_message } | GenerateTitleResponse { title } | LLM call |

ChatRequest Structure

    message ChatRequest {
      string user_id = 1;
      string conversation_id = 2;
      string message = 3;
      optional ChatConfig config = 4;
      map<string, string> metadata = 5;
    }

ChatStreamChunk Structure

    message ChatStreamChunk {
      string conversation_id = 1;
      string message_id = 2;
      oneof chunk_type {
        TokenChunk token = 3;       // incremental LLM output
        SourcesChunk sources = 4;   // RAG source citations
        MetricsChunk metrics = 5;   // token counts, latency
        ErrorChunk error = 6;       // terminal error
      }
      bool is_final = 7;
    }

The Rust layer reads each ChatStreamChunk from the gRPC stream and converts it to an SSE data: event. A chunk with is_final = true closes the SSE stream.
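
The actual conversion lives in the Rust layer. As a language-neutral illustration, the same relay logic can be sketched in Python. The `Chunk` dataclass is a stand-in for the generated ChatStreamChunk, and the JSON body layout (`type`, `payload`) is an assumption, not the wire format the Rust API emits.

```python
import json
from dataclasses import dataclass
from typing import Iterable, Iterator


@dataclass
class Chunk:
    """Stand-in for ChatStreamChunk; `kind` mirrors the oneof chunk_type."""
    conversation_id: str
    message_id: str
    kind: str          # "token" | "sources" | "metrics" | "error"
    payload: dict
    is_final: bool = False


def to_sse(chunk: Chunk) -> str:
    """Serialise one chunk as a single SSE `data:` event."""
    body = json.dumps({
        "conversation_id": chunk.conversation_id,
        "message_id": chunk.message_id,
        "type": chunk.kind,
        "payload": chunk.payload,
    })
    # An SSE event is terminated by a blank line.
    return f"data: {body}\n\n"


def relay(chunks: Iterable[Chunk]) -> Iterator[str]:
    """Yield SSE events until a chunk marked is_final has been forwarded."""
    for chunk in chunks:
        yield to_sse(chunk)
        if chunk.is_final:
            return  # stop reading; the caller closes the SSE stream
```

Anything the upstream sends after the final chunk is never forwarded, which matches the rule that is_final ends the stream.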

ResourceService Service

| RPC | Type | Input | Output | Timeout |
| --- | --- | --- | --- | --- |
| AddResource | Unary | AddResourceRequest | AddResourceResponse { resource_id, job_id, status } | 3000s |
| ChunkedUpload | Client-streaming | stream FileChunk | ChunkedUploadResponse | 3000s |
| GetResourceStatus | Unary | GetResourceStatusRequest { resource_id, user_id } | ResourceStatusResponse | 10s |
| ListResources | Unary | ListResourcesRequest { user_id, filters, pagination } | ListResourcesResponse { resources[], total, next_cursor } | 30s |
| DeleteResource | Unary | DeleteResourceRequest { resource_id, user_id } | DeleteResourceResponse { success } | 30s |
| CancelIngestion | Unary | CancelIngestionRequest { job_id, user_id } | CancelIngestionResponse { success } | 10s |
| SyncResourceMetadata | Unary | SyncMetadataRequest | SyncMetadataResponse { synced, conflicts[] } | 60s |

ChunkedUpload — Client-Streaming

ChunkedUpload supports files larger than the 100 MB gRPC message limit by streaming them in chunks:

  1. First FileChunk must contain payload.metadata: ChunkMetadata { user_id, resource_id, filename, content_type, total_size, total_chunks, ... }
  2. Subsequent FileChunk messages contain payload.data: bytes (max 10MB per chunk)
  3. Final chunk has is_last = true
  4. Maximum total file size: 1 GB
  5. Optional checksum in ChunkMetadata for integrity verification
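
The message sequence above can be sketched as a client-side generator. This is a minimal illustration under stated assumptions: `FileChunk` here is a plain dataclass standing in for the generated message, 10 MB is taken as 10 * 1024 * 1024 bytes, `total_chunks` is assumed to count data chunks only, and SHA-256 is an assumed choice for the optional checksum.

```python
import hashlib
from dataclasses import dataclass
from typing import Iterator, Optional

MAX_CHUNK_BYTES = 10 * 1024 * 1024    # per-chunk ceiling (assumed binary MB)
MAX_FILE_BYTES = 1024 * 1024 * 1024   # 1 GB total file ceiling


@dataclass
class FileChunk:
    """Stand-in for the generated FileChunk; one of metadata/data is set."""
    metadata: Optional[dict] = None
    data: Optional[bytes] = None
    is_last: bool = False


def file_chunks(blob: bytes, *, user_id: str, resource_id: str, filename: str,
                content_type: str,
                chunk_size: int = MAX_CHUNK_BYTES) -> Iterator[FileChunk]:
    """Yield the metadata message, then the data chunks in order."""
    if len(blob) > MAX_FILE_BYTES:
        raise ValueError("file exceeds the 1 GB upload limit")
    if not 0 < chunk_size <= MAX_CHUNK_BYTES:
        raise ValueError("chunk_size must be between 1 byte and 10 MB")

    # Ceiling division; an empty file still sends one (empty) data chunk.
    total_chunks = max(1, -(-len(blob) // chunk_size))
    yield FileChunk(metadata={
        "user_id": user_id,
        "resource_id": resource_id,
        "filename": filename,
        "content_type": content_type,
        "total_size": len(blob),
        "total_chunks": total_chunks,
        "checksum": hashlib.sha256(blob).hexdigest(),  # assumed algorithm
    })
    for i in range(total_chunks):
        part = blob[i * chunk_size:(i + 1) * chunk_size]
        yield FileChunk(data=part, is_last=(i == total_chunks - 1))
```

With a real stub, a generator like this would be passed as the request iterator of the client-streaming call after converting each stand-in into the generated message type.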

SyncResourceMetadata — Conflict Resolution

Used to reconcile state between the Rust API and Intelligence service:

    enum SyncDirection {
      UNSPECIFIED = 0;
      API_TO_INTELLIGENCE = 1;   // Rust is source of truth
      INTELLIGENCE_TO_API = 2;   // Python is source of truth
      BIDIRECTIONAL = 3;         // merge conflicts
    }

    enum ConflictType {
      MISSING_IN_API = 0;
      MISSING_IN_INTELLIGENCE = 1;
      STATUS_MISMATCH = 2;
      METADATA_MISMATCH = 3;
    }

    enum ConflictResolution {
      USE_API = 0;
      USE_INTELLIGENCE = 1;
      MERGE = 2;
      MANUAL = 3;                // returned to caller for manual resolution
    }
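
One way to read these enums together is as a policy that picks a resolution from the sync direction and the conflict type. The sketch below mirrors the enum values in Python and encodes one plausible policy. The `default_resolution` function and its rules, especially the BIDIRECTIONAL and UNSPECIFIED branches, are assumptions for illustration and are not taken from the service.

```python
from enum import IntEnum


class SyncDirection(IntEnum):
    UNSPECIFIED = 0
    API_TO_INTELLIGENCE = 1
    INTELLIGENCE_TO_API = 2
    BIDIRECTIONAL = 3


class ConflictType(IntEnum):
    MISSING_IN_API = 0
    MISSING_IN_INTELLIGENCE = 1
    STATUS_MISMATCH = 2
    METADATA_MISMATCH = 3


class ConflictResolution(IntEnum):
    USE_API = 0
    USE_INTELLIGENCE = 1
    MERGE = 2
    MANUAL = 3


def default_resolution(direction: SyncDirection,
                       conflict: ConflictType) -> ConflictResolution:
    """Hypothetical policy: the source of truth wins; bidirectional merges."""
    if direction is SyncDirection.API_TO_INTELLIGENCE:
        return ConflictResolution.USE_API
    if direction is SyncDirection.INTELLIGENCE_TO_API:
        return ConflictResolution.USE_INTELLIGENCE
    if direction is SyncDirection.BIDIRECTIONAL:
        # A record missing on one side can only come from the other side.
        if conflict is ConflictType.MISSING_IN_API:
            return ConflictResolution.USE_INTELLIGENCE
        if conflict is ConflictType.MISSING_IN_INTELLIGENCE:
            return ConflictResolution.USE_API
        return ConflictResolution.MERGE
    # UNSPECIFIED: hand the conflict back to the caller (assumed behaviour).
    return ConflictResolution.MANUAL
```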

Correlation ID Propagation

The Chat servicer extracts an x-correlation-id value from the gRPC metadata, or generates one if the caller did not send it:

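    # interfaces/chat.py
    import uuid

    import grpc

    def _get_correlation_id(context: grpc.aio.ServicerContext) -> str:
        # invocation_metadata() may return None, so fall back to an empty tuple
        metadata = dict(context.invocation_metadata() or ())
        return metadata.get('x-correlation-id', str(uuid.uuid4()))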

The correlation ID is included in all log lines within the request scope, enabling distributed tracing across the Rust→Python boundary.
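
One common way to attach an ID to every log line in a request scope is a `contextvars.ContextVar` combined with a `logging.Filter`. The sketch below uses only the standard library. The names `correlation_id`, `CorrelationFilter`, `configure`, and the logger name are illustrative and not taken from the service.

```python
import contextvars
import logging

# Holds the ID for the current request; "-" marks log lines outside a request.
correlation_id = contextvars.ContextVar("correlation_id", default="-")


class CorrelationFilter(logging.Filter):
    """Copy the current correlation ID onto every record passing through."""

    def filter(self, record: logging.LogRecord) -> bool:
        record.correlation_id = correlation_id.get()
        return True


def configure(stream) -> logging.Logger:
    """Build a logger whose format includes the correlation ID."""
    handler = logging.StreamHandler(stream)
    handler.setFormatter(
        logging.Formatter("%(correlation_id)s %(levelname)s %(message)s"))
    handler.addFilter(CorrelationFilter())
    logger = logging.getLogger("intelligence.sketch")  # hypothetical name
    logger.handlers = [handler]
    logger.setLevel(logging.INFO)
    logger.propagate = False
    return logger
```

Inside an RPC handler, the servicer would call `token = correlation_id.set(cid)` before doing any work and `correlation_id.reset(token)` in a `finally` block. Because context variables are tracked per asyncio task, concurrent requests do not see each other's IDs.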

Error Code Mapping

| Python Exception | gRPC Status Code | HTTP (after Rust mapping) |
| --- | --- | --- |
| ConversationNotFound | NOT_FOUND | 404 |
| PermissionError | PERMISSION_DENIED | 403 |
| ValidationError (Pydantic) | INVALID_ARGUMENT | 400 |
| RateLimitError | RESOURCE_EXHAUSTED | 429 |
| TimeoutError | DEADLINE_EXCEEDED | 504 |
| ServiceUnavailable | UNAVAILABLE | 503 |
| All others | INTERNAL | 500 |
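
The table can be expressed as a lookup that walks an exception's class hierarchy, so subclasses inherit their parent's mapping. The sketch below keeps status codes as plain strings so it runs without grpcio. The exception classes are local stand-ins (including one for Pydantic's ValidationError), and `map_error` is a hypothetical helper.

```python
from typing import Dict, Tuple, Type


class ConversationNotFound(Exception):
    """Stand-in for the service's own exception."""


class ValidationError(Exception):
    """Stand-in for pydantic.ValidationError."""


class RateLimitError(Exception):
    """Stand-in for the service's own exception."""


class ServiceUnavailable(Exception):
    """Stand-in for the service's own exception."""


# (gRPC status name, HTTP status after the Rust mapping)
STATUS_BY_EXCEPTION: Dict[Type[BaseException], Tuple[str, int]] = {
    ConversationNotFound: ("NOT_FOUND", 404),
    PermissionError: ("PERMISSION_DENIED", 403),
    ValidationError: ("INVALID_ARGUMENT", 400),
    RateLimitError: ("RESOURCE_EXHAUSTED", 429),
    TimeoutError: ("DEADLINE_EXCEEDED", 504),
    ServiceUnavailable: ("UNAVAILABLE", 503),
}


def map_error(exc: BaseException) -> Tuple[str, int]:
    """Return the first mapping found along the exception's MRO."""
    for cls in type(exc).__mro__:
        if cls in STATUS_BY_EXCEPTION:
            return STATUS_BY_EXCEPTION[cls]
    return ("INTERNAL", 500)  # everything unmapped is an internal error
```

In a servicer, the status name could be resolved with `getattr(grpc.StatusCode, name)` and passed to `context.abort(...)`. That wiring is left out here because the service's actual error-handling code is not shown.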