springboot集成rabbitmq
原创 · 阅读时间大约 5 分钟
开发环境
- jdk:1.8
- spring-rabbit:2.3.9
- springboot:2.4.8
项目目录
配置文件
# Application configuration: MySQL datasource, RabbitMQ connection/listener settings, JPA and HTTP port.
spring:
  datasource:
    type: com.zaxxer.hikari.HikariDataSource
    driver-class-name: com.mysql.cj.jdbc.Driver
    url: jdbc:mysql://localhost:3306/library?useUnicode=true&characterEncoding=UTF-8&autoReconnect=true&useSSL=false&zeroDateTimeBehavior=convertToNull&serverTimezone=GMT%2B8&rewriteBatchedStatements=true
    username: root
    password: 123456
  rabbitmq:
    host: localhost
    # NOTE(review): RabbitMQ's default AMQP port is 5672 - confirm 5675 is the intended (e.g. remapped) port
    port: 5675
    publisher-confirm-type: correlated # enable publisher confirms
    publisher-returns: true # enable returned (unroutable) messages
    listener:
      simple:
        acknowledge-mode: manual # listeners must acknowledge manually
        concurrency: 10 # number of concurrent consumers
        max-concurrency: 20 # maximum number of concurrent consumers
  jpa:
    hibernate:
      ddl-auto: update
    properties:
      hibernate:
        dialect: org.hibernate.dialect.MySQL8Dialect
server:
  port: 8112
第一种模式
/**
 * @Description Mode 1 ("Hello World!"): a single listener bound to one queue.
 * @Author WuYiLong
 * @Date 2024/8/14 15:34
 */
@Slf4j
@Service
public class FirstModeExample {

    public static final String QUEUE = "firstModeQueue";
    public static final String EXCHANGE = "firstModeExchange";
    public static final String ROUTE_KEY = "firstModeRouteKey";

    /**
     * Logs the received payload and acknowledges the delivery.
     *
     * @param data    message payload as text
     * @param channel channel the message was delivered on; used for the acknowledgement
     * @param message raw message, source of the delivery tag
     * @throws IOException if the acknowledgement cannot be sent
     */
    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(name = QUEUE),
            exchange = @Exchange(name = EXCHANGE),
            key = ROUTE_KEY))
    public void consumer(String data, Channel channel, Message message) throws IOException {
        log.info("第一种模式:生产者-队列-消费者,数据:{}", data);
        // multiple=false: acknowledge only this delivery
        final long deliveryTag = message.getMessageProperties().getDeliveryTag();
        channel.basicAck(deliveryTag, false);
    }
}
第二种模式
/**
 * @Description Mode 2 (Work Queues): two listeners declared on the same queue.
 * @Author WuYiLong
 * @Date 2024/8/14 16:12
 */
@Slf4j
@Service
public class SecondModeExample {

    public static final String QUEUE = "secondModeQueue";
    public static final String EXCHANGE = "secondModeExchange";
    public static final String ROUTE_KEY = "secondModeRouteKey";

    /**
     * First listener on {@link #QUEUE}: logs the payload, then acknowledges it.
     *
     * @throws IOException if the acknowledgement cannot be sent
     */
    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(name = QUEUE),
            exchange = @Exchange(name = EXCHANGE),
            key = ROUTE_KEY))
    public void consumer1(String data, Channel channel, Message message) throws IOException {
        log.info("消费者1正在消费:{}", data);
        acknowledge(channel, message);
    }

    /**
     * Second listener on {@link #QUEUE}: logs the payload, then acknowledges it.
     *
     * @throws IOException if the acknowledgement cannot be sent
     */
    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(name = QUEUE),
            exchange = @Exchange(name = EXCHANGE),
            key = ROUTE_KEY))
    public void consumer2(String data, Channel channel, Message message) throws IOException {
        log.info("消费者2正在消费:{}", data);
        acknowledge(channel, message);
    }

    /** Acknowledges exactly the given delivery (multiple=false). */
    private static void acknowledge(Channel channel, Message message) throws IOException {
        channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
    }
}
第三种模式
/**
 * @Description Mode 3 (Publish/Subscribe): two queues bound to one fanout exchange,
 *              each with its own listener.
 * @Author WuYiLong
 * @Date 2024/8/14 16:37
 */
@Slf4j
@Service
public class ThirdModeExample {

