本文整理汇总了Scala中com.rabbitmq.client.Channel类的典型用法代码示例。如果您正苦于以下问题:Scala Channel类的具体用法?Scala Channel怎么用?Scala Channel使用的例子?那么恭喜您, 这里精选的类代码示例或许可以为您提供帮助。
在下文中一共展示了Channel类的13个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Scala代码示例。
示例1: MQProducer
//设置package包名称以及导入依赖的类
package mq
import java.io.{ByteArrayOutputStream, ObjectOutputStream}
import com.fasterxml.jackson.databind.ObjectMapper
import twitter4j.Status
import com.rabbitmq.client.{Channel, ConnectionFactory}
/** Publishes tweets to the `twitter_exchange` RabbitMQ exchange as Java-serialised payloads. */
object MQProducer {
  private val EXCHANGE_TWITTER = "twitter_exchange"

  /**
   * Connects to the broker on `localhost`, opens a channel and declares the
   * `twitter_exchange` topic exchange on it.
   *
   * NOTE(review): the `Connection` created here is neither returned nor stored,
   * so this object never closes it — confirm that callers handle shutdown.
   *
   * @return the channel on which the exchange was declared
   */
  def initiateRabbitMQProducer: Channel = {
    val factory = new ConnectionFactory
    factory.setHost("localhost")
    val connection = factory.newConnection
    val channel = connection.createChannel()
    channel.exchangeDeclare(EXCHANGE_TWITTER, "topic")
    channel
  }

  /**
   * Prints the status text to stdout and publishes the serialised status.
   *
   * @param channel channel to publish on
   * @param topic   routing key used for the publish
   * @param status  tweet to publish; it is passed through [[serialise]]
   */
  def produce(channel: Channel, topic: String, status: Status): Unit = {
    println(status.getText)
    channel.basicPublish(EXCHANGE_TWITTER, topic, null, serialise(status))
  }

  /**
   * Serialises `value` with Java object serialisation.
   *
   * @param value object to write with `ObjectOutputStream.writeObject`
   * @return the serialised bytes
   */
  def serialise(value: Any): Array[Byte] = {
    val stream = new ByteArrayOutputStream()
    val oos = new ObjectOutputStream(stream)
    // Close the object stream even when writeObject fails, so a failed write
    // does not leave it open.
    try oos.writeObject(value)
    finally oos.close()
    stream.toByteArray
  }
}
开发者ID:neruti-developers,项目名称:neruti-demo-backend,代码行数:39,代码来源:MQProducer.scala
示例2: MQAgent
//设置package包名称以及导入依赖的类
package com.asto.dmp.mgtevl.mq
import com.asto.dmp.mgtevl.base.Props
import com.rabbitmq.client.{Channel, Connection, ConnectionFactory, MessageProperties}
import org.apache.spark.Logging
/**
 * Singleton RabbitMQ publisher holding one connection and one channel,
 * both created when the object is first initialised.
 */
object MQAgent extends Logging {
  private val connection: Connection = getConnection
  private val channel: Channel = connection.createChannel

  /**
   * Opens a new connection using the `rabbit_mq_host`, `rabbit_mq_port`,
   * `rabbit_mq_username` and `rabbit_mq_password` settings read from `Props`.
   *
   * @return the newly opened connection
   */
  def getConnection: Connection = {
    val connectionFactory = new ConnectionFactory
    connectionFactory.setHost(Props.get("rabbit_mq_host"))
    connectionFactory.setPort(Props.get("rabbit_mq_port").toInt)
    connectionFactory.setUsername(Props.get("rabbit_mq_username"))
    connectionFactory.setPassword(Props.get("rabbit_mq_password"))
    connectionFactory.newConnection
  }

  /**
   * Publishes `message` (UTF-8 encoded) using the name configured under
   * `queue_name_online` as queue name, exchange name and routing key.
   *
   * @param message text payload to publish
   */
  def send(message: String): Unit = {
    val queueName = Props.get("queue_name_online")
    // Declare the queue and a "direct" exchange of the same name, then bind
    // them with that name as routing key, before every publish.
    channel.queueDeclare(queueName, true, false, false, null)
    channel.exchangeDeclare(queueName, "direct", true)
    channel.queueBind(queueName, queueName, queueName)
    channel.basicPublish(queueName, queueName, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes("UTF-8"))
  }

