本文整理汇总了Scala中org.apache.avro.generic.GenericData类的典型用法代码示例。如果您正苦于以下问题:Scala GenericData类的具体用法?Scala GenericData怎么用?Scala GenericData使用的例子?那么恭喜您,这里精选的类代码示例或许可以为您提供帮助。
在下文中一共展示了GenericData类的8个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Scala代码示例。
示例1: Tip
//设置package包名称以及导入依赖的类
package com.alvin.niagara.model
import java.io.ByteArrayOutputStream
import java.util
import org.apache.avro.Schema
import org.apache.avro.generic.{GenericData, GenericDatumReader, GenericDatumWriter, GenericRecord}
import org.apache.avro.io.{DecoderFactory, EncoderFactory}
import scala.collection.JavaConversions._
import scala.io.Source
/**
 * Immutable value holding the six fields of a tip.
 *
 * The field names are the same strings used as keys when a tip is written to
 * or read from an Avro record elsewhere in this file.
 *
 * @param business_id value stored under the "business_id" key
 * @param date        value stored under the "date" key, kept as text
 * @param likes       value stored under the "likes" key
 * @param text        value stored under the "text" key
 * @param `type`      value stored under the "type" key
 * @param user_id     value stored under the "user_id" key
 */
case class Tip(
  business_id: String,
  date: String,
  likes: Long,
  text: String,
  `type`: String,
  user_id: String
)
/**
 * Avro binary serialization and deserialization for [[Tip]], using the schema
 * stored in the classpath resource `/schema/tip.avsc`.
 */
object TipSerde {

  /** JSON text of the Avro schema, read once from the classpath. */
  val avroSchema: String = {
    val stream = getClass.getResourceAsStream("/schema/tip.avsc")
    // Fail with a readable message instead of a NullPointerException deep inside Source.
    if (stream == null)
      throw new IllegalStateException("Avro schema resource /schema/tip.avsc not found on the classpath")
    val source = Source.fromInputStream(stream)
    // Closing the Source also releases the underlying resource stream.
    try source.mkString finally source.close()
  }

  /** Parsed form of [[avroSchema]]. */
  val schema: Schema = new Schema.Parser().parse(avroSchema)

  val reader = new GenericDatumReader[GenericRecord](schema)
  val writer = new GenericDatumWriter[GenericRecord](schema)

  /**
   * Encodes a tip as Avro binary.
   *
   * @param tip the value to encode
   * @return the bytes produced by the binary encoder for a record built from `tip`
   */
  def serialize(tip: Tip): Array[Byte] = {
    val out = new ByteArrayOutputStream()
    val encoder = EncoderFactory.get.binaryEncoder(out, null)

    val avroRecord = new GenericData.Record(schema)
    avroRecord.put("business_id", tip.business_id)
    avroRecord.put("date", tip.date)
    avroRecord.put("likes", tip.likes)
    avroRecord.put("text", tip.text)
    avroRecord.put("type", tip.`type`)
    avroRecord.put("user_id", tip.user_id)

    writer.write(avroRecord, encoder)
    // The encoder buffers internally; flush before reading the bytes back.
    encoder.flush()
    out.close()
    out.toByteArray
  }

  /**
   * Decodes Avro binary produced with the same schema back into a [[Tip]].
   *
   * @param bytes Avro binary payload
   * @return the decoded tip
   */
  def deserialize(bytes: Array[Byte]): Tip = {
    val decoder = DecoderFactory.get.binaryDecoder(bytes, null)
    val record = reader.read(null, decoder)

    Tip(
      record.get("business_id").toString,
      record.get("date").toString,
      // NOTE(review): assumes the schema declares "likes" as an Avro long — confirm against tip.avsc.
      record.get("likes").asInstanceOf[Long],
      record.get("text").toString,
      record.get("type").toString,
      record.get("user_id").toString
    )
  }
}
开发者ID:AlvinCJin,项目名称:Niagara,代码行数:60,代码来源:Tip.scala
示例2: AvroTreeBuilder
//设置package包名称以及导入依赖的类
package eventgen.launcher.core.avro
import eventgen.launcher.core.PrimitiveGenerators._
import eventgen.launcher.core._
import org.apache.avro.Schema
import org.apache.avro.Schema.{Field, Type}
import scala.collection.JavaConversions._
import scalaz._
import Scalaz._
import org.apache.avro.generic.GenericData
/**
 * Builds, from an Avro record schema, a `State[ImmutableRandom, AvroNode[_]]`
 * that produces a populated `GenericData.Record` when run.
 *
 * Each non-record field must carry a "generator" property that is either a
 * `Range[Double|Int](from = a, to = b)` expression or the name of a generator
 * registered in the execution context.
 */
