时间: 2023-10-27 【学无止境】 阅读量:共415人围观
简介 Spring Cloud Stream是构建消息驱动的微服务应用程序的框架。Spring Cloud Stream基于Spring Boot建立独立的生产级Spring应用程序,并使用Spring Integration提供与消息代理的连接。SpringCloud Stream 为一些供应商的消息中间件产品提供了个性化的自动化配置实现,引用了发布-订阅、消费组、分区的三个核心概念。
什么是SpringCloudStream?
官方定义 Spring Cloud stream 是一个构建消息驱动微服务的框架。
应用程序通过 inputs 或者 outputs 来与 Spring Cloud Stream中binder对象交互。
通过我们配置来binding(绑定),而 Spring Cloud Stream 的 binder对象负责与消息中间件交互.
所以,我们只需要搞清楚如何与 Spring Cloud Stream 交互就可以方便使用消息驱动的方式。
目前仅支持RabbitMQ、Kafka
通过使用Spring Integration来连接消息代理中间件以实现消息事件驱动。Spring Cloud Stream 为一些供应商的消息中间件产品提供了个性化的自动化配置实现,引用了发布-订阅、消费组、分区的三个核心概念。
使用框架
总结成一句话:屏蔽底层消息中间件的差异,降低切换成本,统一消息的编程模型
设计思想
cloud Stream设计
在没有绑定器这个概念的情况下,我们的SpringBoot应用要直接与消息中间件进行信息交互的时候,由于各消息中间件构建的初衷不同,它们的实现细节上会有较大的差异性,通过定义绑定器作为中间层,完美地实现了应用程序与消息中间件细节之间的隔离。Stream对消息中间件的进一步封装,可以做到代码层面对中间件的无感知,甚至于动态的切换中间件(rabbitmq切换为kafka),使得微服务开发的高度解耦,服务可以关注更多自己的业务流程。
通过stream连接RabbitMQ
<dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-stream-rabbit</artifactId> </dependency>
通过stream连接Kafka
<dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-stream-kafka</artifactId> </dependency>
Stream中的消息通信方式遵循了发布-订阅模式,Topic主题进行广播,在RabbitMQ就是Exchange,在Kakfa中就是Topic。
应用模型各组成
生产者配置bootstrap.yml
server: port: 8001 spring: application: name: cloud-stream-provider cloud: stream: binders: # 在此处配置需要绑定的rabbitMq的服务信息 defaultRabbit: ## 表示定义的名称,用于binding的整合 type: rabbit # 消息组件类型 environment: # 设置rabbitmq的相关的环境配置 spring: rabbitmq: host: localhost port: 5672 username: guest password: guest virtual-host: / bindings: ## 服务的整合处理 output: # 这个名字是一个通道的名称,消息发送方使用的output、消息接收方使用的是input destination: qyExchange # 表示要是用的exchange名称定义 content-type: application/json # 设置消息类型,本次为json,文本则设置为text/plain ## 设置要绑定的消息服务的具体设置 binder: defaultRabbit eureka: client: register-with-eureka: true #向注册中心注册自己 fetch-registry: true #从EurekaServer抓取已有的注册信息,集群必须设置成true,才能配合ribbon负载均衡 service-url: defaultZone: http://eureka7001.com:7001/eureka instance: instance-id: provider8001 #主机名称修改 prefer-ip-address: true #访问路径可以显示ip
注意:生产者配置的是output
启动类
@SpringBootApplication @EnableEurekaClient public class StreamProviderApplication { public static void main(String[] args) { SpringApplication.run(StreamProviderApplication.class, args); } }
生产者业务类
cloud-stream-provider工程业务类处理,模拟发送消息的业务
// controller @RestController public class SendMessageController { @Resource private IMessageProviderService messageProviderService; @GetMapping(value = "/sendMessage") public String sendMessage() { return messageProviderService.send(); } } // Service接口 public interface IMessageProviderService { String send() ; } // Service接口实现类 @Slf4j @EnableBinding(Source.class) //定义消息的推送管道,即:源 //将@EnableBinding注释应用于应用程序的配置类之一。@EnableBinding注释本身使用@Configuration进行元注释 /*此处不再需要引入 spring 注解 @Service,这里的业务实现类是与RabbitMQ配合的,使用的 SpringCloud Stream 的注解*/ public class MessageProviderServiceImpl implements IMessageProviderService { @Resource //在Spring Cloud Stream 1.0中,唯一支持的可绑定组件是Spring消息传递MessageChannel及其扩展名SubscribableChannel和PollableChannel private MessageChannel output; //消息发送管道 @Override public String send() { String serial = UUID.randomUUID().toString(); output.send(MessageBuilder.withPayload(serial).build()); log.info("*****serial:" + serial); return "RabbitMQ 消息发送方:" + serial; } }
Stream消息驱动之消费者
消费者配置bootstrap.yml
server: port: 8002 spring: application: name: cloud-stream-consumer cloud: stream: binders: # 在此处配置要绑定的 rabbitmq 的服务信息 defaultRabbit: # 表示定义的名称,用于 binding 整合 type: rabbit # 消息组件类型 environment: # 设置 rabbitmq 的相关的环境配置 spring: rabbitmq: host: localhost port: 5672 username: guest password: guest bindings: # 服务的整合处理 input: # 这个名字是一个通道的名称,消息发送方使用的output、消息接收方使用的是input destination: qyExchange # 表示要使用的 Exchange 名称定义 content-type: application/json #设置消息类型,本次为json,文本则设置 text/plain binder: defaultRabbit # 设置要绑定的消息服务的具体设置 group: consumerGroup eureka: client: register-with-eureka: true #向注册中心注册自己 fetch-registry: true #从EurekaServer抓取已有的注册信息,集群必须设置成true,才能配合ribbon负载均衡 service-url: defaultZone: http://eureka7001.com:7001/eureka instance: instance-id: consumer8002 #主机名称修改 prefer-ip-address: true #访问路径可以显示ip
注意:消费者配置的是input
消费者业务类
/** *增加订阅监听器 **/ @Component @EnableBinding(Sink.class) public class ReceiveMessageListenerController { @Value(value = "${server.port}") private String serverPort; @StreamListener(Sink.INPUT)//使用@StreamListener进行自动内容类型处理 //@StreamListener注释提供了一种更简单的处理入站邮件的模型,特别是在处理涉及内容类型管理和类型强制的用例时。 public void input(Message<String> message) { System.out.println("消费者 1 号,----->接收到的消息:" + message.getPayload() + "\t port:" + serverPort); } }
如何解决消息的重复消费?消息丢失?
当集群方式进行消息消费时,就会存在消息的重复消费问题。通过分组解决,只要是一个组的消费者,就处于竞争关系,一次只能有一个去消费,这就可以解决重复消费的问题了。而且分组(group)还解决了持久化的问题。
修改 8002(group1)、8003(group2) 的 yml 配置文件,添加group分组
在实际的生产过程中,一定要配置消息分组(group),以免造成服务宕机造成的消息丢失的问题