RocketMQ——服务启动时初始化自定义RocketMQ的生产者Producer
需求
- 构造
RocketMQ
生产者。 - 在服务启动时构造生产者,在服务关闭时销毁生产者。
代码模板
抽象类
@Slf4j
public abstract class AbstractMqProducer {/* 原生默认mq生产者*/protected DefaultMQProducer producer;/* 启动时,构造 @throws MQClientException*/@PostConstructpublic abstract void start() throws MQClientException;/* 关闭时,销毁*/@PreDestroypublic void shutdown() {log.info("AbstractMqProducer shutdown...");producer.shutdown();}}
生产者实现类
@Component
@Configuration
@Slf4j
@RequiredArgsConstructor
public class DemoRocketMqProducer extends AbstractMqProducer {/* 参数配置*/@Value("${rocketmq.producer.group}")private String producerGroup;@Value("${rocketmq.name-server}")private String nameServer;@Value("${rocketmq.producer.send-message-timeout}")private String sendMessageTimeout;@Value("${rocketmq.producer.compress-message-body-threshold}")private String compressMessageBodyThreshold;@Value("${rocketmq.producer.max-message-size}")private String maxMessageSize;@Value("${rocketmq.producer.retry-times-when-send-failed}")private String retryTimesWhenSendFailed;@Value("${rocketmq.producer.retry-times-when-send-async-failed}")private String retryTimesWhenSendAsyncFailed;@Value("${rocketmq.producer.retry-next-server}")private String retryNextServer;/* 启动producer @throws MQClientException*/@Overridepublic void start() throws MQClientException {//判空if (ObjectUtil.isNull(producer)) {log.info("init DemoRocketMqProducer begin... ...");//构建producerproducer = new DefaultMQProducer(producerGroup);//name server地址producer.setNamesrvAddr(nameServer);//客户端实例名称producer.setInstanceName(UtilAll.getPid() + "#" + System.nanoTime());//发送消息超时时间,单位毫秒,默认3000producer.setSendMsgTimeout(sendMessageTimeout);//body超过多大开始压缩,单位字节,默认4096Bproducer.setCompressMsgBodyOverHowmuch(compressMessageBodyThreshold);//客户端限制的消息大小,单位字节,默认4194304producer.setMaxMessageSize(maxMessageSize);//如果消息发送失败,最大重试次数,默认2producer.setRetryTimesWhenSendFailed(retryTimesWhenSendFailed);//如果消息异步发送失败,最大重试次数,默认2producer.setRetryTimesWhenSendAsyncFailed(retryTimesWhenSendAsyncFailed);//如果发送消息返回sendResult,但sendStatus != SEND_OK,是否重试其他broker发送,默认falseproducer.setRetryAnotherBrokerWhenNotStoreOK(retryNextServer);}//启动producerproducer.start();log.info("DemoRocketMqProducer is starting...");}/* 发送消息 @param topic* @param tag* @param key* @param dto* @param <T>* @return*/public <T> boolean send(String topic, String tag, String key, T dto, String hashKey) {try {//转换dto为String类型String body = JSON.toJSONString(dto);//构建messageMessage message = new Message(topic, tag, key, body.getBytes("utf-8"));//发送消息SendResult sendResult = producer.send(message, new SelectMessageQueueByHash(), hashKey);log.info("DemoRocketMqProducer 消息生产结果:{}", sendResult);return true;} catch (Exception e) {//错误日志log.error("DemoRocketMqProducer 消息生产失败,错误信息:{}", e);//异常处理throw new RuntimeException(e.getMessage());}}/* 关闭producer*/@Override@PreDestroypublic void shutdown() {//关闭producerproducer.shutdown();log.warn("DemoRocketMqProducer shutdown...");}
}
注解详解
@PostConstruct
介绍
@PostConstruct
注解被用来修饰一个非静态的void()
方法。- 被
@PostConstruct
注解修饰的方法会在服务加载Servlet
时运行,只会被执行一次。 @PostConstruct
注解在init()
方法之前、构造函数之后执行。- 上下文启动顺序:
服务加载Servlet->Constructor(构造方法) -> @Autowired(依赖注入) -> @PostConstruct(注解修饰的方法)->init()方法
使用场景
若想在生成对象时完成某些初始化操作,此时这些初始化需要依赖于一些依赖注入,则无法在构造函数中实现。此时,可使用@PostConstruct
注解修饰一个方法来完成初始化,@PostConstruct
注解修饰的方法将会在依赖注入完成后被自动调用。
源码
@Documented
@Retention (RUNTIME)
@Target(METHOD)
public @interface PostConstruct {
}
@PreDestroy
介绍
@PreDestroy
注解被用来修饰一个非静态的void()
方法。- 被
@PreDestroy
注解修饰的方法会在服务卸载Servlet
时运行,只会被执行一次。 @PreDestroy
注解在Servlet
卸载之前,destroy()
方法之后执行。- 上下文停止顺序:
destroy()方法 -> @PreDestroy(注解修饰的方法)-> bean销毁->服务卸载Servlet
源码
@Documented
@Retention (RUNTIME)
@Target(METHOD)
public @interface PreDestroy {
}
配置文件详解
# rocketmq消息队列
rocketmq:# 指定 nameServername-server: 127.0.0.1:9876;127.0.0.1:9877# Producer 生产者producer:# 生产组group: my-group# 发送消息超时时间,单位毫秒,默认3000send-message-timeout: 3000# body超过多大开始压缩,单位字节,默认4096Bcompress-message-body-threshold: 4096# 客户端限制的消息大小,单位字节,默认4194304max-message-size: 4194304# 如果消息发送失败,最大重试次数,默认2retry-times-when-send-failed: 2# 如果消息异步发送失败,最大重试次数,默认2retry-times-when-send-async-failed: 2# 如果发送消息返回sendResult,但sendStatus != SEND_OK,是否重试其他broker发送,默认falseretry-next-server: false