• 设为首页
  • 点击收藏
  • 手机版
    手机扫一扫访问
    迪恩网络手机版
  • 关注官方公众号
    微信扫一扫关注
    公众号

Scala Channel类代码示例

原作者: [db:作者] 来自: [db:来源] 收藏 邀请

本文整理汇总了Scala中com.rabbitmq.client.Channel的典型用法代码示例。如果您正苦于以下问题:Scala Channel类的具体用法?Scala Channel怎么用?Scala Channel使用的例子?那么恭喜您, 这里精选的类代码示例或许可以为您提供帮助。



在下文中一共展示了Channel类的13个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Scala代码示例。

示例1: MQProducer

//设置package包名称以及导入依赖的类
package mq

import java.io.{ByteArrayOutputStream, ObjectOutputStream}

import com.fasterxml.jackson.databind.ObjectMapper
import twitter4j.Status
import com.rabbitmq.client.{Channel, ConnectionFactory}



object MQProducer {

  private val EXCHANGE_TWITTER = "twitter_exchange"

  def initiateRabbitMQProducer:Channel = {
    val factory = new ConnectionFactory
    factory.setHost("localhost")
    val connection = factory.newConnection
    val channel = connection.createChannel()
    channel.exchangeDeclare(EXCHANGE_TWITTER, "topic")
    channel
  }

  def produce(channel: Channel,topic:String ,status:Status): Unit ={
    println(status.getText)
    channel.basicPublish(EXCHANGE_TWITTER,topic,null,serialise(status))
  }


  def serialise(value: Any): Array[Byte] = {
    val stream: ByteArrayOutputStream = new ByteArrayOutputStream()
    val oos = new ObjectOutputStream(stream)
    oos.writeObject(value)
    oos.close
    stream.toByteArray
  }

} 
开发者ID:neruti-developers,项目名称:neruti-demo-backend,代码行数:39,代码来源:MQProducer.scala


示例2: MQAgent

//设置package包名称以及导入依赖的类
package com.asto.dmp.mgtevl.mq

import com.asto.dmp.mgtevl.base.Props
import com.rabbitmq.client.{Channel, Connection, ConnectionFactory, MessageProperties}
import org.apache.spark.Logging

object MQAgent extends Logging {
  private val connection: Connection = getConnection
  private val channel: Channel = connection.createChannel

  def getConnection = {
    val connectionFactory = new ConnectionFactory
    connectionFactory.setHost(Props.get("rabbit_mq_host"))
    connectionFactory.setPort(Props.get("rabbit_mq_port").toInt)
    connectionFactory.setUsername(Props.get("rabbit_mq_username"))
    connectionFactory.setPassword(Props.get("rabbit_mq_password"))
    connectionFactory.newConnection
  }

  def send(message: String) {
    val queueName = Props.get("queue_name_online")
    //???????????????????????
    channel.queueDeclare(queueName, true, false, false, null)
    channel.exchangeDeclare(queueName, "direct", true)
    channel.queueBind(queueName, queueName, queueName)
    channel.basicPublish(queueName, queueName, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes("UTF-8"))
  }

  def close() {
    if (Option(channel).isDefined) channel.close()
    if (Option(connection).isDefined) connection.close()
  }

} 
开发者ID:zj-lingxin,项目名称:MgtEvl,代码行数:35,代码来源:MQAgent.scala


示例3: AMQPMndSetup

//设置package包名称以及导入依赖的类
package com.biosimilarity.lift.lib

import com.rabbitmq.client.AMQP.{Exchange, Queue}
import com.rabbitmq.client.{Channel, Connection, ConnectionFactory}

import scala.collection.mutable

object AMQPMndSetup {

