本文整理汇总了C++中rd_kafka_topic_destroy函数的典型用法代码示例。如果您正苦于以下问题：C++ rd_kafka_topic_destroy函数的具体用法？C++ rd_kafka_topic_destroy怎么用？C++ rd_kafka_topic_destroy使用的例子？那么恭喜您，这里精选的函数代码示例或许可以为您提供帮助。
在下文中一共展示了rd_kafka_topic_destroy函数的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的C++代码示例。
示例1: kafka_partition_count
/**
 * Look up the number of partitions of a topic through the metadata API.
 *
 * @param r      rdkafka handle to query through; may be NULL.
 * @param topic  Name of the topic whose partitions are counted.
 *
 * @return The partition count reported for the topic; 0 when the count could
 *         not be determined (topic handle creation failed, the metadata
 *         request failed, or the response carried no topic entry); -1 when
 *         r is NULL.
 */
static
int kafka_partition_count(rd_kafka_t *r, const char *topic)
{
    /* All declarations sit at the top of the block so the function stays
     * C89 compliant. */
    rd_kafka_topic_t *rkt;
    rd_kafka_topic_conf_t *conf;
    /* The metadata API requires a rd_kafka_metadata_t** to be passed. */
    const struct rd_kafka_metadata *meta = NULL;
    int count = 0;

    if (r == NULL)
    {
        if (log_level)
        {
            openlog("phpkafka", 0, LOG_USER);
            syslog(LOG_ERR, "phpkafka - no connection to get partition count for topic: %s", topic);
        }
        return -1;
    }

    /* Topic configuration */
    conf = rd_kafka_topic_conf_new();

    /* Create topic.
     * NOTE(review): per the librdkafka API docs rd_kafka_topic_new() takes
     * over conf, which is why conf is not destroyed here - confirm for the
     * library version in use. */
    rkt = rd_kafka_topic_new(r, topic, conf);
    if (rkt == NULL)
    {
        /* No topic handle: nothing to query and nothing to destroy. */
        return 0;
    }

    /* Only read the first topic entry when the response actually has one. */
    if (RD_KAFKA_RESP_ERR_NO_ERROR == rd_kafka_metadata(r, 0, rkt, &meta, 200)
        && meta != NULL
        && meta->topic_cnt > 0
        && meta->topics != NULL)
    {
        count = (int) meta->topics[0].partition_cnt;
    }

    if (meta)
    {
        rd_kafka_metadata_destroy(meta);
    }
    rd_kafka_topic_destroy(rkt);
    return count;
}
开发者ID:dwieland,项目名称:phpkafka,代码行数:33,代码来源:kafka.c
示例2: test_producer_no_connection
/**
 * Queue messages on a producer whose "bootstrap.servers" property is set to
 * NULL, poll once, then destroy the topic and time rd_kafka_destroy().
 */
static void test_producer_no_connection (void) {
        rd_kafka_conf_t *conf;
        rd_kafka_t *rk;
        rd_kafka_topic_t *rkt;
        test_timing_t destroy_timer;
        const int partition_cnt = 2;
        int msgcnt = 0;
        int partition;

        test_conf_init(&conf, NULL, 20);
        test_conf_set(conf, "bootstrap.servers", NULL);

        rk = test_create_handle(RD_KAFKA_PRODUCER, conf);
        rkt = test_create_topic_object(rk, __FUNCTION__,
                                       "message.timeout.ms", "5000", NULL);

        /* First a batch for the unassigned partition ... */
        test_produce_msgs_nowait(rk, rkt, 0, RD_KAFKA_PARTITION_UA, 0, 100,
                                 NULL, 100, 0, &msgcnt);

        /* ... then one batch for each explicit partition. */
        partition = 0;
        while (partition < partition_cnt) {
                test_produce_msgs_nowait(rk, rkt, 0, partition,
                                         0, 100, NULL, 100, 0, &msgcnt);
                partition++;
        }

        rd_kafka_poll(rk, 1000);

        TEST_SAY("%d messages in queue\n", rd_kafka_outq_len(rk));

        /* The topic object is released before the handle itself. */
        rd_kafka_topic_destroy(rkt);

        TIMING_START(&destroy_timer, "rd_kafka_destroy()");
        rd_kafka_destroy(rk);
        TIMING_STOP(&destroy_timer);
}
开发者ID:Whissi,项目名称:librdkafka,代码行数:33,代码来源:0043-no_connection.c
示例3: legacy_consumer_early_destroy
/**
 * Regression test for issue #530:
 * "Legacy Consumer. Delete hangs if done right after RdKafka::Consumer::create.
 *  But If I put a start and stop in between, there is no issue."
 *
 * Runs two passes. Both create a consumer handle and destroy it again; the
 * second pass additionally creates a topic object, sleeps, and destroys the
 * topic before the handle.
 *
 * @return Always 0.
 */
