操作场景
CKafka 连接器提供数据流出能力,您可以将 CKafka 数据分发至 Elasticsearch Service(ES)便于海量数据存储搜索、实时日志分析等操作。
说明:
只支持7.0以上版本的 Elasticsearch Service。
前提条件
该功能目前依赖 Elasticsearch Service 服务,使用时需开通相关产品功能。
已创建好数据流出目标 Elasticsearch Service 连接。
操作步骤
创建数据流出任务
2. 在左侧导航栏单击连接器 > 任务列表,选择好地域后,单击新建任务。
3. 填写任务名称,任务类型选择数据流出,数据目标类型选择 Elasticsearch Service,单击下一步。
Topic 类型:选择数据源 Topic。
弹性 Topic:选择提前创建好的弹性 Topic,详情参见 Topic 管理。 CKafka 实例内 Topic:选择在 CKafka 创建好的实例和 Topic,若实例设置了ACL 策略,请确保选中的 topic 有读写权限,详情参见 创建 Topic。 起始位置:选择转储时历史消息的处理方式,topic offset 设置。
5. 设置上述信息后,单击下一步,单击预览 Topic 消息,将会选取源 Topic 中的第一条消息进行解析。
说明
目前解析消息需要满足以下条件:
消息为 JSON 字符串结构。
源数据必须为单层 JSON 格式,嵌套 JSON 格式可使用 数据处理 进行简单的消息格式转换。 6. (可选)开启对源数据进行数据处理按钮,具体配置方法请参见 简单数据处理。 7. 单击下一步,配置数据目标信息。
源数据:单击拉取源 Topic 数据。若源Topic暂无数据,也可以自定义数据。
数据目标:选择提前创建好的数据流出的目标 Elasticsearch Service 连接。
索引名称:填写索引名称,索引名称必须全部为小写,支持 jsonpath 语法。
按日期拆分索引名称:可选,开启后需选择日期格式,写入 ES 的索引为%(索引名称)_%(日期)。
失败消息处理:选择投递失败的消息的处理方式,支持丢弃、保留和投递至 CLS (需指定投递到的日志集和日志主题并授权访问日志服务 CLS)三种方式。
保留:适合用于测试环境,任务运行失败时将会终止任务不会重试,并且在事件中心中记录失败原因。
丢弃:适合用于生产环境,任务运行失败时将会忽略当前失败消息。建议使用 "保留" 模式测试无误后,再将任务编辑成 "丢弃" 模式用于生产。
投递至 CLS:适合用于严格生产环境,任务运行失败时会将失败消息及元数据和失败原因上传到指定 CLS 主题中。
死信队列:适合用于严格生产环境,任务运行失败时会将失败消息及元数据和失败原因投递到指定的 CKafak Topic中。
数据源类型
索引时间:可以指定源数据中某一字段作为索引时间。默认为消息的投递时间。
ES文档 ID 字段:可以指定该字段的值作为 ES 文档 ID 的值。默认为topic+kafkaPartition+kafkaOffset
。
保留非 JSON 数据:若开启,则对于非 JSON 数据,会指定 KEY 进行组装投递。若关闭,则会丢弃非 JSON 数据。
本选项仅用于连接器订阅关系型数据库到 Topic 里面的数据(增删改)同步更新到 ES 。会识别数据库的增删改,保持 ES 的数据与源表的数据一致。
同步模式:若选择字段逐一匹配,则可以自定义消息字段名和目标索引字段的映射关系。若选择默认字段匹配,则会在 ES 的索引中,mapping 使用消息的 key 作为 field 名称。
目标索引类型:可以选择新建索引或者从现有的 ES 索引中选择。
主键:指定数据库表的主键作为 ES 文档 ID 的值。
索引时间:可以指定源数据中的某一字段作为索引时间。默认为消息的投递时间。
8. 单击提交,可以在任务列表看到刚刚创建的任务,在状态栏可以查看任务创建进度。
本页内容是否解决了您的问题?