This file provides guidance to Claude Code (claude.ai/code) when working with code in this repository.
Stream Android Core is an internal foundational library that powers all of Stream's Android SDKs (Chat, Video, Feeds). It provides shared infrastructure for real-time connectivity, authentication, state management, and reliability.
Key characteristics:
- Not for public consumption - used only by Stream product SDKs
- Uses explicit API mode (kotlin { explicitApi() })
- Three visibility levels: @StreamPublishedApi (stable, SDK-exposed), @StreamInternalApi (may change), @StreamDelicateApi (advanced/dangerous)
- Result-based error handling throughout (Result<T>)
- Thread-safe APIs with coroutine-based concurrency
- Moshi for JSON serialization with KSP code generation
# Run all unit tests for core library
./gradlew :stream-android-core:test
# Run tests with coverage report
./gradlew koverHtmlReportCoverage
# Coverage report: stream-android-core/build/reports/kover/htmlCoverage/index.html
# Run a specific test class
./gradlew :stream-android-core:test --tests "io.getstream.android.core.internal.processing.StreamRetryProcessorImplTest"
# Run a specific test method
./gradlew :stream-android-core:test --tests "io.getstream.android.core.internal.processing.StreamRetryProcessorImplTest.testExponentialBackoff"
# Run instrumented tests on connected device
./gradlew :stream-android-core:connectedDebugAndroidTest
# Build the library
./gradlew :stream-android-core:build
# Build all modules
./gradlew build
# Assemble release AAR
./gradlew :stream-android-core:assembleRelease
# Output: stream-android-core/build/outputs/aar/stream-android-core-release.aar
# Clean build artifacts
./gradlew clean
# Run detekt (static analysis) - auto-corrects issues
./gradlew detekt
# Run spotless (code formatting with ktfmt)
./gradlew spotlessApply
# Run all checks (tests + lint + detekt)
./gradlew check
# Run lint
./gradlew :stream-android-core:lint
# Print all artifacts that will be published
./gradlew printAllArtifacts
# Publish to Maven Central (requires credentials)
./gradlew publish
StreamClient is the main interface for establishing connections to Stream services.
Factory function location: stream-android-core/src/main/java/io/getstream/android/core/api/StreamClient.kt
Implementation: stream-android-core/src/main/java/io/getstream/android/core/internal/StreamClientImpl.kt
Key operations:
- suspend fun connect(data: ConnectUserData): Result<StreamConnectionState.Connected> - Opens socket, authenticates, starts monitoring
- suspend fun disconnect(): Result<Unit> - Closes socket, stops monitoring, cleans up
- val connectionState: StateFlow<StreamConnectionState> - Observable connection state
- Uses single-flight processor to prevent concurrent connect/disconnect
Location: stream-android-core/src/main/java/io/getstream/android/core/api/model/connection/StreamConnectionState.kt
States:
sealed class StreamConnectionState {
    // Initial state, no connection attempt
    object Idle : StreamConnectionState()

    sealed class Connecting : StreamConnectionState() {
        // Socket opening
        data class Opening(val userId: String) : Connecting()
        // Sending auth request
        data class Authenticating(val userId: String) : Connecting()
    }

    data class Connected(
        val user: StreamConnectedUser,
        val connectionId: String
    ) : StreamConnectionState()

    data class Disconnected(val cause: Throwable? = null) : StreamConnectionState()
}
State Transitions:
- Idle → Connecting.Opening
  - Trigger: User calls streamClient.connect(data)
  - Location: StreamClientImpl.kt:75-157, StreamSocketSession.kt:311
  - Action: Opens WebSocket with HTTP upgrade request
- Opening → Authenticating
  - Trigger: Socket receives HTTP 101 response
  - Location: StreamSocketSession.kt:324-366
  - Action: Sends StreamWSAuthMessageRequest with JWT token and user details
- Authenticating → Connected
  - Trigger: Receives StreamClientConnectedEvent (type: "connection.ok")
  - Location: StreamSocketSession.kt:376-398
  - Action: Starts health monitor, emits Connected state
- Connected → Disconnected
  - Triggers:
    - Network lost: StreamNetworkMonitor detects connectivity loss
    - App backgrounded: StreamLifecycleMonitor detects background state
    - Health check timeout: No events received within liveness threshold (60s)
    - Explicit disconnect: User calls streamClient.disconnect()
    - Socket error: WebSocket failure/close callback
  - Location: StreamSocketSession.kt:100-119, StreamConnectionRecoveryEvaluatorImpl.kt:68-70
- Any → Disconnected
  - Trigger: Socket failure callback (onFailure, onClosed)
  - Location: StreamSocketSession.kt:100-119
Recovery Logic:
- StreamConnectionRecoveryEvaluatorImpl.kt:44-99 evaluates whether to auto-reconnect
- Reconnects if: previously connected AND (network became available OR app returned to foreground with network)
- Disconnects if: connected/connecting AND (network unavailable OR app backgrounded)
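The reconnect and disconnect rules above can be read as a pure decision function. The following is a minimal sketch, not the code in StreamConnectionRecoveryEvaluatorImpl.kt: the `Snapshot` fields and the `Recovery` enum are hypothetical, and the "became available" transitions are collapsed into a check of the current state.

```kotlin
// Hypothetical snapshot type; the fields of the real StateSnapshot are not shown in this document.
data class Snapshot(
    val wasConnected: Boolean,     // a connection was established earlier
    val isActive: Boolean,         // currently connected or connecting
    val networkAvailable: Boolean,
    val inForeground: Boolean,
)

// Hypothetical result type standing in for Recovery.Connect / Recovery.Disconnect.
enum class Recovery { CONNECT, DISCONNECT }

fun evaluate(s: Snapshot): Recovery? = when {
    // An active connection is torn down when the network is gone or the app is backgrounded.
    s.isActive && (!s.networkAvailable || !s.inForeground) -> Recovery.DISCONNECT
    // A previous connection is restored once network and foreground are both available.
    !s.isActive && s.wasConnected && s.networkAvailable && s.inForeground -> Recovery.CONNECT
    // Anything else needs no action.
    else -> null
}

fun main() {
    val backgrounded = Snapshot(wasConnected = true, isActive = true, networkAvailable = true, inForeground = false)
    val resumed = Snapshot(wasConnected = true, isActive = false, networkAvailable = true, inForeground = true)
    val neverConnected = Snapshot(wasConnected = false, isActive = false, networkAvailable = true, inForeground = true)
    println(evaluate(backgrounded))   // DISCONNECT
    println(evaluate(resumed))        // CONNECT
    println(evaluate(neverConnected)) // null
}
```

Keeping the decision free of side effects makes each rule easy to cover with a plain unit test.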
StreamClient (Main Interface)
├── StreamSocketSession (WebSocket coordinator)
│ ├── StreamWebSocket (OkHttp WebSocket wrapper)
│ ├── StreamHealthMonitor (Heartbeat: 25s interval, 60s timeout)
│ └── StreamBatcher (10 items, 100ms initial, 1s max delay)
├── StreamTokenManager (Auth token lifecycle)
│ ├── StreamTokenProvider (User-implemented token fetcher)
│ └── StreamSingleFlightProcessor (Token refresh deduplication)
├── StreamNetworkAndLifeCycleMonitor (Combined state)
│ ├── StreamNetworkMonitor (ConnectivityManager callbacks)
│ └── StreamLifecycleMonitor (ProcessLifecycleOwner observer)
├── StreamConnectionRecoveryEvaluator (Reconnect heuristics)
├── StreamWatcher<T> (Watch registry & rewatch coordinator)
│ ├── StateFlow<StreamConnectionState> (Observes connection state)
│ └── StreamSubscriptionManager<StreamRewatchListener<T>> (Rewatch listener registry)
└── StreamSubscriptionManager<StreamClientListener> (Event distribution)
Purpose: Manages a registry of watched resources (channels/conversations) and automatically triggers re-watching when the WebSocket connection state changes.
Location:
- Interface: stream-android-core/src/main/java/io/getstream/android/core/api/watcher/StreamWatcher.kt
- Implementation: stream-android-core/src/main/java/io/getstream/android/core/internal/watcher/StreamWatcherImpl.kt
- Listener: stream-android-core/src/main/java/io/getstream/android/core/api/watcher/StreamRewatchListener.kt
Key Concept: When the WebSocket reconnects (network recovery, app resume), all active watches must be re-established on the server. StreamWatcher<T> maintains which identifiers of type T (channel IDs, conversation IDs, etc.) are currently watched and notifies listeners on every Connected state transition. The generic type parameter allows flexibility - use String for channel IDs, custom data classes, or any other identifier type.
Core Operations:
// Add an item to the watch registry
fun watch(item: T): Result<T>
// Remove an item from the watch registry
fun stopWatching(item: T): Result<T>
// Clear all watched entries
fun clear(): Result<Unit>
// Subscribe to rewatch notifications
fun subscribe(
listener: StreamRewatchListener<T>,
options: StreamSubscriptionManager.Options
): Result<StreamSubscription>
// Lifecycle management (StreamStartableComponent)
fun start(): Result<Unit>
fun stop(): Result<Unit>
Factory Function:
fun <T> StreamWatcher(
scope: CoroutineScope,
logger: StreamLogger,
connectionState: StateFlow<StreamConnectionState>
): StreamWatcher<T>
The factory creates the watcher with an internal StreamSubscriptionManager<StreamRewatchListener<T>> automatically, so product SDKs don't need to manage this.
Usage Flow:
- Create watcher with connection state flow: val watcher = StreamWatcher<String>(scope, logger, streamClient.connectionState)
- Product SDK registers a rewatch listener: watcher.subscribe(StreamRewatchListener { ids, connectionId -> ... })
- Call watcher.start() to begin monitoring connection state changes
- Product SDK watches channels: watcher.watch("messaging:general")
- On StreamConnectionState.Connected event, watcher invokes all listeners with complete identifier list AND the current connectionId
- Product SDK re-establishes server-side watches for each identifier using the provided connectionId
Complete Example:
// Create watcher for channel IDs
val watcher = StreamWatcher<String>(
scope = CoroutineScope(SupervisorJob() + Dispatchers.Default),
logger = logger,
connectionState = streamClient.connectionState
)
// Register rewatch listener - onRewatch is suspend!
watcher.subscribe(
listener = StreamRewatchListener { channelIds, connectionId ->
// Directly call suspend API functions - no need to launch!
channelIds.forEach { channelId ->
channelApi.watch(channelId, connectionId) // suspend function
}
},
options = StreamSubscriptionManager.Options()
).getOrThrow()
// Start monitoring
watcher.start().getOrThrow()
// Watch channels as users view them
watcher.watch("messaging:general")
watcher.watch("messaging:random")
// When done
watcher.stop()
Implementation Details:
- Generic type parameter: Allows watching any type T - common usage is String for channel IDs, but can be custom data classes
- StateFlow observation: Watcher observes StateFlow<StreamConnectionState> directly, no intermediate subscription manager needed
  - Safer: Only receives connection state, no access to full StreamClient APIs
  - Simpler: Factory creates internal subscription manager automatically
  - Testable: Easy to test with MutableStateFlow
  - Standalone: Truly independent component with minimal dependencies
- Suspend callback: StreamRewatchListener.onRewatch is a suspend function
  - Product SDKs can directly call suspend API functions (e.g., channelApi.watch())
  - No need to launch coroutines manually in the callback
  - Sequential execution: Callbacks are called one at a time in order
  - Clean API: Natural flow for making API calls during reconnection
- Thread-safe: Uses ConcurrentHashMap<T, Unit> for the registry (line 55 in StreamWatcherImpl.kt)
- Thread-safe lifecycle: start() and stop() use synchronized blocks to prevent race conditions when called concurrently (lines 62-73, 104-110)
  - Prevents multiple collection jobs from being created
  - Truly idempotent: checks collectionJob?.isActive == true instead of just null (line 63)
  - Safe to call start() multiple times - returns success immediately if already started
  - After stop(), calling start() creates a new collection job
  - Concurrent start() and stop() calls are handled safely
- Sequential execution: StateFlow collector processes states sequentially; within each state, listeners are invoked sequentially (lines 75-102)
- Error handling: Exceptions from rewatch callbacks are caught with runCatchingCancellable and logged; one failing callback doesn't prevent others from executing (lines 91-96)
- Idempotent: Multiple watch() calls with the same identifier only maintain one entry
- Only triggers on Connected state when registry is non-empty (line 77)
- Connection ID extracted from Connected state and passed to all listeners (line 79)
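Two of the properties above, idempotent watch() and isolation of failing listeners, can be illustrated with a small stand-in. This is a simplified sketch under stated assumptions, not StreamWatcherImpl: `WatchRegistry` is a hypothetical class, its listeners are plain (non-suspend) lambdas, and the listener list is not thread-safe.

```kotlin
import java.util.concurrent.ConcurrentHashMap

// Hypothetical, simplified stand-in for the registry inside the watcher.
class WatchRegistry<T : Any> {
    private val watched = ConcurrentHashMap<T, Unit>()
    private val listeners = mutableListOf<(List<T>, String) -> Unit>()

    val size: Int get() = watched.size

    fun watch(item: T) { watched[item] = Unit }        // same key maps to the same single entry
    fun stopWatching(item: T) { watched.remove(item) }
    fun subscribe(listener: (List<T>, String) -> Unit) { listeners += listener }

    // Invoked when a Connected state arrives; returns how many listeners failed.
    fun onConnected(connectionId: String): Int {
        if (watched.isEmpty()) return 0                // nothing to rewatch
        val items = watched.keys.toList()
        var failures = 0
        for (listener in listeners) {
            // A failing listener is counted and skipped; the remaining listeners still run.
            runCatching { listener(items, connectionId) }.onFailure { failures++ }
        }
        return failures
    }
}

fun main() {
    val registry = WatchRegistry<String>()
    registry.watch("messaging:general")
    registry.watch("messaging:general") // duplicate call, still one entry
    registry.subscribe { _, _ -> error("listener failed") }
    registry.subscribe { ids, connectionId -> println("rewatch $ids on $connectionId") }
    val failures = registry.onConnected("conn-1")
    println("entries: ${registry.size}, failed listeners: $failures")
}
```

The second listener still runs even though the first one throws, which mirrors the error-handling guarantee described in the list.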
Test Coverage:
- Location: stream-android-core/src/test/java/io/getstream/android/core/internal/watcher/StreamWatcherImplTest.kt
- 33 comprehensive test cases covering watch operations, lifecycle, state changes, error handling, concurrency (including concurrent start/stop), and connectionId verification
- Tests use MutableStateFlow to directly emit connection state changes, verifying watcher responds correctly
- Concurrency tests verify thread-safety of concurrent start() and stop() calls
- Idempotency test verifies start() after stop() creates a new active collection job
- 100% instruction/branch/line coverage (verified via Kover)
Location: stream-android-core/src/main/java/io/getstream/android/core/internal/processing/StreamBatcherImpl.kt:160-173
batchSize: Int = 10 // Max items before forced flush
initialDelayMs: Long = 100 // Initial debounce window
maxDelayMs: Long = 1_000 // Maximum debounce window
autoStart: Boolean = true // Auto-start on first item
channelCapacity: Int = UNLIMITED // Buffer capacity
Adaptive window: Doubles delay if batch was full, resets to initial if not (lines 120-125)
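To make the adaptive window concrete, here is a small worked sketch of how the delay would evolve under the defaults listed above. `nextWindowMs` is a hypothetical helper written for illustration; it is not a function of StreamBatcherImpl.

```kotlin
// Next debounce window: double after a full batch (capped at the maximum), otherwise reset.
fun nextWindowMs(
    currentMs: Long,
    batchWasFull: Boolean,
    initialDelayMs: Long = 100,
    maxDelayMs: Long = 1_000,
): Long =
    if (batchWasFull) (currentMs * 2).coerceAtMost(maxDelayMs) else initialDelayMs

fun main() {
    var window = 100L
    // Five full batches in a row, then one partial batch.
    for (full in listOf(true, true, true, true, true, false)) {
        window = nextWindowMs(window, full)
        println(window) // 200, 400, 800, 1000, 1000, 100
    }
}
```

Under sustained load the window settles at maxDelayMs, and a single partial batch drops it back to initialDelayMs.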
Location: stream-android-core/src/main/java/io/getstream/android/core/internal/socket/monitor/StreamHealthMonitorImpl.kt:39-49
interval: Long = 25_000 // Send health check every 25 seconds
livenessThreshold: Long = 60_000 // Mark unhealthy after 60s without ack
Health check event: StreamClientConnectedEvent echoed back to server (lines 212-227 in StreamSocketSession.kt)
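The relationship between the two values can be sketched as a liveness check. This is an illustrative sketch only: `HealthConfig` and `isAlive` are hypothetical names, and whether the real monitor treats exactly 60s as healthy or unhealthy is an assumption here.

```kotlin
// Hypothetical config holder using the defaults listed above.
data class HealthConfig(
    val intervalMs: Long = 25_000,
    val livenessThresholdMs: Long = 60_000,
)

// The connection counts as alive while the last event is younger than the threshold.
// Assumption: the boundary (exactly 60s) is treated as unhealthy.
fun isAlive(lastEventAtMs: Long, nowMs: Long, config: HealthConfig = HealthConfig()): Boolean =
    nowMs - lastEventAtMs < config.livenessThresholdMs

fun main() {
    val config = HealthConfig()
    // With a 25s interval and a 60s threshold, two intervals fit inside the liveness window.
    println(config.livenessThresholdMs / config.intervalMs) // 2
    println(isAlive(lastEventAtMs = 0, nowMs = 59_999))     // true
    println(isAlive(lastEventAtMs = 0, nowMs = 60_000))     // false
}
```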
Location: stream-android-core/src/main/java/io/getstream/android/core/api/model/retry/StreamRetryPolicy.kt
Exponential backoff (lines 71-92):
minRetries: Int = 1
maxRetries: Int = 5
backoffStepMillis: Long = 250 // 0ms → 250ms → 500ms → 1000ms...
maxBackoffMillis: Long = 15_000 // Cap at 15 seconds
initialDelayMillis: Long = 0
giveUpFunction: (attempt, error) -> Boolean = { _, _ -> false }
Linear backoff (lines 111-130):
minRetries: Int = 1
maxRetries: Int = 5
initialDelayMillis: Long = 1_000 // 1s → 2s → 3s → 4s...
maxDelayMillis: Long = 30_000 // Cap at 30 seconds
Location: stream-android-core/src/main/java/io/getstream/android/core/api/model/config/StreamSocketConfig.kt:31-98
url: String // WebSocket URL (wss://...)
apiKey: StreamApiKey // API authentication key
authType: String // "jwt" or "anonymous"
clientInfoHeader: StreamHttpClientInfoHeader // X-Stream-Client header
OkHttp interceptors (automatically added if automaticInterceptors = true):
- StreamApiKeyInterceptor - Adds ?api_key=<key> query parameter
- StreamAuthInterceptor - Adds Authorization header, handles token refresh
- StreamClientInfoInterceptor - Adds X-Stream-Client header
- StreamConnectionIdInterceptor - Adds ?connection_id=<id> if available
- StreamEndpointErrorInterceptor - Parses error responses into exceptions
Step-by-step (StreamSocketSession.connect() lines 182-462):
- Register listeners (lines 314-454)
  - Permanent event listener for ongoing messages
  - Temporary handshake listener for auth response
- Emit Opening state (line 311)
  notifyState(StreamConnectionState.Connecting.Opening(userId))
- Open WebSocket (line 457)
  - Factory creates HTTP upgrade request: StreamWebSocketFactoryImpl.kt:36-54
  - Headers included:
    - ?api_key=<key>
    - stream-auth-type: jwt
    - X-Stream-Client: stream-android-core/<version>/<platform>
- Receive HTTP 101 response (lines 324-366)
  - Emit Authenticating state
  - Build StreamWSAuthMessageRequest:
    StreamWSAuthMessageRequest(
        products = ["chat", "messaging"], // Product list
        token = "<JWT>", // From TokenManager
        userDetails = StreamConnectUserDetailsRequest(
            id, name, image, language, invisible, custom
        )
    )
  - Serialize and send via WebSocket (lines 345-356)
- Receive authentication response (lines 369-415)
  - Success: StreamClientConnectedEvent (type: "connection.ok")
    - Extract connectionId and me (user data)
    - Store event for health checks (line 397)
    - Start health monitor (line 288)
    - Emit Connected state (line 289)
    - Resume coroutine with success (line 291)
  - Failure: StreamClientConnectionErrorEvent (type: "connection.error")
    - Contains StreamEndpointErrorData with error details
    - Emit Disconnected state
    - Resume coroutine with failure (line 405)
- Token refresh on auth failure (lines 180-201 in StreamClientImpl.kt)
  val response = socketSession.connect(data)
      .onTokenError { error, code ->
          tokenManager.invalidate()
          val refreshed = tokenManager.refresh().getOrThrow()
          socketSession.connect(data.copy(token = refreshed.rawValue))
      }
Token error codes:
- 40 - Token signature invalid
- 41 - Token expired
- 42 - Token revoked
These codes come from StreamEndpointErrorData.code, not HTTP status codes. Check location: StreamAuthInterceptor.kt:119
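A check based on these codes might look like the sketch below. `EndpointError` is a hypothetical, trimmed-down stand-in for StreamEndpointErrorData, and `isTokenError` is an illustrative helper rather than the library's isTokenInvalidErrorCode.

```kotlin
// Hypothetical stand-in carrying only the two fields relevant here.
data class EndpointError(val code: Int, val statusCode: Int? = null)

private val TOKEN_ERROR_CODES = setOf(40, 41, 42)

// Decide from the API error code only; the HTTP status is deliberately ignored.
fun isTokenError(error: EndpointError?): Boolean =
    error != null && error.code in TOKEN_ERROR_CODES

fun main() {
    println(isTokenError(EndpointError(code = 41, statusCode = 401))) // true
    println(isTokenError(EndpointError(code = 4, statusCode = 401)))  // false: HTTP 401 alone is not a token error
    println(isTokenError(null))                                       // false
}
```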
StreamEndpointErrorData (API errors from backend):
code: Int // Error code (40, 41, 42 for token errors)
duration: String? // Request processing time
message: String? // Human-readable message
moreInfo: String? // Documentation URL
statusCode: Int? // HTTP status code
details: List<Int>? // Additional error detail codes
unrecoverable: Boolean? // Cannot retry flag
exceptionFields: Map<String, String>? // Extra context
StreamEndpointException - Wraps API errors:
message: String
cause: Throwable?
apiError: StreamEndpointErrorData?
StreamClientException - Client-level errors:
- Socket health failure: "Socket did not receive any events." (line 231 in StreamSocketSession.kt)
- Message drop: "Failed to offer message to debounce processor" (lines 90-91)
IOException - Socket closure:
IOException("Socket closed. Code: $code, Reason: $reason")
1. onTokenError Extension (StreamClientImpl.kt:193-200):
suspend fun <T> Result<T>.onTokenError(
handler: suspend (error: StreamEndpointErrorData, code: Int) -> Result<T>
): Result<T> = fold(
onSuccess = { Result.success(it) },
onFailure = { error ->
if (error is StreamEndpointException && error.apiError?.code in listOf(40, 41, 42)) {
handler(error.apiError!!, error.apiError!!.code)
} else {
Result.failure(error)
}
}
)
2. HTTP Interceptor Token Refresh (StreamAuthInterceptor.kt:65-111):
override fun intercept(chain: Interceptor.Chain): Response {
val token = runBlocking { tokenManager.loadIfAbsent() }
val authed = original.withAuthHeaders(authType, token.rawValue)
val first = chain.proceed(authed)
// Check for token error
if (!first.isSuccessful && isTokenInvalidErrorCode(errorCode)) {
tokenManager.invalidate()
val refreshed = runBlocking { tokenManager.refresh() }
return chain.proceed(retried) // Single retry
}
return first
}
Note: Uses runBlocking because OkHttp interceptors are synchronous.
3. Recovery Evaluator (StreamConnectionRecoveryEvaluatorImpl.kt:44-99):
fun evaluate(snapshot: StateSnapshot): Recovery? = when {
shouldConnect(snapshot) -> Recovery.Connect(snapshot)
shouldDisconnect(snapshot) -> Recovery.Disconnect(reason)
else -> null // No action needed
}
Request Order (outgoing):
Request
↓
1. StreamApiKeyInterceptor // Add ?api_key=<key>
↓
2. StreamAuthInterceptor // Add Authorization: <token>
                         // Handle token refresh on error codes 40/41/42
↓
3. StreamClientInfoInterceptor // Add X-Stream-Client: <version>
↓
4. StreamConnectionIdInterceptor // Add ?connection_id=<id>
↓
5. StreamEndpointErrorInterceptor // Parse error responses
↓
Network
↓
5. StreamEndpointErrorInterceptor // Throw StreamEndpointException if error
↓
Response
Key Implementation Details:
- StreamAuthInterceptor: Synchronous; uses runBlocking for token operations (lines 65-111)
- Token refresh: Automatic single retry with refreshed token on codes 40, 41, 42 (not 401/403 HTTP status)
- Error parsing: StreamEndpointErrorInterceptor peeks response body without consuming (non-intrusive)
| Component | Thread Model | Mechanism | Notes |
|---|---|---|---|
| StreamClientImpl | Coroutine-based | Launched on provided scope | Recommend CoroutineScope(SupervisorJob() + Dispatchers.IO) |
| StreamSocketSession | Callback thread | OkHttp WebSocket listener | Callbacks run on OkHttp's thread pool |
| StreamBatcherImpl | Single worker | Dedicated Job | Sequential batch processing (lines 99-131) |
| StreamSerialProcessingQueueImpl | Single worker | Dedicated Job + Mutex | FIFO execution, thread-safe submission (lines 60-121) |
| StreamSingleFlightProcessor | Caller's thread | ConcurrentHashMap + async(LAZY) | Multiple callers await same deferred (lines 41-82) |
| StreamAuthInterceptor | OkHttp thread | runBlocking for suspend ops | Blocks HTTP thread during token operations |
| TokenManager | Coroutine-safe | StateFlow + SingleFlight | Only one refresh in flight at a time |
| ConnectionIdHolder | Lock-free | AtomicReference | Thread-safe read/write |
| HealthMonitor | Worker coroutine | scope.launch + delay | Periodic checks on scope's dispatcher (lines 71-92) |
| SubscriptionManager | Thread-safe | ConcurrentLinkedQueue + GC | Supports concurrent subscribe/unsubscribe/notify |
Key Concurrency Patterns:
- Single-Flight Deduplication (StreamSingleFlightProcessorImpl.kt:41-82):
  override suspend fun <T> run(key: StreamTypedKey<T>, block: suspend () -> T): Result<T> {
      // Fast path: reuse existing
      flights[key]?.let { return it.await() }
      // Slow path: create new async(LAZY)
      val newExecution = scope.async(start = LAZY) { runCatching { block() } }
      val existing = flights.putIfAbsent(key, newExecution)
      val job = existing ?: newExecution.also { it.start() }
      return job.await()
  }
  - Guarantee: Only first caller executes; others await same result
  - Cancellation: Cancelling one awaiter doesn't cancel others (result cached)
  - Cleanup: Entry removed in finally block only if same job still mapped
- Serial Queue Backpressure (StreamSerialProcessingQueueImpl.kt:60-97):
  override suspend fun <T> submit(job: suspend () -> T): Result<T> {
      val reply = CompletableDeferred<Result<T>>()
      inbox.send(JobItem(block = job, reply = reply)) // Suspends if full
      return reply.await()
  }
  - Guarantee: Jobs execute in submission order
  - Worker loop: Single coroutine processes inbox sequentially
  - Cancellation: Job-level cancellation doesn't stop queue
- Atomic State Guards (StreamSocketSession.kt:68-71):
  private val closingByUs = AtomicBoolean(false)
  private val cleaned = AtomicBoolean(false)
  - Prevents duplicate disconnect notifications
  - Idempotent cleanup via compareAndSet (line 465)
Location: stream-android-core/src/main/java/io/getstream/android/core/internal/serialization/StreamCompositeEventSerializationImpl.kt
Purpose: Route events to appropriate deserializers based on "type" field.
Event Container:
class StreamCompositeSerializationEvent<T>(
val core: StreamClientWsEvent? = null, // Internal events
val product: T? = null // Product-specific events
)
Deserialization Process (lines 98-125):
- Peek "type" field (lines 127-156):
  private fun peekType(raw: String): String? {
      val reader = JsonReader.of(Buffer().writeUtf8(raw))
      // Parse JSON, find "type" field, return value
  }
- Route by type:
  when (type) {
      in alsoExternal -> {
          // Parse as BOTH core and product
          core = internal.deserialize(raw)
          product = external.deserialize(raw)
          return both(core, product)
      }
      in internalTypes -> {
          // Parse as CORE only ("connection.ok", "connection.error", "health.check")
          core = internal.deserialize(raw)
          return internal(core)
      }
      else -> {
          // Parse as PRODUCT only (Chat, Video, Feeds events)
          product = external.deserialize(raw)
          return external(product)
      }
  }
- Notify listeners (StreamSocketSession.kt:256-262):
  subscriptionManager.forEach { listener ->
      coreEvent?.takeUnless { it is StreamHealthCheckEvent }?.let { listener.onEvent(it) }
      productEvent?.let { listener.onEvent(it) }
  }
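The routing decision in the process above can be isolated into a function that depends only on the peeked type. This is a minimal sketch: the `Route` enum and `route` function are hypothetical, the internal type names come from the comment above, and "message.new" and "user.updated" are made-up event types used only for illustration.

```kotlin
// Hypothetical routing outcome; the real code builds a StreamCompositeSerializationEvent instead.
enum class Route { CORE, PRODUCT, BOTH }

// Core event types named in this document.
val internalTypes = setOf("connection.ok", "connection.error", "health.check")

fun route(type: String, alsoExternal: Set<String>): Route = when (type) {
    in alsoExternal -> Route.BOTH    // checked first, so it wins over the core-only rule
    in internalTypes -> Route.CORE   // handled by the core deserializer only
    else -> Route.PRODUCT            // everything else belongs to the product SDK
}

fun main() {
    val alsoExternal = setOf("user.updated") // assumption: a type both layers want to see
    println(route("health.check", alsoExternal)) // CORE
    println(route("message.new", alsoExternal))  // PRODUCT
    println(route("user.updated", alsoExternal)) // BOTH
}
```

The order of the branches matters: a type listed in both sets is parsed twice because the alsoExternal check runs first.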
StreamClientConnectedEvent (type: "connection.ok"):
connectionId: String
me: StreamConnectedUser // Full user profile
type: "connection.ok"
StreamClientConnectionErrorEvent (type: "connection.error"):
connectionId: String
createdAt: Date
error: StreamEndpointErrorData
type: "connection.error"
StreamHealthCheckEvent (type: "health.check"):
- Echoes back StreamClientConnectedEvent every 25 seconds
- Filtered from user event callbacks (line 258 in StreamSocketSession.kt)
StreamWSAuthMessageRequest:
products: List<String> // ["chat", "messaging", "video"]
token: String // JWT token from TokenManager
userDetails: StreamConnectUserDetailsRequest {
id: String
name: String?
image: String?
invisible: Boolean = false
language: String?
custom: Map<String, Any?>?
}
All APIs return Result<T> instead of throwing exceptions:
component.start()
.onSuccess { /* started */ }
.onFailure { error -> logger.e(error) { "Failed to start" } }
Exception: CancellationException is rethrown (not captured in Result).
Most components implement StreamStartableComponent:
interface StreamStartableComponent {
fun start(): Result<Unit>
fun stop(): Result<Unit>
}
Always: Call start() before use, stop() when done. Components don't work until started.
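As a rough illustration of this contract, the sketch below shows a component that rejects work until it has been started. `Counter` is a hypothetical class invented for the example; only the StreamStartableComponent interface shape is taken from this document.

```kotlin
import java.util.concurrent.atomic.AtomicBoolean

interface StreamStartableComponent {
    fun start(): Result<Unit>
    fun stop(): Result<Unit>
}

// Hypothetical component used only to illustrate create → start() → use → stop().
class Counter : StreamStartableComponent {
    private val started = AtomicBoolean(false)
    private var count = 0

    override fun start(): Result<Unit> {
        started.set(true)
        return Result.success(Unit)
    }

    override fun stop(): Result<Unit> {
        started.set(false)
        return Result.success(Unit)
    }

    // Work is reported as a failed Result, not an exception, when the component is not started.
    fun increment(): Result<Int> =
        if (started.get()) Result.success(++count)
        else Result.failure(IllegalStateException("Counter is not started"))
}

fun main() {
    val counter = Counter()
    println(counter.increment().isFailure)   // true: used before start()
    counter.start()
    println(counter.increment().getOrNull()) // 1
    counter.stop()
    println(counter.increment().isFailure)   // true: used after stop()
}
```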
Hot state flows provide immediate state snapshots to new collectors:
streamClient.connectionState.collect { state ->
when (state) {
is StreamConnectionState.Idle -> {}
is StreamConnectionState.Connecting.Opening -> {}
is StreamConnectionState.Connecting.Authenticating -> {}
is StreamConnectionState.Connected -> { /* use connectionId */ }
is StreamConnectionState.Disconnected -> { /* check cause */ }
}
}
- All public APIs are thread-safe
- Android Lifecycle operations must run on main thread - library uses runOnMainLooper internally
- Suspend functions don't block threads
- Use SupervisorJob for structured concurrency (failures don't cancel siblings)
Prefer factory functions over direct instantiation:
// ✅ Factory function (stable across implementation changes)
val queue = StreamSerialProcessingQueue(logger, scope)
// ❌ Direct instantiation (couples to implementation)
val queue = StreamSerialProcessingQueueImpl(logger, scope)
// For UI components (auto-cleanup when GC'd)
subscriptionManager.subscribe(
listener = this,
options = Options(retention = AUTO_REMOVE)
)
// For services/singletons (explicit lifecycle)
val subscription = subscriptionManager.subscribe(
listener = this,
options = Options(retention = KEEP_UNTIL_CANCELLED)
).getOrThrow()
// Always cancel when done
subscription.cancel()
Use atomic guards for idempotency:
private val cleaned = AtomicBoolean(false)
private fun cleanup() {
if (!cleaned.compareAndSet(false, true)) {
return // Already cleaned
}
// Perform cleanup
}
Window doubles if batch was full (high traffic), resets if not (low traffic):
// StreamBatcherImpl.kt:120-125
val isFull = buffer.size >= batchSize
windowMs = if (isFull) {
(windowMs * 2).coerceAtMost(maxDelayMs)
} else {
initialDelayMs
}
- Located in stream-android-core/src/test/java/
- Use JUnit 4, MockK for mocking
- Robolectric for Android components (Lifecycle, Network)
- Test class naming: <ClassUnderTest>Test.kt (e.g., StreamRetryProcessorImplTest.kt)
- Always test both success and failure paths
- Use runTest from kotlinx-coroutines-test for coroutine testing
Android Lifecycle/Network components require Robolectric:
@RunWith(RobolectricTestRunner::class)
@Config(sdk = [Build.VERSION_CODES.UPSIDE_DOWN_CAKE])
class LifecycleTest {
@Test
fun testLifecycleMonitor() {
// Test with real Android components
}
}
Use factory functions with test implementations:
class TestSerialQueue : StreamSerialProcessingQueue {
val submittedWork = mutableListOf<suspend () -> Any>()
override suspend fun <T : Any> submit(job: suspend () -> T): Result<T> {
submittedWork.add(job)
return runCatching { job() } // Run the job so the returned value really has type T
}
override suspend fun start(): Result<Unit> = Result.success(Unit)
override suspend fun stop(timeout: Long?): Result<Unit> = Result.success(Unit)
}
@Test
fun testComponent() {
val testQueue = TestSerialQueue()
val component = MyComponent(testQueue)
component.doSomething()
assertEquals(1, testQueue.submittedWork.size)
}
@Test
fun testSuspendFunction() = runTest {
val result = suspendingOperation()
assertTrue(result.isSuccess)
}
Components don't work until started. Always follow: create → start() → use → stop()
Never share the same StreamSubscriptionManager instance across different components - creates event loops. Create separate instances per component.
Using KEEP_UNTIL_CANCELLED without cancelling causes leaks. Either use AUTO_REMOVE for UI or explicitly cancel.
Share a single StreamTokenManager instance across components. Multiple instances = multiple concurrent refreshes.
Token errors use codes 40, 41, 42 (from StreamEndpointErrorData.code), NOT HTTP status codes 401/403. Check for these specific codes when detecting token errors.
StreamAuthInterceptor uses runBlocking for token operations. Long token fetches block HTTP thread. Keep TokenProvider.getToken() fast.
Always use giveUpFunction in StreamRetryPolicy to stop on non-retryable errors:
val policy = StreamRetryPolicy.Exponential(
maxRetries = 10,
giveUpFunction = { attempt, error ->
error is UnauthorizedException ||
error is ForbiddenException ||
error is NotFoundException
}
)
Match component lifetime to coroutine scope lifetime. Never use GlobalScope. Use viewModelScope, lifecycle scopes, or custom managed scopes.
Always handle Result - silent failures lead to hard-to-debug issues:
component.start()
.onSuccess { logger.i { "Started" } }
.onFailure { error ->
logger.e(error) { "Failed to start" }
// Handle error appropriately
}
Socket close callbacks may fire after explicit disconnect. Use atomic flags to detect "closed by us" vs "closed by server":
// StreamSocketSession.kt:67-71
private val closingByUs = AtomicBoolean(false)
override fun onClosed(code: Int, reason: String) {
if (!closingByUs.get()) {
// Server closed socket, not us
notifyState(Disconnected(cause))
}
}
- Client interface: stream-android-core/src/main/java/io/getstream/android/core/api/StreamClient.kt
- Client implementation: stream-android-core/src/main/java/io/getstream/android/core/internal/StreamClientImpl.kt
- Socket session: stream-android-core/src/main/java/io/getstream/android/core/internal/socket/StreamSocketSession.kt
- WebSocket wrapper: stream-android-core/src/main/java/io/getstream/android/core/internal/socket/StreamWebSocketImpl.kt
- Connection state: stream-android-core/src/main/java/io/getstream/android/core/api/model/connection/StreamConnectionState.kt
- Connected user: stream-android-core/src/main/java/io/getstream/android/core/api/model/connection/StreamConnectedUser.kt
- Retry policy: stream-android-core/src/main/java/io/getstream/android/core/api/model/retry/StreamRetryPolicy.kt
- Error data: stream-android-core/src/main/java/io/getstream/android/core/api/model/exceptions/StreamEndpointErrorData.kt
- Single-flight: stream-android-core/src/main/java/io/getstream/android/core/internal/processing/StreamSingleFlightProcessorImpl.kt
- Serial queue: stream-android-core/src/main/java/io/getstream/android/core/internal/processing/StreamSerialProcessingQueueImpl.kt
- Batcher: stream-android-core/src/main/java/io/getstream/android/core/internal/processing/StreamBatcherImpl.kt
- Retry: stream-android-core/src/main/java/io/getstream/android/core/internal/processing/StreamRetryProcessorImpl.kt
- Token manager: stream-android-core/src/main/java/io/getstream/android/core/internal/authentication/StreamTokenManagerImpl.kt
- Auth interceptor: stream-android-core/src/main/java/io/getstream/android/core/internal/http/interceptor/StreamAuthInterceptor.kt
- Health monitor: stream-android-core/src/main/java/io/getstream/android/core/internal/socket/monitor/StreamHealthMonitorImpl.kt
- Network monitor: stream-android-core/src/main/java/io/getstream/android/core/internal/observers/network/StreamNetworkMonitorImpl.kt
- Lifecycle monitor: stream-android-core/src/main/java/io/getstream/android/core/internal/observers/lifecycle/StreamLifecycleMonitorImpl.kt
- Recovery evaluator: stream-android-core/src/main/java/io/getstream/android/core/internal/recovery/StreamConnectionRecoveryEvaluatorImpl.kt
- Composite serialization: stream-android-core/src/main/java/io/getstream/android/core/internal/serialization/StreamCompositeEventSerializationImpl.kt
- Moshi implementation: stream-android-core/src/main/java/io/getstream/android/core/internal/serialization/StreamMoshiJsonSerializationImpl.kt
- Test directory: stream-android-core/src/test/java/io/getstream/android/core/
- Example test: stream-android-core/src/test/java/io/getstream/android/core/internal/processing/StreamRetryProcessorImplTest.kt
- build.gradle.kts: Uses explicit API mode, enables coroutines, KSP for Moshi codegen, JVM target 11
- detekt.yml: config/detekt/detekt.yml - static analysis rules with auto-correct enabled
- lint.xml: Root lint.xml - Android lint configuration (abort on error, warnings as errors)
- consumer-rules.pro: ProGuard rules for library consumers
- gradle.properties: JVM args, AndroidX, Kotlin code style
- Main branch: develop (not main or master)
- Current branch: Development happens on feature branches (e.g., stream-watcher)
- PR target: Always target develop for pull requests
- Commit messages: Follow existing style, use imperative mood ("Add feature" not "Added feature")
- No force push: To develop or main branches
When adding new APIs, choose the appropriate annotation:
- @StreamPublishedApi - Stable APIs that product SDKs expose to integrators (breaking changes require major version bump)
- @StreamInternalApi - Internal infrastructure, may change without notice (most common for new features)
- @StreamDelicateApi - Advanced APIs requiring careful use (e.g., direct socket access)
Use @JsonClass(generateAdapter = true) for data classes:
@JsonClass(generateAdapter = true)
data class MyEvent(
val type: String,
val data: String
)
KSP generates adapters at compile time. Never write adapters manually unless dealing with polymorphism or custom serialization logic.
Use lambda-based logging for performance:
logger.d { "Expensive: ${computeExpensiveString()}" } // ✅ Only called if DEBUG enabled
logger.d("Expensive: ${computeExpensiveString()}") // ❌ Always computed
Always use SupervisorJob for scopes powering multiple components:
val scope = CoroutineScope(SupervisorJob() + Dispatchers.IO)
This prevents one component's failure from cancelling siblings. Without SupervisorJob, a failure in the health monitor would cancel the batcher, socket session, etc.