class AvroTreeBuilder extends TreeBuilder[Schema, AvroNode[_]] {

  // Compiled once per builder instead of on every getFieldState call.
  private val RangePattern = "Range\\[(Double|Int)\\]\\(from = ([-0-9]+), to = ([-0-9]+)\\)".r

  /** Returns the state exposed by the external generator; `schema` is not consulted. */
  def getCustomFieldState(schema: Schema, extGenerator: ExternalGenerator[_]): State[ImmutableRandom, _] =
    extGenerator.get

  /** Generates a value between `from` and `to` with the implicit range generator and wraps it in an `AvroField`. */
  def getRangeFieldState[T](from: Int, to: Int)(implicit rangeGen: RangeGenerator[T]): State[ImmutableRandom, AvroNode[_]] = {
    rangeGen.generate(from, to).map(AvroField[T](_))
  }

  /**
   * Resolves the generator for one schema field.
   *
   * @param f       the field whose "generator" property selects the generator
   * @param context supplies the named external generators
   * @return the field's state on the right, or an error message on the left when
   *         the property is absent or names an unknown generator
   */
  def getFieldState(f: Field, context: ExecutionContext): String \/ State[ImmutableRandom, AvroNode[_]] =
    // getProp yields null when the property is not set; report that explicitly
    // rather than looking up a generator called "null".
    Option(f.getProp("generator")) match {
      case None =>
        -\/(s"Field ${f.name()} has no generator property")
      case Some(RangePattern(typeParam, Int(from), Int(to))) =>
        typeParam match {
          case "Double" => \/-(getRangeFieldState[Double](from, to))
          case "Int" => \/-(getRangeFieldState[Int](from, to))
        }
      case Some(name) =>
        context.generators.get(name) match {
          case Some(extGenerator) => \/-(extGenerator.get.map(AvroField(_)))
          case None => -\/(s"Cannot find generator $name")
        }
    }

  /**
   * Builds the state for a record schema, recursing into fields that are themselves records.
   *
   * @return the record's state on the right, or an error message on the left if a field could not be resolved
   */
  override def buildTree(rootSchema: Schema, executionContext: ExecutionContext): String \/ State[ImmutableRandom, AvroNode[_]] = {
    val fields = rootSchema.getFields.toList
    val fieldStates = fields.map { f =>
      if (f.schema().getType == Type.RECORD)
        buildTree(f.schema(), executionContext).map((f.name(), _))
      else
        getFieldState(f, executionContext).map((f.name(), _))
    }
    // Turn the list of per-field disjunctions into one disjunction over the whole list.
    for (childrenMap <- fieldStates.sequenceU) yield generateNodeState(rootSchema, childrenMap.toMap)
  }

  /**
   * Combines the per-field states into one state that fills a new record.
   *
   * @param rootSchema     schema of the record to create
   * @param childrenStates state for each field, keyed by field name
   */
  def generateNodeState(rootSchema: Schema, childrenStates: Map[String, State[ImmutableRandom, AvroNode[_]]]) = {
    State[ImmutableRandom, AvroNode[_]](rand => {
      val nativeRecord = new GenericData.Record(rootSchema)
      // invertStatesMap runs the child states from `rand` and returns the resulting generator plus the nodes.
      val (rand2, childNodes) = childrenStates.invertStatesMap(rand)
      childNodes.foreach {
        case (fieldName, node) => nativeRecord.put(fieldName, node.value)
      }
      (rand2, AvroRecord(nativeRecord))
    })
  }
}
开发者ID:gpulse,项目名称:eventgenerator,代码行数:59,代码来源:AvroTreeBuilder.scala
示例3: AvroDeserializerTest
//设置package包名称以及导入依赖的类
package io.eels.component.avro
import com.typesafe.config.ConfigFactory
import io.eels.Row
import io.eels.schema._
import org.apache.avro.generic.GenericData
import org.scalatest.{Matchers, WordSpec}
/**
 * Specs for `AvroDeserializer.toRow`, which converts an Avro
 * `GenericData.Record` into an eel `Row`.
 */
