• 设为首页
  • 点击收藏
  • 手机版
    手机扫一扫访问
    迪恩网络手机版
  • 关注官方公众号
    微信扫一扫关注
    公众号

Scala GenericData类代码示例

原作者: [db:作者] 来自: [db:来源] 收藏 邀请

本文整理汇总了 Scala 中 org.apache.avro.generic.GenericData 的典型用法代码示例。如果您正苦于以下问题:Scala GenericData 类的具体用法?Scala GenericData 怎么用?Scala GenericData 使用的例子?那么恭喜您,这里精选的类代码示例或许可以为您提供帮助。



在下文中一共展示了 GenericData 类的 8 个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的 Scala 代码示例。

示例1: Tip

//设置package包名称以及导入依赖的类
package com.alvin.niagara.model

import java.io.ByteArrayOutputStream
import java.util

import org.apache.avro.Schema
import org.apache.avro.generic.{GenericData, GenericDatumReader, GenericDatumWriter, GenericRecord}
import org.apache.avro.io.{DecoderFactory, EncoderFactory}

import scala.collection.JavaConversions._
import scala.io.Source


/**
  * Immutable value object carrying the six fields of a tip.
  *
  * @param business_id identifier of the business, kept as text
  * @param date        date of the tip, kept as text (format not enforced here)
  * @param likes       like counter
  * @param text        body of the tip
  * @param `type`      record type tag, kept as text
  * @param user_id     identifier of the user, kept as text
  */
case class Tip(
  business_id: String,
  date: String,
  likes: Long,
  text: String,
  `type`: String,
  user_id: String
)


/**
  * Avro binary serialization and deserialization for [[Tip]].
  *
  * The schema text is loaded from the classpath resource `/schema/tip.avsc` when this object
  * is first initialised; the parsed schema, reader and writer are then reused for every call.
  */
object TipSerde {

  /** Classpath location of the Avro schema definition. */
  private val SchemaResource = "/schema/tip.avsc"

  /** Raw schema JSON. The underlying stream is closed as soon as the text has been read. */
  val avroSchema: String = {
    // getResourceAsStream returns null for a missing resource; fail with a clear message
    // instead of an opaque NullPointerException during object initialisation.
    val stream = Option(getClass.getResourceAsStream(SchemaResource)).getOrElse(
      throw new IllegalStateException(s"Avro schema resource not found on classpath: $SchemaResource")
    )
    val source = Source.fromInputStream(stream)
    try source.mkString
    finally source.close()
  }

  /** Parsed form of [[avroSchema]]. */
  val schema: Schema = new Schema.Parser().parse(avroSchema)

  val reader: GenericDatumReader[GenericRecord] = new GenericDatumReader[GenericRecord](schema)
  val writer: GenericDatumWriter[GenericRecord] = new GenericDatumWriter[GenericRecord](schema)

  /**
    * Encodes a tip with the Avro binary encoder.
    *
    * @param tip the value to encode
    * @return the Avro binary encoding of `tip` under [[schema]]
    */
  def serialize(tip: Tip): Array[Byte] = {
    // An in-memory stream holds no external resource, so it needs no explicit close.
    val out = new ByteArrayOutputStream()
    // `null` means "do not reuse an existing encoder instance".
    val encoder = EncoderFactory.get.binaryEncoder(out, null)

    val avroRecord = new GenericData.Record(schema)
    avroRecord.put("business_id", tip.business_id)
    avroRecord.put("date", tip.date)
    avroRecord.put("likes", tip.likes)
    avroRecord.put("text", tip.text)
    avroRecord.put("type", tip.`type`)
    avroRecord.put("user_id", tip.user_id)

    writer.write(avroRecord, encoder)
    // The binary encoder buffers internally; flush before reading the bytes back.
    encoder.flush()
    out.toByteArray
  }