static int legacy_consumer_early_destroy (void) {
        const char *topic = test_mk_topic_name(__FUNCTION__, 0);
        int pass = 0;

        while (pass < 2) {
                rd_kafka_t *rk;

                TEST_SAY("%s: pass #%d\n", __FUNCTION__, pass);

                rk = test_create_handle(RD_KAFKA_CONSUMER, NULL);

                if (pass == 1) {
                        /* Second pass, create a topic too. */
                        rd_kafka_topic_t *rkt =
                                rd_kafka_topic_new(rk, topic, NULL);

                        TEST_ASSERT(rkt, "failed to create topic: %s",
                                    rd_kafka_err2str(
                                            rd_kafka_errno2err(errno)));

                        rd_sleep(1);
                        rd_kafka_topic_destroy(rkt);
                }

                rd_kafka_destroy(rk);
                pass++;
        }

        return 0;
}
开发者ID:eugpermar,项目名称:librdkafka,代码行数:31,代码来源:0037-destroy_hang_local.c
示例4: p_kafka_unset_topic
/*
 * Destroy the topic object attached to kafka_host, if any, and clear the
 * field so the destroyed handle cannot be reused.
 * Does nothing when kafka_host is NULL or has no topic set.
 */
void p_kafka_unset_topic(struct p_kafka_host *kafka_host)
{
  if (!kafka_host || !kafka_host->topic) return;

  rd_kafka_topic_destroy(kafka_host->topic);
  kafka_host->topic = NULL;
}
开发者ID:jrossi,项目名称:pmacct-1,代码行数:7,代码来源:kafka_common.c
示例5: main_0021_rkt_destroy
/**
 * Produce msgcnt messages without waiting, destroy the topic object while
 * those messages are still counted as outstanding, and only then wait for
 * delivery before destroying the producer handle.
 *
 * @return Always 0.
 */
int main_0021_rkt_destroy (int argc, char **argv) {
        const char *topic = test_mk_topic_name(__FUNCTION__, 0);
        const int msgcnt = 1000;
        int pending = 0;
        uint64_t testid;
        rd_kafka_t *rk;
        rd_kafka_topic_t *rkt;

        test_conf_init(NULL, NULL, 10);

        testid = test_id_generate();
        rk = test_create_producer();
        rkt = test_create_producer_topic(rk, topic, NULL);

        test_produce_msgs_nowait(rk, rkt, testid, RD_KAFKA_PARTITION_UA,
                                 0, msgcnt, NULL, 0, &pending);

        /* Every message must have been accepted for production. */
        TEST_ASSERT(msgcnt == pending, "Only %d/%d messages produced",
                    pending, msgcnt);

        /* The topic goes away first; delivery is awaited afterwards. */
        rd_kafka_topic_destroy(rkt);

        test_wait_delivery(rk, &pending);

        rd_kafka_destroy(rk);

        return 0;
}
开发者ID:2510109890,项目名称:librdkafka,代码行数:30,代码来源:0021-rkt_destroy.c
示例6: producer_close
/**
 * Release the Kafka objects held by a producer wrapper.
 *
 * The topic object is destroyed before the handle. Both fields are reset to
 * NULL afterwards so that calling this function again on the same wrapper,
 * or on a wrapper that was never fully set up, is a harmless no-op instead
 * of a NULL dereference or a double destroy.
 *
 * @param producer_info  Wrapper to close; may be NULL.
 */
void producer_close(wrapper_Info* producer_info)
{
    if (producer_info == NULL)
        return;

    /* Destroy topic */
    if (producer_info->rkt != NULL) {
        rd_kafka_topic_destroy(producer_info->rkt);
        producer_info->rkt = NULL;
    }

    /* Destroy the handle */
    if (producer_info->rk != NULL) {
        rd_kafka_destroy(producer_info->rk);
        producer_info->rk = NULL;
    }
}
开发者ID:zjpanghao,项目名称:get_shdx_data_by_index,代码行数:8,代码来源:KafkaWrapper.cpp
示例7: table_metadata_free
/*
 * Release the resources referenced by a table metadata record: the copied
 * table name, the Kafka topic object, and the row and key Avro schemas.
 * The record itself is not freed here.
 */
void table_metadata_free(table_metadata_t table) {
    /* free(NULL) is a no-op, so the name needs no explicit guard. */
    free(table->table_name);

    if (table->topic) {
        rd_kafka_topic_destroy(table->topic);
    }

    if (table->row_schema) {
        avro_schema_decref(table->row_schema);
    }
    if (table->key_schema) {
        avro_schema_decref(table->key_schema);
    }
}
开发者ID:SanthoshPrasad,项目名称:bottledwater-pg,代码行数:8,代码来源:table_mapper.c
示例8: consume_messages
/**
 * Consume batch_cnt messages from one partition of topic, starting
 * batch_cnt messages before the tail, and verify each of them.
 *
 * @param testid     Test id forwarded to verify_consumed_msg().
 * @param topic      Topic to consume from.
 * @param partition  Partition to consume from.
 * @param msg_base   Added to the loop index and forwarded to
 *                   verify_consumed_msg().
 * @param batch_cnt  Number of messages to consume.
 * @param msgcnt     Not used by this function.
 */
