本文整理汇总了Java中com.datatorrent.api.LocalMode类的典型用法代码示例。如果您正苦于以下问题：Java LocalMode类的具体用法？Java LocalMode怎么用？Java LocalMode使用的例子？那么恭喜您，这里精选的类代码示例或许可以为您提供帮助。
LocalMode类属于com.datatorrent.api包,在下文中一共展示了LocalMode类的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Java代码示例。
示例1: testApplication
import com.datatorrent.api.LocalMode; //导入依赖的package包/类
/**
 * Writes messages to the Kafka topic, runs the application asynchronously and
 * compares the produced output with the input.
 *
 * @throws Exception if any of the helper steps fails
 */
@Test
public void testApplication() throws Exception
{
  try {
    // write messages to Kafka topic
    Configuration conf = getConfig();
    writeToTopic();

    // run app asynchronously; terminate after results are checked
    LocalMode.Controller lc = asyncRun(conf);
    try {
      // wait for the output to show up
      waitForOutputTuples();
      // compare output lines to input
      compare();
    } finally {
      // stop the local cluster even when one of the checks above throws,
      // otherwise the asynchronously started application keeps running
      lc.shutdown();
    }
  } catch (ConstraintViolationException e) {
    Assert.fail("constraint violations: " + e.getConstraintViolations());
  }
}
开发者ID:DataTorrent,项目名称:app-templates,代码行数:25,代码来源:ApplicationTest.java
示例2: testApplication
import com.datatorrent.api.LocalMode; //导入依赖的package包/类
/**
 * Prepares the DAG of {@code Application} from the test properties resource,
 * runs it through a local controller and then checks the output.
 *
 * @throws IOException declared by the original signature
 * @throws Exception if DAG preparation, the run or the output check fails
 */
@Test
public void testApplication() throws IOException, Exception
{
  try {
    LocalMode lma = LocalMode.newInstance();
    Configuration conf = new Configuration(false);
    conf.addResource(this.getClass().getResourceAsStream("/META-INF/properties-test.xml"));
    lma.prepareDAG(new Application(), conf);

    LocalMode.Controller lc = lma.getController();
    try {
      // NOTE(review): the argument is presumably a run duration in milliseconds - confirm
      lc.run(5000);
      // get messages from Kafka topic and compare with input
      chkOutput();
    } finally {
      // make sure the controller is shut down even if the output check fails
      lc.shutdown();
    }
  } catch (ConstraintViolationException e) {
    Assert.fail("constraint violations: " + e.getConstraintViolations());
  }
}
开发者ID:DataTorrent,项目名称:app-templates,代码行数:22,代码来源:ApplicationTest.java
示例3: testApplication
import com.datatorrent.api.LocalMode; //导入依赖的package包/类
/**
 * Runs the application asynchronously with the test properties, waits for
 * output tuples and validates them.
 *
 * @throws Exception if any of the helper steps fails
 */
@Test
public void testApplication() throws Exception
{
  try {
    Configuration conf = new Configuration(false);
    conf.addResource(this.getClass().getResourceAsStream("/META-INF/properties-test.xml"));

    /*
     * Run the application asynchronously and keep polling for results till timeout.
     */
    LocalMode.Controller lc = asyncRun(conf);
    try {
      waitForOutputTuples();
      /*
       * Validate the data contents of results.
       */
      validateTuples();
    } finally {
      // shut the local cluster down even when waiting or validation fails
      lc.shutdown();
    }
  } catch (ConstraintViolationException e) {
    Assert.fail("constraint violations: " + e.getConstraintViolations());
  }
}
开发者ID:DataTorrent,项目名称:app-templates,代码行数:24,代码来源:ApplicationTest.java
示例4: testApplication
import com.datatorrent.api.LocalMode; //导入依赖的package包/类
/**
 * Writes messages to the Kafka topic, runs the application asynchronously,
 * compares the output with the input and finally cleans the table.
 *
 * @throws Exception if any of the helper steps fails
 */
