Huawei-Spark/Spark-SQL-on-HBase

1

Closed this issue · 0 comments

//变化原因在于0.98和1.0.0接口变化,如下:
//===========================
//第一:社区0.98代码地址:
//https://github.com/apache/hbase/blob/0.98/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionScanner.java
//社区0.98InternalScanner.java下boolean next(List result, int limit) throws IOException;
//社区0.98RegionScanner.java下:没有int getBatch(),且boolean nextRaw(List result, int limit) throws IOException;
//===========================

//===========================
//第三:社区branch1代码地址:(master代码也一样)
//https://github.com/apache/hbase/blob/branch-1/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionScanner.java
//社区branch1中InternalScanner.java下:boolean next(List result, ScannerContext scannerContext) throws IOException;
//社区branch1中RegionScanner.java下:int getBatch()以及boolean nextRaw(List result, ScannerContext scannerContext) throws IOException;
//=============================

package org.apache.spark.sql.hbase

import org.apache.hadoop.hbase._
import org.apache.hadoop.hbase.client._
import org.apache.hadoop.hbase.coprocessor._
import org.apache.hadoop.hbase.regionserver._
import org.apache.hadoop.hbase.util.Bytes
import org.apache.log4j.Logger
import org.apache.spark._
import org.apache.spark.executor.TaskMetrics
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.expressions.codegen.GeneratePredicate
import org.apache.spark.sql.hbase.util.DataTypeUtils
import org.apache.spark.sql.types._
import org.apache.spark.sql.{Row, SQLContext}

/**

  • HBaseCoprocessorSQLReaderRDD:
    */
    class HBaseCoprocessorSQLReaderRDD(var relation: HBaseRelation,
    val codegenEnabled: Boolean,
    var finalOutput: Seq[Attribute],
    var otherFilters: Option[Expression],
    @transient sqlContext: SQLContext)
    extends RDD[Row](sqlContext.sparkContext, Nil) with Logging {

    @transient var scanner: RegionScanner = _

    private def createIterator(context: TaskContext): Iterator[Row] = {
    val otherFilter: (Row) => Boolean = {
    if (otherFilters.isDefined) {
    if (codegenEnabled) {
    GeneratePredicate.generate(otherFilters.get, finalOutput)
    } else {
    InterpretedPredicate.create(otherFilters.get, finalOutput)
    }
    } else null
    }

    val projections = finalOutput.zipWithIndex
    var finished: Boolean = false
    var gotNext: Boolean = false
    val results: java.util.ArrayList[Cell] = new java.util.ArrayListCell
    val row = new GenericMutableRow(finalOutput.size)

    val iterator = new Iterator[Row] {
    override def hasNext: Boolean = {
    if (!finished) {
    if (!gotNext) {
    results.clear()
    scanner.nextRaw(results)
    finished = results.isEmpty
    gotNext = true
    }
    }
    if (finished) {
    close()
    }
    !finished
    }

    override def next(): Row = {
      if (hasNext) {
        gotNext = false
        relation.buildRowInCoprocessor(projections, results, row)
      } else {
        null
      }
    }
    
    def close() = {
      try {
        scanner.close()
        relation.closeHTable()
      } catch {
        case e: Exception => logWarning("Exception in scanner.close", e)
      }
    }
    

    }

    if (otherFilter == null) {
    new InterruptibleIterator(context, iterator)
    } else {
    new InterruptibleIterator(context, iterator.filter(otherFilter))
    }
    }

    override def getPartitions: Array[Partition] = {
    Array()
    }

    override def compute(split: Partition, context: TaskContext): Iterator[Row] = {
    scanner = split.asInstanceOf[HBasePartition].newScanner
    createIterator(context)
    }
    }