  /**
   * Closes the channel and then the connection.
   *
   * The connection is closed even if closing the channel throws, and members
   * that are already closed are skipped so a repeated call does not fail.
   */
  def close(): Unit = {
    try Option(channel).filter(_.isOpen).foreach(_.close())
    finally Option(connection).filter(_.isOpen).foreach(_.close())
  }
}
开发者ID:zj-lingxin,项目名称:MgtEvl,代码行数:35,代码来源:MQAgent.scala
示例3: AMQPMndSetup
//设置package包名称以及导入依赖的类
package com.biosimilarity.lift.lib
import com.rabbitmq.client.AMQP.{Exchange, Queue}
import com.rabbitmq.client.{Channel, Connection, ConnectionFactory}
import scala.collection.mutable
/** Test helper that pushes values through a source/target AMQP queue pair. */
object AMQPMndSetup {
  // ht 2016.07.15
  // It would be better to use a fixture for setting up the AMQP connection,
  // but we'll need to wait until the AMQPMnd implementation doesn't rely on Path-dependent Types

  /**
   * Registers a handler on the target queue that adds each received message to
   * `consumed`, then sends every element of `produced` to the source queue.
   *
   * @param src      host passed to the source `AMQPQueueHostExchangeM`
   * @param trgt     host passed to the target `AMQPQueueHostExchangeM`
   * @param qName    name shared by both queue monads; also the exchange that cleanup deletes
   * @param produced values to send with `srcQ ! v`
   * @param consumed mutable set that receives each message delivered by the target
   * @return a thunk that deletes the exchange `qName` and the queue `qName + "_queue"`
   */
  def roundTrip[A](src: String, trgt: String, qName: String, produced: Set[A], consumed: mutable.Set[A]): () => Unit = {
    val srcScope: AMQPStdScope[A] = new AMQPStdScope[A]()
    val trgtScope: AMQPStdScope[A] = new AMQPStdScope[A]()
    val srcQM: srcScope.AMQPQueueHostExchangeM[A] = new srcScope.AMQPQueueHostExchangeM[A](src, qName)
    val trgtQM: trgtScope.AMQPQueueHostExchangeM[A] = new trgtScope.AMQPQueueHostExchangeM[A](trgt, qName)
    val srcQ: srcScope.AMQPQueue[A] = srcQM.zero[A]
    val trgtQ: trgtScope.AMQPQueue[A] = trgtQM.zero[A]
    val cleanup: () => Unit = { () =>
      val factory: ConnectionFactory = srcScope.factory
      val conn: Connection = factory.newConnection()
      // Close the connection even when a delete fails, so a failed cleanup
      // does not leak it.
      try {
        val channel: Channel = conn.createChannel()
        val exchangeDeleteOk: Exchange.DeleteOk = channel.exchangeDelete(qName)
        val queueDeleteOk: Queue.DeleteOk = channel.queueDelete(qName + "_queue")
        channel.close()
      } finally {
        conn.close()
      }
    }
    // The consumer is registered before anything is produced.
    trgtQM(trgtQ).foreach { (msg: A) =>
      consumed += msg
    }
    produced.foreach { (v: A) =>
      srcQ ! v
    }
    cleanup
  }
}
开发者ID:synereo,项目名称:synereo,代码行数:42,代码来源:AMQPMndSetup.scala
示例4: AMQPTwistedPairMndSetup
//设置package包名称以及导入依赖的类
package com.biosimilarity.lift.lib
import java.net.URI
import com.rabbitmq.client.AMQP.{Exchange, Queue}
import com.rabbitmq.client.{Channel, Connection, ConnectionFactory}
import scala.collection.mutable
/** Test helper that pushes values through a twisted AMQP queue pair. */
object AMQPTwistedPairMndSetup {

