Add StreamTelemetry capabilities to be reused across products #59
aleksandar-apostolov merged 9 commits into develop
Introduce a general-purpose telemetry engine for collecting signals across named scopes. Product SDKs inject it via StreamComponentProvider, core emits internally, and the product drains on its own schedule.
- StreamSignal: tag + data + timestamp
- StreamTelemetryScope: emit() + drain() with buffer-swap thread safety
- StreamTelemetryScopeImpl: memory-first, disk spill on overflow, drop oldest under pressure
- StreamSignalRedactor: anonymization hook on emit()
- StreamTelemetryConfig: per-scope memory/disk caps, configurable paths, SDK-versioned disk layout with stale version cleanup
- StreamTelemetryNoOp: zero-cost default when telemetry not provided
- StreamComponentProvider: added telemetry field
…or disk ops
- StreamSignal → api/model/telemetry/ (data models package)
- StreamTelemetryConfig → api/model/config/ (alongside other configs)
- drain() is now suspend, disk reads on Dispatchers.IO
- spillToDisk launches on Dispatchers.IO via scope
- cleanStaleVersions runs on Dispatchers.IO at init
- StreamTelemetryImpl and StreamTelemetryScopeImpl accept CoroutineScope

- drain() now returns Result<List<StreamSignal>> so the product SDK can distinguish between empty (no signals) and failure (disk I/O error)
- Use runCatchingCancellable to properly rethrow CancellationException
- drainDisk() no longer swallows exceptions; they propagate to Result

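The distinction between an empty drain and a failed drain can be sketched as follows. This is a minimal, non-suspending stand-in: the `runCatchingCancellable` defined here is a hypothetical reimplementation for illustration, not the helper used in the PR, and the `IOException` merely simulates a disk read failure.

```kotlin
import java.io.IOException
import kotlin.coroutines.cancellation.CancellationException

// Hypothetical helper: like runCatching, but cancellation is rethrown
// instead of being captured inside Result.failure.
inline fun <T> runCatchingCancellable(block: () -> T): Result<T> =
    try {
        Result.success(block())
    } catch (e: CancellationException) {
        throw e
    } catch (e: Throwable) {
        Result.failure(e)
    }

fun main() {
    // Success with an empty list: nothing was buffered.
    val empty: Result<List<String>> = runCatchingCancellable { emptyList() }
    // Failure: the caller can see that the read itself went wrong.
    val failed: Result<List<String>> =
        runCatchingCancellable { throw IOException("simulated disk read failure") }

    println(empty.isSuccess && empty.getOrThrow().isEmpty())
    println(failed.isFailure)
}
```

With a plain `runCatching`, a `CancellationException` would be wrapped in the `Result` and the surrounding coroutine would keep running after it was cancelled, which is presumably why the commit switches helpers.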
77 tests covering:
- Basic emit/drain, ordering, timestamps
- Drain semantics (clear, idempotent, interleaved cycles)
- Redactor (transform, drop fallback, exception safety)
- Memory capacity boundaries (at, below, above, 0, 1, large)
- Disk spill (FIFO order, multiple spills, accumulation)
- Disk capacity boundaries (0, 1 byte, exact fit, rotation)
- Thread safety (concurrent emit, concurrent emit+drain)
- Disk I/O failures (read-only dir, corrupt files, external deletion)
- Corrupt spill files (blank lines, mixed valid/invalid, binary garbage, single-field lines, very large files)
- Spill file lifecycle (created on overflow, deleted after drain)
- Serialization edge cases (tabs, unicode, empty strings, long data, newline limitation, escape sequence ambiguity)
- Version cleanup (stale dirs deleted, current kept, siblings safe)
- Scope creation (identity, isolation, special chars, empty name)
- Custom config (root override, basePath)
- NoOp (emit discards, drain returns empty success)
Fix: replace List.removeFirst() with removeAt(0) for JDK < 21 compat

- Add Mutex for disk I/O: serializes spillToDisk and drainDisk to prevent concurrent read/write on the same spill file
- emit() returns Result<Unit> via runCatching
- drain() already returns Result<List<StreamSignal>> via runCatchingCancellable
- spillToDisk uses runCatchingCancellable instead of try/catch
- Every public method now returns Result<*> for API consistency

5 new tests exercising the disk Mutex under concurrent access:
- drain while spill is in flight
- rapid spill cycles with memoryCapacity=1
- concurrent drain and emit with disk overflow + no-duplicate check
- multiple coroutines draining same scope concurrently
- spill and drain interleaved rapidly over 50 cycles

- cleanStaleVersions() is now a public suspend method on StreamTelemetry
- Product SDK calls it when ready, not auto-fired in init
- Removed hardcoded Dispatchers.IO from StreamTelemetryImpl
- Uses runCatchingCancellable, returns Result<Unit>
- Updated tests to call cleanStaleVersions() explicitly

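A rough sketch of what an explicit stale-version cleanup might look like. It assumes one directory per SDK version under a telemetry root (consistent with the `stream/telemetry/2.0.0/test` path mentioned in the review), uses plain `runCatching` in a non-suspending function for brevity, and leaves non-directory files untouched; the function signature is hypothetical.

```kotlin
import java.io.File

// Hypothetical, non-suspending stand-in for cleanStaleVersions().
// Assumes telemetryRoot contains one sub-directory per SDK version.
fun cleanStaleVersions(telemetryRoot: File, currentVersion: String): Result<Unit> = runCatching {
    // listFiles() returns null when the root is missing or unreadable: nothing to clean.
    val children = telemetryRoot.listFiles() ?: return@runCatching
    children
        .filter { it.isDirectory && it.name != currentVersion }
        .forEach { dir ->
            // deleteRecursively() reports failure through its return value.
            check(dir.deleteRecursively()) { "Could not delete ${dir.path}" }
        }
}
```

Because the call is explicit rather than fired from `init`, the caller decides when the disk work happens and can inspect the returned `Result`.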
Walkthrough
This PR introduces a comprehensive telemetry system to stream-android-core, enabling the capture, buffering, and draining of telemetry signals with configurable memory and disk storage, signal redaction, and a no-op fallback implementation.
Sequence Diagram(s)
sequenceDiagram
participant App as Application
participant Tel as StreamTelemetry
participant Scope as StreamTelemetryScope
participant Mem as Memory Buffer
participant Disk as Disk Storage
App->>Tel: scope(name)
activate Tel
Tel->>Scope: create or retrieve
deactivate Tel
loop Emit Phase
App->>Scope: emit(tag, data)
activate Scope
Scope->>Scope: apply redactor
Scope->>Mem: add signal
alt Memory at capacity
Scope->>Disk: spill oldest signals (async)
end
Scope->>App: return Result<Unit>
deactivate Scope
end
App->>Scope: drain()
activate Scope
Scope->>Disk: read and delete spill file
Scope->>Mem: snapshot and clear buffer
Scope->>Scope: merge disk + memory signals
Scope->>App: return Result<List<StreamSignal>>
deactivate Scope
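The emit/drain flow in the diagram can be approximated in memory as below. This is a sketch under stated assumptions, not the PR's implementation: `Signal` and `SketchScope` are hypothetical names, an in-memory list stands in for the spill file, the whole buffer is spilled once it exceeds `memoryCapacity`, and a redactor returning `null` drops the signal (the behavior adopted in the later "Source fixes" commit).

```kotlin
// Illustrative stand-ins; the real StreamSignal and StreamTelemetryScope may differ.
data class Signal(val tag: String, val data: String?, val timestamp: Long)

class SketchScope(
    private val memoryCapacity: Int,
    private val redact: (Signal) -> Signal? = { it },
) {
    private val lock = Any()
    private var buffer = ArrayList<Signal>()

    // Stands in for the spill file; the real implementation writes to disk asynchronously.
    private val spilled = ArrayList<Signal>()

    fun emit(tag: String, data: String?): Result<Unit> = runCatching {
        // A null result from the redactor means "drop this signal".
        val signal = redact(Signal(tag, data, System.currentTimeMillis()))
            ?: return@runCatching
        synchronized(lock) {
            buffer.add(signal)
            if (buffer.size > memoryCapacity) {
                // Memory over capacity: move the buffered signals to the spill area.
                spilled.addAll(buffer)
                buffer = ArrayList()
            }
        }
    }

    fun drain(): Result<List<Signal>> = runCatching {
        synchronized(lock) {
            // Buffer swap: take the current buffer and install a fresh one.
            val memory = buffer
            buffer = ArrayList()
            val disk = ArrayList(spilled)
            spilled.clear()
            disk + memory // spilled signals are older, so they come first
        }
    }
}

fun main() {
    val scope = SketchScope(memoryCapacity = 2)
    listOf("a", "b", "c", "d").forEach { scope.emit(it, null) }
    println(scope.drain().getOrThrow().map { it.tag }) // [a, b, c, d]
    println(scope.drain().getOrThrow().size)           // 0
}
```

Swapping the buffer reference under the lock keeps the critical section short: the drained list is handed to the caller without copying, and concurrent `emit()` calls write into the fresh buffer.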
Estimated code review effort: 🎯 4 (Complex) | ⏱️ ~60 minutes
🚥 Pre-merge checks: ✅ 4 passed | ❌ 1 failed (1 warning)
Actionable comments posted: 7
🧹 Nitpick comments (2)
stream-android-core/src/test/java/io/getstream/android/core/internal/telemetry/StreamTelemetryScopeImplTest.kt (1)
337-352: Assertion is conditional on `spillFile.exists()` and trivially passes if no spill occurred.
If, for any reason (timing, impl change), the spill never lands on disk by the time `waitForSpill()` returns, the `if (spillFile.exists())` branch is skipped and the test passes without exercising rotation at all. Since the test's purpose is disk rotation under capacity pressure, make the existence of the spill file a precondition:
🔧 Suggested tightening
     val spillFile = File(dir, "spill.bin")
    -if (spillFile.exists()) {
    -    assertTrue(
    -        "Spill file should be around disk capacity, was ${spillFile.length()}",
    -        spillFile.length() <= 150,
    -    )
    -}
    +assertTrue("Spill file must exist to test rotation", spillFile.exists())
    +assertTrue(
    +    "Spill file should be around disk capacity, was ${spillFile.length()}",
    +    spillFile.length() <= 150,
    +)
The same pattern appears in `disk capacity of 1 byte trims aggressively` (lines 836-842) and `disk capacity exactly matches one signal line` (lines 853-856); consider applying the same change there.
stream-android-core/src/test/java/io/getstream/android/core/internal/telemetry/StreamTelemetryImplTest.kt (1)
128-141: Consider replacing `runBlocking` + `Thread.sleep` with `runTest` and a controllable dispatcher.
The init cleanup runs on `Dispatchers.Default` via the injected `scope`, and tests wait a fixed 500 ms. On slower CI agents this can be racy (flaky failures) or unnecessarily slow when cleanup finishes in 1 ms. As per coding guidelines ("Use MockK and coroutines-test to control timing"), prefer injecting a `StandardTestDispatcher`/`UnconfinedTestDispatcher` and using `runTest { … advanceUntilIdle() }`, or expose a `Job` from the cleanup so tests can `join()` deterministically. This comment applies to every `runBlocking { … Thread.sleep(500) }` usage in both telemetry test files.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In
`@stream-android-core/src/main/java/io/getstream/android/core/internal/telemetry/StreamTelemetryScopeImpl.kt`:
- Around line 149-168: encode(...) currently calls signal.data?.toString() so
decode(...) always yields StreamSignal.data as String?, causing memory vs disk
round-trip type inconsistency; update the public API docs: add KDoc to
StreamSignal.data (and to emit(...) / drain(...) where relevant) explicitly
stating that spills to disk coerce data to String? and callers must
pre-serialize or expect String, and include a note on next steps (optionally change
StreamSignal.data to String? or replace encode/decode with a proper serializer
such as Moshi to preserve types); ensure you reference StreamSignal.data,
encode, and decode in the KDoc so callers see the behavior.
- Around line 74-91: The emit method in StreamTelemetryScopeImpl currently uses
"val signal = redactor?.redact(raw) ?: raw" which accidentally re-inserts
unredacted `raw` when a non-null redactor intentionally returns null to drop a
signal; change this to call redact and if the result is null, exit without
buffering (i.e., do not add to buffer or trigger spilling). Specifically, in
emit(), replace the Elvis expression with a nullable result from
StreamSignalRedactor.redact(raw) and if that result is null, return/skip the
rest of the synchronized block so the signal is dropped rather than buffered.
- Around line 125-134: trimDiskIfNeeded currently removes one line and rewrites
the whole file repeatedly causing O(N²) disk I/O; change it to compute which
suffix of lines fits within diskCapacity once and then write that suffix in a
single file.writeText call. Specifically, in trimDiskIfNeeded: read all lines
into a list, iterate from the end accumulating the byte size of each line
(including the separator/newline) until the cumulative size <= diskCapacity,
then create the targetSlice as the tail subset that fits and call
file.writeText(targetSlice.joinToString("\n", postfix = "\n")) once (do not
re-check file.length() inside the loop). Ensure you reference the function name
trimDiskIfNeeded and use the same line-reading approach but only perform one
write.
In
`@stream-android-core/src/test/java/io/getstream/android/core/internal/telemetry/StreamTelemetryImplTest.kt`:
- Around line 197-209: The test computes expectedDir but never verifies the
telemetry uses the custom root; update the test for StreamTelemetryImpl by
forcing a spill (e.g., emit enough events or call the telemetry scope's
flush/spill method after creating telemetryScope via
StreamTelemetryImpl.scope("test") and emitting an event) and then assert that
the actual spill path (the directory or file produced by that spill) is under
the computed expectedDir (File(customRoot, "stream/telemetry/2.0.0/test")), or
alternatively mirror the approach from the "custom basePath is used" test to
assert version cleanup/paths use config(root = customRoot) instead of cacheDir.
In
`@stream-android-core/src/test/java/io/getstream/android/core/internal/telemetry/StreamTelemetryScopeImplTest.kt`:
- Around line 250-257: Update the test `redactor exception does not crash emit`
to actually assert the emit result and downstream behavior: call
`sut.emit("event", null)` and assert the returned Result is successful (e.g.,
result.isSuccess), then call `sut.drain()` and assert the drained signal is the
raw/fallback signal (matching the behavior tested in the `redactor returning
null falls back to raw signal` test). Use the existing `StreamSignalRedactor`
that throws and the `createScope` helper to locate the code paths
(`StreamSignalRedactor`, `createScope`, `sut.emit`, `sut.drain`) and ensure the
test fails if emit silently did nothing.
- Around line 311-317: The current FIFO assertion is weak because
tags.indexOf("a") can be -1; update the test around sut.drain().getOrThrow() and
tags so you first assert required disk and memory tags exist (e.g., ensure
tags.contains("a") and tags.contains("d")), then strengthen the FIFO check by
computing all disk-tag indices and all memory-tag indices and asserting that the
maximum disk index is less than the minimum memory index (ensuring every disk
signal precedes every memory signal) instead of only comparing tags.indexOf("a")
and tags.indexOf("d").
- Around line 586-597: The two tests 'emit does not throw when spill directory
is read-only' and 'emit survives when spill dir is read-only' are flaky because
File.setReadOnly() can be ignored by root; replace the unreliable permission
approach with a deterministic write barrier: create a real file and pass
File(existingFile, "nested") as the spill dir so mkdirs() will always fail, or
add an early guard using Assume.assumeFalse(isRunningAsRoot()) to skip the test
under root; update the test setup in the methods that call
createScope(memoryCapacity = ..., dir = ...) to use the existing-file-parent
pattern (or the Assume guard) so the spill path is exercised consistently.
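The file-as-parent barrier suggested in the last comment can be sketched like this. The helper name `unwritableSpillDir` is hypothetical; the technique relies only on the fact that a directory cannot be created beneath a regular file, which holds regardless of the uid the tests run under.

```kotlin
import java.io.File

// Hypothetical test helper: returns a path that can never become a directory,
// because its parent is a regular file rather than a directory.
fun unwritableSpillDir(): File {
    val barrier = File.createTempFile("telemetry-barrier", ".tmp")
    barrier.deleteOnExit()
    return File(barrier, "nested")
}

fun main() {
    val spillDir = unwritableSpillDir()
    println(spillDir.mkdirs()) // false
    println(spillDir.exists()) // false
}
```

Passing such a path as the spill directory should exercise the disk failure path on every run, unlike `File.setReadOnly()`, which root can bypass.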
---
Nitpick comments:
In
`@stream-android-core/src/test/java/io/getstream/android/core/internal/telemetry/StreamTelemetryImplTest.kt`:
- Around line 128-141: The test uses runBlocking + Thread.sleep to wait for
async cleanup started by StreamTelemetryImpl, which is flaky; change the tests
to use kotlinx.coroutines.test runTest with a
StandardTestDispatcher/UnconfinedTestDispatcher and inject a
TestScope/TestDispatcher into StreamTelemetryImpl (or expose the cleanup Job) so
you can deterministically wait: construct StreamTelemetryImpl(context,
config(version = "1.0.0"), testScope) then call advanceUntilIdle() (or join the
exposed Job) instead of Thread.sleep; apply the same pattern to all telemetry
tests that use runBlocking + Thread.sleep.
In
`@stream-android-core/src/test/java/io/getstream/android/core/internal/telemetry/StreamTelemetryScopeImplTest.kt`:
- Around line 337-352: The test disk rotation assertion currently skips
verification if the spill file doesn't exist; change the test `disk rotation
drops oldest when capacity exceeded` to require the spill file's existence after
calling `waitForSpill()`—replace the conditional `if (spillFile.exists())` with
an explicit assertion that `spillFile.exists()` is true, then perform the length
assertion against `spillFile.length()`; do the same tightening for the other
tests mentioned (`disk capacity of 1 byte trims aggressively` and `disk capacity
exactly matches one signal line`) so they assert the spill file exists before
checking its size, and keep use of `createScope(...)`, `waitForSpill()`,
`memoryCapacity`, and `diskCapacity` unchanged.
📒 Files selected for processing (10)
- stream-android-core/src/main/java/io/getstream/android/core/api/model/config/StreamComponentProvider.kt
- stream-android-core/src/main/java/io/getstream/android/core/api/model/config/StreamTelemetryConfig.kt
- stream-android-core/src/main/java/io/getstream/android/core/api/model/telemetry/StreamSignal.kt
- stream-android-core/src/main/java/io/getstream/android/core/api/telemetry/StreamSignalRedactor.kt
- stream-android-core/src/main/java/io/getstream/android/core/api/telemetry/StreamTelemetry.kt
- stream-android-core/src/main/java/io/getstream/android/core/api/telemetry/StreamTelemetryScope.kt
- stream-android-core/src/main/java/io/getstream/android/core/internal/telemetry/StreamTelemetryImpl.kt
- stream-android-core/src/main/java/io/getstream/android/core/internal/telemetry/StreamTelemetryScopeImpl.kt
- stream-android-core/src/test/java/io/getstream/android/core/internal/telemetry/StreamTelemetryImplTest.kt
- stream-android-core/src/test/java/io/getstream/android/core/internal/telemetry/StreamTelemetryScopeImplTest.kt
Source fixes:
- StreamSignal.data: Any? → String?. Callers pre-serialize, no silent toString() coercion on disk spill (addresses Petar's review + bot #3)
- Redactor null return now drops the signal instead of falling back to raw, honoring the StreamSignalRedactor KDoc contract (bot #1)
- trimDiskIfNeeded O(N²) → O(N) single-pass: compute the suffix that fits within diskCapacity and write once (bot #2)
- Updated KDocs on StreamSignal, StreamTelemetryScope.emit
Test fixes:
- FIFO assertion strengthened: assertEquals on exact order instead of weak indexOf check (bot #6)
- Redactor tests updated for null = drop behavior
- Redactor exception test now asserts Result.isFailure + drain success (bot #5)
- Custom root test verifies cleanup targets custom root, not cacheDir (bot #4)
- setReadOnly tests replaced with file-as-parent barrier for deterministic mkdirs() failure regardless of uid (bot #7)

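The single-pass trim can be illustrated with a pure function that picks the newest lines fitting within a byte budget. This is a sketch, not the PR's `trimDiskIfNeeded`: the name `suffixThatFits` is hypothetical, and it assumes UTF-8 encoding with one `\n` terminator per line.

```kotlin
// Hypothetical helper: returns the newest lines whose encoded size
// (including one '\n' per line) fits within capacityBytes.
fun suffixThatFits(lines: List<String>, capacityBytes: Long): List<String> {
    var total = 0L
    var start = lines.size
    // Walk from newest to oldest and stop at the first line that would overflow.
    while (start > 0) {
        val size = lines[start - 1].toByteArray(Charsets.UTF_8).size + 1L
        if (total + size > capacityBytes) break
        total += size
        start--
    }
    return lines.subList(start, lines.size)
}

fun main() {
    // "c" costs 2 bytes, "bb" costs 3, "aaaa" costs 5; only the last two fit in 5 bytes.
    println(suffixThatFits(listOf("aaaa", "bb", "c"), capacityBytes = 5)) // [bb, c]
}
```

The caller would then rewrite the spill file once with the returned lines, instead of removing one line and rewriting the whole file on each iteration.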
- Replace hardcoded Dispatchers.IO with injectable ioDispatcher param
(defaults to Dispatchers.IO, testable with test dispatchers)
- Handle file.delete() return value — fallback to writeText("") if
delete fails
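A small sketch of the delete-with-fallback idea, assuming the spill file is a plain `java.io.File`. The helper name `clearSpillFile` is hypothetical, and the fallback `writeText("")` can itself throw if the file is not writable, so a real caller would still wrap it.

```kotlin
import java.io.File

// Hypothetical helper: remove a spill file, truncating it if deletion fails.
fun clearSpillFile(file: File): Boolean {
    if (!file.exists()) return true
    // delete() signals failure through its return value rather than by throwing.
    if (file.delete()) return true
    // Fallback: leave the file in place but make sure it holds no stale signals.
    file.writeText("")
    return file.length() == 0L
}
```

Either way the next drain sees no leftover content, which avoids delivering the same signals twice.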
🚀 Available in v5.0.0



Goal
Introduce a general-purpose telemetry engine in core that product SDKs (Chat, Video, Feeds) can use to collect, buffer, and drain operational signals.
Implementation
API layer (`api/telemetry/`):
- `StreamSignal`: data class with tag + data + timestamp
- `StreamTelemetryScope`: named scope with `emit()` and `drain()`
- `StreamTelemetry`: engine interface, factory function, and `StreamTelemetryNoOp`
- `StreamSignalRedactor`: anonymization hook that runs on `emit()`
- `StreamTelemetryConfig`: memory/disk capacity, configurable paths, SDK-versioned disk layout
Internal layer (`internal/telemetry/`):
- `StreamTelemetryScopeImpl`: buffer-swap thread safety, memory-first with disk spill on overflow, drops oldest under pressure
- `StreamTelemetryImpl`: scope factory with stale version cleanup on init
Integration:
- `telemetry: StreamTelemetry? = null` added to `StreamComponentProvider`
- `StreamTelemetryNoOp` (zero cost) as the default when telemetry is not provided
Design highlights:
- `emit()` never throws or affects the caller
Testing
- `StreamTelemetryScopeImpl` (emit, drain, spill, redaction, thread safety)
- `StreamTelemetryImpl` (scope creation, version cleanup)
pr: core
Summary by CodeRabbit
Release Notes