2 * Copyright (c) 2004-2007 Sun Microsystems, Inc. All rights reserved.
4 * The Sun Project JXTA(TM) Software License
6 * Redistribution and use in source and binary forms, with or without
7 * modification, are permitted provided that the following conditions are met:
9 * 1. Redistributions of source code must retain the above copyright notice,
10 * this list of conditions and the following disclaimer.
12 * 2. Redistributions in binary form must reproduce the above copyright notice,
13 * this list of conditions and the following disclaimer in the documentation
14 * and/or other materials provided with the distribution.
16 * 3. The end-user documentation included with the redistribution, if any, must
17 * include the following acknowledgment: "This product includes software
18 * developed by Sun Microsystems, Inc. for JXTA(TM) technology."
19 * Alternately, this acknowledgment may appear in the software itself, if
20 * and wherever such third-party acknowledgments normally appear.
22 * 4. The names "Sun", "Sun Microsystems, Inc.", "JXTA" and "Project JXTA" must
23 * not be used to endorse or promote products derived from this software
24 * without prior written permission. For written permission, please contact
25 * Project JXTA at http://www.jxta.org.
27 * 5. Products derived from this software may not be called "JXTA", nor may
28 * "JXTA" appear in their name, without prior written permission of Sun.
30 * THIS SOFTWARE IS PROVIDED ``AS IS'' AND ANY EXPRESSED OR IMPLIED WARRANTIES,
31 * INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND
32 * FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL SUN
33 * MICROSYSTEMS OR ITS CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
34 * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
35 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA,
36 * OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
37 * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
38 * NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE,
39 * EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
41 * JXTA is a registered trademark of Sun Microsystems, Inc. in the United
42 * States and other countries.
44 * Please see the license information page at :
45 * <http://www.jxta.org/project/www/license.html> for instructions on use of
46 * the license in source files.
48 * ====================================================================
50 * This software consists of voluntary contributions made by many individuals
51 * on behalf of Project JXTA. For more information on Project JXTA, please see
52 * http://www.jxta.org.
54 * This license is based on the BSD license adopted by the Apache Foundation.
56 package net.jxta.endpoint;
58 import net.jxta.logging.Logging;
59 import net.jxta.util.SimpleSelectable;
60 import net.jxta.util.SimpleSelectable.IdentityReference;
61 import net.jxta.util.SimpleSelector;
63 import java.io.IOException;
64 import java.util.ArrayList;
65 import java.util.Collection;
66 import java.util.HashMap;
68 import java.util.concurrent.Executor;
69 import java.util.logging.Level;
70 import java.util.logging.Logger;
73 * The legacy getMessenger asynchronous API never returns any object to the invoker until a messenger could actually be made,
74 * allowing the application to supply a listener to be invoked when the operation completes. The legacy Messenger API also
75 * provides a method to send messages that calls a listener to report the outcome of the operation. <p/>
77 * The model has changed, so that an asynchronous messenger is made unresolved and returned immediately to the invoker, which can
78 * then request opening or even just send a message to force the opening. Subsequently, the messenger can be used as a control
79 * block to monitor progress with {@link Messenger#register} and {@link Messenger#waitState}.<p/>
81 * Likewise, the outcome of sending a message is a property of that message. Messages can be selected to monitor property changes
82 * with {@link Message#register} and {@link net.jxta.endpoint.Message#getMessageProperty(Object)} (the outcome property key is
83 * <code>Messenger.class</code>).
85 * This class here provides the legacy listener model on top of the new model for applications that prefer listeners. This class
86 * is used internally to emulate the legacy listener behaviour, so that applications do not need to be adapted.<p/>
88 * Note: one instance of this class gets instantiated by each EndpointService interface. However, it does not start using any
89 * resources until it first gets used.<p/>
91 public class ListenerAdaptor implements Runnable {
93 // FIXME - jice 20040413: Eventhough it is not as critical as it used to be we should get rid of old, never resolved entries.
94 // Attempts are supposed to always fail or succeed rather soon. Here, we trust transports in that matter. Is it safe ?
99 private final static transient Logger LOG = Logger.getLogger(ListenerAdaptor.class.getName());
102 * The in progress messages.
104 private final Map<IdentityReference, ListenerContainer> inprogress = new HashMap<IdentityReference, ListenerContainer>(32);
107 * The thread that does the work.
109 private Thread bgThread = null;
112 * The selector that we use to watch messengers progress.
114 private final SimpleSelector selector = new SimpleSelector();
117 * Have we been shutdown?
119 private volatile boolean shutdown = false;
122 * The exceutor service.
124 private final Executor executor;
127 * The ThreadGroup in which this adaptor will run.
129 private final ThreadGroup threadGroup;
132 * Standard Constructor
134 * @param threadGroup The ThreadGroup in which this adaptor will run.
136 public ListenerAdaptor(ThreadGroup threadGroup) {
137 this(threadGroup, null);
140 * Creates a ListenerAdaptor with a threadpool for notification callback.
142 * @param threadGroup The ThreadGroup in which this adaptor will run.
143 * @param executor the excutor to use for notification callback
145 public ListenerAdaptor(ThreadGroup threadGroup, Executor executor) {
146 this.executor = executor;
147 this.threadGroup = threadGroup;
151 * Cannot be re-started. Do not call once shutdown.
153 private synchronized void init() {
156 if (bgThread != null) {
160 bgThread = new Thread(threadGroup, this, "Listener Adaptor");
161 bgThread.setDaemon(true);
165 public synchronized void shutdown() {
168 // Stop the thread if it was ever created.
169 Thread bg = bgThread;
176 * Stop watching a given selectable.
178 * @param ts the selectable
180 private void forgetSelectable(SimpleSelectable ts) {
181 // Either way, we're done with this one.
182 ts.unregister(selector);
184 synchronized (this) {
185 inprogress.remove(ts.getIdentityReference());
190 * Select the given message and invoke the given listener when the message sending is complete.
192 * @param listener The listener to invoke. If null the resolution will take place, but obviously no listener will be invoked.
193 * @param message The message being sent.
194 * @return true if the message was registered successfully or the listener is null. If true it is guaranteed that the listener
195 * will be invoked unless null. If false, it is guaranteed that the listener will not be invoked.
197 public boolean watchMessage(OutgoingMessageEventListener listener, Message message) {
198 synchronized (this) {
203 if (listener == null) {
204 // We're done, then. The invoker does not really care.
211 // First we must ensure that if the state changes we'll get to handle it.
212 MessageListenerContainer allListeners = (MessageListenerContainer) inprogress.get(message.getIdentityReference());
214 if (allListeners == null) {
215 allListeners = new MessageListenerContainer();
216 inprogress.put(message.getIdentityReference(), allListeners);
218 allListeners.add(listener);
221 // When we do that, the selector gets notified. Therefore always check the initial state automatically. If the
222 // selectable is already done with, the listener will be called by the selector's handler.
223 message.register(selector);
229 * Select the given messenger and invoke the given listener when the messenger is resolved.
231 * @param listener The listener to invoke. If null the resolution will take place, but obviously no listener will be invoked.
232 * @param messenger The messenger being resolved.
233 * @return true if the messenger was registered successfully or the listener is null. If true it is guaranteed that the listener
234 * will be invoked unless null. If false, it is guaranteed that the listener will not be invoked.
236 public boolean watchMessenger(MessengerEventListener listener, Messenger messenger) {
237 synchronized (this) {
243 if (listener == null) {
244 // We're done, then. The invoker does not really care.
251 // First we must ensure that if the state changes we'll get to handle it.
252 MessengerListenerContainer allListeners = (MessengerListenerContainer) inprogress.get(messenger.getIdentityReference());
254 if (allListeners == null) {
255 // Use ArrayList. The code is optimized for that.
256 allListeners = new MessengerListenerContainer();
257 inprogress.put(messenger.getIdentityReference(), allListeners);
259 allListeners.add(listener);
262 // When we do that, the selector get notified. Therefore we will always check the initial state automatically. If the
263 // selectable is already done with, the listener will be called by the selector's handler.
264 messenger.register(selector);
270 * Any sort of listener type.
272 static abstract class ListenerContainer<S extends SimpleSelectable, L extends java.util.EventListener> extends ArrayList<L> {
274 public ListenerContainer() {
278 protected abstract void giveUp(S what, Throwable how);
280 protected abstract void process(S what);
287 @SuppressWarnings("serial")
288 class MessageListenerContainer extends ListenerContainer<Message, OutgoingMessageEventListener> {
290 private void messageDone(Message message, OutgoingMessageEvent event) {
291 // Note: synchronization is externally provided. When this method is invoked, this
292 // object has already been removed from the map, so the list of listener cannot change.
294 if (event == OutgoingMessageEvent.SUCCESS) {
295 // Replace it with a msg-specific one.
296 event = new OutgoingMessageEvent(message, null);
298 for (OutgoingMessageEventListener eachListener : this) {
300 eachListener.messageSendSucceeded(event);
301 } catch (Throwable any) {
302 if (Logging.SHOW_WARNING && LOG.isLoggable(Level.WARNING)) {
303 LOG.log(Level.WARNING, "Uncaught throwable from listener", any);
308 if (event == OutgoingMessageEvent.OVERFLOW) {
309 // Replace it with a msg-specific one.
310 event = new OutgoingMessageEvent(message, null);
313 for (OutgoingMessageEventListener eachListener : this) {
315 eachListener.messageSendFailed(event);
316 } catch (Throwable any) {
317 if (Logging.SHOW_WARNING && LOG.isLoggable(Level.WARNING)) {
318 LOG.log(Level.WARNING, "Uncaught throwable in listener", any);
329 protected void process(Message message) {
330 OutgoingMessageEvent event = (OutgoingMessageEvent) message.getMessageProperty(Messenger.class);
336 // Remove this container-selectable binding
337 forgetSelectable(message);
339 // Invoke app listeners
340 messageDone(message, event);
347 protected void giveUp(Message m, Throwable how) {
348 messageDone(m, new OutgoingMessageEvent(m, how));
355 @SuppressWarnings("serial")
356 class MessengerListenerContainer extends ListenerContainer<Messenger, MessengerEventListener> {
358 private void messengerDone(Messenger messenger) {
360 // Note: synchronization is externally provided. When this method is invoked, this
361 // object has already been removed from the map, so the list of listener cannot change.
363 MessengerEvent event = new MessengerEvent(ListenerAdaptor.this, messenger, null);
365 for (MessengerEventListener eachListener : this) {
367 eachListener.messengerReady(event);
368 } catch (Throwable any) {
369 if (Logging.SHOW_WARNING && LOG.isLoggable(Level.WARNING)) {
370 LOG.log(Level.WARNING, "Uncaught throwable in listener", any);
380 protected void process(Messenger messenger) {
381 if ((messenger.getState() & (Messenger.RESOLVED | Messenger.TERMINAL)) == 0) {
385 // Remove this container-selectable binding
386 forgetSelectable(messenger);
388 if ((messenger.getState() & Messenger.USABLE) == 0) {
392 // Invoke app listeners
393 messengerDone(messenger);
400 protected void giveUp(Messenger m, Throwable how) {
412 Collection<SimpleSelectable> changed = selector.select();
413 for (SimpleSelectable simpleSelectable : changed) {
414 ListenerContainer listeners;
415 synchronized (this) {
416 listeners = inprogress.get(simpleSelectable.getIdentityReference());
418 if (listeners == null) {
419 simpleSelectable.unregister(selector);
422 if (executor == null) {
423 listeners.process(simpleSelectable);
425 executor.execute(new ListenerProcessor(listeners, simpleSelectable));
428 } catch (InterruptedException ie) {
429 Thread.interrupted();
432 } catch (Throwable anyOther) {
433 if (Logging.SHOW_SEVERE && LOG.isLoggable(Level.SEVERE)) {
434 LOG.log(Level.SEVERE, "Uncaught Throwable in background thread", anyOther);
437 // There won't be any other thread. This thing is dead if that
438 // happens. And it really shouldn't.
439 synchronized (this) {
444 // It's only us now. Stopped is true.
445 IOException failed = new IOException("Endpoint interface terminated");
446 for (Map.Entry<IdentityReference, ListenerContainer> entry : inprogress.entrySet()) {
447 SimpleSelectable simpleSelectable = entry.getKey().getObject();
448 ListenerContainer listeners = entry.getValue();
449 simpleSelectable.unregister(selector);
451 if (listeners != null) {
452 listeners.giveUp(simpleSelectable, failed);
456 } catch (Throwable anyOther) {
457 if (Logging.SHOW_SEVERE && LOG.isLoggable(Level.SEVERE)) {
458 LOG.log(Level.SEVERE, "Uncaught Throwable while shutting down background thread", anyOther);
466 * A small class for processing individual messages.
468 private class ListenerProcessor implements Runnable {
470 private SimpleSelectable simpleSelectable;
471 private ListenerContainer listeners;
472 ListenerProcessor(ListenerContainer listeners, SimpleSelectable simpleSelectable) {
473 this.listeners = listeners;
474 this.simpleSelectable = simpleSelectable;
478 listeners.process(simpleSelectable);