快盘下载:好资源、好软件、快快下载吧!

快盘排行|快盘最新

当前位置:首页软件教程电脑软件教程 → RabbitMQ消息中间件在项目中的使用详解

RabbitMQ消息中间件在项目中的使用详解

时间:2022-09-25 08:30:33人气:作者:快盘下载我要评论

1. RabbitMQ消息中间件

1.1 什么MQ?

MQ全称 Message Queue;消息队列;;是在消息的传输过程中保存消息的容器。多用于分布式系统之间进行通信。

思考: 原来服务与服务之间如何通信?

Openfeign 服务与服务之间直接调用。

RabbitMQ消息中间件在项目中的使用详解 我们也可以使用MQ完成系统与系统之间得调用。

1.2 MQ优点

1. 应用解耦

RabbitMQ消息中间件在项目中的使用详解RabbitMQ消息中间件在项目中的使用详解

 2. 异步提速

RabbitMQ消息中间件在项目中的使用详解

3. 削锋填谷  

RabbitMQ消息中间件在项目中的使用详解

1.3 MQ缺点

 RabbitMQ消息中间件在项目中的使用详解

1.4 如何选择MQ

RabbitMQ消息中间件在项目中的使用详解  

1.5 MQ得种类

rabbitMQ

kafka

RocketMQ

ActiveMQ

1.6 RabbitMQ

安装RabbitMQ <<详细内容看另一篇博客---RabbitMQ安装说明文档>>

1.7 概述端口号

RabbitMQ消息中间件在项目中的使用详解 1.8 rabbit的工作原理

RabbitMQ消息中间件在项目中的使用详解

2. java程序连接RabbitMQ服务---maven项目

提供了5种模式。

        1.简单模式--Hello

        RabbitMQ消息中间件在项目中的使用详解

        2. 工作者模式--work queues

        RabbitMQ消息中间件在项目中的使用详解

        3.发布订阅模式---

        RabbitMQ消息中间件在项目中的使用详解

         4.路由模式--router

        RabbitMQ消息中间件在项目中的使用详解

        5.主题模式--topic

        RabbitMQ消息中间件在项目中的使用详解

 ​​

2.1 准备工作

2.1.1 创建maven项目

RabbitMQ消息中间件在项目中的使用详解

2.1.2 添加依赖--在父工程下添加依赖

<dependencies>
        <dependency>
            <groupId>com.rabbitmq</groupId>
            <artifactId>amqp-client</artifactId>
            <version>5.14.2</version>
        </dependency>
    </dependencies>

2.1.3 启动rabbitmq

我这里rabbitmq安装在本地虚拟机上;直接开启虚拟机输入以下命令就可以进行测试了

centos6用这个命令;
/sbin/service rabbitmq-server restart

centos7用这个命令;
systemctl start rabbitmq-server

2.2 simple 简单模式

RabbitMQ消息中间件在项目中的使用详解

 P: 一个生产者

C: 一个消费者

Q: 队列

 生产者负责把消息发送到队列;消费者负责把队列的消息消费掉并确认消费

代码--生产者:

package com.wt.service;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * ;Author wt
 * ;Date 2022/9/19 20:31
 * ;PackageName:com.wt.service
 * ;ClassName: Test01
 * ;Description: 简单模式
 * ;Version 1.0
 */
public class Test01 {
    public static void main(String[] args) throws IOException, TimeoutException {
        /**
         * 连接rabbitmq
         */
        ConnectionFactory factory = new ConnectionFactory();
        //设置rabbilemMq服务器的地址 默认为localhost
        factory.setHost(;192.168.135.156;);
        //设置rabbitMQ的端口号 默认5672
        factory.setPort(5672);
        //设置账号和密码 默认guest
        factory.setUsername(;guest;);
        factory.setPassword(;guest;);
        //设置虚拟主机名 默认 /
        factory.setVirtualHost(;/;);

        //获取连接通道
        Connection connection = factory.newConnection();
        //获取channel信道
        Channel channel = connection.createChannel();

        //创建队列
        /**
         * 如果该队列名不存在则自动创建;存在则不创建
         * String queue ,队列名
         * boolean durable ,是否持久化
         * boolean exclusive ;独占;声明队列同一时间只能包含一个连接;且该队列只有这一个被连接使用。
         * boolean autoDelte ,是否自动删除
         * Map<String,Object>arguments
         */
        channel.queueDeclare(;simple_queue;,true,false,false,null);

        /**
         * String exchange ,把消息发给哪个交换机--简单模式没有交换机“”
         * String routingKey ,消息绑定的路由key 如果为简单模式 默认写为队列名称
         * BasicProperties props, 消息的属性
         * byte[] body; 消息的内容
         */
        String msg = ;{code:2000,name:张三,age:18};;
        channel.basicPublish(;;,;simple_queue;,null,msg.getBytes());

        connection.close();
    }
}

