]> sjero.net Git - linphone/blob - p2pproxy/dependencies-src/jxse-src-2.5/api/src/net/jxta/util/JxtaBiDiPipe.java
remove mediastreamer2 and add it as a submodule instead.
[linphone] / p2pproxy / dependencies-src / jxse-src-2.5 / api / src / net / jxta / util / JxtaBiDiPipe.java
1 /*
2  * Copyright (c) 2006-2007 Sun Microsystems, Inc.  All rights reserved.
3  *  
4  *  The Sun Project JXTA(TM) Software License
5  *  
6  *  Redistribution and use in source and binary forms, with or without 
7  *  modification, are permitted provided that the following conditions are met:
8  *  
9  *  1. Redistributions of source code must retain the above copyright notice,
10  *     this list of conditions and the following disclaimer.
11  *  
12  *  2. Redistributions in binary form must reproduce the above copyright notice, 
13  *     this list of conditions and the following disclaimer in the documentation 
14  *     and/or other materials provided with the distribution.
15  *  
16  *  3. The end-user documentation included with the redistribution, if any, must 
17  *     include the following acknowledgment: "This product includes software 
18  *     developed by Sun Microsystems, Inc. for JXTA(TM) technology." 
19  *     Alternately, this acknowledgment may appear in the software itself, if 
20  *     and wherever such third-party acknowledgments normally appear.
21  *  
22  *  4. The names "Sun", "Sun Microsystems, Inc.", "JXTA" and "Project JXTA" must 
23  *     not be used to endorse or promote products derived from this software 
24  *     without prior written permission. For written permission, please contact 
25  *     Project JXTA at http://www.jxta.org.
26  *  
27  *  5. Products derived from this software may not be called "JXTA", nor may 
28  *     "JXTA" appear in their name, without prior written permission of Sun.
29  *  
30  *  THIS SOFTWARE IS PROVIDED ``AS IS'' AND ANY EXPRESSED OR IMPLIED WARRANTIES,
31  *  INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND 
32  *  FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL SUN 
33  *  MICROSYSTEMS OR ITS CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, 
34  *  INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT 
35  *  LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, 
36  *  OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF 
37  *  LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING 
38  *  NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, 
39  *  EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
40  *  
41  *  JXTA is a registered trademark of Sun Microsystems, Inc. in the United 
42  *  States and other countries.
43  *  
44  *  Please see the license information page at :
45  *  <http://www.jxta.org/project/www/license.html> for instructions on use of 
46  *  the license in source files.
47  *  
48  *  ====================================================================
49  *  
50  *  This software consists of voluntary contributions made by many individuals 
51  *  on behalf of Project JXTA. For more information on Project JXTA, please see 
52  *  http://www.jxta.org.
53  *  
54  *  This license is based on the BSD license adopted by the Apache Foundation. 
55  */
56 package net.jxta.util;
57
58 import net.jxta.credential.Credential;
59 import net.jxta.document.AdvertisementFactory;
60 import net.jxta.document.MimeMediaType;
61 import net.jxta.document.StructuredDocument;
62 import net.jxta.document.StructuredDocumentFactory;
63 import net.jxta.document.XMLDocument;
64 import net.jxta.endpoint.EndpointAddress;
65 import net.jxta.endpoint.EndpointService;
66 import net.jxta.endpoint.Message;
67 import net.jxta.endpoint.MessageElement;
68 import net.jxta.endpoint.Messenger;
69 import net.jxta.endpoint.StringMessageElement;
70 import net.jxta.endpoint.TextDocumentMessageElement;
71 import net.jxta.id.ID;
72 import net.jxta.impl.endpoint.tcp.TcpMessenger;
73 import net.jxta.impl.util.pipe.reliable.Defs;
74 import net.jxta.impl.util.pipe.reliable.FixedFlowControl;
75 import net.jxta.impl.util.pipe.reliable.OutgoingMsgrAdaptor;
76 import net.jxta.impl.util.pipe.reliable.ReliableInputStream;
77 import net.jxta.impl.util.pipe.reliable.ReliableOutputStream;
78 import net.jxta.logging.Logging;
79 import net.jxta.membership.MembershipService;
80 import net.jxta.peer.PeerID;
81 import net.jxta.peergroup.PeerGroup;
82 import net.jxta.pipe.InputPipe;
83 import net.jxta.pipe.OutputPipe;
84 import net.jxta.pipe.OutputPipeEvent;
85 import net.jxta.pipe.OutputPipeListener;
86 import net.jxta.pipe.PipeID;
87 import net.jxta.pipe.PipeMsgEvent;
88 import net.jxta.pipe.PipeMsgListener;
89 import net.jxta.pipe.PipeService;
90 import net.jxta.protocol.PeerAdvertisement;
91 import net.jxta.protocol.PipeAdvertisement;
92
93 import java.io.IOException;
94 import java.io.InputStream;
95 import java.net.SocketTimeoutException;
96 import java.util.Collections;
97 import java.util.Iterator;
98 import java.util.concurrent.ArrayBlockingQueue;
99 import java.util.concurrent.TimeUnit;
100 import java.util.logging.Level;
101 import java.util.logging.Logger;
102
103 /**
104  * JxtaBiDiPipe is a pair of UnicastPipe channels that implements a bidirectional pipe.
105  * By default, JxtaBiDiPipe operates in reliable mode, unless otherwise specified,
106  * in addition, messages must not exceed the Endpoint MTU size of 64K, exceed the
107  * MTU will lead to unexpected behavior.
108  * <p/>
109  * It highly recommended that an application message listener is specified, not doing so, may
110  * lead to message loss in the event the internal queue is overflowed.
111  * <p/>
112  * Sending messages vis {@link #sendMessage(Message)} from within a 
113  * {@code PipeMsgListener} may result in a deadlock due to contention
114  * between the sending and receiving portions of BiDi pipes. 
115  * <p/>
116  * JxtaBiDiPipe, whenever possible, will attempt to utilize direct tcp messengers,
117  * which leads to improved performance.
118  */
119 public class JxtaBiDiPipe implements PipeMsgListener, OutputPipeListener, ReliableInputStream.MsgListener {
120
121     /**
122      * Logger
123      */
124     private final static transient Logger LOG = Logger.getLogger(JxtaBiDiPipe.class.getName());
125
126     private final static int MAXRETRYTIMEOUT = 120 * 1000;
127     private PipeAdvertisement remotePipeAdv;
128     private PeerAdvertisement remotePeerAdv;
129     protected int timeout = 15 * 1000;
130     protected int retryTimeout = 60 * 1000;
131     protected int maxRetryTimeout = MAXRETRYTIMEOUT;
132     protected int windowSize = 50;
133
134     private ArrayBlockingQueue<PipeMsgEvent> queue = new ArrayBlockingQueue<PipeMsgEvent>(windowSize);
135     protected PeerGroup group;
136     protected PipeAdvertisement pipeAdv;
137     protected PipeAdvertisement myPipeAdv;
138     protected PipeService pipeSvc;
139     protected InputPipe inputPipe;
140     protected OutputPipe connectOutpipe;
141     protected Messenger msgr;
142     protected InputStream stream;
143     protected final Object closeLock = new Object();
144     protected final Object acceptLock = new Object();
145     protected final Object finalLock = new Object();
146     protected boolean closed = false;
147     protected boolean bound = false;
148     protected boolean dequeued = false;
149     protected PipeMsgListener msgListener;
150     protected PipeEventListener eventListener;
151     protected PipeStateListener stateListener;
152     protected Credential credential = null;
153     protected boolean waiting;
154
155     /**
156      * If {@code true} then we are using the underlying end-to-end ACK reliable
157      * layer to ensure that messages are received by the remote peer.
158      */
159     protected boolean isReliable = false;
160
161     protected ReliableInputStream ris = null;
162     protected ReliableOutputStream ros = null;
163
164     /**
165      * If {@code true} then we are using a reliable direct messenger to the
166      * remote peer. We will assume that messages which are sent successfully
167      * will be received successfully.
168      */
169     protected volatile boolean direct = false;
170     protected OutgoingMsgrAdaptor outgoing = null;
171     protected StructuredDocument credentialDoc = null;
172
173     /**
174      * Pipe close Event
175      */
176     public static final int PIPE_CLOSED_EVENT = 1;
177
178     /**
179      * Creates a bidirectional pipe
180      *
181      * @param group      group context
182      * @param msgr       lightweight output pipe
183      * @param pipe       PipeAdvertisement
184      * @param isReliable Whether the connection is reliable or not
185      * @param credDoc    Credential StructuredDocument
186      * @param direct     indicates a direct messenger pipe
187      * @throws IOException if an io error occurs
188      */
189     protected JxtaBiDiPipe(PeerGroup group, Messenger msgr, PipeAdvertisement pipe, StructuredDocument credDoc, boolean isReliable, boolean direct) throws IOException {
190         if (msgr == null) {
191             throw new IOException("Null Messenger");
192         }
193         this.direct = direct;
194         this.group = group;
195         this.pipeAdv = pipe;
196         this.credentialDoc = credDoc != null ? credDoc : getCredDoc(group);
197         this.pipeSvc = group.getPipeService();
198         this.inputPipe = pipeSvc.createInputPipe(pipe, this);
199         this.msgr = msgr;
200         this.isReliable = isReliable;
201         if (!direct) {
202             createRLib();
203         }
204         setBound();
205     }
206
207     /**
208      * Creates a new object with a default timeout of #timeout, and no reliability.
209      *
210      */
211     public JxtaBiDiPipe() {
212     }
213
214     /**
215      * Creates a bidirectional pipe.
216      *
217      * Attempts to create a bidirectional connection to remote peer within default
218      * timeout of #timeout.
219      *
220      * @param group       group context
221      * @param pipeAd      PipeAdvertisement
222      * @param msgListener application PipeMsgListener
223      * @throws IOException if an io error occurs
224      */
225     public JxtaBiDiPipe(PeerGroup group, PipeAdvertisement pipeAd, PipeMsgListener msgListener) throws IOException {
226         connect(group, null, pipeAd, timeout, msgListener);
227     }
228
229     /**
230      * Creates a bidirectional pipe.
231      *
232      * Attempts to create a bidirectional connection to remote peer within specified
233      * timeout of #timeout.
234      *
235      * @param group       group context
236      * @param timeout     The number of milliseconds within which the JxtaBiDiPipe must
237      *                    be successfully created. An exception will be thrown if the pipe
238      *                    cannot be created in the alotted time. A timeout value of {@code 0}
239      *                    (zero) specifies an infinite timeout.
240      * @param pipeAd      PipeAdvertisement
241      * @param msgListener application PipeMsgListener
242      * @throws IOException if an io error occurs
243      */
244     public JxtaBiDiPipe(PeerGroup group, PipeAdvertisement pipeAd, int timeout, PipeMsgListener msgListener) throws IOException {
245         connect(group, null, pipeAd, timeout, msgListener);
246     }
247
248     /**
249      * attempts to create a bidirectional connection to remote peer
250      *
251      * @param group       group context
252      * @param pipeAd      PipeAdvertisement
253      * @param timeout     The number of milliseconds within which the JxtaBiDiPipe must
254      *                    be successfully created. An exception will be thrown if the pipe
255      *                    cannot be created in the allotted time. A timeout value of {@code 0}
256      *                    (zero) specifies an infinite timeout.
257      * @param msgListener application PipeMsgListener
258      * @param reliable    if true, the reliability is assumed
259      * @throws IOException if an io error occurs
260      */
261     public JxtaBiDiPipe(PeerGroup group, PipeAdvertisement pipeAd, int timeout, PipeMsgListener msgListener, boolean reliable) throws IOException {
262         connect(group, null, pipeAd, timeout, msgListener, reliable);
263     }
264
265     /**
266      * Connect to a JxtaServerPipe with default timeout
267      *
268      * @param group  group context
269      * @param pipeAd PipeAdvertisement
270      * @throws IOException if an io error occurs
271      */
272     public void connect(PeerGroup group, PipeAdvertisement pipeAd) throws IOException {
273         connect(group, pipeAd, timeout);
274     }
275
276     /**
277      * Connects to a remote JxtaBiDiPipe
278      *
279      * @param group   group context
280      * @param pipeAd  PipeAdvertisement
281      * @param timeout timeout in ms, also reset object default timeout
282      *                to that of timeout
283      * @throws IOException if an io error occurs
284      */
285     public void connect(PeerGroup group, PipeAdvertisement pipeAd, int timeout) throws IOException {
286         connect(group, null, pipeAd, timeout, null);
287     }
288
289     /**
290      * Connects to a remote JxtaServerPipe
291      *
292      * @param group       group context
293      * @param peerid      peer to connect to
294      * @param pipeAd      PipeAdvertisement
295      * @param timeout     timeout in ms, also reset object default timeout to that of timeout
296      * @param msgListener application PipeMsgListener
297      * @throws IOException if an io error occurs
298      */
299     public void connect(PeerGroup group, PeerID peerid, PipeAdvertisement pipeAd, int timeout, PipeMsgListener msgListener) throws IOException {
300         connect(group, peerid, pipeAd, timeout, msgListener, isReliable);
301     }
302
303     /**
304      * Connects to a remote JxtaServerPipe
305      *
306      * @param group       group context
307      * @param peerid      peer to connect to
308      * @param pipeAd      PipeAdvertisement
309      * @param timeout     timeout in ms, also reset object default timeout to that of timeout
310      * @param msgListener application PipeMsgListener
311      * @param reliable    Reliable connection
312      * @throws IOException if an io error occurs
313      */
314     public void connect(PeerGroup group, PeerID peerid, PipeAdvertisement pipeAd, int timeout, PipeMsgListener msgListener, boolean reliable) throws IOException {
315         if (isBound()) {
316             throw new IOException("Pipe already bound");
317         }
318         if (timeout <= 0) {
319             throw new IllegalArgumentException("Invalid timeout :" + timeout);
320         }
321
322         this.pipeAdv = pipeAd;
323         this.group = group;
324         this.msgListener = msgListener;
325         if (msgListener != null) {
326             dequeued = true;
327         }
328         this.isReliable = reliable;
329         pipeSvc = group.getPipeService();
330         this.timeout = timeout;
331         myPipeAdv = JxtaServerPipe.newInputPipe(group, pipeAd);
332         this.inputPipe = pipeSvc.createInputPipe(myPipeAdv, this);
333         this.credentialDoc = credentialDoc != null ? credentialDoc : getCredDoc(group);
334         Message openMsg = createOpenMessage(group, myPipeAdv);
335
336         // create the output pipe and send this message
337         if (peerid == null) {
338             pipeSvc.createOutputPipe(pipeAd, this);
339         } else {
340             pipeSvc.createOutputPipe(pipeAd, Collections.singleton(peerid), this);
341         }
342         try {
343             synchronized (acceptLock) {
344                 // check connectOutpipe within lock to prevent a race with modification.
345                 if (connectOutpipe == null) {
346                     if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
347                         LOG.fine("Waiting for " + timeout + " msec");
348                     }
349                     acceptLock.wait(timeout);
350                 }
351             }
352         } catch (InterruptedException ie) {
353             if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
354                 LOG.log(Level.FINE, "Interrupted", ie);
355             }
356             Thread.interrupted();
357             IOException exp = new IOException("Interrupted");
358             exp.initCause(ie);
359             throw exp;
360         }
361         if (connectOutpipe == null) {
362             throw new IOException("connection timeout");
363         }
364         // send connect message
365         waiting = true;
366         if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
367             LOG.fine("Sending a backchannel message");
368         }
369         connectOutpipe.send(openMsg);
370         // wait for the second op
371         try {
372             synchronized (finalLock) {
373                 if (waiting) {
374                     if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
375                         LOG.fine("Waiting for " + timeout + " msec for back channel to be established");
376                     }
377                     finalLock.wait(timeout);
378                     // Need to check for creation
379                     if (msgr == null) {
380                         throw new IOException("connection timeout");
381                     }
382                 }
383             }
384         } catch (InterruptedException ie) {
385             if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
386                 LOG.log(Level.FINE, "Interrupted", ie);
387             }
388             Thread.interrupted();
389             IOException exp = new IOException("Interrupted");
390             exp.initCause(ie);
391             throw exp;
392         }
393         setBound();
394         notifyListeners(PipeStateListener.PIPE_OPENED_EVENT);
395     }
396
397     /**
398      * creates all the reliability objects
399      */
400     private void createRLib() {
401         if (isReliable) {
402             if (outgoing == null) {
403                 outgoing = new OutgoingMsgrAdaptor(msgr, retryTimeout);
404             }
405             if (ros == null) {
406                 ros = new ReliableOutputStream(outgoing, new FixedFlowControl(windowSize));
407             }
408             if (ris == null) {
409                 ris = new ReliableInputStream(outgoing, retryTimeout, this);
410             }
411         }
412     }
413
414     /**
415      * Toggles reliability
416      *
417      * @param reliable Toggles reliability to reliable
418      * @throws IOException if pipe is bound
419      */
420     public void setReliable(boolean reliable) throws IOException {
421         if (isBound()) {
422             throw new IOException("Can not set reliability after pipe is bound");
423         }
424         this.isReliable = reliable;
425     }
426
427     /**
428      * Obtain the cred doc from the group object.
429      *
430      * @param group group context
431      * @return The credDoc value
432      */
433     protected static StructuredDocument getCredDoc(PeerGroup group) {
434         try {
435             MembershipService membership = group.getMembershipService();
436             Credential credential = membership.getDefaultCredential();
437
438             if (credential != null) {
439                 return credential.getDocument(MimeMediaType.XMLUTF8);
440             }
441         } catch (Exception e) {
442             if (Logging.SHOW_WARNING && LOG.isLoggable(Level.WARNING)) {
443                 LOG.log(Level.WARNING, "failed to get credential", e);
444             }
445         }
446         return null;
447     }
448
449     /**
450      * get the remote credential doc
451      *
452      * @return Credential StructuredDocument
453      */
454     public StructuredDocument getCredentialDoc() {
455         return credentialDoc;
456     }
457
458     /**
459      * Sets the connection credential doc.
460      * If no credentials are set, the default group credential are used.
461      *
462      * @param doc Credential StructuredDocument
463      */
464     public void setCredentialDoc(StructuredDocument doc) {
465         this.credentialDoc = doc;
466     }
467
468     /**
469      * Creates a connection request message
470      *
471      * @param group  group context
472      * @param pipeAd pipe advertisement
473      * @return the Message  object
474      * @throws IOException if an io error occurs
475      */
476     protected Message createOpenMessage(PeerGroup group, PipeAdvertisement pipeAd) throws IOException {
477         Message msg = new Message();
478         PeerAdvertisement peerAdv = group.getPeerAdvertisement();
479
480         if (credentialDoc == null) {
481             credentialDoc = getCredDoc(group);
482         }
483         if (credentialDoc == null && pipeAd.getType().equals(PipeService.UnicastSecureType)) {
484             throw new IOException("No credentials established to initiate a secure connection");
485         }
486         try {
487             if (credentialDoc != null) {
488                 msg.addMessageElement(JxtaServerPipe.nameSpace,
489                         new TextDocumentMessageElement(JxtaServerPipe.credTag, (XMLDocument) credentialDoc, null));
490             }
491             msg.addMessageElement(JxtaServerPipe.nameSpace,
492                     new TextDocumentMessageElement(JxtaServerPipe.reqPipeTag,
493                             (XMLDocument) pipeAd.getDocument(MimeMediaType.XMLUTF8), null));
494
495             msg.addMessageElement(JxtaServerPipe.nameSpace,
496                     new StringMessageElement(JxtaServerPipe.reliableTag, Boolean.toString(isReliable), null));
497
498             msg.addMessageElement(JxtaServerPipe.nameSpace,
499                     new StringMessageElement(JxtaServerPipe.directSupportedTag, Boolean.toString(true), null));
500
501             msg.addMessageElement(JxtaServerPipe.nameSpace,
502                     new TextDocumentMessageElement(JxtaServerPipe.remPeerTag,
503                             (XMLDocument) peerAdv.getDocument(MimeMediaType.XMLUTF8), null));
504             return msg;
505         } catch (Throwable t) {
506             if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
507                 LOG.log(Level.FINE, "error getting element stream", t);
508             }
509             return null;
510         }
511     }
512
513     /**
514      * Sets the bound attribute of the JxtaServerPipe object
515      */
516     void setBound() {
517         bound = true;
518         if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
519             LOG.fine("Pipe Bound :true");
520         }
521     }
522
523     /**
524      * Returns the binding state of the JxtaServerPipe.
525      *
526      * @return true if the ServerSocket successfully bound to an address
527      */
528     public boolean isBound() {
529         return bound;
530     }
531
532     /**
533      * Returns an input stream for this socket.
534      *
535      * @return a stream for reading from this socket.
536      * @throws IOException if an I/O error occurs when creating the
537      *                     input stream.
538      */
539     public InputPipe getInputPipe() throws IOException {
540         return inputPipe;
541     }
542
543     /**
544      * Returns remote PeerAdvertisement
545      *
546      * @return remote PeerAdvertisement
547      */
548     public PeerAdvertisement getRemotePeerAdvertisement() {
549         return remotePeerAdv;
550     }
551
552     /**
553      * Returns remote PipeAdvertisement
554      *
555      * @return remote PipeAdvertisement
556      */
557     public PipeAdvertisement getRemotePipeAdvertisement() {
558         return remotePipeAdv;
559     }
560
561     /**
562      * Sets the remote PeerAdvertisement
563      *
564      * @param peer Remote PeerAdvertisement
565      */
566     protected void setRemotePeerAdvertisement(PeerAdvertisement peer) {
567         this.remotePeerAdv = peer;
568     }
569
570     /**
571      * Sets the remote PipeAdvertisement
572      *
573      * @param pipe PipeAdvertisement
574      */
575     protected void setRemotePipeAdvertisement(PipeAdvertisement pipe) {
576         this.remotePipeAdv = pipe;
577     }
578
579     /**
580      * Closes this pipe.
581      *
582      * @throws IOException if an I/O error occurs when closing this
583      *                     socket.
584      */
585     public void close() throws IOException {
586         sendClose();
587         closePipe(false);
588         bound = false;
589     }
590
591     protected void closePipe(boolean fastClose) throws IOException {
592         // close both pipes
593         synchronized (closeLock) {
594             if (closed) {
595                 return;
596             }
597             closed = true;
598             bound = false;
599         }
600
601         if (!fastClose && isReliable && !direct) {
602             /*
603              *  This implements linger!
604              */
605             long quitAt = System.currentTimeMillis() + timeout;
606             while (true) {
607                 //FIXME hamada this does not loop
608                 if (ros == null || ros.getMaxAck() == ros.getSeqNumber()) {
609                     // Nothing to worry about.
610                     break;
611                 }
612
613                 // By default wait forever.
614                 long left = 0;
615
616                 // If timeout is not zero. Then compute the waiting time
617                 // left.
618                 if (timeout != 0) {
619                     left = quitAt - System.currentTimeMillis();
620                     if (left < 0) {
621                         // Too late
622                         sendClose();
623                         throw new IOException("Close timeout");
624                     }
625                 }
626
627                 try {
628                     if (!ros.isQueueEmpty()) {
629                         if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
630                             LOG.fine("Waiting for Output stream queue event");
631                         }
632                         ros.waitQueueEvent(left);
633                     }
634                     break;
635                 } catch (InterruptedException ie) {
636                     // give up, then.
637                     throw new IOException("Close interrupted");
638                 }
639             }
640
641             // We are initiating the close. We do not want to receive
642             // anything more. So we can close the ris right away.
643             ris.close();
644         }
645
646         if (isReliable && ros != null) {
647             ros.close();
648         }
649
650         // close the pipe
651         inputPipe.close();
652         msgr.close();
653         if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
654             LOG.fine("Pipe close complete");
655         }
656         notifyListeners(PIPE_CLOSED_EVENT);
657     }
658
659     private void notifyListeners(int event) {
660         try {
661             if (eventListener != null) {
662                 eventListener.pipeEvent(event);
663             } else if (stateListener != null) {
664                 stateListener.stateEvent(this, event);
665             }
666         } catch (Throwable th) {
667             if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
668                 LOG.log(Level.FINE, "error during pipe event callback", th);
669             }
670         }
671     }
672
673     /**
674      * Sets the inputPipe attribute of the JxtaBiDiPipe object
675      *
676      * @param inputPipe The new inputPipe value
677      */
678     protected void setInputPipe(InputPipe inputPipe) {
679         this.inputPipe = inputPipe;
680     }
681
682     /**
683      * {@inheritDoc}
684      */
685     public void pipeMsgEvent(PipeMsgEvent event) {
686         Message message = event.getMessage();
687         if (message == null) {
688             if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
689                 LOG.fine("Empty event");
690             }
691             return;
692         }
693         if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
694             LOG.fine("Pipe message arrived");
695         }
696
697         MessageElement element;
698         if (!bound) {
699             // look for a remote pipe answer
700             element = message.getMessageElement(JxtaServerPipe.nameSpace, JxtaServerPipe.remPipeTag);
701             if (element != null) {
702                 // connect response
703                 try {
704                     XMLDocument CredDoc = null;
705                     XMLDocument remotePipeDoc = (XMLDocument) StructuredDocumentFactory.newStructuredDocument(element);
706
707                     remotePipeAdv = (PipeAdvertisement) AdvertisementFactory.newAdvertisement(remotePipeDoc);
708                     if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
709                         LOG.fine("Recevied a pipe Advertisement :" + remotePipeAdv.getName());
710                     }
711
712                     element = message.getMessageElement(JxtaServerPipe.nameSpace, JxtaServerPipe.remPeerTag);
713                     if (element != null) {
714                         XMLDocument remotePeerDoc = (XMLDocument) StructuredDocumentFactory.newStructuredDocument(element);
715
716                         remotePeerAdv = (PeerAdvertisement) AdvertisementFactory.newAdvertisement(remotePeerDoc);
717                         if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
718                             LOG.fine("Recevied an Peer Advertisement :" + remotePeerAdv.getName());
719                         }
720                     } else {
721                         if (Logging.SHOW_WARNING && LOG.isLoggable(Level.WARNING)) {
722                             LOG.warning(" BAD connect response");
723                         }
724                         return;
725                     }
726
727                     element = message.getMessageElement(JxtaServerPipe.nameSpace, JxtaServerPipe.credTag);
728                     if (element != null) {
729                         CredDoc = (XMLDocument) StructuredDocumentFactory.newStructuredDocument(element);
730                     }
731                     if (pipeAdv.getType().equals(PipeService.UnicastSecureType) && (CredDoc == null || !checkCred(CredDoc))) {
732                         // we're done here
733                         if (Logging.SHOW_SEVERE && LOG.isLoggable(Level.SEVERE)) {
734                             LOG.severe("Missing remote credential doc");
735                         }
736                         return;
737                     }
738
739                     element = message.getMessageElement(JxtaServerPipe.nameSpace, JxtaServerPipe.reliableTag);
740                     if (element != null) {
741                         isReliable = Boolean.valueOf(element.toString());
742                     }
743
744                     boolean directSupported = false;
745                     element = message.getMessageElement(JxtaServerPipe.nameSpace, JxtaServerPipe.directSupportedTag);
746                     if (element != null) {
747                         directSupported = Boolean.valueOf(element.toString());
748                     }
749
750                     if (directSupported) {
751                         msgr = getDirectMessenger(group, remotePipeAdv, remotePeerAdv);
752                         if (msgr != null) {
753                             this.direct = true;
754                         } else {
755                             msgr = lightweightOutputPipe(group, remotePipeAdv, remotePeerAdv);
756                         }
757                     } else {
758                         msgr = lightweightOutputPipe(group, remotePipeAdv, remotePeerAdv);
759                     }
760
761                     if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
762                         LOG.fine("Reliability set to :" + isReliable);
763                     }
764                     if (isReliable && !direct) {
765                         createRLib();
766                     }
767                     synchronized (finalLock) {
768                         waiting = false;
769                         finalLock.notifyAll();
770                     }
771                 } catch (IOException e) {
772                     if (Logging.SHOW_SEVERE && LOG.isLoggable(Level.SEVERE)) {
773                         LOG.log(Level.SEVERE, "failed to process response message", e);
774                     }
775                 }
776                 return;
777             }
778         }
779
780         if (isReliable && !direct) {
781             // let reliabilty deal with the message
782             receiveMessage(message);
783             return;
784         }
785         if (!hasClose(message)) {
786             push(event);
787         }
788     }
789
790     private boolean hasClose(Message message) {
791         // look for close request
792         MessageElement element = message.getMessageElement(JxtaServerPipe.nameSpace, JxtaServerPipe.closeTag);
793         if (element != null) {
794             try {
795                 if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
796                     LOG.fine("Recevied a pipe close request, closing pipes");
797                 }
798                 if (ros != null) {
799                     ros.hardClose();
800                 }
801                 closePipe(false);
802             } catch (IOException ie) {
803                 if (Logging.SHOW_WARNING && LOG.isLoggable(Level.WARNING)) {
804                     LOG.log(Level.WARNING, "failed during close", ie);
805                 }
806             }
807             return true;
808         }
809         return false;
810     }
811
812     private void receiveMessage(Message message) {
813         Iterator<MessageElement> i = message.getMessageElements(Defs.NAMESPACE, Defs.MIME_TYPE_ACK);
814
815         if (i.hasNext()) {
816             if (ros != null) {
817                 ros.recv(message);
818             }
819             return;
820         }
821
822         i = message.getMessageElements(Defs.NAMESPACE, Defs.MIME_TYPE_BLOCK);
823         if (i.hasNext()) {
824
825             // It can happen that we receive messages for the input stream
826             // while we have not finished creating it.
827             try {
828                 synchronized (finalLock) {
829                     while (waiting) {
830                         finalLock.wait(timeout);
831                     }
832                 }
833             } catch (InterruptedException ie) {// ignored
834             }
835
836             if (ris != null) {
837                 ris.recv(message);
838             }
839         }
840     }
841
842     /**
843      * Gets the Maximum Retry Timeout of the reliability layer
844      *
845      * @return The maximum retry Timeout value
846      */
847     public synchronized int getMaxRetryTimeout() {
848         return maxRetryTimeout;
849     }
850
851     /**
852      * Gets the Maximum Retry Timeout of the reliability layer
853      *
854      * @param maxRetryTimeout The new maximum retry timeout value
855      * @throws IllegalArgumentException if maxRetryTimeout exceeds jxta platform maximum retry timeout
856      */
857     public synchronized void setMaxRetryTimeout(int maxRetryTimeout) {
858         if (maxRetryTimeout <= 0 || maxRetryTimeout > MAXRETRYTIMEOUT) {
859             throw new IllegalArgumentException(
860                     "Invalid Maximum retry timeout :" + maxRetryTimeout + " Exceed Global maximum retry timeout :"
861                             + MAXRETRYTIMEOUT);
862         }
863         this.maxRetryTimeout = maxRetryTimeout;
864     }
865
866     /**
867      * Gets the Retry Timeout of the reliability layer
868      *
869      * @return The retry Timeout value
870      */
871     public synchronized int getRetryTimeout() {
872         return retryTimeout;
873     }
874
875     /**
876      * Sets the Retry Timeout of the underlying reliability layer
877      * .
878      * In reliable mode it is possible for this call to block
879      * trying to obtain a lock on reliable input stream
880      *
881      * @param retryTimeout The new retry timeout value
882      * @throws IOException if an I/O error occurs
883      */
884     public synchronized void setRetryTimeout(int retryTimeout) throws IOException {
885         if (timeout <= 0) {
886             throw new IllegalArgumentException("Invalid Socket timeout :" + retryTimeout);
887         }
888         this.retryTimeout = retryTimeout;
889         if (outgoing != null) {
890             outgoing.setTimeout(retryTimeout);
891         }
892     }
893
894     /**
895      * When in reliable mode, gets the Reliable library window size
896      *
897      * @return The windowSize value
898      */
899     public synchronized int getWindowSize() {
900         return windowSize;
901     }
902
903     /**
904      * When in reliable mode, sets the Reliable library window size
905      *
906      * @param windowSize The new window size value
907      * @throws IOException if an I/O error occurs
908      */
909     public synchronized void setWindowSize(int windowSize) throws IOException {
910         if (isBound()) {
911             throw new IOException("Socket bound. Can not change the window size");
912         }
913         this.windowSize = windowSize;
914     }
915
916     /**
917      * This method is invoked by the Reliablity library for each incoming data message
918      *
919      * @param message Incoming message
920      */
921     public void processIncomingMessage(Message message) {
922         if (!hasClose(message)) {
923             PipeMsgEvent event = new PipeMsgEvent(this, message, (PipeID) inputPipe.getPipeID());
924             push(event);
925         }
926     }
927
928     private void push(PipeMsgEvent event) {
929         if (msgListener == null) {
930             if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
931                 LOG.fine("push message onto queue");
932             }
933             queue.offer(event);
934         } else {
935             dequeue();
936             if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
937                 LOG.fine("calling message listener");
938             }
939             msgListener.pipeMsgEvent(event);
940         }
941
942     }
943
944     /**
945      * Send a message
946      * <p/>
947      * <code>Messenger</code>
948      *
949      * @param msg Message to send to the remote side
950      * @return true if message was successfully enqueued
951      * @throws IOException if the underlying messenger breaks, either due to
952      *                     a physical address change, reliability issue.
953      * @see net.jxta.endpoint.Message
954      */
955     public boolean sendMessage(Message msg) throws IOException {
956         if (isReliable && !direct) {
957             int seqn = ros.send(msg);
958             return (seqn > 0);
959         } else {
960             try {
961                 if (msgr instanceof TcpMessenger) {
962                     ((TcpMessenger) msgr).sendMessageDirect(msg, null, null, true);
963                     return true;
964                 } else {
965                     return msgr.sendMessage(msg, null, null);
966                 }
967             } catch (SocketTimeoutException io) {
968                 if (msgr instanceof TcpMessenger) {
969                     ((TcpMessenger) msgr).sendMessageDirect(msg, null, null, true);
970                     return true;
971                 } else {
972                     return msgr.sendMessage(msg, null, null);
973                 }
974             } catch (IOException io) {
975                 closePipe(true);
976                 IOException exp = new IOException("IO error occured during sendMessage()");
977                 exp.initCause(io);
978                 throw exp;
979
980             }
981         }
982     }
983
984     private void dequeue() {
985         if (!dequeued && (null != msgListener)) {
986             while (queue != null && !queue.isEmpty()) {
987                 if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
988                     LOG.fine("dequeing messages onto message listener");
989                 }
990                 try {
991                     msgListener.pipeMsgEvent(queue.take());
992                 } catch (InterruptedException e) {
993                     //ignored
994                 }
995             }
996             dequeued = false;
997         }
998     }
999
1000     /**
1001      * {@inheritDoc}
1002      */
1003     public void outputPipeEvent(OutputPipeEvent event) {
1004         OutputPipe op = event.getOutputPipe();
1005
1006         if (op.getAdvertisement() == null) {
1007             if (Logging.SHOW_WARNING && LOG.isLoggable(Level.WARNING)) {
1008                 LOG.warning("The output pipe has no internal pipe advertisement. Continueing anyway.");
1009             }
1010         }
1011         if (op.getAdvertisement() == null || pipeAdv.equals(op.getAdvertisement())) {
1012             synchronized (acceptLock) {
1013                 // modify op within lock to prevent a race with the if.
1014                 if (connectOutpipe == null) {
1015                     connectOutpipe = op;
1016                     // set to null to avoid closure
1017                     op = null;
1018                 }
1019                 acceptLock.notifyAll();
1020             }
1021             // Ooops one too many, we were too fast re-trying.
1022             if (op != null) {
1023                 op.close();
1024             }
1025
1026         } else {
1027             if (Logging.SHOW_WARNING && LOG.isLoggable(Level.WARNING)) {
1028                 LOG.warning("Unexpected OutputPipe :" + op);
1029             }
1030         }
1031     }
1032
1033     /**
1034      * A lightweight direct messenger output pipe constructor, note the return type
1035      * Since all the info needed is available, there's no need for to
1036      * use the pipe service to resolve the pipe we have all we need
1037      * to construct a messenger.
1038      *
1039      * @param group   group context
1040      * @param pipeAdv Remote Pipe Advertisement
1041      * @param peer    Remote Peer advertisement
1042      * @return Messenger
1043      */
1044     protected static Messenger getDirectMessenger(PeerGroup group, PipeAdvertisement pipeAdv, PeerAdvertisement peer) {
1045         // Get an endpoint messenger to that address
1046         if (pipeAdv.getType().equals(PipeService.PropagateType)) {
1047             throw new IllegalArgumentException("Invalid pipe type " + pipeAdv.getType());
1048         }
1049         if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
1050             LOG.fine("Creating a Direct Messenger");
1051         }
1052
1053         if (pipeAdv.getType().equals(PipeService.UnicastType)) {
1054             EndpointService endpoint = group.getEndpointService();
1055             EndpointAddress pipeEndpoint = new EndpointAddress("jxta",
1056                                                        (peer.getPeerID().getUniqueValue()).toString(),
1057                                                        "PipeService",
1058                                                        pipeAdv.getPipeID().toString());
1059             return endpoint.getDirectMessenger(pipeEndpoint, peer, true);
1060         }
1061         return null;
1062     }
1063
1064     /**
1065      * A lightweight output pipe constructor, note the return type
1066      * Since all the info needed is available, there's no need for to
1067      * use the pipe service to resolve the pipe we have all we need
1068      * to construct a messenger.
1069      *
1070      * @param group   group context
1071      * @param pipeAdv Remote Pipe Advertisement
1072      * @param peer    Remote Peer advertisement
1073      * @return Messenger
1074      */
1075     protected static Messenger lightweightOutputPipe(PeerGroup group, PipeAdvertisement pipeAdv, PeerAdvertisement peer) {
1076
1077         EndpointService endpoint = group.getEndpointService();
1078         ID opId = pipeAdv.getPipeID();
1079         String destPeer = (peer.getPeerID().getUniqueValue()).toString();
1080
1081         // Get an endpoint messenger to that address
1082         EndpointAddress addr;
1083         if (pipeAdv.getType().equals(PipeService.UnicastType)) {
1084             addr = new EndpointAddress("jxta", destPeer, "PipeService", opId.toString());
1085         } else if (pipeAdv.getType().equals(PipeService.UnicastSecureType)) {
1086             addr = new EndpointAddress("jxtatls", destPeer, "PipeService", opId.toString());
1087         } else {
1088             // not a supported type
1089             return null;
1090         }
1091         if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
1092             LOG.fine("Creating a lightweightOutputPipe()");
1093         }
1094         return endpoint.getMessenger(addr);
1095     }
1096
1097     /**
1098      * Not implemented yet
1099      *
1100      * @param cred the credential document
1101      * @return always returns true
1102      */
1103     protected boolean checkCred(StructuredDocument cred) {
1104         // FIXME need to check credentials
1105         return true;
1106     }
1107
1108     /**
1109      * Send a close message to the remote side
1110      */
1111     private void sendClose() {
1112         if (!direct && isReliable && ros.isClosed()) {
1113             if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
1114                 LOG.fine("ReliableOutputStream is already closed. Skipping close message");
1115             }
1116             return;
1117         }
1118
1119         Message msg = new Message();
1120         msg.addMessageElement(JxtaServerPipe.nameSpace, new StringMessageElement(JxtaServerPipe.closeTag, "close", null));
1121         try {
1122             sendMessage(msg);
1123             // ros will not take any new message, now.
1124             if (!direct && ros != null) {
1125                 ros.close();
1126             }
1127         } catch (IOException ie) {
1128             if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
1129                 LOG.log(Level.SEVERE, "failed during close", ie);
1130             }
1131         }
1132     }
1133
1134     /**
1135      * Returns the message listener for this pipe
1136      *
1137      * @return PipeMsgListener
1138      * @deprecated use getMessageListener instead
1139      */
1140     @Deprecated
1141     public PipeMsgListener getListener() {
1142         return getMessageListener();
1143     }
1144
1145     /**
1146      * Returns the message listener for this pipe
1147      *
1148      * @return PipeMsgListener
1149      */
1150     public PipeMsgListener getMessageListener() {
1151         return msgListener;
1152     }
1153
1154     /**
1155      * Sets message listener for a pipe spawned by the JxtaServerPipe.
1156      * There is a window where a message could arrive prior to listener being
1157      * registered therefore a message queue is created to queue messages, once
1158      * a listener is registered these messages will be dequeued by calling the
1159      * listener until the queue is empty
1160      *
1161      * @param msgListener New value of property listener.
1162      * @deprecated use setMessageListener instead
1163      */
1164     @Deprecated
1165     public void setListener(PipeMsgListener msgListener) {
1166         setMessageListener(msgListener);
1167     }
1168
1169     /**
1170      * Sets message listener for a pipe spawned by the JxtaServerPipe.
1171      * There is a window where a message could arrive prior to listener being
1172      * registered therefore a message queue is created to queue messages, once
1173      * a listener is registered these messages will be dequeued by calling the
1174      * listener until the queue is empty.
1175      * <p/>
1176      * Sending messages vis {@link #sendMessage(Message)} from within a 
1177      * {@code PipeMsgListener} may result in a deadlock due to contention
1178      * between the sending and receiving portions of BiDi pipes. 
1179      *
1180      * @param msgListener New value of property listener.
1181      */
1182     public void setMessageListener(PipeMsgListener msgListener) {
1183         this.msgListener = msgListener;
1184         // if there are messages enqueued then dequeue them onto the msgListener
1185         dequeue();
1186     }
1187
1188     /**
1189      * Sets a Pipe event listener, set listener to null to unset the listener
1190      *
1191      * @param eventListener New value of property listener.
1192      * @deprecated use setPipeEventListener instead
1193      */
1194     @Deprecated
1195     public void setListener(PipeEventListener eventListener) {
1196         setPipeEventListener(eventListener);
1197     }
1198
1199     /**
1200      * Sets a Pipe event listener, set listener to null to unset the listener
1201      *
1202      * @param eventListener New value of property listener.
1203      */
1204     public void setPipeEventListener(PipeEventListener eventListener) {
1205         this.eventListener = eventListener;
1206     }
1207
1208     /**
1209      * Returns the Pipe event listener for this pipe
1210      *
1211      * @return PipeMsgListener
1212      */
1213     public PipeEventListener getPipeEventListener() {
1214         return eventListener;
1215     }
1216
1217     /**
1218      * Sets a Pipe state listener, set listener to null to unset the listener
1219      *
1220      * @param stateListener New value of property listener.
1221      */
1222     public void setPipeStateListener(PipeStateListener stateListener) {
1223         this.stateListener = stateListener;
1224     }
1225
1226     /**
1227      * Returns the Pipe state listener for this pipe
1228      *
1229      * @return PipeMsgListener
1230      */
1231     public PipeStateListener getPipeStateListener() {
1232         return stateListener;
1233     }
1234
1235     /**
1236      * Gets a message from the queue. If no Object is immediately available,
1237      * then wait the specified amount of time for a message to be inserted.
1238      *
1239      * @param timeout Amount of time to wait in milliseconds for an object to
1240      *                be available. Per Java convention, a timeout of zero (0) means wait an
1241      *                infinite amount of time. Negative values mean do not wait at all.
1242      * @return The next message in the queue. if a listener is registered calls
1243      *         to this method will return null
1244      * @throws InterruptedException if the operation is interrupted before
1245      *                              the timeout interval is completed.
1246      */
1247     public Message getMessage(int timeout) throws InterruptedException {
1248         if (queue == null || msgListener != null) {
1249             return null;
1250         } else {
1251             PipeMsgEvent ev = queue.poll(timeout, TimeUnit.MILLISECONDS);
1252             if (ev != null) {
1253                 return ev.getMessage();
1254             } else {
1255                 return null;
1256             }
1257         }
1258     }
1259
1260     /**
1261      * Returns the Assigned PipeAdvertisement
1262      *
1263      * @return the Assigned PipeAdvertisement
1264      */
1265     public PipeAdvertisement getPipeAdvertisement() {
1266         return pipeAdv;
1267     }
1268
1269     /**
1270      * {@inheritDoc}
1271      * <p/>
1272      * Closes the JxtaBiDiPipe.
1273      */
1274     @Override
1275     protected synchronized void finalize() throws Throwable {
1276         if (!closed) {
1277             if (Logging.SHOW_WARNING && LOG.isLoggable(Level.WARNING)) {
1278                 LOG.warning("JxtaBiDiPipe is being finalized without being previously closed. This is likely a users bug.");
1279             }
1280             close();
1281         }
1282         super.finalize();
1283     }
1284 }