Fork/Join in Java

2

During a project, it was suggested that I use the Fork/Join framework from the Java API instead of plain threads, but I could not find any easy-to-understand examples on Google.

I understand that it is possible to pass it a list of tasks, which it subdivides, and that it then gathers the results.

I would like to know whether it is better to use threads, and whether someone has an example to show or can share their experience with this framework.

    
asked by anonymous 06.02.2014 / 12:33

3 answers

3

Fork/Join

The fork/join feature from Java 7 can be a little hard to understand at first, because it caters to a specific class of problems.

According to the documentation of the ForkJoinTask class, the idea is to enable more efficient parallel processing by imposing some restrictions on how tasks behave. The goal is to split a problem into smaller tasks and execute them independently, without relying on synchronization (synchronized).

If we think about it for a while, this fits problems that can be decomposed recursively ("divide and conquer"), a structure that also shows up in Dynamic Programming problems. One of the classic examples is the Fibonacci number. The following code is an example of using fork/join to calculate the Fibonacci number:

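import java.util.concurrent.RecursiveTask;

public class Fibonacci extends RecursiveTask<Long> {

    private final long n;

    public Fibonacci(long n) {
        this.n = n;
    }

    @Override
    protected Long compute() {
        // Base case: f(0) = 0 and f(1) = 1.
        if (n <= 1) {
            return n;
        }
        // Schedule f(n - 1) to run asynchronously in the pool.
        Fibonacci f1 = new Fibonacci(n - 1);
        f1.fork();
        // Compute f(n - 2) in the current thread, then wait for f(n - 1).
        Fibonacci f2 = new Fibonacci(n - 2);
        return f2.compute() + f1.join();
    }

}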

A few points are worth noting:

  • We extended RecursiveTask, one of the ForkJoinTask subclasses (in Java 7 the other one is RecursiveAction). A "recursive task" is intended to execute recursive subtasks in parallel and return a final value.
  • The "magic" happens inside the compute() method, where we define the base case of the recursion in the first if and recursively create new instances of the class, which the pool may run on other threads.
  • When invoking f1.fork(), we ask the pool to compute f(n-1) asynchronously, possibly in another thread. This frees the current thread to process f(n-2) in parallel.
  • Finally, we invoke f2.compute() to calculate f(n-2) immediately in the current thread, and f1.join() to retrieve the result of the fork, waiting for it to finish if necessary.

You can run the above code as follows:

ForkJoinPool pool = new ForkJoinPool(4);
Fibonacci fibonacci = new Fibonacci(10);
long resultado = pool.invoke(fibonacci);
System.out.println(resultado);

The ForkJoinPool class manages the parallel tasks, and the constructor parameter sets the level of parallelism, that is, how many threads will be used simultaneously.

The pool.invoke(fibonacci) method starts processing, waits for the calculation to finish, and returns the calculated number.
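Forking a task for every single call, as the Fibonacci example does, is fine for illustration, but in practice it is common to stop splitting below some size and finish the work sequentially. The sketch below applies the same fork/compute/join pattern to summing an array. The ArraySum class, its THRESHOLD value, and the parallelSum helper are all hypothetical names chosen for this illustration, and the cutoff of 1,000 elements is an arbitrary assumption, not a tuned value.

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

// Hypothetical example class: sums a slice of an array with fork/join.
class ArraySum extends RecursiveTask<Long> {

    // Assumed cutoff: slices this small are summed sequentially.
    private static final int THRESHOLD = 1_000;

    private final long[] data;
    private final int from; // inclusive
    private final int to;   // exclusive

    ArraySum(long[] data, int from, int to) {
        this.data = data;
        this.from = from;
        this.to = to;
    }

    @Override
    protected Long compute() {
        // Small enough: do the work directly instead of splitting further.
        if (to - from <= THRESHOLD) {
            long sum = 0;
            for (int i = from; i < to; i++) {
                sum += data[i];
            }
            return sum;
        }
        int mid = from + (to - from) / 2;
        ArraySum left = new ArraySum(data, from, mid);
        ArraySum right = new ArraySum(data, mid, to);
        left.fork();                          // schedule the left half asynchronously
        return right.compute() + left.join(); // compute the right half here, then wait for the left
    }

    // Convenience helper (hypothetical): runs the task on a pool of 4 threads.
    static long parallelSum(long[] data) {
        ForkJoinPool pool = new ForkJoinPool(4);
        try {
            return pool.invoke(new ArraySum(data, 0, data.length));
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        long[] data = new long[10_000];
        for (int i = 0; i < data.length; i++) {
            data[i] = i + 1;
        }
        System.out.println(parallelSum(data)); // prints 50005000
    }
}
```

The right value for the threshold depends on how expensive each element is to process, so it is worth measuring rather than guessing.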

ThreadPoolExecutor

However, if your problem does not fall into the category where recursive "divide and conquer" is the best strategy, you can use more general-purpose APIs, such as ThreadPoolExecutor.

It lives in the same package as the fork/join classes (java.util.concurrent), and both ThreadPoolExecutor and ForkJoinPool implement ExecutorService, but it lacks the recursive, more restricted nature of its "sibling".

To create an instance of ThreadPoolExecutor:
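Because both pools implement ExecutorService, code written against that interface can run on either one. The sketch below is only an illustration of that point: the SharedInterfaceDemo class and its square helper are hypothetical names, and the fixed-size pool is created through the Executors.newFixedThreadPool factory, which in the standard JDK builds a ThreadPoolExecutor.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.Future;

// Hypothetical demo class: the same code runs on either kind of pool.
class SharedInterfaceDemo {

    // Submits one task to the given executor, waits for it, then shuts the executor down.
    static int square(ExecutorService executor, final int x) {
        try {
            Future<Integer> future = executor.submit(new Callable<Integer>() {
                @Override
                public Integer call() {
                    return x * x;
                }
            });
            return future.get();
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(square(new ForkJoinPool(2), 7));             // prints 49
        System.out.println(square(Executors.newFixedThreadPool(2), 7)); // prints 49
    }
}
```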

ExecutorService threadPoolExecutor = new ThreadPoolExecutor(
        4, // core pool size
        8, // maximum pool size (only reached when the queue is full)
        10000, // how long idle threads beyond the core size are kept alive
        TimeUnit.MILLISECONDS, // time unit of the previous argument
        new LinkedBlockingQueue<Runnable>() // queue holding tasks that wait for a free thread
);

Then we submit tasks for execution and use futures to hold the promise of a future result:

Future<Integer> f1 = threadPoolExecutor.submit(new Callable<Integer>() {
    @Override
    public Integer call() throws Exception {
        return 1;
    }
});

Future<Integer> f2 = threadPoolExecutor.submit(new Callable<Integer>() {
    @Override
    public Integer call() throws Exception {
        return 2;
    }
});

Note that the above code uses the submit() method to request the execution of two Callables. A Callable represents a task to be executed by a thread, just like a Runnable, but it returns a value.
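To make the difference concrete, here is a minimal sketch that submits one Runnable and one Callable to the same pool. The CallableVsRunnable class and its results helper are hypothetical names used only for this illustration. The Future returned for a Runnable yields null once the task completes, while the one returned for a Callable yields the computed value.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical demo class comparing the two task types.
class CallableVsRunnable {

    // Returns { result of the Runnable's Future, result of the Callable's Future }.
    static Object[] results() {
        ExecutorService executor = Executors.newFixedThreadPool(2);
        try {
            Future<?> fromRunnable = executor.submit(new Runnable() {
                @Override
                public void run() {
                    // A Runnable can only act through side effects; it returns nothing.
                }
            });
            Future<Integer> fromCallable = executor.submit(new Callable<Integer>() {
                @Override
                public Integer call() {
                    return 42;
                }
            });
            return new Object[] { fromRunnable.get(), fromCallable.get() };
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) {
        Object[] r = results();
        System.out.println(r[0]); // prints null
        System.out.println(r[1]); // prints 42
    }
}
```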

A Future holds a reference to the pending result of the task, so that we can continue execution and retrieve the result when it is ready, like this:

try {
    Integer r1 = f1.get(1000, TimeUnit.MILLISECONDS);
    Integer r2 = f2.get(1000, TimeUnit.MILLISECONDS);
    System.out.println(r1 + r2);
} catch (InterruptedException e) {
    e.printStackTrace();
} catch (ExecutionException e) {
    e.printStackTrace();
} catch (TimeoutException e) {
    e.printStackTrace();
}

In the example above, I used the get() method of Future instances to wait for processing and effectively retrieve the result.

The parameters indicate that the maximum wait time is 1000 milliseconds. If the result is not ready within that time, a TimeoutException is thrown.
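The timeout behaviour can be seen with a task that deliberately takes longer than the caller is willing to wait. This is a minimal sketch: the FutureTimeoutDemo class, the runWithTimeout helper, and the "done" / "timed out" strings are hypothetical names for the illustration, and cancelling the task after the timeout is just one possible reaction, not the only sensible one.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical demo class for Future.get() with a timeout.
class FutureTimeoutDemo {

    // Runs a task that sleeps for taskMillis and waits at most waitMillis for its result.
    static String runWithTimeout(final long taskMillis, long waitMillis) {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        try {
            Future<String> future = executor.submit(new Callable<String>() {
                @Override
                public String call() throws Exception {
                    Thread.sleep(taskMillis);
                    return "done";
                }
            });
            try {
                return future.get(waitMillis, TimeUnit.MILLISECONDS);
            } catch (TimeoutException e) {
                future.cancel(true); // interrupt the task; its result is no longer wanted
                return "timed out";
            }
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            executor.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println(runWithTimeout(10, 2000));  // prints done
        System.out.println(runWithTimeout(2000, 100)); // prints timed out
    }
}
```

Note that a TimeoutException only means the caller stopped waiting. Unless the task is cancelled, it keeps running in the pool.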

Final considerations

The choice between using classical threads, fork/join, ThreadPoolExecutor, or any other mechanism depends on the nature of the problem you are trying to solve. No approach is necessarily better than the others for all kinds of problems.

In addition, certain situations will require more specific implementations, where you will need to extend the classes and interfaces of the java.util.concurrent API. The good news is that this API was designed with that in mind.

    
06.02.2014 / 13:57
1

There is a project called Java Concurrent Animated, which built an application that demonstrates Java's concurrent and parallel programming features.

You can download a demo (with sources) and use it to study everything related to this topic.

It contains several animations that show how to use each concurrent programming model. During each animation, snippets of the sample code are displayed, giving a good idea of how to implement it.

The demo is just an executable JAR. Just double-click it.

It's really cool and very well done!

    
06.02.2014 / 13:41
0

I'm posting the code as an example for anyone who wants to see how it was done. Anyone is free to change it, as long as the change improves it.

package service.forkinjoin;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.RecursiveAction;

import javax.mail.Message;
import javax.mail.MessagingException;
import javax.mail.Multipart;
import javax.mail.Part;
import javax.mail.internet.MimeBodyPart;

import service.FileUtil;

public class ForkSortMessages extends RecursiveAction {

    private static final long serialVersionUID = -1092415796824205832L;
    private List<Message> listMessages;
    private List<Message> listMessagesToDelete;

    public ForkSortMessages(List<Message> listMessages, List<Message> listMessagesToDelete) {
        this.listMessages = listMessages;
        this.listMessagesToDelete = listMessagesToDelete;
    }

    @Override
    protected void compute() {

        List<RecursiveAction> actions = new ArrayList<>();

        if (this.listMessages.size() <= Runtime.getRuntime().availableProcessors()) {
            try {
                this.separateMessages();
            } catch (MessagingException | IOException e) {
                e.printStackTrace();
            }
        } else {

            // Split at the midpoint; both halves share the same index so that no message is skipped.
            int mid = this.listMessages.size() / 2;
            actions.add(new ForkSortMessages(this.listMessages.subList(0, mid), this.listMessagesToDelete));
            actions.add(new ForkSortMessages(this.listMessages.subList(mid, this.listMessages.size()), this.listMessagesToDelete));
            invokeAll(actions);
        }
    }

    private void separateMessages() throws MessagingException, IOException {

        for (Message message : this.listMessages) {

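            if ((this.hasNoAttachment(message.getContentType()) || (this.hasNoXmlAttachment(message)))) {
                // The target list is shared by every subtask, so guard the write.
                synchronized (listMessagesToDelete) {
                    listMessagesToDelete.add(message);
                }
            }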
        }
    }

    private boolean hasNoAttachment(String content) {
        return !content.contains("multipart/MIXED");
    }

    private boolean hasNoXmlAttachment(Message message) throws IOException, MessagingException {

        Multipart multipart = (Multipart) message.getContent();

        for (int i = 0; i < multipart.getCount(); i++) {

            MimeBodyPart mimeBodyPart = (MimeBodyPart) multipart.getBodyPart(i);

            if (Part.ATTACHMENT.equalsIgnoreCase(mimeBodyPart.getDisposition())) {

                if (FileUtil.isXmlFile(mimeBodyPart.getFileName())) {
                    return false;
                }
            }
        }

        return true;
    }
}
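For anyone who wants to try the same structure without a mail server, here is a reduced sketch of the pattern: a RecursiveAction that splits a list in half and lets every subtask write into one shared result list. Everything in it is a hypothetical stand-in (the CollectOdds class, the "is odd" check in place of the attachment check, the THRESHOLD of 4, and the oddsOf helper). The shared list is wrapped with Collections.synchronizedList because several subtasks may add to it at the same time.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveAction;

// Hypothetical stand-in for the message filter: collects the odd numbers of a list.
class CollectOdds extends RecursiveAction {

    private static final int THRESHOLD = 4; // assumed cutoff for sequential work

    private final List<Integer> input;
    private final List<Integer> rejected; // shared by all subtasks, so it must be thread-safe

    CollectOdds(List<Integer> input, List<Integer> rejected) {
        this.input = input;
        this.rejected = rejected;
    }

    @Override
    protected void compute() {
        if (input.size() <= THRESHOLD) {
            for (Integer value : input) {
                if (value % 2 != 0) {
                    rejected.add(value);
                }
            }
            return;
        }
        // Both halves use the same midpoint, so every element lands in exactly one half.
        int mid = input.size() / 2;
        invokeAll(new CollectOdds(input.subList(0, mid), rejected),
                  new CollectOdds(input.subList(mid, input.size()), rejected));
    }

    // Helper (hypothetical): runs the action and returns the collected values in sorted order.
    static List<Integer> oddsOf(List<Integer> input) {
        List<Integer> rejected = Collections.synchronizedList(new ArrayList<Integer>());
        ForkJoinPool pool = new ForkJoinPool();
        try {
            pool.invoke(new CollectOdds(input, rejected));
        } finally {
            pool.shutdown();
        }
        // Subtasks finish in no particular order, so sort a copy before returning it.
        List<Integer> sorted = new ArrayList<>(rejected);
        Collections.sort(sorted);
        return sorted;
    }

    public static void main(String[] args) {
        List<Integer> input = new ArrayList<>();
        for (int i = 1; i <= 11; i++) {
            input.add(i);
        }
        System.out.println(oddsOf(input)); // prints [1, 3, 5, 7, 9, 11]
    }
}
```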
    
06.02.2014 / 17:12