• 设为首页
  • 点击收藏
  • 手机版
    手机扫一扫访问
    迪恩网络手机版
  • 关注官方公众号
    微信扫一扫关注
    公众号

lua-resty-kafka配置文档

原作者: [db:作者] 来自: [db:来源] 收藏 邀请

参考网址:https://github.com/doujiang24/lua-resty-kafka

 

一、例子

content_by_lua '

          -- 引入lua所有api

         local cjson = require "cjson"

         local producer = require "resty.kafka.producer"

 

         -- 定义kafka broker地址,ip需要和kafka的host.name配置一致 

         local broker_list = {

             { host = "192.168.101.223", port = 9092 }, 

             { host = "192.168.101.224", port = 9092 }

         }

         local key = "key"

         local message = "halo world"

         local error_handle = function (topic, partition_id, queue, index, err, retryable)

                  ngx.log(ngx.ERR, "failed to send to kafka, topic: ", topic, "; partition_id: ", partition_id, "; retryable: ", retryable)

         end

 

         local p = producer:new(broker_list, { producer_type = "async", max_retry = 1, batch_num = 1, error_handle = error_handle })

 

         local ok, err = p:send("test", key, message)

         if not ok then

                  ngx.say("send err:", err)

             return

         end

         ngx.say("send ok:", ok)

         p:flush()

';

二、语法

New

p = producer:new(broker_list, producer_config?, cluster_name?)

 

broker_list :客户端列表

    local broker_list = {

             { host = "192.168.101.223", port = 9092 }, 

             { host = "192.168.101.224", port = 9092 }

         }

 

producer_config? :可选参数

 

    producer_type 生产者类型,同步或者异步,"async" or "sync"

         request_timeout 请求超时,默认是 2000 ms

         required_acks 请求应答不能为0,默认是1

    max_retry 信息发送重试最大次数,默认是3

    retry_backoff 信息发送重试补偿,默认100ms

    partitioner 选择分区从键和分区num的分区。

       local partitioner = function (key, partition_num, correlation_id) end

       默认

         local function default_partitioner(key, num, correlation_id)

               local id = key and crc32(key) or correlation_id

               -- partition_id is continuous and start from 0

               return id % num

        end

 

         !!!以下为缓冲区参数,只有producer_type = "async"有效

         flush_time 队列最大缓存时间,默认1000ms

    batch_num 队列最大批次数量,默认200

    batch_size 保存缓存大小,默认是1M(最大可以为2M),需要小心,这个跟kafka配置有关,socket.request.max.byts为2-10k

    max_buffering 队列最大缓存大小,默认50000

    error_handle 错误处理,当缓冲区发送到kafka错误时处理数据。

                   error_handle = function (topic, partition_id, message_queue, index, err, retryable) end

       失败的消息队列如{ key1, msg1, key2, msg2 },键值key在消息队列中是为””相当与orign中的nil,

       Index为消息队列的长度

       Retryable为true时,这意味着kafka服务器肯定没有提交这些消息。可以尝试重试发送消息,

    暂时不支持压缩

 

cluster_name? :可选参数

    指定集群的名称,默认是1(该参数是一个数值),当您有两个或多个kafka集群时,您可以指定不同的名称,只有producer_type = "async"有效

 

 

 

Send

ok, err = p:send(topic, key, message)

    同步模式时 :

       如果成功,返回当前代理和分区的偏移量(** cdata: LL **)。如果出现错误,返回nil,用字符串描述错误

         异步模式时 :

       消息将首先写入缓冲区。当缓冲区超过batch_num时,它将发送到kafka服务器,或者每个flush_time刷新缓冲区。


鲜花

握手

雷人

路过

鸡蛋
该文章已有0人参与评论

请发表评论

全部评论

专题导读
上一篇:
skynet源码分析之网络层——Lua层介绍发布时间:2022-07-22
下一篇:
ESP8266LUA脚本语言开发:外设篇-GPIO输出高低电平发布时间:2022-07-22
热门推荐
热门话题
阅读排行榜

扫描微信二维码

查看手机版网站

随时了解更新最新资讯

139-2527-9053

在线客服(服务时间 9:00~18:00)

在线QQ客服
地址:深圳市南山区西丽大学城创智工业园
电邮:jeky_zhao#qq.com
移动电话:139-2527-9053

Powered by 互联科技 X3.4© 2001-2213 极客世界.|Sitemap