  /**
   * Registers a handler on the twisted queue pair that adds each received
   * message to `consumed`, then sends every element of `produced` to the pair.
   *
   * @param src      source URI passed to `AMQPStdTPS`
   * @param trgt     target URI passed to `AMQPStdTPS`
   * @param qName    queue name for the pair; also the exchange that cleanup deletes
   * @param produced values to send with `qtp ! v`
   * @param consumed mutable set that receives each delivered message
   * @return a thunk that deletes the exchange `qName` and the queue `qName + "_queue"`
   */
  def roundTrip[A](src: URI, trgt: URI, qName: String, produced: Set[A], consumed: mutable.Set[A]): () => Unit = {
    val scope: AMQPTwistedPairScope[A] with AMQPBrokerScope[A] with MonadicDispatcherScope[A] = AMQPStdTPS[A](src, trgt)
    // NOTE(review): the cast looks like a workaround for path-dependent types — confirm before removing.
    val qpM: scope.AMQPQueueMQT[A, scope.AMQPAbstractQueue] =
      new scope.TwistedQueuePairM[A](qName, "routeroute").asInstanceOf[scope.AMQPQueueMQT[A, scope.AMQPAbstractQueue]]
    val qtp: scope.AMQPAbstractQueue[A] = qpM.zero[A]
    val cleanup: () => Unit = { () =>
      val factory: ConnectionFactory = scope.factory
      val conn: Connection = factory.newConnection()
      // Close the connection even when a delete fails, so a failed cleanup
      // does not leak it.
      try {
        val channel: Channel = conn.createChannel()
        val exchangeDeleteOk: Exchange.DeleteOk = channel.exchangeDelete(qName)
        val queueDeleteOk: Queue.DeleteOk = channel.queueDelete(qName + "_queue")
        channel.close()
      } finally {
        conn.close()
      }
    }
    // The consumer is registered before anything is produced.
    qpM(qtp).foreach { (msg: A) =>
      consumed += msg
    }
    produced.foreach { (v: A) =>
      qtp ! v
    }
    cleanup
  }

  /** AMQP URI for `localhost:5672`, path `/mult`, query `routingKey=routeroute`. */
  val localhost: URI = new URI("amqp", null, "localhost", 5672, "/mult", "routingKey=routeroute", null)
}
开发者ID:synereo,项目名称:synereo,代码行数:41,代码来源:AMQPTwistedPairMndSetup.scala
示例5: MQAgent
//设置package包名称以及导入依赖的类
package com.asto.dmp.alipay.mq
import com.asto.dmp.alipay.base.Props
import com.rabbitmq.client.{Channel, Connection, ConnectionFactory, MessageProperties}
import org.apache.spark.Logging
/**
 * Singleton RabbitMQ publisher holding one connection and one channel,
 * both created when the object is first initialised.
 */
object MQAgent extends Logging {
  private val connection: Connection = getConnection
  private val channel: Channel = connection.createChannel

  /**
   * Opens a new connection using the `rabbit_mq_host`, `rabbit_mq_port`,
   * `rabbit_mq_username` and `rabbit_mq_password` settings read from `Props`.
   *
   * @return the newly opened connection
   */
  def getConnection: Connection = {
    val connectionFactory = new ConnectionFactory
    connectionFactory.setHost(Props.get("rabbit_mq_host"))
    connectionFactory.setPort(Props.get("rabbit_mq_port").toInt)
    connectionFactory.setUsername(Props.get("rabbit_mq_username"))
    connectionFactory.setPassword(Props.get("rabbit_mq_password"))
    connectionFactory.newConnection
  }

  /**
   * Publishes `message` (UTF-8 encoded) using the name configured under
   * `queue_name_online` as queue name, exchange name and routing key.
   *
   * @param message text payload to publish
   */
  def send(message: String): Unit = {
    val queueName = Props.get("queue_name_online")
    // Declare the queue and a "direct" exchange of the same name, then bind
    // them with that name as routing key, before every publish.
    channel.queueDeclare(queueName, true, false, false, null)
    channel.exchangeDeclare(queueName, "direct", true)
    channel.queueBind(queueName, queueName, queueName)
    channel.basicPublish(queueName, queueName, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes("UTF-8"))
  }

  /**
   * Closes the channel and then the connection.
   *
   * The connection is closed even if closing the channel throws, and members
   * that are already closed are skipped so a repeated call does not fail.
   */
  def close(): Unit = {
    try Option(channel).filter(_.isOpen).foreach(_.close())
    finally Option(connection).filter(_.isOpen).foreach(_.close())
  }
}
开发者ID:zj-lingxin,项目名称:dmp_alipay_loan_balance,代码行数:35,代码来源:MQAgent.scala
示例6: props
//设置package包名称以及导入依赖的类
package com.eigenroute.plumbing
import akka.actor._
import akka.routing.SmallestMailboxPool
import com.rabbitmq.client.{Channel, ConnectionFactory}
import com.spingo.op_rabbit.PlayJsonSupport._
import com.spingo.op_rabbit.{Message, RabbitControl}
import com.thenewmotion.akka.rabbitmq._
import com.typesafe.config.ConfigFactory
import play.api.libs.json._
/**
 * Publisher/subscriber backed by RabbitMQ: publishes JSON through an
 * op-rabbit `RabbitControl` actor and consumes from the queue named by
 * `eigenroute-publish-subscribe.queueName`, routing converted messages to a
 * pool of handler actors built from `props`.
 */
