• 设为首页
  • 点击收藏
  • 手机版
    手机扫一扫访问
    迪恩网络手机版
  • 关注官方公众号
    微信扫一扫关注
    公众号

Scala MqttMessage类代码示例

原作者: [db:作者] 来自: [db:来源] 收藏 邀请

本文整理汇总了Scala中org.eclipse.paho.client.mqttv3.MqttMessage的典型用法代码示例。如果您正苦于以下问题:Scala MqttMessage类的具体用法?Scala MqttMessage怎么用?Scala MqttMessage使用的例子?那么恭喜您,这里精选的类代码示例或许可以为您提供帮助。



在下文中一共展示了MqttMessage类的10个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Scala代码示例。

示例1: Publisher

//设置package包名称以及导入依赖的类
import org.eclipse.paho.client.mqttv3.persist.MqttDefaultFilePersistence
import org.eclipse.paho.client.mqttv3.{MqttClient, MqttMessage}

import scala.util.{Failure, Random, Success, Try}


/**
 * Demo publisher: once per second, publishes a random temperature reading
 * (20..29) to the "temp" topic and a random luminosity reading (640..679) to
 * the "light" topic of the configured MQTT broker.
 *
 * The publishing loop never terminates normally; it only ends when an
 * exception is thrown, which is then reported on stdout.
 */
object Publisher extends App {
  // Global information
  val name = "apdl-case-study-from-scratch"

  // MQTT Server info to publish
  val brokerURL = "tcp://mosquitto-mqtt:1883"
  val topicTemperature = "temp"
  val topicLuminosity = "light"
  val persistence = new MqttDefaultFilePersistence("tmp")

  Try {
    val client = new MqttClient(brokerURL, MqttClient.generateClientId(), persistence)
    try {
      client.connect()

      val topicTemp = client.getTopic(topicTemperature)
      val topicLight = client.getTopic(topicLuminosity)

      while (true) {
        val temp = Random.nextInt(10) + 20
        val light = Random.nextInt(40) + 640
        val msgTemp = new MqttMessage(s"$temp".getBytes)
        val msgLight = new MqttMessage(s"$light".getBytes)

        // Both log lines print the configured topic name.
        println(s"Publish $temp to $topicTemperature")
        println(s"Publish $light to $topicLuminosity")

        topicTemp.publish(msgTemp)
        topicLight.publish(msgLight)
        Thread.sleep(1000)
      }
    } finally {
      // Best-effort cleanup: a failure while releasing the client must not
      // mask the exception that ended the publishing loop.
      Try {
        if (client.isConnected) client.disconnect()
        client.close()
      }
    }
  } match {
    case Failure(exception) => println(s"ERROR : $exception + ${exception.getCause}")
    case Success(_) => println(s"OK !")
  }
}
开发者ID:SnipyJulmy,项目名称:APDL-from-scratch-mqtt,代码行数:48,代码来源:Publisher.scala


示例2: MQTTClient

//设置package包名称以及导入依赖的类
package com.murraywilliams

import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence
import org.eclipse.paho.client.mqttv3.MqttClient
import org.eclipse.paho.client.mqttv3.MqttConnectOptions
import org.eclipse.paho.client.mqttv3.MqttMessage

/**
 * Lazily-connected MQTT client used to forward values to a broker.
 *
 * The broker URL is read from `mqtt.url` and the topic prefix from
 * `mqtt.topic` in `XBeeReceiver.conf`.
 */
object MQTTClient {

  private[this] val logger = org.log4s.getLogger

  /**
   * The connected client, or `None` when the connection attempt failed.
   * Evaluated once, on first access.
   */
  lazy val mqttClient: Option[MqttClient] = {
    import XBeeReceiver.conf
    import scala.util.control.NonFatal

    try {
      val memoryPersistence = new MemoryPersistence()
      val brokerUrl = conf.getString("mqtt.url")
      val client = new MqttClient(brokerUrl, "xbeeSensorsClientId", memoryPersistence)
      val opts = new MqttConnectOptions()
      opts.setCleanSession(true)
      logger.info(s"Connecting to broker ${brokerUrl}...")
      client.connect(opts)
      logger.info("Connected")
      Some(client)
    } catch {
      // Only recover from non-fatal errors; OutOfMemoryError, InterruptedException
      // and similar must propagate instead of being logged and swallowed.
      case NonFatal(t) =>
        logger.error(t)("Could not connect to MQTT server.")
        None
    }
  }

