本文整理汇总了Scala中org.apache.spark.streaming.receiver.Receiver类的典型用法代码示例。如果您正苦于以下问题:Scala Receiver类的具体用法?Scala Receiver怎么用?Scala Receiver使用的例子?那么恭喜您, 这里精选的类代码示例或许可以为您提供帮助。
在下文中一共展示了Receiver类的13个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Scala代码示例。
示例1: FqueueStreamingReceiver
//设置package包名称以及导入依赖的类
import java.io.{BufferedReader, InputStreamReader}
import java.net.Socket
import Fqueue.FqueueReceiver
import org.apache.spark.Logging
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.receiver.Receiver
/**
 * Spark Streaming receiver that repeatedly dequeues string messages through an
 * `FqueueReceiver` and hands each one to Spark via `store`.
 *
 * @param address            forwarded to the `FqueueReceiver` constructor
 * @param connectionPoolSize forwarded to the `FqueueReceiver` constructor
 * @param timeOut            forwarded to the `FqueueReceiver` constructor
 * @param queueName          name passed to `deQueue`; defaults to the value that
 *                           used to be hard-coded, so existing callers are unaffected
 */
class FqueueStreamingReceiver(
    val address: String,
    val connectionPoolSize: Int,
    val timeOut: Int,
    val queueName: String = "track_BOdao2015*")
  extends Receiver[String](StorageLevel.MEMORY_AND_DISK_2) with Logging {

  // Written by the worker thread in receive() and read by onStop(), which is
  // not invoked on that worker thread, hence @volatile.
  @volatile private var receiver: Option[FqueueReceiver] = None

  /** Starts the background thread that runs the dequeue loop; returns immediately. */
  def onStart(): Unit = {
    new Thread("Socket Receiver") {
      override def run(): Unit = receive()
    }.start()
  }

  /** Stops the underlying `FqueueReceiver`, if one has been created. */
  def onStop(): Unit = receiver.foreach(_.stop())

  /**
   * Connects, then polls the queue until this receiver is stopped.
   * An empty dequeue result pauses the loop for one second before retrying.
   * The underlying client is stopped on every exit path.
   */
  private def receive(): Unit = {
    val fqueueReceiver = new FqueueReceiver(address, connectionPoolSize, timeOut)
    receiver = Some(fqueueReceiver)
    try {
      // connect() is inside the try so that a failed connection is reported
      // and cleaned up like any other failure.
      fqueueReceiver.connect()
      while (!isStopped()) {
        fqueueReceiver.deQueue(queueName) match {
          case Some(str) => store(str)
          case None      => Thread.sleep(1000) // nothing queued yet; back off
        }
      }
    } catch {
      case e: Exception =>
        println("get data from fqueue err! pleace sure the server is live")
        println(e.getMessage)
        println(e.getStackTrace.mkString(System.lineSeparator()))
    } finally {
      fqueueReceiver.stop()
    }
  }
}
开发者ID:TopSpoofer,项目名称:FqueueStreamingReceiver,代码行数:49,代码来源:FqueueStreamingReceiver.scala
示例2: JMSInputDStream
//设置package包名称以及导入依赖的类
package com.redhat.spark.streaming.jms
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming._
import org.apache.spark.streaming.dstream._
import org.apache.spark.streaming.receiver.Receiver
/**
 * Receiver-based input stream of [[JMSEvent]]s.
 *
 * Every constructor argument except the streaming context is handed unchanged
 * to the `JMSReceiver` built by [[getReceiver]].
 */
