• 设为首页
  • 点击收藏
  • 手机版
    手机扫一扫访问
    迪恩网络手机版
  • 关注官方公众号
    微信扫一扫关注
    公众号

Java GetRecordsResult类代码示例

原作者: [db:作者] 来自: [db:来源] 收藏 邀请

本文整理汇总了Java中com.amazonaws.services.kinesis.model.GetRecordsResult的典型用法代码示例。如果您正苦于以下问题:Java GetRecordsResult类的具体用法?Java GetRecordsResult怎么用?Java GetRecordsResult使用的例子?那么恭喜您, 这里精选的类代码示例或许可以为您提供帮助。



GetRecordsResult类属于com.amazonaws.services.kinesis.model包,在下文中一共展示了GetRecordsResult类的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Java代码示例。

示例1: noSourceOffsets

import com.amazonaws.services.kinesis.model.GetRecordsResult; //导入依赖的package包/类
@Test
public void noSourceOffsets() throws InterruptedException {
  when(this.kinesisClient.getShardIterator(any())).thenReturn(
      new GetShardIteratorResult().withShardIterator("dfasdfsadfasdf")
  );
  this.task.start(settings);

  GetRecordsResult recordsResult = new GetRecordsResult()
      .withNextShardIterator("dsfargadsfasdfasda")
      .withRecords(TestData.record())
      .withMillisBehindLatest(0L);

  when(this.kinesisClient.getRecords(any())).thenReturn(recordsResult);

  List<SourceRecord> records = this.task.poll();

  assertNotNull(records, "records should not be null.");
  assertFalse(records.isEmpty(), "records should not be empty.");

  verify(this.kinesisClient, atLeastOnce()).getShardIterator(any());
  verify(this.kinesisClient, atLeastOnce()).getRecords(any());
}
 
开发者ID:jcustenborder,项目名称:kafka-connect-kinesis,代码行数:23,代码来源:KinesisSourceTaskTest.java


示例2: noRecords

import com.amazonaws.services.kinesis.model.GetRecordsResult; //导入依赖的package包/类
@Test
public void noRecords() throws InterruptedException {
  final String SEQUENCE_NUMBER = "asdfasdfddsa";
  Map<String, Object> sourceOffset = ImmutableMap.of(RecordConverter.FIELD_SEQUENCE_NUMBER, SEQUENCE_NUMBER);
  when(this.offsetStorageReader.offset(anyMap())).thenReturn(sourceOffset);
  when(this.kinesisClient.getShardIterator(any())).thenReturn(
      new GetShardIteratorResult().withShardIterator("dfasdfsadfasdf")
  );
  this.task.start(settings);

  GetRecordsResult recordsResult = new GetRecordsResult()
      .withNextShardIterator("dsfargadsfasdfasda")
      .withRecords(Arrays.asList())
      .withMillisBehindLatest(0L);

  when(this.kinesisClient.getRecords(any())).thenReturn(recordsResult);

  List<SourceRecord> records = this.task.poll();
  assertNotNull(records, "records should not be null");
  assertTrue(records.isEmpty(), "records should be empty.");

  verify(this.task.time, atLeastOnce()).sleep(this.config.kinesisEmptyRecordsBackoffMs);
}
 
开发者ID:jcustenborder,项目名称:kafka-connect-kinesis,代码行数:24,代码来源:KinesisSourceTaskTest.java


示例3: getRecords

import com.amazonaws.services.kinesis.model.GetRecordsResult; //导入依赖的package包/类
/**
 * Gets records from Kinesis and deaggregates them if needed.
 *
 * @return list of deaggregated records
 * @throws TransientKinesisException - in case of recoverable situation
 */
public GetKinesisRecordsResult getRecords(final String shardIterator, final String streamName,
    final String shardId, final Integer limit)
    throws
    TransientKinesisException {
  return wrapExceptions(new Callable<GetKinesisRecordsResult>() {

    @Override
    public GetKinesisRecordsResult call() throws Exception {
      GetRecordsResult response = kinesis.getRecords(new GetRecordsRequest()
          .withShardIterator(shardIterator)
          .withLimit(limit));
      return new GetKinesisRecordsResult(
          UserRecord.deaggregate(response.getRecords()),
          response.getNextShardIterator(),
          response.getMillisBehindLatest(),
          streamName, shardId);
    }
  });
}
 
