Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[CELEBORN-1376] Push data failed should always release request body #2449

Closed
wants to merge 19 commits into from
Closed
Changes from 12 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

"When an outbound (a.k.a. downstream) message reaches at the beginning of the pipeline, Netty will release it after writing it out." So why is pushdata not released when rpc failure occurs?

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's because in this case error/Exception occurs before data is sent to pipeline. future.isSuccess() returns false in StdChannelListener#operationComplete

Original file line number Diff line number Diff line change
Expand Up @@ -208,14 +208,21 @@ public void sendRpc(ByteBuffer message) {

public ChannelFuture pushData(
PushData pushData, long pushDataTimeout, RpcResponseCallback callback) {
return pushData(pushData, pushDataTimeout, callback, null);
Runnable rpcFailureCallback =
new Runnable() {
@Override
public void run() {
pushData.body().release();
}
};
return pushData(pushData, pushDataTimeout, callback, rpcFailureCallback);
}

public ChannelFuture pushData(
PushData pushData,
long pushDataTimeout,
RpcResponseCallback callback,
Runnable rpcSendoutCallback) {
Runnable rpcFailureCallback) {
if (logger.isTraceEnabled()) {
logger.trace("Pushing data to {}", NettyUtils.getRemoteAddress(channel));
}
Expand All @@ -225,14 +232,29 @@ public ChannelFuture pushData(
PushRequestInfo info = new PushRequestInfo(dueTime, callback);
handler.addPushRequest(requestId, info);
pushData.requestId = requestId;
PushChannelListener listener = new PushChannelListener(requestId, rpcSendoutCallback);
PushChannelListener listener = new PushChannelListener(requestId, rpcFailureCallback);
ChannelFuture channelFuture = channel.writeAndFlush(pushData).addListener(listener);
info.setChannelFuture(channelFuture);
return channelFuture;
}

public ChannelFuture pushMergedData(
PushMergedData pushMergedData, long pushDataTimeout, RpcResponseCallback callback) {
Runnable rpcFailureCallback =
new Runnable() {
@Override
public void run() {
pushMergedData.body().release();
}
};
return pushMergedData(pushMergedData, pushDataTimeout, callback, rpcFailureCallback);
}

public ChannelFuture pushMergedData(
PushMergedData pushMergedData,
long pushDataTimeout,
RpcResponseCallback callback,
Runnable rpcFailureCallback) {
if (logger.isTraceEnabled()) {
logger.trace("Pushing merged data to {}", NettyUtils.getRemoteAddress(channel));
}
Expand All @@ -243,7 +265,7 @@ public ChannelFuture pushMergedData(
handler.addPushRequest(requestId, info);
pushMergedData.requestId = requestId;

PushChannelListener listener = new PushChannelListener(requestId);
PushChannelListener listener = new PushChannelListener(requestId, rpcFailureCallback);
ChannelFuture channelFuture = channel.writeAndFlush(pushMergedData).addListener(listener);
info.setChannelFuture(channelFuture);
return channelFuture;
Expand Down Expand Up @@ -415,29 +437,29 @@ protected void handleFailure(String errorMsg, Throwable cause) {

private class PushChannelListener extends StdChannelListener {
final long pushRequestId;
Runnable rpcSendOutCallback;
Runnable rpcFailureCallback;

PushChannelListener(long pushRequestId) {
this(pushRequestId, null);
}

PushChannelListener(long pushRequestId, Runnable rpcSendOutCallback) {
PushChannelListener(long pushRequestId, Runnable rpcFailureCallback) {
super("PUSH " + pushRequestId);
this.pushRequestId = pushRequestId;
this.rpcSendOutCallback = rpcSendOutCallback;
this.rpcFailureCallback = rpcFailureCallback;
}

@Override
public void operationComplete(Future<? super Void> future) throws Exception {
super.operationComplete(future);
if (rpcSendOutCallback != null) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This changes behavior, is it OK? cc @RexXiong @FMX

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

IIRC this will affect buffer release from flink, IMO we can retain the current setup and remove the rpcFailureCallback call from handleFailure, since operationComplete will be invoked regardless of whether the channel fails

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How about current?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How about current?

Make sense.

rpcSendOutCallback.run();
}
}

@Override
protected void handleFailure(String errorMsg, Throwable cause) {
handler.handlePushFailure(pushRequestId, errorMsg, cause);
if (rpcFailureCallback != null) {
rpcFailureCallback.run();
}
}
}
}
Loading