本文整理汇总了Scala中org.apache.spark.sql.execution.streaming.Sink类的典型用法代码示例。如果您正苦于以下问题:Scala Sink类的具体用法?Scala Sink怎么用?Scala Sink使用的例子?那么恭喜您, 这里精选的类代码示例或许可以为您提供帮助。
在下文中一共展示了Sink类的3个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Scala代码示例。
示例1: ClickHouseSink
//设置package包名称以及导入依赖的类
package io.clickhouse.ext.spark.streaming
import io.clickhouse.ext.ClickHouseUtils
import io.clickhouse.ext.tools.Utils
import org.apache.spark.internal.Logging
import org.apache.spark.sql.{DataFrame, Encoders}
import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
import org.apache.spark.sql.catalyst.expressions.AttributeReference
import org.apache.spark.sql.execution.streaming.Sink
import scala.reflect.{ClassTag, classTag}
import scala.reflect.runtime.universe.TypeTag
/**
 * A Structured Streaming [[Sink]] that writes each micro-batch into a ClickHouse table.
 *
 * @param dbName            target ClickHouse database
 * @param tableName         target ClickHouse table
 * @param eventDataColumn   column used by `ClickHouseUtils.prepareInsertStatement` when
 *                          building the INSERT statement
 * @param getConnectionString supplier of the (host, port) pair; evaluated on the executor
 *                            once per non-empty partition
 * @param partitionFunc     maps each output Row to the java.sql.Date partition value
 * @tparam T product (case-class) type describing the row schema
 */
class ClickHouseSink[T <: Product: ClassTag](dbName: String, tableName: String, eventDataColumn: String)
                                            (getConnectionString: () => (String, Int)) // -> (host, port)
                                            (partitionFunc: (org.apache.spark.sql.Row) => java.sql.Date)
                                            (implicit tag: TypeTag[T]) extends Sink with Serializable with Logging {

  /**
   * Writes one micro-batch. Each non-empty partition opens a connection, batches all
   * rows into a single prepared statement, executes it, and yields the inserted count.
   */
  override def addBatch(batchId: Long, data: DataFrame): Unit = {
    val res = data.queryExecution.toRdd.mapPartitions { iter =>
      val stateUpdateEncoder = Encoders.product[T]
      val schema = stateUpdateEncoder.schema
      val exprEncoder = stateUpdateEncoder.asInstanceOf[ExpressionEncoder[T]]
      if (iter.nonEmpty) {
        // Resolve/bind the encoder ONCE per partition. The original code called
        // resolveAndBind inside the per-row foreach, rebuilding the bound encoder
        // for every row even though it only depends on the (fixed) schema.
        val boundEncoder = exprEncoder.resolveAndBind(
          schema.map(f => AttributeReference(f.name, f.dataType, f.nullable, f.metadata)())
        )
        val clickHouseHostPort = getConnectionString()
        // Utils.using closes the connection even if an exception is thrown.
        Utils.using(ClickHouseUtils.createConnection(clickHouseHostPort)) { connection =>
          val insertStatement = ClickHouseUtils.prepareInsertStatement(connection, dbName, tableName, eventDataColumn)(schema)
          iter.foreach { internalRow =>
            // Decode the InternalRow back to the case class, then to an external Row
            // so ClickHouseUtils can address fields by the external schema.
            val caseClassInstance = boundEncoder.fromRow(internalRow)
            val row = org.apache.spark.sql.Row.fromTuple(caseClassInstance)
            ClickHouseUtils.batchAdd(schema, row)(insertStatement)(partitionFunc)
          }
          val inserted = insertStatement.executeBatch().sum
          log.info(s"inserted $inserted -> (${clickHouseHostPort._1}:${clickHouseHostPort._2})")
          List(inserted).toIterator
        } // end: close connection
      } else {
        // Empty partition: nothing to write, contribute nothing to the count.
        Iterator.empty
      }
    } // end: mapPartition
    // collect() forces execution of every partition and sums the per-partition counts.
    val insertedCount = res.collect().sum
    log.info(s"Batch $batchId's inserted total: $insertedCount")
  }
}
开发者ID:DmitryBe,项目名称:spark-streaming-clickhouse,代码行数:58,代码来源:ClickHouseSink.scala
示例2: CustomSinkProvider
//设置package包名称以及导入依赖的类
package com.knockdata.spark.highcharts
import com.knockdata.spark.highcharts.model.Highcharts
import org.apache.spark.sql._
import org.apache.spark.sql.execution.streaming.Sink
import org.apache.spark.sql.sources.StreamSinkProvider
import org.apache.spark.sql.streaming.OutputMode
/**
 * StreamSinkProvider that renders each streaming micro-batch as a Highcharts chart
 * and pushes the plot into a Zeppelin paragraph.
 *
 * Collaborators (chart id, series holder, output mode, Zeppelin context) are looked up
 * from the process-wide [[Registry]] keyed by the `chartId` passed via `parameters`.
 */