trait RabbitMQPublisherSubscriber extends PublisherSubscriber {
  val actorSystem: ActorSystem
  val exchange: String
  /** Props for the actors that handle incoming messages. */
  def props: Props
  val nrOfInstances = 10000
  /** Maps a routing key to the message type used to decode the JSON body. */
  val convert: (String) => Option[MessageBrokerMessageType]
  // NOTE(review): evaluated while the trait initialises, so `actorSystem` must
  // already be set by the implementing class at that point — confirm.
  val rabbitControl = actorSystem.actorOf(Props[RabbitControl])

  /** Sends `message` to `exchange` with the given routing key via `rabbitControl`. */
  override def publish(message: JsValue, routingKey: String): Unit = {
    rabbitControl ! Message.exchange(message, exchange, routingKey)
  }

  val conf = ConfigFactory.load()
  val queueName: String = conf.getString("eigenroute-publish-subscribe.queueName")
  val factory = new ConnectionFactory()
  factory.setHost(conf.getString("op-rabbit.connection.host"))
  factory.setPort(conf.getInt("op-rabbit.connection.port"))
  factory.setUsername(conf.getString("op-rabbit.connection.username"))
  factory.setPassword(conf.getString("op-rabbit.connection.password"))
  factory.setVirtualHost(conf.getString("op-rabbit.connection.virtual-host"))
  val useSSL = conf.getBoolean("op-rabbit.connection.ssl")
  if (useSSL) factory.useSslProtocol()
  import concurrent.duration._
  val connectionActor: ActorRef =
    actorSystem.actorOf(ConnectionActor.props(factory, 3.seconds), "subscriber-connection")

  /**
   * Channel setup callback: declares the queue, creates the handler pool and
   * starts a consumer that forwards each successfully converted message.
   *
   * @param channel channel handed over by the channel actor
   * @param self    the channel actor; not used here
   */
  def setupSubscriber(channel: Channel, self: ActorRef): Unit = {
    val queue = channel.queueDeclare(queueName, true, false, false, new java.util.HashMap()).getQueue
    val incomingMessageHandler = actorSystem.actorOf(props.withRouter(SmallestMailboxPool(nrOfInstances = nrOfInstances)))
    val consumer = new DefaultConsumer(channel) {
      override def handleDelivery(consumerTag: String, envelope: Envelope, properties: BasicProperties, body: Array[Byte]): Unit = {
        // Decode as UTF-8 via fromBytes rather than the platform default
        // charset, so non-ASCII JSON parses the same on every host.
        val incomingMessageJson = Json.parse(fromBytes(body))
        val maybeMessage = convert(envelope.getRoutingKey).flatMap(_.toMessageBrokerMessage(incomingMessageJson))
        // Messages with an unknown routing key or failed conversion are dropped.
        maybeMessage.foreach { message => incomingMessageHandler ! message }
      }
    }
    channel.basicConsume(queue, true, consumer)
  }

  connectionActor ! CreateChannel(ChannelActor.props(setupSubscriber), Some("subscriber"))

  /** Decodes `x` as a UTF-8 string. */
  def fromBytes(x: Array[Byte]): String = new String(x, "UTF-8")
}
开发者ID:shafiquejamal,项目名称:eigenroute-publish-subscribe-minimal-scala,代码行数:60,代码来源:RabbitMQPublisherSubscriber.scala
示例7: ListeningActor
//设置package包名称以及导入依赖的类
package models
import akka.actor.{Actor, Props}
import com.rabbitmq.client.{Channel, QueueingConsumer}
/**
 * Actor that, on its first message, starts consuming `queue` and hands every
 * message body to `f` inside a short-lived child actor.
 *
 * @param channel channel to consume on
 * @param queue   name of the queue to consume
 * @param f       callback invoked with each message body decoded as a string
 */
class ListeningActor(channel: Channel, queue: String, f: (String) => Any) extends Actor {
  // called on the initial run
  def receive = {
    case _ => startReceving
  }

