Apache Kafka practical tutorial


What is Apache Kafka?

In scenarios that generate millions of events per second, such as the Double 11 (Singles' Day) e-commerce sales or bullet comments on live streams, Apache Kafka often serves as both a traffic buffer and an event pipeline. It is an open-source distributed event streaming platform. Its core strengths are high throughput (up to millions of messages per second on a suitably sized cluster), low latency, durable storage, horizontal scalability, and client libraries for many programming languages.

Common application scenarios include: log aggregation, event sourcing, stream processing, data synchronization between systems, and asynchronous message queues.


1. Kafka quick installation (Docker Compose single node)

Docker Compose is the easiest way for beginners to start everything with one command: ZooKeeper (the metadata store that older Kafka versions depend on), a single-node Kafka broker, and Kafka Manager for visual monitoring:

# Save as docker-compose.yml
version: '3.8'

services:
  zookeeper:
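    image: confluentinc/cp-zookeeper:7.5.0 # pinned: ZooKeeper mode was removed in Kafka 4.0, so use a 7.x image
    container_name: zookeeper
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
    ports:
      - "2181:2181"

  kafka:
    image: confluentinc/cp-kafka:7.5.0 # keep the same version as cp-zookeeper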
    container_name: kafka
    depends_on:
      - zookeeper
    ports:
      - "9092:9092"
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
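      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092 # address reachable from the host machine
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1 # a single node can only hold 1 replica
      KAFKA_AUTO_CREATE_TOPICS_ENABLE: 'true' # convenient for beginners: topics are created automatically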

  kafka-manager:
    image: hlebalbau/kafka-manager:latest
    container_name: kafka-manager
    depends_on:
      - zookeeper
    ports:
      - "9000:9000"
    environment:
      ZK_HOSTS: zookeeper:2181

Start the service:

docker-compose up -d

Verify:

  • Open http://localhost:9000 in a browser to reach Kafka Manager, then add a cluster (enter zookeeper:2181 as the ZK address)
  • The command-line operations below can be run either with the Kafka tools installed on the host machine or inside the container (docker exec -it kafka /bin/bash)
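
Before opening Kafka Manager, it can help to confirm that the published ports are actually reachable from the host. The following is a minimal sketch using only the Python standard library. The helper name `port_open` is hypothetical, and a successful TCP connect only shows that something is listening on the port, not that the broker is healthy.

```python
import socket


def port_open(host: str, port: int, timeout: float = 2.0) -> bool:
    """Return True if a TCP connection to host:port succeeds within timeout."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Covers refused connections, timeouts, and unreachable hosts
        return False


if __name__ == "__main__":
    # Ports published by the docker-compose.yml above
    for name, port in [("zookeeper", 2181), ("kafka", 9092), ("kafka-manager", 9000)]:
        state = "reachable" if port_open("localhost", port) else "NOT reachable"
        print(f"{name:<14} localhost:{port} {state}")
```

If a port is reported as not reachable, `docker-compose ps` and `docker-compose logs kafka` are the usual next places to look.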

2. Core concepts (1 minute to understand)

| Concept | Plain-language explanation |
| --- | --- |
| Producer | The person/system that sends messages (for example, the order system sends an "order created" event) |
| Consumer | The person/system that receives messages (for example, the inventory system receives "order created" and reduces stock) |
| Topic | A category label for messages (for example, order-events) |
| Partition | A physical subdivision of a topic (similar to splitting a folder into subfolders; it improves read and write performance) |
| Broker | A single Kafka server node |
| Consumer Group | A group of several consumers that provides load balancing (each message is delivered to only one consumer in the group) and fault tolerance (if one consumer fails, another takes over) |
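
To make the Consumer Group idea concrete, here is a toy model in plain Python (no broker needed). It assumes a simple round-robin split of partitions across group members. Real Kafka assignors (range, round-robin, sticky and so on) are configurable and differ in detail, so `assign_partitions` is a hypothetical illustration only.

```python
from collections import defaultdict


def assign_partitions(partitions, consumers):
    """Spread partitions over consumers round-robin; each partition gets exactly one owner."""
    members = sorted(consumers)
    if not members:
        return {}
    assignment = defaultdict(list)
    for i, partition in enumerate(sorted(partitions)):
        assignment[members[i % len(members)]].append(partition)
    return dict(assignment)


partitions = [0, 1, 2]  # e.g. a topic with 3 partitions

# Two consumers in the same group share the load
print(assign_partitions(partitions, ["c1", "c2"]))  # {'c1': [0, 2], 'c2': [1]}

# If c2 fails, a rebalance hands its partitions to the survivor
print(assign_partitions(partitions, ["c1"]))  # {'c1': [0, 1, 2]}
```

Because each partition has a single owner within a group, consumers beyond the partition count receive nothing: in this model, `assign_partitions([0], ["c1", "c2"])` leaves `c2` idle.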

3. Topic management and command line testing

3.1 Topic management (common commands)

# Create a topic with 3 partitions and a single replica (auto-creation can be turned off; manual creation gives more control)
kafka-topics --create --topic order-events --bootstrap-server localhost:9092 --partitions 3 --replication-factor 1

# List all topics
kafka-topics --list --bootstrap-server localhost:9092

# Show topic details (partition count, replicas, leader, etc.)
kafka-topics --describe --topic order-events --bootstrap-server localhost:9092

# Increase the partition count (partitions can only be added, never removed!)
kafka-topics --alter --topic order-events --partitions 6 --bootstrap-server localhost:9092

3.2 Command line production and consumption test

# Open a producer terminal (each line you type and submit with Enter is one message)
kafka-console-producer --bootstrap-server localhost:9092 --topic order-events

# Open a consumer in another terminal (--from-beginning reads from the earliest message)
kafka-console-consumer --bootstrap-server localhost:9092 --topic order-events --from-beginning

4. Python and Kafka integration (using the widely used kafka-python library)

4.1 Install dependencies

pip install kafka-python

4.2 Producer and consumer wrapper classes (ready to use)

from kafka import KafkaProducer, KafkaConsumer
import json
import logging

# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# ------------------- Producer -------------------
class SimpleKafkaProducer:
    def __init__(self, bootstrap_servers=["localhost:9092"]):
        self.producer = KafkaProducer(
            bootstrap_servers=bootstrap_servers,
            value_serializer=lambda v: json.dumps(v).encode("utf-8"),  # serialize values as JSON
            acks=1,  # leader acknowledgement only (the int 1, not the string "1"); balances speed and reliability
            retries=3,  # retry failed sends up to 3 times
            linger_ms=5,  # wait up to 5 ms to fill a batch, improving throughput
        )

    def send(self, topic: str, value: dict, key: str = None):
        """Send one message. The key pins messages to a partition (e.g. all events of one user go to the same partition)."""
        try:
            future = self.producer.send(
                topic,
                value=value,
                key=key.encode("utf-8") if key else None
            )
            # Block until the send completes (optional; production code can stay asynchronous)
            record_metadata = future.get(timeout=10)
            logger.info(
                f"Message sent: topic={record_metadata.topic}, partition={record_metadata.partition}, offset={record_metadata.offset}"
            )
        except Exception as e:
            logger.error(f"Failed to send message: {e}")
            raise

    def close(self):
        self.producer.flush()  # flush buffered messages
        self.producer.close()

# ------------------- Consumer -------------------
class SimpleKafkaConsumer:
    def __init__(self, topics: list, group_id: str, bootstrap_servers=["localhost:9092"]):
        self.consumer = KafkaConsumer(
            *topics,
            bootstrap_servers=bootstrap_servers,
            group_id=group_id,
            value_deserializer=lambda m: json.loads(m.decode("utf-8")),  # deserialize values from JSON
            auto_offset_reset="earliest",  # a new group starts from the earliest message
            enable_auto_commit=True,  # commit offsets automatically (manual commits are advisable in production)
            auto_commit_interval_ms=1000,
        )

    def consume(self, callback):
        """Consume in a loop, passing each message value to the callback."""
        logger.info(f"Starting consumption: topics={self.consumer.subscription()}, group={self.consumer.config['group_id']}")
        try:
            for msg in self.consumer:
                callback(msg.value)
        except KeyboardInterrupt:
            logger.info("Consumption stopped")
        finally:
            self.consumer.close()

# ------------------- Usage example -------------------
if __name__ == "__main__":
    # 1. Send an order message
    producer = SimpleKafkaProducer()
    producer.send(
        topic="order-events",
        value={"order_id": 1001, "user_id": 123, "product_id": 456, "amount": 99.9},
        key=str(123),  # orders from the same user go to the same partition
    )
    producer.close()

    # 2. Consume order messages (run in a separate terminal)
    # def process_order(msg):
    #     logger.info(f"Order received: {msg}")
    # consumer = SimpleKafkaConsumer(topics=["order-events"], group_id="order-consumer-1")
    # consumer.consume(process_order)
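
The consumer above enables auto-commit and notes that manual commits are preferable in production. A minimal sketch of that pattern follows, assuming a consumer created with `enable_auto_commit=False`. The function `consume_with_manual_commit` and its `max_messages` parameter are hypothetical. It relies only on iterating the consumer and calling its `commit()` and `close()` methods, which kafka-python's `KafkaConsumer` provides. Committing after the callback gives at-least-once delivery: if the process dies between processing and commit, the message is processed again after restart.

```python
import logging

logger = logging.getLogger(__name__)


def consume_with_manual_commit(consumer, callback, max_messages=None):
    """Process each message, then commit its offset (at-least-once delivery).

    `consumer` is expected to behave like kafka-python's KafkaConsumer created
    with enable_auto_commit=False: iterable, with commit() and close() methods.
    `max_messages` is an illustrative knob for stopping after N messages.
    """
    handled = 0
    try:
        for msg in consumer:
            try:
                callback(msg.value)
            except Exception:
                # Nothing is committed for this message, so it is read again
                # after a restart or rebalance
                logger.exception("Processing failed at offset %s", msg.offset)
                raise
            consumer.commit()  # synchronous commit of the consumed position
            handled += 1
            if max_messages is not None and handled >= max_messages:
                break
    finally:
        consumer.close()
    return handled


# Usage (requires kafka-python and a running broker):
# from kafka import KafkaConsumer
# consumer = KafkaConsumer("order-events", group_id="order-consumer-1",
#                          bootstrap_servers=["localhost:9092"],
#                          enable_auto_commit=False)
# consume_with_manual_commit(consumer, print)
```

Committing after every message costs one broker round trip per message. Committing every N messages or every few seconds is a common compromise, at the price of more reprocessing after a crash.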

5. Practical best practices (simplified but core)

5.1 Partition strategy

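  • **How to determine the number of partitions?** A rule of thumb: number of partitions ≈ target throughput / single-partition throughput. (A single partition can typically handle on the order of 100,000 to 1,000,000 messages per second, depending on message size and hardware.)
  • **How to control which partition a message goes to?** Specify a key (messages with the same key go to the same partition, which preserves their order), or leave the key unset and let the client spread messages across partitions (round-robin or a similar strategy, depending on the client).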
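
The sizing rule and the key-to-partition behaviour can be sketched in a few lines. All throughput numbers below are made-up inputs, and `crc32` only stands in for the real hash: Kafka clients use their own hash function (murmur2 in the Java client and in kafka-python), so the partition numbers computed here will not match a real cluster. The point is only that the mapping is deterministic for a fixed partition count. Both function names are hypothetical.

```python
import math
import zlib


def estimate_partitions(target_per_sec: float, per_partition_per_sec: float) -> int:
    """partitions ≈ target throughput / single-partition throughput, rounded up."""
    return max(1, math.ceil(target_per_sec / per_partition_per_sec))


def toy_partition_for(key: str, num_partitions: int) -> int:
    """Illustrative stand-in for a keyed partitioner: hash(key) mod partition count."""
    return zlib.crc32(key.encode("utf-8")) % num_partitions


# Hypothetical target of 500,000 msg/s with 100,000 msg/s measured per partition
print(estimate_partitions(500_000, 100_000))  # 5

# Same key, same partition count -> same partition, which preserves per-key order
print(toy_partition_for("123", 6) == toy_partition_for("123", 6))  # True
```

Because the mapping depends on the partition count, adding partitions later can send an existing key to a different partition, so per-key ordering is not guaranteed across that change. That is one reason to size partitions with some headroom up front.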

5.2 Reliability Tuning

| Scenario | acks value | Description |
| --- | --- | --- |
| Log aggregation (small losses acceptable) | 0 | Does not wait for any acknowledgement; fastest |
| Orders, payments (must not lose data) | all (or -1) | Waits for all ISR (in-sync replicas) to acknowledge |
| General business (balanced) | 1 (default in kafka-python) | Waits for the leader's acknowledgement |
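
One way to keep these choices explicit in code is a small lookup of producer settings per scenario. The profile names and the `producer_settings` helper are hypothetical. `acks` and `retries` are real kafka-python `KafkaProducer` keyword arguments, and the retry counts are illustrative rather than recommended values.

```python
# Scenario -> KafkaProducer keyword arguments (profile names are illustrative)
RELIABILITY_PROFILES = {
    "logs": {"acks": 0, "retries": 0},          # fire and forget, may lose data
    "general": {"acks": 1, "retries": 3},       # leader acknowledgement
    "critical": {"acks": "all", "retries": 5},  # all in-sync replicas acknowledge
}


def producer_settings(scenario: str, **overrides) -> dict:
    """Return a copy of the profile for `scenario`, with optional overrides applied."""
    if scenario not in RELIABILITY_PROFILES:
        raise ValueError(f"unknown scenario: {scenario!r}")
    return {**RELIABILITY_PROFILES[scenario], **overrides}


print(producer_settings("critical"))  # {'acks': 'all', 'retries': 5}

# With kafka-python installed and a broker running, the result could be passed on:
# KafkaProducer(bootstrap_servers=["localhost:9092"], **producer_settings("critical"))
```

Note that acks=all is only as strong as the topic's replication. On the single-node setup from section 1, with a replication factor of 1, the leader is the only in-sync replica, so acks=all behaves the same as acks=1.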

5.3 Performance Tuning

  • Producer: tune linger_ms (time to wait while collecting a batch), batch_size (batch size, default 16 KB), and compression_type (snappy or lz4 compression reduces network traffic)
  • Consumer: tune max_poll_records (records returned per poll, default 500) and fetch_min_bytes (minimum bytes the broker waits for before responding, default 1 byte)
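
As a starting point, the parameters above might be set like this with kafka-python. The specific values are assumptions chosen for illustration, not benchmarked recommendations. The keyword names match kafka-python's `KafkaProducer` and `KafkaConsumer` arguments. The `batch_fill_time_ms` helper is hypothetical and only estimates whether `linger_ms` or `batch_size` is likely to trigger a send first.

```python
PRODUCER_TUNING = {
    "linger_ms": 20,            # wait up to 20 ms to fill a batch (kafka-python default: 0)
    "batch_size": 64 * 1024,    # bytes per partition batch (default: 16 KB)
    "compression_type": "lz4",  # lz4/snappy need the matching Python package; gzip does not
}

CONSUMER_TUNING = {
    "max_poll_records": 1000,       # records returned per poll (default: 500)
    "fetch_min_bytes": 64 * 1024,   # broker waits for this much data (default: 1)
    "fetch_max_wait_ms": 500,       # ...but never longer than this
}


def batch_fill_time_ms(msg_bytes: int, msgs_per_sec: float, batch_size: int) -> float:
    """Milliseconds needed to fill one batch at a steady per-partition message rate."""
    return batch_size * 1000 / (msg_bytes * msgs_per_sec)


# 1 KB messages at 1,000 msg/s per partition need 64 ms to fill a 64 KB batch,
# so with linger_ms=20 the linger timer fires first and batches go out partly full
print(batch_fill_time_ms(1024, 1000, PRODUCER_TUNING["batch_size"]))  # 64.0

# With kafka-python installed and a broker running:
# KafkaProducer(bootstrap_servers=["localhost:9092"], **PRODUCER_TUNING)
# KafkaConsumer("order-events", bootstrap_servers=["localhost:9092"], **CONSUMER_TUNING)
```

Larger batches and longer linger times trade latency for throughput, so it is worth measuring both before and after changing these values.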

Summary

Apache Kafka is a common choice for building real-time event systems. With the Docker Compose quick start, the core concepts, the Python integration, and the best practices covered here, you can handle most business scenarios. For a production-grade cluster, scale out to at least 3 broker nodes (so that replicas can provide fault tolerance) and set up monitoring and alerting.