    public static final String QUEUE1 = "thirdModeQueue1";
    public static final String QUEUE2 = "thirdModeQueue2";
    public static final String EXCHANGE = "thirdModeExchange";

    /**
     * Listener for {@link #QUEUE1}: logs the payload and acknowledges the delivery.
     */
    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(name = QUEUE1),
            exchange = @Exchange(name = EXCHANGE, type = ExchangeTypes.FANOUT)))
    public void consumer1(String data, Channel channel, Message message) throws Exception {
        log.info("consumer1: {}", data);
        final long tag = message.getMessageProperties().getDeliveryTag();
        channel.basicAck(tag, false); // acknowledge this delivery only
    }

    /**
     * Listener for {@link #QUEUE2}: logs the payload and acknowledges the delivery.
     */
    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(name = QUEUE2),
            exchange = @Exchange(name = EXCHANGE, type = ExchangeTypes.FANOUT)))
    public void consumer2(String data, Channel channel, Message message) throws Exception {
        log.info("consumer2: {}", data);
        final long tag = message.getMessageProperties().getDeliveryTag();
        channel.basicAck(tag, false); // acknowledge this delivery only
    }
}
第四种模式
/**
 * @Description Mode 4 (Routing): two queues bound to the same exchange with different
 *              sets of routing keys.
 * @Author WuYiLong
 * @Date 2024/8/15 14:05
 */
@Slf4j
@Service
public class FourModeExample {

    public static final String QUEUE1 = "fourModeQueue1";
    public static final String QUEUE2 = "fourModeQueue2";
    public static final String EXCHANGE = "fourModeExchange";
    public static final String ROUTING_KEY1 = "fourModeRoutingKey1";
    public static final String ROUTING_KEY2 = "fourModeRoutingKey2";
    public static final String ROUTING_KEY3 = "fourModeRoutingKey3";

    /**
     * Listener for {@link #QUEUE1}, which is bound with all three routing keys.
     */
    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(name = QUEUE1),
            exchange = @Exchange(name = EXCHANGE),
            key = {ROUTING_KEY1, ROUTING_KEY2, ROUTING_KEY3}))
    public void consumer1(String data, Channel channel, Message message) throws Exception {
        log.info("consumer1->{}", data);
        ackSingle(channel, message);
    }

    /**
     * Listener for {@link #QUEUE2}, which is bound with {@link #ROUTING_KEY1} only.
     */
    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(name = QUEUE2),
            exchange = @Exchange(name = EXCHANGE),
            key = ROUTING_KEY1))
    public void consumer2(String data, Channel channel, Message message) throws Exception {
        log.info("consumer2->{}", data);
        ackSingle(channel, message);
    }

    /** Acknowledges exactly the given delivery (multiple=false). */
    private static void ackSingle(Channel channel, Message message) throws Exception {
        channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
    }
}
第五种模式
/**
 * @Description Mode 5 (Topics): two queues bound to a topic exchange using wildcard
 *              routing patterns.
 * @Author WuYiLong
 * @Date 2024/8/16 10:39
 */
@Slf4j
@Service
public class FiveModeExample {

    public static final String QUEUE1 = "queue1";
    public static final String QUEUE2 = "queue2";
    public static final String EXCHANGE = "exchange";
    // Binding patterns for the topic exchange.
    public static final String ROUTING_KEY1 = "*.a.*";
    public static final String ROUTING_KEY2 = "*.*.b";
    public static final String ROUTING_KEY3 = "c.#";

    /**
     * Listener for {@link #QUEUE1}, bound with {@link #ROUTING_KEY1}.
     */
    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(name = QUEUE1),
            exchange = @Exchange(name = EXCHANGE, type = ExchangeTypes.TOPIC),
            key = ROUTING_KEY1))
    public void consumer1(String data, Channel channel, Message message) throws Exception {
        log.info("consumer1: {}", data);
        final long deliveryTag = message.getMessageProperties().getDeliveryTag();
        channel.basicAck(deliveryTag, false);
    }

    /**
     * Listener for {@link #QUEUE2}, bound with {@link #ROUTING_KEY2} and {@link #ROUTING_KEY3}.
     */
    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(name = QUEUE2),
            exchange = @Exchange(name = EXCHANGE, type = ExchangeTypes.TOPIC),
            key = {ROUTING_KEY2, ROUTING_KEY3}))
    public void consumer2(String data, Channel channel, Message message) throws Exception {
        log.info("consumer2: {}", data);
        final long deliveryTag = message.getMessageProperties().getDeliveryTag();
        channel.basicAck(deliveryTag, false);
    }
}
第六种模式
注意:客户端和服务器端不能写在同一个服务中,必须拆分成两个独立的服务,
否则会造成消息不断循环。
服务器端
- 配置类
/**
 * Builds the server-side {@link RabbitTemplate} with confirm/returns callbacks and a
 * JSON message converter.
 *
 * @param connectionFactory connection factory the template publishes through
 * @return the configured template
 */
