本文整理汇总了Scala中org.apache.flink.api.scala.ExecutionEnvironment类的典型用法代码示例。如果您正苦于以下问题:Scala ExecutionEnvironment类的具体用法?Scala ExecutionEnvironment怎么用?Scala ExecutionEnvironment使用的例子?那么恭喜您,这里精选的类代码示例或许可以为您提供帮助。
在下文中一共展示了ExecutionEnvironment类的19个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Scala代码示例。
示例1: FunctionalSyntaxOWLExpressionsDataSetBuilder
//设置package包名称以及导入依赖的类
package net.sansa_stack.owl.flink.dataset
import net.sansa_stack.owl.common.parsing.{FunctionalSyntaxExpressionBuilder, FunctionalSyntaxPrefixParsing}
import net.sansa_stack.owl.flink.hadoop.FunctionalSyntaxInputFormat
import org.apache.flink.api.scala.ExecutionEnvironment
import org.apache.hadoop.io.{LongWritable, Text}
/** Builds a data set of cleaned OWL functional-syntax expressions from a file. */
object FunctionalSyntaxOWLExpressionsDataSetBuilder extends FunctionalSyntaxPrefixParsing {

  /**
    * Reads `filePath` through [[FunctionalSyntaxInputFormat]], collects the prefix
    * declarations found in it, and returns every record after it has been passed
    * through a [[FunctionalSyntaxExpressionBuilder]] configured with those prefixes.
    *
    * @param env      Flink execution environment used to read the file
    * @param filePath location of the ontology file
    * @return the cleaned expressions; records for which `clean` yields `null` are dropped
    */
  def build(env: ExecutionEnvironment, filePath: String): OWLExpressionsDataSet = {
    import org.apache.flink.api.scala._

    val records: DataSet[(LongWritable, Text)] = env.readHadoopFile[LongWritable, Text](
      new FunctionalSyntaxInputFormat,
      classOf[LongWritable],
      classOf[Text],
      filePath
    )

    // Only the value part of each Hadoop record is used, as a plain string.
    val expressions = records.map(record => record._2.toString)

    // The prefix declarations are collected eagerly because the expression
    // builder needs the complete prefix map up front.
    val prefixPairs: Seq[(String, String)] =
      expressions
        .filter(line => isPrefixDeclaration(line))
        .map(line => parsePrefix(line))
        .collect()
    val prefixes: Map[String, String] = prefixPairs.toMap

    val expressionBuilder = new FunctionalSyntaxExpressionBuilder(prefixes)
    expressions
      .map(line => expressionBuilder.clean(line))
      .filter(cleaned => cleaned != null)
  }
}
开发者ID:SANSA-Stack,项目名称:SANSA-OWL,代码行数:30,代码来源:FunctionalSyntaxOWLExpressionsDataSetBuilder.scala
示例2: ManchesterSyntaxOWLAxiomsDataSetBuilder
//设置package包名称以及导入依赖的类
package net.sansa_stack.owl.flink.dataset
import com.typesafe.scalalogging.Logger
import net.sansa_stack.owl.common.parsing.ManchesterSyntaxParsing
import org.apache.flink.api.scala.ExecutionEnvironment
import org.semanticweb.owlapi.io.OWLParserException
import org.semanticweb.owlapi.model.{OWLAxiom, OWLRuntimeException}
/** Builds a data set of OWL axioms from a Manchester-syntax ontology file. */
object ManchesterSyntaxOWLAxiomsDataSetBuilder extends ManchesterSyntaxParsing {
  private val logger = Logger(this.getClass)

  /**
    * Builds the expression data set for `filePath` and turns every frame that does
    * not start with "Annotations" into axioms via `makeAxioms`.
    *
    * Frames that fail with an [[OWLParserException]] or [[OWLRuntimeException]] are
    * logged and contribute no axioms instead of failing the job.
    *
    * @param env      Flink execution environment used to read the file
    * @param filePath location of the ontology file
    * @return the data set of axioms, with `null` entries removed
    */
  def build(env: ExecutionEnvironment, filePath: String): OWLAxiomsDataSet = {
    import org.apache.flink.api.scala._

    val (expressionsDataSet, prefixes) =
      ManchesterSyntaxOWLExpressionsDataSetBuilder.buildAndGetPrefixes(env, filePath)

    // Falls back to a dummy URI when the ontology declares no empty prefix.
    val defaultPrefix =
      prefixes.getOrElse(ManchesterSyntaxParsing._empty, ManchesterSyntaxParsing.dummyURI)

    expressionsDataSet
      .filter(frame => !frame.startsWith("Annotations"))
      .flatMap { frame =>
        try makeAxioms(frame, defaultPrefix)
        catch {
          case parserError: OWLParserException =>
            val msg = parserError.getMessage
            logger.warn("Parser error for frame\n" + frame + "\n\n" + msg)
            Set.empty[OWLAxiom]

          case runtimeError: OWLRuntimeException =>
            val msg = runtimeError.getMessage
            logger.warn("Parser error for frame\n" + frame + "\n\n" + msg)
            // Only this branch also dumps the stack trace.
            runtimeError.printStackTrace()
            Set.empty[OWLAxiom]
        }
      }
      .filter(axiom => axiom != null)
  }
}
开发者ID:SANSA-Stack,项目名称:SANSA-OWL,代码行数:43,代码来源:ManchesterSyntaxOWLAxiomsDataSetBuilder.scala
示例3: ManchesterSyntaxOWLExpressionsDataSetBuilder
//设置package包名称以及导入依赖的类
package net.sansa_stack.owl.flink.dataset
import net.sansa_stack.owl.common.parsing.{ManchesterSyntaxExpressionBuilder, ManchesterSyntaxPrefixParsing}
import net.sansa_stack.owl.flink.hadoop.ManchesterSyntaxInputFormat
import org.apache.flink.api.scala.ExecutionEnvironment
import org.apache.hadoop.io.{LongWritable, Text}
/** Builds a data set of cleaned OWL Manchester-syntax expressions from a file. */
object ManchesterSyntaxOWLExpressionsDataSetBuilder extends ManchesterSyntaxPrefixParsing {