  // ht 2016.07.15
  // It would be better to use a fixture for setting up the AMQP connection,
  // but we'll need to wait until the AMQPMnd implementation doesn't rely on Path-dependent Types
  def roundTrip[A](src: String, trgt: String, qName: String, produced: Set[A], consumed: mutable.Set[A]): () => Unit = {
    val srcScope: AMQPStdScope[A]                   = new AMQPStdScope[A]()
    val trgtScope: AMQPStdScope[A]                  = new AMQPStdScope[A]()
    val srcQM: srcScope.AMQPQueueHostExchangeM[A]   = new srcScope.AMQPQueueHostExchangeM[A](src, qName)
    val trgtQM: trgtScope.AMQPQueueHostExchangeM[A] = new trgtScope.AMQPQueueHostExchangeM[A](trgt, qName)
    val srcQ: srcScope.AMQPQueue[A]                 = srcQM.zero[A]
    val trgtQ: trgtScope.AMQPQueue[A]               = trgtQM.zero[A]

    val cleanup: () => Unit = { () =>
      val factory: ConnectionFactory          = srcScope.factory
      val conn: Connection                    = factory.newConnection()
      val channel: Channel                    = conn.createChannel()
      val exchangeDeleteOk: Exchange.DeleteOk = channel.exchangeDelete(qName)
      val queueDeleteOk: Queue.DeleteOk       = channel.queueDelete(qName + "_queue")
      channel.close()
      conn.close()
    }

    trgtQM(trgtQ).foreach { (msg: A) =>
      consumed += msg
    }

    produced.foreach { (v: A) =>
      srcQ ! v
    }

    cleanup
  }
} 
开发者ID:synereo,项目名称:synereo,代码行数:42,代码来源:AMQPMndSetup.scala


示例4: AMQPTwistedPairMndSetup

//设置package包名称以及导入依赖的类
package com.biosimilarity.lift.lib

import java.net.URI

import com.rabbitmq.client.AMQP.{Exchange, Queue}
import com.rabbitmq.client.{Channel, Connection, ConnectionFactory}

import scala.collection.mutable

object AMQPTwistedPairMndSetup {

  def roundTrip[A](src: URI, trgt: URI, qName: String, produced: Set[A], consumed: mutable.Set[A]): () => Unit = {
    val scope: AMQPTwistedPairScope[A] with AMQPBrokerScope[A] with MonadicDispatcherScope[A] = AMQPStdTPS[A](src, trgt)
    val qpM: scope.AMQPQueueMQT[A, scope.AMQPAbstractQueue] =
      new scope.TwistedQueuePairM[A](qName, "routeroute").asInstanceOf[scope.AMQPQueueMQT[A, scope.AMQPAbstractQueue]]
    val qtp: scope.AMQPAbstractQueue[A] = qpM.zero[A]

    val cleanup: () => Unit = { () =>
      val factory: ConnectionFactory          = scope.factory
      val conn: Connection                    = factory.newConnection()
      val channel: Channel                    = conn.createChannel()
      val exchangeDeleteOk: Exchange.DeleteOk = channel.exchangeDelete(qName)
      val queueDeleteOk: Queue.DeleteOk       = channel.queueDelete(qName + "_queue")
      channel.close()
      conn.close()
    }

    qpM(qtp).foreach { (msg: A) =>
      consumed += msg
    }

    produced.foreach { (v: A) =>
      qtp ! v
    }

    cleanup
  }

  val localhost: URI = new URI("amqp", null, "localhost", 5672, "/mult", "routingKey=routeroute", null)
} 
开发者ID:synereo,项目名称:synereo,代码行数:41,代码来源:AMQPTwistedPairMndSetup.scala


示例5: MQAgent

//设置package包名称以及导入依赖的类
package com.asto.dmp.alipay.mq

import com.asto.dmp.alipay.base.Props
import com.rabbitmq.client.{Channel, Connection, ConnectionFactory, MessageProperties}
import org.apache.spark.Logging

object MQAgent extends Logging {
  private val connection: Connection = getConnection
  private val channel: Channel = connection.createChannel

  def getConnection = {
    val connectionFactory = new ConnectionFactory
    connectionFactory.setHost(Props.get("rabbit_mq_host"))
    connectionFactory.setPort(Props.get("rabbit_mq_port").toInt)
    connectionFactory.setUsername(Props.get("rabbit_mq_username"))
    connectionFactory.setPassword(Props.get("rabbit_mq_password"))
    connectionFactory.newConnection
  }

  def send(message: String) {
    val queueName = Props.get("queue_name_online")
    //???????????????????????
    channel.queueDeclare(queueName, true, false, false, null)
    channel.exchangeDeclare(queueName, "direct", true)
    channel.queueBind(queueName, queueName, queueName)
    channel.basicPublish(queueName, queueName, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes("UTF-8"))
  }

