本文整理汇总了Java中org.apache.kafka.test.TestUtils类的典型用法代码示例。如果您正苦于以下问题:Java TestUtils类的具体用法?Java TestUtils怎么用?Java TestUtils使用的例子?那么恭喜您, 这里精选的类代码示例或许可以为您提供帮助。
TestUtils类属于org.apache.kafka.test包,在下文中一共展示了TestUtils类的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Java代码示例。
示例1: produceMessages
import org.apache.kafka.test.TestUtils; //导入依赖的package包/类
private void produceMessages(final long timestamp)
throws ExecutionException, InterruptedException {
IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(
streamOneInput,
Arrays.asList(
new KeyValue<>(1, "A"),
new KeyValue<>(2, "B"),
new KeyValue<>(3, "C"),
new KeyValue<>(4, "D"),
new KeyValue<>(5, "E")),
TestUtils.producerConfig(
CLUSTER.bootstrapServers(),
IntegerSerializer.class,
StringSerializer.class,
new Properties()),
timestamp);
}
开发者ID:YMCoding,项目名称:kafka-0.11.0.0-src-with-comment,代码行数:18,代码来源:KStreamAggregationIntegrationTest.java
示例2: commitInvalidOffsets
import org.apache.kafka.test.TestUtils; //导入依赖的package包/类
private void commitInvalidOffsets() {
final KafkaConsumer consumer = new KafkaConsumer(TestUtils.consumerConfig(
CLUSTER.bootstrapServers(),
streamsConfiguration.getProperty(StreamsConfig.APPLICATION_ID_CONFIG),
StringDeserializer.class,
StringDeserializer.class));
final Map<TopicPartition, OffsetAndMetadata> invalidOffsets = new HashMap<>();
invalidOffsets.put(new TopicPartition(TOPIC_1_2, 0), new OffsetAndMetadata(5, null));
invalidOffsets.put(new TopicPartition(TOPIC_2_2, 0), new OffsetAndMetadata(5, null));
invalidOffsets.put(new TopicPartition(TOPIC_A_2, 0), new OffsetAndMetadata(5, null));
invalidOffsets.put(new TopicPartition(TOPIC_C_2, 0), new OffsetAndMetadata(5, null));
invalidOffsets.put(new TopicPartition(TOPIC_Y_2, 0), new OffsetAndMetadata(5, null));
invalidOffsets.put(new TopicPartition(TOPIC_Z_2, 0), new OffsetAndMetadata(5, null));
consumer.commitSync(invalidOffsets);
consumer.close();
}
开发者ID:YMCoding,项目名称:kafka-0.11.0.0-src-with-comment,代码行数:20,代码来源:KStreamsFineGrainedAutoResetIntegrationTest.java
示例3: waitUntilMinKeyValueRecordsReceived
import org.apache.kafka.test.TestUtils; //导入依赖的package包/类
/**
* Wait until enough data (key-value records) has been consumed.
*
* @param consumerConfig Kafka Consumer configuration
* @param topic Topic to consume from
* @param expectedNumRecords Minimum number of expected records
* @param waitTime Upper bound in waiting time in milliseconds
* @return All the records consumed, or null if no records are consumed
* @throws InterruptedException
* @throws AssertionError if the given wait time elapses
*/
public static <K, V> List<KeyValue<K, V>> waitUntilMinKeyValueRecordsReceived(final Properties consumerConfig,
final String topic,
final int expectedNumRecords,
final long waitTime) throws InterruptedException {
final List<KeyValue<K, V>> accumData = new ArrayList<>();
try (final Consumer<K, V> consumer = createConsumer(consumerConfig)) {
final TestCondition valuesRead = new TestCondition() {
@Override
public boolean conditionMet() {
final List<KeyValue<K, V>> readData =
readKeyValues(topic, consumer, waitTime, expectedNumRecords);
accumData.addAll(readData);
return accumData.size() >= expectedNumRecords;
}
};
final String conditionDetails =
"Expecting " + expectedNumRecords + " records from topic " + topic +
" while only received " + accumData.size() + ": " + accumData;
TestUtils.waitForCondition(valuesRead, waitTime, conditionDetails);
}
return accumData;
}
开发者ID:YMCoding,项目名称:kafka-0.11.0.0-src-with-comment,代码行数:34,代码来源:IntegrationTestUtils.java
示例4: before
import org.apache.kafka.test.TestUtils; //导入依赖的package包/类
@Before
public void before() throws InterruptedException {
testNo++;
String applicationId = "kstream-repartition-join-test-" + testNo;
builder = new KStreamBuilder();
createTopics();
streamsConfiguration = new Properties();
streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, applicationId);
streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, COMMIT_INTERVAL_MS);
streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath());
streamsConfiguration.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 3);
streamsConfiguration.put(IntegrationTestUtils.INTERNAL_LEAVE_GROUP_ON_CLOSE, true);
streamOne = builder.stream(Serdes.Long(), Serdes.Integer(), streamOneInput);
streamTwo = builder.stream(Serdes.Integer(), Serdes.String(), streamTwoInput);
streamFour = builder.stream(Serdes.Integer(), Serdes.String(), streamFourInput);
keyMapper = MockKeyValueMapper.SelectValueKeyValueMapper();
}
开发者ID:YMCoding,项目名称:kafka-0.11.0.0-src-with-comment,代码行数:22,代码来源:KStreamRepartitionJoinTest.java
示例5: produceStreamTwoInputTo
import org.apache.kafka.test.TestUtils; //导入依赖的package包/类
private void produceStreamTwoInputTo(final String streamTwoInput)
throws ExecutionException, InterruptedException {
IntegrationTestUtils.produceKeyValuesSynchronously(
streamTwoInput,
Arrays.asList(
new KeyValue<>(1, "A"),
new KeyValue<>(2, "B"),
new KeyValue<>(3, "C"),
new KeyValue<>(4, "D"),
new KeyValue<>(5, "E")),
TestUtils.producerConfig(
CLUSTER.bootstrapServers(),
IntegerSerializer.class,
StringSerializer.class,
new Properties()),
mockTime);
}
开发者ID:YMCoding,项目名称:kafka-0.11.0.0-src-with-comment,代码行数:18,代码来源:KStreamRepartitionJoinTest.java
示例6: produceToStreamOne
import org.apache.kafka.test.TestUtils; //导入依赖的package包/类
private void produceToStreamOne()
throws ExecutionException, InterruptedException {
IntegrationTestUtils.produceKeyValuesSynchronously(
streamOneInput,
Arrays.asList(
new KeyValue<>(10L, 1),
new KeyValue<>(5L, 2),
new KeyValue<>(12L, 3),
new KeyValue<>(15L, 4),
new KeyValue<>(20L, 5),
new KeyValue<Long, Integer>(70L, null)), // nulls should be filtered
TestUtils.producerConfig(
CLUSTER.bootstrapServers(),
LongSerializer.class,
IntegerSerializer.class,
new Properties()),
mockTime);
}
开发者ID:YMCoding,项目名称:kafka-0.11.0.0-src-with-comment,代码行数:19,代码来源:KStreamRepartitionJoinTest.java
示例7: before
import org.apache.kafka.test.TestUtils; //导入依赖的package包/类
@Before
public void before() throws InterruptedException {
testNo++;
userClicksTopic = "user-clicks-" + testNo;
userRegionsTopic = "user-regions-" + testNo;
userRegionsStoreName = "user-regions-store-name-" + testNo;
outputTopic = "output-topic-" + testNo;
CLUSTER.createTopics(userClicksTopic, userRegionsTopic, outputTopic);
streamsConfiguration = new Properties();
streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, "join-integration-test-" + testNo);
streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, COMMIT_INTERVAL_MS);
streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG,
TestUtils.tempDirectory().getPath());
streamsConfiguration.put(IntegrationTestUtils.INTERNAL_LEAVE_GROUP_ON_CLOSE, true);
}
开发者ID:YMCoding,项目名称:kafka-0.11.0.0-src-with-comment,代码行数:22,代码来源:KStreamKTableJoinIntegrationTest.java
示例8: produceTopicValues
import org.apache.kafka.test.TestUtils; //导入依赖的package包/类
private void produceTopicValues(final String topic) throws java.util.concurrent.ExecutionException, InterruptedException {
IntegrationTestUtils.produceKeyValuesSynchronously(
topic,
Arrays.asList(
new KeyValue<>("a", 1L),
new KeyValue<>("b", 2L),
new KeyValue<>("c", 3L),
new KeyValue<>("d", 4L),
new KeyValue<>("e", 5L)),
TestUtils.producerConfig(
CLUSTER.bootstrapServers(),
StringSerializer.class,
LongSerializer.class,
new Properties()),
mockTime);
}
开发者ID:YMCoding,项目名称:kafka-0.11.0.0-src-with-comment,代码行数:17,代码来源:GlobalKTableIntegrationTest.java
示例9: produceGlobalTableValues
import org.apache.kafka.test.TestUtils; //导入依赖的package包/类
private void produceGlobalTableValues() throws java.util.concurrent.ExecutionException, InterruptedException {
IntegrationTestUtils.produceKeyValuesSynchronously(
globalOne,
Arrays.asList(
new KeyValue<>(1L, "F"),
new KeyValue<>(2L, "G"),
new KeyValue<>(3L, "H"),
new KeyValue<>(4L, "I"),
new KeyValue<>(5L, "J")),
TestUtils.producerConfig(
CLUSTER.bootstrapServers(),
LongSerializer.class,
StringSerializer.class,
new Properties()),
mockTime);
}
开发者ID:YMCoding,项目名称:kafka-0.11.0.0-src-with-comment,代码行数:17,代码来源:GlobalKTableIntegrationTest.java
示例10: prepareTest
import org.apache.kafka.test.TestUtils; //导入依赖的package包/类
private Properties prepareTest() throws Exception {
final Properties streamsConfiguration = new Properties();
streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, APP_ID + testNo);
streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath());
streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.Long().getClass());
streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
streamsConfiguration.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 4);
streamsConfiguration.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100);
streamsConfiguration.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 100);
streamsConfiguration.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "" + STREAMS_CONSUMER_TIMEOUT);
streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
streamsConfiguration.put(IntegrationTestUtils.INTERNAL_LEAVE_GROUP_ON_CLOSE, true);
IntegrationTestUtils.purgeLocalStreamsState(streamsConfiguration);
return streamsConfiguration;
}
开发者ID:YMCoding,项目名称:kafka-0.11.0.0-src-with-comment,代码行数:20,代码来源:ResetIntegrationTest.java
示例11: prepareInputData
import org.apache.kafka.test.TestUtils; //导入依赖的package包/类
private void prepareInputData() throws Exception {
CLUSTER.deleteAndRecreateTopics(INPUT_TOPIC, OUTPUT_TOPIC, OUTPUT_TOPIC_2, OUTPUT_TOPIC_2_RERUN);
final Properties producerConfig = TestUtils.producerConfig(CLUSTER.bootstrapServers(), LongSerializer.class, StringSerializer.class);
mockTime.sleep(10);
IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(INPUT_TOPIC, Collections.singleton(new KeyValue<>(0L, "aaa")), producerConfig, mockTime.milliseconds());
mockTime.sleep(10);
IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(INPUT_TOPIC, Collections.singleton(new KeyValue<>(1L, "bbb")), producerConfig, mockTime.milliseconds());
mockTime.sleep(10);
IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(INPUT_TOPIC, Collections.singleton(new KeyValue<>(0L, "ccc")), producerConfig, mockTime.milliseconds());
mockTime.sleep(10);
IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(INPUT_TOPIC, Collections.singleton(new KeyValue<>(1L, "ddd")), producerConfig, mockTime.milliseconds());
mockTime.sleep(10);
IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(INPUT_TOPIC, Collections.singleton(new KeyValue<>(0L, "eee")), producerConfig, mockTime.milliseconds());
mockTime.sleep(10);
IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(INPUT_TOPIC, Collections.singleton(new KeyValue<>(1L, "fff")), producerConfig, mockTime.milliseconds());
mockTime.sleep(1);
IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(INPUT_TOPIC, Collections.singleton(new KeyValue<>(0L, "ggg")), producerConfig, mockTime.milliseconds());
mockTime.sleep(1);
IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(INPUT_TOPIC, Collections.singleton(new KeyValue<>(1L, "hhh")), producerConfig, mockTime.milliseconds());
mockTime.sleep(1);
IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(INPUT_TOPIC, Collections.singleton(new KeyValue<>(0L, "iii")), producerConfig, mockTime.milliseconds());
mockTime.sleep(1);
IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(INPUT_TOPIC, Collections.singleton(new KeyValue<>(1L, "jjj")), producerConfig, mockTime.milliseconds());
}
开发者ID:YMCoding,项目名称:kafka-0.11.0.0-src-with-comment,代码行数:27,代码来源:ResetIntegrationTest.java
示例12: readResult
import org.apache.kafka.test.TestUtils; //导入依赖的package包/类
private List<KeyValue<Long, Long>> readResult(final int numberOfRecords,
final String groupId) throws Exception {
if (groupId != null) {
return IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(
TestUtils.consumerConfig(
CLUSTER.bootstrapServers(),
groupId,
LongDeserializer.class,
LongDeserializer.class,
new Properties() {
{
put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, IsolationLevel.READ_COMMITTED.name().toLowerCase(Locale.ROOT));
}
}),
SINGLE_PARTITION_OUTPUT_TOPIC,
numberOfRecords
);
}
// read uncommitted
return IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(
TestUtils.consumerConfig(CLUSTER.bootstrapServers(), LongDeserializer.class, LongDeserializer.class),
SINGLE_PARTITION_OUTPUT_TOPIC,
numberOfRecords
);
}
开发者ID:YMCoding,项目名称:kafka-0.11.0.0-src-with-comment,代码行数:27,代码来源:EosIntegrationTest.java
示例13: testRead
import org.apache.kafka.test.TestUtils; //导入依赖的package包/类
/**
* Test a simple append and read.
*/
@Test
public void testRead() throws IOException {
FileRecords read = fileRecords.read(0, fileRecords.sizeInBytes());
TestUtils.checkEquals(fileRecords.batches(), read.batches());
List<RecordBatch> items = batches(read);
RecordBatch second = items.get(1);
read = fileRecords.read(second.sizeInBytes(), fileRecords.sizeInBytes());
assertEquals("Try a read starting from the second message",
items.subList(1, 3), batches(read));
read = fileRecords.read(second.sizeInBytes(), second.sizeInBytes());
assertEquals("Try a read of a single message starting from the second message",
Collections.singletonList(second), batches(read));
}
开发者ID:YMCoding,项目名称:kafka-0.11.0.0-src-with-comment,代码行数:20,代码来源:FileRecordsTest.java
示例14: doAggregateSessionWindows
import org.apache.kafka.test.TestUtils; //导入依赖的package包/类
private void doAggregateSessionWindows(final Map<Windowed<String>, Integer> results) throws Exception {
driver = new KStreamTestDriver(builder, TestUtils.tempDirectory());
driver.setTime(10);
driver.process(TOPIC, "1", "1");
driver.setTime(15);
driver.process(TOPIC, "2", "2");
driver.setTime(30);
driver.process(TOPIC, "1", "1");
driver.setTime(70);
driver.process(TOPIC, "1", "1");
driver.setTime(90);
driver.process(TOPIC, "1", "1");
driver.setTime(100);
driver.process(TOPIC, "1", "1");
driver.flushState();
assertEquals(Integer.valueOf(2), results.get(new Windowed<>("1", new SessionWindow(10, 30))));
assertEquals(Integer.valueOf(1), results.get(new Windowed<>("2", new SessionWindow(15, 15))));
assertEquals(Integer.valueOf(3), results.get(new Windowed<>("1", new SessionWindow(70, 100))));
}
开发者ID:YMCoding,项目名称:kafka-0.11.0.0-src-with-comment,代码行数:20,代码来源:KGroupedStreamImplTest.java
示例15: doCountSessionWindows
import org.apache.kafka.test.TestUtils; //导入依赖的package包/类
private void doCountSessionWindows(final Map<Windowed<String>, Long> results) throws Exception {
driver = new KStreamTestDriver(builder, TestUtils.tempDirectory());
driver.setTime(10);
driver.process(TOPIC, "1", "1");
driver.setTime(15);
driver.process(TOPIC, "2", "2");
driver.setTime(30);
driver.process(TOPIC, "1", "1");
driver.setTime(70);
driver.process(TOPIC, "1", "1");
driver.setTime(90);
driver.process(TOPIC, "1", "1");
driver.setTime(100);
driver.process(TOPIC, "1", "1");
driver.flushState();
assertEquals(Long.valueOf(2), results.get(new Windowed<>("1", new SessionWindow(10, 30))));
assertEquals(Long.valueOf(1), results.get(new Windowed<>("2", new SessionWindow(15, 15))));
assertEquals(Long.valueOf(3), results.get(new Windowed<>("1", new SessionWindow(70, 100))));
}
开发者ID:YMCoding,项目名称:kafka-0.11.0.0-src-with-comment,代码行数:20,代码来源:KGroupedStreamImplTest.java
示例16: doReduceSessionWindows
import org.apache.kafka.test.TestUtils; //导入依赖的package包/类
private void doReduceSessionWindows(final Map<Windowed<String>, String> results) throws Exception {
driver = new KStreamTestDriver(builder, TestUtils.tempDirectory());
driver.setTime(10);
driver.process(TOPIC, "1", "A");
driver.setTime(15);
driver.process(TOPIC, "2", "Z");
driver.setTime(30);
driver.process(TOPIC, "1", "B");
driver.setTime(70);
driver.process(TOPIC, "1", "A");
driver.setTime(90);
driver.process(TOPIC, "1", "B");
driver.setTime(100);
driver.process(TOPIC, "1", "C");
driver.flushState();
assertEquals("A:B", results.get(new Windowed<>("1", new SessionWindow(10, 30))));
assertEquals("Z", results.get(new Windowed<>("2", new SessionWindow(15, 15))));
assertEquals("A:B:C", results.get(new Windowed<>("1", new SessionWindow(70, 100))));
}
开发者ID:YMCoding,项目名称:kafka-0.11.0.0-src-with-comment,代码行数:20,代码来源:KGroupedStreamImplTest.java
示例17: doCountWindowed
import org.apache.kafka.test.TestUtils; //导入依赖的package包/类
private void doCountWindowed(final List<KeyValue<Windowed<String>, Long>> results) throws Exception {
driver = new KStreamTestDriver(builder, TestUtils.tempDirectory(), 0);
driver.setTime(0);
driver.process(TOPIC, "1", "A");
driver.process(TOPIC, "2", "B");
driver.process(TOPIC, "3", "C");
driver.setTime(500);
driver.process(TOPIC, "1", "A");
driver.process(TOPIC, "1", "A");
driver.process(TOPIC, "2", "B");
driver.process(TOPIC, "2", "B");
assertThat(results, equalTo(Arrays.asList(
KeyValue.pair(new Windowed<>("1", new TimeWindow(0, 500)), 1L),
KeyValue.pair(new Windowed<>("2", new TimeWindow(0, 500)), 1L),
KeyValue.pair(new Windowed<>("3", new TimeWindow(0, 500)), 1L),
KeyValue.pair(new Windowed<>("1", new TimeWindow(500, 1000)), 1L),
KeyValue.pair(new Windowed<>("1", new TimeWindow(500, 1000)), 2L),
KeyValue.pair(new Windowed<>("2", new TimeWindow(500, 1000)), 1L),
KeyValue.pair(new Windowed<>("2", new TimeWindow(500, 1000)), 2L)
)));
}
开发者ID:YMCoding,项目名称:kafka-0.11.0.0-src-with-comment,代码行数:22,代码来源:KGroupedStreamImplTest.java
示例18: testReadWrite
import org.apache.kafka.test.TestUtils; //导入依赖的package包/类
@Test
public void testReadWrite() throws IOException {
File f = TestUtils.tempFile();
OffsetCheckpoint checkpoint = new OffsetCheckpoint(f);
try {
Map<TopicPartition, Long> offsets = new HashMap<>();
offsets.put(new TopicPartition(topic, 0), 0L);
offsets.put(new TopicPartition(topic, 1), 1L);
offsets.put(new TopicPartition(topic, 2), 2L);
checkpoint.write(offsets);
assertEquals(offsets, checkpoint.read());
checkpoint.delete();
assertFalse(f.exists());
offsets.put(new TopicPartition(topic, 3), 3L);
checkpoint.write(offsets);
assertEquals(offsets, checkpoint.read());
} finally {
checkpoint.delete();
}
}
开发者ID:YMCoding,项目名称:kafka-0.11.0.0-src-with-comment,代码行数:25,代码来源:OffsetCheckpointTest.java
示例19: shouldCreateLoggingEnabledStoreWhenWindowStoreLogged
import org.apache.kafka.test.TestUtils; //导入依赖的package包/类
@Test
public void shouldCreateLoggingEnabledStoreWhenWindowStoreLogged() throws Exception {
store = createStore(true, false);
final List<ProducerRecord> logged = new ArrayList<>();
final NoOpRecordCollector collector = new NoOpRecordCollector() {
@Override
public <K, V> void send(final String topic,
K key,
V value,
Integer partition,
Long timestamp,
Serializer<K> keySerializer,
Serializer<V> valueSerializer) {
logged.add(new ProducerRecord<K, V>(topic, partition, timestamp, key, value));
}
};
final MockProcessorContext context = new MockProcessorContext(TestUtils.tempDirectory(),
Serdes.String(),
Serdes.String(),
collector,
cache);
context.setTime(1);
store.init(context, store);
store.put("a", "b");
assertFalse(logged.isEmpty());
}
开发者ID:YMCoding,项目名称:kafka-0.11.0.0-src-with-comment,代码行数:27,代码来源:RocksDBWindowStoreSupplierTest.java
示例20: shouldNotBeLoggingEnabledStoreWhenLogginNotEnabled
import org.apache.kafka.test.TestUtils; //导入依赖的package包/类
@Test
public void shouldNotBeLoggingEnabledStoreWhenLogginNotEnabled() throws Exception {
store = createStore(false, false);
final List<ProducerRecord> logged = new ArrayList<>();
final NoOpRecordCollector collector = new NoOpRecordCollector() {
@Override
public <K, V> void send(final String topic,
K key,
V value,
Integer partition,
Long timestamp,
Serializer<K> keySerializer,
Serializer<V> valueSerializer) {
logged.add(new ProducerRecord<K, V>(topic, partition, timestamp, key, value));
}
};
final MockProcessorContext context = new MockProcessorContext(TestUtils.tempDirectory(),
Serdes.String(),
Serdes.String(),
collector,
cache);
context.setTime(1);
store.init(context, store);
store.put("a", "b");
assertTrue(logged.isEmpty());
}
开发者ID:YMCoding,项目名称:kafka-0.11.0.0-src-with-comment,代码行数:27,代码来源:RocksDBWindowStoreSupplierTest.java
注:本文中的org.apache.kafka.test.TestUtils类示例整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。 |
请发表评论