@@ -20,8 +20,7 @@ package job
20
20
import (
21
21
"context"
22
22
"fmt"
23
-
24
- "github.com/robfig/cron/v3"
23
+ "time"
25
24
26
25
"github.com/polarismesh/polaris/cache"
27
26
commonlog "github.com/polarismesh/polaris/common/log"
@@ -36,8 +35,8 @@ var log = commonlog.GetScopeOrDefaultByName(commonlog.DefaultLoggerName)
36
35
type MaintainJobs struct {
37
36
jobs map [string ]maintainJob
38
37
startedJobs map [string ]maintainJob
39
- scheduler * cron.Cron
40
38
storage store.Store
39
+ cancel context.CancelFunc
41
40
}
42
41
43
42
// NewMaintainJobs
@@ -53,13 +52,14 @@ func NewMaintainJobs(namingServer service.DiscoverServer, cacheMgn *cache.CacheM
53
52
storage : storage },
54
53
},
55
54
startedJobs : map [string ]maintainJob {},
56
- scheduler : newCron (),
57
55
storage : storage ,
58
56
}
59
57
}
60
58
61
59
// StartMaintainJobs
62
60
func (mj * MaintainJobs ) StartMaintianJobs (configs []JobConfig ) error {
61
+ ctx , cancel := context .WithCancel (context .Background ())
62
+ mj .cancel = cancel
63
63
for _ , cfg := range configs {
64
64
if ! cfg .Enable {
65
65
log .Infof ("[Maintain][Job] job (%s) not enable" , cfg .Name )
@@ -83,33 +83,27 @@ func (mj *MaintainJobs) StartMaintianJobs(configs []JobConfig) error {
83
83
log .Errorf ("[Maintain][Job][%s] start leader election err: %v" , cfg .Name , err )
84
84
return err
85
85
}
86
- _ , err = mj . scheduler . AddFunc (cfg .CronSpec , newCronCmd ( cfg . Name , job , mj . storage ) )
86
+ dur , err := time . ParseDuration (cfg .Interval )
87
87
if err != nil {
88
- log .Errorf ("[Maintain][Job] job (%s) fail to start, err: %v" , cfg .Name , err )
89
- return fmt . Errorf ( "[Maintain][Job] job (%s) fail to start" , cfg . Name )
88
+ log .Errorf ("[Maintain][Job][%s] parse job exec interval err: %v" , cfg .Name , err )
89
+ return err
90
90
}
91
+ runAdminJob (ctx , cfg .Name , dur , job , mj .storage )
91
92
mj .startedJobs [cfg .Name ] = job
92
93
}
93
- mj .scheduler .Start ()
94
94
return nil
95
95
}
96
96
97
97
// StopMaintainJobs
98
98
func (mj * MaintainJobs ) StopMaintainJobs () {
99
- ctx := mj .scheduler .Stop ()
100
- <- ctx .Done ()
99
+ if mj .cancel != nil {
100
+ mj .cancel ()
101
+ }
101
102
mj .startedJobs = map [string ]maintainJob {}
102
103
}
103
104
104
- func newCron () * cron.Cron {
105
- return cron .New (cron .WithChain (
106
- cron .Recover (cron .DefaultLogger )),
107
- cron .WithParser (cron .NewParser (
108
- cron .Minute | cron .Hour | cron .Dom | cron .Month | cron .Dow | cron .Descriptor )))
109
- }
110
-
111
- func newCronCmd (name string , job maintainJob , storage store.Store ) func () {
112
- return func () {
105
+ func runAdminJob (ctx context.Context , name string , interval time.Duration , job maintainJob , storage store.Store ) {
106
+ f := func () {
113
107
if ! storage .IsLeader (store .ElectionKeyMaintainJobPrefix + name ) {
114
108
log .Infof ("[Maintain][Job][%s] I am follower" , name )
115
109
job .clear ()
@@ -118,8 +112,19 @@ func newCronCmd(name string, job maintainJob, storage store.Store) func() {
118
112
log .Infof ("[Maintain][Job][%s] I am leader, job start" , name )
119
113
job .execute ()
120
114
log .Infof ("[Maintain][Job][%s] I am leader, job end" , name )
121
-
122
115
}
116
+
117
+ ticker := time .NewTicker (interval )
118
+ go func (ctx context.Context ) {
119
+ for {
120
+ select {
121
+ case <- ctx .Done ():
122
+ return
123
+ case <- ticker .C :
124
+ f ()
125
+ }
126
+ }
127
+ }(ctx )
123
128
}
124
129
125
130
type maintainJob interface {
0 commit comments