springboot集成rabbitmq

WuYiLong · 原创 · 大约 5 分钟 · 代码编程 · springboot · rabbitmq

开发环境

  • jdk:1.8
  • spring-rabbit:2.3.9
  • springboot:2.4.8

项目目录

配置文件

# Application configuration: JDBC datasource, RabbitMQ connection/listener settings, JPA and HTTP port.
spring:
  datasource:
    type: com.zaxxer.hikari.HikariDataSource
    driver-class-name: com.mysql.cj.jdbc.Driver
    url: jdbc:mysql://localhost:3306/library?useUnicode=true&characterEncoding=UTF-8&autoReconnect=true&useSSL=false&zeroDateTimeBehavior=convertToNull&serverTimezone=GMT%2B8&rewriteBatchedStatements=true
    username: root
    password: 123456
  rabbitmq:
    host: localhost
    port: 5675
    publisher-confirm-type: correlated # enable publisher confirms (correlated mode)
    publisher-returns: true # enable publisher returns
    listener:
      simple:
        acknowledge-mode: manual # listeners acknowledge messages manually
        concurrency: 10 # number of concurrent consumers
        max-concurrency: 20 # maximum number of concurrent consumers


  jpa:
    hibernate:
      ddl-auto: update
    properties:
      hibernate:
        dialect: org.hibernate.dialect.MySQL8Dialect

server:
  port: 8112

第一种模式

/**
 * @Description 1. "Hello World!" - one consumer listening on one queue
 * @Author WuYiLong
 * @Date 2024/8/14 15:34
 */
@Slf4j
@Service
public class FirstModeExample {

    public static final String QUEUE = "firstModeQueue";
    public static final String EXCHANGE = "firstModeExchange";
    public static final String ROUTE_KEY = "firstModeRouteKey";

    /**
     * Logs the received payload, then acknowledges that single delivery.
     *
     * @param data    message payload as text
     * @param channel channel the delivery arrived on; used for the manual ack
     * @param message raw message, read only for its delivery tag
     * @throws IOException if the acknowledgement cannot be sent
     */
    @RabbitListener(bindings = {
            @QueueBinding(
                    value = @Queue(name = QUEUE),
                    exchange = @Exchange(name = EXCHANGE),
                    key = {ROUTE_KEY})
    })
    public void consumer(String data, Channel channel, Message message) throws IOException {
        log.info("第一种模式:生产者-队列-消费者,数据:{}",data);
        final long deliveryTag = message.getMessageProperties().getDeliveryTag();
        // multiple=false: acknowledge only this delivery
        channel.basicAck(deliveryTag, false);
    }
}

第二种模式

/**
 * @Description 2. Work Queues - two listeners declared on the same queue
 * @Author WuYiLong
 * @Date 2024/8/14 16:12
 */
@Slf4j
@Service
public class SecondModeExample {

    public static final String QUEUE = "secondModeQueue";
    public static final String EXCHANGE = "secondModeExchange";
    public static final String ROUTE_KEY = "secondModeRouteKey";

    /**
     * First listener on {@link #QUEUE}: logs the payload and acknowledges the delivery.
     *
     * @throws IOException if the acknowledgement cannot be sent
     */
    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(name = QUEUE),
            exchange = @Exchange(name = EXCHANGE),
            key = ROUTE_KEY))
    public void consumer1(String data, Channel channel, Message message) throws IOException {
        log.info("消费者1正在消费:{}",data);
        acknowledge(channel, message);
    }

    /**
     * Second listener on {@link #QUEUE}: logs the payload and acknowledges the delivery.
     *
     * @throws IOException if the acknowledgement cannot be sent
     */
    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(name = QUEUE),
            exchange = @Exchange(name = EXCHANGE),
            key = ROUTE_KEY))
    public void consumer2(String data, Channel channel, Message message) throws IOException {
        log.info("消费者2正在消费:{}",data);
        acknowledge(channel, message);
    }

    /** Acknowledges exactly one delivery (multiple=false), identified by the message's delivery tag. */
    private void acknowledge(Channel channel, Message message) throws IOException {
        long deliveryTag = message.getMessageProperties().getDeliveryTag();
        channel.basicAck(deliveryTag, false);
    }

}

第三种模式

/**
 * @Description 3. Publish/Subscribe - two queues bound to one fanout exchange
 * @Author WuYiLong
 * @Date 2024/8/14 16:37
 */
@Slf4j
@Service
public class ThirdModeExample {