  /** Prefix prepended to every topic suffix passed to [[mqttSend]]. */
  lazy val mqttTopicRoot = XBeeReceiver.conf.getString("mqtt.topic")

  /**
   * Publishes `message` on `<mqttTopicRoot>/<topicSuffix>`.
   *
   * Strings are sent as their bytes, byte arrays are sent unchanged, and any
   * other value is sent as the bytes of its `toString`. When there is no
   * client, or the client is not currently connected, nothing is sent and a
   * warning is logged.
   *
   * @param topicSuffix last part of the topic, appended to [[mqttTopicRoot]]
   * @param message     value to publish
   */
  def mqttSend(topicSuffix: String, message: Any): Unit = {
    val payload = message match {
      case str: String => str.getBytes
      case byteArray: Array[Byte] => byteArray
      case other => other.toString.getBytes
    }
    mqttClient match {
      case Some(client) if client.isConnected =>
        client.publish(s"${mqttTopicRoot}/${topicSuffix}", new MqttMessage(payload))
      // Covers both None and a client that lost its connection; previously the
      // latter fell through the match and raised a MatchError.
      case _ =>
        logger.warn("No connection to MQTT broker. Not sending any data.")
    }
  }

}
开发者ID:murraytodd,项目名称:xbeeToMQTT,代码行数:48,代码来源:MQTTClient.scala


示例3: MQTTPublisher

//设置package包名称以及导入依赖的类
package com.knoldus

import com.typesafe.config.ConfigFactory
import org.eclipse.paho.client.mqttv3.{MqttClient, MqttException, MqttMessage}
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence

import scala.sys.process._



/**
 * Publishes the CPU temperature reported by `vcgencmd measure_temp` to the
 * "TemperatureEvent" topic once per second.
 *
 * Broker host and port are read from `mosquitto-server.url` and
 * `mosquitto-server.port` in the loaded configuration.
 */
object MQTTPublisher extends App {

  val config = ConfigFactory.load()
  val url = config.getString("mosquitto-server.url")
  val port = config.getInt("mosquitto-server.port")

  /**
   * Connects to the broker and publishes a fresh reading every second until
   * an exception occurs. `MqttException`s are printed; the client is
   * disconnected on the way out if it was connected.
   */
  def publishToserver(): Unit = {
    println("Hey I am publishing")
    val brokerUrl = s"tcp://$url:$port"
    val topic = "TemperatureEvent"
    val tempCommand = "/opt/vc/bin/vcgencmd measure_temp"
    // Runs the external command on every call, so each message carries a
    // current timestamp and reading.
    def getMessage = s"Temperature of CPU at ${System.currentTimeMillis()} is ${tempCommand.!!.split("=")(1)} "
    val persistence = new MemoryPersistence
    try {
      val client = new MqttClient(brokerUrl, MqttClient.generateClientId, persistence)
      try {
        client.connect()
        val msgTopic = client.getTopic(topic)
        while (true) {
          // Build the message inside the loop; building it once up front
          // republished the same stale reading forever.
          val message = new MqttMessage(getMessage.getBytes("utf-8"))
          msgTopic.publish(message)
          println(s"Publishing the data topic ${msgTopic.getName} message: ${message.toString}")
          Thread.sleep(1000)
        }
      } finally {
        // Only disconnect a client that actually connected.
        if (client.isConnected) client.disconnect()
      }
    } catch {
      case exception: MqttException => println(s"ExceptionOccured:$exception ")
    }
  }

  publishToserver()
}
开发者ID:shiv4nsh,项目名称:scala-mqtt-client-rasberrypi-starter-kit,代码行数:46,代码来源:MQTTPublisher.scala


示例4: MqttDispatchingListener

//设置package包名称以及导入依赖的类
package mqtt

import akka.actor.{ActorPath, ActorSystem}
import mqtt.MqttListenerMessage.ConsumeMessage
import org.eclipse.paho.client.mqttv3.{IMqttDeliveryToken, MqttCallback, MqttMessage}
import play.api.Logger


/**
 * MQTT callback that forwards every received message to the registered actors.
 *
 * Delivery-complete and connection-lost notifications are ignored.
 *
 * @param actorSystem actor system used to resolve the registered actor paths
 */
class MqttDispatchingListener(actorSystem: ActorSystem) extends MqttCallback {
  var listeners: List[ActorPath] = List.empty

  /** Registers `listener`; the newest registration is placed at the head of the list. */
  def addListener(listener: ActorPath): Unit =
    listeners = listener :: listeners

  override def connectionLost(cause: Throwable): Unit = ()