  /**
    * Decodes bytes produced by [[serialize]] back into a [[Tip]].
    *
    * @param bytes Avro binary payload written with [[schema]]
    * @return the decoded tip
    */
  def deserialize(bytes: Array[Byte]): Tip = {
    // `null` arguments mean "do not reuse an existing decoder / record instance".
    val decoder = DecoderFactory.get.binaryDecoder(bytes, null)
    val record = reader.read(null, decoder)

    Tip(
      // Text fields go through toString rather than a cast to String.
      // NOTE(review): presumably because Avro hands back its own CharSequence type — confirm.
      record.get("business_id").toString,
      record.get("date").toString,
      record.get("likes").asInstanceOf[Long],
      record.get("text").toString,
      record.get("type").toString,
      record.get("user_id").toString
    )
  }
}
开发者ID:AlvinCJin,项目名称:Niagara,代码行数:60,代码来源:Tip.scala


示例2: AvroTreeBuilder

//设置package包名称以及导入依赖的类
package eventgen.launcher.core.avro

import eventgen.launcher.core.PrimitiveGenerators._
import eventgen.launcher.core._
import org.apache.avro.Schema
import org.apache.avro.Schema.{Field, Type}

import scala.collection.JavaConversions._
import scalaz._
import Scalaz._
import org.apache.avro.generic.GenericData


/**
  * [[TreeBuilder]] for Avro schemas: turns a record schema into a `State` over
  * `ImmutableRandom` that, when run, produces an `AvroNode` wrapping a populated
  * `GenericData.Record`.
  */
class AvroTreeBuilder extends TreeBuilder[Schema, AvroNode[_]] {

  // Matches generator specs such as `Range[Int](from = 1, to = 10)`. Held as a member so the
  // regex is compiled once per builder rather than on every field lookup.
  private val RangePattern = "Range\\[(Double|Int)\\]\\(from = ([-0-9]+), to = ([-0-9]+)\\)".r

  /** Returns the state held by `extGenerator`; `schema` is not consulted. */
  def getCustomFieldState(schema: Schema, extGenerator: ExternalGenerator[_]): State[ImmutableRandom, _] = extGenerator.get

  /**
    * Builds a field state from the implicit range generator for `T`.
    *
    * @param from lower bound handed to the range generator
    * @param to   upper bound handed to the range generator
    * @return a state whose result is the generated value wrapped in an `AvroField`
    */
  def getRangeFieldState[T](from: Int, to: Int)(implicit rangeGen: RangeGenerator[T]): State[ImmutableRandom, AvroNode[_]] = {
    rangeGen.generate(from, to).map(AvroField[T](_))
  }

  /**
    * Resolves the generator for a single field from its `generator` schema property.
    *
    * A value matching the `Range[Double|Int](from = a, to = b)` form (with both bounds accepted
    * by the `Int` extractor) yields a range generator; any other value is treated as a name and
    * looked up in `context.generators`.
    *
    * @param f       the Avro field to resolve
    * @param context source of the named external generators
    * @return the field state on the right, or an error message on the left when the named
    *         generator is not registered
    */
  def getFieldState(f: Field, context: ExecutionContext): String \/ State[ImmutableRandom, AvroNode[_]] = {
    f.getProp("generator") match {
      case RangePattern(typeParam, Int(from), Int(to)) => typeParam match {
        // The regex only admits these two type names, so the inner match cannot fall through.
        case "Double" => \/-(getRangeFieldState[Double](from, to))
        case "Int" => \/-(getRangeFieldState[Int](from, to))
      }
      case name => context.generators.get(name) match {
        case Some(extGenerator) => \/-(extGenerator.get.map(AvroField(_)))
        case None => -\/(s"Cannot find generator $name")
      }
    }
  }

  /**
    * Builds the state for a whole record schema, recursing into fields that are records.
    *
    * @param rootSchema       record schema whose fields are resolved
    * @param executionContext source of the named external generators
    * @return the record state on the right, or a left when a field could not be resolved
    */
  override def buildTree(rootSchema: Schema, executionContext: ExecutionContext): String \/ State[ImmutableRandom, AvroNode[_]] = {
    val fields = rootSchema.getFields.toList
    val fieldStates = fields.map(f => {
      if (f.schema().getType == Type.RECORD)
        buildTree(f.schema(), executionContext).map((f.name(), _))
      else
        getFieldState(f, executionContext).map((f.name(), _))
    })

    // sequenceU collapses the per-field disjunctions into a single disjunction of a list.
    for (childrenMap <- fieldStates.sequenceU) yield generateNodeState(rootSchema, childrenMap.toMap)
  }

