/* * JBoss, Home of Professional Open Source. * Copyright 2000 - 2008, Red Hat Middleware LLC, and individual contributors * as indicated by the @author tags. See the copyright.txt file in the * distribution for a full listing of individual contributors. * * This is free software; you can redistribute it and/or modify it * under the terms of the GNU Lesser General Public License as * published by the Free Software Foundation; either version 2.1 of * the License, or (at your option) any later version. * * This software is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU * Lesser General Public License for more details. * * You should have received a copy of the GNU Lesser General Public * License along with this software; if not, write to the Free * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA * 02110-1301 USA, or see the FSF site: http://www.fsf.org. */ package org.jboss.cache.loader; import net.jcip.annotations.ThreadSafe; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.jboss.cache.CacheStatus; import org.jboss.cache.Fqn; import org.jboss.cache.Modification; import org.jboss.cache.NodeSPI; import org.jboss.cache.RegionManager; import org.jboss.cache.ReplicationException; import org.jboss.cache.commands.CommandsFactory; import org.jboss.cache.commands.DataCommand; import org.jboss.cache.commands.read.ExistsCommand; import org.jboss.cache.commands.read.GetChildrenNamesCommand; import org.jboss.cache.commands.read.GetDataMapCommand; import org.jboss.cache.commands.read.GetKeyValueCommand; import org.jboss.cache.commands.remote.ClusteredGetCommand; import org.jboss.cache.config.CacheLoaderConfig.IndividualCacheLoaderConfig; import org.jboss.cache.factories.annotations.Inject; import org.jboss.cache.lock.StripedLock; import org.jgroups.Address; import org.jgroups.blocks.GroupRequest; import org.jgroups.blocks.RspFilter; import java.io.ObjectInputStream; import java.io.ObjectOutputStream; import java.util.ArrayList; import java.util.Collections; import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Set; /** * A cache loader that consults other members in the cluster for values. Does * not propagate update methods since replication should take care of this. A * timeout property is required, a long that * specifies in milliseconds how long to wait for results before returning a * null. * * @author Manik Surtani (manik AT jboss DOT org) */ @ThreadSafe public class ClusteredCacheLoader extends AbstractCacheLoader { private static final Log log = LogFactory.getLog(ClusteredCacheLoader.class); private static final boolean trace = log.isTraceEnabled(); private StripedLock lock = new StripedLock(); private ClusteredCacheLoaderConfig config; private CommandsFactory commandsFactory; /** * A test to check whether the cache is in its started state. If not, calls should not be made as the channel may * not have properly started, blocks due to state transfers may be in progress, etc. * * @return true if the cache is in its STARTED state. */ protected boolean isCacheReady() { return cache.getCacheStatus() == CacheStatus.STARTED; } @Inject public void setCommandsFactory(CommandsFactory commandsFactory) { this.commandsFactory = commandsFactory; } /** * Sets the configuration. * A property timeout is used as the timeout value. */ public void setConfig(IndividualCacheLoaderConfig base) { if (base instanceof ClusteredCacheLoaderConfig) { this.config = (ClusteredCacheLoaderConfig) base; } else { config = new ClusteredCacheLoaderConfig(base); } } public IndividualCacheLoaderConfig getConfig() { return config; } public Set getChildrenNames(Fqn fqn) throws Exception { if (!isCacheReady() || !cache.getInvocationContext().isOriginLocal()) return Collections.emptySet(); lock.acquireLock(fqn, true); try { GetChildrenNamesCommand command = commandsFactory.buildGetChildrenNamesCommand(fqn); Object resp = callRemote(command); return (Set) resp; } finally { lock.releaseLock(fqn); } } @SuppressWarnings("deprecation") private Object callRemote(DataCommand dataCommand) throws Exception { if (trace) log.trace("cache=" + cache.getLocalAddress() + "; calling with " + dataCommand); ClusteredGetCommand clusteredGet = commandsFactory.buildClusteredGetCommand(false, dataCommand); List resps; // JBCACHE-1186 resps = cache.getRPCManager().callRemoteMethods(null, clusteredGet, GroupRequest.GET_ALL, config.getTimeout(), new ResponseValidityFilter(cache.getMembers(), cache.getLocalAddress()), false); if (resps == null) { if (log.isInfoEnabled()) log.info("No replies to call " + dataCommand + ". Perhaps we're alone in the cluster?"); throw new ReplicationException("No replies to call " + dataCommand + ". Perhaps we're alone in the cluster?"); } else { // test for and remove exceptions Iterator i = resps.iterator(); Object result = null; while (i.hasNext()) { Object o = i.next(); if (o instanceof Exception) { if (log.isDebugEnabled()) log.debug("Found remote exception among responses - removing from responses list", (Exception) o); } else if (o != null) { // keep looping till we find a FOUND answer. List clusteredGetResp = (List) o; // found? if (clusteredGetResp.get(0)) { result = clusteredGetResp.get(1); break; } } else if (!cache.getConfiguration().isUseRegionBasedMarshalling()) { throw new IllegalStateException("Received unexpected null response to " + clusteredGet); } // else region was inactive on peer; // keep looping to see if anyone else responded } if (trace) log.trace("got responses " + resps); return result; } } public Map get(Fqn name) throws Exception { return get0(name); } protected Map get0(Fqn name) throws Exception { // DON'T make a remote call if this is a remote call in the first place - leads to deadlocks - JBCACHE-1103 if (!isCacheReady() || !cache.getInvocationContext().isOriginLocal()) return null; // return Collections.emptyMap(); lock.acquireLock(name, true); try { GetDataMapCommand command = commandsFactory.buildGetDataMapCommand(name); Object resp = callRemote(command); return (Map) resp; } finally { lock.releaseLock(name); } } public boolean exists(Fqn name) throws Exception { // DON'T make a remote call if this is a remote call in the first place - leads to deadlocks - JBCACHE-1103 if (!isCacheReady() || !cache.getInvocationContext().isOriginLocal()) return false; lock.acquireLock(name, false); try { ExistsCommand command = commandsFactory.buildExistsNodeCommand(name); Object resp = callRemote(command); return resp != null && (Boolean) resp; } finally { lock.releaseLock(name); } } public Object put(Fqn name, Object key, Object value) throws Exception { // DON'T make a remote call if this is a remote call in the first place - leads to deadlocks - JBCACHE-1103 if (!isCacheReady() || !cache.getInvocationContext().isOriginLocal()) return null; lock.acquireLock(name, true); try { NodeSPI n = cache.peek(name, false); if (n == null) { GetKeyValueCommand command = commandsFactory.buildGetKeyValueCommand(name, key, true); return callRemote(command); } else { // dont bother with a remote call return n.getDirect(key); } } finally { lock.releaseLock(name); } } /** * Does nothing; replication handles put. */ public void put(Fqn name, Map attributes) throws Exception { } /** * Does nothing; replication handles put. */ @Override public void put(List modifications) throws Exception { } /** * Fetches the remove value, does not remove. Replication handles * removal. */ public Object remove(Fqn name, Object key) throws Exception { // DON'T make a remote call if this is a remote call in the first place - leads to deadlocks - JBCACHE-1103 if (!isCacheReady() || !cache.getInvocationContext().isOriginLocal()) return false; lock.acquireLock(name, true); try { NodeSPI n = cache.peek(name, true); if (n == null) { GetKeyValueCommand command = commandsFactory.buildGetKeyValueCommand(name, key, true); return callRemote(command); } else { // dont bother with a remote call return n.getDirect(key); } } finally { lock.releaseLock(name); } } /** * Does nothing; replication handles removal. */ public void remove(Fqn name) throws Exception { // do nothing } /** * Does nothing; replication handles removal. */ public void removeData(Fqn name) throws Exception { } /** * Does nothing. */ @Override public void prepare(Object tx, List modifications, boolean one_phase) throws Exception { } /** * Does nothing. */ @Override public void commit(Object tx) throws Exception { } /** * Does nothing. */ @Override public void rollback(Object tx) { } @Override public void loadEntireState(ObjectOutputStream os) throws Exception { //intentional no-op } @Override public void loadState(Fqn subtree, ObjectOutputStream os) throws Exception { // intentional no-op } @Override public void storeEntireState(ObjectInputStream is) throws Exception { // intentional no-op } @Override public void storeState(Fqn subtree, ObjectInputStream is) throws Exception { // intentional no-op } @Override public void setRegionManager(RegionManager manager) { } public static class ResponseValidityFilter implements RspFilter { private int numValidResponses = 0; private List
pendingResponders; public ResponseValidityFilter(List
expected, Address localAddress) { this.pendingResponders = new ArrayList
(expected); // We'll never get a response from ourself this.pendingResponders.remove(localAddress); } public boolean isAcceptable(Object object, Address address) { pendingResponders.remove(address); if (object instanceof List) { List response = (List) object; Boolean foundResult = (Boolean) response.get(0); if (foundResult) numValidResponses++; } // always return true to make sure a response is logged by the JGroups RpcDispatcher. return true; } public boolean needMoreResponses() { return numValidResponses < 1 && pendingResponders.size() > 0; } } }