Spring Cloud Stream在 Spring Cloud 体系内用于构建高度可扩展的基于事件驱动的微服务,其目的是为了简化消息在 Spring Cloud 应用程序中的开发。
使用
依赖
<!-- stream-->
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-stream-rabbit</artifactId>
</dependency>
生产者
配置
#对接具体的消息中间件
spring.cloud.stream.binders.test01.type=rabbit
spring.cloud.stream.binders.test01.environment.spring.rabbitmq.host=${spring.rabbitmq.host}
spring.cloud.stream.binders.test01.environment.spring.rabbitmq.port=${spring.rabbitmq.port}
spring.cloud.stream.binders.test01.environment.spring.rabbitmq.username=${spring.rabbitmq.username}
spring.cloud.stream.binders.test01.environment.spring.rabbitmq.password=${spring.rabbitmq.password}
spring.cloud.stream.binders.test01.environment.spring.rabbitmq.virtual-host=/
#消息生产者
#其中utput是一个key,这个名字是一个通道的名称,在代码中会用到
#destination表示要使用的Exchange名称定义
spring.cloud.stream.bindings.test01output.destination=spring.cloud.stream.exchange
#设置要绑定的消息服务的binder
spring.cloud.stream.bindings.test01output.binder=test01
自定义消息通道
/**
* @author Ybond
*/
public interface SendMsgOutput {
/**
* 这里对应的是通道名称,不是通道的路径
*/
String OUTPUT = "test01output";
@Output(OUTPUT)
MessageChannel output();
}
发送消息
/**
* @author Ybond
*/
@EnableBinding(SendMsgOutput.class)
@RequiredArgsConstructor
public class SendMsgService {
private final MessageChannel messageChannel;
public void send(String message) {
messageChannel.send(MessageBuilder.withPayload(message).build());
StaticLog.info("发送消息:" + message);
}
}
消费者
配置
#对接具体的消息中间件
spring.cloud.stream.binders.test01.type=rabbit
spring.cloud.stream.binders.test01.environment.spring.rabbitmq.host=${spring.rabbitmq.host}
spring.cloud.stream.binders.test01.environment.spring.rabbitmq.port=${spring.rabbitmq.port}
spring.cloud.stream.binders.test01.environment.spring.rabbitmq.username=${spring.rabbitmq.username}
spring.cloud.stream.binders.test01.environment.spring.rabbitmq.password=${spring.rabbitmq.password}
spring.cloud.stream.binders.test01.environment.spring.rabbitmq.virtual-host=/
#配置消息消费者
#指定交换机
spring.cloud.stream.bindings.test01input.destination=spring.cloud.stream.exchange
#设置要绑定的消息服务的binder
spring.cloud.stream.bindings.test01input.binder=test01
消息通道
/**
* @author Ybond
*/
public interface ReceiveMsg {
String INPUT = "test01input";
@Input(INPUT)
SubscribableChannel input();
}
接收消息
/**
* @author Ybond
*/
@EnableBinding(ReceiveMsg.class)
public class MessageReceive {
@StreamListener(ReceiveMsg.INPUT)
public void receive(String message) {
StaticLog.info("接收消息:" + message);
}
}
分组和持久化
配置分组,消息就会持久化,分组配置在消费者
spring.cloud.stream.bindings.test01output.binder=test01
队列名会变成destination+group
没有做分组时,一个消息可以被多个消费者接收,分组可以让一个消息只能被一个消费者接收,避免一个消息被多个消费者消费;
当项目集群部署了很多份,那么就会变成多个消费者,但是业务可能需要的是一个消息只消费一次,所以此时需要加个分组,就可以实现同一个分组里面的消费者只会有一个消费者能接收到消息;
注意:
1、不分组的话,消费者要先启动起来,然后再用生产者发送消息,这样才可以接收到消息,否则发送的消息就丢失了,生产者先发了消息,消费者后面才启动的话是接收不到消息的;
2、不分组的话,多个消费者都能接收消息,也就是一个消息可以被多个消费者接收;
设置路由键
默认情况下Spring Cloud Stream传送消息属于广播消息,默认匹配方式是 #,表示所有消费者都可以匹配上,我们也可以通过指定路由键 RoutingKey实现按需求匹配消息消费端进行消息接收处理;
在消费端进行设置
# 设置一个RoutingKey路由key,默认是#,我们可以指定spring.cloud.stream.rabbit.bindings.myInput.consumer.bindingRoutingKey=spring.cloud.stream.#