Предварительное условие
Вам необходимо установить необходимую библиотеку Python для чтения данных из Kafka. Python3 используется в этом руководстве для написания сценария потребителя и производителя. Если пакет pip не был установлен ранее в вашей операционной системе Linux, вам необходимо установить pip перед установкой библиотеки Kafka для python. python3-kafka используется в этом руководстве для чтения данных из Kafka. Выполните следующую команду, чтобы установить библиотеку.
$ pip установить python3-kafkaЧтение простых текстовых данных из Kafka
Различные типы данных могут быть отправлены от производителя по определенной теме, которые могут быть прочитаны потребителем. В этой части этого руководства показано, как простые текстовые данные могут быть отправлены и получены от Kafka с использованием производителя и потребителя.
Создайте файл с именем продюсер1.ру со следующим скриптом Python. КафкаПродюсер модуль импортируется из библиотеки Kafka. Список брокеров необходимо определить во время инициализации объекта-производителя для подключения к серверу Kafka. Порт Kafka по умолчанию - '9092'. Аргумент bootstrap_servers используется для определения имени хоста с портом. 'First_Topic'устанавливается как название темы, по которой будет отправлено текстовое сообщение от производителя. Затем простое текстовое сообщение "Привет от Кафки'отправляется с использованием Отправить() метод КафкаПродюсер к теме, 'First_Topic'.
продюсер1.ру:
# Импортировать KafkaProducer из библиотеки Kafkaиз кафки импорт КафкаПродюсер
# Определить сервер с портом
bootstrap_servers = ['локальный: 9092']
# Определите название темы, в которой будет опубликовано сообщение
topicName = 'First_Topic'
# Инициализировать переменную производителя
производитель = KafkaProducer (bootstrap_servers = bootstrap_servers)
# Опубликовать текст в определенной теме
режиссер.send (topicName, привет от кафки ... ')
# Распечатать сообщение
print ("Сообщение отправлено")
Создайте файл с именем потребитель1.ру со следующим скриптом Python. КафкаПотребитель модуль импортируется из библиотеки Kafka для чтения данных из Kafka. sys модуль используется здесь для завершения скрипта. То же имя хоста и номер порта производителя используются в скрипте потребителя для чтения данных из Kafka. Название темы потребителя и производителя должно совпадать с 'Первая_тема'. Затем объект-потребитель инициализируется тремя аргументами. Название темы, идентификатор группы и информация о сервере. для цикл используется здесь для чтения текста, отправленного от производителя Kafka.
потребитель1.ру:
# Импортировать KafkaConsumer из библиотеки Kafkaиз кафки импорт KafkaConsumer
# Импортировать модуль sys
import sys
# Определить сервер с портом
bootstrap_servers = ['локальный: 9092']
# Определить название темы, откуда будет приходить сообщение
topicName = 'First_Topic'
# Инициализировать потребительскую переменную
потребитель = KafkaConsumer (topicName, group_id = 'group1', bootstrap_servers =
bootstrap_servers)
# Прочитать и распечатать сообщение от потребителя
для сообщения в потребителе:
print ("Название темы =% s, Сообщение =% s"% (msg.тема, сообщение.значение))
# Завершить скрипт
sys.выход()
Выход:
Выполните следующую команду с одного терминала, чтобы выполнить сценарий производителя.
$ python3 продюсер1.руСледующий вывод появится после отправки сообщения.
Выполните следующую команду с другого терминала, чтобы выполнить сценарий потребителя.
$ python3 consumer1.руВывод показывает название темы и текстовое сообщение, отправленное от производителя.
Чтение данных в формате JSON из Kafka
Данные в формате JSON могут быть отправлены производителем Kafka и прочитаны потребителем Kafka с помощью JSON модуль Python. Как данные JSON могут быть сериализованы и десериализованы перед отправкой и получением данных с помощью модуля python-kafka показано в этой части этого руководства.
Создайте скрипт Python с именем продюсер2.ру со следующим сценарием. Другой модуль с именем JSON импортируется с КафкаПродюсер модуль здесь. value_serializer аргумент используется с bootstrap_servers аргумент здесь для инициализации объекта производителя Kafka. Этот аргумент указывает, что данные JSON будут закодированы с использованием 'utf-8'набор символов во время отправки. Затем данные в формате JSON отправляются в тему с именем JSONtopic.
продюсер2.ру:
# Импортировать KafkaProducer из библиотеки Kafkaиз кафки импорт КафкаПродюсер
# Импортировать модуль JSON для сериализации данных
импортировать json
# Инициализировать переменную производителя и установить параметр для кодирования JSON
производитель = KafkaProducer (bootstrap_servers =
['localhost: 9092'], value_serializer = lambda v: json.свалки (v).кодировать ('utf-8'))
# Отправлять данные в формате JSON
режиссер.send ('JSONtopic', 'name': 'fahmida', 'email': '[электронная почта защищена]')
# Распечатать сообщение
print ("Сообщение отправлено в JSONtopic")
Создайте скрипт Python с именем потребитель2.ру со следующим сценарием. КафкаПотребитель, sys и модули JSON импортируются в этот скрипт. КафкаПотребитель модуль используется для чтения данных в формате JSON из Kafka. Модуль JSON используется для декодирования закодированных данных JSON, отправленных от производителя Kafka. Sys модуль используется для завершения скрипта. value_deserializer аргумент используется с bootstrap_servers чтобы определить, как будут декодироваться данные JSON. Следующий, для цикл используется для печати всех записей потребителей и данных JSON, полученных из Kafka.
потребитель2.ру:
# Импортировать KafkaConsumer из библиотеки Kafkaот kafka import KafkaConsumer
# Импортировать модуль sys
import sys
# Импортировать модуль json для сериализации данных
импортировать json
# Инициализировать потребительскую переменную и установить свойство для декодирования JSON
потребитель = KafkaConsumer ('JSONtopic', bootstrap_servers = ['localhost: 9092'],
value_deserializer = лямбда m: json.нагрузки (м.декодировать ('utf-8')))
# Читать данные из кафки
для сообщения в потребителе:
print ("Потребительские записи: \ n")
печать (сообщение)
print ("\ nЧтение из данных JSON \ n")
print ("Имя:", сообщение [6] ['имя'])
print ("Электронная почта:", сообщение [6] ['электронная почта'])
# Завершить скрипт
sys.выход()
Выход:
Выполните следующую команду с одного терминала, чтобы выполнить сценарий производителя.
$ python3 продюсер2.руСкрипт напечатает следующее сообщение после отправки данных JSON.
Выполните следующую команду с другого терминала, чтобы выполнить сценарий потребителя.
$ python3 consumer2.руСледующий вывод появится после запуска скрипта.
Заключение:
Данные можно отправлять и получать в разных форматах из Kafka с помощью python. Данные также могут быть сохранены в базе данных и извлечены из базы данных с помощью Kafka и python. Я дома, это руководство поможет пользователю python начать работу с Kafka.