消息队列 CKafka 支持用户转储消息的能力,您可以将 CKafka 消息转储同步转储至消息队列 CKafka,用于 CKafka 集群间的数据同步。
该功能目前依赖云函数(SCF)、消息队列 CKafka 服务。使用时需提前开通云函数 SCF 相关服务及功能。
转储消息队列 CKafka 的方案将使用 SCF 的 CKafka 触发器进行,通过 CKafka 触发器将消息同步至消息队列另一个集群内。
在通用创建流程中,无法直接跨地域或对自建 Kafka 进行转储,需对函数进行相关网络或投递信息设置。跨地域转储操作流程如下:
新建 CKafka 转储模板,并跳转到云函数控制台,投递实例及 Topic 可任意填写。
在函数配置中修改环境变量及所属网络配置。
环境变量配置说明:
kafka_address :Kafka IP 地址
kafka_topic_name: Kafka Topic 名称
说明:
- 如 CKafka 跨地域转储,修改相关环境变量即可,VPC 网络需配置对等连接。
- 如 CVM 自建 Kafka,需修改为与自建 Kafka 相同的 VPC 及 Kafka Topic 信息。
- 如其他自建 Kafka,需修改环境变量的 IP 及 Topic 信息为自建信息;如无专线需用云函数公网传输。
保存相关配置,并开启转储功能可完成转储。
云函数支持 CKafka 的 PLAINTEXT
和 SASL_PLAINTEXT
两种接入方式,可在云函数代码中自行修改。
PLAINTEXT
接入方式:kafka_to_kafka = KafkaToKafka(kafka_address)
SASL_PLAINTEXT
接入方式kafka_to_kafka= KafkaToKafka(
kafka_address
security_protocol = "SASL_PLAINTEXT",
sasl_mechanism="PLAIN",
sasl_plain_username="ckafka-80o10xxx#Taborxx",
sasl_plain_password="Taborxxxx",
api_version=(0, 10, 2)
)
说明:sasl_plain_username 包含实例 ID 和用户名,使用 # 拼接。
CKafka 转储能力基于 SCF 实现,可在 SCF 日志中查询到相关转储的信息及转储状态。
本页内容是否解决了您的问题?