  /**
    * Returns only the expression data set produced by [[buildAndGetPrefixes]].
    *
    * @param env      Flink execution environment used to read the file
    * @param filePath location of the ontology file
    */
  def build(env: ExecutionEnvironment, filePath: String): OWLExpressionsDataSet = {
    val (expressions, _) = buildAndGetPrefixes(env, filePath)
    expressions
  }

  /**
    * Reads `filePath` through [[ManchesterSyntaxInputFormat]] and returns both the
    * cleaned expressions and the prefix map collected from the file.
    *
    * @param env      Flink execution environment used to read the file
    * @param filePath location of the ontology file
    * @return the cleaned expressions (without `null` entries) and the prefix map
    */
  private[dataset] def buildAndGetPrefixes(
      env: ExecutionEnvironment,
      filePath: String): (OWLExpressionsDataSet, Map[String, String]) = {
    import org.apache.flink.api.scala._

    val records: DataSet[(LongWritable, Text)] = env.readHadoopFile[LongWritable, Text](
      new ManchesterSyntaxInputFormat,
      classOf[LongWritable],
      classOf[Text],
      filePath
    )

    // Only the value part of each Hadoop record is used, as a plain string.
    val frames = records.map(record => record._2.toString)

    // Collected eagerly: the expression builder needs the full prefix map.
    val prefixPairs: Seq[(String, String)] =
      frames
        .filter(frame => isPrefixDeclaration(frame))
        .map(frame => parsePrefix(frame))
        .collect()
    val prefixes: Map[String, String] = prefixPairs.toMap

    val expressionBuilder = new ManchesterSyntaxExpressionBuilder(prefixes)
    val cleaned = frames
      .map(frame => expressionBuilder.clean(frame))
      .filter(expression => expression != null)

    (cleaned, prefixes)
  }
}
开发者ID:SANSA-Stack,项目名称:SANSA-OWL,代码行数:35,代码来源:ManchesterSyntaxOWLExpressionsDataSetBuilder.scala
示例4: FunctionalSyntaxOWLExpressionsDataSetBuilderTest
//设置package包名称以及导入依赖的类
package net.sansa_stack.owl.flink.dataset
import org.apache.flink.api.scala.ExecutionEnvironment
import org.scalatest.FunSuite
/** Tests for the data set built from `src/test/resources/ont_functional.owl`. */
class FunctionalSyntaxOWLExpressionsDataSetBuilderTest extends FunSuite {
  lazy val env = ExecutionEnvironment.getExecutionEnvironment

  // Cache for the data set under test; filled on first access of `dataSet`.
  var _dataSet: OWLExpressionsDataSet = null

  /** Returns the data set under test, building it on the first call only. */
  def dataSet: OWLExpressionsDataSet = {
    val alreadyBuilt = _dataSet != null
    if (!alreadyBuilt) {
      _dataSet = FunctionalSyntaxOWLExpressionsDataSetBuilder.build(
        env, "src/test/resources/ont_functional.owl")
    }
    _dataSet
  }

  test("There should be three annotation lines with full URIs") {
    val annotationLines: List[String] =
      dataSet.filter(line => line.startsWith("Annotation(")).collect().toList

    // The third annotation spans several lines; its line breaks are part of the value.
    val expected = List(
      "Annotation(<http://ex.com/foo#hasName> \"Name\")",
      "Annotation(<http://ex.com/bar#hasTitle> \"Title\")",
      """Annotation(<http://ex.com/default#description> "A longer
description running over
several lines")""")

    assert(annotationLines.size == 3)
    expected.foreach { annotation =>
      assert(annotationLines.contains(annotation))
    }
  }

  // Disabled test, kept for reference:
  // test("There should be an import statement") {
  // val res = rdd.filter(line => line.startsWith("Import")).collect()
  // assert(res.length == 1)
  // assert(res(0) == "Import(<http://www.example.com/my/2.0>)")
  // }

  test("There should not be any empty lines") {
    val emptyLines = dataSet.filter(line => line.trim.isEmpty)
    assert(emptyLines.count() == 0)
  }

  test("There should not be any comment lines") {
    val commentLines = dataSet.filter(line => line.trim.startsWith("#"))
    assert(commentLines.count() == 0)
  }

  test("There should be a DisjointObjectProperties axiom") {
    val disjointAxioms = dataSet.filter(line => line.trim.startsWith("DisjointObjectProperties"))
    assert(disjointAxioms.count() == 1)
  }

