]> sjero.net Git - linphone/blob - p2pproxy/dependencies-src/jxse-src-2.5/impl/src/net/jxta/impl/endpoint/tls/JTlsInputStream.java
cbebd9f4356f045b7d6c565be309053b030a1f30
[linphone] / p2pproxy / dependencies-src / jxse-src-2.5 / impl / src / net / jxta / impl / endpoint / tls / JTlsInputStream.java
1 /*
2  * Copyright (c) 2001-2007 Sun Microsystems, Inc.  All rights reserved.
3  *  
4  *  The Sun Project JXTA(TM) Software License
5  *  
6  *  Redistribution and use in source and binary forms, with or without 
7  *  modification, are permitted provided that the following conditions are met:
8  *  
9  *  1. Redistributions of source code must retain the above copyright notice,
10  *     this list of conditions and the following disclaimer.
11  *  
12  *  2. Redistributions in binary form must reproduce the above copyright notice, 
13  *     this list of conditions and the following disclaimer in the documentation 
14  *     and/or other materials provided with the distribution.
15  *  
16  *  3. The end-user documentation included with the redistribution, if any, must 
17  *     include the following acknowledgment: "This product includes software 
18  *     developed by Sun Microsystems, Inc. for JXTA(TM) technology." 
19  *     Alternately, this acknowledgment may appear in the software itself, if 
20  *     and wherever such third-party acknowledgments normally appear.
21  *  
22  *  4. The names "Sun", "Sun Microsystems, Inc.", "JXTA" and "Project JXTA" must 
23  *     not be used to endorse or promote products derived from this software 
24  *     without prior written permission. For written permission, please contact 
25  *     Project JXTA at http://www.jxta.org.
26  *  
27  *  5. Products derived from this software may not be called "JXTA", nor may 
28  *     "JXTA" appear in their name, without prior written permission of Sun.
29  *  
30  *  THIS SOFTWARE IS PROVIDED ``AS IS'' AND ANY EXPRESSED OR IMPLIED WARRANTIES,
31  *  INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND 
32  *  FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL SUN 
33  *  MICROSYSTEMS OR ITS CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, 
34  *  INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT 
35  *  LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, 
36  *  OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF 
37  *  LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING 
38  *  NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, 
39  *  EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
40  *  
41  *  JXTA is a registered trademark of Sun Microsystems, Inc. in the United 
42  *  States and other countries.
43  *  
44  *  Please see the license information page at :
45  *  <http://www.jxta.org/project/www/license.html> for instructions on use of 
46  *  the license in source files.
47  *  
48  *  ====================================================================
49  *  
50  *  This software consists of voluntary contributions made by many individuals 
51  *  on behalf of Project JXTA. For more information on Project JXTA, please see 
52  *  http://www.jxta.org.
53  *  
54  *  This license is based on the BSD license adopted by the Apache Foundation. 
55  */
56
57 package net.jxta.impl.endpoint.tls;
58
59
60 import java.io.ByteArrayOutputStream;
61 import java.io.DataOutputStream;
62 import java.io.InputStream;
63 import java.io.IOException;
64 import java.net.SocketTimeoutException;
65 import java.io.InterruptedIOException;
66 import java.util.ArrayList;
67 import java.util.Iterator;
68 import java.util.List;
69 import java.util.Vector;
70
71 import net.jxta.endpoint.ByteArrayMessageElement;
72 import net.jxta.endpoint.Message;
73 import net.jxta.endpoint.MessageElement;
74 import net.jxta.impl.util.TimeUtils;
75
76 import java.util.logging.Level;
77 import net.jxta.logging.Logging;
78 import java.util.logging.Logger;
79
80
81 /**
82  *  Acts as the input for TLS. Accepts ciphertext which arrives in messages
83  *  and orders it before passing it to TLS for decryption.
84  *
85  * TLS will do its raw reads off of this InputStream
86  * Here, we will have queued up the payload of TLS message
87  * elements to be passed to TLS code as TLS Records.
88  *
89  */
90 class JTlsInputStream extends InputStream {
91     private static final Logger LOG = Logger.getLogger(JTlsInputStream.class.getName());
92     
93     private static final boolean  DEBUGIO = false;
94     
95     static private int MAXQUEUESIZE = 25;
96     
97     /**
98      *  Connection we are working for.
99      */
100     private TlsConn conn;
101     
102     private volatile boolean closed = false;
103     private boolean closing = false;
104     
105     private long timeout = 2 * TimeUtils.AMINUTE;
106     private JTlsRecord jtrec = null;
107     private volatile int sequenceNumber = 0;
108     private final Vector<IQElt> inputQueue = new Vector<IQElt>(MAXQUEUESIZE); // For incoming messages.
109     
110     /**
111      * Input TLS record Object
112      **/
113     private static class JTlsRecord {
114         // This dummy message elt
115         public InputStream tlsRecord; // TLS Record
116         public long nextByte; // next inbuff byte
117         public long size; // size of TLS Record
118         
119         public JTlsRecord() {
120             tlsRecord = null; // allocated by caller
121             nextByte = 0; // We read here (set by caller)
122             size = 0; // TLS Record size(set by caller)
123         }
124         
125         // reset the jxta tls record element
126         
127         public void resetRecord() {
128             if (null != tlsRecord) {
129                 try {
130                     tlsRecord.close();
131                 } catch (IOException ignored) {// ignored
132                 }
133             }
134             tlsRecord = null;
135             size = nextByte = 0;
136         }
137     }
138     
139     
140     // An input queue element which breaks out a
141     // received message in enqueueMessage().
142     private static class IQElt {
143         int seqnum;
144         MessageElement elt;
145         boolean ackd;
146     }
147     
148     public JTlsInputStream(TlsConn conn, long timeout) {
149         this.timeout = timeout;
150         this.conn = conn;
151         jtrec = new JTlsRecord();
152         // 1 <= seq# <= maxint, monotonically increasing
153         // Incremented before compare.
154         sequenceNumber = 0;
155         
156     }
157     
158     /**
159      * {@inheritDoc}
160      **/
161     @Override
162     public void close() throws IOException {
163         super.close();
164         
165         closed = true;
166         synchronized (inputQueue) {
167             inputQueue.clear();
168             inputQueue.notifyAll();
169         }
170     }
171
172     /**
173      * prepare this input stream to being closed. It will still
174      * deliver the packets that have been received, but nothing
175      * more. This is meant to be called in response to the other side
176      * having initiated closure. We assume that when the other side does it
177      * it means that it is satified with what we have acknoleged so far.
178      */
179     public void setClosing() throws IOException {
180         synchronized (inputQueue) {
181             closing = true;
182             inputQueue.notifyAll();
183         }
184     }
185     
186     // Here we read the TLS Record data from the incoming JXTA message.
187     // (We will really have a full jxta message available.)
188     //
189     // TLS  Record input only calls the following  methods.
190     // They are called from SSLRecord.decode(SSLConn, Inputstream);
191     //
192     
193     /**
194      * {@inheritDoc}
195      */
196     @Override
197     public int read() throws IOException {
198         if (closed) {
199             return -1;
200         }
201         
202         byte[] a = new byte[1];
203         
204         while (true) {
205             int len = local_read(a, 0, 1);
206             
207             if (len < 0) {
208                 break;
209             }
210             
211             if (len > 0) {
212                 if (DEBUGIO && Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
213                     LOG.fine("Read() : " + (a[0] & 255));
214                 }
215                 
216                 return (a[0] & 0xFF); // The byte
217             }
218         }
219         // If we've reached EOF, there's nothing to do but close().
220         
221         close();
222         return -1;
223     }
224     
225     /**
226      * {@inheritDoc}
227      */
228     @Override
229     public int read(byte[] a, int offset, int length) throws IOException {
230         if (closed) {
231             return -1;
232         }
233         
234         if (0 == length) {
235             return 0;
236         }
237         
238         int i = local_read(a, offset, length);
239         
240         if (DEBUGIO && Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
241             LOG.fine("Read(byte[], int, " + length + "), bytes read = " + i);
242         }
243         
244         // If we've reached EOF; there's nothing to do but close().
245         if (i == -1) {
246             close();
247         }
248         return i;
249     }
250     
251     // protected accessor for sequence number
252     int getSequenceNumber() {
253         return sequenceNumber;
254     }
255     
256     // Our input queue max size
257     int getMaxIQSize() {
258         return MAXQUEUESIZE;
259     }
260     
261     /**
262      *  Send a sequential ACK and selective ACKs for all of the queued messages.
263      *
264      *  @param seqnAck the sequence number being sequential ACKed
265      **/
266     private void sendACK(int seqnAck) {
267         List<Integer> selectedAckList = new ArrayList<Integer>();
268         
269         synchronized (inputQueue) {
270             Iterator<IQElt> eachInQueue = inputQueue.iterator();
271             
272             while (eachInQueue.hasNext() && (selectedAckList.size() < MAXQUEUESIZE)) {
273                 IQElt anIQElt = eachInQueue.next();
274
275                 if (anIQElt.seqnum > seqnAck) {
276                     selectedAckList.add(new Integer(anIQElt.seqnum));
277                 }
278             }
279         }
280         
281         // PERMIT DUPLICATE ACKS. Just a list and one small message.
282         sendACK(seqnAck, selectedAckList);
283     }
284     
285     /**
286      *  Build an ACK message. The message provides a sequential ACK count and
287      *  an optional list of selective ACKs.
288      *
289      *  @param seqnAck the sequence number being sequential ACKed
290      *  @param sackList a list of selective ACKs. Must be sorted in increasing
291      *  order.
292      */
293     private void sendACK(int seqnAck, List<Integer> sackList) {
294         ByteArrayOutputStream bos = new ByteArrayOutputStream((1 + sackList.size()) * 4);
295         DataOutputStream dos = new DataOutputStream(bos);
296         
297         try {
298             dos.writeInt(seqnAck);
299             
300             Iterator<Integer> eachSACK = sackList.iterator();
301             
302             while (eachSACK.hasNext()) {
303                 int aSack = (eachSACK.next()).intValue();
304
305                 dos.writeInt(aSack);
306             }
307             dos.close();
308             bos.close();
309             
310             Message ACKMsg = new Message();
311             MessageElement elt = new ByteArrayMessageElement(JTlsDefs.ACKKEY, JTlsDefs.ACKS, bos.toByteArray(), null);
312             
313             ACKMsg.addMessageElement(JTlsDefs.TLSNameSpace, elt);
314             
315             conn.sendToRemoteTls(ACKMsg);
316             
317             if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
318                 LOG.fine("SENT ACK, seqn#" + seqnAck + " and " + sackList.size() + " SACKs ");
319             }
320         } catch (IOException e) {
321             if (Logging.SHOW_INFO && LOG.isLoggable(Level.INFO)) {
322                 LOG.log(Level.INFO, "sendACK caught IOException:", e);
323             }
324         }
325     }
326     
327     /**
328      *  queue messages by sequence number.
329      */
330     public void queueIncomingMessage(Message msg) {
331         
332         if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
333             LOG.fine("Queue Incoming Message begins for " + msg);
334         }
335         
336         long startEnqueue = TimeUtils.timeNow();
337         
338         Message.ElementIterator e = msg.getMessageElements(JTlsDefs.TLSNameSpace, JTlsDefs.BLOCKS);
339         
340         // OK look for jxta message
341         while (!closed && !closing && e.hasNext()) {
342             MessageElement elt = e.next();
343
344             e.remove();
345             
346             int msgSeqn = 0;
347             
348             try {
349                 msgSeqn = Integer.parseInt(elt.getElementName());
350             } catch (NumberFormatException n) {
351                 if (Logging.SHOW_WARNING && LOG.isLoggable(Level.WARNING)) {
352                     LOG.warning("Discarding element (" + elt.getElementName() + ") Not one of ours.");
353                 }
354                 continue;
355             }
356             
357             IQElt newElt = new IQElt();
358             
359             newElt.seqnum = msgSeqn;
360             newElt.elt = elt;
361             newElt.ackd = false;
362             
363             // OK we must inqueue:
364             // Wait until someone dequeues if we are at the size limit
365             // see if this is a duplicate
366             if (newElt.seqnum <= sequenceNumber) {
367                 if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
368                     LOG.fine("RCVD OLD MESSAGE : Discard seqn#" + newElt.seqnum + " now at seqn#" + sequenceNumber);
369                 }
370                 break;
371             }
372             synchronized (inputQueue) {
373                 // dbl check with the lock held.
374                 if (closing || closed) {
375                     return;
376                 }
377                 
378                 // Insert this message into the input queue.
379                 // 1. Do not add duplicate messages
380                 // 2. Store in increasing sequence nos.
381                 int insertIndex = inputQueue.size();
382                 boolean duplicate = false;
383                 
384                 for (int j = 0; j < inputQueue.size(); j++) {
385                     IQElt iq = inputQueue.elementAt(j);
386
387                     if (newElt.seqnum < iq.seqnum) {
388                         insertIndex = j;
389                         break;
390                     } else if (newElt.seqnum == iq.seqnum) {
391                         duplicate = true;
392                         break;
393                     }
394                 }
395                 
396                 if (duplicate) {
397                     if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
398                         LOG.fine("RCVD OLD MESSAGE : Discard duplicate msg, seqn#" + newElt.seqnum);
399                     }
400                     newElt = null;
401                     break;
402                 }
403                 
404                 inputQueue.add(insertIndex, newElt);
405                 
406                 if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
407                     LOG.fine("Enqueued msg with seqn#" + newElt.seqnum + " at index " + insertIndex);
408                 }
409                 
410                 inputQueue.notifyAll();
411                 newElt = null;
412             }
413         }
414         if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
415             long waited = TimeUtils.toRelativeTimeMillis(TimeUtils.timeNow(), startEnqueue);
416             
417             LOG.fine("Queue Incoming Message for " + msg + " completed in " + waited + " msec.");
418         }
419     }
420     
421     /**
422      *  Dequeue the message with the desired sequence number waiting as needed
423      *  until the message is available.
424      *
425      *  @param desiredSeqn the sequence number to be dequeued.
426      *  @return the Message Element with the desired sequence number or null if
427      *  the queue has been closed.
428      **/
429     private MessageElement dequeueMessage(int desiredSeqn) throws IOException {
430         IQElt iQ = null;
431         
432         // Wait for incoming message here
433         long startDequeue = TimeUtils.timeNow();
434         long whenToTimeout = startDequeue + timeout;
435         int wct = 0;
436         
437         long nextRetransRequest = TimeUtils.toAbsoluteTimeMillis(TimeUtils.ASECOND);
438         
439         synchronized (inputQueue) {
440             while (!closed) {
441                 if (inputQueue.size() == 0) {
442                     if (closing) {
443                         return null;
444                     }
445                     try {
446                         wct++;
447                         inputQueue.wait(TimeUtils.ASECOND);
448                         if (whenToTimeout < TimeUtils.timeNow()) {
449                             throw new SocketTimeoutException("Read timeout reached");
450                         }
451                     } catch (InterruptedException e) {
452                         Thread.interrupted(); // just continue
453                     }
454                     // we reset the retrans request timer since we don't want to
455                     // immediately request retry after a long wait for out of
456                     // order messages.
457                     
458                     nextRetransRequest = TimeUtils.toAbsoluteTimeMillis(TimeUtils.ASECOND);
459                     continue;
460                 }
461                 
462                 iQ = inputQueue.elementAt(0); // FIFO
463                 
464                 if (iQ.seqnum < desiredSeqn) {
465                     // Ooops a DUPE slipped in the head of the queue undetected
466                     // (seqnum consistency issue).
467                     // Just drop it.
468                     inputQueue.remove(0);
469                     // if such is the case then notify the other end so that
470                     // the message does not remain in the retry queue eventually
471                     // triggering a broken pipe exception
472                     sendACK(iQ.seqnum);
473                     continue;
474                 } else if (iQ.seqnum != desiredSeqn) {
475                     if (TimeUtils.toRelativeTimeMillis(nextRetransRequest) < 0) {
476                         if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
477                             LOG.fine("Trigger retransmission. Wanted seqn#" + desiredSeqn + " found seqn#" + iQ.seqnum);
478                         }
479                         sendACK(desiredSeqn - 1);
480                         nextRetransRequest = TimeUtils.toAbsoluteTimeMillis(TimeUtils.ASECOND);
481                     }
482                     
483                     try {
484                         wct++;
485                         inputQueue.wait(TimeUtils.ASECOND);
486                         if (whenToTimeout < TimeUtils.timeNow()) {
487                             throw new SocketTimeoutException("Read timeout reached");
488                         }
489                     } catch (InterruptedException e) {
490                         throw new InterruptedIOException("IO interrupted ");
491                     }
492                     continue;
493                 }
494                 
495                 inputQueue.remove(0);
496                 break;
497             }
498         }
499         
500         nextRetransRequest = 0;
501         sendACK(desiredSeqn);
502         // if we are closed then we return null
503         if (null == iQ) {
504             return null;
505         }
506         
507         if (Logging.SHOW_INFO && LOG.isLoggable(Level.INFO)) {
508             long waited = TimeUtils.toRelativeTimeMillis(TimeUtils.timeNow(), startDequeue);
509             
510             LOG.info("DEQUEUED seqn#" + iQ.seqnum + " in " + waited + " msec on input queue");
511             if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
512                 if (wct > 0) {
513                     LOG.fine("DEQUEUE waited " + wct + " times on input queue");
514                 }
515             }
516         }
517         
518         return iQ.elt;
519     }
520     
521     /**
522      *
523      */
524     private int local_read(byte[] a, int offset, int length) throws IOException {
525         
526         synchronized (jtrec) {
527             if ((jtrec.size == 0) || (jtrec.nextByte == jtrec.size)) {
528                 
529                 // reset the record
530                 jtrec.resetRecord(); // GC as necessary(tlsRecord byte[])
531                 
532                 if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
533                     LOG.fine("local_read: getting next data block at seqn#" + (sequenceNumber + 1));
534                 }
535                 
536                 MessageElement elt = null;
537
538                 try {
539                     elt = dequeueMessage(sequenceNumber + 1);
540                 } catch (SocketTimeoutException ste) {
541                     // timed out with no data
542                     // SSLSocket expects a 0 data in this case
543                     return 0;
544                 }
545                 
546                 if (null == elt) {
547                     return -1;
548                 }
549                 
550                 sequenceNumber += 1; // next msg sequence number
551                 
552                 // Get the length of the TLS Record
553                 jtrec.size = elt.getByteLength();
554                 jtrec.tlsRecord = elt.getStream();
555                 
556                 if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
557                     LOG.fine("local_read: new seqn#" + sequenceNumber + ", bytes = " + jtrec.size);
558                 }
559             }
560             
561             // return the requested TLS Record data
562             // These calls should NEVER ask for more data than is in the
563             // received TLS Record.
564             
565             long left = jtrec.size - jtrec.nextByte;
566             int copyLen = (int) Math.min(length, left);
567             int copied = 0;
568             
569             do {
570                 int res = jtrec.tlsRecord.read(a, offset + copied, copyLen - copied);
571                 
572                 if (res < 0) {
573                     break;
574                 }
575                 
576                 copied += res;
577             } while (copied < copyLen);
578             
579             jtrec.nextByte += copied;
580             
581             if (DEBUGIO) {
582                 if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
583                     LOG.fine("local_read: Requested " + length + ", Read " + copied + " bytes");
584                 }
585             }
586             
587             return copied;
588         }
589     }
590 }