Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 6 additions & 4 deletions pkg/sharky/slots.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,11 +40,13 @@ func (sl *slots) load() (err error) {
return err
}

// save persists the free slot bitvector on disk (without closing)
// save persists the free slot bitvector on disk (without closing).
// slots only ever grow (extend is the only mutation), so sl.data is always >=
// the previous file size. Seeking to 0 and overwriting is therefore always
// safe: no stale tail bytes can survive. Truncate(0) is intentionally absent
// because truncating before the write creates a crash window where the file is
// empty; removing it eliminates that vulnerability.
func (sl *slots) save() error {
if err := sl.file.Truncate(0); err != nil {
return err
}
if _, err := sl.file.Seek(0, 0); err != nil {
return err
}
Expand Down
41 changes: 41 additions & 0 deletions pkg/storer/internal/reserve/reserve.go
Original file line number Diff line number Diff line change
Expand Up @@ -495,6 +495,47 @@ func RemoveChunkMetaData(
)
}

// DeleteCorruptedChunkMetadata removes all reserve index entries for a chunk
// whose Sharky data was found to be corrupted during recovery. It is intended
// to be called from the recovery path, where only a storage.IndexStore (not a
// full transaction.Store) is available. If the chunk has no reserve metadata
// (e.g. it belongs to the upload store or cache), the function is a no-op.
// DeleteCorruptedChunkMetadata removes every reserve index entry kept for a
// chunk whose Sharky payload was found to be corrupted during recovery. It is
// meant for the recovery path, where only a storage.IndexStore (not a full
// transaction.Store) is available. When no reserve metadata exists for the
// chunk (e.g. it lives in the upload store or the cache), nothing is done and
// nil is returned.
func DeleteCorruptedChunkMetadata(store storage.IndexStore, baseAddr swarm.Address, addr swarm.Address) error {
	// The chunk stamp is the entry point to the reserve bookkeeping; its
	// absence means the chunk never entered the reserve.
	stamp, err := chunkstamp.Load(store, reserveScope, addr)
	switch {
	case errors.Is(err, storage.ErrNotFound):
		return nil
	case err != nil:
		return fmt.Errorf("load chunkstamp: %w", err)
	}

	hash, err := stamp.Hash()
	if err != nil {
		return fmt.Errorf("compute stamp hash: %w", err)
	}

	item := &BatchRadiusItem{
		Bin:       swarm.Proximity(baseAddr.Bytes(), addr.Bytes()),
		BatchID:   stamp.BatchID(),
		Address:   addr,
		StampHash: hash,
	}
	switch err = store.Get(item); {
	case errors.Is(err, storage.ErrNotFound):
		// No radius entry: nothing else to clean up for this chunk.
		return nil
	case err != nil:
		return fmt.Errorf("get batch radius item: %w", err)
	}

	// Delete all four index entries together; errors.Join reports every
	// failure rather than stopping at the first one.
	return errors.Join(
		stampindex.Delete(store, reserveScope, stamp),
		chunkstamp.DeleteWithStamp(store, reserveScope, addr, stamp),
		store.Delete(item),
		store.Delete(&ChunkBinItem{Bin: item.Bin, BinID: item.BinID}),
	)
}

func (r *Reserve) IterateBin(bin uint8, startBinID uint64, cb func(swarm.Address, uint64, []byte, []byte) (bool, error)) error {
err := r.st.IndexStore().Iterate(storage.Query{
Factory: func() storage.Item { return &ChunkBinItem{} },
Expand Down
9 changes: 9 additions & 0 deletions pkg/storer/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ type metrics struct {
ReserveSampleDuration *prometheus.HistogramVec
ReserveSampleRunSummary *prometheus.GaugeVec
ReserveSampleLastRunTimestamp prometheus.Gauge
RecoveryPrunedChunkCount prometheus.Counter
}

// newMetrics is a convenient constructor for creating new metrics.
Expand Down Expand Up @@ -192,6 +193,14 @@ func newMetrics() metrics {
Help: "Unix timestamp of the last ReserveSample run completion.",
},
),
RecoveryPrunedChunkCount: prometheus.NewCounter(
prometheus.CounterOpts{
Namespace: m.Namespace,
Subsystem: subsystem,
Name: "recovery_pruned_chunk_count",
Help: "Number of corrupted chunks pruned from the index during sharky recovery.",
},
),
}
}

Expand Down
79 changes: 63 additions & 16 deletions pkg/storer/recover.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,22 +7,27 @@ package storer
import (
"context"
"errors"
"fmt"
"io/fs"
"os"
"path/filepath"
"time"

"github.com/ethersphere/bee/v2/pkg/cac"
"github.com/ethersphere/bee/v2/pkg/log"
"github.com/ethersphere/bee/v2/pkg/sharky"
"github.com/ethersphere/bee/v2/pkg/soc"
storage "github.com/ethersphere/bee/v2/pkg/storage"
"github.com/ethersphere/bee/v2/pkg/storer/internal/chunkstore"
"github.com/ethersphere/bee/v2/pkg/storer/internal/reserve"
"github.com/ethersphere/bee/v2/pkg/swarm"
)

