@@ -305,7 +305,32 @@ func (this *Applier) AttemptInstantDDL() error {
305305 return err
306306 }
307307 // We don't need a trx, because for instant DDL the SQL mode doesn't matter.
308- _ , err := this .db .Exec (query )
308+ return retryOnLockWaitTimeout (func () error {
309+ _ , err := this .db .Exec (query )
310+ return err
311+ }, this .migrationContext .Log )
312+ }
313+
314+ // retryOnLockWaitTimeout retries the given operation on MySQL lock wait timeout
315+ // (errno 1205). Non-timeout errors return immediately. This is used for instant
316+ // DDL attempts where the operation may be blocked by a long-running transaction.
317+ func retryOnLockWaitTimeout (operation func () error , logger base.Logger ) error {
318+ const maxRetries = 5
319+ var err error
320+ for i := 0 ; i < maxRetries ; i ++ {
321+ if i != 0 {
322+ logger .Infof ("Retrying after lock wait timeout (attempt %d/%d)" , i + 1 , maxRetries )
323+ RetrySleepFn (time .Duration (i ) * 5 * time .Second )
324+ }
325+ err = operation ()
326+ if err == nil {
327+ return nil
328+ }
329+ var mysqlErr * drivermysql.MySQLError
330+ if ! errors .As (err , & mysqlErr ) || mysqlErr .Number != 1205 {
331+ return err
332+ }
333+ }
309334 return err
310335}
311336
@@ -1497,6 +1522,107 @@ func (this *Applier) buildDMLEventQuery(dmlEvent *binlog.BinlogDMLEvent) []*dmlB
14971522 return []* dmlBuildResult {newDmlBuildResultError (fmt .Errorf ("Unknown dml event type: %+v" , dmlEvent .DML ))}
14981523}
14991524
1525+ // executeBatchWithWarningChecking executes a batch of DML statements with SHOW WARNINGS
1526+ // interleaved after each statement to detect warnings from any statement in the batch.
1527+ // This is used when PanicOnWarnings is enabled to ensure warnings from middle statements
1528+ // are not lost (SHOW WARNINGS only shows warnings from the last statement in a multi-statement batch).
1529+ func (this * Applier ) executeBatchWithWarningChecking (ctx context.Context , tx * gosql.Tx , buildResults []* dmlBuildResult ) (int64 , error ) {
1530+ // Build query with interleaved SHOW WARNINGS: stmt1; SHOW WARNINGS; stmt2; SHOW WARNINGS; ...
1531+ var queryBuilder strings.Builder
1532+ args := make ([]interface {}, 0 )
1533+
1534+ for _ , buildResult := range buildResults {
1535+ queryBuilder .WriteString (buildResult .query )
1536+ queryBuilder .WriteString (";\n SHOW WARNINGS;\n " )
1537+ args = append (args , buildResult .args ... )
1538+ }
1539+
1540+ query := queryBuilder .String ()
1541+
1542+ // Execute the multi-statement query
1543+ rows , err := tx .QueryContext (ctx , query , args ... )
1544+ if err != nil {
1545+ return 0 , fmt .Errorf ("%w; query=%s; args=%+v" , err , query , args )
1546+ }
1547+ defer rows .Close ()
1548+
1549+ var totalDelta int64
1550+
1551+ // QueryContext with multi-statement queries returns rows positioned at the first result set
1552+ // that produces rows (i.e., the first SHOW WARNINGS), automatically skipping DML results.
1553+ // Verify we're at a SHOW WARNINGS result set (should have 3 columns: Level, Code, Message)
1554+ cols , err := rows .Columns ()
1555+ if err != nil {
1556+ return 0 , fmt .Errorf ("failed to get columns: %w" , err )
1557+ }
1558+
1559+ // If somehow we're not at a result set with columns, try to advance
1560+ if len (cols ) == 0 {
1561+ if ! rows .NextResultSet () {
1562+ return 0 , fmt .Errorf ("expected SHOW WARNINGS result set after first statement" )
1563+ }
1564+ }
1565+
1566+ // Compile regex once before loop to avoid performance penalty and handle errors properly
1567+ migrationKeyRegex , err := this .compileMigrationKeyWarningRegex ()
1568+ if err != nil {
1569+ return 0 , err
1570+ }
1571+
1572+ // Iterate through SHOW WARNINGS result sets.
1573+ // DML statements don't create navigable result sets, so we move directly between SHOW WARNINGS.
1574+ // Pattern: [at SHOW WARNINGS #1] -> read warnings -> NextResultSet() -> [at SHOW WARNINGS #2] -> ...
1575+ for i := 0 ; i < len (buildResults ); i ++ {
1576+ // We can't get exact rows affected with QueryContext (needed for reading SHOW WARNINGS).
1577+ // Use the theoretical delta (+1 for INSERT, -1 for DELETE, 0 for UPDATE) as an approximation.
1578+ // This may be inaccurate (e.g., INSERT IGNORE with duplicate affects 0 rows but we count +1).
1579+ totalDelta += buildResults [i ].rowsDelta
1580+
1581+ // Read warnings from this statement's SHOW WARNINGS result set
1582+ var sqlWarnings []string
1583+ for rows .Next () {
1584+ var level , message string
1585+ var code int
1586+ if err := rows .Scan (& level , & code , & message ); err != nil {
1587+ // Scan failure means we cannot reliably read warnings.
1588+ // Since PanicOnWarnings is a safety feature, we must fail hard rather than silently skip.
1589+ return 0 , fmt .Errorf ("failed to scan SHOW WARNINGS for statement %d: %w" , i + 1 , err )
1590+ }
1591+
1592+ if strings .Contains (message , "Duplicate entry" ) && migrationKeyRegex .MatchString (message ) {
1593+ // Duplicate entry on migration unique key is expected during binlog replay
1594+ // (row was already copied during bulk copy phase)
1595+ continue
1596+ }
1597+ sqlWarnings = append (sqlWarnings , fmt .Sprintf ("%s: %s (%d)" , level , message , code ))
1598+ }
1599+
1600+ // Check for errors that occurred while iterating through warnings
1601+ if err := rows .Err (); err != nil {
1602+ return 0 , fmt .Errorf ("error reading SHOW WARNINGS result set for statement %d: %w" , i + 1 , err )
1603+ }
1604+
1605+ if len (sqlWarnings ) > 0 {
1606+ return 0 , fmt .Errorf ("warnings detected in statement %d of %d: %v" , i + 1 , len (buildResults ), sqlWarnings )
1607+ }
1608+
1609+ // Move to the next statement's SHOW WARNINGS result set
1610+ // For the last statement, there's no next result set
1611+ // DML statements don't create result sets, so we only need one NextResultSet call
1612+ // to move from SHOW WARNINGS #N to SHOW WARNINGS #(N+1)
1613+ if i < len (buildResults )- 1 {
1614+ if ! rows .NextResultSet () {
1615+ if err := rows .Err (); err != nil {
1616+ return 0 , fmt .Errorf ("error moving to SHOW WARNINGS for statement %d: %w" , i + 2 , err )
1617+ }
1618+ return 0 , fmt .Errorf ("expected SHOW WARNINGS result set for statement %d" , i + 2 )
1619+ }
1620+ }
1621+ }
1622+
1623+ return totalDelta , nil
1624+ }
1625+
15001626// ApplyDMLEventQueries applies multiple DML queries onto the _ghost_ table
15011627func (this * Applier ) ApplyDMLEventQueries (dmlEvents [](* binlog.BinlogDMLEvent )) error {
15021628 var totalDelta int64
@@ -1536,82 +1662,52 @@ func (this *Applier) ApplyDMLEventQueries(dmlEvents [](*binlog.BinlogDMLEvent))
15361662 }
15371663 }
15381664
1539- // We batch together the DML queries into multi-statements to minimize network trips.
1540- // We have to use the raw driver connection to access the rows affected
1541- // for each statement in the multi-statement.
1542- execErr := conn .Raw (func (driverConn any ) error {
1543- ex := driverConn .(driver.ExecerContext )
1544- nvc := driverConn .(driver.NamedValueChecker )
1545-
1546- multiArgs := make ([]driver.NamedValue , 0 , nArgs )
1547- multiQueryBuilder := strings.Builder {}
1548- for _ , buildResult := range buildResults {
1549- for _ , arg := range buildResult .args {
1550- nv := driver.NamedValue {Value : driver .Value (arg )}
1551- nvc .CheckNamedValue (& nv )
1552- multiArgs = append (multiArgs , nv )
1553- }
1554-
1555- multiQueryBuilder .WriteString (buildResult .query )
1556- multiQueryBuilder .WriteString (";\n " )
1557- }
1558-
1559- res , err := ex .ExecContext (ctx , multiQueryBuilder .String (), multiArgs )
1560- if err != nil {
1561- err = fmt .Errorf ("%w; query=%s; args=%+v" , err , multiQueryBuilder .String (), multiArgs )
1562- return err
1563- }
1564-
1565- mysqlRes := res .(drivermysql.Result )
1566-
1567- // each DML is either a single insert (delta +1), update (delta +0) or delete (delta -1).
1568- // multiplying by the rows actually affected (either 0 or 1) will give an accurate row delta for this DML event
1569- for i , rowsAffected := range mysqlRes .AllRowsAffected () {
1570- totalDelta += buildResults [i ].rowsDelta * rowsAffected
1571- }
1572- return nil
1573- })
1574-
1575- if execErr != nil {
1576- return rollback (execErr )
1577- }
1578-
1579- // Check for warnings when PanicOnWarnings is enabled
1665+ // When PanicOnWarnings is enabled, we need to check warnings after each statement
1666+ // in the batch. SHOW WARNINGS only shows warnings from the last statement in a
1667+ // multi-statement query, so we interleave SHOW WARNINGS after each DML statement.
15801668 if this .migrationContext .PanicOnWarnings {
1581- //nolint:execinquery
1582- rows , err := tx .Query ("SHOW WARNINGS" )
1583- if err != nil {
1584- return rollback (err )
1585- }
1586- defer rows .Close ()
1587- if err = rows .Err (); err != nil {
1588- return rollback (err )
1589- }
1590-
1591- // Compile regex once before loop to avoid performance penalty and handle errors properly
1592- migrationKeyRegex , err := this .compileMigrationKeyWarningRegex ()
1669+ totalDelta , err = this .executeBatchWithWarningChecking (ctx , tx , buildResults )
15931670 if err != nil {
15941671 return rollback (err )
15951672 }
1673+ } else {
1674+ // Fast path: batch together DML queries into multi-statements to minimize network trips.
1675+ // We use the raw driver connection to access the rows affected for each statement.
1676+ execErr := conn .Raw (func (driverConn any ) error {
1677+ ex := driverConn .(driver.ExecerContext )
1678+ nvc := driverConn .(driver.NamedValueChecker )
1679+
1680+ multiArgs := make ([]driver.NamedValue , 0 , nArgs )
1681+ multiQueryBuilder := strings.Builder {}
1682+ for _ , buildResult := range buildResults {
1683+ for _ , arg := range buildResult .args {
1684+ nv := driver.NamedValue {Value : driver .Value (arg )}
1685+ nvc .CheckNamedValue (& nv )
1686+ multiArgs = append (multiArgs , nv )
1687+ }
1688+
1689+ multiQueryBuilder .WriteString (buildResult .query )
1690+ multiQueryBuilder .WriteString (";\n " )
1691+ }
15961692
1597- var sqlWarnings []string
1598- for rows .Next () {
1599- var level , message string
1600- var code int
1601- if err := rows .Scan (& level , & code , & message ); err != nil {
1602- this .migrationContext .Log .Warningf ("Failed to read SHOW WARNINGS row" )
1603- continue
1693+ res , err := ex .ExecContext (ctx , multiQueryBuilder .String (), multiArgs )
1694+ if err != nil {
1695+ err = fmt .Errorf ("%w; query=%s; args=%+v" , err , multiQueryBuilder .String (), multiArgs )
1696+ return err
16041697 }
1605- if strings .Contains (message , "Duplicate entry" ) && migrationKeyRegex .MatchString (message ) {
1606- // Duplicate entry on migration unique key is expected during binlog replay
1607- // (row was already copied during bulk copy phase)
1608- continue
1698+
1699+ mysqlRes := res .(drivermysql.Result )
1700+
1701+ // each DML is either a single insert (delta +1), update (delta +0) or delete (delta -1).
1702+ // multiplying by the rows actually affected (either 0 or 1) will give an accurate row delta for this DML event
1703+ for i , rowsAffected := range mysqlRes .AllRowsAffected () {
1704+ totalDelta += buildResults [i ].rowsDelta * rowsAffected
16091705 }
1610- sqlWarnings = append ( sqlWarnings , fmt . Sprintf ( "%s: %s (%d)" , level , message , code ))
1611- }
1612- if len ( sqlWarnings ) > 0 {
1613- warningMsg := fmt . Sprintf ( "Warnings detected during DML event application: %v" , sqlWarnings )
1614- return rollback (errors . New ( warningMsg ) )
1706+ return nil
1707+ })
1708+
1709+ if execErr != nil {
1710+ return rollback (execErr )
16151711 }
16161712 }
16171713
0 commit comments