Skip to content

Commit 99835fe

Browse files
authored
Merge branch 'master' into hasan-dot/early-instant-ddl
2 parents d48087b + 8bc63f0 commit 99835fe

3 files changed

Lines changed: 234 additions & 1 deletion

File tree

go/logic/migrator.go

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -161,10 +161,21 @@ func (this *Migrator) retryOperation(operation func() error, notFatalHint ...boo
161161
// sleep after previous iteration
162162
RetrySleepFn(1 * time.Second)
163163
}
164+
// Check for abort/context cancellation before each retry
165+
if abortErr := this.checkAbort(); abortErr != nil {
166+
return abortErr
167+
}
164168
err = operation()
165169
if err == nil {
166170
return nil
167171
}
172+
// Check if this is an unrecoverable error (data consistency issues won't resolve on retry)
173+
if strings.Contains(err.Error(), "warnings detected") {
174+
if len(notFatalHint) == 0 {
175+
_ = base.SendWithContext(this.migrationContext.GetContext(), this.migrationContext.PanicAbort, err)
176+
}
177+
return err
178+
}
168179
// there's an error. Let's try again.
169180
}
170181
if len(notFatalHint) == 0 {
@@ -191,10 +202,21 @@ func (this *Migrator) retryOperationWithExponentialBackoff(operation func() erro
191202
if i != 0 {
192203
RetrySleepFn(time.Duration(interval) * time.Second)
193204
}
205+
// Check for abort/context cancellation before each retry
206+
if abortErr := this.checkAbort(); abortErr != nil {
207+
return abortErr
208+
}
194209
err = operation()
195210
if err == nil {
196211
return nil
197212
}
213+
// Check if this is an unrecoverable error (data consistency issues won't resolve on retry)
214+
if strings.Contains(err.Error(), "warnings detected") {
215+
if len(notFatalHint) == 0 {
216+
_ = base.SendWithContext(this.migrationContext.GetContext(), this.migrationContext.PanicAbort, err)
217+
}
218+
return err
219+
}
198220
}
199221
if len(notFatalHint) == 0 {
200222
// Use helper to prevent deadlock if listenOnPanicAbort already exited

go/logic/migrator_test.go

Lines changed: 211 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -758,6 +758,118 @@ func TestMigratorRetryWithExponentialBackoff(t *testing.T) {
758758
assert.Equal(t, tries, 100)
759759
}
760760

761+
func TestMigratorRetryAbortsOnContextCancellation(t *testing.T) {
762+
oldRetrySleepFn := RetrySleepFn
763+
defer func() { RetrySleepFn = oldRetrySleepFn }()
764+
765+
migrationContext := base.NewMigrationContext()
766+
migrationContext.SetDefaultNumRetries(100)
767+
migrator := NewMigrator(migrationContext, "1.2.3")
768+
769+
RetrySleepFn = func(duration time.Duration) {
770+
// No sleep needed for this test
771+
}
772+
773+
var tries = 0
774+
retryable := func() error {
775+
tries++
776+
if tries == 5 {
777+
// Cancel context on 5th try
778+
migrationContext.CancelContext()
779+
}
780+
return errors.New("Simulated error")
781+
}
782+
783+
result := migrator.retryOperation(retryable, false)
784+
assert.Error(t, result)
785+
// Should abort after 6 tries: 5 failures + 1 checkAbort detection
786+
assert.True(t, tries <= 6, "Expected tries <= 6, got %d", tries)
787+
// Verify we got context cancellation error
788+
assert.Contains(t, result.Error(), "context canceled")
789+
}
790+
791+
func TestMigratorRetryWithExponentialBackoffAbortsOnContextCancellation(t *testing.T) {
792+
oldRetrySleepFn := RetrySleepFn
793+
defer func() { RetrySleepFn = oldRetrySleepFn }()
794+
795+
migrationContext := base.NewMigrationContext()
796+
migrationContext.SetDefaultNumRetries(100)
797+
migrationContext.SetExponentialBackoffMaxInterval(42)
798+
migrator := NewMigrator(migrationContext, "1.2.3")
799+
800+
RetrySleepFn = func(duration time.Duration) {
801+
// No sleep needed for this test
802+
}
803+
804+
var tries = 0
805+
retryable := func() error {
806+
tries++
807+
if tries == 5 {
808+
// Cancel context on 5th try
809+
migrationContext.CancelContext()
810+
}
811+
return errors.New("Simulated error")
812+
}
813+
814+
result := migrator.retryOperationWithExponentialBackoff(retryable, false)
815+
assert.Error(t, result)
816+
// Should abort after 6 tries: 5 failures + 1 checkAbort detection
817+
assert.True(t, tries <= 6, "Expected tries <= 6, got %d", tries)
818+
// Verify we got context cancellation error
819+
assert.Contains(t, result.Error(), "context canceled")
820+
}
821+
822+
func TestMigratorRetrySkipsRetriesForWarnings(t *testing.T) {
823+
oldRetrySleepFn := RetrySleepFn
824+
defer func() { RetrySleepFn = oldRetrySleepFn }()
825+
826+
migrationContext := base.NewMigrationContext()
827+
migrationContext.SetDefaultNumRetries(100)
828+
migrator := NewMigrator(migrationContext, "1.2.3")
829+
830+
RetrySleepFn = func(duration time.Duration) {
831+
t.Fatal("Should not sleep/retry for warning errors")
832+
}
833+
834+
var tries = 0
835+
retryable := func() error {
836+
tries++
837+
return errors.New("warnings detected in statement 1 of 1: [Warning: Duplicate entry 'test' for key 'idx' (1062)]")
838+
}
839+
840+
result := migrator.retryOperation(retryable, false)
841+
assert.Error(t, result)
842+
// Should only try once - no retries for warnings
843+
assert.Equal(t, 1, tries, "Expected exactly 1 try (no retries) for warning error")
844+
assert.Contains(t, result.Error(), "warnings detected")
845+
}
846+
847+
func TestMigratorRetryWithExponentialBackoffSkipsRetriesForWarnings(t *testing.T) {
848+
oldRetrySleepFn := RetrySleepFn
849+
defer func() { RetrySleepFn = oldRetrySleepFn }()
850+
851+
migrationContext := base.NewMigrationContext()
852+
migrationContext.SetDefaultNumRetries(100)
853+
migrationContext.SetExponentialBackoffMaxInterval(42)
854+
migrator := NewMigrator(migrationContext, "1.2.3")
855+
856+
RetrySleepFn = func(duration time.Duration) {
857+
t.Fatal("Should not sleep/retry for warning errors")
858+
}
859+
860+
var tries = 0
861+
retryable := func() error {
862+
tries++
863+
return errors.New("warnings detected in statement 1 of 1: [Warning: Duplicate entry 'test' for key 'idx' (1062)]")
864+
}
865+
866+
result := migrator.retryOperationWithExponentialBackoff(retryable, false)
867+
assert.Error(t, result)
868+
// Should only try once - no retries for warnings
869+
assert.Equal(t, 1, tries, "Expected exactly 1 try (no retries) for warning error")
870+
assert.Contains(t, result.Error(), "warnings detected")
871+
}
872+
761873
func (suite *MigratorTestSuite) TestCutOverLossDataCaseLockGhostBeforeRename() {
762874
ctx := context.Background()
763875

@@ -1254,3 +1366,102 @@ func TestCheckAbort_DetectsContextCancellation(t *testing.T) {
12541366
t.Fatal("Expected checkAbort to return error when context is cancelled")
12551367
}
12561368
}
1369+
1370+
func (suite *MigratorTestSuite) TestPanicOnWarningsDuplicateDuringCutoverWithHighRetries() {
1371+
ctx := context.Background()
1372+
1373+
// Create table with email column (no unique constraint initially)
1374+
_, err := suite.db.ExecContext(ctx, fmt.Sprintf("CREATE TABLE %s (id INT PRIMARY KEY AUTO_INCREMENT, email VARCHAR(100))", getTestTableName()))
1375+
suite.Require().NoError(err)
1376+
1377+
// Insert initial rows with unique email values - passes pre-flight validation
1378+
_, err = suite.db.ExecContext(ctx, fmt.Sprintf("INSERT INTO %s (email) VALUES ('user1@example.com')", getTestTableName()))
1379+
suite.Require().NoError(err)
1380+
_, err = suite.db.ExecContext(ctx, fmt.Sprintf("INSERT INTO %s (email) VALUES ('user2@example.com')", getTestTableName()))
1381+
suite.Require().NoError(err)
1382+
_, err = suite.db.ExecContext(ctx, fmt.Sprintf("INSERT INTO %s (email) VALUES ('user3@example.com')", getTestTableName()))
1383+
suite.Require().NoError(err)
1384+
1385+
// Verify we have 3 rows
1386+
var count int
1387+
err = suite.db.QueryRowContext(ctx, fmt.Sprintf("SELECT COUNT(*) FROM %s", getTestTableName())).Scan(&count)
1388+
suite.Require().NoError(err)
1389+
suite.Require().Equal(3, count)
1390+
1391+
// Create postpone flag file
1392+
tmpDir, err := os.MkdirTemp("", "gh-ost-postpone-test")
1393+
suite.Require().NoError(err)
1394+
defer os.RemoveAll(tmpDir)
1395+
postponeFlagFile := filepath.Join(tmpDir, "postpone.flag")
1396+
err = os.WriteFile(postponeFlagFile, []byte{}, 0644)
1397+
suite.Require().NoError(err)
1398+
1399+
// Start migration in goroutine
1400+
done := make(chan error, 1)
1401+
go func() {
1402+
connectionConfig, err := getTestConnectionConfig(ctx, suite.mysqlContainer)
1403+
if err != nil {
1404+
done <- err
1405+
return
1406+
}
1407+
1408+
migrationContext := newTestMigrationContext()
1409+
migrationContext.ApplierConnectionConfig = connectionConfig
1410+
migrationContext.InspectorConnectionConfig = connectionConfig
1411+
migrationContext.SetConnectionConfig("innodb")
1412+
migrationContext.AlterStatementOptions = "ADD UNIQUE KEY unique_email_idx (email)"
1413+
migrationContext.HeartbeatIntervalMilliseconds = 100
1414+
migrationContext.PostponeCutOverFlagFile = postponeFlagFile
1415+
migrationContext.PanicOnWarnings = true
1416+
1417+
// High retry count + exponential backoff means retries will take a long time and fail the test if not properly aborted
1418+
migrationContext.SetDefaultNumRetries(30)
1419+
migrationContext.CutOverExponentialBackoff = true
1420+
migrationContext.SetExponentialBackoffMaxInterval(128)
1421+
1422+
migrator := NewMigrator(migrationContext, "0.0.0")
1423+
1424+
//nolint:contextcheck
1425+
done <- migrator.Migrate()
1426+
}()
1427+
1428+
// Wait for migration to reach postponed state
1429+
// TODO replace this with an actual check for postponed state
1430+
time.Sleep(3 * time.Second)
1431+
1432+
// Now insert a duplicate email value while migration is postponed
1433+
// This simulates data arriving during migration that would violate the unique constraint
1434+
_, err = suite.db.ExecContext(ctx, fmt.Sprintf("INSERT INTO %s (email) VALUES ('user1@example.com')", getTestTableName()))
1435+
suite.Require().NoError(err)
1436+
1437+
// Verify we now have 4 rows (including the duplicate)
1438+
err = suite.db.QueryRowContext(ctx, fmt.Sprintf("SELECT COUNT(*) FROM %s", getTestTableName())).Scan(&count)
1439+
suite.Require().NoError(err)
1440+
suite.Require().Equal(4, count)
1441+
1442+
// Unpostpone the migration - gh-ost will now try to apply binlog events with the duplicate
1443+
err = os.Remove(postponeFlagFile)
1444+
suite.Require().NoError(err)
1445+
1446+
// Wait for Migrate() to return - with timeout to detect if it hangs
1447+
select {
1448+
case migrateErr := <-done:
1449+
// Success - Migrate() returned
1450+
// It should return an error due to the duplicate
1451+
suite.Require().Error(migrateErr, "Expected migration to fail due to duplicate key violation")
1452+
suite.Require().Contains(migrateErr.Error(), "Duplicate entry", "Error should mention duplicate entry")
1453+
case <-time.After(5 * time.Minute):
1454+
suite.FailNow("Migrate() hung and did not return within 5 minutes - failure to abort on warnings in retry loop")
1455+
}
1456+
1457+
// Verify all 4 rows are still in the original table (no silent data loss)
1458+
err = suite.db.QueryRowContext(ctx, fmt.Sprintf("SELECT COUNT(*) FROM %s", getTestTableName())).Scan(&count)
1459+
suite.Require().NoError(err)
1460+
suite.Require().Equal(4, count, "Original table should still have all 4 rows")
1461+
1462+
// Verify both user1@example.com entries still exist
1463+
var duplicateCount int
1464+
err = suite.db.QueryRowContext(ctx, fmt.Sprintf("SELECT COUNT(*) FROM %s WHERE email = 'user1@example.com'", getTestTableName())).Scan(&duplicateCount)
1465+
suite.Require().NoError(err)
1466+
suite.Require().Equal(2, duplicateCount, "Should have 2 duplicate email entries")
1467+
}
Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1 +1 @@
1-
ERROR warnings detected in statement 1 of 1
1+
ERROR warnings detected in statement

0 commit comments

Comments (0)