• 设为首页
  • 点击收藏
  • 手机版
    手机扫一扫访问
    迪恩网络手机版
  • 关注官方公众号
    微信扫一扫关注
    公众号

Scala Deserializer类代码示例

原作者: [db:作者] 来自: [db:来源] 收藏 邀请

本文整理汇总了Scala中org.apache.kafka.common.serialization.Deserializer的典型用法代码示例。如果您正苦于以下问题:Scala Deserializer类的具体用法?Scala Deserializer怎么用?Scala Deserializer使用的例子?那么恭喜您,这里精选的类代码示例或许可以为您提供帮助。



在下文中一共展示了Deserializer类的10个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Scala代码示例。

示例1: configure

//设置package包名称以及导入依赖的类
package edu.uw.at.iroberts.wirefugue.kafka.serdes

import java.util

import org.apache.kafka.common.serialization.{Deserializer, Serde, Serializer}


/**
 * Mixin for [[Serde]] implementations that hold no configuration and no resources:
 * both `configure` and `close` are no-ops.
 *
 * The self-type restricts this trait to classes that are themselves a `Serde[T]`.
 */
trait StatelessSerde[T] { this: Serde[T] =>

  /** Ignores the supplied configuration. */
  override def configure(configs: util.Map[String, _], isKey: Boolean): Unit = {}

  /** Nothing to release. */
  override def close(): Unit = {}
}

/**
 * Mixin for [[Serializer]] implementations that hold no configuration and no resources:
 * both `configure` and `close` are no-ops.
 *
 * The self-type restricts this trait to classes that are themselves a `Serializer[T]`.
 */
trait StatelessSerializer[T] { this: Serializer[T] =>

  /** Ignores the supplied configuration. */
  override def configure(configs: util.Map[String, _], isKey: Boolean): Unit = {}

  /** Nothing to release. */
  override def close(): Unit = {}
}

/**
 * Mixin for [[Deserializer]] implementations that hold no configuration and no resources:
 * both `configure` and `close` are no-ops.
 *
 * The self-type restricts this trait to classes that are themselves a `Deserializer[T]`.
 */
trait StatelessDeserializer[T] { this: Deserializer[T] =>

  /** Ignores the supplied configuration. */
  override def configure(configs: util.Map[String, _], isKey: Boolean): Unit = {}

  /** Nothing to release. */
  override def close(): Unit = {}
}
开发者ID:robertson-tech,项目名称:wirefugue,代码行数:22,代码来源:Stateless.scala


示例2: EventDeserialiser

//设置package包名称以及导入依赖的类
package serialisation

import java.util

import model.Event
import org.apache.kafka.common.serialization.{Deserializer, StringDeserializer}
import org.json4s.native.Serialization
import org.json4s.native.Serialization.read
import org.json4s.{Formats, NoTypeHints}

/**
 * Kafka [[Deserializer]] that turns a JSON string payload into an [[Event]].
 *
 * The bytes are first decoded to a string by a wrapped [[StringDeserializer]] (so the
 * character encoding follows that deserializer's configuration) and then parsed with
 * json4s `read`.
 */
class EventDeserialiser extends Deserializer[Event] {

  /** json4s formats used by `read`; created without type hints. */
  implicit val formats: Formats = Serialization.formats(NoTypeHints)

  /** Delegate that handles the bytes-to-string step. */
  val stringDeserialiser = new StringDeserializer

  /** Forwards the configuration to the wrapped string deserializer. */
  override def configure(configs: util.Map[String, _], isKey: Boolean): Unit =
    stringDeserialiser.configure(configs, isKey)

  /**
   * Decodes `data` into an [[Event]].
   *
   * @param topic topic the record came from (passed through to the string deserializer)
   * @param data  raw payload; may be null (e.g. a tombstone record)
   * @return the parsed event, or null when `data` is null
   */
  override def deserialize(topic: String, data: Array[Byte]): Event =
    if (data == null) {
      // A null payload carries no JSON to parse; pass the null through rather than
      // handing a null string to `read`, which would fail.
      null.asInstanceOf[Event]
    } else {
      read[Event](stringDeserialiser.deserialize(topic, data))
    }