@Test
public void testApplication() throws Exception
{
  try {
    // write messages to Kafka topic
    Configuration conf = getConfig();
    writeToTopic();

    // run app asynchronously; terminate after results are checked
    LocalMode.Controller lc = asyncRun(conf);
    try {
      // wait for the output to show up
      waitForOutputTuples();
      // compare output lines to input
      compare();
      cleanTable();
    } finally {
      // stop the local cluster even when a check or the cleanup throws
      lc.shutdown();
    }
  } catch (ConstraintViolationException e) {
    Assert.fail("constraint violations: " + e.getConstraintViolations());
  }
}
开发者ID:DataTorrent,项目名称:app-templates,代码行数:25,代码来源:ApplicationTest.java
示例5: testApplication
import com.datatorrent.api.LocalMode; //导入依赖的package包/类
/**
 * Runs the application asynchronously with the file output path pointed at
 * {@code outputDir}, waits for output tuples and cleans the table.
 *
 * @throws Exception if any of the helper steps fails
 */
@Test
public void testApplication() throws Exception
{
  try {
    Configuration conf = new Configuration(false);
    conf.addResource(this.getClass().getResourceAsStream("/META-INF/properties-test.xml"));
    conf.set("dt.operator.fileOutput.prop.filePath", outputDir);

    /*
     * Run the application asynchronously and keep polling for results till timeout.
     */
    LocalMode.Controller lc = asyncRun(conf);
    try {
      waitForOutputTuples();
      cleanTable();
    } finally {
      // shut the local cluster down even when waiting or cleanup fails
      lc.shutdown();
    }
  } catch (ConstraintViolationException e) {
    Assert.fail("constraint violations: " + e.getConstraintViolations());
  }
}
开发者ID:DataTorrent,项目名称:app-templates,代码行数:21,代码来源:ApplicationTest.java
示例6: testApplication
import com.datatorrent.api.LocalMode; //导入依赖的package包/类
/**
 * Writes messages to the Kafka topic, runs the application asynchronously,
 * checks the output and compares it with the input.
 *
 * @throws Exception if any of the helper steps fails
 */
@Test
public void testApplication() throws Exception
{
  try {
    // write messages to Kafka topic
    Configuration conf = getConfig();
    writeToTopic();

    // run app asynchronously; terminate after results are checked
    LocalMode.Controller lc = asyncRun(conf);
    try {
      // check for presence of output file
      chkOutput();
      // compare output lines to input
      compare();
    } finally {
      // stop the local cluster even when one of the checks above throws
      lc.shutdown();
    }
  } catch (ConstraintViolationException e) {
    Assert.fail("constraint violations: " + e.getConstraintViolations());
  }
}
开发者ID:DataTorrent,项目名称:app-templates,代码行数:25,代码来源:ApplicationTest.java
示例7: testApplication
import com.datatorrent.api.LocalMode; //导入依赖的package包/类
/**
 * Writes messages to the Kafka topic, starts the application asynchronously,
 * waits for output tuples and compares them with the input.
 *
 * @throws Exception if any of the helper steps fails
 */
@Test
public void testApplication() throws Exception
{
  try {
    // write messages to Kafka topic
    Configuration conf = getConfig();
    writeToTopic();

    // run app asynchronously; terminate after results are checked
    LocalMode.Controller lc = asyncRun(conf);
    try {
      // wait for the output to show up
      waitForOutputTuples();
      // compare output lines to input
      compare();
    } finally {
      // guarantee shutdown so a failed assertion does not leave the cluster running
      lc.shutdown();
    }
  } catch (ConstraintViolationException e) {
    Assert.fail("constraint violations: " + e.getConstraintViolations());
  }
}
开发者ID:DataTorrent,项目名称:app-templates,代码行数:25,代码来源:ApplicationTest.java
示例8: testApplicationWithPojoConversion
import com.datatorrent.api.LocalMode; //导入依赖的package包/类
/**
 * Clears the test directory, generates the JSON input file, wires a
 * {@code FileInputOperator} and the JSON parser into a
 * {@code JsonStreamingParserApp} and runs that app in local mode.
 *
 * @throws IOException declared by the original signature
 * @throws Exception if input preparation, DAG preparation or the run fails
 */
