springcloud之stream消息驱动

springcloud学习笔记,第十章,Stream消息驱动。

消息中间件有:ActiveMQ,RabbitMQ,RocketMQ,Kafka。

简介:

屏蔽底层消息中间件的差异,降低切换成本,统一消息的编程模型。

消息中间件有很多,如果一个项目用到了很多的消息中间件,那么在日常的切换、维护和开发上会很复杂。

一句话就是:屏蔽底层消息中间件的差异,降低切换成本,统一消息的编程模型。

设计思想:

标准MQ:

  • 生产者、消费者之间靠消息媒介传递信息Message内容
  • 消息必须走特定的通道。消息通道MessageChannel
  • 消息通道里的消息如何被消费,谁负责收发处理?,消息通道MessageChannel的子接口SubscribableChannel,由MessageHandler消息处理器订阅

Spring Cloud Stream是怎么屏蔽底层差异的?

绑定器Binder:

通过定义绑定器Binder作为中间层,实现了应用程序与消息中间件之间的隔离。

Spring Cloud Stream的业务流程:

类似flume中的channel,source,sink 估计是借鉴(抄袭)的source用于获取数据(要发送到mq的数据)。channel类似SpringCloudStream中的中间件,用于存放source接收到的数据,或者是存放binder拉取的数据。

Stream的编码API和常用注解:

案例说明:

  • 首先RabbitMQ环境已经OK了
  • 工程中新建三个子模块:
    • cloud-stream-rabbitmq-provider8801,作为生成者进行发消息模块。
    • cloud-stream-rabbitmq-consumer8802,作为生成者进行发消息模块。
    • cloud-stream-rabbitmq-consumer8803,作为生成者进行发消息模块。

消息驱动之生产者:

  1. 新建moudle,cloud-stream-rabbitmq-provider8801

  2. POM:

    1
    2
    3
    4
    5
    <!--stream,rabbitMQ-->
    <dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
    </dependency>
  3. YML:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    server:
    port: 8801

    spring:
    application:
    name: cloud-stream-provider
    cloud:
    stream:
    binders: # 在此处配置要绑定的rabbitmq的服务信息;
    defaultRabbit: # 表示定义的名称,用于于binding整合
    type: rabbit # 消息组件类型
    environment: # 设置rabbitmq的相关的环境配置
    spring:
    rabbitmq:
    host: localhost
    port: 5672
    username: guest
    password: guest
    bindings: # 服务的整合处理
    output: # 这个名字是一个通道的名称
    destination: studyExchange # 表示要使用的Exchange名称定义
    content-type: application/json # 设置消息类型,本次为json,文本则设置“text/plain”
    binder: defaultRabbit # 设置要绑定的消息服务的具体设置

    eureka:
    client: # 客户端进行Eureka注册的配置
    service-url:
    defaultZone: http://localhost:7001/eureka
    instance:
    lease-renewal-interval-in-seconds: 2 # 设置心跳的时间间隔(默认是30秒)
    lease-expiration-duration-in-seconds: 5 # 如果现在超过了5秒的间隔(默认是90秒)
    instance-id: send-8801.com # 在信息列表时显示主机名称
    prefer-ip-address: true # 访问的路径变为IP地址
  4. 主启动类StreamMQMain8801:

    1
    2
    3
    4
    5
    6
    7
    8
    @SpringBootApplication
    public class StreamMQMain8801
    {
    public static void main(String[] args)
    {
    SpringApplication.run(StreamMQMain8801.class,args);
    }
    }
  5. 业务类:

    • service:写一个接口与实现类
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    @EnableBinding(Source.class) //定义消息的推送管道
    public class MessageProviderImpl implements IMessageProvider
    {
    @Resource
    private MessageChannel output; // 消息发送管道

    @Override
    public String send()
    {
    String serial = UUID.randomUUID().toString();
    output.send(MessageBuilder.withPayload(serial).build());
    System.out.println("*****serial: "+serial);
    return null;
    }
    }

    • controller:
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    @RestController
    public class SendMessageController
    {
    @Resource
    private IMessageProvider messageProvider;

    @GetMapping(value = "/sendMessage")
    public String sendMessage()
    {
    return messageProvider.send();
    }

    }
  6. 测试:

    • 启动eureka,rabbitMQrabbitmq-server.bat,stream8801。访问http://localhost:8801/sendMessage

消息驱动之消费者:

消费者8802:

  1. 新建moudle,cloud-stream-rabbitmq-consumer8802

  2. POM:

    1
    2
    3
    4
    5
    <!--stream,rabbitMQ-->
    <dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
    </dependency>
  3. YML:

  4. 主启动类StreamMQMain8802:

  5. 业务类:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    @Component
    @EnableBinding(Sink.class)
    public class ReceiveMessageListenerController
    {
    @Value("${server.port}")
    private String serverPort;


    @StreamListener(Sink.INPUT)
    public void input(Message<String> message)
    {
    System.out.println("消费者1号,----->接受到的消息: "+message.getPayload()+"\t port: "+serverPort);
    }
    }
  6. 测试:测试8801发送,8802接受消息,localhost:8801/sendMessage,8802控制台输出:

Stream之消息重复消费:

依照8802,clone一份8803,cloud-stream-rabbitmq-consumer8803

  1. 启动RabbitMQ,7001进行服务注册,8801进行消息生产。
  2. 8802在消息消费
  3. 8803也在消息消费

那么,运行后就有两个问题:重复消费和消息持久化问题。

如何解决重复消费问题:

分组消费与持久化:

将众多消费者放进同一个消费者组,那么就可以解决重复消费的问题。

原理:

微服务应用放置于同一个group中,就能够保证消息只会被其中一个应用消费一次,不同的组是可以消费的,同一个组发生竞争关系,只有其中一个会被消费。那么我们来试验下以下两种情况:

8802和8803分成不同的组:

  1. 8802改YML:

  2. 8803改YML:

  3. 结论

    • 当8801发送消息时,消费者同时在接受消息。出现了消息的重复消费。

8802和8803在相同的组:

  1. 都在aaron组

  2. 结论

    • 当8801发送消息时,只有一个消费者会接受消息,且是轮询接收的。

消息持久化:

那么消息持久化怎么实现?看例子:

  1. 停止8802/8803并去掉8802分组的group: aarona,8803的不用去。
  2. 8801先发送4条消息到rabbitmq
  3. 先启动8802.无分组属性配置,后台没有打印出来消息
  4. 先启动8803,有分组属性配置,后台打印出来了MQ上的消息。
  • 版权声明: 本博客所有文章除特别声明外,著作权归作者所有。转载请注明出处!

请我喝杯咖啡吧~

支付宝
微信