• 设为首页
  • 点击收藏
  • 手机版
    手机扫一扫访问
    迪恩网络手机版
  • 关注官方公众号
    微信扫一扫关注
    公众号

KafkaBridge: KafkaBridge 是奇虎 360 开源的 Kafka 客户端 SDK ,底层基于 librdkafk ...

原作者: [db:作者] 来自: 网络 收藏 邀请

开源软件名称:

KafkaBridge

开源软件地址:

https://gitee.com/mirrors/KafkaBridge

开源软件介绍:

简介 English

  • Qbusbridge 是 pub-sub 消息系统的客户端 SDK,目前它支持:

    用户可以通过修改配置切换到任意一个 pub-sub 消息系统。默认配置是访问 Kafka,如果想要切换到 Pulsar,需要修改配置为:

    mq.type=pulsar# Other configs for Pulsar...

    更多细节见 config

  • Qbusbridge-Kafka 底层基于 librdkafka, 与之相比封装了大量的使用细节,简单易用,使用者无需了解过多的Kafka系统细节,只需调用极少量的接口,就可完成消息的生产和消费;

  • 针对使用者比较关心的消息生产的可靠性,作了近一步的提升

特点

  • 支持多种语言:c++、php、python、golang, 且各语言接口完全统一;
  • 接口少,简单易用;
  • 针对高级用户,支持通过配置文件调整所有的librdkafka的配置;
  • 在非按key写入数据的情况下,尽最大努力将消息成功写入;
  • 支持同步和异步两种数据写入方式;
  • 在消费时,除默认自动提交offset外,允许用户通过配置手动提交offset;
  • 在php-fpm场景中,复用长连接生产消息,避免频繁创建断开连接的开销;

编译

确保你的系统上安装了 g++ (>= 4.8.5), boost (>= 1.41),cmake (>= 3.1),swig (>= 3.0.12)。

git clone

git clone --recursive https://github.com/Qihoo360/qbusbridge.git

此外,qbus SDK 静态链接到 libstdc++,因此必须确保 libstdc++.a 存在。对于 CentOS 用户,运行:

sudo yum install -y glibc-static libstdc++-static

1. 安装子模块

运行./build_dependencies.sh

它会自动下载子模块,并将其安装到cxx/thirdparts/local,即CMakeLists.txt查找头文件和库文件的目录。

./cxx/thirdparts/local

include/  librdkafka/    rdkafka.h  log4cplus/    logger.hlib/  librdkafka.a  liblog4cplus.a

2. 编译SDK

C/C++

进入cxx目录,执行./build.sh,会生成以下文件:

include/  qbus_consumer.h  qbus_producer.hlib/  debug/libQBus.so  release/libQBus.so

虽然构建 C++ SDK 需要 C++11 支持,但是 SDK 也可以被旧版本 g++ 使用。比如,使用 g++ 4.8.5 编译 qbus SDK,使用 g++ 4.4.7 使用 qbus SDK。

Go

进入golang目录,执行./build.sh,会生成以下文件:

gopath/  src/    qbus/      qbus.go      libQBus_go.so

可以运行 USE_GO_MOD=1 ./build.sh 来启用 go module,此时会生成以下文件:

examples/  go.mod  qbus/    qbus.go    go.mod    libQBus_go.so

Python

进入python目录,执行./build.sh,会生成以下文件:

examples/  qbus.py  _qbus.so

PHP

进入php目录,执行./build.sh,会生成以下文件:

examples/  qbus.php  qbus.so

3. 编译示例程序

C/C++

进入examples子目录,运行 ./build.sh [debug|release] 生成可执行文件,其中 debug 是使用 lib/debug 目录下的 libQBus.sorelease 是使用 lib/release 目录下的 libQBus.so。运行make clean删除它们。

如果要运行自己的程序,可以参考Makefile文件。

Go

进入examples子目录,运行./build.sh生成可执行文件,运行./clean.sh删除它们。

运行可执行文件时把libQBus_go.so路径加入LD_LIBRARY_PATH环境变量:

export LD_LIBRARY_PATH=$PWD/gopath/src/qbus:$LD_LIBRARY_PATH

如果要运行自己的程序,将生成的gopath目录加入GOPATH环境变量,或者将gopath/src/qbus目录移动到$GOPATH/src下。

Python

将生成的qbus.py_qbus.so拷贝至要运行的Python脚本同一路径即可。

PHP

编辑php.ini文件,添加extension=<module-path><module-path>qbus.so路径。

使用

数据生产

  • 在非按key写入的情况下,sdk尽最大努力提交每一条消息,只要Kafka集群存有一台broker正常,就会重试发送;
  • 每次写入数据只需要调用produce接口,在异步发送的场景下,通过返回值可以判断发送队列是否填满,发送队列可通过配置文件调整;
  • 在同步发送的场景中,produce接口返回当前消息是否写入成功,但是写入性能会有所下降,CPU使用率会有所上升,推荐还是使用异步写入方式;。
  • 下面是生产接口,以c++为例:
bool QbusProducer::init(const string& broker_list,                        const string& log_path,                        const string& config_path,                        const string& topic_name);bool QbusProducer::produce(const char* data,                           size_t data_len,                           const std::string& key);void QbusProducer::uninit();
  • c++ sdk的使用范例:
#include <string>#include <iostream>#include "qbus_producer.h"int main(int argc, const char* argv[]) {    qbus::QbusProducer qbus_producer;    if (!qbus_producer.init("127.0.0.1:9092",                    "./log",                    "./config",                    "topic_test")) {        std::cout << "Failed to init" << std::endl;        return 0;    }    std::string msg("test\n");    if (!qbus_producer.produce(msg.c_str(), msg.length(), "key")) {        std::cout << "Failed to produce" << std::endl;    }    qbus_producer.uninit();    return 0;}

数据消费

  • 消费只需调用subscribeOne订阅topic(也支持同时订阅多个topic),然后执行start就开始消费,当前进程非阻塞,每条消息通过callback接口回调给使用者;
  • sdk还支持用户手动提交offset方式,用户可以通过callback中返回的消息体,在代码其他逻辑中进行提交。
  • 下面是消费接口,以c++为例:
bool QbusConsumer::init(const std::string& broker_list,                        const std::string& log_path,                        const std::string& config_path,                        const QbusConsumerCallback& callback);bool QbusConsumer::subscribeOne(const std::string& group, const std::string& topic);bool QbusConsumer::subscribe(const std::string& group,                             const std::vector<std::string>& topics);bool QbusConsumer::start();void QbusConsumer::stop();bool QbusConsumer::pause(const std::vector<std::string>& topics);bool QbusConsumer::resume(const std::vector<std::string>& topics);
  • c++ sdk的使用范例:
#include <iostream>#include "qbus_consumer.h"qbus::QbusConsumer qbus_consumer;class MyCallback: public qbus::QbusConsumerCallback {    public:        virtual void deliveryMsg(const std::string& topic,                    const char* msg,                    const size_t msg_len) const {            std::cout << "topic: " << topic << " | msg: " << std::string(msg, msg_len) << std::endl;        }};int main(int argc, char* argv[]) {    MyCallback my_callback;    if (qbus_consumer.init("127.0.0.1:9092",                    "log",                    "config",                    my_callback)) {        if (qbus_consumer.subscribeOne("groupid_test", "topic_test")) {            if (!qbus_consumer.start()) {                std::cout << "Failed to start" << std::endl;                return NULL;            }            while (1) sleep(1);  //可以执行其他业务逻辑            qbus_consumer.stop();        } else {            std::cout << "Failed subscribe" << std::endl;        }    } else {        std::cout << "Failed init" << std::endl;    }    return 0;}

可以用pause()resume()方法来暂停或恢复某些主题的消费,具体示例见qbus_pause_resume_example.cc

更多API使用方法参考C examplesC++ examplesGo examplesPython examplesPHP examples目录下的示例代码。

配置

配置文件是INI格式:

[global][topic][sdk]

globaltopic配置见rdkafka 1.0.x configurationsdk配置见sdk configuration

通常情况下kafkabridge使用空配置文件即可工作,但是如果broker版本低于0.10.0.0,必须添加api.version相关的配置,见broker version compatibility.

例如,对0.9.0.1版本的broker,必须添加以下配置:

[global]api.version.request=falsebroker.version.fallback=0.9.0.1

当前配置和 broker 0.9.0.1 兼容。因此,如果使用了高版本的 broker,api.version.request 应该配置为 true。否则消息协议会使用旧版本,比如,没有时间戳字段。

联系我们

QQ 群:876834263


鲜花

握手

雷人

路过

鸡蛋
该文章已有0人参与评论

请发表评论

全部评论

专题导读
热门推荐
热门话题
阅读排行榜

扫描微信二维码

查看手机版网站

随时了解更新最新资讯

139-2527-9053

在线客服(服务时间 9:00~18:00)

在线QQ客服
地址:深圳市南山区西丽大学城创智工业园
电邮:jeky_zhao#qq.com
移动电话:139-2527-9053

Powered by 互联科技 X3.4© 2001-2213 极客世界.|Sitemap