@Test
public void testApplicationWithPojoConversion() throws IOException, Exception
{
  try {
    // start from an empty test directory (recursive delete)
    FileContext localFs = FileContext.getLocalFSFileContext();
    Path testDirPath = new Path(new File(testMeta.dir).getAbsolutePath());
    localFs.delete(testDirPath, true);

    // generate the reader input and write it to the JSON input file
    int recordCount = 7;
    createReaderInput(recordCount);
    File jsonInput = new File(FILENAME);
    writeJsonInputFile(jsonInput);

    // input operator that reads from the test directory
    FileInputOperator reader = new FileInputOperator();
    reader.setDirectory(testMeta.dir);

    LocalMode localMode = LocalMode.newInstance();
    Configuration emptyConf = new Configuration(false);

    JsonStreamingParserApp app = new JsonStreamingParserApp();
    app.setParser(jsonParser);
    app.setFileInput(reader);

    localMode.prepareDAG(app, emptyConf);
    LocalMode.Controller controller = localMode.getController();
    // runs for 10 seconds and quits
    controller.run(10000);
  } catch (ConstraintViolationException e) {
    Assert.fail("constraint violations: " + e.getConstraintViolations());
  }
}
开发者ID:apache,项目名称:apex-malhar,代码行数:23,代码来源:StreamingJsonParserTest.java
示例9: runEmbedded
import com.datatorrent.api.LocalMode; //导入依赖的package包/类
/**
 * Populates a fresh local-mode DAG and runs it in-process.
 *
 * @param async         when {@code true} the controller is started with
 *                      {@code runAsync()} and {@code duration} is ignored
 * @param duration      passed to {@code run(duration)} when non-negative;
 *                      a negative value selects the no-argument {@code run()}
 * @param exitCondition handed to the controller only when it is a
 *                      {@code StramLocalCluster}; otherwise it is ignored
 */
@Override
public void runEmbedded(boolean async, long duration, Callable<Boolean> exitCondition)
{
  LocalMode lma = LocalMode.newInstance();
  populateDag(lma.getDAG());

  LocalMode.Controller lc = lma.getController();
  // only the StramLocalCluster implementation supports an exit condition
  if (lc instanceof StramLocalCluster) {
    ((StramLocalCluster)lc).setExitCondition(exitCondition);
  }

  if (async) {
    lc.runAsync();
  } else if (duration >= 0) {
    lc.run(duration);
  } else {
    lc.run();
  }
}
开发者ID:apache,项目名称:apex-malhar,代码行数:22,代码来源:ApexStreamImpl.java
示例10: testFixedWidthRecords
import com.datatorrent.api.LocalMode; //导入依赖的package包/类
/**
 * Starts {@code FixedWidthApplication} asynchronously in local mode with the
 * mock S3 record reader module configured, lets it run for one second and
 * shuts it down.
 *
 * @throws Exception if DAG preparation fails or the sleep is interrupted
 */
@Test
public void testFixedWidthRecords() throws Exception
{
  FixedWidthApplication app = new FixedWidthApplication();
  LocalMode lma = LocalMode.newInstance();
  Configuration conf = new Configuration(false);
  conf.set("dt.operator.S3RecordReaderModuleMock.prop.files", inputDir);
  conf.set("dt.operator.S3RecordReaderModuleMock.prop.recordLength", "8");
  conf.set("dt.operator.S3RecordReaderModuleMock.prop.blockSize", "3");
  conf.set("dt.operator.S3RecordReaderModuleMock.prop.blocksThreshold", "1");
  conf.set("dt.operator.S3RecordReaderModuleMock.prop.scanIntervalMillis", "10000");

  lma.prepareDAG(app, conf);
  LocalMode.Controller lc = lma.getController();
  lc.setHeartbeatMonitoringEnabled(true);
  lc.runAsync();
  try {
    LOG.debug("Waiting for app to finish");
    Thread.sleep(1000 * 1);
  } finally {
    // the cluster was started asynchronously; stop it even if the sleep is interrupted
    lc.shutdown();
  }
}
开发者ID:apache,项目名称:apex-malhar,代码行数:22,代码来源:S3RecordReaderMockTest.java
示例11: testCouchBaseAppOutput
import com.datatorrent.api.LocalMode; //导入依赖的package包/类
/**
 * Runs {@code CouchBaseAppOutput} in local mode using the settings in
 * {@code src/site/conf/dt-site-couchbase.xml}. Exceptions raised while
 * preparing or running the DAG are logged, not rethrown.
 *
 * @throws FileNotFoundException if the configuration file does not exist
 * @throws IOException if closing the configuration stream fails
 */