开发者ID:apache,项目名称:beam,代码行数:26,代码来源:SimplifiedKinesisClient.java


示例4: getRecords

import com.amazonaws.services.kinesis.model.GetRecordsResult; //导入依赖的package包/类
/**
 * Calls {@link KinesisProxyInterface#getRecords(String, int)}, while also handling unexpected
 * AWS {@link ExpiredIteratorException}s to assure that we get results and don't just fail on
 * such occasions. The returned shard iterator within the successful {@link GetRecordsResult} should
 * be used for the next call to this method.
 *
 * <p>Note: it is important that this method is not called again before all the records from the last result have been
 * fully collected with {@link ShardConsumer#deserializeRecordForCollectionAndUpdateState(UserRecord)}, otherwise
 * {@link ShardConsumer#lastSequenceNum} may refer to a sub-record in the middle of an aggregated record, leading to
 * incorrect shard iteration if the iterator had to be refreshed.
 *
 * @param shardItr shard iterator to use
 * @param maxNumberOfRecords the maximum number of records to fetch for this getRecords attempt
 * @return get records result
 * @throws InterruptedException
 */
private GetRecordsResult getRecords(String shardItr, int maxNumberOfRecords) throws InterruptedException {
	GetRecordsResult getRecordsResult = null;
	while (getRecordsResult == null) {
		try {
			getRecordsResult = kinesis.getRecords(shardItr, maxNumberOfRecords);

			// Update millis behind latest so it gets reported by the millisBehindLatest gauge
			shardMetricsReporter.setMillisBehindLatest(getRecordsResult.getMillisBehindLatest());
		} catch (ExpiredIteratorException eiEx) {
			LOG.warn("Encountered an unexpected expired iterator {} for shard {};" +
				" refreshing the iterator ...", shardItr, subscribedShard);
			shardItr = kinesis.getShardIterator(subscribedShard, ShardIteratorType.AFTER_SEQUENCE_NUMBER.toString(), lastSequenceNum.getSequenceNumber());

			// sleep for the fetch interval before the next getRecords attempt with the refreshed iterator
			if (fetchIntervalMillis != 0) {
				Thread.sleep(fetchIntervalMillis);
			}
		}
	}
	return getRecordsResult;
}
 
开发者ID:axbaretto,项目名称:flink,代码行数:38,代码来源:ShardConsumer.java


示例5: noShardsFoundForRequestedStreamsBehaviour

import com.amazonaws.services.kinesis.model.GetRecordsResult; //导入依赖的package包/类
public static KinesisProxyInterface noShardsFoundForRequestedStreamsBehaviour() {

		return new KinesisProxyInterface() {
			@Override
			public GetShardListResult getShardList(Map<String, String> streamNamesWithLastSeenShardIds) {
				return new GetShardListResult(); // not setting any retrieved shards for result
			}

			@Override
			public String getShardIterator(StreamShardHandle shard, String shardIteratorType, Object startingMarker) {
				return null;
			}

			@Override
			public GetRecordsResult getRecords(String shardIterator, int maxRecordsToGet) {
				return null;
			}
		};

	}
 
开发者ID:axbaretto,项目名称:flink,代码行数:21,代码来源:FakeKinesisBehavioursFactory.java


示例6: getRecords