abstract class BaseRegionScanner extends RegionScanner {
override def getBatch={0} //实现这个从接口中继承的函数
//新版本的hbase中在RegionScanner中添加了一个这样一个函数:int getBatch();
//但是这个函数在Astro继承过来之后没有用,而实例化后面的类的时候又不得不实现继承过来的函数
//所以仅是实现它防止编译报错,在Astro中并没有作用。
override def isFilterDone = false

override def next(result: java.util.List[Cell], scannerContext: ScannerContext)= next(result)//这里把limit: Int=>scannerContext: ScannerContext
//next函数上层继承自hbase的RegionScanner,再上层继承自InternalScanner
//在0.98版本中next函数boolean next(List result, int limit) throws IOException;
//在新版本中boolean next(List result, ScannerContext scannerContext) throws IOException;

override def reseek(row: Array[Byte]) = throw new DoNotRetryIOException("Unsupported")

override def getMvccReadPoint = Long.MaxValue

override def nextRaw(result: java.util.List[Cell]) = next(result)

override def nextRaw(result: java.util.List[Cell], scannerContext: ScannerContext) = next(result, scannerContext)//这里把limit: Int=>scannerContext: ScannerContext
//这里对比两个版本HBase中RegionScanner的区别:
//0.98版HBase这里定义为boolean nextRaw(List result, int limit) throws IOException;
//而新版这里定义为 boolean nextRaw(List result, ScannerContext scannerContext) throws IOException;
}

class SparkSqlRegionObserver extends BaseRegionObserver {
lazy val logger = Logger.getLogger(getClass.getName)
lazy val EmptyArray = ArrayByte

override def postScannerOpen(e: ObserverContext[RegionCoprocessorEnvironment],
scan: Scan,
s: RegionScanner) = {
val serializedPartitionIndex = scan.getAttribute(CoprocessorConstants.COINDEX)
if (serializedPartitionIndex == null) {
logger.debug("Work without coprocessor")
super.postScannerOpen(e, scan, s)
} else {
logger.debug("Work with coprocessor")
val partitionIndex: Int = Bytes.toInt(serializedPartitionIndex)
val serializedOutputDataType = scan.getAttribute(CoprocessorConstants.COTYPE)
val outputDataType: Seq[DataType] =
HBaseSerializer.deserialize(serializedOutputDataType).asInstanceOf[Seq[DataType]]

  val serializedRDD = scan.getAttribute(CoprocessorConstants.COKEY)
  val subPlanRDD: RDD[Row] = HBaseSerializer.deserialize(serializedRDD).asInstanceOf[RDD[Row]]

  val taskParaInfo = scan.getAttribute(CoprocessorConstants.COTASK)
  val (stageId, partitionId, taskAttemptId, attemptNumber) =
    HBaseSerializer.deserialize(taskParaInfo).asInstanceOf[(Int, Int, Long, Int)]
  val taskContext = new TaskContextImpl(
    stageId, partitionId, taskAttemptId, attemptNumber, null, false, new TaskMetrics)

  val regionInfo = s.getRegionInfo
  val startKey = if (regionInfo.getStartKey.isEmpty) None else Some(regionInfo.getStartKey)
  val endKey = if (regionInfo.getEndKey.isEmpty) None else Some(regionInfo.getEndKey)

  val result = subPlanRDD.compute(
    new HBasePartition(partitionIndex, partitionIndex, startKey, endKey, newScanner = s),
    taskContext)

  new BaseRegionScanner() {
    override def getRegionInfo: HRegionInfo = regionInfo

    override def getMaxResultSize: Long = s.getMaxResultSize

    override def close(): Unit = s.close()

    override def next(results: java.util.List[Cell]): Boolean = {
      val hasMore: Boolean = result.hasNext
      if (hasMore) {
        val nextRow = result.next()
        val numOfCells = outputDataType.length
        for (i <- 0 until numOfCells) {
          val data = nextRow(i)
          val dataType = outputDataType(i)
          val dataOfBytes: HBaseRawType = {
            if (data == null) null else DataTypeUtils.dataToBytes(data, dataType)
          }
          results.add(new KeyValue(EmptyArray, EmptyArray, EmptyArray, dataOfBytes))
        }
      }
      hasMore
    }
  }
}

}
}