Stream Compute Service
Last updated: 2023-11-08 15:55:14
Pulsar

Overview

The Pulsar SQL connector allows you to read data from or write data to Pulsar topics using simple SQL queries or Flink Table API.

Versions

The Flink Pulsar connector is based on the StreamNative Flink connector and is available for Flink 1.13, 1.14, and 1.16. For information about the DataStream API, see the StreamNative documentation.
| Flink Version | Description | Source Code |
| --- | --- | --- |
| 1.11 | Unsupported | - |
| 1.13 | Supported | |
| 1.14 | Supported | |
| 1.16 | Supported | |


Limits

Pulsar can be used as a source or a sink for tuple and upsert streams. It cannot be used as a dimension table.
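As an illustration of the upsert case, the sketch below declares a table with a primary key via the 'upsert-pulsar' connector. This is a hedged example, not taken from this document: the topic name, fields, and the choice of json for the key and value formats are assumptions.

```sql
-- Hypothetical upsert table; topic and schema are examples only.
CREATE TABLE `upsert_behavior` (
  `user_id` bigint,
  `behavior` STRING,
  PRIMARY KEY (`user_id`) NOT ENFORCED
) WITH (
  'connector' = 'upsert-pulsar',
  'service-url' = 'pulsar://pulsar:6650',
  'admin-url' = 'http://pulsar:8080',
  'topics' = 'behavior_upsert',
  -- for an upsert table the primary key is serialized as the message key
  'key.format' = 'json',
  'value.format' = 'json'
);
```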

Creating a Pulsar table

The example below shows you how to create a Pulsar table:
CREATE TABLE PulsarTable (
`user_id` bigint,
`item_id` bigint,
`behavior` STRING,
`publish_time` TIMESTAMP_LTZ(3) METADATA FROM 'publish_time' VIRTUAL
) WITH (
'connector' = 'pulsar',
'service-url' = 'pulsar://pulsar:6650',
'admin-url' = 'http://pulsar:8080',
-- 'pulsar.client.authPluginClassName' = 'org.apache.pulsar.client.impl.auth.AuthenticationToken',
-- 'pulsar.client.authParams' = 'token:eyJhbGciOiJIUzI1NiJ9.eyJzdWIiOiJ0ZXN0In0.Cpsuc4hRw3yL231Kp8iQAQkYc2SvVTfeziAMcLxxxxx',
'topics' = 'user_behavior',
'format' = 'json',
'source.subscription-name' = 'flink',
'source.start.message-id' = 'earliest'
);
Note
If authentication is enabled for your Pulsar cluster, please use a token with admin permissions.

About tokens

The Flink Pulsar connector uses Pulsar admin APIs to listen for partition changes and subscription creation, so if token authentication is enabled for your Pulsar cluster, a token with admin permissions is needed. You will also need permission to read from and write to the topic.
# Checking admin permissions
admin_url=http://172.28.28.46:8080,172.28.28.29:8080,172.28.28.105:8080
token=eyJhbGciOiJIUzI1NiJ9.eyJzdWIiOiJteS1zdXBlci11c2VyIn0.T-Bi__yGCs1lxEwdUcDKBDuJsWLO0X9LrePhnuXXXXX
namespace=public/default
pulsar-admin --admin-url ${admin_url} --auth-params token:${token} --auth-plugin org.apache.pulsar.client.impl.auth.AuthenticationToken topics list ${namespace}

# Checking topic read and write permissions
service_url=pulsar://172.28.28.46:6650,172.28.28.29:6650,172.28.28.105:6650
token=eyJhbGciOiJIUzI1NiJ9.eyJzdWIiOiJteS1zdXBlci11c2VyIn0.T-Bi__yGCs1lxEwdUcDKBDuJsWLO0X9LrePhnuXXXXX
namespace=public/default
topic=xxx
subscription=yyy
pulsar-client --url ${service_url} --auth-params token:${token} --auth-plugin org.apache.pulsar.client.impl.auth.AuthenticationToken consume -s ${subscription} -n 10 persistent://${namespace}/${topic} -p Earliest

Connector options

| Option | Required | Default Value | Data Type | Description |
| --- | --- | --- | --- | --- |
| connector | Yes | - | String | The connector to use. For Apache Pulsar, use 'pulsar' or 'upsert-pulsar'. |
| admin-url | Yes | - | String | The Pulsar admin URL, such as http://my-broker.example.com:8080 or https://my-broker.example.com:8443. |
| service-url | Yes | - | String | The URL of the Pulsar server. A Pulsar protocol URL is needed for a Pulsar client to connect to a Pulsar cluster, such as pulsar://localhost:6650. URL for multiple brokers: pulsar://localhost:6550,localhost:6651,localhost:6652. Production clusters are usually accessed via domains, such as pulsar://pulsar.us-west.example.com:6650. URL with TLS authentication enabled: pulsar+ssl://pulsar.us-west.example.com:6651. |
| topics | Yes | - | String | The names of the Pulsar topics from/to which data is read/written. This can be a single topic name or multiple names separated by ";", such as topic-1;topic-2. You can specify a set of topics or partitions or both, such as topic-a-partition-0;topic-a-partition-2;some-topic2. If both a topic and one of its partitions are specified, only the topic is used; for example, some-topic1;some-topic1-partition-0 is equivalent to some-topic1. |
| pulsar.client.authPluginClassName | No | - | String | The authentication plugin class name. For token authentication, use org.apache.pulsar.client.impl.auth.AuthenticationToken. |
| pulsar.client.authParams | No | - | String | The authentication parameters. The format for token authentication is token:xxxx. |
| explicit | No | true | Boolean | Whether the table is an explicit Flink table, which is used by PulsarCatalog. For details, see the PulsarCatalog section below. |
| key.fields | No | - | List<String> | The physical fields in the Flink table that correspond to the key fields of Pulsar messages. Note that these fields are unrelated to primary keys. |
| key.format | No | - | String | The format used to deserialize and serialize the key part of Pulsar messages. Valid values include 'csv', 'json', and 'avro'. To learn more, see Formats. |
| format | No | - | String | The format used to deserialize and serialize the value part of Pulsar messages. Valid values include 'csv', 'json', and 'avro'. To learn more, see Formats. At least one of format and value.format must be specified; if both are specified, format takes precedence. |
| value.format | No | - | String | The format used to deserialize and serialize the value part of Pulsar messages. Valid values include 'csv', 'json', and 'avro'. To learn more, see Formats. At least one of format and value.format must be specified; if both are specified, format takes precedence. |
| sink.topic-routing-mode | No | round-robin | Enum | The topic routing policy. Valid values include round-robin and message-key-hash. You can also configure a custom routing policy using the sink.custom-topic-router option. |
| sink.custom-topic-router | No | - | String | The fully qualified class name of the custom topic routing policy. If you specify this option, do not set sink.topic-routing-mode. |
| sink.message-delay-interval | No | 0 | Duration | The delay time for sending messages, such as 10ms, 1s, or 1min. This allows you to delay the consumption of messages. For details, see the Pulsar document Delayed message delivery. |
| pulsar.sink.deliveryGuarantee | No | none | Enum | The message delivery guarantee for the Pulsar sink. Valid values include none, at-least-once, and exactly-once. To use exactly-once, your Pulsar cluster must support transactions. |
| pulsar.sink.transactionTimeoutMillis | No | 10800000 | Long | The Pulsar transaction timeout in milliseconds, which must be longer than the checkpoint interval. The default value is 10800000 (3 hours). |
| pulsar.producer.batchingEnabled | No | false | Boolean | Whether to enable batch write. |
| pulsar.producer.batchingMaxMessages | No | 1000 | Int | The maximum number of Pulsar messages that can be written in one batch. |
| source.start.message-id | No | - | String | The starting position of source consumption. It can be earliest, latest, or a specific message ID in the format ledgerId:entryId:partitionId (e.g., "12:2:-1"). |
| source.start.publish-time | No | - | Long | The publish time (Unix timestamp) of the message at which source consumption starts. |
| source.subscription-name | No | flink-sql-connector-pulsar-<RANDOM> | String | The Pulsar subscription name. In the default value, RANDOM is five random letters. |
| source.subscription-type | No | Exclusive | Enum | The Pulsar subscription type. Valid values include Exclusive and Shared. For more information about subscription types, see Subscription types. |
| source.stop.at-message-id | No | - | String | The stopping position of source consumption. It can be earliest, latest, or a specific message ID in the format ledgerId:entryId:partitionId (e.g., "12:2:-1"). |
| source.stop.at-publish-time | No | - | Long | The publish time (Unix timestamp) of the message at which source consumption stops. |
| source.stop.after-message-id | No | - | String | The ID of the ending message of source consumption, in the format ledgerId:entryId:partitionId (e.g., "12:2:-1"). The ending message itself is also consumed. |
| pulsar.source.partitionDiscoveryIntervalMs | No | 30000 | Long | The interval in milliseconds at which the Pulsar source checks for new partitions. A value of 0 or a negative value disables partition discovery. |
| pulsar.admin.requestRetries | No | 5 | Int | The number of retries when a call to a Pulsar admin RESTful API fails. |
| pulsar.client.* | No | - | - | Arbitrary Pulsar client parameters. |
| pulsar.admin.* | No | - | - | Arbitrary Pulsar admin parameters. |
| pulsar.sink.* | No | - | - | Arbitrary Pulsar sink parameters. |
| pulsar.producer.* | No | - | - | Arbitrary Pulsar producer API parameters. |
| pulsar.source.* | No | - | - | Arbitrary Pulsar source parameters. |
| pulsar.consumer.* | No | - | - | Arbitrary Pulsar consumer API parameters. |
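As a sketch of how the key-related options combine, the table below serializes user_id into the message key and the remaining fields into the value. This is a hedged illustration: the topic name, field names, and json formats are assumptions, not taken from this document.

```sql
-- Hypothetical table writing user_id as the Pulsar message key.
CREATE TABLE `keyed_table` (
  `user_id` bigint,
  `item_id` bigint,
  `behavior` STRING
) WITH (
  'connector' = 'pulsar',
  'service-url' = 'pulsar://pulsar:6650',
  'admin-url' = 'http://pulsar:8080',
  'topics' = 'user_behavior_keyed',
  'key.fields' = 'user_id',    -- physical field(s) placed in the message key
  'key.format' = 'json',
  'value.format' = 'json'      -- remaining fields go into the message value
);
```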

Available metadata

| Metadata Key | Data Type | R/W | Description |
| --- | --- | --- | --- |
| topic | STRING NOT NULL | R | The topic name of a Pulsar message. |
| message_size | INT NOT NULL | R | The Pulsar message size. |
| producer_name | STRING NOT NULL | R | The producer name of a Pulsar message. |
| message_id | BYTES NOT NULL | R | The ID of a Pulsar message. |
| sequenceId | BIGINT NOT NULL | R | The sequence ID of a Pulsar message. |
| publish_time | TIMESTAMP_LTZ(3) NOT NULL | R | The publish time of a Pulsar message. |
| event_time | TIMESTAMP_LTZ(3) NOT NULL | R/W | The event time of a Pulsar message. |
| properties | MAP<STRING, STRING> NOT NULL | R/W | The properties of a Pulsar message. |
Note
The R/W column indicates whether a metadata column is read-only or readable and writable. Read-only columns must be declared VIRTUAL and are excluded from INSERT INTO operations.
For a list of the fields in Pulsar messages, see the Pulsar document Messages.
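A hedged sketch of declaring these metadata columns (the topic name and physical fields are assumptions): read-only metadata such as topic and publish_time carries the VIRTUAL keyword, while the writable properties column can also be set on INSERT.

```sql
-- Hypothetical table mixing read-only and writable metadata columns.
CREATE TABLE `meta_table` (
  `user_id` bigint,
  `behavior` STRING,
  `tp` STRING METADATA FROM 'topic' VIRTUAL,                        -- read-only
  `pub_ts` TIMESTAMP_LTZ(3) METADATA FROM 'publish_time' VIRTUAL,   -- read-only
  `props` MAP<STRING, STRING> METADATA FROM 'properties'            -- readable and writable
) WITH (
  'connector' = 'pulsar',
  'service-url' = 'pulsar://pulsar:6650',
  'admin-url' = 'http://pulsar:8080',
  'topics' = 'user_behavior',
  'format' = 'json'
);
```

An INSERT INTO this table would supply user_id, behavior, and props; the VIRTUAL columns tp and pub_ts are excluded.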

Data type mappings

| Pulsar Schema | Flink Format |
| --- | --- |
| AVRO | avro |
| JSON | json |
| PROTOBUF | Not supported yet |
| PROTOBUF_NATIVE | Not supported yet |
| AUTO_CONSUME | Not supported yet |
| AUTO_PUBLISH | Not supported yet |
| NONE/BYTES | raw |
| BOOLEAN | raw |
| STRING | raw |
| DOUBLE | raw |
| FLOAT | raw |
| INT8 | raw |
| INT16 | raw |
| INT32 | raw |
| INT64 | raw |
| LOCAL_DATE | Not supported yet |
| LOCAL_TIME | Not supported yet |
| LOCAL_DATE_TIME | Not supported yet |

PulsarCatalog

PulsarCatalog uses a Pulsar cluster to store the metadata of Flink tables.

Explicit and native tables

PulsarCatalog defines two types of tables: explicit tables and native tables.
Explicit tables are tables created explicitly by the CREATE statement or using a table API. They work similarly to the tables in other SQL connectors. You can create an explicit table and read from or write to the table.
Native tables are created automatically by PulsarCatalog. PulsarCatalog scans all the non-system topics in a Pulsar cluster and converts each topic into a Flink table. Such tables are not created using the CREATE statement.

Explicit tables

PulsarCatalog uses the schemaInfo field in the schema of a topic to store the metadata of an explicit table. For each explicit table, PulsarCatalog creates a place-holding topic. You can specify the tenant of the topic using the catalog-tenant option. The default tenant is __flink_catalog. Your Flink database maps to a namespace with the same name under this tenant. Then a topic named table_<FLINK_TABLE_NAME> is created, whose schema stores the metadata of the Flink table.
For example, if you create a database testdb and a Flink table users, PulsarCatalog will create a topic table_users in the namespace testdb under the tenant __flink_catalog.
The topic table_users is a place-holding topic: it does not have any producers or consumers. PulsarCatalog uses the schema of this topic to store the metadata of the Flink table.
To get the metadata of a topic, you can use the Pulsar admin command line tool:
pulsar-admin schemas get persistent://<tenant>/<namespace>/<topic>

Native tables

Native tables do not have place-holding topics. PulsarCatalog maps the topic schema to the Flink table schema. For more information about Pulsar schema, see the Pulsar document Understand schema.
| Pulsar Schema | Flink Data Type | Flink Format | Supported |
| --- | --- | --- | --- |
| AVRO | Decided by the Avro format. | avro | Yes |
| JSON | Decided by the JSON format. | json | Yes |
| PROTOBUF | Not supported yet | / | No |
| PROTOBUF_NATIVE | Decided by the Protobuf definition. | Not supported yet | No |
| AUTO_CONSUME | Not supported yet | / | No |
| AUTO_PUBLISH | Not supported yet | / | No |
| NONE/BYTES | DataTypes.BYTES() | raw | Yes |
| BOOLEAN | DataTypes.BOOLEAN() | raw | Yes |
| LOCAL_DATE | DataTypes.DATE() | / | No |
| LOCAL_TIME | DataTypes.TIME() | / | No |
| LOCAL_DATE_TIME | DataTypes.TIMESTAMP(3) | / | No |
| STRING | DataTypes.STRING() | raw | Yes |
| DOUBLE | DataTypes.DOUBLE() | raw | Yes |
| FLOAT | DataTypes.FLOAT() | raw | Yes |
| INT8 | DataTypes.TINYINT() | raw | Yes |
| INT16 | DataTypes.SMALLINT() | raw | Yes |
| INT32 | DataTypes.INT() | raw | Yes |
| INT64 | DataTypes.BIGINT() | raw | Yes |
Note
Although the Pulsar schema types LOCAL_DATE and LOCAL_TIME have corresponding Flink data types, Flink cannot parse data based on the two schema types, and automatic schema mapping will fail.

Explicit table versus native table

With native tables, you can read data from existing Pulsar topics. PulsarCatalog automatically reads the schema of a topic and determines the format to use for decoding/encoding. However, native tables do not support watermark or primary keys. Therefore, with native tables, you cannot perform data aggregation based on time windows. A native table maps tenant/namespace to the Flink database and the topic name to the Flink table name.
If you want full control of a table, create an explicit table, define the watermark, and specify the metadata fields and custom formats. It's similar to creating a Pulsar table in GenericInMemoryCatalog. You can bind an explicit table to a Pulsar topic. Each Pulsar topic can be bound with multiple Flink tables.
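For instance, an explicit table can declare an event-time watermark, which a native table cannot. The sketch below is illustrative only: the catalog name, database, table, fields, and the 5-second watermark delay are assumptions, not taken from this document.

```sql
-- Hypothetical explicit table in PulsarCatalog with an event-time watermark.
CREATE TABLE `pulsar`.`testdb`.`behavior_with_watermark` (
  `user_id` bigint,
  `behavior` STRING,
  `event_time` TIMESTAMP_LTZ(3) METADATA FROM 'event_time',   -- R/W metadata column
  WATERMARK FOR `event_time` AS `event_time` - INTERVAL '5' SECOND
) WITH (
  'connector' = 'pulsar',
  'service-url' = 'pulsar://pulsar:6650',
  'admin-url' = 'http://pulsar:8080',
  'topics' = 'user_behavior',
  'format' = 'json'
);
```

With the watermark in place, this table can feed time-window aggregations, which native tables cannot.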

PulsarCatalog parameters

| Key | Default | Type | Description | Required |
| --- | --- | --- | --- | --- |
| catalog-admin-url | "http://localhost:8080" | String | The Pulsar admin URL, such as http://my-broker.example.com:8080 or https://my-broker.example.com:8443. | Yes |
| catalog-auth-params | - | String | The authentication parameters for accessing the Pulsar cluster. | - |
| catalog-auth-plugin | - | String | The name of the authentication plugin for accessing the Pulsar cluster. | - |
| catalog-service-url | "pulsar://localhost:6650" | String | The URL of the Pulsar server, such as pulsar://localhost:6650. URL for multiple brokers: pulsar://localhost:6550,localhost:6651,localhost:6652. Production clusters are usually accessed via domains, such as pulsar://pulsar.us-west.example.com:6650. URL with TLS authentication enabled: pulsar+ssl://pulsar.us-west.example.com:6651. | Yes |
| catalog-tenant | "__flink_catalog" | String | The Pulsar tenant that stores table information. | - |
| default-database | "default_database" | String | The default database of PulsarCatalog. If a database with this name does not exist, one is created automatically. | - |

PulsarCatalog example

CREATE CATALOG pulsar WITH (
'type' = 'pulsar-catalog',
-- 'catalog-auth-plugin' = 'org.apache.pulsar.client.impl.auth.AuthenticationToken',
-- 'catalog-auth-params' = 'token:eyJhbGciOiJIUzI1NiJ9.eyJzdWIiOiJteS1zdXBlci11c2VyIn0.T-Bi__yGCs1lxEwdUcDKBDuJsWLO0X9LrePhnuxxxxx',
'catalog-admin-url' = '<ADMIN_URL>',
'catalog-service-url' = '<SERVICE_URL>'
);

Full example

Pulsar source and sink

The example below shows how to create a Pulsar source and a Pulsar sink that guarantee exactly-once delivery, with a transaction timeout of 2 minutes (the transaction timeout must be longer than the checkpoint interval).
CREATE TABLE `pulsar_source` (
`user_id` bigint,
`item_id` bigint,
`behavior` STRING
) WITH (
'connector' = 'pulsar',
'service-url' = 'pulsar://pulsar:6650',
'admin-url' = 'http://pulsar:8080',
-- 'pulsar.client.authPluginClassName' = 'org.apache.pulsar.client.impl.auth.AuthenticationToken',
-- 'pulsar.client.authParams' = 'token:eyJhbGciOiJIUzI1NiJ9.eyJzdWIiOiJ0ZXN0In0.Cpsuc4hRw3yL231Kp8iQAQkYc2SvVTfeziAMcLxxxxx',
'topics' = 'topic_source',
'format' = 'json',
'source.subscription-name' = 'flink',
'source.start.message-id' = 'earliest'
);

CREATE TABLE `pulsar_sink` (
`user_id` bigint,
`item_id` bigint,
`behavior` STRING
) WITH (
'connector' = 'pulsar',
'service-url' = 'pulsar://pulsar:6650',
'admin-url' = 'http://pulsar:8080',
-- 'pulsar.client.authPluginClassName' = 'org.apache.pulsar.client.impl.auth.AuthenticationToken',
-- 'pulsar.client.authParams' = 'token:eyJhbGciOiJIUzI1NiJ9.eyJzdWIiOiJ0ZXN0In0.Cpsuc4hRw3yL231Kp8iQAQkYc2SvVTfeziAMcLxxxxx',
'topics' = 'topic_sink',
'format' = 'json',
'pulsar.sink.deliveryGuarantee' = 'exactly-once',
'pulsar.sink.transactionTimeoutMillis' = '120000'
);

INSERT INTO `pulsar_sink` SELECT * FROM `pulsar_source`;

PulsarCatalog example

Explicit table

CREATE CATALOG `pulsar` WITH (
'type' = 'pulsar-catalog',
-- 'catalog-auth-plugin' = 'org.apache.pulsar.client.impl.auth.AuthenticationToken',
-- 'catalog-auth-params' = 'token:eyJhbGciOiJIUzI1NiJ9.eyJzdWIiOiJteS1zdXBlci11c2VyIn0.T-Bi__yGCs1lxEwdUcDKBDuJsWLO0X9LrePhnuxxxxx',
'catalog-admin-url' = 'http://pulsar:8080',
'catalog-service-url' = 'pulsar://pulsar:6650'
);
INSERT INTO `pulsar`.`default_database`.`pulsar_sink` SELECT * FROM `pulsar`.`default_database`.`pulsar_source`;
The pulsar_source and pulsar_sink tables in the above example are created using the following statements (which can be put in the same SQL job).
CREATE TABLE IF NOT EXISTS `pulsar`.`default_database`.`pulsar_source` (
`user_id` bigint,
`item_id` bigint,
`behavior` STRING
) WITH (
'connector' = 'pulsar',
'service-url' = 'pulsar://pulsar:6650',
'admin-url' = 'http://pulsar:8080',
-- 'pulsar.client.authPluginClassName' = 'org.apache.pulsar.client.impl.auth.AuthenticationToken',
-- 'pulsar.client.authParams' = 'token:eyJhbGciOiJIUzI1NiJ9.eyJzdWIiOiJ0ZXN0In0.Cpsuc4hRw3yL231Kp8iQAQkYc2SvVTfeziAMcLxxxxx',
'topics' = 'topic_source',
'format' = 'json',
'source.subscription-name' = 'flink',
'source.start.message-id' = 'earliest'
);

CREATE TABLE IF NOT EXISTS `pulsar`.`default_database`.`pulsar_sink` (
`user_id` bigint,
`item_id` bigint,
`behavior` STRING
) WITH (
'connector' = 'pulsar',
'service-url' = 'pulsar://pulsar:6650',
'admin-url' = 'http://pulsar:8080',
-- 'pulsar.client.authPluginClassName' = 'org.apache.pulsar.client.impl.auth.AuthenticationToken',
-- 'pulsar.client.authParams' = 'token:eyJhbGciOiJIUzI1NiJ9.eyJzdWIiOiJ0ZXN0In0.Cpsuc4hRw3yL231Kp8iQAQkYc2SvVTfeziAMcLxxxxx',
'topics' = 'topic_sink',
'format' = 'json',
'pulsar.sink.deliveryGuarantee' = 'exactly-once',
'pulsar.sink.transactionTimeoutMillis' = '120000'
);

Native table

1. Prepare a JSON file of the topic schema. Name it "schema.json".
{
"schema": "{\"type\":\"record\",\"name\":\"userBehavior\",\"namespace\":\"my.example\",\"fields\":[{\"name\":\"user_id\",\"type\":\"long\"},{\"name\":\"item_id\",\"type\":\"long\"},{\"name\":\"behavior\",\"type\":\"string\"}]}",
"type": "JSON",
"properties": {}
}
2. Use the Pulsar admin command line tool to configure the topic schema.
# Configure the schema
bin/pulsar-admin schemas upload -f ./schema.json topic_source
bin/pulsar-admin schemas upload -f ./schema.json topic_sink

# Check the schema
bin/pulsar-admin schemas get topic_source
bin/pulsar-admin schemas get topic_sink
3. Below is an example job. The Flink database name public/default follows the tenant/namespace format; public/default is the default tenant and namespace of a Pulsar cluster.
CREATE CATALOG `pulsar` WITH (
'type' = 'pulsar-catalog',
-- 'catalog-auth-plugin' = 'org.apache.pulsar.client.impl.auth.AuthenticationToken',
-- 'catalog-auth-params' = 'token:eyJhbGciOiJIUzI1NiJ9.eyJzdWIiOiJteS1zdXBlci11c2VyIn0.T-Bi__yGCs1lxEwdUcDKBDuJsWLO0X9LrePhnuxxxxx',
'catalog-admin-url' = 'http://pulsar:8080',
'catalog-service-url' = 'pulsar://pulsar:6650'
);

INSERT INTO `pulsar`.`public/default`.`topic_sink` SELECT * FROM `pulsar`.`public/default`.`topic_source`;

FAQs

What should I do if an error occurs saying that I haven't enabled transactions?

java.lang.NullPointerException: You haven't enable transaction in Pulsar client.
To enable transactions, see How to use transactions?.

After a job that writes to a Pulsar sink with exactly-once enabled restarts due to an error, the topic data cannot be consumed. Why?

Cause: There may be uncommitted transactions left from before the restart, and transactions in the OPEN state block reads of data written to the topic after the restart. You can run pulsar-admin transactions slow-transactions -t 1s to view transactions in the OPEN state. After the OPEN transactions are committed or aborted, the data written after the restart becomes readable.
Suggestion: Configure an appropriate transaction timeout using the WITH parameters (the default Pulsar transaction timeout is 3 hours). For example, 'pulsar.sink.transactionTimeoutMillis' = '120000' sets the timeout to 2 minutes. Note that the transaction timeout must be longer than the checkpoint interval.

Why does the Pulsar source fail to restore data from a checkpoint in the batch messaging scenario?

Caused by: java.lang.IllegalArgumentException: We only support normal message id currently.
This error occurs because batch write was enabled when the data was written to Pulsar. Currently, Pulsar sources cannot restore batch-write messages from a checkpoint. In Stream Compute Service, batch write is disabled by default for Pulsar sinks.

What is the relationship between the start of Pulsar source consumption and subscriptions?

If a topic does not have a subscription, a subscription will be created based on the ID of the starting message of source consumption.
If the topic has a subscription, the start of source consumption will be ignored.
If data is not restored from a checkpoint, consumption will start from the subscription cursor. If data is restored from a checkpoint, consumption will start from the message following the message ID recorded by the checkpoint. This is achieved by resetting the subscription cursor (for details, see PulsarOrderedPartitionSplitReader#beforeCreatingConsumer).
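Putting the rules above together: for the configured start position to take effect, the subscription must not already exist on the topic. The sketch below pins the start to a specific message ID under a fresh subscription name; the subscription name, topic, and message ID are assumptions for illustration only.

```sql
-- Hypothetical source that starts from a specific message ID.
CREATE TABLE `pinned_source` (
  `user_id` bigint,
  `behavior` STRING
) WITH (
  'connector' = 'pulsar',
  'service-url' = 'pulsar://pulsar:6650',
  'admin-url' = 'http://pulsar:8080',
  'topics' = 'user_behavior',
  'format' = 'json',
  'source.subscription-name' = 'flink_fresh_sub',  -- a new subscription, so the start position applies
  'source.start.message-id' = '12:2:-1'            -- ledgerId:entryId:partitionId
);
```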

Why can't I use the NonDurable subscription mode for the Pulsar source?

PulsarSourceEnumerator#createSubscription creates a Durable subscription first.
If PulsarPartitionSplitReaderBase#createPulsarConsumer then consumes data in the NonDurable mode, the error Durable subscription with the same name already exists will occur.
| Option | Required | Default Value | Data Type | Description |
| --- | --- | --- | --- | --- |
| pulsar.consumer.subscriptionMode | No | Durable | Enum | The Pulsar subscription mode. Valid values include Durable and NonDurable. In Durable mode, the cursor is durable: it retains messages and persists the current position, so if a broker restarts after a failure, it can recover the cursor from persistent storage (a bookie) and consumption continues from the last consumed position. In NonDurable mode, once a broker stops, the cursor is lost and cannot be recovered, so consumption cannot continue from the last consumed position. To learn more, see Subscription modes. |

About MessageId

See Message Storage and ID Generation Rules. Message IDs can be compared; for example, 174:1:0 > 174:1:-1.

Why does the Pulsar source fail to consume data according to publish-time?

Cause: If the connected broker does not hold the namespace information of the topic, the RESTful API that gets a message ID by publish time returns HTTP 307 Temporary Redirect, and the Pulsar client API used in the Flink connector then returns HTTP 500 Server Error, so the job fails to start. You can use the RESTful API get-message-by-id to view the error.
# 1662480195714 is a publish time accurate to the millisecond.
curl http://${adminUrl}:8080/admin/v2/persistent/public/default/${topic}/messageid/1662480195714
Note
source.stop.at-publish-time is not affected, because stopping at a publish time does not use the RESTful API that queries message IDs by publish time.
Suggestion: You can try increasing pulsar.admin.requestRetries (the number of retries for RESTful API calls, 5 by default) to work around this issue.

Why doesn't the job stop at the configured ending position for Pulsar source consumption?

Solution: Disable automatic partition detection by setting 'pulsar.source.partitionDiscoveryIntervalMs' = '0'.