diff --git a/stream-android-core/src/main/java/io/getstream/android/core/api/model/config/StreamComponentProvider.kt b/stream-android-core/src/main/java/io/getstream/android/core/api/model/config/StreamComponentProvider.kt index b1c18b9..0bc1e7b 100644 --- a/stream-android-core/src/main/java/io/getstream/android/core/api/model/config/StreamComponentProvider.kt +++ b/stream-android-core/src/main/java/io/getstream/android/core/api/model/config/StreamComponentProvider.kt @@ -31,6 +31,7 @@ import io.getstream.android.core.api.socket.StreamWebSocketFactory import io.getstream.android.core.api.socket.listeners.StreamClientListener import io.getstream.android.core.api.socket.monitor.StreamHealthMonitor import io.getstream.android.core.api.subscribe.StreamSubscriptionManager +import io.getstream.android.core.api.telemetry.StreamTelemetry /** * Optional overrides for internal components used by @@ -70,6 +71,7 @@ import io.getstream.android.core.api.subscribe.StreamSubscriptionManager * @param connectionRecoveryEvaluator Reconnection heuristics evaluator. * @param clientSubscriptionManager Socket-level listener registry. * @param androidComponentsProvider Android system service provider. + * @param telemetry Telemetry engine. When `null`, core uses a no-op that discards all signals. */ @Suppress("LongParameterList") @StreamInternalApi @@ -87,4 +89,5 @@ public data class StreamComponentProvider( val connectionRecoveryEvaluator: StreamConnectionRecoveryEvaluator? = null, val clientSubscriptionManager: StreamSubscriptionManager? = null, val androidComponentsProvider: StreamAndroidComponentsProvider? = null, + val telemetry: StreamTelemetry? 
= null, ) diff --git a/stream-android-core/src/main/java/io/getstream/android/core/api/model/config/StreamTelemetryConfig.kt b/stream-android-core/src/main/java/io/getstream/android/core/api/model/config/StreamTelemetryConfig.kt new file mode 100644 index 0000000..208140c --- /dev/null +++ b/stream-android-core/src/main/java/io/getstream/android/core/api/model/config/StreamTelemetryConfig.kt @@ -0,0 +1,48 @@ +/* + * Copyright (c) 2014-2026 Stream.io Inc. All rights reserved. + * + * Licensed under the Stream License; + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://github.com/GetStream/stream-core-android/blob/main/LICENSE + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.getstream.android.core.api.model.config + +import io.getstream.android.core.annotations.StreamInternalApi +import io.getstream.android.core.api.telemetry.StreamSignalRedactor +import io.getstream.android.core.api.telemetry.StreamTelemetry +import io.getstream.android.core.api.telemetry.StreamTelemetryScope +import java.io.File + +/** + * Configuration for [StreamTelemetry]. + * + * @param root Root directory for disk spill storage. Defaults to `context.cacheDir` (passed at + * creation time via the factory). + * @param basePath Subdirectory under [root] for telemetry files (e.g. `"stream/telemetry"`). + * @param version Version tag for the disk format — typically the core SDK version. On + * initialization, any directories under `{root}/{basePath}` that don't match this version are + * deleted. + * @param memoryCapacity Maximum number of signals held in memory per scope before spilling to disk. 
+ * @param diskCapacity Maximum bytes of disk storage per scope. When exceeded, the oldest signals + * are dropped. + * @param redactor Optional [StreamSignalRedactor] applied to every signal on + * [emit][StreamTelemetryScope.emit]. + */ +@StreamInternalApi +public data class StreamTelemetryConfig( + val root: File? = null, + val basePath: String = "stream/telemetry", + val version: String, + val memoryCapacity: Int = 500, + val diskCapacity: Long = 1_000_000L, + val redactor: StreamSignalRedactor? = null, +) diff --git a/stream-android-core/src/main/java/io/getstream/android/core/api/model/telemetry/StreamSignal.kt b/stream-android-core/src/main/java/io/getstream/android/core/api/model/telemetry/StreamSignal.kt new file mode 100644 index 0000000..ca0b5a7 --- /dev/null +++ b/stream-android-core/src/main/java/io/getstream/android/core/api/model/telemetry/StreamSignal.kt @@ -0,0 +1,31 @@ +/* + * Copyright (c) 2014-2026 Stream.io Inc. All rights reserved. + * + * Licensed under the Stream License; + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://github.com/GetStream/stream-core-android/blob/main/LICENSE + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.getstream.android.core.api.model.telemetry + +import io.getstream.android.core.annotations.StreamInternalApi + +/** + * A single unit of telemetry data captured by a [StreamTelemetryScope]. + * + * @param tag Identifies what happened (e.g. `"connected"`, `"token.refreshed"`, + * `"network.changed"`). + * @param data Pre-serialized payload associated with the signal. 
Callers are responsible for + * serializing domain objects to a string before emitting. May be `null` for tag-only signals. + * @param timestamp Epoch milliseconds when the signal was recorded. + */ +@StreamInternalApi +public data class StreamSignal(val tag: String, val data: String?, val timestamp: Long) diff --git a/stream-android-core/src/main/java/io/getstream/android/core/api/telemetry/StreamSignalRedactor.kt b/stream-android-core/src/main/java/io/getstream/android/core/api/telemetry/StreamSignalRedactor.kt new file mode 100644 index 0000000..6ab6ce1 --- /dev/null +++ b/stream-android-core/src/main/java/io/getstream/android/core/api/telemetry/StreamSignalRedactor.kt @@ -0,0 +1,50 @@ +/* + * Copyright (c) 2014-2026 Stream.io Inc. All rights reserved. + * + * Licensed under the Stream License; + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://github.com/GetStream/stream-core-android/blob/main/LICENSE + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.getstream.android.core.api.telemetry + +import io.getstream.android.core.annotations.StreamInternalApi +import io.getstream.android.core.api.model.telemetry.StreamSignal + +/** + * Transforms or redacts sensitive data from a [StreamSignal] before it is stored. + * + * Redactors run synchronously on [StreamTelemetryScope.emit] — the returned signal is what gets + * buffered. Return `null` to drop the signal entirely. 
+ * + * ### Example + * + * ```kotlin + * val redactor = StreamSignalRedactor { signal -> + * if (signal.tag == "auth.token") { + * signal.copy(data = "[REDACTED]") + * } else { + * signal + * } + * } + * ``` + */ +@StreamInternalApi +public fun interface StreamSignalRedactor { + + /** + * Transforms or redacts the given [signal]. + * + * @param signal The signal to process. + * @return The redacted signal, or `null` to drop it. + */ + public fun redact(signal: StreamSignal): StreamSignal? +} diff --git a/stream-android-core/src/main/java/io/getstream/android/core/api/telemetry/StreamTelemetry.kt b/stream-android-core/src/main/java/io/getstream/android/core/api/telemetry/StreamTelemetry.kt new file mode 100644 index 0000000..4850275 --- /dev/null +++ b/stream-android-core/src/main/java/io/getstream/android/core/api/telemetry/StreamTelemetry.kt @@ -0,0 +1,117 @@ +/* + * Copyright (c) 2014-2026 Stream.io Inc. All rights reserved. + * + * Licensed under the Stream License; + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://github.com/GetStream/stream-core-android/blob/main/LICENSE + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package io.getstream.android.core.api.telemetry + +import android.content.Context +import io.getstream.android.core.annotations.StreamInternalApi +import io.getstream.android.core.api.model.config.StreamTelemetryConfig +import io.getstream.android.core.api.model.telemetry.StreamSignal +import io.getstream.android.core.internal.telemetry.StreamTelemetryImpl +import kotlinx.coroutines.CoroutineScope + +/** + * Engine for collecting telemetry signals across named [scopes][StreamTelemetryScope]. + * + * Product SDKs create an instance via the [StreamTelemetry] factory, inject it into + * [StreamComponentProvider][io.getstream.android.core.api.model.config.StreamComponentProvider], + * and retain a reference for draining. Core emits signals internally; the product SDK decides when + * and where to send them. + * + * If no telemetry is provided, core uses [StreamTelemetryNoOp] which discards everything at zero + * cost. + * + * ### Usage + * + * ```kotlin + * // Product SDK creates and keeps a reference + * val telemetry = StreamTelemetry(context, config, scope) + * + * // Inject into core + * val client = StreamClient( + * ..., + * components = StreamComponentProvider(telemetry = telemetry), + * ) + * + * // Core emits internally + * telemetry.scope("connection").emit("connected", connectionId) + * + * // Product SDK drains on its own schedule + * val signals = telemetry.scope("connection").drain() + * ``` + */ +@StreamInternalApi +public interface StreamTelemetry { + + /** + * Returns the [StreamTelemetryScope] for the given [name], creating it if it doesn't exist. + * + * Scope names are open — any string is valid. Core uses names like `"connection"`, `"network"`, + * `"auth"`, and `"device"`. Product SDKs add their own (e.g. `"sfu"`, `"publisher"`). + * + * @param name Scope identifier. + * @return The scope for [name]. + */ + public fun scope(name: String): StreamTelemetryScope + + /** + * Deletes disk spill directories from previous SDK versions.
+ * + * Call this when the product SDK is ready (e.g. after initialization). Directories that don't + * match the current [StreamTelemetryConfig.version] are removed. + * + * @return [Result.success] on completion, or [Result.failure] if cleanup could not run. + */ + public suspend fun cleanStaleVersions(): Result<Unit> +} + +/** + * Creates a [StreamTelemetry] instance backed by the default implementation. + * + * @param context Android application context (used for `cacheDir` when [StreamTelemetryConfig.root] + * is `null`). + * @param config Telemetry configuration. + * @param scope Coroutine scope for disk I/O operations (spill, drain, cleanup). + * @return A new [StreamTelemetry] instance. + */ +@StreamInternalApi +public fun StreamTelemetry( + context: Context, + config: StreamTelemetryConfig, + scope: CoroutineScope, +): StreamTelemetry = StreamTelemetryImpl(context.applicationContext, config, scope) + +/** + * No-op implementation that discards all signals without allocating buffers. + * + * Used internally when no telemetry is provided via + * [StreamComponentProvider][io.getstream.android.core.api.model.config.StreamComponentProvider].
+ */ +@StreamInternalApi +public object StreamTelemetryNoOp : StreamTelemetry { + + override fun scope(name: String): StreamTelemetryScope = NoOpScope + + override suspend fun cleanStaleVersions(): Result<Unit> = Result.success(Unit) + + private object NoOpScope : StreamTelemetryScope { + override val name: String = "noop" + + override fun emit(tag: String, data: String?): Result<Unit> = Result.success(Unit) + + override suspend fun drain(): Result<List<StreamSignal>> = Result.success(emptyList()) + } +} diff --git a/stream-android-core/src/main/java/io/getstream/android/core/api/telemetry/StreamTelemetryScope.kt b/stream-android-core/src/main/java/io/getstream/android/core/api/telemetry/StreamTelemetryScope.kt new file mode 100644 index 0000000..439c271 --- /dev/null +++ b/stream-android-core/src/main/java/io/getstream/android/core/api/telemetry/StreamTelemetryScope.kt @@ -0,0 +1,74 @@ +/* + * Copyright (c) 2014-2026 Stream.io Inc. All rights reserved. + * + * Licensed under the Stream License; + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://github.com/GetStream/stream-core-android/blob/main/LICENSE + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.getstream.android.core.api.telemetry + +import io.getstream.android.core.annotations.StreamInternalApi +import io.getstream.android.core.api.model.telemetry.StreamSignal + +/** + * A named channel that groups related [signals][StreamSignal]. + * + * Scopes are created via [StreamTelemetry.scope]. Each scope maintains its own memory buffer and + * optional disk spill. Signals flow in via [emit] and out via [drain]. + * + * ### Threading + * + * All operations are thread-safe.
 [emit] never blocks or suspends. + * + * ### Example + * + * ```kotlin + * val scope = telemetry.scope("connection") + * scope.emit("connected", connectionId) + * scope.emit("disconnected", null) + * + * // Later, when ready to send + * val signals: List<StreamSignal> = scope.drain() + * ``` + */ +@StreamInternalApi +public interface StreamTelemetryScope { + + /** The name of this scope (e.g. `"connection"`, `"sfu"`, `"network"`). */ + public val name: String + + /** + * Records a signal into this scope. + * + * The [StreamSignalRedactor] (if configured) runs synchronously before the signal is buffered. + * If the redactor returns `null`, the signal is dropped and not buffered. + * + * @param tag Identifies what happened. + * @param data Optional pre-serialized payload. Pass `null` for tag-only signals. + * @return [Result.success] if the signal was buffered (or intentionally dropped by the + * redactor), or [Result.failure] if an error occurred. + */ + public fun emit(tag: String, data: String? = null): Result<Unit> + + /** + * Atomically consumes all buffered signals, including any that were spilled to disk. + * + * After this call, the scope's buffer is empty. Disk-spilled signals are read first (oldest), + * followed by in-memory signals (newest), preserving FIFO order. + * + * Disk reads run on `Dispatchers.IO`. + * + * @return [Result.success] with signals in chronological order, or [Result.failure] if the + * drain could not complete (e.g. disk I/O error). + */ + public suspend fun drain(): Result<List<StreamSignal>> +} diff --git a/stream-android-core/src/main/java/io/getstream/android/core/internal/telemetry/StreamTelemetryImpl.kt b/stream-android-core/src/main/java/io/getstream/android/core/internal/telemetry/StreamTelemetryImpl.kt new file mode 100644 index 0000000..ab7c2ff --- /dev/null +++ b/stream-android-core/src/main/java/io/getstream/android/core/internal/telemetry/StreamTelemetryImpl.kt @@ -0,0 +1,69 @@ +/* + * Copyright (c) 2014-2026 Stream.io Inc. All rights reserved.
+ * + * Licensed under the Stream License; + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://github.com/GetStream/stream-core-android/blob/main/LICENSE + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.getstream.android.core.internal.telemetry + +import android.content.Context +import io.getstream.android.core.api.model.config.StreamTelemetryConfig +import io.getstream.android.core.api.telemetry.StreamTelemetry +import io.getstream.android.core.api.telemetry.StreamTelemetryScope +import io.getstream.android.core.api.utils.runCatchingCancellable +import java.io.File +import java.util.concurrent.ConcurrentHashMap +import kotlinx.coroutines.CoroutineScope + +/** + * Default [StreamTelemetry] implementation. + * + * Scans the telemetry base directory on [cleanStaleVersions] and deletes any version subdirectories + * that don't match [StreamTelemetryConfig.version]. This ensures stale spill files from older SDK + * versions are never deserialized with an incompatible format. 
+ */ +internal class StreamTelemetryImpl( + context: Context, + private val config: StreamTelemetryConfig, + private val scope: CoroutineScope, +) : StreamTelemetry { + + private val scopes = ConcurrentHashMap<String, StreamTelemetryScope>() + + private val baseDir: File = + File(config.root ?: context.cacheDir, "${config.basePath}/${config.version}") + + override fun scope(name: String): StreamTelemetryScope = + scopes.getOrPut(name) { + StreamTelemetryScopeImpl( + name = name, + memoryCapacity = config.memoryCapacity, + diskCapacity = config.diskCapacity, + spillDir = File(baseDir, name), + redactor = config.redactor, + scope = scope, + ) + } + + override suspend fun cleanStaleVersions(): Result<Unit> = runCatchingCancellable { + val versionParent = baseDir.parentFile ?: return@runCatchingCancellable + if (!versionParent.exists()) { + return@runCatchingCancellable + } + versionParent.listFiles()?.forEach { dir -> + if (dir.isDirectory && dir.name != config.version) { + dir.deleteRecursively() + } + } + } +} diff --git a/stream-android-core/src/main/java/io/getstream/android/core/internal/telemetry/StreamTelemetryScopeImpl.kt b/stream-android-core/src/main/java/io/getstream/android/core/internal/telemetry/StreamTelemetryScopeImpl.kt new file mode 100644 index 0000000..3ca6df4 --- /dev/null +++ b/stream-android-core/src/main/java/io/getstream/android/core/internal/telemetry/StreamTelemetryScopeImpl.kt @@ -0,0 +1,187 @@ +/* + * Copyright (c) 2014-2026 Stream.io Inc. All rights reserved. + * + * Licensed under the Stream License; + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://github.com/GetStream/stream-core-android/blob/main/LICENSE + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.getstream.android.core.internal.telemetry + +import io.getstream.android.core.api.model.telemetry.StreamSignal +import io.getstream.android.core.api.telemetry.StreamSignalRedactor +import io.getstream.android.core.api.telemetry.StreamTelemetryScope +import io.getstream.android.core.api.utils.runCatchingCancellable +import java.io.File +import java.util.concurrent.atomic.AtomicBoolean +import kotlinx.coroutines.CoroutineDispatcher +import kotlinx.coroutines.CoroutineScope +import kotlinx.coroutines.Dispatchers +import kotlinx.coroutines.launch +import kotlinx.coroutines.sync.Mutex +import kotlinx.coroutines.sync.withLock +import kotlinx.coroutines.withContext + +/** + * Thread-safe [StreamTelemetryScope] backed by an in-memory ring buffer that spills to disk when + * full. + * + * ### Buffer swap + * + * [emit] appends to a mutable list guarded by [synchronized]. [drain] atomically swaps the list for + * a fresh one, so reads and writes never contend beyond the swap itself. + * + * ### Disk concurrency + * + * All disk I/O ([spillToDisk], [trimDiskIfNeeded], [drainDisk]) is serialized through a [Mutex]. + * This prevents races between a spill write and a drain read on the same file. + * + * ### Disk spill + * + * When the memory buffer exceeds the configured capacity, the oldest signals are serialized to disk + * on [Dispatchers.IO]. If a spill is already in progress, the oldest in-memory signal is dropped + * instead. Disk storage is capped per scope; when exceeded the oldest lines are removed in a single + * pass. + * + * ### Drain order + * + * [drain] returns disk-spilled signals first (oldest), then in-memory signals (newest), preserving + * FIFO order across both tiers. 
+ */ +internal class StreamTelemetryScopeImpl( + override val name: String, + private val memoryCapacity: Int, + private val diskCapacity: Long, + private val spillDir: File, + private val redactor: StreamSignalRedactor?, + private val scope: CoroutineScope, + private val ioDispatcher: CoroutineDispatcher = Dispatchers.IO, +) : StreamTelemetryScope { + + private val lock = Any() + private val diskMutex = Mutex() + private val spillFile: File + get() = File(spillDir, SPILL_FILE_NAME) + + private var buffer = mutableListOf<StreamSignal>() + private val spilling = AtomicBoolean(false) + + override fun emit(tag: String, data: String?): Result<Unit> = + // runCatching (not cancellable) — emit is not a suspend function. + runCatching { + val raw = StreamSignal(tag = tag, data = data, timestamp = System.currentTimeMillis()) + val signal = if (redactor != null) redactor.redact(raw) else raw + if (signal == null) return@runCatching // redactor dropped the signal + synchronized(lock) { + buffer.add(signal) + if (buffer.size > memoryCapacity) { + if (spilling.compareAndSet(false, true)) { + val snapshot = buffer + buffer = mutableListOf() + scope.launch(ioDispatcher) { spillToDisk(snapshot) } + } else { + buffer.removeAt(0) + } + } + } + } + + override suspend fun drain(): Result<List<StreamSignal>> = runCatchingCancellable { + val memorySnapshot: List<StreamSignal> + synchronized(lock) { + memorySnapshot = buffer + buffer = mutableListOf() + } + val diskSignals = withContext(ioDispatcher) { drainDisk() } + if (diskSignals.isEmpty()) { + memorySnapshot + } else { + diskSignals + memorySnapshot + } + } + + // --- Disk spill ---------------------------------------------------------------- + + private suspend fun spillToDisk(signals: List<StreamSignal>) { + runCatchingCancellable { + diskMutex.withLock { + spillDir.mkdirs() + val file = spillFile + file.appendText( + signals.joinToString(separator = "\n", postfix = "\n") { encode(it) } + ) + trimDiskIfNeeded(file) + } + } + // Disk I/O failure — signals are lost.
That's acceptable for telemetry. + // Always reset the spilling flag so future emits can trigger new spills. + spilling.set(false) + } + + private fun trimDiskIfNeeded(file: File) { + if (!file.exists() || file.length() <= diskCapacity) { + return + } + val lines = file.readLines() + var bytes = 0L + var startIdx = lines.size + for (i in lines.indices.reversed()) { + val next = bytes + lines[i].toByteArray(Charsets.UTF_8).size + 1 + if (next > diskCapacity) break + bytes = next + startIdx = i + } + val kept = lines.subList(startIdx, lines.size) + file.writeText(if (kept.isEmpty()) "" else kept.joinToString("\n", postfix = "\n")) + } + + private suspend fun drainDisk(): List<StreamSignal> = + diskMutex.withLock { + val file = spillFile + if (!file.exists() || file.length() == 0L) { + return@withLock emptyList() + } + val signals = file.readLines().mapNotNull { decode(it) } + if (!file.delete()) { + file.writeText("") + } + signals + } + + // --- Serialization (simple line-based format) ----------------------------------- + + private fun encode(signal: StreamSignal): String { + val escapedTag = signal.tag.replace(DELIMITER, DELIMITER_ESCAPE) + val escapedData = signal.data?.replace(DELIMITER, DELIMITER_ESCAPE).orEmpty() + return "${signal.timestamp}$DELIMITER$escapedTag$DELIMITER$escapedData" + } + + @Suppress("ReturnCount") + private fun decode(line: String): StreamSignal?
{ + if (line.isBlank()) { + return null + } + val parts = line.split(DELIMITER, limit = 3) + if (parts.size < 2) { + return null + } + val timestamp = parts[0].toLongOrNull() ?: return null + val tag = parts[1].replace(DELIMITER_ESCAPE, DELIMITER) + val data = parts.getOrNull(2)?.replace(DELIMITER_ESCAPE, DELIMITER)?.ifEmpty { null } + return StreamSignal(tag = tag, data = data, timestamp = timestamp) + } + + private companion object { + const val SPILL_FILE_NAME = "spill.bin" + const val DELIMITER = "\t" + const val DELIMITER_ESCAPE = "\\t" + } +} diff --git a/stream-android-core/src/test/java/io/getstream/android/core/internal/telemetry/StreamTelemetryImplTest.kt b/stream-android-core/src/test/java/io/getstream/android/core/internal/telemetry/StreamTelemetryImplTest.kt new file mode 100644 index 0000000..67e787f --- /dev/null +++ b/stream-android-core/src/test/java/io/getstream/android/core/internal/telemetry/StreamTelemetryImplTest.kt @@ -0,0 +1,301 @@ +/* + * Copyright (c) 2014-2026 Stream.io Inc. All rights reserved. + * + * Licensed under the Stream License; + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://github.com/GetStream/stream-core-android/blob/main/LICENSE + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package io.getstream.android.core.internal.telemetry + +import android.content.Context +import io.getstream.android.core.api.model.config.StreamTelemetryConfig +import io.getstream.android.core.api.telemetry.StreamTelemetryNoOp +import io.mockk.every +import io.mockk.mockk +import java.io.File +import kotlinx.coroutines.CoroutineScope +import kotlinx.coroutines.Dispatchers +import kotlinx.coroutines.SupervisorJob +import kotlinx.coroutines.cancel +import kotlinx.coroutines.runBlocking +import org.junit.After +import org.junit.Assert.assertEquals +import org.junit.Assert.assertFalse +import org.junit.Assert.assertSame +import org.junit.Assert.assertTrue +import org.junit.Before +import org.junit.Rule +import org.junit.Test +import org.junit.rules.TemporaryFolder + +class StreamTelemetryImplTest { + + @get:Rule val tempDir = TemporaryFolder() + + private lateinit var cacheDir: File + private lateinit var context: Context + private lateinit var scope: CoroutineScope + + @Before + fun setUp() { + cacheDir = tempDir.newFolder("cache") + context = mockk { + every { applicationContext } returns this + every { this@mockk.cacheDir } returns this@StreamTelemetryImplTest.cacheDir + } + scope = CoroutineScope(SupervisorJob() + Dispatchers.Default) + } + + @After + fun tearDown() { + scope.cancel() + } + + private fun config( + version: String = "1.0.0", + root: File? 
= null, + basePath: String = "stream/telemetry", + ): StreamTelemetryConfig = + StreamTelemetryConfig(root = root, basePath = basePath, version = version) + + // ======================================== + // Scope creation + // ======================================== + + @Test + fun `scope creates a new scope with the given name`() { + val sut = StreamTelemetryImpl(context, config(), scope) + val telemetryScope = sut.scope("connection") + assertEquals("connection", telemetryScope.name) + } + + @Test + fun `scope returns same instance for same name`() { + val sut = StreamTelemetryImpl(context, config(), scope) + val first = sut.scope("connection") + val second = sut.scope("connection") + assertSame(first, second) + } + + @Test + fun `scope returns different instances for different names`() { + val sut = StreamTelemetryImpl(context, config(), scope) + val connection = sut.scope("connection") + val network = sut.scope("network") + assertTrue(connection !== network) + assertEquals("connection", connection.name) + assertEquals("network", network.name) + } + + @Test + fun `many scopes can coexist`() { + val sut = StreamTelemetryImpl(context, config(), scope) + val names = (1..100).map { "scope-$it" } + val scopes = names.map { sut.scope(it) } + + assertEquals(100, scopes.toSet().size) + scopes.forEachIndexed { idx, s -> assertEquals(names[idx], s.name) } + } + + @Test + fun `scope with empty name works`() { + val sut = StreamTelemetryImpl(context, config(), scope) + val telemetryScope = sut.scope("") + assertEquals("", telemetryScope.name) + } + + @Test + fun `scope with special characters in name works`() { + val sut = StreamTelemetryImpl(context, config(), scope) + val telemetryScope = sut.scope("sfu/publisher.video:h264") + assertEquals("sfu/publisher.video:h264", telemetryScope.name) + } + + // ======================================== + // Version cleanup + // ======================================== + + @Test + fun `init deletes stale version directories`() = 
runBlocking { + val baseDir = File(cacheDir, "stream/telemetry") + // Create old version dirs + File(baseDir, "0.9.0/connection").mkdirs() + File(baseDir, "0.9.0/connection/spill.bin").writeText("old-data") + File(baseDir, "0.8.0/network").mkdirs() + File(baseDir, "0.8.0/network/spill.bin").writeText("older-data") + + val sut = StreamTelemetryImpl(context, config(version = "1.0.0"), scope) + sut.cleanStaleVersions() + + assertFalse("0.9.0 should be deleted", File(baseDir, "0.9.0").exists()) + assertFalse("0.8.0 should be deleted", File(baseDir, "0.8.0").exists()) + } + + @Test + fun `init keeps current version directory`() = runBlocking { + val baseDir = File(cacheDir, "stream/telemetry") + File(baseDir, "1.0.0/connection").mkdirs() + File(baseDir, "1.0.0/connection/spill.bin").writeText("current-data") + + val sut = StreamTelemetryImpl(context, config(version = "1.0.0"), scope) + sut.cleanStaleVersions() + + assertTrue("1.0.0 should be kept", File(baseDir, "1.0.0").exists()) + assertTrue("spill.bin should be kept", File(baseDir, "1.0.0/connection/spill.bin").exists()) + } + + @Test + fun `cleanStaleVersions with no existing directories does not crash`() = runBlocking { + // cacheDir exists but no telemetry dirs + val sut = StreamTelemetryImpl(context, config(version = "1.0.0"), scope) + val result = sut.cleanStaleVersions() + assertTrue(result.isSuccess) + } + + @Test + fun `init only deletes directories under basePath, not siblings`() = runBlocking { + // Create a file outside telemetry path + val unrelated = File(cacheDir, "stream/other-data.txt") + unrelated.parentFile?.mkdirs() + unrelated.writeText("important") + + val sut = StreamTelemetryImpl(context, config(version = "1.0.0"), scope) + sut.cleanStaleVersions() + + assertTrue("Unrelated file should survive", unrelated.exists()) + } + + @Test + fun `cleanup only targets version directories, not files`() = runBlocking { + val baseDir = File(cacheDir, "stream/telemetry") + baseDir.mkdirs() + // Create a file 
(not directory) in the base path + File(baseDir, "some-file.txt").writeText("data") + // Create an old version dir + File(baseDir, "0.9.0").mkdirs() + + val sut = StreamTelemetryImpl(context, config(version = "1.0.0"), scope) + sut.cleanStaleVersions() + + assertTrue("File should survive (not a directory)", File(baseDir, "some-file.txt").exists()) + assertFalse("Old version dir should be deleted", File(baseDir, "0.9.0").exists()) + } + + // ======================================== + // Custom root and basePath + // ======================================== + + @Test + fun `custom root overrides cacheDir for cleanup`() = runBlocking { + val customRoot = tempDir.newFolder("custom-root") + // Seed stale version under custom root and cacheDir + val staleUnderCustomRoot = File(customRoot, "stream/telemetry/0.9.0") + staleUnderCustomRoot.mkdirs() + val staleUnderCacheDir = File(cacheDir, "stream/telemetry/0.9.0") + staleUnderCacheDir.mkdirs() + + val sut = StreamTelemetryImpl(context, config(version = "2.0.0", root = customRoot), scope) + sut.cleanStaleVersions() + + assertFalse("Stale under custom root should be deleted", staleUnderCustomRoot.exists()) + assertTrue( + "cacheDir should not be touched when custom root is set", + staleUnderCacheDir.exists(), + ) + } + + @Test + fun `custom basePath is used`() = runBlocking { + val baseDir = File(cacheDir, "custom/path") + + // Create old version to verify cleanup uses custom path + File(baseDir, "0.9.0").mkdirs() + + val sut = + StreamTelemetryImpl(context, config(version = "1.0.0", basePath = "custom/path"), scope) + sut.cleanStaleVersions() + + assertFalse( + "Old version under custom path should be deleted", + File(baseDir, "0.9.0").exists(), + ) + } + + // ======================================== + // Integration: scope + emit + drain + // ======================================== + + @Test + fun `end-to-end emit and drain through telemetry`() = runBlocking { + val sut = StreamTelemetryImpl(context, config(), scope) + + 
sut.scope("connection").emit("connected", "user-1") + sut.scope("network").emit("wifi", "5GHz") + + val connectionSignals = sut.scope("connection").drain().getOrThrow() + val networkSignals = sut.scope("network").drain().getOrThrow() + + assertEquals(1, connectionSignals.size) + assertEquals("connected", connectionSignals[0].tag) + + assertEquals(1, networkSignals.size) + assertEquals("wifi", networkSignals[0].tag) + } + + @Test + fun `scopes are isolated - draining one does not affect another`() = runBlocking { + val sut = StreamTelemetryImpl(context, config(), scope) + + sut.scope("a").emit("event-a", null) + sut.scope("b").emit("event-b", null) + + sut.scope("a").drain() + + val bSignals = sut.scope("b").drain().getOrThrow() + assertEquals(1, bSignals.size) + assertEquals("event-b", bSignals[0].tag) + } + + // ======================================== + // NoOp + // ======================================== + + @Test + fun `NoOp scope name is noop`() { + val telemetryScope = StreamTelemetryNoOp.scope("anything") + assertEquals("noop", telemetryScope.name) + } + + @Test + fun `NoOp emit does nothing`() = runBlocking { + val telemetryScope = StreamTelemetryNoOp.scope("test") + telemetryScope.emit("event", "data") + + val result = telemetryScope.drain() + assertTrue(result.isSuccess) + assertTrue(result.getOrThrow().isEmpty()) + } + + @Test + fun `NoOp returns same scope for any name`() { + val a = StreamTelemetryNoOp.scope("a") + val b = StreamTelemetryNoOp.scope("b") + assertSame(a, b) + } + + @Test + fun `NoOp drain returns success empty list`() = runBlocking { + val result = StreamTelemetryNoOp.scope("x").drain() + assertTrue(result.isSuccess) + assertEquals(emptyList(), result.getOrThrow()) + } +} diff --git a/stream-android-core/src/test/java/io/getstream/android/core/internal/telemetry/StreamTelemetryScopeImplTest.kt b/stream-android-core/src/test/java/io/getstream/android/core/internal/telemetry/StreamTelemetryScopeImplTest.kt new file mode 100644 index 
/*
 * Copyright (c) 2014-2026 Stream.io Inc. All rights reserved.
 *
 * Licensed under the Stream License;
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *    https://github.com/GetStream/stream-core-android/blob/main/LICENSE
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package io.getstream.android.core.internal.telemetry

import io.getstream.android.core.api.model.telemetry.StreamSignal
import io.getstream.android.core.api.telemetry.StreamSignalRedactor
import java.io.File
import java.util.concurrent.CountDownLatch
import java.util.concurrent.CyclicBarrier
import java.util.concurrent.TimeUnit
import java.util.concurrent.atomic.AtomicInteger
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.SupervisorJob
import kotlinx.coroutines.cancel
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import org.junit.After
import org.junit.Assert.assertEquals
import org.junit.Assert.assertFalse
import org.junit.Assert.assertNotNull
import org.junit.Assert.assertTrue
import org.junit.Before
import org.junit.Rule
import org.junit.Test
import org.junit.rules.TemporaryFolder

/**
 * Unit tests for [StreamTelemetryScopeImpl].
 *
 * Covers emit/drain semantics, redaction, memory-capacity spilling to disk, disk-capacity
 * rotation, thread safety, disk I/O failure tolerance, and serialization edge cases of the
 * tab-separated spill format.
 *
 * NOTE(review): spill happens asynchronously on [scope]; tests synchronize via [waitForSpill]
 * (a fixed sleep), which is inherently timing-dependent — assertions on spill state are kept
 * loose (`isNotEmpty`, `<=` bounds) on purpose.
 */
