// The changes stem from the interface differences between 0.98 and 1.0.0, as follows:
//===========================
// First: community 0.98 source location:
//https://github.com/apache/hbase/blob/0.98/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionScanner.java
// In community 0.98, InternalScanner.java declares: boolean next(List<Cell> result, int limit) throws IOException;
// In community 0.98, RegionScanner.java has no int getBatch(), and declares: boolean nextRaw(List<Cell> result, int limit) throws IOException;
//===========================
//===========================
// Third: community branch-1 source location (the master code is the same):
//https://github.com/apache/hbase/blob/branch-1/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionScanner.java
// In community branch-1, InternalScanner.java declares: boolean next(List<Cell> result, ScannerContext scannerContext) throws IOException;
// In community branch-1, RegionScanner.java declares: int getBatch() and boolean nextRaw(List<Cell> result, ScannerContext scannerContext) throws IOException;
//=============================
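For quick reference, the branch-1 delta described above can be paraphrased as a Scala trait. This is only an illustration of the changed and added members, not the real org.apache.hadoop.hbase.regionserver interfaces; see the links above for the actual sources.

// Illustration only: Scala paraphrase of the branch-1 additions/changes listed above.
trait Branch1ScannerDelta {
  // new in branch-1; absent in 0.98
  def getBatch(): Int
  // limit: Int was replaced by ScannerContext in both next(...) and nextRaw(...)
  def next(result: java.util.List[org.apache.hadoop.hbase.Cell],
           scannerContext: org.apache.hadoop.hbase.regionserver.ScannerContext): Boolean
  def nextRaw(result: java.util.List[org.apache.hadoop.hbase.Cell],
              scannerContext: org.apache.hadoop.hbase.regionserver.ScannerContext): Boolean
}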
package org.apache.spark.sql.hbase
import org.apache.hadoop.hbase._
import org.apache.hadoop.hbase.client._
import org.apache.hadoop.hbase.coprocessor._
import org.apache.hadoop.hbase.regionserver._
import org.apache.hadoop.hbase.util.Bytes
import org.apache.log4j.Logger
import org.apache.spark._
import org.apache.spark.executor.TaskMetrics
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.expressions.codegen.GeneratePredicate
import org.apache.spark.sql.hbase.util.DataTypeUtils
import org.apache.spark.sql.types._
import org.apache.spark.sql.{Row, SQLContext}
/**
 * HBaseCoprocessorSQLReaderRDD:
 */
class HBaseCoprocessorSQLReaderRDD(var relation: HBaseRelation,
val codegenEnabled: Boolean,
var finalOutput: Seq[Attribute],
var otherFilters: Option[Expression],
@transient sqlContext: SQLContext)
  extends RDD[Row](sqlContext.sparkContext, Nil) with Logging {
  @transient var scanner: RegionScanner = _
private def createIterator(context: TaskContext): Iterator[Row] = {
val otherFilter: (Row) => Boolean = {
if (otherFilters.isDefined) {
if (codegenEnabled) {
GeneratePredicate.generate(otherFilters.get, finalOutput)
} else {
InterpretedPredicate.create(otherFilters.get, finalOutput)
}
} else null
    }
    val projections = finalOutput.zipWithIndex
var finished: Boolean = false
var gotNext: Boolean = false
    val results: java.util.ArrayList[Cell] = new java.util.ArrayList[Cell]()
    val row = new GenericMutableRow(finalOutput.size)
    val iterator = new Iterator[Row] {
override def hasNext: Boolean = {
if (!finished) {
if (!gotNext) {
results.clear()
scanner.nextRaw(results)
finished = results.isEmpty
gotNext = true
}
}
if (finished) {
close()
}
!finished
      }

      override def next(): Row = {
        if (hasNext) {
          gotNext = false
          relation.buildRowInCoprocessor(projections, results, row)
        } else {
          null
        }
      }

      def close() = {
        try {
          scanner.close()
          relation.closeHTable()
        } catch {
          case e: Exception => logWarning("Exception in scanner.close", e)
        }
      }
}
if (otherFilter == null) {
new InterruptibleIterator(context, iterator)
} else {
new InterruptibleIterator(context, iterator.filter(otherFilter))
}
  }

  override def getPartitions: Array[Partition] = {
Array()
  }

  override def compute(split: Partition, context: TaskContext): Iterator[Row] = {
scanner = split.asInstanceOf[HBasePartition].newScanner
createIterator(context)
}
}
abstract class BaseRegionScanner extends RegionScanner {
override def getBatch: Int = 0 // implement this method inherited from the interface
// Newer HBase versions added int getBatch() to RegionScanner.
// Astro never uses it, but the concrete scanners instantiated further down still have to
// implement every inherited method, so it is implemented here only to avoid a compile error;
// it has no other effect in Astro.
override def isFilterDone = false
override def next(result: java.util.List[Cell], scannerContext: ScannerContext) = next(result) // here the limit: Int parameter became scannerContext: ScannerContext
// next is inherited from HBase's RegionScanner, which in turn inherits it from InternalScanner.
// In 0.98 the signature is boolean next(List<Cell> result, int limit) throws IOException;
// in the newer versions it is boolean next(List<Cell> result, ScannerContext scannerContext) throws IOException;
override def reseek(row: Array[Byte]) = throw new DoNotRetryIOException("Unsupported")
override def getMvccReadPoint = Long.MaxValue
override def nextRaw(result: java.util.List[Cell]) = next(result)
override def nextRaw(result: java.util.List[Cell], scannerContext: ScannerContext) = next(result, scannerContext) // here the limit: Int parameter became scannerContext: ScannerContext
// Comparing RegionScanner across the two HBase versions:
// 0.98 declares boolean nextRaw(List<Cell> result, int limit) throws IOException;
// the newer versions declare boolean nextRaw(List<Cell> result, ScannerContext scannerContext) throws IOException;
}
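With the adapter above, a concrete scanner only has to supply the version-independent members (getRegionInfo, getMaxResultSize, close, and the single-argument next), which is exactly what the anonymous class in postScannerOpen below does. A minimal illustrative sketch (not part of Astro), assuming an HRegionInfo value is already available:

// Hypothetical example: an "empty" scanner built on BaseRegionScanner.
// It emits no cells; the ScannerContext overloads inherited above delegate to next(results).
def emptyScanner(info: HRegionInfo): RegionScanner = new BaseRegionScanner() {
  override def getRegionInfo: HRegionInfo = info
  override def getMaxResultSize: Long = -1L
  override def close(): Unit = {}
  override def next(results: java.util.List[Cell]): Boolean = false // nothing to emit
}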
class SparkSqlRegionObserver extends BaseRegionObserver {
lazy val logger = Logger.getLogger(getClass.getName)
lazy val EmptyArray = Array[Byte]()
override def postScannerOpen(e: ObserverContext[RegionCoprocessorEnvironment],
scan: Scan,
s: RegionScanner) = {
val serializedPartitionIndex = scan.getAttribute(CoprocessorConstants.COINDEX)
if (serializedPartitionIndex == null) {
logger.debug("Work without coprocessor")
super.postScannerOpen(e, scan, s)
} else {
logger.debug("Work with coprocessor")
val partitionIndex: Int = Bytes.toInt(serializedPartitionIndex)
val serializedOutputDataType = scan.getAttribute(CoprocessorConstants.COTYPE)
val outputDataType: Seq[DataType] =
HBaseSerializer.deserialize(serializedOutputDataType).asInstanceOf[Seq[DataType]]
val serializedRDD = scan.getAttribute(CoprocessorConstants.COKEY)
val subPlanRDD: RDD[Row] = HBaseSerializer.deserialize(serializedRDD).asInstanceOf[RDD[Row]]
val taskParaInfo = scan.getAttribute(CoprocessorConstants.COTASK)
val (stageId, partitionId, taskAttemptId, attemptNumber) =
HBaseSerializer.deserialize(taskParaInfo).asInstanceOf[(Int, Int, Long, Int)]
val taskContext = new TaskContextImpl(
stageId, partitionId, taskAttemptId, attemptNumber, null, false, new TaskMetrics)
val regionInfo = s.getRegionInfo
val startKey = if (regionInfo.getStartKey.isEmpty) None else Some(regionInfo.getStartKey)
val endKey = if (regionInfo.getEndKey.isEmpty) None else Some(regionInfo.getEndKey)
val result = subPlanRDD.compute(
new HBasePartition(partitionIndex, partitionIndex, startKey, endKey, newScanner = s),
taskContext)
new BaseRegionScanner() {
override def getRegionInfo: HRegionInfo = regionInfo
override def getMaxResultSize: Long = s.getMaxResultSize
override def close(): Unit = s.close()
override def next(results: java.util.List[Cell]): Boolean = {
val hasMore: Boolean = result.hasNext
if (hasMore) {
val nextRow = result.next()
val numOfCells = outputDataType.length
for (i <- 0 until numOfCells) {
val data = nextRow(i)
val dataType = outputDataType(i)
val dataOfBytes: HBaseRawType = {
if (data == null) null else DataTypeUtils.dataToBytes(data, dataType)
}
results.add(new KeyValue(EmptyArray, EmptyArray, EmptyArray, dataOfBytes))
}
}
hasMore
}
}
}
}
}
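For completeness, the observer only takes effect once it is registered with HBase. Below is a minimal deployment sketch (not from the Astro sources) that attaches it to a single, hypothetical table through the standard HBase admin API; it assumes the Astro jar is already on every region server's classpath. Listing the class under hbase.coprocessor.region.classes in hbase-site.xml is the cluster-wide alternative.

// Illustration only: register SparkSqlRegionObserver on a hypothetical table "demo_table".
import org.apache.hadoop.hbase.{HBaseConfiguration, TableName}
import org.apache.hadoop.hbase.client.HBaseAdmin

object AttachObserverSketch {
  def main(args: Array[String]): Unit = {
    val admin = new HBaseAdmin(HBaseConfiguration.create())
    val table = TableName.valueOf("demo_table")                    // hypothetical table name
    val desc = admin.getTableDescriptor(table)
    desc.addCoprocessor("org.apache.spark.sql.hbase.SparkSqlRegionObserver")
    admin.disableTable(table)                                      // disable/modify/enable is the traditional route
    admin.modifyTable(table, desc)
    admin.enableTable(table)
    admin.close()
  }
}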