Skip to content

Commit

Permalink
Remove beam-portable-pipeline fields from config inside DiagnosticMes…
Browse files Browse the repository at this point in the history
  • Loading branch information
Aman Singh authored Feb 13, 2024
1 parent 4243af0 commit 366fb4b
Showing 1 changed file with 18 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@
import java.util.Optional;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.samza.config.Config;
import org.apache.samza.config.MapConfig;
import org.apache.samza.job.model.ContainerModel;
Expand Down Expand Up @@ -186,6 +188,22 @@ public void addProcessorStopEvents(List<ProcessorStopEvent> stopEventList) {
* @param config the config to add.
*/
public void addConfig(Config config) {

if (metricsHeader.isPortableJob().isPresent() && metricsHeader.isPortableJob().get()) {

MapConfig cfg = new MapConfig(config);

Set<Map.Entry<String, String>> entrySet = cfg.entrySet();
if (entrySet.removeIf(entry
-> entry.getKey().contains("beamPipelineOptions")
|| entry.getKey().contains("beamJobInfo")
|| entry.getKey().contains("beamPipelinProto"))) {
LOG.debug("Removed serialized beam properties from portable job's config");
}

config = new MapConfig(entrySet.stream().collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)));
}

addToMetricsMessage(GROUP_NAME_FOR_DIAGNOSTICS_MANAGER, CONFIG_METRIC_NAME, (Map<String, String>) config);
}

Expand Down

0 comments on commit 366fb4b

Please sign in to comment.