import com.amazonaws.services.kinesis.model.GetRecordsResult; //导入依赖的package包/类
@Override
public GetRecordsResult getRecords(String shardIterator, int maxRecordsToGet) {
	if ((Integer.valueOf(shardIterator) == orderOfCallToExpire - 1) && !expiredOnceAlready) {
		// we fake only once the expired iterator exception at the specified get records attempt order
		expiredOnceAlready = true;
		throw new ExpiredIteratorException("Artificial expired shard iterator");
	} else if (expiredOnceAlready && !expiredIteratorRefreshed) {
		// if we've thrown the expired iterator exception already, but the iterator was not refreshed,
		// throw a hard exception to the test that is testing this Kinesis behaviour
		throw new RuntimeException("expired shard iterator was not refreshed on the next getRecords() call");
	} else {
		// assuming that the maxRecordsToGet is always large enough
		return new GetRecordsResult()
			.withRecords(shardItrToRecordBatch.get(shardIterator))
			.withMillisBehindLatest(millisBehindLatest)
			.withNextShardIterator(
				(Integer.valueOf(shardIterator) == totalNumOfGetRecordsCalls - 1)
					? null : String.valueOf(Integer.valueOf(shardIterator) + 1)); // last next shard iterator is null
	}
}
 
开发者ID:axbaretto,项目名称:flink,代码行数:21,代码来源:FakeKinesisBehavioursFactory.java


示例7: getRecords

import com.amazonaws.services.kinesis.model.GetRecordsResult; //导入依赖的package包/类
/**
 * Calls {@link KinesisProxyInterface#getRecords(String, int)}, while also handling unexpected
 * AWS {@link ExpiredIteratorException}s to assure that we get results and don't just fail on
 * such occasions. The returned shard iterator within the successful {@link GetRecordsResult} should
 * be used for the next call to this method.
 *
 * Note: it is important that this method is not called again before all the records from the last result have been
 * fully collected with {@link ShardConsumer#deserializeRecordForCollectionAndUpdateState(UserRecord)}, otherwise
 * {@link ShardConsumer#lastSequenceNum} may refer to a sub-record in the middle of an aggregated record, leading to
 * incorrect shard iteration if the iterator had to be refreshed.
 *
 * @param shardItr shard iterator to use
 * @param maxNumberOfRecords the maximum number of records to fetch for this getRecords attempt
 * @return get records result
 * @throws InterruptedException
 */
private GetRecordsResult getRecords(String shardItr, int maxNumberOfRecords) throws InterruptedException {
	GetRecordsResult getRecordsResult = null;
	while (getRecordsResult == null) {
		try {
			getRecordsResult = kinesis.getRecords(shardItr, maxNumberOfRecords);
		} catch (ExpiredIteratorException eiEx) {
			LOG.warn("Encountered an unexpected expired iterator {} for shard {};" +
				" refreshing the iterator ...", shardItr, subscribedShard);
			shardItr = kinesis.getShardIterator(subscribedShard, ShardIteratorType.AFTER_SEQUENCE_NUMBER.toString(), lastSequenceNum.getSequenceNumber());

			// sleep for the fetch interval before the next getRecords attempt with the refreshed iterator
			if (fetchIntervalMillis != 0) {
				Thread.sleep(fetchIntervalMillis);
			}
		}
	}
	return getRecordsResult;
}
 
开发者ID:axbaretto,项目名称:flink,代码行数:35,代码来源:ShardConsumer.java


示例8: noShardsFoundForRequestedStreamsBehaviour

import com.amazonaws.services.kinesis.model.GetRecordsResult; //导入依赖的package包/类
public static KinesisProxyInterface noShardsFoundForRequestedStreamsBehaviour() {

		return new KinesisProxyInterface() {
			@Override
			public GetShardListResult getShardList(Map<String, String> streamNamesWithLastSeenShardIds) {
				return new GetShardListResult(); // not setting any retrieved shards for result
			}

			@Override
			public String getShardIterator(KinesisStreamShard shard, String shardIteratorType, String startingSeqNum) {
				return null;
			}

			@Override
			public GetRecordsResult getRecords(String shardIterator, int maxRecordsToGet) {
				return null;
			}
		};

	}
 
开发者ID:axbaretto,项目名称:flink,代码行数:21,代码来源:FakeKinesisBehavioursFactory.java


示例9: getRecords