RabbitMQ消息中间件在项目中的使用详解 代码-消费者:

package com.wt.simple;

import com.rabbitmq.client.*;

import java.io.IOException;

/**
 * ;Author wt
 * ;Date 2022/9/19 21:22
 * ;PackageName:com.wt.simple
 * ;ClassName: Customer
 * ;Description: 简单模式
 * ;Version 1.0
 */
public class Customer {
    public static void main(String[] args) throws Exception{
        /**
         * 连接rabbitmq
         */
        ConnectionFactory factory = new ConnectionFactory();
        //设置rabbilemMq服务器的地址 默认为localhost
        factory.setHost(;192.168.135.156;);
        //设置rabbitMQ的端口号 默认5672
        factory.setPort(5672);
        //设置账号和密码 默认guest
        factory.setUsername(;guest;);
        factory.setPassword(;guest;);
        //设置虚拟主机名 默认 /
        factory.setVirtualHost(;/;);

        //获取连接通道
        Connection connection = factory.newConnection();
        //获取channel信道
        Channel channel = connection.createChannel();

        //监听队列
        /**
         * String queue 监听的队列名称
         * autoAck :是否自动确认消息
         * Consumer callback: 监听到消息后触发的回调函数
         */

        DefaultConsumer callback = new DefaultConsumer(channel){
            //一旦有消息就会触发该方法
            //body:表示消息的内容
            ;Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println(;接收的消息内容:;;new String(body));
            }
        };

        channel.basicConsume(;simple_queue;,true,callback);

    }
}

不要关闭连接对象

2.3 woker模式

RabbitMQ消息中间件在项目中的使用详解

work模式与简单模式的区别是worker模式的一个队列对应多个消费者

P:生产者

C1:消费者1

C2:消费者2

Q: 队列

消费者1和消费者2属于竞争关系;一个消息只会被一个消费者消费

 代码---生产者:

package com.wt.work;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * ;Author wt
 * ;Date 2022/9/19 21:38
 * ;PackageName:com.wt.service.test
 * ;ClassName: Work
 * ;Description: work模式
 * ;Version 1.0
 */
public class WorkTest {
    public static void main(String[] args) throws IOException, TimeoutException {
        ConnectionFactory factory = new ConnectionFactory();
        //设置rabbilemMq服务器的地址 默认为localhost
        factory.setHost(;192.168.135.156;);
        //设置rabbitMQ的端口号 默认5672
        factory.setPort(5672);
        //设置账号和密码 默认guest
        factory.setUsername(;guest;);
        factory.setPassword(;guest;);
        //设置虚拟主机名 默认 /
        factory.setVirtualHost(;/;);

        //获取连接通道
        Connection connection = factory.newConnection();
        //获取channel信道
        Channel channel = connection.createChannel();
        //创建队列
        /**
         * 如果该队列名不存在则自动创建;存在则不创建
         * String queue ,队列名
         * boolean durable ,是否持久化
         * boolean exclusive ;独占;声明队列同一时间只能包含一个连接;且该队列只有这一个被连接使用。
         * boolean autoDelte ,是否自动删除
         * Map<String,Object>arguments
         */
        channel.queueDeclare(;work_queue;,true,false,false,null);

        /**
         * String exchange ,把消息发给哪个交换机--简单模式没有交换机“”
         * String routingKey ,消息绑定的路由key 如果为简单模式 默认写为队列名称
         * BasicProperties props, 消息的属性
         * byte[] body; 消息的内容
         */
        for (int i=0;i<=10;i;;){
            String msg = ;{code:2000,name:张三,age:18};;i;
            channel.basicPublish(;;,;work_queue;,null,msg.getBytes());
        }


        connection.close();
    }
}

代码--消费01:

package com.wt.work;

import com.rabbitmq.client.*;

import java.io.IOException;

/**
 * ;Author wt
 * ;Date 2022/9/19 21:22
 * ;PackageName:com.wt.simple
 * ;ClassName: Customer
 * ;Description: worker-----消费者01
 * ;Version 1.0
 */
public class CustomerWork01 {
    public static void main(String[] args) throws Exception{
        ConnectionFactory factory = new ConnectionFactory();
        //设置rabbilemMq服务器的地址 默认为localhost
        factory.setHost(;192.168.135.156;);
        //设置rabbitMQ的端口号 默认5672
        factory.setPort(5672);
        //设置账号和密码 默认guest
        factory.setUsername(;guest;);
        factory.setPassword(;guest;);
        //设置虚拟主机名 默认 /
        factory.setVirtualHost(;/;);

        //获取连接通道
        Connection connection = factory.newConnection();
        //获取channel信道
        Channel channel = connection.createChannel();

        //监听队列
        /**
         * String queue 监听的队列名称
         * autoAck :是否自动确认消息
         * Consumer callback: 监听到消息后触发的回调函数
         */

        DefaultConsumer callback = new DefaultConsumer(channel){
            //一旦有消息就会触发该方法
            //body:表示消息的内容
            ;Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println(;接收的消息内容:;;new String(body));
            }
        };

