How to guarantee transactional atomicity in more than one concurrent Thread in a Java SE environment using the Spring framework?


A Java SE solution using the Spring framework is being developed to parallelize the execution of an Oracle 9i stored procedure, which takes one or more lines of a very large file as its parameter.

This will be done through a thread pool, where each thread invokes the same stored procedure with different lines of the file, in order to reduce the processing time compared to a single execution of the procedure. However, atomicity must be guaranteed across all executions (transactions) of the procedure, just as it is today with a single execution: the transactions are committed only at the end, and only if no error occurred. If the execution in any thread fails, all the other transactions must be rolled back.
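For illustration only, the dispatch step described above (split the file into groups of lines, hand each group to a pool thread, then check whether every execution succeeded) might look like this with plain JDK classes. BatchDispatchSketch, the batch size and processBatch (a hypothetical stand-in for the stored-procedure call) are all assumptions. Note that it only detects that some batch failed; undoing the work the other threads already did is exactly the atomicity problem being asked about.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.function.Consumer;

class BatchDispatchSketch {

    // Splits the file's lines into consecutive batches of at most batchSize lines.
    static List<List<String>> partition(List<String> lines, int batchSize) {
        List<List<String>> batches = new ArrayList<>();
        for (int start = 0; start < lines.size(); start += batchSize) {
            int end = Math.min(start + batchSize, lines.size());
            batches.add(new ArrayList<>(lines.subList(start, end)));
        }
        return batches;
    }

    // Runs every batch on a fixed pool and reports whether ALL batches succeeded.
    // processBatch is a hypothetical stand-in for the stored-procedure call.
    static boolean runAll(List<List<String>> batches, int threads,
                          Consumer<List<String>> processBatch) {
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        try {
            List<Callable<Void>> tasks = new ArrayList<>();
            for (List<String> batch : batches) {
                tasks.add(() -> {
                    processBatch.accept(batch);
                    return null;
                });
            }
            boolean allOk = true;
            for (Future<Void> f : pool.invokeAll(tasks)) {
                try {
                    f.get();
                } catch (ExecutionException e) {
                    allOk = false; // this batch threw: the whole run would have to be undone
                }
            }
            return allOk;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } finally {
            pool.shutdown();
        }
    }
}
```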

Using a JTA (XA) implementation in Spring was considered, as described here, where each thread/transaction would be treated as a resource in a two-phase commit. However, I believe this design goes against the principle of the JTA mechanism, since, as I understand it, JTA only guarantees atomicity for the resources used within the same transactional method, that is, within a single thread.
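For reference, the two-phase commit principle behind XA can be sketched in memory as follows. Participant, TwoPhaseCoordinator and RecordingParticipant are hypothetical simplifications: a real XA resource also receives a transaction id (Xid), reports errors through XAException and must survive crashes. The point is only that nothing is committed until every participant has voted yes in the prepare phase, and a single no vote rolls everyone back. The voting in this sketch does not depend on which thread did each participant's work.

```java
import java.util.List;

// Hypothetical participant contract: a simplified analogue of an XA resource.
interface Participant {
    boolean prepare(); // phase 1: vote yes (true) or no (false)
    void commit();     // phase 2, only after a unanimous yes
    void rollback();   // phase 2, if any participant voted no
}

class TwoPhaseCoordinator {

    // Returns true if the transaction committed, false if it was rolled back.
    static boolean complete(List<? extends Participant> participants) {
        // phase 1: collect votes; a single "no" aborts everything
        for (Participant p : participants) {
            if (!p.prepare()) {
                for (Participant q : participants) {
                    q.rollback();
                }
                return false;
            }
        }
        // phase 2: everyone voted yes, so everyone commits
        for (Participant p : participants) {
            p.commit();
        }
        return true;
    }
}

// Test double that just records its final state.
class RecordingParticipant implements Participant {
    private final boolean voteYes;
    String state = "active";

    RecordingParticipant(boolean voteYes) {
        this.voteYes = voteYes;
    }

    public boolean prepare() {
        if (voteYes) {
            state = "prepared";
        }
        return voteYes;
    }

    public void commit() { state = "committed"; }

    public void rollback() { state = "rolled back"; }
}
```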

How can this be ensured declaratively, that is, in a non-programmatic way?

    
asked by anonymous 17.04.2014 / 18:14

1 answer


Using only Atomikos, an implementation of JTA and XA, I wrote a simple example that performs multi-threaded processing within a single transaction.

The full project is available on my GitHub.

Implementation

First, we initialize the DataSource and the TransactionManager:

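import java.sql.SQLException;
import java.util.Properties;
import javax.transaction.SystemException;
import com.atomikos.icatch.jta.UserTransactionManager;
import com.atomikos.jdbc.AtomikosDataSourceBean;

public class AtomikosDataSource {

    // Atomikos implementations
    private static UserTransactionManager utm;
    private static AtomikosDataSourceBean adsb;

    // initialize resources
    public static void init() {
        utm = new UserTransactionManager();
        try {
            utm.init();
            adsb = new AtomikosDataSourceBean();
            adsb.setMaxPoolSize(20);
            adsb.setUniqueResourceName("postgres");
            adsb.setXaDataSourceClassName("org.postgresql.xa.PGXADataSource");
            Properties p = new Properties();
            p.setProperty("user", "postgres");
            p.setProperty("password", "0");
            p.setProperty("serverName", "localhost");
            p.setProperty("portNumber", "5432");
            p.setProperty("databaseName", "postgres");
            adsb.setXaProperties(p);
            // make sure the underlying XADataSource is created before the
            // worker threads call getXaDataSource()
            adsb.init();
        } catch (SystemException | SQLException e) {
            e.printStackTrace();
            throw new RuntimeException(e);
        }
    }

    // accessors used by the code below
    public static UserTransactionManager getTM() {
        return utm;
    }

    public static AtomikosDataSourceBean getDS() {
        return adsb;
    }
}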

Next, a task, executed by one of the pool threads, that receives the main transaction instance (Transaction):

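// imports needed at the top of the enclosing file
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.util.concurrent.Callable;
import javax.sql.XAConnection;
import javax.transaction.Transaction;
import javax.transaction.xa.XAResource;

private static class Processamento implements Callable<Integer> {

    private final int id;
    private final boolean falhar;
    private final Transaction transaction;

    public Processamento(int id, boolean falhar, Transaction transaction) {
        this.id = id;
        this.falhar = falhar;
        this.transaction = transaction;
    }

    public Integer call() throws Exception {
        // simulated failure, used to test the rollback scenario
        if (falhar) {
            throw new RuntimeException("Failed unexpectedly!");
        }

        // enlist the XA connection in the shared (main) transaction
        XAConnection xac = AtomikosDataSource.getDS().getXaDataSource().getXAConnection();
        XAResource xar = xac.getXAResource();
        synchronized (transaction) {
            transaction.enlistResource(xar);
        }

        // normal execution: mark the row as processed
        int outcome = XAResource.TMFAIL;
        try (Connection c = xac.getConnection();
             PreparedStatement ps = c.prepareStatement(
                     "update teste set processado = 'ok' where id = ?")) {
            ps.setInt(1, id);
            ps.executeUpdate();
            outcome = XAResource.TMSUCCESS;
        } finally {
            // delist with TMSUCCESS, or TMFAIL if the update threw; xac itself stays
            // open because its XAResource is still needed at commit/rollback time
            synchronized (transaction) {
                transaction.delistResource(xar, outcome);
            }
        }
        return id;
    }

}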

Note that instead of relying on declarative JTA transaction demarcation, I'm using the low-level JTA/XA API implemented by Atomikos directly (Transaction.enlistResource and XAResource).

The call AtomikosDataSource.getDS().getXaDataSource().getXAConnection() retrieves an XA connection, whose XAResource is then added to the main transaction with transaction.enlistResource(xac.getXAResource()).
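Under the JTA model, enlistResource roughly corresponds to the transaction manager calling XAResource.start(xid, TMNOFLAGS) on the resource, delistResource(..., TMSUCCESS) to XAResource.end(xid, TMSUCCESS), and the final commit to prepare followed by commit on every branch. The JDK-only sketch below imitates that sequence with toy classes (SimpleXid, LoggingXAResource and XaBranchSketch are hypothetical, not Atomikos code): each branch is started and ended in its own worker thread, while the calling thread drives prepare and commit for all of them under one global transaction id. Rollback after a failed vote is omitted for brevity.

```java
import java.util.ArrayList;
import java.util.List;
import javax.transaction.xa.XAException;
import javax.transaction.xa.XAResource;
import javax.transaction.xa.Xid;

// Hypothetical minimal Xid: a global transaction id shared by all branches,
// plus a branch qualifier that differs per branch.
class SimpleXid implements Xid {
    private final byte[] gtrid;
    private final byte[] bqual;

    SimpleXid(byte[] gtrid, byte[] bqual) {
        this.gtrid = gtrid;
        this.bqual = bqual;
    }

    public int getFormatId() { return 4242; } // arbitrary format id (assumption)
    public byte[] getGlobalTransactionId() { return gtrid.clone(); }
    public byte[] getBranchQualifier() { return bqual.clone(); }
}

// Toy resource manager that only records the XA calls it receives.
class LoggingXAResource implements XAResource {
    final List<String> calls = new ArrayList<>();

    public void start(Xid xid, int flags) { calls.add("start"); }
    public void end(Xid xid, int flags) {
        calls.add(flags == TMSUCCESS ? "end(TMSUCCESS)" : "end(" + flags + ")");
    }
    public int prepare(Xid xid) { calls.add("prepare"); return XA_OK; }
    public void commit(Xid xid, boolean onePhase) { calls.add("commit"); }
    public void rollback(Xid xid) { calls.add("rollback"); }
    public void forget(Xid xid) { }
    public Xid[] recover(int flag) { return new Xid[0]; }
    public boolean isSameRM(XAResource other) { return other == this; }
    public int getTransactionTimeout() { return 0; }
    public boolean setTransactionTimeout(int seconds) { return false; }
}

class XaBranchSketch {

    // Each worker thread brackets its own branch with start/end; the calling
    // thread then completes every branch with prepare + commit.
    static void runInThreads(List<? extends XAResource> resources) {
        byte[] gtrid = {7};
        List<Xid> xids = new ArrayList<>();
        List<Thread> workers = new ArrayList<>();
        for (int i = 0; i < resources.size(); i++) {
            Xid xid = new SimpleXid(gtrid, new byte[] {(byte) i});
            XAResource r = resources.get(i);
            xids.add(xid);
            Thread t = new Thread(() -> {
                try {
                    r.start(xid, XAResource.TMNOFLAGS);  // ~ enlistResource
                    // ... this thread's SQL would run here ...
                    r.end(xid, XAResource.TMSUCCESS);    // ~ delistResource(TMSUCCESS)
                } catch (XAException e) {
                    throw new RuntimeException(e);
                }
            });
            workers.add(t);
            t.start();
        }
        try {
            for (Thread t : workers) {
                t.join();
            }
            // phase 1 (the toy resource always votes XA_OK), then phase 2
            for (int i = 0; i < resources.size(); i++) {
                resources.get(i).prepare(xids.get(i));
            }
            for (int i = 0; i < resources.size(); i++) {
                resources.get(i).commit(xids.get(i), false);
            }
        } catch (InterruptedException | XAException e) {
            throw new RuntimeException(e);
        }
    }
}
```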

I synchronized some snippets on the shared transaction because I occasionally got a NullPointerException in the tests. This should not be a problem if you use the connections carefully, that is, without opening and closing them all the time.

Finally, a method that runs five instances of the task above in a thread pool:

// imports needed at the top of the enclosing file
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import javax.transaction.SystemException;
import javax.transaction.Transaction;

public static int processar(boolean falhar) {
    int ok = 0;
    Transaction transaction = null;

    //create thread pool
    ExecutorService executor = Executors.newFixedThreadPool(5);
    try {

        //start transaction
        AtomikosDataSource.getTM().begin();
        transaction = AtomikosDataSource.getTM().getTransaction();

        //create 5 tasks, passing the main transaction as argument;
        //only the last one (i == 4) fails, and only when falhar is true
        List<Callable<Integer>> processos = new ArrayList<Callable<Integer>>();
        for (int i = 0; i < 5; i++) {
            processos.add(new Processamento(i + 1, i == 4 && falhar, transaction));
        }

        //execute the tasks and wait for all of them
        List<Future<Integer>> futures = executor.invokeAll(processos);

        //count the results; get() throws if the task threw an exception
        Throwable ex = null;
        for (Future<Integer> future : futures) {
            try {
                int threadId = future.get();
                System.out.println("Thread " + threadId + " succeeded!");
                ok++;
            } catch (Throwable e) {
                ex = e;
            }
        }

        if (ex != null) {
            throw ex;
        }

        //finish the transaction normally; committing through the transaction
        //manager also disassociates the transaction from this thread
        AtomikosDataSource.getTM().commit();

    } catch (Throwable e) {

        e.printStackTrace();
        try {
            //try to rollback
            if (transaction != null) {
                AtomikosDataSource.getTM().rollback();
            }
        } catch (IllegalStateException | SecurityException | SystemException e1) {
            e1.printStackTrace();
        }

    } finally {
        //release the pool threads
        executor.shutdown();
    }
    return ok;
}

I tested both a success scenario and a failure scenario to validate the result.

In the success scenario, each of the five threads updates one row of the TESTE table with the value ok, and the transaction is committed.

In the failure scenario, the last thread always throws an exception, forcing the rollback of the work done by the other four threads.

See the code on GitHub for more details.

Configuration Notes

I used PostgreSQL in the example. It was necessary to set max_prepared_transactions to a value greater than zero in the postgresql.conf configuration file.

It's important to make sure your database driver supports distributed transactions. I read somewhere that MySQL might have some issues with that.
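One quick, admittedly shallow sanity check is whether the driver on the classpath ships an XADataSource implementation at all, that is, whether the class passed to setXaDataSourceClassName exists. The helper XaDriverCheck is hypothetical, and it only shows that an XA class is present, not that the driver's XA behavior is correct (the kind of issue mentioned above for MySQL).

```java
import javax.sql.XADataSource;

class XaDriverCheck {

    // Returns true if the named class is on the classpath and implements XADataSource.
    // This only proves the driver ships an XA implementation, not that it behaves well.
    static boolean hasXaDataSource(String className) {
        try {
            Class<?> c = Class.forName(className);
            return XADataSource.class.isAssignableFrom(c);
        } catch (ClassNotFoundException | LinkageError e) {
            return false;
        }
    }

    public static void main(String[] args) {
        // class name taken from the PostgreSQL configuration in the example
        System.out.println(hasXaDataSource("org.postgresql.xa.PGXADataSource"));
    }
}
```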

Observations

To use the example with Spring, declare the objects created manually here (the transaction manager and the data source) as beans, either in XML or through annotations; either approach works.

Be careful if you decide to implement something like this inside an application server, so as not to interfere with the server's normal transaction handling.

Personally, I do not see a real need for parallel processing within a single transaction. It is much more efficient to split the processing into independent transactional blocks. There are several techniques for doing this without leaving the system in an inconsistent state, for example using additional columns in the table or even an extra table.
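The chunked alternative can be sketched in memory as follows, purely as an illustration: ChunkedProcessingSketch is hypothetical, its status map stands in for an extra status column (for instance the processado column of the teste table used above), and processRow stands in for the real per-row work. Each chunk is marked as done only if every row in it succeeds, as if it were one short transaction; a failed chunk stays pending and is simply picked up again on the next run.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Predicate;

class ChunkedProcessingSketch {

    // In-memory stand-in for the table: row id -> status column ("pending" or "ok").
    final Map<Integer, String> status = new LinkedHashMap<>();

    ChunkedProcessingSketch(int rows) {
        for (int id = 1; id <= rows; id++) {
            status.put(id, "pending");
        }
    }

    // Processes only rows still marked "pending", chunkSize rows at a time.
    // A chunk is marked "ok" only if every row in it succeeded (one short
    // transaction per chunk); a failed chunk stays "pending" for the next run.
    // Returns how many rows were marked "ok" in this run.
    int run(int chunkSize, Predicate<Integer> processRow) {
        List<Integer> pending = new ArrayList<>();
        for (Map.Entry<Integer, String> e : status.entrySet()) {
            if (e.getValue().equals("pending")) {
                pending.add(e.getKey());
            }
        }
        int done = 0;
        for (int start = 0; start < pending.size(); start += chunkSize) {
            List<Integer> chunk = pending.subList(start, Math.min(start + chunkSize, pending.size()));
            boolean chunkOk = true;
            for (int id : chunk) {
                if (!processRow.test(id)) {
                    chunkOk = false;
                    break;
                }
            }
            if (chunkOk) {
                // "commit": the status column is updated for the whole chunk at once
                for (int id : chunk) {
                    status.put(id, "ok");
                }
                done += chunk.size();
            }
        }
        return done;
    }
}
```

For example, with 10 rows and chunks of 4, a run in which row 6 fails marks rows 1 to 4 and 9 to 10 as ok, and a second run processes only rows 5 to 8.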

    
17.04.2014 / 20:12