
Java RetriableException Class Code Examples


This article collects typical usage examples of the Java class org.apache.kafka.connect.errors.RetriableException, showing what the class is for and how it is used in practice.



The RetriableException class belongs to the org.apache.kafka.connect.errors package. Below are 20 code examples of the class, ordered by popularity.
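The common pattern across these examples: a task throws RetriableException when an operation fails transiently, so that the Kafka Connect framework redelivers the same batch (optionally after a backoff set via SinkTaskContext.timeout(long)), and throws ConnectException when the failure is unrecoverable, so that the task is stopped. A minimal sketch of that contract, assuming a hypothetical FlakyStore client and its TransientStoreException (neither is part of Kafka Connect):

import java.util.Collection;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.errors.RetriableException;
import org.apache.kafka.connect.sink.SinkRecord;
import org.apache.kafka.connect.sink.SinkTask;

public abstract class RetryingSinkTask extends SinkTask {

    private final FlakyStore store = new FlakyStore(); // hypothetical external client
    private int remainingRetries = 10;                 // assumed retry budget

    @Override
    public void put(Collection<SinkRecord> records) {
        try {
            store.write(records); // hypothetical call that may fail transiently
        } catch (TransientStoreException e) { // hypothetical transient failure
            if (remainingRetries-- <= 0) {
                // Unrecoverable: the framework stops the task immediately
                throw new ConnectException("Retries exhausted", e);
            }
            context.timeout(5000L); // back off before the batch is redelivered
            throw new RetriableException(e); // framework retries the same batch
        }
        remainingRetries = 10; // reset the budget after a successful write
    }
}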

Example 1: connectInternal

import org.apache.kafka.connect.errors.RetriableException; // import the required package/class
/**
 * Internal method to connect to MQ.
 *
 * @throws RetriableException Operation failed, but connector should continue to retry.
 * @throws ConnectException   Operation failed and connector should stop.
 */
private void connectInternal() throws ConnectException, RetriableException {
    if (connected) {
        return;
    }

    try {
        if (userName != null) {
            jmsCtxt = mqConnFactory.createContext(userName, password, JMSContext.SESSION_TRANSACTED);
        }
        else {
            jmsCtxt = mqConnFactory.createContext(JMSContext.SESSION_TRANSACTED);
        }            

        jmsProd = jmsCtxt.createProducer();
        jmsProd.setDeliveryMode(deliveryMode);
        jmsProd.setTimeToLive(timeToLive);
        connected = true;
    }
    catch (JMSRuntimeException jmse) {
        log.debug("JMS exception {}", jmse);
        handleException(jmse);
    }
}
 
Developer: ibm-messaging, Project: kafka-connect-mq-sink, Lines: 30, Source: JMSWriter.java


Example 2: commit

import org.apache.kafka.connect.errors.RetriableException; // import the required package/class
/**
 * Commits the current transaction. If the current transaction contains a message that could not
 * be processed, the transaction is "in peril" and is rolled back instead to avoid data loss.
 *
 * @throws RetriableException Operation failed, but connector should continue to retry.
 * @throws ConnectException   Operation failed and connector should stop.
 */
public void commit() throws ConnectException, RetriableException {
    connectInternal();
    try {
        if (inflight) {
            inflight = false;

            if (inperil) {
                inperil = false;
                log.trace("Rolling back in-flight transaction");
                jmsCtxt.rollback();
                throw new RetriableException("Transaction rolled back");
            }
            else {
                jmsCtxt.commit();
            }
        }
    }
    catch (JMSRuntimeException jmse) {
        log.debug("JMS exception {}", jmse);
        handleException(jmse);
    }
}
 
Developer: ibm-messaging, Project: kafka-connect-mq-source, Lines: 30, Source: JMSReader.java


Example 3: connectInternal

import org.apache.kafka.connect.errors.RetriableException; // import the required package/class
/**
 * Internal method to connect to MQ.
 *
 * @throws RetriableException Operation failed, but connector should continue to retry.
 * @throws ConnectException   Operation failed and connector should stop.
 */
private void connectInternal() throws ConnectException, RetriableException {
    if (connected) {
        return;
    }

    if (closeNow.get()) {
        throw new ConnectException("Connection closing");
    }

    try {
        if (userName != null) {
            jmsCtxt = mqConnFactory.createContext(userName, password, JMSContext.SESSION_TRANSACTED);
        }
        else {
            jmsCtxt = mqConnFactory.createContext(JMSContext.SESSION_TRANSACTED);
        }            

        jmsCons = jmsCtxt.createConsumer(queue);
        connected = true;
    }
    catch (JMSRuntimeException jmse) {
        log.debug("JMS exception {}", jmse);
        handleException(jmse);
    }
}
 
Developer: ibm-messaging, Project: kafka-connect-mq-source, Lines: 32, Source: JMSReader.java


Example 4: put

import org.apache.kafka.connect.errors.RetriableException; // import the required package/class
@Override
public void put(final Collection<SinkRecord> records) {
    if (records.isEmpty()) {
        return;
    }
    log.info("===>>>records size:{}", records.size());
    try {
        writer.batchWrite(records);
    } catch (SQLException e) {
        log.error(e.getMessage(), e);
        if (remainRetries <= 0) {
            throw new ConnectException(e);
        } else {
            writer.closeQuietly();
            writer = new MySqlDbWriter(config);
            remainRetries--;
            context.timeout(config.retryBackoffMs);
            throw new RetriableException(e);
        }
    }
    remainRetries = config.maxRetries;
}
 
Developer: songxin1990, Project: maxwell-sink, Lines: 23, Source: MySqlSinkTask.java


Example 5: put

import org.apache.kafka.connect.errors.RetriableException; // import the required package/class
@Override
public void put(final Collection<SinkRecord> records) {
	if (records == null || records.isEmpty()) {
		return;
	}
	log.info("===>>>records size:{}", records.size());
	try {
		writer.batchWrite(records);
	} catch (SQLException e) {
		log.error(e.getMessage(), e);
		if (remainRetries <= 0) {
			throw new ConnectException(e);
		} else {
			writer.closeQuietly();
			writer = new MySqlDbWriter(config);
			remainRetries--;
			context.timeout(config.retryBackoffMs);
			throw new RetriableException(e);
		}
	}
	remainRetries = config.maxRetries;
}
 
Developer: songxin1990, Project: maxwell-sink, Lines: 23, Source: MySqlSinkTask.java
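Examples 4 and 5 drive their retry loop from config.maxRetries and config.retryBackoffMs. A minimal sketch of how such settings might be declared with Kafka's ConfigDef, assuming the key names max.retries and retry.backoff.ms and a hypothetical MySqlSinkConfig class (the real maxwell-sink config class is not shown on this page):

import java.util.Map;
import org.apache.kafka.common.config.AbstractConfig;
import org.apache.kafka.common.config.ConfigDef;

public class MySqlSinkConfig extends AbstractConfig {
    public static final ConfigDef CONFIG_DEF = new ConfigDef()
        .define("max.retries", ConfigDef.Type.INT, 10,
                ConfigDef.Importance.MEDIUM, "Number of retries before the task fails")
        .define("retry.backoff.ms", ConfigDef.Type.LONG, 3000L,
                ConfigDef.Importance.MEDIUM, "Wait before the failed batch is redelivered");

    public final int maxRetries;       // read by put() as config.maxRetries
    public final long retryBackoffMs;  // passed to context.timeout(...)

    public MySqlSinkConfig(Map<String, String> props) {
        super(CONFIG_DEF, props);
        this.maxRetries = getInt("max.retries");
        this.retryBackoffMs = getLong("retry.backoff.ms");
    }
}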


Example 6: connectionRefused

import org.apache.kafka.connect.errors.RetriableException; // import the required package/class
@Test
public void connectionRefused() throws IOException {
  Collection<SinkRecord> sinkRecords = new ArrayList<>();
  SinkRecordContentTest.addRecord(sinkRecords, ImmutableMap.of("host", "hostname.example.com"));
  SinkRecordContentTest.addRecord(sinkRecords, ImmutableMap.of("host", "hostname.example.com", "time", new Date(1472256858924L), "source", "testapp"));
  SinkRecordContentTest.addRecord(sinkRecords, ImmutableMap.of("host", "hostname.example.com", "time", new Date(1472256858924L), "source", "testapp", "sourcetype", "txt", "index", "main"));

  final LowLevelHttpRequest httpRequest = mock(LowLevelHttpRequest.class, CALLS_REAL_METHODS);
  when(httpRequest.execute()).thenThrow(ConnectException.class);
  this.task.transport = new MockHttpTransport() {
    @Override
    public LowLevelHttpRequest buildRequest(String method, String url) throws IOException {
      return httpRequest;
    }
  };

  this.task.httpRequestFactory = this.task.transport.createRequestFactory(this.task.httpRequestInitializer);
  assertThrows(RetriableException.class, () -> this.task.put(sinkRecords));
}
 
Developer: jcustenborder, Project: kafka-connect-splunk, Lines: 20, Source: SplunkHttpSinkTaskTest.java


Example 7: put

import org.apache.kafka.connect.errors.RetriableException; // import the required package/class
@Override
public void put(Collection<SinkRecord> records) throws ConnectException {
  for (SinkRecord record : records) {
    try {
      String topic = record.topic();
      int partition = record.kafkaPartition();
      TopicPartition tp = new TopicPartition(topic, partition);
      BlockGZIPFileWriter buffer = tmpFiles.get(tp);
      if (buffer == null) {
        log.error("Trying to put {} records to partition {} which doesn't exist yet", records.size(), tp);
        throw new ConnectException("Trying to put records for a topic partition that has not be assigned");
      }
      buffer.write(record.value().toString());
    } catch (IOException e) {
      throw new RetriableException("Failed to write to buffer", e);
    }
  }
}
 
Developer: DeviantArt, Project: kafka-connect-s3, Lines: 19, Source: S3SinkTask.java


Example 8: process

import org.apache.kafka.connect.errors.RetriableException; // import the required package/class
protected HttpResponse process(final HttpRequestBase request) {
    try {
        return httpClient.execute(request, localContext);
    } catch (Exception e) {
        logger.error(e.getMessage(), e);
        throw new RetriableException(e);
    }
}
 
Developer: sanjuthomas, Project: kafka-connect-marklogic, Lines: 10, Source: MarkLogicWriter.java


Example 9: put

import org.apache.kafka.connect.errors.RetriableException; // import the required package/class
@Override
public void put(final Collection<SinkRecord> records) {

    if (records.isEmpty()) {
        logger.debug("Empty record collection to process");
        return;
    }

    final SinkRecord first = records.iterator().next();
    final int recordsCount = records.size();
    logger.debug("Received {} records. kafka coordinates from record: Topic - {}, Partition - {}, Offset - {}",
                    recordsCount, first.topic(), first.kafkaPartition(), first.kafkaOffset());

    try {
        writer.write(records);
    } catch (final RetriableException e) {
        if (maxRetries > 0 && remainingRetries == 0) {
            throw new ConnectException("Retries exhausted, ending the task. Manual restart is required.");
        } else {
            logger.warn("Setting the task timeout to {} ms upon RetriableException", timeout);
            initWriter(config);
            context.timeout(timeout);
            remainingRetries--;
            throw e;
        }
    }
    this.remainingRetries = maxRetries;
}
 
Developer: sanjuthomas, Project: kafka-connect-marklogic, Lines: 29, Source: MarkLogicSinkTask.java


Example 10: send

import org.apache.kafka.connect.errors.RetriableException; // import the required package/class
/**
 * Sends a message to MQ. Adds the message to the current transaction. Reconnects to MQ if required.
 * 
 * @param r                  The message and schema to send
 *
 * @throws RetriableException Operation failed, but connector should continue to retry.
 * @throws ConnectException   Operation failed and connector should stop.
 */
public void send(SinkRecord r) throws ConnectException, RetriableException {
    connectInternal();

    try {
        Message m = builder.fromSinkRecord(jmsCtxt, r);
        inflight = true;
        jmsProd.send(queue, m);
    }
    catch (JMSRuntimeException jmse) {
        log.debug("JMS exception {}", jmse);
        handleException(jmse);
    }
}
 
Developer: ibm-messaging, Project: kafka-connect-mq-sink, Lines: 22, Source: JMSWriter.java


Example 11: commit

import org.apache.kafka.connect.errors.RetriableException; // import the required package/class
/**
 * Commits the current transaction.
 *
 * @throws RetriableException Operation failed, but connector should continue to retry.
 * @throws ConnectException   Operation failed and connector should stop.
 */
public void commit() throws ConnectException, RetriableException {
    connectInternal();
    try {
        if (inflight) {
            inflight = false;
        }

        jmsCtxt.commit();
    }
    catch (JMSRuntimeException jmse) {
        log.debug("JMS exception {}", jmse);
        handleException(jmse);
    }
}
 
Developer: ibm-messaging, Project: kafka-connect-mq-sink, Lines: 21, Source: JMSWriter.java


Example 12: commitRecord

import org.apache.kafka.connect.errors.RetriableException; // import the required package/class
@Override
public void commitRecord(SourceRecord record) throws InterruptedException {
  Long deliveryTag = (Long) record.sourceOffset().get("deliveryTag");
  try {
    this.channel.basicAck(deliveryTag, false);
  } catch (IOException e) {
    throw new RetriableException(e);
  }
}
 
Developer: jcustenborder, Project: kafka-connect-rabbitmq, Lines: 10, Source: RabbitMQSourceTask.java


Example 13: put

import org.apache.kafka.connect.errors.RetriableException; // import the required package/class
@Override
public void put(Collection<SinkRecord> sinkRecords) {
  for (SinkRecord record : sinkRecords) {
    log.trace("current sinkRecord value: " + record.value());
    if (!(record.value() instanceof byte[])) {
      throw new ConnectException("the value of the record has an invalid type (must be of type byte[])");
    }
    try {
      channel.basicPublish(this.config.exchange, this.config.routingKey, null, (byte[]) record.value());
    } catch (IOException e) {
      log.error("There was an error while publishing the outgoing message to RabbitMQ");
      throw new RetriableException(e);
    }
  }
}
 
Developer: jcustenborder, Project: kafka-connect-rabbitmq, Lines: 16, Source: RabbitMQSinkTask.java


Example 14: deliverMessages

import org.apache.kafka.connect.errors.RetriableException; // import the required package/class
private void deliverMessages() {
    // Finally, deliver this batch to the sink
    try {
        // Since we reuse the messageBatch buffer, ensure we give the task its own copy
        task.put(new ArrayList<>(messageBatch));
        for (SinkRecord record : messageBatch)
            currentOffsets.put(new TopicPartition(record.topic(), record.kafkaPartition()),
                    new OffsetAndMetadata(record.kafkaOffset() + 1));
        messageBatch.clear();
        // If we had paused all consumer topic partitions to try to redeliver data, then we should resume any that
        // the task had not explicitly paused
        if (pausedForRedelivery) {
            if (!shouldPause())
                resumeAll();
            pausedForRedelivery = false;
        }
    } catch (RetriableException e) {
        log.error("RetriableException from SinkTask {}:", id, e);
        // If we're retrying a previous batch, make sure we've paused all topic partitions so we don't get new data,
        // but will still be able to poll in order to handle user-requested timeouts, keep group membership, etc.
        pausedForRedelivery = true;
        pauseAll();
        // Let this exit normally, the batch will be reprocessed on the next loop.
    } catch (Throwable t) {
        log.error("Task {} threw an uncaught and unrecoverable exception", id, t);
        log.error("Task is being killed and will not recover until manually restarted");
        throw new ConnectException("Exiting WorkerSinkTask due to unrecoverable exception.");
    }
}
 
Developer: YMCoding, Project: kafka-0.11.0.0-src-with-comment, Lines: 30, Source: WorkerSinkTask.java
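This is the framework side of the contract used throughout the earlier examples: when a SinkTask's put() throws RetriableException, WorkerSinkTask pauses the consumer's topic partitions and redelivers the same batch on its next iteration, resuming once delivery succeeds. The test in Example 15 below exercises exactly this behavior.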


Example 15: testPollRedelivery

import org.apache.kafka.connect.errors.RetriableException; // import the required package/class
@Test
public void testPollRedelivery() throws Exception {
    expectInitializeTask();
    expectPollInitialAssignment();

    // If a retriable exception is thrown, we should redeliver the same batch, pausing the consumer in the meantime
    expectConsumerPoll(1);
    expectConversionAndTransformation(1);
    Capture<Collection<SinkRecord>> records = EasyMock.newCapture(CaptureType.ALL);
    sinkTask.put(EasyMock.capture(records));
    EasyMock.expectLastCall().andThrow(new RetriableException("retry"));
    // Pause
    HashSet<TopicPartition> partitions = new HashSet<>(asList(TOPIC_PARTITION, TOPIC_PARTITION2));
    EasyMock.expect(consumer.assignment()).andReturn(partitions);
    consumer.pause(partitions);
    PowerMock.expectLastCall();

    // Retry delivery should succeed
    expectConsumerPoll(0);
    sinkTask.put(EasyMock.capture(records));
    EasyMock.expectLastCall();
    // And unpause
    EasyMock.expect(consumer.assignment()).andReturn(partitions);
    consumer.resume(singleton(TOPIC_PARTITION));
    PowerMock.expectLastCall();
    consumer.resume(singleton(TOPIC_PARTITION2));
    PowerMock.expectLastCall();

    PowerMock.replayAll();

    workerTask.initialize(TASK_CONFIG);
    workerTask.initializeAndStart();
    workerTask.iteration();
    workerTask.iteration();
    workerTask.iteration();

    PowerMock.verifyAll();
}
 
Developer: YMCoding, Project: kafka-0.11.0.0-src-with-comment, Lines: 39, Source: WorkerSinkTaskTest.java


Example 16: put

import org.apache.kafka.connect.errors.RetriableException; // import the required package/class
@Override
public void put(Collection<SinkRecord> collection) {
  try {
    for (SinkRecord record : collection) {
      appendRecord(record);
    }
  } catch (IOException e) {
    LOGGER.warn("Failed to append records. Retrying batch.", e);
    throw new RetriableException(e);
  }
}
 
Developer: lepfhty, Project: kafka-connect-redshift, Lines: 12, Source: RedshiftSinkTask.java


Example 17: flush

import org.apache.kafka.connect.errors.RetriableException; // import the required package/class
@Override
public void flush(Map<TopicPartition, OffsetAndMetadata> offsets) throws ConnectException {
  // Don't rely on offsets passed. They have some quirks like including topic partitions that just
  // got revoked (i.e. we have deleted the writer already). Not sure if this is intended...
  // https://twitter.com/mr_paul_banks/status/702493772983177218

  // Instead iterate over the writers we do have and get the offsets directly from them.
  for (Map.Entry<TopicPartition, BlockGZIPFileWriter> entry : tmpFiles.entrySet()) {
    TopicPartition tp = entry.getKey();
    BlockGZIPFileWriter writer = entry.getValue();
    if (writer.getNumRecords() == 0) {
      // Not done anything yet
      log.info("No new records for partition {}", tp);
      continue;
    }
    try {
      writer.close();

      long nextOffset = s3.putChunk(writer.getDataFilePath(), writer.getIndexFilePath(), tp);

      // Now reset writer to a new one
      tmpFiles.put(tp, this.createNextBlockWriter(tp, nextOffset));
      log.info("Successfully uploaded chunk for {} now at offset {}", tp, nextOffset);
    } catch (FileNotFoundException fnf) {
      throw new ConnectException("Failed to find local dir for temp files", fnf);
    } catch (IOException e) {
      throw new RetriableException("Failed S3 upload", e);
    }
  }
}
 
Developer: DeviantArt, Project: kafka-connect-s3, Lines: 31, Source: S3SinkTask.java


Example 18: put

import org.apache.kafka.connect.errors.RetriableException; // import the required package/class
/**
 * Put the records in the sink. Usually this should send the records to the sink asynchronously
 * and immediately return.
 * <p>
 * If this operation fails, the SinkTask may throw a {@link RetriableException} to
 * indicate that the framework should attempt to retry the same call again. Other exceptions will cause the task to
 * be stopped immediately. {@link org.apache.kafka.connect.sink.SinkTaskContext#timeout(long)} can be used to set the maximum time before the
 * batch will be retried.
 *
 * @param records the set of records to send
 */
@Override
public void put(Collection<SinkRecord> records) {
  if (records.isEmpty()) { return; }

  if (log.isTraceEnabled()) {
    final SinkRecord first = records.iterator().next();
    final int recordsCount = records.size();
    log.trace("Received {} records. First record kafka coordinates:({}-{}-{}). Writing them to Kudu...",
      recordsCount, first.topic(), first.kafkaPartition(), first.kafkaOffset());
  }

  try {
    writer.write(records);
  } catch (KuduException ke) {
    if (ke instanceof PleaseThrottleException) {
      log.warn("Write of {} records failed. Kudu asks to throttle. Will retry.", records.size(), ke);
      throw new RetriableException(ke);
    } else {
      log.warn("Write of {} records failed, remainingRetries={}", records.size(), remainingRetries, ke);
      if (remainingRetries == 0) {
        throw new ConnectException(ke);
      }
      else {
        writer.close();
        initWriter();
        remainingRetries--;
        context.timeout(config.retryBackoffMs);
        throw new RetriableException(ke);
      }
    }
  }
  remainingRetries = config.maxRetries;
}
 
Developer: onfocusio, Project: kafka-connect-kudu, Lines: 45, Source: KuduSinkTask.java


Example 19: handleException

import org.apache.kafka.connect.errors.RetriableException; // import the required package/class
/**
 * Handles exceptions from MQ. Some JMS exceptions are treated as retriable meaning that the
 * connector can keep running and just trying again is likely to fix things.
 *
 * @throws RetriableException Operation failed, but connector should continue to retry.
 * @throws ConnectException   Operation failed and connector should stop.
 */
private void handleException(Throwable exc) throws ConnectException, RetriableException {
    boolean isRetriable = false;
    boolean mustClose = true;
    int reason = -1;

    // Try to extract the MQ reason code to see if it's a retriable exception
    Throwable t = exc.getCause();
    while (t != null) {
        if (t instanceof MQException) {
            MQException mqe = (MQException)t;
            log.error("MQ error: CompCode {}, Reason {}", mqe.getCompCode(), mqe.getReason());
            reason = mqe.getReason();
            break;
        }
        t = t.getCause();
    }

    switch (reason)
    {
        // These reason codes indicate that the connection needs to be closed, but just retrying later
        // will probably recover
        case MQConstants.MQRC_BACKED_OUT:
        case MQConstants.MQRC_CHANNEL_NOT_AVAILABLE:
        case MQConstants.MQRC_CONNECTION_BROKEN:
        case MQConstants.MQRC_HOST_NOT_AVAILABLE:
        case MQConstants.MQRC_NOT_AUTHORIZED:
        case MQConstants.MQRC_Q_MGR_NOT_AVAILABLE:
        case MQConstants.MQRC_Q_MGR_QUIESCING:
        case MQConstants.MQRC_Q_MGR_STOPPING:
        case MQConstants.MQRC_UNEXPECTED_ERROR:
            isRetriable = true;
            break;

        // These reason codes indicate that the connection is still OK, but just retrying later
        // will probably recover - possibly with administrative action on the queue manager
        case MQConstants.MQRC_Q_FULL:
        case MQConstants.MQRC_PUT_INHIBITED:
            isRetriable = true;
            mustClose = false;
            break;
    }

    if (mustClose) {
        close();
    }

    if (isRetriable) {
        throw new RetriableException(exc);
    }
    throw new ConnectException(exc);
}
 
Developer: ibm-messaging, Project: kafka-connect-mq-sink, Lines: 59, Source: JMSWriter.java


Example 20: receive

import org.apache.kafka.connect.errors.RetriableException; // import the required package/class
/**
 * Receives a message from MQ. Adds the message to the current transaction. Reconnects to MQ if required.
 *
 * @param wait                Whether to wait indefinitely for a message
 *
 * @return The SourceRecord representing the message
 *
 * @throws RetriableException Operation failed, but connector should continue to retry.
 * @throws ConnectException   Operation failed and connector should stop.
 */
public SourceRecord receive(boolean wait) throws ConnectException, RetriableException {
    connectInternal();

    Message m = null;
    SourceRecord sr = null;
    try {
        if (wait) {
            while ((m == null) && !closeNow.get())
            {
                log.trace("Waiting {} ms for message", RECEIVE_TIMEOUT);
                m = jmsCons.receive(RECEIVE_TIMEOUT);
            }

            if (m == null) {
                log.trace("No message received");
            }
        }
        else {
            m = jmsCons.receiveNoWait();
        }

        if (m != null) {
            inflight = true;

            // We've received a message in a transacted session so we must only permit the transaction
            // to commit once we've passed it on to Kafka. Temporarily mark the transaction as "in-peril"
            // so that any exception thrown will result in the transaction rolling back instead of committing.
            inperil = true;
            
            sr = builder.toSourceRecord(jmsCtxt, topic, messageBodyJms, m);
            inperil = false;
        }
    }
    catch (JMSException | JMSRuntimeException | ConnectException exc) {
        log.debug("JMS exception {}", exc);
        handleException(exc);
    }

    return sr;
}
 
Developer: ibm-messaging, Project: kafka-connect-mq-source, Lines: 51, Source: JMSReader.java



Note: The org.apache.kafka.connect.errors.RetriableException examples in this article were collected from open-source projects hosted on GitHub, MSDocs, and similar source-code and documentation platforms. Copyright of each snippet remains with its original authors; consult the corresponding project's license before using or redistributing the code. Please do not reproduce this article without permission.

