Skip to content

Commit d20c72d

Browse files
committed
Handle warnings in middle of DML batch
1 parent 67cc636 commit d20c72d

3 files changed

Lines changed: 253 additions & 69 deletions

File tree

go/cmd/gh-ost/main.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -316,6 +316,9 @@ func main() {
316316
if migrationContext.CheckpointIntervalSeconds < 10 {
317317
migrationContext.Log.Fatalf("--checkpoint-seconds should be >=10")
318318
}
319+
if migrationContext.CountTableRows && migrationContext.PanicOnWarnings {
320+
migrationContext.Log.Warning("--exact-rowcount with --panic-on-warnings: row counts cannot be exact due to warning detection")
321+
}
319322

320323
switch *cutOver {
321324
case "atomic", "default", "":

go/logic/applier.go

Lines changed: 140 additions & 69 deletions
Original file line numberDiff line numberDiff line change
@@ -1497,6 +1497,107 @@ func (this *Applier) buildDMLEventQuery(dmlEvent *binlog.BinlogDMLEvent) []*dmlB
14971497
return []*dmlBuildResult{newDmlBuildResultError(fmt.Errorf("Unknown dml event type: %+v", dmlEvent.DML))}
14981498
}
14991499

1500+
// executeBatchWithWarningChecking executes a batch of DML statements with SHOW WARNINGS
1501+
// interleaved after each statement to detect warnings from any statement in the batch.
1502+
// This is used when PanicOnWarnings is enabled to ensure warnings from middle statements
1503+
// are not lost (SHOW WARNINGS only shows warnings from the last statement in a multi-statement batch).
1504+
func (this *Applier) executeBatchWithWarningChecking(ctx context.Context, tx *gosql.Tx, buildResults []*dmlBuildResult) (int64, error) {
1505+
// Build query with interleaved SHOW WARNINGS: stmt1; SHOW WARNINGS; stmt2; SHOW WARNINGS; ...
1506+
var queryBuilder strings.Builder
1507+
args := make([]interface{}, 0)
1508+
1509+
for _, buildResult := range buildResults {
1510+
queryBuilder.WriteString(buildResult.query)
1511+
queryBuilder.WriteString(";\nSHOW WARNINGS;\n")
1512+
args = append(args, buildResult.args...)
1513+
}
1514+
1515+
query := queryBuilder.String()
1516+
1517+
// Execute the multi-statement query
1518+
rows, err := tx.QueryContext(ctx, query, args...)
1519+
if err != nil {
1520+
return 0, fmt.Errorf("%w; query=%s; args=%+v", err, query, args)
1521+
}
1522+
defer rows.Close()
1523+
1524+
var totalDelta int64
1525+
1526+
// QueryContext with multi-statement queries returns rows positioned at the first result set
1527+
// that produces rows (i.e., the first SHOW WARNINGS), automatically skipping DML results.
1528+
// Verify we're at a SHOW WARNINGS result set (should have 3 columns: Level, Code, Message)
1529+
cols, err := rows.Columns()
1530+
if err != nil {
1531+
return 0, fmt.Errorf("failed to get columns: %w", err)
1532+
}
1533+
1534+
// If somehow we're not at a result set with columns, try to advance
1535+
if len(cols) == 0 {
1536+
if !rows.NextResultSet() {
1537+
return 0, fmt.Errorf("expected SHOW WARNINGS result set after first statement")
1538+
}
1539+
}
1540+
1541+
// Compile regex once before loop to avoid performance penalty and handle errors properly
1542+
migrationKeyRegex, err := this.compileMigrationKeyWarningRegex()
1543+
if err != nil {
1544+
return 0, err
1545+
}
1546+
1547+
// Iterate through SHOW WARNINGS result sets.
1548+
// DML statements don't create navigable result sets, so we move directly between SHOW WARNINGS.
1549+
// Pattern: [at SHOW WARNINGS #1] -> read warnings -> NextResultSet() -> [at SHOW WARNINGS #2] -> ...
1550+
for i := 0; i < len(buildResults); i++ {
1551+
// We can't get exact rows affected with QueryContext (needed for reading SHOW WARNINGS).
1552+
// Use the theoretical delta (+1 for INSERT, -1 for DELETE, 0 for UPDATE) as an approximation.
1553+
// This may be inaccurate (e.g., INSERT IGNORE with duplicate affects 0 rows but we count +1).
1554+
totalDelta += buildResults[i].rowsDelta
1555+
1556+
// Read warnings from this statement's SHOW WARNINGS result set
1557+
var sqlWarnings []string
1558+
for rows.Next() {
1559+
var level, message string
1560+
var code int
1561+
if err := rows.Scan(&level, &code, &message); err != nil {
1562+
// Scan failure means we cannot reliably read warnings.
1563+
// Since PanicOnWarnings is a safety feature, we must fail hard rather than silently skip.
1564+
return 0, fmt.Errorf("failed to scan SHOW WARNINGS for statement %d: %w", i+1, err)
1565+
}
1566+
1567+
if strings.Contains(message, "Duplicate entry") && migrationKeyRegex.MatchString(message) {
1568+
// Duplicate entry on migration unique key is expected during binlog replay
1569+
// (row was already copied during bulk copy phase)
1570+
continue
1571+
}
1572+
sqlWarnings = append(sqlWarnings, fmt.Sprintf("%s: %s (%d)", level, message, code))
1573+
}
1574+
1575+
// Check for errors that occurred while iterating through warnings
1576+
if err := rows.Err(); err != nil {
1577+
return 0, fmt.Errorf("error reading SHOW WARNINGS result set for statement %d: %w", i+1, err)
1578+
}
1579+
1580+
if len(sqlWarnings) > 0 {
1581+
return 0, fmt.Errorf("warnings detected in statement %d of %d: %v", i+1, len(buildResults), sqlWarnings)
1582+
}
1583+
1584+
// Move to the next statement's SHOW WARNINGS result set
1585+
// For the last statement, there's no next result set
1586+
// DML statements don't create result sets, so we only need one NextResultSet call
1587+
// to move from SHOW WARNINGS #N to SHOW WARNINGS #(N+1)
1588+
if i < len(buildResults)-1 {
1589+
if !rows.NextResultSet() {
1590+
if err := rows.Err(); err != nil {
1591+
return 0, fmt.Errorf("error moving to SHOW WARNINGS for statement %d: %w", i+2, err)
1592+
}
1593+
return 0, fmt.Errorf("expected SHOW WARNINGS result set for statement %d", i+2)
1594+
}
1595+
}
1596+
}
1597+
1598+
return totalDelta, nil
1599+
}
1600+
15001601
// ApplyDMLEventQueries applies multiple DML queries onto the _ghost_ table
15011602
func (this *Applier) ApplyDMLEventQueries(dmlEvents [](*binlog.BinlogDMLEvent)) error {
15021603
var totalDelta int64
@@ -1536,82 +1637,52 @@ func (this *Applier) ApplyDMLEventQueries(dmlEvents [](*binlog.BinlogDMLEvent))
15361637
}
15371638
}
15381639