  test("The total number of axioms should be correct") {
    val total = 70 // = 71 - uncommented Import(...)
    assert(dataSet.count() == total)
  }
}
开发者ID:SANSA-Stack,项目名称:SANSA-OWL,代码行数:60,代码来源:FunctionalSyntaxOWLExpressionsDataSetBuilderTest.scala
示例5: BroadcastVariables
//设置package包名称以及导入依赖的类
package com.zjlp.face.flink.examples.batch
import org.apache.flink.api.common.functions.RichMapFunction
import org.apache.flink.api.java.DataSet
import org.apache.flink.api.scala.ExecutionEnvironment
import org.apache.flink.api.scala._
import org.apache.flink.configuration.Configuration
import scala.collection.JavaConversions._
/** Minimal example of passing a broadcast data set to a rich map function. */
object BroadcastVariables {

  /**
    * Broadcasts the data set (1, 2, 3) under the name "myBroadcast", appends the
    * broadcast collection to each element of ("a", "b") and prints the result.
    *
    * @param args command-line arguments (unused)
    */
  def main(args: Array[String]): Unit = {
    // Explicit converters instead of the deprecated implicit JavaConversions.
    import scala.collection.JavaConverters._

    val env = ExecutionEnvironment.getExecutionEnvironment

    // 1. The data set that is made available to every parallel map instance.
    val toBroadcast = env.fromElements(1, 2, 3)
    val data = env.fromElements("a", "b")

    data
      .map(new RichMapFunction[String, String] {
        // Filled in open(); starts empty rather than null.
        private var broadcastSet: Traversable[Int] = Nil

        override def open(parameters: Configuration): Unit = {
          // 3. Read the broadcast data set back as a collection. The element type
          // must match the broadcast DataSet[Int]; requesting String here would only
          // go unnoticed because of type erasure.
          broadcastSet = getRuntimeContext.getBroadcastVariable[Int]("myBroadcast").asScala
        }

        override def map(in: String): String = in + broadcastSet
      })
      .withBroadcastSet(toBroadcast, "myBroadcast") // 2. Register the broadcast data set.
      .print()
  }
}
开发者ID:luciuschina,项目名称:flink-examples,代码行数:32,代码来源:BroadcastVariables.scala
示例6: TripleWriter
//设置package包名称以及导入依赖的类
package net.sansa_stack.examples.flink.rdf
import scala.collection.mutable
import org.apache.flink.api.scala.ExecutionEnvironment
import net.sansa_stack.rdf.flink.data.{RDFGraphLoader, RDFGraphWriter}
/** Example program that loads an RDF graph from a file and writes it back out. */
object TripleWriter {

  /**
    * Expects `<input> <output>`, optionally followed by `--key=value` arguments.
    * No options are supported, so any well-formed option is rejected.
    *
    * @param args command-line arguments
    */
  def main(args: Array[String]): Unit = {
    if (args.length < 2) {
      System.err.println("Usage: Triple writer <input> <output>")
      System.exit(1)
    }
    val input = args(0) // e.g. "src/main/resources/rdf.nt"
    val output = args(1)

    rejectOptions(args.drop(2))

    println("======================================")
    println("| Triple writer example |")
    println("======================================")

    val env = ExecutionEnvironment.getExecutionEnvironment
    val rdfgraph = RDFGraphLoader.loadFromFile(input, env)
    RDFGraphWriter.writeToFile(rdfgraph, output)
    env.execute(s"Triple writer example ($input)")
  }

  /**
    * Parses the remaining arguments as `key=value` pairs (leading dashes ignored)
    * and throws for the first option found, since none are supported.
    *
    * @throws IllegalArgumentException for a malformed argument or any option
    */
  private def rejectOptions(extraArgs: Array[String]): Unit = {
    val parsed = extraArgs.map { raw =>
      raw.dropWhile(_ == '-').split('=') match {
        case Array(key, value) => key -> value
        case _ => throw new IllegalArgumentException("Invalid argument: " + raw)
      }
    }
    val options = mutable.Map(parsed: _*)
    options.keysIterator.foreach { key =>
      throw new IllegalArgumentException("Invalid option: " + key)
    }
  }
}
开发者ID:SANSA-Stack,项目名称:SANSA-Examples,代码行数:39,代码来源:TripleWriter.scala
示例7: TripleOps
//设置package包名称以及导入依赖的类
package net.sansa_stack.examples.flink.rdf
import scala.collection.mutable
import org.apache.flink.api.scala.ExecutionEnvironment
import net.sansa_stack.rdf.flink.data.RDFGraphLoader
import org.apache.flink.api.scala._
/** Example program that runs a few queries and counts over an RDF graph. */
object TripleOps {

