Апач Кафка

Начало работы с Apache Kafka и Python

Начало работы с Apache Kafka и Python
В этом уроке мы увидим, как мы можем использовать Apache Kafka с Python и создать образец приложения с использованием клиента Python для Apache Kafka.

Для выполнения этого урока у вас должна быть активная установка Kafka на вашем компьютере. Прочтите Установка Apache Kafka в Ubuntu, чтобы узнать, как это сделать.

Установка клиента Python для Apache Kafka

Прежде чем мы сможем начать работать с Apache Kafka в программе Python, нам необходимо установить клиент Python для Apache Kafka. Это можно сделать с помощью пип (Индекс пакета Python). Вот команда для этого:

pip3 установить кафка-питон

Это будет быстрая установка на терминал:

Установка клиента Python Kafka с использованием PIP

Теперь, когда у нас есть активная установка для Apache Kafka, и мы также установили клиент Python Kafka, мы готовы начать кодирование.

Создание продюсера

Первое, что необходимо для публикации сообщений в Kafka, - это приложение-производитель, которое может отправлять сообщения в темы в Kafka.

Обратите внимание, что производители Kafka являются производителями асинхронных сообщений. Это означает, что операции, выполняемые во время публикации сообщения в разделе Kafka Topic, не являются блокирующими. Для простоты мы напишем для этого урока простой издатель JSON.

Для начала создадим экземпляр для Kafka Producer:

из кафки импорт КафкаПродюсер
импортировать json
импортный отпечаток
продюсер = KafkaProducer (
bootstrap_servers = 'локальный: 9092',
value_serializer = лямбда v: json.свалки (v).кодировать ('utf-8'))

Атрибут bootstrap_servers сообщает о хосте и порте для сервера Kafka. Атрибут value_serializer предназначен только для сериализации JSON обнаруженных значений JSON.

Чтобы поиграть с Kafka Producer, давайте попробуем распечатать метрики, относящиеся к кластеру Producer и Kafka:

метрики = производитель.метрики ()
pprint.pprint (метрики)

Сейчас мы увидим следующее:

Кафка Мтерикс

Теперь давайте, наконец, попробуем отправить какое-нибудь сообщение в очередь Kafka. Хорошим примером будет простой объект JSON:

режиссер.send ('linuxhint', 'тема': 'кафка')

В linuxhint это раздел темы, на который будет отправлен объект JSON. Когда вы запустите сценарий, вы не получите никакого вывода, так как сообщение просто отправляется в раздел темы. Пришло время написать потребителя, чтобы мы могли протестировать наше приложение.

Создание потребителя

Теперь мы готовы установить новое соединение в качестве потребительского приложения и получать сообщения из темы Kafka. Начнем с создания нового экземпляра для Потребителя:

от kafka import KafkaConsumer
из kafka import TopicPartition
print ('Подключение.')
потребитель = KafkaConsumer (bootstrap_servers = 'localhost: 9092')

Теперь назначьте тему для этого соединения и возможное значение смещения.

print ('Назначение темы.')
потребитель.назначить ([TopicPartition ('linuxhint', 2)])

Наконец, мы готовы напечатать сообщение:

print ('Получение сообщения.')
для сообщения в потребителе:
print ("СМЕЩЕНИЕ:" + str (сообщение [0]) + "\ t MSG:" + str (сообщение))

Благодаря этому мы получим список всех опубликованных сообщений в разделе потребительских тем Kafka. Результатом этой программы будет:

Kafka Consumer

Вот полный сценарий продюсера для краткой справки:

из кафки импорт КафкаПродюсер
импортировать json
импортный отпечаток
продюсер = KafkaProducer (
bootstrap_servers = 'локальный: 9092',
value_serializer = лямбда v: json.свалки (v).кодировать ('utf-8'))
режиссер.send ('linuxhint', 'тема': 'кафка')
# метрики = производитель.метрики ()
# pprint.pprint (метрики)

А вот полная программа Consumer, которую мы использовали:

от kafka import KafkaConsumer
из kafka import TopicPartition
print ('Подключение.')
потребитель = KafkaConsumer (bootstrap_servers = 'localhost: 9092')
print ('Назначение темы.')
потребитель.назначить ([TopicPartition ('linuxhint', 2)])
print ('Получение сообщения.')
для сообщения в потребителе:
print ("СМЕЩЕНИЕ:" + str (сообщение [0]) + "\ t MSG:" + str (сообщение))

Заключение

В этом уроке мы рассмотрели, как установить и начать использовать Apache Kafka в наших программах Python. Мы показали, насколько легко выполнять простые задачи, связанные с Kafka на Python, с продемонстрированным клиентом Kafka для Python.

Shadow of the Tomb Raider для Linux Учебное пособие
Shadow of the Tomb Raider - двенадцатое дополнение к серии Tomb Raider - франшизе приключенческой игры, созданной Eidos Montreal. Игра была очень хоро...
Как увеличить FPS в Linux?
FPS означает Кадров в секунду. Задача FPS - измерить частоту кадров при воспроизведении видео или во время игры. Проще говоря, количество непрерывных ...
Лучшие игры Oculus App Lab
Если вы владелец гарнитуры Oculus, вы должны знать о загрузке неопубликованных приложений. Боковая загрузка - это процесс установки на гарнитуру содер...