• 设为首页
  • 点击收藏
  • 手机版
    手机扫一扫访问
    迪恩网络手机版
  • 关注官方公众号
    微信扫一扫关注
    公众号

Scala Collector类代码示例

原作者: [db:作者] 来自: [db:来源] 收藏 邀请

本文整理汇总了Scala中org.apache.flink.util.Collector的典型用法代码示例。如果您正苦于以下问题:Scala Collector类的具体用法?Scala Collector怎么用?Scala Collector使用的例子?那么恭喜您, 这里精选的类代码示例或许可以为您提供帮助。



在下文中一共展示了Collector类的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Scala代码示例。

示例1: JsonTweetParser

//设置package包名称以及导入依赖的类
package flink.parsers

import domain.Tweet
import org.apache.flink.streaming.connectors.json.JSONParseFlatMap
import org.apache.flink.util.Collector

import scala.util.Try

/**
 * Flat-map function that converts one raw JSON tweet string into a [[Tweet]].
 *
 * Each field is looked up independently; a lookup that throws is replaced by the
 * empty string, so exactly one `Tweet` is emitted for every input value.
 */
class JsonTweetParser extends JSONParseFlatMap[String,Tweet] {

  override def flatMap(value: String, out: Collector[Tweet]): Unit = {
    // Reads a single JSON field from `value`, falling back to "" when the lookup fails.
    def field(path: String) = Try(getString(value, path)).getOrElse("")

    val tweet = Tweet(
      field("created_at"),
      field("text"),
      field("user.name"),
      field("user.lang"),
      field("place.name"),
      field("place.country")
    )
    out.collect(tweet)
  }
}
开发者ID:tquiviger,项目名称:twitter-flink,代码行数:24,代码来源:JsonTweetParser.scala


示例2: subtract

//设置package包名称以及导入依赖的类
package net.sansa_stack.inference.flink.utils

import java.lang.Iterable

import org.apache.flink.api.common.functions.CoGroupFunction
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.scala.DataSet
import org.apache.flink.util.Collector

import scala.reflect.ClassTag


    def subtract(other: DataSet[T]): DataSet[T] = {
      dataset.coGroup(other).where("*").equalTo("*")(new MinusCoGroupFunction[T](true)).name("subtract")
    }
  }

}

/**
 * Co-group function implementing a "minus" between two grouped inputs.
 *
 * @param all when `true`, every element of the second group cancels exactly one element
 *            of the first group and the surplus of the first group is emitted;
 *            when `false`, a single element of the first group is emitted, and only if
 *            the second group is empty.
 */
class MinusCoGroupFunction[T: ClassTag: TypeInformation](all: Boolean) extends CoGroupFunction[T, T, T] {
  override def coGroup(first: Iterable[T], second: Iterable[T], out: Collector[T]): Unit = {
    // Nothing to do when either side is missing.
    if (first != null && second != null) {
      val left = first.iterator
      val right = second.iterator

      if (!all) {
        // Distinct semantics: emit one representative if nothing on the right matches.
        if (!right.hasNext && left.hasNext) {
          out.collect(left.next())
        }
      } else {
        // Pair off left and right elements one by one ...
        while (right.hasNext && left.hasNext) {
          left.next()
          right.next()
        }
        // ... and emit whatever is left over on the left side.
        while (left.hasNext) {
          out.collect(left.next())
        }
      }
    }
  }
}
开发者ID:SANSA-Stack,项目名称:SANSA-Inference,代码行数:42,代码来源:DataSetUtils.scala


示例3: Flink

//设置package包名称以及导入依赖的类
package uk.co.bitcat.streaming.flink

import java.util.Properties

import org.apache.flink.api.scala._
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09
import org.apache.flink.util.Collector
import uk.co.bitcat.streaming.flink.domain.{Measurement, MeasurementSchema}
import uk.co.bitcat.streaming.flink.watermark.TwoSecondDelayWatermark

/**
 * Streaming job that reads `Measurement`s from the Kafka topic "pollution", averages the
 * pollution value over 10-second event-time windows and prints every window whose mean
 * exceeds 75.0.
 */
object Flink {

  /**
   * Builds and runs the streaming pipeline.
   *
   * @param args command-line arguments (unused)
   */
  def main(args: Array[String]): Unit = {

    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

    val properties = new Properties()
    properties.setProperty("bootstrap.servers", "localhost:9092")
    properties.setProperty("group.id", "flink_consumer")

    env
      .addSource(new FlinkKafkaConsumer09[Measurement]("pollution", new MeasurementSchema(), properties))
      .assignTimestampsAndWatermarks(new TwoSecondDelayWatermark())
      .timeWindowAll(Time.seconds(10))
      .apply(
        // Initial accumulator: (window end time, running sum / later the mean, count).
        (0L, 0.0, 0),
        // Fold step: add the measurement to the running sum and bump the count.
        // The first slot stays 0 until the window function fills in the window end.
        (acc: (Long, Double, Int), m: Measurement) => { (0L, acc._2 + m.pollution, acc._3 + 1) },
        // Window step: turn the single folded accumulator into (window end, mean, count).
        ( window: TimeWindow,
          accs: Iterable[(Long, Double, Int)],
          out: Collector[(Long, Double, Int)] ) =>
        {
          val acc = accs.iterator.next()
          out.collect((window.getEnd, acc._2/acc._3, acc._3))
        }
      )
      .filter(_._2 > 75.0)
      .print()  // Replace with call to custom sink to raise alert for pollution level

    env.execute()
  }

}
开发者ID:dscook,项目名称:streaming-examples,代码行数:48,代码来源:Flink.scala


