Featured image of post Java工程师 中间件 Kafka的使用

Java工程师 中间件 Kafka的使用

🌏Java工程师 Kafka的使用 🎯 这篇文章用于记录 Kafka的使用

windows系统操作Kafka

官网下载Kafka安装包

安装Kafka

(1)解压到图中目录

(2)配置zookeeper的数据存放地

(3)配置kafka的日志存放地

启动Kafka

(1)先在Kafka的安装目录下启动zookeeper

./bin/windows/zookeeper-server-start.bat ./config/zookeeper.properties

解决方法?将kafka直接解压缩在E盘启动

(2)将kafka直接解压缩在D盘 启动zookeeper

(2.1)重新安装

(2.2)重新配置目录

(2.3)启动zookeeper 这次启动成功

(3)启动Kafka

./bin/windows/kafka-server-start.bat ./config/server.properties

报错端口被占用

解决办法:将2181的进程kill掉(Linux)

windows如何kill?windows kill进程

netstat -ano

查询对应的服务 tasklist | findstr "12428"

kill这个服务 taskkill -PID 12428 -F

重新启动kafka:./bin/windows/kafka-server-start.bat ./config/server.properties

启动成功

命令行实操

创建主题topic

./bin/windows/kafka-topics.bat --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic test

这行命令是用于Apache Kafka的一个命令行工具,具体是kafka-topics.bat(在Windows环境下)。这个命令的作用是创建一个新的Kafka主题(topic),并设置其相关参数。下面是对这个命令各部分的详细解释:

  1. ./bin/windows/kafka-topics.bat:这是Kafka为Windows提供的命令行工具的路径。通过这个脚本,用户可以执行与Kafka主题相关的各种操作。
  2. --create:这个参数指示我们要创建一个新的Kafka主题。
  3. --bootstrap-server localhost:9092:这个参数指定了Kafka集群的地址。在这里,localhost:9092表示Kafka集群正在本地机器上运行,并且 ** 9092端口。Kafka客户端(包括这个命令行工具)会使用这个地址来连接到Kafka集群。
  4. --replication-factor 1:这个参数设置了主题的副本因子(replication factor)。副本因子决定了主题的每个分区(partition)应该有多少个副本。在这个例子中,每个分区只有一个副本。通常,为了提高数据的可靠性和系统的容错性,会设置副本因子大于1。
  5. --partitions 1:这个参数设置了主题的分区数。分区是Kafka实现高吞吐量的关键之一,它允许并行读写操作。在这个例子中,主题只有一个分区。
  6. --topic test:这个参数指定了要创建的主题的名称,这里是“test”。

这行命令的作用是:在本地运行的Kafka集群上创建一个名为“test”的新主题,该主题有一个分区和一个副本。

在实际生产环境中,通常会设置更高的副本因子和更多的分区,以提高系统的可靠性和吞吐量。

列出所有的topic

./bin/windows/kafka-topics.bat --list --bootstrap-server localhost:9092

使用终端进行生产

./bin/windows/kafka-console-producer.bat --broker-list localhost:9092 --topic test

使用终端进行消费

./bin/windows/kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic test --from-beginning #从头开始读取

上面生产,下面消费

java操作Kakfa

(1)依赖

<!-- 20230613 加上kafka的依赖 -->
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>

(2)配置

spring:
  kafka:
    bootstrap-servers: localhost:9092
    consumer:
      group-id: test-consumer-group
      enable-auto-commit: true # 是否自动提交偏移量offset->下标索引
      auto-commit-interval: 3000 # 自动提交的频率

(3)定义生产者

@Component
public class EventProducer {
    @Autowired
    private KafkaTemplate kafkaTemplate;
    // 处理事件
    public void fireEvent(Event event){
        // 将事件发布到指定的主题
        kafkaTemplate.send(event.getTopic(), JSONObject.toJSONString(event));
        System.out.println("C EventProducer M fireEvent(Event event) -> 发出event = " + JSONObject.toJSONString(event));
    }
}

(4)定义消费者

@Component
public class EventConsumer {
    /**
     * 监听评论、点赞、关注三个主题
     */
    @KafkaListener(topics = {CommunityConstant.TOPIC_COMMENT,
        CommunityConstant.TOPIC_LIKE, CommunityConstant.TOPIC_FOLLOW})
    public void handleCommentMessage(ConsumerRecord record) {
        if (record == null || record.value() == null) {
            logger.error("消息的内容为空!");
            return;
        }
        // 转换成消息对象进行处理
        Event event = JSONObject.parseObject(record.value().toString(), Event.class);
        if (event == null) {
            logger.error("消息格式错误!");
            return;
        }
        // 处理event
        ...event
    }
}

kafka常见术语

  1. Broker:这是Kafka集群中的一个或多个节点。Kafka的集群架构可以看作是由多个Broker组成,它们一起处理生产者发布的消息,并且向消费者提供服务。每个Broker在Kafka中都有一个唯一的ID。
  2. Topic:这是一个特定的消息类别,类似于消息队列中的一个队列或者一个数据库的表名。Kafka的每个Topic都分为多个Partition,生产者将数据发布到特定的Topic中,而消费者从Topic中消费数据。物理上,不同Topic的消息是分开存储的;但在逻辑上,一个Topic的消息可以被一个或多个Consumer Group同时消费。
  3. Producer:也被称为数据的发布者。该角色负责将数据发送到Kafka的Broker上,每条发送的数据都必须有一个特定的Topic,Broker在接收到Producer发送的消息后,会将数据追加到特定Partition的末尾。生产者可以指定数据发送到哪个Partition。
  4. Consumer:从Broker中读取数据的角色,消费者可以从一个或多个Topic中读取数据。Consumer读取数据时,需要指定特定的Consumer Group。一个Topic的消息可以被一个或多个Consumer Group消费,但是Consumer Group中的Consumer需要竞争消费同一个Topic的数据。
  5. Partition:Partition是Kafka保证数据顺序性和一致性的单位,同时也是Kafka实现分布式、高并发的手段。Kafka中一个Topic被分为多个Partition,每个Partition只能被一个Consumer Group中的一个Consumer消费。一个Consumer Group可以同时消费多个Partition。Partition的数据是以日志文件的形式保存在硬盘上。
  6. Offset:这是一个记录数据位置的指标,每消费一条消息,Offset的值就会增加。消费者可以指定Offset的位置开始消费,这意味着消费者可以控制从哪个位置开始读取消息。Kafka通过这种方式保证消息的有序性和一致性。
  7. Replica(副本):Kafka为了防止数据丢失和故障转移,会为每个Partition在多个Broker上创建Replica(副本),这些Replica中只有一个是Leader,其他的是Follower。当Leader发生故障时,会选出一个新的Leader来处理数据读写操作。
  8. ZooKeeper:ZooKeeper是Kafka中用于管理集群状态、配置信息的组件。在Kafka集群中,每个Broker启动时都会在ZooKeeper上注册自己的信息,并通过ZooKeeper来获取其他Broker的信息,以便形成一个集群。此外,Kafka也会使用ZooKeeper来记录消费者的偏移量等元数据信息。

假设有一个电商网站,用户在下单时会产生订单数据。这些订单数据需要被实时处理,以便进行后续的分析和统计。

  • Broker:电商网站搭建了一个Kafka集群,其中包含多个Broker节点,用于处理订单数据的读写请求。
  • Topic:电商网站定义了一个名为“orders”的Topic,用于存储订单数据。
  • Producer:电商网站的后端系统作为Producer,将用户下单时产生的订单数据发送到“orders”Topic中。
  • Consumer:电商网站的数据分析系统作为Consumer,订阅了“orders”Topic,实时拉取订单数据进行处理。
  • Partition:“orders”Topic被分为多个Partition,每个Partition存储一部分订单数据,以实现负载均衡和并行处理。
  • Offset:每条订单数据在Partition中都有一个唯一的Offset,Consumer通过追踪Offset来确保能够按顺序处理每条订单数据,并且不会重复处理或遗漏处理。
  • Replica:为了保证数据的可靠性和容错性,每个Partition都有多个Replica,其中一个被选举为Leader,负责处理读写请求,其他为Follower,用于备份数据。如果Leader出现故障,ZooKeeper会协调选举一个新的Leader。
  • ZooKeeper:Kafka集群中的ZooKeeper服务负责维护集群的状态信息,确保Broker之间的协调一致,以及Partition的Leader选举等操作的正确性。
Licensed under CC BY-NC-SA 4.0
最后更新于 2023年3月1日