解决celery与kafka的联用问题

0X01 背景

大数据过滤、导入,用celery下发任务,任务内容为kafka生产一些数据。

0X02 问题

使用confluent_kafka或python-kafka模块向kafka生产数据,本地调试时代码可以正常生产消息,但是套上celery后,kafka就无法将新消息生产到topic队列中了,具体表现为调用Producer函数后无任何反应。

https://www.cnblogs.com/dplearning/p/7520211.html
https://blog.csdn.net/weixin_34050427/article/details/85940461

0X03 解决方案

  1. 更换为pykafka库(缺点:pykafka不支持client.id)
  2. 更换为python-kafka库(缺点:每个tasker都需要实例化一个Producer,否则依然会发生confluent_kafka那种无响应的情况)
    如:
#coding: utf-8
from kafka import KafkaProducer
import json
import time

@celery.task(max_retries=1, default_retry_delay=3, ignore_result=True)
def WriteToKafka(key):
    """
    将传入数据写入Kafka
    """
    prod = KafkaProducer(
        bootstrap_servers=BROKER, client_id="xxx", retries=2)
    datas = redis_client.smembers(key)

    for data in datas:
        # 数据包格式化
        json_data = {}
        future = prod.send('xxx',
                           json.dumps(json_data).encode("utf-8")).add_errback(on_send_error)

    # Wait for any outstanding messages to be delivered and delivery report
    # callbacks to be triggered.
    # 清理使用过的Redis Key
    redis_client.delete(key)
    prod.flush()

0X04 后记

由于pykafka不支持设置client.id,所以我这里只能使用python-kafka来解决该问题。
虽然将Producer实例化放入task函数中会导致多次建立kafka链接,但是可以通过数据打包的方式让一个tasker执行更多的任务,通过减少tasker的调用量来减少Producer实例化的次数,从而提高效率。
有一个奇怪的问题,当我尝试使用confluent_kafka库,并模仿python-kafka的解决方法——在task函数中实例化Producer时,celery上面执行依然无响应,怀疑是confluent_kafka库的bug

0X05 参考内容

https://github.com/celery/celery/issues/4021
https://github.com/dpkp/kafka-python/issues/1098

本文链接:

https://www.lzskyline.com/archives/78/
1 + 9 =
快来做第一个评论的人吧~