static void consume_messages (uint64_t testid, const char *topic,
                              int32_t partition, int msg_base, int batch_cnt,
                              int msgcnt) {
        rd_kafka_conf_t *conf;
        rd_kafka_topic_conf_t *topic_conf;
        rd_kafka_t *rk;
        rd_kafka_topic_t *rkt;
        int consumed = 0;

        test_conf_init(&conf, &topic_conf, 20);

        /* Create kafka instance */
        rk = test_create_handle(RD_KAFKA_CONSUMER, conf);

        rkt = rd_kafka_topic_new(rk, topic, topic_conf);
        if (!rkt) {
                TEST_FAIL("Failed to create topic: %s\n",
                          rd_kafka_err2str(rd_kafka_last_error()));
        }

        TEST_SAY("Consuming %i messages from partition %i\n",
                 batch_cnt, partition);

        /* Consume messages */
        if (rd_kafka_consume_start(rkt, partition,
                                   RD_KAFKA_OFFSET_TAIL(batch_cnt)) == -1) {
                TEST_FAIL("consume_start(%i, -%i) failed: %s",
                          (int)partition, batch_cnt,
                          rd_kafka_err2str(rd_kafka_last_error()));
        }

        while (consumed < batch_cnt) {
                rd_kafka_message_t *msg =
                        rd_kafka_consume(rkt, partition, tmout_multip(5000));

                /* No message object at all. */
                if (!msg) {
                        TEST_FAIL("Failed to consume message %i/%i from "
                                  "partition %i: %s",
                                  consumed, batch_cnt, (int)partition,
                                  rd_kafka_err2str(rd_kafka_last_error()));
                }

                /* A message object that carries an error code. */
                if (msg->err) {
                        TEST_FAIL("Consume message %i/%i from partition %i "
                                  "has error: %s",
                                  consumed, batch_cnt, (int)partition,
                                  rd_kafka_err2str(msg->err));
                }

                verify_consumed_msg(testid, partition, msg_base+consumed, msg);

                rd_kafka_message_destroy(msg);
                consumed++;
        }

        rd_kafka_consume_stop(rkt, partition);

        /* Destroy topic */
        rd_kafka_topic_destroy(rkt);

        /* Destroy rdkafka instance */
        TEST_SAY("Destroying kafka instance %s\n", rd_kafka_name(rk));
        rd_kafka_destroy(rk);
}
开发者ID:Kitter,项目名称:librdkafka,代码行数:58,代码来源:0013-null-msgs.c
示例9: main
/**
 * References a list of public rdkafka functions without executing them.
 *
 * The calls below sit behind a condition on argc that the original author
 * marked as always false, so none of them is expected to run; they only make
 * the program reference each symbol.
 *
 * @return Always 0.
 */
int main (int argc, char **argv) {
        if (argc >= 0 /* always true */)
                return 0;

        /* Version and error helpers */
        rd_kafka_version();
        rd_kafka_version_str();
        rd_kafka_err2str(RD_KAFKA_RESP_ERR_NO_ERROR);
        rd_kafka_errno2err(EINVAL);

        /* Global configuration */
        rd_kafka_conf_new();
        rd_kafka_conf_destroy(NULL);
        rd_kafka_conf_dup(NULL);
        rd_kafka_conf_set(NULL, NULL, NULL, NULL, 0);
        rd_kafka_conf_set_dr_cb(NULL, NULL);
        rd_kafka_conf_set_error_cb(NULL, NULL);
        rd_kafka_conf_set_stats_cb(NULL, NULL);
        rd_kafka_conf_set_opaque(NULL, NULL);
        rd_kafka_conf_dump(NULL, NULL);
        rd_kafka_topic_conf_dump(NULL, NULL);
        rd_kafka_conf_dump_free(NULL, 0);
        rd_kafka_conf_properties_show(NULL);

        /* Topic configuration */
        rd_kafka_topic_conf_new();
        rd_kafka_topic_conf_dup(NULL);
        rd_kafka_topic_conf_destroy(NULL);
        rd_kafka_topic_conf_set(NULL, NULL, NULL, NULL, 0);
        rd_kafka_topic_conf_set_opaque(NULL, NULL);
        rd_kafka_topic_conf_set_partitioner_cb(NULL, NULL);
        rd_kafka_topic_partition_available(NULL, 0);
        rd_kafka_msg_partitioner_random(NULL, NULL, 0, 0, NULL, NULL);

        /* Handle and topic objects */
        rd_kafka_new(0, NULL, NULL, 0);
        rd_kafka_destroy(NULL);
        rd_kafka_name(NULL);
        rd_kafka_topic_new(NULL, NULL, NULL);
        rd_kafka_topic_destroy(NULL);
        rd_kafka_topic_name(NULL);

        /* Messages, consuming and producing */
        rd_kafka_message_destroy(NULL);
        rd_kafka_message_errstr(NULL);
        rd_kafka_consume_start(NULL, 0, 0);
        rd_kafka_consume_stop(NULL, 0);
        rd_kafka_consume(NULL, 0, 0);
        rd_kafka_consume_batch(NULL, 0, 0, NULL, 0);
        rd_kafka_consume_callback(NULL, 0, 0, NULL, NULL);
        rd_kafka_offset_store(NULL, 0, 0);
        rd_kafka_produce(NULL, 0, 0, NULL, 0, NULL, 0, NULL);
        rd_kafka_poll(NULL, 0);
        rd_kafka_brokers_add(NULL, NULL);

        /* Logging and diagnostics */
        rd_kafka_set_logger(NULL, NULL);
        rd_kafka_set_log_level(NULL, 0);
        rd_kafka_log_print(NULL, 0, NULL, NULL);
        rd_kafka_log_syslog(NULL, 0, NULL, NULL);
        rd_kafka_outq_len(NULL);
        rd_kafka_dump(NULL, NULL);
        rd_kafka_thread_cnt();
        rd_kafka_wait_destroyed(0);

        return 0;
}
开发者ID:blblack,项目名称:librdkafka,代码行数:57,代码来源:0006-symbols.c
示例10: do_test_implicit_ack
/**
 * @brief Test handling of implicit acks.
 *
 * Produces 10 messages per batch with an idempotent producer, flushes,
 * destroys the producer, and then verifies the messages with a consumer.
 *
 * @param what Label used in the progress output.
 * @param batch_cnt Total number of batches, ProduceRequests, sent.
 * @param initial_fail_batch_cnt How many of the initial batches should
 *                               fail with an emulated network timeout.
 */
