本文整理汇总了Scala中org.eclipse.paho.client.mqttv3.persist.MemoryPersistence类的典型用法代码示例。如果您正苦于以下问题:Scala MemoryPersistence类的具体用法?Scala MemoryPersistence怎么用?Scala MemoryPersistence使用的例子?那么恭喜您, 这里精选的类代码示例或许可以为您提供帮助。
在下文中一共展示了MemoryPersistence类的9个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Scala代码示例。
示例1: MQTTClient
//设置package包名称以及导入依赖的类
package com.murraywilliams
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence
import org.eclipse.paho.client.mqttv3.MqttClient
import org.eclipse.paho.client.mqttv3.MqttConnectOptions
import org.eclipse.paho.client.mqttv3.MqttMessage
object MQTTClient {
private[this] val logger = org.log4s.getLogger
lazy val mqttClient = {
import XBeeReceiver.conf
try {
val memoryPersistence = new MemoryPersistence()
val brokerUrl = conf.getString("mqtt.url")
val client = new MqttClient(brokerUrl, "xbeeSensorsClientId", memoryPersistence)
val opts = new MqttConnectOptions()
opts.setCleanSession(true)
logger.info(s"Connecting to broker ${brokerUrl}...")
client.connect(opts)
logger.info("Connected")
Some(client)
} catch {
case t : Throwable => {
logger.error(t)("Could not connect to MQTT server.")
None
}
}
}
lazy val mqttTopicRoot = XBeeReceiver.conf.getString("mqtt.topic")
def mqttSend(topicSuffix : String, message : Any) = {
val msg = message match {
case str : String => str.getBytes
case byteArray : Array[Byte] => byteArray
case _ => message.toString.getBytes
}
mqttClient match {
case Some(client) if client.isConnected => client.publish(s"${mqttTopicRoot}/${topicSuffix}",new MqttMessage(msg))
case None => logger.warn("No connection to MQTT broker. Not sending any data.")
}
}
}
开发者ID:murraytodd,项目名称:xbeeToMQTT,代码行数:48,代码来源:MQTTClient.scala
示例2: MQTTSubscriber
//设置package包名称以及导入依赖的类
package com.knoldus
import com.typesafe.config.ConfigFactory
import org.eclipse.paho.client.mqttv3._
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence
object MQTTSubscriber extends App {
val config = ConfigFactory.load()
val url = config.getString("mosquitto-server.url")
val port = config.getInt("mosquitto-server.port")
def subscribeToCommands() {
val brokerUrl = s"tcp://$url:$port"
val topic = "TemperatureEvent"
val persistence = new MemoryPersistence
val client = new MqttClient(brokerUrl, MqttClient.generateClientId, persistence)
client.connect
client.subscribe(topic)
val callback = new MqttCallBackImpl
client.setCallback(callback)
}
subscribeToCommands
}
class MqttCallBackImpl extends MqttCallback {
override def messageArrived(topic: String, message: MqttMessage): Unit = {
println(s"Receiving Data | Topic : $topic | Message : $message")
}
override def connectionLost(cause: Throwable): Unit = {
println(cause.toString)
}
override def deliveryComplete(token: IMqttDeliveryToken): Unit = {
println(s"Delivered Message :${token.getMessage}")
}
}
开发者ID:shiv4nsh,项目名称:scala-mqtt-client-rasberrypi-starter-kit,代码行数:44,代码来源:MQTTSubscriber.scala
示例3: MQTTPublisher
//设置package包名称以及导入依赖的类
package com.knoldus
import com.typesafe.config.ConfigFactory
import org.eclipse.paho.client.mqttv3.{MqttClient, MqttException, MqttMessage}
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence
import scala.sys.process._
object MQTTPublisher extends App {
val config = ConfigFactory.load()
val url = config.getString("mosquitto-server.url")
val port = config.getInt("mosquitto-server.port")
def publishToserver() = {
println("Hey I am publishing")
val brokerUrl = s"tcp://$url:$port"
val topic = "TemperatureEvent"
val tempCommand = "/opt/vc/bin/vcgencmd measure_temp"
def getMessage = s"Temperature of CPU at ${System.currentTimeMillis()} is ${tempCommand.!!.split("=")(1)} "
var client: MqttClient = null
val persistence = new MemoryPersistence
try {
client = new MqttClient(brokerUrl, MqttClient.generateClientId, persistence)
client.connect()
val msgTopic = client.getTopic(topic)
val message = new MqttMessage(getMessage.getBytes("utf-8"))
while (true) {
msgTopic.publish(message)
println(s"Publishing the data topic ${msgTopic.getName} message: ${message.toString}")
Thread.sleep(1000)
}
}
catch {
case exception: MqttException => println(s"ExceptionOccured:$exception ")
}
finally {
client.disconnect()
}
}
publishToserver
}
开发者ID:shiv4nsh,项目名称:scala-mqtt-client-rasberrypi-starter-kit,代码行数:46,代码来源:MQTTPublisher.scala
示例4: MQTTSubscriber
//设置package包名称以及导入依赖的类
package services
import javax.inject._
import play.Configuration
import play.api.Logger
import play.api.inject.ApplicationLifecycle
import scala.concurrent.Future
import org.eclipse.paho.client.mqttv3._
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence
@Singleton
class MQTTSubscriber @Inject() (config: Configuration, appLifecycle: ApplicationLifecycle) {
val url = config.getString("mqtt-broker.url")
val port = config.getInt("mqtt-broker.port")
val brokerUrl = s"tcp://$url:$port"
val topics = Array("Lobby", "DeviceStat")
val persistence = new MemoryPersistence
try {
val client = new MqttClient(brokerUrl, MqttClient.generateClientId, persistence)
client.connect()
client.subscribe(topics)
val callback = new MqttCallBackImpl
client.setCallback(callback)
// When the application starts, register a stop hook with the
// ApplicationLifecycle object. The code inside the stop hook will
// be run when the application stops.
appLifecycle.addStopHook { () =>
client.disconnect()
client.close()
Logger.info(s"MQTT Subscriber: Closing MQTT connection...")
Future.successful(())
}
} catch {
case me: MqttException => {
Logger.info(s"MqttException: " + me.getReasonCode())
Logger.info(me.getMessage)
}
}
Logger.info(s"MQTT Subscriber: Connected to broker at $url")
}
class MqttCallBackImpl extends MqttCallback {
override def messageArrived(topic: String, message: MqttMessage): Unit = {
Logger.info(s"Receiving Data | Topic : $topic | Message : $message")
}
override def connectionLost(cause: Throwable): Unit = {
Logger.info(cause.toString)
}
override def deliveryComplete(token: IMqttDeliveryToken): Unit = {
Logger.info(s"Delivered Message :${token.getMessage}")
}
}
开发者ID:istanleyy,项目名称:play-w-mqtt,代码行数:60,代码来源:MQTTSubscriber.scala
示例5: MqttSinkAppA
//设置package包名称以及导入依赖的类
package org.apress.prospark
import java.nio.charset.StandardCharsets
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.streaming.Seconds
import org.apache.spark.streaming.StreamingContext
import org.eclipse.paho.client.mqttv3.MqttClient
import org.eclipse.paho.client.mqttv3.MqttMessage
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence
import org.json4s.DefaultFormats
import org.json4s.JField
import org.json4s.JsonAST.JObject
import org.json4s.jvalue2extractable
import org.json4s.jvalue2monadic
import org.json4s.native.JsonMethods.parse
import org.json4s.string2JsonInput
object MqttSinkAppA {
def main(args: Array[String]) {
if (args.length != 3) {
System.err.println(
"Usage: MqttSinkApp <appname> <outputBrokerUrl> <topic>")
System.exit(1)
}
val Seq(appName, outputBrokerUrl, topic) = args.toSeq
val conf = new SparkConf()
.setAppName(appName)
.setJars(SparkContext.jarOfClass(this.getClass).toSeq)
val batchInterval = 10
val ssc = new StreamingContext(conf, Seconds(batchInterval))
HttpUtils.createStream(ssc, url = "https://query.yahooapis.com/v1/public/yql?q=select%20*%20from%20yahoo.finance.quotes%20where%20symbol%20in%20(%22IBM,GOOG,MSFT,AAPL,FB,ORCL,YHOO,TWTR,LNKD,INTC%22)%0A%09%09&format=json&diagnostics=true&env=http%3A%2F%2Fdatatables.org%2Falltables.env",
interval = batchInterval)
.flatMap(rec => {
val query = parse(rec) \ "query"
((query \ "results" \ "quote").children).map(rec => JObject(JField("Timestamp", query \ "created")).merge(rec))
})
.map(rec => {
implicit val formats = DefaultFormats
rec.children.map(f => f.extract[String]) mkString ","
})
.foreachRDD { rdd =>
val client = new MqttClient(outputBrokerUrl, MqttClient.generateClientId(), new MemoryPersistence())
client.connect()
rdd.foreach(rec => client.publish(topic, new MqttMessage(rec.getBytes(StandardCharsets.UTF_8))))
client.disconnect()
client.close()
}
ssc.start()
ssc.awaitTermination()
}
}
开发者ID:ZubairNabi,项目名称:prosparkstreaming,代码行数:62,代码来源:L6-5Exception.scala
示例6: notify
//设置package包名称以及导入依赖的类
package io.hydrosphere.mist.master
import io.hydrosphere.mist.Messages.StatusMessages.SystemEvent
import io.hydrosphere.mist.master.interfaces.JsonCodecs
import io.hydrosphere.mist.master.interfaces.async.kafka.TopicProducer
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence
import org.eclipse.paho.client.mqttv3.{MqttAsyncClient, MqttConnectOptions, MqttMessage}
trait JobEventPublisher {
def notify(event: SystemEvent): Unit
def close(): Unit
}
object JobEventPublisher {
import JsonCodecs._
import spray.json._
def forKafka(host: String, port: Int, topic: String): JobEventPublisher = {
new JobEventPublisher {
val producer = TopicProducer(host, port, topic)
override def notify(event: SystemEvent): Unit = {
val json = event.toJson
producer.send("", json)
}
override def close(): Unit = {}
}
}
def forMqtt(host: String, port: Int, topic: String): JobEventPublisher = {
new JobEventPublisher {
val client = {
val persistence = new MemoryPersistence
val uri = s"tcp://$host:$port"
val client = new MqttAsyncClient(uri, MqttAsyncClient.generateClientId(), persistence)
val opt = new MqttConnectOptions
opt.setCleanSession(true)
client.connect(opt)
client
}
override def notify(event: SystemEvent): Unit = {
val json = event.toJson
val message = new MqttMessage(json.getBytes())
client.publish(topic, message)
}
override def close(): Unit = client.close()
}
}
}
开发者ID:Hydrospheredata,项目名称:mist,代码行数:61,代码来源:JobEventPublisher.scala
示例7: MQTTPublisher
//设置package包名称以及导入依赖的类
import com.typesafe.config.ConfigFactory
import org.eclipse.paho.client.mqttv3.{MqttClient, MqttException, MqttMessage}
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence
import scala.sys.process._
import scala.util.Try
object MQTTPublisher extends App {
val url = Try(ConfigFactory.load().getString("mosquitto-server.url")).toOption.fold("localhost")
val port = Try(ConfigFactory.load().getInt("mosquitto-server.port")).toOption.fold(1883)
def publishToserver() = {
println("Hey I am publishing")
val brokerUrl = s"tcp://$url:$port"
val topic = "TemperatureEvent"
val tempCommand = "/opt/vc/bin/vcgencmd measure_temp"
def getMessage = s"Temperature of CPU at ${System.currentTimeMillis()} is ${tempCommand.!!.split("=")(1)} "
var client: MqttClient = null
val persistence = new MemoryPersistence
try {
client = new MqttClient(brokerUrl, MqttClient.generateClientId, persistence)
client.connect()
val msgTopic = client.getTopic(topic)
val message = new MqttMessage(getMessage.getBytes("utf-8"))
while (true) {
msgTopic.publish(message)
println(s"Publishing the data topic ${msgTopic.getName} message: ${message.toString}")
Thread.sleep(1000)
}
}
catch {
case exception: MqttException => println(s"ExceptionOccured:$exception ")
}
finally {
client.disconnect()
}
}
publishToserver
}
开发者ID:shiv4nsh,项目名称:raspi-spark-streaming-mqtt,代码行数:44,代码来源:MQTTPublisher.scala
示例8: MQTTConnector
//设置package包名称以及导入依赖的类
package cer.drtec.IO
import cer.drtec.utils.SerializedLogging
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence
import org.eclipse.paho.client.mqttv3._
private[drtec] case class MQTTConnector(brokerUrl: String, inputTopic: String, outputTopic: String,
username: Option[String], password: Option[String])
extends MqttCallback with SerializedLogging with Serializable{
var client: MQTTClient = null
private[drtec] def connect() {
try {
client = new MQTTClient(brokerUrl, MqttClient.generateClientId(), new MemoryPersistence())
val properties = new MqttConnectOptions()
username match {
case Some(user) => properties.setUserName(user)
case None =>
}
password match {
case Some(pass) => properties.setPassword(pass.toCharArray)
case None =>
}
client.connect(properties)
client.setCallback(this)
client.subscribe(inputTopic)
}
catch{
case e:Throwable =>
error("trying to reconnect to MQTT broker\nURL -> " + brokerUrl + "...")
Thread.sleep(5000)
connect()
}
}
override private[drtec] def connectionLost(cause: Throwable) {
error("trying to reconnect to MQTT broker...")
Thread.sleep(5000)
connect()
}
override private[drtec] def messageArrived(topic: String, message: MqttMessage) = {
val batch = new String(message.getPayload,"utf-8")
InputHandler.mqttData.offer(batch)
}
override private[drtec] def deliveryComplete(token: IMqttDeliveryToken) {}
private[drtec] def publish(message: String){
val mqttMessage = new MqttMessage()
mqttMessage.setPayload(message.getBytes)
client.publish(outputTopic, mqttMessage)
}
}
开发者ID:blackeye42,项目名称:dRTEC,代码行数:58,代码来源:MQTTConnector.scala
示例9: BrokerUtil
//设置package包名称以及导入依赖的类
package com.ubirch.backend.util
import com.ubirch.backend.config.Config
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence
import org.eclipse.paho.client.mqttv3.{MqttClient, MqttConnectOptions}
object BrokerUtil {
def connectBroker(topic: String, qos: Int, clientId: String): MqttClient = {
val broker = Config.mqttBroker
val persistence = new MemoryPersistence()
val mqttClient = new MqttClient(broker, clientId, persistence)
val connOpts = new MqttConnectOptions()
connOpts.setCleanSession(true)
mqttClient.connect(connOpts)
mqttClient
}
}
开发者ID:ubirch,项目名称:ubirch-storage-service,代码行数:21,代码来源:BrokerUtil.scala
注:本文中的org.eclipse.paho.client.mqttv3.persist.MemoryPersistence类示例整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。 |
请发表评论