  override def deliveryComplete(token: IMqttDeliveryToken): Unit = ()

  /** Decodes the payload to a string, logs it and sends a `ConsumeMessage` to each listener. */
  override def messageArrived(topic: String, message: MqttMessage): Unit = {
    val payloadText = new String(message.getPayload)
    Logger.debug(s"Mqtt topic $topic received $payloadText")

    for (path <- listeners) {
      val target = actorSystem.actorSelection(path)
      target ! ConsumeMessage(topic, payloadText)
    }
  }
}

/** Messages exchanged with actors that listen for MQTT traffic. */
object MqttListenerMessage {
  /** Parameterless message; not referenced anywhere else in the visible code. */
  case class Ping()
  /**
   * Sent to each registered listener by `MqttDispatchingListener.messageArrived`.
   *
   * @param receivedTopic topic the MQTT message arrived on
   * @param message       message payload decoded to a string
   */
  case class ConsumeMessage(receivedTopic: String, message: String)
} 
开发者ID:vavravl1,项目名称:home_center,代码行数:31,代码来源:MqttDispatchingListener.scala


示例5: MqttSinkAppA

//设置package包名称以及导入依赖的类
package org.apress.prospark

import java.nio.charset.StandardCharsets

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.streaming.Seconds
import org.apache.spark.streaming.StreamingContext
import org.eclipse.paho.client.mqttv3.MqttClient
import org.eclipse.paho.client.mqttv3.MqttMessage
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence
import org.json4s.DefaultFormats
import org.json4s.JField
import org.json4s.JsonAST.JObject
import org.json4s.jvalue2extractable
import org.json4s.jvalue2monadic
import org.json4s.native.JsonMethods.parse
import org.json4s.string2JsonInput

/**
 * Spark Streaming job that polls a stock-quote HTTP endpoint, flattens every
 * quote into a comma-separated line and publishes each line to an MQTT topic.
 */
object MqttSinkAppA {

  /**
   * Entry point.
   *
   * @param args exactly three values: application name, output broker URL, topic
   */
  def main(args: Array[String]): Unit = {
    if (args.length != 3) {
      System.err.println(
        "Usage: MqttSinkApp <appname> <outputBrokerUrl> <topic>")
      System.exit(1)
    }

    val Seq(appName, outputBrokerUrl, topic) = args.toSeq

    val conf = new SparkConf()
      .setAppName(appName)
      .setJars(SparkContext.jarOfClass(this.getClass).toSeq)

    val batchInterval = 10

    val ssc = new StreamingContext(conf, Seconds(batchInterval))

    HttpUtils.createStream(ssc, url = "https://query.yahooapis.com/v1/public/yql?q=select%20*%20from%20yahoo.finance.quotes%20where%20symbol%20in%20(%22IBM,GOOG,MSFT,AAPL,FB,ORCL,YHOO,TWTR,LNKD,INTC%22)%0A%09%09&format=json&diagnostics=true&env=http%3A%2F%2Fdatatables.org%2Falltables.env",
      interval = batchInterval)
      .flatMap(rec => {
        val query = parse(rec) \ "query"
        // Prefix every quote with the creation timestamp of the query.
        ((query \ "results" \ "quote").children).map(rec => JObject(JField("Timestamp", query \ "created")).merge(rec))
      })
      .map(rec => {
        implicit val formats = DefaultFormats
        rec.children.map(f => f.extract[String]) mkString ","
      })
      .foreachRDD { rdd =>
        // The client must be created where the records are processed. Creating
        // it here on the driver and using it inside rdd.foreach makes Spark try
        // to serialize the MqttClient, which fails with "Task not serializable".
        // One client per partition keeps the connection cost amortized.
        rdd.foreachPartition { records =>
          if (records.nonEmpty) {
            val client = new MqttClient(outputBrokerUrl, MqttClient.generateClientId(), new MemoryPersistence())
            try {
              client.connect()
              records.foreach(rec => client.publish(topic, new MqttMessage(rec.getBytes(StandardCharsets.UTF_8))))
            } finally {
              // Release the connection even when publishing fails part-way.
              if (client.isConnected) client.disconnect()
              client.close()
            }
          }
        }
      }

    ssc.start()
    ssc.awaitTermination()
  }

}
开发者ID:ZubairNabi,项目名称:prosparkstreaming,代码行数:62,代码来源:L6-5Exception.scala


示例6: MsgToElastic

//设置package包名称以及导入依赖的类
package Birdcage


