Skip to content

Commit

Permalink
Replace Anonymous types with lambda
Browse files Browse the repository at this point in the history
  • Loading branch information
sigee committed Sep 20, 2024
1 parent a7d5075 commit 67b684a
Show file tree
Hide file tree
Showing 10 changed files with 121 additions and 193 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.apache.livy.client.common;

import java.io.ByteArrayOutputStream;
import java.lang.invoke.SerializedLambda;
import java.nio.ByteBuffer;

import com.esotericsoftware.kryo.Kryo;
Expand All @@ -41,23 +42,20 @@ public class Serializer {
private final ThreadLocal<Kryo> kryos;

public Serializer(final Class<?>... klasses) {
this.kryos = new ThreadLocal<Kryo>() {
@Override
protected Kryo initialValue() {
Kryo kryo = new Kryo();
int count = 0;
for (Class<?> klass : klasses) {
kryo.register(klass, REG_ID_BASE + count);
count++;
}
kryo.setInstantiatorStrategy(new Kryo.DefaultInstantiatorStrategy(
new StdInstantiatorStrategy()));
kryo.register(java.lang.invoke.SerializedLambda.class);
kryo.register(ClosureSerializer.Closure.class, new ClosureSerializer());
kryo.setClassLoader(Thread.currentThread().getContextClassLoader());
return kryo;
this.kryos = ThreadLocal.withInitial(() -> {
Kryo kryo = new Kryo();
int count = 0;
for (Class<?> klass : klasses) {
kryo.register(klass, REG_ID_BASE + count);
count++;
}
};
kryo.setInstantiatorStrategy(new Kryo.DefaultInstantiatorStrategy(
new StdInstantiatorStrategy()));
kryo.register(SerializedLambda.class);
kryo.register(ClosureSerializer.Closure.class, new ClosureSerializer());
kryo.setClassLoader(Thread.currentThread().getContextClassLoader());
return kryo;
});
}

public Object deserialize(ByteBuffer data) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,13 +84,10 @@ class HttpClient implements LivyClient {

// Because we only have one connection to the server, we don't need more than a single
// threaded executor here.
this.executor = Executors.newSingleThreadScheduledExecutor(new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
Thread t = new Thread(r, "HttpClient-" + sessionId);
t.setDaemon(true);
return t;
}
this.executor = Executors.newSingleThreadScheduledExecutor(r -> {
Thread t = new Thread(r, "HttpClient-" + sessionId);
t.setDaemon(true);
return t;
});

this.serializer = new Serializer();
Expand Down Expand Up @@ -146,24 +143,18 @@ public Future<?> addFile(URI uri) {
}

private Future<?> uploadResource(final File file, final String command, final String paramName) {
Callable<Void> task = new Callable<Void>() {
@Override
public Void call() throws Exception {
conn.post(file, Void.class, paramName, "/%d/%s", sessionId, command);
return null;
}
Callable<Void> task = () -> {
conn.post(file, Void.class, paramName, "/%d/%s", sessionId, command);
return null;
};
return executor.submit(task);
}

private Future<?> addResource(final String command, final URI resource) {
Callable<Void> task = new Callable<Void>() {
@Override
public Void call() throws Exception {
ClientMessage msg = new AddResource(resource.toString());
conn.post(msg, Void.class, "/%d/%s", sessionId, command);
return null;
}
Callable<Void> task = () -> {
ClientMessage msg = new AddResource(resource.toString());
conn.post(msg, Void.class, "/%d/%s", sessionId, command);
return null;
};
return executor.submit(task);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -134,38 +134,32 @@ protected Throwable error() {
}

void start(final String command, final ByteBuffer serializedJob) {
Runnable task = new Runnable() {
@Override
public void run() {
try {
ClientMessage msg = new SerializedJob(BufferUtils.toByteArray(serializedJob), "spark");
JobStatus status = conn.post(msg, JobStatus.class, "/%d/%s", sessionId, command);

if (isCancelPending) {
sendCancelRequest(status.id);
}

jobId = status.id;
Runnable task = () -> {
try {
ClientMessage msg = new SerializedJob(BufferUtils.toByteArray(serializedJob), "spark");
JobStatus status = conn.post(msg, JobStatus.class, "/%d/%s", sessionId, command);

pollTask = executor.schedule(new JobPollTask(initialPollInterval),
initialPollInterval, TimeUnit.MILLISECONDS);
} catch (Exception e) {
setResult(null, e, State.FAILED);
if (isCancelPending) {
sendCancelRequest(status.id);
}

jobId = status.id;

pollTask = executor.schedule(new JobPollTask(initialPollInterval),
initialPollInterval, TimeUnit.MILLISECONDS);
} catch (Exception e) {
setResult(null, e, State.FAILED);
}
};
executor.submit(task);
}

private void sendCancelRequest(final long id) {
executor.submit(new Runnable() {
@Override
public void run() {
try {
conn.post(null, Void.class, "/%d/jobs/%d/cancel", sessionId, id);
} catch (Exception e) {
setResult(null, e, State.FAILED);
}
executor.submit(() -> {
try {
conn.post(null, Void.class, "/%d/jobs/%d/cancel", sessionId, id);
} catch (Exception e) {
setResult(null, e, State.FAILED);
}
});
}
Expand Down
84 changes: 32 additions & 52 deletions rsc/src/main/java/org/apache/livy/rsc/ContextLauncher.java
Original file line number Diff line number Diff line change
Expand Up @@ -123,12 +123,7 @@ public void onFailure(Throwable error) throws Exception {

// Set up a timeout to fail the promise if we don't hear back from the context
// after a configurable timeout.
Runnable timeoutTask = new Runnable() {
@Override
public void run() {
connectTimeout(handler);
}
};
Runnable timeoutTask = () -> connectTimeout(handler);
this.timeout = factory.getServer().getEventLoopGroup().schedule(timeoutTask,
conf.getTimeAsMs(RPC_CLIENT_HANDSHAKE_TIMEOUT), TimeUnit.MILLISECONDS);
} catch (Exception e) {
Expand Down Expand Up @@ -226,14 +221,11 @@ private static ChildProcess startDriver(final RSCConf conf, Promise<?> promise)
} else if (conf.getBoolean(CLIENT_IN_PROCESS)) {
// Mostly for testing things quickly. Do not do this in production.
LOG.warn("!!!! Running remote driver in-process. !!!!");
Runnable child = new Runnable() {
@Override
public void run() {
try {
RSCDriverBootstrapper.main(new String[] { confFile.getAbsolutePath() });
} catch (Exception e) {
throw Utils.propagate(e);
}
Runnable child = () -> {
try {
RSCDriverBootstrapper.main(new String[] { confFile.getAbsolutePath() });
} catch (Exception e) {
throw Utils.propagate(e);
}
};
return new ChildProcess(conf, promise, child, confFile);
Expand Down Expand Up @@ -346,12 +338,9 @@ private void handle(ChannelHandlerContext ctx, RemoteDriverAddress msg) {
LOG.warn("Connection established but promise is already finalized.");
}

ctx.executor().submit(new Runnable() {
@Override
public void run() {
dispose();
ContextLauncher.this.dispose(false);
}
ctx.executor().submit(() -> {
dispose();
ContextLauncher.this.dispose(false);
});
}

Expand Down Expand Up @@ -380,25 +369,22 @@ public ChildProcess(RSCConf conf, Promise<?> promise, final Process childProc, F
this.child = childProc;
this.confFile = confFile;

Runnable monitorTask = new Runnable() {
@Override
public void run() {
try {
RSCClientFactory.childProcesses().incrementAndGet();
int exitCode = child.waitFor();
if (exitCode != 0) {
LOG.warn("Child process exited with code {}.", exitCode);
fail(new IOException(String.format("Child process exited with code %d.", exitCode)));
}
} catch (InterruptedException ie) {
LOG.warn("Waiting thread interrupted, killing child process.");
Thread.interrupted();
child.destroy();
} catch (Exception e) {
LOG.warn("Exception while waiting for child process.", e);
} finally {
RSCClientFactory.childProcesses().decrementAndGet();
Runnable monitorTask = () -> {
try {
RSCClientFactory.childProcesses().incrementAndGet();
int exitCode = child.waitFor();
if (exitCode != 0) {
LOG.warn("Child process exited with code {}.", exitCode);
fail(new IOException(String.format("Child process exited with code %d.", exitCode)));
}
} catch (InterruptedException ie) {
LOG.warn("Waiting thread interrupted, killing child process.");
Thread.interrupted();
child.destroy();
} catch (Exception e) {
LOG.warn("Exception while waiting for child process.", e);
} finally {
RSCClientFactory.childProcesses().decrementAndGet();
}
};
this.monitor = monitor(monitorTask, childId);
Expand Down Expand Up @@ -435,25 +421,19 @@ public void detach() {
}

private Thread monitor(final Runnable task, int childId) {
Runnable wrappedTask = new Runnable() {
@Override
public void run() {
try {
task.run();
} finally {
confFile.delete();
}
Runnable wrappedTask = () -> {
try {
task.run();
} finally {
confFile.delete();
}
};
Thread thread = new Thread(wrappedTask);
thread.setDaemon(true);
thread.setName("ContextLauncher-" + childId);
thread.setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() {
@Override
public void uncaughtException(Thread t, Throwable e) {
LOG.warn("Child task threw exception.", e);
fail(e);
}
thread.setUncaughtExceptionHandler((t, e) -> {
LOG.warn("Child task threw exception.", e);
fail(e);
});
thread.start();
return thread;
Expand Down
15 changes: 6 additions & 9 deletions rsc/src/main/java/org/apache/livy/rsc/RSCClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -358,15 +358,12 @@ public void onFailure(Throwable error) throws Exception {
promise.tryFailure(error);
}
});
promise.addListener(new GenericFutureListener<Promise<T>>() {
@Override
public void operationComplete(Promise<T> p) {
if (jobId != null) {
jobs.remove(jobId);
}
if (p.isCancelled() && !rpc.isDone()) {
rpc.cancel(true);
}
promise.addListener((GenericFutureListener<Promise<T>>) p -> {
if (jobId != null) {
jobs.remove(jobId);
}
if (p.isCancelled() && !rpc.isDone()) {
rpc.cancel(true);
}
});
return handle;
Expand Down
13 changes: 5 additions & 8 deletions rsc/src/main/java/org/apache/livy/rsc/Utils.java
Original file line number Diff line number Diff line change
Expand Up @@ -99,14 +99,11 @@ public static String stackTraceAsString(Throwable t) {
}

public static <T> void addListener(Future<T> future, final FutureListener<T> lsnr) {
future.addListener(new GenericFutureListener<Future<T>>() {
@Override
public void operationComplete(Future<T> f) throws Exception {
if (f.isSuccess()) {
lsnr.onSuccess(f.get());
} else {
lsnr.onFailure(f.cause());
}
future.addListener((GenericFutureListener<Future<T>>) f -> {
if (f.isSuccess()) {
lsnr.onSuccess(f.get());
} else {
lsnr.onFailure(f.cause());
}
});
}
Expand Down
9 changes: 3 additions & 6 deletions rsc/src/main/java/org/apache/livy/rsc/driver/RSCDriver.java
Original file line number Diff line number Diff line change
Expand Up @@ -242,12 +242,9 @@ private void setupIdleTimeout() {
return;
}

Runnable timeoutTask = new Runnable() {
@Override
public void run() {
LOG.warn("Shutting down RSC due to idle timeout ({}).", livyConf.get(SERVER_IDLE_TIMEOUT));
shutdown();
}
Runnable timeoutTask = () -> {
LOG.warn("Shutting down RSC due to idle timeout ({}).", livyConf.get(SERVER_IDLE_TIMEOUT));
shutdown();
};
ScheduledFuture<?> timeout = server.getEventLoopGroup().schedule(timeoutTask,
livyConf.getTimeAsMs(SERVER_IDLE_TIMEOUT), TimeUnit.MILLISECONDS);
Expand Down
Loading

0 comments on commit 67b684a

Please sign in to comment.