  /**
   * Consumes `queue` in an endless loop and never returns.
   *
   * NOTE(review): the loop runs inside `receive`, so this actor processes no
   * further messages once it has started — confirm this is intended.
   */
  def startReceving = {
    val consumer = new QueueingConsumer(channel)
    channel.basicConsume(queue, true, consumer)
    while (true) {
      // wait for the message
      val delivery = consumer.nextDelivery()
      // NOTE(review): decoded with the platform default charset — confirm the producer's encoding.
      val msg = new String(delivery.getBody())
      // send the message to the provided callback function
      // and execute this in a subactor
      context.actorOf(Props(new Actor {
        def receive = {
          case some: String =>
            // Each child handles exactly one message, so stop it afterwards;
            // otherwise one idle child per delivery accumulates indefinitely.
            try f(some)
            finally context.stop(self)
        }
      })) ! msg
    }
  }
}
开发者ID:nkaraolis,项目名称:NBGardensNewPlay,代码行数:34,代码来源:ListeningActor.scala
示例8: MQAgent
//设置package包名称以及导入依赖的类
package com.asto.dmp.relcir.mq
import com.asto.dmp.relcir.base.Props
import com.rabbitmq.client.{Channel, Connection, ConnectionFactory, MessageProperties}
import org.apache.spark.Logging
/**
 * Singleton RabbitMQ publisher holding one connection and one channel,
 * both created when the object is first initialised.
 */
object MQAgent extends Logging {
  private val connection: Connection = getConnection
  private val channel: Channel = connection.createChannel

  /**
   * Opens a new connection using the `rabbit_mq_host`, `rabbit_mq_port`,
   * `rabbit_mq_username` and `rabbit_mq_password` settings read from `Props`.
   *
   * @return the newly opened connection
   */
  def getConnection: Connection = {
    val connectionFactory = new ConnectionFactory
    connectionFactory.setHost(Props.get("rabbit_mq_host"))
    connectionFactory.setPort(Props.get("rabbit_mq_port").toInt)
    connectionFactory.setUsername(Props.get("rabbit_mq_username"))
    connectionFactory.setPassword(Props.get("rabbit_mq_password"))
    connectionFactory.newConnection
  }

  /**
   * Publishes `message` (UTF-8 encoded) using `queueName` as queue name,
   * exchange name and routing key.
   *
   * @param queueName name used for the queue, the exchange and the routing key
   * @param message   text payload to publish
   */
  def send(queueName: String, message: String): Unit = {
    // Declare the queue and a "direct" exchange of the same name, then bind
    // them with that name as routing key, before every publish.
    channel.queueDeclare(queueName, true, false, false, null)
    channel.exchangeDeclare(queueName, "direct", true)
    channel.queueBind(queueName, queueName, queueName)
    channel.basicPublish(queueName, queueName, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes("UTF-8"))
  }

  /**
   * Closes the channel and then the connection.
   *
   * The connection is closed even if closing the channel throws, and members
   * that are already closed are skipped so a repeated call does not fail.
   */
  def close(): Unit = {
    try Option(channel).filter(_.isOpen).foreach(_.close())
    finally Option(connection).filter(_.isOpen).foreach(_.close())
  }
}
开发者ID:zj-lingxin,项目名称:dmp_relationship_circle,代码行数:34,代码来源:MQAgent.scala
示例9: Messenger
//设置package包名称以及导入依赖的类
package com.example.services
import com.example.models.SystemMessage
import com.rabbitmq.client.{Channel, Connection, ConnectionFactory}
import org.json4s.NoTypeHints
import org.json4s.native.Serialization
import org.json4s.native.Serialization.write
/** Sends messages to the `places` queue on a local RabbitMQ broker. */
class Messenger {
  private val QUEUE_NAME: String = "places"
  implicit val formats = Serialization.formats(NoTypeHints)

  /**
   * Opens a connection to `localhost`, declares the `places` queue, publishes
   * `message` to it through the default exchange and closes the connection.
   *
   * @param message text payload; encoded as UTF-8
   */
  def send(message: String): Unit = {
    val factory: ConnectionFactory = new ConnectionFactory
    factory.setHost("localhost")
    val connection: Connection = factory.newConnection
    // Close the connection even when declaring or publishing fails, so a
    // failed send does not leak it.
    try {
      val channel: Channel = connection.createChannel
      channel.queueDeclare(QUEUE_NAME, false, false, false, null)
      // Fixed charset instead of the platform default, so the bytes on the
      // wire do not depend on the host configuration.
      channel.basicPublish("", QUEUE_NAME, null, message.getBytes("UTF-8"))
      channel.close()
    } finally {
      connection.close()
    }
  }