示例4: Palindrome

//设置package包名称以及导入依赖的类
package de.tu_berlin.dima.bdapro.flink.palindrome.wladox

import org.apache.flink.api.scala._
import org.apache.flink.util.Collector

/**
 * Batch job that prints the longest palindrome sentence(s) found in a text file.
 * Length is measured as the number of characters remaining after removing spaces.
 */
object Palindrome {

  /**
   * Runs the job.
   *
   * @param args exactly one element: the path of the input text file
   */
  def main(args: Array[String]): Unit = {

    if (args.length != 1) {
      Console.err.println("Usage: <jar> inputPath")
      System.exit(-1)
    }

    val inputPath = args(0)

    val env = ExecutionEnvironment.getExecutionEnvironment
    val text = env.readTextFile(inputPath)

    val result = text
      .filter(s => isPalindrome(s))
      .map(s => MyTuple(s, s.replaceAll(" ", "").length))
      .reduceGroup {
        (in, out: Collector[MyTuple]) =>
          val candidates = in.toList
          // Guard against an empty group: maxBy/max would throw on an empty list.
          if (candidates.nonEmpty) {
            val maxLength = candidates.map(_.length).max
            // Emit every sentence that ties for the maximum length.
            candidates.foreach(e => if (e.length == maxLength) out.collect(e))
          }
      }

    result.print()
  }

  /**
   * Checks whether `s` reads the same forwards and backwards once underscores and all
   * non-word characters have been removed.
   *
   * @param s the sentence to test
   * @return `true` if `s` is non-empty and its normalized form equals its own reverse
   */
  def isPalindrome(s:String):Boolean = {
    val REGEX = "[_[^\\w\\d]]"
    // Normalize once instead of running the regex replacement twice.
    val normalized = s.replaceAll(REGEX, "")
    s.nonEmpty && normalized == normalized.reverse
  }

  /** A palindrome sentence together with its space-free length. */
  case class MyTuple(text:String, length:Int) {
    override def toString:String= s"The biggest palindrome sentence: <$text>"
  }


}
开发者ID:cristiprg,项目名称:BDAPRO.GlobalStateML,代码行数:49,代码来源:Palindrome.scala


示例5: Palindrome

//设置package包名称以及导入依赖的类
package de.tu_berlin.dima.bdapro.flink.palindrome.omarflores16

import org.apache.flink.api.scala._
import org.apache.flink.util.Collector

import scala.collection.mutable.Queue

/**
 * Batch job that writes the longest palindrome sentence(s) of a text file to the
 * "Output" sink. Length is the raw length of the lower-cased, trimmed line.
 */
object Palindrome {

  /**
   * Runs the job.
   *
   * @param args exactly one element: the path of the input text file
   */
  def main(args: Array[String]): Unit = {

    if (args.length != 1) {
      // Only one argument is accepted, so the usage text must not mention an output path.
      Console.err.println("Usage: <jar> inputPath")
      System.exit(-1)
    }

    val inputPath = args(0)
    val env = ExecutionEnvironment.getExecutionEnvironment

    // (line length, line, isPalindrome), restricted to palindromes. Filtering BEFORE
    // taking the maximum matters: taking the maximum over all lines would yield no
    // output whenever the longest line is not itself a palindrome.
    val palindromes = env.readTextFile(inputPath)
      .flatMap( _.toLowerCase.trim.split("\\n") filter { _.nonEmpty } )
      .map{ line => (line.size, line, palindromeFunc(line)) }
      .filter(t => t._3)

    // Length of the longest palindrome; 0 when the input contains none.
    val maxStringPalindrome = palindromes.max(0).collect().headOption.map(_._1).getOrElse(0)

    val finalTuple = palindromes
      .reduceGroup{
        (in, out: Collector[(String)]) =>
          for (t <- in) {
            if (t._1 >= maxStringPalindrome)
              out.collect("The biggest palindrome sentence: " + t._2)
          }
      }
    finalTuple.writeAsText("Output")
    env.execute("bdapro-ws1617-flink")
  }

  /**
   * Tests whether `phrase` is a palindrome, ignoring space characters and letter case.
   *
   * @param phrase the text to test
   * @return `true` if the space-free, lower-cased phrase equals its own reverse
   *         (an empty or single-character phrase counts as a palindrome)
   */
  def palindromeFunc(phrase: String): Boolean = {
    val letters = phrase.filterNot(_.isSpaceChar).map(_.toLower)
    letters == letters.reverse
  }

}
开发者ID:cristiprg,项目名称:BDAPRO.GlobalStateML,代码行数:60,代码来源:Palindrome.scala


示例6: Palindrome

//设置package包名称以及导入依赖的类
package de.tu_berlin.dima.bdapro.flink.palindrome.plehmann93


import org.apache.flink.api.common.operators.Order
import org.apache.flink.api.scala.ExecutionEnvironment
import org.apache.flink.api.scala._
import org.apache.flink.util.Collector

import scala.math.Ordering.String
import scala.util.control.Breaks

/**
 * Batch job that prints the longest palindrome sentence(s) of a text file.
 * Spaces are ignored both for the palindrome test and for the length.
 */
object Palindrome {

