Python Kafka
kafka是一个分布式消息队列。具有高性能、持久化、多副本备份、横向扩展能力。生产者往队列里写消息,消费者从队列里取消息进行业务逻辑。一般在架构设计中起到解耦、削峰、异步处理的作用。
Confluent kafka client
Reference
https://anaconda.org/conda-forge/python-confluent-kafka
https://docs.confluent.io/current/clients/confluent-kafka-python/
https://github.com/confluentinc/confluent-kafka-python
https://kafka.apache.org/quickstart
Envs
- anaconda 4.5.11
- python 3.6
- python-confluent-kafka 0.11.4
Prepare
建立新的python3环境
conda create --name confluent-kafka-env python=3.6
conda activate confluent-kafka-env
通过
conda
安装 confluent kafkaconda install python-confluent-kafka
下载并启动kafka server
wget http://mirrors.hust.edu.cn/apache/kafka/2.1.0/kafka_2.11-2.1.0.tgz && tar -xzf kafka_2.11-2.1.0.tgz
cd kafka_2.11-2.1.0
bin/zookeeper-server-start.sh config/zookeeper.properties
bin/kafka-server-start.sh config/server.properties
Test
https://github.com/Murugar/ConfluentPythonKafka.git
Producer.py
1 | from time import gmtime, strftime, sleep |
Consumer.py
1 | from confluent_kafka import Consumer, KafkaError |
运行python
python Producer.py
python Consumer.py
consumer 可以接收到来自 producer 的msg
Python Kafka