• 设为首页
  • 点击收藏
  • 手机版
    手机扫一扫访问
    迪恩网络手机版
  • 关注官方公众号
    微信扫一扫关注
    公众号

Scala DecoderFactory类代码示例

原作者: [db:作者] 来自: [db:来源] 收藏 邀请

本文整理汇总了Scala中org.apache.avro.io.DecoderFactory的典型用法代码示例。如果您正苦于以下问题:Scala DecoderFactory类的具体用法?Scala DecoderFactory怎么用?Scala DecoderFactory使用的例子?那么恭喜您, 这里精选的类代码示例或许可以为您提供帮助。



在下文中一共展示了DecoderFactory类的7个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Scala代码示例。

示例1: Tip

//设置package包名称以及导入依赖的类
package com.alvin.niagara.model

import java.io.ByteArrayOutputStream
import java.util

import org.apache.avro.Schema
import org.apache.avro.generic.{GenericData, GenericDatumReader, GenericDatumWriter, GenericRecord}
import org.apache.avro.io.{DecoderFactory, EncoderFactory}

import scala.collection.JavaConversions._
import scala.io.Source


case class Tip(business_id: String, date: String, likes: Long, text: String, `type`: String, user_id: String)


object TipSerde {

  val avroSchema = Source.fromInputStream(getClass.getResourceAsStream("/schema/tip.avsc")).mkString
  val schema = new Schema.Parser().parse(avroSchema)

  val reader = new GenericDatumReader[GenericRecord](schema)
  val writer = new GenericDatumWriter[GenericRecord](schema)

  def serialize(tip: Tip): Array[Byte] = {

    val out = new ByteArrayOutputStream()
    val encoder = EncoderFactory.get.binaryEncoder(out, null)

    val avroRecord = new GenericData.Record(schema)
    avroRecord.put("business_id", tip.business_id)
    avroRecord.put("date", tip.date)
    avroRecord.put("likes", tip.likes)
    avroRecord.put("text", tip.text)
    avroRecord.put("type", tip.`type`)
    avroRecord.put("user_id", tip.user_id)

    writer.write(avroRecord, encoder)
    encoder.flush
    out.close
    out.toByteArray

  }

  def deserialize(bytes: Array[Byte]): Tip = {

    val decoder = DecoderFactory.get.binaryDecoder(bytes, null)
    val record = reader.read(null, decoder)

    Tip(
      record.get("business_id").toString,
      record.get("date").toString,
      record.get("likes").asInstanceOf[Long],
      record.get("text").toString,
      record.get("type").toString,
      record.get("user_id").toString
    )
  }
} 
开发者ID:AlvinCJin,项目名称:Niagara,代码行数:60,代码来源:Tip.scala


示例2: Avro

//设置package包名称以及导入依赖的类
package com.lukecycon.avro

import java.io.ByteArrayOutputStream
import org.apache.avro.io.EncoderFactory
import org.apache.avro.file.BZip2Codec
import java.nio.ByteBuffer
import org.apache.avro.io.DecoderFactory

object Avro {
  def schemaFor[T: AvroFormat] = implicitly[AvroFormat[T]].schema

  def write[T: AvroFormat](thing: T, compress: Boolean = false): Array[Byte] = {
    val out = new ByteArrayOutputStream
    val encoder = EncoderFactory.get.binaryEncoder(out, null)

    implicitly[AvroFormat[T]].writeValue(thing, encoder)

    encoder.flush

    if (compress) {
      new BZip2Codec().compress(ByteBuffer.wrap(out.toByteArray)).array
    } else {
      out.toByteArray
    }
  }

  def writeHex[T: AvroFormat](thing: T): String =
    byteArrayToHexString(write(thing))

  def read[T: AvroFormat](bytes: Array[Byte],
                          compressed: Boolean = false): Either[String, T] = {
    val byts = if (compressed) {
      new BZip2Codec().decompress(ByteBuffer.wrap(bytes)).array
    } else {
      bytes
    }

    val decoder = DecoderFactory.get.binaryDecoder(byts, null)

    implicitly[AvroFormat[T]].decodeValue(Nil, decoder)
  }

  def readHex[T: AvroFormat](hex: String): Either[String, T] =
    read(
        hex
          .replace(" ", "")
          .grouped(2)
          .map(Integer.parseInt(_, 16).toByte)
          .toArray)

  private def byteArrayToHexString(bb: Array[Byte]): String =
    bb.map("%02X" format _).mkString.grouped(2).mkString(" ")
} 
开发者ID:themattchan,项目名称:Skaro,代码行数:54,代码来源:Avro.scala


示例3: AvroUtils

//设置package包名称以及导入依赖的类
package pulse.kafka.avro

import java.io.{ByteArrayInputStream, ByteArrayOutputStream, DataInputStream, File}

import com.twitter.util.Future
import org.apache.avro.Schema
import org.apache.avro.file.DataFileWriter
import org.apache.avro.generic.{GenericDatumReader, GenericDatumWriter, GenericRecord}
import org.apache.avro.io.DecoderFactory
import pulse.kafka.extensions.managedByteArrayInputStream
import pulse.kafka.extensions.managedByteArrayOutputStream
import pulse.kafka.extensions.catsStdInstancesForFuture
import scala.concurrent.ExecutionContext.Implicits._

object AvroUtils {

  import pulse.common.syntax._


  def jsonToAvroBytes(json: String, schemaFile: File): Future[Array[Byte]] =
    use(new ByteArrayOutputStream()) { output =>
      for {
        s <- loadSchema(schemaFile)
        _ <- convertImpl(json, output, s)
      } yield output.toByteArray
    }

  private def convertImpl(json: String, output: ByteArrayOutputStream, schemaSpec: Schema): Future[GenericDatumReader[GenericRecord]] =

    use(new ByteArrayInputStream(json.getBytes)) { input =>
      for {
        w <- getWriter(output, schemaSpec)
        r <- getReader(input, schemaSpec, w)
      } yield r
    }

  def getReader(input: ByteArrayInputStream, schemaSpec: Schema, w: DataFileWriter[GenericRecord]) = Future.value {
    val reader = new GenericDatumReader[GenericRecord](schemaSpec)
    val datum = reader.read(null, getJsonDecoder(input, schemaSpec))
    w.append(datum)
    w.flush()
    reader
  }

  private def getJsonDecoder(input: ByteArrayInputStream, schema: Schema) =
    DecoderFactory.get.jsonDecoder(schema, new DataInputStream(input))

  private def getWriter(output: ByteArrayOutputStream, schemaSpec: Schema) = {
    Future.value {
      val writer = new DataFileWriter[GenericRecord](new GenericDatumWriter[GenericRecord]())
      writer.create(schemaSpec, output)
    }
  }

  private def loadSchema(schemaFile: File): Future[Schema] =
    Future {
      new Schema.Parser().parse(schemaFile)
    }
} 
开发者ID:gpulse,项目名称:kafka,代码行数:60,代码来源:AvroUtils.scala


示例4: AvroUtils

//设置package包名称以及导入依赖的类
package pulse.services.example.avro

import java.io.{ByteArrayInputStream, ByteArrayOutputStream, DataInputStream, File}

import org.apache.avro.Schema
import org.apache.avro.file.DataFileWriter
import org.apache.avro.generic.{GenericDatumReader, GenericDatumWriter, GenericRecord}
import org.apache.avro.io.DecoderFactory
import pulse.services.example.extensions._

object AvroUtils {

  def jsonToAvroBytes(json: String, schemaFile: File) = {
    use(new ByteArrayOutputStream())(output => {
      val schemaSpec = loadSchema(schemaFile)
      use(new ByteArrayInputStream(json.getBytes))(input => {
        val writer = new DataFileWriter[GenericRecord](new GenericDatumWriter[GenericRecord]())
        writer.create(schemaSpec, output)
        val reader = new GenericDatumReader[GenericRecord](schemaSpec)
        val datum = reader.read(null, getJsonDecoder(input, schemaSpec))
        writer.append(datum)
        writer.flush()
      })
      output.toByteArray
    })
  }

