用户可以编写云函数来处理 CKafka 中收取到的消息。云函数后台模块可以作为消费者消费 CKafka 中的消息,并将消息传递给云函数。
CKafka 触发器具有以下特点:
Pull 模型:云函数的后台模块作为消费者,连接 CKafka 实例并消费消息。在后台模块获取到消息后,会将消息封装到数据结构中并调用指定的函数,将消息数据传递给云函数。
同步调用: CKafka 触发器使用同步调用类型来调用函数。有关调用类型的更多信息,请参见 调用类型。 说明:
对于运行错误(含用户代码错误和 Runtime 错误),CKafka 触发器会按照您配置的重试次数进行重试。默认重试10000次。
对于系统错误,CKafka 触发器会采用指数退避的方式持续重试,直至成功为止。
CKafka 触发器属性
CKafka 实例:配置连接的 CKafka 实例,仅支持选择同地域下的实例。
Topic:支持在 CKafka 实例中已经创建的 Topic(仅支持未创建 ACL 策略的 Topic)。
最大批量消息数:在拉取并批量投递给当前云函数时的最大消息数,目前支持最高配置为10000。结合消息大小、写入速度等因素影响,每次触发云函数并投递的消息数量不一定能达到最大值,而是处在1 - 最大消息数之间的一个变动值。
起始位置:触发器消费消息的起始位置,默认从最新位置开始消费。支持最新、最开始、按指定时间点三种配置。
重试次数:函数发生运行错误(含用户代码错误和 Runtime 错误)时的最大重试次数。
CKafka 消费及消息传递
由于 CKafka 消息无主动推送能力,需要消费方通过拉取的方式,拉取到消息并进行消费。因此,在配置 CKafka 触发器后,云函数后台会通过启动 CKafka 消费模块,作为消费者,并在 CKafka 中创立独立的消费组进行消费。
云函数后台的消费模块在消费到消息后,会根据一定的超时时间、累积消息数量大小及最大批量消息数等信息,组合为事件结构并发起函数调用(同步调用)。相关限制说明如下:
超时时间:目前云函数后台的消费模块的超时时间为60秒,避免时延过长才进行消费。例如,Ckafka Topic 的消息写入很少,消费模块在60秒内没有凑够最大批量消息数的消息,则依然会发起函数调用。
同步调用的事件大小限制:6MB,详情请参见 限制说明。如果 Ckafka Topic 的消息很大,例如单条消息就已经达到6MB,那么由于同步调用的6MB限制,所以传递给云函数的事件结构中只会有一条消息,而不是用户配置的最大消息个数。 最大批量消息数:同 CKafka 触发器属性,由用户设置,目前支持最高配置为10000。
云函数后台的消费模块会循环这个过程,且会保证消息消费的顺序性,即前一批消息消费完(同步调用),再进行下一批消息的消费。
说明:
在这个过程中,每次组合的消息数量不一定相同,即每个事件结构内的消息个数在1 - 配置的最大消息个数之间。如果配置的最大消息数过大,有可能出现事件结构内的消息个数始终不会达到最大消息数的情况。
在云函数中获取到事件内容后,可选择循环处理的方式,确保每一条消息都得到处理,而不应假定每次传递的消息个数均是恒定的。
CKafka 触发器的事件消息结构
在指定的 CKafka Topic 接收到消息时,云函数的后台消费者模块会消费到消息,并将消息组装为类似以下的 JSON 格式事件,触发绑定的函数并将数据内容作为入参传递给函数。
{
"Records": [
{
"Ckafka": {
"topic": "test-topic",
"Partition":1,
"offset":36,
"msgKey": "None",
"msgBody": "Hello from Ckafka!"
}
},
{
"Ckafka": {
"topic": "test-topic",
"Partition":1,
"offset":37,
"msgKey": "None",
"msgBody": "Hello from Ckafka again!"
}
}
]
}
数据结构内容详细说明如下:
常见问题
CKafka 消息堆积了很多该如何处理?
在您配置 CKafka 触发器后,云函数后台会通过启动 CKafka 消费模块作为消费者,在 CKafka 中创立独立的消费组进行消费,且消费模块的数量等于 Ckafka Topic 的分区(partition)数量。
如果堆积了很多 Ckafka 消息,则需要提升消费能力。提升消费能力有以下方法:
增加 Ckafka Topic 的分区数。云函数的消费能力正比于分区数量,云函数后台的 CKafka 消费模块会自动匹配 Ckafka Topic 分区数,即可以通过增加分区来提升消费能力。
优化云函数的运行时间。云函数的运行时间越短,消费能力就越强。若云函数的运行时间变长(例如,云函数内需要写 DB,DB 的响应变慢),则消费速度就会下降。
本页内容是否解决了您的问题?