시그마 삽질==six 시그마

Spring webflux 본문

프로그래밍/Spring

Spring webflux

Ethan Matthew Hunt 2022. 7. 19. 22:24

spring webflux?

 

  • 리액티브 스택 웹 프레임워크.( 스프링5부터 적용)
  • Asynchronous Non-blocking I/O  방식
  • back pressure 지원
  • Netty, UnderTow, Servlet 3.1+ 컨테이너에서 구동됨
  • Non Blocking 기반으로 코드를 작성하고 DB도 Non Blocking 기반을 써야 효과가 있다.

webflux 탄생 배경?

    • 적은 수의 쓰레드로 동시 처리를 제어하고 적은 하드웨어 리소스를 확장하기 위해 논블로킹 웹 스택이 필요했기 때문(비동기/논블록을 이용해서 더 적은 자원으로 더 많은 트래픽을 처리하기 위함)
    • 이벤트 루핑 기법을 활용하여 쓰레드 당 많은 요청을 처리할 수 있어서 한 연결당 소요 비용이 더 경제적임( 비용 과다는 콜백으로 등록하고 다른 요청 받음)

reactive?

  • 변화에 반응하는 것을 중심에 두고 만든 프로그래밍 모델
  • 사용자가 해당 소프트웨어를 사용하기 위해서 어떤 입력을 발생시켰을 때 꾸물거리지 않고 최대한 빠른 시간 내에 응답을 한다는 의미

Reactor?

  • Reactor는 Pivotal의 오픈소스 프로젝트로, JVM 위에서 동작하는 논블럭킹 애플리케이션을 만들기 위한 리액티브 라이브러리
  • Reactive Streams 을 구현한 구현체 중 하나
  • spring webflux가 선택한 라이브러리

back pressure?

  • 컨슈머 랙이 걸리지 않도록 이벤트 속도를 제어
  • Publisher의 일방적 데이터 Push 가 아니라, Subscriber가 처리할 수 있을 만큼의 데이터만 Subscriber의 요청에 의해서 전달해주는 것

 

Reactive Streams 구동방식?

  • 리액티브 스트림즈는 Publisher를 이용해서 스트림을 정의하며 Subscriber를 이용해서 발생한 신호를 처리한다. Subscriber가 Publisher로부터 신호를 받는 것을 구독이라고 한다
  • 스트림은 0개 이상의 next 신호를 발생할 수 있다. next 신호는 데이터를 담는다. complete 신호는 스트림이 끝났음을 의미하며 error 신호는 에러가 발생했음을 의미한다. complete와 error는 둘 중 하나만 발생할 수 있으며, 이 두 신호는 발생하지 않을 수도 있다.

 

 

 

Flux와 Mono?

Publisher<T> 의 구현체는 Flux와 Mono 두개임.

  • Flux와 Mono는 Reactive Streams의 Publisher를 구현한 객체며 차이점은 발행하는 데이터 갯수입니다.
  • Flux : 0 ~ N 개의 데이터 전달
  • Mono : 0 ~ 1 개의 데이터 전달
  •  Flux는 0개 이상의 데이터를 발생하므로 0개 이상의 next 신호를 발생할 수 있고 complete나 error 신호를 발생하거나 발생하지 않을 수 있다. 
public interface Publisher<T> {
    public void subscribe(Subscriber<? super T> s);
}

public interface Subscriber<T> {
    public void onSubscribe(Subscription s);
    public void onNext(T t);
    public void onError(Throwable t);
    public void onComplete();
}

public interface Subscription {
    public void request(long n);
    public void cancel();
}

public interface Processor<T, R> extends Subscriber<T>, Publisher<R> {
}

 

https://javacan.tistory.com/entry/Reactor-Start-1-RS-Flux-Mono-Subscriber

 

 

  • onSubscribe(Subscription s): 구독을 하면 Publisher와 연동된 Subscription을 받는다. 전달받은 Subscription을 이용해서 Publisher에 데이터를 요청한다.
  • onNext(T t): Publisher가 next 신호를 보내면 호출된다.
  • onError(Throwable t): Publisher가 error 신호를 보내면 호출된다.
  • onComplete(): Publisher가 complete 신호를 보내면 호출된다.

 