class AvroDeserializerTest extends WordSpec with Matchers {

  // Parsed when the spec is constructed; the cases below do not reference it.
  private val config = ConfigFactory.parseString(""" eel.avro.fillMissingValues = true """)

  "toRow" should {

    "create eel row from supplied avro record" in {
      val structType = StructType(
        Field("a", nullable = false),
        Field("b", nullable = false),
        Field("c", nullable = false)
      )
      val avroRecord = new GenericData.Record(AvroSchemaFns.toAvroSchema(structType))
      avroRecord.put("a", "aaaa")
      avroRecord.put("b", "bbbb")
      avroRecord.put("c", "cccc")

      val converted = new AvroDeserializer(true).toRow(avroRecord)

      // The row must carry the original struct type and the three values in field order.
      converted.schema shouldBe structType
      converted shouldBe Row(structType, "aaaa", "bbbb", "cccc")
    }

    "support arrays" in {
      val structType = StructType(Field("a"), Field("b", ArrayType(BooleanType)))
      val avroRecord = new GenericData.Record(AvroSchemaFns.toAvroSchema(structType))
      avroRecord.put("a", "aaaa")
      avroRecord.put("b", Array(true, false))

      val firstValue = new AvroDeserializer().toRow(avroRecord).values.head
      firstValue shouldBe "aaaa"

      val lastValue = new AvroDeserializer().toRow(avroRecord).values.last
      lastValue.asInstanceOf[Array[Boolean]].toList shouldBe List(true, false)
    }
  }
}
开发者ID:51zero,项目名称:eel-sdk,代码行数:34,代码来源:AvroDeserializerTest.scala
示例4: newEventHeader
//设置package包名称以及导入依赖的类
package com.pragmasoft.eventaggregator.support
import org.apache.avro.generic.{GenericData, GenericRecord}
/**
 * Test fixture providing ready-made Avro records built from the schemas
 * supplied by `RecordEventsSchemas`.
 */
trait GenericRecordEventFixture extends RecordEventsSchemas {

  /** Header with id "ID", no correlation id, and the current time as its timestamp. */
  def newEventHeader: GenericRecord =
    newEventHeader("ID", None, System.currentTimeMillis())

  /**
   * Builds an event header record.
   *
   * @param id            value for the "id" field
   * @param correlationId value for the "correlationId" field; `None` is stored as null
   * @param eventTs       value for the "eventTs" field
   */
  def newEventHeader(id: String, correlationId: Option[String], eventTs: Long): GenericRecord = {
    val eventHeader = new GenericData.Record(EventHeaderSchema)
    eventHeader.put("id", id)
    eventHeader.put("correlationId", correlationId.orNull)
    eventHeader.put("eventTs", eventTs)
    eventHeader
  }

  /** Profile-created record whose header is a flat event header. */
  lazy val record: GenericRecord = {
    val flatHeader = newEventHeader("RECORD_ID", Some("CORRELATION_ID"), 100L)

    val profileCreated = new GenericData.Record(ProfileCreatedSchema)
    profileCreated.put("header", flatHeader)
    profileCreated.put("userId", "UserID")
    profileCreated.put("firstName", "Stefano")
    profileCreated.put("lastName", "Galarraga")
    profileCreated.put("username", "stefano.galarraga")
    profileCreated
  }

  /** Profile-created record whose header nests the id and correlation id one level deeper, under "subHeader". */
  lazy val recordWithMoreNestedHeader: GenericRecord = {
    val innerHeader = new GenericData.Record(SubHeaderSchema)
    innerHeader.put("id", "RECORD_ID")
    innerHeader.put("correlationId", "CORRELATION_ID")

    val outerHeader = new GenericData.Record(NestedHeaderSchema)
    outerHeader.put("subHeader", innerHeader)
    outerHeader.put("eventTs", 100L)

    val profileCreated = new GenericData.Record(ProfileCreatedSchema)
    profileCreated.put("header", outerHeader)
    profileCreated.put("userId", "UserID")
    profileCreated.put("username", "Stefano")
    profileCreated.put("description", "NewUser")
    profileCreated
  }
}
开发者ID:galarragas,项目名称:event-aggregator,代码行数:49,代码来源:GenericRecordEventFixture.scala
示例5: Main
//设置package包名称以及导入依赖的类
package com.github.dnvriend
import akka.actor.ActorSystem
import org.apache.avro.Schema
import org.apache.avro.generic.GenericData
import org.apache.avro.generic.GenericData.Record
import org.apache.kafka.clients.producer.{ KafkaProducer, ProducerRecord }
import scala.concurrent.Future
import scala.concurrent._
/**
 * Sends a single Avro record to the Kafka topic "topic1" and then shuts the
 * actor system down.
 */
