This article collects typical usage examples of the Java class com.amazonaws.services.kinesis.model.GetRecordsResult. If you are wondering how GetRecordsResult is used in practice, the selected examples below may help.
The GetRecordsResult class belongs to the com.amazonaws.services.kinesis.model package. The 20 code examples shown below are sorted by popularity by default.
Example 1: noSourceOffsets
import com.amazonaws.services.kinesis.model.GetRecordsResult; // import the required package/class
@Test
public void noSourceOffsets() throws InterruptedException {
    when(this.kinesisClient.getShardIterator(any())).thenReturn(
        new GetShardIteratorResult().withShardIterator("dfasdfsadfasdf")
    );
    this.task.start(settings);
    GetRecordsResult recordsResult = new GetRecordsResult()
        .withNextShardIterator("dsfargadsfasdfasda")
        .withRecords(TestData.record())
        .withMillisBehindLatest(0L);
    when(this.kinesisClient.getRecords(any())).thenReturn(recordsResult);
    List<SourceRecord> records = this.task.poll();
    assertNotNull(records, "records should not be null.");
    assertFalse(records.isEmpty(), "records should not be empty.");
    verify(this.kinesisClient, atLeastOnce()).getShardIterator(any());
    verify(this.kinesisClient, atLeastOnce()).getRecords(any());
}
Developer: jcustenborder, Project: kafka-connect-kinesis, Lines of code: 23, Source: KinesisSourceTaskTest.java
Example 2: noRecords
import com.amazonaws.services.kinesis.model.GetRecordsResult; // import the required package/class
@Test
public void noRecords() throws InterruptedException {
    final String SEQUENCE_NUMBER = "asdfasdfddsa";
    Map<String, Object> sourceOffset = ImmutableMap.of(RecordConverter.FIELD_SEQUENCE_NUMBER, SEQUENCE_NUMBER);
    when(this.offsetStorageReader.offset(anyMap())).thenReturn(sourceOffset);
    when(this.kinesisClient.getShardIterator(any())).thenReturn(
        new GetShardIteratorResult().withShardIterator("dfasdfsadfasdf")
    );
    this.task.start(settings);
    GetRecordsResult recordsResult = new GetRecordsResult()
        .withNextShardIterator("dsfargadsfasdfasda")
        .withRecords(Arrays.asList())
        .withMillisBehindLatest(0L);
    when(this.kinesisClient.getRecords(any())).thenReturn(recordsResult);
    List<SourceRecord> records = this.task.poll();
    assertNotNull(records, "records should not be null");
    assertTrue(records.isEmpty(), "records should be empty.");
    verify(this.task.time, atLeastOnce()).sleep(this.config.kinesisEmptyRecordsBackoffMs);
}
Developer: jcustenborder, Project: kafka-connect-kinesis, Lines of code: 24, Source: KinesisSourceTaskTest.java
Example 3: getRecords
import com.amazonaws.services.kinesis.model.GetRecordsResult; // import the required package/class
/**
 * Gets records from Kinesis and deaggregates them if needed.
 *
 * @return list of deaggregated records
 * @throws TransientKinesisException - in case of recoverable situation
 */
public GetKinesisRecordsResult getRecords(final String shardIterator, final String streamName,
        final String shardId, final Integer limit)
        throws TransientKinesisException {
    return wrapExceptions(new Callable<GetKinesisRecordsResult>() {
        @Override
        public GetKinesisRecordsResult call() throws Exception {
            GetRecordsResult response = kinesis.getRecords(new GetRecordsRequest()
                .withShardIterator(shardIterator)
                .withLimit(limit));
            return new GetKinesisRecordsResult(
                UserRecord.deaggregate(response.getRecords()),
                response.getNextShardIterator(),
                response.getMillisBehindLatest(),
                streamName, shardId);
        }
    });
}
Developer: apache, Project: beam, Lines of code: 26, Source: SimplifiedKinesisClient.java
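The listing above calls wrapExceptions without showing it. The following is a minimal sketch of what such a wrapper could look like, using only the JDK. TransientException and the choice of IllegalStateException as the "recoverable" failure are hypothetical stand-ins, not Beam or AWS SDK types; the real helper presumably maps specific Kinesis exceptions instead.

```java
import java.util.concurrent.Callable;

class WrapExceptionsSketch {

    /** Hypothetical checked exception marking a failure that is safe to retry. */
    static class TransientException extends Exception {
        TransientException(String message, Throwable cause) {
            super(message, cause);
        }
    }

    /**
     * Runs the callable and translates failures: the assumed "recoverable" type becomes
     * TransientException, other unchecked exceptions pass through, and remaining checked
     * exceptions are wrapped in a RuntimeException.
     */
    static <T> T wrapExceptions(Callable<T> callable) throws TransientException {
        try {
            return callable.call();
        } catch (IllegalStateException e) {
            // Assumption: stands in for conditions such as throttling or an expired iterator.
            throw new TransientException("recoverable failure, retry later", e);
        } catch (RuntimeException e) {
            throw e;
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    public static void main(String[] args) throws TransientException {
        System.out.println(wrapExceptions(() -> "ok"));
        try {
            wrapExceptions(() -> {
                throw new IllegalStateException("throttled");
            });
        } catch (TransientException e) {
            System.out.println("transient: " + e.getCause().getMessage());
        }
    }
}
```

Callers can then retry on TransientException only, and treat everything else as fatal.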
Example 4: getRecords
import com.amazonaws.services.kinesis.model.GetRecordsResult; // import the required package/class
/**
 * Calls {@link KinesisProxyInterface#getRecords(String, int)}, while also handling unexpected
 * AWS {@link ExpiredIteratorException}s to ensure that we get results and don't just fail on
 * such occasions. The returned shard iterator within the successful {@link GetRecordsResult} should
 * be used for the next call to this method.
 *
 * <p>Note: it is important that this method is not called again before all the records from the last result have been
 * fully collected with {@link ShardConsumer#deserializeRecordForCollectionAndUpdateState(UserRecord)}, otherwise
 * {@link ShardConsumer#lastSequenceNum} may refer to a sub-record in the middle of an aggregated record, leading to
 * incorrect shard iteration if the iterator had to be refreshed.
 *
 * @param shardItr shard iterator to use
 * @param maxNumberOfRecords the maximum number of records to fetch for this getRecords attempt
 * @return get records result
 * @throws InterruptedException if the thread is interrupted while sleeping before a retry
 */
private GetRecordsResult getRecords(String shardItr, int maxNumberOfRecords) throws InterruptedException {
    GetRecordsResult getRecordsResult = null;
    while (getRecordsResult == null) {
        try {
            getRecordsResult = kinesis.getRecords(shardItr, maxNumberOfRecords);
            // Update millis behind latest so it gets reported by the millisBehindLatest gauge
            shardMetricsReporter.setMillisBehindLatest(getRecordsResult.getMillisBehindLatest());
        } catch (ExpiredIteratorException eiEx) {
            LOG.warn("Encountered an unexpected expired iterator {} for shard {};" +
                " refreshing the iterator ...", shardItr, subscribedShard);
            shardItr = kinesis.getShardIterator(subscribedShard, ShardIteratorType.AFTER_SEQUENCE_NUMBER.toString(), lastSequenceNum.getSequenceNumber());
            // sleep for the fetch interval before the next getRecords attempt with the refreshed iterator
            if (fetchIntervalMillis != 0) {
                Thread.sleep(fetchIntervalMillis);
            }
        }
    }
    return getRecordsResult;
}
Developer: axbaretto, Project: flink, Lines of code: 38, Source: ShardConsumer.java
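The refresh-on-expiry loop described in the Javadoc above can be tried out without the AWS SDK. The sketch below is an illustration only: IteratorExpired, Proxy, and the iterator strings are hypothetical stand-ins, and the sleep between attempts is left out for brevity.

```java
import java.util.Arrays;
import java.util.List;

class ExpiredIteratorRetrySketch {

    /** Hypothetical stand-in for the SDK's ExpiredIteratorException. */
    static class IteratorExpired extends RuntimeException {
    }

    /** Hypothetical stand-in for the proxy used in the listing above. */
    interface Proxy {
        List<String> getRecords(String iterator);

        String refreshIterator(String lastSequenceNumber);
    }

    /** Retries until a result arrives, requesting a fresh iterator whenever the current one has expired. */
    static List<String> fetch(Proxy proxy, String iterator, String lastSequenceNumber) {
        List<String> result = null;
        while (result == null) {
            try {
                result = proxy.getRecords(iterator);
            } catch (IteratorExpired e) {
                // Resume right after the last record that was fully processed.
                iterator = proxy.refreshIterator(lastSequenceNumber);
            }
        }
        return result;
    }

    /** Runs fetch against a fake proxy that rejects the iterator named "stale". */
    static List<String> demo() {
        Proxy proxy = new Proxy() {
            @Override
            public List<String> getRecords(String iterator) {
                if ("stale".equals(iterator)) {
                    throw new IteratorExpired();
                }
                return Arrays.asList("read-from-" + iterator);
            }

            @Override
            public String refreshIterator(String lastSequenceNumber) {
                return "fresh@" + lastSequenceNumber;
            }
        };
        return fetch(proxy, "stale", "41");
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

The key point is that the refreshed iterator is derived from the last fully processed sequence number, which is why the Javadoc warns against calling the method again before the previous batch has been collected.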
Example 5: noShardsFoundForRequestedStreamsBehaviour
import com.amazonaws.services.kinesis.model.GetRecordsResult; // import the required package/class
public static KinesisProxyInterface noShardsFoundForRequestedStreamsBehaviour() {
    return new KinesisProxyInterface() {
        @Override
        public GetShardListResult getShardList(Map<String, String> streamNamesWithLastSeenShardIds) {
            return new GetShardListResult(); // not setting any retrieved shards for result
        }

        @Override
        public String getShardIterator(StreamShardHandle shard, String shardIteratorType, Object startingMarker) {
            return null;
        }

        @Override
        public GetRecordsResult getRecords(String shardIterator, int maxRecordsToGet) {
            return null;
        }
    };
}
Developer: axbaretto, Project: flink, Lines of code: 21, Source: FakeKinesisBehavioursFactory.java
Example 6: getRecords
import com.amazonaws.services.kinesis.model.GetRecordsResult; // import the required package/class
@Override
public GetRecordsResult getRecords(String shardIterator, int maxRecordsToGet) {
    if ((Integer.valueOf(shardIterator) == orderOfCallToExpire - 1) && !expiredOnceAlready) {
        // we fake only once the expired iterator exception at the specified get records attempt order
        expiredOnceAlready = true;
        throw new ExpiredIteratorException("Artificial expired shard iterator");
    } else if (expiredOnceAlready && !expiredIteratorRefreshed) {
        // if we've thrown the expired iterator exception already, but the iterator was not refreshed,
        // throw a hard exception to the test that is testing this Kinesis behaviour
        throw new RuntimeException("expired shard iterator was not refreshed on the next getRecords() call");
    } else {
        // assuming that the maxRecordsToGet is always large enough
        return new GetRecordsResult()
            .withRecords(shardItrToRecordBatch.get(shardIterator))
            .withMillisBehindLatest(millisBehindLatest)
            .withNextShardIterator(
                (Integer.valueOf(shardIterator) == totalNumOfGetRecordsCalls - 1)
                    ? null : String.valueOf(Integer.valueOf(shardIterator) + 1)); // last next shard iterator is null
    }
}
Developer: axbaretto, Project: flink, Lines of code: 21, Source: FakeKinesisBehavioursFactory.java
Example 7: getRecords
import com.amazonaws.services.kinesis.model.GetRecordsResult; // import the required package/class
/**
 * Calls {@link KinesisProxyInterface#getRecords(String, int)}, while also handling unexpected
 * AWS {@link ExpiredIteratorException}s to ensure that we get results and don't just fail on
 * such occasions. The returned shard iterator within the successful {@link GetRecordsResult} should
 * be used for the next call to this method.
 *
 * Note: it is important that this method is not called again before all the records from the last result have been
 * fully collected with {@link ShardConsumer#deserializeRecordForCollectionAndUpdateState(UserRecord)}, otherwise
 * {@link ShardConsumer#lastSequenceNum} may refer to a sub-record in the middle of an aggregated record, leading to
 * incorrect shard iteration if the iterator had to be refreshed.
 *
 * @param shardItr shard iterator to use
 * @param maxNumberOfRecords the maximum number of records to fetch for this getRecords attempt
 * @return get records result
 * @throws InterruptedException if the thread is interrupted while sleeping before a retry
 */
private GetRecordsResult getRecords(String shardItr, int maxNumberOfRecords) throws InterruptedException {
    GetRecordsResult getRecordsResult = null;
    while (getRecordsResult == null) {
        try {
            getRecordsResult = kinesis.getRecords(shardItr, maxNumberOfRecords);
        } catch (ExpiredIteratorException eiEx) {
            LOG.warn("Encountered an unexpected expired iterator {} for shard {};" +
                " refreshing the iterator ...", shardItr, subscribedShard);
            shardItr = kinesis.getShardIterator(subscribedShard, ShardIteratorType.AFTER_SEQUENCE_NUMBER.toString(), lastSequenceNum.getSequenceNumber());
            // sleep for the fetch interval before the next getRecords attempt with the refreshed iterator
            if (fetchIntervalMillis != 0) {
                Thread.sleep(fetchIntervalMillis);
            }
        }
    }
    return getRecordsResult;
}
Developer: axbaretto, Project: flink, Lines of code: 35, Source: ShardConsumer.java
Example 8: noShardsFoundForRequestedStreamsBehaviour
import com.amazonaws.services.kinesis.model.GetRecordsResult; // import the required package/class
public static KinesisProxyInterface noShardsFoundForRequestedStreamsBehaviour() {
    return new KinesisProxyInterface() {
        @Override
        public GetShardListResult getShardList(Map<String, String> streamNamesWithLastSeenShardIds) {
            return new GetShardListResult(); // not setting any retrieved shards for result
        }

        @Override
        public String getShardIterator(KinesisStreamShard shard, String shardIteratorType, String startingSeqNum) {
            return null;
        }

        @Override
        public GetRecordsResult getRecords(String shardIterator, int maxRecordsToGet) {
            return null;
        }
    };
}
Developer: axbaretto, Project: flink, Lines of code: 21, Source: FakeKinesisBehavioursFactory.java
Example 9: getRecords
import com.amazonaws.services.kinesis.model.GetRecordsResult; // import the required package/class
@Override
public GetRecordsResult getRecords(String shardIterator, int maxRecordsToGet) {
    if ((Integer.valueOf(shardIterator) == orderOfCallToExpire - 1) && !expiredOnceAlready) {
        // we fake only once the expired iterator exception at the specified get records attempt order
        expiredOnceAlready = true;
        throw new ExpiredIteratorException("Artificial expired shard iterator");
    } else if (expiredOnceAlready && !expiredIteratorRefreshed) {
        // if we've thrown the expired iterator exception already, but the iterator was not refreshed,
        // throw a hard exception to the test that is testing this Kinesis behaviour
        throw new RuntimeException("expired shard iterator was not refreshed on the next getRecords() call");
    } else {
        // assuming that the maxRecordsToGet is always large enough
        return new GetRecordsResult()
            .withRecords(shardItrToRecordBatch.get(shardIterator))
            .withNextShardIterator(
                (Integer.valueOf(shardIterator) == totalNumOfGetRecordsCalls - 1)
                    ? null : String.valueOf(Integer.valueOf(shardIterator) + 1)); // last next shard iterator is null
    }
}
Developer: axbaretto, Project: flink, Lines of code: 20, Source: FakeKinesisBehavioursFactory.java
Example 10: poll
import com.amazonaws.services.kinesis.model.GetRecordsResult; // import the required package/class
@Override
protected int poll() throws Exception {
    GetRecordsRequest req = new GetRecordsRequest()
        .withShardIterator(getShardItertor())
        .withLimit(getEndpoint().getMaxResultsPerRequest());
    GetRecordsResult result = getClient().getRecords(req);
    Queue<Exchange> exchanges = createExchanges(result.getRecords());
    int processedExchangeCount = processBatch(CastUtils.cast(exchanges));
    // May cache the last successful sequence number and pass it to the
    // getRecords request. That way, on the next poll, we start from where
    // we left off. However, I don't know what happens to subsequent
    // exchanges when an earlier exchange fails.
    currentShardIterator = result.getNextShardIterator();
    return processedExchangeCount;
}
Developer: HydAu, Project: Camel, Lines of code: 20, Source: KinesisConsumer.java
Example 11: setup
import com.amazonaws.services.kinesis.model.GetRecordsResult; // import the required package/class
@Before
public void setup() throws Exception {
    KinesisEndpoint endpoint = new KinesisEndpoint(null, "streamName", component);
    endpoint.setAmazonKinesisClient(kinesisClient);
    endpoint.setIteratorType(ShardIteratorType.LATEST);
    undertest = new KinesisConsumer(endpoint, processor);
    when(kinesisClient.getRecords(any(GetRecordsRequest.class)))
        .thenReturn(new GetRecordsResult()
            .withNextShardIterator("nextShardIterator")
        );
    when(kinesisClient.describeStream(any(DescribeStreamRequest.class)))
        .thenReturn(new DescribeStreamResult()
            .withStreamDescription(new StreamDescription()
                .withShards(new Shard().withShardId("shardId"))
            )
        );
    when(kinesisClient.getShardIterator(any(GetShardIteratorRequest.class)))
        .thenReturn(new GetShardIteratorResult()
            .withShardIterator("shardIterator")
        );
}
Developer: HydAu, Project: Camel, Lines of code: 23, Source: KinesisConsumerTest.java
Example 12: recordsAreSentToTheProcessor
import com.amazonaws.services.kinesis.model.GetRecordsResult; // import the required package/class
@Test
public void recordsAreSentToTheProcessor() throws Exception {
    when(kinesisClient.getRecords(any(GetRecordsRequest.class)))
        .thenReturn(new GetRecordsResult()
            .withNextShardIterator("nextShardIterator")
            .withRecords(new Record().withSequenceNumber("1"), new Record().withSequenceNumber("2"))
        );
    int messageCount = undertest.poll();
    assertThat(messageCount, is(2));
    final ArgumentCaptor<Exchange> exchangeCaptor = ArgumentCaptor.forClass(Exchange.class);
    verify(processor, times(2)).process(exchangeCaptor.capture(), any(AsyncCallback.class));
    assertThat(exchangeCaptor.getAllValues().get(0).getIn().getBody(Record.class).getSequenceNumber(), is("1"));
    assertThat(exchangeCaptor.getAllValues().get(1).getIn().getBody(Record.class).getSequenceNumber(), is("2"));
}
Developer: HydAu, Project: Camel, Lines of code: 18, Source: KinesisConsumerTest.java
Example 13: exchangePropertiesAreSet
import com.amazonaws.services.kinesis.model.GetRecordsResult; // import the required package/class
@Test
public void exchangePropertiesAreSet() throws Exception {
    String partitionKey = "partitionKey";
    String sequenceNumber = "1";
    when(kinesisClient.getRecords(any(GetRecordsRequest.class)))
        .thenReturn(new GetRecordsResult()
            .withNextShardIterator("nextShardIterator")
            .withRecords(new Record()
                .withSequenceNumber(sequenceNumber)
                .withApproximateArrivalTimestamp(new Date(42))
                .withPartitionKey(partitionKey)
            )
        );
    undertest.poll();
    final ArgumentCaptor<Exchange> exchangeCaptor = ArgumentCaptor.forClass(Exchange.class);
    verify(processor).process(exchangeCaptor.capture(), any(AsyncCallback.class));
    assertThat(exchangeCaptor.getValue().getIn().getHeader(KinesisConstants.APPROX_ARRIVAL_TIME, long.class), is(42L));
    assertThat(exchangeCaptor.getValue().getIn().getHeader(KinesisConstants.PARTITION_KEY, String.class), is(partitionKey));
    assertThat(exchangeCaptor.getValue().getIn().getHeader(KinesisConstants.SEQUENCE_NUMBER, String.class), is(sequenceNumber));
}
Developer: HydAu, Project: Camel, Lines of code: 24, Source: KinesisConsumerTest.java
Example 14: processNextIterator
import com.amazonaws.services.kinesis.model.GetRecordsResult; // import the required package/class
public String processNextIterator(String iterator)
{
    GetRecordsRequest getRequest = new GetRecordsRequest();
    getRequest.setLimit(1000);
    getRequest.setShardIterator(iterator);
    // call "get" operation and get everything in this shard range
    GetRecordsResult getResponse = client.getRecords(getRequest);
    iterator = getResponse.getNextShardIterator();
    List<Record> records = getResponse.getRecords();
    processResponseRecords(records);
    return iterator;
}
Developer: apache, Project: apex-malhar, Lines of code: 17, Source: KinesisTestConsumer.java
Example 15: getPreviewRecords
import com.amazonaws.services.kinesis.model.GetRecordsResult; // import the required package/class
public static List<com.amazonaws.services.kinesis.model.Record> getPreviewRecords(
    ClientConfiguration awsClientConfig,
    KinesisConfigBean conf,
    int maxBatchSize,
    GetShardIteratorRequest getShardIteratorRequest
) throws StageException {
    AmazonKinesis kinesisClient = getKinesisClient(awsClientConfig, conf);
    GetShardIteratorResult getShardIteratorResult = kinesisClient.getShardIterator(getShardIteratorRequest);
    String shardIterator = getShardIteratorResult.getShardIterator();
    GetRecordsRequest getRecordsRequest = new GetRecordsRequest();
    getRecordsRequest.setShardIterator(shardIterator);
    getRecordsRequest.setLimit(maxBatchSize);
    GetRecordsResult getRecordsResult = kinesisClient.getRecords(getRecordsRequest);
    return getRecordsResult.getRecords();
}
Developer: streamsets, Project: datacollector, Lines of code: 19, Source: KinesisUtil.java
Example 16: poll
import com.amazonaws.services.kinesis.model.GetRecordsResult; // import the required package/class
@Override
public List<SourceRecord> poll() throws InterruptedException {
    List<SourceRecord> records;
    try {
        GetRecordsResult recordsResult = this.kinesisClient.getRecords(this.recordsRequest);
        records = new ArrayList<>(recordsResult.getRecords().size());
        // Pass one argument per placeholder: the record count, then the shard id.
        log.trace("poll() - {} record(s) returned from shard {}.", recordsResult.getRecords().size(), this.config.kinesisShardId);
        for (Record record : recordsResult.getRecords()) {
            SourceRecord sourceRecord = this.recordConverter.sourceRecord(this.config.kinesisStreamName, this.config.kinesisShardId, record);
            records.add(sourceRecord);
        }
        log.trace("poll() - Changing shard iterator to {}", recordsResult.getNextShardIterator());
        this.recordsRequest.setShardIterator(recordsResult.getNextShardIterator());
    } catch (ProvisionedThroughputExceededException ex) {
        log.warn("poll() - Throughput exceeded sleeping {} ms", this.config.kinesisThroughputExceededBackoffMs, ex);
        this.time.sleep(this.config.kinesisThroughputExceededBackoffMs);
        return new ArrayList<>();
    }
    if (records.isEmpty()) {
        log.trace("poll() - No records returned. Sleeping {} ms.", this.config.kinesisEmptyRecordsBackoffMs);
        this.time.sleep(this.config.kinesisEmptyRecordsBackoffMs);
    }
    return records;
}
Developer: jcustenborder, Project: kafka-connect-kinesis, Lines of code: 30, Source: KinesisSourceTask.java
Example 17: sourceOffsets
import com.amazonaws.services.kinesis.model.GetRecordsResult; // import the required package/class
@Test
public void sourceOffsets() throws InterruptedException {
    final String SEQUENCE_NUMBER = "asdfasdfddsa";
    Map<String, Object> sourceOffset = ImmutableMap.of(RecordConverter.FIELD_SEQUENCE_NUMBER, SEQUENCE_NUMBER);
    when(this.offsetStorageReader.offset(anyMap())).thenReturn(sourceOffset);
    when(this.kinesisClient.getShardIterator(any())).thenReturn(
        new GetShardIteratorResult().withShardIterator("dfasdfsadfasdf")
    );
    this.task.start(settings);
    GetRecordsResult recordsResult = new GetRecordsResult()
        .withNextShardIterator("dsfargadsfasdfasda")
        .withRecords(TestData.record())
        .withMillisBehindLatest(0L);
    when(this.kinesisClient.getRecords(any())).thenReturn(recordsResult);
    List<SourceRecord> records = this.task.poll();
    assertNotNull(records, "records should not be null.");
    assertFalse(records.isEmpty(), "records should not be empty.");
    verify(this.offsetStorageReader, atLeastOnce()).offset(anyMap());
    GetShardIteratorRequest expectedIteratorRequest = new GetShardIteratorRequest()
        .withShardIteratorType(ShardIteratorType.AFTER_SEQUENCE_NUMBER)
        .withShardId(this.config.kinesisShardId)
        .withStreamName(this.config.kinesisStreamName)
        .withStartingSequenceNumber(SEQUENCE_NUMBER);
    verify(this.kinesisClient, atLeastOnce()).getShardIterator(expectedIteratorRequest);
}
Developer: jcustenborder, Project: kafka-connect-kinesis, Lines of code: 33, Source: KinesisSourceTaskTest.java
Example 18: getRecords
import com.amazonaws.services.kinesis.model.GetRecordsResult; // import the required package/class
@Override
public GetRecordsResult getRecords(GetRecordsRequest getRecordsRequest) {
    String[] shardIteratorParts = getRecordsRequest.getShardIterator().split(":");
    int shardId = parseInt(shardIteratorParts[0]);
    int startingRecord = parseInt(shardIteratorParts[1]);
    List<Record> shardData = shardedData.get(shardId);
    int toIndex = min(startingRecord + numberOfRecordsPerGet, shardData.size());
    int fromIndex = min(startingRecord, toIndex);
    return new GetRecordsResult()
        .withRecords(shardData.subList(fromIndex, toIndex))
        .withNextShardIterator(String.format("%s:%s", shardId, toIndex))
        .withMillisBehindLatest(0L);
}
Developer: apache, Project: beam, Lines of code: 15, Source: AmazonKinesisMock.java
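The mock above encodes its position in the iterator string as "shardId:index". The following self-contained sketch reproduces that paging scheme with plain JDK types so it can be run without the AWS SDK. Page, getRecords, and drain are hypothetical names chosen for this illustration, and stopping on an empty page is a simplification that suits this mock, not a rule for real Kinesis streams.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class IteratorPagingSketch {

    /** Hypothetical stand-in for GetRecordsResult: one page plus the iterator for the next page. */
    static class Page {
        final List<String> records;
        final String nextIterator;

        Page(List<String> records, String nextIterator) {
            this.records = records;
            this.nextIterator = nextIterator;
        }
    }

    /** Returns up to recordsPerGet records starting at the index encoded in the iterator. */
    static Page getRecords(List<List<String>> shardedData, String iterator, int recordsPerGet) {
        String[] parts = iterator.split(":");
        int shardId = Integer.parseInt(parts[0]);
        int start = Integer.parseInt(parts[1]);
        List<String> shard = shardedData.get(shardId);
        int toIndex = Math.min(start + recordsPerGet, shard.size());
        int fromIndex = Math.min(start, toIndex);
        return new Page(shard.subList(fromIndex, toIndex), shardId + ":" + toIndex);
    }

    /** Follows the returned iterators until a page comes back empty. */
    static List<String> drain(List<List<String>> shardedData, int shardId, int recordsPerGet) {
        List<String> all = new ArrayList<>();
        String iterator = shardId + ":0";
        while (true) {
            Page page = getRecords(shardedData, iterator, recordsPerGet);
            if (page.records.isEmpty()) {
                break;
            }
            all.addAll(page.records);
            iterator = page.nextIterator;
        }
        return all;
    }

    public static void main(String[] args) {
        List<List<String>> data = Arrays.asList(Arrays.asList("a", "b", "c", "d", "e"));
        System.out.println(drain(data, 0, 2));
    }
}
```

Clamping both indices with Math.min means an iterator that points past the end of the shard yields an empty page instead of an IndexOutOfBoundsException.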
Example 19: getRecords
import com.amazonaws.services.kinesis.model.GetRecordsResult; // import the required package/class
/**
 * {@inheritDoc}
 */
@Override
public GetRecordsResult getRecords(String shardIterator, int maxRecordsToGet) throws InterruptedException {
    final GetRecordsRequest getRecordsRequest = new GetRecordsRequest();
    getRecordsRequest.setShardIterator(shardIterator);
    getRecordsRequest.setLimit(maxRecordsToGet);
    GetRecordsResult getRecordsResult = null;
    int attempt = 0;
    while (attempt <= getRecordsMaxAttempts && getRecordsResult == null) {
        try {
            getRecordsResult = kinesisClient.getRecords(getRecordsRequest);
        } catch (AmazonServiceException ex) {
            if (isRecoverableException(ex)) {
                long backoffMillis = fullJitterBackoff(
                    getRecordsBaseBackoffMillis, getRecordsMaxBackoffMillis, getRecordsExpConstant, attempt++);
                LOG.warn("Got recoverable AmazonServiceException. Backing off for "
                    + backoffMillis + " millis (" + ex.getErrorMessage() + ")");
                Thread.sleep(backoffMillis);
            } else {
                throw ex;
            }
        }
    }
    if (getRecordsResult == null) {
        // Any recoverable AmazonServiceException can exhaust the retries, not only throttling.
        throw new RuntimeException("Retries exceeded for getRecords operation - all " + getRecordsMaxAttempts +
            " retry attempts failed.");
    }
    return getRecordsResult;
}
Developer: axbaretto, Project: flink, Lines of code: 36, Source: KinesisProxy.java
Example 20: getRecords
import com.amazonaws.services.kinesis.model.GetRecordsResult; // import the required package/class
/**
 * {@inheritDoc}
 */
@Override
public GetRecordsResult getRecords(String shardIterator, int maxRecordsToGet) throws InterruptedException {
    final GetRecordsRequest getRecordsRequest = new GetRecordsRequest();
    getRecordsRequest.setShardIterator(shardIterator);
    getRecordsRequest.setLimit(maxRecordsToGet);
    GetRecordsResult getRecordsResult = null;
    int attempt = 0;
    while (attempt <= getRecordsMaxAttempts && getRecordsResult == null) {
        try {
            getRecordsResult = kinesisClient.getRecords(getRecordsRequest);
        } catch (ProvisionedThroughputExceededException ex) {
            long backoffMillis = fullJitterBackoff(
                getRecordsBaseBackoffMillis, getRecordsMaxBackoffMillis, getRecordsExpConstant, attempt++);
            LOG.warn("Got ProvisionedThroughputExceededException. Backing off for "
                + backoffMillis + " millis.");
            Thread.sleep(backoffMillis);
        }
    }
    if (getRecordsResult == null) {
        throw new RuntimeException("Rate Exceeded for getRecords operation - all " + getRecordsMaxAttempts +
            " retry attempts returned ProvisionedThroughputExceededException.");
    }
    return getRecordsResult;
}
Developer: axbaretto, Project: flink, Lines of code: 32, Source: KinesisProxy.java
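Both KinesisProxy listings call fullJitterBackoff(base, max, expConstant, attempt) without showing its body. The sketch below shows one common way to implement full-jitter exponential backoff; it is an assumption about the helper's behaviour, not the project's code, and the extra Random parameter is added here only to make the sketch deterministic to test.

```java
import java.util.Random;

class FullJitterBackoffSketch {

    /**
     * Assumed behaviour: cap base * power^attempt at max, then pick a uniformly
     * random delay between 0 (inclusive) and that cap (exclusive).
     */
    static long fullJitterBackoff(long base, long max, double power, int attempt, Random random) {
        long cap = (long) Math.min(max, base * Math.pow(power, attempt));
        return (long) (random.nextDouble() * cap);
    }

    public static void main(String[] args) {
        Random random = new Random(42L);
        for (int attempt = 0; attempt < 5; attempt++) {
            long delay = fullJitterBackoff(300L, 1000L, 1.5, attempt, random);
            System.out.println("attempt " + attempt + " -> " + delay + " ms");
        }
    }
}
```

Randomizing over the whole interval, instead of adding a small jitter to a fixed exponential delay, spreads retries from many throttled consumers so they are less likely to hit the shard at the same moment.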
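Note: The com.amazonaws.services.kinesis.model.GetRecordsResult examples in this article were compiled from source code and documentation hosting platforms such as GitHub and MSDocs. The snippets were selected from open-source projects, and copyright of the source code remains with the original authors. For distribution and use, refer to the license of the corresponding project. Do not reproduce without permission.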