RabbitMQ是出色的訊息中介軟體,golang理所當然的也支援了。RabbitMQ是一個很棒的pub-sub系統,並且pub-sub已成為微服務中的主要通訊體系結構。在我目前的工作中,我們每天透過Go服務使用RabbitMQ推送數億個社交媒體帖子。讓我們一起來看一下如何使用開源amqp軟體包有效地釋出和訂閱訊息 。
1、概述RabbitMQ需注意的兩個主要實體是routing keys 和queues。服務將訊息(在本例中為JSON格式)釋出到routing keys 。然後RabbitMQ將該訊息複製到 訂閱該routing key的每個queue中。
2、連線首先,沒有理由重新發明輪子。我們將使用streadway提供的amqp包來處理連線細節的細節。在我們的大多數專案中,我們[在專案的內部資料夾中構建一個小Rabbit包。它僅公開了我們專案關心的Rabbit功能。
//internal/rabbit/publish.go // Conn -type Conn struct { Channel *amqp.Channel}// GetConn -func GetConn(rabbitURL string) (Conn, error) { conn, err := amqp.Dial(rabbitURL) if err != nil { return Conn{}, err } ch, err := conn.Channel() return Conn{ Channel: ch, }, err}
該Conn結構僅保持與RabbitMQ伺服器的連線。我們還將公開一種僅使用連線URI來獲得新連線的方法。例如,amqp://username:password@localhost。
3、釋出釋出非常容易,並且是執行緒安全的現成的。在這裡,我們僅公開了使用該連線釋出的另一個功能。呼叫程式碼僅提供路由金鑰和有效負載。
//internal/rabbit/consume.go// Publish -func (conn Conn) Publish(routingKey string, data []byte) error { return conn.Channel.Publish( // exchange - yours may be different "events", routingKey, // mandatory - we don't care if there I no queue false, // immediate - we don't care if there is no consumer on the queue false, amqp.Publishing{ ContentType: "application/json", Body: data, DeliveryMode: amqp.Persistent, })}
這個小型內部程式包的目的是為功能更強大的AMQP程式包設定一些預設值,並控制向我們的應用程式公開哪些功能。例如,如果我們知道我們的應用程式將始終使用events,並且我們希望設定mandatoryorimmediate標誌,則可以在此處使用。
4、消費消費比釋出要難一些。在這裡,我們將使用一個簡單的模式,讓我們的應用程式提供一個處理程式函式,一個佇列,該佇列繫結到的routing key,以及如何在單獨的goroutine中建立最大數量的處理程式。
//internal/rabbit/consume.go// StartConsumer -func (conn Conn) StartConsumer( queueName, routingKey string, handler func(d amqp.Delivery) bool, concurrency int) error { // create the queue if it doesn't already exist _, err := conn.Channel.QueueDeclare(queueName, true, false, false, false, nil) if err != nil { return err } // bind the queue to the routing key err = conn.Channel.QueueBind(queueName, routingKey, "events", false, nil) if err != nil { return err } // prefetch 4x as many messages as we can handle at once prefetchCount := concurrency * 4 err = conn.Channel.Qos(prefetchCount, 0, false) if err != nil { return err } msgs, err := conn.Channel.Consume( queueName, // queue "", // consumer false, // auto-ack false, // exclusive false, // no-local false, // no-wait nil, // args ) if err != nil { return err } // create a goroutine for the number of concurrent threads requested for i := 0; i < concurrency; i++ { fmt.Printf("Processing messages on thread %v...\n", i) go func() { for msg := range msgs { // if tha handler returns true then ACK, else NACK // the message back into the rabbit queue for // another round of processing if handler(msg) { msg.Ack(false) } else { msg.Nack(false, true) } } fmt.Println("Rabbit consumer closed - critical Error") os.Exit(1) }() } return nil}
如果您擔心速度,至少不用擔心少100的併發速度,這取決於處理程式的計算和記憶體密集程度。假設您的處理程式是以執行緒安全的方式編寫的,這是確保您的應用程式使用其所有可用CPU而不受到I / O瓶頸限制。如果應用程式的處理程式非常快(可能不涉及網路或磁碟),則可能需要將prefetch乘數從4更改為更大的值。prefetchCount告訴Rabbit連線每個請求從伺服器檢索多少訊息。數字越大,等待網路獲取每條訊息的時間越少。
程式在故障發生時,不論是否重啟都應該不受其影響。因此,如果RabbitMQ使用者由於任何原因而失敗,這裡將使用該os.Exit(1)命令。日誌將被收集,然後重新啟動。如果這不適用於您的服務,則可能需要其他更優雅的方法進行處理。