Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
20 commits
Select commit Hold shift + click to select a range
db8d5d7
[ECO-5426][ECO-5439] Initialize live objects foundation with core int…
sacOO7 Jul 2, 2025
e5be483
[ECO-5426][ECO-5439] Implement object utilities and validation framework
sacOO7 Jul 3, 2025
f8fc694
[ECO-5426][ECO-5439] Establish plugin architecture and live objects core
sacOO7 Jul 4, 2025
968c91b
[ECO-5426][ECO-5439] Implement base live object with comprehensive te…
sacOO7 Jul 7, 2025
b8f3353
[ECO-5426][ECO-5439] Design object message structure with validation …
sacOO7 Jul 8, 2025
2f5d65c
[ECO-5426][ECO-5439] Establish comprehensive test infrastructure
sacOO7 Jul 9, 2025
34c45e2
[ECO-5426][ECO-5439] Implement thread-safe objects pool with lifecycl…
sacOO7 Jul 10, 2025
42f96ba
[ECO-5426][ECO-5439] Create objects manager for sync and operation pr…
sacOO7 Jul 11, 2025
36537cc
[ECO-5426][ECO-5439] Implement sync tracking system with state transi…
sacOO7 Jul 14, 2025
1059f78
[ECO-5426][ECO-5439] Design LiveMap entry system with test fixtures
sacOO7 Jul 15, 2025
05e3517
[ECO-5426][ECO-5439] Implement LiveMap core functionality and operations
sacOO7 Jul 16, 2025
7b97349
[ECO-5426][ECO-5439] Create LiveMap manager for advanced operation pr…
sacOO7 Jul 17, 2025
a664865
[ECO-5426][ECO-5439] Implement LiveCounter with atomic operations
sacOO7 Jul 18, 2025
2ba1e32
[ECO-5426][ECO-5439] Create LiveCounter manager for operation handling
sacOO7 Jul 21, 2025
cba94eb
[ECO-5426][ECO-5439] Complete DefaultLiveObjects integration with tes…
sacOO7 Jul 22, 2025
940059c
[ECO-5426][ECO-5439] Implement JSON serialization with Gson integration
sacOO7 Jul 23, 2025
aba0e2c
[ECO-5426][ECO-5439] Create MessagePack serialization system
sacOO7 Jul 24, 2025
56021a6
[ECO-5426][ECO-5439] Implement message serialization and size validation
sacOO7 Jul 25, 2025
b59ea60
[ECO-5426][ECO-5439] Integrate live objects with realtime channel system
sacOO7 Jul 28, 2025
34449c7
[ECO-5426][ECO-5439] Renamed LiveObjectTest to RealtimeObjectsTest
sacOO7 Jul 28, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion lib/src/main/java/io/ably/lib/objects/LiveCounter.java
Original file line number Diff line number Diff line change
Expand Up @@ -58,5 +58,5 @@ public interface LiveCounter {
*/
@NotNull
@Contract(pure = true) // Indicates this method does not modify the state of the object.
Long value();
Double value();
}
12 changes: 12 additions & 0 deletions lib/src/main/java/io/ably/lib/objects/LiveObjectsPlugin.java
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package io.ably.lib.objects;

import io.ably.lib.realtime.ChannelState;
import io.ably.lib.types.ProtocolMessage;
import org.jetbrains.annotations.NotNull;

