PracHub

Implement streaming enrichment aggregator

Last updated: Apr 2, 2026

Quick Overview

This question evaluates competency in stateful stream processing, event-driven aggregation, in-memory state management, time-based expiry, and producing idempotent outputs.

  • medium
  • Bloomberg
  • Coding & Algorithms
  • Software Engineer


Company: Bloomberg

Role: Software Engineer

Category: Coding & Algorithms

Difficulty: medium

Interview Round: Technical Screen


Related Interview Questions

  • Solve meeting and tree problems - Bloomberg (easy)
  • Minimize travel cost with two cities - Bloomberg (easy)
  • Check connectivity between two subway stations - Bloomberg (easy)
  • Design a data structure for dynamic top‑K frequency - Bloomberg (hard)
  • Find tree root and bucket numbers - Bloomberg (hard)
Bloomberg · Jan 11, 2026

You are building a real-time document enrichment aggregator for a Kafka-like event stream.

Each incoming event has the following fields:

  • document_id: string
  • enricher_id: string
  • enrichment: any JSON-compatible value

Example events:

{"document_id": "doc1", "enricher_id": "Sentiment Analysis", "enrichment": "positive"}
{"document_id": "doc2", "enricher_id": "Entity Linking", "enrichment": ["AAPL"]}
{"document_id": "doc1", "enricher_id": "Topic Classification", "enrichment": ["AUTO", "ANALYST_CHANGE"]}
{"document_id": "doc1", "enricher_id": "Entity Linking", "enrichment": ["TSLA"]}
{"document_id": "doc3", "enricher_id": "Summarization", "enrichment": "Yesterday the Yankees won against the Mets..."}

The aggregator is configured with a required set of enrichers, for example:

["Sentiment Analysis", "Entity Linking"]

Implement an in-memory Python aggregator with the following behavior:

  1. Consume events one at a time.
  2. Ignore events whose enricher_id is not in the configured required set.
  3. For each document_id, keep the latest enrichment value for each required enricher.
  4. As soon as a document has received all required enrichments, publish exactly one aggregated output for that document.
  5. If 10 seconds have passed since the first relevant event for a document and not all required enrichments have arrived, publish the partial result exactly once.
  6. After a document has been published, any later events for that same document should be ignored.
  7. After publishing, clean up any in-memory state for that document.
  8. Assume there is no background thread. Timeout handling should be done lazily when new events arrive, or through an explicit timeout-check method.
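Taken together, rules 3, 5, 6, and 7 imply only a small amount of per-document state. One possible in-memory layout (all names here are illustrative, not part of the spec):

```python
# Hypothetical per-document state for the aggregator; names are illustrative.
pending = {
    "doc1": {
        "first_seen": 12.5,  # monotonic timestamp of the first relevant event (rule 5)
        "enrichments": {     # latest value per required enricher (rule 3)
            "Sentiment Analysis": "positive",
        },
    },
}
published = {"doc0"}  # ids already published; later events for them are ignored (rule 6)
```

Cleaning up after publishing (rule 7) then means popping the document's entry from `pending` while recording its id in `published`.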

The published output should have this form:

{
  "document_id": "doc1",
  "enricher_ids": ["Sentiment Analysis", "Entity Linking"],
  "enrichments": {
    "Sentiment Analysis": "positive",
    "Entity Linking": ["TSLA"]
  }
}

Write the Python implementation and explain any important edge cases you would consider.
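One possible implementation sketch under the stated assumptions. The `publish` callback and the injectable `clock` are assumptions introduced here for testability; they are not part of the problem statement:

```python
import time

class EnrichmentAggregator:
    """Aggregates enrichment events per document; publishes each document once.

    `publish` is a caller-supplied callback and `clock` an injectable time
    source -- both are assumptions made for testability, not part of the spec.
    """

    def __init__(self, required_enrichers, publish, timeout_s=10.0, clock=time.monotonic):
        self.required_order = list(required_enrichers)  # preserves output ordering
        self.required = set(required_enrichers)
        self.publish = publish
        self.timeout_s = timeout_s
        self.clock = clock
        self.pending = {}       # document_id -> {"first_seen": t, "enrichments": {...}}
        self.published = set()  # document_ids already published (rule 6)

    def consume(self, event):
        """Consume one event (rule 1), checking timeouts lazily first (rule 8)."""
        self.check_timeouts()
        doc, enricher = event["document_id"], event["enricher_id"]
        if enricher not in self.required or doc in self.published:  # rules 2 and 6
            return
        state = self.pending.setdefault(
            doc, {"first_seen": self.clock(), "enrichments": {}}
        )
        state["enrichments"][enricher] = event["enrichment"]  # rule 3: keep latest
        if set(state["enrichments"]) == self.required:        # rule 4: complete
            self._publish(doc)

    def check_timeouts(self):
        """Publish partial results for documents older than the timeout (rule 5)."""
        now = self.clock()
        for doc in [d for d, s in self.pending.items()
                    if now - s["first_seen"] >= self.timeout_s]:
            self._publish(doc)

    def _publish(self, doc):
        state = self.pending.pop(doc)  # rule 7: clean up per-document state
        self.published.add(doc)
        self.publish({
            "document_id": doc,
            "enricher_ids": [e for e in self.required_order
                             if e in state["enrichments"]],
            "enrichments": state["enrichments"],
        })
```

Edge cases worth raising in the interview: a duplicate event for the same enricher simply overwrites the stored value (rule 3); a document published on timeout never gets a second, complete publication because rule 6 blocks later events; and the `published` set grows without bound in this sketch, so a real system would need to expire it as well.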
