/* * JBoss, Home of Professional Open Source. * Copyright 2000 - 2008, Red Hat Middleware LLC, and individual contributors * as indicated by the @author tags. See the copyright.txt file in the * distribution for a full listing of individual contributors. * * This is free software; you can redistribute it and/or modify it * under the terms of the GNU Lesser General Public License as * published by the Free Software Foundation; either version 2.1 of * the License, or (at your option) any later version. * * This software is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU * Lesser General Public License for more details. * * You should have received a copy of the GNU Lesser General Public * License along with this software; if not, write to the Free * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA * 02110-1301 USA, or see the FSF site: http://www.fsf.org. */ package org.jboss.cache; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.jboss.cache.commands.ReplicableCommand; import org.jboss.cache.config.Configuration; import org.jboss.cache.config.Configuration.NodeLockingScheme; import org.jboss.cache.config.ConfigurationException; import org.jboss.cache.config.RuntimeConfig; import org.jboss.cache.factories.ComponentRegistry; import org.jboss.cache.factories.annotations.Inject; import org.jboss.cache.factories.annotations.Start; import org.jboss.cache.factories.annotations.Stop; import org.jboss.cache.interceptors.InterceptorChain; import org.jboss.cache.invocation.InvocationContextContainer; import org.jboss.cache.jmx.annotations.MBean; import org.jboss.cache.jmx.annotations.ManagedAttribute; import org.jboss.cache.jmx.annotations.ManagedOperation; import org.jboss.cache.lock.LockManager; import org.jboss.cache.lock.LockUtil; import org.jboss.cache.lock.TimeoutException; import org.jboss.cache.marshall.CommandAwareRpcDispatcher; import org.jboss.cache.marshall.InactiveRegionAwareRpcDispatcher; import org.jboss.cache.marshall.Marshaller; import org.jboss.cache.notifications.Notifier; import org.jboss.cache.remoting.jgroups.ChannelMessageListener; import org.jboss.cache.statetransfer.DefaultStateTransferManager; import org.jboss.cache.transaction.GlobalTransaction; import org.jboss.cache.transaction.TransactionTable; import org.jboss.cache.util.CachePrinter; import org.jboss.cache.util.concurrent.ReclosableLatch; import org.jboss.cache.util.reflect.ReflectionUtil; import org.jgroups.Address; import org.jgroups.Channel; import org.jgroups.ChannelClosedException; import org.jgroups.ChannelException; import org.jgroups.ChannelFactory; import org.jgroups.ChannelNotConnectedException; import org.jgroups.ExtendedMembershipListener; import org.jgroups.JChannel; import org.jgroups.View; import org.jgroups.blocks.GroupRequest; import org.jgroups.blocks.RspFilter; import org.jgroups.protocols.TP; import org.jgroups.protocols.pbcast.STREAMING_STATE_TRANSFER; import org.jgroups.stack.ProtocolStack; import org.jgroups.util.Rsp; import org.jgroups.util.RspList; import javax.transaction.TransactionManager; import java.net.URL; import java.text.NumberFormat; import java.util.ArrayList; import java.util.Collections; import java.util.HashSet; import java.util.LinkedList; import java.util.List; import java.util.Random; import java.util.Set; import java.util.Vector; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.locks.ReentrantReadWriteLock; /** * Manager that handles all RPC calls between JBoss Cache instances * * @author Manik Surtani (manik AT jboss DOT org) */ @MBean(objectName = "RPCManager", description = "Manages RPC connections to remote caches") public class RPCManagerImpl implements RPCManager { private Channel channel; private final Log log = LogFactory.getLog(RPCManagerImpl.class); private final boolean trace = log.isTraceEnabled(); private volatile List
members; private long replicationCount; private long replicationFailures; private boolean statisticsEnabled = false; private final Object coordinatorLock = new Object(); /** * True if this Cache is the coordinator. */ private volatile boolean coordinator = false; /** * The most recent state transfer source */ volatile Address lastStateTransferSource; /** * JGroups RpcDispatcher in use. */ private CommandAwareRpcDispatcher rpcDispatcher = null; /** * JGroups message listener. */ private ChannelMessageListener messageListener; Configuration configuration; private Notifier notifier; private CacheSPI spi; private InvocationContextContainer invocationContextContainer; private Marshaller marshaller; private TransactionManager txManager; private TransactionTable txTable; private InterceptorChain interceptorChain; private boolean isUsingBuddyReplication; private volatile boolean isInLocalMode; private ComponentRegistry componentRegistry; private LockManager lockManager; private FlushTracker flushTracker; @Inject public void setupDependencies(ChannelMessageListener messageListener, Configuration configuration, Notifier notifier, CacheSPI spi, Marshaller marshaller, TransactionTable txTable, TransactionManager txManager, InvocationContextContainer container, InterceptorChain interceptorChain, ComponentRegistry componentRegistry, LockManager lockManager) { this.messageListener = messageListener; this.configuration = configuration; this.notifier = notifier; this.spi = spi; this.marshaller = marshaller; this.txManager = txManager; this.txTable = txTable; this.invocationContextContainer = container; this.interceptorChain = interceptorChain; this.componentRegistry = componentRegistry; this.lockManager = lockManager; } public abstract class FlushTracker { // closed whenever a FLUSH is in progress. Open by default. final ReclosableLatch flushBlockGate = new ReclosableLatch(true); private final AtomicInteger flushCompletionCount = new AtomicInteger(); // closed whenever a FLUSH is NOT in progress. Closed by default. final ReclosableLatch flushWaitGate = new ReclosableLatch(false); public void block() { flushBlockGate.close(); flushWaitGate.open(); } public void unblock() { flushWaitGate.close(); flushCompletionCount.incrementAndGet(); flushBlockGate.open(); } public int getFlushCompletionCount() { return flushCompletionCount.get(); } public abstract void lockProcessingLock() throws InterruptedException; public abstract void unlockProcessingLock(); public abstract void lockSuspendProcessingLock() throws InterruptedException; public abstract void unlockSuspendProcessingLock(); public void waitForFlushCompletion(long timeout) throws InterruptedException { if (channel.flushSupported() && !flushBlockGate.await(timeout, TimeUnit.MILLISECONDS)) { throw new TimeoutException("State retrieval timed out waiting for flush to unblock. (timeout = " + CachePrinter.prettyPrint(timeout) + ")"); } } public void waitForFlushStart(long timeout) throws InterruptedException { if (channel.flushSupported() && !flushWaitGate.await(timeout, TimeUnit.MILLISECONDS)) { throw new TimeoutException("State retrieval timed out waiting for flush to block. (timeout = " + CachePrinter.prettyPrint(timeout) + " )"); } } } private final class StandardFlushTracker extends FlushTracker { // All locking methods are no-ops public void lockProcessingLock() { } public void lockSuspendProcessingLock() { } public void unlockProcessingLock() { } public void unlockSuspendProcessingLock() { } } private final class NonBlockingFlushTracker extends FlushTracker { private final ReentrantReadWriteLock coordinationLock = new ReentrantReadWriteLock(); public void lockProcessingLock() throws InterruptedException { while (true) { try { if (!coordinationLock.readLock().tryLock(configuration.getStateRetrievalTimeout(), TimeUnit.MILLISECONDS)) throw new TimeoutException("Could not obtain processing lock"); return; } catch (InterruptedException ie) { Thread.currentThread().interrupt(); } } } public void unlockProcessingLock() { coordinationLock.readLock().unlock(); } public void lockSuspendProcessingLock() throws InterruptedException { while (true) { try { if (!coordinationLock.writeLock().tryLock(configuration.getStateRetrievalTimeout(), TimeUnit.MILLISECONDS)) throw new TimeoutException("Could not obtain processing lock"); return; } catch (InterruptedException ie) { Thread.currentThread().interrupt(); } } } public void unlockSuspendProcessingLock() { if (coordinationLock.isWriteLockedByCurrentThread()) { coordinationLock.writeLock().unlock(); } } @Override public void waitForFlushCompletion(long timeout) throws InterruptedException { while (true) { try { if (!flushBlockGate.await(timeout, TimeUnit.MILLISECONDS)) throw new TimeoutException("State retrieval timed out waiting for flush to unblock. (timeout = " + CachePrinter.prettyPrint(timeout) + ")"); return; } catch (InterruptedException ie) { Thread.currentThread().interrupt(); } } } @Override public void waitForFlushStart(long timeout) throws InterruptedException { while (true) { try { if (!flushWaitGate.await(timeout, TimeUnit.MILLISECONDS)) throw new TimeoutException("State retrieval timed out waiting for flush to block. (timeout = " + CachePrinter.prettyPrint(timeout) + ")"); return; } catch (InterruptedException ie) { Thread.currentThread().interrupt(); } } } } // ------------ START: Lifecycle methods ------------ @Start(priority = 15) public void start() { switch (configuration.getCacheMode()) { case LOCAL: if (log.isDebugEnabled()) log.debug("cache mode is local, will not create the channel"); isInLocalMode = true; isUsingBuddyReplication = false; break; case REPL_SYNC: case REPL_ASYNC: case INVALIDATION_ASYNC: case INVALIDATION_SYNC: isInLocalMode = false; isUsingBuddyReplication = configuration.getBuddyReplicationConfig() != null && configuration.getBuddyReplicationConfig().isEnabled(); if (log.isDebugEnabled()) log.debug("Cache mode is " + configuration.getCacheMode()); boolean fetchState = shouldFetchStateOnStartup(); boolean nonBlocking = configuration.isNonBlockingStateTransfer(); sanityCheckConfiguration(nonBlocking, fetchState); this.flushTracker = nonBlocking ? new NonBlockingFlushTracker() : new StandardFlushTracker(); initialiseChannelAndRpcDispatcher(fetchState && !nonBlocking, nonBlocking); if (!fetchState || nonBlocking) { try { // Allow commands to be ACKed during state transfer if (nonBlocking) { componentRegistry.setStatusCheckNecessary(false); } channel.connect(configuration.getClusterName()); if (log.isInfoEnabled()) log.info("Cache local address is " + getLocalAddress()); } catch (ChannelException e) { throw new CacheException("Unable to connect to JGroups channel", e); } if (!fetchState) { return; } } long start = System.currentTimeMillis(); if (nonBlocking) { startNonBlockStateTransfer(getMembers()); } else { try { channel.connect(configuration.getClusterName(), null, null, configuration.getStateRetrievalTimeout()); if (log.isInfoEnabled()) log.info("Cache local address is " + getLocalAddress()); if (getMembers().size() > 1) messageListener.waitForState(); } catch (ChannelException e) { throw new CacheException("Unable to connect to JGroups channel", e); } catch (Exception ex) { // make sure we disconnect from the channel before we throw this exception! // JBCACHE-761 disconnect(); throw new CacheException("Unable to fetch state on startup", ex); } } if (log.isInfoEnabled()) { log.info("state was retrieved successfully (in " + CachePrinter.prettyPrint((System.currentTimeMillis() - start)) + ")"); } } } private void sanityCheckJGroupsStack(JChannel channel) { if (channel.getProtocolStack().findProtocol(STREAMING_STATE_TRANSFER.class) == null) { throw new ConfigurationException("JGroups channel does not use STREAMING_STATE_TRANSFER! This is a requirement for non-blocking state transfer. Either make sure your JGroups configuration uses STREAMING_STATE_TRANSFER or disable non-blocking state transfer."); } } private void sanityCheckConfiguration(boolean nonBlockingStateTransfer, boolean fetchStateOnStart) { if (isInLocalMode || !nonBlockingStateTransfer || !fetchStateOnStart) return; // don't care about these cases! if (configuration.getNodeLockingScheme() != NodeLockingScheme.MVCC) { throw new ConfigurationException("Non-blocking state transfer is only supported with the MVCC node locking scheme. Please change your node locking scheme to MVCC or disable non-blocking state transfer."); } if (isUsingBuddyReplication) { throw new ConfigurationException("Non-blocking state transfer cannot be used with buddy replication at this time. Please disable either buddy replication or non-blocking state transfer."); } } private void startNonBlockStateTransfer(List members) { if (members.size() < 2) { if (log.isInfoEnabled()) log.info("Not retrieving state since cluster size is " + members.size()); return; } boolean success = false; int numRetries = 5; int initwait = (1 + new Random().nextInt(10)) * 100; int waitIncreaseFactor = 2; outer: for (int i = 0, wait = initwait; i < numRetries; i++) { for (Address member : members) { if (member.equals(getLocalAddress())) { continue; } try { if (log.isInfoEnabled()) log.info("Trying to fetch state from: " + member); if (getState(null, member)) { messageListener.waitForState(); success = true; break outer; } } catch (Exception e) { if (log.isDebugEnabled()) log.debug("Error while fetching state", e); } } if (!success) { wait *= waitIncreaseFactor; if (log.isWarnEnabled()) { log.warn("Could not find available peer for state, backing off and retrying after " + wait + " millis. Retries left: " + (numRetries - 1 - i)); } try { Thread.sleep(wait); } catch (InterruptedException e) { Thread.currentThread().interrupt(); } } } if (!success) { disconnect(); throw new CacheException("Unable to fetch state on startup"); } componentRegistry.setStatusCheckNecessary(true); } public void disconnect() { if (channel != null && channel.isOpen()) { if (log.isInfoEnabled()) log.info("Disconnecting and closing the Channel"); channel.disconnect(); channel.close(); } } @Stop(priority = 8) public void stop() { try { disconnect(); } catch (Exception toLog) { log.error("Problem closing channel; setting it to null", toLog); } channel = null; configuration.getRuntimeConfig().setChannel(null); if (rpcDispatcher != null) { if (log.isInfoEnabled()) log.info("Stopping the RpcDispatcher"); rpcDispatcher.stop(); } if (members != null) members = null; coordinator = false; rpcDispatcher = null; } /** * @return true if we need to fetch state on startup. I.e., initiate a state transfer. */ private boolean shouldFetchStateOnStartup() { boolean loaderFetch = configuration.getCacheLoaderConfig() != null && configuration.getCacheLoaderConfig().isFetchPersistentState(); return !configuration.isInactiveOnStartup() && !isUsingBuddyReplication && (configuration.isFetchInMemoryState() || loaderFetch); } @SuppressWarnings("deprecation") private void initialiseChannelAndRpcDispatcher(boolean fetchStateWithoutNBST, boolean nbst) throws CacheException { channel = configuration.getRuntimeConfig().getChannel(); if (channel == null) { // Try to create a multiplexer channel channel = getMultiplexerChannel(); if (channel != null) { ReflectionUtil.setValue(configuration, "accessible", true); configuration.setUsingMultiplexer(true); if (log.isDebugEnabled()) { log.debug("Created Multiplexer Channel for cache cluster " + configuration.getClusterName() + " using stack " + configuration.getMultiplexerStack()); } } else { try { if (configuration.getJGroupsConfigFile() != null) { URL u = configuration.getJGroupsConfigFile(); if (trace) log.trace("Grabbing cluster properties from " + u); channel = new JChannel(u); } else if (configuration.getClusterConfig() == null) { if (log.isDebugEnabled()) log.debug("setting cluster properties to default value"); channel = new JChannel(configuration.getDefaultClusterConfig()); } else { if (trace) log.trace("Cache cluster properties: " + configuration.getClusterConfig()); channel = new JChannel(configuration.getClusterConfig()); } } catch (ChannelException e) { throw new CacheException(e); } } configuration.getRuntimeConfig().setChannel(channel); } if (nbst) sanityCheckJGroupsStack((JChannel) channel); // Channel.LOCAL *must* be set to false so we don't see our own messages - otherwise invalidations targeted at // remote instances will be received by self. channel.setOpt(Channel.LOCAL, false); channel.setOpt(Channel.AUTO_RECONNECT, true); channel.setOpt(Channel.AUTO_GETSTATE, fetchStateWithoutNBST); channel.setOpt(Channel.BLOCK, true); if (configuration.isUseRegionBasedMarshalling()) { rpcDispatcher = new InactiveRegionAwareRpcDispatcher(channel, messageListener, new MembershipListenerAdaptor(), spi, invocationContextContainer, interceptorChain, componentRegistry, this); } else { rpcDispatcher = new CommandAwareRpcDispatcher(channel, messageListener, new MembershipListenerAdaptor(), invocationContextContainer, invocationContextContainer, interceptorChain, componentRegistry, this); } checkAppropriateConfig(); rpcDispatcher.setRequestMarshaller(marshaller); rpcDispatcher.setResponseMarshaller(marshaller); } public Channel getChannel() { return channel; } private JChannel getMultiplexerChannel() throws CacheException { String stackName = configuration.getMultiplexerStack(); RuntimeConfig rtc = configuration.getRuntimeConfig(); ChannelFactory channelFactory = rtc.getMuxChannelFactory(); JChannel muxchannel = null; if (channelFactory != null) { try { muxchannel = (JChannel) channelFactory.createMultiplexerChannel(stackName, configuration.getClusterName()); } catch (Exception e) { throw new CacheException("Failed to create multiplexed channel using stack " + stackName, e); } } return muxchannel; } @Deprecated private void removeLocksForDeadMembers(NodeSPI node, List deadMembers) { Set