    public static final String QUEUE1 = "thirdModeQueue1";
    public static final String QUEUE2 = "thirdModeQueue2";
    public static final String EXCHANGE = "thirdModeExchange";

    /**
     * Listener for {@link #QUEUE1}: logs the payload, then acknowledges the single delivery.
     *
     * @throws Exception if the acknowledgement fails
     */
    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(name = QUEUE1),
            exchange = @Exchange(name = EXCHANGE, type = ExchangeTypes.FANOUT)))
    public void consumer1(String data, Channel channel, Message message) throws Exception {
        log.info("consumer1: {}", data);
        MessageProperties properties = message.getMessageProperties();
        channel.basicAck(properties.getDeliveryTag(), false);
    }

    /**
     * Listener for {@link #QUEUE2}: logs the payload, then acknowledges the single delivery.
     *
     * @throws Exception if the acknowledgement fails
     */
    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(name = QUEUE2),
            exchange = @Exchange(name = EXCHANGE, type = ExchangeTypes.FANOUT)))
    public void consumer2(String data, Channel channel, Message message) throws Exception {
        log.info("consumer2: {}", data);
        MessageProperties properties = message.getMessageProperties();
        channel.basicAck(properties.getDeliveryTag(), false);
    }

}

第四种模式

/**
 * @Description 4. Routing - queues bound to one exchange with different routing-key sets
 * @Author WuYiLong
 * @Date 2024/8/15 14:05
 */
@Slf4j
@Service
public class FourModeExample {

    public static final String QUEUE1 = "fourModeQueue1";
    public static final String QUEUE2 = "fourModeQueue2";
    public static final String EXCHANGE = "fourModeExchange";
    public static final String ROUTING_KEY1 = "fourModeRoutingKey1";
    public static final String ROUTING_KEY2 = "fourModeRoutingKey2";
    public static final String ROUTING_KEY3 = "fourModeRoutingKey3";

    /**
     * Listener for {@link #QUEUE1}, which is bound with all three routing keys.
     *
     * @throws Exception if the acknowledgement fails
     */
    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(name = QUEUE1),
            exchange = @Exchange(value = EXCHANGE),
            key = {ROUTING_KEY1, ROUTING_KEY2, ROUTING_KEY3}))
    public void consumer1(String data, Channel channel, Message message) throws Exception {
        log.info("consumer1->{}", data);
        final long tag = message.getMessageProperties().getDeliveryTag();
        channel.basicAck(tag, false);
    }

    /**
     * Listener for {@link #QUEUE2}, which is bound with {@link #ROUTING_KEY1} only.
     *
     * @throws Exception if the acknowledgement fails
     */
    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(name = QUEUE2),
            exchange = @Exchange(value = EXCHANGE),
            key = {ROUTING_KEY1}))
    public void consumer2(String data, Channel channel, Message message) throws Exception {
        log.info("consumer2->{}", data);
        final long tag = message.getMessageProperties().getDeliveryTag();
        channel.basicAck(tag, false);
    }

}

第五种模式

/**
 * @Description 5. Topics - queues bound to a topic exchange with wildcard routing patterns
 * @Author WuYiLong
 * @Date 2024/8/16 10:39
 */
@Slf4j
@Service
public class FiveModeExample {

    public static final String QUEUE1 = "queue1";
    public static final String QUEUE2 = "queue2";
    public static final String EXCHANGE = "exchange";
    public static final String ROUTING_KEY1 = "*.a.*";
    public static final String ROUTING_KEY2 = "*.*.b";
    public static final String ROUTING_KEY3 = "c.#";

    /**
     * Listener for {@link #QUEUE1}, bound with the pattern {@link #ROUTING_KEY1}.
     *
     * @throws Exception if the acknowledgement fails
     */
    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(name = QUEUE1),
            exchange = @Exchange(name = EXCHANGE, type = ExchangeTypes.TOPIC),
            key = ROUTING_KEY1))
    public void consumer1(String data, Channel channel, Message message) throws Exception {
        log.info("consumer1: {}", data);
        confirmDelivery(channel, message);
    }

    /**
     * Listener for {@link #QUEUE2}, bound with the patterns {@link #ROUTING_KEY2} and {@link #ROUTING_KEY3}.
     *
     * @throws Exception if the acknowledgement fails
     */
    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(name = QUEUE2),
            exchange = @Exchange(name = EXCHANGE, type = ExchangeTypes.TOPIC),
            key = {ROUTING_KEY2, ROUTING_KEY3}))
    public void consumer2(String data, Channel channel, Message message) throws Exception {
        log.info("consumer2: {}", data);
        confirmDelivery(channel, message);
    }

    /** Sends a manual ack for this one delivery (multiple=false). */
    private void confirmDelivery(Channel channel, Message message) throws Exception {
        long tag = message.getMessageProperties().getDeliveryTag();
        channel.basicAck(tag, false);
    }

}