@Suppress("LargeClass")
class StreamTelemetryScopeImplTest {

    @get:Rule val tempDir = TemporaryFolder()

    // Fresh spill directory and coroutine scope per test; scope is cancelled in tearDown so
    // background spill jobs cannot leak across tests.
    private lateinit var spillDir: File
    private lateinit var scope: CoroutineScope

    @Before
    fun setUp() {
        spillDir = tempDir.newFolder("spill")
        scope = CoroutineScope(SupervisorJob() + Dispatchers.Default)
    }

    @After
    fun tearDown() {
        scope.cancel()
    }

    /** Builds a scope under test with sensible defaults; every knob is overridable per test. */
    private fun createScope(
        name: String = "test",
        memoryCapacity: Int = 500,
        diskCapacity: Long = 1_000_000L,
        redactor: StreamSignalRedactor? = null,
        dir: File = spillDir,
        cs: CoroutineScope = scope,
    ): StreamTelemetryScopeImpl =
        StreamTelemetryScopeImpl(
            name = name,
            memoryCapacity = memoryCapacity,
            diskCapacity = diskCapacity,
            spillDir = dir,
            redactor = redactor,
            scope = cs,
        )

    /** Gives the async spill job time to run. Timing-based; see class KDoc. */
    private fun waitForSpill() = Thread.sleep(500)

