• 设为首页
  • 点击收藏
  • 手机版
    手机扫一扫访问
    迪恩网络手机版
  • 关注官方公众号
    微信扫一扫关注
    公众号

Scala ProducerSettings类代码示例

原作者: [db:作者] 来自: [db:来源] 收藏 邀请

本文整理汇总了Scala中akka.kafka.ProducerSettings的典型用法代码示例。如果您正苦于以下问题:Scala ProducerSettings类的具体用法?Scala ProducerSettings怎么用?Scala ProducerSettings使用的例子?那么恭喜您, 这里精选的类代码示例或许可以为您提供帮助。



在下文中一共展示了ProducerSettings类的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Scala代码示例。

示例1: ProcessingKafkaApplication

//设置package包名称以及导入依赖的类
package com.packt.chapter8

import akka.actor.ActorSystem
import akka.kafka.scaladsl.{Consumer, Producer}
import akka.kafka.{ConsumerSettings, ProducerSettings, Subscriptions}
import akka.stream.{ActorMaterializer, ClosedShape}
import akka.stream.scaladsl.{Flow, GraphDSL, RunnableGraph, Sink, Source}
import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecord}
import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.serialization.{ByteArrayDeserializer, ByteArraySerializer, StringDeserializer, StringSerializer}

import scala.concurrent.duration._

/**
 * Demo application that runs two independent Kafka pipelines inside one closed graph:
 * a ticker that publishes a fixed greeting to `kafkaTopic`, and a consumer that prints
 * every value read back from partition `partition` of that topic.
 */
object ProcessingKafkaApplication extends App {
  implicit val actorSystem = ActorSystem("SimpleStream")
  implicit val actorMaterializer = ActorMaterializer()

  val bootstrapServers = "localhost:9092"
  val kafkaTopic = "akka_streams_topic"
  val partition = 0
  val subscription = Subscriptions.assignment(new TopicPartition(kafkaTopic, partition))

  val consumerSettings =
    ConsumerSettings(actorSystem, new ByteArrayDeserializer, new StringDeserializer)
      .withBootstrapServers(bootstrapServers)
      .withGroupId("akka_streams_group")
      .withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")

  val producerSettings =
    ProducerSettings(actorSystem, new ByteArraySerializer, new StringSerializer)
      .withBootstrapServers(bootstrapServers)

  val runnableGraph = RunnableGraph.fromGraph(GraphDSL.create() { implicit builder =>
    import GraphDSL.Implicits._

    // Outbound side: one greeting immediately, then one every five seconds.
    val greetings = Source.tick(0.seconds, 5.seconds, "Hello from Akka Streams using Kafka!")
    val asProducerRecord =
      Flow[String].map(text => new ProducerRecord[Array[Byte], String](kafkaTopic, text))
    val topicWriter = Producer.plainSink(producerSettings)

    // Inbound side: read records, keep only their value, print it.
    val topicReader = Consumer.plainSource(consumerSettings, subscription)
    val valueOnly = Flow[ConsumerRecord[Array[Byte], String]].map(_.value())
    val console = Sink.foreach(println)

    greetings   ~> asProducerRecord ~> topicWriter
    topicReader ~> valueOnly        ~> console

    ClosedShape
  })

  runnableGraph.run()
}
开发者ID:PacktPublishing,项目名称:Akka-Cookbook,代码行数:51,代码来源:ProcessingKafkaApplication.scala


示例2: Settings

//设置package包名称以及导入依赖的类
package com.scalaio.kafka.consumer

import akka.actor.ActorSystem
import akka.kafka.ConsumerMessage.CommittableMessage
import akka.kafka.scaladsl.Consumer
import akka.kafka.{ConsumerSettings, ProducerSettings, Subscriptions}
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.Sink
import com.scalaio.kafka.consumer.Settings.consumerSettings
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.common.serialization.{ByteArrayDeserializer, ByteArraySerializer, StringDeserializer, StringSerializer}

import scala.concurrent.Future

/** Factory methods for the Kafka consumer and producer settings used by this example. */
object Settings {
  // Both clients talk to the same local broker.
  private val Brokers = "localhost:9092"

  /** Consumer settings: byte-array keys, string values, reading from the earliest offset. */
  def consumerSettings(implicit system: ActorSystem): ConsumerSettings[Array[Byte], String] = {
    val base = ConsumerSettings(system, new ByteArrayDeserializer, new StringDeserializer)
    base
      .withBootstrapServers(Brokers)
      .withGroupId("CommittableSourceConsumer")
      .withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
  }

  /** Producer settings: byte-array keys, string values. */
  def producerSettings(implicit system: ActorSystem): ProducerSettings[Array[Byte], String] = {
    val base = ProducerSettings(system, new ByteArraySerializer, new StringSerializer)
    base.withBootstrapServers(Brokers)
  }
}

/**
 * Consumes "topic1" one message at a time, hands each value to
 * `BusinessController.handleMessage`, and commits the message offset afterwards.
 */
object CommittableSource extends App {

  type KafkaMessage = CommittableMessage[Array[Byte], String]

  implicit val system = ActorSystem("CommittableSourceConsumerMain")
  implicit val materializer = ActorMaterializer()

  implicit val ec = system.dispatcher

  // explicit commit
  Consumer
    .committableSource(consumerSettings, Subscriptions.topics("topic1"))
    .mapAsync(1) { msg =>
      val handled = BusinessController.handleMessage(msg.record.value)
      val committed = handled.flatMap(_ => msg.committableOffset.commitScaladsl())
      // Any failure (handling or the first commit) falls back to committing the offset,
      // so a failing message is skipped rather than redelivered.
      committed.recoverWith { case _ => msg.committableOffset.commitScaladsl() }
    }
    .runWith(Sink.ignore)

}