  /**
    * Expects `<input>`, optionally followed by `--key=value` arguments. No options
    * are supported, so any well-formed option is rejected.
    *
    * @param args command-line arguments
    * @throws IllegalArgumentException for a malformed argument or any option
    */
  def main(args: Array[String]): Unit = {
    if (args.length < 1) {
      System.err.println(
        "Usage: Triple Ops <input>")
      System.exit(1)
    }
    val input = args(0) // "src/main/resources/rdf.nt"
    val optionsList = args.drop(1).map { arg =>
      arg.dropWhile(_ == '-').split('=') match {
        case Array(opt, v) => (opt -> v)
        case _ => throw new IllegalArgumentException("Invalid argument: " + arg)
      }
    }
    val options = mutable.Map(optionsList: _*)
    options.foreach {
      case (opt, _) => throw new IllegalArgumentException("Invalid option: " + opt)
    }
    println("======================================")
    println("| Triple Ops example |")
    println("======================================")
    val env = ExecutionEnvironment.getExecutionEnvironment
    val rdfgraph = RDFGraphLoader.loadFromFile(input, env)

    // Print a small sample of the loaded triples.
    rdfgraph.triples.collect().take(4).foreach(println(_))

    // Triples filtered by subject.
    // NOTE(review): the label mentions Dickens but the subject queried is
    // Category:Places -- confirm which of the two is intended.
    println("All triples related to Dickens:\n" + rdfgraph.find(Some("http://commons.dbpedia.org/resource/Category:Places"), None, None).collect().mkString("\n"))

    // Triples filtered by predicate ( "http://dbpedia.org/ontology/influenced" )
    println("All triples for predicate influenced:\n" + rdfgraph.find(None, Some("http://dbpedia.org/ontology/influenced"), None).collect().mkString("\n"))

    // Triples filtered by object ( <http://dbpedia.org/resource/Henry_James> )
    println("All triples influenced by Henry_James:\n" + rdfgraph.find(None, None, Some("<http://dbpedia.org/resource/Henry_James>")).collect().mkString("\n"))

    //println("Number of triples: " + rdfgraph.triples.distinct.count())
    println("Number of subjects: " + rdfgraph.getSubjects.map(_.toString).distinct().count())
    println("Number of predicates: " + rdfgraph.getPredicates.map(_.toString).distinct().count())
    // Count distinct objects; this line previously counted predicates a second time.
    println("Number of objects: " + rdfgraph.getObjects.map(_.toString).distinct().count())
  }
}
开发者ID:SANSA-Stack,项目名称:SANSA-Examples,代码行数:50,代码来源:TripleOps.scala
示例8: RDFStats
//设置package包名称以及导入依赖的类
package net.sansa_stack.examples.flink.rdf
import scala.collection.mutable
import java.io.File
import org.apache.flink.api.scala.ExecutionEnvironment
import net.sansa_stack.rdf.flink.data.RDFGraphLoader
import net.sansa_stack.rdf.flink.stats.RDFStatistics
/** Example program that computes statistics for an RDF graph and writes them out. */
object RDFStats {

  /**
    * Expects `<input> <output>`, optionally followed by `--key=value` arguments.
    * No options are supported, so any well-formed option is rejected.
    *
    * @param args command-line arguments
    * @throws IllegalArgumentException for a malformed argument or any option
    */
  def main(args: Array[String]): Unit = {
    if (args.length < 2) {
      System.err.println(
        "Usage: RDF Statistics <input> <output>")
      System.exit(1)
    }
    val input = args(0) //"src/main/resources/rdf.nt"
    val rdf_stats_file = new File(input).getName
    val output = args(1)

    // Skip BOTH positional arguments. Dropping only one passed <output> to the
    // option parser, which rejected it as "Invalid argument" on every normal run.
    val optionsList = args.drop(2).map { arg =>
      arg.dropWhile(_ == '-').split('=') match {
        case Array(opt, v) => (opt -> v)
        case _ => throw new IllegalArgumentException("Invalid argument: " + arg)
      }
    }
    val options = mutable.Map(optionsList: _*)
    options.foreach {
      case (opt, _) => throw new IllegalArgumentException("Invalid option: " + opt)
    }
    println("======================================")
    println("| RDF Statistic example |")
    println("======================================")
    val env = ExecutionEnvironment.getExecutionEnvironment
    val rdfgraph = RDFGraphLoader.loadFromFile(input, env)

    // Compute the statistics and write them to the output location, passing the
    // input file name along to voidify.
    val rdf_statistics = RDFStatistics(rdfgraph, env)
    val stats = rdf_statistics.run()
    rdf_statistics.voidify(stats, rdf_stats_file, output)
  }
}
开发者ID:SANSA-Stack,项目名称:SANSA-Examples,代码行数:46,代码来源:RDFStats.scala
示例9: TripleReader
//设置package包名称以及导入依赖的类
package net.sansa_stack.examples.flink.rdf
import scala.collection.mutable
import org.apache.flink.api.scala.ExecutionEnvironment
import net.sansa_stack.rdf.flink.data.RDFGraphLoader
/** Example program that loads an RDF graph and prints its first ten triples. */
object TripleReader {

