
Scala DataSourceRegister Class Code Examples


This article collects typical usage examples of the org.apache.spark.sql.sources.DataSourceRegister class in Scala. If you have been wondering what the DataSourceRegister class is for, or how to use it in your own code, the curated examples below should help.



Three code examples of the DataSourceRegister class are shown below, ordered by popularity.
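For context, DataSourceRegister itself is a one-method trait: an implementation returns a short alias from shortName(), and Spark resolves that alias through Java's ServiceLoader, so the provider class must also be listed in a META-INF/services file. A minimal sketch (MyProvider and "myformat" are hypothetical names, not taken from the examples below):

import org.apache.spark.sql.sources.DataSourceRegister

// In practice this trait is mixed in alongside a provider trait such as
// RelationProvider or StreamSinkProvider, as the three examples below show.
class MyProvider extends DataSourceRegister {
  // The alias usable in spark.read.format("myformat") or
  // writeStream.format("myformat") in place of the fully qualified class name.
  override def shortName(): String = "myformat"
}

// For Spark to discover MyProvider, its fully qualified class name must be
// listed (one name per line) in the resource file:
//   META-INF/services/org.apache.spark.sql.sources.DataSourceRegister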

Example 1: HttpTextSinkProvider

// Package declaration and imported dependencies
package org.apache.spark.sql.execution.streaming

import org.apache.spark.annotation.InterfaceStability
import org.apache.spark.internal.Logging
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.sources.DataSourceRegister
import org.apache.spark.sql.sources.StreamSinkProvider
import org.apache.spark.sql.streaming.OutputMode

import Params._

class HttpTextSinkProvider extends StreamSinkProvider with DataSourceRegister {
	def createSink(
		sqlContext: SQLContext,
		parameters: Map[String, String],
		partitionColumns: Seq[String],
		outputMode: OutputMode): Sink = {
		new HttpTextSink(parameters.getString("httpServletUrl"), parameters.getString("topic"), parameters.getBool("useGzipCompress", true));
	}

	def shortName(): String = "httpText"
}

class HttpTextSink(httpPostURL: String, topic: String, useGzipCompress: Boolean) extends Sink with Logging {
	val sender = new HttpTextSender(httpPostURL);
	val RETRY_TIMES = 5;
	val SLEEP_TIME = 100;

	override def addBatch(batchId: Long, data: DataFrame) {
		//send data to the HTTP server
		var success = false;
		var retried = 0;
		while (!success && retried < RETRY_TIMES) {
			try {
				retried += 1;
				sender.sendTextArray(topic, batchId, data.collect().map { _.get(0).asInstanceOf[String] }, useGzipCompress);
				success = true;
			}
			catch {
				case e: Throwable => {
					success = false;
					super.logWarning(s"failed to send", e);
					if (retried < RETRY_TIMES) {
						val sleepTime = SLEEP_TIME * retried;
						super.logWarning(s"will retry to send after ${sleepTime}ms");
						Thread.sleep(sleepTime);
					}
					else {
						throw e;
					}
				}
			}
		}
	}
} 
Developer: bluejoe2008 | Project: spark-http-stream | Lines: 57 | Source: HttpTextSink.scala
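As a hedged sketch of how the "httpText" sink above might be wired up (the option keys httpServletUrl, topic, and useGzipCompress come from the provider's createSink; the socket source, URL, and checkpoint path are placeholders):

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder.master("local[*]").appName("httpTextDemo").getOrCreate()

// Any streaming DataFrame whose first column is a String will do; a socket
// source is used here purely for illustration.
val lines = spark.readStream
  .format("socket")
  .option("host", "localhost")
  .option("port", "9999")
  .load()

// "httpText" is resolved via DataSourceRegister.shortName(), provided the
// provider class is registered under
// META-INF/services/org.apache.spark.sql.sources.DataSourceRegister.
val query = lines.writeStream
  .format("httpText")
  .option("httpServletUrl", "http://localhost:8080/text/receive") // hypothetical endpoint
  .option("topic", "topic-1")
  .option("useGzipCompress", "true")
  .option("checkpointLocation", "/tmp/httpText-checkpoint")
  .start()

query.awaitTermination()

Note that addBatch collects each micro-batch to the driver before posting it, so this sink is only suitable for modest batch sizes.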


Example 2: DefaultSource

// Package declaration and imported dependencies
package de.usu.research.sake.sparksparql

import org.apache.spark.sql.sources.SchemaRelationProvider
import org.apache.spark.sql.sources.RelationProvider
import org.apache.spark.sql.sources.DataSourceRegister
import org.apache.spark.sql.sources.BaseRelation
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.types.StructType

class DefaultSource extends RelationProvider with SchemaRelationProvider with DataSourceRegister {
  override def shortName(): String = "sparql"

  
  override def createRelation(
    sqlContext: SQLContext,
    parameters: Map[String, String],
    schema: StructType): SparqlRelation = {
    val service = checkService(parameters)
    val query = checkQuery(parameters)
    SparqlRelation(service, query, schema)(sqlContext)
  }

  private def checkService(parameters: Map[String, String]): String = {
    parameters.getOrElse("service", sys.error("'service' must be specified for SPARQL data."))
  }

  private def checkQuery(parameters: Map[String, String]): String = {
    parameters.getOrElse("query", sys.error("'query' must be specified for SPARQL data."))
  }
} 
Developer: USU-Research | Project: spark-sparql-connector | Lines: 31 | Source: DefaultSource.scala
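A hedged sketch of reading through this source (the option keys service and query match checkService and checkQuery above; the endpoint and SPARQL query are placeholders):

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder.master("local[*]").appName("sparqlDemo").getOrCreate()

// "sparql" resolves to the DefaultSource above via DataSourceRegister.
val triples = spark.read
  .format("sparql")
  .option("service", "https://dbpedia.org/sparql") // placeholder endpoint
  .option("query", "SELECT ?s ?p ?o WHERE { ?s ?p ?o } LIMIT 10")
  .load()

triples.show()

Because the source also implements SchemaRelationProvider, a schema can alternatively be supplied with .schema(...) before .load().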


Example 3: DefaultSource

// Package declaration and imported dependencies
package solr

import com.lucidworks.spark.SolrRelation
import com.lucidworks.spark.util.Constants
import org.apache.spark.sql.{DataFrame, SaveMode, SQLContext}
import org.apache.spark.sql.sources.{DataSourceRegister, BaseRelation, CreatableRelationProvider, RelationProvider}

class DefaultSource extends RelationProvider with CreatableRelationProvider with DataSourceRegister {

  override def createRelation(sqlContext: SQLContext, parameters: Map[String, String]): BaseRelation = {
    try {
      return new SolrRelation(parameters, sqlContext)
    } catch {
      case re: RuntimeException => throw re
      case e: Exception => throw new RuntimeException(e)
    }
  }

  override def createRelation(
      sqlContext: SQLContext,
      mode: SaveMode,
      parameters: Map[String, String],
      df: DataFrame): BaseRelation = {
    try {
      // TODO: What to do with the saveMode?
      val solrRelation: SolrRelation = new SolrRelation(parameters, sqlContext, Some(df))
      solrRelation.insert(df, overwrite = true)
      solrRelation
    } catch {
      case re: RuntimeException => throw re
      case e: Exception => throw new RuntimeException(e)
    }
  }

  override def shortName(): String = Constants.SOLR_FORMAT
} 
Developer: OpenPOWER-BigData | Project: HDP2.5-spark-solr | Lines: 37 | Source: DefaultSource.scala
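A hedged sketch of writing a DataFrame through this provider (the format short name comes from Constants.SOLR_FORMAT, which the spark-solr project sets to "solr"; zkhost and collection are that project's option keys, and the values here are placeholders):

import org.apache.spark.sql.{SaveMode, SparkSession}

val spark = SparkSession.builder.master("local[*]").appName("solrDemo").getOrCreate()
val df = spark.read.json("/path/to/docs.json") // any DataFrame to index

df.write
  .format("solr")
  .option("zkhost", "localhost:9983") // ZooKeeper ensemble of the SolrCloud cluster
  .option("collection", "demo")
  .mode(SaveMode.Overwrite)
  .save()

Note that createRelation above ignores the SaveMode and always inserts with overwrite = true, per the TODO left in the source.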



Note: the org.apache.spark.sql.sources.DataSourceRegister examples in this article were collected from source code and documentation hosted on GitHub, MSDocs, and similar platforms. The snippets come from open-source projects contributed by their authors, and copyright remains with them; consult each project's license before redistributing or using the code. Do not reproduce this article without permission.

