2 * Copyright (c) 2001-2007 Sun Microsystems, Inc. All rights reserved.
4 * The Sun Project JXTA(TM) Software License
6 * Redistribution and use in source and binary forms, with or without
7 * modification, are permitted provided that the following conditions are met:
9 * 1. Redistributions of source code must retain the above copyright notice,
10 * this list of conditions and the following disclaimer.
12 * 2. Redistributions in binary form must reproduce the above copyright notice,
13 * this list of conditions and the following disclaimer in the documentation
14 * and/or other materials provided with the distribution.
16 * 3. The end-user documentation included with the redistribution, if any, must
17 * include the following acknowledgment: "This product includes software
18 * developed by Sun Microsystems, Inc. for JXTA(TM) technology."
19 * Alternately, this acknowledgment may appear in the software itself, if
20 * and wherever such third-party acknowledgments normally appear.
22 * 4. The names "Sun", "Sun Microsystems, Inc.", "JXTA" and "Project JXTA" must
23 * not be used to endorse or promote products derived from this software
24 * without prior written permission. For written permission, please contact
25 * Project JXTA at http://www.jxta.org.
27 * 5. Products derived from this software may not be called "JXTA", nor may
28 * "JXTA" appear in their name, without prior written permission of Sun.
30 * THIS SOFTWARE IS PROVIDED ``AS IS'' AND ANY EXPRESSED OR IMPLIED WARRANTIES,
31 * INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND
32 * FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL SUN
33 * MICROSYSTEMS OR ITS CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
34 * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
35 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA,
36 * OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
37 * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
38 * NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE,
39 * EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
41 * JXTA is a registered trademark of Sun Microsystems, Inc. in the United
42 * States and other countries.
44 * Please see the license information page at :
45 * <http://www.jxta.org/project/www/license.html> for instructions on use of
46 * the license in source files.
48 * ====================================================================
50 * This software consists of voluntary contributions made by many individuals
51 * on behalf of Project JXTA. For more information on Project JXTA, please see
52 * http://www.jxta.org.
54 * This license is based on the BSD license adopted by the Apache Foundation.
57 package net.jxta.impl.endpoint.tls;
60 import java.io.ByteArrayOutputStream;
61 import java.io.DataOutputStream;
62 import java.io.InputStream;
63 import java.io.IOException;
64 import java.net.SocketTimeoutException;
65 import java.io.InterruptedIOException;
66 import java.util.ArrayList;
67 import java.util.Iterator;
68 import java.util.List;
69 import java.util.Vector;
71 import net.jxta.endpoint.ByteArrayMessageElement;
72 import net.jxta.endpoint.Message;
73 import net.jxta.endpoint.MessageElement;
74 import net.jxta.impl.util.TimeUtils;
76 import java.util.logging.Level;
77 import net.jxta.logging.Logging;
78 import java.util.logging.Logger;
82 * Acts as the input for TLS. Accepts ciphertext which arrives in messages
83 * and orders it before passing it to TLS for decryption.
85 * TLS will do its raw reads off of this InputStream.
86 * Here, we will have queued up the payload of TLS message
87 * elements to be passed to TLS code as TLS Records.
90 class JTlsInputStream extends InputStream {
91 private static final Logger LOG = Logger.getLogger(JTlsInputStream.class.getName());
93 private static final boolean DEBUGIO = false;
95 static private int MAXQUEUESIZE = 25;
98 * Connection we are working for.
100 private TlsConn conn;
102 private volatile boolean closed = false;
103 private boolean closing = false;
105 private long timeout = 2 * TimeUtils.AMINUTE;
106 private JTlsRecord jtrec = null;
107 private volatile int sequenceNumber = 0;
108 private final Vector<IQElt> inputQueue = new Vector<IQElt>(MAXQUEUESIZE); // For incoming messages.
111 * Input TLS record Object
113 private static class JTlsRecord {
114 // This dummy message elt
115 public InputStream tlsRecord; // TLS Record
116 public long nextByte; // next inbuff byte
117 public long size; // size of TLS Record
/**
 * Creates an empty record holder: no stream attached, and both the read
 * position and the record size at zero. The caller fills the fields in.
 */
public JTlsRecord() {
    this.tlsRecord = null; // stream is supplied by the caller
    this.size = 0L;        // record size is set by the caller
    this.nextByte = 0L;    // read position is set by the caller
}
125 // reset the jxta tls record element
127 public void resetRecord() {
128 if (null != tlsRecord) {
131 } catch (IOException ignored) {// ignored
140 // An input queue element which breaks out a
141 // received message in queueIncomingMessage().
142 private static class IQElt {
148 public JTlsInputStream(TlsConn conn, long timeout) {
149 this.timeout = timeout;
151 jtrec = new JTlsRecord();
152 // 1 <= seq# <= maxint, monotonically increasing
153 // Incremented before compare.
162 public void close() throws IOException {
166 synchronized (inputQueue) {
168 inputQueue.notifyAll();
173 * Prepare this input stream for being closed. It will still
174 * deliver the packets that have been received, but nothing
175 * more. This is meant to be called in response to the other side
176 * having initiated closure. We assume that when the other side does so,
177 * it means that it is satisfied with what we have acknowledged so far.
179 public void setClosing() throws IOException {
180 synchronized (inputQueue) {
182 inputQueue.notifyAll();
186 // Here we read the TLS Record data from the incoming JXTA message.
187 // (We will really have a full jxta message available.)
189 // TLS Record input only calls the following methods.
190 // They are called from SSLRecord.decode(SSLConn, Inputstream);
197 public int read() throws IOException {
202 byte[] a = new byte[1];
205 int len = local_read(a, 0, 1);
212 if (DEBUGIO && Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
213 LOG.fine("Read() : " + (a[0] & 255));
216 return (a[0] & 0xFF); // The byte
219 // If we've reached EOF, there's nothing to do but close().
229 public int read(byte[] a, int offset, int length) throws IOException {
238 int i = local_read(a, offset, length);
240 if (DEBUGIO && Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
241 LOG.fine("Read(byte[], int, " + length + "), bytes read = " + i);
244 // If we've reached EOF; there's nothing to do but close().
251 // protected accessor for sequence number
252 int getSequenceNumber() {
253 return sequenceNumber;
256 // Our input queue max size
262 * Send a sequential ACK and selective ACKs for all of the queued messages.
264 * @param seqnAck the sequence number being sequential ACKed
266 private void sendACK(int seqnAck) {
267 List<Integer> selectedAckList = new ArrayList<Integer>();
269 synchronized (inputQueue) {
270 Iterator<IQElt> eachInQueue = inputQueue.iterator();
272 while (eachInQueue.hasNext() && (selectedAckList.size() < MAXQUEUESIZE)) {
273 IQElt anIQElt = eachInQueue.next();
275 if (anIQElt.seqnum > seqnAck) {
276 selectedAckList.add(new Integer(anIQElt.seqnum));
281 // PERMIT DUPLICATE ACKS. Just a list and one small message.
282 sendACK(seqnAck, selectedAckList);
286 * Build an ACK message. The message provides a sequential ACK count and
287 * an optional list of selective ACKs.
289 * @param seqnAck the sequence number being sequential ACKed
290 * @param sackList a list of selective ACKs. Must be sorted in increasing order.
293 private void sendACK(int seqnAck, List<Integer> sackList) {
294 ByteArrayOutputStream bos = new ByteArrayOutputStream((1 + sackList.size()) * 4);
295 DataOutputStream dos = new DataOutputStream(bos);
298 dos.writeInt(seqnAck);
300 Iterator<Integer> eachSACK = sackList.iterator();
302 while (eachSACK.hasNext()) {
303 int aSack = (eachSACK.next()).intValue();
310 Message ACKMsg = new Message();
311 MessageElement elt = new ByteArrayMessageElement(JTlsDefs.ACKKEY, JTlsDefs.ACKS, bos.toByteArray(), null);
313 ACKMsg.addMessageElement(JTlsDefs.TLSNameSpace, elt);
315 conn.sendToRemoteTls(ACKMsg);
317 if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
318 LOG.fine("SENT ACK, seqn#" + seqnAck + " and " + sackList.size() + " SACKs ");
320 } catch (IOException e) {
321 if (Logging.SHOW_INFO && LOG.isLoggable(Level.INFO)) {
322 LOG.log(Level.INFO, "sendACK caught IOException:", e);
328 * queue messages by sequence number.
330 public void queueIncomingMessage(Message msg) {
332 if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
333 LOG.fine("Queue Incoming Message begins for " + msg);
336 long startEnqueue = TimeUtils.timeNow();
338 Message.ElementIterator e = msg.getMessageElements(JTlsDefs.TLSNameSpace, JTlsDefs.BLOCKS);
340 // OK look for jxta message
341 while (!closed && !closing && e.hasNext()) {
342 MessageElement elt = e.next();
349 msgSeqn = Integer.parseInt(elt.getElementName());
350 } catch (NumberFormatException n) {
351 if (Logging.SHOW_WARNING && LOG.isLoggable(Level.WARNING)) {
352 LOG.warning("Discarding element (" + elt.getElementName() + ") Not one of ours.");
357 IQElt newElt = new IQElt();
359 newElt.seqnum = msgSeqn;
363 // OK we must inqueue:
364 // Wait until someone dequeues if we are at the size limit
365 // see if this is a duplicate
366 if (newElt.seqnum <= sequenceNumber) {
367 if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
368 LOG.fine("RCVD OLD MESSAGE : Discard seqn#" + newElt.seqnum + " now at seqn#" + sequenceNumber);
372 synchronized (inputQueue) {
373 // dbl check with the lock held.
374 if (closing || closed) {
378 // Insert this message into the input queue.
379 // 1. Do not add duplicate messages
380 // 2. Store in increasing sequence nos.
381 int insertIndex = inputQueue.size();
382 boolean duplicate = false;
384 for (int j = 0; j < inputQueue.size(); j++) {
385 IQElt iq = inputQueue.elementAt(j);
387 if (newElt.seqnum < iq.seqnum) {
390 } else if (newElt.seqnum == iq.seqnum) {
397 if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
398 LOG.fine("RCVD OLD MESSAGE : Discard duplicate msg, seqn#" + newElt.seqnum);
404 inputQueue.add(insertIndex, newElt);
406 if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
407 LOG.fine("Enqueued msg with seqn#" + newElt.seqnum + " at index " + insertIndex);
410 inputQueue.notifyAll();
414 if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
415 long waited = TimeUtils.toRelativeTimeMillis(TimeUtils.timeNow(), startEnqueue);
417 LOG.fine("Queue Incoming Message for " + msg + " completed in " + waited + " msec.");
422 * Dequeue the message with the desired sequence number waiting as needed
423 * until the message is available.
425 * @param desiredSeqn the sequence number to be dequeued.
426 * @return the Message Element with the desired sequence number or null if
427 * the queue has been closed.
429 private MessageElement dequeueMessage(int desiredSeqn) throws IOException {
432 // Wait for incoming message here
433 long startDequeue = TimeUtils.timeNow();
434 long whenToTimeout = startDequeue + timeout;
437 long nextRetransRequest = TimeUtils.toAbsoluteTimeMillis(TimeUtils.ASECOND);
439 synchronized (inputQueue) {
441 if (inputQueue.size() == 0) {
447 inputQueue.wait(TimeUtils.ASECOND);
448 if (whenToTimeout < TimeUtils.timeNow()) {
449 throw new SocketTimeoutException("Read timeout reached");
451 } catch (InterruptedException e) {
452 Thread.interrupted(); // just continue
454 // we reset the retrans request timer since we don't want to
455 // immediately request retry after a long wait for out of
458 nextRetransRequest = TimeUtils.toAbsoluteTimeMillis(TimeUtils.ASECOND);
462 iQ = inputQueue.elementAt(0); // FIFO
464 if (iQ.seqnum < desiredSeqn) {
465 // Ooops a DUPE slipped in the head of the queue undetected
466 // (seqnum consistency issue).
468 inputQueue.remove(0);
469 // if such is the case then notify the other end so that
470 // the message does not remain in the retry queue eventually
471 // triggering a broken pipe exception
474 } else if (iQ.seqnum != desiredSeqn) {
475 if (TimeUtils.toRelativeTimeMillis(nextRetransRequest) < 0) {
476 if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
477 LOG.fine("Trigger retransmission. Wanted seqn#" + desiredSeqn + " found seqn#" + iQ.seqnum);
479 sendACK(desiredSeqn - 1);
480 nextRetransRequest = TimeUtils.toAbsoluteTimeMillis(TimeUtils.ASECOND);
485 inputQueue.wait(TimeUtils.ASECOND);
486 if (whenToTimeout < TimeUtils.timeNow()) {
487 throw new SocketTimeoutException("Read timeout reached");
489 } catch (InterruptedException e) {
490 throw new InterruptedIOException("IO interrupted ");
495 inputQueue.remove(0);
500 nextRetransRequest = 0;
501 sendACK(desiredSeqn);
502 // if we are closed then we return null
507 if (Logging.SHOW_INFO && LOG.isLoggable(Level.INFO)) {
508 long waited = TimeUtils.toRelativeTimeMillis(TimeUtils.timeNow(), startDequeue);
510 LOG.info("DEQUEUED seqn#" + iQ.seqnum + " in " + waited + " msec on input queue");
511 if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
513 LOG.fine("DEQUEUE waited " + wct + " times on input queue");
524 private int local_read(byte[] a, int offset, int length) throws IOException {
526 synchronized (jtrec) {
527 if ((jtrec.size == 0) || (jtrec.nextByte == jtrec.size)) {
530 jtrec.resetRecord(); // GC as necessary(tlsRecord byte[])
532 if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
533 LOG.fine("local_read: getting next data block at seqn#" + (sequenceNumber + 1));
536 MessageElement elt = null;
539 elt = dequeueMessage(sequenceNumber + 1);
540 } catch (SocketTimeoutException ste) {
541 // timed out with no data
542 // SSLSocket expects a 0 data in this case
550 sequenceNumber += 1; // next msg sequence number
552 // Get the length of the TLS Record
553 jtrec.size = elt.getByteLength();
554 jtrec.tlsRecord = elt.getStream();
556 if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
557 LOG.fine("local_read: new seqn#" + sequenceNumber + ", bytes = " + jtrec.size);
561 // return the requested TLS Record data
562 // These calls should NEVER ask for more data than is in the
563 // received TLS Record.
565 long left = jtrec.size - jtrec.nextByte;
566 int copyLen = (int) Math.min(length, left);
570 int res = jtrec.tlsRecord.read(a, offset + copied, copyLen - copied);
577 } while (copied < copyLen);
579 jtrec.nextByte += copied;
582 if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
583 LOG.fine("local_read: Requested " + length + ", Read " + copied + " bytes");