
Java Parallel Streams Are Bad for Your Health!

This post continues the series that started with sneaky default methods in interfaces, which, when used unwisely, can turn your application into a code mess you don’t want to look at.

As we claimed previously, Java 8 delivers three major features everyone is eager to use: lambdas, the stream API, and default methods in interfaces. Sadly, all of them are easy to abuse and can actually be a detriment to your code if you add them to your toolbelt without understanding them.

Today, we will look at the stream API, specifically parallel streams. If you want to get an overview of the pitfalls that await you with serial stream processing, check out this post on the jOOQ blog by Lukas Eder.

But for now let’s focus on the parallel execution that the stream API is praised for. Allegedly, it might speed up some tasks your application executes by utilizing multiple threads from the default ForkJoinPool.

Mousetrap of parallel streams

Here’s a classic example of the awesomeness that parallel streams promise you. In this example we want to query multiple search engines and return the output from the first to reply.

public static String query(String question) {
    List<String> engines = Arrays.asList(
        "http://www.google.com/?q=",
        "http://duckduckgo.com/?q=",
        "http://www.bing.com/search?q=");

    // query the engines in parallel and grab a result as soon as one is available
    Optional<String> result = engines.stream().parallel().map(base -> {
        String url = base + question;
        // open a connection and fetch the result (a blocking call)
        return WS.url(url).get();
    }).findAny();

    return result.get();
}

Nice, isn’t it? But let’s dig a bit deeper and check what happens in the background. Parallel streams are processed by the parent thread that ordered the operation and, additionally, by the threads in the JVM’s default fork/join pool: ForkJoinPool.commonPool().
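
You can see this for yourself by printing the name of the thread that processes each element. A minimal sketch (the exact thread names and the split of work between them will vary from run to run):

import java.util.stream.IntStream;

public class WhoRunsMyStream {
    public static void main(String[] args) {
        // expect a mix of "main" and "ForkJoinPool.commonPool-worker-N"
        IntStream.range(0, 8).parallel().forEach(i ->
            System.out.println(i + " -> " + Thread.currentThread().getName()));
    }
}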

However, an important aspect to notice here is that querying a search engine is a blocking operation. So at some point every worker thread will call the get() operation and sit right there, blocked, waiting for the results to come back.

Hang on, isn’t this what we wanted in the first place? Instead of going through the list and waiting for each URL to respond sequentially, we wait on all of the responses at the same time. Saving your time, just like using JRebel does (sorry, couldn’t resist :-) ).

However, one side effect of such parallel waiting is that it’s not just the main thread that waits: the fork/join pool workers wait too. And given the current ForkJoinPool implementation, which doesn’t compensate for workers stuck waiting by spawning fresh ones, at some point all the threads in ForkJoinPool.commonPool() will be exhausted.

Which means that the next time you call the query method above at the same time as any other parallel stream processing, the performance of the second task will suffer!
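
To observe the interference for yourself, here is a minimal, self-contained sketch; the sleeps stand in for blocking HTTP calls, and the exact timings depend on your machine and pool size:

import java.util.stream.IntStream;

public class CommonPoolContention {

    public static void main(String[] args) throws InterruptedException {
        // a "guilty" stream that parks the common-pool workers in blocking calls
        Thread hog = new Thread(() ->
            IntStream.range(0, 100).parallel().forEach(i -> sleep(100)));
        hog.start();
        Thread.sleep(50); // give the hog time to occupy the workers

        // an unrelated stream: 16 tasks of ~10 ms each; with a free pool this
        // finishes in a few tens of milliseconds, but with the pool exhausted
        // the calling thread grinds through the elements mostly on its own
        long start = System.nanoTime();
        IntStream.range(0, 16).parallel().forEach(i -> sleep(10));
        System.out.printf("Innocent stream took %d ms%n",
            (System.nanoTime() - start) / 1_000_000);

        hog.join();
    }

    private static void sleep(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}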

However, don’t rush to blame the ForkJoinPool implementation. In a different use case you could give it a ManagedBlocker instance and ensure that it knows when to compensate for workers stuck in a blocking call, and get your scalability back.
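
For illustration, here is a minimal sketch of what that could look like. Note that fetch(url) below is a hypothetical stand-in for whatever blocking call you actually make:

import java.util.concurrent.ForkJoinPool;

public class ManagedBlockerExample {

    // wraps a blocking call so that the pool knows a worker is about to block
    static class BlockingFetch implements ForkJoinPool.ManagedBlocker {
        final String url;
        volatile String result;

        BlockingFetch(String url) { this.url = url; }

        @Override
        public boolean block() {
            result = fetch(url); // the actual blocking call happens here
            return true;         // true = no further blocking is necessary
        }

        @Override
        public boolean isReleasable() {
            return result != null; // skip blocking if the value is already there
        }
    }

    static String fetchWithCompensation(String url) throws InterruptedException {
        BlockingFetch blockingFetch = new BlockingFetch(url);
        // the pool can now add a compensating worker while this one is blocked
        ForkJoinPool.managedBlock(blockingFetch);
        return blockingFetch.result;
    }

    // hypothetical stand-in for a real blocking network call
    static String fetch(String url) {
        return "response from " + url;
    }
}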

Now, the interesting bit is that it doesn’t take parallel stream processing with blocking calls to stall the performance of your system. Any long-running function used to map over a collection can produce the same issue.

Consider this example:

long sum = IntStream.range(0, 100).parallel().mapToLong(x -> {
    // a long-running computation keeps the worker busy for its whole duration
    for (int i = 0; i < 100_000_000; i++) {
        System.out.println("X:" + i);
    }
    return x;
}).sum();

This code suffers from the same problem as our networking attempt. Every lambda execution is not instantaneous, and during all that time workers won’t be available for other components of the system.

This means that any system that relies on parallel streams has unpredictable latency spikes whenever someone else occupies the common fork/join pool.

So what, I’m the boss in my program anyway, right?

Indeed, if you’re creating an otherwise single-threaded program and know exactly when you intend to use parallel streams, then you might think that this issue is kinda superficial. However, many of us deal with web applications, various frameworks and heavy application servers.

How can a server that is designed to be a host for multiple independent applications, that do who knows what, offer you a predictable parallel stream performance if it doesn’t control the inputs?

One way to do this is to limit the parallelism that the ForkJoinPool offers you. You can do it yourself by supplying the JVM option -Djava.util.concurrent.ForkJoinPool.common.parallelism=1, so that the pool size is limited to one and no gain from parallelization can tempt you into using it incorrectly.
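
A quick way to check that the flag took effect is to ask the common pool for its parallelism:

import java.util.concurrent.ForkJoinPool;

public class CommonPoolCheck {
    public static void main(String[] args) {
        // run with -Djava.util.concurrent.ForkJoinPool.common.parallelism=1
        // to see 1; by default this prints roughly (number of CPU cores - 1)
        System.out.println(ForkJoinPool.commonPool().getParallelism());
    }
}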

Alternatively, a parallelStream() variant that accepts the ForkJoinPool to parallelize on would be a cleaner way out. Unfortunately, the JDK doesn’t currently offer one.
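
There is, however, a commonly used workaround, which also comes up in the comments below: start the terminal operation from a task already running inside your own ForkJoinPool, and the stream’s work gets executed in that pool instead of the common one. Keep in mind that this relies on an implementation detail of the stream machinery rather than on a documented guarantee. A minimal sketch:

import java.util.List;
import java.util.concurrent.ForkJoinPool;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

public class CustomPoolStream {
    public static void main(String[] args) throws Exception {
        // our own pool, isolated from ForkJoinPool.commonPool()
        ForkJoinPool pool = new ForkJoinPool(4);
        try {
            // started from inside a ForkJoinPool task, the parallel stream
            // is split across that pool rather than the common one
            List<Integer> squares = pool.submit(() ->
                IntStream.range(0, 100).parallel()
                    .map(i -> i * i)
                    .boxed()
                    .collect(Collectors.toList())
            ).get();
            System.out.println(squares.size()); // prints 100
        } finally {
            pool.shutdown();
        }
    }
}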

Moral of the story

Parallel streams are unpredictable and complex to use correctly. Almost any use of parallel streams can affect the performance of other unrelated system components in an unpredictable way. I have no doubt that there are people who can manage to use them to their benefit, clearly and correctly. However, I’d think twice before typing stream.parallel() into my code and would look twice when reviewing the code containing it.

Do you think I’m overdramatizing the issue? Leave a comment or find me on Twitter: @shelajev.

  • edharned

    Oleg:

    Another fine article. As more and more people use the parallel option they are finding out it ain’t quite as advertised.

    The common pool eliminates the need for every user to supply a ForkJoinPool itself. However, as you prove, there are unintended consequences. As you point out, the ForkJoinPool.ManagedBlocker interface is one way around the stalling threads, but you can get into a lot of trouble with compensation threads, as follows.

    Not all parallel streams stall without compensation threads. When using nested parallel streams:

    range(0, outerLoop).parallel().forEach(i -> {
        range(0, innerLoop).parallel().forEach(j -> {
            ...
        });
    });

    the outer loop threads stall with ForkJoinPool.awaitJoin(). The framework creates continuation threads to keep the level of parallelism steady. Those additional threads can number as many as ten times the proper parallelism level. Therefore, if you set the parallelism level to eight, you may end up with eighty threads. Another unintended consequence.

    The problem of compensation threads persists in classes that use the F/J framework underneath, with no way to limit compensation threads: Phaser.arriveAndAwaitAdvance() and CompletableFuture.get(), to name two.

    Oracle’s mistake was not following the lead of others by creating a parallel engine within core Java (see Microsoft’s .NET Framework for one). The F/J framework is the experiment for an academic research paper. It is not, and never will be, a general-purpose application development tool.

  • Oleg Šelajev

    Thanks, Ed.

    I wouldn’t say that the current F/J implementation is a mistake. It’s a sufficiently fast, parallel, and powerful tool. I know you see lots of problems with it and probably would like to see it done differently.

    Also, isn’t Java an open source language, where anyone who has a strong enough will to change something eventually succeeds?

    Anyway, I don’t blame the FJ pool for this particular problem. Rather, the stream API makes it really easy to abuse the pool and stall the system when you’re not careful enough.

    I was also pointed to a workaround for this issue: when you execute your lambda from a callable already running in some FJP, it gets executed there instead of the common one.

    Like this:
    ForkJoinPool forkJoinPool = new ForkJoinPool(2);
    forkJoinPool.submit(() ->
        // parallel task here, for example
        range(1, 1_000_000).parallel()
            .filter(PrimesPrint::isPrime)
            .boxed() // IntStream has no collect(Collector), so box first
            .collect(toList())
    ).get();

    It doesn’t remove the problem, because for every stream we get our hands on we still have to figure out whether it is parallel or not before we call a terminal operation, but it works.

    What I want to say is not that Java streams or FJP are no good. Rather that it really doesn’t take much to hurt the performance of a complex piece of software if you’re not careful enough.

  • Aleksey Shipilёv

    “How can a server that is designed to be a host for multiple independent applications, that do who knows what, offer you a predictable parallel stream performance if it doesn’t control the inputs?”

    I invite you to offer the solution for the predictability problem, even without considering parallel streams, but only stock thread pools and hand-crafted code. Consider a bunch of webapps deployed on a servlet container: does that container share a fixed-size thread pool for all the applications (stepping on the very issue you are describing), or does it have independent thread pools for each application (throwing fairness, i.e. predictability, out of the window, because the system is oversubscribed and maintaining fairness is too damn costly)?

  • Autoletics

    predictability => prioritization => QoS => adaptive control

    The problem here is that thread pools are not the most ideal way to manage workloads and resource consumption. This is especially the case when there is significant diversity and differentiation across the activities (tasks) performed and resources consumed. Far better to employ an adaptive control mechanism that is contextual (profile) aware and which is most certainly external to the pooling implementation and scheduling mechanism. QoS and AC seem to have been relatively successful in other domains at managing oversubscription of resources.

  • Aleksey Shipilёv

    Okay, it’s still turtles all the way down there, therefore you can’t get away from threads. Abstract from them, maybe, but can you actually demonstrate the “adaptive control mechanism that is contextual (profile) aware and which is most certainly external to the pooling implementation and scheduling mechanism” working in the Java ecosystem? Otherwise we are day-dreaming here about what could be done without proof that it is actually doable.

  • Autoletics

    Demonstrations =>
    https://vimeo.com/groups/sentris

    A mapping from the theory to a model and implementation =>
    http://www.infoq.com/articles/QoS-for-Applications

    Now, if only the Oracle JVM team had seriously considered my proposed Probes Open API as the means to exchange information between applications and the runtime, the Java community would have had by now:

    – activity metering
    – resource management
    – adaptive control
    – execution recording
    – simulated playback
    – behavioral (self-)reflection

    all behind a single abstraction of context (thread), probe (activity => method) and meter (resource).

  • Aleksey Shipilёv

    Ok, there’s a lot of info there. Can you point to the concrete success story where AC/QoS control solves the real predictability problems in a scalable manner?

    “If only [the JVM team] had seriously considered my proposed Probes Open API”. I don’t see the JEP for this anywhere, and there are no sane hits here: http://markmail.org/search/William+Louth. I see the hint that you were at JVMLS with a talk; what happened since then? To get seriously considered, you first have to have a serious proposal.

  • Autoletics

    Not terribly interesting for those here, but I did on many occasions discuss this in person with the team (twice at JVMLS). In 2003 I even made a presentation to the team in Stockholm, when BEA owned a JVM. There’s a history which unfortunately is kind of the problem. Anyway, the last piece of advice from the senior mgmt team was for me to go fix a few bugs in the JDK and get others to like me first before they would listen to me. I’m not entirely innocent, of course, but in the end I just wanted the work to stand on its own, and that was humanly impossible, apparently, no matter how innovative your work is.

    Here is the final say from Oracle on this matter.

    “Thank you for the interesting presentation and introduction to jInspire. After getting feedback from rest of the organization we decided not to move forward with the discussions.”

    Wow, they could not even get the company name right.

    Now keep in mind that pretty much everybody who understands the technology considers it groundbreaking and years ahead.

    The final attempt at finding a home for the research was the publication of the Probes Open API under an Apache license, which is something that IBM had requested before further consideration of my proposal. I did that, and then the community claimed it was a “trojan horse”. It was never the tech that was the problem, just payback for the shit I lobbed, at many and with far too much ease, in the past. A lesson hopefully learned, but not without some sores.

    Getting back on track, and hopefully without already having lost all reader interest: I don’t see how we can continue to think that developers (or operators) can fix an “optimal” pool size when the optimal set point is ever changing, reflecting the state of the runtime, in-flight workloads, resources (and queues), …

    The idea I had related to thread pools, was to treat potential call sites (or methods) as a kind of router/switch in a telecom network and for threads to be paths and call chains as links. QoS services could be associated with a thread, method or call path (chain) and then at each call point resource mgmt was applied using a reservation mechanism which could reflect past profiling of the behavior that followed (or flowed) with the path.

    Thread pools => potential capacity => fixed limits
    QoS => adaptive control => dynamic capacities

  • Aleksey Shipilёv

    I understand the personal feelings about this, but it does feel systematic that you can’t sell this idea (even though you try to do this over somewhat covert channels when there is a public channel in OpenJDK). I can’t resist feeling that this is because the idea is impractical and/or not showing real-life improvements. It should be sellable, so I’d like to reiterate my practical question again:

    Can you point to the concrete success story where AC/QoS control solves the real predictability problems in a scalable manner?

  • Autoletics

    The adaptive control valve technology has been employed by a number of gaming companies for improved resilience. It is far better than what is considered state of the art – circuit breakers.

    It works at “web scale” :-)

    Adaptively Controlling Apache Cassandra Client Request Processing

    http://www.theserverside.com/discussions/thread.tss?thread_id=76134

    Pretty much every “large scale” company these days is employing some form of in-flight execution control and behavioral self reflection. Do a Google search for “tail at scale”. I wrote many self adaptive software articles on JINSPIRED.com but unfortunately that is now redirected to autoletics.com.

    My proposal would standardize that.

    On QoS for Apps, which was a forerunner for adaptive control valves, there is a very well known German bank that uses it to speed up order processing. Here there are two meters applied: clock time (time constraints) and $$$ (value or cost).

    I don’t think it is hard to imagine that much of what is happening at lower levels in the stack (i.e. networking) is going to be pushed upwards. We’ve already seen the JVM employ adaptive mechanisms, though completely oblivious to what is an application, service or transaction and how it maps to a flow (thread, queue, actor) and a series of activities and interactions (change set).

    What I tried to do at the start in 2008 was create a general purpose software activity model consisting of Context (thread), Probe (Activity), Meter (Resource), Environment (State), Name (Hierarchy),…then attach various extensions to this model which ran in tandem with the application whether a Probe (or Activity) was mapped to a method and/or its parameterization (SQL/URL/…). This method of instrumentation was completely decoupled. This was a model that would bridge and connect the Application and Runtime. The Activity (or Probe) would represent the goal and intent of the runtime’s execution of bytecode. The runtime could reason beyond the current stack frame or stack. It was the managed model for the runtime, which allowed control valves to be placed into the flow of the execution. It even allowed for multiple JVMs to be mirrored across into another JVM that simulated the same activity model in near real-time.

    A Time Machine for Augmenting Past Software Execution in the Cloud
    https://vimeo.com/99367877

    I am pretty convinced it is doable because it was done and not just for one use case but many:

    – activity metering
    – resource management
    – adaptive control
    – execution recording
    – simulated playback
    – behavioral (self-)reflection

    Sun had attempted resource management before. That effort failed. Oracle a while back mentioned QoS for JEE. That I believe failed to even be understood.

    The issue is not whether it can be done because clearly it has. I suspect it might have something to do with:

    – market acceptance: I ran my company more like a R&D lab so probably not helpful to my case
    – JDK class library impact: all or nothing is what the community expects with any new addition which pretty much kills off ambitious changes
    – JVM runtime impact: the teams have enough problems at the moment cross cutting concerns like this appear far too disruptive to be “standardized” unless hand is forced
    – software tooling: the ugly sisters are still trying to squeeze into Cinderella’s shoe, that story is still sadly playing out
    – NIH, Ego, Ignorance,…: The usual human faults
    – Hating on William
    – Probes Open API: licensing, style,…

    It was a HUGE vision and it was realized, but I can understand the reluctance of a company like Oracle to go with a BIG PLAN for the runtime largely created by one guy. But that was the same plan I had in 2003 when I proposed to the BEA/JRockit team a model that refocused efforts on bringing a Matrix-like runtime management solution to the market… before liquid vm was a thing. That pretty much scared the pants off the consensus-loving Swedes. In the end I did get there, but it took nearly 10 years.

    Let’s end the sad story there, as we do still need to eventually come up with better resource management that is not coupled to thread pool sizing. Developers and Operations are clearly not up to the task of figuring out the optimal set point (it is in a state of flux, would you not agree). The runtime has a much better chance, but it is for the most part blind to the application, the activity, and the associated policies (capacity, quality, reliability, …). There needs to be a way to expose this information via service classification annotations fixed in code, and an API for dynamic service classification and prioritization that is respected by the runtime over and above the Thread.setPriority() setting.

  • Aleksey Shipilёv

    Can we please keep to the point and concrete examples? I begin to understand why it’s problematic to convey your message. Neither of the Cassandra articles you mention works without redirecting to the autoletics site.

    Third time’s a charm: can you *please* point to the *concrete* success story where AC/QoS control solves the real predictability problems in a scalable manner?

  • Autoletics

    Throttling Concurrency in the CLR 4.0 ThreadPool

    http://msdn.microsoft.com/en-us/magazine/ff960958.aspx

  • Autoletics

    Remember, all of the above come from the same Open API, just with various differently configured extensions. On scale: how about 200M events a second?

    https://vimeo.com/98918816

  • carofe

    What do you mean by “Every lambda execution is not instantaneous and during all that time workers won’t be available for other components of the system”? Those lambdas are called when needed (at the end, when sum() is called), but it does not use the F/J framework, does it?

  • Oleg Šelajev

    Most parallel streams by default use ForkJoinPool.commonPool() for parallelization.

  • Jatin

    It’s quite simple to even put the solution here. But wouldn’t having another method that takes an ExecutorService as an argument resolve the issue? Something like: `parallelStream(ExecutorService exec)`.

    In Scala I would just:

    val ls = (1 to 100).par
    ls.tasksupport = new ForkJoinTaskSupport(
      new scala.concurrent.forkjoin.ForkJoinPool(2))

    ls.map(x => whatever())

    I wonder what the makers were thinking while designing such a thing. It’s a serious shortfall.

  • honet

    I remember reading somewhere that Oracle did not recommend using blocking I/O calls in the FJP. Also, from the old days, I was told to keep tasks in a thread short. I believe that common sense still applies with the FJP or an ExecutorService. The gain of the FJP in Java is that it makes it possible to implement parallel algorithms like FFT, Viterbi, etc.

  • taupeGriller

    Most web containers are oversubscribed. There goes fair usage of the system down the drain. Even with perfect hand-crafted code it’s impossible to predict the behaviour of parallelism.

  • http://ibragimov.by/ Ruslan Ibragimov

    -Djava.util.concurrent.ForkJoinPool.common.parallelism=1 makes things worse. You’re still using the F/J pool, but it has only one thread!