private[streaming]
class JMSInputDStream(
    @transient ssc_ : StreamingContext,
    brokerURL: String,
    username: String,
    password: String,
    queuename: String,
    selector: String,
    storageLevel: StorageLevel)
  extends ReceiverInputDStream[JMSEvent](ssc_) {

  /** Builds the `JMSReceiver` that feeds this stream. */
  override def getReceiver(): Receiver[JMSEvent] =
    new JMSReceiver(brokerURL, username, password, queuename, selector, storageLevel)
}
开发者ID:xiaoJacky,项目名称:sparkLearning,代码行数:23,代码来源:JMSInputDStream.scala
示例3: TwitterInputDStream
//设置package包名称以及导入依赖的类
package com.aluxian.tweeather.streaming
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.dstream.ReceiverInputDStream
import org.apache.spark.streaming.receiver.Receiver
import twitter4j.auth.{Authorization, OAuthAuthorization}
import twitter4j.conf.ConfigurationBuilder
import twitter4j.{FilterQuery, Status}
/**
 * Receiver-based input stream of twitter4j `Status` objects.
 *
 * @param twitterAuth  authorization to use; when `None`, an `OAuthAuthorization`
 *                     is built from a default `ConfigurationBuilder`
 * @param filterQuery  forwarded to the `TwitterReceiver`
 * @param storageLevel forwarded to the `TwitterReceiver`
 */
class TwitterInputDStream(
    @transient ssc: StreamingContext,
    twitterAuth: Option[Authorization],
    filterQuery: Option[FilterQuery],
    storageLevel: StorageLevel)
  extends ReceiverInputDStream[Status](ssc) {

  // The fallback is a by-name argument, so it is only constructed when
  // no authorization was supplied.
  private val authorization: Authorization =
    twitterAuth.getOrElse(new OAuthAuthorization(new ConfigurationBuilder().build()))

  /** Builds the `TwitterReceiver` that feeds this stream. */
  override def getReceiver(): Receiver[Status] =
    new TwitterReceiver(authorization, filterQuery, storageLevel)
}
开发者ID:cnajeefa,项目名称:Tourism-Sentiment-Analysis,代码行数:29,代码来源:TwitterInputDStream.scala
示例4: FacebookPostReceiver
//设置package包名称以及导入依赖的类
package com.github.catalystcode.fortis.spark.streaming.facebook
import java.util.Date
import com.github.catalystcode.fortis.spark.streaming.facebook.client.FacebookPageClient
import com.github.catalystcode.fortis.spark.streaming.facebook.dto.FacebookPost
import com.github.catalystcode.fortis.spark.streaming.{PollingReceiver, PollingSchedule}
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.dstream.ReceiverInputDStream
import org.apache.spark.streaming.receiver.Receiver
/**
 * Polling receiver that loads posts from a set of Facebook page clients and
 * stores the ones created after the most recently stored post.
 *
 * @param clients         page clients to poll; they are polled in parallel
 * @param pollingSchedule forwarded to [[PollingReceiver]]
 * @param storageLevel    forwarded to [[PollingReceiver]]
 * @param pollingWorkers  forwarded to [[PollingReceiver]]
 */
private class FacebookPostReceiver(
  clients: Set[FacebookPageClient],
  pollingSchedule: PollingSchedule,
  storageLevel: StorageLevel,
  pollingWorkers: Int
) extends PollingReceiver[FacebookPost](pollingSchedule, pollingWorkers, storageLevel) with Logger {

  // Creation time of the newest post stored so far; None until the first store.
  @volatile private var lastIngestedDate: Option[Date] = None

  /** Loads posts from every client in parallel and stores those that are new. */
  override protected def poll(): Unit = {
    clients.par.foreach(client =>
      client
        .loadNewFacebookPosts(lastIngestedDate)
        .filter(x => {
          logDebug(s"Got facebook ${x.post.getPermalinkUrl} from page ${x.pageId} time ${x.post.getCreatedTime}")
          isNew(x)
        })
        .foreach(x => {
          logInfo(s"Storing facebook ${x.post.getPermalinkUrl}")
          store(x)
          markStored(x)
        })
    )
  }

  /** True when nothing has been stored yet or the post is newer than the last stored one. */
  private def isNew(item: FacebookPost): Boolean =
    lastIngestedDate.forall(last => item.post.getCreatedTime.after(last))

  /**
   * Advances the last-ingested date if `item` is newer.
   *
   * Synchronized because poll() calls this from several threads at once
   * (`clients.par`); without the lock the compare-then-write could let an
   * older date overwrite a newer one.
   */
  private def markStored(item: FacebookPost): Unit = synchronized {
    if (isNew(item)) {
      lastIngestedDate = Some(item.post.getCreatedTime)
      logDebug(s"Updating last ingested date to ${item.post.getCreatedTime}")
    }
  }
}
/**
 * Receiver-based input stream of [[FacebookPost]]s.
 *
 * All constructor arguments except the streaming context are handed to the
 * `FacebookPostReceiver` created in [[getReceiver]].
 */