        channel.basicConsume(;work_queue;,true,callback);

    }
}

 代码--消费者02--和上面的消费者相同。

可以先提前开启两个消费者;然后在开启创造者;观察消息都被哪个消费者消费了;是否有重复消费

2.4 public/Subscribe发布订阅模式

RabbitMQ消息中间件在项目中的使用详解

public模式和worker模式的区别是在worker的基础上新增加了一个交换机x;生产者传输消息给交换机;交换机再将相应的信息发送给两个队列(两个队列接收的信息相同),两个队列分别对应的消费者

p: producter 生产者

x;exchange交换机

Q: 队列

C1和C2:消费者

生产者--代码:

package com.wt.publish;

import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * ;Author wt
 * ;Date 2022/9/19 20:31
 * ;PackageName:com.wt.service
 * ;ClassName: Test01
 * ;Description: public
 * ;Version 1.0
 */
public class PublishTest {
    public static void main(String[] args) throws IOException, TimeoutException {
        ConnectionFactory factory = new ConnectionFactory();
        //设置rabbilemMq服务器的地址 默认为localhost
        factory.setHost(;192.168.135.156;);
        //设置rabbitMQ的端口号 默认5672
        factory.setPort(5672);
        //设置账号和密码 默认guest
        factory.setUsername(;guest;);
        factory.setPassword(;guest;);
        //设置虚拟主机名 默认 /
        factory.setVirtualHost(;/;);

        //获取连接通道
        Connection connection = factory.newConnection();
        //获取channel信道
        Channel channel = connection.createChannel();
        //创建队列
        /**
         * 如果该队列名不存在则自动创建;存在则不创建
         * String queue ,队列名
         * boolean durable ,是否持久化
         * boolean exclusive ;独占;声明队列同一时间只能包含一个连接;且该队列只有这一个被连接使用。
         * boolean autoDelte ,是否自动删除
         * Map<String,Object>arguments
         */
        channel.queueDeclare(;publish_queue01;,true,false,false,null);
        channel.queueDeclare(;publish_queue02;,true,false,false,null);

        //创建交换机
        /**
         * String exchange,交换机的名称
         * BuiltinExchangeType type,交换机的种类
         * boolean durable:是否持久化
         */
        channel.exchangeDeclare(;publish_queue;, BuiltinExchangeType.FANOUT,true);

        //交换机和队列绑定
        channel.queueBind(;publish_queue01;,;publish_queue;,;;);
        channel.queueBind(;publish_queue02;,;publish_queue;,;;);



        /**
         * String exchange ,把消息发给哪个交换机--简单模式没有交换机“”
         * String routingKey ,消息绑定的路由key 如果为简单模式 默认写为队列名称
         * BasicProperties props, 消息的属性
         * byte[] body; 消息的内容
         */
        for (int i=0;i<=10;i;;){
            String msg = ;{code:2000,name:张三,age:18};;i;
            channel.basicPublish(;publish_queue;,;;,null,msg.getBytes());
        }

        connection.close();
    }
}

 消费者--01

package com.wt.publish;

import com.rabbitmq.client.*;

import java.io.IOException;

/**
 * ;Author wt
 * ;Date 2022/9/19 21:22
 * ;PackageName:com.wt.simple
 * ;ClassName: Customer
 * ;Description: 消费者01---队列public01
 * ;Version 1.0
 */
public class CustomerPublish01 {
    public static void main(String[] args) throws Exception{
        ConnectionFactory factory = new ConnectionFactory();
        //设置rabbilemMq服务器的地址 默认为localhost
        factory.setHost(;192.168.135.156;);
        //设置rabbitMQ的端口号 默认5672
        factory.setPort(5672);
        //设置账号和密码 默认guest
        factory.setUsername(;guest;);
        factory.setPassword(;guest;);
        //设置虚拟主机名 默认 /
        factory.setVirtualHost(;/;);

        //获取连接通道
        Connection connection = factory.newConnection();
        //获取channel信道
        Channel channel = connection.createChannel();

        //监听队列
        /**
         * String queue 监听的队列名称
         * autoAck :是否自动确认消息
         * Consumer callback: 监听到消息后触发的回调函数
         */

        DefaultConsumer callback = new DefaultConsumer(channel){
            //一旦有消息就会触发该方法
            //body:表示消息的内容
            ;Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println(;接收的消息内容:;;new String(body));
            }
        };

        channel.basicConsume(;publish_queue01;,true,callback);

    }
}

消费者02;

package com.wt.publish;

import com.rabbitmq.client.*;

import java.io.IOException;

/**
 * ;Author wt
 * ;Date 2022/9/19 21:22
 * ;PackageName:com.wt.simple
 * ;ClassName: Customer
 * ;Description: 消费者02---队列public02
 * ;Version 1.0
 */
