From e17587c7c74b93878f67be70ea7523fe635ddbd6 Mon Sep 17 00:00:00 2001 From: Ti Chi Robot Date: Tue, 10 Oct 2023 16:15:25 +0800 Subject: [PATCH] Expose preferred locations config (#2762) (#2765) --- .../main/scala/com/pingcap/tispark/TiConfigConst.scala | 2 ++ .../main/scala/com/pingcap/tispark/utils/TiUtil.scala | 5 +++++ .../main/scala/org/apache/spark/sql/tispark/TiRDD.scala | 9 +++++++-- docs/userguide_3.0.md | 1 + .../src/main/java/com/pingcap/tikv/TiConfiguration.java | 1 + .../com/pingcap/tikv/operation/iterator/DAGIterator.java | 5 +++++ 6 files changed, 21 insertions(+), 2 deletions(-) diff --git a/core/src/main/scala/com/pingcap/tispark/TiConfigConst.scala b/core/src/main/scala/com/pingcap/tispark/TiConfigConst.scala index 13edb589f1..916c1f98c7 100644 --- a/core/src/main/scala/com/pingcap/tispark/TiConfigConst.scala +++ b/core/src/main/scala/com/pingcap/tispark/TiConfigConst.scala @@ -103,4 +103,6 @@ object TiConfigConst { // health check timeout val GRPC_HEALTH_CHECK_TIMEOUT = "spark.tispark.grpc.health_check_timeout_in_ms" val GPRC_HEALTH_CHECK_PERIOD = "spark.tispark.grpc.health_check_period_in_ms" + // preferred locations + val PREFERRED_LOCATIONS = "spark.tispark.preferred_locations" } diff --git a/core/src/main/scala/com/pingcap/tispark/utils/TiUtil.scala b/core/src/main/scala/com/pingcap/tispark/utils/TiUtil.scala index 485608127d..e3a76f205f 100644 --- a/core/src/main/scala/com/pingcap/tispark/utils/TiUtil.scala +++ b/core/src/main/scala/com/pingcap/tispark/utils/TiUtil.scala @@ -267,6 +267,11 @@ object TiUtil { tiConf.setHealthCheckPeriod(conf .get(TiConfigConst.GPRC_HEALTH_CHECK_PERIOD, TiConfiguration.DEFHealthCheckPeriod.toString) .toInt) + + if (conf.contains(TiConfigConst.PREFERRED_LOCATIONS)) { + tiConf.setPreferredLocations(conf.get(TiConfigConst.PREFERRED_LOCATIONS)) + } + tiConf } diff --git a/core/src/main/scala/org/apache/spark/sql/tispark/TiRDD.scala b/core/src/main/scala/org/apache/spark/sql/tispark/TiRDD.scala index 
5d474ef730..b2d997b6df 100644 --- a/core/src/main/scala/org/apache/spark/sql/tispark/TiRDD.scala +++ b/core/src/main/scala/org/apache/spark/sql/tispark/TiRDD.scala @@ -43,6 +43,7 @@ abstract class TiRDD( extends RDD[InternalRow](sparkSession.sparkContext, Nil) { private lazy val partitionPerSplit = tiConf.getPartitionPerSplit + private lazy val preferredLocations = tiConf.getPreferredLocations protected def checkTimezone(): Unit = { if (!tiConf.getLocalTimeZone.equals(Converter.getLocalTimezone)) { @@ -80,6 +81,10 @@ abstract class TiRDD( result.toArray } - override protected def getPreferredLocations(split: Partition): Seq[String] = - split.asInstanceOf[TiPartition].tasks.head.getHost :: Nil + override protected def getPreferredLocations(split: Partition): Seq[String] = { + if (preferredLocations.equalsIgnoreCase("host")) { + return split.asInstanceOf[TiPartition].tasks.head.getHost :: Nil + } + Nil + } } diff --git a/docs/userguide_3.0.md b/docs/userguide_3.0.md index ad6a7ebe4b..fbc598dd09 100644 --- a/docs/userguide_3.0.md +++ b/docs/userguide_3.0.md @@ -279,6 +279,7 @@ spark.sql("select t1.id,t2.id from spark_catalog.default.t t1 left join tidb_cat | `spark.tispark.load_tables` | true | (experimental) Whether load all tables when we reload catalog cache. Disable it may cause table not find in scenarios where the table changes frequently. | | `spark.tispark.grpc.health_check_timeout_in_ms` | 2000 | The timeout of health check for TiKV and TiFlash. | | `spark.tispark.grpc.health_check_period_in_ms` | 3000 | The period duration of health check. | +| `spark.tispark.preferred_locations` | "" | The preferred locations of TiRDD partitions in TiSpark. The only supported value is `host` (case-insensitive), which uses the host of the partition's first task as its preferred location; with any other value, including the default empty string, no preferred location is reported. This configuration is for forward compatibility.
| ### TLS Configuration diff --git a/tikv-client/src/main/java/com/pingcap/tikv/TiConfiguration.java b/tikv-client/src/main/java/com/pingcap/tikv/TiConfiguration.java index 2d472912b9..6b9b56498c 100644 --- a/tikv-client/src/main/java/com/pingcap/tikv/TiConfiguration.java +++ b/tikv-client/src/main/java/com/pingcap/tikv/TiConfiguration.java @@ -192,6 +192,7 @@ public TiConfiguration setCertReloadIntervalInSeconds(String interval) { public static final int DEFHealthCheckPeriod = 3000; private int healthCheckTimeout = DEFHealthCheckTimeout; private int healthCheckPeriod = DEFHealthCheckPeriod; + private String preferredLocations = ""; private static Long getTimeAsSeconds(String key) { return Utils.timeStringAsSec(key); diff --git a/tikv-client/src/main/java/com/pingcap/tikv/operation/iterator/DAGIterator.java b/tikv-client/src/main/java/com/pingcap/tikv/operation/iterator/DAGIterator.java index adad6aebe9..6783a9e1fb 100644 --- a/tikv-client/src/main/java/com/pingcap/tikv/operation/iterator/DAGIterator.java +++ b/tikv-client/src/main/java/com/pingcap/tikv/operation/iterator/DAGIterator.java @@ -264,6 +264,11 @@ private SelectResponse process(RegionTask regionTask) { } client.addResolvedLocks(startTs, resolvedLocks); + logger.info( + String.format( + "start coprocess request to %s in region %d with timeout %s", + task.getHost(), region.getId(), client.getTimeout())); + Collection tasks = client.coprocess(backOffer, dagRequest, ranges, responseQueue, startTs); if (tasks != null) {