RabbitMQ 最常用的 3 大模式!
Java技术栈
共 9193字,需浏览 19分钟
· 2020-09-19
Java技术栈
www.javastack.cn
关注阅读更多优质文章
作者:海向
出处:www.cnblogs.com/haixiang/p/10864339.html
Direct 模式
所有发送到 Direct Exchange 的消息被转发到 RouteKey 中指定的 Queue。
Direct 模式可以使用 RabbitMQ 自带的 Exchange: default Exchange,所以不需要将 Exchange 进行任何绑定(binding)操作。
消息传递时,RouteKey 必须完全匹配才会被队列接收,否则该消息会被抛弃,
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
public class DirectProducer {
public static void main(String[] args) throws Exception {
//1. 创建一个 ConnectionFactory 并进行设置
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
factory.setVirtualHost("/");
factory.setUsername("guest");
factory.setPassword("guest");
//2. 通过连接工厂来创建连接
Connection connection = factory.newConnection();
//3. 通过 Connection 来创建 Channel
Channel channel = connection.createChannel();
//4. 声明
String exchangeName = "test_direct_exchange";
String routingKey = "item.direct";
//5. 发送
String msg = "this is direct msg";
channel.basicPublish(exchangeName, routingKey, null, msg.getBytes());
System.out.println("Send message : " + msg);
//6. 关闭连接
channel.close();
connection.close();
}
}
import com.rabbitmq.client.*;
import java.io.IOException;
public class DirectConsumer {
public static void main(String[] args) throws Exception {
//1. 创建一个 ConnectionFactory 并进行设置
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
factory.setVirtualHost("/");
factory.setUsername("guest");
factory.setPassword("guest");
factory.setAutomaticRecoveryEnabled(true);
factory.setNetworkRecoveryInterval(3000);
//2. 通过连接工厂来创建连接
Connection connection = factory.newConnection();
//3. 通过 Connection 来创建 Channel
Channel channel = connection.createChannel();
//4. 声明
String exchangeName = "test_direct_exchange";
String queueName = "test_direct_queue";
String routingKey = "item.direct";
channel.exchangeDeclare(exchangeName, "direct", true, false, null);
channel.queueDeclare(queueName, false, false, false, null);
//一般不用代码绑定,在管理界面手动绑定
channel.queueBind(queueName, exchangeName, routingKey);
//5. 创建消费者并接收消息
Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope,
AMQP.BasicProperties properties, byte[] body)
throws IOException {
String message = new String(body, "UTF-8");
System.out.println(" [x] Received '" + message + "'");
}
};
//6. 设置 Channel 消费者绑定队列
channel.basicConsume(queueName, true, consumer);
}
}
Send message : this is direct msg
[x] Received 'this is direct msg'
Topic 模式
可以使用通配符进行模糊匹配
符号'#" 匹配一个或多个词
符号"*”匹配不多不少一个词
例如:
'log.#"能够匹配到'log.info.oa"
"log.*"只会匹配到"log.erro“
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
public class TopicProducer {
public static void main(String[] args) throws Exception {
//1. 创建一个 ConnectionFactory 并进行设置
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
factory.setVirtualHost("/");
factory.setUsername("guest");
factory.setPassword("guest");
//2. 通过连接工厂来创建连接
Connection connection = factory.newConnection();
//3. 通过 Connection 来创建 Channel
Channel channel = connection.createChannel();
//4. 声明
String exchangeName = "test_topic_exchange";
String routingKey1 = "item.update";
String routingKey2 = "item.delete";
String routingKey3 = "user.add";
//5. 发送
String msg = "this is topic msg";
channel.basicPublish(exchangeName, routingKey1, null, msg.getBytes());
channel.basicPublish(exchangeName, routingKey2, null, msg.getBytes());
channel.basicPublish(exchangeName, routingKey3, null, msg.getBytes());
System.out.println("Send message : " + msg);
//6. 关闭连接
channel.close();
connection.close();
}
}
import com.rabbitmq.client.*;
import java.io.IOException;
public class TopicConsumer {
public static void main(String[] args) throws Exception {
//1. 创建一个 ConnectionFactory 并进行设置
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
factory.setVirtualHost("/");
factory.setUsername("guest");
factory.setPassword("guest");
factory.setAutomaticRecoveryEnabled(true);
factory.setNetworkRecoveryInterval(3000);
//2. 通过连接工厂来创建连接
Connection connection = factory.newConnection();
//3. 通过 Connection 来创建 Channel
Channel channel = connection.createChannel();
//4. 声明
String exchangeName = "test_topic_exchange";
String queueName = "test_topic_queue";
String routingKey = "item.#";
channel.exchangeDeclare(exchangeName, "topic", true, false, null);
channel.queueDeclare(queueName, false, false, false, null);
//一般不用代码绑定,在管理界面手动绑定
channel.queueBind(queueName, exchangeName, routingKey);
//5. 创建消费者并接收消息
Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope,
AMQP.BasicProperties properties, byte[] body)
throws IOException {
String message = new String(body, "UTF-8");
System.out.println(" [x] Received '" + message + "'");
}
};
//6. 设置 Channel 消费者绑定队列
channel.basicConsume(queueName, true, consumer);
}
}
Send message : this is topc msg
[x] Received 'this is topc msg'
[x] Received 'this is topc msg'
Fanout 模式
不处理路由键,只需要简单的将队列绑定到交换机上发送到交换机的消息都会被转发到与该交换机绑定的所有队列上。系列RabbitMQ教程请关注公众号Java技术栈获取阅读。
Fanout交换机转发消息是最快的。
import com.rabbitmq.client.*;
import java.io.IOException;
public class FanoutConsumer {
public static void main(String[] args) throws Exception {
//1. 创建一个 ConnectionFactory 并进行设置
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
factory.setVirtualHost("/");
factory.setUsername("guest");
factory.setPassword("guest");
factory.setAutomaticRecoveryEnabled(true);
factory.setNetworkRecoveryInterval(3000);
//2. 通过连接工厂来创建连接
Connection connection = factory.newConnection();
//3. 通过 Connection 来创建 Channel
Channel channel = connection.createChannel();
//4. 声明
String exchangeName = "test_fanout_exchange";
String queueName = "test_fanout_queue";
String routingKey = "item.#";
channel.exchangeDeclare(exchangeName, "fanout", true, false, null);
channel.queueDeclare(queueName, false, false, false, null);
//一般不用代码绑定,在管理界面手动绑定
channel.queueBind(queueName, exchangeName, routingKey);
//5. 创建消费者并接收消息
Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope,
AMQP.BasicProperties properties, byte[] body)
throws IOException {
String message = new String(body, "UTF-8");
System.out.println(" [x] Received '" + message + "'");
}
};
//6. 设置 Channel 消费者绑定队列
channel.basicConsume(queueName, true, consumer);
}
}
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
public class FanoutProducer {
public static void main(String[] args) throws Exception {
//1. 创建一个 ConnectionFactory 并进行设置
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
factory.setVirtualHost("/");
factory.setUsername("guest");
factory.setPassword("guest");
//2. 通过连接工厂来创建连接
Connection connection = factory.newConnection();
//3. 通过 Connection 来创建 Channel
Channel channel = connection.createChannel();
//4. 声明
String exchangeName = "test_fanout_exchange";
String routingKey1 = "item.update";
String routingKey2 = "";
String routingKey3 = "ookjkjjkhjhk";//任意routingkey
//5. 发送
String msg = "this is fanout msg";
channel.basicPublish(exchangeName, routingKey1, null, msg.getBytes());
channel.basicPublish(exchangeName, routingKey2, null, msg.getBytes());
channel.basicPublish(exchangeName, routingKey3, null, msg.getBytes());
System.out.println("Send message : " + msg);
//6. 关闭连接
channel.close();
connection.close();
}
}
Send message : this is fanout msg
[x] Received 'this is fanout msg'
[x] Received 'this is fanout msg'
[x] Received 'this is fanout msg'
关注Java技术栈看更多干货
评论
了解加密货币到加密货币的互换
1、什么是加密货币互换?加密货币到加密货币的互换是指以现行市场汇率将一种加密货币直接兑换为另一种加密货币。与需要法定货币存款和较长流程的传统交易所不同,加密货币到加密货币的互换可以无缝地促进交换。掉期在提高加密货币的流动性和效率方面发挥着重要作用。该功能使用户能够将他们的加密货币与钱包中的其他代币进
区块链头条
0
李彦宏:开源大模型不如闭源,后者会持续领先;周鸿祎:“开源不如闭源” 的言论是胡说八道
架构师大咖
架构师大咖,打造有价值的架构师交流平台。分享架构师干货、教程、课程、资讯。架构师大咖,每日推送。
公众号该公众号已被封禁0、李彦宏:开源大模型不如闭源,后者会持续领先当今
源码共读
0
【第129期】程序员的新宠:三款终端工具,让你告别Xshell!
概述 WindTerm:跨平台的SSH利器 首先介绍的是WindTerm,这是一款使用C语言开发的跨平台SSH客户端。它不仅完全免费,而且没有商业使用的限制。WindTerm支持SSH v2、Telnet、Raw Tcp等协议,而且性能出色,甚至超过了FinalShell和Electerm。功能
前端微服务
0
字节员工:35岁以后被裁员的,后来都走了哪条路?现在2-2,要不要利用最后一年拼命上个岸。
架构师大咖
架构师大咖,打造有价值的架构师交流平台。分享架构师干货、教程、课程、资讯。架构师大咖,每日推送。
公众号该公众号已被封禁在当今竞争激烈的职场环境中,年龄并不总是一个决定性
源码共读
0
上班的时候,有一群摸鱼搭子非常重要...
上班的时候,有一群摸鱼搭子非常重要!一到上班时间,他们就从四面八方涌进群里冒泡...从八卦聊到股市、从职场聊到乌X兰局势,偶尔还会复读、相亲、battle...然后,下午6点钟准时消失不见...所以你要不要加入我们一起摸鱼?我们有北京、上海、深圳、广州、杭州、武汉、成都、南京等8个城市的摸鱼群,还有
产品经理日记
0
周四002 瑞超:同样落寞的境遇——北雪平vs埃尔夫斯堡
上赛季最终排名联赛第9的北雪平本赛季伊始表现不佳,4轮战罢他们仅以1胜1平2负的战绩排在倒数第三,这支历史上曾夺得13次联赛冠军、6次杯赛冠军老牌劲旅,正如英格兰赛场上的一众百年俱乐部,在低谷中不断探索着出路。球队主教练安德烈亚斯·阿尔姆曾是AIK索尔纳及赫根队的主教练,他于今年年初刚刚拿起球队教鞭
产品与体验
0
雷军辟谣了!不是高考状元,卡里也没有冰冷的 40 亿
架构师大咖
架构师大咖,打造有价值的架构师交流平台。分享架构师干货、教程、课程、资讯。架构师大咖,每日推送。
公众号该公众号已被封禁最近很火的雷军简历,听说落魄时卡里只有冰冷的 40
源码共读
0
【比特币减半后价格表现大揭秘】历史数据告诉你什么?
加密货币现状的十张图表Glassnode 和 Coinbase 发布了《加密货币市场指南》,这是一个季度系列,旨在提供对加密货币市场主要发展的详细分析。以下是报告中引起我们注意的10张图表:1.比特币主导地位从50%上升至52%通常由减半引发的山寨季会降低比特币的主导地位,使其更倾向于新的山寨币。这
区块链头条
0