  /**
   * Runs the job.
   *
   * @param args exactly one element: the path of the input text file
   */
  def main(args: Array[String]): Unit = {
    if (args.length != 1) {
      Console.err.println("Usage: <jar> inputPath")
      System.exit(-1)
    }
    val inputPath = args(0)

    val env = ExecutionEnvironment.getExecutionEnvironment
    val result= env.readTextFile(inputPath)
      // Split on the newline character; the previous "/n" matched a literal slash-n.
      .flatMap(_.toLowerCase.split("\\n"))
      .filter { x =>
        val stripped = x.replace(" ","")
        stripped == stripped.reverse
      }
      // Constant key 1 puts everything into one group so it can be sorted by length.
      .map( x => (1,x.replace(" ","").size,x)  )
      .groupBy(0).sortGroup(1,Order.DESCENDING)
      .reduceGroup{ (in, out: Collector[(Int, Int, String)]) =>
        // The group arrives sorted by descending length, so the first element carries
        // the maximum; emit the leading run of elements that tie with it.
        val sorted = in.buffered
        if (sorted.hasNext) {
          val max = sorted.head._2
          sorted.takeWhile(x => x._2 >= max).foreach(x => out.collect(x))
        }
      }
    result.collect().foreach(x=>Console.println("The biggest palindrome sentence: <"+x._3+">"))
    //env.execute("bdapro-ws1617-flink")
  }
}
开发者ID:cristiprg,项目名称:BDAPRO.GlobalStateML,代码行数:41,代码来源:Palindrome.scala


示例7: Palindrome

//设置package包名称以及导入依赖的类
package de.tu_berlin.dima.bdapro.flink.palindrome.mschwarzer

import org.apache.flink.api.common.functions.RichFilterFunction
import org.apache.flink.api.scala._
import org.apache.flink.configuration.Configuration
import org.apache.flink.util.Collector


/**
 * Batch job that prints the longest palindrome sentence(s) of a text file.
 * Only ASCII letters and digits take part in the palindrome test and the length.
 */
object Palindrome {

  /**
   * Runs the job.
   *
   * @param args exactly one element: the path of the input text file
   */
  def main(args: Array[String]): Unit = {
    if (args.length != 1) {
      Console.err.println("Usage: <jar> inputPath")
      System.exit(-1)
    }

    val inputPath = args(0) // Input will contain only lower-case English letters, spaces, newlines and numeric chars

    val env = ExecutionEnvironment.getExecutionEnvironment

    // Read sentences from text file and keep (sentence, normalized length) for palindromes
    val pals = env.readTextFile(inputPath)
      .flatMap{
        (in, out: Collector[(String, Int)]) =>
          val p = in.replaceAll("[^A-Za-z0-9]", "")
          if (p == p.reverse) { // check for valid palindrome
            // Explicit tuple instead of relying on argument auto-tupling.
            out.collect((in, p.length))
          }
      }

    // Save max length in config; fall back to 0 when there is no palindrome at all
    // (calling .last on an empty result would throw).
    val maxLength = pals.max(1).collect().lastOption.map(_._2).getOrElse(0)
    val config = new Configuration()
    config.setInteger("maxLength", maxLength)

    // Filter by max length
    val maxPals = pals
      .filter(new RichFilterFunction[(String, Int)]() {
        var maxLength = 0

        // Reads the threshold handed over through withParameters below.
        override def open(parameters: Configuration): Unit = {
          maxLength = parameters.getInteger("maxLength", 0)
        }

        override def filter(in: (String, Int)): Boolean = {
          in._2 == maxLength
        }
      })
      .withParameters(config)
      .collect()

    // Print all left-over sentences
    maxPals.foreach { case (sentence, _) =>
      println("The biggest palindrome sentence: <" + sentence + ">")
    }

  }
}
开发者ID:cristiprg,项目名称:BDAPRO.GlobalStateML,代码行数:59,代码来源:Palindrome.scala


示例8: OddSemordnilaps

//设置package包名称以及导入依赖的类
package de.tu_berlin.dima.bdapro.flink.oddsemordnilaps.mmziyad

import org.apache.flink.api.scala._
import org.apache.flink.util.Collector


/**
 * Batch job that counts the distinct odd numbers in a text file that are either
 * palindromes or whose digit-reversed form also occurs among the odd numbers.
 */
object OddSemordnilaps {

  /**
   * Runs the job and prints "The result is <count>".
   *
   * @param args exactly one element: the path of the input text file
   */
  def main(args: Array[String]): Unit = {
    if (args.length != 1) {
      Console.err.println("Usage: <jar> inputPath ")
      System.exit(-1)
    }

    val inputPath = args(0)
    val env = ExecutionEnvironment.getExecutionEnvironment

    val data = env.readTextFile(inputPath)
      // Split on runs of whitespace and drop empty tokens: a blank line or repeated
      // separators would otherwise produce "" and make toInt throw.
      .flatMap(_.trim.split("\\s+").filter(_.nonEmpty).distinct.filter(_.toInt % 2 != 0))
      .map(d => (d, d.length))
      // A number and its reverse have the same length, so they land in the same group.
      .groupBy(_._2)
      .reduceGroup {
        (in, out: Collector[(String, Int)]) => {
          // The set removes duplicates that came from different input lines.
          val numbers = in.toSet

          numbers.foreach { case (number, length) =>
            val reversed = number.reverse
            if (reversed == number || numbers.contains((reversed, length)))
              out.collect((number, 1))
          }
        }
      }.count()

    println("The result is " + data)
    //env.execute("bdapro-ws1617-flink")
  }

}
开发者ID:cristiprg,项目名称:BDAPRO.GlobalStateML,代码行数:41,代码来源:OddSemordnilaps.scala