  def close() {
    if (Option(channel).isDefined) channel.close()
    if (Option(connection).isDefined) connection.close()
  }

} 
开发者ID:zj-lingxin,项目名称:dmp_alipay_loan_balance,代码行数:35,代码来源:MQAgent.scala


示例6: props

//设置package包名称以及导入依赖的类
package com.eigenroute.plumbing

import akka.actor._
import akka.routing.SmallestMailboxPool
import com.rabbitmq.client.{Channel, ConnectionFactory}
import com.spingo.op_rabbit.PlayJsonSupport._
import com.spingo.op_rabbit.{Message, RabbitControl}
import com.thenewmotion.akka.rabbitmq._
import com.typesafe.config.ConfigFactory
import play.api.libs.json._

trait RabbitMQPublisherSubscriber extends PublisherSubscriber {

  val actorSystem: ActorSystem
  val exchange: String
  def props: Props
  val nrOfInstances = 10000
  val convert: (String) => Option[MessageBrokerMessageType]

  val rabbitControl = actorSystem.actorOf(Props[RabbitControl])

  override def publish(message: JsValue, routingKey: String): Unit = {
    rabbitControl ! Message.exchange(message, exchange, routingKey)
  }

  val conf = ConfigFactory.load()
  val queueName: String = conf.getString("eigenroute-publish-subscribe.queueName")

  val factory = new ConnectionFactory()
  factory.setHost(conf.getString("op-rabbit.connection.host"))
  factory.setPort(conf.getInt("op-rabbit.connection.port"))
  factory.setUsername(conf.getString("op-rabbit.connection.username"))
  factory.setPassword(conf.getString("op-rabbit.connection.password"))
  factory.setVirtualHost(conf.getString("op-rabbit.connection.virtual-host"))
  val useSSL = conf.getBoolean("op-rabbit.connection.ssl")
  if (useSSL) factory.useSslProtocol()

  import concurrent.duration._
  val connectionActor: ActorRef =
    actorSystem.actorOf(ConnectionActor.props(factory, 3.seconds), "subscriber-connection")

  def setupSubscriber(channel: Channel, self: ActorRef) {
    val queue = channel.queueDeclare(queueName, true, false, false, new java.util.HashMap()).getQueue
    val incomingMessageHandler = actorSystem.actorOf(props.withRouter(SmallestMailboxPool(nrOfInstances = nrOfInstances)))
    val consumer = new DefaultConsumer(channel) {
      override def handleDelivery(consumerTag: String, envelope: Envelope, properties: BasicProperties, body: Array[Byte]) {
        val incomingMessageJson = Json.parse(new String(body))
        val maybeMessage = convert(envelope.getRoutingKey).flatMap(_.toMessageBrokerMessage(incomingMessageJson))
        maybeMessage.foreach { message => incomingMessageHandler ! message }
      }
    }
    channel.basicConsume(queue, true, consumer)
  }

  connectionActor ! CreateChannel(ChannelActor.props(setupSubscriber), Some("subscriber"))

  def fromBytes(x: Array[Byte]) = new String(x, "UTF-8")

} 
开发者ID:shafiquejamal,项目名称:eigenroute-publish-subscribe-minimal-scala,代码行数:60,代码来源:RabbitMQPublisherSubscriber.scala


示例7: ListeningActor

//设置package包名称以及导入依赖的类
package models

import akka.actor.{Actor, Props}
import com.rabbitmq.client.{Channel, QueueingConsumer}


class ListeningActor(channel: Channel, queue: String, f: (String) => Any) extends Actor {

  // called on the initial run
  def receive = {
    case _ => startReceving
  }

  def startReceving = {

    val consumer = new QueueingConsumer(channel);
    channel.basicConsume(queue, true, consumer);

    while (true) {
      // wait for the message
      val delivery = consumer.nextDelivery();
      val msg = new String(delivery.getBody());

      // send the message to the provided callback function
      // and execute this in a subactor
      context.actorOf(Props(new Actor {
        def receive = {
          case some: String => f(some);
        }
      })) ! msg
    }
  }
} 
开发者ID:nkaraolis,项目名称:NBGardensNewPlay,代码行数:34,代码来源:ListeningActor.scala


示例8: MQAgent

//设置package包名称以及导入依赖的类
package com.asto.dmp.relcir.mq