class FacebookPostInputDStream(
    ssc: StreamingContext,
    clients: Set[FacebookPageClient],
    pollingSchedule: PollingSchedule,
    pollingWorkers: Int,
    storageLevel: StorageLevel)
  extends ReceiverInputDStream[FacebookPost](ssc) {

  /** Logs the creation and returns a new `FacebookPostReceiver`. */
  override def getReceiver(): Receiver[FacebookPost] = {
    logDebug("Creating facebook receiver")
    val receiver: Receiver[FacebookPost] =
      new FacebookPostReceiver(clients, pollingSchedule, storageLevel, pollingWorkers)
    receiver
  }
}
开发者ID:CatalystCode,项目名称:streaming-facebook,代码行数:62,代码来源:FacebookPostInputDStream.scala
示例5: FacebookCommentsReceiver
//设置package包名称以及导入依赖的类
package com.github.catalystcode.fortis.spark.streaming.facebook
import java.util.Date
import com.github.catalystcode.fortis.spark.streaming.facebook.client.FacebookPageClient
import com.github.catalystcode.fortis.spark.streaming.facebook.dto.FacebookComment
import com.github.catalystcode.fortis.spark.streaming.{PollingReceiver, PollingSchedule}
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.dstream.ReceiverInputDStream
import org.apache.spark.streaming.receiver.Receiver
/**
 * Polling receiver that loads comments from a set of Facebook page clients and
 * stores the ones created after the most recently stored comment.
 *
 * @param clients         page clients to poll; they are polled in parallel
 * @param pollingSchedule forwarded to [[PollingReceiver]]
 * @param storageLevel    forwarded to [[PollingReceiver]]
 * @param pollingWorkers  forwarded to [[PollingReceiver]]
 */
private class FacebookCommentsReceiver(
  clients: Set[FacebookPageClient],
  pollingSchedule: PollingSchedule,
  storageLevel: StorageLevel,
  pollingWorkers: Int
) extends PollingReceiver[FacebookComment](pollingSchedule, pollingWorkers, storageLevel) with Logger {

  // Creation time of the newest comment stored so far; None until the first store.
  @volatile private var lastIngestedDate: Option[Date] = None

  /** Loads comments from every client in parallel and stores those that are new. */
  override protected def poll(): Unit = {
    clients.par.foreach(client =>
      client
        .loadNewFacebookComments(lastIngestedDate)
        .filter(x => {
          logDebug(s"Got comment with id ${x.comment.getId} from page ${x.pageId}")
          isNew(x)
        })
        .foreach(x => {
          logInfo(s"Storing comment ${x.comment.getId} from page ${x.pageId}")
          store(x)
          markStored(x)
        })
    )
  }

  /** True when nothing has been stored yet or the comment is newer than the last stored one. */
  private def isNew(item: FacebookComment): Boolean =
    lastIngestedDate.forall(last => item.comment.getCreatedTime.after(last))

  /**
   * Advances the last-ingested date if `item` is newer.
   *
   * Synchronized because poll() calls this from several threads at once
   * (`clients.par`); without the lock the compare-then-write could let an
   * older date overwrite a newer one.
   */
  private def markStored(item: FacebookComment): Unit = synchronized {
    if (isNew(item)) {
      val createdAt = item.comment.getCreatedTime
      lastIngestedDate = Some(createdAt)
      logDebug(s"Updating last ingested date to $createdAt")
    }
  }
}
/**
 * Receiver-based input stream of [[FacebookComment]]s.
 *
 * All constructor arguments except the streaming context are handed to the
 * `FacebookCommentsReceiver` created in [[getReceiver]].
 */
