Fork / Join
The fork/join framework, introduced in Java 7, can be a little hard to understand at first because it caters to a specific class of problems.
According to the documentation for the ForkJoinTask class, the idea is to make parallel processing more efficient by imposing some restrictions on how tasks behave. The goal is to split a problem into smaller tasks that execute independently, without relying on synchronization (synchronized).
If we think about it for a while, this fits problems that can be solved by recursive "divide and conquer". One of the classic examples is the Fibonacci sequence. The following code uses fork/join to calculate the n-th Fibonacci number:
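import java.util.concurrent.RecursiveTask;

public class Fibonacci extends RecursiveTask<Long> {
    private final long n;

    public Fibonacci(long n) {
        this.n = n;
    }

    @Override
    protected Long compute() {
        // Base case: f(0) = 0 and f(1) = 1
        if (n <= 1) {
            return n;
        }
        Fibonacci f1 = new Fibonacci(n - 1);
        f1.fork(); // schedule f(n-1) to run asynchronously in the pool
        Fibonacci f2 = new Fibonacci(n - 2);
        // Compute f(n-2) in the current thread, then wait for f(n-1)
        return f2.compute() + f1.join();
    }
}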
A few points are worth highlighting:
- We extend RecursiveTask, one of the abstract subclasses of ForkJoinTask (the other one available in Java 7 is RecursiveAction, which returns no result). A "recursive task" is intended to execute recursive subtasks in parallel and return a final value.
- The "magic" happens in the compute() method, where the first if defines the base case of the recursion and the rest of the method recursively creates new instances of the class as subtasks that the pool can run on other threads.
- By invoking f1.fork(), we ask the pool to compute f(n-1) asynchronously, possibly in another thread. This frees the current thread to compute f(n-2) in parallel.
- Finally, we invoke f2.compute() to calculate f(n-2) immediately in the current thread and f1.join() to retrieve the result of the forked task, waiting for it to finish if necessary.
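The same fork, compute, join sequence applies to other divide-and-conquer problems. The following is only a minimal sketch, not part of the original example: the class name SumTask, the helper sum() and the THRESHOLD value are hypothetical, and the cutoff below which the work is done sequentially is an assumption that would need tuning for a real workload.

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

// Hypothetical task: sums a slice of an array using fork / compute / join.
class SumTask extends RecursiveTask<Long> {
    // Assumed cutoff: slices of this size or smaller are summed sequentially.
    static final int THRESHOLD = 1_000;

    private final long[] values;
    private final int from; // inclusive
    private final int to;   // exclusive

    SumTask(long[] values, int from, int to) {
        this.values = values;
        this.from = from;
        this.to = to;
    }

    @Override
    protected Long compute() {
        if (to - from <= THRESHOLD) {
            long sum = 0;
            for (int i = from; i < to; i++) {
                sum += values[i];
            }
            return sum;
        }
        int mid = (from + to) >>> 1;
        SumTask left = new SumTask(values, from, mid);
        SumTask right = new SumTask(values, mid, to);
        left.fork();                     // schedule the left half asynchronously
        long rightSum = right.compute(); // compute the right half in this thread
        return rightSum + left.join();   // then wait for the left half
    }

    // Convenience helper (hypothetical): runs the task on a new default pool.
    static long sum(long[] values) {
        ForkJoinPool pool = new ForkJoinPool();
        try {
            return pool.invoke(new SumTask(values, 0, values.length));
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        long[] values = new long[10_000];
        for (int i = 0; i < values.length; i++) {
            values[i] = i + 1;
        }
        System.out.println(sum(values)); // prints 50005000
    }
}
```

Note the order of the calls: forking one half, computing the other half directly and only then joining keeps the current thread busy instead of leaving it blocked while the forked subtask runs.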
You can run the above code as follows:
ForkJoinPool pool = new ForkJoinPool(4);
Fibonacci fibonacci = new Fibonacci(10);
long resultado = pool.invoke(fibonacci);
System.out.println(resultado);
The ForkJoinPool class manages the parallel tasks, and its constructor parameter sets the level of parallelism, that is, how many threads the pool aims to keep running simultaneously.
The pool.invoke(fibonacci) method starts the processing, waits for the calculation to finish, and returns the computed number.
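For convenience, the task and the pool invocation can be combined into one self-contained program. This is a sketch under a few assumptions: the wrapper class FibonacciDemo and the helper methods fib() and fibSequential() are hypothetical names, and the sequential loop is only there to cross-check the parallel result.

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

// Hypothetical wrapper class that bundles the task and the pool invocation.
class FibonacciDemo {

    static class Fibonacci extends RecursiveTask<Long> {
        private final long n;

        Fibonacci(long n) {
            this.n = n;
        }

        @Override
        protected Long compute() {
            if (n <= 1) {
                return n;
            }
            Fibonacci f1 = new Fibonacci(n - 1);
            f1.fork();
            Fibonacci f2 = new Fibonacci(n - 2);
            return f2.compute() + f1.join();
        }
    }

    // Runs the task on a pool with parallelism 4 and releases the pool afterwards.
    static long fib(long n) {
        ForkJoinPool pool = new ForkJoinPool(4);
        try {
            return pool.invoke(new Fibonacci(n));
        } finally {
            pool.shutdown();
        }
    }

    // Plain iterative version, used only to cross-check the result.
    static long fibSequential(long n) {
        long a = 0;
        long b = 1;
        for (long i = 0; i < n; i++) {
            long next = a + b;
            a = b;
            b = next;
        }
        return a;
    }

    public static void main(String[] args) {
        System.out.println(fib(10));           // prints 55
        System.out.println(fibSequential(10)); // prints 55
    }
}
```

Because a subtask is forked at every level of the recursion down to n <= 1, this version creates a very large number of tiny tasks as n grows, so it is better read as an illustration of the API than as an efficient way to compute Fibonacci numbers.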
ThreadPoolExecutor
However, if your problem is not one where recursive "divide and conquer" is the best strategy, you can use a more general-purpose API, such as ThreadPoolExecutor.
It lives in the same package as fork/join, java.util.concurrent, and both ForkJoinPool and ThreadPoolExecutor implement ExecutorService, but it lacks the recursive and restricted nature of its "sister".
To create an instance of ThreadPoolExecutor:
ExecutorService threadPoolExecutor = new ThreadPoolExecutor(
    4, // core pool size
    8, // maximum pool size
    10000, // how long idle threads above the core size are kept alive
    TimeUnit.MILLISECONDS, // time unit for the keep-alive value
    new LinkedBlockingQueue<Runnable>() // queue that holds tasks waiting to run
);
Then we submit tasks to the executor and use futures, which hold the promise of a result that will become available later:
Future<Integer> f1 = threadPoolExecutor.submit(new Callable<Integer>() {
@Override
public Integer call() throws Exception {
return 1;
}
});
Future<Integer> f2 = threadPoolExecutor.submit(new Callable<Integer>() {
@Override
public Integer call() throws Exception {
return 2;
}
});
Note that the above code uses the submit() method to request the execution of two Callables. A Callable represents a task to be executed by a thread, just like a Runnable, except that it returns a value.
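To make the difference concrete, the sketch below submits one Callable and one Runnable to the same executor. The class name CallableVsRunnable and the helper run() are hypothetical, and Executors.newFixedThreadPool is used here only for brevity in place of the explicit constructor.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical demo class comparing the two task types.
class CallableVsRunnable {

    static String run() throws Exception {
        ExecutorService executor = Executors.newFixedThreadPool(2);
        try {
            // A Callable produces a value that the Future will hold.
            Future<Integer> fromCallable = executor.submit(new Callable<Integer>() {
                @Override
                public Integer call() {
                    return 42;
                }
            });
            // A Runnable produces no value; its Future yields null on completion.
            Future<?> fromRunnable = executor.submit(new Runnable() {
                @Override
                public void run() {
                    // side effects only
                }
            });
            return fromCallable.get() + " " + fromRunnable.get();
        } finally {
            executor.shutdown(); // let the worker threads terminate
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(run()); // prints "42 null"
    }
}
```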
The Future interface holds a reference to the pending result, so that we can carry on with other work and retrieve the result when it is ready, like this:
try {
Integer r1 = f1.get(1000, TimeUnit.MILLISECONDS);
Integer r2 = f2.get(1000, TimeUnit.MILLISECONDS);
System.out.println(r1 + r2);
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
} catch (TimeoutException e) {
e.printStackTrace();
}
In the example above, I used the get() method of the Future instances to wait for the processing and retrieve the result.
The parameters set a wait timeout of 1000 milliseconds. If the result is not available within that time, a TimeoutException is thrown.
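The timeout behavior can be observed with a task that deliberately takes longer than the caller is willing to wait. The following is a sketch under stated assumptions: the class TimeoutDemo and the method fetchWithTimeout() are hypothetical, the work is simulated with Thread.sleep(), and cancelling the task and shutting the executor down after a timeout is one possible policy, not the only one.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical demo class showing get() with a timeout.
class TimeoutDemo {

    static String fetchWithTimeout(final long workMillis, long timeoutMillis) throws Exception {
        ExecutorService executor = new ThreadPoolExecutor(
                1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());
        Future<String> future = executor.submit(new Callable<String>() {
            @Override
            public String call() throws Exception {
                Thread.sleep(workMillis); // simulated work
                return "done";
            }
        });
        try {
            // Waits at most timeoutMillis for the result.
            return future.get(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            future.cancel(true); // interrupt the worker, the result is no longer needed
            return "timed out";
        } finally {
            executor.shutdownNow(); // release the worker thread so the JVM can exit
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(fetchWithTimeout(10, 1000));  // expected: done
        System.out.println(fetchWithTimeout(2000, 100)); // expected: timed out
    }
}
```

Shutting the executor down matters in standalone programs: the default worker threads of a ThreadPoolExecutor are not daemon threads, so an executor that is never shut down can keep the JVM from exiting.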
Final considerations
The choice between using classical threads, fork/join
, ThreadPoolExecutor
, or any other mechanism depends on the nature of the problem you are trying to solve. One approach is not necessarily better than the other for all kinds of problems.
In addition, certain situations will require more specific implementations, where you will need to extend the classes and interfaces of the java.util.concurrent API. The good news is that this API was designed with that in mind.