Skip to content

Commit 17a6d5a

Browse files
authored
Merge branch 'master' into varun/fix-alleventsuptolockprocessed-deadlock
2 parents 83a6830 + 67cc636 commit 17a6d5a

18 files changed

Lines changed: 1237 additions & 122 deletions

File tree

.github/CONTRIBUTING.md

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,24 @@ Here are a few things you can do that will increase the likelihood of your pull
1919
- Keep your change as focused as possible. If there are multiple changes you would like to make that are not dependent upon each other, consider submitting them as separate pull requests.
2020
- Write a [good commit message](http://tbaggery.com/2008/04/19/a-note-about-git-commit-messages.html).
2121

22+
## Development Guidelines
23+
24+
### Channel Safety
25+
26+
When working with channels in goroutines, it's critical to prevent deadlocks that can occur when a channel receiver exits due to an error while senders are still trying to send values. Always use `base.SendWithContext` for channel sends to avoid deadlocks:
27+
28+
```go
29+
// ✅ CORRECT - Uses helper to prevent deadlock
30+
if err := base.SendWithContext(ctx, ch, value); err != nil {
31+
return err // context was cancelled
32+
}
33+
34+
// ❌ WRONG - Can deadlock if receiver exits
35+
ch <- value
36+
```
37+
38+
Even if the destination channel is buffered, deadlocks could still occur if the buffer fills up and the receiver exits, so it's important to use `SendWithContext` in those cases as well.
39+
2240
## Resources
2341

2442
- [Contributing to Open Source on GitHub](https://guides.github.com/activities/contributing-to-open-source/)

.github/workflows/replica-tests.yml

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,20 @@ jobs:
2828
- name: Run tests
2929
run: script/docker-gh-ost-replica-tests run
3030

31+
- name: Set artifact name
32+
if: failure()
33+
run: |
34+
ARTIFACT_NAME=$(echo "${{ matrix.image }}" | tr '/:' '-')
35+
echo "ARTIFACT_NAME=test-logs-${ARTIFACT_NAME}" >> $GITHUB_ENV
36+
37+
- name: Upload test logs on failure
38+
if: failure()
39+
uses: actions/upload-artifact@v4
40+
with:
41+
name: ${{ env.ARTIFACT_NAME }}
42+
path: /tmp/gh-ost-test.*
43+
retention-days: 7
44+
3145
- name: Teardown environment
3246
if: always()
3347
run: script/docker-gh-ost-replica-tests down

go/base/context.go

Lines changed: 66 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
package base
77

88
import (
9+
"context"
910
"fmt"
1011
"math"
1112
"os"
@@ -225,6 +226,16 @@ type MigrationContext struct {
225226
InCutOverCriticalSectionFlag int64
226227
PanicAbort chan error
227228

229+
// Context for cancellation signaling across all goroutines
230+
// Stored in struct as it spans the entire migration lifecycle, not per-function.
231+
// context.Context is safe for concurrent use by multiple goroutines.
232+
ctx context.Context //nolint:containedctx
233+
cancelFunc context.CancelFunc
234+
235+
// Stores the fatal error that triggered abort
236+
AbortError error
237+
abortMutex *sync.Mutex
238+
228239
OriginalTableColumnsOnApplier *sql.ColumnList
229240
OriginalTableColumns *sql.ColumnList
230241
OriginalTableVirtualColumns *sql.ColumnList
@@ -293,6 +304,7 @@ type ContextConfig struct {
293304
}
294305

295306
func NewMigrationContext() *MigrationContext {
307+
ctx, cancelFunc := context.WithCancel(context.Background())
296308
return &MigrationContext{
297309
Uuid: uuid.NewString(),
298310
defaultNumRetries: 60,
@@ -313,6 +325,9 @@ func NewMigrationContext() *MigrationContext {
313325
lastHeartbeatOnChangelogMutex: &sync.Mutex{},
314326
ColumnRenameMap: make(map[string]string),
315327
PanicAbort: make(chan error),
328+
ctx: ctx,
329+
cancelFunc: cancelFunc,
330+
abortMutex: &sync.Mutex{},
316331
Log: NewDefaultLogger(),
317332
}
318333
}
@@ -982,3 +997,54 @@ func (this *MigrationContext) GetGhostTriggerName(triggerName string) string {
982997
func (this *MigrationContext) ValidateGhostTriggerLengthBelowMaxLength(triggerName string) bool {
983998
return utf8.RuneCountInString(triggerName) <= mysql.MaxTableNameLength
984999
}
1000+
1001+
// GetContext returns the migration context for cancellation checking
1002+
func (this *MigrationContext) GetContext() context.Context {
1003+
return this.ctx
1004+
}
1005+
1006+
// SetAbortError stores the fatal error that triggered abort
1007+
// Only the first error is stored (subsequent errors are ignored)
1008+
func (this *MigrationContext) SetAbortError(err error) {
1009+
this.abortMutex.Lock()
1010+
defer this.abortMutex.Unlock()
1011+
if this.AbortError == nil {
1012+
this.AbortError = err
1013+
}
1014+
}
1015+
1016+
// GetAbortError retrieves the stored abort error
1017+
func (this *MigrationContext) GetAbortError() error {
1018+
this.abortMutex.Lock()
1019+
defer this.abortMutex.Unlock()
1020+
return this.AbortError
1021+
}
1022+
1023+
// CancelContext cancels the migration context to signal all goroutines to stop
1024+
// The cancel function is safe to call multiple times and from multiple goroutines.
1025+
func (this *MigrationContext) CancelContext() {
1026+
if this.cancelFunc != nil {
1027+
this.cancelFunc()
1028+
}
1029+
}
1030+
1031+
// SendWithContext attempts to send a value to a channel, but returns early
1032+
// if the context is cancelled. This prevents goroutine deadlocks when the
1033+
// channel receiver has exited due to an error.
1034+
//
1035+
// Use this instead of bare channel sends (ch <- val) in goroutines to ensure
1036+
// proper cleanup when the migration is aborted.
1037+
//
1038+
// Example:
1039+
//
1040+
// if err := base.SendWithContext(ctx, ch, value); err != nil {
1041+
// return err // context was cancelled
1042+
// }
1043+
func SendWithContext[T any](ctx context.Context, ch chan<- T, val T) error {
1044+
select {
1045+
case ch <- val:
1046+
return nil
1047+
case <-ctx.Done():
1048+
return ctx.Err()
1049+
}
1050+
}

go/base/context_test.go

Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,8 +6,10 @@
66
package base
77

88
import (
9+
"errors"
910
"os"
1011
"strings"
12+
"sync"
1113
"testing"
1214
"time"
1315

@@ -213,3 +215,58 @@ func TestReadConfigFile(t *testing.T) {
213215
}
214216
}
215217
}
218+
219+
func TestSetAbortError_StoresFirstError(t *testing.T) {
220+
ctx := NewMigrationContext()
221+
222+
err1 := errors.New("first error")
223+
err2 := errors.New("second error")
224+
225+
ctx.SetAbortError(err1)
226+
ctx.SetAbortError(err2)
227+
228+
got := ctx.GetAbortError()
229+
if got != err1 { //nolint:errorlint // Testing pointer equality for sentinel error
230+
t.Errorf("Expected first error %v, got %v", err1, got)
231+
}
232+
}
233+
234+
func TestSetAbortError_ThreadSafe(t *testing.T) {
235+
ctx := NewMigrationContext()
236+
237+
var wg sync.WaitGroup
238+
errs := []error{
239+
errors.New("error 1"),
240+
errors.New("error 2"),
241+
errors.New("error 3"),
242+
}
243+
244+
// Launch 3 goroutines trying to set error concurrently
245+
for _, err := range errs {
246+
wg.Add(1)
247+
go func(e error) {
248+
defer wg.Done()
249+
ctx.SetAbortError(e)
250+
}(err)
251+
}
252+
253+
wg.Wait()
254+
255+
// Should store exactly one of the errors
256+
got := ctx.GetAbortError()
257+
if got == nil {
258+
t.Fatal("Expected error to be stored, got nil")
259+
}
260+
261+
// Verify it's one of the errors we sent
262+
found := false
263+
for _, err := range errs {
264+
if got == err { //nolint:errorlint // Testing pointer equality for sentinel error
265+
found = true
266+
break
267+
}
268+
}
269+
if !found {
270+
t.Errorf("Stored error %v not in list of sent errors", got)
271+
}
272+
}

go/logic/applier.go

Lines changed: 42 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -94,6 +94,21 @@ func NewApplier(migrationContext *base.MigrationContext) *Applier {
9494
}
9595
}
9696

97+
// compileMigrationKeyWarningRegex compiles a regex pattern that matches duplicate key warnings
98+
// for the migration's unique key. Duplicate warnings are formatted differently across MySQL versions,
99+
// hence the optional table name prefix. Metacharacters in table/index names are escaped to avoid
100+
// regex syntax errors.
101+
func (this *Applier) compileMigrationKeyWarningRegex() (*regexp.Regexp, error) {
102+
escapedTable := regexp.QuoteMeta(this.migrationContext.GetGhostTableName())
103+
escapedKey := regexp.QuoteMeta(this.migrationContext.UniqueKey.NameInGhostTable)
104+
migrationUniqueKeyPattern := fmt.Sprintf(`for key '(%s\.)?%s'`, escapedTable, escapedKey)
105+
migrationKeyRegex, err := regexp.Compile(migrationUniqueKeyPattern)
106+
if err != nil {
107+
return nil, fmt.Errorf("failed to compile migration key pattern: %w", err)
108+
}
109+
return migrationKeyRegex, nil
110+
}
111+
97112
func (this *Applier) InitDBConnections() (err error) {
98113
applierUri := this.connectionConfig.GetDBUri(this.migrationContext.DatabaseName)
99114
uriWithMulti := fmt.Sprintf("%s&multiStatements=true", applierUri)
@@ -695,7 +710,17 @@ func (this *Applier) InitiateHeartbeat() {
695710

696711
ticker := time.NewTicker(time.Duration(this.migrationContext.HeartbeatIntervalMilliseconds) * time.Millisecond)
697712
defer ticker.Stop()
698-
for range ticker.C {
713+
for {
714+
// Check for context cancellation each iteration
715+
ctx := this.migrationContext.GetContext()
716+
select {
717+
case <-ctx.Done():
718+
this.migrationContext.Log.Debugf("Heartbeat injection cancelled")
719+
return
720+
case <-ticker.C:
721+
// Process heartbeat
722+
}
723+
699724
if atomic.LoadInt64(&this.finishedMigrating) > 0 {
700725
return
701726
}
@@ -706,7 +731,8 @@ func (this *Applier) InitiateHeartbeat() {
706731
continue
707732
}
708733
if err := injectHeartbeat(); err != nil {
709-
this.migrationContext.PanicAbort <- fmt.Errorf("injectHeartbeat writing failed %d times, last error: %w", numSuccessiveFailures, err)
734+
// Use helper to prevent deadlock if listenOnPanicAbort already exited
735+
_ = base.SendWithContext(this.migrationContext.GetContext(), this.migrationContext.PanicAbort, fmt.Errorf("injectHeartbeat writing failed %d times, last error: %w", numSuccessiveFailures, err))
710736
return
711737
}
712738
}
@@ -917,6 +943,12 @@ func (this *Applier) ApplyIterationInsertQuery() (chunkSize int64, rowsAffected
917943
return nil, err
918944
}
919945

946+
// Compile regex once before loop to avoid performance penalty and handle errors properly
947+
migrationKeyRegex, err := this.compileMigrationKeyWarningRegex()
948+
if err != nil {
949+
return nil, err
950+
}
951+
920952
var sqlWarnings []string
921953
for rows.Next() {
922954
var level, message string
@@ -925,10 +957,7 @@ func (this *Applier) ApplyIterationInsertQuery() (chunkSize int64, rowsAffected
925957
this.migrationContext.Log.Warningf("Failed to read SHOW WARNINGS row")
926958
continue
927959
}
928-
// Duplicate warnings are formatted differently across mysql versions, hence the optional table name prefix
929-
migrationUniqueKeyExpression := fmt.Sprintf("for key '(%s\\.)?%s'", this.migrationContext.GetGhostTableName(), this.migrationContext.UniqueKey.NameInGhostTable)
930-
matched, _ := regexp.MatchString(migrationUniqueKeyExpression, message)
931-
if strings.Contains(message, "Duplicate entry") && matched {
960+
if strings.Contains(message, "Duplicate entry") && migrationKeyRegex.MatchString(message) {
932961
continue
933962
}
934963
sqlWarnings = append(sqlWarnings, fmt.Sprintf("%s: %s (%d)", level, message, code))
@@ -1559,6 +1588,12 @@ func (this *Applier) ApplyDMLEventQueries(dmlEvents [](*binlog.BinlogDMLEvent))
15591588
return rollback(err)
15601589
}
15611590

1591+
// Compile regex once before loop to avoid performance penalty and handle errors properly
1592+
migrationKeyRegex, err := this.compileMigrationKeyWarningRegex()
1593+
if err != nil {
1594+
return rollback(err)
1595+
}
1596+
15621597
var sqlWarnings []string
15631598
for rows.Next() {
15641599
var level, message string
@@ -1567,10 +1602,7 @@ func (this *Applier) ApplyDMLEventQueries(dmlEvents [](*binlog.BinlogDMLEvent))
15671602
this.migrationContext.Log.Warningf("Failed to read SHOW WARNINGS row")
15681603
continue
15691604
}
1570-
// Duplicate warnings are formatted differently across mysql versions, hence the optional table name prefix
1571-
migrationUniqueKeyExpression := fmt.Sprintf("for key '(%s\\.)?%s'", this.migrationContext.GetGhostTableName(), this.migrationContext.UniqueKey.NameInGhostTable)
1572-
matched, _ := regexp.MatchString(migrationUniqueKeyExpression, message)
1573-
if strings.Contains(message, "Duplicate entry") && matched {
1605+
if strings.Contains(message, "Duplicate entry") && migrationKeyRegex.MatchString(message) {
15741606
// Duplicate entry on migration unique key is expected during binlog replay
15751607
// (row was already copied during bulk copy phase)
15761608
continue

0 commit comments

Comments
 (0)