Skip to content

Commit 27878d6

Browse files
Merge branch 'github:master' into master
2 parents 2f34a5b + 8bc63f0 commit 27878d6

11 files changed

Lines changed: 562 additions & 73 deletions

File tree

go/cmd/gh-ost/main.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -316,6 +316,9 @@ func main() {
316316
if migrationContext.CheckpointIntervalSeconds < 10 {
317317
migrationContext.Log.Fatalf("--checkpoint-seconds should be >=10")
318318
}
319+
if migrationContext.CountTableRows && migrationContext.PanicOnWarnings {
320+
migrationContext.Log.Warning("--exact-rowcount with --panic-on-warnings: row counts cannot be exact due to warning detection")
321+
}
319322

320323
switch *cutOver {
321324
case "atomic", "default", "":

go/logic/applier.go

Lines changed: 140 additions & 69 deletions
Original file line numberDiff line numberDiff line change
@@ -1522,6 +1522,107 @@ func (this *Applier) buildDMLEventQuery(dmlEvent *binlog.BinlogDMLEvent) []*dmlB
15221522
return []*dmlBuildResult{newDmlBuildResultError(fmt.Errorf("Unknown dml event type: %+v", dmlEvent.DML))}
15231523
}
15241524

1525+
// executeBatchWithWarningChecking executes a batch of DML statements with SHOW WARNINGS
1526+
// interleaved after each statement to detect warnings from any statement in the batch.
1527+
// This is used when PanicOnWarnings is enabled to ensure warnings from middle statements
1528+
// are not lost (SHOW WARNINGS only shows warnings from the last statement in a multi-statement batch).
1529+
func (this *Applier) executeBatchWithWarningChecking(ctx context.Context, tx *gosql.Tx, buildResults []*dmlBuildResult) (int64, error) {
1530+
// Build query with interleaved SHOW WARNINGS: stmt1; SHOW WARNINGS; stmt2; SHOW WARNINGS; ...
1531+
var queryBuilder strings.Builder
1532+
args := make([]interface{}, 0)
1533+
1534+
for _, buildResult := range buildResults {
1535+
queryBuilder.WriteString(buildResult.query)
1536+
queryBuilder.WriteString(";\nSHOW WARNINGS;\n")
1537+
args = append(args, buildResult.args...)
1538+
}
1539+
1540+
query := queryBuilder.String()
1541+
1542+
// Execute the multi-statement query
1543+
rows, err := tx.QueryContext(ctx, query, args...)
1544+
if err != nil {
1545+
return 0, fmt.Errorf("%w; query=%s; args=%+v", err, query, args)
1546+
}
1547+
defer rows.Close()
1548+
1549+
var totalDelta int64
1550+
1551+
// QueryContext with multi-statement queries returns rows positioned at the first result set
1552+
// that produces rows (i.e., the first SHOW WARNINGS), automatically skipping DML results.
1553+
// Verify we're at a SHOW WARNINGS result set (should have 3 columns: Level, Code, Message)
1554+
cols, err := rows.Columns()
1555+
if err != nil {
1556+
return 0, fmt.Errorf("failed to get columns: %w", err)
1557+
}
1558+
1559+
// If somehow we're not at a result set with columns, try to advance
1560+
if len(cols) == 0 {
1561+
if !rows.NextResultSet() {
1562+
return 0, fmt.Errorf("expected SHOW WARNINGS result set after first statement")
1563+
}
1564+
}
1565+
1566+
// Compile regex once before loop to avoid performance penalty and handle errors properly
1567+
migrationKeyRegex, err := this.compileMigrationKeyWarningRegex()
1568+
if err != nil {
1569+
return 0, err
1570+
}
1571+
1572+
// Iterate through SHOW WARNINGS result sets.
1573+
// DML statements don't create navigable result sets, so we move directly between SHOW WARNINGS.
1574+
// Pattern: [at SHOW WARNINGS #1] -> read warnings -> NextResultSet() -> [at SHOW WARNINGS #2] -> ...
1575+
for i := 0; i < len(buildResults); i++ {
1576+
// We can't get exact rows affected with QueryContext (needed for reading SHOW WARNINGS).
1577+
// Use the theoretical delta (+1 for INSERT, -1 for DELETE, 0 for UPDATE) as an approximation.
1578+
// This may be inaccurate (e.g., INSERT IGNORE with duplicate affects 0 rows but we count +1).
1579+
totalDelta += buildResults[i].rowsDelta
1580+
1581+
// Read warnings from this statement's SHOW WARNINGS result set
1582+
var sqlWarnings []string
1583+
for rows.Next() {
1584+
var level, message string
1585+
var code int
1586+
if err := rows.Scan(&level, &code, &message); err != nil {
1587+
// Scan failure means we cannot reliably read warnings.
1588+
// Since PanicOnWarnings is a safety feature, we must fail hard rather than silently skip.
1589+
return 0, fmt.Errorf("failed to scan SHOW WARNINGS for statement %d: %w", i+1, err)
1590+
}
1591+
1592+
if strings.Contains(message, "Duplicate entry") && migrationKeyRegex.MatchString(message) {
1593+
// Duplicate entry on migration unique key is expected during binlog replay
1594+
// (row was already copied during bulk copy phase)
1595+
continue
1596+
}
1597+
sqlWarnings = append(sqlWarnings, fmt.Sprintf("%s: %s (%d)", level, message, code))
1598+
}
1599+
1600+
// Check for errors that occurred while iterating through warnings
1601+
if err := rows.Err(); err != nil {
1602+
return 0, fmt.Errorf("error reading SHOW WARNINGS result set for statement %d: %w", i+1, err)
1603+
}
1604+
1605+
if len(sqlWarnings) > 0 {
1606+
return 0, fmt.Errorf("warnings detected in statement %d of %d: %v", i+1, len(buildResults), sqlWarnings)
1607+
}
1608+
1609+
// Move to the next statement's SHOW WARNINGS result set
1610+
// For the last statement, there's no next result set
1611+
// DML statements don't create result sets, so we only need one NextResultSet call
1612+
// to move from SHOW WARNINGS #N to SHOW WARNINGS #(N+1)
1613+
if i < len(buildResults)-1 {
1614+
if !rows.NextResultSet() {
1615+
if err := rows.Err(); err != nil {
1616+
return 0, fmt.Errorf("error moving to SHOW WARNINGS for statement %d: %w", i+2, err)
1617+
}
1618+
return 0, fmt.Errorf("expected SHOW WARNINGS result set for statement %d", i+2)
1619+
}
1620+
}
1621+
}
1622+
1623+
return totalDelta, nil
1624+
}
1625+
15251626
// ApplyDMLEventQueries applies multiple DML queries onto the _ghost_ table
15261627
func (this *Applier) ApplyDMLEventQueries(dmlEvents [](*binlog.BinlogDMLEvent)) error {
15271628
var totalDelta int64
@@ -1561,82 +1662,52 @@ func (this *Applier) ApplyDMLEventQueries(dmlEvents [](*binlog.BinlogDMLEvent))
15611662
}
15621663
}
15631664

