Can't catch error from PublishWithContext() when publishing a message larger than the max_message_size configured in RabbitMQ
Closed this issue · 0 comments
xf0e commented
Describe the bug
The error cannot be caught when publishing a message larger than the max_message_size configured in RabbitMQ: PublishWithContext() does not return any error, although RabbitMQ discards the message.
To Reproduce
Steps to reproduce the behavior:
- Compile the example from amqp091-go, modified to put a large file in the body of a message (see code below)
- Run it
- The program terminates without an error
- The RabbitMQ logs now contain:
2024-10-26 21:30:08.557637+02:00 [error] <0.4535.0> operation basic.publish caused a channel exception precondition_failed: message size 169150808 is larger than configured max size 134217728
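Since the broker reports the size limit only in its own log, a publisher that already knows the limit could reject oversized bodies before calling PublishWithContext(). The following is a minimal sketch of such a client-side guard, not part of amqp091-go: `checkBodySize` is a hypothetical helper, and it assumes the publisher has been given the same limit the broker uses (134217728 bytes in the log above).

```go
package main

import (
	"fmt"
)

// checkBodySize is a hypothetical helper (not an amqp091-go API). It returns
// an error when body is larger than limit, so the caller can skip the publish.
// Assumption: limit was copied from the broker's max_message_size setting.
func checkBodySize(body []byte, limit int) error {
	if len(body) > limit {
		return fmt.Errorf("message size %d is larger than configured max size %d", len(body), limit)
	}
	return nil
}

func main() {
	// Small numbers for illustration; in the report above the limit would be
	// 134217728 and the body 169150808 bytes.
	body := make([]byte, 16)
	if err := checkBodySize(body, 8); err != nil {
		fmt.Println("not publishing:", err)
	}
}
```

This only guards against a limit the client knows about; it does not replace listening for the broker's own rejection.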
Actual code:
package main

import (
	"context"
	"log"
	"os"
	"time"

	amqp "github.com/rabbitmq/amqp091-go"
)

func main() {
	// Dial opens an AMQP connection from the credentials in the URL.
	conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
	if err != nil {
		log.Fatalf("connection.open: %s", err)
	}

	// This waits for a server acknowledgment which means the sockets will have
	// flushed all outbound publishings prior to returning. It's important to
	// block on Close to not lose any publishings.
	defer conn.Close()

	c, err := conn.Channel()
	if err != nil {
		log.Fatalf("channel.open: %s", err)
	}

	// We declare our topology on both the publisher and consumer to ensure they
	// are the same. This is part of AMQP being a programmable messaging model.
	//
	// See the Channel.Consume example for the complementary declare.
	err = c.ExchangeDeclare("logs", "topic", true, false, false, false, nil)
	if err != nil {
		log.Fatalf("exchange.declare: %v", err)
	}

	// Read the large file that becomes the message body.
	file, err := os.ReadFile("large_file")
	if err != nil {
		log.Fatalf("read large_file: %v", err)
	}

	// Prepare this message to be persistent. Your publishing requirements may
	// be different.
	msg := amqp.Publishing{
		DeliveryMode: amqp.Persistent,
		Timestamp:    time.Now(),
		ContentType:  "text/plain",
		Body:         file,
	}

	ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
	defer cancel()

	// This is not a mandatory delivery, so it will be dropped if there are no
	// queues bound to the logs exchange.
	err = c.PublishWithContext(ctx, "logs", "info", false, false, msg)
	if err != nil {
		// Since publish is asynchronous this can happen if the network connection
		// is reset or if the server has run out of resources.
		log.Fatalf("basic.publish: %v", err)
	}
}