Java Parallel Streams
Processe streams Java em paralelo para ganhar velocidade — quando parallelStream ajuda e quando piora.
Um stream paralelo é o mesmo pipeline de stream que você já conhece, mas a JVM pode dividir a fonte em partes e processá-las em múltiplas threads. A mudança no ponto de chamada é mínima:
long total = nums.parallelStream().mapToLong(n -> heavy(n)).sum();
// ^^^^^^^^^^^^^^^^^ou:
long total = nums.stream().parallel().mapToLong(n -> heavy(n)).sum();O formato do pipeline, as operações, o resultado — tudo inalterado. O que muda é quem executa: em vez de uma única thread percorrendo a fonte, vários trabalhadores do ForkJoinPool comum (um por núcleo de CPU, menos um) dividem o trabalho e um coordenador mescla os resultados parciais. Quando o trabalho por elemento é suficientemente pesado e a fonte divide bem, o pipeline termina em aproximadamente tempo-de-relógio / núcleos. Quando isso não acontece, o paralelo é mais lento que o sequencial — e às vezes incorreto. Este capítulo trata de distinguir essas situações.
O que "paralelo" realmente faz
Um stream sequencial puxa um elemento pelo pipeline, depois o próximo. Um stream paralelo:
- Divide a fonte em sub-streams por meio do
Spliteratorda fonte. Arrays,ArrayList,IntStream.rangee fontes similares dividem bem em O(1).LinkedList,Files.lines,Stream.iterateeStream.generatedividem mal ou se recusam a dividir. - Executa a cadeia intermediária de cada sub-stream em uma thread trabalhadora do pool comum.
- Mescla os resultados parciais — para
reduceecollect, é para isso que serve ocombiner.
forEach em um stream paralelo chama seu Consumer a partir de múltiplas threads de forma concorrente e em ordem não especificada. forEachOrdered preserva a ordem de encontro ao custo de sincronização. findFirst em paralelo é mais caro que findAny pelo mesmo motivo — precisa coordenar para identificar a primeira correspondência.
O contrato — o que seu pipeline deve satisfazer
O paralelo só fornece uma resposta correta quando o pipeline obedece a três regras. Código sequencial que as ignora por acaso ainda funciona; código paralelo que o faz produz silenciosamente resultados sem sentido.
- O redutor deve ser associativo.
f(f(a, b), c) == f(a, f(b, c)).+,*,max,min, união de conjuntos e concatenação de listas se qualificam. Subtração, divisão, "primeira correspondência" e "append de lista com ordem" não. Se você passar umBinaryOperatornão associativo parareduceouCollectors.reducing, a resposta depende de como a JVM decidir dividir. - O pipeline deve ser sem estado. Seus lambdas não devem ler nem escrever em estado mutável compartilhado. Um lambda que captura e modifica um
ArrayListexterno, incrementa umint[]externo ou usa qualquer contador não atômico vai gerar condição de corrida em paralelo. - O pipeline deve ser livre de efeitos colaterais. Registrar logs está bem; persistir por meio de um sink thread-safe está bem; todo o resto é um bug esperando que um trabalhador o intercale de forma diferente.
Os coletores embutidos em Collectors satisfazem as regras 1–3 por construção (quando usados conforme documentado). Seus próprios lambdas dentro de map, filter, reduce e peek são os que devem ser observados.
Quando o paralelo ajuda (e quando não ajuda)
Um stream paralelo só vence quando o trabalho por elemento é grande o suficiente para superar o custo de coordenação — divisão, agendamento, mesclagem e o overhead do framework. Um modelo mental aproximado:
- Fonte grande + trabalho por elemento vinculado à CPU + mesclagem barata + fonte divisível = paralelo costuma vencer. Processamento de imagem por pixel, análise por registro, hashing por arquivo — casos clássicos.
- Fonte pequena = sequencial vence. O custo de inicialização do pool é mais caro que toda a computação.
- Trabalho por elemento barato = sequencial vence.
nums.stream().mapToInt(Integer::intValue).sum()é mais rápido que sua versão comparallelStream()até quenumsesteja na casa dos milhões; em tamanhos pequenos, o overhead do framework domina. - I/O bloqueante por elemento = streams paralelos são a ferramenta errada. O
ForkJoinPoolcomum é dimensionado para trabalho de CPU; uma chamada de I/O bloqueante prende um trabalhador e priva todos os outros streams paralelos na JVM (incluindo os de bibliotecas). UseCompletableFuturecom um executor limitado para fan-out de I/O. - Fonte não divisível = o paralelo ou cai de volta para sequencial ou divide mal.
Files.lines,Stream.iterate,Stream.generateeLinkedList.stream()são os divisores ruins canônicos; arrays,ArrayListeIntStream.rangesão os bons.
O conselho honesto: prefira sequencial por padrão; mude para paralelo apenas quando tiver uma razão medida, com números do jmh ou de tempo de relógio em mãos.
Operações que ficam estranhas em paralelo
Algumas operações cujo significado muda quando o pipeline vai a paralelo:
forEach— executa a partir de múltiplas threads, em ordem não especificada. Se a ordem importa, useforEachOrdered(que tem custo de sincronização).findFirst— precisa coordenar entre os trabalhadores para identificar a primeira correspondência na ordem de encontro. UsefindAnyse não importa qual correspondência vence.limit/skip— bem definidos em streams ordenados, mas mais caros em paralelo porque a JVM deve respeitar a ordem. Em um stream paralelo onde a ordem não importa,stream.parallel().unordered().limit(n)é mais barato.distinct/sorted— devem coordenar entre trabalhadores; o buffer que mantêm é compartilhado.reducecom a sobrecarga de 3 argumentos usa ocombinerpara mesclar as saídas dos trabalhadores. Com a sobrecarga de 2 argumentos, a JVM usa a identidade duas vezes mais o acumulador — mesmo contrato, mesma regra de associatividade.collect— osCollectorssão projetados para serem seguros em paralelo; o detalhe é que o container de resultado pode ser umHashMapouArrayListcomum, e a coleção paralela coordena internamente para manter isso seguro. Seus coletores downstream devem obedecer ao contrato.
A armadilha do estado compartilhado, de forma concreta
O bug mais comum em código paralelo de iniciantes:
// WRONG -- looks fine, races in parallel
List<String> shouts = new ArrayList<>();
words.parallelStream().forEach(w -> shouts.add(w.toUpperCase()));ArrayList.add não é thread-safe; trabalhadores concorrentes podem perder elementos, adicionar duplicatas, lançar ArrayIndexOutOfBoundsException ou corromper a lista silenciosamente. A versão correta expressa o resultado como a saída do pipeline, não como um efeito colateral dele:
List<String> shouts = words.parallelStream().map(String::toUpperCase).toList();toList(), como todo outro coletor e terminal que produz um valor, é projetado para uso paralelo. No momento em que você recorre a um forEach que modifica uma variável externa, você saiu do caminho seguro.
Se você realmente precisa de um sink thread-safe para forEach, use ConcurrentLinkedQueue, AtomicLong, LongAdder ou Collections.synchronizedList(...). Mas quase sempre, a resposta certa é "não use forEach para acumulação; deixe o pipeline construir o resultado."
ForkJoinPool e por que isso importa
Por padrão, todo stream paralelo na sua JVM compartilha o pool comum, dimensionado com Runtime.getRuntime().availableProcessors() - 1 threads trabalhadoras. Isso tem duas consequências:
- Um stream paralelo de longa duração monopoliza o pool. Qualquer outro stream paralelo — incluindo os dentro de bibliotecas — ficará na fila atrás dele.
- Um stream paralelo que bloqueia (I/O, locks,
Thread.sleep) prende uma thread trabalhadora sem fazer nenhum trabalho, reduzindo pela metade o tamanho efetivo do pool enquanto aguarda.
Você pode dedicar um pool privado para um pipeline específico:
try (var pool = new java.util.concurrent.ForkJoinPool(4)) {
long total = pool.submit(() ->
nums.parallelStream().mapToLong(n -> heavy(n)).sum()
).get();
}Esta é a decisão certa para computação de longa duração que você não quer compartilhar com o restante da JVM. Ainda é a decisão errada para I/O bloqueante — mude para threads virtuais ou uma cadeia explícita de CompletableFuture em um executor de I/O limitado.
Um exemplo trabalhado: aceleração paralela, a armadilha do estado compartilhado e um bug de associatividade
O programa abaixo mede o tempo sequencial versus paralelo para uma soma IntStream vinculada à CPU, demonstra a condição de corrida de estado compartilhado com forEach, mostra a versão correta baseada em coletor e contrasta redutores associativos (Integer::sum) com não associativos ((a, b) -> a - b) em paralelo.
O que extrair da execução:
- A soma paralela produziu o mesmo resultado que a sequencial e (em qualquer máquina multi-núcleo) terminou em uma fração do tempo de relógio. A chamada
heavypor elemento é vinculada à CPU e a fonte (umint[]) divide bem — os dois ingredientes que o paralelo precisa. - O
forEachque modificoubadSinkperdeu elementos ou travou. Não há correção que adicione umsynchronizedaqui sem tornar a versão paralela mais lenta que a sequencial. A correção é não escreverforEachpara acumulação — use um coletor ou um terminal que produz o resultado. Integer::sumé associativo; a redução paralela produziu a mesma resposta que a sequencial. O(a, b) -> a - bnão associativo produziu respostas diferentes em sequencial versus paralelo porque a JVM é livre para dividir e mesclar em qualquer ordem associativamente equivalente. Mesmo código, duas respostas — o sintoma que todo bug de streams paralelos eventualmente produz.parallel().forEach(...)imprimiu0..15em alguma ordem não monótona;parallel().forEachOrdered(...)os imprimiu em ordem ao custo de sincronização entre trabalhadores. Se seuforEachse importa com a ordem, você está pagando por isso.- O
ForkJoinPool(2)privado executou o pipeline em um pool dedicado. Use isso quando você tem um job de computação de longa duração e não quer que ele compartilhe o pool comum com o restante da JVM. Não o use como remendo para I/O bloqueante — esse é um problema diferente com uma ferramenta diferente.
O que vem a seguir
Agora você pode raciocinar sobre qualquer pipeline de stream: quando escrever um, como construí-lo, o que é lazy, o que faz curto-circuito, o que executa em paralelo com segurança e o que não. Uma abstração central ainda está na mesa — aquela que permite que um pipeline expresse "este valor pode estar ausente" sem um único null. O próximo capítulo, Java Optional, cobre Optional<T> — o que é, onde a API de stream deixa suas pontas soltas e como usar map, flatMap, orElse e ifPresent para escrever código que é null-safe por construção.