/** Stand-in for business logic: an asynchronous service that upper-cases its input. */
object BusinessController {

  /** An asynchronous service from `A` to `B`. */
  type Service[A, B] = A => Future[B]

  /** Returns an already-completed future holding the upper-cased message. */
  val handleMessage: Service[String, String] = { message =>
    val upperCased = message.toUpperCase
    Future.successful(upperCased)
  }

}
开发者ID:fagossa,项目名称:scalaio_akka,代码行数:56,代码来源:CommittableSource.scala


示例3: running

//设置package包名称以及导入依赖的类
package producers

import akka.kafka.ProducerSettings
import akka.kafka.scaladsl.Producer
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Keep, Source}
import akka.{Done, NotUsed}
import broker.ActorBroker
import config.AppConfig
import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.kafka.common.serialization.{ByteArraySerializer, StringSerializer}

import scala.concurrent.Future

/**
 * Mixin for actors that publish string messages to Kafka.
 *
 * Relies on members supplied by `ActorBroker` / the implementing actor
 * (`context`, `log`, `topicName`, `self`, and an implicit `ExecutionContext`
 * for the future callback) — NOTE(review): these are not visible in this file.
 */
trait Producerable extends ActorBroker {
  val config: AppConfig
  implicit val materializer = ActorMaterializer()

  val producerSettings = ProducerSettings(context.system, new ByteArraySerializer, new StringSerializer)
    .withBootstrapServers(s"${config.kafkaConfig.uri}:${config.kafkaConfig.port}")

  /** Behavior installed after a send: stops the actor when it receives [[Stop]]. */
  def running(): Receive = {
    case Stop =>
      log.info("Stopping Kafka producer stream and actor")
      context.stop(self)
  }

  /**
   * Publishes `message` as a single record on `topicName`, then switches the actor
   * to the [[running]] behavior.
   *
   * @param message the record value to publish
   */
  def sendToSink(message: String): Unit = {
    log.info(s"Attempting to produce message on topic $topicName")

    val record = new ProducerRecord[Array[Byte], String](topicName, message)

    // The record already exists, so emit it directly instead of wrapping it in a Future,
    // and keep only the sink's completion future.
    val completion: Future[Done] =
      Source.single(record)
        .toMat(Producer.plainSink(producerSettings))(Keep.right)
        .run()

    // Log stream failures. The previous callback re-threw the exception, but an exception
    // thrown inside a Future callback runs on the execution context and never reaches the
    // actor's supervisor, so it could not trigger a restart.
    completion.failed.foreach { ex =>
      log.error("Stream failed due to error, restarting", ex)
    }

    context.become(running())
    log.info(s"Writer now running, writing random numbers to topic $topicName")
  }


  case object Stop
}
开发者ID:jguido,项目名称:reactive-kafka-registration,代码行数:48,代码来源:Producerable.scala


示例4: self

//设置package包名称以及导入依赖的类
package com.omearac.producers

import akka.actor.{ActorRef, ActorSystem}
import akka.kafka.ProducerSettings
import akka.kafka.scaladsl.Producer
import akka.stream.OverflowStrategy
import akka.stream.scaladsl.{Flow, Source}
import com.omearac.shared.JsonMessageConversion.Conversion
import com.omearac.shared.{AkkaStreams, EventSourcing}
import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.kafka.common.serialization.{ByteArraySerializer, StringSerializer}



/**
 * Building blocks (source, flow, sink) for a stream that publishes JSON-encoded
 * messages to Kafka.
 */
trait ProducerStream extends AkkaStreams with EventSourcing {
    implicit val system: ActorSystem
    def self: ActorRef

    /** A queue-backed source with a buffer of `Int.MaxValue` that backpressures when full. */
    def createStreamSource[msgType] =
        Source.queue[msgType](Int.MaxValue, OverflowStrategy.backpressure)

    /**
     * A plain Kafka sink.
     *
     * @param producerProperties must contain the key "bootstrap-servers"
     */
    def createStreamSink(producerProperties: Map[String, String]) = {
        val brokers = producerProperties("bootstrap-servers")
        val settings =
            ProducerSettings(system, new ByteArraySerializer, new StringSerializer)
                .withBootstrapServers(brokers)

        Producer.plainSink(settings)
    }

    /**
     * A flow that converts each message to JSON and wraps it in a record addressed to
     * a randomly chosen partition of the publish topic.
     *
     * @param producerProperties must contain the keys "num.partitions" and "publish-topic"
     */
    def createStreamFlow[msgType: Conversion](producerProperties: Map[String, String]) = {
        val partitionCount = producerProperties("num.partitions").toInt
        val topic = producerProperties("publish-topic")
        val random = new scala.util.Random

        Flow[msgType].map { msg =>
            // Uniform pick from 0 until partitionCount.
            val partition = random.nextInt(partitionCount)
            val json = Conversion[msgType].convertToJson(msg)
            new ProducerRecord[Array[Byte], String](topic, partition, null, json)
        }
    }
}
开发者ID:omearac,项目名称:reactive-kafka-microservice-template,代码行数:43,代码来源:ProducerStream.scala


示例5: ReactiveKafkaSingleConsumerMultipleProducerScala

//设置package包名称以及导入依赖的类
package org.rgcase.reactivekafka

import akka.actor.ActorSystem
import akka.kafka.ConsumerMessage.{ CommittableMessage, CommittableOffsetBatch }
import akka.kafka.ProducerMessage.Message
import akka.kafka.scaladsl.{ Consumer, Producer }
import akka.kafka.{ ConsumerSettings, ProducerSettings, Subscriptions }
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{ Flow, Sink }
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.kafka.common.serialization.{ ByteArrayDeserializer, ByteArraySerializer, StringDeserializer, StringSerializer }

