001/* 002 * HA-JDBC: High-Availability JDBC 003 * Copyright (c) 2004-2007 Paul Ferraro 004 * 005 * This library is free software; you can redistribute it and/or modify it 006 * under the terms of the GNU Lesser General Public License as published by the 007 * Free Software Foundation; either version 2.1 of the License, or (at your 008 * option) any later version. 009 * 010 * This library is distributed in the hope that it will be useful, but WITHOUT 011 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or 012 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License 013 * for more details. 014 * 015 * You should have received a copy of the GNU Lesser General Public License 016 * along with this library; if not, write to the Free Software Foundation, 017 * Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA 018 * 019 * Contact: ferraro@users.sourceforge.net 020 */ 021package net.sf.hajdbc.sql; 022 023import java.lang.reflect.InvocationHandler; 024import java.lang.reflect.Method; 025import java.sql.SQLException; 026import java.util.HashMap; 027import java.util.Iterator; 028import java.util.List; 029import java.util.Map; 030import java.util.Set; 031import java.util.SortedMap; 032import java.util.WeakHashMap; 033 034import net.sf.hajdbc.Database; 035import net.sf.hajdbc.DatabaseCluster; 036import net.sf.hajdbc.Messages; 037import net.sf.hajdbc.util.SQLExceptionFactory; 038import net.sf.hajdbc.util.reflect.Methods; 039 040import org.slf4j.Logger; 041import org.slf4j.LoggerFactory; 042 043/** 044 * @author Paul Ferraro 045 * @param <D> 046 * @param <T> 047 */ 048@SuppressWarnings("nls") 049public abstract class AbstractInvocationHandler<D, T> implements InvocationHandler, SQLProxy<D, T> 050{ 051 private static final Method equalsMethod = Methods.getMethod(Object.class, "equals", Object.class); 052 private static final Method hashCodeMethod = Methods.getMethod(Object.class, "hashCode"); 053 private static final Method toStringMethod = Methods.getMethod(Object.class, "toString"); 054 /* JDBC 4.0 methods */ 055 private static final Method isWrapperForMethod = Methods.findMethod("java.sql.Wrapper", "isWrapperFor", Class.class); 056 private static final Method unwrapMethod = Methods.findMethod("java.sql.Wrapper", "unwrap", Class.class); 057 058 protected Logger logger = LoggerFactory.getLogger(this.getClass()); 059 060 protected DatabaseCluster<D> cluster; 061 private Class<T> proxyClass; 062 private Map<Database<D>, T> objectMap; 063 private Map<SQLProxy<D, ?>, Void> childMap = new WeakHashMap<SQLProxy<D, ?>, Void>(); 064 private Map<Method, Invoker<D, T, ?>> invokerMap = new HashMap<Method, Invoker<D, T, ?>>(); 065 066 /** 067 * @param cluster the database cluster 068 * @param proxyClass the interface being proxied 069 * @param objectMap a map of database to sql object. 070 */ 071 protected AbstractInvocationHandler(DatabaseCluster<D> cluster, Class<T> proxyClass, Map<Database<D>, T> objectMap) 072 { 073 this.cluster = cluster; 074 this.proxyClass = proxyClass; 075 this.objectMap = objectMap; 076 } 077 078 /** 079 * @see java.lang.reflect.InvocationHandler#invoke(java.lang.Object, java.lang.reflect.Method, java.lang.Object[]) 080 */ 081 @SuppressWarnings("unchecked") 082 @Override 083 public final Object invoke(Object object, Method method, Object[] parameters) throws Exception 084 { 085 if (method.equals(toStringMethod)) return ""; 086 087 if (!this.cluster.isActive()) 088 { 089 throw new SQLException(Messages.getMessage(Messages.CLUSTER_NOT_ACTIVE, this.cluster)); 090 } 091 092 T proxy = this.proxyClass.cast(object); 093 094 InvocationStrategy strategy = this.getInvocationStrategy(proxy, method, parameters); 095 Invoker invoker = this.getInvoker(proxy, method, parameters); 096 097 Object result = strategy.invoke(this, invoker); 098 099 this.record(invoker, method, parameters); 100 101 this.postInvoke(proxy, method, parameters); 102 103 return result; 104 } 105 106 /** 107 * Returns the appropriate {@link InvocationStrategy} for the specified method. 108 * This implementation detects {@link java.sql.Wrapper} methods; and {@link Object#equals}, {@link Object#hashCode()}, and {@link Object#toString()}. 109 * Default invocation strategy is {@link DatabaseWriteInvocationStrategy}. 110 * @param object the proxied object 111 * @param method the method to invoke 112 * @param parameters the method invocation parameters 113 * @return an invocation strategy 114 * @throws Exception 115 */ 116 protected InvocationStrategy<D, T, ?> getInvocationStrategy(final T object, Method method, final Object[] parameters) throws Exception 117 { 118 // Most Java 1.6 sql classes implement java.sql.Wrapper 119 if (((isWrapperForMethod != null) && method.equals(isWrapperForMethod)) || ((unwrapMethod != null) && method.equals(unwrapMethod))) 120 { 121 return new DriverReadInvocationStrategy<D, T, Object>(); 122 } 123 124 if (method.equals(equalsMethod)) 125 { 126 return new InvocationStrategy<D, T, Boolean>() 127 { 128 public Boolean invoke(SQLProxy<D, T> proxy, Invoker<D, T, Boolean> invoker) 129 { 130 return object == parameters[0]; 131 } 132 }; 133 } 134 135 if (method.equals(hashCodeMethod) || method.equals(toStringMethod)) 136 { 137 return new DriverReadInvocationStrategy<D, T, Object>(); 138 } 139 140 return new DatabaseWriteInvocationStrategy<D, T, Object>(this.cluster.getNonTransactionalExecutor()); 141 } 142 143 /** 144 * Return the appropriate invoker for the specified method. 145 * @param object 146 * @param method 147 * @param parameters 148 * @return an invoker 149 * @throws Exception 150 */ 151 protected Invoker<D, T, ?> getInvoker(T object, Method method, Object[] parameters) throws Exception 152 { 153 if (this.isSQLMethod(method)) 154 { 155 long now = System.currentTimeMillis(); 156 157 if (this.cluster.isCurrentTimestampEvaluationEnabled()) 158 { 159 parameters[0] = this.cluster.getDialect().evaluateCurrentTimestamp((String) parameters[0], new java.sql.Timestamp(now)); 160 } 161 162 if (this.cluster.isCurrentDateEvaluationEnabled()) 163 { 164 parameters[0] = this.cluster.getDialect().evaluateCurrentDate((String) parameters[0], new java.sql.Date(now)); 165 } 166 167 if (this.cluster.isCurrentTimeEvaluationEnabled()) 168 { 169 parameters[0] = this.cluster.getDialect().evaluateCurrentTime((String) parameters[0], new java.sql.Time(now)); 170 } 171 172 if (this.cluster.isRandEvaluationEnabled()) 173 { 174 parameters[0] = this.cluster.getDialect().evaluateRand((String) parameters[0]); 175 } 176 } 177 178 return new SimpleInvoker(method, parameters); 179 } 180 181 /** 182 * Indicates whether or not the specified method accepts a SQL string as its first parameter. 183 * @param method a method 184 * @return true, if the specified method accepts a SQL string as its first parameter, false otherwise. 185 */ 186 protected boolean isSQLMethod(Method method) 187 { 188 return false; 189 } 190 191 /** 192 * Called after method is invoked. 193 * @param proxy the proxied object 194 * @param method the method that was just invoked 195 * @param parameters the parameters of the method that was just invoked 196 */ 197 protected void postInvoke(T proxy, Method method, Object[] parameters) 198 { 199 // Do nothing 200 } 201 202 /** 203 * @see net.sf.hajdbc.sql.SQLProxy#entry() 204 */ 205 @Override 206 public Map.Entry<Database<D>, T> entry() 207 { 208 synchronized (this.objectMap) 209 { 210 return this.objectMap.entrySet().iterator().next(); 211 } 212 } 213 214 /** 215 * @see net.sf.hajdbc.sql.SQLProxy#entries() 216 */ 217 @Override 218 public Set<Map.Entry<Database<D>, T>> entries() 219 { 220 synchronized (this.objectMap) 221 { 222 return this.objectMap.entrySet(); 223 } 224 } 225 226 /** 227 * @see net.sf.hajdbc.sql.SQLProxy#addChild(net.sf.hajdbc.sql.SQLProxy) 228 */ 229 @Override 230 public final void addChild(SQLProxy<D, ?> child) 231 { 232 synchronized (this.childMap) 233 { 234 this.childMap.put(child, null); 235 } 236 } 237 238 /** 239 * @see net.sf.hajdbc.sql.SQLProxy#removeChildren() 240 */ 241 @Override 242 public final void removeChildren() 243 { 244 synchronized (this.childMap) 245 { 246 this.childMap.clear(); 247 } 248 } 249 250 /** 251 * @see net.sf.hajdbc.sql.SQLProxy#removeChild(net.sf.hajdbc.sql.SQLProxy) 252 */ 253 @Override 254 public final void removeChild(SQLProxy<D, ?> child) 255 { 256 child.removeChildren(); 257 258 synchronized (this.childMap) 259 { 260 this.childMap.remove(child); 261 } 262 } 263 264 /** 265 * Returns the underlying SQL object for the specified database. 266 * If the sql object does not exist (this might be the case if the database was newly activated), it will be created from the stored operation. 267 * Any recorded operations are also executed. If the object could not be created, or if any of the executed operations failed, then the specified database is deactivated. 268 * @param database a database descriptor. 269 * @return an underlying SQL object 270 */ 271 @Override 272 public T getObject(Database<D> database) 273 { 274 synchronized (this.objectMap) 275 { 276 T object = this.objectMap.get(database); 277 278 if (object == null) 279 { 280 try 281 { 282 object = this.createObject(database); 283 284 this.replay(database, object); 285 286 this.objectMap.put(database, object); 287 } 288 catch (Exception e) 289 { 290 if (!this.objectMap.isEmpty() && this.cluster.deactivate(database, this.cluster.getStateManager())) 291 { 292 this.logger.warn(Messages.getMessage(Messages.SQL_OBJECT_INIT_FAILED, this.getClass().getName(), database), e); 293 } 294 } 295 } 296 297 return object; 298 } 299 } 300 301 protected abstract T createObject(Database<D> database) throws Exception; 302 303 protected void record(Invoker<D, T, ?> invoker, Method method, Object[] parameters) 304 { 305 // Record only the last invocation of a given recordable method 306 if (this.isRecordable(method)) 307 { 308 synchronized (this.invokerMap) 309 { 310 this.invokerMap.put(method, invoker); 311 } 312 } 313 } 314 315 protected boolean isRecordable(Method method) 316 { 317 return false; 318 } 319 320 protected void replay(Database<D> database, T object) throws Exception 321 { 322 synchronized (this.invokerMap) 323 { 324 for (Invoker<D, T, ?> invoker: this.invokerMap.values()) 325 { 326 invoker.invoke(database, object); 327 } 328 } 329 } 330 331 /** 332 * @see net.sf.hajdbc.sql.SQLProxy#retain(java.util.Set) 333 */ 334 @Override 335 public final void retain(Set<Database<D>> databaseSet) 336 { 337 synchronized (this.childMap) 338 { 339 for (SQLProxy<D, ?> child: this.childMap.keySet()) 340 { 341 child.retain(databaseSet); 342 } 343 } 344 345 synchronized (this.objectMap) 346 { 347 Iterator<Map.Entry<Database<D>, T>> mapEntries = this.objectMap.entrySet().iterator(); 348 349 while (mapEntries.hasNext()) 350 { 351 Map.Entry<Database<D>, T> mapEntry = mapEntries.next(); 352 353 Database<D> database = mapEntry.getKey(); 354 355 if (!databaseSet.contains(database)) 356 { 357 T object = mapEntry.getValue(); 358 359 if (object != null) 360 { 361 this.close(database, object); 362 } 363 364 mapEntries.remove(); 365 } 366 } 367 } 368 } 369 370 protected abstract void close(Database<D> database, T object); 371 372 /** 373 * @see net.sf.hajdbc.sql.SQLProxy#getDatabaseCluster() 374 */ 375 @Override 376 public final DatabaseCluster<D> getDatabaseCluster() 377 { 378 return this.cluster; 379 } 380 381 /** 382 * @see net.sf.hajdbc.sql.SQLProxy#handleFailure(net.sf.hajdbc.Database, java.lang.Exception) 383 */ 384 @Override 385 public void handleFailure(Database<D> database, Exception exception) throws Exception 386 { 387 Set<Database<D>> databaseSet = this.cluster.getBalancer().all(); 388 389 // If cluster has only one database left, don't deactivate 390 if (databaseSet.size() <= 1) 391 { 392 throw exception; 393 } 394 395 Map<Boolean, List<Database<D>>> aliveMap = this.cluster.getAliveMap(databaseSet); 396 397 this.detectClusterPanic(aliveMap); 398 399 List<Database<D>> aliveList = aliveMap.get(true); 400 401 // If all are dead, assume the worst and throw caught exception 402 // If failed database is alive, then throw caught exception 403 if (aliveList.isEmpty() || aliveList.contains(database)) 404 { 405 throw exception; 406 } 407 408 // Otherwise deactivate failed database 409 if (this.cluster.deactivate(database, this.cluster.getStateManager())) 410 { 411 this.logger.error(Messages.getMessage(Messages.DATABASE_DEACTIVATED, database, this), exception); 412 } 413 } 414 415 /** 416 * @see net.sf.hajdbc.sql.SQLProxy#handleFailures(java.util.SortedMap) 417 */ 418 @Override 419 public void handleFailures(SortedMap<Database<D>, Exception> exceptionMap) throws Exception 420 { 421 if (exceptionMap.size() == 1) 422 { 423 throw exceptionMap.get(exceptionMap.firstKey()); 424 } 425 426 Map<Boolean, List<Database<D>>> aliveMap = this.cluster.getAliveMap(exceptionMap.keySet()); 427 428 this.detectClusterPanic(aliveMap); 429 430 List<Database<D>> aliveList = aliveMap.get(true); 431 List<Database<D>> deadList = aliveMap.get(false); 432 433 if (!aliveList.isEmpty()) 434 { 435 for (Database<D> database: deadList) 436 { 437 if (this.cluster.deactivate(database, this.cluster.getStateManager())) 438 { 439 this.logger.error(Messages.getMessage(Messages.DATABASE_DEACTIVATED, database, this.cluster), exceptionMap.get(database)); 440 } 441 } 442 } 443 444 List<Database<D>> list = aliveList.isEmpty() ? deadList : aliveList; 445 446 SQLException exception = SQLExceptionFactory.createSQLException(exceptionMap.get(list.get(0))); 447 448 for (Database<D> database: list.subList(1, list.size())) 449 { 450 exception.setNextException(SQLExceptionFactory.createSQLException(exceptionMap.get(database))); 451 } 452 453 throw exception; 454 } 455 456 /** 457 * @see net.sf.hajdbc.sql.SQLProxy#handlePartialFailure(java.util.SortedMap, java.util.SortedMap) 458 */ 459 @Override 460 public <R> SortedMap<Database<D>, R> handlePartialFailure(SortedMap<Database<D>, R> resultMap, SortedMap<Database<D>, Exception> exceptionMap) throws Exception 461 { 462 Map<Boolean, List<Database<D>>> aliveMap = this.cluster.getAliveMap(exceptionMap.keySet()); 463 464 // Assume success databases are alive 465 aliveMap.get(true).addAll(resultMap.keySet()); 466 467 this.detectClusterPanic(aliveMap); 468 469 for (Map.Entry<Database<D>, Exception> exceptionMapEntry: exceptionMap.entrySet()) 470 { 471 Database<D> database = exceptionMapEntry.getKey(); 472 Exception exception = exceptionMapEntry.getValue(); 473 474 if (this.cluster.deactivate(database, this.cluster.getStateManager())) 475 { 476 this.logger.error(Messages.getMessage(Messages.DATABASE_DEACTIVATED, database, this.cluster), exception); 477 } 478 } 479 480 return resultMap; 481 } 482 483 /** 484 * Detect cluster panic if all conditions are met: 485 * <ul> 486 * <li>We're in distributable mode</li> 487 * <li>We're the only group member</li> 488 * <li>All alive databases are local</li> 489 * <li>All dead databases are remote</li> 490 * </ul> 491 * @param aliveMap 492 * @throws Exception 493 */ 494 protected void detectClusterPanic(Map<Boolean, List<Database<D>>> aliveMap) throws Exception 495 { 496 if (this.cluster.getStateManager().isMembershipEmpty()) 497 { 498 List<Database<D>> aliveList = aliveMap.get(true); 499 List<Database<D>> deadList = aliveMap.get(false); 500 501 if (!aliveList.isEmpty() && !deadList.isEmpty() && sameProximity(aliveList, true) && sameProximity(deadList, false)) 502 { 503 this.cluster.stop(); 504 505 String message = Messages.getMessage(Messages.CLUSTER_PANIC_DETECTED, this.cluster); 506 507 this.logger.error(message); 508 509 throw new SQLException(message); 510 } 511 } 512 } 513 514 private boolean sameProximity(List<Database<D>> databaseList, boolean local) 515 { 516 boolean same = true; 517 518 for (Database<D> database: databaseList) 519 { 520 same &= (database.isLocal() == local); 521 } 522 523 return same; 524 } 525 526 protected class SimpleInvoker implements Invoker<D, T, Object> 527 { 528 private Method method; 529 private Object[] parameters; 530 531 /** 532 * @param method 533 * @param parameters 534 */ 535 public SimpleInvoker(Method method, Object[] parameters) 536 { 537 this.method = method; 538 this.parameters = parameters; 539 } 540 541 /** 542 * @see net.sf.hajdbc.sql.Invoker#invoke(net.sf.hajdbc.Database, java.lang.Object) 543 */ 544 @Override 545 public Object invoke(Database<D> database, T object) throws Exception 546 { 547 return Methods.invoke(this.method, object, this.parameters); 548 } 549 } 550}