/*
* JBoss, Home of Professional Open Source.
* Copyright 2000 - 2008, Red Hat Middleware LLC, and individual contributors
* as indicated by the @author tags. See the copyright.txt file in the
* distribution for a full listing of individual contributors.
*
* This is free software; you can redistribute it and/or modify it
* under the terms of the GNU Lesser General Public License as
* published by the Free Software Foundation; either version 2.1 of
* the License, or (at your option) any later version.
*
* This software is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public
* License along with this software; if not, write to the Free
* Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
* 02110-1301 USA, or see the FSF site: http://www.fsf.org.
*/
package org.jboss.cache.notifications;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.jboss.cache.Cache;
import org.jboss.cache.CacheException;
import org.jboss.cache.CacheSPI;
import org.jboss.cache.Fqn;
import org.jboss.cache.InvocationContext;
import org.jboss.cache.buddyreplication.BuddyGroup;
import org.jboss.cache.config.Configuration;
import org.jboss.cache.factories.annotations.Destroy;
import org.jboss.cache.factories.annotations.Inject;
import org.jboss.cache.factories.annotations.NonVolatile;
import org.jboss.cache.factories.annotations.Start;
import org.jboss.cache.factories.annotations.Stop;
import org.jboss.cache.marshall.MarshalledValueMap;
import org.jboss.cache.notifications.annotation.*;
import org.jboss.cache.notifications.event.*;
import static org.jboss.cache.notifications.event.Event.Type.*;
import org.jboss.cache.util.Immutables;
import org.jboss.cache.util.concurrent.BoundedExecutors;
import org.jboss.cache.util.concurrent.WithinThreadExecutor;
import org.jgroups.View;
import javax.transaction.Transaction;
import java.lang.annotation.Annotation;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.lang.reflect.Modifier;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;
/**
* Helper class that handles all notifications to registered listeners.
*
* @author Manik Surtani (manik AT jboss DOT org)
* @author Galder Zamarreno
*/
@NonVolatile
public class NotifierImpl implements Notifier
{
private static final Log log = LogFactory.getLog(NotifierImpl.class);
private static final Class emptyMap = Collections.emptyMap().getClass();
private static final Class singletonMap = Collections.singletonMap(null, null).getClass();
private static final Class[] allowedMethodAnnotations =
{
CacheStarted.class, CacheStopped.class, CacheBlocked.class, CacheUnblocked.class, NodeCreated.class, NodeRemoved.class, NodeVisited.class, NodeModified.class, NodeMoved.class,
NodeActivated.class, NodePassivated.class, NodeLoaded.class, NodeEvicted.class, TransactionRegistered.class, TransactionCompleted.class, ViewChanged.class, BuddyGroupChanged.class,
NodeInvalidated.class
};
private static final Class[] parameterTypes =
{
CacheStartedEvent.class, CacheStoppedEvent.class, CacheBlockedEvent.class, CacheUnblockedEvent.class, NodeCreatedEvent.class, NodeRemovedEvent.class, NodeVisitedEvent.class, NodeModifiedEvent.class, NodeMovedEvent.class,
NodeActivatedEvent.class, NodePassivatedEvent.class, NodeLoadedEvent.class, NodeEvictedEvent.class, TransactionRegisteredEvent.class, TransactionCompletedEvent.class, ViewChangedEvent.class, BuddyGroupChangedEvent.class,
NodeInvalidatedEvent.class
};
final Map, List> listenersMap = new HashMap, List>(32);
final List cacheStartedListeners = new CopyOnWriteArrayList();
final List cacheStoppedListeners = new CopyOnWriteArrayList();
final List cacheBlockedListeners = new CopyOnWriteArrayList();
final List cacheUnblockedListeners = new CopyOnWriteArrayList();
final List nodeCreatedListeners = new CopyOnWriteArrayList();
final List nodeRemovedListeners = new CopyOnWriteArrayList();
final List nodeVisitedListeners = new CopyOnWriteArrayList();
final List nodeModifiedListeners = new CopyOnWriteArrayList();
final List nodeMovedListeners = new CopyOnWriteArrayList();
final List nodeActivatedListeners = new CopyOnWriteArrayList();
final List nodePassivatedListeners = new CopyOnWriteArrayList();
final List nodeLoadedListeners = new CopyOnWriteArrayList();
final List nodeInvalidatedListeners = new CopyOnWriteArrayList();
final List nodeEvictedListeners = new CopyOnWriteArrayList();
final List transactionRegisteredListeners = new CopyOnWriteArrayList();
final List transactionCompletedListeners = new CopyOnWriteArrayList();
final List viewChangedListeners = new CopyOnWriteArrayList();
final List buddyGroupChangedListeners = new CopyOnWriteArrayList();
// final Map> listenerInvocations = new ConcurrentHashMap>();
private Cache cache;
private boolean useMarshalledValueMaps;
private Configuration config;
// two separate executor services, one for sync and one for async listeners
private ExecutorService syncProcessor;
private ExecutorService asyncProcessor;
private static final AtomicInteger asyncNotifierThreadNumber = new AtomicInteger(0);
public NotifierImpl()
{
listenersMap.put(CacheStarted.class, cacheStartedListeners);
listenersMap.put(CacheStopped.class, cacheStoppedListeners);
listenersMap.put(CacheBlocked.class, cacheBlockedListeners);
listenersMap.put(CacheUnblocked.class, cacheUnblockedListeners);
listenersMap.put(NodeCreated.class, nodeCreatedListeners);
listenersMap.put(NodeRemoved.class, nodeRemovedListeners);
listenersMap.put(NodeVisited.class, nodeVisitedListeners);
listenersMap.put(NodeModified.class, nodeModifiedListeners);
listenersMap.put(NodeMoved.class, nodeMovedListeners);
listenersMap.put(NodeActivated.class, nodeActivatedListeners);
listenersMap.put(NodePassivated.class, nodePassivatedListeners);
listenersMap.put(NodeLoaded.class, nodeLoadedListeners);
listenersMap.put(NodeEvicted.class, nodeEvictedListeners);
listenersMap.put(TransactionRegistered.class, transactionRegisteredListeners);
listenersMap.put(TransactionCompleted.class, transactionCompletedListeners);
listenersMap.put(ViewChanged.class, viewChangedListeners);
listenersMap.put(BuddyGroupChanged.class, buddyGroupChangedListeners);
listenersMap.put(NodeInvalidated.class, nodeInvalidatedListeners);
}
@Inject
void injectDependencies(CacheSPI cache, Configuration config)
{
this.cache = cache;
this.config = config;
}
@Stop
void stop()
{
if (syncProcessor != null) syncProcessor.shutdownNow();
if (asyncProcessor != null) asyncProcessor.shutdownNow();
}
@Destroy
void destroy()
{
removeAllCacheListeners();
}
@Start
void start()
{
useMarshalledValueMaps = config.isUseLazyDeserialization();
syncProcessor = new WithinThreadExecutor();
// first try and use an injected executor for async listeners
if ((asyncProcessor = config.getRuntimeConfig().getAsyncCacheListenerExecutor()) == null)
{
// create one if needed
if (config.getListenerAsyncPoolSize() > 0)
{
asyncProcessor = BoundedExecutors.newFixedThreadPool(config.getListenerAsyncPoolSize(), new ThreadFactory()
{
public Thread newThread(Runnable r)
{
return new Thread(r, "AsyncNotifier-" + asyncNotifierThreadNumber.getAndIncrement());
}
}, config.getListenerAsyncQueueSize());
}
else
{
// use the same sync executor
asyncProcessor = syncProcessor;
}
}
}
/**
* Loops through all valid methods on the object passed in, and caches the relevant methods as {@link NotifierImpl.ListenerInvocation}
* for invocation by reflection.
*
* @param listener object to be considered as a listener.
*/
@SuppressWarnings("unchecked")
private void validateAndAddListenerInvocation(Object listener)
{
boolean sync = testListenerClassValidity(listener.getClass());
boolean foundMethods = false;
// now try all methods on the listener for anything that we like. Note that only PUBLIC methods are scanned.
for (Method m : listener.getClass().getMethods())
{
// loop through all valid method annotations
for (int i = 0; i < allowedMethodAnnotations.length; i++)
{
if (m.isAnnotationPresent(allowedMethodAnnotations[i]))
{
testListenerMethodValidity(m, parameterTypes[i], allowedMethodAnnotations[i].getName());
addListenerInvocation(allowedMethodAnnotations[i], new ListenerInvocation(listener, m, sync));
foundMethods = true;
}
}
}
if (!foundMethods && log.isWarnEnabled())
log.warn("Attempted to register listener of class " + listener.getClass() + ", but no valid, public methods annotated with method-level event annotations found! Ignoring listener.");
}
/**
* Tests if a class is properly annotated as a CacheListener and returns whether callbacks on this class should be invoked
* synchronously or asynchronously.
*
* @param listenerClass class to inspect
* @return true if callbacks on this class should use the syncProcessor; false if it should use the asyncProcessor.
*/
private static boolean testListenerClassValidity(Class> listenerClass)
{
CacheListener cl = listenerClass.getAnnotation(CacheListener.class);
if (cl == null)
throw new IncorrectCacheListenerException("Cache listener class MUST be annotated with org.jboss.cache.notifications.annotation.CacheListener");
if (!Modifier.isPublic(listenerClass.getModifiers()))
throw new IncorrectCacheListenerException("Cache listener class MUST be public!");
return cl.sync();
}
private static void testListenerMethodValidity(Method m, Class allowedParameter, String annotationName)
{
if (m.getParameterTypes().length != 1 || !m.getParameterTypes()[0].isAssignableFrom(allowedParameter))
throw new IncorrectCacheListenerException("Methods annotated with " + annotationName + " must accept exactly one parameter, of assignable from type " + allowedParameter.getName());
if (!m.getReturnType().equals(void.class))
throw new IncorrectCacheListenerException("Methods annotated with " + annotationName + " should have a return type of void.");
}
private void addListenerInvocation(Class annotation, ListenerInvocation li)
{
List result = getListenerCollectionForAnnotation(annotation);
result.add(li);
}
public void addCacheListener(Object listener)
{
validateAndAddListenerInvocation(listener);
}
public void removeCacheListener(Object listener)
{
for (Class annotation : allowedMethodAnnotations) removeListenerInvocation(annotation, listener);
}
private void removeListenerInvocation(Class annotation, Object listener)
{
if (listener == null) return;
List l = getListenerCollectionForAnnotation(annotation);
Set