• 设为首页
  • 点击收藏
  • 手机版
    手机扫一扫访问
    迪恩网络手机版
  • 关注官方公众号
    微信扫一扫关注
    公众号

Scala BytesWritable类代码示例

原作者: [db:作者] 来自: [db:来源] 收藏 邀请

本文整理汇总了Scala中org.apache.hadoop.io.BytesWritable的典型用法代码示例。如果您正苦于以下问题:Scala BytesWritable类的具体用法?Scala BytesWritable怎么用?Scala BytesWritable使用的例子?那么恭喜您, 这里精选的类代码示例或许可以为您提供帮助。



在下文中一共展示了BytesWritable类的8个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Scala代码示例。

示例1: Utils

//设置package包名称以及导入依赖的类
package com.larry.da.jobs.idmap

import org.apache.commons.io.output.ByteArrayOutputStream
import org.apache.hadoop.io.{NullWritable, BytesWritable}
import org.apache.spark.rdd.RDD
import org.apache.spark.serializer.KryoSerializer

import scala.reflect.ClassTag


/**
  * Helpers for the id-mapping jobs: a compact base-32 text encoding for 64-bit ids and a
  * Kryo-based replacement for `RDD.saveAsObjectFile`.
  */
object Utils {

  /** The 32-symbol alphabet used by the id encoding (the letters G, L, O and Q are absent). */
  val keys: String = "0123456789ABCDEFHIJKMNPRSTUVWXYZ"

  /** Reverse lookup: alphabet symbol -> its 5-bit value (its position in [[keys]]). */
  val keyDic: Map[Char, Int] = keys.zipWithIndex.toMap

  /**
    * Encodes a 64-bit id as 13 base-32 symbols (most significant first) followed by a
    * trailing `"u"`.
    *
    * @param uid the id to encode; all 64 bits are used (the value is treated as unsigned)
    * @return a 14-character string
    */
  def compressAguid(uid: Long): String = {
    // 13 groups of 5 bits cover the 64 bits; group i holds bits [5*i, 5*i + 5).
    val leastSignificantFirst = (0 until 13).map(i => keys(((uid >>> (5 * i)) & 31).toInt))
    leastSignificantFirst.mkString("").reverse + "u"
  }

  /**
    * Decodes a string produced by [[compressAguid]] back into the 64-bit id.
    * Only the first 13 characters are read, so the trailing marker is ignored.
    *
    * @param uid the encoded id
    * @return the decoded value
    * @throws NoSuchElementException if `uid` is empty or contains a character outside [[keys]]
    */
  def unCompressAguid(uid: String): Long = {
    val digits = uid.take(13).map(s => keyDic(s))
    digits.tail.foldLeft(digits.head.toLong)((acc, digit) => (acc << 5) | digit)
  }

  /**
    * Saves `rdd` to `path` as a Hadoop sequence file whose values are Kryo-serialized
    * arrays of up to 10 elements each; the key of every record is `NullWritable`.
    *
    * @param rdd  the data to persist
    * @param path destination passed to `saveAsSequenceFile`
    */
  def saveAsObjectFile[T: ClassTag](rdd: RDD[T], path: String): Unit = {
    val kryoSerializer = new KryoSerializer(rdd.context.getConf)

    rdd
      .mapPartitions(iter => iter.grouped(10).map(_.toArray))
      .map { splitArray =>
        // initializes kryo and calls your registrator class
        val kryo = kryoSerializer.newKryo()

        // convert data to bytes
        val bao = new ByteArrayOutputStream()
        val output = kryoSerializer.newKryoOutput()
        output.setOutputStream(bao)
        // close() also flushes the Kryo buffer into `bao`; do it even when the write fails
        try kryo.writeClassAndObject(output, splitArray)
        finally output.close()

        // We are ignoring key field of sequence file
        (NullWritable.get(), new BytesWritable(bao.toByteArray))
      }
      .saveAsSequenceFile(path)
  }

}
开发者ID:larry88,项目名称:spark_da,代码行数:55,代码来源:Utils.scala


