项目地址
spring-boot-aliRocketMQ-starter
application.yml
mq-config:
  producerId: PID_*
  consumerId: CID_*
  accessKey: *
  secretKey: *
  onsAddr: *
  topic: *
添加启动类用于初始化消费者
public class RocketMQRunner implements CommandLineRunner {
    private static final Logger logger = LoggerFactory.getLogger(RocketMQRunner.class);
    
    private MQConfig mqConfig;
    
    ("orderConsumer")
    private OrderConsumer orderConsumer;
    /**消息监听器**/
    
    private ConsumerHandler consumerHandler;
    /**
     * 初始化订阅者,生产者信息,启动
     * @param args
     * @throws Exception
     */
    
    public void run(String... args) throws Exception {
        orderConsumer.subscribe(mqConfig.getTopic(), mqConfig.getTag(), consumerHandler);
        orderConsumer.start();
    }
}
生产消息
/**
 * Publishes platform messages ({@code SysMessage}) to the configured topic.
 *
 * <p>NOTE(review): the annotations of this snippet appear to have been stripped during
 * extraction (blank lines precede both fields). {@code @Component} and {@code @Autowired} are
 * restored so the collaborators are actually injected — confirm against the project sources.
 */
@Component
public class SysMessageProducer {
    private static final Logger logger = LoggerFactory.getLogger(SysMessageProducer.class);

    /** Builds and sends the queue message. */
    @Autowired
    private MQHelper<SysMessage> mqHelper;

    /** Supplies the topic the message is published to. */
    @Autowired
    private MQConfig mqConfig;

    /**
     * Pushes a platform message to the queue; intended to be called after the message has been
     * created in the message center.
     *
     * @param sysMessage the message to push
     * @throws RestInternalServerErrorException with {@code SYS_NEWS_SELECT_FAIL} when
     *         {@code AssertValue.isNotNull(sysMessage)} is false, or with
     *         {@code SYS_NEWS_PUSH_FAIL} when building or sending the message fails
     */
    public void PushMessageWhenCreate(SysMessage sysMessage) {
        if (!AssertValue.isNotNull(sysMessage)) {
            throw new RestInternalServerErrorException(ExceptionEnumeration.SYS_NEWS_SELECT_FAIL, "找不到该平台消息");
        }

        try {
            // Describe the message to be queued.
            ProducerMessage<SysMessage> producerMessage = new ProducerMessage<SysMessage>()
                    .setTopic(mqConfig.getTopic())
                    .setTags("middle")
                    .setName(MQHandlerType.PUSH_APPMESSAGE_NEWS.getTypeName())
                    .setKey(MQHandlerType.PUSH_APPMESSAGE_NEWS.toString())
                    .setBody(sysMessage)
                    .setShardingKey(String.valueOf(sysMessage.getId()))
                    .setState("none");
            // Send immediately: the start-delivery time is the current time.
            producerMessage.setType(RocketMQServiceConstant.SYNCHRONOUS_ORDER_MESSAGE);
            producerMessage.setStartDeliveryTime(new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()));

            // Build the message, then send it.
            Message message = mqHelper.generateMessage(producerMessage);
            SendResult sendResult = mqHelper.sendMessage(message, producerMessage);
            logger.info("{} 发送成功! Topic:{} msgId: {}", new Date(), mqConfig.getTopic(), sendResult.getMessageId());
        } catch (Exception e) {
            // Pass the throwable itself: the previous call passed e.getMessage() to a format
            // string without a placeholder, so the failure reason never reached the log.
            // NOTE(review): the message announces retries, but no retry happens in this method.
            logger.error("消息生产失败,将进行两次重试", e);
            logger.info("发送失败");
            throw new RestInternalServerErrorException(ExceptionEnumeration.SYS_NEWS_PUSH_FAIL, "平台消息推送失败");
        }
    }
}
监听器消费消息
public class ConsumerHandler implements MessageOrderListener {
    private static final Logger logger = LoggerFactory.getLogger(ConsumerHandler.class);
    /**
     * 消费消息 handler
     *
     * @param message
     * @param context
     * @return
     */
    
    public OrderAction consume(Message message, ConsumeOrderContext context) {
    }
}
