/*
 * JBoss, Home of Professional Open Source.
 * Copyright 2000 - 2008, Red Hat Middleware LLC, and individual contributors
 * as indicated by the @author tags. See the copyright.txt file in the
 * distribution for a full listing of individual contributors.
 *
 * This is free software; you can redistribute it and/or modify it
 * under the terms of the GNU Lesser General Public License as
 * published by the Free Software Foundation; either version 2.1 of
 * the License, or (at your option) any later version.
 *
 * This software is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
 * Lesser General Public License for more details.
 *
 * You should have received a copy of the GNU Lesser General Public
 * License along with this software; if not, write to the Free
 * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
 * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
 */
package org.jboss.cache.marshall;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.jboss.cache.Fqn;
import org.jboss.cache.factories.ComponentRegistry;
import org.jboss.cache.factories.annotations.Inject;
import org.jboss.cache.factories.annotations.Start;
import org.jboss.cache.io.ByteBuffer;
import org.jboss.cache.io.ExposedByteArrayOutputStream;
import org.jboss.cache.util.Util;
import org.jboss.util.stream.MarshalledValueInputStream;

import java.io.ByteArrayInputStream;
import java.io.InputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.util.HashMap;
import java.util.Map;
import java.util.StringTokenizer;

/**
 * A delegate to various other marshallers like {@link org.jboss.cache.marshall.CacheMarshaller200}.
 * This delegating marshaller adds versioning information to the stream when marshalling objects and
 * is able to pick the appropriate marshaller to delegate to based on the versioning information when
 * unmarshalling objects.
 * <p/>
 * Every stream written by this class starts with a two-byte version id (written with
 * {@link ObjectOutputStream#writeShort(int)}), followed by whatever the delegate marshaller writes.
 *
 * @author Manik Surtani (manik AT jboss DOT org)
 * @author Galder Zamarreno
 */
public class VersionAwareMarshaller extends AbstractMarshaller
{
   private static final Log log = LogFactory.getLog(VersionAwareMarshaller.class);

   // Version ids as they appear on the wire: (10 * major) + minor.
   private static final int VERSION_200 = 20;
   private static final int VERSION_210 = 21;
   private static final int VERSION_220 = 22;
   private static final int VERSION_300 = 30;
   private static final int VERSION_310 = 31;
   // Version id written when a marshaller other than the 2.0/2.1 built-ins is configured.
   private static final int CUSTOM_MARSHALLER = 999;

   private ComponentRegistry componentRegistry;

   /**
    * The marshaller used for all outgoing (write) operations.
    */
   Marshaller defaultMarshaller;

   /**
    * Marshallers created or registered so far, keyed by version id.
    */
   final Map<Integer, Marshaller> marshallers = new HashMap<Integer, Marshaller>();

   /**
    * The version id written at the head of every stream produced by this instance.
    */
   private int versionInt;

   /**
    * Receives the component registry that is later used to wire up lazily created marshallers.
    *
    * @param componentRegistry registry used by {@link #getMarshaller(int)} to wire dependencies
    */
   @Inject
   void injectComponents(ComponentRegistry componentRegistry)
   {
      this.componentRegistry = componentRegistry;
   }

   /**
    * Selects the default marshaller and the version id that is written to outgoing streams.
    * <p/>
    * Resolution order:
    * <ol>
    * <li>a marshaller instance supplied by the configuration;</li>
    * <li>a marshaller class name supplied by the configuration, instantiated reflectively;</li>
    * <li>the built-in marshaller matching the configured replication version string.</li>
    * </ol>
    * If the configured class cannot be instantiated a warning is logged and option 3 is used.
    *
    * @throws IllegalArgumentException if option 3 is reached and the replication version string cannot be parsed
    */
   @Start
   public void initReplicationVersions()
   {
      String replVersionString = configuration.getReplVersionString();

      // this will cause the correct marshaller to be created and put in the map of marshallers
      defaultMarshaller = configuration.getMarshaller();
      if (defaultMarshaller == null)
      {
         String marshallerClass = configuration.getMarshallerClass();
         if (marshallerClass != null)
         {
            if (trace)
            {
               log.trace("Cache marshaller implementation specified as " + marshallerClass + ". Overriding any version strings passed in. ");
            }
            try
            {
               defaultMarshaller = (Marshaller) Util.loadClass(marshallerClass).newInstance();
            }
            catch (Exception e)
            {
               // Deliberately best-effort: fall through to the version-string based marshaller below.
               log.warn("Unable to instantiate marshaller of class " + marshallerClass, e);
               log.warn("Falling back to using the default marshaller for version string " + replVersionString);
            }
         }
      }

      if (defaultMarshaller == null)
      {
         // "Rounds down" the replication version passed in to the MINOR version.
         // E.g., 1.4.1.SP3 -> 1.4.0
         versionInt = toMinorVersionInt(replVersionString);
         this.defaultMarshaller = getMarshaller(versionInt);
      }
      else
      {
         if (log.isDebugEnabled()) log.debug("Using the marshaller passed in - " + defaultMarshaller);
         versionInt = getCustomMarshallerVersionInt();
         marshallers.put(versionInt, defaultMarshaller);
      }

      if (log.isDebugEnabled())
      {
         log.debug("Started with version " + replVersionString + " and versionInt " + versionInt);
         log.debug("Using default marshaller class " + this.defaultMarshaller.getClass());
      }
   }