  /**
    * Combines per-field states into one state that fills a fresh `GenericData.Record`.
    *
    * @param rootSchema     schema of the record to create
    * @param childrenStates field name to field state
    * @return a state producing an `AvroRecord` around the populated record
    */
  def generateNodeState(rootSchema: Schema, childrenStates: Map[String, State[ImmutableRandom, AvroNode[_]]]) = {
    State[ImmutableRandom, AvroNode[_]](rand => {
      val nativeRecord = new GenericData.Record(rootSchema)
      // NOTE(review): invertStatesMap is defined outside this file; presumably it runs every
      // child state against the random source and returns the final source — confirm.
      val (rand2, childNodes) = childrenStates.invertStatesMap(rand)
      childNodes.foreach {
        case (fieldName, node) => nativeRecord.put(fieldName, node.value)
      }
      (rand2, AvroRecord(nativeRecord))
    })
  }
}
开发者ID:gpulse,项目名称:eventgenerator,代码行数:59,代码来源:AvroTreeBuilder.scala


示例3: AvroDeserializerTest

//设置package包名称以及导入依赖的类
package io.eels.component.avro

import com.typesafe.config.ConfigFactory
import io.eels.Row
import io.eels.schema._
import org.apache.avro.generic.GenericData
import org.scalatest.{Matchers, WordSpec}

/** Specification for `AvroDeserializer.toRow`: one case for plain fields, one for array fields. */
class AvroDeserializerTest extends WordSpec with Matchers {

  // Parsed here but not referenced by any case below.
  private val config = ConfigFactory.parseString(""" eel.avro.fillMissingValues = true """)

  "toRow" should {
    "create eel row from supplied avro record" in {
      val structType = StructType(
        Field("a", nullable = false),
        Field("b", nullable = false),
        Field("c", nullable = false)
      )
      val avroRecord = new GenericData.Record(AvroSchemaFns.toAvroSchema(structType))
      // Populate the three fields in declaration order.
      Seq("a" -> "aaaa", "b" -> "bbbb", "c" -> "cccc").foreach {
        case (fieldName, fieldValue) => avroRecord.put(fieldName, fieldValue)
      }

      val eelRow = new AvroDeserializer(true).toRow(avroRecord)

      eelRow.schema shouldBe structType
      eelRow shouldBe Row(structType, "aaaa", "bbbb", "cccc")
    }
    "support arrays" in {
      val structType = StructType(Field("a"), Field("b", ArrayType(BooleanType)))
      val avroRecord = new GenericData.Record(AvroSchemaFns.toAvroSchema(structType))
      avroRecord.put("a", "aaaa")
      avroRecord.put("b", Array(true, false))

      // Each assertion deserializes the record afresh with its own deserializer instance.
      val firstValue = new AvroDeserializer().toRow(avroRecord).values.head
      firstValue shouldBe "aaaa"

      val lastValue = new AvroDeserializer().toRow(avroRecord).values.last
      lastValue.asInstanceOf[Array[Boolean]].toList shouldBe List(true, false)
    }
  }
}
开发者ID:51zero,项目名称:eel-sdk,代码行数:34,代码来源:AvroDeserializerTest.scala


示例4: newEventHeader

//设置package包名称以及导入依赖的类
package com.pragmasoft.eventaggregator.support

import org.apache.avro.generic.{GenericData, GenericRecord}

/**
  * Test fixture supplying ready-made Avro generic records built against the schemas
  * provided by `RecordEventsSchemas`.
  */
trait GenericRecordEventFixture extends RecordEventsSchemas {

