2 * Copyright (c) 2001-2007 Sun Microsystems, Inc. All rights reserved.
4 * The Sun Project JXTA(TM) Software License
6 * Redistribution and use in source and binary forms, with or without
7 * modification, are permitted provided that the following conditions are met:
9 * 1. Redistributions of source code must retain the above copyright notice,
10 * this list of conditions and the following disclaimer.
12 * 2. Redistributions in binary form must reproduce the above copyright notice,
13 * this list of conditions and the following disclaimer in the documentation
14 * and/or other materials provided with the distribution.
16 * 3. The end-user documentation included with the redistribution, if any, must
17 * include the following acknowledgment: "This product includes software
18 * developed by Sun Microsystems, Inc. for JXTA(TM) technology."
19 * Alternately, this acknowledgment may appear in the software itself, if
20 * and wherever such third-party acknowledgments normally appear.
22 * 4. The names "Sun", "Sun Microsystems, Inc.", "JXTA" and "Project JXTA" must
23 * not be used to endorse or promote products derived from this software
24 * without prior written permission. For written permission, please contact
25 * Project JXTA at http://www.jxta.org.
27 * 5. Products derived from this software may not be called "JXTA", nor may
28 * "JXTA" appear in their name, without prior written permission of Sun.
30 * THIS SOFTWARE IS PROVIDED ``AS IS'' AND ANY EXPRESSED OR IMPLIED WARRANTIES,
31 * INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND
32 * FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL SUN
33 * MICROSYSTEMS OR ITS CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
34 * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
35 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA,
36 * OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
37 * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
38 * NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE,
39 * EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
41 * JXTA is a registered trademark of Sun Microsystems, Inc. in the United
42 * States and other countries.
44 * Please see the license information page at :
45 * <http://www.jxta.org/project/www/license.html> for instructions on use of
46 * the license in source files.
48 * ====================================================================
50 * This software consists of voluntary contributions made by many individuals
51 * on behalf of Project JXTA. For more information on Project JXTA, please see
52 * http://www.jxta.org.
54 * This license is based on the BSD license adopted by the Apache Foundation.
56 package net.jxta.impl.endpoint;
58 import net.jxta.discovery.DiscoveryService;
59 import net.jxta.document.Advertisement;
60 import net.jxta.document.AdvertisementFactory;
61 import net.jxta.document.MimeMediaType;
62 import net.jxta.document.StructuredDocument;
63 import net.jxta.document.StructuredDocumentFactory;
64 import net.jxta.document.StructuredDocumentUtils;
65 import net.jxta.document.XMLDocument;
66 import net.jxta.document.XMLElement;
67 import net.jxta.endpoint.ChannelMessenger;
68 import net.jxta.endpoint.EndpointAddress;
69 import net.jxta.endpoint.EndpointListener;
70 import net.jxta.endpoint.EndpointService;
71 import net.jxta.endpoint.Message;
72 import net.jxta.endpoint.MessageElement;
73 import net.jxta.endpoint.MessageFilterListener;
74 import net.jxta.endpoint.MessagePropagater;
75 import net.jxta.endpoint.MessageReceiver;
76 import net.jxta.endpoint.MessageSender;
77 import net.jxta.endpoint.MessageTransport;
78 import net.jxta.endpoint.Messenger;
79 import net.jxta.endpoint.MessengerEvent;
80 import net.jxta.endpoint.MessengerEventListener;
81 import net.jxta.endpoint.StringMessageElement;
82 import net.jxta.endpoint.ThreadedMessenger;
83 import net.jxta.exception.PeerGroupException;
84 import net.jxta.id.ID;
85 import net.jxta.impl.endpoint.endpointMeter.EndpointMeter;
86 import net.jxta.impl.endpoint.endpointMeter.EndpointMeterBuildSettings;
87 import net.jxta.impl.endpoint.endpointMeter.EndpointServiceMonitor;
88 import net.jxta.impl.endpoint.endpointMeter.InboundMeter;
89 import net.jxta.impl.endpoint.endpointMeter.OutboundMeter;
90 import net.jxta.impl.endpoint.endpointMeter.PropagationMeter;
91 import net.jxta.impl.endpoint.relay.RelayClient;
92 import net.jxta.impl.endpoint.router.EndpointRouter;
93 import net.jxta.impl.endpoint.tcp.TcpTransport;
94 import net.jxta.impl.meter.MonitorManager;
95 import net.jxta.impl.util.SequenceIterator;
96 import net.jxta.logging.Logging;
97 import net.jxta.meter.MonitorResources;
98 import net.jxta.peergroup.PeerGroup;
99 import net.jxta.platform.Module;
100 import net.jxta.protocol.AccessPointAdvertisement;
101 import net.jxta.protocol.ConfigParams;
102 import net.jxta.protocol.ModuleImplAdvertisement;
103 import net.jxta.protocol.PeerAdvertisement;
104 import net.jxta.protocol.RouteAdvertisement;
106 import java.io.IOException;
107 import java.lang.ref.Reference;
108 import java.lang.ref.SoftReference;
109 import java.lang.ref.WeakReference;
110 import java.util.ArrayList;
111 import java.util.Collection;
112 import java.util.Collections;
113 import java.util.Enumeration;
114 import java.util.HashMap;
115 import java.util.HashSet;
116 import java.util.Iterator;
117 import java.util.List;
118 import java.util.Map;
119 import java.util.Vector;
120 import java.util.WeakHashMap;
121 import java.util.logging.Level;
122 import java.util.logging.Logger;
125 * This class implements the frontend for all the JXTA endpoint protocols, as
126 * well as the API for the implementation of the core protocols that directly
127 * use the EndpointService. In theory it only needs to implement core methods;
128 * legacy or convenience methods should stay out. However, that would require
129 * a two-level interface for the service (internal and public). Maybe later.
131 public class EndpointServiceImpl implements EndpointService, MessengerEventListener {
136 private static final Logger LOG = Logger.getLogger(EndpointServiceImpl.class.getName());
141 * The Wire Message Format we will use by default.
143 public static final MimeMediaType DEFAULT_MESSAGE_TYPE = new MimeMediaType("application/x-jxta-msg").intern();
146 * The name of this service.
148 public static final String ENDPOINTSERVICE_NAME = "EndpointService";
151 * The Message empty namespace. This namespace is reserved for use by
152 * applications. It will not be used by core protocols.
154 public static final String MESSAGE_EMPTY_NS = "";
157 * The Message "jxta" namespace. This namespace is reserved for use by
158 * core protocols. It will not be used by applications.
160 public static final String MESSAGE_JXTA_NS = "jxta";
163 * Namespace in which the message source address will be placed.
165 public static final String MESSAGE_SOURCE_NS = MESSAGE_JXTA_NS;
168 * Element name in which the message source address will be placed.
170 public static final String MESSAGE_SOURCE_NAME = "EndpointSourceAddress";
173 * Namespace in which the message destination address will be placed.
175 public static final String MESSAGE_DESTINATION_NS = MESSAGE_JXTA_NS;
178 * Element name in which the message destination address will be placed.
179 * This element is used for loopback detection during propagate. Only
180 * propagate messages currently contain this element.
182 public static final String MESSAGE_DESTINATION_NAME = "EndpointDestinationAddress";
185 * Namespace in which the message source peer address will be placed.
187 public static final String MESSAGE_SRCPEERHDR_NS = MESSAGE_JXTA_NS;
190 * Element name in which the message source peer address will be placed.
191 * This element is used for loopback detection during propagate. Only
192 * propagated messages currently contain this element.
194 public static final String MESSAGE_SRCPEERHDR_NAME = "EndpointHeaderSrcPeer";
197 * Size of the message queue provided by virtual messengers.
199 private final static int DEFAULT_MESSAGE_QUEUE_SIZE = 20;
202 * If {@code true} then the parent endpoint may be used for acquiring
203 * messengers and for registering listeners.
205 private final static boolean DEFAULT_USE_PARENT_ENDPOINT = true;
207 EndpointServiceMonitor endpointServiceMonitor;
212 private EndpointMeter endpointMeter;
213 private PropagationMeter propagationMeter;
216 * If {@code true} then this service has been initialized.
218 private boolean initialized = false;
221 * tunable: the virtual messenger queue size
223 private int vmQueueSize = DEFAULT_MESSAGE_QUEUE_SIZE;
225 private PeerGroup group = null;
226 private ID assignedID = null;
227 private ModuleImplAdvertisement implAdvertisement = null;
229 private String localPeerId = null;
230 private boolean useParentEndpoint = DEFAULT_USE_PARENT_ENDPOINT;
231 private EndpointService parentEndpoint = null;
232 private String myServiceName = null;
235 * The Message Transports which are registered for this endpoint. This is
236 * only the message transport registered locally, it does not include
237 * transports which are used from other groups.
239 private final Collection<MessageTransport> messageTransports = new HashSet<MessageTransport>();
242 * Passive listeners for messengers. Three priorities, so far.
244 private final Collection[] passiveMessengerListeners = {
245 Collections.synchronizedList(new ArrayList<MessengerEventListener>()),
246 Collections.synchronizedList(new ArrayList<MessengerEventListener>()),
247 Collections.synchronizedList(new ArrayList<MessengerEventListener>())
251 * The set of listeners managed by this instance of the endpoint service.
253 private final Map<String, EndpointListener> incomingMessageListeners = new HashMap<String, EndpointListener>(16);
256 * The set of shared transport messengers currently ready for use.
258 private final Map<EndpointAddress, Reference<Messenger>> messengerMap = new WeakHashMap<EndpointAddress, Reference<Messenger>>(32);
260 * The set of shared transport messengers currently ready for use.
262 private final Map<EndpointAddress, Reference<Messenger>> directMessengerMap = new WeakHashMap<EndpointAddress, Reference<Messenger>>(32);
265 * The filter listeners.
267 * We rarely add/remove, never remove without iterating
268 * and insert objects that are always unique. So using a set
269 * does not make sense. An array list is the best.
271 private final Collection<FilterListenerAndMask> incomingFilterListeners = new ArrayList<FilterListenerAndMask>();
272 private final Collection<FilterListenerAndMask> outgoingFilterListeners = new ArrayList<FilterListenerAndMask>();
275 * Holder for a filter listener and its conditions
277 private static class FilterListenerAndMask {
278 final String namespace;
280 final MessageFilterListener listener;
282 public FilterListenerAndMask(MessageFilterListener listener, String namespace, String name) {
283 this.namespace = namespace;
285 this.listener = listener;
289 public boolean equals(Object target) {
290 if (this == target) {
294 if (target instanceof FilterListenerAndMask) {
295 FilterListenerAndMask likeMe = (FilterListenerAndMask) target;
297 boolean result = (null != namespace) ? (namespace.equals(likeMe.namespace)) : (null == likeMe.namespace);
299 result &= (null != name) ? (name.equals(likeMe.name)) : (null == likeMe.name);
300 result &= (listener == likeMe.listener);
311 * Added to make PMD shut up....
314 public int hashCode() {
315 return System.identityHashCode(this);
// ThreadedMessenger subclass that forwards to a transport messenger
// (cachedMessenger), which connectImpl() obtains through
// getLocalTransportMessenger() when one is needed.
321 * A non blocking messenger that obtains a backing (possibly blocking)
322 * messenger on-demand.
324 private class CanonicalMessenger extends ThreadedMessenger {
327 * If the hint was not used because there already was a transport
328 * messenger available, then it is saved here for the next time we are
329 * forced to create a new transport messenger by the breakage of the one
332 * The management of hints is a bit inconsistent for now: the hint
333 * used may be different dependent upon which invocation created the
334 * current canonical messenger and, although we try to use the hint only
335 * once (to avoid carrying an invalid hint forever) it may happen that a
336 * hint is used long after it was suggested.
341 * The transport messenger that this canonical messenger currently uses.
343 Messenger cachedMessenger = null;
346 * Create a new CanonicalMessenger.
348 * @param vmQueueSize queue size
349 * @param destination destination to which messages should be addressed
350 * @param logicalDestination logical destination
351 * @param hint route hint
352 * @param messengerMeter the metering object if any
354 public CanonicalMessenger(int vmQueueSize, EndpointAddress destination, EndpointAddress logicalDestination, Object hint, OutboundMeter messengerMeter) {
355 super(group.getPeerGroupID(), destination, logicalDestination, vmQueueSize);
360 * close this canonical messenger.
// External close requests are deliberately ignored (see the comment in the body).
363 public void close() {
364 // No way. Not from the outside.
368 * Drop the current messenger.
// Closes and forgets the cached transport messenger when there is one.
371 protected void closeImpl() {
372 if (cachedMessenger != null) {
373 cachedMessenger.close();
374 cachedMessenger = null;
376 if (Logging.SHOW_SEVERE && LOG.isLoggable(Level.SEVERE)) {
377 LOG.severe("Internal messenger error: close requested while not connected.");
383 * Get a transport messenger to the destination.
385 * FIXME 20040413 jice : Do better hint management.
388 protected boolean connectImpl() {
389 if (cachedMessenger != null) {
// A cached messenger in a TERMINAL state is closed and dropped so that a
// fresh one is requested below.
390 if ((cachedMessenger.getState() & Messenger.TERMINAL) != 0) {
// The guard level now matches the level of the call it protects; it used
// to test Level.SEVERE while logging at FINE.
391 if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
392 LOG.fine("Closing TERMINAL internal messenger : attempting requested connect.");
394 cachedMessenger.close();
395 cachedMessenger = null;
401 // Consume the hint, if any.
402 Object theHint = hint;
405 cachedMessenger = getLocalTransportMessenger(getDestinationAddress(), theHint);
407 if (cachedMessenger == null) {
411 // FIXME 20040413 jice : it's not too clean: we assume
412 // that all transports use BlockingMessenger as the base class for
413 // their messengers. If they don't we can't force them to hold the
414 // strong reference to the canonical messenger.
416 ((BlockingMessenger) cachedMessenger).setOwner(this);
417 } catch (ClassCastException cce) {
418 if (Logging.SHOW_SEVERE && LOG.isLoggable(Level.SEVERE)) {
419 LOG.severe("Transport messengers must all extend BlockingMessenger for now. " +
420 cachedMessenger + " may remain open beyond its use.");
// Delegates to the cached transport messenger; being asked while not
// connected is logged as an internal error.
430 protected EndpointAddress getLogicalDestinationImpl() {
431 if (cachedMessenger == null) {
432 if (Logging.SHOW_SEVERE && LOG.isLoggable(Level.SEVERE)) {
433 LOG.severe("Internal messenger error: logical destination requested while not connected.");
437 return cachedMessenger.getLogicalDestinationAddress();
// Sends through the cached transport messenger. Throws IOException when
// there is no cached messenger.
444 protected void sendMessageBImpl(Message msg, String service, String param) throws IOException {
445 if (cachedMessenger == null) {
446 if (Logging.SHOW_SEVERE && LOG.isLoggable(Level.SEVERE)) {
447 LOG.severe("Internal messenger error: send requested while not connected.");
449 throw new IOException("Internal messenger error.");
453 cachedMessenger.sendMessageB(msg, service, param);
454 } catch (IOException any) {
// An I/O failure makes us forget the transport messenger.
455 cachedMessenger = null;
457 } catch (RuntimeException any) {
458 if (Logging.SHOW_WARNING && LOG.isLoggable(Level.WARNING)) {
459 LOG.log(Level.WARNING, "Failure sending " + msg, any);
468 * Create a new EndpointService.
470 public EndpointServiceImpl() {
// Configures the service: records the assigned ID, the implementation
// advertisement, the local peer ID and the group-specific service name, reads
// the two tunables (MessengerQueueSize, UseParentEndpoint) from this service's
// parameter block in the group configuration, optionally attaches to the
// parent group's endpoint as a low-precedence messenger listener, and logs the
// resulting configuration at CONFIG level.
// A PeerGroupException is thrown for a repeated initialization (see message).
476 public synchronized void init(PeerGroup group, ID assignedID, Advertisement impl) throws PeerGroupException {
478 throw new PeerGroupException("Cannot initialize service more than once");
482 // The selector for the element of the peer adv params that we have to update.
483 this.assignedID = assignedID;
484 this.implAdvertisement = (ModuleImplAdvertisement) impl;
486 this.localPeerId = group.getPeerID().toString();
488 this.myServiceName = ChannelMessenger.InsertedServicePrefix + group.getPeerGroupID().getUniqueValue().toString();
490 ConfigParams confAdv = group.getConfigAdvertisement();
491 XMLElement paramBlock = null;
493 if (confAdv != null) {
494 paramBlock = (XMLElement) confAdv.getServiceParam(assignedID);
497 if (paramBlock != null) {
498 // get our two tunables: virtual messenger queue size, and whether to use the parent endpoint
501 param = paramBlock.getChildren("MessengerQueueSize");
502 if (param.hasMoreElements()) {
503 String textQSz = ((XMLElement) param.nextElement()).getTextValue();
// NOTE(review): getTextValue() is assumed to return null for an element with
// no text — confirm against its Javadoc. A null is mapped to 0 so it takes the
// non-positive path below instead of failing on trim() with a
// NullPointerException. A primitive int avoids boxing the parseInt result.
506 int requestedSize = (null == textQSz) ? 0 : Integer.parseInt(textQSz.trim());
// Only a strictly positive size replaces the default queue size.
508 if (requestedSize > 0) {
509 vmQueueSize = requestedSize;
511 LOG.warning("Illegal MessengerQueueSize : " + textQSz);
513 } catch (NumberFormatException e) {
514 if (Logging.SHOW_WARNING && LOG.isLoggable(Level.WARNING)) {
515 LOG.log(Level.WARNING, "could not parse MessengerQueueSize string", e);
520 param = paramBlock.getChildren("UseParentEndpoint");
521 if (param.hasMoreElements()) {
522 String textUPE = ((XMLElement) param.nextElement()).getTextValue();
// An element without text keeps the current setting rather than throwing on
// trim(); any text other than "true" (case-insensitive) disables the parent.
524 useParentEndpoint = (null == textUPE) ? useParentEndpoint : textUPE.trim().equalsIgnoreCase("true");
529 PeerGroup parentGroup = group.getParentGroup();
// The parent endpoint is only used when enabled and a parent group exists.
531 if (useParentEndpoint && parentGroup != null) {
532 parentEndpoint = parentGroup.getEndpointService();
533 parentEndpoint.addMessengerEventListener(this, EndpointService.LowPrecedence);
538 if (Logging.SHOW_CONFIG && LOG.isLoggable(Level.CONFIG)) {
539 StringBuilder configInfo = new StringBuilder("Configuring Endpoint Service : " + assignedID);
541 if (implAdvertisement != null) {
542 configInfo.append("\n\tImplementation :");
543 configInfo.append("\n\t\tModule Spec ID: ");
544 configInfo.append(implAdvertisement.getModuleSpecID());
545 configInfo.append("\n\t\tImpl Description : ").append(implAdvertisement.getDescription());
546 configInfo.append("\n\t\tImpl URI : ").append(implAdvertisement.getUri());
547 configInfo.append("\n\t\tImpl Code : ").append(implAdvertisement.getCode());
550 configInfo.append("\n\tGroup Params :");
551 configInfo.append("\n\t\tGroup : ").append(group);
552 configInfo.append("\n\t\tPeer ID : ").append(group.getPeerID());
554 configInfo.append("\n\tConfiguration :");
555 if (null == parentGroup) {
556 configInfo.append("\n\t\tHome Group : (none)");
558 configInfo.append("\n\t\tHome Group : ").append(parentGroup.getPeerGroupName()).append(" / ").append(
559 parentGroup.getPeerGroupID());
561 configInfo.append("\n\t\tUsing home group endpoint : ").append(parentEndpoint);
562 configInfo.append("\n\t\tVirtual Messenger Queue Size : ").append(vmQueueSize);
563 LOG.config(configInfo.toString());
570 public int startApp(String[] args) {
575 // FIXME when Load order Issue is resolved this should fail
576 // until it is able to get a non-failing service Monitor (or
577 // null = not monitoring)
578 // FIXME it is ok because of the hack in StdPeerGroup that starts
579 // endpoint service first
580 if (EndpointMeterBuildSettings.ENDPOINT_METERING) { // Fix-Me: Move to startApp() when load order issue is resolved
581 endpointServiceMonitor = (EndpointServiceMonitor) MonitorManager.getServiceMonitor(group, MonitorResources.endpointServiceMonitorClassID);
583 if (endpointServiceMonitor != null) {
584 endpointMeter = endpointServiceMonitor.getEndpointMeter();
588 if (parentEndpoint != null) {
589 Iterator<MessageTransport> parentMTs = parentEndpoint.getAllMessageTransports();
591 synchronized (this) {
592 while (parentMTs.hasNext()) {
593 addProtoToAdv(parentMTs.next());
598 if (Logging.SHOW_INFO && LOG.isLoggable(Level.INFO)) {
599 LOG.info("Endpoint Service started.");
602 return Module.START_OK;
// Stops the service: detaches from the parent endpoint (if any) and empties
// every listener, messenger, filter and transport collection held here.
608 * The transports and services are going to be stopped as well. When
609 * they are, they will dereference us and we'll go into oblivion.
611 public void stopApp() {
// Undo the low-precedence registration made against the parent endpoint.
612 if (parentEndpoint != null) {
613 parentEndpoint.removeMessengerEventListener(this, EndpointService.LowPrecedence);
616 // Clear up the passiveMessengersListeners
// Walks the precedence values from HighPrecedence down to LowPrecedence and
// uses each one directly as an index into the listener array.
// NOTE(review): assumes those constants are valid indices of the
// three-element array — confirm against EndpointService.
617 int prec = EndpointService.HighPrecedence;
618 while (prec >= EndpointService.LowPrecedence) {
619 passiveMessengerListeners[prec--].clear();
622 // Clear up any messengers.
623 messengerMap.clear();
624 directMessengerMap.clear();
626 // Clear up the listeners
627 incomingMessageListeners.clear();
629 // Forget about any message filters.
630 incomingFilterListeners.clear();
631 outgoingFilterListeners.clear();
633 // Forget any message transports
634 messageTransports.clear();
636 // Avoid cross-reference problems with the GC
639 // parentEndpoint = null;
640 // parentGroup = null;
642 // The above is not really needed and until we have a very orderly
643 // shutdown, it causes NPEs that are hard to prevent.
645 if (Logging.SHOW_INFO && LOG.isLoggable(Level.INFO)) {
646 LOG.info("Endpoint Service stopped.");
653 public PeerGroup getGroup() {
660 * We create a new instance each time because our interface actually
661 * has state (channel messengers and listener callback adaptor).
663 public EndpointService getInterface() {
664 return new EndpointServiceInterface(this);
670 public ModuleImplAdvertisement getImplAdvertisement() {
671 return implAdvertisement;
674 // A vector for statistics between propagateThroughAll and its invoker.
675 private static class Metrics {
676 int numFilteredOut = 0;
677 int numPropagatedTo = 0;
678 int numErrorsPropagated = 0;
// Offers myMsg to every transport in eachProto that implements
// MessagePropagater. The outgoing filters are run once, on first use; each
// propagater then receives its own clone of the filtered message. Outcome
// counters in metrics are only updated when endpoint metering is compiled in.
681 private void propagateThroughAll(Iterator<MessageTransport> eachProto, Message myMsg, String serviceName, String serviceParam, int initialTTL, Metrics metrics) {
// Result of the filter pass; null until the filters have been run.
683 Message filtered = null;
685 while (eachProto.hasNext()) {
686 MessageTransport aTransport = eachProto.next();
// Only transports able to propagate are of interest (the branch body is not
// visible here; presumably it moves on to the next transport).
689 if (!(aTransport instanceof MessagePropagater)) {
693 MessagePropagater propagater = (MessagePropagater) aTransport;
695 if (null == filtered) {
696 // run process filters only once
697 filtered = processFilters(myMsg,
698 propagater.getPublicAddress(),
699 new EndpointAddress(group.getPeerGroupID(), serviceName, serviceParam),
// Still null after filtering: a filter rejected the message.
703 if (null == filtered) {
704 if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
705 LOG.fine(" message " + myMsg + " discarded upon filter decision");
708 if (EndpointMeterBuildSettings.ENDPOINT_METERING) {
709 metrics.numFilteredOut++;
// Each propagater gets a private copy of the filtered message.
714 propagater.propagate(filtered.clone(), serviceName, serviceParam, initialTTL);
716 if (EndpointMeterBuildSettings.ENDPOINT_METERING) {
717 metrics.numPropagatedTo++;
// A failing transport is logged and counted.
719 } catch (Exception e) {
720 if (Logging.SHOW_WARNING && LOG.isLoggable(Level.WARNING)) {
721 LOG.log(Level.WARNING, "Failed propagating message " + filtered + " on message transport " + aTransport, e);
724 if (EndpointMeterBuildSettings.ENDPOINT_METERING) {
725 metrics.numErrorsPropagated++;
734 public void propagate(Message msg, String serviceName, String serviceParam) {
735 propagate(msg, serviceName, serviceParam, Integer.MAX_VALUE);
// Propagates msg to serviceName/serviceParam with the given initial TTL: first
// through the local transports using the plain address, then — when a parent
// endpoint is in use — through the parent's transports using this group's
// service name with the original service name (and parameter) folded into the
// parameter. Throws IllegalArgumentException when serviceName is null.
741 public void propagate(Message msg, String serviceName, String serviceParam, int initialTTL) {
742 long startPropagationTime = 0;
744 if (null == serviceName) {
745 throw new IllegalArgumentException("serviceName may not be null");
// Counters are only allocated when endpoint metering is compiled in.
748 Metrics metrics = null;
750 if (EndpointMeterBuildSettings.ENDPOINT_METERING) {
751 metrics = new Metrics();
754 // Keep the orig unchanged for metering reference and caller's benefit, but
755 // we are forced to clone it here, because we add a header.
758 if (EndpointMeterBuildSettings.ENDPOINT_METERING) {
759 startPropagationTime = System.currentTimeMillis();
// Stamp the message with the local peer ID; processIncomingMessage() uses
// this header to recognise and drop our own propagated messages.
763 MessageElement srcHdrElement = new StringMessageElement(EndpointServiceImpl.MESSAGE_SRCPEERHDR_NAME, localPeerId, null);
765 msg.replaceMessageElement(EndpointServiceImpl.MESSAGE_SRCPEERHDR_NS, srcHdrElement);
767 // Do the local transports with the plain address.
768 Iterator<MessageTransport> eachProto = getAllLocalTransports();
770 propagateThroughAll(eachProto, msg.clone(), serviceName, serviceParam, initialTTL, metrics);
772 // Do the parent transports with a mangled address.
773 if (parentEndpoint != null) {
774 eachProto = parentEndpoint.getAllMessageTransports();
776 StringBuilder mangled = new StringBuilder(serviceName);
777 if (null != serviceParam) {
779 mangled.append(serviceParam);
782 propagateThroughAll(eachProto, msg.clone(), myServiceName, mangled.toString(), initialTTL, metrics);
785 if (EndpointMeterBuildSettings.ENDPOINT_METERING && (endpointServiceMonitor != null)) {
// The local is named differently from the propagationMeter field so that it
// no longer shadows it.
786 PropagationMeter statsMeter = endpointServiceMonitor.getPropagationMeter(serviceName, serviceParam);
788 statsMeter.registerPropagateMessageStats(metrics.numPropagatedTo, metrics.numFilteredOut, metrics.numErrorsPropagated,
789 System.currentTimeMillis() - startPropagationTime);
// Runs the incoming or outgoing filter listeners (chosen by the incoming flag)
// over the message. For each filter the message elements are walked; a null
// namespace or name on the filter disables that part of the match. A matching
// filter's listener may replace the message; a null result is tested for
// right after the call.
794 * Process the filters for this message.
796 private Message processFilters(Message message, EndpointAddress srcAddress, EndpointAddress dstAddress, boolean incoming) {
798 Iterator<FilterListenerAndMask> eachFilter = incoming
799 ? incomingFilterListeners.iterator()
800 : outgoingFilterListeners.iterator();
802 while (eachFilter.hasNext()) {
803 FilterListenerAndMask aFilter = eachFilter.next();
805 Message.ElementIterator eachElement = message.getMessageElements();
807 while (eachElement.hasNext()) {
808 MessageElement anElement = eachElement.next();
// Namespace mask set and different from this element's namespace.
810 if ((null != aFilter.namespace) && (!aFilter.namespace.equals(eachElement.getNamespace()))) {
// Name mask set and different from this element's name.
814 if ((null != aFilter.name) && (!aFilter.name.equals(anElement.getElementName()))) {
// The listener's return value becomes the message used from here on.
818 message = aFilter.listener.filterMessage(message, srcAddress, dstAddress);
820 if (null == message) {
826 // If we got here, no filter has rejected the message. Keep processing it.
// Recovers the original service name/parameter from an address whose service
// name starts with ChannelMessenger.InsertedServicePrefix. In such an address
// the service parameter carries "<name>" or "<name>/<param>"; it is split at
// the first '/'. Addresses without a service name or without the prefix are
// treated as not mangled.
830 private static EndpointAddress demangleAddress(EndpointAddress mangled) {
831 String serviceName = mangled.getServiceName();
833 if (null == serviceName) {
834 // not a mangled address
838 if (!serviceName.startsWith(ChannelMessenger.InsertedServicePrefix)) {
839 // not a mangled address
843 String serviceParam = mangled.getServiceParameter();
845 if (null == serviceParam) {
846 // it has no param, it's a null destination.
847 // XXX bondolo 20050907 I'm not sure this is correct.
848 return new EndpointAddress(mangled, null, null);
// Position of the separator between the original name and its parameter.
851 int slashAt = serviceParam.indexOf('/');
854 // param has no param portion.
855 return new EndpointAddress(mangled, serviceParam, null);
// Text before the first '/' is the service name, the rest is its parameter.
858 return new EndpointAddress(mangled, serviceParam.substring(0, slashAt), serviceParam.substring(slashAt + 1));
// Delivers an incoming message to the listener registered for its destination.
// Steps visible below: strip the source-peer header and detect propagate
// loopback, reject null source/destination addresses, demangle the
// destination, run the incoming filters, look up the listener and invoke it.
// Each outcome is reported to endpointMeter when metering is compiled in and
// a meter is present.
864 public void processIncomingMessage(Message msg, EndpointAddress srcAddress, EndpointAddress dstAddress) {
866 // check for propagate loopback.
867 MessageElement srcPeerElement = msg.getMessageElement(EndpointServiceImpl.MESSAGE_SRCPEERHDR_NS, EndpointServiceImpl.MESSAGE_SRCPEERHDR_NAME);
869 if (null != srcPeerElement) {
// The header is consumed here whether or not it names this peer.
870 msg.removeMessageElement(srcPeerElement);
871 String srcPeer = srcPeerElement.toString();
873 if (localPeerId.equals(srcPeer)) {
874 // This is a loopback. Discard.
875 if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
876 LOG.fine(msg + " is a propagate loopback. Discarded");
879 if (EndpointMeterBuildSettings.ENDPOINT_METERING && (endpointMeter != null)) {
880 endpointMeter.discardedLoopbackDemuxMessage();
887 if (null == srcAddress) {
888 if (Logging.SHOW_WARNING && LOG.isLoggable(Level.WARNING)) {
889 LOG.warning("null src address, discarding message " + msg);
892 if (EndpointMeterBuildSettings.ENDPOINT_METERING && (endpointMeter != null)) {
893 endpointMeter.invalidIncomingMessage();
899 if (null == dstAddress) {
900 if (Logging.SHOW_WARNING && LOG.isLoggable(Level.WARNING)) {
901 LOG.warning("null destination address, discarding message " + msg);
904 if (EndpointMeterBuildSettings.ENDPOINT_METERING && (endpointMeter != null)) {
905 endpointMeter.invalidIncomingMessage();
911 // Decode the destination address.
913 // 1 - a version of the address that does not have the grp redirection.
914 // 2 - a version of the serviceName that includes BOTH the group redirection and the original service name.
915 // 3 - the original service param; without the original service name stuck to it.
916 // So, basically we want the original serviceName part stuck to the group mangling, not stuck to the original
917 // serviceParam. We do that by cut/pasting from both the mangled and demangled versions of the address.
919 EndpointAddress demangledAddress = demangleAddress(dstAddress);
920 String decodedServiceName = demangledAddress.getServiceName();
921 String decodedServiceParam = demangledAddress.getServiceParameter();
// A missing or empty service name cannot be routed to any listener.
923 if ((null == decodedServiceName) || (0 == decodedServiceName.length())) {
924 if (Logging.SHOW_WARNING && LOG.isLoggable(Level.WARNING)) {
925 LOG.warning("dest serviceName must not be null, discarding message " + msg);
928 if (EndpointMeterBuildSettings.ENDPOINT_METERING && (endpointMeter != null)) {
929 endpointMeter.invalidIncomingMessage();
935 // Do filters for this message:
936 // FIXME - jice 20040417 : filters are likely broken, now. They do not see messages
937 // from xports in parent groups. For those messages that are seen, demangled address seems to be the useful one.
938 msg = processFilters(msg, srcAddress, demangledAddress, true);
940 // If processFilters returns null, the message is to be discarded.
942 if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
943 LOG.fine("Message discarded during filter processing");
946 if (EndpointMeterBuildSettings.ENDPOINT_METERING && (endpointMeter != null)) {
947 endpointMeter.incomingMessageFilteredOut();
953 // Now that we know the original service name is valid, finish building the decoded version.
// The reference comparison detects whether demangleAddress() produced a new
// address; if so the mangled service name is prefixed to the decoded one.
954 if (demangledAddress != dstAddress) {
955 decodedServiceName = dstAddress.getServiceName() + "/" + decodedServiceName;
958 // Look up the listener
959 EndpointListener listener = getIncomingMessageListener(decodedServiceName, decodedServiceParam);
961 // No listener? oh well.
963 if (listener == null) {
964 if (Logging.SHOW_WARNING && LOG.isLoggable(Level.WARNING)) {
965 LOG.warning("No listener for \'" + dstAddress + "\' in group " +
966 group + "\n\tdecodedServiceName :" +
967 decodedServiceName + "\tdecodedServiceParam :" +
968 decodedServiceParam);
971 if (EndpointMeterBuildSettings.ENDPOINT_METERING && (endpointMeter != null)) {
972 endpointMeter.noListenerForIncomingMessage();
975 return; // no one cares for this message
981 if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
982 if (null != decodedServiceParam) {
983 LOG.fine("Calling listener for \'" + decodedServiceName + "/" + decodedServiceParam + "\' with " + msg);
985 LOG.fine("Calling listener for \'" + decodedServiceName + "\' with " + msg);
// The listener is handed the demangled destination address.
989 listener.processIncomingMessage(msg, srcAddress, demangledAddress);
991 if (EndpointMeterBuildSettings.ENDPOINT_METERING && (endpointMeter != null)) {
992 endpointMeter.incomingMessageSentToEndpointListener();
995 if (EndpointMeterBuildSettings.ENDPOINT_METERING && (endpointMeter != null)) {
996 endpointMeter.demuxMessageProcessed();
// Anything thrown by the listener is caught, logged at SEVERE and counted.
998 } catch (Throwable all) {
999 if (Logging.SHOW_SEVERE && LOG.isLoggable(Level.SEVERE)) {
1000 LOG.log(Level.SEVERE, "Uncaught throwable from listener for " + dstAddress, all);
1003 if (EndpointMeterBuildSettings.ENDPOINT_METERING && (endpointMeter != null)) {
1004 endpointMeter.errorProcessingIncomingMessage();
// Extracts the destination and source address elements carried inside the
// message, removes them from the message, and hands the message with the two
// parsed addresses to processIncomingMessage(). A message lacking either
// element is logged and counted as discarded.
1012 public void demux(Message msg) {
1014 // Get the message destination
1015 MessageElement dstAddressElement = msg.getMessageElement(EndpointServiceImpl.MESSAGE_DESTINATION_NS,
1016 EndpointServiceImpl.MESSAGE_DESTINATION_NAME);
1018 if (null == dstAddressElement) {
1019 // No destination address... Just discard
1020 if (Logging.SHOW_WARNING && LOG.isLoggable(Level.WARNING)) {
1021 LOG.warning(msg + " has no destination address. Discarded");
1024 if (EndpointMeterBuildSettings.ENDPOINT_METERING && (endpointMeter != null)) {
1025 endpointMeter.noDestinationAddressForDemuxMessage();
// The element's text form is parsed as the destination address.
1031 msg.removeMessageElement(dstAddressElement);
1032 EndpointAddress dstAddress = new EndpointAddress(dstAddressElement.toString());
1034 // Get the message source
1035 MessageElement srcAddressElement = msg.getMessageElement(EndpointServiceImpl.MESSAGE_SOURCE_NS, EndpointServiceImpl.MESSAGE_SOURCE_NAME);
1037 if (null == srcAddressElement) {
1038 // No src address... Just discard
1039 if (Logging.SHOW_WARNING && LOG.isLoggable(Level.WARNING)) {
1040 LOG.warning(msg + " has no source address. Discarded");
1043 if (EndpointMeterBuildSettings.ENDPOINT_METERING && (endpointMeter != null)) {
1044 endpointMeter.noSourceAddressForDemuxMessage();
// Same treatment for the source address element.
1050 msg.removeMessageElement(srcAddressElement);
1051 EndpointAddress msgScrAddress = new EndpointAddress(srcAddressElement.toString());
1053 processIncomingMessage(msg, msgScrAddress, dstAddress);
1059 public MessengerEventListener addMessageTransport(MessageTransport transpt) {
1061 synchronized (messageTransports) {
1062 // check if it is already installed.
1063 if (!messageTransports.contains(transpt)) {
1065 clearProtoFromAdv(transpt); // just to be safe
1066 messageTransports.add(transpt);
1067 addProtoToAdv(transpt);
1069 // FIXME: For now, we return this. Later we might return something else, so that we can take
1070 // advantage of the fact that we know that the event is from a local transport.
1071 // That will help cleaning up the incoming messenger mess.
1082 public boolean removeMessageTransport(MessageTransport transpt) {
1086 synchronized (messageTransports) {
1087 removed = messageTransports.remove(transpt);
1091 clearProtoFromAdv(transpt);
1100 public Iterator<MessageTransport> getAllMessageTransports() {
1101 if (null != parentEndpoint) {
1102 return new SequenceIterator(getAllLocalTransports(), parentEndpoint.getAllMessageTransports());
1104 return getAllLocalTransports();
1111 public MessageTransport getMessageTransport(String name) {
1112 Iterator<MessageTransport> allTransports = getAllMessageTransports();
1114 while (allTransports.hasNext()) {
1115 MessageTransport transpt = allTransports.next();
1117 if (transpt.getProtocolName().equals(name)) {
// Publishes the public addresses of a message transport in this peer's
// advertisement.
//
// The addresses are merged into the RouteAdvertisement stored under this
// service's assignedID in the peer advertisement (a new route advertisement
// is created when none is stored yet), the service parameter is rewritten,
// and the peer advertisement is republished through the discovery service
// when one is available. Exceptions are logged at SEVERE and not propagated.
//
// NOTE(review): several short lines (block terminators, early returns, the
// "try", and the statements that assign and test "relay") are not visible in
// this view of the file; confirm the exact control flow against the complete
// source before changing this method.
1125 private void addProtoToAdv(MessageTransport proto) {
// Presumably records whether proto is a RelayClient so that the casts further
// down are guarded; the assignment is not visible here - TODO confirm.
1127 boolean relay = false;
// Only MessageReceiver transports expose public addresses (see the cast below).
1130 if (!(proto instanceof MessageReceiver)) {
1134 // no value to publish for the router endpoint address
1135 if (proto instanceof EndpointRouter) {
1136 // register the corresponding group to relay connection events
1137 addActiveRelayListener(group);
1141 // register this group to Relay connection events
1142 if (proto instanceof RelayClient) {
1144 ((RelayClient) proto).addActiveRelayListener(group);
1147 // get the list of addresses
1148 Iterator<EndpointAddress> allAddresses = ((MessageReceiver) proto).getPublicAddresses();
1149 Vector<String> ea = new Vector<String>();
// Collect the addresses in their string form, which is what the
// advertisement setters below are given.
1151 while (allAddresses.hasNext()) {
1152 EndpointAddress anEndpointAddress = allAddresses.next();
1154 if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
1155 LOG.fine("Adding endpoint address to route advertisement : " + anEndpointAddress);
1158 ea.add(anEndpointAddress.toString());
// Look up the parameter currently stored for this service in the peer advertisement.
1161 PeerAdvertisement padv = group.getPeerAdvertisement();
1162 StructuredDocument myParam = padv.getServiceParam(assignedID);
1164 RouteAdvertisement route = null;
1166 if (myParam != null) {
1167 Enumeration paramChilds = myParam.getChildren(RouteAdvertisement.getAdvertisementType());
1169 if (paramChilds.hasMoreElements()) {
1170 // we have an advertisement just add the new access points
1171 XMLElement param = (XMLElement) paramChilds.nextElement();
1173 route = (RouteAdvertisement) AdvertisementFactory.newAdvertisement(param);
1174 route.addDestEndpointAddresses(ea);
1176 // need to add the relay info if we have some
// NOTE(review): this cast is only safe when proto is a RelayClient; a guard
// is presumably on a line not visible here - TODO confirm.
1177 Vector<AccessPointAdvertisement> hops = ((RelayClient) proto).getActiveRelays(group);
1179 if (!hops.isEmpty()) {
1180 route.setHops(hops);
1186 if (null == route) {
1187 // None yet, so create a new Route Advertisement
1188 // create the RouteAdvertisement that will contain the route to
1189 // the peer. At this point we only know the peer endpoint addresses
1190 // no hops are known
1192 // create the destination access point
1193 AccessPointAdvertisement destAP = (AccessPointAdvertisement) AdvertisementFactory.newAdvertisement(
1194 AccessPointAdvertisement.getAdvertisementType());
1196 destAP.setPeerID(group.getPeerID());
1197 destAP.setEndpointAddresses(ea);
1199 // create the route advertisement
1200 route = (RouteAdvertisement) AdvertisementFactory.newAdvertisement(RouteAdvertisement.getAdvertisementType());
1201 route.setDest(destAP);
1204 // need to add the relay info if we have some
// Same RelayClient cast as in the branch above; the same caveat applies.
1205 Vector<AccessPointAdvertisement> hops = ((RelayClient) proto).getActiveRelays(group);
1207 if (!hops.isEmpty()) {
1208 route.setHops(hops);
1213 // create the param route
// The route document is copied into a fresh "Parm" document, which then
// replaces the stored service parameter.
1214 XMLDocument newParam = (XMLDocument) StructuredDocumentFactory.newStructuredDocument(MimeMediaType.XMLUTF8, "Parm");
1215 XMLDocument xptDoc = (XMLDocument) route.getDocument(MimeMediaType.XMLUTF8);
1217 StructuredDocumentUtils.copyElements(newParam, newParam, xptDoc);
1219 padv.putServiceParam(assignedID, newParam);
1221 // publish the new advertisement
1222 DiscoveryService discovery = group.getDiscoveryService();
// Publishing is skipped when the group has no discovery service.
1224 if (discovery != null) {
1225 discovery.publish(padv, DiscoveryService.INFINITE_LIFETIME, DiscoveryService.DEFAULT_EXPIRATION);
1227 } catch (Exception ex) {
1228 if (Logging.SHOW_SEVERE && LOG.isLoggable(Level.SEVERE)) {
1229 LOG.log(Level.SEVERE, "Exception adding message transport ", ex);
// Withdraws the public addresses of a message transport from this peer's
// advertisement.
//
// The transport's addresses are removed from the RouteAdvertisement stored
// under this service's assignedID in the peer advertisement, the service
// parameter is rewritten, and the peer advertisement is republished through
// the discovery service when one is available. Exceptions are logged at
// SEVERE and not propagated.
//
// NOTE(review): several short lines (block terminators, early returns and the
// "try") are not visible in this view of the file; confirm the exact control
// flow against the complete source before changing this method.
1234 private void clearProtoFromAdv(MessageTransport transpt) {
// Only MessageReceiver transports expose public addresses (see the cast below).
1237 if (!(transpt instanceof MessageReceiver)) {
1241 // no value to publish the router endpoint address
1242 if (transpt instanceof EndpointRouter) {
1243 // register the corresponding group in the relay
// NOTE: despite the wording of the comment above, the call below REMOVES the
// group as an active relay listener.
1244 removeActiveRelayListener(group);
1248 // register this group to Relay connection events
// NOTE: likewise, the call below unregisters the group from the relay client.
1249 if (transpt instanceof RelayClient) {
1250 ((RelayClient) transpt).removeActiveRelayListener(group);
1253 Iterator<EndpointAddress> allAddresses = ((MessageReceiver) transpt).getPublicAddresses();
1254 Vector<String> ea = new Vector<String>();
// Collect the addresses in their string form for removal from the route.
1256 while (allAddresses.hasNext()) {
1257 EndpointAddress anEndpointAddress = allAddresses.next();
1259 if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
1260 LOG.fine("Removing endpoint address from route advertisement : " + anEndpointAddress);
1263 ea.add(anEndpointAddress.toString());
1266 PeerAdvertisement padv = group.getPeerAdvertisement();
1267 XMLDocument myParam = (XMLDocument) padv.getServiceParam(assignedID);
// Without a stored parameter there is nothing to clear
// (presumably an early return follows - TODO confirm).
1269 if (myParam == null) {
1273 Enumeration paramChilds = myParam.getChildren(RouteAdvertisement.getAdvertisementType());
// Likewise when the parameter holds no route advertisement.
1275 if (!paramChilds.hasMoreElements()) {
1279 XMLElement param = (XMLElement) paramChilds.nextElement();
1281 RouteAdvertisement route = (RouteAdvertisement) AdvertisementFactory.newAdvertisement(param);
1283 route.removeDestEndpointAddresses(ea);
1285 // update the new route to a new parm structure.
1286 XMLDocument newParam = (XMLDocument) StructuredDocumentFactory.newStructuredDocument(MimeMediaType.XMLUTF8, "Parm");
1288 XMLDocument xptDoc = (XMLDocument) route.getDocument(MimeMediaType.XMLUTF8);
1290 StructuredDocumentUtils.copyElements(newParam, newParam, xptDoc);
1292 // put the parms back.
1293 padv.putServiceParam(assignedID, newParam);
1295 // publish the new advertisement
1296 DiscoveryService discovery = group.getDiscoveryService();
// Publishing is skipped when the group has no discovery service.
1298 if (discovery != null) {
1299 discovery.publish(padv, DiscoveryService.INFINITE_LIFETIME, DiscoveryService.DEFAULT_EXPIRATION);
1301 } catch (Exception ex) {
1302 if (Logging.SHOW_SEVERE && LOG.isLoggable(Level.SEVERE)) {
1303 LOG.log(Level.SEVERE, "Exception removing messsage transport ", ex);
1311 public boolean addMessengerEventListener(MessengerEventListener listener, int prio) {
1312 int priority = prio;
1314 if (priority > EndpointService.HighPrecedence) {
1315 priority = EndpointService.HighPrecedence;
1318 if (priority < EndpointService.LowPrecedence) {
1319 priority = EndpointService.LowPrecedence;
1322 return passiveMessengerListeners[priority].add(listener);
1328 public boolean removeMessengerEventListener(MessengerEventListener listener, int prio) {
1329 int priority = prio;
1331 if (priority > EndpointService.HighPrecedence) {
1332 priority = EndpointService.HighPrecedence;
1334 if (priority < EndpointService.LowPrecedence) {
1335 priority = EndpointService.LowPrecedence;
1338 return passiveMessengerListeners[priority].remove(listener);
1344 public boolean addIncomingMessageListener(EndpointListener listener, String serviceName, String serviceParam) {
1346 if (null == listener) {
1347 throw new IllegalArgumentException("EndpointListener must be non-null");
1350 if (null == serviceName) {
1351 throw new IllegalArgumentException("serviceName must not be null");
1354 if (-1 != serviceName.indexOf('/')) {
1355 throw new IllegalArgumentException("serviceName may not contain '/' characters");
1358 String address = serviceName;
1360 if (null != serviceParam) {
1361 address += "/" + serviceParam;
1364 synchronized (incomingMessageListeners) {
1365 if (incomingMessageListeners.containsKey(address)) {
1369 InboundMeter incomingMessageListenerMeter = null;
1371 if (EndpointMeterBuildSettings.ENDPOINT_METERING && (endpointServiceMonitor != null)) {
1372 incomingMessageListenerMeter = endpointServiceMonitor.getInboundMeter(serviceName, serviceParam);
1375 incomingMessageListeners.put(address, listener);
1378 if (parentEndpoint != null) {
1379 if (serviceName.startsWith(ChannelMessenger.InsertedServicePrefix)) {
1380 // The listener name is already re-written.
1381 // The listener is already a quota listener; we made extra sure of that before tucking it into our local map.
1382 parentEndpoint.addIncomingMessageListener(listener, serviceName, serviceParam);
1384 parentEndpoint.addIncomingMessageListener(listener, myServiceName, address);
1394 public EndpointListener getIncomingMessageListener(String serviceName, String serviceParam) {
1396 if (null == serviceName) {
1397 throw new IllegalArgumentException("serviceName must not be null");
1400 EndpointListener listener = null;
1402 if (null != serviceParam) {
1403 listener = incomingMessageListeners.get(serviceName + "/" + serviceParam);
1406 // Didn't find it with param, maybe there is a generic listener for the service
1407 if (listener == null) {
1408 listener = incomingMessageListeners.get(serviceName);
1411 // Didn't find it still, try the compatibility name.
1412 if (listener == null) {
1413 listener = incomingMessageListeners.get(serviceName + serviceParam);
1415 if ((null != listener) && Logging.SHOW_WARNING && LOG.isLoggable(Level.WARNING)) {
1416 LOG.warning("Found handler only via compatibility listener : " + serviceName + serviceParam);
1426 public EndpointListener removeIncomingMessageListener(String serviceName, String serviceParam) {
1427 if (null == serviceName) {
1428 throw new IllegalArgumentException("serviceName must not be null");
1431 if (-1 != serviceName.indexOf('/')) {
1432 throw new IllegalArgumentException("serviceName may not contain '/' characters");
1435 String address = serviceName;
1437 if (null != serviceParam) {
1438 address += "/" + serviceParam;
1441 EndpointListener removedListener;
1442 synchronized (incomingMessageListeners) {
1443 removedListener = incomingMessageListeners.remove(address);
1446 if (parentEndpoint != null) {
1447 if (serviceName.startsWith(ChannelMessenger.InsertedServicePrefix)) {
1448 parentEndpoint.removeIncomingMessageListener(serviceName, serviceParam);
1450 parentEndpoint.removeIncomingMessageListener(myServiceName, address);
1453 return removedListener;
1457 * Returns a local transport that can send to the given address. For now
1458 * this is based only on the protocol name.
1460 * @param addr the endpoint address
1461 * @return the transport if the address protocol is supported by this transport
1463 private MessageSender getLocalSenderForAddress(EndpointAddress addr) {
1465 Iterator<MessageTransport> localTransports = getAllLocalTransports();
1467 while (localTransports.hasNext()) {
1468 MessageTransport transpt = localTransports.next();
1469 if (!transpt.getProtocolName().equals(addr.getProtocolName())) {
1473 if (!(transpt instanceof MessageSender)) {
1477 return (MessageSender) transpt;
1485 * Note: canonical messenger itself does not do any address rewriting.
1486 * Any address rewriting must be specified when getting a channel. However,
1487 * canonical knows the default group redirection for its owning endpoint and
1488 * will automatically skip redirection if it is the same.
// Returns the shared ("canonical") messenger for an address.
//
// Visible behaviour: a cached messenger from messengerMap is reused while it
// is still referenced and USABLE; otherwise, if a local transport can send to
// the address, a new CanonicalMessenger is created and cached behind a
// SoftReference; otherwise the request is delegated to the parent endpoint,
// if there is one.
//
// NOTE(review): several short lines (the null check on addr, the
// initialisation and increment of "position", return statements and block
// terminators) are not visible in this view of the file; confirm the exact
// control flow against the complete source before changing this method.
1491 public Messenger getCanonicalMessenger(EndpointAddress addr, Object hint) {
// Presumably guarded by a null check on addr on a line not visible here.
1493 throw new IllegalArgumentException("null endpoint address not allowed.");
// Diagnostic only: when FINE logging is on, walk the current stack trace past
// frames whose class name starts with this endpoint service's prefix, so that
// the log line names the caller.
1496 if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
1497 Throwable trace = new Throwable("Stack Trace");
1498 StackTraceElement elements[] = trace.getStackTrace();
1502 while (elements[position].getClassName().startsWith("net.jxta.impl.endpoint.EndpointService")) {
// Stops the walk at the last stack frame.
1506 if ((elements.length - 1) == position) {
1510 LOG.fine("Get Messenger for " + addr + " by " + elements[position]);
1513 // Check the canonical map.
// Lookup, eviction and insertion all happen under the messengerMap lock.
1514 synchronized (messengerMap) {
1515 Reference<Messenger> ref = messengerMap.get(addr);
1518 Messenger found = ref.get();
1520 // If it is USABLE, return it.
1521 if ((found != null) && ((found.getState() & Messenger.USABLE) != 0)) {
1525 // It has been GCed or is no longer USABLE. Make room for a new one.
1526 messengerMap.remove(addr);
// A canonical messenger is only created here when a local transport can send
// to the address.
1529 if (getLocalSenderForAddress(addr) != null) {
1530 OutboundMeter messengerMeter = null;
1532 if (EndpointMeterBuildSettings.ENDPOINT_METERING && (endpointServiceMonitor != null)) {
1533 messengerMeter = endpointServiceMonitor.getOutboundMeter(addr);
1536 // The hint is saved in the canonical messenger and will be used
1537 // when that virtual messenger first faces the need to create a
1538 // transport messenger. As of now, the logical dest is unknown.
1539 Messenger m = new CanonicalMessenger(vmQueueSize, addr, null, hint, messengerMeter);
// Cached under the messenger's own destination address, behind a SoftReference.
1541 messengerMap.put(m.getDestinationAddress(), new SoftReference<Messenger>(m));
1546 // If we're here, we do not have any such transport.
1547 // Try our ancestors enpoints, if any.
1549 if (parentEndpoint == null) {
1550 if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
1551 LOG.fine("Could not create messenger for : " + addr);
1556 return parentEndpoint.getCanonicalMessenger(addr, hint);
1560 * Return only the message transport registered locally.
1562 protected Iterator<MessageTransport> getAllLocalTransports() {
1563 List<MessageTransport> transportList;
1565 synchronized (messageTransports) {
1566 transportList = new ArrayList<MessageTransport>(messageTransports);
1569 return transportList.iterator();
1573 * Returns a messenger for the specified address from one of the Message
1574 * Transports registered with this very endpoint service. Message
1575 * Transports inherited from parent groups will not be used.
1577 * @param addr The destination address of the desired Messenger.
1578 * @param hint A hint provided to the Message Transport which may assist it
1579 * in creating the messenger.
1580 * @return A Messenger for the specified destination address or {@code null}
1581 * if no Messenger could be created.
1583 private Messenger getLocalTransportMessenger(EndpointAddress addr, Object hint) {
1584 MessageSender sender = getLocalSenderForAddress(addr);
1585 Messenger messenger = null;
1587 if (sender != null) {
1588 if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
1589 LOG.fine("Trying address \'" + addr + "\' with : " + sender);
1591 messenger = sender.getMessenger(addr, hint);
1594 if (messenger == null) {
1595 if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
1596 LOG.fine("Couldn\'t create messenger for : " + addr);
1605 public synchronized void addIncomingMessageFilterListener(MessageFilterListener listener, String namespace, String name) {
1606 if (null == listener) {
1607 throw new IllegalArgumentException("listener must be non-null");
1610 FilterListenerAndMask aFilter = new FilterListenerAndMask(listener, namespace, name);
1612 incomingFilterListeners.add(aFilter);
1618 public synchronized void addOutgoingMessageFilterListener(MessageFilterListener listener, String namespace, String name) {
1619 if (null == listener) {
1620 throw new IllegalArgumentException("listener must be non-null");
1623 FilterListenerAndMask aFilter = new FilterListenerAndMask(listener, namespace, name);
1625 outgoingFilterListeners.add(aFilter);
1631 public synchronized MessageFilterListener removeIncomingMessageFilterListener(MessageFilterListener listener, String namespace, String name) {
1632 Iterator<FilterListenerAndMask> eachListener = incomingFilterListeners.iterator();
1634 while (eachListener.hasNext()) {
1635 FilterListenerAndMask aFilter = eachListener.next();
1637 if (listener == aFilter.listener) {
1638 eachListener.remove();
1649 public synchronized MessageFilterListener removeOutgoingMessageFilterListener(MessageFilterListener listener, String namespace, String name) {
1650 Iterator<FilterListenerAndMask> eachListener = outgoingFilterListeners.iterator();
1652 while (eachListener.hasNext()) {
1653 FilterListenerAndMask aFilter = eachListener.next();
1655 if ((listener == aFilter.listener)
1656 && ((null != namespace) ? namespace.equals(aFilter.namespace) : (null == aFilter.namespace))
1657 && ((null != name) ? name.equals(aFilter.name) : (null == aFilter.name))) {
1658 eachListener.remove();
1669 * <p/>Redistribute the event to those interested.
// Offers a newly available messenger to the registered messenger event
// listeners, from the highest applicable precedence down to the lowest. The
// first listener that returns true from messengerReady() claims the messenger.
//
// The connection address of the event decides which precedence levels are
// tried and whether the address must be demangled first (see the branches
// below).
//
// NOTE(review): several short lines (return statements, the "try" around the
// listener call, "} else {" and block terminators) are not visible in this
// view of the file; confirm the exact control flow against the complete
// source before changing this method.
1671 public boolean messengerReady(MessengerEvent event) {
1673 // FIXME - jice@jxta.org 20040413: now that we share messengers, we
1674 // should be able to get rid of most of this mess, and in the router,
1675 // and the relay too.
1677 Messenger messenger = event.getMessenger();
1678 Messenger messengerForHere;
1679 EndpointAddress connAddr = event.getConnectionAddress();
1681 if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
1682 LOG.fine("New " + messenger + " for : " +
1683 messenger.getDestinationAddress() + " (" +
1684 messenger.getLogicalDestinationAddress() + ")");
// By default every precedence level is offered the messenger.
1687 int highestPrec = EndpointService.HighPrecedence;
1688 int lowestPrec = EndpointService.LowPrecedence;
1690 // If there's no connection address we just pass the messenger around
1691 // everywhere; it is unspecified which group it is for.
1692 // Else, we must figure out if it is for this group, or must be
1693 // passed upStack (if any).
1694 if (connAddr != null) {
1695 String cgServiceName = connAddr.getServiceName();
1697 // See if there is a mangling. If not, this means this was sent
1698 // within this group through a local xport, so it is for here. Else
1699 // it may be for here (from below) or for upstack.
1701 if (cgServiceName == null || !cgServiceName.startsWith(ChannelMessenger.InsertedServicePrefix)) {
1702 // FIXME: jice@jxta.org - 20030512 If we restrict use to "here"
1703 // we make most backchannels useless. So, the statement below is
1704 // commented out. Ideally we should not have to worry about the
1705 // group targetting of connections, only messages. However the
1706 // way the relay and the router have to split messengers makes
1707 // it necessary. This may only be fixed by re-organizing
1708 // globally the management of incoming messengers in the
1709 // endpoint, so that router and relay no-longer need to claim
1710 // exclusive use of messengers. Since relay clients set the
1711 // group properly, their messengers are not affected by this
1712 // branch of the code.
1713 // lowestPrec = EndpointService.LowPrecedence + 1;
1714 } else if (!myServiceName.equals(cgServiceName)) {
1715 // This is for upstack only
// Restricting the highest level to LowPrecedence leaves only that level in the loop below.
1716 highestPrec = EndpointService.LowPrecedence;
1718 // Mangling present and this is for here (and therefore this is
1719 // from below). We must demangle. Wrapping is figured later,
1720 // since we may also have to wrap if there the
1721 lowestPrec = EndpointService.LowPrecedence + 1;
// Demangle: the service parameter carries "realService[/realParam]".
1723 String serviceParam = connAddr.getServiceParameter();
1724 String realService = null;
1725 String realParam = null;
1727 if (null != serviceParam) {
1728 int slashAt = serviceParam.indexOf('/');
1730 if (-1 == slashAt) {
1731 realService = serviceParam;
1733 realService = serviceParam.substring(0, slashAt);
1734 realParam = serviceParam.substring(slashAt + 1);
1738 connAddr = new EndpointAddress(connAddr, realService, realParam);
1742 // We make a channel in all cases, the channel will decide if the desired grp redirection
1743 // requires address rewriting or not.
1745 // As for a MessageWatcher for implementing sendMessage-with-listener, we do not provide one
1746 // mostly because it is difficult to get a hold on the only appropriate one: that of the endpoint
1747 // service interface of the listener's owner. So, incoming messengers will not support the listener-based send API.
1748 // Unless someone adds a watcher by hand.
1749 messengerForHere = event.getMessenger().getChannelMessenger(group.getPeerGroupID(), null, null);
1751 // Call the listener highest precedence first. The first one that claims
1752 // the messenger wins.
// The loop visits prec = highestPrec, highestPrec - 1, ..., lowestPrec (inclusive).
1753 for (int prec = highestPrec + 1; prec-- > lowestPrec;) {
// Listeners at LowPrecedence receive the original messenger; all other
// levels receive the channel messenger created above.
1754 MessengerEvent newMessenger = new MessengerEvent(event.getSource(),
1755 prec == EndpointService.LowPrecedence ? messenger : messengerForHere, connAddr);
1757 // We need to grab the listeners and release the lock. Otherwise the
1758 // sometimes too long operations performed by the listener creates
1759 // an unnecessary contention.
1760 // The side effect is that a listener can in theory be called after
1761 // remove returned. It is unlikely to be a problem for messenger
1762 // events, but if it is, then we'll have to add reader-writer synch.
1763 Collection<MessengerEventListener> allML = new ArrayList<MessengerEventListener>(passiveMessengerListeners[prec]);
1764 for (MessengerEventListener listener : allML) {
1766 if (listener.messengerReady(newMessenger)) {
1767 // A listener has taken the messenger. we're done.
1768 if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
1769 LOG.fine(newMessenger + " claimed by " + listener);
// A throwable from one listener is logged and does not propagate to the caller.
1773 } catch (Throwable all) {
1774 if (Logging.SHOW_WARNING && LOG.isLoggable(Level.WARNING)) {
1775 LOG.log(Level.WARNING, "Uncaught Throwable in listener " + listener, all);
1781 // Note that the messenger was not wanted.
1782 if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
1783 LOG.fine("Nobody cared about " + event);
1788 // public MessengerEventListener getMessengerEventListener() {
1792 // try to find the relay service in our
1793 // hierachy of endpoints to register our listener group
1794 // since the group has to register into the relay service. This is not
1795 // very pretty, but the other way was even worth to register the relay
1796 // into the endpoint!
1798 private void addActiveRelayListener(PeerGroup listeningGroup) {
1799 PeerGroup parentGroup = group.getParentGroup();
1800 while (parentGroup != null) {
1801 EndpointService parentEndpoint = parentGroup.getEndpointService();
1803 for (Iterator<MessageTransport> it = parentEndpoint.getAllMessageTransports(); it.hasNext();) {
1804 MessageTransport mt = it.next();
1806 if ((mt instanceof RelayClient)) {
1807 ((RelayClient) mt).addActiveRelayListener(listeningGroup);
1811 parentGroup = parentGroup.getParentGroup();
1815 // try to find the relay service in our
1816 // hierachy of endpoints to unregister our listener group
1817 private void removeActiveRelayListener(PeerGroup listeningGroup) {
1818 PeerGroup parentGroup = group.getParentGroup();
1820 while (parentGroup != null) {
1821 EndpointService parentEndpoint = parentGroup.getEndpointService();
1823 for (Iterator<MessageTransport> it = parentEndpoint.getAllMessageTransports(); it.hasNext();) {
1824 MessageTransport mt = it.next();
1826 if ((mt instanceof RelayClient)) {
1827 ((RelayClient) mt).removeActiveRelayListener(listeningGroup);
1831 parentGroup = parentGroup.getParentGroup();
1836 * Convenience legacy methods. They are here to reduce the complexity of the class hierarchy but are not supposed to be used.
1842 * @deprecated legacy method.
1845 public boolean ping(EndpointAddress addr) {
1846 throw new UnsupportedOperationException("Legacy method not implemented. Use an interface object.");
1852 * @deprecated legacy method.
1855 public boolean getMessenger(MessengerEventListener listener, EndpointAddress addr, Object hint) {
1856 throw new UnsupportedOperationException("Legacy method not implemented. Use an interface object.");
1862 * convenience method not supported here.
1864 public Messenger getMessenger(EndpointAddress addr) {
1865 throw new UnsupportedOperationException("Convenience method not implemented. Use an interface object.");
1871 * convenience method not supported here.
1873 public Messenger getMessengerImmediate(EndpointAddress addr, Object hint) {
1874 throw new UnsupportedOperationException("Convenience method not implemented. Use an interface object.");
1880 * convenience method not supported here.
1882 public Messenger getMessenger(EndpointAddress addr, Object hint) {
1883 throw new UnsupportedOperationException("Convenience method not implemented. Use an interface object.");
1887 * Returns a Direct Messenger that may be used to send messages via this endpoint
1888 * to the specified destination.
1890 * @param address the destination address.
1891 * @param hint the messenger hint, if any, otherwise null.
1892 * @param exclusive if true avoids caching the messenger
1893 * @return The messenger or {@code null} is returned if the destination address is not reachable.
1894 * @throws IllegalArgumentException if hint is not of RouteAdvertisement, or PeerAdvertisement type.
// Looks for a direct messenger to the given address: first in
// directMessengerMap, then by asking the "tcp" transport for a messenger to
// each "tcp" destination address of the route derived from the hint.
//
// NOTE(review): several short lines (return statements, "} else {", block
// terminators and any conditions on "exclusive") are not visible in this view
// of the file; confirm the exact control flow against the complete source
// before changing this method.
1896 public Messenger getDirectMessenger(EndpointAddress address, Object hint, boolean exclusive) {
// Cache lookup: a previously stored messenger is reusable only while it is
// still referenced and not closed.
1899 Reference<Messenger> reference = directMessengerMap.get(address);
1900 if (reference != null) {
1901 Messenger messenger = reference.get();
1902 if (messenger != null && !messenger.isClosed()) {
1908 // We must have access to a TCP transport to create a direct messenger.
// NOTE(review): the cast assumes that whatever transport is registered under
// the protocol name "tcp" is a TcpTransport - TODO confirm.
1909 TcpTransport tcpTransport = (TcpTransport) getMessageTransport("tcp");
// Without a TCP transport or a hint no messenger is created here.
1911 if ((tcpTransport != null) && (hint != null)) {
1912 RouteAdvertisement route;
1913 EndpointAddress direct;
1914 Messenger messenger;
// The hint must be a route advertisement, or a peer advertisement from which
// a route is extracted.
1915 if (hint instanceof RouteAdvertisement) {
1916 route = (RouteAdvertisement) hint;
1917 } else if (hint instanceof PeerAdvertisement) {
1918 route = EndpointUtils.extractRouteAdv((PeerAdvertisement) hint);
1920 throw new IllegalArgumentException("Unknown route hint object type" + hint);
// Try each TCP destination address of the route in turn.
1923 for (EndpointAddress transportAddr : route.getDestEndpointAddresses()) {
1924 if (transportAddr.getProtocolName().equals("tcp")) {
1925 direct = createDirectAddress(transportAddr, address);
1926 // direct messengers are non self destructive
1927 messenger = tcpTransport.getMessenger(direct, route, false);
1928 if (messenger != null) {
// Cached behind a WeakReference, keyed by the requested address.
1930 directMessengerMap.put(address, new WeakReference<Messenger>(messenger));
1942 * Given a transport address and service address, create a mangled endpoint address
1944 * @param transportAddr the transport messenger address
1945 * @param serviceEndpoint the service endpoint
1946 * @return an composite endpoint address
1948 private EndpointAddress createDirectAddress(EndpointAddress transportAddr, EndpointAddress serviceEndpoint) {
1949 //physical transport address
1950 StringBuilder destStr = new StringBuilder(transportAddr.toString()).append("/");
1952 destStr.append(ENDPOINTSERVICE_NAME);
1954 destStr.append(":").append(group.getPeerGroupID().getUniqueValue().toString()).append("/");
1956 destStr.append(serviceEndpoint.getServiceName()).append("/").append(serviceEndpoint.getServiceParameter());
1958 //return new EndpointAddress(transportAddr, serviceEndpoint.getServiceName(), serviceEndpoint.getServiceParameter());
1959 return new EndpointAddress(destStr.toString());