This article collects typical usage examples of the Java class org.apache.flume.Transaction. If you are wondering how the Transaction class is used in practice, the selected examples below may help.
The Transaction class belongs to the org.apache.flume package. 20 code examples of the class are shown below, sorted by popularity by default.
Example 1: testPut2
import org.apache.flume.Transaction; // import the required package/class
@Test
public void testPut2() throws Exception {
  Transaction transaction = channel.getTransaction();
  transaction.begin();
  channel.put(events.get(0));
  transaction.rollback();
  // After rollback, a further put() is expected to fail.
  testIllegalState(new Runnable() {
    @Override
    public void run() {
      channel.put(events.get(0));
    }
  });
  transaction.close();
  // The same is expected once the transaction has been closed.
  testIllegalState(new Runnable() {
    @Override
    public void run() {
      channel.put(events.get(0));
    }
  });
}
Developer ID: moueimei, Project: flume-release-1.7.0, Lines of code: 24, Source: TestBasicChannelSemantics.java
Example 2: testPutCheckpointCommitCheckpointReplay
import org.apache.flume.Transaction; // import the required package/class
@Test
public void testPutCheckpointCommitCheckpointReplay() throws Exception {
  Map<String, String> overrides = Maps.newHashMap();
  overrides.put(FileChannelConfiguration.CAPACITY, String.valueOf(2));
  overrides.put(FileChannelConfiguration.TRANSACTION_CAPACITY,
      String.valueOf(2));
  overrides.put(FileChannelConfiguration.CHECKPOINT_INTERVAL, "10000");
  FileChannel channel = createFileChannel(overrides);
  channel.start();
  // Force a checkpoint by committing a transaction
  Transaction tx = channel.getTransaction();
  Set<String> in = putWithoutCommit(channel, tx, "doubleCheckpoint", 1);
  forceCheckpoint(channel);
  tx.commit();
  tx.close();
  forceCheckpoint(channel);
  channel.stop();
  // Restart the channel and check that the committed events are replayed.
  channel = createFileChannel(overrides);
  channel.start();
  Assert.assertTrue(channel.isOpen());
  Set<String> out = takeEvents(channel, 5);
  compareInputAndOut(in, out);
  channel.stop();
}
Developer ID: moueimei, Project: flume-release-1.7.0, Lines of code: 26, Source: TestFileChannel.java
Example 3: remaining
import org.apache.flume.Transaction; // import the required package/class
public static int remaining(Channel ch) throws EventDeliveryException {
  Transaction t = ch.getTransaction();
  try {
    t.begin();
    int count = 0;
    // Drain the channel, counting events until take() returns null.
    while (ch.take() != null) {
      count += 1;
    }
    t.commit();
    return count;
  } catch (Throwable th) {
    t.rollback();
    Throwables.propagateIfInstanceOf(th, Error.class);
    Throwables.propagateIfInstanceOf(th, EventDeliveryException.class);
    throw new EventDeliveryException(th);
  } finally {
    t.close();
  }
}
Developer ID: moueimei, Project: flume-release-1.7.0, Lines of code: 20, Source: TestDatasetSink.java
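The shape used by `remaining` above (commit on success, roll back on failure, always close) can be isolated into a small helper. The sketch below is only an illustration and is not Flume code: `Tx`, `RecordingTx`, and `inTransaction` are hypothetical names defined locally so that the snippet compiles without Flume on the classpath.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;

final class TxTemplateSketch {
  // Hypothetical stand-in that borrows the four method names of
  // org.apache.flume.Transaction; it is not the real interface.
  interface Tx {
    void begin();
    void commit();
    void rollback();
    void close();
  }

  // Records every call so the order of lifecycle calls can be inspected.
  static final class RecordingTx implements Tx {
    final List<String> calls = new ArrayList<>();
    @Override public void begin() { calls.add("begin"); }
    @Override public void commit() { calls.add("commit"); }
    @Override public void rollback() { calls.add("rollback"); }
    @Override public void close() { calls.add("close"); }
  }

  // Runs work inside tx: commit on success, rollback on failure, always close.
  static <T> T inTransaction(Tx tx, Supplier<T> work) {
    try {
      tx.begin();
      T result = work.get();
      tx.commit();
      return result;
    } catch (RuntimeException | Error e) {
      tx.rollback();
      throw e;
    } finally {
      tx.close();
    }
  }

  public static void main(String[] args) {
    RecordingTx ok = new RecordingTx();
    // prints "3 [begin, commit, close]"
    System.out.println(inTransaction(ok, () -> 3) + " " + ok.calls);

    RecordingTx failed = new RecordingTx();
    try {
      inTransaction(failed, () -> { throw new IllegalStateException("boom"); });
    } catch (IllegalStateException expected) {
      // prints "[begin, rollback, close]"
      System.out.println(failed.calls);
    }
  }
}
```

Keeping `close()` in the `finally` block means it runs on both paths, which is the property the original method relies on.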
Example 4: testCommit2
import org.apache.flume.Transaction; // import the required package/class
@Test
public void testCommit2() throws Exception {
  final Transaction transaction = channel.getTransaction();
  transaction.begin();
  transaction.rollback();
  // commit() after rollback() is expected to fail.
  testIllegalState(new Runnable() {
    @Override
    public void run() {
      transaction.commit();
    }
  });
  transaction.close();
  // commit() after close() is expected to fail as well.
  testIllegalState(new Runnable() {
    @Override
    public void run() {
      transaction.commit();
    }
  });
}
Developer ID: moueimei, Project: flume-release-1.7.0, Lines of code: 24, Source: TestBasicChannelSemantics.java
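The behaviour that `testCommit2` checks (commit is rejected after rollback and again after close) can be pictured as a small state machine. The sketch below is a toy model under an assumed four-state lifecycle (NEW, OPEN, COMPLETED, CLOSED). `ToyTx` and its rules are hypothetical and simplified, so Flume's own sources remain the reference for the real semantics.

```java
final class TxLifecycleSketch {
  // Assumed lifecycle for illustration only; not copied from Flume.
  enum State { NEW, OPEN, COMPLETED, CLOSED }

  // Hypothetical toy transaction that only tracks its state.
  static final class ToyTx {
    State state = State.NEW;

    void begin() {
      require(state == State.NEW, "begin");
      state = State.OPEN;
    }

    void commit() {
      require(state == State.OPEN, "commit");
      state = State.COMPLETED;
    }

    void rollback() {
      require(state == State.OPEN, "rollback");
      state = State.COMPLETED;
    }

    void close() {
      require(state == State.NEW || state == State.COMPLETED, "close");
      state = State.CLOSED;
    }

    private void require(boolean ok, String op) {
      if (!ok) {
        throw new IllegalStateException(op + "() called when transaction is " + state);
      }
    }
  }

  // Returns true if running r throws IllegalStateException.
  static boolean throwsIllegalState(Runnable r) {
    try {
      r.run();
      return false;
    } catch (IllegalStateException e) {
      return true;
    }
  }

  public static void main(String[] args) {
    ToyTx tx = new ToyTx();
    tx.begin();
    tx.rollback();
    System.out.println(throwsIllegalState(tx::commit)); // true: already completed
    tx.close();
    System.out.println(throwsIllegalState(tx::commit)); // true: already closed
  }
}
```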
Example 5: testClose1
import org.apache.flume.Transaction; // import the required package/class
@Test
public void testClose1() throws Exception {
  final Transaction transaction = channel.getTransaction();
  testError(new Runnable() {
    @Override
    public void run() {
      transaction.close();
    }
  });
  testIllegalState(new Runnable() {
    @Override
    public void run() {
      transaction.close();
    }
  });
}
Developer ID: moueimei, Project: flume-release-1.7.0, Lines of code: 19, Source: TestBasicChannelSemantics.java
Example 6: testClose2
import org.apache.flume.Transaction; // import the required package/class
@Test
public void testClose2() throws Exception {
  final Transaction transaction = channel.getTransaction();
  testRuntimeException(new Runnable() {
    @Override
    public void run() {
      transaction.close();
    }
  });
  testIllegalState(new Runnable() {
    @Override
    public void run() {
      transaction.close();
    }
  });
}
Developer ID: moueimei, Project: flume-release-1.7.0, Lines of code: 19, Source: TestBasicChannelSemantics.java
Example 7: testClose5
import org.apache.flume.Transaction; // import the required package/class
@Test
public void testClose5() throws Exception {
  final Transaction transaction = channel.getTransaction();
  transaction.begin();
  testChannelException(new Runnable() {
    @Override
    public void run() {
      transaction.commit();
    }
  });
  testIllegalState(new Runnable() {
    @Override
    public void run() {
      transaction.close();
    }
  });
}
Developer ID: moueimei, Project: flume-release-1.7.0, Lines of code: 20, Source: TestBasicChannelSemantics.java
Example 8: commitPutsToOverflow_core
import org.apache.flume.Transaction; // import the required package/class
private void commitPutsToOverflow_core(Transaction overflowPutTx)
    throws InterruptedException {
  // reattempt only once if overflow is full first time around
  for (int i = 0; i < 2; ++i) {
    try {
      synchronized (queueLock) {
        overflowPutTx.commit();
        drainOrder.putOverflow(putList.size());
        channelCounter.setChannelSize(memQueue.size()
            + drainOrder.overflowCounter);
        break;
      }
    } catch (ChannelFullException e) { // drop lock & reattempt
      if (i == 0) {
        Thread.sleep(overflowTimeout * 1000);
      } else {
        throw e;
      }
    }
  }
}
Developer ID: moueimei, Project: flume-release-1.7.0, Lines of code: 22, Source: SpillableMemoryChannel.java
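The loop in `commitPutsToOverflow_core` is a "retry exactly once after a pause" pattern: the `catch` sits outside the `synchronized` block, so the lock is released before the thread sleeps. A generic version of that control flow might look like the sketch below. `FullException`, `retryOnce`, and `pause` are hypothetical names used for illustration, and the locking is left out.

```java
import java.util.function.Supplier;

final class RetryOnceSketch {
  // Hypothetical stand-in for a "channel is full" failure.
  static final class FullException extends RuntimeException {
    private static final long serialVersionUID = 1L;
  }

  // Runs action; on the first FullException waits pauseMillis and tries again,
  // on the second FullException gives up and rethrows.
  static <T> T retryOnce(Supplier<T> action, long pauseMillis) {
    for (int attempt = 0; ; ++attempt) {
      try {
        return action.get();
      } catch (FullException e) {
        if (attempt > 0) {
          throw e;
        }
        pause(pauseMillis);
      }
    }
  }

  // Sleeps, converting an interrupt into an unchecked exception.
  static void pause(long millis) {
    try {
      Thread.sleep(millis);
    } catch (InterruptedException ie) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException("interrupted while waiting to retry", ie);
    }
  }

  public static void main(String[] args) {
    int[] calls = {0};
    String outcome = retryOnce(() -> {
      if (calls[0]++ == 0) {
        throw new FullException(); // first attempt fails
      }
      return "committed";
    }, 10L);
    // prints "committed after 2 attempts"
    System.out.println(outcome + " after " + calls[0] + " attempts");
  }
}
```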
Example 9: testCapacityBufferEmptyingAfterRollback
import org.apache.flume.Transaction; // import the required package/class
@Test
public void testCapacityBufferEmptyingAfterRollback() {
  Context context = new Context();
  Map<String, String> parms = new HashMap<String, String>();
  parms.put("capacity", "3");
  parms.put("transactionCapacity", "3");
  context.putAll(parms);
  Configurables.configure(channel, context);
  // Fill the channel to capacity, then roll back.
  Transaction tx = channel.getTransaction();
  tx.begin();
  channel.put(EventBuilder.withBody("test".getBytes()));
  channel.put(EventBuilder.withBody("test".getBytes()));
  channel.put(EventBuilder.withBody("test".getBytes()));
  tx.rollback();
  tx.close();
  // The same three puts must fit again after the rollback.
  tx = channel.getTransaction();
  tx.begin();
  channel.put(EventBuilder.withBody("test".getBytes()));
  channel.put(EventBuilder.withBody("test".getBytes()));
  channel.put(EventBuilder.withBody("test".getBytes()));
  tx.commit();
  tx.close();
}
Developer ID: moueimei, Project: flume-release-1.7.0, Lines of code: 26, Source: TestMemoryChannel.java
Example 10: testNullEmptyEvent
import org.apache.flume.Transaction; // import the required package/class
@Test
public void testNullEmptyEvent() {
  Context context = new Context();
  Map<String, String> parms = new HashMap<String, String>();
  parms.put("byteCapacity", "2000");
  parms.put("byteCapacityBufferPercentage", "20");
  context.putAll(parms);
  Configurables.configure(channel, context);
  Transaction tx = channel.getTransaction();
  tx.begin();
  // This line would cause an NPE without FLUME-1622.
  channel.put(EventBuilder.withBody(null));
  tx.commit();
  tx.close();
  tx = channel.getTransaction();
  tx.begin();
  channel.put(EventBuilder.withBody(new byte[0]));
  tx.commit();
  tx.close();
}
Developer ID: moueimei, Project: flume-release-1.7.0, Lines of code: 25, Source: TestMemoryChannel.java
Example 11: testLayout
import org.apache.flume.Transaction; // import the required package/class
@Test
public void testLayout() throws IOException {
  configureSource();
  props.put("log4j.appender.out2.layout", "org.apache.log4j.PatternLayout");
  props.put("log4j.appender.out2.layout.ConversionPattern",
      "%-5p [%t]: %m%n");
  PropertyConfigurator.configure(props);
  Logger logger = LogManager.getLogger(TestLog4jAppender.class);
  Thread.currentThread().setName("Log4jAppenderTest");
  for (int count = 0; count <= 100; count++) {
    /*
     * Log4j internally defines levels as multiples of 10000. So if we
     * create levels directly using count, the level will be set as the
     * default.
     */
    int level = ((count % 5) + 1) * 10000;
    String msg = "This is log message number" + String.valueOf(count);
    logger.log(Level.toLevel(level), msg);
    Transaction transaction = ch.getTransaction();
    transaction.begin();
    Event event = ch.take();
    Assert.assertNotNull(event);
    StringBuilder builder = new StringBuilder();
    builder.append("[").append("Log4jAppenderTest").append("]: ")
        .append(msg);
    // INFO seems to insert an extra space, so let's split the string.
    String eventBody = new String(event.getBody(), "UTF-8");
    String eventLevel = eventBody.split("\\s+")[0];
    Assert.assertEquals(Level.toLevel(level).toString(), eventLevel);
    Assert.assertEquals(
        new String(event.getBody(), "UTF8").trim()
            .substring(eventLevel.length()).trim(), builder.toString());
    Map<String, String> hdrs = event.getHeaders();
    Assert.assertNotNull(hdrs.get(Log4jAvroHeaders.TIMESTAMP.toString()));
    Assert.assertEquals(Level.toLevel(level),
        Level.toLevel(Integer.parseInt(hdrs.get(Log4jAvroHeaders.LOG_LEVEL
            .toString()))));
    Assert.assertEquals(logger.getName(),
        hdrs.get(Log4jAvroHeaders.LOGGER_NAME.toString()));
    Assert.assertEquals("UTF8",
        hdrs.get(Log4jAvroHeaders.MESSAGE_ENCODING.toString()));
    transaction.commit();
    transaction.close();
  }
}
Developer ID: moueimei, Project: flume-release-1.7.0, Lines of code: 54, Source: TestLog4jAppender.java
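The expression `((count % 5) + 1) * 10000` in `testLayout` cycles through five integer levels. The sketch below works through that arithmetic on its own. The level names in `NAMES` assume the Log4j 1.x integer constants (DEBUG = 10000 up to FATAL = 50000). `levelFor` and `nameFor` are hypothetical helpers, and the snippet deliberately does not depend on Log4j.

```java
final class Log4jLevelSketch {
  // Assumption: names ordered to match Log4j 1.x values 10000, 20000, ..., 50000.
  static final String[] NAMES = {"DEBUG", "INFO", "WARN", "ERROR", "FATAL"};

  // Same arithmetic as in testLayout: maps any count onto 10000..50000.
  static int levelFor(int count) {
    return ((count % 5) + 1) * 10000;
  }

  // Looks up the assumed name for a level produced by levelFor.
  static String nameFor(int level) {
    return NAMES[level / 10000 - 1];
  }

  public static void main(String[] args) {
    for (int count = 0; count <= 5; count++) {
      int level = levelFor(count);
      System.out.println(count + " -> " + level + " " + nameFor(level));
    }
    // Output:
    // 0 -> 10000 DEBUG
    // 1 -> 20000 INFO
    // 2 -> 30000 WARN
    // 3 -> 40000 ERROR
    // 4 -> 50000 FATAL
    // 5 -> 10000 DEBUG
  }
}
```

Using `count` directly would produce values such as 1 or 2, which are not multiples of 10000; the comment in the test explains that those would all fall back to the default level.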
Example 12: testAppend
import org.apache.flume.Transaction; // import the required package/class
@Test
public void testAppend() throws Exception {
  client = RpcClientFactory.getThriftInstance(props);
  Context context = new Context();
  channel.configure(context);
  configureSource();
  context.put(ThriftSource.CONFIG_BIND, "0.0.0.0");
  context.put(ThriftSource.CONFIG_PORT, String.valueOf(port));
  Configurables.configure(source, context);
  source.start();
  for (int i = 0; i < 30; i++) {
    client.append(EventBuilder.withBody(String.valueOf(i).getBytes()));
  }
  // Take the 30 events back in one transaction and check their order.
  Transaction transaction = channel.getTransaction();
  transaction.begin();
  for (int i = 0; i < 30; i++) {
    Event event = channel.take();
    Assert.assertNotNull(event);
    Assert.assertEquals(String.valueOf(i), new String(event.getBody()));
  }
  transaction.commit();
  transaction.close();
}
Developer ID: moueimei, Project: flume-release-1.7.0, Lines of code: 25, Source: TestThriftSource.java
Example 13: testSourceCounter
import org.apache.flume.Transaction; // import the required package/class
@Test
public void testSourceCounter() throws Exception {
  init("true");
  source.start();
  DatagramPacket datagramPacket = createDatagramPacket("test".getBytes());
  sendDatagramPacket(datagramPacket);
  Transaction txn = channel.getTransaction();
  txn.begin();
  channel.take();
  commitAndCloseTransaction(txn);
  // Retry up to 10 times while acceptedCount == 0. SyslogUDPSource processes events
  // on a separate Netty thread, so message delivery, and with it the sourceCounter
  // increment, can be delayed, which would otherwise make this test flaky.
  for (int i = 0; i < 10 && source.getSourceCounter().getEventAcceptedCount() == 0; i++) {
    Thread.sleep(100);
  }
  Assert.assertEquals(1, source.getSourceCounter().getEventAcceptedCount());
  Assert.assertEquals(1, source.getSourceCounter().getEventReceivedCount());
}
Developer ID: moueimei, Project: flume-release-1.7.0, Lines of code: 25, Source: TestSyslogUdpSource.java
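The bounded wait in `testSourceCounter` (sleep and re-check up to 10 times) is a common way to test state that another thread updates. A reusable form could look like the sketch below. `awaitTrue` and `pause` are hypothetical helpers, and the background thread merely simulates a delayed counter increment.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BooleanSupplier;

final class PollSketch {
  // Re-checks condition up to maxAttempts times, sleeping between checks,
  // and returns the condition's value at the end of the wait.
  static boolean awaitTrue(BooleanSupplier condition, int maxAttempts, long sleepMillis) {
    for (int i = 0; i < maxAttempts && !condition.getAsBoolean(); i++) {
      pause(sleepMillis);
    }
    return condition.getAsBoolean();
  }

  // Sleeps, converting an interrupt into an unchecked exception.
  static void pause(long millis) {
    try {
      Thread.sleep(millis);
    } catch (InterruptedException ie) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException("interrupted while polling", ie);
    }
  }

  public static void main(String[] args) {
    AtomicInteger accepted = new AtomicInteger();
    // Simulates an event being counted on another thread after a short delay.
    new Thread(() -> {
      pause(50);
      accepted.incrementAndGet();
    }).start();

    boolean seen = awaitTrue(() -> accepted.get() > 0, 10, 100);
    System.out.println("counter updated in time: " + seen);
  }
}
```

The wait is bounded, so a counter that never changes still ends the loop and lets the following assertion fail with a clear message instead of hanging.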
Example 14: testSimpleUTF16
import org.apache.flume.Transaction; // import the required package/class
@Test
public void testSimpleUTF16() throws IOException, InterruptedException {
  StringEntity input = new StringEntity("[{\"headers\":{\"a\": \"b\"},\"body\": \"random_body\"},"
      + "{\"headers\":{\"e\": \"f\"},\"body\": \"random_body2\"}]", "UTF-16");
  input.setContentType("application/json; charset=utf-16");
  postRequest.setEntity(input);
  HttpResponse response = httpClient.execute(postRequest);
  Assert.assertEquals(HttpServletResponse.SC_OK,
      response.getStatusLine().getStatusCode());
  Transaction tx = channel.getTransaction();
  tx.begin();
  Event e = channel.take();
  Assert.assertNotNull(e);
  Assert.assertEquals("b", e.getHeaders().get("a"));
  Assert.assertEquals("random_body", new String(e.getBody(), "UTF-16"));
  e = channel.take();
  Assert.assertNotNull(e);
  Assert.assertEquals("f", e.getHeaders().get("e"));
  Assert.assertEquals("random_body2", new String(e.getBody(), "UTF-16"));
  tx.commit();
  tx.close();
}
Developer ID: moueimei, Project: flume-release-1.7.0, Lines of code: 27, Source: TestHTTPSource.java
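`testSimpleUTF16` decodes each event body with the same charset that was used for the request. The JDK-only sketch below shows why that matters: UTF-16 bytes round-trip through UTF-16 but not through UTF-8. `encode` and `decode` are hypothetical helper names, and no Flume or HTTP classes are involved.

```java
import java.nio.charset.StandardCharsets;

final class Utf16BodySketch {
  // Encodes a body as UTF-16; the JDK encoder writes a 2-byte byte-order mark first.
  static byte[] encode(String body) {
    return body.getBytes(StandardCharsets.UTF_16);
  }

  // Decodes UTF-16 bytes; the byte-order mark is consumed, not returned.
  static String decode(byte[] bytes) {
    return new String(bytes, StandardCharsets.UTF_16);
  }

  public static void main(String[] args) {
    byte[] bytes = encode("random_body");
    System.out.println(bytes.length);  // 24: 2-byte BOM + 11 chars * 2 bytes
    System.out.println(decode(bytes)); // random_body
    // Decoding the same bytes with the wrong charset does not recover the text.
    String wrong = new String(bytes, StandardCharsets.UTF_8);
    System.out.println(wrong.equals("random_body")); // false
  }
}
```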
Example 15: testSingleEvent
import org.apache.flume.Transaction; // import the required package/class
@Test
public void testSingleEvent() throws Exception {
  StringEntity input = new StringEntity("[{\"headers\" : {\"a\": \"b\"},\"body\":"
      + " \"random_body\"}]");
  input.setContentType("application/json");
  postRequest.setEntity(input);
  httpClient.execute(postRequest);
  Transaction tx = channel.getTransaction();
  tx.begin();
  Event e = channel.take();
  Assert.assertNotNull(e);
  Assert.assertEquals("b", e.getHeaders().get("a"));
  Assert.assertEquals("random_body", new String(e.getBody(), "UTF-8"));
  tx.commit();
  tx.close();
}
Developer ID: moueimei, Project: flume-release-1.7.0, Lines of code: 18, Source: TestHTTPSource.java
Example 16: takeWithEncoding
import org.apache.flume.Transaction; // import the required package/class
private void takeWithEncoding(String encoding, int n, List<JSONEvent> events) throws Exception {
  Transaction tx = channel.getTransaction();
  tx.begin();
  Event e = null;
  int i = 0;
  // Compare each event taken from the channel with the expected one, in order.
  while (true) {
    e = channel.take();
    if (e == null) {
      break;
    }
    Event current = events.get(i++);
    Assert.assertEquals(new String(current.getBody(), encoding),
        new String(e.getBody(), encoding));
    Assert.assertEquals(current.getHeaders(), e.getHeaders());
  }
  Assert.assertEquals(n, events.size());
  tx.commit();
  tx.close();
}
Developer ID: moueimei, Project: flume-release-1.7.0, Lines of code: 20, Source: TestHTTPSource.java
Example 17: testPutTake
import org.apache.flume.Transaction; // import the required package/class
@Test
public void testPutTake() {
  Map<String, String> params = new HashMap<String, String>();
  params.put("memoryCapacity", "5");
  params.put("overflowCapacity", "5");
  params.put(FileChannelConfiguration.TRANSACTION_CAPACITY, "5");
  startChannel(params);
  Transaction tx = channel.getTransaction();
  tx.begin();
  putN(0, 2, channel);
  tx.commit();
  tx.close();
  tx = channel.getTransaction();
  tx.begin();
  takeN(0, 2, channel);
  tx.commit();
  tx.close();
}
Developer ID: moueimei, Project: flume-release-1.7.0, Lines of code: 21, Source: TestSpillableMemoryChannel.java
Example 18: testOneEvent
import org.apache.flume.Transaction; // import the required package/class
@Test
public void testOneEvent() throws Exception {
  initContextForSimpleHbaseEventSerializer();
  HBaseSink sink = new HBaseSink(conf);
  Configurables.configure(sink, ctx);
  Channel channel = new MemoryChannel();
  Configurables.configure(channel, new Context());
  sink.setChannel(channel);
  sink.start();
  // Put one event into the channel, then let the sink process it.
  Transaction tx = channel.getTransaction();
  tx.begin();
  Event e = EventBuilder.withBody(
      Bytes.toBytes(valBase));
  channel.put(e);
  tx.commit();
  tx.close();
  sink.process();
  sink.stop();
  HTable table = new HTable(conf, tableName);
  byte[][] results = getResults(table, 1);
  byte[] out = results[0];
  Assert.assertArrayEquals(e.getBody(), out);
  out = results[1];
  Assert.assertArrayEquals(Longs.toByteArray(1), out);
}
Developer ID: moueimei, Project: flume-release-1.7.0, Lines of code: 27, Source: TestHBaseSink.java
Example 19: testRollbackAfterNoPutTake
import org.apache.flume.Transaction; // import the required package/class
@Test
public void testRollbackAfterNoPutTake() throws Exception {
  channel.start();
  Assert.assertTrue(channel.isOpen());
  Transaction transaction;
  transaction = channel.getTransaction();
  transaction.begin();
  transaction.rollback();
  transaction.close();
  // ensure we can reopen log with no error
  channel.stop();
  channel = createFileChannel();
  channel.start();
  Assert.assertTrue(channel.isOpen());
  transaction = channel.getTransaction();
  transaction.begin();
  Assert.assertNull(channel.take());
  transaction.commit();
  transaction.close();
}
Developer ID: moueimei, Project: flume-release-1.7.0, Lines of code: 22, Source: TestFileChannelRollback.java
Example 20: testPutFilenameHeader
import org.apache.flume.Transaction; // import the required package/class
@Test
public void testPutFilenameHeader() throws IOException {
  File f1 = new File(tmpDir, "file1");
  Files.write("f1\n", f1, Charsets.UTF_8);
  Context context = new Context();
  context.put(POSITION_FILE, posFilePath);
  context.put(FILE_GROUPS, "fg");
  context.put(FILE_GROUPS_PREFIX + "fg", tmpDir.getAbsolutePath() + "/file.*");
  context.put(FILENAME_HEADER, "true");
  context.put(FILENAME_HEADER_KEY, "path");
  Configurables.configure(source, context);
  source.start();
  source.process();
  Transaction txn = channel.getTransaction();
  txn.begin();
  Event e = channel.take();
  txn.commit();
  txn.close();
  assertNotNull(e.getHeaders().get("path"));
  assertEquals(f1.getAbsolutePath(),
      e.getHeaders().get("path"));
}
Developer ID: moueimei, Project: flume-release-1.7.0, Lines of code: 26, Source: TestTaildirSource.java
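Note: The org.apache.flume.Transaction examples in this article were collected from source-code and documentation hosting platforms such as GitHub and MSDocs. The snippets were selected from open-source projects, and copyright in the source code remains with the original authors. For distribution and use, refer to the license of the corresponding project. Do not reproduce without permission.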