Add statistics information module (pingcap#244)
Novemser authored and ilovesoup committed Mar 9, 2018
1 parent c1ee0bd commit d0a6024
Showing 29 changed files with 1,590 additions and 62 deletions.
3 changes: 3 additions & 0 deletions README.md
@@ -77,6 +77,9 @@ Below configurations can be put together with spark-defaults.conf or passed in t
| set |
| year |

## Statistics information
If you want to know how TiSpark benefits from TiDB's statistics information, read more [here](./docs/userguide.md).

## Quick start

Read the [Quick Start](./docs/userguide.md).
2 changes: 2 additions & 0 deletions core/src/main/scala/com/pingcap/tispark/TiConfigConst.scala
@@ -36,4 +36,6 @@ object TiConfigConst {
val REGION_INDEX_SCAN_DOWNGRADE_THRESHOLD: String = "spark.tispark.plan.downgrade.index_threshold"
val KV_TIMEZONE_OFFSET: String = "spark.tispark.request.timezone.offset"
val UNSUPPORTED_TYPES: String = "spark.tispark.type.unsupported_mysql_types"
val ENABLE_AUTO_LOAD_STATISTICS: String = "spark.tispark.statistics.auto_load"
val CACHE_EXPIRE_AFTER_ACCESS: String = "spark.tispark.statistics.expire_after_access"
}
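
For reference, a minimal sketch of how these two keys could be supplied when building a Spark session; the values shown (auto-load enabled, a 3600-second expiry) are illustrative assumptions, not defaults taken from this commit:

```scala
import org.apache.spark.sql.SparkSession

// Hypothetical settings: enable automatic statistics loading and let cached
// statistics expire 3600 seconds after their last access.
val spark = SparkSession
  .builder()
  .appName("tispark-statistics-demo")
  .config("spark.tispark.statistics.auto_load", "true")
  .config("spark.tispark.statistics.expire_after_access", "3600")
  .getOrCreate()
```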
3 changes: 3 additions & 0 deletions core/src/main/scala/com/pingcap/tispark/TiDBRelation.scala
@@ -18,6 +18,7 @@ package com.pingcap.tispark
import com.pingcap.tikv.TiSession
import com.pingcap.tikv.exception.TiClientInternalException
import com.pingcap.tikv.meta.{TiDAGRequest, TiTableInfo, TiTimestamp}
import com.pingcap.tispark.statistics.StatisticsManager
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.catalyst.expressions.aggregate._
import org.apache.spark.sql.catalyst.expressions.{Attribute, NamedExpression}
@@ -36,6 +37,8 @@ class TiDBRelation(session: TiSession, tableRef: TiTableReference, meta: MetaMan

override lazy val schema: StructType = TiUtils.getSchemaFromTable(table)

override def sizeInBytes: Long = tableRef.sizeInBytes

def logicalPlanToRDD(dagRequest: TiDAGRequest): TiRDD = {
val ts: TiTimestamp = session.getTimestamp
dagRequest.setStartTs(ts.getVersion)
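
Reporting `sizeInBytes` lets Spark's optimizer weigh the relation against `spark.sql.autoBroadcastJoinThreshold` when planning joins. A minimal sketch of that pattern on a generic `BaseRelation` (not TiSpark's actual class), assuming the size estimate is supplied by the caller:

```scala
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.sources.BaseRelation
import org.apache.spark.sql.types.StructType

// A relation that reports an explicit size estimate; Spark compares this value
// against spark.sql.autoBroadcastJoinThreshold when deciding whether to broadcast.
class SizedRelation(val sqlContext: SQLContext,
                    override val schema: StructType,
                    estimatedBytes: Long)
    extends BaseRelation {
  override def sizeInBytes: Long = estimatedBytes
}
```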
5 changes: 4 additions & 1 deletion core/src/main/scala/com/pingcap/tispark/TiTableReference.scala
@@ -15,4 +15,7 @@

package com.pingcap.tispark

class TiTableReference(val databaseName: String, val tableName: String) extends Serializable
class TiTableReference(val databaseName: String,
val tableName: String,
val sizeInBytes: Long = Long.MaxValue)
extends Serializable
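
A short usage note on the default: with no statistics available, `sizeInBytes` stays at `Long.MaxValue`, so Spark never considers the table small enough to broadcast; once statistics are loaded a realistic estimate can be passed instead. The database and table names below are made up for illustration:

```scala
// Unknown size: defaults to Long.MaxValue, i.e. "assume huge, never broadcast".
val unknownSize = new TiTableReference("tpch", "lineitem")

// Size taken from statistics (hypothetical value of 4 KiB).
val withStats = new TiTableReference("tpch", "nation", sizeInBytes = 4L * 1024)
```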
2 changes: 2 additions & 0 deletions core/src/main/scala/com/pingcap/tispark/TiUtils.scala
@@ -26,6 +26,7 @@ import com.pingcap.tikv.region.RegionStoreClient.RequestTypes
import com.pingcap.tikv.types._
import com.pingcap.tikv.{TiConfiguration, TiSession}
import com.pingcap.tispark.listener.CacheListenerManager
import com.pingcap.tispark.statistics.StatisticsManager
import org.apache.spark.sql.catalyst.expressions.aggregate._
import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Expression, Literal, NamedExpression}
import org.apache.spark.sql.types.{DataType, DataTypes, MetadataBuilder, StructField, StructType}
@@ -219,5 +220,6 @@ object TiUtils {
session.udf.register("ti_version", () => TiSparkVersion.version)
CacheListenerManager.initCacheListener(session.sparkContext, tiSession.getRegionManager)
tiSession.injectCallBackFunc(CacheListenerManager.CACHE_ACCUMULATOR_FUNCTION)
StatisticsManager.initStatisticsManager(tiSession, session)
}
}
195 changes: 195 additions & 0 deletions core/src/main/scala/com/pingcap/tispark/statistics/StatisticsHelper.scala
@@ -0,0 +1,195 @@
/*
*
* Copyright 2017 PingCAP, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

package com.pingcap.tispark.statistics

import com.pingcap.tikv.expression.{ByItem, ColumnRef, ComparisonBinaryExpression, Constant}
import com.pingcap.tikv.key.{Key, TypedKey}
import com.pingcap.tikv.meta.TiDAGRequest.PushDownType
import com.pingcap.tikv.meta.{TiDAGRequest, TiTableInfo}
import com.pingcap.tikv.row.Row
import com.pingcap.tikv.statistics._
import com.pingcap.tikv.types.{DataType, DataTypeFactory, MySQLType}
import org.slf4j.LoggerFactory

import scala.collection.JavaConversions._
import scala.collection.mutable

object StatisticsHelper {
private final lazy val logger = LoggerFactory.getLogger(getClass.getName)

def extractStatisticsDTO(row: Row,
table: TiTableInfo,
loadAll: Boolean,
neededColIds: mutable.ArrayBuffer[Long]): StatisticsDTO = {
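// Row layout mirrors the column order requested in buildHistogramsRequest below:
// 0 = table_id, 1 = is_index, 2 = hist_id, 3 = distinct_count,
// 4 = version, 5 = null_count, 6 = cm_sketch.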
val isIndex = row.getLong(1) > 0
val histID = row.getLong(2)
val distinct = row.getLong(3)
val histVer = row.getLong(4)
val nullCount = row.getLong(5)
val cMSketch = row.getBytes(6)
val indexInfos = table.getIndices.filter(_.getId == histID)
val colInfos = table.getColumns.filter(_.getId == histID)
var needed = true

// Only keep this histogram if we are loading everything or the user requested its column.
if (!loadAll && !neededColIds.contains(histID)) needed = false

var indexFlag = 1
var dataType: DataType = DataTypeFactory.of(MySQLType.TypeBlob)
// Column info found
if (!isIndex && colInfos.nonEmpty) {
indexFlag = 0
dataType = colInfos.head.getType
} else if (!isIndex || indexInfos.isEmpty) {
logger.error(
s"Cannot find histogram id $histID in table ${table.getName}; the column or index may have been dropped."
)
needed = false
}

if (needed) {
StatisticsDTO(
histID,
indexFlag,
distinct,
histVer,
nullCount,
dataType,
cMSketch,
if (indexInfos.nonEmpty) indexInfos.head else null,
if (colInfos.nonEmpty) colInfos.head else null
)
} else {
null
}
}

def extractStatisticResult(histId: Long,
rows: Iterator[Row],
requests: Seq[StatisticsDTO]): StatisticsResult = {
val matches = requests.filter(_.colId == histId)
if (matches.nonEmpty) {
val matched = matches.head
var totalCount: Long = 0
val buckets = mutable.ArrayBuffer[Bucket]()
while (rows.hasNext) {
val row = rows.next()
val count = row.getLong(0)
val repeats = row.getLong(1)
// All bounds are currently stored as blobs in the bucket table; decode them using the blob type.
val lowerBound: Key = TypedKey.toTypedKey(row.getBytes(2), DataTypeFactory.of(MySQLType.TypeBlob))
val upperBound: Key = TypedKey.toTypedKey(row.getBytes(3), DataTypeFactory.of(MySQLType.TypeBlob))
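// Bucket counts are cumulative: each bucket records the total number of rows
// seen up to and including it.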
totalCount += count
buckets += new Bucket(totalCount, repeats, lowerBound, upperBound)
}
// build the histogram for this histogram id (`colId`)
val histogram = Histogram
.newBuilder()
.setId(matched.colId)
.setNDV(matched.distinct)
.setNullCount(matched.nullCount)
.setLastUpdateVersion(matched.version)
.setBuckets(buckets)
.build()
// parse CMSketch
val rawData = matched.rawCMSketch
val cMSketch = if (rawData == null || rawData.length <= 0) {
null
} else {
val sketch = com.pingcap.tidb.tipb.CMSketch.parseFrom(rawData)
val result =
CMSketch.newCMSketch(sketch.getRowsCount, sketch.getRows(0).getCountersCount)
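// Copy every counter from the protobuf rows into the in-memory sketch,
// rebuilding the running count as the counters are summed.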
for (i <- 0 until sketch.getRowsCount) {
val row = sketch.getRows(i)
result.setCount(0)
for (j <- 0 until row.getCountersCount) {
val counter = row.getCounters(j)
result.getTable()(i)(j) = counter
result.setCount(result.getCount + counter)
}
}
result
}
StatisticsResult(histId, histogram, cMSketch, matched.idxInfo, matched.colInfo)
} else {
null
}
}

def buildHistogramsRequest(histTable: TiTableInfo,
targetTblId: Long,
startTs: Long): TiDAGRequest = {
TiDAGRequest.Builder
.newBuilder()
.setFullTableScan(histTable)
.addFilter(
ComparisonBinaryExpression
.equal(ColumnRef.create("table_id"), Constant.create(targetTblId))
)
.addRequiredCols(
"table_id",
"is_index",
"hist_id",
"distinct_count",
"version",
"null_count",
"cm_sketch"
)
.setStartTs(startTs)
.build(PushDownType.NORMAL)
}

def buildMetaRequest(metaTable: TiTableInfo, targetTblId: Long, startTs: Long): TiDAGRequest = {
TiDAGRequest.Builder
.newBuilder()
.setFullTableScan(metaTable)
.addFilter(
ComparisonBinaryExpression
.equal(ColumnRef.create("table_id"), Constant.create(targetTblId))
)
.addRequiredCols("table_id", "count", "modify_count", "version")
.setStartTs(startTs)
.build(PushDownType.NORMAL)
}

def buildBucketRequest(bucketTable: TiTableInfo, targetTblId: Long, startTs: Long): TiDAGRequest = {
TiDAGRequest.Builder
.newBuilder()
.setFullTableScan(bucketTable)
.addFilter(
ComparisonBinaryExpression
.equal(ColumnRef.create("table_id"), Constant.create(targetTblId))
)
.setLimit(Int.MaxValue)
.addOrderBy(ByItem.create(ColumnRef.create("bucket_id"), false))
.addRequiredCols(
"count",
"repeats",
"lower_bound",
"upper_bound",
"bucket_id",
"table_id",
"is_index",
"hist_id"
)
.setStartTs(startTs)
.build(PushDownType.NORMAL)
}
}
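
Taken together, the helpers above cover the read path: build a DAG request per statistics table, decode histogram metadata rows into `StatisticsDTO`s, then assemble buckets and CMSketches into `StatisticsResult`s. A rough sketch of building the three requests for one user table, assuming `histTable`, `metaTable` and `bucketTable` are the `TiTableInfo` handles for the statistics tables TiDB keeps in its `mysql` schema (`stats_histograms`, `stats_meta`, `stats_buckets`) and `startTs` is a TSO timestamp; the actual orchestration lives in `StatisticsManager`, which is not shown in this hunk:

```scala
import com.pingcap.tikv.meta.{TiDAGRequest, TiTableInfo}
import com.pingcap.tispark.statistics.StatisticsHelper

// Build the three coprocessor requests needed to load statistics for one table.
def buildStatsRequests(histTable: TiTableInfo,
                       metaTable: TiTableInfo,
                       bucketTable: TiTableInfo,
                       targetTblId: Long,
                       startTs: Long): Seq[TiDAGRequest] =
  Seq(
    StatisticsHelper.buildMetaRequest(metaTable, targetTblId, startTs),
    StatisticsHelper.buildHistogramsRequest(histTable, targetTblId, startTs),
    StatisticsHelper.buildBucketRequest(bucketTable, targetTblId, startTs)
  )
```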