Статья разбита на 2 части: Установка Kafka на сервер и создание SpringBoot приложения.
Если вы вообще не в курсе за брокеров сообщений, то советую посмотреть два видосика про Apache Kafka и Rabbit MQ. Они дадут понимание как там что работает и чем отличается:
I. Установка Apache Kafka
В качестве базовой системы у меня CentOS 7 x86_64 Minimal 1804. Для кафки рекомендуется иметь 4 Gb оперативки.
Перед началом работы ставлю нужные мне программы:
sudo yum install mc nano net-tools wget -y
далее, согласно инструкции https://www.digitalocean.com/community/tutorials/how-to-install-apache-kafka-on-centos-7:
1 Установка OpenJDK8
1 | sudo yum install java-1.8.0-openjdk |
2 Создание нового пользователя
1 | sudo useradd kafka -m |
флаг -m означает, что также будет создана домашняя папка этого пользователя (/home/kafka).
Устанавливаем пароль:
1 | sudo passwd kafka |
Добовляем пользователя в группу wheel чтобы у него были права устанавливать зависимости Kafka:
1 | sudo usermod -aG wheel kafka |
Теперь войдем из-под этого пользователя:
1 | su -l kafka |
3 Загрузка и установка Kafka Binaries
Для начала создадим папку для загрузок
1 | mkdir ~/downloads |
Теперь идём на сайт и достаём оттуда ссылку на свежий дистрибутив кафки https://kafka.apache.org/downloads. Только мой вам совет - удостоверьтесь что эта ссылка ведёт непосредственно на файл, а не на какую-то другую страницу, а то я так целый час тупил и не мог распаковать скаченный архив .tgz.
Далее качаем этот файл себе в папку:
1 | cd ~/downloads |
1 | wget "https://apache-mirror.rbc.ru/pub/apache/kafka/2.6.0/kafka_2.13-2.6.0.tgz" |
Создаем папку под кафку и распаковываем его туда:
1 | mkdir ~/kafka |
1 | cd ~/kafka |
1 | tar -xvzf ~/Downloads/kafka.tgz --strip 1 |
флаг --strip 1 просит архиватор не создавать родительскую папку типа ~/kafka/kafka_2.13-2.6.0, а распаковывать содержимое сразю в ~/kafka.
4 Настройка сервера Kafka
Поведение Kafka по умолчанию не позволяет нам удалить тему, категорию, группу или название канала, в котором можно публиковать сообщения. Чтобы исправить это, давайте отредактируем файл конфигурации:
1 | nano ~/kafka/config/server.properties |
и добавим это в конец:
1 | delete.topic.enable = true |
5 Создание Systemd Unit - файлов и запуск сервера Kafka
Создаем файл для зукипера (необходим для работы Kafka)
1 | sudo nano /etc/systemd/system/zookeeper.service |
со следующим содержанием
1 2 3 4 | <br> [Unit]<br> Requires=network.target remote-fs.target<br> After=network.target remote-fs.target |
1 |
1 2 3 4 5 6 | <p>[Service]<br> Type=simple<br> User=kafka<br> ExecStart=/home/kafka/kafka/bin/zookeeper-server-start.sh /home/kafka/kafka/config/zookeeper.properties<br> ExecStop=/home/kafka/kafka/bin/zookeeper-server-stop.sh<br> Restart=on-abnormal</p> |
1 |
1 2 | [Install]<br> WantedBy=multi-user.target<br> |
секция Unit требует чтобы перед запуском zookeeper сеть и файловая система уже были готовы
секция Service назначает используемые при старте и остановке zookeeper скрипты
Теперь создаем то же самое для Kafka:
1 | sudo nano /etc/systemd/system/kafka.service |
со следующим содержанием
1 2 3 4 | <br> [Unit]<br> Requires=zookeeper.service<br> After=zookeeper.service |
1 |
1 2 3 4 5 6 | <p>[Service]<br> Type=simple<br> User=kafka<br> ExecStart=/bin/sh -c '/home/kafka/kafka/bin/kafka-server-start.sh /home/kafka/kafka/config/server.properties > /home/kafka/kafka/kafka.log 2>&1'<br> ExecStop=/home/kafka/kafka/bin/kafka-server-stop.sh<br> Restart=on-abnormal</p> |
1 |
1 2 | [Install]<br> WantedBy=multi-user.target<br> |
секция Unit требует чтобы перед запуском kafka - zookeeper уже был готов.
Запускаем:
1 | sudo systemctl start kafka |
для проверки успешности запуска, смотрим журнал:
1 | journalctl -u kafka |
вывод должен содержать что-то подобное: Jul 17 18:38:59 kafka-centos systemd[1]: Started kafka.service.
Теперь по идее кафка работает на 9092 порту. Для проверки можно взглянуть на прослушиваемые порты:
1 | netstat -tulpn |
Теперь добавим кафку в автозагрузку:
1 | sudo systemctl enable kafka |
6 Настройка фаерволла
Хоть порт и прослушивается, но фаерволл не даёт подключиться к серверу извне. Исправляем это следующими командами:
1 | firewall-cmd --permanent --zone=public --add-port=9092/tcp |
1 | firewall-cmd --permanent --zone=public --add-port=9092/udp |
1 | firewall-cmd --reload |
Теперь подключение извне должно быть доступно. Возможно потребуется перезапуск.
7 Тестирование работы Kafka
Можете пропустить этот шаг если он вам не нужен. Но можно и потестировать в консоли:
7.1 Создайте новый топик
1 | ~/kafka/bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic TutorialTopic |
вывод должен быть Created topic "TutorialTopic".
7.2 Создайте продюссера и пошлите сообщение:
1 | echo "Hello, World" | ~/kafka/bin/kafka-console-producer.sh --broker-list localhost:9092 --topic TutorialTopic > /dev/null |
7.3 Создайте консьюмера и запустите приём сообщений:
1 | ~/kafka/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic TutorialTopic --from-beginning |
флаг --from-beginning даст возможность получить сообщения отправленные до запуска консьюмера. Вывод должен быть Hello, World.
Этот скрипт будет выполняться и принимать сообщения в реальном времени. Можете подключиться через другой терминал и попробовать отправить ещё что-нибудь в этот топик.
Также рекомендую скачать и установить клёвый десктопный клиент https://www.conduktor.io/ и законнектиться к серверу Kafka снаружи.