class FacebookCommentsInputDStream(
    ssc: StreamingContext,
    clients: Set[FacebookPageClient],
    pollingSchedule: PollingSchedule,
    pollingWorkers: Int,
    storageLevel: StorageLevel)
  extends ReceiverInputDStream[FacebookComment](ssc) {

  /** Logs the creation and returns a new `FacebookCommentsReceiver`. */
  override def getReceiver(): Receiver[FacebookComment] = {
    logDebug("Creating facebook receiver")
    val receiver: Receiver[FacebookComment] =
      new FacebookCommentsReceiver(clients, pollingSchedule, storageLevel, pollingWorkers)
    receiver
  }
}
开发者ID:CatalystCode,项目名称:streaming-facebook,代码行数:62,代码来源:FacebookCommentsInputDStream.scala
示例6: PollingSchedule
//设置package包名称以及导入依赖的类
package com.github.catalystcode.fortis.spark.streaming
import java.util.concurrent.{ScheduledThreadPoolExecutor, TimeUnit}
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.receiver.Receiver
/**
 * Timing settings that `PollingReceiver` passes to `scheduleAtFixedRate`.
 *
 * @param interval     period between polls, expressed in `unit`
 * @param unit         time unit of `interval` and `initialDelay`
 * @param initialDelay delay before the first poll, expressed in `unit`
 */
case class PollingSchedule(
  interval: Long,
  unit: TimeUnit,
  initialDelay: Long = 1
)
// Taken from https://github.com/CatalystCode/streaming-instagram/blob/3873a197212ba5929dd54ec4949f3d1ac10ffc1f/src/main/scala/com/github/catalystcode/fortis/spark/streaming/PollingReceiver.scala
// Put this into a shared library at some point
/**
 * Base class for receivers that fetch data by calling [[poll]] at a fixed rate.
 *
 * @param pollingSchedule initial delay, period and time unit of the polling
 * @param pollingWorkers  core pool size of the scheduling executor
 * @param storageLevel    storage level handed to the Spark `Receiver`
 * @tparam T type of the items stored by the receiver
 */
abstract class PollingReceiver[T](
  pollingSchedule: PollingSchedule,
  pollingWorkers: Int,
  storageLevel: StorageLevel
) extends Receiver[T](storageLevel) {

  // Created in onStart(); stays null until then, which onStop() checks for.
  private var threadPool: ScheduledThreadPoolExecutor = _

  /** Creates the executor and schedules [[poll]] according to `pollingSchedule`. */
  def onStart(): Unit = {
    import scala.util.control.NonFatal

    threadPool = new ScheduledThreadPoolExecutor(pollingWorkers)

    // A plain Runnable is enough: the executor runs the task on its own worker
    // threads, so wrapping it in a Thread object had no effect.
    val pollingTask = new Runnable {
      override def run(): Unit =
        try {
          poll()
        } catch {
          // scheduleAtFixedRate suppresses all subsequent executions once a run
          // throws, so one failed poll would silently stop the receiver for good.
          // Report the failure and let the next scheduled run try again.
          case NonFatal(e) =>
            reportError("Polling failed; will retry on the next scheduled run", e)
        }
    }

    threadPool.scheduleAtFixedRate(
      pollingTask, pollingSchedule.initialDelay,
      pollingSchedule.interval, pollingSchedule.unit)
  }

  /** Shuts the executor down if onStart() created one. */
  def onStop(): Unit = {
    if (threadPool != null) {
      threadPool.shutdown()
    }
  }

  /** Performs one polling round; implemented by concrete receivers. */
  protected def poll(): Unit
}
开发者ID:CatalystCode,项目名称:streaming-facebook,代码行数:42,代码来源:PollingReceiver.scala
示例7: PollingSchedule
//设置package包名称以及导入依赖的类
package com.github.catalystcode.fortis.spark.streaming
import java.util.concurrent.{ScheduledThreadPoolExecutor, TimeUnit}
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.receiver.Receiver
/**
 * Timing settings that `PollingReceiver` passes to `scheduleAtFixedRate`.
 *
 * @param interval     period between polls, expressed in `unit`
 * @param unit         time unit of `interval` and `initialDelay`
 * @param initialDelay delay before the first poll, expressed in `unit`
 */
