diff --git a/README.md b/README.md index 8e50f37..23777bf 100644 --- a/README.md +++ b/README.md @@ -2,8 +2,6 @@ ### 此为系统核心交互组件,包含了事件和RPC系统 -!!!! 现在RPC Server不能正常返回 method not found错误 - ### 依赖包 1. amqp-client-4.0.2 > http://central.maven.org/maven2/com/rabbitmq/amqp-client/4.0.2/amqp-client-4.0.2.jar @@ -13,6 +11,8 @@ > http://central.maven.org/maven2/org/slf4j/slf4j-api/1.7.25/slf4j-api-1.7.25.jar 4. slf4j-nop-1.7.25 > http://central.maven.org/maven2/org/slf4j/slf4j-nop/1.7.25/slf4j-nop-1.7.25.jar +5. commons-cli-1.4 (测试程序部分) +> http://mirrors.hust.edu.cn/apache//commons/cli/binaries/commons-cli-1.4-bin.zip #### 安装包: 1. 下载 Jar 包(已经包含所有依赖) diff --git a/src/META-INF/MANIFEST.MF b/src/META-INF/MANIFEST.MF index d396f31..d50b25b 100644 --- a/src/META-INF/MANIFEST.MF +++ b/src/META-INF/MANIFEST.MF @@ -1,2 +1,3 @@ -Manifest-Version: 1.4.2 +Manifest-Version: 1.0 +Main-Class: rpc.synapse.test.Test diff --git a/src/rpc/synapse/chaos/RpcServer.java b/src/rpc/synapse/chaos/RpcServer.java index f281d79..918733a 100644 --- a/src/rpc/synapse/chaos/RpcServer.java +++ b/src/rpc/synapse/chaos/RpcServer.java @@ -44,33 +44,30 @@ public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProp if (s.debug) { Synapse.log(String.format("Rpc Receive: (%s)%s@%s->%s %s", properties.getMessageId(), properties.getType(), s.appName, properties.getReplyTo(), content.toJSONString()), Synapse.LOGDEBUG); } + JSONObject res = new JSONObject(); if (alias.containsKey(properties.getType())) { - JSONObject res = new JSONObject(); Class[] cArgs = new Class[2]; cArgs[0] = JSONObject.class; cArgs[1] = AMQP.BasicProperties.class; try { Method m = s.rpcCallback.getClass().getMethod(alias.get(properties.getType()), cArgs); - System.out.println("我是外貌dfsdfsldjfklsdjfklsjdklfjsdlf第一个"); res = (JSONObject) m.invoke(s.rpcCallback, content, properties); } catch (Exception e) { - res.put("rpc_error", "method not found"); - System.out.println("我是CATCH语句"); - } - System.out.println("我是外貌第一个"); - String reply = String.format("client.%s.%s", properties.getReplyTo(), properties.getAppId()); - AMQP.BasicProperties props = new AMQP.BasicProperties().builder() - .appId(s.appId) - .replyTo(s.appName) - .correlationId(properties.getMessageId()) - .messageId(Synapse.randomString()) - .type(properties.getType()).build(); - ch.basicPublish(s.sysName, reply, props, res.toJSONString().getBytes()); - if (s.debug) { - Synapse.log(String.format("Rpc Return: (%s)%s@%s->%s %s", properties.getMessageId(), properties.getType(), s.appName, properties.getReplyTo(), res.toJSONString()), Synapse.LOGDEBUG); + e.getMessage(); } } else { - ch.basicNack(envelope.getDeliveryTag(), false, false); + res.put("rpc_error", "method not found"); + } + String reply = String.format("client.%s.%s", properties.getReplyTo(), properties.getAppId()); + AMQP.BasicProperties props = new AMQP.BasicProperties().builder() + .appId(s.appId) + .replyTo(s.appName) + .correlationId(properties.getMessageId()) + .messageId(Synapse.randomString()) + .type(properties.getType()).build(); + ch.basicPublish(s.sysName, reply, props, res.toJSONString().getBytes()); + if (s.debug) { + Synapse.log(String.format("Rpc Return: (%s)%s@%s->%s %s", properties.getMessageId(), properties.getType(), s.appName, properties.getReplyTo(), res.toJSONString()), Synapse.LOGDEBUG); } } }; diff --git a/src/rpc/synapse/test/EventTest.java b/src/rpc/synapse/test/EventTest.java new file mode 100644 index 0000000..6dc1724 --- /dev/null +++ b/src/rpc/synapse/test/EventTest.java @@ -0,0 +1,26 @@ +package rpc.synapse.test; + +import com.alibaba.fastjson.JSONObject; +import com.rabbitmq.client.AMQP; +import rpc.synapse.chaos.BaseCallback; + +import java.util.Hashtable; + +public class EventTest extends BaseCallback { + @Override + public Hashtable regAlias() { + Hashtable alias = new Hashtable<>(); + alias.put("dotnet.test", "test"); + alias.put("golang.test", "test"); + alias.put("python.test", "test"); + alias.put("php.test", "test"); + alias.put("ruby.test", "test"); + alias.put("java.test", "test"); + return alias; + } + + public boolean test(JSONObject body, AMQP.BasicProperties props) { + System.out.printf("**收到EVENT: %s@%s %s\n", props.getType(), props.getReplyTo(), body.toJSONString()); + return true; + } +} diff --git a/src/rpc/synapse/test/RpcTest.java b/src/rpc/synapse/test/RpcTest.java new file mode 100644 index 0000000..0c579b7 --- /dev/null +++ b/src/rpc/synapse/test/RpcTest.java @@ -0,0 +1,25 @@ +package rpc.synapse.test; + +import com.alibaba.fastjson.JSONObject; +import com.rabbitmq.client.AMQP; +import rpc.synapse.chaos.BaseCallback; + +import java.util.Hashtable; + +public class RpcTest extends BaseCallback { + @Override + public Hashtable regAlias() { + Hashtable alias = new Hashtable<>(); + alias.put("test", "test"); + return alias; + } + + public JSONObject test(JSONObject body, AMQP.BasicProperties props) { + System.out.printf("**RPC请求: %s@%s %s\n", props.getType(), props.getReplyTo(), body.toJSONString()); + JSONObject ret = new JSONObject(); + ret.put("from", "java"); + ret.put("m", body.get("msg")); + ret.put("number", 5233); + return ret; + } +} diff --git a/src/rpc/synapse/test/Test.java b/src/rpc/synapse/test/Test.java new file mode 100644 index 0000000..29ae23b --- /dev/null +++ b/src/rpc/synapse/test/Test.java @@ -0,0 +1,110 @@ +package rpc.synapse.test; + +import com.alibaba.fastjson.JSONObject; +import org.apache.commons.cli.*; +import rpc.synapse.chaos.Synapse; + +import java.io.BufferedReader; +import java.io.IOException; +import java.io.InputStreamReader; + + +public class Test { + public static void main(String[] args) throws Exception { + Options options = new Options(); + options.addOption("h", "help", false, "Show Help"); + options.addRequiredOption(null, "host", true, "RabbitMQ Host"); + options.addOption(null, "port", true, "RabbitMQ Port"); + options.addRequiredOption("u", "user", true, "RabbitMQ Username"); + options.addRequiredOption("p", "pass", true, "RabbitMQ Password"); + options.addOption(null, "vhost", true, "RabbitMQ Virtual Host"); + options.addRequiredOption(null, "sys_name", true, "system name"); + options.addOption("d", "debug", false, "Debug Mode"); + CommandLineParser parser = new DefaultParser(); + CommandLine cmd = parser.parse(options, args); + if (cmd.hasOption("h")) { + String formatStr = "must need theses parameters"; + HelpFormatter hf = new HelpFormatter(); + hf.printHelp(formatStr, "", options, ""); + return; + } + Synapse app = new Synapse(); + if (cmd.hasOption("debug")) { + app.debug = true; + } + if (cmd.hasOption("host")) { + app.mqHost = cmd.getOptionValue("host"); + } + if (cmd.hasOption("port")) { + app.mqPort = Integer.parseInt(cmd.getOptionValue("port")); + } + if (cmd.hasOption("user")) { + app.mqUser = cmd.getOptionValue("user"); + } + if (cmd.hasOption("pass")) { + app.mqPass = cmd.getOptionValue("pass"); + } + if (cmd.hasOption("vhost")) { + app.mqVHost = cmd.getOptionValue("vhost"); + } + if (cmd.hasOption("sys_name")) { + app.sysName = cmd.getOptionValue("sys_name"); + } + app.appName = "java"; + app.eventCallback = new EventTest(); + app.rpcCallback = new RpcTest(); + app.serve(); + String[] inputs; + JSONObject body; + JSONObject res; + showHelp(); + while (true) { + body = new JSONObject(); + String str = readDataFromConsole("input >> "); + inputs = str.split(" "); + switch (inputs[0]) { + case "event": + if (inputs.length != 3) { + showHelp(); + continue; + } + body.put("msg", inputs[2]); + app.sendEvent(inputs[1], body); + break; + case "rpc": + if (inputs.length != 4) { + showHelp(); + continue; + } + body.put("msg", inputs[2]); + res = app.sendRpc(inputs[1], inputs[2], body); + System.out.println(res.toJSONString()); + break; + default: + showHelp(); + } + } + } + + private static void showHelp() { + System.out.println("----------------------------------------------"); + System.out.println("| event usage: |"); + System.out.println("| > event [event] [msg] |"); + System.out.println("| rpc usage: |"); + System.out.println("| > rpc [app] [method] [msg] |"); + System.out.println("----------------------------------------------"); + } + + private static String readDataFromConsole(String prompt) { + BufferedReader br = new BufferedReader(new InputStreamReader(System.in)); + String str = null; + try { + System.out.print(prompt); + str = br.readLine(); + + } catch (IOException e) { + e.printStackTrace(); + } + return str; + } +}