  /**
   * Serialises `message` with json4s `write` and sends the resulting string.
   *
   * @param message message to serialise and send
   */
  def send(message: SystemMessage): Unit = {
    send(write(message))
  }
}
/** Companion factory so callers can write `Messenger()`. */
object Messenger {
  /** Creates a new [[Messenger]]. */
  def apply(): Messenger = new Messenger
}
开发者ID:divanvisagie,项目名称:lazy-places,代码行数:37,代码来源:Messenger.scala
示例10: MQAgent
//设置package包名称以及导入依赖的类
package com.asto.dmp.xxx.mq
import com.asto.dmp.xxx.base.Props
import com.rabbitmq.client.{Channel, Connection, ConnectionFactory, MessageProperties}
import org.apache.spark.Logging
/**
 * Singleton RabbitMQ publisher holding one connection and one channel,
 * both created when the object is first initialised.
 */
object MQAgent extends Logging {
  private val connection: Connection = getConnection
  private val channel: Channel = connection.createChannel

  /**
   * Opens a new connection using the `rabbit_mq_host`, `rabbit_mq_port`,
   * `rabbit_mq_username` and `rabbit_mq_password` settings read from `Props`.
   *
   * @return the newly opened connection
   */
  def getConnection: Connection = {
    val connectionFactory = new ConnectionFactory
    connectionFactory.setHost(Props.get("rabbit_mq_host"))
    connectionFactory.setPort(Props.get("rabbit_mq_port").toInt)
    connectionFactory.setUsername(Props.get("rabbit_mq_username"))
    connectionFactory.setPassword(Props.get("rabbit_mq_password"))
    connectionFactory.newConnection
  }

  /**
   * Publishes `message` (UTF-8 encoded) using the name configured under
   * `queue_name_online` as queue name, exchange name and routing key.
   *
   * @param message text payload to publish
   */
  def send(message: String): Unit = {
    val queueName = Props.get("queue_name_online")
    // Declare the queue and a "direct" exchange of the same name, then bind
    // them with that name as routing key, before every publish.
    channel.queueDeclare(queueName, true, false, false, null)
    channel.exchangeDeclare(queueName, "direct", true)
    channel.queueBind(queueName, queueName, queueName)
    channel.basicPublish(queueName, queueName, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes("UTF-8"))
  }

  /**
   * Closes the channel and then the connection.
   *
   * The connection is closed even if closing the channel throws, and members
   * that are already closed are skipped so a repeated call does not fail.
   */
  def close(): Unit = {
    try Option(channel).filter(_.isOpen).foreach(_.close())
    finally Option(connection).filter(_.isOpen).foreach(_.close())
  }
}
开发者ID:zj-lingxin,项目名称:asto-sparksql,代码行数:35,代码来源:MQAgent.scala
示例11: Producer
//设置package包名称以及导入依赖的类
package org.juanitodread.broker.client
import com.newmotion.akka.rabbitmq.ChannelActor
import com.newmotion.akka.rabbitmq.ChannelMessage
import com.newmotion.akka.rabbitmq.ConnectionActor
import com.newmotion.akka.rabbitmq.CreateChannel
import com.rabbitmq.client.Channel
import com.rabbitmq.client.MessageProperties
import akka.actor.ActorRef
import akka.actor.ActorSystem
/**
 * Actor-based RabbitMQ publisher: a connection actor named `rabbitmq` owns a
 * `publisher` channel actor, and [[produce]] sends publish requests to it.
 */
object Producer {
  implicit val system: ActorSystem = ActorSystem("rabbit-actor")
  val factory = RabbitConnection.getConnectionFactory
  val connectionActor = system.actorOf(ConnectionActor.props(factory), "rabbitmq")
  val queue = RabbitConnection.inputQueue
  val exchange = "amq.fanout"

  /**
   * Channel setup callback, invoked each time a new channel is received:
   * declares the input queue and binds it to the exchange with an empty key.
   *
   * @param channel the channel being set up
   * @param self    the channel actor; not used here
   */
  def setupChannel(channel: Channel, self: ActorRef): Unit = {
    channel.queueDeclare(queue, true, false, false, null)
    channel.queueBind(queue, exchange, "")
  }

