在SpringBoot应用中使用RabbitMQ
约 448 个字 133 行代码 预计阅读时间 3 分钟
引入依赖
| XML |
|---|
| <dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.amqp</groupId>
<artifactId>spring-rabbit-test</artifactId>
<scope>test</scope>
</dependency>
|
编写配置
| YAML |
|---|
1
2
3
4
5
6
7
8
9
10
11
12 | spring:
rabbitmq:
addresses: amqp://admin:admin@127.0.0.1:5672/study
# 或者使用下面分开写的形式
spring:
rabbitmq:
host: 127.0.0.1
port: 5672 # 默认为5672
username: admin
password: admin
virtual-host: study # 默认值为 /
|
声明队列
编写配置文件,并创建一个Bean:
| Java |
|---|
1
2
3
4
5
6
7
8
9
10
11
12 | import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.QueueBuilder;
@Configuration
public class RabbitConfig {
// 创建工作队列
@Bean
public Queue workQueue() {
// 使用QueueBuilder创建一个名为work-queue的可持久化队列
return QueueBuilder.durable("work-queue").build();
}
}
|
声明交换机与绑定关系
声明交换机和绑定关系与声明队列类似,但是需要注意多个Bean注入时的冲突问题,使用@Qualifier指定Bean。交换机的类型有下面三种:
FanoutExchange DirectExchange TopicExchange
生产者发送消息
生产者可以使用RabbitTemplate对象调用covertAndSend方法发送消息:
| Java |
|---|
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15 | import org.springframework.amqp.rabbit.core.RabbitTemplate;
@RestController
@RequestMapping("/producer")
public class Producer {
@Autowired
private RabbitTemplate rabbitTemplate;
@RequestMapping("/work")
public String workQueue() {
rabbitTemplate.convertAndSend("", "work-queue", "hello spring-amqp: work-queue");
return "消息发送成功";
}
}
|
消费者消费消息
使用@RabbitListener来声明一个消费者,这个注解可以使用在类上(整个类为一个消费者,此时要执行的方法需要被@RabbitHandler修饰),也可以使用在方法上(指定方法为一个消费者),注解中的queues参数值为消费的队列名称。方法的参数可以有两种:
- 消息:
String类型/Message类型,如果是String,则参数值即为接收到的消息,否则参数值包含消息以及其他内容(例如deliveryTag) - 连接:
Channel类型,参数值即为当前消费者的连接信息
| Java |
|---|
| import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
@Component
public class WorkQueueConsumer {
@RabbitListener(queues = "work-queue")
public void workQueue(Message message) {
System.out.println("消费者接收到消息:" + new String(message.getBody()));
}
}
|
发送与接受对象
默认情况下,convertAndSend是支持发送一个对象的,但是这个对象必须要实现Serializable接口,并且直接发送时消息类型会被设置为Java序列化对象,可读性差,所以可以考虑设置一下发送前对象的序列化方式,以Json为例:
| Java |
|---|
1
2
3
4
5
6
7
8
9
10
11
12 | // 返回用于序列化和反序列化的Json对象
@Bean
public Jackson2JsonMessageConverter jackson2JsonMessageConverter() {
return new Jackson2JsonMessageConverter();
}
// 自定义RabbitTemplate对象,使其支持Json序列化和反序列化
@Bean
public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
RabbitTemplate template = new RabbitTemplate(connectionFactory);
template.setMessageConverter(jackson2JsonMessageConverter()); // 设置消息转换器
return template;
}
|
需要注意,如果生产者发送的是某个对象的Json字符串,那么消费者在接收时如果想直接使用该对象作为参数,也同样需要上面的步骤
例如: