]> sjero.net Git - linphone/blob - p2pproxy/dependencies-src/jxse-src-2.5/impl/src/net/jxta/impl/util/pipe/reliable/ReliableInputStream.java
5233416bdae33a96390b74ee9ded06aaaeda2077
[linphone] / p2pproxy / dependencies-src / jxse-src-2.5 / impl / src / net / jxta / impl / util / pipe / reliable / ReliableInputStream.java
1 /*
2  * Copyright (c) 2003-2007 Sun Microsystems, Inc.  All rights reserved.
3  *  
4  *  The Sun Project JXTA(TM) Software License
5  *  
6  *  Redistribution and use in source and binary forms, with or without 
7  *  modification, are permitted provided that the following conditions are met:
8  *  
9  *  1. Redistributions of source code must retain the above copyright notice,
10  *     this list of conditions and the following disclaimer.
11  *  
12  *  2. Redistributions in binary form must reproduce the above copyright notice, 
13  *     this list of conditions and the following disclaimer in the documentation 
14  *     and/or other materials provided with the distribution.
15  *  
16  *  3. The end-user documentation included with the redistribution, if any, must 
17  *     include the following acknowledgment: "This product includes software 
18  *     developed by Sun Microsystems, Inc. for JXTA(TM) technology." 
19  *     Alternately, this acknowledgment may appear in the software itself, if 
20  *     and wherever such third-party acknowledgments normally appear.
21  *  
22  *  4. The names "Sun", "Sun Microsystems, Inc.", "JXTA" and "Project JXTA" must 
23  *     not be used to endorse or promote products derived from this software 
24  *     without prior written permission. For written permission, please contact 
25  *     Project JXTA at http://www.jxta.org.
26  *  
27  *  5. Products derived from this software may not be called "JXTA", nor may 
28  *     "JXTA" appear in their name, without prior written permission of Sun.
29  *  
30  *  THIS SOFTWARE IS PROVIDED ``AS IS'' AND ANY EXPRESSED OR IMPLIED WARRANTIES,
31  *  INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND 
32  *  FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL SUN 
33  *  MICROSYSTEMS OR ITS CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, 
34  *  INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT 
35  *  LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, 
36  *  OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF 
37  *  LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING 
38  *  NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, 
39  *  EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
40  *  
41  *  JXTA is a registered trademark of Sun Microsystems, Inc. in the United 
42  *  States and other countries.
43  *  
44  *  Please see the license information page at :
45  *  <http://www.jxta.org/project/www/license.html> for instructions on use of 
46  *  the license in source files.
47  *  
48  *  ====================================================================
49  *  
50  *  This software consists of voluntary contributions made by many individuals 
51  *  on behalf of Project JXTA. For more information on Project JXTA, please see 
52  *  http://www.jxta.org.
53  *  
54  *  This license is based on the BSD license adopted by the Apache Foundation. 
55  */
56
57 package net.jxta.impl.util.pipe.reliable;
58
59
60 import java.io.ByteArrayInputStream;
61 import java.io.ByteArrayOutputStream;
62 import java.io.DataOutputStream;
63 import java.io.IOException;
64 import java.io.InputStream;
65 import java.io.InterruptedIOException;
66 import java.net.SocketTimeoutException;
67 import java.util.ArrayList;
68 import java.util.Arrays;
69 import java.util.Iterator;
70 import java.util.List;
71
72 import net.jxta.endpoint.ByteArrayMessageElement;
73 import net.jxta.endpoint.Message;
74 import net.jxta.endpoint.MessageElement;
75 import net.jxta.endpoint.WireFormatMessageFactory;
76 import net.jxta.impl.util.TimeUtils;
77
78 import java.util.logging.Level;
79 import net.jxta.logging.Logging;
80 import java.util.logging.Logger;
81
82
83 /**
84  *  Acts as a reliable input stream. Accepts data which
85  *  arrives in messages and orders it.
86  */
87 public class ReliableInputStream extends InputStream implements Incoming {
88     
89     /**
90      *  Logger
91      */
92     private static final Logger LOG = Logger.getLogger(ReliableInputStream.class.getName());
93     
94     /**
95      *  Connection we are working for.
96      */
97     private Outgoing outgoing;
98     
99     private volatile boolean closed = false;
100     private boolean closing = false;
101     
102     private MsgListener listener = null;
103     
104     /**
105      *  The amount of time that read() operation will block. > 0
106      */
107     private long timeout;
108     
109     /**
110      *  The current sequence number we are reading bytes from.
111      */
112     private volatile int sequenceNumber = 0;
113     
114     /**
115      *  Queue of incoming messages.
116      */
117     private final List<IQElt> inputQueue = new ArrayList<IQElt>();
118     
119     /**
120      *  The I/O record for the message we are currently using for stream data.
121      */
122     private final Record record;
123     
124     /**
125      * Input record Object
126      */
127     private static class Record {
128         public InputStream inputStream;
129         // next inbuff byte
130         public long nextByte;
131         // size of Record
132         public long size;
133         
134         public Record() {
135             inputStream = null; // allocated by caller
136             nextByte = 0; // We read here (set by caller)
137             size = 0; // Record size(set by caller)
138         }
139         
140         /** reset the record element
141          *
142          */
143         public void resetRecord() {
144             if (null != inputStream) {
145                 try {
146                     inputStream.close();
147                 } catch (IOException ignored) {}
148             }
149             inputStream = null;
150             size = nextByte = 0;
151         }
152     }
153     
154
155     /**
156      *  An input queue element which breaks out a received message in 
157      *  enqueueMessage().
158      */
159     private static class IQElt implements Comparable {
160         final int seqnum;
161         final MessageElement elt;
162         boolean ackd = false;
163         
164         IQElt(int sequence, MessageElement element) {
165             seqnum = sequence;
166             elt = element;
167         }
168         
169         /**
170          * {@inheritDoc}
171          */
172         @Override
173         public boolean equals(Object obj) {
174             if (this == obj) {
175                 return true;
176             }
177             if (obj instanceof IQElt) {
178                 IQElt targ = (IQElt) obj;
179
180                 return (this.seqnum == targ.seqnum);
181             }
182             return false;
183         }
184         
185         public int compareTo(IQElt el) {
186             return this.seqnum < el.seqnum ? -1 : this.seqnum == el.seqnum ? 0 : 1;
187         }
188         
189         /**
190          * {@inheritDoc}
191          */
192         public int compareTo(Object o) {
193             return compareTo((IQElt) o);
194         }
195     }
196     
197     public ReliableInputStream(Outgoing outgoing, int timeout) {
198         this(outgoing, timeout, null);
199     }
200     
201     public ReliableInputStream(Outgoing outgoing, int timeout, MsgListener listener) {
202         this.outgoing = outgoing;
203         setTimeout(timeout);
204         
205         record = new Record();
206         this.listener = listener;
207         // 1 <= seq# <= maxint, monotonically increasing
208         // Incremented before compare.
209         sequenceNumber = 0;
210         if (Logging.SHOW_INFO && LOG.isLoggable(Level.INFO)) {
211             if (listener != null) {
212                 LOG.info("Listener based ReliableInputStream created");
213             }
214         }
215     }
216     
217     /**
218      * {@inheritDoc}
219      *
220      * <p/>This is an explicit close operation. All subsequent {@code read()}
221      * operations will fail.
222      */
223     @Override
224     public void close() throws IOException {
225         super.close();
226         synchronized (inputQueue) {
227             closed = true;
228             inputQueue.clear();
229             inputQueue.notifyAll();
230         }
231     }
232     
233     /**
234      * Returns true if closed
235      *
236      * @return true if closed
237      */
238     public boolean isInputShutdown() {
239         return closed;
240     }
241     
242     /**
243      * Prepare this input stream to being closed. It will still deliver the
244      * packets that have been received, but nothing more. This is meant to be
245      * called in response to the other side having initiated closure. We assume
246      * that when the other side does it it means that it is satisfied with what
247      * we have acknowledged so far.
248      */
249     public void softClose() {
250         synchronized (inputQueue) {
251             closing = true;
252             inputQueue.notifyAll();
253         }
254     }
255     
256     /**
257      *  Sets the Timeout attribute. A timeout of 0 blocks forever
258      *
259      * @param  timeout The new soTimeout value
260      */
261     public void setTimeout(int timeout) {
262         if (timeout < 0) {
263             throw new IllegalArgumentException("Timeout must be >=0");
264         }
265         
266         this.timeout = (0 == timeout) ? Long.MAX_VALUE : timeout;
267     }
268     
269     /**
270      * {@inheritDoc}
271      */
272     @Override
273     public int read() throws IOException {
274         if (closed) {
275             return -1;
276         }
277         
278         byte[] a = new byte[1];
279         
280         while (true) {
281             int len = local_read(a, 0, 1);
282             
283             if (len < 0) {
284                 break;
285             }
286             if (len > 0) {
287                 if (Logging.SHOW_FINER && LOG.isLoggable(Level.FINER)) {
288                     LOG.finer("Read() : " + (a[0] & 255));
289                 }
290                 
291                 return a[0] & 0xFF; // The byte
292             }
293         }
294         
295         // If we've reached EOF, there's nothing to do but close().
296         
297         close();
298         return -1;
299     }
300     
301     /**
302      * {@inheritDoc}
303      */
304     @Override
305     public int read(byte[] a, int offset, int length) throws IOException {
306         if (closed) {
307             return -1;
308         }
309         
310         if (0 == length) {
311             return 0;
312         }
313         
314         int i = local_read(a, offset, length);
315         
316         if (Logging.SHOW_FINER && LOG.isLoggable(Level.FINER)) {
317             LOG.finer("Read(byte[], int, " + length + "), bytes read = " + i);
318         }
319         
320         // If we've reached EOF; there's nothing to do but close().
321         if (i == -1) {
322             close();
323         }
324         return i;
325     }
326     
327     /**
328      *  Send a sequential ACK and selective ACKs for all of
329      *  the queued messages.
330      *
331      *  @param seqnAck the sequence number being sequential ACKed
332      */
333     private void sendACK(int seqnAck) {
334         // No need to sync on inputQueue, acking as many as we can is want we want
335         List<Integer> selectedAckList = new ArrayList<Integer>();
336         List<IQElt> queue;
337         
338         synchronized (inputQueue) {
339             queue = new ArrayList<IQElt>(inputQueue);
340         }
341         
342         Iterator<IQElt> eachInQueue = queue.iterator();
343
344         while (eachInQueue.hasNext() && (selectedAckList.size() < Defs.MAXQUEUESIZE)) {
345             IQElt anIQElt = eachInQueue.next();
346
347             if (anIQElt.seqnum > seqnAck) {
348                 if (!anIQElt.ackd) {
349                     selectedAckList.add(anIQElt.seqnum);
350                     anIQElt.ackd = true;
351                 }
352             }
353         }
354         
355         // PERMIT DUPLICATE ACKS. Just a list and one small message.
356         sendACK(seqnAck, selectedAckList);
357     }
358     
359     /**
360      *  Build an ACK message. The message provides a sequential ACK count and
361      *  an optional list of selective ACKs.
362      *
363      *  @param seqnAck the sequence number being sequential ACKed
364      *  @param sackList a list of selective ACKs. Must be sorted in increasing
365      *  order.
366      */
367     private void sendACK(int seqnAck, List<Integer> sackList) {
368         ByteArrayOutputStream bos = new ByteArrayOutputStream((1 + sackList.size()) * 4);
369         DataOutputStream dos = new DataOutputStream(bos);
370         
371         try {
372             dos.writeInt(seqnAck);
373             for (Integer aSackList : sackList) {
374                 dos.writeInt(aSackList);
375             }
376             dos.close();
377             bos.close();
378             
379             Message ACKMsg = new Message();
380             MessageElement elt = new ByteArrayMessageElement(Defs.ACK_ELEMENT_NAME, Defs.MIME_TYPE_ACK, bos.toByteArray(), null);
381             
382             ACKMsg.addMessageElement(Defs.NAMESPACE, elt);
383             
384             outgoing.send(ACKMsg);
385             
386             if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
387                 LOG.fine("SENT ACK, seqn#" + seqnAck + " and " + sackList.size() + " SACKs ");
388             }
389         } catch (IOException e) {
390             if (Logging.SHOW_WARNING && LOG.isLoggable(Level.WARNING)) {
391                 LOG.log(Level.WARNING, "sendACK caught IOException:", e);
392             }
393         }
394     }
395     
396     /**
397      *  {@inheritDoc}
398      */
399     public void recv(Message msg) {
400         queueIncomingMessage(msg);
401     }
402     
403     public boolean hasNextMessage() {
404         return !inputQueue.isEmpty();
405     }
406        
407     Message nextMessage(boolean blocking) throws IOException {
408         if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
409             LOG.fine("nextMessage blocking?  [" + blocking + "]");
410         }
411         MessageElement elt = dequeueMessage(sequenceNumber + 1, blocking);
412
413         if (null == elt) {
414             return null;
415         }
416         sequenceNumber += 1; // next msg sequence number
417         
418         Message msg;
419
420         try {
421             if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
422                 LOG.fine("Converting message seqn :" + (sequenceNumber - 1) + "element to message");
423             }
424             
425             msg = WireFormatMessageFactory.fromWire(elt.getStream(), Defs.MIME_TYPE_MSG, null);
426         } catch (IOException ex) {
427             if (Logging.SHOW_WARNING && LOG.isLoggable(Level.WARNING)) {
428                 LOG.log(Level.WARNING, "Could not deserialize message " + elt.getElementName(), ex);
429             }
430             return null;
431         }
432         return msg;
433     }
434     
435     /**
436      *  queue messages by sequence number.
437      */
438     private void queueIncomingMessage(Message msg) {
439         
440         if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
441             LOG.fine("Queue Incoming Message begins for " + msg);
442         }
443         
444         long startEnqueue = TimeUtils.timeNow();
445         
446         Iterator<MessageElement> eachElement = msg.getMessageElements(Defs.NAMESPACE, Defs.MIME_TYPE_BLOCK);
447         
448         // OK look for jxta message
449         while (!closed && !closing && eachElement.hasNext()) {
450             MessageElement elt = eachElement.next();
451
452             eachElement.remove();
453             
454             int msgSeqn;
455
456             try {
457                 msgSeqn = Integer.parseInt(elt.getElementName());
458             } catch (NumberFormatException n) {
459                 if (Logging.SHOW_WARNING && LOG.isLoggable(Level.WARNING)) {
460                     LOG.warning("Discarding element (" + elt.getElementName() + ") Not one of ours.");
461                 }
462                 continue;
463             }
464             
465             IQElt newElt = new IQElt(msgSeqn, elt);
466             
467             // OK we must enqueue
468             
469             // We rely on the sender to not to send more than the window size
470             // because we do not limit the number of elements we allow to be
471             // enqueued.
472             
473             // see if this is a duplicate
474             if (newElt.seqnum <= sequenceNumber) {
475                 if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
476                     LOG.fine("RCVD OLD MESSAGE : Discard seqn#" + newElt.seqnum + " now at seqn#" + sequenceNumber);
477                 }
478                 break;
479             }
480             
481             synchronized (inputQueue) {
482                 
483                 // dbl check with the lock held.
484                 if (closing || closed) {
485                     return;
486                 }
487                 
488                 // Insert this message into the input queue.
489                 // 1. Do not add duplicate messages
490                 // 2. Store in increasing sequence nos.
491                 int insertIndex = inputQueue.size();
492                 boolean duplicate = false;
493                 
494                 for (int j = 0; j < inputQueue.size(); j++) {
495                     IQElt iq = inputQueue.get(j);
496
497                     if (newElt.seqnum < iq.seqnum) {
498                         insertIndex = j;
499                         break;
500                     } else if (newElt.seqnum == iq.seqnum) {
501                         duplicate = true;
502                         break;
503                     }
504                 }
505                 
506                 if (duplicate) {
507                     if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
508                         LOG.fine("RCVD OLD MESSAGE :  Discard duplicate msg, seqn#" + newElt.seqnum);
509                     }
510                     break;
511                 }
512                 
513                 inputQueue.add(insertIndex, newElt);
514                 
515                 if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
516                     LOG.fine("Enqueued msg with seqn#" + newElt.seqnum + " at index " + insertIndex);
517                 }
518                 
519                 inputQueue.notifyAll();
520             }
521         }
522         
523         if (listener != null) {
524             Message newmsg = null;
525
526             while (true) {
527                 try {
528                     newmsg = nextMessage(false);
529                 } catch (IOException io) {// do nothing as this exception will never occur
530                 }
531                 if (newmsg == null) {
532                     break;
533                 }
534                 try {
535                     if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
536                         LOG.fine("In listener mode, calling back listener");
537                     }
538                     listener.processIncomingMessage(newmsg);
539                 } catch (Throwable all) {
540                     if (Logging.SHOW_WARNING && LOG.isLoggable(Level.WARNING)) {
541                         LOG.log(Level.WARNING, "Uncaught Throwable calling listener", all);
542                     }
543                 }
544             }
545         }
546         
547         if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
548             long waited = TimeUtils.toRelativeTimeMillis(TimeUtils.timeNow(), startEnqueue);
549
550             LOG.fine("Queue Incoming Message for " + msg + " completed in " + waited + " msec.");
551         }
552     }
553     
554     long nextRetransRequest = TimeUtils.toAbsoluteTimeMillis(TimeUtils.ASECOND);
555     
556     /**
557      *  Dequeue the message with the desired sequence number waiting as needed
558      *  until the message is available.
559      *
560      *  @param desiredSeqn the sequence number to be dequeued.
561      *  @param blocking If {@code true} then this method should block while
562      *  waiting for the specified message sequence number.
563      *  @return the Message Element with the desired sequence number or null if
564      *  the queue has been closed.
565      */
566     private MessageElement dequeueMessage(int desiredSeqn, boolean blocking) throws IOException {
567         IQElt iQ = null;
568         
569         // Wait for incoming message here
570         long startDequeue = TimeUtils.timeNow();
571         long timeoutAt = TimeUtils.toAbsoluteTimeMillis(timeout);
572         int wct = 0;
573         
574         synchronized (inputQueue) {
575             while (!closed) {
576                 if (inputQueue.isEmpty()) {
577                     if (!blocking) {
578                         return null;
579                     }
580                     if (closing) {
581                         return null;
582                     }
583                     try {
584                         wct++;
585                         inputQueue.wait(TimeUtils.ASECOND);
586                         if (timeoutAt < TimeUtils.timeNow()) {
587                             throw new SocketTimeoutException("Read timeout reached");
588                         }
589                     } catch (InterruptedException e) {
590                         Thread.interrupted();
591                     }
592                     // reset retrans request timer since we don't want to immediately
593                     // request retry after a long wait for out of order messages.
594                     
595                     nextRetransRequest = TimeUtils.toAbsoluteTimeMillis(TimeUtils.ASECOND);
596                     continue;
597                 }
598                 
599                 iQ = inputQueue.get(0); // FIFO
600                 
601                 if (iQ.seqnum < desiredSeqn) {
602                     // Ooops a DUPE slipped in the head of the queue undetected
603                     // (seqnum consistency issue).
604                     // Just drop it.
605                     inputQueue.remove(0);
606                     // if such is the case then notify the other end so that
607                     // the message does not remain in the retry queue eventually
608                     // triggering a broken pipe exception
609                     sendACK(iQ.seqnum);
610                     continue;
611                 } else if (iQ.seqnum != desiredSeqn) {
612                     if (TimeUtils.toRelativeTimeMillis(nextRetransRequest) < 0) {
613                         if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
614                             LOG.fine("Trigger retransmission. Wanted seqn#" + desiredSeqn + " found seqn#" + iQ.seqnum);
615                         }
616                         sendACK(desiredSeqn - 1);
617                         nextRetransRequest = TimeUtils.toAbsoluteTimeMillis(TimeUtils.ASECOND);
618                     }
619                     if (!blocking) {
620                         if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
621                             LOG.fine("Message out of sequece in Non-Blocking mode. returning");
622                         }
623                         // not the element of interest return nothing
624                         return null;
625                     }
626                     try {
627                         wct++;
628                         inputQueue.wait(TimeUtils.ASECOND);
629                         if (timeoutAt < TimeUtils.timeNow()) {
630                             throw new SocketTimeoutException("Read timeout reached");
631                         }
632                     } catch (InterruptedException e) {
633                         throw new InterruptedIOException("IO interrupted ");
634                     }
635                     continue;
636                 }
637                 inputQueue.remove(0);
638                 break;
639             }
640         }
641         nextRetransRequest = 0;
642         // if we are closed then we return null
643         if (null == iQ) {
644             return null;
645         }
646         
647         sendACK(desiredSeqn);
648         
649         if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
650             long waited = TimeUtils.toRelativeTimeMillis(TimeUtils.timeNow(), startDequeue);
651
652             LOG.fine("DEQUEUED seqn#" + iQ.seqnum + " in " + waited + " msec on input queue");
653             if (wct > 0) {
654                 LOG.fine("DEQUEUE waited " + wct + " times on input queue");
655             }
656         }
657         return iQ.elt;
658     }
659     
660     /**
661      * {@inheritDoc}
662      */
663     @Override
664     public int available() throws IOException {
665         if (listener != null) {
666             throw new IOException("available() not supported in async mode");
667         }
668         if (closed) {
669             throw new IOException("Stream closed");
670         }
671         synchronized (record) {
672             if (record.inputStream != null) {
673                 if ((record.size == 0) || (record.nextByte == record.size)) {
674                     if (inputQueue.isEmpty()) {
675                         return 0;
676                     }
677                     // reset the record
678                     record.resetRecord(); // GC as necessary(inputStream byte[])
679                     if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
680                         LOG.fine("Getting next data block at seqn#" + (sequenceNumber + 1));
681                     }
682                     MessageElement elt = dequeueMessage(sequenceNumber + 1, false);
683
684                     if (null == elt) {
685                         return 0;
686                     }
687                     sequenceNumber += 1; // next msg sequence number
688                     // Get the length of the Record
689                     record.size = elt.getByteLength();
690                     record.inputStream = elt.getStream();
691                 }
692                 return record.inputStream.available();
693             }
694         }
695         return 0;
696     }
697     
698     private int local_read(byte[] buf, int offset, int length) throws IOException {
699         
700         if (listener != null) {
701             throw new IOException("read() not supported in async mode");
702         }
703         
704         synchronized (record) {
705             if ((record.size == 0) || (record.nextByte == record.size)) {
706                 
707                 // reset the record
708                 record.resetRecord(); // GC as necessary(inputStream byte[])
709                 
710                 if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
711                     LOG.fine("Getting next data block at seqn#" + (sequenceNumber + 1));
712                 }
713                 
714                 MessageElement elt = dequeueMessage(sequenceNumber + 1, true);
715                 
716                 if (null == elt) {
717                     return -1;
718                 }
719                 
720                 sequenceNumber += 1; // next msg sequence number
721                 
722                 // Get the length of the Record
723                 record.size = elt.getByteLength();
724                 record.inputStream = elt.getStream();
725                 
726                 if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
727                     LOG.fine("new seqn#" + sequenceNumber + ", bytes = " + record.size);
728                 }
729             }
730             
731             // return the requested Record data
732             // These calls should NEVER ask for more data than is in the
733             // received Record.
734             
735             long left = record.size - record.nextByte;
736             int copyLen = (int) Math.min(length, left);
737             int copied = 0;
738             
739             do {
740                 int res = record.inputStream.read(buf, offset + copied, copyLen - copied);
741                 
742                 if (res < 0) {
743                     break;
744                 }
745                 copied += res;
746             } while (copied < copyLen);
747             
748             record.nextByte += copied;
749             
750             if (Logging.SHOW_FINER && LOG.isLoggable(Level.FINER)) {
751                 LOG.finer("Requested " + length + ", Read " + copied + " bytes");
752             }
753             
754             return copied;
755         }
756     }
757     
758     /**
759      * Returns the message listener for this pipe
760      * @return MsgListener
761      *
762      */
763     public MsgListener getListener() {
764         return listener;
765     }
766     
767     /**
768      *  The listener interface for receiving {@link net.jxta.endpoint.Message}
769      */
770     public interface MsgListener {
771         
772         /**
773          * Called for each message received.
774          *
775          * @param message The message to be received.
776          */
777         void processIncomingMessage(Message message);
778     }
779 }