2 * Copyright (c) 2003-2007 Sun Microsystems, Inc. All rights reserved.
4 * The Sun Project JXTA(TM) Software License
6 * Redistribution and use in source and binary forms, with or without
7 * modification, are permitted provided that the following conditions are met:
9 * 1. Redistributions of source code must retain the above copyright notice,
10 * this list of conditions and the following disclaimer.
12 * 2. Redistributions in binary form must reproduce the above copyright notice,
13 * this list of conditions and the following disclaimer in the documentation
14 * and/or other materials provided with the distribution.
16 * 3. The end-user documentation included with the redistribution, if any, must
17 * include the following acknowledgment: "This product includes software
18 * developed by Sun Microsystems, Inc. for JXTA(TM) technology."
19 * Alternately, this acknowledgment may appear in the software itself, if
20 * and wherever such third-party acknowledgments normally appear.
22 * 4. The names "Sun", "Sun Microsystems, Inc.", "JXTA" and "Project JXTA" must
23 * not be used to endorse or promote products derived from this software
24 * without prior written permission. For written permission, please contact
25 * Project JXTA at http://www.jxta.org.
27 * 5. Products derived from this software may not be called "JXTA", nor may
28 * "JXTA" appear in their name, without prior written permission of Sun.
30 * THIS SOFTWARE IS PROVIDED ``AS IS'' AND ANY EXPRESSED OR IMPLIED WARRANTIES,
31 * INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND
32 * FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL SUN
33 * MICROSYSTEMS OR ITS CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
34 * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
35 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA,
36 * OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
37 * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
38 * NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE,
39 * EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
41 * JXTA is a registered trademark of Sun Microsystems, Inc. in the United
42 * States and other countries.
44 * Please see the license information page at :
45 * <http://www.jxta.org/project/www/license.html> for instructions on use of
46 * the license in source files.
48 * ====================================================================
50 * This software consists of voluntary contributions made by many individuals
51 * on behalf of Project JXTA. For more information on Project JXTA, please see
52 * http://www.jxta.org.
54 * This license is based on the BSD license adopted by the Apache Foundation.
57 package net.jxta.impl.util.pipe.reliable;
60 import java.io.ByteArrayInputStream;
61 import java.io.ByteArrayOutputStream;
62 import java.io.DataOutputStream;
63 import java.io.IOException;
64 import java.io.InputStream;
65 import java.io.InterruptedIOException;
66 import java.net.SocketTimeoutException;
67 import java.util.ArrayList;
68 import java.util.Arrays;
69 import java.util.Iterator;
70 import java.util.List;
72 import net.jxta.endpoint.ByteArrayMessageElement;
73 import net.jxta.endpoint.Message;
74 import net.jxta.endpoint.MessageElement;
75 import net.jxta.endpoint.WireFormatMessageFactory;
76 import net.jxta.impl.util.TimeUtils;
78 import java.util.logging.Level;
79 import net.jxta.logging.Logging;
80 import java.util.logging.Logger;
84 * Acts as a reliable input stream. Accepts data which
85 * arrives in messages and orders it.
87 public class ReliableInputStream extends InputStream implements Incoming {
92 private static final Logger LOG = Logger.getLogger(ReliableInputStream.class.getName());
95 * Connection we are working for.
97 private Outgoing outgoing;
99 private volatile boolean closed = false;
100 private boolean closing = false;
102 private MsgListener listener = null;
105 * The amount of time that read() operation will block. > 0
107 private long timeout;
110 * The current sequence number we are reading bytes from.
112 private volatile int sequenceNumber = 0;
115 * Queue of incoming messages.
117 private final List<IQElt> inputQueue = new ArrayList<IQElt>();
120 * The I/O record for the message we are currently using for stream data.
122 private final Record record;
125 * Input record Object
127 private static class Record {
128 public InputStream inputStream;
130 public long nextByte;
135 inputStream = null; // allocated by caller
136 nextByte = 0; // We read here (set by caller)
137 size = 0; // Record size(set by caller)
140 /** reset the record element
143 public void resetRecord() {
144 if (null != inputStream) {
147 } catch (IOException ignored) {}
156 * An input queue element which breaks out a received message in
159 private static class IQElt implements Comparable {
161 final MessageElement elt;
162 boolean ackd = false;
164 IQElt(int sequence, MessageElement element) {
173 public boolean equals(Object obj) {
177 if (obj instanceof IQElt) {
178 IQElt targ = (IQElt) obj;
180 return (this.seqnum == targ.seqnum);
185 public int compareTo(IQElt el) {
186 return this.seqnum < el.seqnum ? -1 : this.seqnum == el.seqnum ? 0 : 1;
192 public int compareTo(Object o) {
193 return compareTo((IQElt) o);
197 public ReliableInputStream(Outgoing outgoing, int timeout) {
198 this(outgoing, timeout, null);
201 public ReliableInputStream(Outgoing outgoing, int timeout, MsgListener listener) {
202 this.outgoing = outgoing;
205 record = new Record();
206 this.listener = listener;
207 // 1 <= seq# <= maxint, monotonically increasing
208 // Incremented before compare.
210 if (Logging.SHOW_INFO && LOG.isLoggable(Level.INFO)) {
211 if (listener != null) {
212 LOG.info("Listener based ReliableInputStream created");
220 * <p/>This is an explicit close operation. All subsequent {@code read()}
221 * operations will fail.
224 public void close() throws IOException {
226 synchronized (inputQueue) {
229 inputQueue.notifyAll();
234 * Returns true if closed
236 * @return true if closed
238 public boolean isInputShutdown() {
243 * Prepare this input stream to being closed. It will still deliver the
244 * packets that have been received, but nothing more. This is meant to be
245 * called in response to the other side having initiated closure. We assume
246 * that when the other side does it it means that it is satisfied with what
247 * we have acknowledged so far.
249 public void softClose() {
250 synchronized (inputQueue) {
252 inputQueue.notifyAll();
257 * Sets the Timeout attribute. A timeout of 0 blocks forever
259 * @param timeout The new soTimeout value
261 public void setTimeout(int timeout) {
263 throw new IllegalArgumentException("Timeout must be >=0");
266 this.timeout = (0 == timeout) ? Long.MAX_VALUE : timeout;
273 public int read() throws IOException {
278 byte[] a = new byte[1];
281 int len = local_read(a, 0, 1);
287 if (Logging.SHOW_FINER && LOG.isLoggable(Level.FINER)) {
288 LOG.finer("Read() : " + (a[0] & 255));
291 return a[0] & 0xFF; // The byte
295 // If we've reached EOF, there's nothing to do but close().
305 public int read(byte[] a, int offset, int length) throws IOException {
314 int i = local_read(a, offset, length);
316 if (Logging.SHOW_FINER && LOG.isLoggable(Level.FINER)) {
317 LOG.finer("Read(byte[], int, " + length + "), bytes read = " + i);
320 // If we've reached EOF; there's nothing to do but close().
328 * Send a sequential ACK and selective ACKs for all of
329 * the queued messages.
331 * @param seqnAck the sequence number being sequential ACKed
333 private void sendACK(int seqnAck) {
334 // No need to sync on inputQueue, acking as many as we can is want we want
335 List<Integer> selectedAckList = new ArrayList<Integer>();
338 synchronized (inputQueue) {
339 queue = new ArrayList<IQElt>(inputQueue);
342 Iterator<IQElt> eachInQueue = queue.iterator();
344 while (eachInQueue.hasNext() && (selectedAckList.size() < Defs.MAXQUEUESIZE)) {
345 IQElt anIQElt = eachInQueue.next();
347 if (anIQElt.seqnum > seqnAck) {
349 selectedAckList.add(anIQElt.seqnum);
355 // PERMIT DUPLICATE ACKS. Just a list and one small message.
356 sendACK(seqnAck, selectedAckList);
360 * Build an ACK message. The message provides a sequential ACK count and
361 * an optional list of selective ACKs.
363 * @param seqnAck the sequence number being sequential ACKed
364 * @param sackList a list of selective ACKs. Must be sorted in increasing
367 private void sendACK(int seqnAck, List<Integer> sackList) {
368 ByteArrayOutputStream bos = new ByteArrayOutputStream((1 + sackList.size()) * 4);
369 DataOutputStream dos = new DataOutputStream(bos);
372 dos.writeInt(seqnAck);
373 for (Integer aSackList : sackList) {
374 dos.writeInt(aSackList);
379 Message ACKMsg = new Message();
380 MessageElement elt = new ByteArrayMessageElement(Defs.ACK_ELEMENT_NAME, Defs.MIME_TYPE_ACK, bos.toByteArray(), null);
382 ACKMsg.addMessageElement(Defs.NAMESPACE, elt);
384 outgoing.send(ACKMsg);
386 if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
387 LOG.fine("SENT ACK, seqn#" + seqnAck + " and " + sackList.size() + " SACKs ");
389 } catch (IOException e) {
390 if (Logging.SHOW_WARNING && LOG.isLoggable(Level.WARNING)) {
391 LOG.log(Level.WARNING, "sendACK caught IOException:", e);
399 public void recv(Message msg) {
400 queueIncomingMessage(msg);
403 public boolean hasNextMessage() {
404 return !inputQueue.isEmpty();
407 Message nextMessage(boolean blocking) throws IOException {
408 if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
409 LOG.fine("nextMessage blocking? [" + blocking + "]");
411 MessageElement elt = dequeueMessage(sequenceNumber + 1, blocking);
416 sequenceNumber += 1; // next msg sequence number
421 if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
422 LOG.fine("Converting message seqn :" + (sequenceNumber - 1) + "element to message");
425 msg = WireFormatMessageFactory.fromWire(elt.getStream(), Defs.MIME_TYPE_MSG, null);
426 } catch (IOException ex) {
427 if (Logging.SHOW_WARNING && LOG.isLoggable(Level.WARNING)) {
428 LOG.log(Level.WARNING, "Could not deserialize message " + elt.getElementName(), ex);
436 * queue messages by sequence number.
438 private void queueIncomingMessage(Message msg) {
440 if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
441 LOG.fine("Queue Incoming Message begins for " + msg);
444 long startEnqueue = TimeUtils.timeNow();
446 Iterator<MessageElement> eachElement = msg.getMessageElements(Defs.NAMESPACE, Defs.MIME_TYPE_BLOCK);
448 // OK look for jxta message
449 while (!closed && !closing && eachElement.hasNext()) {
450 MessageElement elt = eachElement.next();
452 eachElement.remove();
457 msgSeqn = Integer.parseInt(elt.getElementName());
458 } catch (NumberFormatException n) {
459 if (Logging.SHOW_WARNING && LOG.isLoggable(Level.WARNING)) {
460 LOG.warning("Discarding element (" + elt.getElementName() + ") Not one of ours.");
465 IQElt newElt = new IQElt(msgSeqn, elt);
467 // OK we must enqueue
469 // We rely on the sender to not to send more than the window size
470 // because we do not limit the number of elements we allow to be
473 // see if this is a duplicate
474 if (newElt.seqnum <= sequenceNumber) {
475 if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
476 LOG.fine("RCVD OLD MESSAGE : Discard seqn#" + newElt.seqnum + " now at seqn#" + sequenceNumber);
481 synchronized (inputQueue) {
483 // dbl check with the lock held.
484 if (closing || closed) {
488 // Insert this message into the input queue.
489 // 1. Do not add duplicate messages
490 // 2. Store in increasing sequence nos.
491 int insertIndex = inputQueue.size();
492 boolean duplicate = false;
494 for (int j = 0; j < inputQueue.size(); j++) {
495 IQElt iq = inputQueue.get(j);
497 if (newElt.seqnum < iq.seqnum) {
500 } else if (newElt.seqnum == iq.seqnum) {
507 if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
508 LOG.fine("RCVD OLD MESSAGE : Discard duplicate msg, seqn#" + newElt.seqnum);
513 inputQueue.add(insertIndex, newElt);
515 if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
516 LOG.fine("Enqueued msg with seqn#" + newElt.seqnum + " at index " + insertIndex);
519 inputQueue.notifyAll();
523 if (listener != null) {
524 Message newmsg = null;
528 newmsg = nextMessage(false);
529 } catch (IOException io) {// do nothing as this exception will never occur
531 if (newmsg == null) {
535 if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
536 LOG.fine("In listener mode, calling back listener");
538 listener.processIncomingMessage(newmsg);
539 } catch (Throwable all) {
540 if (Logging.SHOW_WARNING && LOG.isLoggable(Level.WARNING)) {
541 LOG.log(Level.WARNING, "Uncaught Throwable calling listener", all);
547 if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
548 long waited = TimeUtils.toRelativeTimeMillis(TimeUtils.timeNow(), startEnqueue);
550 LOG.fine("Queue Incoming Message for " + msg + " completed in " + waited + " msec.");
554 long nextRetransRequest = TimeUtils.toAbsoluteTimeMillis(TimeUtils.ASECOND);
557 * Dequeue the message with the desired sequence number waiting as needed
558 * until the message is available.
560 * @param desiredSeqn the sequence number to be dequeued.
561 * @param blocking If {@code true} then this method should block while
562 * waiting for the specified message sequence number.
563 * @return the Message Element with the desired sequence number or null if
564 * the queue has been closed.
566 private MessageElement dequeueMessage(int desiredSeqn, boolean blocking) throws IOException {
569 // Wait for incoming message here
570 long startDequeue = TimeUtils.timeNow();
571 long timeoutAt = TimeUtils.toAbsoluteTimeMillis(timeout);
574 synchronized (inputQueue) {
576 if (inputQueue.isEmpty()) {
585 inputQueue.wait(TimeUtils.ASECOND);
586 if (timeoutAt < TimeUtils.timeNow()) {
587 throw new SocketTimeoutException("Read timeout reached");
589 } catch (InterruptedException e) {
590 Thread.interrupted();
592 // reset retrans request timer since we don't want to immediately
593 // request retry after a long wait for out of order messages.
595 nextRetransRequest = TimeUtils.toAbsoluteTimeMillis(TimeUtils.ASECOND);
599 iQ = inputQueue.get(0); // FIFO
601 if (iQ.seqnum < desiredSeqn) {
602 // Ooops a DUPE slipped in the head of the queue undetected
603 // (seqnum consistency issue).
605 inputQueue.remove(0);
606 // if such is the case then notify the other end so that
607 // the message does not remain in the retry queue eventually
608 // triggering a broken pipe exception
611 } else if (iQ.seqnum != desiredSeqn) {
612 if (TimeUtils.toRelativeTimeMillis(nextRetransRequest) < 0) {
613 if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
614 LOG.fine("Trigger retransmission. Wanted seqn#" + desiredSeqn + " found seqn#" + iQ.seqnum);
616 sendACK(desiredSeqn - 1);
617 nextRetransRequest = TimeUtils.toAbsoluteTimeMillis(TimeUtils.ASECOND);
620 if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
621 LOG.fine("Message out of sequece in Non-Blocking mode. returning");
623 // not the element of interest return nothing
628 inputQueue.wait(TimeUtils.ASECOND);
629 if (timeoutAt < TimeUtils.timeNow()) {
630 throw new SocketTimeoutException("Read timeout reached");
632 } catch (InterruptedException e) {
633 throw new InterruptedIOException("IO interrupted ");
637 inputQueue.remove(0);
641 nextRetransRequest = 0;
642 // if we are closed then we return null
647 sendACK(desiredSeqn);
649 if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
650 long waited = TimeUtils.toRelativeTimeMillis(TimeUtils.timeNow(), startDequeue);
652 LOG.fine("DEQUEUED seqn#" + iQ.seqnum + " in " + waited + " msec on input queue");
654 LOG.fine("DEQUEUE waited " + wct + " times on input queue");
664 public int available() throws IOException {
665 if (listener != null) {
666 throw new IOException("available() not supported in async mode");
669 throw new IOException("Stream closed");
671 synchronized (record) {
672 if (record.inputStream != null) {
673 if ((record.size == 0) || (record.nextByte == record.size)) {
674 if (inputQueue.isEmpty()) {
678 record.resetRecord(); // GC as necessary(inputStream byte[])
679 if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
680 LOG.fine("Getting next data block at seqn#" + (sequenceNumber + 1));
682 MessageElement elt = dequeueMessage(sequenceNumber + 1, false);
687 sequenceNumber += 1; // next msg sequence number
688 // Get the length of the Record
689 record.size = elt.getByteLength();
690 record.inputStream = elt.getStream();
692 return record.inputStream.available();
698 private int local_read(byte[] buf, int offset, int length) throws IOException {
700 if (listener != null) {
701 throw new IOException("read() not supported in async mode");
704 synchronized (record) {
705 if ((record.size == 0) || (record.nextByte == record.size)) {
708 record.resetRecord(); // GC as necessary(inputStream byte[])
710 if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
711 LOG.fine("Getting next data block at seqn#" + (sequenceNumber + 1));
714 MessageElement elt = dequeueMessage(sequenceNumber + 1, true);
720 sequenceNumber += 1; // next msg sequence number
722 // Get the length of the Record
723 record.size = elt.getByteLength();
724 record.inputStream = elt.getStream();
726 if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
727 LOG.fine("new seqn#" + sequenceNumber + ", bytes = " + record.size);
731 // return the requested Record data
732 // These calls should NEVER ask for more data than is in the
735 long left = record.size - record.nextByte;
736 int copyLen = (int) Math.min(length, left);
740 int res = record.inputStream.read(buf, offset + copied, copyLen - copied);
746 } while (copied < copyLen);
748 record.nextByte += copied;
750 if (Logging.SHOW_FINER && LOG.isLoggable(Level.FINER)) {
751 LOG.finer("Requested " + length + ", Read " + copied + " bytes");
759 * Returns the message listener for this pipe
760 * @return MsgListener
763 public MsgListener getListener() {
768 * The listener interface for receiving {@link net.jxta.endpoint.Message}
770 public interface MsgListener {
773 * Called for each message received.
775 * @param message The message to be received.
777 void processIncomingMessage(Message message);