This article collects typical usage examples of the Java class cascading.tap.Tap. If you want to know how Tap is used in practice, or are looking for concrete Tap examples, the selected code examples below may help.
Tap belongs to the cascading.tap package. Twenty code examples of Tap are shown below, sorted by popularity by default.
Example 1: sinkConfInit
import cascading.tap.Tap; // import the required package/class
@Override
public void sinkConfInit(FlowProcess<JobConf> flowProcess, Tap<JobConf, RecordReader, OutputCollector> tap, JobConf conf) {
conf.setOutputFormat(EsOutputFormat.class);
// define an output dir to prevent Cascading from setting up a TempHfs and overriding the OutputFormat
Settings set = loadSettings(conf, false);
Log log = LogFactory.getLog(EsTap.class);
InitializationUtils.setValueWriterIfNotSet(set, CascadingValueWriter.class, log);
InitializationUtils.setValueReaderIfNotSet(set, JdkValueReader.class, log);
InitializationUtils.setBytesConverterIfNeeded(set, CascadingLocalBytesConverter.class, log);
InitializationUtils.setFieldExtractorIfNotSet(set, CascadingFieldExtractor.class, log);
// NB: we need to set this property even though it is not being used - and since a URI causes problems, use only the resource/file
//conf.set("mapred.output.dir", set.getTargetUri() + "/" + set.getTargetResource());
HadoopCfgUtils.setFileOutputFormatDir(conf, set.getResourceWrite());
HadoopCfgUtils.setOutputCommitterClass(conf, EsOutputFormat.EsOldAPIOutputCommitter.class.getName());
if (log.isTraceEnabled()) {
log.trace("Initialized (sink) configuration " + HadoopCfgUtils.asProperties(conf));
}
}
Developer ID: xushjie1987, Project: es-hadoop-v2.2.0, Lines of code: 23, Source: EsHadoopScheme.java
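In Example 1, the `InitializationUtils.set…IfNotSet` calls register Cascading-specific defaults (value writer, value reader, field extractor) while, judging by their names, leaving any setting the user already supplied untouched. That put-if-absent idea can be sketched with plain `java.util.Properties`. The `ConfigDefaults` helper and the `example.*` keys are hypothetical; they are not es-hadoop's real setting names or implementation.

```java
import java.util.Properties;

// Hypothetical helper illustrating a put-if-absent default;
// this is NOT es-hadoop's InitializationUtils.
final class ConfigDefaults {
    private ConfigDefaults() {}

    /**
     * Stores defaultValue under key only if no value is configured yet.
     * Returns true when the default was applied, false when the user's value was kept.
     */
    static boolean setIfNotSet(Properties props, String key, String defaultValue) {
        if (props.getProperty(key) != null) {
            return false; // an explicit user setting always wins
        }
        props.setProperty(key, defaultValue);
        return true;
    }
}
```

Calling `setIfNotSet(props, "example.value.writer", "DefaultWriter")` after the user has set that key returns `false` and keeps the user's class, so defaults can be applied unconditionally during initialization.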
Example 2: testUpdateOnlyScript
import cascading.tap.Tap; // import the required package/class
@Test
public void testUpdateOnlyScript() throws Exception {
Properties properties = new TestSettings().getProperties();
properties.put(ConfigurationOptions.ES_WRITE_OPERATION, "update");
properties.put(ConfigurationOptions.ES_MAPPING_ID, "number");
properties.put(ConfigurationOptions.ES_INDEX_AUTO_CREATE, "yes");
properties.put(ConfigurationOptions.ES_UPDATE_RETRY_ON_CONFLICT, "3");
properties.put(ConfigurationOptions.ES_UPDATE_SCRIPT, "counter = 3");
properties.put(ConfigurationOptions.ES_UPDATE_SCRIPT_LANG, "groovy");
properties.put(ConfigurationOptions.ES_INPUT_JSON, "yes");
Tap in = sourceTap();
// use an existing id to allow the update to succeed
Tap out = new EsTap("json-cascading-local/createwithid", new Fields("line"));
Pipe pipe = new Pipe("copy");
build(properties, in, out, pipe);
}
Developer ID: xushjie1987, Project: es-hadoop-v2.2.0, Lines of code: 19, Source: AbstractCascadingLocalJsonSaveTest.java
Example 3: testUpdateOnlyParamScript
import cascading.tap.Tap; // import the required package/class
@Test
public void testUpdateOnlyParamScript() throws Exception {
Properties properties = new TestSettings().getProperties();
properties.put(ConfigurationOptions.ES_WRITE_OPERATION, "update");
properties.put(ConfigurationOptions.ES_MAPPING_ID, "number");
properties.put(ConfigurationOptions.ES_INDEX_AUTO_CREATE, "yes");
properties.put(ConfigurationOptions.ES_UPDATE_RETRY_ON_CONFLICT, "3");
properties.put(ConfigurationOptions.ES_UPDATE_SCRIPT, "counter = param1; anothercounter = param2");
properties.put(ConfigurationOptions.ES_UPDATE_SCRIPT_LANG, "groovy");
properties.put(ConfigurationOptions.ES_UPDATE_SCRIPT_PARAMS, " param1:<1>, param2:number ");
properties.put(ConfigurationOptions.ES_INPUT_JSON, "yes");
Tap in = sourceTap();
// use an existing id to allow the update to succeed
Tap out = new EsTap("json-cascading-local/createwithid", new Fields("line"));
Pipe pipe = new Pipe("copy");
build(properties, in, out, pipe);
}
Developer ID: xushjie1987, Project: es-hadoop-v2.2.0, Lines of code: 20, Source: AbstractCascadingLocalJsonSaveTest.java
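Example 3 (and later Examples 5, 7 and 9) passes script parameters as a comma-separated list such as ` param1:<1>, param2:number `. Judging from these values, a name mapped to `<…>` is a constant, while a bare name (`number`, `id`) refers to a field of the document being written. A minimal parser for that format might look like the following. `ScriptParams` is a hypothetical class for illustration, not es-hadoop's parser, and it does not handle commas or colons inside values.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical parser for "name:<constant>, name:fieldName" lists; not es-hadoop code.
final class ScriptParams {
    private ScriptParams() {}

    /** A parameter is either a constant literal or a reference to a document field. */
    static final class Param {
        final String value;
        final boolean constant;

        Param(String value, boolean constant) {
            this.value = value;
            this.constant = constant;
        }
    }

    static Map<String, Param> parse(String spec) {
        Map<String, Param> result = new LinkedHashMap<>(); // keeps declaration order
        for (String entry : spec.split(",")) {
            String trimmed = entry.trim();
            if (trimmed.isEmpty()) {
                continue; // tolerate stray commas
            }
            int colon = trimmed.indexOf(':');
            if (colon <= 0) {
                throw new IllegalArgumentException("Expected name:value, got '" + trimmed + "'");
            }
            String name = trimmed.substring(0, colon).trim();
            String raw = trimmed.substring(colon + 1).trim();
            // "<...>" marks a constant; anything else is treated as a field name
            boolean constant = raw.length() >= 2 && raw.startsWith("<") && raw.endsWith(">");
            String value = constant ? raw.substring(1, raw.length() - 1) : raw;
            result.put(name, new Param(value, constant));
        }
        return result;
    }
}
```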
Example 4: testUpdateOnlyParamJsonScript
import cascading.tap.Tap; // import the required package/class
@Test
public void testUpdateOnlyParamJsonScript() throws Exception {
Properties properties = new TestSettings().getProperties();
properties.put(ConfigurationOptions.ES_WRITE_OPERATION, "update");
properties.put(ConfigurationOptions.ES_MAPPING_ID, "number");
properties.put(ConfigurationOptions.ES_UPDATE_RETRY_ON_CONFLICT, "3");
properties.put(ConfigurationOptions.ES_UPDATE_SCRIPT, "counter = param1; anothercounter = param2");
properties.put(ConfigurationOptions.ES_UPDATE_SCRIPT_LANG, "groovy");
properties.put(ConfigurationOptions.ES_UPDATE_SCRIPT_PARAMS_JSON, "{ \"param1\":1, \"param2\":2}");
properties.put(ConfigurationOptions.ES_INPUT_JSON, "yes");
Tap in = sourceTap();
// use an existing id to allow the update to succeed
Tap out = new EsTap("json-cascading-local/createwithid", new Fields("line"));
Pipe pipe = new Pipe("copy");
build(properties, in, out, pipe);
}
Developer ID: xushjie1987, Project: es-hadoop-v2.2.0, Lines of code: 20, Source: AbstractCascadingLocalJsonSaveTest.java
Example 5: testUpsertParamScript
import cascading.tap.Tap; // import the required package/class
@Test
public void testUpsertParamScript() throws Exception {
Properties properties = new TestSettings().getProperties();
properties.put(ConfigurationOptions.ES_WRITE_OPERATION, "upsert");
properties.put(ConfigurationOptions.ES_MAPPING_ID, "number");
properties.put(ConfigurationOptions.ES_INPUT_JSON, "yes");
properties.put(ConfigurationOptions.ES_UPDATE_SCRIPT, "counter += param1; anothercounter += param2");
properties.put(ConfigurationOptions.ES_UPDATE_SCRIPT_LANG, "groovy");
properties.put(ConfigurationOptions.ES_UPDATE_SCRIPT_PARAMS, " param1:<1>, param2:number ");
Tap in = sourceTap();
// use an existing id to allow the update to succeed
Tap out = new EsTap("json-cascading-local/upsert-param-script", new Fields("line"));
Pipe pipe = new Pipe("copy");
build(properties, in, out, pipe);
}
Developer ID: xushjie1987, Project: es-hadoop-v2.2.0, Lines of code: 18, Source: AbstractCascadingLocalJsonSaveTest.java
Example 6: testUpsertParamJsonScript
import cascading.tap.Tap; // import the required package/class
@Test
public void testUpsertParamJsonScript() throws Exception {
Properties properties = new TestSettings().getProperties();
properties.put(ConfigurationOptions.ES_WRITE_OPERATION, "upsert");
properties.put(ConfigurationOptions.ES_MAPPING_ID, "number");
properties.put(ConfigurationOptions.ES_INPUT_JSON, "yes");
properties.put(ConfigurationOptions.ES_UPDATE_SCRIPT, "ctx._source.counter += param1; ctx._source.anothercounter += param2");
properties.put(ConfigurationOptions.ES_UPDATE_SCRIPT_LANG, "groovy");
properties.put(ConfigurationOptions.ES_UPDATE_SCRIPT_PARAMS_JSON, "{ \"param1\":1, \"param2\":2}");
Tap in = sourceTap();
// use an existing id to allow the update to succeed
Tap out = new EsTap("json-cascading-local/upsert-script-json-script", new Fields("line"));
Pipe pipe = new Pipe("copy");
build(properties, in, out, pipe);
}
Developer ID: xushjie1987, Project: es-hadoop-v2.2.0, Lines of code: 18, Source: AbstractCascadingLocalJsonSaveTest.java
Example 7: testUpdateOnlyParamScript
import cascading.tap.Tap; // import the required package/class
@Test
public void testUpdateOnlyParamScript() throws Exception {
Properties properties = new TestSettings().getProperties();
properties.put(ConfigurationOptions.ES_WRITE_OPERATION, "update");
properties.put(ConfigurationOptions.ES_MAPPING_ID, "id");
properties.put(ConfigurationOptions.ES_INDEX_AUTO_CREATE, "yes");
properties.put(ConfigurationOptions.ES_UPDATE_RETRY_ON_CONFLICT, "3");
properties.put(ConfigurationOptions.ES_UPDATE_SCRIPT, "counter = param1; anothercounter = param2");
properties.put(ConfigurationOptions.ES_UPDATE_SCRIPT_LANG, "groovy");
properties.put(ConfigurationOptions.ES_UPDATE_SCRIPT_PARAMS, " param1:<1>, param2:id ");
Tap in = sourceTap();
// use an existing id to allow the update to succeed
Tap out = new EsTap("cascading-local/createwithid", new Fields("id", "name", "url", "picture"));
Pipe pipe = new Pipe("copy");
build(properties, in, out, pipe);
}
Developer ID: xushjie1987, Project: es-hadoop-v2.2.0, Lines of code: 19, Source: AbstractCascadingLocalSaveTest.java
Example 8: testUpdateOnlyParamJsonScript
import cascading.tap.Tap; // import the required package/class
@Test
public void testUpdateOnlyParamJsonScript() throws Exception {
Properties properties = new TestSettings().getProperties();
properties.put(ConfigurationOptions.ES_WRITE_OPERATION, "update");
properties.put(ConfigurationOptions.ES_MAPPING_ID, "id");
properties.put(ConfigurationOptions.ES_UPDATE_RETRY_ON_CONFLICT, "3");
properties.put(ConfigurationOptions.ES_UPDATE_SCRIPT, "counter = param1; anothercounter = param2");
properties.put(ConfigurationOptions.ES_UPDATE_SCRIPT_LANG, "groovy");
properties.put(ConfigurationOptions.ES_UPDATE_SCRIPT_PARAMS_JSON, "{ \"param1\":1, \"param2\":2}");
Tap in = sourceTap();
// use an existing id to allow the update to succeed
Tap out = new EsTap("cascading-local/createwithid", new Fields("id", "name", "url", "picture"));
Pipe pipe = new Pipe("copy");
build(properties, in, out, pipe);
}
Developer ID: xushjie1987, Project: es-hadoop-v2.2.0, Lines of code: 19, Source: AbstractCascadingLocalSaveTest.java
Example 9: testUpsertParamScript
import cascading.tap.Tap; // import the required package/class
@Test
public void testUpsertParamScript() throws Exception {
Properties properties = new TestSettings().getProperties();
properties.put(ConfigurationOptions.ES_WRITE_OPERATION, "upsert");
properties.put(ConfigurationOptions.ES_MAPPING_ID, "id");
properties.put(ConfigurationOptions.ES_UPDATE_SCRIPT, "counter += param1; anothercounter += param2");
properties.put(ConfigurationOptions.ES_UPDATE_SCRIPT_LANG, "groovy");
properties.put(ConfigurationOptions.ES_UPDATE_SCRIPT_PARAMS, " param1:<1>, param2:id ");
Tap in = sourceTap();
// use an existing id to allow the update to succeed
Tap out = new EsTap("cascading-local/upsert-param-script", new Fields("id", "name", "url", "picture"));
Pipe pipe = new Pipe("copy");
build(properties, in, out, pipe);
}
Developer ID: xushjie1987, Project: es-hadoop-v2.2.0, Lines of code: 18, Source: AbstractCascadingLocalSaveTest.java
Example 10: testUpsertParamJsonScript
import cascading.tap.Tap; // import the required package/class
@Test
public void testUpsertParamJsonScript() throws Exception {
Properties properties = new TestSettings().getProperties();
properties.put(ConfigurationOptions.ES_WRITE_OPERATION, "upsert");
properties.put(ConfigurationOptions.ES_MAPPING_ID, "id");
properties.put(ConfigurationOptions.ES_UPDATE_SCRIPT, "counter += param1; anothercounter += param2");
properties.put(ConfigurationOptions.ES_UPDATE_SCRIPT_LANG, "groovy");
properties.put(ConfigurationOptions.ES_UPDATE_SCRIPT_PARAMS_JSON, "{ \"param1\":1, \"param2\":2}");
Tap in = sourceTap();
// use an existing id to allow the update to succeed
Tap out = new EsTap("cascading-local/upsert-script-json-script", new Fields("id", "name", "url", "picture"));
Pipe pipe = new Pipe("copy");
build(properties, in, out, pipe);
}
Developer ID: xushjie1987, Project: es-hadoop-v2.2.0, Lines of code: 18, Source: AbstractCascadingLocalSaveTest.java
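Examples 2 to 10 differ mainly in `ES_WRITE_OPERATION`. With `update` the target document must already exist (hence the comment about using an existing id), while `upsert` is meant to insert the document when it is missing and apply the script when it exists. The toy in-memory model below sketches that difference under those assumptions. It does not talk to Elasticsearch, `WriteOpModel` is a hypothetical name, and the "script" is an ordinary Java function standing in for something like `counter += param1`.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.UnaryOperator;

// Hypothetical in-memory model of "update" vs "upsert" semantics; not an ES client.
final class WriteOpModel {
    private final Map<String, Map<String, Integer>> docs = new HashMap<>();

    /** "update": apply the script to an existing document; a missing id is an error. */
    void update(String id, UnaryOperator<Map<String, Integer>> script) {
        Map<String, Integer> existing = docs.get(id);
        if (existing == null) {
            throw new IllegalStateException("document missing: " + id);
        }
        docs.put(id, script.apply(existing));
    }

    /** "upsert": apply the script if the document exists, otherwise insert doc as-is. */
    void upsert(String id, Map<String, Integer> doc, UnaryOperator<Map<String, Integer>> script) {
        Map<String, Integer> existing = docs.get(id);
        docs.put(id, existing == null ? new HashMap<>(doc) : script.apply(existing));
    }

    Map<String, Integer> get(String id) {
        return docs.get(id);
    }
}
```

With an increment script such as `d -> { d.merge("counter", 1, Integer::sum); return d; }`, the first `upsert` stores the document unchanged and every later `upsert` or `update` increments `counter`, whereas an `update` issued before any insert fails.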
Example 11: viewsPerHour
import cascading.tap.Tap; // import the required package/class
@SuppressWarnings("rawtypes")
public static void viewsPerHour() throws IOException {
Tap normalizedByUrl = dataTap(
FileUtils.prepareResultsPath("normalized-by-url", false, false));
// first query part aggregates views by url and hour
Subquery hourlyRollup = new Subquery("?url", "?hour-bucket", "?hour-count")
.predicate(normalizedByUrl, "_", "?fact")
.predicate(new ExtractPageViewFields(), "?fact").out("?url", "?time")
.predicate(new ToHour(), "?time").out("?hour-bucket")
.predicate(new jcascalog.op.Count(), "?hour-count")
.predicate(new Debug(), "?url", "?hour-bucket", "?hour-count").out("?one");
// sink into stdout in absence of serving layer db
Api.execute(new StdoutTap(),
new Subquery("?url", "?granularity", "?bucket", "?bucket-count")
.predicate(hourlyRollup, "?url", "?hour-bucket", "?hour-count")
.predicate(new ToGranularityBuckets(), "?hour-bucket").out("?granularity", "?bucket")
.predicate(new jcascalog.op.Sum(), "?hour-count").out("?bucket-count"));
}
Developer ID: zirpins, Project: bdelab, Lines of code: 21, Source: SimpleBatchWorkflow.java
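Example 11 runs two aggregation stages: it first counts page views per `(url, hour bucket)`, then sums those hourly counts into coarser buckets via `ToGranularityBuckets`. The same two-stage rollup can be sketched with plain Java maps. The hour bucket is assumed to be `epochSeconds / 3600`, and the granularity labels and widths (`h` = 1 hour, `d` = 24 hours, `w` = 7 days, `m` = 28 days) are assumptions for illustration, not taken from the bdelab source.

```java
import java.util.Map;
import java.util.TreeMap;

// Hypothetical JDK-only version of the two-stage rollup; keys are "url|bucket" strings.
final class PageviewRollup {
    private PageviewRollup() {}

    // Assumed granularities: label and number of hours per bucket.
    private static final String[] LABELS = {"h", "d", "w", "m"};
    private static final int[] HOURS_PER_BUCKET = {1, 24, 24 * 7, 24 * 28};

    /** Stage 1: count views per (url, hour bucket). */
    static Map<String, Long> hourly(String[] urls, long[] epochSeconds) {
        Map<String, Long> counts = new TreeMap<>();
        for (int i = 0; i < urls.length; i++) {
            long hourBucket = epochSeconds[i] / 3600;
            counts.merge(urls[i] + "|" + hourBucket, 1L, Long::sum);
        }
        return counts;
    }

    /** Stage 2: sum hourly counts into (url, granularity, bucket). */
    static Map<String, Long> byGranularity(Map<String, Long> hourly) {
        Map<String, Long> out = new TreeMap<>();
        for (Map.Entry<String, Long> e : hourly.entrySet()) {
            int sep = e.getKey().lastIndexOf('|'); // the hour bucket never contains '|'
            String url = e.getKey().substring(0, sep);
            long hourBucket = Long.parseLong(e.getKey().substring(sep + 1));
            for (int g = 0; g < LABELS.length; g++) {
                long bucket = hourBucket / HOURS_PER_BUCKET[g];
                out.merge(url + "|" + LABELS[g] + "|" + bucket, e.getValue(), Long::sum);
            }
        }
        return out;
    }
}
```

Computing the hourly counts once and deriving every coarser granularity from them mirrors the structure of the Cascalog query: the expensive per-event pass happens only in the first stage.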
Example 12: countFacts
import cascading.tap.Tap; // import the required package/class
@SuppressWarnings("rawtypes")
public static void countFacts() throws Exception {
// Create a Tap from the master pail
Tap masterDataset = dataTap(FileUtils.prepareMasterFactsPath(false,false));
// The query produces only a single count value
Subquery query = new Subquery("?count")
// The master PailTap emits tuples containing only the Data element
.predicate(masterDataset, "_", "?raw")
// Aggregate all tuples into one count
.predicate(new Count(), "?count");
// prepare result path
String resultPath = FileUtils.prepareResultsPath("count-facts-3",true,false);
// Run the query; results are written as text lines to an HDFS file
Api.execute(Api.hfsTextline(resultPath), query);
}
Developer ID: zirpins, Project: bdelab, Lines of code: 19, Source: CountFacts3.java
Example 13: validateFields
import cascading.tap.Tap; // import the required package/class
/**
* Validates that every field in sourceFields is present in the header record.
*/
protected boolean validateFields(FlowProcess<JobConf> flowProcess, Tap tap, Fields sourceFields) {
CSVRecord headerRecord = getHeaderRecord(flowProcess, tap);
if (sourceFields.size() > headerRecord.size()) {
return false;
}
List<String> recordList = new ArrayList<String>();
for (int i = 0; i < headerRecord.size(); i++) {
recordList.add(headerRecord.get(i));
}
for (int i = 0; i < sourceFields.size(); i++) {
if (!recordList.contains(sourceFields.get(i))) {
return false;
}
}
return true;
}
Developer ID: datascienceinc, Project: cascading.csv, Lines of code: 25, Source: CsvScheme.java
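The logic of `validateFields` in Example 13 does not depend on Cascading: validation fails if more fields are requested than the header has columns, or if any requested name is absent from the header. Below is a JDK-only sketch in which `List<String>` stands in for `CSVRecord` and `Fields`, an assumption made so it runs without Commons CSV or Cascading. Copying the header into a `HashSet` turns each `contains` lookup from a linear scan into a hash lookup without changing the result.

```java
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical stand-alone version of the header check; plain lists replace CSVRecord/Fields.
final class HeaderValidation {
    private HeaderValidation() {}

    static boolean validateFields(List<String> header, List<String> requested) {
        if (requested.size() > header.size()) {
            return false; // more requested fields than available columns
        }
        Set<String> columns = new HashSet<>(header); // constant-time membership checks
        return columns.containsAll(requested);
    }
}
```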
Example 14: fieldsCountGreaterThanColumnsTest
import cascading.tap.Tap; // import the required package/class
/**
* Tests if correct number of input fields are provided.
*/
@Test(expected = RuntimeException.class)
public void fieldsCountGreaterThanColumnsTest() {
String sourcePath = "src/test/resources/input/with-headers.txt";
String sinkPath = "src/test/resources/output/sink-with-headers";
FlowConnector connector = new Hadoop2MR1FlowConnector();
CSVFormat sourceFormat = CSVFormat.newFormat(',')
.withQuote('"')
.withEscape('\\')
.withRecordSeparator('\n');
CSVFormat sinkFormat = CSVFormat.newFormat('\t')
.withSkipHeaderRecord()
.withEscape('\\')
.withRecordSeparator('\n');
Fields sourceFields = new Fields("id", "last name", "first name", "phone");
Tap source = new Hfs(new CsvScheme(sourceFields, sourceFormat), sourcePath);
Tap sink = new Hfs(new CsvScheme(sinkFormat), sinkPath);
Pipe pipe = new Pipe("pipe");
connector.connect(source, sink, pipe).complete();
}
Developer ID: datascienceinc, Project: cascading.csv, Lines of code: 29, Source: CsvSchemeTest.java
Example 15: testWhenExtraColumnsStrict
import cascading.tap.Tap; // import the required package/class
@Test(expected = FlowException.class)
public void testWhenExtraColumnsStrict() throws Exception {
String sourcePath = "src/test/resources/input/with-extra-columns.txt";
String sinkPath = "src/test/resources/input/sink-with-headers";
FlowConnector connector = new Hadoop2MR1FlowConnector();
CSVFormat sourceFormat = CSVFormat.newFormat('\t')
.withHeader("id", "first name", "last name", "city", "zip")
.withQuote('"')
.withEscape('\\')
.withRecordSeparator('\n');
CSVFormat sinkFormat = CSVFormat.newFormat('\t')
.withEscape('\\')
.withRecordSeparator('\n');
Tap source = new Hfs(new CsvScheme(sourceFormat, true), sourcePath);
Tap sink = new Hfs(new CsvScheme(sinkFormat), sinkPath, SinkMode.REPLACE);
Pipe pipe = new Pipe("pipe");
connector.connect(source, sink, pipe).complete();
}
Developer ID: datascienceinc, Project: cascading.csv, Lines of code: 24, Source: CsvSchemeTest.java
Example 16: testWhenExtraColumnsStrictNoHeaders
import cascading.tap.Tap; // import the required package/class
@Test(expected = FlowException.class)
public void testWhenExtraColumnsStrictNoHeaders() throws Exception {
String sourcePath = "src/test/resources/input/with-extra-columns-no-header.txt";
String sinkPath = "src/test/resources/input/sink-no-headers";
FlowConnector connector = new Hadoop2MR1FlowConnector();
CSVFormat sourceFormat = CSVFormat.newFormat('\t')
.withQuote('"')
.withEscape('\\')
.withRecordSeparator('\n');
CSVFormat sinkFormat = CSVFormat.newFormat('\t')
.withEscape('\\')
.withRecordSeparator('\n');
Tap source = new Hfs(new CsvScheme(sourceFormat, true), sourcePath);
Tap sink = new Hfs(new CsvScheme(sinkFormat), sinkPath, SinkMode.REPLACE);
Pipe pipe = new Pipe("pipe");
connector.connect(source, sink, pipe).complete();
}
Developer ID: datascienceinc, Project: cascading.csv, Lines of code: 23, Source: CsvSchemeTest.java
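Examples 15 and 16 pass `true` as the second `CsvScheme` argument and expect a `FlowException` when the input has extra columns, which suggests a strict parsing mode. Strict versus lenient handling of extra values can be sketched as follows. `StrictRowCheck` is hypothetical, the meaning of the boolean flag is inferred from the test names, and this sketch throws `IllegalArgumentException` where the tests observe the failure as a `FlowException`.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical strict/lenient row check; not the cascading.csv implementation.
final class StrictRowCheck {
    private StrictRowCheck() {}

    /**
     * Splits a tab-separated line. In strict mode a line with more values than
     * expectedColumns is rejected; in lenient mode the extra values are dropped.
     */
    static List<String> parse(String line, int expectedColumns, boolean strict) {
        String[] values = line.split("\t", -1); // limit -1 keeps trailing empty columns
        if (strict && values.length > expectedColumns) {
            throw new IllegalArgumentException("Expected at most " + expectedColumns
                    + " columns but found " + values.length + ": " + line);
        }
        return Arrays.asList(values).subList(0, Math.min(values.length, expectedColumns));
    }
}
```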
Example 17: testSchemeFields
import cascading.tap.Tap; // import the required package/class
/**
* Helper method used for assertion of fields generated by CsvScheme.
*/
@SuppressWarnings("unchecked")
private void testSchemeFields(String sourcePath, CsvScheme sourceSchema, String sinkPath, CsvScheme sinkScheme, Set<String> expected) {
Tap source = new Hfs(sourceSchema, sourcePath);
Tap sink = new Hfs(sinkScheme, sinkPath);
Pipe pipe = new Pipe("pipe");
FlowConnector connector = new Hadoop2MR1FlowConnector();
connector.connect(source, sink, pipe).complete();
Fields sinkFields = sink.getSinkFields();
for (int i = 0; i < sinkFields.size(); i++) {
assertTrue("Unexpected column " + sinkFields.get(i), expected.contains(sinkFields.get(i)));
expected.remove(sinkFields.get(i));
}
assertTrue("Not all expected values are found", expected.isEmpty());
}
Developer ID: datascienceinc, Project: cascading.csv, Lines of code: 23, Source: CsvSchemeTest.java
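`testSchemeFields` in Example 17 passes only if the sink fields and the `expected` set match exactly, and it empties `expected` as a side effect. The same comparison can be written as a pure function that collects every mismatch instead of stopping at the first failed assertion. `FieldSetCheck.problems` is a hypothetical helper that uses plain collections rather than Cascading's `Fields`.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical helper; mirrors the loop in testSchemeFields without mutating its input.
final class FieldSetCheck {
    private FieldSetCheck() {}

    /**
     * Each actual field must be expected (and may appear only once), and every
     * expected field must be seen. Returns the problems found; empty means a match.
     */
    static List<String> problems(List<String> actualFields, Set<String> expected) {
        Set<String> remaining = new HashSet<>(expected); // defensive copy
        List<String> problems = new ArrayList<>();
        for (String field : actualFields) {
            if (!remaining.remove(field)) {
                problems.add("Unexpected column " + field); // unknown or duplicated
            }
        }
        for (String missing : remaining) {
            problems.add("Missing column " + missing);
        }
        return problems;
    }
}
```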
Example 18: testPaths
import cascading.tap.Tap; // import the required package/class
/**
* Tests the content of an output path against the given expected path.
*/
@SuppressWarnings("unchecked")
private void testPaths(String actual, String expected) throws Exception {
Tap outputTest = new Hfs(new TextLine(), actual);
Tap expectedTest = new Hfs(new TextLine(), expected);
FlowProcess outputProcess = new HadoopFlowProcess(new JobConf(new Configuration()));
FlowProcess expectedProcess = new HadoopFlowProcess(new JobConf(new Configuration()));
TupleEntryIterator outputIterator = outputTest.openForRead(outputProcess);
TupleEntryIterator expectedIterator = expectedTest.openForRead(expectedProcess);
List<String> outputList = new ArrayList<>();
while (outputIterator.hasNext()) {
outputList.add(outputIterator.next().getTuple().getString(1));
}
List<String> expectedList = new ArrayList<>();
while (expectedIterator.hasNext()) {
expectedList.add(expectedIterator.next().getTuple().getString(1));
}
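// release the underlying readers before asserting
outputIterator.close();
expectedIterator.close();
assertTrue(outputList.equals(expectedList));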
}
Developer ID: datascienceinc, Project: cascading.csv, Lines of code: 29, Source: CsvSchemeTest.java
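Example 18 reads both paths through `TextLine`, whose tuples hold the byte offset at position 0 and the line text at position 1 (hence `getString(1)`), and compares the lines in order. Assuming both paths are single local text files (Hfs output is usually a directory of `part-*` files, which this sketch does not handle), the same check can be written with `java.nio.file`. `LineCompare` is a hypothetical name.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.List;

// Hypothetical local-file equivalent of testPaths; assumes single UTF-8 text files.
final class LineCompare {
    private LineCompare() {}

    /** Returns true when both files contain the same lines in the same order. */
    static boolean sameLines(Path actual, Path expected) throws IOException {
        List<String> actualLines = Files.readAllLines(actual, StandardCharsets.UTF_8);
        List<String> expectedLines = Files.readAllLines(expected, StandardCharsets.UTF_8);
        return actualLines.equals(expectedLines);
    }
}
```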
Example 19: BottomUpNoSplitConsecutiveBoundariesExpressionGraph
import cascading.tap.Tap; // import the required package/class
public BottomUpNoSplitConsecutiveBoundariesExpressionGraph()
{
super( SearchOrder.ReverseTopological );
this.arc(
or(
new FlowElementExpression( Boundary.class, TypeExpression.Topo.LinearOut ),
new FlowElementExpression( Tap.class, TypeExpression.Topo.LinearOut ),
new FlowElementExpression( Group.class, TypeExpression.Topo.LinearOut )
),
PathScopeExpression.ANY,
new BoundariesElementExpression( ElementCapture.Primary )
);
}
Developer ID: dataArtisans, Project: cascading-flink, Lines of code: 17, Source: BottomUpBoundariesNodePartitioner.java
Example 20: SinkTapGraph
import cascading.tap.Tap; // import the required package/class
public SinkTapGraph() {
super(SearchOrder.ReverseTopological);
arc(
not(
OrElementExpression.or(
new FlowElementExpression(Extent.class),
new FlowElementExpression(Boundary.class)
)
),
ScopeExpression.ANY,
new FlowElementExpression(ElementCapture.Primary, Tap.class)
);
}
Developer ID: dataArtisans, Project: cascading-flink, Lines of code: 17, Source: BoundaryBeforeSinkTapTransformer.java
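Examples 19 and 20 build their expression graphs with `SearchOrder.ReverseTopological`, which, as the name suggests, visits flow elements starting from the sinks and moving back toward the sources. As a generic illustration of that ordering only (not Cascading's planner code), the sketch below applies Kahn's algorithm to the reversed edges of a small DAG. `ReverseTopo` and the node names are hypothetical.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Generic reverse topological sort (Kahn's algorithm on reversed edges); not Cascading code.
final class ReverseTopo {
    private ReverseTopo() {}

    /**
     * edges maps each node to its successors (source -> ... -> sink).
     * Returns an order in which every node precedes all of its predecessors,
     * i.e. sinks come first. Throws if the graph contains a cycle.
     */
    static List<String> order(Map<String, List<String>> edges) {
        // Out-degree in the original graph equals in-degree in the reversed graph.
        Map<String, Integer> outDegree = new LinkedHashMap<>();
        Map<String, List<String>> predecessors = new HashMap<>();
        for (Map.Entry<String, List<String>> e : edges.entrySet()) {
            outDegree.merge(e.getKey(), e.getValue().size(), Integer::sum);
            for (String succ : e.getValue()) {
                outDegree.putIfAbsent(succ, 0);
                predecessors.computeIfAbsent(succ, k -> new ArrayList<>()).add(e.getKey());
            }
        }
        Deque<String> ready = new ArrayDeque<>();
        for (Map.Entry<String, Integer> e : outDegree.entrySet()) {
            if (e.getValue() == 0) {
                ready.add(e.getKey()); // sinks have no outgoing edges
            }
        }
        List<String> result = new ArrayList<>();
        while (!ready.isEmpty()) {
            String node = ready.poll();
            result.add(node);
            for (String pred : predecessors.getOrDefault(node, List.of())) {
                // a predecessor becomes ready once all of its successors are emitted
                if (outDegree.merge(pred, -1, Integer::sum) == 0) {
                    ready.add(pred);
                }
            }
        }
        if (result.size() != outDegree.size()) {
            throw new IllegalArgumentException("graph has a cycle");
        }
        return result;
    }
}
```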
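Note: The cascading.tap.Tap examples in this article were collected from source-code and documentation platforms such as GitHub and MSDocs. The snippets come from open-source projects contributed by their developers, and copyright remains with the original authors. Refer to each project's license before distributing or using the code, and do not reproduce this article without permission.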