
Commit 404922f: Apply scalafmt changes
1 parent ec8bdae

File tree
102 files changed: +3962, -2245 lines


pom.xml
Lines changed: 23 additions & 0 deletions

@@ -293,6 +293,29 @@
         </execution>
       </executions>
     </plugin>
+    <plugin>
+      <groupId>com.diffplug.spotless</groupId>
+      <artifactId>spotless-maven-plugin</artifactId>
+      <version>2.30.0</version>
+      <configuration>
+        <scala>
+          <scalafmt>
+            <version>3.7.17</version>
+            <file>${project.basedir}/.scalafmt.conf</file>
+          </scalafmt>
+        </scala>
+      </configuration>
+      <executions>
+        <execution>
+          <!-- Runs in compile phase to fail fast in case of formatting issues.-->
+          <id>spotless-check</id>
+          <phase>compile</phase>
+          <goals>
+            <goal>check</goal>
+          </goals>
+        </execution>
+      </executions>
+    </plugin>
   </plugins>
 </build>
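With the plugin bound to the compile phase, unformatted sources fail the build early via the check goal shown above; running mvn spotless:apply (the plugin's formatting goal) rewrites them according to ${project.basedir}/.scalafmt.conf. The file diffs below are the result of that reformatting with scalafmt 3.7.17.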

src/main/scala/uk/co/gresearch/spark/dgraph/connector/ClusterState.scala
Lines changed: 23 additions & 13 deletions

@@ -23,10 +23,12 @@ import java.util.UUID
 import scala.jdk.CollectionConverters._
 import scala.util.{Failure, Success, Try}

-case class ClusterState(groupMembers: Map[String, Set[Target]],
-                        groupPredicates: Map[String, Set[String]],
-                        maxUid: Option[UnsignedLong],
-                        cid: UUID)
+case class ClusterState(
+    groupMembers: Map[String, Set[Target]],
+    groupPredicates: Map[String, Set[String]],
+    maxUid: Option[UnsignedLong],
+    cid: UUID
+)

 object ClusterState extends Logging {

@@ -35,30 +37,38 @@ object ClusterState extends Logging {
     val groups = root.getAsJsonObject("groups")
     val groupMap =
       groups
-        .entrySet().asScala
+        .entrySet()
+        .asScala
         .map(e => e.getKey -> e.getValue.getAsJsonObject)
         .toMap

-    val groupMembers = groupMap.map { case (key, group) => key -> getMembersFromGroup(group) }
+    val groupMembers = groupMap
+      .map { case (key, group) => key -> getMembersFromGroup(group) }
       .map { case (key, members) => key -> members.map(Target).map(t => t.withPort(t.port + 2000)) }
     val groupPredicates = groupMap.map { case (key, group) => key -> getPredicatesFromGroup(group) }
     val cid = UUID.fromString(root.getAsJsonPrimitive("cid").getAsString)

     val maxUid = (if (root.has("maxUID")) {
-      getUnsignedLongFromJson(root.getAsJsonPrimitive("maxUID"))
-    } else if (root.has("maxLeaseId")) {
-      getUnsignedLongFromJson(root.getAsJsonPrimitive("maxLeaseId"))
-    } else {
-      Failure(new IllegalArgumentException("Cluster state does not contain maxLeaseId or maxUID, this disables uid range partitioning"))
-    }) match {
+      getUnsignedLongFromJson(root.getAsJsonPrimitive("maxUID"))
+    } else if (root.has("maxLeaseId")) {
+      getUnsignedLongFromJson(root.getAsJsonPrimitive("maxLeaseId"))
+    } else {
+      Failure(
+        new IllegalArgumentException(
+          "Cluster state does not contain maxLeaseId or maxUID, this disables uid range partitioning"
+        )
+      )
+    }) match {
       case Success(value) => Some(value)
       case Failure(exception) =>
         log.error("Failed to retrieve maxUID from cluster state", exception)
         None
     }

     if (maxUid.exists(_.compareTo(UnsignedLong.ZERO) < 0)) {
-      log.error(s"Cluster state indicates negative maxLeaseId or maxUID, this disables uid range partitioning: ${maxUid.get}")
+      log.error(
+        s"Cluster state indicates negative maxLeaseId or maxUID, this disables uid range partitioning: ${maxUid.get}"
+      )
     }
     val positiveMaxUID = maxUid.filter(_.compareTo(UnsignedLong.ZERO) >= 0)
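Beyond the reflow, the maxUid logic is unchanged: prefer the newer maxUID field, fall back to maxLeaseId, and degrade to None (disabling uid range partitioning) if neither is present or parseable. A minimal standalone sketch of that fallback pattern, with a hypothetical parseUnsigned helper standing in for the connector's getUnsignedLongFromJson:

    import com.google.common.primitives.UnsignedLong
    import com.google.gson.{JsonObject, JsonParser}
    import scala.util.{Failure, Success, Try}

    object MaxUidFallback {
      // hypothetical stand-in for the connector's getUnsignedLongFromJson
      def parseUnsigned(root: JsonObject, key: String): Try[UnsignedLong] =
        Try(UnsignedLong.valueOf(root.getAsJsonPrimitive(key).getAsString))

      // prefer maxUID, fall back to maxLeaseId, otherwise give up with None
      def maxUid(root: JsonObject): Option[UnsignedLong] =
        (if (root.has("maxUID")) parseUnsigned(root, "maxUID")
         else if (root.has("maxLeaseId")) parseUnsigned(root, "maxLeaseId")
         else Failure(new IllegalArgumentException("no maxUID or maxLeaseId"))) match {
          case Success(value) => Some(value)
          case Failure(_)     => None
        }

      def main(args: Array[String]): Unit = {
        val root = JsonParser.parseString("""{"maxLeaseId": "10000"}""").getAsJsonObject
        println(maxUid(root)) // prints Some(10000)
      }
    }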

src/main/scala/uk/co/gresearch/spark/dgraph/connector/ClusterStateProvider.scala
Lines changed: 13 additions & 7 deletions

@@ -26,10 +26,14 @@ trait ClusterStateProvider extends Logging {
     val clusterStates = targets.flatMap(target => getClusterState(target))
     val cids = clusterStates.map(_.cid).toSet
     if (cids.size > 1)
-      throw new RuntimeException(s"Retrieved multiple cluster ids from " +
-        s"Dgraph alphas (${targets.map(_.target).mkString(", ")}): ${cids.mkString(", ")}")
+      throw new RuntimeException(
+        s"Retrieved multiple cluster ids from " +
+          s"Dgraph alphas (${targets.map(_.target).mkString(", ")}): ${cids.mkString(", ")}"
+      )
     val clusterState = clusterStates.headOption.getOrElse(
-      throw new RuntimeException(s"Could not retrieve cluster state from Dgraph alphas (${targets.map(_.target).mkString(", ")})")
+      throw new RuntimeException(
+        s"Could not retrieve cluster state from Dgraph alphas (${targets.map(_.target).mkString(", ")})"
+      )
     )

     // filter out reserved predicates as configured
@@ -42,16 +46,18 @@ trait ClusterStateProvider extends Logging {
   }

   def getClusterState(target: Target): Option[ClusterState] = {
-    val url = s"http://${target.withPort(target.port-1000).target}/state"
+    val url = s"http://${target.withPort(target.port - 1000).target}/state"
     try {
       val startTs = Clock.systemUTC().instant().toEpochMilli
       val request = requests.get(url)
       val endTs = Clock.systemUTC().instant().toEpochMilli
       val json = Json(request.text())

-      log.info(s"retrieved cluster state from ${target.target} " +
-        s"with ${json.string.getBytes.length} bytes " +
-        s"in ${(endTs - startTs) / 1000.0}s")
+      log.info(
+        s"retrieved cluster state from ${target.target} " +
+          s"with ${json.string.getBytes.length} bytes " +
+          s"in ${(endTs - startTs) / 1000.0}s"
+      )
       log.trace(s"retrieved cluster state: ${abbreviate(json.string)}")

       if (request.statusCode == 200) {
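The spacing fix in target.port - 1000 leaves the port arithmetic as-is: a Dgraph alpha serving gRPC on its default port 9080 serves HTTP, including the /state endpoint, on 8080. A quick sketch of the derived URL, assuming a local alpha on default ports and the same requests library used above:

    // a local Dgraph alpha is assumed: gRPC on 9080, HTTP on 9080 - 1000 = 8080
    val grpcPort = 9080
    val url = s"http://localhost:${grpcPort - 1000}/state"
    val response = requests.get(url) // same call the connector makes
    println(response.statusCode)     // 200 when the alpha is reachable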

src/main/scala/uk/co/gresearch/spark/dgraph/connector/DgraphReader.scala
Lines changed: 25 additions & 15 deletions

@@ -19,39 +19,49 @@ package uk.co.gresearch.spark.dgraph.connector
 import org.apache.spark.sql.{DataFrame, DataFrameReader}

 case class DgraphReader(reader: DataFrameReader) {
+
   /**
-   * Loads all triples of a Dgraph database into a DataFrame. Requires at least one target.
-   * Use triples(targets.head, targets.tail: _*) if you want to provide a Seq[String].
+   * Loads all triples of a Dgraph database into a DataFrame. Requires at least one target. Use triples(targets.head,
+   * targets.tail: _*) if you want to provide a Seq[String].
    *
-   * @param target a target
-   * @param targets more targets
-   * @return triples DataFrame
+   * @param target
+   *   a target
+   * @param targets
+   *   more targets
+   * @return
+   *   triples DataFrame
    */
   def triples(target: String, targets: String*): DataFrame =
     reader
       .format(TriplesSource)
       .load(Seq(target) ++ targets: _*)

   /**
-   * Loads all edges of a Dgraph database into a DataFrame. Requires at least one target.
-   * Use edges(targets.head, targets.tail: _*) if want to provide a Seq[String].
+   * Loads all edges of a Dgraph database into a DataFrame. Requires at least one target. Use edges(targets.head,
+   * targets.tail: _*) if want to provide a Seq[String].
    *
-   * @param target a target
-   * @param targets more targets
-   * @return edges DataFrame
+   * @param target
+   *   a target
+   * @param targets
+   *   more targets
+   * @return
+   *   edges DataFrame
    */
   def edges(target: String, targets: String*): DataFrame =
     reader
       .format(EdgesSource)
       .load(Seq(target) ++ targets: _*)

   /**
-   * Loads all nodes of a Dgraph database into a DataFrame. Requires at least one target.
-   * Use nodes(targets.head, targets.tail: _*) if you want to provide a Seq[String].
+   * Loads all nodes of a Dgraph database into a DataFrame. Requires at least one target. Use nodes(targets.head,
+   * targets.tail: _*) if you want to provide a Seq[String].
    *
-   * @param target a target
-   * @param targets more targets
-   * @return nodes DataFrame
+   * @param target
+   *   a target
+   * @param targets
+   *   more targets
+   * @return
+   *   nodes DataFrame
    */
   def nodes(target: String, targets: String*): DataFrame =
     reader
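The scaladoc changes above are purely cosmetic (scalafmt rewraps doc comments); the API itself is untouched. For reference, a minimal usage sketch built directly on the case class shown, assuming a Spark session and a Dgraph alpha reachable at localhost:9080 (the default gRPC port):

    import org.apache.spark.sql.SparkSession
    import uk.co.gresearch.spark.dgraph.connector.DgraphReader

    val spark = SparkSession.builder().master("local[*]").getOrCreate()
    // wrap a DataFrameReader in the connector's DgraphReader and load all triples
    val triples = DgraphReader(spark.read).triples("localhost:9080")
    triples.show()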

src/main/scala/uk/co/gresearch/spark/dgraph/connector/Filter.scala
Lines changed: 4 additions & 2 deletions

@@ -54,8 +54,10 @@ case class PredicateNameIs(name: String) extends PredicateNameIsIn(Set(name))

 // PredicateValueIsIn comes with two semantics: one that has to be intersected and one that cannot
 abstract class PredicateValueIsIn(val names: Set[String], val values: Set[Any]) extends Filter
-case class IntersectPredicateValueIsIn(override val names: Set[String], override val values: Set[Any]) extends PredicateValueIsIn(names, values)
-case class SinglePredicateValueIsIn(name: String, override val values: Set[Any]) extends PredicateValueIsIn(Set(name), values)
+case class IntersectPredicateValueIsIn(override val names: Set[String], override val values: Set[Any])
+    extends PredicateValueIsIn(names, values)
+case class SinglePredicateValueIsIn(name: String, override val values: Set[Any])
+    extends PredicateValueIsIn(Set(name), values)

 case class ObjectTypeIsIn(types: Set[String]) extends Filter
 case class ObjectValueIsIn(values: Set[Any]) extends Filter
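The split class headers carry the comment's two semantics through unchanged; constructing the variants looks like this (predicate names and values are illustrative):

    // spans several predicates, so its result must be intersected with other filters
    val byAnyName = IntersectPredicateValueIsIn(Set("name", "alias"), Set[Any]("Luke"))
    // targets exactly one predicate, no intersection required
    val byName = SinglePredicateValueIsIn("name", Set[Any]("Luke"))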