public class CustomerPublish02 {
    public static void main(String[] args) throws Exception{
        ConnectionFactory factory = new ConnectionFactory();
        //设置rabbilemMq服务器的地址 默认为localhost
        factory.setHost(;192.168.135.156;);
        //设置rabbitMQ的端口号 默认5672
        factory.setPort(5672);
        //设置账号和密码 默认guest
        factory.setUsername(;guest;);
        factory.setPassword(;guest;);
        //设置虚拟主机名 默认 /
        factory.setVirtualHost(;/;);

        //获取连接通道
        Connection connection = factory.newConnection();
        //获取channel信道
        Channel channel = connection.createChannel();

        //监听队列
        /**
         * String queue 监听的队列名称
         * autoAck :是否自动确认消息
         * Consumer callback: 监听到消息后触发的回调函数
         */

        DefaultConsumer callback = new DefaultConsumer(channel){
            //一旦有消息就会触发该方法
            //body:表示消息的内容
            ;Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println(;接收的消息内容:;;new String(body));
            }
        };

        channel.basicConsume(;publish_queue02;,true,callback);

    }
}

2.5 router路由模式

RabbitMQ消息中间件在项目中的使用详解

 router和public的区别是在原先的基础上增加了路由;交换机通过路由来判断将消息发送给哪个队列;二不是像public一样两个队列都会接收到一模一样的消息;只有满足路由的队列才会收到消息

p:生产者

x: 交换机---Direct (路由模式)

c1和c2表示消费者;

Q:队列

生产者--代码;

package com.wt.router;

import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * ;Author wt
 * ;Date 2022/9/20 14:33
 * ;PackageName:com.wt.router
 * ;ClassName: RouterTest
 * ;Description: router路由模式
 * ;Version 1.0
 */
public class RouterTest {
    public static void main(String[] args) throws IOException, TimeoutException {
        ConnectionFactory factory = new ConnectionFactory();
        //设置rabbilemMq服务器的地址 默认为localhost
        factory.setHost(;192.168.135.156;);
        //设置rabbitMQ的端口号 默认5672
        factory.setPort(5672);
        //设置账号和密码 默认guest
        factory.setUsername(;guest;);
        factory.setPassword(;guest;);
        //设置虚拟主机名 默认 /
        factory.setVirtualHost(;/;);

        //获取连接通道
        Connection connection = factory.newConnection();
        //获取channel信道
        Channel channel = connection.createChannel();
        //创建队列
        /**
         * 如果该队列名不存在则自动创建;存在则不创建
         * String queue ,队列名
         * boolean durable ,是否持久化
         * boolean exclusive ;独占;声明队列同一时间只能包含一个连接;且该队列只有这一个被连接使用。
         * boolean autoDelte ,是否自动删除
         * Map<String,Object>arguments
         */
        channel.queueDeclare(;router_queue01;,true,false,false,null);
        channel.queueDeclare(;router_queue02;,true,false,false,null);

        //创建交换机
        /**
         * String exchange,交换机的名称
         * BuiltinExchangeType type,交换机的种类
         * boolean durable:是否持久化
         */
        channel.exchangeDeclare(;router_queue;, BuiltinExchangeType.DIRECT,true);

        //交换机和队列绑定
        /**
         * s:String queue,队列名 s1:String exchange,交换机名 s2:String routerkey 路由key 
         * 如果为发布订阅模式(public)则无需有路由key
         */
        channel.queueBind(;router_queue01;,;router_queue;,;error;);

        channel.queueBind(;router_queue02;,;router_queue;,;error;);
        channel.queueBind(;router_queue02;,;router_queue;,;info;);
        channel.queueBind(;router_queue02;,;router_queue;,;warning;);



        /**
         * String exchange ,把消息发给哪个交换机--简单模式没有交换机“”
         * String routingKey ,消息绑定的路由key 如果为简单模式 默认写为队列名称
         * BasicProperties props, 消息的属性
         * byte[] body; 消息的内容
         */
        String msg = ;{code:2000,name:张三,age:18};;
        channel.basicPublish(;router_queue;,;info;,null,msg.getBytes());
            //channel.basicPublish(;router_queue;,;lazy.orange.ss;,null,msg.getBytes());


        connection.close();
    }
}

消费者01-代码;

package com.wt.router;

import com.rabbitmq.client.*;

import java.io.IOException;

/**
 * ;Author wt
 * ;Date 2022/9/19 21:22
 * ;PackageName:com.wt.simple
 * ;ClassName: Customer
 * ;Description: router路由模式 ---消费者01
 * ;Version 1.0
 */
