diff --git a/config.go b/config.go index 2054e7670..78335575e 100644 --- a/config.go +++ b/config.go @@ -764,6 +764,10 @@ type Config struct { // The ghostferry target user should have SUPER permissions to actually write to the target DB, // if ghostferry is ran with AllowSuperUserOnReadOnly = true and the target DB is set to read_only. AllowSuperUserOnReadOnly bool + + // The interval at which the periodic schema fingerprint verification occurs, in the + // format of time.ParseDuration. Default: 60s. + PeriodicallyVerifySchemaFingerPrintInterval string } func (c *Config) ValidateConfig() error { @@ -833,5 +837,14 @@ func (c *Config) ValidateConfig() error { c.CutoverRetryWaitSeconds = 1 } + if len(c.PeriodicallyVerifySchemaFingerPrintInterval) == 0 { + c.PeriodicallyVerifySchemaFingerPrintInterval = "60s" + } else { + _, err := time.ParseDuration(c.PeriodicallyVerifySchemaFingerPrintInterval) + if err != nil { + return fmt.Errorf("PeriodicallyVerifySchemaFingerPrintInterval invalid") + } + } + return nil } diff --git a/docs/source/technicaloverview.rst b/docs/source/technicaloverview.rst index a5ec382d9..ddcaa361c 100644 --- a/docs/source/technicaloverview.rst +++ b/docs/source/technicaloverview.rst @@ -102,6 +102,10 @@ Limitations - For tables with foreign key constraints, the constraints should be removed before performing the data migration. +- Ghostferry does not support schema changes during the migration or when the migration is + interrupted. In the background, Ghostferry periodically verifies that the schema of the databases + being migrated has not changed (every 60 seconds by default) and fails the run if it has. 
+ Algorithm Correctness --------------------- diff --git a/ferry.go b/ferry.go index 3e9a9d5d4..839637143 100644 --- a/ferry.go +++ b/ferry.go @@ -63,6 +63,8 @@ type Ferry struct { DataIterator *DataIterator BatchWriter *BatchWriter + SchemaFingerPrintVerifier *SchemaFingerPrintVerifier + StateTracker *StateTracker ErrorHandler ErrorHandler Throttler Throttler @@ -245,6 +247,37 @@ func (f *Ferry) NewInlineVerifier() *InlineVerifier { } } +func (f *Ferry) NewSchemaFingerPrintVerifier() (*SchemaFingerPrintVerifier, error) { + var sourceSchemaFingerPrint, targetSchemaFingerPrint string + + if f.StateToResumeFrom != nil { + if len(f.StateToResumeFrom.SourceSchemaFingerPrint) != 0 { + sourceSchemaFingerPrint = f.StateToResumeFrom.SourceSchemaFingerPrint + } + if len(f.StateToResumeFrom.TargetSchemaFingerPrint) != 0 { + targetSchemaFingerPrint = f.StateToResumeFrom.TargetSchemaFingerPrint + } + } + periodicallyVerifyInterval, err := time.ParseDuration(f.Config.PeriodicallyVerifySchemaFingerPrintInterval) + if err != nil { + return nil, fmt.Errorf("invalid PeriodicallyVerifySchemaFingerPrintInterval: %v. this error should have been caught via .Validate()", err) + } + + return &SchemaFingerPrintVerifier{ + SourceDB: f.SourceDB, + TargetDB: f.TargetDB, + DatabaseRewrites: f.Config.DatabaseRewrites, + TableSchemaCache: f.Tables, + ErrorHandler: f.ErrorHandler, + PeriodicallyVerifyInterval: periodicallyVerifyInterval, + + SourceSchemaFingerprint: sourceSchemaFingerPrint, + TargetSchemaFingerprint: targetSchemaFingerPrint, + + logger: logrus.WithField("tag", "schema_fingerprint_verifier"), + }, nil +} + func (f *Ferry) NewInlineVerifierWithoutStateTracker() *InlineVerifier { v := f.NewInlineVerifier() v.StateTracker = nil @@ -489,6 +522,11 @@ func (f *Ferry) Initialize() (err error) { } } + f.SchemaFingerPrintVerifier, err = f.NewSchemaFingerPrintVerifier() + if err != nil { + return err + } + // The iterative verifier needs the binlog streamer so this has to be first. 
// Eventually this can be moved below the verifier initialization. f.BinlogStreamer = f.NewSourceBinlogStreamer() @@ -686,6 +724,12 @@ func (f *Ferry) Run() { }() } + supportingServicesWg.Add(1) + go func() { + defer supportingServicesWg.Done() + f.SchemaFingerPrintVerifier.PeriodicallyVerifySchemaFingerprints(ctx) + }() + inlineVerifierWg := &sync.WaitGroup{} inlineVerifierContext, stopInlineVerifier := context.WithCancel(ctx) if f.inlineVerifier != nil { @@ -787,6 +831,7 @@ func (f *Ferry) Run() { f.DoneTime = time.Now() shutdown() + supportingServicesWg.Wait() if f.Config.ProgressCallback.URI != "" { @@ -913,7 +958,7 @@ func (f *Ferry) SerializeStateToJSON() (string, error) { binlogVerifyStore = f.inlineVerifier.reverifyStore } - serializedState := f.StateTracker.Serialize(f.Tables, binlogVerifyStore) + serializedState := f.StateTracker.Serialize(f.Tables, binlogVerifyStore, f.SchemaFingerPrintVerifier.SourceSchemaFingerprint, f.SchemaFingerPrintVerifier.TargetSchemaFingerprint) if f.Config.DoNotIncludeSchemaCacheInStateDump { serializedState.LastKnownTableSchemaCache = nil @@ -945,7 +990,7 @@ func (f *Ferry) Progress() *Progress { } // Table Progress - serializedState := f.StateTracker.Serialize(nil, nil) + serializedState := f.StateTracker.Serialize(nil, nil, f.SchemaFingerPrintVerifier.SourceSchemaFingerprint, f.SchemaFingerPrintVerifier.TargetSchemaFingerprint) // Note the below will not necessarily be synchronized with serializedState. // This is fine as we don't need to be super precise with performance data. 
rowStatsWrittenPerTable := f.StateTracker.RowStatsWrittenPerTable() diff --git a/schema_fingerprint_verifier.go b/schema_fingerprint_verifier.go new file mode 100644 index 000000000..61c6b0070 --- /dev/null +++ b/schema_fingerprint_verifier.go @@ -0,0 +1,134 @@ +package ghostferry + +import ( + "context" + "crypto/md5" + "encoding/hex" + "encoding/json" + "fmt" + "time" + + sql "github.com/Shopify/ghostferry/sqlwrapper" + "github.com/sirupsen/logrus" +) + +type SchemaFingerPrintVerifier struct { + SourceDB *sql.DB + TargetDB *sql.DB + DatabaseRewrites map[string]string + TableSchemaCache TableSchemaCache + ErrorHandler ErrorHandler + PeriodicallyVerifyInterval time.Duration + + SourceSchemaFingerprint string + TargetSchemaFingerprint string + + logger *logrus.Entry +} + +func (sf *SchemaFingerPrintVerifier) PeriodicallyVerifySchemaFingerprints(ctx context.Context) { + sf.logger.Info("starting periodic schema fingerprint verification") + ticker := time.NewTicker(sf.PeriodicallyVerifyInterval) + + defer ticker.Stop() + + for { + select { + case <-ticker.C: + err := sf.VerifySchemaFingerprint() + if err != nil { + sf.ErrorHandler.Fatal("schema_fingerprint_verifier", err) + } + case <-ctx.Done(): + sf.logger.Info("shutdown schema_fingerprint_verifier") + return + } + } +} + +func (sf *SchemaFingerPrintVerifier) VerifySchemaFingerprint() error { + err := sf.verifySourceSchemaFingerprint() + if err != nil { + return err + } + + err = sf.verifyTargetSchemaFingerprint() + if err != nil { + return err + } + + return nil +} + +func (sf *SchemaFingerPrintVerifier) verifySourceSchemaFingerprint() error { + newSchemaSourceFingerPrint, err := sf.getSchemaFingerPrint(sf.SourceDB, false) + if err != nil { + return err + } + + if len(sf.SourceSchemaFingerprint) != 0 && newSchemaSourceFingerPrint != sf.SourceSchemaFingerprint { + return fmt.Errorf("failed to verifiy schema fingerprint on source") + } else { + sf.SourceSchemaFingerprint = newSchemaSourceFingerPrint + } + + return nil 
+} + +func (sf *SchemaFingerPrintVerifier) verifyTargetSchemaFingerprint() error { + newSchemaTargetFingerPrint, err := sf.getSchemaFingerPrint(sf.TargetDB, true) + if err != nil { + return err + } + + if len(sf.TargetSchemaFingerprint) != 0 && newSchemaTargetFingerPrint != sf.TargetSchemaFingerprint { + return fmt.Errorf("failed to verifiy schema fingerprint on target") + } else { + sf.TargetSchemaFingerprint = newSchemaTargetFingerPrint + } + + return nil +} + +func (sf *SchemaFingerPrintVerifier) getSchemaFingerPrint(db *sql.DB, isTargetDB bool) (string, error) { + dbSet := map[string]struct{}{} + schemaData := [][]interface{}{} + + for _, table := range sf.TableSchemaCache { + if _, found := dbSet[table.Schema]; found { + continue + } + dbSet[table.Schema] = struct{}{} + + dbname := table.Schema + if isTargetDB { + if targetDbName, exists := sf.DatabaseRewrites[dbname]; exists { + dbname = targetDbName + } + } + + query := fmt.Sprintf("SELECT * FROM information_schema.columns WHERE table_schema = '%s' ORDER BY table_name, column_name", dbname) + rows, err := db.Query(query) + if err != nil { + fmt.Println(err) + return "", err + } + + for rows.Next() { + // `information_schema.columns` table has 21 columns. 
+ rowData, err := ScanGenericRow(rows, 21) + if err != nil { + return "", err + } + schemaData = append(schemaData, rowData) + } + } + + schemaDataInBytes, err := json.Marshal(schemaData) + if err != nil { + return "", err + } + + hash := md5.Sum([]byte(schemaDataInBytes)) + return hex.EncodeToString(hash[:]), nil +} diff --git a/state_tracker.go b/state_tracker.go index f346a5c58..5155c54d3 100644 --- a/state_tracker.go +++ b/state_tracker.go @@ -40,6 +40,8 @@ type SerializableState struct { BinlogVerifyStore BinlogVerifySerializedStore LastStoredBinlogPositionForInlineVerifier mysql.Position LastStoredBinlogPositionForTargetVerifier mysql.Position + SourceSchemaFingerPrint string + TargetSchemaFingerPrint string } func (s *SerializableState) MinSourceBinlogPosition() mysql.Position { @@ -253,7 +255,7 @@ func (s *StateTracker) updateSpeedLog(deltaPaginationKey uint64) { } } -func (s *StateTracker) Serialize(lastKnownTableSchemaCache TableSchemaCache, binlogVerifyStore *BinlogVerifyStore) *SerializableState { +func (s *StateTracker) Serialize(lastKnownTableSchemaCache TableSchemaCache, binlogVerifyStore *BinlogVerifyStore, sourceSchemaFingerPrint string, targetSchemaFingerPrint string) *SerializableState { s.BinlogRWMutex.RLock() defer s.BinlogRWMutex.RUnlock() @@ -274,6 +276,14 @@ func (s *StateTracker) Serialize(lastKnownTableSchemaCache TableSchemaCache, bin state.BinlogVerifyStore = binlogVerifyStore.Serialize() } + if len(sourceSchemaFingerPrint) > 0 { + state.SourceSchemaFingerPrint = sourceSchemaFingerPrint + } + + if len(targetSchemaFingerPrint) > 0 { + state.TargetSchemaFingerPrint = targetSchemaFingerPrint + } + // Need a copy because lastSuccessfulPaginationKeys may change after Serialize // returns. This would inaccurately reflect the state of Ghostferry when // Serialize is called. 
diff --git a/test/go/data_iterator_test.go b/test/go/data_iterator_test.go index bbacf529a..a31076043 100644 --- a/test/go/data_iterator_test.go +++ b/test/go/data_iterator_test.go @@ -163,7 +163,7 @@ func (this *DataIteratorTestSuite) TestDoneListenerGetsNotifiedWhenDone() { } func (this *DataIteratorTestSuite) completedTables() map[string]bool { - return this.di.StateTracker.Serialize(nil, nil).CompletedTables + return this.di.StateTracker.Serialize(nil, nil, "", "").CompletedTables } func (this *DataIteratorTestSuite) TestDataIterationBatchSizePerTableOverride() { diff --git a/test/go/schema_fingerprint_verifier_test.go b/test/go/schema_fingerprint_verifier_test.go new file mode 100644 index 000000000..78d241584 --- /dev/null +++ b/test/go/schema_fingerprint_verifier_test.go @@ -0,0 +1,76 @@ +package test + +import ( + "fmt" + "testing" + "time" + + "github.com/Shopify/ghostferry" + sql "github.com/Shopify/ghostferry/sqlwrapper" + "github.com/Shopify/ghostferry/testhelpers" + "github.com/stretchr/testify/suite" +) + +type SchemaFingerPrintVerifierTestSuite struct { + *testhelpers.GhostferryUnitTestSuite + tablename string + sf *ghostferry.SchemaFingerPrintVerifier +} + +func alterTestTableSchema(db *sql.DB, this *SchemaFingerPrintVerifierTestSuite) { + query := fmt.Sprintf("ALTER TABLE IF EXISTS %s.%s ADD COLUMN extracol VARCHAR(15)", testhelpers.TestSchemaName, this.tablename) + this.Ferry.SourceDB.Query(query) +} + +func resetTestTableSchema(db *sql.DB, this *SchemaFingerPrintVerifierTestSuite) { + query := fmt.Sprintf("ALTER TABLE IF EXISTS %s.%s DROP COLUMN extracol", testhelpers.TestSchemaName, this.tablename) + db.Query(query) +} + +func (this *SchemaFingerPrintVerifierTestSuite) SetupTest() { + this.GhostferryUnitTestSuite.SetupTest() + + this.tablename = "test_table_1" + testhelpers.SeedInitialData(this.Ferry.SourceDB, testhelpers.TestSchemaName, this.tablename, 0) + + tableFilter := &testhelpers.TestTableFilter{ + DbsFunc: 
testhelpers.DbApplicabilityFilter([]string{testhelpers.TestSchemaName}), + TablesFunc: nil, + } + tableSchema, err := ghostferry.LoadTables(this.Ferry.SourceDB, tableFilter, nil, nil, nil, nil) + this.Require().Nil(err) + + periodicallyVerifyInterval, _ := time.ParseDuration(this.Ferry.Config.PeriodicallyVerifySchemaFingerPrintInterval) + + this.sf = &ghostferry.SchemaFingerPrintVerifier{ + SourceDB: this.Ferry.SourceDB, + TargetDB: this.Ferry.TargetDB, + ErrorHandler: this.Ferry.ErrorHandler, + DatabaseRewrites: map[string]string{}, + TableSchemaCache: tableSchema, + PeriodicallyVerifyInterval: periodicallyVerifyInterval, + } +} + +func (this *SchemaFingerPrintVerifierTestSuite) TestVerifySchemaFingerprint() { + err := this.sf.VerifySchemaFingerprint() + this.Require().Nil(err) + + alterTestTableSchema(this.sf.SourceDB, this) + err = this.sf.VerifySchemaFingerprint() + this.Require().Error(fmt.Errorf("failed to verifiy schema fingerprint on source")) + + resetTestTableSchema(this.sf.SourceDB, this) + this.Require().Nil(err) + + alterTestTableSchema(this.sf.TargetDB, this) + err = this.sf.VerifySchemaFingerprint() + this.Require().Error(fmt.Errorf("failed to verifiy schema fingerprint on target")) +} + +func TestSchemaFingerPrintVerifier(t *testing.T) { + testhelpers.SetupTest() + suite.Run(t, &SchemaFingerPrintVerifierTestSuite{ + GhostferryUnitTestSuite: &testhelpers.GhostferryUnitTestSuite{}, + }) +} diff --git a/test/integration/interrupt_resume_test.rb b/test/integration/interrupt_resume_test.rb index 08cf3eeb2..5cf894b1c 100644 --- a/test/integration/interrupt_resume_test.rb +++ b/test/integration/interrupt_resume_test.rb @@ -518,6 +518,19 @@ def test_interrupt_resume_idempotence_with_failure_and_writes_to_source assert_ghostferry_completed(ghostferry, times: 1) end + def test_interrupt_resume_failure_with_database_schema_changed_during_interrupt + ghostferry = new_ghostferry(MINIMAL_GHOSTFERRY) + + ghostferry.on_status(Ghostferry::Status::AFTER_ROW_COPY) do 
+ ghostferry.term_and_wait_for_exit + end + + dumped_state = ghostferry.run_expecting_interrupt + source_db.query("ALTER TABLE #{DEFAULT_FULL_TABLE_NAME} ADD COLUMN extracolumn VARCHAR(15);") + + ghostferry.run_expecting_failure(dumped_state) + end + def test_resume_from_failure_with_state_callback ghostferry = new_ghostferry(MINIMAL_GHOSTFERRY) diff --git a/test/integration/trivial_test.rb b/test/integration/trivial_test.rb index 246d31baa..0a47a8ff9 100644 --- a/test/integration/trivial_test.rb +++ b/test/integration/trivial_test.rb @@ -39,4 +39,19 @@ def test_logged_query_omits_columns end end end + + def test_fails_if_database_schema_is_changed_during_data_copy + seed_simple_database_with_single_table + ghostferry = new_ghostferry(MINIMAL_GHOSTFERRY) + + batches_written = 0 + ghostferry.on_status(Ghostferry::Status::AFTER_ROW_COPY) do + batches_written += 1 + if batches_written == 1 + source_db.query("ALTER TABLE #{DEFAULT_FULL_TABLE_NAME} ADD COLUMN extracolumn VARCHAR(15);") + end + end + + ghostferry.run_expecting_failure + end end diff --git a/test/lib/go/integrationferry.go b/test/lib/go/integrationferry.go index 9f43c3e30..7d54741a5 100644 --- a/test/lib/go/integrationferry.go +++ b/test/lib/go/integrationferry.go @@ -206,6 +206,8 @@ func NewStandardConfig() (*ghostferry.Config, error) { SkipTargetVerification: (os.Getenv("GHOSTFERRY_SKIP_TARGET_VERIFICATION") == "true"), EnableRowBatchSize: true, DumpStateToStdoutOnError: true, + + PeriodicallyVerifySchemaFingerPrintInterval: "1s", } integrationPort := os.Getenv(portEnvName)