[Spring Cloud] Spring Cloud Data Flow (local_ver)

2025. 2. 17. 20:37Backend/Spring

 

회사에서 다루고 있는 기술 스택 중 하나가 Spring Cloud Data Flow (이하 'SCDF') 입니다. 

 

로컬 환경(로컬보다는 Docker 등을 많이들 추천하신다)에서 설치하여, 개인적으로 공부해보려고 합니다.

 

Spring Cloud Data Flow 의 소스 코드가 있는 Github 의 About에서 SCDF를 다음과 같이 소개하고 있습니다.

 

GitHub - spring-cloud/spring-cloud-dataflow: A microservices-based Streaming and Batch data processing in Cloud Foundry and Kube

A microservices-based Streaming and Batch data processing in Cloud Foundry and Kubernetes - spring-cloud/spring-cloud-dataflow

github.com

 

 

A microservices-based Streaming and Batch data processing in Cloud Foundry and Kubernetes

by github (https://github.com/spring-cloud/spring-cloud-dataflow)

 

'Cloud Foundry와 Kubernetes 환경에서 사용하는 마이크로서비스 기반의 스트리밍 및 배치 데이터 처리'라고 소개하고 있습니다.

 

 

스프링 공식 문서에서도 Cloud Foundry나 Kubernetes를 소개하고 있습니다. 다만 저는  local에서 설치해보려고 합니다.


 

Spring Cloud Data Flow 설치

 

 

Documentation | Spring Cloud Data Flow

 

dataflow.spring.io

 

먼저 SCDF를 실행하기 전에, 설치 과정이 필요합니다. SCDF 서버와 SCDF 스키퍼를 다운로드 받을 수 있습니다.

 

SCDF 서버는 Stream / Batch 애플리케이션을 정의하고 관리하는 역할을 담당합니다. SCDF 스키퍼(skipper)는 앱을 배포하고 업데이트를 담당하는 관리자의 역할을 담당합니다. 

 

저는 크롬을 이용해서 직접 jar 를 다운로드 해줬습니다. 하단의 명령어는 공식 홈페이지에서 확인할 수 있었습니다.

wget https://repo.maven.apache.org/maven2/org/springframework/cloud/spring-cloud-dataflow-server/2.11.5/spring-cloud-dataflow-server-2.11.5.jar

wget https://repo.maven.apache.org/maven2/org/springframework/cloud/spring-cloud-dataflow-shell/2.11.5/spring-cloud-dataflow-shell-2.11.5.jar

wget https://repo.maven.apache.org/maven2/org/springframework/cloud/spring-cloud-skipper-server/2.11.5/spring-cloud-skipper-server-2.11.5.jar

 

그리고 보이시는 것처럼, 각 jar 파일을 실행해주려고 합니다.

 

 

 

저는 이 때 해당 블로그 글을 참고했는데요!! shell 파일을 따로 사용하지 않고 시도해보려고 해서, scdf 서버와 scdf 스키퍼만 실행해보려고 합니다.

 

스프링 클라우드 데이터 플로우 (SCDF)를 이용한 로컬 서버 구축 및 어플리케이션 통합 - 실습 가

여러가지 문서들을 찾아 보면서 네이버 개발자분의 정리가 잘되어있어 참고하였습니다. 스프링 클라우드 데이터 플로우에 대한 이해를 높이기 위해 먼저 살펴보면 좋을 것 같아 참조합니다. 참

rhgustmfrh.tistory.com

 

아래에 있는 명령어를 각 로컬 환경에 맞춰 수정하신 뒤 사용하시면 됩니다!

SCDF 서버
java -Dloader.path=lib -jar spring-cloud-dataflow-server-2.11.5.jar --spring.datasource.url=jdbc:postgresql://localhost:5432/test --spring.datasource.username=postgres --spring.datasource.password=(비밀번호) --spring.datasource.driver-class-name=org.postgresql.Driver --spring.rabbitmq.host=localhost --spring.rabbitmq.port=5672 --spring.rabbitmq.username=guest --spring.rabbitmq.password=(비밀번호)

SCDF 스키퍼
java -jar spring-cloud-skipper-server-2.11.5.jar

 

이후 메인 페이지에 접속하여, SCDF 가 정상적으로 작동하는지 확인할 수 있습니다. (http://localhost:9393/dashboard/)

 

 


Spring Cloud Data Flow 간단한 Test를 위한, 간단한 Spring Boot 소스 코드 

 

Source

데이터의 출발 단계입니다. 외부로부터 데이터를 받는 역할을 담당합니다.

@Slf4j
@RestController
@Configuration
public class TestSourceConfig {

    private String messages;

    @Bean
    public Supplier<String> sendMessage() {
        return () -> {
            String msg = messages;
            return msg != null ? msg : "Hello";
        };
    }

    @GetMapping("/test")
    public ResponseEntity<?> testSourceInputData(@RequestParam(name = "sample") String received) {
        log.info("====== test source =====");
        messages = received;
        return ResponseEntity.status(HttpStatus.OK).body("received");
    }

}
spring.application.name=testSource
server.port=9000

spring.cloud.stream.default-binder=rabbit
spring.cloud.function.definition=sendMessage
spring.cloud.stream.bindings.sendMessage-in-0.destination=test-source
spring.cloud.stream.bindings.sendMessage-out-0.destination=test-parse

# RabbitMQ 기본 연결 설정
spring.rabbitmq.host=localhost
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=
spring.rabbitmq.virtual-host=/

 

 

Processor

Source로부터 데이터를 받아 가공(변환, 필터링 등)을 하는 단계입니다.

@Slf4j
@Configuration
public class testProcessorConfig {

    @Bean
    public Function<String, String> parseMessage() {
        log.info("====== test processor =====");
        return input -> input + " is parsed";
    }

}
spring.application.name=testProcessor
server.port=9100

spring.cloud.stream.default-binder=rabbit
spring.cloud.function.definition=parseMessage
spring.cloud.stream.bindings.parseMessage-in-0.destination=test-source
spring.cloud.stream.bindings.parseMessage-out-0.destination=test-parse

# RabbitMQ 기본 연결 설정
spring.rabbitmq.host=localhost
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=
spring.rabbitmq.virtual-host=/

 

 

Sink

데이터의 마지막 단계로서, 데이터를 저장하거나 활용하는 역할을 담당하고 있습니다.

@Slf4j
@Configuration
@RequiredArgsConstructor
public class TestSinkConfig {

    private final TestRepository testRepository;

    @Bean
    public Consumer<Message<String>> testConsumer() {
        log.info("====== test sink =====");
        return content -> {
            AcknowledgmentCallback acknowledgmentCallback = StaticMessageHeaderAccessor.getAcknowledgmentCallback(content);

            try {
                TestSample testSample = new TestSample();
                testSample.setMessage(content.getPayload());
                testRepository.save(testSample);
                log.info("=== message saved ===");
                if (acknowledgmentCallback != null) {
                    acknowledgmentCallback.acknowledge(AcknowledgmentCallback.Status.ACCEPT);
                    log.info("message = {} ", content.getPayload());
                }
            } catch (Exception e) {
                if(acknowledgmentCallback != null) {
                    acknowledgmentCallback.acknowledge(AcknowledgmentCallback.Status.REJECT);
                    log.info("message rejected");
                }
            }
        };
    }
    
}
pring.application.name=testSink
server.port=9200

spring.cloud.stream.default-binder=rabbit
spring.cloud.function.definition=testConsumer
spring.cloud.stream.bindings.testConsumer-in-0.destination=test-parse

# RabbitMQ 기본 연결 설정
spring.rabbitmq.host=localhost
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=
spring.rabbitmq.virtual-host=/

 


Spring Cloud Data Flow 간단한 Test를 위한 모듈 On

 

아쉽게도 local에서는 stream을 만들어주고 Deploy 해줬는데도, 자동 실행이 되지 않았습니다. 직접 배포 등을 활용하고 싶다면 꼭! Docker를 활용해주세요. (로컬 특성 상 stream에서 log가 수집되지 않는다고 하네요)

 

1. Applications 에서 Application 을 등록해주시면 됩니다.

 

2. 다음에 Streams에서 stream을 만들어주려고 하는데요. 사용하고 싶은 방법으로 Stream을 생성해주면 됩니다.

 

3. 2.에서 나온 단순 방법으로 Stream을 만들게 된다면, 3과 같은 Stream을 확인하실 수 있습니다.


 

Spring Cloud Data Flow : Message 전송 간단한 Test

 

1. postman 요청 

2. source log

 

3. processor log

 

4. sink log

 

5. DB(PostgreSQL) 확인

 


 

하지만, local에서 scdf server, skipper server, rabbitmq, postgreSQL, 각 모듈(source, processor, sink)들을 실행한다는 건... 실행의 주체가 local computer(OS)가 된다는 말입니다. 따라서 scdf server의 등록된 각 app에서 log들을 전혀 관찰할 수 없었는데요

 

다음으로 docker-compose를 이용하여 실행해보려고 합니다. (다음 예정)

 


 

아쉽게도 SCDF는 2025년 4월 이후로 더 이상 오픈소스로서 제공되지 않는다고 하네요 (not be maintaining Spring Cloud Data Flow). 대신 개발/운영하며 배웠던 내용들을 가지고, 앞으로 메시징 서비스 등에서 잘 활용할 수 있게 Kafka를 더 공부해보려고 합니다. 

 

Spring Cloud Data Flow End of Open-Source

Spring Cloud Data Flow End of Open-Source TL;DR; Today we're announcing that going forward we will not be maintaining Spring Cloud Data Flow, Spring Cloud Deployer, or Spring Statemachine as open-source projects. Spring Cloud Data Flow 2.11.x, Spring Cloud

spring.io

 

 

 

 

References

1. 스프링 클라우드 데이터 플로우 (SCDF)를 이용한 로컬 서버 구축 및 어플리케이션 통합

2. SCDF로 하루 N만곡 이상

3. SCDF-Batch

4. SCDF github(공식)

5. Spring Cloud Data Flow(공식)

6. SCDF