import java.util.Date
import com.sksamuel.elastic4s.ElasticDsl._
import akka.actor.Actor
import com.sksamuel.elastic4s.{ElasticClient, IndexAndType, IndexAndTypes, IndexResult}
import com.typesafe.scalalogging.LazyLogging
import org.eclipse.paho.client.mqttv3.MqttMessage
import scala.concurrent.ExecutionContext.Implicits.global
import concurrent.Future
import scala.util.{Failure, Success}

/** Message protocol of the `MsgToElastic` actor. */
object MsgToElastic {
  /**
   * Request to index one MQTT message.
   *
   * @param topic       topic the message arrived on
   * @param message     the received MQTT message
   * @param esClient    Elasticsearch client used to execute the index request
   * @param indexTypeEs pair whose elements are passed, lower-cased, as the first and
   *                    second arguments of `IndexAndType` (presumably index name and
   *                    type name — confirm against elastic4s)
   */
  case class MsgMqtt(topic: String, message: MqttMessage, esClient: ElasticClient, indexTypeEs: Tuple2[String, String])
}

/**
 * Actor that indexes incoming MQTT messages into Elasticsearch.
 *
 * Handles `MsgMqtt` requests; any other message is logged at debug level and
 * ignored.
 */
class MsgToElastic extends Actor with LazyLogging {

  import MsgToElastic._

  def receive = {
    case MsgMqtt(topic, message, esClient, indexTypeEs) => sentToElastic(topic, message, esClient, indexTypeEs)
    case _ => logger.debug("MsgActor received unsupported message")
  }

  /**
   * Indexes one message and logs the outcome once the request completes.
   *
   * The topic is split on '/': the first segment (lower-cased) is stored as
   * "postcode", the second as "huisnummer" and the last as "subject". The
   * message's string form is stored as "value", together with the current
   * time as "timestamp".
   *
   * @param topic       topic the message arrived on
   * @param message     the received MQTT message
   * @param esClient    client used to execute the index request
   * @param indexTypeEs pair passed, lower-cased, to `IndexAndType`
   */
  def sentToElastic(topic: String, message: MqttMessage, esClient: ElasticClient, indexTypeEs: Tuple2[String, String]): Unit = {

    val indexType = new IndexAndType(indexTypeEs._1.toLowerCase, indexTypeEs._2.toLowerCase())
    val messageVal = s"$message"
    val timestamp = new Date().toString

    val resp: Future[IndexResult] = {
      esClient.execute {
        index into IndexAndTypes.apply(indexType)  fields(
          "postcode"   -> topic.takeWhile(_ != '/').toLowerCase(),
          "huisnummer" -> topic.dropWhile(_ != '/').drop(1).takeWhile(_ != '/'),
          "timestamp"  -> timestamp,
          "subject"    -> topic.reverse.takeWhile(_ != '/').reverse,
          "value"      -> messageVal
          )
      }
    }
    resp.onComplete {
      case Success(res) =>
        // Log the actual index result rather than the enclosing Future.
        val bufferString = "Msg to Es : %s, %s, %s\n %s\n".format(timestamp, topic, messageVal, res)
        logger.info(bufferString)
      case Failure(e) =>
        // Keep the cause so that indexing failures can be diagnosed.
        logger.error("An error has occured while sending to Elastic", e)
    }
  }
}
开发者ID:TreeWIFI,项目名称:aviary,代码行数:54,代码来源:MsgToElastic.scala


示例7: notify

//设置package包名称以及导入依赖的类
package io.hydrosphere.mist.master

import io.hydrosphere.mist.Messages.StatusMessages.SystemEvent
import io.hydrosphere.mist.master.interfaces.JsonCodecs
import io.hydrosphere.mist.master.interfaces.async.kafka.TopicProducer
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence
import org.eclipse.paho.client.mqttv3.{MqttAsyncClient, MqttConnectOptions, MqttMessage}

/** Sink for system events; implementations forward each event to an external system. */
trait JobEventPublisher {

  /** Publishes a single event. */
  def notify(event: SystemEvent): Unit

  /** Releases any resources held by this publisher. */
  def close(): Unit

}

/** Factory methods for the supported [[JobEventPublisher]] back-ends. */
object JobEventPublisher {

  import JsonCodecs._
  import spray.json._