public class CustomerRouter01 {
    public static void main(String[] args) throws Exception{
        ConnectionFactory factory = new ConnectionFactory();
        //设置rabbilemMq服务器的地址 默认为localhost
        factory.setHost(;192.168.135.156;);
        //设置rabbitMQ的端口号 默认5672
        factory.setPort(5672);
        //设置账号和密码 默认guest
        factory.setUsername(;guest;);
        factory.setPassword(;guest;);
        //设置虚拟主机名 默认 /
        factory.setVirtualHost(;/;);

        //获取连接通道
        Connection connection = factory.newConnection();
        //获取channel信道
        Channel channel = connection.createChannel();

        //监听队列
        /**
         * String queue 监听的队列名称
         * autoAck :是否自动确认消息
         * Consumer callback: 监听到消息后触发的回调函数
         */

        DefaultConsumer callback = new DefaultConsumer(channel){
            //一旦有消息就会触发该方法
            //body:表示消息的内容
            ;Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println(;接收的消息内容:;;new String(body));
            }
        };

        channel.basicConsume(;router_queue01;,true,callback);

    }
}

消费者02;

package com.wt.router;

import com.rabbitmq.client.*;

import java.io.IOException;

/**
 * ;Author wt
 * ;Date 2022/9/19 21:22
 * ;PackageName:com.wt.simple
 * ;ClassName: Customer
 * ;Description: router路由模式 ---消费者01
 * ;Version 1.0
 */
public class CustomerRouter02 {
    public static void main(String[] args) throws Exception{
        ConnectionFactory factory = new ConnectionFactory();
        //设置rabbilemMq服务器的地址 默认为localhost
        factory.setHost(;192.168.135.156;);
        //设置rabbitMQ的端口号 默认5672
        factory.setPort(5672);
        //设置账号和密码 默认guest
        factory.setUsername(;guest;);
        factory.setPassword(;guest;);
        //设置虚拟主机名 默认 /
        factory.setVirtualHost(;/;);

        //获取连接通道
        Connection connection = factory.newConnection();
        //获取channel信道
        Channel channel = connection.createChannel();

        //监听队列
        /**
         * String queue 监听的队列名称
         * autoAck :是否自动确认消息
         * Consumer callback: 监听到消息后触发的回调函数
         */

        DefaultConsumer callback = new DefaultConsumer(channel){
            //一旦有消息就会触发该方法
            //body:表示消息的内容
            ;Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println(;接收的消息内容:;;new String(body));
            }
        };

        channel.basicConsume(;router_queue02;,true,callback);

    }
}

2.6 主题模式--topic

RabbitMQ消息中间件在项目中的使用详解

topic模式和router模式的区别是将路由有指定内容变成了通配符 

*: 统配一个单词

: 统配零个或者n个单词  

生产者代码

package com.wt.topic;

import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * ;Author wt
 * ;Date 2022/9/20 14:33
 * ;PackageName:com.wt.router
 * ;ClassName: RouterTest
 * ;Description: topic模式
 * ;Version 1.0
 */
public class TopicTest {
    public static void main(String[] args) throws IOException, TimeoutException {
        ConnectionFactory factory = new ConnectionFactory();
        //设置rabbilemMq服务器的地址 默认为localhost
        factory.setHost(;192.168.135.156;);
        //设置rabbitMQ的端口号 默认5672
        factory.setPort(5672);
        //设置账号和密码 默认guest
        factory.setUsername(;guest;);
        factory.setPassword(;guest;);
        //设置虚拟主机名 默认 /
        factory.setVirtualHost(;/;);

        //获取连接通道
        Connection connection = factory.newConnection();
        //获取channel信道
        Channel channel = connection.createChannel();
        //创建队列
        /**
         * 如果该队列名不存在则自动创建;存在则不创建
         * String queue ,队列名
         * boolean durable ,是否持久化
         * boolean exclusive ;独占;声明队列同一时间只能包含一个连接;且该队列只有这一个被连接使用。
         * boolean autoDelte ,是否自动删除
         * Map<String,Object>arguments: 其它参数
         */
        channel.queueDeclare(;topic_queue01;,true,false,false,null);
        channel.queueDeclare(;topic_queue02;,true,false,false,null);


        //创建交换机
        /**
         * String exchange,交换机的名称
         * BuiltinExchangeType type,交换机的种类
         * boolean durable:是否持久化
         */
        channel.exchangeDeclare(;topic_queue;, BuiltinExchangeType.TOPIC,true);

        //交换机和队列绑定
        /**
         * String queue,队列名 String exchange,交换机名 String routerkey 路由key 如果为发布订阅模式则
         * 无需有路由key
         */
        channel.queueBind(;topic_queue01;,;topic_queue;,;*.orange.*;);

        channel.queueBind(;topic_queue02;,;topic_queue;,;*.*.rabbit;);
        channel.queueBind(;topic_queue02;,;topic_queue;,;lazy.#;);




        //发送消息到队列
        /**
         * String exchange ,把消息发给哪个交换机--简单模式没有交换机“”
         * String routingKey ,消息绑定的路由key 如果为简单模式 默认写为队列名称
         * BasicProperties props, 消息的属性
         * byte[] body; 消息的内容
         */
        String msg = ;{code:2000,name:张三,age:18};;
        channel.basicPublish(;topic_queue;,;lazy.orange.rabbit;,null,msg.getBytes());
        //channel.basicPublish(;router_queue;,;lazy.orange.ss;,null,msg.getBytes());


        connection.close();
    }
}

消费者01:

package com.wt.topic;

import com.rabbitmq.client.*;

import java.io.IOException;

/**
 * ;Author wt
 * ;Date 2022/9/19 21:22
 * ;PackageName:com.wt.simple
 * ;ClassName: Customer
 * ;Description: topic模式-------消费者01
 * ;Version 1.0
 */
public class CustomerTopic01 {
    public static void main(String[] args) throws Exception{
        ConnectionFactory factory = new ConnectionFactory();
        //设置rabbilemMq服务器的地址 默认为localhost
        factory.setHost(;192.168.135.156;);
        //设置rabbitMQ的端口号 默认5672
        factory.setPort(5672);
        //设置账号和密码 默认guest
        factory.setUsername(;guest;);
        factory.setPassword(;guest;);
        //设置虚拟主机名 默认 /
        factory.setVirtualHost(;/;);

        //获取连接通道
        Connection connection = factory.newConnection();
        //获取channel信道
        Channel channel = connection.createChannel();

        //监听队列
        /**
         * String queue 监听的队列名称
         * autoAck :是否自动确认消息
         * Consumer callback: 监听到消息后触发的回调函数
         */

        DefaultConsumer callback = new DefaultConsumer(channel){
            //一旦有消息就会触发该方法
            //body:表示消息的内容
            ;Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println(;接收的消息内容:;;new String(body));
            }
        };

