Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
import java.util.function.BiConsumer;
import java.util.function.Supplier;
import java.util.logging.Level;
import java.util.logging.Logger;

Expand Down Expand Up @@ -100,6 +101,7 @@ abstract class AbstractHollowProducer {
private final boolean focusHoleFillInFewestShards;
private final boolean allowTypeResharding;
private final boolean forceCoverageOfTypeResharding; // exercise re-sharding often (for testing)
private final Supplier<Boolean> ignoreSoftLimits;

@Deprecated
public AbstractHollowProducer(
Expand All @@ -110,7 +112,7 @@ public AbstractHollowProducer(
new VersionMinterWithCounter(), null, 0,
DEFAULT_TARGET_MAX_TYPE_SHARD_SIZE, false, false, false, null,
new DummyBlobStorageCleaner(), new BasicSingleProducerEnforcer(),
null, true, HollowConsumer.UpdatePlanBlobVerifier.DEFAULT_INSTANCE);
null, true, HollowConsumer.UpdatePlanBlobVerifier.DEFAULT_INSTANCE, null);
}

// The only constructor should be that which accepts a builder
Expand All @@ -123,7 +125,7 @@ public AbstractHollowProducer(
b.numStatesBetweenSnapshots, b.targetMaxTypeShardSize, b.focusHoleFillInFewestShards,
b.allowTypeResharding, b.forceCoverageOfTypeResharding,
b.metricsCollector, b.blobStorageCleaner, b.singleProducerEnforcer,
b.hashCodeFinder, b.doIntegrityCheck, b.updatePlanBlobVerifier);
b.hashCodeFinder, b.doIntegrityCheck, b.updatePlanBlobVerifier, b.ignoreSoftLimits);
}

