2 * Copyright (c) 2003-2007 Sun Microsystems, Inc. All rights reserved.
4 * The Sun Project JXTA(TM) Software License
6 * Redistribution and use in source and binary forms, with or without
7 * modification, are permitted provided that the following conditions are met:
9 * 1. Redistributions of source code must retain the above copyright notice,
10 * this list of conditions and the following disclaimer.
12 * 2. Redistributions in binary form must reproduce the above copyright notice,
13 * this list of conditions and the following disclaimer in the documentation
14 * and/or other materials provided with the distribution.
16 * 3. The end-user documentation included with the redistribution, if any, must
17 * include the following acknowledgment: "This product includes software
18 * developed by Sun Microsystems, Inc. for JXTA(TM) technology."
19 * Alternately, this acknowledgment may appear in the software itself, if
20 * and wherever such third-party acknowledgments normally appear.
22 * 4. The names "Sun", "Sun Microsystems, Inc.", "JXTA" and "Project JXTA" must
23 * not be used to endorse or promote products derived from this software
24 * without prior written permission. For written permission, please contact
25 * Project JXTA at http://www.jxta.org.
27 * 5. Products derived from this software may not be called "JXTA", nor may
28 * "JXTA" appear in their name, without prior written permission of Sun.
30 * THIS SOFTWARE IS PROVIDED ``AS IS'' AND ANY EXPRESSED OR IMPLIED WARRANTIES,
31 * INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND
32 * FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL SUN
33 * MICROSYSTEMS OR ITS CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
34 * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
35 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA,
36 * OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
37 * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
38 * NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE,
39 * EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
41 * JXTA is a registered trademark of Sun Microsystems, Inc. in the United
42 * States and other countries.
44 * Please see the license information page at :
45 * <http://www.jxta.org/project/www/license.html> for instructions on use of
46 * the license in source files.
48 * ====================================================================
50 * This software consists of voluntary contributions made by many individuals
51 * on behalf of Project JXTA. For more information on Project JXTA, please see
52 * http://www.jxta.org.
54 * This license is based on the BSD license adopted by the Apache Foundation.
57 package net.jxta.impl.util.pipe.reliable;
60 import java.io.ByteArrayOutputStream;
61 import net.jxta.endpoint.ByteArrayMessageElement;
62 import net.jxta.endpoint.Message;
63 import net.jxta.endpoint.MessageElement;
64 import net.jxta.endpoint.StringMessageElement;
65 import net.jxta.endpoint.WireFormatMessage;
66 import net.jxta.endpoint.WireFormatMessageFactory;
67 import net.jxta.impl.util.TimeUtils;
68 import net.jxta.logging.Logging;
70 import java.io.DataInputStream;
71 import java.io.IOException;
72 import java.io.OutputStream;
73 import java.util.ArrayList;
74 import java.util.Arrays;
75 import java.util.Iterator;
76 import java.util.List;
77 import java.util.concurrent.atomic.AtomicInteger;
78 import java.util.logging.Level;
79 import java.util.logging.Logger;
83 * Accepts data and packages it into messages for sending to the remote. The
84 * messages are kept in a retry queue until the remote peer acknowledges
85 * receipt of the message.
87 public class ReliableOutputStream extends OutputStream implements Incoming {
92 private final static Logger LOG = Logger.getLogger(ReliableOutputStream.class.getName());
95 * Initial estimated Round Trip Time
97 private final static long initRTT = 10 * TimeUtils.ASECOND;
100 * The default size for the blocks we will chunk the stream into.
102 private final static int DEFAULT_MESSAGE_CHUNK_SIZE = 63 * 1024;
104 private final static MessageElement RETELT = new StringMessageElement(Defs.RETRY_ELEMENT_NAME, Defs.RETRY_ELEMENT_VALUE, null);
107 * A lock we use to ensure that write operations happen in order.
109 private final Object writeLock = new String("writeLock");
112 * The buffer we cache writes to.
114 private byte[] writeBuffer = null;
117 * Number of bytes written to the write buffer.
119 private int writeCount = 0;
122 * Set the default write buffer size.
124 private int writeBufferSize = DEFAULT_MESSAGE_CHUNK_SIZE;
127 * Absolute time in milliseconds at which the write buffer began
128 * accumulating bytes to be written.
130 private long writeBufferAge = Long.MAX_VALUE;
133 * If less than {@code TimeUtils.timenow()} then we are closed otherwise
134 * this is the absolute time at which we will become closed. We begin by
135 * setting this value as {@Long.MAX_VALUE} until we establish an earlier
138 private long closedAt = Long.MAX_VALUE;
141 * If {@code true} then we have received a close request from the remote
142 * side. They do not want to receive any more messages from us.
144 private volatile boolean remoteClosed = false;
147 * If {@code true} then we have closed this stream locally and will not
148 * accept any further messages for sending. Unacknowledged messages will
149 * be retransmitted until the linger delay is passed.
151 private volatile boolean localClosed = false;
154 * The relative time in milliseconds that we will allow our connection to
157 private long lingerDelay = 120 * TimeUtils.ASECOND;
160 * Sequence number of the message we most recently sent out.
162 private AtomicInteger sequenceNumber = new AtomicInteger(0);
165 * Sequence number of highest sequential ACK.
167 private volatile int maxACK = 0;
170 * connection we are working for
172 private final Outgoing outgoing;
175 * The daemon thread that performs retransmissions.
177 private Thread retrThread = null;
179 // for retransmission
181 * Average round trip time in milliseconds.
183 private volatile long aveRTT = initRTT;
184 private volatile long remRTT = 0;
187 * Has aveRTT been set at least once over its initial guesstimate value.
189 private boolean aveRTTreset = false;
192 * Number of ACK message received.
194 private AtomicInteger numACKS = new AtomicInteger(0);
197 * When to start computing aveRTT
199 private int rttThreshold = 0;
202 * Retry Time Out measured in milliseconds.
204 private volatile long RTO = 0;
207 * Minimum Retry Timeout measured in milliseconds.
209 private volatile long minRTO = initRTT * 5;
212 * Maximum Retry Timeout measured in milliseconds.
214 private volatile long maxRTO = initRTT * 60;
217 * absolute time in milliseconds of last sequential ACK.
219 private volatile long lastACKTime = 0;
222 * absolute time in milliseconds of last SACK based retransmit.
224 private volatile long sackRetransTime = 0;
226 // running average of receipients Input Queue
227 private int nIQTests = 0;
228 private int aveIQSize = 0;
231 * Our estimation of the current free space in the remote input queue.
233 private volatile int mrrIQFreeSpace = 0;
236 * Our estimation of the maximum size of the remote input queue.
238 private int rmaxQSize = Defs.MAXQUEUESIZE;
241 * The flow control module.
243 private final FlowControl fc;
246 * Cache of the last rwindow recommendation by fc.
248 private volatile int rwindow = 0;
251 * retrans queue element
253 private static class RetrQElt {
256 * sequence number of this message.
266 * absolute time of original enqueuing
268 final long enqueuedAt;
271 * has been marked as retransmission
276 * absolute time when this msg was last transmitted
281 * Constructor for the RetrQElt object
283 * @param seqnum sequence number
284 * @param msg the message
286 public RetrQElt(int seqnum, Message msg) {
287 this.seqnum = seqnum;
289 this.enqueuedAt = TimeUtils.timeNow();
290 this.sentAt = this.enqueuedAt;
296 * The collection of messages available for re-transmission.
298 protected final List<RetrQElt> retrQ = new ArrayList<RetrQElt>();
301 * Constructor for the ReliableOutputStream object
303 * @param outgoing the outgoing object
305 public ReliableOutputStream(Outgoing outgoing) {
306 // By default use the old behaviour: fixed fc with a rwin of 20
307 this(outgoing, new FixedFlowControl(20));
311 * Constructor for the ReliableOutputStream object
313 * @param outgoing the outgoing object
314 * @param fc flow-control
316 public ReliableOutputStream(Outgoing outgoing, FlowControl fc) {
317 this.outgoing = outgoing;
319 // initial RTO is set to maxRTO so as to give time
320 // to the receiver to catch-up
323 this.mrrIQFreeSpace = rmaxQSize;
324 this.rttThreshold = rmaxQSize;
326 // Init last ACK Time to now
327 this.lastACKTime = TimeUtils.timeNow();
328 this.sackRetransTime = TimeUtils.timeNow();
330 // Attach the flowControl module
333 // Update our initial rwindow to reflect fc's initial value
334 this.rwindow = fc.getRwindow();
341 public void close() throws IOException {
346 closedAt = TimeUtils.toRelativeTimeMillis(lingerDelay);
348 synchronized (retrQ) {
352 if (Logging.SHOW_INFO && LOG.isLoggable(Level.INFO)) {
357 public long getLingerDelay() {
361 public void setLingerDelay(long linger) {
363 throw new IllegalArgumentException("Linger delay may not be negative.");
367 linger = Long.MAX_VALUE;
370 lingerDelay = linger;
374 * Return the size of the buffers we are using for accumulating writes.
376 * @return size of our write buffers.
378 public int setSendBufferSize() {
379 return writeBufferSize;
383 * Set the size of the buffers we will use for accumulating writes.
385 * @param size The desired size of write buffers.
386 * @throws IOException if an I/O error occurs. In particular, an IOException is thrown if the output stream is closed.
388 public void setSendBufferSize(int size) throws IOException {
390 throw new IllegalArgumentException("Send buffer size may not be <= 0");
393 // Flush any existing buffered writes. Then next write will use the new buffer size.
394 synchronized (writeLock) {
396 writeBufferSize = size;
401 * We have received a close request from the remote peer. We must stop
402 * retransmissions immediately.
404 public void hardClose() {
406 closedAt = TimeUtils.timeNow();
408 // Clear the retry queue. Remote side doesn't care.
409 synchronized (retrQ) {
414 // Clear the write queue. Remote side doesn't care.
415 synchronized (writeLock) {
418 writeBufferAge = Long.MAX_VALUE;
421 if (Logging.SHOW_INFO && LOG.isLoggable(Level.INFO)) {
422 LOG.info("Hard closed.");
427 * Returns the state of the stream
429 * @return true if closed
431 public boolean isClosed() {
432 return localClosed || remoteClosed;
439 public void flush() throws IOException {
440 synchronized (writeLock) {
449 public void write(int b) throws IOException {
450 write(new byte[] { (byte) b }, 0, 1);
457 public void write(byte b[], int off, int len) throws IOException {
458 synchronized (writeLock) {
460 throw new IOException("stream is closed");
463 if ((off < 0) || (off > b.length) || (len < 0) || ((off + len) > b.length) || ((off + len) < 0)) {
464 throw new IndexOutOfBoundsException();
472 int end = current + len;
474 while (current < end) {
475 if (0 == writeCount) {
476 // No bytes written? We need a new buffer.
477 writeBuffer = new byte[writeBufferSize];
478 writeBufferAge = TimeUtils.timeNow();
481 int remain = end - current;
483 int available = writeBuffer.length - writeCount;
484 int copy = Math.min(available, remain);
486 System.arraycopy(b, current, writeBuffer, writeCount, copy);
490 if (writeBuffer.length == writeCount) {
498 * Flush the internal buffer. {@code writeLock} must have been previously
500 * @throws IOException if an I/O error occurs. In particular, an IOException is thrown if the output stream is closed.
502 private void flushBuffer() throws IOException {
503 if (writeCount > 0) {
506 writeBuffer(writeBuffer, 0, writeCount);
510 writeBufferAge = Long.MAX_VALUE;
516 * Write the internal buffer. {@code writeLock} must have been previously
520 * @param off the start offset in the data.
521 * @param len the number of bytes to write.
522 * @throws IOException if an I/O error occurs. In particular, an IOException is thrown if the output stream is closed.
524 private void writeBuffer(byte[] b, int off, int len) throws IOException {
525 if ((off < 0) || (off > b.length) || (len < 0) || ((off + len) > b.length) || ((off + len) < 0)) {
526 throw new IndexOutOfBoundsException();
533 if (null == retrThread) {
534 retrThread = new Thread(new Retransmitter(), "JXTA Reliable Retransmiter for " + this);
535 retrThread.setDaemon(true);
539 // allocate new message
540 Message jmsg = new Message();
542 synchronized (retrQ) {
545 throw new IOException("Connection is " + (localClosed ? "closing" : "closed"));
547 if (retrQ.size() > Math.min(rwindow, mrrIQFreeSpace * 2)) {
550 } catch (InterruptedException ignored) {// ignored
557 int sequenceToUse = sequenceNumber.incrementAndGet();
558 MessageElement element = new ByteArrayMessageElement(Integer.toString(sequenceToUse), Defs.MIME_TYPE_BLOCK, b, off
562 jmsg.addMessageElement(Defs.NAMESPACE, element);
563 RetrQElt retrQel = new RetrQElt(sequenceToUse, jmsg.clone());
565 if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
566 LOG.fine("Reliable WRITE : seqn#" + sequenceNumber + " length=" + len);
569 // place copy on retransmission queue
571 if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
572 LOG.fine("Retrans Enqueue added seqn#" + sequenceNumber + " retrQ.size()=" + retrQ.size());
578 // assume we have now taken a slot
579 if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
580 LOG.fine("SENT : seqn#" + sequenceNumber + " length=" + len);
585 * Serialize a JXTA message as a reliable message.
587 * <p/>This method bypasses the built-in buffering and ignores the MTU size.
589 * @param msg message to send
590 * @return message sequence number
591 * @throws IOException if an I/O error occurs
593 public int send(Message msg) throws IOException {
594 WireFormatMessage msgSerialized = WireFormatMessageFactory.toWire(msg, Defs.MIME_TYPE_MSG, null);
595 ByteArrayOutputStream baos = new ByteArrayOutputStream((int) msgSerialized.getByteLength());
597 msgSerialized.sendToStream(baos);
599 byte[] bytes = baos.toByteArray();
601 synchronized (writeLock) {
603 writeBuffer(bytes, 0, bytes.length);
604 return sequenceNumber.get();
609 * Gets the maxAck attribute of the ReliableOutputStream object
611 * @return The maxAck value
613 public int getMaxAck() {
618 * Gets the seqNumber attribute of the ReliableOutputStream object
620 * @return The seqNumber value
622 public int getSeqNumber() {
623 return sequenceNumber.get();
627 * Gets the queueFull attribute of the ReliableOutputStream object
629 * @return The queueFull value
631 protected boolean isQueueFull() {
632 return mrrIQFreeSpace < 1;
636 * Gets the queueEmpty attribute of the ReliableOutputStream object.
638 * @return {@code true} if the queue is empty otherwise {@code false}.
640 public boolean isQueueEmpty() {
641 synchronized (retrQ) {
642 return retrQ.isEmpty();
647 * Waits for the retransmit queue to become empty.
649 * @param timeout The relative time in milliseconds to wait for the queue to
651 * @return {@code true} if the queue is empty otherwise {@code false}.
652 * @throws InterruptedException if interrupted
654 public boolean waitQueueEmpty(long timeout) throws InterruptedException {
655 long timeoutAt = TimeUtils.toAbsoluteTimeMillis(timeout);
657 synchronized (retrQ) {
658 while (!retrQ.isEmpty() && (TimeUtils.timeNow() < timeoutAt)) {
659 long sleepTime = TimeUtils.toRelativeTimeMillis(timeoutAt);
662 retrQ.wait(sleepTime);
666 return retrQ.isEmpty();
671 * wait for activity on the retry queue
673 * @param timeout timeout in millis
674 * @throws InterruptedException when interrupted
676 public void waitQueueEvent(long timeout) throws InterruptedException {
677 synchronized (retrQ) {
683 * Calculates a message retransmission time-out
685 * @param dt base time
686 * @param msgSeqNum Message sequence number
688 private void calcRTT(long dt, int msgSeqNum) {
690 if (numACKS.incrementAndGet() == 1) {
691 // First ACK arrived. We can start computing aveRTT on the messages
692 // we send from now on.
693 rttThreshold = sequenceNumber.get() + 1;
696 if (msgSeqNum > rttThreshold) {
697 // Compute only when it has stabilized a bit
698 // Since the initial mrrIQFreeSpace is small; the first few
699 // messages will be sent early on and may wait a long time
700 // for the return channel to initialize. After that things
701 // start flowing and RTT becomes relevant.
702 // Carefull with the computation: integer division with round-down
703 // causes cumulative damage: the ave never goes up if this is not
704 // taken care of. We keep the reminder from one round to the other.
710 long tmp = (8 * aveRTT) + ((8 * remRTT) / 9) + dt;
713 remRTT = tmp - aveRTT * 9;
717 // Set retransmission time out: 2.5 x RTT
718 // RTO = (aveRTT << 1) + (aveRTT >> 1);
722 RTO = Math.max(RTO, minRTO);
723 RTO = Math.min(RTO, maxRTO);
725 if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
726 LOG.fine("RTT = " + dt + "ms aveRTT = " + aveRTT + "ms" + " RTO = " + RTO + "ms");
731 * @param iq Description of the Parameter
732 * @return Description of the Return Value
734 private int calcAVEIQ(int iq) {
738 aveIQSize = ((n * aveIQSize) + iq) / nIQTests;
743 * process an incoming message
745 * @param msg message to process
747 public void recv(Message msg) {
749 Iterator<MessageElement> eachACK = msg.getMessageElements(Defs.NAMESPACE, Defs.MIME_TYPE_ACK);
751 while (eachACK.hasNext()) {
752 MessageElement elt = eachACK.next();
755 int sackCount = ((int) elt.getByteLength() / 4) - 1;
758 DataInputStream dis = new DataInputStream(elt.getStream());
759 int seqack = dis.readInt();
760 int[] sacs = new int[sackCount];
762 for (int eachSac = 0; eachSac < sackCount; eachSac++) {
763 sacs[eachSac] = dis.readInt();
766 // take care of the ACK here;
767 ackReceived(seqack, sacs);
768 } catch (IOException failed) {
769 if (Logging.SHOW_WARNING && LOG.isLoggable(Level.WARNING)) {
770 LOG.log(Level.WARNING, "Failure processing ACK", failed);
777 * Process an ACK Message. We remove ACKed
778 * messages from the retry queue. We only
779 * acknowledge messages received in sequence.
781 * The seqnum is for the largest unacknowledged seqnum
782 * the recipient has received.
784 * The sackList is a sequence of all of the
785 * received messages in the sender's input Q. All
786 * will be sequence numbers higher than the
787 * sequential ACK seqnum.
789 * Recipients are passive and only ack upon the
790 * receipt of an in sequence message.
792 * They depend on our RTO to fill holes in message
795 * @param seqnum message sequence number
796 * @param sackList array of message sequence numbers
798 public void ackReceived(int seqnum, int[] sackList) {
802 int rttCalcSeqnum = -1;
804 int fallBackSeqnum = -1;
806 // remove acknowledged messages from retrans Q.
807 synchronized (retrQ) {
808 lastACKTime = TimeUtils.timeNow();
810 maxACK = Math.max(maxACK, seqnum);
812 // dump the current Retry queue and the SACK list
813 if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
814 StringBuilder dumpRETRQ = new StringBuilder("ACK RECEIVE : " + Integer.toString(seqnum));
816 if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
817 dumpRETRQ.append('\n');
819 dumpRETRQ.append("\tRETRQ (size=").append(retrQ.size()).append(")");
820 if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
821 dumpRETRQ.append(" : ");
822 for (int y = 0; y < retrQ.size(); y++) {
824 dumpRETRQ.append(", ");
826 RetrQElt r = retrQ.get(y);
828 dumpRETRQ.append(r.seqnum);
831 if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
832 dumpRETRQ.append('\n');
834 dumpRETRQ.append("\tSACKLIST (size=").append(sackList.length).append(")");
835 if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
836 dumpRETRQ.append(" : ");
837 for (int y = 0; y < sackList.length; y++) {
839 dumpRETRQ.append(", ");
841 dumpRETRQ.append(sackList[y]);
844 LOG.fine(dumpRETRQ.toString());
847 Iterator<RetrQElt> eachRetryQueueEntry = retrQ.iterator();
849 // First remove monotonically increasing seq#s in retrans vector
850 while (eachRetryQueueEntry.hasNext()) {
851 RetrQElt retrQElt = eachRetryQueueEntry.next();
853 if (retrQElt.seqnum > seqnum) {
857 eachRetryQueueEntry.remove();
859 // Update RTT, RTO. Use only those that where acked
860 // w/o retrans otherwise the number may be phony (ack
861 // of first xmit received just after resending => RTT
862 // seems small). Also, we keep the worst of the bunch
863 // we encounter. If we really can't find a single
864 // non-resent message, we make do with a pessimistic
865 // approximation: we must not be left behind with an
866 // RTT that's too short, we'd keep resending like
868 long enqueuetime = retrQElt.enqueuedAt;
869 long dt = TimeUtils.toRelativeTimeMillis(lastACKTime, enqueuetime);
872 if (retrQElt.marked == 0) {
873 if (dt > rttCalcDt) {
875 rttCalcSeqnum = retrQElt.seqnum;
878 // In case we find no good candidate, make
879 // a guess by dividing by the number of attempts
880 // and keep the worst of them too. Since we
881 // know it may be too short, we will not use it
883 dt /= (retrQElt.marked + 1);
884 if (dt > fallBackDt) {
886 fallBackSeqnum = retrQElt.seqnum;
889 fc.packetACKed(retrQElt.seqnum);
893 // Update last accessed time in response to getting seq acks.
894 if (numberACKed > 0) {
895 outgoing.setLastAccessed(TimeUtils.timeNow());
897 if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
898 LOG.fine("SEQUENTIALLY ACKD SEQN = " + seqnum + ", (" + numberACKed + " acked)");
900 // most recent remote IQ free space
901 mrrIQFreeSpace = rmaxQSize - sackList.length;
902 // let's look at average sacs.size(). If it is big, then this
903 // probably means we must back off because the system is slow.
904 // Our retrans Queue can be large and we can overwhelm the
905 // receiver with retransmissions.
906 // We will keep the rwin <= ave real input queue size.
907 int aveIQ = calcAVEIQ(sackList.length);
909 if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
910 LOG.fine("remote IQ free space = " + mrrIQFreeSpace + " remote avg IQ occupancy = " + aveIQ);
915 if (sackList.length > 0) {
916 Iterator<RetrQElt> eachRetrQElement = retrQ.iterator();
919 while (eachRetrQElement.hasNext()) {
920 RetrQElt retrQElt = eachRetrQElement.next();
922 while (sackList[currentSACK] < retrQElt.seqnum) {
924 if (currentSACK == sackList.length) {
928 if (currentSACK == sackList.length) {
931 if (sackList[currentSACK] == retrQElt.seqnum) {
932 fc.packetACKed(retrQElt.seqnum);
934 eachRetrQElement.remove();
936 // Update RTT, RTO. Use only those that where acked w/o retrans
937 // otherwise the number is completely phony.
938 // Also, we keep the worst of the bunch we encounter.
939 long enqueuetime = retrQElt.enqueuedAt;
940 long dt = TimeUtils.toRelativeTimeMillis(lastACKTime, enqueuetime);
943 if (retrQElt.marked == 0) {
944 if (dt > rttCalcDt) {
946 rttCalcSeqnum = retrQElt.seqnum;
949 // In case we find no good candidate, make
950 // a guess by dividing by the number of attempts
951 // and keep the worst of them too. Since we
952 // know it may be too short, we will not use it
954 dt /= (retrQElt.marked + 1);
955 if (dt > fallBackDt) {
957 fallBackSeqnum = retrQElt.seqnum;
960 if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
961 LOG.fine("SACKD SEQN = " + retrQElt.seqnum);
968 // Retransmit? Only if there is a hole in the selected
969 // acknowledgement list. Otherwise let RTO deal.
971 // Given that this SACK acknowledged messages still
973 // seqnum is the max consectively SACKD message.
974 // seqnum < retrQElt.seqnum means a message has not reached
975 // receiver. EG: sacklist == 10,11,13 seqnum == 11
977 if (seqnum < retrQElt.seqnum) {
978 fc.packetMissing(retrQElt.seqnum);
980 if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
981 LOG.fine("RETR: Fill hole, SACK, seqn#" + retrQElt.seqnum + ", Window =" + retrans);
987 if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
988 LOG.fine("SELECTIVE ACKD (" + numberACKed + ") " + retrans + " retrans wanted");
992 // Compute aveRTT on the most representative message,
993 // if any. That's the most accurate data.
994 // Failing that we use the fall back, provided that it not
995 // more recent than aveRTT ago - that would decrease aveRTT
996 // and in the absence of solid data, we do not want to take
998 if (rttCalcSeqnum != -1) {
999 calcRTT(rttCalcDt, rttCalcSeqnum);
1000 // get fc to recompute rwindow
1001 rwindow = fc.ackEventEnd(rmaxQSize, aveRTT, rttCalcDt);
1002 } else if ((fallBackSeqnum != -1) && (fallBackDt > aveRTT)) {
1003 calcRTT(fallBackDt, fallBackSeqnum);
1004 // get fc to recompute rwindow
1005 rwindow = fc.ackEventEnd(rmaxQSize, aveRTT, fallBackDt);
1012 * retransmit unacknowledged messages
1014 * @param rwin max number of messages to retransmit
1015 * @param triggerTime base time
1016 * @return number of messages retransmitted.
1018 private int retransmit(int rwin, long triggerTime) {
1020 List<RetrQElt> retransMsgs = new ArrayList<RetrQElt>();
1022 int numberToRetrans;
1024 // build a list of retries.
1025 synchronized (retrQ) {
1026 numberToRetrans = Math.min(retrQ.size(), rwin);
1027 if (numberToRetrans > 0 && Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
1028 LOG.fine("Number of messages pending retransmit =" + numberToRetrans);
1030 for (int j = 0; j < numberToRetrans; j++) {
1031 RetrQElt r = retrQ.get(j);
1033 // Mark message as retransmission
1034 // need to know if a msg was retr or not for RTT eval
1035 if (r.marked == 0) {
1036 // First time: we're here because this message has not arrived, but
1037 // the next one has. It may be an out of order message.
1038 // Experience shows that such a message rarely arrives older than
1039 // 1.2 * aveRTT. Beyond that, it's lost. It is also rare that we
1040 // detect a hole within that delay. So, often enough, as soon as
1041 // a hole is detected, it's time to resend...but not always.
1042 if (TimeUtils.toRelativeTimeMillis(triggerTime, r.sentAt) < (6 * aveRTT) / 5) {
1043 // Nothing to worry about, yet.
1047 // That one has been retransmitted at least once already.
1048 // So, we don't have much of a clue other than the age of the
1049 // last transmission. It is unlikely that it arrives before aveRTT/2
1050 // but we have to anticipate its loss at the risk of making dupes.
1051 // Otherwise the receiver will reach the hole, and that's really
1052 // expensive. (Think that we've been trying for a while already.)
1054 if (TimeUtils.toRelativeTimeMillis(triggerTime, r.sentAt) < aveRTT) {
1055 // Nothing to worry about, yet.
1060 // Make a copy to for sending
1065 // send the retries.
1066 int retransmitted = 0;
1067 Iterator<RetrQElt> eachRetrans = retransMsgs.iterator();
1069 while (eachRetrans.hasNext()) {
1070 RetrQElt r = eachRetrans.next();
1072 eachRetrans.remove();
1074 if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
1075 LOG.fine("RETRANSMIT seqn#" + r.seqnum);
1077 Message sending = r.msg;
1079 // its possible that the message was
1080 // acked while we were working in this
1081 // case r.msg will have been nulled.
1082 if (null != sending) {
1083 sending = sending.clone();
1084 sending.replaceMessageElement(Defs.NAMESPACE, RETELT);
1085 if (outgoing.send(sending)) {
1086 r.sentAt = TimeUtils.timeNow();
1088 // assume we have now taken a slot
1092 // don't bother continuing sending now.
1095 } catch (IOException e) {
1096 if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
1097 LOG.log(Level.FINE, "FAILED RETRANS seqn#" + r.seqnum, e);
1100 // don't bother continuing.
1104 if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
1105 LOG.fine("RETRANSMITED " + retransmitted + " of " + numberToRetrans);
1108 return retransmitted;
1112 * Retransmission daemon thread
1114 private class Retransmitter implements Runnable {
1117 volatile int nretransmitted = 0;
1120 * Constructor for the Retransmitter object
1122 public Retransmitter() {
1123 if (Logging.SHOW_INFO && LOG.isLoggable(Level.INFO)) {
1124 LOG.info("STARTED Reliable Retransmitter, RTO = " + RTO);
1129 * Gets the retransCount attribute of the Retransmitter object
1131 * @return The retransCount value
1133 public int getRetransCount() {
1134 return nretransmitted;
1140 * <p/>Main processing method for the Retransmitter object
1144 int idleCounter = 0;
1146 while (TimeUtils.toRelativeTimeMillis(closedAt) > 0) {
1147 long conn_idle = TimeUtils.toRelativeTimeMillis(TimeUtils.timeNow(), outgoing.getLastAccessed());
1149 if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
1150 LOG.fine(outgoing + " idle for " + conn_idle);
1153 // check to see if we have not idled out.
1154 if (outgoing.getIdleTimeout() < conn_idle) {
1155 if (Logging.SHOW_INFO && LOG.isLoggable(Level.INFO)) {
1156 LOG.info("Shutting down idle " + "connection " + outgoing);
1163 long oldestInQueueWait;
1165 synchronized (retrQ) {
1170 Thread.currentThread().setName(
1171 "JXTA Reliable Retransmiter for " + this + " Queue size : " + retrQ.size());
1172 } catch (InterruptedException e) {// ignored
1175 if (TimeUtils.toRelativeTimeMillis(closedAt) <= 0) {
1179 // see if we recently did a retransmit triggered by a SACK
1180 long sinceLastSACKRetr = TimeUtils.toRelativeTimeMillis(TimeUtils.timeNow(), sackRetransTime);
1182 if (sinceLastSACKRetr < RTO) {
1183 if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
1184 LOG.fine("SACK retrans " + sinceLastSACKRetr + "ms ago");
1188 // See how long we've waited since RTO was set
1189 sinceLastACK = TimeUtils.toRelativeTimeMillis(TimeUtils.timeNow(), lastACKTime);
1191 if (!retrQ.isEmpty()) {
1192 RetrQElt elt = retrQ.get(0);
1194 oldestInQueueWait = TimeUtils.toRelativeTimeMillis(TimeUtils.timeNow(), elt.enqueuedAt);
1196 oldestInQueueWait = 0;
1200 if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
1201 LOG.fine("Last ACK " + sinceLastACK + "ms ago. Age of oldest in Queue " + oldestInQueueWait + "ms.");
1204 // see if the queue has gone dead
1205 if (oldestInQueueWait > outgoing.getMaxRetryAge()) {
1206 if (Logging.SHOW_INFO && LOG.isLoggable(Level.INFO)) {
1207 LOG.info("Shutting down stale connection " + outgoing);
1213 // get real wait as max of age of oldest in retrQ and
1215 long realWait = Math.max(oldestInQueueWait, sinceLastACK);
1217 // Retransmit only if RTO has expired.
1218 // a. real wait time is longer than RTO
1219 // b. oldest message on Q has been there longer
1220 // than RTO. This is necessary because we may
1221 // have just sent a message, and we do not
1222 // want to overrun the receiver. Also, we
1223 // do not want to restransmit a message that
1224 // has not been idle for the RTO.
1225 if ((realWait >= RTO) && (oldestInQueueWait >= RTO)) {
1226 if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
1227 LOG.fine("RTO RETRANSMISSION [" + rwindow + "]");
1230 int retransed = retransmit(rwindow, TimeUtils.timeNow());
1233 nretransmitted += retransed;
1234 // number at this RTO
1235 nAtThisRTO += retransed;
1236 // See if real wait is too long and queue is non-empty
1237 // Remote may be dead - double until max.
1238 // Double after window restransmitted msgs at this RTO
1239 // exceeds the rwindow, and we've had no response for
1240 // twice the current RTO.
1241 if ((retransed > 0) && (realWait >= 2 * RTO) && (nAtThisRTO >= 2 * rwindow)) {
1242 RTO = (realWait > maxRTO ? maxRTO : 2 * RTO);
1245 if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
1247 "RETRANSMISSION " + retransed + " retrans " + nAtThisRTO + " at this RTO (" + RTO + ") "
1248 + nretransmitted + " total retrans");
1252 // reset RTO to min if we are idle
1253 if (idleCounter == 2) {
1258 if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
1259 LOG.fine("IDLE : RTO=" + RTO + " WAIT=" + realWait);
1263 } catch (Throwable all) {
1264 LOG.log(Level.SEVERE, "Uncaught Throwable in thread :" + Thread.currentThread().getName(), all);
1270 if (Logging.SHOW_INFO && LOG.isLoggable(Level.INFO)) {
1271 LOG.info("STOPPED Retransmit thread");