  /** Header with id "ID", no correlation id, and the current wall-clock time as timestamp. */
  def newEventHeader : GenericRecord =
    newEventHeader(id = "ID", correlationId = None, eventTs = System.currentTimeMillis())

  /**
    * Builds an event header record.
    *
    * @param id            value for the `id` field
    * @param correlationId value for the `correlationId` field; `None` is stored as null
    * @param eventTs       value for the `eventTs` field
    */
  def newEventHeader(id: String, correlationId: Option[String], eventTs: Long) : GenericRecord = {
    val eventHeader = new GenericData.Record(EventHeaderSchema)
    val correlationValue: String = correlationId.orNull
    val fieldValues = Seq[(String, Any)](
      "id" -> id,
      "correlationId" -> correlationValue,
      "eventTs" -> eventTs
    )
    fieldValues.foreach { case (fieldName, fieldValue) => eventHeader.put(fieldName, fieldValue) }
    eventHeader
  }

  /** Profile-created record with a flat header and four user attributes. */
  lazy val record: GenericRecord = {
    val profileCreated = new GenericData.Record(ProfileCreatedSchema)
    profileCreated.put("header", newEventHeader("RECORD_ID", Some("CORRELATION_ID"), 100L))

    Seq(
      "userId" -> "UserID",
      "firstName" -> "Stefano",
      "lastName" -> "Galarraga",
      "username" -> "stefano.galarraga"
    ).foreach { case (fieldName, fieldValue) => profileCreated.put(fieldName, fieldValue) }

    profileCreated
  }

  /** Profile-created record whose header wraps id and correlation id in a sub-header. */
  lazy val recordWithMoreNestedHeader : GenericRecord = {
    val innerHeader = new GenericData.Record(SubHeaderSchema)
    Seq("id" -> "RECORD_ID", "correlationId" -> "CORRELATION_ID").foreach {
      case (fieldName, fieldValue) => innerHeader.put(fieldName, fieldValue)
    }

    val outerHeader = new GenericData.Record(NestedHeaderSchema)
    outerHeader.put("subHeader", innerHeader)
    outerHeader.put("eventTs", 100L)

    val profileCreated = new GenericData.Record(ProfileCreatedSchema)
    profileCreated.put("header", outerHeader)
    Seq(
      "userId" -> "UserID",
      "username" -> "Stefano",
      "description" -> "NewUser"
    ).foreach { case (fieldName, fieldValue) => profileCreated.put(fieldName, fieldValue) }

    profileCreated
  }
}
开发者ID:galarragas,项目名称:event-aggregator,代码行数:49,代码来源:GenericRecordEventFixture.scala


示例5: Main

//设置package包名称以及导入依赖的类
package com.github.dnvriend

import akka.actor.ActorSystem
import org.apache.avro.Schema
import org.apache.avro.generic.GenericData
import org.apache.avro.generic.GenericData.Record
import org.apache.kafka.clients.producer.{ KafkaProducer, ProducerRecord }

import scala.concurrent.Future
import scala.concurrent._

/**
  * Sends a single Avro generic record to the `topic1` Kafka topic, then terminates the
  * actor system, whether or not the send succeeded.
  */
object Main extends App {
  val system = ActorSystem()
  // Explicit type keeps implicit resolution predictable; this is the type the dispatcher has.
  implicit val ec: ExecutionContextExecutor = system.dispatcher
  val producer = new KafkaProducer[String, Record](KafkaConfig.config)
  val key = "key1"
  val userSchema =
    """
      |{
      |  "type":"record",
      |  "name":"myrecord",
      |  "fields": [
      |    {"name": "f1", "type":"string"}
      |  ]
      |}
    """.stripMargin

  val parser = new Schema.Parser()
  val schema = parser.parse(userSchema)
  val avroRecord: Record = new GenericData.Record(schema)
  avroRecord.put("f1", "value1")
  val record = new ProducerRecord("topic1", key, avroRecord)
  (for {
    // Wait for the broker acknowledgement inside `blocking`, and always close the producer
    // afterwards so it is released on success and on failure.
    _ <- Future(blocking {
      try producer.send(record).get
      finally producer.close()
    })
    _ <- system.terminate()
  } yield ()).recoverWith {
    case t: Throwable =>
      t.printStackTrace()
      system.terminate()
  }
}
开发者ID:dnvriend,项目名称:kafka-streams-test,代码行数:42,代码来源:Main.scala