class CustomSinkProvider extends StreamSinkProvider {

  /**
   * Creates the Sink. Required `parameters` keys: "chartId", "chartParagraphId".
   * The `outputMode` parameter is part of the StreamSinkProvider contract but is not
   * consulted here; the effective mode comes from the Registry's CustomOutputMode.
   */
  def createSink(
      sqlContext: SQLContext,
      parameters: Map[String, String],
      partitionColumns: Seq[String],
      outputMode: OutputMode): Sink = {
    new Sink {
      override def addBatch(batchId: Long, data: DataFrame): Unit = {
        val chartId = parameters("chartId")
        val chartParagraphId = parameters("chartParagraphId")
        println(s"batchId: $batchId, chartId: $chartId, chartParagraphId: $chartParagraphId")
        // data.show(3)
        val z = Registry.get(s"$chartId-z").asInstanceOf[ZeppelinContextHolder]
        val seriesHolder = Registry.get(s"$chartId-seriesHolder").asInstanceOf[SeriesHolder]
        // Renamed from `outputMode`: the original local shadowed the method's
        // OutputMode parameter with this unrelated CustomOutputMode lookup.
        val customOutputMode = Registry.get(s"$chartId-outputMode").asInstanceOf[CustomOutputMode]
        seriesHolder.dataFrame = data
        val result = seriesHolder.result
        val (normalSeriesList, drilldownSeriesList) = customOutputMode.result(result._1, result._2)
        val chart = new Highcharts(normalSeriesList, seriesHolder.chartId)
          .drilldown(drilldownSeriesList)
        val plotData = chart.plotData
        // val escaped = plotData.replace("%angular", "")
        // println(s" put $chartParagraphId $escaped")
        // Publish the rendered plot into the target Zeppelin paragraph and re-run it.
        z.put(chartParagraphId, plotData)
        println(s"run $chartParagraphId")
        z.run(chartParagraphId)
      }
    }
  }
}
开发者ID:knockdata,项目名称:spark-highcharts,代码行数:46,代码来源:CustomSinkProvider.scala
示例3: CustomSinkProvider
//设置package包名称以及导入依赖的类
package com.rockiey.kafka
import org.apache.spark.sql._
import org.apache.spark.sql.execution.streaming.Sink
import org.apache.spark.sql.sources.StreamSinkProvider
import org.apache.spark.sql.streaming.OutputMode
/**
 * A debugging StreamSinkProvider: its Sink dumps every micro-batch to stdout
 * (schema, sample rows, and total row count) instead of persisting it anywhere.
 */
class CustomSinkProvider extends StreamSinkProvider {

  /** Builds the console-dump Sink; all provider arguments are ignored. */
  def createSink(
      sqlContext: SQLContext,
      parameters: Map[String, String],
      partitionColumns: Seq[String],
      outputMode: OutputMode): Sink = new Sink {

    // Print the batch for inspection: schema first, then the default row preview,
    // and finally the exact count (note: count() triggers a full evaluation).
    override def addBatch(batchId: Long, data: DataFrame): Unit = {
      data.printSchema()
      data.show()
      val rowCount = data.count()
      println(s"count $rowCount")
    }
  }
}
开发者ID:rockie-yang,项目名称:explore-spark-kafka,代码行数:24,代码来源:CustomSinkProvider.scala
注:本文中的org.apache.spark.sql.execution.streaming.Sink类示例整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。
请发表评论