/* * Copyright (c) 2016, 2018, Oracle and/or its affiliates. All rights reserved. * * This program is free software; you can redistribute it and/or modify it under * the terms of the GNU General Public License, version 2.0, as published by the * Free Software Foundation. * * This program is also distributed with certain software (including but not * limited to OpenSSL) that is licensed under separate terms, as designated in a * particular file or component or in included license documentation. The * authors of MySQL hereby grant you an additional permission to link the * program and your derivative works with the separately licensed software that * they have included with MySQL. * * Without limiting anything contained in the foregoing, this file, which is * part of MySQL Connector/J, is also subject to the Universal FOSS Exception, * version 1.0, a copy of which can be found at * http://oss.oracle.com/licenses/universal-foss-exception. * * This program is distributed in the hope that it will be useful, but WITHOUT * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS * FOR A PARTICULAR PURPOSE. See the GNU General Public License, version 2.0, * for more details. * * You should have received a copy of the GNU General Public License along with * this program; if not, write to the Free Software Foundation, Inc., * 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA */ package com.mysql.cj.protocol; import java.nio.ByteBuffer; import java.nio.channels.AsynchronousSocketChannel; import java.nio.channels.CompletionHandler; import java.nio.channels.ReadPendingException; import java.nio.channels.WritePendingException; import java.util.LinkedList; import java.util.Objects; import java.util.Queue; import java.util.concurrent.TimeUnit; /** * A layer over {@link AsynchronousSocketChannel} that serializes all incoming write requests. This means we queue any incoming buffer and don't begin writing * it until the previous buffer has been written fully. All buffers are transmitted atomically with respect to the caller/callback. */ public class SerializingBufferWriter implements CompletionHandler { // TODO make WRITES_AT_ONCE configurable private static int WRITES_AT_ONCE = 200; // Empirical value. Helps improving i/o rate for large number of concurrent asynchronous requests protected AsynchronousSocketChannel channel; /** * Maintain a queue of pending writes. */ private Queue pendingWrites = new LinkedList<>(); /** * Keeps the link between ByteBuffer to be written and the CompletionHandler * object to be invoked for this one write operation. */ private static class ByteBufferWrapper { private ByteBuffer buffer; private CompletionHandler handler = null; ByteBufferWrapper(ByteBuffer buffer, CompletionHandler completionHandler) { this.buffer = buffer; this.handler = completionHandler; } public ByteBuffer getBuffer() { return this.buffer; } public CompletionHandler getHandler() { return this.handler; } } public SerializingBufferWriter(AsynchronousSocketChannel channel) { this.channel = channel; } /** * Initiate a write of the current pending buffers. This method can only be called when no other writes are in progress. This method should be called under * a mutex for this.pendingWrites to prevent concurrent writes to the channel. */ private void initiateWrite() { try { // We must limit the number of buffers which may be sent at once with gathering write because of two reasons: // 1. Operating systems impose a limit on the number of buffers that may be used in an I/O operation, for example default Linux kernel value is 1024. // When the number of buffers exceeds this limit, then the I/O operation is performed with the maximum number of buffers allowed by the operating system. // That slows down the I/O significantly and could even hang it in case of asynchronous I/O when server response can't be read because write operation has drained all available buffers. // 2. With a large number of small asynchronous requests pendingWrites queue is filled much faster than it's freed so that the OS limit can be reached easily. ByteBuffer bufs[] = this.pendingWrites.stream().limit(WRITES_AT_ONCE).map(ByteBufferWrapper::getBuffer).toArray(size -> new ByteBuffer[size]); this.channel.write(bufs, 0, bufs.length, 0L, TimeUnit.MILLISECONDS, null, this); } catch (ReadPendingException | WritePendingException t) { return; } catch (Throwable t) { failed(t, null); } } /** * Queue a buffer to be written to the channel. This method uses a mutex on the buffer list to synchronize for the following cases: *
    *
  • The buffer list becomes empty after we check and miss writing to the channel.
  • *
  • LinkedList is not thread-safe.
  • *
* * @param buf * {@link ByteBuffer} * @param callback * {@link CompletionHandler} */ public void queueBuffer(ByteBuffer buf, CompletionHandler callback) { synchronized (this.pendingWrites) { this.pendingWrites.add(new ByteBufferWrapper(buf, callback)); // if there's no write in progress, we need to initiate a write of this buffer. otherwise the completion of the current write will do it if (this.pendingWrites.size() == 1) { initiateWrite(); } } } /** * Completion handler for channel writes. */ public void completed(Long bytesWritten, Void v) { // collect completed writes to notify after initiating the next write LinkedList> completedWrites = new LinkedList<>(); synchronized (this.pendingWrites) { while (this.pendingWrites.peek() != null && !this.pendingWrites.peek().getBuffer().hasRemaining() && completedWrites.size() < WRITES_AT_ONCE) { completedWrites.add(this.pendingWrites.remove().getHandler()); } // notify handler(s) before initiating write to satisfy ordering guarantees completedWrites.stream().filter(Objects::nonNull).forEach(l -> { // prevent exceptions in handler from blocking other notifications try { l.completed(0L, null); } catch (Throwable ex) { // presumably unexpected, notify so futures don't block try { l.failed(ex, null); } catch (Throwable ex2) { // nothing we can do here ex2.printStackTrace(); // TODO log error normally instead of sysout } } }); if (this.pendingWrites.size() > 0) { initiateWrite(); } } } public void failed(Throwable t, Void v) { // error writing, can't continue try { this.channel.close(); } catch (Exception ex) { } LinkedList> failedWrites = new LinkedList<>(); synchronized (this.pendingWrites) { while (this.pendingWrites.peek() != null) { ByteBufferWrapper bw = this.pendingWrites.remove(); if (bw.getHandler() != null) { failedWrites.add(bw.getHandler()); } } } failedWrites.forEach((CompletionHandler l) -> { try { l.failed(t, null); } catch (Exception ex) { } }); failedWrites.clear(); } /** * Allow overwriting the channel once the writer has been established. Required for SSL/TLS connections when the encryption doesn't start until we send the * capability flag to X Plugin. * * @param channel * {@link AsynchronousSocketChannel} */ public void setChannel(AsynchronousSocketChannel channel) { this.channel = channel; } }