static void do_test_implicit_ack (const char *what,
                                  int batch_cnt, int initial_fail_batch_cnt) {
        const char *topic = test_mk_topic_name("0090_idempotence_impl_ack", 1);
        const int32_t partition = 0;
        int msgcnt = 10*batch_cnt;
        uint64_t testid;
        rd_kafka_conf_t *conf;
        rd_kafka_t *rk;
        rd_kafka_topic_t *rkt;
        test_msgver_t msgver;

        TEST_SAY(_C_MAG "[ Test implicit ack: %s ]\n", what);

        /* Reset the shared state for this run. */
        rd_atomic32_init(&state.produce_cnt, 0);
        state.batch_cnt = batch_cnt;
        state.initial_fail_batch_cnt = initial_fail_batch_cnt;

        testid = test_id_generate();

        test_conf_init(&conf, NULL, 60);
        rd_kafka_conf_set_dr_msg_cb(conf, test_dr_msg_cb);
        test_conf_set(conf, "enable.idempotence", "true");
        test_conf_set(conf, "batch.num.messages", "10");
        test_conf_set(conf, "linger.ms", "500");
        test_conf_set(conf, "retry.backoff.ms", "2000");

        /* The ProduceResponse handler will inject timed-out-in-flight
         * errors for the first N ProduceRequests, which will trigger retries
         * that in turn will result in OutOfSequence errors. */
        test_conf_set(conf, "ut_handle_ProduceResponse",
                      (char *)handle_ProduceResponse);

        test_create_topic(topic, 1, 1);

        rk = test_create_handle(RD_KAFKA_PRODUCER, conf);
        rkt = test_create_producer_topic(rk, topic, NULL);

        TEST_SAY("Producing %d messages\n", msgcnt);
        test_produce_msgs(rk, rkt, testid, -1, 0, msgcnt, NULL, 0);

        TEST_SAY("Flushing..\n");
        rd_kafka_flush(rk, 10000);

        /* Producer side is done: topic first, then the handle. */
        rd_kafka_topic_destroy(rkt);
        rd_kafka_destroy(rk);

        TEST_SAY("Verifying messages with consumer\n");
        test_msgver_init(&msgver, testid);
        test_consume_msgs_easy_mv(NULL, topic, partition,
                                  testid, 1, msgcnt, NULL, &msgver);
        test_msgver_verify("verify", &msgver, TEST_MSGVER_ALL, 0, msgcnt);
        test_msgver_clear(&msgver);

        TEST_SAY(_C_GRN "[ Test implicit ack: %s : PASS ]\n", what);
}
开发者ID:Whissi,项目名称:librdkafka,代码行数:63,代码来源:0090-idempotence.c
示例11: main_0041_fetch_max_bytes
/**
 * Produce msgcnt messages to partition 0 in two halves, then consume all of
 * them with "fetch.message.max.bytes" set to MAX_BYTES.
 *
 * The last argument of the two test_produce_msgs() calls is MAX_BYTES/10 for
 * the first half and MAX_BYTES*5 for the second (presumably the payload
 * size - confirm against test_produce_msgs()).
 *
 * @return Always 0.
 */