示例2: Converter

//设置package包名称以及导入依赖的类
package com.dataoptimo.imgprocessing.convert
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.io.Text
import org.apache.hadoop.io.BytesWritable
import org.apache.hadoop.fs.{Path, FileSystem}
import org.apache.hadoop.io.IOUtils
import org.apache.hadoop.io.SequenceFile
import java.io.IOException
import java.lang.IllegalArgumentException

/**
  * Converts files into Hadoop sequence files.
  *
  * @param conf Hadoop configuration used to obtain the file system and create the writer
  */
class Converter(conf: Configuration) {

  /**
    * Writes the file at `srcPath` into a new sequence file at `dstPath` as a single record:
    * key = the source file name (`Text`), value = the file's bytes (`BytesWritable`).
    *
    * `IOException`s and `IllegalArgumentException`s are reported on standard output and not
    * rethrown.
    *
    * @param srcPath path of the file to read
    * @param dstPath path of the sequence file to create
    */
  def imageToSequence(srcPath: String, dstPath: String): Unit = {
    try {
      val fs = FileSystem.get(conf)
      val inPath = new Path(srcPath)
      val outPath = new Path(dstPath)

      // Size the buffer from the file status: InputStream.available() is only an estimate
      // and a single read() may return fewer bytes than requested.
      val length = fs.getFileStatus(inPath).getLen
      if (length > Int.MaxValue)
        throw new IllegalArgumentException(
          s"$srcPath is too large ($length bytes) to fit in a single BytesWritable")
      val buffer = new Array[Byte](length.toInt)

      val in = fs.open(inPath)
      try in.readFully(buffer)
      finally IOUtils.closeStream(in)

      val writer = SequenceFile.createWriter(fs, conf, outPath, classOf[Text], classOf[BytesWritable])
      try writer.append(new Text(inPath.getName()), new BytesWritable(buffer))
      finally IOUtils.closeStream(writer)
    }
    catch {
      case io: IOException => println(io.getMessage)
      case illegalArgument: IllegalArgumentException => println(illegalArgument.getMessage)
    }
  }
}
开发者ID:mfawadalam,项目名称:imgprocessing,代码行数:36,代码来源:Converters.scala


示例3: HdfsFixture

//设置package包名称以及导入依赖的类
package uk.co.odinconsultants.bitcoin.uk.co.odinconsultants.bitcoin.parsing

import org.apache.hadoop.io.BytesWritable
import org.scalatest.{Matchers, WordSpec}
import org.zuinnote.hadoop.bitcoin.format.common.BitcoinBlock
import org.zuinnote.hadoop.bitcoin.format.mapreduce.BitcoinBlockFileInputFormat
import uk.co.odinconsultants.bitcoin.core.Logging
import uk.co.odinconsultants.bitcoin.hbase.HBaseMetaRetrieval
import uk.co.odinconsultants.bitcoin.hbase.HBaseSetup.{createAddressesTable, familyName, metaTable}
import uk.co.odinconsultants.bitcoin.integration.hadoop.MiniHadoopClusterRunning
import uk.co.odinconsultants.bitcoin.integration.hbase.HBaseForTesting.{admin, utility}
import uk.co.odinconsultants.bitcoin.integration.hbase.HBaseTestConfig.getConnection
import uk.co.odinconsultants.bitcoin.integration.spark.SparkForTesting.sc
import uk.co.odinconsultants.bitcoin.parsing.Indexer.{index, write}

/**
  * Shared ScalaTest behaviour that copies a blockchain file to HDFS, reads it with Spark,
  * indexes it and checks that the index was persisted in HBase.
  *
  * The self-type requires this trait to be mixed into a `WordSpec`; the concrete spec
  * supplies [[filename]].
  *
  * NOTE(review): `copyToHdfs`, `localFile`, `list`, `dir` and `conf` are not defined in this
  * trait - presumably they come from `MiniHadoopClusterRunning`; confirm.
  */