        channel.basicConsume(;topic_queue01;,true,callback);

    }
}

消费者02:

package com.wt.topic;

import com.rabbitmq.client.*;

import java.io.IOException;

/**
 * ;Author wt
 * ;Date 2022/9/19 21:22
 * ;PackageName:com.wt.simple
 * ;ClassName: Customer
 * ;Description: TODO
 * ;Version 1.0
 */
public class CustomerTopic02 {
    public static void main(String[] args) throws Exception{
        ConnectionFactory factory = new ConnectionFactory();
        //设置rabbilemMq服务器的地址 默认为localhost
        factory.setHost(;192.168.135.156;);
        //设置rabbitMQ的端口号 默认5672
        factory.setPort(5672);
        //设置账号和密码 默认guest
        factory.setUsername(;guest;);
        factory.setPassword(;guest;);
        //设置虚拟主机名 默认 /
        factory.setVirtualHost(;/;);

        //获取连接通道
        Connection connection = factory.newConnection();
        //获取channel信道
        Channel channel = connection.createChannel();

        //监听队列
        /**
         * String queue 监听的队列名称
         * autoAck :是否自动确认消息
         * Consumer callback: 监听到消息后触发的回调函数
         */

        DefaultConsumer callback = new DefaultConsumer(channel){
            //一旦有消息就会触发该方法
            //body:表示消息的内容
            ;Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println(;接收的消息内容:;;new String(body));
            }
        };

        channel.basicConsume(;topic_queue02;,true,callback);

    }
}

3. springboot整合rabbitMQ

3.1 准备工作

3.1.1 创建如下的springboot项目

RabbitMQ消息中间件在项目中的使用详解

 3.1.2 添加依赖

        <dependency>
            <groupId>org.springFramework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>

3.1.3 添加配置文件application.properties

生产者微服务和消费者微服务都要添加

server.port=9999

#rabbitMQ的配置
spring.rabbitmq.host=192.168.135.156
spring.rabbitmq.port=5672
#账号密码和host默认为以下配置;如果没有修改可以不写
#spring.rabbitmq.username=guest
#spring.rabbitmq.password=guest
#spring.rabbitmq.virtual-host=/

3.2 springboot的测试

队列我们这次直接在rabbitmq 的图形化界面创建 ;并且给队列创建exchange交换机

创建普通队列test01和test02,创建方式入下图

RabbitMQ消息中间件在项目中的使用详解

 创建交换机testXRabbitMQ消息中间件在项目中的使用详解

 点击testX进入创建好的交换机testX并配置相关内容RabbitMQ消息中间件在项目中的使用详解

 RabbitMQ消息中间件在项目中的使用详解

3.2.1 测试开始

使用product工具类发送消息到队列

;SpringBootTest
public class ProductTest {
    //springboot集成了rabbitMQ 提供了一个工具类 ;该类封装了消息的发送
    ;Autowired
    private RabbitTemplate rabbitTemplate;

     /**
     * 给交换机为testX,路由为a的队列发送消息hello springboot
     */
   ;Test
    public void test01(){
        rabbitTemplate.convertAndSend(;testX;,;a;,;hello springboot;);
    }
}