  /** Closes the wrapped string deserializer. */
  override def close(): Unit =
    stringDeserialiser.close()
}
开发者ID:benwheeler,项目名称:kafka-streams-poc,代码行数:30,代码来源:EventDeserialiser.scala


示例3: SeqDeserialiser

//设置package包名称以及导入依赖的类
package serialisation

import java.io.{ByteArrayInputStream, ObjectInputStream}
import java.util

import org.apache.kafka.common.serialization.Deserializer

import scala.collection.mutable

/**
 * Kafka [[Deserializer]] that rebuilds a `Seq[ELEMENT]` from a Java-serialization stream.
 *
 * NOTE(review): assumes the matching serializer wrote each element with its own
 * `ObjectOutputStream.writeObject` call into a single stream — confirm against the
 * serializer.
 */
class SeqDeserialiser[ELEMENT] extends Deserializer[Seq[ELEMENT]] {

  /** No configuration is needed. */
  override def configure(configs: util.Map[String, _], isKey: Boolean): Unit = {}

  /**
   * Reads objects from `data` until the payload is exhausted.
   *
   * @param topic topic the record came from (unused)
   * @param data  raw payload; may be null (e.g. a tombstone record)
   * @return the decoded elements in stream order, or null when `data` is null
   */
  override def deserialize(topic: String, data: Array[Byte]): Seq[ELEMENT] =
    if (data == null) {
      null
    } else {
      val byteStream = new ByteArrayInputStream(data)
      val objectStream = new ObjectInputStream(byteStream)
      try {
        val elements = Vector.newBuilder[ELEMENT]
        // Ask the underlying byte stream how much is left: ObjectInputStream.available()
        // only counts primitive block data and reports 0 when the next item is an object.
        while (byteStream.available() > 0) {
          // The builder accumulates; the previous `result :+ x` built a new sequence
          // and threw it away, so the result was always empty.
          elements += objectStream.readObject().asInstanceOf[ELEMENT]
        }
        elements.result()
      } finally {
        // Close even when readObject throws.
        objectStream.close()
      }
    }

  /** Nothing to release. */
  override def close(): Unit = {}
}
开发者ID:benwheeler,项目名称:kafka-streams-poc,代码行数:29,代码来源:SeqDeserialiser.scala


示例4: UUIDDeserializer

//设置package包名称以及导入依赖的类
package eventsource.kafka

import java.util
import java.util.UUID

import org.apache.kafka.common.errors.SerializationException
import org.apache.kafka.common.serialization.Deserializer

/**
 * Kafka [[Deserializer]] for UUIDs stored as their UTF-8 textual form
 * (the format accepted by `UUID.fromString`).
 */
class UUIDDeserializer extends Deserializer[UUID] {
  import scala.util.control.NonFatal

  /** No configuration is needed. */
  override def configure(configs: util.Map[String, _], isKey: Boolean): Unit = {}

  /**
   * Decodes `data` as a UTF-8 string and parses it as a UUID.
   *
   * @param topic topic the record came from (unused)
   * @param data  raw payload; may be null (e.g. a tombstone record)
   * @return the parsed UUID, or null when `data` is null
   * @throws SerializationException if the payload is not a valid UUID string
   */
  override def deserialize(topic: String, data: Array[Byte]): UUID =
    if (data == null) {
      // Pass a null payload through instead of reporting it as a corrupt record.
      null
    } else {
      try {
        UUID.fromString(new String(data, java.nio.charset.StandardCharsets.UTF_8))
      } catch {
        // Only wrap non-fatal failures; errors such as OutOfMemoryError must propagate.
        case NonFatal(e) =>
          // Keep the original exception as the cause so the stack trace is not lost.
          throw new SerializationException("Failed to deserialize uuid data: " + e.getMessage, e)
      }
    }

  /** Nothing to release. */
  override def close(): Unit = {}
}
开发者ID:21re,项目名称:play-eventsource,代码行数:22,代码来源:UUIDDeserializer.scala


示例5: produceRecord

//设置package包名称以及导入依赖的类
package de.choffmeister.microserviceutils.kafka.testkit

import java.util.UUID

import akka.Done
import akka.actor.ActorSystem
import akka.kafka.scaladsl.{Consumer, Producer}
import akka.kafka.{ConsumerSettings, ProducerSettings, Subscriptions}
import akka.stream.Materializer
import akka.stream.scaladsl.{Keep, Sink, Source}
import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecord}
import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.kafka.common.serialization.{Deserializer, Serializer}

import scala.concurrent.Future

/**
 * Test helpers for producing a single record to Kafka and consuming the first record
 * that matches a partial function.
 *
 * Both helpers read the broker address from the actor system's configuration key
 * `kafka.bootstrap-servers`.
 */
trait KafkaTest {

  /**
   * Sends one record to `topic`.
   *
   * @param topic           destination topic
   * @param keySerializer   serializer for the record key
   * @param valueSerializer serializer for the record value
   * @param key             record key
   * @param value           record value
   * @return a future completed when the producer sink finishes
   */
  def produceRecord[K, V](topic: String, keySerializer: Serializer[K], valueSerializer: Serializer[V], key: K, value: V)(implicit system: ActorSystem, mat: Materializer): Future[Done] = {
    val producerSettings = ProducerSettings(system, keySerializer, valueSerializer)
      .withBootstrapServers(system.settings.config.getString("kafka.bootstrap-servers"))

    // Publish to the caller-supplied topic; it was previously ignored in favour of a
    // hard-coded "mail.command.send".
    Source.single(new ProducerRecord(topic, key, value))
      .toMat(Producer.plainSink(producerSettings))(Keep.right)
      .run()
  }

  /**
   * Subscribes to `topic` from the earliest offset and returns the result of `pf` for
   * the first record on which `pf` is defined.
   *
   * @param topic             topic to read from
   * @param keyDeserializer   deserializer for record keys
   * @param valueDeserializer deserializer for record values
   * @param pf                selects and transforms the record of interest
   * @return a future holding `pf` applied to the first matching record
   */
  def consumeRecordPF[K, V, Out](topic: String, keyDeserializer: Deserializer[K], valueDeserializer: Deserializer[V])(pf: PartialFunction[ConsumerRecord[K, V], Out])(implicit system: ActorSystem, mat: Materializer): Future[Out] = {
    val consumerSettings = ConsumerSettings(system, keyDeserializer, valueDeserializer)
      .withBootstrapServers(system.settings.config.getString("kafka.bootstrap-servers"))
      // A fresh random group id per call, combined with "earliest" below.
      .withGroupId(UUID.randomUUID.toString)
      .withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")

    Consumer.plainSource(consumerSettings, Subscriptions.topics(topic))
      .collect(pf)
      .toMat(Sink.head)(Keep.right)
      .run()
  }
}
开发者ID:choffmeister,项目名称:microservice-utils,代码行数:40,代码来源:KafkaTest.scala


示例6: UUIDSerializer

//设置package包名称以及导入依赖的类
package de.choffmeister.microserviceutils.kafka

import java.nio.ByteBuffer
import java.util
import java.util.UUID

import org.apache.kafka.common.serialization.{Deserializer, Serializer}

/**
 * Kafka [[Serializer]] that encodes a UUID as 16 bytes: the most significant 64 bits
 * followed by the least significant 64 bits, in `ByteBuffer`'s default (big-endian) order.
 */
class UUIDSerializer extends Serializer[UUID] {

  /**
   * @param topic topic the record is destined for (unused)
   * @param data  UUID to encode; may be null
   * @return the 16-byte encoding, or null when `data` is null
   */
  override def serialize(topic: String, data: UUID): Array[Byte] =
    if (data == null) {
      // A null value serializes to a null payload instead of throwing a NullPointerException.
      null
    } else {
      ByteBuffer
        .allocate(16)
        .putLong(data.getMostSignificantBits)
        .putLong(data.getLeastSignificantBits)
        .array()
    }

  /** No configuration is needed. */
  override def configure(configs: util.Map[String, _], isKey: Boolean): Unit = {}

  /** Nothing to release. */
  override def close(): Unit = {}
}

/**
 * Kafka [[Deserializer]] that decodes a UUID from 16 bytes: the most significant 64 bits
 * followed by the least significant 64 bits, in `ByteBuffer`'s default (big-endian) order.
 */
class UUIDDeserializer extends Deserializer[UUID] {

  /**
   * @param topic topic the record came from (unused)
   * @param data  raw payload; may be null (e.g. a tombstone record)
   * @return the decoded UUID, or null when `data` is null
   * @throws java.nio.BufferUnderflowException if `data` holds fewer than 16 bytes
   */
  override def deserialize(topic: String, data: Array[Byte]): UUID =
    if (data == null) {
      // Pass a null payload through instead of failing inside ByteBuffer.wrap.
      null
    } else {
      val buffer = ByteBuffer.wrap(data)
      // Read order matters: high bits first, then low bits.
      val mostSignificant = buffer.getLong
      val leastSignificant = buffer.getLong
      new UUID(mostSignificant, leastSignificant)
    }

  /** No configuration is needed. */
  override def configure(configs: util.Map[String, _], isKey: Boolean): Unit = {}

  /** Nothing to release. */
  override def close(): Unit = {}
}
开发者ID:choffmeister,项目名称:microservice-utils,代码行数:32,代码来源:UUIDSerializer.scala


示例7: stop

//设置package包名称以及导入依赖的类
package co.blocke
package latekafka

import akka.actor.Actor
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.RunnableGraph
import org.apache.kafka.common.serialization.Deserializer

/**
 * Holds a runnable stream graph together with Kafka connection settings and a slot for
 * a [[LateKafka]] instance.
 *
 * NOTE(review): host/groupId/topic/deserializer/properties have the same names and
 * types as the LateKafka constructor parameters, so they are presumably the arguments
 * used to build `late` — confirm against the implementers.
 */
trait GraphHolder[V] {
  val host: String
  val groupId: String
  val topic: String
  val deserializer: Deserializer[V]
  val properties: Map[String, String]
  // The graph that FlowActor runs.
  val flow: RunnableGraph[akka.NotUsed]

  // Starts out null; nothing in this trait assigns it, so implementers are expected to
  // set it once the LateKafka has been created.
  var late: LateKafka[V] = null

  // Stops the LateKafka if one has been assigned; otherwise does nothing.
  def stop() = if (late != null) late.stop()
}

/**
 * Actor that runs the graph held by a [[GraphHolder]] as soon as it is constructed and
 * stops the holder when the actor itself stops. It ignores every message it receives.
 *
 * NOTE: We must create the LateKafka object *inside* its own thread! So pass in all the
 * needed parameters (in GraphHolder) and create it once the actor has started and
 * called run().
 *
 * @param graph        holder of the graph to run and of the resources to stop
 * @param materializer materializer used to run the graph
 */
class FlowActor[V](graph: GraphHolder[V])(implicit materializer: ActorMaterializer) extends Actor {

  /** Materializes and runs the held graph. */
  def run() = graph.flow.run()

  /** Stops the holder when the actor stops. */
  override def postStop(): Unit = graph.stop()

  /** This actor handles no messages. */
  def receive: Actor.Receive = Actor.ignoringBehavior

  // Just go, by default: the graph starts running during construction.
  run()
}
开发者ID:gzoller,项目名称:LateKafka,代码行数:36,代码来源:FlowActor.scala


示例8: LateKafka

//设置package包名称以及导入依赖的类
package co.blocke
package latekafka

