Flink 版本 | 说明 | 源码 |
1.11 | 不支持 | - |
1.13 | 支持 | |
1.14 | 支持 | |
1.16 | 支持 | |
CREATE TABLE PulsarTable (`user_id` bigint,`item_id` bigint,`behavior` STRING,`publish_time` TIMESTAMP_LTZ(3) METADATA FROM 'publish_time' VIRTUAL) WITH ('connector' = 'pulsar','service-url' = 'pulsar://pulsar:6650','admin-url' = 'http://pulsar:8080',-- 'pulsar.client.authPluginClassName' = 'org.apache.pulsar.client.impl.auth.AuthenticationToken',-- 'pulsar.client.authParams' = 'token:eyJhbGciOiJIUzI1NiJ9.eyJzdWIiOiJ0ZXN0In0.Cpsuc4hRw3yL231Kp8iQAQkYc2SvVTfeziAMcLxxxxx','topics' = 'user_behavior','format' = 'json','source.subscription-name' = 'flink','source.start.message-id' = 'earliest');
# 验证 admin 权限admin_url=http://172.28.28.46:8080,172.28.28.29:8080,172.28.28.105:8080token=eyJhbGciOiJIUzI1NiJ9.eyJzdWIiOiJteS1zdXBlci11c2VyIn0.T-Bi__yGCs1lxEwdUcDKBDuJsWLO0X9LrePhnuXXXXXnamespace=public/defaultpulsar-admin --admin-url ${admin_url} --auth-params token:${token} --auth-plugin org.apache.pulsar.client.impl.auth.AuthenticationToken topics list ${namespace}# 验证 topic 读权限service_url=pulsar://172.28.28.46:6650,172.28.28.29:6650,172.28.28.105:6650token=eyJhbGciOiJIUzI1NiJ9.eyJzdWIiOiJteS1zdXBlci11c2VyIn0.T-Bi__yGCs1lxEwdUcDKBDuJsWLO0X9LrePhnuXXXXXnamespace=public/defaulttopic=xxxsubscription=yyypulsar-client --url ${service_url} --auth-params token:${token} --auth-plugin org.apache.pulsar.client.impl.auth.AuthenticationToken consume -s ${subscription} -n 10 persistent://${namespace}/${topic} -p Earliest
参数值 | 必填 | 默认值 | 数据类型 | 描述 |
connector | 必选 | (none) | String | 指定要使用的连接器,Apache Pulsar 连接器使用: 'pulsar' 、'upsert-pulsar' |
admin-url | 必选 | (none) | String | Pulsar 管理端地址 HTTP 地址,例如 http://my-broker.example.com:8080 或者 https://my-broker.example.com:8443 |
service-url | 必选 | (none) | String | Pulsar 服务端 URL pulsar client 连接 puslar 集群需要 pulsar 协议的 URL。例如: pulsar://localhost:6650 多个 broker 节点的 URL: pulsar://localhost:6550,localhost:6651,localhost:6652 生产环境集群一般以域名方式访问,例如: pulsar://pulsar.us-west.example.com:6650 开启 TLS 验证的 URL: pulsar+ssl://pulsar.us-west.example.com:6651 |
topics | 必选 | (none) | String | 用于读取和写入的 Apache Pulsar 的 topic 名称 可以是单个 topic 名称,也可以是以“;”分割的 topic 列表,例如: topic-1;topic-2 可以指定一组 topic 或者分区或者是两者都有,例如: topic-a-partition-0;topic-a-partition-2;some-topic2 如果同时指定了某个 topic 和其下属的分区,那么将会自动将两者合并,仅使用外层的 topic,例如: some-topic1;some-topic1-partition-0 等价于 some-topic1 |
pulsar.client.authPluginClassName | 可选 | (none) | String | 鉴权插件类名,token 鉴权填写: org.apache.pulsar.client.impl.auth.AuthenticationToken |
pulsar.client.authParams | 可选 | (none) | String | 鉴权参数,token 鉴权填写格式为: token:xxxx |
explicit | 可选 | true | Boolean | 是否为 explicit flink 表,用于 pulsar catalog。参考 pulsar catalog 介绍 |
key.fields | 可选 | (none) | List<String> | Pulsar 消息中的 key 字段对应的 flink 表物理字段。注意这些字段与 primary key 没有关系 |
key.format | 可选 | (none) | String | 用于对 Pulsar 消息中 key 部分序列化和反序列化的格式。 |
format | 可选 | (none) | String | 用于对 pulsar 消息中 value 部分序列化和反序列化的格式。 选项 format 和 value.format 必选其一,format 具有高优先级 |
value.format | 可选 | (none) | String | 用于对 pulsar 消息中 value 部分序列化和反序列化的格式 选项 format 和 value.format 必选其一,format 具有高优先级 |
sink.topic-routing-mode | 可选 | round-robin | Enum | Topic 路由策略。可用选项为 round-robin 和 message-key-hash 。默认情况下,它设置为 round-robin 。如果要使用自定义 topic 路由策略,请使用 sink.custom-topic-router 选项设置 |
sink.custom-topic-router | 可选 | (none) | String | 自定义 topic 路由策略完整类名,如果设置了该选项,请不要设置 sink.topic-routing-mode |
sink.message-delay-interval | 可选 | 0 | Duration | |
pulsar.sink.deliveryGuarantee | 可选 | none | Enum | Pulsar sink 的数据一致性保证,可选 none 、at-least-once 、exactly-once 。exactly-once 需要 Pulsar 集群支持事务 |
pulsar.sink.transactionTimeoutMillis | 可选 | 10800000 | Long | 单位毫秒,Pulsar 事务超时时间,默认为 3 小时(10800000 毫秒),注意必须保证事务超时时间大于 checkpoint 时间间隔 |
pulsar.producer.batchingEnabled | 可选 | false | Boolean | 是否开启分批写 |
pulsar.producer.batchingMaxMessages | 可选 | 1000 | Int | Pulsar 批次写入最大消息条数 |
source.start.message-id | 可选 | (none) | String | 指定 source 消费起始位点,可选值为 earliest , latest 或消息 ID (格式 ledgerId:entryId:partitionId , 例如 "12:2:-1") |
source.start.publish-time | 可选 | (none) | Long | 指定 source 消费起始点的发布时间戳(unix 毫秒级时间戳) |
source.subscription-name | 可选 | flink-sql-connector-pulsar-<RANDOM> | String | Pulsar 消息订阅名字。默认为 flink-sql-connector-pulsar-<RANDOM> ,其中 RANDOM 为长度为 5 的随机字母 |
source.subscription-type | 可选 | Exclusive | Enum | |
source.stop.at-message-id | 可选 | (none) | String | 指定 source 消费结束位点,可选值 never , latest 或消息 ID (格式 ledgerId:entryId:partitionId , 例如 "12:2:-1") |
source.stop.at-publish-time | 可选 | (none) | Long | 指定 source 消费结束点的发布时间戳(unix 毫秒级时间戳) |
source.stop.after-message-id | 可选 | (none) | String | 指定 source 消费结束位点的消息 ID(包含该消息),格式 ledgerId:entryId:partitionId , 例如 "12:2:-1" |
pulsar.source.partitionDiscoveryIntervalMs | 可选 | 30000 | Long | 单位毫秒,pulsar source 自动探测新增 partition 的时间间隔。设置为 0 或者负值禁用 partition 探测 |
pulsar.admin.requestRetries | 可选 | 5 | Int | Pulsar admin Rest API 调用失败重试次数 |
pulsar.client.* | 可选 | (none) | - | 该选项可以传递任意的的 pulsar client 参数 |
pulsar.admin.* | 可选 | (none) | - | 该选项可以传递任意的 pulsar 管理端参数 |
pulsar.sink..* | 可选 | (none) | - | 该选项可以传递其他的 pulsar sink connector 参数 |
pulsar.producer.* | 可选 | (none) | - | 该选项可以传递任意的 pulsar producer API 配置参数 |
pulsar.source..* | 可选 | (none) | - | 该选项可以传递其他的 pulsar source connector 参数 |
pulsar.consumer.* | 可选 | (none) | - | 该选项可以传递任意的 pulsar consumer API 配置参数 |
元数据 Key | 数据类型 | R/W | 描述 |
topic | STRING NOT NULL | R | Pulsar 消息的 Topic name 字段。 |
message_size | INT NOT NULL | R | Pulsar 消息大小。 |
producer_name | STRING NOT NULL | R | Pulsar 消息的 Producer name 字段。 |
message_id | BYTES NOT NULL | R | Pulsar 消息的 Message ID 字段。 |
sequenceId | BIGINT NOT NULL | R | Pulsar 消息的 Sequence ID 字段。 |
publish_time | TIMESTAMP_LTZ(3) NOT NULL | R | Pulsar 消息的 Publish time 字段。 |
event_time | TIMESTAMP_LTZ(3) NOT NULL | R/W | Pulsar 消息的 Properties 字段。 |
properties | MAP<STRING, STRING> NOT NULL | R/W | Pulsar 消息的 Event time 字段。 |
R/W
列定义了一个元数据是可读的(R
)还是可写的(W
)。 只读列必须声明为 VIRTUAL
以在 INSERT INTO
操作中排除它们。Pulsar schema | Flink forma |
AVRO | avro |
JSON | json |
PROTOBUF | Not supported yet |
PROTOBUF_NATIVE | Not supported yet |
AUTO_CONSUME | Not supported yet |
AUTO_PUBLISH | Not supported yet |
NONE/BYTES | raw |
BOOLEAN | raw |
STRING | raw |
DOUBLE | raw |
FLOAT | raw |
INT8 | raw |
INT16 | raw |
INT32 | raw |
INT64 | raw |
LOCAL_DATE | Not supported yet |
LOCAL_TIME | Not supported yet |
LOCAL_DATE_TIME | Not supported yet |
explicit
表 and native
tables。explicit
表是使用 CREATE 语句或 table API 显式创建的表,它类似于其他 SQL connector 中的常用模式。您可以创建表,然后从表中查询数据或向表中写入数据。native
表由 PulsarCatalog 自动创建。PulsarCatalog 扫描 Pulsar 集群中的所有非系统主题,然后将每个 topic 映射成 Flink 表,而不使用 CREATE 语句。schemaInfo
字段存储 explicit
表的元数据信息。对于每个 explicit
表,PulsarCatalog 在默认为__flink_catalog
的 tenant 下创建一个占位 topic。可通过 catalog-tenant
选项设置 tenant。Flink 的 database 映射为该 tenant 下同样名字的 namespace。然后创建一个名为 table_<FLINK_TABLE_NAME>
的 topic,该 topic 的 schema 存储了 Flink 表的 schema 元数据信息。testdb
,表为 users
的 flink 表,那么 PulsarCatalog 创建 tenant 为 __flink_catalog
,namespace 为 testdb
的 topic table_users
。table_users
只所以称之为占位 topic 是因为它没有任何 producer 或者 consumer,您可以使用占位 topic 的 schema 来存储 Flink 表的元数据信息。pulsar-admin
命令行工具来获取 topic 的元数据信息:pulsar-admin schemas get persistent://<tenant>/<namespace>/<topic>
native
表没有任何占位 topic,PulsarCatalog 把 topic 的 schema 映射为 Flink 表的 schema。关于 Pulsar schema,参考 Pulsar 官网文档 Understand schema。Pulsar schema | Flink data type | Flink format | Work or not |
AVRO | It is decided by the Avro format. | avro | Yes |
JSON | It is decided by the JSON format. | json | Yes |
PROTOBUF | Not supported yet | / | No |
PROTOBUF_NATIVE | It is decided by the Protobuf definition. | Not supported yet | No |
AUTO_CONSUME | Not supported yet | / | No |
AUTO_PUBLISH | Not supported yet | / | No |
NONE/BYTES | DataTypes.BYTES() | raw | Yes |
BOOLEAN | DataTypes.BOOLEAN() | raw | Yes |
LOCAL_DATE | DataTypes.DATE() | / | No |
LOCAL_TIME | DataTypes.TIME() | / | No |
LOCAL_DATE_TIME | DataTypes.TIMESTAMP(3) | / | No |
STRING | DataTypes.STRING() | raw | Yes |
DOUBLE | DataTypes.DOUBLE() | raw | Yes |
FLOAT | DataTypes.FLOAT() | raw | Yes |
INT8 | DataTypes.TINYINT() | raw | Yes |
INT16 | DataTypes.SMALLINT() | raw | Yes |
INT32 | DataTypes.INT() | raw | Yes |
INT64 | DataTypes.BIGINT() | raw | Yes |
LOCAL_DATE
, LOCAL_TIME
和 LOCAL_DATE_TIME
有相应的 flink 数据类型,但 flink 基于这几种 Pulsar schema 无法解析数据,因此自动 schema 映射会失败。native
表,您可以从现有 Pulsar topic 中查询数据。PulsarCatalog 自动读取 topic 的 schema,并决定使用哪种 format 来解码/编码。但是,native
表不支持 watermark
和主键,因此,不能使用 native
表进行基于事件时间的窗口聚合。native
表将 tenant/namespace
映射到 flink 的 database,topic 名字映射为 flink 表名。explicit
表定义 watermark
、指定元数据字段和指定自定义格式。其用法类似于在 GenericInMemoryCatalog
中创建 Pulsar 表。您可以将 explicit
表绑定到 Pulsar 的 topic,每个 Pulsar topic 可以绑定到多个 Flink 表(包括 native
表)。Key | Default | Type | Description | Required |
catalog-admin-url | "http://localhost:8080" | String | Pulsar 管理端地址 HTTP 地址,例如 http://my-broker.example.com:8080 或者 https://my-broker.example.com:8443 。 | Yes |
catalog-auth-params | (none) | String | 访问 Pulsar 集群的认证参数。 | - |
catalog-auth-plugin | (none) | String | 访问 Pulsar 集群的认证 plugin 名字。 | - |
catalog-service-url | "pulsar://localhost:6650" | String | Pulsar 服务端 URL pulsar client 连接 puslar 集群需要 pulsar 协议的 URL。例如: pulsar://localhost:6650 多个 broker 节点的 URL: pulsar://localhost:6550,localhost:6651,localhost:6652 生产环境集群一般以域名方式访问,例如: pulsar://pulsar.us-west.example.com:6650 。开启 TLS 验证的 URL: pulsar+ssl://pulsar.us-west.example.com:6651 。 | Yes |
catalog-tenant | "__flink_catalog" | String | 存储表信息的 Pulsar tenant。 | - |
default-database | "default_database" | String | PulsarCatalog 的默认 database,不存在时会自动创建。 | - |
CREATE CATALOG pulsar WITH ('type' = 'pulsar-catalog',-- 'catalog-auth-plugin' = 'org.apache.pulsar.client.impl.auth.AuthenticationToken',-- 'catalog-auth-params' = 'token:eyJhbGciOiJIUzI1NiJ9.eyJzdWIiOiJteS1zdXBlci11c2VyIn0.T-Bi__yGCs1lxEwdUcDKBDuJsWLO0X9LrePhnuxxxxx','catalog-admin-url' = '<ADMIN_URL>','catalog-service-url' = '<SERVICE_URL>');
exactly-once
的数据一致性保证,事务超时时间 2 分钟(注意必须保证事务超时时间大于 checkpoint 时间间隔)。CREATE TABLE `pulsar_source` (`user_id` bigint,`item_id` bigint,`behavior` STRING) WITH ('connector' = 'pulsar','service-url' = 'pulsar://pulsar:6650','admin-url' = 'http://pulsar:8080',-- 'pulsar.client.authPluginClassName' = 'org.apache.pulsar.client.impl.auth.AuthenticationToken',-- 'pulsar.client.authParams' = 'token:eyJhbGciOiJIUzI1NiJ9.eyJzdWIiOiJ0ZXN0In0.Cpsuc4hRw3yL231Kp8iQAQkYc2SvVTfeziAMcLxxxxx','topics' = 'topic_source','format' = 'json','source.subscription-name' = 'flink','source.start.message-id' = 'earliest');CREATE TABLE `pulsar_sink` (`user_id` bigint,`item_id` bigint,`behavior` STRING) WITH ('connector' = 'pulsar','service-url' = 'pulsar://pulsar:6650','admin-url' = 'http://pulsar:8080',-- 'pulsar.client.authPluginClassName' = 'org.apache.pulsar.client.impl.auth.AuthenticationToken',-- 'pulsar.client.authParams' = 'token:eyJhbGciOiJIUzI1NiJ9.eyJzdWIiOiJ0ZXN0In0.Cpsuc4hRw3yL231Kp8iQAQkYc2SvVTfeziAMcLxxxxx','topics' = 'topic_sink','format' = 'json','pulsar.sink.deliveryGuarantee' = 'exactly-once','pulsar.sink.transactionTimeoutMillis' = '120000');INSERT INTO `pulsar_sink` SELECT * FROM `pulsar_source`;
CREATE CATALOG `pulsar` WITH ('type' = 'pulsar-catalog',-- 'catalog-auth-plugin' = 'org.apache.pulsar.client.impl.auth.AuthenticationToken',-- 'catalog-auth-params' = 'token:eyJhbGciOiJIUzI1NiJ9.eyJzdWIiOiJteS1zdXBlci11c2VyIn0.T-Bi__yGCs1lxEwdUcDKBDuJsWLO0X9LrePhnuxxxxx','catalog-admin-url' = 'http://pulsar:8080','catalog-service-url' = 'pulsar://pulsar:6650');INSERT INTO `pulsar`.`default_database`.`pulsar_sink` SELECT * FROM `pulsar`.`default_database`.`pulsar_source`;
pulsar_source
和 pulsar_sink
表用下面语句创建(可以放到同一个 SQL 作业里)。CREATE TABLE IF NOT EXISTS `pulsar`.`default_database`.`pulsar_source` (`user_id` bigint,`item_id` bigint,`behavior` STRING) WITH ('connector' = 'pulsar','service-url' = 'pulsar://pulsar:6650','admin-url' = 'http://pulsar:8080',-- 'pulsar.client.authPluginClassName' = 'org.apache.pulsar.client.impl.auth.AuthenticationToken',-- 'pulsar.client.authParams' = 'token:eyJhbGciOiJIUzI1NiJ9.eyJzdWIiOiJ0ZXN0In0.Cpsuc4hRw3yL231Kp8iQAQkYc2SvVTfeziAMcLxxxxx','topics' = 'topic_source','format' = 'json','source.subscription-name' = 'flink','source.start.message-id' = 'earliest');CREATE TABLE IF NOT EXISTS `pulsar`.`default_database`.`pulsar_sink` (`user_id` bigint,`item_id` bigint,`behavior` STRING) WITH ('connector' = 'pulsar','service-url' = 'pulsar://pulsar:6650','admin-url' = 'http://pulsar:8080',-- 'pulsar.client.authPluginClassName' = 'org.apache.pulsar.client.impl.auth.AuthenticationToken',-- 'pulsar.client.authParams' = 'token:eyJhbGciOiJIUzI1NiJ9.eyJzdWIiOiJ0ZXN0In0.Cpsuc4hRw3yL231Kp8iQAQkYc2SvVTfeziAMcLxxxxx','topics' = 'topic_sink','format' = 'json','pulsar.sink.deliveryGuarantee' = 'exactly-once','pulsar.sink.transactionTimeoutMillis' = '120000');
{"schema": "{\\"type\\":\\"record\\",\\"name\\":\\"userBehavior\\",\\"namespace\\":\\"my.example\\",\\"fields\\":[{\\"name\\":\\"user_id\\",\\"type\\":\\"long\\"},{\\"name\\":\\"item_id\\",\\"type\\":\\"long\\"},{\\"name\\":\\"behavior\\",\\"type\\":\\"string\\"}]}","type": "JSON","properties": {}}
# 设置 schemabin/pulsar-admin schemas upload -f ./schema.json topic_sourcebin/pulsar-admin schemas upload -f ./schema.json topic_sink# 检查 schemabin/pulsar-admin schemas get topic_sourcebin/pulsar-admin schemas get topic_sink
public/default
格式为 tenant/namespace
,为 Pulsar 集群的默认值。CREATE CATALOG `pulsar` WITH ('type' = 'pulsar-catalog',-- 'catalog-auth-plugin' = 'org.apache.pulsar.client.impl.auth.AuthenticationToken',-- 'catalog-auth-params' = 'token:eyJhbGciOiJIUzI1NiJ9.eyJzdWIiOiJteS1zdXBlci11c2VyIn0.T-Bi__yGCs1lxEwdUcDKBDuJsWLO0X9LrePhnuxxxxx','catalog-admin-url' = 'http://pulsar:8080','catalog-service-url' = 'pulsar://pulsar:6650');INSERT INTO `pulsar`.`public/default`.`topic_sink` SELECT * FROM `pulsar`.`public/default`.`topic_source`;
java.lang.NullPointerException: You haven't enable transaction in Pulsar client.
pulsar-admin transactions slow-transactions -t 1s
查看 OPEN 状态的事务。当 OPEN 状态的事务提交或者回滚后,topic 后续写入的数据即可读取。
规避建议:使用 with 参数设置合适的事务超时时间(pulsar 默认为 3 小时),例如 'pulsar.sink.transactionTimeoutMillis' = '120000'
设置事务超时时间为 2 分钟。注意必须保证事务超时时间大于 checkpoint 时间间隔。java.lang.IllegalArgumentException: We only support normal message id currently
,这是因为 Pulsar 写入端开启了批次写,当前 Pulsar source 不支持批量写消息的状态恢复。Oceanus Pulsar sink 默认不开启批量写。Caused by: java.lang.IllegalArgumentException: We only support normal message id currently.
NonDurable
订阅模式Durable
的 subscription。NonDurable
模式消费数据,报错 Durable subscription with the same name already exists
。参数值 | 必填 | 默认值 | 数据类型 | 描述 |
pulsar.consumer.subscriptionMode | 可选 | Durable | Enum | Pulsar 消息订阅模式,可选 Durable 、NonDurable 。Durable 模式下,cursor 是持久的,它保留消息并持久化当前位置。如果 broker 从故障中重新启动,它可以从持久存储(bookie)中恢复 cursor,以便消息可以从上次消费的位置继续消费。NonDurable 模式下,一旦 broker 停止,cursor 将丢失且无法恢复,因此无法从上次消费的位置继续消费。更多信息参考 Subscription modes。 |
HTTP 307 Temporary Redirect
flink connector 中使用的 pulsar-client api 则会返回 HTTP 500 Server Error
错误,导致作业无法启动。可用 get-message-by-id 的 Rest API 接口查看错误情况。## 1662480195714 为毫秒级的消息 publish-timecurl http://${adminUrl}:8080/admin/v2/persistent/public/default/${topic}/messageid/1662480195714
source.stop.at-publish-time
可以使用,因为不涉及到到根据 publish-time 查找消息 ID 的 Rest API 调用。pulsar.admin.requestRetries
,默认 5)来规避该问题。 'pulsar.source.partitionDiscoveryIntervalMs' ='0'
本页内容是否解决了您的问题?