/**
 * Reads "sourcetopic", republishes every message to three target topics in sequence,
 * and commits the consumed offsets in batches.
 *
 * NOTE(review): this is declared as a `class` extending `App`; a runnable entry point
 * is normally an `object`. Left unchanged to keep the declaration's interface.
 */
class ReactiveKafkaSingleConsumerMultipleProducerScala extends App {

  implicit val system = ActorSystem("reactivekafkascala")
  implicit val mat = ActorMaterializer()

  val consumerSettings = ConsumerSettings(system, new ByteArrayDeserializer, new StringDeserializer)
    .withBootstrapServers("localhost:9092")
    .withGroupId("group1")
    .withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")

  val producerSettings = ProducerSettings(system, new ByteArraySerializer, new StringSerializer)
    .withBootstrapServers("localhost:9093")

  val kafkaSource =
    Consumer.committableSource(consumerSettings, Subscriptions.topics("sourcetopic"))

  /**
   * Builds a function that wraps a consumed message into a producer message for `topic`,
   * carrying the original message along as pass-through so its offset can be committed later.
   */
  def toProducerMessage(topic: String) = (msg: CommittableMessage[Array[Byte], String]) =>
    Message[Array[Byte], String, CommittableMessage[Array[Byte], String]](new ProducerRecord(topic, msg.record.value), msg)

  val producerFlow1 =
    Flow.fromFunction(toProducerMessage("targettopic1")).via(Producer.flow(producerSettings)).map(_.message.passThrough)

  val producerFlow2 =
    Flow.fromFunction(toProducerMessage("targettopic2")).via(Producer.flow(producerSettings)).map(_.message.passThrough)

  val producerFlow3 =
    Flow.fromFunction(toProducerMessage("targettopic3")).via(Producer.flow(producerSettings)).map(_.message.passThrough)

  kafkaSource
    .via(producerFlow1)
    .via(producerFlow2)
    .via(producerFlow3)
    // Accumulate up to 20 offsets into one batch while the commit stage is busy.
    .batch(max = 20, first => CommittableOffsetBatch.empty.updated(first.committableOffset)) { (batch, elem) =>
      batch.updated(elem.committableOffset)
    }.mapAsync(3)(_.commitScaladsl())
    .runWith(Sink.ignore)

}
开发者ID:rgcase,项目名称:testplayground,代码行数:52,代码来源:ReactiveKafkaSingleConsumerMultipleProducerScala.scala


示例6: ReactiveProducer

//设置package包名称以及导入依赖的类
package co.s4n.reactiveKafka

import akka.actor.ActorSystem
import akka.kafka.ProducerMessage
import akka.kafka.ProducerSettings
import akka.kafka.scaladsl.Producer
import akka.stream.scaladsl.Source
import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.kafka.common.serialization.ByteArraySerializer
import org.apache.kafka.common.serialization.StringSerializer
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.Sink
import scala.concurrent.Future
import akka.Done
import scala.util.{ Failure, Success }


/** Publishes string messages to the "UsersTopic" topic through one shared Kafka producer. */
object ReactiveProducer {

  val system = ActorSystem("example")
  implicit val ec = system.dispatcher
  implicit val materializer = ActorMaterializer.create(system)

  val producerSettings =
    ProducerSettings(system, new ByteArraySerializer, new StringSerializer)
      .withBootstrapServers("localhost:9092")
  val kafkaProducer = producerSettings.createKafkaProducer()

  /**
   * Prints `msg` and publishes it as a single record.
   * The stream's completion future is not awaited or returned.
   */
  def produce(msg: String): Unit = {
    val sink = Producer.plainSink(producerSettings, kafkaProducer)

    // A one-element source drives the stream; the element itself is ignored.
    Source(1 to 1)
      .map(_.toString)
      .map { _ =>
        println(s"\n$msg")
        new ProducerRecord[Array[Byte], String]("UsersTopic", msg)
      }
      .runWith(sink)
    ()
  }

  /** Terminates the actor system once `result` completes, logging the error if it failed. */
  def terminateWhenDone(result: Future[Done]): Unit =
    result.onComplete { outcome =>
      outcome.failed.foreach(e => system.log.error(e, e.getMessage))
      system.terminate()
    }

}
开发者ID:bazzo03,项目名称:users-api,代码行数:51,代码来源:ReactiveProducer.scala


示例7: FlowProducerMain

//设置package包名称以及导入依赖的类
package com.example.producer

import akka.actor.ActorSystem
import akka.kafka.scaladsl.Producer
import akka.kafka.{ProducerMessage, ProducerSettings}
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Sink, Source}
import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.kafka.common.serialization.{ByteArraySerializer, StringSerializer}


/**
 * Publishes the numbers 1 to 10 to partition 0 of "topic1" through `Producer.flow`
 * and prints each result together with the pass-through value.
 */
object FlowProducerMain extends App {
  implicit val system = ActorSystem("FlowProducerMain")
  implicit val materializer = ActorMaterializer()

  val producerSettings =
    ProducerSettings(system, new ByteArraySerializer, new StringSerializer)
      .withBootstrapServers("localhost:9092")

  val done = Source(1 to 10)
    .map { n =>
      // Fixed partition; an alternative would be math.abs(n) % 2.
      val partition = 0
      val record = new ProducerRecord[Array[Byte], String]("topic1", partition, null, n.toString)
      // The number itself travels along as the pass-through value.
      ProducerMessage.Message(record, n)
    }
    .via(Producer.flow(producerSettings))
    .map { result =>
      val sent = result.message.record
      val passThrough = result.message.passThrough
      println(s"${sent.topic}/${sent.partition} ${result.offset}: ${sent.value}(${passThrough})")
      result
    }
    .runWith(Sink.ignore)
}
开发者ID:makersu,项目名称:reactive-kafka-scala-example,代码行数:36,代码来源:FlowProducerMain.scala