第六种模式

注意:客户端和服务器端不能写在同一个服务里,必须拆分成两个独立的服务,

否则会造成消息不断循环。

服务器端

  • 配置类
/**
 * Server-side RabbitTemplate with confirm/returns callbacks and JSON message conversion.
 *
 * @param connectionFactory connection factory the template publishes through
 * @return the configured template
 */
@Bean
public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
    RabbitTemplate template = new RabbitTemplate();
    template.setConnectionFactory(connectionFactory);
    // JSON converter so that objects can be sent as message payloads
    template.setMessageConverter(new Jackson2JsonMessageConverter());
    template.setConfirmCallback(new DefaultConfirmCallback());
    // mandatory must be true, otherwise the returns callback is never triggered
    template.setMandatory(true);
    template.setReturnsCallback(new DefaultReturnsCallback());
    // Author's note: template.setChannelTransacted(true) would enable transactions,
    // but it has to stay off when the RPC mode is used.
    return template;
}
  • 队列和交换机的绑定
/**
 * @Description Declares the RPC request queue, the reply queue, and their bindings to one topic exchange
 * @Author WuYiLong
 * @Date 2024/8/19 17:24
 */
@Configuration
public class RabbitRpcConfig {

    public static final String REPLY_QUEUE = "sixModeReplyQueue";
    public static final String RPC_QUEUE = "sixModeRpcQueue";
    public static final String EXCHANGE = "sixModeExchange";
    public static final String RPC_ROUTING_KEY = "sixModeRpcRoutingKey";
    public static final String REPLY_ROUTING_KEY = "sixModeReplyRoutingKey";

    /** Durable queue named {@link #REPLY_QUEUE}. */
    @Bean
    public Queue replyQueue() {
        return QueueBuilder
                .durable(REPLY_QUEUE)
                .build();
    }

    /** Topic exchange named {@link #EXCHANGE}, shared by both bindings below. */
    @Bean
    public Exchange rpcExchange() {
        return ExchangeBuilder
                .topicExchange(EXCHANGE)
                .build();
    }

    /** Binds the reply queue to the exchange under {@link #REPLY_ROUTING_KEY}. */
    @Bean
    public Binding replyBinding() {
        Queue queue = replyQueue();
        Exchange exchange = rpcExchange();
        return BindingBuilder.bind(queue)
                .to(exchange)
                .with(REPLY_ROUTING_KEY)
                .noargs();
    }

    /** Durable queue named {@link #RPC_QUEUE}. */
    @Bean
    public Queue rpcQueue() {
        return QueueBuilder
                .durable(RPC_QUEUE)
                .build();
    }

    /** Binds the RPC queue to the exchange under {@link #RPC_ROUTING_KEY}. */
    @Bean
    public Binding rpcBinding() {
        Queue queue = rpcQueue();
        Exchange exchange = rpcExchange();
        return BindingBuilder.bind(queue)
                .to(exchange)
                .with(RPC_ROUTING_KEY)
                .noargs();
    }


}

  • 监听类

用于监听客户端发送过来的消息

/**
 * @Description RPC mode, server side: consumes requests from the RPC queue and publishes a reply
 * @Author WuYiLong
 * @Date 2024/8/16 14:31
 */
@Slf4j
@Service
public class SixModeExample {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    /**
     * Handles one RPC request: logs the body, acknowledges the delivery, and publishes a
     * fixed reply to the reply routing key.
     *
     * <p>The reply is published with {@code send(...)}, not {@code sendAndReceive(...)}:
     * the latter is itself a request/reply call and would keep this listener thread
     * waiting for an answer to the reply. The request's correlation id is copied onto
     * the reply so the caller can match the reply to its request.
     *
     * @param channel channel the request arrived on; used for the manual ack
     * @param message the incoming request
     * @throws IOException if the acknowledgement cannot be sent
     */
    @RabbitListener(queues = RabbitRpcConfig.RPC_QUEUE)
    public void consumer(Channel channel,Message message) throws IOException {
        log.info("consumer->{}", StringUtils.toEncodedString(message.getBody(), StandardCharsets.UTF_8));
        channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);