示例9: OddSemordnilaps

//设置package包名称以及导入依赖的类
package de.tu_berlin.dima.bdapro.flink.oddsemordnilaps.omarflores16

import org.apache.flink.api.scala._
import org.apache.flink.util.Collector

/**
 * Batch job that reverses every token of a text file, groups by the reversed string and
 * prints how many groups have an odd reversed value.
 *
 * NOTE(review): this counts distinct reversed tokens that are odd; it never checks that
 * the reversed number itself occurs in the input — confirm this is the intended
 * definition of an "odd semordnilap".
 */
object OddSemordnilaps {

  /**
   * Runs the job.
   *
   * @param args exactly one element: the path of the input text file
   */
  def main(args: Array[String]): Unit = {

    if (args.length != 1) {
      // Only one argument is accepted, so the usage text must not mention an output path.
      Console.err.println("Usage: <jar> inputPath")
      System.exit(-1)
    }

    val inputPath = args(0)
    val env = ExecutionEnvironment.getExecutionEnvironment

    val outText = env.readTextFile(inputPath)
      .flatMap( _.toLowerCase.trim.split("\\W+") filter { _.nonEmpty } )
      // (reversed token, occurrence count, parity of the reversed value)
      .map { strNumber =>
        val reversed = strNumber.reverse
        (reversed, 1, reversed.toInt % 2)
      }
      .groupBy(0)
      .sum(1)
      .reduceGroup{
        (in, out: Collector[(String)]) =>
          val oddCount = in.count(t => t._3 == 1)
          out.collect("The result is " + oddCount)
      }
    outText.print()
  }
}
开发者ID:cristiprg,项目名称:BDAPRO.GlobalStateML,代码行数:34,代码来源:OddSemordnilaps.scala


示例10: OddSemordnilaps

//设置package包名称以及导入依赖的类
package de.tu_berlin.dima.bdapro.flink.oddsemordnilaps.plehmann93

import org.apache.flink.api.scala._
import org.apache.flink.util.Collector

import scala.util.control.Breaks


/**
 * Batch job that counts odd numbers which are palindromes or whose digit-reversed form
 * also occurs in the input (both members of such a pair are counted).
 */
object OddSemordnilaps {

  /**
   * Runs the job and prints "The result is <count>".
   *
   * @param args exactly one element: the path of the input text file
   */
  def main(args: Array[String]): Unit = {
    if (args.length != 1) {
      Console.err.println("Usage: <jar> inputPath")
      System.exit(-1)
    }

    val inputPath = args(0)

    val env = ExecutionEnvironment.getExecutionEnvironment
    val result=env.readTextFile(inputPath)
      .flatMap(_.toLowerCase.split("\\W+"))
      // split yields "" for blank lines or leading separators; toInt would throw on it.
      .filter(x => x.nonEmpty && x.toInt%2==1)
      // Key each number by the smaller of itself and its reverse so that a number and
      // its mirror image end up in the same group.
      .map{x=>if (x<x.reverse) (x,x) else (x.reverse,x) }
      .groupBy(0)
      .reduceGroup { (in, out: Collector[(String)]) =>
        val values = in.map(_._2)
        if (values.hasNext) {
          val first = values.next()
          if (first == first.reverse) {
            // Palindromic number: counts once, however often it occurs.
            out.collect(first)
          } else {
            // Otherwise look for the mirror image later in the group and emit the pair.
            values.find(_ == first.reverse).foreach { partner =>
              out.collect(first)
              out.collect(partner)
            }
          }
        }
      }
    Console.println("The result is "+result.collect().size.toString)
    // env.execute("bdapro-ws1617-flink")
  }

}
开发者ID:cristiprg,项目名称:BDAPRO.GlobalStateML,代码行数:50,代码来源:OddSemordnilaps.scala


示例11: DuplicateFilter

//设置package包名称以及导入依赖的类
package uk.co.pollett.flink.newsreader

import org.apache.flink.api.common.functions.RichFlatMapFunction
import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
import org.apache.flink.configuration.Configuration
import org.apache.flink.util.Collector

/** Holds the state descriptor shared by all [[DuplicateFilter]] instances. */
object DuplicateFilter {
  /** Descriptor of the Boolean value state named "seen". */
  val descriptor: ValueStateDescriptor[Boolean] =
    new ValueStateDescriptor[Boolean](
      "seen",
      classOf[Boolean]
    )
}

/**
 * Flat-map function that lets a value through only while its "seen" state is still unset,
 * and marks the state as seen right after emitting.
 *
 * @tparam T type of the elements being filtered
 */
class DuplicateFilter[T] extends RichFlatMapFunction[T, T] {
  /** The "seen" flag; assigned in `open` (tests may inject their own instance). */
  var operatorState: ValueState[Boolean] = _

  override def open(parameters: Configuration): Unit = {
    val runtimeContext = this.getRuntimeContext
    operatorState = runtimeContext.getState(DuplicateFilter.descriptor)
  }

  override def flatMap(value: T, out: Collector[T]): Unit = {
    val alreadySeen = operatorState.value()
    if (alreadySeen) {
      // Duplicate: swallow it.
    } else {
      out.collect(value)
      operatorState.update(true)
    }
  }
}
开发者ID:pollett,项目名称:flink-newsreader,代码行数:26,代码来源:DuplicateFilter.scala