import com.amazonaws.services.kinesis.model.GetRecordsResult; //导入依赖的package包/类
@Override
public GetRecordsResult getRecords(String shardIterator, int maxRecordsToGet) {
	if ((Integer.valueOf(shardIterator) == orderOfCallToExpire-1) && !expiredOnceAlready) {
		// we fake only once the expired iterator exception at the specified get records attempt order
		expiredOnceAlready = true;
		throw new ExpiredIteratorException("Artificial expired shard iterator");
	} else if (expiredOnceAlready && !expiredIteratorRefreshed) {
		// if we've thrown the expired iterator exception already, but the iterator was not refreshed,
		// throw a hard exception to the test that is testing this Kinesis behaviour
		throw new RuntimeException("expired shard iterator was not refreshed on the next getRecords() call");
	} else {
		// assuming that the maxRecordsToGet is always large enough
		return new GetRecordsResult()
			.withRecords(shardItrToRecordBatch.get(shardIterator))
			.withNextShardIterator(
				(Integer.valueOf(shardIterator) == totalNumOfGetRecordsCalls - 1)
					? null : String.valueOf(Integer.valueOf(shardIterator) + 1)); // last next shard iterator is null
	}
}
 
开发者ID:axbaretto,项目名称:flink,代码行数:20,代码来源:FakeKinesisBehavioursFactory.java


示例10: poll

import com.amazonaws.services.kinesis.model.GetRecordsResult; //导入依赖的package包/类
@Override
protected int poll() throws Exception {
    GetRecordsRequest req = new GetRecordsRequest()
            .withShardIterator(getShardItertor())
            .withLimit(getEndpoint().getMaxResultsPerRequest());
    GetRecordsResult result = getClient().getRecords(req);

    Queue<Exchange> exchanges = createExchanges(result.getRecords());
    int processedExchangeCount = processBatch(CastUtils.cast(exchanges));

    // May cache the last successful sequence number, and pass it to the
    // getRecords request. That way, on the next poll, we start from where
    // we left off, however, I don't know what happens to subsequent
    // exchanges when an earlier echangee fails.

    currentShardIterator = result.getNextShardIterator();

    return processedExchangeCount;
}
 
开发者ID:HydAu,项目名称:Camel,代码行数:20,代码来源:KinesisConsumer.java


示例11: setup

import com.amazonaws.services.kinesis.model.GetRecordsResult; //导入依赖的package包/类
@Before
public void setup() throws Exception {
    KinesisEndpoint endpoint = new KinesisEndpoint(null, "streamName", component);
    endpoint.setAmazonKinesisClient(kinesisClient);
    endpoint.setIteratorType(ShardIteratorType.LATEST);
    undertest = new KinesisConsumer(endpoint, processor);

    when(kinesisClient.getRecords(any(GetRecordsRequest.class)))
        .thenReturn(new GetRecordsResult()
            .withNextShardIterator("nextShardIterator")
        );
    when(kinesisClient.describeStream(any(DescribeStreamRequest.class)))
        .thenReturn(new DescribeStreamResult()
            .withStreamDescription(new StreamDescription()
                .withShards(new Shard().withShardId("shardId"))
            )
        );
    when(kinesisClient.getShardIterator(any(GetShardIteratorRequest.class)))
        .thenReturn(new GetShardIteratorResult()
            .withShardIterator("shardIterator")
        );
}
 
开发者ID:HydAu,项目名称:Camel,代码行数:23,代码来源:KinesisConsumerTest.java


示例12: recordsAreSentToTheProcessor

import com.amazonaws.services.kinesis.model.GetRecordsResult; //导入依赖的package包/类
@Test
public void recordsAreSentToTheProcessor() throws Exception {
    when(kinesisClient.getRecords(any(GetRecordsRequest.class)))
        .thenReturn(new GetRecordsResult()
            .withNextShardIterator("nextShardIterator")
            .withRecords(new Record().withSequenceNumber("1"), new Record().withSequenceNumber("2"))
        );

    int messageCount = undertest.poll();

    assertThat(messageCount, is(2));
    final ArgumentCaptor<Exchange> exchangeCaptor = ArgumentCaptor.forClass(Exchange.class);

    verify(processor, times(2)).process(exchangeCaptor.capture(), any(AsyncCallback.class));
    assertThat(exchangeCaptor.getAllValues().get(0).getIn().getBody(Record.class).getSequenceNumber(), is("1"));
    assertThat(exchangeCaptor.getAllValues().get(1).getIn().getBody(Record.class).getSequenceNumber(), is("2"));
}
 
