package loggerutils // import "github.com/docker/docker/daemon/logger/loggerutils" import ( "bytes" "context" "fmt" "io" "os" "strconv" "sync" "time" "github.com/docker/docker/daemon/logger" "github.com/docker/docker/daemon/logger/loggerutils/multireader" "github.com/docker/docker/pkg/filenotify" "github.com/docker/docker/pkg/pubsub" "github.com/docker/docker/pkg/tailfile" "github.com/fsnotify/fsnotify" "github.com/pkg/errors" "github.com/sirupsen/logrus" ) // LogFile is Logger implementation for default Docker logging. type LogFile struct { f *os.File // store for closing closed bool mu sync.RWMutex capacity int64 //maximum size of each file currentSize int64 // current size of the latest file maxFiles int //maximum number of files notifyRotate *pubsub.Publisher marshal logger.MarshalFunc createDecoder makeDecoderFunc } type makeDecoderFunc func(rdr io.Reader) func() (*logger.Message, error) //NewLogFile creates new LogFile func NewLogFile(logPath string, capacity int64, maxFiles int, marshaller logger.MarshalFunc, decodeFunc makeDecoderFunc) (*LogFile, error) { log, err := os.OpenFile(logPath, os.O_WRONLY|os.O_APPEND|os.O_CREATE, 0640) if err != nil { return nil, err } size, err := log.Seek(0, os.SEEK_END) if err != nil { return nil, err } return &LogFile{ f: log, capacity: capacity, currentSize: size, maxFiles: maxFiles, notifyRotate: pubsub.NewPublisher(0, 1), marshal: marshaller, createDecoder: decodeFunc, }, nil } // WriteLogEntry writes the provided log message to the current log file. // This may trigger a rotation event if the max file/capacity limits are hit. func (w *LogFile) WriteLogEntry(msg *logger.Message) error { b, err := w.marshal(msg) if err != nil { return errors.Wrap(err, "error marshalling log message") } logger.PutMessage(msg) w.mu.Lock() if w.closed { w.mu.Unlock() return errors.New("cannot write because the output file was closed") } if err := w.checkCapacityAndRotate(); err != nil { w.mu.Unlock() return err } n, err := w.f.Write(b) if err == nil { w.currentSize += int64(n) } w.mu.Unlock() return err } func (w *LogFile) checkCapacityAndRotate() error { if w.capacity == -1 { return nil } if w.currentSize >= w.capacity { name := w.f.Name() if err := w.f.Close(); err != nil { return errors.Wrap(err, "error closing file") } if err := rotate(name, w.maxFiles); err != nil { return err } file, err := os.OpenFile(name, os.O_WRONLY|os.O_TRUNC|os.O_CREATE, 0640) if err != nil { return err } w.f = file w.currentSize = 0 w.notifyRotate.Publish(struct{}{}) } return nil } func rotate(name string, maxFiles int) error { if maxFiles < 2 { return nil } for i := maxFiles - 1; i > 1; i-- { toPath := name + "." + strconv.Itoa(i) fromPath := name + "." + strconv.Itoa(i-1) if err := os.Rename(fromPath, toPath); err != nil && !os.IsNotExist(err) { return errors.Wrap(err, "error rotating old log entries") } } if err := os.Rename(name, name+".1"); err != nil && !os.IsNotExist(err) { return errors.Wrap(err, "error rotating current log") } return nil } // MaxFiles return maximum number of files func (w *LogFile) MaxFiles() int { return w.maxFiles } // Close closes underlying file and signals all readers to stop. func (w *LogFile) Close() error { w.mu.Lock() defer w.mu.Unlock() if w.closed { return nil } if err := w.f.Close(); err != nil { return err } w.closed = true return nil } // ReadLogs decodes entries from log files and sends them the passed in watcher func (w *LogFile) ReadLogs(config logger.ReadConfig, watcher *logger.LogWatcher) { w.mu.RLock() files, err := w.openRotatedFiles() if err != nil { w.mu.RUnlock() watcher.Err <- err return } defer func() { for _, f := range files { f.Close() } }() currentFile, err := os.Open(w.f.Name()) if err != nil { w.mu.RUnlock() watcher.Err <- err return } defer currentFile.Close() currentChunk, err := newSectionReader(currentFile) w.mu.RUnlock() if err != nil { watcher.Err <- err return } if config.Tail != 0 { seekers := make([]io.ReadSeeker, 0, len(files)+1) for _, f := range files { seekers = append(seekers, f) } if currentChunk.Size() > 0 { seekers = append(seekers, currentChunk) } if len(seekers) > 0 { tailFile(multireader.MultiReadSeeker(seekers...), watcher, w.createDecoder, config) } } w.mu.RLock() if !config.Follow || w.closed { w.mu.RUnlock() return } w.mu.RUnlock() notifyRotate := w.notifyRotate.Subscribe() defer w.notifyRotate.Evict(notifyRotate) followLogs(currentFile, watcher, notifyRotate, w.createDecoder, config.Since, config.Until) } func (w *LogFile) openRotatedFiles() (files []*os.File, err error) { defer func() { if err == nil { return } for _, f := range files { f.Close() } }() for i := w.maxFiles; i > 1; i-- { f, err := os.Open(fmt.Sprintf("%s.%d", w.f.Name(), i-1)) if err != nil { if !os.IsNotExist(err) { return nil, err } continue } files = append(files, f) } return files, nil } func newSectionReader(f *os.File) (*io.SectionReader, error) { // seek to the end to get the size // we'll leave this at the end of the file since section reader does not advance the reader size, err := f.Seek(0, os.SEEK_END) if err != nil { return nil, errors.Wrap(err, "error getting current file size") } return io.NewSectionReader(f, 0, size), nil } type decodeFunc func() (*logger.Message, error) func tailFile(f io.ReadSeeker, watcher *logger.LogWatcher, createDecoder makeDecoderFunc, config logger.ReadConfig) { var rdr io.Reader = f if config.Tail > 0 { ls, err := tailfile.TailFile(f, config.Tail) if err != nil { watcher.Err <- err return } rdr = bytes.NewBuffer(bytes.Join(ls, []byte("\n"))) } decodeLogLine := createDecoder(rdr) for { msg, err := decodeLogLine() if err != nil { if err != io.EOF { watcher.Err <- err } return } if !config.Since.IsZero() && msg.Timestamp.Before(config.Since) { continue } if !config.Until.IsZero() && msg.Timestamp.After(config.Until) { return } select { case <-watcher.WatchClose(): return case watcher.Msg <- msg: } } } func followLogs(f *os.File, logWatcher *logger.LogWatcher, notifyRotate chan interface{}, createDecoder makeDecoderFunc, since, until time.Time) { decodeLogLine := createDecoder(f) name := f.Name() fileWatcher, err := watchFile(name) if err != nil { logWatcher.Err <- err return } defer func() { f.Close() fileWatcher.Remove(name) fileWatcher.Close() }() ctx, cancel := context.WithCancel(context.Background()) defer cancel() go func() { select { case <-logWatcher.WatchClose(): fileWatcher.Remove(name) cancel() case <-ctx.Done(): return } }() var retries int handleRotate := func() error { f.Close() fileWatcher.Remove(name) // retry when the file doesn't exist for retries := 0; retries <= 5; retries++ { f, err = os.Open(name) if err == nil || !os.IsNotExist(err) { break } } if err != nil { return err } if err := fileWatcher.Add(name); err != nil { return err } decodeLogLine = createDecoder(f) return nil } errRetry := errors.New("retry") errDone := errors.New("done") waitRead := func() error { select { case e := <-fileWatcher.Events(): switch e.Op { case fsnotify.Write: decodeLogLine = createDecoder(f) return nil case fsnotify.Rename, fsnotify.Remove: select { case <-notifyRotate: case <-ctx.Done(): return errDone } if err := handleRotate(); err != nil { return err } return nil } return errRetry case err := <-fileWatcher.Errors(): logrus.Debug("logger got error watching file: %v", err) // Something happened, let's try and stay alive and create a new watcher if retries <= 5 { fileWatcher.Close() fileWatcher, err = watchFile(name) if err != nil { return err } retries++ return errRetry } return err case <-ctx.Done(): return errDone } } handleDecodeErr := func(err error) error { if err != io.EOF { return err } for { err := waitRead() if err == nil { break } if err == errRetry { continue } return err } return nil } // main loop for { msg, err := decodeLogLine() if err != nil { if err := handleDecodeErr(err); err != nil { if err == errDone { return } // we got an unrecoverable error, so return logWatcher.Err <- err return } // ready to try again continue } retries = 0 // reset retries since we've succeeded if !since.IsZero() && msg.Timestamp.Before(since) { continue } if !until.IsZero() && msg.Timestamp.After(until) { return } select { case logWatcher.Msg <- msg: case <-ctx.Done(): logWatcher.Msg <- msg for { msg, err := decodeLogLine() if err != nil { return } if !since.IsZero() && msg.Timestamp.Before(since) { continue } if !until.IsZero() && msg.Timestamp.After(until) { return } logWatcher.Msg <- msg } } } } func watchFile(name string) (filenotify.FileWatcher, error) { fileWatcher, err := filenotify.New() if err != nil { return nil, err } logger := logrus.WithFields(logrus.Fields{ "module": "logger", "fille": name, }) if err := fileWatcher.Add(name); err != nil { logger.WithError(err).Warnf("falling back to file poller") fileWatcher.Close() fileWatcher = filenotify.NewPollingWatcher() if err := fileWatcher.Add(name); err != nil { fileWatcher.Close() logger.WithError(err).Debugf("error watching log file for modifications") return nil, err } } return fileWatcher, nil }