From f2e425eca50d55106b7ef1ae9476f9ff13d2c521 Mon Sep 17 00:00:00 2001 From: sahar-fehri Date: Tue, 5 May 2026 12:12:22 +0200 Subject: [PATCH 01/26] chore: add ohlcv websocket --- .../ohlcv/OHLCVService-method-action-types.ts | 33 + .../src/api/ohlcv/OHLCVService.test.ts | 714 ++++++++++++++++++ .../src/api/ohlcv/OHLCVService.ts | 483 ++++++++++++ packages/core-backend/src/api/ohlcv/index.ts | 18 + packages/core-backend/src/api/ohlcv/types.ts | 33 + packages/core-backend/src/index.ts | 25 + 6 files changed, 1306 insertions(+) create mode 100644 packages/core-backend/src/api/ohlcv/OHLCVService-method-action-types.ts create mode 100644 packages/core-backend/src/api/ohlcv/OHLCVService.test.ts create mode 100644 packages/core-backend/src/api/ohlcv/OHLCVService.ts create mode 100644 packages/core-backend/src/api/ohlcv/index.ts create mode 100644 packages/core-backend/src/api/ohlcv/types.ts diff --git a/packages/core-backend/src/api/ohlcv/OHLCVService-method-action-types.ts b/packages/core-backend/src/api/ohlcv/OHLCVService-method-action-types.ts new file mode 100644 index 0000000000..c53628c087 --- /dev/null +++ b/packages/core-backend/src/api/ohlcv/OHLCVService-method-action-types.ts @@ -0,0 +1,33 @@ +/** + * This file is auto generated. + * Do not edit manually. + */ + +import type { OHLCVService } from './OHLCVService'; + +/** + * Subscribe to an OHLCV channel for real-time candlestick data. + * + * @param options - The subscription parameters (assetId, interval, currency). + */ +export type OHLCVServiceSubscribeAction = { + type: `OHLCVService:subscribe`; + handler: OHLCVService['subscribe']; +}; + +/** + * Unsubscribe from an OHLCV channel. + * + * @param options - The subscription parameters to unsubscribe from. + */ +export type OHLCVServiceUnsubscribeAction = { + type: `OHLCVService:unsubscribe`; + handler: OHLCVService['unsubscribe']; +}; + +/** + * Union of all OHLCVService action types. + */ +export type OHLCVServiceMethodActions = + | OHLCVServiceSubscribeAction + | OHLCVServiceUnsubscribeAction; diff --git a/packages/core-backend/src/api/ohlcv/OHLCVService.test.ts b/packages/core-backend/src/api/ohlcv/OHLCVService.test.ts new file mode 100644 index 0000000000..44d9060c0c --- /dev/null +++ b/packages/core-backend/src/api/ohlcv/OHLCVService.test.ts @@ -0,0 +1,714 @@ +import { Messenger, MOCK_ANY_NAMESPACE } from '@metamask/messenger'; +import type { + MessengerActions, + MessengerEvents, + MockAnyNamespace, +} from '@metamask/messenger'; + +import { flushPromises } from '../../../../../tests/helpers'; +import type { ServerNotificationMessage } from '../../BackendWebSocketService'; +import { WebSocketState } from '../../BackendWebSocketService'; +import { OHLCVService } from './OHLCVService'; +import type { OHLCVServiceMessenger } from './OHLCVService'; +import type { OHLCVSubscriptionOptions } from './types'; + +// ============================================================================= +// Test Helpers +// ============================================================================= + +type AllOHLCVServiceActions = MessengerActions; +type AllOHLCVServiceEvents = MessengerEvents; + +type RootMessenger = Messenger< + MockAnyNamespace, + AllOHLCVServiceActions, + AllOHLCVServiceEvents +>; + +const completeAsyncOperations = async (timeoutMs = 0): Promise => { + await flushPromises(); + if (timeoutMs > 0) { + await new Promise((resolve) => setTimeout(resolve, timeoutMs)); + } + await flushPromises(); +}; + +function getRootMessenger(): RootMessenger { + return new Messenger({ namespace: MOCK_ANY_NAMESPACE }); +} + +const getMessenger = (): { + rootMessenger: RootMessenger; + messenger: OHLCVServiceMessenger; + mocks: { + connect: jest.Mock; + subscribe: jest.Mock; + channelHasSubscription: jest.Mock; + getSubscriptionsByChannel: jest.Mock; + findSubscriptionsByChannelPrefix: jest.Mock; + forceReconnection: jest.Mock; + addChannelCallback: jest.Mock; + removeChannelCallback: jest.Mock; + getConnectionInfo: jest.Mock; + }; +} => { + const rootMessenger = getRootMessenger(); + const messenger: OHLCVServiceMessenger = new Messenger< + 'OHLCVService', + AllOHLCVServiceActions, + AllOHLCVServiceEvents, + RootMessenger + >({ + namespace: 'OHLCVService', + parent: rootMessenger, + }); + + rootMessenger.delegate({ + actions: [ + 'BackendWebSocketService:connect', + 'BackendWebSocketService:forceReconnection', + 'BackendWebSocketService:subscribe', + 'BackendWebSocketService:getConnectionInfo', + 'BackendWebSocketService:channelHasSubscription', + 'BackendWebSocketService:getSubscriptionsByChannel', + 'BackendWebSocketService:findSubscriptionsByChannelPrefix', + 'BackendWebSocketService:addChannelCallback', + 'BackendWebSocketService:removeChannelCallback', + ], + events: ['BackendWebSocketService:connectionStateChanged'], + messenger, + }); + + const mockConnect = jest.fn(); + const mockForceReconnection = jest.fn(); + const mockSubscribe = jest.fn(); + const mockChannelHasSubscription = jest.fn().mockReturnValue(false); + const mockGetSubscriptionsByChannel = jest.fn().mockReturnValue([]); + const mockFindSubscriptionsByChannelPrefix = jest + .fn() + .mockReturnValue([]); + const mockAddChannelCallback = jest.fn(); + const mockRemoveChannelCallback = jest.fn(); + const mockGetConnectionInfo = jest.fn(); + + rootMessenger.registerActionHandler( + 'BackendWebSocketService:connect', + mockConnect, + ); + rootMessenger.registerActionHandler( + 'BackendWebSocketService:forceReconnection', + mockForceReconnection, + ); + rootMessenger.registerActionHandler( + 'BackendWebSocketService:subscribe', + mockSubscribe, + ); + rootMessenger.registerActionHandler( + 'BackendWebSocketService:channelHasSubscription', + mockChannelHasSubscription, + ); + rootMessenger.registerActionHandler( + 'BackendWebSocketService:getSubscriptionsByChannel', + mockGetSubscriptionsByChannel, + ); + rootMessenger.registerActionHandler( + 'BackendWebSocketService:findSubscriptionsByChannelPrefix', + mockFindSubscriptionsByChannelPrefix, + ); + rootMessenger.registerActionHandler( + 'BackendWebSocketService:addChannelCallback', + mockAddChannelCallback, + ); + rootMessenger.registerActionHandler( + 'BackendWebSocketService:removeChannelCallback', + mockRemoveChannelCallback, + ); + rootMessenger.registerActionHandler( + 'BackendWebSocketService:getConnectionInfo', + mockGetConnectionInfo, + ); + + return { + rootMessenger, + messenger, + mocks: { + connect: mockConnect, + subscribe: mockSubscribe, + channelHasSubscription: mockChannelHasSubscription, + getSubscriptionsByChannel: mockGetSubscriptionsByChannel, + findSubscriptionsByChannelPrefix: mockFindSubscriptionsByChannelPrefix, + forceReconnection: mockForceReconnection, + addChannelCallback: mockAddChannelCallback, + removeChannelCallback: mockRemoveChannelCallback, + getConnectionInfo: mockGetConnectionInfo, + }, + }; +}; + +type WithServiceCallback = (payload: { + service: OHLCVService; + messenger: OHLCVServiceMessenger; + rootMessenger: RootMessenger; + mocks: ReturnType['mocks']; + destroy: () => void; +}) => Promise | R; + +async function withService(fn: WithServiceCallback): Promise { + const setup = getMessenger(); + const service = new OHLCVService({ messenger: setup.messenger }); + service.init(); + + try { + return await fn({ + service, + messenger: setup.messenger, + rootMessenger: setup.rootMessenger, + mocks: setup.mocks, + destroy: () => service.destroy(), + }); + } finally { + service.destroy(); + } +} + +const getSystemNotificationCallback = (mocks: { + addChannelCallback: jest.Mock; +}): ((notification: ServerNotificationMessage) => void) => { + const call = mocks.addChannelCallback.mock.calls.find( + (c: unknown[]) => + c[0] && + typeof c[0] === 'object' && + 'channelName' in c[0] && + (c[0] as { channelName: string }).channelName === + 'system-notifications.v1.market-data.v1', + ); + + if (!call) { + throw new Error('system notification callback not registered'); + } + + return (call[0] as { callback: (n: ServerNotificationMessage) => void }) + .callback; +}; + +// ============================================================================= +// Shared Constants +// ============================================================================= + +const SUB_OPTS: OHLCVSubscriptionOptions = { + assetId: 'eip155:8453/erc20:0x833589fCD6eDb6E08f4c7C32D4f71b54bdA02913', + interval: '1m', + currency: 'usd', +}; + +const EXPECTED_CHANNEL = + 'market-data.v1.eip155:8453/erc20:0x833589fCD6eDb6E08f4c7C32D4f71b54bdA02913.1m.usd'; + +// ============================================================================= +// Tests +// ============================================================================= + +describe('OHLCVService', () => { + beforeEach(() => { + jest.useFakeTimers(); + }); + + afterEach(() => { + jest.useRealTimers(); + }); + + // =========================================================================== + // Constructor + // =========================================================================== + + describe('constructor', () => { + it('should register method action handlers and system-notifications callback', async () => { + await withService(async ({ service, mocks }) => { + expect(service).toBeInstanceOf(OHLCVService); + expect(service.name).toBe('OHLCVService'); + + expect(mocks.addChannelCallback).toHaveBeenCalledWith({ + channelName: 'system-notifications.v1.market-data.v1', + callback: expect.any(Function), + }); + }); + }); + }); + + // =========================================================================== + // Subscribe + // =========================================================================== + + describe('subscribe', () => { + it('should connect and create a WebSocket subscription for a new channel', async () => { + await withService(async ({ service, mocks }) => { + await service.subscribe(SUB_OPTS); + + expect(mocks.connect).toHaveBeenCalledTimes(1); + expect(mocks.channelHasSubscription).toHaveBeenCalledWith( + EXPECTED_CHANNEL, + ); + expect(mocks.subscribe).toHaveBeenCalledWith({ + channels: [EXPECTED_CHANNEL], + channelType: 'market-data.v1', + callback: expect.any(Function), + }); + }); + }); + + it('should skip WS subscribe if the channel already has a subscription', async () => { + await withService(async ({ service, mocks }) => { + mocks.channelHasSubscription.mockReturnValue(true); + + await service.subscribe(SUB_OPTS); + + expect(mocks.subscribe).not.toHaveBeenCalled(); + }); + }); + + it('should increment refCount on duplicate subscribe without WS traffic', async () => { + await withService(async ({ service, mocks }) => { + await service.subscribe(SUB_OPTS); + mocks.subscribe.mockClear(); + mocks.connect.mockClear(); + + await service.subscribe(SUB_OPTS); + + expect(mocks.connect).not.toHaveBeenCalled(); + expect(mocks.subscribe).not.toHaveBeenCalled(); + }); + }); + + it('should force reconnection when subscribe fails', async () => { + await withService(async ({ service, mocks, messenger }) => { + mocks.connect.mockRejectedValueOnce(new Error('connection failed')); + + const errorListener = jest.fn(); + messenger.subscribe('OHLCVService:subscriptionError', errorListener); + + await service.subscribe(SUB_OPTS); + + expect(errorListener).toHaveBeenCalledWith({ + channel: EXPECTED_CHANNEL, + error: expect.stringContaining('connection failed'), + operation: 'subscribe', + }); + expect(mocks.forceReconnection).toHaveBeenCalledTimes(1); + }); + }); + + it('should publish barUpdated events when WebSocket delivers data', async () => { + await withService(async ({ service, mocks, messenger }) => { + let capturedCallback: (n: ServerNotificationMessage) => void = jest.fn(); + + mocks.subscribe.mockImplementation((opts) => { + capturedCallback = opts.callback; + return Promise.resolve(); + }); + + await service.subscribe(SUB_OPTS); + + const barListener = jest.fn(); + messenger.subscribe('OHLCVService:barUpdated', barListener); + + capturedCallback({ + event: 'data', + subscriptionId: 'sub-1', + timestamp: 1776364071003, + channel: EXPECTED_CHANNEL, + data: { + timestamp: 1776364020, + open: 74.099, + high: 74.1, + low: 74.083, + close: 74.099, + volume: 5806.43, + }, + } as ServerNotificationMessage); + + expect(barListener).toHaveBeenCalledWith({ + channel: EXPECTED_CHANNEL, + bar: { + timestamp: 1776364020, + open: 74.099, + high: 74.1, + low: 74.083, + close: 74.099, + volume: 5806.43, + }, + }); + }); + }); + }); + + // =========================================================================== + // Unsubscribe + // =========================================================================== + + describe('unsubscribe', () => { + it('should be a no-op if channel was never subscribed', async () => { + await withService(async ({ service, mocks }) => { + await service.unsubscribe(SUB_OPTS); + + expect(mocks.getSubscriptionsByChannel).not.toHaveBeenCalled(); + }); + }); + + it('should decrement refCount without unsubscribing when other consumers remain', async () => { + await withService(async ({ service, mocks }) => { + await service.subscribe(SUB_OPTS); + await service.subscribe(SUB_OPTS); + + await service.unsubscribe(SUB_OPTS); + + // No timer should have been started, no WS unsubscribe + jest.advanceTimersByTime(5000); + await completeAsyncOperations(); + expect(mocks.getSubscriptionsByChannel).not.toHaveBeenCalled(); + }); + }); + + it('should start a grace-period timer and unsubscribe after expiry', async () => { + await withService(async ({ service, mocks }) => { + const mockUnsub = jest.fn(); + mocks.getSubscriptionsByChannel.mockReturnValue([ + { unsubscribe: mockUnsub }, + ]); + + await service.subscribe(SUB_OPTS); + await service.unsubscribe(SUB_OPTS); + + // Before grace period expires — still subscribed + expect(mockUnsub).not.toHaveBeenCalled(); + + jest.advanceTimersByTime(3000); + await completeAsyncOperations(); + + expect(mocks.getSubscriptionsByChannel).toHaveBeenCalledWith( + EXPECTED_CHANNEL, + ); + expect(mockUnsub).toHaveBeenCalledTimes(1); + }); + }); + }); + + // =========================================================================== + // Grace Period — Re-subscribe During Grace + // =========================================================================== + + describe('grace period', () => { + it('should cancel grace-period timer if re-subscribed before expiry', async () => { + await withService(async ({ service, mocks }) => { + const mockUnsub = jest.fn(); + mocks.getSubscriptionsByChannel.mockReturnValue([ + { unsubscribe: mockUnsub }, + ]); + + await service.subscribe(SUB_OPTS); + await service.unsubscribe(SUB_OPTS); + + // Re-subscribe during grace period + jest.advanceTimersByTime(1000); + mocks.subscribe.mockClear(); + mocks.connect.mockClear(); + await service.subscribe(SUB_OPTS); + + // Should NOT have called connect/subscribe again — subscription is still alive + expect(mocks.connect).not.toHaveBeenCalled(); + expect(mocks.subscribe).not.toHaveBeenCalled(); + + // Advance past original grace period — should NOT unsubscribe + jest.advanceTimersByTime(5000); + await completeAsyncOperations(); + expect(mockUnsub).not.toHaveBeenCalled(); + }); + }); + }); + + // =========================================================================== + // Reference Counting + // =========================================================================== + + describe('reference counting', () => { + it('should share a single WS subscription across multiple consumers', async () => { + await withService(async ({ service, mocks }) => { + await service.subscribe(SUB_OPTS); + await service.subscribe(SUB_OPTS); + await service.subscribe(SUB_OPTS); + + // Only one WS subscribe call + expect(mocks.subscribe).toHaveBeenCalledTimes(1); + + // Unsubscribe twice — refCount goes from 3 → 1 + await service.unsubscribe(SUB_OPTS); + await service.unsubscribe(SUB_OPTS); + + jest.advanceTimersByTime(5000); + await completeAsyncOperations(); + + // Still has one consumer — no WS unsubscribe + expect(mocks.getSubscriptionsByChannel).not.toHaveBeenCalled(); + }); + }); + + it('should unsubscribe from WS when all consumers leave and grace expires', async () => { + await withService(async ({ service, mocks }) => { + const mockUnsub = jest.fn(); + mocks.getSubscriptionsByChannel.mockReturnValue([ + { unsubscribe: mockUnsub }, + ]); + + await service.subscribe(SUB_OPTS); + await service.subscribe(SUB_OPTS); + + await service.unsubscribe(SUB_OPTS); + await service.unsubscribe(SUB_OPTS); + + jest.advanceTimersByTime(3000); + await completeAsyncOperations(); + + expect(mockUnsub).toHaveBeenCalledTimes(1); + }); + }); + }); + + // =========================================================================== + // Reconnect Resilience + // =========================================================================== + + describe('reconnect', () => { + it('should resubscribe active channels on WebSocket CONNECTED', async () => { + await withService(async ({ service, mocks, rootMessenger }) => { + await service.subscribe(SUB_OPTS); + mocks.subscribe.mockClear(); + mocks.channelHasSubscription.mockReturnValue(false); + + rootMessenger.publish( + 'BackendWebSocketService:connectionStateChanged', + { + state: WebSocketState.CONNECTED, + connectedAt: Date.now(), + reconnectAttempts: 0, + }, + ); + await completeAsyncOperations(); + + expect(mocks.subscribe).toHaveBeenCalledWith({ + channels: [EXPECTED_CHANNEL], + channelType: 'market-data.v1', + callback: expect.any(Function), + }); + }); + }); + + it('should skip resubscribe if channel already has a subscription after reconnect', async () => { + await withService(async ({ service, mocks, rootMessenger }) => { + await service.subscribe(SUB_OPTS); + mocks.subscribe.mockClear(); + mocks.channelHasSubscription.mockReturnValue(true); + + rootMessenger.publish( + 'BackendWebSocketService:connectionStateChanged', + { + state: WebSocketState.CONNECTED, + connectedAt: Date.now(), + reconnectAttempts: 0, + }, + ); + await completeAsyncOperations(); + + expect(mocks.subscribe).not.toHaveBeenCalled(); + }); + }); + + it('should publish chainStatusChanged down on DISCONNECTED', async () => { + await withService(async ({ mocks, messenger, rootMessenger }) => { + const statusListener = jest.fn(); + messenger.subscribe( + 'OHLCVService:chainStatusChanged', + statusListener, + ); + + // Simulate a system notification marking a chain as up + const systemCallback = getSystemNotificationCallback(mocks); + systemCallback({ + event: 'system-notification', + channel: 'system-notifications.v1.market-data.v1', + data: { chainIds: ['eip155:8453'], status: 'up' }, + timestamp: Date.now(), + } as ServerNotificationMessage); + + statusListener.mockClear(); + + rootMessenger.publish( + 'BackendWebSocketService:connectionStateChanged', + { + state: WebSocketState.DISCONNECTED, + connectedAt: null, + reconnectAttempts: 0, + }, + ); + await completeAsyncOperations(); + + expect(statusListener).toHaveBeenCalledWith( + expect.objectContaining({ + chainIds: ['eip155:8453'], + status: 'down', + }), + ); + }); + }); + }); + + // =========================================================================== + // System Notifications + // =========================================================================== + + describe('system notifications', () => { + it('should forward chain-down notifications via chainStatusChanged event', async () => { + await withService(async ({ mocks, messenger }) => { + const statusListener = jest.fn(); + messenger.subscribe( + 'OHLCVService:chainStatusChanged', + statusListener, + ); + + const systemCallback = getSystemNotificationCallback(mocks); + systemCallback({ + event: 'system-notification', + channel: 'system-notifications.v1.market-data.v1', + data: { chainIds: ['eip155:8453'], status: 'down' }, + timestamp: 1776364071003, + } as ServerNotificationMessage); + + expect(statusListener).toHaveBeenCalledWith({ + chainIds: ['eip155:8453'], + status: 'down', + timestamp: 1776364071003, + }); + }); + }); + + it('should forward chain-up notifications', async () => { + await withService(async ({ mocks, messenger }) => { + const statusListener = jest.fn(); + messenger.subscribe( + 'OHLCVService:chainStatusChanged', + statusListener, + ); + + const systemCallback = getSystemNotificationCallback(mocks); + systemCallback({ + event: 'system-notification', + channel: 'system-notifications.v1.market-data.v1', + data: { chainIds: ['eip155:1', 'eip155:137'], status: 'up' }, + timestamp: 1776364071003, + } as ServerNotificationMessage); + + expect(statusListener).toHaveBeenCalledWith({ + chainIds: ['eip155:1', 'eip155:137'], + status: 'up', + timestamp: 1776364071003, + }); + }); + }); + + it('should throw on invalid system notification data', async () => { + await withService(async ({ mocks }) => { + const systemCallback = getSystemNotificationCallback(mocks); + + expect(() => + systemCallback({ + event: 'system-notification', + channel: 'system-notifications.v1.market-data.v1', + data: { invalid: true }, + timestamp: Date.now(), + } as unknown as ServerNotificationMessage), + ).toThrow('Invalid system notification data'); + }); + }); + }); + + // =========================================================================== + // Error Paths + // =========================================================================== + + describe('error paths', () => { + it('should publish subscriptionError and force reconnection when unsubscribe fails', async () => { + await withService(async ({ service, mocks, messenger }) => { + mocks.getSubscriptionsByChannel.mockImplementation(() => { + throw new Error('ws gone'); + }); + + const errorListener = jest.fn(); + messenger.subscribe('OHLCVService:subscriptionError', errorListener); + + await service.subscribe(SUB_OPTS); + await service.unsubscribe(SUB_OPTS); + + jest.advanceTimersByTime(3000); + await completeAsyncOperations(); + + expect(errorListener).toHaveBeenCalledWith({ + channel: EXPECTED_CHANNEL, + error: expect.stringContaining('ws gone'), + operation: 'unsubscribe', + }); + expect(mocks.forceReconnection).toHaveBeenCalled(); + }); + }); + + it('should log and continue when resubscription fails for a channel', async () => { + await withService(async ({ service, mocks, rootMessenger }) => { + await service.subscribe(SUB_OPTS); + mocks.subscribe.mockClear(); + mocks.channelHasSubscription.mockReturnValue(false); + mocks.subscribe.mockRejectedValueOnce(new Error('resubscribe fail')); + + rootMessenger.publish( + 'BackendWebSocketService:connectionStateChanged', + { + state: WebSocketState.CONNECTED, + connectedAt: Date.now(), + reconnectAttempts: 1, + }, + ); + await completeAsyncOperations(); + + // Should have attempted but failed silently + expect(mocks.subscribe).toHaveBeenCalledTimes(1); + }); + }); + }); + + // =========================================================================== + // Destroy + // =========================================================================== + + describe('destroy', () => { + it('should clear grace-period timers and remove channel callback', async () => { + await withService(async ({ service, mocks }) => { + const mockUnsub = jest.fn(); + mocks.getSubscriptionsByChannel.mockReturnValue([ + { unsubscribe: mockUnsub }, + ]); + + await service.subscribe(SUB_OPTS); + await service.unsubscribe(SUB_OPTS); + + // Grace timer is running — destroy should clear it + service.destroy(); + + jest.advanceTimersByTime(5000); + await completeAsyncOperations(); + + // Timer was cleared so the actual unsubscribe should NOT have fired + expect(mockUnsub).not.toHaveBeenCalled(); + + expect(mocks.removeChannelCallback).toHaveBeenCalledWith( + 'system-notifications.v1.market-data.v1', + ); + }); + }); + }); +}); diff --git a/packages/core-backend/src/api/ohlcv/OHLCVService.ts b/packages/core-backend/src/api/ohlcv/OHLCVService.ts new file mode 100644 index 0000000000..1dcf9753e4 --- /dev/null +++ b/packages/core-backend/src/api/ohlcv/OHLCVService.ts @@ -0,0 +1,483 @@ +/** + * OHLCV Service for real-time candlestick data streaming via WebSocket. + * + * Wraps {@link BackendWebSocketService} through the messenger pattern to + * provide subscribe/unsubscribe semantics for OHLCV market-data channels. + * Includes reference counting, grace-period unsubscribe, idempotency checks, + * chain-status forwarding, and automatic resubscription on reconnect. + */ + +import type { TraceCallback } from '@metamask/controller-utils'; +import type { Messenger } from '@metamask/messenger'; + +import type { + WebSocketConnectionInfo, + BackendWebSocketServiceConnectionStateChangedEvent, + ServerNotificationMessage, +} from '../../BackendWebSocketService'; +import { WebSocketState } from '../../BackendWebSocketService'; +import type { BackendWebSocketServiceMethodActions } from '../../BackendWebSocketService-method-action-types'; +import { projectLogger, createModuleLogger } from '../../logger'; +import type { OHLCVServiceMethodActions } from './OHLCVService-method-action-types'; +import type { OHLCVBar, OHLCVSubscriptionOptions } from './types'; + +// ============================================================================= +// Constants +// ============================================================================= + +const SERVICE_NAME = 'OHLCVService'; + +const log = createModuleLogger(projectLogger, SERVICE_NAME); + +const MESSENGER_EXPOSED_METHODS = ['subscribe', 'unsubscribe'] as const; + +const SUBSCRIPTION_NAMESPACE = 'market-data.v1'; + +const SYSTEM_NOTIFICATIONS_CHANNEL = `system-notifications.v1.${SUBSCRIPTION_NAMESPACE}`; + +/** Delay before actually unsubscribing from a channel after refCount reaches 0. */ +const GRACE_PERIOD_MS = 3_000; + +// ============================================================================= +// Types — Channel Tracking +// ============================================================================= + +type ChannelEntry = { + refCount: number; + gracePeriodTimer?: ReturnType; +}; + +// ============================================================================= +// Types — System Notifications +// ============================================================================= + +/** + * System notification data for chain status updates on market-data channels. + */ +export type OHLCVSystemNotificationData = { + chainIds: string[]; + status: 'down' | 'up'; + timestamp?: number; +}; + +// ============================================================================= +// Types — Service Options +// ============================================================================= + +/** + * Configuration options for the OHLCV service. + */ +export type OHLCVServiceOptions = { + /** Optional callback to trace performance of OHLCV operations (default: no-op) */ + traceFn?: TraceCallback; +}; + +// ============================================================================= +// Action and Event Types +// ============================================================================= + +export type OHLCVServiceActions = OHLCVServiceMethodActions; + +export const OHLCV_SERVICE_ALLOWED_ACTIONS = [ + 'BackendWebSocketService:connect', + 'BackendWebSocketService:forceReconnection', + 'BackendWebSocketService:subscribe', + 'BackendWebSocketService:getConnectionInfo', + 'BackendWebSocketService:channelHasSubscription', + 'BackendWebSocketService:getSubscriptionsByChannel', + 'BackendWebSocketService:findSubscriptionsByChannelPrefix', + 'BackendWebSocketService:addChannelCallback', + 'BackendWebSocketService:removeChannelCallback', +] as const; + +export const OHLCV_SERVICE_ALLOWED_EVENTS = [ + 'BackendWebSocketService:connectionStateChanged', +] as const; + +export type AllowedActions = BackendWebSocketServiceMethodActions; + +// Events published by OHLCVService + +export type OHLCVServiceBarUpdatedEvent = { + type: `OHLCVService:barUpdated`; + payload: [{ channel: string; bar: OHLCVBar }]; +}; + +export type OHLCVServiceChainStatusChangedEvent = { + type: `OHLCVService:chainStatusChanged`; + payload: [{ chainIds: string[]; status: 'up' | 'down'; timestamp?: number }]; +}; + +export type OHLCVServiceSubscriptionErrorEvent = { + type: `OHLCVService:subscriptionError`; + payload: [{ channel: string; error: string; operation: string }]; +}; + +export type OHLCVServiceEvents = + | OHLCVServiceBarUpdatedEvent + | OHLCVServiceChainStatusChangedEvent + | OHLCVServiceSubscriptionErrorEvent; + +export type AllowedEvents = + BackendWebSocketServiceConnectionStateChangedEvent; + +export type OHLCVServiceMessenger = Messenger< + typeof SERVICE_NAME, + OHLCVServiceActions | AllowedActions, + OHLCVServiceEvents | AllowedEvents +>; + +// ============================================================================= +// Main Service Class +// ============================================================================= + +/** + * Service for real-time OHLCV candlestick streaming via the backend WebSocket + * gateway. Communicates with {@link BackendWebSocketService} exclusively + * through the messenger — no direct import of the class. + * + * Features: + * - Reference counting: multiple UI consumers share one WebSocket subscription + * - Grace-period unsubscribe: avoids rapid unsub/resub during navigation + * - Idempotency: duplicate subscribe calls for the same channel are no-ops + * - Reconnect resilience: resubscribes all active channels on reconnect + * - Chain-status forwarding: listens to system-notifications for chain up/down + * + * @example + * ```typescript + * const service = new OHLCVService({ messenger }); + * + * // Subscribe from a UI hook + * await messenger.call('OHLCVService:subscribe', { + * assetId: 'eip155:8453/erc20:0x833...', + * interval: '1m', + * currency: 'usd', + * }); + * + * // Listen for bar updates + * messenger.subscribe('OHLCVService:barUpdated', ({ channel, bar }) => { + * chart.appendBar(bar); + * }); + * + * // Unsubscribe when the view unmounts + * await messenger.call('OHLCVService:unsubscribe', { + * assetId: 'eip155:8453/erc20:0x833...', + * interval: '1m', + * currency: 'usd', + * }); + * ``` + */ +export class OHLCVService { + readonly name = SERVICE_NAME; + + readonly #messenger: OHLCVServiceMessenger; + + readonly #trace: TraceCallback; + + readonly #channels = new Map(); + + readonly #chainsUp = new Set(); + + // ============================================================================= + // Constructor + // ============================================================================= + + constructor( + options: OHLCVServiceOptions & { messenger: OHLCVServiceMessenger }, + ) { + this.#messenger = options.messenger; + + // eslint-disable-next-line @typescript-eslint/no-explicit-any + this.#trace = options.traceFn ?? (((_req: any, fn?: any) => fn?.()) as TraceCallback); + + this.#messenger.registerMethodActionHandlers(this, MESSENGER_EXPOSED_METHODS); + + this.#messenger.subscribe( + 'BackendWebSocketService:connectionStateChanged', + // eslint-disable-next-line @typescript-eslint/no-misused-promises + (connectionInfo: WebSocketConnectionInfo) => + this.#handleWebSocketStateChange(connectionInfo), + ); + } + + /** + * Register the system-notifications channel callback. Must be called after + * construction so that clients are not forced to instantiate services in a + * specific order. + */ + init(): void { + this.#messenger.call('BackendWebSocketService:addChannelCallback', { + channelName: SYSTEM_NOTIFICATIONS_CHANNEL, + callback: (notification: ServerNotificationMessage) => + this.#handleSystemNotification(notification), + }); + } + + // ============================================================================= + // Public — Subscribe / Unsubscribe + // ============================================================================= + + /** + * Subscribe to an OHLCV channel. If this is the first subscriber for the + * given asset/interval/currency combination a WebSocket subscription is + * created. Additional calls for the same combination only bump the reference + * count. + * + * @param options - The subscription parameters. + */ + async subscribe(options: OHLCVSubscriptionOptions): Promise { + const channel = this.#buildChannel(options); + const entry = this.#channels.get(channel); + + if (entry?.gracePeriodTimer) { + clearTimeout(entry.gracePeriodTimer); + entry.gracePeriodTimer = undefined; + entry.refCount += 1; + log('Cancelled grace-period unsubscribe, bumped refCount', { + channel, + refCount: entry.refCount, + }); + return; + } + + if (entry && entry.refCount > 0) { + entry.refCount += 1; + return; + } + + try { + await this.#messenger.call('BackendWebSocketService:connect'); + + if ( + this.#messenger.call( + 'BackendWebSocketService:channelHasSubscription', + channel, + ) + ) { + this.#channels.set(channel, { refCount: 1 }); + return; + } + + await this.#messenger.call('BackendWebSocketService:subscribe', { + channels: [channel], + channelType: SUBSCRIPTION_NAMESPACE, + callback: (notification: ServerNotificationMessage) => { + this.#handleBarUpdate(channel, notification); + }, + }); + + this.#channels.set(channel, { refCount: 1 }); + } catch (error) { + log('Subscription failed, forcing reconnection', { channel, error }); + this.#messenger.publish('OHLCVService:subscriptionError', { + channel, + error: String(error), + operation: 'subscribe', + }); + await this.#forceReconnection(); + } + } + + /** + * Unsubscribe from an OHLCV channel. Decrements the reference count and, + * when it reaches zero, starts a grace-period timer before actually + * unsubscribing from the WebSocket to absorb rapid navigation patterns. + * + * @param options - The subscription parameters to unsubscribe from. + */ + async unsubscribe(options: OHLCVSubscriptionOptions): Promise { + const channel = this.#buildChannel(options); + const entry = this.#channels.get(channel); + + if (!entry || entry.refCount <= 0) { + return; + } + + entry.refCount -= 1; + + if (entry.refCount > 0) { + return; + } + + entry.gracePeriodTimer = setTimeout(() => { + // eslint-disable-next-line @typescript-eslint/no-floating-promises + this.#performUnsubscribe(channel); + }, GRACE_PERIOD_MS); + } + + // ============================================================================= + // Private — WebSocket Subscription Helpers + // ============================================================================= + + async #performUnsubscribe(channel: string): Promise { + this.#channels.delete(channel); + + try { + const subscriptions = this.#messenger.call( + 'BackendWebSocketService:getSubscriptionsByChannel', + channel, + ); + + for (const sub of subscriptions) { + await sub.unsubscribe(); + } + } catch (error) { + log('Unsubscription failed, forcing reconnection', { channel, error }); + this.#messenger.publish('OHLCVService:subscriptionError', { + channel, + error: String(error), + operation: 'unsubscribe', + }); + await this.#forceReconnection(); + } + } + + /** + * Resubscribe all channels that were active before a disconnect. + * Called when WebSocket transitions to CONNECTED. + */ + async #resubscribeActiveChannels(): Promise { + for (const [channel, entry] of this.#channels.entries()) { + if (entry.refCount <= 0 && !entry.gracePeriodTimer) { + continue; + } + + try { + if ( + this.#messenger.call( + 'BackendWebSocketService:channelHasSubscription', + channel, + ) + ) { + continue; + } + + await this.#messenger.call('BackendWebSocketService:subscribe', { + channels: [channel], + channelType: SUBSCRIPTION_NAMESPACE, + callback: (notification: ServerNotificationMessage) => { + this.#handleBarUpdate(channel, notification); + }, + }); + } catch (error) { + log('Resubscription failed for channel', { channel, error }); + } + } + } + + // ============================================================================= + // Private — Message Handlers + // ============================================================================= + + #handleBarUpdate( + channel: string, + notification: ServerNotificationMessage, + ): void { + const bar = notification.data as OHLCVBar; + + // eslint-disable-next-line @typescript-eslint/no-floating-promises + this.#trace( + { + name: `${SERVICE_NAME} Bar Update`, + data: { channel, timestamp: bar.timestamp }, + tags: { service: SERVICE_NAME }, + }, + () => { + this.#messenger.publish('OHLCVService:barUpdated', { channel, bar }); + }, + ); + } + + #handleSystemNotification(notification: ServerNotificationMessage): void { + const data = notification.data as OHLCVSystemNotificationData; + const { timestamp } = notification; + + if (!data.chainIds || !Array.isArray(data.chainIds) || !data.status) { + throw new Error( + 'Invalid system notification data: missing chainIds or status', + ); + } + + if (data.status === 'up') { + for (const chainId of data.chainIds) { + this.#chainsUp.add(chainId); + } + } else { + for (const chainId of data.chainIds) { + this.#chainsUp.delete(chainId); + } + } + + this.#messenger.publish('OHLCVService:chainStatusChanged', { + chainIds: data.chainIds, + status: data.status, + timestamp, + }); + + log(`Chain status change: ${data.status}`, { + chains: data.chainIds, + status: data.status, + }); + } + + async #handleWebSocketStateChange( + connectionInfo: WebSocketConnectionInfo, + ): Promise { + const { state } = connectionInfo; + + if (state === WebSocketState.CONNECTED) { + await this.#resubscribeActiveChannels(); + } else if (state === WebSocketState.DISCONNECTED) { + const chainsToMarkDown = Array.from(this.#chainsUp); + + if (chainsToMarkDown.length > 0) { + this.#messenger.publish('OHLCVService:chainStatusChanged', { + chainIds: chainsToMarkDown, + status: 'down', + timestamp: Date.now(), + }); + + log('WebSocket disconnection - marked tracked chains as down', { + count: chainsToMarkDown.length, + chains: chainsToMarkDown, + }); + + this.#chainsUp.clear(); + } + } + } + + // ============================================================================= + // Private — Utility + // ============================================================================= + + #buildChannel(options: OHLCVSubscriptionOptions): string { + return `${SUBSCRIPTION_NAMESPACE}.${options.assetId}.${options.interval}.${options.currency}`; + } + + async #forceReconnection(): Promise { + log('Forcing WebSocket reconnection'); + await this.#messenger.call('BackendWebSocketService:forceReconnection'); + } + + // ============================================================================= + // Public — Cleanup + // ============================================================================= + + /** + * Destroy the service and clean up all resources. + */ + destroy(): void { + for (const entry of this.#channels.values()) { + if (entry.gracePeriodTimer) { + clearTimeout(entry.gracePeriodTimer); + } + } + this.#channels.clear(); + + this.#messenger.call( + 'BackendWebSocketService:removeChannelCallback', + SYSTEM_NOTIFICATIONS_CHANNEL, + ); + } +} diff --git a/packages/core-backend/src/api/ohlcv/index.ts b/packages/core-backend/src/api/ohlcv/index.ts new file mode 100644 index 0000000000..e40d24d8c3 --- /dev/null +++ b/packages/core-backend/src/api/ohlcv/index.ts @@ -0,0 +1,18 @@ +export { OHLCVService } from './OHLCVService'; +export { + OHLCV_SERVICE_ALLOWED_ACTIONS, + OHLCV_SERVICE_ALLOWED_EVENTS, +} from './OHLCVService'; +export type { + OHLCVSystemNotificationData, + OHLCVServiceOptions, + OHLCVServiceActions, + AllowedActions as OHLCVServiceAllowedActions, + OHLCVServiceBarUpdatedEvent, + OHLCVServiceChainStatusChangedEvent, + OHLCVServiceSubscriptionErrorEvent, + OHLCVServiceEvents, + AllowedEvents as OHLCVServiceAllowedEvents, + OHLCVServiceMessenger, +} from './OHLCVService'; +export type { OHLCVBar, OHLCVSubscriptionOptions } from './types'; diff --git a/packages/core-backend/src/api/ohlcv/types.ts b/packages/core-backend/src/api/ohlcv/types.ts new file mode 100644 index 0000000000..b8e64caf33 --- /dev/null +++ b/packages/core-backend/src/api/ohlcv/types.ts @@ -0,0 +1,33 @@ +/** + * OHLCV WebSocket streaming types for real-time candlestick data. + */ + +/** + * A single OHLCV candlestick bar received from the market-data WebSocket stream. + */ +export type OHLCVBar = { + /** Unix timestamp (seconds) of the candle open */ + timestamp: number; + /** Opening price */ + open: number; + /** Highest price during the candle period */ + high: number; + /** Lowest price during the candle period */ + low: number; + /** Closing price (latest) */ + close: number; + /** Trading volume during the candle period */ + volume: number; +}; + +/** + * Options for subscribing to an OHLCV channel. + */ +export type OHLCVSubscriptionOptions = { + /** CAIP-19 asset identifier, e.g. "eip155:8453/erc20:0x833589fCD6eDb6E08f4c7C32D4f71b54bdA02913" */ + assetId: string; + /** Candle interval, e.g. "1m", "5m", "15m", "1h", "4h", "1d" */ + interval: string; + /** Fiat currency code, e.g. "usd", "eur" */ + currency: string; +}; diff --git a/packages/core-backend/src/index.ts b/packages/core-backend/src/index.ts index 8245157118..a25a3c3cd0 100644 --- a/packages/core-backend/src/index.ts +++ b/packages/core-backend/src/index.ts @@ -80,6 +80,31 @@ export type { ApiPlatformClientServiceMessenger, } from './ApiPlatformClientService'; +// ============================================================================ +// OHLCV SERVICE +// ============================================================================ + +export { + OHLCVService, + OHLCV_SERVICE_ALLOWED_ACTIONS, + OHLCV_SERVICE_ALLOWED_EVENTS, +} from './api/ohlcv'; + +export type { + OHLCVBar, + OHLCVSubscriptionOptions, + OHLCVSystemNotificationData, + OHLCVServiceOptions, + OHLCVServiceActions, + OHLCVServiceAllowedActions, + OHLCVServiceBarUpdatedEvent, + OHLCVServiceChainStatusChangedEvent, + OHLCVServiceSubscriptionErrorEvent, + OHLCVServiceEvents, + OHLCVServiceAllowedEvents, + OHLCVServiceMessenger, +} from './api/ohlcv'; + // ============================================================================ // API PLATFORM CLIENT // ============================================================================ From 42bfec7f6a1b5786f42e195c59ac945aabac63ea Mon Sep 17 00:00:00 2001 From: sahar-fehri Date: Tue, 5 May 2026 14:46:55 +0200 Subject: [PATCH 02/26] chore: lint --- .../src/api/ohlcv/OHLCVService.ts | 44 ++++++------------- 1 file changed, 14 insertions(+), 30 deletions(-) diff --git a/packages/core-backend/src/api/ohlcv/OHLCVService.ts b/packages/core-backend/src/api/ohlcv/OHLCVService.ts index 1dcf9753e4..6def835ecf 100644 --- a/packages/core-backend/src/api/ohlcv/OHLCVService.ts +++ b/packages/core-backend/src/api/ohlcv/OHLCVService.ts @@ -7,7 +7,11 @@ * chain-status forwarding, and automatic resubscription on reconnect. */ -import type { TraceCallback } from '@metamask/controller-utils'; +import type { + TraceCallback, + TraceContext, + TraceRequest, +} from '@metamask/controller-utils'; import type { Messenger } from '@metamask/messenger'; import type { @@ -143,29 +147,6 @@ export type OHLCVServiceMessenger = Messenger< * - Reconnect resilience: resubscribes all active channels on reconnect * - Chain-status forwarding: listens to system-notifications for chain up/down * - * @example - * ```typescript - * const service = new OHLCVService({ messenger }); - * - * // Subscribe from a UI hook - * await messenger.call('OHLCVService:subscribe', { - * assetId: 'eip155:8453/erc20:0x833...', - * interval: '1m', - * currency: 'usd', - * }); - * - * // Listen for bar updates - * messenger.subscribe('OHLCVService:barUpdated', ({ channel, bar }) => { - * chart.appendBar(bar); - * }); - * - * // Unsubscribe when the view unmounts - * await messenger.call('OHLCVService:unsubscribe', { - * assetId: 'eip155:8453/erc20:0x833...', - * interval: '1m', - * currency: 'usd', - * }); - * ``` */ export class OHLCVService { readonly name = SERVICE_NAME; @@ -187,10 +168,15 @@ export class OHLCVService { ) { this.#messenger = options.messenger; - // eslint-disable-next-line @typescript-eslint/no-explicit-any - this.#trace = options.traceFn ?? (((_req: any, fn?: any) => fn?.()) as TraceCallback); + this.#trace = + options.traceFn ?? + (((_request: TraceRequest, fn?: (context?: TraceContext) => Result) => + fn?.()) as TraceCallback); - this.#messenger.registerMethodActionHandlers(this, MESSENGER_EXPOSED_METHODS); + this.#messenger.registerMethodActionHandlers( + this, + MESSENGER_EXPOSED_METHODS, + ); this.#messenger.subscribe( 'BackendWebSocketService:connectionStateChanged', @@ -201,9 +187,7 @@ export class OHLCVService { } /** - * Register the system-notifications channel callback. Must be called after - * construction so that clients are not forced to instantiate services in a - * specific order. + * Register the system-notifications channel callback. */ init(): void { this.#messenger.call('BackendWebSocketService:addChannelCallback', { From ab85cc57f420367a4268a25b64218fe13f88ad21 Mon Sep 17 00:00:00 2001 From: sahar-fehri Date: Tue, 5 May 2026 14:53:53 +0200 Subject: [PATCH 03/26] chore: lint --- .../src/api/ohlcv/OHLCVService.test.ts | 22 +++++-------------- .../src/api/ohlcv/OHLCVService.ts | 9 ++++---- 2 files changed, 11 insertions(+), 20 deletions(-) diff --git a/packages/core-backend/src/api/ohlcv/OHLCVService.test.ts b/packages/core-backend/src/api/ohlcv/OHLCVService.test.ts index 44d9060c0c..4533adb74b 100644 --- a/packages/core-backend/src/api/ohlcv/OHLCVService.test.ts +++ b/packages/core-backend/src/api/ohlcv/OHLCVService.test.ts @@ -84,9 +84,7 @@ const getMessenger = (): { const mockSubscribe = jest.fn(); const mockChannelHasSubscription = jest.fn().mockReturnValue(false); const mockGetSubscriptionsByChannel = jest.fn().mockReturnValue([]); - const mockFindSubscriptionsByChannelPrefix = jest - .fn() - .mockReturnValue([]); + const mockFindSubscriptionsByChannelPrefix = jest.fn().mockReturnValue([]); const mockAddChannelCallback = jest.fn(); const mockRemoveChannelCallback = jest.fn(); const mockGetConnectionInfo = jest.fn(); @@ -299,7 +297,8 @@ describe('OHLCVService', () => { it('should publish barUpdated events when WebSocket delivers data', async () => { await withService(async ({ service, mocks, messenger }) => { - let capturedCallback: (n: ServerNotificationMessage) => void = jest.fn(); + let capturedCallback: (n: ServerNotificationMessage) => void = + jest.fn(); mocks.subscribe.mockImplementation((opts) => { capturedCallback = opts.callback; @@ -524,10 +523,7 @@ describe('OHLCVService', () => { it('should publish chainStatusChanged down on DISCONNECTED', async () => { await withService(async ({ mocks, messenger, rootMessenger }) => { const statusListener = jest.fn(); - messenger.subscribe( - 'OHLCVService:chainStatusChanged', - statusListener, - ); + messenger.subscribe('OHLCVService:chainStatusChanged', statusListener); // Simulate a system notification marking a chain as up const systemCallback = getSystemNotificationCallback(mocks); @@ -568,10 +564,7 @@ describe('OHLCVService', () => { it('should forward chain-down notifications via chainStatusChanged event', async () => { await withService(async ({ mocks, messenger }) => { const statusListener = jest.fn(); - messenger.subscribe( - 'OHLCVService:chainStatusChanged', - statusListener, - ); + messenger.subscribe('OHLCVService:chainStatusChanged', statusListener); const systemCallback = getSystemNotificationCallback(mocks); systemCallback({ @@ -592,10 +585,7 @@ describe('OHLCVService', () => { it('should forward chain-up notifications', async () => { await withService(async ({ mocks, messenger }) => { const statusListener = jest.fn(); - messenger.subscribe( - 'OHLCVService:chainStatusChanged', - statusListener, - ); + messenger.subscribe('OHLCVService:chainStatusChanged', statusListener); const systemCallback = getSystemNotificationCallback(mocks); systemCallback({ diff --git a/packages/core-backend/src/api/ohlcv/OHLCVService.ts b/packages/core-backend/src/api/ohlcv/OHLCVService.ts index 6def835ecf..b3cf3c351c 100644 --- a/packages/core-backend/src/api/ohlcv/OHLCVService.ts +++ b/packages/core-backend/src/api/ohlcv/OHLCVService.ts @@ -122,8 +122,7 @@ export type OHLCVServiceEvents = | OHLCVServiceChainStatusChangedEvent | OHLCVServiceSubscriptionErrorEvent; -export type AllowedEvents = - BackendWebSocketServiceConnectionStateChangedEvent; +export type AllowedEvents = BackendWebSocketServiceConnectionStateChangedEvent; export type OHLCVServiceMessenger = Messenger< typeof SERVICE_NAME, @@ -170,8 +169,10 @@ export class OHLCVService { this.#trace = options.traceFn ?? - (((_request: TraceRequest, fn?: (context?: TraceContext) => Result) => - fn?.()) as TraceCallback); + ((( + _request: TraceRequest, + fn?: (context?: TraceContext) => Result, + ) => fn?.()) as TraceCallback); this.#messenger.registerMethodActionHandlers( this, From bd0d4d2e92c9705865486073942c5d45a2c8b980 Mon Sep 17 00:00:00 2001 From: sahar-fehri Date: Tue, 5 May 2026 15:03:59 +0200 Subject: [PATCH 04/26] chore: changelog --- packages/core-backend/CHANGELOG.md | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/packages/core-backend/CHANGELOG.md b/packages/core-backend/CHANGELOG.md index 9e2241fcbc..4c7d2e4a3c 100644 --- a/packages/core-backend/CHANGELOG.md +++ b/packages/core-backend/CHANGELOG.md @@ -7,6 +7,14 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## [Unreleased] +### Added + +- Add `OHLCVService` for real-time OHLCV (candlestick) data streaming via WebSocket ([#8695](https://github.com/MetaMask/core/pull/8695)) + - Wraps `BackendWebSocketService` through the messenger pattern to provide subscribe/unsubscribe semantics for market-data OHLCV channels + - Includes reference counting, grace-period unsubscribe, idempotency checks, chain-status forwarding, and automatic resubscription on reconnect +- Export new types `OHLCVBar`, `OHLCVSubscriptionOptions`, `OHLCVSystemNotificationData`, `OHLCVServiceOptions`, `OHLCVServiceActions`, `OHLCVServiceAllowedActions`, `OHLCVServiceBarUpdatedEvent`, `OHLCVServiceChainStatusChangedEvent`, `OHLCVServiceSubscriptionErrorEvent`, `OHLCVServiceEvents`, `OHLCVServiceAllowedEvents`, and `OHLCVServiceMessenger` ([#8695](https://github.com/MetaMask/core/pull/8695)) +- Export new constants `OHLCV_SERVICE_ALLOWED_ACTIONS` and `OHLCV_SERVICE_ALLOWED_EVENTS` for configuring the messenger ([#8695](https://github.com/MetaMask/core/pull/8695)) + ### Changed - Bump `@metamask/accounts-controller` from `^37.1.0` to `^38.0.0` ([#8325](https://github.com/MetaMask/core/pull/8325), [#8363](https://github.com/MetaMask/core/pull/8363), [#8665](https://github.com/MetaMask/core/pull/8665)) From 4a8a3cf80541d386f38046589159d6e82b888e9a Mon Sep 17 00:00:00 2001 From: sahar-fehri Date: Mon, 11 May 2026 10:39:36 +0200 Subject: [PATCH 05/26] fix: re-organize code --- packages/core-backend/src/index.ts | 12 ++++++------ .../AccountActivityService-method-action-types.ts | 0 .../src/{ => ws}/AccountActivityService.test.ts | 6 +++--- .../src/{ => ws}/AccountActivityService.ts | 4 ++-- .../BackendWebSocketService-method-action-types.ts | 0 .../src/{ => ws}/BackendWebSocketService.test.ts | 2 +- .../src/{ => ws}/BackendWebSocketService.ts | 2 +- .../ohlcv/OHLCVService-method-action-types.ts | 0 .../src/{api => ws}/ohlcv/OHLCVService.test.ts | 4 ++-- .../src/{api => ws}/ohlcv/OHLCVService.ts | 6 +++--- packages/core-backend/src/{api => ws}/ohlcv/index.ts | 0 packages/core-backend/src/{api => ws}/ohlcv/types.ts | 0 12 files changed, 18 insertions(+), 18 deletions(-) rename packages/core-backend/src/{ => ws}/AccountActivityService-method-action-types.ts (100%) rename packages/core-backend/src/{ => ws}/AccountActivityService.test.ts (99%) rename packages/core-backend/src/{ => ws}/AccountActivityService.ts (99%) rename packages/core-backend/src/{ => ws}/BackendWebSocketService-method-action-types.ts (100%) rename packages/core-backend/src/{ => ws}/BackendWebSocketService.test.ts (99%) rename packages/core-backend/src/{ => ws}/BackendWebSocketService.ts (99%) rename packages/core-backend/src/{api => ws}/ohlcv/OHLCVService-method-action-types.ts (100%) rename packages/core-backend/src/{api => ws}/ohlcv/OHLCVService.test.ts (99%) rename packages/core-backend/src/{api => ws}/ohlcv/OHLCVService.ts (98%) rename packages/core-backend/src/{api => ws}/ohlcv/index.ts (100%) rename packages/core-backend/src/{api => ws}/ohlcv/types.ts (100%) diff --git a/packages/core-backend/src/index.ts b/packages/core-backend/src/index.ts index a25a3c3cd0..6197c068f8 100644 --- a/packages/core-backend/src/index.ts +++ b/packages/core-backend/src/index.ts @@ -9,7 +9,7 @@ export { getCloseReason, WebSocketState, WebSocketEventType, -} from './BackendWebSocketService'; +} from './ws/BackendWebSocketService'; export type { BackendWebSocketServiceOptions, @@ -24,7 +24,7 @@ export type { BackendWebSocketServiceConnectionStateChangedEvent, BackendWebSocketServiceEvents, BackendWebSocketServiceMessenger, -} from './BackendWebSocketService'; +} from './ws/BackendWebSocketService'; // ============================================================================ // ACCOUNT ACTIVITY SERVICE @@ -34,7 +34,7 @@ export { AccountActivityService, ACCOUNT_ACTIVITY_SERVICE_ALLOWED_ACTIONS, ACCOUNT_ACTIVITY_SERVICE_ALLOWED_EVENTS, -} from './AccountActivityService'; +} from './ws/AccountActivityService'; export type { SystemNotificationData, @@ -49,7 +49,7 @@ export type { AccountActivityServiceEvents, AllowedEvents as AccountActivityServiceAllowedEvents, AccountActivityServiceMessenger, -} from './AccountActivityService'; +} from './ws/AccountActivityService'; // ============================================================================ // SHARED TYPES @@ -88,7 +88,7 @@ export { OHLCVService, OHLCV_SERVICE_ALLOWED_ACTIONS, OHLCV_SERVICE_ALLOWED_EVENTS, -} from './api/ohlcv'; +} from './ws/ohlcv'; export type { OHLCVBar, @@ -103,7 +103,7 @@ export type { OHLCVServiceEvents, OHLCVServiceAllowedEvents, OHLCVServiceMessenger, -} from './api/ohlcv'; +} from './ws/ohlcv'; // ============================================================================ // API PLATFORM CLIENT diff --git a/packages/core-backend/src/AccountActivityService-method-action-types.ts b/packages/core-backend/src/ws/AccountActivityService-method-action-types.ts similarity index 100% rename from packages/core-backend/src/AccountActivityService-method-action-types.ts rename to packages/core-backend/src/ws/AccountActivityService-method-action-types.ts diff --git a/packages/core-backend/src/AccountActivityService.test.ts b/packages/core-backend/src/ws/AccountActivityService.test.ts similarity index 99% rename from packages/core-backend/src/AccountActivityService.test.ts rename to packages/core-backend/src/ws/AccountActivityService.test.ts index 0dd8630ee1..06525a107c 100644 --- a/packages/core-backend/src/AccountActivityService.test.ts +++ b/packages/core-backend/src/ws/AccountActivityService.test.ts @@ -7,7 +7,7 @@ import type { } from '@metamask/messenger'; import type { Hex } from '@metamask/utils'; -import { flushPromises } from '../../../tests/helpers'; +import { flushPromises } from '../../../../tests/helpers'; import { AccountActivityService } from './AccountActivityService'; import type { AccountActivityServiceMessenger, @@ -15,8 +15,8 @@ import type { } from './AccountActivityService'; import type { ServerNotificationMessage } from './BackendWebSocketService'; import { WebSocketState } from './BackendWebSocketService'; -import type { Transaction, BalanceUpdate } from './types'; -import type { AccountActivityMessage } from './types'; +import type { Transaction, BalanceUpdate } from '../types'; +import type { AccountActivityMessage } from '../types'; type AllAccountActivityServiceActions = MessengerActions; diff --git a/packages/core-backend/src/AccountActivityService.ts b/packages/core-backend/src/ws/AccountActivityService.ts similarity index 99% rename from packages/core-backend/src/AccountActivityService.ts rename to packages/core-backend/src/ws/AccountActivityService.ts index a8b61a427e..b0c8d47731 100644 --- a/packages/core-backend/src/AccountActivityService.ts +++ b/packages/core-backend/src/ws/AccountActivityService.ts @@ -21,12 +21,12 @@ import type { } from './BackendWebSocketService'; import { WebSocketState } from './BackendWebSocketService'; import type { BackendWebSocketServiceMethodActions } from './BackendWebSocketService-method-action-types'; -import { projectLogger, createModuleLogger } from './logger'; +import { projectLogger, createModuleLogger } from '../logger'; import type { Transaction, AccountActivityMessage, BalanceUpdate, -} from './types'; +} from '../types'; // ============================================================================= // Types and Constants diff --git a/packages/core-backend/src/BackendWebSocketService-method-action-types.ts b/packages/core-backend/src/ws/BackendWebSocketService-method-action-types.ts similarity index 100% rename from packages/core-backend/src/BackendWebSocketService-method-action-types.ts rename to packages/core-backend/src/ws/BackendWebSocketService-method-action-types.ts diff --git a/packages/core-backend/src/BackendWebSocketService.test.ts b/packages/core-backend/src/ws/BackendWebSocketService.test.ts similarity index 99% rename from packages/core-backend/src/BackendWebSocketService.test.ts rename to packages/core-backend/src/ws/BackendWebSocketService.test.ts index cf113e1ba7..9c17e430c2 100644 --- a/packages/core-backend/src/BackendWebSocketService.test.ts +++ b/packages/core-backend/src/ws/BackendWebSocketService.test.ts @@ -5,7 +5,7 @@ import type { MockAnyNamespace, } from '@metamask/messenger'; -import { flushPromises } from '../../../tests/helpers'; +import { flushPromises } from '../../../../tests/helpers'; import { BackendWebSocketService, getCloseReason, diff --git a/packages/core-backend/src/BackendWebSocketService.ts b/packages/core-backend/src/ws/BackendWebSocketService.ts similarity index 99% rename from packages/core-backend/src/BackendWebSocketService.ts rename to packages/core-backend/src/ws/BackendWebSocketService.ts index d8ca9139d5..5b1fad8d3c 100644 --- a/packages/core-backend/src/BackendWebSocketService.ts +++ b/packages/core-backend/src/ws/BackendWebSocketService.ts @@ -10,7 +10,7 @@ import { getErrorMessage } from '@metamask/utils'; import { v4 as uuidV4 } from 'uuid'; import type { BackendWebSocketServiceMethodActions } from './BackendWebSocketService-method-action-types'; -import { projectLogger, createModuleLogger } from './logger'; +import { projectLogger, createModuleLogger } from '../logger'; const SERVICE_NAME = 'BackendWebSocketService' as const; diff --git a/packages/core-backend/src/api/ohlcv/OHLCVService-method-action-types.ts b/packages/core-backend/src/ws/ohlcv/OHLCVService-method-action-types.ts similarity index 100% rename from packages/core-backend/src/api/ohlcv/OHLCVService-method-action-types.ts rename to packages/core-backend/src/ws/ohlcv/OHLCVService-method-action-types.ts diff --git a/packages/core-backend/src/api/ohlcv/OHLCVService.test.ts b/packages/core-backend/src/ws/ohlcv/OHLCVService.test.ts similarity index 99% rename from packages/core-backend/src/api/ohlcv/OHLCVService.test.ts rename to packages/core-backend/src/ws/ohlcv/OHLCVService.test.ts index 4533adb74b..13f923a958 100644 --- a/packages/core-backend/src/api/ohlcv/OHLCVService.test.ts +++ b/packages/core-backend/src/ws/ohlcv/OHLCVService.test.ts @@ -6,8 +6,8 @@ import type { } from '@metamask/messenger'; import { flushPromises } from '../../../../../tests/helpers'; -import type { ServerNotificationMessage } from '../../BackendWebSocketService'; -import { WebSocketState } from '../../BackendWebSocketService'; +import type { ServerNotificationMessage } from '../BackendWebSocketService'; +import { WebSocketState } from '../BackendWebSocketService'; import { OHLCVService } from './OHLCVService'; import type { OHLCVServiceMessenger } from './OHLCVService'; import type { OHLCVSubscriptionOptions } from './types'; diff --git a/packages/core-backend/src/api/ohlcv/OHLCVService.ts b/packages/core-backend/src/ws/ohlcv/OHLCVService.ts similarity index 98% rename from packages/core-backend/src/api/ohlcv/OHLCVService.ts rename to packages/core-backend/src/ws/ohlcv/OHLCVService.ts index b3cf3c351c..c3f9215f52 100644 --- a/packages/core-backend/src/api/ohlcv/OHLCVService.ts +++ b/packages/core-backend/src/ws/ohlcv/OHLCVService.ts @@ -18,9 +18,9 @@ import type { WebSocketConnectionInfo, BackendWebSocketServiceConnectionStateChangedEvent, ServerNotificationMessage, -} from '../../BackendWebSocketService'; -import { WebSocketState } from '../../BackendWebSocketService'; -import type { BackendWebSocketServiceMethodActions } from '../../BackendWebSocketService-method-action-types'; +} from '../BackendWebSocketService'; +import { WebSocketState } from '../BackendWebSocketService'; +import type { BackendWebSocketServiceMethodActions } from '../BackendWebSocketService-method-action-types'; import { projectLogger, createModuleLogger } from '../../logger'; import type { OHLCVServiceMethodActions } from './OHLCVService-method-action-types'; import type { OHLCVBar, OHLCVSubscriptionOptions } from './types'; diff --git a/packages/core-backend/src/api/ohlcv/index.ts b/packages/core-backend/src/ws/ohlcv/index.ts similarity index 100% rename from packages/core-backend/src/api/ohlcv/index.ts rename to packages/core-backend/src/ws/ohlcv/index.ts diff --git a/packages/core-backend/src/api/ohlcv/types.ts b/packages/core-backend/src/ws/ohlcv/types.ts similarity index 100% rename from packages/core-backend/src/api/ohlcv/types.ts rename to packages/core-backend/src/ws/ohlcv/types.ts From 01237ea13ef123a842556fa47e2a13ce64fa70e8 Mon Sep 17 00:00:00 2001 From: sahar-fehri Date: Mon, 11 May 2026 10:43:30 +0200 Subject: [PATCH 06/26] fix: lint --- .../src/ws/AccountActivityService.test.ts | 4 ++-- .../core-backend/src/ws/AccountActivityService.ts | 12 ++++++------ .../core-backend/src/ws/BackendWebSocketService.ts | 2 +- packages/core-backend/src/ws/ohlcv/OHLCVService.ts | 2 +- 4 files changed, 10 insertions(+), 10 deletions(-) diff --git a/packages/core-backend/src/ws/AccountActivityService.test.ts b/packages/core-backend/src/ws/AccountActivityService.test.ts index 06525a107c..ac5ea12dbc 100644 --- a/packages/core-backend/src/ws/AccountActivityService.test.ts +++ b/packages/core-backend/src/ws/AccountActivityService.test.ts @@ -8,6 +8,8 @@ import type { import type { Hex } from '@metamask/utils'; import { flushPromises } from '../../../../tests/helpers'; +import type { Transaction, BalanceUpdate } from '../types'; +import type { AccountActivityMessage } from '../types'; import { AccountActivityService } from './AccountActivityService'; import type { AccountActivityServiceMessenger, @@ -15,8 +17,6 @@ import type { } from './AccountActivityService'; import type { ServerNotificationMessage } from './BackendWebSocketService'; import { WebSocketState } from './BackendWebSocketService'; -import type { Transaction, BalanceUpdate } from '../types'; -import type { AccountActivityMessage } from '../types'; type AllAccountActivityServiceActions = MessengerActions; diff --git a/packages/core-backend/src/ws/AccountActivityService.ts b/packages/core-backend/src/ws/AccountActivityService.ts index b0c8d47731..88b97f1729 100644 --- a/packages/core-backend/src/ws/AccountActivityService.ts +++ b/packages/core-backend/src/ws/AccountActivityService.ts @@ -13,6 +13,12 @@ import type { TraceCallback } from '@metamask/controller-utils'; import type { InternalAccount } from '@metamask/keyring-internal-api'; import type { Messenger } from '@metamask/messenger'; +import { projectLogger, createModuleLogger } from '../logger'; +import type { + Transaction, + AccountActivityMessage, + BalanceUpdate, +} from '../types'; import type { AccountActivityServiceMethodActions } from './AccountActivityService-method-action-types'; import type { WebSocketConnectionInfo, @@ -21,12 +27,6 @@ import type { } from './BackendWebSocketService'; import { WebSocketState } from './BackendWebSocketService'; import type { BackendWebSocketServiceMethodActions } from './BackendWebSocketService-method-action-types'; -import { projectLogger, createModuleLogger } from '../logger'; -import type { - Transaction, - AccountActivityMessage, - BalanceUpdate, -} from '../types'; // ============================================================================= // Types and Constants diff --git a/packages/core-backend/src/ws/BackendWebSocketService.ts b/packages/core-backend/src/ws/BackendWebSocketService.ts index 5b1fad8d3c..c71075c372 100644 --- a/packages/core-backend/src/ws/BackendWebSocketService.ts +++ b/packages/core-backend/src/ws/BackendWebSocketService.ts @@ -9,8 +9,8 @@ import type { AuthenticationController } from '@metamask/profile-sync-controller import { getErrorMessage } from '@metamask/utils'; import { v4 as uuidV4 } from 'uuid'; -import type { BackendWebSocketServiceMethodActions } from './BackendWebSocketService-method-action-types'; import { projectLogger, createModuleLogger } from '../logger'; +import type { BackendWebSocketServiceMethodActions } from './BackendWebSocketService-method-action-types'; const SERVICE_NAME = 'BackendWebSocketService' as const; diff --git a/packages/core-backend/src/ws/ohlcv/OHLCVService.ts b/packages/core-backend/src/ws/ohlcv/OHLCVService.ts index c3f9215f52..ac232122bc 100644 --- a/packages/core-backend/src/ws/ohlcv/OHLCVService.ts +++ b/packages/core-backend/src/ws/ohlcv/OHLCVService.ts @@ -14,6 +14,7 @@ import type { } from '@metamask/controller-utils'; import type { Messenger } from '@metamask/messenger'; +import { projectLogger, createModuleLogger } from '../../logger'; import type { WebSocketConnectionInfo, BackendWebSocketServiceConnectionStateChangedEvent, @@ -21,7 +22,6 @@ import type { } from '../BackendWebSocketService'; import { WebSocketState } from '../BackendWebSocketService'; import type { BackendWebSocketServiceMethodActions } from '../BackendWebSocketService-method-action-types'; -import { projectLogger, createModuleLogger } from '../../logger'; import type { OHLCVServiceMethodActions } from './OHLCVService-method-action-types'; import type { OHLCVBar, OHLCVSubscriptionOptions } from './types'; From 832414322dc6b5ff6837802d3254cd7c6fd11d5e Mon Sep 17 00:00:00 2001 From: sahar-fehri Date: Mon, 11 May 2026 11:00:55 +0200 Subject: [PATCH 07/26] fix: fix messenger actions --- .../src/ws/ohlcv/OHLCVService-method-action-types.ts | 11 ++++++++--- 1 file changed, 8 insertions(+), 3 deletions(-) diff --git a/packages/core-backend/src/ws/ohlcv/OHLCVService-method-action-types.ts b/packages/core-backend/src/ws/ohlcv/OHLCVService-method-action-types.ts index c53628c087..fc052889d8 100644 --- a/packages/core-backend/src/ws/ohlcv/OHLCVService-method-action-types.ts +++ b/packages/core-backend/src/ws/ohlcv/OHLCVService-method-action-types.ts @@ -6,9 +6,12 @@ import type { OHLCVService } from './OHLCVService'; /** - * Subscribe to an OHLCV channel for real-time candlestick data. + * Subscribe to an OHLCV channel. If this is the first subscriber for the + * given asset/interval/currency combination a WebSocket subscription is + * created. Additional calls for the same combination only bump the reference + * count. * - * @param options - The subscription parameters (assetId, interval, currency). + * @param options - The subscription parameters. */ export type OHLCVServiceSubscribeAction = { type: `OHLCVService:subscribe`; @@ -16,7 +19,9 @@ export type OHLCVServiceSubscribeAction = { }; /** - * Unsubscribe from an OHLCV channel. + * Unsubscribe from an OHLCV channel. Decrements the reference count and, + * when it reaches zero, starts a grace-period timer before actually + * unsubscribing from the WebSocket to absorb rapid navigation patterns. * * @param options - The subscription parameters to unsubscribe from. */ From 9f2e1dec73b3b8b227e88a48c60b69bc3c38c80b Mon Sep 17 00:00:00 2001 From: sahar-fehri Date: Mon, 11 May 2026 11:44:10 +0200 Subject: [PATCH 08/26] fix: lint --- eslint-suppressions.json | 31 +++++++++++++++++++++---------- 1 file changed, 21 insertions(+), 10 deletions(-) diff --git a/eslint-suppressions.json b/eslint-suppressions.json index 7c61892367..e2b571a412 100644 --- a/eslint-suppressions.json +++ b/eslint-suppressions.json @@ -862,34 +862,45 @@ "count": 1 } }, - "packages/core-backend/src/AccountActivityService.test.ts": { + "packages/core-backend/src/api/shared-types.ts": { "no-restricted-syntax": { - "count": 2 + "count": 1 } }, - "packages/core-backend/src/AccountActivityService.ts": { + "packages/core-backend/src/index.ts": { "no-restricted-syntax": { - "count": 1 + "count": 4 } }, - "packages/core-backend/src/BackendWebSocketService.test.ts": { + "packages/core-backend/src/ws/AccountActivityService.test.ts": { "no-restricted-syntax": { - "count": 1 + "count": 2 } }, - "packages/core-backend/src/BackendWebSocketService.ts": { + "packages/core-backend/src/ws/AccountActivityService.ts": { "no-restricted-syntax": { - "count": 5 + "count": 1 } }, - "packages/core-backend/src/api/shared-types.ts": { + "packages/core-backend/src/ws/BackendWebSocketService.test.ts": { "no-restricted-syntax": { "count": 1 } }, - "packages/core-backend/src/index.ts": { + "packages/core-backend/src/ws/BackendWebSocketService.ts": { "no-restricted-syntax": { + "count": 5 + } + }, + "packages/core-backend/src/ws/ohlcv/OHLCVService.test.ts": { + "@typescript-eslint/naming-convention": { "count": 2 + }, + "id-length": { + "count": 1 + }, + "no-restricted-syntax": { + "count": 1 } }, "packages/delegation-controller/src/DelegationController.test.ts": { From 181ae75e10bad90168583ca429d1bd21365ba606 Mon Sep 17 00:00:00 2001 From: sahar-fehri Date: Mon, 11 May 2026 11:51:38 +0200 Subject: [PATCH 09/26] fix: unit tests --- .../src/ws/ohlcv/OHLCVService.test.ts | 73 ++++++++++++++++++- 1 file changed, 72 insertions(+), 1 deletion(-) diff --git a/packages/core-backend/src/ws/ohlcv/OHLCVService.test.ts b/packages/core-backend/src/ws/ohlcv/OHLCVService.test.ts index 13f923a958..29432e1cf7 100644 --- a/packages/core-backend/src/ws/ohlcv/OHLCVService.test.ts +++ b/packages/core-backend/src/ws/ohlcv/OHLCVService.test.ts @@ -202,6 +202,14 @@ const SUB_OPTS: OHLCVSubscriptionOptions = { const EXPECTED_CHANNEL = 'market-data.v1.eip155:8453/erc20:0x833589fCD6eDb6E08f4c7C32D4f71b54bdA02913.1m.usd'; +const BASE_CONNECTION_INFO = { + url: 'ws://test', + timeout: 10000, + reconnectDelay: 500, + maxReconnectDelay: 5000, + requestTimeout: 30000, +}; + // ============================================================================= // Tests // ============================================================================= @@ -485,6 +493,7 @@ describe('OHLCVService', () => { rootMessenger.publish( 'BackendWebSocketService:connectionStateChanged', { + ...BASE_CONNECTION_INFO, state: WebSocketState.CONNECTED, connectedAt: Date.now(), reconnectAttempts: 0, @@ -509,6 +518,33 @@ describe('OHLCVService', () => { rootMessenger.publish( 'BackendWebSocketService:connectionStateChanged', { + ...BASE_CONNECTION_INFO, + state: WebSocketState.CONNECTED, + connectedAt: Date.now(), + reconnectAttempts: 0, + }, + ); + await completeAsyncOperations(); + + expect(mocks.subscribe).not.toHaveBeenCalled(); + }); + }); + + it('should skip channels with refCount 0 and no grace period during resubscribe', async () => { + await withService(async ({ service, mocks, rootMessenger }) => { + await service.subscribe(SUB_OPTS); + await service.unsubscribe(SUB_OPTS); + + jest.advanceTimersByTime(3_000); + await completeAsyncOperations(); + + mocks.subscribe.mockClear(); + mocks.channelHasSubscription.mockReturnValue(false); + + rootMessenger.publish( + 'BackendWebSocketService:connectionStateChanged', + { + ...BASE_CONNECTION_INFO, state: WebSocketState.CONNECTED, connectedAt: Date.now(), reconnectAttempts: 0, @@ -520,6 +556,39 @@ describe('OHLCVService', () => { }); }); + it('should deliver bar updates via resubscribed channel callback', async () => { + await withService(async ({ service, mocks, messenger, rootMessenger }) => { + await service.subscribe(SUB_OPTS); + mocks.subscribe.mockClear(); + mocks.channelHasSubscription.mockReturnValue(false); + + rootMessenger.publish( + 'BackendWebSocketService:connectionStateChanged', + { + ...BASE_CONNECTION_INFO, + state: WebSocketState.CONNECTED, + connectedAt: Date.now(), + reconnectAttempts: 0, + }, + ); + await completeAsyncOperations(); + + const resubscribeCallback = mocks.subscribe.mock.calls[0][0].callback; + const barListener = jest.fn(); + messenger.subscribe('OHLCVService:barUpdated', barListener); + + resubscribeCallback({ + data: { timestamp: 100, open: 1, high: 2, low: 0.5, close: 1.5, volume: 999 }, + timestamp: Date.now(), + }); + + expect(barListener).toHaveBeenCalledWith({ + channel: EXPECTED_CHANNEL, + bar: { timestamp: 100, open: 1, high: 2, low: 0.5, close: 1.5, volume: 999 }, + }); + }); + }); + it('should publish chainStatusChanged down on DISCONNECTED', async () => { await withService(async ({ mocks, messenger, rootMessenger }) => { const statusListener = jest.fn(); @@ -539,8 +608,9 @@ describe('OHLCVService', () => { rootMessenger.publish( 'BackendWebSocketService:connectionStateChanged', { + ...BASE_CONNECTION_INFO, state: WebSocketState.DISCONNECTED, - connectedAt: null, + connectedAt: undefined, reconnectAttempts: 0, }, ); @@ -658,6 +728,7 @@ describe('OHLCVService', () => { rootMessenger.publish( 'BackendWebSocketService:connectionStateChanged', { + ...BASE_CONNECTION_INFO, state: WebSocketState.CONNECTED, connectedAt: Date.now(), reconnectAttempts: 1, From 5b1b3c4e21f3c3ff3e6358a4e4d0704067b24f5b Mon Sep 17 00:00:00 2001 From: sahar-fehri Date: Mon, 11 May 2026 12:15:18 +0200 Subject: [PATCH 10/26] fix: lint --- .../src/ws/ohlcv/OHLCVService.test.ts | 76 +++++++++++-------- 1 file changed, 46 insertions(+), 30 deletions(-) diff --git a/packages/core-backend/src/ws/ohlcv/OHLCVService.test.ts b/packages/core-backend/src/ws/ohlcv/OHLCVService.test.ts index 29432e1cf7..f92217dfe5 100644 --- a/packages/core-backend/src/ws/ohlcv/OHLCVService.test.ts +++ b/packages/core-backend/src/ws/ohlcv/OHLCVService.test.ts @@ -557,36 +557,52 @@ describe('OHLCVService', () => { }); it('should deliver bar updates via resubscribed channel callback', async () => { - await withService(async ({ service, mocks, messenger, rootMessenger }) => { - await service.subscribe(SUB_OPTS); - mocks.subscribe.mockClear(); - mocks.channelHasSubscription.mockReturnValue(false); - - rootMessenger.publish( - 'BackendWebSocketService:connectionStateChanged', - { - ...BASE_CONNECTION_INFO, - state: WebSocketState.CONNECTED, - connectedAt: Date.now(), - reconnectAttempts: 0, - }, - ); - await completeAsyncOperations(); - - const resubscribeCallback = mocks.subscribe.mock.calls[0][0].callback; - const barListener = jest.fn(); - messenger.subscribe('OHLCVService:barUpdated', barListener); - - resubscribeCallback({ - data: { timestamp: 100, open: 1, high: 2, low: 0.5, close: 1.5, volume: 999 }, - timestamp: Date.now(), - }); - - expect(barListener).toHaveBeenCalledWith({ - channel: EXPECTED_CHANNEL, - bar: { timestamp: 100, open: 1, high: 2, low: 0.5, close: 1.5, volume: 999 }, - }); - }); + await withService( + async ({ service, mocks, messenger, rootMessenger }) => { + await service.subscribe(SUB_OPTS); + mocks.subscribe.mockClear(); + mocks.channelHasSubscription.mockReturnValue(false); + + rootMessenger.publish( + 'BackendWebSocketService:connectionStateChanged', + { + ...BASE_CONNECTION_INFO, + state: WebSocketState.CONNECTED, + connectedAt: Date.now(), + reconnectAttempts: 0, + }, + ); + await completeAsyncOperations(); + + const resubscribeCallback = mocks.subscribe.mock.calls[0][0].callback; + const barListener = jest.fn(); + messenger.subscribe('OHLCVService:barUpdated', barListener); + + resubscribeCallback({ + data: { + timestamp: 100, + open: 1, + high: 2, + low: 0.5, + close: 1.5, + volume: 999, + }, + timestamp: Date.now(), + }); + + expect(barListener).toHaveBeenCalledWith({ + channel: EXPECTED_CHANNEL, + bar: { + timestamp: 100, + open: 1, + high: 2, + low: 0.5, + close: 1.5, + volume: 999, + }, + }); + }, + ); }); it('should publish chainStatusChanged down on DISCONNECTED', async () => { From 1f601d6f4017fd6be8efb6ece8dde4cdd00b223c Mon Sep 17 00:00:00 2001 From: sahar-fehri Date: Mon, 11 May 2026 13:41:30 +0200 Subject: [PATCH 11/26] fix: cleanup --- .../src/ws/ohlcv/OHLCVService.test.ts | 26 ------------------- .../core-backend/src/ws/ohlcv/OHLCVService.ts | 6 +---- 2 files changed, 1 insertion(+), 31 deletions(-) diff --git a/packages/core-backend/src/ws/ohlcv/OHLCVService.test.ts b/packages/core-backend/src/ws/ohlcv/OHLCVService.test.ts index f92217dfe5..48f9afc0a4 100644 --- a/packages/core-backend/src/ws/ohlcv/OHLCVService.test.ts +++ b/packages/core-backend/src/ws/ohlcv/OHLCVService.test.ts @@ -530,32 +530,6 @@ describe('OHLCVService', () => { }); }); - it('should skip channels with refCount 0 and no grace period during resubscribe', async () => { - await withService(async ({ service, mocks, rootMessenger }) => { - await service.subscribe(SUB_OPTS); - await service.unsubscribe(SUB_OPTS); - - jest.advanceTimersByTime(3_000); - await completeAsyncOperations(); - - mocks.subscribe.mockClear(); - mocks.channelHasSubscription.mockReturnValue(false); - - rootMessenger.publish( - 'BackendWebSocketService:connectionStateChanged', - { - ...BASE_CONNECTION_INFO, - state: WebSocketState.CONNECTED, - connectedAt: Date.now(), - reconnectAttempts: 0, - }, - ); - await completeAsyncOperations(); - - expect(mocks.subscribe).not.toHaveBeenCalled(); - }); - }); - it('should deliver bar updates via resubscribed channel callback', async () => { await withService( async ({ service, mocks, messenger, rootMessenger }) => { diff --git a/packages/core-backend/src/ws/ohlcv/OHLCVService.ts b/packages/core-backend/src/ws/ohlcv/OHLCVService.ts index ac232122bc..2fe66ad959 100644 --- a/packages/core-backend/src/ws/ohlcv/OHLCVService.ts +++ b/packages/core-backend/src/ws/ohlcv/OHLCVService.ts @@ -322,11 +322,7 @@ export class OHLCVService { * Called when WebSocket transitions to CONNECTED. */ async #resubscribeActiveChannels(): Promise { - for (const [channel, entry] of this.#channels.entries()) { - if (entry.refCount <= 0 && !entry.gracePeriodTimer) { - continue; - } - + for (const [channel] of this.#channels.entries()) { try { if ( this.#messenger.call( From 510d33d5160e5f00d63be604f0dcf3513ae29b8f Mon Sep 17 00:00:00 2001 From: sahar-fehri Date: Mon, 11 May 2026 14:14:55 +0200 Subject: [PATCH 12/26] fix: add debug logs --- .../core-backend/src/ws/ohlcv/OHLCVService.ts | 51 ++++++++++++++++--- 1 file changed, 44 insertions(+), 7 deletions(-) diff --git a/packages/core-backend/src/ws/ohlcv/OHLCVService.ts b/packages/core-backend/src/ws/ohlcv/OHLCVService.ts index 2fe66ad959..0e166bb597 100644 --- a/packages/core-backend/src/ws/ohlcv/OHLCVService.ts +++ b/packages/core-backend/src/ws/ohlcv/OHLCVService.ts @@ -191,6 +191,7 @@ export class OHLCVService { * Register the system-notifications channel callback. */ init(): void { + log('OHLCV-WS: Initializing — registering system-notifications callback'); this.#messenger.call('BackendWebSocketService:addChannelCallback', { channelName: SYSTEM_NOTIFICATIONS_CHANNEL, callback: (notification: ServerNotificationMessage) => @@ -212,13 +213,14 @@ export class OHLCVService { */ async subscribe(options: OHLCVSubscriptionOptions): Promise { const channel = this.#buildChannel(options); + log('OHLCV-WS: Subscribe requested', { channel }); const entry = this.#channels.get(channel); if (entry?.gracePeriodTimer) { clearTimeout(entry.gracePeriodTimer); entry.gracePeriodTimer = undefined; entry.refCount += 1; - log('Cancelled grace-period unsubscribe, bumped refCount', { + log('OHLCV-WS: Cancelled grace-period unsubscribe, bumped refCount', { channel, refCount: entry.refCount, }); @@ -227,6 +229,10 @@ export class OHLCVService { if (entry && entry.refCount > 0) { entry.refCount += 1; + log('OHLCV-WS: Incremented refCount for existing subscription', { + channel, + refCount: entry.refCount, + }); return; } @@ -239,6 +245,9 @@ export class OHLCVService { channel, ) ) { + log('OHLCV-WS: Channel already has WS subscription (idempotency), skipping', { + channel, + }); this.#channels.set(channel, { refCount: 1 }); return; } @@ -252,8 +261,9 @@ export class OHLCVService { }); this.#channels.set(channel, { refCount: 1 }); + log('OHLCV-WS: Subscribe succeeded — new WS subscription created', { channel }); } catch (error) { - log('Subscription failed, forcing reconnection', { channel, error }); + log('OHLCV-WS: Subscription failed, forcing reconnection', { channel, error }); this.#messenger.publish('OHLCVService:subscriptionError', { channel, error: String(error), @@ -272,18 +282,25 @@ export class OHLCVService { */ async unsubscribe(options: OHLCVSubscriptionOptions): Promise { const channel = this.#buildChannel(options); + log('OHLCV-WS: Unsubscribe requested', { channel }); const entry = this.#channels.get(channel); if (!entry || entry.refCount <= 0) { + log('OHLCV-WS: Unsubscribe no-op — channel not tracked or refCount 0', { channel }); return; } entry.refCount -= 1; if (entry.refCount > 0) { + log('OHLCV-WS: Decremented refCount, still has consumers', { + channel, + refCount: entry.refCount, + }); return; } + log('OHLCV-WS: refCount reached 0, starting grace period', { channel }); entry.gracePeriodTimer = setTimeout(() => { // eslint-disable-next-line @typescript-eslint/no-floating-promises this.#performUnsubscribe(channel); @@ -295,6 +312,7 @@ export class OHLCVService { // ============================================================================= async #performUnsubscribe(channel: string): Promise { + log('OHLCV-WS: Grace period expired — performing actual WS unsubscribe', { channel }); this.#channels.delete(channel); try { @@ -306,8 +324,9 @@ export class OHLCVService { for (const sub of subscriptions) { await sub.unsubscribe(); } + log('OHLCV-WS: WS unsubscribe completed', { channel }); } catch (error) { - log('Unsubscription failed, forcing reconnection', { channel, error }); + log('OHLCV-WS: Unsubscription failed, forcing reconnection', { channel, error }); this.#messenger.publish('OHLCVService:subscriptionError', { channel, error: String(error), @@ -322,6 +341,11 @@ export class OHLCVService { * Called when WebSocket transitions to CONNECTED. */ async #resubscribeActiveChannels(): Promise { + const channelCount = this.#channels.size; + log('OHLCV-WS: Resubscribing active channels after reconnect', { + count: channelCount, + }); + for (const [channel] of this.#channels.entries()) { try { if ( @@ -330,6 +354,9 @@ export class OHLCVService { channel, ) ) { + log('OHLCV-WS: Channel already subscribed on server, skipping resubscribe', { + channel, + }); continue; } @@ -340,8 +367,9 @@ export class OHLCVService { this.#handleBarUpdate(channel, notification); }, }); + log('OHLCV-WS: Resubscription succeeded', { channel }); } catch (error) { - log('Resubscription failed for channel', { channel, error }); + log('OHLCV-WS: Resubscription failed for channel', { channel, error }); } } } @@ -355,6 +383,11 @@ export class OHLCVService { notification: ServerNotificationMessage, ): void { const bar = notification.data as OHLCVBar; + log('OHLCV-WS: Bar update received', { + channel, + close: bar.close, + timestamp: bar.timestamp, + }); // eslint-disable-next-line @typescript-eslint/no-floating-promises this.#trace( @@ -395,7 +428,7 @@ export class OHLCVService { timestamp, }); - log(`Chain status change: ${data.status}`, { + log(`OHLCV-WS: Chain status change: ${data.status}`, { chains: data.chainIds, status: data.status, }); @@ -405,6 +438,7 @@ export class OHLCVService { connectionInfo: WebSocketConnectionInfo, ): Promise { const { state } = connectionInfo; + log('OHLCV-WS: WebSocket state changed', { state }); if (state === WebSocketState.CONNECTED) { await this.#resubscribeActiveChannels(); @@ -418,7 +452,7 @@ export class OHLCVService { timestamp: Date.now(), }); - log('WebSocket disconnection - marked tracked chains as down', { + log('OHLCV-WS: WebSocket disconnection — marked tracked chains as down', { count: chainsToMarkDown.length, chains: chainsToMarkDown, }); @@ -437,7 +471,7 @@ export class OHLCVService { } async #forceReconnection(): Promise { - log('Forcing WebSocket reconnection'); + log('OHLCV-WS: Forcing WebSocket reconnection'); await this.#messenger.call('BackendWebSocketService:forceReconnection'); } @@ -449,6 +483,9 @@ export class OHLCVService { * Destroy the service and clean up all resources. */ destroy(): void { + log('OHLCV-WS: Destroying — clearing all channels and timers', { + channelCount: this.#channels.size, + }); for (const entry of this.#channels.values()) { if (entry.gracePeriodTimer) { clearTimeout(entry.gracePeriodTimer); From c064480f97c5c189c076073fbec4846d849d2cea Mon Sep 17 00:00:00 2001 From: sahar-fehri Date: Tue, 12 May 2026 00:06:59 +0200 Subject: [PATCH 13/26] fix: add grace period flush logic and log cleanup --- .../src/ws/ohlcv/OHLCVService.test.ts | 107 ++++++++++++++++++ .../core-backend/src/ws/ohlcv/OHLCVService.ts | 82 ++++++++------ 2 files changed, 154 insertions(+), 35 deletions(-) diff --git a/packages/core-backend/src/ws/ohlcv/OHLCVService.test.ts b/packages/core-backend/src/ws/ohlcv/OHLCVService.test.ts index 48f9afc0a4..ae516b44ef 100644 --- a/packages/core-backend/src/ws/ohlcv/OHLCVService.test.ts +++ b/packages/core-backend/src/ws/ohlcv/OHLCVService.test.ts @@ -430,6 +430,113 @@ describe('OHLCVService', () => { expect(mockUnsub).not.toHaveBeenCalled(); }); }); + + it('should flush other grace-period channels when subscribing to a new channel', async () => { + const otherOpts: OHLCVSubscriptionOptions = { + assetId: + 'eip155:8453/erc20:0x833589fCD6eDb6E08f4c7C32D4f71b54bdA02913', + interval: '1h', + currency: 'usd', + }; + const otherChannel = + 'market-data.v1.eip155:8453/erc20:0x833589fCD6eDb6E08f4c7C32D4f71b54bdA02913.1h.usd'; + + await withService(async ({ service, mocks }) => { + const mockUnsub = jest.fn(); + mocks.getSubscriptionsByChannel.mockReturnValue([ + { unsubscribe: mockUnsub }, + ]); + + // Subscribe to first channel (1m), then unsubscribe to put it in grace period + await service.subscribe(SUB_OPTS); + await service.unsubscribe(SUB_OPTS); + + // Grace period is running for the 1m channel + expect(mockUnsub).not.toHaveBeenCalled(); + + // Subscribe to a different channel (1h) — should flush the 1m grace period + mocks.subscribe.mockClear(); + await service.subscribe(otherOpts); + + // The old channel (1m) should have been immediately unsubscribed + expect(mocks.getSubscriptionsByChannel).toHaveBeenCalledWith( + EXPECTED_CHANNEL, + ); + expect(mockUnsub).toHaveBeenCalledTimes(1); + + // The new channel (1h) should have been subscribed + expect(mocks.subscribe).toHaveBeenCalledWith({ + channels: [otherChannel], + channelType: 'market-data.v1', + callback: expect.any(Function), + }); + }); + }); + + it('should not flush same-channel grace period (reuse instead)', async () => { + await withService(async ({ service, mocks }) => { + const mockUnsub = jest.fn(); + mocks.getSubscriptionsByChannel.mockReturnValue([ + { unsubscribe: mockUnsub }, + ]); + + await service.subscribe(SUB_OPTS); + await service.unsubscribe(SUB_OPTS); + + // Re-subscribe to the SAME channel — should cancel grace, not flush + mocks.subscribe.mockClear(); + mocks.connect.mockClear(); + await service.subscribe(SUB_OPTS); + + // Should NOT have unsubscribed (grace was cancelled, not flushed) + expect(mockUnsub).not.toHaveBeenCalled(); + // Should NOT have created a new WS subscription (reused existing) + expect(mocks.subscribe).not.toHaveBeenCalled(); + }); + }); + + it('should handle rapid time-range switching without accumulating subscriptions', async () => { + const opts15m = SUB_OPTS; // 1m interval + const opts1h: OHLCVSubscriptionOptions = { + ...SUB_OPTS, + interval: '1h', + }; + const opts1d: OHLCVSubscriptionOptions = { + ...SUB_OPTS, + interval: '1d', + }; + + await withService(async ({ service, mocks }) => { + const mockUnsub = jest.fn(); + mocks.getSubscriptionsByChannel.mockReturnValue([ + { unsubscribe: mockUnsub }, + ]); + + // Subscribe 1m → unsubscribe → subscribe 1h → unsubscribe → subscribe 1d + await service.subscribe(opts15m); + await service.unsubscribe(opts15m); + // 1m is now in grace period + + await service.subscribe(opts1h); + // 1m should have been flushed + expect(mockUnsub).toHaveBeenCalledTimes(1); + + await service.unsubscribe(opts1h); + // 1h is now in grace period + + mockUnsub.mockClear(); + await service.subscribe(opts1d); + // 1h should have been flushed + expect(mockUnsub).toHaveBeenCalledTimes(1); + + // After all this, only 1d should be active — no accumulation + // Advance timers well past any grace period — no additional unsubscribes + mockUnsub.mockClear(); + jest.advanceTimersByTime(5000); + await completeAsyncOperations(); + expect(mockUnsub).not.toHaveBeenCalled(); + }); + }); }); // =========================================================================== diff --git a/packages/core-backend/src/ws/ohlcv/OHLCVService.ts b/packages/core-backend/src/ws/ohlcv/OHLCVService.ts index 0e166bb597..c0c32c7755 100644 --- a/packages/core-backend/src/ws/ohlcv/OHLCVService.ts +++ b/packages/core-backend/src/ws/ohlcv/OHLCVService.ts @@ -213,7 +213,6 @@ export class OHLCVService { */ async subscribe(options: OHLCVSubscriptionOptions): Promise { const channel = this.#buildChannel(options); - log('OHLCV-WS: Subscribe requested', { channel }); const entry = this.#channels.get(channel); if (entry?.gracePeriodTimer) { @@ -229,13 +228,23 @@ export class OHLCVService { if (entry && entry.refCount > 0) { entry.refCount += 1; - log('OHLCV-WS: Incremented refCount for existing subscription', { - channel, - refCount: entry.refCount, - }); return; } + // Flush other channels sitting in grace period to free server-side slots + // and prevent accumulation when switching time ranges rapidly. + for (const [otherChannel, otherEntry] of this.#channels.entries()) { + if (otherChannel !== channel && otherEntry.gracePeriodTimer) { + clearTimeout(otherEntry.gracePeriodTimer); + otherEntry.gracePeriodTimer = undefined; + log('OHLCV-WS: Flushing grace-period channel before new subscribe', { + flushedChannel: otherChannel, + newChannel: channel, + }); + await this.#performUnsubscribe(otherChannel); + } + } + try { await this.#messenger.call('BackendWebSocketService:connect'); @@ -245,9 +254,12 @@ export class OHLCVService { channel, ) ) { - log('OHLCV-WS: Channel already has WS subscription (idempotency), skipping', { - channel, - }); + log( + 'OHLCV-WS: Channel already has WS subscription (idempotency), skipping', + { + channel, + }, + ); this.#channels.set(channel, { refCount: 1 }); return; } @@ -261,9 +273,14 @@ export class OHLCVService { }); this.#channels.set(channel, { refCount: 1 }); - log('OHLCV-WS: Subscribe succeeded — new WS subscription created', { channel }); + log('OHLCV-WS: Subscribe succeeded — new WS subscription created', { + channel, + }); } catch (error) { - log('OHLCV-WS: Subscription failed, forcing reconnection', { channel, error }); + log('OHLCV-WS: Subscription failed, forcing reconnection', { + channel, + error, + }); this.#messenger.publish('OHLCVService:subscriptionError', { channel, error: String(error), @@ -282,25 +299,18 @@ export class OHLCVService { */ async unsubscribe(options: OHLCVSubscriptionOptions): Promise { const channel = this.#buildChannel(options); - log('OHLCV-WS: Unsubscribe requested', { channel }); const entry = this.#channels.get(channel); if (!entry || entry.refCount <= 0) { - log('OHLCV-WS: Unsubscribe no-op — channel not tracked or refCount 0', { channel }); return; } entry.refCount -= 1; if (entry.refCount > 0) { - log('OHLCV-WS: Decremented refCount, still has consumers', { - channel, - refCount: entry.refCount, - }); return; } - log('OHLCV-WS: refCount reached 0, starting grace period', { channel }); entry.gracePeriodTimer = setTimeout(() => { // eslint-disable-next-line @typescript-eslint/no-floating-promises this.#performUnsubscribe(channel); @@ -312,7 +322,9 @@ export class OHLCVService { // ============================================================================= async #performUnsubscribe(channel: string): Promise { - log('OHLCV-WS: Grace period expired — performing actual WS unsubscribe', { channel }); + log('OHLCV-WS: Grace period expired — performing actual WS unsubscribe', { + channel, + }); this.#channels.delete(channel); try { @@ -326,7 +338,10 @@ export class OHLCVService { } log('OHLCV-WS: WS unsubscribe completed', { channel }); } catch (error) { - log('OHLCV-WS: Unsubscription failed, forcing reconnection', { channel, error }); + log('OHLCV-WS: Unsubscription failed, forcing reconnection', { + channel, + error, + }); this.#messenger.publish('OHLCVService:subscriptionError', { channel, error: String(error), @@ -354,9 +369,12 @@ export class OHLCVService { channel, ) ) { - log('OHLCV-WS: Channel already subscribed on server, skipping resubscribe', { - channel, - }); + log( + 'OHLCV-WS: Channel already subscribed on server, skipping resubscribe', + { + channel, + }, + ); continue; } @@ -383,11 +401,6 @@ export class OHLCVService { notification: ServerNotificationMessage, ): void { const bar = notification.data as OHLCVBar; - log('OHLCV-WS: Bar update received', { - channel, - close: bar.close, - timestamp: bar.timestamp, - }); // eslint-disable-next-line @typescript-eslint/no-floating-promises this.#trace( @@ -438,7 +451,6 @@ export class OHLCVService { connectionInfo: WebSocketConnectionInfo, ): Promise { const { state } = connectionInfo; - log('OHLCV-WS: WebSocket state changed', { state }); if (state === WebSocketState.CONNECTED) { await this.#resubscribeActiveChannels(); @@ -452,10 +464,13 @@ export class OHLCVService { timestamp: Date.now(), }); - log('OHLCV-WS: WebSocket disconnection — marked tracked chains as down', { - count: chainsToMarkDown.length, - chains: chainsToMarkDown, - }); + log( + 'OHLCV-WS: WebSocket disconnection — marked tracked chains as down', + { + count: chainsToMarkDown.length, + chains: chainsToMarkDown, + }, + ); this.#chainsUp.clear(); } @@ -483,9 +498,6 @@ export class OHLCVService { * Destroy the service and clean up all resources. */ destroy(): void { - log('OHLCV-WS: Destroying — clearing all channels and timers', { - channelCount: this.#channels.size, - }); for (const entry of this.#channels.values()) { if (entry.gracePeriodTimer) { clearTimeout(entry.gracePeriodTimer); From 094d56fb32ec27b427d4b45aed762c0f59754da9 Mon Sep 17 00:00:00 2001 From: sahar-fehri Date: Tue, 12 May 2026 00:11:07 +0200 Subject: [PATCH 14/26] fix: lint --- packages/core-backend/src/ws/ohlcv/OHLCVService.test.ts | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/packages/core-backend/src/ws/ohlcv/OHLCVService.test.ts b/packages/core-backend/src/ws/ohlcv/OHLCVService.test.ts index ae516b44ef..4f520e840d 100644 --- a/packages/core-backend/src/ws/ohlcv/OHLCVService.test.ts +++ b/packages/core-backend/src/ws/ohlcv/OHLCVService.test.ts @@ -433,8 +433,7 @@ describe('OHLCVService', () => { it('should flush other grace-period channels when subscribing to a new channel', async () => { const otherOpts: OHLCVSubscriptionOptions = { - assetId: - 'eip155:8453/erc20:0x833589fCD6eDb6E08f4c7C32D4f71b54bdA02913', + assetId: 'eip155:8453/erc20:0x833589fCD6eDb6E08f4c7C32D4f71b54bdA02913', interval: '1h', currency: 'usd', }; From dbcc5ba38f66ac8846862ddac5c5772fa736efee Mon Sep 17 00:00:00 2001 From: sahar-fehri Date: Tue, 12 May 2026 02:10:31 +0200 Subject: [PATCH 15/26] fix: handle rejection --- .../src/ws/ohlcv/OHLCVService.test.ts | 27 +++++++++++++++++++ .../core-backend/src/ws/ohlcv/OHLCVService.ts | 9 ++++--- 2 files changed, 33 insertions(+), 3 deletions(-) diff --git a/packages/core-backend/src/ws/ohlcv/OHLCVService.test.ts b/packages/core-backend/src/ws/ohlcv/OHLCVService.test.ts index 4f520e840d..1a5f670613 100644 --- a/packages/core-backend/src/ws/ohlcv/OHLCVService.test.ts +++ b/packages/core-backend/src/ws/ohlcv/OHLCVService.test.ts @@ -814,6 +814,33 @@ describe('OHLCVService', () => { }); }); + it('should not produce unhandled rejection when forceReconnection throws during grace-period unsubscribe', async () => { + await withService(async ({ service, mocks, messenger }) => { + mocks.getSubscriptionsByChannel.mockImplementation(() => { + throw new Error('ws gone'); + }); + mocks.forceReconnection.mockRejectedValue( + new Error('reconnect also failed'), + ); + + const errorListener = jest.fn(); + messenger.subscribe('OHLCVService:subscriptionError', errorListener); + + await service.subscribe(SUB_OPTS); + await service.unsubscribe(SUB_OPTS); + + jest.advanceTimersByTime(3000); + await completeAsyncOperations(); + + expect(errorListener).toHaveBeenCalledWith({ + channel: EXPECTED_CHANNEL, + error: expect.stringContaining('ws gone'), + operation: 'unsubscribe', + }); + expect(mocks.forceReconnection).toHaveBeenCalled(); + }); + }); + it('should log and continue when resubscription fails for a channel', async () => { await withService(async ({ service, mocks, rootMessenger }) => { await service.subscribe(SUB_OPTS); diff --git a/packages/core-backend/src/ws/ohlcv/OHLCVService.ts b/packages/core-backend/src/ws/ohlcv/OHLCVService.ts index c0c32c7755..351ae3af93 100644 --- a/packages/core-backend/src/ws/ohlcv/OHLCVService.ts +++ b/packages/core-backend/src/ws/ohlcv/OHLCVService.ts @@ -312,8 +312,9 @@ export class OHLCVService { } entry.gracePeriodTimer = setTimeout(() => { - // eslint-disable-next-line @typescript-eslint/no-floating-promises - this.#performUnsubscribe(channel); + this.#performUnsubscribe(channel).catch(() => { + // no-op + }); }, GRACE_PERIOD_MS); } @@ -347,7 +348,9 @@ export class OHLCVService { error: String(error), operation: 'unsubscribe', }); - await this.#forceReconnection(); + await this.#forceReconnection().catch(() => { + // no-op + }); } } From 96765457bef2ae04fd914a1f2f7a9a24be2c964a Mon Sep 17 00:00:00 2001 From: sahar-fehri Date: Tue, 12 May 2026 02:15:48 +0200 Subject: [PATCH 16/26] fix: do not resubscribe channels in grace period --- .../src/ws/ohlcv/OHLCVService.test.ts | 24 +++++++++++++++++++ .../core-backend/src/ws/ohlcv/OHLCVService.ts | 6 ++++- 2 files changed, 29 insertions(+), 1 deletion(-) diff --git a/packages/core-backend/src/ws/ohlcv/OHLCVService.test.ts b/packages/core-backend/src/ws/ohlcv/OHLCVService.test.ts index 1a5f670613..79a20f9b3c 100644 --- a/packages/core-backend/src/ws/ohlcv/OHLCVService.test.ts +++ b/packages/core-backend/src/ws/ohlcv/OHLCVService.test.ts @@ -636,6 +636,30 @@ describe('OHLCVService', () => { }); }); + it('should not resubscribe channels in grace period (refCount === 0)', async () => { + await withService(async ({ service, mocks, rootMessenger }) => { + await service.subscribe(SUB_OPTS); + await service.unsubscribe(SUB_OPTS); + + // Channel is now in grace period (refCount === 0, timer running) + mocks.subscribe.mockClear(); + mocks.channelHasSubscription.mockReturnValue(false); + + rootMessenger.publish( + 'BackendWebSocketService:connectionStateChanged', + { + ...BASE_CONNECTION_INFO, + state: WebSocketState.CONNECTED, + connectedAt: Date.now(), + reconnectAttempts: 0, + }, + ); + await completeAsyncOperations(); + + expect(mocks.subscribe).not.toHaveBeenCalled(); + }); + }); + it('should deliver bar updates via resubscribed channel callback', async () => { await withService( async ({ service, mocks, messenger, rootMessenger }) => { diff --git a/packages/core-backend/src/ws/ohlcv/OHLCVService.ts b/packages/core-backend/src/ws/ohlcv/OHLCVService.ts index 351ae3af93..7392bc392f 100644 --- a/packages/core-backend/src/ws/ohlcv/OHLCVService.ts +++ b/packages/core-backend/src/ws/ohlcv/OHLCVService.ts @@ -364,7 +364,11 @@ export class OHLCVService { count: channelCount, }); - for (const [channel] of this.#channels.entries()) { + for (const [channel, entry] of this.#channels.entries()) { + if (entry.refCount === 0) { + continue; + } + try { if ( this.#messenger.call( From b80626897d00e6534890f892f590e0dce270023e Mon Sep 17 00:00:00 2001 From: sahar-fehri Date: Tue, 12 May 2026 14:40:58 +0200 Subject: [PATCH 17/26] fix: add channel lock logic to handle concurrent sub/unsub --- .../src/ws/ohlcv/OHLCVService.test.ts | 82 +++++++++++++++++++ .../core-backend/src/ws/ohlcv/OHLCVService.ts | 39 +++++++++ 2 files changed, 121 insertions(+) diff --git a/packages/core-backend/src/ws/ohlcv/OHLCVService.test.ts b/packages/core-backend/src/ws/ohlcv/OHLCVService.test.ts index 79a20f9b3c..55532be27c 100644 --- a/packages/core-backend/src/ws/ohlcv/OHLCVService.test.ts +++ b/packages/core-backend/src/ws/ohlcv/OHLCVService.test.ts @@ -585,6 +585,88 @@ describe('OHLCVService', () => { }); }); + // =========================================================================== + // Race Condition — Per-Channel Locking + // =========================================================================== + + describe('per-channel locking', () => { + it('should serialize concurrent subscribes so refCount is correct', async () => { + await withService(async ({ service, mocks }) => { + let connectResolve!: () => void; + mocks.connect.mockImplementation( + () => + new Promise((resolve) => { + connectResolve = resolve; + }), + ); + + const p1 = service.subscribe(SUB_OPTS); + // Let the microtask tick so `connect` is called and `connectResolve` is assigned + await flushPromises(); + + const p2 = service.subscribe(SUB_OPTS); + + // p1 is waiting on connect, p2 is queued behind it via the lock + connectResolve(); + mocks.connect.mockResolvedValue(undefined); + await p1; + await p2; + + expect(mocks.subscribe).toHaveBeenCalledTimes(1); + + const mockUnsub = jest.fn(); + mocks.getSubscriptionsByChannel.mockReturnValue([ + { unsubscribe: mockUnsub }, + ]); + + // refCount must be 2 — first unsubscribe drops it to 1, no grace timer + await service.unsubscribe(SUB_OPTS); + jest.advanceTimersByTime(5000); + await completeAsyncOperations(); + expect(mockUnsub).not.toHaveBeenCalled(); + + // Second unsubscribe drops refCount to 0 → grace timer → WS unsubscribe + await service.unsubscribe(SUB_OPTS); + jest.advanceTimersByTime(3000); + await completeAsyncOperations(); + expect(mockUnsub).toHaveBeenCalledTimes(1); + }); + }); + + it('should serialize concurrent subscribe + unsubscribe so refCount never corrupts', async () => { + await withService(async ({ service, mocks }) => { + let connectResolve!: () => void; + mocks.connect.mockImplementation( + () => + new Promise((resolve) => { + connectResolve = resolve; + }), + ); + + const pSub = service.subscribe(SUB_OPTS); + await flushPromises(); + + const pUnsub = service.unsubscribe(SUB_OPTS); + + connectResolve(); + await pSub; + await pUnsub; + + // After subscribe then unsubscribe, refCount is 0 → grace timer starts + // Advance past grace period + const mockUnsub = jest.fn(); + mocks.getSubscriptionsByChannel.mockReturnValue([ + { unsubscribe: mockUnsub }, + ]); + + jest.advanceTimersByTime(3000); + await completeAsyncOperations(); + + expect(mockUnsub).toHaveBeenCalledTimes(1); + }); + }); + }); + // =========================================================================== // Reconnect Resilience // =========================================================================== diff --git a/packages/core-backend/src/ws/ohlcv/OHLCVService.ts b/packages/core-backend/src/ws/ohlcv/OHLCVService.ts index 7392bc392f..8b52f0977a 100644 --- a/packages/core-backend/src/ws/ohlcv/OHLCVService.ts +++ b/packages/core-backend/src/ws/ohlcv/OHLCVService.ts @@ -156,6 +156,8 @@ export class OHLCVService { readonly #channels = new Map(); + readonly #channelLocks = new Map>(); + readonly #chainsUp = new Set(); // ============================================================================= @@ -210,9 +212,14 @@ export class OHLCVService { * count. * * @param options - The subscription parameters. + * @returns A promise that resolves once the subscription is established. */ async subscribe(options: OHLCVSubscriptionOptions): Promise { const channel = this.#buildChannel(options); + return this.#withChannelLock(channel, () => this.#subscribeInner(channel)); + } + + async #subscribeInner(channel: string): Promise { const entry = this.#channels.get(channel); if (entry?.gracePeriodTimer) { @@ -296,9 +303,16 @@ export class OHLCVService { * unsubscribing from the WebSocket to absorb rapid navigation patterns. * * @param options - The subscription parameters to unsubscribe from. + * @returns A promise that resolves once the unsubscription is processed. */ async unsubscribe(options: OHLCVSubscriptionOptions): Promise { const channel = this.#buildChannel(options); + return this.#withChannelLock(channel, () => + this.#unsubscribeInner(channel), + ); + } + + async #unsubscribeInner(channel: string): Promise { const entry = this.#channels.get(channel); if (!entry || entry.refCount <= 0) { @@ -492,6 +506,30 @@ export class OHLCVService { return `${SUBSCRIPTION_NAMESPACE}.${options.assetId}.${options.interval}.${options.currency}`; } + /** + * Serialize async operations for the same channel so that concurrent + * subscribe/unsubscribe calls cannot interleave and corrupt refCount. + * Different channels are not blocked by each other. + * + * @param channel - The channel key to lock on. + * @param fn - The async function to execute under the lock. + */ + async #withChannelLock( + channel: string, + fn: () => Promise, + ): Promise { + const prev = this.#channelLocks.get(channel) ?? Promise.resolve(); + const next = prev.then(fn, fn); + this.#channelLocks.set(channel, next); + try { + await next; + } finally { + if (this.#channelLocks.get(channel) === next) { + this.#channelLocks.delete(channel); + } + } + } + async #forceReconnection(): Promise { log('OHLCV-WS: Forcing WebSocket reconnection'); await this.#messenger.call('BackendWebSocketService:forceReconnection'); @@ -511,6 +549,7 @@ export class OHLCVService { } } this.#channels.clear(); + this.#channelLocks.clear(); this.#messenger.call( 'BackendWebSocketService:removeChannelCallback', From c7e584108c78602ee501395439f55b5466af4d14 Mon Sep 17 00:00:00 2001 From: sahar-fehri Date: Tue, 12 May 2026 14:46:05 +0200 Subject: [PATCH 18/26] fix: jsdoc --- .../src/ws/ohlcv/OHLCVService-method-action-types.ts | 2 ++ 1 file changed, 2 insertions(+) diff --git a/packages/core-backend/src/ws/ohlcv/OHLCVService-method-action-types.ts b/packages/core-backend/src/ws/ohlcv/OHLCVService-method-action-types.ts index fc052889d8..6c30d85382 100644 --- a/packages/core-backend/src/ws/ohlcv/OHLCVService-method-action-types.ts +++ b/packages/core-backend/src/ws/ohlcv/OHLCVService-method-action-types.ts @@ -12,6 +12,7 @@ import type { OHLCVService } from './OHLCVService'; * count. * * @param options - The subscription parameters. + * @returns A promise that resolves once the subscription is established. */ export type OHLCVServiceSubscribeAction = { type: `OHLCVService:subscribe`; @@ -24,6 +25,7 @@ export type OHLCVServiceSubscribeAction = { * unsubscribing from the WebSocket to absorb rapid navigation patterns. * * @param options - The subscription parameters to unsubscribe from. + * @returns A promise that resolves once the unsubscription is processed. */ export type OHLCVServiceUnsubscribeAction = { type: `OHLCVService:unsubscribe`; From bb01736ce8c3539cd8e6df3abd6152914a7b36d5 Mon Sep 17 00:00:00 2001 From: sahar-fehri Date: Tue, 12 May 2026 15:37:42 +0200 Subject: [PATCH 19/26] fix: add channel lock for performunsub --- .../src/ws/ohlcv/OHLCVService.test.ts | 33 ++++++++++- .../core-backend/src/ws/ohlcv/OHLCVService.ts | 56 ++++++++++--------- 2 files changed, 62 insertions(+), 27 deletions(-) diff --git a/packages/core-backend/src/ws/ohlcv/OHLCVService.test.ts b/packages/core-backend/src/ws/ohlcv/OHLCVService.test.ts index 55532be27c..fe50c06df6 100644 --- a/packages/core-backend/src/ws/ohlcv/OHLCVService.test.ts +++ b/packages/core-backend/src/ws/ohlcv/OHLCVService.test.ts @@ -26,7 +26,11 @@ type RootMessenger = Messenger< >; const completeAsyncOperations = async (timeoutMs = 0): Promise => { - await flushPromises(); + // Multiple rounds are needed because the channel lock chains promises + // through .then(), requiring several microtask ticks to fully settle. + for (let i = 0; i < 5; i++) { + await flushPromises(); + } if (timeoutMs > 0) { await new Promise((resolve) => setTimeout(resolve, timeoutMs)); } @@ -665,6 +669,33 @@ describe('OHLCVService', () => { expect(mockUnsub).toHaveBeenCalledTimes(1); }); }); + + it('should create a fresh WS subscription when subscribe races with grace-period unsubscribe', async () => { + await withService(async ({ service, mocks }) => { + let unsubResolve!: () => void; + const mockUnsub = jest.fn( + () => + new Promise((resolve) => { + unsubResolve = resolve; + }), + ); + mocks.getSubscriptionsByChannel.mockReturnValue([ + { unsubscribe: mockUnsub }, + ]); + + await service.subscribe(SUB_OPTS); + await service.unsubscribe(SUB_OPTS); + + jest.advanceTimersByTime(3000); + await flushPromises(); + + const subscribePromise = service.subscribe(SUB_OPTS); + unsubResolve(); + await subscribePromise; + + expect(mocks.subscribe).toHaveBeenCalledTimes(2); + }); + }); }); // =========================================================================== diff --git a/packages/core-backend/src/ws/ohlcv/OHLCVService.ts b/packages/core-backend/src/ws/ohlcv/OHLCVService.ts index 8b52f0977a..25f3d6b85d 100644 --- a/packages/core-backend/src/ws/ohlcv/OHLCVService.ts +++ b/packages/core-backend/src/ws/ohlcv/OHLCVService.ts @@ -326,6 +326,7 @@ export class OHLCVService { } entry.gracePeriodTimer = setTimeout(() => { + entry.gracePeriodTimer = undefined; this.#performUnsubscribe(channel).catch(() => { // no-op }); @@ -337,35 +338,38 @@ export class OHLCVService { // ============================================================================= async #performUnsubscribe(channel: string): Promise { - log('OHLCV-WS: Grace period expired — performing actual WS unsubscribe', { - channel, - }); - this.#channels.delete(channel); - - try { - const subscriptions = this.#messenger.call( - 'BackendWebSocketService:getSubscriptionsByChannel', - channel, + return this.#withChannelLock(channel, async () => { + log( + 'OHLCV-WS: Grace period expired — performing actual WS unsubscribe', + { channel }, ); + this.#channels.delete(channel); + + try { + const subscriptions = this.#messenger.call( + 'BackendWebSocketService:getSubscriptionsByChannel', + channel, + ); - for (const sub of subscriptions) { - await sub.unsubscribe(); + for (const sub of subscriptions) { + await sub.unsubscribe(); + } + log('OHLCV-WS: WS unsubscribe completed', { channel }); + } catch (error) { + log('OHLCV-WS: Unsubscription failed, forcing reconnection', { + channel, + error, + }); + this.#messenger.publish('OHLCVService:subscriptionError', { + channel, + error: String(error), + operation: 'unsubscribe', + }); + await this.#forceReconnection().catch(() => { + // no-op + }); } - log('OHLCV-WS: WS unsubscribe completed', { channel }); - } catch (error) { - log('OHLCV-WS: Unsubscription failed, forcing reconnection', { - channel, - error, - }); - this.#messenger.publish('OHLCVService:subscriptionError', { - channel, - error: String(error), - operation: 'unsubscribe', - }); - await this.#forceReconnection().catch(() => { - // no-op - }); - } + }); } /** From 6a86401055f3bd858180b13b8fbb6736200a696c Mon Sep 17 00:00:00 2001 From: sahar-fehri Date: Tue, 12 May 2026 16:07:15 +0200 Subject: [PATCH 20/26] fix: fix unsub condition --- packages/core-backend/src/ws/ohlcv/OHLCVService.ts | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/packages/core-backend/src/ws/ohlcv/OHLCVService.ts b/packages/core-backend/src/ws/ohlcv/OHLCVService.ts index 25f3d6b85d..bafc1a445f 100644 --- a/packages/core-backend/src/ws/ohlcv/OHLCVService.ts +++ b/packages/core-backend/src/ws/ohlcv/OHLCVService.ts @@ -339,6 +339,15 @@ export class OHLCVService { async #performUnsubscribe(channel: string): Promise { return this.#withChannelLock(channel, async () => { + const entry = this.#channels.get(channel); + if (entry && entry.refCount > 0) { + log( + 'OHLCV-WS: Skipping unsubscribe — new subscriber arrived while queued', + { channel, refCount: entry.refCount }, + ); + return; + } + log( 'OHLCV-WS: Grace period expired — performing actual WS unsubscribe', { channel }, From 5b75ca1dd6b1f87632f876cad903493087bd4b3d Mon Sep 17 00:00:00 2001 From: sahar-fehri Date: Tue, 12 May 2026 16:49:22 +0200 Subject: [PATCH 21/26] fix: use mutex for ohlcv lock --- packages/core-backend/package.json | 1 + .../src/ws/ohlcv/OHLCVService.test.ts | 76 +++---------------- .../core-backend/src/ws/ohlcv/OHLCVService.ts | 66 +++++----------- yarn.lock | 1 + 4 files changed, 32 insertions(+), 112 deletions(-) diff --git a/packages/core-backend/package.json b/packages/core-backend/package.json index 97dc4a6095..5e7019a6cc 100644 --- a/packages/core-backend/package.json +++ b/packages/core-backend/package.json @@ -60,6 +60,7 @@ "@metamask/profile-sync-controller": "^28.0.2", "@metamask/utils": "^11.9.0", "@tanstack/query-core": "^5.62.16", + "async-mutex": "^0.5.0", "uuid": "^8.3.2" }, "devDependencies": { diff --git a/packages/core-backend/src/ws/ohlcv/OHLCVService.test.ts b/packages/core-backend/src/ws/ohlcv/OHLCVService.test.ts index fe50c06df6..a155f1797c 100644 --- a/packages/core-backend/src/ws/ohlcv/OHLCVService.test.ts +++ b/packages/core-backend/src/ws/ohlcv/OHLCVService.test.ts @@ -435,47 +435,6 @@ describe('OHLCVService', () => { }); }); - it('should flush other grace-period channels when subscribing to a new channel', async () => { - const otherOpts: OHLCVSubscriptionOptions = { - assetId: 'eip155:8453/erc20:0x833589fCD6eDb6E08f4c7C32D4f71b54bdA02913', - interval: '1h', - currency: 'usd', - }; - const otherChannel = - 'market-data.v1.eip155:8453/erc20:0x833589fCD6eDb6E08f4c7C32D4f71b54bdA02913.1h.usd'; - - await withService(async ({ service, mocks }) => { - const mockUnsub = jest.fn(); - mocks.getSubscriptionsByChannel.mockReturnValue([ - { unsubscribe: mockUnsub }, - ]); - - // Subscribe to first channel (1m), then unsubscribe to put it in grace period - await service.subscribe(SUB_OPTS); - await service.unsubscribe(SUB_OPTS); - - // Grace period is running for the 1m channel - expect(mockUnsub).not.toHaveBeenCalled(); - - // Subscribe to a different channel (1h) — should flush the 1m grace period - mocks.subscribe.mockClear(); - await service.subscribe(otherOpts); - - // The old channel (1m) should have been immediately unsubscribed - expect(mocks.getSubscriptionsByChannel).toHaveBeenCalledWith( - EXPECTED_CHANNEL, - ); - expect(mockUnsub).toHaveBeenCalledTimes(1); - - // The new channel (1h) should have been subscribed - expect(mocks.subscribe).toHaveBeenCalledWith({ - channels: [otherChannel], - channelType: 'market-data.v1', - callback: expect.any(Function), - }); - }); - }); - it('should not flush same-channel grace period (reuse instead)', async () => { await withService(async ({ service, mocks }) => { const mockUnsub = jest.fn(); @@ -498,16 +457,12 @@ describe('OHLCVService', () => { }); }); - it('should handle rapid time-range switching without accumulating subscriptions', async () => { - const opts15m = SUB_OPTS; // 1m interval + it('should unsubscribe old channels via grace period during rapid time-range switching', async () => { + const opts1m = SUB_OPTS; const opts1h: OHLCVSubscriptionOptions = { ...SUB_OPTS, interval: '1h', }; - const opts1d: OHLCVSubscriptionOptions = { - ...SUB_OPTS, - interval: '1d', - }; await withService(async ({ service, mocks }) => { const mockUnsub = jest.fn(); @@ -515,29 +470,18 @@ describe('OHLCVService', () => { { unsubscribe: mockUnsub }, ]); - // Subscribe 1m → unsubscribe → subscribe 1h → unsubscribe → subscribe 1d - await service.subscribe(opts15m); - await service.unsubscribe(opts15m); - // 1m is now in grace period - + // Subscribe 1m → unsubscribe → subscribe 1h + await service.subscribe(opts1m); + await service.unsubscribe(opts1m); await service.subscribe(opts1h); - // 1m should have been flushed - expect(mockUnsub).toHaveBeenCalledTimes(1); - - await service.unsubscribe(opts1h); - // 1h is now in grace period - mockUnsub.mockClear(); - await service.subscribe(opts1d); - // 1h should have been flushed - expect(mockUnsub).toHaveBeenCalledTimes(1); + // 1m is in grace period, not yet unsubscribed + expect(mockUnsub).not.toHaveBeenCalled(); - // After all this, only 1d should be active — no accumulation - // Advance timers well past any grace period — no additional unsubscribes - mockUnsub.mockClear(); - jest.advanceTimersByTime(5000); + // Grace period expires — old channel cleaned up + jest.advanceTimersByTime(3000); await completeAsyncOperations(); - expect(mockUnsub).not.toHaveBeenCalled(); + expect(mockUnsub).toHaveBeenCalledTimes(1); }); }); }); diff --git a/packages/core-backend/src/ws/ohlcv/OHLCVService.ts b/packages/core-backend/src/ws/ohlcv/OHLCVService.ts index bafc1a445f..4ab5701020 100644 --- a/packages/core-backend/src/ws/ohlcv/OHLCVService.ts +++ b/packages/core-backend/src/ws/ohlcv/OHLCVService.ts @@ -13,6 +13,7 @@ import type { TraceRequest, } from '@metamask/controller-utils'; import type { Messenger } from '@metamask/messenger'; +import { Mutex } from 'async-mutex'; import { projectLogger, createModuleLogger } from '../../logger'; import type { @@ -156,7 +157,7 @@ export class OHLCVService { readonly #channels = new Map(); - readonly #channelLocks = new Map>(); + readonly #mutex = new Mutex(); readonly #chainsUp = new Set(); @@ -216,7 +217,12 @@ export class OHLCVService { */ async subscribe(options: OHLCVSubscriptionOptions): Promise { const channel = this.#buildChannel(options); - return this.#withChannelLock(channel, () => this.#subscribeInner(channel)); + const releaseLock = await this.#mutex.acquire(); + try { + await this.#subscribeInner(channel); + } finally { + releaseLock(); + } } async #subscribeInner(channel: string): Promise { @@ -238,20 +244,6 @@ export class OHLCVService { return; } - // Flush other channels sitting in grace period to free server-side slots - // and prevent accumulation when switching time ranges rapidly. - for (const [otherChannel, otherEntry] of this.#channels.entries()) { - if (otherChannel !== channel && otherEntry.gracePeriodTimer) { - clearTimeout(otherEntry.gracePeriodTimer); - otherEntry.gracePeriodTimer = undefined; - log('OHLCV-WS: Flushing grace-period channel before new subscribe', { - flushedChannel: otherChannel, - newChannel: channel, - }); - await this.#performUnsubscribe(otherChannel); - } - } - try { await this.#messenger.call('BackendWebSocketService:connect'); @@ -307,9 +299,12 @@ export class OHLCVService { */ async unsubscribe(options: OHLCVSubscriptionOptions): Promise { const channel = this.#buildChannel(options); - return this.#withChannelLock(channel, () => - this.#unsubscribeInner(channel), - ); + const releaseLock = await this.#mutex.acquire(); + try { + await this.#unsubscribeInner(channel); + } finally { + releaseLock(); + } } async #unsubscribeInner(channel: string): Promise { @@ -338,7 +333,8 @@ export class OHLCVService { // ============================================================================= async #performUnsubscribe(channel: string): Promise { - return this.#withChannelLock(channel, async () => { + const releaseLock = await this.#mutex.acquire(); + try { const entry = this.#channels.get(channel); if (entry && entry.refCount > 0) { log( @@ -378,7 +374,9 @@ export class OHLCVService { // no-op }); } - }); + } finally { + releaseLock(); + } } /** @@ -519,30 +517,6 @@ export class OHLCVService { return `${SUBSCRIPTION_NAMESPACE}.${options.assetId}.${options.interval}.${options.currency}`; } - /** - * Serialize async operations for the same channel so that concurrent - * subscribe/unsubscribe calls cannot interleave and corrupt refCount. - * Different channels are not blocked by each other. - * - * @param channel - The channel key to lock on. - * @param fn - The async function to execute under the lock. - */ - async #withChannelLock( - channel: string, - fn: () => Promise, - ): Promise { - const prev = this.#channelLocks.get(channel) ?? Promise.resolve(); - const next = prev.then(fn, fn); - this.#channelLocks.set(channel, next); - try { - await next; - } finally { - if (this.#channelLocks.get(channel) === next) { - this.#channelLocks.delete(channel); - } - } - } - async #forceReconnection(): Promise { log('OHLCV-WS: Forcing WebSocket reconnection'); await this.#messenger.call('BackendWebSocketService:forceReconnection'); @@ -562,7 +536,7 @@ export class OHLCVService { } } this.#channels.clear(); - this.#channelLocks.clear(); + this.#chainsUp.clear(); this.#messenger.call( 'BackendWebSocketService:removeChannelCallback', diff --git a/yarn.lock b/yarn.lock index 65bc1d5905..ed8670119d 100644 --- a/yarn.lock +++ b/yarn.lock @@ -3382,6 +3382,7 @@ __metadata: "@tanstack/query-core": "npm:^5.62.16" "@ts-bridge/cli": "npm:^0.6.4" "@types/jest": "npm:^29.5.14" + async-mutex: "npm:^0.5.0" deepmerge: "npm:^4.2.2" jest: "npm:^29.7.0" jest-environment-jsdom: "npm:^29.7.0" From f098394f6f1f98b92a63e632e9e78faf217d4e75 Mon Sep 17 00:00:00 2001 From: sahar-fehri Date: Tue, 12 May 2026 16:57:43 +0200 Subject: [PATCH 22/26] fix: lint --- packages/core-backend/src/ws/ohlcv/OHLCVService.ts | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/packages/core-backend/src/ws/ohlcv/OHLCVService.ts b/packages/core-backend/src/ws/ohlcv/OHLCVService.ts index 4ab5701020..0ba45a4160 100644 --- a/packages/core-backend/src/ws/ohlcv/OHLCVService.ts +++ b/packages/core-backend/src/ws/ohlcv/OHLCVService.ts @@ -344,10 +344,9 @@ export class OHLCVService { return; } - log( - 'OHLCV-WS: Grace period expired — performing actual WS unsubscribe', - { channel }, - ); + log('OHLCV-WS: Grace period expired — performing actual WS unsubscribe', { + channel, + }); this.#channels.delete(channel); try { From 68f5cc72b03df2ee95bfcd9f730f0975454700eb Mon Sep 17 00:00:00 2001 From: sahar-fehri Date: Tue, 12 May 2026 17:22:07 +0200 Subject: [PATCH 23/26] fix: add mutex on concurrent resubscrible --- .../src/ws/ohlcv/OHLCVService.test.ts | 59 ++++++++++++++++ .../core-backend/src/ws/ohlcv/OHLCVService.ts | 70 +++++++++++-------- 2 files changed, 98 insertions(+), 31 deletions(-) diff --git a/packages/core-backend/src/ws/ohlcv/OHLCVService.test.ts b/packages/core-backend/src/ws/ohlcv/OHLCVService.test.ts index a155f1797c..06f8906528 100644 --- a/packages/core-backend/src/ws/ohlcv/OHLCVService.test.ts +++ b/packages/core-backend/src/ws/ohlcv/OHLCVService.test.ts @@ -946,6 +946,65 @@ describe('OHLCVService', () => { }); }); + // =========================================================================== + // Reconnect + Concurrent Mutation Safety + // =========================================================================== + + describe('resubscribe holds mutex to prevent concurrent mutation', () => { + it('should block unsubscribe until resubscription completes, preventing orphaned WS subscriptions', async () => { + await withService(async ({ service, mocks, rootMessenger }) => { + await service.subscribe(SUB_OPTS); + mocks.subscribe.mockClear(); + mocks.channelHasSubscription.mockReturnValue(false); + + // Make the WS subscribe during reconnect take time so we can + // attempt a concurrent unsubscribe while it's in progress. + let resubResolve!: () => void; + mocks.subscribe.mockImplementation( + () => + new Promise((resolve) => { + resubResolve = resolve; + }), + ); + + // Trigger reconnect — this calls #resubscribeActiveChannels which + // now holds the mutex across the entire loop. + rootMessenger.publish( + 'BackendWebSocketService:connectionStateChanged', + { + ...BASE_CONNECTION_INFO, + state: WebSocketState.CONNECTED, + connectedAt: Date.now(), + reconnectAttempts: 1, + }, + ); + await flushPromises(); + + // Concurrent unsubscribe — must queue behind the mutex. + const unsubPromise = service.unsubscribe(SUB_OPTS); + + // The unsubscribe hasn't run yet because the mutex is held. + // Complete the WS resubscription. + resubResolve(); + await flushPromises(); + await unsubPromise; + + // refCount was 1 at reconnect time; after resubscribe completes + // the queued unsubscribe drops it to 0 and starts the grace timer. + const mockUnsub = jest.fn(); + mocks.getSubscriptionsByChannel.mockReturnValue([ + { unsubscribe: mockUnsub }, + ]); + + jest.advanceTimersByTime(3000); + await completeAsyncOperations(); + + // The grace-period unsubscribe fires cleanly — no orphaned subscription. + expect(mockUnsub).toHaveBeenCalledTimes(1); + }); + }); + }); + // =========================================================================== // Destroy // =========================================================================== diff --git a/packages/core-backend/src/ws/ohlcv/OHLCVService.ts b/packages/core-backend/src/ws/ohlcv/OHLCVService.ts index 0ba45a4160..52f3821f2d 100644 --- a/packages/core-backend/src/ws/ohlcv/OHLCVService.ts +++ b/packages/core-backend/src/ws/ohlcv/OHLCVService.ts @@ -383,43 +383,51 @@ export class OHLCVService { * Called when WebSocket transitions to CONNECTED. */ async #resubscribeActiveChannels(): Promise { - const channelCount = this.#channels.size; - log('OHLCV-WS: Resubscribing active channels after reconnect', { - count: channelCount, - }); + const releaseLock = await this.#mutex.acquire(); + try { + const channelCount = this.#channels.size; + log('OHLCV-WS: Resubscribing active channels after reconnect', { + count: channelCount, + }); - for (const [channel, entry] of this.#channels.entries()) { - if (entry.refCount === 0) { - continue; - } + for (const [channel, entry] of this.#channels.entries()) { + if (entry.refCount === 0) { + continue; + } - try { - if ( - this.#messenger.call( - 'BackendWebSocketService:channelHasSubscription', - channel, - ) - ) { - log( - 'OHLCV-WS: Channel already subscribed on server, skipping resubscribe', - { + try { + if ( + this.#messenger.call( + 'BackendWebSocketService:channelHasSubscription', channel, + ) + ) { + log( + 'OHLCV-WS: Channel already subscribed on server, skipping resubscribe', + { + channel, + }, + ); + continue; + } + + await this.#messenger.call('BackendWebSocketService:subscribe', { + channels: [channel], + channelType: SUBSCRIPTION_NAMESPACE, + callback: (notification: ServerNotificationMessage) => { + this.#handleBarUpdate(channel, notification); }, - ); - continue; + }); + log('OHLCV-WS: Resubscription succeeded', { channel }); + } catch (error) { + log('OHLCV-WS: Resubscription failed for channel', { + channel, + error, + }); } - - await this.#messenger.call('BackendWebSocketService:subscribe', { - channels: [channel], - channelType: SUBSCRIPTION_NAMESPACE, - callback: (notification: ServerNotificationMessage) => { - this.#handleBarUpdate(channel, notification); - }, - }); - log('OHLCV-WS: Resubscription succeeded', { channel }); - } catch (error) { - log('OHLCV-WS: Resubscription failed for channel', { channel, error }); } + } finally { + releaseLock(); } } From 01e018024f7508d76a6b5013bc1316c266a43c16 Mon Sep 17 00:00:00 2001 From: sahar-fehri Date: Tue, 12 May 2026 17:39:13 +0200 Subject: [PATCH 24/26] fix: resubscribe after disconnect --- .../src/ws/ohlcv/OHLCVService.test.ts | 93 +++++++++++++++++++ .../core-backend/src/ws/ohlcv/OHLCVService.ts | 14 ++- 2 files changed, 104 insertions(+), 3 deletions(-) diff --git a/packages/core-backend/src/ws/ohlcv/OHLCVService.test.ts b/packages/core-backend/src/ws/ohlcv/OHLCVService.test.ts index 06f8906528..28858f7f1a 100644 --- a/packages/core-backend/src/ws/ohlcv/OHLCVService.test.ts +++ b/packages/core-backend/src/ws/ohlcv/OHLCVService.test.ts @@ -418,6 +418,9 @@ describe('OHLCVService', () => { await service.subscribe(SUB_OPTS); await service.unsubscribe(SUB_OPTS); + // WS subscription still exists (no disconnect happened) + mocks.channelHasSubscription.mockReturnValue(true); + // Re-subscribe during grace period jest.advanceTimersByTime(1000); mocks.subscribe.mockClear(); @@ -445,6 +448,9 @@ describe('OHLCVService', () => { await service.subscribe(SUB_OPTS); await service.unsubscribe(SUB_OPTS); + // WS subscription still exists (no disconnect happened) + mocks.channelHasSubscription.mockReturnValue(true); + // Re-subscribe to the SAME channel — should cancel grace, not flush mocks.subscribe.mockClear(); mocks.connect.mockClear(); @@ -717,6 +723,93 @@ describe('OHLCVService', () => { }); }); + it('should recreate WS subscription when re-subscribing during grace period after disconnect', async () => { + await withService( + async ({ service, mocks, messenger, rootMessenger }) => { + // 1. Subscribe — creates WS subscription, refCount = 1 + await service.subscribe(SUB_OPTS); + + // 2. Unsubscribe — refCount = 0, grace-period timer starts + await service.unsubscribe(SUB_OPTS); + + // 3. Disconnect — BackendWebSocketService clears all server-side + // subscriptions. channelHasSubscription now returns false. + mocks.channelHasSubscription.mockReturnValue(false); + rootMessenger.publish( + 'BackendWebSocketService:connectionStateChanged', + { + ...BASE_CONNECTION_INFO, + state: WebSocketState.DISCONNECTED, + connectedAt: undefined, + reconnectAttempts: 0, + }, + ); + await completeAsyncOperations(); + + // 4. Reconnect — resubscribeActiveChannels skips this channel + // because refCount is 0 (correct behaviour). + mocks.subscribe.mockClear(); + mocks.connect.mockClear(); + rootMessenger.publish( + 'BackendWebSocketService:connectionStateChanged', + { + ...BASE_CONNECTION_INFO, + state: WebSocketState.CONNECTED, + connectedAt: Date.now(), + reconnectAttempts: 1, + }, + ); + await completeAsyncOperations(); + expect(mocks.subscribe).not.toHaveBeenCalled(); + + // 5. User re-subscribes BEFORE grace timer fires. + // The grace-period branch cancels the timer and bumps refCount, + // but the underlying WS subscription no longer exists. + // The fix must detect this and create a fresh WS subscription. + mocks.subscribe.mockClear(); + mocks.connect.mockClear(); + await service.subscribe(SUB_OPTS); + + expect(mocks.connect).toHaveBeenCalledTimes(1); + expect(mocks.subscribe).toHaveBeenCalledWith({ + channels: [EXPECTED_CHANNEL], + channelType: 'market-data.v1', + callback: expect.any(Function), + }); + + // 6. Verify bar updates are delivered through the new subscription. + const capturedCallback = + mocks.subscribe.mock.calls[0][0].callback; + const barListener = jest.fn(); + messenger.subscribe('OHLCVService:barUpdated', barListener); + + capturedCallback({ + data: { + timestamp: 200, + open: 10, + high: 20, + low: 5, + close: 15, + volume: 1000, + }, + timestamp: Date.now(), + }); + + expect(barListener).toHaveBeenCalledWith({ + channel: EXPECTED_CHANNEL, + bar: { + timestamp: 200, + open: 10, + high: 20, + low: 5, + close: 15, + volume: 1000, + }, + }); + }, + ); + }); + it('should deliver bar updates via resubscribed channel callback', async () => { await withService( async ({ service, mocks, messenger, rootMessenger }) => { diff --git a/packages/core-backend/src/ws/ohlcv/OHLCVService.ts b/packages/core-backend/src/ws/ohlcv/OHLCVService.ts index 52f3821f2d..7c523a127a 100644 --- a/packages/core-backend/src/ws/ohlcv/OHLCVService.ts +++ b/packages/core-backend/src/ws/ohlcv/OHLCVService.ts @@ -236,10 +236,18 @@ export class OHLCVService { channel, refCount: entry.refCount, }); - return; - } - if (entry && entry.refCount > 0) { + if ( + this.#messenger.call( + 'BackendWebSocketService:channelHasSubscription', + channel, + ) + ) { + return; + } + // WS subscription was lost (e.g. after disconnect/reconnect) — fall + // through to recreate it while keeping the already-bumped refCount. + } else if (entry && entry.refCount > 0) { entry.refCount += 1; return; } From 337abd924969c0f3a2917aadc29048dcdfec2644 Mon Sep 17 00:00:00 2001 From: sahar-fehri Date: Tue, 12 May 2026 17:43:11 +0200 Subject: [PATCH 25/26] fix: lint --- packages/core-backend/src/ws/ohlcv/OHLCVService.test.ts | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/packages/core-backend/src/ws/ohlcv/OHLCVService.test.ts b/packages/core-backend/src/ws/ohlcv/OHLCVService.test.ts index 28858f7f1a..92aac6a00e 100644 --- a/packages/core-backend/src/ws/ohlcv/OHLCVService.test.ts +++ b/packages/core-backend/src/ws/ohlcv/OHLCVService.test.ts @@ -778,8 +778,7 @@ describe('OHLCVService', () => { }); // 6. Verify bar updates are delivered through the new subscription. - const capturedCallback = - mocks.subscribe.mock.calls[0][0].callback; + const capturedCallback = mocks.subscribe.mock.calls[0][0].callback; const barListener = jest.fn(); messenger.subscribe('OHLCVService:barUpdated', barListener); From 150787a7dc6ce36ad4b4a12af44745d261f2757e Mon Sep 17 00:00:00 2001 From: sahar-fehri Date: Tue, 12 May 2026 17:56:18 +0200 Subject: [PATCH 26/26] fix: fix refCount pump in subscribe --- packages/core-backend/src/ws/ohlcv/OHLCVService.ts | 12 ++++++++---- 1 file changed, 8 insertions(+), 4 deletions(-) diff --git a/packages/core-backend/src/ws/ohlcv/OHLCVService.ts b/packages/core-backend/src/ws/ohlcv/OHLCVService.ts index 7c523a127a..aca5183306 100644 --- a/packages/core-backend/src/ws/ohlcv/OHLCVService.ts +++ b/packages/core-backend/src/ws/ohlcv/OHLCVService.ts @@ -231,10 +231,8 @@ export class OHLCVService { if (entry?.gracePeriodTimer) { clearTimeout(entry.gracePeriodTimer); entry.gracePeriodTimer = undefined; - entry.refCount += 1; - log('OHLCV-WS: Cancelled grace-period unsubscribe, bumped refCount', { + log('OHLCV-WS: Cancelled grace-period unsubscribe', { channel, - refCount: entry.refCount, }); if ( @@ -243,10 +241,15 @@ export class OHLCVService { channel, ) ) { + entry.refCount += 1; + log('OHLCV-WS: WS subscription still alive, bumped refCount', { + channel, + refCount: entry.refCount, + }); return; } // WS subscription was lost (e.g. after disconnect/reconnect) — fall - // through to recreate it while keeping the already-bumped refCount. + // through to recreate it. refCount is bumped only after success below. } else if (entry && entry.refCount > 0) { entry.refCount += 1; return; @@ -288,6 +291,7 @@ export class OHLCVService { channel, error, }); + this.#channels.delete(channel); this.#messenger.publish('OHLCVService:subscriptionError', { channel, error: String(error),