  // Must stay after `queue` and `exchange`: the callback reads both.
  connectionActor ! CreateChannel(ChannelActor.props(setupChannel), Some("publisher"))

  /**
   * Asks the `publisher` channel actor to publish `message` as UTF-8 text.
   *
   * @param message text to publish
   */
  def produce(message: String): Unit = {
    // The payload is encoded when the channel actor runs the function, not here.
    val publishOn: Channel => Any = { ch =>
      ch.basicPublish(exchange, queue, MessageProperties.PERSISTENT_TEXT_PLAIN, toBytes(message))
    }
    system.actorSelection("/user/rabbitmq/publisher") ! ChannelMessage(publishOn, dropIfNoChannel = false)
  }

  /** Decodes `x` as a UTF-8 string. */
  def fromBytes(x: Array[Byte]): String = new String(x, "UTF-8")

  /** Encodes `x` as UTF-8 bytes. */
  def toBytes(x: String): Array[Byte] = x.getBytes("UTF-8")
}
开发者ID:juanitodread,项目名称:pitaya,代码行数:43,代码来源:Producer.scala
示例12: RabbitQueueConsumer
//设置package包名称以及导入依赖的类
package com.fustigatedcat.heystk.engine.queue
import com.fustigatedcat.heystk.common.normalization.Normalization
import com.fustigatedcat.heystk.engine.Engine
import com.fustigatedcat.heystk.engine.processor.Processor
import com.rabbitmq.client.{AMQP, Envelope, Channel, DefaultConsumer}
import org.json4s.native.JsonMethods.parse
import org.slf4j.LoggerFactory
/**
 * Consumer that parses each delivery as a JSON `Normalization`, runs it
 * through the processor and then acknowledges the delivery.
 *
 * @param channel channel the consumer is attached to; also used for acks
 */
class RabbitQueueConsumer(channel : Channel) extends DefaultConsumer(channel) {
  val logger = LoggerFactory.getLogger(this.getClass)
  implicit val formats = org.json4s.DefaultFormats
  val processor = Processor.create(Engine.config)

  /**
   * Handles one delivery. The ack is sent only after `processor.process`
   * returns, so a delivery whose parsing or processing throws is not
   * acknowledged here.
   */
  override def handleDelivery(consumerTag : String, envelope : Envelope, properties : AMQP.BasicProperties, body : Array[Byte]) : Unit = {
    val deliveryTag = envelope.getDeliveryTag
    logger.debug("Processing a new normalization {}", deliveryTag)
    // Decode as UTF-8 rather than the platform default charset, so non-ASCII
    // JSON is read the same way on every host.
    val normalization = parse(new String(body, "UTF-8")).extract[Normalization]
    logger.debug("Got normalization {}", normalization)
    processor.process(normalization)
    logger.debug("Acking normalization {}", deliveryTag)
    channel.basicAck(deliveryTag, false)
  }
}
开发者ID:fustigatedcat,项目名称:heystk,代码行数:28,代码来源:RabbitQueueConsumer.scala
示例13: AMQPUtil
//设置package包名称以及导入依赖的类
package com.biosimilarity.lift.lib.amqp
import com.rabbitmq.client.{Channel, Connection, ConnectionFactory}
import scala.util.Try
/** Small helpers for probing an AMQP broker. */
object AMQPUtil {

  /**
   * Checks whether a broker accepts a connection and a channel on `host:port`.
   *
   * @param host broker host, `localhost` by default
   * @param port broker port, 5672 by default
   * @return true if connecting, opening a channel and closing both succeeded;
   *         false if any of those steps threw
   */
  def rabbitIsRunning(host: String = "localhost", port: Int = 5672): Boolean =
    Try {
      val factory: ConnectionFactory = new ConnectionFactory
      factory.setHost(host)
      factory.setPort(port)
      val conn: Connection = factory.newConnection
      // Close the connection even when the channel step fails, so a failed
      // probe does not leak it.
      try {
        val channel: Channel = conn.createChannel
        channel.close()
      } finally {
        conn.close()
      }
    }.isSuccess
}
开发者ID:synereo,项目名称:synereo,代码行数:20,代码来源:AMQPUtil.scala
注:本文中的com.rabbitmq.client.Channel类示例整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。
请发表评论