  /**
    * Expects `<input>`, optionally followed by `--key=value` arguments. No options
    * are supported, so any well-formed option is rejected.
    *
    * @param args command-line arguments
    * @throws IllegalArgumentException for a malformed argument or any option
    */
  def main(args: Array[String]): Unit = {
    if (args.length < 1) {
      System.err.println("Usage: Triple reader <input>")
      System.exit(1)
    }
    val input = args(0)

    // Everything after <input> must look like key=value (leading dashes ignored).
    val keyValuePairs = args.drop(1).map { rawArg =>
      val parts = rawArg.dropWhile(_ == '-').split('=')
      parts match {
        case Array(key, value) => key -> value
        case _ => throw new IllegalArgumentException("Invalid argument: " + rawArg)
      }
    }
    val options = mutable.Map(keyValuePairs: _*)
    // No option is recognised by this example, so the first one found is an error.
    options.keysIterator.foreach { key =>
      throw new IllegalArgumentException("Invalid option: " + key)
    }

    val banner = Seq(
      "======================================",
      "| Triple reader example |",
      "======================================")
    banner.foreach(line => println(line))

    val env = ExecutionEnvironment.getExecutionEnvironment
    val rdfgraph = RDFGraphLoader.loadFromFile(input, env)
    rdfgraph.triples.first(10).print()
  }
}
开发者ID:SANSA-Stack,项目名称:SANSA-Examples,代码行数:37,代码来源:TripleReader.scala
示例10: RDFByModularityClustering
//设置package包名称以及导入依赖的类
package net.sansa_stack.examples.flink.ml.clustering
import scala.collection.mutable
import net.sansa_stack.ml.flink.clustering.{ RDFByModularityClustering => RDFByModularityClusteringAlg }
import org.apache.flink.api.scala.ExecutionEnvironment
import org.apache.flink.api.scala._
/** Example entry point that runs the modularity-based RDF clustering algorithm. */
object RDFByModularityClustering {

  /**
    * Expects `<input> <output> <numIterations>`, optionally followed by
    * `--key=value` arguments. No options are supported, so any well-formed option
    * is rejected.
    *
    * @param args command-line arguments
    * @throws IllegalArgumentException for a malformed argument or any option
    */
  def main(args: Array[String]): Unit = {
    if (args.length < 3) {
      System.err.println("Usage: RDFByModularityClustering <input> <output> <numIterations>")
      System.exit(1)
    }
    val graphFile = args(0) // e.g. "src/main/resources/Clustering_sampledata.nt"
    val outputFile = args(1) // e.g. "src/main/resources/output"
    val numIterations = args(2).toInt // e.g. 5

    // Everything after the three positional arguments must look like key=value.
    val keyValuePairs = args.drop(3).map { rawArg =>
      rawArg.dropWhile(_ == '-').split('=') match {
        case Array(key, value) => key -> value
        case _ => throw new IllegalArgumentException("Invalid argument: " + rawArg)
      }
    }
    val options = mutable.Map(keyValuePairs: _*)
    // No option is recognised by this example, so the first one found is an error.
    options.keysIterator.foreach { key =>
      throw new IllegalArgumentException("Invalid option: " + key)
    }

    println("============================================")
    println("| RDF By Modularity Clustering example |")
    println("============================================")

    val env = ExecutionEnvironment.getExecutionEnvironment
    RDFByModularityClusteringAlg(env, numIterations, graphFile, outputFile)
  }
}
开发者ID:SANSA-Stack,项目名称:SANSA-Examples,代码行数:38,代码来源:RDFByModularityClustering.scala
示例11: App
//设置package包名称以及导入依赖的类
package net.sansa_stack.ml.flink
import scala.collection.mutable
import org.apache.flink.api.scala.ExecutionEnvironment
import net.sansa_stack.ml.flink.clustering.RDFByModularityClustering
// NOTE(review): the enclosing `object` / `main` header is missing from this excerpt;
// `args` below and the two closing braces after this run belong to it.
// Hard-coded stand-ins for the positional command-line arguments.
val graphFile = "Clustering_sampledata.nt" // args(0)
val outputFile = "output" //args(1)
val numIterations = 5 //args(2).toInt
// Parse everything after the third argument as key=value (leading dashes ignored).
val optionsList = args.drop(3).map { arg =>
arg.dropWhile(_ == '-').split('=') match {
case Array(opt, v) => (opt -> v)
case _ => throw new IllegalArgumentException("Invalid argument: " + arg)
}
}
val options = mutable.Map(optionsList: _*)
// No option is handled here: any parsed option raises an error.
options.foreach {
case (opt, _) => throw new IllegalArgumentException("Invalid option: " + opt)
}
println("============================================")
println("| RDF By Modularity Clustering example |")
println("============================================")
// Run the clustering with the values defined above.
val env = ExecutionEnvironment.getExecutionEnvironment
RDFByModularityClustering(env, numIterations, graphFile, outputFile)
}
}
开发者ID:SANSA-Stack,项目名称:SANSA-ML,代码行数:32,代码来源:App.scala
示例12: RDFGraphTestCase
//设置package包名称以及导入依赖的类
package net.sansa_stack.inference.flink
import java.util
import java.util.Comparator
import com.google.common.collect.ComparisonChain
import net.sansa_stack.inference.flink.data.RDFGraph
import org.apache.flink.api.scala.{ExecutionEnvironment, _}
import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
import org.apache.flink.test.util.{MultipleProgramsTestBase, TestBaseUtils}
import org.junit.Test
import org.junit.runner.RunWith
import org.junit.runners.Parameterized
import net.sansa_stack.inference.data.RDFTriple
import scala.collection.JavaConverters._
/** Parameterized test case for set operations on [[RDFGraph]]. */
@RunWith(classOf[Parameterized])
class RDFGraphTestCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mode) {

  /** Subtracting a two-triple graph from a three-triple superset leaves one triple. */
  @Test
  def testSubtract(): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment

    // Input graphs: the second is a strict subset of the first.
    val fullGraph = RDFGraph(env.fromCollection(Seq(
      RDFTriple("s1", "p1", "o1"),
      RDFTriple("s1", "p1", "o2"),
      RDFTriple("s1", "p1", "o3")
    )))
    val subGraph = RDFGraph(env.fromCollection(Seq(
      RDFTriple("s1", "p1", "o1"),
      RDFTriple("s1", "p1", "o2")
    )))

    // Compute the difference and materialise it.
    val actual = fullGraph.subtract(subGraph).triples.collect()
    val expected = Seq(RDFTriple("s1", "p1", "o3"))

    // Orders triples by subject, then predicate, then object.
    val tripleOrder = new Comparator[RDFTriple] {
      override def compare(left: RDFTriple, right: RDFTriple): Int =
        ComparisonChain.start()
          .compare(left.s, right.s)
          .compare(left.p, right.p)
          .compare(left.o, right.o)
          .result()
    }

    TestBaseUtils.compareResultCollections(
      new util.ArrayList[RDFTriple](actual.asJava),
      new util.ArrayList[RDFTriple](expected.asJava),
      tripleOrder)
  }
}
开发者ID:SANSA-Stack,项目名称:SANSA-Inference,代码行数:61,代码来源:RDFGraphTestCase.scala
示例13: reasoner
//设置package包名称以及导入依赖的类
package net.sansa_stack.inference.flink.conformance
import net.sansa_stack.inference.flink.forwardchaining.{ForwardRuleReasoner, ForwardRuleReasonerRDFS}
import net.sansa_stack.inference.rules.RDFSLevel
import org.apache.flink.api.scala.ExecutionEnvironment
import org.junit.runner.RunWith
import org.junit.runners.Parameterized
import org.scalatest.{BeforeAndAfterAll, Suite}
/**
  * Suite mix-in that creates an execution environment and an RDFS forward-rule
  * reasoner once, before the tests of the suite run.
  */
@RunWith(classOf[Parameterized])
trait SharedRDFSReasonerContext extends BeforeAndAfterAll with ReasonerContextProvider {
  self: Suite =>

  @transient private var _env: ExecutionEnvironment = _
  @transient private var _reasoner: ForwardRuleReasonerRDFS = _

  /** The shared execution environment; unset until `beforeAll` has run. */
  def env: ExecutionEnvironment = _env

  /** The shared reasoner; unset until `beforeAll` has run. */
  def reasoner: ForwardRuleReasoner = _reasoner

  override def beforeAll(): Unit = {
    super.beforeAll()

    val environment = ExecutionEnvironment.getExecutionEnvironment
    _env = environment
    environment.getConfig.disableSysoutLogging()

    val rdfsReasoner = new ForwardRuleReasonerRDFS(env)
    _reasoner = rdfsReasoner
    rdfsReasoner.level = RDFSLevel.SIMPLE
  }
}
开发者ID:SANSA-Stack,项目名称:SANSA-Inference,代码行数:30,代码来源:SharedRDFSReasonerContext.scala
示例14: env
//设置package包名称以及导入依赖的类
package net.sansa_stack.inference.flink.conformance
import net.sansa_stack.inference.flink.forwardchaining.ForwardRuleReasoner
import org.apache.flink.api.scala.ExecutionEnvironment
import org.junit.runner.RunWith
import org.junit.runners.Parameterized
import net.sansa_stack.inference.flink.forwardchaining.ForwardRuleReasonerOWLHorst
import org.scalatest.{BeforeAndAfterAll, Suite}
/**
  * Suite mix-in that creates an execution environment and an OWL Horst forward-rule
  * reasoner once, before the tests of the suite run.
  */
@RunWith(classOf[Parameterized])
trait SharedOWLHorstReasonerContext extends BeforeAndAfterAll with ReasonerContextProvider {
  self: Suite =>

  @transient private var _reasoner: ForwardRuleReasonerOWLHorst = _

  /**
    * The shared reasoner; unset until `beforeAll` has run.
    *
    * This must be a `def`: a `val` would capture `_reasoner` while the trait is
    * being initialised, i.e. before `beforeAll` assigns it, and stay `null`.
    */
  def reasoner: ForwardRuleReasoner = _reasoner

  @transient private var _env: ExecutionEnvironment = _

  /** The shared execution environment; unset until `beforeAll` has run. */
  def env: ExecutionEnvironment = _env

  override def beforeAll(): Unit = {
    super.beforeAll()
    _env = ExecutionEnvironment.getExecutionEnvironment
    _reasoner = new ForwardRuleReasonerOWLHorst(env)
  }
}
开发者ID:SANSA-Stack,项目名称:SANSA-Inference,代码行数:29,代码来源:SharedOWLHorstReasonerContext.scala
示例15: TripleReader
//设置package包名称以及导入依赖的类
package net.sansa_stack.entityrank.flink.io
import java.io.InputStream
import org.apache.flink.api.scala.ExecutionEnvironment
import org.apache.flink.api.scala.DataSet
import org.apache.flink.api.scala._
import net.sansa_stack.entityrank.flink.utils.Logging
import net.sansa_stack.entityrank.flink.model.Triples
import org.openjena.riot.RiotReader
import org.openjena.riot.Lang
/** Loads triples from text files into Flink data sets. */
object TripleReader extends Logging {

