]> sjero.net Git - linphone/blob - p2pproxy/dependencies-src/jxse-src-2.5/impl/src/net/jxta/impl/endpoint/BlockingMessenger.java
remove mediastreamer2 and add it as a submodule instead.
[linphone] / p2pproxy / dependencies-src / jxse-src-2.5 / impl / src / net / jxta / impl / endpoint / BlockingMessenger.java
1 /*
2  * Copyright (c) 2004-2007 Sun Microsystems, Inc.  All rights reserved.
3  *  
4  *  The Sun Project JXTA(TM) Software License
5  *  
6  *  Redistribution and use in source and binary forms, with or without 
7  *  modification, are permitted provided that the following conditions are met:
8  *  
9  *  1. Redistributions of source code must retain the above copyright notice,
10  *     this list of conditions and the following disclaimer.
11  *  
12  *  2. Redistributions in binary form must reproduce the above copyright notice, 
13  *     this list of conditions and the following disclaimer in the documentation 
14  *     and/or other materials provided with the distribution.
15  *  
16  *  3. The end-user documentation included with the redistribution, if any, must 
17  *     include the following acknowledgment: "This product includes software 
18  *     developed by Sun Microsystems, Inc. for JXTA(TM) technology." 
19  *     Alternately, this acknowledgment may appear in the software itself, if 
20  *     and wherever such third-party acknowledgments normally appear.
21  *  
22  *  4. The names "Sun", "Sun Microsystems, Inc.", "JXTA" and "Project JXTA" must 
23  *     not be used to endorse or promote products derived from this software 
24  *     without prior written permission. For written permission, please contact 
25  *     Project JXTA at http://www.jxta.org.
26  *  
27  *  5. Products derived from this software may not be called "JXTA", nor may 
28  *     "JXTA" appear in their name, without prior written permission of Sun.
29  *  
30  *  THIS SOFTWARE IS PROVIDED ``AS IS'' AND ANY EXPRESSED OR IMPLIED WARRANTIES,
31  *  INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND 
32  *  FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL SUN 
33  *  MICROSYSTEMS OR ITS CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, 
34  *  INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT 
35  *  LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, 
36  *  OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF 
37  *  LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING 
38  *  NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, 
39  *  EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
40  *  
41  *  JXTA is a registered trademark of Sun Microsystems, Inc. in the United 
42  *  States and other countries.
43  *  
44  *  Please see the license information page at :
45  *  <http://www.jxta.org/project/www/license.html> for instructions on use of 
46  *  the license in source files.
47  *  
48  *  ====================================================================
49  *  
50  *  This software consists of voluntary contributions made by many individuals 
51  *  on behalf of Project JXTA. For more information on Project JXTA, please see 
52  *  http://www.jxta.org.
53  *  
54  *  This license is based on the BSD license adopted by the Apache Foundation. 
55  */
56 package net.jxta.impl.endpoint;
57
58 import java.util.Timer;
59 import java.util.TimerTask;
60
61 import java.io.IOException;
62 import java.io.InterruptedIOException;
63
64 import java.util.logging.Level;
65
66 import net.jxta.logging.Logging;
67
68 import java.util.logging.Logger;
69
70 import net.jxta.endpoint.AbstractMessenger;
71 import net.jxta.endpoint.ChannelMessenger;
72 import net.jxta.endpoint.EndpointAddress;
73 import net.jxta.endpoint.Message;
74 import net.jxta.endpoint.Messenger;
75 import net.jxta.endpoint.MessengerState;
76 import net.jxta.endpoint.OutgoingMessageEvent;
77 import net.jxta.peergroup.PeerGroupID;
78 import net.jxta.util.SimpleSelectable;
79
80 import net.jxta.impl.util.TimeUtils;
81
82 /**
83  * This class is a near-drop-in replacement for the previous BlockingMessenger class.
84  * To subclassers (that is, currently, transports) the only difference is that some
85  * overloaded methods have a different name (class hierarchy reasons made it impossible
86  * to preserve the names without forcing an API change for applications).
87  *
88  * The other difference which is not API visible, is that it implements the
89  * standard MessengerState behaviour and semantics required by the changes in the endpoint framework.
90  *
91  * This the only base messenger class meant to be extended by outside code that is in the impl tree. The
92  * reason being that what it replaces was there already and that new code should not become dependant upon it.
93  */
94 public abstract class BlockingMessenger extends AbstractMessenger {
95
96     /**
97      * Logger
98      */
99     private final static transient Logger LOG = Logger.getLogger(BlockingMessenger.class.getName());
100
101     /**
102      * The self destruct timer.
103      * <p/>
104      * When a messenger has become idle, it is closed. As a side effect, it
105      * makes the owning canonical messenger, if any, subject to removal if it is
106      * otherwise unreferenced.
107      */
108     private final static transient Timer timer = new Timer("BlockingMessenger self destruct timer", true);
109
110     /*
111      * Actions that we defer to after returning from event methods. In other 
112      * words, they cannot be done with the lock held, or they require calling 
113      * more event methods.  Because this messenger can take only one message at 
114      * a time (saturated while sending), actions do not cascade much. Start can 
115      * lead to connect if the sending fails, but, because we always fail to
116      * connect, connect will not lead to start. As a result we can get away with 
117      * performing deferred actions recursively. That simplifies the code.
118      */
119     private enum DeferredAction {
120         /**
121          * No deferred action.
122          */
123         ACTION_NONE,
124
125         /**
126          * Must send message.
127          */
128         ACTION_SEND,
129
130         /**
131          * Must report failure to connect.
132          */
133         ACTION_CONNECT
134     }
135
136     /**
137      * The outstanding message.
138      */
139     private Message currentMessage = null;
140
141     /**
142      * The serviceName override for that message.
143      */
144     private String currentService = null;
145
146     /**
147      * The serviceParam override for that message.
148      */
149     private String currentParam = null;
150
151     /**
152      * The exception that caused that message to not be sent.
153      */
154     private Throwable currentThrowable = null;
155
156     /**
157      * true if we have deliberately closed our one message input queue.
158      */
159     private boolean inputClosed = false;
160
161     /**
162      * Need to know which group this transport lives in, so that we can suppress
163      * channel redirection when in the same group. This is currently the norm.
164      */
165     private final PeerGroupID homeGroupID;
166
167     /**
168      * The current deferred action.
169      */
170     private DeferredAction deferredAction = DeferredAction.ACTION_NONE;
171
172     /**
173      * Reference to owning object. This is there so that the owning object is not subject to garbage collection
174      * unless this object here becomes itself unreferenced. That happens when the self destruct timer closed it.
175      */
176     private Object owner = null;
177
178     /**
179      * The timer task watching over our self destruction requirement.
180      */
181     private final TimerTask selfDestructTask;
182
183     /**
184      * State lock and engine.
185      */
186     private final BlockingMessengerState stateMachine = new BlockingMessengerState();
187
188     /**
189      * legacy artefact: transports need to believe the messenger is not yet closed in order to actually close it.
190      * So we lie to them just while we run their closeImpl method so that they do not see that the messenger is
191      * officially closed.
192      */
193     private boolean lieToOldTransports = false;
194
195     /**
196      * Our statemachine implementation; just connects the standard AbstractMessengerState action methods to
197      * this object.
198      */
199     private class BlockingMessengerState extends MessengerState {
200
201         protected BlockingMessengerState() {
202             super(true);
203         }
204
205         /*
206          * The required action methods.
207          */
208
209         /**
210          * {@inheritDoc}
211          */
212         @Override
213         protected void connectAction() {
214             deferredAction = DeferredAction.ACTION_CONNECT;
215         }
216
217         /**
218          * {@inheritDoc}
219          */
220         @Override
221         protected void startAction() {
222             deferredAction = DeferredAction.ACTION_SEND;
223         }
224
225         /**
226          * {@inheritDoc}
227          */
228         @Override
229         protected void closeInputAction() {
230             // we're synchonized here. (invoked from stateMachine).
231             inputClosed = true;
232         }
233
234         /**
235          * {@inheritDoc}
236          */
237         @Override
238         protected void closeOutputAction() {
239             // This will break the cnx; thereby causing a down event if we have a send in progress.
240             // If the cnx does not break before the current message is sent, then the message will be sent successfully,
241             // resulting in an idle event. Either of these events is enough to complete the shutdown process.
242             lieToOldTransports = true;
243             closeImpl();
244             lieToOldTransports = false;
245
246             // Disconnect from the timer.
247             if (selfDestructTask != null) {
248                 selfDestructTask.cancel();
249             }
250         }
251
252         // This is a synchronous action. No synchronization needed: we're already synchronized, here.
253         // There's a subtlety here: we do not clear the current message. We let sendMessageB or sendMessageN
254         // deal with it, so that they can handle the status reporting each in their own way. So basically, all we
255         // do is to set a reason for that message to fail in case we are shutdown from the outside and that message
256         // is not sent yet. As long as there is a current message, it is guaranteed that there is a thread
257         // in charge of reporting its status. It is also guaranteed that when failAll is called, the input is
258         // already closed, and so, we have no obligation of making room for future messages immediately.
259         // All this aggravation is so that we do not have to create one context wrapper for each message just so
260         // that we can associate it with its result. Instead we use our single msg and single status model
261         // throughout.
262         @Override
263         protected void failAllAction() {
264
265             if (currentMessage == null) {
266                 return;
267             }
268
269             if (currentThrowable == null) {
270                 currentThrowable = new IOException("Messenger unexpectedly closed");
271             }
272         }
273     }
274
275
276     /**
277      * The implementation of channel messenger that getChannelMessenger returns:
278      * All it does is address rewriting. Even close() is forwarded to the shared messenger.
279      * The reason is that BlockingMessengers are not really shared; they're transitional
280      * entities used directly by CanonicalMessenger. GetChannel is used only to provide address
281      * rewriting when we pass a blocking messenger directly to incoming messenger listeners...this
282      * practice is to be removed in the future, in favor of making incoming messengers full-featured
283      * async messengers that can be shared.
284      */
285     private final class BlockingMessengerChannel extends ChannelMessenger {
286
287         public BlockingMessengerChannel(EndpointAddress baseAddress, PeerGroupID redirection, String origService, String origServiceParam) {
288
289             super(baseAddress, redirection, origService, origServiceParam);
290
291             // We tell our super class that we synchronize on the stateMachine object. Althoug it is not obvious, our getState()
292             // method calls the shared messenger getState() method, which synchronizes on the shared messenger's state machine
293             // object. So, that's what we must specify.  Logic would dictate that we pass it to super(), but it is not itself
294             // constructed until super() returns. No way around it.
295
296             setStateLock(stateMachine);
297         }
298
299         /**
300          * {@inheritDoc}
301          */
302         public int getState() {
303             return BlockingMessenger.this.getState();
304         }
305
306         /**
307          * {@inheritDoc}
308          */
309         public void resolve() {
310             BlockingMessenger.this.resolve();
311         }
312
313         /**
314          * {@inheritDoc}
315          */
316         public void close() {
317             BlockingMessenger.this.close();
318         }
319
320         /**
321          * {@inheritDoc}
322          *
323          * <p/>
324          * Address rewriting done here.
325          */
326         public boolean sendMessageN(Message msg, String service, String serviceParam) {
327             return BlockingMessenger.this.sendMessageN(msg, effectiveService(service), effectiveParam(service, serviceParam));
328         }
329
330         /**
331          * {@inheritDoc}
332          *
333          * <p/>
334          * Address rewriting done here.
335          */
336         public void sendMessageB(Message msg, String service, String serviceParam) throws IOException {
337             BlockingMessenger.this.sendMessageB(msg, effectiveService(service), effectiveParam(service, serviceParam));
338         }
339
340         /**
341          * {@inheritDoc}
342          *
343          * <p/>
344          * We're supposed to return the complete destination, including
345          * service and param specific to that channel. It is not clear, whether
346          * this should include the cross-group mangling, though. For now, let's
347          * say it does not.
348          */
349         public EndpointAddress getLogicalDestinationAddress() {
350             EndpointAddress rawLogical = getLogicalDestinationImpl();
351
352             if (rawLogical == null) {
353                 return null;
354             }
355             return new EndpointAddress(rawLogical, origService, origServiceParam);
356         }
357
358         // Check if it is worth staying registered
359         public void itemChanged(Object changedObject) {
360
361             if (!notifyChange()) {
362                 if (haveListeners()) {
363                     return;
364                 }
365
366                 BlockingMessenger.this.unregisterListener(this);
367
368                 if (!haveListeners()) {
369                     return;
370                 }
371
372                 // Ooops collision. We should not have unregistered. Next time, then. In case of collision, the end result
373                 // is always to stay registered. There's no harm in staying registered.
374                 BlockingMessenger.this.registerListener(this);
375             }
376         }
377
378         /**
379          * {@inheritDoc}
380          * <p/>
381          * Always make sure we're registered with the shared messenger.
382          */
383         @Override
384         protected void registerListener(SimpleSelectable l) {
385             BlockingMessenger.this.registerListener(this);
386             super.registerListener(l);
387         }
388     }
389
390     private void storeCurrent(Message msg, String service, String param) {
391         currentMessage = msg;
392         currentService = service;
393         currentParam = param;
394         currentThrowable = null;
395     }
396
397     /**
398      * Constructor.
399      * <p/>
400      * We start in the CONNECTED state, we pretend to have a queue of size 1, and we can never re-connect.  Although this
401      * messenger fully respects the asynchronous semantics, it is saturated as soon as one msg is being send, and if not
402      * saturated, send is actually performed by the invoker thread. So return is not completely immediate.  This is a barely
403      * acceptable implementation, but this is also a transition adapter that is bound to disappear one release from now. The main
404      * goal is to get things going until transports are adapted.
405      *
406      * @param homeGroupID  the group that this messenger works for. This is the group of the endpoint service or transport
407      *                     that created this messenger.
408      * @param dest         where messages should be addressed to
409      * @param selfDestruct true if this messenger must self close destruct when idle. <b>Warning:</b> If selfDestruct is used,
410      *                     this messenger will remained referenced for as long as isIdleImpl returns false.
411      */
412
413     public BlockingMessenger(PeerGroupID homeGroupID, EndpointAddress dest, boolean selfDestruct) {
414
415         super(dest);
416
417         this.homeGroupID = homeGroupID;
418
419         // We tell our superclass that we synchronize our state on the stateMachine object.  Logic would dictate that we pass it
420         // to super(), but it is not itself constructed until super() returns. No way around it.
421
422         setStateLock(stateMachine);
423
424         /*
425          * Sets up a timer task that will close this messenger if it says to have become idle. It will keep it referenced
426          * until then.
427          * <p/>
428          * As long as this timer task is scheduled, this messenger is not subject to GC. Therefore, its owner, if any, which is strongly
429          * referenced, is not subject to GC either. This avoids prematurely closing open connections just because a destination is
430          * not currently in use, which we would have to do if CanonicalMessengers could be GCed independantly (and which would
431          * force us to use finalizers, too).<p/>
432          *
433          * Such a mechanism is usefull only if this blocking messenger is expensive to make or holds system resources that require
434          * an explicit invocation of the close method. Else, it is better to let it be GC'ed along with any refering canonical
435          * messenger when memory is tight.<p/>
436          *
437          */
438
439         //
440         // FIXME 20040413 jice : we trust transports to implement isIdle reasonably, which may be a leap of faith. We
441         // should probably superimpose a time limit of our own.
442         //
443         if (selfDestruct) {
444             selfDestructTask = new TimerTask() {
445
446                 /**
447                  * {@inheritDoc}
448                  */
449                 @Override
450                 public void run() {
451                     try {
452                         if (isIdleImpl()) {
453                             close();
454                         } else {
455                             return;
456                         }
457                     } catch (Throwable uncaught) {
458                         if (Logging.SHOW_SEVERE && LOG.isLoggable(Level.SEVERE)) {
459                             LOG.log(Level.SEVERE, "Uncaught Throwable in selfDescructTask. ", uncaught);
460                         }
461                     }
462                     cancel();
463                 }
464             };
465
466             timer.schedule(selfDestructTask, TimeUtils.AMINUTE, TimeUtils.AMINUTE);
467         } else {
468             selfDestructTask = null;
469         }
470     }
471
472     /**
473      * Sets an owner for this blocking messenger. Owners are normally canonical messengers. The goal of registering the owner is
474      * to keep that owner reachable as long as this blocking messenger is.  Canonical messengers are otherwise softly referenced,
475      * and so, may be deleted whenever memory is tight.
476      * <p/>
477      * We do not want to use finalizers or the equivalent reference queue mechanism; so we have no idea when a blocking messenger
478      * is no-longer referenced by any canonical. In addition it may be expensive to make and so we want to keep it for a while
479      * anyway. As a result, instead of keeping a blocking messenger open as long as there is a canonical, we do the opposite: we
480      * keep the canonical (owner, here) as long as the blocking messenger is open (and usually beyond, memory allowing). How long
481      * a blocking messenger will stay around depends upon that messenger's implementation. That may even be left up to the GC, in
482      * the end (if close is not needed AND the messenger is cheap to make). In that case, the owner is likely the only referrer,
483      * and so both will have the same lifetime.
484      *
485      * @param owner The object that should be kept referenced at least as long as this one.
486      */
487     public void setOwner(Object owner) {
488         this.owner = owner;
489     }
490
491     /**
492      * Assemble a destination address for a message based upon the messenger
493      * default destination address and the optional provided overrides.
494      *
495      * @param service The destination service or {@code null} to use default.
496      * @param serviceParam The destination service parameter or {@code null} to 
497      * use default.
498      */
499     protected EndpointAddress getDestAddressToUse(String service, String serviceParam) {
500         EndpointAddress defaultAddress = getDestinationAddress();
501         EndpointAddress result;
502         
503         if(null == service) {
504             if(null == serviceParam) {
505                 // Use default service name and service params
506                 result = defaultAddress;
507             } else {
508                 // use default service name, override service params
509                 result = new EndpointAddress(defaultAddress, defaultAddress.getServiceName(), serviceParam);
510             }
511         } else {
512             if(null == serviceParam) {
513                 // override service name, use default service params (this one is pretty weird and probably not useful)
514                 result = new EndpointAddress(defaultAddress, service, defaultAddress.getServiceParameter());
515             } else {
516                 // override service name, override service params
517                 result = new EndpointAddress(defaultAddress, service, serviceParam);
518             }
519         }
520         
521         return result;
522     }
523
524     /**
525      * A transport may call this to cause an orderly closure of its messengers.
526      */
527     protected final void shutdown() {
528         DeferredAction action;
529
530         synchronized (stateMachine) {
531             stateMachine.shutdownEvent();
532             action = eventCalled();
533         }
534
535         // We called an event. State may have changed.
536         notifyChange();
537
538         performDeferredAction(action);
539     }
540
541     /**
542      * {@inheritDoc}
543      * <p/>
544      * We overload isClosed because many messengers still use super.isClosed()
545      * as part of their own implementation or don't override it at all. They
546      * expect it to be true only when all is shutdown; not while we're closing
547      * gently.
548      *
549      * FIXME - jice@jxta.org 20040413: transports should get a deeper retrofit eventually.
550      */
551     @Override
552     public boolean isClosed() {
553         return (!lieToOldTransports) && super.isClosed();
554     }
555
556     /**
557      * {@inheritDoc}
558      * <p/>
559      * getLogicalDestinationAddress() requires resolution (it's the address advertised by the other end).
560      * For a blocking messenger it's easy. We're born resolved. So, just ask the implementor what it is.
561      */
562     public final EndpointAddress getLogicalDestinationAddress() {
563         return getLogicalDestinationImpl();
564     }
565
566     /**
567      * {@inheritDoc}
568      *
569      * <p/> Some transports historically overload the close method of BlockingMessenger.
570      * The final is there to make sure we know about it. However, there should be no
571      * harm done if the unchanged code is renamed to closeImpl; even if it calls super.close().
572      * The real problem, however, is transports calling close (was their own, but now it means this one), when
573      * they want to break. It will make things look like someone just called close, but it will not
574      * actually break anything. However, that will cause the state machine to go through the close process.
575      * this will end up calling closeImpl(). That will do.
576      */
577     public final void close() {
578         DeferredAction action;
579
580         synchronized (stateMachine) {
581             stateMachine.closeEvent();
582
583             action = eventCalled();
584         }
585
586         // We called an event. State may have changed.
587         notifyChange();
588
589         performDeferredAction(action);
590     }
591
592     /**
593      * {@inheritDoc}
594      */
595     public void sendMessageB(Message msg, String service, String serviceParam) throws IOException {
596
597         DeferredAction action;
598
599         synchronized (stateMachine) {
600             try {
601                 while ((currentMessage != null) && !inputClosed) {
602                     stateMachine.wait();
603                 }
604             } catch (InterruptedException ie) {
605                 throw new InterruptedIOException();
606             }
607
608             if (inputClosed) {
609                 throw new IOException("Messenger is closed. It cannot be used to send messages");
610             }
611
612             // We store the four elements of a pending msg separately. We do not want to pour millions of tmp objects on the GC for
613             // nothing.
614
615             storeCurrent(msg, service, serviceParam);
616             stateMachine.saturatedEvent();
617             action = eventCalled();
618         }
619
620         notifyChange(); // We called an event. State may have changed.
621         performDeferredAction(action); // We called an event. There may be an action. (start, normally).
622
623         // After deferred action, the message was either sent or failed. (done by this thread).
624         // We can tell because, if failed, the currentMessage is still our msg.
625         Throwable failure = null;
626
627         synchronized (stateMachine) {
628             if (currentMessage == msg) {
629                 failure = currentThrowable;
630                 if (failure == null) {
631                     failure = new IOException("Unknown error");
632                 }
633                 // Ok, let it go, now.
634                 storeCurrent(null, null, null);
635             } // Else, don't touch currentMsg; it's not our msg.
636         }
637
638         if (failure == null) {
639             // No failure. Report ultimate succes.
640             msg.setMessageProperty(Messenger.class, OutgoingMessageEvent.SUCCESS);
641             return;
642         }
643
644         // Failure. See how we can manage to throw it.
645         if (failure instanceof IOException) {
646             throw (IOException) failure;
647         }
648         if (failure instanceof RuntimeException) {
649             throw (RuntimeException) failure;
650         }
651         if (failure instanceof Error) {
652             throw (Error) failure;
653         }
654
655         IOException failed = new IOException("Failure sending message");
656         failed.initCause(failure);
657         throw failed;
658     }
659
660     /**
661      * {@inheritDoc}
662      */
663     public final boolean sendMessageN(Message msg, String service, String serviceParam) {
664
665         boolean queued = false;
666         DeferredAction action = DeferredAction.ACTION_NONE;
667         boolean closed;
668
669         synchronized (stateMachine) {
670             closed = inputClosed;
671             if ((!closed) && (currentMessage == null)) {
672                 // We copy the four elements of a pending msg right here. We do not want to pour millions of tmp objects on the GC.
673                 storeCurrent(msg, service, serviceParam);
674                 stateMachine.saturatedEvent();
675                 action = eventCalled();
676                 queued = true;
677             }
678         }
679
680         if (queued) {
681             notifyChange(); // We called an event. State may have changed.
682             performDeferredAction(action); // We called an event. There may be an action. (start, normally).
683
684             // After deferred action, the message was either sent or failed. (done by this thread).
685             // We can tell because, if failed, the currentMessage is still our msg.
686             synchronized (stateMachine) {
687                 if (currentMessage == msg) {
688                     if (currentThrowable == null) {
689                         currentThrowable = new IOException("Unknown error");
690                     }
691                     msg.setMessageProperty(Message.class, currentThrowable);
692                     // Ok, let it go, now.
693                     storeCurrent(null, null, null);
694                 } else {
695                     msg.setMessageProperty(Message.class, OutgoingMessageEvent.SUCCESS);
696                     // Don't touch the current msg; it's not our msg.
697                 }
698             }
699             // Yes, we return true in either case. sendMessageN is supposed to be async. If a message fails
700             // after it was successfuly queued, the error is not reported by the return value, but only by
701             // the message property (and select). Just making sure the behaviour is as normal as can be
702             // even it means suppressing some information.
703
704             return true;
705         }
706
707         // Not queued. Either closed, or currently sending. If inputClosed, that's what we report.
708         msg.setMessageProperty(Messenger.class,
709                 closed ?
710                         new OutgoingMessageEvent(msg, new IOException("This messenger is closed. " + "It cannot be used to send messages.")) :
711                         OutgoingMessageEvent.OVERFLOW);
712         return false;
713     }
714
715     /**
716      * {@inheritDoc}
717      */
718     public final void resolve() {// We're born resolved. Don't bother calling the event.
719     }
720
721     /**
722      * {@inheritDoc}
723      */
724     public final int getState() {
725         return stateMachine.getState();
726     }
727
728     /**
729      * {@inheritDoc}
730      */
731     public final Messenger getChannelMessenger(PeerGroupID redirection, String service, String serviceParam) {
732
733         // Our transport is always in the same group. If the channel's target group is the same, no group
734         // redirection is ever needed.
735
736         return new BlockingMessengerChannel(getDestinationAddress(),
737                 homeGroupID.equals(redirection) ? null : redirection, service,
738                 serviceParam);
739     }
740
741     /**
742      * Three exposed methods may need to inject new events in the system: sendMessageN, close, and shutdown. Since they can both
743      * cause actions, and since connectAction and startAction are deferred, it seems possible that one of the
744      * actions caused by send, close, or shutdown be called while connectAction or startAction are in progress.
745      *
746      * However, the state machine gives us a few guarantees: connectAction and startAction can never nest. We will not be
747      * asked to perform one while still performing the other. Only the synchronous actions closeInput, closeOutput, or
748      * failAll can possibly be requested in the interval. We could make more assumptions and simplify the code, but rather
749      * keep at least some flexibility.
750      */
751
752     private void performDeferredAction(DeferredAction action) {
753         switch (action) {
754             case ACTION_SEND:
755                 sendIt();
756                 break;
757
758             case ACTION_CONNECT:
759                 cantConnect();
760                 break;
761         }
762     }
763
764     /**
765      * A shortHand for a frequently used sequence. MUST be called while synchronized on stateMachine.
766      *
767      * @return the deferred action.
768      */
769     private DeferredAction eventCalled() {
770         DeferredAction action = deferredAction;
771
772         deferredAction = DeferredAction.ACTION_NONE;
773         stateMachine.notifyAll();
774         return action;
775     }
776
777     /**
778      * Performs the ACTION_SEND deferred action: sends the one msg in our one msg queue.
779      * This method *never* sets the outcome message property. This is left to sendMessageN and sendMessageB, because
780      * sendMessageB does not want to set it in any other case than success, while sendMessageN does it in all cases.
781      * The problem with that is: how do we communicate the outcome to sendMessage{NB} without having to keep
782      * the 1 msg queue locked until then (which would be in contradiction with how we interact with the state machine).
783      * To make it really inexpensive, here's the trick: when a message fails currentMessage and currentFailure remain.
784      * So the sendMessageRoutine can check them and known that it is its message and not another one that caused the
785      * failure. If all is well, currentMessage and currentFailure are nulled and if another message is send immediately
786      * sendMessage is able to see that its own message was processed fully. (this is a small cheat regarding the
787      * state of saturation after failall, but that's not actually detectable from the outside: input is closed
788      * before failall anyway. See failall for that part.
789      */
790     private void sendIt() {
791
792         if (currentMessage == null) {
793             if (Logging.SHOW_SEVERE && LOG.isLoggable(Level.SEVERE)) {
794                 LOG.severe("Internal error. Asked to send with no message.");
795             }
796             return;
797         }
798
799         DeferredAction action;
800
801         try {
802             sendMessageBImpl(currentMessage, currentService, currentParam);
803         } catch (Throwable any) {
804             // Did not work. We report the link down and let the state machine tell us when to fail the msg.  It is assumed that
805             // when this happens, the cnx is already down.  FIXME - jice@jxta.org 20040413: check with the various kind of funky
806             // exception. Some may not mean the link is down
807             synchronized (stateMachine) {
808                 currentThrowable = any;
809                 stateMachine.downEvent();
810                 action = eventCalled();
811             }
812             notifyChange();
813             performDeferredAction(action); // we expect connect but let the state machine decide.
814             return;
815         }
816
817         // Worked.
818
819         synchronized (stateMachine) {
820             storeCurrent(null, null, null);
821             stateMachine.idleEvent();
822             action = eventCalled();
823         }
824
825         // We did go from non-idle to idle. Report it.
826         notifyChange();
827
828         performDeferredAction(action); // should be none but let the state machine decide.
829     }
830
831     /**
832      * Performs the ACTION_CONNECT deferred action: generate a downEvent since we cannot reconnect.
833      */
834     private void cantConnect() {
835         DeferredAction action;
836
837         synchronized (stateMachine) {
838             stateMachine.downEvent();
839             action = eventCalled();
840         }
841         notifyChange();
842         performDeferredAction(action); // should be none but let the state machine decide.
843     }
844
845     /*
846      * Abstract methods to be provided by implementer (a transport for example).
847      * To adapt legacy transport, keep extending BlockingMessenger and just rename your close, isIdle, sendMessage and
848      * getLogicalDestinationAddress methods to closeImpl, isIdleImpl, sendMessageBImpl, and getLogicalDestinationImpl, respectively.
849      */
850
851     /**
852      * Close connection. May fail current send.
853      */
854     protected abstract void closeImpl();
855
856     /**
857      * Send a message blocking as needed until the message is sent.
858      *
859      * @param message The message to send.
860      * @param service The destination service.
861      * @param param   The destination serivce param.
862      * @throws IOException Thrown for errors encountered while sending the message.
863      */
864     protected abstract void sendMessageBImpl(Message message, String service, String param) throws IOException;
865
866     /**
867      * return true if this messenger has not been used for a long time. The definition of long time is: "sufficient such that closing it
868      * is worth the cost of having to re-open". A messenger should self close if it thinks it meets the definition of
869      * idle. BlockingMessenger leaves the evaluation to the transport but does the work automatically. <b>Important:</b> if
870      * self destruction is used, this method must work; not just return false. See the constructor. In general, if closeImpl does
871      * not need to do anything, then self destruction is not needed.
872      *
873      * @return {@code true} if theis messenger is, by it's own definition, idle.
874      */
875     protected abstract boolean isIdleImpl();
876
877     /**
878      * Obtain the logical destination address from the implementer (a transport for example).
879      */
880     protected abstract EndpointAddress getLogicalDestinationImpl();
881 }