Using only Atomikos, an implementation of JTA and XA, I put together a simple example that performs multi-threaded processing within a single transaction.
The full project is available on my GitHub.
Implementation
First of all, we have the initialization of the DataSource and the TransactionManager:
// Atomikos implementations
private static UserTransactionManager utm;
private static AtomikosDataSourceBean adsb;

// initialize resources
public static void init() {
    utm = new UserTransactionManager();
    try {
        utm.init();
        adsb = new AtomikosDataSourceBean();
        adsb.setMaxPoolSize(20);
        adsb.setUniqueResourceName("postgres");
        adsb.setXaDataSourceClassName("org.postgresql.xa.PGXADataSource");
        Properties p = new Properties();
        p.setProperty("user", "postgres");
        p.setProperty("password", "0");
        p.setProperty("serverName", "localhost");
        p.setProperty("portNumber", "5432");
        p.setProperty("databaseName", "postgres");
        adsb.setXaProperties(p);
    } catch (SystemException e) {
        e.printStackTrace();
        throw new RuntimeException(e);
    }
}
Then, a Callable, executed on a worker thread, that receives the main transaction instance (Transaction):
private static class Processamento implements Callable<Integer> {
    private final int id;
    private final boolean falhar;
    private final Transaction transaction;

    public Processamento(int id, boolean falhar, Transaction transaction) {
        this.falhar = falhar;
        this.transaction = transaction;
        this.id = id;
    }

    @Override
    public Integer call() throws Exception {
        if (falhar) {
            throw new RuntimeException("Failed unexpectedly!");
        }
        //enlist xa connection
        XAConnection xac = AtomikosDataSource.getDS().getXaDataSource().getXAConnection();
        synchronized (transaction) {
            transaction.enlistResource(xac.getXAResource());
        }
        //normal execution, update row with OK;
        //try-with-resources closes the statement and then the connection, even on error
        try (Connection c = xac.getConnection();
             Statement s = c.createStatement()) {
            s.executeUpdate("update teste set processado = 'ok' where id = " + id);
        }
        //delist xa connection
        synchronized (transaction) {
            transaction.delistResource(xac.getXAResource(), XAResource.TMSUCCESS);
        }
        return id;
    }
}
Note that instead of using JTA, I'm directly using the XA API implemented by Atomikos.
The call AtomikosDataSource.getDS().getXaDataSource().getXAConnection() retrieves an XA connection, which is added to the main transaction with transaction.enlistResource(xac.getXAResource()).
I synchronized on the transaction in a few places because I randomly got NullPointerExceptions during testing. This should not be a problem, however, if you handle connections with care, that is, without opening and closing them all the time.
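The locking pattern described above can be illustrated without Atomikos. The following is only a sketch under stated assumptions: SharedTx is a hypothetical stand-in for the shared transaction object (it is not the JTA Transaction interface), and its internal ArrayList plays the role of state that is not safe to modify from several threads at once. It does not explain the NullPointerException itself; it only shows why every thread takes the same lock before touching the shared object.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical stand-in for the shared transaction; NOT the JTA Transaction interface.
class SharedTx {
    // ArrayList is not thread-safe, so concurrent add() calls need external locking.
    private final List<String> enlisted = new ArrayList<>();

    void enlist(String resourceName) {
        enlisted.add(resourceName);
    }

    int enlistedCount() {
        return enlisted.size();
    }
}

class SyncEnlistDemo {
    // Enlists one fake resource per thread and returns how many were registered.
    static int enlistFromThreads(int threads) {
        SharedTx tx = new SharedTx();
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        try {
            List<Callable<Void>> tasks = new ArrayList<>();
            for (int i = 0; i < threads; i++) {
                final String name = "resource-" + i;
                tasks.add(() -> {
                    // Same idea as synchronized (transaction) { ... } in the answer:
                    // all threads lock on the one shared object.
                    synchronized (tx) {
                        tx.enlist(name);
                    }
                    return null;
                });
            }
            for (Future<Void> f : pool.invokeAll(tasks)) {
                f.get(); // surfaces any exception thrown inside a task
            }
            return tx.enlistedCount();
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
    }
}
```

Calling SyncEnlistDemo.enlistFromThreads(5) registers five resources, one per thread, with no lost updates.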
Finally, a method that starts five instances of the thread above:
public static int processar(boolean falhar) {
    int ok = 0;
    Transaction transaction = null;
    try {
        //start transaction
        AtomikosDataSource.getTM().begin();
        transaction = AtomikosDataSource.getTM().getTransaction();
        //create thread pool
        ExecutorService executor = Executors.newFixedThreadPool(5);
        List<Callable<Integer>> processos = new ArrayList<Callable<Integer>>();
        //create 5 tasks, passing the main transaction as argument
        for (int i = 0; i < 5; i++) {
            processos.add(new Processamento(i + 1, i == 4 && falhar, transaction));
        }
        //execute tasks and wait for all of them to finish
        List<Future<Integer>> futures = executor.invokeAll(processos);
        //release the pool threads; invokeAll has already waited for every task
        executor.shutdown();
        //count the result; get() will fail if thread threw an exception
        Throwable ex = null;
        for (Future<Integer> future : futures) {
            try {
                int threadId = future.get();
                System.out.println("Thread " + threadId + " succeeded!");
                ok++;
            } catch (Throwable e) {
                ex = e;
            }
        }
        if (ex != null) {
            throw ex;
        }
        //finish transaction normally
        transaction.commit();
    } catch (Throwable e) {
        e.printStackTrace();
        try {
            //try to rollback
            if (transaction != null) {
                AtomikosDataSource.getTM().rollback();
            }
        } catch (IllegalStateException e1) {
            e1.printStackTrace();
        } catch (SecurityException e1) {
            e1.printStackTrace();
        } catch (SystemException e1) {
            e1.printStackTrace();
        }
    }
    return ok;
}
I tested both a success and a failure scenario to validate the result.
In the success scenario, each of the five threads updates one row of the TESTE table with the value ok, and the transaction is committed.
In the failure scenario, the last thread always throws an exception, forcing a rollback of the work done by the other four threads.
See the code on GitHub for more details.
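The commit-or-rollback decision in the two scenarios can be reproduced with the JDK alone. This is a minimal sketch, not the code from the repository: AllOrNothingDemo and decide are hypothetical names, and the returned strings merely stand in for the real commit and rollback calls. It shows that Future.get() rethrows a task's failure wrapped in an ExecutionException, which is what lets the coordinating thread detect that one of the five tasks failed.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class AllOrNothingDemo {
    // Returns "commit" if every task succeeded, otherwise "rollback".
    static String decide(boolean failLast) {
        ExecutorService pool = Executors.newFixedThreadPool(5);
        try {
            List<Callable<Integer>> tasks = new ArrayList<>();
            for (int i = 0; i < 5; i++) {
                final int id = i + 1;
                final boolean fail = failLast && i == 4; // only the fifth task may fail
                tasks.add(() -> {
                    if (fail) {
                        throw new RuntimeException("task " + id + " failed");
                    }
                    return id;
                });
            }
            boolean anyFailed = false;
            // invokeAll blocks until every task has completed, normally or not
            for (Future<Integer> f : pool.invokeAll(tasks)) {
                try {
                    f.get();
                } catch (ExecutionException e) {
                    // e.getCause() is the exception thrown inside the task
                    anyFailed = true;
                }
            }
            return anyFailed ? "rollback" : "commit";
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return "rollback";
        } finally {
            pool.shutdown();
        }
    }
}
```

Here decide(false) corresponds to the success scenario and decide(true) to the failure scenario, where a single failing task is enough to choose the rollback path.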
Configuration Notes
I used PostgreSQL in the example. I had to set max_prepared_transactions to a value greater than zero in the postgresql.conf configuration file.
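For reference, the setting might look like the fragment below. The value 20 is only an assumption, chosen to mirror the pool size used in the example; the PostgreSQL default is 0, which disables prepared transactions, and the PostgreSQL documentation suggests a value at least as large as max_connections when the feature is in use. Changing it requires a server restart.

```
# postgresql.conf (illustrative value, adjust to your workload)
max_prepared_transactions = 20
```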
It's important to make sure your database driver supports distributed transactions. I read somewhere that MySQL might have some issues with that.
Observations
To make the example work with Spring, simply declare the manually created classes as beans, either in XML or through annotations. It's up to you.
Be careful if you decide to implement something like this within an Application Server, so as not to interfere with normal system transactions.
Personally, I do not see a real need for parallel processing within the same transaction. It is much more efficient to split the processing into transactional blocks. There are several techniques for doing this without making the system state inconsistent, for example, by using additional columns in the table or even an extra table.
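One way to read the "additional column" idea is a per-row status that records which items have already been processed, so each item can be handled in its own small transaction and a failed run can simply be resumed. The sketch below is an interpretation, not something taken from the project: StatusColumnDemo, its Status values, and the in-memory map standing in for the table are all hypothetical, and a real version would issue one database transaction per row or per batch.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.IntPredicate;

class StatusColumnDemo {
    // Stand-in for an extra status column on the table (hypothetical values).
    enum Status { PENDING, OK, FAILED }

    // Processes every row that is not yet OK. Each row is its own unit of work:
    // a failure marks only that row and leaves the others untouched.
    // Returns how many rows were completed during this run.
    static int processPending(Map<Integer, Status> table, IntPredicate work) {
        int done = 0;
        for (Map.Entry<Integer, Status> row : table.entrySet()) {
            if (row.getValue() == Status.OK) {
                continue; // already processed in an earlier run
            }
            boolean success = work.test(row.getKey());
            row.setValue(success ? Status.OK : Status.FAILED);
            if (success) {
                done++;
            }
        }
        return done;
    }

    public static void main(String[] args) {
        Map<Integer, Status> table = new LinkedHashMap<>();
        for (int id = 1; id <= 5; id++) {
            table.put(id, Status.PENDING);
        }
        // First run: row 5 fails, rows 1 to 4 are kept as OK.
        System.out.println(processPending(table, id -> id != 5));
        // Second run: only row 5 is retried.
        System.out.println(processPending(table, id -> true));
    }
}
```

The trade-off compared with one large shared transaction is that partial progress becomes visible, so readers of the table need to take the status column into account.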