Skip to content

Commit 646b956

Browse files
committed
Handle warnings in middle of DML batch
1 parent ac2c9c5 commit 646b956

3 files changed

Lines changed: 253 additions & 69 deletions

File tree

go/cmd/gh-ost/main.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -316,6 +316,9 @@ func main() {
316316
if migrationContext.CheckpointIntervalSeconds < 10 {
317317
migrationContext.Log.Fatalf("--checkpoint-seconds should be >=10")
318318
}
319+
if migrationContext.CountTableRows && migrationContext.PanicOnWarnings {
320+
migrationContext.Log.Warning("--exact-rowcount with --panic-on-warnings: row counts cannot be exact due to warning detection")
321+
}
319322

320323
switch *cutOver {
321324
case "atomic", "default", "":

go/logic/applier.go

Lines changed: 140 additions & 69 deletions
Original file line numberDiff line numberDiff line change
@@ -1486,6 +1486,107 @@ func (this *Applier) buildDMLEventQuery(dmlEvent *binlog.BinlogDMLEvent) []*dmlB
14861486
return []*dmlBuildResult{newDmlBuildResultError(fmt.Errorf("Unknown dml event type: %+v", dmlEvent.DML))}
14871487
}
14881488

1489+
// executeBatchWithWarningChecking executes a batch of DML statements with SHOW WARNINGS
1490+
// interleaved after each statement to detect warnings from any statement in the batch.
1491+
// This is used when PanicOnWarnings is enabled to ensure warnings from middle statements
1492+
// are not lost (SHOW WARNINGS only shows warnings from the last statement in a multi-statement batch).
1493+
func (this *Applier) executeBatchWithWarningChecking(ctx context.Context, tx *gosql.Tx, buildResults []*dmlBuildResult) (int64, error) {
1494+
// Build query with interleaved SHOW WARNINGS: stmt1; SHOW WARNINGS; stmt2; SHOW WARNINGS; ...
1495+
var queryBuilder strings.Builder
1496+
args := make([]interface{}, 0)
1497+
1498+
for _, buildResult := range buildResults {
1499+
queryBuilder.WriteString(buildResult.query)
1500+
queryBuilder.WriteString(";\nSHOW WARNINGS;\n")
1501+
args = append(args, buildResult.args...)
1502+
}
1503+
1504+
query := queryBuilder.String()
1505+
1506+
// Execute the multi-statement query
1507+
rows, err := tx.QueryContext(ctx, query, args...)
1508+
if err != nil {
1509+
return 0, fmt.Errorf("%w; query=%s; args=%+v", err, query, args)
1510+
}
1511+
defer rows.Close()
1512+
1513+
var totalDelta int64
1514+
1515+
// QueryContext with multi-statement queries returns rows positioned at the first result set
1516+
// that produces rows (i.e., the first SHOW WARNINGS), automatically skipping DML results.
1517+
// Verify we're at a SHOW WARNINGS result set (should have 3 columns: Level, Code, Message)
1518+
cols, err := rows.Columns()
1519+
if err != nil {
1520+
return 0, fmt.Errorf("failed to get columns: %w", err)
1521+
}
1522+
1523+
// If somehow we're not at a result set with columns, try to advance
1524+
if len(cols) == 0 {
1525+
if !rows.NextResultSet() {
1526+
return 0, fmt.Errorf("expected SHOW WARNINGS result set after first statement")
1527+
}
1528+
}
1529+
1530+
// Compile regex once before loop to avoid performance penalty and handle errors properly
1531+
migrationKeyRegex, err := this.compileMigrationKeyWarningRegex()
1532+
if err != nil {
1533+
return 0, err
1534+
}
1535+
1536+
// Iterate through SHOW WARNINGS result sets.
1537+
// DML statements don't create navigable result sets, so we move directly between SHOW WARNINGS.
1538+
// Pattern: [at SHOW WARNINGS #1] -> read warnings -> NextResultSet() -> [at SHOW WARNINGS #2] -> ...
1539+
for i := 0; i < len(buildResults); i++ {
1540+
// We can't get exact rows affected with QueryContext (needed for reading SHOW WARNINGS).
1541+
// Use the theoretical delta (+1 for INSERT, -1 for DELETE, 0 for UPDATE) as an approximation.
1542+
// This may be inaccurate (e.g., INSERT IGNORE with duplicate affects 0 rows but we count +1).
1543+
totalDelta += buildResults[i].rowsDelta
1544+
1545+
// Read warnings from this statement's SHOW WARNINGS result set
1546+
var sqlWarnings []string
1547+
for rows.Next() {
1548+
var level, message string
1549+
var code int
1550+
if err := rows.Scan(&level, &code, &message); err != nil {
1551+
// Scan failure means we cannot reliably read warnings.
1552+
// Since PanicOnWarnings is a safety feature, we must fail hard rather than silently skip.
1553+
return 0, fmt.Errorf("failed to scan SHOW WARNINGS for statement %d: %w", i+1, err)
1554+
}
1555+
1556+
if strings.Contains(message, "Duplicate entry") && migrationKeyRegex.MatchString(message) {
1557+
// Duplicate entry on migration unique key is expected during binlog replay
1558+
// (row was already copied during bulk copy phase)
1559+
continue
1560+
}
1561+
sqlWarnings = append(sqlWarnings, fmt.Sprintf("%s: %s (%d)", level, message, code))
1562+
}
1563+
1564+
// Check for errors that occurred while iterating through warnings
1565+
if err := rows.Err(); err != nil {
1566+
return 0, fmt.Errorf("error reading SHOW WARNINGS result set for statement %d: %w", i+1, err)
1567+
}
1568+
1569+
if len(sqlWarnings) > 0 {
1570+
return 0, fmt.Errorf("warnings detected in statement %d of %d: %v", i+1, len(buildResults), sqlWarnings)
1571+
}
1572+
1573+
// Move to the next statement's SHOW WARNINGS result set
1574+
// For the last statement, there's no next result set
1575+
// DML statements don't create result sets, so we only need one NextResultSet call
1576+
// to move from SHOW WARNINGS #N to SHOW WARNINGS #(N+1)
1577+
if i < len(buildResults)-1 {
1578+
if !rows.NextResultSet() {
1579+
if err := rows.Err(); err != nil {
1580+
return 0, fmt.Errorf("error moving to SHOW WARNINGS for statement %d: %w", i+2, err)
1581+
}
1582+
return 0, fmt.Errorf("expected SHOW WARNINGS result set for statement %d", i+2)
1583+
}
1584+
}
1585+
}
1586+
1587+
return totalDelta, nil
1588+
}
1589+
14891590
// ApplyDMLEventQueries applies multiple DML queries onto the _ghost_ table
14901591
func (this *Applier) ApplyDMLEventQueries(dmlEvents [](*binlog.BinlogDMLEvent)) error {
14911592
var totalDelta int64
@@ -1525,82 +1626,52 @@ func (this *Applier) ApplyDMLEventQueries(dmlEvents [](*binlog.BinlogDMLEvent))
15251626
}
15261627
}
15271628

