Add the following dependency to the pom.xml file. This document uses a Maven project as an example.
<dependency>
    <groupId>org.apache.pulsar</groupId>
    <artifactId>pulsar-client</artifactId>
    <version>2.7.2</version>
</dependency>
BatchReceive) of the client.
PulsarClient pulsarClient = PulsarClient.builder()
        // Service access address
        .serviceUrl(SERVICE_URL)
        // Role token
        .authentication(AuthenticationFactory.token(AUTHENTICATION))
        .build();
Parameter | Description |
SERVICE_URL | Service access address |
AUTHENTICATION | Role token |
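The two parameters above are usually kept out of source code. The following is a minimal sketch of one way to supply them, assuming they are provided through environment variables. The class `ClientSettings` and the variable names `PULSAR_SERVICE_URL` and `PULSAR_AUTH_TOKEN` are hypothetical and are not part of the Pulsar SDK.

```java
import java.util.Map;

// Hypothetical helper (not part of the Pulsar SDK): holds the two values
// that the client builder needs.
final class ClientSettings {
    final String serviceUrl;      // value to pass as SERVICE_URL
    final String authentication;  // value to pass as AUTHENTICATION

    private ClientSettings(String serviceUrl, String authentication) {
        this.serviceUrl = serviceUrl;
        this.authentication = authentication;
    }

    // Reads both settings from a map such as System.getenv().
    // The key names are assumptions made for this sketch.
    static ClientSettings fromEnvironment(Map<String, String> env) {
        return new ClientSettings(
                require(env, "PULSAR_SERVICE_URL"),
                require(env, "PULSAR_AUTH_TOKEN"));
    }

    // Fails fast with a clear message when a setting is missing or blank.
    private static String require(Map<String, String> env, String name) {
        String value = env.get(name);
        if (value == null || value.trim().isEmpty()) {
            throw new IllegalStateException("Missing required setting: " + name);
        }
        return value.trim();
    }

    public static void main(String[] args) {
        try {
            ClientSettings settings = fromEnvironment(System.getenv());
            System.out.println("Service URL: " + settings.serviceUrl);
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

The resulting `serviceUrl` and `authentication` fields could then be passed to `serviceUrl(...)` and `AuthenticationFactory.token(...)` when building the client.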
// Create a producer of the `byte[]` type
Producer<byte[]> producer = pulsarClient.newProducer()
        // Complete path of the topic in the format of `persistent://cluster (tenant) ID/namespace/topic name`
        .topic("persistent://pulsar-xxx/sdk_java/topic1")
        .create();
The topic name must be the complete path in the format of persistent://clusterid/namespace/Topic, where the clusterid/namespace/topic part can be copied directly from the Topic page in the console.
// Send the message
MessageId msgId = producer.newMessage()
        // Message content
        .value("this is a new message.".getBytes(StandardCharsets.UTF_8))
        // Business key
        .key("yourKey")
        // Business parameter
        .property("mykey", "myvalue")
        .send();
// Close the producer
producer.close();
// Close the client
pulsarClient.close();
// Create a consumer of the `byte[]` type (default type)
Consumer<byte[]> consumer = pulsarClient.newConsumer()
        // Complete path of the topic in the format of `persistent://cluster (tenant) ID/namespace/topic name`, which can be copied from the **Topic** page
        .topic("persistent://pulsar-xxx/sdk_java/topic1")
        // Create a subscription on the topic details page in the console and enter its name here
        .subscriptionName("sub_topic1")
        // Use the exclusive mode as the consumption mode
        .subscriptionType(SubscriptionType.Exclusive)
        // Start consuming at the earliest offset; otherwise, historical messages may not be consumed
        .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
        // Subscribe
        .subscribe();
The topic name must be the complete path in the format of persistent://clusterid/namespace/Topic, where the clusterid/namespace/topic part can be copied directly from the Topic page in the console.
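The path format described above can be illustrated with a small helper. This is only a sketch: the class `TopicNames` and its methods are hypothetical names introduced here and are not part of the Pulsar SDK, and the check covers only the three-part `persistent://` form shown in this document.

```java
// Hypothetical helper (not part of the Pulsar SDK) for the
// persistent://clusterid/namespace/topic format used in this document.
final class TopicNames {
    private static final String PREFIX = "persistent://";

    // Joins the three parts into a complete topic path.
    static String persistent(String clusterId, String namespace, String topic) {
        return PREFIX
                + requirePart("cluster ID", clusterId) + "/"
                + requirePart("namespace", namespace) + "/"
                + requirePart("topic", topic);
    }

    // Returns true only when the path has the prefix and exactly three
    // non-empty parts after it.
    static boolean isCompletePath(String path) {
        if (path == null || !path.startsWith(PREFIX)) {
            return false;
        }
        String[] parts = path.substring(PREFIX.length()).split("/", -1);
        if (parts.length != 3) {
            return false;
        }
        for (String part : parts) {
            if (part.isEmpty()) {
                return false;
            }
        }
        return true;
    }

    // Rejects parts that are missing or that would break the path structure.
    private static String requirePart(String label, String value) {
        if (value == null || value.isEmpty() || value.contains("/")) {
            throw new IllegalArgumentException("Invalid " + label + ": " + value);
        }
        return value;
    }

    public static void main(String[] args) {
        String path = persistent("pulsar-xxx", "sdk_java", "topic1");
        System.out.println(path);
        System.out.println(isCompletePath(path));
        System.out.println(isCompletePath("sdk_java/topic1"));
    }
}
```

A check like `isCompletePath` can catch the common mistake of passing only the namespace and topic name before the value reaches `.topic(...)`.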
Enter the subscription name in the subscriptionName parameter. The name can be viewed on the Consumption Management page.
// Receive the message at the current offset
Message<byte[]> msg = consumer.receive();
MessageId msgId = msg.getMessageId();
String value = new String(msg.getValue(), StandardCharsets.UTF_8);
System.out.println("receive msg " + msgId + ",value:" + value);
// Messages must be acknowledged after being received; otherwise, the offset stays at the current message and messages pile up
consumer.acknowledge(msg);
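The producer example encodes the message body as UTF-8 bytes, so the consumer side should decode with the same charset rather than rely on the platform default, which is not UTF-8 on every JVM. A minimal sketch of that round trip using only the standard library follows. The class `PayloadCodec` is a hypothetical name, not an SDK type.

```java
import java.nio.charset.StandardCharsets;

// Hypothetical helper (not part of the Pulsar SDK): keeps the charset used
// for encoding and decoding message bodies in one place.
final class PayloadCodec {

    // Producer side: text to the byte[] that would be passed to value(...).
    static byte[] encode(String text) {
        return text.getBytes(StandardCharsets.UTF_8);
    }

    // Consumer side: the byte[] returned by getValue() back to text.
    static String decode(byte[] payload) {
        return new String(payload, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        byte[] payload = encode("this is a new message.");
        System.out.println(decode(payload));
    }
}
```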
// Message listener
MessageListener<byte[]> myMessageListener = (consumer, msg) -> {
    try {
        System.out.println("Message received: " + new String(msg.getData()));
        // Acknowledge the message (`ack`)
        consumer.acknowledge(msg);
    } catch (Exception e) {
        // Negatively acknowledge the message (`nack`) if consumption fails
        consumer.negativeAcknowledge(msg);
    }
};
pulsarClient.newConsumer()
        // Complete path of the topic in the format of `persistent://cluster (tenant) ID/namespace/topic name`, which can be copied from the **Topic** page
        .topic("persistent://pulsar-mmqwr5xx9n7g/sdk_java/topic1")
        // Create a subscription on the topic details page in the console and enter its name here
        .subscriptionName("sub_topic1")
        // Use the exclusive mode as the consumption mode
        .subscriptionType(SubscriptionType.Exclusive)
        // Set the listener
        .messageListener(myMessageListener)
        // Start consuming at the earliest offset; otherwise, historical messages may not be consumed
        .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
        .subscribe();