   /**
    * Determines the version id to write when a marshaller has been supplied through configuration.
    *
    * @return {@code VERSION_210} or {@code VERSION_200} if the supplied marshaller is exactly
    *         {@link CacheMarshaller210} or {@link CacheMarshaller200} respectively, otherwise
    *         {@code CUSTOM_MARSHALLER}
    */
   protected int getCustomMarshallerVersionInt()
   {
      if (defaultMarshaller.getClass().equals(CacheMarshaller210.class)) return VERSION_210;
      if (defaultMarshaller.getClass().equals(CacheMarshaller200.class)) return VERSION_200;
      return CUSTOM_MARSHALLER;
   }

   /**
    * Converts versions to known compatible version ids.
    * <p/>
    * Typical return values:
    * <p/>
    * &lt; 1.4.0 = "1"
    * 1.4.x = "14"
    * 1.5.x = "15"
    * 2.0.x = "20"
    * 2.1.x = "21"
    * <p/>
    * etc.
    *
    * @param version dot-separated version string with at most four components (major, minor, micro, patch);
    *                only the first two are parsed and both must be integers
    * @return a version integer
    * @throws IllegalArgumentException if the string is null, has more than four components, or its
    *                                  major/minor components are missing or not integers
    */
   private int toMinorVersionInt(String version)
   {
      try
      {
         StringTokenizer strtok = new StringTokenizer(version, ".");

         // major, minor, micro, patch
         String[] versionComponents = {null, null, null, null};
         int i = 0;
         while (strtok.hasMoreTokens())
         {
            versionComponents[i++] = strtok.nextToken();
         }

         int major = Integer.parseInt(versionComponents[0]);
         int minor = Integer.parseInt(versionComponents[1]);

         return (major > 1 || minor > 3) ? (10 * major) + minor : 1;
      }
      catch (Exception e)
      {
         // Keep the original failure as the cause so the parsing problem can be diagnosed.
         throw new IllegalArgumentException("Unsupported replication version string " + version, e);
      }
   }

   /**
    * Marshalls an object into a byte buffer, prefixed with this instance's version id.
    *
    * @param obj object to marshall
    * @return a buffer over the raw bytes written
    * @throws Exception if the underlying stream or the delegate marshaller fails
    */
   @Override
   public ByteBuffer objectToBuffer(Object obj) throws Exception
   {
      ExposedByteArrayOutputStream baos = new ExposedByteArrayOutputStream(128);
      ObjectOutputStream out = new ObjectOutputStream(baos);

      out.writeShort(versionInt);
      if (trace) log.trace("Wrote version " + versionInt);

      //now marshall the contents of the object
      defaultMarshaller.objectToObjectStream(obj, out);
      out.close();

      // and return bytes.
      return new ByteBuffer(baos.getRawBuffer(), 0, baos.size());
   }

   /**
    * Unmarshalls an object from a region of a byte array, using the marshaller matching the
    * version id found at the head of the data.
    *
    * @param bytes  array holding the marshalled data
    * @param offset index of the first byte to read
    * @param len    number of bytes to read
    * @return the unmarshalled object
    * @throws Exception if the version id cannot be read or the delegate marshaller fails
    */
   @Override
   public Object objectFromByteBuffer(byte[] bytes, int offset, int len) throws Exception
   {
      ObjectInputStream in = new MarshalledValueInputStream(new ByteArrayInputStream(bytes, offset, len));
      Marshaller marshaller = getMarshaller(readVersionId(in));
      return marshaller.objectFromObjectStream(in);
   }

   /**
    * Unmarshalls a {@link RegionalizedMethodCall} from a byte array, using the marshaller matching
    * the version id found at the head of the data.
    *
    * @param buf array holding the marshalled data
    * @return the unmarshalled method call
    * @throws Exception if the version id cannot be read or the delegate marshaller fails
    */
   @Override
   public RegionalizedMethodCall regionalizedMethodCallFromByteBuffer(byte[] buf) throws Exception
   {
      ObjectInputStream in = new MarshalledValueInputStream(new ByteArrayInputStream(buf));
      Marshaller marshaller = getMarshaller(readVersionId(in));
      return marshaller.regionalizedMethodCallFromObjectStream(in);
   }

