本文整理汇总了Scala中org.apache.flink.util.Collector类的典型用法代码示例。如果您正苦于以下问题:Scala Collector类的具体用法?Scala Collector怎么用?Scala Collector使用的例子?那么恭喜您, 这里精选的类代码示例或许可以为您提供帮助。
在下文中一共展示了Collector类的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Scala代码示例。
示例1: JsonTweetParser
//设置package包名称以及导入依赖的类
package flink.parsers
import domain.Tweet
import org.apache.flink.streaming.connectors.json.JSONParseFlatMap
import org.apache.flink.util.Collector
import scala.util.Try
class JsonTweetParser extends JSONParseFlatMap[String, Tweet] {

  /** Reads one JSON field from the raw tweet, falling back to "" when the path is absent or unparsable. */
  private def field(json: String, path: String): String =
    Try(getString(json, path)).getOrElse("")

  /** Parses a raw JSON tweet string and emits a best-effort [[Tweet]]; missing fields become empty strings. */
  override def flatMap(value: String, out: Collector[Tweet]): Unit = {
    val tweet = Tweet(
      field(value, "created_at"),
      field(value, "text"),
      field(value, "user.name"),
      field(value, "user.lang"),
      field(value, "place.name"),
      field(value, "place.country")
    )
    out.collect(tweet)
  }
}
开发者ID:tquiviger,项目名称:twitter-flink,代码行数:24,代码来源:JsonTweetParser.scala
示例2: subtract
//设置package包名称以及导入依赖的类
package net.sansa_stack.inference.flink.utils
import java.lang.Iterable
import org.apache.flink.api.common.functions.CoGroupFunction
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.scala.DataSet
import org.apache.flink.util.Collector
import scala.reflect.ClassTag
def subtract(other: DataSet[T]): DataSet[T] = {
dataset.coGroup(other).where("*").equalTo("*")(new MinusCoGroupFunction[T](true)).name("subtract")
}
}
}
class MinusCoGroupFunction[T: ClassTag: TypeInformation](all: Boolean) extends CoGroupFunction[T, T, T] {

  /**
   * Emits the elements of `first` that are not cancelled out by `second`.
   *
   * With `all = true` (multiset semantics) one left element is consumed per
   * right element and the remaining left elements are emitted; otherwise
   * (distinct semantics) a single representative is emitted only when the
   * right side is empty.
   */
  override def coGroup(first: Iterable[T], second: Iterable[T], out: Collector[T]): Unit = {
    if (first != null && second != null) {
      val left = first.iterator
      val right = second.iterator
      if (all) {
        // Pair off one left element per right element.
        while (left.hasNext && right.hasNext) {
          left.next()
          right.next()
        }
        // Emit the surplus left elements.
        while (left.hasNext) {
          out.collect(left.next())
        }
      } else if (left.hasNext && !right.hasNext) {
        out.collect(left.next())
      }
    }
  }
}
开发者ID:SANSA-Stack,项目名称:SANSA-Inference,代码行数:42,代码来源:DataSetUtils.scala
示例3: Flink
//设置package包名称以及导入依赖的类
package uk.co.bitcat.streaming.flink
import java.util.Properties
import org.apache.flink.api.scala._
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09
import org.apache.flink.util.Collector
import uk.co.bitcat.streaming.flink.domain.{Measurement, MeasurementSchema}
import uk.co.bitcat.streaming.flink.watermark.TwoSecondDelayWatermark
// Streaming job: reads pollution Measurements from the Kafka topic "pollution",
// averages them over 10-second event-time windows and prints every window whose
// mean pollution level exceeds 75.0.
object Flink {
def main(args: Array[String]) {
val env = StreamExecutionEnvironment.getExecutionEnvironment
// Event time: window assignment is driven by the watermark assigner below.
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
// Kafka consumer configuration (local broker, dedicated consumer group).
val properties = new Properties()
properties.setProperty("bootstrap.servers", "localhost:9092")
properties.setProperty("group.id", "flink_consumer")
env
.addSource(new FlinkKafkaConsumer09[Measurement]("pollution", new MeasurementSchema(), properties))
// Watermark assigner; presumably tolerates events up to two seconds late — confirm in TwoSecondDelayWatermark.
.assignTimestampsAndWatermarks(new TwoSecondDelayWatermark())
.timeWindowAll(Time.seconds(10))
.apply(
(0L, 0.0, 0), // (Window End Time, To Store Mean, Count)
// Incremental fold: accumulate the pollution sum (slot 2) and element count (slot 3);
// the window-end slot stays 0 and is filled by the window function below.
(acc: (Long, Double, Int), m: Measurement) => { (0L, acc._2 + m.pollution, acc._3 + 1) },
// Window function: receives the single folded accumulator per window and
// emits (window end timestamp, mean pollution, count).
( window: TimeWindow,
accs: Iterable[(Long, Double, Int)],
out: Collector[(Long, Double, Int)] ) =>
{
val acc = accs.iterator.next()
out.collect((window.getEnd, acc._2/acc._3, acc._3))
}
)
// Alert threshold on the mean pollution value.
.filter(_._2 > 75.0)
.print() // Replace with call to custom sink to raise alert for pollution level
env.execute()
}
}
开发者ID:dscook,项目名称:streaming-examples,代码行数:48,代码来源:Flink.scala
示例4: Palindrome
//设置package包名称以及导入依赖的类
package de.tu_berlin.dima.bdapro.flink.palindrome.wladox
import org.apache.flink.api.scala._
import org.apache.flink.util.Collector
object Palindrome {

