2026. 4. 1. 23:13ㆍBackend/Messaging(Kafka,RabbitMQ)
📝 회사에서 이번에 Kafka 를 일부 사용하고 있습니다. 따라서 Kafka를 미리 공부하면서, Spring App을 연동하여 Message를 Consume하는 내용을 구현하며 연습해봤습니다.
Apache Kafka is an open-source distributed event streaming platform used by thousands of companies for high-performance data pipelines, streaming analytics, data integration, and mission-critical applications. ( https://kafka.apache.org/ )
Apache Kafka
Apache Kafka
kafka.apache.org
공식 홈페이지에서 설명한 것처럼 Apache Kafka는 분산 이벤트 스트리밍 플랫폼입니다. RabbitMQ가 Queue 기반으로 메시지를 전달하는 구조라면, Kafka는 Topic과 Partition으로 구성된 로그 기반 저장 구조를 사용합니다. 또한 Kafka에서 Consumer는 offset을 기준으로 원하는 시점부터 메시지를 구독할 수 있다는 특징이 있습니다.
1. wsl, docker desktop
windows 환경에서 리눅스 환경을 사용할 수 있는 wsl 과 docker engine을 사용할 수 있는 docker desktop을 이용하려고 합니다.

이후 linux에서 app 업데이트를 진행하고, java를 설치하였습니다.
sudo apt update // update
sudo apt install openjdk-21-jdk // java - jdk
2. kafka 설치
docker-compose를 활용하여 kafka를 설치하려고 합니다.
먼저 Linux에 docker-compose 파일을 관리할 디렉토리를 만들고, VScode를 이용하여 docker-compose.yml 파일을 생성합니다.
mkdir ~/lab/kafka-lab
cd ~/lab/kafka-lab
code .
docker-compose.yml (예시)
//Example
services:
controller1: // 메타데이터를 관리하는 컨트롤러 노드
image: apache/kafka:latest
container_name: controller1
environment:
KAFKA_NODE_ID: 1
KAFKA_PROCESS_ROLES: controller
KAFKA_LISTENERS: CONTROLLER://:9093
KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER
KAFKA_CONTROLLER_QUORUM_VOTERS: 1@controller1:9093,2@controller2:9093,3@controller3:9093
KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
networks:
- kafka-net
controller2:
image: apache/kafka:latest
container_name: controller2
environment:
KAFKA_NODE_ID: 2
KAFKA_PROCESS_ROLES: controller
KAFKA_LISTENERS: CONTROLLER://:9093
KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER
KAFKA_CONTROLLER_QUORUM_VOTERS: 1@controller1:9093,2@controller2:9093,3@controller3:9093
KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
networks:
- kafka-net
controller3:
image: apache/kafka:latest
container_name: controller3
environment:
KAFKA_NODE_ID: 3
KAFKA_PROCESS_ROLES: controller
KAFKA_LISTENERS: CONTROLLER://:9093
KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER
KAFKA_CONTROLLER_QUORUM_VOTERS: 1@controller1:9093,2@controller2:9093,3@controller3:9093
KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
networks:
- kafka-net
broker1: // 메시지를 저장하는 broker(서버)
image: apache/kafka:latest
container_name: broker1
ports:
- "19092:9092" //docker와 연결하기 위해 port 매핑
environment:
KAFKA_NODE_ID: 4
KAFKA_PROCESS_ROLES: broker
KAFKA_LISTENERS: PLAINTEXT://:19092,PLAINTEXT_HOST://:9092
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://broker1:19092,PLAINTEXT_HOST://localhost:19092
KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
KAFKA_CONTROLLER_QUORUM_VOTERS: 1@controller1:9093,2@controller2:9093,3@controller3:9093
KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 3
KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 3
KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 2
depends_on:
- controller1
- controller2
- controller3
networks:
- kafka-net
broker2:
image: apache/kafka:latest
container_name: broker2
ports:
- "29092:9092"
environment:
KAFKA_NODE_ID: 5
KAFKA_PROCESS_ROLES: broker
KAFKA_LISTENERS: PLAINTEXT://:19092,PLAINTEXT_HOST://:9092
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://broker2:19092,PLAINTEXT_HOST://localhost:29092
KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
KAFKA_CONTROLLER_QUORUM_VOTERS: 1@controller1:9093,2@controller2:9093,3@controller3:9093
KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 3
KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 3
KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 2
depends_on:
- controller1
- controller2
- controller3
networks:
- kafka-net
broker3:
image: apache/kafka:latest
container_name: broker3
ports:
- "39092:9092"
environment:
KAFKA_NODE_ID: 6
KAFKA_PROCESS_ROLES: broker
KAFKA_LISTENERS: PLAINTEXT://:19092,PLAINTEXT_HOST://:9092
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://broker3:19092,PLAINTEXT_HOST://localhost:39092
KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
KAFKA_CONTROLLER_QUORUM_VOTERS: 1@controller1:9093,2@controller2:9093,3@controller3:9093
KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 3
KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 3
KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 2
depends_on:
- controller1
- controller2
- controller3
networks:
- kafka-net
networks:
kafka-net:
driver: bridge
이후 아래처럼 생성된 docker-compose.yml 을 이용하여 kafka를 실행해보려고 합니다.

docker-compose up -d
docker ps
docker compose를 실행한 뒤, 프로세스를 확인한 결과입니다.

이후에 spring app과 연동을 위해 topic을 생성하였습니다.
3. spring boot
kafka를 연동하기 위해서는 아래와 같은 의존성을 추가해줘야 합니다. (gradle 기준)
implementation 'org.springframework.kafka:spring-kafka'
application.yml (예시)
spring:
kafka:
bootstrap-servers:
- localhost:19092
- localhost:29092
- localhost:39092
consumer:
group-id: test-group
auto-offset-reset: latest
enable-auto-commit: false
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
max-poll-records: 10
producer:
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: org.apache.kafka.common.serialization.StringSerializer
listener:
ack-mode: manual
KafkaProperties (예시)
application.yml에서 작성했던 여러 속성값들을 가져올 수 있는 클래스 코드입니다.
@ConfigurationProperties(prefix = "spring.kafka")
@Getter
public class KafkaProperties {
private List<String> bootstrapServers;
private Consumer consumer;
@Data
public static class Consumer {
private String groupId;
private String autoOffsetReset;
private Boolean enableAutoCommit;
private String keyDeserializer;
private String valueDeserializer;
private Integer maxPollRecords;
}
}
KafkaConfig (예시)
아래와 같이 Kafka를 구독할 수 있는 설정 클래스 코드를 추가하였습니다.
@EnableKafka
@Configuration
public class KafkaConsumerConfig {
private final KafkaProperties kafkaProperties;
public KafkaConsumerConfig(KafkaProperties kafkaProperties) {
System.out.println("kafkaProperties = " + kafkaProperties);
this.kafkaProperties = kafkaProperties;
}
@Bean
public ConsumerFactory<String, Object> testKafkaConsumerFactory() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaProperties.getBootstrapServers());
props.put(ConsumerConfig.GROUP_ID_CONFIG, kafkaProperties.getConsumer().getGroupId());
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, kafkaProperties.getConsumer().getEnableAutoCommit());
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, kafkaProperties.getConsumer().getKeyDeserializer());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, kafkaProperties.getConsumer().getValueDeserializer());
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, kafkaProperties.getConsumer().getMaxPollRecords());
return new DefaultKafkaConsumerFactory<>(props);
}
@Bean(name = "testKafkaConsumerContainerFactory")
public ConcurrentKafkaListenerContainerFactory<String, Object> testKafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(testKafkaConsumerFactory());
factory.setConcurrency(3);
factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL);
return factory;
}
}
KafkaConsumer (예시)
@Slf4j
@Component
public class KafkaConsumer {
@KafkaListener(topics = "test", groupId = "test-group", containerFactory = "testKafkaConsumerContainerFactory")
public void listenTest(String message, Acknowledgment ack) {
try {
log.info("수신 메시지 = {}", message);
//로직 추가해도 됨
ack.acknowledge();
} catch (Exception e) {
log.error("메시지 수신을 실패하였습니다.");
log.error("error message = {}", e.getMessage());
}
}
}
Spring - Kafka 연동
이후, 간단하게 @Scheduled 를 이용하여 내부적으로 몇 초마다 한 번씩 메시지를 생성하게끔 했습니다. 이 때 @KafkaListener를 이용하여 특정 topic을 일정 주기로 구독하였는데요!
kafka 메시지 구독 성공 로그 입니다.

추가로, docker에서 실행중인 kafka로 접속해서 로그를 확인한 결과입니다.


느낀점
실무에서 RabbitMQ를 사용하고 있어 RabbitMQ의 특성에 익숙해져 있다가, Kafka를 한 번 겪어보니.. Kafka를 왜 엄청 많이 사용하는지 크게 깨닫게 되었습니다. RabbitMQ는 메시지가 소비되면 Queue에서 메시지가 없어집니다. 반면, Kafka는 로그 세그먼트와 offset 개념을 활용해 내용을 확인할 수 있다는 장점이 있습니다. Kafka 강의를 인프런에서 수강중인데, Kafka를 자유롭게 다루면서 개발할 수 있도록 노력해야겠다고 느꼈습니다.
'Backend > Messaging(Kafka,RabbitMQ)' 카테고리의 다른 글
| [RabbitMQ] 설치 및 기동 (1) | 2024.12.15 |
|---|