Skip to content

Commit 04c9cad

Browse files
authored
feat(pkg/p2p): reconnect on disconnected peers (#3212)
1 parent ff88b95 commit 04c9cad

9 files changed

Lines changed: 198 additions & 13 deletions

File tree

CHANGELOG.md

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,8 +9,11 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
99

1010
## [Unreleased]
1111

12+
## v1.1.0-rc.2
13+
1214
### Changes
1315

16+
- Improve P2P transient network failure [#3212](https://github.com/evstack/ev-node/pull/3212)
1417
* Improve execution/evm check for stored meta not stale [#3221](https://github.com/evstack/ev-node/pull/3221)
1518

1619
## v1.1.0-rc.1

apps/evm/go.mod

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -2,10 +2,10 @@ module github.com/evstack/ev-node/apps/evm
22

33
go 1.25.7
44

5-
// replace (
6-
// github.com/evstack/ev-node => ../../
7-
// github.com/evstack/ev-node/execution/evm => ../../execution/evm
8-
// )
5+
replace (
6+
github.com/evstack/ev-node => ../../
7+
github.com/evstack/ev-node/execution/evm => ../../execution/evm
8+
)
99

1010
require (
1111
github.com/ethereum/go-ethereum v1.17.2

apps/evm/go.sum

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -472,12 +472,8 @@ github.com/ethereum/go-bigmodexpfix v0.0.0-20250911101455-f9e208c548ab h1:rvv6MJ
472472
github.com/ethereum/go-bigmodexpfix v0.0.0-20250911101455-f9e208c548ab/go.mod h1:IuLm4IsPipXKF7CW5Lzf68PIbZ5yl7FFd74l/E0o9A8=
473473
github.com/ethereum/go-ethereum v1.17.2 h1:ag6geu0kn8Hv5FLKTpH+Hm2DHD+iuFtuqKxEuwUsDOI=
474474
github.com/ethereum/go-ethereum v1.17.2/go.mod h1:KHcRXfGOUfUmKg51IhQ0IowiqZ6PqZf08CMtk0g5K1o=
475-
github.com/evstack/ev-node v1.1.0-rc.1 h1:NtPuuDLqN2h4/edu5zxRlZAxmLkTG3ncXBO2PlCDvVs=
476-
github.com/evstack/ev-node v1.1.0-rc.1/go.mod h1:6rhWWzuyiqNn/erDmWCk1aLxUuQphyOGIRq56/smSyk=
477475
github.com/evstack/ev-node/core v1.0.0 h1:s0Tx0uWHme7SJn/ZNEtee4qNM8UO6PIxXnHhPbbKTz8=
478476
github.com/evstack/ev-node/core v1.0.0/go.mod h1:n2w/LhYQTPsi48m6lMj16YiIqsaQw6gxwjyJvR+B3sY=
479-
github.com/evstack/ev-node/execution/evm v1.0.0 h1:UTAdCrnPsLoGzSgsBx4Kv76jkXpMmHBIpNv3MxyzWPo=
480-
github.com/evstack/ev-node/execution/evm v1.0.0/go.mod h1:UrqkiepfTMiot6M8jnswgu3VU8SSucZpaMIHIl22/1A=
481477
github.com/fatih/color v1.7.0/go.mod h1:Zm6kSWBoL9eyXnKyktHP6abPY2pDugNf5KwzbycvMj4=
482478
github.com/fatih/color v1.10.0/go.mod h1:ELkj/draVOlAH/xkhN6mQ50Qd0MPOk5AAr3maGEBuJM=
483479
github.com/fatih/color v1.13.0/go.mod h1:kLAiJbzzSOZDVNGyDpeOxJ47H46qBXwg5ILebYFFOfk=

apps/testapp/go.mod

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@ module github.com/evstack/ev-node/apps/testapp
22

33
go 1.25.7
44

5-
// replace github.com/evstack/ev-node => ../../.
5+
replace github.com/evstack/ev-node => ../../.
66

77
require (
88
github.com/evstack/ev-node v1.1.0-rc.1

apps/testapp/go.sum

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -432,8 +432,6 @@ github.com/envoyproxy/protoc-gen-validate v1.0.1/go.mod h1:0vj8bNkYbSTNS2PIyH87K
432432
github.com/envoyproxy/protoc-gen-validate v1.0.2/go.mod h1:GpiZQP3dDbg4JouG/NNS7QWXpgx6x8QiMKdmN72jogE=
433433
github.com/envoyproxy/protoc-gen-validate v1.3.0 h1:TvGH1wof4H33rezVKWSpqKz5NXWg5VPuZ0uONDT6eb4=
434434
github.com/envoyproxy/protoc-gen-validate v1.3.0/go.mod h1:HvYl7zwPa5mffgyeTUHA9zHIH36nmrm7oCbo4YKoSWA=
435-
github.com/evstack/ev-node v1.1.0-rc.1 h1:NtPuuDLqN2h4/edu5zxRlZAxmLkTG3ncXBO2PlCDvVs=
436-
github.com/evstack/ev-node v1.1.0-rc.1/go.mod h1:6rhWWzuyiqNn/erDmWCk1aLxUuQphyOGIRq56/smSyk=
437435
github.com/evstack/ev-node/core v1.0.0 h1:s0Tx0uWHme7SJn/ZNEtee4qNM8UO6PIxXnHhPbbKTz8=
438436
github.com/evstack/ev-node/core v1.0.0/go.mod h1:n2w/LhYQTPsi48m6lMj16YiIqsaQw6gxwjyJvR+B3sY=
439437
github.com/fatih/color v1.7.0/go.mod h1:Zm6kSWBoL9eyXnKyktHP6abPY2pDugNf5KwzbycvMj4=

pkg/p2p/client.go

Lines changed: 90 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,12 @@ const (
3535

3636
// peerLimit defines limit of number of peers returned during active peer discovery.
3737
peerLimit = 60
38+
39+
// seedReconnectBackoff is the initial backoff when reconnecting to a disconnected seed peer.
40+
seedReconnectBackoff = 1 * time.Second
41+
42+
// seedReconnectMaxBackoff is the maximum backoff for seed peer reconnection attempts.
43+
seedReconnectMaxBackoff = 30 * time.Second
3844
)
3945

4046
// Client is a P2P client, implemented with libp2p.
@@ -56,6 +62,11 @@ type Client struct {
5662
ps *pubsub.PubSub
5763
started bool
5864

65+
ctx context.Context
66+
cancel context.CancelFunc
67+
68+
seedPeers []peer.AddrInfo
69+
5970
metrics *Metrics
6071
}
6172

@@ -140,6 +151,7 @@ func (c *Client) Start(ctx context.Context) error {
140151

141152
func (c *Client) startWithHost(ctx context.Context, h host.Host) error {
142153
c.host = h
154+
c.ctx, c.cancel = context.WithCancel(ctx)
143155
for _, a := range c.host.Addrs() {
144156
c.logger.Info().Str("address", fmt.Sprintf("%s/p2p/%s", a, c.host.ID())).Msg("listening on address")
145157
}
@@ -170,11 +182,17 @@ func (c *Client) startWithHost(ctx context.Context, h host.Host) error {
170182
}
171183

172184
c.started = true
185+
186+
c.host.Network().Notify(c.newDisconnectNotifee())
187+
173188
return nil
174189
}
175190

176191
// Close gently stops Client.
177192
func (c *Client) Close() error {
193+
if c.cancel != nil {
194+
c.cancel()
195+
}
178196
var err error
179197
if c.dht != nil {
180198
err = errors.Join(err, c.dht.Close())
@@ -245,6 +263,77 @@ func (c *Client) Peers() []PeerConnection {
245263
return res
246264
}
247265

266+
type disconnectNotifee struct {
267+
c *Client
268+
}
269+
270+
func (n disconnectNotifee) Connected(_ network.Network, conn network.Conn) {
271+
p := conn.RemotePeer()
272+
for _, sp := range n.c.seedPeers {
273+
if sp.ID == p {
274+
n.c.logger.Info().Str("peer", p.String()).Msg("connected to seed peer")
275+
return
276+
}
277+
}
278+
}
279+
func (n disconnectNotifee) OpenedStream(_ network.Network, _ network.Stream) {}
280+
func (n disconnectNotifee) ClosedStream(_ network.Network, _ network.Stream) {}
281+
func (n disconnectNotifee) Listen(_ network.Network, _ multiaddr.Multiaddr) {}
282+
func (n disconnectNotifee) ListenClose(_ network.Network, _ multiaddr.Multiaddr) {}
283+
284+
func (n disconnectNotifee) Disconnected(_ network.Network, conn network.Conn) {
285+
p := conn.RemotePeer()
286+
for _, sp := range n.c.seedPeers {
287+
if sp.ID == p {
288+
n.c.logger.Warn().Str("peer", p.String()).Msg("disconnected from seed peer, scheduling reconnect")
289+
go n.c.reconnectSeedPeer(sp)
290+
return
291+
}
292+
}
293+
}
294+
295+
func (c *Client) reconnectSeedPeer(sp peer.AddrInfo) {
296+
backoff := seedReconnectBackoff
297+
for {
298+
if c.ctx.Err() != nil {
299+
return
300+
}
301+
if c.isConnected(sp.ID) {
302+
return
303+
}
304+
305+
err := c.host.Connect(c.ctx, sp)
306+
if err == nil {
307+
c.logger.Info().Str("peer", sp.ID.String()).Msg("reconnected to seed peer")
308+
return
309+
}
310+
if c.ctx.Err() != nil {
311+
return
312+
}
313+
314+
c.logger.Debug().Str("peer", sp.ID.String()).Dur("backoff", backoff).Err(err).Msg("failed to reconnect to seed peer, retrying")
315+
select {
316+
case <-c.ctx.Done():
317+
return
318+
case <-time.After(backoff):
319+
}
320+
321+
backoff *= 2
322+
if backoff > seedReconnectMaxBackoff {
323+
backoff = seedReconnectMaxBackoff
324+
}
325+
}
326+
}
327+
328+
func (c *Client) newDisconnectNotifee() disconnectNotifee {
329+
return disconnectNotifee{c: c}
330+
}
331+
332+
// isConnected returns true if there is an active connection to the given peer.
333+
func (c *Client) isConnected(id peer.ID) bool {
334+
return c.host.Network().Connectedness(id) == network.Connected
335+
}
336+
248337
func (c *Client) listen() (host.Host, error) {
249338
maddr, err := multiaddr.NewMultiaddr(c.conf.ListenAddress)
250339
if err != nil {
@@ -256,6 +345,7 @@ func (c *Client) listen() (host.Host, error) {
256345

257346
func (c *Client) setupDHT(ctx context.Context) error {
258347
peers := c.parseAddrInfoList(c.conf.Peers)
348+
c.seedPeers = peers
259349
if len(peers) == 0 {
260350
c.logger.Info().Msg("no peers - only listening for connections")
261351
}

pkg/p2p/client_test.go

Lines changed: 98 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -278,6 +278,104 @@ func waitForCondition(timeout time.Duration, conditionFunc func() bool) error {
278278
}
279279
}
280280

281+
func TestSeedPeerReconnect(t *testing.T) {
282+
require := require.New(t)
283+
assert := assert.New(t)
284+
logger := zerolog.Nop()
285+
286+
mn := mocknet.New()
287+
defer mn.Close()
288+
289+
seedKey, err := key.GenerateNodeKey()
290+
require.NoError(err)
291+
seedAddr, err := getAddr(seedKey.PrivKey)
292+
require.NoError(err)
293+
seedHost, err := mn.AddPeer(seedKey.PrivKey, seedAddr)
294+
require.NoError(err)
295+
296+
clientKey, err := key.GenerateNodeKey()
297+
require.NoError(err)
298+
clientAddr, err := getAddr(clientKey.PrivKey)
299+
require.NoError(err)
300+
clientHost, err := mn.AddPeer(clientKey.PrivKey, clientAddr)
301+
require.NoError(err)
302+
303+
seedAddrStr := seedHost.Addrs()[0].String() + "/p2p/" + seedHost.ID().String()
304+
conf := config.P2PConfig{Peers: seedAddrStr}
305+
306+
client, err := NewClient(conf, clientKey.PrivKey, dssync.MutexWrap(datastore.NewMapDatastore()), "test-chain", logger, NopMetrics())
307+
require.NoError(err)
308+
require.NotNil(client)
309+
310+
err = mn.LinkAll()
311+
require.NoError(err)
312+
err = mn.ConnectAllButSelf()
313+
require.NoError(err)
314+
315+
ctx := t.Context()
316+
err = client.startWithHost(ctx, clientHost)
317+
require.NoError(err)
318+
defer client.Close()
319+
320+
err = waitForCondition(2*time.Second, func() bool {
321+
return client.isConnected(seedHost.ID())
322+
})
323+
require.NoError(err, "client should connect to seed peer on start")
324+
325+
conns := client.host.Network().ConnsToPeer(seedHost.ID())
326+
for _, conn := range conns {
327+
conn.Close()
328+
}
329+
client.host.Network().ClosePeer(seedHost.ID())
330+
331+
assert.False(client.isConnected(seedHost.ID()), "seed peer should be disconnected")
332+
333+
err = waitForCondition(5*time.Second, func() bool {
334+
return client.isConnected(seedHost.ID())
335+
})
336+
require.NoError(err, "client should reconnect to seed peer after disconnect")
337+
}
338+
339+
func TestSeedPeerReconnectStopsOnClose(t *testing.T) {
340+
require := require.New(t)
341+
342+
mn := mocknet.New()
343+
defer mn.Close()
344+
345+
seedKey, err := key.GenerateNodeKey()
346+
require.NoError(err)
347+
seedAddr, err := getAddr(seedKey.PrivKey)
348+
require.NoError(err)
349+
seedHost, err := mn.AddPeer(seedKey.PrivKey, seedAddr)
350+
require.NoError(err)
351+
352+
clientKey, err := key.GenerateNodeKey()
353+
require.NoError(err)
354+
clientAddr, err := getAddr(clientKey.PrivKey)
355+
require.NoError(err)
356+
clientHost, err := mn.AddPeer(clientKey.PrivKey, clientAddr)
357+
require.NoError(err)
358+
359+
seedAddrStr := seedHost.Addrs()[0].String() + "/p2p/" + seedHost.ID().String()
360+
conf := config.P2PConfig{Peers: seedAddrStr}
361+
362+
client, err := NewClient(conf, clientKey.PrivKey, dssync.MutexWrap(datastore.NewMapDatastore()), "test-chain", zerolog.Nop(), NopMetrics())
363+
require.NoError(err)
364+
365+
err = mn.LinkAll()
366+
require.NoError(err)
367+
err = mn.ConnectAllButSelf()
368+
require.NoError(err)
369+
370+
ctx := t.Context()
371+
err = client.startWithHost(ctx, clientHost)
372+
require.NoError(err)
373+
374+
require.NoError(client.Close())
375+
376+
require.Error(client.ctx.Err(), "client context should be cancelled after Close")
377+
}
378+
281379
func TestClientInfoMethods(t *testing.T) {
282380
require := require.New(t)
283381
assert := assert.New(t)

pkg/signer/aws/signer.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -159,7 +159,7 @@ func (s *KmsSigner) Sign(ctx context.Context, message []byte) ([]byte, error) {
159159
timeout := s.opts.timeout()
160160
maxAttempts := maxRetries + 1
161161

162-
for attempt := 0; attempt < maxAttempts; attempt++ {
162+
for attempt := range maxAttempts {
163163
if err := ctx.Err(); err != nil {
164164
return nil, err
165165
}

pkg/signer/gcp/signer.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -189,7 +189,7 @@ func (s *KmsSigner) Sign(ctx context.Context, message []byte) ([]byte, error) {
189189
timeout := s.opts.timeout()
190190
maxAttempts := maxRetries + 1
191191

192-
for attempt := 0; attempt < maxAttempts; attempt++ {
192+
for attempt := range maxAttempts {
193193
if err := ctx.Err(); err != nil {
194194
return nil, err
195195
}

0 commit comments

Comments
 (0)