A way to consume multiple messages as a single collection (e.g. List<T>) #883

@Songzhizong

Description

Is your feature request related to a problem? Please describe.

I am using RabbitMQ Stream for high-throughput scenarios, where the bottleneck often shifts from network I/O to the processing logic, specifically when interacting with downstream systems like databases (ClickHouse, Elasticsearch) or calling external APIs.

The current ConsumerBuilder only supports a single-message callback via messageHandler(BiConsumer<Context, Message> messageHandler), which forces the application to process messages one by one. In scenarios where I need to perform bulk inserts or batch processing to reduce I/O operations and amortize transaction overhead, this per-message API is limiting.

Describe the solution you'd like

I would like to request native support for batch consumption in the Java Client.
Ideally, the ConsumerBuilder could offer a mechanism to receive a batch of messages.

For example, an API similar to this:

Environment environment = Environment.builder().build();

environment.consumerBuilder()
    .stream("my-stream")
    .batchSize(500) // Trigger when 500 messages are accumulated
    .batchTimeout(Duration.ofMillis(100)) // Or trigger when time window elapses
    .batchMessageHandler((context, messages) -> { // messages is List<Message>
        // Here I can perform bulk insert efficiently
        myDatabase.insertAll(messages); 
    })
    .build();

Describe alternatives you've considered

Currently, we have to implement a manual "Accumulator/Buffer" pattern on the client side:

  1. Receive message in the single-message callback.
  2. Push it into a thread-safe List/Queue.
  3. Check if the list size reaches a threshold or a timer expires.
  4. Flush the list to the business logic.
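The steps above amount to a small client-side batching component. A minimal, self-contained sketch of what every user currently has to write (the BatchAccumulator class, its parameters, and the flush callback are hypothetical illustrations, not part of the stream client library):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Hypothetical accumulator: buffers items and flushes a batch when it
// reaches maxSize, or when maxDelayMillis elapses, whichever comes first.
final class BatchAccumulator<T> implements AutoCloseable {

    private final int maxSize;
    private final java.util.function.Consumer<List<T>> flushAction;
    private final List<T> buffer = new ArrayList<>();
    private final ScheduledExecutorService timer =
        Executors.newSingleThreadScheduledExecutor();

    BatchAccumulator(int maxSize, long maxDelayMillis,
                     java.util.function.Consumer<List<T>> flushAction) {
        this.maxSize = maxSize;
        this.flushAction = flushAction;
        // Time-based trigger: flush whatever has accumulated so far.
        timer.scheduleAtFixedRate(this::flush,
            maxDelayMillis, maxDelayMillis, TimeUnit.MILLISECONDS);
    }

    // Called from the single-message callback (step 1/2).
    synchronized void add(T item) {
        buffer.add(item);
        if (buffer.size() >= maxSize) { // size-based trigger (step 3)
            flush();
        }
    }

    // Hand a copy of the batch to the business logic (step 4).
    synchronized void flush() {
        if (buffer.isEmpty()) {
            return;
        }
        flushAction.accept(new ArrayList<>(buffer));
        buffer.clear();
    }

    @Override
    public void close() {
        timer.shutdown();
        flush(); // drain remaining items on graceful shutdown
    }
}
```

Note what this sketch still does not handle: it tracks no offsets, so correlating each flushed batch with the last Context for manual offset commits, and guaranteeing the final flush on every shutdown path, is additional work each user must get right on their own.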

Why this alternative is not ideal:

  • Complexity: Every user has to reinvent the wheel (concurrency control, timer management, offset tracking).
  • Risk: It increases the risk of message loss or duplication if the buffering logic is not handled correctly during graceful shutdowns.
  • Performance: A native implementation within the client library (closer to the network layer/Netty buffer) would likely be more efficient than application-level buffering.

Additional context

Many other streaming clients support this out of the box (e.g. Kafka's poll() returning a batch of records, or Spring's batch listeners). Since RabbitMQ Stream is designed for high throughput, adding batch consumption support would significantly enhance its capability in "Big Data" and data-pipeline use cases.

Metadata


Labels: enhancement (New feature or request)