目前是公司正在搞得项目, 所以只说思路, 具体代码就不贴了

修改配置中的写出类型

suricata.yaml配置中, 修改eve.json的filetype为kafka, 并在下面新增几个配置传入kafka的broker, topic, clientid(若有)等信息

增加对kafka写出类型的识别

主要修改 output-json.c 文件

可以搜索 LOGFILE_TYPE_REDIS 关键字, 仿照类似的方式增加对Kafka类型的识别, 并设置 json_ctx->json_out 的值, 方便后面的代码流程判断写出类型

读kafka配置也是一样的方法, 根据 LOGFILE_TYPE_REDIS 关键字找到相关的 ConfNodeLookupChild 方法, 从配置中读内容存到指针节点中, 方便后面的函数从指针取配置信息

响应kafka类型输出事件

主要修改 util-logopenfile.c 文件

关键字 HAVE_LIBHIREDIS 和 LOGFILE_TYPE_REDIS, 找到缓冲区内容写到Redis的相关判断逻辑以及主函数, 增加写Kafka的判断逻辑, 之后就只剩下实现一个写kafka的主函数了

缓冲区写Kafka函数

如果直接无脑用一个函数把缓冲区的内容打到Kafka, 会导致连接池爆满(每个流量写到kafka时都会创建新的client), 所以还是要参考Redis的写出方法, 先在前面用init方法将kafka对象放到指针里, 以file like的方式实现函数将数据写到kafka.

kafka consumer参考:

https://www.cnblogs.com/GnibChen/p/8604544.html
https://blog.csdn.net/qq849635649/article/details/77915615

其他还有一些比较细的小步骤上文没有提到, 比如新增报错类型, header文件增加对应结构体, 修改Makefile文件加入源文件等.

如果觉得我的文章对你有用,请随意赞赏