示例8: CommitConsumerToFlowProducerMain

//设置package包名称以及导入依赖的类
package com.example.producer

import akka.actor.ActorSystem
import akka.kafka.scaladsl.{Consumer, Producer}
import akka.kafka.{ConsumerSettings, ProducerMessage, ProducerSettings, Subscriptions}
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.Sink
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.kafka.common.serialization.{ByteArrayDeserializer, ByteArraySerializer, StringDeserializer, StringSerializer}


/**
 * Copies messages from "topic1" to "topic2" and commits each consumed offset
 * after the corresponding record has passed through the producer flow.
 */
object CommitConsumerToFlowProducerMain extends App {
  implicit val system = ActorSystem("CommitConsumerToFlowProducerMain")
  implicit val materializer = ActorMaterializer()

  val consumerSettings = {
    val base = ConsumerSettings(system, new ByteArrayDeserializer, new StringDeserializer)
    base
      .withBootstrapServers("localhost:9092")
      .withGroupId("CommitConsumerToFlowProducer")
      .withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
  }

  val producerSettings =
    ProducerSettings(system, new ByteArraySerializer, new StringSerializer)
      .withBootstrapServers("localhost:9092")

  val done =
    Consumer.committableSource(consumerSettings, Subscriptions.topics("topic1"))
      .map { msg =>
        println(s"topic1 -> topic2: $msg")
        val forwarded = new ProducerRecord[Array[Byte], String]("topic2", msg.record.value)
        // Carry the offset as pass-through so it can be committed downstream.
        ProducerMessage.Message(forwarded, msg.committableOffset)
      }
      .via(Producer.flow(producerSettings))
      .mapAsync(producerSettings.parallelism)(_.message.passThrough.commitScaladsl())
      .runWith(Sink.ignore)
}
开发者ID:makersu,项目名称:reactive-kafka-scala-example,代码行数:41,代码来源:CommitConsumerToFlowProducerMain.scala


示例9: PlainSinkProducerMain

//设置package包名称以及导入依赖的类
package com.example.producer

import akka.actor.ActorSystem
import akka.kafka.ProducerSettings
import akka.kafka.scaladsl.Producer
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.Source
import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.kafka.common.serialization.{ByteArraySerializer, StringSerializer}


/** Publishes the strings "1" to "10" to "topic1" through a plain Kafka sink. */
object PlainSinkProducerMain extends App {

  implicit val system = ActorSystem("PlainSinkProducerMain")
  implicit val materializer = ActorMaterializer()

  val producerSettings =
    ProducerSettings(system, new ByteArraySerializer, new StringSerializer)
      .withBootstrapServers("localhost:9092")

  val done = Source(1 to 10)
    .map { n =>
      val elem = n.toString
      println(s"PlainSinkProducer produce: ${elem}")
      new ProducerRecord[Array[Byte], String]("topic1", elem)
    }
    .runWith(Producer.plainSink(producerSettings))

}
开发者ID:makersu,项目名称:reactive-kafka-scala-example,代码行数:29,代码来源:PlainSinkProducerMain.scala


示例10: ConsumerToCommitableSinkProducerMain

//设置package包名称以及导入依赖的类
package com.example.producer

import akka.actor.ActorSystem
import akka.kafka.ConsumerMessage.CommittableOffsetBatch
import akka.kafka.scaladsl.{Consumer, Producer}
import akka.kafka.{ConsumerSettings, ProducerMessage, ProducerSettings, Subscriptions}
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.Sink
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.kafka.common.serialization.{ByteArrayDeserializer, ByteArraySerializer, StringDeserializer, StringSerializer}


/**
 * Copies messages from "topic1" to "topic2" using `Producer.commitableSink`,
 * handing each message's offset to the sink as the pass-through value.
 */
object ConsumerToCommitableSinkProducerMain extends App {

  implicit val system = ActorSystem("Consumer2ProducerMain")
  implicit val materializer = ActorMaterializer()

  //TODO: move to configuration application.conf
  val consumerSettings = {
    val base = ConsumerSettings(system, new ByteArrayDeserializer, new StringDeserializer)
    base
      .withBootstrapServers("localhost:9092")
      .withGroupId("Consumer2Producer")
      .withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
  }

  //TODO: move to configuration application.conf
  val producerSettings = {
    val base = ProducerSettings(system, new ByteArraySerializer, new StringSerializer)
    base.withBootstrapServers("localhost:9092")
  }

  Consumer.committableSource(consumerSettings, Subscriptions.topics("topic1"))
    .map { msg =>
      println(s"topic1 -> topic2: $msg")
      val forwarded = new ProducerRecord[Array[Byte], String]("topic2", msg.record.value)
      ProducerMessage.Message(forwarded, msg.committableOffset)
    }
    .runWith(Producer.commitableSink(producerSettings))

}
开发者ID:makersu,项目名称:reactive-kafka-scala-example,代码行数:42,代码来源:ConsumerToCommitableSinkProducerMain.scala


示例11: Manager

//设置package包名称以及导入依赖的类
package reactivehub.akka.stream.apns.manager

import akka.actor.{ActorRef, ActorSystem}
import akka.http.scaladsl.Http
import akka.http.scaladsl.Http.ServerBinding
import akka.kafka.ProducerSettings
import akka.stream.ActorMaterializer
import akka.util.Timeout
import reactivehub.akka.stream.apns.pusher._
import scala.concurrent.Future
import scala.concurrent.duration._
import scala.io.StdIn
import scala.util.Success
import slick.driver.H2Driver.api._