1528-
// We batch together the DML queries into multi-statements to minimize network trips.
1529-
// We have to use the raw driver connection to access the rows affected
1530-
// for each statement in the multi-statement.
1531-
execErr := conn.Raw(func(driverConn any) error {
1532-
ex := driverConn.(driver.ExecerContext)
1533-
nvc := driverConn.(driver.NamedValueChecker)
1534-
1535-
multiArgs := make([]driver.NamedValue, 0, nArgs)
1536-
multiQueryBuilder := strings.Builder{}
1537-
for _, buildResult := range buildResults {
1538-
for _, arg := range buildResult.args {
1539-
nv := driver.NamedValue{Value: driver.Value(arg)}
1540-
nvc.CheckNamedValue(&nv)
1541-
multiArgs = append(multiArgs, nv)
1542-
}
1543-
1544-
multiQueryBuilder.WriteString(buildResult.query)
1545-
multiQueryBuilder.WriteString(";\n")
1546-
}
1547-
1548-
res, err := ex.ExecContext(ctx, multiQueryBuilder.String(), multiArgs)
1549-
if err != nil {
1550-
err = fmt.Errorf("%w; query=%s; args=%+v", err, multiQueryBuilder.String(), multiArgs)
1551-
return err
1552-
}
1553-
1554-
mysqlRes := res.(drivermysql.Result)
1555-
1556-
// each DML is either a single insert (delta +1), update (delta +0) or delete (delta -1).
1557-
// multiplying by the rows actually affected (either 0 or 1) will give an accurate row delta for this DML event
1558-
for i, rowsAffected := range mysqlRes.AllRowsAffected() {
1559-
totalDelta += buildResults[i].rowsDelta * rowsAffected
1560-
}
1561-
return nil
1562-
})
1563-
1564-
if execErr != nil {
1565-
return rollback(execErr)
1566-
}
1567-
1568-
// Check for warnings when PanicOnWarnings is enabled
1629+
// When PanicOnWarnings is enabled, we need to check warnings after each statement
1630+
// in the batch. SHOW WARNINGS only shows warnings from the last statement in a
1631+
// multi-statement query, so we interleave SHOW WARNINGS after each DML statement.
15691632
if this.migrationContext.PanicOnWarnings {
1570-
//nolint:execinquery
1571-
rows, err := tx.Query("SHOW WARNINGS")
1572-
if err != nil {
1573-
return rollback(err)
1574-
}
1575-
defer rows.Close()
1576-
if err = rows.Err(); err != nil {
1577-
return rollback(err)
1578-
}
1579-
1580-
// Compile regex once before loop to avoid performance penalty and handle errors properly
1581-
migrationKeyRegex, err := this.compileMigrationKeyWarningRegex()
1633+
totalDelta, err = this.executeBatchWithWarningChecking(ctx, tx, buildResults)
15821634
if err != nil {
15831635
return rollback(err)
15841636
}
1637+
} else {
1638+
// Fast path: batch together DML queries into multi-statements to minimize network trips.
1639+
// We use the raw driver connection to access the rows affected for each statement.
1640+
execErr := conn.Raw(func(driverConn any) error {
1641+
ex := driverConn.(driver.ExecerContext)
1642+
nvc := driverConn.(driver.NamedValueChecker)
1643+
1644+
multiArgs := make([]driver.NamedValue, 0, nArgs)
1645+
multiQueryBuilder := strings.Builder{}
1646+
for _, buildResult := range buildResults {
1647+
for _, arg := range buildResult.args {
1648+
nv := driver.NamedValue{Value: driver.Value(arg)}
1649+
nvc.CheckNamedValue(&nv)
1650+
multiArgs = append(multiArgs, nv)
1651+
}
1652+
1653+
multiQueryBuilder.WriteString(buildResult.query)
1654+
multiQueryBuilder.WriteString(";\n")
1655+
}
15851656

1586-
var sqlWarnings []string
1587-
for rows.Next() {
1588-
var level, message string
1589-
var code int
1590-
if err := rows.Scan(&level, &code, &message); err != nil {
1591-
this.migrationContext.Log.Warningf("Failed to read SHOW WARNINGS row")
1592-
continue
1657+
res, err := ex.ExecContext(ctx, multiQueryBuilder.String(), multiArgs)
1658+
if err != nil {
1659+
err = fmt.Errorf("%w; query=%s; args=%+v", err, multiQueryBuilder.String(), multiArgs)
1660+
return err
15931661
}
1594-
if strings.Contains(message, "Duplicate entry") && migrationKeyRegex.MatchString(message) {
1595-
// Duplicate entry on migration unique key is expected during binlog replay
1596-
// (row was already copied during bulk copy phase)
1597-
continue
1662+
1663+
mysqlRes := res.(drivermysql.Result)
1664+
1665+
// each DML is either a single insert (delta +1), update (delta +0) or delete (delta -1).
1666+
// multiplying by the rows actually affected (either 0 or 1) will give an accurate row delta for this DML event
1667+
for i, rowsAffected := range mysqlRes.AllRowsAffected() {
1668+
totalDelta += buildResults[i].rowsDelta * rowsAffected
15981669
}
1599-
sqlWarnings = append(sqlWarnings, fmt.Sprintf("%s: %s (%d)", level, message, code))
1600-
}
1601-
if len(sqlWarnings) > 0 {
1602-
warningMsg := fmt.Sprintf("Warnings detected during DML event application: %v", sqlWarnings)
1603-
return rollback(errors.New(warningMsg))
1670+
return nil
1671+
})
1672+
1673+
if execErr != nil {
1674+
return rollback(execErr)
16041675
}
16051676
}
16061677

go/logic/applier_test.go

Lines changed: 110 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1367,6 +1367,116 @@ func (suite *ApplierTestSuite) TestPanicOnWarningsDisabled() {
13671367
suite.Require().Equal("bob@example.com", results[1].email)
13681368
}
13691369

1370+
// TestMultipleDMLEventsInBatch tests that multiple DML events are processed in a single transaction
1371+
// and that if one fails due to a warning, the entire batch is rolled back - including events that
1372+
// come AFTER the failure. This proves true transaction atomicity.
1373+
func (suite *ApplierTestSuite) TestMultipleDMLEventsInBatch() {
1374+
ctx := context.Background()
1375+
1376+
var err error
1377+
1378+
// Create table with id and email columns
1379+
_, err = suite.db.ExecContext(ctx, fmt.Sprintf("CREATE TABLE %s (id INT PRIMARY KEY, email VARCHAR(100));", getTestTableName()))
1380+
suite.Require().NoError(err)
1381+
1382+
// Create ghost table with unique index on email
1383+
_, err = suite.db.ExecContext(ctx, fmt.Sprintf("CREATE TABLE %s (id INT PRIMARY KEY, email VARCHAR(100), UNIQUE KEY email_unique (email));", getTestGhostTableName()))
1384+
suite.Require().NoError(err)
1385+
1386+
connectionConfig, err := getTestConnectionConfig(ctx, suite.mysqlContainer)
1387+
suite.Require().NoError(err)
1388+
1389+
migrationContext := newTestMigrationContext()
1390+
migrationContext.ApplierConnectionConfig = connectionConfig
1391+
migrationContext.SetConnectionConfig("innodb")
1392+
1393+
migrationContext.PanicOnWarnings = true
1394+
1395+
migrationContext.OriginalTableColumns = sql.NewColumnList([]string{"id", "email"})
1396+
migrationContext.SharedColumns = sql.NewColumnList([]string{"id", "email"})
1397+
migrationContext.MappedSharedColumns = sql.NewColumnList([]string{"id", "email"})
1398+
migrationContext.UniqueKey = &sql.UniqueKey{
1399+
Name: "PRIMARY",
1400+
NameInGhostTable: "PRIMARY",
1401+
Columns: *sql.NewColumnList([]string{"id"}),
1402+
}
1403+
1404+
applier := NewApplier(migrationContext)
1405+
suite.Require().NoError(applier.prepareQueries())
1406+
defer applier.Teardown()
1407+
1408+
err = applier.InitDBConnections()
1409+
suite.Require().NoError(err)
1410+
1411+
// Insert initial rows into ghost table
1412+
_, err = suite.db.ExecContext(ctx, fmt.Sprintf("INSERT INTO %s (id, email) VALUES (1, 'alice@example.com'), (3, 'charlie@example.com');", getTestGhostTableName()))
1413+
suite.Require().NoError(err)
1414+
1415+
// Simulate multiple binlog events in a batch:
1416+
// 1. Duplicate on PRIMARY KEY (allowed - expected during binlog replay)
1417+
// 2. Duplicate on email index (should fail) ← FAILURE IN MIDDLE
1418+
// 3. Valid insert (would succeed) ← SUCCESS AFTER FAILURE
1419+
//
1420+
// The critical test: Even though event #3 would succeed on its own, it must be rolled back
1421+
// because event #2 failed. This proves the entire batch is truly atomic.
1422+
dmlEvents := []*binlog.BinlogDMLEvent{
1423+
{
1424+
DatabaseName: testMysqlDatabase,
1425+
TableName: testMysqlTableName,
1426+
DML: binlog.InsertDML,
1427+
NewColumnValues: sql.ToColumnValues([]interface{}{1, "alice@example.com"}), // duplicate PRIMARY (normally allowed)
1428+
},
1429+
{
1430+
DatabaseName: testMysqlDatabase,
1431+
TableName: testMysqlTableName,
1432+
DML: binlog.InsertDML,
1433+
NewColumnValues: sql.ToColumnValues([]interface{}{4, "alice@example.com"}), // duplicate email (FAILS)
1434+
},
1435+
{
1436+
DatabaseName: testMysqlDatabase,
1437+
TableName: testMysqlTableName,
1438+
DML: binlog.InsertDML,
1439+
NewColumnValues: sql.ToColumnValues([]interface{}{2, "bob@example.com"}), // valid insert (would succeed)
1440+
},
1441+
}
1442+
1443+
// Should fail due to the second event
1444+
err = applier.ApplyDMLEventQueries(dmlEvents)
1445+
suite.Require().Error(err)
1446+
suite.Require().Contains(err.Error(), "Duplicate entry")
1447+
1448+
// Verify that the entire batch was rolled back - still only the original 2 rows
1449+
// Critically: id=2 (bob@example.com) from event #3 should NOT be present
1450+
rows, err := suite.db.Query("SELECT id, email FROM " + getTestGhostTableName() + " ORDER BY id")
1451+
suite.Require().NoError(err)
1452+
defer rows.Close()
1453+
1454+
var results []struct {
1455+
id int
1456+
email string
1457+
}
1458+
for rows.Next() {
1459+
var id int
1460+
var email string
1461+
err = rows.Scan(&id, &email)
1462+
suite.Require().NoError(err)
1463+
results = append(results, struct {
1464+
id int
1465+
email string
1466+
}{id, email})
1467+
}
1468+
suite.Require().NoError(rows.Err())
1469+
1470+
// Should still have exactly 2 original rows (entire batch was rolled back)
1471+
// This proves that even event #3 (which would have succeeded) was rolled back
1472+
suite.Require().Len(results, 2)
1473+
suite.Require().Equal(1, results[0].id)
1474+
suite.Require().Equal("alice@example.com", results[0].email)
1475+
suite.Require().Equal(3, results[1].id)
1476+
suite.Require().Equal("charlie@example.com", results[1].email)
1477+
// Critically: id=2 (bob@example.com) is NOT present, proving event #3 was rolled back
1478+
}
1479+
13701480
func TestApplier(t *testing.T) {
13711481
suite.Run(t, new(ApplierTestSuite))
13721482
}

0 commit comments

Comments
 (0)