• 设为首页
  • 点击收藏
  • 手机版
    手机扫一扫访问
    迪恩网络手机版
  • 关注官方公众号
    微信扫一扫关注
    公众号

Scala GenericDatumReader类代码示例

原作者: [db:作者] 来自: [db:来源] 收藏 邀请

本文整理汇总了Scala中org.apache.avro.generic.GenericDatumReader的典型用法代码示例。如果您正苦于以下问题:Scala GenericDatumReader类的具体用法?Scala GenericDatumReader怎么用?Scala GenericDatumReader使用的例子?那么恭喜您, 这里精选的类代码示例或许可以为您提供帮助。



在下文中一共展示了GenericDatumReader类的5个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Scala代码示例。

示例1: Tip

//设置package包名称以及导入依赖的类
package com.alvin.niagara.model

import java.io.ByteArrayOutputStream
import java.util

import org.apache.avro.Schema
import org.apache.avro.generic.{GenericData, GenericDatumReader, GenericDatumWriter, GenericRecord}
import org.apache.avro.io.{DecoderFactory, EncoderFactory}

import scala.collection.JavaConversions._
import scala.io.Source


case class Tip(business_id: String, date: String, likes: Long, text: String, `type`: String, user_id: String)


object TipSerde {

  val avroSchema = Source.fromInputStream(getClass.getResourceAsStream("/schema/tip.avsc")).mkString
  val schema = new Schema.Parser().parse(avroSchema)

  val reader = new GenericDatumReader[GenericRecord](schema)
  val writer = new GenericDatumWriter[GenericRecord](schema)

  def serialize(tip: Tip): Array[Byte] = {

    val out = new ByteArrayOutputStream()
    val encoder = EncoderFactory.get.binaryEncoder(out, null)

    val avroRecord = new GenericData.Record(schema)
    avroRecord.put("business_id", tip.business_id)
    avroRecord.put("date", tip.date)
    avroRecord.put("likes", tip.likes)
    avroRecord.put("text", tip.text)
    avroRecord.put("type", tip.`type`)
    avroRecord.put("user_id", tip.user_id)

    writer.write(avroRecord, encoder)
    encoder.flush
    out.close
    out.toByteArray

  }

  def deserialize(bytes: Array[Byte]): Tip = {

    val decoder = DecoderFactory.get.binaryDecoder(bytes, null)
    val record = reader.read(null, decoder)

    Tip(
      record.get("business_id").toString,
      record.get("date").toString,
      record.get("likes").asInstanceOf[Long],
      record.get("text").toString,
      record.get("type").toString,
      record.get("user_id").toString
    )
  }
} 
开发者ID:AlvinCJin,项目名称:Niagara,代码行数:60,代码来源:Tip.scala


示例2: AvroUtils

//设置package包名称以及导入依赖的类
package pulse.kafka.avro

import java.io.{ByteArrayInputStream, ByteArrayOutputStream, DataInputStream, File}

import com.twitter.util.Future
import org.apache.avro.Schema
import org.apache.avro.file.DataFileWriter
import org.apache.avro.generic.{GenericDatumReader, GenericDatumWriter, GenericRecord}
import org.apache.avro.io.DecoderFactory
import pulse.kafka.extensions.managedByteArrayInputStream
import pulse.kafka.extensions.managedByteArrayOutputStream
import pulse.kafka.extensions.catsStdInstancesForFuture
import scala.concurrent.ExecutionContext.Implicits._

object AvroUtils {

  import pulse.common.syntax._


  def jsonToAvroBytes(json: String, schemaFile: File): Future[Array[Byte]] =
    use(new ByteArrayOutputStream()) { output =>
      for {
        s <- loadSchema(schemaFile)
        _ <- convertImpl(json, output, s)
      } yield output.toByteArray
    }

  private def convertImpl(json: String, output: ByteArrayOutputStream, schemaSpec: Schema): Future[GenericDatumReader[GenericRecord]] =

