Skip to content

Commit

Permalink
m
Browse files Browse the repository at this point in the history
  • Loading branch information
lenve committed Sep 6, 2021
1 parent e96c45c commit 1aff9cb
Show file tree
Hide file tree
Showing 9 changed files with 353 additions and 0 deletions.
54 changes: 54 additions & 0 deletions rabbitmq2/pom.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.5.3</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
<groupId>org.javaboy</groupId>
<artifactId>rabbitmq2</artifactId>
<version>0.0.1-SNAPSHOT</version>
<name>rabbitmq2</name>
<description>Demo project for Spring Boot</description>
<properties>
<java.version>11</java.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<!-- <dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-aop</artifactId>
</dependency>-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>

<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.amqp</groupId>
<artifactId>spring-rabbit-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>

<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>

</project>
56 changes: 56 additions & 0 deletions rabbitmq2/src/main/java/org/javaboy/rabbitmq/ConsumerDemo.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
package org.javaboy.rabbitmq;

import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

import java.io.IOException;

/**
* @author 江南一点雨
* @微信公众号 江南一点雨
* @网站 http://www.itboyhub.com
* @国际站 http://www.javaboy.org
* @微信 a_java_boy
* @GitHub https://github.com/lenve
* @Gitee https://gitee.com/lenve
*/
@Component
public class ConsumerDemo {
// @RabbitListener(queues = RabbitConfig.JAVABOY_QUEUE_NAME)
// public void handle(Channel channel, Message message) {
// //获取消息编号
// long deliveryTag = message.getMessageProperties().getDeliveryTag();
// try {
// //拒绝消息
// channel.basicReject(deliveryTag, true);
// } catch (IOException e) {
// e.printStackTrace();
// }
// }

// @RabbitListener(queues = RabbitConfig.JAVABOY_QUEUE_NAME)
// public void handle2(String msg) {
// System.out.println("msg = " + msg);
// }

@RabbitListener(queues = RabbitConfig.JAVABOY_QUEUE_NAME)
public void handle3(Message message,Channel channel) {
long deliveryTag = message.getMessageProperties().getDeliveryTag();
try {
//消息消费的代码写到这里
String s = new String(message.getBody());
System.out.println("s = " + s);
//消费完成后,手动 ack
channel.basicAck(deliveryTag, false);
} catch (Exception e) {
//手动 nack
try {
channel.basicNack(deliveryTag, false, true);
} catch (IOException ex) {
ex.printStackTrace();
}
}
}
}
24 changes: 24 additions & 0 deletions rabbitmq2/src/main/java/org/javaboy/rabbitmq/HelloController.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
package org.javaboy.rabbitmq;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;

/**
* @author 江南一点雨
* @微信公众号 江南一点雨
* @网站 http://www.itboyhub.com
* @国际站 http://www.javaboy.org
* @微信 a_java_boy
* @GitHub https://github.com/lenve
* @Gitee https://gitee.com/lenve
*/
@RestController
public class HelloController {
@Autowired
MsgService msgService;
@GetMapping("/send")
public void send() {
msgService.send();
}
}
63 changes: 63 additions & 0 deletions rabbitmq2/src/main/java/org/javaboy/rabbitmq/MsgService.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
package org.javaboy.rabbitmq;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.GetResponse;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.BatchingRabbitTemplate;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.retry.annotation.Retryable;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;

import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.util.UUID;

/**
* @author 江南一点雨
* @微信公众号 江南一点雨
* @网站 http://www.itboyhub.com
* @国际站 http://www.javaboy.org
* @微信 a_java_boy
* @GitHub https://github.com/lenve
* @Gitee https://gitee.com/lenve
*/
@Service
public class MsgService {
@Autowired
RabbitTemplate rabbitTemplate;

// @Transactional
public void send() {
rabbitTemplate.convertAndSend(RabbitConfig.JAVABOY_EXCHANGE_NAME, RabbitConfig.JAVABOY_QUEUE_NAME, "hello rabbitmq!".getBytes(), new CorrelationData(UUID.randomUUID().toString()));
}

@Transactional(transactionManager = "transactionManager")
public void receive() {
Object o = rabbitTemplate.receiveAndConvert(RabbitConfig.JAVABOY_QUEUE_NAME);
try {
System.out.println("o = " + new String(((byte[]) o), "UTF-8"));
int i = 1 / 0;
} catch (UnsupportedEncodingException e) {
e.printStackTrace();
}
}

public void receive2() {
Channel channel = rabbitTemplate.getConnectionFactory().createConnection().createChannel(true);
long deliveryTag = 0L;
try {
GetResponse getResponse = channel.basicGet(RabbitConfig.JAVABOY_QUEUE_NAME, false);
deliveryTag = getResponse.getEnvelope().getDeliveryTag();
System.out.println("o = " + new String((getResponse.getBody()), "UTF-8"));
channel.basicAck(deliveryTag, false);
} catch (IOException e) {
try {
channel.basicNack(deliveryTag, false, true);
} catch (IOException ex) {
ex.printStackTrace();
}
}
}
}
80 changes: 80 additions & 0 deletions rabbitmq2/src/main/java/org/javaboy/rabbitmq/RabbitConfig.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
package org.javaboy.rabbitmq;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.transaction.RabbitTransactionManager;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.retry.RetryCallback;
import org.springframework.retry.RetryContext;
import org.springframework.retry.RetryListener;
import org.springframework.retry.backoff.ExponentialBackOffPolicy;
import org.springframework.retry.policy.SimpleRetryPolicy;
import org.springframework.retry.support.RetryTemplate;

