Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
138 changes: 138 additions & 0 deletions go/sql/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ package sql

import (
"fmt"
"slices"
"strconv"
"strings"
)
Expand Down Expand Up @@ -329,6 +330,143 @@ func BuildRangeInsertPreparedQuery(databaseName, originalTableName, ghostTableNa
return BuildRangeInsertQuery(databaseName, originalTableName, ghostTableName, sharedColumns, mappedSharedColumns, uniqueKey, uniqueKeyColumns, rangeStartValues, rangeEndValues, rangeStartArgs, rangeEndArgs, includeRangeStartValues, transactionalTable, noWait)
}

type MoveTableCopySelectQueryBuilder struct {
preparedStatement string
argsMapping []int
argsCount int
}

func NewMoveTableCopySelectQueryBuilder(sourceDatabaseName, sourceTableName string, sharedColumns *ColumnList, uniqueKey string, uniqueKeyColumns *ColumnList, includeRangeStartValues bool) (*MoveTableCopySelectQueryBuilder, error) {
sourceDatabaseName = EscapeName(sourceDatabaseName)
sourceTableName = EscapeName(sourceTableName)
sharedColumnsNames := sharedColumns.Names()
for i := range sharedColumnsNames {
sharedColumnsNames[i] = EscapeName(sharedColumnsNames[i])
}
sharedColumnsListing := strings.Join(sharedColumnsNames, ", ")
uniqueKey = EscapeName(uniqueKey)
var minRangeComparisonSign = GreaterThanComparisonSign
if includeRangeStartValues {
minRangeComparisonSign = GreaterThanOrEqualsComparisonSign
}
rangeStartValues := buildColumnsPreparedValues(uniqueKeyColumns)
rangeEndValues := buildColumnsPreparedValues(uniqueKeyColumns)
dummyArgs := make([]any, len(uniqueKeyColumns.Columns()))
for i := range dummyArgs {
dummyArgs[i] = i
}
var argsMapping []int

rangeStartComparison, rangeExplodedArgs, err := BuildRangeComparison(uniqueKeyColumns.Names(), rangeStartValues, dummyArgs, minRangeComparisonSign)
if err != nil {
return nil, err
}
for _, a := range rangeExplodedArgs {
idx := slices.Index(dummyArgs, a)
if idx == -1 {
return nil, fmt.Errorf("failed to build args mapping, missing argument pointer %v", a)
}
argsMapping = append(argsMapping, idx)
}

rangeEndComparison, rangeExplodedArgs, err := BuildRangeComparison(uniqueKeyColumns.Names(), rangeEndValues, dummyArgs, LessThanOrEqualsComparisonSign)
if err != nil {
return nil, err
}
for _, a := range rangeExplodedArgs {
idx := slices.Index(dummyArgs, a)
if idx == -1 {
return nil, fmt.Errorf("failed to build args mapping, missing argument pointer %v", a)
}
argsMapping = append(argsMapping, idx+len(dummyArgs))
}

stmt := fmt.Sprintf(`
select /* gh-ost %s.%s */ %s
from
%s.%s
force index (%s)
where
(%s and %s)
`,
sourceDatabaseName, sourceTableName, sharedColumnsListing,
sourceDatabaseName, sourceTableName,
uniqueKey,
rangeStartComparison, rangeEndComparison,
)
return &MoveTableCopySelectQueryBuilder{
preparedStatement: stmt,
argsMapping: argsMapping,
argsCount: len(dummyArgs) * 2,
}, nil
}

func (b *MoveTableCopySelectQueryBuilder) BuildQuery(rangeStartArgs, rangeEndArgs []any) (string, []any, error) {
if len(rangeStartArgs)+len(rangeEndArgs) != b.argsCount {
return "", nil, fmt.Errorf("got %d args but expected %d", len(rangeStartArgs)+len(rangeEndArgs), b.argsCount)
}
explodedArgs := make([]any, 0, len(b.argsMapping))
for _, idx := range b.argsMapping {
if idx < len(rangeStartArgs) {
explodedArgs = append(explodedArgs, rangeStartArgs[idx])
} else {
explodedArgs = append(explodedArgs, rangeEndArgs[idx-len(rangeStartArgs)])
}
}
return b.preparedStatement, explodedArgs, nil
}

