/* * Copyright (c) 2002, 2018, Oracle and/or its affiliates. All rights reserved. * * This program is free software; you can redistribute it and/or modify it under * the terms of the GNU General Public License, version 2.0, as published by the * Free Software Foundation. * * This program is also distributed with certain software (including but not * limited to OpenSSL) that is licensed under separate terms, as designated in a * particular file or component or in included license documentation. The * authors of MySQL hereby grant you an additional permission to link the * program and your derivative works with the separately licensed software that * they have included with MySQL. * * Without limiting anything contained in the foregoing, this file, which is * part of MySQL Connector/J, is also subject to the Universal FOSS Exception, * version 1.0, a copy of which can be found at * http://oss.oracle.com/licenses/universal-foss-exception. * * This program is distributed in the hope that it will be useful, but WITHOUT * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS * FOR A PARTICULAR PURPOSE. See the GNU General Public License, version 2.0, * for more details. * * You should have received a copy of the GNU General Public License along with * this program; if not, write to the Free Software Foundation, Inc., * 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA */ package com.mysql.cj.protocol; import java.io.IOException; import java.io.InputStream; import java.util.Arrays; import com.mysql.cj.log.Log; /** * A non-blocking buffered input stream. Reads more if it can, won't block to fill the buffer, only blocks to satisfy a request of read(byte[]) */ public class ReadAheadInputStream extends InputStream { private final static int DEFAULT_BUFFER_SIZE = 4096; private InputStream underlyingStream; private byte buf[]; protected int endOfCurrentData; protected int currentPosition; protected boolean doDebug = false; protected Log log; private void fill(int readAtLeastTheseManyBytes) throws IOException { checkClosed(); this.currentPosition = 0; /* no mark: throw away the buffer */ this.endOfCurrentData = this.currentPosition; // Read at least as many bytes as the caller wants, but don't block to fill the whole buffer (like java.io.BufferdInputStream does) int bytesToRead = Math.min(this.buf.length - this.currentPosition, readAtLeastTheseManyBytes); int bytesAvailable = this.underlyingStream.available(); if (bytesAvailable > bytesToRead) { // Great, there's more available, let's grab those bytes too! (read-ahead) bytesToRead = Math.min(this.buf.length - this.currentPosition, bytesAvailable); } if (this.doDebug) { StringBuilder debugBuf = new StringBuilder(); debugBuf.append(" ReadAheadInputStream.fill("); debugBuf.append(readAtLeastTheseManyBytes); debugBuf.append("), buffer_size="); debugBuf.append(this.buf.length); debugBuf.append(", current_position="); debugBuf.append(this.currentPosition); debugBuf.append(", need to read "); debugBuf.append(Math.min(this.buf.length - this.currentPosition, readAtLeastTheseManyBytes)); debugBuf.append(" bytes to fill request,"); if (bytesAvailable > 0) { debugBuf.append(" underlying InputStream reports "); debugBuf.append(bytesAvailable); debugBuf.append(" total bytes available,"); } debugBuf.append(" attempting to read "); debugBuf.append(bytesToRead); debugBuf.append(" bytes."); if (this.log != null) { this.log.logTrace(debugBuf.toString()); } else { System.err.println(debugBuf.toString()); } } int n = this.underlyingStream.read(this.buf, this.currentPosition, bytesToRead); if (n > 0) { this.endOfCurrentData = n + this.currentPosition; } } private int readFromUnderlyingStreamIfNecessary(byte[] b, int off, int len) throws IOException { checkClosed(); int avail = this.endOfCurrentData - this.currentPosition; if (this.doDebug) { StringBuilder debugBuf = new StringBuilder(); debugBuf.append("ReadAheadInputStream.readIfNecessary("); debugBuf.append(Arrays.toString(b)); debugBuf.append(","); debugBuf.append(off); debugBuf.append(","); debugBuf.append(len); debugBuf.append(")"); if (avail <= 0) { debugBuf.append(" not all data available in buffer, must read from stream"); if (len >= this.buf.length) { debugBuf.append(", amount requested > buffer, returning direct read() from stream"); } } if (this.log != null) { this.log.logTrace(debugBuf.toString()); } else { System.err.println(debugBuf.toString()); } } if (avail <= 0) { if (len >= this.buf.length) { return this.underlyingStream.read(b, off, len); } fill(len); avail = this.endOfCurrentData - this.currentPosition; if (avail <= 0) { return -1; } } int bytesActuallyRead = (avail < len) ? avail : len; System.arraycopy(this.buf, this.currentPosition, b, off, bytesActuallyRead); this.currentPosition += bytesActuallyRead; return bytesActuallyRead; } @Override public synchronized int read(byte b[], int off, int len) throws IOException { checkClosed(); // Check for closed stream if ((off | len | (off + len) | (b.length - (off + len))) < 0) { throw new IndexOutOfBoundsException(); } else if (len == 0) { return 0; } int totalBytesRead = 0; while (true) { int bytesReadThisRound = readFromUnderlyingStreamIfNecessary(b, off + totalBytesRead, len - totalBytesRead); // end-of-stream? if (bytesReadThisRound <= 0) { if (totalBytesRead == 0) { totalBytesRead = bytesReadThisRound; } break; } totalBytesRead += bytesReadThisRound; // Read _at_least_ enough bytes if (totalBytesRead >= len) { break; } // Nothing to read? if (this.underlyingStream.available() <= 0) { break; } } return totalBytesRead; } @Override public int read() throws IOException { checkClosed(); if (this.currentPosition >= this.endOfCurrentData) { fill(1); if (this.currentPosition >= this.endOfCurrentData) { return -1; } } return this.buf[this.currentPosition++] & 0xff; } @Override public int available() throws IOException { checkClosed(); return this.underlyingStream.available() + (this.endOfCurrentData - this.currentPosition); } private void checkClosed() throws IOException { if (this.buf == null) { throw new IOException("Stream closed"); } } public ReadAheadInputStream(InputStream toBuffer, boolean debug, Log logTo) { this(toBuffer, DEFAULT_BUFFER_SIZE, debug, logTo); } public ReadAheadInputStream(InputStream toBuffer, int bufferSize, boolean debug, Log logTo) { this.underlyingStream = toBuffer; this.buf = new byte[bufferSize]; this.doDebug = debug; this.log = logTo; } @Override public void close() throws IOException { if (this.underlyingStream != null) { try { this.underlyingStream.close(); } finally { this.underlyingStream = null; this.buf = null; this.log = null; } } } @Override public boolean markSupported() { return false; } @Override public long skip(long n) throws IOException { checkClosed(); if (n <= 0) { return 0; } long bytesAvailInBuffer = this.endOfCurrentData - this.currentPosition; if (bytesAvailInBuffer <= 0) { fill((int) n); bytesAvailInBuffer = this.endOfCurrentData - this.currentPosition; if (bytesAvailInBuffer <= 0) { return 0; } } long bytesSkipped = (bytesAvailInBuffer < n) ? bytesAvailInBuffer : n; this.currentPosition += bytesSkipped; return bytesSkipped; } }