@Bean
public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
    final RabbitTemplate template = new RabbitTemplate();
    template.setConnectionFactory(connectionFactory);

    // JSON converter so that objects can be sent as message payloads.
    template.setMessageConverter(new Jackson2JsonMessageConverter());

    template.setConfirmCallback(new DefaultConfirmCallback());
    // Must be true, otherwise DefaultReturnsCallback is never triggered.
    template.setMandatory(true);
    template.setReturnsCallback(new DefaultReturnsCallback());

    // Transactions (template.setChannelTransacted(true)) are supported,
    // but have to stay disabled when the RPC mode is used.
    return template;
}
- 队列和交换机的绑定
/**
 * @Description Declares the RPC request queue, the reply queue, their shared topic
 *              exchange and the two bindings used by the mode 6 (RPC) example.
 * @Author WuYiLong
 * @Date 2024/8/19 17:24
 */
@Configuration
public class RabbitRpcConfig {

    public static final String REPLY_QUEUE = "sixModeReplyQueue";
    public static final String RPC_QUEUE = "sixModeRpcQueue";
    public static final String EXCHANGE = "sixModeExchange";
    public static final String RPC_ROUTING_KEY = "sixModeRpcRoutingKey";
    public static final String REPLY_ROUTING_KEY = "sixModeReplyRoutingKey";

    /** Topic exchange shared by the request and the reply queue. */
    @Bean
    public Exchange rpcExchange() {
        return ExchangeBuilder
                .topicExchange(EXCHANGE)
                .build();
    }

    /** Durable queue that carries RPC requests. */
    @Bean
    public Queue rpcQueue() {
        return QueueBuilder
                .durable(RPC_QUEUE)
                .build();
    }

    /** Durable queue that carries RPC replies. */
    @Bean
    public Queue replyQueue() {
        return QueueBuilder
                .durable(REPLY_QUEUE)
                .build();
    }

    /** Binds the request queue to the exchange under {@link #RPC_ROUTING_KEY}. */
    @Bean
    public Binding rpcBinding() {
        final Queue requestQueue = rpcQueue();
        final Exchange exchange = rpcExchange();
        return BindingBuilder.bind(requestQueue).to(exchange).with(RPC_ROUTING_KEY).noargs();
    }

    /** Binds the reply queue to the exchange under {@link #REPLY_ROUTING_KEY}. */
    @Bean
    public Binding replyBinding() {
        final Queue responseQueue = replyQueue();
        final Exchange exchange = rpcExchange();
        return BindingBuilder.bind(responseQueue).to(exchange).with(REPLY_ROUTING_KEY).noargs();
    }
}
- 监听类
用于监听客户端发送过来的消息
/**
 * @Description Server side of the RPC mode: consumes requests from the RPC queue and
 *              publishes a reply that carries the request's correlation id.
 * @Author WuYiLong
 * @Date 2024/8/16 14:31
 */
@Slf4j
@Service
public class SixModeExample {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    /**
     * Logs the request, acknowledges it and sends the reply to the reply routing key.
     *
     * <p>The reply is published with a one-way {@code send}. {@code sendAndReceive} is a
     * request/reply call itself: it would keep this listener thread waiting for an answer
     * to the reply and would put its own correlation id on the outgoing message, so the
     * client could no longer match the reply to its pending request reliably.
     *
     * @param channel channel the request was delivered on; used for the acknowledgement
     * @param message the RPC request
     * @throws IOException if the acknowledgement cannot be sent
     */
    @RabbitListener(queues = RabbitRpcConfig.RPC_QUEUE)
    public void consumer(Channel channel, Message message) throws IOException {
        log.info("consumer->{}", StringUtils.toEncodedString(message.getBody(), StandardCharsets.UTF_8));
        channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);

        String correlationId = message.getMessageProperties().getCorrelationId();
        Message reply = MessageBuilder.withBody("已收到消息了".getBytes(StandardCharsets.UTF_8)).build();
        // The client matches replies to requests through this property.
        reply.getMessageProperties().setCorrelationId(correlationId);