type MoveTableCopyInsertQueryBuilder struct {
preparedStatement string
valueListPlaceholder string
valueListSize int
}

func NewMoveTableCopyInsertQueryBuilder(targetDatabaseName, targetTableName string, sharedColumns *ColumnList) (*MoveTableCopyInsertQueryBuilder, error) {
targetDatabaseName = EscapeName(targetDatabaseName)
targetTableName = EscapeName(targetTableName)
sharedColumnsNames := sharedColumns.Names()
for i := range sharedColumnsNames {
sharedColumnsNames[i] = EscapeName(sharedColumnsNames[i])
}
sharedColumnsListing := strings.Join(sharedColumnsNames, ", ")
valueListPlaceholder := "(" + strings.Join(buildColumnsPreparedValues(sharedColumns), ", ") + ")"
valueListSize := len(sharedColumnsNames)
stmt := fmt.Sprintf(`
insert /* gh-ost %s.%s */ ignore
into
%s.%s
(%s)
values
`,
targetDatabaseName, targetTableName,
targetDatabaseName, targetTableName,
sharedColumnsListing,
)
return &MoveTableCopyInsertQueryBuilder{
preparedStatement: stmt,
valueListPlaceholder: valueListPlaceholder,
valueListSize: valueListSize,
}, nil
}

func (b *MoveTableCopyInsertQueryBuilder) BuildQuery(values []*ColumnValues) (string, []any, error) {
var explodedArgs []any
var builder strings.Builder
builder.WriteString(b.preparedStatement)
for i, value := range values {
if len(value.AbstractValues()) != b.valueListSize {
return "", nil, fmt.Errorf("got %d column values but expected %d", len(value.AbstractValues()), b.valueListSize)
}
if i > 0 {
builder.WriteString(",\n")
}
builder.WriteString(b.valueListPlaceholder)
explodedArgs = append(explodedArgs, value.AbstractValues()...)
}
return builder.String(), explodedArgs, nil
}

