From 1aff9cb38f66f23fa9c8bc68c7fcb4c97ca81049 Mon Sep 17 00:00:00 2001 From: wangsong Date: Mon, 6 Sep 2021 19:45:45 +0800 Subject: [PATCH] m --- rabbitmq2/pom.xml | 54 +++++++++++++ .../org/javaboy/rabbitmq/ConsumerDemo.java | 56 +++++++++++++ .../org/javaboy/rabbitmq/HelloController.java | 24 ++++++ .../java/org/javaboy/rabbitmq/MsgService.java | 63 +++++++++++++++ .../org/javaboy/rabbitmq/RabbitConfig.java | 80 +++++++++++++++++++ .../javaboy/rabbitmq/RabbitmqApplication.java | 14 ++++ .../src/main/resources/application.properties | 16 ++++ .../org/javaboy/rabbitmq/ConsumerTest.java | 27 +++++++ .../rabbitmq/RabbitmqApplicationTests.java | 19 +++++ 9 files changed, 353 insertions(+) create mode 100644 rabbitmq2/pom.xml create mode 100644 rabbitmq2/src/main/java/org/javaboy/rabbitmq/ConsumerDemo.java create mode 100644 rabbitmq2/src/main/java/org/javaboy/rabbitmq/HelloController.java create mode 100644 rabbitmq2/src/main/java/org/javaboy/rabbitmq/MsgService.java create mode 100644 rabbitmq2/src/main/java/org/javaboy/rabbitmq/RabbitConfig.java create mode 100644 rabbitmq2/src/main/java/org/javaboy/rabbitmq/RabbitmqApplication.java create mode 100644 rabbitmq2/src/main/resources/application.properties create mode 100644 rabbitmq2/src/test/java/org/javaboy/rabbitmq/ConsumerTest.java create mode 100644 rabbitmq2/src/test/java/org/javaboy/rabbitmq/RabbitmqApplicationTests.java diff --git a/rabbitmq2/pom.xml b/rabbitmq2/pom.xml new file mode 100644 index 0000000..c4ac00a --- /dev/null +++ b/rabbitmq2/pom.xml @@ -0,0 +1,54 @@ + + + 4.0.0 + + org.springframework.boot + spring-boot-starter-parent + 2.5.3 + + + org.javaboy + rabbitmq2 + 0.0.1-SNAPSHOT + rabbitmq2 + Demo project for Spring Boot + + 11 + + + + org.springframework.boot + spring-boot-starter-amqp + + + + org.springframework.boot + spring-boot-starter-web + + + + org.springframework.boot + spring-boot-starter-test + test + + + org.springframework.amqp + spring-rabbit-test + test + + + + + + + org.springframework.boot + spring-boot-maven-plugin + + + + + diff --git a/rabbitmq2/src/main/java/org/javaboy/rabbitmq/ConsumerDemo.java b/rabbitmq2/src/main/java/org/javaboy/rabbitmq/ConsumerDemo.java new file mode 100644 index 0000000..ccc0d6c --- /dev/null +++ b/rabbitmq2/src/main/java/org/javaboy/rabbitmq/ConsumerDemo.java @@ -0,0 +1,56 @@ +package org.javaboy.rabbitmq; + +import com.rabbitmq.client.Channel; +import org.springframework.amqp.core.Message; +import org.springframework.amqp.rabbit.annotation.RabbitListener; +import org.springframework.stereotype.Component; + +import java.io.IOException; + +/** + * @author 江南一点雨 + * @微信公众号 江南一点雨 + * @网站 http://www.itboyhub.com + * @国际站 http://www.javaboy.org + * @微信 a_java_boy + * @GitHub https://github.com/lenve + * @Gitee https://gitee.com/lenve + */ +@Component +public class ConsumerDemo { +// @RabbitListener(queues = RabbitConfig.JAVABOY_QUEUE_NAME) +// public void handle(Channel channel, Message message) { +// //获取消息编号 +// long deliveryTag = message.getMessageProperties().getDeliveryTag(); +// try { +// //拒绝消息 +// channel.basicReject(deliveryTag, true); +// } catch (IOException e) { +// e.printStackTrace(); +// } +// } + +// @RabbitListener(queues = RabbitConfig.JAVABOY_QUEUE_NAME) +// public void handle2(String msg) { +// System.out.println("msg = " + msg); +// } + + @RabbitListener(queues = RabbitConfig.JAVABOY_QUEUE_NAME) + public void handle3(Message message,Channel channel) { + long deliveryTag = message.getMessageProperties().getDeliveryTag(); + try { + //消息消费的代码写到这里 + String s = new String(message.getBody()); + System.out.println("s = " + s); + //消费完成后,手动 ack + channel.basicAck(deliveryTag, false); + } catch (Exception e) { + //手动 nack + try { + channel.basicNack(deliveryTag, false, true); + } catch (IOException ex) { + ex.printStackTrace(); + } + } + } +} diff --git a/rabbitmq2/src/main/java/org/javaboy/rabbitmq/HelloController.java b/rabbitmq2/src/main/java/org/javaboy/rabbitmq/HelloController.java new file mode 100644 index 0000000..91c1ad6 --- /dev/null +++ b/rabbitmq2/src/main/java/org/javaboy/rabbitmq/HelloController.java @@ -0,0 +1,24 @@ +package org.javaboy.rabbitmq; + +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.web.bind.annotation.GetMapping; +import org.springframework.web.bind.annotation.RestController; + +/** + * @author 江南一点雨 + * @微信公众号 江南一点雨 + * @网站 http://www.itboyhub.com + * @国际站 http://www.javaboy.org + * @微信 a_java_boy + * @GitHub https://github.com/lenve + * @Gitee https://gitee.com/lenve + */ +@RestController +public class HelloController { + @Autowired + MsgService msgService; + @GetMapping("/send") + public void send() { + msgService.send(); + } +} diff --git a/rabbitmq2/src/main/java/org/javaboy/rabbitmq/MsgService.java b/rabbitmq2/src/main/java/org/javaboy/rabbitmq/MsgService.java new file mode 100644 index 0000000..59e82ac --- /dev/null +++ b/rabbitmq2/src/main/java/org/javaboy/rabbitmq/MsgService.java @@ -0,0 +1,63 @@ +package org.javaboy.rabbitmq; + +import com.rabbitmq.client.Channel; +import com.rabbitmq.client.GetResponse; +import org.springframework.amqp.rabbit.connection.CorrelationData; +import org.springframework.amqp.rabbit.core.BatchingRabbitTemplate; +import org.springframework.amqp.rabbit.core.RabbitTemplate; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.retry.annotation.Retryable; +import org.springframework.stereotype.Service; +import org.springframework.transaction.annotation.Transactional; + +import java.io.IOException; +import java.io.UnsupportedEncodingException; +import java.util.UUID; + +/** + * @author 江南一点雨 + * @微信公众号 江南一点雨 + * @网站 http://www.itboyhub.com + * @国际站 http://www.javaboy.org + * @微信 a_java_boy + * @GitHub https://github.com/lenve + * @Gitee https://gitee.com/lenve + */ +@Service +public class MsgService { + @Autowired + RabbitTemplate rabbitTemplate; + + // @Transactional + public void send() { + rabbitTemplate.convertAndSend(RabbitConfig.JAVABOY_EXCHANGE_NAME, RabbitConfig.JAVABOY_QUEUE_NAME, "hello rabbitmq!".getBytes(), new CorrelationData(UUID.randomUUID().toString())); + } + + @Transactional(transactionManager = "transactionManager") + public void receive() { + Object o = rabbitTemplate.receiveAndConvert(RabbitConfig.JAVABOY_QUEUE_NAME); + try { + System.out.println("o = " + new String(((byte[]) o), "UTF-8")); + int i = 1 / 0; + } catch (UnsupportedEncodingException e) { + e.printStackTrace(); + } + } + + public void receive2() { + Channel channel = rabbitTemplate.getConnectionFactory().createConnection().createChannel(true); + long deliveryTag = 0L; + try { + GetResponse getResponse = channel.basicGet(RabbitConfig.JAVABOY_QUEUE_NAME, false); + deliveryTag = getResponse.getEnvelope().getDeliveryTag(); + System.out.println("o = " + new String((getResponse.getBody()), "UTF-8")); + channel.basicAck(deliveryTag, false); + } catch (IOException e) { + try { + channel.basicNack(deliveryTag, false, true); + } catch (IOException ex) { + ex.printStackTrace(); + } + } + } +} diff --git a/rabbitmq2/src/main/java/org/javaboy/rabbitmq/RabbitConfig.java b/rabbitmq2/src/main/java/org/javaboy/rabbitmq/RabbitConfig.java new file mode 100644 index 0000000..6dce562 --- /dev/null +++ b/rabbitmq2/src/main/java/org/javaboy/rabbitmq/RabbitConfig.java @@ -0,0 +1,80 @@ +package org.javaboy.rabbitmq; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.amqp.core.*; +import org.springframework.amqp.rabbit.connection.ConnectionFactory; +import org.springframework.amqp.rabbit.connection.CorrelationData; +import org.springframework.amqp.rabbit.core.RabbitTemplate; +import org.springframework.amqp.rabbit.transaction.RabbitTransactionManager; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.retry.RetryCallback; +import org.springframework.retry.RetryContext; +import org.springframework.retry.RetryListener; +import org.springframework.retry.backoff.ExponentialBackOffPolicy; +import org.springframework.retry.policy.SimpleRetryPolicy; +import org.springframework.retry.support.RetryTemplate; + +import javax.annotation.PostConstruct; + +/** + * @author 江南一点雨 + * @微信公众号 江南一点雨 + * @网站 http://www.itboyhub.com + * @国际站 http://www.javaboy.org + * @微信 a_java_boy + * @GitHub https://github.com/lenve + * @Gitee https://gitee.com/lenve + */ +@Configuration +public class RabbitConfig implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnsCallback { + public static final String JAVABOY_EXCHANGE_NAME = "javaboy_exchange_name"; + public static final String JAVABOY_QUEUE_NAME = "javaboy_queue_name"; + private static final Logger logger = LoggerFactory.getLogger(RabbitConfig.class); + @Autowired + RabbitTemplate rabbitTemplate; + + @Bean + Queue queue() { + return new Queue(JAVABOY_QUEUE_NAME); + } + + @Bean + DirectExchange directExchange() { + return new DirectExchange(JAVABOY_EXCHANGE_NAME); + } + + @Bean + Binding binding() { + return BindingBuilder.bind(queue()) + .to(directExchange()) + .with(JAVABOY_QUEUE_NAME); + } + + @PostConstruct + public void initRabbitTemplate() { + rabbitTemplate.setConfirmCallback(this); + rabbitTemplate.setReturnsCallback(this); + } + + @Override + public void confirm(CorrelationData correlationData, boolean ack, String cause) { + if (ack) { + logger.info("{}:消息成功到达交换器", correlationData.getId()); + } else { + logger.error("{}:消息发送失败", correlationData.getId()); + } + } + + @Override + public void returnedMessage(ReturnedMessage returned) { + logger.error("{}:消息未成功路由到队列", returned.getMessage().getMessageProperties().getMessageId()); + } + + @Bean + RabbitTransactionManager transactionManager(ConnectionFactory connectionFactory) { + return new RabbitTransactionManager(connectionFactory); + } +} diff --git a/rabbitmq2/src/main/java/org/javaboy/rabbitmq/RabbitmqApplication.java b/rabbitmq2/src/main/java/org/javaboy/rabbitmq/RabbitmqApplication.java new file mode 100644 index 0000000..ff6a229 --- /dev/null +++ b/rabbitmq2/src/main/java/org/javaboy/rabbitmq/RabbitmqApplication.java @@ -0,0 +1,14 @@ +package org.javaboy.rabbitmq; + +import org.springframework.boot.SpringApplication; +import org.springframework.boot.autoconfigure.SpringBootApplication; +import org.springframework.retry.annotation.EnableRetry; + +@SpringBootApplication +public class RabbitmqApplication { + + public static void main(String[] args) { + SpringApplication.run(RabbitmqApplication.class, args); + } + +} diff --git a/rabbitmq2/src/main/resources/application.properties b/rabbitmq2/src/main/resources/application.properties new file mode 100644 index 0000000..769b0f4 --- /dev/null +++ b/rabbitmq2/src/main/resources/application.properties @@ -0,0 +1,16 @@ +spring.rabbitmq.host=localhost +spring.rabbitmq.username=guest +spring.rabbitmq.password=guest +spring.rabbitmq.port=5672 + + +#spring.rabbitmq.publisher-confirm-type=correlated +#spring.rabbitmq.publisher-returns=true + +spring.rabbitmq.template.retry.enabled=true +spring.rabbitmq.template.retry.initial-interval=1000ms +spring.rabbitmq.template.retry.max-attempts=10 +spring.rabbitmq.template.retry.max-interval=10000ms +spring.rabbitmq.template.retry.multiplier=2 + +spring.rabbitmq.listener.simple.acknowledge-mode=manual \ No newline at end of file diff --git a/rabbitmq2/src/test/java/org/javaboy/rabbitmq/ConsumerTest.java b/rabbitmq2/src/test/java/org/javaboy/rabbitmq/ConsumerTest.java new file mode 100644 index 0000000..56f6946 --- /dev/null +++ b/rabbitmq2/src/test/java/org/javaboy/rabbitmq/ConsumerTest.java @@ -0,0 +1,27 @@ +package org.javaboy.rabbitmq; + +import org.junit.jupiter.api.Test; +import org.springframework.amqp.rabbit.core.RabbitTemplate; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.test.context.SpringBootTest; + +import java.io.UnsupportedEncodingException; + +/** + * @author 江南一点雨 + * @微信公众号 江南一点雨 + * @网站 http://www.itboyhub.com + * @国际站 http://www.javaboy.org + * @微信 a_java_boy + * @GitHub https://github.com/lenve + * @Gitee https://gitee.com/lenve + */ +@SpringBootTest +public class ConsumerTest { + @Autowired + MsgService msgService; + @Test + public void test01() throws UnsupportedEncodingException { + msgService.receive(); + } +} diff --git a/rabbitmq2/src/test/java/org/javaboy/rabbitmq/RabbitmqApplicationTests.java b/rabbitmq2/src/test/java/org/javaboy/rabbitmq/RabbitmqApplicationTests.java new file mode 100644 index 0000000..ae89430 --- /dev/null +++ b/rabbitmq2/src/test/java/org/javaboy/rabbitmq/RabbitmqApplicationTests.java @@ -0,0 +1,19 @@ +package org.javaboy.rabbitmq; + +import org.junit.jupiter.api.Test; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.test.context.SpringBootTest; + +import java.io.IOException; + +@SpringBootTest +class RabbitmqApplicationTests { + + @Autowired + MsgService msgService; + @Test + void contextLoads() throws IOException { + msgService.send(); + } + +}