일 | 월 | 화 | 수 | 목 | 금 | 토 |
---|---|---|---|---|---|---|
1 | 2 | 3 | 4 | 5 | 6 | 7 |
8 | 9 | 10 | 11 | 12 | 13 | 14 |
15 | 16 | 17 | 18 | 19 | 20 | 21 |
22 | 23 | 24 | 25 | 26 | 27 | 28 |
29 | 30 | 31 |
- java
- intellij 핵심 단축키
- 원격 브랜 삭제
- @TransactionalEventListener
- JPA
- IntelliJ
- #docker compose
- 자바 ORM 표준 JPA 프로그래밍 정리
- 자바 ORM 표준 JPA 프로그래밍
- ksqldb
- aws
- HandlerMethodArgumentResolver
- Spring Cloud Netflix
- 마이크로 서비스
- 리팩토링 2판
- 리눅스
- intellij favorites
- intellij 즐겨찾기
- Linux
- Stream
- ksql
- findTopBy
- CompletableFuture
- multipart테스트
- 백명석님
- javascript case
- 친절한 SQL 튜닝
- git
- @Transactional Propagation
- vue.js
- Today
- Total
시그마 삽질==six 시그마
Spring webflux 본문
spring webflux?
- 리액티브 스택 웹 프레임워크.( 스프링5부터 적용)
- Asynchronous Non-blocking I/O 방식
- back pressure 지원
- Netty, UnderTow, Servlet 3.1+ 컨테이너에서 구동됨
- Non Blocking 기반으로 코드를 작성하고 DB도 Non Blocking 기반을 써야 효과가 있다.
webflux 탄생 배경?
- 적은 수의 쓰레드로 동시 처리를 제어하고 적은 하드웨어 리소스를 확장하기 위해 논블로킹 웹 스택이 필요했기 때문(비동기/논블록을 이용해서 더 적은 자원으로 더 많은 트래픽을 처리하기 위함)
- 이벤트 루핑 기법을 활용하여 쓰레드 당 많은 요청을 처리할 수 있어서 한 연결당 소요 비용이 더 경제적임( 비용 과다는 콜백으로 등록하고 다른 요청 받음)
reactive?
- 변화에 반응하는 것을 중심에 두고 만든 프로그래밍 모델
- 사용자가 해당 소프트웨어를 사용하기 위해서 어떤 입력을 발생시켰을 때 꾸물거리지 않고 최대한 빠른 시간 내에 응답을 한다는 의미
Reactor?
- Reactor는 Pivotal의 오픈소스 프로젝트로, JVM 위에서 동작하는 논블럭킹 애플리케이션을 만들기 위한 리액티브 라이브러리
- Reactive Streams 을 구현한 구현체 중 하나
- spring webflux가 선택한 라이브러리
back pressure?
- 컨슈머 랙이 걸리지 않도록 이벤트 속도를 제어
- Publisher의 일방적 데이터 Push 가 아니라, Subscriber가 처리할 수 있을 만큼의 데이터만 Subscriber의 요청에 의해서 전달해주는 것
Reactive Streams 구동방식?
- 리액티브 스트림즈는 Publisher를 이용해서 스트림을 정의하며 Subscriber를 이용해서 발생한 신호를 처리한다. Subscriber가 Publisher로부터 신호를 받는 것을 구독이라고 한다
- 스트림은 0개 이상의 next 신호를 발생할 수 있다. next 신호는 데이터를 담는다. complete 신호는 스트림이 끝났음을 의미하며 error 신호는 에러가 발생했음을 의미한다. complete와 error는 둘 중 하나만 발생할 수 있으며, 이 두 신호는 발생하지 않을 수도 있다.
Flux와 Mono?
Publisher<T> 의 구현체는 Flux와 Mono 두개임.
- Flux와 Mono는 Reactive Streams의 Publisher를 구현한 객체며 차이점은 발행하는 데이터 갯수입니다.
- Flux : 0 ~ N 개의 데이터 전달
- Mono : 0 ~ 1 개의 데이터 전달
- Flux는 0개 이상의 데이터를 발생하므로 0개 이상의 next 신호를 발생할 수 있고 complete나 error 신호를 발생하거나 발생하지 않을 수 있다.
public interface Publisher<T> {
public void subscribe(Subscriber<? super T> s);
}
public interface Subscriber<T> {
public void onSubscribe(Subscription s);
public void onNext(T t);
public void onError(Throwable t);
public void onComplete();
}
public interface Subscription {
public void request(long n);
public void cancel();
}
public interface Processor<T, R> extends Subscriber<T>, Publisher<R> {
}
https://javacan.tistory.com/entry/Reactor-Start-1-RS-Flux-Mono-Subscriber
- onSubscribe(Subscription s): 구독을 하면 Publisher와 연동된 Subscription을 받는다. 전달받은 Subscription을 이용해서 Publisher에 데이터를 요청한다.
- onNext(T t): Publisher가 next 신호를 보내면 호출된다.
- onError(Throwable t): Publisher가 error 신호를 보내면 호출된다.
- onComplete(): Publisher가 complete 신호를 보내면 호출된다.
푸시 모델 vs 풀모델
Subscription#request() : pull모델
request(Long.MAX_VALUE): push 모델
Cold/Hot?
- cold 방식(일반적인 방식) :Mono / Flux는 subcribe 하지 않으면, 아무 일도 일어나지 않는다. 또한 subcribe 할 때마다 매번 독립적인 데이터를 생성하고 동작한다.(API호출, Flux.just(), Mono.just()... )
- Hot: Hot은 Cold와 다르게, subcribe를 하기 전에 데이터를 생성할 수도 있고, 한번 생성한 데이터를 subcribe 할 때마다 생성해 놓은 데이터를 통해 동작하게 할 수 있다. (. ).
- Cold의 장점이자 단점인 독립적인 구독은 성능상의 문제가 될 수 있다. 그렇다고 Hot을 무분별하게 사용할 경우 publishing이 되지 않거나, 뒤늦은 streaming 을 받아 원치 않는 동작을 할 수도 있다.
- cold -> hot 전환: just->defer ,ConnectableFlux , share...
- ConnectableFlux로 변환하는 과정
1.publish()라는 연산자를 호출시 ConnectableFlux로 변경가능
2.연결해서 완성(방법3가지)
1) 구독 모두 완료 후에 connect()호출해서 실행
2) publish() 후 autoConnect(구독수) 넣어서 일정 구독수 되면 자동 connect()되게 함
autoConnect/refCount: 이름 그대로 최소 구독 개수를 만족하면 자동으로 connect()를 호출하는 역할
3) publish() 후 refCount(구독수) 넣어서 일정 구독수 되면 자동 connect()되게 함
refCount는 autoConnect가 하는 일에 더해서 구독하고 있는 구독자의 개수를 세다가 하나도 구독하는 곳이 없으면
기존 소스의 스트림도 구독을 해제하는 역할을 함
publishOn vs SubcribeOn?
publishOn()
publishOn() 앞쪽의 publisher chain는 그대로 두고 "뒷쪽의 subscriber chain만 별도의 쓰레드로 분리"함
비유하자면 "과거와는 결별하고 나는 나만의 길을 가겠어!"라는 의미에 가까움
일반적으로 빠른 publisher와 느린 subscriber로 chain이 구성될 때 사용함. 외부 의존성에 쓰기를 수행할 때 필요함
subscribeOn()
subscribeOn() "앞쪽의 publisher chain와 뒷쪽의 subscriber chain을 함께 묶어서 별도의 쓰레드로 분리"함
비유하자면 "과거에 책임자들을 다 내쫓고 내가 다 책임지겠어!"에 가까움
subscribeOn()이 호출되는 위치와 상관없음
일반적으로 느린 publisher와 빠른 subscriber로 구성된 chain에서 사용함. 외부 의존성으로부터 읽어올 때 subscribeOn()을 사용하는 게 필요함
출처 :https://wiki.terzeron.com/Programming/Java/Reactor_Flux의_publishOn_subscribeOn을_이용한_스케쥴링
https://www.woolha.com/tutorials/project-reactor-publishon-vs-subscribeon-difference
메서드?
just: 시퀀스로 사용할 데이터가 이미 존재할 때 사용 Flux.just(1, 2, 3); Mono.just(1);
range: 순차적으로 증가하는 Integer를 생성 Flux.range(11, 5);
generate: 데이터를 함수를 이용해서 생성할 수 있다
create: 비동기나 push 방식으로 데이터를 발생
fromStream: 자바 8의 Stram에서 Flux를 생성할 수 있다
fromIterable: Iterable을 이용해서 Flux를 생성할 수 있다
map: 자바 스트림의 map()과 유사
map 동기식 방식 !! 가능하면 사용금지!!
flatMap: flatMap은 1개의 데이터로부터 시퀀스를 생성할 때 사용한다.
즉 1-n 방식의 변환을 처리한다.flatMap에 전달한 함수가 생성하는 각 Flux는 하나의 시퀀스처럼 연결된다. 그래서 flatMap()의 결과로 생성되는 Flux의 타입이 Flux<Flux<Integer>>가 아니라 Flux<Integer>다
값을 꺼내서 새로운 Publisher로 바꿔줄 수 있는 연산자
flatmap 비동기식
flatMap은 리턴하는 Publisher가 비동기로 동작할 때 순서를 보장하지 않으므로, 순서 보장을 하려면 flatMapSequential 또는 concatMap을 사용해야한다.
flatMapSequential과 concatMap의 차이는 concatMap은 인자로 지정된 함수에서 리턴하는 Publisher의 스트림이 다 끝난 후에 그다음 넘어오는 값의 Publisher스트림을 처리하지만, flatMapSequential은 일단 오는 대로 구독하고 결과는 순서에 맞게 리턴하는 역할을 해서, 비동기 환경에서 동시성을 지원하면서도 순서를 보장할 때 쓰이는 것이 차이점
Flux<Integer> seq = Flux.just(1, 2, 3)
.flatMap(i -> Flux.range(1, i)) // Integer를 Flux<Integer>로 1-N 변환
seq.subscribe(System.out::println);
1 -> Flux.range(1, 1) : [1] 생성
2 -> Flux.range(1, 2) : [1, 2] 생성
3 -> Flux.range(1, 3) : [1, 2, 3] 생성
1
1
2
1
2
3
filter : 필터
defaultIfEmpty : 빈 시퀀스인 경우 기본 값 사용하기
switchIfEmpty : 빈 시퀀스인 경우 다른 시퀀스 사용하기
startWith : 특정 값으로 시작하는 시퀀스로 변환
concatWithValues : 특정 값으로 끝나는 시퀀스로 변환
concatWithValues: 특정 값으로 끝나는 시퀀스로 변환
cancatWith: 시퀀스 순서대로 연결
mergeWith: 시퀀스 발생 순서대로 섞기
zipWith: 시퀀스 묶기
combineLatest: 시퀀스 묶기(가장 최근의 데이터를 쌍으로 만든다)
take(처음 n개), takeLast(마지막 n개) : 지정한 개수/시간에 해당하는 데이터만 유지
skip(처음 n개), skipLast(마지막 n개) : 지정한 개수/시간만큼 데이터 거르기
collectList : List 콜렉션으로 모으기
collectMap : Map 콜렉션으로 모으기
collectMultiMap : Map의 값을 콜렉션으로 모으기
count : 개수 새기
reduce : 누적하기
scan : 누적하면서 값 생성하기
window(int), window(int, int) : 일정 개수로 묶어서 Flux 만들기
window(Duration), window(Duration, Duration): 일정 시간 간격으로 묶어서 Flux 만들기
windowUntil(Predicate) : 특정 조건에 다다를 때가지 묶어서 Flux 만들기
windowWhile(Predicate) : 특정 조건을 충족하는 동안 묶어서 Flux 만들기
buffer류 메서드 : Flux 대신 List로 묶기
리액터 쓰레드 스케줄링?
리액터는 비동기 실행을 강제하지 않는다
1)publishOn을 이용한 신호 처리 쓰레드 스케줄링
publishOn() 메서드를 이용하면 Subscriber의 onNext(), onComplete(), onError()신호를 별도 쓰레드로 처리할 수 있다. map(), flatMap() 등의 변환도 publishOn()이 지정한 쓰레드를 이용해서 처리한다
publishOn()에 지정한 스케줄러는 다음 publishOn()을 설정할 때까지 적용된다
이 연산자가 호출된 위치 이후에 실행되는 연산자들은 publishOn에서 지정된 스케줄러에서 실행되도록 할 수 있습니다.
2)subscribeOn을 이용한 구독 처리 쓰레드 스케줄링
해당 스트림을 구독할 때 동작하는 스케줄러를 지정할 수 있습니다
subscribeOn()을 사용하면 Subscriber가 시퀀스에 대한 request 신호를 별도 스케줄러로 처리한다
https://velog.io/@nkjang/publishOn-subscribeOn
https://javacan.tistory.com/entry/Reactor-Start-6-Thread-Scheduling
https://100100e.tistory.com/569
https://dreamchaser3.tistory.com/6
subscribeOn으로 지정한 스케줄러는 첫 번째 publishOn이 올때까지 적용
로그
.log() 블로킹 i/o 일으킴. 꼭 제거해줘야함
map 동기식 방식
flatmap 비동기식
성능 향상 개선
TPS & response Time 95% 같이 봐야함.
하위 95%의 응답속도..
1)log()제거 RollingFileAppender로 변경해야
2)BlockHound 블로킹 코드 찾아줌( 테스트코드에서 테스트!!)
3)Lettuce 설정
Connection Validation시 synchronous로 동작
command 실행마다 ping command를 synchroous로 실행
성능하락원인
factory.setValidateConnection(true); <—문제임!!
4)Avoiding Reactor MeltDown( from InfoQ)
Reactor MeltDown
Event Loop 의 Tread들이 Blocking API 때문에 Reactor 시스템이 hang 걸리는 현상
Blocking API를 별도의 Thread Pool로 격리시키는 방법
subcribeOn(), publishOn()
webflux 프로그래밍 모델???
1) 어노테이션 기반 Controllers
스프링 MVC과 동일하며 spring-web 모듈에 있는 같은 어노테이션과 같다. 스프링 MVC와 웹플럭스 컨트롤러 모두 리액티브(Reactor, RxJava) 리턴 타입을 지원하기 때문에 이 둘을 구분하기 어렵다. 한 가지 눈에 띄는 차이는 웹플럭스에선 @RequestBody 로 리액티브 인자를 받을 수 있다는 것이다.
2) 함수형 Endpoints
요청을 라우팅해주는 경량화된 람다 기반 함수형 프로그래밍 모델.
어노테이션으로 의도를 선언해서 콜백 받기보단 요청을 어플리케이션이 처음부터 끝까지 다 제어함.
Node.js의 Express와 비슷
curl https://start.spring.io/starter.tgz \
-d bootVersion=2.4.4 \
-d dependencies=webflux \
-d baseDir=spring-webflux-tutorial \ <— 폴더명 보통은 artifactId와 똑같이함.
-d artifactId=webflux \ <— intellij에서 project name이 됨
-d packageName=com.dev.webflux \
-d applicationName=HelloWebFluxApplication \ <— 시작하는 클래스명
-d type=gradle-project | tar -xzvf -
curl -i localhost:8080
curl -i localhost:8080 -H 'Accept: text/event-stream'
curl -i localhost:8080 -H 'Accept: application/stream+json'
webflux 모니터링 어떻게 해야되나??
쓰레드가 계속 변경이 되서 우리가 알고 있는 apm 제니퍼같은 경우 모니터링이 안된다. 어떻게 해야할까?
1)Reactor With MDC를 사용하면됨!!!
2)Spring Boot Micrometer
https://techblog.woowahan.com/2667/
responseEntity 관련
https://dzone.com/articles/webflux-reactive-programming-with-spring-part-3
https://stackoverflow.com/questions/57769190/what-is-the-difference-between-responseentitymono-and-monoresponseentity-as
https://codehunter.cc/a/spring/what-is-the-difference-between-responseentity-mono-and-mono-responseentity-as-a-return-type-of-a-rest-controller
참고
https://tech.kakao.com/2018/05/29/reactor-programming/
https://javacan.tistory.com/entry/Reactor-Start-1-RS-Flux-Mono-Subscriber?category=699082
'프로그래밍 > Spring' 카테고리의 다른 글
Spring batch 요약 (0) | 2020.10.25 |
---|---|
@TransactionalEventListener (0) | 2020.10.22 |
Spring Cloud Netflix (0) | 2020.10.18 |
Spring Kafka Basic (0) | 2020.08.25 |
ObjectMapper & OrikaMapper basic (0) | 2020.08.24 |