This article collects typical usage examples of the Java class org.apache.kafka.connect.source.SourceTaskContext. If you are wondering how SourceTaskContext is used in practice, the selected examples below may help.
SourceTaskContext belongs to the org.apache.kafka.connect.source package. The 17 code examples below are sorted by popularity by default.
Example 1: before
import org.apache.kafka.connect.source.SourceTaskContext; // import the required package/class
@BeforeEach
public void before() {
    // Mock the context so that the task reads offsets from a stubbed OffsetStorageReader.
    this.sourceTaskContext = mock(SourceTaskContext.class);
    this.offsetStorageReader = mock(OffsetStorageReader.class);
    when(this.sourceTaskContext.offsetStorageReader()).thenReturn(this.offsetStorageReader);
    this.task = new KinesisSourceTask();
    this.task.initialize(this.sourceTaskContext);
    // Replace the task's collaborators with mocks so that no real Kinesis client is created.
    this.kinesisClient = mock(AmazonKinesis.class);
    this.task.time = mock(Time.class);
    this.task.kinesisClientFactory = mock(KinesisClientFactory.class);
    when(this.task.kinesisClientFactory.create(any())).thenReturn(this.kinesisClient);
    this.settings = TestData.settings();
    this.config = new KinesisSourceConnectorConfig(this.settings);
}
Developer: jcustenborder, Project: kafka-connect-kinesis, Lines of code: 17, Source: KinesisSourceTaskTest.java
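The setup above stubs `offsetStorageReader()` with a mocking library. The same wiring can be sketched with a hand-written stub. This is a minimal sketch under stated assumptions, not the Kafka Connect API itself: `OffsetReader`, `TaskContext`, `DemoTask`, and `StubContextDemo` are hypothetical, simplified stand-ins for `OffsetStorageReader`, `SourceTaskContext`, and `SourceTask`, declared locally so the code compiles without Kafka on the classpath.

```java
import java.util.Map;

// Hypothetical stand-in for OffsetStorageReader (single-partition lookup only, non-generic).
interface OffsetReader {
    Map<String, Object> offset(Map<String, String> partition);
}

// Hypothetical stand-in for SourceTaskContext.
interface TaskContext {
    OffsetReader offsetStorageReader();
}

// Hypothetical task that keeps the context passed to initialize(), as SourceTask does.
class DemoTask {
    private TaskContext context;

    void initialize(TaskContext context) {
        this.context = context;
    }

    // Returns the stored offset map for a partition, or null when none is stored.
    Map<String, Object> lookup(Map<String, String> partition) {
        return context.offsetStorageReader().offset(partition);
    }
}

class StubContextDemo {
    // Builds a context whose reader answers from a fixed in-memory map.
    static TaskContext contextBackedBy(Map<Map<String, String>, Map<String, Object>> stored) {
        OffsetReader reader = stored::get;
        return () -> reader;
    }
}
```

A stub like this keeps the test independent of any mocking framework, at the cost of not verifying which calls were made.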
Example 2: setup
import org.apache.kafka.connect.source.SourceTaskContext; // import the required package/class
@Before
public void setup() throws IOException, SQLException {
String mysqlHost = "10.100.172.86";
connection = DriverManager.getConnection("jdbc:mysql://" + mysqlHost + ":3306/mysql", "root", "passwd");
config = new HashMap<>();
config.put(MySqlSourceConnector.USER_CONFIG, "maxwell");
config.put(MySqlSourceConnector.PASSWORD_CONFIG, "XXXXXX");
config.put(MySqlSourceConnector.PORT_CONFIG, "3306");
config.put(MySqlSourceConnector.HOST_CONFIG, mysqlHost);
task = new MySqlSourceTask();
offsetStorageReader = PowerMock.createMock(OffsetStorageReader.class);
context = PowerMock.createMock(SourceTaskContext.class);
task.initialize(context);
runSql("drop table if exists test.users");
runSql("drop database if exists test");
}
Developer: wushujames, Project: kafka-mysql-connector, Lines of code: 20, Source: MySqlSourceTaskTest.java
Example 3: setup
import org.apache.kafka.connect.source.SourceTaskContext; // import the required package/class
@Before
public void setup() throws IOException {
    tempFile = File.createTempFile("file-stream-source-task-test", null);
    config = new HashMap<>();
    config.put(FileStreamSourceConnector.FILE_CONFIG, tempFile.getAbsolutePath());
    config.put(FileStreamSourceConnector.TOPIC_CONFIG, TOPIC);
    task = new FileStreamSourceTask();
    // The mocks carry no expectations here; presumably the individual tests record them.
    offsetStorageReader = PowerMock.createMock(OffsetStorageReader.class);
    context = PowerMock.createMock(SourceTaskContext.class);
    task.initialize(context);
}
Developer: wngn123, Project: wngn-jms-kafka, Lines of code: 12, Source: FileStreamSourceTaskTest.java
Example 4: testPollsInBackground
import org.apache.kafka.connect.source.SourceTaskContext; // import the required package/class
@Test
public void testPollsInBackground() throws Exception {
createWorkerTask();
sourceTask.initialize(EasyMock.anyObject(SourceTaskContext.class));
EasyMock.expectLastCall();
sourceTask.start(TASK_PROPS);
EasyMock.expectLastCall();
statusListener.onStartup(taskId);
EasyMock.expectLastCall();
final CountDownLatch pollLatch = expectPolls(10);
// In this test, we don't flush, so nothing goes any further than the offset writer
sourceTask.stop();
EasyMock.expectLastCall();
expectOffsetFlush(true);
statusListener.onShutdown(taskId);
EasyMock.expectLastCall();
producer.close(EasyMock.anyLong(), EasyMock.anyObject(TimeUnit.class));
EasyMock.expectLastCall();
transformationChain.close();
EasyMock.expectLastCall();
PowerMock.replayAll();
workerTask.initialize(TASK_CONFIG);
Future<?> taskFuture = executor.submit(workerTask);
assertTrue(awaitLatch(pollLatch));
workerTask.stop();
assertTrue(workerTask.awaitStop(1000));
taskFuture.get();
PowerMock.verifyAll();
}
Developer: YMCoding, Project: kafka-0.11.0.0-src-with-comment, Lines of code: 41, Source: WorkerSourceTaskTest.java
Example 5: initTask
import org.apache.kafka.connect.source.SourceTaskContext; // import the required package/class
@Before
public void initTask() {
    task = new FsSourceTask();
    taskConfig = new HashMap<String, String>() {{
        String[] uris = directories.stream().map(dir -> dir.toString())
                .toArray(size -> new String[size]);
        put(FsSourceTaskConfig.FS_URIS, String.join(",", uris));
        put(FsSourceTaskConfig.TOPIC, "topic_test");
        put(FsSourceTaskConfig.POLICY_CLASS, SimplePolicy.class.getName());
        put(FsSourceTaskConfig.FILE_READER_CLASS, TextFileReader.class.getName());
        put(FsSourceTaskConfig.POLICY_REGEXP, "^[0-9]*\\.txt$");
    }};
    // Mock initialization: each expectation is recorded twice, so each call is expected two times.
    taskContext = PowerMock.createMock(SourceTaskContext.class);
    offsetStorageReader = PowerMock.createMock(OffsetStorageReader.class);
    EasyMock.expect(taskContext.offsetStorageReader())
            .andReturn(offsetStorageReader);
    EasyMock.expect(taskContext.offsetStorageReader())
            .andReturn(offsetStorageReader);
    EasyMock.expect(offsetStorageReader.offset(EasyMock.anyObject()))
            .andReturn(new HashMap<String, Object>() {{
                put("offset", 5L);
            }});
    EasyMock.expect(offsetStorageReader.offset(EasyMock.anyObject()))
            .andReturn(new HashMap<String, Object>() {{
                put("offset", 5L);
            }});
    EasyMock.checkOrder(taskContext, false);
    EasyMock.replay(taskContext);
    EasyMock.checkOrder(offsetStorageReader, false);
    EasyMock.replay(offsetStorageReader);
    task.initialize(taskContext);
}
Developer: mmolimar, Project: kafka-connect-fs, Lines of code: 42, Source: FsSourceTaskTestBase.java
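The test above stubs the offset lookup to return a map with a single `"offset"` entry. How a task might turn such a map into a resume position can be sketched as follows. This is a hypothetical helper, not code from kafka-connect-fs: `ResumePosition`, `OFFSET_KEY`, and the fallback of 0 are assumptions made for illustration. Whether the stored value means "last record read" or "next record to read" is a connector-specific convention.

```java
import java.util.Map;

class ResumePosition {
    // Assumed key name; it mirrors the "offset" entry stubbed in the test above.
    static final String OFFSET_KEY = "offset";

    // Returns the position to resume from: the stored value, or 0 when no usable
    // offset is available (the lookup returned null or the key is absent).
    static long resumeFrom(Map<String, Object> storedOffset) {
        if (storedOffset == null) {
            return 0L;
        }
        Object value = storedOffset.get(OFFSET_KEY);
        // Assumption: a stored number may come back as Integer or Long, so read it as a Number.
        return value instanceof Number ? ((Number) value).longValue() : 0L;
    }
}
```

Reading the value through `Number` rather than casting to `Long` avoids a `ClassCastException` if the numeric type changes between write and read.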
Example 6: testPause
import org.apache.kafka.connect.source.SourceTaskContext; // import the required package/class
@Test
public void testPause() throws Exception {
createWorkerTask();
sourceTask.initialize(EasyMock.anyObject(SourceTaskContext.class));
EasyMock.expectLastCall();
sourceTask.start(TASK_PROPS);
EasyMock.expectLastCall();
statusListener.onStartup(taskId);
EasyMock.expectLastCall();
AtomicInteger count = new AtomicInteger(0);
CountDownLatch pollLatch = expectPolls(10, count);
// In this test, we don't flush, so nothing goes any further than the offset writer
statusListener.onPause(taskId);
EasyMock.expectLastCall();
sourceTask.stop();
EasyMock.expectLastCall();
expectOffsetFlush(true);
statusListener.onShutdown(taskId);
EasyMock.expectLastCall();
producer.close(EasyMock.anyLong(), EasyMock.anyObject(TimeUnit.class));
EasyMock.expectLastCall();
transformationChain.close();
EasyMock.expectLastCall();
PowerMock.replayAll();
workerTask.initialize(TASK_CONFIG);
Future<?> taskFuture = executor.submit(workerTask);
assertTrue(awaitLatch(pollLatch));
workerTask.transitionTo(TargetState.PAUSED);
int priorCount = count.get();
Thread.sleep(100);
// since the transition is observed asynchronously, the count could be off by one loop iteration
assertTrue(count.get() - priorCount <= 1);
workerTask.stop();
assertTrue(workerTask.awaitStop(1000));
taskFuture.get();
PowerMock.verifyAll();
}
Developer: YMCoding, Project: kafka-0.11.0.0-src-with-comment, Lines of code: 53, Source: WorkerSourceTaskTest.java
Example 7: testFailureInPoll
import org.apache.kafka.connect.source.SourceTaskContext; // import the required package/class
@Test
public void testFailureInPoll() throws Exception {
createWorkerTask();
sourceTask.initialize(EasyMock.anyObject(SourceTaskContext.class));
EasyMock.expectLastCall();
sourceTask.start(TASK_PROPS);
EasyMock.expectLastCall();
statusListener.onStartup(taskId);
EasyMock.expectLastCall();
final CountDownLatch pollLatch = new CountDownLatch(1);
final RuntimeException exception = new RuntimeException();
EasyMock.expect(sourceTask.poll()).andAnswer(new IAnswer<List<SourceRecord>>() {
@Override
public List<SourceRecord> answer() throws Throwable {
pollLatch.countDown();
throw exception;
}
});
statusListener.onFailure(taskId, exception);
EasyMock.expectLastCall();
sourceTask.stop();
EasyMock.expectLastCall();
expectOffsetFlush(true);
producer.close(EasyMock.anyLong(), EasyMock.anyObject(TimeUnit.class));
EasyMock.expectLastCall();
transformationChain.close();
EasyMock.expectLastCall();
PowerMock.replayAll();
workerTask.initialize(TASK_CONFIG);
Future<?> taskFuture = executor.submit(workerTask);
assertTrue(awaitLatch(pollLatch));
workerTask.stop();
assertTrue(workerTask.awaitStop(1000));
taskFuture.get();
PowerMock.verifyAll();
}
Developer: YMCoding, Project: kafka-0.11.0.0-src-with-comment, Lines of code: 48, Source: WorkerSourceTaskTest.java
Example 8: testCommit
import org.apache.kafka.connect.source.SourceTaskContext; // import the required package/class
@Test
public void testCommit() throws Exception {
// Test that the task commits properly when prompted
createWorkerTask();
sourceTask.initialize(EasyMock.anyObject(SourceTaskContext.class));
EasyMock.expectLastCall();
sourceTask.start(TASK_PROPS);
EasyMock.expectLastCall();
statusListener.onStartup(taskId);
EasyMock.expectLastCall();
// We'll wait for some data, then trigger a flush
final CountDownLatch pollLatch = expectPolls(1);
expectOffsetFlush(true);
sourceTask.stop();
EasyMock.expectLastCall();
expectOffsetFlush(true);
statusListener.onShutdown(taskId);
EasyMock.expectLastCall();
producer.close(EasyMock.anyLong(), EasyMock.anyObject(TimeUnit.class));
EasyMock.expectLastCall();
transformationChain.close();
EasyMock.expectLastCall();
PowerMock.replayAll();
workerTask.initialize(TASK_CONFIG);
Future<?> taskFuture = executor.submit(workerTask);
assertTrue(awaitLatch(pollLatch));
assertTrue(workerTask.commitOffsets());
workerTask.stop();
assertTrue(workerTask.awaitStop(1000));
taskFuture.get();
PowerMock.verifyAll();
}
Developer: YMCoding, Project: kafka-0.11.0.0-src-with-comment, Lines of code: 44, Source: WorkerSourceTaskTest.java
Example 9: testCommitFailure
import org.apache.kafka.connect.source.SourceTaskContext; // import the required package/class
@Test
public void testCommitFailure() throws Exception {
// Test that the task still shuts down when the offset flush during stop fails
createWorkerTask();
sourceTask.initialize(EasyMock.anyObject(SourceTaskContext.class));
EasyMock.expectLastCall();
sourceTask.start(TASK_PROPS);
EasyMock.expectLastCall();
statusListener.onStartup(taskId);
EasyMock.expectLastCall();
// We'll wait for some data, then trigger a flush
final CountDownLatch pollLatch = expectPolls(1);
expectOffsetFlush(true);
sourceTask.stop();
EasyMock.expectLastCall();
expectOffsetFlush(false);
statusListener.onShutdown(taskId);
EasyMock.expectLastCall();
producer.close(EasyMock.anyLong(), EasyMock.anyObject(TimeUnit.class));
EasyMock.expectLastCall();
transformationChain.close();
EasyMock.expectLastCall();
PowerMock.replayAll();
workerTask.initialize(TASK_CONFIG);
Future<?> taskFuture = executor.submit(workerTask);
assertTrue(awaitLatch(pollLatch));
assertTrue(workerTask.commitOffsets());
workerTask.stop();
assertTrue(workerTask.awaitStop(1000));
taskFuture.get();
PowerMock.verifyAll();
}
Developer: YMCoding, Project: kafka-0.11.0.0-src-with-comment, Lines of code: 44, Source: WorkerSourceTaskTest.java
Example 10: testSlowTaskStart
import org.apache.kafka.connect.source.SourceTaskContext; // import the required package/class
@Test
public void testSlowTaskStart() throws Exception {
final CountDownLatch startupLatch = new CountDownLatch(1);
final CountDownLatch finishStartupLatch = new CountDownLatch(1);
createWorkerTask();
sourceTask.initialize(EasyMock.anyObject(SourceTaskContext.class));
EasyMock.expectLastCall();
sourceTask.start(TASK_PROPS);
EasyMock.expectLastCall().andAnswer(new IAnswer<Object>() {
@Override
public Object answer() throws Throwable {
startupLatch.countDown();
assertTrue(awaitLatch(finishStartupLatch));
return null;
}
});
statusListener.onStartup(taskId);
EasyMock.expectLastCall();
sourceTask.stop();
EasyMock.expectLastCall();
expectOffsetFlush(true);
statusListener.onShutdown(taskId);
EasyMock.expectLastCall();
producer.close(EasyMock.anyLong(), EasyMock.anyObject(TimeUnit.class));
EasyMock.expectLastCall();
transformationChain.close();
EasyMock.expectLastCall();
PowerMock.replayAll();
workerTask.initialize(TASK_CONFIG);
Future<?> workerTaskFuture = executor.submit(workerTask);
// Stopping immediately while the other thread has work to do should result in no polling, no offset commits,
// exiting the work thread immediately, and the stop() method will be invoked in the background thread since it
// cannot be invoked immediately in the thread trying to stop the task.
assertTrue(awaitLatch(startupLatch));
workerTask.stop();
finishStartupLatch.countDown();
assertTrue(workerTask.awaitStop(1000));
workerTaskFuture.get();
PowerMock.verifyAll();
}
Developer: YMCoding, Project: kafka-0.11.0.0-src-with-comment, Lines of code: 53, Source: WorkerSourceTaskTest.java
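The test above coordinates two threads with a pair of latches: one signals that `start()` has been entered, the other holds `start()` open until the test has requested a stop. The pattern can be sketched with plain `java.util.concurrent` classes. This is a minimal sketch, not the Kafka worker: `SlowStartDemo`, the `stopping` flag, and the simulated poll loop are hypothetical stand-ins for `WorkerSourceTask`.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

class SlowStartDemo {
    // Runs a hypothetical worker whose startup blocks until released, requests a stop
    // while it is still starting, and returns how many times the worker polled.
    static int stopDuringSlowStart() {
        CountDownLatch startupLatch = new CountDownLatch(1);        // start() was entered
        CountDownLatch finishStartupLatch = new CountDownLatch(1);  // start() may return
        AtomicBoolean stopping = new AtomicBoolean(false);
        AtomicInteger polls = new AtomicInteger();

        ExecutorService executor = Executors.newSingleThreadExecutor();
        try {
            Future<?> worker = executor.submit(() -> {
                startupLatch.countDown();
                // Simulated slow start(): wait until the test thread releases it.
                if (!finishStartupLatch.await(5, TimeUnit.SECONDS)) {
                    throw new IllegalStateException("startup was never released");
                }
                // Simulated poll loop: never entered if stop was requested during start().
                while (!stopping.get()) {
                    polls.incrementAndGet();
                }
                return null;
            });

            if (!startupLatch.await(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("worker never started");
            }
            stopping.set(true);              // stop requested while start() is still blocked
            finishStartupLatch.countDown();  // now let start() complete
            worker.get(5, TimeUnit.SECONDS);
            return polls.get();
        } catch (Exception e) {
            throw new IllegalStateException("demo failed", e);
        } finally {
            executor.shutdownNow();
        }
    }
}
```

Because the stop flag is set before the second latch is released, the worker observes it as soon as startup finishes and leaves without polling.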
Example 11: poll
import org.apache.kafka.connect.source.SourceTaskContext; // import the required package/class
protected void poll(final String packageName, TestCase testCase) throws InterruptedException, IOException {
String keySchemaConfig = ObjectMapperFactory.INSTANCE.writeValueAsString(testCase.keySchema);
String valueSchemaConfig = ObjectMapperFactory.INSTANCE.writeValueAsString(testCase.valueSchema);
Map<String, String> settings = Maps.newLinkedHashMap();
settings.put(SpoolDirSourceConnectorConfig.INPUT_PATH_CONFIG, this.inputPath.getAbsolutePath());
settings.put(SpoolDirSourceConnectorConfig.FINISHED_PATH_CONFIG, this.finishedPath.getAbsolutePath());
settings.put(SpoolDirSourceConnectorConfig.ERROR_PATH_CONFIG, this.errorPath.getAbsolutePath());
settings.put(SpoolDirSourceConnectorConfig.INPUT_FILE_PATTERN_CONF, String.format("^.*\\.%s", packageName));
settings.put(SpoolDirSourceConnectorConfig.TOPIC_CONF, "testing");
settings.put(SpoolDirSourceConnectorConfig.KEY_SCHEMA_CONF, keySchemaConfig);
settings.put(SpoolDirSourceConnectorConfig.VALUE_SCHEMA_CONF, valueSchemaConfig);
settings.put(SpoolDirSourceConnectorConfig.EMPTY_POLL_WAIT_MS_CONF, "10");
settings(settings);
if (null != testCase.settings && !testCase.settings.isEmpty()) {
settings.putAll(testCase.settings);
}
this.task = createTask();
SourceTaskContext sourceTaskContext = mock(SourceTaskContext.class);
OffsetStorageReader offsetStorageReader = mock(OffsetStorageReader.class);
when(offsetStorageReader.offset(anyMap())).thenReturn(testCase.offset);
when(sourceTaskContext.offsetStorageReader()).thenReturn(offsetStorageReader);
this.task.initialize(sourceTaskContext);
this.task.start(settings);
String dataFile = new File(packageName, Files.getNameWithoutExtension(testCase.path.toString())) + ".data";
log.trace("poll(String, TestCase) - dataFile={}", dataFile);
String inputFileName = String.format("%s.%s",
Files.getNameWithoutExtension(testCase.path.toString()),
packageName
);
final File inputFile = new File(this.inputPath, inputFileName);
log.trace("poll(String, TestCase) - inputFile = {}", inputFile);
final File processingFile = this.task.processingFile(inputFile);
try (InputStream inputStream = this.getClass().getResourceAsStream(dataFile)) {
try (OutputStream outputStream = new FileOutputStream(inputFile)) {
ByteStreams.copy(inputStream, outputStream);
}
}
assertFalse(processingFile.exists(), String.format("processingFile %s should not exist before first poll().", processingFile));
assertTrue(inputFile.exists(), String.format("inputFile %s should exist.", inputFile));
List<SourceRecord> records = this.task.poll();
assertTrue(inputFile.exists(), String.format("inputFile %s should exist after first poll().", inputFile));
assertTrue(processingFile.exists(), String.format("processingFile %s should exist after first poll().", processingFile));
assertNotNull(records, "records should not be null.");
assertFalse(records.isEmpty(), "records should not be empty");
assertEquals(testCase.expected.size(), records.size(), "records.size() does not match.");
for (int i = 0; i < testCase.expected.size(); i++) {
SourceRecord expectedRecord = testCase.expected.get(i);
SourceRecord actualRecord = records.get(i);
assertSourceRecord(expectedRecord, actualRecord, String.format("index:%s", i));
}
records = this.task.poll();
assertTrue(records.isEmpty(), "records should be empty after first poll.");
records = this.task.poll();
assertTrue(records.isEmpty(), "records should be empty after first poll.");
assertFalse(inputFile.exists(), String.format("inputFile %s should not exist.", inputFile));
assertFalse(processingFile.exists(), String.format("processingFile %s should not exist.", processingFile));
assertTrue(records.isEmpty(), "records should be empty.");
final File finishedFile = new File(this.finishedPath, inputFileName);
assertTrue(finishedFile.exists(), String.format("finishedFile %s should exist.", finishedFile));
}
Developer: jcustenborder, Project: kafka-connect-spooldir, Lines of code: 76, Source: SpoolDirSourceTaskTest.java
Example 12: initialize
import org.apache.kafka.connect.source.SourceTaskContext; // import the required package/class
@Override
public void initialize(SourceTaskContext context) {
    super.initialize(context);
    LOG.info("AMQP source task initialized");
}
Developer: ppatierno, Project: kafka-connect-amqp, Lines of code: 7, Source: AmqpSourceTask.java
Example 13: setUp
import org.apache.kafka.connect.source.SourceTaskContext; // import the required package/class
@Override
public void setUp() {
offsets = new HashMap<>();
totalWrittenDocuments = 0;
try {
super.setUp();
mongodStarter = MongodStarter.getDefaultInstance();
mongodConfig = new MongodConfigBuilder()
.version(Version.Main.V3_2)
.replication(new Storage(REPLICATION_PATH, "rs0", 1024))
.net(new Net(12345, Network.localhostIsIPv6()))
.build();
mongodExecutable = mongodStarter.prepare(mongodConfig);
mongod = mongodExecutable.start();
mongoClient = new MongoClient(new ServerAddress("localhost", 12345));
MongoDatabase adminDatabase = mongoClient.getDatabase("admin");
BasicDBObject replicaSetSetting = new BasicDBObject();
replicaSetSetting.put("_id", "rs0");
BasicDBList members = new BasicDBList();
DBObject host = new BasicDBObject();
host.put("_id", 0);
host.put("host", "127.0.0.1:12345");
members.add(host);
replicaSetSetting.put("members", members);
adminDatabase.runCommand(new BasicDBObject("isMaster", 1));
adminDatabase.runCommand(new BasicDBObject("replSetInitiate", replicaSetSetting));
MongoDatabase db = mongoClient.getDatabase("mydb");
db.createCollection("test1");
db.createCollection("test2");
db.createCollection("test3");
} catch (Exception e) {
// Assert.assertTrue(false);
}
task = new MongodbSourceTask();
offsetStorageReader = PowerMock.createMock(OffsetStorageReader.class);
context = PowerMock.createMock(SourceTaskContext.class);
task.initialize(context);
sourceProperties = new HashMap<>();
sourceProperties.put("uri", "mongodb://localhost:12345");
sourceProperties.put("batch.size", Integer.toString(100));
sourceProperties.put("schema.name", "schema");
sourceProperties.put("topic.prefix", "prefix");
sourceProperties.put("databases", "mydb.test1,mydb.test2,mydb.test3");
}
Developer: DataReply, Project: kafka-connect-mongodb, Lines of code: 50, Source: MongodbSourceUriTaskTest.java
Example 14: setUp
import org.apache.kafka.connect.source.SourceTaskContext; // import the required package/class
@Override
public void setUp() {
offsets = new HashMap<>();
totalWrittenDocuments = 0;
try {
super.setUp();
mongodStarter = MongodStarter.getDefaultInstance();
mongodConfig = new MongodConfigBuilder()
.version(Version.Main.V3_2)
.replication(new Storage(REPLICATION_PATH, "rs0", 1024))
.net(new Net(12345, Network.localhostIsIPv6()))
.build();
mongodExecutable = mongodStarter.prepare(mongodConfig);
mongod = mongodExecutable.start();
mongoClient = new MongoClient(new ServerAddress("localhost", 12345));
MongoDatabase adminDatabase = mongoClient.getDatabase("admin");
BasicDBObject replicaSetSetting = new BasicDBObject();
replicaSetSetting.put("_id", "rs0");
BasicDBList members = new BasicDBList();
DBObject host = new BasicDBObject();
host.put("_id", 0);
host.put("host", "127.0.0.1:12345");
members.add(host);
replicaSetSetting.put("members", members);
adminDatabase.runCommand(new BasicDBObject("isMaster", 1));
adminDatabase.runCommand(new BasicDBObject("replSetInitiate", replicaSetSetting));
MongoDatabase db = mongoClient.getDatabase("mydb");
db.createCollection("test1");
db.createCollection("test2");
db.createCollection("test3");
} catch (Exception e) {
// Assert.assertTrue(false);
}
task = new MongodbSourceTask();
offsetStorageReader = PowerMock.createMock(OffsetStorageReader.class);
context = PowerMock.createMock(SourceTaskContext.class);
task.initialize(context);
sourceProperties = new HashMap<>();
sourceProperties.put("host", "localhost");
sourceProperties.put("port", Integer.toString(12345));
sourceProperties.put("batch.size", Integer.toString(100));
sourceProperties.put("schema.name", "schema");
sourceProperties.put("topic.prefix", "prefix");
sourceProperties.put("databases", "mydb.test1,mydb.test2,mydb.test3");
}
Developer: DataReply, Project: kafka-connect-mongodb, Lines of code: 51, Source: MongodbSourceTaskTest.java
Example 15: initialize
import org.apache.kafka.connect.source.SourceTaskContext; // import the required package/class
@Override
public void initialize(SourceTaskContext context) {
    super.initialize(context);
}
Developer: yaravind, Project: kafka-connect-jenkins, Lines of code: 5, Source: JenkinsSourceTask.java
Example 16: testSourceTaskProcessAll
import org.apache.kafka.connect.source.SourceTaskContext; // import the required package/class
@Test
public void testSourceTaskProcessAll() {
logger.info (
"Setting up stubs for clean run with no published messages"
);
SourceTaskContext context = EasyMock.createMock(
SourceTaskContext.class
);
OffsetStorageReader offsetStorageReader = EasyMock.createMock (
OffsetStorageReader.class
);
EasyMock.expect(context.offsetStorageReader())
.andReturn(offsetStorageReader);
EasyMock.expect(offsetStorageReader.offsets(offsetPartitions()))
.andReturn(new HashMap<Map<String, String>, Map<String, Object>>());
EasyMock.checkOrder(context, false);
EasyMock.replay(context);
EasyMock.checkOrder(offsetStorageReader, false);
EasyMock.replay(offsetStorageReader);
logger.info ("Source task poll all - test");
ReplicateSourceTask sourceTask = new ReplicateSourceTask();
sourceTask.initialize(context);
try {
sourceTask.start(getConfigPropsForSourceTask());
assertEquals (
"Expecting no record for first poll, nothing in PLOG",
null,
sourceTask.poll()
);
/* expect 2 messages per PLOG, 3 PLOGs with data */
List<SourceRecord> records = sourceTask.poll();
validateRecordCount (records.size());
validateTopics (records);
records = sourceTask.poll();
validateRecordCount (records.size());
validateTopics (records);
records = sourceTask.poll();
validateRecordCount (records.size());
validateTopics (records);
}
catch (InterruptedException ie) {
logger.info ("Stopping task");
}
catch (Exception e) {
e.printStackTrace();
fail (e.getMessage());
}
finally {
sourceTask.stop();
}
}
Developer: dbvisitsoftware, Project: replicate-connector-for-kafka, Lines of code: 66, Source: ReplicateSourceTaskTest.java
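The test above simulates a cold start by stubbing the bulk `offsets(...)` lookup to return an empty map. How a task might interpret such a result can be sketched as follows. This is a hypothetical helper, not code from the replicate connector: `ColdStartCheck` and `partitionsWithoutOffsets` are names invented for illustration, and treating both a missing entry and a null value as "no stored offset" is an assumption.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;

class ColdStartCheck {
    // Returns the partitions that have no stored offset in a bulk lookup result.
    // A partition counts as cold when it is missing from the result or mapped to null.
    static List<Map<String, String>> partitionsWithoutOffsets(
            Collection<Map<String, String>> partitions,
            Map<Map<String, String>, Map<String, Object>> lookupResult) {
        List<Map<String, String>> cold = new ArrayList<>();
        for (Map<String, String> partition : partitions) {
            if (lookupResult.get(partition) == null) {
                cold.add(partition);
            }
        }
        return cold;
    }
}
```

With an empty lookup result, as in the stubbed test, every requested partition is reported as cold and the task would start from its configured initial position.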
Example 17: testSourceTaskProcessTwoMessagesBySCN
import org.apache.kafka.connect.source.SourceTaskContext; // import the required package/class
@Test
public void testSourceTaskProcessTwoMessagesBySCN() {
logger.info (
"Setting up stubs using global SCN filter for fake cold start"
);
SourceTaskContext context = EasyMock.createMock(
SourceTaskContext.class
);
OffsetStorageReader offsetStorageReader = EasyMock.createMock (
OffsetStorageReader.class
);
EasyMock.expect(context.offsetStorageReader())
.andReturn(offsetStorageReader);
EasyMock.expect(offsetStorageReader.offsets(offsetPartitions()))
.andReturn(new HashMap<Map<String, String>, Map<String, Object>>());
EasyMock.checkOrder(context, false);
EasyMock.replay(context);
EasyMock.checkOrder(offsetStorageReader, false);
EasyMock.replay(offsetStorageReader);
logger.info ("Source task poll all committed, no new data");
ReplicateSourceTask sourceTask = new ReplicateSourceTask();
sourceTask.initialize(context);
sourceTask.start(
getConfigPropsForSourceTaskWithStartSCN(PLOG_22_TRANSACTION_SCN)
);
try {
/* expect no new messages to publish for first poll */
List<SourceRecord> records = sourceTask.poll();
assertNull (
"Expecting no records for first PLOG 20, contains no data",
records
);
records = sourceTask.poll();
assertNull (
"Expecting no records for second PLOG",
records
);
/* expect 2 messages for last two PLOG */
records = sourceTask.poll();
validateRecordCount (records.size());
validateTopics (records);
validateRecordOffset(records, PLOG_21_TRANSACTION_OFFSET);
records = sourceTask.poll();
validateRecordCount (records.size());
validateTopics (records);
validateRecordOffset(records, PLOG_22_TRANSACTION_OFFSET);
/* next call will block because there is no further data */
}
catch (InterruptedException ie) {
logger.info ("Stopping task");
}
catch (Exception e) {
e.printStackTrace();
fail (e.getMessage());
}
finally {
sourceTask.stop();
}
}
Developer: dbvisitsoftware, Project: replicate-connector-for-kafka, Lines of code: 75, Source: ReplicateSourceTaskTest.java
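Note: The org.apache.kafka.connect.source.SourceTaskContext examples in this article were collected from GitHub, MSDocs, and other source-code and documentation hosting platforms. The code snippets were selected from open-source projects, and copyright of the source code belongs to the original authors. For distribution and use, refer to the license of the corresponding project. Do not reproduce without permission.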