Skip to content

Storing streams offsets using Consumer#store(long) is not working (silently)  #77

@aaltihami

Description

@aaltihami

As per the blog https://blog.rabbitmq.com/posts/2021/09/rabbitmq-streams-offset-tracking/

"
The application code handles the tracking explicitly with Context#storeOffset() .
The Consumer#store(long) method is another possibility to store an offset
"

The Consumer#store(long) method is supposed to change the offset, However it doesn't store the new offset and there is no exception thrown.

environment
	.consumerBuilder()
	.stream(stream)
	.name("consumer_" + stream)
	.manualTrackingStrategy()
	.checkInterval(Duration.ofSeconds(2))
	.builder()
	.offset(OffsetSpecification.first())
	.messageHandler((context, message) -> {
		long messageContent = Long.parseLong(new String(message.getBodyAsBinary()));
		System.out.println(" message : " + messageContent);
		if (someCondition()) {
		    // moving the offset is not working 
		    context.consumer().store(OffsetSpecification.next().getOffset());
		}
	})
	.build();

Metadata

Metadata

Assignees

No one assigned

    Labels

    invalidThis doesn't seem right

    Type

    No type

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions