本文整理汇总了Scala中org.apache.kafka.common.serialization.Deserializer类的典型用法代码示例。如果您正苦于以下问题:Scala Deserializer类的具体用法?Scala Deserializer怎么用?Scala Deserializer使用的例子?那么恭喜您,这里精选的类代码示例或许可以为您提供帮助。
在下文中一共展示了Deserializer类的10个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Scala代码示例。
示例1: configure
//设置package包名称以及导入依赖的类
package edu.uw.at.iroberts.wirefugue.kafka.serdes
import java.util
import org.apache.kafka.common.serialization.{Deserializer, Serde, Serializer}
/**
 * Mixin that supplies no-op `configure` and `close` implementations for a
 * [[Serde]] that carries no configuration and owns no resources.
 *
 * The self-type restricts this trait to classes that are themselves a `Serde[T]`.
 *
 * NOTE(review): `override` is applied to members that are visible only through
 * the self-type, not through a declared parent — confirm this compiles with the
 * Scala/Kafka versions in use.
 *
 * @tparam T the value type handled by the serde
 */
trait StatelessSerde[T] { self: Serde[T] =>

  /** Ignores every setting; there is nothing to configure. */
  override def configure(configs: util.Map[String, _], isKey: Boolean): Unit = {}

  /** Does nothing; there is nothing to release. */
  override def close(): Unit = {}
}
/**
 * Mixin that supplies no-op `configure` and `close` implementations for a
 * [[Serializer]] that carries no configuration and owns no resources.
 *
 * The self-type restricts this trait to classes that are themselves a
 * `Serializer[T]`.
 *
 * NOTE(review): `override` is applied to members that are visible only through
 * the self-type, not through a declared parent — confirm this compiles with the
 * Scala/Kafka versions in use.
 *
 * @tparam T the value type being serialized
 */
trait StatelessSerializer[T] { self: Serializer[T] =>

  /** Ignores every setting; there is nothing to configure. */
  override def configure(configs: util.Map[String, _], isKey: Boolean): Unit = {}

  /** Does nothing; there is nothing to release. */
  override def close(): Unit = {}
}
/**
 * Mixin that supplies no-op `configure` and `close` implementations for a
 * [[Deserializer]] that carries no configuration and owns no resources.
 *
 * The self-type restricts this trait to classes that are themselves a
 * `Deserializer[T]`.
 *
 * NOTE(review): `override` is applied to members that are visible only through
 * the self-type, not through a declared parent — confirm this compiles with the
 * Scala/Kafka versions in use.
 *
 * @tparam T the value type being deserialized
 */
trait StatelessDeserializer[T] { self: Deserializer[T] =>

  /** Ignores every setting; there is nothing to configure. */
  override def configure(configs: util.Map[String, _], isKey: Boolean): Unit = {}

  /** Does nothing; there is nothing to release. */
  override def close(): Unit = {}
}
开发者ID:robertson-tech,项目名称:wirefugue,代码行数:22,代码来源:Stateless.scala
示例2: EventDeserialiser
//设置package包名称以及导入依赖的类
package serialisation
import java.util
import model.Event
import org.apache.kafka.common.serialization.{Deserializer, StringDeserializer}
import org.json4s.native.Serialization
import org.json4s.native.Serialization.read
import org.json4s.{Formats, NoTypeHints}
/**
 * Kafka deserializer producing [[Event]] values.
 *
 * The payload bytes are first decoded to a string by a wrapped
 * `StringDeserializer`; that string is then handed to json4s `read` to build
 * the `Event`.
 */
class EventDeserialiser extends Deserializer[Event] {

  // json4s formats picked up implicitly by `read`; built without type hints.
  implicit val formats: Formats = Serialization.formats(NoTypeHints)

  // Delegate that performs the bytes-to-string step.
  val stringDeserialiser = new StringDeserializer

  /** Forwards the configuration unchanged to the wrapped string deserializer. */
  override def configure(configs: util.Map[String, _], isKey: Boolean): Unit =
    stringDeserialiser.configure(configs, isKey)

  /**
   * Decodes `data` to a string and parses that string into an `Event`.
   *
   * @param topic topic the record came from (passed to the string deserializer)
   * @param data  raw record payload
   */
  override def deserialize(topic: String, data: Array[Byte]): Event =
    read[Event](stringDeserialiser.deserialize(topic, data))

  /** Closes the wrapped string deserializer. */
  override def close(): Unit =
    stringDeserialiser.close()
}
开发者ID:benwheeler,项目名称:kafka-streams-poc,代码行数:30,代码来源:EventDeserialiser.scala
示例3: SeqDeserialiser
//设置package包名称以及导入依赖的类
package serialisation
import java.io.{ByteArrayInputStream, ObjectInputStream}
import java.util
import org.apache.kafka.common.serialization.Deserializer
import scala.collection.mutable
/**
 * Kafka deserializer that rebuilds a `Seq[ELEMENT]` from a Java-serialization
 * stream containing the elements written one after another.
 *
 * NOTE(review): this assumes the matching serializer writes each element with
 * `ObjectOutputStream.writeObject` into a single stream — confirm against the
 * serializer.
 *
 * SECURITY: Java deserialization of untrusted bytes can execute arbitrary code;
 * only use this for topics whose producers are trusted.
 *
 * @tparam ELEMENT element type; the cast is unchecked because of erasure
 */
class SeqDeserialiser[ELEMENT] extends Deserializer[Seq[ELEMENT]] {

  /** No configuration is required. */
  override def configure(configs: util.Map[String, _], isKey: Boolean): Unit = {}

  /**
   * Reads objects from `data` until the end of the stream and returns them in
   * the order they were read.
   *
   * @param topic topic the record came from (unused)
   * @param data  serialized payload; `null` (e.g. a tombstone) yields `null`
   * @return the decoded elements, or `null` when `data` is `null`
   */
  override def deserialize(topic: String, data: Array[Byte]): Seq[ELEMENT] =
    if (data == null) null
    else {
      val objectStream = new ObjectInputStream(new ByteArrayInputStream(data))
      try {
        val elements = Vector.newBuilder[ELEMENT]
        var reachedEnd = false
        // `available()` does not report pending serialized objects, so the end
        // of the stream is detected through the EOFException from `readObject`.
        while (!reachedEnd) {
          try elements += objectStream.readObject().asInstanceOf[ELEMENT]
          catch { case _: java.io.EOFException => reachedEnd = true }
        }
        elements.result()
      } finally objectStream.close() // release the stream even if decoding fails
    }

  /** Nothing to release. */
  override def close(): Unit = {}
}
开发者ID:benwheeler,项目名称:kafka-streams-poc,代码行数:29,代码来源:SeqDeserialiser.scala
示例4: UUIDDeserializer
//设置package包名称以及导入依赖的类
package eventsource.kafka
import java.util
import java.util.UUID
import org.apache.kafka.common.errors.SerializationException
import org.apache.kafka.common.serialization.Deserializer
/**
 * Kafka deserializer that reads a UUID from its UTF-8 textual form
 * (the format accepted by `UUID.fromString`).
 */
class UUIDDeserializer extends Deserializer[UUID] {

  /** No configuration is required. */
  override def configure(configs: util.Map[String, _], isKey: Boolean): Unit = {}

  /**
   * Decodes `data` as a UTF-8 string and parses it as a UUID.
   *
   * @param topic topic the record came from (unused)
   * @param data  UTF-8 bytes of the UUID string
   * @throws SerializationException if the bytes cannot be turned into a UUID;
   *                                the underlying failure is attached as the cause
   */
  override def deserialize(topic: String, data: Array[Byte]): UUID =
    try UUID.fromString(new String(data, "UTF-8"))
    catch {
      // NonFatal lets OutOfMemoryError, InterruptedException, etc. propagate
      // instead of being disguised as a serialization problem.
      case scala.util.control.NonFatal(e) =>
        throw new SerializationException(s"Failed to deserialize uuid data: ${e.getMessage}", e)
    }

  /** Nothing to release. */
  override def close(): Unit = {}
}
开发者ID:21re,项目名称:play-eventsource,代码行数:22,代码来源:UUIDDeserializer.scala
示例5: produceRecord
//设置package包名称以及导入依赖的类
package de.choffmeister.microserviceutils.kafka.testkit
import java.util.UUID
import akka.Done
import akka.actor.ActorSystem
import akka.kafka.scaladsl.{Consumer, Producer}
import akka.kafka.{ConsumerSettings, ProducerSettings, Subscriptions}
import akka.stream.Materializer
import akka.stream.scaladsl.{Keep, Sink, Source}
import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecord}
import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.kafka.common.serialization.{Deserializer, Serializer}
import scala.concurrent.Future
/**
 * Test helpers for publishing a single record to Kafka and for waiting on the
 * first record that matches a partial function.
 *
 * Both helpers read the broker address from the actor system configuration key
 * `kafka.bootstrap-servers`.
 */
trait KafkaTest {

  /**
   * Publishes one record to `topic`.
   *
   * @param topic           destination topic
   * @param keySerializer   serializer for the record key
   * @param valueSerializer serializer for the record value
   * @param key             record key
   * @param value           record value
   * @return the materialized value of the producer sink
   */
  def produceRecord[K, V](topic: String, keySerializer: Serializer[K], valueSerializer: Serializer[V], key: K, value: V)(implicit system: ActorSystem, mat: Materializer): Future[Done] = {
    val producerSettings = ProducerSettings(system, keySerializer, valueSerializer)
      .withBootstrapServers(system.settings.config.getString("kafka.bootstrap-servers"))

    // Use the caller-supplied topic; it was previously ignored in favour of a
    // hard-coded topic name.
    Source.single(new ProducerRecord(topic, key, value))
      .toMat(Producer.plainSink(producerSettings))(Keep.right)
      .run()
  }

  /**
   * Consumes `topic` from the earliest offset with a fresh random consumer
   * group and yields the result of `pf` for the first record on which `pf` is
   * defined.
   *
   * @param topic             topic to subscribe to
   * @param keyDeserializer   deserializer for record keys
   * @param valueDeserializer deserializer for record values
   * @param pf                selects and transforms the record of interest
   * @return the transformed first matching record
   */
  def consumeRecordPF[K, V, Out](topic: String, keyDeserializer: Deserializer[K], valueDeserializer: Deserializer[V])(pf: PartialFunction[ConsumerRecord[K, V], Out])(implicit system: ActorSystem, mat: Materializer): Future[Out] = {
    val consumerSettings = ConsumerSettings(system, keyDeserializer, valueDeserializer)
      .withBootstrapServers(system.settings.config.getString("kafka.bootstrap-servers"))
      .withGroupId(UUID.randomUUID.toString) // a fresh group, so no committed offsets exist
      .withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")

    Consumer.plainSource(consumerSettings, Subscriptions.topics(topic))
      .filter(pf.isDefinedAt)
      .map(pf)
      .toMat(Sink.head)(Keep.right)
      .run()
  }
}
开发者ID:choffmeister,项目名称:microservice-utils,代码行数:40,代码来源:KafkaTest.scala
示例6: UUIDSerializer
//设置package包名称以及导入依赖的类
package de.choffmeister.microserviceutils.kafka
import java.nio.ByteBuffer
import java.util
import java.util.UUID
import org.apache.kafka.common.serialization.{Deserializer, Serializer}
/**
 * Kafka serializer that writes a UUID as 16 bytes: the most significant
 * 64 bits followed by the least significant 64 bits.
 */
class UUIDSerializer extends Serializer[UUID] {

  /**
   * Encodes `data` into a 16-byte array.
   *
   * @param topic topic the record is destined for (unused)
   * @param data  UUID to encode; `null` is passed through as `null`
   * @return the 16-byte encoding, or `null` when `data` is `null`
   */
  override def serialize(topic: String, data: UUID): Array[Byte] =
    if (data == null) null // follow the Kafka convention instead of failing with an NPE
    else {
      val buffer = ByteBuffer.wrap(new Array[Byte](16))
      buffer.putLong(data.getMostSignificantBits)
      buffer.putLong(data.getLeastSignificantBits)
      buffer.array
    }

  /** No configuration is required. */
  override def configure(configs: util.Map[String, _], isKey: Boolean): Unit = {}

  /** Nothing to release. */
  override def close(): Unit = {}
}
/**
 * Kafka deserializer that reads a UUID from two consecutive 64-bit values:
 * the most significant bits first, then the least significant bits.
 */
class UUIDDeserializer extends Deserializer[UUID] {

  /**
   * Decodes the first 16 bytes of `data` into a UUID.
   *
   * @param topic topic the record came from (unused)
   * @param data  encoded UUID; `null` (e.g. a tombstone record) yields `null`
   * @return the decoded UUID, or `null` when `data` is `null`
   */
  override def deserialize(topic: String, data: Array[Byte]): UUID =
    if (data == null) null // ByteBuffer.wrap(null) would otherwise throw an NPE
    else {
      val buffer = ByteBuffer.wrap(data)
      val mostSignificant = buffer.getLong
      val leastSignificant = buffer.getLong
      new UUID(mostSignificant, leastSignificant)
    }

  /** No configuration is required. */
  override def configure(configs: util.Map[String, _], isKey: Boolean): Unit = {}

  /** Nothing to release. */
  override def close(): Unit = {}
}
开发者ID:choffmeister,项目名称:microservice-utils,代码行数:32,代码来源:UUIDSerializer.scala
示例7: stop
//设置package包名称以及导入依赖的类
package co.blocke
package latekafka
import akka.actor.Actor
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.RunnableGraph
import org.apache.kafka.common.serialization.Deserializer
/**
 * Holds the settings used to build a `LateKafka[V]` together with the runnable
 * graph that consumes from it. The abstract members mirror the constructor
 * parameters of `LateKafka`.
 */
trait GraphHolder[V] {
// Connection and subscription settings.
val host: String
val groupId: String
val topic: String
// Deserializer for record values.
val deserializer: Deserializer[V]
// Additional properties given to LateKafka.
val properties: Map[String, String]
// The graph that FlowActor runs.
val flow: RunnableGraph[akka.NotUsed]
// Starts out null; presumably assigned by the implementation once the LateKafka
// instance has been created — confirm where it is set.
var late: LateKafka[V] = null
// Stops the LateKafka instance if one has been assigned; otherwise does nothing.
def stop() = if (late != null) late.stop()
}
/**
 * Actor that runs the graph held by a [[GraphHolder]] as soon as it is
 * constructed and stops the holder when the actor stops. It ignores every
 * message it receives.
 *
 * NOTE: We must create the LateKafka object *inside* its own thread! So pass in all the needed
 * parameters (in GraphHolder) and create it once the actor has started and called run().
 *
 * @param graph        holder of the graph to run and of the LateKafka settings
 * @param materializer materializer used to run the graph
 */
class FlowActor[V](graph: GraphHolder[V])(implicit materializer: ActorMaterializer) extends Actor {

  /** Runs the holder's graph and returns its materialized value. */
  def run(): akka.NotUsed = graph.flow.run()

  /** Stops the holder (and with it any LateKafka instance) when the actor stops. */
  override def postStop(): Unit = graph.stop()

  def receive: Actor.Receive = Actor.ignoringBehavior

  run() // just go, by default
}
开发者ID:gzoller,项目名称:LateKafka,代码行数:36,代码来源:FlowActor.scala
示例8: LateKafka
//设置package包名称以及导入依赖的类
package co.blocke
package latekafka
import akka.stream.scaladsl.Source
import org.apache.kafka.common.serialization.Deserializer
import org.apache.kafka.clients.consumer.ConsumerRecord
import scala.concurrent.{ Promise, Await }
import scala.concurrent.duration._
/**
 * Iterator over Kafka consumer records backed by a `KafkaThread` and a
 * `Heartbeat`, each started on its own Java thread during construction.
 *
 * Records are requested in batches: `fill()` sends a promise to the Kafka
 * thread and blocks until it is completed with an iterator of records.
 *
 * @param host         Kafka host passed to the Kafka thread
 * @param groupId      consumer group id
 * @param topic        topic to consume
 * @param deserializer deserializer for record values
 * @param properties   extra properties passed to the Kafka thread
 */
