首頁>技術>

RabbitMQ是出色的訊息中介軟體,golang理所當然的也支援了。RabbitMQ是一個很棒的pub-sub系統,並且pub-sub已成為微服務中的主要通訊體系結構。在我目前的工作中,我們每天透過Go服務使用RabbitMQ推送數億個社交媒體帖子。讓我們一起來看一下如何使用開源amqp軟體包有效地釋出和訂閱訊息 。

1、概述

RabbitMQ需注意的兩個主要實體是routing keysqueues。服務將訊息(在本例中為JSON格式)釋出到routing keys 。然後RabbitMQ將該訊息複製到 訂閱該routing key的每個queue中。

2、連線

首先,沒有理由重新發明輪子。我們將使用streadway提供的amqp包來處理連線細節的細節。在我們的大多數專案中,我們[在專案的內部資料夾中構建一個小Rabbit包。它僅公開了我們專案關心的Rabbit功能。

//internal/rabbit/publish.go // Conn -type Conn struct {    Channel *amqp.Channel}// GetConn -func GetConn(rabbitURL string) (Conn, error) {    conn, err := amqp.Dial(rabbitURL)    if err != nil {        return Conn{}, err    }    ch, err := conn.Channel()    return Conn{        Channel: ch,    }, err}

該Conn結構僅保持與RabbitMQ伺服器的連線。我們還將公開一種僅使用連線URI來獲得新連線的方法。例如,amqp://username:password@localhost。

3、釋出

釋出非常容易,並且是執行緒安全的現成的。在這裡,我們僅公開了使用該連線釋出的另一個功能。呼叫程式碼僅提供路由金鑰和有效負載。

//internal/rabbit/consume.go// Publish -func (conn Conn) Publish(routingKey string, data []byte) error {    return conn.Channel.Publish(        // exchange - yours may be different        "events",        routingKey,        // mandatory - we don't care if there I no queue        false,        // immediate - we don't care if there is no consumer on the queue        false,        amqp.Publishing{            ContentType:  "application/json",            Body:         data,            DeliveryMode: amqp.Persistent,        })}

這個小型內部程式包的目的是為功能更強大的AMQP程式包設定一些預設值,並控制向我們的應用程式公開哪些功能。例如,如果我們知道我們的應用程式將始終使用events,並且我們希望設定mandatoryorimmediate標誌,則可以在此處使用。

4、消費

消費比釋出要難一些。在這裡,我們將使用一個簡單的模式,讓我們的應用程式提供一個處理程式函式,一個佇列,該佇列繫結到的routing key,以及如何在單獨的goroutine中建立最大數量的處理程式。

//internal/rabbit/consume.go// StartConsumer -func (conn Conn) StartConsumer(    queueName,    routingKey string,    handler func(d amqp.Delivery) bool,    concurrency int) error {    // create the queue if it doesn't already exist    _, err := conn.Channel.QueueDeclare(queueName, true, false, false, false, nil)    if err != nil {        return err    }    // bind the queue to the routing key    err = conn.Channel.QueueBind(queueName, routingKey, "events", false, nil)    if err != nil {        return err    }    // prefetch 4x as many messages as we can handle at once    prefetchCount := concurrency * 4    err = conn.Channel.Qos(prefetchCount, 0, false)    if err != nil {        return err    }    msgs, err := conn.Channel.Consume(        queueName, // queue        "",        // consumer        false,     // auto-ack        false,     // exclusive        false,     // no-local        false,     // no-wait        nil,       // args    )    if err != nil {        return err    }    // create a goroutine for the number of concurrent threads requested    for i := 0; i < concurrency; i++ {        fmt.Printf("Processing messages on thread %v...\n", i)        go func() {            for msg := range msgs {                // if tha handler returns true then ACK, else NACK                // the message back into the rabbit queue for                // another round of processing                if handler(msg) {                    msg.Ack(false)                } else {                    msg.Nack(false, true)                }            }            fmt.Println("Rabbit consumer closed - critical Error")            os.Exit(1)        }()    }    return nil}

如果您擔心速度,至少不用擔心少100的併發速度,這取決於處理程式的計算和記憶體密集程度。假設您的處理程式是以執行緒安全的方式編寫的,這是確保您的應用程式使用其所有可用CPU而不受到I / O瓶頸限制。如果應用程式的處理程式非常快(可能不涉及網路或磁碟),則可能需要將prefetch乘數從4更改為更大的值。prefetchCount告訴Rabbit連線每個請求從伺服器檢索多少訊息。數字越大,等待網路獲取每條訊息的時間越少。

程式在故障發生時,不論是否重啟都應該不受其影響。因此,如果RabbitMQ使用者由於任何原因而失敗,這裡將使用該os.Exit(1)命令。日誌將被收集,然後重新啟動。如果這不適用於您的服務,則可能需要其他更優雅的方法進行處理。

15
  • BSA-TRITC(10mg/ml) TRITC-BSA 牛血清白蛋白改性標記羅丹明
  • 位元組跳動內部微服務架構-Docker實戰學習筆記分享 真香