首頁>技術>

本文主要研究一下dapr的fswatcher

fswatcher

dapr/pkg/fswatcher/fswatcher.go

import (    "context"    "strings"    "time"    "github.com/fsnotify/fsnotify"    "github.com/pkg/errors")func Watch(ctx context.Context, dir string, eventCh chan<- struct{}) error {    watcher, err := fsnotify.NewWatcher()    if err != nil {        return errors.Wrap(err, "failed to create watcher")    }    defer watcher.Close()    if err := watcher.Add(dir); err != nil {        return errors.Wrap(err, "watcher error")    }LOOP:    for {        select {        // watch for events        case event := <-watcher.Events:            if event.Op == fsnotify.Create || event.Op == fsnotify.Write {                if strings.Contains(event.Name, dir) {                    // give time for other updates to occur                    time.Sleep(time.Second * 1)                    eventCh <- struct{}{}                }            }        case <-watcher.Errors:            break LOOP        case <-ctx.Done():            break LOOP        }    }    return nil}

Watch方法使用fsnotify的watcher來監聽檔案,之後透過for迴圈進行select,如果監聽到fsnotify.Create或者fsnotify.Write的時候判斷event.Name是否包含dir,如果包含則sleep一秒然後通知eventCh

Add

github.com/fsnotify/[email protected]/kqueue.go

// Add starts watching the named file or directory (non-recursively).func (w *Watcher) Add(name string) error {    w.mu.Lock()    w.externalWatches[name] = true    w.mu.Unlock()    _, err := w.addWatch(name, noteAllEvents)    return err}

Add方法設定externalWatches[name]為true,然後執行addWatch(name, noteAllEvents)

addWatch

github.com/fsnotify/[email protected]/kqueue.go

// addWatch adds name to the watched file set.// The flags are interpreted as described in kevent(2).// Returns the real path to the file which was added, if any, which may be different from the one passed in the case of symlinks.func (w *Watcher) addWatch(name string, flags uint32) (string, error) {    var isDir bool    // Make ./name and name equivalent    name = filepath.Clean(name)    w.mu.Lock()    if w.isClosed {        w.mu.Unlock()        return "", errors.New("kevent instance already closed")    }    watchfd, alreadyWatching := w.watches[name]    // We already have a watch, but we can still override flags.    if alreadyWatching {        isDir = w.paths[watchfd].isDir    }    w.mu.Unlock()    if !alreadyWatching {        fi, err := os.Lstat(name)        if err != nil {            return "", err        }        // Don't watch sockets.        if fi.Mode()&os.ModeSocket == os.ModeSocket {            return "", nil        }        // Don't watch named pipes.        if fi.Mode()&os.ModeNamedPipe == os.ModeNamedPipe {            return "", nil        }        // Follow Symlinks        // Unfortunately, Linux can add bogus symlinks to watch list without        // issue, and Windows can't do symlinks period (AFAIK). To  maintain        // consistency, we will act like everything is fine. There will simply        // be no file events for broken symlinks.        // Hence the returns of nil on errors.        if fi.Mode()&os.ModeSymlink == os.ModeSymlink {            name, err = filepath.EvalSymlinks(name)            if err != nil {                return "", nil            }            w.mu.Lock()            _, alreadyWatching = w.watches[name]            w.mu.Unlock()            if alreadyWatching {                return name, nil            }            fi, err = os.Lstat(name)            if err != nil {                return "", nil            }        }        watchfd, err = unix.Open(name, openMode, 0700)        if watchfd == -1 {            return "", err        }        isDir = fi.IsDir()    }    const registerAdd = unix.EV_ADD | unix.EV_CLEAR | unix.EV_ENABLE    if err := register(w.kq, []int{watchfd}, registerAdd, flags); err != nil {        unix.Close(watchfd)        return "", err    }    if !alreadyWatching {        w.mu.Lock()        w.watches[name] = watchfd        w.paths[watchfd] = pathInfo{name: name, isDir: isDir}        w.mu.Unlock()    }    if isDir {        // Watch the directory if it has not been watched before,        // or if it was watched before, but perhaps only a NOTE_DELETE (watchDirectoryFiles)        w.mu.Lock()        watchDir := (flags&unix.NOTE_WRITE) == unix.NOTE_WRITE &&            (!alreadyWatching || (w.dirFlags[name]&unix.NOTE_WRITE) != unix.NOTE_WRITE)        // Store flags so this watch can be updated later        w.dirFlags[name] = flags        w.mu.Unlock()        if watchDir {            if err := w.watchDirectoryFiles(name); err != nil {                return "", err            }        }    }    return name, nil}

addWatch方法針對尚未watch的執行os.Lstat(name)及unix.Open(name, openMode, 0700);之後註冊registerAdd;另外針對isDir的情況執行watchDirectoryFiles

watchDirectoryFiles

github.com/fsnotify/[email protected]/kqueue.go

// watchDirectoryFiles to mimic inotify when adding a watch on a directoryfunc (w *Watcher) watchDirectoryFiles(dirPath string) error {    // Get all files    files, err := ioutil.ReadDir(dirPath)    if err != nil {        return err    }    for _, fileInfo := range files {        filePath := filepath.Join(dirPath, fileInfo.Name())        filePath, err = w.internalWatch(filePath, fileInfo)        if err != nil {            return err        }        w.mu.Lock()        w.fileExists[filePath] = true        w.mu.Unlock()    }    return nil}

watchDirectoryFiles遍歷files,挨個執行internalWatch

internalWatch

github.com/fsnotify/[email protected]/kqueue.go

func (w *Watcher) internalWatch(name string, fileInfo os.FileInfo) (string, error) {    if fileInfo.IsDir() {        // mimic Linux providing delete events for subdirectories        // but preserve the flags used if currently watching subdirectory        w.mu.Lock()        flags := w.dirFlags[name]        w.mu.Unlock()        flags |= unix.NOTE_DELETE | unix.NOTE_RENAME        return w.addWatch(name, flags)    }    // watch file to mimic Linux inotify    return w.addWatch(name, noteAllEvents)}

internalWatch針對dir設定的flag為NOTE_DELETE、NOTE_RENAME

小結

dapr的fswatcher使用fsnotify的watcher來監聽檔案,之後透過for迴圈進行select,如果監聽到fsnotify.Create或者fsnotify.Write的時候判斷event.Name是否包含dir,如果包含則sleep一秒然後通知eventCh。

docdapr

8
最新評論
  • BSA-TRITC(10mg/ml) TRITC-BSA 牛血清白蛋白改性標記羅丹明
  • lc1780_判斷一個數字是否可以表示成三的冪的和