    // ========================================
    // Basic emit + drain
    // ========================================

    @Test
    fun `emit single signal and drain returns it`() = runBlocking {
        val sut = createScope()
        sut.emit("connected", "user-123")

        val result = sut.drain()
        assertTrue(result.isSuccess)
        val signals = result.getOrThrow()
        assertEquals(1, signals.size)
        assertEquals("connected", signals[0].tag)
        assertEquals("user-123", signals[0].data)
    }

    @Test
    fun `emit multiple signals preserves order`() = runBlocking {
        val sut = createScope()
        sut.emit("first", null)
        sut.emit("second", null)
        sut.emit("third", null)

        val signals = sut.drain().getOrThrow()
        assertEquals(3, signals.size)
        assertEquals("first", signals[0].tag)
        assertEquals("second", signals[1].tag)
        assertEquals("third", signals[2].tag)
    }

    @Test
    fun `emit with null data`() = runBlocking {
        val sut = createScope()
        sut.emit("event", null)

        val signals = sut.drain().getOrThrow()
        assertEquals(1, signals.size)
        assertEquals(null, signals[0].data)
    }

    @Test
    fun `emit with string data`() = runBlocking {
        val sut = createScope()
        sut.emit("event", """{"key":"value","count":42}""")

        val signals = sut.drain().getOrThrow()
        assertEquals(1, signals.size)
        assertEquals("""{"key":"value","count":42}""", signals[0].data)
    }