示例12: DuplicateFilterSpec

//设置package包名称以及导入依赖的类
package uk.co.pollett.flink.newsreader

import org.apache.flink.api.common.state.ValueState
import org.apache.flink.util.Collector

import org.scalamock.scalatest.MockFactory
import org.scalatest.FlatSpec

/**
 * Behavioural spec for [[DuplicateFilter]]: the first value must be emitted, a repeated
 * one must not.
 *
 * The two test cases share `duplicateFilter` and `dummyOperatorState`; the second case
 * passes only because the first one has already flipped the state to `true`, so the
 * cases are order-dependent.
 */
class DuplicateFilterSpec extends FlatSpec with MockFactory {
  // Value pushed through the filter in both test cases.
  val data: String = "value"

  val duplicateFilter: DuplicateFilter[String] = new DuplicateFilter[String]()
  val collector: Collector[String] = mock[Collector[String]]

  // In-memory stand-in for Flink's ValueState, backed by a plain var.
  val dummyOperatorState: ValueState[Boolean] = new ValueState[Boolean] {
    var state: Boolean = false
    override def update(value: Boolean): Unit = { state = value }

    override def value(): Boolean = state

    override def clear(): Unit = { state = false }
  }
  // Inject the fake state directly instead of calling open().
  duplicateFilter.operatorState = dummyOperatorState

  "A first send" should "be emitted" in {
    (collector.collect _).expects(data) // output data
    duplicateFilter.flatMap(data, collector)
  }

  "A second send" should "not emit" in {
    (collector.collect _).expects(*).never // shouldn't emit
    duplicateFilter.flatMap(data, collector)
  }
}
开发者ID:pollett,项目名称:flink-newsreader,代码行数:35,代码来源:DuplicateFilterSpec.scala


示例13: Page

//设置package包名称以及导入依赖的类
package phu.quang.le.batch

import org.apache.flink.api.scala._
import org.apache.flink.util.Collector

/** A vertex identified by `id` together with a rank value (or a rank contribution). */
case class Page(id: Long, rank: Double)
/** A vertex identified by `id` together with the ids of its outgoing neighbors. */
case class Adjacency(id: Long, neighbors: Array[Long])

/**
 * Small batch program that computes one round of PageRank contributions for a
 * hard-coded five-vertex graph and prints both the raw contributions and the
 * per-vertex sums.
 */
object PageRankTest extends App {
  private final val DAMPENING_FACTOR: Double = 0.85
  private final val NUM_VERTICES = 82140L
  private final val RANDOM_JUMP = (1 - DAMPENING_FACTOR) / NUM_VERTICES

  val env = ExecutionEnvironment.getExecutionEnvironment

  // Each line reads "<vertex id> <neighbor id> <neighbor id> ...".
  val rawLines: DataSet[String] =
    env.fromElements("1 2 3 4", "2 1", "3 5", "4 2 3", "5 2 4")

  // Parse every line into a vertex id plus its neighbor list.
  val adjacency: DataSet[Adjacency] = rawLines.map { line =>
    val tokens = line.split(' ')
    val vertexId = tokens(0).toLong
    val neighborIds = tokens.drop(1).map(token => token.toLong)
    Adjacency(vertexId, neighborIds)
  }

  // Every vertex starts with the same rank.
  val initialRanks: DataSet[Page] = adjacency.map(adj => Page(adj.id, 1.0 / NUM_VERTICES))

  // For each vertex emit its own random-jump share plus an equal share for each neighbor.
  val rankContributions = initialRanks.join(adjacency).where("id").equalTo("id") {
    (page, adj, out: Collector[Page]) =>
      val rankPerTarget = DAMPENING_FACTOR * page.rank / adj.neighbors.length
      out.collect(Page(page.id, RANDOM_JUMP))
      adj.neighbors.foreach { neighbor =>
        out.collect(Page(neighbor, rankPerTarget))
      }
  }
  rankContributions.print()

  // Sum the contributions per vertex.
  val newRanks = rankContributions
    .groupBy("id")
    .reduce((left, right) => Page(left.id, left.rank + right.rank))
  newRanks.print()
}
开发者ID:p-le,项目名称:flink-learning,代码行数:47,代码来源:PageRankTest.scala


示例14: GenerateUnaryBinaryCindCandidates

//设置package包名称以及导入依赖的类
package de.hpi.isg.sodap.rdfind.operators.candidate_generation

import de.hpi.isg.sodap.rdfind.data.Cind
import de.hpi.isg.sodap.rdfind.util.ConditionCodes
import de.hpi.isg.sodap.rdfind.util.ConditionCodes._
import org.apache.flink.util.Collector


/**
 * Candidate generator that derives CINDs with a binary referenced condition from the
 * buffered single-single CINDs.
 */
