首頁>技術>

本文主要研究一下golang的zap的Sink

Sink

[email protected]/sink.go

type Sink interface {    zapcore.WriteSyncer    io.Closer}type WriteSyncer interface {    io.Writer    Sync() error}type Writer interface {    Write(p []byte) (n int, err error)}type Closer interface {    Close() error}

Sink介面內嵌了zapcore.WriteSyncer(Write、Sync)、io.Closer(Close)介面

RegisterSink

[email protected]/sink.go

const schemeFile = "file"var (    _sinkMutex     sync.RWMutex    _sinkFactories map[string]func(*url.URL) (Sink, error) // keyed by scheme)func init() {    resetSinkRegistry()}func resetSinkRegistry() {    _sinkMutex.Lock()    defer _sinkMutex.Unlock()    _sinkFactories = map[string]func(*url.URL) (Sink, error){        schemeFile: newFileSink,    }}func RegisterSink(scheme string, factory func(*url.URL) (Sink, error)) error {    _sinkMutex.Lock()    defer _sinkMutex.Unlock()    if scheme == "" {        return errors.New("can't register a sink factory for empty string")    }    normalized, err := normalizeScheme(scheme)    if err != nil {        return fmt.Errorf("%q is not a valid scheme: %v", scheme, err)    }    if _, ok := _sinkFactories[normalized]; ok {        return fmt.Errorf("sink factory already registered for scheme %q", normalized)    }    _sinkFactories[normalized] = factory    return nil}

RegisterSink方法會往_sinkFactories註冊指定scheme的sink factory,該factory接收url.URL返回Sink;resetSinkRegistry方法預設註冊了scheme為file的newFileSink

newFileSink

[email protected]/sink.go

func newFileSink(u *url.URL) (Sink, error) {    if u.User != nil {        return nil, fmt.Errorf("user and password not allowed with file URLs: got %v", u)    }    if u.Fragment != "" {        return nil, fmt.Errorf("fragments not allowed with file URLs: got %v", u)    }    if u.RawQuery != "" {        return nil, fmt.Errorf("query parameters not allowed with file URLs: got %v", u)    }    // Error messages are better if we check hostname and port separately.    if u.Port() != "" {        return nil, fmt.Errorf("ports not allowed with file URLs: got %v", u)    }    if hn := u.Hostname(); hn != "" && hn != "localhost" {        return nil, fmt.Errorf("file URLs must leave host empty or use localhost: got %v", u)    }    switch u.Path {    case "stdout":        return nopCloserSink{os.Stdout}, nil    case "stderr":        return nopCloserSink{os.Stderr}, nil    }    return os.OpenFile(u.Path, os.O_WRONLY|os.O_APPEND|os.O_CREATE, 0666)}

newFileSink使用os.OpenFile建立*os.File,由於*os.File擁有Write、Sync、Close方法,因而它實現了Sink介面

newSink

[email protected]/sink.go

func newSink(rawURL string) (Sink, error) {    u, err := url.Parse(rawURL)    if err != nil {        return nil, fmt.Errorf("can't parse %q as a URL: %v", rawURL, err)    }    if u.Scheme == "" {        u.Scheme = schemeFile    }    _sinkMutex.RLock()    factory, ok := _sinkFactories[u.Scheme]    _sinkMutex.RUnlock()    if !ok {        return nil, &errSinkNotFound{u.Scheme}    }    return factory(u)}

newSink方法會根據rawURL解析對應的scheme,如果scheme為空則預設為file,然後從_sinkFactories找到對應的factory,建立sink返回

open

[email protected]/writer.go

func Open(paths ...string) (zapcore.WriteSyncer, func(), error) {    writers, close, err := open(paths)    if err != nil {        return nil, nil, err    }    writer := CombineWriteSyncers(writers...)    return writer, close, nil}func open(paths []string) ([]zapcore.WriteSyncer, func(), error) {    writers := make([]zapcore.WriteSyncer, 0, len(paths))    closers := make([]io.Closer, 0, len(paths))    close := func() {        for _, c := range closers {            c.Close()        }    }    var openErr error    for _, path := range paths {        sink, err := newSink(path)        if err != nil {            openErr = multierr.Append(openErr, fmt.Errorf("couldn't open sink %q: %v", path, err))            continue        }        writers = append(writers, sink)        closers = append(closers, sink)    }    if openErr != nil {        close()        return writers, nil, openErr    }    return writers, close, nil}

zap.Open方法會使用newSink來建立sink作為zapcore.WriteSyncer

例項
func registerSinkDemo() {    zap.RegisterSink("mq", mq.NewMqSink)    writer, close, err := zap.Open("mq://192.168.99.100:9876/log")    if err != nil {        panic(err)    }    defer close()    logger := zap.New(zapcore.NewCore(zapcore.NewJSONEncoder(zap.NewProductionEncoderConfig()), writer, zap.DebugLevel)).Sugar()    logger.Info("hello")}type MqWriteSyncer struct {    topic    string    producer rocketmq.Producer    ctx      context.Context}func (m *MqWriteSyncer) Close() error {    return m.producer.Shutdown()}func (m *MqWriteSyncer) Write(p []byte) (n int, err error) {    msg := &primitive.Message{        Topic: m.topic,        Body:  p,    }    err = m.producer.SendOneWay(m.ctx, msg)    return len(p), err}func (m *MqWriteSyncer) Sync() error {    return nil}func NewMqSink(url *url.URL) (zap.Sink, error) {    broker := fmt.Sprintf("%s:%s", url.Hostname(), url.Port())    topic := url.Path[1:len(url.Path)]    p, _ := rocketmq.NewProducer(        producer.WithNameServer([]string{broker}),        producer.WithRetry(2),    )    err := p.Start()    if err != nil {        fmt.Printf("start producer error: %s", err.Error())        return nil, err    }    return &MqWriteSyncer{producer: p, ctx: context.Background(), topic: topic}, nil}

這裡透過zap.RegisterSink來註冊一個mq的sink factory,然後透過zap.Open來建立MqWriteSyncer;MqWriteSyncer實現了zapcore.WriteSyncer的Write、Sync方法,同時也實現了Sink的Close方法

小結

Sink介面內嵌了zapcore.WriteSyncer(Write、Sync)、io.Closer(Close)介面;zap.RegisterSink用於註冊指定scheme的sink factory,而zap.Open則會解析url來找到對應的sink factory建立對應的sink,即writer。

doczap

16
最新評論
  • BSA-TRITC(10mg/ml) TRITC-BSA 牛血清白蛋白改性標記羅丹明
  • 快速上手大型軟體專案有妙招:理清程序、執行緒、佇列及元件間通訊