Tuesday, February 11, 2025
HomeGolangRabbitMQ Channel Freezing When Message Rely Exceeds QoS Restrict: The right way...

RabbitMQ Channel Freezing When Message Rely Exceeds QoS Restrict: The right way to Resolve? – Code Evaluation


[*]

I got here throughout this publish (web site channel stops when utilizing QoS on channel if the variety of message is greater than QoS outlined variety of message) whereas troubleshooting a problem I’m going through with RabbitMQ in my Go utility. The issue is, after I use QoS on a channel and the variety of messages exceeds the outlined restrict, the channel stops receiving messages.

Right here is the code I’m utilizing:

bundle rmqcode

import (
    "context"
    "fmt"
    "log"
    "sync"
    "time"

    amqp "github.com/rabbitmq/amqp091-go"
)

func failOnError(err error, msg string) {
    if err != nil {
        log.Panicf("%s: %s", msg, err)
    }
}

func recWorker(worknum int, del *amqp.Supply) {
    println(fmt.Sprintf("begin %d", worknum))
    time.Sleep(time.Second * 2)
    println(fmt.Sprintf("finish %d", worknum))
    del.Ack(true)
}

func Receiver() {
    conn, err := amqp.Dial("amqp://consumer:cross@localhost:5672/")
    failOnError(err, "Failed to hook up with RabbitMQ")
    defer conn.Shut()

    ch, err := conn.Channel()
    failOnError(err, "Didn't open a channel")
    defer ch.Shut()

    q, err := ch.QueueDeclare(
        "good day", // identify
        true,    // sturdy
        false,   // delete when unused
        false,   // unique
        false,   // no-wait
        nil,     // arguments
    )
    failOnError(err, "Didn't declare a queue")

    err = ch.Qos(
        3,     // prefetch depend
        0,     // prefetch measurement
        false, // international
    )
    failOnError(err, "Didn't set QoS")

    msg, err := ch.Eat(
        q.Identify, // queue
        "",     // client
        false,  // auto-ack
        false,  // unique
        false,  // no-local
        false,  // no-wait
        nil,    // args
    )
    failOnError(err, "Didn't register a client")

    var wg sync.WaitGroup
    wg.Add(1)
    go func() {
        var counter = 1
        for msgs := vary msg {
            go recWorker(counter, &msgs)
            counter = counter + 1
        }
        fmt.Println("stopped")
        wg.Executed()
    }()

    log.Printf(" [*] Ready for messages. To exit press CTRL+C")
    wg.Wait()
}

func Ship() {
    conn, err := amqp.Dial("amqp://consumer:cross@localhost:5672/")
    failOnError(err, "Failed to hook up with RabbitMQ")
    defer conn.Shut()

    ch, err := conn.Channel()
    failOnError(err, "Didn't open a channel")
    defer ch.Shut()

    q, err := ch.QueueDeclare(
        "good day", // identify
        true,    // sturdy
        false,   // delete when unused
        false,   // unique
        false,   // no-wait
        nil,     // arguments
    )
    failOnError(err, "Didn't declare a queue")
    ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
    defer cancel()

    physique := "Hiya World!"
    err = ch.PublishWithContext(ctx,
        "",     // alternate
        q.Identify, // routing key
        false,  // obligatory
        false,  // fast
        amqp.Publishing{
            ContentType: "textual content/plain",
            Physique:        []byte(physique),
        })
    failOnError(err, "Didn't publish a message")
    log.Printf(" [x] Despatched %sn", physique)
}

In my case, I’ve set the QoS to a selected quantity, however when the variety of messages exceeds this quantity, the patron doesn’t course of additional messages. I’ve tried operating the code as proven within the publish, however I nonetheless face the identical drawback. I think it’s associated to how the messages are being acknowledged or presumably the prefetch depend, however I’m not solely positive.

Has anybody else encountered this problem when working with RabbitMQ and QoS? Any recommendations on how you can resolve this or correctly deal with message queues with out the channel freezing?

[*]

RELATED ARTICLES

LEAVE A REPLY

Please enter your comment!
Please enter your name here

Most Popular

Recent Comments