Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import io.getstream.android.core.api.socket.StreamWebSocketFactory
import io.getstream.android.core.api.socket.listeners.StreamClientListener
import io.getstream.android.core.api.socket.monitor.StreamHealthMonitor
import io.getstream.android.core.api.subscribe.StreamSubscriptionManager
import io.getstream.android.core.api.telemetry.StreamTelemetry

/**
* Optional overrides for internal components used by
Expand Down Expand Up @@ -70,6 +71,7 @@ import io.getstream.android.core.api.subscribe.StreamSubscriptionManager
* @param connectionRecoveryEvaluator Reconnection heuristics evaluator.
* @param clientSubscriptionManager Socket-level listener registry.
* @param androidComponentsProvider Android system service provider.
* @param telemetry Telemetry engine. When `null`, core uses a no-op that discards all signals.
*/
@Suppress("LongParameterList")
@StreamInternalApi
Expand All @@ -87,4 +89,5 @@ public data class StreamComponentProvider(
val connectionRecoveryEvaluator: StreamConnectionRecoveryEvaluator? = null,
val clientSubscriptionManager: StreamSubscriptionManager<StreamClientListener>? = null,
val androidComponentsProvider: StreamAndroidComponentsProvider? = null,
val telemetry: StreamTelemetry? = null,
)
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
/*
* Copyright (c) 2014-2026 Stream.io Inc. All rights reserved.
*
* Licensed under the Stream License;
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://github.com/GetStream/stream-core-android/blob/main/LICENSE
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.getstream.android.core.api.model.config

import io.getstream.android.core.annotations.StreamInternalApi
import io.getstream.android.core.api.telemetry.StreamSignalRedactor
import io.getstream.android.core.api.telemetry.StreamTelemetry
import io.getstream.android.core.api.telemetry.StreamTelemetryScope
import java.io.File

/**
* Configuration for [StreamTelemetry].
*
* @param root Root directory for disk spill storage. Defaults to `context.cacheDir` (passed at
* creation time via the factory).
* @param basePath Subdirectory under [root] for telemetry files (e.g. `"stream/telemetry"`).
* @param version Version tag for the disk format — typically the core SDK version. On
* initialization, any directories under `{root}/{basePath}` that don't match this version are
* deleted.
* @param memoryCapacity Maximum number of signals held in memory per scope before spilling to disk.
* @param diskCapacity Maximum bytes of disk storage per scope. When exceeded, the oldest signals
* are dropped.
* @param redactor Optional [StreamSignalRedactor] applied to every signal on
* [emit][StreamTelemetryScope.emit].
*/
@StreamInternalApi
public data class StreamTelemetryConfig(
val root: File? = null,
val basePath: String = "stream/telemetry",
val version: String,
val memoryCapacity: Int = 500,
val diskCapacity: Long = 1_000_000L,
val redactor: StreamSignalRedactor? = null,
)
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
/*
* Copyright (c) 2014-2026 Stream.io Inc. All rights reserved.
*
* Licensed under the Stream License;
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://github.com/GetStream/stream-core-android/blob/main/LICENSE
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.getstream.android.core.api.model.telemetry

import io.getstream.android.core.annotations.StreamInternalApi

/**
* A single unit of telemetry data captured by a [StreamTelemetryScope].
*
* @param tag Identifies what happened (e.g. `"connected"`, `"token.refreshed"`,
* `"network.changed"`).
* @param data Pre-serialized payload associated with the signal. Callers are responsible for
* serializing domain objects to a string before emitting. May be `null` for tag-only signals.
* @param timestamp Epoch milliseconds when the signal was recorded.
*/
@StreamInternalApi
public data class StreamSignal(val tag: String, val data: String?, val timestamp: Long)
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
/*
* Copyright (c) 2014-2026 Stream.io Inc. All rights reserved.
*
* Licensed under the Stream License;
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://github.com/GetStream/stream-core-android/blob/main/LICENSE
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.getstream.android.core.api.telemetry

import io.getstream.android.core.annotations.StreamInternalApi
import io.getstream.android.core.api.model.telemetry.StreamSignal

/**
* Transforms or redacts sensitive data from a [StreamSignal] before it is stored.
*
* Redactors run synchronously on [StreamTelemetryScope.emit] — the returned signal is what gets
* buffered. Return `null` to drop the signal entirely.
*
* ### Example
*
* ```kotlin
* val redactor = StreamSignalRedactor { signal ->
* if (signal.tag == "auth.token") {
* signal.copy(data = "[REDACTED]")
* } else {
* signal
* }
* }
* ```
*/
@StreamInternalApi
public fun interface StreamSignalRedactor {

/**
* Transforms or redacts the given [signal].
*
* @param signal The signal to process.
* @return The redacted signal, or `null` to drop it.
*/
public fun redact(signal: StreamSignal): StreamSignal?
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
/*
* Copyright (c) 2014-2026 Stream.io Inc. All rights reserved.
*
* Licensed under the Stream License;
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://github.com/GetStream/stream-core-android/blob/main/LICENSE
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.getstream.android.core.api.telemetry

import android.content.Context
import io.getstream.android.core.annotations.StreamInternalApi
import io.getstream.android.core.api.model.config.StreamTelemetryConfig
import io.getstream.android.core.api.model.telemetry.StreamSignal
import io.getstream.android.core.internal.telemetry.StreamTelemetryImpl
import kotlinx.coroutines.CoroutineScope

/**
* Engine for collecting telemetry signals across named [scopes][StreamTelemetryScope].
*
* Product SDKs create an instance via the [StreamTelemetry] factory, inject it into
* [StreamComponentProvider][io.getstream.android.core.api.model.config.StreamComponentProvider],
* and retain a reference for draining. Core emits signals internally; the product SDK decides when
* and where to send them.
*
* If no telemetry is provided, core uses [StreamTelemetryNoOp] which discards everything at zero
* cost.
*
* ### Usage
*
* ```kotlin
* // Product SDK creates and keeps a reference
* val telemetry = StreamTelemetry(context, config)
*
* // Inject into core
* val client = StreamClient(
* ...,
* components = StreamComponentProvider(telemetry = telemetry),
* )
*
* // Core emits internally
* telemetry.scope("connection").emit("connected", connectionId)
*
* // Product SDK drains on its own schedule
* val signals = telemetry.scope("connection").drain()
* ```
*/
@StreamInternalApi
public interface StreamTelemetry {

/**
* Returns the [StreamTelemetryScope] for the given [name], creating it if it doesn't exist.
*
* Scope names are open — any string is valid. Core uses names like `"connection"`, `"network"`,
* `"auth"`, and `"device"`. Product SDKs add their own (e.g. `"sfu"`, `"publisher"`).
*
* @param name Scope identifier.
* @return The scope for [name].
*/
public fun scope(name: String): StreamTelemetryScope

/**
* Deletes disk spill directories from previous SDK versions.
*
* Call this when the product SDK is ready (e.g. after initialization). Directories that don't
* match the current [StreamTelemetryConfig.version] are removed.
*
* @return [Result.success] on completion, or [Result.failure] if cleanup could not run.
*/
public suspend fun cleanStaleVersions(): Result<Unit>
}

/**
* Creates a [StreamTelemetry] instance backed by the default implementation.
*
* @param context Android application context (used for `cacheDir` when [StreamTelemetryConfig.root]
* is `null`).
* @param config Telemetry configuration.
* @param scope Coroutine scope for disk I/O operations (spill, drain, cleanup).
* @return A new [StreamTelemetry] instance.
*/
@StreamInternalApi
public fun StreamTelemetry(
context: Context,
config: StreamTelemetryConfig,
scope: CoroutineScope,
): StreamTelemetry = StreamTelemetryImpl(context.applicationContext, config, scope)

/**
* No-op implementation that discards all signals without allocating buffers.
*
* Used internally when no telemetry is provided via
* [StreamComponentProvider][io.getstream.android.core.api.model.config.StreamComponentProvider].
*/
@StreamInternalApi
public object StreamTelemetryNoOp : StreamTelemetry {

override fun scope(name: String): StreamTelemetryScope = NoOpScope

override suspend fun cleanStaleVersions(): Result<Unit> = Result.success(Unit)

private object NoOpScope : StreamTelemetryScope {
override val name: String = "noop"

override fun emit(tag: String, data: String?): Result<Unit> = Result.success(Unit)

override suspend fun drain(): Result<List<StreamSignal>> = Result.success(emptyList())
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
/*
* Copyright (c) 2014-2026 Stream.io Inc. All rights reserved.
*
* Licensed under the Stream License;
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://github.com/GetStream/stream-core-android/blob/main/LICENSE
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.getstream.android.core.api.telemetry

import io.getstream.android.core.annotations.StreamInternalApi
import io.getstream.android.core.api.model.telemetry.StreamSignal

/**
* A named channel that groups related [signals][StreamSignal].
*
* Scopes are created via [StreamTelemetry.scope]. Each scope maintains its own memory buffer and
* optional disk spill. Signals flow in via [emit] and out via [drain].
*
* ### Threading
*
* All operations are thread-safe. [emit] never blocks or suspends.
*
* ### Example
*
* ```kotlin
* val scope = telemetry.scope("connection")
* scope.emit("connected", connectionId)
* scope.emit("disconnected", null)
*
* // Later, when ready to send
* val signals: List<StreamSignal> = scope.drain()
* ```
*/
@StreamInternalApi
public interface StreamTelemetryScope {

/** The name of this scope (e.g. `"connection"`, `"sfu"`, `"network"`). */
public val name: String

/**
* Records a signal into this scope.
*
* The [StreamSignalRedactor] (if configured) runs synchronously before the signal is buffered.
* If the redactor returns `null`, the signal is dropped and not buffered.
*
* @param tag Identifies what happened.
* @param data Optional pre-serialized payload. Pass `null` for tag-only signals.
* @return [Result.success] if the signal was buffered (or intentionally dropped by the
* redactor), or [Result.failure] if an error occurred.
*/
public fun emit(tag: String, data: String? = null): Result<Unit>

/**
* Atomically consumes all buffered signals, including any that were spilled to disk.
*
* After this call, the scope's buffer is empty. Disk-spilled signals are read first (oldest),
* followed by in-memory signals (newest), preserving FIFO order.
*
* Disk reads run on `Dispatchers.IO`.
*
* @return [Result.success] with signals in chronological order, or [Result.failure] if the
* drain could not complete (e.g. disk I/O error).
*/
public suspend fun drain(): Result<List<StreamSignal>>
}
Loading
Loading