int main_0041_fetch_max_bytes (int argc, char **argv) {
        const char *topic = test_mk_topic_name(__FUNCTION__, 1);
        const int partition = 0;
        const int msgcnt = 2*1000;
        const int MAX_BYTES = 100000;
        const int half = msgcnt/2;
        uint64_t testid;
        rd_kafka_conf_t *conf;
        rd_kafka_t *rk;
        rd_kafka_topic_t *rkt;

        test_conf_init(NULL, NULL, 60);

        /* Producer phase */
        testid = test_id_generate();
        rk = test_create_producer();
        rkt = test_create_producer_topic(rk, topic, NULL);

        test_produce_msgs(rk, rkt, testid, partition, 0, half,
                          NULL, MAX_BYTES/10);
        test_produce_msgs(rk, rkt, testid, partition, half, half,
                          NULL, MAX_BYTES*5);

        rd_kafka_topic_destroy(rkt);
        rd_kafka_destroy(rk);

        /* Consumer phase */
        TEST_SAY("Creating consumer\n");
        test_conf_init(&conf, NULL, 0);

        test_conf_set(conf, "fetch.message.max.bytes",
                      tsprintf("%d", MAX_BYTES));

        rk = test_create_consumer(NULL, NULL, conf, NULL);
        rkt = rd_kafka_topic_new(rk, topic, NULL);

        test_consumer_start("CONSUME", rkt, partition,
                            RD_KAFKA_OFFSET_BEGINNING);
        test_consume_msgs("CONSUME", rkt, testid, partition, TEST_NO_SEEK,
                          0, msgcnt, 1);
        test_consumer_stop("CONSUME", rkt, partition);

        rd_kafka_topic_destroy(rkt);
        rd_kafka_destroy(rk);

        return 0;
}
开发者ID:lambdaknight,项目名称:librdkafka,代码行数:41,代码来源:0041-fetch_max_bytes.c
示例12: main_0036_partial_fetch
/**
 * Produce msgcnt messages of msgsize bytes to partition 0, then consume them
 * with "fetch.message.max.bytes" set to 1500 so that a fetch is intended to
 * end in the middle of a message.
 *
 * @return Always 0.
 */
int main_0036_partial_fetch (int argc, char **argv) {
        const char *topic = test_mk_topic_name(__FUNCTION__, 1);
        const int partition = 0;
        const int msgcnt = 100;
        const int msgsize = 1000;
        uint64_t testid;
        rd_kafka_conf_t *conf;
        rd_kafka_t *rk;
        rd_kafka_topic_t *rkt;

        TEST_SAY("Producing %d messages of size %d to %s [%d]\n",
                 msgcnt, msgsize, topic, partition);

        /* Producer phase */
        testid = test_id_generate();
        rk = test_create_producer();
        rkt = test_create_producer_topic(rk, topic, NULL);

        test_produce_msgs(rk, rkt, testid, partition, 0, msgcnt,
                          NULL, msgsize);

        rd_kafka_topic_destroy(rkt);
        rd_kafka_destroy(rk);

        /* Consumer phase */
        TEST_SAY("Creating consumer\n");
        test_conf_init(&conf, NULL, 0);

        /* This should fetch 1.5 messages per fetch, thus resulting in
         * partial fetches, hopefully. */
        test_conf_set(conf, "fetch.message.max.bytes", "1500");

        rk = test_create_consumer(NULL, NULL, conf, NULL, NULL);
        rkt = rd_kafka_topic_new(rk, topic, NULL);

        test_consumer_start("CONSUME", rkt, partition,
                            RD_KAFKA_OFFSET_BEGINNING);
        test_consume_msgs("CONSUME", rkt, testid, partition, TEST_NO_SEEK,
                          0, msgcnt, 1);
        test_consumer_stop("CONSUME", rkt, partition);

        rd_kafka_topic_destroy(rkt);
        rd_kafka_destroy(rk);

        return 0;
}
开发者ID:BDeus,项目名称:librdkafka,代码行数:41,代码来源:0036-partial_fetch.c
示例13: main_0001_multiobj
/**
 * Create, use and destroy NUM_ITER producer instances one after another.
 *
 * Each iteration creates a producer and a topic object, produces a single
 * message, flushes, destroys the topic and then times rd_kafka_destroy().
 * The topic name is generated once, on the first iteration, and reused.
 *
 * @return Always 0.
 */