/**
 * Entry point of the manager service: recreates the device schema, starts the device
 * service actor, binds the REST route, and shuts everything down when enter is pressed.
 */
object Manager extends RestApi {
  val dbConfig = "h2"
  val kafka = "192.168.99.100:9092"
  val topic = "notifications"
  val interface = "0.0.0.0"
  val port = 8080

  implicit val system = ActorSystem("system")
  implicit val materializer = ActorMaterializer()
  implicit val dispatcher = system.dispatcher
  implicit val timeout = Timeout(10.second)

  def main(args: Array[String]): Unit = {
    val binding = createService().flatMap(bind)

    binding.onComplete { outcome =>
      val status = outcome match {
        case Success(b) => s"Successfully bound to ${b.localAddress}, press enter to exit"
        case _          => "Failed to bootstrap Manager, press enter"
      }
      println(status)
    }

    // Block until the user presses enter, then unbind and stop the actor system.
    StdIn.readLine()
    binding.flatMap(_.unbind()).onComplete(_ => system.terminate())
  }

  /** Drops (ignoring any failure) and recreates the devices schema, then starts the device service actor. */
  private def createService(): Future[ActorRef] = {
    val db = Database.forConfig(dbConfig)
    val store = new SqlDeviceStore(db)
    val queue = new KafkaPushQueue(topic, producerSettings)

    for {
      _ <- db.run(devices.schema.drop).recover { case _ => () }
      _ <- db.run(devices.schema.create)
    } yield system.actorOf(DeviceService.props(store, queue))
  }

  /** Binds the REST route for `service` to `interface`:`port`. */
  private def bind(service: ActorRef): Future[ServerBinding] =
    Http(system).bindAndHandle(route(service), interface, port)

  /** Producer settings for `Long` keys and `PushData` values against the `kafka` broker. */
  private def producerSettings: ProducerSettings[Long, PushData] = {
    val base = ProducerSettings(system, ScalaLongSerializer, PushDataSerializer)
    base.withBootstrapServers(kafka)
  }
}
开发者ID:reactive-hub,项目名称:reactive-apns-example,代码行数:59,代码来源:Manager.scala


示例12: KafkaPushQueue

//设置package包名称以及导入依赖的类
package reactivehub.akka.stream.apns.manager

import akka.kafka.ProducerMessage.Message
import akka.kafka.ProducerSettings
import akka.kafka.scaladsl.Producer
import akka.stream.scaladsl.{Flow, Keep, Sink}
import org.apache.kafka.clients.producer.ProducerRecord
import reactivehub.akka.stream.apns.pusher.PushData
import scala.concurrent.Future

/**
 * [[PushQueue]] implementation that publishes push data to a Kafka topic.
 *
 * @param topic    the Kafka topic records are published to
 * @param settings producer settings for `Long` keys and `PushData` values
 */
class KafkaPushQueue(topic: String, settings: ProducerSettings[Long, PushData])
  extends PushQueue {

  /**
   * A sink that publishes each `(key, value)` pair as a record on `topic` and
   * materializes the number of elements that passed through the producer flow.
   */
  override def pushDataSink: Sink[(Long, PushData), Future[Long]] =
    Flow[(Long, PushData)]
      .map {
        case (key, value) =>
          // Use the configured topic; it was previously hard-coded to "notifications",
          // which silently ignored the constructor argument.
          Message(new ProducerRecord[Long, PushData](topic, key, value), key)
      }
      .via(Producer.flow(settings))
      .toMat(Sink.fold(0L)({ case (acc, _) => acc + 1 }))(Keep.right)
}
开发者ID:reactive-hub,项目名称:reactive-apns-example,代码行数:23,代码来源:KafkaPushQueue.scala


示例13: BaseStation2Kafka

//设置package包名称以及导入依赖的类
package com.darienmt.airplaneadventures.basestation.collector.streams

import akka.Done
import akka.actor.ActorSystem
import akka.kafka.ProducerSettings
import akka.kafka.scaladsl.Producer
import akka.stream.ActorMaterializer
import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.kafka.common.serialization.{ ByteArraySerializer, StringSerializer }

import scala.concurrent.Future
import io.circe._
import io.circe.generic.auto._
import io.circe.parser._
import io.circe.syntax._
import io.circe.java8.time.{ encodeLocalDateDefault, encodeZonedDateTimeDefault }
import com.darienmt.airplaneadventures.basestation.collector.parsing.CirceEncoders._

/** Streams messages from a base station source into a Kafka topic as compact JSON. */
object BaseStation2Kafka {

  case class SourceConfig(address: String, port: Int)
  case class SinkConfig(address: String, port: Int, topic: String)

  /**
   * Runs the stream from the base station at `sourceConfig` to the Kafka topic in `sinkConfig`.
   *
   * @return the materialized value of the Kafka sink
   */
  def apply(sourceConfig: SourceConfig, sinkConfig: SinkConfig)(implicit system: ActorSystem, materializer: ActorMaterializer): Future[Done] = {
    val stationMessages = BaseStationSource(sourceConfig.address, sourceConfig.port)

    val kafkaSettings =
      ProducerSettings(system, new ByteArraySerializer, new StringSerializer)
        .withBootstrapServers(s"${sinkConfig.address}:${sinkConfig.port}")

    stationMessages
      .map { message =>
        // Encode as JSON without whitespace and address it to the sink topic.
        val json = message.asJson.noSpaces
        new ProducerRecord[Array[Byte], String](sinkConfig.topic, json)
      }
      .runWith(Producer.plainSink(kafkaSettings))
  }
}
开发者ID:darienmt,项目名称:airplane-adventures,代码行数:35,代码来源:BaseStation2Kafka.scala


