Your problem is that table is not serializable (rather, its member conf isn't), and you're trying to serialize it by using it inside a map. The way you're trying to read HBase isn't quite correct: it looks like you're attempting some specific Gets and then trying to run them in parallel. Even if you did get this working, it really wouldn't scale, since you'd be performing random reads. What you want to do is perform a table scan using Spark; here is a code snippet that should help you do it:
import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.mapreduce.TableInputFormat

val conf = HBaseConfiguration.create()
conf.set(TableInputFormat.INPUT_TABLE, tableName)

sc.newAPIHadoopRDD(conf, classOf[TableInputFormat],
  classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable],
  classOf[org.apache.hadoop.hbase.client.Result])
This will give you an RDD containing the NavigableMaps that constitute the rows. Below is how you can convert the NavigableMap into a normal Scala map of Strings:
...
.map(kv => (kv._1.get(), navMapToMap(kv._2.getMap)))
.map(kv => (Bytes.toString(kv._1), rowToStrMap(kv._2)))
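The helpers below refer to a few type aliases that aren't shown in the snippet; roughly, they'd look like this (inferred from what Result.getMap returns, so treat the exact shapes as an assumption):

// Assumed type aliases, matching Result.getMap: family -> qualifier -> timestamp -> value
type HBaseRow = java.util.NavigableMap[Array[Byte],
  java.util.NavigableMap[Array[Byte], java.util.NavigableMap[java.lang.Long, Array[Byte]]]]
type CFTimeseriesRow = Map[Array[Byte], Map[Array[Byte], Map[Long, Array[Byte]]]]
type CFTimeseriesRowStr = Map[String, Map[String, Map[Long, String]]]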
import scala.collection.JavaConverters._
import org.apache.hadoop.hbase.util.Bytes

// Convert the nested Java NavigableMaps into immutable Scala maps
def navMapToMap(navMap: HBaseRow): CFTimeseriesRow =
  navMap.asScala.toMap.map(cf =>
    (cf._1, cf._2.asScala.toMap.map(col =>
      (col._1, col._2.asScala.toMap.map(elem => (elem._1.toLong, elem._2))))))

// Turn the byte-array keys and values into Strings (timestamps stay as Longs)
def rowToStrMap(navMap: CFTimeseriesRow): CFTimeseriesRowStr =
  navMap.map(cf =>
    (Bytes.toString(cf._1), cf._2.map(col =>
      (Bytes.toString(col._1), col._2.map(elem => (elem._1, Bytes.toString(elem._2)))))))
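Putting it together, the whole scan-plus-conversion might look roughly like this (just a sketch; hBaseRDD is an arbitrary name, and conf, sc, and the helpers above are assumed to be in scope):

val hBaseRDD = sc.newAPIHadoopRDD(conf, classOf[TableInputFormat],
    classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable],
    classOf[org.apache.hadoop.hbase.client.Result])
  .map(kv => (Bytes.toString(kv._1.get()), rowToStrMap(navMapToMap(kv._2.getMap))))

hBaseRDD.take(2).foreach(println)  // peek at a couple of converted rows
println(hBaseRDD.count())          // row count via a distributed scan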
Final point: if you really do want to try to perform random reads in parallel, I believe you might be able to put the HBase table initialization inside the map.
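If you go that route, something like the sketch below keeps the non-serializable connection objects on the executors. This assumes the ConnectionFactory API from newer HBase client versions and a hypothetical rowKeys RDD of row keys; older clients would construct new HTable(conf, tableName) instead.

import org.apache.hadoop.hbase.{HBaseConfiguration, TableName}
import org.apache.hadoop.hbase.client.{ConnectionFactory, Get}
import org.apache.hadoop.hbase.util.Bytes

// rowKeys is a hypothetical RDD[String] of the keys you want to fetch
val fetched = rowKeys.mapPartitions { keys =>
  // Build the connection inside the partition so nothing non-serializable
  // is captured in the closure shipped from the driver.
  val conf = HBaseConfiguration.create()
  val connection = ConnectionFactory.createConnection(conf)
  val table = connection.getTable(TableName.valueOf(tableName))
  val rows = keys.map { key =>
    val result = table.get(new Get(Bytes.toBytes(key)))
    // convert to something serializable before returning it
    (key, Option(result.value()).map(v => Bytes.toString(v)))
  }.toList  // materialize before closing the connection
  table.close()
  connection.close()
  rows.iterator
}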