  /**
   * Reads sentences from the given input file, keeps the palindromes and
   * prints every sentence tied for the maximum (space-stripped) length.
   *
   * Usage: <jar> inputPath
   *
   * Fixes: removed the unused local `REGEX` that shadowed the one in
   * `isPalindrome`, avoided the duplicated `replaceAll` there, and guarded
   * `maxBy` against an empty group.
   */
  def main(args: Array[String]) {
    if (args.length != 1) {
      Console.err.println("Usage: <jar> inputPath")
      System.exit(-1)
    }
    val inputPath = args(0)
    val myFilter = (s: String) => isPalindrome(s)
    val env = ExecutionEnvironment.getExecutionEnvironment
    val text = env.readTextFile(inputPath)
    val result = text
      //.flatMap(m => m.split("[\\r?\\n]"))
      .filter(myFilter)
      // NOTE(review): length here only strips spaces, while isPalindrome strips
      // all non-word characters — the two measures can disagree for punctuated
      // input; confirm this is intended.
      .map(s => MyTuple(s, s.replaceAll(" ", "").length))
      .reduceGroup {
        (in, out: Collector[MyTuple]) =>
          val list = in.toList
          if (list.nonEmpty) {
            val max = list.maxBy(_.length)
            // Emit every sentence tied for the maximum length.
            list foreach (e => if (e.length == max.length) out.collect(e))
          }
      }
    result.print()
  }

  /** True when non-empty `s`, stripped of non-word characters, reads the same reversed. */
  def isPalindrome(s: String): Boolean = {
    val REGEX = "[_[^\\w\\d]]"
    val cleaned = s.replaceAll(REGEX, "")
    s.nonEmpty && cleaned == cleaned.reverse
  }

  case class MyTuple(text: String, length: Int) {
    override def toString: String = s"The biggest palindrome sentence: <$text>"
  }
}
开发者ID:cristiprg,项目名称:BDAPRO.GlobalStateML,代码行数:49,代码来源:Palindrome.scala
示例5: Palindrome
//设置package包名称以及导入依赖的类
package de.tu_berlin.dima.bdapro.flink.palindrome.omarflores16
import org.apache.flink.api.scala._
import org.apache.flink.util.Collector
import scala.collection.mutable.Queue
object Palindrome {

  /**
   * Finds the longest palindrome sentence(s) in the input file and writes a
   * report line per match to the "Output" path.
   *
   * Usage: <jar> inputPath
   */
  def main(args: Array[String]) {
    if (args.length != 1) {
      Console.err.println("Usage: <jar> inputPath outputPath")
      System.exit(-1)
    }
    var maxStringPalindrome = 0
    val inputPath = args(0)
    val env = ExecutionEnvironment.getExecutionEnvironment
    // (length, sentence, isPalindrome) per non-empty lower-cased input line.
    val lineText = env.readTextFile(inputPath)
      .flatMap( _.toLowerCase.trim.split("\\n") filter { _.nonEmpty } )
      .map{ strPalindrome => (strPalindrome.size, strPalindrome, palindromeFunc(strPalindrome)) }
    // Materialise the maximum line length (triggers an extra job execution).
    // NOTE(review): palindromes are then required to be >= the longest line of
    // ANY kind, so nothing is emitted unless the longest line is a palindrome — confirm intent.
    for (t <- lineText.max(0).collect())
      maxStringPalindrome = t._1
    val finalTuple = lineText
      .reduceGroup{
        (in, out: Collector[(String)]) =>
          for (t <- in) {
            if (t._3 && t._1 >= maxStringPalindrome)
              out.collect("The biggest palindrome sentence: " + t._2)
          }
      }
    finalTuple.writeAsText("Output")
    env.execute("bdapro-ws1617-flink")
  }