trait HdfsFixture extends MiniHadoopClusterRunning with Matchers with Logging { this: WordSpec =>

  /** Name of the blockchain file to run the checks against. */
  def filename: String

  "Copied file to HDFS" should {
    // Not inside an `in` clause, so the copy happens when this `should` block is evaluated
    // and `hdfsFile` is shared by the tests below.
    info(s"Using blockchain file: '$filename'")
    val hdfsFile = copyToHdfs(localFile(filename))

    "be possible" in {
      val files = list(dir)
      files should have size 1
    }

    // Keys are BytesWritable and values are BitcoinBlock, as dictated by the input format.
    val rdd = sc.newAPIHadoopFile(hdfsFile.toString, classOf[BitcoinBlockFileInputFormat], classOf[BytesWritable], classOf[BitcoinBlock], conf)
    "allow Spark to use it" in {
      rdd.count() should be > 0L
    }

    val outputs = index(rdd)
    "not generated dupes when indexed" in {
      outputs.count() should be > 0L
      // Count occurrences of each indexed element and keep those seen more than once.
      val dupes = outputs.map(_ -> 1).reduceByKey(_ + _).filter(_._2 > 1).collect()
      withClue(s"\n${dupes.mkString("\n")}\n") {
        dupes shouldBe empty
      }
    }

    "have its metadata persisted in HBase" in {
      createAddressesTable(admin)
      write(outputs, () => getConnection(utility.getConfiguration))

      // Read every written back-reference again and compare it with the key that was indexed.
      val reader = new HBaseMetaRetrieval(admin.getConnection.getTable(metaTable), familyName)
      outputs.collect().foreach { payload =>
        val (backReference, pubKey) = payload
        val actual                  = reader(backReference)
        withClue(s"\nWrote = $pubKey (${pubKey.mkString(",")})\nRead = $actual (${actual.mkString(",")})\n") {
          actual shouldEqual pubKey
        }
      }
    }
  }
}
开发者ID:PhillHenry,项目名称:Cryptorigin,代码行数:58,代码来源:HdfsFixture.scala


示例4: Indexer

//设置package包名称以及导入依赖的类
package uk.co.odinconsultants.bitcoin.parsing

import org.apache.hadoop.hbase.client.Connection
import org.apache.hadoop.io.BytesWritable
import org.apache.spark.rdd.RDD
import org.bitcoinj.params.MainNetParams
import org.zuinnote.hadoop.bitcoin.format.common.BitcoinBlock
import uk.co.odinconsultants.bitcoin.hbase.HBaseMetaStore
import uk.co.odinconsultants.bitcoin.hbase.HBaseSetup.{familyName, tableName}
import uk.co.odinconsultants.bitcoin.parsing.DomainOps._
import uk.co.odinconsultants.bitcoin.parsing.MetaStore.Payload

/** Turns parsed bitcoin blocks into index payloads and writes them to HBase. */
object Indexer {

  type PubKey         = Array[Byte]
  type BackReference  = (Array[Byte], Long)

  val networkParams: MainNetParams = MainNetParams.get()

  /**
    * Extracts the transactions of every block and flattens them into index payloads.
    * The `BytesWritable` key of each record is ignored.
    *
    * @param rdd (key, block) pairs as produced by the bitcoin block input format
    * @return one payload per element yielded by `toBackReferenceAddressTuples`
    */
  def index(rdd: RDD[(BytesWritable, BitcoinBlock)]): RDD[Payload] =
    rdd.flatMap { case (_, block) => toTransactions(block) }.flatMap(toBackReferenceAddressTuples)