    @Test
    fun `signal has non-zero timestamp`() = runBlocking {
        val before = System.currentTimeMillis()
        val sut = createScope()
        sut.emit("event", null)
        val after = System.currentTimeMillis()

        val signal = sut.drain().getOrThrow().single()
        assertTrue(signal.timestamp >= before)
        assertTrue(signal.timestamp <= after)
    }

    // ========================================
    // Drain semantics
    // ========================================

    @Test
    fun `drain on empty scope returns success with empty list`() = runBlocking {
        val sut = createScope()

        val result = sut.drain()
        assertTrue(result.isSuccess)
        assertTrue(result.getOrThrow().isEmpty())
    }

    @Test
    fun `drain clears the buffer`() = runBlocking {
        val sut = createScope()
        sut.emit("first", null)
        sut.drain()

        val result = sut.drain()
        assertTrue(result.isSuccess)
        assertTrue(result.getOrThrow().isEmpty())
    }

    @Test
    fun `drain twice without emit returns empty second time`() = runBlocking {
        val sut = createScope()
        sut.emit("event", null)

        val first = sut.drain().getOrThrow()
        assertEquals(1, first.size)

        val second = sut.drain().getOrThrow()
        assertTrue(second.isEmpty())
    }

    @Test
    fun `emit after drain goes into new buffer`() = runBlocking {
        val sut = createScope()
        sut.emit("before-drain", null)
        sut.drain()

        sut.emit("after-drain", null)
        val signals = sut.drain().getOrThrow()
        assertEquals(1, signals.size)
        assertEquals("after-drain", signals[0].tag)
    }