    use(new ByteArrayInputStream(json.getBytes)) { input =>
      for {
        w <- getWriter(output, schemaSpec)
        r <- getReader(input, schemaSpec, w)
      } yield r
    }

  def getReader(input: ByteArrayInputStream, schemaSpec: Schema, w: DataFileWriter[GenericRecord]) = Future.value {
    val reader = new GenericDatumReader[GenericRecord](schemaSpec)
    val datum = reader.read(null, getJsonDecoder(input, schemaSpec))
    w.append(datum)
    w.flush()
    reader
  }

  private def getJsonDecoder(input: ByteArrayInputStream, schema: Schema) =
    DecoderFactory.get.jsonDecoder(schema, new DataInputStream(input))

  private def getWriter(output: ByteArrayOutputStream, schemaSpec: Schema) = {
    Future.value {
      val writer = new DataFileWriter[GenericRecord](new GenericDatumWriter[GenericRecord]())
      writer.create(schemaSpec, output)
    }
  }

  private def loadSchema(schemaFile: File): Future[Schema] =
    Future {
      new Schema.Parser().parse(schemaFile)
    }
} 
开发者ID:gpulse,项目名称:kafka,代码行数:60,代码来源:AvroUtils.scala


示例3: AvroUtils

//设置package包名称以及导入依赖的类
package pulse.services.example.avro

import java.io.{ByteArrayInputStream, ByteArrayOutputStream, DataInputStream, File}

import org.apache.avro.Schema
import org.apache.avro.file.DataFileWriter
import org.apache.avro.generic.{GenericDatumReader, GenericDatumWriter, GenericRecord}
import org.apache.avro.io.DecoderFactory
import pulse.services.example.extensions._

object AvroUtils {

  def jsonToAvroBytes(json: String, schemaFile: File) = {
    use(new ByteArrayOutputStream())(output => {
      val schemaSpec = loadSchema(schemaFile)
      use(new ByteArrayInputStream(json.getBytes))(input => {
        val writer = new DataFileWriter[GenericRecord](new GenericDatumWriter[GenericRecord]())
        writer.create(schemaSpec, output)
        val reader = new GenericDatumReader[GenericRecord](schemaSpec)
        val datum = reader.read(null, getJsonDecoder(input, schemaSpec))
        writer.append(datum)
        writer.flush()
      })
      output.toByteArray
    })
  }

  def getJsonDecoder(input: ByteArrayInputStream, schema: Schema) =
    DecoderFactory.get.jsonDecoder(schema, new DataInputStream(input))

  def loadSchema(schemaFile: File) =
      new Schema.Parser().parse(schemaFile)
} 
开发者ID:gpulse,项目名称:services,代码行数:34,代码来源:AvroUtils.scala


示例4: props

//设置package包名称以及导入依赖的类
package com.kakao.cuesheet.convert

import java.util.Arrays.copyOfRange

import kafka.serializer.Decoder
import kafka.utils.VerifiableProperties
import org.apache.avro.Schema
import org.apache.avro.generic.{GenericDatumReader, GenericRecord}


sealed trait AvroDecoder[T] extends Decoder[T] {

  def props: VerifiableProperties

  protected val schema = new Schema.Parser().parse(props.getString(Avro.SCHEMA))
  protected val skipBytes = props.getInt(Avro.SKIP_BYTES, 0)

  protected val reader = new GenericDatumReader[GenericRecord](schema)
  protected val decoder = Avro.recordDecoder(reader)

  private def skip(bytes: Array[Byte], size: Int): Array[Byte] = {
    val length = bytes.length
    length - size match {
      case remaining if remaining > 0 => copyOfRange(bytes, size, length)
      case _ => new Array[Byte](0)
    }
  }

  def parse(bytes: Array[Byte]): GenericRecord = {
    val data = if (skipBytes == 0) bytes else skip(bytes, skipBytes)
    decoder(data)
  }
}

