This article collects typical usage examples of the Java class org.apache.kafka.clients.producer.internals.DefaultPartitioner. If you are wondering how DefaultPartitioner is used in practice, the selected code examples below may help.
The DefaultPartitioner class belongs to the org.apache.kafka.clients.producer.internals package. 17 code examples of the class are shown below, sorted by popularity by default.
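For orientation, here is a minimal sketch of how a keyed record maps to a partition. In the Kafka client versions these examples come from, DefaultPartitioner hashes the serialized key with murmur2, clears the sign bit, and takes the result modulo the partition count. The sketch below keeps that shape but swaps in `Arrays.hashCode` as a stand-in hash so it needs no Kafka dependency, which means the partition numbers it produces will not match Kafka's. The names `KeyedPartitionSketch` and `partitionFor` are hypothetical.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

final class KeyedPartitionSketch {
    // Clear the sign bit so the value is non-negative
    // (Math.abs would still be negative for Integer.MIN_VALUE).
    static int toPositive(int number) {
        return number & 0x7fffffff;
    }

    // ASSUMPTION: Arrays.hashCode stands in for Kafka's murmur2 hash of the key bytes.
    static int partitionFor(byte[] keyBytes, int numPartitions) {
        return toPositive(Arrays.hashCode(keyBytes)) % numPartitions;
    }

    public static void main(String[] args) {
        byte[] key = "key".getBytes(StandardCharsets.UTF_8);
        // The same key bytes always map to the same partition.
        System.out.println(partitionFor(key, 2));
    }
}
```

Records with a null key take a different path (no key hash is available), which is why the tests below that need a predictable partition always supply a key.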
Example 1: testPartitionSpread
import org.apache.kafka.clients.producer.internals.DefaultPartitioner; // import the required package/class
@Test
public void testPartitionSpread() throws Exception {
    Multiset<Integer> results = TreeMultiset.create();
    Cluster c = Cluster.empty();
    try (Partitioner p = new DefaultPartitioner()) {
        PartitionKeyGenerator pkg = new PartitionKeyGenerator();
        mockPartitions(c);
        for (int i = 0; i < messages; i++) {
            results.add(p.partition("test", null, pkg.next(), null, null, c));
        }
        // Every partition should receive its even share of messages, within 5%.
        int expected = messages / partitions;
        double threshold = expected * 0.05;
        for (Multiset.Entry<Integer> e : results.entrySet()) {
            int offBy = Math.abs(e.getCount() - expected);
            assertTrue("Partition " + e.getElement() + " had " + e.getCount() + " elements, expected " + expected + ", threshold is " + threshold,
                offBy < threshold);
        }
    }
}
Developer: opentable, Project: otj-logging, Lines of code: 24, Source: PartitionSpreadTest.java
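The tolerance check in the test above can be isolated into a small helper. This is a sketch under stated assumptions, not part of the otj-logging project: `SpreadCheckSketch` and `evenlySpread` are hypothetical names, and a plain `Map` replaces the Guava `Multiset`. It also rejects the case where some partition received no records at all, which the original loop cannot detect because an empty partition never shows up in the multiset.

```java
import java.util.Map;
import java.util.TreeMap;

final class SpreadCheckSketch {
    // True when every partition's count is within `fraction` of the even share.
    static boolean evenlySpread(Map<Integer, Integer> countsByPartition,
                                int totalMessages, int numPartitions, double fraction) {
        // Extra check (not in the original test): a partition with zero records is missing from the map.
        if (countsByPartition.size() != numPartitions) {
            return false;
        }
        int expected = totalMessages / numPartitions;
        double threshold = expected * fraction;
        for (int count : countsByPartition.values()) {
            if (Math.abs(count - expected) >= threshold) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        Map<Integer, Integer> counts = new TreeMap<>();
        counts.put(0, 1000);
        counts.put(1, 1020);
        counts.put(2, 980);
        System.out.println(evenlySpread(counts, 3000, 3, 0.05));
    }
}
```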
Example 2: testStreamPartitioner
import org.apache.kafka.clients.producer.internals.DefaultPartitioner; // import the required package/class
@Test
public void testStreamPartitioner() {
    final RecordCollectorImpl collector = new RecordCollectorImpl(
        new MockProducer<>(cluster, true, new DefaultPartitioner(), byteArraySerializer, byteArraySerializer),
        "RecordCollectorTest-TestStreamPartitioner");
    collector.send("topic1", "3", "0", null, stringSerializer, stringSerializer, streamPartitioner);
    collector.send("topic1", "9", "0", null, stringSerializer, stringSerializer, streamPartitioner);
    collector.send("topic1", "27", "0", null, stringSerializer, stringSerializer, streamPartitioner);
    collector.send("topic1", "81", "0", null, stringSerializer, stringSerializer, streamPartitioner);
    collector.send("topic1", "243", "0", null, stringSerializer, stringSerializer, streamPartitioner);
    collector.send("topic1", "28", "0", null, stringSerializer, stringSerializer, streamPartitioner);
    collector.send("topic1", "82", "0", null, stringSerializer, stringSerializer, streamPartitioner);
    collector.send("topic1", "244", "0", null, stringSerializer, stringSerializer, streamPartitioner);
    collector.send("topic1", "245", "0", null, stringSerializer, stringSerializer, streamPartitioner);
    // Offsets are zero-based, so the last offset is the record count per partition minus one.
    final Map<TopicPartition, Long> offsets = collector.offsets();
    assertEquals((Long) 4L, offsets.get(new TopicPartition("topic1", 0)));
    assertEquals((Long) 2L, offsets.get(new TopicPartition("topic1", 1)));
    assertEquals((Long) 0L, offsets.get(new TopicPartition("topic1", 2)));
}
Developer: YMCoding, Project: kafka-0.11.0.0-src-with-comment, Lines of code: 26, Source: RecordCollectorTest.java
Example 3: visible
import org.apache.kafka.clients.producer.internals.DefaultPartitioner; // import the required package/class
@Override
public boolean visible(String name, Map<String, Object> connectorConfigs) {
    String partitionerName = (String) connectorConfigs.get(PARTITIONER_CLASS_CONFIG);
    try {
        @SuppressWarnings("unchecked")
        Class<? extends Partitioner> partitioner = (Class<? extends Partitioner>) Class.forName(partitionerName);
        if (classNameEquals(partitionerName, DefaultPartitioner.class)) {
            return false;
        } else if (FieldPartitioner.class.isAssignableFrom(partitioner)) {
            // subclass of FieldPartitioner
            return name.equals(PARTITION_FIELD_NAME_CONFIG);
        } else if (TimeBasedPartitioner.class.isAssignableFrom(partitioner)) {
            // subclass of TimeBasedPartitioner
            if (classNameEquals(partitionerName, DailyPartitioner.class) || classNameEquals(partitionerName, HourlyPartitioner.class)) {
                return name.equals(LOCALE_CONFIG) || name.equals(TIMEZONE_CONFIG);
            } else {
                return name.equals(PARTITION_DURATION_MS_CONFIG) || name.equals(PATH_FORMAT_CONFIG) || name.equals(LOCALE_CONFIG) || name.equals(TIMEZONE_CONFIG);
            }
        } else {
            throw new ConfigException("Not a valid partitioner class: " + partitionerName);
        }
    } catch (ClassNotFoundException e) {
        throw new ConfigException("Partitioner class not found: " + partitionerName);
    }
}
Developer: qubole, Project: streamx, Lines of code: 26, Source: HdfsSinkConnectorConfig.java
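The `classNameEquals` helper called above is not included in the snippet. A plausible sketch, assuming it accepts either the simple name or the fully qualified name of the class, might look like the following. The class name `ClassNameMatchSketch` is hypothetical, and the real streamx helper may behave differently.

```java
import java.util.ArrayList;
import java.util.List;

final class ClassNameMatchSketch {
    // ASSUMPTION: a configured name matches if it equals the simple or the fully qualified class name.
    static boolean classNameEquals(String className, Class<?> clazz) {
        return className.equals(clazz.getSimpleName()) || className.equals(clazz.getName());
    }

    public static void main(String[] args) {
        // Exact-name match, as used for the DefaultPartitioner / DailyPartitioner checks.
        System.out.println(classNameEquals("java.util.ArrayList", ArrayList.class));
        // Subtype match, as used for the FieldPartitioner / TimeBasedPartitioner checks.
        System.out.println(List.class.isAssignableFrom(ArrayList.class));
    }
}
```

The method above mixes the two kinds of test deliberately: an exact name comparison singles out one specific class, while `isAssignableFrom` also accepts any subclass.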
Example 4: testCopartitioning
import org.apache.kafka.clients.producer.internals.DefaultPartitioner; // import the required package/class
@Test
public void testCopartitioning() {
    Random rand = new Random();
    DefaultPartitioner defaultPartitioner = new DefaultPartitioner();
    WindowedSerializer<Integer> windowedSerializer = new WindowedSerializer<>(intSerializer);
    WindowedStreamPartitioner<Integer, String> streamPartitioner = new WindowedStreamPartitioner<>(topicName, windowedSerializer);
    for (int k = 0; k < 10; k++) {
        Integer key = rand.nextInt();
        byte[] keyBytes = intSerializer.serialize(topicName, key);
        String value = key.toString();
        byte[] valueBytes = stringSerializer.serialize(topicName, value);
        Integer expected = defaultPartitioner.partition("topic", key, keyBytes, value, valueBytes, cluster);
        // A windowed key must land on the same partition as its plain key, whatever the window.
        for (int w = 1; w < 10; w++) {
            TimeWindow window = new TimeWindow(10 * w, 20 * w);
            Windowed<Integer> windowedKey = new Windowed<>(key, window);
            Integer actual = streamPartitioner.partition(windowedKey, value, infos.size());
            assertEquals(expected, actual);
        }
    }
}
Developer: YMCoding, Project: kafka-0.11.0.0-src-with-comment, Lines of code: 30, Source: WindowedStreamPartitionerTest.java
Example 5: testSpecificPartition
import org.apache.kafka.clients.producer.internals.DefaultPartitioner; // import the required package/class
@Test
public void testSpecificPartition() {
    final RecordCollectorImpl collector = new RecordCollectorImpl(
        new MockProducer<>(cluster, true, new DefaultPartitioner(), byteArraySerializer, byteArraySerializer),
        "RecordCollectorTest-TestSpecificPartition");
    collector.send("topic1", "999", "0", 0, null, stringSerializer, stringSerializer);
    collector.send("topic1", "999", "0", 0, null, stringSerializer, stringSerializer);
    collector.send("topic1", "999", "0", 0, null, stringSerializer, stringSerializer);
    collector.send("topic1", "999", "0", 1, null, stringSerializer, stringSerializer);
    collector.send("topic1", "999", "0", 1, null, stringSerializer, stringSerializer);
    collector.send("topic1", "999", "0", 2, null, stringSerializer, stringSerializer);
    final Map<TopicPartition, Long> offsets = collector.offsets();
    assertEquals((Long) 2L, offsets.get(new TopicPartition("topic1", 0)));
    assertEquals((Long) 1L, offsets.get(new TopicPartition("topic1", 1)));
    assertEquals((Long) 0L, offsets.get(new TopicPartition("topic1", 2)));
    // ignore StreamPartitioner
    collector.send("topic1", "999", "0", 0, null, stringSerializer, stringSerializer);
    collector.send("topic1", "999", "0", 1, null, stringSerializer, stringSerializer);
    collector.send("topic1", "999", "0", 2, null, stringSerializer, stringSerializer);
    assertEquals((Long) 3L, offsets.get(new TopicPartition("topic1", 0)));
    assertEquals((Long) 2L, offsets.get(new TopicPartition("topic1", 1)));
    assertEquals((Long) 1L, offsets.get(new TopicPartition("topic1", 2)));
}
Developer: YMCoding, Project: kafka-0.11.0.0-src-with-comment, Lines of code: 32, Source: RecordCollectorTest.java
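The two RecordCollectorTest cases above exercise different `send` overloads: one passes a stream partitioner, the other an explicit partition number. The sketch below illustrates one way such a precedence rule could be expressed. It is a hypothetical helper, not the RecordCollectorImpl code. The `streamPartitioner` used by the tests is not shown in the snippets; the offsets asserted in testStreamPartitioner are consistent with a partitioner that parses the key as an integer and takes it modulo the partition count, so the usage below assumes that.

```java
import java.util.function.ToIntBiFunction;

final class PartitionChoiceSketch {
    // Explicit partition wins; otherwise the custom partitioner decides;
    // otherwise null signals "let the producer's own partitioner choose".
    static Integer choose(Integer explicitPartition,
                          ToIntBiFunction<String, Integer> customPartitioner,
                          String key, int numPartitions) {
        if (explicitPartition != null) {
            return explicitPartition;
        }
        if (customPartitioner != null) {
            return customPartitioner.applyAsInt(key, numPartitions);
        }
        return null;
    }

    public static void main(String[] args) {
        // ASSUMPTION: stand-in for the test's streamPartitioner field.
        ToIntBiFunction<String, Integer> byNumericKey = (k, n) -> Integer.parseInt(k) % n;
        System.out.println(choose(null, byNumericKey, "245", 3)); // custom partitioner decides
        System.out.println(choose(1, byNumericKey, "245", 3));    // explicit partition wins
    }
}
```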
Example 6: shouldRetryWhenTimeoutExceptionOccursOnSend
import org.apache.kafka.clients.producer.internals.DefaultPartitioner; // import the required package/class
@SuppressWarnings("unchecked")
@Test
public void shouldRetryWhenTimeoutExceptionOccursOnSend() throws Exception {
    final AtomicInteger attempt = new AtomicInteger(0);
    final RecordCollectorImpl collector = new RecordCollectorImpl(
        new MockProducer(cluster, true, new DefaultPartitioner(), byteArraySerializer, byteArraySerializer) {
            @Override
            public synchronized Future<RecordMetadata> send(final ProducerRecord record, final Callback callback) {
                // Fail only the first attempt so that the retry succeeds.
                if (attempt.getAndIncrement() == 0) {
                    throw new TimeoutException();
                }
                return super.send(record, callback);
            }
        },
        "test");
    collector.send("topic1", "3", "0", null, stringSerializer, stringSerializer, streamPartitioner);
    final Long offset = collector.offsets().get(new TopicPartition("topic1", 0));
    assertEquals(Long.valueOf(0L), offset);
}
Developer: YMCoding, Project: kafka-0.11.0.0-src-with-comment, Lines of code: 21, Source: RecordCollectorTest.java
Example 7: shouldThrowStreamsExceptionAfterMaxAttempts
import org.apache.kafka.clients.producer.internals.DefaultPartitioner; // import the required package/class
@SuppressWarnings("unchecked")
@Test(expected = StreamsException.class)
public void shouldThrowStreamsExceptionAfterMaxAttempts() throws Exception {
    final RecordCollector collector = new RecordCollectorImpl(
        new MockProducer(cluster, true, new DefaultPartitioner(), byteArraySerializer, byteArraySerializer) {
            @Override
            public synchronized Future<RecordMetadata> send(final ProducerRecord record, final Callback callback) {
                // Every attempt times out, so the collector eventually gives up.
                throw new TimeoutException();
            }
        },
        "test");
    collector.send("topic1", "3", "0", null, stringSerializer, stringSerializer, streamPartitioner);
}
Developer: YMCoding, Project: kafka-0.11.0.0-src-with-comment, Lines of code: 16, Source: RecordCollectorTest.java
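The two tests above pin down a bounded-retry contract: a send that times out once should succeed on a later attempt, and a send that always times out should end in an exception. A minimal sketch of that contract follows. It is not the RecordCollectorImpl implementation: `RetrySketch`, `TransientSendException`, and `sendWithRetry` are hypothetical names, and the back-off between attempts that a real client would likely use is left out.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

final class RetrySketch {
    // Hypothetical stand-in for a transient failure such as a send timeout.
    static final class TransientSendException extends RuntimeException { }

    // Retries `send` until it succeeds or `maxAttempts` attempts have failed.
    static <T> T sendWithRetry(Supplier<T> send, int maxAttempts) {
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                return send.get();
            } catch (TransientSendException e) {
                if (attempt == maxAttempts) {
                    throw new IllegalStateException("send failed after " + maxAttempts + " attempts", e);
                }
                // A real implementation would probably sleep here before retrying.
            }
        }
        throw new IllegalArgumentException("maxAttempts must be positive");
    }

    public static void main(String[] args) {
        AtomicInteger calls = new AtomicInteger();
        String result = sendWithRetry(() -> {
            if (calls.getAndIncrement() == 0) {
                throw new TransientSendException(); // first attempt fails
            }
            return "sent";
        }, 3);
        System.out.println(result + " after " + calls.get() + " calls");
    }
}
```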
Example 8: shouldThrowStreamsExceptionOnSubsequentCallIfASendFails
import org.apache.kafka.clients.producer.internals.DefaultPartitioner; // import the required package/class
@SuppressWarnings("unchecked")
@Test(expected = StreamsException.class)
public void shouldThrowStreamsExceptionOnSubsequentCallIfASendFails() throws Exception {
    final RecordCollector collector = new RecordCollectorImpl(
        new MockProducer(cluster, true, new DefaultPartitioner(), byteArraySerializer, byteArraySerializer) {
            @Override
            public synchronized Future<RecordMetadata> send(final ProducerRecord record, final Callback callback) {
                // Report the failure through the callback instead of throwing.
                callback.onCompletion(null, new Exception());
                return null;
            }
        },
        "test");
    collector.send("topic1", "3", "0", null, stringSerializer, stringSerializer, streamPartitioner);
    collector.send("topic1", "3", "0", null, stringSerializer, stringSerializer, streamPartitioner);
}
Developer: YMCoding, Project: kafka-0.11.0.0-src-with-comment, Lines of code: 16, Source: RecordCollectorTest.java
Example 9: shouldThrowStreamsExceptionOnFlushIfASendFailed
import org.apache.kafka.clients.producer.internals.DefaultPartitioner; // import the required package/class
@SuppressWarnings("unchecked")
@Test(expected = StreamsException.class)
public void shouldThrowStreamsExceptionOnFlushIfASendFailed() throws Exception {
    final RecordCollector collector = new RecordCollectorImpl(
        new MockProducer(cluster, true, new DefaultPartitioner(), byteArraySerializer, byteArraySerializer) {
            @Override
            public synchronized Future<RecordMetadata> send(final ProducerRecord record, final Callback callback) {
                // Report the failure through the callback instead of throwing.
                callback.onCompletion(null, new Exception());
                return null;
            }
        },
        "test");
    collector.send("topic1", "3", "0", null, stringSerializer, stringSerializer, streamPartitioner);
    collector.flush();
}
Developer: YMCoding, Project: kafka-0.11.0.0-src-with-comment, Lines of code: 16, Source: RecordCollectorTest.java
Example 10: shouldThrowStreamsExceptionOnCloseIfASendFailed
import org.apache.kafka.clients.producer.internals.DefaultPartitioner; // import the required package/class
@SuppressWarnings("unchecked")
@Test(expected = StreamsException.class)
public void shouldThrowStreamsExceptionOnCloseIfASendFailed() throws Exception {
    final RecordCollector collector = new RecordCollectorImpl(
        new MockProducer(cluster, true, new DefaultPartitioner(), byteArraySerializer, byteArraySerializer) {
            @Override
            public synchronized Future<RecordMetadata> send(final ProducerRecord record, final Callback callback) {
                // Report the failure through the callback instead of throwing.
                callback.onCompletion(null, new Exception());
                return null;
            }
        },
        "test");
    collector.send("topic1", "3", "0", null, stringSerializer, stringSerializer, streamPartitioner);
    collector.close();
}
Developer: YMCoding, Project: kafka-0.11.0.0-src-with-comment, Lines of code: 16, Source: RecordCollectorTest.java
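The three preceding tests share one pattern: a failure reported through the asynchronous completion callback does not fail the `send` that triggered it, but surfaces on the next `send`, `flush`, or `close`. A minimal sketch of that pattern follows, assuming a single remembered exception. `DeferredErrorSketch` and its methods are hypothetical and much simpler than the RecordCollectorImpl under test.

```java
final class DeferredErrorSketch {
    // Written by the asynchronous completion callback, read by later calls.
    private volatile Exception sendException;

    // Invoked when a send completes; `error` is null on success.
    void onCompletion(Exception error) {
        if (error != null && sendException == null) {
            sendException = error; // remember only the first failure
        }
    }

    void send(Runnable doSend) {
        checkForException(); // surface an earlier failure before sending more
        doSend.run();
    }

    void flush() {
        checkForException();
    }

    void close() {
        checkForException();
    }

    private void checkForException() {
        if (sendException != null) {
            throw new IllegalStateException("an earlier send failed", sendException);
        }
    }

    public static void main(String[] args) {
        DeferredErrorSketch collector = new DeferredErrorSketch();
        // The failing send itself returns normally.
        collector.send(() -> collector.onCompletion(new Exception("broker unavailable")));
        try {
            collector.flush();
        } catch (IllegalStateException e) {
            System.out.println("flush surfaced: " + e.getCause().getMessage());
        }
    }
}
```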
Example 11: shouldThrowIfTopicIsUnknown
import org.apache.kafka.clients.producer.internals.DefaultPartitioner; // import the required package/class
@SuppressWarnings("unchecked")
@Test(expected = StreamsException.class)
public void shouldThrowIfTopicIsUnknown() {
    final RecordCollector collector = new RecordCollectorImpl(
        new MockProducer(cluster, true, new DefaultPartitioner(), byteArraySerializer, byteArraySerializer) {
            @Override
            public List<PartitionInfo> partitionsFor(final String topic) {
                // No partitions: the topic is unknown to the producer.
                return Collections.emptyList();
            }
        },
        "test");
    collector.send("topic1", "3", "0", null, stringSerializer, stringSerializer, streamPartitioner);
}
Developer: YMCoding, Project: kafka-0.11.0.0-src-with-comment, Lines of code: 15, Source: RecordCollectorTest.java
Example 12: testPartitioner
import org.apache.kafka.clients.producer.internals.DefaultPartitioner; // import the required package/class
@Test
public void testPartitioner() throws Exception {
    PartitionInfo partitionInfo0 = new PartitionInfo(topic, 0, null, null, null);
    PartitionInfo partitionInfo1 = new PartitionInfo(topic, 1, null, null, null);
    Cluster cluster = new Cluster(null, new ArrayList<Node>(0), asList(partitionInfo0, partitionInfo1),
        Collections.<String>emptySet(), Collections.<String>emptySet());
    MockProducer<String, String> producer = new MockProducer<>(cluster, true, new DefaultPartitioner(), new StringSerializer(), new StringSerializer());
    ProducerRecord<String, String> record = new ProducerRecord<>(topic, "key", "value");
    Future<RecordMetadata> metadata = producer.send(record);
    assertEquals("Partition should be correct", 1, metadata.get().partition());
    producer.clear();
    assertEquals("Clear should erase our history", 0, producer.history().size());
}
Developer: YMCoding, Project: kafka-0.11.0.0-src-with-comment, Lines of code: 14, Source: MockProducerTest.java
Example 13: testPartitioner
import org.apache.kafka.clients.producer.internals.DefaultPartitioner; // import the required package/class
@Test
public void testPartitioner() throws Exception {
    PartitionInfo partitionInfo0 = new PartitionInfo(topic, 0, null, null, null);
    PartitionInfo partitionInfo1 = new PartitionInfo(topic, 1, null, null, null);
    Cluster cluster = new Cluster(new ArrayList<Node>(0), asList(partitionInfo0, partitionInfo1), Collections.<String>emptySet());
    MockProducer<String, String> producer = new MockProducer<String, String>(cluster, true, new DefaultPartitioner(), new StringSerializer(), new StringSerializer());
    ProducerRecord<String, String> record = new ProducerRecord<String, String>(topic, "key", "value");
    Future<RecordMetadata> metadata = producer.send(record);
    assertEquals("Partition should be correct", 1, metadata.get().partition());
    producer.clear();
    assertEquals("Clear should erase our history", 0, producer.history().size());
}
Developer: txazo, Project: kafka, Lines of code: 13, Source: MockProducerTest.java
Example 14: producerConfigs
import org.apache.kafka.clients.producer.internals.DefaultPartitioner; // import the required package/class
@Bean
public Map<String, Object> producerConfigs() {
    //FIXME: 12factorize
    Map<String, Object> props = new HashMap<>();
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, getBrokerList());
    props.put(ProducerConfig.RETRIES_CONFIG, retries);
    props.put(ProducerConfig.ACKS_CONFIG, acks);
    //props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
    //props.put(ProducerConfig.LINGER_MS_CONFIG, 1);
    //props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);
    props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, DefaultPartitioner.class);
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    return props;
}
Developer: Eventasia, Project: eventasia, Lines of code: 16, Source: EventasiaKafkaConfig.java
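The `FIXME: 12factorize` note in the configuration above suggests moving settings into the environment. One way to sketch that, without a Spring or Kafka dependency, is to build the same map from environment-style variables and to name classes by string, which Kafka's class-typed settings also accept. The environment variable names and the fallback values below are assumptions made for illustration; the config keys are the standard producer property names.

```java
import java.util.HashMap;
import java.util.Map;

final class ProducerConfigSketch {
    // Builds producer settings from environment-style variables, with fallback values.
    // ASSUMPTION: the KAFKA_* variable names and the defaults are illustrative only.
    static Map<String, Object> fromEnv(Map<String, String> env) {
        Map<String, Object> props = new HashMap<>();
        props.put("bootstrap.servers", env.getOrDefault("KAFKA_BOOTSTRAP_SERVERS", "localhost:9092"));
        props.put("retries", Integer.parseInt(env.getOrDefault("KAFKA_RETRIES", "0")));
        props.put("acks", env.getOrDefault("KAFKA_ACKS", "1"));
        // Class-valued settings given as fully qualified class names.
        props.put("partitioner.class", "org.apache.kafka.clients.producer.internals.DefaultPartitioner");
        props.put("key.serializer", "org.apache.kafka.common.serialization.IntegerSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        return props;
    }

    public static void main(String[] args) {
        System.out.println(fromEnv(System.getenv()));
    }
}
```

Passing the environment in as a `Map` rather than calling `System.getenv()` inside the method keeps the builder easy to test with fixed values.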
Example 15: DefaultStreamPartitioner
import org.apache.kafka.clients.producer.internals.DefaultPartitioner; // import the required package/class
public DefaultStreamPartitioner(final Serializer<K> keySerializer, final Cluster cluster, final String topic) {
    this.keySerializer = keySerializer;
    this.cluster = cluster;
    this.topic = topic;
    this.defaultPartitioner = new DefaultPartitioner();
}
Developer: YMCoding, Project: kafka-0.11.0.0-src-with-comment, Lines of code: 7, Source: DefaultStreamPartitioner.java
Example 16: MockProducer
import org.apache.kafka.clients.producer.internals.DefaultPartitioner; // import the required package/class
/**
 * Create a new mock producer with invented metadata, the given autoComplete setting, and key/value serializers.
 *
 * Equivalent to {@link #MockProducer(Cluster, boolean, Partitioner, Serializer, Serializer) new MockProducer(Cluster.empty(), autoComplete, new DefaultPartitioner(), keySerializer, valueSerializer)}
 */
public MockProducer(final boolean autoComplete,
                    final Serializer<K> keySerializer,
                    final Serializer<V> valueSerializer) {
    this(Cluster.empty(), autoComplete, new DefaultPartitioner(), keySerializer, valueSerializer);
}
Developer: YMCoding, Project: kafka-0.11.0.0-src-with-comment, Lines of code: 11, Source: MockProducer.java
Example 17: MockProducer
import org.apache.kafka.clients.producer.internals.DefaultPartitioner; // import the required package/class
/**
 * Create a new mock producer with invented metadata, the given autoComplete setting, and key/value serializers.
 *
 * Equivalent to {@link #MockProducer(Cluster, boolean, Partitioner, Serializer, Serializer) new MockProducer(Cluster.empty(), autoComplete, new DefaultPartitioner(), keySerializer, valueSerializer)}
 */
public MockProducer(boolean autoComplete, Serializer<K> keySerializer, Serializer<V> valueSerializer) {
    this(Cluster.empty(), autoComplete, new DefaultPartitioner(), keySerializer, valueSerializer);
}
Developer: txazo, Project: kafka, Lines of code: 9, Source: MockProducer.java
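Note: The org.apache.kafka.clients.producer.internals.DefaultPartitioner examples in this article were collected from source-code and documentation hosting platforms such as GitHub and MSDocs. The snippets were selected from open-source projects, and copyright in the source code belongs to the original authors. For distribution and use, refer to the License of the corresponding project. Do not reproduce without permission.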