在线时间:8:00-16:00
迪恩网络APP
随时随地掌握行业动态
扫描二维码
关注迪恩网络微信公众号
使用docker部署
2 准备配置文件testkafka.conf# vi testkafka.conf lua_package_path "/usr/local/openresty/lualib/resty/kafka/?.lua;;"; lua_need_request_body on; server { listen 80; server_name testkafka; location /test { content_by_lua ' local testfile = "/tmp/test.log" local cjson = require "cjson" local client = require "resty.kafka.client" local producer = require "resty.kafka.producer" local broker_list = { { host = "127.0.0.1", port = 9092 } } local topic = "test" local key = "key" local message = "halo world" -- usually we do not use this library directly local cli = client:new(broker_list) local brokers, partitions = cli:fetch_metadata(topic) if not brokers then ngx.say("fetch_metadata failed, err:", partitions) end --ngx.say("brokers: ", cjson.encode(brokers), "; partitions: ", cjson.encode(partitions)) -- sync producer_type local p = producer:new(broker_list) local f = io.open(testfile, "a+") f:write(topic .. ":" .. key .. ":" .. message .. "\\n") f:close() local offset, err = p:send(topic, key, message) if not offset then ngx.say("send err:", err) return end ngx.say("send success, offset: ", tonumber(offset)) -- this is async producer_type and bp will be reused in the whole nginx worker local bp = producer:new(broker_list, { producer_type = "async" }) local ok, err = bp:send(topic, key, message) if not ok then ngx.say("send err:", err) return end ngx.say("host : ", ngx.var.host) ngx.say("uri : ", ngx.var.uri) ngx.say("args : ", ngx.var.args) ngx.say("body : ", ngx.req.get_body_data()) ngx.say("client ip : ", ngx.var.remote_addr) ngx.say("time : ", ngx.var.time_local) ngx.say("send success, ok:", ok) '; } } 功能:发送kafka、写日志到/tmp/test.log,打印请求信息 修改其中broker的ip和端口,以及topic名; 3 启动docker
挂载testkafka.conf以及kafka lib目录 4 测试
5 更多1)可以将nginx访问日志发送到kafka 2)可以将请求数据作为消息发送到kafka(从uri中的path解析出topic) 6 报错有可能报错:no resolver defined to resolve 这是因为kafka broker配置的是hostname,而不是ip,而nginx遇到hostname必须通过dns解析,而不能依靠/etc/hosts来解析,所以会报以上错误,这时有两种解决方法: 1)安装dnsmasq; 2)修改kafka配置中的advertised.host.name,将其修改为ip即可;
参考:https://github.com/doujiang24/lua-resty-kafka
|
请发表评论