在线时间:8:00-16:00
迪恩网络APP
随时随地掌握行业动态
扫描二维码
关注迪恩网络微信公众号
开源软件名称:KafkaBridge开源软件地址:https://gitee.com/mirrors/KafkaBridge开源软件介绍:简介 English
特点
编译确保你的系统上安装了 g++ (>= 4.8.5), boost (>= 1.41),cmake (>= 3.1),swig (>= 3.0.12)。 git clonegit clone --recursive https://github.com/Qihoo360/qbusbridge.git 此外,qbus SDK 静态链接到 libstdc++,因此必须确保 sudo yum install -y glibc-static libstdc++-static 1. 安装子模块运行 它会自动下载子模块,并将其安装到 见 include/ librdkafka/ rdkafka.h log4cplus/ logger.hlib/ librdkafka.a liblog4cplus.a 2. 编译SDKC/C++进入 include/ qbus_consumer.h qbus_producer.hlib/ debug/libQBus.so release/libQBus.so
Go进入 gopath/ src/ qbus/ qbus.go libQBus_go.so 可以运行 examples/ go.mod qbus/ qbus.go go.mod libQBus_go.so Python进入 examples/ qbus.py _qbus.so PHP进入 examples/ qbus.php qbus.so 3. 编译示例程序C/C++进入 如果要运行自己的程序,可以参考 Go进入 运行可执行文件时把 export LD_LIBRARY_PATH=$PWD/gopath/src/qbus:$LD_LIBRARY_PATH 如果要运行自己的程序,将生成的 Python将生成的 PHP编辑 使用数据生产
bool QbusProducer::init(const string& broker_list, const string& log_path, const string& config_path, const string& topic_name);bool QbusProducer::produce(const char* data, size_t data_len, const std::string& key);void QbusProducer::uninit();
#include <string>#include <iostream>#include "qbus_producer.h"int main(int argc, const char* argv[]) { qbus::QbusProducer qbus_producer; if (!qbus_producer.init("127.0.0.1:9092", "./log", "./config", "topic_test")) { std::cout << "Failed to init" << std::endl; return 0; } std::string msg("test\n"); if (!qbus_producer.produce(msg.c_str(), msg.length(), "key")) { std::cout << "Failed to produce" << std::endl; } qbus_producer.uninit(); return 0;} 数据消费
bool QbusConsumer::init(const std::string& broker_list, const std::string& log_path, const std::string& config_path, const QbusConsumerCallback& callback);bool QbusConsumer::subscribeOne(const std::string& group, const std::string& topic);bool QbusConsumer::subscribe(const std::string& group, const std::vector<std::string>& topics);bool QbusConsumer::start();void QbusConsumer::stop();bool QbusConsumer::pause(const std::vector<std::string>& topics);bool QbusConsumer::resume(const std::vector<std::string>& topics);
#include <iostream>#include "qbus_consumer.h"qbus::QbusConsumer qbus_consumer;class MyCallback: public qbus::QbusConsumerCallback { public: virtual void deliveryMsg(const std::string& topic, const char* msg, const size_t msg_len) const { std::cout << "topic: " << topic << " | msg: " << std::string(msg, msg_len) << std::endl; }};int main(int argc, char* argv[]) { MyCallback my_callback; if (qbus_consumer.init("127.0.0.1:9092", "log", "config", my_callback)) { if (qbus_consumer.subscribeOne("groupid_test", "topic_test")) { if (!qbus_consumer.start()) { std::cout << "Failed to start" << std::endl; return NULL; } while (1) sleep(1); //可以执行其他业务逻辑 qbus_consumer.stop(); } else { std::cout << "Failed subscribe" << std::endl; } } else { std::cout << "Failed init" << std::endl; } return 0;} 可以用 更多API使用方法参考C examples,C++ examples,Go examples,Python examples,PHP examples目录下的示例代码。 配置配置文件是INI格式: [global][topic][sdk] global和topic配置见rdkafka 1.0.x configuration,sdk配置见sdk configuration。 通常情况下kafkabridge使用空配置文件即可工作,但是如果broker版本低于0.10.0.0,必须添加api.version相关的配置,见broker version compatibility. 例如,对0.9.0.1版本的broker,必须添加以下配置: [global]api.version.request=falsebroker.version.fallback=0.9.0.1 当前配置和 broker 0.9.0.1 兼容。因此,如果使用了高版本的 broker, 联系我们QQ 群:876834263 |
请发表评论