  /**
   * True when `phrase` reads the same forwards and backwards, ignoring spaces
   * and letter case.
   *
   * Fix: the previous queue-based implementation reversed the whole queue
   * twice per compared pair (accidental O(n^2)) and relied on `var`/`return`;
   * this linear scan is behaviourally identical.
   */
  def palindromeFunc(phrase: String): Boolean = {
    val cleaned = phrase.filterNot(_.isSpaceChar).map(_.toLower)
    cleaned == cleaned.reverse
  }
}
开发者ID:cristiprg,项目名称:BDAPRO.GlobalStateML,代码行数:60,代码来源:Palindrome.scala
示例6: Palindrome
//设置package包名称以及导入依赖的类
package de.tu_berlin.dima.bdapro.flink.palindrome.plehmann93
import org.apache.flink.api.common.operators.Order
import org.apache.flink.api.scala.ExecutionEnvironment
import org.apache.flink.api.scala._
import org.apache.flink.util.Collector
import scala.math.Ordering.String
import scala.util.control.Breaks
object Palindrome {

  /**
   * Prints every longest (space-stripped) palindrome line of the input file.
   *
   * Usage: <jar> inputPath
   *
   * Fix: the original split on the literal two-character string "/n" — a typo
   * for the newline "\n". readTextFile already yields one record per line, so
   * the corrected split is a defensive no-op for well-formed input, but no
   * longer mangles lines that happen to contain a slash-n sequence.
   */
  def main(args: Array[String]) {
    if (args.length != 1) {
      Console.err.println("Usage: <jar> inputPath")
      System.exit(-1)
    }
    val inputPath = args(0)
    val env = ExecutionEnvironment.getExecutionEnvironment
    val result = env.readTextFile(inputPath)
      .flatMap(_.toLowerCase.split("\n"))
      // Keep palindromes (spaces ignored).
      .filter(x => x.replace(" ", "") == x.replace(" ", "").reverse)
      // (constant group key, palindrome length, sentence).
      .map( x => (1, x.replace(" ", "").size, x) )
      // One group, sorted longest-first, so the reducer sees the maxima first.
      .groupBy(0).sortGroup(1, Order.DESCENDING)
      .reduceGroup{ (in, out: Collector[(Int, Int, String)]) =>
        var max: Int = -1
        // Emit all entries tied for the maximum length, then stop.
        Breaks.breakable{ for (x <- in) {
          if (max == (-1)) { max = x._2 }
          if (max > x._2) { Breaks.break() }
          out.collect(x)
        }}
      }
    result.collect().foreach(x => Console.println("The biggest palindrome sentence: <" + x._3 + ">"))
    //env.execute("bdapro-ws1617-flink")
  }
}
开发者ID:cristiprg,项目名称:BDAPRO.GlobalStateML,代码行数:41,代码来源:Palindrome.scala
示例7: Palindrome
//设置package包名称以及导入依赖的类
package de.tu_berlin.dima.bdapro.flink.palindrome.mschwarzer
import org.apache.flink.api.common.functions.RichFilterFunction
import org.apache.flink.api.scala._
import org.apache.flink.configuration.Configuration
import org.apache.flink.util.Collector
// Finds the longest palindrome sentence(s) in a text file. A line counts as a
// palindrome when its alphanumeric characters read the same reversed.
object Palindrome {
def main(args: Array[String]) {
if (args.length != 1) {
Console.err.println("Usage: <jar> inputPath")
System.exit(-1)
}
val inputPath = args(0) // Input will contain only lower-case English letters, spaces, newlines and numeric chars
val env = ExecutionEnvironment.getExecutionEnvironment
// Read sentences from text file
// Emits (original line, length of its alphanumeric-only form) for palindromes only.
val pals = env.readTextFile(inputPath)
.flatMap{
(in, out: Collector[(String, Int)]) =>
val p = in.replaceAll("[^A-Za-z0-9]", "")
if (p.equals(p.reverse)) { // check for valid palindrome
out.collect(in, p.length())
}
}
// Save max length in config
// NOTE(review): max(1).collect() runs the pipeline once just to find the
// maximum length; the filter below runs it again — confirm the double
// execution is acceptable.
val config = new Configuration()
config.setInteger("maxLength", pals.max(1).collect().last._2)
// Filter by max length
// The max length travels to the filter operator via withParameters/open.
val maxPals = pals
.filter(new RichFilterFunction[(String, Int)]() {
var maxLength = 0
override def open(config: Configuration): Unit = {
maxLength = config.getInteger("maxLength", 0)
}
def filter(in: (String, Int)): Boolean = {
in._2 == maxLength
}
})
.withParameters(config)
.collect()
// Print all left-over sentences
maxPals.foreach { e =>
val (sentence, len) = e
println("The biggest palindrome sentence: <" + sentence + ">")
}
}
}
开发者ID:cristiprg,项目名称:BDAPRO.GlobalStateML,代码行数:59,代码来源:Palindrome.scala
示例8: OddSemordnilaps
//设置package包名称以及导入依赖的类
package de.tu_berlin.dima.bdapro.flink.oddsemordnilaps.mmziyad
import org.apache.flink.api.scala._
import org.apache.flink.util.Collector
// Counts odd "semordnilaps": odd numbers that are palindromic themselves or
// whose digit-reversal also occurs among the input numbers.
object OddSemordnilaps {
def main(args: Array[String]) {
if (args.length != 1) {
Console.err.println("Usage: <jar> inputPath ")
System.exit(-1)
}
val inputPath = args(0)
val env = ExecutionEnvironment.getExecutionEnvironment
// Tokenise on whitespace, deduplicate within each line, keep odd numbers only.
// NOTE(review): toInt assumes every token is numeric — confirm input contract.
val data = env.readTextFile(inputPath)
.flatMap(_.trim.split("\\s").distinct.filter(_.toInt % 2 != 0))
.map(d => (d, d.length))
// Group by digit count: a number and its reversal always share a length.
.groupBy(_._2)
.reduceGroup {
(in, out: Collector[(String, Int)]) => {
val li = in.toSet
for (e <- li) {
val x = e
val y = (e._1.reverse, e._2)
// Emit when the number is its own reverse or its reverse is in the group.
if (((x._1.reverse == x._1)) || li.contains(y))
out.collect(x._1, 1)
}
}
}.count()
println("The result is " + data)
//env.execute("bdapro-ws1617-flink")
}
}
开发者ID:cristiprg,项目名称:BDAPRO.GlobalStateML,代码行数:41,代码来源:OddSemordnilaps.scala
示例9: OddSemordnilaps
//设置package包名称以及导入依赖的类
package de.tu_berlin.dima.bdapro.flink.oddsemordnilaps.omarflores16
import org.apache.flink.api.scala._
import org.apache.flink.util.Collector
// Counts distinct tokens whose reversed numeric value is odd and prints a
// single summary line with the total.
object OddSemordnilaps {
def main(args: Array[String]){
if (args.length != 1) {
Console.err.println("Usage: <jar> inputPath outputPath")
System.exit(-1)
}
val inputPath = args(0)
val env = ExecutionEnvironment.getExecutionEnvironment
val outText = env.readTextFile(inputPath)
// Tokenise on non-word characters and drop empty tokens.
.flatMap( _.toLowerCase.trim.split("\\W+") filter { _.nonEmpty } )
// (reversed token, 1, parity of reversed value: 1 = odd).
// NOTE(review): toInt assumes numeric tokens — confirm input contract.
.map{ strNumber => (strNumber.reverse,1,(strNumber.reverse.toInt%2))}
.groupBy(0)
// Deduplicates by reversed token; sum(1) aggregates the count field. The
// parity field survives from an arbitrary group member, which is safe here
// because all members of a group are identical tokens.
.sum(1)
.reduceGroup{
(in, out: Collector[(String)]) =>
// Count the distinct odd entries and emit one summary string.
var i = 0
for (t <- in) {
if (t._3 == 1) i = i +1
}
out.collect("The result is " + i)
}
outText.print()
}
}
开发者ID:cristiprg,项目名称:BDAPRO.GlobalStateML,代码行数:34,代码来源:OddSemordnilaps.scala
示例10: OddSemordnilaps
//设置package包名称以及导入依赖的类
package de.tu_berlin.dima.bdapro.flink.oddsemordnilaps.plehmann93
import org.apache.flink.api.scala._
import org.apache.flink.util.Collector
import scala.util.control.Breaks
// Counts odd semordnilap hits: odd numbers that are palindromes or occur in the
// input together with their digit-reversal. The final count is the number of
// collected values.
object OddSemordnilaps {
def main(args: Array[String]) {
if (args.length != 1) {
Console.err.println("Usage: <jar> inputPath")
System.exit(-1)
}
val inputPath = args(0)
val env = ExecutionEnvironment.getExecutionEnvironment
val result=env.readTextFile(inputPath)
.flatMap(_.toLowerCase.split("\\W+"))
// Keep odd numbers only. NOTE(review): toInt assumes numeric tokens.
.filter(x => x.toInt%2==1)
// Canonical key: the smaller of (x, x.reverse), so a number and its
// reversal land in the same group.
.map{x=>if (x<x.reverse) (x,x) else (x.reverse,x) }
.groupBy(0)
.reduceGroup { (in, out: Collector[(String)]) =>
// Per group: emit a palindrome once, or both members of a reversal
// pair; stop scanning the group as soon as the outcome is decided.
var value:String="null"
Breaks.breakable{ for (x <- in){
if(value=="null") {
value=x._2
if(value==value.reverse) {
out.collect(value)
Breaks.break
}
}
else{
if(value.reverse==x._2){
out.collect(value)
out.collect(x._2)
Breaks.break()
}
}
}
}}
Console.println("The result is "+result.collect().size.toString)
// env.execute("bdapro-ws1617-flink")
}
}
开发者ID:cristiprg,项目名称:BDAPRO.GlobalStateML,代码行数:50,代码来源:OddSemordnilaps.scala
示例11: DuplicateFilter
//设置package包名称以及导入依赖的类
package uk.co.pollett.flink.newsreader
import org.apache.flink.api.common.functions.RichFlatMapFunction
import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
import org.apache.flink.configuration.Configuration
import org.apache.flink.util.Collector
// Companion holding the state descriptor used by DuplicateFilter instances:
// a per-key boolean recording whether an element was already emitted.
object DuplicateFilter {
val descriptor: ValueStateDescriptor[Boolean] = new ValueStateDescriptor[Boolean]("seen", classOf[Boolean])
}
class DuplicateFilter[T] extends RichFlatMapFunction[T, T] {

