go get "github.com/rabbitmq/amqp091-go"
import (amqp "github.com/rabbitmq/amqp091-go")
// Required parametersconst (host = "amqp-xx.rabbitmq.x.tencenttdmq.com" // Service access addressusername = "roleName" // Role name in the consolepassword = "eyJrZX..." // Role keyvhost = "amqp-xx|Vhost" // Full name of the vhost to be used)// Create a connectionconn, err := amqp.Dial("amqp://" + username + ":" + password + "@" + host + ":5672/" + vhost)failOnError(err, "Failed to connect to RabbitMQ")defer func(conn *amqp.Connection) {err := conn.Close()if err != nil {}}(conn)// Establish a channelch, err := conn.Channel()failOnError(err, "Failed to open a channel")defer func(ch *amqp.Channel) {err := ch.Close()if err != nil {}}(ch)
Parameter | Description |
host | Cluster access address, which can be obtained from Access Address in the Operation column on the Cluster page. |
username | |
password | |
vhost | Vhost name in the format of "cluster ID + | + vhost name", which can be copied on the Vhost page in the console. |
// Declare the exchange (the name and type must be the same as those of the existing exchange)err = ch.ExchangeDeclare("logs-exchange", // Exchange name"fanout", // Exchange typetrue, // durablefalse, // auto-deletedfalse, // internalfalse, // no-waitnil, // arguments)failOnError(err, "Failed to declare a exchange")
hello world
and work
).// Message contentbody := "this is new message."// Publish messages to the exchangeerr = ch.Publish("logs-exchange", // exchange"", // Routing key (set whether the routing key is required based on the used exchange type). If exchange is not selected, this parameter will be the message queue namefalse, // mandatoryfalse, // immediateamqp.Publishing{ContentType: "text/plain",Body: []byte(body),})failOnError(err, "Failed to publish a message")
// Publish messages to the specified message queueerr = ch.Publish("", // exchangequeue.Name, // routing keyfalse, // mandatoryfalse, // immediateamqp.Publishing{ContentType: "text/plain",Body: []byte(body),})failOnError(err, "Failed to publish a message")
// Create a consumer and consume messages in the specified message queuemsgs, err := ch.Consume("message-queue", // message-queue"", // consumerfalse, // Set to manual acknowledgment as neededfalse, // exclusivefalse, // no-localfalse, // no-waitnil, // args)failOnError(err, "Failed to register a consumer")// Get messages in the message queueforever := make(chan bool)go func() {for d := range msgs {log.Printf("Received a message: %s", d.Body)t := time.Duration(1)time.Sleep(t * time.Second)// Manually return the acknowledgmentd.Ack(false)}}()log.Printf(" [Consumer] Waiting for messages.")<-forever
// You need to specify the exchange and routing key in the message queueerr = ch.QueueBind(q.Name, // queue name"routing_key", // routing key"topic_demo", // exchangefalse,nil,)failOnError(err, "Failed to bind a queue")
Was this page helpful?