# gRPC Server Interface

## Service Registration
The Intelligence service registers three gRPC servicers against the shared `grpc.aio.server`:

```python
health_pb2_grpc.add_HealthServicer_to_server(HealthService(), server)
intelligence_pb2_grpc.add_ChatServicer_to_server(ChatService(engine), server)
intelligence_pb2_grpc.add_ResourceServiceServicer_to_server(ResourceService(engine), server)
```

## Health Service
| RPC | Input | Output | Notes |
|---|---|---|---|
| Check | HealthCheckRequest {} | HealthCheckResponse { status, version, uptime_seconds } | Always returns "healthy" if the process is alive |
| Ready | ReadyCheckRequest {} | ReadyCheckResponse { ready, dependencies, dependency_status } | Checks DB + embedding model |
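The two RPCs can be sketched as plain functions (a hedged illustration: the real servicer wraps these results in protobuf responses, and the probe names are assumptions):

```python
import time

START_TIME = time.monotonic()  # captured at process start

def check() -> dict:
    """Liveness (Check): reports healthy whenever the process can respond at all."""
    return {"status": "healthy", "uptime_seconds": time.monotonic() - START_TIME}

def ready(probes: dict) -> dict:
    """Readiness (Ready): aggregates per-dependency probes such as DB + embedding model."""
    dependency_status = {name: probe() for name, probe in probes.items()}
    return {
        "ready": all(dependency_status.values()),
        "dependencies": sorted(dependency_status),
        "dependency_status": dependency_status,
    }
```

With stubbed probes, `ready({"database": lambda: True, "embedding_model": lambda: False})` reports `ready: False` while still listing each dependency's individual status.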
## Chat Service
| RPC | Input | Output | Notes |
|---|---|---|---|
| SendMessage | ChatRequest | ChatResponse | Unary; timeout 1200s |
| StreamChat | ChatRequest | stream ChatStreamChunk | Server-streaming; timeout 300s |
| GetConversation | GetConversationRequest { user_id, conversation_id } | ConversationResponse { messages[] } | Fetches from chat_messages |
| DeleteConversation | DeleteConversationRequest { user_id, conversation_id } | DeleteConversationResponse { success } | Checks ownership |
| GenerateTitle | GenerateTitleRequest { user_id, conversation_id, user_message, assistant_message } | GenerateTitleResponse { title } | LLM call |
### ChatRequest Structure

```proto
message ChatRequest {
  string user_id = 1;
  string conversation_id = 2;
  string message = 3;
  optional ChatConfig config = 4;
  map<string, string> metadata = 5;
}
```

### ChatStreamChunk Structure
```proto
message ChatStreamChunk {
  string conversation_id = 1;
  string message_id = 2;
  oneof chunk_type {
    TokenChunk token = 3;     // incremental LLM output
    SourcesChunk sources = 4; // RAG source citations
    MetricsChunk metrics = 5; // token counts, latency
    ErrorChunk error = 6;     // terminal error
  }
  bool is_final = 7;
}
```

The Rust layer reads each ChatStreamChunk from the gRPC stream and converts it to an SSE `data:` event. `is_final = true` triggers the SSE stream close.
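The chunk-to-SSE conversion lives in Rust, but its shape can be illustrated in Python (field names follow the proto above; the JSON wire format is an assumption):

```python
import json

def chunk_to_sse(chunk: dict) -> str:
    """Render one ChatStreamChunk as a Server-Sent Events `data:` frame."""
    return f"data: {json.dumps(chunk)}\n\n"

def stream_to_sse(chunks):
    """Forward chunks as SSE frames; is_final=true closes the stream."""
    for chunk in chunks:
        yield chunk_to_sse(chunk)
        if chunk.get("is_final"):
            break
```

Note the generator stops after the final chunk is forwarded, mirroring how `is_final = true` closes the SSE response even if the gRPC stream were to yield more data.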
## ResourceService
| RPC | Type | Input | Output | Timeout |
|---|---|---|---|---|
| AddResource | Unary | AddResourceRequest | AddResourceResponse { resource_id, job_id, status } | 3000s |
| ChunkedUpload | Client-streaming | stream FileChunk | ChunkedUploadResponse | 3000s |
| GetResourceStatus | Unary | GetResourceStatusRequest { resource_id, user_id } | ResourceStatusResponse | 10s |
| ListResources | Unary | ListResourcesRequest { user_id, filters, pagination } | ListResourcesResponse { resources[], total, next_cursor } | 30s |
| DeleteResource | Unary | DeleteResourceRequest { resource_id, user_id } | DeleteResourceResponse { success } | 30s |
| CancelIngestion | Unary | CancelIngestionRequest { job_id, user_id } | CancelIngestionResponse { success } | 10s |
| SyncResourceMetadata | Unary | SyncMetadataRequest | SyncMetadataResponse { synced, conflicts[] } | 60s |
### ChunkedUpload — Client-Streaming

Supports uploading files larger than the 100MB gRPC message limit by streaming in chunks:

- The first FileChunk must contain `payload.metadata: ChunkMetadata { user_id, resource_id, filename, content_type, total_size, total_chunks, ... }`
- Subsequent FileChunk messages contain `payload.data: bytes` (max 10MB per chunk)
- The final chunk has `is_last = true`
- Maximum total file size: 1 GB
- Optional `checksum` in ChunkMetadata for integrity verification
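The chunking rules above can be sketched as a generator yielding dicts that mirror the FileChunk oneof (a hedged illustration; real client code would build the protobuf messages and stream them over the stub):

```python
CHUNK_SIZE = 10 * 1024 * 1024  # 10MB max payload.data per chunk

def file_chunks(data: bytes, metadata: dict):
    """Yield the metadata chunk first, then data chunks, marking the last one."""
    assert len(data) <= 1024 ** 3, "maximum total file size is 1 GB"
    total_chunks = max(1, -(-len(data) // CHUNK_SIZE))  # ceiling division
    yield {"metadata": {**metadata, "total_size": len(data), "total_chunks": total_chunks}}
    for i in range(total_chunks):
        yield {
            "data": data[i * CHUNK_SIZE:(i + 1) * CHUNK_SIZE],
            "is_last": i == total_chunks - 1,
        }
```

A 10MB + 1 byte payload therefore produces three messages: one metadata chunk followed by two data chunks, with `is_last = true` only on the final one.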
### SyncResourceMetadata — Conflict Resolution

Used to reconcile resource state between the Rust API and the Intelligence service:
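One way the direction can drive the default resolution is sketched here (a hedged illustration: the Python enums mirror the proto definitions below, but the dispatch policy itself is an assumption):

```python
from enum import Enum

class SyncDirection(Enum):
    UNSPECIFIED = 0
    API_TO_INTELLIGENCE = 1  # Rust is source of truth
    INTELLIGENCE_TO_API = 2  # Python is source of truth
    BIDIRECTIONAL = 3

class ConflictResolution(Enum):
    USE_API = 0
    USE_INTELLIGENCE = 1
    MERGE = 2
    MANUAL = 3

def resolve(direction: SyncDirection) -> ConflictResolution:
    """Pick a default resolution from the sync direction; BIDIRECTIONAL attempts a merge."""
    if direction is SyncDirection.API_TO_INTELLIGENCE:
        return ConflictResolution.USE_API
    if direction is SyncDirection.INTELLIGENCE_TO_API:
        return ConflictResolution.USE_INTELLIGENCE
    if direction is SyncDirection.BIDIRECTIONAL:
        return ConflictResolution.MERGE
    return ConflictResolution.MANUAL  # UNSPECIFIED: hand back to the caller
```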
```proto
enum SyncDirection {
  UNSPECIFIED = 0;
  API_TO_INTELLIGENCE = 1; // Rust is source of truth
  INTELLIGENCE_TO_API = 2; // Python is source of truth
  BIDIRECTIONAL = 3;       // merge conflicts
}

enum ConflictType {
  MISSING_IN_API = 0;
  MISSING_IN_INTELLIGENCE = 1;
  STATUS_MISMATCH = 2;
  METADATA_MISMATCH = 3;
}

enum ConflictResolution {
  USE_API = 0;
  USE_INTELLIGENCE = 1;
  MERGE = 2;
  MANUAL = 3; // returned to caller for manual resolution
}
```

## Correlation ID Propagation
The Chat servicer extracts or generates an `x-correlation-id` from gRPC metadata:

```python
# interfaces/chat.py
import uuid

import grpc

def _get_correlation_id(context: grpc.aio.ServicerContext) -> str:
    metadata = dict(context.invocation_metadata())
    return metadata.get('x-correlation-id', str(uuid.uuid4()))
```

The correlation ID is included in all log lines within the request scope, enabling distributed tracing across the Rust→Python boundary.
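On the calling side, the same header can be attached to outgoing invocation metadata; a minimal sketch (the helper name is an assumption, not part of the service):

```python
import uuid

def with_correlation_id(metadata=None, correlation_id=None):
    """Return gRPC invocation metadata carrying x-correlation-id, generating one if absent."""
    pairs = [(k, v) for k, v in (metadata or []) if k != "x-correlation-id"]
    pairs.append(("x-correlation-id", correlation_id or str(uuid.uuid4())))
    return tuple(pairs)
```

A caller would pass the result as the `metadata` argument of a stub call, so the servicer's `_get_correlation_id` finds the ID instead of minting a fresh one.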
## Error Code Mapping
| Python Exception | gRPC Status Code | HTTP (after Rust mapping) |
|---|---|---|
| ConversationNotFound | NOT_FOUND | 404 |
| PermissionError | PERMISSION_DENIED | 403 |
| ValidationError (Pydantic) | INVALID_ARGUMENT | 400 |
| RateLimitError | RESOURCE_EXHAUSTED | 429 |
| TimeoutError | DEADLINE_EXCEEDED | 504 |
| ServiceUnavailable | UNAVAILABLE | 503 |
| All others | INTERNAL | 500 |
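The table can be expressed as an ordered type-to-status lookup (a hedged sketch: the domain exception classes here are assumptions, and `ValueError` stands in for Pydantic's ValidationError):

```python
# Placeholder domain exceptions; the service defines its own.
class ConversationNotFound(Exception): ...
class RateLimitError(Exception): ...
class ServiceUnavailable(Exception): ...

# Ordered (type, gRPC status name) pairs; names match grpc.StatusCode members.
STATUS_MAP = [
    (ConversationNotFound, "NOT_FOUND"),
    (PermissionError, "PERMISSION_DENIED"),
    (ValueError, "INVALID_ARGUMENT"),  # stands in for Pydantic ValidationError
    (RateLimitError, "RESOURCE_EXHAUSTED"),
    (TimeoutError, "DEADLINE_EXCEEDED"),
    (ServiceUnavailable, "UNAVAILABLE"),
]

def to_status(exc: Exception) -> str:
    """Map a raised exception to a gRPC status name; anything unmatched is INTERNAL."""
    for exc_type, code in STATUS_MAP:
        if isinstance(exc, exc_type):
            return code
    return "INTERNAL"  # all others
```

The Rust side then performs the second hop of the table, translating each gRPC status to the listed HTTP code.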