const (
sharkyDirtyFileName = ".DIRTY"
)

func sharkyRecovery(ctx context.Context, sharkyBasePath string, store storage.Store, opts *Options) (closerFn, error) {
func sharkyRecovery(ctx context.Context, sharkyBasePath string, store storage.Store, opts *Options) (closerFn, int, error) {
ctx, cancel := context.WithCancel(ctx)
defer cancel()

Expand All @@ -32,7 +37,7 @@ func sharkyRecovery(ctx context.Context, sharkyBasePath string, store storage.St
closer := func() error { return os.Remove(dirtyFilePath) }

if _, err := os.Stat(dirtyFilePath); errors.Is(err, fs.ErrNotExist) {
return closer, os.WriteFile(dirtyFilePath, []byte{}, 0644)
return closer, 0, os.WriteFile(dirtyFilePath, []byte{}, 0644)
}

logger.Info("localstore sharky .DIRTY file exists: starting recovery due to previous dirty exit")
Expand All @@ -42,7 +47,7 @@ func sharkyRecovery(ctx context.Context, sharkyBasePath string, store storage.St

sharkyRecover, err := sharky.NewRecovery(sharkyBasePath, sharkyNoOfShards, swarm.SocMaxChunkSize)
if err != nil {
return closer, err
return closer, 0, err
}

defer func() {
Expand All @@ -51,29 +56,71 @@ func sharkyRecovery(ctx context.Context, sharkyBasePath string, store storage.St
}
}()

c := chunkstore.IterateLocations(ctx, store)

if err := addLocations(c, sharkyRecover); err != nil {
return closer, err
pruned, err := validateAndAddLocations(ctx, store, sharkyRecover, opts.Address, logger)
if err != nil {
return closer, 0, err
}

return closer, nil
return closer, pruned, nil
}

func addLocations(locationResultC <-chan chunkstore.LocationResult, sharkyRecover *sharky.Recovery) error {
for res := range locationResultC {
if res.Err != nil {
return res.Err
// validateAndAddLocations iterates every chunk index entry, reads its data from
// Sharky, and validates the content hash. Valid chunks are registered with the
// recovery so their slots are preserved. Corrupted entries (unreadable data or
// hash mismatch) are logged, excluded from the recovery bitmap, and deleted from
// the index store — including all associated reserve metadata (BatchRadiusItem,
// ChunkBinItem, stampindex, chunkstamp) — so the node starts clean without
// serving invalid data and with correct reserve size accounting.
// If a corrupted index entry cannot be deleted, an error is returned and the
// node startup is aborted to prevent serving or operating on corrupt state.
// It returns the number of corrupted entries that were pruned.
func validateAndAddLocations(ctx context.Context, store storage.Store, sharkyRecover *sharky.Recovery, baseAddr swarm.Address, logger log.Logger) (int, error) {
var corrupted []*chunkstore.RetrievalIndexItem

buf := make([]byte, swarm.SocMaxChunkSize)

err := chunkstore.IterateItems(store, func(item *chunkstore.RetrievalIndexItem) error {
select {
case <-ctx.Done():
return ctx.Err()
default:
}

if err := sharkyRecover.Read(ctx, item.Location, buf[:item.Location.Length]); err != nil {
Comment thread
martinconic marked this conversation as resolved.
logger.Warning("recovery: unreadable chunk, marking corrupted", "address", item.Address, "err", err)
corrupted = append(corrupted, item)
return nil
}

if err := sharkyRecover.Add(res.Location); err != nil {
return err
ch := swarm.NewChunk(item.Address, buf[:item.Location.Length])
if !cac.Valid(ch) && !soc.Valid(ch) {
logger.Warning("recovery: invalid chunk hash, marking corrupted", "address", item.Address)
corrupted = append(corrupted, item)
return nil
}

return sharkyRecover.Add(item.Location)
})
if err != nil {
return 0, fmt.Errorf("recovery: failed iterating chunk index: %w", err)
}

if err := sharkyRecover.Save(); err != nil {
return err
return 0, fmt.Errorf("recovery: failed saving sharky recovery state: %w", err)
}

for _, item := range corrupted {
if err := reserve.DeleteCorruptedChunkMetadata(store, baseAddr, item.Address); err != nil {
return 0, fmt.Errorf("recovery: failed deleting corrupted chunk metadata %s: %w", item.Address, err)
}
if err := store.Delete(item); err != nil {
return 0, fmt.Errorf("recovery: failed deleting corrupted chunk index %s: %w", item.Address, err)
}
}

if len(corrupted) > 0 {
Comment thread
martinconic marked this conversation as resolved.
logger.Warning("recovery: removed corrupted chunk index entries", "count", len(corrupted))
}

return nil
return len(corrupted), nil
}
Loading
Loading