  /**
    * Writes the payloads to the HBase table `tableName` in batches of 100, using one
    * connection per partition.
    *
    * @param rdd               payloads to persist
    * @param connectionFactory invoked once per partition (on the executor) to open a connection
    */
  def write(rdd: RDD[Payload], connectionFactory: () => Connection): Unit = {
    val batchSize = 100
    rdd.foreachPartition { iter =>
      val connection = connectionFactory()
      // Release the table and the connection even when a batch fails, so a failing task
      // does not leak HBase resources on the executor.
      try {
        val table = connection.getTable(tableName)
        try {
          val metaStore = new HBaseMetaStore(table, familyName)
          iter.grouped(batchSize).foreach { batch =>
            metaStore(batch.toList)
          }
        } finally table.close()
      } finally connection.close()
    }
  }

}
开发者ID:PhillHenry,项目名称:Cryptorigin,代码行数:39,代码来源:Indexer.scala


示例5: SequenceSource

//设置package包名称以及导入依赖的类
package io.eels.component.sequence

import java.util.concurrent.atomic.AtomicBoolean

import com.sksamuel.exts.Logging
import com.sksamuel.exts.io.Using
import io.eels._
import io.eels.datastream.{DataStream, Publisher, Subscriber, Subscription}
import io.eels.schema.StructType
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.hadoop.io.{BytesWritable, IntWritable, SequenceFile}

/**
  * A [[Source]] backed by the Hadoop sequence file at `path`.
  *
  * @param path location of the sequence file
  * @param conf Hadoop configuration used when opening the file
  */
case class SequenceSource(path: Path)(implicit conf: Configuration) extends Source with Logging {
  logger.debug(s"Creating sequence source from $path")

  /** The schema obtained from the file via `SequenceSupport.schema`. */
  override def schema: StructType = {
    SequenceSupport.schema(path)
  }

  /** A single publisher that streams the whole file. */
  override def parts(): Seq[Publisher[Seq[Row]]] = {
    val publisher = new SequencePublisher(path)
    List(publisher)
  }
}

object SequenceReaderIterator {

  /**
    * Iterates over the records of `reader` as rows of `schema`, skipping the first record
    * (the header).
    *
    * `hasNext` is idempotent: the reader is advanced at most once per returned row, so
    * calling `hasNext` repeatedly no longer skips records, and `next()` may be called without
    * a preceding `hasNext`.
    *
    * @param schema schema attached to every produced row
    * @param reader an open reader positioned at the start of the file; it is not closed here
    */
  def apply(schema: StructType, reader: SequenceFile.Reader): Iterator[Row] = new Iterator[Row] {
    private val k = new IntWritable()
    private val v = new BytesWritable()
    // true once the reader has been advanced to a record that next() has not yet returned
    private var lookedAhead = false
    // result of the most recent reader.next call
    private var recordAvailable = false

    // throw away the header
    reader.next(k, v)

    override def hasNext(): Boolean = {
      if (!lookedAhead) {
        recordAvailable = reader.next(k, v)
        lookedAhead = true
      }
      recordAvailable
    }

    override def next(): Row = {
      if (!hasNext()) throw new NoSuchElementException("no more records in sequence file")
      lookedAhead = false
      Row(schema, SequenceSupport.toValues(v).toVector)
    }
  }
}

/**
  * Publishes the rows of the sequence file at `path` to a subscriber in batches.
  *
  * @param path location of the sequence file
  * @param conf Hadoop configuration used when opening the file
  */
class SequencePublisher(val path: Path)(implicit conf: Configuration) extends Publisher[Seq[Row]] with Logging with Using {

  /**
    * Streams the file to `subscriber`: signals `subscribed`, then one `next` per batch of
    * `DataStream.DefaultBatchSize` rows, then `completed`. Reading stops early once the
    * subscription's running flag is cleared. Anything thrown along the way is passed to
    * `subscriber.error`.
    */
  override def subscribe(subscriber: Subscriber[Seq[Row]]): Unit =
    try {
      using(SequenceSupport.createReader(path)) { seqReader =>
        val structType = SequenceSupport.schema(path)
        val active = new AtomicBoolean(true)
        subscriber.subscribed(Subscription.fromRunning(active))

        val batches = SequenceReaderIterator(structType, seqReader)
          .takeWhile(_ => active.get)
          .grouped(DataStream.DefaultBatchSize)
        for (batch <- batches) subscriber.next(batch)

        subscriber.completed()
      }
    } catch {
      case failure: Throwable => subscriber.error(failure)
    }
}
开发者ID:51zero,项目名称:eel-sdk,代码行数:52,代码来源:SequenceSource.scala