import com.asto.dmp.relcir.base.Props
import com.rabbitmq.client.{Channel, Connection, ConnectionFactory, MessageProperties}
import org.apache.spark.Logging

object MQAgent extends Logging {
  private val connection: Connection = getConnection
  private val channel: Channel = connection.createChannel

  def getConnection = {
    val connectionFactory = new ConnectionFactory
    connectionFactory.setHost(Props.get("rabbit_mq_host"))
    connectionFactory.setPort(Props.get("rabbit_mq_port").toInt)
    connectionFactory.setUsername(Props.get("rabbit_mq_username"))
    connectionFactory.setPassword(Props.get("rabbit_mq_password"))
    connectionFactory.newConnection
  }

  def send(queueName: String, message: String) {
    //???????????????????????
    channel.queueDeclare(queueName, true, false, false, null)
    channel.exchangeDeclare(queueName, "direct", true)
    channel.queueBind(queueName, queueName, queueName)
    channel.basicPublish(queueName, queueName, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes("UTF-8"))
  }

  def close() {
    if (Option(channel).isDefined) channel.close()
    if (Option(connection).isDefined) connection.close()
  }

} 
开发者ID:zj-lingxin,项目名称:dmp_relationship_circle,代码行数:34,代码来源:MQAgent.scala


示例9: Messenger

//设置package包名称以及导入依赖的类
package com.example.services

import com.example.models.SystemMessage
import com.rabbitmq.client.{Channel, Connection, ConnectionFactory}
import org.json4s.NoTypeHints
import org.json4s.native.Serialization
import org.json4s.native.Serialization.write

class Messenger {
  private val QUEUE_NAME: String = "places"
  implicit val formats = Serialization.formats(NoTypeHints)


  def send(message: String) : Unit = {
    val factory: ConnectionFactory = new ConnectionFactory
    factory.setHost("localhost")
    val connection: Connection = factory.newConnection
    val channel: Channel = connection.createChannel


    channel.queueDeclare(QUEUE_NAME, false, false, false, null)
    channel.basicPublish("", QUEUE_NAME, null, message.getBytes())


    channel.close()
    connection.close()
  }

  def send(message: SystemMessage): Unit = {
    send(write(message))
  }

}
object Messenger {
  def apply() = new Messenger()
} 
开发者ID:divanvisagie,项目名称:lazy-places,代码行数:37,代码来源:Messenger.scala


示例10: MQAgent

//设置package包名称以及导入依赖的类
package com.asto.dmp.xxx.mq

import com.asto.dmp.xxx.base.Props
import com.rabbitmq.client.{Channel, Connection, ConnectionFactory, MessageProperties}
import org.apache.spark.Logging

object MQAgent extends Logging {
  private val connection: Connection = getConnection
  private val channel: Channel = connection.createChannel

  def getConnection = {
    val connectionFactory = new ConnectionFactory
    connectionFactory.setHost(Props.get("rabbit_mq_host"))
    connectionFactory.setPort(Props.get("rabbit_mq_port").toInt)
    connectionFactory.setUsername(Props.get("rabbit_mq_username"))
    connectionFactory.setPassword(Props.get("rabbit_mq_password"))
    connectionFactory.newConnection
  }

  def send(message: String) {
    val queueName = Props.get("queue_name_online")
    //???????????????????????
    channel.queueDeclare(queueName, true, false, false, null)
    channel.exchangeDeclare(queueName, "direct", true)
    channel.queueBind(queueName, queueName, queueName)
    channel.basicPublish(queueName, queueName, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes("UTF-8"))
  }

  def close() {
    if (Option(channel).isDefined) channel.close()
    if (Option(connection).isDefined) connection.close()
  }

} 
开发者ID:zj-lingxin,项目名称:asto-sparksql,代码行数:35,代码来源:MQAgent.scala


示例11: Producer

//设置package包名称以及导入依赖的类
package org.juanitodread.broker.client

import com.newmotion.akka.rabbitmq.ChannelActor
import com.newmotion.akka.rabbitmq.ChannelMessage
import com.newmotion.akka.rabbitmq.ConnectionActor
import com.newmotion.akka.rabbitmq.CreateChannel
import com.rabbitmq.client.Channel
import com.rabbitmq.client.MessageProperties

