]> sjero.net Git - linphone/blob - p2pproxy/dependencies-src/jxse-src-2.5/impl/src/net/jxta/impl/pipe/BlockingWireOutputPipe.java
remove mediastreamer2 and add it as a submodule instead.
[linphone] / p2pproxy / dependencies-src / jxse-src-2.5 / impl / src / net / jxta / impl / pipe / BlockingWireOutputPipe.java
1 /*
2  * Copyright (c) 2001-2007 Sun Microsystems, Inc.  All rights reserved.
3  *  
4  *  The Sun Project JXTA(TM) Software License
5  *  
6  *  Redistribution and use in source and binary forms, with or without 
7  *  modification, are permitted provided that the following conditions are met:
8  *  
9  *  1. Redistributions of source code must retain the above copyright notice,
10  *     this list of conditions and the following disclaimer.
11  *  
12  *  2. Redistributions in binary form must reproduce the above copyright notice, 
13  *     this list of conditions and the following disclaimer in the documentation 
14  *     and/or other materials provided with the distribution.
15  *  
16  *  3. The end-user documentation included with the redistribution, if any, must 
17  *     include the following acknowledgment: "This product includes software 
18  *     developed by Sun Microsystems, Inc. for JXTA(TM) technology." 
19  *     Alternately, this acknowledgment may appear in the software itself, if 
20  *     and wherever such third-party acknowledgments normally appear.
21  *  
22  *  4. The names "Sun", "Sun Microsystems, Inc.", "JXTA" and "Project JXTA" must 
23  *     not be used to endorse or promote products derived from this software 
24  *     without prior written permission. For written permission, please contact 
25  *     Project JXTA at http://www.jxta.org.
26  *  
27  *  5. Products derived from this software may not be called "JXTA", nor may 
28  *     "JXTA" appear in their name, without prior written permission of Sun.
29  *  
30  *  THIS SOFTWARE IS PROVIDED ``AS IS'' AND ANY EXPRESSED OR IMPLIED WARRANTIES,
31  *  INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND 
32  *  FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL SUN 
33  *  MICROSYSTEMS OR ITS CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, 
34  *  INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT 
35  *  LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, 
36  *  OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF 
37  *  LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING 
38  *  NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, 
39  *  EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
40  *  
41  *  JXTA is a registered trademark of Sun Microsystems, Inc. in the United 
42  *  States and other countries.
43  *  
44  *  Please see the license information page at :
45  *  <http://www.jxta.org/project/www/license.html> for instructions on use of 
46  *  the license in source files.
47  *  
48  *  ====================================================================
49  *  
50  *  This software consists of voluntary contributions made by many individuals 
51  *  on behalf of Project JXTA. For more information on Project JXTA, please see 
52  *  http://www.jxta.org.
53  *  
54  *  This license is based on the BSD license adopted by the Apache Foundation. 
55  */
56 package net.jxta.impl.pipe;
57
58 import net.jxta.document.MimeMediaType;
59 import net.jxta.document.XMLDocument;
60 import net.jxta.endpoint.EndpointAddress;
61 import net.jxta.endpoint.EndpointService;
62 import net.jxta.endpoint.Message;
63 import net.jxta.endpoint.MessageElement;
64 import net.jxta.endpoint.Messenger;
65 import net.jxta.endpoint.TextDocumentMessageElement;
66 import net.jxta.id.ID;
67 import net.jxta.impl.endpoint.tcp.TcpMessenger;
68 import net.jxta.logging.Logging;
69 import net.jxta.peer.PeerID;
70 import net.jxta.peergroup.PeerGroup;
71 import net.jxta.pipe.OutputPipe;
72 import net.jxta.protocol.PipeAdvertisement;
73 import net.jxta.protocol.RouteAdvertisement;
74
75 import java.io.IOException;
76 import java.util.logging.Level;
77 import java.util.logging.Logger;
78
79 /**
80  * This Object is created when a call to PipeService.createOutputPipe(propgateAdv) with a Set containing a single
81  * PeerID.  This pipe blocks until a valid messeger is created (i.e. resolved and useable). With this object it is
82  * possible to detect connection failures during the messenger resolution. Notge, this pipe also avoids utilitizing the
83  * rendezvous for propagation, effectively reducing message overhead, resulting in improved performance.
84  * <p/>
85  * #send is remains asynchronous.
86  */
87 public class BlockingWireOutputPipe implements OutputPipe {
88
89     /**
90      * Logger
91      */
92     private static final Logger LOG = Logger.getLogger(NonBlockingWireOutputPipe.class.getName());
93
94     /**
95      * If true then the pipe has been closed and will no longer accept messages.
96      */
97     private volatile boolean closed = false;
98
99     /**
100      * The advertisement we were created from.
101      */
102     private final PipeAdvertisement pAdv;
103
104     private final PeerGroup group;
105     private Messenger destMessenger = null;
106     private EndpointAddress destination;
107     private EndpointService endpoint = null;
108     private RouteAdvertisement route = null;
109
110     /**
111      * Create a new blocking output pipe
112      *
113      * @param group  The peergroup context.
114      * @param pAdv   advertisement for the pipe we are supporting.
115      * @param peerID the destination <code>PeerID</code>.
116      */
117     public BlockingWireOutputPipe(PeerGroup group, PipeAdvertisement pAdv, PeerID peerID) {
118
119         this.pAdv = pAdv;
120         this.group = group;
121         this.endpoint = group.getEndpointService();
122         destination = new EndpointAddress("jxta", peerID.getUniqueValue().toString(), "PipeService", pAdv.getID().toString());
123         destMessenger = endpoint.getMessenger(destination);
124
125         if (Logging.SHOW_INFO && LOG.isLoggable(Level.INFO)) {
126             LOG.info("Created output pipe for " + getPipeID());
127         }
128     }
129     /**
130      * Create a new blocking output pipe
131      *
132      * @param group  The peergroup context.
133      * @param pAdv   advertisement for the pipe we are supporting.
134      * @param peerID the destination <code>PeerID</code>.
135      * @param route the destination route.
136      */
137     public BlockingWireOutputPipe(PeerGroup group, PipeAdvertisement pAdv, PeerID peerID, RouteAdvertisement route) {
138         this.route = route;
139         this.pAdv = pAdv;
140         this.group = group;
141         this.endpoint = group.getEndpointService();
142         destination = new EndpointAddress("jxta", peerID.getUniqueValue().toString(), "PipeService", pAdv.getID().toString());
143         if (route != null) {
144             destMessenger = endpoint.getDirectMessenger(destination, route, true);
145         }
146         if (destMessenger == null) {
147             destMessenger = endpoint.getMessenger(destination);
148         }
149         if (Logging.SHOW_INFO && LOG.isLoggable(Level.INFO)) {
150             LOG.info("Created output pipe for " + getPipeID());
151         }
152     }
153
154     /**
155      * {@inheritDoc}
156      */
157     public synchronized void close() {
158
159         if (closed) {
160             return;
161         }
162         // Close the queue so that no more messages are accepted
163         if (Logging.SHOW_INFO && LOG.isLoggable(Level.INFO)) {
164             LOG.info("Closing queue for " + getPipeID());
165         }
166         closed = true;
167     }
168
169     /**
170      * {@inheritDoc}
171      */
172     public boolean isClosed() {
173         return closed;
174     }
175
176     /**
177      * {@inheritDoc}
178      */
179     public final String getType() {
180         return pAdv.getType();
181     }
182
183     /**
184      * {@inheritDoc}
185      */
186     public final ID getPipeID() {
187         return pAdv.getPipeID();
188     }
189
190     /**
191      * {@inheritDoc}
192      */
193     public final String getName() {
194         return pAdv.getName();
195     }
196
197     /**
198      * {@inheritDoc}
199      */
200     public final PipeAdvertisement getAdvertisement() {
201         return pAdv;
202     }
203
204     private void checkMessenger() throws IOException {
205         if (!(destMessenger instanceof TcpMessenger) && destMessenger != null && (destMessenger.getState() & Messenger.USABLE) != 0) {
206             return;
207         }
208         if (destMessenger != null && !destMessenger.isClosed()) {
209                 return;
210         }
211         synchronized (this) {
212             if (route != null) {
213                 destMessenger = endpoint.getDirectMessenger(destination, route, true);
214             }
215             destMessenger = endpoint.getMessenger(destination);
216             if (!(destMessenger instanceof TcpMessenger) && destMessenger == null || (destMessenger.getState() & Messenger.TERMINAL) != 0) {
217                 if (destMessenger != null) {
218                     destMessenger.close();
219                     destMessenger = null;
220                 }
221                 throw new IOException("Unable to create a messenger to " + destination.toString());
222             }
223         }
224     }
225
226     /**
227      * {@inheritDoc}
228      */
229     public boolean send(Message message) throws IOException {
230         if (closed) {
231             throw new IOException("Pipe closed");
232         }
233         WireHeader header = new WireHeader();
234
235         header.setPipeID(getPipeID());
236         header.setSrcPeer(group.getPeerID());
237         header.setTTL(1);
238         header.setMsgId(WirePipe.createMsgId());
239
240         XMLDocument asDoc = (XMLDocument) header.getDocument(MimeMediaType.XMLUTF8);
241         MessageElement elem = new TextDocumentMessageElement(WirePipeImpl.WIRE_HEADER_ELEMENT_NAME, asDoc, null);
242         Message msg = message.clone();
243
244         msg.replaceMessageElement(WirePipeImpl.WIRE_HEADER_ELEMENT_NAMESPACE, elem);
245         checkMessenger();
246         try {
247             if (destMessenger instanceof TcpMessenger) {
248                 ((TcpMessenger) destMessenger).sendMessageDirect(msg, null, null, true);
249             } else  if (!destMessenger.sendMessage(msg, null, null)) {
250                 throw new IOException("Pipe closed");
251             }
252         } catch (IOException io) {
253             checkMessenger();
254             if (!destMessenger.sendMessage(msg, null, null)) {
255                 throw new IOException("Pipe closed");
256             }
257         }
258         return true;
259     }
260 }