@Test
public void testCouchBaseAppOutput() throws FileNotFoundException, IOException
{
  Configuration conf = new Configuration();
  InputStream is = new FileInputStream("src/site/conf/dt-site-couchbase.xml");
  try {
    conf.addResource(is);
    // Return values are discarded.
    // NOTE(review): presumably these lookups only force the resource to be loaded - confirm.
    // NOTE(review): key prefix casing differs ("CouchBaseAppOutput" vs "couchbaseAppOutput") - confirm intent.
    conf.get("dt.application.CouchBaseAppOutput.operator.couchbaseOutput.store.uriString");
    conf.get("dt.application.CouchBaseAppOutput.operator.couchbaseOutput.store.password");
    conf.get("dt.application.CouchBaseAppOutput.operator.couchbaseOutput.store.bucket");
    conf.get("dt.application.couchbaseAppOutput.operator.couchbaseOutput.store.max_tuples");
    conf.get("dt.application.couchbaseAppOutput.operator.couchbaseOutput.store.queueSize");
    conf.get("dt.application.couchbaseAppOutput.operator.couchbaseOutput.store.blocktime");
    conf.get("dt.application.couchbaseAppOutput.operator.couchbaseOutput.store.timeout");

    LocalMode lm = LocalMode.newInstance();
    try {
      lm.prepareDAG(new CouchBaseAppOutput(), conf);
      LocalMode.Controller lc = lm.getController();
      //lc.setHeartbeatMonitoringEnabled(false);
      lc.run(20000);
    } catch (Exception ex) {
      logger.info(ex.getCause());
    }
  } finally {
    // close the stream on every path, including unchecked exceptions and errors
    is.close();
  }
}
开发者ID:apache,项目名称:apex-malhar,代码行数:27,代码来源:CouchBaseBenchmarkTest.java
示例12: testCouchBaseAppInput
import com.datatorrent.api.LocalMode; //导入依赖的package包/类
/**
 * Runs {@code CouchBaseAppInput} in local mode using the settings in
 * {@code src/site/conf/dt-site-couchbase.xml}. Exceptions raised while
 * preparing or running the DAG are logged, not rethrown.
 *
 * @throws FileNotFoundException if the configuration file does not exist
 * @throws IOException if closing the configuration stream fails
 */
@Test
public void testCouchBaseAppInput() throws FileNotFoundException, IOException
{
  Configuration conf = new Configuration();
  InputStream is = new FileInputStream("src/site/conf/dt-site-couchbase.xml");
  try {
    conf.addResource(is);
    // Return values are discarded.
    // NOTE(review): presumably these lookups only force the resource to be loaded - confirm.
    conf.get("dt.application.CouchBaseAppInput.operator.couchbaseInput.store.uriString");
    conf.get("dt.application.CouchBaseAppInput.operator.couchbaseInput.store.blocktime");
    conf.get("dt.application.CouchBaseAppInput.operator.couchbaseInput.store.timeout");
    conf.get("dt.application.CouchBaseAppInput.operator.couchbaseInput.store.bucket");
    conf.get("dt.application.CouchBaseAppInput.operator.couchbaseInput.store.password");

    LocalMode lm = LocalMode.newInstance();
    try {
      lm.prepareDAG(new CouchBaseAppInput(), conf);
      LocalMode.Controller lc = lm.getController();
      lc.run(20000);
    } catch (Exception ex) {
      logger.info(ex.getCause());
    }
  } finally {
    // close the stream on every path, including unchecked exceptions and errors
    is.close();
  }
}
开发者ID:apache,项目名称:apex-malhar,代码行数:23,代码来源:CouchBaseBenchmarkTest.java
示例13: testEventGeneratorApp
import com.datatorrent.api.LocalMode; //导入依赖的package包/类
/**
 * Runs {@code EventGeneratorApp} in local mode using the settings in
 * {@code src/site/conf/dt-site-testbench.xml}. Exceptions raised while
 * preparing or running the DAG are logged, not rethrown.
 *
 * @throws FileNotFoundException if the configuration file does not exist
 * @throws IOException if closing the configuration stream fails
 */