case class PollingSchedule(
  interval: Long,
  unit: TimeUnit,
  initialDelay: Long = 1
)
/**
 * Base class for receivers that fetch data by calling [[poll]] at a fixed rate.
 *
 * @param pollingSchedule initial delay, period and time unit of the polling
 * @param pollingWorkers  core pool size of the scheduling executor
 * @param storageLevel    storage level handed to the Spark `Receiver`
 * @tparam T type of the items stored by the receiver
 */
abstract class PollingReceiver[T](
  pollingSchedule: PollingSchedule,
  pollingWorkers: Int,
  storageLevel: StorageLevel
) extends Receiver[T](storageLevel) {

  // Created in onStart(); stays null until then, which onStop() checks for.
  private var threadPool: ScheduledThreadPoolExecutor = _

  /** Creates the executor and schedules [[poll]] according to `pollingSchedule`. */
  def onStart(): Unit = {
    import scala.util.control.NonFatal

    threadPool = new ScheduledThreadPoolExecutor(pollingWorkers)

    // A plain Runnable is enough: the executor runs the task on its own worker
    // threads, so wrapping it in a Thread object had no effect.
    val pollingTask = new Runnable {
      override def run(): Unit =
        try {
          poll()
        } catch {
          // scheduleAtFixedRate suppresses all subsequent executions once a run
          // throws, so one failed poll would silently stop the receiver for good.
          // Report the failure and let the next scheduled run try again.
          case NonFatal(e) =>
            reportError("Polling failed; will retry on the next scheduled run", e)
        }
    }

    threadPool.scheduleAtFixedRate(
      pollingTask, pollingSchedule.initialDelay,
      pollingSchedule.interval, pollingSchedule.unit)
  }

  /** Shuts the executor down if onStart() created one. */
  def onStop(): Unit = {
    if (threadPool != null) {
      threadPool.shutdown()
    }
  }

  /** Performs one polling round; implemented by concrete receivers. */
  protected def poll(): Unit
}
开发者ID:CatalystCode,项目名称:streaming-instagram,代码行数:40,代码来源:PollingReceiver.scala
示例8: InstagramReceiver
//设置package包名称以及导入依赖的类
package com.github.catalystcode.fortis.spark.streaming.instagram
import com.github.catalystcode.fortis.spark.streaming.instagram.client.InstagramClient
import com.github.catalystcode.fortis.spark.streaming.instagram.dto.InstagramItem
import com.github.catalystcode.fortis.spark.streaming.{PollingReceiver, PollingSchedule}
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.dstream.ReceiverInputDStream
import org.apache.spark.streaming.receiver.Receiver
/**
 * Polling receiver that loads Instagram items from a client and stores those
 * whose `created_time` is greater than that of the newest item stored so far.
 *
 * @param client          source of Instagram items
 * @param pollingSchedule forwarded to [[PollingReceiver]]
 * @param storageLevel    forwarded to [[PollingReceiver]]
 * @param pollingWorkers  forwarded to [[PollingReceiver]]
 */
private class InstagramReceiver(
  client: InstagramClient,
  pollingSchedule: PollingSchedule,
  storageLevel: StorageLevel,
  pollingWorkers: Int
) extends PollingReceiver[InstagramItem](pollingSchedule, pollingWorkers, storageLevel) with Logger {

  // Largest created_time stored so far; starts below every possible value.
  @volatile private var lastIngestedEpoch = Long.MinValue

  /** Loads items, keeps the ones newer than the watermark and stores them. */
  override protected def poll(): Unit = {
    val fresh = client.loadNewInstagrams().filter { item =>
      val createdAt = item.created_time.toLong
      logDebug(s"Got instagram ${item.link} from time $createdAt")
      createdAt > lastIngestedEpoch
    }

    fresh.foreach { item =>
      logInfo(s"Storing instagram ${item.link}")
      store(item)
      markStored(item)
    }
  }

  /** Raises the watermark to the item's `created_time` when that is larger. */
  private def markStored(item: InstagramItem): Unit = {
    val createdAt = item.created_time.toLong
    val advances = createdAt > lastIngestedEpoch
    if (advances) {
      lastIngestedEpoch = createdAt
      logDebug(s"Updating last ingested epoch to $createdAt")
    }
  }
}
/**
 * Receiver-based input stream of [[InstagramItem]]s.
 *
 * All constructor arguments except the streaming context are handed to the
 * `InstagramReceiver` created in [[getReceiver]].
 */
