Skip to content

Commit 9099f65

Browse files
authored
Merge pull request #47 from factorysh/features/restart
Handle proper restarts
2 parents 091f449 + 66923b5 commit 9099f65

File tree

11 files changed

+248
-20
lines changed

11 files changed

+248
-20
lines changed

application/application.go

Lines changed: 95 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,14 @@
11
package application
22

33
import (
4+
"context"
45
"net/http"
56
"os"
7+
"os/signal"
68
"path"
79
"path/filepath"
10+
"syscall"
11+
"time"
812

913
"github.com/docker/go-events"
1014
"github.com/factorysh/microdensity/badge"
@@ -18,6 +22,7 @@ import (
1822
"github.com/factorysh/microdensity/sessions"
1923
"github.com/factorysh/microdensity/sink"
2024
"github.com/factorysh/microdensity/storage"
25+
"github.com/factorysh/microdensity/task"
2126
"github.com/factorysh/microdensity/volumes"
2227
"github.com/getsentry/sentry-go"
2328
"github.com/go-chi/chi/v5"
@@ -39,6 +44,8 @@ type Application struct {
3944
logger *zap.Logger
4045
queue *queue.Queue
4146
Sink events.Sink
47+
Server *http.Server
48+
Stopper chan (os.Signal)
4249
}
4350

4451
func New(cfg *conf.Conf) (*Application, error) {
@@ -114,6 +121,7 @@ func New(cfg *conf.Conf) (*Application, error) {
114121
logger: logger,
115122
queue: &q,
116123
Sink: &sink.VoidSink{},
124+
Stopper: make(chan os.Signal, 1),
117125
}
118126
// A good base middleware stack
119127
r.Use(middleware.RequestID)
@@ -202,3 +210,90 @@ func (a *Application) ListServices() []string {
202210

203211
return list
204212
}
213+
214+
// Run make the app listen and serve requests
215+
func (a *Application) Run(listen string) error {
216+
// listen for stop/restart signals and sends them to the stopper channel
217+
signal.Notify(a.Stopper, os.Interrupt, syscall.SIGINT, syscall.SIGTERM)
218+
219+
// setup the router
220+
a.Server = &http.Server{
221+
Addr: listen,
222+
Handler: a.Router,
223+
}
224+
225+
// interrupted tasks becomes ready task and are added to queue
226+
tasks, err := a.storage.All()
227+
if err != nil {
228+
return err
229+
}
230+
231+
for _, t := range tasks {
232+
if t.State == task.Ready || t.State == task.Interrupted {
233+
t.State = task.Ready
234+
parsedArgs, err := a.Services[t.Service].Validate(t.Args)
235+
// non blocking error
236+
if err != nil {
237+
t.State = task.Failed
238+
err := a.storage.Upsert(t)
239+
if err != nil {
240+
a.logger.Fatal("unable to save task", zap.Error(err))
241+
}
242+
243+
a.logger.Error("error when validating task args", zap.String("task", t.Id.String()))
244+
continue
245+
}
246+
247+
err = a.addTask(t, parsedArgs.Environments)
248+
// non blocking error
249+
if err != nil {
250+
a.logger.Error("error when adding task", zap.Error(err))
251+
}
252+
}
253+
}
254+
255+
// start and serve
256+
go func() {
257+
if err := a.Server.ListenAndServe(); err != nil && err != http.ErrServerClosed {
258+
a.logger.Fatal("fatal error when running server", zap.Error(err))
259+
}
260+
}()
261+
262+
return nil
263+
}
264+
265+
// Shutdown the server and put running tasks into interrupted state
266+
func (a *Application) Shutdown() error {
267+
ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
268+
defer cancel()
269+
270+
// handle the last pending requests
271+
// we don't want this error stop the shutdown flow
272+
// just log it
273+
err := a.Server.Shutdown(ctx)
274+
if err != nil {
275+
a.logger.Error("error on server shutdown", zap.Error(err))
276+
}
277+
278+
tasks, err := a.storage.All()
279+
if err != nil {
280+
return err
281+
}
282+
283+
for _, t := range tasks {
284+
// running tasks becomes interrupted tasks
285+
if t.State == task.Running {
286+
// TODO: send a cancel request to docker ?
287+
t.State = task.Interrupted
288+
err := a.storage.Upsert(t)
289+
// same here, non blocking error
290+
if err != nil {
291+
a.logger.Error("error when updating task status", zap.Error(err), zap.String("task id", t.Id.String()))
292+
}
293+
}
294+
}
295+
296+
a.logger.Info("server shutdown")
297+
298+
return nil
299+
}

application/application_test.go

Lines changed: 67 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ import (
1010
"net/http/httptest"
1111
"net/url"
1212
"os"
13+
"path"
1314
"path/filepath"
1415
"testing"
1516
"time"
@@ -19,6 +20,8 @@ import (
1920
"github.com/factorysh/microdensity/conf"
2021
"github.com/factorysh/microdensity/mockup"
2122
"github.com/factorysh/microdensity/service"
23+
"github.com/factorysh/microdensity/storage"
24+
"github.com/factorysh/microdensity/task"
2225
"github.com/stretchr/testify/assert"
2326
)
2427

@@ -187,6 +190,70 @@ func TestApplication(t *testing.T) {
187190

188191
}
189192

193+
func TestApplicationStart(t *testing.T) {
194+
// state 5 is interrupted
195+
rawJSON := `{"id":"f79b5c4c-94b4-11ec-a442-00163e007d68","service":"waiter","project":"group%2Fproject","branch":"master","commit":"7e15b158cfc3e8f6bbe3e441a0cdb64bba135ef3","creation":"2022-02-23T15:29:11.082288364+01:00","Args":{"WAIT":10},"State":5}`
196+
197+
gitlab := httptest.NewServer(mockup.GitlabJWK(&key.PublicKey))
198+
defer gitlab.Close()
199+
200+
cfg, cb, err := SpawnConfig(gitlab.URL)
201+
defer cb()
202+
assert.NoError(t, err)
203+
204+
a, err := New(cfg)
205+
assert.NoError(t, err)
206+
207+
taskPath := path.Join(cfg.DataPath, "data", "wait", "group%2Fproject", "master", "f79b5c4c-94b4-11ec-a442-00163e007d68")
208+
err = os.MkdirAll(taskPath, storage.DirMode)
209+
assert.NoError(t, err)
210+
211+
err = os.WriteFile(path.Join(taskPath, "task.json"), []byte(rawJSON), 0644)
212+
assert.NoError(t, err)
213+
214+
err = a.Run(":9090")
215+
assert.NoError(t, err)
216+
217+
tasks, err := a.storage.All()
218+
assert.NoError(t, err)
219+
assert.Equal(t, task.Running, tasks[0].State)
220+
221+
err = a.Shutdown()
222+
assert.NoError(t, err)
223+
}
224+
225+
func TestApplicationStop(t *testing.T) {
226+
// state 5 is interrupted
227+
rawJSON := `{"id":"f79b5c4c-94b4-11ec-a442-00163e007d68","service":"waiter","project":"group%2Fproject","branch":"master","commit":"7e15b158cfc3e8f6bbe3e441a0cdb64bba135ef3","creation":"2022-02-23T15:29:11.082288364+01:00","Args":{"WAIT":10},"State":1}`
228+
229+
gitlab := httptest.NewServer(mockup.GitlabJWK(&key.PublicKey))
230+
defer gitlab.Close()
231+
232+
cfg, cb, err := SpawnConfig(gitlab.URL)
233+
defer cb()
234+
assert.NoError(t, err)
235+
236+
a, err := New(cfg)
237+
assert.NoError(t, err)
238+
239+
taskPath := path.Join(cfg.DataPath, "data", "wait", "group%2Fproject", "master", "f79b5c4c-94b4-11ec-a442-00163e007d68")
240+
err = os.MkdirAll(taskPath, storage.DirMode)
241+
assert.NoError(t, err)
242+
243+
err = os.WriteFile(path.Join(taskPath, "task.json"), []byte(rawJSON), 0644)
244+
assert.NoError(t, err)
245+
246+
err = a.Run(":9090")
247+
assert.NoError(t, err)
248+
249+
err = a.Shutdown()
250+
assert.NoError(t, err)
251+
252+
tasks, err := a.storage.All()
253+
assert.NoError(t, err)
254+
assert.Equal(t, task.Interrupted, tasks[0].State)
255+
}
256+
190257
func SpawnConfig(gitlabURL string) (*conf.Conf, func(), error) {
191258
dataDir, err := ioutil.TempDir(os.TempDir(), "data")
192259
if err != nil {

application/badge.go

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ import (
1010

1111
"github.com/factorysh/microdensity/badge"
1212
_badge "github.com/factorysh/microdensity/badge"
13+
"github.com/factorysh/microdensity/task"
1314
"github.com/go-chi/chi/v5"
1415
"go.uber.org/zap"
1516
)
@@ -35,6 +36,7 @@ func (a *Application) BadgeMyTaskHandler(latest bool) http.HandlerFunc {
3536
t, err := a.storage.GetByCommit(service, project, branch, commit, latest)
3637
if err != nil {
3738
l.Warn("Task get error", zap.Error(err))
39+
badge.WriteBadge(service, "not found", _badge.Colors.Default, w)
3840
w.WriteHeader(http.StatusNotFound)
3941
return
4042
}
@@ -43,6 +45,12 @@ func (a *Application) BadgeMyTaskHandler(latest bool) http.HandlerFunc {
4345
p := filepath.Join(a.storage.GetVolumePath(t), "/data", fmt.Sprintf("%s.badge", bdg))
4446
_, err = os.Stat(p)
4547

48+
// if running return early
49+
if t.State == task.Running {
50+
badge.WriteBadge(service, t.State.String(), _badge.Colors.Get(t.State), w)
51+
return
52+
}
53+
4654
// if not found
4755
if err != nil {
4856
// fallback to status badge
@@ -51,7 +59,7 @@ func (a *Application) BadgeMyTaskHandler(latest bool) http.HandlerFunc {
5159
badge.WriteBadge(service, t.State.String(), _badge.Colors.Get(t.State), w)
5260
return
5361
}
54-
w.WriteHeader(http.StatusBadRequest)
62+
w.WriteHeader(http.StatusInternalServerError)
5563
return
5664
}
5765
l = l.With(zap.String("path", p))

application/task.go

Lines changed: 25 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -85,29 +85,40 @@ func (a *Application) PostTaskHandler(w http.ResponseWriter, r *http.Request) {
8585
Args: args,
8686
State: task.Ready,
8787
}
88-
err = a.storage.Upsert(t)
88+
89+
err = a.addTask(t, parsedArgs.Environments)
8990
if err != nil {
90-
l.Warn("Queue error", zap.Error(err))
91-
panic(err)
91+
l.Error("error when adding task", zap.String("task", t.Id.String()), zap.Error(err))
92+
return
9293
}
93-
err = a.storage.EnsureVolumesDir(t)
94+
render.JSON(w, r, map[string]string{
95+
"id": id.String(),
96+
})
97+
}
98+
99+
// addTask adds a task to a queue
100+
func (a *Application) addTask(t *task.Task, args map[string]string) error {
101+
err := a.storage.EnsureVolumesDir(t)
94102
if err != nil {
95-
l.Warn("Volume creation", zap.Error(err))
96-
panic(err)
103+
return err
104+
}
105+
106+
err = a.storage.Upsert(t)
107+
if err != nil {
108+
return err
97109
}
98-
err = a.queue.Put(t, parsedArgs.Environments)
110+
111+
err = a.queue.Put(t, args)
99112
if err != nil {
100-
l.Warn("Task prepare/put", zap.Error(err))
101-
panic(err)
113+
return err
102114
}
115+
103116
err = a.storage.SetLatest(t)
104117
if err != nil {
105-
l.Warn("Task set latest", zap.Error(err))
106-
panic(err)
118+
return err
107119
}
108-
render.JSON(w, r, map[string]string{
109-
"id": id.String(),
110-
})
120+
121+
return err
111122
}
112123

113124
// TaskHandler show a Task

demo/services/waiter/README.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
# Demo service
Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,15 @@
1+
---
2+
3+
services:
4+
background:
5+
image: busybox
6+
command: sleep 50000
7+
hello:
8+
depends_on:
9+
- background
10+
image: busybox
11+
command: >-
12+
&& sleep ${WAIT}
13+
&& echo '{\"color\": \"lime\", \"subject\":\"wait\", \"status\":\"${WAIT}\"}' > /data/demo.badge"
14+
volumes:
15+
- "./data:/data"

demo/services/waiter/meta.js

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
function validate(params) {
2+
if (!("WAIT" in params)) {
3+
throw "WAIT argument is mandatory";
4+
}
5+
if (!Number.isInteger(params.WAIT)) {
6+
throw `WAIT is only numbers : [${params.WAIT}]`;
7+
}
8+
return {
9+
environments: {
10+
WAIT: params.WAIT,
11+
},
12+
};
13+
}

go.mod

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ require (
99
github.com/docker/cli v20.10.12+incompatible
1010
github.com/docker/compose/v2 v2.2.3
1111
github.com/docker/docker v20.10.12+incompatible
12+
github.com/docker/go-events v0.0.0-20190806004212-e31b211e4f1c
1213
github.com/dop251/goja v0.0.0-20220214123719-b09a6bfa842f
1314
github.com/getsentry/sentry-go v0.12.0
1415
github.com/go-chi/chi/v5 v5.0.7
@@ -40,6 +41,7 @@ require (
4041
github.com/containerd/console v1.0.3 // indirect
4142
github.com/containerd/containerd v1.5.9 // indirect
4243
github.com/containerd/continuity v0.1.0 // indirect
44+
github.com/containerd/fifo v1.0.0 // indirect
4345
github.com/containerd/typeurl v1.0.2 // indirect
4446
github.com/distribution/distribution/v3 v3.0.0-20210316161203-a01c71e2477e // indirect
4547
github.com/dlclark/regexp2 v1.4.1-0.20201116162257-a2a8dda75c91 // indirect
@@ -48,7 +50,6 @@ require (
4850
github.com/docker/docker-credential-helpers v0.6.4 // indirect
4951
github.com/docker/go v1.5.1-1.0.20160303222718-d30aec9fd63c // indirect
5052
github.com/docker/go-connections v0.4.0 // indirect
51-
github.com/docker/go-events v0.0.0-20190806004212-e31b211e4f1c // indirect
5253
github.com/docker/go-metrics v0.0.1 // indirect
5354
github.com/docker/go-units v0.4.0 // indirect
5455
github.com/fvbommel/sortorder v1.0.1 // indirect

main.go

Lines changed: 15 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,6 @@ import (
44
"encoding/json"
55
"fmt"
66
"log"
7-
"net/http"
87
"os"
98

109
"github.com/factorysh/microdensity/application"
@@ -52,6 +51,19 @@ func main() {
5251
os.Exit(1)
5352
}
5453

55-
l.Info("starting")
56-
http.ListenAndServe(cfg.Listen, a.Router)
54+
logger.Info("starting")
55+
err = a.Run(cfg.Listen)
56+
if err != nil {
57+
l.Error("Run", zap.Error(err))
58+
os.Exit(1)
59+
}
60+
61+
<-a.Stopper
62+
63+
logger.Info("shutdown signal received")
64+
65+
err = a.Shutdown()
66+
if err != nil {
67+
l.Error("error on shutdown", zap.Error(err))
68+
}
5769
}

0 commit comments

Comments
 (0)