From 81ec000db880f4ada7735476c4496c727ba908ea Mon Sep 17 00:00:00 2001 From: Gabriel Gilder Date: Fri, 27 Feb 2026 21:27:10 -0800 Subject: [PATCH 01/12] Replace log.Fatale with context-based error propagation This patch modifies gh-ost to use a cancellable context instead of log.Fatale() in listenOnPanicAbort. When using gh-ost as a library, this allows the calling application to recover from aborts (e.g. log the failure reason) instead of having the entire process terminate via os.Exit(). Now we store the error and cancel a context to signal all goroutines to stop gracefully. --- go/base/context.go | 45 ++++++ go/base/context_test.go | 57 ++++++++ go/logic/applier.go | 19 ++- go/logic/migrator.go | 111 +++++++++++++-- go/logic/migrator_test.go | 281 ++++++++++++++++++++++++++++++++++++++ go/logic/server.go | 7 +- go/logic/streamer.go | 7 + go/logic/throttler.go | 32 ++++- 8 files changed, 544 insertions(+), 15 deletions(-) diff --git a/go/base/context.go b/go/base/context.go index 891e27fef..1b795658e 100644 --- a/go/base/context.go +++ b/go/base/context.go @@ -6,6 +6,7 @@ package base import ( + "context" "fmt" "math" "os" @@ -225,6 +226,16 @@ type MigrationContext struct { InCutOverCriticalSectionFlag int64 PanicAbort chan error + // Context for cancellation signaling across all goroutines + // Stored in struct as it spans the entire migration lifecycle, not per-function. + // context.Context is safe for concurrent use by multiple goroutines. 
+ ctx context.Context //nolint:containedctx + cancelFunc context.CancelFunc + + // Stores the fatal error that triggered abort + AbortError error + abortMutex *sync.Mutex + OriginalTableColumnsOnApplier *sql.ColumnList OriginalTableColumns *sql.ColumnList OriginalTableVirtualColumns *sql.ColumnList @@ -293,6 +304,7 @@ type ContextConfig struct { } func NewMigrationContext() *MigrationContext { + ctx, cancelFunc := context.WithCancel(context.Background()) return &MigrationContext{ Uuid: uuid.NewString(), defaultNumRetries: 60, @@ -313,6 +325,9 @@ func NewMigrationContext() *MigrationContext { lastHeartbeatOnChangelogMutex: &sync.Mutex{}, ColumnRenameMap: make(map[string]string), PanicAbort: make(chan error), + ctx: ctx, + cancelFunc: cancelFunc, + abortMutex: &sync.Mutex{}, Log: NewDefaultLogger(), } } @@ -982,3 +997,33 @@ func (this *MigrationContext) GetGhostTriggerName(triggerName string) string { func (this *MigrationContext) ValidateGhostTriggerLengthBelowMaxLength(triggerName string) bool { return utf8.RuneCountInString(triggerName) <= mysql.MaxTableNameLength } + +// GetContext returns the migration context for cancellation checking +func (this *MigrationContext) GetContext() context.Context { + return this.ctx +} + +// SetAbortError stores the fatal error that triggered abort +// Only the first error is stored (subsequent errors are ignored) +func (this *MigrationContext) SetAbortError(err error) { + this.abortMutex.Lock() + defer this.abortMutex.Unlock() + if this.AbortError == nil { + this.AbortError = err + } +} + +// GetAbortError retrieves the stored abort error +func (this *MigrationContext) GetAbortError() error { + this.abortMutex.Lock() + defer this.abortMutex.Unlock() + return this.AbortError +} + +// CancelContext cancels the migration context to signal all goroutines to stop +// The cancel function is safe to call multiple times and from multiple goroutines. 
+func (this *MigrationContext) CancelContext() { + if this.cancelFunc != nil { + this.cancelFunc() + } +} diff --git a/go/base/context_test.go b/go/base/context_test.go index f8bce6f27..f848f1c6a 100644 --- a/go/base/context_test.go +++ b/go/base/context_test.go @@ -6,8 +6,10 @@ package base import ( + "errors" "os" "strings" + "sync" "testing" "time" @@ -213,3 +215,58 @@ func TestReadConfigFile(t *testing.T) { } } } + +func TestSetAbortError_StoresFirstError(t *testing.T) { + ctx := NewMigrationContext() + + err1 := errors.New("first error") + err2 := errors.New("second error") + + ctx.SetAbortError(err1) + ctx.SetAbortError(err2) + + got := ctx.GetAbortError() + if got != err1 { //nolint:errorlint // Testing pointer equality for sentinel error + t.Errorf("Expected first error %v, got %v", err1, got) + } +} + +func TestSetAbortError_ThreadSafe(t *testing.T) { + ctx := NewMigrationContext() + + var wg sync.WaitGroup + errors := []error{ + errors.New("error 1"), + errors.New("error 2"), + errors.New("error 3"), + } + + // Launch 3 goroutines trying to set error concurrently + for _, err := range errors { + wg.Add(1) + go func(e error) { + defer wg.Done() + ctx.SetAbortError(e) + }(err) + } + + wg.Wait() + + // Should store exactly one of the errors + got := ctx.GetAbortError() + if got == nil { + t.Fatal("Expected error to be stored, got nil") + } + + // Verify it's one of the errors we sent + found := false + for _, err := range errors { + if got == err { //nolint:errorlint // Testing pointer equality for sentinel error + found = true + break + } + } + if !found { + t.Errorf("Stored error %v not in list of sent errors", got) + } +} diff --git a/go/logic/applier.go b/go/logic/applier.go index 4e9d74cd2..e3a7927e1 100644 --- a/go/logic/applier.go +++ b/go/logic/applier.go @@ -695,7 +695,17 @@ func (this *Applier) InitiateHeartbeat() { ticker := time.NewTicker(time.Duration(this.migrationContext.HeartbeatIntervalMilliseconds) * time.Millisecond) defer ticker.Stop() - 
for range ticker.C { + for { + // Check for context cancellation each iteration + ctx := this.migrationContext.GetContext() + select { + case <-ctx.Done(): + this.migrationContext.Log.Debugf("Heartbeat injection cancelled") + return + case <-ticker.C: + // Process heartbeat + } + if atomic.LoadInt64(&this.finishedMigrating) > 0 { return } @@ -706,7 +716,12 @@ func (this *Applier) InitiateHeartbeat() { continue } if err := injectHeartbeat(); err != nil { - this.migrationContext.PanicAbort <- fmt.Errorf("injectHeartbeat writing failed %d times, last error: %w", numSuccessiveFailures, err) + select { + case this.migrationContext.PanicAbort <- fmt.Errorf("injectHeartbeat writing failed %d times, last error: %w", numSuccessiveFailures, err): + // Error sent successfully + case <-this.migrationContext.GetContext().Done(): + // Context cancelled, someone else already reported an error + } return } } diff --git a/go/logic/migrator.go b/go/logic/migrator.go index aa9a97c1c..7c5f3cc3c 100644 --- a/go/logic/migrator.go +++ b/go/logic/migrator.go @@ -163,7 +163,12 @@ func (this *Migrator) retryOperation(operation func() error, notFatalHint ...boo // there's an error. Let's try again. 
} if len(notFatalHint) == 0 { - this.migrationContext.PanicAbort <- err + select { + case this.migrationContext.PanicAbort <- err: + // Error sent successfully + case <-this.migrationContext.GetContext().Done(): + // Context cancelled, someone else already reported an error + } } return err } @@ -191,7 +196,12 @@ func (this *Migrator) retryOperationWithExponentialBackoff(operation func() erro } } if len(notFatalHint) == 0 { - this.migrationContext.PanicAbort <- err + select { + case this.migrationContext.PanicAbort <- err: + // Error sent successfully + case <-this.migrationContext.GetContext().Done(): + // Context cancelled, someone else already reported an error + } } return err } @@ -200,14 +210,24 @@ func (this *Migrator) retryOperationWithExponentialBackoff(operation func() erro // consumes and drops any further incoming events that may be left hanging. func (this *Migrator) consumeRowCopyComplete() { if err := <-this.rowCopyComplete; err != nil { - this.migrationContext.PanicAbort <- err + select { + case this.migrationContext.PanicAbort <- err: + // Error sent successfully + case <-this.migrationContext.GetContext().Done(): + // Context cancelled, someone else already reported an error + } } atomic.StoreInt64(&this.rowCopyCompleteFlag, 1) this.migrationContext.MarkRowCopyEndTime() go func() { for err := range this.rowCopyComplete { if err != nil { - this.migrationContext.PanicAbort <- err + select { + case this.migrationContext.PanicAbort <- err: + // Error sent successfully + case <-this.migrationContext.GetContext().Done(): + // Context cancelled, someone else already reported an error + } } } }() @@ -277,10 +297,18 @@ func (this *Migrator) onChangelogHeartbeatEvent(dmlEntry *binlog.BinlogEntry) (e } } -// listenOnPanicAbort aborts on abort request +// listenOnPanicAbort listens for fatal errors and initiates graceful shutdown func (this *Migrator) listenOnPanicAbort() { err := <-this.migrationContext.PanicAbort - this.migrationContext.Log.Fatale(err) + + 
// Store the error for Migrate() to return + this.migrationContext.SetAbortError(err) + + // Cancel the context to signal all goroutines to stop + this.migrationContext.CancelContext() + + // Log the error (but don't panic or exit) + this.migrationContext.Log.Errorf("Migration aborted: %v", err) } // validateAlterStatement validates the `alter` statement meets criteria. @@ -348,10 +376,36 @@ func (this *Migrator) createFlagFiles() (err error) { return nil } +// checkAbort returns abort error if migration was aborted +func (this *Migrator) checkAbort() error { + if abortErr := this.migrationContext.GetAbortError(); abortErr != nil { + return abortErr + } + + ctx := this.migrationContext.GetContext() + if ctx != nil { + select { + case <-ctx.Done(): + // Context cancelled but no abort error stored yet + if abortErr := this.migrationContext.GetAbortError(); abortErr != nil { + return abortErr + } + return ctx.Err() + default: + // Not cancelled + } + } + return nil +} + // Migrate executes the complete migration logic. This is *the* major gh-ost function. func (this *Migrator) Migrate() (err error) { this.migrationContext.Log.Infof("Migrating %s.%s", sql.EscapeName(this.migrationContext.DatabaseName), sql.EscapeName(this.migrationContext.OriginalTableName)) this.migrationContext.StartTime = time.Now() + + // Ensure context is cancelled on exit (cleanup) + defer this.migrationContext.CancelContext() + if this.migrationContext.Hostname, err = os.Hostname(); err != nil { return err } @@ -375,6 +429,9 @@ func (this *Migrator) Migrate() (err error) { if err := this.initiateInspector(); err != nil { return err } + if err := this.checkAbort(); err != nil { + return err + } // If we are resuming, we will initiateStreaming later when we know // the binlog coordinates to resume streaming from. 
// If not resuming, the streamer must be initiated before the applier, @@ -383,10 +440,16 @@ func (this *Migrator) Migrate() (err error) { if err := this.initiateStreaming(); err != nil { return err } + if err := this.checkAbort(); err != nil { + return err + } } if err := this.initiateApplier(); err != nil { return err } + if err := this.checkAbort(); err != nil { + return err + } if err := this.createFlagFiles(); err != nil { return err } @@ -493,6 +556,10 @@ func (this *Migrator) Migrate() (err error) { this.migrationContext.Log.Debugf("Operating until row copy is complete") this.consumeRowCopyComplete() this.migrationContext.Log.Infof("Row copy complete") + // Check if row copy was aborted due to error + if err := this.migrationContext.GetAbortError(); err != nil { + return err + } if err := this.hooksExecutor.onRowCopyComplete(); err != nil { return err } @@ -532,6 +599,10 @@ func (this *Migrator) Migrate() (err error) { return err } this.migrationContext.Log.Infof("Done migrating %s.%s", sql.EscapeName(this.migrationContext.DatabaseName), sql.EscapeName(this.migrationContext.OriginalTableName)) + // Final check for abort before declaring success + if err := this.checkAbort(); err != nil { + return err + } return nil } @@ -543,6 +614,10 @@ func (this *Migrator) Revert() error { sql.EscapeName(this.migrationContext.DatabaseName), sql.EscapeName(this.migrationContext.OriginalTableName), sql.EscapeName(this.migrationContext.DatabaseName), sql.EscapeName(this.migrationContext.OldTableName)) this.migrationContext.StartTime = time.Now() + + // Ensure context is cancelled on exit (cleanup) + defer this.migrationContext.CancelContext() + var err error if this.migrationContext.Hostname, err = os.Hostname(); err != nil { return err @@ -561,9 +636,15 @@ func (this *Migrator) Revert() error { if err := this.initiateInspector(); err != nil { return err } + if err := this.checkAbort(); err != nil { + return err + } if err := this.initiateApplier(); err != nil { return err } 
+ if err := this.checkAbort(); err != nil { + return err + } if err := this.createFlagFiles(); err != nil { return err } @@ -588,6 +669,9 @@ func (this *Migrator) Revert() error { if err := this.initiateStreaming(); err != nil { return err } + if err := this.checkAbort(); err != nil { + return err + } if err := this.hooksExecutor.onValidated(); err != nil { return err } @@ -1293,7 +1377,12 @@ func (this *Migrator) initiateStreaming() error { this.migrationContext.Log.Debugf("Beginning streaming") err := this.eventsStreamer.StreamEvents(this.canStopStreaming) if err != nil { - this.migrationContext.PanicAbort <- err + select { + case this.migrationContext.PanicAbort <- err: + // Error sent successfully + case <-this.migrationContext.GetContext().Done(): + // Context cancelled, someone else already reported an error + } } this.migrationContext.Log.Debugf("Done streaming") }() @@ -1413,6 +1502,9 @@ func (this *Migrator) iterateChunks() error { var hasNoFurtherRangeFlag int64 // Iterate per chunk: for { + if err := this.checkAbort(); err != nil { + return terminateRowIteration(err) + } if atomic.LoadInt64(&this.rowCopyCompleteFlag) == 1 || atomic.LoadInt64(&hasNoFurtherRangeFlag) == 1 { // Done // There's another such check down the line @@ -1459,7 +1551,7 @@ func (this *Migrator) iterateChunks() error { this.migrationContext.Log.Infof("ApplyIterationInsertQuery has SQL warnings! 
%s", warning) } joinedWarnings := strings.Join(this.migrationContext.MigrationLastInsertSQLWarnings, "; ") - terminateRowIteration(fmt.Errorf("ApplyIterationInsertQuery failed because of SQL warnings: [%s]", joinedWarnings)) + return terminateRowIteration(fmt.Errorf("ApplyIterationInsertQuery failed because of SQL warnings: [%s]", joinedWarnings)) } } @@ -1649,6 +1741,9 @@ func (this *Migrator) executeWriteFuncs() error { return nil } for { + if err := this.checkAbort(); err != nil { + return err + } if atomic.LoadInt64(&this.finishedMigrating) > 0 { return nil } diff --git a/go/logic/migrator_test.go b/go/logic/migrator_test.go index c4fd49233..42b6fed37 100644 --- a/go/logic/migrator_test.go +++ b/go/logic/migrator_test.go @@ -931,3 +931,284 @@ func (suite *MigratorTestSuite) TestRevert() { func TestMigrator(t *testing.T) { suite.Run(t, new(MigratorTestSuite)) } + +func TestPanicAbort_PropagatesError(t *testing.T) { + migrationContext := base.NewMigrationContext() + migrator := NewMigrator(migrationContext, "1.0.0") + + // Start listenOnPanicAbort + go migrator.listenOnPanicAbort() + + // Send an error to PanicAbort + testErr := errors.New("test abort error") + go func() { + migrationContext.PanicAbort <- testErr + }() + + // Wait a bit for error to be processed + time.Sleep(100 * time.Millisecond) + + // Verify error was stored + got := migrationContext.GetAbortError() + if got != testErr { //nolint:errorlint // Testing pointer equality for sentinel error + t.Errorf("Expected error %v, got %v", testErr, got) + } + + // Verify context was cancelled + ctx := migrationContext.GetContext() + select { + case <-ctx.Done(): + // Success - context was cancelled + default: + t.Error("Expected context to be cancelled") + } +} + +func TestPanicAbort_FirstErrorWins(t *testing.T) { + migrationContext := base.NewMigrationContext() + migrator := NewMigrator(migrationContext, "1.0.0") + + // Start listenOnPanicAbort + go migrator.listenOnPanicAbort() + + // Send first error + 
err1 := errors.New("first error") + go func() { + migrationContext.PanicAbort <- err1 + }() + + // Wait for first error to be processed + time.Sleep(50 * time.Millisecond) + + // Try to send second error (should be ignored) + err2 := errors.New("second error") + migrationContext.SetAbortError(err2) + + // Verify only first error is stored + got := migrationContext.GetAbortError() + if got != err1 { //nolint:errorlint // Testing pointer equality for sentinel error + t.Errorf("Expected first error %v, got %v", err1, got) + } +} + +func TestAbort_AfterRowCopy(t *testing.T) { + migrationContext := base.NewMigrationContext() + migrator := NewMigrator(migrationContext, "1.0.0") + + // Start listenOnPanicAbort + go migrator.listenOnPanicAbort() + + // Give listenOnPanicAbort time to start + time.Sleep(20 * time.Millisecond) + + // Simulate row copy error by sending to rowCopyComplete in a goroutine + // (unbuffered channel, so send must be async) + testErr := errors.New("row copy failed") + go func() { + migrator.rowCopyComplete <- testErr + }() + + // Consume the error (simulating what Migrate() does) + // This is a blocking call that waits for the error + migrator.consumeRowCopyComplete() + + // Wait for the error to be processed by listenOnPanicAbort + time.Sleep(50 * time.Millisecond) + + // Check that error was stored + if got := migrationContext.GetAbortError(); got == nil { + t.Fatal("Expected abort error to be stored after row copy error") + } else if got.Error() != "row copy failed" { + t.Errorf("Expected 'row copy failed', got %v", got) + } + + // Verify context was cancelled + ctx := migrationContext.GetContext() + select { + case <-ctx.Done(): + // Success + case <-time.After(1 * time.Second): + t.Error("Expected context to be cancelled after row copy error") + } +} + +func TestAbort_DuringInspection(t *testing.T) { + migrationContext := base.NewMigrationContext() + migrator := NewMigrator(migrationContext, "1.0.0") + + // Start listenOnPanicAbort + go 
migrator.listenOnPanicAbort() + + // Simulate error during inspection phase + testErr := errors.New("inspection failed") + go func() { + time.Sleep(10 * time.Millisecond) + select { + case migrationContext.PanicAbort <- testErr: + case <-migrationContext.GetContext().Done(): + } + }() + + // Wait for abort to be processed + time.Sleep(50 * time.Millisecond) + + // Call checkAbort (simulating what Migrate() does after initiateInspector) + err := migrator.checkAbort() + if err == nil { + t.Fatal("Expected checkAbort to return error after abort during inspection") + } + + if err.Error() != "inspection failed" { + t.Errorf("Expected 'inspection failed', got %v", err) + } +} + +func TestAbort_DuringStreaming(t *testing.T) { + migrationContext := base.NewMigrationContext() + migrator := NewMigrator(migrationContext, "1.0.0") + + // Start listenOnPanicAbort + go migrator.listenOnPanicAbort() + + // Simulate error from streaming goroutine + testErr := errors.New("streaming error") + go func() { + time.Sleep(10 * time.Millisecond) + // Use select pattern like actual code does + select { + case migrationContext.PanicAbort <- testErr: + case <-migrationContext.GetContext().Done(): + } + }() + + // Wait for abort to be processed + time.Sleep(50 * time.Millisecond) + + // Verify error stored and context cancelled + if got := migrationContext.GetAbortError(); got == nil { + t.Fatal("Expected abort error to be stored") + } else if got.Error() != "streaming error" { + t.Errorf("Expected 'streaming error', got %v", got) + } + + // Verify checkAbort catches it + err := migrator.checkAbort() + if err == nil { + t.Fatal("Expected checkAbort to return error after streaming abort") + } +} + +func TestRetryExhaustion_TriggersAbort(t *testing.T) { + migrationContext := base.NewMigrationContext() + migrationContext.SetDefaultNumRetries(2) // Only 2 retries + migrator := NewMigrator(migrationContext, "1.0.0") + + // Start listenOnPanicAbort + go migrator.listenOnPanicAbort() + + // 
Operation that always fails + callCount := 0 + operation := func() error { + callCount++ + return errors.New("persistent failure") + } + + // Call retryOperation (with notFatalHint=false so it sends to PanicAbort) + err := migrator.retryOperation(operation) + + // Should have called operation MaxRetries times + if callCount != 2 { + t.Errorf("Expected 2 retry attempts, got %d", callCount) + } + + // Should return the error + if err == nil { + t.Fatal("Expected retryOperation to return error") + } + + // Wait for abort to be processed + time.Sleep(100 * time.Millisecond) + + // Verify error was sent to PanicAbort and stored + if got := migrationContext.GetAbortError(); got == nil { + t.Error("Expected abort error to be stored after retry exhaustion") + } + + // Verify context was cancelled + ctx := migrationContext.GetContext() + select { + case <-ctx.Done(): + // Success + default: + t.Error("Expected context to be cancelled after retry exhaustion") + } +} + +func TestRevert_AbortsOnError(t *testing.T) { + migrationContext := base.NewMigrationContext() + migrationContext.Revert = true + migrationContext.OldTableName = "_test_del" + migrationContext.OriginalTableName = "test" + migrationContext.DatabaseName = "testdb" + migrator := NewMigrator(migrationContext, "1.0.0") + + // Start listenOnPanicAbort + go migrator.listenOnPanicAbort() + + // Simulate error during revert + testErr := errors.New("revert failed") + go func() { + time.Sleep(10 * time.Millisecond) + select { + case migrationContext.PanicAbort <- testErr: + case <-migrationContext.GetContext().Done(): + } + }() + + // Wait for abort to be processed + time.Sleep(50 * time.Millisecond) + + // Verify checkAbort catches it + err := migrator.checkAbort() + if err == nil { + t.Fatal("Expected checkAbort to return error during revert") + } + + if err.Error() != "revert failed" { + t.Errorf("Expected 'revert failed', got %v", err) + } + + // Verify context was cancelled + ctx := migrationContext.GetContext() + 
select { + case <-ctx.Done(): + // Success + default: + t.Error("Expected context to be cancelled during revert abort") + } +} + +func TestCheckAbort_ReturnsNilWhenNoError(t *testing.T) { + migrationContext := base.NewMigrationContext() + migrator := NewMigrator(migrationContext, "1.0.0") + + // No error has occurred + err := migrator.checkAbort() + if err != nil { + t.Errorf("Expected no error, got %v", err) + } +} + +func TestCheckAbort_DetectsContextCancellation(t *testing.T) { + migrationContext := base.NewMigrationContext() + migrator := NewMigrator(migrationContext, "1.0.0") + + // Cancel context directly (without going through PanicAbort) + migrationContext.CancelContext() + + // checkAbort should detect the cancellation + err := migrator.checkAbort() + if err == nil { + t.Fatal("Expected checkAbort to return error when context is cancelled") + } +} diff --git a/go/logic/server.go b/go/logic/server.go index 45e5b2bd4..7471b0b48 100644 --- a/go/logic/server.go +++ b/go/logic/server.go @@ -450,7 +450,12 @@ help # This message return NoPrintStatusRule, err } err := fmt.Errorf("User commanded 'panic'. The migration will be aborted without cleanup. 
Please drop the gh-ost tables before trying again.") - this.migrationContext.PanicAbort <- err + select { + case this.migrationContext.PanicAbort <- err: + // Error sent successfully + case <-this.migrationContext.GetContext().Done(): + // Context cancelled, someone else already reported an error + } return NoPrintStatusRule, err } default: diff --git a/go/logic/streamer.go b/go/logic/streamer.go index 63afc3f3d..cbdb095de 100644 --- a/go/logic/streamer.go +++ b/go/logic/streamer.go @@ -187,6 +187,13 @@ func (this *EventsStreamer) StreamEvents(canStopStreaming func() bool) error { var successiveFailures int var reconnectCoords mysql.BinlogCoordinates for { + // Check for context cancellation each iteration + ctx := this.migrationContext.GetContext() + select { + case <-ctx.Done(): + return nil + default: + } if canStopStreaming() { return nil } diff --git a/go/logic/throttler.go b/go/logic/throttler.go index 7a6534baf..558a6ad75 100644 --- a/go/logic/throttler.go +++ b/go/logic/throttler.go @@ -362,7 +362,12 @@ func (this *Throttler) collectGeneralThrottleMetrics() error { // Regardless of throttle, we take opportunity to check for panic-abort if this.migrationContext.PanicFlagFile != "" { if base.FileExists(this.migrationContext.PanicFlagFile) { - this.migrationContext.PanicAbort <- fmt.Errorf("Found panic-file %s. Aborting without cleanup", this.migrationContext.PanicFlagFile) + select { + case this.migrationContext.PanicAbort <- fmt.Errorf("Found panic-file %s. 
Aborting without cleanup", this.migrationContext.PanicFlagFile): + // Error sent successfully + case <-this.migrationContext.GetContext().Done(): + // Context cancelled, someone else already reported an error + } } } @@ -385,7 +390,12 @@ func (this *Throttler) collectGeneralThrottleMetrics() error { } if criticalLoadMet && this.migrationContext.CriticalLoadIntervalMilliseconds == 0 { - this.migrationContext.PanicAbort <- fmt.Errorf("critical-load met: %s=%d, >=%d", variableName, value, threshold) + select { + case this.migrationContext.PanicAbort <- fmt.Errorf("critical-load met: %s=%d, >=%d", variableName, value, threshold): + // Error sent successfully + case <-this.migrationContext.GetContext().Done(): + // Context cancelled, someone else already reported an error + } } if criticalLoadMet && this.migrationContext.CriticalLoadIntervalMilliseconds > 0 { this.migrationContext.Log.Errorf("critical-load met once: %s=%d, >=%d. Will check again in %d millis", variableName, value, threshold, this.migrationContext.CriticalLoadIntervalMilliseconds) @@ -393,7 +403,12 @@ func (this *Throttler) collectGeneralThrottleMetrics() error { timer := time.NewTimer(time.Millisecond * time.Duration(this.migrationContext.CriticalLoadIntervalMilliseconds)) <-timer.C if criticalLoadMetAgain, variableName, value, threshold, _ := this.criticalLoadIsMet(); criticalLoadMetAgain { - this.migrationContext.PanicAbort <- fmt.Errorf("critical-load met again after %d millis: %s=%d, >=%d", this.migrationContext.CriticalLoadIntervalMilliseconds, variableName, value, threshold) + select { + case this.migrationContext.PanicAbort <- fmt.Errorf("critical-load met again after %d millis: %s=%d, >=%d", this.migrationContext.CriticalLoadIntervalMilliseconds, variableName, value, threshold): + // Error sent successfully + case <-this.migrationContext.GetContext().Done(): + // Context cancelled, someone else already reported an error + } } }() } @@ -481,7 +496,16 @@ func (this *Throttler) 
initiateThrottlerChecks() { ticker := time.NewTicker(100 * time.Millisecond) defer ticker.Stop() - for range ticker.C { + for { + // Check for context cancellation each iteration + ctx := this.migrationContext.GetContext() + select { + case <-ctx.Done(): + return + case <-ticker.C: + // Process throttle check + } + if atomic.LoadInt64(&this.finishedMigrating) > 0 { return } From d6bc344df4284a2dd085e75f96bca937251675ac Mon Sep 17 00:00:00 2001 From: Gabriel Gilder Date: Thu, 5 Mar 2026 15:16:57 -0800 Subject: [PATCH 02/12] Fix shadowing --- go/base/context_test.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/go/base/context_test.go b/go/base/context_test.go index f848f1c6a..a9f62150d 100644 --- a/go/base/context_test.go +++ b/go/base/context_test.go @@ -235,14 +235,14 @@ func TestSetAbortError_ThreadSafe(t *testing.T) { ctx := NewMigrationContext() var wg sync.WaitGroup - errors := []error{ + errs := []error{ errors.New("error 1"), errors.New("error 2"), errors.New("error 3"), } // Launch 3 goroutines trying to set error concurrently - for _, err := range errors { + for _, err := range errs { wg.Add(1) go func(e error) { defer wg.Done() @@ -260,7 +260,7 @@ func TestSetAbortError_ThreadSafe(t *testing.T) { // Verify it's one of the errors we sent found := false - for _, err := range errors { + for _, err := range errs { if got == err { //nolint:errorlint // Testing pointer equality for sentinel error found = true break From 87ee906a41c0a5caf3d0c0c349035579c7d8fd94 Mon Sep 17 00:00:00 2001 From: Gabriel Gilder Date: Thu, 5 Mar 2026 15:28:00 -0800 Subject: [PATCH 03/12] Simplify non-blocking poll --- go/logic/streamer.go | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/go/logic/streamer.go b/go/logic/streamer.go index cbdb095de..d048ee5f6 100644 --- a/go/logic/streamer.go +++ b/go/logic/streamer.go @@ -186,13 +186,11 @@ func (this *EventsStreamer) StreamEvents(canStopStreaming func() bool) error { // The next should 
block and execute forever, unless there's a serious error. var successiveFailures int var reconnectCoords mysql.BinlogCoordinates + ctx := this.migrationContext.GetContext() for { // Check for context cancellation each iteration - ctx := this.migrationContext.GetContext() - select { - case <-ctx.Done(): + if err := ctx.Err(); err != nil { return nil - default: } if canStopStreaming() { return nil From d85d514fefa6fed94f8b7602f4b7e5ca76154b9a Mon Sep 17 00:00:00 2001 From: Gabriel Gilder Date: Thu, 5 Mar 2026 15:31:07 -0800 Subject: [PATCH 04/12] Simplify non-blocking poll in migrator.go --- go/logic/migrator.go | 22 ++++++++++------------ 1 file changed, 10 insertions(+), 12 deletions(-) diff --git a/go/logic/migrator.go b/go/logic/migrator.go index 7c5f3cc3c..45271a622 100644 --- a/go/logic/migrator.go +++ b/go/logic/migrator.go @@ -1655,20 +1655,18 @@ func (this *Migrator) Checkpoint(ctx context.Context) (*Checkpoint, error) { this.applier.LastIterationRangeMutex.Unlock() for { - select { - case <-ctx.Done(): - return nil, ctx.Err() - default: - this.applier.CurrentCoordinatesMutex.Lock() - if coords.SmallerThanOrEquals(this.applier.CurrentCoordinates) { - id, err := this.applier.WriteCheckpoint(chk) - chk.Id = id - this.applier.CurrentCoordinatesMutex.Unlock() - return chk, err - } + if err := ctx.Err(); err != nil { + return nil, err + } + this.applier.CurrentCoordinatesMutex.Lock() + if coords.SmallerThanOrEquals(this.applier.CurrentCoordinates) { + id, err := this.applier.WriteCheckpoint(chk) + chk.Id = id this.applier.CurrentCoordinatesMutex.Unlock() - time.Sleep(500 * time.Millisecond) + return chk, err } + this.applier.CurrentCoordinatesMutex.Unlock() + time.Sleep(500 * time.Millisecond) } } From af8def1942182ebaa7431f22039bdc79f39f44f0 Mon Sep 17 00:00:00 2001 From: Gabriel Gilder Date: Thu, 5 Mar 2026 20:29:43 -0800 Subject: [PATCH 05/12] Fix error return --- go/logic/streamer.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git 
a/go/logic/streamer.go b/go/logic/streamer.go index d048ee5f6..1c2635138 100644 --- a/go/logic/streamer.go +++ b/go/logic/streamer.go @@ -190,7 +190,7 @@ func (this *EventsStreamer) StreamEvents(canStopStreaming func() bool) error { for { // Check for context cancellation each iteration if err := ctx.Err(); err != nil { - return nil + return err } if canStopStreaming() { return nil From d74dc9bb08da9ebf4332afc1bca0e46c771046fa Mon Sep 17 00:00:00 2001 From: Gabriel Gilder Date: Mon, 9 Mar 2026 10:18:31 -0700 Subject: [PATCH 06/12] Fix hang on blocking channel send --- go/logic/migrator.go | 11 ++++++++++- 1 file changed, 10 insertions(+), 1 deletion(-) diff --git a/go/logic/migrator.go b/go/logic/migrator.go index 45271a622..624262ab3 100644 --- a/go/logic/migrator.go +++ b/go/logic/migrator.go @@ -1574,7 +1574,16 @@ func (this *Migrator) iterateChunks() error { return nil } // Enqueue copy operation; to be executed by executeWriteFuncs() - this.copyRowsQueue <- copyRowsFunc + select { + case this.copyRowsQueue <- copyRowsFunc: + // Successfully enqueued + case <-this.migrationContext.GetContext().Done(): + // Context cancelled, check for abort and exit + if err := this.checkAbort(); err != nil { + return terminateRowIteration(err) + } + return terminateRowIteration(this.migrationContext.GetContext().Err()) + } } } From ddc2ec1a1caf76203954df1716008d7272a9cbd6 Mon Sep 17 00:00:00 2001 From: Gabriel Gilder Date: Mon, 9 Mar 2026 11:36:21 -0700 Subject: [PATCH 07/12] Add defensive fix for other potential blocking channel send deadlocks --- go/logic/migrator.go | 24 ++++++++++++++++++++---- 1 file changed, 20 insertions(+), 4 deletions(-) diff --git a/go/logic/migrator.go b/go/logic/migrator.go index 624262ab3..e7fe28606 100644 --- a/go/logic/migrator.go +++ b/go/logic/migrator.go @@ -258,7 +258,12 @@ func (this *Migrator) onChangelogStateEvent(dmlEntry *binlog.BinlogEntry) (err e case Migrated, ReadMigrationRangeValues: // no-op event case GhostTableMigrated: - 
this.ghostTableMigrated <- true + select { + case this.ghostTableMigrated <- true: + // Successfully sent + case <-this.migrationContext.GetContext().Done(): + // Context cancelled, migration is aborting + } case AllEventsUpToLockProcessed: var applyEventFunc tableWriteFunc = func() error { this.allEventsUpToLockProcessed <- &lockProcessedStruct{ @@ -273,7 +278,12 @@ func (this *Migrator) onChangelogStateEvent(dmlEntry *binlog.BinlogEntry) (err e // So as not to create a potential deadlock, we write this func to applyEventsQueue // asynchronously, understanding it doesn't really matter. go func() { - this.applyEventsQueue <- newApplyEventStructByFunc(&applyEventFunc) + select { + case this.applyEventsQueue <- newApplyEventStructByFunc(&applyEventFunc): + // Successfully enqueued + case <-this.migrationContext.GetContext().Done(): + // Context cancelled, migration is aborting + } }() default: return fmt.Errorf("Unknown changelog state: %+v", changelogState) @@ -1408,8 +1418,14 @@ func (this *Migrator) addDMLEventsListener() error { this.migrationContext.DatabaseName, this.migrationContext.OriginalTableName, func(dmlEntry *binlog.BinlogEntry) error { - this.applyEventsQueue <- newApplyEventStructByDML(dmlEntry) - return nil + select { + case this.applyEventsQueue <- newApplyEventStructByDML(dmlEntry): + // Successfully enqueued + return nil + case <-this.migrationContext.GetContext().Done(): + // Context cancelled, stop processing events + return this.migrationContext.GetContext().Err() + } }, ) return err From 33eaa0beb4d3497c68ebc3f63a0aa8f2b3e38995 Mon Sep 17 00:00:00 2001 From: Gabriel Gilder Date: Mon, 9 Mar 2026 12:59:13 -0700 Subject: [PATCH 08/12] Add SendWithContext helper to avoid deadlocks --- .github/CONTRIBUTING.md | 18 +++++++++ go/base/context.go | 21 ++++++++++ go/logic/applier.go | 8 +--- go/logic/migrator.go | 86 ++++++++++++----------------------------- go/logic/server.go | 8 +--- go/logic/throttler.go | 24 +++--------- 6 files changed, 74 
insertions(+), 91 deletions(-) diff --git a/.github/CONTRIBUTING.md b/.github/CONTRIBUTING.md index e681e5a1b..733cf2601 100644 --- a/.github/CONTRIBUTING.md +++ b/.github/CONTRIBUTING.md @@ -19,6 +19,24 @@ Here are a few things you can do that will increase the likelihood of your pull - Keep your change as focused as possible. If there are multiple changes you would like to make that are not dependent upon each other, consider submitting them as separate pull requests. - Write a [good commit message](http://tbaggery.com/2008/04/19/a-note-about-git-commit-messages.html). +## Development Guidelines + +### Channel Safety + +When working with channels in goroutines, it's critical to prevent deadlocks that can occur when a channel receiver exits due to an error while senders are still trying to send values. Always use `base.SendWithContext` for channel sends to avoid deadlocks: + +```go +// ✅ CORRECT - Uses helper to prevent deadlock +if err := base.SendWithContext(ctx, ch, value); err != nil { + return err // context was cancelled +} + +// ❌ WRONG - Can deadlock if receiver exits +ch <- value +``` + +Even if the destination channel is buffered, deadlocks could still occur if the buffer fills up and the receiver exits, so it's important to use `SendWithContext` in those cases as well. + ## Resources - [Contributing to Open Source on GitHub](https://guides.github.com/activities/contributing-to-open-source/) diff --git a/go/base/context.go b/go/base/context.go index 1b795658e..2c8d28d56 100644 --- a/go/base/context.go +++ b/go/base/context.go @@ -1027,3 +1027,24 @@ func (this *MigrationContext) CancelContext() { this.cancelFunc() } } + +// SendWithContext attempts to send a value to a channel, but returns early +// if the context is cancelled. This prevents goroutine deadlocks when the +// channel receiver has exited due to an error. +// +// Use this instead of bare channel sends (ch <- val) in goroutines to ensure +// proper cleanup when the migration is aborted. 
+// +// Example: +// +// if err := base.SendWithContext(ctx, ch, value); err != nil { +// return err // context was cancelled +// } +func SendWithContext[T any](ctx context.Context, ch chan<- T, val T) error { + select { + case ch <- val: + return nil + case <-ctx.Done(): + return ctx.Err() + } +} diff --git a/go/logic/applier.go b/go/logic/applier.go index a6d8b57f2..6c09dd61e 100644 --- a/go/logic/applier.go +++ b/go/logic/applier.go @@ -716,12 +716,8 @@ func (this *Applier) InitiateHeartbeat() { continue } if err := injectHeartbeat(); err != nil { - select { - case this.migrationContext.PanicAbort <- fmt.Errorf("injectHeartbeat writing failed %d times, last error: %w", numSuccessiveFailures, err): - // Error sent successfully - case <-this.migrationContext.GetContext().Done(): - // Context cancelled, someone else already reported an error - } + // Use helper to prevent deadlock if listenOnPanicAbort already exited + _ = base.SendWithContext(this.migrationContext.GetContext(), this.migrationContext.PanicAbort, fmt.Errorf("injectHeartbeat writing failed %d times, last error: %w", numSuccessiveFailures, err)) return } } diff --git a/go/logic/migrator.go b/go/logic/migrator.go index e7fe28606..38775fa46 100644 --- a/go/logic/migrator.go +++ b/go/logic/migrator.go @@ -163,12 +163,8 @@ func (this *Migrator) retryOperation(operation func() error, notFatalHint ...boo // there's an error. Let's try again. 
} if len(notFatalHint) == 0 { - select { - case this.migrationContext.PanicAbort <- err: - // Error sent successfully - case <-this.migrationContext.GetContext().Done(): - // Context cancelled, someone else already reported an error - } + // Use helper to prevent deadlock if listenOnPanicAbort already exited + _ = base.SendWithContext(this.migrationContext.GetContext(), this.migrationContext.PanicAbort, err) } return err } @@ -196,12 +192,8 @@ func (this *Migrator) retryOperationWithExponentialBackoff(operation func() erro } } if len(notFatalHint) == 0 { - select { - case this.migrationContext.PanicAbort <- err: - // Error sent successfully - case <-this.migrationContext.GetContext().Done(): - // Context cancelled, someone else already reported an error - } + // Use helper to prevent deadlock if listenOnPanicAbort already exited + _ = base.SendWithContext(this.migrationContext.GetContext(), this.migrationContext.PanicAbort, err) } return err } @@ -210,24 +202,16 @@ func (this *Migrator) retryOperationWithExponentialBackoff(operation func() erro // consumes and drops any further incoming events that may be left hanging. 
func (this *Migrator) consumeRowCopyComplete() { if err := <-this.rowCopyComplete; err != nil { - select { - case this.migrationContext.PanicAbort <- err: - // Error sent successfully - case <-this.migrationContext.GetContext().Done(): - // Context cancelled, someone else already reported an error - } + // Use helper to prevent deadlock if listenOnPanicAbort already exited + _ = base.SendWithContext(this.migrationContext.GetContext(), this.migrationContext.PanicAbort, err) } atomic.StoreInt64(&this.rowCopyCompleteFlag, 1) this.migrationContext.MarkRowCopyEndTime() go func() { for err := range this.rowCopyComplete { if err != nil { - select { - case this.migrationContext.PanicAbort <- err: - // Error sent successfully - case <-this.migrationContext.GetContext().Done(): - // Context cancelled, someone else already reported an error - } + // Use helper to prevent deadlock if listenOnPanicAbort already exited + _ = base.SendWithContext(this.migrationContext.GetContext(), this.migrationContext.PanicAbort, err) } } }() @@ -258,19 +242,14 @@ func (this *Migrator) onChangelogStateEvent(dmlEntry *binlog.BinlogEntry) (err e case Migrated, ReadMigrationRangeValues: // no-op event case GhostTableMigrated: - select { - case this.ghostTableMigrated <- true: - // Successfully sent - case <-this.migrationContext.GetContext().Done(): - // Context cancelled, migration is aborting - } + // Use helper to prevent deadlock if migration aborts before receiver is ready + _ = base.SendWithContext(this.migrationContext.GetContext(), this.ghostTableMigrated, true) case AllEventsUpToLockProcessed: var applyEventFunc tableWriteFunc = func() error { - this.allEventsUpToLockProcessed <- &lockProcessedStruct{ + return base.SendWithContext(this.migrationContext.GetContext(), this.allEventsUpToLockProcessed, &lockProcessedStruct{ state: changelogStateString, coords: dmlEntry.Coordinates.Clone(), - } - return nil + }) } // at this point we know all events up to lock have been read from the streamer, 
// because the streamer works sequentially. So those events are either already handled, @@ -278,12 +257,8 @@ func (this *Migrator) onChangelogStateEvent(dmlEntry *binlog.BinlogEntry) (err e // So as not to create a potential deadlock, we write this func to applyEventsQueue // asynchronously, understanding it doesn't really matter. go func() { - select { - case this.applyEventsQueue <- newApplyEventStructByFunc(&applyEventFunc): - // Successfully enqueued - case <-this.migrationContext.GetContext().Done(): - // Context cancelled, migration is aborting - } + // Use helper to prevent deadlock if buffer fills and executeWriteFuncs exits + _ = base.SendWithContext(this.migrationContext.GetContext(), this.applyEventsQueue, newApplyEventStructByFunc(&applyEventFunc)) }() default: return fmt.Errorf("Unknown changelog state: %+v", changelogState) @@ -1387,12 +1362,8 @@ func (this *Migrator) initiateStreaming() error { this.migrationContext.Log.Debugf("Beginning streaming") err := this.eventsStreamer.StreamEvents(this.canStopStreaming) if err != nil { - select { - case this.migrationContext.PanicAbort <- err: - // Error sent successfully - case <-this.migrationContext.GetContext().Done(): - // Context cancelled, someone else already reported an error - } + // Use helper to prevent deadlock if listenOnPanicAbort already exited + _ = base.SendWithContext(this.migrationContext.GetContext(), this.migrationContext.PanicAbort, err) } this.migrationContext.Log.Debugf("Done streaming") }() @@ -1418,14 +1389,9 @@ func (this *Migrator) addDMLEventsListener() error { this.migrationContext.DatabaseName, this.migrationContext.OriginalTableName, func(dmlEntry *binlog.BinlogEntry) error { - select { - case this.applyEventsQueue <- newApplyEventStructByDML(dmlEntry): - // Successfully enqueued - return nil - case <-this.migrationContext.GetContext().Done(): - // Context cancelled, stop processing events - return this.migrationContext.GetContext().Err() - } + // Use helper to prevent 
deadlock if buffer fills and executeWriteFuncs exits + // This is critical because this callback blocks the event streamer + return base.SendWithContext(this.migrationContext.GetContext(), this.applyEventsQueue, newApplyEventStructByDML(dmlEntry)) }, ) return err @@ -1503,7 +1469,7 @@ func (this *Migrator) initiateApplier() error { // a chunk of rows onto the ghost table. func (this *Migrator) iterateChunks() error { terminateRowIteration := func(err error) error { - this.rowCopyComplete <- err + _ = base.SendWithContext(this.migrationContext.GetContext(), this.rowCopyComplete, err) return this.migrationContext.Log.Errore(err) } if this.migrationContext.Noop { @@ -1590,15 +1556,13 @@ func (this *Migrator) iterateChunks() error { return nil } // Enqueue copy operation; to be executed by executeWriteFuncs() - select { - case this.copyRowsQueue <- copyRowsFunc: - // Successfully enqueued - case <-this.migrationContext.GetContext().Done(): + // Use helper to prevent deadlock if executeWriteFuncs exits + if err := base.SendWithContext(this.migrationContext.GetContext(), this.copyRowsQueue, copyRowsFunc); err != nil { // Context cancelled, check for abort and exit - if err := this.checkAbort(); err != nil { - return terminateRowIteration(err) + if abortErr := this.checkAbort(); abortErr != nil { + return terminateRowIteration(abortErr) } - return terminateRowIteration(this.migrationContext.GetContext().Err()) + return terminateRowIteration(err) } } } diff --git a/go/logic/server.go b/go/logic/server.go index 7471b0b48..74097acb7 100644 --- a/go/logic/server.go +++ b/go/logic/server.go @@ -450,12 +450,8 @@ help # This message return NoPrintStatusRule, err } err := fmt.Errorf("User commanded 'panic'. The migration will be aborted without cleanup. 
Please drop the gh-ost tables before trying again.") - select { - case this.migrationContext.PanicAbort <- err: - // Error sent successfully - case <-this.migrationContext.GetContext().Done(): - // Context cancelled, someone else already reported an error - } + // Use helper to prevent deadlock if listenOnPanicAbort already exited + _ = base.SendWithContext(this.migrationContext.GetContext(), this.migrationContext.PanicAbort, err) return NoPrintStatusRule, err } default: diff --git a/go/logic/throttler.go b/go/logic/throttler.go index 558a6ad75..d52c94a13 100644 --- a/go/logic/throttler.go +++ b/go/logic/throttler.go @@ -362,12 +362,8 @@ func (this *Throttler) collectGeneralThrottleMetrics() error { // Regardless of throttle, we take opportunity to check for panic-abort if this.migrationContext.PanicFlagFile != "" { if base.FileExists(this.migrationContext.PanicFlagFile) { - select { - case this.migrationContext.PanicAbort <- fmt.Errorf("Found panic-file %s. Aborting without cleanup", this.migrationContext.PanicFlagFile): - // Error sent successfully - case <-this.migrationContext.GetContext().Done(): - // Context cancelled, someone else already reported an error - } + // Use helper to prevent deadlock if listenOnPanicAbort already exited + _ = base.SendWithContext(this.migrationContext.GetContext(), this.migrationContext.PanicAbort, fmt.Errorf("Found panic-file %s. 
Aborting without cleanup", this.migrationContext.PanicFlagFile)) } } @@ -390,12 +386,8 @@ func (this *Throttler) collectGeneralThrottleMetrics() error { } if criticalLoadMet && this.migrationContext.CriticalLoadIntervalMilliseconds == 0 { - select { - case this.migrationContext.PanicAbort <- fmt.Errorf("critical-load met: %s=%d, >=%d", variableName, value, threshold): - // Error sent successfully - case <-this.migrationContext.GetContext().Done(): - // Context cancelled, someone else already reported an error - } + // Use helper to prevent deadlock if listenOnPanicAbort already exited + _ = base.SendWithContext(this.migrationContext.GetContext(), this.migrationContext.PanicAbort, fmt.Errorf("critical-load met: %s=%d, >=%d", variableName, value, threshold)) } if criticalLoadMet && this.migrationContext.CriticalLoadIntervalMilliseconds > 0 { this.migrationContext.Log.Errorf("critical-load met once: %s=%d, >=%d. Will check again in %d millis", variableName, value, threshold, this.migrationContext.CriticalLoadIntervalMilliseconds) @@ -403,12 +395,8 @@ func (this *Throttler) collectGeneralThrottleMetrics() error { timer := time.NewTimer(time.Millisecond * time.Duration(this.migrationContext.CriticalLoadIntervalMilliseconds)) <-timer.C if criticalLoadMetAgain, variableName, value, threshold, _ := this.criticalLoadIsMet(); criticalLoadMetAgain { - select { - case this.migrationContext.PanicAbort <- fmt.Errorf("critical-load met again after %d millis: %s=%d, >=%d", this.migrationContext.CriticalLoadIntervalMilliseconds, variableName, value, threshold): - // Error sent successfully - case <-this.migrationContext.GetContext().Done(): - // Context cancelled, someone else already reported an error - } + // Use helper to prevent deadlock if listenOnPanicAbort already exited + _ = base.SendWithContext(this.migrationContext.GetContext(), this.migrationContext.PanicAbort, fmt.Errorf("critical-load met again after %d millis: %s=%d, >=%d", 
this.migrationContext.CriticalLoadIntervalMilliseconds, variableName, value, threshold)) } }() } From 6944341b7caaf711a7506d7157ed00fbc67f9a36 Mon Sep 17 00:00:00 2001 From: Gabriel Gilder Date: Tue, 10 Mar 2026 13:19:50 -0700 Subject: [PATCH 09/12] Fix deadlock on PanicAbort sends --- go/base/context.go | 2 +- go/logic/migrator.go | 3 +++ go/logic/throttler.go | 2 ++ 3 files changed, 6 insertions(+), 1 deletion(-) diff --git a/go/base/context.go b/go/base/context.go index 2c8d28d56..94822fa99 100644 --- a/go/base/context.go +++ b/go/base/context.go @@ -324,7 +324,7 @@ func NewMigrationContext() *MigrationContext { pointOfInterestTimeMutex: &sync.Mutex{}, lastHeartbeatOnChangelogMutex: &sync.Mutex{}, ColumnRenameMap: make(map[string]string), - PanicAbort: make(chan error), + PanicAbort: make(chan error, 1), ctx: ctx, cancelFunc: cancelFunc, abortMutex: &sync.Mutex{}, diff --git a/go/logic/migrator.go b/go/logic/migrator.go index 38775fa46..0346c075c 100644 --- a/go/logic/migrator.go +++ b/go/logic/migrator.go @@ -204,6 +204,8 @@ func (this *Migrator) consumeRowCopyComplete() { if err := <-this.rowCopyComplete; err != nil { // Use helper to prevent deadlock if listenOnPanicAbort already exited _ = base.SendWithContext(this.migrationContext.GetContext(), this.migrationContext.PanicAbort, err) + // Don't mark row copy as complete if there was an error + return } atomic.StoreInt64(&this.rowCopyCompleteFlag, 1) this.migrationContext.MarkRowCopyEndTime() @@ -212,6 +214,7 @@ func (this *Migrator) consumeRowCopyComplete() { if err != nil { // Use helper to prevent deadlock if listenOnPanicAbort already exited _ = base.SendWithContext(this.migrationContext.GetContext(), this.migrationContext.PanicAbort, err) + return } } }() diff --git a/go/logic/throttler.go b/go/logic/throttler.go index d52c94a13..1ca40f957 100644 --- a/go/logic/throttler.go +++ b/go/logic/throttler.go @@ -364,6 +364,7 @@ func (this *Throttler) collectGeneralThrottleMetrics() error { if 
base.FileExists(this.migrationContext.PanicFlagFile) { // Use helper to prevent deadlock if listenOnPanicAbort already exited _ = base.SendWithContext(this.migrationContext.GetContext(), this.migrationContext.PanicAbort, fmt.Errorf("Found panic-file %s. Aborting without cleanup", this.migrationContext.PanicFlagFile)) + return nil } } @@ -388,6 +389,7 @@ func (this *Throttler) collectGeneralThrottleMetrics() error { if criticalLoadMet && this.migrationContext.CriticalLoadIntervalMilliseconds == 0 { // Use helper to prevent deadlock if listenOnPanicAbort already exited _ = base.SendWithContext(this.migrationContext.GetContext(), this.migrationContext.PanicAbort, fmt.Errorf("critical-load met: %s=%d, >=%d", variableName, value, threshold)) + return nil } if criticalLoadMet && this.migrationContext.CriticalLoadIntervalMilliseconds > 0 { this.migrationContext.Log.Errorf("critical-load met once: %s=%d, >=%d. Will check again in %d millis", variableName, value, threshold, this.migrationContext.CriticalLoadIntervalMilliseconds) From 8c0f8054dbb3a3f9675a7bf2b440bc9187516379 Mon Sep 17 00:00:00 2001 From: Gabriel Gilder Date: Tue, 10 Mar 2026 15:55:08 -0700 Subject: [PATCH 10/12] Use checkAbort --- go/logic/migrator.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/go/logic/migrator.go b/go/logic/migrator.go index 0346c075c..1638ba8b2 100644 --- a/go/logic/migrator.go +++ b/go/logic/migrator.go @@ -545,7 +545,7 @@ func (this *Migrator) Migrate() (err error) { this.consumeRowCopyComplete() this.migrationContext.Log.Infof("Row copy complete") // Check if row copy was aborted due to error - if err := this.migrationContext.GetAbortError(); err != nil { + if err := this.checkAbort(); err != nil { return err } if err := this.hooksExecutor.onRowCopyComplete(); err != nil { From 7b47215d9e14e3834cfc68d30abd422ba9462a6f Mon Sep 17 00:00:00 2001 From: Gabriel Gilder Date: Tue, 10 Mar 2026 16:04:21 -0700 Subject: [PATCH 11/12] Fix migration abort race condition --- 
go/logic/migrator.go | 22 ++++++++++++++-------- 1 file changed, 14 insertions(+), 8 deletions(-) diff --git a/go/logic/migrator.go b/go/logic/migrator.go index 1638ba8b2..ca5f5a729 100644 --- a/go/logic/migrator.go +++ b/go/logic/migrator.go @@ -202,8 +202,8 @@ func (this *Migrator) retryOperationWithExponentialBackoff(operation func() erro // consumes and drops any further incoming events that may be left hanging. func (this *Migrator) consumeRowCopyComplete() { if err := <-this.rowCopyComplete; err != nil { - // Use helper to prevent deadlock if listenOnPanicAbort already exited - _ = base.SendWithContext(this.migrationContext.GetContext(), this.migrationContext.PanicAbort, err) + // Abort synchronously to ensure checkAbort() sees the error immediately + this.abort(err) // Don't mark row copy as complete if there was an error return } @@ -212,8 +212,8 @@ func (this *Migrator) consumeRowCopyComplete() { go func() { for err := range this.rowCopyComplete { if err != nil { - // Use helper to prevent deadlock if listenOnPanicAbort already exited - _ = base.SendWithContext(this.migrationContext.GetContext(), this.migrationContext.PanicAbort, err) + // Abort synchronously to ensure the error is stored immediately + this.abort(err) return } } @@ -285,10 +285,10 @@ func (this *Migrator) onChangelogHeartbeatEvent(dmlEntry *binlog.BinlogEntry) (e } } -// listenOnPanicAbort listens for fatal errors and initiates graceful shutdown -func (this *Migrator) listenOnPanicAbort() { - err := <-this.migrationContext.PanicAbort - +// abort stores the error, cancels the context, and logs the abort. +// This is the common abort logic used by both listenOnPanicAbort and +// consumeRowCopyComplete to ensure consistent error handling. 
+func (this *Migrator) abort(err error) { // Store the error for Migrate() to return this.migrationContext.SetAbortError(err) @@ -299,6 +299,12 @@ func (this *Migrator) listenOnPanicAbort() { this.migrationContext.Log.Errorf("Migration aborted: %v", err) } +// listenOnPanicAbort listens for fatal errors and initiates graceful shutdown +func (this *Migrator) listenOnPanicAbort() { + err := <-this.migrationContext.PanicAbort + this.abort(err) +} + // validateAlterStatement validates the `alter` statement meets criteria. // At this time this means: // - column renames are approved From c236d51690febb04f3facb2a2753e51b11ee15e6 Mon Sep 17 00:00:00 2001 From: Gabriel Gilder Date: Tue, 10 Mar 2026 16:14:12 -0700 Subject: [PATCH 12/12] Remove buffer on PanicAbort channel The buffer added earlier is no longer needed: every send to PanicAbort now goes through SendWithContext, which unblocks once abort() cancels the context, so an unbuffered channel can no longer deadlock senders. --- go/base/context.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/go/base/context.go b/go/base/context.go index 94822fa99..2c8d28d56 100644 --- a/go/base/context.go +++ b/go/base/context.go @@ -324,7 +324,7 @@ func NewMigrationContext() *MigrationContext { pointOfInterestTimeMutex: &sync.Mutex{}, lastHeartbeatOnChangelogMutex: &sync.Mutex{}, ColumnRenameMap: make(map[string]string), - PanicAbort: make(chan error, 1), + PanicAbort: make(chan error), ctx: ctx, cancelFunc: cancelFunc, abortMutex: &sync.Mutex{},