spring-cloud-stream消息驱动
创始人
2025-05-30 09:25:38

Stream 消息驱动

Stream 概述

  1. Spring Cloud Stream 是一个构建消息驱动微服务应用的框架。
  2. Stream 解决了开发人员无感知的使用消息中间件的问题,因为Stream对消息中间件的进一步封装,可以做到代码层面对中间件的无感知,甚至于动态的切换中间件,使得微服务开发的高度解耦,服务可以关注更多自己的业务流程。
  3. Spring Cloud Stream目前支持两种消息中间件RabbitMQ和Kafka
    在这里插入图片描述

Stream 组件

  1. Spring Cloud Stream 构建的应用程序与消息中间件之间是通过绑定器 Binder相关联的。绑定器对于应用程序而言起到了隔离作用, 它 使得不同消息中间件的实现细节对应用程序来说是透明的。
  2. binding 是我们通过配置把应用和spring cloud stream 的 binder 绑定在一起
  3. output:发送消息 Channel,内置 Source接口
  4. input:接收消息 Channel,内置 Sink接口

在这里插入图片描述

Stream 消息生产者

  1. 创建消息生产者模块,引入依赖 starter-stream-rabbit
  2. 编写配置,定义 binder,和 bingings
  3. 定义消息发送业务类。添加 @EnableBinding(Source.class),注入MessageChannel output
    ,完成消息发送
  4. 编写启动类,测试

MessageProducer.java

package com.itheima.stream.producer;import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.messaging.Source;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Component;@Component
@EnableBinding(Source.class)
public class MessageProducer {@Autowiredprivate MessageChannel output; //动态创建出来public void send() {String message = "hello message~~~";///发送消息output.send(MessageBuilder.withPayload(message).build()); //构建messgae对象build() 消息体构建System.out.println("消息发送成功");}
}

ProducerController.java

package com.itheima.stream.producer;import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;@RestController
public class ProducerController {@Autowiredprivate MessageProducer producer;@RequestMapping("/send")public String sendMsg(){producer.send();return "send msg";}
}
package com.itheima.stream;import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;@SpringBootApplication
public class ProducerApp {public static void main(String[] args) {SpringApplication.run(ProducerApp.class, args);}
}
server:port: 8000spring:cloud:stream:# 定义绑定器,绑定到哪个消息中间件上binders:itheima_binder: # 自定义的绑定器名称type: rabbit # 绑定器类型environment: # 指定mq的环境spring:rabbitmq:host: localhostport: 5672username: guestpassword: guestvirtual-host: /bindings:output: # channel名称binder: itheima_binder #指定使用哪一个binderdestination: itheima_exchange # 消息目的地

stream-parentcom.itheima1.0-SNAPSHOT4.0.0stream-producerorg.springframework.bootspring-boot-starter-weborg.springframework.cloudspring-cloud-starter-stream-rabbit

在这里插入图片描述

stream-consumer

Stream 消息消费者

  1. 创建消息消费者模块,引入依赖 starter-stream-rabbit
  2. 编写配置,定义 binder,和 bingings
  3. 定义消息接收业务类。添加@EnableBinding(Sink.class),使用@StreamListener(Sink.INPUT),完成消息接收。
  4. 编写启动类,测试
MessageListener.javapackage com.itheima.stream.consumer;import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.messaging.Sink;
import org.springframework.messaging.Message;
import org.springframework.stereotype.Component;/*** 消息接收类*/
@EnableBinding({Sink.class})
@Componentpublic class MessageListener {@StreamListener(Sink.INPUT)public void receive(Message message) {System.out.println(message);System.out.println(message.getPayload());}
}
package com.itheima.stream;import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;@SpringBootApplication
public class ConsumerApp {public static void main(String[] args) {SpringApplication.run(ConsumerApp.class, args);}
}
server:port: 9000spring:cloud:stream:# 定义绑定器,绑定到哪个消息中间件上binders:itheima_binder: # 自定义的绑定器名称type: rabbit # 绑定器类型environment: # 指定mq的环境spring:rabbitmq:host: 175.24.181.110port: 5672username: guestpassword: guestvirtual-host: /bindings:input: # channel名称binder: itheima_binder #指定使用哪一个binderdestination: itheima_exchange # 消息目的地

stream-parentcom.itheima1.0-SNAPSHOT4.0.0stream-consumerorg.springframework.bootspring-boot-starter-weborg.springframework.cloudspring-cloud-starter-stream-rabbit

父工程


4.0.0com.itheimastream-parentpom1.0-SNAPSHOTstream-producerstream-consumerorg.springframework.bootspring-boot-starter-parent2.1.0.RELEASE UTF-8UTF-81.8Greenwich.RELEASEorg.springframework.cloudspring-cloud-dependencies${spring-cloud.version}pomimport

相关内容

热门资讯

『重大发现』“云南绍兴麻将辅助... 您好:云南绍兴麻将这款游戏可以开挂,确实是有挂的,需要了解加客服微信【3398215】很多玩家在这款...
〖玩家分享〗大赢家跑得快有挂吗... 【无需打开直接搜索微信【4579337】操作使用教程:1.亲,实际上大赢家跑得快是可以开挂的,确实有...
盘点一款“吉祥联盟确实有挂”其... 您好:吉祥联盟这款游戏可以开挂,确实是有挂的,需要软件加微信【3671900】很多玩家在这款游戏中打...
重大通报“来来淮北麻将究竟有没... 您好:来来淮北麻将这款游戏可以开挂,确实是有挂的,需要了解加客服微信【69174242】很多玩家在来...
今日重大通报“天王斗牛到底有没... 亲:天王斗牛这款游戏是可以开挂的,确实是有挂的,添加客服【8487422】很多玩家在这款游戏中怀疑是...