示例14: Main

//设置package包名称以及导入依赖的类
package com.darienmt.airplaneadventures.basestation.rawcollector

import java.net.InetSocketAddress

import akka.Done
import akka.io.Inet.SocketOption
import akka.kafka.ProducerSettings
import akka.kafka.scaladsl.Producer
import akka.stream.scaladsl.{ Framing, Source, Tcp }
import akka.util.ByteString
import com.darienmt.keepers.{ Generator, KeepThisUp, MainCommons }
import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.kafka.common.serialization.{ ByteArraySerializer, StringSerializer }

import scala.collection.immutable
import scala.collection.immutable.IndexedSeq
import scala.util.Success
import scala.concurrent.duration._

/**
 * Raw collector: connects to the station over TCP, splits the byte stream into
 * newline-delimited lines, and publishes every line to a Kafka topic.
 * The stream is handed to `KeepThisUp` as a generator.
 */
object Main extends App with MainCommons {

  /** Lets `java.time.Duration` values read from config be used where a Scala duration is expected. */
  implicit def asFiniteDuration(d: java.time.Duration): FiniteDuration =
    Duration.fromNanos(d.toNanos)

  val bsAddress = config.getString("station.address")
  val bsPort = config.getInt("station.port")
  val connectTimeout: Duration = config.getDuration("station.connectTimeout")
  val idleTimeout: Duration = config.getDuration("station.idleTimeout")

  val kafkaAddress = config.getString("kafka.address")
  val kafkaPort = config.getInt("kafka.port")
  val kafkaTopic = config.getString("kafka.topic")


  val generator: Generator = () => {
    // TCP connection to the station, framed into UTF-8 lines of at most 256 bytes.
    val stationLines =
      Tcp()
        .outgoingConnection(
          remoteAddress = InetSocketAddress.createUnresolved(bsAddress, bsPort),
          connectTimeout = connectTimeout,
          idleTimeout = idleTimeout
        )
        .via(Framing.delimiter(ByteString("\n"), 256))
        .map(_.utf8String)

    val kafkaSink = Producer.plainSink(
      ProducerSettings(system, new ByteArraySerializer, new StringSerializer)
        .withBootstrapServers(s"${kafkaAddress}:${kafkaPort}")
    )

    // A single empty ByteString is sent to the connection; the lines coming back are published.
    Source(IndexedSeq(ByteString.empty))
      .via(stationLines)
      .map(line => new ProducerRecord[Array[Byte], String](kafkaTopic, line))
      .runWith(kafkaSink)
  }

  val keeper = KeepThisUp(config)
  keeper(generator)
}
开发者ID:darienmt,项目名称:airplane-adventures,代码行数:56,代码来源:Main.scala


示例15: produceRecord

//设置package包名称以及导入依赖的类
package de.choffmeister.microserviceutils.kafka.testkit

import java.util.UUID

import akka.Done
import akka.actor.ActorSystem
import akka.kafka.scaladsl.{Consumer, Producer}
import akka.kafka.{ConsumerSettings, ProducerSettings, Subscriptions}
import akka.stream.Materializer
import akka.stream.scaladsl.{Keep, Sink, Source}
import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecord}
import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.kafka.common.serialization.{Deserializer, Serializer}

import scala.concurrent.Future

/** Test helpers for producing a single Kafka record and consuming the first matching one. */
trait KafkaTest {
  /**
   * Publishes one record to `topic`.
   *
   * The broker address is read from the actor system's config key "kafka.bootstrap-servers".
   *
   * @param topic           topic to publish to
   * @param keySerializer   serializer for the record key
   * @param valueSerializer serializer for the record value
   * @param key             record key
   * @param value           record value
   * @return the materialized value of the Kafka sink
   */
  def produceRecord[K, V](topic: String, keySerializer: Serializer[K], valueSerializer: Serializer[V], key: K, value: V)(implicit system: ActorSystem, mat: Materializer): Future[Done] = {
    val producerSettings = ProducerSettings(system, keySerializer, valueSerializer)
      .withBootstrapServers(system.settings.config.getString("kafka.bootstrap-servers"))

    // Honor the `topic` argument; it was previously ignored in favor of a hard-coded
    // "mail.command.send".
    Source.single(new ProducerRecord[K, V](topic, key, value))
      .toMat(Producer.plainSink(producerSettings))(Keep.right)
      .run()
  }

  /**
   * Consumes `topic` from the earliest offset under a fresh random group id and
   * returns the result of `pf` for the first record on which it is defined.
   *
   * @param topic             topic to read
   * @param keyDeserializer   deserializer for record keys
   * @param valueDeserializer deserializer for record values
   * @param pf                selects and transforms the record of interest
   */
  def consumeRecordPF[K, V, Out](topic: String, keyDeserializer: Deserializer[K], valueDeserializer: Deserializer[V])(pf: PartialFunction[ConsumerRecord[K, V], Out])(implicit system: ActorSystem, mat: Materializer): Future[Out] = {
    val consumerSettings = ConsumerSettings(system, keyDeserializer, valueDeserializer)
      .withBootstrapServers(system.settings.config.getString("kafka.bootstrap-servers"))
      .withGroupId(UUID.randomUUID.toString)
      .withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")

    Consumer.plainSource(consumerSettings, Subscriptions.topics(topic))
      .filter(pf.isDefinedAt)
      .map(pf)
      .toMat(Sink.head)(Keep.right)
      .run()
  }
}
开发者ID:choffmeister,项目名称:microservice-utils,代码行数:40,代码来源:KafkaTest.scala


示例16: SimpleProducer

//设置package包名称以及导入依赖的类
package de.choffmeister.microserviceutils.kafka

