使用Python的kafka-python库来与Kafka进行通信
安装kafka-python库:
pip install kafka-python
编写pipeline
import json
from kafka import KafkaProducer
class KafkaPipeline:
def open_spider(self, spider):
self.producer = KafkaProducer(
bootstrap_servers=['localhost:9092'], # 替换为你的Kafka服务器地址
value_serializer=lambda v: json.dumps(v).encode('utf-8')
)
def close_spider(self, spider):
self.producer.close()
def process_item(self, item, spider):
self.producer.send('your_topic', dict(item)) # 替换为你的Kafka主题
return item
使用的时候注意将自己的kafka路径,以及topic配置好,同时注意需要将item对象先解成python的字典。