Description
Is your feature request related to a problem? Please describe.
I am using RabbitMQ Stream for high-throughput scenarios, where the bottleneck often shifts from network I/O to the processing logic, specifically when interacting with downstream systems like databases (ClickHouse, Elasticsearch) or calling external APIs.
The current ConsumerBuilder only supports a single-message callback via messageHandler(MessageHandler messageHandler), whose handle(Context, Message) method is invoked once per message.
This forces the application to process messages one by one. When I need bulk inserts or batch processing to reduce IOPS and per-transaction overhead, the current API gives me no built-in way to do it.
Describe the solution you'd like
I would like to request native support for batch consumption in the Java Client.
Ideally, the ConsumerBuilder could offer a mechanism to receive a batch of messages.
For example, an API similar to this:
```java
import com.rabbitmq.stream.Environment;
import java.time.Duration;

Environment environment = Environment.builder().build();
environment.consumerBuilder()
    .stream("my-stream")
    // Proposed options (they do not exist in the client today):
    .batchSize(500) // Trigger when 500 messages are accumulated
    .batchTimeout(Duration.ofMillis(100)) // Or trigger when the time window elapses
    .batchMessageHandler((context, messages) -> { // messages is a List<Message>
        // Here I can perform a bulk insert efficiently
        myDatabase.insertAll(messages);
    })
    .build();
```
Describe alternatives you've considered
Currently, we have to implement a manual "Accumulator/Buffer" pattern on the client side:
- Receive each message in the single-message callback.
- Push it into a thread-safe List/Queue.
- Check whether the buffer has reached a size threshold or a timer has expired.
- Flush the buffered messages to the business logic.
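For reference, the four steps above can be sketched as a small generic helper. This is a minimal sketch under stated assumptions, not part of the RabbitMQ Stream client: `BatchAccumulator` and its methods are hypothetical names, the timeout is measured from the first message of each batch, and the flush callback runs while the helper holds its lock, so the message callback blocks during a flush (a crude form of backpressure).

```java
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

// Hypothetical helper, not part of the RabbitMQ Stream client.
final class BatchAccumulator<T> implements AutoCloseable {
    private final int batchSize;
    private final Duration batchTimeout;
    private final Consumer<List<T>> flushHandler;
    // Daemon thread so a forgotten close() does not keep the JVM alive.
    private final ScheduledExecutorService timer =
        Executors.newSingleThreadScheduledExecutor(r -> {
            Thread t = new Thread(r, "batch-timeout");
            t.setDaemon(true);
            return t;
        });
    private List<T> buffer = new ArrayList<>();
    private ScheduledFuture<?> pendingTimeout;
    // Incremented on every flush so a timer that fires late can tell it is stale.
    private long generation;

    BatchAccumulator(int batchSize, Duration batchTimeout, Consumer<List<T>> flushHandler) {
        this.batchSize = batchSize;
        this.batchTimeout = batchTimeout;
        this.flushHandler = flushHandler;
    }

    // Steps 1-3: called from the single-message callback.
    synchronized void add(T message) {
        buffer.add(message);
        if (buffer.size() == 1) {
            // First message of a new batch: open the time window.
            long batchGeneration = generation;
            pendingTimeout = timer.schedule(
                () -> flushIfStillCurrent(batchGeneration),
                batchTimeout.toMillis(), TimeUnit.MILLISECONDS);
        }
        if (buffer.size() >= batchSize) {
            flush();
        }
    }

    private synchronized void flushIfStillCurrent(long batchGeneration) {
        if (batchGeneration == generation) {
            flush();
        }
    }

    // Step 4: hand the buffered messages to the business logic.
    synchronized void flush() {
        if (pendingTimeout != null) {
            pendingTimeout.cancel(false);
            pendingTimeout = null;
        }
        if (buffer.isEmpty()) {
            return;
        }
        List<T> batch = buffer;
        buffer = new ArrayList<>();
        generation++;
        flushHandler.accept(batch);
    }

    @Override
    public synchronized void close() {
        flush(); // drain the partial batch before shutting down
        timer.shutdownNow();
    }
}
```

It would be fed from the existing callback, for example `.messageHandler((context, message) -> accumulator.add(message))`, with `accumulator.close()` called before the consumer is closed. Note that a timeout-triggered flush runs on the helper's timer thread, not on the client's delivery thread, and that offset tracking is left out entirely.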
Why this alternative is not ideal:
- Complexity: Every user has to reinvent the wheel (concurrency control, timer management, offset tracking).
- Risk: If the buffering logic does not handle graceful shutdowns correctly (flushing pending messages and storing offsets in the right order), messages can be lost or processed twice.
- Performance: A native implementation within the client library (closer to the network layer/Netty buffer) would likely be more efficient than application-level buffering.
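The offset-tracking and shutdown concerns can be made concrete with a minimal sketch, again hypothetical and independent of the client API. It assumes manual offset tracking: `OffsetTrackingBatcher` records the offset of the last message in a batch only after the batch handler returns normally, keeps the batch buffered if the handler throws, and flushes whatever is left on `close()`. The `offsetStore` callback is a stand-in; in the Java client it would presumably be wired to something like `Consumer.store(long)` with a manual tracking strategy.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.LongConsumer;

// Hypothetical helper, not part of the RabbitMQ Stream client.
final class OffsetTrackingBatcher<T> implements AutoCloseable {
    private final int batchSize;
    private final Consumer<List<T>> batchHandler; // business logic, e.g. a bulk insert; may throw
    private final LongConsumer offsetStore;       // stand-in for the client's offset storage
    private final List<T> buffer = new ArrayList<>();
    private long lastBufferedOffset = -1;

    OffsetTrackingBatcher(int batchSize, Consumer<List<T>> batchHandler, LongConsumer offsetStore) {
        this.batchSize = batchSize;
        this.batchHandler = batchHandler;
        this.offsetStore = offsetStore;
    }

    // Called from the single-message callback with the message's stream offset.
    synchronized void add(long offset, T message) {
        buffer.add(message);
        lastBufferedOffset = offset;
        if (buffer.size() >= batchSize) {
            flush();
        }
    }

    synchronized void flush() {
        if (buffer.isEmpty()) {
            return;
        }
        // If the handler throws, nothing below runs: the batch stays buffered,
        // no offset is stored, and the next add() or close() retries it.
        batchHandler.accept(new ArrayList<>(buffer));
        // The batch has been handled downstream; record how far we got.
        // If storing the offset fails, the batch is retried (a duplicate, never a loss).
        offsetStore.accept(lastBufferedOffset);
        buffer.clear();
    }

    // Graceful shutdown: drain the partial batch and store its offset
    // before the underlying consumer is closed.
    @Override
    public synchronized void close() {
        flush();
    }
}
```

Storing the offset only after the downstream write gives at-least-once semantics: a crash between the write and the offset store means the batch is delivered again on restart, so the bulk insert should be idempotent.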
Additional context
Many other streaming clients and frameworks support this out of the box, for example Kafka's poll(), which returns a batch of records as ConsumerRecords, or Spring's batch listeners. Since RabbitMQ Stream is designed for high throughput, adding batch consumption support would significantly enhance its capability in "Big Data" or "Data Pipeline" use cases.