1564-
// We batch together the DML queries into multi-statements to minimize network trips.
1565-
// We have to use the raw driver connection to access the rows affected
1566-
// for each statement in the multi-statement.
1567-
execErr := conn.Raw(func(driverConn any) error {
1568-
ex := driverConn.(driver.ExecerContext)
1569-
nvc := driverConn.(driver.NamedValueChecker)
1570-
1571-
multiArgs := make([]driver.NamedValue, 0, nArgs)
1572-
multiQueryBuilder := strings.Builder{}
1573-
for _, buildResult := range buildResults {
1574-
for _, arg := range buildResult.args {
1575-
nv := driver.NamedValue{Value: driver.Value(arg)}
1576-
nvc.CheckNamedValue(&nv)
1577-
multiArgs = append(multiArgs, nv)
1578-
}
1579-
1580-
multiQueryBuilder.WriteString(buildResult.query)
1581-
multiQueryBuilder.WriteString(";\n")
1582-
}
1583-
1584-
res, err := ex.ExecContext(ctx, multiQueryBuilder.String(), multiArgs)
1585-
if err != nil {
1586-
err = fmt.Errorf("%w; query=%s; args=%+v", err, multiQueryBuilder.String(), multiArgs)
1587-
return err
1588-
}
1589-
1590-
mysqlRes := res.(drivermysql.Result)
1591-
1592-
// each DML is either a single insert (delta +1), update (delta +0) or delete (delta -1).
1593-
// multiplying by the rows actually affected (either 0 or 1) will give an accurate row delta for this DML event
1594-
for i, rowsAffected := range mysqlRes.AllRowsAffected() {
1595-
totalDelta += buildResults[i].rowsDelta * rowsAffected
1596-
}
1597-
return nil
1598-
})
1599-
1600-
if execErr != nil {
1601-
return rollback(execErr)
1602-
}
1603-
1604-
// Check for warnings when PanicOnWarnings is enabled
1665+
// When PanicOnWarnings is enabled, we need to check warnings after each statement
1666+
// in the batch. SHOW WARNINGS only shows warnings from the last statement in a
1667+
// multi-statement query, so we interleave SHOW WARNINGS after each DML statement.
16051668
if this.migrationContext.PanicOnWarnings {
1606-
//nolint:execinquery
1607-
rows, err := tx.Query("SHOW WARNINGS")
1608-
if err != nil {
1609-
return rollback(err)
1610-
}
1611-
defer rows.Close()
1612-
if err = rows.Err(); err != nil {
1613-
return rollback(err)
1614-
}
1615-
1616-
// Compile regex once before loop to avoid performance penalty and handle errors properly
1617-
migrationKeyRegex, err := this.compileMigrationKeyWarningRegex()
1669+
totalDelta, err = this.executeBatchWithWarningChecking(ctx, tx, buildResults)
16181670
if err != nil {
16191671
return rollback(err)
16201672
}
1673+
} else {
1674+
// Fast path: batch together DML queries into multi-statements to minimize network trips.
1675+
// We use the raw driver connection to access the rows affected for each statement.
1676+
execErr := conn.Raw(func(driverConn any) error {
1677+
ex := driverConn.(driver.ExecerContext)
1678+
nvc := driverConn.(driver.NamedValueChecker)
1679+
1680+
multiArgs := make([]driver.NamedValue, 0, nArgs)
1681+
multiQueryBuilder := strings.Builder{}
1682+
for _, buildResult := range buildResults {
1683+
for _, arg := range buildResult.args {
1684+
nv := driver.NamedValue{Value: driver.Value(arg)}
1685+
nvc.CheckNamedValue(&nv)
1686+
multiArgs = append(multiArgs, nv)
1687+
}
1688+
1689+
multiQueryBuilder.WriteString(buildResult.query)
1690+
multiQueryBuilder.WriteString(";\n")
1691+
}
16211692

1622-
var sqlWarnings []string
1623-
for rows.Next() {
1624-
var level, message string
1625-
var code int
1626-
if err := rows.Scan(&level, &code, &message); err != nil {
1627-
this.migrationContext.Log.Warningf("Failed to read SHOW WARNINGS row")
1628-
continue
1693+
res, err := ex.ExecContext(ctx, multiQueryBuilder.String(), multiArgs)
1694+
if err != nil {
1695+
err = fmt.Errorf("%w; query=%s; args=%+v", err, multiQueryBuilder.String(), multiArgs)
1696+
return err
16291697
}
1630-
if strings.Contains(message, "Duplicate entry") && migrationKeyRegex.MatchString(message) {
1631-
// Duplicate entry on migration unique key is expected during binlog replay
1632-
// (row was already copied during bulk copy phase)
1633-
continue
1698+
1699+
mysqlRes := res.(drivermysql.Result)
1700+
1701+
// each DML is either a single insert (delta +1), update (delta +0) or delete (delta -1).
1702+
// multiplying by the rows actually affected (either 0 or 1) will give an accurate row delta for this DML event
1703+
for i, rowsAffected := range mysqlRes.AllRowsAffected() {
1704+
totalDelta += buildResults[i].rowsDelta * rowsAffected
16341705
}
1635-
sqlWarnings = append(sqlWarnings, fmt.Sprintf("%s: %s (%d)", level, message, code))
1636-
}
1637-
if len(sqlWarnings) > 0 {
1638-
warningMsg := fmt.Sprintf("Warnings detected during DML event application: %v", sqlWarnings)
1639-
return rollback(errors.New(warningMsg))
1706+
return nil
1707+
})
1708+
1709+
if execErr != nil {
1710+
return rollback(execErr)
16401711
}
16411712
}
16421713

go/logic/applier_test.go

Lines changed: 110 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1435,6 +1435,116 @@ func (suite *ApplierTestSuite) TestPanicOnWarningsDisabled() {
14351435
suite.Require().Equal("bob@example.com", results[1].email)
14361436
}
14371437

1438+
// TestMultipleDMLEventsInBatch tests that multiple DML events are processed in a single transaction
1439+
// and that if one fails due to a warning, the entire batch is rolled back - including events that
1440+
// come AFTER the failure. This proves true transaction atomicity.
1441+
func (suite *ApplierTestSuite) TestMultipleDMLEventsInBatch() {
1442+
ctx := context.Background()
1443+
1444+
var err error
1445+
1446+
// Create table with id and email columns
1447+
_, err = suite.db.ExecContext(ctx, fmt.Sprintf("CREATE TABLE %s (id INT PRIMARY KEY, email VARCHAR(100));", getTestTableName()))
1448+
suite.Require().NoError(err)
1449+
1450+
// Create ghost table with unique index on email
1451+
_, err = suite.db.ExecContext(ctx, fmt.Sprintf("CREATE TABLE %s (id INT PRIMARY KEY, email VARCHAR(100), UNIQUE KEY email_unique (email));", getTestGhostTableName()))
1452+
suite.Require().NoError(err)
1453+
1454+
connectionConfig, err := getTestConnectionConfig(ctx, suite.mysqlContainer)
1455+
suite.Require().NoError(err)
1456+
1457+
migrationContext := newTestMigrationContext()
1458+
migrationContext.ApplierConnectionConfig = connectionConfig
1459+
migrationContext.SetConnectionConfig("innodb")
1460+
1461+
migrationContext.PanicOnWarnings = true
1462+
1463+
migrationContext.OriginalTableColumns = sql.NewColumnList([]string{"id", "email"})
1464+
migrationContext.SharedColumns = sql.NewColumnList([]string{"id", "email"})
1465+
migrationContext.MappedSharedColumns = sql.NewColumnList([]string{"id", "email"})
1466+
migrationContext.UniqueKey = &sql.UniqueKey{
1467+
Name: "PRIMARY",
1468+
NameInGhostTable: "PRIMARY",
1469+
Columns: *sql.NewColumnList([]string{"id"}),
1470+
}
1471+
1472+
applier := NewApplier(migrationContext)
1473+
suite.Require().NoError(applier.prepareQueries())
1474+
defer applier.Teardown()
1475+
1476+
err = applier.InitDBConnections()
1477+
suite.Require().NoError(err)
1478+
1479+
// Insert initial rows into ghost table
1480+
_, err = suite.db.ExecContext(ctx, fmt.Sprintf("INSERT INTO %s (id, email) VALUES (1, 'alice@example.com'), (3, 'charlie@example.com');", getTestGhostTableName()))
1481+
suite.Require().NoError(err)
1482+
1483+
// Simulate multiple binlog events in a batch:
1484+
// 1. Duplicate on PRIMARY KEY (allowed - expected during binlog replay)
1485+
// 2. Duplicate on email index (should fail) ← FAILURE IN MIDDLE
1486+
// 3. Valid insert (would succeed) ← SUCCESS AFTER FAILURE
1487+
//
1488+
// The critical test: Even though event #3 would succeed on its own, it must be rolled back
1489+
// because event #2 failed. This proves the entire batch is truly atomic.
1490+
dmlEvents := []*binlog.BinlogDMLEvent{
1491+
{
1492+
DatabaseName: testMysqlDatabase,
1493+
TableName: testMysqlTableName,
1494+
DML: binlog.InsertDML,
1495+
NewColumnValues: sql.ToColumnValues([]interface{}{1, "alice@example.com"}), // duplicate PRIMARY (normally allowed)
1496+
},
1497+
{
1498+
DatabaseName: testMysqlDatabase,
1499+
TableName: testMysqlTableName,
1500+
DML: binlog.InsertDML,
1501+
NewColumnValues: sql.ToColumnValues([]interface{}{4, "alice@example.com"}), // duplicate email (FAILS)
1502+
},
1503+
{
1504+
DatabaseName: testMysqlDatabase,
1505+
TableName: testMysqlTableName,
1506+
DML: binlog.InsertDML,
1507+
NewColumnValues: sql.ToColumnValues([]interface{}{2, "bob@example.com"}), // valid insert (would succeed)
1508+
},
1509+
}
1510+
1511+
// Should fail due to the second event
1512+
err = applier.ApplyDMLEventQueries(dmlEvents)
1513+
suite.Require().Error(err)
1514+
suite.Require().Contains(err.Error(), "Duplicate entry")
1515+
1516+
// Verify that the entire batch was rolled back - still only the original 2 rows
1517+
// Critically: id=2 (bob@example.com) from event #3 should NOT be present
1518+
rows, err := suite.db.Query("SELECT id, email FROM " + getTestGhostTableName() + " ORDER BY id")
1519+
suite.Require().NoError(err)
1520+
defer rows.Close()
1521+
1522+
var results []struct {
1523+
id int
1524+
email string
1525+
}
1526+
for rows.Next() {
1527+
var id int
1528+
var email string
1529+
err = rows.Scan(&id, &email)
1530+
suite.Require().NoError(err)
1531+
results = append(results, struct {
1532+
id int
1533+
email string
1534+
}{id, email})
1535+
}
1536+
suite.Require().NoError(rows.Err())
1537+
1538+
// Should still have exactly 2 original rows (entire batch was rolled back)
1539+
// This proves that even event #3 (which would have succeeded) was rolled back
1540+
suite.Require().Len(results, 2)
1541+
suite.Require().Equal(1, results[0].id)
1542+
suite.Require().Equal("alice@example.com", results[0].email)
1543+
suite.Require().Equal(3, results[1].id)
1544+
suite.Require().Equal("charlie@example.com", results[1].email)
1545+
// Critically: id=2 (bob@example.com) is NOT present, proving event #3 was rolled back
1546+
}
1547+
14381548
func TestApplier(t *testing.T) {
14391549
suite.Run(t, new(ApplierTestSuite))
14401550
}

0 commit comments

Comments
 (0)