This article collects typical usage examples of the Java class com.amazonaws.services.kinesis.producer.UserRecordResult. If you are wondering how the UserRecordResult class is used in practice, the selected examples below may help.
The UserRecordResult class belongs to the com.amazonaws.services.kinesis.producer package. The 20 code examples below are sorted by popularity by default.
Example 1: send
import com.amazonaws.services.kinesis.producer.UserRecordResult; // import the required package/class
public void send(String event) throws UnsupportedEncodingException {
    byte[] bytes = event.getBytes("UTF-8");
    this.metrics.queueEvent(bytes.length);
    ByteBuffer data = ByteBuffer.wrap(bytes);
    String partitionKey = getPartitionKey(event);
    if (partitionKey != null) {
        ListenableFuture<UserRecordResult> f = producer.addUserRecord(streamName, partitionKey, data);
        Futures.addCallback(f, new FutureCallback<UserRecordResult>() {
            @Override
            public void onFailure(Throwable t) {
                if (t instanceof UserRecordFailedException) {
                    Attempt last = Iterables.getLast(((UserRecordFailedException) t).getResult().getAttempts());
                    LOGGER.error(String.format("Record failed to put - %s : %s", last.getErrorCode(), last.getErrorMessage()));
                }
                LOGGER.error("Exception during put", t);
            }
            @Override
            public void onSuccess(UserRecordResult result) {
                metrics.ackEvent();
            }
        });
    }
}
Developer: monetate, Project: koupler, Lines of code: 25, Source: KinesisEventProducer.java
Example 2: put
import com.amazonaws.services.kinesis.producer.UserRecordResult; // import the required package/class
@Override
public void put(Collection<SinkRecord> sinkRecords) {
    // If KinesisProducers cannot write to Kinesis Streams (because of
    // connectivity issues, access issues,
    // or misconfigured shards), we pause consumption of messages until
    // the backlog is cleared
    validateOutStandingRecords();
    String partitionKey;
    for (SinkRecord sinkRecord : sinkRecords) {
        ListenableFuture<UserRecordResult> f;
        // Kinesis does not allow an empty partition key
        if (sinkRecord.key() != null && !sinkRecord.key().toString().trim().equals("")) {
            partitionKey = sinkRecord.key().toString().trim();
        } else {
            partitionKey = Integer.toString(sinkRecord.kafkaPartition());
        }
        if (singleKinesisProducerPerPartition)
            f = addUserRecord(producerMap.get(sinkRecord.kafkaPartition() + "@" + sinkRecord.topic()), streamName,
                    partitionKey, usePartitionAsHashKey, sinkRecord);
        else
            f = addUserRecord(kinesisProducer, streamName, partitionKey, usePartitionAsHashKey, sinkRecord);
        Futures.addCallback(f, callback);
    }
}
Developer: awslabs, Project: kinesis-kafka-connector, Lines of code: 32, Source: AmazonKinesisSinkTask.java
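The partition-key rule in the example above (use the trimmed record key, otherwise fall back to the Kafka partition number) can be isolated into a small pure function. The following is only a sketch: the class name PartitionKeys and the method choose are hypothetical and are not part of kinesis-kafka-connector or the Kinesis Producer Library.

```java
// Hypothetical helper for illustration; not part of any library mentioned in this article.
final class PartitionKeys {
    private PartitionKeys() {}

    /**
     * Returns the trimmed record key, or the Kafka partition number as a string
     * when the key is null or blank (Kinesis rejects empty partition keys).
     */
    static String choose(Object recordKey, int kafkaPartition) {
        if (recordKey != null) {
            String trimmed = recordKey.toString().trim();
            if (!trimmed.isEmpty()) {
                return trimmed;
            }
        }
        return Integer.toString(kafkaPartition);
    }

    public static void main(String[] args) {
        System.out.println(choose(" user-42 ", 7)); // key is present, so it is trimmed and used
        System.out.println(choose("   ", 7));       // blank key, so the partition number is used
        System.out.println(choose(null, 7));        // null key, so the partition number is used
    }
}
```

Keeping the rule in one function makes the fallback easy to unit test without a running producer.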
Example 3: addUserRecord
import com.amazonaws.services.kinesis.producer.UserRecordResult; // import the required package/class
private ListenableFuture<UserRecordResult> addUserRecord(KinesisProducer kp, String streamName, String partitionKey,
        boolean usePartitionAsHashKey, SinkRecord sinkRecord) {
    // If configured, use the Kafka partition number as the explicit hash key.
    // This is useful when data from the same partition should go to the
    // same shard.
    if (usePartitionAsHashKey)
        return kp.addUserRecord(streamName, partitionKey, Integer.toString(sinkRecord.kafkaPartition()),
                DataUtility.parseValue(sinkRecord.valueSchema(), sinkRecord.value()));
    else
        return kp.addUserRecord(streamName, partitionKey,
                DataUtility.parseValue(sinkRecord.valueSchema(), sinkRecord.value()));
}
Developer: awslabs, Project: kinesis-kafka-connector, Lines of code: 15, Source: AmazonKinesisSinkTask.java
Example 4: onSuccess
import com.amazonaws.services.kinesis.producer.UserRecordResult; // import the required package/class
@Override
public void onSuccess(UserRecordResult result) {
    // Read the counter once: a separate get() after the increment could observe
    // another thread's update and skip or repeat a log line.
    long completed = recordsCompleted.incrementAndGet();
    if (completed % NUMBER_OF_ZOMBIES == 0) {
        log.info(format("Records completed: %s; Shard: %s; SequenceNumber: %s.",
                completed, result.getShardId(), result.getSequenceNumber()));
    }
}
Developer: capside, Project: aws-kinesis-zombies, Lines of code: 10, Source: Drone.java
Example 5: putNewRecord
import com.amazonaws.services.kinesis.producer.UserRecordResult; // import the required package/class
@SneakyThrows
public void putNewRecord(Zombie zombie) {
    CoordinateUTM utm = zombie.getCurrentPosition();
    CoordinateLatLon latLon = Datum.WGS84.utmToLatLon(utm);
    ZombieLecture lect = new ZombieLecture(id, zombie.getId(), new Date(), latLon.getLat(), latLon.getLon());
    utm.setAccuracy(RADIOUS);
    String partitionKey = utm.getShortForm();
    String json = mapper.writeValueAsString(lect);
    ByteBuffer data = ByteBuffer.wrap(json.getBytes("UTF-8"));
    ListenableFuture<UserRecordResult> f
            = producer.addUserRecord(streamName, partitionKey, data);
    Futures.addCallback(f, this.recordSentCallback);
}
Developer: capside, Project: aws-kinesis-zombies, Lines of code: 14, Source: Drone.java
Example 6: open
import com.amazonaws.services.kinesis.producer.UserRecordResult; // import the required package/class
@Override
public void open(Configuration parameters) throws Exception {
    super.open(parameters);
    // check and pass the configuration properties
    KinesisProducerConfiguration producerConfig = KinesisConfigUtil.getValidatedProducerConfiguration(configProps);
    producer = getKinesisProducer(producerConfig);
    callback = new FutureCallback<UserRecordResult>() {
        @Override
        public void onSuccess(UserRecordResult result) {
            if (!result.isSuccessful()) {
                if (failOnError) {
                    // only remember the first thrown exception
                    if (thrownException == null) {
                        thrownException = new RuntimeException("Record was not sent successful");
                    }
                } else {
                    LOG.warn("Record was not sent successful");
                }
            }
        }
        @Override
        public void onFailure(Throwable t) {
            if (failOnError) {
                thrownException = t;
            } else {
                LOG.warn("An exception occurred while processing a record", t);
            }
        }
    };
    if (this.customPartitioner != null) {
        this.customPartitioner.initialize(getRuntimeContext().getIndexOfThisSubtask(), getRuntimeContext().getNumberOfParallelSubtasks());
    }
    LOG.info("Started Kinesis producer instance for region '{}'", producerConfig.getRegion());
}
Developer: axbaretto, Project: flink, Lines of code: 40, Source: FlinkKinesisProducer.java
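The callback above keeps only the first error in onSuccess but overwrites it unconditionally in onFailure. One way to apply the "remember only the first exception" rule uniformly, and safely across callback threads, is an AtomicReference with compareAndSet. This is a sketch using only the JDK; FirstErrorHolder and its methods are hypothetical names and this is not how Flink implements it.

```java
import java.util.Objects;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical helper for illustration; not part of Flink or the Kinesis Producer Library.
final class FirstErrorHolder {
    private final AtomicReference<Throwable> first = new AtomicReference<>();

    /** Stores the error only if none is stored yet; returns true if this call stored it. */
    boolean offer(Throwable t) {
        Objects.requireNonNull(t, "t");
        return first.compareAndSet(null, t);
    }

    /** Rethrows the stored error (wrapped) and clears it; does nothing if no error is stored. */
    void checkAndPropagate() {
        Throwable t = first.getAndSet(null);
        if (t != null) {
            throw new RuntimeException("An exception was thrown while processing a record", t);
        }
    }

    public static void main(String[] args) {
        FirstErrorHolder holder = new FirstErrorHolder();
        holder.offer(new IllegalStateException("first failure"));
        holder.offer(new IllegalStateException("second failure")); // ignored, the first one wins
        try {
            holder.checkAndPropagate();
        } catch (RuntimeException e) {
            System.out.println(e.getCause().getMessage());
        }
    }
}
```

A producer could call offer from both callback methods and checkAndPropagate before sending the next record.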
Example 7: invoke
import com.amazonaws.services.kinesis.producer.UserRecordResult; // import the required package/class
@Override
public void invoke(OUT value, Context context) throws Exception {
    if (this.producer == null) {
        throw new RuntimeException("Kinesis producer has been closed");
    }
    checkAndPropagateAsyncError();
    String stream = defaultStream;
    String partition = defaultPartition;
    ByteBuffer serialized = schema.serialize(value);
    // maybe set custom stream
    String customStream = schema.getTargetStream(value);
    if (customStream != null) {
        stream = customStream;
    }
    String explicitHashkey = null;
    // maybe set custom partition
    if (customPartitioner != null) {
        partition = customPartitioner.getPartitionId(value);
        explicitHashkey = customPartitioner.getExplicitHashKey(value);
    }
    if (stream == null) {
        if (failOnError) {
            throw new RuntimeException("No target stream set");
        } else {
            LOG.warn("No target stream set. Skipping record");
            return;
        }
    }
    ListenableFuture<UserRecordResult> cb = producer.addUserRecord(stream, partition, explicitHashkey, serialized);
    Futures.addCallback(cb, callback);
}
Developer: axbaretto, Project: flink, Lines of code: 39, Source: FlinkKinesisProducer.java
Example 8: isAllRecordFuturesCompleted
import com.amazonaws.services.kinesis.producer.UserRecordResult; // import the required package/class
private boolean isAllRecordFuturesCompleted() {
    for (SettableFuture<UserRecordResult> future : pendingRecordFutures) {
        if (!future.isDone()) {
            return false;
        }
    }
    return true;
}
Developer: axbaretto, Project: flink, Lines of code: 10, Source: FlinkKinesisProducerTest.java
Example 9: getNumPendingRecordFutures
import com.amazonaws.services.kinesis.producer.UserRecordResult; // import the required package/class
private int getNumPendingRecordFutures() {
    int numPending = 0;
    for (SettableFuture<UserRecordResult> future : pendingRecordFutures) {
        if (!future.isDone()) {
            numPending++;
        }
    }
    return numPending;
}
Developer: axbaretto, Project: flink, Lines of code: 12, Source: FlinkKinesisProducerTest.java
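The two loops in Examples 8 and 9 can also be written with the Stream API. The sketch below uses the JDK's CompletableFuture as a stand-in for Guava's SettableFuture so that it runs without extra dependencies; PendingFutures is a hypothetical class name, not part of the Flink test.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Future;

// Hypothetical helper for illustration; CompletableFuture stands in for Guava's SettableFuture.
final class PendingFutures {
    private PendingFutures() {}

    /** Counts the futures that have not completed yet. */
    static long countPending(List<? extends Future<?>> futures) {
        return futures.stream().filter(f -> !f.isDone()).count();
    }

    /** Returns true when every future has completed (also true for an empty list). */
    static boolean allDone(List<? extends Future<?>> futures) {
        return futures.stream().allMatch(Future::isDone);
    }

    public static void main(String[] args) {
        CompletableFuture<String> sent = CompletableFuture.completedFuture("ok");
        CompletableFuture<String> inFlight = new CompletableFuture<>();
        List<CompletableFuture<String>> futures = Arrays.asList(sent, inFlight);
        System.out.println(countPending(futures));
        inFlight.complete("ok");
        System.out.println(allDone(futures));
    }
}
```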
Example 10: getAndCheck
import com.amazonaws.services.kinesis.producer.UserRecordResult; // import the required package/class
private void getAndCheck(Future<UserRecordResult> future) throws StageException {
    try {
        UserRecordResult result = future.get();
        if (!result.isSuccessful()) {
            for (Attempt attempt : result.getAttempts()) {
                LOG.error("Failed to put record: {}", attempt.getErrorMessage());
            }
            throw new StageException(Errors.KINESIS_00, result.getAttempts().get(0).getErrorMessage());
        }
    } catch (InterruptedException | ExecutionException e) {
        LOG.error("Pipeline is shutting down.", e);
        // We should flush if we encounter an error.
        kinesisProducer.flushSync();
    }
}
Developer: streamsets, Project: datacollector, Lines of code: 16, Source: KinesisTarget.java
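The blocking "get, then inspect the attempts" pattern above can be sketched with JDK types only. In this sketch the nested Result class is a hypothetical stand-in for UserRecordResult (a success flag plus per-attempt error messages), and firstError is an invented helper. Unlike the example above, it restores the thread's interrupt flag when interrupted, which is a common convention rather than something the original project does.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;

// Hypothetical sketch; Result is a stand-in for the KPL's UserRecordResult, not the real class.
final class PutResultCheck {
    static final class Result {
        final boolean successful;
        final List<String> attemptErrors;

        Result(boolean successful, List<String> attemptErrors) {
            this.successful = successful;
            this.attemptErrors = attemptErrors;
        }
    }

    /** Waits for the result; returns null on success, otherwise the first attempt's error message. */
    static String firstError(Future<Result> future) throws ExecutionException {
        try {
            Result result = future.get();
            if (result.successful) {
                return null;
            }
            return result.attemptErrors.isEmpty() ? "unknown error" : result.attemptErrors.get(0);
        } catch (InterruptedException e) {
            // Restore the interrupt flag so callers can still observe the interruption.
            Thread.currentThread().interrupt();
            return "interrupted while waiting for the result";
        }
    }

    public static void main(String[] args) throws ExecutionException {
        Future<Result> ok = CompletableFuture.completedFuture(new Result(true, Collections.emptyList()));
        Future<Result> failed = CompletableFuture.completedFuture(
                new Result(false, Arrays.asList("Throttled", "Expired")));
        System.out.println(firstError(ok));
        System.out.println(firstError(failed));
    }
}
```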
Example 11: testRecordTooLarge
import com.amazonaws.services.kinesis.producer.UserRecordResult; // import the required package/class
@SuppressWarnings("unchecked")
@Test
public void testRecordTooLarge() throws Exception {
    KinesisProducerConfigBean config = getKinesisTargetConfig();
    KinesisTarget target = new KinesisTarget(config);
    TargetRunner targetRunner = new TargetRunner.Builder(
            KinesisDTarget.class,
            target
    ).setOnRecordError(OnRecordError.TO_ERROR).build();
    KinesisTestUtil.mockKinesisUtil(1);
    KinesisProducer producer = mock(KinesisProducer.class);
    Whitebox.setInternalState(target, "kinesisProducer", producer);
    targetRunner.runInit();
    ListenableFuture<UserRecordResult> future = mock(ListenableFuture.class);
    UserRecordResult result = mock(UserRecordResult.class);
    when(result.isSuccessful()).thenReturn(true);
    when(future.get()).thenReturn(result);
    when(producer.addUserRecord(any(String.class), any(String.class), any(ByteBuffer.class)))
            .thenReturn(future);
    List<Record> records = new ArrayList<>(4);
    records.add(KinesisTestUtil.getTooLargeRecord());
    records.addAll(KinesisTestUtil.getProducerTestRecords(3));
    targetRunner.runWrite(records);
    // Verify we added 3 good records at the end of the batch but not the bad one
    verify(producer, times(3)).addUserRecord(eq(STREAM_NAME), any(String.class), any(ByteBuffer.class));
    assertEquals(1, targetRunner.getErrorRecords().size());
    targetRunner.runDestroy();
}
Developer: streamsets, Project: datacollector, Lines of code: 41, Source: TestKinesisTarget.java
Example 12: trackTimestamp
import com.amazonaws.services.kinesis.producer.UserRecordResult; // import the required package/class
/** Track the timestamp of the event for determining watermark values until it has been sent or dropped. */
public void trackTimestamp(ListenableFuture<UserRecordResult> f, TripEvent event) {
    Futures.addCallback(f, new RemoveTimestampCallback(event));
}
Developer: awslabs, Project: flink-stream-processing-refarch, Lines of code: 5, Source: WatermarkTracker.java
Example 13: onSuccess
import com.amazonaws.services.kinesis.producer.UserRecordResult; // import the required package/class
@Override
public void onSuccess(UserRecordResult result) {
    removeEvent();
}
Developer: awslabs, Project: flink-stream-processing-refarch, Lines of code: 5, Source: WatermarkTracker.java
Example 14: testAsyncErrorRethrownAfterFlush
import com.amazonaws.services.kinesis.producer.UserRecordResult; // import the required package/class
/**
 * Test ensuring that if an async exception is caught for one of the flushed requests on checkpoint,
 * it should be rethrown; we set a timeout because the test will not finish if the logic is broken.
 *
 * <p>Note that this test does not test that the snapshot method is blocked correctly when there are pending records.
 * The test for that is covered in testAtLeastOnceProducer.
 */
@SuppressWarnings("ResultOfMethodCallIgnored")
@Test(timeout = 10000)
public void testAsyncErrorRethrownAfterFlush() throws Throwable {
    final DummyFlinkKinesisProducer<String> producer = new DummyFlinkKinesisProducer<>(new SimpleStringSchema());
    OneInputStreamOperatorTestHarness<String, Object> testHarness =
            new OneInputStreamOperatorTestHarness<>(new StreamSink<>(producer));
    testHarness.open();
    testHarness.processElement(new StreamRecord<>("msg-1"));
    testHarness.processElement(new StreamRecord<>("msg-2"));
    testHarness.processElement(new StreamRecord<>("msg-3"));
    // only let the first record succeed for now
    UserRecordResult result = mock(UserRecordResult.class);
    when(result.isSuccessful()).thenReturn(true);
    producer.getPendingRecordFutures().get(0).set(result);
    CheckedThread snapshotThread = new CheckedThread() {
        @Override
        public void go() throws Exception {
            // this should block at first, since there are still two pending records that need to be flushed
            testHarness.snapshot(123L, 123L);
        }
    };
    snapshotThread.start();
    // let the 2nd message fail with an async exception
    producer.getPendingRecordFutures().get(1).setException(new Exception("artificial async failure for 2nd message"));
    producer.getPendingRecordFutures().get(2).set(mock(UserRecordResult.class));
    try {
        snapshotThread.sync();
    } catch (Exception e) {
        // after the flush, the async exception should have been rethrown
        Assert.assertTrue(ExceptionUtils.findThrowableWithMessage(e, "artificial async failure for 2nd message").isPresent());
        // test succeeded
        return;
    }
    Assert.fail();
}
Developer: axbaretto, Project: flink, Lines of code: 52, Source: FlinkKinesisProducerTest.java
Example 15: testAtLeastOnceProducer
import com.amazonaws.services.kinesis.producer.UserRecordResult; // import the required package/class
/**
 * Test ensuring that the producer is not dropping buffered records;
 * we set a timeout because the test will not finish if the logic is broken.
 */
@SuppressWarnings({"unchecked", "ResultOfMethodCallIgnored"})
@Test(timeout = 10000)
public void testAtLeastOnceProducer() throws Throwable {
    final DummyFlinkKinesisProducer<String> producer = new DummyFlinkKinesisProducer<>(new SimpleStringSchema());
    OneInputStreamOperatorTestHarness<String, Object> testHarness =
            new OneInputStreamOperatorTestHarness<>(new StreamSink<>(producer));
    testHarness.open();
    testHarness.processElement(new StreamRecord<>("msg-1"));
    testHarness.processElement(new StreamRecord<>("msg-2"));
    testHarness.processElement(new StreamRecord<>("msg-3"));
    // start a thread to perform checkpointing
    CheckedThread snapshotThread = new CheckedThread() {
        @Override
        public void go() throws Exception {
            // this should block until all records are flushed;
            // if the snapshot implementation returns before pending records are flushed,
            testHarness.snapshot(123L, 123L);
        }
    };
    snapshotThread.start();
    // before proceeding, make sure that flushing has started and that the snapshot is still blocked;
    // this would block forever if the snapshot didn't perform a flush
    producer.waitUntilFlushStarted();
    Assert.assertTrue("Snapshot returned before all records were flushed", snapshotThread.isAlive());
    // now, complete the callbacks
    UserRecordResult result = mock(UserRecordResult.class);
    when(result.isSuccessful()).thenReturn(true);
    producer.getPendingRecordFutures().get(0).set(result);
    Assert.assertTrue("Snapshot returned before all records were flushed", snapshotThread.isAlive());
    producer.getPendingRecordFutures().get(1).set(result);
    Assert.assertTrue("Snapshot returned before all records were flushed", snapshotThread.isAlive());
    producer.getPendingRecordFutures().get(2).set(result);
    // this would fail with an exception if flushing wasn't completed before the snapshot method returned
    snapshotThread.sync();
    testHarness.close();
}
Developer: axbaretto, Project: flink, Lines of code: 52, Source: FlinkKinesisProducerTest.java
Example 16: getPendingRecordFutures
import com.amazonaws.services.kinesis.producer.UserRecordResult; // import the required package/class
List<SettableFuture<UserRecordResult>> getPendingRecordFutures() {
    return pendingRecordFutures;
}
Developer: axbaretto, Project: flink, Lines of code: 4, Source: FlinkKinesisProducerTest.java
Example 17: open
import com.amazonaws.services.kinesis.producer.UserRecordResult; // import the required package/class
@Override
public void open(Configuration parameters) throws Exception {
    super.open(parameters);
    KinesisProducerConfiguration producerConfig = new KinesisProducerConfiguration();
    producerConfig.setRegion(configProps.getProperty(ProducerConfigConstants.AWS_REGION));
    producerConfig.setCredentialsProvider(AWSUtil.getCredentialsProvider(configProps));
    if (configProps.containsKey(ProducerConfigConstants.COLLECTION_MAX_COUNT)) {
        producerConfig.setCollectionMaxCount(PropertiesUtil.getLong(configProps,
                ProducerConfigConstants.COLLECTION_MAX_COUNT, producerConfig.getCollectionMaxCount(), LOG));
    }
    if (configProps.containsKey(ProducerConfigConstants.AGGREGATION_MAX_COUNT)) {
        producerConfig.setAggregationMaxCount(PropertiesUtil.getLong(configProps,
                ProducerConfigConstants.AGGREGATION_MAX_COUNT, producerConfig.getAggregationMaxCount(), LOG));
    }
    producer = new KinesisProducer(producerConfig);
    callback = new FutureCallback<UserRecordResult>() {
        @Override
        public void onSuccess(UserRecordResult result) {
            if (!result.isSuccessful()) {
                if (failOnError) {
                    thrownException = new RuntimeException("Record was not sent successful");
                } else {
                    LOG.warn("Record was not sent successful");
                }
            }
        }
        @Override
        public void onFailure(Throwable t) {
            if (failOnError) {
                thrownException = t;
            } else {
                LOG.warn("An exception occurred while processing a record", t);
            }
        }
    };
    if (this.customPartitioner != null) {
        this.customPartitioner.initialize(getRuntimeContext().getIndexOfThisSubtask(), getRuntimeContext().getNumberOfParallelSubtasks());
    }
    LOG.info("Started Kinesis producer instance for region '{}'", producerConfig.getRegion());
}
Developer: axbaretto, Project: flink, Lines of code: 47, Source: FlinkKinesisProducer.java
Example 18: invoke
import com.amazonaws.services.kinesis.producer.UserRecordResult; // import the required package/class
@Override
public void invoke(OUT value) throws Exception {
    if (this.producer == null) {
        throw new RuntimeException("Kinesis producer has been closed");
    }
    if (thrownException != null) {
        String errorMessages = "";
        if (thrownException instanceof UserRecordFailedException) {
            List<Attempt> attempts = ((UserRecordFailedException) thrownException).getResult().getAttempts();
            for (Attempt attempt : attempts) {
                if (attempt.getErrorMessage() != null) {
                    errorMessages += attempt.getErrorMessage() + "\n";
                }
            }
        }
        if (failOnError) {
            throw new RuntimeException("An exception was thrown while processing a record: " + errorMessages, thrownException);
        } else {
            // The placeholder takes the collected messages; the trailing Throwable is logged with its stack trace.
            LOG.warn("An exception was thrown while processing a record: {}", errorMessages, thrownException);
            thrownException = null; // reset
        }
    }
    String stream = defaultStream;
    String partition = defaultPartition;
    ByteBuffer serialized = schema.serialize(value);
    // maybe set custom stream
    String customStream = schema.getTargetStream(value);
    if (customStream != null) {
        stream = customStream;
    }
    String explicitHashkey = null;
    // maybe set custom partition
    if (customPartitioner != null) {
        partition = customPartitioner.getPartitionId(value);
        explicitHashkey = customPartitioner.getExplicitHashKey(value);
    }
    if (stream == null) {
        if (failOnError) {
            throw new RuntimeException("No target stream set");
        } else {
            LOG.warn("No target stream set. Skipping record");
            return;
        }
    }
    ListenableFuture<UserRecordResult> cb = producer.addUserRecord(stream, partition, explicitHashkey, serialized);
    Futures.addCallback(cb, callback);
}
Developer: axbaretto, Project: flink, Lines of code: 54, Source: FlinkKinesisProducer.java
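The loop above builds the error text by repeated string concatenation while skipping null messages. A compact alternative is a stream with Collectors.joining. The sketch below works on plain strings, so the list of messages stands in for the values returned by Attempt.getErrorMessage(); ErrorMessages is a hypothetical class name. Note one difference from the loop: joining puts the newline between messages and does not leave a trailing newline.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Objects;
import java.util.stream.Collectors;

// Hypothetical helper for illustration; the input strings stand in for per-attempt error messages.
final class ErrorMessages {
    private ErrorMessages() {}

    /** Joins the non-null messages with newlines; returns an empty string if there are none. */
    static String join(List<String> attemptMessages) {
        return attemptMessages.stream()
                .filter(Objects::nonNull)
                .collect(Collectors.joining("\n"));
    }

    public static void main(String[] args) {
        System.out.println(join(Arrays.asList("Throttled", null, "Expired")));
    }
}
```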
Example 19: testInOrderProduce
import com.amazonaws.services.kinesis.producer.UserRecordResult; // import the required package/class
@SuppressWarnings("unchecked")
@Test
public void testInOrderProduce() throws Exception {
    KinesisProducerConfigBean config = getKinesisTargetConfig();
    config.preserveOrdering = true;
    KinesisTarget target = new KinesisTarget(config);
    TargetRunner targetRunner = new TargetRunner.Builder(KinesisDTarget.class, target).build();
    PowerMockito.mockStatic(KinesisUtil.class);
    when(KinesisUtil.checkStreamExists(
            any(ClientConfiguration.class),
            any(KinesisConfigBean.class),
            any(String.class),
            any(List.class),
            any(Stage.Context.class)
        )
    ).thenReturn(1L);
    KinesisProducer producer = mock(KinesisProducer.class);
    Whitebox.setInternalState(target, "kinesisProducer", producer);
    targetRunner.runInit();
    ListenableFuture<UserRecordResult> future = mock(ListenableFuture.class);
    UserRecordResult result = mock(UserRecordResult.class);
    when(result.isSuccessful()).thenReturn(true);
    when(result.getShardId()).thenReturn("shardId-000000000000");
    when(future.get()).thenReturn(result);
    when(producer.addUserRecord(any(String.class), any(String.class), any(ByteBuffer.class)))
            .thenReturn(future);
    targetRunner.runWrite(KinesisTestUtil.getProducerTestRecords(3));
    // Verify we added 3 records to stream test
    verify(producer, times(3)).addUserRecord(eq(STREAM_NAME), any(String.class), any(ByteBuffer.class));
    // With preserveOrdering we should call flushSync for each record, plus once more for the batch.
    // The last invocation has no effect as no records should be pending.
    verify(producer, times(4)).flushSync();
    targetRunner.runDestroy();
}
Developer: streamsets, Project: datacollector, Lines of code: 48, Source: TestKinesisTarget.java
Example 20: testDefaultProduce
import com.amazonaws.services.kinesis.producer.UserRecordResult; // import the required package/class
@SuppressWarnings("unchecked")
@Test
public void testDefaultProduce() throws Exception {
    KinesisProducerConfigBean config = getKinesisTargetConfig();
    KinesisTarget target = new KinesisTarget(config);
    TargetRunner targetRunner = new TargetRunner.Builder(KinesisDTarget.class, target).build();
    KinesisTestUtil.mockKinesisUtil(1);
    KinesisProducer producer = mock(KinesisProducer.class);
    Whitebox.setInternalState(target, "kinesisProducer", producer);
    targetRunner.runInit();
    ListenableFuture<UserRecordResult> future = mock(ListenableFuture.class);
    UserRecordResult result = mock(UserRecordResult.class);
    when(result.isSuccessful()).thenReturn(true);
    when(future.get()).thenReturn(result);
    when(producer.addUserRecord(any(String.class), any(String.class), any(ByteBuffer.class)))
            .thenReturn(future);
    targetRunner.runWrite(KinesisTestUtil.getProducerTestRecords(3));
    // Verify we added 3 records
    verify(producer, times(3)).addUserRecord(eq(STREAM_NAME), any(String.class), any(ByteBuffer.class));
    // By default we should only call flushSync one time per batch.
    verify(producer, times(1)).flushSync();
    targetRunner.runDestroy();
}
Developer: streamsets, Project: datacollector, Lines of code: 36, Source: TestKinesisTarget.java
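Note: The com.amazonaws.services.kinesis.producer.UserRecordResult examples in this article were collected from GitHub, MSDocs, and other source-code and documentation hosting platforms. The code snippets were selected from open-source projects, and copyright of the source code belongs to the original authors. For distribution and use, refer to the license of each project. Do not reproduce without permission.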