    // ========================================
    // Scope name
    // ========================================

    @Test
    fun `name is set correctly`() {
        val sut = createScope(name = "connection")
        assertEquals("connection", sut.name)
    }

    // ========================================
    // Redactor
    // ========================================

    @Test
    fun `redactor transforms signal data`() = runBlocking {
        val redactor = StreamSignalRedactor { signal ->
            if (signal.tag == "auth.token") {
                signal.copy(data = "[REDACTED]")
            } else {
                signal
            }
        }
        val sut = createScope(redactor = redactor)

        sut.emit("auth.token", "secret-jwt-token")
        sut.emit("connected", "user-123")

        val signals = sut.drain().getOrThrow()
        assertEquals(2, signals.size)
        assertEquals("[REDACTED]", signals[0].data)
        assertEquals("user-123", signals[1].data)
    }

    @Test
    fun `redactor returning null drops the signal`() = runBlocking {
        val redactor = StreamSignalRedactor { null }
        val sut = createScope(redactor = redactor)

        sut.emit("event", "data")

        val signals = sut.drain().getOrThrow()
        assertTrue("Signal should be dropped when redactor returns null", signals.isEmpty())
    }

    @Test
    fun `redactor selectively drops signals`() = runBlocking {
        val redactor = StreamSignalRedactor { signal ->
            if (signal.tag == "debug") null else signal
        }
        val sut = createScope(redactor = redactor)

        sut.emit("debug", "verbose info")
        sut.emit("connected", "user-123")

        val signals = sut.drain().getOrThrow()
        assertEquals(1, signals.size)
        assertEquals("connected", signals[0].tag)
    }

    @Test
    fun `redactor can modify tag`() = runBlocking {
        val redactor = StreamSignalRedactor { signal -> signal.copy(tag = "prefix.${signal.tag}") }
        val sut = createScope(redactor = redactor)
        sut.emit("event", null)

        val signals = sut.drain().getOrThrow()
        assertEquals("prefix.event", signals[0].tag)
    }

    @Test
    fun `redactor exception does not crash emit and returns failure`() = runBlocking {
        val redactor = StreamSignalRedactor { throw RuntimeException("redactor boom") }
        val sut = createScope(redactor = redactor)

        val emitResult = sut.emit("event", null)
        assertTrue("emit should capture redactor failure in Result", emitResult.isFailure)

        val drainResult = sut.drain()
        assertTrue("drain should still succeed", drainResult.isSuccess)
    }

    @Test
    fun `no redactor leaves signal unchanged`() = runBlocking {
        val sut = createScope(redactor = null)
        sut.emit("event", "data")

        val signal = sut.drain().getOrThrow().single()
        assertEquals("event", signal.tag)
        assertEquals("data", signal.data)
    }

    // ========================================
    // Memory capacity + spill
    // ========================================