class InstagramInputDStream(
    ssc: StreamingContext,
    client: InstagramClient,
    pollingSchedule: PollingSchedule,
    pollingWorkers: Int,
    storageLevel: StorageLevel)
  extends ReceiverInputDStream[InstagramItem](ssc) {

  /** Logs the creation and returns a new `InstagramReceiver`. */
  override def getReceiver(): Receiver[InstagramItem] = {
    logDebug("Creating instagram receiver")
    val receiver: Receiver[InstagramItem] =
      new InstagramReceiver(client, pollingSchedule, storageLevel, pollingWorkers)
    receiver
  }
}
开发者ID:CatalystCode,项目名称:streaming-instagram,代码行数:57,代码来源:InstagramInputDStream.scala
示例9: CustomReceiver
//设置package包名称以及导入依赖的类
package com.knoldus.receiver
import java.io.{BufferedReader, FileInputStream, InputStreamReader}
import java.nio.charset.StandardCharsets
import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.receiver.Receiver
import org.apache.spark.streaming.{Seconds, StreamingContext}
/**
 * Word-count driver: streams lines from the file named by the first program
 * argument through a [[CustomReceiver]] and prints per-batch word counts.
 *
 * Uses an explicit `main` method rather than `scala.App`; Spark's documentation
 * advises against `App` because of its delayed initialization.
 */
object CustomReceiver {

  /**
   * @param args `args(0)` is the path of the file to read; the program prints a
   *             usage line and exits with status 1 when it is missing
   */
  def main(args: Array[String]): Unit = {
    if (args.isEmpty) {
      System.err.println("Usage: CustomReceiver <path>")
      sys.exit(1)
    }

    val sparkConf = new SparkConf().setAppName("CustomReceiver")
    val ssc = new StreamingContext(sparkConf, Seconds(1))

    val lines = ssc.receiverStream(new CustomReceiver(args(0)))
    val words = lines.flatMap(_.split(" "))
    val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)
    wordCounts.print()

    ssc.start()
    ssc.awaitTermination()
  }
}
/**
 * Receiver that reads a UTF-8 text file line by line on a background thread
 * and stores every line.
 *
 * @param path path of the file to read
 */
class CustomReceiver(path: String) extends Receiver[String](StorageLevel.MEMORY_AND_DISK_2) {

  /** Starts the reader thread; returns immediately. */
  def onStart(): Unit = {
    new Thread("File Reader") {
      override def run(): Unit = receive()
    }.start()
  }

  /** Nothing to release here: the reader thread closes the file itself. */
  def onStop(): Unit = {}

  /**
   * Stores each line of the file until the end of file or until the receiver
   * is stopped, then asks Spark to restart the receiver. Any exception also
   * triggers a restart, with the exception attached.
   */
  private def receive(): Unit =
    try {
      println("Reading file " + path)
      val reader = new BufferedReader(
        new InputStreamReader(new FileInputStream(path), StandardCharsets.UTF_8))
      try {
        var line = reader.readLine()
        while (!isStopped() && line != null) {
          store(line)
          line = reader.readLine()
        }
      } finally {
        // Close on every path; previously a failing store/readLine leaked the file handle.
        reader.close()
      }
      println("Stopped receiving")
      restart("Trying to connect again")
    } catch {
      case ex: Exception =>
        restart("Error reading file " + path, ex)
    }
}
开发者ID:knoldus,项目名称:spark-streaming-meetup,代码行数:58,代码来源:CustomReceiver.scala
示例10: TranscriptionReceiver
//设置package包名称以及导入依赖的类
package com.microsoft.partnercatalyst.fortis.spark.sources.streamwrappers.radio
import java.io.InputStream
import java.net.URL
import java.util.Locale
import java.util.function.Consumer
import com.github.catalystcode.fortis.speechtotext.Transcriber
import com.github.catalystcode.fortis.speechtotext.config.{OutputFormat, SpeechServiceConfig, SpeechType}
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.receiver.Receiver
/**
 * Receiver that opens the audio stream at `radioUrl`, feeds it to a
 * `Transcriber` and stores every transcription as a [[RadioTranscription]].
 *
 * @param radioUrl        URL the audio stream is opened from
 * @param audioType       passed to `Transcriber.create`
 * @param locale          used to build the `Locale` for the speech config and
 *                        for the language recorded on each transcription
 * @param subscriptionKey passed to `SpeechServiceConfig`
 * @param speechType      name of a `SpeechType` value
 * @param outputFormat    name of an `OutputFormat` value
 * @param storageLevel    storage level handed to the Spark `Receiver`
 */