import akka.NotUsed
import akka.kafka.ProducerMessage.Result
import akka.kafka.{ProducerMessage, ProducerSettings}
import org.apache.kafka.clients.producer.{Callback, ProducerRecord, RecordMetadata}

import scala.concurrent.{Future, Promise}

/**
 * Thin wrapper around a Kafka producer that exposes `send` as a Scala `Future`.
 *
 * @param producerSettings settings used to create the underlying producer
 */
class SimpleProducer[K, V](producerSettings: ProducerSettings[K, V]) {
  private val producer = producerSettings.createKafkaProducer()

  /**
   * Sends `record` and returns a future that is completed from the producer's callback:
   * failed with the reported exception, or successful with the record metadata.
   */
  def send(record: ProducerRecord[K, V]): Future[Result[K, V, NotUsed]] = {
    val outcome = Promise[Result[K, V, NotUsed]]()

    producer.send(record, new Callback {
      override def onCompletion(metadata: RecordMetadata, exception: Exception): Unit =
        // The callback passes a null exception when the send succeeded.
        Option(exception).fold[Unit] {
          outcome.success(Result(metadata, ProducerMessage.Message(record, NotUsed)))
        } { err =>
          outcome.failure(err)
        }
    })

    outcome.future
  }
}
开发者ID:choffmeister,项目名称:microservice-utils,代码行数:28,代码来源:SimpleProducer.scala


示例17: Config

//设置package包名称以及导入依赖的类
package com.github.kliewkliew.cornucopia.kafka

import akka.actor.ActorSystem
import akka.kafka.scaladsl.{Producer, Consumer => ConsumerDSL}
import akka.kafka.{ConsumerSettings, ProducerSettings, Subscriptions}
import akka.stream.{ActorMaterializer, ActorMaterializerSettings, Supervision}
import com.typesafe.config.ConfigFactory
import org.apache.kafka.common.serialization.{StringDeserializer, StringSerializer}
import org.slf4j.LoggerFactory

import scala.concurrent.duration._

/** Configuration values and Kafka stream endpoints loaded from the Typesafe config. */
object Config {
  /** Timing settings read from the "cornucopia" config section. */
  object Cornucopia {
    private val config = ConfigFactory.load().getConfig("cornucopia")

    // Reads an integer setting and multiplies it by 1000.
    private def timesThousand(path: String): Int = config.getInt(path) * 1000

    val minReshardWait = config.getInt("reshard.interval").seconds
    val gracePeriod = timesThousand("grace.period")
    val refreshTimeout = timesThousand("refresh.timeout")
    val batchPeriod = config.getInt("batch.period").seconds
  }

  /** Actor system, materializer, and the Kafka source/sink built from the "kafka" config section. */
  object Consumer {
    private val kafkaConfig = ConfigFactory.load().getConfig("kafka")
    private val kafkaServers = kafkaConfig.getString("bootstrap.servers")
    private val kafkaConsumerConfig = kafkaConfig.getConfig("consumer")
    private val topic = kafkaConsumerConfig.getString("topic")
    private val groupId = kafkaConsumerConfig.getString("group.id")

    implicit val actorSystem = ActorSystem()

    private val logger = LoggerFactory.getLogger(this.getClass)

    // Log failures and resume processing
    private val decider: Supervision.Decider = { failure =>
      logger.error("Failed to process event", failure)
      Supervision.Resume
    }
    private val materializerSettings =
      ActorMaterializerSettings(actorSystem).withSupervisionStrategy(decider)

    private val sourceSettings =
      ConsumerSettings(actorSystem, new StringDeserializer, new StringDeserializer)
        .withBootstrapServers(kafkaServers)
        .withGroupId(groupId)
    private val subscription = Subscriptions.topics(topic)

    private val sinkSettings =
      ProducerSettings(actorSystem, new StringSerializer, new StringSerializer)
        .withBootstrapServers(kafkaServers)

    implicit val materializer = ActorMaterializer(materializerSettings)(actorSystem)
    val cornucopiaSource = ConsumerDSL.plainSource(sourceSettings, subscription)
    val cornucopiaSink = Producer.plainSink(sinkSettings)
  }

}
开发者ID:kliewkliew,项目名称:cornucopia,代码行数:51,代码来源:Config.scala


示例18: ServiceKafkaProducer

//设置package包名称以及导入依赖的类
package services

import akka.actor.ActorSystem
import akka.kafka.ProducerSettings
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
import org.apache.kafka.common.serialization.StringSerializer
import play.api.Configuration


/**
 * Sends string records to a single Kafka topic.
 *
 * @param topicName     topic every record is sent to
 * @param actorSystem   actor system used to build the producer settings
 * @param configuration must define "kafka.bootstrap.servers"; construction throws otherwise
 */
class ServiceKafkaProducer(topicName: String, actorSystem: ActorSystem,
                           configuration: Configuration) {
  val bootstrapServers: String =
    configuration.getString("kafka.bootstrap.servers") match {
      case Some(servers) => servers
      case None          => throw new Exception("No config for kafka bootstrap servers!")
    }

  val producerSettings: ProducerSettings[String, String] = {
    val base = ProducerSettings(actorSystem, new StringSerializer, new StringSerializer)
    base.withBootstrapServers(bootstrapServers)
  }

  val producer: KafkaProducer[String, String] = producerSettings.createKafkaProducer

  /** Sends `logRecordStr` as a key-less record; the result of the send is not inspected. */
  def send(logRecordStr: String): Unit = {
    val record = new ProducerRecord[String, String](topicName, logRecordStr)
    producer.send(record)
  }

}
开发者ID:getArtemUsername,项目名称:play-and-events,代码行数:26,代码来源:ServiceKafkaProducer.scala


示例19: KafkaEventProcessor

