客户端 API 公开 AMQP 0-9-1 协议模型中的关键实体, 具有额外的抽象,易于使用。
RabbitMQ Java 客户端使用 com.rabbitmq.client 作为其顶级包。 关键类和接口是:
通道:表示一个AMQP 0-9-1通道,并提供大部分操作(协议方法)。
连接:表示 AMQP 0-9-1 连接
连接工厂:构造连接实例
使用者:表示消息使用者
DefaultConsumer:消费者常用的基类
基本属性:消息属性(元数据)
BasicProperties.Builder:BasicProperties 的生成器
协议操作可通过通道接口进行。连接是 用于打开通道,注册连接生命周期事件 处理程序,并关闭不再需要的连接。连接通过 ConnectionFactory 实例化, 这是您配置各种连接设置(例如虚拟主机或用户名)的方式。
此客户端版本独立于 RabbitMQ 服务器版本,可以与 RabbitMQ 服务器一起使用。 它们需要 Java 8 或更高版本:
com.rabbitmq amqp-client 5.16.0
核心 API 类是连接和通道,表示 AMQP 0-9-1 连接和 分别是通道。它们通常在使用前导入:
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Channel;
以下代码使用给定的参数(主机名、端口号等)连接到 RabbitMQ 节点:
ConnectionFactory factory = new ConnectionFactory();
// "guest"/"guest" by default, limited to localhost connections
factory.setUsername(userName);
factory.setPassword(password);
factory.setVirtualHost(virtualHost);
factory.setHost(hostName);
factory.setPort(portNumber);Connection conn = factory.newConnection();
所有这些参数对于 RabbitMQ 都有合理的默认值 节点在本地运行。
如果属性,则将使用该属性的默认值 在创建连接之前保持未分配状态:
Property | Default Value |
Username | "guest" |
Password | "guest" |
Virtual host | "/" |
Hostname | "localhost" |
port | 5672 for regular connections, 5671 for connections that use TLS |
或者,可以使用 URI:
ConnectionFactory factory = new ConnectionFactory();
factory.setUri("amqp://userName:password@hostName:portNumber/virtualHost");
Connection conn = factory.newConnection();
所有这些参数都有合理的股票默认值 本地运行的 RabbitMQ 服务器。
可以在服务器节点日志中观察到成功和不成功的客户端连接事件。
请注意,默认情况下,用户来宾只能从本地主机连接。 这是为了限制生产系统中的已知凭据使用。
应用程序开发人员可以为连接分配自定义名称。如果设置, 该名称将在 RabbitMQ 节点日志以及管理 UI 中提及。
然后,可以使用连接接口打开通道:
Channel channel = conn.createChannel();
该通道现在可用于发送和接收消息,如后续部分所述。
可以指定连接时要使用的终结点列表。第一个 将使用可访问的终结点。如果连接失败,请使用 终结点列表使应用程序可以连接到不同的 节点(如果原始节点已关闭)。
要使用多个端点,请向 ConnectionFactory#newConnection 提供地址列表。 地址表示主机名和端口对。
Address[] addrArr = new Address[]{ new Address(hostname1, portnumber1), new Address(hostname2, portnumber2)};
Connection conn = factory.newConnection(addrArr);
将尝试连接到主机名 1:端口号1,如果 主机名 2:端口号2 失败。返回的连接是 数组中第一个成功(不抛出 IOException)。这完全等同于重复 在工厂上设置主机和端口,每次调用 factory.newConnection(),直到其中一个成功。
如果还提供了执行人服务(使用 form factory.newConnection(es, addrArr)) 线程池是 与(第一个)成功连接相关联。
要断开连接,只需关闭通道和连接:
channel.close();
conn.close();
请注意,关闭通道可能被认为是很好的做法,但在这里不是绝对必要的 - 它会完成 在基础连接关闭时自动执行。
客户端连接应具有长期生存期。底层协议的设计和优化是针对 长时间运行的连接。这意味着每个操作打开一个新连接, 例如,发布的消息是不必要的,强烈建议不要这样做,因为它会引入很多 网络往返和开销。
通道也意味着长期存在,但由于许多可恢复的协议错误将 导致通道关闭,通道寿命可能短于其连接寿命。 每个操作关闭和打开新通道通常是不必要的,但可以 适当。如有疑问,请考虑先重复使用频道。
通道级异常,例如尝试从 不存在的队列将导致通道关闭。封闭的通道不能 更长的使用时间,并且不会再从服务器接收任何事件(例如 作为消息传递
RabbitMQ 节点的客户端信息量有限:
其 TCP 终结点(源 IP 地址和端口)
使用的凭据
仅此信息就可能使识别应用程序和实例变得困难,特别是当凭据可能 共享和客户端通过负载均衡器连接,但无法启用代理协议。
为了更轻松地在服务器日志和管理 UI 中标识客户端, AMQP 0-9-1 客户机连接(包括 RabbitMQ Java 客户机)可以提供定制标识符。 如果设置,标识符将在日志条目和管理 UI 中提及。标识符称为 客户端提供的连接名称。该名称可用于标识应用程序或特定组件 在应用程序中。名称是可选的;但是,强烈建议开发人员提供一个 因为它将大大简化某些操作任务。
RabbitMQ Java 客户机的 ConnectionFactory#newConnection 方法覆盖接受客户机提供的连接名称。下面是上面使用的修改后的连接示例 它提供了这样一个名称:
ConnectionFactory factory = new ConnectionFactory();
factory.setUri("amqp://userName:password@hostName:portNumber/virtualHost");
// provides a custom connection name
Connection conn = factory.newConnection("app:audit component:event-consumer");
客户端应用程序使用 [交换] 和队列, 协议的高级构建块。 必须先声明这些,然后才能使用。声明任一类型的对象 只需确保存在该名称之一,并在必要时创建它。
继续前面的示例,以下代码声明了一个交换和一个以服务器命名的队列, 然后将它们绑定在一起。
channel.exchangeDeclare(exchangeName, "direct", true);
String queueName = channel.queueDeclare().getQueue();
channel.queueBind(queueName, exchangeName, routingKey);
这将主动声明以下对象, 两者都可以通过使用其他参数进行自定义。 在这里,他们都没有任何特殊的论据。
持久、非自动删除的“直接”交换
具有生成名称的非持久、独占、自动删除队列
然后,上述函数调用将队列绑定到交换 给定路由密钥。
请注意,当只有一个队列时,这将是声明队列的典型方法 客户想要使用它:它不需要一个众所周知的名字,不需要 其他客户端可以使用它(独占)并将被清理 自动(自动删除)。如果多个客户端想要共享一个队列 使用众所周知的名称,此代码将是合适的:
channel.exchangeDeclare(exchangeName, "direct", true);
channel.queueDeclare(queueName, true, false, false, null);
channel.queueBind(queueName, exchangeName, routingKey);
这将主动声明:
持久、非自动删除的“直接”交换
具有已知名称的持久、非独占、非自动删除队列
许多通道 API 方法都过载。 这些方便的 exchangeDeclare、queueDeclare 和 queueBind 的缩写形式使用合理的默认值。还有带有更多参数的较长表单,可让您覆盖这些默认值 必要时,在需要时给予完全控制。
这种“短格式,长格式”模式在整个客户端 API 使用过程中使用。
队列和交换可以“被动”声明。被动声明只是检查实体是否 具有提供的名称存在。如果是这样,则操作是无操作。对于队列成功 被动声明将返回与非被动声明相同的信息,即 队列中处于就绪状态的使用者和消息。
如果实体不存在,则操作将失败,并出现通道级异常。频道 之后不能使用。应该打开一个新通道。通常使用一次性(临时) 被动声明的通道。
通道#队列声明被动和通道#交换声明被动是 用于被动声明的方法。下面的示例演示 Channel#queueDeclarePassive:
Queue.DeclareOk response = channel.queueDeclarePassive("queue-name");
// returns the number of messages in Ready state in the queue
response.getMessageCount();
// returns the number of consumers the queue has
response.getConsumerCount();
Channel#exchangeDeclarePassive的返回值不包含任何有用的信息。因此 如果该方法返回并且没有发生通道异常,则表示交换确实存在。
一些常见的操作也有一个“不等待”版本,它不会等待服务器响应。例如,要声明队列并指示服务器不发送任何响应,请使用
channel.queueDeclareNoWait(queueName, true, false, false, null);
“无等待”版本效率更高,但提供的安全保障更低,例如,它们更依赖于检测失败操作的心跳机制。如果有疑问,请从标准版本开始。只有在具有高拓扑(队列、绑定)变化的场景中才需要“无等待”版本。
可以显式删除队列或交换:
channel.queueDelete("queue-name")
仅当队列为空时,才能删除队列:
channel.queueDelete("queue-name", false, true)
或者如果未使用(没有任何使用者):
channel.queueDelete("queue-name", true, false)
可以清除队列(删除其所有消息):
channel.queuePurge("queue-name")
若要将消息发布到交易所,请使用 Channel.basicPublish,如下所示:
byte[] messageBodyBytes = "Hello, world!".getBytes();
channel.basicPublish(exchangeName, routingKey, null, messageBodyBytes);
对于精细控制,请使用重载变体来指定强制标志, 或使用预设的消息属性发送消息
channel.basicPublish(exchangeName, routingKey, mandatory,MessageProperties.PERSISTENT_TEXT_PLAIN,messageBodyBytes);
这将发送传递模式为 2(持久)、优先级为 1 的消息 和内容类型“文本/纯文本”。使用 Builder 类生成 消息属性对象,具有所需数量的属性,例如:
channel.basicPublish(exchangeName, routingKey,new AMQP.BasicProperties.Builder().contentType("text/plain").deliveryMode(2).priority(1).userId("bob").build(),messageBodyBytes);
此示例发布带有自定义标头的邮件:
Map headers = new HashMap();
headers.put("latitude", 51.5252949);
headers.put("longitude", -0.0905493);channel.basicPublish(exchangeName, routingKey,new AMQP.BasicProperties.Builder().headers(headers).build(),messageBodyBytes);
此示例发布过期的消息:
channel.basicPublish(exchangeName, routingKey,new AMQP.BasicProperties.Builder().expiration("60000").build(),messageBodyBytes);
这只是一组简短的示例,并未演示每个示例 支持的属性。
请注意,BasicProperties 是外部类 AMQP 的内部类。
如果资源驱动的警报生效,则 Channel#basicPublish 的调用最终将阻塞。
大家好,我是Doker品牌的Sinbad,欢迎点赞和评论!或者加微信进入技术群聊!