@Test
public void testEventGeneratorApp() throws FileNotFoundException, IOException
{
  Logger logger = LoggerFactory.getLogger(EventGeneratorAppTest.class);
  LocalMode lm = LocalMode.newInstance();
  Configuration conf = new Configuration();
  InputStream is = new FileInputStream("src/site/conf/dt-site-testbench.xml");
  try {
    conf.addResource(is);
    // Return values are discarded.
    // NOTE(review): presumably these lookups only force the resource to be loaded - confirm.
    conf.get("dt.application.EventGeneratorApp.operator.eventGenerator.keysHelper");
    conf.get("dt.application.EventGeneratorApp.operator.eventGenerator.weightsHelper");
    conf.get("dt.application.EventGeneratorApp.operator.eventGenerator.valuesHelper");
    try {
      lm.prepareDAG(new EventGeneratorApp(), conf);
      LocalMode.Controller lc = lm.getController();
      lc.run(20000);
    } catch (Exception ex) {
      logger.info(ex.getMessage());
    }
  } finally {
    // close the stream on every path, including unchecked exceptions and errors
    is.close();
  }
}
开发者ID:apache,项目名称:apex-malhar,代码行数:23,代码来源:EventGeneratorAppTest.java
示例14: testMissingRecordLength
import com.datatorrent.api.LocalMode; //导入依赖的package包/类
/**
 * Expects an {@link IllegalArgumentException} when the application is set up
 * without the {@code recordLength} property of the record reader module.
 *
 * @throws Exception the expected {@link IllegalArgumentException}, or any
 *                   other failure raised during setup or the run
 */
@Test(expected = IllegalArgumentException.class)
public void testMissingRecordLength() throws Exception
{
  FixedWidthApplication app = new FixedWidthApplication();
  LocalMode lma = LocalMode.newInstance();
  Configuration conf = new Configuration(false);
  conf.set("dt.operator.HDFSRecordReaderModule.prop.files", inputDir);
  //Should give IllegalArgumentException since recordLength is not set
  //conf.set("dt.operator.HDFSRecordReaderModule.prop.recordLength", "8");
  conf.set("dt.operator.HDFSRecordReaderModule.prop.blocksThreshold", "1");
  conf.set("dt.operator.HDFSRecordReaderModule.prop.scanIntervalMillis", "10000");

  lma.prepareDAG(app, conf);
  LocalMode.Controller lc = lma.getController();
  lc.setHeartbeatMonitoringEnabled(true);
  lc.runAsync();
  try {
    LOG.debug("Waiting for app to finish");
    Thread.sleep(1000 * 1);
  } finally {
    // if execution gets this far the cluster is running; always stop it
    lc.shutdown();
  }
}
开发者ID:apache,项目名称:apex-malhar,代码行数:21,代码来源:FSRecordReaderTest.java
示例15: testFilterClassifierApp
import com.datatorrent.api.LocalMode; //导入依赖的package包/类
/**
 * Runs {@code FilteredEventClassifierApp} in local mode using the settings in
 * {@code src/site/conf/dt-site-testbench.xml}. Exceptions raised while
 * preparing or running the DAG are logged, not rethrown.
 *
 * @throws FileNotFoundException if the configuration file does not exist
 * @throws IOException if closing the configuration stream fails
 */
