diff --git a/uts/src/test/kotlin/io/ably/lib/test/helper/ProxyManager.kt b/uts/src/test/kotlin/io/ably/lib/test/helper/ProxyManager.kt new file mode 100644 index 000000000..39e5c9c64 --- /dev/null +++ b/uts/src/test/kotlin/io/ably/lib/test/helper/ProxyManager.kt @@ -0,0 +1,235 @@ +package io.ably.lib.test.helper + +import kotlinx.coroutines.Dispatchers +import kotlinx.coroutines.delay +import kotlinx.coroutines.sync.Mutex +import kotlinx.coroutines.sync.withLock +import kotlinx.coroutines.withContext +import java.io.ByteArrayInputStream +import java.net.URI +import java.net.http.HttpClient +import java.net.http.HttpRequest as JHttpRequest +import java.net.http.HttpResponse as JHttpResponse +import java.nio.channels.FileChannel +import java.nio.file.Files +import java.nio.file.Path +import java.nio.file.StandardOpenOption.CREATE +import java.nio.file.StandardOpenOption.TRUNCATE_EXISTING +import java.nio.file.StandardOpenOption.WRITE +import java.security.MessageDigest +import java.util.zip.GZIPInputStream + +/** + * Manages the lifecycle of the `uts-proxy` binary used for integration tests. + * + * Downloads the binary from GitHub releases on first use, caching it at + * `~/.cache/uts-proxy//uts-proxy`. Safe for concurrent Gradle test workers — + * a `FileLock` on `uts-proxy.lock` serialises the download across OS processes, while + * a [Mutex] serialises it within the same JVM. + * + * Call [ensureProxy] in `@BeforeAll` / `setUpAll()` for every proxy integration test suite. + */ +object ProxyManager { + + private const val PROXY_VERSION = "v0.2.0" + private const val VERSION_BARE = "0.2.0" + const val CONTROL_PORT = 9100 + private const val SANDBOX_HOST = "sandbox.realtime.ably-nonprod.net" + private const val GITHUB_BASE = + "https://github.com/ably/uts-proxy/releases/download/$PROXY_VERSION" + + val sandboxRealtimeHost: String = SANDBOX_HOST + val sandboxRestHost: String = SANDBOX_HOST + + private val CHECKSUMS = mapOf( + "uts-proxy_${VERSION_BARE}_darwin_amd64.tar.gz" to + "4abc4bd0682b61d53889c3ad3b240b44cf942878ed9fb04e8912a48070d2666d", + "uts-proxy_${VERSION_BARE}_darwin_arm64.tar.gz" to + "2b95cdb5659988f54ad3d413c713f94f944e3b0014011aba2e339b9537c59b2f", + "uts-proxy_${VERSION_BARE}_linux_amd64.tar.gz" to + "aa6d536101ebc3bfa6870ca4cfb75be1947360dc5c1c77d7a8536baa1fee7caa", + "uts-proxy_${VERSION_BARE}_linux_arm64.tar.gz" to + "c8f9363ae579508004727175a098bd0b73518ee3f08cf9071b0c372f8199767a", + ) + + private val os: String by lazy { + val name = System.getProperty("os.name").lowercase() + when { + name.contains("mac") -> "darwin" + name.contains("linux") -> "linux" + else -> error("Unsupported OS for uts-proxy: ${System.getProperty("os.name")}") + } + } + + private val arch: String by lazy { + when (System.getProperty("os.arch").lowercase()) { + "amd64", "x86_64" -> "amd64" + "aarch64", "arm64" -> "arm64" + else -> error("Unsupported arch for uts-proxy: ${System.getProperty("os.arch")}") + } + } + + private val archiveName: String get() = "uts-proxy_${VERSION_BARE}_${os}_${arch}.tar.gz" + + private val cacheDir: Path + get() = Path.of(System.getProperty("user.home"), ".cache", "uts-proxy", PROXY_VERSION) + + private val binaryPath: Path get() = cacheDir.resolve("uts-proxy") + + @Volatile private var proxyProcess: Process? = null + private val mutex = Mutex() + private val httpClient: HttpClient = HttpClient.newHttpClient() + + /** + * Ensures the `uts-proxy` process is running on [CONTROL_PORT]. + * + * If the proxy is already healthy (e.g. started by a previous test class in the same run), + * this is a no-op. Otherwise it downloads + verifies the binary and starts the process. + * + * @param timeoutMs Maximum real-time milliseconds to wait for the process to become healthy. + */ + suspend fun ensureProxy(timeoutMs: Int = 15_000): Unit = mutex.withLock { + if (isHealthy()) return + ensureBinary() + proxyProcess = withContext(Dispatchers.IO) { + ProcessBuilder(binaryPath.toString(), "--port", "$CONTROL_PORT") + .redirectErrorStream(true) + .redirectOutput(ProcessBuilder.Redirect.DISCARD) + .start() + } + waitForHealth(timeoutMs.toLong()) + } + + /** + * No-op retained for Dart API compatibility. + * The proxy process is shared for the lifetime of the test suite and exits with the JVM. + */ + fun stopProxy() = Unit + + // ── Internal ────────────────────────────────────────────────────────────── + + internal suspend fun isHealthy(): Boolean = runCatching { + withContext(Dispatchers.IO) { + val req = JHttpRequest.newBuilder() + .uri(URI.create("http://localhost:$CONTROL_PORT/health")) + .GET().build() + httpClient.send(req, JHttpResponse.BodyHandlers.ofString()).statusCode() == 200 + } + }.getOrDefault(false) + + private suspend fun waitForHealth(timeoutMs: Long) { + val deadline = System.currentTimeMillis() + timeoutMs + while (System.currentTimeMillis() < deadline) { + if (isHealthy()) return + delay(200) + } + proxyProcess?.destroyForcibly() + proxyProcess = null + error("uts-proxy did not become healthy within ${timeoutMs}ms") + } + + /** Ensures the binary is present in the cache, downloading and extracting if needed. */ + private suspend fun ensureBinary() = withContext(Dispatchers.IO) { + Files.createDirectories(cacheDir) + // FileLock serialises across multiple Gradle test worker JVMs. + val lockFile = cacheDir.resolve("uts-proxy.lock") + FileChannel.open(lockFile, CREATE, WRITE).use { channel -> + channel.lock().use { + val file = binaryPath.toFile() + if (file.exists() && sha256Hex(file.readBytes()) == CHECKSUMS[archiveName]) { + return@withContext // already cached and valid + } + val archiveBytes = downloadArchive() + verifyChecksum(archiveBytes) + val binary = extractFromTarGz(archiveBytes) + Files.write(binaryPath, binary, CREATE, TRUNCATE_EXISTING) + binaryPath.toFile().setExecutable(true) + } + } + } + + private fun downloadArchive(): ByteArray { + System.err.println("Downloading uts-proxy $PROXY_VERSION ($archiveName)…") + val req = JHttpRequest.newBuilder() + .uri(URI.create("$GITHUB_BASE/$archiveName")) + .GET().build() + val resp = httpClient.send(req, JHttpResponse.BodyHandlers.ofByteArray()) + check(resp.statusCode() == 200) { + "Failed to download uts-proxy from $GITHUB_BASE/$archiveName: HTTP ${resp.statusCode()}" + } + return resp.body() + } + + private fun verifyChecksum(bytes: ByteArray) { + val expected = CHECKSUMS[archiveName] + ?: error("No checksum for $archiveName — unsupported platform/arch") + val actual = sha256Hex(bytes) + check(actual == expected) { + "Checksum mismatch for $archiveName: expected $expected, got $actual" + } + } + + private fun sha256Hex(bytes: ByteArray): String = + MessageDigest.getInstance("SHA-256") + .digest(bytes) + .joinToString("") { "%02x".format(it) } + + /** + * Extracts the `uts-proxy` binary from a `.tar.gz` archive using only JDK stdlib. + * + * TAR format: sequential 512-byte header blocks each followed by file-data blocks + * (padded to a multiple of 512). We parse only the fields we need: + * - offset 0–99 : filename (null-terminated) + * - offset 124–135: file size in octal ASCII + * - offset 156 : entry type ('0'/NUL = regular file, '5' = directory, …) + */ + private fun extractFromTarGz(archiveBytes: ByteArray): ByteArray { + GZIPInputStream(ByteArrayInputStream(archiveBytes)).use { gzip -> + val headerBuf = ByteArray(512) + while (true) { + // Read one header block (exactly 512 bytes) + var totalRead = 0 + while (totalRead < 512) { + val n = gzip.read(headerBuf, totalRead, 512 - totalRead) + if (n < 0) break + totalRead += n + } + // End-of-archive: two consecutive zero-filled 512-byte blocks + if (totalRead < 512 || headerBuf.all { it == 0.toByte() }) break + + // Filename (null-terminated, strip leading ./ or /) + val nameEnd = (0 until 100).firstOrNull { headerBuf[it] == 0.toByte() } ?: 100 + val name = String(headerBuf, 0, nameEnd).trimStart('.', '/') + + // File size (octal ASCII at offset 124, 12 bytes) + val sizeStr = String(headerBuf, 124, 12).trimEnd('').trim() + val size = if (sizeStr.isEmpty()) 0L else sizeStr.toLong(8) + + // Entry type flag at offset 156 + val typeFlag = headerBuf[156].toInt().toChar() + val isRegularFile = typeFlag == '0' || typeFlag == '' + + if (isRegularFile && name == "uts-proxy" && size > 0) { + val content = ByteArray(size.toInt()) + var read = 0 + while (read < size) { + val n = gzip.read(content, read, (size - read).toInt()) + if (n < 0) error("Unexpected end of archive while reading uts-proxy entry") + read += n + } + return content + } + + // Skip this entry's data blocks (size rounded up to 512-byte boundary) + val dataBytes = (size + 511) / 512 * 512 + var skipped = 0L + while (skipped < dataBytes) { + val n = gzip.skip(dataBytes - skipped) + if (n <= 0) break + skipped += n + } + } + } + error("uts-proxy binary not found in archive '$archiveName'") + } +} diff --git a/uts/src/test/kotlin/io/ably/lib/test/helper/ProxySession.kt b/uts/src/test/kotlin/io/ably/lib/test/helper/ProxySession.kt new file mode 100644 index 000000000..49bfd6d8f --- /dev/null +++ b/uts/src/test/kotlin/io/ably/lib/test/helper/ProxySession.kt @@ -0,0 +1,293 @@ +package io.ably.lib.test.helper + +import com.google.gson.Gson +import com.google.gson.JsonObject +import com.google.gson.reflect.TypeToken +import kotlinx.coroutines.Dispatchers +import kotlinx.coroutines.withContext +import java.net.URI +import java.net.http.HttpClient +import java.net.http.HttpRequest as JHttpRequest +import java.net.http.HttpResponse as JHttpResponse + +// ── Rule type alias ──────────────────────────────────────────────────────────── + +/** + * A proxy rule: a `Map` with at minimum `"match"` and `"action"` keys. + * + * Use the factory helpers ([wsConnectRule], [wsFrameToClientRule], [wsFrameToServerRule], + * [httpRequestRule]) to construct rules without hard-coding map literals everywhere. + * + * Rules are evaluated in order; the first matching rule wins. Unmatched traffic passes through. + * When `"times"` is set the rule auto-removes after that many firings. + */ +typealias ProxyRule = Map + +// ── Rule factory helpers ─────────────────────────────────────────────────────── + +/** + * Builds a rule that matches WebSocket connection attempts. + * + * @param action The action to take (e.g. `mapOf("type" to "refuse_connection")`). + * @param count 1-based occurrence index; `2` matches only the 2nd connection attempt. + * @param queryContains Match only if the WS URL query params contain these key/value pairs. + * Use `"*"` as a wildcard value (matches any non-null value). + * @param times Auto-remove the rule after this many firings. + */ +fun wsConnectRule( + action: Map, + count: Int? = null, + queryContains: Map? = null, + times: Int? = null, +): ProxyRule = buildMap { + put("match", buildMap { + put("type", "ws_connect") + if (count != null) put("count", count) + if (queryContains != null) put("queryContains", queryContains) + }) + put("action", action) + if (times != null) put("times", times) +} + +/** + * Builds a rule that matches WebSocket frames travelling **server → client**. + * + * @param action The action to take (e.g. `mapOf("type" to "suppress")`). + * @param messageAction The Ably protocol message action number to match (see the action table + * in proxy.md; e.g. `4` = CONNECTED, `11` = ATTACHED). + * @param channel If set, additionally match only frames for this channel name. + * @param times Auto-remove the rule after this many firings. + */ +fun wsFrameToClientRule( + action: Map, + messageAction: Int? = null, + channel: String? = null, + times: Int? = null, +): ProxyRule = buildMap { + put("match", buildMap { + put("type", "ws_frame_to_client") + if (messageAction != null) put("action", messageAction) + if (channel != null) put("channel", channel) + }) + put("action", action) + if (times != null) put("times", times) +} + +/** + * Builds a rule that matches WebSocket frames travelling **client → server**. + * + * @param action The action to take. + * @param messageAction The Ably protocol message action number to match + * (e.g. `10` = ATTACH, `17` = AUTH). + * @param channel If set, additionally match only frames for this channel name. + * @param times Auto-remove the rule after this many firings. + */ +fun wsFrameToServerRule( + action: Map, + messageAction: Int? = null, + channel: String? = null, + times: Int? = null, +): ProxyRule = buildMap { + put("match", buildMap { + put("type", "ws_frame_to_server") + if (messageAction != null) put("action", messageAction) + if (channel != null) put("channel", channel) + }) + put("action", action) + if (times != null) put("times", times) +} + +/** + * Builds a rule that matches HTTP requests passing through the proxy. + * + * @param action The action to take (e.g. `mapOf("type" to "http_respond", "status" to 401)`). + * @param pathContains Match only requests whose path contains this substring. + * @param method Match only requests with this HTTP method (e.g. `"GET"`, `"POST"`). + * @param times Auto-remove the rule after this many firings. + */ +fun httpRequestRule( + action: Map, + pathContains: String? = null, + method: String? = null, + times: Int? = null, +): ProxyRule = buildMap { + put("match", buildMap { + put("type", "http_request") + if (pathContains != null) put("pathContains", pathContains) + if (method != null) put("method", method) + }) + put("action", action) + if (times != null) put("times", times) +} + +// ── ProxySession ────────────────────────────────────────────────────────────── + +/** + * A single proxy session wrapping the `uts-proxy` control REST API. + * + * Each test should create one session, run its scenario, and call [close] in a `finally` block. + * + * ```kotlin + * val session = ProxySession.create(rules = listOf( + * wsConnectRule(action = mapOf("type" to "refuse_connection"), count = 2) + * )) + * try { + * val client = TestRealtimeClient { + * key = sandboxKey + * connectThroughProxy(session) + * } + * // … test scenario … + * } finally { + * session.close() + * } + * ``` + * + * All methods are suspend functions dispatched on [Dispatchers.IO]. + * + * > **Note:** [getLog] returns events as `List>`. Gson maps JSON numbers to + * > `Double` when the target type is `Any`, so numeric fields such as `action` or `closeCode` + * > will need an explicit `.toInt()` cast at the call site. + */ +class ProxySession private constructor( + /** Opaque session identifier assigned by the proxy. */ + val sessionId: String, + /** The port on `localhost` that the proxy is listening on for this session. */ + val proxyPort: Int, + /** Always `"localhost"`. Exposed for use by [connectThroughProxy]. */ + val proxyHost: String = "localhost", +) { + + companion object { + private val gson = Gson() + private val httpClient: HttpClient = + HttpClient.newHttpClient() + + /** + * Creates a new proxy session pointing at the Ably sandbox. + * + * @param rules Initial rule set applied to all traffic through this session. + * @param port Specific port to listen on; `0` (default) lets the proxy choose. + * @param timeoutMs Session idle-timeout in ms; `null` uses the proxy default (30 000 ms). + * @param realtimeHost Upstream Ably realtime host (defaults to sandbox). + * @param restHost Upstream Ably REST host (defaults to sandbox). + */ + suspend fun create( + rules: List = emptyList(), + port: Int = 0, + timeoutMs: Long? = null, + realtimeHost: String = ProxyManager.sandboxRealtimeHost, + restHost: String = ProxyManager.sandboxRestHost, + ): ProxySession { + val body = JsonObject().apply { + add("target", JsonObject().apply { + addProperty("realtimeHost", realtimeHost) + addProperty("restHost", restHost) + }) + add("rules", gson.toJsonTree(rules)) + if (port != 0) addProperty("port", port) + if (timeoutMs != null) addProperty("timeoutMs", timeoutMs) + } + + val responseBody = controlPost("/sessions", body.toString(), expectedStatus = 201) + val data = gson.fromJson(responseBody, JsonObject::class.java) + val sessionId = data["sessionId"].asString + val proxyPort = data.getAsJsonObject("proxy")["port"].asInt + + return ProxySession(sessionId = sessionId, proxyPort = proxyPort) + } + + // ── HTTP helpers (shared by companion + instance methods) ────────────── + + internal suspend fun controlPost( + path: String, + body: String, + expectedStatus: Int = 200, + ): String = withContext(Dispatchers.IO) { + val req = JHttpRequest.newBuilder() + .uri(URI.create("http://localhost:${ProxyManager.CONTROL_PORT}$path")) + .header("Content-Type", "application/json") + .POST(JHttpRequest.BodyPublishers.ofString(body)) + .build() + val resp = httpClient.send(req, JHttpResponse.BodyHandlers.ofString()) + check(resp.statusCode() == expectedStatus) { + "Proxy control API returned ${resp.statusCode()} for POST $path: ${resp.body()}" + } + resp.body() + } + + internal suspend fun controlGet(path: String): String = withContext(Dispatchers.IO) { + val req = JHttpRequest.newBuilder() + .uri(URI.create("http://localhost:${ProxyManager.CONTROL_PORT}$path")) + .GET().build() + val resp = httpClient.send(req, JHttpResponse.BodyHandlers.ofString()) + check(resp.statusCode() == 200) { + "Proxy control API returned ${resp.statusCode()} for GET $path: ${resp.body()}" + } + resp.body() + } + + internal suspend fun controlDelete(path: String): Unit = withContext(Dispatchers.IO) { + val req = JHttpRequest.newBuilder() + .uri(URI.create("http://localhost:${ProxyManager.CONTROL_PORT}$path")) + .DELETE().build() + httpClient.send(req, JHttpResponse.BodyHandlers.discarding()) + Unit + } + } + + // ── Session instance API ────────────────────────────────────────────────── + + /** + * Appends or prepends [rules] to this session's active rule list. + * + * @param rules Rules to add. + * @param position `"append"` (default) or `"prepend"`. + */ + suspend fun addRules(rules: List, position: String = "append") { + val body = JsonObject().apply { + add("rules", gson.toJsonTree(rules)) + addProperty("position", position) + } + controlPost("/sessions/$sessionId/rules", body.toString()) + } + + /** + * Triggers an imperative action on the current active WebSocket connection. + * + * Common actions: + * ```kotlin + * session.triggerAction(mapOf("type" to "disconnect")) + * session.triggerAction(mapOf("type" to "close", "closeCode" to 1000)) + * session.triggerAction(mapOf("type" to "inject_to_client", "message" to mapOf("action" to 6))) + * ``` + */ + suspend fun triggerAction(action: Map) { + controlPost("/sessions/$sessionId/actions", gson.toJson(action)) + } + + /** + * Returns the ordered event log recorded by the proxy for this session. + * + * Each event is a `Map` with at least a `"type"` key. Common types: + * `"ws_connect"`, `"ws_frame"`, `"ws_disconnect"`, `"http_request"`, `"http_response"`. + * + * > JSON numbers are deserialized as `Double` by Gson when the target type is `Any`. + * > Use `.toInt()` / `.toLong()` when comparing action numbers or status codes. + */ + suspend fun getLog(): List> { + val body = controlGet("/sessions/$sessionId/log") + val data = gson.fromJson(body, JsonObject::class.java) + val eventsEl = data["events"] ?: return emptyList() + val listType = object : TypeToken>>() {}.type + return gson.fromJson(eventsEl, listType) + } + + /** + * Closes this session and stops its proxy listener. + * Should always be called in a `finally` block after a test completes. + * Cleanup errors are silently ignored. + */ + suspend fun close() { + runCatching { controlDelete("/sessions/$sessionId") } + } +}