  /**
    * Parses the first triple found in the given N-Triples text.
    *
    * @param fn N-Triples text to parse (despite the name, the content itself is
    *           parsed, not a file with that name)
    * @return a [[Triples]] built from the subject, predicate and object of that triple
    */
  def parseTriples(fn: String) = {
    val triples = RiotReader.createIteratorTriples(new StringInputStream(fn), Lang.NTRIPLES, "http://example/base").next
    Triples(triples.getSubject(), triples.getPredicate(), triples.getObject())
  }

  /**
    * Reads `path` as text, skips blank lines and lines starting with '#', and parses
    * each remaining line into a [[Triples]] value.
    *
    * @param path file to read
    * @param env  Flink execution environment used to read the file
    */
  def loadFromFile(path: String, env: ExecutionEnvironment): DataSet[Triples] = {
    env.readTextFile(path)
      .filter(line => !line.trim().isEmpty && !line.startsWith("#"))
      // Parse the line itself; the file path was previously parsed for every record.
      .map(line => parseTriples(line))
  }

  /**
    * Reads `path` as text, skips blank lines and lines starting with '#', strips all
    * '<' and '>' characters and takes the first three whitespace-separated tokens of
    * each line as (subject, predicate, object).
    *
    * The number of loaded triples is logged, which calls `count()` on the result.
    *
    * @param path file to read
    * @param env  Flink execution environment used to read the file
    */
  def loadSFromFile(path: String, env: ExecutionEnvironment): DataSet[(String, String, String)] = {
    logger.info("loading triples from disk...")
    val startTime = System.currentTimeMillis()

    val lines =
      env.readTextFile(path)
        .filter(line => !line.trim().isEmpty && !line.startsWith("#"))

    val triples = lines
      .map(line => line.replace(">", "").replace("<", "").split("\\s+")) // line to tokens
      .map(tokens => (tokens(0), tokens(1), tokens(2))) // tokens to triple

    logger.info("finished loading " + triples.count() + " triples in " + (System.currentTimeMillis() - startTime) + "ms.")
    triples
  }
}
/**
  * An [[java.io.InputStream]] over the UTF-8 encoding of a string.
  *
  * @param s the text to expose as a byte stream
  */
class StringInputStream(s: String) extends InputStream {
  // Fixed charset so the bytes do not depend on the platform default encoding.
  private val bytes = s.getBytes(java.nio.charset.StandardCharsets.UTF_8)
  private var pos = 0

  /**
    * Returns the next byte as a value in 0..255, or -1 once all bytes were read.
    *
    * The byte is masked with 0xFF: a plain `toInt` sign-extends, so bytes >= 0x80
    * would come back negative and 0xFF would look like end-of-stream.
    */
  override def read(): Int =
    if (pos >= bytes.length) {
      -1
    } else {
      val unsigned = bytes(pos) & 0xFF
      pos += 1
      unsigned
    }
}
开发者ID:SANSA-Stack,项目名称:SANSA-RDF-EntityRank,代码行数:56,代码来源:TripleReader.scala
示例16: Palindrome
//设置package包名称以及导入依赖的类
package de.tu_berlin.dima.bdapro.flink.palindrome.plehmann93
import org.apache.flink.api.common.operators.Order
import org.apache.flink.api.scala.ExecutionEnvironment
import org.apache.flink.api.scala._
import org.apache.flink.util.Collector
import scala.math.Ordering.String
import scala.util.control.Breaks
/** Prints the longest palindrome sentence(s) of a text file, one sentence per line. */
object Palindrome {

  /**
    * Expects exactly one argument, the input path. A line is a palindrome when it
    * reads the same in both directions after lower-casing and removing spaces; every
    * palindrome of maximal length (spaces not counted) is printed.
    *
    * @param args command-line arguments
    */
  def main(args: Array[String]): Unit = {
    if (args.length != 1) {
      Console.err.println("Usage: <jar> inputPath")
      System.exit(-1)
    }
    val inputPath = args(0)
    val env = ExecutionEnvironment.getExecutionEnvironment

    val result = env.readTextFile(inputPath)
      // readTextFile reads the file line-wise, so no further splitting is needed.
      // The former split("/n") was a typo for a newline split and would have cut
      // sentences at a literal "/n".
      .map(_.toLowerCase)
      .filter { sentence =>
        val compact = sentence.replace(" ", "")
        compact == compact.reverse
      }
      // Constant key 1 puts every palindrome into a single group.
      .map(sentence => (1, sentence.replace(" ", "").size, sentence))
      .groupBy(0).sortGroup(1, Order.DESCENDING)
      .reduceGroup { (in, out: Collector[(Int, Int, String)]) =>
        // The group is sorted by length, longest first: emit the leading run of
        // entries that are as long as the first one.
        val sorted = in.buffered
        if (sorted.hasNext) {
          val longest = sorted.head._2
          sorted.takeWhile(_._2 >= longest).foreach(entry => out.collect(entry))
        }
      }

    result.collect().foreach(x => Console.println("The biggest palindrome sentence: <" + x._3 + ">"))
    //env.execute("bdapro-ws1617-flink")
  }
}
开发者ID:cristiprg,项目名称:BDAPRO.GlobalStateML,代码行数:41,代码来源:Palindrome.scala
示例17: OddSemordnilaps
//设置package包名称以及导入依赖的类
package de.tu_berlin.dima.bdapro.flink.oddsemordnilaps.royd1990
import org.apache.flink.api.scala._
import org.apache.flink.api.scala.ExecutionEnvironment
/** Counts the distinct odd numbers of a file whose digit-reversal is also in the file. */
object OddSemordnilaps {

