Scala ExpressionEncoder Class Code Examples


This article collects typical usage examples of the Scala class org.apache.spark.sql.catalyst.encoders.ExpressionEncoder. If you have been wondering how the ExpressionEncoder class works, how to use it, or what real-world code that uses it looks like, the curated class examples below should help.



The sections below present 3 code examples of the ExpressionEncoder class, ordered by popularity.
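
Before the collected examples, here is a minimal round-trip sketch of what an ExpressionEncoder does: it converts between JVM objects and Spark's internal binary row format. It assumes the Spark 2.x API, where toRow and fromRow are public (Spark 3.x replaced them with createSerializer()/createDeserializer()); the Person case class is illustrative.

import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder

case class Person(name: String, age: Int)

object EncoderRoundTrip extends App {
  // Derive an encoder for the case class from its implicit TypeTag.
  val encoder = ExpressionEncoder[Person]()

  // Serialize the object into Spark's internal row representation...
  val internalRow = encoder.toRow(Person("Ada", 36))

  // ...then resolve and bind the deserializer against the encoder's own
  // schema and decode the row back into a Person.
  val restored = encoder.resolveAndBind().fromRow(internalRow)

  println(restored) // Person(Ada,36)
}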

Example 1: ClickHouseSink

// Package declaration and imported dependency classes
package io.clickhouse.ext.spark.streaming

import io.clickhouse.ext.ClickHouseUtils
import io.clickhouse.ext.tools.Utils
import org.apache.spark.internal.Logging
import org.apache.spark.sql.{DataFrame, Encoders}
import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
import org.apache.spark.sql.catalyst.expressions.AttributeReference
import org.apache.spark.sql.execution.streaming.Sink
import scala.reflect.{ClassTag, classTag}
import scala.reflect.runtime.universe.TypeTag

// A Structured Streaming Sink that writes each micro-batch of T records
// into a ClickHouse table, connecting to the (host, port) returned by
// getConnectionString and partitioning rows via partitionFunc.
class ClickHouseSink[T <: Product: ClassTag](dbName: String, tableName: String, eventDataColumn: String)
                                            (getConnectionString: () => (String, Int)) // -> (host, port)
                                            (partitionFunc: (org.apache.spark.sql.Row) => java.sql.Date)
                                            (implicit tag: TypeTag[T]) extends Sink with Serializable with Logging {

  // Called once per micro-batch with the batch's data as a DataFrame.
  override def addBatch(batchId: Long, data: DataFrame): Unit = {

    val res = data.queryExecution.toRdd.mapPartitions { iter =>

      // Recreate the encoder on the executor: Encoders.product[T] is backed
      // by an ExpressionEncoder, which can turn InternalRows back into T.
      val stateUpdateEncoder = Encoders.product[T]
      val schema = stateUpdateEncoder.schema
      val exprEncoder = stateUpdateEncoder.asInstanceOf[ExpressionEncoder[T]]

      if(iter.nonEmpty){

        val clickHouseHostPort = getConnectionString()
        Utils.using(ClickHouseUtils.createConnection(clickHouseHostPort)){ connection =>

          val insertStatement = ClickHouseUtils.prepareInsertStatement(connection, dbName, tableName, eventDataColumn)(schema)

          iter.foreach { internalRow =>
            // Bind the deserializer to attributes built from the schema,
            // then decode the InternalRow back into the case class T.
            val caseClassInstance = exprEncoder.resolveAndBind(
              schema.map(f => AttributeReference(f.name, f.dataType, f.nullable, f.metadata)())
            ).fromRow(internalRow)
            val row = org.apache.spark.sql.Row.fromTuple(caseClassInstance)
            ClickHouseUtils.batchAdd(schema, row)(insertStatement)(partitionFunc)
          }

          val inserted = insertStatement.executeBatch().sum
          log.info(s"inserted $inserted -> (${clickHouseHostPort._1}:${clickHouseHostPort._2})")

          List(inserted).toIterator

        } // end: close connection

      } else {
        Iterator.empty
      }

    } // end: mapPartitions

    val insertedCount = res.collect().sum
    log.info(s"Batch $batchId's inserted total: $insertedCount")
  }
} 
Author: DmitryBe | Project: spark-streaming-clickhouse | Lines of code: 58 | Source file: ClickHouseSink.scala
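
The heart of this sink is the step that recovers the case class from Spark's InternalRows. Below is a hedged, standalone sketch of that same pattern under the Spark 2.x API; the Event case class and toExternalRow helper are illustrative, not part of the project above.

import org.apache.spark.sql.{Encoders, Row}
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
import org.apache.spark.sql.catalyst.expressions.AttributeReference

case class Event(id: Long, payload: String)

def toExternalRow(internalRow: InternalRow): Row = {
  // Encoders.product is backed by an ExpressionEncoder at runtime.
  val encoder = Encoders.product[Event].asInstanceOf[ExpressionEncoder[Event]]
  val schema = encoder.schema

  // Bind the deserializer to attributes derived from the schema, the same
  // call ClickHouseSink makes inside each partition.
  val bound = encoder.resolveAndBind(
    schema.map(f => AttributeReference(f.name, f.dataType, f.nullable, f.metadata)())
  )
  Row.fromTuple(bound.fromRow(internalRow))
}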


Example 2: KarpsStubs

// Package declaration and imported dependency classes
package org.apache.spark.sql

import org.apache.spark.SparkContext
import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
import org.apache.spark.sql.catalyst.expressions.Expression
import org.apache.spark.sql.execution.{QueryExecution, SQLExecution}

object KarpsStubs {
  def withExecutionId[T](sc: SparkContext, executionId: String)(body: => T): T = {
    SQLExecution.withExecutionId(sc, executionId)(body)
  }

  def withNewExecutionId[T](
      sparkSession: SparkSession,
      queryExecution: QueryExecution)(body: => T): T = {
    SQLExecution.withNewExecutionId(sparkSession, queryExecution)(body)
  }

  // df.exprEnc is package-private to org.apache.spark.sql, which is why this
  // stub lives in that package: it returns the DataFrame's Row encoder,
  // resolved and bound to the logical plan's output attributes.
  def getBoundEncoder(df: DataFrame): ExpressionEncoder[Row] = {
    df.exprEnc.resolveAndBind(df.logicalPlan.output,
      df.sparkSession.sessionState.analyzer)
  }

  def getExpression(c: Column): Expression = c.expr

  def makeColumn(exp: Expression): Column = Column.apply(exp)
} 
Author: tjhunter | Project: karps | Lines of code: 28 | Source file: KarpsStubs.scala
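
Because getBoundEncoder returns an ExpressionEncoder[Row] that is already resolved and bound, it can decode a DataFrame's InternalRows directly. A hedged usage sketch under the Spark 2.x API; collectViaEncoder is an illustrative helper, not part of karps.

import org.apache.spark.sql.{DataFrame, KarpsStubs, Row}

def collectViaEncoder(df: DataFrame): Array[Row] = {
  val boundEncoder = KarpsStubs.getBoundEncoder(df)
  // queryExecution.toRdd yields InternalRows; the bound encoder converts
  // each of them back into an external Row.
  df.queryExecution.toRdd.mapPartitions(_.map(boundEncoder.fromRow)).collect()
}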


Example 3: BenchmarkTestData

// Package declaration and imported dependency classes
package com.datawizards.sparklocal.performance

import com.datawizards.sparklocal.TestModel.Person
import com.datawizards.sparklocal.performance.BenchmarkModel.{InputDataSets, InputRDDs}
import com.datawizards.sparklocal.session.{ExecutionEngine, SparkSessionAPI}
import com.datawizards.sparklocal.implicits._
import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
import org.scalacheck.Arbitrary
import org.scalacheck.Shapeless._

import scala.reflect.ClassTag
import scala.reflect.runtime.universe.TypeTag

object BenchmarkTestData {
  lazy val dataSets10Elements: InputDataSets[Person] = createInputDataSets(people10Elements)
  lazy val dataSets100Elements: InputDataSets[Person] = createInputDataSets(people100Elements)
  lazy val dataSets1000Elements: InputDataSets[Person] = createInputDataSets(people1000Elements)
  lazy val dataSets100000Elements: InputDataSets[Person] = createInputDataSets(people100000Elements)
  lazy val rdds10Elements: InputRDDs[Person] = createInputRDDs(people10Elements)
  lazy val rdds100Elements: InputRDDs[Person] = createInputRDDs(people100Elements)
  lazy val rdds1000Elements: InputRDDs[Person] = createInputRDDs(people1000Elements)
  lazy val rdds100000Elements: InputRDDs[Person] = createInputRDDs(people100000Elements)

  def createInputDataSets[T: ClassTag: TypeTag](data: Seq[T]): InputDataSets[T] = {
    // Build the encoder from the TypeTag so createDataset accepts a generic T.
    implicit val encoder: ExpressionEncoder[T] = ExpressionEncoder[T]()

    InputDataSets(
      scalaEagerImpl = scalaEagerSession.createDataset(data),
      scalaLazyImpl = scalaLazySession.createDataset(data),
      scalaParallelImpl = scalaParallelSession.createDataset(data),
      scalaParallelLazyImpl = scalaParallelLazySession.createDataset(data),
      sparkImpl = sparkSession.createDataset(data)
    )
  }

  private def createInputRDDs[T: ClassTag](data: Seq[T]): InputRDDs[T] =
    InputRDDs(
      scalaEagerImpl = scalaEagerSession.createRDD(data),
      scalaLazyImpl = scalaLazySession.createRDD(data),
      scalaParallelImpl = scalaParallelSession.createRDD(data),
      scalaParallelLazyImpl = scalaParallelLazySession.createRDD(data),
      sparkImpl = sparkSession.createRDD(data)
    )

  private lazy val scalaEagerSession = SparkSessionAPI.builder(ExecutionEngine.ScalaEager).master("local").getOrCreate()
  private lazy val scalaLazySession = SparkSessionAPI.builder(ExecutionEngine.ScalaLazy).master("local").getOrCreate()
  private lazy val scalaParallelSession = SparkSessionAPI.builder(ExecutionEngine.ScalaParallel).master("local").getOrCreate()
  private lazy val scalaParallelLazySession = SparkSessionAPI.builder(ExecutionEngine.ScalaParallelLazy).master("local").getOrCreate()
  private lazy val sparkSession = SparkSessionAPI.builder(ExecutionEngine.Spark).master("local").getOrCreate()

  private lazy val peopleGenerator = implicitly[Arbitrary[Person]].arbitrary

  private lazy val people10Elements = for (_ <- 1 to 10) yield peopleGenerator.sample.get
  private lazy val people100Elements = for (_ <- 1 to 100) yield peopleGenerator.sample.get
  private lazy val people1000Elements = for (_ <- 1 to 1000) yield peopleGenerator.sample.get
  private lazy val people100000Elements = for (_ <- 1 to 100000) yield peopleGenerator.sample.get
} 
Author: piotr-kalanski | Project: spark-local | Lines of code: 58 | Source file: BenchmarkTestData.scala
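
The technique worth extracting from this benchmark setup is in createInputDataSets: inside a generic method the compiler cannot summon an implicit Encoder[T], so one is constructed directly from the TypeTag and marked implicit. A minimal sketch of that pattern on its own, assuming the Spark 2.x API; toDataset is an illustrative helper.

import org.apache.spark.sql.{Dataset, SparkSession}
import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
import scala.reflect.runtime.universe.TypeTag

def toDataset[T <: Product : TypeTag](spark: SparkSession, data: Seq[T]): Dataset[T] = {
  // Build the encoder from the TypeTag; making it implicit lets
  // createDataset find it for the otherwise-unknown type T.
  implicit val encoder: ExpressionEncoder[T] = ExpressionEncoder[T]()
  spark.createDataset(data)
}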



Note: the org.apache.spark.sql.catalyst.encoders.ExpressionEncoder examples in this article were compiled from source-code and documentation platforms such as GitHub and MSDocs, with snippets drawn from community-contributed open-source projects. Copyright remains with the original authors; consult each project's License before distributing or using the code, and do not repost without permission.

