]> sjero.net Git - linphone/blob - p2pproxy/dependencies-src/jxse-src-2.5/impl/src/net/jxta/impl/util/pipe/reliable/ReliableOutputStream.java
remove mediastreamer2 and add it as a submodule instead.
[linphone] / p2pproxy / dependencies-src / jxse-src-2.5 / impl / src / net / jxta / impl / util / pipe / reliable / ReliableOutputStream.java
1 /*
2  * Copyright (c) 2003-2007 Sun Microsystems, Inc.  All rights reserved.
3  *  
4  *  The Sun Project JXTA(TM) Software License
5  *  
6  *  Redistribution and use in source and binary forms, with or without 
7  *  modification, are permitted provided that the following conditions are met:
8  *  
9  *  1. Redistributions of source code must retain the above copyright notice,
10  *     this list of conditions and the following disclaimer.
11  *  
12  *  2. Redistributions in binary form must reproduce the above copyright notice, 
13  *     this list of conditions and the following disclaimer in the documentation 
14  *     and/or other materials provided with the distribution.
15  *  
16  *  3. The end-user documentation included with the redistribution, if any, must 
17  *     include the following acknowledgment: "This product includes software 
18  *     developed by Sun Microsystems, Inc. for JXTA(TM) technology." 
19  *     Alternately, this acknowledgment may appear in the software itself, if 
20  *     and wherever such third-party acknowledgments normally appear.
21  *  
22  *  4. The names "Sun", "Sun Microsystems, Inc.", "JXTA" and "Project JXTA" must 
23  *     not be used to endorse or promote products derived from this software 
24  *     without prior written permission. For written permission, please contact 
25  *     Project JXTA at http://www.jxta.org.
26  *  
27  *  5. Products derived from this software may not be called "JXTA", nor may 
28  *     "JXTA" appear in their name, without prior written permission of Sun.
29  *  
30  *  THIS SOFTWARE IS PROVIDED ``AS IS'' AND ANY EXPRESSED OR IMPLIED WARRANTIES,
31  *  INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND 
32  *  FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL SUN 
33  *  MICROSYSTEMS OR ITS CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, 
34  *  INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT 
35  *  LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, 
36  *  OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF 
37  *  LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING 
38  *  NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, 
39  *  EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
40  *  
41  *  JXTA is a registered trademark of Sun Microsystems, Inc. in the United 
42  *  States and other countries.
43  *  
44  *  Please see the license information page at :
45  *  <http://www.jxta.org/project/www/license.html> for instructions on use of 
46  *  the license in source files.
47  *  
48  *  ====================================================================
49  *  
50  *  This software consists of voluntary contributions made by many individuals 
51  *  on behalf of Project JXTA. For more information on Project JXTA, please see 
52  *  http://www.jxta.org.
53  *  
54  *  This license is based on the BSD license adopted by the Apache Foundation. 
55  */
56
57 package net.jxta.impl.util.pipe.reliable;
58
59
60 import java.io.ByteArrayOutputStream;
61 import net.jxta.endpoint.ByteArrayMessageElement;
62 import net.jxta.endpoint.Message;
63 import net.jxta.endpoint.MessageElement;
64 import net.jxta.endpoint.StringMessageElement;
65 import net.jxta.endpoint.WireFormatMessage;
66 import net.jxta.endpoint.WireFormatMessageFactory;
67 import net.jxta.impl.util.TimeUtils;
68 import net.jxta.logging.Logging;
69
70 import java.io.DataInputStream;
71 import java.io.IOException;
72 import java.io.OutputStream;
73 import java.util.ArrayList;
74 import java.util.Arrays;
75 import java.util.Iterator;
76 import java.util.List;
77 import java.util.concurrent.atomic.AtomicInteger;
78 import java.util.logging.Level;
79 import java.util.logging.Logger;
80
81
82 /**
83  * Accepts data and packages it into messages for sending to the remote. The
84  * messages are kept in a retry queue until the remote peer acknowledges
85  * receipt of the message.
86  */
87 public class ReliableOutputStream extends OutputStream implements Incoming {
88     
89     /**
90      * Logger
91      */
92     private final static Logger LOG = Logger.getLogger(ReliableOutputStream.class.getName());
93     
94     /**
95      * Initial estimated Round Trip Time
96      */
97     private final static long initRTT = 10 * TimeUtils.ASECOND;
98     
99     /**
100      *  The default size for the blocks we will chunk the stream into.
101      */
102     private final static int DEFAULT_MESSAGE_CHUNK_SIZE = 63 * 1024;
103     
104     private final static MessageElement RETELT = new StringMessageElement(Defs.RETRY_ELEMENT_NAME, Defs.RETRY_ELEMENT_VALUE, null);
105     
106     /**
107      * A lock we use to ensure that write operations happen in order.
108      */
109     private final Object writeLock = new String("writeLock");
110     
111     /**
112      * The buffer we cache writes to.
113      */
114     private byte[] writeBuffer = null;
115     
116     /**
117      * Number of bytes written to the write buffer.
118      */
119     private int writeCount = 0;
120     
121     /**
122      * Set the default write buffer size.
123      */
124     private int writeBufferSize = DEFAULT_MESSAGE_CHUNK_SIZE;
125     
126     /**
127      * Absolute time in milliseconds at which the write buffer began
128      * accumulating bytes to be written.
129      */
130     private long writeBufferAge = Long.MAX_VALUE;
131     
132     /**
133      * If less than {@code TimeUtils.timenow()} then we are closed otherwise
134      * this is the absolute time at which we will become closed. We begin by
135      * setting this value as {@Long.MAX_VALUE} until we establish an earlier
136      * close deadline.
137      */
138     private long closedAt = Long.MAX_VALUE;
139     
140     /**
141      * If {@code true} then we have received a close request from the remote
142      * side. They do not want to receive any more messages from us.
143      */
144     private volatile boolean remoteClosed = false;
145     
146     /**
147      * If {@code true} then we have closed this stream locally and will not
148      * accept any further messages for sending. Unacknowledged messages will
149      * be retransmitted until the linger delay is passed.
150      */
151     private volatile boolean localClosed = false;
152     
153     /**
154      * The relative time in milliseconds that we will allow our connection to
155      * linger.
156      */
157     private long lingerDelay = 120 * TimeUtils.ASECOND;
158     
159     /**
160      * Sequence number of the message we most recently sent out.
161      */
162     private AtomicInteger sequenceNumber = new AtomicInteger(0);
163     
164     /**
165      * Sequence number of highest sequential ACK.
166      */
167     private volatile int maxACK = 0;
168     
169     /**
170      * connection we are working for
171      */
172     private final Outgoing outgoing;
173     
174     /**
175      *  The daemon thread that performs retransmissions.
176      */
177     private Thread retrThread = null;
178     
179     // for retransmission
180     /**
181      * Average round trip time in milliseconds.
182      */
183     private volatile long aveRTT = initRTT;
184     private volatile long remRTT = 0;
185     
186     /**
187      * Has aveRTT been set at least once over its initial guesstimate value.
188      */
189     private boolean aveRTTreset = false;
190     
191     /**
192      * Number of ACK message received.
193      */
194     private AtomicInteger numACKS = new AtomicInteger(0);
195     
196     /**
197      * When to start computing aveRTT
198      */
199     private int rttThreshold = 0;
200     
201     /**
202      * Retry Time Out measured in milliseconds.
203      */
204     private volatile long RTO = 0;
205     
206     /**
207      * Minimum Retry Timeout measured in milliseconds.
208      */
209     private volatile long minRTO = initRTT * 5;
210     
211     /**
212      * Maximum Retry Timeout measured in milliseconds.
213      */
214     private volatile long maxRTO = initRTT * 60;
215     
216     /**
217      * absolute time in milliseconds of last sequential ACK.
218      */
219     private volatile long lastACKTime = 0;
220     
221     /**
222      * absolute time in milliseconds of last SACK based retransmit.
223      */
224     private volatile long sackRetransTime = 0;
225     
226     // running average of receipients Input Queue
227     private int nIQTests = 0;
228     private int aveIQSize = 0;
229     
230     /**
231      * Our estimation of the current free space in the remote input queue.
232      */
233     private volatile int mrrIQFreeSpace = 0;
234     
235     /**
236      * Our estimation of the maximum size of the remote input queue.
237      */
238     private int rmaxQSize = Defs.MAXQUEUESIZE;
239     
240     /**
241      * The flow control module.
242      */
243     private final FlowControl fc;
244     
245     /**
246      * Cache of the last rwindow recommendation by fc.
247      */
248     private volatile int rwindow = 0;
249     
250     /**
251      * retrans queue element
252      */
253     private static class RetrQElt {
254         
255         /**
256          * sequence number of this message.
257          */
258         final int seqnum;
259         
260         /**
261          * the message
262          */
263         final Message msg;
264         
265         /**
266          * absolute time of original enqueuing
267          */
268         final long enqueuedAt;
269         
270         /**
271          * has been marked as retransmission
272          */
273         int marked;
274         
275         /**
276          * absolute time when this msg was last transmitted
277          */
278         long sentAt;
279         
280         /**
281          * Constructor for the RetrQElt object
282          *
283          * @param seqnum sequence number
284          * @param msg    the message
285          */
286         public RetrQElt(int seqnum, Message msg) {
287             this.seqnum = seqnum;
288             this.msg = msg;
289             this.enqueuedAt = TimeUtils.timeNow();
290             this.sentAt = this.enqueuedAt;
291             this.marked = 0;
292         }
293     }
294     
295     /**
296      * The collection of messages available for re-transmission.
297      */
298     protected final List<RetrQElt> retrQ = new ArrayList<RetrQElt>();
299     
300     /**
301      * Constructor for the ReliableOutputStream object
302      *
303      * @param outgoing the outgoing object
304      */
305     public ReliableOutputStream(Outgoing outgoing) {
306         // By default use the old behaviour: fixed fc with a rwin of 20
307         this(outgoing, new FixedFlowControl(20));
308     }
309     
310     /**
311      * Constructor for the ReliableOutputStream object
312      *
313      * @param outgoing the outgoing object
314      * @param fc       flow-control
315      */
316     public ReliableOutputStream(Outgoing outgoing, FlowControl fc) {
317         this.outgoing = outgoing;
318         
319         // initial RTO is set to maxRTO so as to give time
320         // to the receiver to catch-up
321         this.RTO = maxRTO;
322         
323         this.mrrIQFreeSpace = rmaxQSize;
324         this.rttThreshold = rmaxQSize;
325         
326         // Init last ACK Time to now
327         this.lastACKTime = TimeUtils.timeNow();
328         this.sackRetransTime = TimeUtils.timeNow();
329         
330         // Attach the flowControl module
331         this.fc = fc;
332         
333         // Update our initial rwindow to reflect fc's initial value
334         this.rwindow = fc.getRwindow();
335     }
336     
337     /**
338      * {@inheritDoc}
339      */
340     @Override
341     public void close() throws IOException {
342         flush();
343         
344         super.close();
345         localClosed = true;
346         closedAt = TimeUtils.toRelativeTimeMillis(lingerDelay);
347         
348         synchronized (retrQ) {
349             retrQ.notifyAll();
350         }
351         
352         if (Logging.SHOW_INFO && LOG.isLoggable(Level.INFO)) {
353             LOG.info("Closed.");
354         }
355     }
356     
357     public long getLingerDelay() {
358         return lingerDelay;
359     }
360     
361     public void setLingerDelay(long linger) {
362         if (linger < 0) {
363             throw new IllegalArgumentException("Linger delay may not be negative.");
364         }
365         
366         if (0 == linger) {
367             linger = Long.MAX_VALUE;
368         }
369         
370         lingerDelay = linger;
371     }
372     
373     /**
374      * Return the size of the buffers we are using for accumulating writes.
375      *
376      * @return size of our write buffers.
377      */
378     public int setSendBufferSize() {
379         return writeBufferSize;
380     }
381     
382     /**
383      * Set the size of the buffers we will use for accumulating writes.
384      *
385      * @param size The desired size of write buffers.
386      * @throws IOException if an I/O error occurs. In particular, an IOException is thrown if the output stream is closed.
387      */
388     public void setSendBufferSize(int size) throws IOException {
389         if (size <= 0) {
390             throw new IllegalArgumentException("Send buffer size may not be <= 0");
391         }
392         
393         // Flush any existing buffered writes. Then next write will use the new buffer size.
394         synchronized (writeLock) {
395             flushBuffer();
396             writeBufferSize = size;
397         }
398     }
399     
400     /**
401      * We have received a close request from the remote peer. We must stop
402      * retransmissions immediately.
403      */
404     public void hardClose() {
405         remoteClosed = true;
406         closedAt = TimeUtils.timeNow();
407         
408         // Clear the retry queue. Remote side doesn't care.
409         synchronized (retrQ) {
410             retrQ.clear();
411             retrQ.notifyAll();
412         }
413         
414         // Clear the write queue. Remote side doesn't care.
415         synchronized (writeLock) {
416             writeCount = 0;
417             writeBuffer = null;
418             writeBufferAge = Long.MAX_VALUE;
419         }
420         
421         if (Logging.SHOW_INFO && LOG.isLoggable(Level.INFO)) {
422             LOG.info("Hard closed.");
423         }
424     }
425     
426     /**
427      * Returns the state of the stream
428      *
429      * @return true if closed
430      */
431     public boolean isClosed() {
432         return localClosed || remoteClosed;
433     }
434     
435     /**
436      * {@inheritDoc}
437      */
438     @Override
439     public void flush() throws IOException {
440         synchronized (writeLock) {
441             flushBuffer();
442         }
443     }
444     
445     /**
446      * {@inheritDoc}
447      */
448     @Override
449     public void write(int b) throws IOException {
450         write(new byte[] { (byte) b }, 0, 1);
451     }
452     
453     /**
454      * {@inheritDoc}
455      */
456     @Override
457     public void write(byte b[], int off, int len) throws IOException {
458         synchronized (writeLock) {
459             if (isClosed()) {
460                 throw new IOException("stream is closed");
461             }
462             
463             if ((off < 0) || (off > b.length) || (len < 0) || ((off + len) > b.length) || ((off + len) < 0)) {
464                 throw new IndexOutOfBoundsException();
465             }
466             
467             if (len == 0) {
468                 return;
469             }
470             
471             int current = off;
472             int end = current + len;
473             
474             while (current < end) {
475                 if (0 == writeCount) {
476                     // No bytes written? We need a new buffer.
477                     writeBuffer = new byte[writeBufferSize];
478                     writeBufferAge = TimeUtils.timeNow();
479                 }
480                 
481                 int remain = end - current;
482                 
483                 int available = writeBuffer.length - writeCount;
484                 int copy = Math.min(available, remain);
485                 
486                 System.arraycopy(b, current, writeBuffer, writeCount, copy);
487                 writeCount += copy;
488                 current += copy;
489                 
490                 if (writeBuffer.length == writeCount) {
491                     flushBuffer();
492                 }
493             }
494         }
495     }
496     
497     /**
498      * Flush the internal buffer. {@code writeLock} must have been previously
499      * acquired.
500      * @throws IOException if an I/O error occurs. In particular, an IOException is thrown if the output stream is closed.
501      */
502     private void flushBuffer() throws IOException {
503         if (writeCount > 0) {
504             // send the message
505             try {
506                 writeBuffer(writeBuffer, 0, writeCount);
507             } finally {
508                 writeCount = 0;
509                 writeBuffer = null;
510                 writeBufferAge = Long.MAX_VALUE;
511             }
512         }
513     }
514     
515     /**
516      * Write the internal buffer. {@code writeLock} must have been previously
517      * acquired.
518      *
519      * @param b data
520      * @param off  the start offset in the data.
521      * @param len     the number of bytes to write.
522      * @throws IOException if an I/O error occurs. In particular, an IOException is thrown if the output stream is closed.
523      */
524     private void writeBuffer(byte[] b, int off, int len) throws IOException {
525         if ((off < 0) || (off > b.length) || (len < 0) || ((off + len) > b.length) || ((off + len) < 0)) {
526             throw new IndexOutOfBoundsException();
527         }
528         
529         if (len == 0) {
530             return;
531         }
532         
533         if (null == retrThread) {
534             retrThread = new Thread(new Retransmitter(), "JXTA Reliable Retransmiter for " + this);
535             retrThread.setDaemon(true);
536             retrThread.start();
537         }
538         
539         // allocate new message
540         Message jmsg = new Message();
541
542         synchronized (retrQ) {
543             while (true) {
544                 if (isClosed()) {
545                     throw new IOException("Connection is " + (localClosed ? "closing" : "closed"));
546                 }
547                 if (retrQ.size() > Math.min(rwindow, mrrIQFreeSpace * 2)) {
548                     try {
549                         retrQ.wait(1000);
550                     } catch (InterruptedException ignored) {// ignored
551                     }
552                     continue;
553                 }
554                 break;
555             }
556             
557             int sequenceToUse = sequenceNumber.incrementAndGet();
558             MessageElement element = new ByteArrayMessageElement(Integer.toString(sequenceToUse), Defs.MIME_TYPE_BLOCK, b, off
559                     ,
560                     len, null);
561
562             jmsg.addMessageElement(Defs.NAMESPACE, element);
563             RetrQElt retrQel = new RetrQElt(sequenceToUse, jmsg.clone());
564             
565             if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
566                 LOG.fine("Reliable WRITE : seqn#" + sequenceNumber + " length=" + len);
567             }
568             
569             // place copy on retransmission queue
570             retrQ.add(retrQel);
571             if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
572                 LOG.fine("Retrans Enqueue added seqn#" + sequenceNumber + " retrQ.size()=" + retrQ.size());
573             }
574         }
575         
576         outgoing.send(jmsg);
577         mrrIQFreeSpace--;
578         // assume we have now taken a slot
579         if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
580             LOG.fine("SENT : seqn#" + sequenceNumber + " length=" + len);
581         }
582     }
583     
584     /**
585      * Serialize a JXTA message as a reliable message.
586      *
587      * <p/>This method bypasses the built-in buffering and ignores the MTU size.
588      *
589      * @param msg message to send
590      * @return message sequence number
591      * @throws IOException if an I/O error occurs
592      */
593     public int send(Message msg) throws IOException {
594         WireFormatMessage msgSerialized = WireFormatMessageFactory.toWire(msg, Defs.MIME_TYPE_MSG, null);
595         ByteArrayOutputStream baos = new ByteArrayOutputStream((int) msgSerialized.getByteLength());
596
597         msgSerialized.sendToStream(baos);
598         baos.close();
599         byte[] bytes = baos.toByteArray();
600         
601         synchronized (writeLock) {
602             flushBuffer();
603             writeBuffer(bytes, 0, bytes.length);
604             return sequenceNumber.get();
605         }
606     }
607     
608     /**
609      * Gets the maxAck attribute of the ReliableOutputStream object
610      *
611      * @return The maxAck value
612      */
613     public int getMaxAck() {
614         return maxACK;
615     }
616     
617     /**
618      * Gets the seqNumber attribute of the ReliableOutputStream object
619      *
620      * @return The seqNumber value
621      */
622     public int getSeqNumber() {
623         return sequenceNumber.get();
624     }
625     
626     /**
627      * Gets the queueFull attribute of the ReliableOutputStream object
628      *
629      * @return The queueFull value
630      */
631     protected boolean isQueueFull() {
632         return mrrIQFreeSpace < 1;
633     }
634     
635     /**
636      * Gets the queueEmpty attribute of the ReliableOutputStream object.
637      *
638      * @return {@code true} if the queue is empty otherwise {@code false}.
639      */
640     public boolean isQueueEmpty() {
641         synchronized (retrQ) {
642             return retrQ.isEmpty();
643         }
644     }
645     
646     /**
647      * Waits for the retransmit queue to become empty.
648      *
649      * @param timeout The relative time in milliseconds to wait for the queue to
650      *                become empty.
651      * @return {@code true} if the queue is empty otherwise {@code false}.
652      * @throws InterruptedException if interrupted
653      */
654     public boolean waitQueueEmpty(long timeout) throws InterruptedException {
655         long timeoutAt = TimeUtils.toAbsoluteTimeMillis(timeout);
656         
657         synchronized (retrQ) {
658             while (!retrQ.isEmpty() && (TimeUtils.timeNow() < timeoutAt)) {
659                 long sleepTime = TimeUtils.toRelativeTimeMillis(timeoutAt);
660                 
661                 if (sleepTime > 0) {
662                     retrQ.wait(sleepTime);
663                 }
664             }
665             
666             return retrQ.isEmpty();
667         }
668     }
669     
670     /**
671      * wait for activity on the retry queue
672      *
673      * @param timeout timeout in millis
674      * @throws InterruptedException when interrupted
675      */
676     public void waitQueueEvent(long timeout) throws InterruptedException {
677         synchronized (retrQ) {
678             retrQ.wait(timeout);
679         }
680     }
681     
682     /**
683      * Calculates a message retransmission time-out
684      *
685      * @param dt        base time
686      * @param msgSeqNum Message sequence number
687      */
688     private void calcRTT(long dt, int msgSeqNum) {
689         
690         if (numACKS.incrementAndGet() == 1) {
691             // First ACK arrived. We can start computing aveRTT on the messages
692             // we send from now on.
693             rttThreshold = sequenceNumber.get() + 1;
694         }
695         
696         if (msgSeqNum > rttThreshold) {
697             // Compute only when it has stabilized a bit
698             // Since the initial mrrIQFreeSpace is small; the first few
699             // messages will be sent early on and may wait a long time
700             // for the return channel to initialize. After that things
701             // start flowing and RTT becomes relevant.
702             // Carefull with the computation: integer division with round-down
703             // causes cumulative damage: the ave never goes up if this is not
704             // taken care of. We keep the reminder from one round to the other.
705             
706             if (!aveRTTreset) {
707                 aveRTT = dt;
708                 aveRTTreset = true;
709             } else {
710                 long tmp = (8 * aveRTT) + ((8 * remRTT) / 9) + dt;
711
712                 aveRTT = tmp / 9;
713                 remRTT = tmp - aveRTT * 9;
714             }
715         }
716         
717         // Set retransmission time out: 2.5 x RTT
718         // RTO = (aveRTT << 1) + (aveRTT >> 1);
719         RTO = aveRTT * 2;
720         
721         // Enforce a min/max
722         RTO = Math.max(RTO, minRTO);
723         RTO = Math.min(RTO, maxRTO);
724         
725         if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
726             LOG.fine("RTT = " + dt + "ms aveRTT = " + aveRTT + "ms" + " RTO = " + RTO + "ms");
727         }
728     }
729     
730     /**
731      * @param iq Description of the Parameter
732      * @return Description of the Return Value
733      */
734     private int calcAVEIQ(int iq) {
735         int n = nIQTests;
736
737         nIQTests += 1;
738         aveIQSize = ((n * aveIQSize) + iq) / nIQTests;
739         return aveIQSize;
740     }
741     
742     /**
743      * process an incoming message
744      *
745      * @param msg message to process
746      */
747     public void recv(Message msg) {
748         
749         Iterator<MessageElement> eachACK = msg.getMessageElements(Defs.NAMESPACE, Defs.MIME_TYPE_ACK);
750         
751         while (eachACK.hasNext()) {
752             MessageElement elt = eachACK.next();
753
754             eachACK.remove();
755             int sackCount = ((int) elt.getByteLength() / 4) - 1;
756             
757             try {
758                 DataInputStream dis = new DataInputStream(elt.getStream());
759                 int seqack = dis.readInt();
760                 int[] sacs = new int[sackCount];
761
762                 for (int eachSac = 0; eachSac < sackCount; eachSac++) {
763                     sacs[eachSac] = dis.readInt();
764                 }
765                 Arrays.sort(sacs);
766                 // take care of the ACK here;
767                 ackReceived(seqack, sacs);
768             } catch (IOException failed) {
769                 if (Logging.SHOW_WARNING && LOG.isLoggable(Level.WARNING)) {
770                     LOG.log(Level.WARNING, "Failure processing ACK", failed);
771                 }
772             }
773         }
774     }
775     
776     /**
777      * Process an ACK Message. We remove ACKed
778      * messages from the retry queue.  We only
779      * acknowledge messages received in sequence.
780      * <p/>
781      * The seqnum is for the largest unacknowledged seqnum
782      * the recipient has received.
783      * <p/>
784      * The sackList is a sequence of all of the
785      * received messages in the sender's input Q. All
786      * will be sequence numbers higher than the
787      * sequential ACK seqnum.
788      * <p/>
789      * Recipients are passive and only ack upon the
790      * receipt of an in sequence message.
791      * <p/>
792      * They depend on our RTO to fill holes in message
793      * sequences.
794      *
795      * @param seqnum   message sequence number
796      * @param sackList array of message sequence numbers
797      */
798     public void ackReceived(int seqnum, int[] sackList) {
799         
800         int numberACKed = 0;
801         long rttCalcDt = 0;
802         int rttCalcSeqnum = -1;
803         long fallBackDt = 0;
804         int fallBackSeqnum = -1;
805         
806         // remove acknowledged messages from retrans Q.
807         synchronized (retrQ) {
808             lastACKTime = TimeUtils.timeNow();
809             fc.ackEventBegin();
810             maxACK = Math.max(maxACK, seqnum);
811             
812             // dump the current Retry queue and the SACK list
813             if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
814                 StringBuilder dumpRETRQ = new StringBuilder("ACK RECEIVE : " + Integer.toString(seqnum));
815                 
816                 if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
817                     dumpRETRQ.append('\n');
818                 }
819                 dumpRETRQ.append("\tRETRQ (size=").append(retrQ.size()).append(")");
820                 if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
821                     dumpRETRQ.append(" : ");
822                     for (int y = 0; y < retrQ.size(); y++) {
823                         if (0 != y) {
824                             dumpRETRQ.append(", ");
825                         }
826                         RetrQElt r = retrQ.get(y);
827                         
828                         dumpRETRQ.append(r.seqnum);
829                     }
830                 }
831                 if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
832                     dumpRETRQ.append('\n');
833                 }
834                 dumpRETRQ.append("\tSACKLIST (size=").append(sackList.length).append(")");
835                 if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
836                     dumpRETRQ.append(" : ");
837                     for (int y = 0; y < sackList.length; y++) {
838                         if (0 != y) {
839                             dumpRETRQ.append(", ");
840                         }
841                         dumpRETRQ.append(sackList[y]);
842                     }
843                 }
844                 LOG.fine(dumpRETRQ.toString());
845             }
846             
847             Iterator<RetrQElt> eachRetryQueueEntry = retrQ.iterator();
848
849             // First remove monotonically increasing seq#s in retrans vector
850             while (eachRetryQueueEntry.hasNext()) {
851                 RetrQElt retrQElt = eachRetryQueueEntry.next();
852
853                 if (retrQElt.seqnum > seqnum) {
854                     break;
855                 }
856                 // Acknowledged
857                 eachRetryQueueEntry.remove();
858                 
859                 // Update RTT, RTO. Use only those that where acked
860                 // w/o retrans otherwise the number may be phony (ack
861                 // of first xmit received just after resending => RTT
862                 // seems small).  Also, we keep the worst of the bunch
863                 // we encounter.  If we really can't find a single
864                 // non-resent message, we make do with a pessimistic
865                 // approximation: we must not be left behind with an
866                 // RTT that's too short, we'd keep resending like
867                 // crazy.
868                 long enqueuetime = retrQElt.enqueuedAt;
869                 long dt = TimeUtils.toRelativeTimeMillis(lastACKTime, enqueuetime);
870
871                 // Update RTT, RTO
872                 if (retrQElt.marked == 0) {
873                     if (dt > rttCalcDt) {
874                         rttCalcDt = dt;
875                         rttCalcSeqnum = retrQElt.seqnum;
876                     }
877                 } else {
878                     // In case we find no good candidate, make
879                     // a guess by dividing by the number of attempts
880                     // and keep the worst of them too. Since we
881                     // know it may be too short, we will not use it
882                     // if shortens rtt.
883                     dt /= (retrQElt.marked + 1);
884                     if (dt > fallBackDt) {
885                         fallBackDt = dt;
886                         fallBackSeqnum = retrQElt.seqnum;
887                     }
888                 }
889                 fc.packetACKed(retrQElt.seqnum);
890                 retrQElt = null;
891                 numberACKed++;
892             }
893             // Update last accessed time in response to getting seq acks.
894             if (numberACKed > 0) {
895                 outgoing.setLastAccessed(TimeUtils.timeNow());
896             }
897             if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
898                 LOG.fine("SEQUENTIALLY ACKD SEQN = " + seqnum + ", (" + numberACKed + " acked)");
899             }
900             // most recent remote IQ free space
901             mrrIQFreeSpace = rmaxQSize - sackList.length;
902             // let's look at average sacs.size(). If it is big, then this
903             // probably means we must back off because the system is slow.
904             // Our retrans Queue can be large and we can overwhelm the
905             // receiver with retransmissions.
906             // We will keep the rwin <= ave real input queue size.
907             int aveIQ = calcAVEIQ(sackList.length);
908
909             if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
910                 LOG.fine("remote IQ free space = " + mrrIQFreeSpace + " remote avg IQ occupancy = " + aveIQ);
911             }
912             
913             int retrans = 0;
914
915             if (sackList.length > 0) {
916                 Iterator<RetrQElt> eachRetrQElement = retrQ.iterator();
917                 int currentSACK = 0;
918
919                 while (eachRetrQElement.hasNext()) {
920                     RetrQElt retrQElt = eachRetrQElement.next();
921
922                     while (sackList[currentSACK] < retrQElt.seqnum) {
923                         currentSACK++;
924                         if (currentSACK == sackList.length) {
925                             break;
926                         }
927                     }
928                     if (currentSACK == sackList.length) {
929                         break;
930                     }
931                     if (sackList[currentSACK] == retrQElt.seqnum) {
932                         fc.packetACKed(retrQElt.seqnum);
933                         numberACKed++;
934                         eachRetrQElement.remove();
935                         
936                         // Update RTT, RTO. Use only those that where acked w/o retrans
937                         // otherwise the number is completely phony.
938                         // Also, we keep the worst of the bunch we encounter.
939                         long enqueuetime = retrQElt.enqueuedAt;
940                         long dt = TimeUtils.toRelativeTimeMillis(lastACKTime, enqueuetime);
941
942                         // Update RTT, RTO
943                         if (retrQElt.marked == 0) {
944                             if (dt > rttCalcDt) {
945                                 rttCalcDt = dt;
946                                 rttCalcSeqnum = retrQElt.seqnum;
947                             }
948                         } else {
949                             // In case we find no good candidate, make
950                             // a guess by dividing by the number of attempts
951                             // and keep the worst of them too. Since we
952                             // know it may be too short, we will not use it
953                             // if shortens rtt.
954                             dt /= (retrQElt.marked + 1);
955                             if (dt > fallBackDt) {
956                                 fallBackDt = dt;
957                                 fallBackSeqnum = retrQElt.seqnum;
958                             }
959                         }
960                         if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
961                             LOG.fine("SACKD SEQN = " + retrQElt.seqnum);
962                         }
963                         
964                         // GC this stuff
965                         retrQElt = null;
966                         
967                     } else {
968                         // Retransmit? Only if there is a hole in the selected
969                         // acknowledgement list. Otherwise let RTO deal.
970                         
971                         // Given that this SACK acknowledged messages still
972                         // in the retrQ:
973                         // seqnum is the max consectively SACKD message.
974                         // seqnum < retrQElt.seqnum means a message has not reached
975                         // receiver. EG: sacklist == 10,11,13 seqnum == 11
976                         // We retransmit 12.
977                         if (seqnum < retrQElt.seqnum) {
978                             fc.packetMissing(retrQElt.seqnum);
979                             retrans++;
980                             if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
981                                 LOG.fine("RETR: Fill hole, SACK, seqn#" + retrQElt.seqnum + ", Window =" + retrans);
982                             }
983                         }
984                     }
985                 }
986                 
987                 if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
988                     LOG.fine("SELECTIVE ACKD (" + numberACKed + ") " + retrans + " retrans wanted");
989                 }
990             }
991             
992             // Compute aveRTT on the most representative message,
993             // if any. That's the most accurate data.
994             // Failing that we use the fall back, provided that it not
995             // more recent than aveRTT ago - that would decrease aveRTT
996             // and in the absence of solid data, we do not want to take
997             // that risk.
998             if (rttCalcSeqnum != -1) {
999                 calcRTT(rttCalcDt, rttCalcSeqnum);
1000                 // get fc to recompute rwindow
1001                 rwindow = fc.ackEventEnd(rmaxQSize, aveRTT, rttCalcDt);
1002             } else if ((fallBackSeqnum != -1) && (fallBackDt > aveRTT)) {
1003                 calcRTT(fallBackDt, fallBackSeqnum);
1004                 // get fc to recompute rwindow
1005                 rwindow = fc.ackEventEnd(rmaxQSize, aveRTT, fallBackDt);
1006             }
1007             retrQ.notifyAll();
1008         }
1009     }
1010     
1011     /**
1012      * retransmit unacknowledged  messages
1013      *
1014      * @param rwin        max number of messages to retransmit
1015      * @param triggerTime base time
1016      * @return number of messages retransmitted.
1017      */
1018     private int retransmit(int rwin, long triggerTime) {
1019         
1020         List<RetrQElt> retransMsgs = new ArrayList<RetrQElt>();
1021         
1022         int numberToRetrans;
1023         
1024         // build a list of retries.
1025         synchronized (retrQ) {
1026             numberToRetrans = Math.min(retrQ.size(), rwin);
1027             if (numberToRetrans > 0 && Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
1028                 LOG.fine("Number of messages pending retransmit =" + numberToRetrans);
1029             }
1030             for (int j = 0; j < numberToRetrans; j++) {
1031                 RetrQElt r = retrQ.get(j);
1032
1033                 // Mark message as retransmission
1034                 // need to know if a msg was retr or not for RTT eval
1035                 if (r.marked == 0) {
1036                     // First time: we're here because this message has not arrived, but
1037                     // the next one has. It may be an out of order message.
1038                     // Experience shows that such a message rarely arrives older than
1039                     // 1.2 * aveRTT. Beyond that, it's lost. It is also rare that we
1040                     // detect a hole within that delay. So, often enough, as soon as
1041                     // a hole is detected, it's time to resend...but not always.
1042                     if (TimeUtils.toRelativeTimeMillis(triggerTime, r.sentAt) < (6 * aveRTT) / 5) {
1043                         // Nothing to worry about, yet.
1044                         continue;
1045                     }
1046                 } else {
1047                     // That one has been retransmitted at least once already.
1048                     // So, we don't have much of a clue other than the age of the
1049                     // last transmission. It is unlikely that it arrives before aveRTT/2
1050                     // but we have to anticipate its loss at the risk of making dupes.
1051                     // Otherwise the receiver will reach the hole, and that's really
1052                     // expensive. (Think that we've been trying for a while already.)
1053                     
1054                     if (TimeUtils.toRelativeTimeMillis(triggerTime, r.sentAt) < aveRTT) {
1055                         // Nothing to worry about, yet.
1056                         continue;
1057                     }
1058                 }
1059                 r.marked++;
1060                 // Make a copy to for sending
1061                 retransMsgs.add(r);
1062             }
1063         }
1064         
1065         // send the retries.
1066         int retransmitted = 0;
1067         Iterator<RetrQElt> eachRetrans = retransMsgs.iterator();
1068
1069         while (eachRetrans.hasNext()) {
1070             RetrQElt r = eachRetrans.next();
1071
1072             eachRetrans.remove();
1073             try {
1074                 if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
1075                     LOG.fine("RETRANSMIT seqn#" + r.seqnum);
1076                 }
1077                 Message sending = r.msg;
1078
1079                 // its possible that the message was
1080                 // acked while we were working in this
1081                 // case r.msg will have been nulled.
1082                 if (null != sending) {
1083                     sending = sending.clone();
1084                     sending.replaceMessageElement(Defs.NAMESPACE, RETELT);
1085                     if (outgoing.send(sending)) {
1086                         r.sentAt = TimeUtils.timeNow();
1087                         mrrIQFreeSpace--;
1088                         // assume we have now taken a slot
1089                         retransmitted++;
1090                     } else {
1091                         break;
1092                         // don't bother continuing sending now.
1093                     }
1094                 }
1095             } catch (IOException e) {
1096                 if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
1097                     LOG.log(Level.FINE, "FAILED RETRANS seqn#" + r.seqnum, e);
1098                 }
1099                 break;
1100                 // don't bother continuing.
1101             }
1102         }
1103         
1104         if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
1105             LOG.fine("RETRANSMITED " + retransmitted + " of " + numberToRetrans);
1106         }
1107         
1108         return retransmitted;
1109     }
1110     
1111     /**
1112      * Retransmission daemon thread
1113      */
1114     private class Retransmitter implements Runnable {
1115         
1116         int nAtThisRTO = 0;
1117         volatile int nretransmitted = 0;
1118         
1119         /**
1120          * Constructor for the Retransmitter object
1121          */
1122         public Retransmitter() {
1123             if (Logging.SHOW_INFO && LOG.isLoggable(Level.INFO)) {
1124                 LOG.info("STARTED Reliable Retransmitter, RTO = " + RTO);
1125             }
1126         }
1127         
1128         /**
1129          * Gets the retransCount attribute of the Retransmitter object
1130          *
1131          * @return The retransCount value
1132          */
1133         public int getRetransCount() {
1134             return nretransmitted;
1135         }
1136         
1137         /**
1138          *  {@inheritDoc}
1139          *
1140          *  <p/>Main processing method for the Retransmitter object
1141          */
1142         public void run() {
1143             try {
1144                 int idleCounter = 0;
1145                 
1146                 while (TimeUtils.toRelativeTimeMillis(closedAt) > 0) {
1147                     long conn_idle = TimeUtils.toRelativeTimeMillis(TimeUtils.timeNow(), outgoing.getLastAccessed());
1148
1149                     if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
1150                         LOG.fine(outgoing + " idle for " + conn_idle);
1151                     }
1152                     
1153                     // check to see if we have not idled out.
1154                     if (outgoing.getIdleTimeout() < conn_idle) {
1155                         if (Logging.SHOW_INFO && LOG.isLoggable(Level.INFO)) {
1156                             LOG.info("Shutting down idle " + "connection " + outgoing);
1157                         }
1158                         
1159                         break;
1160                     }
1161                     
1162                     long sinceLastACK;
1163                     long oldestInQueueWait;
1164                     
1165                     synchronized (retrQ) {
1166                         try {
1167                             if (RTO > 0) {
1168                                 retrQ.wait(RTO);
1169                             }
1170                             Thread.currentThread().setName(
1171                                     "JXTA Reliable Retransmiter for " + this + " Queue size : " + retrQ.size());
1172                         } catch (InterruptedException e) {// ignored
1173                         }
1174                         
1175                         if (TimeUtils.toRelativeTimeMillis(closedAt) <= 0) {
1176                             break;
1177                         }
1178                         
1179                         // see if we recently did a retransmit triggered by a SACK
1180                         long sinceLastSACKRetr = TimeUtils.toRelativeTimeMillis(TimeUtils.timeNow(), sackRetransTime);
1181
1182                         if (sinceLastSACKRetr < RTO) {
1183                             if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
1184                                 LOG.fine("SACK retrans " + sinceLastSACKRetr + "ms ago");
1185                             }
1186                             continue;
1187                         }
1188                         // See how long we've waited since RTO was set
1189                         sinceLastACK = TimeUtils.toRelativeTimeMillis(TimeUtils.timeNow(), lastACKTime);
1190                         
1191                         if (!retrQ.isEmpty()) {
1192                             RetrQElt elt = retrQ.get(0);
1193
1194                             oldestInQueueWait = TimeUtils.toRelativeTimeMillis(TimeUtils.timeNow(), elt.enqueuedAt);
1195                         } else {
1196                             oldestInQueueWait = 0;
1197                         }
1198                     }
1199                     
1200                     if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
1201                         LOG.fine("Last ACK " + sinceLastACK + "ms ago. Age of oldest in Queue " + oldestInQueueWait + "ms.");
1202                     }
1203                     
1204                     // see if the queue has gone dead
1205                     if (oldestInQueueWait > outgoing.getMaxRetryAge()) {
1206                         if (Logging.SHOW_INFO && LOG.isLoggable(Level.INFO)) {
1207                             LOG.info("Shutting down stale connection " + outgoing);
1208                         }
1209                         
1210                         break;
1211                     }
1212                     
1213                     // get real wait as max of age of oldest in retrQ and
1214                     // lastAck time
1215                     long realWait = Math.max(oldestInQueueWait, sinceLastACK);
1216
1217                     // Retransmit only if RTO has expired.
1218                     // a. real wait time is longer than RTO
1219                     // b. oldest message on Q has been there longer
1220                     // than RTO. This is necessary because we may
1221                     // have just sent a message, and we do not
1222                     // want to overrun the receiver. Also, we
1223                     // do not want to restransmit a message that
1224                     // has not been idle for the RTO.
1225                     if ((realWait >= RTO) && (oldestInQueueWait >= RTO)) {
1226                         if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
1227                             LOG.fine("RTO RETRANSMISSION [" + rwindow + "]");
1228                         }
1229                         // retransmit
1230                         int retransed = retransmit(rwindow, TimeUtils.timeNow());
1231
1232                         // Total
1233                         nretransmitted += retransed;
1234                         // number at this RTO
1235                         nAtThisRTO += retransed;
1236                         // See if real wait is too long and queue is non-empty
1237                         // Remote may be dead - double until max.
1238                         // Double after window restransmitted msgs at this RTO
1239                         // exceeds the rwindow, and we've had no response for
1240                         // twice the current RTO.
1241                         if ((retransed > 0) && (realWait >= 2 * RTO) && (nAtThisRTO >= 2 * rwindow)) {
1242                             RTO = (realWait > maxRTO ? maxRTO : 2 * RTO);
1243                             nAtThisRTO = 0;
1244                         }
1245                         if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
1246                             LOG.fine(
1247                                     "RETRANSMISSION " + retransed + " retrans " + nAtThisRTO + " at this RTO (" + RTO + ") "
1248                                     + nretransmitted + " total retrans");
1249                         }
1250                     } else {
1251                         idleCounter += 1;
1252                         // reset RTO to min if we are idle
1253                         if (idleCounter == 2) {
1254                             RTO = minRTO;
1255                             idleCounter = 0;
1256                             nAtThisRTO = 0;
1257                         }
1258                         if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
1259                             LOG.fine("IDLE : RTO=" + RTO + " WAIT=" + realWait);
1260                         }
1261                     }
1262                 }
1263             } catch (Throwable all) {
1264                 LOG.log(Level.SEVERE, "Uncaught Throwable in thread :" + Thread.currentThread().getName(), all);
1265             } finally {
1266                 hardClose();
1267                 
1268                 retrThread = null;
1269                 
1270                 if (Logging.SHOW_INFO && LOG.isLoggable(Level.INFO)) {
1271                     LOG.info("STOPPED Retransmit thread");
1272                 }
1273             }
1274         }
1275     }
1276 }
1277