From 72b0954b9a1946a04daeee4a28471efc86778acc Mon Sep 17 00:00:00 2001 From: Gabe Ruttner Date: Sat, 16 May 2026 07:43:52 -0400 Subject: [PATCH] vibe: fun with partitions --- .gitignore | 1 + README.md | 24 +- internal/partitions/manager.go | 263 ++++++++++ internal/partitions/names.go | 44 ++ internal/sizing/sizing.go | 118 +++++ outbox.go | 337 +++++++++++-- outbox_bench_test.go | 85 +++- outbox_e2e_test.go | 292 ++++++++++- sqlc/copyfrom.go | 3 +- .../20260516120000_partitioned_outbox.sql | 233 +++++++++ sqlc/models.go | 29 ++ sqlc/queries.sql | 177 ++++++- sqlc/queries.sql.go | 461 +++++++++++++++++- sqlc/schema.sql | 61 ++- 14 files changed, 2035 insertions(+), 93 deletions(-) create mode 100644 internal/partitions/manager.go create mode 100644 internal/partitions/names.go create mode 100644 internal/sizing/sizing.go create mode 100644 sqlc/migrations/20260516120000_partitioned_outbox.sql diff --git a/.gitignore b/.gitignore index b73d6fe..aca27b6 100644 --- a/.gitignore +++ b/.gitignore @@ -39,3 +39,4 @@ Thumbs.db # Environment files .env .env.local +.cache \ No newline at end of file diff --git a/README.md b/README.md index e82530b..9d5efba 100644 --- a/README.md +++ b/README.md @@ -43,7 +43,7 @@ if err != nil { ## Schema -By default, `NewOutbox` runs migrations and creates an outbox table in the schema `outbox.messages`. This can be overwritten via: +By default, `NewOutbox` runs migrations and creates partitioned outbox tables in the `outbox` schema. `messages` is partitioned by topic and then by per-topic ID ranges; flushed rows are append-only until a sealed, fully-acked partition can be dropped. The schema can be overwritten via: ```go outbox, err := pgoutbox.NewOutbox(pool, pgoutbox.WithSchema("my_schema")) @@ -62,6 +62,17 @@ if err := pgoutbox.Migrate(ctx, pool, pgoutbox.WithSchema("my_schema")); err != } ``` +Partition sizing defaults to 100,000 rows per range partition and two future partitions per topic. Both values are persisted in `topic_meta` per topic and can be configured for newly-created topics: + +```go +outbox, err := pgoutbox.NewOutbox(pool, + pgoutbox.WithDefaultPartitionSize(500_000), + pgoutbox.WithDefaultPartitionCount(4), +) +``` + +The outbox also keeps an approximate sequence-backed fill counter for each topic. It is used only to create future partitions early and to guide sizing; delivery safety is governed by the transactional per-topic ID cursor and `acked_id`. + ## Multiple topics and flushers It's easy to configure multiple destinations using topics registered for each flusher: @@ -94,19 +105,20 @@ outbox.ProcessMessages(ctx, "shipments") ## Benchmarks -You can run benchmarks locally; for example, to write and flush 100k messages, you can run: +You can run benchmarks locally; for example, to write and flush 100k messages without also running the test suite, you can run: ``` -go test -bench=. -benchtime=100000x +go test -run '^$' -bench=. -benchtime=100000x ``` -On a local Macbook with an M3 Max core, this results in `8492 msgs/sec`: +On a local Macbook with an M3 Max core, older non-partitioned versions measured around `8492 msgs/sec`; current benchmark output includes both default partitioning and frequent-rollover partitioning cases: ``` -$ go test -bench=. -benchtime=100000x +$ go test -run '^$' -bench=. -benchtime=100000x goos: darwin goarch: arm64 pkg: github.com/hatchet-dev/pgoutbox cpu: Apple M3 Max -BenchmarkOutbox_WriteAndPublishThroughput-14 100000 117757 ns/op 8492 msgs/sec +BenchmarkOutbox_WriteAndPublishThroughput/DefaultPartitions-14 +BenchmarkOutbox_WriteAndPublishThroughput/FrequentPartitionRollover-14 ``` diff --git a/internal/partitions/manager.go b/internal/partitions/manager.go new file mode 100644 index 0000000..f515ec2 --- /dev/null +++ b/internal/partitions/manager.go @@ -0,0 +1,263 @@ +package partitions + +import ( + "context" + "errors" + "fmt" + "strings" + + "github.com/hatchet-dev/pgoutbox/sqlc" + "github.com/jackc/pgx/v5" + "github.com/jackc/pgx/v5/pgconn" +) + +type Manager struct { + schema string +} + +type topicPartition struct { + index int + from int64 + to int64 +} + +func NewManager(schema string) *Manager { + return &Manager{schema: schema} +} + +func (m *Manager) schemaIdent() string { + return pgx.Identifier{m.schema}.Sanitize() +} + +func (m *Manager) qualified(rel string) string { + return m.schemaIdent() + "." + pgx.Identifier{rel}.Sanitize() +} + +func (m *Manager) LockTopic(ctx context.Context, db sqlc.DBTX, topic string) error { + _, err := db.Exec(ctx, "SELECT pg_advisory_xact_lock(hashtextextended($1, 0))", "pgoutbox:"+m.schema+":"+topic) + return err +} + +func (m *Manager) EnsureHorizon( + ctx context.Context, + db sqlc.DBTX, + q *sqlc.Queries, + topic string, + slug string, + partitionSize int64, + partitionCount int, + startID int64, + endID int64, + fillHigh int64, +) error { + highID := endID + if fillHigh > highID { + highID = fillHigh + } + + if err := m.ensureListPartition(ctx, db, topic, slug); err != nil { + return err + } + + parts, err := m.loadTopicPartitions(ctx, db, topic) + if err != nil { + return err + } + + if len(parts) == 0 { + parts = append(parts, topicPartition{index: 0, from: 1, to: 1 + partitionSize}) + if err := m.createRangePartition(ctx, db, q, topic, slug, parts[0], partitionSize, "active"); err != nil { + return err + } + } + + highIdx := partitionIndexContaining(parts, highID) + for highIdx == -1 || parts[len(parts)-1].index < highIdx+partitionCount { + last := parts[len(parts)-1] + next := topicPartition{ + index: last.index + 1, + from: last.to, + to: last.to + partitionSize, + } + if err := m.createRangePartition(ctx, db, q, topic, slug, next, partitionSize, "future"); err != nil { + return err + } + parts = append(parts, next) + highIdx = partitionIndexContaining(parts, highID) + } + + endIdx := partitionIndexContaining(parts, endID) + + for _, part := range parts { + if part.to <= startID || part.from > endID { + continue + } + highWater := endID + if highWater >= part.to { + highWater = part.to - 1 + } + if err := q.UpdateTopicPartitionHighWater(ctx, db, sqlc.UpdateTopicPartitionHighWaterParams{ + Topic: topic, + PartitionIndex: int32(part.index), + HighWaterID: highWater, + }); err != nil { + return fmt.Errorf("update high water for partition %d: %w", part.index, err) + } + } + + if endIdx > 0 { + if err := q.SealTopicPartitionsUpTo(ctx, db, sqlc.SealTopicPartitionsUpToParams{ + Topic: topic, + PartitionIndex: int32(endIdx), + }); err != nil { + return fmt.Errorf("seal prior partitions: %w", err) + } + } + + return nil +} + +func (m *Manager) createRangePartition( + ctx context.Context, + db sqlc.DBTX, + q *sqlc.Queries, + topic string, + slug string, + part topicPartition, + partitionSize int64, + status string, +) error { + listRel := ListPartitionRelname(slug) + rangeRel := RangePartitionRelname(slug, part.index) + + if err := m.ensureRangePartition(ctx, db, listRel, rangeRel, part.from, part.to); err != nil { + return err + } + + if err := q.UpsertTopicPartition(ctx, db, sqlc.UpsertTopicPartitionParams{ + Topic: topic, + PartitionIndex: int32(part.index), + Relname: rangeRel, + IDFrom: part.from, + IDTo: part.to, + PartitionSize: partitionSize, + Status: status, + }); err != nil { + return fmt.Errorf("record topic partition %d: %w", part.index, err) + } + + return nil +} + +func (m *Manager) loadTopicPartitions(ctx context.Context, db sqlc.DBTX, topic string) ([]topicPartition, error) { + sql := fmt.Sprintf( + "SELECT partition_index, id_from, id_to FROM %s.topic_partitions WHERE topic = $1 AND status <> 'dropped' ORDER BY partition_index ASC", + m.schemaIdent(), + ) + rows, err := db.Query(ctx, sql, topic) + if err != nil { + return nil, err + } + defer rows.Close() + + var parts []topicPartition + for rows.Next() { + var part topicPartition + if err := rows.Scan(&part.index, &part.from, &part.to); err != nil { + return nil, err + } + parts = append(parts, part) + } + if err := rows.Err(); err != nil { + return nil, err + } + return parts, nil +} + +func partitionIndexContaining(parts []topicPartition, id int64) int { + if id <= 0 && len(parts) > 0 { + return parts[0].index + } + for _, part := range parts { + if part.from <= id && id < part.to { + return part.index + } + } + return -1 +} + +func (m *Manager) ensureListPartition(ctx context.Context, db sqlc.DBTX, topic, slug string) error { + listRel := ListPartitionRelname(slug) + sql := fmt.Sprintf( + "CREATE TABLE IF NOT EXISTS %s PARTITION OF %s.messages FOR VALUES IN (%s) PARTITION BY RANGE (id)", + m.qualified(listRel), + m.schemaIdent(), + quoteLiteral(topic), + ) + + _, err := db.Exec(ctx, sql) + return ignoreDuplicateTable(err) +} + +func (m *Manager) ensureRangePartition( + ctx context.Context, + db sqlc.DBTX, + listRel, rangeRel string, + from, to int64, +) error { + sql := fmt.Sprintf( + "CREATE TABLE IF NOT EXISTS %s PARTITION OF %s FOR VALUES FROM (%d) TO (%d)", + m.qualified(rangeRel), + m.qualified(listRel), + from, + to, + ) + _, err := db.Exec(ctx, sql) + return ignoreDuplicateTable(err) +} + +func (m *Manager) EnsureFillSequence(ctx context.Context, db sqlc.DBTX, seqName string) error { + sql := fmt.Sprintf("CREATE SEQUENCE IF NOT EXISTS %s CACHE 100", m.qualified(seqName)) + _, err := db.Exec(ctx, sql) + return ignoreDuplicateTable(err) +} + +func (m *Manager) AdvanceFillSequence(ctx context.Context, db sqlc.DBTX, seqName string, count int) (int64, error) { + if count <= 0 { + return 0, nil + } + + var value int64 + seq := quoteLiteral(m.qualified(seqName)) + sql := fmt.Sprintf("SELECT nextval(%s::regclass) FROM generate_series(1, %d) ORDER BY 1 DESC LIMIT 1", seq, count) + if err := db.QueryRow(ctx, sql).Scan(&value); err != nil { + return 0, err + } + return value, nil +} + +func (m *Manager) DropPartition(ctx context.Context, db sqlc.DBTX, relname string) error { + _, err := db.Exec(ctx, "DROP TABLE IF EXISTS "+m.qualified(relname)) + return err +} + +func ignoreDuplicateTable(err error) error { + if err == nil { + return nil + } + var pgErr *pgconn.PgError + if errors.As(err, &pgErr) { + switch pgErr.Code { + case "42P07", "23505": + return nil + } + } + if strings.Contains(err.Error(), "already exists") { + return nil + } + return err +} + +func quoteLiteral(s string) string { + return "'" + strings.ReplaceAll(s, "'", "''") + "'" +} diff --git a/internal/partitions/names.go b/internal/partitions/names.go new file mode 100644 index 0000000..390d99d --- /dev/null +++ b/internal/partitions/names.go @@ -0,0 +1,44 @@ +package partitions + +import ( + "crypto/md5" + "encoding/hex" + "fmt" +) + +func TopicSlug(topic string) string { + sum := md5.Sum([]byte(topic)) + return hex.EncodeToString(sum[:])[:16] +} + +func FillSeqName(slug string) string { + return "fill_seq_" + slug +} + +func ListPartitionRelname(slug string) string { + return "messages_t_" + slug +} + +func RangePartitionRelname(slug string, index int) string { + return fmt.Sprintf("messages_t_%s_p%d", slug, index) +} + +func PartitionBounds(index int, size int64) (from, to int64) { + from = int64(index)*size + 1 + to = int64(index+1)*size + 1 + return from, to +} + +func PartitionIndexForID(id, size int64) int { + if id <= 0 { + return 0 + } + return int((id - 1) / size) +} + +func MaxPartitionIndex(id, size int64) int { + if id <= 0 { + return 0 + } + return int((id - 1) / size) +} diff --git a/internal/sizing/sizing.go b/internal/sizing/sizing.go new file mode 100644 index 0000000..bcdef00 --- /dev/null +++ b/internal/sizing/sizing.go @@ -0,0 +1,118 @@ +package sizing + +import ( + "time" + + "github.com/hatchet-dev/pgoutbox/sqlc" +) + +const ( + MinPartitionSize int64 = 1_000 + MaxPartitionSize int64 = 10_000_000 + MinPartitionCount int = 1 + MaxPartitionCount int = 16 + + resizeWriteThreshold = 50_000 + resizeAckThreshold = 50_000 +) + +type Defaults struct { + PartitionSize int64 + PartitionCount int +} + +func MaybeResize(meta *sqlc.TopicMetum, now time.Time) (size int64, count int, changed bool) { + size = meta.PartitionSize + count = int(meta.PartitionCount) + + if meta.WritesSinceResize < resizeWriteThreshold && meta.AcksSinceResize < resizeAckThreshold { + return size, count, false + } + + var writeRate, ackRate float64 + if meta.LastWriteAt.Valid { + elapsed := now.Sub(meta.LastWriteAt.Time) + if elapsed > 0 { + writeRate = float64(meta.WritesSinceResize) / elapsed.Seconds() + } + } + if meta.LastProcessAt.Valid { + elapsed := now.Sub(meta.LastProcessAt.Time) + if elapsed > 0 { + ackRate = float64(meta.AcksSinceResize) / elapsed.Seconds() + } + } + + pressure := writeRate + if ackRate > pressure { + pressure = ackRate + } + + newSize := size + newCount := count + + switch { + case pressure > 5000: + newSize = grow(size, 2) + newCount = growInt(count, 1) + case pressure > 1000: + newSize = grow(size, 1.5) + case pressure < 50 && size > MinPartitionSize*2: + newSize = shrink(size, 2) + if newCount > MinPartitionCount { + newCount-- + } + } + + newSize = clamp(newSize, MinPartitionSize, MaxPartitionSize) + newCount = clampInt(newCount, MinPartitionCount, MaxPartitionCount) + + if newSize == size && newCount == count { + return size, count, false + } + + return newSize, newCount, true +} + +func grow(v int64, factor float64) int64 { + next := int64(float64(v) * factor) + if next <= v { + next = v + MinPartitionSize + } + return next +} + +func shrink(v int64, divisor int64) int64 { + if divisor <= 0 { + return v + } + next := v / divisor + if next < MinPartitionSize { + return MinPartitionSize + } + return next +} + +func growInt(v, delta int) int { + return v + delta +} + +func clamp(v, min, max int64) int64 { + if v < min { + return min + } + if v > max { + return max + } + return v +} + +func clampInt(v, min, max int) int { + if v < min { + return min + } + if v > max { + return max + } + return v +} diff --git a/outbox.go b/outbox.go index 2d21315..bb5b3c3 100644 --- a/outbox.go +++ b/outbox.go @@ -2,11 +2,17 @@ package pgoutbox import ( "context" + "crypto/rand" + "encoding/hex" + "errors" "fmt" "math" "sync" + "time" "github.com/hatchet-dev/pgoutbox/internal/dbwrap" + "github.com/hatchet-dev/pgoutbox/internal/partitions" + "github.com/hatchet-dev/pgoutbox/internal/sizing" "github.com/hatchet-dev/pgoutbox/sqlc" "github.com/jackc/pgx/v5" "github.com/jackc/pgx/v5/pgtype" @@ -26,34 +32,45 @@ type Outbox interface { AddMessages(ctx context.Context, tx pgx.Tx, topic string, msgs []MessageOpts) error - // ProcessMessages grabs a batch of messages for the given topic, flushes them using the registered Flusher for that - // topic, and deletes them from the outbox if the flush is successful. + // ProcessMessages grabs a batch of messages for the given topic, flushes + // them using the registered Flusher, and advances the per-topic ack cursor + // if the flush succeeds. Message rows are append-only and are removed only + // when a fully-acked sealed partition is dropped. ProcessMessages(ctx context.Context, topic string) ([]*sqlc.Message, error) } // defaultBatchSize is the number of messages ProcessMessages will pull per // call when the caller has not overridden it via WithBatchSize. const defaultBatchSize = 100 +const defaultPartitionSize int64 = 100_000 +const defaultPartitionCount = 2 type outboxImplOpts struct { - schema string - batchSize int32 - autoMigrate bool + schema string + batchSize int32 + autoMigrate bool + partitionSize int64 + partitionCount int } func defaultOpts() *outboxImplOpts { return &outboxImplOpts{ - schema: "outbox", - batchSize: defaultBatchSize, - autoMigrate: true, + schema: "outbox", + batchSize: defaultBatchSize, + autoMigrate: true, + partitionSize: defaultPartitionSize, + partitionCount: defaultPartitionCount, } } type outboxImpl struct { - queries *sqlc.Queries - pool *pgxpool.Pool - schema string - batchSize int32 + queries *sqlc.Queries + pool *pgxpool.Pool + schema string + batchSize int32 + partitionSize int64 + partitionCount int + partitionManager *partitions.Manager flushers sync.Map } @@ -88,6 +105,28 @@ func WithAutoMigrate(enabled bool) OutboxOpt { } } +// WithDefaultPartitionSize sets the default target number of rows per +// partition segment for newly-created topics. +func WithDefaultPartitionSize(z int64) OutboxOpt { + return func(opts *outboxImplOpts) { + if z <= 0 { + return + } + opts.partitionSize = z + } +} + +// WithDefaultPartitionCount sets the number of future partitions maintained +// ahead of the current write position for newly-created topics. +func WithDefaultPartitionCount(n int) OutboxOpt { + return func(opts *outboxImplOpts) { + if n <= 0 || n > math.MaxInt32 { + return + } + opts.partitionCount = n + } +} + func NewOutbox(pool *pgxpool.Pool, fs ...OutboxOpt) (Outbox, error) { opts := defaultOpts() @@ -108,10 +147,13 @@ func NewOutbox(pool *pgxpool.Pool, fs ...OutboxOpt) (Outbox, error) { } return &outboxImpl{ - queries: queries, - pool: pool, - schema: opts.schema, - batchSize: opts.batchSize, + queries: queries, + pool: pool, + schema: opts.schema, + batchSize: opts.batchSize, + partitionSize: opts.partitionSize, + partitionCount: opts.partitionCount, + partitionManager: partitions.NewManager(opts.schema), }, nil } @@ -144,19 +186,65 @@ func (o *outboxImpl) AddMessages(ctx context.Context, tx pgx.Tx, topic string, m return nil } - params := make([]sqlc.InsertMessageParams, len(msgs)) - for i, msg := range msgs { if len(msg.Payload) == 0 { return fmt.Errorf("payload for topic %q at index %d must not be empty", topic, i) } + } + + wrapped := dbwrap.New(tx, o.schema) + slug := partitions.TopicSlug(topic) + seqName := partitions.FillSeqName(slug) + + allocated, err := o.allocateTopicIDs(ctx, wrapped, topic, len(msgs)) + if err != nil { + return err + } + + fillHigh, err := o.partitionManager.AdvanceFillSequence(ctx, wrapped, seqName, len(msgs)) + if err != nil { + return fmt.Errorf("could not advance fill sequence for topic %q: %w", topic, err) + } + + partitionSize := allocated.PartitionSize + partitionCount := int(allocated.PartitionCount) + if nextSize, nextCount, changed := sizing.MaybeResize(topicMetaFromAllocation(allocated), time.Now()); changed { + if err := o.queries.UpdateTopicSizing(ctx, wrapped, sqlc.UpdateTopicSizingParams{ + Topic: topic, + PartitionSize: nextSize, + PartitionCount: int32(nextCount), + }); err != nil { + return fmt.Errorf("could not update partition sizing for topic %q: %w", topic, err) + } + partitionSize = nextSize + partitionCount = nextCount + } + + if err := o.partitionManager.EnsureHorizon( + ctx, + wrapped, + o.queries, + topic, + slug, + partitionSize, + partitionCount, + allocated.StartID, + allocated.EndID, + fillHigh, + ); err != nil { + return fmt.Errorf("could not ensure partitions for topic %q: %w", topic, err) + } + + params := make([]sqlc.InsertMessageParams, len(msgs)) + for i, msg := range msgs { params[i] = sqlc.InsertMessageParams{ + ID: allocated.StartID + int64(i), Topic: topic, Payload: msg.Payload, } } - _, err := o.queries.InsertMessage(ctx, dbwrap.New(tx, o.schema), params) + _, err = o.queries.InsertMessage(ctx, wrapped, params) if err != nil { return fmt.Errorf("could not insert messages for topic %q: %w", topic, err) @@ -172,6 +260,11 @@ func (o *outboxImpl) ProcessMessages(ctx context.Context, topic string) ([]*sqlc return nil, fmt.Errorf("no flusher registered for topic %q", topic) } + holder, err := newLeaseHolder() + if err != nil { + return nil, err + } + tx, err := o.pool.Begin(ctx) if err != nil { @@ -182,8 +275,27 @@ func (o *outboxImpl) ProcessMessages(ctx context.Context, topic string) ([]*sqlc wrapped := dbwrap.New(tx, o.schema) - msgs, err := o.queries.AcquireMessagesByTopic(ctx, wrapped, sqlc.AcquireMessagesByTopicParams{ + meta, err := o.queries.TryAcquireTopicLease(ctx, wrapped, sqlc.TryAcquireTopicLeaseParams{ Topic: topic, + Holder: pgtype.Text{ + String: holder, + Valid: true, + }, + }) + if errors.Is(err, pgx.ErrNoRows) { + if err := tx.Commit(ctx); err != nil { + return nil, fmt.Errorf("could not commit skipped processing transaction for topic %q: %w", topic, err) + } + return nil, nil + } + + if err != nil { + return nil, fmt.Errorf("could not acquire topic lease for %q: %w", topic, err) + } + + msgs, err := o.queries.ListMessagesAfterAcked(ctx, wrapped, sqlc.ListMessagesAfterAckedParams{ + Topic: topic, + ID: meta.AckedID, Limit: pgtype.Int4{ Int32: o.batchSize, Valid: true, @@ -191,11 +303,13 @@ func (o *outboxImpl) ProcessMessages(ctx context.Context, topic string) ([]*sqlc }) if err != nil { - return nil, fmt.Errorf("could not acquire messages for topic %q: %w", topic, err) + return nil, fmt.Errorf("could not list messages for topic %q: %w", topic, err) } if len(msgs) == 0 { - // just commit to avoid rollback monitoring + if err := o.releaseLease(ctx, wrapped, topic, holder); err != nil { + return nil, err + } if err := tx.Commit(ctx); err != nil { return nil, fmt.Errorf("could not commit transaction for processing messages for topic %q: %w", topic, err) } @@ -203,32 +317,185 @@ func (o *outboxImpl) ProcessMessages(ctx context.Context, topic string) ([]*sqlc return nil, nil } - // call the flusher - err = f.Flush(ctx, msgs) + highID := msgs[len(msgs)-1].ID + if err := o.queries.SetTopicLeaseHighID(ctx, wrapped, sqlc.SetTopicLeaseHighIDParams{ + Topic: topic, + LeaseHighID: pgtype.Int8{ + Int64: highID, + Valid: true, + }, + LeaseHolder: pgtype.Text{ + String: holder, + Valid: true, + }, + }); err != nil { + return nil, fmt.Errorf("could not record leased range for topic %q: %w", topic, err) + } + + if err := tx.Commit(ctx); err != nil { + return nil, fmt.Errorf("could not commit lease transaction for topic %q: %w", topic, err) + } - if err != nil { + if err := f.Flush(ctx, msgs); err != nil { + _ = o.releaseLeaseInNewTx(ctx, topic, holder) return nil, fmt.Errorf("flusher failed for topic %q: %w", topic, err) } - // delete the messages that were flushed - msgIDs := make([]int64, len(msgs)) + ackTx, err := o.pool.Begin(ctx) - for i, msg := range msgs { - msgIDs[i] = msg.ID + if err != nil { + return nil, fmt.Errorf("could not begin ack transaction for topic %q: %w", topic, err) + } + defer ackTx.Rollback(ctx) + + ackWrapped := dbwrap.New(ackTx, o.schema) + if err := o.queries.AckTopicMessages(ctx, ackWrapped, sqlc.AckTopicMessagesParams{ + Topic: topic, + AckedID: highID, + AcksSinceResize: int64(len(msgs)), + LeaseHolder: pgtype.Text{ + String: holder, + Valid: true, + }, + }); err != nil { + return nil, fmt.Errorf("could not ack messages for topic %q: %w", topic, err) + } + + if err := o.dropAckedPartitions(ctx, ackWrapped, topic, highID); err != nil { + return nil, err + } + + if err := ackTx.Commit(ctx); err != nil { + return nil, fmt.Errorf("could not commit ack transaction for topic %q: %w", topic, err) } - err = o.queries.DeleteMessagesByIds(ctx, wrapped, sqlc.DeleteMessagesByIdsParams{ + return msgs, nil +} + +func (o *outboxImpl) allocateTopicIDs(ctx context.Context, db sqlc.DBTX, topic string, count int) (*sqlc.AllocateTopicIdsRow, error) { + allocated, err := o.queries.AllocateTopicIds(ctx, db, sqlc.AllocateTopicIdsParams{ Topic: topic, - Ids: msgIDs, + Count: int64(count), }) + if err == nil { + return allocated, nil + } + if !errors.Is(err, pgx.ErrNoRows) { + return nil, fmt.Errorf("could not allocate message IDs for topic %q: %w", topic, err) + } + + if err := o.ensureTopicSetup(ctx, db, topic); err != nil { + return nil, err + } + allocated, err = o.queries.AllocateTopicIds(ctx, db, sqlc.AllocateTopicIdsParams{ + Topic: topic, + Count: int64(count), + }) if err != nil { - return nil, fmt.Errorf("could not delete messages for topic %q: %w", topic, err) + return nil, fmt.Errorf("could not allocate message IDs for topic %q: %w", topic, err) } - if err := tx.Commit(ctx); err != nil { - return nil, fmt.Errorf("could not commit transaction for processing messages for topic %q: %w", topic, err) + return allocated, nil +} + +func (o *outboxImpl) ensureTopicSetup(ctx context.Context, db sqlc.DBTX, topic string) error { + slug := partitions.TopicSlug(topic) + seqName := partitions.FillSeqName(slug) + + if err := o.partitionManager.LockTopic(ctx, db, topic); err != nil { + return fmt.Errorf("could not lock topic setup for %q: %w", topic, err) } - return msgs, nil + if err := o.partitionManager.EnsureFillSequence(ctx, db, seqName); err != nil { + return fmt.Errorf("could not ensure fill sequence for topic %q: %w", topic, err) + } + + if err := o.queries.EnsureTopicMeta(ctx, db, sqlc.EnsureTopicMetaParams{ + Topic: topic, + PartitionSize: o.partitionSize, + PartitionCount: int32(o.partitionCount), + FillSeqName: seqName, + }); err != nil { + return fmt.Errorf("could not ensure topic metadata for %q: %w", topic, err) + } + + return nil +} + +func (o *outboxImpl) releaseLease(ctx context.Context, db sqlc.DBTX, topic, holder string) error { + if err := o.queries.ReleaseTopicLease(ctx, db, sqlc.ReleaseTopicLeaseParams{ + Topic: topic, + LeaseHolder: pgtype.Text{ + String: holder, + Valid: true, + }, + }); err != nil { + return fmt.Errorf("could not release topic lease for %q: %w", topic, err) + } + return nil +} + +func (o *outboxImpl) releaseLeaseInNewTx(ctx context.Context, topic, holder string) error { + tx, err := o.pool.Begin(ctx) + if err != nil { + return err + } + defer tx.Rollback(ctx) + + if err := o.releaseLease(ctx, dbwrap.New(tx, o.schema), topic, holder); err != nil { + return err + } + return tx.Commit(ctx) +} + +func (o *outboxImpl) dropAckedPartitions(ctx context.Context, db sqlc.DBTX, topic string, ackedID int64) error { + parts, err := o.queries.ListDroppablePartitions(ctx, db, sqlc.ListDroppablePartitionsParams{ + Topic: topic, + HighWaterID: ackedID, + }) + if err != nil { + return fmt.Errorf("could not list droppable partitions for topic %q: %w", topic, err) + } + + for _, part := range parts { + if err := o.partitionManager.DropPartition(ctx, db, part.Relname); err != nil { + return fmt.Errorf("could not drop partition %q for topic %q: %w", part.Relname, topic, err) + } + if err := o.queries.MarkTopicPartitionDropped(ctx, db, sqlc.MarkTopicPartitionDroppedParams{ + Topic: topic, + PartitionIndex: part.PartitionIndex, + }); err != nil { + return fmt.Errorf("could not mark partition %q dropped for topic %q: %w", part.Relname, topic, err) + } + } + + return nil +} + +func topicMetaFromAllocation(row *sqlc.AllocateTopicIdsRow) *sqlc.TopicMetum { + return &sqlc.TopicMetum{ + Topic: row.Topic, + NextID: row.NextID, + AckedID: row.AckedID, + PartitionSize: row.PartitionSize, + PartitionCount: row.PartitionCount, + FillSeqName: row.FillSeqName, + LeaseHolder: row.LeaseHolder, + LeaseExpiresAt: row.LeaseExpiresAt, + LeaseHighID: row.LeaseHighID, + WritesSinceResize: row.WritesSinceResize, + AcksSinceResize: row.AcksSinceResize, + LastWriteAt: row.LastWriteAt, + LastProcessAt: row.LastProcessAt, + ResizedAt: row.ResizedAt, + } +} + +func newLeaseHolder() (string, error) { + var b [16]byte + if _, err := rand.Read(b[:]); err != nil { + return "", fmt.Errorf("could not create lease holder: %w", err) + } + return hex.EncodeToString(b[:]), nil } diff --git a/outbox_bench_test.go b/outbox_bench_test.go index 8eb1ee2..78dccd2 100644 --- a/outbox_bench_test.go +++ b/outbox_bench_test.go @@ -25,10 +25,24 @@ func (c *countingFlusher) Flush(_ context.Context, msgs []*sqlc.Message) error { } func BenchmarkOutbox_WriteAndPublishThroughput(b *testing.B) { + b.Run("DefaultPartitions", func(b *testing.B) { + benchmarkOutboxWriteAndPublishThroughput(b) + }) + b.Run("FrequentPartitionRollover", func(b *testing.B) { + benchmarkOutboxWriteAndPublishThroughput( + b, + pgoutbox.WithDefaultPartitionSize(1000), + pgoutbox.WithDefaultPartitionCount(4), + ) + }) +} + +func benchmarkOutboxWriteAndPublishThroughput(b *testing.B, opts ...pgoutbox.OutboxOpt) { const ( numWorkers = MAX_CONNS maxInFlight = 5000 batchSize = 500 + writeBatch = 100 ) ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute) @@ -38,16 +52,25 @@ func BenchmarkOutbox_WriteAndPublishThroughput(b *testing.B) { outbox, err := pgoutbox.NewOutbox( sharedPool, - pgoutbox.WithSchema(schema), - pgoutbox.WithBatchSize(batchSize), + append([]pgoutbox.OutboxOpt{ + pgoutbox.WithSchema(schema), + pgoutbox.WithBatchSize(batchSize), + }, opts...)..., ) require.NoError(b, err) // inFlight bounds the number of un-flushed rows allowed in the table at // once: producers acquire a slot before INSERT, the flusher releases it - // after the row is committed-deleted. + // after the row is flushed and the per-topic ack cursor advances. inFlight := make(chan struct{}, maxInFlight) + errCh := make(chan error, 1) var flushed atomic.Int64 + reportErr := func(err error) { + select { + case errCh <- err: + default: + } + } outbox.AddFlusher("bench", &countingFlusher{ onFlush: func(n int) { @@ -67,8 +90,8 @@ func BenchmarkOutbox_WriteAndPublishThroughput(b *testing.B) { if procCtx.Err() != nil { return } - b.Logf("ProcessMessages: %v", err) - continue + reportErr(err) + return } if len(msgs) == 0 { time.Sleep(time.Millisecond) @@ -80,41 +103,69 @@ func BenchmarkOutbox_WriteAndPublishThroughput(b *testing.B) { b.ResetTimer() - work := make(chan struct{}) + work := make(chan int) var wg sync.WaitGroup for range numWorkers { wg.Go(func() { - for range work { - inFlight <- struct{}{} + for n := range work { + for range n { + inFlight <- struct{}{} + } tx, err := sharedPool.Begin(ctx) if err != nil { - b.Errorf("begin: %v", err) + reportErr(err) return } - if err := outbox.AddMessages(ctx, tx, "bench", []pgoutbox.MessageOpts{ - {Payload: payload}, - }); err != nil { + msgs := make([]pgoutbox.MessageOpts, n) + for i := range msgs { + msgs[i] = pgoutbox.MessageOpts{Payload: payload} + } + if err := outbox.AddMessages(ctx, tx, "bench", msgs); err != nil { _ = tx.Rollback(ctx) - b.Errorf("AddMessages: %v", err) + reportErr(err) return } if err := tx.Commit(ctx); err != nil { - b.Errorf("commit: %v", err) + reportErr(err) return } } }) } - for range b.N { - work <- struct{}{} + for remaining := b.N; remaining > 0; { + n := writeBatch + if remaining < n { + n = remaining + } + select { + case err := <-errCh: + close(work) + wg.Wait() + stopProcessor() + procWg.Wait() + b.Fatalf("benchmark worker failed: %v", err) + case work <- n: + remaining -= n + } } close(work) wg.Wait() for flushed.Load() < int64(b.N) { - time.Sleep(time.Millisecond) + select { + case err := <-errCh: + stopProcessor() + procWg.Wait() + b.Fatalf("benchmark processor failed: %v", err) + case <-ctx.Done(): + stopProcessor() + procWg.Wait() + b.Fatalf("timed out waiting for flush progress: flushed=%d want=%d: %v", flushed.Load(), b.N, ctx.Err()) + default: + time.Sleep(time.Millisecond) + } } b.StopTimer() diff --git a/outbox_e2e_test.go b/outbox_e2e_test.go index b459289..7ff1d23 100644 --- a/outbox_e2e_test.go +++ b/outbox_e2e_test.go @@ -112,6 +112,77 @@ func countMessages(t *testing.T, ctx context.Context, schema, topic string) int return n } +func topicAckedID(t *testing.T, ctx context.Context, schema, topic string) int64 { + t.Helper() + + var ackedID int64 + query := fmt.Sprintf("SELECT acked_id FROM %s.topic_meta WHERE topic = $1", pgx.Identifier{schema}.Sanitize()) + err := sharedPool.QueryRow(ctx, query, topic).Scan(&ackedID) + require.NoError(t, err) + return ackedID +} + +func partitionCount(t *testing.T, ctx context.Context, schema, topic string) int { + t.Helper() + + var n int + query := fmt.Sprintf("SELECT count(*) FROM %s.topic_partitions WHERE topic = $1", pgx.Identifier{schema}.Sanitize()) + err := sharedPool.QueryRow(ctx, query, topic).Scan(&n) + require.NoError(t, err) + return n +} + +func maxPartitionIndex(t *testing.T, ctx context.Context, schema, topic string) int { + t.Helper() + + var idx int + query := fmt.Sprintf("SELECT COALESCE(MAX(partition_index), 0) FROM %s.topic_partitions WHERE topic = $1", pgx.Identifier{schema}.Sanitize()) + err := sharedPool.QueryRow(ctx, query, topic).Scan(&idx) + require.NoError(t, err) + return idx +} + +func partitionStatus(t *testing.T, ctx context.Context, schema, topic string, index int) string { + t.Helper() + + var status string + query := fmt.Sprintf("SELECT status FROM %s.topic_partitions WHERE topic = $1 AND partition_index = $2", pgx.Identifier{schema}.Sanitize()) + err := sharedPool.QueryRow(ctx, query, topic, index).Scan(&status) + require.NoError(t, err) + return status +} + +func tableReloptions(t *testing.T, ctx context.Context, schema, table string) []string { + t.Helper() + + var opts []string + err := sharedPool.QueryRow( + ctx, + "SELECT COALESCE(reloptions, ARRAY[]::text[]) FROM pg_class WHERE oid = $1::regclass", + pgx.Identifier{schema, table}.Sanitize(), + ).Scan(&opts) + require.NoError(t, err) + return opts +} + +func messageIDs(t *testing.T, ctx context.Context, schema, topic string) []int64 { + t.Helper() + + query := fmt.Sprintf("SELECT id FROM %s.messages WHERE topic = $1 ORDER BY id", pgx.Identifier{schema}.Sanitize()) + rows, err := sharedPool.Query(ctx, query, topic) + require.NoError(t, err) + defer rows.Close() + + var ids []int64 + for rows.Next() { + var id int64 + require.NoError(t, rows.Scan(&id)) + ids = append(ids, id) + } + require.NoError(t, rows.Err()) + return ids +} + func mustPayload(t *testing.T, v any) []byte { t.Helper() b, err := json.Marshal(v) @@ -151,7 +222,8 @@ func TestOutbox_AddAndProcessMessages(t *testing.T) { received := flusher.Received() require.Len(t, received, 3) - assert.Equal(t, 0, countMessages(t, ctx, schema, "orders"), "flushed messages should be deleted") + assert.Equal(t, int64(3), topicAckedID(t, ctx, schema, "orders")) + assert.Equal(t, 3, countMessages(t, ctx, schema, "orders"), "active partition rows remain append-only until their partition is dropped") // Calling ProcessMessages again with nothing pending should be a no-op. _, err = outbox.ProcessMessages(ctx, "orders") @@ -271,6 +343,7 @@ func TestOutbox_FlusherErrorLeavesMessages(t *testing.T) { // Failed flush must NOT delete the rows — the outbox guarantee is that // messages survive until a Flusher reports success. assert.Equal(t, 2, countMessages(t, ctx, schema, "orders")) + assert.Equal(t, int64(0), topicAckedID(t, ctx, schema, "orders")) // Once the flusher recovers, the same messages should be delivered. flusher.mu.Lock() @@ -280,7 +353,8 @@ func TestOutbox_FlusherErrorLeavesMessages(t *testing.T) { _, err = outbox.ProcessMessages(ctx, "orders") require.NoError(t, err) assert.Len(t, flusher.Received(), 2) - assert.Equal(t, 0, countMessages(t, ctx, schema, "orders")) + assert.Equal(t, int64(2), topicAckedID(t, ctx, schema, "orders")) + assert.Equal(t, 2, countMessages(t, ctx, schema, "orders")) } func TestOutbox_BatchSizeIsRespected(t *testing.T) { @@ -315,21 +389,24 @@ func TestOutbox_BatchSizeIsRespected(t *testing.T) { require.NoError(t, err) assert.Len(t, processed, 1, "ProcessMessages should return exactly batchSize messages") assert.Len(t, flusher.Received(), 1) - assert.Equal(t, 2, countMessages(t, ctx, schema, "orders")) + assert.Equal(t, int64(1), topicAckedID(t, ctx, schema, "orders")) + assert.Equal(t, 3, countMessages(t, ctx, schema, "orders")) // Second call: one more. processed, err = outbox.ProcessMessages(ctx, "orders") require.NoError(t, err) assert.Len(t, processed, 1) assert.Len(t, flusher.Received(), 2) - assert.Equal(t, 1, countMessages(t, ctx, schema, "orders")) + assert.Equal(t, int64(2), topicAckedID(t, ctx, schema, "orders")) + assert.Equal(t, 3, countMessages(t, ctx, schema, "orders")) // Third call: last one. processed, err = outbox.ProcessMessages(ctx, "orders") require.NoError(t, err) assert.Len(t, processed, 1) assert.Len(t, flusher.Received(), 3) - assert.Equal(t, 0, countMessages(t, ctx, schema, "orders")) + assert.Equal(t, int64(3), topicAckedID(t, ctx, schema, "orders")) + assert.Equal(t, 3, countMessages(t, ctx, schema, "orders")) // Fourth call: nothing left, no-op. processed, err = outbox.ProcessMessages(ctx, "orders") @@ -338,6 +415,185 @@ func TestOutbox_BatchSizeIsRespected(t *testing.T) { assert.Len(t, flusher.Received(), 3) } +func TestOutbox_CreatesTopicPartitionsAheadOfWrites(t *testing.T) { + t.Parallel() + + ctx, cancel := context.WithTimeout(context.Background(), 2*time.Minute) + defer cancel() + + schema := uniqueSchema(t) + + outbox, err := pgoutbox.NewOutbox( + sharedPool, + pgoutbox.WithSchema(schema), + pgoutbox.WithDefaultPartitionSize(2), + pgoutbox.WithDefaultPartitionCount(2), + ) + require.NoError(t, err) + + tx, err := sharedPool.Begin(ctx) + require.NoError(t, err) + require.NoError(t, outbox.AddMessages(ctx, tx, "orders", []pgoutbox.MessageOpts{ + {Payload: mustPayload(t, map[string]int{"id": 1})}, + })) + require.NoError(t, tx.Commit(ctx)) + + assert.Equal(t, 3, partitionCount(t, ctx, schema, "orders"), "current partition plus N future partitions should be ready") + assert.Equal(t, "active", partitionStatus(t, ctx, schema, "orders", 0)) + assert.Equal(t, "future", partitionStatus(t, ctx, schema, "orders", 1)) + assert.Equal(t, "future", partitionStatus(t, ctx, schema, "orders", 2)) +} + +func TestOutbox_RollsOverAndSealsFullPartitions(t *testing.T) { + t.Parallel() + + ctx, cancel := context.WithTimeout(context.Background(), 2*time.Minute) + defer cancel() + + schema := uniqueSchema(t) + + outbox, err := pgoutbox.NewOutbox( + sharedPool, + pgoutbox.WithSchema(schema), + pgoutbox.WithDefaultPartitionSize(2), + pgoutbox.WithDefaultPartitionCount(1), + ) + require.NoError(t, err) + + tx, err := sharedPool.Begin(ctx) + require.NoError(t, err) + require.NoError(t, outbox.AddMessages(ctx, tx, "orders", []pgoutbox.MessageOpts{ + {Payload: mustPayload(t, map[string]int{"id": 1})}, + {Payload: mustPayload(t, map[string]int{"id": 2})}, + {Payload: mustPayload(t, map[string]int{"id": 3})}, + })) + require.NoError(t, tx.Commit(ctx)) + + assert.Equal(t, "sealed", partitionStatus(t, ctx, schema, "orders", 0)) + assert.Equal(t, "active", partitionStatus(t, ctx, schema, "orders", 1)) + assert.Equal(t, "future", partitionStatus(t, ctx, schema, "orders", 2)) +} + +func TestOutbox_SequenceOvercountOnlyCreatesExtraFuturePartitions(t *testing.T) { + t.Parallel() + + ctx, cancel := context.WithTimeout(context.Background(), 2*time.Minute) + defer cancel() + + schema := uniqueSchema(t) + + outbox, err := pgoutbox.NewOutbox( + sharedPool, + pgoutbox.WithSchema(schema), + pgoutbox.WithDefaultPartitionSize(2), + pgoutbox.WithDefaultPartitionCount(1), + ) + require.NoError(t, err) + + tx, err := sharedPool.Begin(ctx) + require.NoError(t, err) + require.NoError(t, outbox.AddMessages(ctx, tx, "orders", []pgoutbox.MessageOpts{ + {Payload: mustPayload(t, map[string]int{"id": 1})}, + })) + require.NoError(t, tx.Commit(ctx)) + + rollbackTx, err := sharedPool.Begin(ctx) + require.NoError(t, err) + require.NoError(t, outbox.AddMessages(ctx, rollbackTx, "orders", []pgoutbox.MessageOpts{ + {Payload: mustPayload(t, map[string]int{"id": 2})}, + {Payload: mustPayload(t, map[string]int{"id": 3})}, + })) + require.NoError(t, rollbackTx.Rollback(ctx)) + + tx, err = sharedPool.Begin(ctx) + require.NoError(t, err) + require.NoError(t, outbox.AddMessages(ctx, tx, "orders", []pgoutbox.MessageOpts{ + {Payload: mustPayload(t, map[string]int{"id": 4})}, + })) + require.NoError(t, tx.Commit(ctx)) + + assert.Equal(t, []int64{1, 2}, messageIDs(t, ctx, schema, "orders"), "transactional IDs should not inherit sequence gaps") + assert.GreaterOrEqual(t, maxPartitionIndex(t, ctx, schema, "orders"), 2, "sequence overcount should only move the proactive partition horizon forward") +} + +func TestOutbox_DropsSealedAckedPartitions(t *testing.T) { + t.Parallel() + + ctx, cancel := context.WithTimeout(context.Background(), 2*time.Minute) + defer cancel() + + schema := uniqueSchema(t) + + outbox, err := pgoutbox.NewOutbox( + sharedPool, + pgoutbox.WithSchema(schema), + pgoutbox.WithBatchSize(2), + pgoutbox.WithDefaultPartitionSize(2), + pgoutbox.WithDefaultPartitionCount(1), + ) + require.NoError(t, err) + + flusher := &captureFlusher{} + outbox.AddFlusher("orders", flusher) + + tx, err := sharedPool.Begin(ctx) + require.NoError(t, err) + require.NoError(t, outbox.AddMessages(ctx, tx, "orders", []pgoutbox.MessageOpts{ + {Payload: mustPayload(t, map[string]int{"id": 1})}, + {Payload: mustPayload(t, map[string]int{"id": 2})}, + {Payload: mustPayload(t, map[string]int{"id": 3})}, + })) + require.NoError(t, tx.Commit(ctx)) + + processed, err := outbox.ProcessMessages(ctx, "orders") + require.NoError(t, err) + require.Len(t, processed, 2) + + assert.Equal(t, "dropped", partitionStatus(t, ctx, schema, "orders", 0)) + assert.Equal(t, 1, countMessages(t, ctx, schema, "orders"), "dropping the sealed first partition should leave only the active partition row") +} + +func TestOutbox_ConcurrentProcessorsDoNotDoubleFlushLeasedRange(t *testing.T) { + t.Parallel() + + ctx, cancel := context.WithTimeout(context.Background(), 2*time.Minute) + defer cancel() + + schema := uniqueSchema(t) + + outbox, err := pgoutbox.NewOutbox(sharedPool, pgoutbox.WithSchema(schema)) + require.NoError(t, err) + + flusher := &captureFlusher{} + outbox.AddFlusher("orders", flusher) + + tx, err := sharedPool.Begin(ctx) + require.NoError(t, err) + require.NoError(t, outbox.AddMessages(ctx, tx, "orders", []pgoutbox.MessageOpts{ + {Payload: mustPayload(t, map[string]int{"id": 1})}, + {Payload: mustPayload(t, map[string]int{"id": 2})}, + {Payload: mustPayload(t, map[string]int{"id": 3})}, + })) + require.NoError(t, tx.Commit(ctx)) + + var wg sync.WaitGroup + errs := make(chan error, 2) + for range 2 { + wg.Go(func() { + _, err := outbox.ProcessMessages(ctx, "orders") + errs <- err + }) + } + wg.Wait() + close(errs) + + for err := range errs { + require.NoError(t, err) + } + assert.Len(t, flusher.Received(), 3) + assert.Equal(t, int64(3), topicAckedID(t, ctx, schema, "orders")) +} + // messagesTableExists returns true if the messages table is present in the // given schema. Used to verify the auto-migrate behavior. func messagesTableExists(t *testing.T, ctx context.Context, schema string) bool { @@ -412,7 +668,8 @@ func TestOutbox_ExplicitMigrate(t *testing.T) { require.NoError(t, err) assert.Len(t, processed, 2) assert.Len(t, flusher.Received(), 2) - assert.Equal(t, 0, countMessages(t, ctx, schema, "orders")) + assert.Equal(t, int64(2), topicAckedID(t, ctx, schema, "orders")) + assert.Equal(t, 2, countMessages(t, ctx, schema, "orders")) } func TestOutbox_InvalidSchemaNameRejected(t *testing.T) { @@ -448,3 +705,26 @@ func TestOutbox_TableLandsInConfiguredSchema(t *testing.T) { require.NoError(t, err) assert.Equal(t, schema, schemaName) } + +func TestOutbox_TrackingTablesUseAggressiveAutovacuum(t *testing.T) { + t.Parallel() + + ctx, cancel := context.WithTimeout(context.Background(), 2*time.Minute) + defer cancel() + + schema := uniqueSchema(t) + + _, err := pgoutbox.NewOutbox(sharedPool, pgoutbox.WithSchema(schema)) + require.NoError(t, err) + + for _, table := range []string{"topic_meta", "topic_partitions"} { + opts := tableReloptions(t, ctx, schema, table) + assert.Contains(t, opts, "fillfactor=70") + assert.Contains(t, opts, "autovacuum_vacuum_scale_factor=0.0") + assert.Contains(t, opts, "autovacuum_vacuum_threshold=5") + assert.Contains(t, opts, "autovacuum_analyze_scale_factor=0.0") + assert.Contains(t, opts, "autovacuum_analyze_threshold=5") + assert.Contains(t, opts, "autovacuum_vacuum_cost_delay=0") + assert.Contains(t, opts, "autovacuum_vacuum_cost_limit=1000") + } +} diff --git a/sqlc/copyfrom.go b/sqlc/copyfrom.go index 3967194..9c0cea1 100644 --- a/sqlc/copyfrom.go +++ b/sqlc/copyfrom.go @@ -29,6 +29,7 @@ func (r *iteratorForInsertMessage) Next() bool { func (r iteratorForInsertMessage) Values() ([]interface{}, error) { return []interface{}{ + r.rows[0].ID, r.rows[0].Topic, r.rows[0].Payload, }, nil @@ -39,5 +40,5 @@ func (r iteratorForInsertMessage) Err() error { } func (q *Queries) InsertMessage(ctx context.Context, db DBTX, arg []InsertMessageParams) (int64, error) { - return db.CopyFrom(ctx, []string{"messages"}, []string{"topic", "payload"}, &iteratorForInsertMessage{rows: arg}) + return db.CopyFrom(ctx, []string{"messages"}, []string{"id", "topic", "payload"}, &iteratorForInsertMessage{rows: arg}) } diff --git a/sqlc/migrations/20260516120000_partitioned_outbox.sql b/sqlc/migrations/20260516120000_partitioned_outbox.sql new file mode 100644 index 0000000..281d7c0 --- /dev/null +++ b/sqlc/migrations/20260516120000_partitioned_outbox.sql @@ -0,0 +1,233 @@ +-- +goose Up +-- +goose StatementBegin +ALTER TABLE messages RENAME TO messages_legacy; + +CREATE TABLE messages ( + id BIGINT NOT NULL, + inserted_at TIMESTAMPTZ NOT NULL DEFAULT CURRENT_TIMESTAMP, + topic TEXT NOT NULL, + payload JSONB NOT NULL, + CONSTRAINT messages_pkey PRIMARY KEY (topic, id) +) PARTITION BY LIST (topic); + +CREATE TABLE topic_meta ( + topic TEXT PRIMARY KEY, + next_id BIGINT NOT NULL DEFAULT 1, + acked_id BIGINT NOT NULL DEFAULT 0, + partition_size BIGINT NOT NULL, + partition_count INT NOT NULL, + fill_seq_name TEXT NOT NULL, + lease_holder TEXT, + lease_expires_at TIMESTAMPTZ, + lease_high_id BIGINT, + writes_since_resize BIGINT NOT NULL DEFAULT 0, + acks_since_resize BIGINT NOT NULL DEFAULT 0, + last_write_at TIMESTAMPTZ, + last_process_at TIMESTAMPTZ, + resized_at TIMESTAMPTZ NOT NULL DEFAULT CURRENT_TIMESTAMP +); + +CREATE TABLE topic_partitions ( + topic TEXT NOT NULL REFERENCES topic_meta (topic) ON DELETE CASCADE, + partition_index INT NOT NULL, + relname TEXT NOT NULL, + id_from BIGINT NOT NULL, + id_to BIGINT NOT NULL, + partition_size BIGINT NOT NULL, + high_water_id BIGINT NOT NULL DEFAULT 0, + status TEXT NOT NULL CHECK ( + status IN ('future', 'active', 'sealed', 'dropped') + ), + created_at TIMESTAMPTZ NOT NULL DEFAULT CURRENT_TIMESTAMP, + PRIMARY KEY (topic, partition_index), + CONSTRAINT topic_partitions_relname_key UNIQUE (relname) +); + +CREATE INDEX topic_partitions_topic_status_idx ON topic_partitions (topic, status); + +ALTER TABLE topic_meta SET ( + fillfactor = 70, + autovacuum_vacuum_scale_factor = 0.0, + autovacuum_vacuum_threshold = 5, + autovacuum_analyze_scale_factor = 0.0, + autovacuum_analyze_threshold = 5, + autovacuum_vacuum_cost_delay = 0, + autovacuum_vacuum_cost_limit = 1000 +); + +ALTER TABLE topic_partitions SET ( + fillfactor = 70, + autovacuum_vacuum_scale_factor = 0.0, + autovacuum_vacuum_threshold = 5, + autovacuum_analyze_scale_factor = 0.0, + autovacuum_analyze_threshold = 5, + autovacuum_vacuum_cost_delay = 0, + autovacuum_vacuum_cost_limit = 1000 +); + +DO $$ +DECLARE + rec RECORD; + slug TEXT; + list_rel TEXT; + seq_name TEXT; + max_id BIGINT; + z BIGINT := 100000; + n INT := 2; + part_idx INT; + part_from BIGINT; + part_to BIGINT; + range_rel TEXT; + max_part_idx INT; +BEGIN + FOR rec IN + SELECT + topic, + COALESCE(MAX(id), 0) AS max_id + FROM messages_legacy + GROUP BY topic + LOOP + slug := substr(md5(rec.topic), 1, 16); + list_rel := 'messages_t_' || slug; + seq_name := 'fill_seq_' || slug; + + EXECUTE format( + 'CREATE SEQUENCE %I CACHE 100', + seq_name + ); + + IF rec.max_id > 0 THEN + EXECUTE format( + 'SELECT setval(%L, %s)', + seq_name, + rec.max_id + ); + END IF; + + INSERT INTO topic_meta ( + topic, + next_id, + acked_id, + partition_size, + partition_count, + fill_seq_name + ) VALUES ( + rec.topic, + rec.max_id + 1, + 0, + z, + n, + seq_name + ); + + EXECUTE format( + 'CREATE TABLE %I PARTITION OF messages FOR VALUES IN (%L) PARTITION BY RANGE (id)', + list_rel, + rec.topic + ); + + IF rec.max_id = 0 THEN + max_part_idx := n; + ELSE + max_part_idx := ((rec.max_id - 1) / z)::INT + n; + END IF; + + FOR part_idx IN 0..max_part_idx LOOP + part_from := part_idx::BIGINT * z + 1; + part_to := (part_idx::BIGINT + 1) * z + 1; + range_rel := list_rel || '_p' || part_idx::TEXT; + + EXECUTE format( + 'CREATE TABLE %I PARTITION OF %I FOR VALUES FROM (%s) TO (%s)', + range_rel, + list_rel, + part_from, + part_to + ); + + INSERT INTO topic_partitions ( + topic, + partition_index, + relname, + id_from, + id_to, + partition_size, + high_water_id, + status + ) VALUES ( + rec.topic, + part_idx, + range_rel, + part_from, + part_to, + z, + CASE + WHEN part_idx = ((rec.max_id - 1) / z)::INT AND rec.max_id > 0 THEN rec.max_id + ELSE 0 + END, + CASE + WHEN part_idx = ((rec.max_id - 1) / z)::INT AND rec.max_id > 0 THEN 'active' + WHEN part_idx > ((rec.max_id - 1) / z)::INT THEN 'future' + ELSE 'sealed' + END + ); + END LOOP; + END LOOP; +END $$; + +INSERT INTO messages ( + id, + inserted_at, + topic, + payload +) +SELECT + id, + inserted_at, + topic, + payload +FROM messages_legacy; + +DROP TABLE messages_legacy; +-- +goose StatementEnd + +-- +goose Down +-- +goose StatementBegin +CREATE TABLE messages_legacy ( + id BIGINT GENERATED ALWAYS AS IDENTITY, + inserted_at TIMESTAMPTZ NOT NULL DEFAULT CURRENT_TIMESTAMP, + topic TEXT NOT NULL, + payload JSONB NOT NULL, + CONSTRAINT v1_message_pkey PRIMARY KEY (topic, id) +); + +INSERT INTO messages_legacy ( + id, + inserted_at, + topic, + payload +) +OVERRIDING SYSTEM VALUE +SELECT + id, + inserted_at, + topic, + payload +FROM messages; + +DROP TABLE messages CASCADE; + +DO $$ +DECLARE + rec RECORD; +BEGIN + FOR rec IN SELECT fill_seq_name FROM topic_meta LOOP + EXECUTE format('DROP SEQUENCE IF EXISTS %I', rec.fill_seq_name); + END LOOP; +END $$; + +DROP TABLE topic_partitions; +DROP TABLE topic_meta; + +ALTER TABLE messages_legacy RENAME TO messages; +-- +goose StatementEnd diff --git a/sqlc/models.go b/sqlc/models.go index 2ba0d5e..6603ebc 100644 --- a/sqlc/models.go +++ b/sqlc/models.go @@ -14,3 +14,32 @@ type Message struct { Topic string `json:"topic"` Payload []byte `json:"payload"` } + +type TopicMetum struct { + Topic string `json:"topic"` + NextID int64 `json:"next_id"` + AckedID int64 `json:"acked_id"` + PartitionSize int64 `json:"partition_size"` + PartitionCount int32 `json:"partition_count"` + FillSeqName string `json:"fill_seq_name"` + LeaseHolder pgtype.Text `json:"lease_holder"` + LeaseExpiresAt pgtype.Timestamptz `json:"lease_expires_at"` + LeaseHighID pgtype.Int8 `json:"lease_high_id"` + WritesSinceResize int64 `json:"writes_since_resize"` + AcksSinceResize int64 `json:"acks_since_resize"` + LastWriteAt pgtype.Timestamptz `json:"last_write_at"` + LastProcessAt pgtype.Timestamptz `json:"last_process_at"` + ResizedAt pgtype.Timestamptz `json:"resized_at"` +} + +type TopicPartition struct { + Topic string `json:"topic"` + PartitionIndex int32 `json:"partition_index"` + Relname string `json:"relname"` + IDFrom int64 `json:"id_from"` + IDTo int64 `json:"id_to"` + PartitionSize int64 `json:"partition_size"` + HighWaterID int64 `json:"high_water_id"` + Status string `json:"status"` + CreatedAt pgtype.Timestamptz `json:"created_at"` +} diff --git a/sqlc/queries.sql b/sqlc/queries.sql index ec6d423..8b7943b 100644 --- a/sqlc/queries.sql +++ b/sqlc/queries.sql @@ -1,21 +1,182 @@ -- name: InsertMessage :copyfrom INSERT INTO /*tmpl*/ messages /*tmpl*/ ( + id, topic, payload -) VALUES ($1, $2); +) VALUES ($1, $2, $3); --- name: AcquireMessagesByTopic :many +-- name: EnsureTopicMeta :exec +INSERT INTO /*tmpl*/ topic_meta /*tmpl*/ ( + topic, + partition_size, + partition_count, + fill_seq_name +) VALUES ($1, $2, $3, $4) +ON CONFLICT (topic) DO NOTHING; + +-- name: GetTopicMetaForUpdate :one +SELECT + * +FROM /*tmpl*/ topic_meta /*tmpl*/ +WHERE + topic = $1 +FOR UPDATE; + +-- name: AllocateTopicIds :one +UPDATE /*tmpl*/ topic_meta /*tmpl*/ +SET + next_id = next_id + sqlc.arg('count')::bigint, + writes_since_resize = writes_since_resize + sqlc.arg('count')::bigint, + last_write_at = CURRENT_TIMESTAMP +WHERE + topic = sqlc.arg('topic') +RETURNING + (next_id - sqlc.arg('count')::bigint)::bigint AS start_id, + (next_id - 1)::bigint AS end_id, + topic, + next_id, + acked_id, + partition_size, + partition_count, + fill_seq_name, + lease_holder, + lease_expires_at, + lease_high_id, + writes_since_resize, + acks_since_resize, + last_write_at, + last_process_at, + resized_at; + +-- name: UpdateTopicSizing :exec +UPDATE /*tmpl*/ topic_meta /*tmpl*/ +SET + partition_size = $2, + partition_count = $3, + writes_since_resize = 0, + acks_since_resize = 0, + resized_at = CURRENT_TIMESTAMP +WHERE + topic = $1; + +-- name: TryAcquireTopicLease :one +UPDATE /*tmpl*/ topic_meta /*tmpl*/ +SET + lease_holder = sqlc.arg('holder'), + lease_expires_at = CURRENT_TIMESTAMP + INTERVAL '5 minutes', + lease_high_id = NULL +WHERE + topic = sqlc.arg('topic') + AND ( + lease_holder IS NULL + OR lease_expires_at < CURRENT_TIMESTAMP + OR lease_holder = sqlc.arg('holder') + ) +RETURNING + *; + +-- name: SetTopicLeaseHighID :exec +UPDATE /*tmpl*/ topic_meta /*tmpl*/ +SET + lease_high_id = $2 +WHERE + topic = $1 + AND lease_holder = $3; + +-- name: ReleaseTopicLease :exec +UPDATE /*tmpl*/ topic_meta /*tmpl*/ +SET + lease_holder = NULL, + lease_expires_at = NULL, + lease_high_id = NULL +WHERE + topic = $1 + AND lease_holder = $2; + +-- name: AckTopicMessages :exec +UPDATE /*tmpl*/ topic_meta /*tmpl*/ +SET + acked_id = $2, + acks_since_resize = acks_since_resize + $3, + last_process_at = CURRENT_TIMESTAMP, + lease_holder = NULL, + lease_expires_at = NULL, + lease_high_id = NULL +WHERE + topic = $1 + AND lease_holder = $4; + +-- name: ListMessagesAfterAcked :many SELECT * FROM /*tmpl*/ messages /*tmpl*/ WHERE topic = $1 + AND id > $2 +ORDER BY + id ASC LIMIT - COALESCE(sqlc.narg('limit')::integer, 100) -FOR UPDATE SKIP LOCKED; + COALESCE(sqlc.narg('limit')::integer, 100); + +-- name: UpsertTopicPartition :exec +INSERT INTO /*tmpl*/ topic_partitions /*tmpl*/ ( + topic, + partition_index, + relname, + id_from, + id_to, + partition_size, + status +) VALUES ($1, $2, $3, $4, $5, $6, $7) +ON CONFLICT (topic, partition_index) DO NOTHING; + +-- name: UpdateTopicPartitionHighWater :exec +UPDATE /*tmpl*/ topic_partitions /*tmpl*/ +SET + high_water_id = GREATEST(high_water_id, $3), + status = CASE + WHEN status = 'future' THEN 'active' + ELSE status + END +WHERE + topic = $1 + AND partition_index = $2; + +-- name: SealTopicPartitionsUpTo :exec +UPDATE /*tmpl*/ topic_partitions /*tmpl*/ +SET + status = 'sealed' +WHERE + topic = $1 + AND partition_index < $2 + AND status IN ('active', 'future'); + +-- name: SealFullyAckedActivePartitions :exec +UPDATE /*tmpl*/ topic_partitions /*tmpl*/ +SET + status = 'sealed' +WHERE + topic = $1 + AND status = 'active' + AND high_water_id > 0 + AND high_water_id <= $2; + +-- name: ListDroppablePartitions :many +SELECT + * +FROM /*tmpl*/ topic_partitions /*tmpl*/ +WHERE + topic = $1 + AND status = 'sealed' + AND high_water_id > 0 + AND high_water_id <= $2 +ORDER BY + partition_index ASC; --- name: DeleteMessagesByIds :exec -DELETE FROM /*tmpl*/ messages /*tmpl*/ +-- name: MarkTopicPartitionDropped :exec +UPDATE /*tmpl*/ topic_partitions /*tmpl*/ +SET + status = 'dropped' WHERE - topic = @topic - AND id = ANY(@ids::bigint[]); \ No newline at end of file + topic = $1 + AND partition_index = $2; diff --git a/sqlc/queries.sql.go b/sqlc/queries.sql.go index 952d4b2..6e96ee0 100644 --- a/sqlc/queries.sql.go +++ b/sqlc/queries.sql.go @@ -11,24 +11,245 @@ import ( "github.com/jackc/pgx/v5/pgtype" ) -const acquireMessagesByTopic = `-- name: AcquireMessagesByTopic :many +const ackTopicMessages = `-- name: AckTopicMessages :exec +UPDATE /*tmpl*/ topic_meta /*tmpl*/ +SET + acked_id = $2, + acks_since_resize = acks_since_resize + $3, + last_process_at = CURRENT_TIMESTAMP, + lease_holder = NULL, + lease_expires_at = NULL, + lease_high_id = NULL +WHERE + topic = $1 + AND lease_holder = $4 +` + +type AckTopicMessagesParams struct { + Topic string `json:"topic"` + AckedID int64 `json:"acked_id"` + AcksSinceResize int64 `json:"acks_since_resize"` + LeaseHolder pgtype.Text `json:"lease_holder"` +} + +func (q *Queries) AckTopicMessages(ctx context.Context, db DBTX, arg AckTopicMessagesParams) error { + _, err := db.Exec(ctx, ackTopicMessages, + arg.Topic, + arg.AckedID, + arg.AcksSinceResize, + arg.LeaseHolder, + ) + return err +} + +const allocateTopicIds = `-- name: AllocateTopicIds :one +UPDATE /*tmpl*/ topic_meta /*tmpl*/ +SET + next_id = next_id + $1::bigint, + writes_since_resize = writes_since_resize + $1::bigint, + last_write_at = CURRENT_TIMESTAMP +WHERE + topic = $2 +RETURNING + (next_id - $1::bigint)::bigint AS start_id, + (next_id - 1)::bigint AS end_id, + topic, + next_id, + acked_id, + partition_size, + partition_count, + fill_seq_name, + lease_holder, + lease_expires_at, + lease_high_id, + writes_since_resize, + acks_since_resize, + last_write_at, + last_process_at, + resized_at +` + +type AllocateTopicIdsParams struct { + Count int64 `json:"count"` + Topic string `json:"topic"` +} + +type AllocateTopicIdsRow struct { + StartID int64 `json:"start_id"` + EndID int64 `json:"end_id"` + Topic string `json:"topic"` + NextID int64 `json:"next_id"` + AckedID int64 `json:"acked_id"` + PartitionSize int64 `json:"partition_size"` + PartitionCount int32 `json:"partition_count"` + FillSeqName string `json:"fill_seq_name"` + LeaseHolder pgtype.Text `json:"lease_holder"` + LeaseExpiresAt pgtype.Timestamptz `json:"lease_expires_at"` + LeaseHighID pgtype.Int8 `json:"lease_high_id"` + WritesSinceResize int64 `json:"writes_since_resize"` + AcksSinceResize int64 `json:"acks_since_resize"` + LastWriteAt pgtype.Timestamptz `json:"last_write_at"` + LastProcessAt pgtype.Timestamptz `json:"last_process_at"` + ResizedAt pgtype.Timestamptz `json:"resized_at"` +} + +func (q *Queries) AllocateTopicIds(ctx context.Context, db DBTX, arg AllocateTopicIdsParams) (*AllocateTopicIdsRow, error) { + row := db.QueryRow(ctx, allocateTopicIds, arg.Count, arg.Topic) + var i AllocateTopicIdsRow + err := row.Scan( + &i.StartID, + &i.EndID, + &i.Topic, + &i.NextID, + &i.AckedID, + &i.PartitionSize, + &i.PartitionCount, + &i.FillSeqName, + &i.LeaseHolder, + &i.LeaseExpiresAt, + &i.LeaseHighID, + &i.WritesSinceResize, + &i.AcksSinceResize, + &i.LastWriteAt, + &i.LastProcessAt, + &i.ResizedAt, + ) + return &i, err +} + +const ensureTopicMeta = `-- name: EnsureTopicMeta :exec +INSERT INTO /*tmpl*/ topic_meta /*tmpl*/ ( + topic, + partition_size, + partition_count, + fill_seq_name +) VALUES ($1, $2, $3, $4) +ON CONFLICT (topic) DO NOTHING +` + +type EnsureTopicMetaParams struct { + Topic string `json:"topic"` + PartitionSize int64 `json:"partition_size"` + PartitionCount int32 `json:"partition_count"` + FillSeqName string `json:"fill_seq_name"` +} + +func (q *Queries) EnsureTopicMeta(ctx context.Context, db DBTX, arg EnsureTopicMetaParams) error { + _, err := db.Exec(ctx, ensureTopicMeta, + arg.Topic, + arg.PartitionSize, + arg.PartitionCount, + arg.FillSeqName, + ) + return err +} + +const getTopicMetaForUpdate = `-- name: GetTopicMetaForUpdate :one +SELECT + topic, next_id, acked_id, partition_size, partition_count, fill_seq_name, lease_holder, lease_expires_at, lease_high_id, writes_since_resize, acks_since_resize, last_write_at, last_process_at, resized_at +FROM /*tmpl*/ topic_meta /*tmpl*/ +WHERE + topic = $1 +FOR UPDATE +` + +func (q *Queries) GetTopicMetaForUpdate(ctx context.Context, db DBTX, topic string) (*TopicMetum, error) { + row := db.QueryRow(ctx, getTopicMetaForUpdate, topic) + var i TopicMetum + err := row.Scan( + &i.Topic, + &i.NextID, + &i.AckedID, + &i.PartitionSize, + &i.PartitionCount, + &i.FillSeqName, + &i.LeaseHolder, + &i.LeaseExpiresAt, + &i.LeaseHighID, + &i.WritesSinceResize, + &i.AcksSinceResize, + &i.LastWriteAt, + &i.LastProcessAt, + &i.ResizedAt, + ) + return &i, err +} + +type InsertMessageParams struct { + ID int64 `json:"id"` + Topic string `json:"topic"` + Payload []byte `json:"payload"` +} + +const listDroppablePartitions = `-- name: ListDroppablePartitions :many +SELECT + topic, partition_index, relname, id_from, id_to, partition_size, high_water_id, status, created_at +FROM /*tmpl*/ topic_partitions /*tmpl*/ +WHERE + topic = $1 + AND status = 'sealed' + AND high_water_id > 0 + AND high_water_id <= $2 +ORDER BY + partition_index ASC +` + +type ListDroppablePartitionsParams struct { + Topic string `json:"topic"` + HighWaterID int64 `json:"high_water_id"` +} + +func (q *Queries) ListDroppablePartitions(ctx context.Context, db DBTX, arg ListDroppablePartitionsParams) ([]*TopicPartition, error) { + rows, err := db.Query(ctx, listDroppablePartitions, arg.Topic, arg.HighWaterID) + if err != nil { + return nil, err + } + defer rows.Close() + var items []*TopicPartition + for rows.Next() { + var i TopicPartition + if err := rows.Scan( + &i.Topic, + &i.PartitionIndex, + &i.Relname, + &i.IDFrom, + &i.IDTo, + &i.PartitionSize, + &i.HighWaterID, + &i.Status, + &i.CreatedAt, + ); err != nil { + return nil, err + } + items = append(items, &i) + } + if err := rows.Err(); err != nil { + return nil, err + } + return items, nil +} + +const listMessagesAfterAcked = `-- name: ListMessagesAfterAcked :many SELECT id, inserted_at, topic, payload FROM /*tmpl*/ messages /*tmpl*/ WHERE topic = $1 + AND id > $2 +ORDER BY + id ASC LIMIT - COALESCE($2::integer, 100) -FOR UPDATE SKIP LOCKED + COALESCE($3::integer, 100) ` -type AcquireMessagesByTopicParams struct { +type ListMessagesAfterAckedParams struct { Topic string `json:"topic"` + ID int64 `json:"id"` Limit pgtype.Int4 `json:"limit"` } -func (q *Queries) AcquireMessagesByTopic(ctx context.Context, db DBTX, arg AcquireMessagesByTopicParams) ([]*Message, error) { - rows, err := db.Query(ctx, acquireMessagesByTopic, arg.Topic, arg.Limit) +func (q *Queries) ListMessagesAfterAcked(ctx context.Context, db DBTX, arg ListMessagesAfterAckedParams) ([]*Message, error) { + rows, err := db.Query(ctx, listMessagesAfterAcked, arg.Topic, arg.ID, arg.Limit) if err != nil { return nil, err } @@ -52,24 +273,230 @@ func (q *Queries) AcquireMessagesByTopic(ctx context.Context, db DBTX, arg Acqui return items, nil } -const deleteMessagesByIds = `-- name: DeleteMessagesByIds :exec -DELETE FROM /*tmpl*/ messages /*tmpl*/ +const markTopicPartitionDropped = `-- name: MarkTopicPartitionDropped :exec +UPDATE /*tmpl*/ topic_partitions /*tmpl*/ +SET + status = 'dropped' WHERE topic = $1 - AND id = ANY($2::bigint[]) + AND partition_index = $2 ` -type DeleteMessagesByIdsParams struct { - Topic string `json:"topic"` - Ids []int64 `json:"ids"` +type MarkTopicPartitionDroppedParams struct { + Topic string `json:"topic"` + PartitionIndex int32 `json:"partition_index"` } -func (q *Queries) DeleteMessagesByIds(ctx context.Context, db DBTX, arg DeleteMessagesByIdsParams) error { - _, err := db.Exec(ctx, deleteMessagesByIds, arg.Topic, arg.Ids) +func (q *Queries) MarkTopicPartitionDropped(ctx context.Context, db DBTX, arg MarkTopicPartitionDroppedParams) error { + _, err := db.Exec(ctx, markTopicPartitionDropped, arg.Topic, arg.PartitionIndex) return err } -type InsertMessageParams struct { - Topic string `json:"topic"` - Payload []byte `json:"payload"` +const releaseTopicLease = `-- name: ReleaseTopicLease :exec +UPDATE /*tmpl*/ topic_meta /*tmpl*/ +SET + lease_holder = NULL, + lease_expires_at = NULL, + lease_high_id = NULL +WHERE + topic = $1 + AND lease_holder = $2 +` + +type ReleaseTopicLeaseParams struct { + Topic string `json:"topic"` + LeaseHolder pgtype.Text `json:"lease_holder"` +} + +func (q *Queries) ReleaseTopicLease(ctx context.Context, db DBTX, arg ReleaseTopicLeaseParams) error { + _, err := db.Exec(ctx, releaseTopicLease, arg.Topic, arg.LeaseHolder) + return err +} + +const sealFullyAckedActivePartitions = `-- name: SealFullyAckedActivePartitions :exec +UPDATE /*tmpl*/ topic_partitions /*tmpl*/ +SET + status = 'sealed' +WHERE + topic = $1 + AND status = 'active' + AND high_water_id > 0 + AND high_water_id <= $2 +` + +type SealFullyAckedActivePartitionsParams struct { + Topic string `json:"topic"` + HighWaterID int64 `json:"high_water_id"` +} + +func (q *Queries) SealFullyAckedActivePartitions(ctx context.Context, db DBTX, arg SealFullyAckedActivePartitionsParams) error { + _, err := db.Exec(ctx, sealFullyAckedActivePartitions, arg.Topic, arg.HighWaterID) + return err +} + +const sealTopicPartitionsUpTo = `-- name: SealTopicPartitionsUpTo :exec +UPDATE /*tmpl*/ topic_partitions /*tmpl*/ +SET + status = 'sealed' +WHERE + topic = $1 + AND partition_index < $2 + AND status IN ('active', 'future') +` + +type SealTopicPartitionsUpToParams struct { + Topic string `json:"topic"` + PartitionIndex int32 `json:"partition_index"` +} + +func (q *Queries) SealTopicPartitionsUpTo(ctx context.Context, db DBTX, arg SealTopicPartitionsUpToParams) error { + _, err := db.Exec(ctx, sealTopicPartitionsUpTo, arg.Topic, arg.PartitionIndex) + return err +} + +const setTopicLeaseHighID = `-- name: SetTopicLeaseHighID :exec +UPDATE /*tmpl*/ topic_meta /*tmpl*/ +SET + lease_high_id = $2 +WHERE + topic = $1 + AND lease_holder = $3 +` + +type SetTopicLeaseHighIDParams struct { + Topic string `json:"topic"` + LeaseHighID pgtype.Int8 `json:"lease_high_id"` + LeaseHolder pgtype.Text `json:"lease_holder"` +} + +func (q *Queries) SetTopicLeaseHighID(ctx context.Context, db DBTX, arg SetTopicLeaseHighIDParams) error { + _, err := db.Exec(ctx, setTopicLeaseHighID, arg.Topic, arg.LeaseHighID, arg.LeaseHolder) + return err +} + +const tryAcquireTopicLease = `-- name: TryAcquireTopicLease :one +UPDATE /*tmpl*/ topic_meta /*tmpl*/ +SET + lease_holder = $1, + lease_expires_at = CURRENT_TIMESTAMP + INTERVAL '5 minutes', + lease_high_id = NULL +WHERE + topic = $2 + AND ( + lease_holder IS NULL + OR lease_expires_at < CURRENT_TIMESTAMP + OR lease_holder = $1 + ) +RETURNING + topic, next_id, acked_id, partition_size, partition_count, fill_seq_name, lease_holder, lease_expires_at, lease_high_id, writes_since_resize, acks_since_resize, last_write_at, last_process_at, resized_at +` + +type TryAcquireTopicLeaseParams struct { + Holder pgtype.Text `json:"holder"` + Topic string `json:"topic"` +} + +func (q *Queries) TryAcquireTopicLease(ctx context.Context, db DBTX, arg TryAcquireTopicLeaseParams) (*TopicMetum, error) { + row := db.QueryRow(ctx, tryAcquireTopicLease, arg.Holder, arg.Topic) + var i TopicMetum + err := row.Scan( + &i.Topic, + &i.NextID, + &i.AckedID, + &i.PartitionSize, + &i.PartitionCount, + &i.FillSeqName, + &i.LeaseHolder, + &i.LeaseExpiresAt, + &i.LeaseHighID, + &i.WritesSinceResize, + &i.AcksSinceResize, + &i.LastWriteAt, + &i.LastProcessAt, + &i.ResizedAt, + ) + return &i, err +} + +const updateTopicPartitionHighWater = `-- name: UpdateTopicPartitionHighWater :exec +UPDATE /*tmpl*/ topic_partitions /*tmpl*/ +SET + high_water_id = GREATEST(high_water_id, $3), + status = CASE + WHEN status = 'future' THEN 'active' + ELSE status + END +WHERE + topic = $1 + AND partition_index = $2 +` + +type UpdateTopicPartitionHighWaterParams struct { + Topic string `json:"topic"` + PartitionIndex int32 `json:"partition_index"` + HighWaterID int64 `json:"high_water_id"` +} + +func (q *Queries) UpdateTopicPartitionHighWater(ctx context.Context, db DBTX, arg UpdateTopicPartitionHighWaterParams) error { + _, err := db.Exec(ctx, updateTopicPartitionHighWater, arg.Topic, arg.PartitionIndex, arg.HighWaterID) + return err +} + +const updateTopicSizing = `-- name: UpdateTopicSizing :exec +UPDATE /*tmpl*/ topic_meta /*tmpl*/ +SET + partition_size = $2, + partition_count = $3, + writes_since_resize = 0, + acks_since_resize = 0, + resized_at = CURRENT_TIMESTAMP +WHERE + topic = $1 +` + +type UpdateTopicSizingParams struct { + Topic string `json:"topic"` + PartitionSize int64 `json:"partition_size"` + PartitionCount int32 `json:"partition_count"` +} + +func (q *Queries) UpdateTopicSizing(ctx context.Context, db DBTX, arg UpdateTopicSizingParams) error { + _, err := db.Exec(ctx, updateTopicSizing, arg.Topic, arg.PartitionSize, arg.PartitionCount) + return err +} + +const upsertTopicPartition = `-- name: UpsertTopicPartition :exec +INSERT INTO /*tmpl*/ topic_partitions /*tmpl*/ ( + topic, + partition_index, + relname, + id_from, + id_to, + partition_size, + status +) VALUES ($1, $2, $3, $4, $5, $6, $7) +ON CONFLICT (topic, partition_index) DO NOTHING +` + +type UpsertTopicPartitionParams struct { + Topic string `json:"topic"` + PartitionIndex int32 `json:"partition_index"` + Relname string `json:"relname"` + IDFrom int64 `json:"id_from"` + IDTo int64 `json:"id_to"` + PartitionSize int64 `json:"partition_size"` + Status string `json:"status"` +} + +func (q *Queries) UpsertTopicPartition(ctx context.Context, db DBTX, arg UpsertTopicPartitionParams) error { + _, err := db.Exec(ctx, upsertTopicPartition, + arg.Topic, + arg.PartitionIndex, + arg.Relname, + arg.IDFrom, + arg.IDTo, + arg.PartitionSize, + arg.Status, + ) + return err } diff --git a/sqlc/schema.sql b/sqlc/schema.sql index 8e4fa84..9e8db18 100644 --- a/sqlc/schema.sql +++ b/sqlc/schema.sql @@ -1,7 +1,62 @@ CREATE TABLE messages ( - id BIGINT GENERATED ALWAYS AS IDENTITY, + id BIGINT NOT NULL, inserted_at TIMESTAMPTZ NOT NULL DEFAULT CURRENT_TIMESTAMP, topic TEXT NOT NULL, payload JSONB NOT NULL, - CONSTRAINT v1_message_pkey PRIMARY KEY (topic, id) -); \ No newline at end of file + CONSTRAINT messages_pkey PRIMARY KEY (topic, id) +) PARTITION BY LIST (topic); + +CREATE TABLE topic_meta ( + topic TEXT PRIMARY KEY, + next_id BIGINT NOT NULL DEFAULT 1, + acked_id BIGINT NOT NULL DEFAULT 0, + partition_size BIGINT NOT NULL, + partition_count INT NOT NULL, + fill_seq_name TEXT NOT NULL, + lease_holder TEXT, + lease_expires_at TIMESTAMPTZ, + lease_high_id BIGINT, + writes_since_resize BIGINT NOT NULL DEFAULT 0, + acks_since_resize BIGINT NOT NULL DEFAULT 0, + last_write_at TIMESTAMPTZ, + last_process_at TIMESTAMPTZ, + resized_at TIMESTAMPTZ NOT NULL DEFAULT CURRENT_TIMESTAMP +); + +CREATE TABLE topic_partitions ( + topic TEXT NOT NULL REFERENCES topic_meta (topic) ON DELETE CASCADE, + partition_index INT NOT NULL, + relname TEXT NOT NULL, + id_from BIGINT NOT NULL, + id_to BIGINT NOT NULL, + partition_size BIGINT NOT NULL, + high_water_id BIGINT NOT NULL DEFAULT 0, + status TEXT NOT NULL CHECK ( + status IN ('future', 'active', 'sealed', 'dropped') + ), + created_at TIMESTAMPTZ NOT NULL DEFAULT CURRENT_TIMESTAMP, + PRIMARY KEY (topic, partition_index), + CONSTRAINT topic_partitions_relname_key UNIQUE (relname) +); + +CREATE INDEX topic_partitions_topic_status_idx ON topic_partitions (topic, status); + +ALTER TABLE topic_meta SET ( + fillfactor = 70, + autovacuum_vacuum_scale_factor = 0.0, + autovacuum_vacuum_threshold = 5, + autovacuum_analyze_scale_factor = 0.0, + autovacuum_analyze_threshold = 5, + autovacuum_vacuum_cost_delay = 0, + autovacuum_vacuum_cost_limit = 1000 +); + +ALTER TABLE topic_partitions SET ( + fillfactor = 70, + autovacuum_vacuum_scale_factor = 0.0, + autovacuum_vacuum_threshold = 5, + autovacuum_analyze_scale_factor = 0.0, + autovacuum_analyze_threshold = 5, + autovacuum_vacuum_cost_delay = 0, + autovacuum_vacuum_cost_limit = 1000 +);