示例6: SequenceSupport

//设置package包名称以及导入依赖的类
package io.eels.component.sequence

import java.io.StringReader
import java.nio.charset.Charset

import com.sksamuel.exts.Logging
import com.sksamuel.exts.io.Using
import io.eels.component.csv.{CsvFormat, CsvSupport}
import io.eels.schema.{Field, StructType}
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.hadoop.io.{BytesWritable, IntWritable, SequenceFile}

/** Helpers for reading sequence files whose values are CSV-encoded records. */
object SequenceSupport extends Logging with Using {

  // Looked up once instead of once per decoded record.
  private val Utf8: Charset = Charset.forName("UTF8")

  /** Opens a reader for the sequence file at `path`; the caller is responsible for closing it. */
  def createReader(path: Path)(implicit conf: Configuration): SequenceFile.Reader =
    new SequenceFile.Reader(conf, SequenceFile.Reader.file(path))

  /** Decodes the bytes of `v` as UTF-8 text and splits it with [[toValues(str:String)*]]. */
  def toValues(v: BytesWritable): Array[String] = toValues(new String(v.copyBytes(), Utf8))

  /**
    * Parses `str` as one CSV record.
    *
    * @param str the CSV text; only the first record is read
    * @return the fields of the first record, or an empty array when the parser yields no
    *         record (it returns `null` in that case, e.g. for an empty string)
    */
  def toValues(str: String): Array[String] = {
    val parser = CsvSupport.createParser(CsvFormat(), false, false, false, null, null)
    parser.beginParsing(new StringReader(str))
    // stop the parser even if parsing throws
    try Option(parser.parseNext()).getOrElse(Array.empty[String])
    finally parser.stopParsing()
  }

  /**
    * Builds the schema of the sequence file at `path` from its first record, treating each
    * CSV value of that record as a field name.
    *
    * @throws IllegalStateException if the file contains no records
    */
  def schema(path: Path)(implicit conf: Configuration): StructType = {
    logger.debug(s"Fetching sequence schema for $path")
    using(createReader(path)) { reader =>
      val k = new IntWritable()
      val v = new BytesWritable()
      // Without this check an empty file would surface later as a NullPointerException.
      if (!reader.next(k, v))
        throw new IllegalStateException(s"Cannot read schema: sequence file $path has no header record")
      val fields: Array[Field] = toValues(v).map { name => new Field(name) }
      StructType(fields.toList)
    }
  }
}
开发者ID:51zero,项目名称:eel-sdk,代码行数:42,代码来源:SequenceSupport.scala


示例7: SequenceSinkTest

//设置package包名称以及导入依赖的类
package io.eels.component.sequence

import io.eels.datastream.DataStream
import io.eels.schema.StructType
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.io.{BytesWritable, IntWritable, SequenceFile}
import org.scalatest.{Matchers, WordSpec}

/** Verifies that [[SequenceSink]] writes a header record followed by one record per row. */
class SequenceSinkTest extends WordSpec with Matchers {

  private val ds = DataStream.fromValues(
    StructType("a", "b", "c", "d"),
    Seq(
      List("1", "2", "3", "4"),
      List("5", "6", "7", "8")
    )
  )

  "SequenceSink" should {
    "write sequence files" in {

      implicit val conf: Configuration = new Configuration
      implicit val fs: FileSystem = FileSystem.get(conf)

      val path = new Path("seqsink.seq")
      if (fs.exists(path))
        fs.delete(path, true)

      // Remove the file even when an assertion fails, so later runs start clean.
      try {
        ds.to(SequenceSink(path))

        val reader = new SequenceFile.Reader(new Configuration, SequenceFile.Reader.file(path))
        try {
          val k = new IntWritable
          val v = new BytesWritable

          // header + two rows
          val records = for (_ <- 1 to 3) yield {
            // fail here, rather than on a stale buffer, if a record is missing
            reader.next(k, v) shouldBe true
            new String(v.copyBytes, java.nio.charset.StandardCharsets.UTF_8)
          }

          records.toSet shouldBe Set(
            "a,b,c,d",
            "1,2,3,4",
            "5,6,7,8"
          )
        } finally reader.close()
      } finally fs.delete(path, true)
    }
  }
}
开发者ID:51zero,项目名称:eel-sdk,代码行数:54,代码来源:SequenceSinkTest.scala