  /**
   * Creates a publisher that sends every event, as JSON, to a Kafka topic.
   *
   * @param host  Kafka host
   * @param port  Kafka port
   * @param topic destination topic
   */
  def forKafka(host: String, port: Int, topic: String): JobEventPublisher = {
    new JobEventPublisher {
      val producer = TopicProducer(host, port, topic)

      override def notify(event: SystemEvent): Unit = {
        val json = event.toJson
        producer.send("", json)
      }

      // NOTE(review): the producer is never released here; TopicProducer's API
      // is not visible from this file — confirm whether it needs closing.
      override def close(): Unit = {}
    }
  }

  /**
   * Creates a publisher that sends every event, as JSON, to an MQTT topic.
   *
   * The connection is started immediately but established asynchronously;
   * `notify` waits for it to complete before publishing.
   *
   * @param host  broker host
   * @param port  broker port
   * @param topic destination topic
   */
  def forMqtt(host: String, port: Int, topic: String): JobEventPublisher = {
    new JobEventPublisher {

      val client = {
        val persistence = new MemoryPersistence
        val uri = s"tcp://$host:$port"
        new MqttAsyncClient(uri, MqttAsyncClient.generateClientId(), persistence)
      }

      // MqttAsyncClient.connect only *starts* the handshake. Keep the token so
      // that publishing and closing can wait for the attempt to finish.
      private val connectToken = {
        val opt = new MqttConnectOptions
        opt.setCleanSession(true)
        client.connect(opt)
      }

      override def notify(event: SystemEvent): Unit = {
        // Returns immediately once connected; rethrows the connection failure
        // otherwise, instead of failing with "client is not connected".
        connectToken.waitForCompletion()
        val json = event.toJson
        val message = new MqttMessage(json.getBytes())
        client.publish(topic, message)
      }

      override def close(): Unit = {
        // Let an in-flight connection attempt settle; its outcome is irrelevant here.
        scala.util.Try(connectToken.waitForCompletion())
        // Paho refuses to close a client that is still connected.
        if (client.isConnected) client.disconnect().waitForCompletion()
        client.close()
      }
    }
  }

}
开发者ID:Hydrospheredata,项目名称:mist,代码行数:61,代码来源:JobEventPublisher.scala


示例8: MQTTPublisher

//设置package包名称以及导入依赖的类
import com.typesafe.config.ConfigFactory
import org.eclipse.paho.client.mqttv3.{MqttClient, MqttException, MqttMessage}
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence

import scala.sys.process._
import scala.util.Try



/**
 * Publishes the CPU temperature reported by `vcgencmd measure_temp` to the
 * "TemperatureEvent" topic once per second.
 *
 * Broker host and port come from `mosquitto-server.url` and
 * `mosquitto-server.port`; when a setting cannot be read, "localhost" and
 * 1883 are used instead.
 */
object MQTTPublisher extends App {

  // Load the configuration once; a failure to load falls back to the defaults.
  private val loadedConfig = Try(ConfigFactory.load())

  // getOrElse supplies the default. Option.fold needs a second argument list
  // (the mapping function), so fold("localhost") did not produce a String.
  val url = loadedConfig.flatMap(c => Try(c.getString("mosquitto-server.url"))).getOrElse("localhost")
  val port = loadedConfig.flatMap(c => Try(c.getInt("mosquitto-server.port"))).getOrElse(1883)

  /**
   * Connects to the broker and publishes a fresh reading every second until
   * an exception occurs. `MqttException`s are printed; the client is
   * disconnected on the way out if it was connected.
   */
  def publishToserver(): Unit = {
    println("Hey I am publishing")
    val brokerUrl = s"tcp://$url:$port"
    val topic = "TemperatureEvent"
    val tempCommand = "/opt/vc/bin/vcgencmd measure_temp"
    // Runs the external command on every call, so each message carries a
    // current timestamp and reading.
    def getMessage = s"Temperature of CPU at ${System.currentTimeMillis()} is ${tempCommand.!!.split("=")(1)} "
    val persistence = new MemoryPersistence
    try {
      val client = new MqttClient(brokerUrl, MqttClient.generateClientId, persistence)
      try {
        client.connect()
        val msgTopic = client.getTopic(topic)
        while (true) {
          // Build the message inside the loop; building it once up front
          // republished the same stale reading forever.
          val message = new MqttMessage(getMessage.getBytes("utf-8"))
          msgTopic.publish(message)
          println(s"Publishing the data topic ${msgTopic.getName} message: ${message.toString}")
          Thread.sleep(1000)
        }
      } finally {
        // Only disconnect a client that actually connected.
        if (client.isConnected) client.disconnect()
      }
    } catch {
      case exception: MqttException => println(s"ExceptionOccured:$exception ")
    }
  }

