Для выполнения этого урока у вас должна быть активная установка Kafka на вашем компьютере. Прочтите Установка Apache Kafka в Ubuntu, чтобы узнать, как это сделать.
Установка клиента Python для Apache Kafka
Прежде чем мы сможем начать работать с Apache Kafka в программе Python, нам необходимо установить клиент Python для Apache Kafka. Это можно сделать с помощью пип (Индекс пакета Python). Вот команда для этого:
pip3 установить кафка-питонЭто будет быстрая установка на терминал:
Установка клиента Python Kafka с использованием PIP
Теперь, когда у нас есть активная установка для Apache Kafka, и мы также установили клиент Python Kafka, мы готовы начать кодирование.
Создание продюсера
Первое, что необходимо для публикации сообщений в Kafka, - это приложение-производитель, которое может отправлять сообщения в темы в Kafka.
Обратите внимание, что производители Kafka являются производителями асинхронных сообщений. Это означает, что операции, выполняемые во время публикации сообщения в разделе Kafka Topic, не являются блокирующими. Для простоты мы напишем для этого урока простой издатель JSON.
Для начала создадим экземпляр для Kafka Producer:
из кафки импорт КафкаПродюсеримпортировать json
импортный отпечаток
продюсер = KafkaProducer (
bootstrap_servers = 'локальный: 9092',
value_serializer = лямбда v: json.свалки (v).кодировать ('utf-8'))
Атрибут bootstrap_servers сообщает о хосте и порте для сервера Kafka. Атрибут value_serializer предназначен только для сериализации JSON обнаруженных значений JSON.
Чтобы поиграть с Kafka Producer, давайте попробуем распечатать метрики, относящиеся к кластеру Producer и Kafka:
метрики = производитель.метрики ()pprint.pprint (метрики)
Сейчас мы увидим следующее:
Кафка Мтерикс
Теперь давайте, наконец, попробуем отправить какое-нибудь сообщение в очередь Kafka. Хорошим примером будет простой объект JSON:
режиссер.send ('linuxhint', 'тема': 'кафка')В linuxhint это раздел темы, на который будет отправлен объект JSON. Когда вы запустите сценарий, вы не получите никакого вывода, так как сообщение просто отправляется в раздел темы. Пришло время написать потребителя, чтобы мы могли протестировать наше приложение.
Создание потребителя
Теперь мы готовы установить новое соединение в качестве потребительского приложения и получать сообщения из темы Kafka. Начнем с создания нового экземпляра для Потребителя:
от kafka import KafkaConsumerиз kafka import TopicPartition
print ('Подключение.')
потребитель = KafkaConsumer (bootstrap_servers = 'localhost: 9092')
Теперь назначьте тему для этого соединения и возможное значение смещения.
print ('Назначение темы.')потребитель.назначить ([TopicPartition ('linuxhint', 2)])
Наконец, мы готовы напечатать сообщение:
print ('Получение сообщения.')для сообщения в потребителе:
print ("СМЕЩЕНИЕ:" + str (сообщение [0]) + "\ t MSG:" + str (сообщение))
Благодаря этому мы получим список всех опубликованных сообщений в разделе потребительских тем Kafka. Результатом этой программы будет:
Kafka Consumer
Вот полный сценарий продюсера для краткой справки:
из кафки импорт КафкаПродюсеримпортировать json
импортный отпечаток
продюсер = KafkaProducer (
bootstrap_servers = 'локальный: 9092',
value_serializer = лямбда v: json.свалки (v).кодировать ('utf-8'))
режиссер.send ('linuxhint', 'тема': 'кафка')
# метрики = производитель.метрики ()
# pprint.pprint (метрики)
А вот полная программа Consumer, которую мы использовали:
от kafka import KafkaConsumerиз kafka import TopicPartition
print ('Подключение.')
потребитель = KafkaConsumer (bootstrap_servers = 'localhost: 9092')
print ('Назначение темы.')
потребитель.назначить ([TopicPartition ('linuxhint', 2)])
print ('Получение сообщения.')
для сообщения в потребителе:
print ("СМЕЩЕНИЕ:" + str (сообщение [0]) + "\ t MSG:" + str (сообщение))
Заключение
В этом уроке мы рассмотрели, как установить и начать использовать Apache Kafka в наших программах Python. Мы показали, насколько легко выполнять простые задачи, связанные с Kafka на Python, с продемонстрированным клиентом Kafka для Python.