> 文章列表 > 路由模式下的RabbitMQ

路由模式下的RabbitMQ

路由模式下的RabbitMQ

1.在配置文件中配置mq的url,端口号,用户名,密码

2.读取配置文件,并获取mq的connection

import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

public static Connection getConnection() {
    if(connection == null) {
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost(RABBIT_HOST);
        connectionFactory.setPort(Integer.parseInt(RABBIT_PORT));
        connectionFactory.setUsername(RABBIT_USERNAME);
        connectionFactory.setPassword(RABBIT_PASSWORD);
        try {
            connection = connectionFactory.newConnection();
        } catch (IOException e) {
            e.printStackTrace();
        } catch (TimeoutException e) {
            e.printStackTrace();
        }
    }
    return connection;
}

3.通过connection换取channel,并生产和消费信息

// Declared before the try block so the channel variable remains in scope after it.
Channel channel = null;
try {
    // Obtain the connection from MqConnectionUtil
    Connection connection = MqConnectionUtil.getConnection();
    // Create a channel on that connection
    channel = connection.createChannel();

生产者

// Declare the exchange "log" with type DIRECT.
channel.exchangeDeclare("log",BuiltinExchangeType.DIRECT);
创建交换机,采用路由模式direct
// Bind "queue1" to exchange "log" with routing key "the1", and "queue2" with "the2".
// NOTE(review): "queue1"/"queue2" are only declared in the consumer section; presumably
// these binds fail if the queues do not exist yet - confirm the consumer runs first.
channel.queueBind("queue1","log","the1");
channel.queueBind("queue2","log","the2");
将交换机与队列绑定
// Publish one UTF-8 encoded message per routing key to exchange "log";
// the third argument (message properties) is null.
channel.basicPublish("log","the1",null,"第一条发送".getBytes(StandardCharsets.UTF_8));
channel.basicPublish("log","the2",null,"第二条发送".getBytes(StandardCharsets.UTF_8));

消费者
  

// Declare "queue1" and "queue2". Per the RabbitMQ Java client signature
// queueDeclare(queue, durable, exclusive, autoDelete, arguments), both are
// durable, non-exclusive, not auto-deleted, and have no extra arguments.
channel.queueDeclare("queue1", true, false, false, null);
channel.queueDeclare("queue2", true, false, false, null);
由消费者创建队列

4.开启监听Queue
// Consumer callback: pauses 500 ms per delivery, then prints the message text.
// NOTE(review): this snippet only constructs the consumer; it presumably still has to be
// registered with channel.basicConsume(...) for each queue - that call is not shown here.
DefaultConsumer consumer = new DefaultConsumer(channel) {
    /**
     * Handles one delivered message.
     *
     * @param consumerTag the consumer tag associated with this consumer
     * @param envelope    delivery metadata for the message
     * @param properties  content header properties of the message
     * @param body        raw message bytes; decoded as UTF-8 to match the producer's encoding
     * @throws IOException declared by the overridden method
     */
    @Override
    public void handleDelivery(String consumerTag, Envelope envelope,
                               AMQP.BasicProperties properties, byte[] body) throws IOException {
        try {
            Thread.sleep(500);
        } catch (InterruptedException e) {
            // Restore the interrupt flag so the owning thread can observe the interruption.
            Thread.currentThread().interrupt();
        }
        // Decode the payload; concatenating the byte[] directly would print its
        // identity string (e.g. "[B@1a2b3c") instead of the message content.
        System.out.println("接收到消息-" + new String(body, StandardCharsets.UTF_8));
    }
};

5.关闭连接

  • channel.close();

  • connection.close();