/* * JBoss, Home of Professional Open Source. * Copyright 2000 - 2008, Red Hat Middleware LLC, and individual contributors * as indicated by the @author tags. See the copyright.txt file in the * distribution for a full listing of individual contributors. * * This is free software; you can redistribute it and/or modify it * under the terms of the GNU Lesser General Public License as * published by the Free Software Foundation; either version 2.1 of * the License, or (at your option) any later version. * * This software is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU * Lesser General Public License for more details. * * You should have received a copy of the GNU Lesser General Public * License along with this software; if not, write to the Free * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA * 02110-1301 USA, or see the FSF site: http://www.fsf.org. */ package org.jboss.cache.loader; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.jboss.cache.CacheException; import org.jboss.cache.Fqn; import org.jboss.cache.Modification; import org.jboss.cache.config.CacheLoaderConfig.IndividualCacheLoaderConfig; import org.jboss.cache.loader.tcp.TcpCacheOperations; import org.jboss.cache.util.concurrent.SynchronizedRestarter; import java.io.BufferedInputStream; import java.io.BufferedOutputStream; import java.io.IOException; import java.io.ObjectInputStream; import java.io.ObjectOutputStream; import java.lang.reflect.InvocationTargetException; import java.lang.reflect.Method; import java.net.ConnectException; import java.net.InetSocketAddress; import java.net.Socket; import java.util.List; import java.util.Map; import java.util.Set; /** * DelegatingCacheLoader implementation which delegates to a remote (not in the same VM) * CacheImpl using TCP/IP for communication. 
Example configuration for connecting to a TcpCacheServer
 * running at myHost:12345:
 * <pre>
 * host=myHost
 * port=12345
 * </pre>
*
 * @author Bela Ban
 */
public class TcpDelegatingCacheLoader extends AbstractCacheLoader
{
   // Connection state: the socket and the object streams layered on top of it.
   private volatile Socket sock;
   private TcpDelegatingCacheLoaderConfig config;
   volatile ObjectInputStream in;
   volatile ObjectOutputStream out;

   private static final Log log = LogFactory.getLog(TcpDelegatingCacheLoader.class);
   // Trace flag is sampled once, when the class is initialised.
   private static final boolean trace = log.isTraceEnabled();

   private final SynchronizedRestarter restarter = new SynchronizedRestarter();

   // Reflective handles to the private "_xxx" operations; each one is passed to invokeWithRetries.
   private static Method GET_CHILDREN_METHOD;
   private static Method GET_METHOD;
   private static Method PUT_KEY_METHOD;
   private static Method PUT_DATA_METHOD;
   private static Method REMOVE_KEY_METHOD;
   private static Method REMOVE_METHOD;
   private static Method PUT_MODS_METHOD;
   private static Method EXISTS_METHOD;
   private static Method REMOVE_DATA_METHOD;

   static
   {
      final Class<TcpDelegatingCacheLoader> self = TcpDelegatingCacheLoader.class;
      try
      {
         // Lookup order is significant only in that a failure leaves the remaining handles null.
         GET_CHILDREN_METHOD = self.getDeclaredMethod("_getChildrenNames", Fqn.class);
         GET_METHOD = self.getDeclaredMethod("_get", Fqn.class);
         EXISTS_METHOD = self.getDeclaredMethod("_exists", Fqn.class);
         PUT_KEY_METHOD = self.getDeclaredMethod("_put", Fqn.class, Object.class, Object.class);
         PUT_DATA_METHOD = self.getDeclaredMethod("_put", Fqn.class, Map.class);
         REMOVE_KEY_METHOD = self.getDeclaredMethod("_remove", Fqn.class, Object.class);
         REMOVE_DATA_METHOD = self.getDeclaredMethod("_removeData", Fqn.class);
         REMOVE_METHOD = self.getDeclaredMethod("_remove", Fqn.class);
         PUT_MODS_METHOD = self.getDeclaredMethod("_put", List.class);
      }
      catch (Exception e)
      {
         // Logged rather than rethrown: class initialisation completes even if a lookup failed.
         log.fatal("Unable to initialise reflection methods", e);
      }
   }

   /**
    * Allows configuration via XML config file.
* The supplied config is used directly when it already is a TcpDelegatingCacheLoaderConfig,
    * and is wrapped in a new TcpDelegatingCacheLoaderConfig otherwise.
    *
    * @param base configuration to apply
    */
   public void setConfig(IndividualCacheLoaderConfig base)
   {
      if (base instanceof TcpDelegatingCacheLoaderConfig)
      {
         this.config = (TcpDelegatingCacheLoaderConfig) base;
      }
      else
      {
         config = new TcpDelegatingCacheLoaderConfig(base);
      }
   }

   /**
    * @return the configuration currently held by this loader
    */
   public IndividualCacheLoaderConfig getConfig()
   {
      return config;
   }

   /**
    * Invokes the specified Method with the specified parameters. If the invocation fails with an
    * IOException, waits for the configured reconnect wait time, restarts this component and tries
    * again, until the configured timeout has elapsed.
    *
    * @param m      method to invoke
    * @param params parameters
    * @return method return value
    * @throws CacheException if the invoked method fails with anything other than an IOException, if the
    *                        method cannot be accessed, if the calling thread is interrupted while waiting
    *                        to reconnect, or if no invocation succeeded before the timeout elapsed
    */
   protected Object invokeWithRetries(Method m, Object... params)
   {
      long endTime = System.currentTimeMillis() + config.getTimeout();
      do
      {
         try
         {
            if (trace) log.trace("About to invoke operation " + m);
            Object rv = m.invoke(this, params);
            if (trace) log.trace("Completed invocation of " + m);
            return rv;
         }
         catch (IllegalAccessException e)
         {
            // An access failure is deterministic: retrying would only spin (with no pause) until the timeout.
            log.error("Should never get here!", e);
            throw new CacheException("Problems invoking method call!", e);
         }
         catch (InvocationTargetException e)
         {
            if (!(e.getCause() instanceof IOException))
            {
               throw new CacheException("Problems invoking method call!", e);
            }

            try
            {
               // wait for the configured reconnect interval, then restart this component
               if (log.isDebugEnabled()) log.debug("Caught IOException. Retrying.", e);
               Thread.sleep(config.getReconnectWaitTime());
               restarter.restartComponent(this);
            }
            catch (InterruptedException e1)
            {
               // Restore the flag and stop retrying. Continuing the loop with the flag set would make every
               // subsequent sleep throw immediately, skipping the restart and busy-spinning until the timeout.
               Thread.currentThread().interrupt();
               throw new CacheException("Interrupted while waiting to reconnect to TCPCacheServer(" + config.getHost() + ":" + config.getPort() + ")", e1);
            }
            catch (Exception e1)
            {
               // a failed restart is not fatal; the next loop iteration tries again
               if (trace) log.trace("Unable to reconnect", e1);
            }
         }
      }
      while (System.currentTimeMillis() < endTime);
      throw new CacheException("Unable to communicate with TCPCacheServer(" + config.getHost() + ":" + config.getPort() + ") after " + config.getTimeout() + " millis, with reconnects every " + config.getReconnectWaitTime() + " millis.");
   }

   // ------------------ CacheLoader interface methods, which delegate to retry-aware methods

   public Set<?> getChildrenNames(Fqn fqn) throws Exception
   {
      return (Set<?>) invokeWithRetries(GET_CHILDREN_METHOD, fqn);
   }

   @SuppressWarnings("unchecked") public Maporg.jboss.cache.loader.TcpDelegatingCacheLoader ** host=localhost * port=2099 * *