2 * Copyright (c) 2004-2007 Sun Microsystems, Inc. All rights reserved.
4 * The Sun Project JXTA(TM) Software License
6 * Redistribution and use in source and binary forms, with or without
7 * modification, are permitted provided that the following conditions are met:
9 * 1. Redistributions of source code must retain the above copyright notice,
10 * this list of conditions and the following disclaimer.
12 * 2. Redistributions in binary form must reproduce the above copyright notice,
13 * this list of conditions and the following disclaimer in the documentation
14 * and/or other materials provided with the distribution.
16 * 3. The end-user documentation included with the redistribution, if any, must
17 * include the following acknowledgment: "This product includes software
18 * developed by Sun Microsystems, Inc. for JXTA(TM) technology."
19 * Alternately, this acknowledgment may appear in the software itself, if
20 * and wherever such third-party acknowledgments normally appear.
22 * 4. The names "Sun", "Sun Microsystems, Inc.", "JXTA" and "Project JXTA" must
23 * not be used to endorse or promote products derived from this software
24 * without prior written permission. For written permission, please contact
25 * Project JXTA at http://www.jxta.org.
27 * 5. Products derived from this software may not be called "JXTA", nor may
28 * "JXTA" appear in their name, without prior written permission of Sun.
30 * THIS SOFTWARE IS PROVIDED ``AS IS'' AND ANY EXPRESSED OR IMPLIED WARRANTIES,
31 * INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND
32 * FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL SUN
33 * MICROSYSTEMS OR ITS CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
34 * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
35 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA,
36 * OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
37 * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
38 * NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE,
39 * EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
41 * JXTA is a registered trademark of Sun Microsystems, Inc. in the United
42 * States and other countries.
44 * Please see the license information page at :
45 * <http://www.jxta.org/project/www/license.html> for instructions on use of
46 * the license in source files.
48 * ====================================================================
50 * This software consists of voluntary contributions made by many individuals
51 * on behalf of Project JXTA. For more information on Project JXTA, please see
52 * http://www.jxta.org.
54 * This license is based on the BSD license adopted by the Apache Foundation.
56 package net.jxta.impl.endpoint;
58 import java.util.Timer;
59 import java.util.TimerTask;
61 import java.io.IOException;
62 import java.io.InterruptedIOException;
64 import java.util.logging.Level;
66 import net.jxta.logging.Logging;
68 import java.util.logging.Logger;
70 import net.jxta.endpoint.AbstractMessenger;
71 import net.jxta.endpoint.ChannelMessenger;
72 import net.jxta.endpoint.EndpointAddress;
73 import net.jxta.endpoint.Message;
74 import net.jxta.endpoint.Messenger;
75 import net.jxta.endpoint.MessengerState;
76 import net.jxta.endpoint.OutgoingMessageEvent;
77 import net.jxta.peergroup.PeerGroupID;
78 import net.jxta.util.SimpleSelectable;
80 import net.jxta.impl.util.TimeUtils;
83 * This class is a near-drop-in replacement for the previous BlockingMessenger class.
84 * To subclassers (that is, currently, transports) the only difference is that some
85 * overloaded methods have a different name (class hierarchy reasons made it impossible
86 * to preserve the names without forcing an API change for applications).
88 * The other difference which is not API visible, is that it implements the
89 * standard MessengerState behaviour and semantics required by the changes in the endpoint framework.
91 * This the only base messenger class meant to be extended by outside code that is in the impl tree. The
92 * reason being that what it replaces was there already and that new code should not become dependant upon it.
94 public abstract class BlockingMessenger extends AbstractMessenger {
99 private final static transient Logger LOG = Logger.getLogger(BlockingMessenger.class.getName());
102 * The self destruct timer.
104 * When a messenger has become idle, it is closed. As a side effect, it
105 * makes the owning canonical messenger, if any, subject to removal if it is
106 * otherwise unreferenced.
108 private final static transient Timer timer = new Timer("BlockingMessenger self destruct timer", true);
111 * Actions that we defer to after returning from event methods. In other
112 * words, they cannot be done with the lock held, or they require calling
113 * more event methods. Because this messenger can take only one message at
114 * a time (saturated while sending), actions do not cascade much. Start can
115 * lead to connect if the sending fails, but, because we always fail to
116 * connect, connect will not lead to start. As a result we can get away with
117 * performing deferred actions recursively. That simplifies the code.
119 private enum DeferredAction {
121 * No deferred action.
131 * Must report failure to connect.
137 * The outstanding message.
139 private Message currentMessage = null;
142 * The serviceName override for that message.
144 private String currentService = null;
147 * The serviceParam override for that message.
149 private String currentParam = null;
152 * The exception that caused that message to not be sent.
154 private Throwable currentThrowable = null;
157 * true if we have deliberately closed our one message input queue.
159 private boolean inputClosed = false;
162 * Need to know which group this transport lives in, so that we can suppress
163 * channel redirection when in the same group. This is currently the norm.
165 private final PeerGroupID homeGroupID;
168 * The current deferred action.
170 private DeferredAction deferredAction = DeferredAction.ACTION_NONE;
173 * Reference to owning object. This is there so that the owning object is not subject to garbage collection
174 * unless this object here becomes itself unreferenced. That happens when the self destruct timer closed it.
176 private Object owner = null;
179 * The timer task watching over our self destruction requirement.
181 private final TimerTask selfDestructTask;
184 * State lock and engine.
186 private final BlockingMessengerState stateMachine = new BlockingMessengerState();
189 * legacy artefact: transports need to believe the messenger is not yet closed in order to actually close it.
190 * So we lie to them just while we run their closeImpl method so that they do not see that the messenger is
193 private boolean lieToOldTransports = false;
196 * Our statemachine implementation; just connects the standard AbstractMessengerState action methods to
199 private class BlockingMessengerState extends MessengerState {
201 protected BlockingMessengerState() {
206 * The required action methods.
213 protected void connectAction() {
214 deferredAction = DeferredAction.ACTION_CONNECT;
221 protected void startAction() {
222 deferredAction = DeferredAction.ACTION_SEND;
229 protected void closeInputAction() {
230 // we're synchonized here. (invoked from stateMachine).
238 protected void closeOutputAction() {
239 // This will break the cnx; thereby causing a down event if we have a send in progress.
240 // If the cnx does not break before the current message is sent, then the message will be sent successfully,
241 // resulting in an idle event. Either of these events is enough to complete the shutdown process.
242 lieToOldTransports = true;
244 lieToOldTransports = false;
246 // Disconnect from the timer.
247 if (selfDestructTask != null) {
248 selfDestructTask.cancel();
252 // This is a synchronous action. No synchronization needed: we're already synchronized, here.
253 // There's a subtlety here: we do not clear the current message. We let sendMessageB or sendMessageN
254 // deal with it, so that they can handle the status reporting each in their own way. So basically, all we
255 // do is to set a reason for that message to fail in case we are shutdown from the outside and that message
256 // is not sent yet. As long as there is a current message, it is guaranteed that there is a thread
257 // in charge of reporting its status. It is also guaranteed that when failAll is called, the input is
258 // already closed, and so, we have no obligation of making room for future messages immediately.
259 // All this aggravation is so that we do not have to create one context wrapper for each message just so
260 // that we can associate it with its result. Instead we use our single msg and single status model
263 protected void failAllAction() {
265 if (currentMessage == null) {
269 if (currentThrowable == null) {
270 currentThrowable = new IOException("Messenger unexpectedly closed");
277 * The implementation of channel messenger that getChannelMessenger returns:
278 * All it does is address rewriting. Even close() is forwarded to the shared messenger.
279 * The reason is that BlockingMessengers are not really shared; they're transitional
280 * entities used directly by CanonicalMessenger. GetChannel is used only to provide address
281 * rewriting when we pass a blocking messenger directly to incoming messenger listeners...this
282 * practice is to be removed in the future, in favor of making incoming messengers full-featured
283 * async messengers that can be shared.
285 private final class BlockingMessengerChannel extends ChannelMessenger {
287 public BlockingMessengerChannel(EndpointAddress baseAddress, PeerGroupID redirection, String origService, String origServiceParam) {
289 super(baseAddress, redirection, origService, origServiceParam);
291 // We tell our super class that we synchronize on the stateMachine object. Althoug it is not obvious, our getState()
292 // method calls the shared messenger getState() method, which synchronizes on the shared messenger's state machine
293 // object. So, that's what we must specify. Logic would dictate that we pass it to super(), but it is not itself
294 // constructed until super() returns. No way around it.
296 setStateLock(stateMachine);
302 public int getState() {
303 return BlockingMessenger.this.getState();
309 public void resolve() {
310 BlockingMessenger.this.resolve();
316 public void close() {
317 BlockingMessenger.this.close();
324 * Address rewriting done here.
326 public boolean sendMessageN(Message msg, String service, String serviceParam) {
327 return BlockingMessenger.this.sendMessageN(msg, effectiveService(service), effectiveParam(service, serviceParam));
334 * Address rewriting done here.
336 public void sendMessageB(Message msg, String service, String serviceParam) throws IOException {
337 BlockingMessenger.this.sendMessageB(msg, effectiveService(service), effectiveParam(service, serviceParam));
344 * We're supposed to return the complete destination, including
345 * service and param specific to that channel. It is not clear, whether
346 * this should include the cross-group mangling, though. For now, let's
349 public EndpointAddress getLogicalDestinationAddress() {
350 EndpointAddress rawLogical = getLogicalDestinationImpl();
352 if (rawLogical == null) {
355 return new EndpointAddress(rawLogical, origService, origServiceParam);
358 // Check if it is worth staying registered
359 public void itemChanged(Object changedObject) {
361 if (!notifyChange()) {
362 if (haveListeners()) {
366 BlockingMessenger.this.unregisterListener(this);
368 if (!haveListeners()) {
372 // Ooops collision. We should not have unregistered. Next time, then. In case of collision, the end result
373 // is always to stay registered. There's no harm in staying registered.
374 BlockingMessenger.this.registerListener(this);
381 * Always make sure we're registered with the shared messenger.
384 protected void registerListener(SimpleSelectable l) {
385 BlockingMessenger.this.registerListener(this);
386 super.registerListener(l);
390 private void storeCurrent(Message msg, String service, String param) {
391 currentMessage = msg;
392 currentService = service;
393 currentParam = param;
394 currentThrowable = null;
400 * We start in the CONNECTED state, we pretend to have a queue of size 1, and we can never re-connect. Although this
401 * messenger fully respects the asynchronous semantics, it is saturated as soon as one msg is being send, and if not
402 * saturated, send is actually performed by the invoker thread. So return is not completely immediate. This is a barely
403 * acceptable implementation, but this is also a transition adapter that is bound to disappear one release from now. The main
404 * goal is to get things going until transports are adapted.
406 * @param homeGroupID the group that this messenger works for. This is the group of the endpoint service or transport
407 * that created this messenger.
408 * @param dest where messages should be addressed to
409 * @param selfDestruct true if this messenger must self close destruct when idle. <b>Warning:</b> If selfDestruct is used,
410 * this messenger will remained referenced for as long as isIdleImpl returns false.
413 public BlockingMessenger(PeerGroupID homeGroupID, EndpointAddress dest, boolean selfDestruct) {
417 this.homeGroupID = homeGroupID;
419 // We tell our superclass that we synchronize our state on the stateMachine object. Logic would dictate that we pass it
420 // to super(), but it is not itself constructed until super() returns. No way around it.
422 setStateLock(stateMachine);
425 * Sets up a timer task that will close this messenger if it says to have become idle. It will keep it referenced
428 * As long as this timer task is scheduled, this messenger is not subject to GC. Therefore, its owner, if any, which is strongly
429 * referenced, is not subject to GC either. This avoids prematurely closing open connections just because a destination is
430 * not currently in use, which we would have to do if CanonicalMessengers could be GCed independantly (and which would
431 * force us to use finalizers, too).<p/>
433 * Such a mechanism is usefull only if this blocking messenger is expensive to make or holds system resources that require
434 * an explicit invocation of the close method. Else, it is better to let it be GC'ed along with any refering canonical
435 * messenger when memory is tight.<p/>
440 // FIXME 20040413 jice : we trust transports to implement isIdle reasonably, which may be a leap of faith. We
441 // should probably superimpose a time limit of our own.
444 selfDestructTask = new TimerTask() {
457 } catch (Throwable uncaught) {
458 if (Logging.SHOW_SEVERE && LOG.isLoggable(Level.SEVERE)) {
459 LOG.log(Level.SEVERE, "Uncaught Throwable in selfDescructTask. ", uncaught);
466 timer.schedule(selfDestructTask, TimeUtils.AMINUTE, TimeUtils.AMINUTE);
468 selfDestructTask = null;
473 * Sets an owner for this blocking messenger. Owners are normally canonical messengers. The goal of registering the owner is
474 * to keep that owner reachable as long as this blocking messenger is. Canonical messengers are otherwise softly referenced,
475 * and so, may be deleted whenever memory is tight.
477 * We do not want to use finalizers or the equivalent reference queue mechanism; so we have no idea when a blocking messenger
478 * is no-longer referenced by any canonical. In addition it may be expensive to make and so we want to keep it for a while
479 * anyway. As a result, instead of keeping a blocking messenger open as long as there is a canonical, we do the opposite: we
480 * keep the canonical (owner, here) as long as the blocking messenger is open (and usually beyond, memory allowing). How long
481 * a blocking messenger will stay around depends upon that messenger's implementation. That may even be left up to the GC, in
482 * the end (if close is not needed AND the messenger is cheap to make). In that case, the owner is likely the only referrer,
483 * and so both will have the same lifetime.
485 * @param owner The object that should be kept referenced at least as long as this one.
487 public void setOwner(Object owner) {
492 * Assemble a destination address for a message based upon the messenger
493 * default destination address and the optional provided overrides.
495 * @param service The destination service or {@code null} to use default.
496 * @param serviceParam The destination service parameter or {@code null} to
499 protected EndpointAddress getDestAddressToUse(String service, String serviceParam) {
500 EndpointAddress defaultAddress = getDestinationAddress();
501 EndpointAddress result;
503 if(null == service) {
504 if(null == serviceParam) {
505 // Use default service name and service params
506 result = defaultAddress;
508 // use default service name, override service params
509 result = new EndpointAddress(defaultAddress, defaultAddress.getServiceName(), serviceParam);
512 if(null == serviceParam) {
513 // override service name, use default service params (this one is pretty weird and probably not useful)
514 result = new EndpointAddress(defaultAddress, service, defaultAddress.getServiceParameter());
516 // override service name, override service params
517 result = new EndpointAddress(defaultAddress, service, serviceParam);
525 * A transport may call this to cause an orderly closure of its messengers.
527 protected final void shutdown() {
528 DeferredAction action;
530 synchronized (stateMachine) {
531 stateMachine.shutdownEvent();
532 action = eventCalled();
535 // We called an event. State may have changed.
538 performDeferredAction(action);
544 * We overload isClosed because many messengers still use super.isClosed()
545 * as part of their own implementation or don't override it at all. They
546 * expect it to be true only when all is shutdown; not while we're closing
549 * FIXME - jice@jxta.org 20040413: transports should get a deeper retrofit eventually.
552 public boolean isClosed() {
553 return (!lieToOldTransports) && super.isClosed();
559 * getLogicalDestinationAddress() requires resolution (it's the address advertised by the other end).
560 * For a blocking messenger it's easy. We're born resolved. So, just ask the implementor what it is.
562 public final EndpointAddress getLogicalDestinationAddress() {
563 return getLogicalDestinationImpl();
569 * <p/> Some transports historically overload the close method of BlockingMessenger.
570 * The final is there to make sure we know about it. However, there should be no
571 * harm done if the unchanged code is renamed to closeImpl; even if it calls super.close().
572 * The real problem, however, is transports calling close (was their own, but now it means this one), when
573 * they want to break. It will make things look like someone just called close, but it will not
574 * actually break anything. However, that will cause the state machine to go through the close process.
575 * this will end up calling closeImpl(). That will do.
577 public final void close() {
578 DeferredAction action;
580 synchronized (stateMachine) {
581 stateMachine.closeEvent();
583 action = eventCalled();
586 // We called an event. State may have changed.
589 performDeferredAction(action);
595 public void sendMessageB(Message msg, String service, String serviceParam) throws IOException {
597 DeferredAction action;
599 synchronized (stateMachine) {
601 while ((currentMessage != null) && !inputClosed) {
604 } catch (InterruptedException ie) {
605 throw new InterruptedIOException();
609 throw new IOException("Messenger is closed. It cannot be used to send messages");
612 // We store the four elements of a pending msg separately. We do not want to pour millions of tmp objects on the GC for
615 storeCurrent(msg, service, serviceParam);
616 stateMachine.saturatedEvent();
617 action = eventCalled();
620 notifyChange(); // We called an event. State may have changed.
621 performDeferredAction(action); // We called an event. There may be an action. (start, normally).
623 // After deferred action, the message was either sent or failed. (done by this thread).
624 // We can tell because, if failed, the currentMessage is still our msg.
625 Throwable failure = null;
627 synchronized (stateMachine) {
628 if (currentMessage == msg) {
629 failure = currentThrowable;
630 if (failure == null) {
631 failure = new IOException("Unknown error");
633 // Ok, let it go, now.
634 storeCurrent(null, null, null);
635 } // Else, don't touch currentMsg; it's not our msg.
638 if (failure == null) {
639 // No failure. Report ultimate succes.
640 msg.setMessageProperty(Messenger.class, OutgoingMessageEvent.SUCCESS);
644 // Failure. See how we can manage to throw it.
645 if (failure instanceof IOException) {
646 throw (IOException) failure;
648 if (failure instanceof RuntimeException) {
649 throw (RuntimeException) failure;
651 if (failure instanceof Error) {
652 throw (Error) failure;
655 IOException failed = new IOException("Failure sending message");
656 failed.initCause(failure);
663 public final boolean sendMessageN(Message msg, String service, String serviceParam) {
665 boolean queued = false;
666 DeferredAction action = DeferredAction.ACTION_NONE;
669 synchronized (stateMachine) {
670 closed = inputClosed;
671 if ((!closed) && (currentMessage == null)) {
672 // We copy the four elements of a pending msg right here. We do not want to pour millions of tmp objects on the GC.
673 storeCurrent(msg, service, serviceParam);
674 stateMachine.saturatedEvent();
675 action = eventCalled();
681 notifyChange(); // We called an event. State may have changed.
682 performDeferredAction(action); // We called an event. There may be an action. (start, normally).
684 // After deferred action, the message was either sent or failed. (done by this thread).
685 // We can tell because, if failed, the currentMessage is still our msg.
686 synchronized (stateMachine) {
687 if (currentMessage == msg) {
688 if (currentThrowable == null) {
689 currentThrowable = new IOException("Unknown error");
691 msg.setMessageProperty(Message.class, currentThrowable);
692 // Ok, let it go, now.
693 storeCurrent(null, null, null);
695 msg.setMessageProperty(Message.class, OutgoingMessageEvent.SUCCESS);
696 // Don't touch the current msg; it's not our msg.
699 // Yes, we return true in either case. sendMessageN is supposed to be async. If a message fails
700 // after it was successfuly queued, the error is not reported by the return value, but only by
701 // the message property (and select). Just making sure the behaviour is as normal as can be
702 // even it means suppressing some information.
707 // Not queued. Either closed, or currently sending. If inputClosed, that's what we report.
708 msg.setMessageProperty(Messenger.class,
710 new OutgoingMessageEvent(msg, new IOException("This messenger is closed. " + "It cannot be used to send messages.")) :
711 OutgoingMessageEvent.OVERFLOW);
718 public final void resolve() {// We're born resolved. Don't bother calling the event.
724 public final int getState() {
725 return stateMachine.getState();
731 public final Messenger getChannelMessenger(PeerGroupID redirection, String service, String serviceParam) {
733 // Our transport is always in the same group. If the channel's target group is the same, no group
734 // redirection is ever needed.
736 return new BlockingMessengerChannel(getDestinationAddress(),
737 homeGroupID.equals(redirection) ? null : redirection, service,
742 * Three exposed methods may need to inject new events in the system: sendMessageN, close, and shutdown. Since they can both
743 * cause actions, and since connectAction and startAction are deferred, it seems possible that one of the
744 * actions caused by send, close, or shutdown be called while connectAction or startAction are in progress.
746 * However, the state machine gives us a few guarantees: connectAction and startAction can never nest. We will not be
747 * asked to perform one while still performing the other. Only the synchronous actions closeInput, closeOutput, or
748 * failAll can possibly be requested in the interval. We could make more assumptions and simplify the code, but rather
749 * keep at least some flexibility.
752 private void performDeferredAction(DeferredAction action) {
765 * A shortHand for a frequently used sequence. MUST be called while synchronized on stateMachine.
767 * @return the deferred action.
769 private DeferredAction eventCalled() {
770 DeferredAction action = deferredAction;
772 deferredAction = DeferredAction.ACTION_NONE;
773 stateMachine.notifyAll();
778 * Performs the ACTION_SEND deferred action: sends the one msg in our one msg queue.
779 * This method *never* sets the outcome message property. This is left to sendMessageN and sendMessageB, because
780 * sendMessageB does not want to set it in any other case than success, while sendMessageN does it in all cases.
781 * The problem with that is: how do we communicate the outcome to sendMessage{NB} without having to keep
782 * the 1 msg queue locked until then (which would be in contradiction with how we interact with the state machine).
783 * To make it really inexpensive, here's the trick: when a message fails currentMessage and currentFailure remain.
784 * So the sendMessageRoutine can check them and known that it is its message and not another one that caused the
785 * failure. If all is well, currentMessage and currentFailure are nulled and if another message is send immediately
786 * sendMessage is able to see that its own message was processed fully. (this is a small cheat regarding the
787 * state of saturation after failall, but that's not actually detectable from the outside: input is closed
788 * before failall anyway. See failall for that part.
790 private void sendIt() {
792 if (currentMessage == null) {
793 if (Logging.SHOW_SEVERE && LOG.isLoggable(Level.SEVERE)) {
794 LOG.severe("Internal error. Asked to send with no message.");
799 DeferredAction action;
802 sendMessageBImpl(currentMessage, currentService, currentParam);
803 } catch (Throwable any) {
804 // Did not work. We report the link down and let the state machine tell us when to fail the msg. It is assumed that
805 // when this happens, the cnx is already down. FIXME - jice@jxta.org 20040413: check with the various kind of funky
806 // exception. Some may not mean the link is down
807 synchronized (stateMachine) {
808 currentThrowable = any;
809 stateMachine.downEvent();
810 action = eventCalled();
813 performDeferredAction(action); // we expect connect but let the state machine decide.
819 synchronized (stateMachine) {
820 storeCurrent(null, null, null);
821 stateMachine.idleEvent();
822 action = eventCalled();
825 // We did go from non-idle to idle. Report it.
828 performDeferredAction(action); // should be none but let the state machine decide.
832 * Performs the ACTION_CONNECT deferred action: generate a downEvent since we cannot reconnect.
834 private void cantConnect() {
835 DeferredAction action;
837 synchronized (stateMachine) {
838 stateMachine.downEvent();
839 action = eventCalled();
842 performDeferredAction(action); // should be none but let the state machine decide.
846 * Abstract methods to be provided by implementer (a transport for example).
847 * To adapt legacy transport, keep extending BlockingMessenger and just rename your close, isIdle, sendMessage and
848 * getLogicalDestinationAddress methods to closeImpl, isIdleImpl, sendMessageBImpl, and getLogicalDestinationImpl, respectively.
852 * Close connection. May fail current send.
854 protected abstract void closeImpl();
857 * Send a message blocking as needed until the message is sent.
859 * @param message The message to send.
860 * @param service The destination service.
861 * @param param The destination serivce param.
862 * @throws IOException Thrown for errors encountered while sending the message.
864 protected abstract void sendMessageBImpl(Message message, String service, String param) throws IOException;
867 * return true if this messenger has not been used for a long time. The definition of long time is: "sufficient such that closing it
868 * is worth the cost of having to re-open". A messenger should self close if it thinks it meets the definition of
869 * idle. BlockingMessenger leaves the evaluation to the transport but does the work automatically. <b>Important:</b> if
870 * self destruction is used, this method must work; not just return false. See the constructor. In general, if closeImpl does
871 * not need to do anything, then self destruction is not needed.
873 * @return {@code true} if theis messenger is, by it's own definition, idle.
875 protected abstract boolean isIdleImpl();
878 * Obtain the logical destination address from the implementer (a transport for example).
880 protected abstract EndpointAddress getLogicalDestinationImpl();