]> sjero.net Git - linphone/blob - p2pproxy/dependencies-src/jxse-src-2.5/impl/src/net/jxta/impl/pipe/InputPipeImpl.java
remove mediastreamer2 and add it as a submodule instead.
[linphone] / p2pproxy / dependencies-src / jxse-src-2.5 / impl / src / net / jxta / impl / pipe / InputPipeImpl.java
1 /*
2  * Copyright (c) 2001-2007 Sun Microsystems, Inc.  All rights reserved.
3  *  
4  *  The Sun Project JXTA(TM) Software License
5  *  
6  *  Redistribution and use in source and binary forms, with or without 
7  *  modification, are permitted provided that the following conditions are met:
8  *  
9  *  1. Redistributions of source code must retain the above copyright notice,
10  *     this list of conditions and the following disclaimer.
11  *  
12  *  2. Redistributions in binary form must reproduce the above copyright notice, 
13  *     this list of conditions and the following disclaimer in the documentation 
14  *     and/or other materials provided with the distribution.
15  *  
16  *  3. The end-user documentation included with the redistribution, if any, must 
17  *     include the following acknowledgment: "This product includes software 
18  *     developed by Sun Microsystems, Inc. for JXTA(TM) technology." 
19  *     Alternately, this acknowledgment may appear in the software itself, if 
20  *     and wherever such third-party acknowledgments normally appear.
21  *  
22  *  4. The names "Sun", "Sun Microsystems, Inc.", "JXTA" and "Project JXTA" must 
23  *     not be used to endorse or promote products derived from this software 
24  *     without prior written permission. For written permission, please contact 
25  *     Project JXTA at http://www.jxta.org.
26  *  
27  *  5. Products derived from this software may not be called "JXTA", nor may 
28  *     "JXTA" appear in their name, without prior written permission of Sun.
29  *  
30  *  THIS SOFTWARE IS PROVIDED ``AS IS'' AND ANY EXPRESSED OR IMPLIED WARRANTIES,
31  *  INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND 
32  *  FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL SUN 
33  *  MICROSYSTEMS OR ITS CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, 
34  *  INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT 
35  *  LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, 
36  *  OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF 
37  *  LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING 
38  *  NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, 
39  *  EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
40  *  
41  *  JXTA is a registered trademark of Sun Microsystems, Inc. in the United 
42  *  States and other countries.
43  *  
44  *  Please see the license information page at :
45  *  <http://www.jxta.org/project/www/license.html> for instructions on use of 
46  *  the license in source files.
47  *  
48  *  ====================================================================
49  *  
50  *  This software consists of voluntary contributions made by many individuals 
51  *  on behalf of Project JXTA. For more information on Project JXTA, please see 
52  *  http://www.jxta.org.
53  *  
54  *  This license is based on the BSD license adopted by the Apache Foundation. 
55  */
56 package net.jxta.impl.pipe;
57
58 import net.jxta.endpoint.EndpointAddress;
59 import net.jxta.endpoint.EndpointListener;
60 import net.jxta.endpoint.Message;
61 import net.jxta.id.ID;
62 import net.jxta.impl.util.TimeUtils;
63 import net.jxta.impl.util.UnbiasedQueue;
64 import net.jxta.logging.Logging;
65 import net.jxta.pipe.InputPipe;
66 import net.jxta.pipe.PipeID;
67 import net.jxta.pipe.PipeMsgEvent;
68 import net.jxta.pipe.PipeMsgListener;
69 import net.jxta.protocol.PipeAdvertisement;
70
71 import java.io.IOException;
72 import java.util.logging.Level;
73 import java.util.logging.Logger;
74
75 /**
76  * Implements the {@link net.jxta.pipe.InputPipe} interface by listening on the
77  * endpoint for messages to service "PipeService" and a param of the Pipe ID.
78  */
79 class InputPipeImpl implements EndpointListener, InputPipe {
80
81     /**
82      * logger
83      */
84     private final static Logger LOG = Logger.getLogger(InputPipeImpl.class.getName());
85
86     protected final static int QUEUESIZE = 100;
87
88     protected PipeRegistrar registrar;
89
90     protected final PipeAdvertisement pipeAdv;
91     protected final ID pipeID;
92
93     protected volatile boolean closed = false;
94
95     protected PipeMsgListener listener;
96     protected final UnbiasedQueue queue;
97
98     /**
99      * Constructor for the InputPipeImpl object
100      *
101      * @param r        pipe resolver
102      * @param adv      pipe advertisement
103      * @param listener listener to receive messages
104      * @throws IOException if an io error occurs
105      */
106     InputPipeImpl(PipeRegistrar r, PipeAdvertisement adv, PipeMsgListener listener) throws IOException {
107         registrar = r;
108         this.pipeAdv = adv;
109         this.listener = listener;
110
111         pipeID = adv.getPipeID();
112
113         if (Logging.SHOW_INFO && LOG.isLoggable(Level.INFO)) {
114             LOG.info(
115                     "Creating InputPipe for " + pipeID + " of type " + adv.getType() + " with "
116                     + ((null != listener) ? "listener" : "queue"));
117         }
118
119         // queue based inputpipe?
120         if (listener == null) {
121             queue = UnbiasedQueue.synchronizedQueue(new UnbiasedQueue(QUEUESIZE, true));
122         } else {
123             queue = null;
124         }
125
126         if (!registrar.register(this)) {
127             throw new IOException("Could not register input pipe (already registered) for " + pipeID);
128         }
129     }
130
131     /**
132      * {@inheritDoc}
133      * <p/>
134      * Closes the pipe.
135      */
136     @Override
137     protected synchronized void finalize() throws Throwable {
138         if (!closed) {
139             if (Logging.SHOW_WARNING && LOG.isLoggable(Level.WARNING)) {
140                 LOG.warning("Pipe is being finalized without being previously closed. This is likely a bug.");
141             }
142         }
143         close();
144         super.finalize();
145     }
146
147     /**
148      * {@inheritDoc}
149      */
150     public Message waitForMessage() throws InterruptedException {
151         return poll(0);
152     }
153
154     /**
155      * {@inheritDoc}
156      */
157     public Message poll(int timeout) throws InterruptedException {
158         if (listener == null) {
159             return (Message) queue.pop(timeout);
160         } else {
161             if (Logging.SHOW_WARNING && LOG.isLoggable(Level.WARNING)) {
162                 LOG.warning("poll() has no effect in listener mode.");
163             }
164             return null;
165         }
166     }
167
168     /**
169      * {@inheritDoc}
170      */
171     public synchronized void close() {
172         if (closed) {
173             return;
174         }
175         closed = true;
176
177         // Close the queue
178         if (null == listener) {
179             queue.close();
180         }
181
182         listener = null;
183         // Remove myself from the pipe registrar.
184         if (!registrar.forget(this)) {
185             if (Logging.SHOW_WARNING && LOG.isLoggable(Level.WARNING)) {
186                 LOG.warning("close() : pipe was not registered with registrar.");
187             }
188         }
189         registrar = null;
190
191         if (Logging.SHOW_INFO && LOG.isLoggable(Level.INFO)) {
192             LOG.info("Closed " + pipeID);
193         }
194     }
195
196     /**
197      * {@inheritDoc}
198      */
199     public void processIncomingMessage(Message msg, EndpointAddress srcAddr, EndpointAddress dstAddr) {
200         // if we are closed, ignore any additional messages
201         if (closed) {
202             return;
203         }
204
205         // XXX: header check, security and such should be done here
206         // before pushing the message onto the queue.
207         if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
208             LOG.fine("Received " + msg + " from " + srcAddr + " for " + pipeID);
209         }
210         // determine where demux the msg, to listener, or onto the queue
211         if (null == queue) {
212             PipeMsgListener temp = listener;
213             if (null == temp) {
214                 return;
215             }
216
217             PipeMsgEvent event = new PipeMsgEvent(this, msg, (PipeID) pipeID);
218             try {
219                 temp.pipeMsgEvent(event);
220             } catch (Throwable ignored) {
221                 if (Logging.SHOW_SEVERE && LOG.isLoggable(Level.SEVERE)) {
222                     LOG.log(Level.SEVERE, "Uncaught Throwable in listener for : " + pipeID + "(" + temp.getClass().getName() + ")", ignored);
223                 }
224             }
225         } else {
226             boolean pushed = false;
227             while (!pushed && !queue.isClosed()) {
228                 try {
229                     pushed = queue.push(msg, TimeUtils.ASECOND);
230                 } catch (InterruptedException woken) {
231                     Thread.interrupted();
232                 }
233             }
234
235             if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
236                 synchronized (this) {
237                     LOG.fine("Queued " + msg + " for " + pipeID + "\n\tqueue closed : " + queue.isClosed() + "\tnumber in queue : "
238                             + queue.getCurrentInQueue() + "\tnumber queued : " + queue.getNumEnqueued() + "\tnumber dequeued : "
239                             + queue.getNumDequeued());
240                 }
241             }
242         }
243     }
244
245     /**
246      * Gets the pipe type
247      *
248      * @return The type
249      */
250     public String getType() {
251         return pipeAdv.getType();
252     }
253
254     /**
255      * Gets the pipe id
256      *
257      * @return The type
258      */
259     public ID getPipeID() {
260         return pipeID;
261     }
262
263     /**
264      * Gets the pipe name
265      *
266      * @return The name
267      */
268     public String getName() {
269         return pipeAdv.getName();
270     }
271
272     /**
273      * Gets the pipe advertisement
274      *
275      * @return The advertisement
276      */
277     public PipeAdvertisement getAdvertisement() {
278         return pipeAdv;
279     }
280 }