        String correlationId = message.getMessageProperties().getCorrelationId();
        Message reply = MessageBuilder.withBody("已收到消息了".getBytes(StandardCharsets.UTF_8))
                .setCorrelationId(correlationId)
                .build();
        CorrelationData correlationData = new CorrelationData(correlationId);
        // Fire-and-forget publish of the reply; no response to the reply is expected.
        rabbitTemplate.send(RabbitRpcConfig.EXCHANGE, RabbitRpcConfig.REPLY_ROUTING_KEY, reply, correlationData);
    }
}

客户端

  • 配置类
/**
 * @Description RPC client configuration: a reply-aware RabbitTemplate plus the container that listens on the reply queue
 * @Author WuYiLong
 * @Date 2024/8/19 17:20
 */
@Configuration
public class RabbitClientConfig {

    /**
     * Listener container on the reply queue that hands incoming messages to the RabbitTemplate.
     *
     * @param connectionFactory connection factory used by the container
     * @return the configured container
     */
    @Bean
    public SimpleMessageListenerContainer simpleMessageListenerContainer(ConnectionFactory connectionFactory) {
        SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
        container.setConnectionFactory(connectionFactory);
        // queue to listen on
        container.setQueueNames(RabbitRpcConfig.REPLY_QUEUE);
        container.setMessageListener(rabbitTemplate(connectionFactory));
        return container;
    }

    /**
     * RabbitTemplate used to send requests and receive replies via the reply queue.
     *
     * @param connectionFactory connection factory used by the template
     * @return the configured template
     */
    @Bean
    public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
        RabbitTemplate template = new RabbitTemplate();
        template.setConnectionFactory(connectionFactory);
        // reply address: the queue on which replies are expected
        template.setReplyAddress(RabbitRpcConfig.REPLY_QUEUE);
        template.setReplyTimeout(60000);
        // JSON converter so that objects can be sent as message payloads
        template.setMessageConverter(new Jackson2JsonMessageConverter());
        return template;
    }
}

  • 客户端发送消息
/**
 * @Description REST endpoint that sends an RPC request over RabbitMQ and prints the reply
 * @Author WuYiLong
 * @Date 2024/8/19 17:30
 */
@Api(tags = "rpc-客户端api")
@RestController
@RequestMapping(value = "rpcClient")
public class RpcClientController {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    /**
     * Sends {@code msg} as a UTF-8 encoded request and prints the reply body, if any.
     *
     * @param msg text to send as the request body
     */
    @ApiOperation(value = "客户端发送消息")
    @GetMapping(value = "sendMessage")
    public void sendMessage(String msg) {
        Message message = MessageBuilder.withBody(msg.getBytes(StandardCharsets.UTF_8)).build();
        Message result = rabbitTemplate.sendAndReceive(RabbitRpcConfig.EXCHANGE, RabbitRpcConfig.RPC_ROUTING_KEY, message);
        if (result == null) {
            // no reply was received; nothing to print
            return;
        }

        String correlationId = message.getMessageProperties().getCorrelationId();
        String returnCorrelationId = result.getMessageProperties().getHeader("spring_returned_message_correlation");
        // Null-safe comparison: either side may be absent.
        if (correlationId != null && correlationId.equals(returnCorrelationId)) {
            System.out.println("已确认该消息由该客户端发送");
        }

        // Decode explicitly as UTF-8 (the encoding used for the request body above)
        // instead of relying on the platform default charset.
        System.out.println(new String(result.getBody(), StandardCharsets.UTF_8));
    }
}

