This article collects typical usage examples of the Scala class org.apache.avro.generic.GenericRecord. If you are wondering what the GenericRecord class is for, how to use it in Scala, or simply want to see it in context, the curated examples below should help.
Twenty code examples of the GenericRecord class are shown, sorted by popularity by default. You can upvote the examples you like or find useful; your feedback helps the system recommend better Scala code examples.
Example 1: Tip
// Package declaration and imported dependencies
package com.alvin.niagara.model
import java.io.ByteArrayOutputStream
import java.util
import org.apache.avro.Schema
import org.apache.avro.generic.{GenericData, GenericDatumReader, GenericDatumWriter, GenericRecord}
import org.apache.avro.io.{DecoderFactory, EncoderFactory}
import scala.collection.JavaConversions._
import scala.io.Source
case class Tip(business_id: String, date: String, likes: Long, text: String, `type`: String, user_id: String)
object TipSerde {
val avroSchema = Source.fromInputStream(getClass.getResourceAsStream("/schema/tip.avsc")).mkString
val schema = new Schema.Parser().parse(avroSchema)
val reader = new GenericDatumReader[GenericRecord](schema)
val writer = new GenericDatumWriter[GenericRecord](schema)
def serialize(tip: Tip): Array[Byte] = {
val out = new ByteArrayOutputStream()
val encoder = EncoderFactory.get.binaryEncoder(out, null)
val avroRecord = new GenericData.Record(schema)
avroRecord.put("business_id", tip.business_id)
avroRecord.put("date", tip.date)
avroRecord.put("likes", tip.likes)
avroRecord.put("text", tip.text)
avroRecord.put("type", tip.`type`)
avroRecord.put("user_id", tip.user_id)
writer.write(avroRecord, encoder)
encoder.flush()
out.close()
out.toByteArray
}
def deserialize(bytes: Array[Byte]): Tip = {
val decoder = DecoderFactory.get.binaryDecoder(bytes, null)
val record = reader.read(null, decoder)
Tip(
record.get("business_id").toString,
record.get("date").toString,
record.get("likes").asInstanceOf[Long],
record.get("text").toString,
record.get("type").toString,
record.get("user_id").toString
)
}
}
Developer: AlvinCJin, Project: Niagara, Lines: 60, Source: Tip.scala
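A minimal round-trip sketch of the serializer above; the field values are invented and assume the /schema/tip.avsc schema matches the Tip case class:
val tip = Tip("b-001", "2016-05-02", 3L, "Great espresso", "tip", "u-42")
val bytes: Array[Byte] = TipSerde.serialize(tip)
val restored: Tip = TipSerde.deserialize(bytes)
assert(restored == tip)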
Example 2: AvroNodeSerializer
// Package declaration and imported dependencies
package eventgen.launcher.core.avro
import java.io.ByteArrayOutputStream
import eventgen.launcher.core.NodeSerializer
import org.apache.avro.Schema
import org.apache.avro.generic.{GenericDatumWriter, GenericRecord}
import org.apache.avro.io.EncoderFactory
class AvroNodeSerializer extends NodeSerializer[Schema, AvroNode[_], ByteArrayOutputStream] {
override def serialize(metadata: Schema, node: AvroNode[_]): ByteArrayOutputStream = {
val record = node.asInstanceOf[AvroRecord]
val writer = new GenericDatumWriter[GenericRecord]
writer.setSchema(metadata)
val outputStream = new ByteArrayOutputStream
val encoder = EncoderFactory.get().jsonEncoder(metadata, outputStream, true)
writer.write(record.value, encoder)
encoder.flush()
outputStream
}
}
Developer: gpulse, Project: eventgenerator, Lines: 23, Source: AvroNodeSerializer.scala
Example 3: AvroParquetWriterFn
// Package declaration and imported dependencies
package io.eels.component.parquet.avro
import com.sksamuel.exts.Logging
import io.eels.component.parquet.ParquetWriterConfig
import org.apache.avro.Schema
import org.apache.avro.generic.GenericRecord
import org.apache.hadoop.fs.Path
import org.apache.parquet.avro.AvroParquetWriter
import org.apache.parquet.hadoop.{ParquetFileWriter, ParquetWriter}
object AvroParquetWriterFn extends Logging {
def apply(path: Path, avroSchema: Schema): ParquetWriter[GenericRecord] = {
val config = ParquetWriterConfig()
AvroParquetWriter.builder[GenericRecord](path)
.withSchema(avroSchema)
.withCompressionCodec(config.compressionCodec)
.withPageSize(config.pageSize)
.withRowGroupSize(config.blockSize)
.withDictionaryEncoding(config.enableDictionary)
.withWriteMode(ParquetFileWriter.Mode.CREATE)
.withValidation(config.validating)
.build()
}
}
Developer: 51zero, Project: eel-sdk, Lines: 26, Source: AvroParquetWriterFn.scala
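A hedged usage sketch for the factory above; the output path is made up, and avroSchema and records are assumed to exist and agree with each other:
val writer = AvroParquetWriterFn(new Path("/tmp/example.parquet"), avroSchema)
records.foreach(writer.write)
writer.close()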
Example 4: JsonToParquetPipelineFactory
// Package declaration and imported dependencies
package yamrcraft.etlite.pipeline
import org.apache.avro.generic.GenericRecord
import yamrcraft.etlite.PipelineSettings
import yamrcraft.etlite.transformers.{JsonToAvroTransformer, Message}
import yamrcraft.etlite.writers.{AvroToParquetWriter, TimePartitioningWriter}
class JsonToParquetPipelineFactory extends PipelineFactory[Message[GenericRecord]] {
def createPipeline(settings: PipelineSettings, jobId: Long, partitionId: Int): Pipeline[Message[GenericRecord]] =
new Pipeline(
new JsonToAvroTransformer(settings.transformerConfig),
new TimePartitioningWriter(
settings.writerConfig,
jobId,
partitionId,
(tempFile, outputFile) => new AvroToParquetWriter(tempFile, outputFile))
)
}
Developer: yamrcraft, Project: etl-light, Lines: 21, Source: JsonToParquetPipelineFactory.scala
Example 5: AvroConverter
// Package declaration and imported dependencies
import org.apache.avro.generic.GenericRecord
// Avro -> Case class conversion
object AvroConverter {
def shipment(record: GenericRecord) = {
Schemas.Shipments_v1(
record.get("itemID").asInstanceOf[Long],
record.get("storeCode").toString,
record.get("count").asInstanceOf[Int])
}
def sale(record: GenericRecord) = {
Schemas.Sales_v2(
record.get("itemID").asInstanceOf[Long],
record.get("storeCode").toString,
record.get("count").asInstanceOf[Int],
record.get("customerID").toString)
}
def getShipment(message: (Object, Object)) = {
val (k, v) = message
//val name = k.asInstanceOf[IndexedRecord].getSchema.getName
//if (name == "Shipments_v1")
val value = v.asInstanceOf[GenericRecord]
shipment(value)
}
def getSale(message: (Object, Object)) = {
val (k, v) = message
val value = v.asInstanceOf[GenericRecord]
sale(value)
}
}
Developer: Antwnis, Project: kafka-streaming-examples, Lines: 35, Source: AvroConverter.scala
Example 6: AvroUtils
// Package declaration and imported dependencies
package pulse.kafka.avro
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, DataInputStream, File}
import com.twitter.util.Future
import org.apache.avro.Schema
import org.apache.avro.file.DataFileWriter
import org.apache.avro.generic.{GenericDatumReader, GenericDatumWriter, GenericRecord}
import org.apache.avro.io.DecoderFactory
import pulse.kafka.extensions.managedByteArrayInputStream
import pulse.kafka.extensions.managedByteArrayOutputStream
import pulse.kafka.extensions.catsStdInstancesForFuture
import scala.concurrent.ExecutionContext.Implicits._
object AvroUtils {
import pulse.common.syntax._
def jsonToAvroBytes(json: String, schemaFile: File): Future[Array[Byte]] =
use(new ByteArrayOutputStream()) { output =>
for {
s <- loadSchema(schemaFile)
_ <- convertImpl(json, output, s)
} yield output.toByteArray
}
private def convertImpl(json: String, output: ByteArrayOutputStream, schemaSpec: Schema): Future[GenericDatumReader[GenericRecord]] =
use(new ByteArrayInputStream(json.getBytes)) { input =>
for {
w <- getWriter(output, schemaSpec)
r <- getReader(input, schemaSpec, w)
} yield r
}
def getReader(input: ByteArrayInputStream, schemaSpec: Schema, w: DataFileWriter[GenericRecord]) = Future.value {
val reader = new GenericDatumReader[GenericRecord](schemaSpec)
val datum = reader.read(null, getJsonDecoder(input, schemaSpec))
w.append(datum)
w.flush()
reader
}
private def getJsonDecoder(input: ByteArrayInputStream, schema: Schema) =
DecoderFactory.get.jsonDecoder(schema, new DataInputStream(input))
private def getWriter(output: ByteArrayOutputStream, schemaSpec: Schema) = {
Future.value {
val writer = new DataFileWriter[GenericRecord](new GenericDatumWriter[GenericRecord]())
writer.create(schemaSpec, output)
}
}
private def loadSchema(schemaFile: File): Future[Schema] =
Future {
new Schema.Parser().parse(schemaFile)
}
}
Developer: gpulse, Project: kafka, Lines: 60, Source: AvroUtils.scala
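A usage sketch under the assumption that a schema file /schemas/user.avsc (hypothetical path) describes a record with name and age fields matching the JSON below:
val bytesF: Future[Array[Byte]] =
AvroUtils.jsonToAvroBytes("""{"name": "Ada", "age": 36}""", new File("/schemas/user.avsc"))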
Example 7: KProducer
// Package declaration and imported dependencies
package org.parsec
import java.util.Properties
import java.util.concurrent.Future
import com.sksamuel.avro4s.{FromRecord, RecordFormat, ToRecord}
import io.confluent.kafka.serializers.KafkaAvroSerializer
import org.apache.avro.generic.GenericRecord
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord, RecordMetadata}
// Convenient Kafka producer using avro4s
class KProducer[K <: Product, V <: Product] {
val kafkaProps = new Properties()
kafkaProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
kafkaProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, classOf[KafkaAvroSerializer].getCanonicalName)
kafkaProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, classOf[KafkaAvroSerializer].getCanonicalName)
kafkaProps.put("schema.registry.url", "http://localhost:8081")
private lazy val producer = new KafkaProducer[GenericRecord, GenericRecord](kafkaProps)
def produce(topic: String, key: K, value: V, partition: Int = 0)(implicit toRecordKey: ToRecord[K], fromRecordKey: FromRecord[K], toRecord: ToRecord[V], fromRecord: FromRecord[V]): Future[RecordMetadata] = {
val keyRec = RecordFormat[K].to(key)
val valueRec = RecordFormat[V].to(value)
val data: ProducerRecord[GenericRecord, GenericRecord] = new ProducerRecord(topic, partition, keyRec, valueRec)
producer.send(data)
}
}
Developer: cryptocurrencyindia, Project: Parsec, Lines: 29, Source: KProducer.scala
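A hypothetical usage with two invented case classes; avro4s is assumed to derive the implicit ToRecord/FromRecord instances for them:
case class UserKey(id: String)
case class UserEvent(id: String, action: String)
val producer = new KProducer[UserKey, UserEvent]
val ack: Future[RecordMetadata] = producer.produce("user-events", UserKey("u1"), UserEvent("u1", "login"))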
Example 8: KafkaSparkStreamingRegistrator
// Package declaration and imported dependencies
package com.miguno.kafkastorm.spark.serialization
import com.esotericsoftware.kryo.Kryo
import com.miguno.avro.Tweet
import com.twitter.chill.avro.AvroSerializer
import org.apache.avro.generic.GenericRecord
import org.apache.spark.serializer.KryoRegistrator
class KafkaSparkStreamingRegistrator extends KryoRegistrator {
override def registerClasses(kryo: Kryo) {
// Registers a serializer for any generic Avro records. The kafka-storm-starter project does not yet include
// examples that work on generic Avro records, but we keep this registration for the convenience of our readers.
kryo.register(classOf[GenericRecord], AvroSerializer.GenericRecordSerializer[GenericRecord]())
// Registers a serializer specifically for the, well, specific Avro record `Tweet`
kryo.register(classOf[Tweet], AvroSerializer.SpecificRecordSerializer[Tweet])
()
}
}
Developer: JohnReedLOL, Project: KafkaStormSpark, Lines: 22, Source: KafkaSparkStreamingRegistrator.scala
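A sketch of how such a registrator is typically wired into a Spark configuration; these are standard Spark settings, not part of the original project excerpt:
import org.apache.spark.SparkConf
val sparkConf = new SparkConf()
.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
.set("spark.kryo.registrator", classOf[KafkaSparkStreamingRegistrator].getName)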
Example 9: AvroType
// Package declaration and imported dependencies
package shapeless.datatype.avro
import org.apache.avro.Schema
import org.apache.avro.generic.GenericRecord
import shapeless._
import scala.reflect.runtime.universe._
class AvroType[A] extends Serializable {
def fromGenericRecord[L <: HList](m: GenericRecord)
(implicit gen: LabelledGeneric.Aux[A, L], fromL: FromAvroRecord[L])
: Option[A] = fromL(Right(m)).map(gen.from)
def toGenericRecord[L <: HList](a: A)
(implicit gen: LabelledGeneric.Aux[A, L], toL: ToAvroRecord[L], tt: TypeTag[A])
: GenericRecord = toL(gen.to(a)).left.get.build(AvroSchema[A])
}
object AvroType {
def apply[A: TypeTag]: AvroType[A] = new AvroType[A]
def at[V: TypeTag](schemaType: Schema.Type)(fromFn: Any => V, toFn: V => Any): BaseAvroMappableType[V] = {
AvroSchema.register(implicitly[TypeTag[V]].tpe, schemaType)
new BaseAvroMappableType[V] {
override def from(value: Any): V = fromFn(value)
override def to(value: V): Any = toFn(value)
}
}
}
Developer: nevillelyh, Project: shapeless-datatype, Lines: 29, Source: AvroType.scala
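A round-trip sketch with an invented case class, assuming shapeless-datatype can derive the record mappings for it:
case class User(name: String, age: Int)
val avroType = AvroType[User]
val record: GenericRecord = avroType.toGenericRecord(User("Ada", 36))
val back: Option[User] = avroType.fromGenericRecord(record)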
Example 10: AvroUtils
// Package declaration and imported dependencies
package pulse.services.example.avro
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, DataInputStream, File}
import org.apache.avro.Schema
import org.apache.avro.file.DataFileWriter
import org.apache.avro.generic.{GenericDatumReader, GenericDatumWriter, GenericRecord}
import org.apache.avro.io.DecoderFactory
import pulse.services.example.extensions._
object AvroUtils {
def jsonToAvroBytes(json: String, schemaFile: File) = {
use(new ByteArrayOutputStream())(output => {
val schemaSpec = loadSchema(schemaFile)
use(new ByteArrayInputStream(json.getBytes))(input => {
val writer = new DataFileWriter[GenericRecord](new GenericDatumWriter[GenericRecord]())
writer.create(schemaSpec, output)
val reader = new GenericDatumReader[GenericRecord](schemaSpec)
val datum = reader.read(null, getJsonDecoder(input, schemaSpec))
writer.append(datum)
writer.flush()
})
output.toByteArray
})
}
def getJsonDecoder(input: ByteArrayInputStream, schema: Schema) =
DecoderFactory.get.jsonDecoder(schema, new DataInputStream(input))
def loadSchema(schemaFile: File) =
new Schema.Parser().parse(schemaFile)
}
Developer: gpulse, Project: services, Lines: 34, Source: AvroUtils.scala
Example 11: KafkaSparkStreamingRegistrator
// Package declaration and imported dependencies
package com.miguno.kafkastorm.spark.serialization
import com.esotericsoftware.kryo.Kryo
import com.miguno.avro.Tweet
import com.twitter.chill.avro.AvroSerializer
import org.apache.avro.generic.GenericRecord
import org.apache.spark.serializer.KryoRegistrator
import scala.trace.{Pos, implicitlyFormatable}
class KafkaSparkStreamingRegistrator extends KryoRegistrator {
override def registerClasses(kryo: Kryo) {
// Registers a serializer for any generic Avro records. The kafka-storm-starter project does not yet include
// examples that work on generic Avro records, but we keep this registration for the convenience of our readers.
kryo.register(classOf[GenericRecord], AvroSerializer.GenericRecordSerializer[GenericRecord]())
// Registers a serializer specifically for the, well, specific Avro record `Tweet`
kryo.register(classOf[Tweet], AvroSerializer.SpecificRecordSerializer[Tweet])
()
}
}
Developer: JohnReedLOL, Project: full-stack-big-data, Lines: 23, Source: KafkaSparkStreamingRegistrator.scala
Example 12: AvroFileWriter
// Package declaration and imported dependencies
package com.landoop.avro
import java.io.{BufferedOutputStream, File, FileOutputStream}
import com.landoop.avro.codec.CodecFactory
import org.apache.avro.Schema
import org.apache.avro.file.DataFileWriter
import org.apache.avro.generic.GenericRecord
object AvroFileWriter {
def fastWrite(file: File,
count: Int,
parallelization: Int,
schema: Schema,
records: IndexedSeq[GenericRecord]) = {
val out = new BufferedOutputStream(new FileOutputStream(file), 4 * 1048576)
import org.apache.avro.generic.GenericDatumWriter
val datumWriter = new GenericDatumWriter[GenericRecord](schema)
val builder = FastDataFileWriterBuilder(datumWriter, out, schema)
.withCodec(CodecFactory.snappyCodec())
.withFlushOnEveryBlock(false)
.withParallelization(parallelization)
builder.encoderFactory.configureBufferSize(4 * 1048576)
builder.encoderFactory.configureBlockSize(4 * 1048576)
val fileWriter = builder.build()
fileWriter.write(records)
fileWriter.close()
}
def write(file: File,
count: Int,
schema: Schema,
records: Seq[GenericRecord]) = {
val out = new BufferedOutputStream(new FileOutputStream(file), 4 * 1048576)
import org.apache.avro.generic.GenericDatumWriter
val datumWriter = new GenericDatumWriter[GenericRecord](schema)
val writer = new DataFileWriter(datumWriter)
.setCodec(org.apache.avro.file.CodecFactory.snappyCodec())
.create(schema, out)
writer.setFlushOnEveryBlock(false)
records.foreach(writer.append)
writer.close()
}
}
Developer: Landoop, Project: fast-avro-write, Lines: 51, Source: AvroFileWriter.scala
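A hypothetical call to the plain write variant; the output file path is invented, and schema and records are assumed to exist:
AvroFileWriter.write(new File("/tmp/out.avro"), records.size, schema, records)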
Example 13: props
// Package declaration and imported dependencies
package com.kakao.cuesheet.convert
import java.util.Arrays.copyOfRange
import kafka.serializer.Decoder
import kafka.utils.VerifiableProperties
import org.apache.avro.Schema
import org.apache.avro.generic.{GenericDatumReader, GenericRecord}
sealed trait AvroDecoder[T] extends Decoder[T] {
def props: VerifiableProperties
protected val schema = new Schema.Parser().parse(props.getString(Avro.SCHEMA))
protected val skipBytes = props.getInt(Avro.SKIP_BYTES, 0)
protected val reader = new GenericDatumReader[GenericRecord](schema)
protected val decoder = Avro.recordDecoder(reader)
private def skip(bytes: Array[Byte], size: Int): Array[Byte] = {
val length = bytes.length
length - size match {
case remaining if remaining > 0 => copyOfRange(bytes, size, length)
case _ => new Array[Byte](0)
}
}
def parse(bytes: Array[Byte]): GenericRecord = {
val data = if (skipBytes == 0) bytes else skip(bytes, skipBytes)
decoder(data)
}
}
class AvroRecordDecoder(val props: VerifiableProperties) extends AvroDecoder[GenericRecord] {
override def fromBytes(bytes: Array[Byte]): GenericRecord = parse(bytes)
}
class AvroMapDecoder(val props: VerifiableProperties) extends AvroDecoder[Map[String, Any]] {
override def fromBytes(bytes: Array[Byte]): Map[String, Any] = Avro.toMap(parse(bytes))
}
class AvroJsonDecoder(val props: VerifiableProperties) extends AvroDecoder[String] {
override def fromBytes(bytes: Array[Byte]): String = Avro.toJson(parse(bytes))
}
Developer: kakao, Project: cuesheet, Lines: 46, Source: AvroDecoder.scala
Example 14: AvroUtils
// Package declaration and imported dependencies
package com.datawizards.sparklocal.impl.scala.dataset.io
import org.apache.avro.generic.GenericRecord
import org.apache.avro.{Schema, SchemaBuilder}
import scala.collection.JavaConversions._
object AvroUtils {
def mapSchema(originalSchema: Schema, fieldNameMapping: Map[String, String]): Schema = {
var customSchemaBuilder = SchemaBuilder
.builder(originalSchema.getNamespace)
.record(originalSchema.getName)
.fields()
originalSchema.getFields.toList.foreach { f =>
customSchemaBuilder = customSchemaBuilder
.name(fieldNameMapping(f.name()))
.`type`(f.schema())
.noDefault()
}
customSchemaBuilder.endRecord()
}
def mapGenericRecordFromOriginalToTarget(record: GenericRecord, mappedSchema: Schema, fieldNameMapping: Map[String, String]): GenericRecord = {
val customRecord = new org.apache.avro.generic.GenericData.Record(mappedSchema)
record.getSchema.getFields.foreach{ f =>
customRecord.put(fieldNameMapping(f.name()), record.get(f.name()))
}
customRecord
}
def mapGenericRecordFromTargetToOriginal(record: GenericRecord, schema: Schema, fieldNameMapping: Map[String, String]): GenericRecord = {
val customRecord = new org.apache.avro.generic.GenericData.Record(schema)
customRecord.getSchema.getFields.foreach{ f =>
customRecord.put(f.name(), record.get(fieldNameMapping(f.name())))
}
customRecord
}
}
Developer: piotr-kalanski, Project: spark-local, Lines: 42, Source: AvroUtils.scala
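A field-renaming sketch; originalSchema and record are assumed to exist, and the first_name/last_name field names are hypothetical:
val mapping = Map("first_name" -> "firstName", "last_name" -> "lastName")
val renamedSchema = AvroUtils.mapSchema(originalSchema, mapping)
val renamedRecord = AvroUtils.mapGenericRecordFromOriginalToTarget(record, renamedSchema, mapping)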
Example 15: AvroDeserializer
// Package declaration and imported dependencies
package io.eels.component.avro
import com.typesafe.config.ConfigFactory
import io.eels.Row
import io.eels.schema.StructType
import org.apache.avro.Schema.Field
import org.apache.avro.generic.GenericRecord
import org.apache.avro.util.Utf8
import scala.collection.JavaConverters._
class AvroDeserializer(useJavaString: Boolean = ConfigFactory.load().getBoolean("eel.avro.java.string")) {
val config = ConfigFactory.load()
val deserializeAsNullable = config.getBoolean("eel.avro.deserializeAsNullable")
var schema: StructType = null
var fields: Array[Field] = null
var range: Range = null
def toScala(value: Any): Any = {
value match {
case record: GenericRecord => toValues(record)
case utf8: Utf8 if useJavaString => value.asInstanceOf[Utf8].toString
case col: java.util.Collection[Any] => col.asScala.toVector.map(toScala)
case map: java.util.Map[_, _] => map.asScala.toMap.map { case (k, v) => toScala(k) -> toScala(v) }
case other => other
}
}
def toValues(record: GenericRecord): Vector[Any] = {
val vector = Vector.newBuilder[Any]
for (k <- 0 until record.getSchema.getFields.size) {
val value = record.get(k)
vector += toScala(value)
}
vector.result
}
def toRow(record: GenericRecord): Row = {
// take the schema from the first record
if (schema == null) {
schema = AvroSchemaFns.fromAvroSchema(record.getSchema, deserializeAsNullable)
}
Row(schema, toValues(record))
}
}
Developer: 51zero, Project: eel-sdk, Lines: 48, Source: AvroDeserializer.scala
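A minimal usage sketch; genericRecord stands for an Avro record assumed to have been read elsewhere:
val deserializer = new AvroDeserializer()
val row: Row = deserializer.toRow(genericRecord)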
Example 16: AvroWriter
// Package declaration and imported dependencies
package io.eels.component.avro
import java.io.OutputStream
import java.util.concurrent.atomic.AtomicInteger
import io.eels.Row
import io.eels.schema.StructType
import org.apache.avro.file.DataFileWriter
import org.apache.avro.generic
import org.apache.avro.generic.GenericRecord
class AvroWriter(structType: StructType, out: OutputStream) {
private val schema = AvroSchemaFns.toAvroSchema(structType)
private val datumWriter = new generic.GenericDatumWriter[GenericRecord](schema)
private val dataFileWriter = new DataFileWriter[GenericRecord](datumWriter)
private val serializer = new RowSerializer(schema)
private val _records = new AtomicInteger(0)
dataFileWriter.create(schema, out)
def write(row: Row): Unit = {
val record = serializer.serialize(row)
dataFileWriter.append(record)
_records.incrementAndGet()
}
def records: Int = _records.get()
def close(): Unit = {
dataFileWriter.flush()
dataFileWriter.close()
}
}
Developer: 51zero, Project: eel-sdk, Lines: 35, Source: AvroWriter.scala
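A usage sketch; structType and rows are assumed to come from the surrounding eel pipeline, and the output path is invented:
val out = new java.io.FileOutputStream("/tmp/rows.avro")
val avroWriter = new AvroWriter(structType, out)
rows.foreach(avroWriter.write)
avroWriter.close()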
Example 17: apply
// Package declaration and imported dependencies
package io.eels.component.parquet.avro
import io.eels.Predicate
import io.eels.component.parquet.{ParquetPredicateBuilder, ParquetReaderConfig}
import org.apache.avro.Schema
import org.apache.avro.generic.GenericRecord
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.parquet.avro.{AvroParquetReader, AvroReadSupport}
import org.apache.parquet.filter2.compat.FilterCompat
import org.apache.parquet.hadoop.ParquetReader
object AvroParquetReaderFn {
private val config = ParquetReaderConfig()
def apply(path: Path,
predicate: Option[Predicate],
projectionSchema: Option[Schema])(implicit conf: Configuration): ParquetReader[GenericRecord] = {
// The parquet reader can use a projection by setting a projected schema onto a conf object
def configuration(): Configuration = {
val newconf = new Configuration(conf)
projectionSchema.foreach { it =>
AvroReadSupport.setAvroReadSchema(newconf, it)
AvroReadSupport.setRequestedProjection(newconf, it)
}
//conf.set(ParquetInputFormat.DICTIONARY_FILTERING_ENABLED, "true")
newconf.set(org.apache.parquet.hadoop.ParquetFileReader.PARQUET_READ_PARALLELISM, config.parallelism.toString)
newconf
}
// a filter is set when we have a predicate for the read
def filter(): FilterCompat.Filter = predicate.map(ParquetPredicateBuilder.build)
.map(FilterCompat.get)
.getOrElse(FilterCompat.NOOP)
AvroParquetReader.builder[GenericRecord](path)
.withCompatibility(false)
.withConf(configuration())
.withFilter(filter())
.build()
.asInstanceOf[ParquetReader[GenericRecord]]
}
}
Developer: 51zero, Project: eel-sdk, Lines: 43, Source: AvroParquetReaderFn.scala
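A hypothetical read loop over the returned reader; the Hadoop configuration and the Parquet path are assumptions:
implicit val hadoopConf: Configuration = new Configuration()
val reader = AvroParquetReaderFn(new Path("/tmp/example.parquet"), None, None)
Iterator.continually(reader.read()).takeWhile(_ != null).foreach(println)
reader.close()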
Example 18: AvroParquetRowWriter
// Package declaration and imported dependencies
package io.eels.component.parquet.avro
import com.sksamuel.exts.Logging
import com.typesafe.config.{Config, ConfigFactory}
import org.apache.avro.Schema
import org.apache.avro.generic.GenericRecord
import org.apache.hadoop.fs.{FileSystem, Path}
class AvroParquetRowWriter(path: Path,
avroSchema: Schema)(implicit fs: FileSystem) extends Logging {
private val config: Config = ConfigFactory.load()
private val skipCrc = config.getBoolean("eel.parquet.skipCrc")
logger.info(s"Parquet writer will skipCrc = $skipCrc")
private val writer = AvroParquetWriterFn(path, avroSchema)
def write(record: GenericRecord): Unit = {
writer.write(record)
}
def close(): Unit = {
writer.close()
if (skipCrc) {
val crc = new Path("." + path.toString() + ".crc")
logger.debug(s"Deleting crc $crc")
if (fs.exists(crc))
fs.delete(crc, false)
}
}
}
Developer: 51zero, Project: eel-sdk, Lines: 33, Source: AvroParquetRowWriter.scala
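A usage sketch; the implicit FileSystem, the Avro schema, and the records are assumed to be provided by the caller:
implicit val fs: FileSystem = FileSystem.get(new org.apache.hadoop.conf.Configuration())
val rowWriter = new AvroParquetRowWriter(new Path("/tmp/rows.parquet"), avroSchema)
records.foreach(rowWriter.write)
rowWriter.close()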
Example 19: JsonEncodingDecoding
// Package declaration and imported dependencies
package com.giampaolotrapasso.avrosamples.test
import java.io.ByteArrayInputStream
import com.giampaolotrapasso.avrosamples.SchemaRegistry
import com.giampaolotrapasso.avrosamples.events.{Event, MovieChangedV1, MovieChangedV2, MovieChangedV3}
import com.giampaolotrapasso.avrosamples.serializers.{BinarySerializer, DataWithSchemaSerializer, JsonSerializer}
import com.sksamuel.avro4s.{AvroInputStream, FromRecord, RecordFormat, ToRecord}
import org.apache.avro.Schema
import org.apache.avro.file.SeekableByteArrayInput
import org.apache.avro.generic.{GenericDatumReader, GenericRecord}
import org.apache.avro.io.DecoderFactory
class JsonEncodingDecoding extends TestSpec {
val title = "Raiders of lost ark"
val year = 1986
val director = "Spielberg"
val wonOscars = 1
def deserialize[A <: Event: ToRecord: FromRecord: RecordFormat](oldSchema: Schema,
newSchema: Schema,
stream: ByteArrayInputStream) = {
val gdr = new GenericDatumReader[GenericRecord](oldSchema, newSchema)
val binDecoder = DecoderFactory.get().jsonDecoder(newSchema, stream)
val record: GenericRecord = gdr.read(null, binDecoder)
val format = RecordFormat[A]
format.from(record)
}
"BinaryEncodingDecodingTest" should "deserialize an added field V1(title, year) to V2(title, year, director) " in {
val obj = MovieChangedV1(title, year)
val bytes: Array[Byte] = JsonSerializer.serializeV1(obj)
//println("*** Json Size" + bytes.length)
val in = new SeekableByteArrayInput(bytes)
val result = deserialize[MovieChangedV2](SchemaRegistry.movieChanged(1), SchemaRegistry.movieChanged(2), in)
result should matchPattern {
case MovieChangedV2(`title`, `year`, "unknown") =>
}
}
}
Developer: giampaolotrapasso, Project: avro-samples, Lines: 45, Source: JsonEncodingDecoding.scala
Example 20: KProducer
// Package declaration and imported dependencies
package org.parsec
import java.util.Properties
import java.util.concurrent.Future
import com.sksamuel.avro4s.{FromRecord, RecordFormat, ToRecord}
import io.confluent.kafka.serializers.KafkaAvroSerializer
import org.apache.avro.generic.GenericRecord
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord, RecordMetadata}
// Convenient Kafka producer using avro4s
class KProducer[K <: Product, V <: Product] {
val kafkaProps = new Properties()
kafkaProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "parsec.playground.landoop.com:49092")
kafkaProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, classOf[KafkaAvroSerializer].getCanonicalName)
kafkaProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, classOf[KafkaAvroSerializer].getCanonicalName)
kafkaProps.put("schema.registry.url", "http://parsec.playground.landoop.com:48081")
private lazy val producer = new KafkaProducer[GenericRecord, GenericRecord](kafkaProps)
def produce(topic: String, key: K, value: V, partition: Int = 0)(implicit toRecordKey: ToRecord[K], fromRecordKey: FromRecord[K], toRecord: ToRecord[V], fromRecord: FromRecord[V]): Future[RecordMetadata] = {
val keyRec = RecordFormat[K].to(key)
val valueRec = RecordFormat[V].to(value)
val data: ProducerRecord[GenericRecord, GenericRecord] = new ProducerRecord(topic, partition, keyRec, valueRec)
producer.send(data)
}
}
Developer: parsec-network, Project: parsec, Lines: 29, Source: KProducer.scala
Note: The org.apache.avro.generic.GenericRecord examples in this article were compiled from GitHub, MSDocs, and other source-code and documentation platforms. The snippets were selected from open-source projects contributed by various developers, and copyright remains with the original authors; consult each project's license before redistributing or using the code, and do not repost without permission.