[*]
I got here throughout this publish (web site channel stops when utilizing QoS on channel if the variety of message is greater than QoS outlined variety of message) whereas troubleshooting a problem I’m going through with RabbitMQ in my Go utility. The issue is, after I use QoS on a channel and the variety of messages exceeds the outlined restrict, the channel stops receiving messages.
Right here is the code I’m utilizing:
bundle rmqcode
import (
"context"
"fmt"
"log"
"sync"
"time"
amqp "github.com/rabbitmq/amqp091-go"
)
func failOnError(err error, msg string) {
if err != nil {
log.Panicf("%s: %s", msg, err)
}
}
func recWorker(worknum int, del *amqp.Supply) {
println(fmt.Sprintf("begin %d", worknum))
time.Sleep(time.Second * 2)
println(fmt.Sprintf("finish %d", worknum))
del.Ack(true)
}
func Receiver() {
conn, err := amqp.Dial("amqp://consumer:cross@localhost:5672/")
failOnError(err, "Failed to hook up with RabbitMQ")
defer conn.Shut()
ch, err := conn.Channel()
failOnError(err, "Didn't open a channel")
defer ch.Shut()
q, err := ch.QueueDeclare(
"good day", // identify
true, // sturdy
false, // delete when unused
false, // unique
false, // no-wait
nil, // arguments
)
failOnError(err, "Didn't declare a queue")
err = ch.Qos(
3, // prefetch depend
0, // prefetch measurement
false, // international
)
failOnError(err, "Didn't set QoS")
msg, err := ch.Eat(
q.Identify, // queue
"", // client
false, // auto-ack
false, // unique
false, // no-local
false, // no-wait
nil, // args
)
failOnError(err, "Didn't register a client")
var wg sync.WaitGroup
wg.Add(1)
go func() {
var counter = 1
for msgs := vary msg {
go recWorker(counter, &msgs)
counter = counter + 1
}
fmt.Println("stopped")
wg.Executed()
}()
log.Printf(" [*] Ready for messages. To exit press CTRL+C")
wg.Wait()
}
func Ship() {
conn, err := amqp.Dial("amqp://consumer:cross@localhost:5672/")
failOnError(err, "Failed to hook up with RabbitMQ")
defer conn.Shut()
ch, err := conn.Channel()
failOnError(err, "Didn't open a channel")
defer ch.Shut()
q, err := ch.QueueDeclare(
"good day", // identify
true, // sturdy
false, // delete when unused
false, // unique
false, // no-wait
nil, // arguments
)
failOnError(err, "Didn't declare a queue")
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
physique := "Hiya World!"
err = ch.PublishWithContext(ctx,
"", // alternate
q.Identify, // routing key
false, // obligatory
false, // fast
amqp.Publishing{
ContentType: "textual content/plain",
Physique: []byte(physique),
})
failOnError(err, "Didn't publish a message")
log.Printf(" [x] Despatched %sn", physique)
}
In my case, I’ve set the QoS to a selected quantity, however when the variety of messages exceeds this quantity, the patron doesn’t course of additional messages. I’ve tried operating the code as proven within the publish, however I nonetheless face the identical drawback. I think it’s associated to how the messages are being acknowledged or presumably the prefetch depend, however I’m not solely positive.
Has anybody else encountered this problem when working with RabbitMQ and QoS? Any recommendations on how you can resolve this or correctly deal with message queues with out the channel freezing?
[*]