]> sjero.net Git - linphone/blob - p2pproxy/dependencies-src/jxse-src-2.5/impl/src/net/jxta/impl/endpoint/tls/JTlsOutputStream.java
remove mediastreamer2 and add it as a submodule instead.
[linphone] / p2pproxy / dependencies-src / jxse-src-2.5 / impl / src / net / jxta / impl / endpoint / tls / JTlsOutputStream.java
1 /*
2  * Copyright (c) 2001-2007 Sun Microsystems, Inc.  All rights reserved.
3  *  
4  *  The Sun Project JXTA(TM) Software License
5  *  
6  *  Redistribution and use in source and binary forms, with or without 
7  *  modification, are permitted provided that the following conditions are met:
8  *  
9  *  1. Redistributions of source code must retain the above copyright notice,
10  *     this list of conditions and the following disclaimer.
11  *  
12  *  2. Redistributions in binary form must reproduce the above copyright notice, 
13  *     this list of conditions and the following disclaimer in the documentation 
14  *     and/or other materials provided with the distribution.
15  *  
16  *  3. The end-user documentation included with the redistribution, if any, must 
17  *     include the following acknowledgment: "This product includes software 
18  *     developed by Sun Microsystems, Inc. for JXTA(TM) technology." 
19  *     Alternately, this acknowledgment may appear in the software itself, if 
20  *     and wherever such third-party acknowledgments normally appear.
21  *  
22  *  4. The names "Sun", "Sun Microsystems, Inc.", "JXTA" and "Project JXTA" must 
23  *     not be used to endorse or promote products derived from this software 
24  *     without prior written permission. For written permission, please contact 
25  *     Project JXTA at http://www.jxta.org.
26  *  
27  *  5. Products derived from this software may not be called "JXTA", nor may 
28  *     "JXTA" appear in their name, without prior written permission of Sun.
29  *  
30  *  THIS SOFTWARE IS PROVIDED ``AS IS'' AND ANY EXPRESSED OR IMPLIED WARRANTIES,
31  *  INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND 
32  *  FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL SUN 
33  *  MICROSYSTEMS OR ITS CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, 
34  *  INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT 
35  *  LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, 
36  *  OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF 
37  *  LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING 
38  *  NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, 
39  *  EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
40  *  
41  *  JXTA is a registered trademark of Sun Microsystems, Inc. in the United 
42  *  States and other countries.
43  *  
44  *  Please see the license information page at :
45  *  <http://www.jxta.org/project/www/license.html> for instructions on use of 
46  *  the license in source files.
47  *  
48  *  ====================================================================
49  *  
50  *  This software consists of voluntary contributions made by many individuals 
51  *  on behalf of Project JXTA. For more information on Project JXTA, please see 
52  *  http://www.jxta.org.
53  *  
54  *  This license is based on the BSD license adopted by the Apache Foundation. 
55  */
56
57 package net.jxta.impl.endpoint.tls;
58
59
60 import java.io.OutputStream;
61 import java.io.IOException;
62 import java.net.*;
63 import java.util.ArrayList;
64 import java.util.Iterator;
65 import java.util.List;
66 import java.util.Vector;
67
68 import java.util.logging.Level;
69 import net.jxta.logging.Logging;
70 import java.util.logging.Logger;
71
72 import net.jxta.endpoint.ByteArrayMessageElement;
73 import net.jxta.endpoint.Message;
74 import net.jxta.endpoint.MessageElement;
75 import net.jxta.endpoint.StringMessageElement;
76
77 import net.jxta.impl.endpoint.tls.TlsConn.HandshakeState;
78 import net.jxta.impl.util.TimeUtils;
79
80
81 /**
82  *  Acts as the output for TLS. Accepts ciphertext from TLS and packages it into
83  *  messages for sending to the remote. The messages are kept in a retry queue
84  *  until the remote peer acknowledges receipt of the message.
85  **/
86 class JTlsOutputStream extends OutputStream {
87
88     /**
89      *  Log4J Logger
90      **/
91     private static final Logger LOG = Logger.getLogger(JTlsOutputStream.class.getName());
92
93     // constants
94
95     /**
96      * This maximum is only enforced if we have not heard
97      * from the remote for RETRMAXAGE.
98      **/
99     private static final int MAXRETRQSIZE = 100;
100
101     /**
102      *  Initial estimated Round Trip Time
103      **/
104     private static final long initRTT = 1 * TimeUtils.ASECOND;
105
106     private static final MessageElement RETELT = new StringMessageElement(JTlsDefs.RETR, "TLSRET", null);
107
108     /**
109      * Retrans window. When reached, we up the RTO.
110      **/
111     private static final int RWINDOW = 5;
112
113     /**
114      *  If true then the stream has been closed.
115      **/
116     private volatile boolean closed = false;
117
118     /**
119      * If true then the stream is being closed.
120      * It means that it still works completely for all messages already
121      * queued, but no new message may be enqueued.
122      **/
123     private volatile boolean closing = false;
124
125     /**
126      *  Sequence number of the message we most recently sent out.
127      **/
128     private volatile int sequenceNumber = 0;
129
130     /**
131      *  Sequence number of highest sequential ACK.
132      **/
133     private volatile int maxACK = 0;
134
135     /**
136      *  Transport we are working for
137      **/
138     private TlsTransport tp = null;
139
140     /**
141      *  connection we are working for
142      **/
143     private TlsConn conn = null;
144
145     private Retransmitter retransmitter = null;
146
147     // for retransmission
148
149     /**
150      *  Average round trip time in milliseconds.
151      **/
152     private volatile long aveRTT = initRTT;
153
154     /**
155      *  Number of ACK message received.
156      **/
157     private int nACKS = 0;
158
159     /**
160      *  Retry Time Out measured in milliseconds.
161      **/
162     private volatile long RTO = 0;
163
164     /**
165      *  Minimum Retry Timeout measured in milliseconds.
166      **/
167     private volatile long minRTO = initRTT;
168
169     /**
170      *  Maximum Retry Timeout measured in milliseconds.
171      **/
172     private volatile long maxRTO = initRTT * 5;
173
174     /**
175      *  absolute time in milliseconds of last sequential ACK.
176      **/
177     private volatile long lastACKTime = 0;
178
179     /**
180      *  absolute time in milliseconds of last SACK based retransmit.
181      **/
182     private volatile long sackRetransTime = 0;
183
184     /**
185      *   The collection of messages available for re-transmission.
186      */
187     final List<RetrQElt> retrQ = new Vector<RetrQElt>(25, 5);
188
189     // running average of receipients Input Queue
190     private int nIQTests = 0;
191     private int aveIQSize = 0;
192
193     /**
194      *  Our estimation of the current free space in the remote input queue.
195      **/
196     private volatile int mrrIQFreeSpace = 0;
197
198     /**
199      *  Our estimation of the maximum sise of the remote input queue.
200      **/
201     private int rmaxQSize = 0;
202
203     /**
204      * retrans queue element
205      **/
206     private static class RetrQElt {
207         int seqnum; // sequence number of this message.
208         long enqueuedAt; // absolute time of original enqueing.
209         volatile Message msg; // the message
210         int marked; // has been marked as retransmission
211         long sentAt; // when this msg was last transmitted
212
213         public RetrQElt(int seqnum, Message msg) {
214             this.seqnum = seqnum;
215             this.msg = msg;
216             this.enqueuedAt = TimeUtils.timeNow();
217             this.sentAt = this.enqueuedAt;
218             this.marked = 0;
219         }
220     }
221
222     JTlsOutputStream(TlsTransport tp, TlsConn conn) {
223         this.conn = conn; // TlsConnection.
224         this.tp = tp; // our transport
225
226         this.RTO = minRTO; // initial RTO
227
228         // input free queue size
229         this.rmaxQSize = 20;
230         this.mrrIQFreeSpace = rmaxQSize;
231
232         // Init last ACK Time to now
233         this.lastACKTime = TimeUtils.timeNow();
234         this.sackRetransTime = TimeUtils.timeNow();
235
236         // Start retransmission thread
237         this.retransmitter = new Retransmitter();
238     }
239
240     /**
241      * {@inheritDoc}
242      *
243      *  <p/>We don't current support linger.
244      **/
245     @Override
246     public void close() throws IOException {
247         synchronized (this) {
248             super.close();
249             closed = true;
250         }
251         synchronized (retrQ) {
252             retrQ.notifyAll();
253             retrQ.clear();
254         }
255     }
256
257     /**
258      * indicate that we're in the process of closing. To respect the semantics
259      * of close()/isClosed(), we do not set the closed flag, yet. Instead, we
260      * set the flag "closing", which simply garantees that no new message
261      * will be queued.
262      * This, in combination with getSequenceNumber and getMaxAck, and
263      * waitQevent, enables fine grain control of the tear down process.
264      **/
265     public void setClosing() {
266         synchronized (retrQ) {
267             closing = true;
268             retrQ.clear();
269             retrQ.notifyAll();
270         }
271     }
272
273     /**
274      * {@inheritDoc}
275      **/
276     @Override
277     public void write(int c) throws IOException {
278         byte[] a = new byte[1];
279
280         a[0] = (byte) (c & 0xFF);
281         write(a, 0, 1);
282     }
283
284     /**
285      * {@inheritDoc}
286      *
287      * <p/>We override the write(byte[], offset, length);
288      * method which is called by SSLRecord.send(SSLConn conn)
289      * via tos.writeTo(conn.sock_out), tos a ByteArrayOutputStream
290      * which has buffered the TLS output record in the byte array.
291      * The actual call is write(byte[] b, 0, length);
292      *
293      * <p/>We put this TLS record into a msssage element for the output
294      * pipe to send along.
295      *
296      * <p/>This is reasonable since in fact, if more than 16K bytes of
297      * application data are sent, then the max TLS Record is a little
298      * larger than 16K bytes including the TLS overhead.
299      *
300      * <p/>Therefore, an app. message is N+r TLS Records,
301      * Message length = Nx16K + r, N >= 0, r >= 0,
302      * N > 0 || r > 0 true.
303      **/
304     @Override
305     public void write(byte[] b, int off, int len) throws IOException {
306         // flag to allow connection closure in finally block
307         // Connection can not be closed when holding a lock on this
308         boolean closeStale = false;
309         // allocate new message
310         Message jmsg = new Message();
311
312         try {
313             if (closed) {
314                 throw new IOException("stream is closed");
315             }
316             if (closing) {
317                 throw new IOException("stream is being closed");
318             }
319             if (b == null) {
320                 throw new IllegalArgumentException("buffer is null");
321             }
322
323             if ((off < 0) || (off > b.length) || (len < 0) || ((off + len) > b.length) || ((off + len) < 0)) {
324                 throw new IndexOutOfBoundsException();
325             }
326
327             if (len == 0) {
328                 return;
329             }
330
331             // Copy the data since it will be queued, and caller may
332             // overwrite the same byte[] buffer.
333             byte[] data = new byte[len];
334
335             System.arraycopy(b, off, data, 0, len);
336
337             // sync so that writes don't get out of order.
338             synchronized (retrQ) {
339                 // add TLS record as element
340                 MessageElement ciphertext = new ByteArrayMessageElement(Integer.toString(++sequenceNumber), JTlsDefs.BLOCKS, data
341                         ,
342                         null);
343
344                 jmsg.addMessageElement(JTlsDefs.TLSNameSpace, ciphertext);
345
346                 if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
347                     LOG.fine("TLS CT WRITE : seqn#" + sequenceNumber + " length=" + len);
348                 }
349
350                 // (1)  See if the most recent remote input queue size is close to
351                 // it's maximum input queue size
352                 // Send only if at least 20% or more of the queue is free.
353                 // (2) Also, if our retransQ is larger than the remotes inputQ,
354                 // wait until we've received an ack.
355                 // We assume some msgs are in transit or the remote system buffers
356                 // We do not want to overrun the receiver.
357                 // (3) We need to release from the loop because of possible deadlocks
358                 // EG: retrQ.size() == 0 and mrrIQFreeSpace forces looping
359                 // forever because the most recent SACK cleared it, and the receiver
360                 // is waiting for more data.
361
362                 // max of 200ms wait
363                 int maxwait = Math.min((int) aveRTT, 200);
364                 // iterations to wait (max 3, min 1)
365                 int waitCt = Math.max(maxwait / 60, 1);
366
367                 // check if the queue has gone dead.
368                 if (retrQ.size() > 0) {
369                     long inQueue = TimeUtils.toRelativeTimeMillis(TimeUtils.timeNow(), retrQ.get(0).enqueuedAt);
370
371                     if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
372                         LOG.fine("write : Retry queue idle for " + inQueue);
373                     }
374
375                     if (inQueue > tp.RETRMAXAGE) {
376                         if (inQueue > (2 * tp.RETRMAXAGE)) {
377                             if (Logging.SHOW_INFO && LOG.isLoggable(Level.INFO)) {
378                                 LOG.info("Closing stale connection " + conn);
379                             }
380                             // SPT - set flag for connection close in finally block
381                             closeStale = true;
382                             throw new IOException("Stale connection closure in progress");
383                         } else if (retrQ.size() >= MAXRETRQSIZE) {
384                             // if the the queue is "full" and we are long idle, delay new writes forever.
385                             waitCt = Integer.MAX_VALUE;
386                         }
387                     }
388                 }
389
390                 int i = 0;
391
392                 while (!closed && ((mrrIQFreeSpace < rmaxQSize / 5) || (retrQ.size() > rmaxQSize))) {
393
394                     // see if max. wait has arrived.
395                     if (i++ == waitCt) {
396                         if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
397                             LOG.fine("write() wait for ACK, maxwait timer expired while enqueuing seqn#" + sequenceNumber);
398                         }
399                         break;
400                     }
401
402                     if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
403                         LOG.fine("write() wait 60ms for ACK while enqueuing seqn#" + sequenceNumber + "\n\tremote IQ free space = "
404                                 + mrrIQFreeSpace + "\n\tMIN free space to continue = " + (rmaxQSize / 5) + "" + "\n\tretQ.size()="
405                                 + retrQ.size());
406                     }
407
408                     // Less than 20% free queue space is left. Wait.
409                     try {
410                         retrQ.wait(60);
411                     } catch (InterruptedException ignored) {
412                         Thread.interrupted();
413                     }
414                 }
415
416                 // place copy on retransmission queue
417                 RetrQElt r = new RetrQElt(sequenceNumber, jmsg.clone());
418
419                 retrQ.add(r);
420
421                 if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
422                     LOG.fine("Retrans Enqueue added seqn#" + sequenceNumber + " retQ.size()=" + retrQ.size());
423                 }
424             }
425
426             // Here we will send the message to the transport
427             conn.sendToRemoteTls(jmsg);
428             // assume we have now taken a slot
429             mrrIQFreeSpace--;
430
431             if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
432                 LOG.fine("TLS CT SENT : seqn#" + sequenceNumber + " length=" + len);
433             }
434         } finally {
435             if (closeStale) {
436                 // The retry queue has really gone stale.
437                 try {
438                     setClosing();
439                     // in this we close ourself
440                     conn.close(HandshakeState.CONNECTIONDEAD);
441                 } catch (IOException ignored) {
442                     ;
443                 }
444             }
445         }
446     }
447
448     private void calcRTT(long enqueuedAt) {
449         long dt = TimeUtils.toRelativeTimeMillis(TimeUtils.timeNow(), enqueuedAt);
450
451         if (dt == 0) {
452             dt += 1;
453         }
454
455         int n = nACKS;
456
457         nACKS += 1;
458
459         aveRTT = ((n * aveRTT) + dt) / (nACKS);
460
461         // Set retransmission time out: 2.5 x RTT
462         RTO = (aveRTT << 1) + (aveRTT >> 1);
463
464         // Enforce a min/max
465
466         RTO = Math.max(RTO, minRTO);
467         RTO = Math.min(RTO, maxRTO);
468
469         if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
470             LOG.fine("TLS!! RTT = " + dt + "ms aveRTT = " + aveRTT + "ms" + " RTO = " + RTO + "ms");
471         }
472     }
473
474     private int calcAVEIQ(int iq) {
475         int n = nIQTests;
476
477         nIQTests += 1;
478
479         aveIQSize = ((n * aveIQSize) + iq) / nIQTests;
480
481         return aveIQSize;
482     }
483
484     /**
485      * Process an ACK Message. We remove ACKed messages from the retry queue.
486      * We only acknowledge messages received in sequence.
487      *
488      * The seqnum is for the largest unacknowledged seqnum
489      * the receipient has received.
490      *
491      * The sackList is a sequence of all of the received
492      *    messages in the sender's input Q. All will be sequence numbers higher
493      *    than the sequential ACK seqnum.
494      *
495      *    Recepients are passive and only ack upon the receipt
496      *    of an in sequence message.
497      *
498      *    They depend on our RTO to fill holes in message
499      *   sequences.
500      **/
501     void ackReceived(int seqnum, int[] sackList) {
502         lastACKTime = TimeUtils.timeNow();
503         int numberACKed = 0;
504
505         // remove acknowledged messages from retrans Q.
506
507         synchronized (retrQ) {
508             maxACK = Math.max(maxACK, seqnum);
509
510             // dump the current Retry queue and the SACK list
511             if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
512                 StringBuilder dumpRETRQ = new StringBuilder("ACK RECEIVE : " + Integer.toString(seqnum));
513
514                 if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
515                     dumpRETRQ.append('\n');
516                 }
517                 dumpRETRQ.append("\tRETRQ (size=" + retrQ.size() + ")");
518                 if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
519                     dumpRETRQ.append(" : ");
520                     for (int y = 0; y < retrQ.size(); y++) {
521                         if (0 != y) {
522                             dumpRETRQ.append(", ");
523                         }
524                         RetrQElt r = retrQ.get(y);
525
526                         dumpRETRQ.append(r.seqnum);
527                     }
528                 }
529                 if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
530                     dumpRETRQ.append('\n');
531                 }
532                 dumpRETRQ.append("\tSACKLIST (size=" + sackList.length + ")");
533                 if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
534                     dumpRETRQ.append(" : ");
535                     for (int y = 0; y < sackList.length; y++) {
536                         if (0 != y) {
537                             dumpRETRQ.append(", ");
538                         }
539                         dumpRETRQ.append(sackList[y]);
540                     }
541                 }
542                 LOG.fine(dumpRETRQ.toString());
543             }
544
545             Iterator eachRetryQueueEntry = retrQ.iterator();
546
547             // First remove monotonically increasing seq#s in retrans vector
548             while (eachRetryQueueEntry.hasNext()) {
549                 RetrQElt r = (RetrQElt) eachRetryQueueEntry.next();
550
551                 if (r.seqnum > seqnum) {
552                     if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
553                         LOG.fine("r.seqnum :" + r.seqnum + " > seqnum :" + seqnum);
554                     }
555                     break;
556                 }
557
558                 // Acknowledged
559                 if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
560                     LOG.fine("seqnum :" + seqnum);
561                     LOG.fine("Removing :" + r.seqnum + " from retransmit queue");
562                 }
563                 eachRetryQueueEntry.remove();
564
565                 // Update RTT, RTO
566                 if (0 != r.enqueuedAt) {
567                     calcRTT(r.enqueuedAt);
568                 }
569
570                 r.msg.clear();
571                 r.msg = null;
572                 r = null;
573                 numberACKed++;
574             }
575
576             // Update last accessed time in response to getting seq acks.
577             if (numberACKed > 0) {
578                 conn.lastAccessed = TimeUtils.timeNow();
579             }
580
581             if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
582                 LOG.fine("TLS!! SEQUENTIALLY ACKD SEQN = " + seqnum + ", (" + numberACKed + " acked)");
583             }
584
585             // most recent remote IQ free space
586             rmaxQSize = Math.max(rmaxQSize, sackList.length);
587             mrrIQFreeSpace = rmaxQSize - sackList.length;
588
589             // let's look at average sacs.size(). If it is big, then this
590             // probably means we must back off because the system is slow.
591             // Our retrans Queue can be large and we can overwhelm the
592             // receiver with retransmissions.
593             // We will keep the rwin <= ave real input queue size.
594             int aveIQ = calcAVEIQ(sackList.length);
595
596             if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
597                 LOG.fine("remote IQ free space = " + mrrIQFreeSpace + " remote avg IQ occupancy = " + aveIQ);
598             }
599
600             int retrans = 0;
601
602             if (sackList.length > 0) {
603                 Iterator eachRetrQElement = retrQ.iterator();
604
605                 int currentSACK = 0;
606
607                 while (eachRetrQElement.hasNext()) {
608                     RetrQElt r = (RetrQElt) eachRetrQElement.next();
609
610                     while (sackList[currentSACK] < r.seqnum) {
611                         currentSACK++;
612                         if (currentSACK == sackList.length) {
613                             break;
614                         }
615                     }
616
617                     if (currentSACK == sackList.length) {
618                         break;
619                     }
620
621                     if (sackList[currentSACK] == r.seqnum) {
622                         eachRetrQElement.remove();
623
624                         // ack counter
625                         numberACKed++;
626
627                         // for aveRTT calculation
628                         long enqueuetime = r.enqueuedAt;
629
630                         // Update RTT, RTO
631                         if (enqueuetime != 0) {
632                             calcRTT(enqueuetime);
633                         }
634
635                         if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
636                             LOG.fine("TLS!! SACKD SEQN = " + r.seqnum);
637                         }
638
639                         // GC this stuff
640                         r.msg.clear();
641                         r.msg = null;
642                         r = null;
643
644                     } else {
645                         // Retransmit? Only if there is a hole in the selected
646                         // acknowledgement list. Otherwise let RTO deal.
647                         // Given that this SACK acknowledged messages still
648                         // in the retrQ:
649                         // seqnum is the max consectively SACKD message.
650                         // seqnum < r.seqnum means a message has not reached
651                         // receiver. EG: sacklist == 10,11,13 seqnum == 11
652                         // We retransmit 12.
653                         if (seqnum < r.seqnum) {
654                             retrans++;
655
656                             if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
657                                 LOG.fine("RETR: Fill hole, SACK, seqn#" + r.seqnum + ", Window =" + retrans);
658                             }
659                         }
660                     }
661                 }
662
663                 if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
664                     LOG.fine("TLS!! SELECTIVE ACKD (" + numberACKed + ") " + retrans + " retrans wanted");
665                 }
666
667                 // retransmit 1 retq mem. only
668                 if (retrans > 0) {
669                     retransmit(Math.min(RWINDOW, retrans), lastACKTime);
670                     sackRetransTime = TimeUtils.timeNow();
671                 }
672             }
673
674             retrQ.notify();
675         }
676     }
677
678     /**
679      * retransmit unacknowledged  messages
680      *
681      *  @param rwin max number of messages to retransmit
682      *  @return number of messages retransmitted.
683      **/
684     private int retransmit(int rwin, long triggerTime) {
685         List retransMsgs = new ArrayList();
686
687         int numberToRetrans;
688
689         // build a list of retries.
690         synchronized (retrQ) {
691             numberToRetrans = Math.min(retrQ.size(), rwin);
692
693             if (numberToRetrans > 0 && Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
694                 LOG.fine("RETRANSMITING [rwindow = " + numberToRetrans + "]");
695             }
696
697             for (int j = 0; j < numberToRetrans; j++) {
698                 RetrQElt r = retrQ.get(j);
699
700                 // Mark message as retransmission
701                 // need to know if a msg was retr or not for RTT eval
702
703                 if (r.marked == 0) {
704
705                     // First time: we're here because this message has not arrived, but
706                     // the next one has. It may be an out of order message.
707                     // Experience shows that such a message rarely arrives older than
708                     // 1.2 * aveRTT. Beyond that, it's lost. It is also rare that we
709                     // detect a hole within that delay. So, often enough, as soon as
710                     // a hole is detected, it's time to resend...but not always.
711
712                     if (TimeUtils.toRelativeTimeMillis(triggerTime, r.sentAt) < (6 * aveRTT) / 5) {
713
714                         // Nothing to worry about, yet.
715                         continue;
716                     }
717
718                 } else {
719
720                     // That one has been retransmitted at least once already.
721                     // So, we don't have much of a clue other than the age of the
722                     // last transmission. It is unlikely that it arrives before aveRTT/2
723                     // but we have to anticipate its loss at the risk of making dupes.
724                     // Otherwise the receiver will reach the hole, and that's really
725                     // expensive. (Think that we've been trying for a while already.)
726
727                     if (TimeUtils.toRelativeTimeMillis(triggerTime, r.sentAt) < aveRTT) {
728
729                         // Nothing to worry about, yet.
730                         continue;
731                     }
732                 }
733
734                 r.marked++;
735                 // Make a copy to for sending
736                 retransMsgs.add(r);
737             }
738         }
739
740         // send the retries.
741         int retransmitted = 0;
742
743         Iterator eachRetrans = retransMsgs.iterator();
744
745         while (eachRetrans.hasNext()) {
746             RetrQElt r = (RetrQElt) eachRetrans.next();
747
748             eachRetrans.remove();
749
750             try {
751                 if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
752                     LOG.fine("TLS!! RETRANSMIT seqn#" + r.seqnum);
753                 }
754
755                 Message sending = r.msg;
756
757                 // its possible that the message was acked while we were working
758                 // in this case r.msg will have been nulled.
759                 if (null != sending) {
760                     sending = sending.clone();
761                     sending.replaceMessageElement(JTlsDefs.TLSNameSpace, RETELT);
762                     if (conn.sendToRemoteTls(sending)) {
763                         mrrIQFreeSpace--; // assume we have now taken a slot
764                         retransmitted++;
765                     } else {
766                         break;
767                     } // don't bother continuing.
768                 }
769             } catch (IOException e) {
770                 if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
771                     LOG.log(Level.FINE, "FAILED RETRANS seqn#" + r.seqnum, e);
772                 }
773                 break; // don't bother continuing.
774             }
775         }
776
777         if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
778             LOG.fine("RETRANSMITED " + retransmitted + " of " + numberToRetrans);
779         }
780
781         return retransmitted;
782     }
783
784     /**
785      * Retransmission daemon thread
786      **/
787     private class Retransmitter implements Runnable {
788
789         Thread retransmitterThread;
790         volatile int nretransmitted = 0;
791         int nAtThisRTO = 0;
792
793         public Retransmitter() {
794
795             this.retransmitterThread = new Thread(tp.myThreadGroup, this, "JXTA TLS Retransmiter for " + conn.destAddr);
796             retransmitterThread.setDaemon(true);
797             retransmitterThread.start();
798
799             if (Logging.SHOW_INFO && LOG.isLoggable(Level.INFO)) {
800                 LOG.info("STARTED TLS Retransmit thread, RTO = " + RTO);
801             }
802         }
803
804         public int getRetransCount() {
805             return nretransmitted;
806         }
807
808         /**
809          *  {@inheritDoc]
810          **/
811         public void run() {
812
813             try {
814                 int idleCounter = 0;
815
816                 while (!closed) {
817                     long conn_idle = TimeUtils.toRelativeTimeMillis(TimeUtils.timeNow(), conn.lastAccessed);
818
819                     if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
820                         LOG.fine("RETRANS : " + conn + " idle for " + conn_idle);
821                     }
822
823                     // check to see if we have not idled out.
824                     if (tp.CONNECTION_IDLE_TIMEOUT < conn_idle) {
825                         if (Logging.SHOW_INFO && LOG.isLoggable(Level.INFO)) {
826                             LOG.info("RETRANS : Shutting down idle connection: " + conn);
827                         }
828                         try {
829                             setClosing();
830                             // the following call eventually closes this stream
831                             conn.close(HandshakeState.CONNECTIONDEAD);
832                             // Leave. Otherwise we'll be spinning forever
833                             return;
834                         } catch (IOException ignored) {
835                             ;
836                         }
837                         continue;
838                     }
839
840                     synchronized (retrQ) {
841                         try {
842                             retrQ.wait(RTO);
843                         } catch (InterruptedException e) {
844                             Thread.interrupted();
845                         }
846                     }
847                     if (closed) {
848                         break;
849                     }
850
851                     // see if we recently did a retransmit triggered by a SACK
852                     long sinceLastSACKRetr = TimeUtils.toRelativeTimeMillis(TimeUtils.timeNow(), sackRetransTime);
853
854                     if (sinceLastSACKRetr < RTO) {
855                         if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
856                             LOG.fine("RETRANS : SACK retrans " + sinceLastSACKRetr + "ms ago");
857                         }
858
859                         continue;
860                     }
861
862                     // See how long we've waited since RTO was set
863                     long sinceLastACK = TimeUtils.toRelativeTimeMillis(TimeUtils.timeNow(), lastACKTime);
864                     long oldestInQueueWait;
865
866                     synchronized (retrQ) {
867                         if (retrQ.size() > 0) {
868                             oldestInQueueWait = TimeUtils.toRelativeTimeMillis(TimeUtils.timeNow(), retrQ.get(0).enqueuedAt);
869                         } else {
870                             oldestInQueueWait = 0;
871                         }
872                     }
873
874                     if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
875                         LOG.fine("RETRANS : Last ACK " + sinceLastACK + "ms ago. Age of oldest in Queue " + oldestInQueueWait + "ms");
876                     }
877
878                     // see if the queue has gone dead
879                     if (oldestInQueueWait > (tp.RETRMAXAGE * 2)) {
880                         if (Logging.SHOW_INFO && LOG.isLoggable(Level.INFO)) {
881                             LOG.info("RETRANS : Shutting down stale connection: " + conn);
882                         }
883                         try {
884                             setClosing();
885                             conn.close(HandshakeState.CONNECTIONDEAD);
886                             // Leave. Otherwise we'll be spinning forever.
887                             return;
888                         } catch (IOException ignored) {
889                             ;
890                         }
891                         continue;
892                     }
893
894                     // get real wait as max of age of oldest in retrQ and
895                     // lastAck time
896                     long realWait = Math.max(oldestInQueueWait, sinceLastACK);
897
898                     // Retransmit only if RTO has expired.
899                     // a. real wait time is longer than RTO
900                     // b. oldest message on Q has been there longer
901                     // than RTO. This is necessary because we may
902                     // have just sent a message, and we do not
903                     // want to overrun the receiver. Also, we
904                     // do not want to restransmit a message that
905                     // has not been idle for the RTO.
906                     if ((realWait >= RTO) && (oldestInQueueWait >= RTO)) {
907
908                         if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
909                             LOG.fine("RETRANS : RTO RETRANSMISSION [" + RWINDOW + "]");
910                         }
911
912                         // retrasmit
913                         int retransed = retransmit(RWINDOW, TimeUtils.timeNow());
914
915                         // Total
916                         nretransmitted += retransed;
917
918                         // number at this RTO
919                         nAtThisRTO += retransed;
920
921                         // See if real wait is too long and queue is non-empty
922                         // Remote may be dead - double until max.
923                         // Double after window restransmitted msgs at this RTO
924                         // exceeds the RWINDOW, and we've had no response for
925                         // twice the current RTO.
926                         if ((retransed > 0) && (realWait >= 2 * RTO) && (nAtThisRTO >= 2 * RWINDOW)) {
927                             RTO = (realWait > maxRTO ? maxRTO : 2 * RTO);
928                             nAtThisRTO = 0;
929                         }
930
931                         if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
932                             LOG.fine("RETRANS : RETRANSMISSION " + retransed + " retrans " + nAtThisRTO + " at this RTO (" + RTO
933                                     + ") " + nretransmitted + " total retrans");
934                         }
935                     } else {
936                         idleCounter += 1;
937
938                         // reset RTO to min if we are idle
939                         if (idleCounter == 2) {
940                             RTO = minRTO;
941                             idleCounter = 0;
942                             nAtThisRTO = 0;
943                         }
944
945                         if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
946                             LOG.fine("RETRANS : IDLE : RTO=" + RTO + " WAIT=" + realWait);
947                         }
948                     }
949                 }
950             } catch (Throwable all) {
951                 if (Logging.SHOW_SEVERE && LOG.isLoggable(Level.SEVERE)) {
952                     LOG.log(Level.SEVERE, "Uncaught Throwable in thread :" + Thread.currentThread().getName(), all);
953                 }
954             } finally {
955                 if (Logging.SHOW_INFO && LOG.isLoggable(Level.INFO)) {
956                     LOG.info("STOPPED TLS Retransmit thread");
957                 }
958
959                 retransmitterThread = null;
960                 retransmitter = null;
961             }
962         }
963     }
964 }