import javax.annotation.PostConstruct;

/**
* @author 江南一点雨
* @微信公众号 江南一点雨
* @网站 http://www.itboyhub.com
* @国际站 http://www.javaboy.org
* @微信 a_java_boy
* @GitHub https://github.com/lenve
* @Gitee https://gitee.com/lenve
*/
@Configuration
public class RabbitConfig implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnsCallback {
public static final String JAVABOY_EXCHANGE_NAME = "javaboy_exchange_name";
public static final String JAVABOY_QUEUE_NAME = "javaboy_queue_name";
private static final Logger logger = LoggerFactory.getLogger(RabbitConfig.class);
@Autowired
RabbitTemplate rabbitTemplate;

@Bean
Queue queue() {
return new Queue(JAVABOY_QUEUE_NAME);
}

@Bean
DirectExchange directExchange() {
return new DirectExchange(JAVABOY_EXCHANGE_NAME);
}

@Bean
Binding binding() {
return BindingBuilder.bind(queue())
.to(directExchange())
.with(JAVABOY_QUEUE_NAME);
}

@PostConstruct
public void initRabbitTemplate() {
rabbitTemplate.setConfirmCallback(this);
rabbitTemplate.setReturnsCallback(this);
}

@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
if (ack) {
logger.info("{}:消息成功到达交换器", correlationData.getId());
} else {
logger.error("{}:消息发送失败", correlationData.getId());
}
}

@Override
public void returnedMessage(ReturnedMessage returned) {
logger.error("{}:消息未成功路由到队列", returned.getMessage().getMessageProperties().getMessageId());
}

@Bean
RabbitTransactionManager transactionManager(ConnectionFactory connectionFactory) {
return new RabbitTransactionManager(connectionFactory);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
package org.javaboy.rabbitmq;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.retry.annotation.EnableRetry;

@SpringBootApplication
public class RabbitmqApplication {

public static void main(String[] args) {
SpringApplication.run(RabbitmqApplication.class, args);
}

}
16 changes: 16 additions & 0 deletions rabbitmq2/src/main/resources/application.properties
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
spring.rabbitmq.host=localhost
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
spring.rabbitmq.port=5672


#spring.rabbitmq.publisher-confirm-type=correlated
#spring.rabbitmq.publisher-returns=true

spring.rabbitmq.template.retry.enabled=true
spring.rabbitmq.template.retry.initial-interval=1000ms
spring.rabbitmq.template.retry.max-attempts=10
spring.rabbitmq.template.retry.max-interval=10000ms
spring.rabbitmq.template.retry.multiplier=2

spring.rabbitmq.listener.simple.acknowledge-mode=manual
27 changes: 27 additions & 0 deletions rabbitmq2/src/test/java/org/javaboy/rabbitmq/ConsumerTest.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
package org.javaboy.rabbitmq;

import org.junit.jupiter.api.Test;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;

import java.io.UnsupportedEncodingException;

/**
* @author 江南一点雨
* @微信公众号 江南一点雨
* @网站 http://www.itboyhub.com
* @国际站 http://www.javaboy.org
* @微信 a_java_boy
* @GitHub https://github.com/lenve
* @Gitee https://gitee.com/lenve
*/
@SpringBootTest
public class ConsumerTest {
@Autowired
MsgService msgService;
@Test
public void test01() throws UnsupportedEncodingException {
msgService.receive();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
package org.javaboy.rabbitmq;

import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;

import java.io.IOException;

@SpringBootTest
class RabbitmqApplicationTests {

@Autowired
MsgService msgService;
@Test
void contextLoads() throws IOException {
msgService.send();
}

}

0 comments on commit 1aff9cb

Please sign in to comment.