diff --git a/doc/hooks.md b/doc/hooks.md index b033d3212..99c941b48 100644 --- a/doc/hooks.md +++ b/doc/hooks.md @@ -49,6 +49,7 @@ The full list of supported hooks is best found in code: [hooks.go](https://githu - `gh-ost-on-before-cut-over` - `gh-ost-on-success` - `gh-ost-on-failure` +- `gh-ost-on-batch-copy-retry` ### Context @@ -82,6 +83,7 @@ The following variable are available on particular hooks: - `GH_OST_COMMAND` is only available in `gh-ost-on-interactive-command` - `GH_OST_STATUS` is only available in `gh-ost-on-status` +- `GH_OST_LAST_BATCH_COPY_ERROR` is only available in `gh-ost-on-batch-copy-retry` ### Examples diff --git a/go/base/context.go b/go/base/context.go index c6ccb800c..891e27fef 100644 --- a/go/base/context.go +++ b/go/base/context.go @@ -611,6 +611,13 @@ func (this *MigrationContext) GetIteration() int64 { return atomic.LoadInt64(&this.Iteration) } +func (this *MigrationContext) SetNextIterationRangeMinValues() { + this.MigrationIterationRangeMinValues = this.MigrationIterationRangeMaxValues + if this.MigrationIterationRangeMinValues == nil { + this.MigrationIterationRangeMinValues = this.MigrationRangeMinValues + } +} + func (this *MigrationContext) MarkPointOfInterest() int64 { this.pointOfInterestTimeMutex.Lock() defer this.pointOfInterestTimeMutex.Unlock() @@ -970,9 +977,8 @@ func (this *MigrationContext) GetGhostTriggerName(triggerName string) string { return triggerName + this.TriggerSuffix } -// validateGhostTriggerLength check if the ghost trigger name length is not more than 64 characters +// ValidateGhostTriggerLengthBelowMaxLength checks if the given trigger name (already transformed +// by GetGhostTriggerName) does not exceed the maximum allowed length. 
func (this *MigrationContext) ValidateGhostTriggerLengthBelowMaxLength(triggerName string) bool { - ghostTriggerName := this.GetGhostTriggerName(triggerName) - - return utf8.RuneCountInString(ghostTriggerName) <= mysql.MaxTableNameLength + return utf8.RuneCountInString(triggerName) <= mysql.MaxTableNameLength } diff --git a/go/base/context_test.go b/go/base/context_test.go index f87bc9f13..f8bce6f27 100644 --- a/go/base/context_test.go +++ b/go/base/context_test.go @@ -86,38 +86,69 @@ func TestGetTriggerNames(t *testing.T) { } func TestValidateGhostTriggerLengthBelowMaxLength(t *testing.T) { + // Tests simulate the real call pattern: GetGhostTriggerName first, then validate the result. { + // Short trigger name with suffix appended: well under 64 chars context := NewMigrationContext() context.TriggerSuffix = "_gho" - require.True(t, context.ValidateGhostTriggerLengthBelowMaxLength("my_trigger")) + ghostName := context.GetGhostTriggerName("my_trigger") // "my_trigger_gho" = 14 chars + require.True(t, context.ValidateGhostTriggerLengthBelowMaxLength(ghostName)) } { + // 64-char original + "_ghost" suffix = 70 chars → exceeds limit context := NewMigrationContext() context.TriggerSuffix = "_ghost" - require.False(t, context.ValidateGhostTriggerLengthBelowMaxLength(strings.Repeat("my_trigger_ghost", 4))) // 64 characters + "_ghost" + ghostName := context.GetGhostTriggerName(strings.Repeat("my_trigger_ghost", 4)) // 64 + 6 = 70 + require.False(t, context.ValidateGhostTriggerLengthBelowMaxLength(ghostName)) } { + // 48-char original + "_ghost" suffix = 54 chars → valid context := NewMigrationContext() context.TriggerSuffix = "_ghost" - require.True(t, context.ValidateGhostTriggerLengthBelowMaxLength(strings.Repeat("my_trigger_ghost", 3))) // 48 characters + "_ghost" + ghostName := context.GetGhostTriggerName(strings.Repeat("my_trigger_ghost", 3)) // 48 + 6 = 54 + require.True(t, context.ValidateGhostTriggerLengthBelowMaxLength(ghostName)) } { + // RemoveTriggerSuffix: 
64-char name ending in "_ghost" → suffix removed → 58 chars → valid context := NewMigrationContext() context.TriggerSuffix = "_ghost" context.RemoveTriggerSuffix = true - require.True(t, context.ValidateGhostTriggerLengthBelowMaxLength(strings.Repeat("my_trigger_ghost", 4))) // 64 characters + "_ghost" removed + ghostName := context.GetGhostTriggerName(strings.Repeat("my_trigger_ghost", 4)) // suffix removed → 58 + require.True(t, context.ValidateGhostTriggerLengthBelowMaxLength(ghostName)) } { + // RemoveTriggerSuffix: name doesn't end in suffix → suffix appended → 65 + 6 = 71 chars → exceeds context := NewMigrationContext() context.TriggerSuffix = "_ghost" context.RemoveTriggerSuffix = true - require.False(t, context.ValidateGhostTriggerLengthBelowMaxLength(strings.Repeat("my_trigger_ghost", 4)+"X")) // 65 characters + "_ghost" not removed + ghostName := context.GetGhostTriggerName(strings.Repeat("my_trigger_ghost", 4) + "X") // no match, appended → 71 + require.False(t, context.ValidateGhostTriggerLengthBelowMaxLength(ghostName)) } { + // RemoveTriggerSuffix: 70-char name ending in "_ghost" → suffix removed → 64 chars → exactly at limit → valid context := NewMigrationContext() context.TriggerSuffix = "_ghost" context.RemoveTriggerSuffix = true - require.True(t, context.ValidateGhostTriggerLengthBelowMaxLength(strings.Repeat("my_trigger_ghost", 4)+"_ghost")) // 70 characters + last "_ghost" removed + ghostName := context.GetGhostTriggerName(strings.Repeat("my_trigger_ghost", 4) + "_ghost") // suffix removed → 64 + require.True(t, context.ValidateGhostTriggerLengthBelowMaxLength(ghostName)) + } + { + // Edge case: exactly 64 chars after transformation → valid (boundary test) + context := NewMigrationContext() + context.TriggerSuffix = "_ght" + originalName := strings.Repeat("x", 60) // 60 chars + ghostName := context.GetGhostTriggerName(originalName) // 60 + 4 = 64 + require.Equal(t, 64, len(ghostName)) + require.True(t, 
context.ValidateGhostTriggerLengthBelowMaxLength(ghostName)) + } + { + // Edge case: 65 chars after transformation → exceeds (boundary test) + context := NewMigrationContext() + context.TriggerSuffix = "_ght" + originalName := strings.Repeat("x", 61) // 61 chars + ghostName := context.GetGhostTriggerName(originalName) // 61 + 4 = 65 + require.Equal(t, 65, len(ghostName)) + require.False(t, context.ValidateGhostTriggerLengthBelowMaxLength(ghostName)) } } diff --git a/go/logic/applier.go b/go/logic/applier.go index 68d11171b..58761d844 100644 --- a/go/logic/applier.go +++ b/go/logic/applier.go @@ -819,17 +819,6 @@ func (this *Applier) ReadMigrationRangeValues() error { // no further chunk to work through, i.e. we're past the last chunk and are done with // iterating the range (and thus done with copying row chunks) func (this *Applier) CalculateNextIterationRangeEndValues() (hasFurtherRange bool, err error) { - this.LastIterationRangeMutex.Lock() - if this.migrationContext.MigrationIterationRangeMinValues != nil && this.migrationContext.MigrationIterationRangeMaxValues != nil { - this.LastIterationRangeMinValues = this.migrationContext.MigrationIterationRangeMinValues.Clone() - this.LastIterationRangeMaxValues = this.migrationContext.MigrationIterationRangeMaxValues.Clone() - } - this.LastIterationRangeMutex.Unlock() - - this.migrationContext.MigrationIterationRangeMinValues = this.migrationContext.MigrationIterationRangeMaxValues - if this.migrationContext.MigrationIterationRangeMinValues == nil { - this.migrationContext.MigrationIterationRangeMinValues = this.migrationContext.MigrationRangeMinValues - } for i := 0; i < 2; i++ { buildFunc := sql.BuildUniqueKeyRangeEndPreparedQueryViaOffset if i == 1 { @@ -1470,10 +1459,9 @@ func (this *Applier) buildDMLEventQuery(dmlEvent *binlog.BinlogDMLEvent) []*dmlB results = append(results, this.buildDMLEventQuery(dmlEvent)...) 
return results } - query, sharedArgs, uniqueKeyArgs, err := this.dmlUpdateQueryBuilder.BuildQuery(dmlEvent.NewColumnValues.AbstractValues(), dmlEvent.WhereColumnValues.AbstractValues()) + query, updateArgs, err := this.dmlUpdateQueryBuilder.BuildQuery(dmlEvent.NewColumnValues.AbstractValues(), dmlEvent.WhereColumnValues.AbstractValues()) args := sqlutils.Args() - args = append(args, sharedArgs...) - args = append(args, uniqueKeyArgs...) + args = append(args, updateArgs...) return []*dmlBuildResult{newDmlBuildResult(query, args, 0, err)} } } @@ -1558,6 +1546,43 @@ func (this *Applier) ApplyDMLEventQueries(dmlEvents [](*binlog.BinlogDMLEvent)) if execErr != nil { return rollback(execErr) } + + // Check for warnings when PanicOnWarnings is enabled + if this.migrationContext.PanicOnWarnings { + //nolint:execinquery + rows, err := tx.Query("SHOW WARNINGS") + if err != nil { + return rollback(err) + } + defer rows.Close() + if err = rows.Err(); err != nil { + return rollback(err) + } + + var sqlWarnings []string + for rows.Next() { + var level, message string + var code int + if err := rows.Scan(&level, &code, &message); err != nil { + this.migrationContext.Log.Warningf("Failed to read SHOW WARNINGS row") + continue + } + // Duplicate warnings are formatted differently across mysql versions, hence the optional table name prefix + migrationUniqueKeyExpression := fmt.Sprintf("for key '(%s\\.)?%s'", this.migrationContext.GetGhostTableName(), this.migrationContext.UniqueKey.NameInGhostTable) + matched, _ := regexp.MatchString(migrationUniqueKeyExpression, message) + if strings.Contains(message, "Duplicate entry") && matched { + // Duplicate entry on migration unique key is expected during binlog replay + // (row was already copied during bulk copy phase) + continue + } + sqlWarnings = append(sqlWarnings, fmt.Sprintf("%s: %s (%d)", level, message, code)) + } + if len(sqlWarnings) > 0 { + warningMsg := fmt.Sprintf("Warnings detected during DML event application: %v", 
sqlWarnings) + return rollback(errors.New(warningMsg)) + } + } + if err := tx.Commit(); err != nil { return err } diff --git a/go/logic/applier_test.go b/go/logic/applier_test.go index 23c34ed39..9c761373a 100644 --- a/go/logic/applier_test.go +++ b/go/logic/applier_test.go @@ -147,7 +147,7 @@ func TestApplierBuildDMLEventQuery(t *testing.T) { require.Len(t, res, 1) require.NoError(t, res[0].err) require.Equal(t, - `replace /* gh-ost `+"`test`.`_test_gho`"+` */ + `insert /* gh-ost `+"`test`.`_test_gho`"+` */ ignore into `+"`test`.`_test_gho`"+` `+"(`id`, `item_id`)"+` @@ -542,7 +542,9 @@ func (suite *ApplierTestSuite) TestPanicOnWarningsInApplyIterationInsertQuerySuc err = applier.ReadMigrationRangeValues() suite.Require().NoError(err) + migrationContext.SetNextIterationRangeMinValues() hasFurtherRange, err := applier.CalculateNextIterationRangeEndValues() + suite.Require().NoError(err) suite.Require().True(hasFurtherRange) @@ -620,6 +622,7 @@ func (suite *ApplierTestSuite) TestPanicOnWarningsInApplyIterationInsertQueryFai err = applier.AlterGhost() suite.Require().NoError(err) + migrationContext.SetNextIterationRangeMinValues() hasFurtherRange, err := applier.CalculateNextIterationRangeEndValues() suite.Require().NoError(err) suite.Require().True(hasFurtherRange) @@ -721,6 +724,262 @@ func (suite *ApplierTestSuite) TestWriteCheckpoint() { suite.Require().Equal(chk.IsCutover, gotChk.IsCutover) } +func (suite *ApplierTestSuite) TestPanicOnWarningsWithDuplicateKeyOnNonMigrationIndex() { + ctx := context.Background() + + var err error + + // Create table with id and email columns, where id is the primary key + _, err = suite.db.ExecContext(ctx, fmt.Sprintf("CREATE TABLE %s (id INT PRIMARY KEY, email VARCHAR(100));", getTestTableName())) + suite.Require().NoError(err) + + // Create ghost table with same schema plus a new unique index on email + _, err = suite.db.ExecContext(ctx, fmt.Sprintf("CREATE TABLE %s (id INT PRIMARY KEY, email VARCHAR(100), UNIQUE KEY 
email_unique (email));", getTestGhostTableName())) + suite.Require().NoError(err) + + connectionConfig, err := getTestConnectionConfig(ctx, suite.mysqlContainer) + suite.Require().NoError(err) + + migrationContext := newTestMigrationContext() + migrationContext.ApplierConnectionConfig = connectionConfig + migrationContext.SetConnectionConfig("innodb") + + migrationContext.PanicOnWarnings = true + + migrationContext.OriginalTableColumns = sql.NewColumnList([]string{"id", "email"}) + migrationContext.SharedColumns = sql.NewColumnList([]string{"id", "email"}) + migrationContext.MappedSharedColumns = sql.NewColumnList([]string{"id", "email"}) + migrationContext.UniqueKey = &sql.UniqueKey{ + Name: "PRIMARY", + NameInGhostTable: "PRIMARY", + Columns: *sql.NewColumnList([]string{"id"}), + } + + applier := NewApplier(migrationContext) + suite.Require().NoError(applier.prepareQueries()) + defer applier.Teardown() + + err = applier.InitDBConnections() + suite.Require().NoError(err) + + // Insert initial rows into ghost table (simulating bulk copy phase) + _, err = suite.db.ExecContext(ctx, fmt.Sprintf("INSERT INTO %s (id, email) VALUES (1, 'user1@example.com'), (2, 'user2@example.com'), (3, 'user3@example.com');", getTestGhostTableName())) + suite.Require().NoError(err) + + // Simulate binlog event: try to insert a row with duplicate email + // This should fail with a warning because the ghost table has a unique index on email + dmlEvents := []*binlog.BinlogDMLEvent{ + { + DatabaseName: testMysqlDatabase, + TableName: testMysqlTableName, + DML: binlog.InsertDML, + NewColumnValues: sql.ToColumnValues([]interface{}{4, "user2@example.com"}), // duplicate email + }, + } + + // This should return an error when PanicOnWarnings is enabled + err = applier.ApplyDMLEventQueries(dmlEvents) + suite.Require().Error(err) + suite.Require().Contains(err.Error(), "Duplicate entry") + + // Verify that the ghost table still has only 3 rows (no data loss) + rows, err := suite.db.Query("SELECT * 
FROM " + getTestGhostTableName() + " ORDER BY id") + suite.Require().NoError(err) + defer rows.Close() + + var count int + for rows.Next() { + var id int + var email string + err = rows.Scan(&id, &email) + suite.Require().NoError(err) + count += 1 + } + suite.Require().NoError(rows.Err()) + + // All 3 original rows should still be present + suite.Require().Equal(3, count) +} + +// TestUpdateModifyingUniqueKeyWithDuplicateOnOtherIndex tests the scenario where: +// 1. An UPDATE modifies the unique key (converted to DELETE+INSERT) +// 2. The INSERT would create a duplicate on a NON-migration unique index +// 3. Without warning detection: DELETE succeeds, INSERT IGNORE skips = DATA LOSS +// 4. With PanicOnWarnings: Warning detected, transaction rolled back, no data loss +// This test verifies that PanicOnWarnings correctly prevents the data loss scenario. +func (suite *ApplierTestSuite) TestUpdateModifyingUniqueKeyWithDuplicateOnOtherIndex() { + ctx := context.Background() + + var err error + + // Create table with id (PRIMARY) and email (NO unique constraint yet) + _, err = suite.db.ExecContext(ctx, fmt.Sprintf("CREATE TABLE %s (id INT PRIMARY KEY, email VARCHAR(100));", getTestTableName())) + suite.Require().NoError(err) + + // Create ghost table with id (PRIMARY) AND email unique index (being added) + _, err = suite.db.ExecContext(ctx, fmt.Sprintf("CREATE TABLE %s (id INT PRIMARY KEY, email VARCHAR(100), UNIQUE KEY email_unique (email));", getTestGhostTableName())) + suite.Require().NoError(err) + + connectionConfig, err := getTestConnectionConfig(ctx, suite.mysqlContainer) + suite.Require().NoError(err) + + migrationContext := newTestMigrationContext() + migrationContext.ApplierConnectionConfig = connectionConfig + migrationContext.SetConnectionConfig("innodb") + + migrationContext.PanicOnWarnings = true + + migrationContext.OriginalTableColumns = sql.NewColumnList([]string{"id", "email"}) + migrationContext.SharedColumns = sql.NewColumnList([]string{"id", 
"email"}) + migrationContext.MappedSharedColumns = sql.NewColumnList([]string{"id", "email"}) + migrationContext.UniqueKey = &sql.UniqueKey{ + Name: "PRIMARY", + NameInGhostTable: "PRIMARY", + Columns: *sql.NewColumnList([]string{"id"}), + } + + applier := NewApplier(migrationContext) + suite.Require().NoError(applier.prepareQueries()) + defer applier.Teardown() + + err = applier.InitDBConnections() + suite.Require().NoError(err) + + // Setup: Insert initial rows into ghost table + // Row 1: id=1, email='bob@example.com' + // Row 2: id=2, email='charlie@example.com' + _, err = suite.db.ExecContext(ctx, fmt.Sprintf("INSERT INTO %s (id, email) VALUES (1, 'bob@example.com'), (2, 'charlie@example.com');", getTestGhostTableName())) + suite.Require().NoError(err) + + // Simulate binlog event: UPDATE that changes BOTH PRIMARY KEY and email + // From: id=2, email='charlie@example.com' + // To: id=3, email='bob@example.com' (duplicate email with id=1) + // This will be converted to DELETE (id=2) + INSERT (id=3, 'bob@example.com') + // With INSERT IGNORE, the INSERT will skip because email='bob@example.com' already exists in id=1 + // Result: id=2 deleted, id=3 never inserted = DATA LOSS + dmlEvents := []*binlog.BinlogDMLEvent{ + { + DatabaseName: testMysqlDatabase, + TableName: testMysqlTableName, + DML: binlog.UpdateDML, + NewColumnValues: sql.ToColumnValues([]interface{}{3, "bob@example.com"}), // new: id=3, email='bob@example.com' + WhereColumnValues: sql.ToColumnValues([]interface{}{2, "charlie@example.com"}), // old: id=2, email='charlie@example.com' + }, + } + + // First verify this would be converted to DELETE+INSERT + buildResults := applier.buildDMLEventQuery(dmlEvents[0]) + suite.Require().Len(buildResults, 2, "UPDATE modifying unique key should be converted to DELETE+INSERT") + + // Apply the event - this should FAIL because INSERT will have duplicate email warning + err = applier.ApplyDMLEventQueries(dmlEvents) + suite.Require().Error(err, "Should fail when 
DELETE+INSERT causes duplicate on non-migration unique key") + suite.Require().Contains(err.Error(), "Duplicate entry", "Error should mention duplicate entry") + + // Verify that BOTH rows still exist (transaction rolled back) + rows, err := suite.db.Query("SELECT id, email FROM " + getTestGhostTableName() + " ORDER BY id") + suite.Require().NoError(err) + defer rows.Close() + + var count int + var ids []int + var emails []string + for rows.Next() { + var id int + var email string + err = rows.Scan(&id, &email) + suite.Require().NoError(err) + ids = append(ids, id) + emails = append(emails, email) + count++ + } + suite.Require().NoError(rows.Err()) + + // Transaction should have rolled back, so original 2 rows should still be there + suite.Require().Equal(2, count, "Should still have 2 rows after failed transaction") + suite.Require().Equal([]int{1, 2}, ids, "Should have original ids") + suite.Require().Equal([]string{"bob@example.com", "charlie@example.com"}, emails) +} + +// TestNormalUpdateWithPanicOnWarnings tests that normal UPDATEs (not modifying unique key) work correctly +func (suite *ApplierTestSuite) TestNormalUpdateWithPanicOnWarnings() { + ctx := context.Background() + + var err error + + // Create table with id (PRIMARY) and email + _, err = suite.db.ExecContext(ctx, fmt.Sprintf("CREATE TABLE %s (id INT PRIMARY KEY, email VARCHAR(100));", getTestTableName())) + suite.Require().NoError(err) + + // Create ghost table with same schema plus unique index on email + _, err = suite.db.ExecContext(ctx, fmt.Sprintf("CREATE TABLE %s (id INT PRIMARY KEY, email VARCHAR(100), UNIQUE KEY email_unique (email));", getTestGhostTableName())) + suite.Require().NoError(err) + + connectionConfig, err := getTestConnectionConfig(ctx, suite.mysqlContainer) + suite.Require().NoError(err) + + migrationContext := newTestMigrationContext() + migrationContext.ApplierConnectionConfig = connectionConfig + migrationContext.SetConnectionConfig("innodb") + + 
migrationContext.PanicOnWarnings = true + + migrationContext.OriginalTableColumns = sql.NewColumnList([]string{"id", "email"}) + migrationContext.SharedColumns = sql.NewColumnList([]string{"id", "email"}) + migrationContext.MappedSharedColumns = sql.NewColumnList([]string{"id", "email"}) + migrationContext.UniqueKey = &sql.UniqueKey{ + Name: "PRIMARY", + NameInGhostTable: "PRIMARY", + Columns: *sql.NewColumnList([]string{"id"}), + } + + applier := NewApplier(migrationContext) + suite.Require().NoError(applier.prepareQueries()) + defer applier.Teardown() + + err = applier.InitDBConnections() + suite.Require().NoError(err) + + // Setup: Insert initial rows into ghost table + _, err = suite.db.ExecContext(ctx, fmt.Sprintf("INSERT INTO %s (id, email) VALUES (1, 'alice@example.com'), (2, 'bob@example.com');", getTestGhostTableName())) + suite.Require().NoError(err) + + // Simulate binlog event: Normal UPDATE that only changes email (not PRIMARY KEY) + // This should use UPDATE query, not DELETE+INSERT + dmlEvents := []*binlog.BinlogDMLEvent{ + { + DatabaseName: testMysqlDatabase, + TableName: testMysqlTableName, + DML: binlog.UpdateDML, + NewColumnValues: sql.ToColumnValues([]interface{}{2, "robert@example.com"}), // update email only + WhereColumnValues: sql.ToColumnValues([]interface{}{2, "bob@example.com"}), + }, + } + + // Verify this generates a single UPDATE query (not DELETE+INSERT) + buildResults := applier.buildDMLEventQuery(dmlEvents[0]) + suite.Require().Len(buildResults, 1, "Normal UPDATE should generate single UPDATE query") + + // Apply the event - should succeed + err = applier.ApplyDMLEventQueries(dmlEvents) + suite.Require().NoError(err) + + // Verify the update was applied correctly + rows, err := suite.db.Query("SELECT id, email FROM " + getTestGhostTableName() + " WHERE id = 2") + suite.Require().NoError(err) + defer rows.Close() + + var id int + var email string + suite.Require().True(rows.Next(), "Should find updated row") + err = rows.Scan(&id, 
&email) + suite.Require().NoError(err) + suite.Require().Equal(2, id) + suite.Require().Equal("robert@example.com", email) + suite.Require().False(rows.Next(), "Should only have one row") + suite.Require().NoError(rows.Err()) +} + func TestApplier(t *testing.T) { suite.Run(t, new(ApplierTestSuite)) } diff --git a/go/logic/hooks.go b/go/logic/hooks.go index 00a8d0fab..c22e86980 100644 --- a/go/logic/hooks.go +++ b/go/logic/hooks.go @@ -28,6 +28,7 @@ const ( onInteractiveCommand = "gh-ost-on-interactive-command" onSuccess = "gh-ost-on-success" onFailure = "gh-ost-on-failure" + onBatchCopyRetry = "gh-ost-on-batch-copy-retry" onStatus = "gh-ost-on-status" onStopReplication = "gh-ost-on-stop-replication" onStartReplication = "gh-ost-on-start-replication" @@ -78,6 +79,7 @@ func (this *HooksExecutor) applyEnvironmentVariables(extraVariables ...string) [ // executeHook executes a command, and sets relevant environment variables // combined output & error are printed to the configured writer. func (this *HooksExecutor) executeHook(hook string, extraVariables ...string) error { + this.migrationContext.Log.Infof("executing hook: %+v", hook) cmd := exec.Command(hook) cmd.Env = this.applyEnvironmentVariables(extraVariables...) 
@@ -124,6 +126,11 @@ func (this *HooksExecutor) onBeforeRowCopy() error { return this.executeHooks(onBeforeRowCopy) } +func (this *HooksExecutor) onBatchCopyRetry(errorMessage string) error { + v := fmt.Sprintf("GH_OST_LAST_BATCH_COPY_ERROR=%s", errorMessage) + return this.executeHooks(onBatchCopyRetry, v) +} + func (this *HooksExecutor) onRowCopyComplete() error { return this.executeHooks(onRowCopyComplete) } diff --git a/go/logic/inspect.go b/go/logic/inspect.go index 044360153..97895890d 100644 --- a/go/logic/inspect.go +++ b/go/logic/inspect.go @@ -596,7 +596,7 @@ func (this *Inspector) validateGhostTriggersDontExist() error { var foundTriggers []string for _, trigger := range this.migrationContext.Triggers { triggerName := this.migrationContext.GetGhostTriggerName(trigger.Name) - query := "select 1 from information_schema.triggers where trigger_name = ? and trigger_schema = ? and event_object_table = ?" + query := "select 1 from information_schema.triggers where trigger_name = ? and trigger_schema = ?" err := sqlutils.QueryRowsMap(this.db, query, func(rowMap sqlutils.RowMap) error { triggerExists := rowMap.GetInt("1") if triggerExists == 1 { @@ -606,7 +606,6 @@ func (this *Inspector) validateGhostTriggersDontExist() error { }, triggerName, this.migrationContext.DatabaseName, - this.migrationContext.OriginalTableName, ) if err != nil { return err diff --git a/go/logic/migrator.go b/go/logic/migrator.go index 7255fc757..aa9a97c1c 100644 --- a/go/logic/migrator.go +++ b/go/logic/migrator.go @@ -135,6 +135,18 @@ func (this *Migrator) sleepWhileTrue(operation func() (bool, error)) error { } } +func (this *Migrator) retryBatchCopyWithHooks(operation func() error, notFatalHint ...bool) (err error) { + wrappedOperation := func() error { + if err := operation(); err != nil { + this.hooksExecutor.onBatchCopyRetry(err.Error()) + return err + } + return nil + } + + return this.retryOperation(wrappedOperation, notFatalHint...) 
+} + // retryOperation attempts up to `count` attempts at running given function, // exiting as soon as it returns with non-error. func (this *Migrator) retryOperation(operation func() error, notFatalHint ...bool) (err error) { @@ -842,7 +854,7 @@ func (this *Migrator) atomicCutOver() (err error) { // If we need to create triggers we need to do it here (only create part) if this.migrationContext.IncludeTriggers && len(this.migrationContext.Triggers) > 0 { if err := this.applier.CreateTriggersOnGhost(); err != nil { - this.migrationContext.Log.Errore(err) + return this.migrationContext.Log.Errore(err) } } @@ -1362,7 +1374,9 @@ func (this *Migrator) initiateApplier() error { return err } } - this.applier.WriteChangelogState(string(GhostTableMigrated)) + if _, err := this.applier.WriteChangelogState(string(GhostTableMigrated)); err != nil { + return err + } } // ensure performance_schema.metadata_locks is available. @@ -1405,27 +1419,24 @@ func (this *Migrator) iterateChunks() error { return nil } copyRowsFunc := func() error { - if atomic.LoadInt64(&this.rowCopyCompleteFlag) == 1 || atomic.LoadInt64(&hasNoFurtherRangeFlag) == 1 { - // Done. - // There's another such check down the line - return nil - } - - // When hasFurtherRange is false, original table might be write locked and CalculateNextIterationRangeEndValues would hangs forever - - hasFurtherRange := false - if err := this.retryOperation(func() (e error) { - hasFurtherRange, e = this.applier.CalculateNextIterationRangeEndValues() - return e - }); err != nil { - return terminateRowIteration(err) - } - if !hasFurtherRange { - atomic.StoreInt64(&hasNoFurtherRangeFlag, 1) - return terminateRowIteration(nil) - } + this.migrationContext.SetNextIterationRangeMinValues() // Copy task: applyCopyRowsFunc := func() error { + if atomic.LoadInt64(&this.rowCopyCompleteFlag) == 1 || atomic.LoadInt64(&hasNoFurtherRangeFlag) == 1 { + // Done. 
+ // There's another such check down the line + return nil + } + + // When hasFurtherRange is false, original table might be write locked and CalculateNextIterationRangeEndValues would hang forever + hasFurtherRange, err := this.applier.CalculateNextIterationRangeEndValues() + if err != nil { + return err // wrapping call will retry + } + if !hasFurtherRange { + atomic.StoreInt64(&hasNoFurtherRangeFlag, 1) + return terminateRowIteration(nil) + } if atomic.LoadInt64(&this.rowCopyCompleteFlag) == 1 { // No need for more writes. // This is the de-facto place where we avoid writing in the event of completed cut-over. @@ -1456,9 +1467,18 @@ func (this *Migrator) iterateChunks() error { atomic.AddInt64(&this.migrationContext.Iteration, 1) return nil } - if err := this.retryOperation(applyCopyRowsFunc); err != nil { + if err := this.retryBatchCopyWithHooks(applyCopyRowsFunc); err != nil { return terminateRowIteration(err) } + + // record last successfully copied range + this.applier.LastIterationRangeMutex.Lock() + if this.migrationContext.MigrationIterationRangeMinValues != nil && this.migrationContext.MigrationIterationRangeMaxValues != nil { + this.applier.LastIterationRangeMinValues = this.migrationContext.MigrationIterationRangeMinValues.Clone() + this.applier.LastIterationRangeMaxValues = this.migrationContext.MigrationIterationRangeMaxValues.Clone() + } + this.applier.LastIterationRangeMutex.Unlock() + return nil } // Enqueue copy operation; to be executed by executeWriteFuncs() diff --git a/go/logic/migrator_test.go b/go/logic/migrator_test.go index 7e5ddab9f..c4fd49233 100644 --- a/go/logic/migrator_test.go +++ b/go/logic/migrator_test.go @@ -6,10 +6,12 @@ package logic import ( + "bytes" "context" gosql "database/sql" "errors" "fmt" + "io" "os" "path/filepath" "strings" @@ -325,6 +327,8 @@ func (suite *MigratorTestSuite) SetupTest() { _, err := suite.db.ExecContext(ctx, "CREATE DATABASE IF NOT EXISTS "+testMysqlDatabase) suite.Require().NoError(err) + + 
os.Remove("/tmp/gh-ost.sock") } func (suite *MigratorTestSuite) TearDownTest() { @@ -384,6 +388,126 @@ func (suite *MigratorTestSuite) TestMigrateEmpty() { suite.Require().Equal("_testing_del", tableName) } +func (suite *MigratorTestSuite) TestRetryBatchCopyWithHooks() { + ctx := context.Background() + + _, err := suite.db.ExecContext(ctx, "CREATE TABLE test.test_retry_batch (id INT PRIMARY KEY AUTO_INCREMENT, name TEXT)") + suite.Require().NoError(err) + + const initStride = 1000 + const totalBatches = 3 + for i := 0; i < totalBatches; i++ { + dataSize := 50 * i + for j := 0; j < initStride; j++ { + _, err = suite.db.ExecContext(ctx, fmt.Sprintf("INSERT INTO test.test_retry_batch (name) VALUES ('%s')", strings.Repeat("a", dataSize))) + suite.Require().NoError(err) + } + } + + _, err = suite.db.ExecContext(ctx, fmt.Sprintf("SET GLOBAL max_binlog_cache_size = %d", 1024*8)) + suite.Require().NoError(err) + defer func() { + _, err = suite.db.ExecContext(ctx, fmt.Sprintf("SET GLOBAL max_binlog_cache_size = %d", 1024*1024*1024)) + suite.Require().NoError(err) + }() + + tmpDir, err := os.MkdirTemp("", "gh-ost-hooks") + suite.Require().NoError(err) + defer os.RemoveAll(tmpDir) + + hookScript := filepath.Join(tmpDir, "gh-ost-on-batch-copy-retry") + hookContent := `#!/bin/bash +# Mock hook that reduces chunk size on binlog cache error +ERROR_MSG="$GH_OST_LAST_BATCH_COPY_ERROR" +SOCKET_PATH="/tmp/gh-ost.sock" + +if ! [[ "$ERROR_MSG" =~ "max_binlog_cache_size" ]]; then + echo "Nothing to do for error: $ERROR_MSG" + exit 0 +fi + +CHUNK_SIZE=$(echo "chunk-size=?" 
| nc -U $SOCKET_PATH | tr -d '\n') + +MIN_CHUNK_SIZE=10 +NEW_CHUNK_SIZE=$(( CHUNK_SIZE * 8 / 10 )) +if [ $NEW_CHUNK_SIZE -lt $MIN_CHUNK_SIZE ]; then + NEW_CHUNK_SIZE=$MIN_CHUNK_SIZE +fi + +if [ $CHUNK_SIZE -eq $NEW_CHUNK_SIZE ]; then + echo "Chunk size unchanged: $CHUNK_SIZE" + exit 0 +fi + +echo "[gh-ost-on-batch-copy-retry]: Changing chunk size from $CHUNK_SIZE to $NEW_CHUNK_SIZE" +echo "chunk-size=$NEW_CHUNK_SIZE" | nc -U $SOCKET_PATH +echo "[gh-ost-on-batch-copy-retry]: Done, exiting..." +` + err = os.WriteFile(hookScript, []byte(hookContent), 0755) + suite.Require().NoError(err) + + origStdout := os.Stdout + origStderr := os.Stderr + + rOut, wOut, _ := os.Pipe() + rErr, wErr, _ := os.Pipe() + os.Stdout = wOut + os.Stderr = wErr + + connectionConfig, err := getTestConnectionConfig(ctx, suite.mysqlContainer) + suite.Require().NoError(err) + + migrationContext := base.NewMigrationContext() + migrationContext.AllowedRunningOnMaster = true + migrationContext.ApplierConnectionConfig = connectionConfig + migrationContext.InspectorConnectionConfig = connectionConfig + migrationContext.DatabaseName = "test" + migrationContext.SkipPortValidation = true + migrationContext.OriginalTableName = "test_retry_batch" + migrationContext.SetConnectionConfig("innodb") + migrationContext.AlterStatementOptions = "MODIFY name LONGTEXT, ENGINE=InnoDB" + migrationContext.ReplicaServerId = 99999 + migrationContext.HeartbeatIntervalMilliseconds = 100 + migrationContext.ThrottleHTTPIntervalMillis = 100 + migrationContext.ThrottleHTTPTimeoutMillis = 1000 + migrationContext.HooksPath = tmpDir + migrationContext.ChunkSize = 1000 + migrationContext.SetDefaultNumRetries(10) + migrationContext.ServeSocketFile = "/tmp/gh-ost.sock" + + migrator := NewMigrator(migrationContext, "0.0.0") + + err = migrator.Migrate() + suite.Require().NoError(err) + + wOut.Close() + wErr.Close() + os.Stdout = origStdout + os.Stderr = origStderr + + var bufOut, bufErr bytes.Buffer + io.Copy(&bufOut, rOut) + 
io.Copy(&bufErr, rErr) + + outStr := bufOut.String() + errStr := bufErr.String() + + suite.Assert().Contains(outStr, "chunk-size: 1000") + suite.Assert().Contains(errStr, "[gh-ost-on-batch-copy-retry]: Changing chunk size from 1000 to 800") + suite.Assert().Contains(outStr, "chunk-size: 800") + + suite.Assert().Contains(errStr, "[gh-ost-on-batch-copy-retry]: Changing chunk size from 800 to 640") + suite.Assert().Contains(outStr, "chunk-size: 640") + + suite.Assert().Contains(errStr, "[gh-ost-on-batch-copy-retry]: Changing chunk size from 640 to 512") + suite.Assert().Contains(outStr, "chunk-size: 512") + + var count int + err = suite.db.QueryRowContext(ctx, "SELECT COUNT(*) FROM test.test_retry_batch").Scan(&count) + suite.Require().NoError(err) + suite.Assert().Equal(3000, count) +} + func (suite *MigratorTestSuite) TestCopierIntPK() { ctx := context.Background() diff --git a/go/mysql/utils.go b/go/mysql/utils.go index 7d57afbf1..9619e1e9f 100644 --- a/go/mysql/utils.go +++ b/go/mysql/utils.go @@ -248,9 +248,9 @@ func Kill(db *gosql.DB, connectionID string) error { // GetTriggers reads trigger list from given table func GetTriggers(db *gosql.DB, databaseName, tableName string) (triggers []Trigger, err error) { - query := fmt.Sprintf(`select trigger_name as name, event_manipulation as event, action_statement as statement, action_timing as timing - from information_schema.triggers - where trigger_schema = '%s' and event_object_table = '%s'`, databaseName, tableName) + query := `select trigger_name as name, event_manipulation as event, action_statement as statement, action_timing as timing + from information_schema.triggers + where trigger_schema = ? 
and event_object_table = ?` err = sqlutils.QueryRowsMap(db, query, func(rowMap sqlutils.RowMap) error { triggers = append(triggers, Trigger{ @@ -260,7 +260,7 @@ func GetTriggers(db *gosql.DB, databaseName, tableName string) (triggers []Trigg Timing: rowMap.GetString("timing"), }) return nil - }) + }, databaseName, tableName) if err != nil { return nil, err } diff --git a/go/sql/builder.go b/go/sql/builder.go index 61dd9706f..940ca4ca3 100644 --- a/go/sql/builder.go +++ b/go/sql/builder.go @@ -169,11 +169,11 @@ func (b *CheckpointInsertQueryBuilder) BuildQuery(uniqueKeyArgs []interface{}) ( } convertedArgs := make([]interface{}, 0, 2*b.uniqueKeyColumns.Len()) for i, column := range b.uniqueKeyColumns.Columns() { - minArg := column.convertArg(uniqueKeyArgs[i], true) + minArg := column.convertArg(uniqueKeyArgs[i]) convertedArgs = append(convertedArgs, minArg) } for i, column := range b.uniqueKeyColumns.Columns() { - minArg := column.convertArg(uniqueKeyArgs[i+b.uniqueKeyColumns.Len()], true) + minArg := column.convertArg(uniqueKeyArgs[i+b.uniqueKeyColumns.Len()]) convertedArgs = append(convertedArgs, minArg) } return b.preparedStatement, convertedArgs, nil @@ -533,7 +533,7 @@ func (b *DMLDeleteQueryBuilder) BuildQuery(args []interface{}) (string, []interf uniqueKeyArgs := make([]interface{}, 0, b.uniqueKeyColumns.Len()) for _, column := range b.uniqueKeyColumns.Columns() { tableOrdinal := b.tableColumns.Ordinals[column.Name] - arg := column.convertArg(args[tableOrdinal], true) + arg := column.convertArg(args[tableOrdinal]) uniqueKeyArgs = append(uniqueKeyArgs, arg) } return b.preparedStatement, uniqueKeyArgs, nil @@ -566,7 +566,7 @@ func NewDMLInsertQueryBuilder(databaseName, tableName string, tableColumns, shar preparedValues := buildColumnsPreparedValues(mappedSharedColumns) stmt := fmt.Sprintf(` - replace /* gh-ost %s.%s */ + insert /* gh-ost %s.%s */ ignore into %s.%s (%s) @@ -595,7 +595,7 @@ func (b *DMLInsertQueryBuilder) BuildQuery(args []interface{}) (string, 
[]interf sharedArgs := make([]interface{}, 0, b.sharedColumns.Len()) for _, column := range b.sharedColumns.Columns() { tableOrdinal := b.tableColumns.Ordinals[column.Name] - arg := column.convertArg(args[tableOrdinal], false) + arg := column.convertArg(args[tableOrdinal]) sharedArgs = append(sharedArgs, arg) } return b.preparedStatement, sharedArgs, nil @@ -661,20 +661,18 @@ func NewDMLUpdateQueryBuilder(databaseName, tableName string, tableColumns, shar // BuildQuery builds the arguments array for a DML event UPDATE query. // It returns the query string, the shared arguments array, and the unique key arguments array. -func (b *DMLUpdateQueryBuilder) BuildQuery(valueArgs, whereArgs []interface{}) (string, []interface{}, []interface{}, error) { - sharedArgs := make([]interface{}, 0, b.sharedColumns.Len()) +func (b *DMLUpdateQueryBuilder) BuildQuery(valueArgs, whereArgs []interface{}) (string, []interface{}, error) { + args := make([]interface{}, 0, b.sharedColumns.Len()+b.uniqueKeyColumns.Len()) for _, column := range b.sharedColumns.Columns() { tableOrdinal := b.tableColumns.Ordinals[column.Name] - arg := column.convertArg(valueArgs[tableOrdinal], false) - sharedArgs = append(sharedArgs, arg) + arg := column.convertArg(valueArgs[tableOrdinal]) + args = append(args, arg) } - - uniqueKeyArgs := make([]interface{}, 0, b.uniqueKeyColumns.Len()) for _, column := range b.uniqueKeyColumns.Columns() { tableOrdinal := b.tableColumns.Ordinals[column.Name] - arg := column.convertArg(whereArgs[tableOrdinal], true) - uniqueKeyArgs = append(uniqueKeyArgs, arg) + arg := column.convertArg(whereArgs[tableOrdinal]) + args = append(args, arg) } - return b.preparedStatement, sharedArgs, uniqueKeyArgs, nil + return b.preparedStatement, args, nil } diff --git a/go/sql/builder_test.go b/go/sql/builder_test.go index 840d85c96..7f80005b0 100644 --- a/go/sql/builder_test.go +++ b/go/sql/builder_test.go @@ -538,7 +538,7 @@ func TestBuildDMLInsertQuery(t *testing.T) { query, sharedArgs, err 
:= builder.BuildQuery(args) require.NoError(t, err) expected := ` - replace /* gh-ost mydb.tbl */ + insert /* gh-ost mydb.tbl */ ignore into mydb.tbl (id, name, position, age) values @@ -554,7 +554,7 @@ func TestBuildDMLInsertQuery(t *testing.T) { query, sharedArgs, err := builder.BuildQuery(args) require.NoError(t, err) expected := ` - replace /* gh-ost mydb.tbl */ + insert /* gh-ost mydb.tbl */ ignore into mydb.tbl (position, name, age, id) values @@ -589,7 +589,7 @@ func TestBuildDMLInsertQuerySignedUnsigned(t *testing.T) { query, sharedArgs, err := builder.BuildQuery(args) require.NoError(t, err) expected := ` - replace /* gh-ost mydb.tbl */ + insert /* gh-ost mydb.tbl */ ignore into mydb.tbl (id, name, position, age) values @@ -607,7 +607,7 @@ func TestBuildDMLInsertQuerySignedUnsigned(t *testing.T) { query, sharedArgs, err := builder.BuildQuery(args) require.NoError(t, err) expected := ` - replace /* gh-ost mydb.tbl */ + insert /* gh-ost mydb.tbl */ ignore into mydb.tbl (id, name, position, age) values @@ -625,7 +625,7 @@ func TestBuildDMLInsertQuerySignedUnsigned(t *testing.T) { query, sharedArgs, err := builder.BuildQuery(args) require.NoError(t, err) expected := ` - replace /* gh-ost mydb.tbl */ + insert /* gh-ost mydb.tbl */ ignore into mydb.tbl (id, name, position, age) values @@ -647,7 +647,7 @@ func TestBuildDMLUpdateQuery(t *testing.T) { uniqueKeyColumns := NewColumnList([]string{"position"}) builder, err := NewDMLUpdateQueryBuilder(databaseName, tableName, tableColumns, sharedColumns, sharedColumns, uniqueKeyColumns) require.NoError(t, err) - query, sharedArgs, uniqueKeyArgs, err := builder.BuildQuery(valueArgs, whereArgs) + query, updateArgs, err := builder.BuildQuery(valueArgs, whereArgs) require.NoError(t, err) expected := ` update /* gh-ost mydb.tbl */ @@ -657,15 +657,14 @@ func TestBuildDMLUpdateQuery(t *testing.T) { ((position = ?)) ` require.Equal(t, normalizeQuery(expected), normalizeQuery(query)) - require.Equal(t, []interface{}{3, 
"testname", 17, 23}, sharedArgs) - require.Equal(t, []interface{}{17}, uniqueKeyArgs) + require.Equal(t, []interface{}{3, "testname", 17, 23, 17}, updateArgs) } { sharedColumns := NewColumnList([]string{"id", "name", "position", "age"}) uniqueKeyColumns := NewColumnList([]string{"position", "name"}) builder, err := NewDMLUpdateQueryBuilder(databaseName, tableName, tableColumns, sharedColumns, sharedColumns, uniqueKeyColumns) require.NoError(t, err) - query, sharedArgs, uniqueKeyArgs, err := builder.BuildQuery(valueArgs, whereArgs) + query, updateArgs, err := builder.BuildQuery(valueArgs, whereArgs) require.NoError(t, err) expected := ` update /* gh-ost mydb.tbl */ @@ -675,15 +674,14 @@ func TestBuildDMLUpdateQuery(t *testing.T) { ((position = ?) and (name = ?)) ` require.Equal(t, normalizeQuery(expected), normalizeQuery(query)) - require.Equal(t, []interface{}{3, "testname", 17, 23}, sharedArgs) - require.Equal(t, []interface{}{17, "testname"}, uniqueKeyArgs) + require.Equal(t, []interface{}{3, "testname", 17, 23, 17, "testname"}, updateArgs) } { sharedColumns := NewColumnList([]string{"id", "name", "position", "age"}) uniqueKeyColumns := NewColumnList([]string{"age"}) builder, err := NewDMLUpdateQueryBuilder(databaseName, tableName, tableColumns, sharedColumns, sharedColumns, uniqueKeyColumns) require.NoError(t, err) - query, sharedArgs, uniqueKeyArgs, err := builder.BuildQuery(valueArgs, whereArgs) + query, updateArgs, err := builder.BuildQuery(valueArgs, whereArgs) require.NoError(t, err) expected := ` update /* gh-ost mydb.tbl */ @@ -693,15 +691,14 @@ func TestBuildDMLUpdateQuery(t *testing.T) { ((age = ?)) ` require.Equal(t, normalizeQuery(expected), normalizeQuery(query)) - require.Equal(t, []interface{}{3, "testname", 17, 23}, sharedArgs) - require.Equal(t, []interface{}{56}, uniqueKeyArgs) + require.Equal(t, []interface{}{3, "testname", 17, 23, 56}, updateArgs) } { sharedColumns := NewColumnList([]string{"id", "name", "position", "age"}) uniqueKeyColumns := 
NewColumnList([]string{"age", "position", "id", "name"}) builder, err := NewDMLUpdateQueryBuilder(databaseName, tableName, tableColumns, sharedColumns, sharedColumns, uniqueKeyColumns) require.NoError(t, err) - query, sharedArgs, uniqueKeyArgs, err := builder.BuildQuery(valueArgs, whereArgs) + query, updateArgs, err := builder.BuildQuery(valueArgs, whereArgs) require.NoError(t, err) expected := ` update /* gh-ost mydb.tbl */ @@ -711,8 +708,7 @@ func TestBuildDMLUpdateQuery(t *testing.T) { ((age = ?) and (position = ?) and (id = ?) and (name = ?)) ` require.Equal(t, normalizeQuery(expected), normalizeQuery(query)) - require.Equal(t, []interface{}{3, "testname", 17, 23}, sharedArgs) - require.Equal(t, []interface{}{56, 17, 3, "testname"}, uniqueKeyArgs) + require.Equal(t, []interface{}{3, "testname", 17, 23, 56, 17, 3, "testname"}, updateArgs) } { sharedColumns := NewColumnList([]string{"id", "name", "position", "age"}) @@ -732,7 +728,7 @@ func TestBuildDMLUpdateQuery(t *testing.T) { uniqueKeyColumns := NewColumnList([]string{"id"}) builder, err := NewDMLUpdateQueryBuilder(databaseName, tableName, tableColumns, sharedColumns, mappedColumns, uniqueKeyColumns) require.NoError(t, err) - query, sharedArgs, uniqueKeyArgs, err := builder.BuildQuery(valueArgs, whereArgs) + query, updateArgs, err := builder.BuildQuery(valueArgs, whereArgs) require.NoError(t, err) expected := ` update /* gh-ost mydb.tbl */ @@ -742,8 +738,7 @@ func TestBuildDMLUpdateQuery(t *testing.T) { ((id = ?)) ` require.Equal(t, normalizeQuery(expected), normalizeQuery(query)) - require.Equal(t, []interface{}{3, "testname", 17, 23}, sharedArgs) - require.Equal(t, []interface{}{3}, uniqueKeyArgs) + require.Equal(t, []interface{}{3, "testname", 17, 23, 3}, updateArgs) } } @@ -759,7 +754,7 @@ func TestBuildDMLUpdateQuerySignedUnsigned(t *testing.T) { require.NoError(t, err) { // test signed - query, sharedArgs, uniqueKeyArgs, err := builder.BuildQuery(valueArgs, whereArgs) + query, updateArgs, err := 
builder.BuildQuery(valueArgs, whereArgs) require.NoError(t, err) expected := ` update /* gh-ost mydb.tbl */ @@ -769,14 +764,13 @@ func TestBuildDMLUpdateQuerySignedUnsigned(t *testing.T) { ((position = ?)) ` require.Equal(t, normalizeQuery(expected), normalizeQuery(query)) - require.Equal(t, []interface{}{3, "testname", int8(-17), int8(-2)}, sharedArgs) - require.Equal(t, []interface{}{int8(-3)}, uniqueKeyArgs) + require.Equal(t, []interface{}{3, "testname", int8(-17), int8(-2), int8(-3)}, updateArgs) } { // test unsigned sharedColumns.SetUnsigned("age") uniqueKeyColumns.SetUnsigned("position") - query, sharedArgs, uniqueKeyArgs, err := builder.BuildQuery(valueArgs, whereArgs) + query, updateArgs, err := builder.BuildQuery(valueArgs, whereArgs) require.NoError(t, err) expected := ` update /* gh-ost mydb.tbl */ @@ -786,8 +780,7 @@ func TestBuildDMLUpdateQuerySignedUnsigned(t *testing.T) { ((position = ?)) ` require.Equal(t, normalizeQuery(expected), normalizeQuery(query)) - require.Equal(t, []interface{}{3, "testname", int8(-17), uint8(254)}, sharedArgs) - require.Equal(t, []interface{}{uint8(253)}, uniqueKeyArgs) + require.Equal(t, []interface{}{3, "testname", int8(-17), uint8(254), uint8(253)}, updateArgs) } } diff --git a/go/sql/types.go b/go/sql/types.go index a01fb8bff..9a50bc620 100644 --- a/go/sql/types.go +++ b/go/sql/types.go @@ -57,7 +57,7 @@ type Column struct { MySQLType string } -func (this *Column) convertArg(arg interface{}, isUniqueKeyColumn bool) interface{} { +func (this *Column) convertArg(arg interface{}) interface{} { var arg2Bytes []byte if s, ok := arg.(string); ok { arg2Bytes = []byte(s) @@ -77,14 +77,14 @@ func (this *Column) convertArg(arg interface{}, isUniqueKeyColumn bool) interfac } } - if this.Type == BinaryColumnType && isUniqueKeyColumn { + if this.Type == BinaryColumnType { size := len(arg2Bytes) if uint(size) < this.BinaryOctetLength { buf := bytes.NewBuffer(arg2Bytes) for i := uint(0); i < (this.BinaryOctetLength - uint(size)); 
i++ { buf.Write([]byte{0}) } - arg = buf.String() + arg = buf.Bytes() } } diff --git a/go/sql/types_test.go b/go/sql/types_test.go index 7b808e64f..83c74073a 100644 --- a/go/sql/types_test.go +++ b/go/sql/types_test.go @@ -62,6 +62,56 @@ func TestConvertArgCharsetDecoding(t *testing.T) { } // Should decode []uint8 - str := col.convertArg(latin1Bytes, false) + str := col.convertArg(latin1Bytes) require.Equal(t, "Garçon !", str) } + +func TestConvertArgBinaryColumnPadding(t *testing.T) { + // Test that binary columns are padded with trailing zeros to their declared length. + // This is needed because MySQL's binlog strips trailing 0x00 bytes from binary values. + // See https://github.com/github/gh-ost/issues/909 + + // Simulates a binary(20) column where binlog delivered only 18 bytes + // (trailing zeros were stripped) + truncatedValue := []uint8{ + 0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07, 0x08, + 0x09, 0x0a, 0x0b, 0x0c, 0x0d, 0x0e, 0x0f, 0x10, + 0x11, 0x12, // 18 bytes, missing 2 trailing zeros + } + + col := Column{ + Name: "bin_col", + Type: BinaryColumnType, + BinaryOctetLength: 20, + } + + result := col.convertArg(truncatedValue) + resultBytes := result.([]byte) + + require.Equal(t, 20, len(resultBytes), "binary column should be padded to declared length") + // First 18 bytes should be unchanged + require.Equal(t, truncatedValue, resultBytes[:18]) + // Last 2 bytes should be zeros + require.Equal(t, []byte{0x00, 0x00}, resultBytes[18:]) +} + +func TestConvertArgBinaryColumnNoPaddingWhenFull(t *testing.T) { + // When binary value is already at full length, no padding should occur + fullValue := []uint8{ + 0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07, 0x08, + 0x09, 0x0a, 0x0b, 0x0c, 0x0d, 0x0e, 0x0f, 0x10, + 0x11, 0x12, 0x13, 0x14, // 20 bytes + } + + col := Column{ + Name: "bin_col", + Type: BinaryColumnType, + BinaryOctetLength: 20, + } + + result := col.convertArg(fullValue) + resultBytes := result.([]byte) + + require.Equal(t, 20, len(resultBytes)) + 
require.Equal(t, fullValue, resultBytes) +} diff --git a/localtests/binary-to-varbinary/create.sql b/localtests/binary-to-varbinary/create.sql new file mode 100644 index 000000000..c2867f9ea --- /dev/null +++ b/localtests/binary-to-varbinary/create.sql @@ -0,0 +1,38 @@ +-- Test for https://github.com/github/gh-ost/issues/909 variant: +-- Binary columns with trailing zeros should preserve their values +-- when migrating from binary(N) to varbinary(M), even for rows +-- modified during migration via binlog events. + +drop table if exists gh_ost_test; +create table gh_ost_test ( + id int NOT NULL AUTO_INCREMENT, + info varchar(255) NOT NULL, + data binary(20) NOT NULL, + PRIMARY KEY (id) +) auto_increment=1; + +drop event if exists gh_ost_test; +delimiter ;; +create event gh_ost_test + on schedule every 1 second + starts current_timestamp + ends current_timestamp + interval 60 second + on completion not preserve + enable + do +begin + -- Insert rows where data has trailing zeros (will be stripped by binlog) + INSERT INTO gh_ost_test (info, data) VALUES ('insert-during-1', X'aabbccdd00000000000000000000000000000000'); + INSERT INTO gh_ost_test (info, data) VALUES ('insert-during-2', X'11223344556677889900000000000000000000ee'); + + -- Update existing rows to values with trailing zeros + UPDATE gh_ost_test SET data = X'ffeeddcc00000000000000000000000000000000' WHERE info = 'update-target-1'; + UPDATE gh_ost_test SET data = X'aabbccdd11111111111111111100000000000000' WHERE info = 'update-target-2'; +end ;; + +-- Pre-existing rows (copied via rowcopy, not binlog - these should work fine) +INSERT INTO gh_ost_test (info, data) VALUES + ('pre-existing-1', X'01020304050607080910111213141516171819ff'), + ('pre-existing-2', X'0102030405060708091011121314151617181900'), + ('update-target-1', X'ffffffffffffffffffffffffffffffffffffffff'), + ('update-target-2', X'eeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeee'); diff --git a/localtests/binary-to-varbinary/extra_args 
b/localtests/binary-to-varbinary/extra_args new file mode 100644 index 000000000..7ebdacf6c --- /dev/null +++ b/localtests/binary-to-varbinary/extra_args @@ -0,0 +1 @@ +--alter="MODIFY data varbinary(32)" diff --git a/localtests/copy-retries-exhausted/after.sql b/localtests/copy-retries-exhausted/after.sql new file mode 100644 index 000000000..c3d55e5b9 --- /dev/null +++ b/localtests/copy-retries-exhausted/after.sql @@ -0,0 +1 @@ +set global max_binlog_cache_size = 1073741824; -- 1GB diff --git a/localtests/copy-retries-exhausted/before.sql b/localtests/copy-retries-exhausted/before.sql new file mode 100644 index 000000000..a3570171a --- /dev/null +++ b/localtests/copy-retries-exhausted/before.sql @@ -0,0 +1 @@ +set global max_binlog_cache_size = 1024; diff --git a/localtests/copy-retries-exhausted/create.sql b/localtests/copy-retries-exhausted/create.sql new file mode 100644 index 000000000..4e37938ec --- /dev/null +++ b/localtests/copy-retries-exhausted/create.sql @@ -0,0 +1,12 @@ +drop table if exists gh_ost_test; +create table gh_ost_test ( + id int auto_increment, + name mediumtext not null, + primary key (id) +) auto_increment=1; + +insert into gh_ost_test (name) +select repeat('a', 1500) +from information_schema.columns +cross join information_schema.tables +limit 1000; diff --git a/localtests/copy-retries-exhausted/expect_failure b/localtests/copy-retries-exhausted/expect_failure new file mode 100644 index 000000000..cd6a516ec --- /dev/null +++ b/localtests/copy-retries-exhausted/expect_failure @@ -0,0 +1 @@ +Multi-statement transaction required more than 'max_binlog_cache_size' bytes of storage diff --git a/localtests/copy-retries-exhausted/extra_args b/localtests/copy-retries-exhausted/extra_args new file mode 100644 index 000000000..e4f8a0104 --- /dev/null +++ b/localtests/copy-retries-exhausted/extra_args @@ -0,0 +1 @@ +--alter "modify column name mediumtext" --default-retries=1 --chunk-size=1000 diff --git a/localtests/test.sh b/localtests/test.sh 
index 8eba73be1..404eeece3 100755 --- a/localtests/test.sh +++ b/localtests/test.sh @@ -225,6 +225,11 @@ test_single() { cat $tests_path/$test_name/create.sql return 1 fi + + if [ -f $tests_path/$test_name/before.sql ]; then + gh-ost-test-mysql-master --default-character-set=utf8mb4 test < $tests_path/$test_name/before.sql + gh-ost-test-mysql-replica --default-character-set=utf8mb4 test < $tests_path/$test_name/before.sql + fi extra_args="" if [ -f $tests_path/$test_name/extra_args ]; then @@ -315,6 +320,11 @@ test_single() { gh-ost-test-mysql-replica --default-character-set=utf8mb4 test -e "set @@global.sql_mode='${original_sql_mode}'" fi + if [ -f $tests_path/$test_name/after.sql ]; then + gh-ost-test-mysql-master --default-character-set=utf8mb4 test < $tests_path/$test_name/after.sql + gh-ost-test-mysql-replica --default-character-set=utf8mb4 test < $tests_path/$test_name/after.sql + fi + if [ -f $tests_path/$test_name/destroy.sql ]; then gh-ost-test-mysql-master --default-character-set=utf8mb4 test <$tests_path/$test_name/destroy.sql fi diff --git a/localtests/trigger-ghost-name-conflict/create.sql b/localtests/trigger-ghost-name-conflict/create.sql new file mode 100644 index 000000000..32980daa0 --- /dev/null +++ b/localtests/trigger-ghost-name-conflict/create.sql @@ -0,0 +1,35 @@ +-- Bug #3 regression test: validateGhostTriggersDontExist must check whole schema +-- MySQL trigger names are unique per SCHEMA, not per table. +-- The validation must detect a trigger with the ghost name on ANY table in the schema. + +drop trigger if exists gh_ost_test_ai_ght; +drop trigger if exists gh_ost_test_ai; +drop table if exists gh_ost_test_other; +drop table if exists gh_ost_test; + +create table gh_ost_test ( + id int auto_increment, + i int not null, + primary key(id) +) auto_increment=1; + +-- This trigger has the _ght suffix (simulating a previous migration left it). +-- Ghost name = "gh_ost_test_ai" (suffix removed). 
+create trigger gh_ost_test_ai_ght + after insert on gh_ost_test for each row + set @dummy = 1; + +-- Create ANOTHER table with a trigger named "gh_ost_test_ai" (the ghost name). +-- Validation must detect this conflict even though the trigger is on a different table. +create table gh_ost_test_other ( + id int auto_increment, + primary key(id) +); + +create trigger gh_ost_test_ai + after insert on gh_ost_test_other for each row + set @dummy = 1; + +insert into gh_ost_test values (null, 11); +insert into gh_ost_test values (null, 13); +insert into gh_ost_test values (null, 17); diff --git a/localtests/trigger-ghost-name-conflict/destroy.sql b/localtests/trigger-ghost-name-conflict/destroy.sql new file mode 100644 index 000000000..5b5e1e4f7 --- /dev/null +++ b/localtests/trigger-ghost-name-conflict/destroy.sql @@ -0,0 +1,2 @@ +drop trigger if exists gh_ost_test_ai; +drop table if exists gh_ost_test_other; diff --git a/localtests/trigger-ghost-name-conflict/expect_failure b/localtests/trigger-ghost-name-conflict/expect_failure new file mode 100644 index 000000000..1a4c997f3 --- /dev/null +++ b/localtests/trigger-ghost-name-conflict/expect_failure @@ -0,0 +1 @@ +Found gh-ost triggers \ No newline at end of file diff --git a/localtests/trigger-ghost-name-conflict/extra_args b/localtests/trigger-ghost-name-conflict/extra_args new file mode 100644 index 000000000..128ce0a66 --- /dev/null +++ b/localtests/trigger-ghost-name-conflict/extra_args @@ -0,0 +1 @@ +--include-triggers --trigger-suffix=_ght --remove-trigger-suffix-if-exists \ No newline at end of file diff --git a/localtests/trigger-long-name-validation/create.sql b/localtests/trigger-long-name-validation/create.sql new file mode 100644 index 000000000..526713b74 --- /dev/null +++ b/localtests/trigger-long-name-validation/create.sql @@ -0,0 +1,38 @@ +-- Bug #1: Double-transformation in trigger length validation +-- A trigger with a 60-char name should be valid: ghost name = 60 + 4 (_ght) = 64 chars (max allowed). 
+-- But validateGhostTriggersLength() applies GetGhostTriggerName() twice, +-- computing 60 + 4 + 4 = 68, which falsely exceeds the 64-char limit. + +drop table if exists gh_ost_test; + +create table gh_ost_test ( + id int auto_increment, + i int not null, + primary key(id) +) auto_increment=1; + +-- Trigger name is exactly 60 characters (padded to 60). +-- Ghost name with _ght suffix = 64 chars = exactly at the MySQL limit. +-- 60 chars: trigger_long_name_padding_aaaaaaaaaaaaaaaaaaaaa_60chars_xxxx +create trigger trigger_long_name_padding_aaaaaaaaaaaaaaaaaaaaa_60chars_xxxx + after insert on gh_ost_test for each row + set @dummy = 1; + +insert into gh_ost_test values (null, 11); +insert into gh_ost_test values (null, 13); +insert into gh_ost_test values (null, 17); + +drop event if exists gh_ost_test; +delimiter ;; +create event gh_ost_test + on schedule every 1 second + starts current_timestamp + ends current_timestamp + interval 60 second + on completion not preserve + enable + do +begin + insert into gh_ost_test values (null, 23); + update gh_ost_test set i=i+1 where id=1; +end ;; +delimiter ; diff --git a/localtests/trigger-long-name-validation/extra_args b/localtests/trigger-long-name-validation/extra_args new file mode 100644 index 000000000..128ce0a66 --- /dev/null +++ b/localtests/trigger-long-name-validation/extra_args @@ -0,0 +1 @@ +--include-triggers --trigger-suffix=_ght --remove-trigger-suffix-if-exists \ No newline at end of file diff --git a/script/bootstrap b/script/bootstrap index 573313a75..96bf45675 100755 --- a/script/bootstrap +++ b/script/bootstrap @@ -11,7 +11,9 @@ set -e # up so it points back to us and go is none the wiser set -x -rm -rf .gopath -mkdir -p .gopath/src/github.com/github -ln -s "$PWD" .gopath/src/github.com/github/gh-ost +if [ ! 
-L .gopath/src/github.com/github/gh-ost ]; then + rm -rf .gopath + mkdir -p .gopath/src/github.com/github + ln -s "$PWD" .gopath/src/github.com/github/gh-ost +fi export GOPATH=$PWD/.gopath:$GOPATH diff --git a/script/ensure-go-installed b/script/ensure-go-installed index 1504a5b74..3ed35c058 100755 --- a/script/ensure-go-installed +++ b/script/ensure-go-installed @@ -1,7 +1,7 @@ #!/bin/bash PREFERRED_GO_VERSION=go1.23.0 -SUPPORTED_GO_VERSIONS='go1.2[01234]' +SUPPORTED_GO_VERSIONS='go1.2[012345]' GO_PKG_DARWIN=${PREFERRED_GO_VERSION}.darwin-amd64.pkg GO_PKG_DARWIN_SHA=e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855