import akka.actor.ActorRef
import akka.actor.ActorSystem


object Producer {
  implicit val system = ActorSystem("rabbit-actor")

  val factory = RabbitConnection.getConnectionFactory
  val connectionActor = system.actorOf(ConnectionActor.props(factory), "rabbitmq")

  val queue = RabbitConnection.inputQueue
  val exchange = "amq.fanout"

  // this function will be called each time new channel received
  def setupChannel(channel: Channel, self: ActorRef) {
    channel.queueDeclare(queue, true, false, false, null)
    channel.queueBind(queue, exchange, "")
  }

  connectionActor ! CreateChannel(ChannelActor.props(setupChannel), Some("publisher"))

  def produce(message: String): Unit = {
    val publisher = system.actorSelection("/user/rabbitmq/publisher")

    def publish(channel: Channel) {
      channel.basicPublish(exchange, queue, MessageProperties.PERSISTENT_TEXT_PLAIN, toBytes(message))
    }
    publisher ! ChannelMessage(publish, dropIfNoChannel = false)
  }

  def fromBytes(x: Array[Byte]) = new String(x, "UTF-8")
  def toBytes(x: String) = x.toString.getBytes("UTF-8")
} 
开发者ID:juanitodread,项目名称:pitaya,代码行数:43,代码来源:Producer.scala


示例12: RabbitQueueConsumer

//设置package包名称以及导入依赖的类
package com.fustigatedcat.heystk.engine.queue

import com.fustigatedcat.heystk.common.normalization.Normalization
import com.fustigatedcat.heystk.engine.Engine
import com.fustigatedcat.heystk.engine.processor.Processor
import com.rabbitmq.client.{AMQP, Envelope, Channel, DefaultConsumer}
import org.json4s.native.JsonMethods.parse
import org.slf4j.LoggerFactory

class RabbitQueueConsumer(channel : Channel) extends DefaultConsumer(channel) {

  val logger = LoggerFactory.getLogger(this.getClass)

  implicit val formats = org.json4s.DefaultFormats

  val processor = Processor.create(Engine.config)

  override def handleDelivery(consumerTag : String, envelope : Envelope, properties : AMQP.BasicProperties, body : Array[Byte]) : Unit = {
    logger.debug("Processing a new normalization {}", envelope.getDeliveryTag)
    val normalization = parse(new String(body)).extract[Normalization]
    logger.debug("Got normalization {}", normalization)
    processor.process(normalization)
    logger.debug("Acking normalization {}", envelope.getDeliveryTag)
    channel.basicAck(envelope.getDeliveryTag, false)
  }

} 
开发者ID:fustigatedcat,项目名称:heystk,代码行数:28,代码来源:RabbitQueueConsumer.scala


示例13: AMQPUtil

//设置package包名称以及导入依赖的类
package com.biosimilarity.lift.lib.amqp

import com.rabbitmq.client.{Channel, Connection, ConnectionFactory}

import scala.util.Try

object AMQPUtil {

  def rabbitIsRunning(host: String = "localhost", port: Int = 5672): Boolean =
    Try {
      val factory: ConnectionFactory = new ConnectionFactory
      factory.setHost(host)
      factory.setPort(port)
      val conn: Connection           = factory.newConnection
      val channel: Channel           = conn.createChannel
      channel.close()
      conn.close()
    }.isSuccess
} 
开发者ID:synereo,项目名称:synereo,代码行数:20,代码来源:AMQPUtil.scala



注:本文中的com.rabbitmq.client.Channel类示例整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。


鲜花

握手

雷人

路过

鸡蛋
该文章已有0人参与评论

请发表评论

全部评论

专题导读
上一篇:
Scala UUIDs类代码示例发布时间:2022-05-23
下一篇:
Scala ObjectMetadata类代码示例发布时间:2022-05-23
热门推荐
热门话题
阅读排行榜

扫描微信二维码

查看手机版网站

随时了解更新最新资讯

139-2527-9053

在线客服(服务时间 9:00~18:00)

在线QQ客服
地址:深圳市南山区西丽大学城创智工业园
电邮:jeky_zhao#qq.com
移动电话:139-2527-9053

Powered by 互联科技 X3.4© 2001-2213 极客世界.|Sitemap