func BuildUniqueKeyRangeEndPreparedQueryViaOffset(databaseName, tableName string, uniqueKeyColumns *ColumnList, rangeStartArgs, rangeEndArgs []interface{}, chunkSize int64, includeRangeStartValues bool, hint string) (result string, explodedArgs []interface{}, err error) {
if uniqueKeyColumns.Len() == 0 {
return "", explodedArgs, fmt.Errorf("got 0 columns in BuildUniqueKeyRangeEndPreparedQuery")
Expand Down
227 changes: 227 additions & 0 deletions go/sql/builder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -784,6 +784,233 @@ func TestBuildDMLUpdateQuerySignedUnsigned(t *testing.T) {
}
}

func TestMoveTableCopySelectQueryBuilder(t *testing.T) {
t.Run("single column unique key", func(t *testing.T) {
sharedColumns := NewColumnList([]string{"id", "name", "position"})
uniqueKeyColumns := NewColumnList([]string{"id"})

builder, err := NewMoveTableCopySelectQueryBuilder("mydb", "tbl", sharedColumns, "PRIMARY", uniqueKeyColumns, true)
require.NoError(t, err)

query, args, err := builder.BuildQuery([]any{3}, []any{103})
require.NoError(t, err)

expected := `
select /* gh-ost mydb.tbl */ id, name, position
from
mydb.tbl
force index (PRIMARY)
where
(((id > ?) or ((id = ?))) and ((id < ?) or ((id = ?))))
`
require.Equal(t, normalizeQuery(expected), normalizeQuery(query))
require.Equal(t, []any{3, 3, 103, 103}, args)
})

t.Run("single column unique key without range start", func(t *testing.T) {
sharedColumns := NewColumnList([]string{"id", "name", "position"})
uniqueKeyColumns := NewColumnList([]string{"id"})

builder, err := NewMoveTableCopySelectQueryBuilder("mydb", "tbl", sharedColumns, "PRIMARY", uniqueKeyColumns, false)
require.NoError(t, err)

query, args, err := builder.BuildQuery([]any{3}, []any{103})
require.NoError(t, err)

expected := `
select /* gh-ost mydb.tbl */ id, name, position
from
mydb.tbl
force index (PRIMARY)
where
(((id > ?)) and ((id < ?) or ((id = ?))))
`
require.Equal(t, normalizeQuery(expected), normalizeQuery(query))
require.Equal(t, []any{3, 103, 103}, args)
})

t.Run("compound unique key", func(t *testing.T) {
sharedColumns := NewColumnList([]string{"id", "name", "position"})
uniqueKeyColumns := NewColumnList([]string{"name", "position"})

builder, err := NewMoveTableCopySelectQueryBuilder("mydb", "tbl", sharedColumns, "name_position_uidx", uniqueKeyColumns, true)
require.NoError(t, err)

query, args, err := builder.BuildQuery([]any{3, 17}, []any{103, 117})
require.NoError(t, err)

expected := `
select /* gh-ost mydb.tbl */ id, name, position
from
mydb.tbl
force index (name_position_uidx)
where
(((name > ?) or (((name = ?)) AND (position > ?)) or ((name = ?) and (position = ?)))
and ((name < ?) or (((name = ?)) AND (position < ?)) or ((name = ?) and (position = ?))))
`
require.Equal(t, normalizeQuery(expected), normalizeQuery(query))
require.Equal(t, []any{3, 3, 17, 3, 17, 103, 103, 117, 103, 117}, args)
})

t.Run("reuses prepared statement across calls", func(t *testing.T) {
sharedColumns := NewColumnList([]string{"id", "name"})
uniqueKeyColumns := NewColumnList([]string{"id"})

builder, err := NewMoveTableCopySelectQueryBuilder("mydb", "tbl", sharedColumns, "PRIMARY", uniqueKeyColumns, true)
require.NoError(t, err)

query1, args1, err := builder.BuildQuery([]any{1}, []any{10})
require.NoError(t, err)
query2, args2, err := builder.BuildQuery([]any{11}, []any{20})
require.NoError(t, err)

require.Equal(t, query1, query2)
require.Equal(t, []any{1, 1, 10, 10}, args1)
require.Equal(t, []any{11, 11, 20, 20}, args2)
})

t.Run("wrong args count", func(t *testing.T) {
sharedColumns := NewColumnList([]string{"id", "name"})
uniqueKeyColumns := NewColumnList([]string{"id"})

builder, err := NewMoveTableCopySelectQueryBuilder("mydb", "tbl", sharedColumns, "PRIMARY", uniqueKeyColumns, true)
require.NoError(t, err)

_, _, err = builder.BuildQuery([]any{1, 2}, []any{10})
require.Error(t, err)
})
}

func BenchmarkMoveTableCopySelectQueryBuilderBuildQuery(b *testing.B) {
sharedColumns := NewColumnList([]string{"id", "name", "position"})
uniqueKeyColumns := NewColumnList([]string{"name", "position"})

builder, err := NewMoveTableCopySelectQueryBuilder("mydb", "tbl", sharedColumns, "name_position_uidx", uniqueKeyColumns, true)
if err != nil {
b.Fatal(err)
}

rangeStartArgs := []any{3, 17}
rangeEndArgs := []any{103, 117}

b.ResetTimer()
for i := 0; i < b.N; i++ {
_, _, err := builder.BuildQuery(rangeStartArgs, rangeEndArgs)
if err != nil {
b.Fatal(err)
}
}
}

func TestMoveTableCopyInsertQueryBuilder(t *testing.T) {
t.Run("single row", func(t *testing.T) {
sharedColumns := NewColumnList([]string{"id", "name", "position"})

builder, err := NewMoveTableCopyInsertQueryBuilder("mydb", "ghost", sharedColumns)
require.NoError(t, err)

values := []*ColumnValues{
ToColumnValues([]interface{}{1, "alice", 10}),
}
query, args, err := builder.BuildQuery(values)
require.NoError(t, err)

expected := `
insert /* gh-ost mydb.ghost */ ignore
into
mydb.ghost
(id, name, position)
values
(?, ?, ?)
`
require.Equal(t, normalizeQuery(expected), normalizeQuery(query))
require.Equal(t, []any{1, "alice", 10}, args)
})

t.Run("multiple rows", func(t *testing.T) {
sharedColumns := NewColumnList([]string{"id", "name", "position"})

builder, err := NewMoveTableCopyInsertQueryBuilder("mydb", "ghost", sharedColumns)
require.NoError(t, err)

values := []*ColumnValues{
ToColumnValues([]interface{}{1, "alice", 10}),
ToColumnValues([]interface{}{2, "bob", 20}),
ToColumnValues([]interface{}{3, "carol", 30}),
}
query, args, err := builder.BuildQuery(values)
require.NoError(t, err)

expected := `
insert /* gh-ost mydb.ghost */ ignore
into
mydb.ghost
(id, name, position)
values
(?, ?, ?),
(?, ?, ?),
(?, ?, ?)
`
require.Equal(t, normalizeQuery(expected), normalizeQuery(query))
require.Equal(t, []any{1, "alice", 10, 2, "bob", 20, 3, "carol", 30}, args)
})

t.Run("wrong column count", func(t *testing.T) {
sharedColumns := NewColumnList([]string{"id", "name", "position"})

builder, err := NewMoveTableCopyInsertQueryBuilder("mydb", "ghost", sharedColumns)
require.NoError(t, err)

values := []*ColumnValues{
ToColumnValues([]interface{}{1, "alice"}),
}
_, _, err = builder.BuildQuery(values)
require.Error(t, err)
})

t.Run("reuses prepared statement", func(t *testing.T) {
sharedColumns := NewColumnList([]string{"id", "name"})

builder, err := NewMoveTableCopyInsertQueryBuilder("mydb", "ghost", sharedColumns)
require.NoError(t, err)

values1 := []*ColumnValues{ToColumnValues([]interface{}{1, "a"})}
values2 := []*ColumnValues{ToColumnValues([]interface{}{2, "b"})}

query1, args1, err := builder.BuildQuery(values1)
require.NoError(t, err)
query2, args2, err := builder.BuildQuery(values2)
require.NoError(t, err)

require.Equal(t, query1, query2)
require.Equal(t, []any{1, "a"}, args1)
require.Equal(t, []any{2, "b"}, args2)
})
}

func BenchmarkMoveTableCopyInsertQueryBuilderBuildQuery(b *testing.B) {
sharedColumns := NewColumnList([]string{"id", "name", "position"})

builder, err := NewMoveTableCopyInsertQueryBuilder("mydb", "ghost", sharedColumns)
if err != nil {
b.Fatal(err)
}

values := []*ColumnValues{
ToColumnValues([]interface{}{1, "alice", 10}),
ToColumnValues([]interface{}{2, "bob", 20}),
ToColumnValues([]interface{}{3, "carol", 30}),
}

b.ResetTimer()
for i := 0; i < b.N; i++ {
_, _, err := builder.BuildQuery(values)
if err != nil {
b.Fatal(err)
}
}
}

func TestCheckpointQueryBuilder(t *testing.T) {
databaseName := "mydb"
tableName := "_tbl_ghk"
Expand Down
Loading