  def getJsonDecoder(input: ByteArrayInputStream, schema: Schema) =
    DecoderFactory.get.jsonDecoder(schema, new DataInputStream(input))

  def loadSchema(schemaFile: File) =
      new Schema.Parser().parse(schemaFile)
} 
开发者ID:gpulse,项目名称:services,代码行数:34,代码来源:AvroUtils.scala


示例5: AvroFlumeEventDecoder

//设置package包名称以及导入依赖的类
package contrib.kafka.serializer

import kafka.serializer.Decoder
import kafka.utils.VerifiableProperties

import org.apache.avro.io.BinaryDecoder
import org.apache.avro.io.DecoderFactory
import org.apache.avro.specific.SpecificDatumReader

import org.apache.flume.Event
import org.apache.flume.event.EventBuilder
import org.apache.flume.source.avro.AvroFlumeEvent

import java.io.ByteArrayInputStream

class AvroFlumeEventDecoder(props: VerifiableProperties = null)
  extends Decoder[Event] {

  private val reader: SpecificDatumReader[AvroFlumeEvent] =
    new SpecificDatumReader[AvroFlumeEvent](classOf[AvroFlumeEvent])
  private var decoder: BinaryDecoder = null.asInstanceOf[BinaryDecoder]

  override def fromBytes(bytes: Array[Byte]): Event = {
    val inputStream = new ByteArrayInputStream(bytes)
    decoder = DecoderFactory.get.directBinaryDecoder(inputStream, decoder)
    val avroEvent: AvroFlumeEvent = reader.read(null, decoder)
    EventBuilder.withBody(
      avroEvent.getBody.array,
      toStringJavaMap(avroEvent.getHeaders))
  }

  def toStringJavaMap(
    charSeqMap: JMap[CharSequence, CharSequence]): JMap[String, String] = {

    import scala.collection.JavaConversions._
    for ((k: CharSequence, v: CharSequence) <- charSeqMap)
      yield (k.toString, v.toString)
  }
} 
开发者ID:saikocat,项目名称:spark-sql-kafka-avroflumeevent,代码行数:40,代码来源:AvroFlumeEventDecoder.scala


示例6: mkSerializer

//设置package包名称以及导入依赖的类
package com.cj.serialization

import java.io.ByteArrayOutputStream

import org.apache.avro.Schema
import org.apache.avro.io.{BinaryDecoder, BinaryEncoder, DatumWriter, DecoderFactory, EncoderFactory}
import org.apache.avro.specific.{SpecificDatumReader, SpecificDatumWriter, SpecificRecord}

package object avro {
  type RecordSerializer[-T] = T => Array[Byte]
  type RecordDeserializer[+T] = Array[Byte] => T

  // make a RecordSerializer for class T, passing through intermediate avro-generated class U
  def mkSerializer[T, U <: SpecificRecord](f: (T => U)): RecordSerializer[T] = {
    val avroSerializer = mkAvroSerializer[U]()
    record => avroSerializer(f(record))
  }

  // make a RecordDeserializer for class T, passing through intermediate avro-generated class U
  def mkDeserializer[T, U >: Null <: SpecificRecord](f: U => T, schema: Schema): RecordDeserializer[Option[T]] = {
    val avroDeserializer = mkAvroDeserializer(schema)
    bytes => {
      val avroRec: U = avroDeserializer(bytes)
      if (avroRec == null) None
      else Some(f(avroRec))
    }
  }

  // make a RecordSerializer for avro-generated class T
  def mkAvroSerializer[T <: SpecificRecord](): RecordSerializer[T] = {
    val output = new ByteArrayOutputStream()
    val writer: DatumWriter[T] = new SpecificDatumWriter[T]()
    var encoder: BinaryEncoder = EncoderFactory.get().binaryEncoder(output, null)
    record => {
      output.reset()
      encoder = EncoderFactory.get().binaryEncoder(output, encoder)
      writer.setSchema(record.getSchema)
      writer.write(record, encoder)
      encoder.flush()
      output.close()
      output.toByteArray
    }
  }

  // make a RecordDeserializer for avro-generated class T
  def mkAvroDeserializer[T >: Null <: SpecificRecord](schema: Schema): RecordDeserializer[T] = {
    val reader: SpecificDatumReader[T] = new SpecificDatumReader[T](schema)
    var decoder: BinaryDecoder = DecoderFactory.get().binaryDecoder(Array[Byte](), null)
    bytes => {
      decoder = DecoderFactory.get().binaryDecoder(bytes, decoder)
      reader.read(null, decoder)
    }
  }
} 
开发者ID:cjdev,项目名称:avro-serialization,代码行数:55,代码来源:package.scala


示例7: JsonEncodingDecoding

//设置package包名称以及导入依赖的类
package com.giampaolotrapasso.avrosamples.test

import java.io.ByteArrayInputStream

import com.giampaolotrapasso.avrosamples.SchemaRegistry
import com.giampaolotrapasso.avrosamples.events.{Event, MovieChangedV1, MovieChangedV2, MovieChangedV3}
import com.giampaolotrapasso.avrosamples.serializers.{BinarySerializer, DataWithSchemaSerializer, JsonSerializer}
import com.sksamuel.avro4s.{AvroInputStream, FromRecord, RecordFormat, ToRecord}
import org.apache.avro.Schema
import org.apache.avro.file.SeekableByteArrayInput
import org.apache.avro.generic.{GenericDatumReader, GenericRecord}
import org.apache.avro.io.DecoderFactory

class JsonEncodingDecoding extends TestSpec {
  val title     = "Raiders of lost ark"
  val year      = 1986
  val director  = "Spielberg"
  val wonOscars = 1

  def deserialize[A <: Event: ToRecord: FromRecord: RecordFormat](oldSchema: Schema,
                                                                  newSchema: Schema,
                                                                  stream: ByteArrayInputStream) = {
    val gdr                   = new GenericDatumReader[GenericRecord](oldSchema, newSchema)
    val binDecoder            = DecoderFactory.get().jsonDecoder(newSchema, stream)
    val record: GenericRecord = gdr.read(null, binDecoder)
    val format                = RecordFormat[A]
    format.from(record)
  }

  "BinaryEncodingDecodingTest" should "deserialize an added field V1(title, year) to V2(title, year, director) " in {
    val obj                = MovieChangedV1(title, year)
    val bytes: Array[Byte] = JsonSerializer.serializeV1(obj)
    //println("*** Json Size" + bytes.length)

    val in = new SeekableByteArrayInput(bytes)

    val result = deserialize[MovieChangedV2](SchemaRegistry.movieChanged(1), SchemaRegistry.movieChanged(2), in)

    result should matchPattern {
      case MovieChangedV2(`title`, `year`, "unknown") ?
    }
  }

} 
开发者ID:giampaolotrapasso,项目名称:avro-samples,代码行数:45,代码来源:JsonEncodingDecoding.scala



注:本文中的org.apache.avro.io.DecoderFactory类示例整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。


鲜花

握手

雷人

路过

鸡蛋
该文章已有0人参与评论

请发表评论

全部评论

专题导读
上一篇:
Scala Credentials类代码示例发布时间:2022-05-23
下一篇:
Scala Universe类代码示例发布时间:2022-05-23
热门推荐
热门话题
阅读排行榜

扫描微信二维码

查看手机版网站

随时了解更新最新资讯

139-2527-9053

在线客服(服务时间 9:00~18:00)

在线QQ客服
地址:深圳市南山区西丽大学城创智工业园
电邮:jeky_zhao#qq.com
移动电话:139-2527-9053

Powered by 互联科技 X3.4© 2001-2213 极客世界.|Sitemap