이전에는 데이터 컬렉션을 병렬로 처리하기가 어려웠다.
자바 7은 포크(fork)/조인(join) 프레임워크 기능을 제공하여 더 쉽게 병렬화를 수행하면서 에러를 최소화할 수 있도록 하였다.
컬렉션에 parallelStream을 호출하면 병렬 스트림(parallel stream)이 생성된다.
병렬 스트림이란 각각의 스레드에서 처리할 수 있도록 스트림 요소를 여러 청크로 분할한 스트림으로, 모든 멀티코어 프로세서가 각각의 청크를 처리하도록 할당할 수 있다.
숫자 n을 인수로 받아서 1부터 n까지의 모든 숫자의 합계를 반환하는 메서드를 구현해보자.
public static long sequentialSum(long n) {
return Stream.iterate(1L, i -> i + 1) // 무한 자연수 스트림 생성
.limit(n)
.reduce(Long::sum) // 스트림 리듀싱 연산
.get();
}순차 스트림에 parallel 메서드를 호출하면 기존의 함수형 리듀싱 연산이 병렬로 처리된다.
public static long parallelSum(long n) {
return Stream.iterate(1L, i -> i + 1)
.limit(n)
.parallel() // 스트림을 병렬 스트림으로 변환
.reduce(Long::sum)
.get();
}- 스트림이 여러 청크로 분할되어 있어, 리듀싱 연산을 여러 청크에 병렬로 수행할 수 있다.
- 리듀싱 연산으로 생성된 부분 결과를 다시 리듀싱 연산으로 합쳐서 전체 스트림의 리듀싱 결과를 도출한다.
sequential로 병렬 스트림으르 순차 스트림으로 바꿀 수 있다.
💡
parallel과sequential두 메서드 중 최종적으로 호출된 메서드가 전체 파이프라인에 영향을 미친다.
병렬 스트림은 내부적으로 ForkJoinPool을 사용한다. (7.2절에서 자세히 설명)
이는 기본적으로 프로세서 수, 즉 Runtime.getRuntime().availableProcessors()가 반환하는 값에 상응하는 스레드를 갖는다.
아래 코드는 전역 설정 코드로, 모든 병렬 스트림 연산에 영향을 준다.
System.setProperty("java.util.concurrent.ForkJoinPool.common.parallelism", "12");자바 마이크로벤치마크 하니스(Java Microbenchmark Harness, JMH) 라이브러리를 통해 벤치마크를 구현하고, 위 코드들의 성능을 측정해보자.
코드 : src/main/java/chapter07/ParallelStreamBenchmark.java
병럴화를 이용하면 순차나 반복 형식에 비해 성능이 더 좋아질 것이라고 추측했지만 그 결과는 반대였다.
전통적인 for 루프를 사용해 반복하는 방법에 비해 순차적 스트림을 사용하는 버전은 4배 정도 느렸고,
병렬 스트림을 사용하는 버전은 5배 정보나 느렸다.
병렬 스트림이 더 느렸던 이유는 다음과 같다.
- 반복 결과로 박싱된 객체가 만들어지므로 숫자를 더하려면 언박싱을 해야 한다.
- 반복 작업(iterate)은 병렬로 수행할 수 있는 독립 단위로 나누기가 어렵다. (본질적으로 순차적이기 때문)
이런 상황에서는 ‘7.1.1 순차 스트림을 병렬 스트림으로 변환하기’에서의 그림처럼 리듀싱 연산이 수행되지 않는다.
리듀싱 과정을 시작하는 시점에 전체 숫자 리스트가 준비되지 않았으므로 스트림을 병렬로 처리할 수 있도록 청크로 분할할 수 없기 때문이다.
이처럼 병렬 프로그래밍을 오용하면 오히려 전체 프로그램의 성능이 더 나빠질 수 있다.
5장에서 배운 LongStream.rangeClosed을 사용해보자.
- 기본형 long을 직접 사용하므로 박싱과 언박싱 오버헤드가 사라진다.
- 쉽게 청크로 분할할 수 있는 숫자 범위를 생산한다.
@Benchmark
public long rangedSum() {
return LongStream.rangeClosed(1, N)
.reduce(0L, Long::sum);
}
// 병령 스트림
@Benchmark
public long parallelRangedSum() {
return LongStream.rangeClosed(1, N)
.parallel()
.reduce(0L, Long::sum);
}성능을 측정해보면 기존의 iterate 팩토리 메서드로 생성한 순차 버전에 비해 숫자 스트림 처리 속도가 더 빠르다!
이번에는 ‘7.1.1 순차 스트림을 병렬 스트림으로 변환하기’에서의 그림처럼 실질적으로 리듀싱 연산이 병렬로 수행된다.
💡 함수형 프로그래밍을 올바로 사용하면 병렬 실행의 힘을 직접적으로 얻을 수 있다.
하지만 병렬화가 완전 공짜는 아니다.
스트림을 재귀적으로 분할해야 하고,
각 서브스트림을 서로 다른 스레드의 리듀싱 연산으로 할당하고,
이들 결과를 하나의 값으로 합쳐야 한다.
멀티코어 간의 데이터 이동은 생각보다 비싸다.
병렬 스트림을 잘못 사용하면서 발생하는 많은 문제는 공유된 상태를 바꾸는 알고리즘을 사용하기 때문에 일어난다.
💡 병렬 스트림과 병렬 계산에서는 공유된 가변 상태를 피해야 한다.
언제나 병렬 스트림이 순차 스트림보다 빠른 것은 아니며, 병렬 스트림의 수행 과정은 투명하지 않을 때가 많다.
순차 스트림과 병렬 스트림 중 어떤 것이 좋을지 모르겠다면 적절한 벤치마크로 직접 성능을 측정하는 것이 바람직하다.
자동 박싱과 언박싱은 성능을 크게 저하시킬 수 있는 요소다.
자바 8은 박싱 동작을 피할 수 있도록 기본형 특화 스트림(IntStream, LongStream, DoubleStream)을 제공하며, 되도록이면 이들을 사용하는 것이 좋다.
limit나 findFirst처럼 요소의 순서에 의존하는 연산은 병렬 스트림에서 수행하려면 비싼 비용을 치러야 한다.
findAny는 요소의 순서와 상관없이 연산하므로 findFirst보다 성능이 좋다.
전체 스트림 파이프라인 처리 비용 = N*Q
N: 처리해야 할 요소 수Q: 하나의 요소를 처리하는 데 드는 비용
Q가 높아진다 → 병렬 스트림으로 성능을 개선할 수 있는 가능성이 있다.
부가 비용을 상쇄할 수 있을 만큼의 이득을 얻지 못한다.
ArrayList를 LinkedList보다 효율적으로 분할할 수 있다.
ArrayList: 요소를 탐색하지 않고도 리스트를 분할할 수 있다.LinkedList: 분할하려면 모든 요소를 탐색해야 한다.
range 팩토리 메서드로 만든 기본형 스트림도 쉽게 분해할 수 있다.
SIZED 스트림은 정확히 같은 크기의 두 스트림으로 분해할 수 있으므로 효과적으로 스트림을 병렬 처리할 수 있다.
필터 연산이 있으면 스트림의 길이를 예측할 수 없으므로 효과적으로 스트림을 병렬 처리할 수 있을지 알 수 없게 된다.
e.g., Collector의 combiner 메서드
병합 과정의 비용이 비싸다면 병렬 스트림으로 얻은 성능의 이익이 서브스트림의 부분 결과를 합치는 과정에서 상쇄될 수 있다.
자바 7에서 추가된 포크/조인 프레임워크로 병렬 스트림이 처리된다.
다음은 다양한 스트림 소스의 병렬화 친밀도를 요약한 것이다.
이는 병렬화할 수 있는 작업을 재귀적으로 작은 작업으로 분할한 다음에 서브태스크 각각의 결과를 합쳐서 전체 결과를 만들도록 설계되었다.
포크/조인 프레임워크에서는 서브태스크를 스레드 풀(ForkJoinPool)의 작업자 스레드에 분산 할당하는 ExecutorService 인터페이스를 구현한다.
스레드 풀을 이용하려면 RecursiveTask<R>의 서브클래스를 만들어야 한다.
R
- 병렬화된 태스크가 생성하는 결과 형식
- 결과가 없을 때는 RecursiveAction 형식
RecursiveTask를 정의하려면 추상 메서드 compute를 구현해야 하는데,
이는 태스크를 서브태스크로 분할하는 로직과 더 이상 분할할 수 없을 때 개별 서브태스크의 결과를 생산할 알고리즘을 정의한다.
// pseudo code
if (태스크가 충분히 작거나 더 이상 분할할 수 없으면) {
순차적으로 태스크 계산
} else {
태스크를 두 서브태스크로 분할
태스크가 다시 서브태스크로 분할되도록 이 메서드를 재귀적으로 호출함
모든 서브태스크의 연산이 완료될 때까지 기다림
각 서브태스크의 결과를 합침
}
이 알고리즘은 분할 후 정복(divide-and-conquer) 알고리즘의 병렬화 버전이다.
포크/조인 프레임워크를 이용해서 n까지의 자연수 덧셈 작업을 병렬로 수행하는 방법은 다음과 같다.
코드 : src/main/java/chapter07/forkjoin
성능을 측정해보면 병렬 스트림을 이용할 때보다 성능이 나빠진 것을 볼 수 있다.
이는 ForkJoinSumCalculator 태스크에서 사용할 수 있도록 전체 스트림을 long[]으로 변환했기 때문이다.
join 메서드를 태스크에 호출하면 태스크가 생산하는 결과가 준비될 때까지 호출자를 블록(block)시킨다.
대신 compute나 fork 메서드를 직접 호출할 수 있다.
순차 코드에서 병렬 계산을 시작할 때만 invoke를 사용한다.
왼쪽 작업과 오른쪽 작업 모두에 fork 메서드를 호출하는 것이 자연스러울 것 같지만
한쪽 작업에는 fork를 호출하는 것보다는 compute를 호출하는 것이 효율적이다.
그러면 두 서브태스크의 한 태스크에는 같은 스레드를 재사용할 수 있으므로 풀에서 불필요한 태스크를 할당하는 오버헤드를 피할 수 있다.
병렬 처리로 성능을 개선하려면 태스크를 여러 독립적인 서브태스크로 분할할 수 있어야 한다.
각 서브태스크의 실행시간은 새로운 태스크를 포킹하는 데 드는 시간보다 길어야 한다.
💡 포크/조인 분할 전략에서는 주어진 서브태스크를 더 분할할 것인지 결정할 기준을 정해야 한다.
포크/조인 프레임워크에서는 작업 훔치기(work stealing) 기법을 통해 ForkJoinPool의 모든 스레드를 거의 공정하게 분배한다.
각각의 스레드는 자신에게 할당된 태스크를 포함하는 이중 연결 리스트(doubly linked list)를 참조하면서 작업이 끝날 때마다 큐의 헤드에서 다른 태스크를 가져와서 작업을 처리한다.
한 스레드는 다른 스레드보다 자신에게 할당된 태스크를 더 빨리 처리할 수 있고, 할일이 없어진 스레드는 다른 스레드 큐의 꼬리에서 작업을 훔쳐온다.
풀에 있는 작업자 스레드의 태스크를 재분배하고 균형을 맞출 때 작업 훔치기 알고리즘을 사용한다.
하지만 우리는 분할 로직을 개발하지 않고도 병렬 스트림을 이용할 수 있었다.
즉, 스트림을 자동으로 분할해주는 기능이 존재한다. (Spliterator)
자바 8은 Spliterator(splitable iterator, 분할할 수 있는 반복자)라는 새로운 인터페이스를 제공한다.
Iterator처럼 소스의 요소 탐색 기능을 제공하지만, 병렬 작업에 특화되어 있다.
자바 8은 컬렉션 프레임워크에 포함된 모든 자료구조에 사용할 수 있는 디폴트 Spliterator 구현을 제공한다.
컬렉션은 spliterator라는 메서드를 제공하는 Spliterator 인터페이스를 구현한다.
public interface Spliterator<T> {
boolean tryAdvance(Consumer<? super T> action); // Spliterator의 요소를 하나씩 순차적으로 소비하면서 탐색해야 할 요소가 남아있으면 참을 반환
Spliterator<T> trySplit(); // Spliterator의 일부 요소(자신이 반환한 요소)를 분할해서 두 번째 Spliterator를 생성
long estimateSize(); // 탐색해야 할 요소 수 정보 제공
int characteristics(); // Spliterato 자체의 특성 집합을 포함하는 int를 반환
}스트림을 여러 스트림으로 분할하는 과정은 재귀적으로 일어난다.
이 분할 과정은 characteristics 메서드로 정의하는 Spliterator의 특성에 영향을 받는다.
문자열의 단어 수를 계산하는 메서드를 구현해보자.
전체 코드 : src/main/java/chapter07/wordcount
코드 : src/main/java/chapter07/wordcount/WordCounter.java
하지만 이것을 실행해보면 원하는 결과가 나오지 않는다.
원래 문자열을 임의의 위치에서 둘로 나누다보니 하나의 단어를 둘로 계산하는 상황이 발생할 수 있기 때문이다.
💡 순차 스트림을 병렬 스트림으로 바꿀 때 스트림 분할 위치에 따라 잘못된 결과가 나올 수 있다.
문자열을 단어가 끝나는 위치에서만 분할하는 방법으로 위의 문제를 해결할 수 있다.
코드 : src/main/java/chapter07/wordcount/WordCounterSpliterator.java
Spliterator는 첫 번째 탐색 시점, 첫 번째 분할 시점, 또는 첫 번째 예상 크기(estimatedSize) 요청 시점에 요소의 소스를 바인딩할 수 있다.
이와 같은 동작을 늦은 바인딩 Spliterator라고 부른다.








