序
本文主要研究一下golang的zap的Sink
Sink[email protected]/sink.go
type Sink interface { zapcore.WriteSyncer io.Closer}type WriteSyncer interface { io.Writer Sync() error}type Writer interface { Write(p []byte) (n int, err error)}type Closer interface { Close() error}
Sink介面內嵌了zapcore.WriteSyncer(Write、Sync)、io.Closer(Close)介面
RegisterSink[email protected]/sink.go
const schemeFile = "file"var ( _sinkMutex sync.RWMutex _sinkFactories map[string]func(*url.URL) (Sink, error) // keyed by scheme)func init() { resetSinkRegistry()}func resetSinkRegistry() { _sinkMutex.Lock() defer _sinkMutex.Unlock() _sinkFactories = map[string]func(*url.URL) (Sink, error){ schemeFile: newFileSink, }}func RegisterSink(scheme string, factory func(*url.URL) (Sink, error)) error { _sinkMutex.Lock() defer _sinkMutex.Unlock() if scheme == "" { return errors.New("can't register a sink factory for empty string") } normalized, err := normalizeScheme(scheme) if err != nil { return fmt.Errorf("%q is not a valid scheme: %v", scheme, err) } if _, ok := _sinkFactories[normalized]; ok { return fmt.Errorf("sink factory already registered for scheme %q", normalized) } _sinkFactories[normalized] = factory return nil}
RegisterSink方法會往_sinkFactories註冊指定scheme的sink factory,該factory接收url.URL返回Sink;resetSinkRegistry方法預設註冊了scheme為file的newFileSink
newFileSink[email protected]/sink.go
func newFileSink(u *url.URL) (Sink, error) { if u.User != nil { return nil, fmt.Errorf("user and password not allowed with file URLs: got %v", u) } if u.Fragment != "" { return nil, fmt.Errorf("fragments not allowed with file URLs: got %v", u) } if u.RawQuery != "" { return nil, fmt.Errorf("query parameters not allowed with file URLs: got %v", u) } // Error messages are better if we check hostname and port separately. if u.Port() != "" { return nil, fmt.Errorf("ports not allowed with file URLs: got %v", u) } if hn := u.Hostname(); hn != "" && hn != "localhost" { return nil, fmt.Errorf("file URLs must leave host empty or use localhost: got %v", u) } switch u.Path { case "stdout": return nopCloserSink{os.Stdout}, nil case "stderr": return nopCloserSink{os.Stderr}, nil } return os.OpenFile(u.Path, os.O_WRONLY|os.O_APPEND|os.O_CREATE, 0666)}
newFileSink使用os.OpenFile建立*os.File,由於*os.File擁有Write、Sync、Close方法,因而它實現了Sink介面
newSink[email protected]/sink.go
func newSink(rawURL string) (Sink, error) { u, err := url.Parse(rawURL) if err != nil { return nil, fmt.Errorf("can't parse %q as a URL: %v", rawURL, err) } if u.Scheme == "" { u.Scheme = schemeFile } _sinkMutex.RLock() factory, ok := _sinkFactories[u.Scheme] _sinkMutex.RUnlock() if !ok { return nil, &errSinkNotFound{u.Scheme} } return factory(u)}
newSink方法會根據rawURL解析對應的scheme,如果scheme為空則預設為file,然後從_sinkFactories找到對應的factory,建立sink返回
open[email protected]/writer.go
func Open(paths ...string) (zapcore.WriteSyncer, func(), error) { writers, close, err := open(paths) if err != nil { return nil, nil, err } writer := CombineWriteSyncers(writers...) return writer, close, nil}func open(paths []string) ([]zapcore.WriteSyncer, func(), error) { writers := make([]zapcore.WriteSyncer, 0, len(paths)) closers := make([]io.Closer, 0, len(paths)) close := func() { for _, c := range closers { c.Close() } } var openErr error for _, path := range paths { sink, err := newSink(path) if err != nil { openErr = multierr.Append(openErr, fmt.Errorf("couldn't open sink %q: %v", path, err)) continue } writers = append(writers, sink) closers = append(closers, sink) } if openErr != nil { close() return writers, nil, openErr } return writers, close, nil}
zap.Open方法會使用newSink來建立sink作為zapcore.WriteSyncer
例項func registerSinkDemo() { zap.RegisterSink("mq", mq.NewMqSink) writer, close, err := zap.Open("mq://192.168.99.100:9876/log") if err != nil { panic(err) } defer close() logger := zap.New(zapcore.NewCore(zapcore.NewJSONEncoder(zap.NewProductionEncoderConfig()), writer, zap.DebugLevel)).Sugar() logger.Info("hello")}type MqWriteSyncer struct { topic string producer rocketmq.Producer ctx context.Context}func (m *MqWriteSyncer) Close() error { return m.producer.Shutdown()}func (m *MqWriteSyncer) Write(p []byte) (n int, err error) { msg := &primitive.Message{ Topic: m.topic, Body: p, } err = m.producer.SendOneWay(m.ctx, msg) return len(p), err}func (m *MqWriteSyncer) Sync() error { return nil}func NewMqSink(url *url.URL) (zap.Sink, error) { broker := fmt.Sprintf("%s:%s", url.Hostname(), url.Port()) topic := url.Path[1:len(url.Path)] p, _ := rocketmq.NewProducer( producer.WithNameServer([]string{broker}), producer.WithRetry(2), ) err := p.Start() if err != nil { fmt.Printf("start producer error: %s", err.Error()) return nil, err } return &MqWriteSyncer{producer: p, ctx: context.Background(), topic: topic}, nil}
這裡透過zap.RegisterSink來註冊一個mq的sink factory,然後透過zap.Open來建立MqWriteSyncer;MqWriteSyncer實現了zapcore.WriteSyncer的Write、Sync方法,同時也實現了Sink的Close方法
小結Sink介面內嵌了zapcore.WriteSyncer(Write、Sync)、io.Closer(Close)介面;zap.RegisterSink用於註冊指定scheme的sink factory,而zap.Open則會解析url來找到對應的sink factory建立對應的sink,即writer。
doczap