  /**
    * Expects exactly one argument, the input path. The file is split into tokens on
    * non-word characters and each token is parsed as an integer; a non-numeric token
    * still fails with a NumberFormatException.
    *
    * @param args command-line arguments
    */
  def main(args: Array[String]): Unit = {
    if (args.length != 1) {
      // Only an input path is accepted; the message used to advertise an outputPath too.
      Console.err.println("Usage: <jar> inputPath")
      System.exit(-1)
    }
    val inputPath = args(0)
    val env = ExecutionEnvironment.getExecutionEnvironment

    val count1 = env.readTextFile(inputPath)
      .flatMap(_.split("\\W+"))
      // Blank lines and leading separators produce empty tokens, which toInt rejects.
      .filter(_.nonEmpty)
      .map(_.toInt)
      .distinct()
      .filter(_ % 2 == 1)

    // The same numbers with their decimal digits reversed.
    val count2 = count1.map(number => number.toString.reverse).map(_.toInt)

    // A value occurring twice in the union is an odd number whose reversal is also
    // one of the odd numbers (this includes numbers that equal their own reversal).
    val count3 = count2.union(count1).map { (_, 1) }.groupBy(0).sum(1).filter(pair => pair._2 == 2)

    print(count3.count)
  }
}
开发者ID:cristiprg,项目名称:BDAPRO.GlobalStateML,代码行数:19,代码来源:OddSemordnilaps.scala
示例18: PassingParameters2Functions1
//设置package包名称以及导入依赖的类
package com.daxin.batch
import org.apache.flink.api.common.functions.FilterFunction
import org.apache.flink.api.scala.ExecutionEnvironment
import org.apache.flink.api.scala._
/** Example of passing a parameter to a user function through its constructor. */
object PassingParameters2Functions1 {

  /**
    * Filters the data set (1, 2, 3) with a filter function constructed with the
    * limit 2 and prints the elements that pass.
    *
    * @param args command-line arguments (unused)
    */
  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment
    val toFilter = env.fromElements(1, 2, 3)

    // Keeps only values strictly greater than the limit given at construction.
    class MyFilter(limit: Int) extends FilterFunction[Int] {
      override def filter(value: Int): Boolean = value > limit
    }

    toFilter
      .filter(new MyFilter(2))
      .print()
  }
}
开发者ID:Dax1n,项目名称:flinkdevelop,代码行数:24,代码来源:PassingParameters2Functions1.scala
示例19: SinkProc
//设置package包名称以及导入依赖的类
package elastic
import java.net.{InetAddress, InetSocketAddress}
import org.apache.flink.api.scala.ExecutionEnvironment
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.connectors.rabbitmq.RMQSource
import org.apache.flink.streaming.connectors.rabbitmq.common.RMQConnectionConfig
import org.apache.flink.streaming.util.serialization.SimpleStringSchema
import java.util
import scala.collection.mutable
/**
* Created by I311352 on 3/30/2017.
*/
/**
  * Sets up a streaming source that consumes strings from a RabbitMQ queue and
  * prepares cluster settings and transport addresses for a sink that is not
  * attached yet.
  */
object SinkProc extends App {
  // Provides the implicit TypeInformation that the Scala `addSource` requires.
  import org.apache.flink.streaming.api.scala._

  val env = StreamExecutionEnvironment.getExecutionEnvironment
  // checkpointing is required for exactly-once or at-least-once guarantees
  //env.enableCheckpointing(...)

  // NOTE(review): host and credentials are hard-coded; consider moving them to configuration.
  val connectionConfig = new RMQConnectionConfig.Builder()
    .setHost("10.128.165.206")
    .setPort(5672)
    .setUserName("guest")
    .setPassword("guest")
    .build()

  val stream = env
    .addSource(new RMQSource[String](
      connectionConfig, // config for the RabbitMQ connection
      "ElasticRabbit", // name of the RabbitMQ queue to consume
      true, // use correlation ids; can be false if only at-least-once is required
      new SimpleStringSchema)) // deserialization schema to turn messages into Java objects
    .setParallelism(1) // non-parallel source is only required for exactly-once

  val config = new util.HashMap[String, String]
  config.put("cluster.name", "SAP-AnyWhere-ES-EShop")

  // The list holds socket addresses; declaring it as a list of String did not type-check.
  val transports = new util.ArrayList[InetSocketAddress]
  transports.add(new InetSocketAddress(InetAddress.getByName("127.0.0.1"), 9300))
  transports.add(new InetSocketAddress(InetAddress.getByName("10.2.3.1"), 9300))
  //stream.addSink()
}
开发者ID:compasses,项目名称:elastic-spark,代码行数:46,代码来源:SinkProc.scala
注:本文中的org.apache.flink.api.scala.ExecutionEnvironment类示例整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。
请发表评论