int main_0001_multiobj (int argc, char **argv) {
        const int NUM_ITER = 10;
        const char *topic = NULL;
        int partition = RD_KAFKA_PARTITION_UA; /* random */
        int iter = 0;

        TEST_SAY("Creating and destroying %i kafka instances\n", NUM_ITER);

        while (iter < NUM_ITER) {
                rd_kafka_conf_t *conf;
                rd_kafka_topic_conf_t *topic_conf;
                rd_kafka_t *rk;
                rd_kafka_topic_t *rkt;
                test_timing_t t_destroy;
                char msg[128];

                test_conf_init(&conf, &topic_conf, 30);

                if (topic == NULL) {
                        topic = test_mk_topic_name("0001", 0);
                }

                rk = test_create_handle(RD_KAFKA_PRODUCER, conf);

                rkt = rd_kafka_topic_new(rk, topic, topic_conf);
                if (rkt == NULL) {
                        TEST_FAIL("Failed to create topic for "
                                  "rdkafka instance #%i: %s\n",
                                  iter,
                                  rd_kafka_err2str(rd_kafka_errno2err(errno)));
                }

                rd_snprintf(msg, sizeof(msg),
                            "%s test message for iteration #%i",
                            argv[0], iter);

                /* Produce a message */
                rd_kafka_produce(rkt, partition, RD_KAFKA_MSG_F_COPY,
                                 msg, strlen(msg), NULL, 0, NULL);

                /* Wait for it to be sent (and possibly acked) */
                rd_kafka_flush(rk, -1);

                /* Destroy topic */
                rd_kafka_topic_destroy(rkt);

                /* Destroy rdkafka instance */
                TIMING_START(&t_destroy, "rd_kafka_destroy()");
                rd_kafka_destroy(rk);
                TIMING_STOP(&t_destroy);

                iter++;
        }

        return 0;
}
开发者ID:lambdaknight,项目名称:librdkafka,代码行数:51,代码来源:0001-multiobj.c
示例14: consumer_close
/**
 * Stop consuming and release the Kafka objects held by a consumer wrapper.
 *
 * Order: stop consuming on the wrapper's partition, poll the handle until
 * rd_kafka_outq_len() reports nothing outstanding, destroy the topic object,
 * then destroy the handle. Both fields are reset to NULL afterwards so that a
 * repeated call, or a call on a wrapper that was never fully set up, is a
 * harmless no-op instead of a NULL dereference or a double destroy.
 *
 * @param consumer_info  Wrapper to close; may be NULL.
 */
void consumer_close(wrapper_Info* consumer_info)
{
    if (consumer_info == NULL)
        return;

    /* Stop consuming */
    if (consumer_info->rkt != NULL)
        rd_kafka_consume_stop(consumer_info->rkt, consumer_info->partition);

    if (consumer_info->rk != NULL) {
        while (rd_kafka_outq_len(consumer_info->rk) > 0)
            rd_kafka_poll(consumer_info->rk, 10);
    }

    /* Destroy topic */
    if (consumer_info->rkt != NULL) {
        rd_kafka_topic_destroy(consumer_info->rkt);
        consumer_info->rkt = NULL;
    }

    /* Destroy handle */
    if (consumer_info->rk != NULL) {
        rd_kafka_destroy(consumer_info->rk);
        consumer_info->rk = NULL;
    }
}
开发者ID:zjpanghao,项目名称:get_shdx_data_by_index,代码行数:14,代码来源:KafkaWrapper.cpp
示例15: Consumer_seek
/**
 * Python method taking a single "partition" argument, which must be a
 * TopicPartition; seeks that topic+partition to the TopicPartition's offset.
 *
 * A temporary topic object is created for the seek and destroyed again
 * before returning. The interpreter lock is released around the
 * rd_kafka_seek() call.
 *
 * @return None on success. NULL with a Python exception set when the
 *         consumer is closed (RuntimeError), the argument is not a
 *         TopicPartition (TypeError), the topic object cannot be created, or
 *         the seek fails.
 */
static PyObject *Consumer_seek (Handle *self, PyObject *args, PyObject *kwargs) {
        TopicPartition *tp;
        rd_kafka_resp_err_t err;
        static char *kws[] = { "partition", NULL };
        rd_kafka_topic_t *rkt;

        if (!self->rk) {
                PyErr_SetString(PyExc_RuntimeError, "Consumer closed");
                return NULL;
        }

        if (!PyArg_ParseTupleAndKeywords(args, kwargs, "O", kws,
                                         (PyObject **)&tp))
                return NULL;

        /* Py_TYPE() yields a borrowed reference. PyObject_Type() returns a
         * new reference, which would leak on every call when used only for
         * a comparison. */
        if ((PyObject *)Py_TYPE((PyObject *)tp) !=
            (PyObject *)&TopicPartitionType) {
                PyErr_Format(PyExc_TypeError,
                             "expected %s", TopicPartitionType.tp_name);
                return NULL;
        }

        rkt = rd_kafka_topic_new(self->rk, tp->topic, NULL);
        if (!rkt) {
                cfl_PyErr_Format(rd_kafka_last_error(),
                                 "Failed to get topic object for "
                                 "topic \"%s\": %s",
                                 tp->topic,
                                 rd_kafka_err2str(rd_kafka_last_error()));
                return NULL;
        }

        Py_BEGIN_ALLOW_THREADS;
        err = rd_kafka_seek(rkt, tp->partition, tp->offset, -1);
        Py_END_ALLOW_THREADS;

        /* The topic object was only needed for the seek itself. */
        rd_kafka_topic_destroy(rkt);

        if (err) {
                cfl_PyErr_Format(err,
                                 "Failed to seek to offset %"CFL_PRId64": %s",
                                 tp->offset, rd_kafka_err2str(err));
                return NULL;
        }

        Py_RETURN_NONE;
}
开发者ID:confluentinc,项目名称:confluent-kafka-python,代码行数:48,代码来源:Consumer.c
示例16: table_metadata_update_topic
/*
 * Make `table` refer to the Kafka topic that belongs to `table_name`.
 *
 * If the table already has a topic and its stored name equals `table_name`,
 * nothing is done. Otherwise any previous name and topic are released, the
 * new name is copied into the table, and a topic object is opened. The topic
 * name is `table_name`, prefixed with mapper->topic_prefix and the prefix
 * delimiter when a prefix is configured.
 *
 * Returns 0 on success. On failure, sets mapper->error and returns nonzero;
 * in that case table->topic and table->table_name are both left NULL so the
 * table never holds a pointer to a destroyed topic or a freed name.
 */
