2 * Copyright (c) 2006-2007 Sun Microsystems, Inc. All rights reserved.
4 * The Sun Project JXTA(TM) Software License
6 * Redistribution and use in source and binary forms, with or without
7 * modification, are permitted provided that the following conditions are met:
9 * 1. Redistributions of source code must retain the above copyright notice,
10 * this list of conditions and the following disclaimer.
12 * 2. Redistributions in binary form must reproduce the above copyright notice,
13 * this list of conditions and the following disclaimer in the documentation
14 * and/or other materials provided with the distribution.
16 * 3. The end-user documentation included with the redistribution, if any, must
17 * include the following acknowledgment: "This product includes software
18 * developed by Sun Microsystems, Inc. for JXTA(TM) technology."
19 * Alternately, this acknowledgment may appear in the software itself, if
20 * and wherever such third-party acknowledgments normally appear.
22 * 4. The names "Sun", "Sun Microsystems, Inc.", "JXTA" and "Project JXTA" must
23 * not be used to endorse or promote products derived from this software
24 * without prior written permission. For written permission, please contact
25 * Project JXTA at http://www.jxta.org.
27 * 5. Products derived from this software may not be called "JXTA", nor may
28 * "JXTA" appear in their name, without prior written permission of Sun.
30 * THIS SOFTWARE IS PROVIDED ``AS IS'' AND ANY EXPRESSED OR IMPLIED WARRANTIES,
31 * INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND
32 * FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL SUN
33 * MICROSYSTEMS OR ITS CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
34 * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
35 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA,
36 * OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
37 * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
38 * NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE,
39 * EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
41 * JXTA is a registered trademark of Sun Microsystems, Inc. in the United
42 * States and other countries.
44 * Please see the license information page at :
45 * <http://www.jxta.org/project/www/license.html> for instructions on use of
46 * the license in source files.
48 * ====================================================================
50 * This software consists of voluntary contributions made by many individuals
51 * on behalf of Project JXTA. For more information on Project JXTA, please see
52 * http://www.jxta.org.
54 * This license is based on the BSD license adopted by the Apache Foundation.
56 package net.jxta.util;
58 import net.jxta.credential.Credential;
59 import net.jxta.document.AdvertisementFactory;
60 import net.jxta.document.MimeMediaType;
61 import net.jxta.document.StructuredDocument;
62 import net.jxta.document.StructuredDocumentFactory;
63 import net.jxta.document.XMLDocument;
64 import net.jxta.endpoint.EndpointAddress;
65 import net.jxta.endpoint.EndpointService;
66 import net.jxta.endpoint.Message;
67 import net.jxta.endpoint.MessageElement;
68 import net.jxta.endpoint.Messenger;
69 import net.jxta.endpoint.StringMessageElement;
70 import net.jxta.endpoint.TextDocumentMessageElement;
71 import net.jxta.id.ID;
72 import net.jxta.impl.endpoint.tcp.TcpMessenger;
73 import net.jxta.impl.util.pipe.reliable.Defs;
74 import net.jxta.impl.util.pipe.reliable.FixedFlowControl;
75 import net.jxta.impl.util.pipe.reliable.OutgoingMsgrAdaptor;
76 import net.jxta.impl.util.pipe.reliable.ReliableInputStream;
77 import net.jxta.impl.util.pipe.reliable.ReliableOutputStream;
78 import net.jxta.logging.Logging;
79 import net.jxta.membership.MembershipService;
80 import net.jxta.peer.PeerID;
81 import net.jxta.peergroup.PeerGroup;
82 import net.jxta.pipe.InputPipe;
83 import net.jxta.pipe.OutputPipe;
84 import net.jxta.pipe.OutputPipeEvent;
85 import net.jxta.pipe.OutputPipeListener;
86 import net.jxta.pipe.PipeID;
87 import net.jxta.pipe.PipeMsgEvent;
88 import net.jxta.pipe.PipeMsgListener;
89 import net.jxta.pipe.PipeService;
90 import net.jxta.protocol.PeerAdvertisement;
91 import net.jxta.protocol.PipeAdvertisement;
93 import java.io.IOException;
94 import java.io.InputStream;
95 import java.net.SocketTimeoutException;
96 import java.util.Collections;
97 import java.util.Iterator;
98 import java.util.concurrent.ArrayBlockingQueue;
99 import java.util.concurrent.TimeUnit;
100 import java.util.logging.Level;
101 import java.util.logging.Logger;
104 * JxtaBiDiPipe is a pair of UnicastPipe channels that implements a bidirectional pipe.
105 * By default, JxtaBiDiPipe operates in reliable mode, unless otherwise specified,
106 * in addition, messages must not exceed the Endpoint MTU size of 64K; exceeding the
107 * MTU will lead to unexpected behavior.
109 * It is highly recommended that an application message listener is specified; not doing so may
110 * lead to message loss in the event the internal queue is overflowed.
112 * Sending messages via {@link #sendMessage(Message)} from within a
113 * {@code PipeMsgListener} may result in a deadlock due to contention
114 * between the sending and receiving portions of BiDi pipes.
116 * JxtaBiDiPipe, whenever possible, will attempt to utilize direct tcp messengers,
117 * which leads to improved performance.
// Class declaration. The pipe registers itself as the listener for its own input
// pipe (createInputPipe(..., this)), for output pipe resolution
// (createOutputPipe(..., this)) and for the reliable input stream
// (new ReliableInputStream(..., this)), hence the three listener interfaces.
119 public class JxtaBiDiPipe implements PipeMsgListener, OutputPipeListener, ReliableInputStream.MsgListener {
// Logger named after this class.
// NOTE(review): 'transient' has no effect on a static field.
124 private final static transient Logger LOG = Logger.getLogger(JxtaBiDiPipe.class.getName());
// Upper bound accepted by setMaxRetryTimeout() and the initial value of maxRetryTimeout.
// Presumably milliseconds (120 s) -- confirm against the reliability layer.
126 private final static int MAXRETRYTIMEOUT = 120 * 1000;
// Remote advertisements; assigned from the connect response in pipeMsgEvent() or via the protected setters.
127 private PipeAdvertisement remotePipeAdv;
128 private PeerAdvertisement remotePeerAdv;
// Connect timeout in ms (15 s by default); overwritten by connect(...) and also used
// as the wait bound in closePipe() and receiveMessage().
129 protected int timeout = 15 * 1000;
// Passed to OutgoingMsgrAdaptor and ReliableInputStream in createRLib().
130 protected int retryTimeout = 60 * 1000;
131 protected int maxRetryTimeout = MAXRETRYTIMEOUT;
// Passed to FixedFlowControl in createRLib().
132 protected int windowSize = 50;
// Event queue sized from windowSize at field-initialisation time (50).
// NOTE(review): later setWindowSize() calls do not appear to resize this queue -- confirm.
134 private ArrayBlockingQueue<PipeMsgEvent> queue = new ArrayBlockingQueue<PipeMsgEvent>(windowSize);
135 protected PeerGroup group;
// Advertisement of the pipe being connected to (assigned in connect()).
136 protected PipeAdvertisement pipeAdv;
// Advertisement of the local input pipe, created by JxtaServerPipe.newInputPipe() in connect().
137 protected PipeAdvertisement myPipeAdv;
138 protected PipeService pipeSvc;
139 protected InputPipe inputPipe;
// Output pipe to the remote server pipe; assigned under acceptLock in outputPipeEvent().
140 protected OutputPipe connectOutpipe;
// Messenger used by sendMessage() and the reliability adaptor.
141 protected Messenger msgr;
// Not referenced in the lines visible in this view.
142 protected InputStream stream;
// Monitor held while closing (closePipe()).
143 protected final Object closeLock = new Object();
// Monitor used to wait for / signal output pipe resolution (connect(), outputPipeEvent()).
144 protected final Object acceptLock = new Object();
// Monitor used to wait for / signal completion of the connect handshake
// (connect(), pipeMsgEvent(), receiveMessage()).
145 protected final Object finalLock = new Object();
146 protected boolean closed = false;
147 protected boolean bound = false;
// Checked by dequeue() so that queued events are only drained when this flag is false.
148 protected boolean dequeued = false;
// Application listener; when null, push() queues events instead of delivering them.
149 protected PipeMsgListener msgListener;
// Listeners notified by notifyListeners().
150 protected PipeEventListener eventListener;
151 protected PipeStateListener stateListener;
152 protected Credential credential = null;
// Not referenced in the lines visible in this view.
153 protected boolean waiting;
156 * If {@code true} then we are using the underlying end-to-end ACK reliable
157 * layer to ensure that messages are received by the remote peer.
159 protected boolean isReliable = false;
// Reliability streams; created by createRLib().
161 protected ReliableInputStream ris = null;
162 protected ReliableOutputStream ros = null;
165 * If {@code true} then we are using a reliable direct messenger to the
166 * remote peer. We will assume that messages which are sent successfully
167 * will be received successfully.
169 protected volatile boolean direct = false;
// Adaptor wrapping msgr for the reliability streams; created lazily in createRLib().
170 protected OutgoingMsgrAdaptor outgoing = null;
// Credential document attached to the open message by createOpenMessage() when non-null.
171 protected StructuredDocument credentialDoc = null;
// Event code passed to notifyListeners() at the end of closePipe().
176 public static final int PIPE_CLOSED_EVENT = 1;
179 * Creates a bidirectional pipe
181 * @param group group context
182 * @param msgr lightweight output pipe
183 * @param pipe PipeAdvertisement
184 * @param isReliable Whether the connection is reliable or not
185 * @param credDoc Credential StructuredDocument
186 * @param direct indicates a direct messenger pipe
187 * @throws IOException if an io error occurs
// Builds a pipe around an already-resolved messenger.
// NOTE(review): several short statements of this constructor are not visible in this
// view (including the condition guarding the exception below); comments describe
// only the visible lines.
189 protected JxtaBiDiPipe(PeerGroup group, Messenger msgr, PipeAdvertisement pipe, StructuredDocument credDoc, boolean isReliable, boolean direct) throws IOException {
// Raised for a missing messenger, per the message text.
191 throw new IOException("Null Messenger");
193 this.direct = direct;
// Use the supplied credential document, else fall back to the group's default credential.
196 this.credentialDoc = credDoc != null ? credDoc : getCredDoc(group);
197 this.pipeSvc = group.getPipeService();
// This object is registered as the message listener of the new input pipe.
198 this.inputPipe = pipeSvc.createInputPipe(pipe, this);
200 this.isReliable = isReliable;
208 * Creates a new object with a default timeout of #timeout, and no reliability.
// No-argument constructor. An instance created this way is connected later through one
// of the connect(...) overloads.
// NOTE(review): the constructor body is not visible in this view.
211 public JxtaBiDiPipe() {
215 * Creates a bidirectional pipe.
217 * Attempts to create a bidirectional connection to remote peer within default
218 * timeout of #timeout.
220 * @param group group context
221 * @param pipeAd PipeAdvertisement
222 * @param msgListener application PipeMsgListener
223 * @throws IOException if an io error occurs
// Connects immediately using the default timeout field and no specific target peer.
// NOTE(review): connect(...) is a public, overridable method invoked from a constructor;
// a subclass override would run before the subclass is fully initialised.
225 public JxtaBiDiPipe(PeerGroup group, PipeAdvertisement pipeAd, PipeMsgListener msgListener) throws IOException {
226 connect(group, null, pipeAd, timeout, msgListener);
230 * Creates a bidirectional pipe.
232 * Attempts to create a bidirectional connection to remote peer within specified
233 * timeout of #timeout.
235 * @param group group context
236 * @param timeout The number of milliseconds within which the JxtaBiDiPipe must
237 * be successfully created. An exception will be thrown if the pipe
238 * cannot be created in the allotted time. A timeout value of {@code 0}
239 * (zero) specifies an infinite timeout.
240 * @param pipeAd PipeAdvertisement
241 * @param msgListener application PipeMsgListener
242 * @throws IOException if an io error occurs
// Connects immediately with the caller-supplied timeout (the parameter shadows the field)
// and no specific target peer; reliability is taken from the current isReliable field
// by the delegated overload.
// NOTE(review): overridable connect(...) is invoked from a constructor.
244 public JxtaBiDiPipe(PeerGroup group, PipeAdvertisement pipeAd, int timeout, PipeMsgListener msgListener) throws IOException {
245 connect(group, null, pipeAd, timeout, msgListener);
249 * attempts to create a bidirectional connection to remote peer
251 * @param group group context
252 * @param pipeAd PipeAdvertisement
253 * @param timeout The number of milliseconds within which the JxtaBiDiPipe must
254 * be successfully created. An exception will be thrown if the pipe
255 * cannot be created in the allotted time. A timeout value of {@code 0}
256 * (zero) specifies an infinite timeout.
257 * @param msgListener application PipeMsgListener
258 * @param reliable if true, the reliability is assumed
259 * @throws IOException if an io error occurs
// Connects immediately with an explicit timeout and reliability setting, and no specific
// target peer.
// NOTE(review): overridable connect(...) is invoked from a constructor.
261 public JxtaBiDiPipe(PeerGroup group, PipeAdvertisement pipeAd, int timeout, PipeMsgListener msgListener, boolean reliable) throws IOException {
262 connect(group, null, pipeAd, timeout, msgListener, reliable);
266 * Connect to a JxtaServerPipe with default timeout
268 * @param group group context
269 * @param pipeAd PipeAdvertisement
270 * @throws IOException if an io error occurs
// Convenience overload: connects using the current value of the timeout field.
272 public void connect(PeerGroup group, PipeAdvertisement pipeAd) throws IOException {
273 connect(group, pipeAd, timeout);
277 * Connects to a remote JxtaBiDiPipe
279 * @param group group context
280 * @param pipeAd PipeAdvertisement
281 * @param timeout timeout in ms, also reset object default timeout
283 * @throws IOException if an io error occurs
// Convenience overload: no target peer (null peerid) and no message listener (null).
285 public void connect(PeerGroup group, PipeAdvertisement pipeAd, int timeout) throws IOException {
286 connect(group, null, pipeAd, timeout, null);
290 * Connects to a remote JxtaServerPipe
292 * @param group group context
293 * @param peerid peer to connect to
294 * @param pipeAd PipeAdvertisement
295 * @param timeout timeout in ms, also reset object default timeout to that of timeout
296 * @param msgListener application PipeMsgListener
297 * @throws IOException if an io error occurs
// Convenience overload: reuses the current isReliable field as the reliability setting.
299 public void connect(PeerGroup group, PeerID peerid, PipeAdvertisement pipeAd, int timeout, PipeMsgListener msgListener) throws IOException {
300 connect(group, peerid, pipeAd, timeout, msgListener, isReliable);
304 * Connects to a remote JxtaServerPipe
306 * @param group group context
307 * @param peerid peer to connect to
308 * @param pipeAd PipeAdvertisement
309 * @param timeout timeout in ms, also reset object default timeout to that of timeout
310 * @param msgListener application PipeMsgListener
311 * @param reliable Reliable connection
312 * @throws IOException if an io error occurs
// Performs the connect handshake with the remote server pipe:
//   1. create the local input pipe and the open message,
//   2. ask the pipe service to resolve an output pipe to the remote side and wait for it,
//   3. send the open message, then wait for the response handled in pipeMsgEvent(),
//   4. notify listeners that the pipe is open.
// NOTE(review): several short statements (guards, try/closing braces, throws) are not
// visible in this view; comments describe only the visible lines.
314 public void connect(PeerGroup group, PeerID peerid, PipeAdvertisement pipeAd, int timeout, PipeMsgListener msgListener, boolean reliable) throws IOException {
// Rejects a second connect on an already-bound pipe (guard condition not visible).
316 throw new IOException("Pipe already bound");
// Rejects an invalid timeout (guard condition not visible). The Javadoc states that 0
// means an infinite timeout, which matches Object.wait(0) used below.
319 throw new IllegalArgumentException("Invalid timeout :" + timeout);
322 this.pipeAdv = pipeAd;
324 this.msgListener = msgListener;
325 if (msgListener != null) {
328 this.isReliable = reliable;
329 pipeSvc = group.getPipeService();
// The supplied timeout also becomes the object's default timeout.
330 this.timeout = timeout;
// Local back-channel: a new input pipe advertisement with this object as its listener.
331 myPipeAdv = JxtaServerPipe.newInputPipe(group, pipeAd);
332 this.inputPipe = pipeSvc.createInputPipe(myPipeAdv, this);
// Keep an explicitly set credential document, else use the group's default credential.
333 this.credentialDoc = credentialDoc != null ? credentialDoc : getCredDoc(group);
334 Message openMsg = createOpenMessage(group, myPipeAdv);
336 // create the output pipe and send this message
// Asynchronous resolution: the result arrives through outputPipeEvent().
337 if (peerid == null) {
338 pipeSvc.createOutputPipe(pipeAd, this);
// Resolution restricted to the single requested peer.
340 pipeSvc.createOutputPipe(pipeAd, Collections.singleton(peerid), this);
343 synchronized (acceptLock) {
344 // check connectOutpipe within lock to prevent a race with modification.
345 if (connectOutpipe == null) {
346 if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
347 LOG.fine("Waiting for " + timeout + " msec");
// NOTE(review): a single wait() outside a loop; a spurious wakeup is handled only by
// the null check that follows, which would then report a timeout early.
349 acceptLock.wait(timeout);
352 } catch (InterruptedException ie) {
353 if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
354 LOG.log(Level.FINE, "Interrupted", ie);
// NOTE(review): Thread.interrupted() CLEARS the interrupt flag; restoring it would be
// Thread.currentThread().interrupt().
356 Thread.interrupted();
// Converted to an IOException (the statements that attach the cause and throw are not visible).
357 IOException exp = new IOException("Interrupted");
// Still no output pipe after waiting: resolution timed out.
361 if (connectOutpipe == null) {
362 throw new IOException("connection timeout");
364 // send connect message
366 if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
367 LOG.fine("Sending a backchannel message");
369 connectOutpipe.send(openMsg);
370 // wait for the second op
// finalLock is notified by pipeMsgEvent() once the connect response has been processed.
372 synchronized (finalLock) {
374 if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
375 LOG.fine("Waiting for " + timeout + " msec for back channel to be established");
377 finalLock.wait(timeout);
378 // Need to check for creation
// Thrown when the back channel was not established in time (guard condition not visible).
380 throw new IOException("connection timeout");
384 } catch (InterruptedException ie) {
385 if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
386 LOG.log(Level.FINE, "Interrupted", ie);
// NOTE(review): clears the interrupt flag, as above.
388 Thread.interrupted();
389 IOException exp = new IOException("Interrupted");
// Handshake complete: inform the registered event/state listener.
394 notifyListeners(PipeStateListener.PIPE_OPENED_EVENT);
398 * creates all the reliability objects
// Builds the reliability-layer objects on top of the current messenger.
// NOTE(review): some guarding statements are not visible in this view.
400 private void createRLib() {
// The adaptor is created once and reused on subsequent calls.
402 if (outgoing == null) {
403 outgoing = new OutgoingMsgrAdaptor(msgr, retryTimeout);
// Outgoing stream with a fixed flow-control window of windowSize.
406 ros = new ReliableOutputStream(outgoing, new FixedFlowControl(windowSize));
// Incoming stream; this object receives the re-assembled messages via processIncomingMessage().
409 ris = new ReliableInputStream(outgoing, retryTimeout, this);
415 * Toggles reliability
417 * @param reliable Toggles reliability to reliable
418 * @throws IOException if pipe is bound
// Selects reliable or unreliable mode; only permitted before the pipe is bound, per the
// exception message (the guard condition itself is not visible in this view).
420 public void setReliable(boolean reliable) throws IOException {
422 throw new IOException("Can not set reliability after pipe is bound");
424 this.isReliable = reliable;
428 * Obtain the cred doc from the group object.
430 * @param group group context
431 * @return The credDoc value
// Looks up the group's default credential through its membership service and returns
// that credential rendered as an XML (UTF-8) document.
// Any exception is logged at WARNING and not propagated.
// NOTE(review): the fall-through return is not visible in this view; callers
// (createOpenMessage) treat a null result as "no credentials".
433 protected static StructuredDocument getCredDoc(PeerGroup group) {
435 MembershipService membership = group.getMembershipService();
436 Credential credential = membership.getDefaultCredential();
// A group may have no default credential.
438 if (credential != null) {
439 return credential.getDocument(MimeMediaType.XMLUTF8);
441 } catch (Exception e) {
442 if (Logging.SHOW_WARNING && LOG.isLoggable(Level.WARNING)) {
443 LOG.log(Level.WARNING, "failed to get credential", e);
450 * get the remote credential doc
452 * @return Credential StructuredDocument
// Returns the credentialDoc field as-is; it is null until set by a constructor,
// connect(...), createOpenMessage(...) or setCredentialDoc(...).
454 public StructuredDocument getCredentialDoc() {
455 return credentialDoc;
459 * Sets the connection credential doc.
460 * If no credentials are set, the default group credential are used.
462 * @param doc Credential StructuredDocument
// Stores the credential document to attach to the open message. Passing null causes
// connect(...)/createOpenMessage(...) to fall back to the group's default credential.
464 public void setCredentialDoc(StructuredDocument doc) {
465 this.credentialDoc = doc;
469 * Creates a connection request message
471 * @param group group context
472 * @param pipeAd pipe advertisement
473 * @return the Message object
474 * @throws IOException if an io error occurs
// Builds the connection-request message sent to the remote server pipe. All elements are
// added under JxtaServerPipe.nameSpace:
//   credTag            - credential document (only when one is available)
//   reqPipeTag         - advertisement of the local input pipe the remote side should answer on
//   reliableTag        - requested reliability, as "true"/"false"
//   directSupportedTag - always "true"
//   remPeerTag         - this peer's advertisement
// NOTE(review): the try opening, the return statement and closing braces are not
// visible in this view.
476 protected Message createOpenMessage(PeerGroup group, PipeAdvertisement pipeAd) throws IOException {
477 Message msg = new Message();
478 PeerAdvertisement peerAdv = group.getPeerAdvertisement();
// Lazily resolve the credential document from the group.
480 if (credentialDoc == null) {
481 credentialDoc = getCredDoc(group);
// A secure unicast pipe cannot be opened without credentials.
483 if (credentialDoc == null && pipeAd.getType().equals(PipeService.UnicastSecureType)) {
484 throw new IOException("No credentials established to initiate a secure connection");
487 if (credentialDoc != null) {
// NOTE(review): unchecked downcast; assumes the credential document is an XMLDocument.
488 msg.addMessageElement(JxtaServerPipe.nameSpace,
489 new TextDocumentMessageElement(JxtaServerPipe.credTag, (XMLDocument) credentialDoc, null));
491 msg.addMessageElement(JxtaServerPipe.nameSpace,
492 new TextDocumentMessageElement(JxtaServerPipe.reqPipeTag,
493 (XMLDocument) pipeAd.getDocument(MimeMediaType.XMLUTF8), null));
495 msg.addMessageElement(JxtaServerPipe.nameSpace,
496 new StringMessageElement(JxtaServerPipe.reliableTag, Boolean.toString(isReliable), null));
498 msg.addMessageElement(JxtaServerPipe.nameSpace,
499 new StringMessageElement(JxtaServerPipe.directSupportedTag, Boolean.toString(true), null));
501 msg.addMessageElement(JxtaServerPipe.nameSpace,
502 new TextDocumentMessageElement(JxtaServerPipe.remPeerTag,
503 (XMLDocument) peerAdv.getDocument(MimeMediaType.XMLUTF8), null));
// NOTE(review): catches Throwable and logs only at FINE; what is thrown or returned
// afterwards is not visible in this view.
505 } catch (Throwable t) {
506 if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
507 LOG.log(Level.FINE, "error getting element stream", t);
514 * Sets the bound attribute of the JxtaBiDiPipe object
// NOTE(review): the enclosing method's signature and the assignment preceding this log
// statement are not visible in this view; per the log text this is where the pipe is
// marked as bound.
518 if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
519 LOG.fine("Pipe Bound :true");
524 * Returns the binding state of the JxtaBiDiPipe.
526 * @return true if the pipe has been successfully bound
// Reports whether this pipe is bound.
// NOTE(review): the return statement is not visible in this view; presumably it returns the 'bound' field.
528 public boolean isBound() {
533 * Returns an input stream for this socket.
535 * @return a stream for reading from this socket.
536 * @throws IOException if an I/O error occurs when creating the
// Accessor for the pipe's InputPipe (not a stream, despite the wording of the surrounding Javadoc).
// NOTE(review): the return statement is not visible in this view; presumably it returns the 'inputPipe' field.
539 public InputPipe getInputPipe() throws IOException {
544 * Returns remote PeerAdvertisement
546 * @return remote PeerAdvertisement
// Returns the remote peer's advertisement; null until the connect response has been
// processed in pipeMsgEvent() or setRemotePeerAdvertisement() has been called.
548 public PeerAdvertisement getRemotePeerAdvertisement() {
549 return remotePeerAdv;
553 * Returns remote PipeAdvertisement
555 * @return remote PipeAdvertisement
// Returns the remote pipe's advertisement; null until the connect response has been
// processed in pipeMsgEvent() or setRemotePipeAdvertisement() has been called.
557 public PipeAdvertisement getRemotePipeAdvertisement() {
558 return remotePipeAdv;
562 * Sets the remote PeerAdvertisement
564 * @param peer Remote PeerAdvertisement
// Records the remote peer's advertisement (no validation; null is stored as-is).
566 protected void setRemotePeerAdvertisement(PeerAdvertisement peer) {
567 this.remotePeerAdv = peer;
571 * Sets the remote PipeAdvertisement
573 * @param pipe PipeAdvertisement
// Records the remote pipe's advertisement (no validation; null is stored as-is).
575 protected void setRemotePipeAdvertisement(PipeAdvertisement pipe) {
576 this.remotePipeAdv = pipe;
582 * @throws IOException if an I/O error occurs when closing this
// Public entry point for closing the pipe.
// NOTE(review): the body is not visible in this view; presumably it delegates to closePipe(boolean).
585 public void close() throws IOException {
// Closes the pipe under closeLock. Unless fastClose is requested, a reliable non-direct
// pipe first "lingers", waiting up to 'timeout' ms for outstanding output to be
// acknowledged, and then listeners receive PIPE_CLOSED_EVENT.
// NOTE(review): many short statements (loop header, state updates, resource close calls,
// braces) are not visible in this view; comments describe only the visible lines.
591 protected void closePipe(boolean fastClose) throws IOException {
593 synchronized (closeLock) {
// Linger applies only to the ACK-based reliability layer.
601 if (!fastClose && isReliable && !direct) {
603 * This implements linger!
// Absolute deadline for the linger phase.
605 long quitAt = System.currentTimeMillis() + timeout;
607 //FIXME hamada this does not loop
// Everything sent has been acknowledged (or there is no output stream at all).
608 if (ros == null || ros.getMaxAck() == ros.getSeqNumber()) {
609 // Nothing to worry about.
613 // By default wait forever.
616 // If timeout is not zero. Then compute the waiting time
619 left = quitAt - System.currentTimeMillis();
// Deadline passed with unacknowledged data (guard condition not visible).
623 throw new IOException("Close timeout");
// Block until the output stream signals a queue change, at most 'left' ms.
628 if (!ros.isQueueEmpty()) {
629 if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
630 LOG.fine("Waiting for Output stream queue event");
632 ros.waitQueueEvent(left);
635 } catch (InterruptedException ie) {
// NOTE(review): the InterruptedException is dropped as a cause here; whether the
// interrupt flag is restored is not visible.
637 throw new IOException("Close interrupted");
641 // We are initiating the close. We do not want to receive
642 // anything more. So we can close the ris right away.
646 if (isReliable && ros != null) {
653 if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
654 LOG.fine("Pipe close complete");
656 notifyListeners(PIPE_CLOSED_EVENT);
// Delivers a pipe event code to the registered listener. Any Throwable raised by the
// callback is logged at FINE and suppressed so a faulty listener cannot break the pipe.
// NOTE(review): because of the 'else if', stateListener is only notified when
// eventListener is null -- confirm this precedence is intended.
659 private void notifyListeners(int event) {
661 if (eventListener != null) {
662 eventListener.pipeEvent(event);
663 } else if (stateListener != null) {
664 stateListener.stateEvent(this, event);
666 } catch (Throwable th) {
667 if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
668 LOG.log(Level.FINE, "error during pipe event callback", th);
674 * Sets the inputPipe attribute of the JxtaBiDiPipe object
676 * @param inputPipe The new inputPipe value
// Replaces the input pipe reference; the previous pipe, if any, is not closed here.
678 protected void setInputPipe(InputPipe inputPipe) {
679 this.inputPipe = inputPipe;
// Input-pipe callback. Handles two kinds of message:
//   * the connect response from the remote side (carries remPipeTag): extracts the remote
//     pipe/peer advertisements, credentials, reliability and direct-messenger flags,
//     creates the outgoing messenger and wakes the thread waiting in connect();
//   * ordinary traffic: handed to the reliability layer in reliable non-direct mode,
//     otherwise checked for a close request and delivered.
// NOTE(review): several short statements (returns, try opening, else branches, braces)
// are not visible in this view; comments describe only the visible lines.
685 public void pipeMsgEvent(PipeMsgEvent event) {
686 Message message = event.getMessage();
687 if (message == null) {
688 if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
689 LOG.fine("Empty event");
693 if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
694 LOG.fine("Pipe message arrived");
697 MessageElement element;
699 // look for a remote pipe answer
700 element = message.getMessageElement(JxtaServerPipe.nameSpace, JxtaServerPipe.remPipeTag);
// Presence of remPipeTag identifies the message as a connect response.
701 if (element != null) {
704 XMLDocument CredDoc = null;
705 XMLDocument remotePipeDoc = (XMLDocument) StructuredDocumentFactory.newStructuredDocument(element);
707 remotePipeAdv = (PipeAdvertisement) AdvertisementFactory.newAdvertisement(remotePipeDoc);
708 if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
709 LOG.fine("Recevied a pipe Advertisement :" + remotePipeAdv.getName());
// The response must also carry the remote peer advertisement.
712 element = message.getMessageElement(JxtaServerPipe.nameSpace, JxtaServerPipe.remPeerTag);
713 if (element != null) {
714 XMLDocument remotePeerDoc = (XMLDocument) StructuredDocumentFactory.newStructuredDocument(element);
716 remotePeerAdv = (PeerAdvertisement) AdvertisementFactory.newAdvertisement(remotePeerDoc);
717 if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
718 LOG.fine("Recevied an Peer Advertisement :" + remotePeerAdv.getName());
// Logged on the branch where the peer advertisement is absent.
721 if (Logging.SHOW_WARNING && LOG.isLoggable(Level.WARNING)) {
722 LOG.warning(" BAD connect response");
// Optional remote credential document.
727 element = message.getMessageElement(JxtaServerPipe.nameSpace, JxtaServerPipe.credTag);
728 if (element != null) {
729 CredDoc = (XMLDocument) StructuredDocumentFactory.newStructuredDocument(element);
// Secure pipes require a credential document that passes checkCred().
// NOTE(review): the log text says "Missing" even when the document is present but rejected.
731 if (pipeAdv.getType().equals(PipeService.UnicastSecureType) && (CredDoc == null || !checkCred(CredDoc))) {
733 if (Logging.SHOW_SEVERE && LOG.isLoggable(Level.SEVERE)) {
734 LOG.severe("Missing remote credential doc");
// The remote side's answer overrides the locally requested reliability.
739 element = message.getMessageElement(JxtaServerPipe.nameSpace, JxtaServerPipe.reliableTag);
740 if (element != null) {
741 isReliable = Boolean.valueOf(element.toString());
744 boolean directSupported = false;
745 element = message.getMessageElement(JxtaServerPipe.nameSpace, JxtaServerPipe.directSupportedTag);
746 if (element != null) {
747 directSupported = Boolean.valueOf(element.toString());
// Prefer a direct messenger when the remote side supports it; the lightweight output
// pipe is used otherwise (the conditions selecting the two fallback calls below are
// not visible in this view).
750 if (directSupported) {
751 msgr = getDirectMessenger(group, remotePipeAdv, remotePeerAdv);
755 msgr = lightweightOutputPipe(group, remotePipeAdv, remotePeerAdv);
758 msgr = lightweightOutputPipe(group, remotePipeAdv, remotePeerAdv);
761 if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
762 LOG.fine("Reliability set to :" + isReliable);
// The ACK-based reliability layer is only needed without a direct messenger.
764 if (isReliable && !direct) {
// Wake the thread blocked on finalLock in connect() (and any in receiveMessage()).
767 synchronized (finalLock) {
769 finalLock.notifyAll();
771 } catch (IOException e) {
772 if (Logging.SHOW_SEVERE && LOG.isLoggable(Level.SEVERE)) {
773 LOG.log(Level.SEVERE, "failed to process response message", e);
// Ordinary (non-handshake) traffic from here on.
780 if (isReliable && !direct) {
781 // let reliability deal with the message
782 receiveMessage(message);
// Close requests are consumed by hasClose(); other messages continue to delivery.
785 if (!hasClose(message)) {
// Checks whether the message carries a close request (closeTag under
// JxtaServerPipe.nameSpace). When it does, the close is logged and acted on; an
// IOException raised while closing is logged at WARNING and not propagated.
// NOTE(review): the close call and the return statements are not visible in this view.
790 private boolean hasClose(Message message) {
791 // look for close request
792 MessageElement element = message.getMessageElement(JxtaServerPipe.nameSpace, JxtaServerPipe.closeTag);
793 if (element != null) {
795 if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
796 LOG.fine("Recevied a pipe close request, closing pipes");
802 } catch (IOException ie) {
803 if (Logging.SHOW_WARNING && LOG.isLoggable(Level.WARNING)) {
804 LOG.log(Level.WARNING, "failed during close", ie);
// Routes a message into the reliability layer: ACK elements (Defs.MIME_TYPE_ACK) are
// looked up first, then data blocks (Defs.MIME_TYPE_BLOCK).
// NOTE(review): the statements that consume the two iterators, and the condition guarding
// the wait, are not visible in this view.
812 private void receiveMessage(Message message) {
813 Iterator<MessageElement> i = message.getMessageElements(Defs.NAMESPACE, Defs.MIME_TYPE_ACK);
822 i = message.getMessageElements(Defs.NAMESPACE, Defs.MIME_TYPE_BLOCK);
825 // It can happen that we receive messages for the input stream
826 // while we have not finished creating it.
// Wait (bounded by 'timeout') for the handshake in pipeMsgEvent() to signal finalLock.
828 synchronized (finalLock) {
830 finalLock.wait(timeout);
// NOTE(review): the interruption is swallowed and the interrupt flag is not restored.
833 } catch (InterruptedException ie) {// ignored
843 * Gets the Maximum Retry Timeout of the reliability layer
845 * @return The maximum retry Timeout value
// Returns the current maximum retry timeout; synchronized on this instance, like its setter.
847 public synchronized int getMaxRetryTimeout() {
848 return maxRetryTimeout;
852 * Sets the Maximum Retry Timeout of the reliability layer
854 * @param maxRetryTimeout The new maximum retry timeout value
855 * @throws IllegalArgumentException if maxRetryTimeout exceeds jxta platform maximum retry timeout
// Sets the maximum retry timeout. Accepts values in the range 1..MAXRETRYTIMEOUT inclusive;
// anything else raises IllegalArgumentException.
// NOTE(review): the message says "Exceed Global maximum" even when the value was rejected
// for being zero or negative.
857 public synchronized void setMaxRetryTimeout(int maxRetryTimeout) {
858 if (maxRetryTimeout <= 0 || maxRetryTimeout > MAXRETRYTIMEOUT) {
859 throw new IllegalArgumentException(
860 "Invalid Maximum retry timeout :" + maxRetryTimeout + " Exceed Global maximum retry timeout :"
863 this.maxRetryTimeout = maxRetryTimeout;
867 * Gets the Retry Timeout of the reliability layer
869 * @return The retry Timeout value
// Accessor for the reliability layer's retry timeout; synchronized on this instance.
// NOTE(review): the return statement is not visible in this view; presumably it returns the 'retryTimeout' field.
871 public synchronized int getRetryTimeout() {
876 * Sets the Retry Timeout of the underlying reliability layer
878 * In reliable mode it is possible for this call to block
879 * trying to obtain a lock on reliable input stream
881 * @param retryTimeout The new retry timeout value
882 * @throws IOException if an I/O error occurs
// Updates the retry timeout and, when the reliability adaptor already exists, pushes the
// new value to it.
// NOTE(review): the validation condition guarding the exception is not visible in this
// view; the message text refers to a "Socket timeout" although this is the retry timeout.
884 public synchronized void setRetryTimeout(int retryTimeout) throws IOException {
886 throw new IllegalArgumentException("Invalid Socket timeout :" + retryTimeout);
888 this.retryTimeout = retryTimeout;
// 'outgoing' is only created by createRLib(), so it may still be null here.
889 if (outgoing != null) {
890 outgoing.setTimeout(retryTimeout);
895 * When in reliable mode, gets the Reliable library window size
897 * @return The windowSize value
// Accessor for the reliability layer's window size; synchronized on this instance.
// NOTE(review): the return statement is not visible in this view; presumably it returns the 'windowSize' field.
899 public synchronized int getWindowSize() {
904 * When in reliable mode, sets the Reliable library window size
906 * @param windowSize The new window size value
907 * @throws IOException if an I/O error occurs
// Sets the window size used when the reliability objects are created (createRLib()).
// Rejected once the pipe is bound, per the exception message (the guard condition itself
// is not visible in this view).
// NOTE(review): no range check on windowSize is visible.
909 public synchronized void setWindowSize(int windowSize) throws IOException {
911 throw new IOException("Socket bound. Can not change the window size");
913 this.windowSize = windowSize;
917 * This method is invoked by the Reliability library for each incoming data message
919 * @param message Incoming message
// ReliableInputStream.MsgListener callback. Messages that are not close requests are
// wrapped in a PipeMsgEvent tagged with the local input pipe's ID.
// NOTE(review): the statement that delivers the event (presumably push(event)) is not
// visible in this view.
921 public void processIncomingMessage(Message message) {
922 if (!hasClose(message)) {
923 PipeMsgEvent event = new PipeMsgEvent(this, message, (PipeID) inputPipe.getPipeID());
// Delivers an incoming event: queued while no application listener is registered,
// otherwise handed directly to the listener on the calling thread.
// NOTE(review): the queue insertion statement and the else/closing lines are not visible
// in this view, so the behaviour on a full queue cannot be confirmed here.
928 private void push(PipeMsgEvent event) {
929 if (msgListener == null) {
930 if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
931 LOG.fine("push message onto queue");
936 if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
937 LOG.fine("calling message listener");
939 msgListener.pipeMsgEvent(event);
947 * <code>Messenger</code>
949 * @param msg Message to send to the remote side
950 * @return true if message was successfully enqueued
951 * @throws IOException if the underlying messenger breaks, either due to
952 * a physical address change, reliability issue.
953 * @see net.jxta.endpoint.Message
// Sends a message to the remote side.
//   * Reliable, non-direct mode: the message goes through the ReliableOutputStream.
//   * Otherwise: sent on the messenger, using TcpMessenger.sendMessageDirect when the
//     messenger is a TcpMessenger. A SocketTimeoutException triggers one retry of the
//     same send; any other IOException is wrapped in a new IOException.
// NOTE(review): return statements, the try opening and the cause-chaining/throw lines
// are not visible in this view.
955 public boolean sendMessage(Message msg) throws IOException {
956 if (isReliable && !direct) {
// NOTE(review): 'ros' is only assigned in createRLib(); no null check is visible here.
957 int seqn = ros.send(msg);
961 if (msgr instanceof TcpMessenger) {
962 ((TcpMessenger) msgr).sendMessageDirect(msg, null, null, true);
965 return msgr.sendMessage(msg, null, null);
// Retry once after a socket timeout, along the same two paths as above.
967 } catch (SocketTimeoutException io) {
968 if (msgr instanceof TcpMessenger) {
969 ((TcpMessenger) msgr).sendMessageDirect(msg, null, null, true);
972 return msgr.sendMessage(msg, null, null);
974 } catch (IOException io) {
976 IOException exp = new IOException("IO error occured during sendMessage()");
// Drains events that were queued before a listener was registered, delivering each to
// msgListener in queue order. Runs only when 'dequeued' is false and a listener is present.
// NOTE(review): the InterruptedException handler body and the statement updating
// 'dequeued' are not visible in this view.
984 private void dequeue() {
985 if (!dequeued && (null != msgListener)) {
986 while (queue != null && !queue.isEmpty()) {
987 if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
988 LOG.fine("dequeing messages onto message listener");
// take() can block if another thread empties the queue between isEmpty() and this call.
991 msgListener.pipeMsgEvent(queue.take());
992 } catch (InterruptedException e) {
// OutputPipeListener callback fired when the pipe service resolves an output pipe.
// The first matching pipe becomes connectOutpipe and wakes the thread waiting in
// connect(); an output pipe whose advertisement is missing is accepted too, while one
// for a different advertisement is logged as unexpected.
// NOTE(review): the handling of surplus/unexpected pipes (presumably closing them) is not
// visible in this view.
1003 public void outputPipeEvent(OutputPipeEvent event) {
1004 OutputPipe op = event.getOutputPipe();
1006 if (op.getAdvertisement() == null) {
1007 if (Logging.SHOW_WARNING && LOG.isLoggable(Level.WARNING)) {
1008 LOG.warning("The output pipe has no internal pipe advertisement. Continueing anyway.");
1011 if (op.getAdvertisement() == null || pipeAdv.equals(op.getAdvertisement())) {
1012 synchronized (acceptLock) {
1013 // modify op within lock to prevent a race with the if.
1014 if (connectOutpipe == null) {
1015 connectOutpipe = op;
1016 // set to null to avoid closure
1019 acceptLock.notifyAll();
1021 // Ooops one too many, we were too fast re-trying.
1027 if (Logging.SHOW_WARNING && LOG.isLoggable(Level.WARNING)) {
1028 LOG.warning("Unexpected OutputPipe :" + op);
1034 * A lightweight direct messenger output pipe constructor, note the return type
1035 * Since all the info needed is available, there is no need to
1036 * use the pipe service to resolve the pipe; we have all we need
1037 * to construct a messenger.
1039 * @param group group context
1040 * @param pipeAdv Remote Pipe Advertisement
1041 * @param peer Remote Peer advertisement
// Obtains a direct endpoint messenger to the remote pipe, addressed by the remote peer's
// unique ID and the pipe ID, bypassing pipe-service resolution.
// Propagate pipes are rejected with IllegalArgumentException. Only the UnicastType branch
// is visible here.
// NOTE(review): the third EndpointAddress argument and the return for other pipe types
// are not visible in this view.
1044 protected static Messenger getDirectMessenger(PeerGroup group, PipeAdvertisement pipeAdv, PeerAdvertisement peer) {
1045 // Get an endpoint messenger to that address
1046 if (pipeAdv.getType().equals(PipeService.PropagateType)) {
1047 throw new IllegalArgumentException("Invalid pipe type " + pipeAdv.getType());
1049 if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
1050 LOG.fine("Creating a Direct Messenger");
1053 if (pipeAdv.getType().equals(PipeService.UnicastType)) {
1054 EndpointService endpoint = group.getEndpointService();
1055 EndpointAddress pipeEndpoint = new EndpointAddress("jxta",
1056 (peer.getPeerID().getUniqueValue()).toString(),
1058 pipeAdv.getPipeID().toString());
1059 return endpoint.getDirectMessenger(pipeEndpoint, peer, true);
1065 * A lightweight output pipe constructor, note the return type
1066 * Since all the info needed is available, there is no need to
1067 * use the pipe service to resolve the pipe; we have all we need
1068 * to construct a messenger.
1070 * @param group group context
1071 * @param pipeAdv Remote Pipe Advertisement
1072 * @param peer Remote Peer advertisement
// Builds an endpoint address for the remote pipe and asks the endpoint service for a
// messenger to it. The address scheme depends on the pipe type: "jxta" for UnicastType,
// "jxtatls" for UnicastSecureType; both target the "PipeService" with the pipe ID as
// parameter.
// NOTE(review): the handling of unsupported pipe types is not visible in this view.
1075 protected static Messenger lightweightOutputPipe(PeerGroup group, PipeAdvertisement pipeAdv, PeerAdvertisement peer) {
1077 EndpointService endpoint = group.getEndpointService();
1078 ID opId = pipeAdv.getPipeID();
1079 String destPeer = (peer.getPeerID().getUniqueValue()).toString();
1081 // Get an endpoint messenger to that address
1082 EndpointAddress addr;
1083 if (pipeAdv.getType().equals(PipeService.UnicastType)) {
1084 addr = new EndpointAddress("jxta", destPeer, "PipeService", opId.toString());
1085 } else if (pipeAdv.getType().equals(PipeService.UnicastSecureType)) {
1086 addr = new EndpointAddress("jxtatls", destPeer, "PipeService", opId.toString());
1088 // not a supported type
1091 if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
1092 LOG.fine("Creating a lightweightOutputPipe()");
1094 return endpoint.getMessenger(addr);
1098 * Not implemented yet
1100 * @param cred the credential document
1101 * @return always returns true
// Placeholder credential check used by pipeMsgEvent() for secure pipes.
// NOTE(review): the return statement is not visible in this view; the Javadoc states it
// always returns true, i.e. no real verification of the remote credential is performed.
1103 protected boolean checkCred(StructuredDocument cred) {
1104 // FIXME need to check credentials
1109 * Send a close message to the remote side
// Sends a close request (closeTag element with value "close") to the remote side, unless
// the reliable output stream is already closed. Failures are logged, not propagated.
// NOTE(review): the send call, the early return and the stream close are not visible in
// this view.
1111 private void sendClose() {
// NOTE(review): 'ros' is dereferenced here without a null check, although the later
// branch does check 'ros != null' -- possible NullPointerException if the reliability
// objects were never created.
1112 if (!direct && isReliable && ros.isClosed()) {
1113 if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
1114 LOG.fine("ReliableOutputStream is already closed. Skipping close message");
1119 Message msg = new Message();
1120 msg.addMessageElement(JxtaServerPipe.nameSpace, new StringMessageElement(JxtaServerPipe.closeTag, "close", null));
1123 // ros will not take any new message, now.
1124 if (!direct && ros != null) {
1127 } catch (IOException ie) {
// NOTE(review): the guard tests FINE but the record is logged at SEVERE, so the failure
// is only reported when FINE logging is enabled.
1128 if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
1129 LOG.log(Level.SEVERE, "failed during close", ie);
1135 * Returns the message listener for this pipe
1137 * @return PipeMsgListener
1138 * @deprecated use getMessageListener instead
1141 public PipeMsgListener getListener() {
1142 return getMessageListener();
1146 * Returns the message listener for this pipe
1148 * @return PipeMsgListener
1150 public PipeMsgListener getMessageListener() {
1155 * Sets message listener for a pipe spawned by the JxtaServerPipe.
1156 * There is a window where a message could arrive prior to listener being
1157 * registered therefore a message queue is created to queue messages, once
1158 * a listener is registered these messages will be dequeued by calling the
1159 * listener until the queue is empty
1161 * @param msgListener New value of property listener.
1162 * @deprecated use setMessageListener instead
1165 public void setListener(PipeMsgListener msgListener) {
1166 setMessageListener(msgListener);
1170 * Sets message listener for a pipe spawned by the JxtaServerPipe.
1171 * There is a window where a message could arrive prior to listener being
1172 * registered therefore a message queue is created to queue messages, once
1173 * a listener is registered these messages will be dequeued by calling the
1174 * listener until the queue is empty.
1176 * Sending messages vis {@link #sendMessage(Message)} from within a
1177 * {@code PipeMsgListener} may result in a deadlock due to contention
1178 * between the sending and receiving portions of BiDi pipes.
1180 * @param msgListener New value of property listener.
1182 public void setMessageListener(PipeMsgListener msgListener) {
1183 this.msgListener = msgListener;
1184 // if there are messages enqueued then dequeue them onto the msgListener
1189 * Sets a Pipe event listener, set listener to null to unset the listener
1191 * @param eventListener New value of property listener.
1192 * @deprecated use setPipeEventListener instead
1195 public void setListener(PipeEventListener eventListener) {
1196 setPipeEventListener(eventListener);
1200 * Sets a Pipe event listener, set listener to null to unset the listener
1202 * @param eventListener New value of property listener.
1204 public void setPipeEventListener(PipeEventListener eventListener) {
1205 this.eventListener = eventListener;
1209 * Returns the Pipe event listener for this pipe
1211 * @return PipeMsgListener
1213 public PipeEventListener getPipeEventListener() {
1214 return eventListener;
1218 * Sets a Pipe state listener, set listener to null to unset the listener
1220 * @param stateListener New value of property listener.
1222 public void setPipeStateListener(PipeStateListener stateListener) {
1223 this.stateListener = stateListener;
1227 * Returns the Pipe state listener for this pipe
1229 * @return PipeMsgListener
1231 public PipeStateListener getPipeStateListener() {
1232 return stateListener;
1236 * Gets a message from the queue. If no Object is immediately available,
1237 * then wait the specified amount of time for a message to be inserted.
1239 * @param timeout Amount of time to wait in milliseconds for an object to
1240 * be available. Per Java convention, a timeout of zero (0) means wait an
1241 * infinite amount of time. Negative values mean do not wait at all.
1242 * @return The next message in the queue. if a listener is registered calls
1243 * to this method will return null
1244 * @throws InterruptedException if the operation is interrupted before
1245 * the timeout interval is completed.
1247 public Message getMessage(int timeout) throws InterruptedException {
1248 if (queue == null || msgListener != null) {
1251 PipeMsgEvent ev = queue.poll(timeout, TimeUnit.MILLISECONDS);
1253 return ev.getMessage();
1261 * Returns the Assigned PipeAdvertisement
1263 * @return the Assigned PipeAdvertisement
1265 public PipeAdvertisement getPipeAdvertisement() {
1272 * Closes the JxtaBiDiPipe.
1275 protected synchronized void finalize() throws Throwable {
1277 if (Logging.SHOW_WARNING && LOG.isLoggable(Level.WARNING)) {
1278 LOG.warning("JxtaBiDiPipe is being finalized without being previously closed. This is likely a users bug.");