file watcher to be used for configuration auto-reload feature (#12301)

* add config watcher to the config package

* add logging to watcher

* add test and refactor to add WatcherEvent.

* add all API calls and fix a bug with recreated files

* add tests for watcher

* remove the unnecessary use of context

* Add debug log and a test for file rename

* use inode to detect if the file is recreated/replaced and only listen to create events.

* tidy ups (#1535)

* tidy ups

* Add tests for inode reconcile

* fix linux vs windows syscall

* fix linux vs windows syscall

* fix windows compile error

* increase timeout

* use ctime ID

* remove remove/creation test as it's a use case that fail in linux

* fix linux/windows to use Ino/CreationTime

* fix the watcher to only overwrite current file id

* fix linter error

* fix remove/create test

* set reconcile loop to 200 Milliseconds

* fix watcher to not trigger event on remove, add more tests

* on a remove event try to add the file back to the watcher and trigger the handler if success

* fix race condition

* fix flaky test

* fix race conditions

* set level to info

* fix when file is removed and get an event for it after

* fix to trigger handler when we get a remove but re-add fail

* fix error message

* add tests for directory watch and fixes

* detect if a file is a symlink and return an error on Add

* rename Watcher to FileWatcher and remove symlink deref

* add fsnotify@v1.5.1

* fix go mod

* fix flaky test

* Apply suggestions from code review

Co-authored-by: Ashwin Venkatesh <ashwin@hashicorp.com>

* fix a possible stack overflow

* do not reset timer on errors, rename OS specific files

* start the watcher when creating it

* fix data race in tests

* rename New func

* do not call handler when a remove event happen

* events trigger on write and rename

* fix watcher tests

* make handler async

* remove recursive call

* do not produce events for sub directories

* trim "/" at the end of a directory when adding

* add missing test

* fix logging

* add todo

* fix failing test

* fix flaking tests

* fix flaky test

* add logs

* fix log text

* increase timeout

* reconcile when remove

* check reconcile when removed

* fix reconcile move test

* fix logging

* delete invalid file

* Apply suggestions from code review

Co-authored-by: R.B. Boyer <4903+rboyer@users.noreply.github.com>

* fix review comments

* fix is watched to properly catch a remove

* change test timeout

* fix test and rename id

* fix test to create files with different mod time.

* fix deadlock when stopping watcher

* Apply suggestions from code review

Co-authored-by: R.B. Boyer <4903+rboyer@users.noreply.github.com>

* fix a deadlock when calling stop while emitting event is blocked

* make sure to close the event channel after the event loop is done

* add go doc

* back date file instead of sleeping

* Apply suggestions from code review

Co-authored-by: R.B. Boyer <4903+rboyer@users.noreply.github.com>

* check error

Co-authored-by: Ashwin Venkatesh <ashwin@hashicorp.com>
Co-authored-by: R.B. Boyer <4903+rboyer@users.noreply.github.com>
This commit is contained in:
Dhia Ayachi 2022-02-21 11:36:52 -05:00 committed by GitHub
parent 73b6687c5b
commit cd9d8d44a5
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 590 additions and 0 deletions

View File

@ -0,0 +1,249 @@
package config
import (
"context"
"fmt"
"os"
"path/filepath"
"sync"
"time"
"github.com/fsnotify/fsnotify"
"github.com/hashicorp/go-hclog"
)
const timeoutDuration = 200 * time.Millisecond
type FileWatcher struct {
watcher *fsnotify.Watcher
configFiles map[string]*watchedFile
logger hclog.Logger
reconcileTimeout time.Duration
cancel context.CancelFunc
done chan interface{}
stopOnce sync.Once
//EventsCh Channel where an event will be emitted when a file change is detected
// a call to Start is needed before any event is emitted
// after a Call to Stop succeed, the channel will be closed
EventsCh chan *FileWatcherEvent
}
type watchedFile struct {
modTime time.Time
}
type FileWatcherEvent struct {
Filename string
}
//NewFileWatcher create a file watcher that will watch all the files/folders from configFiles
// if success a FileWatcher will be returned and a nil error
// otherwise an error and a nil FileWatcher are returned
func NewFileWatcher(configFiles []string, logger hclog.Logger) (*FileWatcher, error) {
ws, err := fsnotify.NewWatcher()
if err != nil {
return nil, err
}
w := &FileWatcher{
watcher: ws,
logger: logger.Named("file-watcher"),
configFiles: make(map[string]*watchedFile),
EventsCh: make(chan *FileWatcherEvent),
reconcileTimeout: timeoutDuration,
done: make(chan interface{}),
stopOnce: sync.Once{},
}
for _, f := range configFiles {
err = w.add(f)
if err != nil {
return nil, fmt.Errorf("error adding file %q: %w", f, err)
}
}
return w, nil
}
// Start start a file watcher, with a copy of the passed context.
// calling Start multiple times is a noop
func (w *FileWatcher) Start(ctx context.Context) {
if w.cancel == nil {
cancelCtx, cancel := context.WithCancel(ctx)
w.cancel = cancel
go w.watch(cancelCtx)
}
}
// Stop the file watcher
// calling Stop multiple times is a noop, Stop must be called after a Start
func (w *FileWatcher) Stop() error {
var err error
w.stopOnce.Do(func() {
w.cancel()
<-w.done
close(w.EventsCh)
err = w.watcher.Close()
})
return err
}
func (w *FileWatcher) add(filename string) error {
if isSymLink(filename) {
return fmt.Errorf("symbolic links are not supported %s", filename)
}
filename = filepath.Clean(filename)
w.logger.Trace("adding file", "file", filename)
if err := w.watcher.Add(filename); err != nil {
return err
}
modTime, err := w.getFileModifiedTime(filename)
if err != nil {
return err
}
w.configFiles[filename] = &watchedFile{modTime: modTime}
return nil
}
func isSymLink(filename string) bool {
fi, err := os.Lstat(filename)
if err != nil {
return false
}
if fi.Mode()&os.ModeSymlink != 0 {
return true
}
return false
}
func (w *FileWatcher) watch(ctx context.Context) {
ticker := time.NewTicker(w.reconcileTimeout)
defer ticker.Stop()
defer close(w.done)
for {
select {
case event, ok := <-w.watcher.Events:
if !ok {
w.logger.Error("watcher event channel is closed")
return
}
w.logger.Trace("received watcher event", "event", event)
if err := w.handleEvent(ctx, event); err != nil {
w.logger.Error("error handling watcher event", "error", err, "event", event)
}
case _, ok := <-w.watcher.Errors:
if !ok {
w.logger.Error("watcher error channel is closed")
return
}
case <-ticker.C:
w.reconcile(ctx)
case <-ctx.Done():
return
}
}
}
func (w *FileWatcher) handleEvent(ctx context.Context, event fsnotify.Event) error {
w.logger.Trace("event received ", "filename", event.Name, "OP", event.Op)
// we only want Create and Remove events to avoid triggering a reload on file modification
if !isCreateEvent(event) && !isRemoveEvent(event) && !isWriteEvent(event) && !isRenameEvent(event) {
return nil
}
filename := filepath.Clean(event.Name)
configFile, basename, ok := w.isWatched(filename)
if !ok {
return fmt.Errorf("file %s is not watched", event.Name)
}
// we only want to update mod time and re-add if the event is on the watched file itself
if filename == basename {
if isRemoveEvent(event) {
// If the file was removed, try to reconcile and see if anything changed.
w.logger.Trace("attempt a reconcile ", "filename", event.Name, "OP", event.Op)
configFile.modTime = time.Time{}
w.reconcile(ctx)
}
}
if isCreateEvent(event) || isWriteEvent(event) || isRenameEvent(event) {
w.logger.Trace("call the handler", "filename", event.Name, "OP", event.Op)
select {
case w.EventsCh <- &FileWatcherEvent{Filename: filename}:
case <-ctx.Done():
return ctx.Err()
}
}
return nil
}
func (w *FileWatcher) isWatched(filename string) (*watchedFile, string, bool) {
path := filename
configFile, ok := w.configFiles[path]
if ok {
return configFile, path, true
}
stat, err := os.Lstat(filename)
// if the error is a not exist still try to find if the event for a configured file
if os.IsNotExist(err) || (!stat.IsDir() && stat.Mode()&os.ModeSymlink == 0) {
w.logger.Trace("not a dir and not a symlink to a dir")
// try to see if the watched path is the parent dir
newPath := filepath.Dir(path)
w.logger.Trace("get dir", "dir", newPath)
configFile, ok = w.configFiles[newPath]
}
return configFile, path, ok
}
func (w *FileWatcher) reconcile(ctx context.Context) {
for filename, configFile := range w.configFiles {
w.logger.Trace("reconciling", "filename", filename)
newModTime, err := w.getFileModifiedTime(filename)
if err != nil {
w.logger.Error("failed to get file modTime", "file", filename, "err", err)
continue
}
err = w.watcher.Add(filename)
if err != nil {
w.logger.Error("failed to add file to watcher", "file", filename, "err", err)
continue
}
if !configFile.modTime.Equal(newModTime) {
w.logger.Trace("call the handler", "filename", filename, "old modTime", configFile.modTime, "new modTime", newModTime)
w.configFiles[filename].modTime = newModTime
select {
case w.EventsCh <- &FileWatcherEvent{Filename: filename}:
case <-ctx.Done():
return
}
}
}
}
func isCreateEvent(event fsnotify.Event) bool {
return event.Op&fsnotify.Create == fsnotify.Create
}
func isRemoveEvent(event fsnotify.Event) bool {
return event.Op&fsnotify.Remove == fsnotify.Remove
}
func isWriteEvent(event fsnotify.Event) bool {
return event.Op&fsnotify.Write == fsnotify.Write
}
func isRenameEvent(event fsnotify.Event) bool {
return event.Op&fsnotify.Rename == fsnotify.Rename
}
func (w *FileWatcher) getFileModifiedTime(filename string) (time.Time, error) {
fileInfo, err := os.Stat(filename)
if err != nil {
return time.Time{}, err
}
return fileInfo.ModTime(), err
}

View File

@ -0,0 +1,337 @@
package config
import (
"context"
"fmt"
"math/rand"
"os"
"strings"
"testing"
"time"
"github.com/hashicorp/go-hclog"
"github.com/hashicorp/consul/sdk/testutil"
"github.com/stretchr/testify/require"
)
const defaultTimeout = 500 * time.Millisecond
func TestNewWatcher(t *testing.T) {
w, err := NewFileWatcher([]string{}, hclog.New(&hclog.LoggerOptions{}))
require.NoError(t, err)
require.NotNil(t, w)
}
func TestWatcherRenameEvent(t *testing.T) {
fileTmp := createTempConfigFile(t, "temp_config3")
filepaths := []string{createTempConfigFile(t, "temp_config1"), createTempConfigFile(t, "temp_config2")}
w, err := NewFileWatcher(filepaths, hclog.New(&hclog.LoggerOptions{}))
require.NoError(t, err)
w.Start(context.Background())
defer func() {
_ = w.Stop()
}()
require.NoError(t, err)
err = os.Rename(fileTmp, filepaths[0])
require.NoError(t, err)
require.NoError(t, assertEvent(filepaths[0], w.EventsCh, defaultTimeout))
// make sure we consume all events
assertEvent(filepaths[0], w.EventsCh, defaultTimeout)
}
func TestWatcherAddNotExist(t *testing.T) {
file := testutil.TempFile(t, "temp_config")
filename := file.Name() + randomStr(16)
w, err := NewFileWatcher([]string{filename}, hclog.New(&hclog.LoggerOptions{}))
require.Error(t, err, "no such file or directory")
require.Nil(t, w)
}
func TestEventWatcherWrite(t *testing.T) {
file := testutil.TempFile(t, "temp_config")
_, err := file.WriteString("test config")
require.NoError(t, err)
err = file.Sync()
require.NoError(t, err)
w, err := NewFileWatcher([]string{file.Name()}, hclog.New(&hclog.LoggerOptions{}))
require.NoError(t, err)
w.Start(context.Background())
defer func() {
_ = w.Stop()
}()
_, err = file.WriteString("test config 2")
require.NoError(t, err)
err = file.Sync()
require.NoError(t, err)
require.NoError(t, assertEvent(file.Name(), w.EventsCh, defaultTimeout))
}
func TestEventWatcherRead(t *testing.T) {
filepath := createTempConfigFile(t, "temp_config1")
w, err := NewFileWatcher([]string{filepath}, hclog.New(&hclog.LoggerOptions{}))
require.NoError(t, err)
w.Start(context.Background())
defer func() {
_ = w.Stop()
}()
_, err = os.ReadFile(filepath)
require.NoError(t, err)
require.Error(t, assertEvent(filepath, w.EventsCh, defaultTimeout), "timedout waiting for event")
}
func TestEventWatcherChmod(t *testing.T) {
file := testutil.TempFile(t, "temp_config")
defer func() {
err := file.Close()
require.NoError(t, err)
}()
_, err := file.WriteString("test config")
require.NoError(t, err)
err = file.Sync()
require.NoError(t, err)
w, err := NewFileWatcher([]string{file.Name()}, hclog.New(&hclog.LoggerOptions{}))
require.NoError(t, err)
w.Start(context.Background())
defer func() {
_ = w.Stop()
}()
err = file.Chmod(0777)
require.NoError(t, err)
require.Error(t, assertEvent(file.Name(), w.EventsCh, defaultTimeout), "timedout waiting for event")
}
func TestEventWatcherRemoveCreate(t *testing.T) {
filepath := createTempConfigFile(t, "temp_config1")
w, err := NewFileWatcher([]string{filepath}, hclog.New(&hclog.LoggerOptions{}))
require.NoError(t, err)
w.Start(context.Background())
defer func() {
_ = w.Stop()
}()
require.NoError(t, err)
err = os.Remove(filepath)
require.NoError(t, err)
recreated, err := os.Create(filepath)
require.NoError(t, err)
_, err = recreated.WriteString("config 2")
require.NoError(t, err)
err = recreated.Sync()
require.NoError(t, err)
// this an event coming from the reconcile loop
require.NoError(t, assertEvent(filepath, w.EventsCh, defaultTimeout))
}
func TestEventWatcherMove(t *testing.T) {
filepath := createTempConfigFile(t, "temp_config1")
w, err := NewFileWatcher([]string{filepath}, hclog.New(&hclog.LoggerOptions{}))
require.NoError(t, err)
w.Start(context.Background())
defer func() {
_ = w.Stop()
}()
for i := 0; i < 10; i++ {
filepath2 := createTempConfigFile(t, "temp_config2")
err = os.Rename(filepath2, filepath)
require.NoError(t, err)
require.NoError(t, assertEvent(filepath, w.EventsCh, defaultTimeout))
}
}
func TestEventReconcileMove(t *testing.T) {
filepath := createTempConfigFile(t, "temp_config1")
filepath2 := createTempConfigFile(t, "temp_config2")
err := os.Chtimes(filepath, time.Now(), time.Now().Add(-1*time.Second))
require.NoError(t, err)
w, err := NewFileWatcher([]string{filepath}, hclog.New(&hclog.LoggerOptions{}))
require.NoError(t, err)
w.Start(context.Background())
defer func() {
_ = w.Stop()
}()
// remove the file from the internal watcher to only trigger the reconcile
err = w.watcher.Remove(filepath)
require.NoError(t, err)
err = os.Rename(filepath2, filepath)
require.NoError(t, err)
require.NoError(t, assertEvent(filepath, w.EventsCh, 2000*time.Millisecond))
}
func TestEventWatcherDirCreateRemove(t *testing.T) {
filepath := testutil.TempDir(t, "temp_config1")
w, err := NewFileWatcher([]string{filepath}, hclog.New(&hclog.LoggerOptions{}))
require.NoError(t, err)
w.Start(context.Background())
defer func() {
_ = w.Stop()
}()
for i := 0; i < 1; i++ {
name := filepath + "/" + randomStr(20)
file, err := os.Create(name)
require.NoError(t, err)
err = file.Close()
require.NoError(t, err)
require.NoError(t, assertEvent(filepath, w.EventsCh, defaultTimeout))
err = os.Remove(name)
require.NoError(t, err)
require.NoError(t, assertEvent(filepath, w.EventsCh, defaultTimeout))
}
}
func TestEventWatcherDirMove(t *testing.T) {
filepath := testutil.TempDir(t, "temp_config1")
name := filepath + "/" + randomStr(20)
file, err := os.Create(name)
require.NoError(t, err)
err = file.Close()
require.NoError(t, err)
w, err := NewFileWatcher([]string{filepath}, hclog.New(&hclog.LoggerOptions{}))
require.NoError(t, err)
w.Start(context.Background())
defer func() {
_ = w.Stop()
}()
for i := 0; i < 100; i++ {
filepathTmp := createTempConfigFile(t, "temp_config2")
os.Rename(filepathTmp, name)
require.NoError(t, err)
require.NoError(t, assertEvent(filepath, w.EventsCh, defaultTimeout))
}
}
func TestEventWatcherDirMoveTrim(t *testing.T) {
filepath := testutil.TempDir(t, "temp_config1")
name := filepath + "/" + randomStr(20)
file, err := os.Create(name)
require.NoError(t, err)
err = file.Close()
require.NoError(t, err)
w, err := NewFileWatcher([]string{filepath + "/"}, hclog.New(&hclog.LoggerOptions{}))
require.NoError(t, err)
w.Start(context.Background())
defer func() {
_ = w.Stop()
}()
for i := 0; i < 100; i++ {
filepathTmp := createTempConfigFile(t, "temp_config2")
os.Rename(filepathTmp, name)
require.NoError(t, err)
require.NoError(t, assertEvent(filepath, w.EventsCh, defaultTimeout))
}
}
// Consul do not support configuration in sub-directories
func TestEventWatcherSubDirMove(t *testing.T) {
filepath := testutil.TempDir(t, "temp_config1")
err := os.Mkdir(filepath+"/temp", 0777)
require.NoError(t, err)
name := filepath + "/temp/" + randomStr(20)
file, err := os.Create(name)
require.NoError(t, err)
err = file.Close()
require.NoError(t, err)
w, err := NewFileWatcher([]string{filepath}, hclog.New(&hclog.LoggerOptions{}))
require.NoError(t, err)
w.Start(context.Background())
defer func() {
_ = w.Stop()
}()
for i := 0; i < 2; i++ {
filepathTmp := createTempConfigFile(t, "temp_config2")
os.Rename(filepathTmp, name)
require.NoError(t, err)
require.Error(t, assertEvent(filepath, w.EventsCh, defaultTimeout), "timedout waiting for event")
}
}
func TestEventWatcherDirRead(t *testing.T) {
filepath := testutil.TempDir(t, "temp_config1")
name := filepath + "/" + randomStr(20)
file, err := os.Create(name)
require.NoError(t, err)
err = file.Close()
require.NoError(t, err)
w, err := NewFileWatcher([]string{filepath}, hclog.New(&hclog.LoggerOptions{}))
require.NoError(t, err)
w.Start(context.Background())
t.Cleanup(func() {
_ = w.Stop()
})
_, err = os.ReadFile(name)
require.NoError(t, err)
require.Error(t, assertEvent(filepath, w.EventsCh, defaultTimeout), "timedout waiting for event")
}
func TestEventWatcherMoveSoftLink(t *testing.T) {
filepath := createTempConfigFile(t, "temp_config1")
tempDir := testutil.TempDir(t, "temp_dir")
name := tempDir + "/" + randomStr(20)
err := os.Symlink(filepath, name)
require.NoError(t, err)
w, err := NewFileWatcher([]string{name}, hclog.New(&hclog.LoggerOptions{}))
require.Error(t, err, "symbolic link are not supported")
require.Nil(t, w)
}
func assertEvent(name string, watcherCh chan *FileWatcherEvent, timeout time.Duration) error {
select {
case ev := <-watcherCh:
if ev.Filename != name && !strings.Contains(ev.Filename, name) {
return fmt.Errorf("filename do not match %s %s", ev.Filename, name)
}
return nil
case <-time.After(timeout):
return fmt.Errorf("timedout waiting for event")
}
}
func createTempConfigFile(t *testing.T, filename string) string {
file := testutil.TempFile(t, filename)
_, err1 := file.WriteString("test config")
err2 := file.Close()
require.NoError(t, err1)
require.NoError(t, err2)
return file.Name()
}
func randomStr(length int) string {
const charset = "abcdefghijklmnopqrstuvwxyz" +
"ABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789"
var seededRand *rand.Rand = rand.New(
rand.NewSource(time.Now().UnixNano()))
b := make([]byte, length)
for i := range b {
b[i] = charset[seededRand.Intn(len(charset))]
}
return string(b)
}

1
go.mod
View File

@ -22,6 +22,7 @@ require (
github.com/elazarl/go-bindata-assetfs v0.0.0-20160803192304-e1a2a7ec64b0
github.com/envoyproxy/go-control-plane v0.9.5
github.com/frankban/quicktest v1.11.0 // indirect
github.com/fsnotify/fsnotify v1.5.1
github.com/gogo/protobuf v1.3.2
github.com/golang/protobuf v1.3.5
github.com/google/go-cmp v0.5.6

3
go.sum
View File

@ -142,6 +142,8 @@ github.com/fatih/structs v1.1.0/go.mod h1:9NiDSp5zOcgEDl+j00MP/WkGVPOlPRLejGD8Ga
github.com/frankban/quicktest v1.11.0 h1:Yyrghcw93e1jKo4DTZkRFTTFvBsVhzbblBUPNU1vW6Q=
github.com/frankban/quicktest v1.11.0/go.mod h1:K+q6oSqb0W0Ininfk863uOk1lMy69l/P6txr3mVT54s=
github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo=
github.com/fsnotify/fsnotify v1.5.1 h1:mZcQUHVQUQWoPXXtuf9yuEXKudkV2sx1E06UadKWpgI=
github.com/fsnotify/fsnotify v1.5.1/go.mod h1:T3375wBYaZdLLcVNkcVbzGHY7f1l/uK5T5Ai1i3InKU=
github.com/ghodss/yaml v0.0.0-20150909031657-73d445a93680/go.mod h1:4dBDuWmgqj2HViK6kFavaiC9ZROes6MMH2rRYeMEF04=
github.com/ghodss/yaml v1.0.0/go.mod h1:4dBDuWmgqj2HViK6kFavaiC9ZROes6MMH2rRYeMEF04=
github.com/go-asn1-ber/asn1-ber v1.3.1/go.mod h1:hEBeB/ic+5LoWskz+yKT7vGhhPYkProFKoKdwZRWMe0=
@ -649,6 +651,7 @@ golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7w
golang.org/x/sys v0.0.0-20210303074136-134d130e1a04/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20210330210617-4fbd30eecc44/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20210423082822-04245dca01da/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20210630005230-0f9fa26af87c/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20210816074244-15123e1e1f71/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20211013075003-97ac67df715c h1:taxlMj0D/1sOAuv/CbSD+MMDof2vbyPTqz5FNYKpXt8=
golang.org/x/sys v0.0.0-20211013075003-97ac67df715c/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=