Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
63 changes: 63 additions & 0 deletions filtermanager/filter_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,12 @@ import (

"go.uber.org/zap"

"github.com/ozontech/seq-db/cache"
"github.com/ozontech/seq-db/frac"
"github.com/ozontech/seq-db/frac/processor"
"github.com/ozontech/seq-db/fracmanager"
"github.com/ozontech/seq-db/logger"
"github.com/ozontech/seq-db/node"
"github.com/ozontech/seq-db/parser"
"github.com/ozontech/seq-db/seq"
"github.com/ozontech/seq-db/util"
Expand All @@ -32,6 +34,8 @@ const (

const (
	// defaultMaintenanceInterval is the default period between background
	// maintenance runs (assigned to FilterManager.maintenanceInterval).
	defaultMaintenanceInterval = 30 * time.Second
	// defaultCacheCleanInterval is how often cacheCleanLoop cleans and
	// rotates the headers cache.
	defaultCacheCleanInterval = 10 * time.Millisecond
	// defaultCacheGCDelay is how often cacheCleanLoop additionally drops
	// empty cache generations and releases buckets; it is converted into a
	// number of clean runs (cacheGCDelay / cacheCleanInterval).
	defaultCacheGCDelay = 1 * time.Second
)

type MappingProvider interface {
Expand Down Expand Up @@ -60,6 +64,12 @@ type FilterManager struct {
maintenanceWG *sync.WaitGroup
maintenanceInterval time.Duration
maintenanceStop context.CancelFunc

cacheCleanInterval time.Duration
cacheGCDelay time.Duration

headersCache *cache.Cache[[]lidsBlockHeader]
headersCacheCleaner *cache.Cleaner
}

func New(
Expand All @@ -80,6 +90,8 @@ func New(
filtersMap[f.Hash()] = f
}

cacheCleaner := cache.NewCleaner(cfg.CacheSizeLimit, nil)

return &FilterManager{
ctx: ctx,
config: cfg,
Expand All @@ -89,6 +101,10 @@ func New(
mp: mp,
rateLimit: make(chan struct{}, workers),
maintenanceInterval: defaultMaintenanceInterval,
cacheCleanInterval: defaultCacheCleanInterval,
cacheGCDelay: defaultCacheGCDelay,
headersCache: cache.NewCache[[]lidsBlockHeader](cacheCleaner, nil),
headersCacheCleaner: cacheCleaner,
}
}

Expand All @@ -109,6 +125,8 @@ func (fm *FilterManager) Start(ctx context.Context, fracs fracmanager.List) {
fm.maintenanceStop = cancel
fm.startMaintenance(ctx)

go fm.cacheCleanLoop()

mapping := fm.mp.GetMapping()

go func() {
Expand All @@ -129,6 +147,32 @@ func (fm *FilterManager) Stop() {
fm.maintenanceWG.Wait()
}

// GetHideFlagIteratorByFrac returns a merged iterator over the hide-flag LIDs
// of every filter file registered for fraction fracName, restricted to the
// [minLID, maxLID] range. The iteration direction is selected by reverse.
// An unknown fraction yields an empty iterator and no error.
func (fm *FilterManager) GetHideFlagIteratorByFrac(fracName string, minLID, maxLID uint32, reverse bool) (node.Node, error) {
	fm.fracsMu.RLock()
	defer fm.fracsMu.RUnlock()

	fracFiles, has := fm.fracs[fracName]
	if !has {
		return &EmptyIterator{}, nil
	}

	iterators := make([]node.Node, 0, len(fracFiles))
	opened := make([]*loader, 0, len(fracFiles))
	for _, f := range fracFiles {
		// note: renamed from "loader" to avoid shadowing the loader type.
		ld, err := newLoader(f, fm.headersCache)
		if err != nil {
			logger.Error("can't open filtered lids file", zap.String("path", f), zap.Error(err))
			// Release loaders opened so far: their iterators are never
			// returned to the caller, so nothing else would close them.
			for _, o := range opened {
				if relErr := o.release(); relErr != nil {
					logger.Error("error closing loader", zap.Error(relErr))
				}
			}
			return nil, err
		}
		opened = append(opened, ld)
		if reverse {
			iterators = append(iterators, (*IteratorAsc)(NewIterator(ld, minLID, maxLID)))
		} else {
			iterators = append(iterators, (*IteratorDesc)(NewIterator(ld, minLID, maxLID)))
		}
	}

	return NewNMergedIterators(iterators), nil
}

// RefreshFrac replaces frac's filter files with newly found results. Used after active frac is sealed.
func (fm *FilterManager) RefreshFrac(fraction frac.Fraction) {
fm.fracsMu.RLock()
Expand Down Expand Up @@ -372,6 +416,25 @@ func (fm *FilterManager) startMaintenance(ctx context.Context) {
})
}

// cacheCleanLoop periodically cleans and rotates the headers cache every
// cacheCleanInterval, and every cacheGCDelay additionally drops empty cache
// generations and releases unused buckets. The loop exits when the manager's
// context is cancelled (the original version looped forever, leaking the
// goroutine started in Start).
func (fm *FilterManager) cacheCleanLoop() {
	gcRunsCount := int(fm.cacheGCDelay / fm.cacheCleanInterval)
	if gcRunsCount < 1 {
		// Degenerate configuration (delay <= interval): GC on every run.
		gcRunsCount = 1
	}

	ticker := time.NewTicker(fm.cacheCleanInterval)
	defer ticker.Stop()

	runs := 0
	for {
		runs++
		fm.headersCacheCleaner.Cleanup(&cache.CleanStat{})
		fm.headersCacheCleaner.Rotate()

		if runs >= gcRunsCount {
			runs = 0
			fm.headersCacheCleaner.CleanEmptyGenerations()
			fm.headersCacheCleaner.ReleaseBuckets()
		}

		select {
		case <-fm.ctx.Done():
			return
		case <-ticker.C:
		}
	}
}

func (fm *FilterManager) checkDiskUsage() {
du := int64(0)

Expand Down
61 changes: 61 additions & 0 deletions filtermanager/iterator.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
package filtermanager

import "sort"

// Iterator holds the shared state for walking the LIDs of one filter file,
// restricted to the [minLID, maxLID] range. Direction-specific traversal is
// provided by wrapper types (e.g. IteratorAsc) that convert to/from this type.
type Iterator struct {
	loader *loader // reads block headers and LID blocks from the filter file

	minLID uint32 // lower bound of the requested range, inclusive
	maxLID uint32 // upper bound of the requested range, inclusive

	blockIndex   int  // index of the next block to load
	tryNextBlock bool // false once no more blocks remain to try

	lids []uint32 // LIDs of the current block that have not been consumed yet
}

// NewIterator creates an Iterator over loader's filter file that yields only
// LIDs within [minLID, maxLID].
func NewIterator(loader *loader, minLID uint32, maxLID uint32) *Iterator {
	it := &Iterator{
		loader:       loader,
		minLID:       minLID,
		maxLID:       maxLID,
		tryNextBlock: true, // there is always at least one block to try first
	}
	return it
}

// hasLIDsInRange reports whether the LID span of the current block (per its
// header) intersects the iterator's [minLID, maxLID] range.
func (it *Iterator) hasLIDsInRange() bool {
	header := it.loader.headers[it.blockIndex]
	return header.MinLID <= it.maxLID && header.MaxLID >= it.minLID
}

// narrowLIDsRange trims lids to the iterator's [minLID, maxLID] range and
// returns the trimmed (sub)slice. It assumes lids is sorted ascending, as
// required by sort.Search.
func (it *Iterator) narrowLIDsRange(lids []uint32) []uint32 {
	if len(lids) == 0 {
		return lids
	}

	first, last := lids[0], lids[len(lids)-1]

	// Drop the prefix of LIDs below minLID, if any.
	if first < it.minLID {
		from := sort.Search(len(lids), func(i int) bool { return lids[i] >= it.minLID })
		lids = lids[from:]
	}

	// Drop the suffix of LIDs above maxLID, if any.
	if last >= it.maxLID {
		to := sort.Search(len(lids), func(i int) bool { return lids[i] > it.maxLID })
		lids = lids[:to]
	}

	return lids
}
72 changes: 72 additions & 0 deletions filtermanager/iterator_asc.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
package filtermanager

import (
"go.uber.org/zap"

"github.com/ozontech/seq-db/logger"
"github.com/ozontech/seq-db/node"
)

type IteratorAsc Iterator

// String implements fmt.Stringer, returning a fixed identifier for this
// iterator node type.
func (it *IteratorAsc) String() string {
	return "HIDE_FLAG_ITERATOR_ASC"
}

// Next returns the next in-range LID, walking blocks from the last header to
// the first and consuming each block's LIDs from the tail backwards. Once the
// iterator is exhausted it releases the underlying loader and returns the
// null LID. Header/block load failures and release failures are fatal
// (logger.Panic).
//
// NOTE(review): calling Next again after exhaustion calls release again —
// confirm loader.release is idempotent.
func (it *IteratorAsc) Next() node.LID {
	// Lazily load the block headers on the first call and start from the
	// last block.
	if it.loader.headers == nil {
		headers, err := it.loader.getHeaders()
		if err != nil {
			logger.Panic("can't load filter file headers", zap.Error(err))
		}
		it.loader.headers = headers
		it.blockIndex = len(it.loader.headers) - 1
	}

	// Refill it.lids from candidate blocks until some in-range LIDs are
	// found or no blocks remain.
	for len(it.lids) == 0 {
		if !it.tryNextBlock {
			// Exhausted: hand the file back before signalling the end.
			if err := it.loader.release(); err != nil {
				logger.Panic("error closing loader", zap.Error(err))
			}
			return node.NullLID()
		}

		it.loadNextLIDsBlock()
		it.lids = (*Iterator)(it).narrowLIDsRange(it.lids)
	}

	// Pop one LID from the tail of the current block.
	i := len(it.lids) - 1
	lid := it.lids[i]
	it.lids = it.lids[:i]
	return node.NewAscLID(lid)
}

// NextGeq advances the iterator until Next returns a LID that is not less
// than nextID, or the null LID once the iterator is exhausted. The explicit
// IsNull guard prevents an infinite loop (with repeated loader releases) in
// case the null LID compares Less than nextID.
//
// TODO: implement NextGeq efficiently using block headers instead of a
// linear scan.
func (it *IteratorAsc) NextGeq(nextID node.LID) node.LID {
	lid := it.Next()
	for !lid.IsNull() && lid.Less(nextID) {
		lid = it.Next()
	}
	return lid
}

// loadNextLIDsBlock loads the block at blockIndex into it.lids — skipping it
// entirely when its header says it cannot contain in-range LIDs — and then
// steps blockIndex down to the previous block.
func (it *IteratorAsc) loadNextLIDsBlock() {
	if (*Iterator)(it).hasLIDsInRange() {
		block, err := it.loader.loadBlock(it.blockIndex)
		if err != nil {
			logger.Panic("error loading LIDs block", zap.Error(err))
		}
		it.lids = block
	}
	it.needTryNextBlock()
}

// needTryNextBlock records whether another block remains (blocks are walked
// from the highest index down to 0) and moves blockIndex to it.
func (it *IteratorAsc) needTryNextBlock() {
	hasMore := it.blockIndex > 0
	it.blockIndex--
	it.tryNextBlock = hasMore
}
72 changes: 72 additions & 0 deletions filtermanager/iterator_asc_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
package filtermanager

import (
"math"
"os"
"path/filepath"
"slices"
"testing"

"github.com/stretchr/testify/require"

"github.com/ozontech/seq-db/cache"
"github.com/ozontech/seq-db/seq"
)

// TestIteratorAsc checks that IteratorAsc yields LIDs in descending order
// within the requested [minLID, maxLID] range, across block boundaries.
func TestIteratorAsc(t *testing.T) {
	// Enough LIDs to span three full blocks plus a partial tail block.
	multipleBlocksSize := maxLIDsBlockLen*3 + 15
	multipleBlocksLIDs := make([]seq.LID, 0, multipleBlocksSize)
	for i := range multipleBlocksSize {
		// LIDs are 1-based here: values 1..multipleBlocksSize.
		multipleBlocksLIDs = append(multipleBlocksLIDs, seq.LID(i+1))
	}

	// IteratorAsc walks blocks last-to-first and each block tail-to-head, so
	// the expected output is the input reversed (descending). LID value v
	// therefore sits at reversed index len-v.
	reversed := make([]uint32, len(multipleBlocksLIDs))
	copy(reversed, lidsToUint32s(multipleBlocksLIDs))
	slices.Reverse(reversed)

	type testCase struct {
		title          string
		minLID, maxLID uint32
		expected       []uint32
	}

	tests := []testCase{
		{
			title:    "ok_without_borders",
			minLID:   0,
			maxLID:   math.MaxUint32,
			expected: reversed,
		},
		{
			// Range straddling block boundaries. With len-v indexing, the
			// inclusive [minLID, maxLID] range maps to
			// reversed[len-maxLID : len-minLID+1], hence the 11/10 asymmetry.
			title:    "ok_with_borders",
			minLID:   maxLIDsBlockLen + 11,
			maxLID:   uint32(len(multipleBlocksLIDs) - (maxLIDsBlockLen + 11)),
			expected: reversed[maxLIDsBlockLen+11 : len(multipleBlocksLIDs)-(maxLIDsBlockLen+10)],
		},
		{
			// Range entirely above the stored LIDs: nothing is yielded.
			title:    "ok_out_of_borders",
			minLID:   uint32(len(multipleBlocksLIDs) + 100),
			maxLID:   uint32(len(multipleBlocksLIDs) + 200),
			expected: []uint32{},
		},
	}

	for _, tc := range tests {
		t.Run(tc.title, func(t *testing.T) {
			// Serialize the LIDs into an on-disk filter file and open a
			// loader over it (with a cache that has no cleaner attached).
			rawDocsFilter := marshalDocsFilter(nil, &DocsFilterBinIn{LIDs: multipleBlocksLIDs})
			filePath := filepath.Join(t.TempDir(), "some.filter")
			err := os.WriteFile(filePath, rawDocsFilter, 0o644)
			require.NoError(t, err)

			loader, err := newLoader(filePath, cache.NewCache[[]lidsBlockHeader](nil, nil))
			require.NoError(t, err)

			// Drain the iterator and compare against the expected slice.
			iterator := (*IteratorAsc)(NewIterator(loader, tc.minLID, tc.maxLID))
			resLIDs := make([]uint32, 0, len(tc.expected))
			for lid := iterator.Next(); !lid.IsNull(); lid = iterator.Next() {
				resLIDs = append(resLIDs, lid.Unpack())
			}
			require.Equal(t, tc.expected, resLIDs)
		})
	}
}
Loading
Loading