    @Test
    fun `signals within memory capacity stay in memory only`() = runBlocking {
        val dir = tempDir.newFolder()
        val sut = createScope(memoryCapacity = 10, dir = dir)
        repeat(10) { sut.emit("event-$it", null) }

        waitForSpill()
        val spillFile = File(dir, "spill.bin")
        assertTrue(!spillFile.exists() || spillFile.length() == 0L)

        val signals = sut.drain().getOrThrow()
        assertEquals(10, signals.size)
    }

    @Test
    fun `exceeding memory capacity triggers spill`() = runBlocking {
        val dir = tempDir.newFolder()
        val sut = createScope(memoryCapacity = 5, dir = dir)
        repeat(6) { sut.emit("event-$it", null) }

        waitForSpill()

        val signals = sut.drain().getOrThrow()
        assertTrue(signals.isNotEmpty())
    }

    @Test
    fun `spill preserves FIFO order - disk signals come before memory`() = runBlocking {
        val dir = tempDir.newFolder()
        val sut = createScope(memoryCapacity = 3, dir = dir)

        sut.emit("a", null)
        sut.emit("b", null)
        sut.emit("c", null)
        sut.emit("d", null)

        waitForSpill()

        val tags = sut.drain().getOrThrow().map { it.tag }
        assertEquals(listOf("a", "b", "c", "d"), tags)
    }

    @Test
    fun `multiple spills accumulate on disk`() = runBlocking {
        val dir = tempDir.newFolder()
        val sut = createScope(memoryCapacity = 2, dir = dir)

        repeat(3) { sut.emit("s$it", null) }
        waitForSpill()
        repeat(2) { sut.emit("s${it + 3}", null) }
        waitForSpill()

        val signals = sut.drain().getOrThrow()
        assertTrue("Expected at least 4 signals, got ${signals.size}", signals.size >= 4)
    }

    // ========================================
    // Disk capacity + rotation
    // ========================================

    @Test
    fun `disk rotation drops oldest when capacity exceeded`() = runBlocking {
        val dir = tempDir.newFolder()
        val sut = createScope(memoryCapacity = 2, diskCapacity = 100, dir = dir)

        repeat(20) { sut.emit("event-$it", "some-payload-data") }
        waitForSpill()

        val spillFile = File(dir, "spill.bin")
        if (spillFile.exists()) {
            assertTrue(
                "Spill file should be around disk capacity, was ${spillFile.length()}",
                spillFile.length() <= 150,
            )
        }
    }

    // ========================================
    // Drop oldest under pressure
    // ========================================

    @Test
    fun `when spill in progress, oldest signal is dropped from buffer`() = runBlocking {
        val dir = tempDir.newFolder()
        val sut = createScope(memoryCapacity = 2, dir = dir)

        repeat(100) { sut.emit("event-$it", null) }
        waitForSpill()

        val signals = sut.drain().getOrThrow()
        assertTrue("Expected some signals, got ${signals.size}", signals.isNotEmpty())
        assertTrue("Expected some drops, got ${signals.size}", signals.size <= 100)
    }

    // ========================================
    // Thread safety
    // ========================================

    @Test
    fun `concurrent emit from multiple threads does not crash`() = runBlocking {
        val dir = tempDir.newFolder()
        val sut = createScope(memoryCapacity = 50, dir = dir)
        val threads = 10
        val emitsPerThread = 100
        val barrier = CyclicBarrier(threads)
        val latch = CountDownLatch(threads)

        repeat(threads) { threadIdx ->
            Thread {
                    barrier.await()
                    repeat(emitsPerThread) { i -> sut.emit("thread-$threadIdx-event-$i", null) }
                    latch.countDown()
                }
                .start()
        }

        assertTrue(latch.await(10, TimeUnit.SECONDS))
        waitForSpill()

        val signals = sut.drain().getOrThrow()
        assertTrue(
            "Expected signals from concurrent emit, got ${signals.size}",
            signals.isNotEmpty(),
        )
    }

    @Test
    fun `concurrent emit and drain does not crash`() = runBlocking {
        val dir = tempDir.newFolder()
        val sut = createScope(memoryCapacity = 10, dir = dir)
        val emitting = AtomicInteger(1)
        // NOTE: generic arguments restored here — the pasted diff had dropped them
        // (`mutableListOf>>()`), which does not compile.
        val drainResults = mutableListOf<Result<List<StreamSignal>>>()

        val emitJob =
            scope.launch {
                var i = 0
                while (emitting.get() == 1) {
                    sut.emit("event-${i++}", null)
                    delay(1)
                }
            }

        repeat(20) {
            Thread.sleep(10)
            drainResults.add(sut.drain())
        }

        emitting.set(0)
        emitJob.cancel()

        drainResults.forEach { result ->
            assertTrue("drain() should not fail during concurrent access", result.isSuccess)
        }
    }

    // ========================================
    // Disk Mutex — concurrent spill + drain
    // ========================================

    @Test
    fun `drain while spill is in flight returns consistent data`() = runBlocking {
        val dir = tempDir.newFolder()
        val sut = createScope(memoryCapacity = 3, dir = dir)

        repeat(4) { sut.emit("event-$it", null) }

        Thread.sleep(100)
        val result = sut.drain()

        assertTrue("drain should succeed", result.isSuccess)
        val signals = result.getOrThrow()
        assertEquals(4, signals.size)
    }

    @Test
    fun `rapid spill cycles with tiny memory capacity`() = runBlocking {
        val dir = tempDir.newFolder()
        val sut = createScope(memoryCapacity = 1, dir = dir)

        repeat(200) { sut.emit("event-$it", null) }
        waitForSpill()

        val result = sut.drain()
        assertTrue("drain should succeed after many spill cycles", result.isSuccess)
        val signals = result.getOrThrow()
        assertTrue("Expected signals, got ${signals.size}", signals.isNotEmpty())

        signals.forEach { signal ->
            assertTrue("Tag should start with event-", signal.tag.startsWith("event-"))
            assertTrue("Timestamp should be positive", signal.timestamp > 0)
        }
    }

    @Test
    fun `concurrent drain and emit with disk overflow`() = runBlocking {
        val dir = tempDir.newFolder()
        val sut = createScope(memoryCapacity = 2, diskCapacity = 200, dir = dir)
        val emitting = AtomicInteger(1)
        // Generic argument restored (was stripped to `mutableListOf()` in the pasted diff).
        val allDrainedSignals = mutableListOf<StreamSignal>()
        val drainFailures = AtomicInteger(0)

        val emitJob =
            scope.launch {
                var i = 0
                while (emitting.get() == 1) {
                    sut.emit("event-${i++}", "payload")
                    delay(1)
                }
            }

        repeat(30) {
            Thread.sleep(10)
            val result = sut.drain()
            if (result.isSuccess) {
                synchronized(allDrainedSignals) { allDrainedSignals.addAll(result.getOrThrow()) }
            } else {
                drainFailures.incrementAndGet()
            }
        }

        emitting.set(0)
        emitJob.cancel()
        waitForSpill()

        sut.drain().onSuccess { remaining ->
            synchronized(allDrainedSignals) { allDrainedSignals.addAll(remaining) }
        }

        assertTrue("Should have collected some signals", allDrainedSignals.isNotEmpty())
        assertEquals("No drain should have failed", 0, drainFailures.get())

        val uniqueTimestampTagPairs = allDrainedSignals.map { "${it.timestamp}-${it.tag}" }.toSet()
        assertEquals(
            "No duplicate signals expected",
            allDrainedSignals.size,
            uniqueTimestampTagPairs.size,
        )
    }

    @Test
    fun `multiple coroutines draining same scope concurrently`() = runBlocking {
        val dir = tempDir.newFolder()
        val sut = createScope(memoryCapacity = 5, dir = dir)

        repeat(20) { sut.emit("event-$it", null) }
        waitForSpill()

        val results = (1..10).map { scope.launch { sut.drain() } }
        results.forEach { it.join() }

        val remaining = sut.drain().getOrThrow()
        assertTrue("Scope should be empty after concurrent drains", remaining.isEmpty())
    }

