Brian Goetz, Java Language Architect at Oracle, gave an interesting presentation “From Concurrent to Parallel” (available on InfoQ) on the subject back in 2009. Here are the most important points of his talk.

Java 8 introduces the Stream library (which was, in Brian's words, developed as a showcase for the new Java 8 language features). Just calling “.parallel()” advises the library to process the stream “in parallel”, i.e. in multiple threads.
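
As a minimal sketch of what that switch looks like (the class and method names are made up for illustration and are not from the talk), the following sums the numbers 1 to n once sequentially and once with “.parallel()”. Both pipelines produce the same result; only the execution strategy differs.

```java
import java.util.stream.LongStream;

class ParallelSwitchDemo {

  // Sequential pipeline: a single thread walks the whole range.
  static long sumSequential(long n) {
    return LongStream.rangeClosed(1, n).sum();
  }

  // Same pipeline, but the library is allowed to split the range across threads.
  static long sumParallel(long n) {
    return LongStream.rangeClosed(1, n).parallel().sum();
  }

  public static void main(String[] args) {
    System.out.println(sumSequential(1_000_000));
    System.out.println(sumParallel(1_000_000));
  }
}
```

Whether the parallel variant is actually faster is exactly the question the rest of this post is about, so it has to be measured.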

Do you really need it?

Parallel processing is an optimization and therefore the general questions regarding optimizations have to be answered:

  • Do you have any performance requirements at all? (Otherwise you are already fast enough.)
  • Do you have any means to measure the performance?
  • Does the performance you measure violate the requirements?

Only if the answer to all these questions is “yes” should you take the time to investigate whether “.parallel()” will increase the performance.

Why might the performance not increase?

Compared to sequentially processing a stream, processing it in parallel has overhead costs: splitting the data, managing the threads that will process the data and combining the results. So you may not see as much speedup as you might hope for.

Brian talks about several factors that undermine speedup.

NQ is insufficiently high

He talks about a simple model called the “NQ model”. N*Q should be greater than 10,000. N is the number of data items. Q is a factor that expresses how CPU-expensive the processing step is. “Summing numbers” or “finding the max of integers” is very inexpensive and Q would be near 1. More complex tasks would have higher values of Q. So if all you want is to add up numbers, you need a lot of numbers to see a performance gain.
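
The rule of thumb can be written down as a tiny check. This is only a sketch: the class name, the method name and the idea of passing Q as a plain number are my own assumptions, and Q is something you have to estimate yourself.

```java
class NqModel {

  // Threshold as summarized above; a rule of thumb, not a guarantee.
  static final long THRESHOLD = 10_000;

  // n = number of data items, q = estimated cost per item (about 1 for a simple addition).
  static boolean worthMeasuringParallel(long n, long q) {
    return n * q > THRESHOLD;
  }

  public static void main(String[] args) {
    System.out.println(worthMeasuringParallel(1_000, 1));   // 1,000 cheap items
    System.out.println(worthMeasuringParallel(100_000, 1)); // 100,000 cheap items
    System.out.println(worthMeasuringParallel(100, 200));   // few items, expensive step
  }
}
```

A “true” here only means that trying “.parallel()” and measuring the result might be worth the time.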

Cache-miss ratio too high

If the data to be processed is stored contiguously in RAM, it will be transferred together into the CPU caches. Accessing cache memory instead of main memory is very fast, so data in an array is processed much faster than data in a linked list that is spread all over the main memory. Also, when the data in an array is just pointers, data access will be slow. As a demonstration, Brian shows that summing up an array of primitive ints in parallel scales well with the number of CPU cores. Summing up an array of Integer objects scales very poorly because there are just pointers in the array.

The more indirections (pointers) you work with, the more cache misses will leave the CPU waiting for memory access.
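
To make the two cases concrete, here is a minimal sketch (class and method names are hypothetical) of summing a primitive int[] and a boxed Integer[] in parallel. The code shows only the difference in data layout; it makes no timing claims, so you would have to benchmark it yourself.

```java
import java.util.Arrays;

class BoxedVersusPrimitive {

  // int[] stores the values themselves next to each other in memory.
  static long sumPrimitive(int[] values) {
    return Arrays.stream(values).parallel().asLongStream().sum();
  }

  // Integer[] stores references; each value lives in a separate object on the heap.
  static long sumBoxed(Integer[] values) {
    return Arrays.stream(values).parallel().mapToLong(Integer::longValue).sum();
  }

  public static void main(String[] args) {
    int size = 1_000_000;
    int[] primitive = new int[size];
    Integer[] boxed = new Integer[size];
    for (int i = 0; i < size; i++) {
      primitive[i] = i;
      boxed[i] = i;
    }
    System.out.println(sumPrimitive(primitive));
    System.out.println(sumBoxed(boxed));
  }
}
```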

The source is expensive to split

In order to process data in parallel, the source of the data has to be split so that parts of the data can be handed to different CPUs. Splitting arrays is simple, splitting linked lists is hard (Brian said: linked lists are split into “first element, rest”).

Result combination cost is too high

When the result combination is the sum of numbers, that is easy to calculate. If the result is a set, and the result combination is to merge the resulting sets, this is expensive. It might be faster to sequentially add each result into one set.

Order-sensitive operations

Operations like “limit()”, “skip()” and “findFirst()” depend on the order of the data in the stream. This makes the pipelines using them “less exploitable” regarding parallelism. You can call “unordered()” if the order is not meaningful to you, and the stream library can then optimize those operations.
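
A minimal sketch of such a pipeline, with made-up names: it asks for any ten even numbers instead of the first ten. Because of “unordered()”, “limit()” no longer has to respect the encounter order, so the returned numbers may differ from run to run.

```java
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

class UnorderedLimit {

  // Returns howMany even numbers from the range, not necessarily the smallest ones.
  static List<Integer> anyEvenNumbers(int howMany) {
    return IntStream.range(0, 1_000_000)
        .boxed()
        .parallel()
        .unordered()              // order is declared irrelevant
        .filter(i -> i % 2 == 0)
        .limit(howMany)           // may now take whichever elements arrive first
        .collect(Collectors.toList());
  }

  public static void main(String[] args) {
    System.out.println(anyEvenNumbers(10));
  }
}
```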

Mutating shared state is a big NO!

Of course you will lose performance if you have to mutate shared state and guard access to it with locks etc.
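
The usual way around shared state is to let the stream build the result itself. The following sketch (hypothetical names) mentions the problematic variant only in a comment and uses “collect()” instead, which gives every thread its own intermediate container and merges them at the end.

```java
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

class NoSharedState {

  // Anti-pattern, shown only as a comment: adding to a shared ArrayList from
  // forEach() on a parallel stream is a data race unless you lock around it.
  //   List<Integer> shared = new ArrayList<>();
  //   IntStream.rangeClosed(1, n).parallel().forEach(i -> shared.add(i * i));

  // No shared mutable state: collect() handles the per-thread containers.
  static List<Integer> squares(int n) {
    return IntStream.rangeClosed(1, n)
        .parallel()
        .map(i -> i * i)
        .boxed()
        .collect(Collectors.toList());
  }

  public static void main(String[] args) {
    System.out.println(squares(5));
  }
}
```

Because the source is ordered, the collected list keeps the encounter order even though the work was done in parallel.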

Parallel streams are for CPU-heavy process steps

Brian mentions that parallel streams were built for CPU-heavy tasks. If you mainly do IO in the processing steps, then use thread pools and Executors etc.

What degree of parallelism does the Stream API use?

As stated in the Streams API documentation, the API uses one JVM-wide fork/join pool (the common pool) whose number of threads is derived from the number of processors on the computer system. By default it is one less than the processor count, because the thread that starts the stream also takes part in the work. Why? Because it is expected that the processing steps will utilize the CPU as much as possible. So there is no sense in having more threads than CPU cores.

In contrast, when having IO-heavy tasks you want to have many more threads than CPU cores because the threads will wait for IO most of the time and therefore not utilize the CPU.
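
Both sides can be sketched in a few lines. The class, the method names and the fake “IO task” that just sleeps are assumptions made for illustration: the first method reads the parallelism of the common pool, the second runs blocking tasks on a dedicated thread pool whose size you choose independently of the core count.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.Future;

class PoolSizing {

  // Parallelism of the JVM-wide pool that parallel streams use by default.
  static int commonPoolParallelism() {
    return ForkJoinPool.commonPool().getParallelism();
  }

  // Stand-in for an IO-bound step: it mostly waits and hardly uses the CPU.
  static Callable<String> fakeIoTask(String name, long millis) {
    return () -> {
      Thread.sleep(millis);
      return name;
    };
  }

  // Runs blocking tasks on a dedicated pool; results come back in task order.
  static List<String> runBlockingTasks(List<Callable<String>> tasks, int threads) {
    ExecutorService pool = Executors.newFixedThreadPool(threads);
    try {
      List<String> results = new ArrayList<>();
      for (Future<String> future : pool.invokeAll(tasks)) {
        results.add(future.get());
      }
      return results;
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException(e);
    } catch (ExecutionException e) {
      throw new IllegalStateException(e.getCause());
    } finally {
      pool.shutdown();
    }
  }

  public static void main(String[] args) {
    System.out.println("processors: " + Runtime.getRuntime().availableProcessors());
    System.out.println("common pool parallelism: " + commonPoolParallelism());
    List<Callable<String>> tasks = new ArrayList<>();
    for (int i = 0; i < 20; i++) {
      tasks.add(fakeIoTask("task-" + i, 100));
    }
    System.out.println(runBlockingTasks(tasks, 20));
  }
}
```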

For almost 10 years I have been working with email (SMTP) as an
integration technology. It is often used between different
organizations. Using email as an integration technology may seem
outdated but it is not.


Here is a list of advantages:

  • Infrastructure (mail servers etc.) is often already there, so there is no need to install new software
  • Knowledge of SMTP is often available because someone has to operate the mail service within the organization anyway
  • Chances are high that there are email libraries for your development platform, given how widespread the use case of sending and receiving mails is.
  • No need for a point-to-point connection between two IT-systems because email uses several “hops” to get delivered. No need for new firewall rules, application level gateways etc.
  • Email is asynchronous. Unless you need an instant answer, using async services is a good thing. Asynchronous messaging is promoted in the context of microservices.
  • For manual testing you can send mails from your email client to
    the system
  • For manual testing or monitoring, you can easily configure another mailbox that receives mails beside the actual mailbox (just add an additional address in “To” field or add a “Cc” field)
  • Although email is one-way communication, in one case we have implemented acknowledgement mails with timeout rules and a retry mechanism
  • emails can be automatically acknowledged (called “delivery status notification”) but this is often configured for non-delivery only
  • there are standards for “doing things”, like encryption

Of course there are some disadvantages:

  • less control: you may depend on that “mail guy” for configuring stuff and looking after log files. this might not be a problem unless you are practicing DevOps because that “mail guy” is part of the operations team anyway
  • less control: there might be a mail server between source and destination that does something like adding a PGP signature or rewriting a header that changes the message
  • you may have to deal with additional mails like non-delivery-mails. These mails inform you that your original mail could not be delivered
  • there are size limitations, email is not suitable for sending large files

So from my point of view there are a number of advantages to using email
as an integration technology when exchanging messages between two or
more different organizations.

Zachary Flower points out the importance of proper documentation in his article “API Design: A Documentation-first Approach” and even encourages a “documentation-first approach”.

He also cautions us against a “build-first” approach, utilizing existing architecture:

When designing an API, it is often desireable to take a “build first” approach, especially when utilizing the architecture of a pre-existing product. Unfortunately, this mindset doesn’t follow the standard usability practices that we follow when building apps with graphic interfaces. It is extremely important that we take a user-centric approach to API design, because we are developing a product to be consumed by other developers. If we can’t empathise with their needs and frustrations, then who can we empathise with? This user-centric focus is an important reason to start by writing your API documentation, rather than just designing it. When you create good documentation, good design follows, but the reverse isn’t necessarily true.

In Java, a synchronized instance method is not thread safe if it reads from and writes to one or more static member variables.

Consider:

public class SomeClass {

  static int someCounter = 0;

  synchronized void doSomething() {
    for(int i = 0; i < 20; i++) {
      someCounter++;
      // do something that takes a bit of time, e.g.
      // java.net.InetAddress.getByName("www.wikipedia.org");
      System.out.println("counter="+someCounter);
    }
  }
}

It is tempting to assume that access to someCounter is thread safe because of the synchronized keyword on doSomething. But synchronized on an instance method only locks that one instance.
As soon as you call doSomething concurrently on multiple SomeClass instances, it will not print unique numbers. This is because all instances share the same static member variables. Between the increment of someCounter and printing it, its value might already have been changed by another instance.

That particular bug was a bit hidden because a “SomeClass” instance was “cached” in a JEE stateless session bean. Of course the JEE container creates multiple instances of the session bean and hence multiple instances of SomeClass.
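
One possible fix, sketched here with made-up names and not taken from the original project, is to make the increment and the read a single atomic step with an AtomicInteger. Locking on the class instead of the instance (a static synchronized method or synchronized (SomeClass.class)) would be another option. The demo method starts several threads, each with its own instance, and checks that no number is handed out twice.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

class SafeCounter {

  // Shared by all instances, just like the static int in the example above.
  private static final AtomicInteger COUNTER = new AtomicInteger();

  // incrementAndGet() increments and reads in one atomic step, so no other
  // instance can change the value between the update and the read.
  int next() {
    return COUNTER.incrementAndGet();
  }

  // Returns true if no value was observed twice across all threads.
  static boolean allValuesUnique(int threads, int callsPerThread) {
    Set<Integer> seen = ConcurrentHashMap.newKeySet();
    AtomicInteger duplicates = new AtomicInteger();
    Thread[] workers = new Thread[threads];
    for (int t = 0; t < threads; t++) {
      workers[t] = new Thread(() -> {
        SafeCounter instance = new SafeCounter(); // one instance per thread
        for (int i = 0; i < callsPerThread; i++) {
          if (!seen.add(instance.next())) {
            duplicates.incrementAndGet();
          }
        }
      });
      workers[t].start();
    }
    for (Thread worker : workers) {
      try {
        worker.join();
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        return false;
      }
    }
    return duplicates.get() == 0;
  }

  public static void main(String[] args) {
    System.out.println(allValuesUnique(8, 1_000));
  }
}
```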

If you want to stop the route you are currently using, you can’t do it using

context.stopRoute()

because it will wait for the current “inflight” message to be processed until “the end”.

So you need to do this async:

// ....
  private void shutdownRoute() {
    ShutdownRoute shutdownRoute = new ShutdownRoute(camelContext);
    new Thread(shutdownRoute).start();
    try {
      Thread.sleep(1000); // makes sure the route is stopped before this message has finished
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt(); // keep the interrupt visible to the caller
    }
  }
// ...

class ShutdownRoute implements Runnable {
  /** log instance */
  private Logger log = LoggerFactory.getLogger(ShutdownRoute.class.getName());
  
  private final CamelContext context;
  
  ShutdownRoute(final CamelContext cContext) {
    context = cContext;
    
  }
  
  @Override
  public void run() {
    try {      
      context.stopRoute(YOUR_ROUTE_ID);
    } catch (Exception e) {
      log.error("Failed to stop route",e);
    }
  }
}

We do this when using the circuit breaker pattern. In the error handling processor we check if the circuit breaker goes to the OPEN state. If so, the route stops itself. Via another route we check periodically if the circuit breaker is in the HALF-OPEN state and start the route again.

At one point in our latest project using Apache Camel, I wanted to stop a message from being further processed depending on a condition. This is quite easy to do with the route builder, for example:

from("direct:example")
  .choice()
    .when(header("someheader").isEqualTo("bla"))
      .stop()
  .end()

If you already know in a Processor or an AggregationStrategy that a message needs to be stopped right away, there is an easy way to signal this without a separate “stop-branch” in the route. You simply set the exchange property Exchange.ROUTE_STOP to TRUE like this:

if  (....) { // stop here 
      original.setProperty(Exchange.ROUTE_STOP, Boolean.TRUE);
      return original;
}

And the message will not be further processed.