
Scala ByteArraySerializer Class Code Examples


This article collects typical usage examples of the Scala class org.apache.kafka.common.serialization.ByteArraySerializer. If you are wondering what ByteArraySerializer is for, how to use it, or what working code looks like, the curated examples below should help.



The following presents 20 code examples of the ByteArraySerializer class, ordered roughly by popularity.
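Most of the examples below pair a ByteArraySerializer for Array[Byte] keys with a StringSerializer for values inside Akka Streams Kafka's ProducerSettings. For orientation, here is a minimal hedged sketch of the same pairing with the plain Kafka client and no Akka involved; the broker address and topic name are placeholders:

import java.util.Properties

import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}
import org.apache.kafka.common.serialization.{ByteArraySerializer, StringSerializer}

object PlainKafkaProducerSketch extends App {
  val props = new Properties()
  props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092") // placeholder broker address

  // ByteArraySerializer passes the Array[Byte] key through untouched; StringSerializer encodes the value.
  val producer = new KafkaProducer[Array[Byte], String](props, new ByteArraySerializer, new StringSerializer)

  producer.send(new ProducerRecord[Array[Byte], String]("topic1", "key".getBytes("UTF-8"), "hello"))
  producer.close()
}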

Example 1: ProcessingKafkaApplication

// Package declaration and imports
package com.packt.chapter8

import akka.actor.ActorSystem
import akka.kafka.scaladsl.{Consumer, Producer}
import akka.kafka.{ConsumerSettings, ProducerSettings, Subscriptions}
import akka.stream.{ActorMaterializer, ClosedShape}
import akka.stream.scaladsl.{Flow, GraphDSL, RunnableGraph, Sink, Source}
import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecord}
import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.serialization.{ByteArrayDeserializer, ByteArraySerializer, StringDeserializer, StringSerializer}

import scala.concurrent.duration._

object ProcessingKafkaApplication extends App {
  implicit val actorSystem = ActorSystem("SimpleStream")
  implicit val actorMaterializer = ActorMaterializer()

  val bootstrapServers = "localhost:9092"
  val kafkaTopic = "akka_streams_topic"
  val partition = 0
  val subscription = Subscriptions.assignment(new TopicPartition(kafkaTopic, partition))

  val consumerSettings = ConsumerSettings(actorSystem, new ByteArrayDeserializer, new StringDeserializer)
    .withBootstrapServers(bootstrapServers)
    .withGroupId("akka_streams_group")
    .withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")

  val producerSettings = ProducerSettings(actorSystem, new ByteArraySerializer, new StringSerializer)
    .withBootstrapServers(bootstrapServers)

  val runnableGraph = RunnableGraph.fromGraph(GraphDSL.create() { implicit builder =>
    import GraphDSL.Implicits._

    val tickSource = Source.tick(0 seconds, 5 seconds, "Hello from Akka Streams using Kafka!")
    val kafkaSource = Consumer.plainSource(consumerSettings, subscription)
    val kafkaSink = Producer.plainSink(producerSettings)
    val printlnSink = Sink.foreach(println)

    val mapToProducerRecord = Flow[String].map(elem => new ProducerRecord[Array[Byte], String](kafkaTopic, elem))
    val mapFromConsumerRecord = Flow[ConsumerRecord[Array[Byte], String]].map(record => record.value())

    tickSource  ~> mapToProducerRecord   ~> kafkaSink
    kafkaSource ~> mapFromConsumerRecord ~> printlnSink

    ClosedShape
  })

  runnableGraph.run()
} 
Author: PacktPublishing | Project: Akka-Cookbook | Lines: 51 | Source: ProcessingKafkaApplication.scala


Example 2: Settings

// Package declaration and imports
package com.scalaio.kafka.consumer

import akka.actor.ActorSystem
import akka.kafka.ConsumerMessage.CommittableMessage
import akka.kafka.scaladsl.Consumer
import akka.kafka.{ConsumerSettings, ProducerSettings, Subscriptions}
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.Sink
import com.scalaio.kafka.consumer.Settings.consumerSettings
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.common.serialization.{ByteArrayDeserializer, ByteArraySerializer, StringDeserializer, StringSerializer}

import scala.concurrent.Future

object Settings {
  def consumerSettings(implicit system: ActorSystem) =
    ConsumerSettings(system, new ByteArrayDeserializer, new StringDeserializer)
      .withBootstrapServers("localhost:9092")
      .withGroupId("CommittableSourceConsumer")
      .withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")

  def producerSettings(implicit system: ActorSystem) =
    ProducerSettings(system, new ByteArraySerializer, new StringSerializer)
      .withBootstrapServers("localhost:9092")
}

object CommittableSource extends App {

  type KafkaMessage = CommittableMessage[Array[Byte], String]

  implicit val system = ActorSystem("CommittableSourceConsumerMain")
  implicit val materializer = ActorMaterializer()

  implicit val ec = system.dispatcher

  // explicit commit
  Consumer
    .committableSource(consumerSettings, Subscriptions.topics("topic1"))
    .mapAsync(1) { msg =>
      BusinessController.handleMessage(msg.record.value)
        .flatMap(response => msg.committableOffset.commitScaladsl())
        .recoverWith { case e => msg.committableOffset.commitScaladsl() }
    }
    .runWith(Sink.ignore)

}

object BusinessController {

  type Service[A, B] = A => Future[B]

  val handleMessage: Service[String, String] =
    (message) => Future.successful(message.toUpperCase)

} 
Author: fagossa | Project: scalaio_akka | Lines: 56 | Source: CommittableSource.scala
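Settings.producerSettings above is defined but never exercised by the consumer example. A minimal hedged sketch (not part of the original project) of using it to seed topic1 with test messages:

package com.scalaio.kafka.consumer

import akka.actor.ActorSystem
import akka.kafka.scaladsl.Producer
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.Source
import org.apache.kafka.clients.producer.ProducerRecord

object ProduceTestMessages extends App {
  implicit val system = ActorSystem("ProduceTestMessages")
  implicit val materializer = ActorMaterializer()

  // Keys are Array[Byte] and stay null; values are the strings the committable consumer reads.
  Source(1 to 5)
    .map(n => new ProducerRecord[Array[Byte], String]("topic1", s"message-$n"))
    .runWith(Producer.plainSink(Settings.producerSettings))
}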


Example 3: running

// Package declaration and imports
package producers

import akka.kafka.ProducerSettings
import akka.kafka.scaladsl.Producer
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Keep, Source}
import akka.{Done, NotUsed}
import broker.ActorBroker
import config.AppConfig
import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.kafka.common.serialization.{ByteArraySerializer, StringSerializer}

import scala.concurrent.Future

trait Producerable extends ActorBroker {
  val config: AppConfig
  implicit val materializer = ActorMaterializer()
  import context.dispatcher // execution context used by Future.apply and future.onFailure below

  val producerSettings = ProducerSettings(context.system, new ByteArraySerializer, new StringSerializer)
    .withBootstrapServers(s"${config.kafkaConfig.uri}:${config.kafkaConfig.port}")

  def running(): Receive = {
    case Stop =>
      log.info("Stopping Kafka producer stream and actor")
      context.stop(self)
  }

  def sendToSink(message: String): Unit = {
    log.info(s"Attempting to produce message on topic $topicName")
    val kafkaSink = Producer.plainSink(producerSettings)

    val stringToProducerRecord: ProducerRecord[Array[Byte], String] = new ProducerRecord[Array[Byte], String](topicName, message)
    val (a, future): (NotUsed, Future[Done]) = Source.fromFuture(Future(stringToProducerRecord))
      .toMat(kafkaSink)(Keep.both)
      .run()
    future.onFailure {
      case ex =>
        log.error("Stream failed due to error, restarting", ex)
        throw ex
    }
    context.become(running())
    log.info(s"Writer now running, writing random numbers to topic $topicName")
  }


  case object Stop
} 
Author: jguido | Project: reactive-kafka-registration | Lines: 48 | Source: Producerable.scala


Example 4: self

// Package declaration and imports
package com.omearac.producers

import akka.actor.{ActorRef, ActorSystem}
import akka.kafka.ProducerSettings
import akka.kafka.scaladsl.Producer
import akka.stream.OverflowStrategy
import akka.stream.scaladsl.{Flow, Source}
import com.omearac.shared.JsonMessageConversion.Conversion
import com.omearac.shared.{AkkaStreams, EventSourcing}
import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.kafka.common.serialization.{ByteArraySerializer, StringSerializer}



trait ProducerStream extends AkkaStreams with EventSourcing {
    implicit val system: ActorSystem
    def self: ActorRef

    def createStreamSource[msgType] = {
        Source.queue[msgType](Int.MaxValue,OverflowStrategy.backpressure)
    }

    def createStreamSink(producerProperties: Map[String, String]) = {
        val kafkaMBAddress = producerProperties("bootstrap-servers")
        val producerSettings = ProducerSettings(system, new ByteArraySerializer, new StringSerializer).withBootstrapServers(kafkaMBAddress)

        Producer.plainSink(producerSettings)
    }

    def createStreamFlow[msgType: Conversion](producerProperties: Map[String, String]) = {
        val numberOfPartitions = producerProperties("num.partitions").toInt - 1
        val topicToPublish = producerProperties("publish-topic")
        val rand = new scala.util.Random
        val range = 0 to numberOfPartitions

        Flow[msgType].map { msg =>
            val partition = range(rand.nextInt(range.length))
            val stringJSONMessage = Conversion[msgType].convertToJson(msg)
            new ProducerRecord[Array[Byte], String](topicToPublish, partition, null, stringJSONMessage)
        }
    }
} 
Author: omearac | Project: reactive-kafka-microservice-template | Lines: 43 | Source: ProducerStream.scala


Example 5: ReactiveKafkaSingleConsumerMultipleProducerScala

// Package declaration and imports
package org.rgcase.reactivekafka

import akka.actor.ActorSystem
import akka.kafka.ConsumerMessage.{ CommittableMessage, CommittableOffsetBatch }
import akka.kafka.ProducerMessage.Message
import akka.kafka.scaladsl.{ Consumer, Producer }
import akka.kafka.{ ConsumerSettings, ProducerSettings, Subscriptions }
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{ Flow, Sink }
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.kafka.common.serialization.{ ByteArrayDeserializer, ByteArraySerializer, StringDeserializer, StringSerializer }

object ReactiveKafkaSingleConsumerMultipleProducerScala extends App {

  implicit val system = ActorSystem("reactivekafkascala")
  implicit val mat = ActorMaterializer()

  val consumerSettings = ConsumerSettings(system, new ByteArrayDeserializer, new StringDeserializer)
    .withBootstrapServers("localhost:9092")
    .withGroupId("group1")
    .withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")

  val producerSettings = ProducerSettings(system, new ByteArraySerializer, new StringSerializer)
    .withBootstrapServers("localhost:9093")

  val kafkaSource =
    Consumer.committableSource(consumerSettings, Subscriptions.topics("sourcetopic"))

  def toProducerMessage(topic: String) = (msg: CommittableMessage[Array[Byte], String]) =>
    Message[Array[Byte], String, CommittableMessage[Array[Byte], String]](new ProducerRecord(topic, msg.record.value), msg)

  val producerFlow1 =
    Flow.fromFunction(toProducerMessage("targettopic1")).via(Producer.flow(producerSettings)).map(_.message.passThrough)

  val producerFlow2 =
    Flow.fromFunction(toProducerMessage("targettopic2")).via(Producer.flow(producerSettings)).map(_.message.passThrough)

  val producerFlow3 =
    Flow.fromFunction(toProducerMessage("targettopic3")).via(Producer.flow(producerSettings)).map(_.message.passThrough)

  kafkaSource
    .via(producerFlow1)
    .via(producerFlow2)
    .via(producerFlow3)
    .batch(max = 20, first => CommittableOffsetBatch.empty.updated(first.committableOffset)) { (batch, elem) =>
      batch.updated(elem.committableOffset)
    }.mapAsync(3)(_.commitScaladsl())
    .runWith(Sink.ignore)

} 
Author: rgcase | Project: testplayground | Lines: 52 | Source: ReactiveKafkaSingleConsumerMultipleProducerScala.scala


Example 6: ReactiveProducer

// Package declaration and imports
package co.s4n.reactiveKafka

import akka.actor.ActorSystem
import akka.kafka.ProducerMessage
import akka.kafka.ProducerSettings
import akka.kafka.scaladsl.Producer
import akka.stream.scaladsl.Source
import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.kafka.common.serialization.ByteArraySerializer
import org.apache.kafka.common.serialization.StringSerializer
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.Sink
import scala.concurrent.Future
import akka.Done
import scala.util.{ Failure, Success }


object ReactiveProducer {

  val system = ActorSystem("example")
  implicit val ec = system.dispatcher
  implicit val materializer = ActorMaterializer.create(system)

  val producerSettings = ProducerSettings(system, new ByteArraySerializer, new StringSerializer)
    .withBootstrapServers("localhost:9092")
  val kafkaProducer = producerSettings.createKafkaProducer()

  def produce(msg: String): Unit = {
    val done = Source(1 to 1)
      .map(_.toString)
      .map { elem =>
        println("\n" + msg)
        new ProducerRecord[Array[Byte], String]("UsersTopic", msg)
      }
      .runWith(Producer.plainSink(producerSettings, kafkaProducer))
    // #plainSinkWithProducer

    //    terminateWhenDone(done)
  }

  def terminateWhenDone(result: Future[Done]): Unit = {
    result.onComplete {
      case Failure(e) =>
        system.log.error(e, e.getMessage)
        system.terminate()
      case Success(_) => system.terminate()
    }
  }

} 
Author: bazzo03 | Project: users-api | Lines: 51 | Source: ReactiveProducer.scala


Example 7: FlowProducerMain

// Package declaration and imports
package com.example.producer

import akka.actor.ActorSystem
import akka.kafka.scaladsl.Producer
import akka.kafka.{ProducerMessage, ProducerSettings}
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Sink, Source}
import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.kafka.common.serialization.{ByteArraySerializer, StringSerializer}


object FlowProducerMain extends App {
  implicit val system = ActorSystem("FlowProducerMain")
  implicit val materializer = ActorMaterializer()

  val producerSettings = ProducerSettings(system, new ByteArraySerializer, new StringSerializer)
    .withBootstrapServers("localhost:9092")

  val done = Source(1 to 10)
    .map { n =>
      // val partition = math.abs(n) % 2
      val partition = 0
      ProducerMessage.Message(new ProducerRecord[Array[Byte], String](
        "topic1", partition, null, n.toString
      ), n)
    }
    .via(Producer.flow(producerSettings))
    .map { result =>
      val record = result.message.record
      println(s"${record.topic}/${record.partition} ${result.offset}: ${record.value}" +
        s"(${result.message.passThrough})")
      result
    }
    .runWith(Sink.ignore)
} 
Author: makersu | Project: reactive-kafka-scala-example | Lines: 36 | Source: FlowProducerMain.scala


Example 8: CommitConsumerToFlowProducerMain

// Package declaration and imports
package com.example.producer

import akka.actor.ActorSystem
import akka.kafka.scaladsl.{Consumer, Producer}
import akka.kafka.{ConsumerSettings, ProducerMessage, ProducerSettings, Subscriptions}
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.Sink
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.kafka.common.serialization.{ByteArrayDeserializer, ByteArraySerializer, StringDeserializer, StringSerializer}


object CommitConsumerToFlowProducerMain extends App {
  implicit val system = ActorSystem("CommitConsumerToFlowProducerMain")
  implicit val materializer = ActorMaterializer()

  val consumerSettings =
    ConsumerSettings(system, new ByteArrayDeserializer, new StringDeserializer)
      .withBootstrapServers("localhost:9092")
      .withGroupId("CommitConsumerToFlowProducer")
      .withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")

  val producerSettings = ProducerSettings(system, new ByteArraySerializer, new StringSerializer)
    .withBootstrapServers("localhost:9092")

  val done =
    Consumer.committableSource(consumerSettings, Subscriptions.topics("topic1"))
      .map { msg =>
        println(s"topic1 -> topic2: $msg")
        ProducerMessage.Message(new ProducerRecord[Array[Byte], String](
          "topic2",
          msg.record.value
        ), msg.committableOffset)
      }
      .via(Producer.flow(producerSettings))
      .mapAsync(producerSettings.parallelism) { result =>
        result.message.passThrough.commitScaladsl()
      }
      .runWith(Sink.ignore)
} 
Author: makersu | Project: reactive-kafka-scala-example | Lines: 41 | Source: CommitConsumerToFlowProducerMain.scala


Example 9: PlainSinkProducerMain

// Package declaration and imports
package com.example.producer

import akka.actor.ActorSystem
import akka.kafka.ProducerSettings
import akka.kafka.scaladsl.Producer
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.Source
import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.kafka.common.serialization.{ByteArraySerializer, StringSerializer}


object PlainSinkProducerMain extends App {

  implicit val system = ActorSystem("PlainSinkProducerMain")
  implicit val materializer = ActorMaterializer()

  val producerSettings = ProducerSettings(system, new ByteArraySerializer, new StringSerializer)
    .withBootstrapServers("localhost:9092")

  val done = Source(1 to 10)
    .map(_.toString)
    .map { elem =>
      println(s"PlainSinkProducer produce: ${elem}")
      new ProducerRecord[Array[Byte], String]("topic1", elem)
    }
    .runWith(Producer.plainSink(producerSettings))

} 
Author: makersu | Project: reactive-kafka-scala-example | Lines: 29 | Source: PlainSinkProducerMain.scala


Example 10: ConsumerToCommitableSinkProducerMain

// Package declaration and imports
package com.example.producer

import akka.actor.ActorSystem
import akka.kafka.ConsumerMessage.CommittableOffsetBatch
import akka.kafka.scaladsl.{Consumer, Producer}
import akka.kafka.{ConsumerSettings, ProducerMessage, ProducerSettings, Subscriptions}
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.Sink
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.kafka.common.serialization.{ByteArrayDeserializer, ByteArraySerializer, StringDeserializer, StringSerializer}


object ConsumerToCommitableSinkProducerMain extends App {

  implicit val system = ActorSystem("Consumer2ProducerMain")
  implicit val materializer = ActorMaterializer()

  //TODO: move to configuration application.conf
  val consumerSettings =
    ConsumerSettings(system, new ByteArrayDeserializer, new StringDeserializer)
      .withBootstrapServers("localhost:9092")
      .withGroupId("Consumer2Producer")
      .withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")

  //TODO: move to configuration application.conf
  val producerSettings =
    ProducerSettings(system, new ByteArraySerializer, new StringSerializer)
      .withBootstrapServers("localhost:9092")

  Consumer.committableSource(consumerSettings, Subscriptions.topics("topic1"))
    .map { msg =>
      println(s"topic1 -> topic2: $msg")
      ProducerMessage.Message(new ProducerRecord[Array[Byte], String](
        "topic2",
        msg.record.value
      ), msg.committableOffset)
    }
    .runWith(Producer.commitableSink(producerSettings))

} 
Author: makersu | Project: reactive-kafka-scala-example | Lines: 42 | Source: ConsumerToCommitableSinkProducerMain.scala


Example 11: BaseStation2Kafka

// Package declaration and imports
package com.darienmt.airplaneadventures.basestation.collector.streams

import akka.Done
import akka.actor.ActorSystem
import akka.kafka.ProducerSettings
import akka.kafka.scaladsl.Producer
import akka.stream.ActorMaterializer
import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.kafka.common.serialization.{ ByteArraySerializer, StringSerializer }

import scala.concurrent.Future
import io.circe._
import io.circe.generic.auto._
import io.circe.parser._
import io.circe.syntax._
import io.circe.java8.time.{ encodeLocalDateDefault, encodeZonedDateTimeDefault }
import com.darienmt.airplaneadventures.basestation.collector.parsing.CirceEncoders._

object BaseStation2Kafka {

  case class SourceConfig(address: String, port: Int)
  case class SinkConfig(address: String, port: Int, topic: String)

  def apply(sourceConfig: SourceConfig, sinkConfig: SinkConfig)(implicit system: ActorSystem, materializer: ActorMaterializer): Future[Done] =
    BaseStationSource(sourceConfig.address, sourceConfig.port)
      .map(_.asJson.noSpaces)
      .map(m => new ProducerRecord[Array[Byte], String](sinkConfig.topic, m))
      .runWith(
        Producer.plainSink(
          ProducerSettings(system, new ByteArraySerializer, new StringSerializer)
            .withBootstrapServers(s"${sinkConfig.address}:${sinkConfig.port}")
        )
      )
} 
Author: darienmt | Project: airplane-adventures | Lines: 35 | Source: BaseStation2Kafka.scala


Example 12

// Imports and setup (this snippet has no package declaration)
import akka.actor.ActorSystem
import akka.kafka._
import akka.kafka.scaladsl._
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.Source
import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.kafka.common.serialization.StringSerializer
import org.apache.kafka.common.serialization.ByteArraySerializer

// The original snippet assumes an ActorSystem named `system`; one is added here so it compiles.
implicit val system = ActorSystem("producer-settings-example")
implicit val materializer = ActorMaterializer()

val producerSettings = ProducerSettings(system, new ByteArraySerializer, new StringSerializer).withBootstrapServers("localhost:9092")

Source(1 to 10000)
  .map(_.toString)
  .map(elem => new ProducerRecord[Array[Byte], String]("topic1", elem))
  .to(Producer.plainSink(producerSettings))

Source(1 to 10000).map(elem => ProducerMessage.Message(new ProducerRecord[Array[Byte], String]("topic1", elem.toString), elem))
  .via(Producer.flow(producerSettings))
  .map { result =>
    val record = result.message.record
    println(s"${record.topic}/${record.partition} ${result.offset}: ${record.value} (${result.message.passThrough})")
    result
  }
Author: PacktPublishing | Project: Fast-Data-Processing-Systems-with-SMACK-Stack | Lines: 20 | Source: ProducerSettings.scala
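Both pipelines in this snippet are only blueprints: `to` and `via` describe the graph but start nothing. A minimal hedged sketch of materializing the first one, reusing `producerSettings` and the implicit materializer from the snippet above:

// Materializing the blueprint is what actually starts publishing.
val graph = Source(1 to 10000)
  .map(_.toString)
  .map(elem => new ProducerRecord[Array[Byte], String]("topic1", elem))
  .to(Producer.plainSink(producerSettings))

graph.run()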


Example 13: KafkaEventProcessor

// Package declaration and imports
package events

import akka.actor.{Actor, ActorLogging, Props}
import akka.kafka.ProducerSettings
import akka.kafka.scaladsl.Producer
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Flow, Source}
import com.google.inject.Inject
import events.KafkaEventProcessor.Init
import events.Serializer.EventSerializer
import models.KafkaEvents.{Event, Hello}
import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.kafka.common.serialization.ByteArraySerializer
import play.api.{Configuration, Logger}

import scala.concurrent.ExecutionContextExecutor


class KafkaEventProcessor @Inject()(config: Configuration) extends Actor with ActorLogging {

  private val eventStream = context.system.eventStream
  implicit val materializer: ActorMaterializer = ActorMaterializer()(context.system)
  implicit val ctx: ExecutionContextExecutor = context.dispatcher

  private val server = config.getString("kafka.broker").getOrElse("localhost:9092")
  private val topic = config.getString("kafka.topic").getOrElse("test")

  override def preStart(): Unit = {
    super.preStart()
    self ! Init
    log.info("Start EventsProcessorActor")
  }

  override def postStop(): Unit = {
    eventStream.unsubscribe(self)
    super.postStop()
  }

  override def receive = {
    case Init => createProducer()
  }

  private def createProducer() = {
    val producerSettings = ProducerSettings(context.system, new ByteArraySerializer(), new EventSerializer())
      .withBootstrapServers(server)

    val jobManagerSource = Source.actorPublisher[Event](Props(classOf[KafkaEventPublisher]))

    Flow[Event].map {
      case e: Hello => new ProducerRecord[Array[Byte], Event](topic, e)
    }.to(Producer.plainSink(producerSettings))
      .runWith(jobManagerSource)

    Logger.info("init producer")
  }
}

object KafkaEventProcessor {
  case object Init
} 
Author: fsanaulla | Project: Akka-Kafka-Producer | Lines: 61 | Source: KafkaEventProcessor.scala


Example 14: Config

// Package declaration and imports
package com.kissthinker.kafka

import java.util.Properties
import org.apache.kafka.common.serialization.{ByteArrayDeserializer, ByteArraySerializer}

abstract class Config(kafkaAddress: String) extends Properties {
  put("bootstrap.servers", kafkaAddress)
}

class PublisherConfig(kafkaAddress: String) extends Config(kafkaAddress) {
  put("metadata.broker.list", kafkaAddress)
  //put("client.id", "kafka-publisher")
  put("key.serializer", classOf[ByteArraySerializer].getName)
  put("value.serializer", classOf[ByteArraySerializer].getName)
  put("producer.type", "async")
}

class SubscriberConfig(zookeeperAddress: String, kafkaAddress: String) extends Config(kafkaAddress) {
  //put("zookeeper.connect", "127.0.0.1:2181")
  put("zookeeper.connect", zookeeperAddress)
  //put("group.id", "1")
  put("group.id", "2")
  put("auto.offset.reset", "largest")
  put("zookeeper.session.timeout.ms", "400")
  put("zookeeper.sync.time.ms", "200")
  put("auto.commit.interval.ms", "1000")
  put("key.deserializer", classOf[ByteArrayDeserializer].getName)
  put("value.deserializer", classOf[ByteArrayDeserializer].getName)
} 
Author: davidainslie | Project: kafka-kissthinker | Lines: 30 | Source: Config.scala
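A hedged sketch of wiring PublisherConfig into the plain Kafka producer client; the topic name and call site are assumptions, not part of the original project. Entries such as metadata.broker.list and producer.type are legacy Scala-client properties that the modern Java client simply ignores.

package com.kissthinker.kafka

import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}

object PublisherConfigSketch extends App {
  // PublisherConfig extends java.util.Properties, so it can be handed straight to KafkaProducer.
  val producer = new KafkaProducer[Array[Byte], Array[Byte]](new PublisherConfig("localhost:9092"))

  producer.send(new ProducerRecord("some-topic", "key".getBytes("UTF-8"), "value".getBytes("UTF-8")))
  producer.close()
}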


Example 15: ThrottolableConsumerFlowSpec

// Package declaration and imports
package com.github.everpeace.reactive_kafka

import akka.Done
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Keep, Sink}
import akka.testkit.TestProbe
import com.typesafe.config.ConfigFactory
import net.manub.embeddedkafka.{EmbeddedKafka, EmbeddedKafkaConfig}
import org.apache.kafka.common.serialization.{ByteArraySerializer, StringSerializer}
import org.scalatest.{FlatSpec, Matchers}

import scala.concurrent.Await
import scala.concurrent.duration._

class ThrottolableConsumerFlowSpec extends FlatSpec with Matchers with EmbeddedKafka with RandomPortSupport {
  val kafkaPort = temporaryServerPort()
  val zkPort = temporaryServerPort()
  val TOPIC = "topic1"

  implicit val system = ActorSystem("ThrottolableConsumerFlowSpec", ConfigFactory.parseString(
    s"""
      |throttolable-consumer {
      |  bootstrap-servers = "localhost:$kafkaPort"
      |  topic = "$TOPIC"
      |  group-id = "throttolable-consumer-flow-spec"
      |  throttle = 0
      |  offset-commit-batch-size = 2
      |  offset-commit-parallelism = 10
      |}
    """.stripMargin))
  implicit val materializer = ActorMaterializer()
  implicit val kafkaConfig = EmbeddedKafkaConfig(kafkaPort, zkPort)
  implicit val byteArraySer = new ByteArraySerializer
  implicit val stringSer = new StringSerializer

  def createMsg(n: Int): Seq[(Array[Byte], String)] = {
    def gen = {
      val key = scala.util.Random.alphanumeric.take(10).mkString.getBytes()
      val msg = scala.util.Random.alphanumeric.take(10).mkString
      key -> msg
    }
    Iterator.tabulate(n)(_ => gen).toSeq
  }

  "FakeConsumerFlow" should "consume messages correctly" in withRunningKafka {
    createCustomTopic("topic1")
    val p = TestProbe("sinkProbe")

    val control = throttolableConsumerFlow.toMat(Sink.actorRef(p.ref, Done))(Keep.left).run()

    Thread.sleep((5 second).toMillis)

    val n = 100
    createMsg(n).foreach(kv => publishToKafka(TOPIC, kv._1, kv._2))

    (0 until n).foreach(_ => p.expectMsgType[Done])

    Await.result(control.shutdown(), 1 minute)
  }
} 
Author: everpeace | Project: throttolable-perf-consumer | Lines: 62 | Source: ThrottolableConsumerFlowSpec.scala


Example 16: FutureToTry

// Package declaration and imports
package com.github.dnvriend

import akka.NotUsed
import akka.actor._
import akka.event.{Logging, LoggingAdapter}
import akka.kafka.{ConsumerSettings, ProducerSettings}
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.kafka.common.serialization.ByteArrayDeserializer
import org.apache.kafka.clients.consumer.ConsumerConfig
import akka.stream.scaladsl.Source
import akka.stream.{ActorMaterializer, Materializer}
import org.apache.kafka.common.serialization.{ByteArraySerializer, StringSerializer}
import org.scalatest.concurrent.ScalaFutures
import org.scalatest.{BeforeAndAfterAll, FlatSpec, Matchers}

import scala.concurrent.duration._
import scala.concurrent.{ExecutionContext, Future}
import scala.util.Try

trait TestSpec extends FlatSpec with Matchers with ScalaFutures with BeforeAndAfterAll {
  implicit val system: ActorSystem = ActorSystem()
  implicit val ec: ExecutionContext = system.dispatcher
  implicit val mat: Materializer = ActorMaterializer()
  implicit val log: LoggingAdapter = Logging(system, this.getClass)
  implicit val pc: PatienceConfig = PatienceConfig(timeout = 50.seconds)

  val producerSettings = ProducerSettings(system, new ByteArraySerializer, new StringSerializer)
    .withBootstrapServers("boot2docker:9092")

  val consumerSettings = ConsumerSettings(system, new ByteArrayDeserializer, new StringDeserializer,
    Set("topic1"))
    .withBootstrapServers("boot2docker:9092")
    .withGroupId("group1")
    .withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")

  implicit class FutureToTry[T](f: Future[T]) {
    def toTry: Try[T] = Try(f.futureValue)
  }

  
  def withIterator[T](start: Int = 0)(f: Source[Int, NotUsed] => T): T =
    f(Source.fromIterator(() => Iterator from start))

  override protected def afterAll(): Unit = {
    system.terminate()
    system.whenTerminated.toTry should be a 'success
  }
} 
Author: dnvriend | Project: reactive-kafka-test | Lines: 49 | Source: TestSpec.scala


Example 17: system

// Package declaration and imports
package io.scalac.newspaper.crawler.publishing

import akka.Done
import akka.actor.ActorSystem
import akka.kafka.ProducerSettings
import akka.kafka.scaladsl.Producer
import akka.stream.scaladsl.{Flow, Keep, Sink}
import io.scalac.newspaper.crawler.fetching.FetchingFlow.URLFetched
import io.scalac.newspaper.crawler.publishing.KafkaPublisher._
import io.scalac.newspaper.events.ContentFetched
import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.kafka.common.serialization.ByteArraySerializer

import scala.concurrent.Future


trait KafkaPublisher extends Publisher {

  implicit def system: ActorSystem
  def topic: String
  private def producerSettings = ProducerSettings(system, new ByteArraySerializer, new ContentFetchedSerializer)

  override def publish: Sink[URLFetched, Future[Done]] =
    Flow[URLFetched]
      .map(new ProducerRecord[Array[Byte], ContentFetched](topic, _))
      .toMat(Producer.plainSink[Array[Byte], ContentFetched](producerSettings))(Keep.right)
}

object KafkaPublisher {
  implicit def urlFetched2ContentFetched(source: URLFetched): ContentFetched =
    ContentFetched(source.url, source.content)
} 
Author: ScalaConsultants | Project: newspaper | Lines: 33 | Source: KafkaPublisher.scala
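The ContentFetchedSerializer used above is not included in this excerpt. A hedged sketch of the shape such a custom Kafka Serializer takes; the field names follow Example 19's usage of ContentFetched, and the newline-joined encoding is a placeholder (the real project may well use protobuf or similar):

import java.util

import io.scalac.newspaper.events.ContentFetched
import org.apache.kafka.common.serialization.Serializer

class ContentFetchedSerializerSketch extends Serializer[ContentFetched] {
  override def configure(configs: util.Map[String, _], isKey: Boolean): Unit = ()

  // Placeholder encoding: join the event's fields with a newline and emit UTF-8 bytes.
  override def serialize(topic: String, data: ContentFetched): Array[Byte] =
    s"${data.pageUrl}\n${data.pageContent}".getBytes("UTF-8")

  override def close(): Unit = ()
}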


Example 18: publishNewSubscription

// Package declaration and imports
package services

import javax.inject.Singleton

import akka.actor.ActorSystem
import akka.kafka.ProducerSettings
import akka.kafka.scaladsl.Producer
import akka.stream.{ActorMaterializer, OverflowStrategy}
import akka.stream.scaladsl.{Source, SourceQueueWithComplete}
import com.google.inject.ImplementedBy
import io.scalac.newspaper.events.SubscribeUser
import model.Subscriber
import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.kafka.common.serialization.ByteArraySerializer
import serializers.SubscribeUserPBSerializer

import scala.concurrent.Future

@ImplementedBy(classOf[KafkaOutboundService])
trait OutboundService {
  def publishNewSubscription(s: Subscriber): Future[Unit]
}

@Singleton
class KafkaOutboundService() extends OutboundService {
  //TODO: move it outside and share among services
  implicit val system = ActorSystem("Newspaper-Analyzer-System")
  implicit val ec = system.dispatcher
  implicit val materializer = ActorMaterializer()

  val producerSettings = ProducerSettings(system, new ByteArraySerializer, new SubscribeUserPBSerializer)

  val source: SourceQueueWithComplete[SubscribeUser] =
    Source.queue[SubscribeUser](100, OverflowStrategy.fail).map { sub =>
      new ProducerRecord[Array[Byte], SubscribeUser]("newspaper-users", sub)
    }
    .to(Producer.plainSink(producerSettings))
    .run()

  override def publishNewSubscription(sub: Subscriber): Future[Unit] = {
    val domainObject = SubscribeUser(sub.email, sub.name.getOrElse(""))

    source.offer(domainObject).map{ _ =>
      ()
    }
  }
} 
Author: ScalaConsultants | Project: newspaper | Lines: 48 | Source: OutboundService.scala


Example 19: AnalyzerRunner

// Package declaration and imports
package io.scalac.newspaper.analyzer

import akka.actor.ActorSystem
import akka.kafka.ProducerMessage
import akka.kafka.scaladsl.{Consumer, Producer}
import akka.kafka.{Subscriptions, ConsumerSettings, ProducerSettings}
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Source, Sink}
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.kafka.common.serialization.{ByteArrayDeserializer, ByteArraySerializer}

import io.scalac.newspaper.events._

object AnalyzerRunner extends App {
  implicit val system = ActorSystem("Newspaper-Analyzer-System")
  implicit val materializer = ActorMaterializer()

  val consumerSettings = ConsumerSettings(system, new ByteArrayDeserializer, new ContentFetchedDeserializer)
    .withGroupId("Newspaper-Analyzer")
    .withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")

  val producerSettings = ProducerSettings(system, new ByteArraySerializer, new ChangeDetectedSerializer)

  val subscription = Subscriptions.topics("newspaper-content")
  Consumer.committableSource(consumerSettings, subscription)
    .map { msg =>
      // Do sth with msg.record.value
      println(s"[ANALYZING] ${msg.record.value}")
      val input = msg.record.value
      val output = ChangeDetected(input.pageUrl, input.pageContent)
      val record = new ProducerRecord[Array[Byte], ChangeDetected]("newspaper", output)
      ProducerMessage.Message(record, msg.committableOffset)
    }
    .via(Producer.flow(producerSettings))
    .map(_.message.passThrough)
    .mapAsync(1)(_.commitScaladsl())
    .runWith(Sink.ignore)

} 
Author: ScalaConsultants | Project: newspaper | Lines: 41 | Source: AnalyzerRunner.scala


Example 20: KafkaPublisher

// Package declaration and imports
package nl.tradecloud.kafka

import java.util.concurrent.atomic.AtomicInteger

import akka.Done
import akka.actor.{ActorRefFactory, ActorSystem, Props, SupervisorStrategy}
import akka.kafka.ProducerSettings
import akka.pattern.BackoffSupervisor
import akka.stream.Materializer
import nl.tradecloud.kafka.command.Publish
import nl.tradecloud.kafka.config.KafkaConfig
import org.apache.kafka.common.serialization.{ByteArraySerializer, StringSerializer}

import scala.concurrent.duration._
import scala.concurrent.{ExecutionContext, Future, Promise}

class KafkaPublisher(system: ActorSystem)(implicit mat: Materializer, context: ActorRefFactory) {
  import KafkaPublisher._

  implicit val dispatcher: ExecutionContext = system.dispatchers.lookup("dispatchers.kafka-dispatcher")

  val kafkaConfig = KafkaConfig(system.settings.config)

  private lazy val publisherId = KafkaClientIdSequenceNumber.getAndIncrement

  private def publisherSettings = {
    val keySerializer = new StringSerializer
    val valueSerializer = new ByteArraySerializer

    ProducerSettings(system, keySerializer, valueSerializer).withBootstrapServers(kafkaConfig.brokers)
  }

  private val publisherProps: Props = KafkaPublisherActor.props(kafkaConfig, publisherSettings)
  private val backoffPublisherProps: Props = BackoffSupervisor.propsWithSupervisorStrategy(
    publisherProps, s"KafkaPublisherActor$publisherId", 3.seconds,
    30.seconds, 1.0, SupervisorStrategy.stoppingStrategy
  )
  private val publishActor = context.actorOf(backoffPublisherProps, s"KafkaBackoffPublisher$publisherId")

  def publish(topic: String, msg: AnyRef): Future[Done] = {
    val completed: Promise[Done] = Promise()

    publishActor ! Publish(topic, msg, completed)

    completed.future
  }

}

object KafkaPublisher {
  private val KafkaClientIdSequenceNumber = new AtomicInteger(1)
} 
Author: tradecloud | Project: kafka-akka-extension | Lines: 53 | Source: KafkaPublisher.scala
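A hedged usage sketch of KafkaPublisher; the topic and payload are placeholders, KafkaPublisherActor (and the kafka-dispatcher configuration it expects) is not shown, and the returned Future completes only once that actor fulfils the promise:

import akka.actor.{ActorRefFactory, ActorSystem}
import akka.stream.{ActorMaterializer, Materializer}
import nl.tradecloud.kafka.KafkaPublisher

object PublishExample extends App {
  val system = ActorSystem("publisher-example")
  implicit val mat: Materializer = ActorMaterializer()(system)
  implicit val refFactory: ActorRefFactory = system

  val publisher = new KafkaPublisher(system)

  // "orders" and the payload are placeholders; publish accepts any AnyRef message.
  publisher.publish("orders", "order-created").foreach(_ => println("publish acknowledged"))(system.dispatcher)
}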



Note: The org.apache.kafka.common.serialization.ByteArraySerializer class examples in this article were collected from GitHub, MSDocs, and other source-code and documentation platforms. The snippets come from open-source projects contributed by their respective authors; copyright remains with the original authors, and redistribution or use should follow each project's license. Please do not reproduce without permission.

