本文整理汇总了Java中org.apache.kafka.connect.errors.RetriableException类的典型用法代码示例。如果您正苦于以下问题:Java RetriableException类的具体用法?Java RetriableException怎么用?Java RetriableException使用的例子?那么恭喜您, 这里精选的类代码示例或许可以为您提供帮助。
RetriableException类属于org.apache.kafka.connect.errors包,在下文中一共展示了RetriableException类的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Java代码示例。
示例1: connectInternal
import org.apache.kafka.connect.errors.RetriableException; //导入依赖的package包/类
/**
* Internal method to connect to MQ.
*
* @throws RetriableException Operation failed, but connector should continue to retry.
* @throws ConnectException Operation failed and connector should stop.
*/
private void connectInternal() throws ConnectException, RetriableException {
if (connected) {
return;
}
try {
if (userName != null) {
jmsCtxt = mqConnFactory.createContext(userName, password, JMSContext.SESSION_TRANSACTED);
}
else {
jmsCtxt = mqConnFactory.createContext(JMSContext.SESSION_TRANSACTED);
}
jmsProd = jmsCtxt.createProducer();
jmsProd.setDeliveryMode(deliveryMode);
jmsProd.setTimeToLive(timeToLive);
connected = true;
}
catch (JMSRuntimeException jmse) {
log.debug("JMS exception {}", jmse);
handleException(jmse);
}
}
开发者ID:ibm-messaging,项目名称:kafka-connect-mq-sink,代码行数:30,代码来源:JMSWriter.java
示例2: commit
import org.apache.kafka.connect.errors.RetriableException; //导入依赖的package包/类
/**
* Commits the current transaction. If the current transaction contains a message that could not
* be processed, the transaction is "in peril" and is rolled back instead to avoid data loss.
*
* @throws RetriableException Operation failed, but connector should continue to retry.
* @throws ConnectException Operation failed and connector should stop.
*/
public void commit() throws ConnectException, RetriableException {
connectInternal();
try {
if (inflight) {
inflight = false;
if (inperil) {
inperil = false;
log.trace("Rolling back in-flight transaction");
jmsCtxt.rollback();
throw new RetriableException("Transaction rolled back");
}
else {
jmsCtxt.commit();
}
}
}
catch (JMSRuntimeException jmse) {
log.debug("JMS exception {}", jmse);
handleException(jmse);
}
}
开发者ID:ibm-messaging,项目名称:kafka-connect-mq-source,代码行数:30,代码来源:JMSReader.java
示例3: connectInternal
import org.apache.kafka.connect.errors.RetriableException; //导入依赖的package包/类
/**
* Internal method to connect to MQ.
*
* @throws RetriableException Operation failed, but connector should continue to retry.
* @throws ConnectException Operation failed and connector should stop.
*/
private void connectInternal() throws ConnectException, RetriableException {
if (connected) {
return;
}
if (closeNow.get()) {
throw new ConnectException("Connection closing");
}
try {
if (userName != null) {
jmsCtxt = mqConnFactory.createContext(userName, password, JMSContext.SESSION_TRANSACTED);
}
else {
jmsCtxt = mqConnFactory.createContext(JMSContext.SESSION_TRANSACTED);
}
jmsCons = jmsCtxt.createConsumer(queue);
connected = true;
}
catch (JMSRuntimeException jmse) {
log.debug("JMS exception {}", jmse);
handleException(jmse);
}
}
开发者ID:ibm-messaging,项目名称:kafka-connect-mq-source,代码行数:32,代码来源:JMSReader.java
示例4: put
import org.apache.kafka.connect.errors.RetriableException; //导入依赖的package包/类
@Override
public void put(final Collection<SinkRecord> records) {
if (records.isEmpty()) {
return;
}
log.info("===>>>records size:{}", records.size());
try {
writer.batchWrite(records);
} catch (SQLException e) {
log.error(e.getMessage(), e);
if (remainRetries <= 0) {
throw new ConnectException(e);
} else {
writer.closeQuietly();
writer = new MySqlDbWriter(config);
remainRetries--;
context.timeout(config.retryBackoffMs);
throw new RetriableException(e);
}
}
remainRetries = config.maxRetries;
}
开发者ID:songxin1990,项目名称:maxwell-sink,代码行数:23,代码来源:MySqlSinkTask.java
示例5: put
import org.apache.kafka.connect.errors.RetriableException; //导入依赖的package包/类
@Override
public void put(final Collection<SinkRecord> records) {
if (records == null || records.isEmpty()) {
return;
}
log.info("===>>>records size:{}", records.size());
try {
writer.batchWrite(records);
} catch (SQLException e) {
log.error(e.getMessage(), e);
if (remainRetries <= 0) {
throw new ConnectException(e);
} else {
writer.closeQuietly();
writer = new MySqlDbWriter(config);
remainRetries--;
context.timeout(config.retryBackoffMs);
throw new RetriableException(e);
}
}
remainRetries = config.maxRetries;
}
开发者ID:songxin1990,项目名称:maxwell-sink,代码行数:23,代码来源:MySqlSinkTask.java
示例6: connectionRefused
import org.apache.kafka.connect.errors.RetriableException; //导入依赖的package包/类
@Test
public void connectionRefused() throws IOException {
Collection<SinkRecord> sinkRecords = new ArrayList<>();
SinkRecordContentTest.addRecord(sinkRecords, ImmutableMap.of("host", "hostname.example.com"));
SinkRecordContentTest.addRecord(sinkRecords, ImmutableMap.of("host", "hostname.example.com", "time", new Date(1472256858924L), "source", "testapp"));
SinkRecordContentTest.addRecord(sinkRecords, ImmutableMap.of("host", "hostname.example.com", "time", new Date(1472256858924L), "source", "testapp", "sourcetype", "txt", "index", "main"));
final LowLevelHttpRequest httpRequest = mock(LowLevelHttpRequest.class, CALLS_REAL_METHODS);
when(httpRequest.execute()).thenThrow(ConnectException.class);
this.task.transport = new MockHttpTransport() {
@Override
public LowLevelHttpRequest buildRequest(String method, String url) throws IOException {
return httpRequest;
}
};
this.task.httpRequestFactory = this.task.transport.createRequestFactory(this.task.httpRequestInitializer);
assertThrows(RetriableException.class, () -> this.task.put(sinkRecords));
}
开发者ID:jcustenborder,项目名称:kafka-connect-splunk,代码行数:20,代码来源:SplunkHttpSinkTaskTest.java
示例7: put
import org.apache.kafka.connect.errors.RetriableException; //导入依赖的package包/类
@Override
public void put(Collection<SinkRecord> records) throws ConnectException {
for (SinkRecord record : records) {
try {
String topic = record.topic();
int partition = record.kafkaPartition();
TopicPartition tp = new TopicPartition(topic, partition);
BlockGZIPFileWriter buffer = tmpFiles.get(tp);
if (buffer == null) {
log.error("Trying to put {} records to partition {} which doesn't exist yet", records.size(), tp);
throw new ConnectException("Trying to put records for a topic partition that has not be assigned");
}
buffer.write(record.value().toString());
} catch (IOException e) {
throw new RetriableException("Failed to write to buffer", e);
}
}
}
开发者ID:DeviantArt,项目名称:kafka-connect-s3,代码行数:19,代码来源:S3SinkTask.java
示例8: process
import org.apache.kafka.connect.errors.RetriableException; //导入依赖的package包/类
protected HttpResponse process(final HttpRequestBase request) {
try {
return httpClient.execute(request, localContext);
} catch (Exception e) {
logger.error(e.getMessage(), e);
throw new RetriableException(e);
}
}
开发者ID:sanjuthomas,项目名称:kafka-connect-marklogic,代码行数:10,代码来源:MarkLogicWriter.java
示例9: put
import org.apache.kafka.connect.errors.RetriableException; //导入依赖的package包/类
@Override
public void put(final Collection<SinkRecord> records) {
if (records.isEmpty()) {
logger.debug("Empty record collection to process");
return;
}
final SinkRecord first = records.iterator().next();
final int recordsCount = records.size();
logger.debug("Received {} records. kafka coordinates from record: Topic - {}, Partition - {}, Offset - {}",
recordsCount, first.topic(), first.kafkaPartition(), first.kafkaOffset());
try {
writer.write(records);
} catch (final RetriableException e) {
if (maxRetires > 0 && remainingRetries == 0) {
throw new ConnectException("Retries exhausted, ending the task. Manual restart is required.");
}else{
logger.warn("Setting the task timeout to {} ms upon RetriableException", timeout);
initWriter(config);
context.timeout(timeout);
remainingRetries--;
throw e;
}
}
this.remainingRetries = maxRetires;
}
开发者ID:sanjuthomas,项目名称:kafka-connect-marklogic,代码行数:29,代码来源:MarkLogicSinkTask.java
示例10: send
import org.apache.kafka.connect.errors.RetriableException; //导入依赖的package包/类
/**
* Sends a message to MQ. Adds the message to the current transaction. Reconnects to MQ if required.
*
* @param r The message and schema to send
*
* @throws RetriableException Operation failed, but connector should continue to retry.
* @throws ConnectException Operation failed and connector should stop.
*/
public void send(SinkRecord r) throws ConnectException, RetriableException {
connectInternal();
try {
Message m = builder.fromSinkRecord(jmsCtxt, r);
inflight = true;
jmsProd.send(queue, m);
}
catch (JMSRuntimeException jmse) {
log.debug("JMS exception {}", jmse);
handleException(jmse);
}
}
开发者ID:ibm-messaging,项目名称:kafka-connect-mq-sink,代码行数:22,代码来源:JMSWriter.java
示例11: commit
import org.apache.kafka.connect.errors.RetriableException; //导入依赖的package包/类
/**
* Commits the current transaction.
*
* @throws RetriableException Operation failed, but connector should continue to retry.
* @throws ConnectException Operation failed and connector should stop.
*/
public void commit() throws ConnectException, RetriableException {
connectInternal();
try {
if (inflight) {
inflight = false;
}
jmsCtxt.commit();
}
catch (JMSRuntimeException jmse) {
log.debug("JMS exception {}", jmse);
handleException(jmse);
}
}
开发者ID:ibm-messaging,项目名称:kafka-connect-mq-sink,代码行数:21,代码来源:JMSWriter.java
示例12: commitRecord
import org.apache.kafka.connect.errors.RetriableException; //导入依赖的package包/类
@Override
public void commitRecord(SourceRecord record) throws InterruptedException {
Long deliveryTag = (Long) record.sourceOffset().get("deliveryTag");
try {
this.channel.basicAck(deliveryTag, false);
} catch (IOException e) {
throw new RetriableException(e);
}
}
开发者ID:jcustenborder,项目名称:kafka-connect-rabbitmq,代码行数:10,代码来源:RabbitMQSourceTask.java
示例13: put
import org.apache.kafka.connect.errors.RetriableException; //导入依赖的package包/类
@Override
public void put(Collection<SinkRecord> sinkRecords) {
for (SinkRecord record : sinkRecords) {
log.trace("current sinkRecord value: " + record.value());
if (!(record.value() instanceof byte[])) {
throw new ConnectException("the value of the record has an invalid type (must be of type byte[])");
}
try {
channel.basicPublish(this.config.exchange, this.config.routingKey, null, (byte[]) record.value());
} catch (IOException e) {
log.error("There was an error while publishing the outgoing message to RabbitMQ");
throw new RetriableException(e);
}
}
}
开发者ID:jcustenborder,项目名称:kafka-connect-rabbitmq,代码行数:16,代码来源:RabbitMQSinkTask.java
示例14: deliverMessages
import org.apache.kafka.connect.errors.RetriableException; //导入依赖的package包/类
private void deliverMessages() {
// Finally, deliver this batch to the sink
try {
// Since we reuse the messageBatch buffer, ensure we give the task its own copy
task.put(new ArrayList<>(messageBatch));
for (SinkRecord record : messageBatch)
currentOffsets.put(new TopicPartition(record.topic(), record.kafkaPartition()),
new OffsetAndMetadata(record.kafkaOffset() + 1));
messageBatch.clear();
// If we had paused all consumer topic partitions to try to redeliver data, then we should resume any that
// the task had not explicitly paused
if (pausedForRedelivery) {
if (!shouldPause())
resumeAll();
pausedForRedelivery = false;
}
} catch (RetriableException e) {
log.error("RetriableException from SinkTask {}:", id, e);
// If we're retrying a previous batch, make sure we've paused all topic partitions so we don't get new data,
// but will still be able to poll in order to handle user-requested timeouts, keep group membership, etc.
pausedForRedelivery = true;
pauseAll();
// Let this exit normally, the batch will be reprocessed on the next loop.
} catch (Throwable t) {
log.error("Task {} threw an uncaught and unrecoverable exception", id, t);
log.error("Task is being killed and will not recover until manually restarted");
throw new ConnectException("Exiting WorkerSinkTask due to unrecoverable exception.");
}
}
开发者ID:YMCoding,项目名称:kafka-0.11.0.0-src-with-comment,代码行数:30,代码来源:WorkerSinkTask.java
示例15: testPollRedelivery
import org.apache.kafka.connect.errors.RetriableException; //导入依赖的package包/类
@Test
public void testPollRedelivery() throws Exception {
expectInitializeTask();
expectPollInitialAssignment();
// If a retriable exception is thrown, we should redeliver the same batch, pausing the consumer in the meantime
expectConsumerPoll(1);
expectConversionAndTransformation(1);
Capture<Collection<SinkRecord>> records = EasyMock.newCapture(CaptureType.ALL);
sinkTask.put(EasyMock.capture(records));
EasyMock.expectLastCall().andThrow(new RetriableException("retry"));
// Pause
HashSet<TopicPartition> partitions = new HashSet<>(asList(TOPIC_PARTITION, TOPIC_PARTITION2));
EasyMock.expect(consumer.assignment()).andReturn(partitions);
consumer.pause(partitions);
PowerMock.expectLastCall();
// Retry delivery should succeed
expectConsumerPoll(0);
sinkTask.put(EasyMock.capture(records));
EasyMock.expectLastCall();
// And unpause
EasyMock.expect(consumer.assignment()).andReturn(partitions);
consumer.resume(singleton(TOPIC_PARTITION));
PowerMock.expectLastCall();
consumer.resume(singleton(TOPIC_PARTITION2));
PowerMock.expectLastCall();
PowerMock.replayAll();
workerTask.initialize(TASK_CONFIG);
workerTask.initializeAndStart();
workerTask.iteration();
workerTask.iteration();
workerTask.iteration();
PowerMock.verifyAll();
}
开发者ID:YMCoding,项目名称:kafka-0.11.0.0-src-with-comment,代码行数:39,代码来源:WorkerSinkTaskTest.java
示例16: put
import org.apache.kafka.connect.errors.RetriableException; //导入依赖的package包/类
@Override
public void put(Collection<SinkRecord> collection) {
try {
for (SinkRecord record : collection) {
appendRecord(record);
}
} catch (IOException e) {
LOGGER.warn("Failed to append records. Retrying batch.", e);
throw new RetriableException(e);
}
}
开发者ID:lepfhty,项目名称:kafka-connect-redshift,代码行数:12,代码来源:RedshiftSinkTask.java
示例17: flush
import org.apache.kafka.connect.errors.RetriableException; //导入依赖的package包/类
@Override
public void flush(Map<TopicPartition, OffsetAndMetadata> offsets) throws ConnectException {
// Don't rely on offsets passed. They have some quirks like including topic partitions that just
// got revoked (i.e. we have deleted the writer already). Not sure if this is intended...
// https://twitter.com/mr_paul_banks/status/702493772983177218
// Instead iterate over the writers we do have and get the offsets directly from them.
for (Map.Entry<TopicPartition, BlockGZIPFileWriter> entry : tmpFiles.entrySet()) {
TopicPartition tp = entry.getKey();
BlockGZIPFileWriter writer = entry.getValue();
if (writer.getNumRecords() == 0) {
// Not done anything yet
log.info("No new records for partition {}", tp);
continue;
}
try {
writer.close();
long nextOffset = s3.putChunk(writer.getDataFilePath(), writer.getIndexFilePath(), tp);
// Now reset writer to a new one
tmpFiles.put(tp, this.createNextBlockWriter(tp, nextOffset));
log.info("Successfully uploaded chunk for {} now at offset {}", tp, nextOffset);
} catch (FileNotFoundException fnf) {
throw new ConnectException("Failed to find local dir for temp files", fnf);
} catch (IOException e) {
throw new RetriableException("Failed S3 upload", e);
}
}
}
开发者ID:DeviantArt,项目名称:kafka-connect-s3,代码行数:31,代码来源:S3SinkTask.java
示例18: put
import org.apache.kafka.connect.errors.RetriableException; //导入依赖的package包/类
/**
* Put the records in the sink. Usually this should send the records to the sink asynchronously
* and immediately return.
* <p>
* If this operation fails, the SinkTask may throw a {@link RetriableException} to
* indicate that the framework should attempt to retry the same call again. Other exceptions will cause the task to
* be stopped immediately. {@link org.apache.kafka.connect.sink.SinkTaskContext#timeout(long)} can be used to set the maximum time before the
* batch will be retried.
*
* @param records the set of records to send
*/
@Override
public void put(Collection<SinkRecord> records) {
if (records.isEmpty()) { return; }
if (log.isTraceEnabled()) {
final SinkRecord first = records.iterator().next();
final int recordsCount = records.size();
log.trace("Received {} records. First record kafka coordinates:({}-{}-{}). Writing them to Kudu...",
recordsCount, first.topic(), first.kafkaPartition(), first.kafkaOffset());
}
try {
writer.write(records);
} catch (KuduException ke) {
if (ke instanceof PleaseThrottleException) {
log.warn("Write of {} records failed. Kudu asks to throttle. Will retry.", records.size(), ke);
throw new RetriableException(ke);
} else {
log.warn("Write of {} records failed, remainingRetries={}", records.size(), remainingRetries, ke);
if (remainingRetries == 0) {
throw new ConnectException(ke);
}
else {
writer.close();
initWriter();
remainingRetries--;
context.timeout(config.retryBackoffMs);
throw new RetriableException(ke);
}
}
}
remainingRetries = config.maxRetries;
}
开发者ID:onfocusio,项目名称:kafka-connect-kudu,代码行数:45,代码来源:KuduSinkTask.java
示例19: handleException
import org.apache.kafka.connect.errors.RetriableException; //导入依赖的package包/类
/**
* Handles exceptions from MQ. Some JMS exceptions are treated as retriable meaning that the
* connector can keep running and just trying again is likely to fix things.
*
* @throws RetriableException Operation failed, but connector should continue to retry.
* @throws ConnectException Operation failed and connector should stop.
*/
private void handleException(Throwable exc) throws ConnectException, RetriableException {
boolean isRetriable = false;
boolean mustClose = true;
int reason = -1;
// Try to extract the MQ reason code to see if it's a retriable exception
Throwable t = exc.getCause();
while (t != null) {
if (t instanceof MQException) {
MQException mqe = (MQException)t;
log.error("MQ error: CompCode {}, Reason {}", mqe.getCompCode(), mqe.getReason());
reason = mqe.getReason();
break;
}
t = t.getCause();
}
switch (reason)
{
// These reason codes indicate that the connection needs to be closed, but just retrying later
// will probably recover
case MQConstants.MQRC_BACKED_OUT:
case MQConstants.MQRC_CHANNEL_NOT_AVAILABLE:
case MQConstants.MQRC_CONNECTION_BROKEN:
case MQConstants.MQRC_HOST_NOT_AVAILABLE:
case MQConstants.MQRC_NOT_AUTHORIZED:
case MQConstants.MQRC_Q_MGR_NOT_AVAILABLE:
case MQConstants.MQRC_Q_MGR_QUIESCING:
case MQConstants.MQRC_Q_MGR_STOPPING:
case MQConstants.MQRC_UNEXPECTED_ERROR:
isRetriable = true;
break;
// These reason codes indicates that the connect is still OK, but just retrying later
// will probably recover - possibly with administrative action on the queue manager
case MQConstants.MQRC_Q_FULL:
case MQConstants.MQRC_PUT_INHIBITED:
isRetriable = true;
mustClose = false;
break;
}
if (mustClose) {
close();
}
if (isRetriable) {
throw new RetriableException(exc);
}
throw new ConnectException(exc);
}
开发者ID:ibm-messaging,项目名称:kafka-connect-mq-sink,代码行数:59,代码来源:JMSWriter.java
示例20: receive
import org.apache.kafka.connect.errors.RetriableException; //导入依赖的package包/类
/**
* Receives a message from MQ. Adds the message to the current transaction. Reconnects to MQ if required.
*
* @param wait Whether to wait indefinitely for a message
*
* @return The SourceRecord representing the message
*
* @throws RetriableException Operation failed, but connector should continue to retry.
* @throws ConnectException Operation failed and connector should stop.
*/
public SourceRecord receive(boolean wait) throws ConnectException, RetriableException {
connectInternal();
Message m = null;
SourceRecord sr = null;
try {
if (wait) {
while ((m == null) && !closeNow.get())
{
log.trace("Waiting {} ms for message", RECEIVE_TIMEOUT);
m = jmsCons.receive(RECEIVE_TIMEOUT);
}
if (m == null) {
log.trace("No message received");
}
}
else {
m = jmsCons.receiveNoWait();
}
if (m != null) {
inflight = true;
// We've received a message in a transacted session so we must only permit the transaction
// to commit once we've passed it on to Kafka. Temporarily mark the transaction as "in-peril"
// so that any exception thrown will result in the transaction rolling back instead of committing.
inperil = true;
sr = builder.toSourceRecord(jmsCtxt, topic, messageBodyJms, m);
inperil = false;
}
}
catch (JMSException | JMSRuntimeException | ConnectException exc) {
log.debug("JMS exception {}", exc);
handleException(exc);
}
return sr;
}
开发者ID:ibm-messaging,项目名称:kafka-connect-mq-source,代码行数:51,代码来源:JMSReader.java
注:本文中的org.apache.kafka.connect.errors.RetriableException类示例整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。 |
请发表评论