开发者ID:HydAu,项目名称:Camel,代码行数:18,代码来源:KinesisConsumerTest.java


示例13: exchangePropertiesAreSet

import com.amazonaws.services.kinesis.model.GetRecordsResult; //导入依赖的package包/类
@Test
public void exchangePropertiesAreSet() throws Exception {
    String partitionKey = "partitionKey";
    String sequenceNumber = "1";
    when(kinesisClient.getRecords(any(GetRecordsRequest.class)))
        .thenReturn(new GetRecordsResult()
            .withNextShardIterator("nextShardIterator")
            .withRecords(new Record()
                .withSequenceNumber(sequenceNumber)
                .withApproximateArrivalTimestamp(new Date(42))
                .withPartitionKey(partitionKey)
            )
        );

    undertest.poll();

    final ArgumentCaptor<Exchange> exchangeCaptor = ArgumentCaptor.forClass(Exchange.class);

    verify(processor).process(exchangeCaptor.capture(), any(AsyncCallback.class));
    assertThat(exchangeCaptor.getValue().getIn().getHeader(KinesisConstants.APPROX_ARRIVAL_TIME, long.class), is(42L));
    assertThat(exchangeCaptor.getValue().getIn().getHeader(KinesisConstants.PARTITION_KEY, String.class), is(partitionKey));
    assertThat(exchangeCaptor.getValue().getIn().getHeader(KinesisConstants.SEQUENCE_NUMBER, String.class), is(sequenceNumber));
}
 
开发者ID:HydAu,项目名称:Camel,代码行数:24,代码来源:KinesisConsumerTest.java


示例14: processNextIterator

import com.amazonaws.services.kinesis.model.GetRecordsResult; //导入依赖的package包/类
public String processNextIterator(String iterator)
{
  GetRecordsRequest getRequest = new GetRecordsRequest();
  getRequest.setLimit(1000);

  getRequest.setShardIterator(iterator);
  // call "get" operation and get everything in this shard range
  GetRecordsResult getResponse = client.getRecords(getRequest);

  iterator = getResponse.getNextShardIterator();

  List<Record> records = getResponse.getRecords();
  processResponseRecords(records);

  return iterator;
}
 
开发者ID:apache,项目名称:apex-malhar,代码行数:17,代码来源:KinesisTestConsumer.java


示例15: getPreviewRecords

import com.amazonaws.services.kinesis.model.GetRecordsResult; //导入依赖的package包/类
public static List<com.amazonaws.services.kinesis.model.Record> getPreviewRecords(
    ClientConfiguration awsClientConfig,
    KinesisConfigBean conf,
    int maxBatchSize,
    GetShardIteratorRequest getShardIteratorRequest
) throws StageException {
  AmazonKinesis kinesisClient = getKinesisClient(awsClientConfig, conf);

  GetShardIteratorResult getShardIteratorResult = kinesisClient.getShardIterator(getShardIteratorRequest);
  String shardIterator = getShardIteratorResult.getShardIterator();

  GetRecordsRequest getRecordsRequest = new GetRecordsRequest();
  getRecordsRequest.setShardIterator(shardIterator);
  getRecordsRequest.setLimit(maxBatchSize);

  GetRecordsResult getRecordsResult = kinesisClient.getRecords(getRecordsRequest);
  return getRecordsResult.getRecords();
}
 
开发者ID:streamsets,项目名称:datacollector,代码行数:19,代码来源:KinesisUtil.java


示例16: poll

