]> sjero.net Git - linphone/blob - p2pproxy/dependencies-src/jxse-src-2.5/api/src/net/jxta/endpoint/ListenerAdaptor.java
remove mediastreamer2 and add it as a submodule instead.
[linphone] / p2pproxy / dependencies-src / jxse-src-2.5 / api / src / net / jxta / endpoint / ListenerAdaptor.java
1 /*
2  * Copyright (c) 2004-2007 Sun Microsystems, Inc.  All rights reserved.
3  *  
4  *  The Sun Project JXTA(TM) Software License
5  *  
6  *  Redistribution and use in source and binary forms, with or without 
7  *  modification, are permitted provided that the following conditions are met:
8  *  
9  *  1. Redistributions of source code must retain the above copyright notice,
10  *     this list of conditions and the following disclaimer.
11  *  
12  *  2. Redistributions in binary form must reproduce the above copyright notice, 
13  *     this list of conditions and the following disclaimer in the documentation 
14  *     and/or other materials provided with the distribution.
15  *  
16  *  3. The end-user documentation included with the redistribution, if any, must 
17  *     include the following acknowledgment: "This product includes software 
18  *     developed by Sun Microsystems, Inc. for JXTA(TM) technology." 
19  *     Alternately, this acknowledgment may appear in the software itself, if 
20  *     and wherever such third-party acknowledgments normally appear.
21  *  
22  *  4. The names "Sun", "Sun Microsystems, Inc.", "JXTA" and "Project JXTA" must 
23  *     not be used to endorse or promote products derived from this software 
24  *     without prior written permission. For written permission, please contact 
25  *     Project JXTA at http://www.jxta.org.
26  *  
27  *  5. Products derived from this software may not be called "JXTA", nor may 
28  *     "JXTA" appear in their name, without prior written permission of Sun.
29  *  
30  *  THIS SOFTWARE IS PROVIDED ``AS IS'' AND ANY EXPRESSED OR IMPLIED WARRANTIES,
31  *  INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND 
32  *  FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL SUN 
33  *  MICROSYSTEMS OR ITS CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, 
34  *  INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT 
35  *  LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, 
36  *  OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF 
37  *  LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING 
38  *  NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, 
39  *  EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
40  *  
41  *  JXTA is a registered trademark of Sun Microsystems, Inc. in the United 
42  *  States and other countries.
43  *  
44  *  Please see the license information page at :
45  *  <http://www.jxta.org/project/www/license.html> for instructions on use of 
46  *  the license in source files.
47  *  
48  *  ====================================================================
49  *  
50  *  This software consists of voluntary contributions made by many individuals 
51  *  on behalf of Project JXTA. For more information on Project JXTA, please see 
52  *  http://www.jxta.org.
53  *  
54  *  This license is based on the BSD license adopted by the Apache Foundation. 
55  */
56 package net.jxta.endpoint;
57
58 import net.jxta.logging.Logging;
59 import net.jxta.util.SimpleSelectable;
60 import net.jxta.util.SimpleSelectable.IdentityReference;
61 import net.jxta.util.SimpleSelector;
62
63 import java.io.IOException;
64 import java.util.ArrayList;
65 import java.util.Collection;
66 import java.util.HashMap;
67 import java.util.Map;
68 import java.util.concurrent.Executor;
69 import java.util.logging.Level;
70 import java.util.logging.Logger;
71
72 /**
73  * The legacy getMessenger asynchronous API never returns any object to the invoker until a messenger could actually be made,
74  * allowing the application to supply a listener to be invoked when the operation completes. The legacy Messenger API also
75  * provides a method to send messages that calls a listener to report the outcome of the operation.  <p/>
76  * <p/>
77  * The model has changed, so that an asynchronous messenger is made unresolved and returned immediately to the invoker, which can
78  * then request opening or even just send a message to force the opening. Subsequently, the messenger can be used as a control
79  * block to monitor progress with {@link Messenger#register} and {@link Messenger#waitState}.<p/>
80  * <p/>
81  * Likewise, the outcome of sending a message is a property of that message. Messages can be selected to monitor property changes
82  * with {@link Message#register} and {@link net.jxta.endpoint.Message#getMessageProperty(Object)} (the outcome property key is
83  * <code>Messenger.class</code>).
84  * <p/>
85  * This class here provides the legacy listener model on top of the new model for applications that prefer listeners. This class
86  * is used internally to emulate the legacy listener behaviour, so that applications do not need to be adapted.<p/>
87  * <p/>
88  * Note: one instance of this class gets instantiated by each EndpointService interface. However, it does not start using any
89  * resources until it first gets used.<p/>
90  */
91 public class ListenerAdaptor implements Runnable {
92
93     // FIXME - jice 20040413: Eventhough it is not as critical as it used to be we should get rid of old, never resolved entries.
94     // Attempts are supposed to always fail or succeed rather soon. Here, we trust transports in that matter. Is it safe ?
95
96     /**
97      * Logger
98      */
99     private final static transient Logger LOG = Logger.getLogger(ListenerAdaptor.class.getName());
100
101     /**
102      * The in progress messages.
103      */
104     private final Map<IdentityReference, ListenerContainer> inprogress = new HashMap<IdentityReference, ListenerContainer>(32);
105
106     /**
107      * The thread that does the work.
108      */
109     private Thread bgThread = null;
110
111     /**
112      * The selector that we use to watch messengers progress.
113      */
114     private final SimpleSelector selector = new SimpleSelector();
115
116     /**
117      * Have we been shutdown?
118      */
119     private volatile boolean shutdown = false;
120
121     /**
122      * The exceutor service.
123      */
124     private final Executor executor;
125
126     /**
127      * The ThreadGroup in which this adaptor will run.
128      */
129     private final ThreadGroup threadGroup;
130
131     /**
132      * Standard Constructor
133      *
134      * @param threadGroup The ThreadGroup in which this adaptor will run.
135      */
136     public ListenerAdaptor(ThreadGroup threadGroup) {
137         this(threadGroup, null);
138     }
139     /**
140      * Creates a ListenerAdaptor with a threadpool for notification callback.
141      *
142      * @param threadGroup The ThreadGroup in which this adaptor will run.
143      * @param executor the excutor to use for notification callback
144      */
145     public ListenerAdaptor(ThreadGroup threadGroup, Executor executor) {
146         this.executor = executor;
147         this.threadGroup = threadGroup;
148     }
149
150     /**
151      * Cannot be re-started. Do not call once shutdown.
152      */
153     private synchronized void init() {
154         assert !shutdown;
155
156         if (bgThread != null) {
157             return;
158         }
159
160         bgThread = new Thread(threadGroup, this, "Listener Adaptor");
161         bgThread.setDaemon(true);
162         bgThread.start();
163     }
164
165     public synchronized void shutdown() {
166         shutdown = true;
167
168         // Stop the thread if it was ever created.
169         Thread bg = bgThread;
170         if (bg != null) {
171             bg.interrupt();
172         }
173     }
174
175     /**
176      * Stop watching a given selectable.
177      *
178      * @param ts the selectable
179      */
180     private void forgetSelectable(SimpleSelectable ts) {
181         // Either way, we're done with this one.
182         ts.unregister(selector);
183
184         synchronized (this) {
185             inprogress.remove(ts.getIdentityReference());
186         }
187     }
188
189     /**
190      * Select the given message and invoke the given listener when the message sending is complete.
191      *
192      * @param listener The listener to invoke. If null the resolution will take place, but obviously no listener will be invoked.
193      * @param message  The message being sent.
194      * @return true if the message was registered successfully or the listener is null. If true it is guaranteed that the listener
195      *         will be invoked unless null. If false, it is guaranteed that the listener will not be invoked.
196      */
197     public boolean watchMessage(OutgoingMessageEventListener listener, Message message) {
198         synchronized (this) {
199             if (shutdown) {
200                 return false;
201             }
202
203             if (listener == null) {
204                 // We're done, then. The invoker does not really care.
205                 return true;
206             }
207
208             // Init if needed.
209             init();
210
211             // First we must ensure that if the state changes we'll get to handle it.
212             MessageListenerContainer allListeners = (MessageListenerContainer) inprogress.get(message.getIdentityReference());
213
214             if (allListeners == null) {
215                 allListeners = new MessageListenerContainer();
216                 inprogress.put(message.getIdentityReference(), allListeners);
217             }
218             allListeners.add(listener);
219         }
220
221         // When we do that, the selector gets notified. Therefore always check the initial state automatically. If the
222         // selectable is already done with, the listener will be called by the selector's handler.
223         message.register(selector);
224
225         return true;
226     }
227
228     /**
229      * Select the given messenger and invoke the given listener when the messenger is resolved.
230      *
231      * @param listener  The listener to invoke. If null the resolution will take place, but obviously no listener will be invoked.
232      * @param messenger The messenger being resolved.
233      * @return true if the messenger was registered successfully or the listener is null. If true it is guaranteed that the listener
234      *         will be invoked unless null. If false, it is guaranteed that the listener will not be invoked.
235      */
236     public boolean watchMessenger(MessengerEventListener listener, Messenger messenger) {
237         synchronized (this) {
238
239             if (shutdown) {
240                 return false;
241             }
242
243             if (listener == null) {
244                 // We're done, then. The invoker does not really care.
245                 return true;
246             }
247
248             // Init if needed.
249             init();
250
251             // First we must ensure that if the state changes we'll get to handle it.
252             MessengerListenerContainer allListeners = (MessengerListenerContainer) inprogress.get(messenger.getIdentityReference());
253
254             if (allListeners == null) {
255                 // Use ArrayList. The code is optimized for that.
256                 allListeners = new MessengerListenerContainer();
257                 inprogress.put(messenger.getIdentityReference(), allListeners);
258             }
259             allListeners.add(listener);
260         }
261
262         // When we do that, the selector get notified. Therefore we will always check the initial state automatically. If the
263         // selectable is already done with, the listener will be called by the selector's handler.
264         messenger.register(selector);
265
266         return true;
267     }
268
269     /*
270      * Any sort of listener type.
271      */
272     static abstract class ListenerContainer<S extends SimpleSelectable, L extends java.util.EventListener> extends ArrayList<L> {
273
274         public ListenerContainer() {
275             super(1);
276         }
277
278         protected abstract void giveUp(S what, Throwable how);
279
280         protected abstract void process(S what);
281     }
282
283
284     /**
285      * For messages
286      */
287     @SuppressWarnings("serial")
288     class MessageListenerContainer extends ListenerContainer<Message, OutgoingMessageEventListener> {
289
290         private void messageDone(Message message, OutgoingMessageEvent event) {
291             // Note: synchronization is externally provided. When this method is invoked, this
292             // object has already been removed from the map, so the list of listener cannot change.
293
294             if (event == OutgoingMessageEvent.SUCCESS) {
295                 // Replace it with a msg-specific one.
296                 event = new OutgoingMessageEvent(message, null);
297
298                 for (OutgoingMessageEventListener eachListener : this) {
299                     try {
300                         eachListener.messageSendSucceeded(event);
301                     } catch (Throwable any) {
302                         if (Logging.SHOW_WARNING && LOG.isLoggable(Level.WARNING)) {
303                             LOG.log(Level.WARNING, "Uncaught throwable from listener", any);
304                         }
305                     }
306                 }
307             } else {
308                 if (event == OutgoingMessageEvent.OVERFLOW) {
309                     // Replace it with a msg-specific one.
310                     event = new OutgoingMessageEvent(message, null);
311                 }
312
313                 for (OutgoingMessageEventListener eachListener : this) {
314                     try {
315                         eachListener.messageSendFailed(event);
316                     } catch (Throwable any) {
317                         if (Logging.SHOW_WARNING && LOG.isLoggable(Level.WARNING)) {
318                             LOG.log(Level.WARNING, "Uncaught throwable in listener", any);
319                         }
320                     }
321                 }
322             }
323         }
324
325         /**
326          * {@inheritDoc}
327          */
328         @Override
329         protected void process(Message message) {
330             OutgoingMessageEvent event = (OutgoingMessageEvent) message.getMessageProperty(Messenger.class);
331
332             if (event == null) {
333                 return;
334             }
335
336             // Remove this container-selectable binding
337             forgetSelectable(message);
338
339             // Invoke app listeners
340             messageDone(message, event);
341         }
342
343         /**
344          * {@inheritDoc}
345          */
346         @Override
347         protected void giveUp(Message m, Throwable how) {
348             messageDone(m, new OutgoingMessageEvent(m, how));
349         }
350     }
351
352     /**
353      * For messengers
354      */
355     @SuppressWarnings("serial")
356     class MessengerListenerContainer extends ListenerContainer<Messenger, MessengerEventListener> {
357
358         private void messengerDone(Messenger messenger) {
359
360             // Note: synchronization is externally provided. When this method is invoked, this
361             // object has already been removed from the map, so the list of listener cannot change.
362
363             MessengerEvent event = new MessengerEvent(ListenerAdaptor.this, messenger, null);
364
365             for (MessengerEventListener eachListener : this) {
366                 try {
367                     eachListener.messengerReady(event);
368                 } catch (Throwable any) {
369                     if (Logging.SHOW_WARNING && LOG.isLoggable(Level.WARNING)) {
370                         LOG.log(Level.WARNING, "Uncaught throwable in listener", any);
371                     }
372                 }
373             }
374         }
375
376         /**
377          * {@inheritDoc}
378          */
379         @Override
380         protected void process(Messenger messenger) {
381             if ((messenger.getState() & (Messenger.RESOLVED | Messenger.TERMINAL)) == 0) {
382                 return;
383             }
384
385             // Remove this container-selectable binding
386             forgetSelectable(messenger);
387
388             if ((messenger.getState() & Messenger.USABLE) == 0) {
389                 messenger = null;
390             }
391
392             // Invoke app listeners
393             messengerDone(messenger);
394         }
395
396         /**
397          * {@inheritDoc}
398          */
399         @Override
400         protected void giveUp(Messenger m, Throwable how) {
401             messengerDone(null);
402         }
403     }
404
405     /**
406      * {@inheritDoc}
407      */
408     public void run() {
409         try {
410             while (!shutdown) {
411                 try {
412                     Collection<SimpleSelectable> changed = selector.select();
413                     for (SimpleSelectable simpleSelectable : changed) {
414                         ListenerContainer listeners;
415                         synchronized (this) {
416                             listeners = inprogress.get(simpleSelectable.getIdentityReference());
417                         }
418                         if (listeners == null) {
419                             simpleSelectable.unregister(selector);
420                             continue;
421                         }
422                         if (executor == null) {
423                             listeners.process(simpleSelectable);
424                         } else {
425                             executor.execute(new ListenerProcessor(listeners, simpleSelectable));
426                         }
427                     }
428                 } catch (InterruptedException ie) {
429                     Thread.interrupted();
430                 }
431             }
432         } catch (Throwable anyOther) {
433             if (Logging.SHOW_SEVERE && LOG.isLoggable(Level.SEVERE)) {
434                 LOG.log(Level.SEVERE, "Uncaught Throwable in background thread", anyOther);
435             }
436
437             // There won't be any other thread. This thing is dead if that
438             // happens. And it really shouldn't.
439             synchronized (this) {
440                 shutdown = true;
441             }
442         } finally {
443             try {
444                 // It's only us now. Stopped is true.
445                 IOException failed = new IOException("Endpoint interface terminated");
446                 for (Map.Entry<IdentityReference, ListenerContainer> entry : inprogress.entrySet()) {
447                     SimpleSelectable simpleSelectable = entry.getKey().getObject();
448                     ListenerContainer listeners = entry.getValue();
449                     simpleSelectable.unregister(selector);
450
451                     if (listeners != null) {
452                         listeners.giveUp(simpleSelectable, failed);
453                     }
454                 }
455                 inprogress.clear();
456             } catch (Throwable anyOther) {
457                 if (Logging.SHOW_SEVERE && LOG.isLoggable(Level.SEVERE)) {
458                     LOG.log(Level.SEVERE, "Uncaught Throwable while shutting down background thread", anyOther);
459                 }
460             }
461             bgThread = null;
462         }
463     }
464
465     /**
466      * A small class for processing individual messages.
467      */
468     private class ListenerProcessor implements Runnable {
469
470         private SimpleSelectable simpleSelectable;
471         private ListenerContainer listeners;
472         ListenerProcessor(ListenerContainer listeners, SimpleSelectable simpleSelectable) {
473             this.listeners = listeners;
474             this.simpleSelectable = simpleSelectable;
475         }
476
477         public void run() {
478             listeners.process(simpleSelectable);
479         }
480     }
481 }