package org.jfox.tm;
import java.util.TimerTask;
import java.util.List;
import java.util.ArrayList;
import java.util.Iterator;
import javax.transaction.Transaction;
import javax.transaction.RollbackException;
import javax.transaction.HeuristicMixedException;
import javax.transaction.HeuristicRollbackException;
import javax.transaction.SystemException;
import javax.transaction.Synchronization;
import javax.transaction.Status;
import javax.transaction.xa.XAResource;
import javax.transaction.xa.Xid;
import javax.transaction.xa.XAException;
import org.huihoo.jfox.logging.Logger;
class TransactionImpl extends TimerTask implements Transaction {
private static Logger logger = Logger.getLogger(TransactionImpl.class.getName());
private int status = Status.STATUS_NO_TRANSACTION;
private Xid xid = null;
private static int branchCount = 0;
private List syncs = new ArrayList();
private List enlistedXARes = new ArrayList();
private List suspendedXARes = new ArrayList();
public TransactionImpl() {
status = Status.STATUS_ACTIVE;
xid = new XidImpl();
logger.debug(
new StringBuffer()
.append("begin a new Transaction, tx= ")
.append(this.toString())
.append(" status=")
.append(StatusHelper.toString(status))
.toString());
}
public synchronized boolean delistResource(XAResource xaResource, int flag) throws IllegalStateException, SystemException {
logger.debug(
new StringBuffer()
.append("Transaction.delistResource(),flag = " + flag + " tx= ")
.append(this.toString())
.append(" status=")
.append(StatusHelper.toString(status))
.toString());
if (xaResource == null) throw new IllegalArgumentException("null xaRes");
if (flag != XAResource.TMSUCCESS &&
flag != XAResource.TMSUSPEND &&
flag != XAResource.TMFAIL)
throw new IllegalArgumentException("wrong flag, flag must be one of XAResource.TMSUCCESS, XAResource.TMSUSPEND, XAResource.TMFAIL");
KeyXAResource keyXAres = new KeyXAResource(xaResource);
int index = enlistedXARes.indexOf(keyXAres);
if(index < 0) {
throw new IllegalArgumentException("xaResource not enlisted");
}
switch (status) {
case Status.STATUS_ACTIVE:
case Status.STATUS_MARKED_ROLLBACK:
break;
case Status.STATUS_PREPARING:
throw new IllegalStateException("Already started preparing.");
case Status.STATUS_ROLLING_BACK:
throw new IllegalStateException("Already started rolling back.");
case Status.STATUS_PREPARED:
throw new IllegalStateException("Already prepared.");
case Status.STATUS_COMMITTING:
throw new IllegalStateException("Already started committing.");
case Status.STATUS_COMMITTED:
throw new IllegalStateException("Already committed.");
case Status.STATUS_ROLLEDBACK:
throw new IllegalStateException("Already rolled back.");
case Status.STATUS_NO_TRANSACTION:
throw new IllegalStateException("No Transaction.");
case Status.STATUS_UNKNOWN:
throw new IllegalStateException("Unknown state");
default:
throw new IllegalStateException("Illegal status: " + StatusHelper.toString(status));
}
keyXAres = (KeyXAResource)enlistedXARes.get(index);
try {
logger.debug("delist resource " + keyXAres.getXid() + " with flag " + flag);
xaResource.end(keyXAres.getXid(),flag);
enlistedXARes.remove(index);
return true;
}
catch(XAException e){
logger.warn(e);
status = Status.STATUS_MARKED_ROLLBACK;
return false;
}
}
public synchronized boolean enlistResource(XAResource xaResource) throws RollbackException, IllegalStateException, SystemException {
logger.debug(
new StringBuffer()
.append("Transaction.enlistResource(), tx= ")
.append(this.toString())
.append(" status=")
.append(StatusHelper.toString(status))
.toString());
if (xaResource == null) throw new IllegalArgumentException("null xaRes");
switch (status) {
case Status.STATUS_ACTIVE :
case Status.STATUS_PREPARING :
break;
case Status.STATUS_PREPARED :
throw new IllegalStateException("Transaction already prepared.");
case Status.STATUS_COMMITTING :
throw new IllegalStateException("Transaction already started committing.");
case Status.STATUS_COMMITTED :
throw new IllegalStateException("Transaction already committed.");
case Status.STATUS_MARKED_ROLLBACK :
throw new RollbackException("Transaction already marked for rollback");
case Status.STATUS_ROLLING_BACK :
throw new RollbackException("Transaction already started rolling back.");
case Status.STATUS_ROLLEDBACK :
throw new RollbackException("Transaction already rolled back.");
case Status.STATUS_NO_TRANSACTION :
throw new IllegalStateException("No current Transaction.");
case Status.STATUS_UNKNOWN :
throw new IllegalStateException("Unknown Transaction status");
default :
throw new IllegalStateException(
"Illegal Transaction status : " + StatusHelper.toString(status));
}
int flag = XAResource.TMNOFLAGS;
KeyXAResource keyXAres = new KeyXAResource(xaResource);
int index = enlistedXARes.indexOf(keyXAres);
if(index >=0) {
keyXAres = (KeyXAResource)enlistedXARes.get(index);
if(keyXAres.getXaResource() == xaResource) {
return false;
}
flag = XAResource.TMJOIN;
}
else if((index=suspendedXARes.indexOf(keyXAres)) >= 0) { flag = XAResource.TMRESUME;
keyXAres = (KeyXAResource)suspendedXARes.get(index);
suspendedXARes.remove(index);
enlistedXARes.add(keyXAres);
}
else {
enlistedXARes.add(keyXAres);
}
try {
logger.debug("resource start new branch Transaction " + keyXAres.getXid() + " with flag " + flag);
xaResource.start(keyXAres.getXid(),flag);
return true;
}
catch(XAException e){
logger.warn(e);
return false;
}
}
public int getStatus() throws SystemException {
return status;
}
public synchronized void registerSynchronization(Synchronization synchronization) throws RollbackException, IllegalStateException, SystemException {
logger.debug(
new StringBuffer()
.append("Transaction.registerSynchronization(), tx= ")
.append(this.toString())
.append(" status=")
.append(StatusHelper.toString(status))
.toString());
if (synchronization == null) throw new IllegalArgumentException("Null synchronization");
switch (status) {
case Status.STATUS_ACTIVE:
case Status.STATUS_PREPARING:
case Status.STATUS_MARKED_ROLLBACK:
break;
case Status.STATUS_PREPARED:
throw new IllegalStateException("Already prepared.");
case Status.STATUS_COMMITTING:
throw new IllegalStateException("Already started committing.");
case Status.STATUS_COMMITTED:
throw new IllegalStateException("Already committed.");
case Status.STATUS_ROLLING_BACK:
throw new RollbackException("Already started rolling back.");
case Status.STATUS_ROLLEDBACK:
throw new RollbackException("Already rolled back.");
case Status.STATUS_NO_TRANSACTION:
throw new IllegalStateException("No Transaction.");
case Status.STATUS_UNKNOWN:
throw new IllegalStateException("Unknown state");
default:
throw new IllegalStateException("Illegal status: " + StatusHelper.toString(status));
}
syncs.add(synchronization);
}
public synchronized void commit() throws RollbackException, HeuristicMixedException, HeuristicRollbackException, SecurityException, SystemException {
logger.debug(
new StringBuffer()
.append("Transaction.commit(), tx= ")
.append(this.toString())
.append(" status=")
.append(StatusHelper.toString(status))
.toString());
switch (status) {
case Status.STATUS_PREPARING:
throw new IllegalStateException("Already started preparing.");
case Status.STATUS_PREPARED:
throw new IllegalStateException("Already prepared.");
case Status.STATUS_ROLLING_BACK:
throw new IllegalStateException("Already started rolling back.");
case Status.STATUS_ROLLEDBACK:
throw new IllegalStateException("Already rolled back.");
case Status.STATUS_COMMITTING:
throw new IllegalStateException("Already started committing.");
case Status.STATUS_COMMITTED:
throw new IllegalStateException("Already committed.");
case Status.STATUS_NO_TRANSACTION:
throw new IllegalStateException("No Transaction.");
case Status.STATUS_UNKNOWN:
throw new IllegalStateException("Unknown state");
case Status.STATUS_MARKED_ROLLBACK:
this.rollback();
throw new RollbackException("Already marked for rollback");
case Status.STATUS_ACTIVE:
break;
default:
throw new IllegalStateException("Illegal status: " + StatusHelper.toString(status));
}
doBeforeCompletion();
try {
if(status == Status.STATUS_ACTIVE) {
if (enlistedXARes.size() == 0) {
status = Status.STATUS_COMMITTED;
}
else if (enlistedXARes.size() == 1) {
doOnePhaseCommit();
}
else { doTwoPhaseCommint();
}
}
else if(status == Status.STATUS_MARKED_ROLLBACK){
this.rollback();
}
}
finally {
doAfterCompletion();
enlistedXARes.clear();
}
}
private void doOnePhaseCommit() throws RollbackException {
status = Status.STATUS_COMMITTING;
KeyXAResource keyXARes = (KeyXAResource)enlistedXARes.get(0);
try {
endResource(keyXARes,XAResource.TMSUCCESS);
}
catch(XAException e){
logger.warn("end resource error, to be roll back", e);
status = Status.STATUS_MARKED_ROLLBACK;
}
if(status == Status.STATUS_MARKED_ROLLBACK){
try {
status = Status.STATUS_ROLLING_BACK;
keyXARes.getXaResource().rollback(keyXARes.getXid());
status = Status.STATUS_ROLLEDBACK;
}
catch(XAException e){
logger.warn(e);
}
throw new RollbackException("commit failed, already rolled back");
}
else {
try {
keyXARes.getXaResource().commit(keyXARes.getXid(),true);
}
catch(XAException e){
logger.warn(e);
}
}
status = Status.STATUS_COMMITTED;
}
private void doTwoPhaseCommint() throws
HeuristicMixedException,
HeuristicRollbackException,
RollbackException{
logger.debug("start two phase commit");
status = Status.STATUS_PREPARING;
int heuristicCode = XAException.XA_RETRY;
int prepareStatus = XAResource.XA_RDONLY;
for(Iterator it = enlistedXARes.iterator();it.hasNext();){
KeyXAResource keyXARes = (KeyXAResource)it.next();
try {
logger.debug("prepare " + keyXARes.getXid());
endResource(keyXARes,XAResource.TMSUCCESS);
int xaStatus = keyXARes.getXaResource().prepare(keyXARes.getXid());
if(xaStatus == XAResource.XA_OK){
prepareStatus = XAResource.XA_OK;
}
else if(xaStatus == XAResource.XA_RDONLY){
}
else { status = Status.STATUS_MARKED_ROLLBACK;
break;
}
}
catch(XAException e){
logger.warn(e);
prepareStatus = XAResource.XA_OK;
switch (e.errorCode) {
case XAException.XA_HEURCOM:
heuristicCode = decideHeuristic(heuristicCode,e.errorCode);
break;
case XAException.XA_HEURRB:
case XAException.XA_HEURMIX:
case XAException.XA_HEURHAZ:
heuristicCode = decideHeuristic(heuristicCode,e.errorCode);
if (status == Status.STATUS_PREPARING)
status = Status.STATUS_MARKED_ROLLBACK;
break;
default:
if (status == Status.STATUS_PREPARING)
status = Status.STATUS_MARKED_ROLLBACK;
break;
}
try {
keyXARes.getXaResource().forget(keyXARes.getXid());
}
catch (XAException ex) {
logger.warn(e);
}
}
catch(Throwable e){
logger.warn(e);
status = Status.STATUS_MARKED_ROLLBACK;
}
}
status = Status.STATUS_PREPARED;
if(prepareStatus == XAResource.XA_RDONLY) { status = Status.STATUS_COMMITTED;
}
else {
if(heuristicCode == XAException.XA_RETRY || heuristicCode == XAException.XA_HEURCOM){
status = Status.STATUS_COMMITTING;
List temp = new ArrayList(enlistedXARes);
for(Iterator it = temp.iterator();it.hasNext();) {
KeyXAResource keyXARes = (KeyXAResource)it.next();
try {
keyXARes.getXaResource().commit(keyXARes.getXid(),false);
}
catch (XAException e) {
logger.warn(e);
switch (e.errorCode) {
case XAException.XA_HEURRB:
case XAException.XA_HEURCOM:
case XAException.XA_HEURMIX:
case XAException.XA_HEURHAZ:
heuristicCode = decideHeuristic(heuristicCode,e.errorCode);
break;
default: break;
}
try {
keyXARes.getXaResource().forget(keyXARes.getXid());
}
catch (XAException ex) {
ex.printStackTrace();
}
}
catch (Throwable t) {
t.printStackTrace();
}
}
status = Status.STATUS_COMMITTED;
checkHeuristics(heuristicCode);
}
else {
status = Status.STATUS_ROLLING_BACK;
List temp = new ArrayList(enlistedXARes);
for(Iterator it = temp.iterator();it.hasNext();) {
KeyXAResource keyXARes = (KeyXAResource)it.next();
try {
keyXARes.getXaResource().rollback(keyXARes.getXid());
}
catch (XAException e) {
logger.warn(e);
switch (e.errorCode) {
case XAException.XA_HEURRB:
case XAException.XA_HEURCOM:
case XAException.XA_HEURMIX:
case XAException.XA_HEURHAZ:
decideHeuristic(heuristicCode,e.errorCode);
break;
default: break;
}
try {
keyXARes.getXaResource().forget(keyXARes.getXid());
}
catch (XAException ex) {
ex.printStackTrace();
}
}
catch(Throwable t){
t.printStackTrace();
}
}
status = Status.STATUS_ROLLEDBACK;
checkHeuristics(heuristicCode);
throw new RollbackException("Unable to commit because of heuristic code, tx=" + toString());
}
}
}
private int decideHeuristic(int heuristicCode, int xaExceptionCode){
switch (xaExceptionCode) {
case XAException.XA_HEURMIX:
heuristicCode = XAException.XA_HEURMIX;
break;
case XAException.XA_HEURRB:
if (heuristicCode == XAException.XA_RETRY)
heuristicCode = XAException.XA_HEURRB;
else if (heuristicCode == XAException.XA_HEURCOM ||
heuristicCode == XAException.XA_HEURHAZ)
heuristicCode = XAException.XA_HEURMIX;
break;
case XAException.XA_HEURCOM:
if (heuristicCode == XAException.XA_RETRY)
heuristicCode = XAException.XA_HEURCOM;
else if (heuristicCode == XAException.XA_HEURRB ||
heuristicCode == XAException.XA_HEURHAZ)
heuristicCode = XAException.XA_HEURMIX;
break;
case XAException.XA_HEURHAZ:
if (heuristicCode == XAException.XA_RETRY)
heuristicCode = XAException.XA_HEURHAZ;
else if (heuristicCode == XAException.XA_HEURCOM ||
heuristicCode == XAException.XA_HEURRB)
heuristicCode = XAException.XA_HEURMIX;
break;
default:
throw new IllegalArgumentException();
}
return heuristicCode;
}
private void checkHeuristics(int heuristicCode) throws HeuristicMixedException,
HeuristicRollbackException
{
switch (heuristicCode) {
case XAException.XA_HEURHAZ:
case XAException.XA_HEURMIX:
throw new HeuristicMixedException();
case XAException.XA_HEURRB:
throw new HeuristicRollbackException();
case XAException.XA_HEURCOM:
return;
}
}
public synchronized void rollback() throws IllegalStateException, SystemException {
logger.debug(
new StringBuffer()
.append("Transaction.rollback(), tx= ")
.append(this.toString())
.append(" status=")
.append(StatusHelper.toString(status))
.toString());
switch (status) {
case Status.STATUS_ACTIVE :
case Status.STATUS_MARKED_ROLLBACK :
break;
case Status.STATUS_ROLLEDBACK :
logger.warn("Transaction.rollback(): already rolled back");
return;
default :
logger.error("Transaction.rollback(): bad status");
throw new IllegalStateException("Cannot rollback(), " +
"tx=" + toString() +
" status=" +
StatusHelper.toString(status));
}
doBeforeCompletion();
status = Status.STATUS_ROLLING_BACK;
List temp = new ArrayList(enlistedXARes);
int size = temp.size();
for (int i = 0; i < size; i++) {
KeyXAResource keyXARes = ((KeyXAResource)temp.get(i));
try {
endResource(keyXARes,XAResource.TMSUCCESS);
keyXARes.getXaResource().rollback(keyXARes.getXid());
}
catch (XAException e) {
logger.error("rollback XAResource " + keyXARes.getXaResource() + " failed.", e);
}
}
status = Status.STATUS_ROLLEDBACK;
doAfterCompletion();
}
public synchronized void setRollbackOnly() throws IllegalStateException, SystemException {
logger.debug(
new StringBuffer()
.append("Transaction.setRollbackOnly(), tx= ")
.append(this.toString())
.append(" status=")
.append(StatusHelper.toString(status))
.toString());
switch (status) {
case Status.STATUS_ACTIVE:
case Status.STATUS_PREPARING:
case Status.STATUS_PREPARED:
status = Status.STATUS_MARKED_ROLLBACK;
return;
case Status.STATUS_MARKED_ROLLBACK:
case Status.STATUS_ROLLING_BACK:
return;
case Status.STATUS_COMMITTING:
throw new IllegalStateException("started committing.");
case Status.STATUS_COMMITTED:
throw new IllegalStateException("already committed.");
case Status.STATUS_ROLLEDBACK:
throw new IllegalStateException("already rolled back.");
case Status.STATUS_NO_TRANSACTION:
throw new IllegalStateException("no Transaction.");
case Status.STATUS_UNKNOWN:
throw new IllegalStateException("unknown state");
default:
throw new IllegalStateException("Illegal status: " + StatusHelper.toString(status));
}
}
public String toString() {
return "Transaction [" + xid.toString() + "]";
}
public synchronized void run() {
timeout();
}
void timeout() {
try {
logger.debug(
new StringBuffer()
.append("Transaction timeout, tx= ")
.append(this.toString())
.append(" status=")
.append(StatusHelper.toString(status))
.toString());
this.setRollbackOnly();
}
catch(Exception e){
logger.warn(e.getMessage(),e);
}
}
private void doBeforeCompletion() {
try {
for (int i = 0; i < syncs.size(); i++) {
((Synchronization)syncs.get(i)).beforeCompletion();
}
}
catch (Throwable t) {
logger.warn("Synchronization.beforeCompletion()",t);
status = Status.STATUS_MARKED_ROLLBACK;
}
}
private void doAfterCompletion() {
try {
for (int i = 0; i < syncs.size(); i++) {
((Synchronization)syncs.get(i)).afterCompletion(status);
}
}
catch (Throwable t) {
logger.warn("Synchronization.afterCompletion()",t);
}
}
synchronized void suspend() throws SystemException {
logger.debug(
new StringBuffer()
.append("Transaction.suspend(), tx= ")
.append(this.toString())
.append(" status=")
.append(StatusHelper.toString(status))
.toString());
suspendedXARes.addAll(enlistedXARes);
for (int i = 0; i < suspendedXARes.size(); i++) {
delistResource(((KeyXAResource) suspendedXARes.get(i)).getXaResource(), XAResource.TMSUSPEND);
}
enlistedXARes.clear();
}
synchronized void resume() throws SystemException, RollbackException {
logger.debug(
new StringBuffer()
.append("Transaction.resume(), tx= ")
.append(this.toString())
.append(" status=")
.append(StatusHelper.toString(status))
.toString());
List temp = new ArrayList(suspendedXARes);
for (int i = 0; i < temp.size(); i++) {
enlistResource(((KeyXAResource)temp.get(i)).getXaResource());
}
suspendedXARes.clear();
}
private void endResource(KeyXAResource keyXAres, int flag) throws XAException {
logger.debug(
new StringBuffer()
.append("XAResource.end(), tx= ")
.append(this.toString())
.append(" status=")
.append(StatusHelper.toString(status))
.append(" flag = " + flag)
.toString());
keyXAres.getXaResource().end(keyXAres.getXid(),flag);
if (flag == XAResource.TMFAIL) status = Status.STATUS_MARKED_ROLLBACK;
enlistedXARes.remove(keyXAres);
}
public static void main(String[] args) {
}
private class KeyXAResource {
private XAResource xaRes = null;
private Xid branchXid = null;
public KeyXAResource(XAResource xaRes) {
this.xaRes = xaRes;
}
public synchronized Xid getXid(){
if(branchXid == null) {
branchXid = XidImpl.createBranchXid(xid,branchCount++);
}
return branchXid;
}
public XAResource getXaResource() {
return xaRes;
}
public int hashCode() {
return 0;
}
public boolean equals(Object obj) {
if(!(obj instanceof KeyXAResource)) {
return false;
}
else {
try {
return ((KeyXAResource)obj).getXaResource().isSameRM(xaRes);
}
catch(XAException e){
logger.warn(e);
return false;
}
}
}
}
}