Python Kafka

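Kafka is a distributed message queue that offers high performance, persistence, multi-replica backup, and horizontal scalability. Producers write messages to the queue, and consumers read messages from it and run their business logic on them. In an architecture it typically serves to decouple components, smooth out traffic peaks, and enable asynchronous processing.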

Confluent kafka client

Reference

https://anaconda.org/conda-forge/python-confluent-kafka

https://docs.confluent.io/current/clients/confluent-kafka-python/

https://github.com/confluentinc/confluent-kafka-python

https://kafka.apache.org/quickstart

Envs

  • anaconda 4.5.11
  • python 3.6
  • python-confluent-kafka 0.11.4

Prepare

  1. Create a new Python 3 environment

    conda create --name confluent-kafka-env python=3.6

    conda activate confluent-kafka-env

  2. Install confluent kafka with conda

    conda install python-confluent-kafka

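  3. Download and start the Kafka server (ZooKeeper and the broker both run in the foreground, so start each in its own terminal)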

    wget http://mirrors.hust.edu.cn/apache/kafka/2.1.0/kafka_2.11-2.1.0.tgz && tar -xzf kafka_2.11-2.1.0.tgz

    cd kafka_2.11-2.1.0

    bin/zookeeper-server-start.sh config/zookeeper.properties

    bin/kafka-server-start.sh config/server.properties
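
Before running the test scripts, it can help to confirm that something is listening on the broker port. The following is a minimal sketch using only the standard library. The helper name `broker_reachable` is hypothetical, and the default `localhost:9092` is an assumption taken from the `bootstrap.servers` value used in the scripts in this post. A successful TCP connection only shows that the port is open, not that the broker is healthy.

```python
import socket


def broker_reachable(host="localhost", port=9092, timeout=2.0):
    """Return True if a TCP connection to host:port succeeds within timeout.

    Hypothetical helper for illustration; it does not speak the Kafka
    protocol, it only checks that the port accepts connections.
    """
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Covers connection refused, timeouts, and unreachable hosts.
        return False


if __name__ == "__main__":
    print("broker reachable:", broker_reachable())
```

If this prints `False`, check that both the ZooKeeper and the Kafka server processes from step 3 are still running.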

Test

https://github.com/Murugar/ConfluentPythonKafka.git

Producer.py

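from time import gmtime, strftime, sleep
from confluent_kafka import Producer

# Connect to the local broker started in the Prepare step.
p = Producer({'bootstrap.servers': 'localhost:9092'})

# Send the current UTC time to 'test_topic' once per second.
while True:
    now = strftime("%Y-%m-%d %H:%M:%S", gmtime())
    p.produce('test_topic', now.encode('utf-8'))
    p.flush()  # block until the message has been delivered
    sleep(1)

# One-off send, kept for reference:
#p.produce('mytopic', 'Test Message')
#p.flush()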
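
Producer.py calls `flush()` after every message, so each send is effectively synchronous. As a possible variation (not part of the original test repository), the sketch below registers a delivery callback and flushes once at the end. The names `delivery_report` and `run` are hypothetical. The `callback` argument of `produce()` and the `poll(0)` call follow the confluent-kafka-python documentation, and the topic and broker address are the same assumptions as in the script above.

```python
def delivery_report(err, msg):
    """Delivery callback, invoked once per message from poll() or flush()."""
    if err is not None:
        line = 'Delivery failed: {}'.format(err)
    else:
        line = 'Delivered to {} [{}] at offset {}'.format(
            msg.topic(), msg.partition(), msg.offset())
    print(line)
    return line


def run(topic='test_topic', count=5):
    """Send `count` timestamps without flushing after each one."""
    # Imported inside the function so delivery_report can be tried
    # on its own, without the client library or a broker.
    from time import gmtime, strftime
    from confluent_kafka import Producer

    p = Producer({'bootstrap.servers': 'localhost:9092'})
    for _ in range(count):
        now = strftime("%Y-%m-%d %H:%M:%S", gmtime())
        p.produce(topic, now.encode('utf-8'), callback=delivery_report)
        p.poll(0)  # serve delivery callbacks for earlier messages
    p.flush()      # wait for all outstanding messages to be delivered or fail
```

Calling `run()` with the broker running should print one delivery line per message. Without the callback, a failed delivery in the original loop would go unnoticed.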

Consumer.py

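from confluent_kafka import Consumer, KafkaError

# 'smallest' starts from the oldest available offset when the group
# has no committed offset yet.
c = Consumer({'bootstrap.servers': 'localhost:9092', 'group.id': 'mygroup',
              'default.topic.config': {'auto.offset.reset': 'smallest'}})
c.subscribe(['test_topic'])
running = True
while running:
    msg = c.poll(1.0)  # wait up to 1 second for a message
    if msg is None:
        continue  # nothing arrived within the timeout
    if not msg.error():
        print('Received message: %s' % msg.value().decode('utf-8'))
    elif msg.error().code() != KafkaError._PARTITION_EOF:
        # Any error other than reaching the end of a partition stops the loop.
        print(msg.error())
        running = False
c.close()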

Run the scripts

python Producer.py

python Consumer.py

The consumer receives the messages sent by the producer.
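
To check more than "something arrived", the payload can be decoded back into a timestamp and compared with the current time. The sketch below assumes the payload format used by Producer.py (`%Y-%m-%d %H:%M:%S` in UTC, UTF-8 encoded). The helper names `parse_payload` and `age_seconds` are hypothetical and not part of the original test repository.

```python
from datetime import datetime, timezone

# Must match the format string used by the producer.
FORMAT = "%Y-%m-%d %H:%M:%S"


def parse_payload(payload):
    """Decode a producer payload (UTF-8 bytes) into a UTC datetime."""
    text = payload.decode('utf-8')
    return datetime.strptime(text, FORMAT).replace(tzinfo=timezone.utc)


def age_seconds(payload, now=None):
    """Seconds between the timestamp in the payload and `now`.

    `now` defaults to the current UTC time; pass it explicitly to get
    a reproducible result.
    """
    if now is None:
        now = datetime.now(timezone.utc)
    return (now - parse_payload(payload)).total_seconds()
```

In Consumer.py this could be used as `age_seconds(msg.value())` inside the loop. Because the producer truncates the time to whole seconds, the result is only accurate to about one second.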

Author

Junxin Gao

Published

2019-01-04

Updated

2019-01-04

License