SDK for Go

Last updated: 2024-12-02 17:10:17

    Overview

    This document uses the SDK for Go as an example to describe how to send and receive messages with the open-source SDK, helping you better understand the message sending and receiving process.

    Prerequisites

    You have created the required resources as instructed in Resource Creation and Preparation.
    You have installed Go.
    You have downloaded the demo.
    Note:
    It is recommended to use version 0.9.0 or later.

    Directions

    1. Import the pulsar-client-go library in the client environment.
    1.1 Run the following command in the client environment to download the dependency package of the Pulsar client.
    go get -u "github.com/apache/pulsar-client-go/pulsar"
    1.2 After the installation is completed, use the following code to import the client into your Go project file.
    import "github.com/apache/pulsar-client-go/pulsar"
    2. Create a Pulsar client.
    // Create a Pulsar client
    client, err := pulsar.NewClient(pulsar.ClientOptions{
        // Service access address
        URL: serviceUrl,
        // Authorize the role token
        Authentication:    pulsar.NewAuthenticationToken(authentication),
        OperationTimeout:  30 * time.Second,
        ConnectionTimeout: 30 * time.Second,
    })

    if err != nil {
        log.Fatalf("Could not instantiate Pulsar client: %v", err)
    }

    defer client.Close()
    
    Parameter         Description
    serviceUrl        Cluster access address, which can be viewed and copied on the Cluster Management page in the console.
    Authentication    Role token, which can be copied in the Token column on the Role Management page.
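Because the access address and the role token are deployment-specific, one option is to read them from the environment instead of hard-coding them. The following is a minimal sketch using only the standard library; the environment variable names `PULSAR_SERVICE_URL` and `PULSAR_TOKEN` and the `loadClientConfig` helper are hypothetical and not part of the SDK.

```go
package main

import (
	"errors"
	"fmt"
	"net/url"
	"os"
)

// clientConfig holds the two values the client in step 2 needs.
type clientConfig struct {
	ServiceURL string // cluster access address
	Token      string // role token
}

// loadClientConfig reads the settings through getenv (normally os.Getenv).
// The variable names PULSAR_SERVICE_URL and PULSAR_TOKEN are hypothetical.
func loadClientConfig(getenv func(string) string) (clientConfig, error) {
	cfg := clientConfig{
		ServiceURL: getenv("PULSAR_SERVICE_URL"),
		Token:      getenv("PULSAR_TOKEN"),
	}
	if cfg.ServiceURL == "" {
		return clientConfig{}, errors.New("PULSAR_SERVICE_URL is not set")
	}
	// Require at least a scheme and a host, e.g. pulsar://host:6650.
	u, err := url.Parse(cfg.ServiceURL)
	if err != nil || u.Scheme == "" || u.Host == "" {
		return clientConfig{}, fmt.Errorf("invalid service URL %q", cfg.ServiceURL)
	}
	if cfg.Token == "" {
		return clientConfig{}, errors.New("PULSAR_TOKEN is not set")
	}
	return cfg, nil
}

func main() {
	cfg, err := loadClientConfig(os.Getenv)
	if err != nil {
		fmt.Println("configuration error:", err)
		return
	}
	fmt.Println("service URL:", cfg.ServiceURL)
}
```

The resulting `cfg.ServiceURL` and `cfg.Token` could then be passed as `serviceUrl` and `authentication` when creating the client.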
    
    3. Create a producer.
    // Create a producer with the client
    producer, err := client.CreateProducer(pulsar.ProducerOptions{
        // Complete path of the topic in the format of `persistent://cluster (tenant) ID/namespace/topic name`
        Topic: "persistent://pulsar-mmqwr5xx9n7g/sdk_go/topic1",
    })

    if err != nil {
        log.Fatal(err)
    }
    defer producer.Close()
    Note:
    You need to enter the complete path of the topic name, i.e., persistent://clusterid/namespace/Topic, where the clusterid/namespace/topic part can be copied directly from the Topic Management page in the console.
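To avoid typos when assembling the complete topic path, the three parts can be joined by a small helper. This is only an illustrative sketch; the `fullTopicName` function is hypothetical and not provided by the SDK, and the segment check (non-empty, no slash) is an assumption about what a sensible value looks like.

```go
package main

import (
	"fmt"
	"strings"
)

// fullTopicName joins the three parts into
// persistent://clusterID/namespace/topic.
func fullTopicName(clusterID, namespace, topic string) (string, error) {
	for _, part := range []string{clusterID, namespace, topic} {
		// Reject empty segments and segments that would add extra path levels.
		if part == "" || strings.Contains(part, "/") {
			return "", fmt.Errorf("invalid topic path segment %q", part)
		}
	}
	return "persistent://" + clusterID + "/" + namespace + "/" + topic, nil
}

func main() {
	name, err := fullTopicName("pulsar-mmqwr5xx9n7g", "sdk_go", "topic1")
	if err != nil {
		fmt.Println(err)
		return
	}
	fmt.Println(name) // persistent://pulsar-mmqwr5xx9n7g/sdk_go/topic1
}
```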
    4. Send a message.
    // Send the message
    _, err = producer.Send(context.Background(), &pulsar.ProducerMessage{
        // Message content
        Payload: []byte("hello go client, this is a message."),
        // Business key
        Key: "yourKey",
        // Business parameter
        Properties: map[string]string{"key": "value"},
    })
    // Check whether the message was sent successfully
    if err != nil {
        log.Fatal(err)
    }
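If a failed send should be retried rather than treated as fatal, the call can be wrapped in an application-level retry loop. The sketch below uses only the standard library and is an assumption about how a business might handle failures, not SDK behavior; `sendWithRetry` and `runDemo` are hypothetical names, and the `send` argument stands in for a closure around `producer.Send`. Note that retrying a send can produce duplicate messages if an earlier attempt actually reached the broker.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// sendWithRetry calls send up to attempts times, waiting backoff between
// tries, and stops early if ctx is cancelled.
func sendWithRetry(ctx context.Context, attempts int, backoff time.Duration, send func(context.Context) error) error {
	if attempts < 1 {
		attempts = 1
	}
	var lastErr error
	for i := 0; i < attempts; i++ {
		if lastErr = send(ctx); lastErr == nil {
			return nil
		}
		if i == attempts-1 {
			break // no need to wait after the final attempt
		}
		select {
		case <-ctx.Done():
			return ctx.Err()
		case <-time.After(backoff):
		}
	}
	return fmt.Errorf("send failed after %d attempts: %w", attempts, lastErr)
}

// runDemo uses a stand-in send function that fails twice and then succeeds.
func runDemo() (calls int, err error) {
	flaky := func(context.Context) error {
		calls++
		if calls < 3 {
			return errors.New("temporary failure")
		}
		return nil
	}
	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()
	err = sendWithRetry(ctx, 5, 10*time.Millisecond, flaky)
	return calls, err
}

func main() {
	calls, err := runDemo()
	fmt.Println("calls:", calls, "err:", err)
}
```

With the producer from step 3, the `send` closure could call `producer.Send` with the message from this step and return only its error.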
    5. Create a consumer.
    // Create a consumer with the client
    consumer, err := client.Subscribe(pulsar.ConsumerOptions{
        // Complete path of the topic in the format of `persistent://cluster (tenant) ID/namespace/topic name`
        Topic: "persistent://pulsar-mmqwr5xx9n7g/sdk_go/topic1",
        // Subscription name
        SubscriptionName: "topic1_sub",
        // Subscription mode
        Type: pulsar.Shared,
    })
    if err != nil {
        log.Fatal(err)
    }
    defer consumer.Close()
    Note:
    You need to enter the complete path of the topic name, i.e., persistent://clusterid/namespace/Topic, where the clusterid/namespace/topic part can be copied directly from the Topic Management page in the console.
    Enter the subscription name, which can be viewed on the Consumption Management page, in the SubscriptionName parameter.
    6. Consume a message.
    // Obtain the message
    msg, err := consumer.Receive(context.Background())
    if err != nil {
        log.Fatal(err)
    }
    // Simulate business processing
    fmt.Printf("Received message msgId: %#v -- content: '%s'\n",
        msg.ID(), string(msg.Payload()))

    // If the consumption is successful, return `ack`; otherwise, return `nack` or `ReconsumeLater` according to your business needs
    consumer.Ack(msg)
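The ack-or-nack decision described in the comment above can be isolated in a small helper so that business logic only returns an error. This is a minimal standard-library sketch; `processAndSettle` and `runDemo` are hypothetical names, and the `ack` and `nack` callbacks stand in for closures around the consumer's acknowledgment calls.

```go
package main

import (
	"errors"
	"fmt"
)

// processAndSettle runs handle on the payload, then calls ack on success
// or nack on failure, and returns handle's error.
func processAndSettle(payload []byte, handle func([]byte) error, ack, nack func()) error {
	if err := handle(payload); err != nil {
		nack()
		return err
	}
	ack()
	return nil
}

// runDemo feeds one good and one bad payload through processAndSettle
// and reports how many times each callback fired.
func runDemo() (acked, nacked int) {
	handle := func(p []byte) error {
		if len(p) == 0 {
			return errors.New("empty payload")
		}
		return nil
	}
	ack := func() { acked++ }
	nack := func() { nacked++ }
	for _, p := range [][]byte{[]byte("hello"), nil} {
		_ = processAndSettle(p, handle, ack, nack)
	}
	return acked, nacked
}

func main() {
	acked, nacked := runDemo()
	fmt.Println("acked:", acked, "nacked:", nacked) // acked: 1 nacked: 1
}
```

With the consumer from step 5, `ack` could wrap `consumer.Ack(msg)` and `nack` could wrap `consumer.Nack(msg)`, with `msg.Payload()` passed as the payload.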
    7. Log in to the TDMQ for Apache Pulsar console, click Topic > Topic Name to enter the consumption management page, and click the triangle below a subscription name to view the production and consumption records.
    Note:
    The above is a brief introduction to publishing and subscribing to messages. For more operations, see Demo or Pulsar Go client.

    Customizing Log File Output

    Use Cases

    Many users don’t customize the logging library when using the Pulsar SDK for Go, so logs are output to os.Stderr by default, as shown below:
    // It's recommended to make this a global instance called `log`.
    func New() *Logger {
        return &Logger{
            Out:          os.Stderr, // Default output address
            Formatter:    new(TextFormatter),
            Hooks:        make(LevelHooks),
            Level:        InfoLevel,
            ExitFunc:     os.Exit,
            ReportCaller: false,
        }
    }
    By default, log information is output to os.Stderr. If you don’t specify a custom logging library, the SDK for Go logs and your business logs are mixed together, which makes troubleshooting difficult.

    Solution

    The SDK for Go exposes a logger API on the client, which lets you customize the log output format and location and use logging libraries such as logrus and zap. The steps are as follows:
    1. Implement the log.Logger API provided by the Pulsar SDK for Go with your own logging library.
    // ClientOptions is used to construct a Pulsar Client instance.
    type ClientOptions struct {
        // Configure the logger used by the client.
        // By default, a wrapped logrus.StandardLogger will be used, namely,
        // log.NewLoggerWithLogrus(logrus.StandardLogger())
        // FIXME: use `logger` as internal field name instead of `log` as it's more idiomatic
        Logger log.Logger
    }
    When using the SDK for Go, you can implement the logger API with your own logging library to redirect logs to a specified location. Taking logrus as an example, the demo below shows how to output the SDK for Go logs to a specified file.
    package main

    import (
        "io"
        "os"

        "github.com/apache/pulsar-client-go/pulsar/log"
        "github.com/sirupsen/logrus"
    )

    // logrusWrapper implements Logger interface
    // based on underlying logrus.FieldLogger
    type logrusWrapper struct {
        l logrus.FieldLogger
    }

    // NewLoggerWithLogrus creates a new logger which wraps
    // the given logrus.Logger and writes to both stdout and outputPath
    func NewLoggerWithLogrus(logger *logrus.Logger, outputPath string) log.Logger {
        // Append to the log file, creating it if it does not exist
        file, err := os.OpenFile(outputPath, os.O_WRONLY|os.O_CREATE|os.O_APPEND, 0644)
        if err != nil {
            // Fall back to stdout only if the file cannot be opened
            logrus.Errorf("open log file %s failed: %v", outputPath, err)
            logger.SetOutput(os.Stdout)
        } else {
            logger.SetOutput(io.MultiWriter(os.Stdout, file))
        }
        return &logrusWrapper{
            l: logger,
        }
    }
    
    func (l *logrusWrapper) SubLogger(fs log.Fields) log.Logger {
        return &logrusWrapper{
            l: l.l.WithFields(logrus.Fields(fs)),
        }
    }

    func (l *logrusWrapper) WithFields(fs log.Fields) log.Entry {
        return logrusEntry{
            e: l.l.WithFields(logrus.Fields(fs)),
        }
    }

    func (l *logrusWrapper) WithField(name string, value interface{}) log.Entry {
        return logrusEntry{
            e: l.l.WithField(name, value),
        }
    }

    func (l *logrusWrapper) WithError(err error) log.Entry {
        return logrusEntry{
            e: l.l.WithError(err),
        }
    }

    func (l *logrusWrapper) Debug(args ...interface{}) {
        l.l.Debug(args...)
    }

    func (l *logrusWrapper) Info(args ...interface{}) {
        l.l.Info(args...)
    }

    func (l *logrusWrapper) Warn(args ...interface{}) {
        l.l.Warn(args...)
    }

    func (l *logrusWrapper) Error(args ...interface{}) {
        l.l.Error(args...)
    }

    func (l *logrusWrapper) Debugf(format string, args ...interface{}) {
        l.l.Debugf(format, args...)
    }

    func (l *logrusWrapper) Infof(format string, args ...interface{}) {
        l.l.Infof(format, args...)
    }

    func (l *logrusWrapper) Warnf(format string, args ...interface{}) {
        l.l.Warnf(format, args...)
    }

    func (l *logrusWrapper) Errorf(format string, args ...interface{}) {
        l.l.Errorf(format, args...)
    }

    type logrusEntry struct {
        e logrus.FieldLogger
    }

    func (l logrusEntry) WithFields(fs log.Fields) log.Entry {
        return logrusEntry{
            e: l.e.WithFields(logrus.Fields(fs)),
        }
    }

    func (l logrusEntry) WithField(name string, value interface{}) log.Entry {
        return logrusEntry{
            e: l.e.WithField(name, value),
        }
    }

    func (l logrusEntry) Debug(args ...interface{}) {
        l.e.Debug(args...)
    }

    func (l logrusEntry) Info(args ...interface{}) {
        l.e.Info(args...)
    }

    func (l logrusEntry) Warn(args ...interface{}) {
        l.e.Warn(args...)
    }

    func (l logrusEntry) Error(args ...interface{}) {
        l.e.Error(args...)
    }

    func (l logrusEntry) Debugf(format string, args ...interface{}) {
        l.e.Debugf(format, args...)
    }

    func (l logrusEntry) Infof(format string, args ...interface{}) {
        l.e.Infof(format, args...)
    }

    func (l logrusEntry) Warnf(format string, args ...interface{}) {
        l.e.Warnf(format, args...)
    }

    func (l logrusEntry) Errorf(format string, args ...interface{}) {
        l.e.Errorf(format, args...)
    }
    2. Specify the custom logging library when creating the client.
    client, err := pulsar.NewClient(pulsar.ClientOptions{
        URL:    "pulsar://localhost:6650",
        Logger: NewLoggerWithLogrus(logrus.StandardLogger(), "test.log"),
    })
    The above demo shows how to redirect the logs of the Pulsar SDK for Go to the test.log file in the current path. You can redirect the logs to another location as needed.
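The file-handling part of the redirection can be tried on its own, without logrus or a Pulsar cluster. The following standard-library sketch opens a log file in append mode and duplicates every write to a second writer; `openLogWriter` and `roundTrip` are hypothetical helper names, and the temporary directory is used only so the example cleans up after itself.

```go
package main

import (
	"fmt"
	"io"
	"os"
	"path/filepath"
)

// openLogWriter opens path in append mode and returns a writer that copies
// every write to both extra and the file, plus the file to close on shutdown.
func openLogWriter(path string, extra io.Writer) (io.Writer, io.Closer, error) {
	f, err := os.OpenFile(path, os.O_WRONLY|os.O_CREATE|os.O_APPEND, 0o644)
	if err != nil {
		return nil, nil, fmt.Errorf("open log file: %w", err)
	}
	return io.MultiWriter(extra, f), f, nil
}

// roundTrip writes one line through openLogWriter into a temporary
// directory and returns what ended up in the file.
func roundTrip(line string) (string, error) {
	dir, err := os.MkdirTemp("", "pulsar-log-demo")
	if err != nil {
		return "", err
	}
	defer os.RemoveAll(dir)

	path := filepath.Join(dir, "test.log")
	w, closer, err := openLogWriter(path, io.Discard)
	if err != nil {
		return "", err
	}
	if _, err := io.WriteString(w, line); err != nil {
		closer.Close()
		return "", err
	}
	if err := closer.Close(); err != nil {
		return "", err
	}
	data, err := os.ReadFile(path)
	return string(data), err
}

func main() {
	got, err := roundTrip("pulsar client started\n")
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Print(got)
}
```

A writer built this way could be handed to the `SetOutput` method of a `*logrus.Logger` before the logger is wrapped, and the returned closer gives the application a way to close the file on shutdown.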