RxJava의 concatMapEager() 연산자: 동시성 + 순서 보장

RxJava에서는 flatMap을 무척 많이 사용하게 된다. 문제는 flatMap의 특성(merge)상 목록 순서가 꼬이는 경우가 있다. 특히 비동기 항목과 함께 사용하면 제대로 틀어진다. 해결책으로 concatMap을 이야기하는 경우를 보았지만 내부적으로 concat을 사용하기 때문애 동시성 처리를 하더라도 앞선 항목을 처리할 때 까지 기다려야 하므로 결국 직렬 실행과 차이가 없기에 제대로 된 해결책은 아니다.

RxJava 1.0.15 버전부터 (실험적인1) concatMapEager()라는 operator를 사용할 수 있게 되었다. 이 버전이 처음 나왔을 때가 2015년 8월이므로 그 이전 RxJava에는 물론 이 기능이 없어 다른 방식으로 정렬 문제를 해결했었다. 정렬에 필요한 시간은 네트워크 호출 시간에 비하면 극히 짧으므로 속도 면에서는 분명 이득을 보았지만 정렬 처리를 위해 이런저런 구현을 조금 더 해야 해서 코드가 지저분해진다.

겉보기에는 그냥 단순하게 flatMap이나 concatMap을 쓸 자리에 concatMapEager를 대신 넣었을 뿐인데, 병렬 처리를 하면서 호출 순서까지 보장한다.

Observable.from(list)
    .concatMapEager(deferredClient::findById)
    ...

비록 아직은 실험적인 기능이지만 실제로 concatMapEager 연산자를 이용해서 Couchbase 자바 커넥터에서 뷰 쿼리 정렬 순서가 뒤섞이는 버그를 해결하여 Couchbase Java Client 2.2.5 릴리즈 시 반영했으니 실무에 사용해도 큰 무리는 없겠다. 자세한 내용은 code contribution 상세 내용을 참조하자.

내부적으로는 링 버퍼를 사용해서 순서 보장 문제를 해결한다. 왜 링 버퍼를 사용했을지는 조만간 다시 자세히 이야기할 예정이다.


  1. @Experimental 애노테이션이 붙어 있다 [return]
comments powered by Disqus