This article collects typical usage examples of the org.apache.flink.configuration.Configuration class in Scala. If you are wondering what the Configuration class is for, or how to use it in practice, the curated examples below should help.
The following presents 16 code examples of the Configuration class, sorted by popularity by default.
Example 1: BroadcastVariables

// set the package name and import the required classes
package com.zjlp.face.flink.examples.batch

import org.apache.flink.api.common.functions.RichMapFunction
import org.apache.flink.api.scala.ExecutionEnvironment
import org.apache.flink.api.scala._
import org.apache.flink.configuration.Configuration

import scala.collection.JavaConversions._

object BroadcastVariables {
  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment

    // 1. create the DataSet to be broadcast
    val toBroadcast = env.fromElements(1, 2, 3)
    val data = env.fromElements("a", "b")

    data.map(new RichMapFunction[String, String] {
      var broadcastSet: Traversable[String] = null

      override def map(in: String): String = {
        in + broadcastSet
      }

      override def open(parameters: Configuration): Unit = {
        // access the broadcast variable as a Collection
        broadcastSet = getRuntimeContext.getBroadcastVariable[String]("myBroadcast")
      }
    })
      .withBroadcastSet(toBroadcast, "myBroadcast") // register the broadcast DataSet under a name
      .print()
  }
}

Author: luciuschina, Project: flink-examples, Lines: 32, Source: BroadcastVariables.scala
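Note: getBroadcastVariable returns a java.util.List[String], which the imported JavaConversions turns into a Scala collection implicitly. A minimal sketch of the same access using the explicit (and nowadays preferred) JavaConverters instead; names and types mirror the example above:

import scala.collection.JavaConverters._
import scala.collection.mutable

import org.apache.flink.api.common.functions.RichMapFunction
import org.apache.flink.configuration.Configuration

// Sketch: same broadcast access as above, with explicit conversion.
val mapper = new RichMapFunction[String, String] {
  var broadcastSet: mutable.Buffer[String] = _

  override def open(parameters: Configuration): Unit = {
    // .asScala converts the java.util.List returned by getBroadcastVariable
    broadcastSet = getRuntimeContext.getBroadcastVariable[String]("myBroadcast").asScala
  }

  override def map(in: String): String = in + broadcastSet.mkString("[", ",", "]")
}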
Example 2: PassParamToFun

// set the package name and import the required classes
package com.zjlp.face.flink.examples.batch

import org.apache.flink.api.common.functions.{FilterFunction, RichFilterFunction}
import org.apache.flink.api.scala._
import org.apache.flink.configuration.Configuration

object PassParamToFun {
  def main(args: Array[String]) {
    val env = ExecutionEnvironment.getExecutionEnvironment
    val toFilter = env.fromElements(1, 2, 3)

    // Approach 1: pass the parameter through the function's constructor
    toFilter.filter(new MyFilter(2)).print

    // Approach 2: put the parameters into a Configuration object ...
    val c = new Configuration()
    c.setInteger("limit", 2)
    // ... and hand it to the rich function via withParameters(Configuration)
    toFilter.filter(new RichFilterFunction[Int] {
      var limit = 0

      override def filter(t: Int): Boolean = {
        t > limit
      }

      override def open(config: Configuration): Unit = {
        limit = config.getInteger("limit", 0)
      }
    }).withParameters(c).print

    // Approach 3: register the parameters globally on the ExecutionConfig
    env.getConfig.setGlobalJobParameters(c)
    toFilter.filter(new RichFilterFunction[Int] {
      var limit = 0

      override def filter(t: Int): Boolean = {
        t > limit
      }

      override def open(config: Configuration): Unit = {
        limit = getRuntimeContext.getExecutionConfig.getGlobalJobParameters.asInstanceOf[Configuration].getInteger("limit", 0)
      }
    }).print
  }
}

class MyFilter(limit: Int) extends FilterFunction[Int] {
  override def filter(t: Int): Boolean = {
    t > limit
  }
}

Author: luciuschina, Project: flink-examples, Lines: 53, Source: PassParamToFun.scala
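Note: Flink also ships a small utility for this, org.apache.flink.api.java.utils.ParameterTool, which can itself be registered as global job parameters. A minimal sketch; the key name "limit" is just an illustration:

import org.apache.flink.api.java.utils.ParameterTool
import org.apache.flink.api.scala._

object PassParamWithParameterTool {
  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment
    // e.g. started with: --limit 2
    val params = ParameterTool.fromArgs(args)
    // ParameterTool extends ExecutionConfig.GlobalJobParameters
    env.getConfig.setGlobalJobParameters(params)

    env.fromElements(1, 2, 3)
      .filter(_ > params.getInt("limit", 0))
      .print()
  }
}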
Example 3: ConsoleReporterTestJob

// set the package name and import the required classes
package com.jgrier.flinkstuff.jobs

import com.jgrier.flinkstuff.sources.IntegerSource
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.environment.LocalStreamEnvironment
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.api.scala._

object ConsoleReporterTestJob {
  def main(args: Array[String]) {
    // register a custom metrics reporter through the cluster configuration
    val config = new Configuration()
    config.setString("metrics.reporters", "consoleReporter")
    config.setString("metrics.reporter.consoleReporter.class", "com.jgrier.flinkstuff.metrics.ConsoleReporter")
    config.setString("metrics.reporter.consoleReporter.interval", "10 SECONDS")

    val env = new StreamExecutionEnvironment(new LocalStreamEnvironment(config))
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

    val stream = env.addSource(new IntegerSource(100))

    stream
      .timeWindowAll(Time.seconds(1))
      .sum(0)
      .print

    env.execute("ConsoleReporterTestJob")
  }
}

Author: jgrier, Project: flink-stuff, Lines: 31, Source: ConsoleReporterTestJob.scala
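Note: the ConsoleReporter referenced above lives in the same project; a reporter of this kind is a class implementing Flink's MetricReporter SPI. The skeleton below is an assumption about what it might look like, not the project's actual code:

import org.apache.flink.metrics.{Metric, MetricConfig, MetricGroup}
import org.apache.flink.metrics.reporter.{MetricReporter, Scheduled}

import scala.collection.concurrent.TrieMap

// Hypothetical skeleton; the real com.jgrier.flinkstuff.metrics.ConsoleReporter may differ.
class ConsoleReporterSketch extends MetricReporter with Scheduled {
  private val metrics = TrieMap.empty[String, Metric]

  override def open(config: MetricConfig): Unit = ()
  override def close(): Unit = ()

  override def notifyOfAddedMetric(metric: Metric, name: String, group: MetricGroup): Unit =
    metrics.put(group.getMetricIdentifier(name), metric)

  override def notifyOfRemovedMetric(metric: Metric, name: String, group: MetricGroup): Unit =
    metrics.remove(group.getMetricIdentifier(name))

  // called at the interval set via metrics.reporter.<name>.interval;
  // a real reporter would render counters/gauges properly instead of toString
  override def report(): Unit =
    metrics.foreach { case (name, metric) => println(s"$name -> $metric") }
}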
Example 4: BroadcastJob

// set the package name and import the required classes
package de.tu_berlin.dima.experiments.flink.broadcast

import org.apache.flink.api.common.functions.RichMapFunction
import org.apache.flink.api.scala._
import org.apache.flink.configuration.Configuration
import org.apache.flink.core.fs.FileSystem.WriteMode

import scala.collection.convert.wrapAsScala._

object BroadcastJob {
  val BYTES_PER_LONG = 8
  val BYTES_PER_MB = 1024 * 1024

  def main(args: Array[String]) {
    if (args.length != 3) {
      System.err.println("Not enough parameters")
      System.err.println("Usage: <jar> [number of map tasks] [size of broadcast in MB] [output path]")
      System.exit(-1)
    }

    val numMapTasks = args(0).toInt
    val vectorSizeInMB = args(1).toLong
    val outputPath = args(2)
    val numVectorElements = vectorSizeInMB * BYTES_PER_MB / BYTES_PER_LONG

    val env = ExecutionEnvironment.getExecutionEnvironment

    // generate a number sequence to map over, one number per task/dop
    val matrix: DataSet[Long] = env
      .fromParallelCollection(new NumberSequenceIteratorWrapper(1, numMapTasks))
      .setParallelism(numMapTasks)
      .name(s"Generate mapper dataset [1..$numMapTasks]")

    // generate the broadcast DataSet
    val vector: DataSet[Long] = env
      .fromParallelCollection(new NumberSequenceIteratorWrapper(1, numVectorElements))
      .setParallelism(1)
      .name(s"Generate broadcast vector (${vectorSizeInMB}mb)")

    val result: DataSet[Long] = matrix.map(new RichMapFunction[Long, Long] {
      var bcastVector: scala.collection.mutable.Buffer[Long] = null

      override def open(conf: Configuration) {
        bcastVector = getRuntimeContext.getBroadcastVariable[Long]("bVector")
      }

      override def map(value: Long): Long = {
        Math.max(bcastVector.last, value)
      }
    }).withBroadcastSet(vector, "bVector")

    result.writeAsText(outputPath, WriteMode.OVERWRITE)
    env.execute(s"Broadcast Job - dop: $numMapTasks, broadcast vector (mb): $vectorSizeInMB")
  }
}

Author: TU-Berlin-DIMA, Project: flink-broadcast, Lines: 61, Source: BroadcastJob.scala
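Note: NumberSequenceIteratorWrapper is a helper from this project; Flink's own org.apache.flink.util.NumberSequenceIterator is a splittable iterator serving the same purpose. A minimal sketch, assuming Scala's implicit unboxing of the boxed Long values it yields:

import org.apache.flink.api.scala._
import org.apache.flink.util.NumberSequenceIterator

object ParallelSequence {
  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment
    // distributes the range [1, 1000] across the parallel source instances
    val numbers = env.fromParallelCollection(new NumberSequenceIterator(1L, 1000L))
    numbers.map(_ * 2).print()
  }
}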
Example 5: RangePSLogicWithClose

// set the package name and import the required classes
package hu.sztaki.ilab.ps.server

import hu.sztaki.ilab.ps.{ParameterServer, ParameterServerLogic}
import org.apache.flink.api.common.functions.RuntimeContext
import org.apache.flink.configuration.Configuration

class RangePSLogicWithClose[P](featureCount: Int, paramInit: => Int => P, paramUpdate: => (P, P) => P)
  extends ParameterServerLogic[P, (Int, P)] {

  var startIndex = 0
  var params: Array[Option[P]] = _

  @transient lazy val init: (Int) => P = paramInit
  @transient lazy val update: (P, P) => P = paramUpdate

  override def onPullRecv(id: Int, workerPartitionIndex: Int, ps: ParameterServer[P, (Int, P)]): Unit = {
    if (id - startIndex < 0) {
      println(id)
      println(params.mkString("[", ",", "]"))
    }
    ps.answerPull(id, params(id - startIndex) match {
      case Some(e) => e
      case None =>
        val ini = init(id)
        params(id - startIndex) = Some(ini)
        ini
    }, workerPartitionIndex)
  }

  override def onPushRecv(id: Int, deltaUpdate: P, ps: ParameterServer[P, (Int, P)]): Unit = {
    val index = id - startIndex
    val c = params(index) match {
      case Some(q) =>
        update(q, deltaUpdate)
      case None =>
        deltaUpdate
    }
    params(index) = Some(c)
  }

  override def open(parameters: Configuration, runtimeContext: RuntimeContext): Unit = {
    super.open(parameters, runtimeContext)
    // each of the n subtasks holds a contiguous range of the feature space:
    // the first n-1 subtasks hold ceil(featureCount / n) parameters each,
    // and the last subtask holds the remainder
    val div = Math.ceil(featureCount.toDouble / runtimeContext.getNumberOfParallelSubtasks).toInt
    val mod = featureCount - (runtimeContext.getNumberOfParallelSubtasks - 1) * div
    params = Array.fill[Option[P]](
      if (mod != 0 && runtimeContext.getIndexOfThisSubtask + 1 == runtimeContext.getNumberOfParallelSubtasks) {
        mod
      } else {
        div
      })(None)
    startIndex = runtimeContext.getIndexOfThisSubtask * div
  }
}

Author: gaborhermann, Project: flink-parameter-server, Lines: 53, Source: RangePSLogicWithClose.scala
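Note: a quick check of the range arithmetic in open, with assumed numbers purely for illustration:

// Illustration of the partition sizing in RangePSLogicWithClose.open
val featureCount = 10
val parallelism = 3
val div = math.ceil(featureCount.toDouble / parallelism).toInt // 4
val mod = featureCount - (parallelism - 1) * div               // 2
// subtask 0 -> 4 params, startIndex 0
// subtask 1 -> 4 params, startIndex 4
// subtask 2 -> 2 params, startIndex 8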
Example 6: StoppableExecutionEnvironment

// set the package name and import the required classes
package pl.touk.nussknacker.engine.flink.test

import com.typesafe.scalalogging.LazyLogging
import org.apache.flink.api.common.{JobExecutionResult, JobSubmissionResult}
import org.apache.flink.configuration.{ConfigConstants, Configuration, QueryableStateOptions}
import org.apache.flink.runtime.jobgraph.JobGraph
import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster
import org.apache.flink.runtime.query.QueryableStateClient
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
import org.apache.flink.streaming.api.graph.StreamGraph
import org.apache.flink.test.util.TestBaseUtils

object StoppableExecutionEnvironment {
  def withQueryableStateEnabled(userFlinkClusterConfiguration: Configuration = new Configuration()): StoppableExecutionEnvironment = {
    // this is needed to make queryable state work
    // (see LocalFlinkMiniCluster lines 221 and 237)
    userFlinkClusterConfiguration.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 2)
    userFlinkClusterConfiguration.setBoolean(QueryableStateOptions.SERVER_ENABLE, true)
    new StoppableExecutionEnvironment(userFlinkClusterConfiguration, false)
  }
}

class StoppableExecutionEnvironment(userFlinkClusterConfig: Configuration,
                                    singleActorSystem: Boolean = true) extends StreamExecutionEnvironment with LazyLogging {

  protected var localFlinkMiniCluster: LocalFlinkMiniCluster = _

  def getJobManagerActorSystem() = {
    localFlinkMiniCluster.jobManagerActorSystems.get.head
  }

  def queryableClient(): QueryableStateClient = {
    new QueryableStateClient(userFlinkClusterConfig, localFlinkMiniCluster.highAvailabilityServices)
  }

  def execute(jobName: String): JobExecutionResult = {
    // transform the streaming program into a JobGraph
    val streamGraph: StreamGraph = getStreamGraph
    streamGraph.setJobName(jobName)
    val jobGraph: JobGraph = streamGraph.getJobGraph
    logger.info("Running job on local embedded Flink mini cluster")

    jobGraph.getJobConfiguration.addAll(userFlinkClusterConfig)
    localFlinkMiniCluster = TestBaseUtils.startCluster(jobGraph.getJobConfiguration, singleActorSystem)

    val submissionRes: JobSubmissionResult = localFlinkMiniCluster.submitJobDetached(jobGraph)
    new JobExecutionResult(submissionRes.getJobID, 0, new java.util.HashMap[String, AnyRef]())
  }

  def stop(): Unit = {
    if (localFlinkMiniCluster != null) {
      localFlinkMiniCluster.stop()
    }
  }
}

Author: TouK, Project: nussknacker, Lines: 60, Source: StoppableExecutionEnvironment.scala
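Note: a sketch of how a test might drive this environment; the method names come from the class above, while the job body and job name are arbitrary placeholders:

import org.apache.flink.streaming.api.scala._

// assumes the StoppableExecutionEnvironment defined above is on the classpath
val stoppableEnv = StoppableExecutionEnvironment.withQueryableStateEnabled()
val env = new StreamExecutionEnvironment(stoppableEnv) // Scala API wrapper
try {
  env.fromElements(1, 2, 3).map(_ + 1).print()
  stoppableEnv.execute("sample-test-job") // submits detached to the embedded mini cluster
} finally {
  stoppableEnv.stop() // shut the mini cluster down afterwards
}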
Example 7: lazyHandler

// set the package name and import the required classes
package pl.touk.nussknacker.engine.flink.api.state

import org.apache.flink.api.common.functions.RichFunction
import org.apache.flink.configuration.Configuration
import pl.touk.nussknacker.engine.flink.api.exception.FlinkEspExceptionHandler

trait WithExceptionHandler extends RichFunction {

  @transient lazy val exceptionHandler = lazyHandler()

  def lazyHandler: () => FlinkEspExceptionHandler

  override def close() = {
    exceptionHandler.close()
  }

  override def open(parameters: Configuration) = {
    exceptionHandler.open(getRuntimeContext)
  }
}

Author: TouK, Project: nussknacker, Lines: 20, Source: WithExceptionHandler.scala
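Note: a sketch of how the trait might be mixed into a rich function. The factory parameter is hypothetical; it stands in for however the project actually constructs its FlinkEspExceptionHandler:

import org.apache.flink.api.common.functions.RichMapFunction
import pl.touk.nussknacker.engine.flink.api.exception.FlinkEspExceptionHandler
import pl.touk.nussknacker.engine.flink.api.state.WithExceptionHandler

// myHandlerFactory is a placeholder for the real handler construction
class SafeMapper(myHandlerFactory: () => FlinkEspExceptionHandler)
  extends RichMapFunction[String, String] with WithExceptionHandler {

  // the trait opens/closes exceptionHandler alongside the function lifecycle
  override def lazyHandler: () => FlinkEspExceptionHandler = myHandlerFactory

  override def map(in: String): String = in.trim
}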
Example 8: beforeAll

// set the package name and import the required classes
package pl.touk.nussknacker.engine.example

import com.typesafe.config.ConfigFactory
import com.typesafe.config.ConfigValueFactory.fromAnyRef
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.scalatest.{BeforeAndAfterAll, Matchers, Suite}
import pl.touk.nussknacker.engine.flink.test.StoppableExecutionEnvironment
import pl.touk.nussknacker.engine.kafka.{KafkaSpec, KafkaZookeeperServer}
import pl.touk.nussknacker.engine.process.FlinkProcessRegistrar
import pl.touk.nussknacker.engine.process.compiler.StandardFlinkProcessCompiler

trait BaseITest extends KafkaSpec {
  self: Suite with BeforeAndAfterAll with Matchers =>

  val creator = new ExampleProcessConfigCreator
  val flinkConf = new Configuration()
  val stoppableEnv = new StoppableExecutionEnvironment(flinkConf)
  val env = new StreamExecutionEnvironment(stoppableEnv)
  var registrar: FlinkProcessRegistrar = _

  override protected def beforeAll(): Unit = {
    super.beforeAll()
    val config = TestConfig(kafkaZookeeperServer)
    env.getConfig.disableSysoutLogging()
    registrar = new StandardFlinkProcessCompiler(creator, config).createFlinkProcessRegistrar()
  }

  override protected def afterAll(): Unit = {
    super.afterAll()
    stoppableEnv.stop()
  }
}

object TestConfig {
  def apply(kafkaZookeeperServer: KafkaZookeeperServer) = {
    ConfigFactory.empty()
      .withValue("kafka.kafkaAddress", fromAnyRef(kafkaZookeeperServer.kafkaAddress))
      .withValue("kafka.zkAddress", fromAnyRef(kafkaZookeeperServer.zkAddress))
      .withValue("checkpointInterval", fromAnyRef("10s"))
      .withValue("timeout", fromAnyRef("10s"))
  }
}

Author: TouK, Project: nussknacker, Lines: 44, Source: BaseItTest.scala
Example 9: Palindrome

// set the package name and import the required classes
package de.tu_berlin.dima.bdapro.flink.palindrome.mschwarzer

import org.apache.flink.api.common.functions.RichFilterFunction
import org.apache.flink.api.scala._
import org.apache.flink.configuration.Configuration
import org.apache.flink.util.Collector

object Palindrome {
  def main(args: Array[String]) {
    if (args.length != 1) {
      Console.err.println("Usage: <jar> inputPath")
      System.exit(-1)
    }

    // input will contain only lower-case English letters, spaces, newlines and numeric chars
    val inputPath = args(0)
    val env = ExecutionEnvironment.getExecutionEnvironment

    // read sentences from the text file
    val pals = env.readTextFile(inputPath)
      .flatMap {
        (in, out: Collector[(String, Int)]) =>
          val p = in.replaceAll("[^A-Za-z0-9]", "")
          if (p.equals(p.reverse)) { // check for a valid palindrome
            out.collect(in, p.length())
          }
      }

    // save the max length in the config
    val config = new Configuration()
    config.setInteger("maxLength", pals.max(1).collect().last._2)

    // filter by max length
    val maxPals = pals
      .filter(new RichFilterFunction[(String, Int)]() {
        var maxLength = 0

        override def open(config: Configuration): Unit = {
          maxLength = config.getInteger("maxLength", 0)
        }

        def filter(in: (String, Int)): Boolean = {
          in._2 == maxLength
        }
      })
      .withParameters(config)
      .collect()

    // print all left-over sentences
    maxPals.foreach { case (sentence, _) =>
      println("The biggest palindrome sentence: <" + sentence + ">")
    }
  }
}

Author: cristiprg, Project: BDAPRO.GlobalStateML, Lines: 59, Source: Palindrome.scala
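Note: one thing worth keeping in mind about this approach is that pals.max(1).collect() triggers a full job execution just to obtain the maximum, and the later collect() re-executes the pipeline from the source. Example 10 below avoids the extra round trip by shipping the maximum to the filter as a broadcast set instead.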
Example 10: PalindromeWithBroadcastSet

// set the package name and import the required classes
package de.tu_berlin.dima.bdapro.flink.palindrome.ggevay

import org.apache.flink.api.common.functions.RichFilterFunction
import org.apache.flink.api.scala._
import org.apache.flink.configuration.Configuration

object PalindromeWithBroadcastSet {
  def main(args: Array[String]): Unit = {
    val inFile = args(0)
    val env = ExecutionEnvironment.getExecutionEnvironment

    val sent = env.readTextFile(inFile)
      .map(o => (o, o.replaceAll(" ", "")))
      .filter(t => t._2 == t._2.reverse)
      .map(t => (t._1, t._2.length))

    val maxLen = sent.max(1)

    // see https://cwiki.apache.org/confluence/display/FLINK/Variables+Closures+vs.+Broadcast+Variables
    val allMax = sent
      .filter(new RichFilterFunction[(String, Int)] {
        var maxLenLocal: Int = _

        override def open(parameters: Configuration): Unit = {
          maxLenLocal = getRuntimeContext.getBroadcastVariable[(String, Int)]("maxLength").get(0)._2
        }

        override def filter(t: (String, Int)): Boolean = t._2 == maxLenLocal
      }).withBroadcastSet(maxLen, "maxLength")

    allMax.map(s => s"The biggest palindrome sentence: <${s._1}>").print()
  }
}

Author: cristiprg, Project: BDAPRO.GlobalStateML, Lines: 32, Source: PalindromeWithBroadcastSet.scala
Example 11: DuplicateFilter

// set the package name and import the required classes
package uk.co.pollett.flink.newsreader

import org.apache.flink.api.common.functions.RichFlatMapFunction
import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
import org.apache.flink.configuration.Configuration
import org.apache.flink.util.Collector

object DuplicateFilter {
  val descriptor: ValueStateDescriptor[Boolean] = new ValueStateDescriptor[Boolean]("seen", classOf[Boolean])
}

class DuplicateFilter[T] extends RichFlatMapFunction[T, T] {
  var operatorState: ValueState[Boolean] = _

  override def open(parameters: Configuration): Unit = {
    operatorState = this.getRuntimeContext.getState(DuplicateFilter.descriptor)
  }

  override def flatMap(value: T, out: Collector[T]): Unit = {
    if (!operatorState.value()) {
      // first element for this key: emit it and remember that we saw it
      out.collect(value)
      operatorState.update(true)
    }
  }
}

Author: pollett, Project: flink-newsreader, Lines: 26, Source: DuplicateFilter.scala
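Note: getState only works on a keyed stream, so this filter deduplicates per key. A usage sketch; the input elements and the keyBy choice are assumptions about the surrounding job:

import org.apache.flink.streaming.api.scala._

val env = StreamExecutionEnvironment.getExecutionEnvironment
// hypothetical input; in the real project this would be a stream of news items
val urls: DataStream[String] = env.fromElements("a", "b", "a", "c", "b")

urls
  .keyBy(x => x)                          // state is scoped per key, here the value itself
  .flatMap(new DuplicateFilter[String])   // emits each distinct value once
  .print()

env.execute("dedup")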
Example 12: FilterAssociationRuleImpliedCinds

// set the package name and import the required classes
package de.hpi.isg.sodap.rdfind.operators

import java.lang

import de.hpi.isg.sodap.rdfind.data.{AssociationRule, Cind}
import de.hpi.isg.sodap.rdfind.util.ConditionCodes
import org.apache.flink.api.common.functions.{BroadcastVariableInitializer, RichFilterFunction}
import org.apache.flink.configuration.Configuration

import scala.collection.JavaConversions._

class FilterAssociationRuleImpliedCinds extends RichFilterFunction[Cind] {

  private var impliedCinds: Set[Cind] = _

  private lazy val testCind = Cind(0, null, "", 0, null, "", -1)

  override def open(parameters: Configuration): Unit = {
    super.open(parameters)
    this.impliedCinds = getRuntimeContext.getBroadcastVariableWithInitializer(
      FilterAssociationRuleImpliedCinds.ASSOCIATION_RULE_BROADCAST,
      FilterAssociationRuleImpliedCinds.AssocationRuleBroadcastInitializer)
  }

  override def filter(cind: Cind): Boolean = {
    // copy all relevant values (all others are set already)
    this.testCind.depCaptureType = cind.depCaptureType
    this.testCind.depConditionValue1 = cind.depConditionValue1
    this.testCind.refCaptureType = cind.refCaptureType
    this.testCind.refConditionValue1 = cind.refConditionValue1
    !this.impliedCinds(this.testCind)
  }
}

object FilterAssociationRuleImpliedCinds {

  val ASSOCIATION_RULE_BROADCAST = "association-rules"

  object AssocationRuleBroadcastInitializer extends BroadcastVariableInitializer[AssociationRule, Set[Cind]] {
    override def initializeBroadcastVariable(associationRules: lang.Iterable[AssociationRule]): Set[Cind] = {
      associationRules.map { associationRule =>
        val conditionCode = associationRule.antecedentType | associationRule.consequentType
        val captureCode = ConditionCodes.addSecondaryConditions(conditionCode)
        val projectionCode = captureCode & ~conditionCode
        val cind = Cind(associationRule.antecedentType | projectionCode, associationRule.antecedent, "",
          associationRule.consequentType | projectionCode, associationRule.consequent, "")
        cind
      }.toSet
    }
  }
}

Author: stratosphere, Project: rdfind, Lines: 57, Source: FilterAssociationRuleImpliedCinds.scala
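Note: getBroadcastVariableWithInitializer lets the broadcast list be transformed into a shared data structure once per TaskManager, instead of once per task. A generic minimal sketch of the same pattern, with an assumed broadcast name "names":

import java.lang.{Iterable => JIterable}

import org.apache.flink.api.common.functions.BroadcastVariableInitializer

import scala.collection.JavaConversions._

// turns the broadcast list of strings into a Set, built once per TaskManager
object StringSetInitializer extends BroadcastVariableInitializer[String, Set[String]] {
  override def initializeBroadcastVariable(data: JIterable[String]): Set[String] =
    data.toSet
}

// inside a rich function's open():
//   val lookup = getRuntimeContext
//     .getBroadcastVariableWithInitializer("names", StringSetInitializer)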
Example 13: CreateUnaryConditionEvidences

// set the package name and import the required classes
package de.hpi.isg.sodap.rdfind.operators.frequent_conditions

import de.hpi.isg.sodap.flink.util.GlobalIdGenerator
import de.hpi.isg.sodap.rdfind.data.{RDFTriple, UnaryConditionEvidence}
import de.hpi.isg.sodap.rdfind.util.ConditionCodes._
import org.apache.flink.api.common.functions.RichFlatMapFunction
import org.apache.flink.configuration.Configuration
import org.apache.flink.util.Collector

class CreateUnaryConditionEvidences extends RichFlatMapFunction[RDFTriple, UnaryConditionEvidence] {

  val idGenerator: GlobalIdGenerator = new GlobalIdGenerator(0)

  // reused output object to avoid per-record allocations
  lazy val output = UnaryConditionEvidence(0, null, 1, new Array[Long](1))

  override def open(parameters: Configuration): Unit = {
    super.open(parameters)
    this.idGenerator.initialize(getRuntimeContext)
  }

  override def flatMap(triple: RDFTriple, out: Collector[UnaryConditionEvidence]): Unit = {
    val tripleId = this.idGenerator.yieldLong()
    this.output.tripleIds(0) = tripleId

    this.output.conditionType = subjectCondition
    this.output.value = triple.subj
    out.collect(output)

    output.conditionType = predicateCondition
    output.value = triple.pred
    out.collect(output)

    output.conditionType = objectCondition
    output.value = triple.obj
    out.collect(output)
  }
}

Author: stratosphere, Project: rdfind, Lines: 39, Source: CreateUnaryConditionEvidences.scala
Example 14: PrintInfrequentCaptureSavings

// set the package name and import the required classes
package de.hpi.isg.sodap.rdfind.operators

import de.hpi.isg.sodap.rdfind.data.{Condition, JoinLine}
import de.hpi.isg.sodap.rdfind.util.ConditionCodes._
import org.apache.flink.api.common.functions.RichFilterFunction
import org.apache.flink.configuration.Configuration

import scala.collection.JavaConversions._
import scala.collection.mutable

@deprecated(message = "This is only a debug function and should not be used in production.")
class PrintInfrequentCaptureSavings extends RichFilterFunction[JoinLine] {

  var infrequentConditions: Set[Condition] = _

  override def open(parameters: Configuration): Unit = {
    super.open(parameters)
    val broadcast = getRuntimeContext.getBroadcastVariable[Condition]("infrequent-conditions")
    broadcast.foreach(_.decoalesce())
    this.infrequentConditions = broadcast.toSet
  }

  override def filter(joinLine: JoinLine): Boolean = {
    // materialize the conditions
    val conditions = new mutable.HashSet[Condition]()
    joinLine.conditions.foreach { condition =>
      conditions += condition
      if (!condition.isUnary) {
        val conditionCodes = decodeConditionCode(condition.conditionType, isRequireDoubleCode = true)
        val newConditionCode1 = createConditionCode(conditionCodes._1, secondaryCondition = conditionCodes._3)
        conditions += Condition(condition.conditionValue1, "", newConditionCode1)
        val newConditionCode2 = createConditionCode(conditionCodes._2, secondaryCondition = conditionCodes._3)
        conditions += Condition(condition.conditionValue2, "", newConditionCode2)
      }
    }

    // count the number of conditions that could be removed
    val overallConditions = conditions.size
    val unnecessaryConditions = conditions.intersect(this.infrequentConditions).size
    if (unnecessaryConditions > 0) {
      println(s"Can reduce conditions in join line from $overallConditions by $unnecessaryConditions")
      if (unnecessaryConditions > 5) {
        println(s"Namely remove ${conditions.intersect(this.infrequentConditions)} by $conditions")
      }
    }
    true
  }
}

Author: stratosphere, Project: rdfind, Lines: 53, Source: PrintInfrequentCaptureSavings.scala
Example 15: ShortenUrls

// set the package name and import the required classes
package de.hpi.isg.sodap.rdfind.operators

import java.lang.Iterable

import de.hpi.isg.sodap.rdfind.data.{RDFPrefix, RDFTriple}
import de.hpi.isg.sodap.rdfind.util.StringTrie
import org.apache.flink.api.common.functions.{BroadcastVariableInitializer, RichMapFunction}
import org.apache.flink.configuration.Configuration

import scala.collection.JavaConversions._

class ShortenUrls extends RichMapFunction[RDFTriple, RDFTriple] {

  var prefixTrie: StringTrie[String] = _

  override def open(parameters: Configuration): Unit = {
    super.open(parameters)
    this.prefixTrie = getRuntimeContext.getBroadcastVariableWithInitializer(
      ShortenUrls.PREFIX_BROADCAST,
      ShortenUrls.PrefixTrieCreator)
  }

  override def map(triple: RDFTriple): RDFTriple = {
    triple.subj = shorten(triple.subj)
    triple.pred = shorten(triple.pred)
    triple.obj = shorten(triple.obj)
    triple
  }

  @inline
  private def shorten(url: String): String = {
    if (url.endsWith(">")) {
      val keyValuePair = this.prefixTrie.getKeyAndValue(url)
      if (keyValuePair != null) {
        return keyValuePair._2 + url.substring(keyValuePair._1.length, url.length - 1)
      }
    }
    url
  }
}

object ShortenUrls {

  val PREFIX_BROADCAST = "prefixes"

  object PrefixTrieCreator extends BroadcastVariableInitializer[RDFPrefix, StringTrie[String]] {
    override def initializeBroadcastVariable(data: Iterable[RDFPrefix]): StringTrie[String] = {
      val trie = new StringTrie[String]
      data.foreach(prefix => trie += s"<${prefix.url}" -> s"${prefix.prefix}:")
      trie.squash()
      trie
    }
  }
}

Author: stratosphere, Project: rdfind, Lines: 59, Source: ShortenUrls.scala
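Note: to illustrate shorten with an assumed prefix entry (url = http://xmlns.com/foaf/0.1/, prefix = foaf): the trie maps "<http://xmlns.com/foaf/0.1/" to "foaf:", so the triple component "<http://xmlns.com/foaf/0.1/name>" becomes "foaf:name", with the angle brackets stripped along with the matched prefix.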
Example 16: CreateHashes

// set the package name and import the required classes
package de.hpi.isg.sodap.rdfind.operators

import com.google.common.hash.BloomFilter
import de.hpi.isg.sodap.rdfind.data.RDFTriple
import de.hpi.isg.sodap.rdfind.operators.CreateHashes._
import de.hpi.isg.sodap.rdfind.util.ConditionCodes._
import de.hpi.isg.sodap.rdfind.util.HashFunction
import org.apache.flink.api.common.functions.RichFlatMapFunction
import org.apache.flink.configuration.Configuration
import org.apache.flink.util.Collector

import scala.collection.JavaConversions._

class CreateHashes(hashAlgorithm: String, hashBytes: Int = -1)
  extends RichFlatMapFunction[RDFTriple, (String, Array[String])] {

  var unaryFcBloomFilter: Map[Int, BloomFilter[String]] = _

  lazy val hashFunction = new HashFunction(hashAlgorithm, hashBytes, isUnsetFirstMsb = false)

  // reused single-element array to avoid per-record allocations
  lazy val singletonArray = Array("")

  override def open(parameters: Configuration): Unit = {
    super.open(parameters)
    val broadcastVariable = getRuntimeContext.getBroadcastVariable[(Int, BloomFilter[String])](
      UNARY_FC_BLOOM_FILTERS_BROADCAST)
    unaryFcBloomFilter = broadcastVariable.toMap
  }

  override def flatMap(triple: RDFTriple, out: Collector[(String, Array[String])]): Unit = {
    // only hash values that the per-condition Bloom filters consider frequent
    val s = if (unaryFcBloomFilter(subjectCondition).mightContain(triple.subj)) triple.subj else null
    val p = if (unaryFcBloomFilter(predicateCondition).mightContain(triple.pred)) triple.pred else null
    val o = if (unaryFcBloomFilter(objectCondition).mightContain(triple.obj)) triple.obj else null

    if (s != null) {
      this.singletonArray(0) = s
      out.collect((this.hashFunction.hashStringToString(s), this.singletonArray))
    }
    if (p != null) {
      this.singletonArray(0) = p
      out.collect((this.hashFunction.hashStringToString(p), this.singletonArray))
    }
    if (o != null) {
      this.singletonArray(0) = o
      out.collect((this.hashFunction.hashStringToString(o), this.singletonArray))
    }
  }

  def replaceNullsWithEmptyString(str: String): String = if (str == null) "" else str
}

object CreateHashes {
  val UNARY_FC_BLOOM_FILTERS_BROADCAST = CreateJoinPartners.UNARY_FC_BLOOM_FILTERS_BROADCAST
}

Author: stratosphere, Project: rdfind, Lines: 59, Source: CreateHashes.scala
Note: the org.apache.flink.configuration.Configuration examples in this article were collected from GitHub, MSDocs, and other source-code and documentation platforms. The snippets are drawn from open-source projects contributed by their respective authors; copyright remains with the original authors, and distribution and use are subject to each project's license. Do not republish without permission.