示例8: SequenceSink

//设置package包名称以及导入依赖的类
package io.eels.component.sequence

import java.io.StringWriter

import com.univocity.parsers.csv.{CsvWriter, CsvWriterSettings}
import io.eels.{Row, Sink, SinkWriter}
import io.eels.schema.StructType
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.hadoop.io.{BytesWritable, IntWritable, SequenceFile}

/**
  * A [[Sink]] that writes rows to a Hadoop sequence file at `path` with `IntWritable` keys
  * and `BytesWritable` values, each value holding one CSV-encoded record.
  *
  * @param path destination of the sequence file
  * @param conf Hadoop configuration used to create the writer
  */
case class SequenceSink(path: Path)(implicit conf: Configuration) extends Sink {

  override def open(schema: StructType): SinkWriter = new SequenceSinkWriter(schema, path)

  /**
    * Writer that emits the schema's field names as the first record on construction and
    * then one record per written row.
    */
  class SequenceSinkWriter(schema: StructType, path: Path) extends SinkWriter {

    // Records are decoded as UTF-8 by SequenceSupport.toValues, so encode with the same
    // charset instead of the platform default.
    private val charset = java.nio.charset.StandardCharsets.UTF_8

    val writer = SequenceFile.createWriter(conf,
      SequenceFile.Writer.file(path),
      SequenceFile.Writer.keyClass(classOf[IntWritable]),
      SequenceFile.Writer.valueClass(classOf[BytesWritable])
    )

    // Key of the next record; the header and the first row are both written with key 0
    // because the key is only incremented after each row.
    val key = new IntWritable(0)

    val headers = valuesToCsv(schema.fieldNames())
    writer.append(key, new BytesWritable(headers.getBytes(charset)))

    override def close(): Unit = writer.close()

    /** Appends `row` as a CSV record; synchronized so appends and key updates stay paired. */
    override def write(row: Row): Unit = {
      this.synchronized {
        val csv = valuesToCsv(row.values)
        writer.append(key, new BytesWritable(csv.getBytes(charset)))
        key.set(key.get() + 1)
      }
    }

    /** Renders `values` as one CSV line without surrounding whitespace; nulls stay null for the CSV writer. */
    private def valuesToCsv(values: Seq[Any]): String = {
      val swriter = new StringWriter()
      val csv = new CsvWriter(swriter, new CsvWriterSettings())
      csv.writeRow(values.map {
        case null => null
        case other => other.toString
      }: _*)
      csv.close()
      swriter.toString().trim()
    }
  }
}
开发者ID:51zero,项目名称:eel-sdk,代码行数:51,代码来源:SequenceSink.scala



注:本文中的org.apache.hadoop.io.BytesWritable类示例整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。


鲜花

握手

雷人

路过

鸡蛋
该文章已有0人参与评论

请发表评论

全部评论

专题导读
上一篇:
Scala UserGroupInformation类代码示例发布时间:2022-05-23
下一篇:
Scala TrieMap类代码示例发布时间:2022-05-23
热门推荐
热门话题
阅读排行榜

扫描微信二维码

查看手机版网站

随时了解更新最新资讯

139-2527-9053

在线客服(服务时间 9:00~18:00)

在线QQ客服
地址:深圳市南山区西丽大学城创智工业园
电邮:jeky_zhao#qq.com
移动电话:139-2527-9053

Powered by 互联科技 X3.4© 2001-2213 极客世界.|Sitemap