Scala Producer Class Code Examples


This article collects typical usage examples of the akka.kafka.scaladsl.Producer class in Scala. If you have been wondering what exactly the Scala Producer class does, how to use it, or what working examples look like, the curated class code examples below should help.



Below are 20 code examples of the Producer class, sorted by popularity by default. You can upvote the examples you like or find useful; your ratings help our system recommend better Scala code examples.
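Before diving into the full examples, here is a minimal, self-contained sketch of the pattern nearly all of them share, written against the same akka-stream-kafka 0.x API the examples use; the broker address and topic name are placeholders:

import akka.actor.ActorSystem
import akka.kafka.ProducerSettings
import akka.kafka.scaladsl.Producer
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.Source
import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.kafka.common.serialization.{ByteArraySerializer, StringSerializer}

object MinimalProducerSketch extends App {
  implicit val system = ActorSystem("minimal-producer")
  implicit val materializer = ActorMaterializer()

  // The two serializers fix the record types: Array[Byte] keys and String values
  val producerSettings = ProducerSettings(system, new ByteArraySerializer, new StringSerializer)
    .withBootstrapServers("localhost:9092")

  // Map each stream element to a ProducerRecord and run the stream into a Kafka sink
  Source(1 to 3)
    .map(n => new ProducerRecord[Array[Byte], String]("example-topic", n.toString))
    .runWith(Producer.plainSink(producerSettings))
}

The same three steps recur throughout: build ProducerSettings from serializers and bootstrap servers, map stream elements to ProducerRecord, and terminate the stream with a Producer sink (or Producer.flow, when write results or committable offsets must be passed through).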

Example 1: ProcessingKafkaApplication

// Set the package name and import the dependent classes
package com.packt.chapter8

import akka.actor.ActorSystem
import akka.kafka.scaladsl.{Consumer, Producer}
import akka.kafka.{ConsumerSettings, ProducerSettings, Subscriptions}
import akka.stream.{ActorMaterializer, ClosedShape}
import akka.stream.scaladsl.{Flow, GraphDSL, RunnableGraph, Sink, Source}
import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecord}
import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.serialization.{ByteArrayDeserializer, ByteArraySerializer, StringDeserializer, StringSerializer}

import scala.concurrent.duration._

object ProcessingKafkaApplication extends App {
  implicit val actorSystem = ActorSystem("SimpleStream")
  implicit val actorMaterializer = ActorMaterializer()

  val bootstrapServers = "localhost:9092"
  val kafkaTopic = "akka_streams_topic"
  val partition = 0
  val subscription = Subscriptions.assignment(new TopicPartition(kafkaTopic, partition))
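  // assignment() pins the consumer to one specific topic-partition instead of a group-managed subscription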

  val consumerSettings = ConsumerSettings(actorSystem, new ByteArrayDeserializer, new StringDeserializer)
    .withBootstrapServers(bootstrapServers)
    .withGroupId("akka_streams_group")
    .withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")

  val producerSettings = ProducerSettings(actorSystem, new ByteArraySerializer, new StringSerializer)
    .withBootstrapServers(bootstrapServers)

  val runnableGraph = RunnableGraph.fromGraph(GraphDSL.create() { implicit builder =>
    import GraphDSL.Implicits._

    val tickSource = Source.tick(0 seconds, 5 seconds, "Hello from Akka Streams using Kafka!")
    val kafkaSource = Consumer.plainSource(consumerSettings, subscription)
    val kafkaSink = Producer.plainSink(producerSettings)
    val printlnSink = Sink.foreach(println)

    val mapToProducerRecord = Flow[String].map(elem => new ProducerRecord[Array[Byte], String](kafkaTopic, elem))
    val mapFromConsumerRecord = Flow[ConsumerRecord[Array[Byte], String]].map(record => record.value())

    tickSource  ~> mapToProducerRecord   ~> kafkaSink
    kafkaSource ~> mapFromConsumerRecord ~> printlnSink

    ClosedShape
  })

  runnableGraph.run()
} 
Developer: PacktPublishing, Project: Akka-Cookbook, Lines: 51, Source: ProcessingKafkaApplication.scala


Example 2: KafkaProducer

// Set the package name and import the dependent classes
package com.marwanad.actor

import akka.actor.{Actor, ActorLogging, Props}
import akka.kafka.ProducerSettings
import akka.kafka.scaladsl.Producer
import akka.stream.Materializer
import akka.stream.scaladsl.Source
import com.marwanad.model.SerializableCommentContent
import org.apache.kafka.clients.producer.ProducerRecord
import com.marwanad.serializer.CommentContentJsonProtocol._
import com.marwanad.util.log.Timber
import org.apache.kafka.common.serialization.{ByteArraySerializer, StringSerializer}
import spray.json.JsonWriter


class KafkaProducer(mat: Materializer, topic: String, bootstrap: String) extends Actor with Timber {
  implicit lazy val materializer = mat

  override def preStart(): Unit = {
    logActorPreStart("KafkaWriter")
    super.preStart()
    self ! "Start"
  }

  private def toJson[T](element: T)(implicit writer: JsonWriter[T]): String = {
    writer.write(element).toString
  }

  override def receive: Receive = {
    case "Start" =>
      logActorMessageReceived("KafkaWriter received Start")
      val producerSettings = ProducerSettings(context.system, new ByteArraySerializer, new StringSerializer)
        .withBootstrapServers(bootstrap)

      val ele = Source.actorPublisher[SerializableCommentContent](CommentPublisher.props)
        .map(toJson(_))
        .map { elem =>
          new ProducerRecord[Array[Byte], String](topic, elem)
        }
        .runWith(Producer.plainSink(producerSettings))
  }
}

object KafkaProducer {
  def props(mat: Materializer, topic: String, bootstrap: String): Props = Props(new KafkaProducer(mat, topic, bootstrap))
} 
Developer: marwanad, Project: UofR, Lines: 47, Source: KafkaProducer.scala


Example 3: Main

// Set the package name and import the dependent classes
package edu.uw.at.iroberts.wirefugue.sensor

import java.nio.file.Paths

import akka.actor.ActorSystem
import akka.kafka.{ProducerMessage, ProducerSettings}
import akka.kafka.scaladsl.Producer
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{FileIO, Sink}
import com.typesafe.config.ConfigFactory
import edu.uw.at.iroberts.wirefugue.pcap.PcapFileRaw.LinkType
import edu.uw.at.iroberts.wirefugue.pcap._
import edu.uw.at.iroberts.wirefugue.kafka.producer.{KafkaKey, PacketProducer}
import edu.uw.at.iroberts.wirefugue.kafka.serdes.PacketSerializer
import edu.uw.at.iroberts.wirefugue.protocol.overlay.{Ethernet, IPV4Datagram}
import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.kafka.common.serialization.{ByteArraySerializer, IntegerSerializer}

import scala.concurrent.Await
import scala.concurrent.duration._


object Main {

  def main(args: Array[String]): Unit = {
    if (args.length < 1) {
      println("Please specify a filename as the first argument")
      System.exit(1)
    }

    val config = ConfigFactory.load("application.conf")

    implicit val system = ActorSystem("stream-producer-system", config)
    implicit val materializer = ActorMaterializer()

    val producerSettings = ProducerSettings[Integer, Packet](system, None, None)
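    // Passing None for both serializers defers them to configuration (the akka.kafka.producer section),
    // presumably wiring up the imported PacketSerializer in application.conf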

    val doneF = PcapSource(Paths.get(args(0)).toUri)
      .filter( p => p.network == LinkType.ETHERNET && p.ip.isDefined )
      .map( packet => new ProducerRecord[Integer, Packet]("packets", packet.key.##, packet))
      .map( pr => new ProducerMessage.Message[Integer, Packet, Unit](pr, ()))
      .via(Producer.flow(producerSettings))
      .runWith(Sink.foreach(println))

    try {
      Await.ready(doneF, 10 seconds)
    }
    finally {
      system.terminate()
    }
  }
} 
Developer: robertson-tech, Project: wirefugue, Lines: 53, Source: Main.scala


Example 4: running

// Set the package name and import the dependent classes
package producers

import akka.kafka.ProducerSettings
import akka.kafka.scaladsl.Producer
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Keep, Source}
import akka.{Done, NotUsed}
import broker.ActorBroker
import config.AppConfig
import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.kafka.common.serialization.{ByteArraySerializer, StringSerializer}

import scala.concurrent.Future

trait Producerable extends ActorBroker {
  val config: AppConfig
  implicit val materializer = ActorMaterializer()
  import context.dispatcher // execution context for the Future and the onFailure callback below

  val producerSettings = ProducerSettings(context.system, new ByteArraySerializer, new StringSerializer)
    .withBootstrapServers(s"${config.kafkaConfig.uri}:${config.kafkaConfig.port}")

  def running(): Receive = {
    case Stop =>
      log.info("Stopping Kafka producer stream and actor")
      context.stop(self)
  }

  def sendToSink(message: String): Unit = {
    log.info(s"Attempting to produce message on topic $topicName")
    val kafkaSink = Producer.plainSink(producerSettings)

    val stringToProducerRecord: ProducerRecord[Array[Byte], String] = new ProducerRecord[Array[Byte], String](topicName, message)
    val (a, future): (NotUsed, Future[Done]) = Source.fromFuture(Future(stringToProducerRecord))
      .toMat(kafkaSink)(Keep.both)
      .run()
    future.onFailure {
      case ex =>
        log.error("Stream failed due to error, restarting", ex)
        throw ex
    }
    context.become(running())
    log.info(s"Writer now running, writing random numbers to topic $topicName")
  }


  case object Stop
} 
Developer: jguido, Project: reactive-kafka-registration, Lines: 48, Source: Producerable.scala


Example 5: Main

// Set the package name and import the dependent classes
import java.util.concurrent.TimeUnit.SECONDS

import akka.actor.ActorSystem
import akka.kafka.ProducerSettings
import akka.kafka.scaladsl.Producer
import akka.stream.scaladsl.Source
import akka.stream.{ActorMaterializer, ThrottleMode}
import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.kafka.common.serialization.{ByteArraySerializer, StringSerializer}

import scala.concurrent.duration.FiniteDuration
import scala.language.postfixOps

object Main {

  def main(args: Array[String]): Unit = {
    implicit val system = ActorSystem.apply("akka-stream-kafka")
    implicit val materializer = ActorMaterializer()

    val producerSettings = ProducerSettings(system, new ByteArraySerializer, new StringSerializer)
      .withBootstrapServers("localhost:9092;localhost:9093")

    Source.repeat(0)
      .scan(0)((next, _) => next + 1)
      .throttle(1, FiniteDuration(2L, SECONDS), 1, ThrottleMode.Shaping)
      .map(nextInt => {
        val topicName = "topic1"
        val partitionCount = 2
        val partition = nextInt % partitionCount

        new ProducerRecord[Array[Byte], String](topicName, nextInt.toString.getBytes, nextInt.toString)
//        new ProducerRecord[Array[Byte], String](topicName, partition, null, nextInt.toString)
      })
      .runWith(Producer.plainSink(producerSettings))
  }
} 
Developer: kczulko, Project: akka-streams-kafka, Lines: 37, Source: Main.scala


Example 6: self

// Set the package name and import the dependent classes
package com.omearac.producers

import akka.actor.{ActorRef, ActorSystem}
import akka.kafka.ProducerSettings
import akka.kafka.scaladsl.Producer
import akka.stream.OverflowStrategy
import akka.stream.scaladsl.{Flow, Source}
import com.omearac.shared.JsonMessageConversion.Conversion
import com.omearac.shared.{AkkaStreams, EventSourcing}
import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.kafka.common.serialization.{ByteArraySerializer, StringSerializer}



trait ProducerStream extends AkkaStreams with EventSourcing {
    implicit val system: ActorSystem
    def self: ActorRef

    def createStreamSource[msgType] = {
        Source.queue[msgType](Int.MaxValue,OverflowStrategy.backpressure)
    }

    def createStreamSink(producerProperties: Map[String, String]) = {
        val kafkaMBAddress = producerProperties("bootstrap-servers")
        val producerSettings = ProducerSettings(system, new ByteArraySerializer, new StringSerializer).withBootstrapServers(kafkaMBAddress)

        Producer.plainSink(producerSettings)
    }

    def createStreamFlow[msgType: Conversion](producerProperties: Map[String, String]) = {
        val numberOfPartitions = producerProperties("num.partitions").toInt -1
        val topicToPublish = producerProperties("publish-topic")
        val rand = new scala.util.Random
        val range = 0 to numberOfPartitions

        Flow[msgType].map { msg =>
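            // pick a random partition for each message to spread load across the topic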
            val partition = range(rand.nextInt(range.length))
            val stringJSONMessage = Conversion[msgType].convertToJson(msg)
            new ProducerRecord[Array[Byte], String](topicToPublish, partition, null, stringJSONMessage)
        }
    }
} 
Developer: omearac, Project: reactive-kafka-microservice-template, Lines: 43, Source: ProducerStream.scala


Example 7: ReactiveKafkaSingleConsumerMultipleProducerScala

// Set the package name and import the dependent classes
package org.rgcase.reactivekafka

import akka.actor.ActorSystem
import akka.kafka.ConsumerMessage.{ CommittableMessage, CommittableOffsetBatch }
import akka.kafka.ProducerMessage.Message
import akka.kafka.scaladsl.{ Consumer, Producer }
import akka.kafka.{ ConsumerSettings, ProducerSettings, Subscriptions }
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{ Flow, Sink }
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.kafka.common.serialization.{ ByteArrayDeserializer, ByteArraySerializer, StringDeserializer, StringSerializer }

object ReactiveKafkaSingleConsumerMultipleProducerScala extends App { // must be an object for App to supply a runnable main

  implicit val system = ActorSystem("reactivekafkascala")
  implicit val mat = ActorMaterializer()

  val consumerSettings = ConsumerSettings(system, new ByteArrayDeserializer, new StringDeserializer)
    .withBootstrapServers("localhost:9092")
    .withGroupId("group1")
    .withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")

  val producerSettings = ProducerSettings(system, new ByteArraySerializer, new StringSerializer)
    .withBootstrapServers("localhost:9093")

  val kafkaSource =
    Consumer.committableSource(consumerSettings, Subscriptions.topics("sourcetopic"))

  def toProducerMessage(topic: String) = (msg: CommittableMessage[Array[Byte], String]) =>
    Message[Array[Byte], String, CommittableMessage[Array[Byte], String]](new ProducerRecord(topic, msg.record.value), msg)

  val producerFlow1 =
    Flow.fromFunction(toProducerMessage("targettopic1")).via(Producer.flow(producerSettings)).map(_.message.passThrough)

  val producerFlow2 =
    Flow.fromFunction(toProducerMessage("targettopic2")).via(Producer.flow(producerSettings)).map(_.message.passThrough)

  val producerFlow3 =
    Flow.fromFunction(toProducerMessage("targettopic3")).via(Producer.flow(producerSettings)).map(_.message.passThrough)

  kafkaSource
    .via(producerFlow1)
    .via(producerFlow2)
    .via(producerFlow3)
    .batch(max = 20, first => CommittableOffsetBatch.empty.updated(first.committableOffset)) { (batch, elem) =>
      batch.updated(elem.committableOffset)
    }.mapAsync(3)(_.commitScaladsl())
    .runWith(Sink.ignore)

} 
Developer: rgcase, Project: testplayground, Lines: 52, Source: ReactiveKafkaSingleConsumerMultipleProducerScala.scala


Example 8: ReactiveProducer

// Set the package name and import the dependent classes
package co.s4n.reactiveKafka

import akka.actor.ActorSystem
import akka.kafka.ProducerMessage
import akka.kafka.ProducerSettings
import akka.kafka.scaladsl.Producer
import akka.stream.scaladsl.Source
import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.kafka.common.serialization.ByteArraySerializer
import org.apache.kafka.common.serialization.StringSerializer
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.Sink
import scala.concurrent.Future
import akka.Done
import scala.util.{ Failure, Success }


object ReactiveProducer {

  val system = ActorSystem("example")
  implicit val ec = system.dispatcher
  implicit val materializer = ActorMaterializer.create(system)

  val producerSettings = ProducerSettings(system, new ByteArraySerializer, new StringSerializer)
    .withBootstrapServers("localhost:9092")
  val kafkaProducer = producerSettings.createKafkaProducer()

  def produce(msg: String): Unit = {
    val done = Source(1 to 1)
      .map(_.toString)
      .map { elem =>
        println("\n" + msg);
        new ProducerRecord[Array[Byte], String]("UsersTopic", msg)
      }
      .runWith(Producer.plainSink(producerSettings, kafkaProducer))
    // #plainSinkWithProducer

    //    terminateWhenDone(done)
  }

  def terminateWhenDone(result: Future[Done]): Unit = {
    result.onComplete {
      case Failure(e) =>
        system.log.error(e, e.getMessage)
        system.terminate()
      case Success(_) => system.terminate()
    }
  }

} 
Developer: bazzo03, Project: users-api, Lines: 51, Source: ReactiveProducer.scala


Example 9: FlowProducerMain

// Set the package name and import the dependent classes
package com.example.producer

import akka.actor.ActorSystem
import akka.kafka.scaladsl.Producer
import akka.kafka.{ProducerMessage, ProducerSettings}
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Sink, Source}
import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.kafka.common.serialization.{ByteArraySerializer, StringSerializer}


object FlowProducerMain extends App {
  implicit val system = ActorSystem("FlowProducerMain")
  implicit val materializer = ActorMaterializer()

  val producerSettings = ProducerSettings(system, new ByteArraySerializer, new StringSerializer)
    .withBootstrapServers("localhost:9092")

  val done = Source(1 to 10)
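    // ProducerMessage.Message pairs each record with a pass-through value (here n) that comes back with the write result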
    .map { n =>
      // val partition = math.abs(n) % 2
      val partition = 0
      ProducerMessage.Message(new ProducerRecord[Array[Byte], String](
        "topic1", partition, null, n.toString
      ), n)
    }
    .via(Producer.flow(producerSettings))
    .map { result =>
      val record = result.message.record
      println(s"${record.topic}/${record.partition} ${result.offset}: ${record.value}" +
        s"(${result.message.passThrough})")
      result
    }
    .runWith(Sink.ignore)
} 
Developer: makersu, Project: reactive-kafka-scala-example, Lines: 36, Source: FlowProducerMain.scala


Example 10: CommitConsumerToFlowProducerMain

// Set the package name and import the dependent classes
package com.example.producer

import akka.actor.ActorSystem
import akka.kafka.scaladsl.{Consumer, Producer}
import akka.kafka.{ConsumerSettings, ProducerMessage, ProducerSettings, Subscriptions}
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.Sink
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.kafka.common.serialization.{ByteArrayDeserializer, ByteArraySerializer, StringDeserializer, StringSerializer}


object CommitConsumerToFlowProducerMain extends App {
  implicit val system = ActorSystem("CommitConsumerToFlowProducerMain")
  implicit val materializer = ActorMaterializer()

  val consumerSettings =
    ConsumerSettings(system, new ByteArrayDeserializer, new StringDeserializer)
      .withBootstrapServers("localhost:9092")
      .withGroupId("CommitConsumerToFlowProducer")
      .withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")

  val producerSettings = ProducerSettings(system, new ByteArraySerializer, new StringSerializer)
    .withBootstrapServers("localhost:9092")

  val done =
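      // route each message through the producer flow, then commit its offset once the write has succeeded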
    Consumer.committableSource(consumerSettings, Subscriptions.topics("topic1"))
      .map { msg =>
        println(s"topic1 -> topic2: $msg")
        ProducerMessage.Message(new ProducerRecord[Array[Byte], String](
          "topic2",
          msg.record.value
        ), msg.committableOffset)
      }
      .via(Producer.flow(producerSettings))
      .mapAsync(producerSettings.parallelism) { result =>
        result.message.passThrough.commitScaladsl()
      }
      .runWith(Sink.ignore)
} 
Developer: makersu, Project: reactive-kafka-scala-example, Lines: 41, Source: CommitConsumerToFlowProducerMain.scala


Example 11: PlainSinkProducerMain

// Set the package name and import the dependent classes
package com.example.producer

import akka.actor.ActorSystem
import akka.kafka.ProducerSettings
import akka.kafka.scaladsl.Producer
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.Source
import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.kafka.common.serialization.{ByteArraySerializer, StringSerializer}


object PlainSinkProducerMain extends App {

  implicit val system = ActorSystem("PlainSinkProducerMain")
  implicit val materializer = ActorMaterializer()

  val producerSettings = ProducerSettings(system, new ByteArraySerializer, new StringSerializer)
    .withBootstrapServers("localhost:9092")

  val done = Source(1 to 10)
    .map(_.toString)
    .map { elem =>
      println(s"PlainSinkProducer produce: ${elem}")
      new ProducerRecord[Array[Byte], String]("topic1", elem)
    }
    .runWith(Producer.plainSink(producerSettings))

} 
Developer: makersu, Project: reactive-kafka-scala-example, Lines: 29, Source: PlainSinkProducerMain.scala


Example 12: ConsumerToCommitableSinkProducerMain

// Set the package name and import the dependent classes
package com.example.producer

import akka.actor.ActorSystem
import akka.kafka.ConsumerMessage.CommittableOffsetBatch
import akka.kafka.scaladsl.{Consumer, Producer}
import akka.kafka.{ConsumerSettings, ProducerMessage, ProducerSettings, Subscriptions}
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.Sink
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.kafka.common.serialization.{ByteArrayDeserializer, ByteArraySerializer, StringDeserializer, StringSerializer}


object ConsumerToCommitableSinkProducerMain extends App {

  implicit val system = ActorSystem("Consumer2ProducerMain")
  implicit val materializer = ActorMaterializer()

  //TODO: move to configuration application.conf
  val consumerSettings =
    ConsumerSettings(system, new ByteArrayDeserializer, new StringDeserializer)
      .withBootstrapServers("localhost:9092")
      .withGroupId("Consumer2Producer")
      .withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")

  //TODO: move to configuration application.conf
  val producerSettings =
    ProducerSettings(system, new ByteArraySerializer, new StringSerializer)
      .withBootstrapServers("localhost:9092")

  Consumer.committableSource(consumerSettings, Subscriptions.topics("topic1"))
    .map { msg =>
      println(s"topic1 -> topic2: $msg")
      ProducerMessage.Message(new ProducerRecord[Array[Byte], String](
        "topic2",
        msg.record.value
      ), msg.committableOffset)
    }
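    // commitableSink (sic) is the method name in the akka-stream-kafka version these examples target; later releases renamed it committableSink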
    .runWith(Producer.commitableSink(producerSettings))

} 
Developer: makersu, Project: reactive-kafka-scala-example, Lines: 42, Source: ConsumerToCommitableSinkProducerMain.scala


Example 13: KafkaPushQueue

// Set the package name and import the dependent classes
package reactivehub.akka.stream.apns.manager

import akka.kafka.ProducerMessage.Message
import akka.kafka.ProducerSettings
import akka.kafka.scaladsl.Producer
import akka.stream.scaladsl.{Flow, Keep, Sink}
import org.apache.kafka.clients.producer.ProducerRecord
import reactivehub.akka.stream.apns.pusher.PushData
import scala.concurrent.Future

class KafkaPushQueue(topic: String, settings: ProducerSettings[Long, PushData])
  extends PushQueue {

  override def pushDataSink: Sink[(Long, PushData), Future[Long]] =
    Flow[(Long, PushData)]
      .map {
        case (key, value) => Message(
          new ProducerRecord[Long, PushData]("notifications", key, value), key)
      }
      .via(Producer.flow(settings))
      .toMat(Sink.fold(0L)({ case (acc, _) => acc + 1}))(Keep.right)
} 
Developer: reactive-hub, Project: reactive-apns-example, Lines: 23, Source: KafkaPushQueue.scala


Example 14: BaseStation2Kafka

// Set the package name and import the dependent classes
package com.darienmt.airplaneadventures.basestation.collector.streams

import akka.Done
import akka.actor.ActorSystem
import akka.kafka.ProducerSettings
import akka.kafka.scaladsl.Producer
import akka.stream.ActorMaterializer
import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.kafka.common.serialization.{ ByteArraySerializer, StringSerializer }

import scala.concurrent.Future
import io.circe._
import io.circe.generic.auto._
import io.circe.parser._
import io.circe.syntax._
import io.circe.java8.time.{ encodeLocalDateDefault, encodeZonedDateTimeDefault }
import com.darienmt.airplaneadventures.basestation.collector.parsing.CirceEncoders._

object BaseStation2Kafka {

  case class SourceConfig(address: String, port: Int)
  case class SinkConfig(address: String, port: Int, topic: String)

  def apply(sourceConfig: SourceConfig, sinkConfig: SinkConfig)(implicit system: ActorSystem, materializer: ActorMaterializer): Future[Done] =
    BaseStationSource(sourceConfig.address, sourceConfig.port)
      .map(_.asJson.noSpaces)
      .map(m => new ProducerRecord[Array[Byte], String](sinkConfig.topic, m))
      .runWith(
        Producer.plainSink(
          ProducerSettings(system, new ByteArraySerializer, new StringSerializer)
            .withBootstrapServers(s"${sinkConfig.address}:${sinkConfig.port}")
        )
      )
} 
Developer: darienmt, Project: airplane-adventures, Lines: 35, Source: BaseStation2Kafka.scala


Example 15: Main

// Set the package name and import the dependent classes
package com.darienmt.airplaneadventures.basestation.rawcollector

import java.net.InetSocketAddress

import akka.Done
import akka.io.Inet.SocketOption
import akka.kafka.ProducerSettings
import akka.kafka.scaladsl.Producer
import akka.stream.scaladsl.{ Framing, Source, Tcp }
import akka.util.ByteString
import com.darienmt.keepers.{ Generator, KeepThisUp, MainCommons }
import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.kafka.common.serialization.{ ByteArraySerializer, StringSerializer }

import scala.collection.immutable
import scala.collection.immutable.IndexedSeq
import scala.util.Success
import scala.concurrent.duration._

object Main extends App with MainCommons {

  implicit def asFiniteDuration(d: java.time.Duration): FiniteDuration =
    scala.concurrent.duration.Duration.fromNanos(d.toNanos)

  val bsAddress = config.getString("station.address")
  val bsPort = config.getInt("station.port")
  val connectTimeout: Duration = config.getDuration("station.connectTimeout")
  val idleTimeout: Duration = config.getDuration("station.idleTimeout")

  val kafkaAddress = config.getString("kafka.address")
  val kafkaPort = config.getInt("kafka.port")
  val kafkaTopic = config.getString("kafka.topic")


  val generator: Generator = () => Source(IndexedSeq(ByteString.empty))
    .via(
      Tcp().outgoingConnection(
        remoteAddress = InetSocketAddress.createUnresolved(bsAddress, bsPort),
        connectTimeout = connectTimeout,
        idleTimeout = idleTimeout
      )
        .via(Framing.delimiter(ByteString("\n"), 256))
        .map(_.utf8String)
    )
    .map(m => new ProducerRecord[Array[Byte], String](kafkaTopic, m))
    .runWith(
      Producer.plainSink(
        ProducerSettings(system, new ByteArraySerializer, new StringSerializer)
          .withBootstrapServers(s"${kafkaAddress}:${kafkaPort}")
      )
    )

  val keeper = KeepThisUp(config)
  keeper(generator)
} 
Developer: darienmt, Project: airplane-adventures, Lines: 56, Source: Main.scala


Example 16: produceRecord

// Set the package name and import the dependent classes
package de.choffmeister.microserviceutils.kafka.testkit

import java.util.UUID

import akka.Done
import akka.actor.ActorSystem
import akka.kafka.scaladsl.{Consumer, Producer}
import akka.kafka.{ConsumerSettings, ProducerSettings, Subscriptions}
import akka.stream.Materializer
import akka.stream.scaladsl.{Keep, Sink, Source}
import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecord}
import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.kafka.common.serialization.{Deserializer, Serializer}

import scala.concurrent.Future

trait KafkaTest {
  def produceRecord[K, V](topic: String, keySerializer: Serializer[K], valueSerializer: Serializer[V], key: K, value: V)(implicit system: ActorSystem, mat: Materializer): Future[Done] = {
    val producerSettings = ProducerSettings(system, keySerializer, valueSerializer)
      .withBootstrapServers(system.settings.config.getString("kafka.bootstrap-servers"))

    Source.single(new ProducerRecord("mail.command.send", key, value))
      .toMat(Producer.plainSink(producerSettings))(Keep.right)
      .run()
  }

  def consumeRecordPF[K, V, Out](topic: String, keyDeserializer: Deserializer[K], valueDeserializer: Deserializer[V])(pf: PartialFunction[ConsumerRecord[K, V], Out])(implicit system: ActorSystem, mat: Materializer): Future[Out] = {
    val consumerSettings = ConsumerSettings(system, keyDeserializer, valueDeserializer)
      .withBootstrapServers(system.settings.config.getString("kafka.bootstrap-servers"))
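      // a fresh random group id plus earliest offset reset lets each test run read the topic from the beginning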
      .withGroupId(UUID.randomUUID.toString)
      .withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")

    Consumer.plainSource(consumerSettings, Subscriptions.topics(topic))
      .filter(pf.isDefinedAt)
      .map(pf)
      .toMat(Sink.head)(Keep.right)
      .run()
  }
} 
Developer: choffmeister, Project: microservice-utils, Lines: 40, Source: KafkaTest.scala


Example 17: KafkaDao

// Set the package name and import the dependent classes
package dao

import akka.NotUsed
import akka.actor.ActorSystem
import akka.kafka.scaladsl.Consumer.Control
import akka.kafka.scaladsl.{Consumer, Producer}
import akka.kafka.{
  ConsumerSettings,
  ProducerSettings,
  Subscriptions
}
import akka.stream.Materializer
import akka.stream.scaladsl.{Flow, Sink, Source}
import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.kafka.common.serialization.{
  ByteArrayDeserializer,
  ByteArraySerializer,
  StringDeserializer,
  StringSerializer
}
import play.api.libs.json.Format
import shapeless.tag.@@ // the original import was mangled by the site's email obfuscator; shapeless's tagged types match the String @@ Event usage below

class KafkaDao[Event](val topic: String @@ Event)(implicit mat: Materializer,
                      sys: ActorSystem,
                      format: Format[Event])
    extends EventDao[Event] {

  def producerSettings =
    ProducerSettings(sys,
                     new ByteArraySerializer(),
                     new StringSerializer())
      .withBootstrapServers("localhost:9092")

  private def kafkaIn: Sink[String, NotUsed] =
    Flow[String]
      .map { elem =>
        new ProducerRecord[Array[Byte], String](topic,
                                                elem)
      }
      .to(Producer.plainSink(producerSettings))

  private def consumerSettings =
    ConsumerSettings(sys,
                     new ByteArrayDeserializer,
                     new StringDeserializer)
      .withBootstrapServers("localhost:9092")
      .withGroupId("group1")

  private def kafkaOut: Source[String, Control] =
    Consumer
      .plainSource(consumerSettings,
                   Subscriptions.topics(topic))
      .map(_.value())

  override protected def eventStore
    : Flow[String, String, NotUsed] =
    Flow.fromSinkAndSource(kafkaIn, kafkaOut)
} 
Developer: leanovate, Project: contoso-conference-manager, Lines: 60, Source: KafkaDao.scala


Example 18: KafkaSink

// Set the package name and import the dependent classes
package com.thenetcircle.event_dispatcher.sink

import java.util.concurrent.ConcurrentHashMap

import akka.NotUsed
import akka.kafka.ProducerMessage
import akka.kafka.ProducerMessage.Message
import akka.kafka.scaladsl.Producer
import akka.stream.scaladsl.{ Flow, Keep }
import com.thenetcircle.event_dispatcher.Event
import com.thenetcircle.event_dispatcher.driver.adapter.KafkaSinkAdapter
import com.thenetcircle.event_dispatcher.driver.extractor.Extractor
import com.thenetcircle.event_dispatcher.driver.{ KafkaKey, KafkaValue }
import org.apache.kafka.clients.producer.{ KafkaProducer, ProducerRecord }

object KafkaSink {

  lazy private val producerList = new ConcurrentHashMap[String, KafkaProducer[KafkaKey, KafkaValue]]

  def apply(
      settings: KafkaSinkSettings
  ): Flow[Event, ProducerMessage.Result[KafkaKey, KafkaValue, NotUsed.type], NotUsed] = {

    val producerName = settings.name
    val producerSettings = settings.producerSettings
    val producer: KafkaProducer[KafkaKey, KafkaValue] = if (producerList.containsKey(producerName)) {
      producerList.get(producerName)
    } else {
      val newProducer = producerSettings.createKafkaProducer()
      producerList.put(producerName, newProducer) // cache the producer so later lookups under this name reuse it
      newProducer
    }

    val kafkaProducerFlow = Flow[ProducerRecord[KafkaKey, KafkaValue]]
      .map(Message(_, NotUsed))
      .via(Producer.flow(settings.producerSettings, producer))

    Flow[Event]
      .map(Extractor.deExtract)
      .map(KafkaSinkAdapter.unfit)
      .viaMat(kafkaProducerFlow)(Keep.left)

  }
} 
Developer: thenetcircle, Project: event-dispatcher, Lines: 43, Source: KafkaSink.scala


Example 19: KafkaEventProcessor

// Set the package name and import the dependent classes
package events

import akka.actor.{Actor, ActorLogging, Props}
import akka.kafka.ProducerSettings
import akka.kafka.scaladsl.Producer
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Flow, Source}
import com.google.inject.Inject
import events.KafkaEventProcessor.Init
import events.Serializer.EventSerializer
import models.KafkaEvents.{Event, Hello}
import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.kafka.common.serialization.ByteArraySerializer
import play.api.{Configuration, Logger}

import scala.concurrent.ExecutionContextExecutor


class KafkaEventProcessor @Inject()(config: Configuration) extends Actor with ActorLogging {

  private val eventStream = context.system.eventStream
  implicit val materializer: ActorMaterializer = ActorMaterializer()(context.system)
  implicit val ctx: ExecutionContextExecutor = context.dispatcher

  private val server = config.getString("kafka.broker").getOrElse("localhost:9092")
  private val topic = config.getString("kafka.topic").getOrElse("test")

  override def preStart(): Unit = {
    super.preStart()
    self ! Init
    log.info("Start EventsProcessorActor")
  }

  override def postStop(): Unit = {
    eventStream.unsubscribe(self)
    super.postStop()
  }

  override def receive = {
    case Init => createProducer()
  }

  private def createProducer() = {
    val producerSettings = ProducerSettings(context.system, new ByteArraySerializer(), new EventSerializer())
      .withBootstrapServers(server)

    val jobManagerSource = Source.actorPublisher[Event](Props(classOf[KafkaEventPublisher]))

    Flow[Event].collect {
      // collect silently skips event types without a matching case instead of failing with a MatchError
      case e: Hello => new ProducerRecord[Array[Byte], Event](topic, e)
    }.to(Producer.plainSink(producerSettings))
      .runWith(jobManagerSource)

    Logger.info("init producer")
  }
}

object KafkaEventProcessor {
  case object Init
} 
Developer: fsanaulla, Project: Akka-Kafka-Producer, Lines: 61, Source: KafkaEventProcessor.scala


Example 20: self

// Set the package name and import the dependent classes
package com.omearac.producers

import akka.actor.{ActorRef, ActorSystem}
import akka.kafka.ProducerSettings
import akka.kafka.scaladsl.Producer
import akka.stream.OverflowStrategy
import akka.stream.scaladsl.{Flow, Source}
import com.omearac.shared.ProtobufMessageConversion.Conversion
import com.omearac.shared.{AkkaStreams, EventSourcing}
import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.kafka.common.serialization.{ByteArraySerializer, StringSerializer}



trait ProducerStream extends AkkaStreams with EventSourcing {
    implicit val system: ActorSystem
    def self: ActorRef

    def createStreamSource[msgType] = {
        Source.queue[msgType](Int.MaxValue,OverflowStrategy.backpressure)
    }

    def createStreamSink(producerProperties: Map[String, String]) = {
        val kafkaMBAddress = producerProperties("bootstrap-servers")
        val producerSettings = ProducerSettings(system, new ByteArraySerializer, new ByteArraySerializer).withBootstrapServers(kafkaMBAddress)
        Producer.plainSink(producerSettings)
    }

    def createStreamFlow[msgType: Conversion](producerProperties: Map[String, String]) = {
        val numberOfPartitions = producerProperties("num.partitions").toInt -1
        val topicToPublish = producerProperties("publish-topic")
        val rand = new scala.util.Random
        val range = 0 to numberOfPartitions

        Flow[msgType].map { msg =>
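            // pick a random partition for each message to spread load across the topic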
            val partition = range(rand.nextInt(range.length))
            val messageBytes = Conversion[msgType].convertToByteArray(msg)
            new ProducerRecord[Array[Byte], Array[Byte]](topicToPublish, partition, null, messageBytes)
        }
    }
} 
Developer: omearac, Project: reactive-kafka-microservice-protobuf, Lines: 42, Source: ProducerStream.scala



Note: The akka.kafka.scaladsl.Producer class examples in this article were collected from GitHub, MSDocs, and other source-code and documentation platforms. The code snippets were selected from open-source projects contributed by many developers; copyright in the source code belongs to the original authors, and distribution and use are subject to each project's license. Please do not repost without permission.

