You are building a real-time document enrichment aggregator for a Kafka-like event stream.
Each incoming event has the following fields:
- document_id: string
- enricher_id: string
- enrichment: any JSON-compatible value
Example events:
{"document_id": "doc1", "enricher_id": "Sentiment Analysis", "enrichment": "positive"}
{"document_id": "doc2", "enricher_id": "Entity Linking", "enrichment": ["AAPL"]}
{"document_id": "doc1", "enricher_id": "Topic Classification", "enrichment": ["AUTO", "ANALYST_CHANGE"]}
{"document_id": "doc1", "enricher_id": "Entity Linking", "enrichment": ["TSLA"]}
{"document_id": "doc3", "enricher_id": "Summarization", "enrichment": "Yesterday the Yankees won against the Mets..."}
The aggregator is configured with a required set of enrichers, for example:
["Sentiment Analysis", "Entity Linking"]
Implement an in-memory Python aggregator with the following behavior:
- Consume events one at a time.
- Ignore events whose enricher_id is not in the configured required set.
- For each document_id, keep the latest enrichment value for each required enricher.
- As soon as a document has received all required enrichments, publish exactly one aggregated output for that document.
- If 10 seconds have passed since the first relevant event for a document and not all required enrichments have arrived, publish the partial result exactly once.
- After a document has been published, ignore any later events for that same document.
- After publishing, clean up any in-memory state for that document.
- Assume there is no background thread; handle timeouts lazily when new events arrive, or through an explicit timeout-check method.
The published output should have this form:
{
  "document_id": "doc1",
  "enricher_ids": ["Sentiment Analysis", "Entity Linking"],
  "enrichments": {
    "Sentiment Analysis": "positive",
    "Entity Linking": ["TSLA"]
  }
}
Write the Python implementation and explain any important edge cases you would consider.
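As a reference point, the required behavior could be sketched roughly as below. The class name `EnrichmentAggregator`, the `publish` callback, and the injectable `clock` parameter are illustrative assumptions, not part of the task statement; the clock is injected so timeouts can be tested deterministically.

```python
import time


class EnrichmentAggregator:
    """In-memory aggregator: publishes once per document, on completion or timeout."""

    def __init__(self, required_enrichers, publish, timeout_seconds=10.0,
                 clock=time.monotonic):
        self.required_order = list(required_enrichers)  # preserves configured order
        self.required = set(required_enrichers)
        self.publish = publish          # callback receiving the aggregated dict
        self.timeout = timeout_seconds
        self.clock = clock              # injectable for deterministic tests
        self.pending = {}               # document_id -> {"first_seen": t, "enrichments": {...}}
        self.published = set()          # document_ids already published

    def consume(self, event):
        self.check_timeouts()           # lazy timeout handling on each arrival
        doc_id = event["document_id"]
        enricher = event["enricher_id"]
        if enricher not in self.required or doc_id in self.published:
            return                      # irrelevant enricher, or already published
        state = self.pending.setdefault(
            doc_id, {"first_seen": self.clock(), "enrichments": {}})
        state["enrichments"][enricher] = event["enrichment"]  # latest value wins
        if set(state["enrichments"]) == self.required:
            self._publish(doc_id)       # complete: publish immediately

    def check_timeouts(self):
        """Publish partial results for documents past the timeout."""
        now = self.clock()
        expired = [doc_id for doc_id, state in self.pending.items()
                   if now - state["first_seen"] >= self.timeout]
        for doc_id in expired:
            self._publish(doc_id)

    def _publish(self, doc_id):
        state = self.pending.pop(doc_id)  # clean up per-document state
        self.published.add(doc_id)        # remember so late events are ignored
        self.publish({
            "document_id": doc_id,
            "enricher_ids": [e for e in self.required_order
                             if e in state["enrichments"]],
            "enrichments": state["enrichments"],
        })
```

One edge case worth flagging in a real answer: the `published` set grows without bound, so late-event suppression would eventually need a TTL or similar eviction strategy in a long-running process.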