Skip to content

Commit 69d2ba8

Browse files
authored
feat(block): Event-Driven DA Follower with WebSocket Subscriptions (#3131)
* feat: Replace the Syncer's polling DA worker with an event-driven DAFollower and introduce DA client subscription. * feat: add inline blob processing to DAFollower for zero-latency follow mode When the DA subscription delivers blobs at the current local DA height, the followLoop now processes them inline via ProcessBlobs — avoiding a round-trip re-fetch from the DA layer. Architecture: - followLoop: processes subscription blobs inline when caught up (fast path), falls through to catchupLoop when behind (slow path). - catchupLoop: unchanged — sequential RetrieveFromDA() for bulk sync. Changes: - Add Blobs field to SubscriptionEvent for carrying raw blob data - Add extractBlobData() to DA client Subscribe adapter - Export ProcessBlobs on DARetriever interface - Add handleSubscriptionEvent() to DAFollower with inline fast path - Add TestDAFollower_InlineProcessing with 3 sub-tests * feat: subscribe to both header and data namespaces for inline processing When header and data use different DA namespaces, the DAFollower now subscribes to both and merges events via a fan-in goroutine. This ensures inline blob processing works correctly for split-namespace configurations. Changes: - Add DataNamespace to DAFollowerConfig and daFollower - Subscribe to both namespaces in runSubscription with mergeSubscriptions fan-in - Guard handleSubscriptionEvent to only advance localDAHeight when ProcessBlobs returns at least one complete event (header+data matched) - Pass DataNamespace from syncer.go - Implement Subscribe on DummyDA test helper with subscriber notification * feat: add subscription watchdog to detect stalled DA subscriptions If no subscription events arrive within 3× the DA block time (default 30s), the watchdog triggers and returns an error. The followLoop then reconnects the subscription with the standard backoff. This prevents the node from silently stopping sync when the DA subscription stalls (e.g., network partition, DA node freeze). * fix: security hardening for DA subscription path * feat: Implement blob subscription for local DA and update JSON-RPC client to use WebSockets, along with E2E test updates for new `evnode` flags and P2P address retrieval. * WS client constructor * Merge * Linter * Review feedback * Review feedback * Review feedbac * Linter
1 parent 067718d commit 69d2ba8

29 files changed

+1226
-315
lines changed

apps/evm/cmd/run.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -60,10 +60,11 @@ var RunCmd = &cobra.Command{
6060
return err
6161
}
6262

63-
blobClient, err := blobrpc.NewClient(context.Background(), nodeConfig.DA.Address, nodeConfig.DA.AuthToken, "")
63+
blobClient, err := blobrpc.NewWSClient(cmd.Context(), nodeConfig.DA.Address, nodeConfig.DA.AuthToken, "")
6464
if err != nil {
6565
return fmt.Errorf("failed to create blob client: %w", err)
6666
}
67+
defer blobClient.Close()
6768

6869
daClient := block.NewDAClient(blobClient, nodeConfig, logger)
6970

apps/evm/server/force_inclusion_test.go

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,13 @@ func (m *mockDA) Get(ctx context.Context, ids []da.ID, namespace []byte) ([]da.B
5050
return nil, nil
5151
}
5252

53+
func (m *mockDA) Subscribe(_ context.Context, _ []byte) (<-chan da.SubscriptionEvent, error) {
54+
// Not needed in these tests; return a closed channel.
55+
ch := make(chan da.SubscriptionEvent)
56+
close(ch)
57+
return ch, nil
58+
}
59+
5360
func (m *mockDA) Validate(ctx context.Context, ids []da.ID, proofs []da.Proof, namespace []byte) ([]bool, error) {
5461
return nil, nil
5562
}

apps/grpc/cmd/run.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -108,7 +108,7 @@ func createSequencer(
108108
genesis genesis.Genesis,
109109
executor execution.Executor,
110110
) (coresequencer.Sequencer, error) {
111-
blobClient, err := blobrpc.NewClient(ctx, nodeConfig.DA.Address, nodeConfig.DA.AuthToken, "")
111+
blobClient, err := blobrpc.NewWSClient(ctx, nodeConfig.DA.Address, nodeConfig.DA.AuthToken, "")
112112
if err != nil {
113113
return nil, fmt.Errorf("failed to create blob client: %w", err)
114114
}

apps/testapp/cmd/run.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -111,7 +111,7 @@ func createSequencer(
111111
genesis genesis.Genesis,
112112
executor execution.Executor,
113113
) (coresequencer.Sequencer, error) {
114-
blobClient, err := blobrpc.NewClient(ctx, nodeConfig.DA.Address, nodeConfig.DA.AuthToken, "")
114+
blobClient, err := blobrpc.NewWSClient(ctx, nodeConfig.DA.Address, nodeConfig.DA.AuthToken, "")
115115
if err != nil {
116116
return nil, fmt.Errorf("failed to create blob client: %w", err)
117117
}

block/components.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -114,7 +114,7 @@ func (bc *Components) Stop() error {
114114
}
115115
}
116116
if bc.Syncer != nil {
117-
if err := bc.Syncer.Stop(); err != nil {
117+
if err := bc.Syncer.Stop(context.Background()); err != nil {
118118
errs = errors.Join(errs, fmt.Errorf("failed to stop syncer: %w", err))
119119
}
120120
}

block/internal/da/client.go

Lines changed: 65 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -350,6 +350,71 @@ func (c *client) HasForcedInclusionNamespace() bool {
350350
return c.hasForcedNamespace
351351
}
352352

353+
// Subscribe subscribes to blobs in the given namespace via the celestia-node
354+
// Subscribe API. It returns a channel that emits a SubscriptionEvent for every
355+
// DA block containing a matching blob. The channel is closed when ctx is
356+
// cancelled. The caller must drain the channel after cancellation to avoid
357+
// goroutine leaks.
358+
func (c *client) Subscribe(ctx context.Context, namespace []byte) (<-chan datypes.SubscriptionEvent, error) {
359+
ns, err := share.NewNamespaceFromBytes(namespace)
360+
if err != nil {
361+
return nil, fmt.Errorf("invalid namespace: %w", err)
362+
}
363+
364+
rawCh, err := c.blobAPI.Subscribe(ctx, ns)
365+
if err != nil {
366+
return nil, fmt.Errorf("blob subscribe: %w", err)
367+
}
368+
369+
out := make(chan datypes.SubscriptionEvent, 16)
370+
go func() {
371+
defer close(out)
372+
for {
373+
select {
374+
case <-ctx.Done():
375+
return
376+
case resp, ok := <-rawCh:
377+
if !ok {
378+
return
379+
}
380+
if resp == nil {
381+
continue
382+
}
383+
select {
384+
case out <- datypes.SubscriptionEvent{
385+
Height: resp.Height,
386+
Blobs: extractBlobData(resp),
387+
}:
388+
case <-ctx.Done():
389+
return
390+
}
391+
}
392+
}
393+
}()
394+
395+
return out, nil
396+
}
397+
398+
// extractBlobData extracts raw byte slices from a subscription response,
399+
// filtering out nil blobs, empty data, and blobs exceeding DefaultMaxBlobSize.
400+
func extractBlobData(resp *blobrpc.SubscriptionResponse) [][]byte {
401+
if resp == nil || len(resp.Blobs) == 0 {
402+
return nil
403+
}
404+
blobs := make([][]byte, 0, len(resp.Blobs))
405+
for _, blob := range resp.Blobs {
406+
if blob == nil {
407+
continue
408+
}
409+
data := blob.Data()
410+
if len(data) == 0 || len(data) > common.DefaultMaxBlobSize {
411+
continue
412+
}
413+
blobs = append(blobs, data)
414+
}
415+
return blobs
416+
}
417+
353418
// Get fetches blobs by their IDs. Used for visualization and fetching specific blobs.
354419
func (c *client) Get(ctx context.Context, ids []datypes.ID, namespace []byte) ([]datypes.Blob, error) {
355420
if len(ids) == 0 {

block/internal/da/interface.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,11 @@ type Client interface {
1717
// Get retrieves blobs by their IDs. Used for visualization and fetching specific blobs.
1818
Get(ctx context.Context, ids []datypes.ID, namespace []byte) ([]datypes.Blob, error)
1919

20+
// Subscribe returns a channel that emits one SubscriptionEvent per DA block
21+
// that contains a blob in the given namespace. The channel is closed when ctx
22+
// is cancelled. Callers MUST drain the channel after cancellation.
23+
Subscribe(ctx context.Context, namespace []byte) (<-chan datypes.SubscriptionEvent, error)
24+
2025
// GetLatestDAHeight returns the latest height available on the DA layer.
2126
GetLatestDAHeight(ctx context.Context) (uint64, error)
2227

block/internal/da/tracing.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -145,6 +145,9 @@ func (t *tracedClient) GetForcedInclusionNamespace() []byte {
145145
func (t *tracedClient) HasForcedInclusionNamespace() bool {
146146
return t.inner.HasForcedInclusionNamespace()
147147
}
148+
func (t *tracedClient) Subscribe(ctx context.Context, namespace []byte) (<-chan datypes.SubscriptionEvent, error) {
149+
return t.inner.Subscribe(ctx, namespace)
150+
}
148151

149152
type submitError struct{ msg string }
150153

block/internal/da/tracing_test.go

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,14 @@ type mockFullClient struct {
2222
getFn func(ctx context.Context, ids []datypes.ID, namespace []byte) ([]datypes.Blob, error)
2323
getProofsFn func(ctx context.Context, ids []datypes.ID, namespace []byte) ([]datypes.Proof, error)
2424
validateFn func(ctx context.Context, ids []datypes.ID, proofs []datypes.Proof, namespace []byte) ([]bool, error)
25+
subscribeFn func(ctx context.Context, namespace []byte) (<-chan datypes.SubscriptionEvent, error)
26+
}
27+
28+
func (m *mockFullClient) Subscribe(ctx context.Context, namespace []byte) (<-chan datypes.SubscriptionEvent, error) {
29+
if m.subscribeFn == nil {
30+
panic("not expected to be called")
31+
}
32+
return m.subscribeFn(ctx, namespace)
2533
}
2634

2735
func (m *mockFullClient) Submit(ctx context.Context, data [][]byte, gasPrice float64, namespace []byte, options []byte) datypes.ResultSubmit {

0 commit comments

Comments
 (0)