import akka.stream.scaladsl.Source
import org.apache.kafka.common.serialization.Deserializer
import org.apache.kafka.clients.consumer.ConsumerRecord
import scala.concurrent.{ Promise, Await }
import scala.concurrent.duration._

/**
 * Iterator over Kafka consumer records that is fed in batches by a background
 * `KafkaThread`, with a `Heartbeat` running alongside it.
 *
 * Constructing an instance starts both background threads and then sleeps 500 ms.
 *
 * NOTE(review): `hasMore` is written by `done()`/`stop()` and read by `hasNext`, possibly
 * from different threads, without synchronization — confirm whether visibility matters.
 *
 * @param host         Kafka host, passed to the KafkaThread
 * @param groupId      consumer group id, passed to the KafkaThread
 * @param topic        topic name, passed to the KafkaThread
 * @param deserializer value deserializer, passed to the KafkaThread
 * @param properties   extra properties, passed to the KafkaThread
 */
case class LateKafka[V](
    host:         String,
    groupId:      String,
    topic:        String,
    deserializer: Deserializer[V],
    properties:   Map[String, String] = Map.empty[String, String]
) extends Iterator[ConsumerRecord[Array[Byte], V]] {

  type REC = ConsumerRecord[Array[Byte], V]
  type ITER_REC = Iterator[REC]

  private val t = KafkaThread[V](host, groupId, topic, deserializer, properties)
  private val h = Heartbeat(t, 100L)
  // Current batch. Starts as an empty iterator (not null) so that next() is safe even
  // when it is called before the first fill().
  private var i: ITER_REC = Iterator.empty
  private var hasMore = true

  new java.lang.Thread(t).start()
  new java.lang.Thread(h).start()
  Thread.sleep(500)

  /** Marks the iterator as exhausted; `hasNext` returns false afterwards. */
  def done(): Unit = { hasMore = false }

  /** Marks the iterator as exhausted, waits 500 ms, then stops both background workers. */
  def stop() = {
    done()
    Thread.sleep(500)
    t.stop()
    h.stop()
  }

  /** True until `done()` or `stop()` has been called. */
  def hasNext: Boolean = hasMore

  /** Returns the next record, requesting new batches until one is non-empty. */
  def next(): REC = {
    while (!i.hasNext)
      fill()
    i.next()
  }

  /** Hands a record to the Kafka thread (via its `!` method) to be committed. */
  def commit(cr: REC) = t ! cr

  /** Fetches a first batch and exposes this iterator as an Akka Streams source. */
  def source = {
    fill()
    Source.fromIterator(() => this)
  }

  /** Sends a promise to the Kafka thread and blocks until it is completed with a batch. */
  private def fill(): Unit = {
    val p = Promise[ITER_REC]()
    t ! p
    i = Await.result(p.future, Duration.Inf)
  }
}
开发者ID:gzoller,项目名称:LateKafka,代码行数:58,代码来源:LateKafka.scala


示例9: SimpleKafkaConsumer

//设置package包名称以及导入依赖的类
import java.util.Properties

import com.fasterxml.jackson.databind.KeyDeserializer
import org.apache.kafka.clients.consumer.{ConsumerRecords, KafkaConsumer}
import org.apache.kafka.common.serialization.Deserializer
import net.manub.embeddedkafka.Codecs.stringDeserializer
import net.manub.embeddedkafka.ConsumerExtensions._

/**
 * Minimal polling consumer: a dedicated thread subscribes to one topic and passes every
 * polled batch to a callback until `stop()` is called.
 *
 * The polling thread is created once, so an instance cannot be restarted after `stop()`.
 *
 * @param consumerProps     properties for the underlying [[KafkaConsumer]]
 * @param topic             topic to subscribe to
 * @param keyDeserializer   deserializer for record keys
 * @param valueDeserializer deserializer for record values
 * @param function          callback invoked with each polled batch, on the polling thread
 * @param poll              timeout passed to each `consumer.poll` call
 */
