Scala GenericRecord Class Code Examples


This article collects typical usage examples of the org.apache.avro.generic.GenericRecord class in Scala. If you are wondering what GenericRecord does, how to use it, or are looking for working examples, the curated snippets below should help.



Twenty code examples of the GenericRecord class are shown below, sorted by popularity by default. Upvote the examples you find useful; your feedback helps surface better Scala samples.

Example 1: Tip

// Package name and imported dependency classes
package com.alvin.niagara.model

import java.io.ByteArrayOutputStream
import java.util

import org.apache.avro.Schema
import org.apache.avro.generic.{GenericData, GenericDatumReader, GenericDatumWriter, GenericRecord}
import org.apache.avro.io.{DecoderFactory, EncoderFactory}

import scala.collection.JavaConversions._
import scala.io.Source


case class Tip(business_id: String, date: String, likes: Long, text: String, `type`: String, user_id: String)


object TipSerde {

  val avroSchema = Source.fromInputStream(getClass.getResourceAsStream("/schema/tip.avsc")).mkString
  val schema = new Schema.Parser().parse(avroSchema)

  val reader = new GenericDatumReader[GenericRecord](schema)
  val writer = new GenericDatumWriter[GenericRecord](schema)

  def serialize(tip: Tip): Array[Byte] = {

    val out = new ByteArrayOutputStream()
    val encoder = EncoderFactory.get.binaryEncoder(out, null)

    val avroRecord = new GenericData.Record(schema)
    avroRecord.put("business_id", tip.business_id)
    avroRecord.put("date", tip.date)
    avroRecord.put("likes", tip.likes)
    avroRecord.put("text", tip.text)
    avroRecord.put("type", tip.`type`)
    avroRecord.put("user_id", tip.user_id)

    writer.write(avroRecord, encoder)
    encoder.flush()
    out.close()
    out.toByteArray

  }

  def deserialize(bytes: Array[Byte]): Tip = {

    val decoder = DecoderFactory.get.binaryDecoder(bytes, null)
    val record = reader.read(null, decoder)

    Tip(
      record.get("business_id").toString,
      record.get("date").toString,
      record.get("likes").asInstanceOf[Long],
      record.get("text").toString,
      record.get("type").toString,
      record.get("user_id").toString
    )
  }
} 
Developer: AlvinCJin | Project: Niagara | Lines: 60 | Source: Tip.scala
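
A minimal round-trip sketch of the serde above (assuming /schema/tip.avsc is on the classpath and matches the Tip case class):

object TipSerdeDemo extends App {
  val tip = Tip("b-001", "2016-07-01", 3L, "Great coffee", "tip", "u-42")

  val bytes = TipSerde.serialize(tip)            // Avro binary encoding
  val roundTripped = TipSerde.deserialize(bytes)

  assert(roundTripped == tip)                    // case class equality checks the round trip
}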


Example 2: AvroNodeSerializer

// Package name and imported dependency classes
package eventgen.launcher.core.avro

import java.io.ByteArrayOutputStream

import eventgen.launcher.core.NodeSerializer
import org.apache.avro.Schema
import org.apache.avro.generic.{GenericDatumWriter, GenericRecord}
import org.apache.avro.io.EncoderFactory


class AvroNodeSerializer extends NodeSerializer[Schema, AvroNode[_], ByteArrayOutputStream] {
  override def serialize(metadata: Schema, node: AvroNode[_]): ByteArrayOutputStream = {
    val record = node.asInstanceOf[AvroRecord]
    val writer = new GenericDatumWriter[GenericRecord]
    writer.setSchema(metadata)
    val outputStream = new ByteArrayOutputStream
    val encoder = EncoderFactory.get().jsonEncoder(metadata, outputStream, true)
    writer.write(record.value, encoder)
    encoder.flush()
    outputStream
  }
} 
Developer: gpulse | Project: eventgenerator | Lines: 23 | Source: AvroNodeSerializer.scala


Example 3: AvroParquetWriterFn

// Package name and imported dependency classes
package io.eels.component.parquet.avro

import com.sksamuel.exts.Logging
import io.eels.component.parquet.ParquetWriterConfig
import org.apache.avro.Schema
import org.apache.avro.generic.GenericRecord
import org.apache.hadoop.fs.Path
import org.apache.parquet.avro.AvroParquetWriter
import org.apache.parquet.hadoop.{ParquetFileWriter, ParquetWriter}


object AvroParquetWriterFn extends Logging {
  def apply(path: Path, avroSchema: Schema): ParquetWriter[GenericRecord] = {
    val config = ParquetWriterConfig()
    AvroParquetWriter.builder[GenericRecord](path)
      .withSchema(avroSchema)
      .withCompressionCodec(config.compressionCodec)
      .withPageSize(config.pageSize)
      .withRowGroupSize(config.blockSize)
      .withDictionaryEncoding(config.enableDictionary)
      .withWriteMode(ParquetFileWriter.Mode.CREATE)
      .withValidation(config.validating)
      .build()
  }
} 
Developer: 51zero | Project: eel-sdk | Lines: 26 | Source: AvroParquetWriterFn.scala
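
A hedged usage sketch for the factory above; the schema literal and output path are illustrative, not part of eel-sdk:

import org.apache.avro.Schema
import org.apache.avro.generic.{GenericData, GenericRecord}
import org.apache.hadoop.fs.Path

object AvroParquetWriterFnDemo extends App {
  val schema = new Schema.Parser().parse(
    """{"type":"record","name":"User","fields":[{"name":"name","type":"string"}]}""")

  val writer = AvroParquetWriterFn(new Path("/tmp/users.parquet"), schema)
  try {
    Seq("alice", "bob").foreach { name =>
      val record: GenericRecord = new GenericData.Record(schema)
      record.put("name", name)
      writer.write(record)                       // ParquetWriter[GenericRecord].write
    }
  } finally writer.close()
}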


Example 4: JsonToParquetPipelineFactory

// Package name and imported dependency classes
package yamrcraft.etlite.pipeline

import org.apache.avro.generic.GenericRecord
import yamrcraft.etlite.PipelineSettings
import yamrcraft.etlite.transformers.{JsonToAvroTransformer, Message}
import yamrcraft.etlite.writers.{AvroToParquetWriter, TimePartitioningWriter}

class JsonToParquetPipelineFactory extends PipelineFactory[Message[GenericRecord]] {

  def createPipeline(settings: PipelineSettings, jobId: Long, partitionId: Int): Pipeline[Message[GenericRecord]] =
    new Pipeline(
      new JsonToAvroTransformer(settings.transformerConfig),
      new TimePartitioningWriter(
        settings.writerConfig,
        jobId,
        partitionId,
        (tempFile, outputFile) => new AvroToParquetWriter(tempFile, outputFile))
    )

} 
Developer: yamrcraft | Project: etl-light | Lines: 21 | Source: JsonToParquetPipelineFactory.scala


Example 5: AvroConverter

// Package name and imported dependency classes
import org.apache.avro.generic.GenericRecord

// Avro -> Case class conversion
object AvroConverter {
  def shipment(record: GenericRecord) = {
    Schemas.Shipments_v1(
      record.get("itemID").asInstanceOf[Long],
      record.get("storeCode").toString,
      record.get("count").asInstanceOf[Int])
  }

  def sale(record: GenericRecord) = {
    Schemas.Sales_v2(
      record.get("itemID").asInstanceOf[Long],
      record.get("storeCode").toString,
      record.get("count").asInstanceOf[Int],
      record.get("customerID").toString)
  }

  def getShipment(message: (Object, Object)) = {
    val (k, v) = message
    //val name = k.asInstanceOf[IndexedRecord].getSchema.getName
    //if (name == "Shipments_v1")
    val value = v.asInstanceOf[GenericRecord]
    shipment(value)
  }

  def getSale(message: (Object, Object)) = {
    val (k, v) = message
    val value = v.asInstanceOf[GenericRecord]
    sale(value)
  }

} 
Developer: Antwnis | Project: kafka-streaming-examples | Lines: 35 | Source: AvroConverter.scala


Example 6: AvroUtils

// Package name and imported dependency classes
package pulse.kafka.avro

import java.io.{ByteArrayInputStream, ByteArrayOutputStream, DataInputStream, File}

import com.twitter.util.Future
import org.apache.avro.Schema
import org.apache.avro.file.DataFileWriter
import org.apache.avro.generic.{GenericDatumReader, GenericDatumWriter, GenericRecord}
import org.apache.avro.io.DecoderFactory
import pulse.kafka.extensions.managedByteArrayInputStream
import pulse.kafka.extensions.managedByteArrayOutputStream
import pulse.kafka.extensions.catsStdInstancesForFuture
import scala.concurrent.ExecutionContext.Implicits._

object AvroUtils {

  import pulse.common.syntax._


  def jsonToAvroBytes(json: String, schemaFile: File): Future[Array[Byte]] =
    use(new ByteArrayOutputStream()) { output =>
      for {
        s <- loadSchema(schemaFile)
        _ <- convertImpl(json, output, s)
      } yield output.toByteArray
    }

  private def convertImpl(json: String, output: ByteArrayOutputStream, schemaSpec: Schema): Future[GenericDatumReader[GenericRecord]] =
    use(new ByteArrayInputStream(json.getBytes)) { input =>
      for {
        w <- getWriter(output, schemaSpec)
        r <- getReader(input, schemaSpec, w)
      } yield r
    }

  def getReader(input: ByteArrayInputStream, schemaSpec: Schema, w: DataFileWriter[GenericRecord]) = Future.value {
    val reader = new GenericDatumReader[GenericRecord](schemaSpec)
    val datum = reader.read(null, getJsonDecoder(input, schemaSpec))
    w.append(datum)
    w.flush()
    reader
  }

  private def getJsonDecoder(input: ByteArrayInputStream, schema: Schema) =
    DecoderFactory.get.jsonDecoder(schema, new DataInputStream(input))

  private def getWriter(output: ByteArrayOutputStream, schemaSpec: Schema) = {
    Future.value {
      val writer = new DataFileWriter[GenericRecord](new GenericDatumWriter[GenericRecord]())
      writer.create(schemaSpec, output)
    }
  }

  private def loadSchema(schemaFile: File): Future[Schema] =
    Future {
      new Schema.Parser().parse(schemaFile)
    }
} 
Developer: gpulse | Project: kafka | Lines: 60 | Source: AvroUtils.scala


Example 7: KProducer

// Package name and imported dependency classes
package org.parsec

import java.util.Properties
import java.util.concurrent.Future

import com.sksamuel.avro4s.{FromRecord, RecordFormat, ToRecord}
import io.confluent.kafka.serializers.KafkaAvroSerializer
import org.apache.avro.generic.GenericRecord
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord, RecordMetadata}

// Convenient Kafka producer using avro4s
class KProducer[K <: Product, V <: Product] {

  val kafkaProps = new Properties()
  kafkaProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
  kafkaProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, classOf[KafkaAvroSerializer].getCanonicalName)
  kafkaProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, classOf[KafkaAvroSerializer].getCanonicalName)
  kafkaProps.put("schema.registry.url", "http://localhost:8081")
  private lazy val producer  = new KafkaProducer[GenericRecord, GenericRecord](kafkaProps)

  def produce(topic: String, key: K, value: V, partition: Int = 0)(implicit toRecordKey: ToRecord[K], fromRecordKey: FromRecord[K], toRecord: ToRecord[V], fromRecord: FromRecord[V]): Future[RecordMetadata] = {
    val keyRec = RecordFormat[K].to(key)
    val valueRec = RecordFormat[V].to(value)
    val data: ProducerRecord[GenericRecord, GenericRecord] = new ProducerRecord(topic, partition, keyRec, valueRec)
    producer.send(data)
  }

} 
Developer: cryptocurrencyindia | Project: Parsec | Lines: 29 | Source: KProducer.scala
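
A hedged sketch of producing with KProducer; the key/value case classes are hypothetical, and a broker plus schema registry must be running at the configured addresses. avro4s derives the required ToRecord/FromRecord instances for case classes at the call site.

case class UserKey(id: String)
case class UserEvent(name: String, age: Int)

object KProducerDemo extends App {
  val producer = new KProducer[UserKey, UserEvent]
  val metadata = producer.produce("users", UserKey("u-1"), UserEvent("Alice", 30))
  println(metadata.get())                        // blocks until the broker acknowledges
}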


Example 8: KafkaSparkStreamingRegistrator

// Package name and imported dependency classes
package com.miguno.kafkastorm.spark.serialization

import com.esotericsoftware.kryo.Kryo
import com.miguno.avro.Tweet
import com.twitter.chill.avro.AvroSerializer
import org.apache.avro.generic.GenericRecord
import org.apache.spark.serializer.KryoRegistrator


class KafkaSparkStreamingRegistrator extends KryoRegistrator {

  override def registerClasses(kryo: Kryo) {
    // Registers a serializer for any generic Avro records.  The kafka-storm-starter project does not yet include
    // examples that work on generic Avro records, but we keep this registration for the convenience of our readers.
    kryo.register(classOf[GenericRecord], AvroSerializer.GenericRecordSerializer[GenericRecord]())
    // Registers a serializer specifically for the, well, specific Avro record `Tweet`
    kryo.register(classOf[Tweet], AvroSerializer.SpecificRecordSerializer[Tweet])
    ()
  }

} 
Developer: JohnReedLOL | Project: KafkaStormSpark | Lines: 22 | Source: KafkaSparkStreamingRegistrator.scala
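
Wiring the registrator into Spark goes through the standard configuration keys; a minimal sketch:

import org.apache.spark.SparkConf

val sparkConf = new SparkConf()
  .setAppName("kafka-spark-streaming")
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  .set("spark.kryo.registrator", classOf[KafkaSparkStreamingRegistrator].getName)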


Example 9: AvroType

// Package name and imported dependency classes
package shapeless.datatype.avro

import org.apache.avro.Schema
import org.apache.avro.generic.GenericRecord
import shapeless._

import scala.reflect.runtime.universe._

class AvroType[A] extends Serializable {
  def fromGenericRecord[L <: HList](m: GenericRecord)
                                   (implicit gen: LabelledGeneric.Aux[A, L], fromL: FromAvroRecord[L])
  : Option[A] = fromL(Right(m)).map(gen.from)
  def toGenericRecord[L <: HList](a: A)
                                 (implicit gen: LabelledGeneric.Aux[A, L], toL: ToAvroRecord[L], tt: TypeTag[A])
  : GenericRecord = toL(gen.to(a)).left.get.build(AvroSchema[A])
}

object AvroType {
  def apply[A: TypeTag]: AvroType[A] = new AvroType[A]

  def at[V: TypeTag](schemaType: Schema.Type)(fromFn: Any => V, toFn: V => Any): BaseAvroMappableType[V] = {
    AvroSchema.register(implicitly[TypeTag[V]].tpe, schemaType)
    new BaseAvroMappableType[V] {
      override def from(value: Any): V = fromFn(value)
      override def to(value: V): Any = toFn(value)
    }
  }
} 
Developer: nevillelyh | Project: shapeless-datatype | Lines: 29 | Source: AvroType.scala
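
A usage sketch with a hypothetical case class; shapeless-datatype derives the record mappings via LabelledGeneric:

case class User(name: String, age: Int)

val userType = AvroType[User]
val record: GenericRecord = userType.toGenericRecord(User("Alice", 30))
val back: Option[User] = userType.fromGenericRecord(record)  // Some(User("Alice", 30))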


Example 10: AvroUtils

// Package name and imported dependency classes
package pulse.services.example.avro

import java.io.{ByteArrayInputStream, ByteArrayOutputStream, DataInputStream, File}

import org.apache.avro.Schema
import org.apache.avro.file.DataFileWriter
import org.apache.avro.generic.{GenericDatumReader, GenericDatumWriter, GenericRecord}
import org.apache.avro.io.DecoderFactory
import pulse.services.example.extensions._

object AvroUtils {

  def jsonToAvroBytes(json: String, schemaFile: File) = {
    use(new ByteArrayOutputStream())(output => {
      val schemaSpec = loadSchema(schemaFile)
      use(new ByteArrayInputStream(json.getBytes))(input => {
        val writer = new DataFileWriter[GenericRecord](new GenericDatumWriter[GenericRecord]())
        writer.create(schemaSpec, output)
        val reader = new GenericDatumReader[GenericRecord](schemaSpec)
        val datum = reader.read(null, getJsonDecoder(input, schemaSpec))
        writer.append(datum)
        writer.flush()
      })
      output.toByteArray
    })
  }

  def getJsonDecoder(input: ByteArrayInputStream, schema: Schema) =
    DecoderFactory.get.jsonDecoder(schema, new DataInputStream(input))

  def loadSchema(schemaFile: File) =
      new Schema.Parser().parse(schemaFile)
} 
Developer: gpulse | Project: services | Lines: 34 | Source: AvroUtils.scala
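
A hedged one-liner showing the intended call, assuming the project's `use` loan helper returns the block's result and that an Avro schema file user.avsc exists on disk:

import java.io.File

val avroBytes: Array[Byte] =
  AvroUtils.jsonToAvroBytes("""{"name": "Alice"}""", new File("user.avsc"))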


Example 11: KafkaSparkStreamingRegistrator

// Package name and imported dependency classes
package com.miguno.kafkastorm.spark.serialization

import com.esotericsoftware.kryo.Kryo
import com.miguno.avro.Tweet
import com.twitter.chill.avro.AvroSerializer
import org.apache.avro.generic.GenericRecord
import org.apache.spark.serializer.KryoRegistrator
import scala.trace.{Pos, implicitlyFormatable}


class KafkaSparkStreamingRegistrator extends KryoRegistrator {

  override def registerClasses(kryo: Kryo) {
    // Registers a serializer for any generic Avro records.  The kafka-storm-starter project does not yet include
    // examples that work on generic Avro records, but we keep this registration for the convenience of our readers.
    kryo.register(classOf[GenericRecord], AvroSerializer.GenericRecordSerializer[GenericRecord]())
    // Registers a serializer specifically for the, well, specific Avro record `Tweet`
    kryo.register(classOf[Tweet], AvroSerializer.SpecificRecordSerializer[Tweet])
    ()
  }

} 
Developer: JohnReedLOL | Project: full-stack-big-data | Lines: 23 | Source: KafkaSparkStreamingRegistrator.scala


Example 12: AvroFileWriter

// Package name and imported dependency classes
package com.landoop.avro

import java.io.{BufferedOutputStream, File, FileOutputStream}

import com.landoop.avro.codec.CodecFactory
import org.apache.avro.Schema
import org.apache.avro.file.DataFileWriter
import org.apache.avro.generic.GenericRecord

object AvroFileWriter {
  def fastWrite(file: File,
                count: Int,
                parallelization: Int,
                schema: Schema,
                records: IndexedSeq[GenericRecord]) = {
    val out = new BufferedOutputStream(new FileOutputStream(file), 4 * 1048576)

    import org.apache.avro.generic.GenericDatumWriter
    val datumWriter = new GenericDatumWriter[GenericRecord](schema)
    val builder = FastDataFileWriterBuilder(datumWriter, out, schema)
      .withCodec(CodecFactory.snappyCodec())
      .withFlushOnEveryBlock(false)
      .withParallelization(parallelization)

    builder.encoderFactory.configureBufferSize(4 * 1048576)
    builder.encoderFactory.configureBlockSize(4 * 1048576)

    val fileWriter = builder.build()
    fileWriter.write(records)
    fileWriter.close()
  }

  def write(file: File,
            count: Int,
            schema: Schema,
            records: Seq[GenericRecord]) = {
    val out = new BufferedOutputStream(new FileOutputStream(file), 4 * 1048576)
    
    import org.apache.avro.generic.GenericDatumWriter
    val datumWriter = new GenericDatumWriter[GenericRecord](schema)
    val writer = new DataFileWriter(datumWriter)
      .setCodec(org.apache.avro.file.CodecFactory.snappyCodec())
      .create(schema, out)

    writer.setFlushOnEveryBlock(false)

    records.foreach(writer.append)
    writer.close()
  }
} 
Developer: Landoop | Project: fast-avro-write | Lines: 51 | Source: AvroFileWriter.scala
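
A usage sketch for the plain write path; the schema and output file are illustrative, and snappy-java must be on the classpath for the codec:

import java.io.File
import org.apache.avro.Schema
import org.apache.avro.generic.{GenericData, GenericRecord}

object AvroFileWriterDemo extends App {
  val schema = new Schema.Parser().parse(
    """{"type":"record","name":"Point","fields":[
      |  {"name":"x","type":"long"},{"name":"y","type":"long"}]}""".stripMargin)

  val records: IndexedSeq[GenericRecord] = (1 to 1000).map { i =>
    val r = new GenericData.Record(schema)
    r.put("x", i.toLong)
    r.put("y", (i * 2).toLong)
    r
  }

  AvroFileWriter.write(new File("/tmp/points.avro"), records.size, schema, records)
}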


Example 13: props

// Package name and imported dependency classes
package com.kakao.cuesheet.convert

import java.util.Arrays.copyOfRange

import kafka.serializer.Decoder
import kafka.utils.VerifiableProperties
import org.apache.avro.Schema
import org.apache.avro.generic.{GenericDatumReader, GenericRecord}


sealed trait AvroDecoder[T] extends Decoder[T] {

  def props: VerifiableProperties

  protected val schema = new Schema.Parser().parse(props.getString(Avro.SCHEMA))
  protected val skipBytes = props.getInt(Avro.SKIP_BYTES, 0)

  protected val reader = new GenericDatumReader[GenericRecord](schema)
  protected val decoder = Avro.recordDecoder(reader)

  private def skip(bytes: Array[Byte], size: Int): Array[Byte] = {
    val length = bytes.length
    length - size match {
      case remaining if remaining > 0 => copyOfRange(bytes, size, length)
      case _ => new Array[Byte](0)
    }
  }

  def parse(bytes: Array[Byte]): GenericRecord = {
    val data = if (skipBytes == 0) bytes else skip(bytes, skipBytes)
    decoder(data)
  }
}

class AvroRecordDecoder(val props: VerifiableProperties) extends AvroDecoder[GenericRecord] {
  override def fromBytes(bytes: Array[Byte]): GenericRecord = parse(bytes)
}

class AvroMapDecoder(val props: VerifiableProperties) extends AvroDecoder[Map[String, Any]] {
  override def fromBytes(bytes: Array[Byte]): Map[String, Any] = Avro.toMap(parse(bytes))
}

class AvroJsonDecoder(val props: VerifiableProperties) extends AvroDecoder[String] {
  override def fromBytes(bytes: Array[Byte]): String = Avro.toJson(parse(bytes))
} 
Developer: kakao | Project: cuesheet | Lines: 46 | Source: AvroDecoder.scala
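
A sketch of constructing a decoder directly; this assumes Avro.SCHEMA is the project's string property key for the schema (the constant itself is defined elsewhere in cuesheet), and the schema literal is illustrative:

import java.util.Properties
import kafka.utils.VerifiableProperties

val schemaJson = """{"type":"record","name":"T","fields":[{"name":"v","type":"string"}]}"""

val props = new Properties()
props.put(Avro.SCHEMA, schemaJson)

val recordDecoder = new AvroRecordDecoder(new VerifiableProperties(props))
// recordDecoder.fromBytes(bytes) then yields a GenericRecord per Kafka message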


Example 14: AvroUtils

// Package name and imported dependency classes
package com.datawizards.sparklocal.impl.scala.dataset.io

import org.apache.avro.generic.GenericRecord
import org.apache.avro.{Schema, SchemaBuilder}

import scala.collection.JavaConversions._

object AvroUtils {

  def mapSchema(originalSchema: Schema, fieldNameMapping: Map[String, String]): Schema = {
    var customSchemaBuilder = SchemaBuilder
      .builder(originalSchema.getNamespace)
      .record(originalSchema.getName)
      .fields()

    originalSchema.getFields.toList.foreach { f =>
      customSchemaBuilder = customSchemaBuilder
        .name(fieldNameMapping(f.name()))
        .`type`(f.schema())
        .noDefault()
    }

    customSchemaBuilder.endRecord()
  }

  def mapGenericRecordFromOriginalToTarget(record: GenericRecord, mappedSchema: Schema, fieldNameMapping: Map[String, String]): GenericRecord = {
    val customRecord = new org.apache.avro.generic.GenericData.Record(mappedSchema)
    record.getSchema.getFields.foreach{ f =>
      customRecord.put(fieldNameMapping(f.name()), record.get(f.name()))
    }
    customRecord
  }

  def mapGenericRecordFromTargetToOriginal(record: GenericRecord, schema: Schema, fieldNameMapping: Map[String, String]): GenericRecord = {
    val customRecord = new org.apache.avro.generic.GenericData.Record(schema)
    customRecord.getSchema.getFields.foreach{ f =>
      customRecord.put(f.name(), record.get(fieldNameMapping(f.name())))
    }
    customRecord
  }
} 
Developer: piotr-kalanski | Project: spark-local | Lines: 42 | Source: AvroUtils.scala
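
A usage sketch renaming a field; the schema and mapping are illustrative:

import org.apache.avro.Schema
import org.apache.avro.generic.GenericData

val original = new Schema.Parser().parse(
  """{"type":"record","name":"User","namespace":"demo",
    |  "fields":[{"name":"name","type":"string"}]}""".stripMargin)
val mapping = Map("name" -> "full_name")
val mapped = AvroUtils.mapSchema(original, mapping)

val rec = new GenericData.Record(original)
rec.put("name", "Alice")
val renamed = AvroUtils.mapGenericRecordFromOriginalToTarget(rec, mapped, mapping)
// renamed.get("full_name") == "Alice"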


Example 15: AvroDeserializer

// Package name and imported dependency classes
package io.eels.component.avro

import com.typesafe.config.ConfigFactory
import io.eels.Row
import io.eels.schema.StructType
import org.apache.avro.Schema.Field
import org.apache.avro.generic.GenericRecord
import org.apache.avro.util.Utf8

import scala.collection.JavaConverters._


class AvroDeserializer(useJavaString: Boolean = ConfigFactory.load().getBoolean("eel.avro.java.string")) {

  val config = ConfigFactory.load()
  val deserializeAsNullable = config.getBoolean("eel.avro.deserializeAsNullable")
  var schema: StructType = null
  var fields: Array[Field] = null
  var range: Range = null

  def toScala(value: Any): Any = {
    value match {
      case record: GenericRecord => toValues(record)
      case utf8: Utf8 if useJavaString => utf8.toString
      case col: java.util.Collection[Any] => col.asScala.toVector.map(toScala)
      case map: java.util.Map[_, _] => map.asScala.toMap.map { case (k, v) => toScala(k) -> toScala(v) }
      case other => other
    }
  }

  def toValues(record: GenericRecord): Vector[Any] = {
    val vector = Vector.newBuilder[Any]
    for (k <- 0 until record.getSchema.getFields.size) {
      val value = record.get(k)
      vector += toScala(value)
    }
    vector.result
  }

  def toRow(record: GenericRecord): Row = {
    // take the schema from the first record
    if (schema == null) {
      schema = AvroSchemaFns.fromAvroSchema(record.getSchema, deserializeAsNullable)
    }
    Row(schema, toValues(record))
  }
} 
Developer: 51zero | Project: eel-sdk | Lines: 48 | Source: AvroDeserializer.scala


Example 16: AvroWriter

// Package name and imported dependency classes
package io.eels.component.avro

import java.io.OutputStream
import java.util.concurrent.atomic.AtomicInteger

import io.eels.Row
import io.eels.schema.StructType
import org.apache.avro.file.DataFileWriter
import org.apache.avro.generic
import org.apache.avro.generic.GenericRecord

class AvroWriter(structType: StructType, out: OutputStream) {
  
  private val schema = AvroSchemaFns.toAvroSchema(structType)
  private val datumWriter = new generic.GenericDatumWriter[GenericRecord](schema)
  private val dataFileWriter = new DataFileWriter[GenericRecord](datumWriter)
  private val serializer = new RowSerializer(schema)
  private val _records = new AtomicInteger(0)

  dataFileWriter.create(schema, out)

  def write(row: Row): Unit = {
    val record = serializer.serialize(row)
    dataFileWriter.append(record)
    _records.incrementAndGet()
  }

  def records: Int = _records.get()

  def close(): Unit = {
    dataFileWriter.flush()
    dataFileWriter.close()
  }
} 
Developer: 51zero | Project: eel-sdk | Lines: 35 | Source: AvroWriter.scala


Example 17: apply

// Package name and imported dependency classes
package io.eels.component.parquet.avro

import io.eels.Predicate
import io.eels.component.parquet.{ParquetPredicateBuilder, ParquetReaderConfig}
import org.apache.avro.Schema
import org.apache.avro.generic.GenericRecord
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.parquet.avro.{AvroParquetReader, AvroReadSupport}
import org.apache.parquet.filter2.compat.FilterCompat
import org.apache.parquet.hadoop.ParquetReader


object AvroParquetReaderFn {

  private val config = ParquetReaderConfig()

  def apply(path: Path,
            predicate: Option[Predicate],
            projectionSchema: Option[Schema])(implicit conf: Configuration): ParquetReader[GenericRecord] = {

    // The parquet reader can use a projection by setting a projected schema onto a conf object
    def configuration(): Configuration = {
      val newconf = new Configuration(conf)
      projectionSchema.foreach { it =>
        AvroReadSupport.setAvroReadSchema(newconf, it)
        AvroReadSupport.setRequestedProjection(newconf, it)
      }
      //conf.set(ParquetInputFormat.DICTIONARY_FILTERING_ENABLED, "true")
      newconf.set(org.apache.parquet.hadoop.ParquetFileReader.PARQUET_READ_PARALLELISM, config.parallelism.toString)
      newconf
    }

    // a filter is set when we have a predicate for the read
    def filter(): FilterCompat.Filter = predicate.map(ParquetPredicateBuilder.build)
      .map(FilterCompat.get)
      .getOrElse(FilterCompat.NOOP)

    AvroParquetReader.builder[GenericRecord](path)
      .withCompatibility(false)
      .withConf(configuration())
      .withFilter(filter())
      .build()
      .asInstanceOf[ParquetReader[GenericRecord]]
  }
} 
Developer: 51zero | Project: eel-sdk | Lines: 43 | Source: AvroParquetReaderFn.scala
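
Reading back with the function above; ParquetReader.read() returns null once the file is exhausted. The path is illustrative:

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path

implicit val conf: Configuration = new Configuration()

val reader = AvroParquetReaderFn(new Path("/tmp/users.parquet"), None, None)
try {
  Iterator.continually(reader.read()).takeWhile(_ != null).foreach(println)
} finally reader.close()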


Example 18: AvroParquetRowWriter

// Package name and imported dependency classes
package io.eels.component.parquet.avro

import com.sksamuel.exts.Logging
import com.typesafe.config.{Config, ConfigFactory}
import org.apache.avro.Schema
import org.apache.avro.generic.GenericRecord
import org.apache.hadoop.fs.{FileSystem, Path}


class AvroParquetRowWriter(path: Path,
                           avroSchema: Schema)(implicit fs: FileSystem) extends Logging {

  private val config: Config = ConfigFactory.load()
  private val skipCrc = config.getBoolean("eel.parquet.skipCrc")
  logger.info(s"Parquet writer will skipCrc = $skipCrc")

  private val writer = AvroParquetWriterFn(path, avroSchema)

  def write(record: GenericRecord): Unit = {
    writer.write(record)
  }

  def close(): Unit = {
    writer.close()
    if (skipCrc) {
      val crc = new Path("." + path.toString() + ".crc")
      logger.debug("Deleting crc $crc")
      if (fs.exists(crc))
        fs.delete(crc, false)
    }
  }
} 
Developer: 51zero | Project: eel-sdk | Lines: 33 | Source: AvroParquetRowWriter.scala


Example 19: JsonEncodingDecoding

// Package name and imported dependency classes
package com.giampaolotrapasso.avrosamples.test

import java.io.ByteArrayInputStream

import com.giampaolotrapasso.avrosamples.SchemaRegistry
import com.giampaolotrapasso.avrosamples.events.{Event, MovieChangedV1, MovieChangedV2, MovieChangedV3}
import com.giampaolotrapasso.avrosamples.serializers.{BinarySerializer, DataWithSchemaSerializer, JsonSerializer}
import com.sksamuel.avro4s.{AvroInputStream, FromRecord, RecordFormat, ToRecord}
import org.apache.avro.Schema
import org.apache.avro.file.SeekableByteArrayInput
import org.apache.avro.generic.{GenericDatumReader, GenericRecord}
import org.apache.avro.io.DecoderFactory

class JsonEncodingDecoding extends TestSpec {
  val title     = "Raiders of lost ark"
  val year      = 1986
  val director  = "Spielberg"
  val wonOscars = 1

  def deserialize[A <: Event: ToRecord: FromRecord: RecordFormat](oldSchema: Schema,
                                                                  newSchema: Schema,
                                                                  stream: ByteArrayInputStream) = {
    val gdr                   = new GenericDatumReader[GenericRecord](oldSchema, newSchema)
    val binDecoder            = DecoderFactory.get().jsonDecoder(newSchema, stream)
    val record: GenericRecord = gdr.read(null, binDecoder)
    val format                = RecordFormat[A]
    format.from(record)
  }

  "BinaryEncodingDecodingTest" should "deserialize an added field V1(title, year) to V2(title, year, director) " in {
    val obj                = MovieChangedV1(title, year)
    val bytes: Array[Byte] = JsonSerializer.serializeV1(obj)
    //println("*** Json Size" + bytes.length)

    val in = new SeekableByteArrayInput(bytes)

    val result = deserialize[MovieChangedV2](SchemaRegistry.movieChanged(1), SchemaRegistry.movieChanged(2), in)

    result should matchPattern {
      case MovieChangedV2(`title`, `year`, "unknown") =>
    }
  }

} 
Developer: giampaolotrapasso | Project: avro-samples | Lines: 45 | Source: JsonEncodingDecoding.scala


Example 20: KProducer

// Package name and imported dependency classes
package org.parsec

import java.util.Properties
import java.util.concurrent.Future

import com.sksamuel.avro4s.{FromRecord, RecordFormat, ToRecord}
import io.confluent.kafka.serializers.KafkaAvroSerializer
import org.apache.avro.generic.GenericRecord
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord, RecordMetadata}

// Convenient Kafka producer using avro4s
class KProducer[K <: Product, V <: Product] {

  val kafkaProps = new Properties()
  kafkaProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "parsec.playground.landoop.com:49092")
  kafkaProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, classOf[KafkaAvroSerializer].getCanonicalName)
  kafkaProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, classOf[KafkaAvroSerializer].getCanonicalName)
  kafkaProps.put("schema.registry.url", "http://parsec.playground.landoop.com:48081")
  private lazy val producer  = new KafkaProducer[GenericRecord, GenericRecord](kafkaProps)

  def produce(topic: String, key: K, value: V, partition: Int = 0)(implicit toRecordKey: ToRecord[K], fromRecordKey: FromRecord[K], toRecord: ToRecord[V], fromRecord: FromRecord[V]): Future[RecordMetadata] = {
    val keyRec = RecordFormat[K].to(key)
    val valueRec = RecordFormat[V].to(value)
    val data: ProducerRecord[GenericRecord, GenericRecord] = new ProducerRecord(topic, partition, keyRec, valueRec)
    producer.send(data)
  }

} 
Developer: parsec-network | Project: parsec | Lines: 29 | Source: KProducer.scala



Note: The org.apache.avro.generic.GenericRecord examples in this article were collected from open-source projects hosted on GitHub, MSDocs, and similar platforms. The snippets were contributed by open-source authors; copyright remains with them, and redistribution or use should follow each project's license. Do not reproduce without permission.

