2 * Copyright (c) 2001-2007 Sun Microsystems, Inc. All rights reserved.
4 * The Sun Project JXTA(TM) Software License
6 * Redistribution and use in source and binary forms, with or without
7 * modification, are permitted provided that the following conditions are met:
9 * 1. Redistributions of source code must retain the above copyright notice,
10 * this list of conditions and the following disclaimer.
12 * 2. Redistributions in binary form must reproduce the above copyright notice,
13 * this list of conditions and the following disclaimer in the documentation
14 * and/or other materials provided with the distribution.
16 * 3. The end-user documentation included with the redistribution, if any, must
17 * include the following acknowledgment: "This product includes software
18 * developed by Sun Microsystems, Inc. for JXTA(TM) technology."
19 * Alternately, this acknowledgment may appear in the software itself, if
20 * and wherever such third-party acknowledgments normally appear.
22 * 4. The names "Sun", "Sun Microsystems, Inc.", "JXTA" and "Project JXTA" must
23 * not be used to endorse or promote products derived from this software
24 * without prior written permission. For written permission, please contact
25 * Project JXTA at http://www.jxta.org.
27 * 5. Products derived from this software may not be called "JXTA", nor may
28 * "JXTA" appear in their name, without prior written permission of Sun.
30 * THIS SOFTWARE IS PROVIDED ``AS IS'' AND ANY EXPRESSED OR IMPLIED WARRANTIES,
31 * INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND
32 * FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL SUN
33 * MICROSYSTEMS OR ITS CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
34 * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
35 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA,
36 * OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
37 * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
38 * NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE,
39 * EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
41 * JXTA is a registered trademark of Sun Microsystems, Inc. in the United
42 * States and other countries.
44 * Please see the license information page at :
45 * <http://www.jxta.org/project/www/license.html> for instructions on use of
46 * the license in source files.
48 * ====================================================================
50 * This software consists of voluntary contributions made by many individuals
51 * on behalf of Project JXTA. For more information on Project JXTA, please see
52 * http://www.jxta.org.
54 * This license is based on the BSD license adopted by the Apache Foundation.
57 package net.jxta.impl.endpoint.tls;
60 import java.io.OutputStream;
61 import java.io.IOException;
63 import java.util.ArrayList;
64 import java.util.Iterator;
65 import java.util.List;
66 import java.util.Vector;
68 import java.util.logging.Level;
69 import net.jxta.logging.Logging;
70 import java.util.logging.Logger;
72 import net.jxta.endpoint.ByteArrayMessageElement;
73 import net.jxta.endpoint.Message;
74 import net.jxta.endpoint.MessageElement;
75 import net.jxta.endpoint.StringMessageElement;
77 import net.jxta.impl.endpoint.tls.TlsConn.HandshakeState;
78 import net.jxta.impl.util.TimeUtils;
82 * Acts as the output for TLS. Accepts ciphertext from TLS and packages it into
83 * messages for sending to the remote. The messages are kept in a retry queue
84 * until the remote peer acknowledges receipt of the message.
86 class JTlsOutputStream extends OutputStream {
91 private static final Logger LOG = Logger.getLogger(JTlsOutputStream.class.getName());
96 * This maximum is only enforced if we have not heard
97 * from the remote for RETRMAXAGE.
99 private static final int MAXRETRQSIZE = 100;
102 * Initial estimated Round Trip Time
104 private static final long initRTT = 1 * TimeUtils.ASECOND;
106 private static final MessageElement RETELT = new StringMessageElement(JTlsDefs.RETR, "TLSRET", null);
109 * Retrans window. When reached, we up the RTO.
111 private static final int RWINDOW = 5;
114 * If true then the stream has been closed.
116 private volatile boolean closed = false;
119 * If true then the stream is being closed.
120 * It means that it still works completely for all messages already
121 * queued, but no new message may be enqueued.
123 private volatile boolean closing = false;
126 * Sequence number of the message we most recently sent out.
128 private volatile int sequenceNumber = 0;
131 * Sequence number of highest sequential ACK.
133 private volatile int maxACK = 0;
136 * Transport we are working for
138 private TlsTransport tp = null;
141 * connection we are working for
143 private TlsConn conn = null;
145 private Retransmitter retransmitter = null;
147 // for retransmission
150 * Average round trip time in milliseconds.
152 private volatile long aveRTT = initRTT;
155 * Number of ACK message received.
157 private int nACKS = 0;
160 * Retry Time Out measured in milliseconds.
162 private volatile long RTO = 0;
165 * Minimum Retry Timeout measured in milliseconds.
167 private volatile long minRTO = initRTT;
170 * Maximum Retry Timeout measured in milliseconds.
172 private volatile long maxRTO = initRTT * 5;
175 * absolute time in milliseconds of last sequential ACK.
177 private volatile long lastACKTime = 0;
180 * absolute time in milliseconds of last SACK based retransmit.
182 private volatile long sackRetransTime = 0;
185 * The collection of messages available for re-transmission.
187 final List<RetrQElt> retrQ = new Vector<RetrQElt>(25, 5);
189 // running average of receipients Input Queue
190 private int nIQTests = 0;
191 private int aveIQSize = 0;
194 * Our estimation of the current free space in the remote input queue.
196 private volatile int mrrIQFreeSpace = 0;
199 * Our estimation of the maximum sise of the remote input queue.
201 private int rmaxQSize = 0;
204 * retrans queue element
206 private static class RetrQElt {
207 int seqnum; // sequence number of this message.
208 long enqueuedAt; // absolute time of original enqueing.
209 volatile Message msg; // the message
210 int marked; // has been marked as retransmission
211 long sentAt; // when this msg was last transmitted
213 public RetrQElt(int seqnum, Message msg) {
214 this.seqnum = seqnum;
216 this.enqueuedAt = TimeUtils.timeNow();
217 this.sentAt = this.enqueuedAt;
222 JTlsOutputStream(TlsTransport tp, TlsConn conn) {
223 this.conn = conn; // TlsConnection.
224 this.tp = tp; // our transport
226 this.RTO = minRTO; // initial RTO
228 // input free queue size
230 this.mrrIQFreeSpace = rmaxQSize;
232 // Init last ACK Time to now
233 this.lastACKTime = TimeUtils.timeNow();
234 this.sackRetransTime = TimeUtils.timeNow();
236 // Start retransmission thread
237 this.retransmitter = new Retransmitter();
243 * <p/>We don't current support linger.
246 public void close() throws IOException {
247 synchronized (this) {
251 synchronized (retrQ) {
258 * indicate that we're in the process of closing. To respect the semantics
259 * of close()/isClosed(), we do not set the closed flag, yet. Instead, we
260 * set the flag "closing", which simply garantees that no new message
262 * This, in combination with getSequenceNumber and getMaxAck, and
263 * waitQevent, enables fine grain control of the tear down process.
265 public void setClosing() {
266 synchronized (retrQ) {
277 public void write(int c) throws IOException {
278 byte[] a = new byte[1];
280 a[0] = (byte) (c & 0xFF);
287 * <p/>We override the write(byte[], offset, length);
288 * method which is called by SSLRecord.send(SSLConn conn)
289 * via tos.writeTo(conn.sock_out), tos a ByteArrayOutputStream
290 * which has buffered the TLS output record in the byte array.
291 * The actual call is write(byte[] b, 0, length);
293 * <p/>We put this TLS record into a msssage element for the output
294 * pipe to send along.
296 * <p/>This is reasonable since in fact, if more than 16K bytes of
297 * application data are sent, then the max TLS Record is a little
298 * larger than 16K bytes including the TLS overhead.
300 * <p/>Therefore, an app. message is N+r TLS Records,
301 * Message length = Nx16K + r, N >= 0, r >= 0,
302 * N > 0 || r > 0 true.
305 public void write(byte[] b, int off, int len) throws IOException {
306 // flag to allow connection closure in finally block
307 // Connection can not be closed when holding a lock on this
308 boolean closeStale = false;
309 // allocate new message
310 Message jmsg = new Message();
314 throw new IOException("stream is closed");
317 throw new IOException("stream is being closed");
320 throw new IllegalArgumentException("buffer is null");
323 if ((off < 0) || (off > b.length) || (len < 0) || ((off + len) > b.length) || ((off + len) < 0)) {
324 throw new IndexOutOfBoundsException();
331 // Copy the data since it will be queued, and caller may
332 // overwrite the same byte[] buffer.
333 byte[] data = new byte[len];
335 System.arraycopy(b, off, data, 0, len);
337 // sync so that writes don't get out of order.
338 synchronized (retrQ) {
339 // add TLS record as element
340 MessageElement ciphertext = new ByteArrayMessageElement(Integer.toString(++sequenceNumber), JTlsDefs.BLOCKS, data
344 jmsg.addMessageElement(JTlsDefs.TLSNameSpace, ciphertext);
346 if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
347 LOG.fine("TLS CT WRITE : seqn#" + sequenceNumber + " length=" + len);
350 // (1) See if the most recent remote input queue size is close to
351 // it's maximum input queue size
352 // Send only if at least 20% or more of the queue is free.
353 // (2) Also, if our retransQ is larger than the remotes inputQ,
354 // wait until we've received an ack.
355 // We assume some msgs are in transit or the remote system buffers
356 // We do not want to overrun the receiver.
357 // (3) We need to release from the loop because of possible deadlocks
358 // EG: retrQ.size() == 0 and mrrIQFreeSpace forces looping
359 // forever because the most recent SACK cleared it, and the receiver
360 // is waiting for more data.
363 int maxwait = Math.min((int) aveRTT, 200);
364 // iterations to wait (max 3, min 1)
365 int waitCt = Math.max(maxwait / 60, 1);
367 // check if the queue has gone dead.
368 if (retrQ.size() > 0) {
369 long inQueue = TimeUtils.toRelativeTimeMillis(TimeUtils.timeNow(), retrQ.get(0).enqueuedAt);
371 if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
372 LOG.fine("write : Retry queue idle for " + inQueue);
375 if (inQueue > tp.RETRMAXAGE) {
376 if (inQueue > (2 * tp.RETRMAXAGE)) {
377 if (Logging.SHOW_INFO && LOG.isLoggable(Level.INFO)) {
378 LOG.info("Closing stale connection " + conn);
380 // SPT - set flag for connection close in finally block
382 throw new IOException("Stale connection closure in progress");
383 } else if (retrQ.size() >= MAXRETRQSIZE) {
384 // if the the queue is "full" and we are long idle, delay new writes forever.
385 waitCt = Integer.MAX_VALUE;
392 while (!closed && ((mrrIQFreeSpace < rmaxQSize / 5) || (retrQ.size() > rmaxQSize))) {
394 // see if max. wait has arrived.
396 if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
397 LOG.fine("write() wait for ACK, maxwait timer expired while enqueuing seqn#" + sequenceNumber);
402 if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
403 LOG.fine("write() wait 60ms for ACK while enqueuing seqn#" + sequenceNumber + "\n\tremote IQ free space = "
404 + mrrIQFreeSpace + "\n\tMIN free space to continue = " + (rmaxQSize / 5) + "" + "\n\tretQ.size()="
408 // Less than 20% free queue space is left. Wait.
411 } catch (InterruptedException ignored) {
412 Thread.interrupted();
416 // place copy on retransmission queue
417 RetrQElt r = new RetrQElt(sequenceNumber, jmsg.clone());
421 if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
422 LOG.fine("Retrans Enqueue added seqn#" + sequenceNumber + " retQ.size()=" + retrQ.size());
426 // Here we will send the message to the transport
427 conn.sendToRemoteTls(jmsg);
428 // assume we have now taken a slot
431 if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
432 LOG.fine("TLS CT SENT : seqn#" + sequenceNumber + " length=" + len);
436 // The retry queue has really gone stale.
439 // in this we close ourself
440 conn.close(HandshakeState.CONNECTIONDEAD);
441 } catch (IOException ignored) {
448 private void calcRTT(long enqueuedAt) {
449 long dt = TimeUtils.toRelativeTimeMillis(TimeUtils.timeNow(), enqueuedAt);
459 aveRTT = ((n * aveRTT) + dt) / (nACKS);
461 // Set retransmission time out: 2.5 x RTT
462 RTO = (aveRTT << 1) + (aveRTT >> 1);
466 RTO = Math.max(RTO, minRTO);
467 RTO = Math.min(RTO, maxRTO);
469 if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
470 LOG.fine("TLS!! RTT = " + dt + "ms aveRTT = " + aveRTT + "ms" + " RTO = " + RTO + "ms");
474 private int calcAVEIQ(int iq) {
479 aveIQSize = ((n * aveIQSize) + iq) / nIQTests;
485 * Process an ACK Message. We remove ACKed messages from the retry queue.
486 * We only acknowledge messages received in sequence.
488 * The seqnum is for the largest unacknowledged seqnum
489 * the receipient has received.
491 * The sackList is a sequence of all of the received
492 * messages in the sender's input Q. All will be sequence numbers higher
493 * than the sequential ACK seqnum.
495 * Recepients are passive and only ack upon the receipt
496 * of an in sequence message.
498 * They depend on our RTO to fill holes in message
501 void ackReceived(int seqnum, int[] sackList) {
502 lastACKTime = TimeUtils.timeNow();
505 // remove acknowledged messages from retrans Q.
507 synchronized (retrQ) {
508 maxACK = Math.max(maxACK, seqnum);
510 // dump the current Retry queue and the SACK list
511 if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
512 StringBuilder dumpRETRQ = new StringBuilder("ACK RECEIVE : " + Integer.toString(seqnum));
514 if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
515 dumpRETRQ.append('\n');
517 dumpRETRQ.append("\tRETRQ (size=" + retrQ.size() + ")");
518 if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
519 dumpRETRQ.append(" : ");
520 for (int y = 0; y < retrQ.size(); y++) {
522 dumpRETRQ.append(", ");
524 RetrQElt r = retrQ.get(y);
526 dumpRETRQ.append(r.seqnum);
529 if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
530 dumpRETRQ.append('\n');
532 dumpRETRQ.append("\tSACKLIST (size=" + sackList.length + ")");
533 if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
534 dumpRETRQ.append(" : ");
535 for (int y = 0; y < sackList.length; y++) {
537 dumpRETRQ.append(", ");
539 dumpRETRQ.append(sackList[y]);
542 LOG.fine(dumpRETRQ.toString());
545 Iterator eachRetryQueueEntry = retrQ.iterator();
547 // First remove monotonically increasing seq#s in retrans vector
548 while (eachRetryQueueEntry.hasNext()) {
549 RetrQElt r = (RetrQElt) eachRetryQueueEntry.next();
551 if (r.seqnum > seqnum) {
552 if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
553 LOG.fine("r.seqnum :" + r.seqnum + " > seqnum :" + seqnum);
559 if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
560 LOG.fine("seqnum :" + seqnum);
561 LOG.fine("Removing :" + r.seqnum + " from retransmit queue");
563 eachRetryQueueEntry.remove();
566 if (0 != r.enqueuedAt) {
567 calcRTT(r.enqueuedAt);
576 // Update last accessed time in response to getting seq acks.
577 if (numberACKed > 0) {
578 conn.lastAccessed = TimeUtils.timeNow();
581 if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
582 LOG.fine("TLS!! SEQUENTIALLY ACKD SEQN = " + seqnum + ", (" + numberACKed + " acked)");
585 // most recent remote IQ free space
586 rmaxQSize = Math.max(rmaxQSize, sackList.length);
587 mrrIQFreeSpace = rmaxQSize - sackList.length;
589 // let's look at average sacs.size(). If it is big, then this
590 // probably means we must back off because the system is slow.
591 // Our retrans Queue can be large and we can overwhelm the
592 // receiver with retransmissions.
593 // We will keep the rwin <= ave real input queue size.
594 int aveIQ = calcAVEIQ(sackList.length);
596 if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
597 LOG.fine("remote IQ free space = " + mrrIQFreeSpace + " remote avg IQ occupancy = " + aveIQ);
602 if (sackList.length > 0) {
603 Iterator eachRetrQElement = retrQ.iterator();
607 while (eachRetrQElement.hasNext()) {
608 RetrQElt r = (RetrQElt) eachRetrQElement.next();
610 while (sackList[currentSACK] < r.seqnum) {
612 if (currentSACK == sackList.length) {
617 if (currentSACK == sackList.length) {
621 if (sackList[currentSACK] == r.seqnum) {
622 eachRetrQElement.remove();
627 // for aveRTT calculation
628 long enqueuetime = r.enqueuedAt;
631 if (enqueuetime != 0) {
632 calcRTT(enqueuetime);
635 if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
636 LOG.fine("TLS!! SACKD SEQN = " + r.seqnum);
645 // Retransmit? Only if there is a hole in the selected
646 // acknowledgement list. Otherwise let RTO deal.
647 // Given that this SACK acknowledged messages still
649 // seqnum is the max consectively SACKD message.
650 // seqnum < r.seqnum means a message has not reached
651 // receiver. EG: sacklist == 10,11,13 seqnum == 11
653 if (seqnum < r.seqnum) {
656 if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
657 LOG.fine("RETR: Fill hole, SACK, seqn#" + r.seqnum + ", Window =" + retrans);
663 if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
664 LOG.fine("TLS!! SELECTIVE ACKD (" + numberACKed + ") " + retrans + " retrans wanted");
667 // retransmit 1 retq mem. only
669 retransmit(Math.min(RWINDOW, retrans), lastACKTime);
670 sackRetransTime = TimeUtils.timeNow();
679 * retransmit unacknowledged messages
681 * @param rwin max number of messages to retransmit
682 * @return number of messages retransmitted.
684 private int retransmit(int rwin, long triggerTime) {
685 List retransMsgs = new ArrayList();
689 // build a list of retries.
690 synchronized (retrQ) {
691 numberToRetrans = Math.min(retrQ.size(), rwin);
693 if (numberToRetrans > 0 && Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
694 LOG.fine("RETRANSMITING [rwindow = " + numberToRetrans + "]");
697 for (int j = 0; j < numberToRetrans; j++) {
698 RetrQElt r = retrQ.get(j);
700 // Mark message as retransmission
701 // need to know if a msg was retr or not for RTT eval
705 // First time: we're here because this message has not arrived, but
706 // the next one has. It may be an out of order message.
707 // Experience shows that such a message rarely arrives older than
708 // 1.2 * aveRTT. Beyond that, it's lost. It is also rare that we
709 // detect a hole within that delay. So, often enough, as soon as
710 // a hole is detected, it's time to resend...but not always.
712 if (TimeUtils.toRelativeTimeMillis(triggerTime, r.sentAt) < (6 * aveRTT) / 5) {
714 // Nothing to worry about, yet.
720 // That one has been retransmitted at least once already.
721 // So, we don't have much of a clue other than the age of the
722 // last transmission. It is unlikely that it arrives before aveRTT/2
723 // but we have to anticipate its loss at the risk of making dupes.
724 // Otherwise the receiver will reach the hole, and that's really
725 // expensive. (Think that we've been trying for a while already.)
727 if (TimeUtils.toRelativeTimeMillis(triggerTime, r.sentAt) < aveRTT) {
729 // Nothing to worry about, yet.
735 // Make a copy to for sending
741 int retransmitted = 0;
743 Iterator eachRetrans = retransMsgs.iterator();
745 while (eachRetrans.hasNext()) {
746 RetrQElt r = (RetrQElt) eachRetrans.next();
748 eachRetrans.remove();
751 if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
752 LOG.fine("TLS!! RETRANSMIT seqn#" + r.seqnum);
755 Message sending = r.msg;
757 // its possible that the message was acked while we were working
758 // in this case r.msg will have been nulled.
759 if (null != sending) {
760 sending = sending.clone();
761 sending.replaceMessageElement(JTlsDefs.TLSNameSpace, RETELT);
762 if (conn.sendToRemoteTls(sending)) {
763 mrrIQFreeSpace--; // assume we have now taken a slot
767 } // don't bother continuing.
769 } catch (IOException e) {
770 if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
771 LOG.log(Level.FINE, "FAILED RETRANS seqn#" + r.seqnum, e);
773 break; // don't bother continuing.
777 if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
778 LOG.fine("RETRANSMITED " + retransmitted + " of " + numberToRetrans);
781 return retransmitted;
785 * Retransmission daemon thread
787 private class Retransmitter implements Runnable {
789 Thread retransmitterThread;
790 volatile int nretransmitted = 0;
793 public Retransmitter() {
795 this.retransmitterThread = new Thread(tp.myThreadGroup, this, "JXTA TLS Retransmiter for " + conn.destAddr);
796 retransmitterThread.setDaemon(true);
797 retransmitterThread.start();
799 if (Logging.SHOW_INFO && LOG.isLoggable(Level.INFO)) {
800 LOG.info("STARTED TLS Retransmit thread, RTO = " + RTO);
804 public int getRetransCount() {
805 return nretransmitted;
817 long conn_idle = TimeUtils.toRelativeTimeMillis(TimeUtils.timeNow(), conn.lastAccessed);
819 if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
820 LOG.fine("RETRANS : " + conn + " idle for " + conn_idle);
823 // check to see if we have not idled out.
824 if (tp.CONNECTION_IDLE_TIMEOUT < conn_idle) {
825 if (Logging.SHOW_INFO && LOG.isLoggable(Level.INFO)) {
826 LOG.info("RETRANS : Shutting down idle connection: " + conn);
830 // the following call eventually closes this stream
831 conn.close(HandshakeState.CONNECTIONDEAD);
832 // Leave. Otherwise we'll be spinning forever
834 } catch (IOException ignored) {
840 synchronized (retrQ) {
843 } catch (InterruptedException e) {
844 Thread.interrupted();
851 // see if we recently did a retransmit triggered by a SACK
852 long sinceLastSACKRetr = TimeUtils.toRelativeTimeMillis(TimeUtils.timeNow(), sackRetransTime);
854 if (sinceLastSACKRetr < RTO) {
855 if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
856 LOG.fine("RETRANS : SACK retrans " + sinceLastSACKRetr + "ms ago");
862 // See how long we've waited since RTO was set
863 long sinceLastACK = TimeUtils.toRelativeTimeMillis(TimeUtils.timeNow(), lastACKTime);
864 long oldestInQueueWait;
866 synchronized (retrQ) {
867 if (retrQ.size() > 0) {
868 oldestInQueueWait = TimeUtils.toRelativeTimeMillis(TimeUtils.timeNow(), retrQ.get(0).enqueuedAt);
870 oldestInQueueWait = 0;
874 if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
875 LOG.fine("RETRANS : Last ACK " + sinceLastACK + "ms ago. Age of oldest in Queue " + oldestInQueueWait + "ms");
878 // see if the queue has gone dead
879 if (oldestInQueueWait > (tp.RETRMAXAGE * 2)) {
880 if (Logging.SHOW_INFO && LOG.isLoggable(Level.INFO)) {
881 LOG.info("RETRANS : Shutting down stale connection: " + conn);
885 conn.close(HandshakeState.CONNECTIONDEAD);
886 // Leave. Otherwise we'll be spinning forever.
888 } catch (IOException ignored) {
894 // get real wait as max of age of oldest in retrQ and
896 long realWait = Math.max(oldestInQueueWait, sinceLastACK);
898 // Retransmit only if RTO has expired.
899 // a. real wait time is longer than RTO
900 // b. oldest message on Q has been there longer
901 // than RTO. This is necessary because we may
902 // have just sent a message, and we do not
903 // want to overrun the receiver. Also, we
904 // do not want to restransmit a message that
905 // has not been idle for the RTO.
906 if ((realWait >= RTO) && (oldestInQueueWait >= RTO)) {
908 if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
909 LOG.fine("RETRANS : RTO RETRANSMISSION [" + RWINDOW + "]");
913 int retransed = retransmit(RWINDOW, TimeUtils.timeNow());
916 nretransmitted += retransed;
918 // number at this RTO
919 nAtThisRTO += retransed;
921 // See if real wait is too long and queue is non-empty
922 // Remote may be dead - double until max.
923 // Double after window restransmitted msgs at this RTO
924 // exceeds the RWINDOW, and we've had no response for
925 // twice the current RTO.
926 if ((retransed > 0) && (realWait >= 2 * RTO) && (nAtThisRTO >= 2 * RWINDOW)) {
927 RTO = (realWait > maxRTO ? maxRTO : 2 * RTO);
931 if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
932 LOG.fine("RETRANS : RETRANSMISSION " + retransed + " retrans " + nAtThisRTO + " at this RTO (" + RTO
933 + ") " + nretransmitted + " total retrans");
938 // reset RTO to min if we are idle
939 if (idleCounter == 2) {
945 if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
946 LOG.fine("RETRANS : IDLE : RTO=" + RTO + " WAIT=" + realWait);
950 } catch (Throwable all) {
951 if (Logging.SHOW_SEVERE && LOG.isLoggable(Level.SEVERE)) {
952 LOG.log(Level.SEVERE, "Uncaught Throwable in thread :" + Thread.currentThread().getName(), all);
955 if (Logging.SHOW_INFO && LOG.isLoggable(Level.INFO)) {
956 LOG.info("STOPPED TLS Retransmit thread");
959 retransmitterThread = null;
960 retransmitter = null;