Skip to content

Commit 62f4ebc

Browse files
committed
Send logs to the journal "from" the container
Previously, dockerd just relayed messages by itself from containers to the journal, which caused journald to apply rate limiting to messages across all containers as a single group. Here, we add another process to each container's cgroup, and have dockerd forward messages to that process over a pipe. That process, named "journal-logger", receives the messages and sends them on to the journal. As part of the container's cgroup, it's killed when the main process exits, so we only need to close the pipe and read its exit status when closing the logger. Signed-off-by: Nalin Dahyabhai <[email protected]>
1 parent b96df77 commit 62f4ebc

File tree

3 files changed

+159
-22
lines changed

3 files changed

+159
-22
lines changed

daemon/logger/journald/journald.go

Lines changed: 157 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -5,35 +5,61 @@
55
package journald
66

77
import (
8+
"encoding/gob"
9+
"flag"
810
"fmt"
11+
"io"
12+
"os"
13+
"os/exec"
14+
"path/filepath"
15+
"strings"
916
"sync"
17+
"syscall"
1018
"unicode"
1119

1220
"github.com/Sirupsen/logrus"
1321
"github.com/coreos/go-systemd/journal"
1422
"github.com/docker/docker/daemon/logger"
1523
"github.com/docker/docker/daemon/logger/loggerutils"
24+
"github.com/docker/docker/pkg/reexec"
25+
"golang.org/x/sys/unix"
1626
)
1727

1828
const name = "journald"
29+
const handler = "journal-logger"
1930

2031
type journald struct {
32+
// for reading
2133
vars map[string]string // additional variables and values to send to the journal along with the log message
2234
readers readerList
35+
// for writing
36+
writing sync.Mutex
37+
cmd *exec.Cmd
38+
pipe io.WriteCloser
39+
encoder *gob.Encoder
2340
}
2441

2542
type readerList struct {
2643
mu sync.Mutex
2744
readers map[*logger.LogWatcher]*logger.LogWatcher
2845
}
2946

47+
// MessageWithVars describes the packet format that we use when forwarding log
48+
// messages from the daemon to a helper process.
49+
type MessageWithVars struct {
50+
logger.Message
51+
Vars map[string]string
52+
}
53+
3054
func init() {
3155
if err := logger.RegisterLogDriver(name, New); err != nil {
3256
logrus.Fatal(err)
3357
}
3458
if err := logger.RegisterLogOptValidator(name, validateLogOpt); err != nil {
3559
logrus.Fatal(err)
3660
}
61+
gob.Register(MessageWithVars{})
62+
reexec.Register(handler, journalLoggerMain)
3763
}
3864

3965
// sanitizeKeyMode returns the sanitized string so that it could be used in journald.
@@ -62,30 +88,48 @@ func New(ctx logger.Context) (logger.Logger, error) {
6288
if !journal.Enabled() {
6389
return nil, fmt.Errorf("journald is not enabled on this host")
6490
}
65-
// Strip a leading slash so that people can search for
66-
// CONTAINER_NAME=foo rather than CONTAINER_NAME=/foo.
67-
name := ctx.ContainerName
68-
if name[0] == '/' {
69-
name = name[1:]
70-
}
7191

72-
// parse log tag
92+
// parse the log tag
7393
tag, err := loggerutils.ParseLogTag(ctx, loggerutils.DefaultTemplate)
7494
if err != nil {
7595
return nil, err
7696
}
77-
97+
// build the set of values which we'll send to the journal every time
7898
vars := map[string]string{
79-
"CONTAINER_ID": ctx.ContainerID[:12],
80-
"CONTAINER_ID_FULL": ctx.ContainerID,
81-
"CONTAINER_NAME": name,
99+
"CONTAINER_ID": ctx.ID(),
100+
"CONTAINER_ID_FULL": ctx.FullID(),
101+
"CONTAINER_NAME": ctx.Name(),
82102
"CONTAINER_TAG": tag,
83103
}
84104
extraAttrs := ctx.ExtraAttributes(sanitizeKeyMod)
85105
for k, v := range extraAttrs {
86106
vars[k] = v
87107
}
88-
return &journald{vars: vars, readers: readerList{readers: make(map[*logger.LogWatcher]*logger.LogWatcher)}}, nil
108+
// start the helper
109+
cgroupSpec, err := ctx.CGroup()
110+
if err != nil {
111+
return nil, err
112+
}
113+
cmd := reexec.Command(handler, cgroupSpec)
114+
cmd.Dir = "/"
115+
pipe, err := cmd.StdinPipe()
116+
if err != nil {
117+
return nil, fmt.Errorf("error opening pipe to logging helper: %v", err)
118+
}
119+
err = cmd.Start()
120+
if err != nil {
121+
return nil, fmt.Errorf("error starting logging helper: %v", err)
122+
}
123+
encoder := gob.NewEncoder(pipe)
124+
// gather up everything we need to hand back
125+
j := &journald{
126+
vars: vars,
127+
readers: readerList{readers: make(map[*logger.LogWatcher]*logger.LogWatcher)},
128+
cmd: cmd,
129+
pipe: pipe,
130+
encoder: encoder,
131+
}
132+
return j, nil
89133
}
90134