  publishToserver()
}
开发者ID:shiv4nsh,项目名称:raspi-spark-streaming-mqtt,代码行数:44,代码来源:MQTTPublisher.scala


示例9: PublishService

//设置package包名称以及导入依赖的类
package service

import com.google.inject.ImplementedBy
import org.eclipse.paho.client.mqttv3.{MqttClient, MqttMessage}
import org.eclipse.paho.client.mqttv3.persist.MqttDefaultFilePersistence

import scala.concurrent.Future
import scala.util.{Failure, Success, Try}


/** Publishes text messages to the "/set" sub-topic of the configured topic. */
object PublishService {
  val client = new MqttClient(brokerUrl, MqttClient.generateClientId(), persistence)

  /**
   * Connects, publishes `message` as UTF-8 to `<topic>/set`, then disconnects.
   *
   * @param message text to publish
   * @return `true` when the message was handed to the client successfully,
   *         `false` when resolving the topic or publishing failed (the failure is logged)
   */
  def publish(message: String): Boolean = {
    // Shared failure path: log the cause and report the attempt as unsuccessful.
    def reportFailure(cause: Throwable): Boolean = {
      play.Logger.error(
        s"""
           |An error occurred while trying to publish the message "$message" to $topic into $brokerUrl:
         """.stripMargin, cause)
      false
    }

    client.connect()
    val outcome = Try(client.getTopic(topic+"/set")) match {
      case Failure(cause) => reportFailure(cause)
      case Success(target) =>
        val payload = new MqttMessage(message.getBytes("utf-8"))
        Try(target.publish(payload)) match {
          case Failure(cause) => reportFailure(cause)
          case Success(token) =>
            play.Logger.debug(s"Result of publishing: ${token.getMessage}")
            true
        }
    }
    client.disconnect()
    outcome
  }
}
开发者ID:camilosampedro,项目名称:lights-on,代码行数:40,代码来源:PublishService.scala


示例10: SubscribeService

//设置package包名称以及导入依赖的类
package service

import controllers.socket.LightSocketActor
import model.Message
import org.eclipse.paho.client.mqttv3.{IMqttDeliveryToken, MqttCallback, MqttClient, MqttMessage}


/** Subscribes to the configured MQTT topic and relays traffic to the light socket actor. */
object SubscribeService {
  val client = new MqttClient(brokerUrl, MqttClient.generateClientId(), persistence)

  /**
   * Installs the callback, connects to the broker and subscribes to the topic.
   *
   * Every received payload, and a "Connection lost" notice when the connection
   * drops, is forwarded through `LightSocketActor.sendMessage`.
   */
  def subscribe(): Unit = {
    val callback = new MqttCallback {
      override def deliveryComplete(token: IMqttDeliveryToken): Unit = {
        play.Logger.debug("Delivery complete!")
      }

      override def connectionLost(cause: Throwable): Unit = {
        play.Logger.debug("Connection to socket lost")
        LightSocketActor.sendMessage(Message("Connection lost"))
      }

      override def messageArrived(topic: String, message: MqttMessage): Unit = {
        // Decode once and reuse for both the log line and the relayed message.
        val payload = new String(message.getPayload)
        play.Logger.debug(s"A message arrived: $payload")
        LightSocketActor.sendMessage(Message(payload))
      }
    }

    // Register the callback before connecting and subscribing; otherwise
    // messages delivered right after subscribe (e.g. retained ones) arrive
    // while no callback is installed and are lost.
    client.setCallback(callback)
    client.connect()

    // TODO: Change topic
    client.subscribe(topic)
  }
}
开发者ID:camilosampedro,项目名称:lights-on,代码行数:36,代码来源:SubscribeService.scala



注:本文中的org.eclipse.paho.client.mqttv3.MqttMessage类示例整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。


鲜花

握手

雷人

路过

鸡蛋
该文章已有0人参与评论

请发表评论

全部评论

专题导读
上一篇:
Scala OAuthAuthorization类代码示例发布时间:2022-05-23
下一篇:
Scala ClassUtils类代码示例发布时间:2022-05-23
热门推荐
热门话题
阅读排行榜

扫描微信二维码

查看手机版网站

随时了解更新最新资讯

139-2527-9053

在线客服(服务时间 9:00~18:00)

在线QQ客服
地址:深圳市南山区西丽大学城创智工业园
电邮:jeky_zhao#qq.com
移动电话:139-2527-9053

Powered by 互联科技 X3.4© 2001-2213 极客世界.|Sitemap