From 1f6230c16f4548a1703a10c3d0460eddf834c996 Mon Sep 17 00:00:00 2001 From: Ziad Othman Date: Sat, 18 May 2024 03:25:25 +0300 Subject: [PATCH] Feat/controller (#103) * Added controller. Co-authored-by: Abdelrahman Elsalh Co-authored-by: Ziad Othman * Added missing exclusion. Adding coauthers aho :) Co-authored-by: Abdelrahman Elsalh <70035104+abd0123@users.noreply.github.com> Co-authored-by: Ziad Othman * Removed setting root logger in jobs. --------- Co-authored-by: Abdelrahman Elsalh Co-authored-by: Ahmed Elwasefi Co-authored-by: Abdelrahman Elsalh <70035104+abd0123@users.noreply.github.com> --- compose.override.yaml | 1 - compose.yaml | 6 +- controller/.env | 4 + controller/Dockerfile | 5 + controller/pom.xml | 47 ++++- .../com/workup/controller/CLIHandler.java | 96 +++++++++-- pom.xml | 19 +- services/jobs/pom.xml | 23 ++- .../com/workup/jobs/ControllerMQListener.java | 109 ++++++++++-- .../java/com/workup/jobs/JobsApplication.java | 6 +- .../com/workup/jobs/RabbitMQListener.java | 11 +- .../com/workup/jobs/commands/JobCommand.java | 6 +- .../workup/jobs/commands/JobCommandMap.java | 6 +- .../src/main/resources/application.properties | 2 +- .../workup/payments/ControllerMQListener.java | 163 ++++++++++++++---- .../workup/payments/PaymentsApplication.java | 17 +- shared/pom.xml | 89 +++++----- .../workup/shared/commands/CommandMap.java | 4 + .../commands/controller/ContinueRequest.java | 5 + .../commands/controller/FreezeRequest.java | 5 + ...quest.java => SetLoggingLevelRequest.java} | 5 +- .../controller/UpdateCommandRequest.java | 3 +- .../com/workup/shared/enums/ErrorLevel.java | 8 - .../workup/shared/enums/ThreadPoolSize.java | 5 + 24 files changed, 492 insertions(+), 153 deletions(-) create mode 100644 controller/.env create mode 100644 controller/Dockerfile rename shared/src/main/java/com/workup/shared/commands/controller/{SetErrorReportingLevelRequest.java => SetLoggingLevelRequest.java} (68%) delete mode 100644 shared/src/main/java/com/workup/shared/enums/ErrorLevel.java create mode 100644 shared/src/main/java/com/workup/shared/enums/ThreadPoolSize.java diff --git a/compose.override.yaml b/compose.override.yaml index 9902dd7a..b70f434b 100644 --- a/compose.override.yaml +++ b/compose.override.yaml @@ -1,4 +1,3 @@ - version: '3.7' services: service_jobs: diff --git a/compose.yaml b/compose.yaml index cb11db81..0700244d 100644 --- a/compose.yaml +++ b/compose.yaml @@ -30,6 +30,8 @@ services: service_mq: image: rabbitmq:3.13-management + ports: + - "5672:5672" # hacky method.. dont ever do this :( healthcheck: test: rabbitmq-diagnostics -q ping interval: 30s @@ -38,7 +40,7 @@ services: networks: - default - frontend - + service_redis: image: redis:7.2.4 healthcheck: @@ -101,7 +103,7 @@ services: POSTGRES_USER: payments_user POSTGRES_DB: payments_database healthcheck: - test: ["CMD", "pg_isready"] + test: [ "CMD", "pg_isready" ] interval: 20s timeout: 10s retries: 10 diff --git a/controller/.env b/controller/.env new file mode 100644 index 00000000..3fedf8df --- /dev/null +++ b/controller/.env @@ -0,0 +1,4 @@ +JOBS_MQ_URL=service_mq +JOBS_MQ_USER=guest +JOBS_MQ_PASSWORD=guest +JOBS_MQ_PORT=guest \ No newline at end of file diff --git a/controller/Dockerfile b/controller/Dockerfile new file mode 100644 index 00000000..6730ed34 --- /dev/null +++ b/controller/Dockerfile @@ -0,0 +1,5 @@ +FROM eclipse-temurin:21-jre-alpine +VOLUME /tmp +ARG JAR_FILE=target/*.jar +COPY ${JAR_FILE} controller.jar +ENTRYPOINT ["tail", "-f", "/dev/null"] \ No newline at end of file diff --git a/controller/pom.xml b/controller/pom.xml index c2035f0c..f5ccdbdc 100644 --- a/controller/pom.xml +++ b/controller/pom.xml @@ -20,16 +20,57 @@ org.springframework.boot spring-boot-starter + + + org.springframework.boot + spring-boot-starter-logging + + + + + com.workup + jobs + ${project.version} + + + org.testcontainers + cassandra + + + org.springframework.boot + spring-boot-starter-data-cassandra + + + org.springframework.boot + spring-boot-starter-web + + - org.springframework.boot spring-boot-starter-test + + + org.springframework.boot + spring-boot-starter-logging + + test org.springframework.boot spring-boot-starter-amqp + + + org.springframework.boot + spring-boot-starter-logging + + + + + javassist + javassist + 3.12.1.GA com.workup @@ -41,6 +82,10 @@ cliche 110413 + + org.springframework.boot + spring-boot-starter-log4j2 + diff --git a/controller/src/main/java/com/workup/controller/CLIHandler.java b/controller/src/main/java/com/workup/controller/CLIHandler.java index 69be21e8..9e0200fe 100644 --- a/controller/src/main/java/com/workup/controller/CLIHandler.java +++ b/controller/src/main/java/com/workup/controller/CLIHandler.java @@ -2,10 +2,15 @@ import asg.cliche.CLIException; import asg.cliche.Command; -import com.workup.shared.commands.controller.SetMaxThreadsRequest; +import com.workup.shared.commands.controller.*; +import com.workup.shared.commands.jobs.requests.CreateJobRequest; import com.workup.shared.enums.ControllerQueueNames; +import com.workup.shared.enums.ServiceQueueNames; +import java.io.IOException; import java.util.HashMap; import java.util.Map; +import javassist.*; +import org.apache.logging.log4j.Level; import org.springframework.amqp.core.AmqpTemplate; import org.springframework.beans.factory.annotation.Autowired; @@ -21,7 +26,7 @@ public class CLIHandler { } @Command(description = "Set the maximum number of threads for a specific app") - public String maxthreads(String app, int maxThreads) throws CLIException { + public String maxThreads(String app, int maxThreads) throws CLIException { app = app.toLowerCase(); if (!appQueueMap.containsKey(app)) { return "Error: app can only be jobs, users, contracts or payments!"; @@ -33,22 +38,44 @@ public String maxthreads(String app, int maxThreads) throws CLIException { appQueueMap.get(app), "", SetMaxThreadsRequest.builder().withMaxThreads(maxThreads).build()); - return "MaxThreads"; + return "Command sent"; } @Command(description = "Set the maximum number of DB connections for a specific app") - public String maxdb(String app, int appNum, String maxDBConn) { - return "maxdb"; + public String maxdb(String app, int maxDBConn) { + app = app.toLowerCase(); + if (!appQueueMap.containsKey(app)) { + return "Error: app can only be jobs, users, contracts or payments!"; + } + if (maxDBConn > 100 || maxDBConn < 1) { + return "Error: Max threads must have a value between 1 and 100"; + } + rabbitTemplate.convertAndSend( + appQueueMap.get(app), + "", + SetMaxDBConnectionsRequest.builder().withMaxDBConnections(maxDBConn).build()); + return "Command Sent!"; } @Command(description = "starts a specific app") - public String start(String app, int appNum) { - return "start"; + public String start(String app) { + app = app.toLowerCase(); + if (!appQueueMap.containsKey(app)) { + return "Error: app can only be jobs, users, contracts or payments!"; + } + + rabbitTemplate.convertAndSend(appQueueMap.get(app), "", ContinueRequest.builder().build()); + return "Command sent"; } @Command(description = "stops a specific app") - public String freeze(String app, int appNum) { - return "freeze"; + public String freeze(String app) { + app = app.toLowerCase(); + if (!appQueueMap.containsKey(app)) { + return "Error: app can only be jobs, users, contracts or payments!"; + } + rabbitTemplate.convertAndSend(appQueueMap.get(app), "", FreezeRequest.builder().build()); + return "Command sent"; } @Command(description = "stops a specific app") @@ -57,8 +84,22 @@ public String setmq(String app, int appNum) { } @Command(description = "stops a specific app") - public String setErrorReportingLevel(String app, int appNum) { - return "error level"; + public String setLoggingLevel(String app, String level) { + app = app.toLowerCase(); + if (!appQueueMap.containsKey(app)) { + return "Error: app can only be jobs, users, contracts or payments!"; + } + // To throw an error in case an invalid level is provided :) + Level.valueOf(level); + rabbitTemplate.convertAndSend( + appQueueMap.get(app), "", SetLoggingLevelRequest.builder().withLevel(level).build()); + return "Command sent!!"; + } + + @Command(description = "test") + public void test() { + CreateJobRequest request = CreateJobRequest.builder().withTitle("Ziko").build(); + rabbitTemplate.convertSendAndReceive(ServiceQueueNames.JOBS, request); } @Command(description = "Creates a new command") @@ -67,8 +108,37 @@ public String addcommand(String app, String commandName, String className) { } @Command(description = "Updates an existing command") - public String updatecommand(String app, String commandName, String className) { - return "Update Command"; + public String updateCommand(String app, String commandName, String className) throws Exception { + app = app.toLowerCase(); + if (!appQueueMap.containsKey(app)) { + return "Error: app can only be jobs, users, contracts or payments!"; + } + try { + byte[] byteArray = getByteCode(commandName, className); + rabbitTemplate.convertAndSend( + appQueueMap.get(app), + "", + UpdateCommandRequest.builder() + .withCommandName(commandName) + .withByteCode(byteArray) + .build()); + } catch (Exception ex) { + ex.printStackTrace(); + } + return "Command sent!!"; + } + + private byte[] getByteCode(String commandName, String className) + throws InstantiationException, + IllegalAccessException, + NotFoundException, + IOException, + CannotCompileException { + ClassPool pool = ClassPool.getDefault(); + pool.insertClassPath(new ClassClassPath(ControllerApplication.class)); + CtClass ctClass = pool.get(className); + // That's the compiled class byte code + return ctClass.toBytecode(); } @Command(description = "Deletes an existing command") diff --git a/pom.xml b/pom.xml index 750662ae..c85af379 100644 --- a/pom.xml +++ b/pom.xml @@ -16,7 +16,6 @@ shared - controller services/jobs services/payments services/users @@ -61,18 +60,18 @@ git-build-hook-maven-plugin 3.5.0 - - true - + + true + - - - configure - - + + + configure + + - + diff --git a/services/jobs/pom.xml b/services/jobs/pom.xml index 53281e1e..d6d51d0f 100644 --- a/services/jobs/pom.xml +++ b/services/jobs/pom.xml @@ -48,19 +48,26 @@ shared ${project.version} - - javassist - javassist - 3.12.1.GA - org.springframework.boot spring-boot-starter-test + + + org.springframework.boot + spring-boot-starter-logging + + test org.springframework.boot spring-boot-testcontainers + + + org.springframework.boot + spring-boot-starter-logging + + test @@ -94,6 +101,12 @@ org.springframework.boot spring-boot-starter-data-cassandra + + + org.springframework.boot + spring-boot-starter-logging + + org.projectlombok diff --git a/services/jobs/src/main/java/com/workup/jobs/ControllerMQListener.java b/services/jobs/src/main/java/com/workup/jobs/ControllerMQListener.java index 1a63839a..8ef28ae6 100644 --- a/services/jobs/src/main/java/com/workup/jobs/ControllerMQListener.java +++ b/services/jobs/src/main/java/com/workup/jobs/ControllerMQListener.java @@ -1,38 +1,123 @@ package com.workup.jobs; +import com.workup.jobs.commands.JobCommand; import com.workup.jobs.commands.JobCommandMap; -import com.workup.shared.commands.controller.SetMaxThreadsRequest; +import com.workup.shared.commands.Command; +import com.workup.shared.commands.CommandRequest; +import com.workup.shared.commands.CommandResponse; +import com.workup.shared.commands.controller.*; +import com.workup.shared.enums.ServiceQueueNames; +import com.workup.shared.enums.ThreadPoolSize; import java.lang.reflect.Field; -import javassist.*; +import org.apache.logging.log4j.Level; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.apache.logging.log4j.core.config.Configurator; import org.springframework.amqp.rabbit.annotation.RabbitHandler; import org.springframework.amqp.rabbit.annotation.RabbitListener; +import org.springframework.amqp.rabbit.listener.RabbitListenerEndpointRegistry; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.ApplicationContext; import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; import org.springframework.stereotype.Service; @Service -@RabbitListener(queues = "#{controllerQueue.name}") +@RabbitListener(queues = "#{controllerQueue.name}", id = "#{controllerQueue.name}") public class ControllerMQListener { - @Autowired public JobCommandMap commandMap; @Autowired public ThreadPoolTaskExecutor taskExecutor; - @Autowired private ApplicationContext context; + @Autowired private RabbitListenerEndpointRegistry registry; @RabbitHandler public void receive(SetMaxThreadsRequest in) throws Exception { try { - ThreadPoolTaskExecutor myBean = context.getBean(ThreadPoolTaskExecutor.class); - Field maxPoolSize = ThreadPoolTaskExecutor.class.getDeclaredField("maxPoolSize"); - maxPoolSize.setAccessible(true); - maxPoolSize.set(myBean, in.getMaxThreads()); - Field corePoolSize = ThreadPoolTaskExecutor.class.getDeclaredField("corePoolSize"); - corePoolSize.setAccessible(true); - corePoolSize.set(myBean, in.getMaxThreads()); + System.out.println("Max threads is: " + taskExecutor.getMaxPoolSize()); + setThreads(in.getMaxThreads()); + ThreadPoolSize.POOL_SIZE = taskExecutor.getMaxPoolSize(); + System.out.println("Max threads set to: " + taskExecutor.getMaxPoolSize()); + } catch (Exception e) { + System.out.println(e.getMessage()); + e.printStackTrace(); + } + } + + @RabbitHandler + public void receive(SetLoggingLevelRequest in) throws Exception { + try { + Logger logger = LogManager.getRootLogger(); + Configurator.setAllLevels(logger.getName(), Level.valueOf(in.getLevel())); + } catch (Exception e) { + System.out.println(e.getMessage()); + e.printStackTrace(); + } + } + + @RabbitHandler + public void receive(FreezeRequest in) throws Exception { + try { + registry.getListenerContainer(ServiceQueueNames.JOBS).stop(); + taskExecutor.shutdown(); + setThreads(0); + System.out.println("Stopped all threads."); } catch (Exception e) { System.out.println(e.getMessage()); e.printStackTrace(); } } + + @RabbitHandler + public void receive(ContinueRequest in) throws Exception { + try { + taskExecutor.start(); + setThreads(ThreadPoolSize.POOL_SIZE); + registry.getListenerContainer(ServiceQueueNames.JOBS).start(); + } catch (Exception e) { + System.out.println(e.getMessage()); + e.printStackTrace(); + } + } + + @SuppressWarnings("unchecked") + @RabbitHandler + public void receive(UpdateCommandRequest in) throws Exception { + try { + String className = commandMap.getCommand(in.getCommandName()).getClass().getName(); + byte[] byteArray = in.getByteCode(); + Class clazz = + (Class) + (new MyClassLoader(this.getClass().getClassLoader()).loadClass(byteArray, className)); + + commandMap.replaceCommand( + in.getCommandName(), + (Class>) + ((Command) clazz.newInstance()).getClass()); + + System.out.println("Updated command: " + in.getCommandName()); + // clazz.newInstance().Run(null); + } catch (Exception e) { + System.out.println(e.getMessage()); + e.printStackTrace(); + } + } + + static class MyClassLoader extends ClassLoader { + public MyClassLoader(ClassLoader classLoader) { + super(classLoader); + } + + public Class loadClass(byte[] byteCode, String className) { + return defineClass(className, byteCode, 0, byteCode.length); + } + } + + private void setThreads(int threads) throws NoSuchFieldException, IllegalAccessException { + ThreadPoolTaskExecutor myBean = context.getBean(ThreadPoolTaskExecutor.class); + Field maxPoolSize = ThreadPoolTaskExecutor.class.getDeclaredField("maxPoolSize"); + maxPoolSize.setAccessible(true); + maxPoolSize.set(myBean, threads); + Field corePoolSize = ThreadPoolTaskExecutor.class.getDeclaredField("corePoolSize"); + corePoolSize.setAccessible(true); + corePoolSize.set(myBean, threads); + } } diff --git a/services/jobs/src/main/java/com/workup/jobs/JobsApplication.java b/services/jobs/src/main/java/com/workup/jobs/JobsApplication.java index 01eb15b9..4ae506ef 100644 --- a/services/jobs/src/main/java/com/workup/jobs/JobsApplication.java +++ b/services/jobs/src/main/java/com/workup/jobs/JobsApplication.java @@ -2,6 +2,7 @@ import com.workup.shared.enums.ControllerQueueNames; import com.workup.shared.enums.ServiceQueueNames; +import com.workup.shared.enums.ThreadPoolSize; import org.springframework.amqp.core.*; import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter; import org.springframework.amqp.support.converter.MessageConverter; @@ -56,8 +57,9 @@ public MessageConverter messageConverter() { @Bean public ThreadPoolTaskExecutor taskExecutor() { ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); - executor.setCorePoolSize(50); - executor.setMaxPoolSize(50); + executor.setCorePoolSize(ThreadPoolSize.POOL_SIZE); + executor.setMaxPoolSize(ThreadPoolSize.POOL_SIZE); + executor.setWaitForTasksToCompleteOnShutdown(true); executor.setQueueCapacity(500); executor.setThreadNamePrefix("jobs-"); executor.initialize(); diff --git a/services/jobs/src/main/java/com/workup/jobs/RabbitMQListener.java b/services/jobs/src/main/java/com/workup/jobs/RabbitMQListener.java index 847f20c3..76e5a1b7 100644 --- a/services/jobs/src/main/java/com/workup/jobs/RabbitMQListener.java +++ b/services/jobs/src/main/java/com/workup/jobs/RabbitMQListener.java @@ -1,7 +1,6 @@ package com.workup.jobs; import com.workup.jobs.commands.AcceptProposalCommand; -import com.workup.jobs.commands.CreateJobCommand; import com.workup.jobs.commands.CreateProposalCommand; import com.workup.jobs.commands.GetJobByIdCommand; import com.workup.jobs.commands.GetMyJobsCommand; @@ -9,6 +8,8 @@ import com.workup.jobs.commands.GetProposalsByJobIdCommand; import com.workup.jobs.commands.JobCommandMap; import com.workup.jobs.commands.SearchJobsCommand; +import com.workup.shared.commands.Command; +import com.workup.shared.commands.CommandRequest; import com.workup.shared.commands.jobs.proposals.requests.AcceptProposalRequest; import com.workup.shared.commands.jobs.proposals.requests.CreateProposalRequest; import com.workup.shared.commands.jobs.proposals.requests.GetMyProposalsRequest; @@ -34,7 +35,7 @@ import org.springframework.stereotype.Service; @Service -@RabbitListener(queues = ServiceQueueNames.JOBS) +@RabbitListener(queues = ServiceQueueNames.JOBS, id = ServiceQueueNames.JOBS) public class RabbitMQListener { @Autowired public JobCommandMap commandMap; @@ -42,8 +43,10 @@ public class RabbitMQListener { @RabbitHandler @Async public CompletableFuture receive(CreateJobRequest in) throws Exception { - CreateJobResponse response = ((CreateJobCommand) commandMap.getCommand("CreateJob")).Run(in); - return CompletableFuture.completedFuture(response); + CreateJobResponse resp = + (CreateJobResponse) + ((Command) commandMap.getCommand("CreateJob")).Run(in); + return CompletableFuture.completedFuture(resp); } @RabbitHandler diff --git a/services/jobs/src/main/java/com/workup/jobs/commands/JobCommand.java b/services/jobs/src/main/java/com/workup/jobs/commands/JobCommand.java index de96129e..1b46a9f7 100644 --- a/services/jobs/src/main/java/com/workup/jobs/commands/JobCommand.java +++ b/services/jobs/src/main/java/com/workup/jobs/commands/JobCommand.java @@ -11,9 +11,9 @@ public abstract class JobCommand implements Command { - @Setter JobRepository jobRepository; + @Setter public JobRepository jobRepository; - @Setter ProposalRepository proposalRepository; + @Setter public ProposalRepository proposalRepository; - @Setter AmqpTemplate rabbitTemplate; + @Setter public AmqpTemplate rabbitTemplate; } diff --git a/services/jobs/src/main/java/com/workup/jobs/commands/JobCommandMap.java b/services/jobs/src/main/java/com/workup/jobs/commands/JobCommandMap.java index 8bc9456a..3de55692 100644 --- a/services/jobs/src/main/java/com/workup/jobs/commands/JobCommandMap.java +++ b/services/jobs/src/main/java/com/workup/jobs/commands/JobCommandMap.java @@ -13,11 +13,11 @@ public class JobCommandMap extends CommandMap> { - @Autowired JobRepository jobRepository; + @Autowired public JobRepository jobRepository; - @Autowired ProposalRepository proposalRepository; + @Autowired public ProposalRepository proposalRepository; - @Autowired AmqpTemplate rabbitTemplate; + @Autowired public AmqpTemplate rabbitTemplate; public void registerCommands() { commands.put("CreateJob", CreateJobCommand.class); diff --git a/services/jobs/src/main/resources/application.properties b/services/jobs/src/main/resources/application.properties index 61221848..ce5d1578 100644 --- a/services/jobs/src/main/resources/application.properties +++ b/services/jobs/src/main/resources/application.properties @@ -5,4 +5,4 @@ spring.rabbitmq.password=guest spring.cassandra.local-datacenter=datacenter1 spring.cassandra.keyspace-name=jobs_data spring.cassandra.contact-points=${JOBS_DB_URL} -spring.cassandra.schema-action=CREATE_IF_NOT_EXISTS \ No newline at end of file +spring.cassandra.schema-action=CREATE_IF_NOT_EXISTS diff --git a/services/payments/src/main/java/com/workup/payments/ControllerMQListener.java b/services/payments/src/main/java/com/workup/payments/ControllerMQListener.java index 706bf74d..17526e5f 100644 --- a/services/payments/src/main/java/com/workup/payments/ControllerMQListener.java +++ b/services/payments/src/main/java/com/workup/payments/ControllerMQListener.java @@ -1,38 +1,125 @@ -package com.workup.payments; - -import com.workup.shared.commands.controller.SetMaxThreadsRequest; -import java.lang.reflect.Field; -import org.apache.logging.log4j.LogManager; -import org.apache.logging.log4j.Logger; -import org.springframework.amqp.rabbit.annotation.RabbitHandler; -import org.springframework.amqp.rabbit.annotation.RabbitListener; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.context.ApplicationContext; -import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; -import org.springframework.stereotype.Service; - -@Service -@RabbitListener(queues = "#{controllerQueue.name}") -public class ControllerMQListener { - - private static final Logger logger = LogManager.getLogger(ControllerMQListener.class); - - @Autowired public ThreadPoolTaskExecutor taskExecutor; - - @Autowired private ApplicationContext context; - - @RabbitHandler - public void receive(SetMaxThreadsRequest in) throws Exception { - try { - ThreadPoolTaskExecutor myBean = context.getBean(ThreadPoolTaskExecutor.class); - Field maxPoolSize = ThreadPoolTaskExecutor.class.getDeclaredField("maxPoolSize"); - maxPoolSize.setAccessible(true); - maxPoolSize.set(myBean, in.getMaxThreads()); - Field corePoolSize = ThreadPoolTaskExecutor.class.getDeclaredField("corePoolSize"); - corePoolSize.setAccessible(true); - corePoolSize.set(myBean, in.getMaxThreads()); - } catch (Exception e) { - logger.error("Error setting max threads", e.getMessage()); - } - } -} +package com.workup.payments; + +import com.workup.payments.commands.PaymentCommandMap; +import com.workup.shared.commands.controller.*; +import com.workup.shared.enums.ServiceQueueNames; +import com.workup.shared.enums.ThreadPoolSize; +import com.zaxxer.hikari.HikariDataSource; +import java.lang.reflect.Field; +import org.apache.logging.log4j.Level; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.apache.logging.log4j.core.config.Configurator; +import org.springframework.amqp.rabbit.annotation.RabbitHandler; +import org.springframework.amqp.rabbit.annotation.RabbitListener; +import org.springframework.amqp.rabbit.listener.RabbitListenerEndpointRegistry; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.context.ApplicationContext; +import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; +import org.springframework.stereotype.Service; + +@Service +@RabbitListener(queues = "#{controllerQueue.name}", id = "#{controllerQueue.name}") +public class ControllerMQListener { + @Autowired public PaymentCommandMap commandMap; + @Autowired public ThreadPoolTaskExecutor taskExecutor; + @Autowired private ApplicationContext context; + @Autowired private RabbitListenerEndpointRegistry registry; + @Autowired private HikariDataSource hikariDataSource; + + @RabbitHandler + public void receive(SetMaxThreadsRequest in) throws Exception { + try { + System.out.println("Max threads is: " + taskExecutor.getMaxPoolSize()); + setThreads(in.getMaxThreads()); + ThreadPoolSize.POOL_SIZE = taskExecutor.getMaxPoolSize(); + System.out.println("Max threads set to: " + taskExecutor.getMaxPoolSize()); + } catch (Exception e) { + System.out.println(e.getMessage()); + e.printStackTrace(); + } + } + + @RabbitHandler + public void receive(SetLoggingLevelRequest in) throws Exception { + try { + Logger logger = LogManager.getRootLogger(); + Configurator.setAllLevels(logger.getName(), Level.valueOf(in.getLevel())); + } catch (Exception e) { + System.out.println(e.getMessage()); + e.printStackTrace(); + } + } + + @RabbitHandler + public void receive(FreezeRequest in) throws Exception { + try { + registry.getListenerContainer(ServiceQueueNames.JOBS).stop(); + taskExecutor.shutdown(); + setThreads(0); + System.out.println("Stopped all threads."); + } catch (Exception e) { + System.out.println(e.getMessage()); + e.printStackTrace(); + } + } + + @RabbitHandler + public void receive(ContinueRequest in) throws Exception { + try { + taskExecutor.start(); + setThreads(ThreadPoolSize.POOL_SIZE); + registry.getListenerContainer(ServiceQueueNames.JOBS).start(); + } catch (Exception e) { + System.out.println(e.getMessage()); + e.printStackTrace(); + } + } + + @RabbitHandler + public void receive(UpdateCommandRequest in) throws Exception { + // try { + // String className = commandMap.getCommand(in.getName()).getClass().getName(); + // System.out.println("Updating command: " + in.getName()); + // System.out.println("Class: " + className); + // Class newClass = new MyClassLoader().loadClass(in.getByteCode(), className); + // commandMap.replaceCommand(in.getName(), newClass); + // } catch (Exception e) { + // System.out.println(e.getMessage()); + // e.printStackTrace(); + // } + } + + @RabbitHandler + private void SetMaxDBConnections(SetMaxDBConnectionsRequest in) { + try { + if (hikariDataSource == null) { + System.out.println("HikariDataSource is null"); + return; + } + System.out.println("Max DB connections is: " + hikariDataSource.getMaximumPoolSize()); + hikariDataSource.setMaximumPoolSize(in.getMaxDBConnections()); + System.out.println("Max DB connections set to: " + hikariDataSource.getMaximumPoolSize()); + } catch (Exception e) { + System.out.println(e.getMessage()); + e.printStackTrace(); + } + } + + private void setThreads(int threads) throws NoSuchFieldException, IllegalAccessException { + ThreadPoolTaskExecutor myBean = context.getBean(ThreadPoolTaskExecutor.class); + Field maxPoolSize = ThreadPoolTaskExecutor.class.getDeclaredField("maxPoolSize"); + maxPoolSize.setAccessible(true); + maxPoolSize.set(myBean, threads); + Field corePoolSize = ThreadPoolTaskExecutor.class.getDeclaredField("corePoolSize"); + corePoolSize.setAccessible(true); + corePoolSize.set(myBean, threads); + } +} + +class MyClassLoader extends ClassLoader { + public Class loadClass(byte[] byteCode, String className) { + System.out.println("Loading class: " + className); + return defineClass(className, byteCode, 0, byteCode.length); + } +} diff --git a/services/payments/src/main/java/com/workup/payments/PaymentsApplication.java b/services/payments/src/main/java/com/workup/payments/PaymentsApplication.java index fee10a68..819e50fc 100644 --- a/services/payments/src/main/java/com/workup/payments/PaymentsApplication.java +++ b/services/payments/src/main/java/com/workup/payments/PaymentsApplication.java @@ -2,6 +2,8 @@ import com.workup.shared.enums.ControllerQueueNames; import com.workup.shared.enums.ServiceQueueNames; +import com.workup.shared.enums.ThreadPoolSize; + import org.springframework.amqp.core.AnonymousQueue; import org.springframework.amqp.core.Binding; import org.springframework.amqp.core.BindingBuilder; @@ -32,11 +34,6 @@ public Queue myQueue() { return new Queue(ServiceQueueNames.PAYMENTS); } - @Bean - public MessageConverter messageConverter() { - return new Jackson2JsonMessageConverter(); - } - @Bean public Queue controllerQueue() { return new AnonymousQueue(); @@ -52,11 +49,17 @@ public Binding fanoutBinding(FanoutExchange fanout, Queue controllerQueue) { return BindingBuilder.bind(controllerQueue).to(fanout); } + @Bean + public MessageConverter messageConverter() { + return new Jackson2JsonMessageConverter(); + } + @Bean public ThreadPoolTaskExecutor taskExecutor() { ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); - executor.setCorePoolSize(50); - executor.setMaxPoolSize(50); + executor.setCorePoolSize(ThreadPoolSize.POOL_SIZE); + executor.setMaxPoolSize(ThreadPoolSize.POOL_SIZE); + executor.setWaitForTasksToCompleteOnShutdown(true); executor.setQueueCapacity(500); executor.setThreadNamePrefix("payments-"); executor.initialize(); diff --git a/shared/pom.xml b/shared/pom.xml index 17c02a8c..cec90cd1 100644 --- a/shared/pom.xml +++ b/shared/pom.xml @@ -1,42 +1,53 @@ - 4.0.0 - com.workup - shared - 0.0.1-SNAPSHOT - shared - - 21 - 21 - - - com.workup - main - 1.0-SNAPSHOT - - - - com.fasterxml.jackson.dataformat - jackson-dataformat-xml - 2.16.0 - - - org.springframework.boot - spring-boot-starter-data-redis - 3.1.2 - - - redis.clients - jedis - 3.7.0 - - - org.projectlombok - lombok - 1.18.30 - provided - - + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd"> + 4.0.0 + com.workup + shared + 0.0.1-SNAPSHOT + shared + + 21 + 21 + + + com.workup + main + 1.0-SNAPSHOT + + + + com.fasterxml.jackson.dataformat + jackson-dataformat-xml + 2.16.0 + + + org.springframework.boot + spring-boot-starter-data-redis + 3.1.2 + + + org.springframework.boot + spring-boot-starter-logging + + + + + redis.clients + jedis + 3.7.0 + + + org.projectlombok + lombok + 1.18.30 + provided + + + org.springframework.boot + spring-boot-starter-log4j2 + 3.2.5 + + \ No newline at end of file diff --git a/shared/src/main/java/com/workup/shared/commands/CommandMap.java b/shared/src/main/java/com/workup/shared/commands/CommandMap.java index 8d34facd..ece18e29 100644 --- a/shared/src/main/java/com/workup/shared/commands/CommandMap.java +++ b/shared/src/main/java/com/workup/shared/commands/CommandMap.java @@ -24,4 +24,8 @@ public R getCommand(String command) throws Exception { setupCommand(commandInstance); return commandInstance; } + + public void replaceCommand(String command, Class newCommand) { + commands.put(command, newCommand); + } } diff --git a/shared/src/main/java/com/workup/shared/commands/controller/ContinueRequest.java b/shared/src/main/java/com/workup/shared/commands/controller/ContinueRequest.java index 947802a6..99f68a77 100644 --- a/shared/src/main/java/com/workup/shared/commands/controller/ContinueRequest.java +++ b/shared/src/main/java/com/workup/shared/commands/controller/ContinueRequest.java @@ -1,6 +1,11 @@ package com.workup.shared.commands.controller; +import lombok.Builder; +import lombok.extern.jackson.Jacksonized; + /** Makes a service start accepting requests and acquire resources again. */ +@Builder +@Jacksonized public class ContinueRequest { // No fields are required? } diff --git a/shared/src/main/java/com/workup/shared/commands/controller/FreezeRequest.java b/shared/src/main/java/com/workup/shared/commands/controller/FreezeRequest.java index 563c88c2..3f841e89 100644 --- a/shared/src/main/java/com/workup/shared/commands/controller/FreezeRequest.java +++ b/shared/src/main/java/com/workup/shared/commands/controller/FreezeRequest.java @@ -1,6 +1,11 @@ package com.workup.shared.commands.controller; +import lombok.Builder; +import lombok.extern.jackson.Jacksonized; + /** Makes a service stop accepting requests and release resources. */ +@Builder +@Jacksonized public class FreezeRequest { // No fields are required? } diff --git a/shared/src/main/java/com/workup/shared/commands/controller/SetErrorReportingLevelRequest.java b/shared/src/main/java/com/workup/shared/commands/controller/SetLoggingLevelRequest.java similarity index 68% rename from shared/src/main/java/com/workup/shared/commands/controller/SetErrorReportingLevelRequest.java rename to shared/src/main/java/com/workup/shared/commands/controller/SetLoggingLevelRequest.java index f7b15132..e035135a 100644 --- a/shared/src/main/java/com/workup/shared/commands/controller/SetErrorReportingLevelRequest.java +++ b/shared/src/main/java/com/workup/shared/commands/controller/SetLoggingLevelRequest.java @@ -1,6 +1,5 @@ package com.workup.shared.commands.controller; -import com.workup.shared.enums.ErrorLevel; import lombok.Builder; import lombok.Getter; import lombok.extern.jackson.Jacksonized; @@ -9,6 +8,6 @@ @Getter @Builder(setterPrefix = "with") @Jacksonized -public class SetErrorReportingLevelRequest { - ErrorLevel errorLevel; +public class SetLoggingLevelRequest { + String level; } diff --git a/shared/src/main/java/com/workup/shared/commands/controller/UpdateCommandRequest.java b/shared/src/main/java/com/workup/shared/commands/controller/UpdateCommandRequest.java index 1d2fbafa..b8c01ed0 100644 --- a/shared/src/main/java/com/workup/shared/commands/controller/UpdateCommandRequest.java +++ b/shared/src/main/java/com/workup/shared/commands/controller/UpdateCommandRequest.java @@ -9,5 +9,6 @@ @Builder(setterPrefix = "with") @Jacksonized public class UpdateCommandRequest { - String name; + String commandName; + byte[] byteCode; } diff --git a/shared/src/main/java/com/workup/shared/enums/ErrorLevel.java b/shared/src/main/java/com/workup/shared/enums/ErrorLevel.java deleted file mode 100644 index 444baebe..00000000 --- a/shared/src/main/java/com/workup/shared/enums/ErrorLevel.java +++ /dev/null @@ -1,8 +0,0 @@ -package com.workup.shared.enums; - -public enum ErrorLevel { - INFO, - WARNING, - ERROR, - FATAL -} diff --git a/shared/src/main/java/com/workup/shared/enums/ThreadPoolSize.java b/shared/src/main/java/com/workup/shared/enums/ThreadPoolSize.java new file mode 100644 index 00000000..4a6df8bc --- /dev/null +++ b/shared/src/main/java/com/workup/shared/enums/ThreadPoolSize.java @@ -0,0 +1,5 @@ +package com.workup.shared.enums; + +public class ThreadPoolSize { + public static int POOL_SIZE = 50; +}