int table_metadata_update_topic(table_mapper_t mapper, table_metadata_t table, const char* table_name) {
    /* Declared at function scope on purpose: topic_name may point into this
     * buffer, and it is still read by logf() and rd_kafka_topic_new() below,
     * so the buffer has to outlive the branch that fills it. */
    char prefixed_name[TABLE_MAPPER_MAX_TOPIC_LEN];
    const char *topic_name;

    if (table->topic) {
        const char* prev_table_name = table->table_name;

        if (strcmp(table_name, prev_table_name) == 0) {
            return 0; // table name didn't change, nothing to do
        }

        logf("Registering new table (was \"%s\", now \"%s\") for relid %" PRIu32 "\n", prev_table_name, table_name, table->relid);

        /* Clear both fields right after releasing them so that no error
         * path below can leave a dangling pointer behind. */
        free(table->table_name);
        table->table_name = NULL;
        rd_kafka_topic_destroy(table->topic);
        table->topic = NULL;
    }

    table->table_name = strdup(table_name);
    if (!table->table_name) {
        mapper_error(mapper, "out of memory while copying table name %s", table_name);
        return -1;
    }

    /* both branches set topic_name to a pointer we don't need to free,
     * since rd_kafka_topic_new below is going to copy it anyway */
    if (mapper->topic_prefix != NULL) {
        int size = snprintf(prefixed_name, TABLE_MAPPER_MAX_TOPIC_LEN,
                "%s%c%s",
                mapper->topic_prefix, TABLE_MAPPER_TOPIC_PREFIX_DELIMITER, table_name);

        /* snprintf reports an error with a negative value and truncation
         * with a value >= the buffer size. */
        if (size < 0 || size >= TABLE_MAPPER_MAX_TOPIC_LEN) {
            mapper_error(mapper, "prefixed topic name is too long (max %d bytes): prefix %s, table name %s",
                    TABLE_MAPPER_MAX_TOPIC_LEN, mapper->topic_prefix, table_name);
            goto fail;
        }

        topic_name = prefixed_name;
        /* needn't free topic_name because prefixed_name is stack-allocated */
    } else {
        topic_name = table_name;
        /* needn't free topic_name because it aliases table_name which we don't own */
    }

    logf("Opening Kafka topic \"%s\" for table \"%s\"\n", topic_name, table_name);

    table->topic = rd_kafka_topic_new(mapper->kafka, topic_name,
            rd_kafka_topic_conf_dup(mapper->topic_conf));
    if (!table->topic) {
        mapper_error(mapper, "Cannot open Kafka topic %s: %s", topic_name,
                rd_kafka_err2str(rd_kafka_errno2err(errno)));
        goto fail;
    }

    return 0;

fail:
    /* Roll back the name copied above; table->topic is already NULL here. */
    free(table->table_name);
    table->table_name = NULL;
    return -1;
}
开发者ID:SanthoshPrasad,项目名称:bottledwater-pg,代码行数:49,代码来源:table_mapper.c
示例17: kafka_topic_context_free
/*
 * Release a kafka_topic_context together with its topic name, topic object
 * and topic configuration. Accepts NULL, in which case nothing happens.
 */
static void kafka_topic_context_free(void *p) /* {{{ */
{
  struct kafka_topic_context *ctx = p;

  if (!ctx) {
    return;
  }

  if (ctx->topic_name) {
    sfree(ctx->topic_name);
  }
  if (ctx->topic) {
    rd_kafka_topic_destroy(ctx->topic);
  }
  if (ctx->conf) {
    rd_kafka_topic_conf_destroy(ctx->conf);
  }

  sfree(ctx);
} /* }}} void kafka_topic_context_free */
开发者ID:adanin,项目名称:collectd,代码行数:16,代码来源:write_kafka.c
示例18: legacy_consume_many
/**
 * Consume msgs_per_topic messages from partition 0 of each of the given
 * topics, using one topic object per topic on a single consumer handle.
 *
 * Phases: create all topic objects (timed), start a consumer on each,
 * consume from each in turn, stop all consumers, destroy all topic objects,
 * destroy the handle.
 *
 * @param topics     Array of topic names.
 * @param topic_cnt  Number of entries in topics.
 * @param testid     Test id forwarded to test_consume_msgs().
 */