  // Keyed flag: true once an element for the current key has been emitted.
  var operatorState: ValueState[Boolean] = _

  override def open(parameters: Configuration): Unit = {
    operatorState = this.getRuntimeContext.getState(DuplicateFilter.descriptor)
  }

  /** Forwards `value` only the first time an element is seen for the current key. */
  override def flatMap(value: T, out: Collector[T]): Unit = {
    val alreadySeen = operatorState.value()
    if (!alreadySeen) {
      out.collect(value)
      operatorState.update(true)
    }
  }
}
开发者ID:pollett,项目名称:flink-newsreader,代码行数:26,代码来源:DuplicateFilter.scala
示例12: DuplicateFilterSpec
//设置package包名称以及导入依赖的类
package uk.co.pollett.flink.newsreader
import org.apache.flink.api.common.state.ValueState
import org.apache.flink.util.Collector
import org.scalamock.scalatest.MockFactory
import org.scalatest.FlatSpec
// Tests DuplicateFilter: the first element is forwarded, later duplicates are
// suppressed. A hand-rolled in-memory ValueState replaces the Flink runtime.
class DuplicateFilterSpec extends FlatSpec with MockFactory {
val data: String = "value"
val duplicateFilter: DuplicateFilter[String] = new DuplicateFilter[String]()
val collector: Collector[String] = mock[Collector[String]]
// Minimal ValueState stand-in backed by a plain var.
val dummyOperatorState: ValueState[Boolean] = new ValueState[Boolean] {
var state: Boolean = false
override def update(value: Boolean): Unit = { state = value }
override def value(): Boolean = state
override def clear(): Unit = { state = false }
}
// Inject the fake state directly, bypassing open()/getRuntimeContext.
duplicateFilter.operatorState = dummyOperatorState
"A first send" should "be emitted" in {
(collector.collect _).expects(data) // output data
duplicateFilter.flatMap(data, collector)
}
// NOTE: depends on the state mutated by the previous test — the two tests
// share one filter instance and must run in order.
"A second send" should "not emit" in {
(collector.collect _).expects(*).never // shouldn't emit
duplicateFilter.flatMap(data, collector)
}
}
开发者ID:pollett,项目名称:flink-newsreader,代码行数:35,代码来源:DuplicateFilterSpec.scala
示例13: Page
//设置package包名称以及导入依赖的类
package phu.quang.le.batch
import org.apache.flink.api.scala._
import org.apache.flink.util.Collector
// A vertex id together with its current PageRank score.
case class Page(id: Long, rank: Double)
// A vertex id together with the ids of the vertices it links to.
case class Adjacency(id: Long, neighbors: Array[Long])
// One PageRank iteration over a small hard-coded link graph using the Flink
// DataSet API: each page sends its dampened rank to its neighbours and also
// receives the uniform random-jump contribution.
object PageRankTest extends App {
private final val DAMPENING_FACTOR: Double = 0.85
// NOTE(review): NUM_VERTICES is 82140 but the sample graph below has only 5
// vertices, so the initial ranks and random jump do not sum to 1 — confirm
// whether the constant was copied from a larger dataset.
private final val NUM_VERTICES = 82140L;
private final val RANDOM_JUMP = (1 - DAMPENING_FACTOR) / NUM_VERTICES;
val env = ExecutionEnvironment.getExecutionEnvironment
// One line per page: "<id> <neighbour ids...>".
val rawLines: DataSet[String] = env.fromElements(
"1 2 3 4",
"2 1",
"3 5",
"4 2 3",
"5 2 4"
)
// Parse each line into an Adjacency(id, neighbours) record.
val adjacency: DataSet[Adjacency] = rawLines
.map(str => {
val elements = str.split(' ')
val id = elements(0).toLong
val neighbors = elements.slice(1, elements.length).map { _.toLong }
Adjacency(id, neighbors)
})
// Uniform initial rank for every page.
val initialRanks: DataSet[Page] = adjacency.map { adj => Page(adj.id, 1.0 / NUM_VERTICES) }
// For each page: emit its own random-jump share plus one dampened
// contribution per outgoing neighbour.
val rankContributions = initialRanks.join(adjacency).where("id").equalTo("id") {
(page, adj, out: Collector[Page]) => {
val rankPerTarget = DAMPENING_FACTOR * page.rank / adj.neighbors.length
out.collect(Page(page.id, RANDOM_JUMP))
for (neighbor <- adj.neighbors) {
out.collect(Page(neighbor, rankPerTarget))
}
}
}
rankContributions.print()
// Sum the contributions per page to obtain the next rank vector.
val newRanks = rankContributions.groupBy("id").reduce((a, b) => Page(a.id, a.rank + b.rank))
newRanks.print()
}
开发者ID:p-le,项目名称:flink-learning,代码行数:47,代码来源:PageRankTest.scala
示例14: GenerateUnaryBinaryCindCandidates
//设置package包名称以及导入依赖的类
package de.hpi.isg.sodap.rdfind.operators.candidate_generation
import de.hpi.isg.sodap.rdfind.data.Cind
import de.hpi.isg.sodap.rdfind.util.ConditionCodes
import de.hpi.isg.sodap.rdfind.util.ConditionCodes._
import org.apache.flink.util.Collector
// Generates unary/binary CIND candidates implied by the buffered single-single
// CINDs, writing results through the reused `output` record.
class GenerateUnaryBinaryCindCandidates
extends GenerateXxxBinaryCindCandidates {
override def generateRefinedCinds(knownCinds: IndexedSeq[Cind], output: Cind, out: Collector[Cind]): Unit = {
// Consider trivial single-single CINDs that were not considered above.
// E.g., if we have s[p1] < s[o1], then we need to consider that there implicitly is s[p1] < s[p1].
// Then we also need to create s[p1] < s[p1,o1], which is not trivial.
this.buffer.foreach { singleSingleCind =>
val primaryDepCondition = extractPrimaryConditions(singleSingleCind.depCaptureType)
val primaryRefCondition = extractPrimaryConditions(singleSingleCind.refCaptureType)
// Combine only when the primary conditions differ but the secondary
// conditions agree.
if ((primaryDepCondition != primaryRefCondition) &&
(extractSecondaryConditions(singleSingleCind.depCaptureType) ==
extractSecondaryConditions(singleSingleCind.refCaptureType))) {
output.depCaptureType = singleSingleCind.depCaptureType
output.depConditionValue1 = singleSingleCind.depConditionValue1
output.depConditionValue2 = singleSingleCind.depConditionValue2
// The referenced side becomes the merged (binary) capture.
output.refCaptureType = merge(singleSingleCind.refCaptureType, singleSingleCind.depCaptureType)
// Order the two referenced condition values by their primary condition codes.
if (primaryDepCondition < primaryRefCondition) {
// e.g., s[p=...] < s[o=...] -> s[p=...,o=...]
output.refConditionValue1 = singleSingleCind.depConditionValue1
output.refConditionValue2 = singleSingleCind.refConditionValue1
} else {
output.refConditionValue1 = singleSingleCind.refConditionValue1
output.refConditionValue2 = singleSingleCind.depConditionValue1
}
out.collect(output)
}
}
}
}
示例15: GenerateBinaryBinaryCindCandidates
//设置package包名称以及导入依赖的类
package de.hpi.isg.sodap.rdfind.operators.candidate_generation
import de.hpi.isg.sodap.rdfind.data.Cind
import de.hpi.isg.sodap.rdfind.util.ConditionCodes
import de.hpi.isg.sodap.rdfind.util.ConditionCodes._
import org.apache.flink.api.common.functions.GroupReduceFunction
import org.apache.flink.util.Collector
import scala.collection.mutable.ArrayBuffer
import scala.collection.JavaConversions._
// Generates binary/binary CIND candidates implied by the buffered
// double-single CINDs, writing results through the reused `output` record.
class GenerateBinaryBinaryCindCandidates
extends GenerateXxxBinaryCindCandidates {
override def generateRefinedCinds(knownCinds: IndexedSeq[Cind], output: Cind, out: Collector[Cind]): Unit = {
// Consider trivial double-single CINDs that were not considered above.
// For instance, s[p=p1,o=o1] < s[o=o1] holds naturally
// Given s[p=p1, o=o1] < s[p=p2],
// we would also need to consider s[p=p1,o=o1] < s[p=p2,o=o1].
this.buffer.foreach { doubleSingleCind =>
// Applies only when the referenced capture is a sub-code of the dependent one.
if (ConditionCodes.isSubcode(doubleSingleCind.refCaptureType, doubleSingleCind.depCaptureType)) {
output.depConditionValue1 = doubleSingleCind.depConditionValue1
output.depConditionValue2 = doubleSingleCind.depConditionValue2
output.depCaptureType = doubleSingleCind.depCaptureType
// The refined referenced capture mirrors the dependent capture type.
output.refCaptureType = doubleSingleCind.depCaptureType
// Put the referenced condition value into the slot matching its sub-capture.
if (ConditionCodes.extractFirstSubcapture(doubleSingleCind.depCaptureType) == doubleSingleCind.refCaptureType) {
output.refConditionValue1 = doubleSingleCind.refConditionValue1
output.refConditionValue2 = doubleSingleCind.depConditionValue2
} else {
output.refConditionValue1 = doubleSingleCind.depConditionValue1
output.refConditionValue2 = doubleSingleCind.refConditionValue1
}
out.collect(output)
}
}
}
}
示例16: ExtractBalancedHalfApproximateUnaryUnaryOverlapCandidates
//设置package包名称以及导入依赖的类
package de.hpi.isg.sodap.rdfind.operators
import de.hpi.isg.sodap.rdfind.data._
import de.hpi.isg.sodap.rdfind.util.BloomFilterParameters
import org.apache.flink.util.Collector
import scala.collection.mutable.ArrayBuffer
// Emits unary-unary overlap candidates; once the right-hand side grows past
// explicitThreshold it is stored approximately in a spectral Bloom filter
// instead of explicitly.
class ExtractBalancedHalfApproximateUnaryUnaryOverlapCandidates(explicitThreshold: Int,
bloomFilterParameters: BloomFilterParameters[ConditionCount],
isUseAssociationRules: Boolean)
extends AbstractExtractBalancedUnaryUnaryOverlapCandidates[HalfApproximateOverlapSet](isUseAssociationRules) {
// Reused output record, mutated and re-emitted on every call.
private lazy val output = HalfApproximateOverlapSet(0, null, null, 0, null, null)
// Bloom filter accumulating rhs condition counts when they exceed the threshold.
private lazy val fuzzyRhsConditionCounts = bloomFilterParameters.createSpectralBloomFilter
// We do not need this function as we override its more specific sibling.
override def output(lhsConditionCount: ConditionCount, rhsConditionCounts: ArrayBuffer[ConditionCount],
out: Collector[HalfApproximateOverlapSet]): Unit = {
this.output.update(depConditionCount = lhsConditionCount)
if (rhsConditionCounts.size > explicitThreshold) {
// Large rhs: store approximately, leave the explicit list empty.
rhsConditionCounts.foreach { conditionCount =>
this.fuzzyRhsConditionCounts.putToSetBatch(conditionCount)
}
this.fuzzyRhsConditionCounts.executeSetBatch()
this.output.rhsConditions = Array()
this.output.approximateRhsConditions = this.fuzzyRhsConditionCounts.exportBits()
} else {
// Small rhs: keep all condition counts explicitly.
this.output.rhsConditions = rhsConditionCounts.toArray
this.output.approximateRhsConditions = Array()
}
out.collect(this.output)
// Reset the filter for the next invocation.
// NOTE(review): assumes exportBits() returns a copy; if it aliases the
// filter's backing bits, this clear() would corrupt the emitted record — confirm.
this.fuzzyRhsConditionCounts.clear()
}
}
开发者ID:stratosphere,项目名称:rdfind,代码行数:39,代码来源:ExtractBalancedHalfApproximateUnaryUnaryOverlapCandidates.scala
示例17: RemoveNonMinimalXxxSingleCinds
//设置package包名称以及导入依赖的类
package de.hpi.isg.sodap.rdfind.operators
import java.lang
import de.hpi.isg.sodap.rdfind.data.Cind
import de.hpi.isg.sodap.rdfind.util.ConditionCodes
import org.apache.flink.api.common.functions.CoGroupFunction
import org.apache.flink.util.Collector
import scala.collection.JavaConversions._
import scala.collection.mutable
// Drops CINDs whose referenced capture is already covered by one of the
// implying CINDs' referenced sub-captures (i.e. non-minimal CINDs).
class RemoveNonMinimalXxxSingleCinds extends CoGroupFunction[Cind, Cind, Cind] {
override def coGroup(impliedCinds: lang.Iterable[Cind], implyingCinds: lang.Iterable[Cind],
out: Collector[Cind]): Unit = {
// Build a probing table of referenced captures of the right CINDs.
// Maps each referenced sub-capture code to the condition values seen for it.
val probingTable = new mutable.HashMap[Int, mutable.Set[String]]
implyingCinds.foreach { implyingCind =>
val firstSubcaptureCode = ConditionCodes.extractFirstSubcapture(implyingCind.refCaptureType)
val firstSubcaptureValues = probingTable.getOrElseUpdate(firstSubcaptureCode, new mutable.HashSet[String])
firstSubcaptureValues += implyingCind.refConditionValue1
val secondSubcaptureCode = ConditionCodes.extractSecondSubcapture(implyingCind.refCaptureType)
val secondSubcaptureValues = probingTable.getOrElseUpdate(secondSubcaptureCode, new mutable.HashSet[String])
secondSubcaptureValues += implyingCind.refConditionValue2
}
// Probe the potentially implied CINDs and collect only when probing did not succeed.
impliedCinds.foreach { impliedCind =>
probingTable.get(impliedCind.refCaptureType) match {
case Some(values) => if (!values.contains(impliedCind.refConditionValue1)) out.collect(impliedCind)
case None => out.collect(impliedCind)
}
}
}
}
示例18: MergeUnaryConditionEvidences
//设置package包名称以及导入依赖的类
package de.hpi.isg.sodap.rdfind.operators.frequent_conditions
import java.lang.Iterable
import java.util
import de.hpi.isg.sodap.rdfind.data.UnaryConditionEvidence
import org.apache.flink.api.common.functions.RichGroupReduceFunction
import org.apache.flink.api.common.functions.RichGroupReduceFunction.Combinable
import org.apache.flink.util.Collector
import scala.collection.mutable.ArrayBuffer
@Combinable
class MergeUnaryConditionEvidences extends RichGroupReduceFunction[UnaryConditionEvidence, UnaryConditionEvidence] {
val tripleIdCollector = ArrayBuffer[Long]()
val outputEvidence = UnaryConditionEvidence(0, null, 0, null)
override def reduce(in: Iterable[UnaryConditionEvidence], out: Collector[UnaryConditionEvidence]): Unit = {
val inIterator: util.Iterator[UnaryConditionEvidence] = in.iterator()
val firstEvidence: UnaryConditionEvidence = inIterator.next()
if (!inIterator.hasNext) {
out.collect(firstEvidence)
return
}
this.outputEvidence.conditionType = firstEvidence.conditionType
this.outputEvidence.value = firstEvidence.value
this.tripleIdCollector.clear()
this.tripleIdCollector ++= firstEvidence.tripleIds
var count = firstEvidence.count
while (inIterator.hasNext) {
val nextEvidence = inIterator.next()
this.tripleIdCollector ++= nextEvidence.tripleIds
count += nextEvidence.count
}
this.outputEvidence.count = count
this.outputEvidence.tripleIds = this.tripleIdCollector.toArray
out.collect(this.outputEvidence)
}
}
开发者ID:stratosphere,项目名称:rdfind,代码行数:47,代码来源:MergeUnaryConditionEvidences.scala
示例19: CreateUnaryConditionEvidences
//设置package包名称以及导入依赖的类
package de.hpi.isg.sodap.rdfind.operators.frequent_conditions
import de.hpi.isg.sodap.flink.util.GlobalIdGenerator
import de.hpi.isg.sodap.rdfind.data.{RDFTriple, UnaryConditionEvidence}
import de.hpi.isg.sodap.rdfind.util.ConditionCodes._
import org.apache.flink.api.common.functions.RichFlatMapFunction
import org.apache.flink.configuration.Configuration
import org.apache.flink.util.Collector
class CreateUnaryConditionEvidences extends RichFlatMapFunction[RDFTriple, UnaryConditionEvidence] {