示例6: BasicTest

//设置package包名称以及导入依赖的类
package com.landoop.kafka.testing

import org.apache.avro.Schema
import org.apache.avro.generic.GenericData
import org.apache.kafka.clients.producer.ProducerRecord

/** Produces values to the test cluster and checks that the same values are consumed back. */
class BasicTest extends ClusterTestingCapabilities {

  // Builds a one-field `User` record (name = "testUser") from an inline Avro schema.
  private def createAvroRecord = {
    val schemaJson =
      """{"namespace": "example.avro", "type": "record", "name": "User","fields": [{"name": "name", "type": "string"}]}"""
    val userRecord = new GenericData.Record(new Schema.Parser().parse(schemaJson))
    userRecord.put("name", "testUser")
    userRecord
  }

  "KCluster" should {
    "start up and be able to handle avro records being sent " in {
      val topicName = "testAvro"
      val payloads = Array[AnyRef](createAvroRecord)
      val senderProps = stringAvroProducerProps
      val sender = createProducer[String,Any](senderProps)

      payloads.foreach { payload =>
        sender.send(new ProducerRecord[String, Any](topicName, payload))
      }

      val receiverProps = stringAvroConsumerProps()
      val receiver = createStringAvroConsumer(receiverProps)
      val consumed = consumeStringAvro(receiver, topicName, payloads.length)
      payloads.toSeq shouldBe consumed
    }

    "handle the avro new producer" in {
      val topicName = "testAvro"
      // One record followed by a value of each supported primitive-like kind.
      val payloads = Array[Any](createAvroRecord, true, 130, 345L, 1.23f, 2.34d, "abc", "def".getBytes)
      val senderProps = stringAvroProducerProps
      val sender = createProducer[String,Any](senderProps)

      payloads.foreach(payload => sender.send(new ProducerRecord[String, Any](topicName, payload)))

      val receiverProps = stringAvroConsumerProps()
      val receiver = createStringAvroConsumer(receiverProps)
      val consumed = consumeStringAvro(receiver, topicName, payloads.length)
      // `deep` compares nested arrays (the byte payload) by content rather than by reference.
      payloads.deep shouldBe consumed.toArray.deep
    }
  }

}
开发者ID:Landoop,项目名称:kafka-testing,代码行数:53,代码来源:BasicTest.scala


示例7: BasicTest

//设置package包名称以及导入依赖的类
package com.landoop.kafka.ws.cluster

import org.apache.avro.Schema
import org.apache.avro.generic.GenericData
import org.apache.kafka.clients.producer.ProducerRecord

/** Produces values to the test cluster and checks that the same values are consumed back. */
class BasicTest extends ClusterTestingCapabilities {

  // Builds a one-field `User` record (name = "testUser") from an inline Avro schema.
  private def createAvroRecord = {
    val schemaJson =
      """{"namespace": "example.avro", "type": "record", "name": "User","fields": [{"name": "name", "type": "string"}]}"""
    val userRecord = new GenericData.Record(new Schema.Parser().parse(schemaJson))
    userRecord.put("name", "testUser")
    userRecord
  }

