Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1547,9 +1547,6 @@ class BeamModulePlugin implements Plugin<Project> {
"ExtendsAutoValue",
"InlineMeSuggester",
"InvalidBlockTag",
"InvalidInlineTag",
"InvalidLink",
"InvalidParam",
"InvalidThrows",
"JavaTimeDefaultTimeZone",
"JavaUtilDate",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ static Builder builder() {
return new AutoValue_ImageRequest.Builder();
}

/** Build an {@link ImageRequest} from a {@param url}. */
/** Build an {@link ImageRequest} from a {@code url}. */
static ImageRequest of(String url) {
return builder().setImageUrl(url).setMimeType(mimeTypeOf(url)).build();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -211,7 +211,7 @@ protected boolean waitForNumMessages(String jobId, String pcollection, Long expe
*
 * @param metrics a map of raw metrics. The results are also appended to the map.
* @param launchInfo Job info of the job
* @param config a {@class MetricsConfiguration}
* @param config a {@link MetricsConfiguration}
*/
private void computeDataflowMetrics(
Map<String, Double> metrics, LaunchInfo launchInfo, MetricsConfiguration config)
Expand Down Expand Up @@ -365,7 +365,7 @@ protected Map<String, Double> getCpuUtilizationMetrics(String jobId, TimeInterva
* Computes throughput metrics of the given pcollection in dataflow job.
*
* @param jobInfo dataflow job LaunchInfo
* @param config the {@class MetricsConfiguration}
* @param config the {@link MetricsConfiguration}
* @param timeInterval interval for the monitoring query
* @return throughput metrics of the pcollection
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -251,7 +251,7 @@ static class Configuration extends SyntheticSourceOptions {
/** Number of dynamic destinations to write. */
@JsonProperty public int numShards = 0;

/** See {@class org.apache.beam.sdk.io.Compression}. */
/** See {@link org.apache.beam.sdk.io.Compression}. */
@JsonProperty public String compressionType = "UNCOMPRESSED";

/** Runner specified to run the pipeline. */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,7 @@
/**
* A customized {@link DoFnRunner} that handles late data dropping and garbage collection for
* stateful {@link DoFn DoFns}. It registers a GC timer in {@link #processElement(WindowedValue)}
* and does cleanup in {@link #onTimer(String, String, BoundedWindow, Instant, Instant, TimeDomain,
* boolean)}
 * and does cleanup in {@link #onTimer}.
*
* @param <InputT> the type of the {@link DoFn} (main) input elements
* @param <OutputT> the type of the {@link DoFn} (main) output elements
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,14 +95,14 @@ public void deleteTimer(
timeDomain));
}

/** @deprecated use {@link #deleteTimer(StateNamespace, String, TimeDomain)}. */
/** @deprecated use {@link #deleteTimer(StateNamespace, String, String, TimeDomain)}. */
@Deprecated
@Override
public void deleteTimer(StateNamespace namespace, String timerId, String timerFamilyId) {
throw new UnsupportedOperationException("Canceling of timer by ID is not yet supported.");
}

/** @deprecated use {@link #deleteTimer(StateNamespace, String, TimeDomain)}. */
/** @deprecated use {@link #deleteTimer(StateNamespace, String, String, TimeDomain)}. */
@Deprecated
@Override
public void deleteTimer(TimerData timerData) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ public interface OutputWrapper<T> extends Output<T> {
@Override
default void emitWatermarkStatus(WatermarkStatus watermarkStatus) {}

/** In Flink 1.19 the {@code emitRecordAttributes} method was added. */
/** In Flink 1.19 the {@code recordAttributes} method was added. */
@Override
default void emitRecordAttributes(RecordAttributes recordAttributes) {
throw new UnsupportedOperationException("emitRecordAttributes not implemented");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1655,7 +1655,7 @@ void onFiredOrDeletedTimer(TimerData timer) {
}
}

/** @deprecated use {@link #deleteTimer(StateNamespace, String, TimeDomain)}. */
/** @deprecated use {@link #deleteTimer(StateNamespace, String, String, TimeDomain)}. */
@Deprecated
@Override
public void deleteTimer(StateNamespace namespace, String timerId, String timerFamilyId) {
Expand All @@ -1672,7 +1672,7 @@ public void deleteTimer(
}
}

/** @deprecated use {@link #deleteTimer(StateNamespace, String, TimeDomain)}. */
/** @deprecated use {@link #deleteTimer(StateNamespace, String, String, TimeDomain)}. */
@Override
@Deprecated
public void deleteTimer(TimerData timer) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1655,7 +1655,7 @@ void onFiredOrDeletedTimer(TimerData timer) {
}
}

/** @deprecated use {@link #deleteTimer(StateNamespace, String, TimeDomain)}. */
/** @deprecated use {@link #deleteTimer(StateNamespace, String, String, TimeDomain)}. */
@Deprecated
@Override
public void deleteTimer(StateNamespace namespace, String timerId, String timerFamilyId) {
Expand All @@ -1672,7 +1672,7 @@ public void deleteTimer(
}
}

/** @deprecated use {@link #deleteTimer(StateNamespace, String, TimeDomain)}. */
/** @deprecated use {@link #deleteTimer(StateNamespace, String, String, TimeDomain)}. */
@Override
@Deprecated
public void deleteTimer(TimerData timer) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ public interface OutputWrapper<T> extends Output<T> {
@Override
default void emitWatermarkStatus(WatermarkStatus watermarkStatus) {}

/** In Flink 1.19 the {@code emitRecordAttributes} method was added. */
/** In Flink 1.19 the {@code recordAttributes} method was added. */
@Override
default void emitRecordAttributes(RecordAttributes recordAttributes) {
throw new UnsupportedOperationException("emitRecordAttributes not implemented");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1658,7 +1658,7 @@ void onFiredOrDeletedTimer(TimerData timer) {
}
}

/** @deprecated use {@link #deleteTimer(StateNamespace, String, TimeDomain)}. */
/** @deprecated use {@link #deleteTimer(StateNamespace, String, String, TimeDomain)}. */
@Deprecated
@Override
public void deleteTimer(StateNamespace namespace, String timerId, String timerFamilyId) {
Expand All @@ -1675,7 +1675,7 @@ public void deleteTimer(
}
}

/** @deprecated use {@link #deleteTimer(StateNamespace, String, TimeDomain)}. */
/** @deprecated use {@link #deleteTimer(StateNamespace, String, String, TimeDomain)}. */
@Override
@Deprecated
public void deleteTimer(TimerData timer) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,14 +100,14 @@ private static class ElementExecution {
/** Marker execution to represent when there is no element currently being processed. */
static final ElementExecution IDLE = new ElementExecution();

/** Only empty for {@see IDLE}. */
/** Only empty for {@link #IDLE}. */
final Optional<NameContext> step;

ElementExecution(NameContext step) {
this.step = Optional.of(step);
}

/** Only used for {@see IDLE}. */
/** Only used for {@link #IDLE}. */
private ElementExecution() {
step = Optional.empty();
}
Expand Down Expand Up @@ -155,8 +155,8 @@ private static class ReaderWriterState {
* Journal of fragments of execution per element to count for attributing processing time. Each
* time we transition up or down the stage fusion graph we add an execution fragment for the
* currently processing element with an incremented snapshot version. Each snapshot version must
* have a representative value in the {@code executionJournal}, or {@see IDLE_EXECUTION} to
* represent completion of processing.
* have a representative value in the {@code executionJournal}, or {@link ElementExecution#IDLE}
* to represent completion of processing.
*/
private final Journal<ElementExecution> executionJournal;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -104,8 +104,8 @@ public DataflowExecutionContext(
* PCollectionView PCollectionViews}.
*
* <p>If side input source metadata is provided by the service in {@link SideInputInfo
* sideInputInfos}, we request a {@link SideInputReader} from the {@code executionContext} using
* that info. If no side input source metadata is provided but the DoFn expects side inputs, as a
* sideInputInfos}, we request a {@link SideInputReader} from the execution context using that
* info. If no side input source metadata is provided but the DoFn expects side inputs, as a
* fallback, we request a {@link SideInputReader} based only on the expected views.
*
* <p>These cases are not disjoint: Whenever a {@link GroupAlsoByWindowFn} takes side inputs,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -171,8 +171,8 @@ private static void addOutlierStatsToHistogram(

/**
* @param metricName The {@link MetricName} that represents this Histogram.
* @param value The histogram value. Currently we only support converting histograms that use
* {@code linear} or {@code exponential} buckets.
* @param inputHistogram The histogram value. Currently we only support converting histograms that
* use {@code linear} or {@code exponential} buckets.
* @return If this conversion succeeds, a {@code MetricValue} that represents this histogram.
* Otherwise returns an empty optional.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ public interface WorkUnitClient {
* Reports the worker messages to dataflow. We currently report autoscaling signals and
 * per-worker metrics with this path.
*
* @param msg the WorkerMessages to report
* @param messages the WorkerMessages to report
* @return a list of {@link WorkerMessageResponse}
*/
List<WorkerMessageResponse> reportWorkerMessage(List<WorkerMessage> messages) throws IOException;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,10 +58,10 @@
* broken stream.
*
* <p>Subclasses should override {@link #newResponseHandler()} to implement a handler for physical
* stream connection. {@link #onNewStream()} to perform any work that must be done when a new stream
* is created, such as sending headers or retrying requests.
* stream connection. {@link #onFlushPending(boolean)} to perform any work that must be done when a
* new stream is created, such as sending headers or retrying requests.
*
* <p>{@link #trySend(RequestT)} and {@link #startStream()} should not be called when handling
* <p>{@link #trySend(Object)} and {@link #startStream()} should not be called when handling
* responses; use {@link #executeSafely(Runnable)} instead.
*
* <p>Synchronization on this is used to synchronize the gRpc stream state and internal data
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,13 +28,12 @@
import org.slf4j.Logger;

/**
* Request observer that allows resetting its internal delegate using a {@link
* #streamObserverFactory}.
* Request observer that allows resetting its internal delegate.
*
* @implNote {@link StreamObserver}s generated by {@link #streamObserverFactory} are expected to be
* {@link ThreadSafe}. Has same methods declared in {@link StreamObserver}, but they throw
* {@link StreamClosedException} and {@link WindmillStreamShutdownException}, which much be
* handled by callers.
* @implNote {@link StreamObserver}s injected via {@link #reset(TerminatingStreamObserver)} are
* expected to be {@link ThreadSafe}. Has same methods declared in {@link StreamObserver}, but
* they throw {@link StreamClosedException} and {@link WindmillStreamShutdownException}, which
 *     must be handled by callers.
*/
@ThreadSafe
@Internal
Expand All @@ -49,9 +48,9 @@ final class ResettableThrowingStreamObserver<T> {
private boolean isPoisoned = false;

/**
* Indicates that the current delegate is closed via {@link #poison() or {@link #onCompleted()}}.
* If not poisoned, a call to {@link #reset()} is required to perform future operations on the
* StreamObserver.
* Indicates that the current delegate is closed via {@link #poison()} or {@link #onCompleted()}.
* If not poisoned, a call to {@link #reset(TerminatingStreamObserver)} is required to perform
* future operations on the StreamObserver.
*/
@GuardedBy("this")
private boolean isCurrentStreamClosed = true;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -486,7 +486,6 @@ private Optional<RunnerApi.ArtifactInformation> getLocal() {
/**
* Attempts to provide a reasonable filename for the artifact.
*
* @param index a monotonically increasing index, which provides uniqueness
* @param environment the environment id
* @param artifact the artifact itself
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ void execute() throws IOException {

/**
* Execute the {@link ProcessBuilder} that starts the Prism service. Redirects output to the
* {@param outputStream}.
* {@code outputStream}.
*/
void execute(OutputStream outputStream) throws IOException {
execute(createProcessBuilder().redirectErrorStream(true));
Expand All @@ -127,7 +127,7 @@ void execute(OutputStream outputStream) throws IOException {

/**
* Execute the {@link ProcessBuilder} that starts the Prism service. Redirects output to the
* {@param file}.
* {@code file}.
*/
void execute(File file) throws IOException {
execute(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ class PrismPipelineResult implements PipelineResult {
private final Runnable cleanup;

/**
* Instantiate the {@link PipelineResult} from the {@param delegate} and a {@param cancel} to be
* Instantiate the {@link PipelineResult} from the {@code delegate} and a {@code cancel} to be
* called when stopping the underlying executable Job management service.
*/
PrismPipelineResult(PipelineResult delegate, Runnable cancel) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,13 +40,11 @@
import org.slf4j.LoggerFactory;

/**
* Implementation of BundleManager for non-portable mode. Keeps track of the async function
* completions.
* {@inheritDoc} Implementation of BundleManager for non-portable mode. Keeps track of the async
* function completions.
*
* <p>This class is not thread safe and the current implementation relies on the assumption that
* messages are dispatched to BundleManager in a single threaded mode.
*
* <p>{@inheritDoc}
*/
@SuppressWarnings({
"nullness" // TODO(https://github.com/apache/beam/issues/20497)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -176,10 +176,10 @@ public static <T> Encoder<T> encoderFor(Coder<T> coder) {

/**
* Creates a Spark {@link Encoder} for {@link T} of {@link StructType} with fields {@code value},
* {@code timestamp}, {@code windows} and {@code pane}.
* {@code timestamp}, {@code window} and {@code pane}.
*
* @param value {@link Encoder} to encode field `{@code value}`.
* @param window {@link Encoder} to encode individual windows in field `{@code windows}`
* @param window {@link Encoder} to encode individual windows in field `{@code window}`
*/
public static <T, W extends BoundedWindow> Encoder<WindowedValue<T>> windowedValueEncoder(
Encoder<T> value, Encoder<W> window) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ public static <K, V> JavaRDD<KV<K, Iterable<WindowedValue<V>>>> groupByKeyOnly(
/**
* Spark-level group by key operation that keeps original Beam {@link KV} pairs unchanged.
*
* @returns {@link JavaPairRDD} where the first value in the pair is the serialized key, and the
* @return {@link JavaPairRDD} where the first value in the pair is the serialized key, and the
* second is an iterable of the {@link KV} pairs with that key.
*/
static <K, V> JavaPairRDD<ByteArray, Iterable<WindowedValue<KV<K, V>>>> groupByKeyPair(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,9 +46,9 @@ public abstract class FileSystem<ResourceIdT extends ResourceId> {
* <p>Implementation should handle the following ambiguities of a user-provided spec:
*
* <ol>
* <li>{@code spec} could be a glob or a uri. {@link #match} should be able to tell and choose
* <li>{@code specs} could be a glob or a uri. {@link #match} should be able to tell and choose
* efficient implementations.
* <li>The user-provided {@code spec} might refer to files or directories. It is common that
* <li>The user-provided {@code specs} might refer to files or directories. It is common that
* users that wish to indicate a directory will omit the trailing {@code /}, such as in a
* spec of {@code "/tmp/dir"}. The {@link FileSystem} should be able to recognize a
* directory with the trailing {@code /} omitted, but should always return a correct {@link
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,9 +100,9 @@ public static boolean hasGlobWildcard(String spec) {
* <p>Implementation handles the following ambiguities of a user-provided spec:
*
* <ol>
* <li>{@code spec} could be a glob or a uri. {@link #match} should be able to tell and choose
* <li>{@code specs} could be a glob or a uri. {@link #match} should be able to tell and choose
* efficient implementations.
* <li>The user-provided {@code spec} might refer to files or directories. It is common that
* <li>The user-provided {@code specs} might refer to files or directories. It is common that
* users that wish to indicate a directory will omit the trailing path delimiter, such as
* {@code "/tmp/dir"} in Linux. The {@link FileSystem} should be able to recognize a
* directory with the trailing path delimiter omitted, but should always return a correct
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -164,12 +164,13 @@ public static class MetricsContainerHolder implements MetricsEnvironmentState {
}
}

/**
* Set the {@link MetricsContainer} for the associated {@link MetricsEnvironment}.
*
* @return The previous container for the associated {@link MetricsEnvironment}.
*/
/** Set the {@link MetricsContainer} for the associated {@link MetricsEnvironment}. */
public interface MetricsEnvironmentState {
/**
* Activates the given container.
*
* @return The previous container for the associated {@link MetricsEnvironment}.
*/
@Nullable
MetricsContainer activate(@Nullable MetricsContainer metricsContainer);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
*
* <p>Java transforms and JVM runners should take care when processing these types as they may have
* a particular semantic meaning in the context that created them. For example, consider an
* enumerated type backed by a primitive {@class FieldType.INT8}. A Java transform can clearly pass
* enumerated type backed by a primitive {@code FieldType.INT8}. A Java transform can clearly pass
* through this value and pass it back to a context that understands it, but that transform should
* not blindly perform arithmetic on this type.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1049,7 +1049,7 @@ public interface MultiOutputReceiver {
* RestrictionTracker.HasProgress} implementation within the {@link RestrictionTracker} is an
* inaccurate representation of known work.
*
* <p>It is up to each splittable {@DoFn} to convert between their natural representation of
* <p>It is up to each splittable {@link DoFn} to convert between their natural representation of
* outstanding work and this representation. For example:
*
* <ul>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ public static <T> Iterables<T> iterables() {
* {@code other} and then applying {@link #pCollections()}, but has the advantage that it can be
* more easily used inline.
*
* <p>Both {@cpde PCollections} must have equal {@link WindowFn}s. The output elements of {@code
* <p>Both {@code PCollection}s must have equal {@link WindowFn}s. The output elements of {@code
* Flatten<T>} are in the same windows and have the same timestamps as their corresponding input
* elements. The output {@code PCollection} will have the same {@link WindowFn} as both inputs.
*
Expand Down
Loading
Loading