II. Создание Java SpringBoot проекта
Источником инфотмации послужила статья https://habr.com/ru/post/496182/, поэтому если что непонятно - вэлком туда.
Для начала создадим новый SpringBoot проект с зависимостями: Spring Web и Spring for Apache Kafka.
Мы не будем юзать Spring Web, он тут нужен только для того чтобы подтянулся Jackson. Либо можно не включать его в зависимости, а в файле pom.xml, прописать ручками
<dependency> <groupId>com.fasterxml.jackson.core</groupId> <artifactId>jackson-databind</artifactId> </dependency>
Создадим класс который будет слать сообщения каждые 3 секунды:
@Component public static class Runner implements CommandLineRunner{ @Autowired //it's ok. https://stackoverflow.com/questions/55280173/the-correct-way-for-creation-of-kafkatemplate-in-spring-boot @SuppressWarnings("SpringJavaInjectionPointsAutowiringInspection") private KafkaTemplate<String, String> kafkaTemplate; @Override public void run(String... args) throws Exception { Thread thread = new Thread(() -> { try { while (true) { sleep(3000); kafkaTemplate.send("msg", "currentTime", (new Date()).toString()); } }catch (InterruptedException e){} }); thread.setDaemon(true); thread.start(); } }
SuppressWarnings здесь потому что идея упорно говорит что такого бина не существует. Но он есть.
Теперь создадим класс консьюмера:
package ru.knastnt.kafkatest; import org.springframework.kafka.annotation.EnableKafka; import org.springframework.kafka.annotation.KafkaListener; import org.springframework.stereotype.Component; @EnableKafka @Component public class Listener { @KafkaListener(topics = "msg", groupId = "app.1") public void messageListener(String msg) { System.out.println(msg); } }
метод messageListener будет вызываться как поступит новое сообщение.
Група консьюмеров - это группа в рамках которой доставляется один экземпляр сообщения. Например, у Вас есть три консьюмера в одной группе, и все они слушают одну тему. Как только на сервере появляется новое сообщение с данной темой, оно доставляется кому-то одному из группы. Остальные два консьюмера сообщение не получают.
Ну и напоследок отредактируем файл application.properties:
#адрес сервера Kafka spring.kafka.bootstrap-servers=192.168.0.239:9092
Запускаем и тестируем! Можно юзать, например, Postman, либо встроенный в идею инструмент Tools > HTTP Client > Test RESTful Web Service.
Эксперименты
Продюссер при отправке сообщения может вернуть ответ об успешности или ошибке. Настрою коллбэк и буду выводить в консоль результат отправки сообщения. Дополним вызов kafkaTemplate.send:
ListenableFuture<SendResult<String, String>> future = kafkaTemplate.send("msg", "currentTime", (new Date()).toString()); future.addCallback(System.out::println, System.err::println);
Теперь попробую перезагружать сервер Kafka. Что из этого вышло:
Если приложение запускается при недоступном сервере Kafka, То возникает исключение org.springframework.kafka.KafkaException: Send failed; nested exception is org.apache.kafka.common.errors.TimeoutException: Topic msg not present in metadata after 60000 ms.
Если сервер Kafka становится недоступным в процессе работы, то ошибки обрабатываются в возвращаемом future объекте метода kafkaTemplate.send.
Пока Kafka недоступен, Spring копит отправленные но недоставленные сообщения внутни себя
- Если сервер недоступен менее 2-х минут, то как только Kafka станет доступен, спринг выплюнет все сообщения туда прикрепив к ним правильное время отправки.
- Если сервер недоступен более 2-х минут, то вызовется коллбэк с ошибкой доставки. Типа такого org.springframework.kafka.core.KafkaProducerException: Failed to send; nested exception is org.apache.kafka.common.errors.TimeoutException: Expiring 40 record(s) for msg-0:120000 ms has passed since batch creation и такого org.apache.kafka.common.errors.TimeoutException: Expiring 40 record(s) for msg-0:120000 ms has passed since batch creation. Причем пачкой для нескольких сообщений. В итоге получится такой цикл: накопление сообщений - пачка ошибок - накопление сообщений - следующая пачка ошибок. Не разбирался по какому принципу, но как-то так. При появлении сервера Kafka, накопленные будут отправлены, а те, что с ошибками, - потерялись навсегда.
Отправка и получение кастомных объектов
Сначала реализуем продюссер. Для отправки объектов будем использовать сериализацию в Json. Для каждого кастомного класса нам нужен свой KafkaTemplate. Прежде чем приступить к их созданию, сделаем пару кастомных классов: UserDTO и UserDTO.Address:
package ru.knastnt.kafkatest; public class UserDTO { private String name; private int age; private Address address; public static UserDTO getTestInstance(){ UserDTO u = new UserDTO(); UserDTO.Address a = new UserDTO.Address(); a.setStreet("Ленина"); a.setHouse(16); u.setName("Иван"); u.setAge(25); u.setAddress(a); return u; } public String getName() { return name; } public void setName(String name) { this.name = name; } public int getAge() { return age; } public void setAge(int age) { this.age = age; } public Address getAddress() { return address; } public void setAddress(Address address) { this.address = address; } public static class Address { private String street; private int house; public String getStreet() { return street; } public void setStreet(String street) { this.street = street; } public int getHouse() { return house; } public void setHouse(int house) { this.house = house; } } }
Теперь настроим кастомные шаблоны. Всё поместим в отдельный класс KafkaProducerConfig:
package ru.knastnt.kafkatest; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.common.serialization.StringSerializer; import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.kafka.core.DefaultKafkaProducerFactory; import org.springframework.kafka.core.KafkaTemplate; import org.springframework.kafka.support.serializer.JsonSerializer; import java.util.HashMap; import java.util.Map; @Configuration public class KafkaProducerConfig { @Value("${spring.kafka.bootstrap-servers}") private String kafkaSrv; @Bean public <T> KafkaTemplate<String, T> kafkaStringJsonTemplate() { Map<String, Object> props = new HashMap<>(); props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaSrv); props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class); return new KafkaTemplate<>(new DefaultKafkaProducerFactory<>(props)); } }
Основное что мы тут делаем - назначаем сериализаторы для ключа и значения сообщений.
Дополним наш класс Runner. Добавим пару @Autowired полей:
@Autowired private KafkaTemplate<String, UserDTO> kafkaUserTemplate; @Autowired private KafkaTemplate<String, UserDTO.Address> kafkaAddressTemplate;
А внутрь цикла в методе run, допишем действия по отправке наших кастомных объектов:
//Шлём объекты через кастомные шаблоны sleep(2000); ListenableFuture<SendResult<String, UserDTO>> userFuture = kafkaUserTemplate.send("msg2", "user", UserDTO.getTestInstance()); userFuture.addCallback(System.out::println, System.err::println); sleep(2000); ListenableFuture<SendResult<String, UserDTO.Address>> userFuture2 = kafkaAddressTemplate.send("msg3", "addr", UserDTO.getTestInstance().getAddress()); userFuture2.addCallback(System.out::println, System.err::println);
Предлагаю запуститься и посмотреть как объекты улетают в кафку (это будет видно в консоли). К тому же можете воспользоваться https://www.conduktor.io/ и убедиться что топики пополняются новыми сообщениями.
Сделаем теперь несколько консьюмеров чтобы получать эти сообщения из кафки. Это несколько сложнее, но принцип тот же.
Создаем класс KafkaConsumerConfig:
package ru.knastnt.kafkatest; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.common.serialization.StringDeserializer; import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.kafka.annotation.EnableKafka; import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory; import org.springframework.kafka.core.ConsumerFactory; import org.springframework.kafka.core.DefaultKafkaConsumerFactory; import org.springframework.kafka.support.serializer.JsonDeserializer; import java.util.HashMap; import java.util.Map; @Configuration @EnableKafka public class KafkaConsumerConfig { //Этот класс сейчас используется только для настройки приёма кастомных объектов через JSON @Value("${spring.kafka.bootstrap-servers}") private String kafkaSrv; public <T> ConsumerFactory<String, T> consumerFactory(Class<T> clazz) { Map<String, Object> props = new HashMap<>(); props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaSrv); props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class); props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); return new DefaultKafkaConsumerFactory<>(props, new StringDeserializer(), new JsonDeserializer<>(clazz)); } @Bean public ConcurrentKafkaListenerContainerFactory<String, UserDTO> userKafkaListenerContainerFactory() { ConcurrentKafkaListenerContainerFactory<String, UserDTO> factory = new ConcurrentKafkaListenerContainerFactory<>(); factory.setConsumerFactory(consumerFactory(UserDTO.class)); return factory; } @Bean public ConcurrentKafkaListenerContainerFactory<String, UserDTO.Address> addresKafkaListenerContainerFactory() { ConcurrentKafkaListenerContainerFactory<String, UserDTO.Address> factory = new ConcurrentKafkaListenerContainerFactory<>(); factory.setConsumerFactory(consumerFactory(UserDTO.Address.class)); return factory; } }
Собственно для каждого кастомного класса нужно сделать по параметризованному ConcurrentKafkaListenerContainerFactory - это спринговая потокобезопасная обёртка.
В каждую из них нужно заинджектить конфигурацию консьюмера. Я реализовал её в виде параметризованного метода, в котором ключ - строка, а значение - объект передаваемого класса в json формате.
Теперь добавим пару слушающих методов в наш класс Listener:
//Кастомный Json слушатель для класса UserDTO @KafkaListener(topics = "msg2", containerFactory = "userKafkaListenerContainerFactory", groupId = "usersConsumers") public void messageListener(@Payload UserDTO userDTO, @Headers MessageHeaders headers) { System.out.println(headers); System.out.println(userDTO); } //Кастомный Json слушатель для класса Address @KafkaListener(topics = "msg3", containerFactory = "addresKafkaListenerContainerFactory", groupId = "addressesConsumers") public void messageListener(@Payload UserDTO.Address s, @Headers MessageHeaders headers) { System.out.println(headers); System.out.println(s); }
Каждому @KafkaListener нужно присвоить свой containerFactory, чтобы всё работало. Значение равно имени метода.
Запускайтесь, всё должно работать! Можете скачать мой проект - https://github.com/knastnt/kafkatest, там ничего лишнего - разберётесь 😉