//设置package包名称以及导入依赖的类
package events

import akka.actor.{Actor, ActorLogging, Props}
import akka.kafka.ProducerSettings
import akka.kafka.scaladsl.Producer
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Flow, Source}
import com.google.inject.Inject
import events.KafkaEventProcessor.Init
import events.Serializer.EventSerializer
import models.KafkaEvents.{Event, Hello}
import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.kafka.common.serialization.ByteArraySerializer
import play.api.{Configuration, Logger}

import scala.concurrent.ExecutionContextExecutor


/**
 * Actor that, on start-up, wires an actor-publisher source of events into a Kafka sink.
 *
 * Broker and topic come from the config keys "kafka.broker" and "kafka.topic",
 * defaulting to "localhost:9092" and "test".
 */
class KafkaEventProcessor @Inject()(config: Configuration) extends Actor with ActorLogging {

  private val eventStream = context.system.eventStream
  implicit val materializer: ActorMaterializer = ActorMaterializer()(context.system)
  implicit val ctx: ExecutionContextExecutor = context.dispatcher

  private val server = config.getString("kafka.broker").getOrElse("localhost:9092")
  private val topic = config.getString("kafka.topic").getOrElse("test")

  /** Sends `Init` to itself so the producer stream is created by the first handled message. */
  override def preStart(): Unit = {
    super.preStart()
    self ! Init
    log.info("Start EventsProcessorActor")
  }

  override def postStop(): Unit = {
    eventStream.unsubscribe(self)
    super.postStop()
  }

  override def receive = {
    case Init => createProducer()
  }

  /** Builds and runs the stream from the event publisher actor to Kafka. */
  private def createProducer() = {
    val settings =
      ProducerSettings(context.system, new ByteArraySerializer(), new EventSerializer())
        .withBootstrapServers(server)

    val publisher = Source.actorPublisher[Event](Props(classOf[KafkaEventPublisher]))

    // Only `Hello` events are matched here.
    val toRecord = Flow[Event].map {
      case hello: Hello => new ProducerRecord[Array[Byte], Event](topic, hello)
    }

    toRecord
      .to(Producer.plainSink(settings))
      .runWith(publisher)

    Logger.info("init producer")
  }
}

/** Companion holding the messages understood by the `KafkaEventProcessor` actor. */
object KafkaEventProcessor {
  /** Sent by the actor to itself in `preStart`; handling it creates the producer stream. */
  case object Init
}
开发者ID:fsanaulla,项目名称:Akka-Kafka-Producer,代码行数:61,代码来源:KafkaEventProcessor.scala


示例20: KafkaPublishExample

//设置package包名称以及导入依赖的类
package com.benencahill.akka.streams

import java.util.UUID

import akka.actor.ActorSystem
import akka.kafka.ProducerSettings
import akka.kafka.scaladsl.Producer
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Flow, Keep, Source}
import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.kafka.common.serialization.StringSerializer

import scala.util.{Random, Success}

/**
 * Generates 10000 random user events, serializes them to JSON, publishes them to the
 * "testing-akka-streams" topic, and then shuts the actor system down.
 */
object KafkaPublishExample extends App {

  implicit val system = ActorSystem()
  implicit val materializer = ActorMaterializer()
  implicit val context = system.dispatcher

  // Endless source of random events: the user id is present or absent, and the event type
  // is PageView or ButtonClick, each chosen by shuffling the two options.
  val events = Source.fromIterator(() => Iterator.continually(
    UserEvent(
      id = UUID.randomUUID().toString,
      userId = Random.shuffle(Seq(None, Some(UUID.randomUUID().toString))).head,
      page = "/home",
      ip = "127.0.0.1",
      eventType = Random.shuffle(Seq(PageView, ButtonClick)).head)
    )
  )

  /** Renders a user event as a JSON string. */
  val serialize = Flow[UserEvent].map { userEvent =>
    import UserEventJsonProtocol._
    import spray.json._
    userEvent.toJson.toString()
  }

  /** Wraps a JSON string in a key-less record for the target topic. */
  val toKafkaRecord = Flow[String].map { message =>
    new ProducerRecord[String, String]("testing-akka-streams", message)
  }

  val producerSettings = ProducerSettings(system, new StringSerializer, new StringSerializer)
    .withBootstrapServers("localhost:9092")

  val publishToKafka = Producer.plainSink(producerSettings)

  val blueprint = events
    .take(10000)
    .via(serialize)
    .via(toKafkaRecord)
    .toMat(publishToKafka)(Keep.right)


  val materialized = blueprint.run()

  // Terminate on success AND on failure. Previously only a successful stream terminated
  // the actor system, so a failed publish left the application running.
  materialized.onComplete { outcome =>
    outcome.failed.foreach(cause => system.log.error(cause, "Publishing to Kafka failed"))
    system.terminate()
  }
}
开发者ID:benen,项目名称:akka-streams-intro,代码行数:57,代码来源:KafkaPublishExample.scala



注:本文中的akka.kafka.ProducerSettings类示例整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。


鲜花

握手

雷人

路过

鸡蛋
该文章已有0人参与评论

请发表评论

全部评论

专题导读
上一篇:
Scala DoubleMatrix类代码示例发布时间:2022-05-23
下一篇:
Scala DefaultDB类代码示例发布时间:2022-05-23
热门推荐
热门话题
阅读排行榜

扫描微信二维码

查看手机版网站

随时了解更新最新资讯

139-2527-9053

在线客服(服务时间 9:00~18:00)

在线QQ客服
地址:深圳市南山区西丽大学城创智工业园
电邮:jeky_zhao#qq.com
移动电话:139-2527-9053

Powered by 互联科技 X3.4© 2001-2213 极客世界.|Sitemap