출처: 자바캔(최범균님)&nbsp;https://javacan.tistory.com/entry/Reactor-Start-1-RS-Flux-Mono-Subscriber
FluxArray

푸시 모델 vs 풀모델

Subscription#request() : pull모델

request(Long.MAX_VALUE): push 모델

 

 

 

Cold/Hot?

 

  • cold 방식(일반적인 방식) :Mono / Flux는 subcribe 하지 않으면, 아무 일도 일어나지 않는다. 또한 subcribe 할 때마다 매번 독립적인 데이터를 생성하고 동작한다.(API호출, Flux.just(), Mono.just()... )
  • Hot: Hot은 Cold와 다르게, subcribe를 하기 전에 데이터를 생성할 수도 있고,  한번 생성한 데이터를 subcribe 할 때마다 생성해 놓은 데이터를 통해 동작하게 할 수 있다. (. ).
  • Cold의 장점이자 단점인 독립적인 구독은 성능상의 문제가 될 수 있다.  그렇다고 Hot을 무분별하게 사용할 경우 publishing이 되지 않거나, 뒤늦은 streaming 을 받아 원치 않는 동작을 할 수도 있다. 

 

  • cold -> hot 전환: just->defer  ,ConnectableFlux , share...
  • ConnectableFlux로 변환하는 과정

         1.publish()라는 연산자를 호출시 ConnectableFlux로 변경가능

         2.연결해서 완성(방법3가지)

           1) 구독 모두 완료 후에 connect()호출해서 실행

           2) publish() 후 autoConnect(구독수) 넣어서 일정 구독수 되면 자동 connect()되게 함

                autoConnect/refCount: 이름 그대로 최소 구독 개수를 만족하면 자동으로 connect()를 호출하는 역할

 

          3) publish() 후 refCount(구독수) 넣어서 일정 구독수 되면 자동 connect()되게 함

               refCount는 autoConnect가 하는 일에 더해서 구독하고 있는 구독자의 개수를 세다가 하나도 구독하는 곳이 없으면

               기존 소스의 스트림도 구독을 해제하는 역할을 함

 

 

publishOn vs SubcribeOn?

publishOn()
publishOn() 앞쪽의 publisher chain는 그대로 두고 "뒷쪽의 subscriber chain만 별도의 쓰레드로 분리"함
비유하자면 "과거와는 결별하고 나는 나만의 길을 가겠어!"라는 의미에 가까움
일반적으로 빠른 publisher와 느린 subscriber로 chain이 구성될 때 사용함. 외부 의존성에 쓰기를 수행할 때 필요함

subscribeOn()
subscribeOn() "앞쪽의 publisher chain와 뒷쪽의 subscriber chain을 함께 묶어서 별도의 쓰레드로 분리"함
비유하자면 "과거에 책임자들을 다 내쫓고 내가 다 책임지겠어!"에 가까움
subscribeOn()이 호출되는 위치와 상관없음
일반적으로 느린 publisher와 빠른 subscriber로 구성된 chain에서 사용함. 외부 의존성으로부터 읽어올 때 subscribeOn()을 사용하는 게 필요함


출처 :https://wiki.terzeron.com/Programming/Java/Reactor_Flux의_publishOn_subscribeOn을_이용한_스케쥴링
https://www.woolha.com/tutorials/project-reactor-publishon-vs-subscribeon-difference

 

 

메서드?

just: 시퀀스로 사용할 데이터가 이미 존재할 때 사용 Flux.just(1, 2, 3); Mono.just(1);

range: 순차적으로 증가하는 Integer를 생성 Flux.range(11, 5);

generate: 데이터를 함수를 이용해서 생성할 수 있다

create: 비동기나 push 방식으로 데이터를 발생

fromStream: 자바 8의 Stram에서 Flux를 생성할 수 있다

fromIterable: Iterable을 이용해서 Flux를 생성할 수 있다

map: 자바 스트림의 map()과 유사

map 동기식 방식 !! 가능하면 사용금지!!

flatMap: flatMap은 1개의 데이터로부터 시퀀스를 생성할 때 사용한다. 