        // The correlation data is still passed along, as in the original call.
        CorrelationData correlationData = new CorrelationData(correlationId);
        rabbitTemplate.send(RabbitRpcConfig.EXCHANGE, RabbitRpcConfig.REPLY_ROUTING_KEY, reply, correlationData);
    }
}
客户端
- 配置类
/**
 * @Description Client-side RPC configuration: a {@link RabbitTemplate} with a fixed reply
 *              queue and the listener container that feeds replies back to it.
 * @Author WuYiLong
 * @Date 2024/8/19 17:20
 */
@Configuration
public class RabbitClientConfig {

    /**
     * Template used for request/reply calls.
     *
     * @param connectionFactory connection factory the template publishes through
     * @return template with JSON conversion, reply address and a 60000 ms reply timeout
     */
    @Bean
    public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
        final RabbitTemplate template = new RabbitTemplate();
        template.setConnectionFactory(connectionFactory);
        // JSON converter so that objects can be sent as message payloads.
        template.setMessageConverter(new Jackson2JsonMessageConverter());
        // Replies to sendAndReceive calls are expected on this queue.
        template.setReplyAddress(RabbitRpcConfig.REPLY_QUEUE);
        template.setReplyTimeout(60000);
        return template;
    }

    /**
     * Listener container on the reply queue that hands every reply to the template.
     *
     * @param connectionFactory connection factory the container consumes through
     * @return the reply listener container
     */
    @Bean
    public SimpleMessageListenerContainer simpleMessageListenerContainer(ConnectionFactory connectionFactory) {
        final SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
        container.setConnectionFactory(connectionFactory);
        // Queue to listen on.
        container.setQueueNames(RabbitRpcConfig.REPLY_QUEUE);
        // The template itself acts as the message listener for replies.
        container.setMessageListener(rabbitTemplate(connectionFactory));
        return container;
    }
}
- 客户端发送消息
/**
 * @Description REST entry point of the RPC client: sends a request message and prints
 *              the reply.
 * @Author WuYiLong
 * @Date 2024/8/19 17:30
 */
@Api(tags = "rpc-客户端api")
@RestController
@RequestMapping(value = "rpcClient")
public class RpcClientController {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    /**
     * Sends {@code msg} as a UTF-8 encoded RPC request and prints the reply body.
     * Nothing is printed when {@code sendAndReceive} returns {@code null}.
     *
     * @param msg text to send to the RPC server
     */
    @ApiOperation(value = "客户端发送消息")
    @GetMapping(value = "sendMessage")
    public void sendMessage(String msg) {
        Message message = MessageBuilder.withBody(msg.getBytes(StandardCharsets.UTF_8)).build();
        Message result = rabbitTemplate.sendAndReceive(RabbitRpcConfig.EXCHANGE, RabbitRpcConfig.RPC_ROUTING_KEY, message);
        if (result != null) {
            String correlationId = message.getMessageProperties().getCorrelationId();
            String returnCorrelationId = result.getMessageProperties().getHeader("spring_returned_message_correlation");
            // Null-safe comparison: a missing correlation id must not raise a NullPointerException.
            if (correlationId != null && correlationId.equals(returnCorrelationId)) {
                System.out.println("已确认该消息由该客户端发送");
            }
            // Decode explicitly as UTF-8 instead of relying on the platform default charset.
            System.out.println(new String(result.getBody(), StandardCharsets.UTF_8));
        }
    }
}
死信队列(也可以当延迟队列使用)
- 队列和交换机的设置
/* ------------------------------ dead-letter queue test ------------------------------ */

/**
 * Durable ordinary queue with a TTL of 5000; it names the dead-letter exchange and
 * dead-letter routing key declared in {@code DeadLetterQueueExample}.
 */