  "KCluster" should {
    "start up and be able to handle avro records being sent " in {
      val topicName = "testAvro"
      val payloads = Array[AnyRef](createAvroRecord)
      val senderProps = stringAvroProducerProps
      val sender = createProducer(senderProps)

      payloads.foreach { payload =>
        sender.send(new ProducerRecord[String, Any](topicName, payload))
      }

      val receiverProps = stringAvroConsumerProps()
      val receiver = createStringAvroConsumer(receiverProps)
      val consumed = consumeStringAvro(receiver, topicName, payloads.length)
      payloads.toSeq shouldBe consumed
    }

    "handle the avro new producer" in {
      val topicName = "testAvro"
      // One record followed by a value of each supported primitive-like kind.
      val payloads = Array[Any](createAvroRecord, true, 130, 345L, 1.23f, 2.34d, "abc", "def".getBytes)
      val senderProps = stringAvroProducerProps
      val sender = createProducer(senderProps)

      payloads.foreach(payload => sender.send(new ProducerRecord[String, Any](topicName, payload)))

      val receiverProps = stringAvroConsumerProps()
      val receiver = createStringAvroConsumer(receiverProps)
      val consumed = consumeStringAvro(receiver, topicName, payloads.length)
      // `deep` compares nested arrays (the byte payload) by content rather than by reference.
      payloads.deep shouldBe consumed.toArray.deep
    }
  }

}
开发者ID:Landoop,项目名称:kafka-ws,代码行数:53,代码来源:BasicTest.scala


示例8: BasicTest

//设置package包名称以及导入依赖的类
package com.landoop.kstreams.sql.cluster

import org.apache.avro.Schema
import org.apache.avro.generic.GenericData
import org.apache.kafka.clients.producer.ProducerRecord

/** Produces values to the test cluster and checks that the same values are consumed back. */
class BasicTest extends ClusterTestingCapabilities {

  // Builds a one-field `User` record (name = "testUser") from an inline Avro schema.
  private def createAvroRecord = {
    val schemaJson =
      """{"namespace": "example.avro", "type": "record", "name": "User","fields": [{"name": "name", "type": "string"}]}"""
    val userRecord = new GenericData.Record(new Schema.Parser().parse(schemaJson))
    userRecord.put("name", "testUser")
    userRecord
  }

  "KCluster" should {
    "start up and be able to handle avro records being sent " in {
      val topicName = "testAvro"
      val payloads = Array[AnyRef](createAvroRecord)
      val senderProps = stringAvroProducerProps
      val sender = createProducer(senderProps)

      payloads.foreach { payload =>
        sender.send(new ProducerRecord[String, Any](topicName, payload))
      }

      val receiverProps = stringAvroConsumerProps()
      val receiver = createStringAvroConsumer(receiverProps)
      val consumed = consumeStringAvro(receiver, topicName, payloads.length)
      payloads.toSeq shouldBe consumed
    }

    "handle the avro new producer" in {
      val topicName = "testAvro"
      // One record followed by a value of each supported primitive-like kind.
      val payloads = Array[Any](createAvroRecord, true, 130, 345L, 1.23f, 2.34d, "abc", "def".getBytes)
      val senderProps = stringAvroProducerProps
      val sender = createProducer(senderProps)

      payloads.foreach(payload => sender.send(new ProducerRecord[String, Any](topicName, payload)))

      val receiverProps = stringAvroConsumerProps()
      val receiver = createStringAvroConsumer(receiverProps)
      val consumed = consumeStringAvro(receiver, topicName, payloads.length)
      // `deep` compares nested arrays (the byte payload) by content rather than by reference.
      payloads.deep shouldBe consumed.toArray.deep
    }
  }

}
开发者ID:Landoop,项目名称:kstreams-sql-transform,代码行数:53,代码来源:BasicTest.scala



注:本文中的 org.apache.avro.generic.GenericData 类示例整理自 GitHub/MSDocs 等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的 License;未经允许,请勿转载。


鲜花

握手

雷人

路过

鸡蛋
该文章已有0人参与评论

请发表评论

全部评论

专题导读
上一篇:
Scala ServletHolder类代码示例发布时间:2022-05-23
下一篇:
Scala EOFException类代码示例发布时间:2022-05-23
热门推荐
热门话题
阅读排行榜

扫描微信二维码

查看手机版网站

随时了解更新最新资讯

139-2527-9053

在线客服(服务时间 9:00~18:00)

在线QQ客服
地址:深圳市南山区西丽大学城创智工业园
电邮:jeky_zhao#qq.com
移动电话:139-2527-9053

Powered by 互联科技 X3.4© 2001-2213 极客世界.|Sitemap