diff --git a/README.md b/README.md index 1b825e5..08fbb89 100644 --- a/README.md +++ b/README.md @@ -98,6 +98,7 @@ Stream Android Core uses annotations to distinguish between stable public APIs a - [Batcher](#batcher) - [Debouncer](#debouncer) - [Throttler](#throttler) + - [Event Aggregator](#event-aggregator) - [Threading Utilities](#threading-utilities) - [Token Management](#token-management) - [WebSocket Connections](#websocket-connections) @@ -361,6 +362,24 @@ enum class Retention { - `AUTO_REMOVE`: UI components, fragments, activities (automatic cleanup) - `KEEP_UNTIL_CANCELLED`: Long-lived services, singletons (explicit lifecycle) +#### Handling Aggregated Events + +When the [Event Aggregator](#event-aggregator) is active, listeners registered via `StreamSubscriptionManager` may receive `StreamAggregatedEvent` instances during traffic spikes in addition to individual events. Your listener **must** handle both: + +```kotlin +subscriptionManager.forEach { listener -> + when (event) { + is StreamAggregatedEvent<*> -> { + // Batch of events in arrival order — process sequentially + event.events.forEach { listener.onEvent(it) } + } + else -> listener.onEvent(event) + } +} +``` + +See the [Event Aggregator](#event-aggregator) section for full details on ordering guarantees and atomic state updates. + --- ### Serial Processing Queue @@ -636,6 +655,87 @@ throttler.reset() --- +### Event Aggregator + +Adaptive event aggregator that switches between individual and batched event delivery based on traffic volume. + +#### How It Works + +During normal traffic, WebSocket events are deserialized and dispatched one at a time. During spikes, the aggregator collects events within a time window and delivers them as a single `StreamAggregatedEvent` — a flat list preserving arrival order. + +```kotlin +import io.getstream.android.core.api.processing.StreamEventAggregator +import io.getstream.android.core.api.processing.StreamEventAggregationPolicy +import io.getstream.android.core.api.processing.StreamAggregatedEvent + +val aggregator = StreamEventAggregator( + scope = scope, + policy = StreamEventAggregationPolicy.from( + typeExtractor = { raw -> extractType(raw) }, + deserializer = { raw -> Result.success(deserialize(raw)) }, + aggregationThreshold = 10, // Aggregate when >= 10 events accumulated + maxWindowMs = 500, // Max collection window before flushing + ), +) + +aggregator.onEvent { event -> + when (event) { + is StreamAggregatedEvent<*> -> { + // Spike — process all events sequentially in one atomic state update + event.events.forEach { singleEvent -> + handleEvent(singleEvent) + } + } + is MyEvent -> { + // Normal traffic — single event + handleEvent(event) + } + } +} + +aggregator.start() +``` + +#### Event Ordering + +`StreamAggregatedEvent.events` is a **flat `List` in arrival order** — not grouped by type. This is critical for correctness: events like "reaction added" must be processed before "reaction removed" for the same entity. Grouping by type key would lose this inter-type ordering. + +Product SDKs should process the list sequentially and apply all updates in one atomic state mutation, so UI recomposes once per batch instead of once per event. + +#### Handling Aggregated Events in Subscriptions + +When subscribing to `StreamClient` events via `StreamClientListener`, your `onEvent` handler may receive either an individual event or a `StreamAggregatedEvent`. Product SDKs must handle both: + +```kotlin +val listener = object : StreamClientListener { + override fun onEvent(event: Any) { + when (event) { + is StreamAggregatedEvent<*> -> { + // Apply all events in one state transaction + val events = event.events.filterIsInstance() + stateStore.atomicUpdate { state -> + events.fold(state) { acc, e -> applyEvent(acc, e) } + } + } + is MyProductEvent -> { + // Single event — apply directly + stateStore.update { state -> applyEvent(state, event) } + } + } + } +} +``` + +Failing to handle `StreamAggregatedEvent` means aggregated events are silently ignored during traffic spikes. + +#### Use Cases + +- WebSocket event delivery (automatic — built into `StreamSocketSession`) +- High-frequency event streams (chat rooms with many participants) +- Any scenario where burst traffic causes excessive UI recomposition + +--- + ### Threading Utilities Safe cross-thread execution with timeout protection.