static void legacy_consume_many (char **topics, int topic_cnt, uint64_t testid){
        rd_kafka_t *rk;
        test_timing_t t_rkt_create;
        int i;
        rd_kafka_topic_t **rkts;
        int msg_base = 0;

        TEST_SAY(_C_MAG "%s\n" _C_CLR, __FUNCTION__);

        test_conf_init(NULL, NULL, 60);

        rk = test_create_consumer(NULL, NULL, NULL, NULL);

        TEST_SAY("Creating %d topic objects\n", topic_cnt);

        rkts = malloc(sizeof(*rkts) * (size_t)topic_cnt);
        /* The array is written to right below, so a failed allocation must
         * not go unnoticed. malloc(0) may legitimately return NULL. */
        if (!rkts && topic_cnt > 0) {
                TEST_FAIL("Failed to allocate %d topic object pointers\n",
                          topic_cnt);
        }

        TIMING_START(&t_rkt_create, "Topic object create");
        for (i = 0 ; i < topic_cnt ; i++)
                rkts[i] = test_create_topic_object(rk, topics[i], NULL);
        TIMING_STOP(&t_rkt_create);

        TEST_SAY("Start consumer for %d topics\n", topic_cnt);
        for (i = 0 ; i < topic_cnt ; i++)
                test_consumer_start("legacy", rkts[i], 0,
                                    RD_KAFKA_OFFSET_BEGINNING);

        TEST_SAY("Consuming from %d messages from each %d topics\n",
                 msgs_per_topic, topic_cnt);
        for (i = 0 ; i < topic_cnt ; i++) {
                test_consume_msgs("legacy", rkts[i], testid, 0, TEST_NO_SEEK,
                                  msg_base, msgs_per_topic, 1);
                /* Each topic continues where the previous one stopped. */
                msg_base += msgs_per_topic;
        }

        TEST_SAY("Stopping consumers\n");
        for (i = 0 ; i < topic_cnt ; i++)
                test_consumer_stop("legacy", rkts[i], 0);

        TEST_SAY("Destroying %d topic objects\n", topic_cnt);
        for (i = 0 ; i < topic_cnt ; i++)
                rd_kafka_topic_destroy(rkts[i]);

        free(rkts);

        rd_kafka_destroy(rk);
}
开发者ID:eugpermar,项目名称:librdkafka,代码行数:47,代码来源:0042-many_topics.c
示例19: Http::~Http
/**
 * Tear down the Http object: destroy the pool, close the open files, clean
 * up libcurl, stop and destroy the Kafka consumer, and finally cancel and
 * join the kafkaInitId thread.
 */
Http::~Http() {
    pooldestroy();

    if (fpwUrl) {
        fclose(fpwUrl);
    }

    // NOTE(review): `test` is closed whenever fprArg is open and is not
    // checked on its own - confirm the two are always opened together.
    if (fprArg) {
        fclose(fprArg);
        fclose(test);
    }

    curl_global_cleanup();

    // Kafka consumer: stop consuming, then release the topic, then the handle.
    rd_kafka_consume_stop(kafka_consumer.rkt_, kafka_consumer.partition_);
    rd_kafka_topic_destroy(kafka_consumer.rkt_);
    rd_kafka_destroy(kafka_consumer.rk_);

    // NOTE(review): the thread is stopped only after the Kafka handle has
    // been destroyed - verify the thread cannot still be using that handle.
    pthread_cancel(kafkaInitId);
    pthread_join(kafkaInitId, NULL);
}
开发者ID:91lilei,项目名称:work,代码行数:18,代码来源:HttpRequest.cpp
示例20: test_produce_msgs_easy
/**
 * Create a producer, produce \p msgcnt messages to \p topic \p partition
 * (timed as "PRODUCE"), destroy the topic object and the producer, and
 * return the testid that was used.
 *
 * @param testid  Test id to use; when 0 a new one is generated.
 *
 * @return The test id the messages were produced with.
 */
uint64_t
test_produce_msgs_easy (const char *topic, uint64_t testid,
                        int32_t partition, int msgcnt) {
        rd_kafka_t *producer;
        rd_kafka_topic_t *producer_topic;
        test_timing_t t_produce;

        if (testid == 0) {
                testid = test_id_generate();
        }

        producer = test_create_producer();
        producer_topic = test_create_producer_topic(producer, topic, NULL);

        TIMING_START(&t_produce, "PRODUCE");
        test_produce_msgs(producer, producer_topic, testid, partition,
                          0, msgcnt, NULL, 0);
        TIMING_STOP(&t_produce);

        rd_kafka_topic_destroy(producer_topic);
        rd_kafka_destroy(producer);

        return testid;
}
开发者ID:lindsay-show,项目名称:librdkafka,代码行数:24,代码来源:test.c
注：本文中的rd_kafka_topic_destroy函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台，相关代码片段筛选自各路编程大神贡献的开源项目，源码版权归原作者所有，传播和使用请参考对应项目的License；未经允许，请勿转载。
请发表评论