pulsar-client-go library in the client environment.

    go get -u "github.com/apache/pulsar-client-go/pulsar"

    import "github.com/apache/pulsar-client-go/pulsar"
    // Create a Pulsar client
    client, err := pulsar.NewClient(pulsar.ClientOptions{
        // Service access address
        URL: serviceUrl,
        // Authorize the role token
        Authentication:    pulsar.NewAuthenticationToken(authentication),
        OperationTimeout:  30 * time.Second,
        ConnectionTimeout: 30 * time.Second,
    })
    if err != nil {
        log.Fatalf("Could not instantiate Pulsar client: %v", err)
    }
    defer client.Close()

Parameter | Description |
serviceUrl | Cluster access address, which can be viewed and copied on the Cluster Management page in the console. |
Authentication |
    // Create a producer with the client
    producer, err := client.CreateProducer(pulsar.ProducerOptions{
        // Complete path of the topic in the format of `persistent://cluster (tenant) ID/namespace/topic name`
        Topic: "persistent://pulsar-mmqwr5xx9n7g/sdk_go/topic1",
    })
    if err != nil {
        log.Fatal(err)
    }
    defer producer.Close()

The topic name must be the complete path in the format persistent://clusterid/namespace/Topic, where the clusterid/namespace/topic part can be copied directly from the Topic Management page in the console.

    // Send the message
    _, err = producer.Send(context.Background(), &pulsar.ProducerMessage{
        // Message content
        Payload: []byte("hello go client, this is a message."),
        // Business key
        Key: "yourKey",
        // Business parameter
        Properties: map[string]string{"key": "value"},
    })
    if err != nil {
        log.Fatal(err)
    }

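The topic path format described above can be assembled from its three parts rather than typed by hand. The following is a minimal sketch using only the standard library; `topicPath` is a hypothetical helper, not part of the SDK, and the rule that a component must not be empty or contain `/` is an assumption made for illustration.

```go
package main

import (
	"fmt"
	"strings"
)

// topicPath joins the three parts shown in the console into the complete
// persistent topic path. The helper name is hypothetical, not an SDK API.
// Rejecting empty components and components containing "/" is an assumption.
func topicPath(clusterID, namespace, topic string) (string, error) {
	for _, part := range []string{clusterID, namespace, topic} {
		if part == "" || strings.Contains(part, "/") {
			return "", fmt.Errorf("invalid topic path component %q", part)
		}
	}
	return "persistent://" + clusterID + "/" + namespace + "/" + topic, nil
}

func main() {
	path, err := topicPath("pulsar-mmqwr5xx9n7g", "sdk_go", "topic1")
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println(path) // persistent://pulsar-mmqwr5xx9n7g/sdk_go/topic1
}
```

The resulting string is what would be passed as the Topic value when creating a producer or consumer.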
    // Create a consumer with the client
    consumer, err := client.Subscribe(pulsar.ConsumerOptions{
        // Complete path of the topic in the format of `persistent://cluster (tenant) ID/namespace/topic name`
        Topic: "persistent://pulsar-mmqwr5xx9n7g/sdk_go/topic1",
        // Subscription name
        SubscriptionName: "topic1_sub",
        // Subscription mode
        Type: pulsar.Shared,
    })
    if err != nil {
        log.Fatal(err)
    }
    defer consumer.Close()

The topic name must be the complete path in the format persistent://clusterid/namespace/Topic, where the clusterid/namespace/topic part can be copied directly from the Topic Management page in the console.
The subscription name is set in the subscriptionName parameter and can be viewed on the Consumption Management page.

    // Obtain the message
    msg, err := consumer.Receive(context.Background())
    if err != nil {
        log.Fatal(err)
    }
    // Simulate business processing
    fmt.Printf("Received message msgId: %#v -- content: '%s'\n",
        msg.ID(), string(msg.Payload()))
    // If the consumption is successful, return `ack`; otherwise, return `nack` or `ReconsumeLater` according to your business needs
    consumer.Ack(msg)

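The comment in the consumption code says to return `ack` on success and `nack` otherwise. The sketch below illustrates only that decision, using the standard library. The `acknowledger` interface, `countingConsumer`, `handle`, and `process` are all hypothetical stand-ins so the example runs without a broker; they are not the SDK's types or signatures.

```go
package main

import (
	"errors"
	"fmt"
)

// acknowledger is a hypothetical stand-in for the small part of consumer
// behavior used here; it is not the SDK's Consumer interface.
type acknowledger interface {
	Ack(payload []byte)
	Nack(payload []byte)
}

// countingConsumer records how many messages were acked or nacked.
type countingConsumer struct {
	acked, nacked int
}

func (c *countingConsumer) Ack(payload []byte)  { c.acked++ }
func (c *countingConsumer) Nack(payload []byte) { c.nacked++ }

// handle runs the business logic, then acknowledges the message on success
// or negatively acknowledges it on failure.
func handle(c acknowledger, payload []byte, process func([]byte) error) error {
	if err := process(payload); err != nil {
		c.Nack(payload)
		return err
	}
	c.Ack(payload)
	return nil
}

// process is placeholder business logic: it rejects empty payloads.
func process(payload []byte) error {
	if len(payload) == 0 {
		return errors.New("empty payload")
	}
	return nil
}

func main() {
	c := &countingConsumer{}
	_ = handle(c, []byte("hello go client, this is a message."), process)
	_ = handle(c, nil, process)
	fmt.Printf("acked=%d nacked=%d\n", c.acked, c.nacked)
}
```

Keeping the ack or nack call in one place, right after the business logic returns, makes it harder to forget to acknowledge a message on an early return.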
Unless configured otherwise, the SDK for Go uses a wrapped logrus.StandardLogger, and logrus writes logs to os.Stderr by default, as shown below:

    // It's recommended to make this a global instance called `log`.
    func New() *Logger {
        return &Logger{
            Out:          os.Stderr, // Default output address
            Formatter:    new(TextFormatter),
            Hooks:        make(LevelHooks),
            Level:        InfoLevel,
            ExitFunc:     os.Exit,
            ReportCaller: false,
        }
    }

Business logs often go to os.Stderr as well. If you don't specify a custom logging library, the SDK for Go logs and business logs will be mixed, which makes troubleshooting difficult.
Through the logger API exposed on the client by the SDK for Go, you can customize the log output format and location and use logging libraries such as logrus and zap. Related parameters are as follows:
Implement the log.Logger API provided by the Pulsar SDK for Go by customizing log lib.

    // ClientOptions is used to construct a Pulsar Client instance.
    type ClientOptions struct {
        // Configure the logger used by the client.
        // By default, a wrapped logrus.StandardLogger will be used, namely,
        // log.NewLoggerWithLogrus(logrus.StandardLogger())
        // FIXME: use `logger` as internal field name instead of `log` as it's more idiomatic
        Logger log.Logger
    }

Use the logger API to customize log lib so that you can redirect logs to a specified location. Taking logrus as an example, the demo below shows you how to customize log lib to output the SDK for Go logs to a specified file.

    package main

    import (
        "io"
        "os"

        "github.com/apache/pulsar-client-go/pulsar/log"
        "github.com/sirupsen/logrus"
    )

    // logrusWrapper implements Logger interface
    // based on underlying logrus.FieldLogger
    type logrusWrapper struct {
        l logrus.FieldLogger
    }

    // NewLoggerWithLogrus creates a new logger which wraps
    // the given logrus.Logger
    func NewLoggerWithLogrus(logger *logrus.Logger, outputPath string) log.Logger {
        writer1 := os.Stdout
        writer2, err := os.OpenFile(outputPath, os.O_WRONLY|os.O_CREATE, 0755)
        if err != nil {
            // Fall back to stdout only if the log file cannot be opened
            logrus.Errorf("create file %s failed: %v", outputPath, err)
            logger.SetOutput(writer1)
        } else {
            logger.SetOutput(io.MultiWriter(writer1, writer2))
        }
        return &logrusWrapper{
            l: logger,
        }
    }

    func (l *logrusWrapper) SubLogger(fs log.Fields) log.Logger {
        return &logrusWrapper{
            l: l.l.WithFields(logrus.Fields(fs)),
        }
    }

    func (l *logrusWrapper) WithFields(fs log.Fields) log.Entry {
        return logrusEntry{
            e: l.l.WithFields(logrus.Fields(fs)),
        }
    }

    func (l *logrusWrapper) WithField(name string, value interface{}) log.Entry {
        return logrusEntry{
            e: l.l.WithField(name, value),
        }
    }

    func (l *logrusWrapper) WithError(err error) log.Entry {
        return logrusEntry{
            e: l.l.WithError(err),
        }
    }

    func (l *logrusWrapper) Debug(args ...interface{}) {
        l.l.Debug(args...)
    }

    func (l *logrusWrapper) Info(args ...interface{}) {
        l.l.Info(args...)
    }

    func (l *logrusWrapper) Warn(args ...interface{}) {
        l.l.Warn(args...)
    }

    func (l *logrusWrapper) Error(args ...interface{}) {
        l.l.Error(args...)
    }

    func (l *logrusWrapper) Debugf(format string, args ...interface{}) {
        l.l.Debugf(format, args...)
    }

    func (l *logrusWrapper) Infof(format string, args ...interface{}) {
        l.l.Infof(format, args...)
    }

    func (l *logrusWrapper) Warnf(format string, args ...interface{}) {
        l.l.Warnf(format, args...)
    }

    func (l *logrusWrapper) Errorf(format string, args ...interface{}) {
        l.l.Errorf(format, args...)
    }

    // logrusEntry implements the Entry interface on top of logrus.FieldLogger
    type logrusEntry struct {
        e logrus.FieldLogger
    }

    func (l logrusEntry) WithFields(fs log.Fields) log.Entry {
        return logrusEntry{
            e: l.e.WithFields(logrus.Fields(fs)),
        }
    }

    func (l logrusEntry) WithField(name string, value interface{}) log.Entry {
        return logrusEntry{
            e: l.e.WithField(name, value),
        }
    }

    func (l logrusEntry) Debug(args ...interface{}) {
        l.e.Debug(args...)
    }

    func (l logrusEntry) Info(args ...interface{}) {
        l.e.Info(args...)
    }

    func (l logrusEntry) Warn(args ...interface{}) {
        l.e.Warn(args...)
    }

    func (l logrusEntry) Error(args ...interface{}) {
        l.e.Error(args...)
    }

    func (l logrusEntry) Debugf(format string, args ...interface{}) {
        l.e.Debugf(format, args...)
    }

    func (l logrusEntry) Infof(format string, args ...interface{}) {
        l.e.Infof(format, args...)
    }

    func (l logrusEntry) Warnf(format string, args ...interface{}) {
        l.e.Warnf(format, args...)
    }

    func (l logrusEntry) Errorf(format string, args ...interface{}) {
        l.e.Errorf(format, args...)
    }

Specify the custom log lib when creating the client.

    client, err := pulsar.NewClient(pulsar.ClientOptions{
        URL:    "pulsar://localhost:6650",
        Logger: NewLoggerWithLogrus(logrus.StandardLogger(), "test.log"),
    })

With this configuration, the SDK for Go logs are written to the test.log file in the current path. You can redirect the log file to a specified location as needed.