    @Test
    fun `spill and drain interleaved rapidly`() = runBlocking {
        val dir = tempDir.newFolder()
        val sut = createScope(memoryCapacity = 2, dir = dir)
        // Generic argument restored (was stripped to `mutableListOf()` in the pasted diff).
        val allSignals = mutableListOf<StreamSignal>()

        repeat(50) { cycle ->
            repeat(3) { sut.emit("cycle-$cycle-event-$it", null) }
            Thread.sleep(50)
            sut.drain().onSuccess { signals ->
                synchronized(allSignals) { allSignals.addAll(signals) }
            }
        }

        waitForSpill()
        sut.drain().onSuccess { signals -> synchronized(allSignals) { allSignals.addAll(signals) } }

        assertTrue("Expected most of 150 signals, got ${allSignals.size}", allSignals.size >= 100)
    }

    // ========================================
    // Disk I/O failures
    // ========================================

    @Test
    fun `drain handles corrupt spill file gracefully`() = runBlocking {
        val dir = tempDir.newFolder()
        val sut = createScope(dir = dir)

        dir.mkdirs()
        File(dir, "spill.bin").writeText("corrupt\u0000garbage\u0000not\u0000valid")

        sut.emit("memory-signal", null)
        val result = sut.drain()
        assertTrue(result.isSuccess)
    }

    @Test
    fun `emit succeeds when spill directory cannot be created`() = runBlocking {
        // Use a file as parent so mkdirs() fails deterministically (not uid-dependent)
        val blocker = tempDir.newFile("blocker-file")
        val sut = createScope(memoryCapacity = 2, dir = File(blocker, "nested"))

        repeat(5) { sut.emit("event-$it", null) }
        waitForSpill()

        val result = sut.drain()
        assertTrue("drain should succeed even after spill failure", result.isSuccess)
    }

    @Test
    fun `drain with non-existent spill directory returns memory signals only`() = runBlocking {
        val nonExistent = File(tempDir.root, "does-not-exist")
        val sut = createScope(dir = nonExistent)

        sut.emit("event", "data")
        val signals = sut.drain().getOrThrow()
        assertEquals(1, signals.size)
    }

    @Test
    fun `drain with empty spill file returns memory signals only`() = runBlocking {
        val dir = tempDir.newFolder()
        val sut = createScope(dir = dir)

        dir.mkdirs()
        File(dir, "spill.bin").createNewFile()

        sut.emit("event", "data")
        val signals = sut.drain().getOrThrow()
        assertEquals(1, signals.size)
        assertEquals("event", signals[0].tag)
    }

    // ========================================
    // Serialization edge cases
    // ========================================

    @Test
    fun `signal with tab in tag survives spill roundtrip`() = runBlocking {
        val dir = tempDir.newFolder()
        val sut = createScope(memoryCapacity = 2, dir = dir)

        sut.emit("tag\twith\ttabs", "data")
        sut.emit("trigger-spill", null)
        sut.emit("trigger-spill-2", null)
        waitForSpill()

        val signals = sut.drain().getOrThrow()
        val tabSignal = signals.find { it.tag == "tag\twith\ttabs" }
        assertNotNull("Signal with tabs in tag should survive roundtrip", tabSignal)
    }

    @Test
    fun `signal with tab in data survives spill roundtrip`() = runBlocking {
        val dir = tempDir.newFolder()
        val sut = createScope(memoryCapacity = 2, dir = dir)

        sut.emit("event", "data\twith\ttabs")
        sut.emit("trigger-spill", null)
        sut.emit("trigger-spill-2", null)
        waitForSpill()

        val signals = sut.drain().getOrThrow()
        val found = signals.find { it.tag == "event" }
        assertNotNull("Signal should survive", found)
        assertEquals("data\twith\ttabs", found?.data)
    }

    @Test
    fun `signal with newline in data - known limitation`() = runBlocking {
        val dir = tempDir.newFolder()
        val sut = createScope(memoryCapacity = 2, dir = dir)

        sut.emit("event", "line1\nline2")
        sut.emit("trigger-spill", null)
        sut.emit("trigger-spill-2", null)
        waitForSpill()

        val signals = sut.drain().getOrThrow()
        assertTrue(signals.isNotEmpty())
    }

    @Test
    fun `signal with empty string data`() = runBlocking {
        val sut = createScope()
        sut.emit("event", "")

        val signal = sut.drain().getOrThrow().single()
        assertEquals("event", signal.tag)
        assertEquals("", signal.data)
    }

    @Test
    fun `signal with very long data`() = runBlocking {
        val sut = createScope()
        val longData = "x".repeat(10_000)
        sut.emit("event", longData)

        val signal = sut.drain().getOrThrow().single()
        assertEquals(longData, signal.data)
    }

    @Test
    fun `signal with unicode data`() = runBlocking {
        val sut = createScope()
        sut.emit("event", "données télémétrie 日本語")

        val signal = sut.drain().getOrThrow().single()
        assertEquals("données télémétrie 日本語", signal.data)
    }

    @Test
    fun `signal with empty tag`() = runBlocking {
        val sut = createScope()
        sut.emit("", "data")

        val signal = sut.drain().getOrThrow().single()
        assertEquals("", signal.tag)
    }

    // ========================================
    // Memory capacity boundary values
    // ========================================

    @Test
    fun `exactly at memory capacity does not spill`() = runBlocking {
        val dir = tempDir.newFolder()
        val sut = createScope(memoryCapacity = 5, dir = dir)
        repeat(5) { sut.emit("event-$it", null) }

        waitForSpill()
        val spillFile = File(dir, "spill.bin")
        assertTrue(
            "Should not spill at exactly capacity",
            !spillFile.exists() || spillFile.length() == 0L,
        )

        val signals = sut.drain().getOrThrow()
        assertEquals(5, signals.size)
    }

    @Test
    fun `one over memory capacity triggers spill`() = runBlocking {
        val dir = tempDir.newFolder()
        val sut = createScope(memoryCapacity = 5, dir = dir)
        repeat(6) { sut.emit("event-$it", null) }

        waitForSpill()
        val signals = sut.drain().getOrThrow()
        assertTrue(signals.isNotEmpty())
    }

    @Test
    fun `memory capacity of 1 spills on every second emit`() = runBlocking {
        val dir = tempDir.newFolder()
        val sut = createScope(memoryCapacity = 1, dir = dir)

        sut.emit("a", null)
        sut.emit("b", null)
        waitForSpill()

        val signals = sut.drain().getOrThrow()
        assertEquals(2, signals.size)
    }

    // ========================================
    // Interleaved emit/drain cycles
    // ========================================

    @Test
    fun `multiple emit-drain cycles work correctly`() = runBlocking {
        val sut = createScope()

        repeat(5) { cycle ->
            sut.emit("cycle-$cycle", null)
            val signals = sut.drain().getOrThrow()
            assertEquals(1, signals.size)
            assertEquals("cycle-$cycle", signals[0].tag)
        }
    }

    @Test
    fun `drain between spill cycles returns accumulated signals`() = runBlocking {
        val dir = tempDir.newFolder()
        val sut = createScope(memoryCapacity = 2, dir = dir)

        repeat(3) { sut.emit("batch1-$it", null) }
        waitForSpill()
        val first = sut.drain().getOrThrow()
        assertTrue(first.isNotEmpty())

        repeat(3) { sut.emit("batch2-$it", null) }
        waitForSpill()
        val second = sut.drain().getOrThrow()
        assertTrue(second.isNotEmpty())

        val third = sut.drain().getOrThrow()
        assertTrue(third.isEmpty())
    }

    // ========================================
    // Emit never throws
    // ========================================

    @Test
    fun `emit never throws regardless of internal failure`() {
        val sut =
            StreamTelemetryScopeImpl(
                name = "test",
                memoryCapacity = 0,
                diskCapacity = 0,
                spillDir = File("/nonexistent/path/that/cannot/exist"),
                redactor = null,
                scope = scope,
            )

        repeat(10) { sut.emit("event-$it", null) }
    }

    // ========================================
    // Boundary value: disk capacity
    // ========================================

    @Test
    fun `disk capacity of 0 means spilled signals are trimmed away`() = runBlocking {
        val dir = tempDir.newFolder()
        val sut = createScope(memoryCapacity = 2, diskCapacity = 0, dir = dir)

        repeat(5) { sut.emit("event-$it", null) }
        waitForSpill()

        val signals = sut.drain().getOrThrow()
        assertTrue("Expected fewer than 5 due to disk trim, got ${signals.size}", signals.size < 5)
    }

