This article collects typical usage examples of the rd_kafka_topic_new function in C/C++. If you have been wondering what rd_kafka_topic_new does, how to call it, or what real-world usage looks like, the hand-picked code examples here should help.
The sections below present 20 code examples of rd_kafka_topic_new, drawn from open-source projects and sorted by popularity by default. Before the individual examples, a minimal end-to-end sketch of the usual call sequence is shown first.
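The sketch below is a minimal, self-contained producer that illustrates the call sequence the examples share: build a configuration, create the client handle with rd_kafka_new(), create the topic handle with rd_kafka_topic_new(), produce one message, drain the delivery queue, and destroy everything. The broker address "localhost:9092" and the topic name "test" are placeholder assumptions, not values taken from the examples below; note that rd_kafka_topic_new() takes ownership of the topic configuration and destroys it internally (see Example 2).

#include <librdkafka/rdkafka.h>
#include <stdio.h>
#include <string.h>

int main(void) {
    char errstr[512];
    const char *payload = "hello rdkafka";

    /* Global configuration; "bootstrap.servers" points at a placeholder broker. */
    rd_kafka_conf_t *conf = rd_kafka_conf_new();
    if (rd_kafka_conf_set(conf, "bootstrap.servers", "localhost:9092",
                          errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) {
        fprintf(stderr, "conf error: %s\n", errstr);
        return 1;
    }

    /* Create the producer handle; rd_kafka_new() takes ownership of conf. */
    rd_kafka_t *rk = rd_kafka_new(RD_KAFKA_PRODUCER, conf, errstr, sizeof(errstr));
    if (!rk) {
        fprintf(stderr, "failed to create producer: %s\n", errstr);
        return 1;
    }

    /* Create the topic handle; a fresh topic conf (or NULL for defaults) is passed in
     * and destroyed by rd_kafka_topic_new() on success. */
    rd_kafka_topic_t *rkt = rd_kafka_topic_new(rk, "test", rd_kafka_topic_conf_new());
    if (!rkt) {
        fprintf(stderr, "failed to create topic handle: %s\n",
                rd_kafka_err2str(rd_kafka_last_error()));
        rd_kafka_destroy(rk);
        return 1;
    }

    /* Produce a single copied message to an automatically chosen partition. */
    if (rd_kafka_produce(rkt, RD_KAFKA_PARTITION_UA, RD_KAFKA_MSG_F_COPY,
                         (void *)payload, strlen(payload), NULL, 0, NULL) == -1)
        fprintf(stderr, "produce failed: %s\n",
                rd_kafka_err2str(rd_kafka_last_error()));

    /* Serve delivery reports until the outbound queue drains. */
    while (rd_kafka_outq_len(rk) > 0)
        rd_kafka_poll(rk, 100);

    rd_kafka_topic_destroy(rkt);
    rd_kafka_destroy(rk);
    return 0;
}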
Example 1: producer_metadata
static VALUE producer_metadata(VALUE self, VALUE topicStr, VALUE timeout) {
HermannInstanceConfig *producerConfig;
rd_kafka_resp_err_t err;
hermann_metadata_ctx_t md_context;
VALUE result;
Data_Get_Struct(self, HermannInstanceConfig, producerConfig);
if (!producerConfig->isInitialized) {
producer_init_kafka(self, producerConfig);
}
md_context.rk = producerConfig->rk;
md_context.timeout_ms = rb_num2int(timeout);
if ( !NIL_P(topicStr) ) {
Check_Type(topicStr, T_STRING);
md_context.topic = rd_kafka_topic_new(producerConfig->rk, StringValuePtr(topicStr), NULL);
} else {
md_context.topic = NULL;
}
err = producer_metadata_request(&md_context);
if ( err != RD_KAFKA_RESP_ERR_NO_ERROR ) {
// annoyingly, this is always a timeout error -- the rest rdkafka just jams onto STDERR
rb_raise( rb_eRuntimeError, "%s", rd_kafka_err2str(err) );
} else {
result = producer_metadata_make_hash(md_context.data);
rd_kafka_metadata_destroy(md_context.data);
return result;
}
}
Developer: braintree, Project: hermann, Lines of code: 34, Source file: hermann_rdkafka.c
Example 2: p_kafka_set_topic
void p_kafka_set_topic(struct p_kafka_host *kafka_host, char *topic)
{
if (kafka_host) {
kafka_host->topic_cfg = rd_kafka_topic_conf_new();
p_kafka_apply_topic_config(kafka_host);
if (config.debug) {
const char **res;
size_t res_len, idx;
res = rd_kafka_topic_conf_dump(kafka_host->topic_cfg, &res_len);
for (idx = 0; idx < res_len; idx += 2)
Log(LOG_DEBUG, "DEBUG ( %s/%s ): librdkafka '%s' topic config: %s = %s\n", config.name, config.type, topic, res[idx], res[idx + 1]);
rd_kafka_conf_dump_free(res, res_len);
}
/* This needs to be done here otherwise kafka_host->topic_cfg is null
* and the partitioner cannot be set */
if (config.kafka_partition_dynamic && kafka_host->topic_cfg)
p_kafka_set_dynamic_partitioner(kafka_host);
/* destroy current allocation before making a new one */
if (kafka_host->topic) p_kafka_unset_topic(kafka_host);
if (kafka_host->rk && kafka_host->topic_cfg) {
kafka_host->topic = rd_kafka_topic_new(kafka_host->rk, topic, kafka_host->topic_cfg);
kafka_host->topic_cfg = NULL; /* rd_kafka_topic_new() destroys conf as per rdkafka.h */
}
}
}
Developer: jrossi, Project: pmacct-1, Lines of code: 31, Source file: kafka_common.c
Example 3: kfc_rdkafka_init
void kfc_rdkafka_init(rd_kafka_type_t type) {
char errstr[512];
if (type == RD_KAFKA_PRODUCER) {
char tmp[16];
snprintf(tmp, sizeof(tmp), "%i", SIGIO);
rd_kafka_conf_set(conf.rk_conf, "internal.termination.signal",
tmp, NULL, 0);
}
/* Create handle */
if (!(conf.rk = rd_kafka_new(type, conf.rk_conf,
errstr, sizeof(errstr))))
FATAL("Failed to create rd_kafka struct: %s", errstr);
rd_kafka_set_logger(conf.rk, rd_kafka_log_print);
if (conf.debug)
rd_kafka_set_log_level(conf.rk, LOG_DEBUG);
else if (conf.verbosity == 0)
rd_kafka_set_log_level(conf.rk, 0);
/* Create topic, if specified */
if (conf.topic &&
!(conf.rkt = rd_kafka_topic_new(conf.rk, conf.topic,
conf.rkt_conf)))
FATAL("Failed to create rk_kafka_topic %s: %s", conf.topic,
rd_kafka_err2str(rd_kafka_errno2err(errno)));
conf.rk_conf = NULL;
conf.rkt_conf = NULL;
}
Developer: fsaintjacques, Project: kfc, Lines of code: 31, Source file: common.c
Example 4: rd_kafka_conf_new
int Http::kafka_consumer_::Init(const int partition, const char* topic, const char* brokers, MsgConsume msg_consume) {
char err_str[512];
partition_ = partition;
msg_consume_ = msg_consume;
rd_kafka_conf_t *conf = rd_kafka_conf_new();
if (NULL == conf) {
return -1;
}
rd_kafka_conf_set(conf, "batch.num.messages", "100", err_str, sizeof(err_str));
if (!(rk_ = rd_kafka_new(RD_KAFKA_CONSUMER, conf, err_str, sizeof(err_str)))) {
return -1;
}
rd_kafka_set_log_level(rk_, 1);
if (rd_kafka_brokers_add(rk_, brokers) == 0) {
return -1;
}
rd_kafka_topic_conf_t *topic_conf = rd_kafka_topic_conf_new();
rkt_ = rd_kafka_topic_new(rk_, topic, topic_conf);
if (NULL == rkt_) {
return -1;
}
// RD_KAFKA_OFFSET_BEGINNING: consume from the beginning of the partition's message queue;
// RD_KAFKA_OFFSET_END: start from the next message that will be produced to the partition (i.e. skip all messages currently in it)
if (rd_kafka_consume_start(this->rkt_, partition, RD_KAFKA_OFFSET_END) == -1) {
return -1;
}
return 1;
}
Developer: 91lilei, Project: work, Lines of code: 33, Source file: HttpRequest.cpp
Example 5: test_conf_init
rd_kafka_topic_t *test_create_producer_topic (rd_kafka_t *rk,
const char *topic, ...) {
rd_kafka_topic_t *rkt;
rd_kafka_topic_conf_t *topic_conf;
char errstr[512];
va_list ap;
const char *name, *val;
test_conf_init(NULL, &topic_conf, 20);
va_start(ap, topic);
while ((name = va_arg(ap, const char *)) &&
(val = va_arg(ap, const char *))) {
if (rd_kafka_topic_conf_set(topic_conf, name, val,
errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK)
TEST_FAIL("Conf failed: %s\n", errstr);
}
va_end(ap);
/* Make sure all replicas are in-sync after producing
* so that the consume test won't fail. */
rd_kafka_topic_conf_set(topic_conf, "request.required.acks", "-1",
errstr, sizeof(errstr));
rkt = rd_kafka_topic_new(rk, topic, topic_conf);
if (!rkt)
TEST_FAIL("Failed to create topic: %s\n",
rd_kafka_err2str(rd_kafka_errno2err(errno)));
return rkt;
}
Developer: antoniocorreia, Project: cprojects, Lines of code: 33, Source file: test-1.c
Example 6: kafka_partition_count
static
int kafka_partition_count(rd_kafka_t *r, const char *topic)
{
rd_kafka_topic_t *rkt;
rd_kafka_topic_conf_t *conf;
int i;//C89 compliant
//connect as consumer if required
if (r == NULL)
{
if (log_level)
{
openlog("phpkafka", 0, LOG_USER);
syslog(LOG_ERR, "phpkafka - no connection to get partition count for topic: %s", topic);
}
return -1;
}
/* Topic configuration */
conf = rd_kafka_topic_conf_new();
/* Create topic */
rkt = rd_kafka_topic_new(r, topic, conf);
//metadata API required rd_kafka_metadata_t** to be passed
const struct rd_kafka_metadata *meta = NULL;
if (RD_KAFKA_RESP_ERR_NO_ERROR == rd_kafka_metadata(r, 0, rkt, &meta, 200))
i = (int) meta->topics->partition_cnt;
else
i = 0;
if (meta) {
rd_kafka_metadata_destroy(meta);
}
rd_kafka_topic_destroy(rkt);
return i;
}
Developer: dwieland, Project: phpkafka, Lines of code: 33, Source file: kafka.c
Example 7: watcher
static void watcher(zhandle_t *zh, int type,
int state, const char *path, void *param)
{
char brokers[1024];
kafka_t* k = (kafka_t*) param;
rd_kafka_topic_conf_t *topic_conf;
if(k->conf == NULL) return;
char* topic = k->conf->topic[0];
if (k->no_brokers || type == ZOO_CHILD_EVENT && strncmp(
path, BROKER_PATH, sizeof(BROKER_PATH) - 1) == 0)
{
brokers[0] = '\0';
set_brokerlist_from_zookeeper(zh, brokers);
if (brokers[0] != '\0' && k->rk != NULL &&
server_list_add_once(&(k->broker_list), brokers))
{
rd_kafka_brokers_add(k->rk, brokers);
k->no_brokers = 0;
rd_kafka_poll(k->rk, 10);
topic_conf = rd_kafka_topic_conf_new();
k->rkt = rd_kafka_topic_new(k->rk, topic, topic_conf);
if(k->rkt == NULL)
printf("topic %s creation failed\n", topic);
}
}
}
Developer: fuse-kafka, Project: fuse_kafka, Lines of code: 26, Source file: zookeeper.c
Example 8: legacy_consumer_early_destroy
/**
* Issue #530:
* "Legacy Consumer. Delete hangs if done right after RdKafka::Consumer::create.
* But If I put a start and stop in between, there is no issue."
*/
static int legacy_consumer_early_destroy (void) {
rd_kafka_t *rk;
rd_kafka_topic_t *rkt;
int pass;
const char *topic = test_mk_topic_name(__FUNCTION__, 0);
for (pass = 0 ; pass < 2 ; pass++) {
TEST_SAY("%s: pass #%d\n", __FUNCTION__, pass);
rk = test_create_handle(RD_KAFKA_CONSUMER, NULL);
if (pass == 1) {
/* Second pass, create a topic too. */
rkt = rd_kafka_topic_new(rk, topic, NULL);
TEST_ASSERT(rkt, "failed to create topic: %s",
rd_kafka_err2str(
rd_kafka_errno2err(errno)));
rd_sleep(1);
rd_kafka_topic_destroy(rkt);
}
rd_kafka_destroy(rk);
}
return 0;
}
Developer: eugpermar, Project: librdkafka, Lines of code: 31, Source file: 0037-destroy_hang_local.c
Example 9: consume_messages
static void consume_messages (uint64_t testid, const char *topic,
int32_t partition, int msg_base, int batch_cnt,
int msgcnt) {
rd_kafka_t *rk;
rd_kafka_topic_t *rkt;
rd_kafka_conf_t *conf;
rd_kafka_topic_conf_t *topic_conf;
int i;
test_conf_init(&conf, &topic_conf, 20);
/* Create kafka instance */
rk = test_create_handle(RD_KAFKA_CONSUMER, conf);
rkt = rd_kafka_topic_new(rk, topic, topic_conf);
if (!rkt)
TEST_FAIL("Failed to create topic: %s\n",
rd_kafka_err2str(rd_kafka_last_error()));
TEST_SAY("Consuming %i messages from partition %i\n",
batch_cnt, partition);
/* Consume messages */
if (rd_kafka_consume_start(rkt, partition,
RD_KAFKA_OFFSET_TAIL(batch_cnt)) == -1)
TEST_FAIL("consume_start(%i, -%i) failed: %s",
(int)partition, batch_cnt,
rd_kafka_err2str(rd_kafka_last_error()));
for (i = 0 ; i < batch_cnt ; i++) {
rd_kafka_message_t *rkmessage;
rkmessage = rd_kafka_consume(rkt, partition, tmout_multip(5000));
if (!rkmessage)
TEST_FAIL("Failed to consume message %i/%i from "
"partition %i: %s",
i, batch_cnt, (int)partition,
rd_kafka_err2str(rd_kafka_last_error()));
if (rkmessage->err)
TEST_FAIL("Consume message %i/%i from partition %i "
"has error: %s",
i, batch_cnt, (int)partition,
rd_kafka_err2str(rkmessage->err));
verify_consumed_msg(testid, partition, msg_base+i, rkmessage);
rd_kafka_message_destroy(rkmessage);
}
rd_kafka_consume_stop(rkt, partition);
/* Destroy topic */
rd_kafka_topic_destroy(rkt);
/* Destroy rdkafka instance */
TEST_SAY("Destroying kafka instance %s\n", rd_kafka_name(rk));
rd_kafka_destroy(rk);
}
Developer: Kitter, Project: librdkafka, Lines of code: 58, Source file: 0013-null-msgs.c
Example 10: main
int main (int argc, char **argv) {
if (argc < 0 /* always false */) {
rd_kafka_version();
rd_kafka_version_str();
rd_kafka_err2str(RD_KAFKA_RESP_ERR_NO_ERROR);
rd_kafka_errno2err(EINVAL);
rd_kafka_conf_new();
rd_kafka_conf_destroy(NULL);
rd_kafka_conf_dup(NULL);
rd_kafka_conf_set(NULL, NULL, NULL, NULL, 0);
rd_kafka_conf_set_dr_cb(NULL, NULL);
rd_kafka_conf_set_error_cb(NULL, NULL);
rd_kafka_conf_set_stats_cb(NULL, NULL);
rd_kafka_conf_set_opaque(NULL, NULL);
rd_kafka_conf_dump(NULL, NULL);
rd_kafka_topic_conf_dump(NULL, NULL);
rd_kafka_conf_dump_free(NULL, 0);
rd_kafka_conf_properties_show(NULL);
rd_kafka_topic_conf_new();
rd_kafka_topic_conf_dup(NULL);
rd_kafka_topic_conf_destroy(NULL);
rd_kafka_topic_conf_set(NULL, NULL, NULL, NULL, 0);
rd_kafka_topic_conf_set_opaque(NULL, NULL);
rd_kafka_topic_conf_set_partitioner_cb(NULL, NULL);
rd_kafka_topic_partition_available(NULL, 0);
rd_kafka_msg_partitioner_random(NULL, NULL, 0, 0, NULL, NULL);
rd_kafka_new(0, NULL, NULL, 0);
rd_kafka_destroy(NULL);
rd_kafka_name(NULL);
rd_kafka_topic_new(NULL, NULL, NULL);
rd_kafka_topic_destroy(NULL);
rd_kafka_topic_name(NULL);
rd_kafka_message_destroy(NULL);
rd_kafka_message_errstr(NULL);
rd_kafka_consume_start(NULL, 0, 0);
rd_kafka_consume_stop(NULL, 0);
rd_kafka_consume(NULL, 0, 0);
rd_kafka_consume_batch(NULL, 0, 0, NULL, 0);
rd_kafka_consume_callback(NULL, 0, 0, NULL, NULL);
rd_kafka_offset_store(NULL, 0, 0);
rd_kafka_produce(NULL, 0, 0, NULL, 0, NULL, 0, NULL);
rd_kafka_poll(NULL, 0);
rd_kafka_brokers_add(NULL, NULL);
rd_kafka_set_logger(NULL, NULL);
rd_kafka_set_log_level(NULL, 0);
rd_kafka_log_print(NULL, 0, NULL, NULL);
rd_kafka_log_syslog(NULL, 0, NULL, NULL);
rd_kafka_outq_len(NULL);
rd_kafka_dump(NULL, NULL);
rd_kafka_thread_cnt();
rd_kafka_wait_destroyed(0);
}
return 0;
}
Developer: blblack, Project: librdkafka, Lines of code: 57, Source file: 0006-symbols.c
Example 11: producer_init_kafka
/**
* producer_init_kafka
*
* Initialize the producer instance, setting up the Kafka topic and context.
*
* @param self VALUE Instance of the Producer Ruby object
* @param config HermannInstanceConfig* the instance configuration associated with this producer.
*/
void producer_init_kafka(VALUE self, HermannInstanceConfig* config) {
TRACER("initing (%p)\n", config);
config->quiet = !isatty(STDIN_FILENO);
/* Kafka configuration */
config->conf = rd_kafka_conf_new();
/* Add our `self` to the opaque pointer for error and logging callbacks
*/
rd_kafka_conf_set_opaque(config->conf, (void*)config);
rd_kafka_conf_set_error_cb(config->conf, producer_error_callback);
/* Topic configuration */
config->topic_conf = rd_kafka_topic_conf_new();
/* Set up a message delivery report callback.
* It will be called once for each message, either on successful
* delivery to broker, or upon failure to deliver to broker. */
rd_kafka_conf_set_dr_msg_cb(config->conf, msg_delivered);
/* Create Kafka handle */
if (!(config->rk = rd_kafka_new(RD_KAFKA_PRODUCER,
config->conf,
config->errstr,
sizeof(config->errstr)))) {
/* TODO: Use proper logger */
fprintf(stderr,
"%% Failed to create new producer: %s\n", config->errstr);
rb_raise(rb_eRuntimeError, "%% Failed to create new producer: %s\n", config->errstr);
}
/* Set logger */
rd_kafka_set_logger(config->rk, logger);
rd_kafka_set_log_level(config->rk, LOG_DEBUG);
if (rd_kafka_brokers_add(config->rk, config->brokers) == 0) {
/* TODO: Use proper logger */
fprintf(stderr, "%% No valid brokers specified\n");
rb_raise(rb_eRuntimeError, "No valid brokers specified");
return;
}
/* Create topic */
config->rkt = rd_kafka_topic_new(config->rk, config->topic, config->topic_conf);
/* Set the partitioner callback */
rd_kafka_topic_conf_set_partitioner_cb( config->topic_conf, producer_partitioner_callback);
/* We're now initialized */
config->isInitialized = 1;
TRACER("completed kafka init\n");
}
Developer: pocman, Project: hermann, Lines of code: 64, Source file: hermann_lib.c
Example 12: consumer_init
int consumer_init(const int partition, const char* topic, const char* brokers, Consume_Data consume_data, wrapper_Info* producer_info)
{
rd_kafka_conf_t *conf;
rd_kafka_topic_conf_t *topic_conf;
rd_kafka_t *rk;
char errstr[512];
producer_info->start_offset = RD_KAFKA_OFFSET_END;
producer_info->partition = partition;
if (NULL != consume_data)
producer_info->func_consume_data = consume_data;
else
return CONSUMER_INIT_FAILED;
/* Kafka configuration */
conf = rd_kafka_conf_new();
if (NULL == conf)
return CONSUMER_INIT_FAILED;
if (RD_KAFKA_CONF_OK != rd_kafka_conf_set(conf, "group.id", "one", errstr, sizeof(errstr)))
return CONSUMER_INIT_FAILED;
/* Create Kafka handle */
if (!(rk = rd_kafka_new(RD_KAFKA_CONSUMER, conf,
errstr, sizeof(errstr)))) {
fprintf(stderr,
"%% Failed to create new consumer: %s\n",
errstr);
return CONSUMER_INIT_FAILED;
}
rd_kafka_set_log_level(rk, LOG_DEBUG);
/* Add brokers */
if (rd_kafka_brokers_add(rk, brokers) == 0) {
fprintf(stderr, "%% No valid brokers specified\n");
return CONSUMER_INIT_FAILED;
}
/* Topic configuration */
topic_conf = rd_kafka_topic_conf_new();
/* Create topic */
producer_info->rkt = rd_kafka_topic_new(rk, topic, topic_conf);
producer_info->rk = rk;
/* Start consuming */
if (rd_kafka_consume_start(producer_info->rkt, partition, RD_KAFKA_OFFSET_END) == -1){
fprintf(stderr, "%% Failed to start consuming: %s\n",
rd_kafka_err2str(rd_kafka_errno2err(errno)));
return CONSUMER_INIT_FAILED;
}
return CONSUMER_INIT_SUCCESS;
}
Developer: zjpanghao, Project: get_shdx_data_by_index, Lines of code: 56, Source file: KafkaWrapper.cpp
Example 13: kafka_handle
static int kafka_handle(struct kafka_topic_context *ctx) /* {{{ */
{
char errbuf[1024];
rd_kafka_conf_t *conf;
rd_kafka_topic_conf_t *topic_conf;
if (ctx->kafka != NULL && ctx->topic != NULL)
return(0);
if (ctx->kafka == NULL) {
if ((conf = rd_kafka_conf_dup(ctx->kafka_conf)) == NULL) {
ERROR("write_kafka plugin: cannot duplicate kafka config");
return(1);
}
if ((ctx->kafka = rd_kafka_new(RD_KAFKA_PRODUCER, conf,
errbuf, sizeof(errbuf))) == NULL) {
ERROR("write_kafka plugin: cannot create kafka handle.");
return 1;
}
rd_kafka_conf_destroy(ctx->kafka_conf);
ctx->kafka_conf = NULL;
INFO ("write_kafka plugin: created KAFKA handle : %s", rd_kafka_name(ctx->kafka));
#if defined(HAVE_LIBRDKAFKA_LOGGER) && !defined(HAVE_LIBRDKAFKA_LOG_CB)
rd_kafka_set_logger(ctx->kafka, kafka_log);
#endif
}
if (ctx->topic == NULL ) {
if ((topic_conf = rd_kafka_topic_conf_dup(ctx->conf)) == NULL) {
ERROR("write_kafka plugin: cannot duplicate kafka topic config");
return 1;
}
if ((ctx->topic = rd_kafka_topic_new(ctx->kafka, ctx->topic_name,
topic_conf)) == NULL) {
ERROR("write_kafka plugin: cannot create topic : %s\n",
rd_kafka_err2str(rd_kafka_errno2err(errno)));
return errno;
}
rd_kafka_topic_conf_destroy(ctx->conf);
ctx->conf = NULL;
INFO ("write_kafka plugin: handle created for topic : %s", rd_kafka_topic_name(ctx->topic));
}
return(0);
} /* }}} int kafka_handle */
Developer: 4thAce, Project: collectd, Lines of code: 53, Source file: write_kafka.c
Example 14: main_0001_multiobj
int main_0001_multiobj (int argc, char **argv) {
int partition = RD_KAFKA_PARTITION_UA; /* random */
int i;
const int NUM_ITER = 10;
const char *topic = NULL;
TEST_SAY("Creating and destroying %i kafka instances\n", NUM_ITER);
/* Create, use and destroy NUM_ITER kafka instances. */
for (i = 0 ; i < NUM_ITER ; i++) {
rd_kafka_t *rk;
rd_kafka_topic_t *rkt;
rd_kafka_conf_t *conf;
rd_kafka_topic_conf_t *topic_conf;
char msg[128];
test_timing_t t_destroy;
test_conf_init(&conf, &topic_conf, 30);
if (!topic)
topic = test_mk_topic_name("0001", 0);
rk = test_create_handle(RD_KAFKA_PRODUCER, conf);
rkt = rd_kafka_topic_new(rk, topic, topic_conf);
if (!rkt)
TEST_FAIL("Failed to create topic for "
"rdkafka instance #%i: %s\n",
i, rd_kafka_err2str(rd_kafka_errno2err(errno)));
rd_snprintf(msg, sizeof(msg), "%s test message for iteration #%i",
argv[0], i);
/* Produce a message */
rd_kafka_produce(rkt, partition, RD_KAFKA_MSG_F_COPY,
msg, strlen(msg), NULL, 0, NULL);
/* Wait for it to be sent (and possibly acked) */
rd_kafka_flush(rk, -1);
/* Destroy topic */
rd_kafka_topic_destroy(rkt);
/* Destroy rdkafka instance */
TIMING_START(&t_destroy, "rd_kafka_destroy()");
rd_kafka_destroy(rk);
TIMING_STOP(&t_destroy);
}
return 0;
}
Developer: lambdaknight, Project: librdkafka, Lines of code: 51, Source file: 0001-multiobj.c
Example 15: p_kafka_set_topic
void p_kafka_set_topic(struct p_kafka_host *kafka_host, char *topic)
{
if (kafka_host) {
kafka_host->topic_cfg = rd_kafka_topic_conf_new();
/* destroy current allocation before making a new one */
if (kafka_host->topic) p_kafka_unset_topic(kafka_host);
if (kafka_host->rk && kafka_host->topic_cfg) {
kafka_host->topic = rd_kafka_topic_new(kafka_host->rk, topic, kafka_host->topic_cfg);
kafka_host->topic_cfg = NULL; /* rd_kafka_topic_new() destroys conf as per rdkafka.h */
}
}
}
Developer: Rosiak, Project: pmacct, Lines of code: 14, Source file: kafka_common.c
Example 16: RdkHandle_start
/* Shared logic of Consumer_start and Producer_start */
static PyObject *
RdkHandle_start(RdkHandle *self,
rd_kafka_type_t rdk_type,
const char *brokers,
const char *topic_name)
{
if (RdkHandle_excl_lock(self)) return NULL;
if (self->rdk_handle) {
set_pykafka_error("RdKafkaException", "Already started!");
return RdkHandle_start_fail(self, RdkHandle_stop);
}
/* Configure and start rdk_handle */
char errstr[512];
Py_BEGIN_ALLOW_THREADS /* avoid callbacks deadlocking */
self->rdk_handle = rd_kafka_new(
rdk_type, self->rdk_conf, errstr, sizeof(errstr));
self->rdk_conf = NULL; /* deallocated by rd_kafka_new() */
Py_END_ALLOW_THREADS
if (! self->rdk_handle) {
set_pykafka_error("RdKafkaException", errstr);
return RdkHandle_start_fail(self, RdkHandle_stop);
}
/* Set brokers */
int brokers_added;
Py_BEGIN_ALLOW_THREADS /* avoid callbacks deadlocking */
brokers_added = rd_kafka_brokers_add(self->rdk_handle, brokers);
Py_END_ALLOW_THREADS
if (brokers_added == 0) {
set_pykafka_error("RdKafkaException", "adding brokers failed");
return RdkHandle_start_fail(self, RdkHandle_stop);
}
/* Configure and take out a topic handle */
Py_BEGIN_ALLOW_THREADS /* avoid callbacks deadlocking */
self->rdk_topic_handle = rd_kafka_topic_new(self->rdk_handle,
topic_name,
self->rdk_topic_conf);
self->rdk_topic_conf = NULL; /* deallocated by rd_kafka_topic_new() */
Py_END_ALLOW_THREADS
if (! self->rdk_topic_handle) {
set_pykafka_error_from_code(rd_kafka_errno2err(errno), NULL);
return RdkHandle_start_fail(self, RdkHandle_stop);
}
if (RdkHandle_unlock(self)) return NULL;
Py_INCREF(Py_None);
return Py_None;
}
Developer: BlackRider97, Project: pykafka, Lines of code: 51, Source file: _rd_kafkamodule.c
Example 17: producer_init
int producer_init(const int partition, const char* topic, const char* brokers, Msg_Delivered func_msg_delivered, wrapper_Info* producer_info)
{
rd_kafka_conf_t *conf;
rd_kafka_topic_conf_t *topic_conf;
rd_kafka_t *rk;
char errstr[512];
producer_info->partition = partition;
strcpy(producer_info->topic, topic);
if (NULL != func_msg_delivered)
producer_info->func_msg_delivered = func_msg_delivered;
else
return PRODUCER_INIT_FAILED;
/* Kafka configuration */
conf = rd_kafka_conf_new();
if (RD_KAFKA_CONF_OK != rd_kafka_conf_set(conf, "queue.buffering.max.messages", "500000", NULL, 0))
return PRODUCER_INIT_FAILED;
/* Set logger */
rd_kafka_conf_set_log_cb(conf, logger);
/* Topic configuration */
topic_conf = rd_kafka_topic_conf_new();
rd_kafka_conf_set_dr_cb(conf, func_msg_delivered);
/* Create Kafka handle */
if (!(rk = rd_kafka_new(RD_KAFKA_PRODUCER, conf,
errstr, sizeof(errstr)))) {
fprintf(stderr,
"%% Failed to create new producer: %s\n",
errstr);
return PRODUCER_INIT_FAILED;
}
/* Add brokers */
if (rd_kafka_brokers_add(rk, brokers) == 0) {
fprintf(stderr, "%% No valid brokers specified\n");
return PRODUCER_INIT_FAILED;
}
/* Create topic */
producer_info->rkt = rd_kafka_topic_new(rk, topic, topic_conf);
producer_info->rk = rk;
return PRODUCER_INIT_SUCCESS;
}
Developer: zjpanghao, Project: get_shdx_data_by_index, Lines of code: 49, Source file: KafkaWrapper.cpp
Example 18: kafka_send
static VALUE kafka_send(VALUE self, VALUE topic_value, VALUE key, VALUE message)
{
rd_kafka_topic_conf_t *topic_conf = NULL;
rd_kafka_topic_t *topic = NULL;
char *topic_name = NULL;
void *message_bytes = NULL;
size_t message_len = 0;
void *key_buf = NULL;
size_t key_len = 0;
int res = 0;
if (!NIL_P(key)) {
key_buf = RSTRING_PTR(key);
key_len = RSTRING_LEN(key);
}
topic_name = StringValueCStr(topic_value);
if (!topic_name) {
rb_raise(rb_eStandardError, "topic is not a string!");
}
if(!NIL_P(message)) {
message_bytes = RSTRING_PTR(message);
if(!message_bytes) {
rb_raise(rb_eStandardError, "failed to get message ptr");
}
message_len = RSTRING_LEN(message);
}
topic_conf = rd_kafka_topic_conf_new();
if(!topic_conf) {
rb_raise(rb_eStandardError, "failed to create kafka topic configuration");
}
topic = rd_kafka_topic_new(rk, topic_name, topic_conf);
if(!topic) {
rb_raise(rb_eStandardError, "failed to create topic");
}
res = rd_kafka_produce(topic, RD_KAFKA_PARTITION_UA, RD_KAFKA_MSG_F_COPY, message_bytes, message_len,
key_buf, key_len, NULL);
if (res) {
rb_raise(rb_eStandardError, "rd_kafka_produce failed: %d", res);
}
return Qnil;
}
Developer: ibawt, Project: ckafka, Lines of code: 49, Source file: ckafka.c
Example 19: PyErr_SetString
static PyObject *Consumer_seek (Handle *self, PyObject *args, PyObject *kwargs) {
TopicPartition *tp;
rd_kafka_resp_err_t err;
static char *kws[] = { "partition", NULL };
rd_kafka_topic_t *rkt;
if (!self->rk) {
PyErr_SetString(PyExc_RuntimeError, "Consumer closed");
return NULL;
}
if (!PyArg_ParseTupleAndKeywords(args, kwargs, "O", kws,
(PyObject **)&tp))
return NULL;
if (PyObject_Type((PyObject *)tp) != (PyObject *)&TopicPartitionType) {
PyErr_Format(PyExc_TypeError,
"expected %s", TopicPartitionType.tp_name);
return NULL;
}
rkt = rd_kafka_topic_new(self->rk, tp->topic, NULL);
if (!rkt) {
cfl_PyErr_Format(rd_kafka_last_error(),
"Failed to get topic object for "
"topic \"%s\": %s",
tp->topic,
rd_kafka_err2str(rd_kafka_last_error()));
return NULL;
}
Py_BEGIN_ALLOW_THREADS;
err = rd_kafka_seek(rkt, tp->partition, tp->offset, -1);
Py_END_ALLOW_THREADS;
rd_kafka_topic_destroy(rkt);
if (err) {
cfl_PyErr_Format(err,
"Failed to seek to offset %"CFL_PRId64": %s",
tp->offset, rd_kafka_err2str(err));
return NULL;
}
Py_RETURN_NONE;
}
Developer: confluentinc, Project: confluent-kafka-python, Lines of code: 48, Source file: Consumer.c
Example 20: setup_kafka
/**
* @brief setup_kafka initialises librdkafka based on the config
* wrapped in kafka_t
* @param k kafka configuration
**/
int setup_kafka(kafka_t* k)
{
char* brokers = "localhost:9092";
char* zookeepers = NULL;
char* topic = "bloh";
config* fk_conf = (config*) fuse_get_context()->private_data;
if(fk_conf->zookeepers_n > 0) zookeepers = fk_conf->zookeepers[0];
if(fk_conf->brokers_n > 0) brokers = fk_conf->brokers[0];
topic = fk_conf->topic[0];
rd_kafka_topic_conf_t *topic_conf;
rd_kafka_conf_t *conf;
conf = rd_kafka_conf_new();
rd_kafka_conf_set_dr_cb(conf, msg_delivered);
if(rd_kafka_conf_set(conf, "debug", "all",
errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK ||
rd_kafka_conf_set(conf, "batch.num.messages", "1",
errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) {
printf("%% Debug configuration failed: %s: %s\n",
errstr, "all");
return(1);
}
if (!(k->rk = rd_kafka_new(RD_KAFKA_PRODUCER, conf,
errstr, sizeof(errstr)))) {
fprintf(stderr,
"%% Failed to create new producer: %s\n",
errstr);
return(1);
}
rd_kafka_set_logger(k->rk, logger);
rd_kafka_set_log_level(k->rk, 7);
if (zookeepers != NULL)
{
initialize_zookeeper(zookeepers, k);
return 0;
}
else
{
if (rd_kafka_brokers_add(k->rk, brokers) == 0) {
fprintf(stderr, "%% No valid brokers specified\n");
return(1);
}
topic_conf = rd_kafka_topic_conf_new();
k->rkt = rd_kafka_topic_new(k->rk, topic, topic_conf);
if(k->rkt == NULL)
printf("topic %s creation failed\n", topic);
return k->rkt == NULL;
}
}
Developer: LLParse, Project: fuse_kafka, Lines of code: 53, Source file: kafka_client.c
Note: the rd_kafka_topic_new examples in this article were compiled by 纯净天空 from source-code and documentation hosting platforms such as GitHub and MSDocs. The code snippets are selected from open-source projects contributed by their respective developers; copyright remains with the original authors, and distribution or use should follow the corresponding project's license. Do not reproduce without permission.