object Main extends App {
  val system = ActorSystem()
  // The actor system's dispatcher runs the futures below.
  implicit val ec = system.dispatcher
  val producer = new KafkaProducer[String, Record](KafkaConfig.config)
  val key = "key1"
  val userSchema =
    """
      |{
      | "type":"record",
      | "name":"myrecord",
      | "fields": [
      | {"name": "f1", "type":"string"}
      | ]
      |}
    """.stripMargin
  val parser = new Schema.Parser()
  val schema = parser.parse(userSchema)
  val avroRecord: Record = new GenericData.Record(schema)
  avroRecord.put("f1", "value1")
  val record = new ProducerRecord("topic1", key, avroRecord)

  (for {
    _ <- Future(blocking {
      // Wait for the broker's acknowledgement, then release the producer
      // whether or not the send succeeded.
      try producer.send(record).get
      finally producer.close()
    })
    _ <- system.terminate()
  } yield ()).recoverWith {
    // Log the failure and still stop the actor system.
    case scala.util.control.NonFatal(t) =>
      t.printStackTrace()
      system.terminate()
  }
}
开发者ID:dnvriend,项目名称:kafka-streams-test,代码行数:42,代码来源:Main.scala
示例6: BasicTest
//设置package包名称以及导入依赖的类
package com.landoop.kafka.testing
import org.apache.avro.Schema
import org.apache.avro.generic.GenericData
import org.apache.kafka.clients.producer.ProducerRecord
/**
 * Round-trip specs: values sent through the producer must come back unchanged
 * from the consumer.
 */
class BasicTest extends ClusterTestingCapabilities {

  // Builds a `User` record with its single string field "name" set to "testUser".
  private def createAvroRecord = {
    val userSchemaJson =
      """{"namespace": "example.avro", "type": "record", "name": "User","fields": [{"name": "name", "type": "string"}]}"""
    val userSchema = (new Schema.Parser).parse(userSchemaJson)
    val user = new GenericData.Record(userSchema)
    user.put("name", "testUser")
    user
  }

  "KCluster" should {

    "start up and be able to handle avro records being sent " in {
      val topic = "testAvro"
      val user = createAvroRecord
      val payloads = Array[AnyRef](user)

      val senderProps = stringAvroProducerProps
      val sender = createProducer[String,Any](senderProps)
      payloads.foreach { payload =>
        val outgoing = new ProducerRecord[String, Any](topic, payload)
        sender.send(outgoing)
      }

      val receiverProps = stringAvroConsumerProps()
      val receiver = createStringAvroConsumer(receiverProps)
      val received = consumeStringAvro(receiver, topic, payloads.length)

      payloads.toSeq shouldBe received
    }

    "handle the avro new producer" in {
      val topic = "testAvro"
      val user = createAvroRecord
      // A record followed by one value of each primitive-like kind, including raw bytes.
      val payloads = Array[Any](user, true, 130, 345L, 1.23f, 2.34d, "abc", "def".getBytes)

      val senderProps = stringAvroProducerProps
      val sender = createProducer[String,Any](senderProps)
      payloads.foreach { payload =>
        sender.send(new ProducerRecord[String, Any](topic, payload))
      }

      val receiverProps = stringAvroConsumerProps()
      val receiver = createStringAvroConsumer(receiverProps)
      val received = consumeStringAvro(receiver, topic, payloads.length)

      // `deep` compares nested arrays (the byte array) by content.
      payloads.deep shouldBe received.toArray.deep
    }
  }
}
开发者ID:Landoop,项目名称:kafka-testing,代码行数:53,代码来源:BasicTest.scala
示例7: BasicTest
//设置package包名称以及导入依赖的类
package com.landoop.kafka.ws.cluster
import org.apache.avro.Schema
import org.apache.avro.generic.GenericData
import org.apache.kafka.clients.producer.ProducerRecord
/**
 * Round-trip specs: values sent through the producer must come back unchanged
 * from the consumer.
 */
class BasicTest extends ClusterTestingCapabilities {

