kafka概述

Apache Kafka是一个分布式发布 - 订阅消息系统和一个强大的队列,可以处理大量的数据,并使您能够将消息从一个端点传递到另一个端点。 Kafka适合离线和在线消息消费。 Kafka消息保留在磁盘上,并在集群内复制以防止数据丢失。 Kafka构建在ZooKeeper同步服务之上。 它与Apache Storm和Spark非常好地集成,用于实时流式数据分析。

以下是Kafka的几个好处

  • 可靠性 - Kafka是分布式,分区,复制和容错的。
  • 可扩展性 - Kafka消息传递系统轻松缩放,无需停机。
  • 耐用性 - Kafka使用"分布式提交日志",这意味着消息会尽可能快地保留在磁盘上,因此它是持久的。
  • 性能 - Kafka对于发布和订阅消息都具有高吞吐量。 即使存储了许多TB的消息,它也保持稳定的性能。

Kafka非常快,并保证零停机和零数据丢失。

kafka高可用

Kafka 一个最基本的架构认识:由多个 broker 组成,每个 broker 是一个节点;你创建一个 topic,这个 topic 可以划分为多个 partition,每个 partition 可以存在于不同的 broker 上,每个 partition 就放一部分数据。

这就是天然的分布式消息队列,就是说一个 topic 的数据,是分散放在多个机器上的,每个机器就放一部分数据

Kafka 0.8 以后,提供了 HA 机制,就是 replica(复制品) 副本机制。每个 partition 的数据都会同步到其它机器上,形成自己的多个 replica 副本。所有 replica 会选举一个 leader 出来,那么生产和消费都跟这个 leader 打交道,然后其他 replica 就是 follower。写的时候,leader 会负责把数据同步到所有 follower 上去,读的时候就直接读 leader 上的数据即可。只能读写 leader?很简单,要是你可以随意读写每个 follower,那么就要 care 数据一致性的问题,系统复杂度太高,很容易出问题。Kafka 会均匀地将一个 partition 的所有 replica 分布在不同的机器上,这样才可以提高容错性。

可靠性

消费端

关闭自动提交offset,手动提交 offset,就可以保证数据不会丢。但是此时还是可能会有重复消费,需要保证幂等性。

kafka

一般是要求起码设置如下 4 个参数:

  • 给 topic 设置 replication.factor 参数:这个值必须大于 1,要求每个 partition 必须有至少 2 个副本。
  • 在 Kafka 服务端设置 min.insync.replicas 参数:这个值必须大于 1,这个是要求一个 leader 至少感知到有至少一个 follower 还跟自己保持联系,没掉队,这样才能确保 leader 挂了还有一个 follower 。
  • 在 producer 端设置 acks=all :这个是要求每条数据,必须是写入所有 replica 之后,才能认为是写成功了
  • 在 producer 端设置 retries=MAX (很大很大很大的一个值,无限次重试的意思):这个是要求一旦写入失败,就无限重试,卡在这里了。

这样配置之后,至少在 Kafka broker 端就可以保证在 leader 所在 broker 发生故障,进行 leader 切换时,数据不会丢失。

生产者

如果按照上述的思路设置了 acks=all ,一定不会丢,要求是,你的 leader 接收到消息,所有的 follower 都同步到了消息之后,才认为本次写成功了。如果没满足这个条件,生产者会自动不断的重试,重试无限次。

发布 - 订阅消息的工作流程

以下是 Pub-Sub 消息的逐步工作流程

  • 生产者定期向主题发送消息。
  • Kafka 代理存储为该特定主题配置的分区中的所有消息。 它确保消息在分区之间平等共享。 如果生产者发送两个消息并且有两个分区,Kafka 将在第一分区中存储一个消息,在第二分区中存储第二消息。
  • 消费者订阅特定主题。
  • 一旦消费者订阅主题,Kafka 将向消费者提供主题的当前偏移,并且还将偏移保存在 Zookeeper 系统中。
  • 消费者将定期请求 Kafka (如100 Ms)新消息。
  • 一旦 Kafka 收到来自生产者的消息,它将这些消息转发给消费者。
  • 消费者将收到消息并进行处理。
  • 一旦消息被处理,消费者将向 Kafka 代理发送确认。
  • 一旦 Kafka 收到确认,它将偏移更改为新值,并在 Zookeeper 中更新它。 由于偏移在 Zookeeper 中维护,消费者可以正确地读取下一封邮件,即使在服务器暴力期间。
  • 以上流程将重复,直到消费者停止请求。
  • 消费者可以随时回退/跳到所需的主题偏移量,并阅读所有后续消息。

队列消息/用户组的工作流

在队列消息传递系统而不是单个消费者中,具有相同组 ID 的一组消费者将订阅主题。 简单来说,订阅具有相同 Group ID 的主题的消费者被认为是单个组,并且消息在它们之间共享。 让我们检查这个系统的实际工作流程。

  • 生产者以固定间隔向某个主题发送消息。
  • Kafka存储在为该特定主题配置的分区中的所有消息,类似于前面的方案。
  • 单个消费者订阅特定主题,假设 Topic-01 为 Group ID 为 Group-1 。
  • Kafka 以与发布 - 订阅消息相同的方式与消费者交互,直到新消费者以相同的组 ID 订阅相同主题Topic-01。
  • 一旦新消费者到达,Kafka 将其操作切换到共享模式,并在两个消费者之间共享数据。 此共享将继续,直到用户数达到为该特定主题配置的分区数。
  • 一旦消费者的数量超过分区的数量,新消费者将不会接收任何进一步的消息,直到现有消费者取消订阅任何一个消费者。 出现这种情况是因为 Kafka 中的每个消费者将被分配至少一个分区,并且一旦所有分区被分配给现有消费者,新消费者将必须等待。
  • 此功能也称为使用者组。 同样,Kafka 将以非常简单和高效的方式提供两个系统中最好的。

安装

# 安装zookeeper
docker run -d --restart=always --log-driver json-file --log-opt max-size=100m --log-opt max-file=2  --name zookeeper -p 2181:2181 -v /etc/localtime:/etc/localtime wurstmeister/zookeeper

# 安装kafka
docker run -d --restart=always --log-driver json-file --log-opt max-size=100m --log-opt max-file=2 --name kafka -p 9092:9092 -e KAFKA_BROKER_ID=0 -e KAFKA_ZOOKEEPER_CONNECT=192.168.x.x:2181/kafka -e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://192.168.x.x:9092 -e KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092 -v /etc/localtime:/etc/localtime wurstmeister/kafka

# 安装 kafka-manager
docker run -it -d -p 9000:9000 -e ZK_HOSTS="192.168.x.x:2181" -m 300m --memory-swap -1  --net=host sheepkiller/kafka-manager

使用

maven依赖

<dependency>
  <groupId>org.springframework.kafka</groupId>
  <artifactId>spring-kafka</artifactId>
</dependency>

生产者

@Bean
public NewTopic topic() {
    return TopicBuilder.name("topic1")
        .partitions(1)
        .replicas(1)
        .build();
}

@Bean
public ApplicationRunner runner(KafkaTemplate<String, String> template) {
    return args -> {
        template.send("topic1", "test");
    };
}

消费者

@Bean
public NewTopic topic() {
    return TopicBuilder.name("topic1")
        .partitions(10)
        .replicas(1)
        .build();
}

@KafkaListener(id = "myId", topics = "topic1")
public void listen(String in) {
    System.out.println(in);
}