Апач Кафка

Как читать данные из Kafka с помощью Python

Как читать данные из Kafka с помощью Python
Kafka - это распределенная система обмена сообщениями с открытым исходным кодом для отправки сообщений по разделам и различным темам. Потоковая передача данных в реальном времени может быть реализована с помощью Kafka для приема данных между приложениями. Он состоит из трех основных частей. Это производитель, потребитель и темы. Производитель используется для отправки сообщения в определенную тему, и к каждому сообщению прилагается ключ. Потребитель используется для чтения сообщения по определенной теме из набора разделов. Данные, полученные от производителя и хранящиеся в разделах по определенной теме. В python существует множество библиотек для создания производителя и потребителя для создания системы обмена сообщениями с использованием Kafka. В этом руководстве показано, как данные из Kafka могут быть прочитаны с помощью Python.

Предварительное условие

Вам необходимо установить необходимую библиотеку Python для чтения данных из Kafka. Python3 используется в этом руководстве для написания сценария потребителя и производителя. Если пакет pip не был установлен ранее в вашей операционной системе Linux, вам необходимо установить pip перед установкой библиотеки Kafka для python. python3-kafka используется в этом руководстве для чтения данных из Kafka. Выполните следующую команду, чтобы установить библиотеку.

$ pip установить python3-kafka

Чтение простых текстовых данных из Kafka

Различные типы данных могут быть отправлены от производителя по определенной теме, которые могут быть прочитаны потребителем. В этой части этого руководства показано, как простые текстовые данные могут быть отправлены и получены от Kafka с использованием производителя и потребителя.

Создайте файл с именем продюсер1.ру со следующим скриптом Python. КафкаПродюсер модуль импортируется из библиотеки Kafka. Список брокеров необходимо определить во время инициализации объекта-производителя для подключения к серверу Kafka. Порт Kafka по умолчанию - '9092'. Аргумент bootstrap_servers используется для определения имени хоста с портом. 'First_Topic'устанавливается как название темы, по которой будет отправлено текстовое сообщение от производителя. Затем простое текстовое сообщение "Привет от Кафки'отправляется с использованием Отправить() метод КафкаПродюсер к теме, 'First_Topic'.

продюсер1.ру:

# Импортировать KafkaProducer из библиотеки Kafka
из кафки импорт КафкаПродюсер
# Определить сервер с портом
bootstrap_servers = ['локальный: 9092']
# Определите название темы, в которой будет опубликовано сообщение
topicName = 'First_Topic'
# Инициализировать переменную производителя
производитель = KafkaProducer (bootstrap_servers = bootstrap_servers)
# Опубликовать текст в определенной теме
режиссер.send (topicName, привет от кафки ... ')
# Распечатать сообщение
print ("Сообщение отправлено")

Создайте файл с именем потребитель1.ру со следующим скриптом Python. КафкаПотребитель модуль импортируется из библиотеки Kafka для чтения данных из Kafka. sys модуль используется здесь для завершения скрипта. То же имя хоста и номер порта производителя используются в скрипте потребителя для чтения данных из Kafka. Название темы потребителя и производителя должно совпадать с 'Первая_тема'.  Затем объект-потребитель инициализируется тремя аргументами. Название темы, идентификатор группы и информация о сервере. для цикл используется здесь для чтения текста, отправленного от производителя Kafka.

потребитель1.ру:

# Импортировать KafkaConsumer из библиотеки Kafka
из кафки импорт KafkaConsumer
# Импортировать модуль sys
import sys
# Определить сервер с портом
bootstrap_servers = ['локальный: 9092']
# Определить название темы, откуда будет приходить сообщение
topicName = 'First_Topic'
# Инициализировать потребительскую переменную
потребитель = KafkaConsumer (topicName, group_id = 'group1', bootstrap_servers =
bootstrap_servers)
# Прочитать и распечатать сообщение от потребителя
для сообщения в потребителе:
print ("Название темы =% s, Сообщение =% s"% (msg.тема, сообщение.значение))
# Завершить скрипт
sys.выход()

Выход:

Выполните следующую команду с одного терминала, чтобы выполнить сценарий производителя.

$ python3 продюсер1.ру

Следующий вывод появится после отправки сообщения.

Выполните следующую команду с другого терминала, чтобы выполнить сценарий потребителя.

$ python3 consumer1.ру

Вывод показывает название темы и текстовое сообщение, отправленное от производителя.

Чтение данных в формате JSON из Kafka

Данные в формате JSON могут быть отправлены производителем Kafka и прочитаны потребителем Kafka с помощью JSON модуль Python. Как данные JSON могут быть сериализованы и десериализованы перед отправкой и получением данных с помощью модуля python-kafka показано в этой части этого руководства.

Создайте скрипт Python с именем продюсер2.ру со следующим сценарием. Другой модуль с именем JSON импортируется с КафкаПродюсер модуль здесь. value_serializer аргумент используется с bootstrap_servers аргумент здесь для инициализации объекта производителя Kafka. Этот аргумент указывает, что данные JSON будут закодированы с использованием 'utf-8'набор символов во время отправки. Затем данные в формате JSON отправляются в тему с именем JSONtopic.

продюсер2.ру:

# Импортировать KafkaProducer из библиотеки Kafka
из кафки импорт КафкаПродюсер
# Импортировать модуль JSON для сериализации данных
импортировать json
# Инициализировать переменную производителя и установить параметр для кодирования JSON
производитель = KafkaProducer (bootstrap_servers =
['localhost: 9092'], value_serializer = lambda v: json.свалки (v).кодировать ('utf-8'))
# Отправлять данные в формате JSON
режиссер.send ('JSONtopic', 'name': 'fahmida', 'email': '[электронная почта защищена]')
 
# Распечатать сообщение
print ("Сообщение отправлено в JSONtopic")

Создайте скрипт Python с именем потребитель2.ру со следующим сценарием. КафкаПотребитель, sys и модули JSON импортируются в этот скрипт. КафкаПотребитель модуль используется для чтения данных в формате JSON из Kafka. Модуль JSON используется для декодирования закодированных данных JSON, отправленных от производителя Kafka. Sys модуль используется для завершения скрипта. value_deserializer аргумент используется с bootstrap_servers чтобы определить, как будут декодироваться данные JSON. Следующий, для цикл используется для печати всех записей потребителей и данных JSON, полученных из Kafka.

потребитель2.ру:

# Импортировать KafkaConsumer из библиотеки Kafka
от kafka import KafkaConsumer
# Импортировать модуль sys
import sys
# Импортировать модуль json для сериализации данных
импортировать json
# Инициализировать потребительскую переменную и установить свойство для декодирования JSON
потребитель = KafkaConsumer ('JSONtopic', bootstrap_servers = ['localhost: 9092'],
value_deserializer = лямбда m: json.нагрузки (м.декодировать ('utf-8')))
# Читать данные из кафки
для сообщения в потребителе:
print ("Потребительские записи: \ n")
печать (сообщение)
print ("\ nЧтение из данных JSON \ n")
print ("Имя:", сообщение [6] ['имя'])
print ("Электронная почта:", сообщение [6] ['электронная почта'])
# Завершить скрипт
sys.выход()

Выход:

Выполните следующую команду с одного терминала, чтобы выполнить сценарий производителя.

$ python3 продюсер2.ру

Скрипт напечатает следующее сообщение после отправки данных JSON.

Выполните следующую команду с другого терминала, чтобы выполнить сценарий потребителя.

$ python3 consumer2.ру

Следующий вывод появится после запуска скрипта.

Заключение:

Данные можно отправлять и получать в разных форматах из Kafka с помощью python. Данные также могут быть сохранены в базе данных и извлечены из базы данных с помощью Kafka и python. Я дома, это руководство поможет пользователю python начать работу с Kafka.

Порты коммерческих игровых движков с открытым исходным кодом
Бесплатные игры с открытым исходным кодом и кроссплатформенные версии игрового движка можно использовать для игры как в старые, так и в некоторые из с...
Лучшие игры с командной строкой для Linux
Командная строка - не только ваш главный союзник при использовании Linux - она ​​также может быть источником развлечений, потому что вы можете использ...
Лучшие приложения для сопоставления геймпадов для Linux
Если вам нравится играть в игры на Linux с помощью геймпада вместо стандартной системы ввода с клавиатуры и мыши, для вас есть несколько полезных прил...