消费者

package com.wt.rabbitmq;

import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

;Component
public class MyListener {


    /**
     * ;RabbitListener:队列监听,queues:队列名
     */
    ;RabbitListener(queues = ;test02;)
    public void test(Map<String,Object> msg){
        System.out.println(msg);
        //运行相关的业务处理
    }
}

3.3 如何确保消息的可靠性

首先确定消息可能在哪些位置丢失---不同的位置可以有不同的解决方案。

RabbitMQ消息中间件在项目中的使用详解

3.3.1 保证消息从生产者到交换机

1. comfirm确认机制

 该模式必须在生产者的application.properties配置文件中开启手动确认机制

#开启确认机制
spring.rabbitmq.publisher-confirm-type=correlated
//保证消息从生产者到交换机
    //测试确认机制

    /**
     * 1,手动开启确认机制spring.rabbitmq.publisher-confirm-type=correlated
     * 2.为rabbitTemplate设置确认回调函数
     */
    ;Test
    public void testConfirm(){
        //为rabbitTemplate设置确认回调函数
       rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback(){
            //不管是否到大交换机;都会触发该方法
           ;Override
           public void confirm(CorrelationData correlationData, boolean b, String s) {
               System.out.println(;b~~~~~~~~~~~;;b);
               if (b==false){
                   System.out.println(;消息发送失败,开启回滚;);
               }
           }
       });
        //故意设置一个不存在的交换机
       rabbitTemplate.convertAndSend(;testX2;,;a;,;Heollo Simple2;);
    }

3.3.2 保证消息可以从交换机到队列

returning机制: 如果消息无法到达队列;则会触发returning机制。如果能从交换机到队列则不会触发returning机制。

默认rabbitMQ不开启该机制。

 该模式必须在生产者的application.properties配置文件中开启手动returning机制  

#开启returning机制
spring.rabbitmq.publisher-returns=true
**
     * 1.开启returning机制
     * 2.为rabbitTemplate设置returning回调函数
     */
    ;Test
    public void testReturning(){
        rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {
            //该方法只有从交换机到队列失败时才会触发
            ;Override
            public void returnedMessage(Message message, int i, String s, String s1, String s2) {
                System.out.println(;~~~~~~~~~~~~~~~~~~~~;);
            }
        });
        //故意写一个不存在的队列测试是否能执行
        rabbitTemplate.convertAndSend(;testX;,;a;,;Hello Springboot3;);
    }

3.3.3 如何保证消息在队列

队列持久化--->

搭建rabbitmq集群--保证高可用

3.3.4 消费者可靠的消费消息

在消费者的配置文件中修改为手动确认模式

 #开启消息确认auto :自动确认  manual:手动确认  none:不确认
 spring.rabbitmq.listener.simple.acknowledge-mode=manual

 当业务处理完毕后在确认消息给队列让其删除该消息

 ;RabbitListener(queues = ;test01;)
    public void test02(Message message, Channel channel){
        byte[] body = message.getBody();
        long deliveryTag = message.getMessageProperties().getDeliveryTag();
        System.out.println(;接收到的消息;;new String(body));

        try{
            System.out.println(;处理业务代码;);
            //可以故意抛出一个异常来测试是否会继续发送
//            int i = 10/0;
            System.out.println(;业务处理完毕;);
            //创建手动确认---队列会把消息溢出
            /**
             * long deliveryTag 消息的标记
             * boolean multiple: 是否把之前没有确认的消息也确认掉
             */
            //手动确认;需要在配置文件中进行开启
            channel.basicAck(deliveryTag,true);
        }catch (Exception e){
            //出现异常让队列再发一次
            //boolean requeue: true继续发给我 false还是直接丢掉
            try {
                channel.basicNack(deliveryTag,true,true);

            }catch (Exception exception){
                e.printStackTrace();
            }
        }

    }

如何保证消息的可靠性。

设置confirm和returning机制

设置队列和交互机的持久化

搭建rabbitMQ服务集群

消费者改为手动确认机制。

3.4 如何限制消费者消费消息的条数

RabbitMQ消息中间件在项目中的使用详解

设置消费者消费消息的条数

消费者端必须为手动确认模式。

在消费者的配置文件中修改每次拉取消息的条数

#设置消息限流---消费者一次最多消费的消息个数
spring.rabbitmq.listener.simple.prefetch=3

按照上述创建队列的方式;创建test03、test04队列;并将队列加入到testX交换机中;并设置test03 的路由为d ;test04的路由为e 

生产者:

//测试限制消费者消费的个数
    ;Test
    public void testMessage(){
        for (int i = 0; i <10; i;;) {
            rabbitTemplate.convertAndSend(;testX;,;d;,;Hello Word00;;i);
        }
    }

消费者:

/**
     * 限制消费者消费的个数
     *
     * 设置限制个数;需要在配置文件中进行配置
     * 这里测试时没有进行消息确认;否则会因为消息确认太快而无法看出限流
     * 这里测试的是没次通过3个;在3个消息确认之后才会接着发送其他请求
     */
    ;RabbitListener(queues = ;test03;)
    public void test03 (Message message, Channel channel) throws IOException {
        byte[] body = message.getBody();
        System.out.println(;消息的内容:;;new String(body));
        //消息确认
//        long deliveryTag = message.getMessageProperties().getDeliveryTag();
//        channel.basicAck(deliveryTag,true);
    }

3.5 设置过期时间

TTL;time to live

可以为整个队列设置也可以单独为某条信息设置

在rabbitmq图形化界面创建设置过期时间的队列test04 

RabbitMQ消息中间件在项目中的使用详解 队列创建完成后将该队列加入到交换机exchange内;并设置路由为e

3.5.1 创建队列时为队列设置过期时间

 生产者:

//测试设置过期时间

    /**
     * 为队列设置过期时间
     * 设置休眠时间;判断数据过期是所有数据都被删除;还是哪个数据进入队列给这个数据进行计时;到期自动删除这个数据
     */
    ;Test
    public void test(){
        for (int i = 0; i <10; i;;) {
            if (i<5){
                rabbitTemplate.convertAndSend(;testX;,;e;,;Hello Word00;;i);
            }else {
                try{
                    Thread.sleep(6000);
                }catch (Exception e){
                    e.printStackTrace();
                }
                rabbitTemplate.convertAndSend(;testX;,;e;,;Hello Word00;;i);
            }

        }
    }

 消费者;

;RabbitListener(queues = ;test04;)
    public void test04 (Message message, Channel channel) throws IOException {
        byte[] body = message.getBody();
        System.out.println(;消息的内容:;;new String(body));
        //消息确认
//        long deliveryTag = message.getMessageProperties().getDeliveryTag();
//        channel.basicAck(deliveryTag,true);
    }

3.5.2 单独为消息设置过期时间

/*
    单独为一条信息设置过期时间
     */
    ;Test
    public void test00(){
        MessagePostProcessor messagePostProcessor = new MessagePostProcessor() {
            ;Override
            public Message postProcessMessage(Message message) throws AmqpException {
                message.getMessageProperties().setExpiration(;10000;);
                return message;
            }
        };
        rabbitTemplate.convertAndSend(;testX;,;d;,;Hello Word0;,messagePostProcessor);

    }

如何保证消息的可靠性

TTL过期

限定消费消息的条数

3.6 创建死信队列 

3.6.1 准备工作

在rabbitmq中创建创建普通队列和死信队列

死信队列连接连接的是普通交换机、普通队列连接的是死信交换机

创建死信队列

RabbitMQ消息中间件在项目中的使用详解创建普通队列

RabbitMQ消息中间件在项目中的使用详解

创建死信队列连接的普通交换机

RabbitMQ消息中间件在项目中的使用详解 创建普通队列连接的死信交换机

RabbitMQ消息中间件在项目中的使用详解

 生产者

 我们的队列设置了消息上限和超时时间;并且没有设置消息确认;这次一共发送十条;有五条会等待普通队列确认;剩下的五条会进入死信交换机;若普通队列的五条超过20秒;这5条消息也会进入死信队列

    /**
     * 死信队列
     */
    ;Test
    public void testSx(){
        for (int i = 0; i < 10; i;;) {
            rabbitTemplate.convertAndSend(;pt_exchange;,;dead;,;Hello Word~~~~~~~;;i);
        }
    }

RabbitMQ消息中间件在项目中的使用详解

3.7 延迟队列

RabbitMQ消息中间件在项目中的使用详解

RabbitMQ消息中间件在项目中的使用详解

RabbitMQ消息中间件在项目中的使用详解  这里的判断订单状态 是因为 如果支付系统第29分分钟去支付;支付的比较慢;最后在第31分钟支付成功了。消息30分钟加入死信队列执行库存回滚;就会出错。

3.8 如何防止消息被重复消费

RabbitMQ消息中间件在项目中的使用详解

3.9 rabbitMQ的常见面试题 

1. 如何防止消息被重复消费

2.如何保证消息的可靠性

3.rabbitMQ消息积压过多

RabbitMQ消息中间件在项目中的使用详解

RabbitMQ消息中间件在项目中的使用详解

 

网友评论

快盘下载暂未开通留言功能。

关于我们| 广告联络| 联系我们| 网站帮助| 免责声明| 软件发布

Copyright 2019-2029 【快快下载吧】 版权所有 快快下载吧 | 豫ICP备10006759号公安备案:41010502004165

声明: 快快下载吧上的所有软件和资料来源于互联网,仅供学习和研究使用,请测试后自行销毁,如有侵犯你版权的,请来信指出,本站将立即改正。