class GenerateUnaryBinaryCindCandidates
extends GenerateXxxBinaryCindCandidates {


  /**
   * Emits one refined CIND for every buffered CIND whose dependent and referenced
   * captures differ in their primary condition but agree in their secondary condition.
   *
   * @param knownCinds not read by this implementation
   * @param output     instance that is overwritten and emitted for each candidate
   * @param out        collector receiving the candidates
   */
  override def generateRefinedCinds(knownCinds: IndexedSeq[Cind], output: Cind, out: Collector[Cind]): Unit = {
    // Trivial single-single CINDs have not been handled before this point.
    // Example: s[p1] < s[o1] implicitly comes with s[p1] < s[p1], from which the
    // non-trivial s[p1] < s[p1,o1] has to be created.
    for (cind <- this.buffer) {
      val depPrimary = extractPrimaryConditions(cind.depCaptureType)
      val refPrimary = extractPrimaryConditions(cind.refCaptureType)
      if (depPrimary != refPrimary &&
        extractSecondaryConditions(cind.depCaptureType) == extractSecondaryConditions(cind.refCaptureType)) {
        // The dependent side is copied unchanged.
        output.depCaptureType = cind.depCaptureType
        output.depConditionValue1 = cind.depConditionValue1
        output.depConditionValue2 = cind.depConditionValue2
        // The referenced side becomes the merge of both captures.
        output.refCaptureType = merge(cind.refCaptureType, cind.depCaptureType)
        if (depPrimary < refPrimary) {
          // e.g., s[p=...] < s[o=...] -> s[p=...,o=...]
          output.refConditionValue1 = cind.depConditionValue1
          output.refConditionValue2 = cind.refConditionValue1
        } else {
          output.refConditionValue1 = cind.refConditionValue1
          output.refConditionValue2 = cind.depConditionValue1
        }
        out.collect(output)
      }
    }
  }

}
开发者ID:stratosphere,项目名称:rdfind,代码行数:41,代码来源:GenerateUnaryBinaryCindCandidates.scala


示例15: GenerateBinaryBinaryCindCandidates

//设置package包名称以及导入依赖的类
package de.hpi.isg.sodap.rdfind.operators.candidate_generation

import de.hpi.isg.sodap.rdfind.data.Cind
import de.hpi.isg.sodap.rdfind.util.ConditionCodes
import de.hpi.isg.sodap.rdfind.util.ConditionCodes._
import org.apache.flink.api.common.functions.GroupReduceFunction
import org.apache.flink.util.Collector

import scala.collection.mutable.ArrayBuffer
import scala.collection.JavaConversions._


/**
 * Candidate generator that derives CINDs with binary conditions on both sides from the
 * buffered double-single CINDs.
 */
class GenerateBinaryBinaryCindCandidates
extends GenerateXxxBinaryCindCandidates {


  /**
   * Emits one refined CIND for every buffered CIND whose referenced capture is a subcode
   * of its dependent capture.
   *
   * @param knownCinds not read by this implementation
   * @param output     instance that is overwritten and emitted for each candidate
   * @param out        collector receiving the candidates
   */
  override def generateRefinedCinds(knownCinds: IndexedSeq[Cind], output: Cind, out: Collector[Cind]): Unit = {
    // Trivial double-single CINDs have not been handled before this point.
    // s[p=p1,o=o1] < s[o=o1] holds naturally, so given s[p=p1,o=o1] < s[p=p2]
    // the candidate s[p=p1,o=o1] < s[p=p2,o=o1] has to be considered as well.
    for (cind <- this.buffer) {
      if (ConditionCodes.isSubcode(cind.refCaptureType, cind.depCaptureType)) {
        // Dependent side is copied; the referenced side gets the same capture type.
        output.depConditionValue1 = cind.depConditionValue1
        output.depConditionValue2 = cind.depConditionValue2
        output.depCaptureType = cind.depCaptureType
        output.refCaptureType = cind.depCaptureType

        val refIsFirstSubcapture =
          ConditionCodes.extractFirstSubcapture(cind.depCaptureType) == cind.refCaptureType
        if (refIsFirstSubcapture) {
          // The referenced value fills the first slot, the second is inherited.
          output.refConditionValue1 = cind.refConditionValue1
          output.refConditionValue2 = cind.depConditionValue2
        } else {
          // The first slot is inherited, the referenced value fills the second.
          output.refConditionValue1 = cind.depConditionValue1
          output.refConditionValue2 = cind.refConditionValue1
        }
        out.collect(output)
      }
    }
  }

}
开发者ID:stratosphere,项目名称:rdfind,代码行数:42,代码来源:GenerateBinaryBinaryCindCandidates.scala


示例16: ExtractBalancedHalfApproximateUnaryUnaryOverlapCandidates

//设置package包名称以及导入依赖的类
package de.hpi.isg.sodap.rdfind.operators

import de.hpi.isg.sodap.rdfind.data._
import de.hpi.isg.sodap.rdfind.util.BloomFilterParameters
import org.apache.flink.util.Collector

import scala.collection.mutable.ArrayBuffer


/**
 * Emits a [[HalfApproximateOverlapSet]] per call, storing the right-hand-side conditions
 * either explicitly or as exported bloom-filter bits, depending on how many there are.
 *
 * @param explicitThreshold     largest number of right-hand-side conditions that is still
 *                              stored explicitly
 * @param bloomFilterParameters used to create the spectral bloom filter for larger sets
 * @param isUseAssociationRules passed through to the superclass
 */
