Ezio's Blog
Posts Categories Tags Music Mood About
Ezio's Blog· Light
☰ Menu
Posts Categories Tags Music Mood About
Expand all Back to top Go to bottom

Spring AMQP

Author: Ezio Date: August 7, 2022  23:02:04 Category: RabbitMQ

**AMQP:**Advanced Message Queuing Protocol,一个提供统一消息服务的应用层标准高级消息队列协议,是应用层协议的一个开放标准,为面向消息的中间件设计。基于此协议的客户端与消息中间件可传递消息,并不受客户端/中间件不同产品,不同的开发语言等条件的限制。Erlang 中的实现有 RabbitMQ 等。

**Spring AMQP:**基于AMQP协议定义的一套API规范,提供了模板来发送和接受消息。有两部分,spring-amqp 是基础抽象,spring-rabbit 是底层的默认实现。

基本使用流程

1.引入依赖

1
2
3
4
5
<!--AMQP依赖,包含RabbitMQ-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

2.在publisher服务和consumer服务中编写application.yml文件,添加mq连接信息:

1
2
3
4
5
6
7
spring:
rabbitmq:
host: 127.0.0.1 # rabbitMQ的ip地址
port: 5672 # 端口
username: ezio
password: 123456
virtual-host: /

3.publisher发送消息

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
@Slf4j
@SpringBootTest
public class SpringAmqpTest {

//注入rabbitTemplate,用来发送消息
@Autowired
private RabbitTemplate rabbitTemplate;

@Test
public void testSendMessage2SimpleQueue() {
String queueName = "simple.queue";
String message = "hello, spring amqp!";
rabbitTemplate.convertAndSend(queueName, message);
System.out.println("发送消息完成!");
}
}

4.consumer消费消息

1
2
3
4
5
6
7
8
9
@Component
public class SpringRabbitListener {

//@RabbitListener,用于监听队列
@RabbitListener(queues = "simple.queue")
public void listenSimpleQueue(String msg) {
System.out.println("消费者接收到simple.queue的消息:【" + msg + "】");
}
}

5.@RabbitListener

1
2
3
4
5
@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = "direct.queue1"), //绑定的队列
exchange = @Exchange(name = "test.direct"), //声明交换机名称
key = {"red", "blue"} //路由key,可配置通配符,red.# or red.*
))

消息预取

消费者会预先从队列中获取消息来等待处理,会忽略消费者的处理能力,可以通过设置预取上限(prefetch)来打到竞争模式的效果,能者多劳。

1
2
3
4
5
6
7
8
9
10
spring:
rabbitmq:
host: 127.0.0.1 # rabbitMQ的ip地址
port: 5672 # 端口
username: ezio
password: 123456
virtual-host: /
listener:
simple:
prefetch: 1 # 预取上限,默认250条,每次取一条,谁消费快则消费得多

消息转换器

Spring AMQP的对消息对象的处理是有org.springframework.amqp.support.converter.MessageConverter来处理的。默认实现是SimpleMessageConverter,基于JDK的 ObjectOutputStream 完成序列化。想要发送对象消息,需要自定义消息转换器。

1.引入依赖

1
2
3
4
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
</dependency>

2.在springboot启动类中,注入bean

1
2
3
4
5
6
7
8
9
10
11
@SpringBootApplication
public class PublisherApplication {
public static void main(String[] args) {
SpringApplication.run(PublisherApplication.class);
}

@Bean
public MessageConverter messageConverter(){
return new Jackson2JsonMessageConverter();
}
}

Author: Ezio

Permalink: https://ezioy.cn/2022/08/07/Spring-AMQP/

License: Copyright (c) 2019 CC-BY-NC-4.0 LICENSE

Slogan: Nothing is true,Everything is permitted

Tag(s): # RabbitMQ # Spring AMQP
back · home
消息丢失 Feign
Ezio © 2019 - 2026 | Powered by Hexo & Chic | 访客数量:   浏览次数: | 渝公网安备50011302222043 | 渝ICP备2023013933号-1