Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
ad48778
feat: rpc calls optimisation
sbackend123 Mar 12, 2026
1da63cc
Merge branch 'master' into feat/rpc-calls-optimisation
sbackend123 Mar 12, 2026
68ecefb
fix: linter issues
sbackend123 Mar 12, 2026
04f6520
Merge branch 'master' into feat/rpc-calls-optimisation
sbackend123 Mar 13, 2026
1ff4f75
fix: enable cache metrics
sbackend123 Mar 16, 2026
095f4af
fix: review issues
sbackend123 Mar 17, 2026
e3b1aa5
fix: dead code
sbackend123 Mar 18, 2026
f8e3049
fix: update cache implementation
sbackend123 Mar 25, 2026
e6945ef
fix: wrapped backend test
sbackend123 Mar 25, 2026
a9fd888
fix: make linter happy
sbackend123 Mar 30, 2026
bed886c
chore: calculate blocks and sync with chain once in N seconds (#5422)
sbackend123 Apr 6, 2026
acabf46
Merge branch 'master' into feat/rpc-calls-optimisation
sbackend123 Apr 6, 2026
a1e3f83
fix: block sync interval flag
sbackend123 Apr 6, 2026
bf4a030
fix: review issues
sbackend123 Apr 8, 2026
54ee6af
fix: return dep
sbackend123 Apr 8, 2026
ba08feb
fix: remove extra metric test
sbackend123 Apr 8, 2026
2a11aa6
fix: simplify cache
sbackend123 Apr 12, 2026
e4befcf
fix: remove comments for linter
sbackend123 Apr 12, 2026
2e40f56
fix: metric naming
sbackend123 Apr 12, 2026
c7bd723
fix: remove expiresAt because there is no meaningful usage
sbackend123 Apr 14, 2026
76a6e1d
fix: prevent dividing by zero
sbackend123 Apr 14, 2026
ba744e9
fix: clean up
sbackend123 Apr 14, 2026
a858f42
fix: add env vars
sbackend123 Apr 15, 2026
433f3ff
fix: add new flag to all manifests
sbackend123 Apr 15, 2026
083da1f
Merge branch 'master' into feat/rpc-calls-optimisation
sbackend123 Apr 15, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions cmd/bee/cmd/cmd.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ const (
optionNameRedistributionAddress = "redistribution-address"
optionNameStakingAddress = "staking-address"
optionNameBlockTime = "block-time"
optionNameBlockSyncInterval = "block-sync-interval"
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please update the packaging folder with the new flag (in docker-compose,...)

optionWarmUpTime = "warmup-time"
optionNameMainNet = "mainnet"
optionNameRetrievalCaching = "cache-retrieval"
Expand Down Expand Up @@ -312,6 +313,7 @@ func (c *command) setAllFlags(cmd *cobra.Command) {
cmd.Flags().String(optionNameRedistributionAddress, "", "redistribution contract address")
cmd.Flags().String(optionNameStakingAddress, "", "staking contract address")
cmd.Flags().Uint64(optionNameBlockTime, 5, "chain block time")
cmd.Flags().Uint64(optionNameBlockSyncInterval, 10, "block number cache sync interval in blocks")
cmd.Flags().Duration(optionWarmUpTime, time.Minute*5, "maximum node warmup duration; proceeds when stable or after this time")
cmd.Flags().Bool(optionNameMainNet, true, "triggers connect to main net bootnodes.")
cmd.Flags().Bool(optionNameRetrievalCaching, true, "enable forwarded content caching")
Expand Down
1 change: 1 addition & 0 deletions cmd/bee/cmd/deploy.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ func (c *command) initDeployCmd() error {
IdleTimeout: c.config.GetDuration(configKeyBlockchainRpcIdleTimeout),
Keepalive: c.config.GetDuration(configKeyBlockchainRpcKeepalive),
},
c.config.GetUint64(optionNameBlockSyncInterval),
)
if err != nil {
return err
Expand Down
1 change: 1 addition & 0 deletions cmd/bee/cmd/start.go
Original file line number Diff line number Diff line change
Expand Up @@ -280,6 +280,7 @@ func buildBeeNode(ctx context.Context, c *command, cmd *cobra.Command, logger lo
BlockchainRpcKeepalive: c.config.GetDuration(configKeyBlockchainRpcKeepalive),
BlockProfile: c.config.GetBool(optionNamePProfBlock),
BlockTime: networkConfig.blockTime,
BlockSyncInterval: c.config.GetUint64(optionNameBlockSyncInterval),
BootnodeMode: bootNode,
Bootnodes: networkConfig.bootNodes,
CacheCapacity: c.config.GetUint64(optionNameCacheCapacity),
Expand Down
2 changes: 2 additions & 0 deletions packaging/bee.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@
# api-addr: 127.0.0.1:1633
## chain block time
# block-time: "5"
## block number cache sync interval in blocks
# block-sync-interval: 10
## blockchain rpc configuration
# blockchain-rpc:
# endpoint: ""
Expand Down
1 change: 1 addition & 0 deletions packaging/docker/docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ services:
- BEE_AUTOTLS_DOMAIN
- BEE_AUTOTLS_REGISTRATION_ENDPOINT
- BEE_BLOCK_TIME
- BEE_BLOCK_SYNC_INTERVAL
- BEE_BLOCKCHAIN_RPC_DIAL_TIMEOUT
- BEE_BLOCKCHAIN_RPC_ENDPOINT
- BEE_BLOCKCHAIN_RPC_IDLE_TIMEOUT
Expand Down
2 changes: 2 additions & 0 deletions packaging/docker/env
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
# BEE_AUTOTLS_REGISTRATION_ENDPOINT=
## chain block time (default 5)
# BEE_BLOCK_TIME=5
## sets how many estimated blocks Bee can trust local block-number extrapolation before forcing a fresh HeaderByNumber(nil) RPC sync (default 10)
# BEE_BLOCK_SYNC_INTERVAL=10
## blockchain rpc TCP dial timeout (default 30s)
# BEE_BLOCKCHAIN_RPC_DIAL_TIMEOUT=30s
## rpc blockchain endpoint (default empty)
Expand Down
2 changes: 2 additions & 0 deletions packaging/homebrew-amd64/bee.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@
# api-addr: 127.0.0.1:1633
## chain block time
# block-time: "5"
## block number cache sync interval in blocks
# block-sync-interval: 10
## blockchain rpc configuration
# blockchain-rpc:
# endpoint: ""
Expand Down
2 changes: 2 additions & 0 deletions packaging/homebrew-arm64/bee.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@
# api-addr: 127.0.0.1:1633
## chain block time
# block-time: "5"
## block number cache sync interval in blocks
# block-sync-interval: 10
## blockchain rpc configuration
# blockchain-rpc:
# endpoint: ""
Expand Down
2 changes: 2 additions & 0 deletions packaging/scoop/bee.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@
# api-addr: 127.0.0.1:1633
## chain block time
# block-time: "5"
## block number cache sync interval in blocks
# block-sync-interval: 10
## blockchain rpc configuration
# blockchain-rpc:
# endpoint: ""
Expand Down
3 changes: 2 additions & 1 deletion pkg/node/chain.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ func InitChain(
minimumGasTipCap uint64,
fallbackGasLimit uint64,
rpcCfg BlockchainRPCConfig,
blockSyncInterval uint64,
) (transaction.Backend, common.Address, int64, transaction.Monitor, transaction.Service, error) {
backend := backendnoop.New(chainID)

Expand Down Expand Up @@ -97,7 +98,7 @@ func InitChain(

logger.Info("connected to blockchain backend", "version", versionString)

backend = wrapped.NewBackend(ethclient.NewClient(rpcClient), minimumGasTipCap)
backend = wrapped.NewBackend(ethclient.NewClient(rpcClient), minimumGasTipCap, pollingInterval, blockSyncInterval)
}

backendChainID, err := backend.ChainID(ctx)
Expand Down
2 changes: 2 additions & 0 deletions pkg/node/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,7 @@ type Options struct {
BlockchainRpcKeepalive time.Duration
BlockProfile bool
BlockTime time.Duration
BlockSyncInterval uint64
BootnodeMode bool
Bootnodes []string
CacheCapacity uint64
Expand Down Expand Up @@ -424,6 +425,7 @@ func NewBee(
IdleTimeout: o.BlockchainRpcIdleTimeout,
Keepalive: o.BlockchainRpcKeepalive,
},
o.BlockSyncInterval,
)
if err != nil {
return nil, fmt.Errorf("init chain: %w", err)
Expand Down
2 changes: 1 addition & 1 deletion pkg/storer/reserve_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -540,7 +540,7 @@ func TestSubscribeBin(t *testing.T) {
t.Helper()
var (
chunksPerPO uint64 = 50
chunks = make([]swarm.Chunk, 0, int(chunksPerPO)*2)
chunks = make([]swarm.Chunk, 0, chunksPerPO*2)
putter = storer.ReservePutter()
)

Expand Down
25 changes: 12 additions & 13 deletions pkg/transaction/transaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -164,9 +164,9 @@ func (t *transactionService) waitForAllPendingTx() error {
return err
}

pendingTxs = t.filterPendingTransactions(t.ctx, pendingTxs)
pending := t.filterPendingTransactions(t.ctx, pendingTxs)

for _, txHash := range pendingTxs {
for txHash := range pending {
t.waitForPendingTx(txHash)
}

Expand Down Expand Up @@ -392,18 +392,16 @@ func (t *transactionService) nextNonce(ctx context.Context) (uint64, error) {
return 0, err
}

pendingTxs = t.filterPendingTransactions(t.ctx, pendingTxs)
pending := t.filterPendingTransactions(t.ctx, pendingTxs)

// PendingNonceAt returns the nonce we should use, but we will
// compare this to our pending tx list, therefore the -1.
maxNonce := onchainNonce - 1
for _, txHash := range pendingTxs {
trx, _, err := t.backend.TransactionByHash(ctx, txHash)
if err != nil {
t.logger.Error(err, "pending transaction not found", "tx", txHash)
return 0, err
for txHash, trx := range pending {
if trx == nil {
t.logger.Warning("pending transaction data unavailable, relying on onchain nonce", "tx", txHash)
continue
}

maxNonce = max(maxNonce, trx.Nonce())
}

Expand Down Expand Up @@ -457,11 +455,12 @@ func (t *transactionService) PendingTransactions() ([]common.Hash, error) {

// filterPendingTransactions will filter supplied transaction hashes removing those that are not pending anymore.
// Removed transactions will be also removed from store.
func (t *transactionService) filterPendingTransactions(ctx context.Context, txHashes []common.Hash) []common.Hash {
result := make([]common.Hash, 0, len(txHashes))
// Returns the pending transactions keyed by hash.
func (t *transactionService) filterPendingTransactions(ctx context.Context, txHashes []common.Hash) map[common.Hash]*types.Transaction {
result := make(map[common.Hash]*types.Transaction, len(txHashes))

for _, txHash := range txHashes {
_, isPending, err := t.backend.TransactionByHash(ctx, txHash)
trx, isPending, err := t.backend.TransactionByHash(ctx, txHash)
// When an error occurs consider the transaction as pending (so this transaction won't be filtered out),
// unless it was not found
if err != nil {
Expand All @@ -475,7 +474,7 @@ func (t *transactionService) filterPendingTransactions(ctx context.Context, txHa
}

if isPending {
result = append(result, txHash)
result[txHash] = trx
} else {
err := t.store.Delete(pendingTransactionKey(txHash))
if err != nil {
Expand Down
84 changes: 84 additions & 0 deletions pkg/transaction/wrapped/cache/cache.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
// Copyright 2026 The Swarm Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.

package cache

import (
"context"
"sync"

"github.com/prometheus/client_golang/prometheus"
"resenje.org/singleflight"
)

// Loader produces a fresh value of type T; it is invoked when the cached
// value cannot be reused.
type Loader[T any] func() (T, error)

// ReuseEvaluator reports whether the supplied cached value is still valid
// and may be returned without reloading.
type ReuseEvaluator[T any] func(value T) bool

// SingleFlightCache stores a single cached value of type T and collapses
// concurrent reloads into one loader call via a singleflight group.
// The zero value is not usable; construct instances with NewSingleFlightCache.
type SingleFlightCache[T any] struct {
	mu    sync.RWMutex // guards value
	value T            // most recently stored value (see Set / PeekOrLoad)

	group   singleflight.Group[string, any] // deduplicates concurrent loads
	key     string                          // singleflight key; also used as the metrics prefix
	metrics metricSet                       // hit/miss/load counters exposed via Collectors
}

// NewSingleFlightCache returns a cache whose singleflight key and metric
// names are both derived from metricsPrefix.
func NewSingleFlightCache[T any](metricsPrefix string) *SingleFlightCache[T] {
	cache := &SingleFlightCache[T]{
		key:     metricsPrefix,
		metrics: newMetricSet(metricsPrefix),
	}
	return cache
}

// Collectors returns the prometheus collectors maintained by the cache so
// that callers can register them with a metrics registry.
func (c *SingleFlightCache[T]) Collectors() []prometheus.Collector {
	collectors := make([]prometheus.Collector, 0, 5)
	collectors = append(collectors,
		c.metrics.Hits,
		c.metrics.Misses,
		c.metrics.Loads,
		c.metrics.SharedLoads,
		c.metrics.LoadErrors,
	)
	return collectors
}

// Set stores value as the current cached value, replacing any previous one.
func (c *SingleFlightCache[T]) Set(value T) {
	c.mu.Lock()
	c.value = value
	c.mu.Unlock()
}

// PeekOrLoad returns the cached value when canReuse accepts it (a hit).
// Otherwise it loads a fresh value through loader, deduplicating concurrent
// loads with the singleflight group: callers that join an in-flight load
// share its result. On a successful load the cache is updated via Set;
// on a loader error the zero value of T is returned together with the error.
func (c *SingleFlightCache[T]) PeekOrLoad(ctx context.Context, canReuse ReuseEvaluator[T], loader Loader[T]) (T, error) {
	c.mu.RLock()
	cached := c.value
	c.mu.RUnlock()

	if canReuse(cached) {
		c.metrics.Hits.Inc()
		return cached, nil
	}
	// Counted per caller: every caller that misses increments Misses, even
	// when several of them share a single in-flight load.
	c.metrics.Misses.Inc()

	loaded, shared, err := c.group.Do(ctx, c.key, func(ctx context.Context) (any, error) {
		c.metrics.Loads.Inc()
		fresh, loadErr := loader()
		if loadErr != nil {
			c.metrics.LoadErrors.Inc()
			return fresh, loadErr
		}
		c.Set(fresh)
		return fresh, nil
	})

	if shared {
		c.metrics.SharedLoads.Inc()
	}

	if err != nil {
		var zero T
		return zero, err
	}

	return loaded.(T), nil
}
Loading
Loading