SDK for Go
Last updated: 2024-12-02 17:10:17

Overview

This document describes how to use an open-source SDK to send and receive messages, using the SDK for Go as an example, to help you better understand the message sending and receiving process.

Prerequisites

You have created the required resources as instructed in Resource Creation and Preparation.
You have installed Go.
You have downloaded the demo.
Note:
We recommend using version 0.9.0 or later of the SDK.

Directions

1. Import the pulsar-client-go library in the client environment.
1.1 Run the following command in the client environment to download the dependency package of the Pulsar client.
go get -u "github.com/apache/pulsar-client-go/pulsar"
1.2 After the installation is completed, use the following code to import the client into your Go project file.
import "github.com/apache/pulsar-client-go/pulsar"
2. Create a Pulsar client.
// Create a Pulsar client
client, err := pulsar.NewClient(pulsar.ClientOptions{
	// Service access address
	URL: serviceUrl,
	// Role token used for authentication
	Authentication:    pulsar.NewAuthenticationToken(authentication),
	OperationTimeout:  30 * time.Second,
	ConnectionTimeout: 30 * time.Second,
})

if err != nil {
	log.Fatalf("Could not instantiate Pulsar client: %v", err)
}

defer client.Close()

Parameter | Description
serviceUrl | Cluster access address, which can be viewed and copied on the Cluster Management page in the console.
Authentication | Role token, which can be copied in the Token column on the Role Management page.

3. Create a producer.
// Create a producer with the client
producer, err := client.CreateProducer(pulsar.ProducerOptions{
	// Complete path of the topic in the format of `persistent://cluster (tenant) ID/namespace/topic name`
	Topic: "persistent://pulsar-mmqwr5xx9n7g/sdk_go/topic1",
})

if err != nil {
	log.Fatal(err)
}
defer producer.Close()
Note:
You need to enter the complete path of the topic, that is, persistent://clusterid/namespace/topic. The clusterid/namespace/topic part can be copied directly from the Topic Management page in the console.
4. Send a message.
// Send the message
_, err = producer.Send(context.Background(), &pulsar.ProducerMessage{
	// Message content
	Payload: []byte("hello go client, this is a message."),
	// Business key
	Key: "yourKey",
	// Business parameter
	Properties: map[string]string{"key": "value"},
})
// Check whether the message was sent successfully
if err != nil {
	log.Fatal(err)
}
5. Create a consumer.
// Create a consumer with the client
consumer, err := client.Subscribe(pulsar.ConsumerOptions{
	// Complete path of the topic in the format of `persistent://cluster (tenant) ID/namespace/topic name`
	Topic: "persistent://pulsar-mmqwr5xx9n7g/sdk_go/topic1",
	// Subscription name
	SubscriptionName: "topic1_sub",
	// Subscription mode
	Type: pulsar.Shared,
})
if err != nil {
	log.Fatal(err)
}
defer consumer.Close()
Note:
You need to enter the complete path of the topic, that is, persistent://clusterid/namespace/topic. The clusterid/namespace/topic part can be copied directly from the Topic Management page in the console.
You need to enter the subscription name in the SubscriptionName parameter. The name can be viewed on the Consumption Management page.
6. Consume a message.
// Receive a message
msg, err := consumer.Receive(context.Background())
if err != nil {
	log.Fatal(err)
}
// Simulate business processing
fmt.Printf("Received message msgId: %#v -- content: '%s'\n",
	msg.ID(), string(msg.Payload()))

// If processing succeeds, acknowledge the message with Ack; otherwise, call Nack or ReconsumeLater according to your business needs
consumer.Ack(msg)
7. Log in to the TDMQ for Apache Pulsar console, click Topic > Topic Name to enter the consumption management page, and click the triangle below a subscription name to view the production and consumption records.
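The producer and consumer steps above both require the complete topic path. As a minimal sketch, the three parts copied from the Topic Management page could be joined by a small helper. The helper name topicPath and its validation rules are assumptions for illustration only and are not part of the SDK.

```go
package main

import (
	"errors"
	"fmt"
	"strings"
)

// topicPath joins the cluster ID, namespace, and topic name into the
// complete path used by the Topic option:
// persistent://clusterid/namespace/topic.
// Hypothetical helper: the name and the checks are illustrative only.
func topicPath(clusterID, namespace, topic string) (string, error) {
	for _, part := range []string{clusterID, namespace, topic} {
		if part == "" {
			return "", errors.New("cluster ID, namespace, and topic must all be non-empty")
		}
		if strings.Contains(part, "/") {
			return "", fmt.Errorf("%q must not contain '/'", part)
		}
	}
	return "persistent://" + clusterID + "/" + namespace + "/" + topic, nil
}

func main() {
	// Values taken from the examples in this document.
	path, err := topicPath("pulsar-mmqwr5xx9n7g", "sdk_go", "topic1")
	if err != nil {
		fmt.Println("invalid topic:", err)
		return
	}
	fmt.Println(path)
}
```

The returned string can then be passed as the Topic value when creating a producer or consumer.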

Note:
The steps above are a brief introduction to publishing and subscribing to messages. For more operations, see the demo or the Pulsar Go client documentation.
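The consume step says to acknowledge a message on success and to negatively acknowledge it otherwise. The sketch below shows one way that decision might be structured. To stay runnable without a broker it uses local stand-in interfaces (message, acker) and fake types rather than the real pulsar.Message and pulsar.Consumer. The method shapes are assumptions modeled on the SDK, so check them against the SDK version you use.

```go
package main

import (
	"errors"
	"fmt"
)

// message and acker are local stand-ins (assumptions) for the parts of
// pulsar.Message and pulsar.Consumer that this sketch touches.
type message interface {
	Payload() []byte
}

type acker interface {
	Ack(m message) error
	Nack(m message)
}

// errBoom simulates a business processing failure.
var errBoom = errors.New("boom")

// handle runs the business logic and acknowledges the message only when
// processing succeeds. On failure it negatively acknowledges the message
// so that it can be redelivered.
func handle(c acker, m message, process func([]byte) error) error {
	if err := process(m.Payload()); err != nil {
		c.Nack(m)
		return fmt.Errorf("processing failed, message nacked: %w", err)
	}
	return c.Ack(m)
}

// fakeMessage and fakeConsumer exist only to make the sketch runnable.
type fakeMessage struct{ payload []byte }

func (m fakeMessage) Payload() []byte { return m.payload }

type fakeConsumer struct{ acked, nacked int }

func (c *fakeConsumer) Ack(message) error { c.acked++; return nil }
func (c *fakeConsumer) Nack(message)      { c.nacked++ }

func main() {
	c := &fakeConsumer{}
	succeed := func([]byte) error { return nil }
	fail := func([]byte) error { return errBoom }

	_ = handle(c, fakeMessage{payload: []byte("first")}, succeed)
	err := handle(c, fakeMessage{payload: []byte("second")}, fail)

	fmt.Println("acked:", c.acked, "nacked:", c.nacked, "err:", err)
}
```

In a real consumer, this logic would typically sit inside a loop around consumer.Receive, so that one failed message does not stop consumption.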

Customizing Log File Output

Use Cases

Many users do not configure a custom logging library when using the Pulsar SDK for Go. In that case, the SDK uses the default logrus logger, which writes to os.Stderr, as shown below:
// It's recommended to make this a global instance called `log`.
func New() *Logger {
	return &Logger{
		Out:          os.Stderr, // Default output address
		Formatter:    new(TextFormatter),
		Hooks:        make(LevelHooks),
		Level:        InfoLevel,
		ExitFunc:     os.Exit,
		ReportCaller: false,
	}
}
Because application logs are generally written to os.Stderr as well, the SDK for Go logs and your business logs are mixed unless you specify a custom logging library, which makes troubleshooting difficult.

Solution

The SDK for Go exposes a Logger option on the client, which lets you customize the log output format and location and plug in logging libraries such as logrus or zap. The steps are as follows:
1. Implement the log.Logger interface provided by the Pulsar SDK for Go with your own logging library. The client accepts the implementation through the Logger field of ClientOptions:
// ClientOptions is used to construct a Pulsar Client instance.
type ClientOptions struct {
	// Configure the logger used by the client.
	// By default, a wrapped logrus.StandardLogger will be used, namely,
	// log.NewLoggerWithLogrus(logrus.StandardLogger())
	// FIXME: use `logger` as internal field name instead of `log` as it's more idiomatic
	Logger log.Logger
}
By supplying your own implementation of this interface, you can redirect the SDK logs to a specified location. The following demo uses logrus as an example to write the SDK for Go logs to a specified file.
package main

import (
	"io"
	"os"

	"github.com/apache/pulsar-client-go/pulsar/log"
	"github.com/sirupsen/logrus"
)

// logrusWrapper implements the log.Logger interface
// based on an underlying logrus.FieldLogger
type logrusWrapper struct {
	l logrus.FieldLogger
}

// NewLoggerWithLogrus creates a new logger which wraps the given
// logrus.Logger and writes to both os.Stdout and the file at outputPath
func NewLoggerWithLogrus(logger *logrus.Logger, outputPath string) log.Logger {
	// Open the file in append mode so that existing logs are preserved
	file, err := os.OpenFile(outputPath, os.O_WRONLY|os.O_CREATE|os.O_APPEND, 0644)
	if err != nil {
		// Fall back to os.Stdout only if the file cannot be opened
		logrus.Errorf("open log file %s failed: %v", outputPath, err)
		logger.SetOutput(os.Stdout)
	} else {
		logger.SetOutput(io.MultiWriter(os.Stdout, file))
	}
	return &logrusWrapper{
		l: logger,
	}
}

func (l *logrusWrapper) SubLogger(fs log.Fields) log.Logger {
	return &logrusWrapper{
		l: l.l.WithFields(logrus.Fields(fs)),
	}
}

func (l *logrusWrapper) WithFields(fs log.Fields) log.Entry {
	return logrusEntry{
		e: l.l.WithFields(logrus.Fields(fs)),
	}
}

func (l *logrusWrapper) WithField(name string, value interface{}) log.Entry {
	return logrusEntry{
		e: l.l.WithField(name, value),
	}
}

func (l *logrusWrapper) WithError(err error) log.Entry {
	return logrusEntry{
		e: l.l.WithError(err),
	}
}

func (l *logrusWrapper) Debug(args ...interface{}) {
	l.l.Debug(args...)
}

func (l *logrusWrapper) Info(args ...interface{}) {
	l.l.Info(args...)
}

func (l *logrusWrapper) Warn(args ...interface{}) {
	l.l.Warn(args...)
}

func (l *logrusWrapper) Error(args ...interface{}) {
	l.l.Error(args...)
}

func (l *logrusWrapper) Debugf(format string, args ...interface{}) {
	l.l.Debugf(format, args...)
}

func (l *logrusWrapper) Infof(format string, args ...interface{}) {
	l.l.Infof(format, args...)
}

func (l *logrusWrapper) Warnf(format string, args ...interface{}) {
	l.l.Warnf(format, args...)
}

func (l *logrusWrapper) Errorf(format string, args ...interface{}) {
	l.l.Errorf(format, args...)
}

type logrusEntry struct {
	e logrus.FieldLogger
}

func (l logrusEntry) WithFields(fs log.Fields) log.Entry {
	return logrusEntry{
		e: l.e.WithFields(logrus.Fields(fs)),
	}
}

func (l logrusEntry) WithField(name string, value interface{}) log.Entry {
	return logrusEntry{
		e: l.e.WithField(name, value),
	}
}

func (l logrusEntry) Debug(args ...interface{}) {
	l.e.Debug(args...)
}

func (l logrusEntry) Info(args ...interface{}) {
	l.e.Info(args...)
}

func (l logrusEntry) Warn(args ...interface{}) {
	l.e.Warn(args...)
}

func (l logrusEntry) Error(args ...interface{}) {
	l.e.Error(args...)
}

func (l logrusEntry) Debugf(format string, args ...interface{}) {
	l.e.Debugf(format, args...)
}

func (l logrusEntry) Infof(format string, args ...interface{}) {
	l.e.Infof(format, args...)
}

func (l logrusEntry) Warnf(format string, args ...interface{}) {
	l.e.Warnf(format, args...)
}

func (l logrusEntry) Errorf(format string, args ...interface{}) {
	l.e.Errorf(format, args...)
}
2. Specify the custom logger when creating the client.
client, err := pulsar.NewClient(pulsar.ClientOptions{
	URL: "pulsar://localhost:6650",
	// Pass the logrus standard logger; `log` here is the Pulsar log package, which has no StandardLogger
	Logger: NewLoggerWithLogrus(logrus.StandardLogger(), "test.log"),
})
The demo above writes the Pulsar SDK for Go logs to the test.log file in the current directory. You can redirect the logs to any other location as needed.