import com.amazonaws.services.kinesis.model.GetRecordsResult; //导入依赖的package包/类
@Override
public List<SourceRecord> poll() throws InterruptedException {
  List<SourceRecord> records;

  try {
    GetRecordsResult recordsResult = this.kinesisClient.getRecords(this.recordsRequest);
    records = new ArrayList<>(recordsResult.getRecords().size());
    log.trace("poll() - {} record(s) returned from shard {}.", this.config.kinesisShardId);

    for (Record record : recordsResult.getRecords()) {
      SourceRecord sourceRecord = this.recordConverter.sourceRecord(this.config.kinesisStreamName, this.config.kinesisShardId, record);
      records.add(sourceRecord);
    }

    log.trace("poll() - Changing shard iterator to {}", recordsResult.getNextShardIterator());
    this.recordsRequest.setShardIterator(recordsResult.getNextShardIterator());
  } catch (ProvisionedThroughputExceededException ex) {
    log.warn("poll() - Throughput exceeded sleeping {} ms", this.config.kinesisThroughputExceededBackoffMs, ex);
    this.time.sleep(this.config.kinesisThroughputExceededBackoffMs);
    return new ArrayList<>();
  }

  if (records.isEmpty()) {
    log.trace("poll() - No records returned. Sleeping {} ms.", this.config.kinesisEmptyRecordsBackoffMs);
    this.time.sleep(this.config.kinesisEmptyRecordsBackoffMs);
  }

  return records;
}
 
开发者ID:jcustenborder,项目名称:kafka-connect-kinesis,代码行数:30,代码来源:KinesisSourceTask.java


示例17: sourceOffsets

import com.amazonaws.services.kinesis.model.GetRecordsResult; //导入依赖的package包/类
@Test
public void sourceOffsets() throws InterruptedException {
  final String SEQUENCE_NUMBER = "asdfasdfddsa";
  Map<String, Object> sourceOffset = ImmutableMap.of(RecordConverter.FIELD_SEQUENCE_NUMBER, SEQUENCE_NUMBER);
  when(this.offsetStorageReader.offset(anyMap())).thenReturn(sourceOffset);
  when(this.kinesisClient.getShardIterator(any())).thenReturn(
      new GetShardIteratorResult().withShardIterator("dfasdfsadfasdf")
  );
  this.task.start(settings);

  GetRecordsResult recordsResult = new GetRecordsResult()
      .withNextShardIterator("dsfargadsfasdfasda")
      .withRecords(TestData.record())
      .withMillisBehindLatest(0L);

  when(this.kinesisClient.getRecords(any())).thenReturn(recordsResult);

  List<SourceRecord> records = this.task.poll();

  assertNotNull(records, "records should not be null.");
  assertFalse(records.isEmpty(), "records should not be empty.");

  verify(this.offsetStorageReader, atLeastOnce()).offset(anyMap());

  GetShardIteratorRequest expectedIteratorRequest = new GetShardIteratorRequest()
      .withShardIteratorType(ShardIteratorType.AFTER_SEQUENCE_NUMBER)
      .withShardId(this.config.kinesisShardId)
      .withStreamName(this.config.kinesisStreamName)
      .withStartingSequenceNumber(SEQUENCE_NUMBER);

  verify(this.kinesisClient, atLeastOnce()).getShardIterator(expectedIteratorRequest);
}
 
开发者ID:jcustenborder,项目名称:kafka-connect-kinesis,代码行数:33,代码来源:KinesisSourceTaskTest.java


示例18: getRecords

import com.amazonaws.services.kinesis.model.GetRecordsResult; //导入依赖的package包/类
@Override
public GetRecordsResult getRecords(GetRecordsRequest getRecordsRequest) {
  String[] shardIteratorParts = getRecordsRequest.getShardIterator().split(":");
  int shardId = parseInt(shardIteratorParts[0]);
  int startingRecord = parseInt(shardIteratorParts[1]);
  List<Record> shardData = shardedData.get(shardId);

  int toIndex = min(startingRecord + numberOfRecordsPerGet, shardData.size());
  int fromIndex = min(startingRecord, toIndex);
  return new GetRecordsResult()
      .withRecords(shardData.subList(fromIndex, toIndex))
      .withNextShardIterator(String.format("%s:%s", shardId, toIndex))
      .withMillisBehindLatest(0L);
}
 