@Bean
public Queue ordinaryQueue() {
    return QueueBuilder
            .durable(DeadLetterQueueExample.ORDINARY_QUEUE)
            .ttl(5000)
            .deadLetterExchange(DeadLetterQueueExample.DEAD_LETTER_EXCHANGE)
            .deadLetterRoutingKey(DeadLetterQueueExample.DEAD_LETTER_ROUTING_KEY)
            .build();
}
/** Durable direct exchange that the ordinary queue is bound to. */
@Bean
public Exchange ordinaryExchange() {
    return ExchangeBuilder
            .directExchange(DeadLetterQueueExample.ORDINARY_EXCHANGE)
            .durable(true)
            .build();
}
/** Binds the ordinary queue to the ordinary exchange under the ordinary routing key. */
@Bean
public Binding ordinaryBinding() {
    final Queue queue = ordinaryQueue();
    final Exchange exchange = ordinaryExchange();
    return BindingBuilder.bind(queue)
            .to(exchange)
            .with(DeadLetterQueueExample.ORDINARY_ROUTING_KEY)
            .noargs();
}
/** Durable queue bound to the dead-letter exchange. */
@Bean
public Queue deadLetterQueue() {
    return QueueBuilder
            .durable(DeadLetterQueueExample.DEAD_LETTER_QUEUE)
            .build();
}
/** Durable direct exchange named as the ordinary queue's dead-letter exchange. */
@Bean
public Exchange deadLetterExchange() {
    return ExchangeBuilder
            .directExchange(DeadLetterQueueExample.DEAD_LETTER_EXCHANGE)
            .durable(true)
            .build();
}
/** Binds the dead-letter queue to the dead-letter exchange under the dead-letter routing key. */
@Bean
public Binding deadLetterBinding() {
    final Queue queue = deadLetterQueue();
    final Exchange exchange = deadLetterExchange();
    return BindingBuilder.bind(queue)
            .to(exchange)
            .with(DeadLetterQueueExample.DEAD_LETTER_ROUTING_KEY)
            .noargs();
}
- 监听类
只需要监听deadLetterQueue队列的消息
/**
 * @Description Dead-letter queue example: holds the queue/exchange/routing-key names and
 *              the listener that consumes from the dead-letter queue.
 * @Author WuYiLong
 * @Date 2024/8/20 9:41
 */
@Slf4j
@Service
public class DeadLetterQueueExample {

    public static final String ORDINARY_QUEUE = "ordinaryQueue";
    public static final String ORDINARY_EXCHANGE = "ordinaryExchange";
    public static final String ORDINARY_ROUTING_KEY = "ordinaryRoutingKey";
    public static final String DEAD_LETTER_QUEUE = "deadLetterQueue";
    public static final String DEAD_LETTER_EXCHANGE = "deadLetterExchange";
    public static final String DEAD_LETTER_ROUTING_KEY = "deadLetterRoutingKey";

    /**
     * Logs a message received from the dead-letter queue and acknowledges it.
     *
     * @param channel channel the message was delivered on; used for the acknowledgement
     * @param message the dead-lettered message
     * @throws IOException if the acknowledgement cannot be sent
     */
    @RabbitListener(queues = DEAD_LETTER_QUEUE)
    public void deadConsumer(Channel channel, Message message) throws IOException {
        // Decode explicitly as UTF-8 rather than with the platform default charset, which
        // would garble non-ASCII payloads on JVMs whose default is not UTF-8.
        String body = new String(message.getBody(), java.nio.charset.StandardCharsets.UTF_8);
        log.info("deadConsumer message: {}", body);
        channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
    }
}
- 测试类
/**
 * @Description Publishes ten messages to the ordinary exchange and then waits, so that the
 *              dead-letter flow can be observed.
 * @Author WuYiLong
 * @Date 2024/8/20 10:46
 */
@RunWith(SpringRunner.class)
@SpringBootTest
public class DeadLetterQueueExampleTest {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    /**
     * Sends ten UTF-8 encoded messages and keeps the test alive for 20 seconds.
     *
     * @throws InterruptedException if the final wait is interrupted
     */
    @Test
    public void testDeadLetterQueueExample() throws InterruptedException {
        final String exchange = DeadLetterQueueExample.ORDINARY_EXCHANGE;
        final String routingKey = DeadLetterQueueExample.ORDINARY_ROUTING_KEY;

        for (int index = 0; index < 10; index++) {
            String text = "java是一门计算机语言" + index;
            // Alternative: rabbitTemplate.convertAndSend(exchange, routingKey, text);
            byte[] payload = text.getBytes(StandardCharsets.UTF_8);
            Message outgoing = new Message(payload);
            // Alternative to the queue-level .ttl(5000): set a per-message expiration with
            // outgoing.getMessageProperties().setExpiration("5000");
            rabbitTemplate.send(exchange, routingKey, outgoing);
        }

        // Keep the test alive after publishing.
        TimeUnit.SECONDS.sleep(20);
    }
}