private final HollowProducerListener producerMetricsListener;
Expand All @@ -148,7 +150,8 @@ private AbstractHollowProducer(
SingleProducerEnforcer singleProducerEnforcer,
HollowObjectHashCodeFinder hashCodeFinder,
boolean doIntegrityCheck,
HollowConsumer.UpdatePlanBlobVerifier updatePlanBlobVerifier) {
HollowConsumer.UpdatePlanBlobVerifier updatePlanBlobVerifier,
Supplier<Boolean> ignoreSoftLimits) {
this.publisher = publisher;
this.announcer = announcer;
this.versionMinter = versionMinter;
Expand All @@ -162,13 +165,15 @@ private AbstractHollowProducer(
this.allowTypeResharding = allowTypeResharding;
this.forceCoverageOfTypeResharding = forceCoverageOfTypeResharding;
this.focusHoleFillInFewestShards = focusHoleFillInFewestShards;
this.ignoreSoftLimits = ignoreSoftLimits;

HollowWriteStateEngine writeEngine = hashCodeFinder == null
? new HollowWriteStateEngine()
: new HollowWriteStateEngine(hashCodeFinder);
writeEngine.setTargetMaxTypeShardSize(targetMaxTypeShardSize);
writeEngine.allowTypeResharding(allowTypeResharding);
writeEngine.setFocusHoleFillInFewestShards(focusHoleFillInFewestShards);
writeEngine.setIgnoreOrdinalLimits(ignoreSoftLimits);

this.objectMapper = new HollowObjectMapper(writeEngine);
if (hashCodeFinder != null) {
Expand Down Expand Up @@ -326,6 +331,7 @@ private HollowProducer.ReadState restore(
writeEngine.setTargetMaxTypeShardSize(targetMaxTypeShardSize);
writeEngine.allowTypeResharding(allowTypeResharding);
writeEngine.setFocusHoleFillInFewestShards(focusHoleFillInFewestShards);
writeEngine.setIgnoreOrdinalLimits(ignoreSoftLimits);
HollowWriteStateCreator.populateStateEngineWithTypeWriteStates(writeEngine, schemas);
HollowObjectMapper newObjectMapper = new HollowObjectMapper(writeEngine);
if (hashCodeFinder != null) {
Expand Down Expand Up @@ -639,6 +645,7 @@ HollowProducer.Populator incrementalPopulate(
*/
void publish(ProducerListeners listeners, long toVersion, Artifacts artifacts) throws IOException {
Status.StageBuilder psb = listeners.firePublishStart(toVersion);
PublishStageStats stats = null;
try {
// We want a header to be created for all states.
artifacts.header = blobStager.openHeader(toVersion);
Expand Down Expand Up @@ -672,12 +679,16 @@ void publish(ProducerListeners listeners, long toVersion, Artifacts artifacts) t
artifacts.markSnapshotPublishComplete();
numStatesUntilNextSnapshot = numStatesBetweenSnapshots;
}

stats = new PublishStageStats.Builder().withStats(
getWriteEngine().getOrdinalMapStats()
).build();
psb.success();
} catch (Throwable throwable) {
psb.fail(throwable);
throw throwable;
} finally {
listeners.firePublishComplete(psb);
listeners.firePublishComplete(psb, stats);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
import java.util.List;
import java.util.Map;
import java.util.concurrent.Executor;
import java.util.function.Supplier;

/**
* A HollowProducer is the top-level class used by producers of Hollow data to populate, publish, and announce data states.
Expand Down Expand Up @@ -752,6 +753,7 @@ public static class Builder<B extends HollowProducer.Builder<B>> {
boolean doIntegrityCheck = true;
ProducerOptionalBlobPartConfig optionalPartConfig = null;
HollowConsumer.UpdatePlanBlobVerifier updatePlanBlobVerifier = HollowConsumer.UpdatePlanBlobVerifier.DEFAULT_INSTANCE;
Supplier<Boolean> ignoreSoftLimits = null;

protected HollowProducerEventListener customProducerMetricsListener = null;

Expand Down Expand Up @@ -973,6 +975,18 @@ public B withUpdatePlanVerifier(HollowConsumer.UpdatePlanBlobVerifier updatePlan
return (B) this;
}

/**
* set the ignoreSoftLimits. When set to true, ignore soft limits like
* {@link com.netflix.hollow.core.memory.ByteArrayOrdinalMap} ordinal limit, instead of failing.
* @param ignoreSoftLimits the supplier for whether to ignore
* {@link com.netflix.hollow.core.memory.ByteArrayOrdinalMap} ordinal soft limit breaches
* @return this builder
*/
public B withIgnoreSoftLimits(Supplier<Boolean> ignoreSoftLimits) {
this.ignoreSoftLimits = ignoreSoftLimits;
return (B) this;
}

protected void checkArguments() {
if (allowTypeResharding == true && doIntegrityCheck == false) { // type resharding feature rollout
throw new IllegalArgumentException("Enabling type re-sharding requires integrity check to also be enabled");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import com.netflix.hollow.api.producer.validation.ValidationStatus;
import com.netflix.hollow.api.producer.validation.ValidationStatusListener;
import com.netflix.hollow.api.producer.validation.ValidatorListener;

import java.time.Duration;
import java.util.Collection;
import java.util.List;
Expand Down Expand Up @@ -213,7 +214,6 @@ void fireBlobStage(Status.PublishBuilder b) {
l -> l.onBlobStage(s, blob, elapsed));
}


void fireBlobPublishAsync(CompletableFuture<HollowProducer.Blob> f) {
fire(PublishListener.class,
l -> l.onBlobPublishAsync(f));
Expand All @@ -228,13 +228,13 @@ void fireBlobPublish(Status.PublishBuilder b) {
l -> l.onBlobPublish(s, blob, elapsed));
}

void firePublishComplete(Status.StageBuilder b) {
void firePublishComplete(Status.StageBuilder b, PublishStageStats stats) {
Status s = b.build();
long version = b.version;
Duration elapsed = b.elapsed();

fire(PublishListener.class,
l -> l.onPublishComplete(s, version, elapsed));
l -> l.onPublishComplete(s, version, elapsed, stats));
}

Status.StageWithStateBuilder fireIntegrityCheckStart(HollowProducer.ReadState readState) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
/*
* Copyright 2016-2026 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package com.netflix.hollow.api.producer;

import com.netflix.hollow.core.memory.ByteArrayOrdinalMapStats;

import java.util.Map;

public class PublishStageStats implements StageStats {
private final Map<String, ByteArrayOrdinalMapStats> getOrdinalMapStats;
private PublishStageStats(Map<String, ByteArrayOrdinalMapStats> getOrdinalMapStats) {
this.getOrdinalMapStats = getOrdinalMapStats;
}

public Map<String, ByteArrayOrdinalMapStats> getOrdinalMapStats() {
return this.getOrdinalMapStats;
}

public static class Builder {
private Map<String, ByteArrayOrdinalMapStats> byteArrayOrdinalMapStatsByType;
public Builder withStats(Map<String, ByteArrayOrdinalMapStats> byteArrayOrdinalMapStatsByType) {
this.byteArrayOrdinalMapStatsByType = byteArrayOrdinalMapStatsByType;
return this;
}

public PublishStageStats build() {
return new PublishStageStats(byteArrayOrdinalMapStatsByType);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
/*
* Copyright 2016-2026 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package com.netflix.hollow.api.producer;

/**
* The top-level type for all stage stats.
*/
public interface StageStats {
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,9 @@
package com.netflix.hollow.api.producer.listener;

import com.netflix.hollow.api.producer.HollowProducer;
import com.netflix.hollow.api.producer.PublishStageStats;
import com.netflix.hollow.api.producer.Status;

import java.time.Duration;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
Expand Down Expand Up @@ -71,7 +73,6 @@ default void onBlobStage(Status status, HollowProducer.Blob blob, Duration elaps
* @param elapsed time taken to publish the blob
*/
void onBlobPublish(Status status, HollowProducer.Blob blob, Duration elapsed);

/**
* Called if a blob is to be published asynchronously.
* This method is called for a {@link HollowProducer.Blob.Type#SNAPSHOT snapshot} blob when the
Expand All @@ -88,13 +89,34 @@ default void onBlobPublishAsync(CompletableFuture<HollowProducer.Blob> blob) {
}

/**
* Called after the publish stage finishes normally or abnormally. A {@code SUCCESS} status indicates that
* the {@code HollowBlob}s produced this cycle has been published to the blob store.
* Deprecated in favor of
* {@code onPublishComplete(Status status, long version, Duration elapsed, PublishStageStats stats)}.
*
* @param status CycleStatus of the publish stage. {@link Status#getType()} will return {@code SUCCESS}
* when the publish was successful; @{code FAIL} otherwise.
* @param version version that was published.
* @param elapsed duration of the publish stage in {@code unit} units
*/
@Deprecated
void onPublishComplete(Status status, long version, Duration elapsed);

/**
* Called when the publish stage finishes normally or abnormally. A {@code SUCCESS} status indicates that
* the {@code HollowBlob}s produced this cycle has been published to the blob store.
* <p>
* This method provides access to publish stage statistics including ordinal map metrics for each type.
* The stats parameter will be {@code null} if the publish stage encountered errors before statistics
* could be collected (e.g., during blob staging or publishing failures).
*
* @param status CycleStatus of the publish stage. {@link Status#getType()} will return {@code SUCCESS}
* when the publish was successful; {@code FAIL} otherwise.
* @param version version that was published.
* @param elapsed duration of the publish stage
* @param stats stats collected upon publish stage completion, including
* {@link com.netflix.hollow.core.memory.ByteArrayOrdinalMapStats} for each type.
* Will be {@code null} if the stage encountered errors before stats could be collected.
*/
default void onPublishComplete(Status status, long version, Duration elapsed, PublishStageStats stats) {
onPublishComplete(status, version, elapsed);
}
}
Loading