@Test
public void testFilterClassifierApp() throws FileNotFoundException, IOException
{
  Logger logger = LoggerFactory.getLogger(FilteredEventClassifierAppTest.class);
  LocalMode lm = LocalMode.newInstance();
  Configuration conf = new Configuration();
  InputStream is = new FileInputStream("src/site/conf/dt-site-testbench.xml");
  try {
    conf.addResource(is);
    // Return values are discarded.
    // NOTE(review): presumably these lookups only force the resource to be loaded - confirm.
    conf.get("dt.application.FilteredEventClassifierApp.operator.hmapOper.keys");
    conf.get("dt.application.FilteredEventClassifierApp.operator.hmapOper.numKeys");
    try {
      lm.prepareDAG(new FilteredEventClassifierApp(), conf);
      LocalMode.Controller lc = lm.getController();
      lc.run(20000);
    } catch (Exception ex) {
      logger.info(ex.getMessage());
    }
  } finally {
    // close the stream on every path, including unchecked exceptions and errors
    is.close();
  }
}
开发者ID:apache,项目名称:apex-malhar,代码行数:20,代码来源:FilteredEventClassifierAppTest.java
示例16: testThroughputCounterApp
import com.datatorrent.api.LocalMode; //导入依赖的package包/类
/**
 * Runs {@code ThroughputCounterApp} in local mode using the settings in
 * {@code src/site/conf/dt-site-testbench.xml}. Exceptions raised while
 * preparing or running the DAG are logged, not rethrown.
 *
 * @throws FileNotFoundException if the configuration file does not exist
 * @throws IOException if closing the configuration stream fails
 */
@Test
public void testThroughputCounterApp() throws FileNotFoundException, IOException
{
  Logger logger = LoggerFactory.getLogger(ThroughputCounterAppTest.class);
  LocalMode lm = LocalMode.newInstance();
  Configuration conf = new Configuration();
  InputStream is = new FileInputStream("src/site/conf/dt-site-testbench.xml");
  try {
    conf.addResource(is);
    // Return values are discarded.
    // NOTE(review): presumably these lookups only force the resource to be loaded - confirm.
    conf.get("dt.application.ThroughputCounterApp.operator.hmapOper.keys");
    conf.get("dt.application.ThroughputCounterApp.operator.hmapOper.numKeys");
    try {
      lm.prepareDAG(new ThroughputCounterApp(), conf);
      LocalMode.Controller lc = lm.getController();
      lc.run(20000);
    } catch (Exception ex) {
      logger.info(ex.getMessage());
    }
  } finally {
    // close the stream on every path, including unchecked exceptions and errors
    is.close();
  }
}
开发者ID:apache,项目名称:apex-malhar,代码行数:20,代码来源:ThroughputCounterAppTest.java
示例17: testEventClassifierApp
import com.datatorrent.api.LocalMode; //导入依赖的package包/类
/**
 * Runs {@code EventClassifierApp} in local mode using the settings in
 * {@code src/site/conf/dt-site-testbench.xml}. Exceptions raised while
 * preparing or running the DAG are logged, not rethrown.
 *
 * @throws FileNotFoundException if the configuration file does not exist
 * @throws IOException if closing the configuration stream fails
 */
@Test
public void testEventClassifierApp() throws FileNotFoundException, IOException
{
  Logger logger = LoggerFactory.getLogger(EventClassifierAppTest.class);
  LocalMode lm = LocalMode.newInstance();
  Configuration conf = new Configuration();
  InputStream is = new FileInputStream("src/site/conf/dt-site-testbench.xml");
  try {
    conf.addResource(is);
    // Return values are discarded.
    // NOTE(review): presumably these lookups only force the resource to be loaded - confirm.
    conf.get("dt.application.EventClassifierApp.operator.hmapOper.keys");
    conf.get("dt.application.EventClassifierApp.operator.hmapOper.numKeys");
    try {
      lm.prepareDAG(new EventClassifierApp(), conf);
      LocalMode.Controller lc = lm.getController();
      lc.run(20000);
    } catch (Exception ex) {
      logger.info(ex.getMessage());
    }
  } finally {
    // close the stream on every path, including unchecked exceptions and errors
    is.close();
  }
}
开发者ID:apache,项目名称:apex-malhar,代码行数:20,代码来源:EventClassifierAppTest.java
示例18: testEventIncrementerApp
import com.datatorrent.api.LocalMode; //导入依赖的package包/类
/**
 * Runs {@code EventIncrementerApp} in local mode using the settings in
 * {@code src/site/conf/dt-site-testbench.xml}. Exceptions raised while
 * preparing or running the DAG are logged, not rethrown.
 *
 * @throws FileNotFoundException if the configuration file does not exist
 * @throws IOException if closing the configuration stream fails
 */