Expand Down Expand Up @@ -30,6 +31,17 @@ public interface LiveObjectsPlugin {
*/
void handle(@NotNull ProtocolMessage message);

/**
* Handles state changes for a specific channel.
* This method is invoked whenever a channel's state changes, allowing the implementation
* to update the LiveObjects instances accordingly based on the new state and presence of objects.
*
* @param channelName the name of the channel whose state has changed.
* @param state the new state of the channel.
* @param hasObjects flag indicates whether the channel has any associated live objects.
*/
void handleStateChange(@NotNull String channelName, @NotNull ChannelState state, boolean hasObjects);

/**
* Disposes of the LiveObjects instance associated with the specified channel name.
* This method removes the LiveObjects instance for the given channel, releasing any
Expand Down
16 changes: 16 additions & 0 deletions lib/src/main/java/io/ably/lib/realtime/ChannelBase.java
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,15 @@ private void setState(ChannelState newState, ErrorInfo reason, boolean resumed,
this.reason = stateChange.reason;
}

// cover states other than attached, ChannelState.attached already covered in setAttached
if (liveObjectsPlugin != null && newState!= ChannelState.attached) {
try {
liveObjectsPlugin.handleStateChange(name, newState, false);
} catch (Throwable t) {
Log.e(TAG, "Unexpected exception in LiveObjectsPlugin.handle", t);
}
}

if (newState != ChannelState.attaching && newState != ChannelState.suspended) {
this.retryAttempt = 0;
}
Expand Down Expand Up @@ -439,6 +448,13 @@ private void setAttached(ProtocolMessage message) {
}
return;
}
if (liveObjectsPlugin != null) {
try {
liveObjectsPlugin.handleStateChange(name, ChannelState.attached, message.hasFlag(Flag.has_objects));
} catch (Throwable t) {
Log.e(TAG, "Unexpected exception in LiveObjectsPlugin.handle", t);
}
}
if(state == ChannelState.attached) {
Log.v(TAG, String.format(Locale.ROOT, "Server initiated attach for channel %s", name));
if (!message.hasFlag(Flag.resumed)) { // RTL12
Expand Down
172 changes: 161 additions & 11 deletions live-objects/src/main/kotlin/io/ably/lib/objects/DefaultLiveObjects.kt
Original file line number Diff line number Diff line change
@@ -1,12 +1,59 @@
package io.ably.lib.objects

import io.ably.lib.realtime.ChannelState
import io.ably.lib.types.Callback
import io.ably.lib.types.ProtocolMessage
import io.ably.lib.util.Log
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.Channel.Factory.UNLIMITED
import kotlinx.coroutines.flow.MutableSharedFlow

internal class DefaultLiveObjects(private val channelName: String, private val adapter: LiveObjectsAdapter): LiveObjects {
private val tag = DefaultLiveObjects::class.simpleName
/**
* @spec RTO2 - enum representing objects state
*/
internal enum class ObjectsState {
INITIALIZED,
SYNCING,
SYNCED
}

/**
* Default implementation of LiveObjects interface.
* Provides the core functionality for managing live objects on a channel.
*/
internal class DefaultLiveObjects(internal val channelName: String, internal val adapter: LiveObjectsAdapter): LiveObjects {
private val tag = "DefaultLiveObjects"
/**
* @spec RTO3 - Objects pool storing all live objects by object ID
*/
internal val objectsPool = ObjectsPool(this)

internal var state = ObjectsState.INITIALIZED

/**
* @spec RTO4 - Used for handling object messages and object sync messages
*/
private val objectsManager = ObjectsManager(this)

/**
* Coroutine scope for running sequential operations on a single thread, used to avoid concurrency issues.
*/
private val sequentialScope =
CoroutineScope(Dispatchers.Default.limitedParallelism(1) + CoroutineName(channelName) + SupervisorJob())

/**
* Event bus for handling incoming object messages sequentially.
*/
private val objectsEventBus = MutableSharedFlow<ProtocolMessage>(extraBufferCapacity = UNLIMITED)
private val incomingObjectsHandler: Job

init {
incomingObjectsHandler = initializeHandlerForIncomingObjectMessages()
}

/**
* @spec RTO1 - Returns the root LiveMap object with proper validation and sync waiting
*/
override fun getRoot(): LiveMap {
TODO("Not yet implemented")
}
Expand Down Expand Up @@ -47,18 +94,121 @@ internal class DefaultLiveObjects(private val channelName: String, private val a
TODO("Not yet implemented")
}

fun handle(msg: ProtocolMessage) {
// RTL15b
msg.channelSerial?.let {
if (msg.action === ProtocolMessage.Action.`object`) {
Log.v(tag, "Setting channel serial for channelName: $channelName, value: ${msg.channelSerial}")
adapter.setChannelSerial(channelName, msg.channelSerial)
/**
* Handles a ProtocolMessage containing proto action as `object` or `object_sync`.
* @spec RTL1 - Processes incoming object messages and object sync messages
*/
internal fun handle(protocolMessage: ProtocolMessage) {
// RTL15b - Set channel serial for OBJECT messages
adapter.setChannelSerial(channelName, protocolMessage)

if (protocolMessage.state == null || protocolMessage.state.isEmpty()) {
Log.w(tag, "Received ProtocolMessage with null or empty objects, ignoring")
return
}

objectsEventBus.tryEmit(protocolMessage)
}

/**
* Initializes the handler for incoming object messages and object sync messages.
* Processes the messages sequentially to ensure thread safety and correct order of operations.
*
* @spec OM2 - Populates missing fields from parent protocol message
*/
private fun initializeHandlerForIncomingObjectMessages(): Job {
return sequentialScope.launch {
objectsEventBus.collect { protocolMessage ->
// OM2 - Populate missing fields from parent
val objects = protocolMessage.state.filterIsInstance<ObjectMessage>()
.mapIndexed { index, objMsg ->
objMsg.copy(
connectionId = objMsg.connectionId ?: protocolMessage.connectionId, // OM2c
timestamp = objMsg.timestamp ?: protocolMessage.timestamp, // OM2e
id = objMsg.id ?: (protocolMessage.id + ':' + index) // OM2a
)
}

try {
when (protocolMessage.action) {
ProtocolMessage.Action.`object` -> objectsManager.handleObjectMessages(objects)
ProtocolMessage.Action.object_sync -> objectsManager.handleObjectSyncMessages(
objects,
protocolMessage.channelSerial
)
else -> Log.w(tag, "Ignoring protocol message with unhandled action: ${protocolMessage.action}")
}
} catch (exception: Exception) {
// Skip current message if an error occurs, don't rethrow to avoid crashing the collector
Log.e(tag, "Error handling objects message with protocolMsg id ${protocolMessage.id}", exception)
}
}
}
}

fun dispose() {
// Dispose of any resources associated with this LiveObjects instance
// For example, close any open connections or clean up references
internal fun handleStateChange(state: ChannelState, hasObjects: Boolean) {
sequentialScope.launch {
when (state) {
ChannelState.attached -> {
Log.v(tag, "Objects.onAttached() channel=$channelName, hasObjects=$hasObjects")

// RTO4a
val fromInitializedState = this@DefaultLiveObjects.state == ObjectsState.INITIALIZED
if (hasObjects || fromInitializedState) {
// should always start a new sync sequence if we're in the initialized state, no matter the HAS_OBJECTS flag value.
// this guarantees we emit both "syncing" -> "synced" events in that order.
objectsManager.startNewSync(null)
}

// RTO4b
if (!hasObjects) {
// if no HAS_OBJECTS flag received on attach, we can end sync sequence immediately and treat it as no objects on a channel.
// reset the objects pool to its initial state, and emit update events so subscribers to root object get notified about changes.
objectsPool.resetToInitialPool(true) // RTO4b1, RTO4b2
objectsManager.clearSyncObjectsDataPool() // RTO4b3
objectsManager.clearBufferedObjectOperations() // RTO4b5
// defer the state change event until the next tick if we started a new sequence just now due to being in initialized state.
// this allows any event listeners to process the start of the new sequence event that was emitted earlier during this event loop.
objectsManager.endSync(fromInitializedState) // RTO4b4
}
}
ChannelState.detached,
ChannelState.failed -> {
// do not emit data update events as the actual current state of Objects data is unknown when we're in these channel states
objectsPool.clearObjectsData(false)
objectsManager.clearSyncObjectsDataPool()
}

else -> {
// No action needed for other states
}
}
}
}

/**
* Changes the state and emits events.
*
* @spec RTO2 - Emits state change events for syncing and synced states
*/
internal fun stateChange(newState: ObjectsState, deferEvent: Boolean) {
if (state == newState) {
return
}

state = newState
Log.v(tag, "Objects state changed to: $newState")

// TODO: Emit state change events
}

// Dispose of any resources associated with this LiveObjects instance
fun dispose(reason: String) {
val cancellationError = CancellationException("Objects disposed for channel $channelName, reason: $reason")
incomingObjectsHandler.cancel(cancellationError) // objectsEventBus automatically garbage collected when collector is cancelled
objectsPool.dispose()
objectsManager.dispose()
// Don't cancel sequentialScope (needed in public methods), just cancel ongoing coroutines
sequentialScope.coroutineContext.cancelChildren(cancellationError)
}
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package io.ably.lib.objects

import io.ably.lib.realtime.ChannelState
import io.ably.lib.types.ProtocolMessage
import java.util.concurrent.ConcurrentHashMap

Expand All @@ -16,14 +17,18 @@ public class DefaultLiveObjectsPlugin(private val adapter: LiveObjectsAdapter) :
liveObjects[channelName]?.handle(msg)
}

override fun handleStateChange(channelName: String, state: ChannelState, hasObjects: Boolean) {
liveObjects[channelName]?.handleStateChange(state, hasObjects)
}

override fun dispose(channelName: String) {
liveObjects[channelName]?.dispose()
liveObjects[channelName]?.dispose("Channel has ben released using channels.release()")
liveObjects.remove(channelName)
}

override fun dispose() {
liveObjects.values.forEach {
it.dispose()
it.dispose("AblyClient has been closed using client.close()")
}
liveObjects.clear()
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,10 @@ internal enum class ErrorCode(public val code: Int) {
BadRequest(40_000),
InternalError(50_000),
MaxMessageSizeExceeded(40_009),
InvalidObject(92_000),
// LiveMap specific error codes
MapKeyShouldBeString(40_003),
MapValueDataTypeUnsupported(40_013),
}

internal enum class HttpStatusCode(public val code: Int) {
Expand Down
7 changes: 7 additions & 0 deletions live-objects/src/main/kotlin/io/ably/lib/objects/Helpers.kt
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,13 @@ internal fun LiveObjectsAdapter.ensureMessageSizeWithinLimit(objectMessages: Arr
}
}

internal fun LiveObjectsAdapter.setChannelSerial(channelName: String, protocolMessage: ProtocolMessage) {
if (protocolMessage.action != ProtocolMessage.Action.`object`) return
val channelSerial = protocolMessage.channelSerial
if (channelSerial.isNullOrEmpty()) return
setChannelSerial(channelName, channelSerial)
}

internal class Binary(val data: ByteArray) {
override fun equals(other: Any?): Boolean {
if (this === other) return true
Expand Down
62 changes: 62 additions & 0 deletions live-objects/src/main/kotlin/io/ably/lib/objects/ObjectId.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
package io.ably.lib.objects

import io.ably.lib.objects.type.ObjectType

internal class ObjectId private constructor(
internal val type: ObjectType,
private val hash: String,
private val timestampMs: Long
) {
/**
* Converts ObjectId to string representation.
*/
override fun toString(): String {
return "${type.value}:$hash@$timestampMs"
}

companion object {
/**
* Creates ObjectId instance from hashed object id string.
*/
fun fromString(objectId: String): ObjectId {
if (objectId.isEmpty()) {
throw objectError("Invalid object id: $objectId")
}

// Parse format: type:hash@msTimestamp
val parts = objectId.split(':')
if (parts.size != 2) {
throw objectError("Invalid object id: $objectId")
}

val (typeStr, rest) = parts

val type = when (typeStr) {
"map" -> ObjectType.Map
"counter" -> ObjectType.Counter
else -> throw objectError("Invalid object type in object id: $objectId")
}

val hashAndTimestamp = rest.split('@')
if (hashAndTimestamp.size != 2) {
throw objectError("Invalid object id: $objectId")
}

val hash = hashAndTimestamp[0]

if (hash.isEmpty()) {
throw objectError("Invalid object id: $objectId")
}

val msTimestampStr = hashAndTimestamp[1]

val msTimestamp = try {
msTimestampStr.toLong()
} catch (e: NumberFormatException) {
throw objectError("Invalid object id: $objectId", e)
}

return ObjectId(type, hash, msTimestamp)
}
}
}
Loading
Loading