死信队列(也可以当延迟队列使用)

  • 队列和交换机的设置
  /********************************测试死信队列****************************************/

    /**
     * Durable ordinary queue with a queue-level TTL of 5000 and a dead-letter
     * exchange/routing key configured on it.
     */
    @Bean
    public Queue ordinaryQueue() {
        return QueueBuilder
                .durable(DeadLetterQueueExample.ORDINARY_QUEUE)
                .ttl(5000)
                .deadLetterExchange(DeadLetterQueueExample.DEAD_LETTER_EXCHANGE)
                .deadLetterRoutingKey(DeadLetterQueueExample.DEAD_LETTER_ROUTING_KEY)
                .build();
    }

    /** Durable direct exchange that the ordinary queue is bound to. */
    @Bean
    public Exchange ordinaryExchange() {
        return ExchangeBuilder
                .directExchange(DeadLetterQueueExample.ORDINARY_EXCHANGE)
                .durable(true)
                .build();
    }

    /** Binds the ordinary queue to the ordinary exchange under the ordinary routing key. */
    @Bean
    public Binding ordinaryBinding() {
        Queue queue = ordinaryQueue();
        Exchange exchange = ordinaryExchange();
        return BindingBuilder.bind(queue)
                .to(exchange)
                .with(DeadLetterQueueExample.ORDINARY_ROUTING_KEY)
                .noargs();
    }

    /** Durable queue that holds dead-lettered messages. */
    @Bean
    public Queue deadLetterQueue() {
        return QueueBuilder
                .durable(DeadLetterQueueExample.DEAD_LETTER_QUEUE)
                .build();
    }

    /** Durable direct exchange used as the dead-letter exchange. */
    @Bean
    public Exchange deadLetterExchange() {
        return ExchangeBuilder
                .directExchange(DeadLetterQueueExample.DEAD_LETTER_EXCHANGE)
                .durable(true)
                .build();
    }

    /** Binds the dead-letter queue to the dead-letter exchange under the dead-letter routing key. */
    @Bean
    public Binding deadLetterBinding() {
        Queue queue = deadLetterQueue();
        Exchange exchange = deadLetterExchange();
        return BindingBuilder.bind(queue)
                .to(exchange)
                .with(DeadLetterQueueExample.DEAD_LETTER_ROUTING_KEY)
                .noargs();
    }
  • 监听类

只需要监听deadLetterQueue队列的消息

/**
 * @Description Dead-letter queue example: consumes only from the dead-letter queue
 * @Author WuYiLong
 * @Date 2024/8/20 9:41
 */
@Slf4j
@Service
public class DeadLetterQueueExample {

    public static final String ORDINARY_QUEUE = "ordinaryQueue";
    public static final String ORDINARY_EXCHANGE = "ordinaryExchange";
    public static final String ORDINARY_ROUTING_KEY = "ordinaryRoutingKey";
    public static final String DEAD_LETTER_QUEUE = "deadLetterQueue";
    public static final String DEAD_LETTER_EXCHANGE = "deadLetterExchange";
    public static final String DEAD_LETTER_ROUTING_KEY = "deadLetterRoutingKey";

    /**
     * Logs the body of a message taken from the dead-letter queue and acknowledges it.
     *
     * @param channel channel the delivery arrived on; used for the manual ack
     * @param message the dead-lettered message
     * @throws IOException if the acknowledgement cannot be sent
     */
    @RabbitListener(queues = DEAD_LETTER_QUEUE)
    public void deadConsumer(Channel channel, Message message) throws IOException {
        // Decode explicitly as UTF-8 rather than with the platform default charset, so
        // non-ASCII payloads are logged correctly on any host. The fully qualified name
        // avoids requiring a new import in this file.
        String body = new String(message.getBody(), java.nio.charset.StandardCharsets.UTF_8);
        log.info("deadConsumer message: {}", body);
        channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
    }

}
  • 测试类
/**
 * @Description Publishes test messages to the ordinary exchange for the dead-letter example
 * @Author WuYiLong
 * @Date 2024/8/20 10:46
 */
@RunWith(SpringRunner.class)
@SpringBootTest
public class DeadLetterQueueExampleTest {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    /**
     * Sends ten UTF-8 encoded messages to the ordinary exchange, then sleeps for 20 seconds
     * before the test method returns.
     *
     * @throws InterruptedException if the sleep is interrupted
     */
    @Test
    public void testDeadLetterQueueExample() throws InterruptedException {
        int index = 0;
        while (index < 10) {
            String msg = "java是一门计算机语言"+index;
            // Alternative from the author: rabbitTemplate.convertAndSend(exchange, routingKey, msg)

            byte[] payload = msg.getBytes(StandardCharsets.UTF_8);
            Message message = new Message(payload);
            // Author's note: if the message itself carries an expiration, e.g.
            // message.getMessageProperties().setExpiration("5000"), the queue no longer needs .ttl(5000).
            rabbitTemplate.send(
                    DeadLetterQueueExample.ORDINARY_EXCHANGE,
                    DeadLetterQueueExample.ORDINARY_ROUTING_KEY,
                    message);
            index++;
        }
        TimeUnit.SECONDS.sleep(20);

    }
}

仓库地址

GitHub(在新窗口打开)

上次编辑于:
贡献者: wuyilong