Skip to content

Commit 091f449

Browse files
authored
Merge pull request #25 from factorysh/events
Events
2 parents 7dc01b6 + 948e7dc commit 091f449

File tree

7 files changed

+83
-17
lines changed

7 files changed

+83
-17
lines changed

application/application.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ import (
66
"path"
77
"path/filepath"
88

9+
"github.com/docker/go-events"
910
"github.com/factorysh/microdensity/badge"
1011
"github.com/factorysh/microdensity/conf"
1112
"github.com/factorysh/microdensity/middlewares/jwt"
@@ -15,6 +16,7 @@ import (
1516
"github.com/factorysh/microdensity/run"
1617
"github.com/factorysh/microdensity/service"
1718
"github.com/factorysh/microdensity/sessions"
19+
"github.com/factorysh/microdensity/sink"
1820
"github.com/factorysh/microdensity/storage"
1921
"github.com/factorysh/microdensity/volumes"
2022
"github.com/getsentry/sentry-go"
@@ -36,6 +38,7 @@ type Application struct {
3638
volumes *volumes.Volumes
3739
logger *zap.Logger
3840
queue *queue.Queue
41+
Sink events.Sink
3942
}
4043

4144
func New(cfg *conf.Conf) (*Application, error) {
@@ -110,6 +113,7 @@ func New(cfg *conf.Conf) (*Application, error) {
110113
volumes: v,
111114
logger: logger,
112115
queue: &q,
116+
Sink: &sink.VoidSink{},
113117
}
114118
// A good base middleware stack
115119
r.Use(middleware.RequestID)

event/event.go

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
package event
2+
3+
import (
4+
"github.com/factorysh/microdensity/task"
5+
"github.com/google/uuid"
6+
)
7+
8+
type Event struct {
9+
Id uuid.UUID `json:"id"`
10+
State task.State `json:"state"`
11+
Error error `json:"error,omit_empty"`
12+
}

go.mod

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,7 @@ require (
4848
github.com/docker/docker-credential-helpers v0.6.4 // indirect
4949
github.com/docker/go v1.5.1-1.0.20160303222718-d30aec9fd63c // indirect
5050
github.com/docker/go-connections v0.4.0 // indirect
51+
github.com/docker/go-events v0.0.0-20190806004212-e31b211e4f1c // indirect
5152
github.com/docker/go-metrics v0.0.1 // indirect
5253
github.com/docker/go-units v0.4.0 // indirect
5354
github.com/fvbommel/sortorder v1.0.1 // indirect

queue/queue.go

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,10 @@ import (
44
"fmt"
55
"sync"
66

7+
"github.com/docker/go-events"
8+
"github.com/factorysh/microdensity/event"
79
"github.com/factorysh/microdensity/run"
10+
"github.com/factorysh/microdensity/sink"
811
"github.com/factorysh/microdensity/storage"
912
"github.com/factorysh/microdensity/task"
1013
"github.com/oleiade/lane"
@@ -34,6 +37,7 @@ type Queue struct {
3437
BatchEnded chan bool
3538
logger *zap.Logger
3639
working bool
40+
Sink events.Sink
3741
}
3842

3943
// NewQueue inits a new queue struct
@@ -50,6 +54,7 @@ func NewQueue(s storage.Storage, runner *run.Runner) Queue {
5054
runner: runner,
5155
storage: s,
5256
logger: logger,
57+
Sink: &sink.VoidSink{},
5358
}
5459
}
5560

@@ -76,6 +81,10 @@ func (q *Queue) Put(item *task.Task, env map[string]string) error {
7681
queueAdded.Inc()
7782
queueSize.Inc()
7883
q.logger.Info("queue add", zap.Any("task", item))
84+
q.Sink.Write(event.Event{
85+
Id: item.Id,
86+
State: item.State,
87+
})
7988

8089
if !q.working {
8190
q.logger.Info("Start queue")
@@ -126,6 +135,11 @@ func (q *Queue) DequeueWhile() {
126135
} else {
127136
t.State = task.Failed
128137
}
138+
q.Sink.Write(event.Event{
139+
Id: t.Id,
140+
State: t.State,
141+
Error: err,
142+
})
129143

130144
err = q.storage.Upsert(t)
131145
// FIXME: handle err

queue/queue_test.go

Lines changed: 34 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -1,17 +1,36 @@
11
package queue
22

33
import (
4+
"fmt"
45
"io/ioutil"
56
"os"
7+
"sync"
68
"testing"
79

10+
"github.com/docker/go-events"
811
"github.com/factorysh/microdensity/run"
912
"github.com/factorysh/microdensity/storage"
1013
"github.com/factorysh/microdensity/task"
1114
"github.com/google/uuid"
1215
"github.com/stretchr/testify/assert"
1316
)
1417

18+
var _ events.Sink = (*DummyEventLogger)(nil)
19+
20+
type DummyEventLogger struct {
21+
Cpt *sync.WaitGroup
22+
}
23+
24+
func (d *DummyEventLogger) Write(evt events.Event) error {
25+
d.Cpt.Done()
26+
fmt.Println("evt", evt)
27+
return nil
28+
}
29+
30+
func (d *DummyEventLogger) Close() error {
31+
return nil
32+
}
33+
1534
func TestDeq(t *testing.T) {
1635
dir, err := ioutil.TempDir(os.TempDir(), "data-")
1736
assert.NoError(t, err)
@@ -22,44 +41,42 @@ func TestDeq(t *testing.T) {
2241
r, err := run.NewRunner("../demo/services", "/tmp/microdensity/volumes", []string{})
2342
assert.NoError(t, err)
2443
que := NewQueue(store, r)
44+
snk := &DummyEventLogger{
45+
Cpt: &sync.WaitGroup{},
46+
}
47+
que.Sink = snk
48+
49+
dummySink := &DummyEventLogger{}
50+
dummySink.Cpt = &sync.WaitGroup{}
2551

2652
tsk1 := &task.Task{
2753
Id: uuid.New(),
2854
Service: "demo",
2955
Project: "beuha",
3056
}
31-
err = r.Prepare(tsk1, nil)
32-
assert.NoError(t, err)
3357

3458
tsk2 := &task.Task{
3559
Id: uuid.New(),
3660
Service: "demo",
3761
Project: "alice",
3862
}
39-
err = r.Prepare(tsk2, nil)
40-
assert.NoError(t, err)
4163

4264
tsk3 := &task.Task{
4365
Id: uuid.New(),
4466
Project: "another",
4567
Service: "demo",
4668
}
47-
err = r.Prepare(tsk3, nil)
48-
assert.NoError(t, err)
49-
50-
tsk4 := &task.Task{
51-
Id: uuid.New(),
52-
Project: "notprepared",
53-
Service: "demo",
54-
}
55-
assert.NoError(t, err)
5669

70+
snk.Cpt.Add(4)
5771
// FIXME: asserts on state status
58-
que.Put(tsk1, nil)
59-
que.Put(tsk2, nil)
60-
que.Put(tsk3, nil)
61-
que.Put(tsk4, nil)
72+
err = que.Put(tsk1, nil)
73+
assert.NoError(t, err)
74+
err = que.Put(tsk2, nil)
75+
assert.NoError(t, err)
76+
err = que.Put(tsk3, nil)
77+
assert.NoError(t, err)
6278

6379
<-que.BatchEnded
80+
//snk.Cpt.Wait()
6481

6582
}

run/run.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,8 @@ func NewRunner(servicesDir string, volumesRoot string, hosts []string) (*Runner,
4646
}
4747

4848
// Prepare the run
49+
// Prepare is synchronous, in order to raise an error in the REST endpoint.
50+
// Prepare checks volumes stuff.
4951
func (r *Runner) Prepare(t *task.Task, env map[string]string) error {
5052
if t.Id == uuid.Nil {
5153
return fmt.Errorf("task requires an ID to be prepared")

sink/sink.go

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,16 @@
1+
package sink
2+
3+
import "github.com/docker/go-events"
4+
5+
var _ events.Sink = (*VoidSink)(nil)
6+
7+
type VoidSink struct {
8+
}
9+
10+
func (v *VoidSink) Write(events.Event) error {
11+
return nil
12+
}
13+
14+
func (v *VoidSink) Close() error {
15+
return nil
16+
}

0 commit comments

Comments
 (0)