Closed
Labels
invalid (This doesn't seem right)
Description
As per the blog post https://blog.rabbitmq.com/posts/2021/09/rabbitmq-streams-offset-tracking/:
"The application code handles the tracking explicitly with Context#storeOffset(). The Consumer#store(long) method is another possibility to store an offset."
The Consumer#store(long) method is supposed to change the offset. However, it does not store the new offset, and no exception is thrown.
environment
    .consumerBuilder()
    .stream(stream)
    .name("consumer_" + stream)
    .manualTrackingStrategy()
        .checkInterval(Duration.ofSeconds(2))
    .builder()
    .offset(OffsetSpecification.first())
    .messageHandler((context, message) -> {
        long messageContent = Long.parseLong(new String(message.getBodyAsBinary()));
        System.out.println(" message : " + messageContent);
        if (someCondition()) {
            // moving the offset is not working
            context.consumer().store(OffsetSpecification.next().getOffset());
        }
    })
    .build();
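For reference, here is a minimal, self-contained sketch of what explicit offset tracking amounts to: a named consumer records a concrete message offset (typically the offset of the message it has just handled) and later resumes after it. Everything in it is an assumption made for illustration: `OffsetTrackingSketch`, its `store` and `query` methods, and the stream name `my-stream` are hypothetical and are not part of the RabbitMQ stream client. It only models the idea, so it says nothing definitive about what the real `Consumer#store(long)` does with the value passed in the snippet above.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.OptionalLong;

// Hypothetical in-memory model of named-consumer offset tracking.
// It is NOT the RabbitMQ stream client; it only illustrates the idea.
class OffsetTrackingSketch {

    // Last stored offset per "name@stream" key.
    private final Map<String, Long> storedOffsets = new HashMap<>();

    // Record a concrete message offset for a named consumer on a stream.
    void store(String name, String stream, long offset) {
        storedOffsets.put(name + "@" + stream, offset);
    }

    // Look up the last stored offset; empty if none was ever stored.
    OptionalLong query(String name, String stream) {
        Long offset = storedOffsets.get(name + "@" + stream);
        return offset == null ? OptionalLong.empty() : OptionalLong.of(offset);
    }

    public static void main(String[] args) {
        OffsetTrackingSketch tracking = new OffsetTrackingSketch();
        String stream = "my-stream";        // hypothetical stream name
        String name = "consumer_" + stream; // same naming scheme as in the report

        // Simulate handling messages at offsets 0..9 and storing every third one.
        for (long offset = 0; offset < 10; offset++) {
            if (offset % 3 == 0) {
                // The stored value is the handled message's own offset.
                tracking.store(name, stream, offset);
            }
        }

        // A restarted consumer would typically resume right after the stored offset.
        long stored = tracking.query(name, stream).getAsLong();
        System.out.println("stored = " + stored + ", resume from = " + (stored + 1));
    }
}
```

The design point of the model is that the value handed to `store` is an offset of an actual message, which is why the blog post pairs `Consumer#store(long)` with `Context#storeOffset()`. Whether `OffsetSpecification.next().getOffset()` yields such a value is worth checking against the client's documentation.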