diff --git a/go/sql/builder.go b/go/sql/builder.go index 6e41eb4e1..79c19de56 100644 --- a/go/sql/builder.go +++ b/go/sql/builder.go @@ -7,6 +7,7 @@ package sql import ( "fmt" + "slices" "strconv" "strings" ) @@ -329,6 +330,143 @@ func BuildRangeInsertPreparedQuery(databaseName, originalTableName, ghostTableNa return BuildRangeInsertQuery(databaseName, originalTableName, ghostTableName, sharedColumns, mappedSharedColumns, uniqueKey, uniqueKeyColumns, rangeStartValues, rangeEndValues, rangeStartArgs, rangeEndArgs, includeRangeStartValues, transactionalTable, noWait) } +type MoveTableCopySelectQueryBuilder struct { + preparedStatement string + argsMapping []int + argsCount int +} + +func NewMoveTableCopySelectQueryBuilder(sourceDatabaseName, sourceTableName string, sharedColumns *ColumnList, uniqueKey string, uniqueKeyColumns *ColumnList, includeRangeStartValues bool) (*MoveTableCopySelectQueryBuilder, error) { + sourceDatabaseName = EscapeName(sourceDatabaseName) + sourceTableName = EscapeName(sourceTableName) + sharedColumnsNames := sharedColumns.Names() + for i := range sharedColumnsNames { + sharedColumnsNames[i] = EscapeName(sharedColumnsNames[i]) + } + sharedColumnsListing := strings.Join(sharedColumnsNames, ", ") + uniqueKey = EscapeName(uniqueKey) + var minRangeComparisonSign = GreaterThanComparisonSign + if includeRangeStartValues { + minRangeComparisonSign = GreaterThanOrEqualsComparisonSign + } + rangeStartValues := buildColumnsPreparedValues(uniqueKeyColumns) + rangeEndValues := buildColumnsPreparedValues(uniqueKeyColumns) + dummyArgs := make([]any, len(uniqueKeyColumns.Columns())) + for i := range dummyArgs { + dummyArgs[i] = i + } + var argsMapping []int + + rangeStartComparison, rangeExplodedArgs, err := BuildRangeComparison(uniqueKeyColumns.Names(), rangeStartValues, dummyArgs, minRangeComparisonSign) + if err != nil { + return nil, err + } + for _, a := range rangeExplodedArgs { + idx := slices.Index(dummyArgs, a) + if idx == -1 { + return nil, fmt.Errorf("failed to build args mapping, missing argument pointer %v", a) + } + argsMapping = append(argsMapping, idx) + } + + rangeEndComparison, rangeExplodedArgs, err := BuildRangeComparison(uniqueKeyColumns.Names(), rangeEndValues, dummyArgs, LessThanOrEqualsComparisonSign) + if err != nil { + return nil, err + } + for _, a := range rangeExplodedArgs { + idx := slices.Index(dummyArgs, a) + if idx == -1 { + return nil, fmt.Errorf("failed to build args mapping, missing argument pointer %v", a) + } + argsMapping = append(argsMapping, idx+len(dummyArgs)) + } + + stmt := fmt.Sprintf(` + select /* gh-ost %s.%s */ %s + from + %s.%s + force index (%s) + where + (%s and %s) + `, + sourceDatabaseName, sourceTableName, sharedColumnsListing, + sourceDatabaseName, sourceTableName, + uniqueKey, + rangeStartComparison, rangeEndComparison, + ) + return &MoveTableCopySelectQueryBuilder{ + preparedStatement: stmt, + argsMapping: argsMapping, + argsCount: len(dummyArgs) * 2, + }, nil +} + +func (b *MoveTableCopySelectQueryBuilder) BuildQuery(rangeStartArgs, rangeEndArgs []any) (string, []any, error) { + if len(rangeStartArgs)+len(rangeEndArgs) != b.argsCount { + return "", nil, fmt.Errorf("got %d args but expected %d", len(rangeStartArgs)+len(rangeEndArgs), b.argsCount) + } + explodedArgs := make([]any, 0, len(b.argsMapping)) + for _, idx := range b.argsMapping { + if idx < len(rangeStartArgs) { + explodedArgs = append(explodedArgs, rangeStartArgs[idx]) + } else { + explodedArgs = append(explodedArgs, rangeEndArgs[idx-len(rangeStartArgs)]) + } + } + return b.preparedStatement, explodedArgs, nil +} + +type MoveTableCopyInsertQueryBuilder struct { + preparedStatement string + valueListPlaceholder string + valueListSize int +} + +func NewMoveTableCopyInsertQueryBuilder(targetDatabaseName, targetTableName string, sharedColumns *ColumnList) (*MoveTableCopyInsertQueryBuilder, error) { + targetDatabaseName = EscapeName(targetDatabaseName) + targetTableName = EscapeName(targetTableName) + sharedColumnsNames := sharedColumns.Names() + for i := range sharedColumnsNames { + sharedColumnsNames[i] = EscapeName(sharedColumnsNames[i]) + } + sharedColumnsListing := strings.Join(sharedColumnsNames, ", ") + valueListPlaceholder := "(" + strings.Join(buildColumnsPreparedValues(sharedColumns), ", ") + ")" + valueListSize := len(sharedColumnsNames) + stmt := fmt.Sprintf(` + insert /* gh-ost %s.%s */ ignore + into + %s.%s + (%s) + values + `, + targetDatabaseName, targetTableName, + targetDatabaseName, targetTableName, + sharedColumnsListing, + ) + return &MoveTableCopyInsertQueryBuilder{ + preparedStatement: stmt, + valueListPlaceholder: valueListPlaceholder, + valueListSize: valueListSize, + }, nil +} + +func (b *MoveTableCopyInsertQueryBuilder) BuildQuery(values []*ColumnValues) (string, []any, error) { + var explodedArgs []any + var builder strings.Builder + builder.WriteString(b.preparedStatement) + for i, value := range values { + if len(value.AbstractValues()) != b.valueListSize { + return "", nil, fmt.Errorf("got %d column values but expected %d", len(value.AbstractValues()), b.valueListSize) + } + if i > 0 { + builder.WriteString(",\n") + } + builder.WriteString(b.valueListPlaceholder) + explodedArgs = append(explodedArgs, value.AbstractValues()...) + } + return builder.String(), explodedArgs, nil +} + func BuildUniqueKeyRangeEndPreparedQueryViaOffset(databaseName, tableName string, uniqueKeyColumns *ColumnList, rangeStartArgs, rangeEndArgs []interface{}, chunkSize int64, includeRangeStartValues bool, hint string) (result string, explodedArgs []interface{}, err error) { if uniqueKeyColumns.Len() == 0 { return "", explodedArgs, fmt.Errorf("got 0 columns in BuildUniqueKeyRangeEndPreparedQuery") diff --git a/go/sql/builder_test.go b/go/sql/builder_test.go index 0d10b75e7..b0e756f13 100644 --- a/go/sql/builder_test.go +++ b/go/sql/builder_test.go @@ -784,6 +784,233 @@ func TestBuildDMLUpdateQuerySignedUnsigned(t *testing.T) { } } +func TestMoveTableCopySelectQueryBuilder(t *testing.T) { + t.Run("single column unique key", func(t *testing.T) { + sharedColumns := NewColumnList([]string{"id", "name", "position"}) + uniqueKeyColumns := NewColumnList([]string{"id"}) + + builder, err := NewMoveTableCopySelectQueryBuilder("mydb", "tbl", sharedColumns, "PRIMARY", uniqueKeyColumns, true) + require.NoError(t, err) + + query, args, err := builder.BuildQuery([]any{3}, []any{103}) + require.NoError(t, err) + + expected := ` + select /* gh-ost mydb.tbl */ id, name, position + from + mydb.tbl + force index (PRIMARY) + where + (((id > ?) or ((id = ?))) and ((id < ?) or ((id = ?)))) + ` + require.Equal(t, normalizeQuery(expected), normalizeQuery(query)) + require.Equal(t, []any{3, 3, 103, 103}, args) + }) + + t.Run("single column unique key without range start", func(t *testing.T) { + sharedColumns := NewColumnList([]string{"id", "name", "position"}) + uniqueKeyColumns := NewColumnList([]string{"id"}) + + builder, err := NewMoveTableCopySelectQueryBuilder("mydb", "tbl", sharedColumns, "PRIMARY", uniqueKeyColumns, false) + require.NoError(t, err) + + query, args, err := builder.BuildQuery([]any{3}, []any{103}) + require.NoError(t, err) + + expected := ` + select /* gh-ost mydb.tbl */ id, name, position + from + mydb.tbl + force index (PRIMARY) + where + (((id > ?)) and ((id < ?) or ((id = ?)))) + ` + require.Equal(t, normalizeQuery(expected), normalizeQuery(query)) + require.Equal(t, []any{3, 103, 103}, args) + }) + + t.Run("compound unique key", func(t *testing.T) { + sharedColumns := NewColumnList([]string{"id", "name", "position"}) + uniqueKeyColumns := NewColumnList([]string{"name", "position"}) + + builder, err := NewMoveTableCopySelectQueryBuilder("mydb", "tbl", sharedColumns, "name_position_uidx", uniqueKeyColumns, true) + require.NoError(t, err) + + query, args, err := builder.BuildQuery([]any{3, 17}, []any{103, 117}) + require.NoError(t, err) + + expected := ` + select /* gh-ost mydb.tbl */ id, name, position + from + mydb.tbl + force index (name_position_uidx) + where + (((name > ?) or (((name = ?)) AND (position > ?)) or ((name = ?) and (position = ?))) + and ((name < ?) or (((name = ?)) AND (position < ?)) or ((name = ?) and (position = ?)))) + ` + require.Equal(t, normalizeQuery(expected), normalizeQuery(query)) + require.Equal(t, []any{3, 3, 17, 3, 17, 103, 103, 117, 103, 117}, args) + }) + + t.Run("reuses prepared statement across calls", func(t *testing.T) { + sharedColumns := NewColumnList([]string{"id", "name"}) + uniqueKeyColumns := NewColumnList([]string{"id"}) + + builder, err := NewMoveTableCopySelectQueryBuilder("mydb", "tbl", sharedColumns, "PRIMARY", uniqueKeyColumns, true) + require.NoError(t, err) + + query1, args1, err := builder.BuildQuery([]any{1}, []any{10}) + require.NoError(t, err) + query2, args2, err := builder.BuildQuery([]any{11}, []any{20}) + require.NoError(t, err) + + require.Equal(t, query1, query2) + require.Equal(t, []any{1, 1, 10, 10}, args1) + require.Equal(t, []any{11, 11, 20, 20}, args2) + }) + + t.Run("wrong args count", func(t *testing.T) { + sharedColumns := NewColumnList([]string{"id", "name"}) + uniqueKeyColumns := NewColumnList([]string{"id"}) + + builder, err := NewMoveTableCopySelectQueryBuilder("mydb", "tbl", sharedColumns, "PRIMARY", uniqueKeyColumns, true) + require.NoError(t, err) + + _, _, err = builder.BuildQuery([]any{1, 2}, []any{10}) + require.Error(t, err) + }) +} + +func BenchmarkMoveTableCopySelectQueryBuilderBuildQuery(b *testing.B) { + sharedColumns := NewColumnList([]string{"id", "name", "position"}) + uniqueKeyColumns := NewColumnList([]string{"name", "position"}) + + builder, err := NewMoveTableCopySelectQueryBuilder("mydb", "tbl", sharedColumns, "name_position_uidx", uniqueKeyColumns, true) + if err != nil { + b.Fatal(err) + } + + rangeStartArgs := []any{3, 17} + rangeEndArgs := []any{103, 117} + + b.ResetTimer() + for i := 0; i < b.N; i++ { + _, _, err := builder.BuildQuery(rangeStartArgs, rangeEndArgs) + if err != nil { + b.Fatal(err) + } + } +} + +func TestMoveTableCopyInsertQueryBuilder(t *testing.T) { + t.Run("single row", func(t *testing.T) { + sharedColumns := NewColumnList([]string{"id", "name", "position"}) + + builder, err := NewMoveTableCopyInsertQueryBuilder("mydb", "ghost", sharedColumns) + require.NoError(t, err) + + values := []*ColumnValues{ + ToColumnValues([]interface{}{1, "alice", 10}), + } + query, args, err := builder.BuildQuery(values) + require.NoError(t, err) + + expected := ` + insert /* gh-ost mydb.ghost */ ignore + into + mydb.ghost + (id, name, position) + values + (?, ?, ?) + ` + require.Equal(t, normalizeQuery(expected), normalizeQuery(query)) + require.Equal(t, []any{1, "alice", 10}, args) + }) + + t.Run("multiple rows", func(t *testing.T) { + sharedColumns := NewColumnList([]string{"id", "name", "position"}) + + builder, err := NewMoveTableCopyInsertQueryBuilder("mydb", "ghost", sharedColumns) + require.NoError(t, err) + + values := []*ColumnValues{ + ToColumnValues([]interface{}{1, "alice", 10}), + ToColumnValues([]interface{}{2, "bob", 20}), + ToColumnValues([]interface{}{3, "carol", 30}), + } + query, args, err := builder.BuildQuery(values) + require.NoError(t, err) + + expected := ` + insert /* gh-ost mydb.ghost */ ignore + into + mydb.ghost + (id, name, position) + values + (?, ?, ?), + (?, ?, ?), + (?, ?, ?) + ` + require.Equal(t, normalizeQuery(expected), normalizeQuery(query)) + require.Equal(t, []any{1, "alice", 10, 2, "bob", 20, 3, "carol", 30}, args) + }) + + t.Run("wrong column count", func(t *testing.T) { + sharedColumns := NewColumnList([]string{"id", "name", "position"}) + + builder, err := NewMoveTableCopyInsertQueryBuilder("mydb", "ghost", sharedColumns) + require.NoError(t, err) + + values := []*ColumnValues{ + ToColumnValues([]interface{}{1, "alice"}), + } + _, _, err = builder.BuildQuery(values) + require.Error(t, err) + }) + + t.Run("reuses prepared statement", func(t *testing.T) { + sharedColumns := NewColumnList([]string{"id", "name"}) + + builder, err := NewMoveTableCopyInsertQueryBuilder("mydb", "ghost", sharedColumns) + require.NoError(t, err) + + values1 := []*ColumnValues{ToColumnValues([]interface{}{1, "a"})} + values2 := []*ColumnValues{ToColumnValues([]interface{}{2, "b"})} + + query1, args1, err := builder.BuildQuery(values1) + require.NoError(t, err) + query2, args2, err := builder.BuildQuery(values2) + require.NoError(t, err) + + require.Equal(t, query1, query2) + require.Equal(t, []any{1, "a"}, args1) + require.Equal(t, []any{2, "b"}, args2) + }) +} + +func BenchmarkMoveTableCopyInsertQueryBuilderBuildQuery(b *testing.B) { + sharedColumns := NewColumnList([]string{"id", "name", "position"}) + + builder, err := NewMoveTableCopyInsertQueryBuilder("mydb", "ghost", sharedColumns) + if err != nil { + b.Fatal(err) + } + + values := []*ColumnValues{ + ToColumnValues([]interface{}{1, "alice", 10}), + ToColumnValues([]interface{}{2, "bob", 20}), + ToColumnValues([]interface{}{3, "carol", 30}), + } + + b.ResetTimer() + for i := 0; i < b.N; i++ { + _, _, err := builder.BuildQuery(values) + if err != nil { + b.Fatal(err) + } + } +} + func TestCheckpointQueryBuilder(t *testing.T) { databaseName := "mydb" tableName := "_tbl_ghk"