class TranscriptionReceiver(
    radioUrl: String,
    audioType: String,
    locale: String,
    subscriptionKey: String,
    speechType: String,
    outputFormat: String,
    storageLevel: StorageLevel)
  extends Receiver[RadioTranscription](storageLevel) {

  private val language = new Locale(locale).getLanguage
  private var audioStream: InputStream = _
  private var transcriber: Transcriber = _

  // Callback for transcribed text: wraps it with language and source URL, then stores it.
  private val onTranscription = new Consumer[String] {
    override def accept(text: String): Unit =
      store(RadioTranscription(text = text, language = language, radioUrl = radioUrl))
  }

  // Callback for hypotheses: deliberately ignored.
  private val onHypothesis = new Consumer[String] {
    override def accept(hypothesis: String): Unit = ()
  }

  /**
   * Builds the speech configuration and transcriber, opens the audio stream
   * and starts transcribing.
   */
  override def onStart(): Unit = {
    // NOTE(review): if transcribe() blocks until the stream ends, this method
    // does not return promptly — confirm against the Transcriber implementation.
    val speechConfig = new SpeechServiceConfig(
      subscriptionKey,
      SpeechType.valueOf(speechType),
      OutputFormat.valueOf(outputFormat),
      new Locale(locale))

    transcriber = Transcriber.create(audioType, speechConfig)
    audioStream = new URL(radioUrl).openConnection.getInputStream
    transcriber.transcribe(audioStream, onTranscription, onHypothesis)
  }

  /** Closes the audio stream if one was opened and drops the transcriber reference. */
  override def onStop(): Unit = {
    Option(audioStream).foreach(_.close())
    transcriber = null
  }
}
开发者ID:CatalystCode,项目名称:project-fortis-spark,代码行数:57,代码来源:TranscriptionReceiver.scala
示例11: RadioInputDStream
//设置package包名称以及导入依赖的类
package com.microsoft.partnercatalyst.fortis.spark.sources.streamwrappers.radio
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.dstream.ReceiverInputDStream
import org.apache.spark.streaming.receiver.Receiver
/**
 * Receiver-based input stream of [[RadioTranscription]]s.
 *
 * All constructor arguments except the streaming context are handed to the
 * `TranscriptionReceiver` created in [[getReceiver]].
 */
class RadioInputDStream(
    ssc: StreamingContext,
    radioUrl: String,
    audioType: String,
    locale: String,
    subscriptionKey: String,
    speechType: String,
    outputFormat: String,
    storageLevel: StorageLevel)
  extends ReceiverInputDStream[RadioTranscription](ssc) {

  /** Logs the creation and returns a new `TranscriptionReceiver`. */
  override def getReceiver(): Receiver[RadioTranscription] = {
    logDebug("Creating radio transcription receiver")
    val receiver: Receiver[RadioTranscription] = new TranscriptionReceiver(
      radioUrl, audioType, locale, subscriptionKey, speechType, outputFormat, storageLevel)
    receiver
  }
}
开发者ID:CatalystCode,项目名称:project-fortis-spark,代码行数:23,代码来源:RadioInputDStream.scala
示例12: PollingSource
//设置package包名称以及导入依赖的类
package com.github.btmorr
package tutorials.spark.stream
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.receiver.Receiver
/**
 * Dummy receiver that stores the given `uri` string over and over, pausing
 * between emissions.
 *
 * @param sleepSeconds pause between two emissions, in seconds
 * @param uri          value stored on every iteration
 */
class PollingSource(sleepSeconds: Int, uri: String) extends Receiver[String](StorageLevel.MEMORY_AND_DISK_2) {