    @Test
    fun `disk capacity of 1 byte trims aggressively`() = runBlocking {
        val dir = tempDir.newFolder()
        val sut = createScope(memoryCapacity = 2, diskCapacity = 1, dir = dir)

        repeat(5) { sut.emit("event-$it", null) }
        waitForSpill()

        val spillFile = File(dir, "spill.bin")
        if (spillFile.exists()) {
            assertTrue(
                "Spill file should be trimmed, was ${spillFile.length()} bytes",
                spillFile.length() <= 50,
            )
        }
    }

    @Test
    fun `disk capacity exactly matches one signal line`() = runBlocking {
        val dir = tempDir.newFolder()
        val sut = createScope(memoryCapacity = 2, diskCapacity = 50, dir = dir)

        repeat(10) { sut.emit("evt", "d") }
        waitForSpill()

        val spillFile = File(dir, "spill.bin")
        if (spillFile.exists()) {
            assertTrue("Spill file should stay around capacity", spillFile.length() <= 100)
        }
    }

    // ========================================
    // Boundary value: memory capacity
    // ========================================

    @Test
    fun `memory capacity of 0 spills every signal`() = runBlocking {
        val dir = tempDir.newFolder()
        val sut = createScope(memoryCapacity = 0, dir = dir)

        sut.emit("event", null)
        waitForSpill()

        val spillFile = File(dir, "spill.bin")
        val signals = sut.drain().getOrThrow()
        assertTrue(signals.isNotEmpty() || spillFile.exists())
    }

    @Test
    fun `large memory capacity holds everything in memory`() = runBlocking {
        val dir = tempDir.newFolder()
        val sut = createScope(memoryCapacity = 100_000, dir = dir)

        repeat(10_000) { sut.emit("event-$it", null) }

        val spillFile = File(dir, "spill.bin")
        assertTrue(
            "No spill expected with large capacity",
            !spillFile.exists() || spillFile.length() == 0L,
        )

        val signals = sut.drain().getOrThrow()
        assertEquals(10_000, signals.size)
    }

    // ========================================
    // Disk write failure (deterministic)
    // ========================================

    @Test
    fun `emit survives when spill dir parent is a file`() = runBlocking {
        // A file as parent makes mkdirs() fail deterministically regardless of uid
        val blocker = tempDir.newFile("blocker")
        val sut =
            StreamTelemetryScopeImpl(
                name = "test",
                memoryCapacity = 2,
                diskCapacity = 1_000_000L,
                spillDir = File(blocker, "nested"),
                redactor = null,
                scope = scope,
            )

        repeat(10) { sut.emit("event-$it", null) }
        waitForSpill()

        val result = sut.drain()
        assertTrue("drain should succeed even after spill failure", result.isSuccess)
    }

    @Test
    fun `drain succeeds when spill file was deleted externally`() = runBlocking {
        val dir = tempDir.newFolder()
        val sut = createScope(memoryCapacity = 2, dir = dir)

        repeat(3) { sut.emit("event-$it", null) }
        waitForSpill()

        File(dir, "spill.bin").delete()

        val result = sut.drain()
        assertTrue(result.isSuccess)
    }

    @Test
    fun `drain succeeds when spill directory was deleted externally`() = runBlocking {
        val dir = tempDir.newFolder()
        val sut = createScope(memoryCapacity = 2, dir = dir)

        repeat(3) { sut.emit("event-$it", null) }
        waitForSpill()

        dir.deleteRecursively()

        sut.emit("after-delete", null)
        val result = sut.drain()
        assertTrue(result.isSuccess)
        val signals = result.getOrThrow()
        assertTrue(signals.any { it.tag == "after-delete" })
    }

    // ========================================
    // Corrupt spill file
    // ========================================

    @Test
    fun `drain handles spill file with only blank lines`() = runBlocking {
        val dir = tempDir.newFolder()
        val sut = createScope(dir = dir)
        dir.mkdirs()
        File(dir, "spill.bin").writeText("\n\n\n\n")

        sut.emit("memory-signal", null)
        val signals = sut.drain().getOrThrow()
        assertEquals(1, signals.size)
        assertEquals("memory-signal", signals[0].tag)
    }

    @Test
    fun `drain handles spill file with mixed valid and invalid lines`() = runBlocking {
        val dir = tempDir.newFolder()
        val sut = createScope(dir = dir)
        dir.mkdirs()
        val content =
            listOf(
                    "${System.currentTimeMillis()}\tvalid-signal\tdata",
                    "not-a-timestamp\tbad",
                    "",
                    "${System.currentTimeMillis()}\tanother-valid\t",
                )
                .joinToString("\n")
        File(dir, "spill.bin").writeText(content + "\n")

        val signals = sut.drain().getOrThrow()
        val diskSignals = signals.filter { it.tag == "valid-signal" || it.tag == "another-valid" }
        assertEquals(2, diskSignals.size)
    }

    @Test
    fun `drain handles spill file with only one field per line`() = runBlocking {
        val dir = tempDir.newFolder()
        val sut = createScope(dir = dir)
        dir.mkdirs()
        File(dir, "spill.bin").writeText("justOneField\nandAnother\n")

        sut.emit("memory", null)
        val signals = sut.drain().getOrThrow()
        assertEquals(1, signals.size)
        assertEquals("memory", signals[0].tag)
    }

    @Test
    fun `drain handles binary garbage in spill file`() = runBlocking {
        val dir = tempDir.newFolder()
        val sut = createScope(dir = dir)
        dir.mkdirs()
        File(dir, "spill.bin")
            .writeBytes(byteArrayOf(0, 1, 2, 3, 0xFF.toByte(), 0xFE.toByte(), 10, 0, 0, 10))

        sut.emit("memory", null)
        val result = sut.drain()
        assertTrue(result.isSuccess)
    }

    @Test
    fun `drain handles very large spill file`() = runBlocking {
        val dir = tempDir.newFolder()
        val sut = createScope(dir = dir, diskCapacity = 10_000_000)
        dir.mkdirs()

        val lines =
            (1..10_000).joinToString("\n") { i ->
                "${System.currentTimeMillis()}\tevent-$i\tpayload-$i"
            }
        File(dir, "spill.bin").writeText(lines + "\n")

        val signals = sut.drain().getOrThrow()
        assertEquals(10_000, signals.size)

        assertFalse(File(dir, "spill.bin").exists())
    }

    // ========================================
    // Spill file lifecycle
    // ========================================

    @Test
    fun `spill file is deleted after successful drain`() = runBlocking {
        val dir = tempDir.newFolder()
        val sut = createScope(memoryCapacity = 2, dir = dir)

        repeat(3) { sut.emit("event-$it", null) }
        waitForSpill()

        val spillFile = File(dir, "spill.bin")
        assertTrue("Spill file should exist before drain", spillFile.exists())

        sut.drain()
        assertFalse("Spill file should be deleted after drain", spillFile.exists())
    }

    @Test
    fun `spill file is not created when signals fit in memory`() = runBlocking {
        val dir = tempDir.newFolder()
        val sut = createScope(memoryCapacity = 100, dir = dir)

        repeat(50) { sut.emit("event-$it", null) }
        waitForSpill()

        assertFalse("No spill file expected", File(dir, "spill.bin").exists())
    }

    // ========================================
    // Encode/decode edge cases
    // ========================================

    @Test
    fun `signal with literal backslash-t in tag - known limitation after spill roundtrip`() =
        runBlocking {
            val dir = tempDir.newFolder()
            val sut = createScope(memoryCapacity = 1, dir = dir)

            sut.emit("tag\\twith\\tescape", "data")
            sut.emit("trigger", null)
            waitForSpill()

            val signals = sut.drain().getOrThrow()
            // After roundtrip, \\t becomes \t (real tab) — the escape is ambiguous
            assertTrue(signals.any { it.tag == "tag\twith\tescape" })
        }

    @Test
    fun `signal with only whitespace tag and data`() = runBlocking {
        val sut = createScope()
        sut.emit("   ", "  ")

        val signal = sut.drain().getOrThrow().single()
        assertEquals("   ", signal.tag)
        assertEquals("  ", signal.data)
    }

    @Test
    fun `timestamps are monotonically non-decreasing`() = runBlocking {
        val sut = createScope()
        repeat(100) { sut.emit("event-$it", null) }

        val signals = sut.drain().getOrThrow()
        for (i in 1 until signals.size) {
            assertTrue(
                "Timestamps should be monotonically non-decreasing",
                signals[i].timestamp >= signals[i - 1].timestamp,
            )
        }
    }
}