Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -39,3 +39,4 @@ Thumbs.db
# Environment files
.env
.env.local
.cache
24 changes: 18 additions & 6 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ if err != nil {

## Schema

By default, `NewOutbox` runs migrations and creates an outbox table in the schema `outbox.messages`. This can be overwritten via:
By default, `NewOutbox` runs migrations and creates partitioned outbox tables in the `outbox` schema. `messages` is partitioned by topic and then by per-topic ID ranges; flushed rows are append-only until a sealed, fully-acked partition can be dropped. The schema can be overwritten via:

```go
outbox, err := pgoutbox.NewOutbox(pool, pgoutbox.WithSchema("my_schema"))
Expand All @@ -62,6 +62,17 @@ if err := pgoutbox.Migrate(ctx, pool, pgoutbox.WithSchema("my_schema")); err !=
}
```

Partition sizing defaults to 100,000 rows per range partition and two future partitions per topic. Both values are persisted in `topic_meta` per topic and can be configured for newly-created topics:

```go
outbox, err := pgoutbox.NewOutbox(pool,
pgoutbox.WithDefaultPartitionSize(500_000),
pgoutbox.WithDefaultPartitionCount(4),
)
```

The outbox also keeps an approximate sequence-backed fill counter for each topic. It is used only to create future partitions early and to guide sizing; delivery safety is governed by the transactional per-topic ID cursor and `acked_id`.

## Multiple topics and flushers

It's easy to configure multiple destinations using topics registered for each flusher:
Expand Down Expand Up @@ -94,19 +105,20 @@ outbox.ProcessMessages(ctx, "shipments")

## Benchmarks

You can run benchmarks locally; for example, to write and flush 100k messages, you can run:
You can run benchmarks locally; for example, to write and flush 100k messages without also running the test suite, you can run:

```
go test -bench=. -benchtime=100000x
go test -run '^$' -bench=. -benchtime=100000x
```

On a local Macbook with an M3 Max core, this results in `8492 msgs/sec`:
On a local Macbook with an M3 Max core, older non-partitioned versions measured around `8492 msgs/sec`; current benchmark output includes both default partitioning and frequent-rollover partitioning cases:

```
$ go test -bench=. -benchtime=100000x
$ go test -run '^$' -bench=. -benchtime=100000x
goos: darwin
goarch: arm64
pkg: github.com/hatchet-dev/pgoutbox
cpu: Apple M3 Max
BenchmarkOutbox_WriteAndPublishThroughput-14 100000 117757 ns/op 8492 msgs/sec
BenchmarkOutbox_WriteAndPublishThroughput/DefaultPartitions-14
BenchmarkOutbox_WriteAndPublishThroughput/FrequentPartitionRollover-14
```
263 changes: 263 additions & 0 deletions internal/partitions/manager.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,263 @@
package partitions

import (
"context"
"errors"
"fmt"
"strings"

"github.com/hatchet-dev/pgoutbox/sqlc"
"github.com/jackc/pgx/v5"
"github.com/jackc/pgx/v5/pgconn"
)

type Manager struct {
schema string
}

type topicPartition struct {
index int
from int64
to int64
}

func NewManager(schema string) *Manager {
return &Manager{schema: schema}
}

func (m *Manager) schemaIdent() string {
return pgx.Identifier{m.schema}.Sanitize()
}

func (m *Manager) qualified(rel string) string {
return m.schemaIdent() + "." + pgx.Identifier{rel}.Sanitize()
}

func (m *Manager) LockTopic(ctx context.Context, db sqlc.DBTX, topic string) error {
_, err := db.Exec(ctx, "SELECT pg_advisory_xact_lock(hashtextextended($1, 0))", "pgoutbox:"+m.schema+":"+topic)
return err
}

func (m *Manager) EnsureHorizon(
ctx context.Context,
db sqlc.DBTX,
q *sqlc.Queries,
topic string,
slug string,
partitionSize int64,
partitionCount int,
startID int64,
endID int64,
fillHigh int64,
) error {
highID := endID
if fillHigh > highID {
highID = fillHigh
}

if err := m.ensureListPartition(ctx, db, topic, slug); err != nil {
return err
}

parts, err := m.loadTopicPartitions(ctx, db, topic)
if err != nil {
return err
}

if len(parts) == 0 {
parts = append(parts, topicPartition{index: 0, from: 1, to: 1 + partitionSize})
if err := m.createRangePartition(ctx, db, q, topic, slug, parts[0], partitionSize, "active"); err != nil {
return err
}
}

highIdx := partitionIndexContaining(parts, highID)
for highIdx == -1 || parts[len(parts)-1].index < highIdx+partitionCount {
last := parts[len(parts)-1]
next := topicPartition{
index: last.index + 1,
from: last.to,
to: last.to + partitionSize,
}
if err := m.createRangePartition(ctx, db, q, topic, slug, next, partitionSize, "future"); err != nil {
return err
}
parts = append(parts, next)
highIdx = partitionIndexContaining(parts, highID)
}

endIdx := partitionIndexContaining(parts, endID)

for _, part := range parts {
if part.to <= startID || part.from > endID {
continue
}
highWater := endID
if highWater >= part.to {
highWater = part.to - 1
}
if err := q.UpdateTopicPartitionHighWater(ctx, db, sqlc.UpdateTopicPartitionHighWaterParams{
Topic: topic,
PartitionIndex: int32(part.index),
HighWaterID: highWater,
}); err != nil {
return fmt.Errorf("update high water for partition %d: %w", part.index, err)
}
}

if endIdx > 0 {
if err := q.SealTopicPartitionsUpTo(ctx, db, sqlc.SealTopicPartitionsUpToParams{
Topic: topic,
PartitionIndex: int32(endIdx),
}); err != nil {
return fmt.Errorf("seal prior partitions: %w", err)
}
}

return nil
}

func (m *Manager) createRangePartition(
ctx context.Context,
db sqlc.DBTX,
q *sqlc.Queries,
topic string,
slug string,
part topicPartition,
partitionSize int64,
status string,
) error {
listRel := ListPartitionRelname(slug)
rangeRel := RangePartitionRelname(slug, part.index)

if err := m.ensureRangePartition(ctx, db, listRel, rangeRel, part.from, part.to); err != nil {
return err
}

if err := q.UpsertTopicPartition(ctx, db, sqlc.UpsertTopicPartitionParams{
Topic: topic,
PartitionIndex: int32(part.index),
Relname: rangeRel,
IDFrom: part.from,
IDTo: part.to,
PartitionSize: partitionSize,
Status: status,
}); err != nil {
return fmt.Errorf("record topic partition %d: %w", part.index, err)
}

return nil
}

func (m *Manager) loadTopicPartitions(ctx context.Context, db sqlc.DBTX, topic string) ([]topicPartition, error) {
sql := fmt.Sprintf(
"SELECT partition_index, id_from, id_to FROM %s.topic_partitions WHERE topic = $1 AND status <> 'dropped' ORDER BY partition_index ASC",
m.schemaIdent(),
)
rows, err := db.Query(ctx, sql, topic)
if err != nil {
return nil, err
}
defer rows.Close()

var parts []topicPartition
for rows.Next() {
var part topicPartition
if err := rows.Scan(&part.index, &part.from, &part.to); err != nil {
return nil, err
}
parts = append(parts, part)
}
if err := rows.Err(); err != nil {
return nil, err
}
return parts, nil
}

func partitionIndexContaining(parts []topicPartition, id int64) int {
if id <= 0 && len(parts) > 0 {
return parts[0].index
}
for _, part := range parts {
if part.from <= id && id < part.to {
return part.index
}
}
return -1
}

func (m *Manager) ensureListPartition(ctx context.Context, db sqlc.DBTX, topic, slug string) error {
listRel := ListPartitionRelname(slug)
sql := fmt.Sprintf(
"CREATE TABLE IF NOT EXISTS %s PARTITION OF %s.messages FOR VALUES IN (%s) PARTITION BY RANGE (id)",
m.qualified(listRel),
m.schemaIdent(),
quoteLiteral(topic),
)

_, err := db.Exec(ctx, sql)
return ignoreDuplicateTable(err)
}

func (m *Manager) ensureRangePartition(
ctx context.Context,
db sqlc.DBTX,
listRel, rangeRel string,
from, to int64,
) error {
sql := fmt.Sprintf(
"CREATE TABLE IF NOT EXISTS %s PARTITION OF %s FOR VALUES FROM (%d) TO (%d)",
m.qualified(rangeRel),
m.qualified(listRel),
from,
to,
)
_, err := db.Exec(ctx, sql)
return ignoreDuplicateTable(err)
}

func (m *Manager) EnsureFillSequence(ctx context.Context, db sqlc.DBTX, seqName string) error {
sql := fmt.Sprintf("CREATE SEQUENCE IF NOT EXISTS %s CACHE 100", m.qualified(seqName))
_, err := db.Exec(ctx, sql)
return ignoreDuplicateTable(err)
}

func (m *Manager) AdvanceFillSequence(ctx context.Context, db sqlc.DBTX, seqName string, count int) (int64, error) {
if count <= 0 {
return 0, nil
}

var value int64
seq := quoteLiteral(m.qualified(seqName))
sql := fmt.Sprintf("SELECT nextval(%s::regclass) FROM generate_series(1, %d) ORDER BY 1 DESC LIMIT 1", seq, count)
if err := db.QueryRow(ctx, sql).Scan(&value); err != nil {
return 0, err
}
return value, nil
}

func (m *Manager) DropPartition(ctx context.Context, db sqlc.DBTX, relname string) error {
_, err := db.Exec(ctx, "DROP TABLE IF EXISTS "+m.qualified(relname))
return err
}

func ignoreDuplicateTable(err error) error {
if err == nil {
return nil
}
var pgErr *pgconn.PgError
if errors.As(err, &pgErr) {
switch pgErr.Code {
case "42P07", "23505":
return nil
}
}
if strings.Contains(err.Error(), "already exists") {
return nil
}
return err
}

func quoteLiteral(s string) string {
return "'" + strings.ReplaceAll(s, "'", "''") + "'"
}
44 changes: 44 additions & 0 deletions internal/partitions/names.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
package partitions

import (
"crypto/md5"
"encoding/hex"
"fmt"
)

func TopicSlug(topic string) string {
sum := md5.Sum([]byte(topic))
return hex.EncodeToString(sum[:])[:16]
}

func FillSeqName(slug string) string {
return "fill_seq_" + slug
}

func ListPartitionRelname(slug string) string {
return "messages_t_" + slug
}

func RangePartitionRelname(slug string, index int) string {
return fmt.Sprintf("messages_t_%s_p%d", slug, index)
}

func PartitionBounds(index int, size int64) (from, to int64) {
from = int64(index)*size + 1
to = int64(index+1)*size + 1
return from, to
}

func PartitionIndexForID(id, size int64) int {
if id <= 0 {
return 0
}
return int((id - 1) / size)
}

func MaxPartitionIndex(id, size int64) int {
if id <= 0 {
return 0
}
return int((id - 1) / size)
}
Loading