Welcome to OStack Knowledge Sharing Community for programmer and developer-Open, Learning and Share
Welcome To Ask or Share your Answers For Others

hbase - Spark serialization error

I am trying to learn Spark and Scala. I want to read from HBase, but without MapReduce. I created a simple HBase table, "test", and did 3 puts into it. I want to read it via Spark (without HBaseTest, which uses MapReduce). I tried running the following commands in the shell:

val numbers = Array(
  new Get(Bytes.toBytes("row1")), 
  new Get(Bytes.toBytes("row2")), 
  new Get(Bytes.toBytes("row3")))
val conf = new HBaseConfiguration()
val table = new HTable(conf, "test")
sc.parallelize(numbers, numbers.length).map(table.get).count()

I keep getting this error: org.apache.spark.SparkException: Job aborted: Task not serializable: java.io.NotSerializableException: org.apache.hadoop.hbase.HBaseConfiguration

Can someone help me? How can I create an HTable that uses a serializable configuration?

Thanks

1 Answer

Your problem is that table is not serializable (or rather, its member conf is not), and you're trying to serialize it by using it inside a map. The way you're trying to read from HBase isn't quite right: it looks like you're building some specific Gets and then trying to run them in parallel. Even if you did get this working, it really wouldn't scale, as you'd be performing random reads. What you want to do is perform a table scan using Spark. Here is a code snippet that should help you do it:

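import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.mapreduce.TableInputFormat

// tableName is the name of the HBase table to scan, e.g. "test"
val conf = HBaseConfiguration.create()
conf.set(TableInputFormat.INPUT_TABLE, tableName)

// Yields an RDD of (row key, Result) pairs
sc.newAPIHadoopRDD(conf, classOf[TableInputFormat],
  classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable],
  classOf[org.apache.hadoop.hbase.client.Result])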

This will give you an RDD of (ImmutableBytesWritable, Result) pairs, where each Result exposes its row as nested NavigableMaps via getMap. Below is how you can convert that NavigableMap to a normal Scala map of Strings:

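...
.map(kv => (kv._1.get(), navMapToMap(kv._2.getMap)))
.map(kv => (Bytes.toString(kv._1), rowToStrMap(kv._2)))

import scala.collection.JavaConverters._
import org.apache.hadoop.hbase.util.Bytes

// Type aliases are assumed here; they are inferred from the return type of Result.getMap
// Layout: column family -> column qualifier -> timestamp -> cell value
type HBaseRow = java.util.NavigableMap[Array[Byte],
  java.util.NavigableMap[Array[Byte], java.util.NavigableMap[java.lang.Long, Array[Byte]]]]
type CFTimeseriesRow = Map[Array[Byte], Map[Array[Byte], Map[Long, Array[Byte]]]]
type CFTimeseriesRowStr = Map[String, Map[String, Map[Long, String]]]

// Convert the nested Java NavigableMaps into immutable Scala Maps
def navMapToMap(navMap: HBaseRow): CFTimeseriesRow =
  navMap.asScala.toMap.map(cf =>
    (cf._1, cf._2.asScala.toMap.map(col =>
      (col._1, col._2.asScala.toMap.map(elem => (elem._1.toLong, elem._2))))))

// Decode family names, qualifiers and cell values from bytes to Strings
def rowToStrMap(navMap: CFTimeseriesRow): CFTimeseriesRowStr =
  navMap.map(cf =>
    (Bytes.toString(cf._1), cf._2.map(col =>
      (Bytes.toString(col._1), col._2.map(elem => (elem._1, Bytes.toString(elem._2)))))))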
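
Coming back to the original error for a moment: Spark serializes the function passed to map together with everything that function captures, so a captured object that is not Serializable fails the whole task. The sketch below reproduces that with plain Java serialization and no Spark or HBase at all. FakeTable, isSerializable, capturing and selfContained are hypothetical names made up for this illustration; FakeTable only stands in for HTable.

```scala
import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}

object ClosureCaptureDemo {
  // Hypothetical stand-in for HTable: deliberately NOT Serializable
  class FakeTable {
    def get(key: String): String = s"value-of-$key"
  }

  // True if Java serialization accepts the object, false if it hits
  // a NotSerializableException somewhere in the object graph
  def isSerializable(obj: AnyRef): Boolean =
    try {
      new ObjectOutputStream(new ByteArrayOutputStream()).writeObject(obj)
      true
    } catch {
      case _: NotSerializableException => false
    }

  // Captures a FakeTable created outside the function, like map(table.get)
  def capturing(): String => String = {
    val table = new FakeTable
    key => table.get(key)
  }

  // Creates the FakeTable inside the function body, so nothing is captured
  def selfContained(): String => String =
    key => new FakeTable().get(key)
}
```

Here isSerializable(capturing()) should come back false while isSerializable(selfContained()) should come back true, which is the same distinction Spark trips over in the question.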

A final point: if you really do want to perform random reads in parallel, I believe you might be able to put the HBase table initialization inside the map.
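
A rough sketch of that idea, which I have not run against a cluster. It assumes the table name "test" from the question and the same older HTable constructor used there. Two substitutions are mine: it ships plain row-key strings instead of Get objects, and it uses mapPartitions rather than map so that one table handle is opened per partition instead of one per row.

```scala
import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.{Get, HTable}
import org.apache.hadoop.hbase.util.Bytes

// Row keys are plain Strings, so the closure only has to ship serializable data
val rowKeys = Seq("row1", "row2", "row3")

val lookups = sc.parallelize(rowKeys, rowKeys.length).mapPartitions { keys =>
  // Created on the executor, so no HBase object is captured by the closure
  val conf = HBaseConfiguration.create()
  val table = new HTable(conf, "test") // assumes the question's table name
  try {
    // Pair each key with whether a row was found; toList forces the lazy
    // iterator to run before the table is closed
    keys.map(k => (k, !table.get(new Get(Bytes.toBytes(k))).isEmpty)).toList.iterator
  } finally {
    table.close()
  }
}.collect()
```

On newer HBase client versions, where this HTable constructor is no longer available, the same pattern should carry over using ConnectionFactory.createConnection(conf) and connection.getTable(TableName.valueOf("test")), closing both when the partition is done.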

