/* * JBoss, Home of Professional Open Source. * Copyright 2000 - 2008, Red Hat Middleware LLC, and individual contributors * as indicated by the @author tags. See the copyright.txt file in the * distribution for a full listing of individual contributors. * * This is free software; you can redistribute it and/or modify it * under the terms of the GNU Lesser General Public License as * published by the Free Software Foundation; either version 2.1 of * the License, or (at your option) any later version. * * This software is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU * Lesser General Public License for more details. * * You should have received a copy of the GNU Lesser General Public * License along with this software; if not, write to the Free * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA * 02110-1301 USA, or see the FSF site: http://www.fsf.org. */ package org.jboss.cache.marshall; import org.jboss.cache.InvocationContext; import org.jboss.cache.RPCManager; import org.jboss.cache.RPCManagerImpl.FlushTracker; import org.jboss.cache.commands.ReplicableCommand; import org.jboss.cache.commands.VisitableCommand; import org.jboss.cache.commands.remote.AnnounceBuddyPoolNameCommand; import org.jboss.cache.commands.remote.AssignToBuddyGroupCommand; import org.jboss.cache.commands.remote.RemoveFromBuddyGroupCommand; import org.jboss.cache.commands.remote.StateTransferControlCommand; import org.jboss.cache.config.Configuration; import org.jboss.cache.factories.ComponentRegistry; import org.jboss.cache.interceptors.InterceptorChain; import org.jboss.cache.invocation.InvocationContextContainer; import org.jboss.cache.util.concurrent.BoundedExecutors; import org.jboss.cache.util.concurrent.WithinThreadExecutor; import org.jgroups.Address; import org.jgroups.Channel; import org.jgroups.MembershipListener; import org.jgroups.Message; import org.jgroups.MessageListener; import org.jgroups.blocks.GroupRequest; import org.jgroups.blocks.RpcDispatcher; import org.jgroups.blocks.RspFilter; import org.jgroups.util.Buffer; import org.jgroups.util.Rsp; import org.jgroups.util.RspList; import java.io.NotSerializableException; import java.util.Map; import java.util.Vector; import java.util.concurrent.Callable; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Future; import java.util.concurrent.ThreadFactory; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; /** * A JGroups RPC dispatcher that knows how to deal with {@link org.jboss.cache.commands.ReplicableCommand}s. * * @author Manik Surtani (manik AT jboss DOT org) * @since 2.2.0 */ public class CommandAwareRpcDispatcher extends RpcDispatcher { protected InvocationContextContainer invocationContextContainer; protected InterceptorChain interceptorChain; protected ComponentRegistry componentRegistry; protected boolean trace; private ExecutorService replicationProcessor; private AtomicInteger replicationProcessorCount; private boolean asyncSerial; private Configuration configuration; private RPCManager rpcManager; private ReplicationObserver replicationObserver; public CommandAwareRpcDispatcher() {} public CommandAwareRpcDispatcher(Channel channel, MessageListener l, MembershipListener l2, Object serverObj, InvocationContextContainer container, InterceptorChain interceptorChain, ComponentRegistry componentRegistry, RPCManager manager) { super(channel, l, l2, serverObj); this.invocationContextContainer = container; this.componentRegistry = componentRegistry; this.interceptorChain = interceptorChain; this.rpcManager = manager; trace = log.isTraceEnabled(); // what sort of a repl processor do we need? Configuration c = componentRegistry.getComponent(Configuration.class); this.configuration = c; replicationProcessor = c.getRuntimeConfig().getAsyncSerializationExecutor(); if (c.getCacheMode().isSynchronous() || (replicationProcessor == null && c.getSerializationExecutorPoolSize() < 1) || requireSyncMarshalling(c)) // if an executor has not been injected and the pool size is set { // in-process thread. Not async. replicationProcessor = new WithinThreadExecutor(); asyncSerial = false; } else { asyncSerial = true; if (replicationProcessor == null) { replicationProcessorCount = new AtomicInteger(0); replicationProcessor = BoundedExecutors.newFixedThreadPool(c.isUseReplQueue() ? 1 : c.getSerializationExecutorPoolSize(), new ThreadFactory() { public Thread newThread(Runnable r) { return new Thread(r, "AsyncReplicationProcessor-" + replicationProcessorCount.incrementAndGet()); } }, c.getSerializationExecutorQueueSize() ); } } } public ReplicationObserver setReplicationObserver(ReplicationObserver replicationObserver) { ReplicationObserver result = this.replicationObserver; this.replicationObserver = replicationObserver; return result; } /** * Serial(sync) marshalling should be enabled for async optimistic caches. That is because optimistic async is a 2PC, * which might cause the Commit command to be send before the Prepare command, so replication will fail. This is not * the same for async pessimistic/mvcc replication, as this uses a 1PC. */ private boolean requireSyncMarshalling(Configuration c) { boolean enforceSerialMarshalling = c.getNodeLockingScheme().equals(Configuration.NodeLockingScheme.OPTIMISTIC) && !c.getCacheMode().isInvalidation(); if (enforceSerialMarshalling) { if (c.getSerializationExecutorPoolSize() > 1 && log.isWarnEnabled()) { log.warn("Async optimistic caches do not support serialization pools."); } if (trace) log.trace("Disbaling serial marshalling for opt async cache"); } return enforceSerialMarshalling; } @Override public void stop() { replicationProcessor.shutdownNow(); try { replicationProcessor.awaitTermination(60, TimeUnit.SECONDS); } catch (InterruptedException e) { Thread.currentThread().interrupt(); } super.stop(); } protected boolean isValid(Message req) { if (server_obj == null) { log.error("no method handler is registered. Discarding request."); return false; } if (req == null || req.getLength() == 0) { log.error("message or message buffer is null"); return false; } return true; } /** * Similar to {@link #callRemoteMethods(java.util.Vector, org.jgroups.blocks.MethodCall, int, long, boolean, boolean, org.jgroups.blocks.RspFilter)} except that this version * is aware of {@link org.jboss.cache.commands.ReplicableCommand} objects. */ public RspList invokeRemoteCommands(Vector
dests, ReplicableCommand command, int mode, long timeout, boolean anycasting, boolean oob, RspFilter filter) throws NotSerializableException, ExecutionException, InterruptedException { if (dests != null && dests.isEmpty()) { // don't send if dest list is empty if (trace) log.trace("Destination list is empty: no need to send message"); return new RspList(); } if (trace) log.trace(new StringBuilder("dests=").append(dests).append(", command=").append(command). append(", mode=").append(mode).append(", timeout=").append(timeout)); boolean supportReplay = configuration.isNonBlockingStateTransfer(); ReplicationTask replicationTask = new ReplicationTask(command, oob, dests, mode, timeout, anycasting, supportReplay, filter); Future