즉 1-n 방식의 변환을 처리한다.flatMap에 전달한 함수가 생성하는 각 Flux는 하나의 시퀀스처럼 연결된다. 그래서 flatMap()의 결과로 생성되는 Flux의 타입이 Flux<Flux<Integer>>가 아니라 Flux<Integer>다

값을 꺼내서 새로운 Publisher로 바꿔줄 수 있는 연산자

flatmap 비동기식 

flatMap은 리턴하는 Publisher가 비동기로 동작할 때 순서를 보장하지 않으므로, 순서 보장을 하려면 flatMapSequential 또는 concatMap을 사용해야한다.

flatMapSequential과 concatMap의 차이는 concatMap은 인자로 지정된 함수에서 리턴하는 Publisher의 스트림이 다 끝난 후에 그다음 넘어오는 값의 Publisher스트림을 처리하지만, flatMapSequential은 일단 오는 대로 구독하고 결과는 순서에 맞게 리턴하는 역할을 해서, 비동기 환경에서 동시성을 지원하면서도 순서를 보장할 때 쓰이는 것이 차이점

 

 

Flux<Integer> seq = Flux.just(1, 2, 3)

             .flatMap(i -> Flux.range(1, i)) // Integer를 Flux<Integer>로 1-N 변환

seq.subscribe(System.out::println);

 

1 -> Flux.range(1, 1) : [1] 생성

2 -> Flux.range(1, 2) : [1, 2] 생성

3 -> Flux.range(1, 3) : [1, 2, 3] 생성

 

1

1

2

1

2

3

 

 

filter : 필터

defaultIfEmpty : 빈 시퀀스인 경우 기본 값 사용하기

switchIfEmpty : 빈 시퀀스인 경우 다른 시퀀스 사용하기

startWith :  특정 값으로 시작하는 시퀀스로 변환

concatWithValues :  특정 값으로 끝나는 시퀀스로 변환

concatWithValues:  특정 값으로 끝나는 시퀀스로 변환

cancatWith: 시퀀스 순서대로 연결

mergeWith: 시퀀스 발생 순서대로 섞기

zipWith:  시퀀스 묶기

combineLatest: 시퀀스 묶기(가장 최근의 데이터를 쌍으로 만든다)

take(처음 n개), takeLast(마지막 n개) : 지정한 개수/시간에 해당하는 데이터만 유지

skip(처음 n개), skipLast(마지막 n개)  : 지정한 개수/시간만큼 데이터 거르기

collectList : List 콜렉션으로 모으기

collectMap : Map 콜렉션으로 모으기

collectMultiMap : Map의 값을 콜렉션으로 모으기

count :  개수 새기

reduce : 누적하기

scan :  누적하면서 값 생성하기

window(int), window(int, int) : 일정 개수로 묶어서 Flux 만들기

window(Duration), window(Duration, Duration): 일정 시간 간격으로 묶어서 Flux 만들기

windowUntil(Predicate) : 특정 조건에 다다를 때가지 묶어서 Flux 만들기

windowWhile(Predicate) : 특정 조건을 충족하는 동안 묶어서 Flux 만들기

buffer류 메서드 :  Flux 대신 List로 묶기

 

 

리액터 쓰레드 스케줄링?

리액터는 비동기 실행을 강제하지 않는다

 

1)publishOn을 이용한 신호 처리 쓰레드 스케줄링

publishOn() 메서드를 이용하면  Subscriber의 onNext(), onComplete(), onError()신호를 별도 쓰레드로 처리할 수 있다. map(), flatMap() 등의 변환도 publishOn()이 지정한 쓰레드를 이용해서 처리한다

publishOn()에 지정한 스케줄러는 다음 publishOn()을 설정할 때까지 적용된다

이 연산자가 호출된 위치 이후에 실행되는 연산자들은 publishOn에서 지정된 스케줄러에서 실행되도록 할 수 있습니다. 

 

2)subscribeOn을 이용한 구독 처리 쓰레드 스케줄링

해당 스트림을 구독할 때 동작하는 스케줄러를 지정할 수 있습니다

subscribeOn()을 사용하면 Subscriber가 시퀀스에 대한 request 신호를 별도 스케줄러로 처리한다

 