  // Builds a `User` record with its single string field "name" set to "testUser".
  private def createAvroRecord = {
    val userSchemaJson =
      """{"namespace": "example.avro", "type": "record", "name": "User","fields": [{"name": "name", "type": "string"}]}"""
    val userSchema = (new Schema.Parser).parse(userSchemaJson)
    val user = new GenericData.Record(userSchema)
    user.put("name", "testUser")
    user
  }

  "KCluster" should {

    "start up and be able to handle avro records being sent " in {
      val topic = "testAvro"
      val user = createAvroRecord
      val payloads = Array[AnyRef](user)

      val senderProps = stringAvroProducerProps
      val sender = createProducer(senderProps)
      payloads.foreach { payload =>
        val outgoing = new ProducerRecord[String, Any](topic, payload)
        sender.send(outgoing)
      }

      val receiverProps = stringAvroConsumerProps()
      val receiver = createStringAvroConsumer(receiverProps)
      val received = consumeStringAvro(receiver, topic, payloads.length)

      payloads.toSeq shouldBe received
    }

    "handle the avro new producer" in {
      val topic = "testAvro"
      val user = createAvroRecord
      // A record followed by one value of each primitive-like kind, including raw bytes.
      val payloads = Array[Any](user, true, 130, 345L, 1.23f, 2.34d, "abc", "def".getBytes)

      val senderProps = stringAvroProducerProps
      val sender = createProducer(senderProps)
      payloads.foreach { payload =>
        sender.send(new ProducerRecord[String, Any](topic, payload))
      }

      val receiverProps = stringAvroConsumerProps()
      val receiver = createStringAvroConsumer(receiverProps)
      val received = consumeStringAvro(receiver, topic, payloads.length)

      // `deep` compares nested arrays (the byte array) by content.
      payloads.deep shouldBe received.toArray.deep
    }
  }
}
开发者ID:Landoop,项目名称:kafka-ws,代码行数:53,代码来源:BasicTest.scala
示例8: BasicTest
//设置package包名称以及导入依赖的类
package com.landoop.kstreams.sql.cluster
import org.apache.avro.Schema
import org.apache.avro.generic.GenericData
import org.apache.kafka.clients.producer.ProducerRecord
/**
 * Round-trip specs: values sent through the producer must come back unchanged
 * from the consumer.
 */
class BasicTest extends ClusterTestingCapabilities {

  // Builds a `User` record with its single string field "name" set to "testUser".
  private def createAvroRecord = {
    val userSchemaJson =
      """{"namespace": "example.avro", "type": "record", "name": "User","fields": [{"name": "name", "type": "string"}]}"""
    val userSchema = (new Schema.Parser).parse(userSchemaJson)
    val user = new GenericData.Record(userSchema)
    user.put("name", "testUser")
    user
  }

  "KCluster" should {

    "start up and be able to handle avro records being sent " in {
      val topic = "testAvro"
      val user = createAvroRecord
      val payloads = Array[AnyRef](user)

      val senderProps = stringAvroProducerProps
      val sender = createProducer(senderProps)
      payloads.foreach { payload =>
        val outgoing = new ProducerRecord[String, Any](topic, payload)
        sender.send(outgoing)
      }

      val receiverProps = stringAvroConsumerProps()
      val receiver = createStringAvroConsumer(receiverProps)
      val received = consumeStringAvro(receiver, topic, payloads.length)

      payloads.toSeq shouldBe received
    }

    "handle the avro new producer" in {
      val topic = "testAvro"
      val user = createAvroRecord
      // A record followed by one value of each primitive-like kind, including raw bytes.
      val payloads = Array[Any](user, true, 130, 345L, 1.23f, 2.34d, "abc", "def".getBytes)

      val senderProps = stringAvroProducerProps
      val sender = createProducer(senderProps)
      payloads.foreach { payload =>
        sender.send(new ProducerRecord[String, Any](topic, payload))
      }

      val receiverProps = stringAvroConsumerProps()
      val receiver = createStringAvroConsumer(receiverProps)
      val received = consumeStringAvro(receiver, topic, payloads.length)

      // `deep` compares nested arrays (the byte array) by content.
      payloads.deep shouldBe received.toArray.deep
    }
  }
}
开发者ID:Landoop,项目名称:kstreams-sql-transform,代码行数:53,代码来源:BasicTest.scala
注:本文中的org.apache.avro.generic.GenericData类示例整理自GitHub/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。
请发表评论