需求: 使用简单模式完成消息传递
步骤:
创建工程(生产者、消费者)
分别添加依赖
<dependencies>
<!--rabbitmq 依赖客户端-->
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.6.0</version>
</dependency>
<!--操作文件流的一个依赖-->
<dependency>
<groupId>commons-io</groupId>
<artifactId>commons-io</artifactId>
<version>2.6</version>
</dependency>
</dependencies>
编写生产者发送消息
package com.dmbjz.one;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeoutException;
/*HelloWord模型 生产者代码*/
public class Producer {
public static final String QUEUE_NAME = "hello"; // 队列名称
public static void main(String[] args) throws IOException, TimeoutException {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("192.168.189.128"); // 设置MQ所在机器IP进行连接
factory.setPort(5672); // 指定MQ服务端口
factory.setVirtualHost("study"); // 指定使用的VirtualHost
factory.setUsername("admin"); // 指定MQ账号名
factory.setPassword("123"); // 指定MQ密码
Connection connection = factory.newConnection(); // 创建连接
Channel channel = connection.createChannel(); // 创建信道
/* 队列设置(创建队列)
*参数1:队列名称,名称不存在就自动创建
*参数2:定义队列是否持久化(重启MQ后是队列否存在),true开启,false关闭
*参数3:exclusive 是否独占队列(设置是否只能有一个消费者使用),true独占,false非独占
*参数4:autoelete 是否在消费完成后是否自动删除队列 ,true删除,false不删除
*参数5:额外附加参数
*/
channel.queueDeclare(QUEUE_NAME, true, false, false, null);
String message = "Hello RabbitMQ"; // 需要发送的消息
/* 交换机&队列设置(指定消息使用的交换机和队列)
* 参数1: exchange交换机名称(简单队列无交换机,这里不写)
* 参数2: 有交换机就是路由key。没有交换机就是队列名称,意为往该队列里存放消息
* 参数3: 传递消息的额外设置 (设置消息是否持久化) MessageProperties.PERSISTENT_TEXT_PLAIN设置消息持久化
* 参数4: 消息具体内容(要为 Byte类型)
*/
channel.basicPublish("", QUEUE_NAME, null, message.getBytes(StandardCharsets.UTF_8));
/*关闭资源*/
channel.close();
connection.close();
System.out.println("消息生产完毕");
}
}
编写消费者接收消息
package com.dmbjz.one;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/*HelloWord模型 消费者案例*/
public class Consumer {
public static final String QUEUE_NAME = "hello"; // 队列名称
public static void main(String[] args) throws IOException, TimeoutException {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("192.168.226.129"); // 设置MQ所在机器IP进行连接
factory.setPort(5672); // 指定MQ服务端口
factory.setVirtualHost("study"); // 指定使用的VirtualHost
factory.setUsername("admin"); // 指定MQ账号名
factory.setPassword("123"); // 指定MQ密码
Connection connection = factory.newConnection(); // 创建连接
Channel channel = connection.createChannel(); // 创建信道
/*消费者成功消费时的回调接口,这里为打印获取到的消息*/
DeliverCallback deliverCallback = (consumerTag, message) -> {
System.out.println(new String(message.getBody()));
};
/*消费者取消消费的回调*/
CancelCallback callback = consumerTag -> {
System.out.println("消息者取消消费接口回调逻辑");
};
/* 消费消息
* 参数1 : 消费队列的名称
* 参数2 : 消息的自动确认机制(一获得消息就通知 MQ 消息已被消费) true打开,false关闭 (接收到消息并消费后也不通知 MQ ,常用)
* 参数3 : 消费者成功消费时的回调接口
* 参数4 : 消费者取消消费的回调
*/
channel.basicConsume(QUEUE_NAME, true, deliverCallback, callback);
System.out.println("消费者执行完毕");
}
}
打开生产者,并运行程序。在控制台可以看到创建的队列
工作队列模式:让多个消费者绑定到一个队列共同消费队列中的消息。默认情况下,RabbitMQ将按顺序将每个消息发送给下一个使用者。平均而言,每个消费者都会收到相同数量的消息
与入门程序的简单模式相比,多了有个或一些消费者,多个消费者共同消费一个队列中的消息。
应用场景:对于任务过重或任务较多的情况使用工作队列可以提高任务处理速度。
生产者代码:使用循环反复生产,增加消息队列中的数量
package com.dmbjz.one;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeoutException;
/*HelloWord模型 生产者代码*/
public class Producer {
public static final String QUEUE_NAME = "hello"; // 队列名称
public static void main(String[] args) throws IOException, TimeoutException {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("192.168.226.129"); // 设置MQ所在机器IP进行连接
factory.setPort(5672); // 指定MQ服务端口
factory.setVirtualHost("study"); // 指定使用的VirtualHost
factory.setUsername("admin"); // 指定MQ账号名
factory.setPassword("123"); // 指定MQ密码
Connection connection = factory.newConnection(); // 创建连接
Channel channel = connection.createChannel(); // 创建信道
/* 队列设置(创建队列)
*参数1:队列名称,名称不存在就自动创建
*参数2:定义队列是否持久化(重启MQ后是队列否存在),true开启,false关闭
*参数3:exclusive 是否独占队列(设置是否只能有一个消费者使用),true独占,false非独占
*参数4:autoelete 是否在消费完成后是否自动删除队列 ,true删除,false不删除
*参数5:额外附加参数
*/
channel.queueDeclare(QUEUE_NAME, true, false, false, null);
String message = "Hello RabbitMQ"; // 需要发送的消息
/* 交换机&队列设置(指定消息使用的交换机和队列)
* 参数1: exchange交换机名称(简单队列无交换机,这里不写)
* 参数2: 有交换机就是路由key。没有交换机就是队列名称,意为往该队列里存放消息
* 参数3: 传递消息的额外设置 (设置消息是否持久化) MessageProperties.PERSISTENT_TEXT_PLAIN设置消息持久化
* 参数4: 消息具体内容(要为 Byte类型)
*/
channel.basicPublish("", QUEUE_NAME, null, message.getBytes(StandardCharsets.UTF_8));
/*关闭资源*/
channel.close();
connection.close();
System.out.println("消息生产完毕");
}
}
消费者代码:多个消费者代码同时运行监听队列。(多复制几个并运行)
package com.dmbjz.one;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/*HelloWord模型 消费者案例*/
public class Consumer {
public static final String QUEUE_NAME = "hello"; // 队列名称
public static void main(String[] args) throws IOException, TimeoutException {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("192.168.226.129"); // 设置MQ所在机器IP进行连接
factory.setPort(5672); // 指定MQ服务端口
factory.setVirtualHost("study"); // 指定使用的VirtualHost
factory.setUsername("admin"); // 指定MQ账号名
factory.setPassword("123"); // 指定MQ密码
Connection connection = factory.newConnection(); // 创建连接
Channel channel = connection.createChannel(); // 创建信道
/*消费者成功消费时的回调接口,这里为打印获取到的消息*/
DeliverCallback deliverCallback = (consumerTag, message) -> {
System.out.println(new String(message.getBody()));
};
/*消费者取消消费的回调*/
CancelCallback callback = consumerTag -> {
System.out.println("消息者取消消费接口回调逻辑");
};
/* 消费消息
* 参数1 : 消费队列的名称
* 参数2 : 消息的自动确认机制(一获得消息就通知 MQ 消息已被消费) true打开,false关闭 (接收到消息并消费后也不通知 MQ ,常用)
* 参数3 : 消费者成功消费时的回调接口
* 参数4 : 消费者取消消费的回调
*/
channel.basicConsume(QUEUE_NAME, true, deliverCallback, callback);
System.out.println("消费者执行完毕");
}
}
定义好交换机,消息的传输路径变为,从Producer发出,发送至Fanout类型的交换机,交换机将消息分别推送给和自己绑定的Queue上,消费者再去消费对应的Queue,此处与前面模式一致。
交换机X:Exchange有三种类型:
交换机只负责转发消息,不具备存储消息的能力,因此如果没有队列与Exchange绑定,或者没有符合路由规则的队列,那么消息会丢失。
常情况下生产者产生消息后才会执行消费者代码,因此交换机的创建都由生产者执行,消费者不需要再次创建交换机(交换机只需要被声明一次即可)
在发布订阅模式中,只要有任意一个消费者的代码中进行了其他消费者交换机和 routingKey 的绑定,则这些消费者可以直接进行消息消费,而不再需要再次声明交换机和队列绑定
**如果整合SpringBoot,所有的交换机、队列声明将会放在一个配置文件,**无需像非SpringBoot案例中如此麻烦
创建Producer_PubSub
public class Producer_PubSub {
public static void main(String[] args) throws IOException, TimeoutException {
//1.创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
//2. 设置参数
factory.setHost("172.16.98.133");//ip 默认值 localhost
factory.setPort(5672); //端口 默认值 5672
factory.setVirtualHost("/itcast");//虚拟机 默认值/
factory.setUsername("heima");//用户名 默认 guest
factory.setPassword("heima");//密码 默认值 guest
//3. 创建连接 Connection
Connection connection = factory.newConnection();
//4. 创建Channel
Channel channel = connection.createChannel();
/*
exchangeDeclare(String exchange, BuiltinExchangeType type, boolean durable, boolean autoDelete, boolean internal,
Map<String, Object> arguments)
参数:
1. exchange:交换机名称
2. type:交换机类型
DIRECT("direct"),:定向
FANOUT("fanout"),:扇形(广播),发送消息到每一个与之绑定队列。
TOPIC("topic"),通配符的方式
HEADERS("headers");参数匹配
3. durable:是否持久化
4. autoDelete:自动删除
5. internal:内部使用。 一般false
6. arguments:参数
*/
String exchangeName = "test_fanout";
//5. 创建交换机
channel.exchangeDeclare(exchangeName, BuiltinExchangeType.FANOUT,true,false,false,null);
//6. 创建队列
String queue1Name = "test_fanout_queue1";
String queue2Name = "test_fanout_queue2";
channel.queueDeclare(queue1Name,true,false,false,null);
channel.queueDeclare(queue2Name,true,false,false,null);
//7. 绑定队列和交换机
/*
queueBind(String queue, String exchange, String routingKey)
参数:
1. queue:队列名称
2. exchange:交换机名称
3. routingKey:路由键,绑定规则
如果交换机的类型为fanout ,routingKey设置为""
*/
channel.queueBind(queue1Name,exchangeName,"");
channel.queueBind(queue2Name,exchangeName,"");
String body = "日志信息:张三调用了findAll方法...日志级别:info...";
//8. 发送消息
channel.basicPublish(exchangeName,"",null,body.getBytes());
//9. 释放资源
channel.close();
connection.close();
}
}
创建Consumer_PubSub1和Consumer_PubSub2消费者
public class Consumer_PubSub1 {
public static void main(String[] args) throws IOException, TimeoutException {
//1.创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
//2. 设置参数
factory.setHost("172.16.98.133");//ip 默认值 localhost
factory.setPort(5672); //端口 默认值 5672
factory.setVirtualHost("/itcast");//虚拟机 默认值/
factory.setUsername("heima");//用户名 默认 guest
factory.setPassword("heima");//密码 默认值 guest
//3. 创建连接 Connection
Connection connection = factory.newConnection();
//4. 创建Channel
Channel channel = connection.createChannel();
String queue1Name = "test_fanout_queue1";
String queue2Name = "test_fanout_queue2";
/*
basicConsume(String queue, boolean autoAck, Consumer callback)
参数:
1. queue:队列名称
2. autoAck:是否自动确认
3. callback:回调对象
*/
// 接收消息
Consumer consumer = new DefaultConsumer(channel){
/*
回调方法,当收到消息后,会自动执行该方法
1. consumerTag:标识
2. envelope:获取一些信息,交换机,路由key...
3. properties:配置信息
4. body:数据
*/
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
/* System.out.println("consumerTag:"+consumerTag);
System.out.println("Exchange:"+envelope.getExchange());
System.out.println("RoutingKey:"+envelope.getRoutingKey());
System.out.println("properties:"+properties);*/
System.out.println("body:"+new String(body));
System.out.println("将日志信息打印到控制台.....");
}
};
channel.basicConsume(queue1Name,true,consumer);
//关闭资源?不要
}
}
public class Consumer_PubSub1 {
public static void main(String[] args) throws IOException, TimeoutException {
//1.创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
//2. 设置参数
factory.setHost("172.16.98.133");//ip 默认值 localhost
factory.setPort(5672); //端口 默认值 5672
factory.setVirtualHost("/itcast");//虚拟机 默认值/
factory.setUsername("heima");//用户名 默认 guest
factory.setPassword("heima");//密码 默认值 guest
//3. 创建连接 Connection
Connection connection = factory.newConnection();
//4. 创建Channel
Channel channel = connection.createChannel();
String queue1Name = "test_fanout_queue1";
String queue2Name = "test_fanout_queue2";
/*
basicConsume(String queue, boolean autoAck, Consumer callback)
参数:
1. queue:队列名称
2. autoAck:是否自动确认
3. callback:回调对象
*/
// 接收消息
Consumer consumer = new DefaultConsumer(channel){
/*
回调方法,当收到消息后,会自动执行该方法
1. consumerTag:标识
2. envelope:获取一些信息,交换机,路由key...
3. properties:配置信息
4. body:数据
*/
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
/* System.out.println("consumerTag:"+consumerTag);
System.out.println("Exchange:"+envelope.getExchange());
System.out.println("RoutingKey:"+envelope.getRoutingKey());
System.out.println("properties:"+properties);*/
System.out.println("body:"+new String(body));
System.out.println("将日志信息打印到控制台.....");
}
};
channel.basicConsume(queue2Name,true,consumer);
//关闭资源?不要
}
}
在路由工作模式中,我们需要配置一个类型为direct的交换机,并且需要指定不同的路由键(routing key),把对应的消息从交换机路由到不同的消息队列进行存储,由消费者进行消费。
P:生产者,向交换机发送消息的时候,会指定一个routing key X:Exchange(交换机),接收生产者的消息,然后把消息传递给和routing key完全匹配的队列 C1:消费者,它所在队列指定了需要routing key为error的信息 C2:消费者,其所在队列指定了需要routing key 为 info、error、warning 的消息
队列和交换机的绑定是需要指定routing key的,不可以随意绑定 消息的发送方向交换机发送消息的时候,也需要指定消息的routing key 交换机不再把消息交给每一个绑定的队列,而是根据消息的routing key来进行判断,只有队列的routing key和消息的routing key完全一样才会接收到消息。
生产者
public class Producer_Routing {
public static void main(String[] args) throws IOException, TimeoutException {
//1.创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
//2. 设置参数
factory.setHost("172.16.98.133");//ip 默认值 localhost
factory.setPort(5672); //端口 默认值 5672
factory.setVirtualHost("/itcast");//虚拟机 默认值/
factory.setUsername("heima");//用户名 默认 guest
factory.setPassword("heima");//密码 默认值 guest
//3. 创建连接 Connection
Connection connection = factory.newConnection();
//4. 创建Channel
Channel channel = connection.createChannel();
/*
exchangeDeclare(String exchange, BuiltinExchangeType type, boolean durable, boolean autoDelete, boolean internal, Map<String, Object> arguments)
参数:
1. exchange:交换机名称
2. type:交换机类型
DIRECT("direct"),:定向
FANOUT("fanout"),:扇形(广播),发送消息到每一个与之绑定队列。
TOPIC("topic"),通配符的方式
HEADERS("headers");参数匹配
3. durable:是否持久化
4. autoDelete:自动删除
5. internal:内部使用。 一般false
6. arguments:参数
*/
String exchangeName = "test_direct";
//5. 创建交换机
channel.exchangeDeclare(exchangeName, BuiltinExchangeType.DIRECT,true,false,false,null);
//6. 创建队列
String queue1Name = "test_direct_queue1";
String queue2Name = "test_direct_queue2";
channel.queueDeclare(queue1Name,true,false,false,null);
channel.queueDeclare(queue2Name,true,false,false,null);
//7. 绑定队列和交换机
/*
queueBind(String queue, String exchange, String routingKey)
参数:
1. queue:队列名称
2. exchange:交换机名称
3. routingKey:路由键,绑定规则
如果交换机的类型为fanout ,routingKey设置为""
*/
//队列1绑定 error
channel.queueBind(queue1Name,exchangeName,"error");
//队列2绑定 info error warning
channel.queueBind(queue2Name,exchangeName,"info");
channel.queueBind(queue2Name,exchangeName,"error");
channel.queueBind(queue2Name,exchangeName,"warning");
String body = "日志信息:张三调用了delete方法...出错误了。。。日志级别:error...";
//8. 发送消息
channel.basicPublish(exchangeName,"warning",null,body.getBytes());
//9. 释放资源
channel.close();
connection.close();
}
}
消费者1
public class Consumer_Routing1 {
public static void main(String[] args) throws IOException, TimeoutException {
//1.创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
//2. 设置参数
factory.setHost("172.16.98.133");//ip 默认值 localhost
factory.setPort(5672); //端口 默认值 5672
factory.setVirtualHost("/itcast");//虚拟机 默认值/
factory.setUsername("heima");//用户名 默认 guest
factory.setPassword("heima");//密码 默认值 guest
//3. 创建连接 Connection
Connection connection = factory.newConnection();
//4. 创建Channel
Channel channel = connection.createChannel();
String queue1Name = "test_direct_queue1";
String queue2Name = "test_direct_queue2";
/*
basicConsume(String queue, boolean autoAck, Consumer callback)
参数:
1. queue:队列名称
2. autoAck:是否自动确认
3. callback:回调对象
*/
// 接收消息
Consumer consumer = new DefaultConsumer(channel){
/*
回调方法,当收到消息后,会自动执行该方法
1. consumerTag:标识
2. envelope:获取一些信息,交换机,路由key...
3. properties:配置信息
4. body:数据
*/
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
/* System.out.println("consumerTag:"+consumerTag);
System.out.println("Exchange:"+envelope.getExchange());
System.out.println("RoutingKey:"+envelope.getRoutingKey());
System.out.println("properties:"+properties);*/
System.out.println("body:"+new String(body));
System.out.println("将日志信息打印到控制台.....");
}
};
channel.basicConsume(queue2Name,true,consumer);
//关闭资源?不要
}
}
消费者2
public class Consumer_Routing2 {
public static void main(String[] args) throws IOException, TimeoutException {
//1.创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
//2. 设置参数
factory.setHost("172.16.98.133");//ip 默认值 localhost
factory.setPort(5672); //端口 默认值 5672
factory.setVirtualHost("/itcast");//虚拟机 默认值/
factory.setUsername("heima");//用户名 默认 guest
factory.setPassword("heima");//密码 默认值 guest
//3. 创建连接 Connection
Connection connection = factory.newConnection();
//4. 创建Channel
Channel channel = connection.createChannel();
String queue1Name = "test_direct_queue1";
String queue2Name = "test_direct_queue2";
/*
basicConsume(String queue, boolean autoAck, Consumer callback)
参数:
1. queue:队列名称
2. autoAck:是否自动确认
3. callback:回调对象
*/
// 接收消息
Consumer consumer = new DefaultConsumer(channel){
/*
回调方法,当收到消息后,会自动执行该方法
1. consumerTag:标识
2. envelope:获取一些信息,交换机,路由key...
3. properties:配置信息
4. body:数据
*/
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
/* System.out.println("consumerTag:"+consumerTag);
System.out.println("Exchange:"+envelope.getExchange());
System.out.println("RoutingKey:"+envelope.getRoutingKey());
System.out.println("properties:"+properties);*/
System.out.println("body:"+new String(body));
System.out.println("将日志信息存储到数据库.....");
}
};
channel.basicConsume(queue1Name,true,consumer);
//关闭资源?不要
}
}
消费者1
public class Consumer_Topic1 {
public static void main(String[] args) throws IOException, TimeoutException {
//1.创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
//2. 设置参数
factory.setHost("172.16.98.133");//ip 默认值 localhost
factory.setPort(5672); //端口 默认值 5672
factory.setVirtualHost("/itcast");//虚拟机 默认值/
factory.setUsername("heima");//用户名 默认 guest
factory.setPassword("heima");//密码 默认值 guest
//3. 创建连接 Connection
Connection connection = factory.newConnection();
//4. 创建Channel
Channel channel = connection.createChannel();
String queue1Name = "test_topic_queue1";
String queue2Name = "test_topic_queue2";
/*
basicConsume(String queue, boolean autoAck, Consumer callback)
参数:
1. queue:队列名称
2. autoAck:是否自动确认
3. callback:回调对象
*/
// 接收消息
Consumer consumer = new DefaultConsumer(channel){
/*
回调方法,当收到消息后,会自动执行该方法
1. consumerTag:标识
2. envelope:获取一些信息,交换机,路由key...
3. properties:配置信息
4. body:数据
*/
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
/* System.out.println("consumerTag:"+consumerTag);
System.out.println("Exchange:"+envelope.getExchange());
System.out.println("RoutingKey:"+envelope.getRoutingKey());
System.out.println("properties:"+properties);*/
System.out.println("body:"+new String(body));
System.out.println("将日志信息存入数据库.......");
}
};
channel.basicConsume(queue1Name,true,consumer);
//关闭资源?不要
}
}
消费者2
public class Consumer_Topic2 {
public static void main(String[] args) throws IOException, TimeoutException {
//1.创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
//2. 设置参数
factory.setHost("172.16.98.133");//ip 默认值 localhost
factory.setPort(5672); //端口 默认值 5672
factory.setVirtualHost("/itcast");//虚拟机 默认值/
factory.setUsername("heima");//用户名 默认 guest
factory.setPassword("heima");//密码 默认值 guest
//3. 创建连接 Connection
Connection connection = factory.newConnection();
//4. 创建Channel
Channel channel = connection.createChannel();
String queue1Name = "test_topic_queue1";
String queue2Name = "test_topic_queue2";
/*
basicConsume(String queue, boolean autoAck, Consumer callback)
参数:
1. queue:队列名称
2. autoAck:是否自动确认
3. callback:回调对象
*/
// 接收消息
Consumer consumer = new DefaultConsumer(channel){
/*
回调方法,当收到消息后,会自动执行该方法
1. consumerTag:标识
2. envelope:获取一些信息,交换机,路由key...
3. properties:配置信息
4. body:数据
*/
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
/* System.out.println("consumerTag:"+consumerTag);
System.out.println("Exchange:"+envelope.getExchange());
System.out.println("RoutingKey:"+envelope.getRoutingKey());
System.out.println("properties:"+properties);*/
System.out.println("body:"+new String(body));
System.out.println("将日志信息打印控制台.......");
}
};
channel.basicConsume(queue2Name,true,consumer);
//关闭资源?不要
}
}
生产者
public class Producer_Topics {
public static void main(String[] args) throws IOException, TimeoutException {
//1.创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
//2. 设置参数
factory.setHost("172.16.98.133");//ip 默认值 localhost
factory.setPort(5672); //端口 默认值 5672
factory.setVirtualHost("/itcast");//虚拟机 默认值/
factory.setUsername("heima");//用户名 默认 guest
factory.setPassword("heima");//密码 默认值 guest
//3. 创建连接 Connection
Connection connection = factory.newConnection();
//4. 创建Channel
Channel channel = connection.createChannel();
/*
exchangeDeclare(String exchange, BuiltinExchangeType type, boolean durable, boolean autoDelete, boolean internal, Map<String, Object> arguments)
参数:
1. exchange:交换机名称
2. type:交换机类型
DIRECT("direct"),:定向
FANOUT("fanout"),:扇形(广播),发送消息到每一个与之绑定队列。
TOPIC("topic"),通配符的方式
HEADERS("headers");参数匹配
3. durable:是否持久化
4. autoDelete:自动删除
5. internal:内部使用。 一般false
6. arguments:参数
*/
String exchangeName = "test_topic";
//5. 创建交换机
channel.exchangeDeclare(exchangeName, BuiltinExchangeType.TOPIC,true,false,false,null);
//6. 创建队列
String queue1Name = "test_topic_queue1";
String queue2Name = "test_topic_queue2";
channel.queueDeclare(queue1Name,true,false,false,null);
channel.queueDeclare(queue2Name,true,false,false,null);
//7. 绑定队列和交换机
/*
queueBind(String queue, String exchange, String routingKey)
参数:
1. queue:队列名称
2. exchange:交换机名称
3. routingKey:路由键,绑定规则
如果交换机的类型为fanout ,routingKey设置为""
*/
// routing key 系统的名称.日志的级别。
//=需求: 所有error级别的日志存入数据库,所有order系统的日志存入数据库
channel.queueBind(queue1Name,exchangeName,"#.error");
channel.queueBind(queue1Name,exchangeName,"order.*");
channel.queueBind(queue2Name,exchangeName,"*.*");
String body = "日志信息:张三调用了findAll方法...日志级别:info...";
//8. 发送消息
channel.basicPublish(exchangeName,"goods.error",null,body.getBytes());
//9. 释放资源
channel.close();
connection.close();
}
}