class ExtractBalancedHalfApproximateUnaryUnaryOverlapCandidates(explicitThreshold: Int,
                                                                bloomFilterParameters: BloomFilterParameters[ConditionCount],
                                                                isUseAssociationRules: Boolean)
  extends AbstractExtractBalancedUnaryUnaryOverlapCandidates[HalfApproximateOverlapSet](isUseAssociationRules) {

  // Single output instance that is overwritten and emitted on every call.
  private lazy val output = HalfApproximateOverlapSet(0, null, null, 0, null, null)

  // Bloom filter that is filled per call and cleared again after emitting.
  private lazy val fuzzyRhsConditionCounts = bloomFilterParameters.createSpectralBloomFilter

  // We do not need this function as we override its more specific sibling.
  // NOTE(review): the comment above does not obviously describe the method below,
  // which is itself an override — confirm against the superclass.
  /**
   * Fills the shared output instance for one left-hand-side condition and emits it.
   *
   * @param lhsConditionCount  the dependent condition
   * @param rhsConditionCounts the conditions overlapping with it
   * @param out                collector receiving the overlap set
   */
  override def output(lhsConditionCount: ConditionCount, rhsConditionCounts: ArrayBuffer[ConditionCount],
                      out: Collector[HalfApproximateOverlapSet]): Unit = {

    this.output.update(depConditionCount = lhsConditionCount)
    if (rhsConditionCounts.size > explicitThreshold) {
      // Too many conditions: put them into the bloom filter and ship only its bits.
      rhsConditionCounts.foreach { conditionCount =>
        this.fuzzyRhsConditionCounts.putToSetBatch(conditionCount)
      }
      this.fuzzyRhsConditionCounts.executeSetBatch()
      this.output.rhsConditions = Array()
      this.output.approximateRhsConditions = this.fuzzyRhsConditionCounts.exportBits()
    } else {
      // Few enough conditions: ship them explicitly and leave the approximate part empty.
      this.output.rhsConditions = rhsConditionCounts.toArray
      this.output.approximateRhsConditions = Array()
    }
    out.collect(this.output)
    // Reset the filter so the next call does not see this call's entries.
    this.fuzzyRhsConditionCounts.clear()
  }
}
开发者ID:stratosphere,项目名称:rdfind,代码行数:39,代码来源:ExtractBalancedHalfApproximateUnaryUnaryOverlapCandidates.scala


示例17: RemoveNonMinimalXxxSingleCinds

//设置package包名称以及导入依赖的类
package de.hpi.isg.sodap.rdfind.operators

import java.lang

import de.hpi.isg.sodap.rdfind.data.Cind
import de.hpi.isg.sodap.rdfind.util.ConditionCodes
import org.apache.flink.api.common.functions.CoGroupFunction
import org.apache.flink.util.Collector

import scala.collection.JavaConversions._
import scala.collection.mutable


/**
 * Co-group function that forwards only those CINDs of the first input whose referenced
 * capture is not covered by a sub-capture of any CIND in the second input.
 */
class RemoveNonMinimalXxxSingleCinds extends CoGroupFunction[Cind, Cind, Cind] {

  /**
   * @param impliedCinds  CINDs that may be implied by the second input
   * @param implyingCinds CINDs whose referenced sub-captures are used for probing
   * @param out           receives every CIND of the first input that was not matched
   */
  override def coGroup(impliedCinds: lang.Iterable[Cind], implyingCinds: lang.Iterable[Cind],
                       out: Collector[Cind]): Unit = {

    // Probing table: sub-capture code -> condition values seen for that code.
    val probingTable = new mutable.HashMap[Int, mutable.Set[String]]

    // Records one (sub-capture code, condition value) pair in the probing table.
    def register(subcaptureCode: Int, conditionValue: String): Unit = {
      val knownValues = probingTable.getOrElseUpdate(subcaptureCode, new mutable.HashSet[String])
      knownValues += conditionValue
    }

    // Both halves of every implying CIND's referenced capture go into the table.
    implyingCinds.foreach { implyingCind =>
      register(ConditionCodes.extractFirstSubcapture(implyingCind.refCaptureType), implyingCind.refConditionValue1)
      register(ConditionCodes.extractSecondSubcapture(implyingCind.refCaptureType), implyingCind.refConditionValue2)
    }

    // A potentially implied CIND survives unless its referenced capture is in the table.
    impliedCinds.foreach { impliedCind =>
      val isCovered = probingTable
        .get(impliedCind.refCaptureType)
        .exists(values => values.contains(impliedCind.refConditionValue1))
      if (!isCovered) out.collect(impliedCind)
    }

  }
}
开发者ID:stratosphere,项目名称:rdfind,代码行数:41,代码来源:RemoveNonMinimalXxxSingleCinds.scala


示例18: MergeUnaryConditionEvidences

//设置package包名称以及导入依赖的类
package de.hpi.isg.sodap.rdfind.operators.frequent_conditions

import java.lang.Iterable
import java.util

import de.hpi.isg.sodap.rdfind.data.UnaryConditionEvidence
import org.apache.flink.api.common.functions.RichGroupReduceFunction
import org.apache.flink.api.common.functions.RichGroupReduceFunction.Combinable
import org.apache.flink.util.Collector

import scala.collection.mutable.ArrayBuffer


/**
 * Group-reduce function that merges all evidences of a group into one: counts are summed
 * and triple ids concatenated. A group with a single evidence is forwarded unchanged.
 */
@Combinable
class MergeUnaryConditionEvidences extends RichGroupReduceFunction[UnaryConditionEvidence, UnaryConditionEvidence] {

  // Scratch buffer for the concatenated triple ids; cleared at the start of each merge.
  val tripleIdCollector = ArrayBuffer[Long]()

  // Output instance that is overwritten and emitted for every merged group.
  val outputEvidence = UnaryConditionEvidence(0, null, 0, null)

