Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,193 @@
/*
* Copyright 2026 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.google.adk.summarizer;

import static com.google.common.base.Preconditions.checkArgument;

import com.google.adk.events.Event;
import com.google.adk.events.EventCompaction;
import com.google.adk.sessions.BaseSessionService;
import com.google.adk.sessions.Session;
import io.reactivex.rxjava3.core.Completable;
import io.reactivex.rxjava3.core.Maybe;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.ListIterator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* This class performs event compaction by retaining the tail of the event stream.
*
* <ul>
* <li>Keeps the {@code retentionSize} most recent events raw.
* <li>Compacts all events that never compacted and older than the retained tail, including the
* most recent compaction event, into a new summary event.
* <li>The new summary event is generated by the {@link BaseEventSummarizer}.
* <li>Appends this new summary event to the end of the event stream.
* </ul>
*
* <p>This compactor produces a rolling summary. Each new compaction event includes the content of
* the previous compaction event (if any) along with new events, effectively superseding all prior
* compactions.
*/
public final class TailRetentionEventCompactor implements EventCompactor {

private static final Logger logger = LoggerFactory.getLogger(TailRetentionEventCompactor.class);

private final BaseEventSummarizer summarizer;
private final int retentionSize;

public TailRetentionEventCompactor(BaseEventSummarizer summarizer, int retentionSize) {
this.summarizer = summarizer;
this.retentionSize = retentionSize;
}

@Override
public Completable compact(Session session, BaseSessionService sessionService) {
checkArgument(summarizer != null, "Missing BaseEventSummarizer for event compaction");
logger.debug("Running tail retention event compaction for session {}", session.id());

return Completable.fromMaybe(
getCompactionEvents(session.events())
.flatMap(summarizer::summarizeEvents)
.flatMapSingle(e -> sessionService.appendEvent(session, e)));
}

/**
* Identifies events to be compacted based on the tail retention strategy.
*
* <p>This method iterates backwards through the event list to find the most recent compaction
* event (if any) and collects all uncompacted events that occurred after the range covered by
* that compaction. It then applies the retention policy, excluding the most recent {@code
* retentionSize} events from being compacted.
*
* <p><b>Basic Scenario:</b>
*
* <ul>
* <li>Events: E1, E2, E3, E4, E5 (Chronological order)
* <li>Retention Size: 2
* <li>Action: Compaction is triggered. The compactor identifies E1, E2, and E3 as eligible
* since E4, E5 need to be retained.
* <li>Result: E1, E2, E3 are compacted into C1.
* <li>Event stream after compaction: E1, E2, E3, E4, E5, C1. (Compaction event is appended in
* the end.)
* </ul>
*
* <p><b>Advanced Scenario (Handling Gaps):</b>
*
* <p>Consider an edge case where retention size is 3. Event E4 appears before the last compaction
* event (C2) and even the one prior (C1), but remains uncompacted and must be included in the
* third compaction (C3).
*
* <ul>
* <li>T=1: E1
* <li>T=2: E2
* <li>T=3: E3
* <li>T=4: E4
* <li>T=5: C1 (Covers T=1). Generated when getCompactionEvents returned <i>List: E1</i>. E2,
* E3, E4 were preserved.
* <li>T=6: E6
* <li>T=7: E7
* <li>T=8: C2 (Covers T=1 to T=3; starts at T=1 because it includes C1). Generated when
* getCompactionEvents returned <i>List: C1, E2, E3</i>. E4, E6, E7 were preserved.
* <li>T=9: E9.
* </ul>
*
* <p><b>Execution with Retention = 3:</b>
*
* <ol>
* <li>The method scans backward: E9, C2, E7, E6, C1, E4...
* <li><b>C2</b> is identified as the most recent compaction event (end timestamp T=3).
* <li><b>E9, E7, E6</b> are collected as they are newer than T=3.
* <li><b>C1</b> is ignored as we only care about the boundary set by the latest compaction.
* <li><b>E4</b> (T=4) is collected because it is newer than T=3.
* <li>Scanning stops at E3 as it is covered by C2 (timestamp <= T=3).
* <li>The initial list of events to summarize: <b>[E9, E7, E6, E4]</b>.
* <li>After appending the compaction event C2, the list becomes: <b>[E9, E7, E6, E4, C2]</b>
* <li>Reversing the list: <b>[C2, E4, E6, E7, E9]</b>.
* <li>Applying retention (keep last 3): <b>E6, E7, E9</b> are removed from the summary list.
* <li><b>Final Output:</b> {@code [C2, E4]}. E4 and the previous summary C2 will be compacted
* together. The new compaction event will cover the range from the start of the included
* compaction event (C2, T=1) to the end of the new events (E4, T=4).
* </ol>
*/
private Maybe<List<Event>> getCompactionEvents(List<Event> events) {
long compactionEndTimestamp = Long.MIN_VALUE;
Event lastCompactionEvent = null;
List<Event> eventsToSummarize = new ArrayList<>();

// Iterate backwards from the end of the window to summarize.
// We use a single loop to:
// 1. Collect all raw events that happened after the latest compaction.
// 2. Identify the latest compaction event to establish the stop condition (boundary).
ListIterator<Event> iter = events.listIterator(events.size());
while (iter.hasPrevious()) {
Event event = iter.previous();

if (!isCompactEvent(event)) {
// Only include events that are strictly after the last compaction range.
if (event.timestamp() > compactionEndTimestamp) {
eventsToSummarize.add(event);
continue;
} else {
// Exit early if we have reached the last event of last compaction range.
break;
}
}

EventCompaction compaction = event.actions().compaction().orElse(null);
// We use the most recent compaction event to define the time boundary. Any subsequent (older)
// compaction events are ignored.
if (lastCompactionEvent == null) {
compactionEndTimestamp = compaction.endTimestamp();
lastCompactionEvent = event;
}
}

// If there are not enough events to summarize, we can return early.
if (eventsToSummarize.size() <= retentionSize) {
return Maybe.empty();
}

// Add the last compaction event to the list of events to summarize.
// This is to ensure that the last compaction event is included in the summary.
if (lastCompactionEvent != null) {
EventCompaction compaction = lastCompactionEvent.actions().compaction().get();
eventsToSummarize.add(
lastCompactionEvent.toBuilder()
.content(compaction.compactedContent())
// Use the start timestamp so that the new summary covers the entire range.
.timestamp(compaction.startTimestamp())
.build());
}

Collections.reverse(eventsToSummarize);

// Apply retention: keep the most recent 'retentionSize' events out of the summary.
// We do this by removing them from the list of events to be summarized.
eventsToSummarize
.subList(eventsToSummarize.size() - retentionSize, eventsToSummarize.size())
.clear();
return Maybe.just(eventsToSummarize);
}

private static boolean isCompactEvent(Event event) {
return event.actions() != null && event.actions().compaction().isPresent();
}
}
Loading