rabbitmq/amqp091-go

Can't catch error from PublishWithContext() when publishing a message larger than max_message_size defined in RabbitMQ

Closed this issue · 0 comments

xf0e commented

Describe the bug
The error from publishing a message larger than the max_message_size configured in RabbitMQ can't be caught. When such a message is published, PublishWithContext() doesn't return any error, although the message gets discarded by RabbitMQ.

To Reproduce
Steps to reproduce the behavior:

  1. Compile the example from amqp091-go, modified to put a large file in the body of a message (see code below)
  2. Run it
  3. The program terminates without an error
  4. The RabbitMQ log now contains: 2024-10-26 21:30:08.557637+02:00 [error] <0.4535.0> operation basic.publish caused a channel exception precondition_failed: message size 169150808 is larger than configured max size 134217728

Actual code:

package main

import (
	"context"
	"log"
	"os"
	"time"

	amqp "github.com/rabbitmq/amqp091-go"
)

func main() {
	// Dial opens an AMQP connection from the credentials in the URL.
	conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
	if err != nil {
		log.Fatalf("connection.open: %s", err)
	}

	// This waits for a server acknowledgment which means the sockets will have
	// flushed all outbound publishings prior to returning.  It's important to
	// block on Close to not lose any publishings.
	defer conn.Close()

	c, err := conn.Channel()
	if err != nil {
		log.Fatalf("channel.open: %s", err)
	}

	// We declare our topology on both the publisher and consumer to ensure they
	// are the same.  This is part of AMQP being a programmable messaging model.
	//
	// See the Channel.Consume example for the complementary declare.
	err = c.ExchangeDeclare("logs", "topic", true, false, false, false, nil)
	if err != nil {
		log.Fatalf("exchange.declare: %v", err)
	}

	// Read the large file that becomes the message body.
	file, err := os.ReadFile("large_file")
	if err != nil {
		log.Fatalf("read large_file: %v", err)
	}

	// Prepare this message to be persistent.  Your publishing requirements may
	// be different.
	msg := amqp.Publishing{
		DeliveryMode: amqp.Persistent,
		Timestamp:    time.Now(),
		ContentType:  "text/plain",
		Body:         file,
	}

	ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
	defer cancel()

	// This is not a mandatory delivery, so it will be dropped if there are no
	// queues bound to the logs exchange.
	err = c.PublishWithContext(ctx, "logs", "info", false, false, msg)
	if err != nil {
		// Since publish is asynchronous this can happen if the network connection
		// is reset or if the server has run out of resources.
		log.Fatalf("basic.publish: %v", err)
	}
}