  override def reduce(in: Iterable[UnaryConditionEvidence], out: Collector[UnaryConditionEvidence]): Unit = {
    val evidences = in.iterator()
    val head = evidences.next()

    if (evidences.hasNext) {
      // Condition type and value are taken from the first evidence of the group.
      this.outputEvidence.conditionType = head.conditionType
      this.outputEvidence.value = head.value

      this.tripleIdCollector.clear()
      this.tripleIdCollector ++= head.tripleIds
      var totalCount = head.count

      // Fold the remaining evidences into the running totals.
      while (evidences.hasNext) {
        val evidence = evidences.next()
        this.tripleIdCollector ++= evidence.tripleIds
        totalCount += evidence.count
      }

      this.outputEvidence.count = totalCount
      this.outputEvidence.tripleIds = this.tripleIdCollector.toArray
      out.collect(this.outputEvidence)
    } else {
      // Only one evidence in the group: nothing to merge.
      out.collect(head)
    }
  }

}
开发者ID:stratosphere,项目名称:rdfind,代码行数:47,代码来源:MergeUnaryConditionEvidences.scala


示例19: CreateUnaryConditionEvidences

//设置package包名称以及导入依赖的类
package de.hpi.isg.sodap.rdfind.operators.frequent_conditions

import de.hpi.isg.sodap.flink.util.GlobalIdGenerator
import de.hpi.isg.sodap.rdfind.data.{RDFTriple, UnaryConditionEvidence}
import de.hpi.isg.sodap.rdfind.util.ConditionCodes._
import org.apache.flink.api.common.functions.RichFlatMapFunction
import org.apache.flink.configuration.Configuration
import org.apache.flink.util.Collector


/**
 * Flat-map function that emits three evidences per RDF triple — one each for its
 * subject, predicate and object — all carrying the same freshly generated triple id.
 */
class CreateUnaryConditionEvidences extends RichFlatMapFunction[RDFTriple, UnaryConditionEvidence] {

  val idGenerator: GlobalIdGenerator = new GlobalIdGenerator(0)

  // Output instance with count 1 and a one-element id array, overwritten for each emit.
  lazy val output = UnaryConditionEvidence(0, null, 1, new Array[Long](1))

  override def open(parameters: Configuration): Unit = {
    super.open(parameters)

    // The generator is initialized from the runtime context before use.
    this.idGenerator.initialize(getRuntimeContext)
  }

  override def flatMap(triple: RDFTriple, out: Collector[UnaryConditionEvidence]): Unit = {
    // One id per triple, shared by the three evidences below.
    val freshId = this.idGenerator.yieldLong()
    val evidence = this.output
    evidence.tripleIds(0) = freshId

    // Subject evidence.
    evidence.conditionType = subjectCondition
    evidence.value = triple.subj
    out.collect(evidence)

    // Predicate evidence.
    evidence.conditionType = predicateCondition
    evidence.value = triple.pred
    out.collect(evidence)

    // Object evidence.
    evidence.conditionType = objectCondition
    evidence.value = triple.obj
    out.collect(evidence)
  }
}
开发者ID:stratosphere,项目名称:rdfind,代码行数:39,代码来源:CreateUnaryConditionEvidences.scala


示例20: CreateBalancedUnaryUnaryOverlapCandidates

//设置package包名称以及导入依赖的类
package de.hpi.isg.sodap.rdfind.operators

import de.hpi.isg.sodap.rdfind.data._
import org.apache.flink.util.Collector

import scala.collection.mutable.ArrayBuffer


/**
 * Emits an [[OverlapSet]] per call that stores the right-hand-side conditions explicitly.
 *
 * @param isUseAssociationRules passed through to the superclass
 */
class CreateBalancedUnaryUnaryOverlapCandidates(isUseAssociationRules: Boolean)
  extends AbstractExtractBalancedUnaryUnaryOverlapCandidates[OverlapSet](isUseAssociationRules) {

  // Single output instance that is overwritten and emitted on every call.
  private lazy val output = OverlapSet(0, null, null, 0, null)

  // We do not need this function as we override its more specific sibling.
  // NOTE(review): the comment above does not obviously describe the method below,
  // which is itself an override — confirm against the superclass.
  /**
   * Fills the shared output instance for one left-hand-side condition and emits it.
   *
   * @param lhsConditionCount  the dependent condition
   * @param rhsConditionCounts the conditions overlapping with it, copied into an array
   * @param out                collector receiving the overlap set
   */
  override def output(lhsConditionCount: ConditionCount, rhsConditionCounts: ArrayBuffer[ConditionCount],
                      out: Collector[OverlapSet]): Unit = {
    this.output.update(depConditionCount = lhsConditionCount)
    this.output.rhsConditions = rhsConditionCounts.toArray
    out.collect(this.output)
  }
}
开发者ID:stratosphere,项目名称:rdfind,代码行数:22,代码来源:CreateBalancedUnaryUnaryOverlapCandidates.scala



注:本文中的org.apache.flink.util.Collector类示例整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。


鲜花

握手

雷人

路过

鸡蛋
该文章已有0人参与评论

请发表评论

全部评论

专题导读
上一篇:
Scala ModelProxy类代码示例发布时间:2022-05-23
下一篇:
Scala ActorSubscriber类代码示例发布时间:2022-05-23
热门推荐
热门话题
阅读排行榜

扫描微信二维码

查看手机版网站

随时了解更新最新资讯

139-2527-9053

在线客服(服务时间 9:00~18:00)

在线QQ客服
地址:深圳市南山区西丽大学城创智工业园
电邮:jeky_zhao#qq.com
移动电话:139-2527-9053

Powered by 互联科技 X3.4© 2001-2213 极客世界.|Sitemap