class SimpleKafkaConsumer[K,V](consumerProps : Properties,
                               topic : String,
                               keyDeserializer: Deserializer[K],
                               valueDeserializer: Deserializer[V],
                               function : ConsumerRecords[K, V] => Unit,
                               poll : Long = 2000) {

  // Written by the thread calling start()/stop() and read by the polling thread.
  // Without @volatile the polling loop might never observe `false`, leaving stop()
  // blocked in thread.join().
  @volatile private var running = false

  private val consumer = new KafkaConsumer[K, V](consumerProps, keyDeserializer, valueDeserializer)


  private val thread = new Thread {
    import scala.collection.JavaConverters._

    override def run(): Unit = {
      consumer.subscribe(List(topic).asJava)
      // Result is discarded; the call is kept for whatever effect it has on the consumer.
      consumer.partitionsFor(topic)

      while (running) {
        val records: ConsumerRecords[K, V] = consumer.poll(poll)
        function(records)
      }
    }
  }

  /** Starts the polling thread; does nothing if already running. */
  def start(): Unit = {
    if (!running) {
      running = true
      thread.start()
    }
  }

  /** Signals the polling loop to end, waits for the thread to finish, then closes the consumer. */
  def stop(): Unit = {
    if (running) {
      running = false
      thread.join()
      // Close only after the polling thread has exited, so the consumer is never used
      // from two threads at once.
      consumer.close()
    }
  }
}
开发者ID:zalando-incubator,项目名称:remora,代码行数:50,代码来源:SimpleKafkaConsumer.scala


示例10: JsonDeserializer

//设置package包名称以及导入依赖的类
package eventsource.kafka

import java.util

import org.apache.kafka.common.errors.SerializationException
import org.apache.kafka.common.serialization.Deserializer
import play.api.libs.json.{JsValue, Json}


/**
 * Kafka [[Deserializer]] that parses the raw payload bytes as JSON into a play-json
 * [[JsValue]].
 */
class JsonDeserializer extends Deserializer[JsValue] {
  import scala.util.control.NonFatal

  /** No configuration is needed. */
  override def configure(configs: util.Map[String, _], isKey: Boolean): Unit = {}

  /**
   * Parses `data` as JSON.
   *
   * @param topic topic the record came from (unused)
   * @param data  raw payload; may be null (e.g. a tombstone record)
   * @return the parsed JSON value, or null when `data` is null
   * @throws SerializationException if the payload cannot be parsed as JSON
   */
  override def deserialize(topic: String, data: Array[Byte]): JsValue =
    if (data == null) {
      // Pass a null payload through instead of reporting it as a corrupt record.
      null
    } else {
      try {
        Json.parse(data)
      } catch {
        // Only wrap non-fatal failures; errors such as OutOfMemoryError must propagate.
        case NonFatal(e) =>
          // Keep the original exception as the cause so the stack trace is not lost.
          throw new SerializationException("Failed to deserialize json data: " + e.getMessage, e)
      }
    }

  /** Nothing to release. */
  override def close(): Unit = {}
}
开发者ID:21re,项目名称:play-eventsource,代码行数:23,代码来源:JsonDeserializer.scala



注:本文中的org.apache.kafka.common.serialization.Deserializer类示例整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。


鲜花

握手

雷人

路过

鸡蛋
该文章已有0人参与评论

请发表评论

全部评论

专题导读
上一篇:
Scala PrimaryStage类代码示例发布时间:2022-05-23
下一篇:
Scala SimpleRouter类代码示例发布时间:2022-05-23
热门推荐
热门话题
阅读排行榜

扫描微信二维码

查看手机版网站

随时了解更新最新资讯

139-2527-9053

在线客服(服务时间 9:00~18:00)

在线QQ客服
地址:深圳市南山区西丽大学城创智工业园
电邮:jeky_zhao#qq.com
移动电话:139-2527-9053

Powered by 互联科技 X3.4© 2001-2213 极客世界.|Sitemap