  /** Starts the emitting thread; returns immediately. */
  def onStart(): Unit = {
    new Thread("Dummy Source") {
      override def run(): Unit = receive()
    }.start()
  }

  /**
   * Nothing to do: the thread running receive() ends on its own once
   * isStopped() returns true.
   */
  def onStop(): Unit = {}

  /** Emits `uri` and sleeps, repeatedly, until the receiver is stopped. */
  private def receive(): Unit = {
    // Multiply as Long: `sleepSeconds * 1000` overflows Int for values above
    // 2147483 and would hand Thread.sleep a wrong (possibly negative) duration.
    val pauseMillis = sleepSeconds * 1000L
    while (!isStopped()) {
      store(uri) // this just takes whatever the uri is and emits it over and over
      Thread.sleep(pauseMillis)
    }
  }
}
开发者ID:btmorr,项目名称:scala-boost,代码行数:29,代码来源:PollingSource.scala
示例13: BingReceiver
//设置package包名称以及导入依赖的类
package com.github.catalystcode.fortis.spark.streaming.bing
import com.github.catalystcode.fortis.spark.streaming.bing.client.BingClient
import com.github.catalystcode.fortis.spark.streaming.bing.dto.BingPost
import com.github.catalystcode.fortis.spark.streaming.{PollingReceiver, PollingSchedule}
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.dstream.ReceiverInputDStream
import org.apache.spark.streaming.receiver.Receiver
/**
 * Polling receiver that loads Bing postings from a client and stores those
 * whose `dateLastCrawled` is greater than that of the newest posting stored so far.
 *
 * @param client          source of Bing postings
 * @param pollingSchedule forwarded to [[PollingReceiver]]
 * @param storageLevel    forwarded to [[PollingReceiver]]
 * @param pollingWorkers  forwarded to [[PollingReceiver]]
 */
private class BingReceiver(
  client: BingClient,
  pollingSchedule: PollingSchedule,
  storageLevel: StorageLevel,
  pollingWorkers: Int
) extends PollingReceiver[BingPost](pollingSchedule, pollingWorkers, storageLevel) with Logger {

  // Largest dateLastCrawled stored so far; starts below every possible value.
  @volatile private var lastIngestedDate = Long.MinValue

  /** Loads postings, keeps the ones newer than the watermark and stores them. */
  override protected def poll(): Unit = {
    val fresh = client.loadNewPostings.filter { post =>
      logDebug(s"Received Bing result ${post.name} from time ${post.dateLastCrawled}")
      isNew(post)
    }

    fresh.foreach { post =>
      logInfo(s"Storing bing result ${post.url}")
      store(post)
      markStored(post)
    }
  }

  /** True when the posting's `dateLastCrawled` exceeds the current watermark. */
  private def isNew(item: BingPost) =
    item.dateLastCrawled.toLong > lastIngestedDate

  /** Raises the watermark to the posting's `dateLastCrawled` when that is larger. */
  private def markStored(item: BingPost): Unit = {
    val crawledAt = item.dateLastCrawled.toLong
    if (isNew(item)) {
      lastIngestedDate = crawledAt
      logDebug(s"Updating last ingested date to ${item.dateLastCrawled}")
    }
  }
}
/**
 * Receiver-based input stream of [[BingPost]]s.
 *
 * All constructor arguments except the streaming context are handed to the
 * `BingReceiver` created in [[getReceiver]].
 */
class BingInputDStream(
    ssc: StreamingContext,
    client: BingClient,
    pollingSchedule: PollingSchedule,
    pollingWorkers: Int,
    storageLevel: StorageLevel)
  extends ReceiverInputDStream[BingPost](ssc) {

  /** Logs the creation and returns a new `BingReceiver`. */
  override def getReceiver(): Receiver[BingPost] = {
    logDebug("Creating bing receiver")
    val receiver: Receiver[BingPost] =
      new BingReceiver(client, pollingSchedule, storageLevel, pollingWorkers)
    receiver
  }
}
开发者ID:CatalystCode,项目名称:streaming-bing,代码行数:62,代码来源:BingInputDStream.scala
注:本文中的org.apache.spark.streaming.receiver.Receiver类示例整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。
请发表评论