case class LateKafka[V](
    host: String,
    groupId: String,
    topic: String,
    deserializer: Deserializer[V],
    properties: Map[String, String] = Map.empty[String, String]
) extends Iterator[ConsumerRecord[Array[Byte], V]] {

  type REC = ConsumerRecord[Array[Byte], V]
  type ITER_REC = Iterator[REC]

  private val t = KafkaThread[V](host, groupId, topic, deserializer, properties)
  private val h = Heartbeat(t, 100L)
  // Current batch; null until the first fill().
  private var i: ITER_REC = null
  private var hasMore = true

  new java.lang.Thread(t).start()
  new java.lang.Thread(h).start()
  Thread.sleep(500) // presumably gives both threads time to come up — TODO confirm

  /** Marks the iterator as exhausted so that `hasNext` returns false. */
  def done(): Unit = hasMore = false

  /** Marks the iterator as done, waits briefly, then stops the Kafka and heartbeat threads. */
  def stop() = {
    done()
    Thread.sleep(500)
    t.stop()
    h.stop()
  }

  /** True until `done()` or `stop()` has been called. */
  def hasNext: Boolean = hasMore

  /**
   * Returns the next record, requesting new batches until one contains a record.
   * Blocks while no record is available.
   */
  def next(): REC = {
    // Guard against the initial null batch: next() may be called before source/fill().
    while (i == null || !i.hasNext)
      fill()
    i.next()
  }

  /** Hands `cr` to the Kafka thread; presumably to commit its offset — confirm in KafkaThread. */
  def commit(cr: REC) = t ! cr

  /** Loads a first batch and exposes this iterator as an Akka Streams source. */
  def source = {
    fill()
    Source.fromIterator(() => this)
  }

  // Sends a promise to the Kafka thread and blocks until it is completed with a batch.
  private def fill(): Unit = {
    val batchPromise = Promise[ITER_REC]()
    t ! batchPromise
    i = Await.result(batchPromise.future, Duration.Inf)
  }
}
开发者ID:gzoller,项目名称:LateKafka,代码行数:58,代码来源:LateKafka.scala
示例9: SimpleKafkaConsumer
//设置package包名称以及导入依赖的类
import java.util.Properties
import com.fasterxml.jackson.databind.KeyDeserializer
import org.apache.kafka.clients.consumer.{ConsumerRecords, KafkaConsumer}
import org.apache.kafka.common.serialization.Deserializer
import net.manub.embeddedkafka.Codecs.stringDeserializer
import net.manub.embeddedkafka.ConsumerExtensions._
/**
 * Minimal Kafka consumer that polls one topic on a dedicated thread and passes
 * every polled batch to a callback.
 *
 * @param consumerProps     properties for the underlying `KafkaConsumer`
 * @param topic             topic to subscribe to
 * @param keyDeserializer   deserializer for record keys
 * @param valueDeserializer deserializer for record values
 * @param function          callback invoked with each polled batch
 * @param poll              poll timeout passed to `KafkaConsumer.poll`
 */
class SimpleKafkaConsumer[K,V](consumerProps : Properties,
                               topic : String,
                               keyDeserializer: Deserializer[K],
                               valueDeserializer: Deserializer[V],
                               function : ConsumerRecords[K, V] => Unit,
                               poll : Long = 2000) {

  // Written by start()/stop() on the caller's thread and read by the polling
  // thread; @volatile makes the stop request visible so stop() cannot hang in join().
  @volatile private var running = false

  private val consumer = new KafkaConsumer[K, V](consumerProps, keyDeserializer, valueDeserializer)

  private val thread = new Thread {
    import scala.collection.JavaConverters._

    override def run(): Unit = {
      consumer.subscribe(List(topic).asJava)
      consumer.partitionsFor(topic)

      while (running) {
        val record: ConsumerRecords[K, V] = consumer.poll(poll)
        function(record)
      }
    }
  }

  /** Starts the polling thread if the consumer is not already running. */
  def start(): Unit = {
    if(!running) {
      running = true
      thread.start()
    }
  }

  /**
   * Requests the polling loop to end, waits for the thread to finish and then
   * closes the Kafka consumer. Does nothing if the consumer is not running.
   */
  def stop(): Unit = {
    if(running) {
      running = false
      thread.join()
      consumer.close()
    }
  }
}
开发者ID:zalando-incubator,项目名称:remora,代码行数:50,代码来源:SimpleKafkaConsumer.scala
示例10: JsonDeserializer
//设置package包名称以及导入依赖的类
package eventsource.kafka
import java.util
import org.apache.kafka.common.errors.SerializationException
import org.apache.kafka.common.serialization.Deserializer
import play.api.libs.json.{JsValue, Json}
/**
 * Kafka deserializer that parses the record payload as JSON into a
 * play-json `JsValue`.
 */
class JsonDeserializer extends Deserializer[JsValue] {

  /** No configuration is required. */
  override def configure(configs: util.Map[String, _], isKey: Boolean): Unit = {}

  /**
   * Parses `data` as JSON.
   *
   * @param topic topic the record came from (unused)
   * @param data  JSON payload bytes
   * @throws SerializationException if the payload cannot be parsed; the
   *                                underlying failure is attached as the cause
   */
  override def deserialize(topic: String, data: Array[Byte]): JsValue =
    try Json.parse(data)
    catch {
      // NonFatal lets OutOfMemoryError, InterruptedException, etc. propagate
      // instead of being disguised as a serialization problem.
      case scala.util.control.NonFatal(e) =>
        throw new SerializationException(s"Failed to deserialize json data: ${e.getMessage}", e)
    }

  /** Nothing to release. */
  override def close(): Unit = {}
}
开发者ID:21re,项目名称:play-eventsource,代码行数:23,代码来源:JsonDeserializer.scala
注:本文中的org.apache.kafka.common.serialization.Deserializer类示例整理自GitHub/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。
请发表评论