> 文章列表 > Windows安装RocketMQ

Windows安装RocketMQ

Windows安装RocketMQ

1、下载安装包

下载地址:https://rocketmq.apache.org/download

 2、 解压、配置环境变量

ROCKETMQ_HOME=RocketMQ安装地址

 3、 用cmd页面启动mqnamesrv.cmd

命令,start mqnamesrv.cmd

 4、 用cmd页面启动mqbroker.cmd

 命令:start mqbroker.cmd -n 127.0.0.1:9876 autoCreateTopicEnable=true

 5、 下载可视化界面

百度网盘地址:https://pan.baidu.com/s/1jU36FONVndj4bZqCvmFwSw

提取码:4oru 

6、 生产者发送消息

6.1 导入Maven依赖

<dependency><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-client</artifactId><version>5.0.0</version>
</dependency>

6.2 生产者发送消息

6.2.1 同步发送

同步发送使用场景:发送短信【对消息是否传递非常敏感,所以要确定是否收到】

public class 同步通信 {public static void main(String[] args) throws Exception {DefaultMQProducer producer = new DefaultMQProducer("BIO"); // 新建生产者producer.setNamesrvAddr("127.0.0.1:9876"); // 设置注册中心地址---nameServerproducer.start(); // 启动生产者Message msg = new Message("Base", "Base-1", "您好啊,生产者同步通信发来消息".getBytes(StandardCharsets.UTF_8)); // 发送消息,(topic, tags, byte[] msg)SendResult result = producer.send(msg); // 发送消息System.out.println(result.toString()); // 打印发送消息内容producer.shutdown(); // 关闭连接}
}

6.2.2 异步发送

异步发送使用场景:下订单【订单下了就不理了,其余的减库存,新增订单等让后台自己去做,MQ通过回调函数告诉我是否流程正确就可以了;对响应性能十分敏感】

public class 异步通信 {public static void main(String[] args) throws Exception{DefaultMQProducer producer = new DefaultMQProducer("AIO");producer.setNamesrvAddr("127.0.0.1:9876");producer.start();for (int i = 0; i < 100; i++) {Message msg = new Message("AIO", "AIO-1", ("您好啊,异步生产者发来消息" + (i+1)).getBytes(StandardCharsets.UTF_8));producer.send(msg, new SendCallback() { // 不同于同步通信只有这一步,多加了回调函数@Overridepublic void onSuccess(SendResult sendResult) {System.out.println("成功发送消息," + sendResult.toString());}@Overridepublic void onException(Throwable throwable) {System.out.println("消息发送失败," + throwable.toString());}});}TimeUnit.SECONDS.sleep(5);producer.shutdown();}
}

6.2.3 单向发送

单向发送使用场景:日志记录【丢几条日志也无所谓,强大的MQ一般也不会丢】

public class 单向通信 {public static void main(String[] args) throws Exception{DefaultMQProducer producer = new DefaultMQProducer("One-way");producer.setNamesrvAddr("127.0.0.1:9876");producer.start();Message msg = new Message("OneWay", "OneWay-1", "您好啊,单向通信生产者发来消息".getBytes(StandardCharsets.UTF_8));producer.sendOneway(msg); // 不同于同步通信,这里设置为oneway发送方式即可producer.shutdown();}
}

6.3 消费者消费消息

public class 消费消息 {public static void main(String[] args) throws Exception{DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("Consumer-1");consumer.setNamesrvAddr("127.0.0.1:9876");consumer.subscribe("AIO", "*");consumer.registerMessageListener(new MessageListenerConcurrently() {@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {for(MessageExt msg : list) {System.out.println(new String(msg.getBody()));}return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});consumer.start();System.out.println("消费者,订阅主题为:AIO,持续消费消息中......");}
}

国学知识