开发者ID:apache,项目名称:beam,代码行数:15,代码来源:AmazonKinesisMock.java


示例19: getRecords

import com.amazonaws.services.kinesis.model.GetRecordsResult; //导入依赖的package包/类
/**
 * {@inheritDoc}
 */
@Override
public GetRecordsResult getRecords(String shardIterator, int maxRecordsToGet) throws InterruptedException {
	final GetRecordsRequest getRecordsRequest = new GetRecordsRequest();
	getRecordsRequest.setShardIterator(shardIterator);
	getRecordsRequest.setLimit(maxRecordsToGet);

	GetRecordsResult getRecordsResult = null;

	int attempt = 0;
	while (attempt <= getRecordsMaxAttempts && getRecordsResult == null) {
		try {
			getRecordsResult = kinesisClient.getRecords(getRecordsRequest);
		} catch (AmazonServiceException ex) {
			if (isRecoverableException(ex)) {
				long backoffMillis = fullJitterBackoff(
					getRecordsBaseBackoffMillis, getRecordsMaxBackoffMillis, getRecordsExpConstant, attempt++);
				LOG.warn("Got recoverable AmazonServiceException. Backing off for "
					+ backoffMillis + " millis (" + ex.getErrorMessage() + ")");
				Thread.sleep(backoffMillis);
			} else {
				throw ex;
			}
		}
	}

	if (getRecordsResult == null) {
		throw new RuntimeException("Rate Exceeded for getRecords operation - all " + getRecordsMaxAttempts +
			" retry attempts returned ProvisionedThroughputExceededException.");
	}

	return getRecordsResult;
}
 
开发者ID:axbaretto,项目名称:flink,代码行数:36,代码来源:KinesisProxy.java


示例20: getRecords

import com.amazonaws.services.kinesis.model.GetRecordsResult; //导入依赖的package包/类
/**
 * {@inheritDoc}
 */
@Override
public GetRecordsResult getRecords(String shardIterator, int maxRecordsToGet) throws InterruptedException {
	final GetRecordsRequest getRecordsRequest = new GetRecordsRequest();
	getRecordsRequest.setShardIterator(shardIterator);
	getRecordsRequest.setLimit(maxRecordsToGet);

	GetRecordsResult getRecordsResult = null;

	int attempt = 0;
	while (attempt <= getRecordsMaxAttempts && getRecordsResult == null) {
		try {
			getRecordsResult = kinesisClient.getRecords(getRecordsRequest);
		} catch (ProvisionedThroughputExceededException ex) {
			long backoffMillis = fullJitterBackoff(
				getRecordsBaseBackoffMillis, getRecordsMaxBackoffMillis, getRecordsExpConstant, attempt++);
			LOG.warn("Got ProvisionedThroughputExceededException. Backing off for "
				+ backoffMillis + " millis.");
			Thread.sleep(backoffMillis);
		}
	}

	if (getRecordsResult == null) {
		throw new RuntimeException("Rate Exceeded for getRecords operation - all " + getRecordsMaxAttempts +
			" retry attempts returned ProvisionedThroughputExceededException.");
	}

	return getRecordsResult;
}
 
开发者ID:axbaretto,项目名称:flink,代码行数:32,代码来源:KinesisProxy.java



注:本文中的com.amazonaws.services.kinesis.model.GetRecordsResult类示例整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。


鲜花

握手

雷人

路过

鸡蛋
该文章已有0人参与评论

请发表评论

全部评论

专题导读
上一篇:
Java ReplicaInPipelineInterface类代码示例发布时间:2022-05-22
下一篇:
Java GetPreferredBlockSizeResponseProto类代码示例发布时间:2022-05-22
热门推荐
阅读排行榜

扫描微信二维码

查看手机版网站

随时了解更新最新资讯

139-2527-9053

在线客服(服务时间 9:00~18:00)

在线QQ客服
地址:深圳市南山区西丽大学城创智工业园
电邮:jeky_zhao#qq.com
移动电话:139-2527-9053

Powered by 互联科技 X3.4© 2001-2213 极客世界.|Sitemap