Многомодульный проект для демо по Кафке
Технологии: Java, Spring Boot, Kafka
-
Скачать проект с гитхаба https://github.com/<>
-
Обновить gradle зависимости
-
Добавить в конфигурацию идеи environment variables DB_PORT=;DB_HOST=localhost;DB_USER=postgres
-
Установить докер(десктоп версию под Win or Mac)
-
запустить локальный файл компоуз для поднятия бд либо через idea либо в терминале
docker-compose -f docker-compose-env-only.yml up -
запустить Applications
-
проверить состояние контейнеров : логи
#Тестирование эндпоинтов: Для проверки основного контроллера сервиса - необходимо выполнить запрос : http://localhost:8080/
- Выполнить в корне
docker-compose -f docker-compose-env-only.yml up#Cостав
-
order-service. 1.1 эндпоинт, на который приходит POST-запрос с сущностью Order. Сущность Order состоит из двух полей: String product и Integer quantity. Эндпоинт принимает сущность и отправляет в Kafka событие OrderEvent (которое также состоит из полей product и quantity). 2.2 Событие отправляется в топик order-topic 2.3 KafkaListener, который будет слушать события по топику order-status-topic
-
order-status-service — 2.1 KafkaListener, который слушает топик order-topic. Когда в слушатель приходит событие, происходит отправка другого события в топик order-status-service 2.2 Kafka Producer в топик order-status-service. Это событие состоит из полей String status и Instat date. В поле status записывается произвольная строка (например, CREATED или PROCESS), в поле date — текущая дата
-
common - подключаемый модуль несамостоятельный. Общий пакет для общей бизнес - сущности
-
test-config
#Дополнительные материалы ТЕОРИЯ В Kafka group.id обязателен для использования, когда включено управление группами потребителей. Вот почему это необходимо:
Группы потребителей: Kafka использует концепцию групп потребителей для разделения нагрузки между разными экземплярами потребителей. Каждый потребитель, входящий в одну и ту же группу, получает уникальные сообщения, чтобы разные экземпляры не обрабатывали одно и то же сообщение.
Управление смещением (offset): Kafka автоматически сохраняет смещения (offsets) для каждой группы потребителей, чтобы при перезапуске потребитель продолжил читать сообщения с правильного места. Без group.id Kafka не знает, какому набору потребителей сохранять смещения.
Обработка сообщений: Если не указать group.id, Kafka не сможет определить, к какой группе относится ваш потребитель, и это приведет к ошибке, поскольку Kafka ожидает идентификатор для управления распределением сообщений. spring: kafka: consumer: enable-auto-commit: false Когда вы решаете самостоятельно управлять смещениями (offsets) в Kafka, вам нужно отключить автоматическое сохранение смещений, а затем вручную сохранять их после обработки сообщений. Это позволяет вам более точно контролировать момент, когда Kafka считает сообщение обработанным. Вот как это работает: в вашем компоненте, где вы используете @KafkaListener, нужно немного изменить логику обработки сообщений, добавив явное подтверждение смещений с помощью интерфейса Acknowledgment. После обработки каждого сообщения нужно самостоятельно сохранять смещения. Это делается с помощью метода acknowledgment.acknowledge()
Когда вы вызываете метод acknowledge() для ручного управления смещениями в Kafka, вы фактически сообщаете брокеру Kafka, что сообщение было успешно обработано, и Kafka может зафиксировать смещение что именно происходит:
Что фиксируется: Смещение (offset): Это числовой идентификатор, представляющий позицию сообщения внутри партиции топика. Например, если текущий смещение равно 10, то это означает, что вы успешно обработали сообщение с порядковым номером 10 в конкретной партиции. Сохранение смещения: При вызове acknowledge() Kafka фиксирует, что для данного consumer group (группы потребителей) и конкретной партиции следующее сообщение, которое нужно обработать, начнется с очередного смещения (в нашем примере, с 11).
Реальный процесс: В Kafka данные внутри топика разделяются на несколько партиций (partitions). Каждая партиция — это независимый логический сегмент данных, который хранит определенную часть сообщений топика. Когда вы отправляете сообщения в топик, они распределяются между партициями, и каждая партиция имеет своё собственное последовательное смещение (offset). Когда вы вызываете acknowledge(), Kafka обновляет запись в брокере, указывая, что текущее смещение для этой группы потребителей можно считать окончательно обработанным. Таким образом, Kafka гарантирует, что при перезапуске потребителя или при его отказе следующие сообщения начнут обрабатываться с правильного смещения. Хранение смещений: Смещения хранятся на стороне брокера в специальном внутреннем топике Kafka — __consumer_offsets. Этот топик используется для записи текущих смещений каждой группы потребителей по каждой партиции.
Если вы получили сообщение с смещением 15 из партиции 0, и после обработки вызываете acknowledge(), Kafka зафиксирует, что ваша группа потребителей завершила обработку этого сообщения. Следующее сообщение, которое будет отправлено потребителю, будет иметь смещение 16.
"в конкретной партиции": Партиции как независимые потоки данных:
Сообщения внутри каждой партиции хранятся в упорядоченном виде, и каждое сообщение имеет уникальное смещение в пределах этой партиции. Например, если топик имеет 3 партиции, то смещение 10 в партиции 0 — это не то же самое сообщение, что и смещение 10 в партиции 1 или 2. Это означает, что каждая партиция ведет свое собственное счётное смещение сообщений. Потребители могут обрабатывать партиции независимо:
Когда группа потребителей читает данные из топика, каждый потребитель обычно обрабатывает одну или несколько партиций. Если ваш потребитель получает сообщение с партиции 0 с смещением 10, то это будет конкретное сообщение, которое относится только к партиции 0. Другие партиции в это время могут иметь совсем другие смещения (например, 5 или 20). Контроль за смещениями по каждой партиции:
Kafka отслеживает, какое смещение обработано для каждой партиции отдельно. Когда вы вызываете acknowledge() для сообщения с смещением 10 в партиции 0, это значит, что Kafka считает сообщения до 10-го включительно обработанными только в этой партиции. В других партициях может быть совершенно другой прогресс обработки.
Партиции (partitions) в Kafka создаются при создании топика или могут быть добавлены к уже существующему топику. Партиции помогают масштабировать Kafka, распределяя нагрузку между несколькими потребителями и увеличивая параллелизм обработки сообщений. Вот как они создаются:
- Создание топика с партициями (при создании топика) При создании топика вы можете указать количество партиций. Это можно сделать разными способами:
a) Через командную строку (CLI) Kafka предоставляет инструмент командной строки для управления топиками. При создании нового топика через CLI вы можете задать количество партиций с помощью параметра --partitions.
bash
Копировать код
kafka-topics.sh --create
--zookeeper localhost:2181
--replication-factor 1
--partitions 3
--topic my-topic
--partitions 3: В этом примере создается топик с тремя партициями.
--replication-factor 1: Фактор репликации указывает, сколько копий каждой партиции будет храниться на разных брокерах (это для отказоустойчивости).
b) Через Kafka Admin API
Вы также можете программно создавать топики с заданным количеством партиций с помощью Kafka Admin API в Java:
java Копировать код import org.apache.kafka.clients.admin.AdminClient; import org.apache.kafka.clients.admin.NewTopic; import java.util.Collections; import java.util.Properties;
Properties config = new Properties(); config.put("bootstrap.servers", "localhost:9092");
try (AdminClient admin = AdminClient.create(config)) { NewTopic newTopic = new NewTopic("my-topic", 3, (short) 1); admin.createTopics(Collections.singletonList(newTopic)).all().get(); } new NewTopic("my-topic", 3, (short) 1): Здесь создается топик с именем my-topic, 3 партициями и фактором репликации 1. 2. Добавление партиций к существующему топику Если топик уже существует, вы можете добавить больше партиций. Однако важно помнить, что добавление партиций не перераспределяет старые сообщения — они остаются в тех партициях, в которые были записаны изначально. Это делается через командную строку:
bash
Копировать код
kafka-topics.sh --alter
--zookeeper localhost:2181
--topic my-topic
--partitions 5
Здесь топик my-topic будет изменен, и количество партиций увеличится до 5.
Автоматическое распределение сообщений по партициям Когда сообщения отправляются в топик с несколькими партициями, Kafka использует ключ сообщения для распределения их по партициям. Если ключ не указан, Kafka распределяет сообщения по партициям случайным образом. Это делает процесс масштабирования эффективным, так как нагрузка равномерно распределяется по потребителям.
Итог: Партиции создаются либо при создании топика (указанием количества партиций), либо добавляются позже через командную строку или API. Партиции помогают масштабировать производительность Kafka, распределяя сообщения между разными потребителями и брокерами.
В Kafka, закрепление консьюмера за партицией происходит через механизм "групповой координации". Вот как это работает:
Группы Консьюмеров: Консьюмеры организованы в группы. Каждая группа имеет уникальное имя, и все консьюмеры в одной группе работают вместе, чтобы обрабатывать сообщения из топика.
Разделение Партиций: Топик в Kafka может быть разделён на несколько партиций. Каждая партиция может быть обработана только одним консьюмером из группы в любой момент времени. Это позволяет достичь параллелизма и масштабируемости.
Балансировка Нагрузки: Когда консьюмеры вступают в группу, Kafka использует механизм балансировки нагрузки для распределения партиций между консьюмерами. Если в группе несколько консьюмеров, каждая партиция будет обработана одним из них, и никакая партиция не будет обрабатываться более чем одним консьюмером из этой группы одновременно.
Автоматическое Управление: Kafka автоматически управляет закреплением партиций за консьюмерами, а также реагирует на изменения, такие как добавление или удаление консьюмеров из группы. При этом Kafka может перераспределить партиции между консьюмерами, чтобы обеспечить равномерное распределение нагрузки.
Сохранение Состояния: Позиция (offset) чтения для каждой партиции хранится в записной группе (offset storage), что позволяет консьюмерам продолжать чтение с того места, где они остановились в случае сбоя или перезапуска. OrderListener.java /* public void listen(OrderEvent message, @Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) String key, @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition, @Header(KafkaHeaders.RECEIVED_TOPIC) String topic, @Header(KafkaHeaders.RECEIVED_TIMESTAMP) long timestamp) { log.info("Received message: {}", message); log.info("Key: {}; Partition: {}; Topic: {}, Timestamp: {}", key, partition, topic, timestamp); }*/ // механизм горизонт масштабирования - группа консьюмеров - разделение по вычитке - спомошь. партиций 10
#Дополнительные материалы ТЕСТЫ //ждем асинхронное взаимодействие чтобы прочитать логи и провалидировать /* Awaitility.await() .atMost(10, TimeUnit.SECONDS) .pollInterval(500, TimeUnit.MILLISECONDS) .untilAsserted(() -> assertThat(output.getOut()) .contains("[Producer clientId=order-producer-1]"));*/