本文整理汇总了Java中org.apache.flume.Sink.Status类的典型用法代码示例。如果您正苦于以下问题:Java Status类的具体用法?Java Status怎么用?Java Status使用的例子?那么恭喜您, 这里精选的类代码示例或许可以为您提供帮助。
Status类属于org.apache.flume.Sink包,在下文中一共展示了Status类的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Java代码示例。
示例1: process
import org.apache.flume.Sink.Status; //导入依赖的package包/类
@Override
public Status process() throws EventDeliveryException {
Status status = null;
Iterator<Sink> sinkIterator = selector.createSinkIterator();
while (sinkIterator.hasNext()) {
Sink sink = sinkIterator.next();
try {
status = sink.process();
break;
} catch (Exception ex) {
selector.informSinkFailed(sink);
LOGGER.warn("Sink failed to consume event. "
+ "Attempting next sink if available.", ex);
}
}
if (status == null) {
throw new EventDeliveryException("All configured sinks have failed");
}
return status;
}
开发者ID:moueimei,项目名称:flume-release-1.7.0,代码行数:24,代码来源:LoadBalancingSinkProcessor.java
示例2: ensureGroupConfigurationCorrectlyUsed
import org.apache.flume.Sink.Status; //导入依赖的package包/类
@Test
public void ensureGroupConfigurationCorrectlyUsed() throws Exception {
when(channel.take()).thenReturn(event);
when(event.getBody()).thenReturn("something".getBytes());
Context context = new Context();
context.put("defaultRollback", "true");
context.put("defaultBackoff", "true");
context.put("defaultIncrementMetrics", "false");
context.put("rollback.2XX", "false");
context.put("backoff.2XX", "false");
context.put("incrementMetrics.2XX", "true");
executeWithMocks(true, Status.READY, true, true, context, HttpURLConnection.HTTP_OK);
executeWithMocks(true, Status.READY, true, true, context, HttpURLConnection.HTTP_NO_CONTENT);
}
开发者ID:hmrc,项目名称:flume-http-sink,代码行数:17,代码来源:HttpSinkTest.java
示例3: ensureSingleStatusConfigurationOverridesGroupConfigurationCorrectly
import org.apache.flume.Sink.Status; //导入依赖的package包/类
@Test
public void ensureSingleStatusConfigurationOverridesGroupConfigurationCorrectly() throws Exception {
when(channel.take()).thenReturn(event);
when(event.getBody()).thenReturn("something".getBytes());
Context context = new Context();
context.put("rollback.2XX", "false");
context.put("backoff.2XX", "false");
context.put("incrementMetrics.2XX", "true");
context.put("rollback.200", "true");
context.put("backoff.200", "true");
context.put("incrementMetrics.200", "false");
executeWithMocks(true, Status.READY, true, true, context, HttpURLConnection.HTTP_NO_CONTENT);
executeWithMocks(false, Status.BACKOFF, false, true, context, HttpURLConnection.HTTP_OK);
}
开发者ID:hmrc,项目名称:flume-http-sink,代码行数:17,代码来源:HttpSinkTest.java
示例4: testFieldType
import org.apache.flume.Sink.Status; //导入依赖的package包/类
private void testFieldType(final String field, final String value, final Status result) {
//System.out.println();
headers.put(field, value);
addEventToChannel(headers);
boolean thrown = false;
try {
Status status = sink.process();
Assert.assertEquals(result, status);
} catch (EventDeliveryException ex) {
thrown = true;
}
final Transaction tx = channel.getTransaction();
tx.begin();
final Event nextEvent = channel.take();
tx.commit();
tx.close();
if (result == Status.READY) {
Assert.assertFalse(thrown);
Assert.assertNull(nextEvent);
} else {
Assert.assertTrue(thrown);
Assert.assertNotNull(nextEvent);
}
}
开发者ID:Stratio,项目名称:ingestion,代码行数:25,代码来源:CassandraDataTypesIT.java
示例5: testDefaultConfiguration
import org.apache.flume.Sink.Status; //导入依赖的package包/类
@Test
public void testDefaultConfiguration() throws Exception {
// If no selector is specified, the round-robin selector should be used
Channel ch = new MockChannel();
int n = 100;
int numEvents = 3 * n;
for (int i = 0; i < numEvents; i++) {
ch.put(new MockEvent("test" + i));
}
MockSink s1 = new MockSink(1);
s1.setChannel(ch);
MockSink s2 = new MockSink(2);
s2.setChannel(ch);
MockSink s3 = new MockSink(3);
s3.setChannel(ch);
List<Sink> sinks = new ArrayList<Sink>();
sinks.add(s1);
sinks.add(s2);
sinks.add(s3);
LoadBalancingSinkProcessor lbsp = getProcessor(sinks, new Context());
Status s = Status.READY;
while (s != Status.BACKOFF) {
s = lbsp.process();
}
Assert.assertTrue(s1.getEvents().size() == n);
Assert.assertTrue(s2.getEvents().size() == n);
Assert.assertTrue(s3.getEvents().size() == n);
}
开发者ID:moueimei,项目名称:flume-release-1.7.0,代码行数:37,代码来源:TestLoadBalancingSinkProcessor.java
示例6: testRandomPersistentFailure
import org.apache.flume.Sink.Status; //导入依赖的package包/类
@Test
public void testRandomPersistentFailure() throws Exception {
Channel ch = new MockChannel();
int n = 100;
int numEvents = 3 * n;
for (int i = 0; i < numEvents; i++) {
ch.put(new MockEvent("test" + i));
}
MockSink s1 = new MockSink(1);
s1.setChannel(ch);
MockSink s2 = new MockSink(2);
s2.setChannel(ch);
// s2 always fails
s2.setFail(true);
MockSink s3 = new MockSink(3);
s3.setChannel(ch);
List<Sink> sinks = new ArrayList<Sink>();
sinks.add(s1);
sinks.add(s2);
sinks.add(s3);
LoadBalancingSinkProcessor lbsp = getProcessor("random",sinks, false);
Status s = Status.READY;
while (s != Status.BACKOFF) {
s = lbsp.process();
}
Assert.assertTrue(s2.getEvents().size() == 0);
Assert.assertTrue(s1.getEvents().size() + s3.getEvents().size() == 3 * n);
}
开发者ID:moueimei,项目名称:flume-release-1.7.0,代码行数:37,代码来源:TestLoadBalancingSinkProcessor.java
示例7: testRoundRobinPersistentFailure
import org.apache.flume.Sink.Status; //导入依赖的package包/类
@Test
public void testRoundRobinPersistentFailure() throws Exception {
Channel ch = new MockChannel();
int n = 100;
int numEvents = 3 * n;
for (int i = 0; i < numEvents; i++) {
ch.put(new MockEvent("test" + i));
}
MockSink s1 = new MockSink(1);
s1.setChannel(ch);
MockSink s2 = new MockSink(2);
s2.setChannel(ch);
// s2 always fails
s2.setFail(true);
MockSink s3 = new MockSink(3);
s3.setChannel(ch);
List<Sink> sinks = new ArrayList<Sink>();
sinks.add(s1);
sinks.add(s2);
sinks.add(s3);
LoadBalancingSinkProcessor lbsp = getProcessor("round_robin",sinks, false);
Status s = Status.READY;
while (s != Status.BACKOFF) {
s = lbsp.process();
}
Assert.assertTrue(s1.getEvents().size() == n);
Assert.assertTrue(s2.getEvents().size() == 0);
Assert.assertTrue(s3.getEvents().size() == 2 * n);
}
开发者ID:moueimei,项目名称:flume-release-1.7.0,代码行数:38,代码来源:TestLoadBalancingSinkProcessor.java
示例8: testRoundRobinNoFailure
import org.apache.flume.Sink.Status; //导入依赖的package包/类
@Test
public void testRoundRobinNoFailure() throws Exception {
Channel ch = new MockChannel();
int n = 100;
int numEvents = 3 * n;
for (int i = 0; i < numEvents; i++) {
ch.put(new MockEvent("test" + i));
}
MockSink s1 = new MockSink(1);
s1.setChannel(ch);
MockSink s2 = new MockSink(2);
s2.setChannel(ch);
MockSink s3 = new MockSink(3);
s3.setChannel(ch);
List<Sink> sinks = new ArrayList<Sink>();
sinks.add(s1);
sinks.add(s2);
sinks.add(s3);
LoadBalancingSinkProcessor lbsp = getProcessor("round_robin",sinks, false);
Status s = Status.READY;
while (s != Status.BACKOFF) {
s = lbsp.process();
}
Assert.assertTrue(s1.getEvents().size() == n);
Assert.assertTrue(s2.getEvents().size() == n);
Assert.assertTrue(s3.getEvents().size() == n);
}
开发者ID:moueimei,项目名称:flume-release-1.7.0,代码行数:36,代码来源:TestLoadBalancingSinkProcessor.java
示例9: testCustomSelector
import org.apache.flume.Sink.Status; //导入依赖的package包/类
@Test
public void testCustomSelector() throws Exception {
Channel ch = new MockChannel();
int n = 10;
int numEvents = n;
for (int i = 0; i < numEvents; i++) {
ch.put(new MockEvent("test" + i));
}
MockSink s1 = new MockSink(1);
s1.setChannel(ch);
// s1 always fails
s1.setFail(true);
MockSink s2 = new MockSink(2);
s2.setChannel(ch);
MockSink s3 = new MockSink(3);
s3.setChannel(ch);
List<Sink> sinks = new ArrayList<Sink>();
sinks.add(s1);
sinks.add(s2);
sinks.add(s3);
// This selector will result in all events going to s2
Context ctx = getContext(FixedOrderSelector.class.getCanonicalName());
ctx.put("selector." + FixedOrderSelector.SET_ME, "foo");
LoadBalancingSinkProcessor lbsp = getProcessor(sinks, ctx);
Sink.Status s = Sink.Status.READY;
while (s != Sink.Status.BACKOFF) {
s = lbsp.process();
}
Assert.assertTrue(s1.getEvents().size() == 0);
Assert.assertTrue(s2.getEvents().size() == n);
Assert.assertTrue(s3.getEvents().size() == 0);
}
开发者ID:moueimei,项目名称:flume-release-1.7.0,代码行数:41,代码来源:TestLoadBalancingSinkProcessor.java
示例10: process
import org.apache.flume.Sink.Status; //导入依赖的package包/类
@Override
public Status process() throws EventDeliveryException {
if (fail) {
throw new EventDeliveryException("failed");
}
Event e = this.getChannel().take();
if (e == null) {
return Status.BACKOFF;
}
events.add(e);
return Status.READY;
}
开发者ID:moueimei,项目名称:flume-release-1.7.0,代码行数:14,代码来源:TestLoadBalancingSinkProcessor.java
示例11: testEmptyChannelResultsInStatusBackoff
import org.apache.flume.Sink.Status; //导入依赖的package包/类
@Test
public void testEmptyChannelResultsInStatusBackoff()
throws InterruptedException, LifecycleException, EventDeliveryException {
LOG.debug("Starting...");
Context context = new Context();
Channel channel = new MemoryChannel();
context.put("hdfs.path", testPath);
context.put("keep-alive", "0");
Configurables.configure(sink, context);
Configurables.configure(channel, context);
sink.setChannel(channel);
sink.start();
Assert.assertEquals(Status.BACKOFF, sink.process());
sink.stop();
}
开发者ID:moueimei,项目名称:flume-release-1.7.0,代码行数:16,代码来源:TestHDFSEventSink.java
示例12: testSlowAppendFailure
import org.apache.flume.Sink.Status; //导入依赖的package包/类
@Test
public void testSlowAppendFailure() throws InterruptedException,
LifecycleException, EventDeliveryException, IOException {
LOG.debug("Starting...");
final String fileName = "FlumeData";
final long rollCount = 5;
final long batchSize = 2;
final int numBatches = 2;
String newPath = testPath + "/singleBucket";
int i = 1, j = 1;
// clear the test directory
Configuration conf = new Configuration();
FileSystem fs = FileSystem.get(conf);
Path dirPath = new Path(newPath);
fs.delete(dirPath, true);
fs.mkdirs(dirPath);
// create HDFS sink with slow writer
HDFSTestWriterFactory badWriterFactory = new HDFSTestWriterFactory();
sink = new HDFSEventSink(badWriterFactory);
Context context = new Context();
context.put("hdfs.path", newPath);
context.put("hdfs.filePrefix", fileName);
context.put("hdfs.rollCount", String.valueOf(rollCount));
context.put("hdfs.batchSize", String.valueOf(batchSize));
context.put("hdfs.fileType", HDFSTestWriterFactory.TestSequenceFileType);
context.put("hdfs.callTimeout", Long.toString(1000));
Configurables.configure(sink, context);
Channel channel = new MemoryChannel();
Configurables.configure(channel, context);
sink.setChannel(channel);
sink.start();
Calendar eventDate = Calendar.getInstance();
// push the event batches into channel
for (i = 0; i < numBatches; i++) {
Transaction txn = channel.getTransaction();
txn.begin();
for (j = 1; j <= batchSize; j++) {
Event event = new SimpleEvent();
eventDate.clear();
eventDate.set(2011, i, i, i, 0); // yy mm dd
event.getHeaders().put("timestamp",
String.valueOf(eventDate.getTimeInMillis()));
event.getHeaders().put("hostname", "Host" + i);
event.getHeaders().put("slow", "1500");
event.setBody(("Test." + i + "." + j).getBytes());
channel.put(event);
}
txn.commit();
txn.close();
// execute sink to process the events
Status satus = sink.process();
// verify that the append returned backoff due to timeotu
Assert.assertEquals(satus, Status.BACKOFF);
}
sink.stop();
}
开发者ID:moueimei,项目名称:flume-release-1.7.0,代码行数:68,代码来源:TestHDFSEventSink.java
示例13: shouldIndexFiveEventsOverThreeBatches
import org.apache.flume.Sink.Status; //导入依赖的package包/类
@Test
public void shouldIndexFiveEventsOverThreeBatches() throws Exception {
parameters.put(BATCH_SIZE, "2");
Configurables.configure(fixture, new Context(parameters));
Channel channel = bindAndStartChannel(fixture);
int numberOfEvents = 5;
Event[] events = new Event[numberOfEvents];
Transaction tx = channel.getTransaction();
tx.begin();
for (int i = 0; i < numberOfEvents; i++) {
String body = "event #" + i + " of " + numberOfEvents;
Event event = EventBuilder.withBody(body.getBytes());
events[i] = event;
channel.put(event);
}
tx.commit();
tx.close();
int count = 0;
Status status = Status.READY;
while (status != Status.BACKOFF) {
count++;
status = fixture.process();
}
fixture.stop();
assertEquals(3, count);
client.admin().indices()
.refresh(Requests.refreshRequest(timestampedIndexName)).actionGet();
assertMatchAllQuery(numberOfEvents, events);
assertBodyQuery(5, events);
}
开发者ID:Redliver,项目名称:flume-ng-elasticsearch5-sink,代码行数:36,代码来源:TestElasticSearchSink.java
示例14: ensureRollbackBackoffAndIncrementMetricsIfConfigured
import org.apache.flume.Sink.Status; //导入依赖的package包/类
@Test
public void ensureRollbackBackoffAndIncrementMetricsIfConfigured() throws Exception {
when(channel.take()).thenReturn(event);
when(event.getBody()).thenReturn("something".getBytes());
Context context = new Context();
context.put("defaultRollback", "true");
context.put("defaultBackoff", "true");
context.put("defaultIncrementMetrics", "true");
executeWithMocks(false, Status.BACKOFF, true, true, context, HttpURLConnection.HTTP_OK);
}
开发者ID:hmrc,项目名称:flume-http-sink,代码行数:13,代码来源:HttpSinkTest.java
示例15: ensureCommitReadyAndNoIncrementMetricsIfConfigured
import org.apache.flume.Sink.Status; //导入依赖的package包/类
@Test
public void ensureCommitReadyAndNoIncrementMetricsIfConfigured() throws Exception {
when(channel.take()).thenReturn(event);
when(event.getBody()).thenReturn("something".getBytes());
Context context = new Context();
context.put("defaultRollback", "false");
context.put("defaultBackoff", "false");
context.put("defaultIncrementMetrics", "false");
executeWithMocks(true, Status.READY, false, false, context, HttpURLConnection.HTTP_OK);
}
开发者ID:hmrc,项目名称:flume-http-sink,代码行数:13,代码来源:HttpSinkTest.java
示例16: ensureSingleStatusConfigurationCorrectlyUsed
import org.apache.flume.Sink.Status; //导入依赖的package包/类
@Test
public void ensureSingleStatusConfigurationCorrectlyUsed() throws Exception {
when(channel.take()).thenReturn(event);
when(event.getBody()).thenReturn("something".getBytes());
Context context = new Context();
context.put("defaultRollback", "true");
context.put("defaultBackoff", "true");
context.put("defaultIncrementMetrics", "false");
context.put("rollback.200", "false");
context.put("backoff.200", "false");
context.put("incrementMetrics.200", "true");
executeWithMocks(true, Status.READY, true, true, context, HttpURLConnection.HTTP_OK);
}
开发者ID:hmrc,项目名称:flume-http-sink,代码行数:16,代码来源:HttpSinkTest.java
示例17: process
import org.apache.flume.Sink.Status; //导入依赖的package包/类
@Override
public Status process() throws EventDeliveryException {
if (invalidConfiguration) {
return Status.BACKOFF;
} else if (rollbackedAccumulations.isEmpty()) {
return processNewBatches();
} else {
processRollbackedBatches();
return processNewBatches();
} // if else
}
开发者ID:telefonicaid,项目名称:fiware-cygnus,代码行数:12,代码来源:NGSISink.java
示例18: testProcessStatusReady
import org.apache.flume.Sink.Status; //导入依赖的package包/类
@SuppressWarnings("unchecked")
@Test
public void testProcessStatusReady() throws EventDeliveryException, SecurityException, NoSuchFieldException, IllegalArgumentException, IllegalAccessException {
when(mockEvent.getBody()).thenReturn("frank".getBytes());
Status status = mockKafkaSink.process();
verify(mockChannel, times(1)).getTransaction();
verify(mockChannel, times(1)).take();
verify(mockProducer, times(1)).send((KeyedMessage<byte[], byte[]>) any());
verify(mockTx, times(1)).commit();
verify(mockTx, times(0)).rollback();
verify(mockTx, times(1)).close();
assertEquals(Status.READY, status);
}
开发者ID:keedio,项目名称:flume-ng-kafka-avro-sink,代码行数:14,代码来源:KafkaAvroSinkTest.java
示例19: testProcessStatusBackoff
import org.apache.flume.Sink.Status; //导入依赖的package包/类
@SuppressWarnings("unchecked")
@Test
public void testProcessStatusBackoff() throws EventDeliveryException, SecurityException, NoSuchFieldException, IllegalArgumentException, IllegalAccessException {
when(mockEvent.getBody()).thenThrow(new RuntimeException());
Status status = mockKafkaSink.process();
verify(mockChannel, times(1)).getTransaction();
verify(mockChannel, times(1)).take();
verify(mockProducer, times(0)).send((KeyedMessage<byte[], byte[]>) any());
verify(mockTx, times(0)).commit();
verify(mockTx, times(1)).rollback();
verify(mockTx, times(1)).close();
assertEquals(Status.BACKOFF, status);
}
开发者ID:keedio,项目名称:flume-ng-kafka-avro-sink,代码行数:14,代码来源:KafkaAvroSinkTest.java
示例20: timestampFieldAllowsDatesWithTheFormatDefined
import org.apache.flume.Sink.Status; //导入依赖的package包/类
@Ignore
@Test
public void timestampFieldAllowsDatesWithTheFormatDefined() {
testFieldType(TIMESTAMP_FIELD, "1231234", Status.READY);
testFieldType(TIMESTAMP_FIELD, "2010-12-20T10:20:20", Status.READY);
testFieldType(TIMESTAMP_FIELD, "2010-12-20T10:20:20.000", Status.READY);
}
开发者ID:Stratio,项目名称:ingestion,代码行数:8,代码来源:CassandraDataTypesIT.java
注:本文中的org.apache.flume.Sink.Status类示例整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。 |
请发表评论