  // Generator producing ids that are unique across parallel subtasks.
  val idGenerator: GlobalIdGenerator = new GlobalIdGenerator(0)

  // Reused output record: count 1 and room for exactly one triple id.
  lazy val output = UnaryConditionEvidence(0, null, 1, new Array[Long](1))

  override def open(parameters: Configuration): Unit = {
    super.open(parameters)
    this.idGenerator.initialize(getRuntimeContext)
  }

  /**
   * Emits one evidence record per triple component (subject, predicate,
   * object), all tagged with a fresh id for this triple. The same output
   * instance is mutated and collected three times.
   */
  override def flatMap(triple: RDFTriple, out: Collector[UnaryConditionEvidence]): Unit = {
    this.output.tripleIds(0) = this.idGenerator.yieldLong()

    this.output.conditionType = subjectCondition
    this.output.value = triple.subj
    out.collect(this.output)

    this.output.conditionType = predicateCondition
    this.output.value = triple.pred
    out.collect(this.output)

    this.output.conditionType = objectCondition
    this.output.value = triple.obj
    out.collect(this.output)
  }
}
示例20: CreateBalancedUnaryUnaryOverlapCandidates
//设置package包名称以及导入依赖的类
package de.hpi.isg.sodap.rdfind.operators
import de.hpi.isg.sodap.rdfind.data._
import org.apache.flink.util.Collector
import scala.collection.mutable.ArrayBuffer
// Emits one overlap candidate per left-hand side condition count, carrying all
// right-hand side condition counts explicitly (no approximation).
class CreateBalancedUnaryUnaryOverlapCandidates(isUseAssociationRules: Boolean)
extends AbstractExtractBalancedUnaryUnaryOverlapCandidates[OverlapSet](isUseAssociationRules) {
// Reused output record, mutated and re-emitted on every call.
private lazy val output = OverlapSet(0, null, null, 0, null)
// We do not need this function as we override its more specific sibling.
override def output(lhsConditionCount: ConditionCount, rhsConditionCounts: ArrayBuffer[ConditionCount],
out: Collector[OverlapSet]): Unit = {
this.output.update(depConditionCount = lhsConditionCount)
this.output.rhsConditions = rhsConditionCounts.toArray
out.collect(this.output)
}
}
注:本文中的org.apache.flink.util.Collector类示例整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。
请发表评论