Flink Version | Description | Source Code |
1.11 | Unsupported | - |
1.13 | Supported | |
1.14 | Supported | |
1.16 | Supported | |
CREATE TABLE PulsarTable (
  `user_id` bigint,
  `item_id` bigint,
  `behavior` STRING,
  `publish_time` TIMESTAMP_LTZ(3) METADATA FROM 'publish_time' VIRTUAL
) WITH (
  'connector' = 'pulsar',
  'service-url' = 'pulsar://pulsar:6650',
  'admin-url' = 'http://pulsar:8080',
  -- 'pulsar.client.authPluginClassName' = 'org.apache.pulsar.client.impl.auth.AuthenticationToken',
  -- 'pulsar.client.authParams' = 'token:eyJhbGciOiJIUzI1NiJ9.eyJzdWIiOiJ0ZXN0In0.Cpsuc4hRw3yL231Kp8iQAQkYc2SvVTfeziAMcLxxxxx',
  'topics' = 'user_behavior',
  'format' = 'json',
  'source.subscription-name' = 'flink',
  'source.start.message-id' = 'earliest'
);
# Checking admin permissions
admin_url=http://172.28.28.46:8080,172.28.28.29:8080,172.28.28.105:8080
token=eyJhbGciOiJIUzI1NiJ9.eyJzdWIiOiJteS1zdXBlci11c2VyIn0.T-Bi__yGCs1lxEwdUcDKBDuJsWLO0X9LrePhnuXXXXX
namespace=public/default
pulsar-admin --admin-url ${admin_url} --auth-params token:${token} --auth-plugin org.apache.pulsar.client.impl.auth.AuthenticationToken topics list ${namespace}

# Checking topic read and write permissions
service_url=pulsar://172.28.28.46:6650,172.28.28.29:6650,172.28.28.105:6650
token=eyJhbGciOiJIUzI1NiJ9.eyJzdWIiOiJteS1zdXBlci11c2VyIn0.T-Bi__yGCs1lxEwdUcDKBDuJsWLO0X9LrePhnuXXXXX
namespace=public/default
topic=xxx
subscription=yyy
pulsar-client --url ${service_url} --auth-params token:${token} --auth-plugin org.apache.pulsar.client.impl.auth.AuthenticationToken consume -s ${subscription} -n 10 persistent://${namespace}/${topic} -p Earliest
Option | Required | Default Value | Data Type | Description |
connector | Yes | - | String | The connector to use. For Apache Pulsar, use 'pulsar' or 'upsert-pulsar'. |
admin-url | Yes | - | String | The Pulsar admin URL, such as http://my-broker.example.com:8080 or https://my-broker.example.com:8443. |
service-url | Yes | - | String | The URL of the Pulsar server. A Pulsar client needs a Pulsar protocol URL to connect to a Pulsar cluster. Example: pulsar://localhost:6650. URL for multiple brokers: pulsar://localhost:6650,localhost:6651,localhost:6652. Production clusters are usually accessed via domain names, such as pulsar://pulsar.us-west.example.com:6650. URL with TLS authentication enabled: pulsar+ssl://pulsar.us-west.example.com:6651. |
topics | Yes | - | String | The names of the Apache Pulsar topics that data is read from or written to. This option can be one topic name or multiple topic names separated by ";", such as topic-1;topic-2. You can specify topics, partitions, or both, such as topic-a-partition-0;topic-a-partition-2;some-topic2. If both a topic and one of its partitions are specified, only the topic is used. For example, some-topic1;some-topic1-partition-0 is equivalent to some-topic1. |
pulsar.client.authPluginClassName | No | - | String | The authentication plugin class name. For token authentication, use org.apache.pulsar.client.impl.auth.AuthenticationToken. |
pulsar.client.authParams | No | - | String | The authentication parameters. For token authentication, the format is token:xxxx. |
explicit | No | true | Boolean | Whether the table is an explicit Flink table, which is used by PulsarCatalog. For details, see the PulsarCatalog introduction below. |
key.fields | No | - | List<String> | The physical fields in the Flink table corresponding to the key fields in Pulsar messages. Note that these fields are unrelated to primary keys. |
key.format | No | - | String | The format used to deserialize and serialize the key part of Pulsar messages. |
format | No | - | String | The format used to deserialize and serialize the value part of Pulsar messages. You must specify at least one of format and value.format. If you specify both, format takes precedence. |
value.format | No | - | String | The format used to deserialize and serialize the value part of Pulsar messages. You must specify at least one of format and value.format. If you specify both, format takes precedence. |
sink.topic-routing-mode | No | round-robin | Enum | The topic routing policy. Valid values: round-robin and message-key-hash. You can also configure a custom routing policy using the sink.custom-topic-router option. |
sink.custom-topic-router | No | - | String | The full class name of the custom topic routing policy. If you specify this option, do not set sink.topic-routing-mode. |
sink.message-delay-interval | No | 0 | Duration | The delivery delay for sent messages, such as 10ms, 1s, or 1min. This lets you delay the consumption of messages. For details, see the Pulsar document Delayed message delivery. |
pulsar.sink.deliveryGuarantee | No | none | Enum | The message delivery guarantee of the Pulsar sink. Valid values: none, at-least-once, and exactly-once. To use exactly-once, your Pulsar cluster must support transactions. |
pulsar.sink.transactionTimeoutMillis | No | 10800000 | Long | The Pulsar transaction timeout period (milliseconds), which must be longer than the checkpoint interval. The default value is 10800000 (3 hours). |
pulsar.producer.batchingEnabled | No | false | Boolean | Whether to enable batch write. |
pulsar.producer.batchingMaxMessages | No | 1000 | Int | The maximum number of Pulsar messages in a single batch when batch write is enabled. |
source.start.message-id | No | - | String | The start of source consumption. It can be set to earliest, latest, or a specific message ID in the format ledgerId:entryId:partitionId (e.g., "12:2:-1"). |
source.start.publish-time | No | - | Long | The publishing time (Unix timestamp in milliseconds) of the starting message of source consumption. |
source.subscription-name | No | flink-sql-connector-pulsar-<RANDOM> | String | The Pulsar subscription name. The default value is flink-sql-connector-pulsar-<RANDOM>, where <RANDOM> is five random letters. |
source.subscription-type | No | Exclusive | Enum | The Pulsar subscription type. Valid values: Exclusive and Shared. For more information, see Subscription types. |
source.stop.at-message-id | No | - | String | The end of source consumption. It can be set to earliest, latest, or a specific message ID in the format ledgerId:entryId:partitionId (e.g., "12:2:-1"). |
source.stop.at-publish-time | No | - | Long | The publishing time (Unix timestamp in milliseconds) of the ending message of source consumption. |
source.stop.after-message-id | No | - | String | The ID of the ending message of source consumption in the format ledgerId:entryId:partitionId (e.g., "12:2:-1"). The ending message is consumed as well. |
pulsar.source.partitionDiscoveryIntervalMs | No | 30000 | Long | The interval (milliseconds) at which the Pulsar source checks for new partitions. If this is 0 or a negative value, partition discovery is disabled. |
pulsar.admin.requestRetries | No | 5 | Int | The number of retries in case of failure to call Pulsar admin RESTful APIs. |
pulsar.client.* | No | - | - | Any Pulsar client parameter. |
pulsar.admin.* | No | - | - | Any Pulsar admin parameter. |
pulsar.sink.* | No | - | - | Any Pulsar sink parameter. |
pulsar.producer.* | No | - | - | Any Pulsar producer API parameter. |
pulsar.source.* | No | - | - | Any Pulsar source parameter. |
pulsar.consumer.* | No | - | - | Any Pulsar consumer API parameter. |
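The following sketch shows how the key, value, and sink options above fit together. This sink table writes user_id into the message key and routes messages by key hash. The table name pulsar_keyed_sink, the topic user_behavior_keyed, and the option values are hypothetical. The connection settings are copied from the example at the top of this section.

```sql
-- Hypothetical sink table; table name, topic, and option values are illustrative.
CREATE TABLE pulsar_keyed_sink (
  `user_id` BIGINT,
  `item_id` BIGINT,
  `behavior` STRING
) WITH (
  'connector' = 'pulsar',
  'service-url' = 'pulsar://pulsar:6650',
  'admin-url' = 'http://pulsar:8080',
  'topics' = 'user_behavior_keyed',
  -- user_id is written to the message key; this is unrelated to any primary key.
  'key.fields' = 'user_id',
  'key.format' = 'json',
  -- The value part of each message is serialized as JSON.
  'value.format' = 'json',
  -- Route messages by the hash of the message key instead of round-robin.
  'sink.topic-routing-mode' = 'message-key-hash',
  -- Delay the delivery of each message by 10 seconds (see Delayed message delivery).
  'sink.message-delay-interval' = '10s'
);
```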
Metadata Key | Data Type | R/W | Description |
topic | STRING NOT NULL | R | The topic name of a Pulsar message. |
message_size | INT NOT NULL | R | The Pulsar message size. |
producer_name | STRING NOT NULL | R | The producer name of a Pulsar message. |
message_id | BYTES NOT NULL | R | The ID of a Pulsar message. |
sequenceId | BIGINT NOT NULL | R | The sequence ID of a Pulsar message. |
publish_time | TIMESTAMP_LTZ(3) NOT NULL | R | The publishing time of a Pulsar message. |
event_time | TIMESTAMP_LTZ(3) NOT NULL | R/W | The event time of a Pulsar message. |
properties | MAP<STRING, STRING> NOT NULL | R/W | The properties of a Pulsar message. |
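The following sketch declares metadata columns using the keys and types from the table above. Read-only keys such as topic and message_id are declared VIRTUAL. The writable keys event_time and properties are not. The table name pulsar_with_metadata and the column name props are hypothetical. The connector options are copied from the first example in this section.

```sql
-- Hypothetical table that exposes Pulsar message metadata as columns.
CREATE TABLE pulsar_with_metadata (
  `user_id` BIGINT,
  `item_id` BIGINT,
  `behavior` STRING,
  -- Read-only metadata (R): VIRTUAL excludes these columns from INSERT INTO.
  `topic` STRING METADATA VIRTUAL,
  `message_id` BYTES METADATA VIRTUAL,
  `publish_time` TIMESTAMP_LTZ(3) METADATA FROM 'publish_time' VIRTUAL,
  -- Readable and writable metadata (R/W): no VIRTUAL keyword.
  `event_time` TIMESTAMP_LTZ(3) METADATA,
  `props` MAP<STRING, STRING> METADATA FROM 'properties'
) WITH (
  'connector' = 'pulsar',
  'service-url' = 'pulsar://pulsar:6650',
  'admin-url' = 'http://pulsar:8080',
  'topics' = 'user_behavior',
  'format' = 'json',
  'source.subscription-name' = 'flink',
  'source.start.message-id' = 'earliest'
);
```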
Read-only (R) metadata columns must be declared VIRTUAL so that they are excluded from INSERT INTO operations.
Pulsar Schema | Flink Format |
AVRO | avro |
JSON | json |
PROTOBUF | Not supported yet |
PROTOBUF_NATIVE | Not supported yet |
AUTO_CONSUME | Not supported yet |
AUTO_PUBLISH | Not supported yet |
NONE/BYTES | raw |
BOOLEAN | raw |
STRING | raw |
DOUBLE | raw |
FLOAT | raw |
INT8 | raw |
INT16 | raw |
INT32 | raw |
INT64 | raw |
LOCAL_DATE | Not supported yet |
LOCAL_TIME | Not supported yet |
LOCAL_DATE_TIME | Not supported yet |
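Some Pulsar schemas (for example STRING) map to the raw format. For those topics, the Flink table should declare exactly one physical column, because Flink's raw format reads and writes a single value per message. Here is a minimal sketch, assuming a hypothetical topic named string_topic and a hypothetical table name:

```sql
-- Hypothetical table over a topic whose Pulsar schema is STRING.
CREATE TABLE pulsar_raw_strings (
  -- The raw format maps each message value to this single column.
  `line` STRING
) WITH (
  'connector' = 'pulsar',
  'service-url' = 'pulsar://pulsar:6650',
  'admin-url' = 'http://pulsar:8080',
  'topics' = 'string_topic',
  'format' = 'raw',
  'source.start.message-id' = 'earliest'
);
```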
PulsarCatalog uses the schemaInfo field in the schema of a topic to store the metadata of an explicit table. For each explicit table, PulsarCatalog creates a place-holding topic. You can specify the tenant of this topic using the catalog-tenant option; the default tenant is __flink_catalog. Each Flink database maps to a namespace with the same name under this tenant. A topic named table_<FLINK_TABLE_NAME> is then created in that namespace, and its schema stores the metadata of the Flink table.
For example, for a Flink database testdb and a Flink table users, PulsarCatalog creates a topic table_users in the namespace testdb under the tenant __flink_catalog. table_users is a place-holding topic because it has no producers or consumers. Its schema is used to store the metadata of the Flink table. You can view the stored metadata with:
pulsar-admin schemas get persistent://<tenant>/<namespace>/<topic>
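The following statements illustrate this mapping by creating the explicit table users in the database testdb. They assume a catalog registered under the name pulsar with the default catalog-tenant, as in the CREATE CATALOG statement later in this section. Based on the description above, PulsarCatalog would then create the place-holding topic persistent://__flink_catalog/testdb/table_users. The topic user_behavior and the connector options are assumptions borrowed from the first example in this section.

```sql
-- Assumes a catalog named pulsar created with the default catalog-tenant (__flink_catalog).
USE CATALOG pulsar;

-- Maps to the namespace __flink_catalog/testdb.
CREATE DATABASE IF NOT EXISTS testdb;

-- Explicit table; its metadata is stored in the schema of the
-- place-holding topic persistent://__flink_catalog/testdb/table_users.
CREATE TABLE IF NOT EXISTS testdb.users (
  `user_id` BIGINT,
  `item_id` BIGINT,
  `behavior` STRING
) WITH (
  'connector' = 'pulsar',
  'service-url' = 'pulsar://pulsar:6650',
  'admin-url' = 'http://pulsar:8080',
  'topics' = 'user_behavior',
  'format' = 'json'
);
```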
Pulsar Schema | Flink Data Type | Flink Format | Work |
AVRO | It is decided by the Avro format. | avro | Yes |
JSON | It is decided by the JSON format. | json | Yes |
PROTOBUF | Not supported yet | / | No |
PROTOBUF_NATIVE | It is decided by the Protobuf definition. | Not supported yet | No |
AUTO_CONSUME | Not supported yet | / | No |
AUTO_PUBLISH | Not supported yet | / | No |
NONE/BYTES | DataTypes.BYTES() | raw | Yes |
BOOLEAN | DataTypes.BOOLEAN() | raw | Yes |
LOCAL_DATE | DataTypes.DATE() | / | No |
LOCAL_TIME | DataTypes.TIME() | / | No |
LOCAL_DATE_TIME | DataTypes.TIMESTAMP(3) | / | No |
STRING | DataTypes.STRING() | raw | Yes |
DOUBLE | DataTypes.DOUBLE() | raw | Yes |
FLOAT | DataTypes.FLOAT() | raw | Yes |
INT8 | DataTypes.TINYINT() | raw | Yes |
INT16 | DataTypes.SMALLINT() | raw | Yes |
INT32 | DataTypes.INT() | raw | Yes |
INT64 | DataTypes.BIGINT() | raw | Yes |
Although LOCAL_DATE and LOCAL_TIME have corresponding Flink data types, Flink cannot parse data based on these two schema types, so automatic schema mapping fails.
PulsarCatalog can also map the Pulsar tenant/namespace to a Flink database and the topic name to a Flink table name, so that existing topics can be queried as tables.
… GenericInMemoryCatalog. You can bind an explicit table to a Pulsar topic. Each Pulsar topic can be bound to multiple Flink tables.
Key | Default | Type | Description | Required |
catalog-admin-url | "http://localhost:8080" | String | The Pulsar admin URL, such as http://my-broker.example.com:8080 or https://my-broker.example.com:8443. | Yes |
catalog-auth-params | - | String | The authentication parameters for accessing the Pulsar cluster. | No |
catalog-auth-plugin | - | String | The name of the authentication plugin for accessing the Pulsar cluster. | No |
catalog-service-url | "pulsar://localhost:6650" | String | The URL of the Pulsar server. A Pulsar client needs a Pulsar protocol URL to connect to a Pulsar cluster. Example: pulsar://localhost:6650. URL for multiple brokers: pulsar://localhost:6650,localhost:6651,localhost:6652. Production clusters are usually accessed via domain names, such as pulsar://pulsar.us-west.example.com:6650. URL with TLS authentication enabled: pulsar+ssl://pulsar.us-west.example.com:6651. | Yes |
catalog-tenant | "__flink_catalog" | String | The Pulsar tenant that stores table information. | No |
default-database | "default_database" | String | The default database of PulsarCatalog. If a database with this name does not exist, it is created automatically. | No |
CREATE CATALOG pulsar WITH (
  'type' = 'pulsar-catalog',
  -- 'catalog-auth-plugin' = 'org.apache.pulsar.client.impl.auth.AuthenticationToken',
  -- 'catalog-auth-params' = 'token:eyJhbGciOiJIUzI1NiJ9.eyJzdWIiOiJteS1zdXBlci11c2VyIn0.T-Bi__yGCs1lxEwdUcDKBDuJsWLO0X9LrePhnuxxxxx',
  'catalog-admin-url' = '<ADMIN_URL>',
  'catalog-service-url' = '<SERVICE_URL>'
);
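The optional keys from the catalog options table can be set in the same statement. In this sketch, the catalog name pulsar_custom, the tenant my_flink_catalog, and the database my_database are hypothetical. With these settings, explicit tables created in my_database would have their place-holding topics under my_flink_catalog instead of __flink_catalog.

```sql
-- Hypothetical catalog that overrides the optional keys.
CREATE CATALOG pulsar_custom WITH (
  'type' = 'pulsar-catalog',
  'catalog-admin-url' = 'http://pulsar:8080',
  'catalog-service-url' = 'pulsar://pulsar:6650',
  -- Tenant that stores explicit-table metadata (default: __flink_catalog).
  'catalog-tenant' = 'my_flink_catalog',
  -- Created automatically if it does not exist (default: default_database).
  'default-database' = 'my_database'
);
```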
CREATE TABLE `pulsar_source` (
  `user_id` bigint,
  `item_id` bigint,
  `behavior` STRING
) WITH (
  'connector' = 'pulsar',
  'service-url' = 'pulsar://pulsar:6650',
  'admin-url' = 'http://pulsar:8080',
  -- 'pulsar.client.authPluginClassName' = 'org.apache.pulsar.client.impl.auth.AuthenticationToken',
  -- 'pulsar.client.authParams' = 'token:eyJhbGciOiJIUzI1NiJ9.eyJzdWIiOiJ0ZXN0In0.Cpsuc4hRw3yL231Kp8iQAQkYc2SvVTfeziAMcLxxxxx',
  'topics' = 'topic_source',
  'format' = 'json',
  'source.subscription-name' = 'flink',
  'source.start.message-id' = 'earliest'
);

CREATE TABLE `pulsar_sink` (
  `user_id` bigint,
  `item_id` bigint,
  `behavior` STRING
) WITH (
  'connector' = 'pulsar',
  'service-url' = 'pulsar://pulsar:6650',
  'admin-url' = 'http://pulsar:8080',
  -- 'pulsar.client.authPluginClassName' = 'org.apache.pulsar.client.impl.auth.AuthenticationToken',
  -- 'pulsar.client.authParams' = 'token:eyJhbGciOiJIUzI1NiJ9.eyJzdWIiOiJ0ZXN0In0.Cpsuc4hRw3yL231Kp8iQAQkYc2SvVTfeziAMcLxxxxx',
  'topics' = 'topic_sink',
  'format' = 'json',
  'pulsar.sink.deliveryGuarantee' = 'exactly-once',
  'pulsar.sink.transactionTimeoutMillis' = '120000'
);

INSERT INTO `pulsar_sink` SELECT * FROM `pulsar_source`;
CREATE CATALOG `pulsar` WITH (
  'type' = 'pulsar-catalog',
  -- 'catalog-auth-plugin' = 'org.apache.pulsar.client.impl.auth.AuthenticationToken',
  -- 'catalog-auth-params' = 'token:eyJhbGciOiJIUzI1NiJ9.eyJzdWIiOiJteS1zdXBlci11c2VyIn0.T-Bi__yGCs1lxEwdUcDKBDuJsWLO0X9LrePhnuxxxxx',
  'catalog-admin-url' = 'http://pulsar:8080',
  'catalog-service-url' = 'pulsar://pulsar:6650'
);

INSERT INTO `pulsar`.`default_database`.`pulsar_sink` SELECT * FROM `pulsar`.`default_database`.`pulsar_source`;
The pulsar_source and pulsar_sink tables in the above example are created using the following statements, which can be put in the same SQL job:
CREATE TABLE IF NOT EXISTS `pulsar`.`default_database`.`pulsar_source` (
  `user_id` bigint,
  `item_id` bigint,
  `behavior` STRING
) WITH (
  'connector' = 'pulsar',
  'service-url' = 'pulsar://pulsar:6650',
  'admin-url' = 'http://pulsar:8080',
  -- 'pulsar.client.authPluginClassName' = 'org.apache.pulsar.client.impl.auth.AuthenticationToken',
  -- 'pulsar.client.authParams' = 'token:eyJhbGciOiJIUzI1NiJ9.eyJzdWIiOiJ0ZXN0In0.Cpsuc4hRw3yL231Kp8iQAQkYc2SvVTfeziAMcLxxxxx',
  'topics' = 'topic_source',
  'format' = 'json',
  'source.subscription-name' = 'flink',
  'source.start.message-id' = 'earliest'
);

CREATE TABLE IF NOT EXISTS `pulsar`.`default_database`.`pulsar_sink` (
  `user_id` bigint,
  `item_id` bigint,
  `behavior` STRING
) WITH (
  'connector' = 'pulsar',
  'service-url' = 'pulsar://pulsar:6650',
  'admin-url' = 'http://pulsar:8080',
  -- 'pulsar.client.authPluginClassName' = 'org.apache.pulsar.client.impl.auth.AuthenticationToken',
  -- 'pulsar.client.authParams' = 'token:eyJhbGciOiJIUzI1NiJ9.eyJzdWIiOiJ0ZXN0In0.Cpsuc4hRw3yL231Kp8iQAQkYc2SvVTfeziAMcLxxxxx',
  'topics' = 'topic_sink',
  'format' = 'json',
  'pulsar.sink.deliveryGuarantee' = 'exactly-once',
  'pulsar.sink.transactionTimeoutMillis' = '120000'
);
{
  "schema": "{\"type\":\"record\",\"name\":\"userBehavior\",\"namespace\":\"my.example\",\"fields\":[{\"name\":\"user_id\",\"type\":\"long\"},{\"name\":\"item_id\",\"type\":\"long\"},{\"name\":\"behavior\",\"type\":\"string\"}]}",
  "type": "JSON",
  "properties": {}
}
# Configure the schema
bin/pulsar-admin schemas upload -f ./schema.json topic_source
bin/pulsar-admin schemas upload -f ./schema.json topic_sink

# Check the schema
bin/pulsar-admin schemas get topic_source
bin/pulsar-admin schemas get topic_sink
In the following example, public/default is the tenant/namespace. It is the default tenant and namespace of a Pulsar cluster.
CREATE CATALOG `pulsar` WITH (
  'type' = 'pulsar-catalog',
  -- 'catalog-auth-plugin' = 'org.apache.pulsar.client.impl.auth.AuthenticationToken',
  -- 'catalog-auth-params' = 'token:eyJhbGciOiJIUzI1NiJ9.eyJzdWIiOiJteS1zdXBlci11c2VyIn0.T-Bi__yGCs1lxEwdUcDKBDuJsWLO0X9LrePhnuxxxxx',
  'catalog-admin-url' = 'http://pulsar:8080',
  'catalog-service-url' = 'pulsar://pulsar:6650'
);

INSERT INTO `pulsar`.`public/default`.`topic_sink` SELECT * FROM `pulsar`.`public/default`.`topic_source`;
java.lang.NullPointerException: You haven't enable transaction in Pulsar client.
You can run pulsar-admin transactions slow-transactions -t 1s to view transactions in the OPEN state. After the OPEN transactions are committed or aborted, you will be able to read the data written to the topic after the restart.
Suggestion: Configure an appropriate transaction timeout period using WITH parameters (the default Pulsar transaction timeout is 3 hours). For example, you can use 'pulsar.sink.transactionTimeoutMillis' = '120000' to set the timeout period to 2 minutes. Note that the transaction timeout period must be longer than the checkpoint interval.
If the error java.lang.IllegalArgumentException: We only support normal message id currently occurs, it's because batch write is enabled for Pulsar write operations. Currently, Pulsar sources do not support restoring batch-written messages. With Stream Compute Service, batch write is disabled by default for Pulsar sinks.
Caused by: java.lang.IllegalArgumentException: We only support normal message id currently.
… PulsarOrderedPartitionSplitReader#beforeCreatingConsumer).
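The following sketch combines the two suggestions above. It defines an exactly-once sink with a 2-minute transaction timeout and with batch write explicitly disabled. The SET statement assumes the checkpoint interval is controlled by Flink's execution.checkpointing.interval option; on Stream Compute Service the interval may be configured in the job settings instead. The table name pulsar_eo_sink and the 60-second interval are hypothetical.

```sql
-- Assumption: checkpoint interval set through Flink configuration (60 seconds).
SET 'execution.checkpointing.interval' = '60s';

-- Hypothetical exactly-once sink table.
CREATE TABLE pulsar_eo_sink (
  `user_id` BIGINT,
  `item_id` BIGINT,
  `behavior` STRING
) WITH (
  'connector' = 'pulsar',
  'service-url' = 'pulsar://pulsar:6650',
  'admin-url' = 'http://pulsar:8080',
  'topics' = 'topic_sink',
  'format' = 'json',
  -- Requires transaction support in the Pulsar cluster.
  'pulsar.sink.deliveryGuarantee' = 'exactly-once',
  -- 2 minutes, longer than the 60-second checkpoint interval above.
  'pulsar.sink.transactionTimeoutMillis' = '120000',
  -- Keep batch write off so that Pulsar sources reading this topic can restore.
  'pulsar.producer.batchingEnabled' = 'false'
);
```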
Can I use the NonDurable subscription mode for the Pulsar source?
PulsarSourceEnumerator#createSubscription creates a Durable subscription first. If PulsarPartitionSplitReaderBase#createPulsarConsumer then consumes data in the NonDurable mode, the error "Durable subscription with the same name already exists" occurs.
Option | Required | Default Value | Data Type | Description |
pulsar.consumer.subscriptionMode | No | Durable | Enum | The Pulsar subscription mode. Valid values: Durable and NonDurable. In the Durable mode, the cursor is durable: it retains messages and persists the current position. If a broker restarts after a failure, it can recover the cursor from persistent storage (bookie), so messages can continue to be consumed from the last consumed position. In the NonDurable mode, the cursor is lost once the broker stops and can never be recovered, so messages cannot continue to be consumed from the last consumed position. To learn more, see Subscription modes. |
174:1:0 > 174:1:-1
… publish-time …?
… publish-time will return HTTP 307 Temporary Redirect, and the Pulsar client API used in the Flink connector will return HTTP 500 Server Error. As a result, the job fails to start. To view the error, call the Pulsar admin RESTful API that looks up a message ID by publish time:
# 1662480195714 is a publishing time accurate to the millisecond.
curl http://${adminUrl}:8080/admin/v2/persistent/public/default/${topic}/messageid/1662480195714
If publish-time is not used, you can specify source.stop.at-publish-time.
You can increase pulsar.admin.requestRetries (the number of retries for RESTful API calls, which is 5 by default) to avoid this issue. You can also disable partition discovery by setting 'pulsar.source.partitionDiscoveryIntervalMs' = '0'.
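A source table that applies the options discussed in this answer might look like the following sketch. The table name, the stop timestamp (reused from the curl example above), and the retry count of 10 are illustrative assumptions.

```sql
-- Hypothetical bounded source table.
CREATE TABLE pulsar_bounded_source (
  `user_id` BIGINT,
  `item_id` BIGINT,
  `behavior` STRING
) WITH (
  'connector' = 'pulsar',
  'service-url' = 'pulsar://pulsar:6650',
  'admin-url' = 'http://pulsar:8080',
  'topics' = 'topic_source',
  'format' = 'json',
  'source.subscription-name' = 'flink',
  'source.start.message-id' = 'earliest',
  -- Stop at the message published at this Unix timestamp (milliseconds).
  'source.stop.at-publish-time' = '1662480195714',
  -- Retry failed Pulsar admin REST calls up to 10 times (default: 5).
  'pulsar.admin.requestRetries' = '10',
  -- 0 disables periodic partition discovery.
  'pulsar.source.partitionDiscoveryIntervalMs' = '0'
);
```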