本文整理汇总了Scala中org.apache.hadoop.io.BytesWritable类的典型用法代码示例。如果您正苦于以下问题:Scala BytesWritable类的具体用法?Scala BytesWritable怎么用?Scala BytesWritable使用的例子?那么恭喜您, 这里精选的类代码示例或许可以为您提供帮助。
在下文中一共展示了BytesWritable类的8个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Scala代码示例。
示例1: Utils
//设置package包名称以及导入依赖的类
package com.larry.da.jobs.idmap
import org.apache.commons.io.output.ByteArrayOutputStream
import org.apache.hadoop.io.{NullWritable, BytesWritable}
import org.apache.spark.rdd.RDD
import org.apache.spark.serializer.KryoSerializer
import scala.reflect.ClassTag
object Utils {
val keys="0123456789ABCDEFHIJKMNPRSTUVWXYZ"
val keyDic = Map( keys zip (0 until 32 ) toSeq :_* )
def compressAguid(uid:Long)={
var n = uid
val res = 0 until 13 map(i=>{ val index = n & 31; n = n >>> 5; keys(index.toInt)})
res.mkString("").reverse + "u"
}
def unCompressAguid(uid:String)={
val res = uid.take(13).map(s=>keyDic(s))
var n = res.head.toLong
res.tail.foreach(p=> {
n = (n << 5) | p
})
n
}
def saveAsObjectFile[T: ClassTag](rdd: RDD[T], path: String) {
val kryoSerializer = new KryoSerializer(rdd.context.getConf)
rdd.mapPartitions(iter => iter.grouped(10)
.map(_.toArray))
.map(splitArray => {
//initializes kyro and calls your registrator class
val kryo = kryoSerializer.newKryo()
//convert data to bytes
val bao = new ByteArrayOutputStream()
val output = kryoSerializer.newKryoOutput()
output.setOutputStream(bao)
kryo.writeClassAndObject(output, splitArray)
output.close()
// We are ignoring key field of sequence file
val byteWritable = new BytesWritable(bao.toByteArray)
(NullWritable.get(), byteWritable)
}).saveAsSequenceFile(path)
}
}
开发者ID:larry88,项目名称:spark_da,代码行数:55,代码来源:Utils.scala
示例2: Converter
//设置package包名称以及导入依赖的类
package com.dataoptimo.imgprocessing.convert
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.io.Text
import org.apache.hadoop.io.BytesWritable
import org.apache.hadoop.fs.{Path, FileSystem}
import org.apache.hadoop.io.IOUtils
import org.apache.hadoop.io.SequenceFile
import java.io.IOException
import java.lang.IllegalArgumentException
class Converter(conf: Configuration) {
def imageToSequence(srcPath: String, dstPath: String){
try {
val fs = FileSystem.get(conf);
val inPath = new Path(srcPath);
val outPath = new Path(dstPath);
val key = new Text();
val value = new BytesWritable();
val in = fs.open(inPath);
val buffer = new Array[Byte](in.available())
in.read(buffer);
var writer = SequenceFile.createWriter(fs, conf, outPath, key.getClass(),value.getClass());
writer.append(new Text(inPath.getName()), new BytesWritable(buffer));
IOUtils.closeStream(writer);
}
catch {
case io: IOException => println(io.getMessage)
case illegalArgument: IllegalArgumentException => println(illegalArgument.getMessage)
}
}
}
开发者ID:mfawadalam,项目名称:imgprocessing,代码行数:36,代码来源:Converters.scala
示例3: filename
//设置package包名称以及导入依赖的类
package uk.co.odinconsultants.bitcoin.uk.co.odinconsultants.bitcoin.parsing
import org.apache.hadoop.io.BytesWritable
import org.scalatest.{Matchers, WordSpec}
import org.zuinnote.hadoop.bitcoin.format.common.BitcoinBlock
import org.zuinnote.hadoop.bitcoin.format.mapreduce.BitcoinBlockFileInputFormat
import uk.co.odinconsultants.bitcoin.core.Logging
import uk.co.odinconsultants.bitcoin.hbase.HBaseMetaRetrieval
import uk.co.odinconsultants.bitcoin.hbase.HBaseSetup.{createAddressesTable, familyName, metaTable}
import uk.co.odinconsultants.bitcoin.integration.hadoop.MiniHadoopClusterRunning
import uk.co.odinconsultants.bitcoin.integration.hbase.HBaseForTesting.{admin, utility}
import uk.co.odinconsultants.bitcoin.integration.hbase.HBaseTestConfig.getConnection
import uk.co.odinconsultants.bitcoin.integration.spark.SparkForTesting.sc
import uk.co.odinconsultants.bitcoin.parsing.Indexer.{index, write}
trait HdfsFixture extends MiniHadoopClusterRunning with Matchers with Logging { this: WordSpec =>
def filename: String
"Copied file to HDFS" should {
info(s"Using blockchain file: '$filename'")
val hdfsFile = copyToHdfs(localFile(filename))
"be possible" in {
val files = list(dir)
files should have size 1
}
val rdd = sc.newAPIHadoopFile(hdfsFile.toString, classOf[BitcoinBlockFileInputFormat], classOf[BytesWritable], classOf[BitcoinBlock], conf)
"allow Spark to use it" in {
rdd.count() should be > 0L
}
val outputs = index(rdd)
"not generated dupes when indexed" in {
outputs.count() should be > 0L
val dupes = outputs.map(_ -> 1).reduceByKey(_ + _).filter(_._2 > 1).collect()
withClue(s"\n${dupes.mkString("\n")}\n") {
dupes shouldBe empty
}
}
"have its metadata persisted in HBase" in {
createAddressesTable(admin)
write(outputs, () => getConnection(utility.getConfiguration))
val reader = new HBaseMetaRetrieval(admin.getConnection.getTable(metaTable), familyName)
outputs.collect().foreach { payload =>
val (backReference, pubKey) = payload
val actual = reader(backReference)
withClue(s"\nWrote = $pubKey (${pubKey.mkString(",")})\nRead = $actual (${actual.mkString(",")})\n") {
actual shouldEqual pubKey
}
}
}
}
}
开发者ID:PhillHenry,项目名称:Cryptorigin,代码行数:58,代码来源:HdfsFixture.scala
示例4: Indexer
//设置package包名称以及导入依赖的类
package uk.co.odinconsultants.bitcoin.parsing
import org.apache.hadoop.hbase.client.Connection
import org.apache.hadoop.io.BytesWritable
import org.apache.spark.rdd.RDD
import org.bitcoinj.params.MainNetParams
import org.zuinnote.hadoop.bitcoin.format.common.BitcoinBlock
import uk.co.odinconsultants.bitcoin.hbase.HBaseMetaStore
import uk.co.odinconsultants.bitcoin.hbase.HBaseSetup.{familyName, tableName}
import uk.co.odinconsultants.bitcoin.parsing.DomainOps._
import uk.co.odinconsultants.bitcoin.parsing.MetaStore.Payload
object Indexer {
type PubKey = Array[Byte]
type BackReference = (Array[Byte], Long)
val networkParams: MainNetParams = MainNetParams.get()
def index(rdd: RDD[(BytesWritable, BitcoinBlock)]): RDD[Payload] =
rdd.flatMap{ case(_, block) => toTransactions(block) }.flatMap(toBackReferenceAddressTuples)
def write(rdd: RDD[Payload], connectionFactory: () => Connection): Unit = {
val batchSize = 100
rdd.foreachPartition { iter =>
val connection = connectionFactory()
val table = connection.getTable(tableName)
val metaStore = new HBaseMetaStore(table, familyName)
iter.grouped(batchSize).foreach { metaIter =>
metaStore(metaIter.toList)
}
connection.close()
}
}
}
开发者ID:PhillHenry,项目名称:Cryptorigin,代码行数:39,代码来源:Indexer.scala
示例5: SequenceSource
//设置package包名称以及导入依赖的类
package io.eels.component.sequence
import java.util.concurrent.atomic.AtomicBoolean
import com.sksamuel.exts.Logging
import com.sksamuel.exts.io.Using
import io.eels._
import io.eels.datastream.{DataStream, Publisher, Subscriber, Subscription}
import io.eels.schema.StructType
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.hadoop.io.{BytesWritable, IntWritable, SequenceFile}
case class SequenceSource(path: Path)(implicit conf: Configuration) extends Source with Logging {
logger.debug(s"Creating sequence source from $path")
override def schema: StructType = SequenceSupport.schema(path)
override def parts(): Seq[Publisher[Seq[Row]]] = List(new SequencePublisher(path))
}
object SequenceReaderIterator {
def apply(schema: StructType, reader: SequenceFile.Reader): Iterator[Row] = new Iterator[Row] {
private val k = new IntWritable()
private val v = new BytesWritable()
// throw away the header
reader.next(k, v)
override def next(): Row = Row(schema, SequenceSupport.toValues(v).toVector)
override def hasNext(): Boolean = reader.next(k, v)
}
}
class SequencePublisher(val path: Path)(implicit conf: Configuration) extends Publisher[Seq[Row]] with Logging with Using {
override def subscribe(subscriber: Subscriber[Seq[Row]]): Unit = {
try {
using(SequenceSupport.createReader(path)) { reader =>
val schema = SequenceSupport.schema(path)
val running = new AtomicBoolean(true)
subscriber.subscribed(Subscription.fromRunning(running))
SequenceReaderIterator(schema, reader)
.takeWhile(_ => running.get)
.grouped(DataStream.DefaultBatchSize)
.foreach(subscriber.next)
subscriber.completed()
}
} catch {
case t: Throwable => subscriber.error(t)
}
}
}
开发者ID:51zero,项目名称:eel-sdk,代码行数:52,代码来源:SequenceSource.scala
示例6: SequenceSupport
//设置package包名称以及导入依赖的类
package io.eels.component.sequence
import java.io.StringReader
import java.nio.charset.Charset
import com.sksamuel.exts.Logging
import com.sksamuel.exts.io.Using
import io.eels.component.csv.{CsvFormat, CsvSupport}
import io.eels.schema.{Field, StructType}
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.hadoop.io.{BytesWritable, IntWritable, SequenceFile}
object SequenceSupport extends Logging with Using {
def createReader(path: Path)(implicit conf: Configuration): SequenceFile.Reader =
new SequenceFile.Reader(conf, SequenceFile.Reader.file(path))
def toValues(v: BytesWritable): Array[String] = toValues(new String(v.copyBytes(), Charset.forName("UTF8")))
def toValues(str: String): Array[String] = {
val parser = CsvSupport.createParser(CsvFormat(), false, false, false, null, null)
parser.beginParsing(new StringReader(str))
val record = parser.parseNext()
parser.stopParsing()
record
}
def schema(path: Path)(implicit conf: Configuration): StructType = {
logger.debug(s"Fetching sequence schema for $path")
using(createReader(path)) { it =>
val k = new IntWritable()
val v = new BytesWritable()
val fields: Array[Field] = {
it.next(k, v)
toValues(v).map { it => new Field(it) }
}
StructType(fields.toList)
}
}
}
开发者ID:51zero,项目名称:eel-sdk,代码行数:42,代码来源:SequenceSupport.scala
示例7: SequenceSinkTest
//设置package包名称以及导入依赖的类
package io.eels.component.sequence
import io.eels.datastream.DataStream
import io.eels.schema.StructType
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.io.{BytesWritable, IntWritable, SequenceFile}
import org.scalatest.{Matchers, WordSpec}
class SequenceSinkTest extends WordSpec with Matchers {
private val ds = DataStream.fromValues(
StructType("a", "b", "c", "d"),
Seq(
List("1", "2", "3", "4"),
List("5", "6", "7", "8")
)
)
"SequenceSink" should {
"write sequence files" in {
implicit val conf = new Configuration
implicit val fs = FileSystem.get(conf)
val path = new Path("seqsink.seq")
if (fs.exists(path))
fs.delete(path, true)
ds.to(SequenceSink(path))
val reader = new SequenceFile.Reader(new Configuration, SequenceFile.Reader.file(path))
val k = new IntWritable
val v = new BytesWritable
val set = for (_ <- 1 to 3) yield {
reader.next(k, v)
new String(v.copyBytes)
}
set.toSet shouldBe Set(
"a,b,c,d",
"1,2,3,4",
"5,6,7,8"
)
reader.close()
fs.delete(path, true)
}
}
}
开发者ID:51zero,项目名称:eel-sdk,代码行数:54,代码来源:SequenceSinkTest.scala
示例8: SequenceSink
//设置package包名称以及导入依赖的类
package io.eels.component.sequence
import java.io.StringWriter
import com.univocity.parsers.csv.{CsvWriter, CsvWriterSettings}
import io.eels.{Row, Sink, SinkWriter}
import io.eels.schema.StructType
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.hadoop.io.{BytesWritable, IntWritable, SequenceFile}
case class SequenceSink(path: Path)(implicit conf: Configuration) extends Sink {
override def open(schema: StructType): SinkWriter = new SequenceSinkWriter(schema, path)
class SequenceSinkWriter(schema: StructType, path: Path) extends SinkWriter {
val writer = SequenceFile.createWriter(conf,
SequenceFile.Writer.file(path),
SequenceFile.Writer.keyClass(classOf[IntWritable]),
SequenceFile.Writer.valueClass(classOf[BytesWritable])
)
val key = new IntWritable(0)
val headers = valuesToCsv(schema.fieldNames())
writer.append(key, new BytesWritable(headers.getBytes))
override def close(): Unit = writer.close()
override def write(row: Row): Unit = {
this.synchronized {
val csv = valuesToCsv(row.values)
writer.append(key, new BytesWritable(csv.getBytes()))
key.set(key.get() + 1)
}
}
private def valuesToCsv(values: Seq[Any]): String = {
val swriter = new StringWriter()
val csv = new CsvWriter(swriter, new CsvWriterSettings())
csv.writeRow(values.map {
case null => null
case other => other.toString
}: _*)
csv.close()
swriter.toString().trim()
}
}
}
开发者ID:51zero,项目名称:eel-sdk,代码行数:51,代码来源:SequenceSink.scala
注:本文中的org.apache.hadoop.io.BytesWritable类示例整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。 |
请发表评论