91135
// We don't actually accept any options, but we have to supply a callback for
@@ -104,19 +148,110 @@ func validateLogOpt(cfg map[string]string) error {
104148
}
105149

106150
func (s *journald) Log(msg *logger.Message) error {
107-
vars := map[string]string{}
108-
for k, v := range s.vars {
109-
vars[k] = v
110-
}
111-
if msg.Partial {
112-
vars["CONTAINER_PARTIAL_MESSAGE"] = "true"
113-
}
114-
if msg.Source == "stderr" {
115-
return journal.Send(string(msg.Line), journal.PriErr, vars)
151+
// build the message struct for the helper, and send it on down
152+
message := MessageWithVars{
153+
Message: *msg,
154+
Vars: s.vars,
116155
}
117-
return journal.Send(string(msg.Line), journal.PriInfo, vars)
156+
s.writing.Lock()
157+
defer s.writing.Unlock()
158+
return s.encoder.Encode(&message)
118159
}
119160

120161
func (s *journald) Name() string {
121162
return name
122163
}
164+
165+
func (s *journald) closeWriter() {
166+
s.pipe.Close()
167+
if err := s.cmd.Wait(); err != nil {
168+
eerr, ok := err.(*exec.ExitError)
169+
if !ok {
170+
logrus.Errorf("error waiting on log handler: %v", err)
171+
return
172+
}
173+
status, ok := eerr.Sys().(syscall.WaitStatus)
174+
if !ok {
175+
logrus.Errorf("error waiting on log handler: %v", err)
176+
return
177+
}
178+
if !status.Signaled() || (status.Signal() != syscall.SIGTERM && status.Signal() != syscall.SIGKILL) {
179+
logrus.Errorf("error waiting on log handler: %v", err)
180+
return
181+
}
182+
}
183+
}
184+
185+
func loggerLog(f string, args ...interface{}) {
186+
s := fmt.Sprintf(f, args...)
187+
journal.Send(s, journal.PriInfo, nil)
188+
fmt.Fprintln(os.Stderr, s)
189+
}
190+
191+
func joinScope(scope string) error {
192+
// This is... not ideal. But if we're here, we're just going to have
193+
// to assume that we know how to compute the same path that runc is
194+
// going to use, based on a value of the form "parent:docker:ID", where
195+
// the "docker" is literal.
196+
parts := strings.Split(scope, ":")
197+
fs, err := os.Open("/sys/fs/cgroup")
198+
if err != nil {
199+
return err
200+
}
201+
defer fs.Close()
202+
mountPoint := fs.Name()
203+
controllers, err := fs.Readdirnames(-1)
204+
if err != nil {
205+
return err
206+
}
207+
for _, controller := range controllers {
208+
scopeDir := filepath.Join(mountPoint, controller, parts[0], parts[1]+"-"+parts[2]+".scope")
209+
procsFile := filepath.Join(scopeDir, "cgroup.procs")
210+
f, err := os.OpenFile(procsFile, os.O_WRONLY, 0644)
211+
if err != nil && !os.IsNotExist(err) {
212+
return err
213+
}
214+
defer f.Close()
215+
fmt.Fprintln(f, unix.Getpid())
216+
}
217+
return nil
218+
}
219+
220+
func journalLoggerMain() {
221+
flag.Parse()
222+
args := flag.Args()
223+
if len(args) < 0 {
224+
loggerLog("should be invoked with the name of the container's scope")
225+
return
226+
}
227+
joined := false
228+
decoder := gob.NewDecoder(os.Stdin)
229+
for {
230+
var msg MessageWithVars
231+
// wait for the next chunk of data to log
232+
if err := decoder.Decode(&msg); err != nil {
233+
if err == io.EOF {
234+
break
235+
}
236+
loggerLog("error decoding message: %v", err)
237+
continue
238+
}
239+
// if we haven't joined the container's scope yet, do that now
240+
if !joined {
241+
if err := joinScope(args[0]); err != nil {
242+
loggerLog("error joining scope %q: %v", args[0], err)
243+
}
244+
joined = true
245+
}
246+
msg.Vars["CONTAINER_SOURCE"] = msg.Source
247+
// add a note if this message is a partial message
248+
if msg.Partial {
249+
msg.Vars["CONTAINER_PARTIAL_MESSAGE"] = "true"
250+
}
251+
if msg.Source == "stderr" {
252+
journal.Send(string(msg.Line), journal.PriErr, msg.Vars)
253+
continue
254+
}
255+
journal.Send(string(msg.Line), journal.PriInfo, msg.Vars)
256+
}
257+
}

daemon/logger/journald/read.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -162,6 +162,7 @@ import (
162162

163163
func (s *journald) Close() error {
164164
s.readers.mu.Lock()
165+
s.closeWriter()
165166
for reader := range s.readers.readers {
166167
reader.Close()
167168
}

daemon/logger/journald/read_unsupported.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,5 +3,6 @@
33
package journald
44

55
func (s *journald) Close() error {
6+
s.closeWriter()
67
return nil
78
}

0 commit comments

Comments
 (0)