@Test
public void testEventIncrementerApp() throws FileNotFoundException, IOException
{
  Logger logger = LoggerFactory.getLogger(EventIncrementerAppTest.class);
  LocalMode lm = LocalMode.newInstance();
  Configuration conf = new Configuration();
  InputStream is = new FileInputStream("src/site/conf/dt-site-testbench.xml");
  try {
    conf.addResource(is);
    // Return values are discarded.
    // NOTE(review): presumably these lookups only force the resource to be loaded - confirm.
    conf.get("dt.application.EventIncrementerApp.operator.hmapOper.seed");
    conf.get("dt.application.EventIncrementerApp.operator.hmapOper.keys");
    conf.get("dt.application.EventIncrementerApp.operator.hmapOper.numKeys");
    try {
      lm.prepareDAG(new EventIncrementerApp(), conf);
      LocalMode.Controller lc = lm.getController();
      lc.run(20000);
    } catch (Exception ex) {
      logger.info(ex.getMessage());
    }
  } finally {
    // close the stream on every path, including unchecked exceptions and errors
    is.close();
  }
}
开发者ID:apache,项目名称:apex-malhar,代码行数:21,代码来源:EventIncrementerAppTest.java
示例19: testS3FixedWidthRecords
import com.datatorrent.api.LocalMode; //导入依赖的package包/类
/**
 * Starts {@code S3FixedWidthApplication} asynchronously in local mode with the
 * S3 record reader module configured for 8-character records, lets it run
 * for one second and shuts it down.
 *
 * @throws Exception if DAG preparation fails or the sleep is interrupted
 */
@Test
public void testS3FixedWidthRecords() throws Exception
{
  S3FixedWidthApplication app = new S3FixedWidthApplication();
  LocalMode lma = LocalMode.newInstance();
  Configuration conf = new Configuration(false);
  conf.set("dt.operator.S3RecordReaderModule.prop.files", files);
  conf.set("dt.operator.S3RecordReaderModule.prop.recordLength", "8");
  conf.set("dt.operator.S3RecordReaderModule.prop.blocksThreshold", "1");
  conf.set("dt.operator.S3RecordReaderModule.prop.scanIntervalMillis", "10000");

  lma.prepareDAG(app, conf);
  LocalMode.Controller lc = lma.getController();
  lc.setHeartbeatMonitoringEnabled(true);
  lc.runAsync();
  try {
    LOG.debug("Waiting for app to finish");
    Thread.sleep(1000 * 1);
  } finally {
    // the cluster was started asynchronously; stop it even if the sleep is interrupted
    lc.shutdown();
  }
}
开发者ID:apache,项目名称:apex-malhar,代码行数:21,代码来源:S3RecordReaderModuleAppTest.java
示例20: testBenchmark
import com.datatorrent.api.LocalMode; //导入依赖的package包/类
/**
 * Runs {@code KafkaInputBenchmark} in local mode using the settings in
 * {@code src/site/conf/dt-site-kafka.xml}.
 *
 * @throws FileNotFoundException if the configuration file does not exist
 * @throws RuntimeException wrapping any exception raised while preparing or
 *                          running the DAG, or while closing the stream
 */
@Test
public void testBenchmark() throws FileNotFoundException
{
  Configuration conf = new Configuration();
  // opened outside the try so a missing file still surfaces as FileNotFoundException
  InputStream is = new FileInputStream("src/site/conf/dt-site-kafka.xml");
  try {
    try {
      conf.addResource(is);
      LocalMode lma = LocalMode.newInstance();
      lma.prepareDAG(new KafkaInputBenchmark(), conf);
      LocalMode.Controller lc = lma.getController();
      lc.run(30000);
    } finally {
      // the original never closed this stream; release it on every path
      is.close();
    }
  } catch (Exception ex) {
    throw new RuntimeException(ex);
  }
}
开发者ID:apache,项目名称:apex-malhar,代码行数:18,代码来源:KafkaOutputBenchmarkTest.java
注：本文中的com.datatorrent.api.LocalMode类示例整理自GitHub/MSDocs等源码及文档管理平台，相关代码片段筛选自各路编程大神贡献的开源项目，源码版权归原作者所有，传播和使用请参考对应项目的License；未经允许，请勿转载。
请发表评论