1539-
// We batch together the DML queries into multi-statements to minimize network trips.
1540-
// We have to use the raw driver connection to access the rows affected
1541-
// for each statement in the multi-statement.
1542-
execErr := conn.Raw(func(driverConn any) error {
1543-
ex := driverConn.(driver.ExecerContext)
1544-
nvc := driverConn.(driver.NamedValueChecker)
1545-
1546-
multiArgs := make([]driver.NamedValue, 0, nArgs)
1547-
multiQueryBuilder := strings.Builder{}
1548-
for _, buildResult := range buildResults {
1549-
for _, arg := range buildResult.args {
1550-
nv := driver.NamedValue{Value: driver.Value(arg)}
1551-
nvc.CheckNamedValue(&nv)
1552-
multiArgs = append(multiArgs, nv)
1553-
}
1554-
1555-
multiQueryBuilder.WriteString(buildResult.query)
1556-
multiQueryBuilder.WriteString(";\n")
1557-
}
1558-
1559-
res, err := ex.ExecContext(ctx, multiQueryBuilder.String(), multiArgs)
1560-
if err != nil {
1561-
err = fmt.Errorf("%w; query=%s; args=%+v", err, multiQueryBuilder.String(), multiArgs)
1562-
return err
1563-
}
1564-
1565-
mysqlRes := res.(drivermysql.Result)
1566-
1567-
// each DML is either a single insert (delta +1), update (delta +0) or delete (delta -1).
1568-
// multiplying by the rows actually affected (either 0 or 1) will give an accurate row delta for this DML event
1569-
for i, rowsAffected := range mysqlRes.AllRowsAffected() {
1570-
totalDelta += buildResults[i].rowsDelta * rowsAffected
1571-
}
1572-
return nil
1573-
})
1574-
1575-
if execErr != nil {
1576-
return rollback(execErr)
1577-
}
1578-
1579-
// Check for warnings when PanicOnWarnings is enabled
1640+
// When PanicOnWarnings is enabled, we need to check warnings after each statement
1641+
// in the batch. SHOW WARNINGS only shows warnings from the last statement in a
1642+
// multi-statement query, so we interleave SHOW WARNINGS after each DML statement.
15801643
if this.migrationContext.PanicOnWarnings {
1581-
//nolint:execinquery
1582-
rows, err := tx.Query("SHOW WARNINGS")
1583-
if err != nil {
1584-
return rollback(err)
1585-
}
1586-
defer rows.Close()
1587-
if err = rows.Err(); err != nil {
1588-
return rollback(err)
1589-
}
1590-
1591-
// Compile regex once before loop to avoid performance penalty and handle errors properly
1592-
migrationKeyRegex, err := this.compileMigrationKeyWarningRegex()
1644+
totalDelta, err = this.executeBatchWithWarningChecking(ctx, tx, buildResults)
15931645
if err != nil {
15941646
return rollback(err)
15951647
}
1648+
} else {
1649+
// Fast path: batch together DML queries into multi-statements to minimize network trips.
1650+
// We use the raw driver connection to access the rows affected for each statement.
1651+
execErr := conn.Raw(func(driverConn any) error {
1652+
ex := driverConn.(driver.ExecerContext)
1653+
nvc := driverConn.(driver.NamedValueChecker)
1654+
1655+
multiArgs := make([]driver.NamedValue, 0, nArgs)
1656+
multiQueryBuilder := strings.Builder{}
1657+
for _, buildResult := range buildResults {
1658+
for _, arg := range buildResult.args {
1659+
nv := driver.NamedValue{Value: driver.Value(arg)}
1660+
nvc.CheckNamedValue(&nv)
1661+
multiArgs = append(multiArgs, nv)
1662+
}
1663+
1664+
multiQueryBuilder.WriteString(buildResult.query)
1665+
multiQueryBuilder.WriteString(";\n")
1666+
}
15961667

1597-
var sqlWarnings []string
1598-
for rows.Next() {
1599-
var level, message string
1600-
var code int
1601-
if err := rows.Scan(&level, &code, &message); err != nil {
1602-
this.migrationContext.Log.Warningf("Failed to read SHOW WARNINGS row")
1603-
continue
1668+
res, err := ex.ExecContext(ctx, multiQueryBuilder.String(), multiArgs)
1669+
if err != nil {
1670+
err = fmt.Errorf("%w; query=%s; args=%+v", err, multiQueryBuilder.String(), multiArgs)
1671+
return err
16041672
}
1605-
if strings.Contains(message, "Duplicate entry") && migrationKeyRegex.MatchString(message) {
1606-
// Duplicate entry on migration unique key is expected during binlog replay
1607-
// (row was already copied during bulk copy phase)
1608-
continue
1673+
1674+
mysqlRes := res.(drivermysql.Result)
1675+
1676+
// each DML is either a single insert (delta +1), update (delta +0) or delete (delta -1).
1677+
// multiplying by the rows actually affected (either 0 or 1) will give an accurate row delta for this DML event
1678+
for i, rowsAffected := range mysqlRes.AllRowsAffected() {
1679+
totalDelta += buildResults[i].rowsDelta * rowsAffected
16091680
}
1610-
sqlWarnings = append(sqlWarnings, fmt.Sprintf("%s: %s (%d)", level, message, code))
1611-
}
1612-
if len(sqlWarnings) > 0 {
1613-
warningMsg := fmt.Sprintf("Warnings detected during DML event application: %v", sqlWarnings)
1614-
return rollback(errors.New(warningMsg))
1681+
return nil
1682+
})
1683+
1684+
if execErr != nil {
1685+
return rollback(execErr)
16151686
}
16161687
}
16171688

go/logic/applier_test.go

Lines changed: 110 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1367,6 +1367,116 @@ func (suite *ApplierTestSuite) TestPanicOnWarningsDisabled() {
13671367
suite.Require().Equal("bob@example.com", results[1].email)
13681368
}
13691369

1370+
// TestMultipleDMLEventsInBatch tests that multiple DML events are processed in a single transaction
1371+
// and that if one fails due to a warning, the entire batch is rolled back - including events that
1372+
// come AFTER the failure. This proves true transaction atomicity.
1373+
func (suite *ApplierTestSuite) TestMultipleDMLEventsInBatch() {
1374+
ctx := context.Background()
1375+
1376+
var err error
1377+
1378+
// Create table with id and email columns
1379+
_, err = suite.db.ExecContext(ctx, fmt.Sprintf("CREATE TABLE %s (id INT PRIMARY KEY, email VARCHAR(100));", getTestTableName()))
1380+
suite.Require().NoError(err)
1381+
1382+
// Create ghost table with unique index on email
1383+
_, err = suite.db.ExecContext(ctx, fmt.Sprintf("CREATE TABLE %s (id INT PRIMARY KEY, email VARCHAR(100), UNIQUE KEY email_unique (email));", getTestGhostTableName()))
1384+
suite.Require().NoError(err)
1385+
1386+
connectionConfig, err := getTestConnectionConfig(ctx, suite.mysqlContainer)
1387+
suite.Require().NoError(err)
1388+
1389+
migrationContext := newTestMigrationContext()
1390+
migrationContext.ApplierConnectionConfig = connectionConfig
1391+
migrationContext.SetConnectionConfig("innodb")
1392+
1393+
migrationContext.PanicOnWarnings = true
1394+
1395+
migrationContext.OriginalTableColumns = sql.NewColumnList([]string{"id", "email"})
1396+
migrationContext.SharedColumns = sql.NewColumnList([]string{"id", "email"})
1397+
migrationContext.MappedSharedColumns = sql.NewColumnList([]string{"id", "email"})
1398+
migrationContext.UniqueKey = &sql.UniqueKey{
1399+
Name: "PRIMARY",
1400+
NameInGhostTable: "PRIMARY",
1401+
Columns: *sql.NewColumnList([]string{"id"}),
1402+
}
1403+
1404+
applier := NewApplier(migrationContext)
1405+
suite.Require().NoError(applier.prepareQueries())
1406+
defer applier.Teardown()
1407+
1408+
err = applier.InitDBConnections()
1409+
suite.Require().NoError(err)
1410+
1411+
// Insert initial rows into ghost table
1412+
_, err = suite.db.ExecContext(ctx, fmt.Sprintf("INSERT INTO %s (id, email) VALUES (1, 'alice@example.com'), (3, 'charlie@example.com');", getTestGhostTableName()))
1413+
suite.Require().NoError(err)
1414+
1415+
// Simulate multiple binlog events in a batch:
1416+
// 1. Duplicate on PRIMARY KEY (allowed - expected during binlog replay)
1417+
// 2. Duplicate on email index (should fail) ← FAILURE IN MIDDLE
1418+
// 3. Valid insert (would succeed) ← SUCCESS AFTER FAILURE
1419+
//
1420+
// The critical test: Even though event #3 would succeed on its own, it must be rolled back
1421+
// because event #2 failed. This proves the entire batch is truly atomic.
1422+
dmlEvents := []*binlog.BinlogDMLEvent{
1423+
{
1424+
DatabaseName: testMysqlDatabase,
1425+
TableName: testMysqlTableName,
1426+
DML: binlog.InsertDML,
1427+
NewColumnValues: sql.ToColumnValues([]interface{}{1, "alice@example.com"}), // duplicate PRIMARY (normally allowed)
1428+
},
1429+
{
1430+
DatabaseName: testMysqlDatabase,
1431+
TableName: testMysqlTableName,
1432+
DML: binlog.InsertDML,
1433+
NewColumnValues: sql.ToColumnValues([]interface{}{4, "alice@example.com"}), // duplicate email (FAILS)
1434+
},
1435+
{
1436+
DatabaseName: testMysqlDatabase,
1437+
TableName: testMysqlTableName,
1438+
DML: binlog.InsertDML,
1439+
NewColumnValues: sql.ToColumnValues([]interface{}{2, "bob@example.com"}), // valid insert (would succeed)
1440+
},
1441+
}
1442+
1443+
// Should fail due to the second event
1444+
err = applier.ApplyDMLEventQueries(dmlEvents)
1445+
suite.Require().Error(err)
1446+
suite.Require().Contains(err.Error(), "Duplicate entry")
1447+
1448+
// Verify that the entire batch was rolled back - still only the original 2 rows
1449+
// Critically: id=2 (bob@example.com) from event #3 should NOT be present
1450+
rows, err := suite.db.Query("SELECT id, email FROM " + getTestGhostTableName() + " ORDER BY id")
1451+
suite.Require().NoError(err)
1452+
defer rows.Close()
1453+
1454+
var results []struct {
1455+
id int
1456+
email string
1457+
}
1458+
for rows.Next() {
1459+
var id int
1460+
var email string
1461+
err = rows.Scan(&id, &email)
1462+
suite.Require().NoError(err)
1463+
results = append(results, struct {
1464+
id int
1465+
email string
1466+
}{id, email})
1467+
}
1468+
suite.Require().NoError(rows.Err())
1469+
1470+
// Should still have exactly 2 original rows (entire batch was rolled back)
1471+
// This proves that even event #3 (which would have succeeded) was rolled back
1472+
suite.Require().Len(results, 2)
1473+
suite.Require().Equal(1, results[0].id)
1474+
suite.Require().Equal("alice@example.com", results[0].email)
1475+
suite.Require().Equal(3, results[1].id)
1476+
suite.Require().Equal("charlie@example.com", results[1].email)
1477+
// Critically: id=2 (bob@example.com) is NOT present, proving event #3 was rolled back
1478+
}
1479+
13701480
func TestApplier(t *testing.T) {
13711481
suite.Run(t, new(ApplierTestSuite))
13721482
}

0 commit comments

Comments
 (0)