@@ -7,6 +7,7 @@ package logic
77
88import (
99 "context"
10+ "errors"
1011 "fmt"
1112 "io"
1213 "math"
@@ -22,6 +23,10 @@ import (
2223 "github.com/github/gh-ost/go/sql"
2324)
2425
26+ var (
27+ ErrMigratorUnsupportedRenameAlter = errors .New ("ALTER statement seems to RENAME the table. This is not supported, and you should run your RENAME outside gh-ost." )
28+ )
29+
2530type ChangelogState string
2631
2732const (
@@ -223,28 +228,22 @@ func (this *Migrator) onChangelogStateEvent(dmlEvent *binlog.BinlogDMLEvent) (er
223228 case Migrated , ReadMigrationRangeValues :
224229 // no-op event
225230 case GhostTableMigrated :
226- {
227- this .ghostTableMigrated <- true
228- }
231+ this .ghostTableMigrated <- true
229232 case AllEventsUpToLockProcessed :
230- {
231- var applyEventFunc tableWriteFunc = func () error {
232- this .allEventsUpToLockProcessed <- changelogStateString
233- return nil
234- }
235- // at this point we know all events up to lock have been read from the streamer,
236- // because the streamer works sequentially. So those events are either already handled,
237- // or have event functions in applyEventsQueue.
238- // So as not to create a potential deadlock, we write this func to applyEventsQueue
239- // asynchronously, understanding it doesn't really matter.
240- go func () {
241- this .applyEventsQueue <- newApplyEventStructByFunc (& applyEventFunc )
242- }()
233+ var applyEventFunc tableWriteFunc = func () error {
234+ this .allEventsUpToLockProcessed <- changelogStateString
235+ return nil
243236 }
237+ // at this point we know all events up to lock have been read from the streamer,
238+ // because the streamer works sequentially. So those events are either already handled,
239+ // or have event functions in applyEventsQueue.
240+ // So as not to create a potential deadlock, we write this func to applyEventsQueue
241+ // asynchronously, understanding it doesn't really matter.
242+ go func () {
243+ this .applyEventsQueue <- newApplyEventStructByFunc (& applyEventFunc )
244+ }()
244245 default :
245- {
246- return fmt .Errorf ("Unknown changelog state: %+v" , changelogState )
247- }
246+ return fmt .Errorf ("Unknown changelog state: %+v" , changelogState )
248247 }
249248 this .migrationContext .Log .Infof ("Handled changelog state %s" , changelogState )
250249 return nil
@@ -268,13 +267,13 @@ func (this *Migrator) listenOnPanicAbort() {
268267 this .migrationContext .Log .Fatale (err )
269268}
270269
271- // validateStatement validates the `alter` statement meets criteria.
270+ // validateAlterStatement validates the `alter` statement meets criteria.
272271// At this time this means:
273272// - column renames are approved
274273// - no table rename allowed
275- func (this * Migrator ) validateStatement () (err error ) {
274+ func (this * Migrator ) validateAlterStatement () (err error ) {
276275 if this .parser .IsRenameTable () {
277- return fmt . Errorf ( "ALTER statement seems to RENAME the table. This is not supported, and you should run your RENAME outside gh-ost." )
276+ return ErrMigratorUnsupportedRenameAlter
278277 }
279278 if this .parser .HasNonTrivialRenames () && ! this .migrationContext .SkipRenamedColumns {
280279 this .migrationContext .ColumnRenameMap = this .parser .GetNonTrivialRenames ()
@@ -352,7 +351,7 @@ func (this *Migrator) Migrate() (err error) {
352351 if err := this .parser .ParseAlterStatement (this .migrationContext .AlterStatement ); err != nil {
353352 return err
354353 }
355- if err := this .validateStatement (); err != nil {
354+ if err := this .validateAlterStatement (); err != nil {
356355 return err
357356 }
358357
@@ -903,72 +902,49 @@ func (this *Migrator) printMigrationStatusHint(writers ...io.Writer) {
903902 }
904903}
905904
906- // printStatus prints the progress status, and optionally additionally detailed
907- // dump of configuration.
908- // `rule` indicates the type of output expected.
909- // By default the status is written to standard output, but other writers can
910- // be used as well.
911- func (this * Migrator ) printStatus (rule PrintStatusRule , writers ... io.Writer ) {
912- if rule == NoPrintStatusRule {
913- return
914- }
915- writers = append (writers , os .Stdout )
916-
917- elapsedTime := this .migrationContext .ElapsedTime ()
918- elapsedSeconds := int64 (elapsedTime .Seconds ())
919- totalRowsCopied := this .migrationContext .GetTotalRowsCopied ()
920- rowsEstimate := atomic .LoadInt64 (& this .migrationContext .RowsEstimate ) + atomic .LoadInt64 (& this .migrationContext .RowsDeltaEstimate )
921- if atomic .LoadInt64 (& this .rowCopyCompleteFlag ) == 1 {
922- // Done copying rows. The totalRowsCopied value is the de-facto number of rows,
923- // and there is no further need to keep updating the value.
924- rowsEstimate = totalRowsCopied
925- }
926- var progressPct float64
927- if rowsEstimate == 0 {
928- progressPct = 100.0
929- } else {
930- progressPct = 100.0 * float64 (totalRowsCopied ) / float64 (rowsEstimate )
931- }
932- // we take the opportunity to update migration context with progressPct
933- this .migrationContext .SetProgressPct (progressPct )
934- // Before status, let's see if we should print a nice reminder for what exactly we're doing here.
935- shouldPrintMigrationStatusHint := (elapsedSeconds % 600 == 0 )
936- if rule == ForcePrintStatusAndHintRule {
937- shouldPrintMigrationStatusHint = true
938- }
939- if rule == ForcePrintStatusOnlyRule {
940- shouldPrintMigrationStatusHint = false
941- }
942- if shouldPrintMigrationStatusHint {
943- this .printMigrationStatusHint (writers ... )
905+ // getProgressPercent returns an estimate of migration progess as a percent.
906+ func (this * Migrator ) getProgressPercent (rowsEstimate int64 ) (progressPct float64 ) {
907+ progressPct = 100.0
908+ if rowsEstimate > 0 {
909+ progressPct *= float64 (this .migrationContext .GetTotalRowsCopied ()) / float64 (rowsEstimate )
944910 }
911+ return progressPct
912+ }
945913
946- var etaSeconds float64 = math .MaxFloat64
947- var etaDuration = time .Duration (base .ETAUnknown )
914+ // getMigrationETA returns the estimated duration of the migration
915+ func (this * Migrator ) getMigrationETA (rowsEstimate int64 ) (eta string , duration time.Duration ) {
916+ duration = time .Duration (base .ETAUnknown )
917+ progressPct := this .getProgressPercent (rowsEstimate )
948918 if progressPct >= 100.0 {
949- etaDuration = 0
919+ duration = 0
950920 } else if progressPct >= 0.1 {
921+ totalRowsCopied := this .migrationContext .GetTotalRowsCopied ()
951922 elapsedRowCopySeconds := this .migrationContext .ElapsedRowCopyTime ().Seconds ()
952923 totalExpectedSeconds := elapsedRowCopySeconds * float64 (rowsEstimate ) / float64 (totalRowsCopied )
953- etaSeconds = totalExpectedSeconds - elapsedRowCopySeconds
924+ etaSeconds : = totalExpectedSeconds - elapsedRowCopySeconds
954925 if etaSeconds >= 0 {
955- etaDuration = time .Duration (etaSeconds ) * time .Second
926+ duration = time .Duration (etaSeconds ) * time .Second
956927 } else {
957- etaDuration = 0
928+ duration = 0
958929 }
959930 }
960- this .migrationContext .SetETADuration (etaDuration )
961- var eta string
962- switch etaDuration {
931+
932+ switch duration {
963933 case 0 :
964934 eta = "due"
965935 case time .Duration (base .ETAUnknown ):
966936 eta = "N/A"
967937 default :
968- eta = base .PrettifyDurationOutput (etaDuration )
938+ eta = base .PrettifyDurationOutput (duration )
969939 }
970940
971- state := "migrating"
941+ return eta , duration
942+ }
943+
944+ // getMigrationStateAndETA returns the state and eta of the migration.
945+ func (this * Migrator ) getMigrationStateAndETA (rowsEstimate int64 ) (state , eta string , etaDuration time.Duration ) {
946+ eta , etaDuration = this .getMigrationETA (rowsEstimate )
947+ state = "migrating"
972948 if atomic .LoadInt64 (& this .migrationContext .CountingRowsFlag ) > 0 && ! this .migrationContext .ConcurrentCountTableRows {
973949 state = "counting rows"
974950 } else if atomic .LoadInt64 (& this .migrationContext .IsPostponingCutOver ) > 0 {
@@ -977,27 +953,78 @@ func (this *Migrator) printStatus(rule PrintStatusRule, writers ...io.Writer) {
977953 } else if isThrottled , throttleReason , _ := this .migrationContext .IsThrottled (); isThrottled {
978954 state = fmt .Sprintf ("throttled, %s" , throttleReason )
979955 }
956+ return state , eta , etaDuration
957+ }
980958
981- var shouldPrintStatus bool
982- if rule == HeuristicPrintStatusRule {
983- if elapsedSeconds <= 60 {
984- shouldPrintStatus = true
985- } else if etaSeconds <= 60 {
986- shouldPrintStatus = true
987- } else if etaSeconds <= 180 {
988- shouldPrintStatus = (elapsedSeconds % 5 == 0 )
989- } else if elapsedSeconds <= 180 {
990- shouldPrintStatus = (elapsedSeconds % 5 == 0 )
991- } else if this .migrationContext .TimeSincePointOfInterest ().Seconds () <= 60 {
992- shouldPrintStatus = (elapsedSeconds % 5 == 0 )
993- } else {
994- shouldPrintStatus = (elapsedSeconds % 30 == 0 )
995- }
959+ // shouldPrintStatus returns true when the migrator is due to print status info.
960+ func (this * Migrator ) shouldPrintStatus (rule PrintStatusRule , elapsedSeconds int64 , etaDuration time.Duration ) (shouldPrint bool ) {
961+ if rule != HeuristicPrintStatusRule {
962+ return true
963+ }
964+
965+ etaSeconds := etaDuration .Seconds ()
966+ if elapsedSeconds <= 60 {
967+ shouldPrint = true
968+ } else if etaSeconds <= 60 {
969+ shouldPrint = true
970+ } else if etaSeconds <= 180 {
971+ shouldPrint = (elapsedSeconds % 5 == 0 )
972+ } else if elapsedSeconds <= 180 {
973+ shouldPrint = (elapsedSeconds % 5 == 0 )
974+ } else if this .migrationContext .TimeSincePointOfInterest ().Seconds () <= 60 {
975+ shouldPrint = (elapsedSeconds % 5 == 0 )
996976 } else {
997- // Not heuristic
998- shouldPrintStatus = true
977+ shouldPrint = (elapsedSeconds % 30 == 0 )
978+ }
979+
980+ return shouldPrint
981+ }
982+
983+ // shouldPrintMigrationStatus returns true when the migrator is due to print the migration status hint
984+ func (this * Migrator ) shouldPrintMigrationStatusHint (rule PrintStatusRule , elapsedSeconds int64 ) (shouldPrint bool ) {
985+ if elapsedSeconds % 600 == 0 {
986+ shouldPrint = true
987+ } else if rule == ForcePrintStatusAndHintRule {
988+ shouldPrint = true
989+ }
990+ return shouldPrint
991+ }
992+
993+ // printStatus prints the progress status, and optionally additionally detailed
994+ // dump of configuration.
995+ // `rule` indicates the type of output expected.
996+ // By default the status is written to standard output, but other writers can
997+ // be used as well.
998+ func (this * Migrator ) printStatus (rule PrintStatusRule , writers ... io.Writer ) {
999+ if rule == NoPrintStatusRule {
1000+ return
1001+ }
1002+ writers = append (writers , os .Stdout )
1003+
1004+ elapsedTime := this .migrationContext .ElapsedTime ()
1005+ elapsedSeconds := int64 (elapsedTime .Seconds ())
1006+ totalRowsCopied := this .migrationContext .GetTotalRowsCopied ()
1007+ rowsEstimate := atomic .LoadInt64 (& this .migrationContext .RowsEstimate ) + atomic .LoadInt64 (& this .migrationContext .RowsDeltaEstimate )
1008+ if atomic .LoadInt64 (& this .rowCopyCompleteFlag ) == 1 {
1009+ // Done copying rows. The totalRowsCopied value is the de-facto number of rows,
1010+ // and there is no further need to keep updating the value.
1011+ rowsEstimate = totalRowsCopied
1012+ }
1013+
1014+ // we take the opportunity to update migration context with progressPct
1015+ progressPct := this .getProgressPercent (rowsEstimate )
1016+ this .migrationContext .SetProgressPct (progressPct )
1017+
1018+ // Before status, let's see if we should print a nice reminder for what exactly we're doing here.
1019+ if this .shouldPrintMigrationStatusHint (rule , elapsedSeconds ) {
1020+ this .printMigrationStatusHint (writers ... )
9991021 }
1000- if ! shouldPrintStatus {
1022+
1023+ // Get state + ETA
1024+ state , eta , etaDuration := this .getMigrationStateAndETA (rowsEstimate )
1025+ this .migrationContext .SetETADuration (etaDuration )
1026+
1027+ if ! this .shouldPrintStatus (rule , elapsedSeconds , etaDuration ) {
10011028 return
10021029 }
10031030
@@ -1016,7 +1043,7 @@ func (this *Migrator) printStatus(rule PrintStatusRule, writers ...io.Writer) {
10161043 )
10171044 this .applier .WriteChangelog (
10181045 fmt .Sprintf ("copy iteration %d at %d" , this .migrationContext .GetIteration (), time .Now ().Unix ()),
1019- status ,
1046+ state ,
10201047 )
10211048 w := io .MultiWriter (writers ... )
10221049 fmt .Fprintln (w , status )
0 commit comments