   /**
    * Unmarshalls an object from an arbitrary input stream.
    * <p/>
    * A {@link ByteArrayInputStream} is drained into a byte array and handed to
    * {@code objectFromByteBuffer}; any other stream is read through an object input stream directly.
    * The supplied stream is not closed by this method.
    *
    * @param is stream to read from
    * @return the unmarshalled object
    * @throws Exception if the version id cannot be read or the delegate marshaller fails
    */
   @Override
   public Object objectFromStream(InputStream is) throws Exception
   {
      if (is instanceof ByteArrayInputStream)
      {
         int avbl = is.available();
         byte[] bytes = new byte[avbl];
         // A ByteArrayInputStream hands over everything it reports as available in one read,
         // so the returned count does not need to be checked here.
         is.read(bytes, 0, avbl);
         return objectFromByteBuffer(bytes);
      }
      else
      {
         // actually attempt to "stream" this stuff.  We need to revert to an old-fashioned Object Input Stream since
         // we don't have a reusable implementation for non-byte-backed streams as yet.
         ObjectInputStream in = new MarshalledValueInputStream(is);
         Marshaller marshaller = getMarshaller(readVersionId(in));
         return marshaller.objectFromObjectStream(in);
      }
   }

   /**
    * Writes this instance's version id followed by the marshalled object for the given region.
    *
    * @param obj    object to marshall
    * @param out    stream to write to; not closed by this method
    * @param region region passed through to the default marshaller
    * @throws Exception if writing or the delegate marshaller fails
    */
   public void objectToObjectStream(Object obj, ObjectOutputStream out, Fqn region) throws Exception
   {
      out.writeShort(versionInt);
      if (trace) log.trace("Wrote version " + versionInt);
      defaultMarshaller.objectToObjectStream(obj, out, region);
   }

   /**
    * Lazily instantiates and loads the relevant marshaller for a given version.
    * <p/>
    * Versions 2.1 and 2.2 share one marshaller, as do versions 3.0 and 3.1. If a marshaller was
    * registered under the custom marshaller id it is returned for that id. Any other unrecognised
    * id logs a warning and is served by the 3.0 marshaller.
    *
    * @param versionId version id as read from (or about to be written to) a stream
    * @return appropriate marshaller for the version requested.
    */
   Marshaller getMarshaller(int versionId)
   {
      if (versionId == CUSTOM_MARSHALLER)
      {
         // Streams written with a configured custom marshaller must be read back with that same
         // marshaller rather than being treated as an unknown version.
         Marshaller custom = marshallers.get(CUSTOM_MARSHALLER);
         if (custom != null) return custom;
      }

      Marshaller marshaller;
      switch (versionId)
      {
         case VERSION_200:
            marshaller = marshallers.get(VERSION_200);
            if (marshaller == null) marshaller = wireAndRegister(VERSION_200, new CacheMarshaller200());
            break;
         case VERSION_220:
         case VERSION_210:
            marshaller = marshallers.get(VERSION_210);
            if (marshaller == null) marshaller = wireAndRegister(VERSION_210, new CacheMarshaller210());
            break;
         default:
            boolean knownVersion = versionId == VERSION_300 || versionId == VERSION_310;
            if (!knownVersion && log.isWarnEnabled())
            {
               log.warn("Unknown replication version [" + versionId + "]. Falling back to the default marshaller installed.");
            }
            marshaller = marshallers.get(VERSION_300);
            if (marshaller == null) marshaller = wireAndRegister(VERSION_300, new CacheMarshaller300());
            break;
      }
      return marshaller;
   }

   /**
    * Writes this instance's version id followed by the marshalled object.
    *
    * @param obj object to marshall
    * @param out stream to write to; not closed by this method
    * @throws Exception if writing or the delegate marshaller fails
    */
   public void objectToObjectStream(Object obj, ObjectOutputStream out) throws Exception
   {
      out.writeShort(versionInt);
      if (trace) log.trace("Wrote version " + versionInt);
      defaultMarshaller.objectToObjectStream(obj, out);
   }

   /**
    * Reads a version id from the stream and delegates unmarshalling to the matching marshaller.
    *
    * @param in stream positioned at the version id; not closed by this method
    * @return the unmarshalled object
    * @throws Exception if the version id cannot be read or the delegate marshaller fails
    */
   public Object objectFromObjectStream(ObjectInputStream in) throws Exception
   {
      Marshaller marshaller = getMarshaller(readVersionId(in));
      return marshaller.objectFromObjectStream(in);
   }

   /**
    * Reads the two-byte version id at the current position of the stream.
    * A failure is logged and then rethrown unchanged.
    */
   private int readVersionId(ObjectInputStream in) throws Exception
   {
      try
      {
         int versionId = in.readShort();
         if (trace) log.trace("Read version " + versionId);
         return versionId;
      }
      catch (Exception e)
      {
         log.error("Unable to read version id from first two bytes of stream, barfing.");
         throw e;
      }
   }

   /**
    * Wires, initialises and registers a freshly created marshaller under the given version id.
    */
   private Marshaller wireAndRegister(int versionKey, AbstractMarshaller am)
   {
      componentRegistry.wireDependencies(am);
      am.init();
      marshallers.put(versionKey, am);
      return am;
   }
}