diff --git a/samza-core/src/main/scala/org/apache/samza/diagnostics/DiagnosticsStreamMessage.java b/samza-core/src/main/scala/org/apache/samza/diagnostics/DiagnosticsStreamMessage.java index bb3eb0d592..ad03fd6b14 100644 --- a/samza-core/src/main/scala/org/apache/samza/diagnostics/DiagnosticsStreamMessage.java +++ b/samza-core/src/main/scala/org/apache/samza/diagnostics/DiagnosticsStreamMessage.java @@ -27,6 +27,8 @@ import java.util.Optional; import com.fasterxml.jackson.core.type.TypeReference; import com.fasterxml.jackson.databind.ObjectMapper; +import java.util.Set; +import java.util.stream.Collectors; import org.apache.samza.config.Config; import org.apache.samza.config.MapConfig; import org.apache.samza.job.model.ContainerModel; @@ -186,6 +188,22 @@ public void addProcessorStopEvents(List stopEventList) { * @param config the config to add. */ public void addConfig(Config config) { + + if (metricsHeader.isPortableJob().isPresent() && metricsHeader.isPortableJob().get()) { + + MapConfig cfg = new MapConfig(config); + + Set> entrySet = cfg.entrySet(); + if (entrySet.removeIf(entry + -> entry.getKey().contains("beamPipelineOptions") + || entry.getKey().contains("beamJobInfo") + || entry.getKey().contains("beamPipelinProto"))) { + LOG.debug("Removed serialized beam properties from portable job's config"); + } + + config = new MapConfig(entrySet.stream().collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue))); + } + addToMetricsMessage(GROUP_NAME_FOR_DIAGNOSTICS_MANAGER, CONFIG_METRIC_NAME, (Map) config); }