class AvroRecordDecoder(val props: VerifiableProperties) extends AvroDecoder[GenericRecord] {
  override def fromBytes(bytes: Array[Byte]): GenericRecord = parse(bytes)
}

class AvroMapDecoder(val props: VerifiableProperties) extends AvroDecoder[Map[String, Any]] {
  override def fromBytes(bytes: Array[Byte]): Map[String, Any] = Avro.toMap(parse(bytes))
}

class AvroJsonDecoder(val props: VerifiableProperties) extends AvroDecoder[String] {
  override def fromBytes(bytes: Array[Byte]): String = Avro.toJson(parse(bytes))
} 
开发者ID:kakao,项目名称:cuesheet,代码行数:46,代码来源:AvroDecoder.scala


示例5: JsonEncodingDecoding

//设置package包名称以及导入依赖的类
package com.giampaolotrapasso.avrosamples.test

import java.io.ByteArrayInputStream

import com.giampaolotrapasso.avrosamples.SchemaRegistry
import com.giampaolotrapasso.avrosamples.events.{Event, MovieChangedV1, MovieChangedV2, MovieChangedV3}
import com.giampaolotrapasso.avrosamples.serializers.{BinarySerializer, DataWithSchemaSerializer, JsonSerializer}
import com.sksamuel.avro4s.{AvroInputStream, FromRecord, RecordFormat, ToRecord}
import org.apache.avro.Schema
import org.apache.avro.file.SeekableByteArrayInput
import org.apache.avro.generic.{GenericDatumReader, GenericRecord}
import org.apache.avro.io.DecoderFactory

class JsonEncodingDecoding extends TestSpec {
  val title     = "Raiders of lost ark"
  val year      = 1986
  val director  = "Spielberg"
  val wonOscars = 1

  def deserialize[A <: Event: ToRecord: FromRecord: RecordFormat](oldSchema: Schema,
                                                                  newSchema: Schema,
                                                                  stream: ByteArrayInputStream) = {
    val gdr                   = new GenericDatumReader[GenericRecord](oldSchema, newSchema)
    val binDecoder            = DecoderFactory.get().jsonDecoder(newSchema, stream)
    val record: GenericRecord = gdr.read(null, binDecoder)
    val format                = RecordFormat[A]
    format.from(record)
  }

  "BinaryEncodingDecodingTest" should "deserialize an added field V1(title, year) to V2(title, year, director) " in {
    val obj                = MovieChangedV1(title, year)
    val bytes: Array[Byte] = JsonSerializer.serializeV1(obj)
    //println("*** Json Size" + bytes.length)

    val in = new SeekableByteArrayInput(bytes)

    val result = deserialize[MovieChangedV2](SchemaRegistry.movieChanged(1), SchemaRegistry.movieChanged(2), in)

    result should matchPattern {
      case MovieChangedV2(`title`, `year`, "unknown") ?
    }
  }

} 
开发者ID:giampaolotrapasso,项目名称:avro-samples,代码行数:45,代码来源:JsonEncodingDecoding.scala



注:本文中的org.apache.avro.generic.GenericDatumReader类示例整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。


鲜花

握手

雷人

路过

鸡蛋
该文章已有0人参与评论

请发表评论

全部评论

专题导读
上一篇:
Scala Switch类代码示例发布时间:2022-05-23
下一篇:
Scala OAuth1Info类代码示例发布时间:2022-05-23
热门推荐
热门话题
阅读排行榜

扫描微信二维码

查看手机版网站

随时了解更新最新资讯

139-2527-9053

在线客服(服务时间 9:00~18:00)

在线QQ客服
地址:深圳市南山区西丽大学城创智工业园
电邮:jeky_zhao#qq.com
移动电话:139-2527-9053

Powered by 互联科技 X3.4© 2001-2213 极客世界.|Sitemap