diff --git a/client-common/src/main/java/org/apache/livy/client/common/Serializer.java b/client-common/src/main/java/org/apache/livy/client/common/Serializer.java index 2f879ac3d..3814c721c 100644 --- a/client-common/src/main/java/org/apache/livy/client/common/Serializer.java +++ b/client-common/src/main/java/org/apache/livy/client/common/Serializer.java @@ -18,6 +18,7 @@ package org.apache.livy.client.common; import java.io.ByteArrayOutputStream; +import java.lang.invoke.SerializedLambda; import java.nio.ByteBuffer; import com.esotericsoftware.kryo.Kryo; @@ -41,23 +42,20 @@ public class Serializer { private final ThreadLocal kryos; public Serializer(final Class... klasses) { - this.kryos = new ThreadLocal() { - @Override - protected Kryo initialValue() { - Kryo kryo = new Kryo(); - int count = 0; - for (Class klass : klasses) { - kryo.register(klass, REG_ID_BASE + count); - count++; - } - kryo.setInstantiatorStrategy(new Kryo.DefaultInstantiatorStrategy( - new StdInstantiatorStrategy())); - kryo.register(java.lang.invoke.SerializedLambda.class); - kryo.register(ClosureSerializer.Closure.class, new ClosureSerializer()); - kryo.setClassLoader(Thread.currentThread().getContextClassLoader()); - return kryo; + this.kryos = ThreadLocal.withInitial(() -> { + Kryo kryo = new Kryo(); + int count = 0; + for (Class klass : klasses) { + kryo.register(klass, REG_ID_BASE + count); + count++; } - }; + kryo.setInstantiatorStrategy(new Kryo.DefaultInstantiatorStrategy( + new StdInstantiatorStrategy())); + kryo.register(SerializedLambda.class); + kryo.register(ClosureSerializer.Closure.class, new ClosureSerializer()); + kryo.setClassLoader(Thread.currentThread().getContextClassLoader()); + return kryo; + }); } public Object deserialize(ByteBuffer data) { diff --git a/client-http/src/main/java/org/apache/livy/client/http/HttpClient.java b/client-http/src/main/java/org/apache/livy/client/http/HttpClient.java index 1211a5288..f95e30d1a 100644 --- a/client-http/src/main/java/org/apache/livy/client/http/HttpClient.java +++ b/client-http/src/main/java/org/apache/livy/client/http/HttpClient.java @@ -84,13 +84,10 @@ class HttpClient implements LivyClient { // Because we only have one connection to the server, we don't need more than a single // threaded executor here. - this.executor = Executors.newSingleThreadScheduledExecutor(new ThreadFactory() { - @Override - public Thread newThread(Runnable r) { - Thread t = new Thread(r, "HttpClient-" + sessionId); - t.setDaemon(true); - return t; - } + this.executor = Executors.newSingleThreadScheduledExecutor(r -> { + Thread t = new Thread(r, "HttpClient-" + sessionId); + t.setDaemon(true); + return t; }); this.serializer = new Serializer(); @@ -146,24 +143,18 @@ public Future addFile(URI uri) { } private Future uploadResource(final File file, final String command, final String paramName) { - Callable task = new Callable() { - @Override - public Void call() throws Exception { - conn.post(file, Void.class, paramName, "/%d/%s", sessionId, command); - return null; - } + Callable task = () -> { + conn.post(file, Void.class, paramName, "/%d/%s", sessionId, command); + return null; }; return executor.submit(task); } private Future addResource(final String command, final URI resource) { - Callable task = new Callable() { - @Override - public Void call() throws Exception { - ClientMessage msg = new AddResource(resource.toString()); - conn.post(msg, Void.class, "/%d/%s", sessionId, command); - return null; - } + Callable task = () -> { + ClientMessage msg = new AddResource(resource.toString()); + conn.post(msg, Void.class, "/%d/%s", sessionId, command); + return null; }; return executor.submit(task); } diff --git a/client-http/src/main/java/org/apache/livy/client/http/JobHandleImpl.java b/client-http/src/main/java/org/apache/livy/client/http/JobHandleImpl.java index d39dfe994..d5e74a784 100644 --- a/client-http/src/main/java/org/apache/livy/client/http/JobHandleImpl.java +++ b/client-http/src/main/java/org/apache/livy/client/http/JobHandleImpl.java @@ -134,38 +134,32 @@ protected Throwable error() { } void start(final String command, final ByteBuffer serializedJob) { - Runnable task = new Runnable() { - @Override - public void run() { - try { - ClientMessage msg = new SerializedJob(BufferUtils.toByteArray(serializedJob), "spark"); - JobStatus status = conn.post(msg, JobStatus.class, "/%d/%s", sessionId, command); - - if (isCancelPending) { - sendCancelRequest(status.id); - } - - jobId = status.id; + Runnable task = () -> { + try { + ClientMessage msg = new SerializedJob(BufferUtils.toByteArray(serializedJob), "spark"); + JobStatus status = conn.post(msg, JobStatus.class, "/%d/%s", sessionId, command); - pollTask = executor.schedule(new JobPollTask(initialPollInterval), - initialPollInterval, TimeUnit.MILLISECONDS); - } catch (Exception e) { - setResult(null, e, State.FAILED); + if (isCancelPending) { + sendCancelRequest(status.id); } + + jobId = status.id; + + pollTask = executor.schedule(new JobPollTask(initialPollInterval), + initialPollInterval, TimeUnit.MILLISECONDS); + } catch (Exception e) { + setResult(null, e, State.FAILED); } }; executor.submit(task); } private void sendCancelRequest(final long id) { - executor.submit(new Runnable() { - @Override - public void run() { - try { - conn.post(null, Void.class, "/%d/jobs/%d/cancel", sessionId, id); - } catch (Exception e) { - setResult(null, e, State.FAILED); - } + executor.submit(() -> { + try { + conn.post(null, Void.class, "/%d/jobs/%d/cancel", sessionId, id); + } catch (Exception e) { + setResult(null, e, State.FAILED); } }); } diff --git a/rsc/src/main/java/org/apache/livy/rsc/ContextLauncher.java b/rsc/src/main/java/org/apache/livy/rsc/ContextLauncher.java index 1251a0f04..bbd570468 100644 --- a/rsc/src/main/java/org/apache/livy/rsc/ContextLauncher.java +++ b/rsc/src/main/java/org/apache/livy/rsc/ContextLauncher.java @@ -123,12 +123,7 @@ public void onFailure(Throwable error) throws Exception { // Set up a timeout to fail the promise if we don't hear back from the context // after a configurable timeout. - Runnable timeoutTask = new Runnable() { - @Override - public void run() { - connectTimeout(handler); - } - }; + Runnable timeoutTask = () -> connectTimeout(handler); this.timeout = factory.getServer().getEventLoopGroup().schedule(timeoutTask, conf.getTimeAsMs(RPC_CLIENT_HANDSHAKE_TIMEOUT), TimeUnit.MILLISECONDS); } catch (Exception e) { @@ -226,14 +221,11 @@ private static ChildProcess startDriver(final RSCConf conf, Promise promise) } else if (conf.getBoolean(CLIENT_IN_PROCESS)) { // Mostly for testing things quickly. Do not do this in production. LOG.warn("!!!! Running remote driver in-process. !!!!"); - Runnable child = new Runnable() { - @Override - public void run() { - try { - RSCDriverBootstrapper.main(new String[] { confFile.getAbsolutePath() }); - } catch (Exception e) { - throw Utils.propagate(e); - } + Runnable child = () -> { + try { + RSCDriverBootstrapper.main(new String[] { confFile.getAbsolutePath() }); + } catch (Exception e) { + throw Utils.propagate(e); } }; return new ChildProcess(conf, promise, child, confFile); @@ -346,12 +338,9 @@ private void handle(ChannelHandlerContext ctx, RemoteDriverAddress msg) { LOG.warn("Connection established but promise is already finalized."); } - ctx.executor().submit(new Runnable() { - @Override - public void run() { - dispose(); - ContextLauncher.this.dispose(false); - } + ctx.executor().submit(() -> { + dispose(); + ContextLauncher.this.dispose(false); }); } @@ -380,25 +369,22 @@ public ChildProcess(RSCConf conf, Promise promise, final Process childProc, F this.child = childProc; this.confFile = confFile; - Runnable monitorTask = new Runnable() { - @Override - public void run() { - try { - RSCClientFactory.childProcesses().incrementAndGet(); - int exitCode = child.waitFor(); - if (exitCode != 0) { - LOG.warn("Child process exited with code {}.", exitCode); - fail(new IOException(String.format("Child process exited with code %d.", exitCode))); - } - } catch (InterruptedException ie) { - LOG.warn("Waiting thread interrupted, killing child process."); - Thread.interrupted(); - child.destroy(); - } catch (Exception e) { - LOG.warn("Exception while waiting for child process.", e); - } finally { - RSCClientFactory.childProcesses().decrementAndGet(); + Runnable monitorTask = () -> { + try { + RSCClientFactory.childProcesses().incrementAndGet(); + int exitCode = child.waitFor(); + if (exitCode != 0) { + LOG.warn("Child process exited with code {}.", exitCode); + fail(new IOException(String.format("Child process exited with code %d.", exitCode))); } + } catch (InterruptedException ie) { + LOG.warn("Waiting thread interrupted, killing child process."); + Thread.interrupted(); + child.destroy(); + } catch (Exception e) { + LOG.warn("Exception while waiting for child process.", e); + } finally { + RSCClientFactory.childProcesses().decrementAndGet(); } }; this.monitor = monitor(monitorTask, childId); @@ -435,25 +421,19 @@ public void detach() { } private Thread monitor(final Runnable task, int childId) { - Runnable wrappedTask = new Runnable() { - @Override - public void run() { - try { - task.run(); - } finally { - confFile.delete(); - } + Runnable wrappedTask = () -> { + try { + task.run(); + } finally { + confFile.delete(); } }; Thread thread = new Thread(wrappedTask); thread.setDaemon(true); thread.setName("ContextLauncher-" + childId); - thread.setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() { - @Override - public void uncaughtException(Thread t, Throwable e) { - LOG.warn("Child task threw exception.", e); - fail(e); - } + thread.setUncaughtExceptionHandler((t, e) -> { + LOG.warn("Child task threw exception.", e); + fail(e); }); thread.start(); return thread; diff --git a/rsc/src/main/java/org/apache/livy/rsc/RSCClient.java b/rsc/src/main/java/org/apache/livy/rsc/RSCClient.java index 51f84b4c5..668d0a1d5 100644 --- a/rsc/src/main/java/org/apache/livy/rsc/RSCClient.java +++ b/rsc/src/main/java/org/apache/livy/rsc/RSCClient.java @@ -358,15 +358,12 @@ public void onFailure(Throwable error) throws Exception { promise.tryFailure(error); } }); - promise.addListener(new GenericFutureListener>() { - @Override - public void operationComplete(Promise p) { - if (jobId != null) { - jobs.remove(jobId); - } - if (p.isCancelled() && !rpc.isDone()) { - rpc.cancel(true); - } + promise.addListener((GenericFutureListener>) p -> { + if (jobId != null) { + jobs.remove(jobId); + } + if (p.isCancelled() && !rpc.isDone()) { + rpc.cancel(true); } }); return handle; diff --git a/rsc/src/main/java/org/apache/livy/rsc/Utils.java b/rsc/src/main/java/org/apache/livy/rsc/Utils.java index 3c8a5e664..ca0b9de5f 100644 --- a/rsc/src/main/java/org/apache/livy/rsc/Utils.java +++ b/rsc/src/main/java/org/apache/livy/rsc/Utils.java @@ -99,14 +99,11 @@ public static String stackTraceAsString(Throwable t) { } public static void addListener(Future future, final FutureListener lsnr) { - future.addListener(new GenericFutureListener>() { - @Override - public void operationComplete(Future f) throws Exception { - if (f.isSuccess()) { - lsnr.onSuccess(f.get()); - } else { - lsnr.onFailure(f.cause()); - } + future.addListener((GenericFutureListener>) f -> { + if (f.isSuccess()) { + lsnr.onSuccess(f.get()); + } else { + lsnr.onFailure(f.cause()); } }); } diff --git a/rsc/src/main/java/org/apache/livy/rsc/driver/RSCDriver.java b/rsc/src/main/java/org/apache/livy/rsc/driver/RSCDriver.java index 0e4fbc5ba..4e9044544 100644 --- a/rsc/src/main/java/org/apache/livy/rsc/driver/RSCDriver.java +++ b/rsc/src/main/java/org/apache/livy/rsc/driver/RSCDriver.java @@ -242,12 +242,9 @@ private void setupIdleTimeout() { return; } - Runnable timeoutTask = new Runnable() { - @Override - public void run() { - LOG.warn("Shutting down RSC due to idle timeout ({}).", livyConf.get(SERVER_IDLE_TIMEOUT)); - shutdown(); - } + Runnable timeoutTask = () -> { + LOG.warn("Shutting down RSC due to idle timeout ({}).", livyConf.get(SERVER_IDLE_TIMEOUT)); + shutdown(); }; ScheduledFuture timeout = server.getEventLoopGroup().schedule(timeoutTask, livyConf.getTimeAsMs(SERVER_IDLE_TIMEOUT), TimeUnit.MILLISECONDS); diff --git a/rsc/src/main/java/org/apache/livy/rsc/rpc/Rpc.java b/rsc/src/main/java/org/apache/livy/rsc/rpc/Rpc.java index e4871f528..4ee7ff858 100644 --- a/rsc/src/main/java/org/apache/livy/rsc/rpc/Rpc.java +++ b/rsc/src/main/java/org/apache/livy/rsc/rpc/Rpc.java @@ -111,39 +111,28 @@ public static Promise createClient( final AtomicReference rpc = new AtomicReference<>(); // Set up a timeout to undo everything. - final Runnable timeoutTask = new Runnable() { - @Override - public void run() { - promise.setFailure(new TimeoutException("Timed out waiting for RPC server connection.")); - } - }; + final Runnable timeoutTask = () -> promise.setFailure(new TimeoutException("Timed out waiting for RPC server connection.")); final ScheduledFuture timeoutFuture = eloop.schedule(timeoutTask, config.getTimeAsMs(RPC_CLIENT_HANDSHAKE_TIMEOUT), TimeUnit.MILLISECONDS); // The channel listener instantiates the Rpc instance when the connection is established, // and initiates the SASL handshake. - cf.addListener(new ChannelFutureListener() { - @Override - public void operationComplete(ChannelFuture cf) throws Exception { - if (cf.isSuccess()) { - SaslClientHandler saslHandler = new SaslClientHandler(config, clientId, promise, - timeoutFuture, secret, dispatcher); - Rpc rpc = createRpc(config, saslHandler, (SocketChannel) cf.channel(), eloop); - saslHandler.rpc = rpc; - saslHandler.sendHello(cf.channel()); - } else { - promise.setFailure(cf.cause()); - } + cf.addListener((ChannelFutureListener) cf1 -> { + if (cf1.isSuccess()) { + SaslClientHandler saslHandler = new SaslClientHandler(config, clientId, promise, + timeoutFuture, secret, dispatcher); + Rpc rpc1 = createRpc(config, saslHandler, (SocketChannel) cf1.channel(), eloop); + saslHandler.rpc = rpc1; + saslHandler.sendHello(cf1.channel()); + } else { + promise.setFailure(cf1.cause()); } }); // Handle cancellation of the promise. - promise.addListener(new GenericFutureListener>() { - @Override - public void operationComplete(Promise p) { - if (p.isCancelled()) { - cf.cancel(true); - } + promise.addListener((GenericFutureListener>) p -> { + if (p.isCancelled()) { + cf.cancel(true); } }); @@ -429,25 +418,19 @@ public Future call(final Object msg, Class retType) { try { final long id = rpcId.getAndIncrement(); final Promise promise = egroup.next().newPromise(); - final ChannelFutureListener listener = new ChannelFutureListener() { - @Override - public void operationComplete(ChannelFuture cf) { - if (!cf.isSuccess() && !promise.isDone()) { - LOG.warn("Failed to send RPC, closing connection.", cf.cause()); - promise.setFailure(cf.cause()); - discardRpcCall(id); - close(); - } - } + final ChannelFutureListener listener = cf -> { + if (!cf.isSuccess() && !promise.isDone()) { + LOG.warn("Failed to send RPC, closing connection.", cf.cause()); + promise.setFailure(cf.cause()); + discardRpcCall(id); + close(); + } }; registerRpcCall(id, promise, msg.getClass().getName()); - channel.eventLoop().submit(new Runnable() { - @Override - public void run() { - channel.write(new MessageHeader(id, Rpc.MessageType.CALL)).addListener(listener); - channel.writeAndFlush(msg).addListener(listener); - } + channel.eventLoop().submit(() -> { + channel.write(new MessageHeader(id, MessageType.CALL)).addListener(listener); + channel.writeAndFlush(msg).addListener(listener); }); return promise; diff --git a/rsc/src/main/java/org/apache/livy/rsc/rpc/RpcServer.java b/rsc/src/main/java/org/apache/livy/rsc/rpc/RpcServer.java index 14694fbc9..21b30c844 100644 --- a/rsc/src/main/java/org/apache/livy/rsc/rpc/RpcServer.java +++ b/rsc/src/main/java/org/apache/livy/rsc/rpc/RpcServer.java @@ -145,12 +145,9 @@ public void initChannel(SocketChannel ch) throws Exception { final Rpc newRpc = Rpc.createServer(saslHandler, config, ch, group); saslHandler.rpc = newRpc; - Runnable cancelTask = new Runnable() { - @Override - public void run() { - LOG.warn("Timed out waiting for hello from client."); - newRpc.close(); - } + Runnable cancelTask = () -> { + LOG.warn("Timed out waiting for hello from client."); + newRpc.close(); }; saslHandler.cancelTask = group.schedule(cancelTask, config.getTimeAsMs(RPC_CLIENT_HANDSHAKE_TIMEOUT), diff --git a/rsc/src/test/java/org/apache/livy/rsc/TestSparkClient.java b/rsc/src/test/java/org/apache/livy/rsc/TestSparkClient.java index c8e44f00a..90e7c06af 100644 --- a/rsc/src/test/java/org/apache/livy/rsc/TestSparkClient.java +++ b/rsc/src/test/java/org/apache/livy/rsc/TestSparkClient.java @@ -410,22 +410,16 @@ public void testKillServerWhileSparkSubmitIsRunning() throws Exception { // Block waitFor until process.destroy() is called. final CountDownLatch waitForCalled = new CountDownLatch(1); - when(mockSparkSubmit.waitFor()).thenAnswer(new Answer() { - @Override - public Integer answer(InvocationOnMock invocation) throws Throwable { - waitForCalled.await(); - return 0; - } + when(mockSparkSubmit.waitFor()).thenAnswer((Answer) invocation -> { + waitForCalled.await(); + return 0; }); // Verify process.destroy() is called. final CountDownLatch destroyCalled = new CountDownLatch(1); - doAnswer(new Answer() { - @Override - public Void answer(InvocationOnMock invocation) throws Throwable { - destroyCalled.countDown(); - return null; - } + doAnswer((Answer) invocation -> { + destroyCalled.countDown(); + return null; }).when(mockSparkSubmit).destroy(); ContextLauncher.mockSparkSubmit = mockSparkSubmit;