
Commit 3406627

feedback changes
1 parent bd496bd commit 3406627

5 files changed: +94 -17 lines changed

java/hsfs/src/main/java/com/logicalclocks/hsfs/JobConfiguration.java

Lines changed: 16 additions & 6 deletions
@@ -17,6 +17,9 @@
 
 package com.logicalclocks.hsfs;
 
+import java.util.HashMap;
+import java.util.Map;
+
 import com.google.common.base.Strings;
 
 import lombok.AllArgsConstructor;
@@ -28,8 +31,12 @@
 @AllArgsConstructor
 public class JobConfiguration {
 
+  private static Map<String, String> DEFAULT_PROPERTIES = new HashMap<String, String>() {{
+      put("spark.yarn.maxAppAttempts", "2");
+    }};
+
+
   @Getter
-  @Setter
   private String type = "sparkJobConfiguration";
 
 
@@ -73,11 +80,14 @@ public class JobConfiguration {
   private String properties;
 
   public String getProperties() {
-    String defaultProperty = "spark.yarn.maxAppAttempts=2";
-    if (Strings.isNullOrEmpty(properties)) {
-      properties = defaultProperty;
-    } else {
-      properties = properties + (properties.contains(defaultProperty.split("=")[0]) ? "" : "\n" + defaultProperty);
+    // Add default properties to the properties
+    for (Map.Entry<String, String> entry : JobConfiguration.DEFAULT_PROPERTIES.entrySet()) {
+      String defaultProperty = entry.getKey() + "=" + entry.getValue();
+      if (Strings.isNullOrEmpty(properties)) {
+        properties = defaultProperty;
+      } else if (!properties.contains(entry.getKey())) {
+        properties = properties + "\n" + defaultProperty;
+      }
     }
 
     return properties;
Lines changed: 62 additions & 0 deletions
@@ -0,0 +1,62 @@
+/*
+ * Copyright (c) 2025. Hopsworks AB
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *
+ * See the License for the specific language governing permissions and limitations under the License.
+ *
+ */
+
+package com.logicalclocks.hsfs;
+
+import org.junit.Assert;
+import org.junit.jupiter.api.Test;
+
+import java.io.IOException;
+
+
+class TestJobConfiguration {
+  @Test
+  void testGetProperties() throws FeatureStoreException, IOException {
+    // Arrange
+    JobConfiguration jobConfiguration = new JobConfiguration(null, 1, 2, 3, 4, 0, false, 0, 0, 0, null);
+
+    // Act
+    String result = jobConfiguration.getProperties();
+
+    // Assert
+    Assert.assertEquals("spark.yarn.maxAppAttempts=2", result);
+  }
+
+  @Test
+  void testGetPropertiesProvidedProperties() throws FeatureStoreException, IOException {
+    // Arrange
+    JobConfiguration jobConfiguration = new JobConfiguration(null, 1, 2, 3, 4, 0, false, 0, 0, 0, "spark.test=xxx");
+
+    // Act
+    String result = jobConfiguration.getProperties();
+
+    // Assert
+    Assert.assertEquals("spark.test=xxx\nspark.yarn.maxAppAttempts=2", result);
+  }
+
+  @Test
+  void testGetPropertiesProvidedPropertiesOverlapDefault() throws FeatureStoreException, IOException {
+    // Arrange
+    JobConfiguration jobConfiguration = new JobConfiguration(null, 1, 2, 3, 4, 0, false, 0, 0, 0, "spark.yarn.maxAppAttempts=9");
+
+    // Act
+    String result = jobConfiguration.getProperties();
+
+    // Assert
+    Assert.assertEquals("spark.yarn.maxAppAttempts=9", result);
+  }
+}

python/hsfs/core/ingestion_job_conf.py

Lines changed: 3 additions & 2 deletions
@@ -28,7 +28,8 @@ def __init__(
         self._data_format = data_format
         self._data_options = data_options
         self._write_options = write_options
-        self._spark_job_configuration = spark_job_configuration
+
+        self._spark_job_configuration = JobConfiguration(**spark_job_configuration if spark_job_configuration else {}).to_dict(),
 
     @property
     def data_format(self):
@@ -74,5 +75,5 @@ def to_dict(self):
             ]
             if self._write_options
             else None,
-            JobConfiguration.DTO_TYPE: JobConfiguration(**self._spark_job_configuration if self._spark_job_configuration else {}).to_dict(),
+            JobConfiguration.DTO_TYPE: self._spark_job_configuration,
         }
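
The *_job_conf DTOs now expand the raw spark_job_configuration dict into a JobConfiguration once in the constructor, so to_dict() simply reuses the stored, already-serialized value instead of building it on every call. Below is a minimal, self-contained sketch of that pattern; SparkConfig and IngestionConf are hypothetical stand-ins, not the hsfs classes, and the optional-dict expansion is written as **(cfg or {}) for readability rather than the committed conditional-expression form.

# Illustrative sketch only: SparkConfig and IngestionConf are hypothetical
# stand-ins for the hsfs JobConfiguration and IngestionJobConf classes.

class SparkConfig:
    """Stand-in for JobConfiguration: holds defaults and serializes itself."""

    def __init__(self, executor_cores=1, properties="", **kwargs):
        self._executor_cores = executor_cores
        self._properties = properties

    def to_dict(self):
        return {"executorCores": self._executor_cores, "properties": self._properties}


class IngestionConf:
    """Stand-in for IngestionJobConf: normalizes the raw dict up front."""

    def __init__(self, spark_job_configuration=None):
        # Expand the raw dict into a SparkConfig once, at construction time,
        # so to_dict() can simply emit the already-serialized value.
        self._spark_job_configuration = SparkConfig(
            **(spark_job_configuration or {})
        ).to_dict()

    def to_dict(self):
        return {"sparkJobConfiguration": self._spark_job_configuration}


print(IngestionConf({"executor_cores": 4}).to_dict())
# {'sparkJobConfiguration': {'executorCores': 4, 'properties': ''}}
print(IngestionConf().to_dict())
# {'sparkJobConfiguration': {'executorCores': 1, 'properties': ''}}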

python/hsfs/core/job_configuration.py

Lines changed: 10 additions & 7 deletions
@@ -22,6 +22,7 @@
 
 class JobConfiguration:
     DTO_TYPE = "sparkJobConfiguration"
+    DEFAULT_PROPERTIES = {"spark.yarn.maxAppAttempts": 2}
 
     def __init__(
         self,
@@ -33,7 +34,7 @@ def __init__(
         dynamic_allocation=True,
         dynamic_min_executors=1,
         dynamic_max_executors=2,
-        properties=None,
+        properties="",
         **kwargs,
     ):
         self._am_memory = am_memory
@@ -44,15 +45,17 @@ def __init__(
         self._dynamic_allocation = dynamic_allocation
         self._dynamic_min_executors = dynamic_min_executors
         self._dynamic_max_executors = dynamic_max_executors
-        self._properties = properties
 
-    def to_dict(self):
-        default_property = "spark.yarn.maxAppAttempts=2"
-        if not self._properties:
-            self._properties = default_property
+        # Add default properties to the properties
+        default_properties = "\n".join([
+            f"{key}={value}" if not properties or key not in properties else ""
+            for key, value in JobConfiguration.DEFAULT_PROPERTIES.items()])
+        if properties:
+            self._properties = properties + ("\n" + default_properties if default_properties else "")
        else:
-            self._properties = self._properties + (f"\n{default_property}" if default_property.split("=")[0] not in self._properties else "")
+            self._properties = default_properties
 
+    def to_dict(self):
         return {
             "amMemory": self._am_memory,
             "amCores": self._am_cores,

python/hsfs/core/training_dataset_job_conf.py

Lines changed: 3 additions & 2 deletions
@@ -26,7 +26,8 @@ def __init__(self, query, overwrite, write_options, spark_job_configuration):
         self._query = query
         self._overwrite = overwrite
         self._write_options = write_options
-        self._spark_job_configuration = spark_job_configuration
+
+        self._spark_job_configuration = JobConfiguration(**spark_job_configuration if spark_job_configuration else {}).to_dict(),
 
     @property
     def query(self):
@@ -72,5 +73,5 @@ def to_dict(self):
             ]
             if self._write_options
             else None,
-            JobConfiguration.DTO_TYPE: JobConfiguration(**self._spark_job_configuration if self._spark_job_configuration else {}).to_dict(),
+            JobConfiguration.DTO_TYPE: self._spark_job_configuration,
         }
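
Taken together, the default Spark property is no longer hard-coded as a bare string: both the Java and Python JobConfiguration classes keep a DEFAULT_PROPERTIES map and append each key=value pair to the caller-supplied properties string unless that key is already present. A minimal Python sketch of that merge rule follows, written as a free-standing helper (an illustrative re-implementation, not the hsfs code) and checked against the same three cases the new Java tests assert.

# Illustrative helper, not the hsfs implementation: mirrors the merge rule
# applied by JobConfiguration in both the Java and Python changes above.

DEFAULT_PROPERTIES = {"spark.yarn.maxAppAttempts": 2}


def merge_default_properties(properties: str = "") -> str:
    # Append each default as "key=value" unless the caller already set that key.
    defaults = "\n".join(
        f"{key}={value}"
        for key, value in DEFAULT_PROPERTIES.items()
        if not properties or key not in properties
    )
    if properties:
        return properties + ("\n" + defaults if defaults else "")
    return defaults


# The same three cases the new Java tests assert:
assert merge_default_properties("") == "spark.yarn.maxAppAttempts=2"
assert merge_default_properties("spark.test=xxx") == "spark.test=xxx\nspark.yarn.maxAppAttempts=2"
assert merge_default_properties("spark.yarn.maxAppAttempts=9") == "spark.yarn.maxAppAttempts=9"

Like the committed code, the check is a plain substring match on the key, so a user-supplied property whose name merely contains a default key would also suppress that default.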