https://velog.io/@nkjang/publishOn-subscribeOn

https://javacan.tistory.com/entry/Reactor-Start-6-Thread-Scheduling

https://100100e.tistory.com/569

https://dreamchaser3.tistory.com/6 

subscribeOn으로 지정한 스케줄러는 첫 번째 publishOn이 올때까지 적용

 

로그

.log()  블로킹 i/o 일으킴. 꼭 제거해줘야함

map 동기식 방식

flatmap 비동기식 

 

 

성능 향상 개선

 

TPS  & response Time 95% 같이 봐야함.

하위 95%의 응답속도..

 

 

1)log()제거 RollingFileAppender로 변경해야

 

2)BlockHound  블로킹 코드 찾아줌( 테스트코드에서 테스트!!)

 

3)Lettuce 설정

Connection Validation시 synchronous로 동작

command 실행마다 ping command를 synchroous로 실행 

성능하락원인

factory.setValidateConnection(true); <—문제임!!

 

4)Avoiding Reactor MeltDown( from InfoQ)

Reactor MeltDown

Event Loop 의 Tread들이 Blocking API 때문에 Reactor 시스템이 hang 걸리는 현상

Blocking API를 별도의 Thread Pool로 격리시키는 방법

subcribeOn(), publishOn()

 

 

 

webflux 프로그래밍 모델???

 

1) 어노테이션 기반 Controllers

 

스프링 MVC과 동일하며  spring-web  모듈에 있는 같은 어노테이션과 같다. 스프링 MVC와 웹플럭스 컨트롤러 모두 리액티브(Reactor, RxJava) 리턴 타입을 지원하기 때문에 이 둘을 구분하기 어렵다. 한 가지 눈에 띄는 차이는 웹플럭스에선 @RequestBody 로 리액티브 인자를 받을 수 있다는 것이다.

 

 

2) 함수형  Endpoints

 

요청을 라우팅해주는 경량화된 람다 기반 함수형 프로그래밍 모델.  

어노테이션으로 의도를 선언해서 콜백 받기보단 요청을 어플리케이션이 처음부터 끝까지 다 제어함.

Node.js의 Express와 비슷

 

 

 

curl https://start.spring.io/starter.tgz  \

       -d bootVersion=2.4.4 \

       -d dependencies=webflux \

       -d baseDir=spring-webflux-tutorial \   <— 폴더명 보통은 artifactId와 똑같이함.

       -d artifactId=webflux \   <— intellij에서 project name이 됨

       -d packageName=com.dev.webflux \   

       -d applicationName=HelloWebFluxApplication \  <— 시작하는 클래스명

       -d type=gradle-project | tar -xzvf -

 

 

curl -i localhost:8080

curl -i localhost:8080 -H 'Accept: text/event-stream'

curl -i localhost:8080 -H 'Accept: application/stream+json'

 

 

webflux 모니터링 어떻게 해야되나??

쓰레드가 계속 변경이 되서 우리가 알고 있는 apm 제니퍼같은 경우 모니터링이 안된다. 어떻게 해야할까?

1)Reactor With MDC를 사용하면됨!!! 

2)Spring Boot Micrometer 

https://techblog.woowahan.com/2667/

 

 

responseEntity 관련
https://dzone.com/articles/webflux-reactive-programming-with-spring-part-3
https://stackoverflow.com/questions/57769190/what-is-the-difference-between-responseentitymono-and-monoresponseentity-as
https://codehunter.cc/a/spring/what-is-the-difference-between-responseentity-mono-and-mono-responseentity-as-a-return-type-of-a-rest-controller

 

 

참고

https://tech.kakao.com/2018/05/29/reactor-programming/

https://javacan.tistory.com/entry/Reactor-Start-1-RS-Flux-Mono-Subscriber?category=699082 

https://www.youtube.com/watch?v=I0zMm6wIbRI

'프로그래밍 > Spring' 카테고리의 다른 글

Spring batch 요약  (0) 2020.10.25
@TransactionalEventListener  (0) 2020.10.22
Spring Cloud Netflix  (0) 2020.10.18
Spring Kafka Basic  (0) 2020.08.25
ObjectMapper & OrikaMapper basic  (0) 2020.08.24
Comments