]> sjero.net Git - linphone/blob - p2pproxy/dependencies-src/jxse-src-2.5/impl/src/net/jxta/impl/endpoint/EndpointServiceImpl.java
d3db1125a844d06d9dfca83fd4df273897887cb5
[linphone] / p2pproxy / dependencies-src / jxse-src-2.5 / impl / src / net / jxta / impl / endpoint / EndpointServiceImpl.java
1 /*
2  * Copyright (c) 2001-2007 Sun Microsystems, Inc.  All rights reserved.
3  *  
4  *  The Sun Project JXTA(TM) Software License
5  *  
6  *  Redistribution and use in source and binary forms, with or without 
7  *  modification, are permitted provided that the following conditions are met:
8  *  
9  *  1. Redistributions of source code must retain the above copyright notice,
10  *     this list of conditions and the following disclaimer.
11  *  
12  *  2. Redistributions in binary form must reproduce the above copyright notice, 
13  *     this list of conditions and the following disclaimer in the documentation 
14  *     and/or other materials provided with the distribution.
15  *  
16  *  3. The end-user documentation included with the redistribution, if any, must 
17  *     include the following acknowledgment: "This product includes software 
18  *     developed by Sun Microsystems, Inc. for JXTA(TM) technology." 
19  *     Alternately, this acknowledgment may appear in the software itself, if 
20  *     and wherever such third-party acknowledgments normally appear.
21  *  
22  *  4. The names "Sun", "Sun Microsystems, Inc.", "JXTA" and "Project JXTA" must 
23  *     not be used to endorse or promote products derived from this software 
24  *     without prior written permission. For written permission, please contact 
25  *     Project JXTA at http://www.jxta.org.
26  *  
27  *  5. Products derived from this software may not be called "JXTA", nor may 
28  *     "JXTA" appear in their name, without prior written permission of Sun.
29  *  
30  *  THIS SOFTWARE IS PROVIDED ``AS IS'' AND ANY EXPRESSED OR IMPLIED WARRANTIES,
31  *  INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND 
32  *  FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL SUN 
33  *  MICROSYSTEMS OR ITS CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, 
34  *  INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT 
35  *  LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, 
36  *  OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF 
37  *  LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING 
38  *  NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, 
39  *  EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
40  *  
41  *  JXTA is a registered trademark of Sun Microsystems, Inc. in the United 
42  *  States and other countries.
43  *  
44  *  Please see the license information page at :
45  *  <http://www.jxta.org/project/www/license.html> for instructions on use of 
46  *  the license in source files.
47  *  
48  *  ====================================================================
49  *  
50  *  This software consists of voluntary contributions made by many individuals 
51  *  on behalf of Project JXTA. For more information on Project JXTA, please see 
52  *  http://www.jxta.org.
53  *  
54  *  This license is based on the BSD license adopted by the Apache Foundation. 
55  */
56 package net.jxta.impl.endpoint;
57
58 import net.jxta.discovery.DiscoveryService;
59 import net.jxta.document.Advertisement;
60 import net.jxta.document.AdvertisementFactory;
61 import net.jxta.document.MimeMediaType;
62 import net.jxta.document.StructuredDocument;
63 import net.jxta.document.StructuredDocumentFactory;
64 import net.jxta.document.StructuredDocumentUtils;
65 import net.jxta.document.XMLDocument;
66 import net.jxta.document.XMLElement;
67 import net.jxta.endpoint.ChannelMessenger;
68 import net.jxta.endpoint.EndpointAddress;
69 import net.jxta.endpoint.EndpointListener;
70 import net.jxta.endpoint.EndpointService;
71 import net.jxta.endpoint.Message;
72 import net.jxta.endpoint.MessageElement;
73 import net.jxta.endpoint.MessageFilterListener;
74 import net.jxta.endpoint.MessagePropagater;
75 import net.jxta.endpoint.MessageReceiver;
76 import net.jxta.endpoint.MessageSender;
77 import net.jxta.endpoint.MessageTransport;
78 import net.jxta.endpoint.Messenger;
79 import net.jxta.endpoint.MessengerEvent;
80 import net.jxta.endpoint.MessengerEventListener;
81 import net.jxta.endpoint.StringMessageElement;
82 import net.jxta.endpoint.ThreadedMessenger;
83 import net.jxta.exception.PeerGroupException;
84 import net.jxta.id.ID;
85 import net.jxta.impl.endpoint.endpointMeter.EndpointMeter;
86 import net.jxta.impl.endpoint.endpointMeter.EndpointMeterBuildSettings;
87 import net.jxta.impl.endpoint.endpointMeter.EndpointServiceMonitor;
88 import net.jxta.impl.endpoint.endpointMeter.InboundMeter;
89 import net.jxta.impl.endpoint.endpointMeter.OutboundMeter;
90 import net.jxta.impl.endpoint.endpointMeter.PropagationMeter;
91 import net.jxta.impl.endpoint.relay.RelayClient;
92 import net.jxta.impl.endpoint.router.EndpointRouter;
93 import net.jxta.impl.endpoint.tcp.TcpTransport;
94 import net.jxta.impl.meter.MonitorManager;
95 import net.jxta.impl.util.SequenceIterator;
96 import net.jxta.logging.Logging;
97 import net.jxta.meter.MonitorResources;
98 import net.jxta.peergroup.PeerGroup;
99 import net.jxta.platform.Module;
100 import net.jxta.protocol.AccessPointAdvertisement;
101 import net.jxta.protocol.ConfigParams;
102 import net.jxta.protocol.ModuleImplAdvertisement;
103 import net.jxta.protocol.PeerAdvertisement;
104 import net.jxta.protocol.RouteAdvertisement;
105
106 import java.io.IOException;
107 import java.lang.ref.Reference;
108 import java.lang.ref.SoftReference;
109 import java.lang.ref.WeakReference;
110 import java.util.ArrayList;
111 import java.util.Collection;
112 import java.util.Collections;
113 import java.util.Enumeration;
114 import java.util.HashMap;
115 import java.util.HashSet;
116 import java.util.Iterator;
117 import java.util.List;
118 import java.util.Map;
119 import java.util.Vector;
120 import java.util.WeakHashMap;
121 import java.util.logging.Level;
122 import java.util.logging.Logger;
123
124 /**
125  * This class implements the frontend for all the JXTA  endpoint protocols, as
126  * well as the API for the implementation of the core protocols that use
127  * directly the EndpointService. It theory it only needs to implement core methods.
128  * legacy or convenience methods should stay out. However, that would require
129  * a two-level interface for the service (internal and public). May be later.
130  */
131 public class EndpointServiceImpl implements EndpointService, MessengerEventListener {
132
133     /**
134      * Logger
135      */
136     private static final Logger LOG = Logger.getLogger(EndpointServiceImpl.class.getName());
137
138     // // constants ////
139
140     /**
141      * The Wire Message Format we will use by default.
142      */
143     public static final MimeMediaType DEFAULT_MESSAGE_TYPE = new MimeMediaType("application/x-jxta-msg").intern();
144
145     /**
146      * The name of this service.
147      */
148     public static final String ENDPOINTSERVICE_NAME = "EndpointService";
149
150     /**
151      * The Message empty namespace. This namespace is reserved for use by
152      * applications. It will not be used by core protocols.
153      */
154     public static final String MESSAGE_EMPTY_NS = "";
155
156     /**
157      * The Message "jxta" namespace. This namespace is reserved for use by
158      * core protocols. It will not be used by applications.
159      */
160     public static final String MESSAGE_JXTA_NS = "jxta";
161
162     /**
163      * Namespace in which the message source address will be placed.
164      */
165     public static final String MESSAGE_SOURCE_NS = MESSAGE_JXTA_NS;
166
167     /**
168      * Element name in which the message source address will be placed.
169      */
170     public static final String MESSAGE_SOURCE_NAME = "EndpointSourceAddress";
171
172     /**
173      * Namespace in which the message destination address will be placed.
174      */
175     public static final String MESSAGE_DESTINATION_NS = MESSAGE_JXTA_NS;
176
177     /**
178      * Element name in which the message destination address will be placed.
179      * This element is used for loopback detection during propagate. Only
180      * propagate messages currently contain this element.
181      */
182     public static final String MESSAGE_DESTINATION_NAME = "EndpointDestinationAddress";
183
184     /**
185      * Namespace in which the message source peer address will be placed.
186      */
187     public static final String MESSAGE_SRCPEERHDR_NS = MESSAGE_JXTA_NS;
188
189     /**
190      * Element name in which the message source peer address will be placed.
191      * This element is used for loopback detection during propagate. Only
192      * propagated messages currently contain this element.
193      */
194     public static final String MESSAGE_SRCPEERHDR_NAME = "EndpointHeaderSrcPeer";
195
196     /**
197      * Size of the message queue provided by virtual messengers.
198      */
199     private final static int DEFAULT_MESSAGE_QUEUE_SIZE = 20;
200
201     /**
202      * If {@code true} then the parent endpoint may be used for acquiring
203      * messengers and for registering listeners.
204      */
205     private final static boolean DEFAULT_USE_PARENT_ENDPOINT = true;
206
207     EndpointServiceMonitor endpointServiceMonitor;
208
209     /**
210      * the EndpointMeter
211      */
212     private EndpointMeter endpointMeter;
213     private PropagationMeter propagationMeter;
214
215     /**
216      * If {@code true} then this service has been initialized.
217      */
218     private boolean initialized = false;
219
220     /**
221      * tunable: the virtual messenger queue size
222      */
223     private int vmQueueSize = DEFAULT_MESSAGE_QUEUE_SIZE;
224
225     private PeerGroup group = null;
226     private ID assignedID = null;
227     private ModuleImplAdvertisement implAdvertisement = null;
228
229     private String localPeerId = null;
230     private boolean useParentEndpoint = DEFAULT_USE_PARENT_ENDPOINT;
231     private EndpointService parentEndpoint = null;
232     private String myServiceName = null;
233
234     /**
235      * The Message Transports which are registered for this endpoint. This is
236      * only the message transport registered locally, it does not include
237      * transports which are used from other groups.
238      */
239     private final Collection<MessageTransport> messageTransports = new HashSet<MessageTransport>();
240
241     /**
242      * Passive listeners for messengers. Three priorities, so far.
243      */
244     private final Collection[] passiveMessengerListeners = {
245             Collections.synchronizedList(new ArrayList<MessengerEventListener>()),
246             Collections.synchronizedList(new ArrayList<MessengerEventListener>()),
247             Collections.synchronizedList(new ArrayList<MessengerEventListener>())
248     };
249
250     /**
251      * The set of listener managed by this instance of the endpoint svc.
252      */
253     private final Map<String, EndpointListener> incomingMessageListeners = new HashMap<String, EndpointListener>(16);
254
255     /**
256      * The set of shared transport messengers currently ready for use.
257      */
258     private final Map<EndpointAddress, Reference<Messenger>> messengerMap = new WeakHashMap<EndpointAddress, Reference<Messenger>>(32);
259     /**
260      * The set of shared transport messengers currently ready for use.
261      */
262     private final Map<EndpointAddress, Reference<Messenger>> directMessengerMap = new WeakHashMap<EndpointAddress, Reference<Messenger>>(32);
263
264     /**
265      * The filter listeners.
266      * <p/>
267      * We rarely add/remove, never remove without iterating
268      * and insert objects that are always unique. So using a set
269      * does not make sense. An array list is the best.
270      */
271     private final Collection<FilterListenerAndMask> incomingFilterListeners = new ArrayList<FilterListenerAndMask>();
272     private final Collection<FilterListenerAndMask> outgoingFilterListeners = new ArrayList<FilterListenerAndMask>();
273
274     /**
275      * Holder for a filter listener and its conditions
276      */
277     private static class FilterListenerAndMask {
278         final String namespace;
279         final String name;
280         final MessageFilterListener listener;
281
282         public FilterListenerAndMask(MessageFilterListener listener, String namespace, String name) {
283             this.namespace = namespace;
284             this.name = name;
285             this.listener = listener;
286         }
287
288         @Override
289         public boolean equals(Object target) {
290             if (this == target) {
291                 return true;
292             }
293
294             if (target instanceof FilterListenerAndMask) {
295                 FilterListenerAndMask likeMe = (FilterListenerAndMask) target;
296
297                 boolean result = (null != namespace) ? (namespace.equals(likeMe.namespace)) : (null == likeMe.namespace);
298
299                 result &= (null != name) ? (name.equals(likeMe.name)) : (null == likeMe.name);
300                 result &= (listener == likeMe.listener);
301
302                 return result;
303             }
304
305             return false;
306         }
307
308         /**
309          * {@inheritDoc}
310          * <p/>
311          * Added to make PMD shut up....
312          */
313         @Override
314         public int hashCode() {
315             return System.identityHashCode(this);
316         }
317     }
318
319
320     /**
321      * A non blocking messenger that obtains a backing (possibly blocking)
322      * messenger on-demand.
323      */
324     private class CanonicalMessenger extends ThreadedMessenger {
325
326         /**
327          * If the hint was not used because there already was a transport
328          * messenger available, then it is saved here for the next time we are
329          * forced to create a new transport messenger by the breakage of the one
330          * that's here.
331          * <p/>
332          * The management of hints is a bit inconsistent for now: the hint
333          * used may be different dependent upon which invocation created the
334          * current canonical messenger and, although we try to use the hint only
335          * once (to avoid carrying an invalid hint forever) it may happen that a
336          * hint is used long after it was suggested.
337          */
338         Object hint;
339
340         /**
341          * The transport messenger that this canonical messenger currently uses.
342          */
343         Messenger cachedMessenger = null;
344
345         /**
346          * Create a new CanonicalMessenger.
347          *
348          * @param vmQueueSize        queue size
349          * @param destination        destination who messages should be addressed to
350          * @param logicalDestination logical destination
351          * @param hint               route hint
352          * @param messengerMeter     the metering object if any
353          */
354         public CanonicalMessenger(int vmQueueSize, EndpointAddress destination, EndpointAddress logicalDestination, Object hint, OutboundMeter messengerMeter) {
355             super(group.getPeerGroupID(), destination, logicalDestination, vmQueueSize);
356             this.hint = hint;
357         }
358
359         /**
360          * close this canonical messenger.
361          */
362         @Override
363         public void close() {
364             // No way. Not form the outside.
365         }
366
367         /**
368          * Drop the current messenger.
369          */
370         @Override
371         protected void closeImpl() {
372             if (cachedMessenger != null) {
373                 cachedMessenger.close();
374                 cachedMessenger = null;
375             } else {
376                 if (Logging.SHOW_SEVERE && LOG.isLoggable(Level.SEVERE)) {
377                     LOG.severe("Internal messenger error: close requested while not connected.");
378                 }
379             }
380         }
381
382         /**
383          * Get a transport messenger to the destination.
384          * <p/>
385          * FIXME 20040413 jice : Do better hint management.
386          */
387         @Override
388         protected boolean connectImpl() {
389             if (cachedMessenger != null) {
390                 if ((cachedMessenger.getState() & Messenger.TERMINAL) != 0) {
391                     if (Logging.SHOW_FINE && LOG.isLoggable(Level.SEVERE)) {
392                         LOG.fine("Closing TERMINAL internal messenger : attempting requested connect.");
393                     }
394                     cachedMessenger.close();
395                     cachedMessenger = null;
396                 } else {
397                     return true;
398                 }
399             }
400
401             // Consume the hint, if any.
402             Object theHint = hint;
403
404             hint = null;
405             cachedMessenger = getLocalTransportMessenger(getDestinationAddress(), theHint);
406
407             if (cachedMessenger == null) {
408                 return false;
409             }
410
411             // FIXME 20040413 jice : it's not too clean: we assume
412             // that all transports use BlockingMessenger as the base class for
413             // their messengers. If they don't we can't force them to hold the
414             // strong reference to the canonical messenger.
415             try {
416                 ((BlockingMessenger) cachedMessenger).setOwner(this);
417             } catch (ClassCastException cce) {
418                 if (Logging.SHOW_SEVERE && LOG.isLoggable(Level.SEVERE)) {
419                     LOG.severe("Transport messengers must all extend BlockingMessenger for now. " +
420                             cachedMessenger + " may remain open beyond its use.");
421                 }
422             }
423             return true;
424         }
425
426         /**
427          * {@inheritDoc}
428          */
429         @Override
430         protected EndpointAddress getLogicalDestinationImpl() {
431             if (cachedMessenger == null) {
432                 if (Logging.SHOW_SEVERE && LOG.isLoggable(Level.SEVERE)) {
433                     LOG.severe("Internal messenger error: logical destination requested while not connected.");
434                 }
435                 return null;
436             }
437             return cachedMessenger.getLogicalDestinationAddress();
438         }
439
440         /**
441          * {@inheritDoc}
442          */
443         @Override
444         protected void sendMessageBImpl(Message msg, String service, String param) throws IOException {
445             if (cachedMessenger == null) {
446                 if (Logging.SHOW_SEVERE && LOG.isLoggable(Level.SEVERE)) {
447                     LOG.severe("Internal messenger error: send requested while not connected.");
448                 }
449                 throw new IOException("Internal messenger error.");
450             }
451
452             try {
453                 cachedMessenger.sendMessageB(msg, service, param);
454             } catch (IOException any) {
455                 cachedMessenger = null;
456                 throw any;
457             } catch (RuntimeException any) {
458                 if (Logging.SHOW_WARNING && LOG.isLoggable(Level.WARNING)) {
459                     LOG.log(Level.WARNING, "Failure sending " + msg, any);
460                 }
461
462                 throw any;
463             }
464         }
465     }
466
467     /**
468      * Create a new EndpointService.
469      */
470     public EndpointServiceImpl() {
471     }
472
473     /**
474      * {@inheritDoc}
475      */
476     public synchronized void init(PeerGroup group, ID assignedID, Advertisement impl) throws PeerGroupException {
477         if (initialized) {
478             throw new PeerGroupException("Cannot initialize service more than once");
479         }
480
481         this.group = group;
482         // The selector for the element of the peer adv params that we have to update.
483         this.assignedID = assignedID;
484         this.implAdvertisement = (ModuleImplAdvertisement) impl;
485
486         this.localPeerId = group.getPeerID().toString();
487
488         this.myServiceName = ChannelMessenger.InsertedServicePrefix + group.getPeerGroupID().getUniqueValue().toString();
489
490         ConfigParams confAdv = group.getConfigAdvertisement();
491         XMLElement paramBlock = null;
492
493         if (confAdv != null) {
494             paramBlock = (XMLElement) confAdv.getServiceParam(assignedID);
495         }
496
497         if (paramBlock != null) {
498             // get our two tunables: virtual messenger queue size, and whether to use the parent endpoint
499             Enumeration param;
500
501             param = paramBlock.getChildren("MessengerQueueSize");
502             if (param.hasMoreElements()) {
503                 String textQSz = ((XMLElement) param.nextElement()).getTextValue();
504
505                 try {
506                     Integer requestedSize = Integer.parseInt(textQSz.trim());
507
508                     if (requestedSize > 0) {
509                         vmQueueSize = requestedSize;
510                     } else {
511                         LOG.warning("Illegal MessengerQueueSize : " + textQSz);
512                     }
513                 } catch (NumberFormatException e) {
514                     if (Logging.SHOW_WARNING && LOG.isLoggable(Level.WARNING)) {
515                         LOG.log(Level.WARNING, "could not parse MessengerQueueSize string", e);
516                     }
517                 }
518             }
519
520             param = paramBlock.getChildren("UseParentEndpoint");
521             if (param.hasMoreElements()) {
522                 String textUPE = ((XMLElement) param.nextElement()).getTextValue();
523
524                 useParentEndpoint = textUPE.trim().equalsIgnoreCase("true");
525             }
526
527         }
528
529         PeerGroup parentGroup = group.getParentGroup();
530
531         if (useParentEndpoint && parentGroup != null) {
532             parentEndpoint = parentGroup.getEndpointService();
533             parentEndpoint.addMessengerEventListener(this, EndpointService.LowPrecedence);
534         }
535
536         initialized = true;
537
538         if (Logging.SHOW_CONFIG && LOG.isLoggable(Level.CONFIG)) {
539             StringBuilder configInfo = new StringBuilder("Configuring Endpoint Service : " + assignedID);
540
541             if (implAdvertisement != null) {
542                 configInfo.append("\n\tImplementation :");
543                 configInfo.append("\n\t\tModule Spec ID: ");
544                 configInfo.append(implAdvertisement.getModuleSpecID());
545                 configInfo.append("\n\t\tImpl Description : ").append(implAdvertisement.getDescription());
546                 configInfo.append("\n\t\tImpl URI : ").append(implAdvertisement.getUri());
547                 configInfo.append("\n\t\tImpl Code : ").append(implAdvertisement.getCode());
548             }
549
550             configInfo.append("\n\tGroup Params :");
551             configInfo.append("\n\t\tGroup : ").append(group);
552             configInfo.append("\n\t\tPeer ID : ").append(group.getPeerID());
553
554             configInfo.append("\n\tConfiguration :");
555             if (null == parentGroup) {
556                 configInfo.append("\n\t\tHome Group : (none)");
557             } else {
558                 configInfo.append("\n\t\tHome Group : ").append(parentGroup.getPeerGroupName()).append(" / ").append(
559                         parentGroup.getPeerGroupID());
560             }
561             configInfo.append("\n\t\tUsing home group endpoint : ").append(parentEndpoint);
562             configInfo.append("\n\t\tVirtual Messenger Queue Size : ").append(vmQueueSize);
563             LOG.config(configInfo.toString());
564         }
565     }
566
567     /**
568      * {@inheritDoc}
569      */
570     public int startApp(String[] args) {
571         if (!initialized) {
572             return -1;
573         }
574
575         // FIXME  when Load order Issue is resolved this should fail
576         // until it is able to get a non-failing service Monitor (or
577         // null = not monitoring) 
578         // FIXME it is ok because of the hack in StdPeerGroup that starts 
579         // endpoint service first
580         if (EndpointMeterBuildSettings.ENDPOINT_METERING) { // Fix-Me: Move to startApp() when load order issue is resolved
581             endpointServiceMonitor = (EndpointServiceMonitor) MonitorManager.getServiceMonitor(group, MonitorResources.endpointServiceMonitorClassID);
582
583             if (endpointServiceMonitor != null) {
584                 endpointMeter = endpointServiceMonitor.getEndpointMeter();
585             }
586         }
587
588         if (parentEndpoint != null) {
589             Iterator<MessageTransport> parentMTs = parentEndpoint.getAllMessageTransports();
590
591             synchronized (this) {
592                 while (parentMTs.hasNext()) {
593                     addProtoToAdv(parentMTs.next());
594                 }
595             }
596         }
597
598         if (Logging.SHOW_INFO && LOG.isLoggable(Level.INFO)) {
599             LOG.info("Endpoint Service started.");
600         }
601
602         return Module.START_OK;
603     }
604
605     /**
606      * {@inheritDoc}
607      * <p/>
608      * The transports and services are going to be stopped as well. When
609      * they are, they will dereference us and we'll go into oblivion.
610      */
611     public void stopApp() {
612         if (parentEndpoint != null) {
613             parentEndpoint.removeMessengerEventListener(this, EndpointService.LowPrecedence);
614         }
615
616         // Clear up the passiveMessengersListeners
617         int prec = EndpointService.HighPrecedence;
618         while (prec >= EndpointService.LowPrecedence) {
619             passiveMessengerListeners[prec--].clear();
620         }
621
622         // Clear up any messengers.
623         messengerMap.clear();
624         directMessengerMap.clear();
625
626         // Clear up the listeners
627         incomingMessageListeners.clear();
628
629         // Forget about any message filters.
630         incomingFilterListeners.clear();
631         outgoingFilterListeners.clear();
632
633         // Forget any message transports
634         messageTransports.clear();
635
636         // Avoid cross-reference problems with the GC
637
638         // group = null;
639         // parentEndpoint = null;
640         // parentGroup = null;
641
642         // The above is not really needed and until we have a very orderly
643         // shutdown, it causes NPEs that are hard to prevent.
644
645         if (Logging.SHOW_INFO && LOG.isLoggable(Level.INFO)) {
646             LOG.info("Endpoint Service stopped.");
647         }
648     }
649
650     /**
651      * {@inheritDoc}
652      */
653     public PeerGroup getGroup() {
654         return group;
655     }
656
657     /**
658      * {@inheritDoc}
659      * <p/>
660      * We create a new instance each time because our interface actually
661      * has state (channel messengers and listener callback adaptor).
662      */
663     public EndpointService getInterface() {
664         return new EndpointServiceInterface(this);
665     }
666
667     /**
668      * {@inheritDoc}
669      */
670     public ModuleImplAdvertisement getImplAdvertisement() {
671         return implAdvertisement;
672     }
673
674     // A vector for statistics between propagateThroughAll and its invoker.
675     private static class Metrics {
676         int numFilteredOut = 0;
677         int numPropagatedTo = 0;
678         int numErrorsPropagated = 0;
679     }
680
681     private void propagateThroughAll(Iterator<MessageTransport> eachProto, Message myMsg, String serviceName, String serviceParam, int initialTTL, Metrics metrics) {
682
683         Message filtered = null;
684
685         while (eachProto.hasNext()) {
686             MessageTransport aTransport = eachProto.next();
687
688             try {
689                 if (!(aTransport instanceof MessagePropagater)) {
690                     continue;
691                 }
692
693                 MessagePropagater propagater = (MessagePropagater) aTransport;
694
695                 if (null == filtered) {
696                     // run process filters only once
697                     filtered = processFilters(myMsg,
698                             propagater.getPublicAddress(),
699                             new EndpointAddress(group.getPeerGroupID(), serviceName, serviceParam),
700                             false);
701                 }
702
703                 if (null == filtered) {
704                     if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
705                         LOG.fine("   message " + myMsg + " discarded upon filter decision");
706                     }
707
708                     if (EndpointMeterBuildSettings.ENDPOINT_METERING) {
709                         metrics.numFilteredOut++;
710                     }
711                     break;
712                 }
713
714                 propagater.propagate(filtered.clone(), serviceName, serviceParam, initialTTL);
715
716                 if (EndpointMeterBuildSettings.ENDPOINT_METERING) {
717                     metrics.numPropagatedTo++;
718                 }
719             } catch (Exception e) {
720                 if (Logging.SHOW_WARNING && LOG.isLoggable(Level.WARNING)) {
721                     LOG.log(Level.WARNING, "Failed propagating message " + filtered + " on message transport " + aTransport, e);
722                 }
723
724                 if (EndpointMeterBuildSettings.ENDPOINT_METERING) {
725                     metrics.numErrorsPropagated++;
726                 }
727             }
728         }
729     }
730
731     /**
732      * {@inheritDoc}
733      */
734     public void propagate(Message msg, String serviceName, String serviceParam) {
735         propagate(msg, serviceName, serviceParam, Integer.MAX_VALUE);
736     }
737
738     /**
739      * {@inheritDoc}
740      */
741     public void propagate(Message msg, String serviceName, String serviceParam, int initialTTL) {
742         long startPropagationTime = 0;
743
744         if (null == serviceName) {
745             throw new IllegalArgumentException("serviceName may not be null");
746         }
747
748         Metrics metrics = null;
749
750         if (EndpointMeterBuildSettings.ENDPOINT_METERING) {
751             metrics = new Metrics();
752         }
753
754         // Keep the orig unchanged for metering reference and caller's benefit, but
755         // we are forced to clone it here, because we add a header.
756         msg = msg.clone();
757
758         if (EndpointMeterBuildSettings.ENDPOINT_METERING) {
759             startPropagationTime = System.currentTimeMillis();
760         }
761
762         // Add our header.
763         MessageElement srcHdrElement = new StringMessageElement(EndpointServiceImpl.MESSAGE_SRCPEERHDR_NAME, localPeerId, null);
764
765         msg.replaceMessageElement(EndpointServiceImpl.MESSAGE_SRCPEERHDR_NS, srcHdrElement);
766
767         // Do the local transports with the plain address.
768         Iterator<MessageTransport> eachProto = getAllLocalTransports();
769
770         propagateThroughAll(eachProto, msg.clone(), serviceName, serviceParam, initialTTL, metrics);
771
772         // Do the parent transports with a mangled address.
773         if (parentEndpoint != null) {
774             eachProto = parentEndpoint.getAllMessageTransports();
775
776             StringBuilder mangled = new StringBuilder(serviceName);
777             if (null != serviceParam) {
778                 mangled.append('/');
779                 mangled.append(serviceParam);
780             }
781
782             propagateThroughAll(eachProto, msg.clone(), myServiceName, mangled.toString(), initialTTL, metrics);
783         }
784
785         if (EndpointMeterBuildSettings.ENDPOINT_METERING && (endpointServiceMonitor != null)) {
786             PropagationMeter propagationMeter = endpointServiceMonitor.getPropagationMeter(serviceName, serviceParam);
787
788             propagationMeter.registerPropagateMessageStats(metrics.numPropagatedTo, metrics.numFilteredOut, metrics.numErrorsPropagated,
789                     System.currentTimeMillis() - startPropagationTime);
790         }
791     }
792
793     /**
794      * Process the filters for this message.
795      */
796     private Message processFilters(Message message, EndpointAddress srcAddress, EndpointAddress dstAddress, boolean incoming) {
797
798         Iterator<FilterListenerAndMask> eachFilter = incoming
799                 ? incomingFilterListeners.iterator()
800                 : outgoingFilterListeners.iterator();
801
802         while (eachFilter.hasNext()) {
803             FilterListenerAndMask aFilter = eachFilter.next();
804
805             Message.ElementIterator eachElement = message.getMessageElements();
806
807             while (eachElement.hasNext()) {
808                 MessageElement anElement = eachElement.next();
809
810                 if ((null != aFilter.namespace) && (!aFilter.namespace.equals(eachElement.getNamespace()))) {
811                     continue;
812                 }
813
814                 if ((null != aFilter.name) && (!aFilter.name.equals(anElement.getElementName()))) {
815                     continue;
816                 }
817
818                 message = aFilter.listener.filterMessage(message, srcAddress, dstAddress);
819
820                 if (null == message) {
821                     return null;
822                 }
823             }
824         }
825
826         // If we got here, no filter has rejected the message. Keep processing it.
827         return message;
828     }
829
830     private static EndpointAddress demangleAddress(EndpointAddress mangled) {
831         String serviceName = mangled.getServiceName();
832
833         if (null == serviceName) {
834             // not a mangled address
835             return mangled;
836         }
837
838         if (!serviceName.startsWith(ChannelMessenger.InsertedServicePrefix)) {
839             // not a mangled address
840             return mangled;
841         }
842
843         String serviceParam = mangled.getServiceParameter();
844
845         if (null == serviceParam) {
846             // it has no param, its a null destination.
847             // XXX bondolo 20050907 I'm not sure this is correct.
848             return new EndpointAddress(mangled, null, null);
849         }
850
851         int slashAt = serviceParam.indexOf('/');
852
853         if (-1 == slashAt) {
854             // param has no param portion.
855             return new EndpointAddress(mangled, serviceParam, null);
856         }
857
858         return new EndpointAddress(mangled, serviceParam.substring(0, slashAt), serviceParam.substring(slashAt + 1));
859     }
860
861     /**
862      * {@inheritDoc}
863      */
864     public void processIncomingMessage(Message msg, EndpointAddress srcAddress, EndpointAddress dstAddress) {
865
866         // check for propagate loopback.
867         MessageElement srcPeerElement = msg.getMessageElement(EndpointServiceImpl.MESSAGE_SRCPEERHDR_NS, EndpointServiceImpl.MESSAGE_SRCPEERHDR_NAME);
868
869         if (null != srcPeerElement) {
870             msg.removeMessageElement(srcPeerElement);
871             String srcPeer = srcPeerElement.toString();
872
873             if (localPeerId.equals(srcPeer)) {
874                 // This is a loopback. Discard.
875                 if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
876                     LOG.fine(msg + " is a propagate loopback. Discarded");
877                 }
878
879                 if (EndpointMeterBuildSettings.ENDPOINT_METERING && (endpointMeter != null)) {
880                     endpointMeter.discardedLoopbackDemuxMessage();
881                 }
882
883                 return;
884             }
885         }
886
887         if (null == srcAddress) {
888             if (Logging.SHOW_WARNING && LOG.isLoggable(Level.WARNING)) {
889                 LOG.warning("null src address, discarding message " + msg);
890             }
891
892             if (EndpointMeterBuildSettings.ENDPOINT_METERING && (endpointMeter != null)) {
893                 endpointMeter.invalidIncomingMessage();
894             }
895
896             return;
897         }
898
899         if (null == dstAddress) {
900             if (Logging.SHOW_WARNING && LOG.isLoggable(Level.WARNING)) {
901                 LOG.warning("null destination address, discarding message " + msg);
902             }
903
904             if (EndpointMeterBuildSettings.ENDPOINT_METERING && (endpointMeter != null)) {
905                 endpointMeter.invalidIncomingMessage();
906             }
907
908             return;
909         }
910
911         // Decode the destination address.
912         // We want:
913         // 1 - a version of the address that does not have the grp redirection.
914         // 2 - a version of the serviceName that includes BOTH the group redirection and the original service name.
915         // 3 - the original service param; without the original service name stuck to it.
916         // So, basically we want the original serviceName part stuck to the group mangling, not stuck to the original
917         // serviceParam. We do that by cut/pasting from both the mangled and demangled versions of the address.
918
919         EndpointAddress demangledAddress = demangleAddress(dstAddress);
920         String decodedServiceName = demangledAddress.getServiceName();
921         String decodedServiceParam = demangledAddress.getServiceParameter();
922
923         if ((null == decodedServiceName) || (0 == decodedServiceName.length())) {
924             if (Logging.SHOW_WARNING && LOG.isLoggable(Level.WARNING)) {
925                 LOG.warning("dest serviceName must not be null, discarding message " + msg);
926             }
927
928             if (EndpointMeterBuildSettings.ENDPOINT_METERING && (endpointMeter != null)) {
929                 endpointMeter.invalidIncomingMessage();
930             }
931
932             return;
933         }
934
935         // Do filters for this message:
936         // FIXME - jice 20040417 : filters are likely broken, now. They do not see messages
937         // from xports in parent groups.  For those messages that are seen, demangled address seems to be the useful one.
938         msg = processFilters(msg, srcAddress, demangledAddress, true);
939
940         // If processFilters retuns null, the message is to be discarded.
941         if (msg == null) {
942             if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
943                 LOG.fine("Message discarded during filter processing");
944             }
945
946             if (EndpointMeterBuildSettings.ENDPOINT_METERING && (endpointMeter != null)) {
947                 endpointMeter.incomingMessageFilteredOut();
948             }
949
950             return;
951         }
952
953         // Now that we know the original service name is valid, finish building the decoded version.
954         if (demangledAddress != dstAddress) {
955             decodedServiceName = dstAddress.getServiceName() + "/" + decodedServiceName;
956         }
957
958         // Look up the listener
959         EndpointListener listener = getIncomingMessageListener(decodedServiceName, decodedServiceParam);
960
961         // No listener? oh well.
962
963         if (listener == null) {
964             if (Logging.SHOW_WARNING && LOG.isLoggable(Level.WARNING)) {
965                 LOG.warning("No listener for \'" + dstAddress + "\' in group " +
966                         group + "\n\tdecodedServiceName :" +
967                         decodedServiceName + "\tdecodedServiceParam :" +
968                         decodedServiceParam);
969             }
970
971             if (EndpointMeterBuildSettings.ENDPOINT_METERING && (endpointMeter != null)) {
972                 endpointMeter.noListenerForIncomingMessage();
973             }
974
975             return; // noone cares for this message
976         }
977
978         // call the listener
979
980         try {
981             if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
982                 if (null != decodedServiceParam) {
983                     LOG.fine("Calling listener for \'" + decodedServiceName + "/" + decodedServiceParam + "\' with " + msg);
984                 } else {
985                     LOG.fine("Calling listener for \'" + decodedServiceName + "\' with " + msg);
986                 }
987             }
988
989             listener.processIncomingMessage(msg, srcAddress, demangledAddress);
990
991             if (EndpointMeterBuildSettings.ENDPOINT_METERING && (endpointMeter != null)) {
992                 endpointMeter.incomingMessageSentToEndpointListener();
993             }
994
995             if (EndpointMeterBuildSettings.ENDPOINT_METERING && (endpointMeter != null)) {
996                 endpointMeter.demuxMessageProcessed();
997             }
998         } catch (Throwable all) {
999             if (Logging.SHOW_SEVERE && LOG.isLoggable(Level.SEVERE)) {
1000                 LOG.log(Level.SEVERE, "Uncaught throwable from listener for " + dstAddress, all);
1001             }
1002
1003             if (EndpointMeterBuildSettings.ENDPOINT_METERING && (endpointMeter != null)) {
1004                 endpointMeter.errorProcessingIncomingMessage();
1005             }
1006         }
1007     }
1008
1009     /**
1010      * {@inheritDoc}
1011      */
1012     public void demux(Message msg) {
1013
1014         // Get the message destination
1015         MessageElement dstAddressElement = msg.getMessageElement(EndpointServiceImpl.MESSAGE_DESTINATION_NS,
1016                 EndpointServiceImpl.MESSAGE_DESTINATION_NAME);
1017
1018         if (null == dstAddressElement) {
1019             // No destination address... Just discard
1020             if (Logging.SHOW_WARNING && LOG.isLoggable(Level.WARNING)) {
1021                 LOG.warning(msg + " has no destination address. Discarded");
1022             }
1023
1024             if (EndpointMeterBuildSettings.ENDPOINT_METERING && (endpointMeter != null)) {
1025                 endpointMeter.noDestinationAddressForDemuxMessage();
1026             }
1027
1028             return;
1029         }
1030
1031         msg.removeMessageElement(dstAddressElement);
1032         EndpointAddress dstAddress = new EndpointAddress(dstAddressElement.toString());
1033
1034         // Get the message source
1035         MessageElement srcAddressElement = msg.getMessageElement(EndpointServiceImpl.MESSAGE_SOURCE_NS, EndpointServiceImpl.MESSAGE_SOURCE_NAME);
1036
1037         if (null == srcAddressElement) {
1038             // No src address... Just discard
1039             if (Logging.SHOW_WARNING && LOG.isLoggable(Level.WARNING)) {
1040                 LOG.warning(msg + " has no source address. Discarded");
1041             }
1042
1043             if (EndpointMeterBuildSettings.ENDPOINT_METERING && (endpointMeter != null)) {
1044                 endpointMeter.noSourceAddressForDemuxMessage();
1045             }
1046
1047             return;
1048         }
1049
1050         msg.removeMessageElement(srcAddressElement);
1051         EndpointAddress msgScrAddress = new EndpointAddress(srcAddressElement.toString());
1052
1053         processIncomingMessage(msg, msgScrAddress, dstAddress);
1054     }
1055
1056     /**
1057      * {@inheritDoc}
1058      */
1059     public MessengerEventListener addMessageTransport(MessageTransport transpt) {
1060
1061         synchronized (messageTransports) {
1062             // check if it is already installed.
1063             if (!messageTransports.contains(transpt)) {
1064
1065                 clearProtoFromAdv(transpt); // just to be safe
1066                 messageTransports.add(transpt);
1067                 addProtoToAdv(transpt);
1068
1069                 // FIXME: For now, we return this. Later we might return something else, so that we can take
1070                 // advantage of the fact that we know that the event is from a local transport.
1071                 // That will help cleaning up the incoming messenger mess.
1072                 return this;
1073             }
1074         }
1075
1076         return null;
1077     }
1078
1079     /**
1080      * {@inheritDoc}
1081      */
1082     public boolean removeMessageTransport(MessageTransport transpt) {
1083
1084         boolean removed;
1085
1086         synchronized (messageTransports) {
1087             removed = messageTransports.remove(transpt);
1088         }
1089
1090         if (removed) {
1091             clearProtoFromAdv(transpt);
1092         }
1093
1094         return removed;
1095     }
1096
1097     /**
1098      * {@inheritDoc}
1099      */
1100     public Iterator<MessageTransport> getAllMessageTransports() {
1101         if (null != parentEndpoint) {
1102             return new SequenceIterator(getAllLocalTransports(), parentEndpoint.getAllMessageTransports());
1103         } else {
1104             return getAllLocalTransports();
1105         }
1106     }
1107
1108     /**
1109      * {@inheritDoc}
1110      */
1111     public MessageTransport getMessageTransport(String name) {
1112         Iterator<MessageTransport> allTransports = getAllMessageTransports();
1113
1114         while (allTransports.hasNext()) {
1115             MessageTransport transpt = allTransports.next();
1116
1117             if (transpt.getProtocolName().equals(name)) {
1118                 return transpt;
1119             }
1120         }
1121
1122         return null;
1123     }
1124
1125     private void addProtoToAdv(MessageTransport proto) {
1126
1127         boolean relay = false;
1128
1129         try {
1130             if (!(proto instanceof MessageReceiver)) {
1131                 return;
1132             }
1133
1134             // no value to publish for the router endpoint address
1135             if (proto instanceof EndpointRouter) {
1136                 // register the corresponding group to relay connection events
1137                 addActiveRelayListener(group);
1138                 return;
1139             }
1140
1141             // register this group to Relay connection events
1142             if (proto instanceof RelayClient) {
1143                 relay = true;
1144                 ((RelayClient) proto).addActiveRelayListener(group);
1145             }
1146
1147             // get the list of addresses
1148             Iterator<EndpointAddress> allAddresses = ((MessageReceiver) proto).getPublicAddresses();
1149             Vector<String> ea = new Vector<String>();
1150
1151             while (allAddresses.hasNext()) {
1152                 EndpointAddress anEndpointAddress = allAddresses.next();
1153
1154                 if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
1155                     LOG.fine("Adding endpoint address to route advertisement : " + anEndpointAddress);
1156                 }
1157
1158                 ea.add(anEndpointAddress.toString());
1159             }
1160
1161             PeerAdvertisement padv = group.getPeerAdvertisement();
1162             StructuredDocument myParam = padv.getServiceParam(assignedID);
1163
1164             RouteAdvertisement route = null;
1165
1166             if (myParam != null) {
1167                 Enumeration paramChilds = myParam.getChildren(RouteAdvertisement.getAdvertisementType());
1168
1169                 if (paramChilds.hasMoreElements()) {
1170                     // we have an advertisement just add the new access points
1171                     XMLElement param = (XMLElement) paramChilds.nextElement();
1172
1173                     route = (RouteAdvertisement) AdvertisementFactory.newAdvertisement(param);
1174                     route.addDestEndpointAddresses(ea);
1175                     if (relay) {
1176                         // need to add the relay info if we have some
1177                         Vector<AccessPointAdvertisement> hops = ((RelayClient) proto).getActiveRelays(group);
1178
1179                         if (!hops.isEmpty()) {
1180                             route.setHops(hops);
1181                         }
1182                     }
1183                 }
1184             }
1185
1186             if (null == route) {
1187                 // None yet, so create a new Route Advertisement
1188                 // create the RouteAdvertisement that will contain the route to
1189                 // the peer. At this point we only know the peer endpoint addresses
1190                 // no hops are known
1191
1192                 // create the destination access point
1193                 AccessPointAdvertisement destAP = (AccessPointAdvertisement) AdvertisementFactory.newAdvertisement(
1194                         AccessPointAdvertisement.getAdvertisementType());
1195
1196                 destAP.setPeerID(group.getPeerID());
1197                 destAP.setEndpointAddresses(ea);
1198
1199                 // create the route advertisement
1200                 route = (RouteAdvertisement) AdvertisementFactory.newAdvertisement(RouteAdvertisement.getAdvertisementType());
1201                 route.setDest(destAP);
1202
1203                 if (relay) {
1204                     // need to add the relay info if we have some
1205                     Vector<AccessPointAdvertisement> hops = ((RelayClient) proto).getActiveRelays(group);
1206
1207                     if (!hops.isEmpty()) {
1208                         route.setHops(hops);
1209                     }
1210                 }
1211             }
1212
1213             // create the param route
1214             XMLDocument newParam = (XMLDocument) StructuredDocumentFactory.newStructuredDocument(MimeMediaType.XMLUTF8, "Parm");
1215             XMLDocument xptDoc = (XMLDocument) route.getDocument(MimeMediaType.XMLUTF8);
1216
1217             StructuredDocumentUtils.copyElements(newParam, newParam, xptDoc);
1218
1219             padv.putServiceParam(assignedID, newParam);
1220
1221             // publish the new advertisement
1222             DiscoveryService discovery = group.getDiscoveryService();
1223
1224             if (discovery != null) {
1225                 discovery.publish(padv, DiscoveryService.INFINITE_LIFETIME, DiscoveryService.DEFAULT_EXPIRATION);
1226             }
1227         } catch (Exception ex) {
1228             if (Logging.SHOW_SEVERE && LOG.isLoggable(Level.SEVERE)) {
1229                 LOG.log(Level.SEVERE, "Exception adding message transport ", ex);
1230             }
1231         }
1232     }
1233
1234     private void clearProtoFromAdv(MessageTransport transpt) {
1235
1236         try {
1237             if (!(transpt instanceof MessageReceiver)) {
1238                 return;
1239             }
1240
1241             // no value to publish the router endpoint address
1242             if (transpt instanceof EndpointRouter) {
1243                 // register the corresponding group in the relay
1244                 removeActiveRelayListener(group);
1245                 return;
1246             }
1247
1248             // register this group to Relay connection events
1249             if (transpt instanceof RelayClient) {
1250                 ((RelayClient) transpt).removeActiveRelayListener(group);
1251             }
1252
1253             Iterator<EndpointAddress> allAddresses = ((MessageReceiver) transpt).getPublicAddresses();
1254             Vector<String> ea = new Vector<String>();
1255
1256             while (allAddresses.hasNext()) {
1257                 EndpointAddress anEndpointAddress = allAddresses.next();
1258
1259                 if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
1260                     LOG.fine("Removing endpoint address from route advertisement : " + anEndpointAddress);
1261                 }
1262
1263                 ea.add(anEndpointAddress.toString());
1264             }
1265
1266             PeerAdvertisement padv = group.getPeerAdvertisement();
1267             XMLDocument myParam = (XMLDocument) padv.getServiceParam(assignedID);
1268
1269             if (myParam == null) {
1270                 return;
1271             }
1272
1273             Enumeration paramChilds = myParam.getChildren(RouteAdvertisement.getAdvertisementType());
1274
1275             if (!paramChilds.hasMoreElements()) {
1276                 return;
1277             }
1278
1279             XMLElement param = (XMLElement) paramChilds.nextElement();
1280
1281             RouteAdvertisement route = (RouteAdvertisement) AdvertisementFactory.newAdvertisement(param);
1282
1283             route.removeDestEndpointAddresses(ea);
1284
1285             // update the new route to a new parm structure.
1286             XMLDocument newParam = (XMLDocument) StructuredDocumentFactory.newStructuredDocument(MimeMediaType.XMLUTF8, "Parm");
1287
1288             XMLDocument xptDoc = (XMLDocument) route.getDocument(MimeMediaType.XMLUTF8);
1289
1290             StructuredDocumentUtils.copyElements(newParam, newParam, xptDoc);
1291
1292             // put the parms back.
1293             padv.putServiceParam(assignedID, newParam);
1294
1295             // publish the new advertisement
1296             DiscoveryService discovery = group.getDiscoveryService();
1297
1298             if (discovery != null) {
1299                 discovery.publish(padv, DiscoveryService.INFINITE_LIFETIME, DiscoveryService.DEFAULT_EXPIRATION);
1300             }
1301         } catch (Exception ex) {
1302             if (Logging.SHOW_SEVERE && LOG.isLoggable(Level.SEVERE)) {
1303                 LOG.log(Level.SEVERE, "Exception removing messsage transport ", ex);
1304             }
1305         }
1306     }
1307
1308     /**
1309      * {@inheritDoc}
1310      */
1311     public boolean addMessengerEventListener(MessengerEventListener listener, int prio) {
1312         int priority = prio;
1313
1314         if (priority > EndpointService.HighPrecedence) {
1315             priority = EndpointService.HighPrecedence;
1316         }
1317
1318         if (priority < EndpointService.LowPrecedence) {
1319             priority = EndpointService.LowPrecedence;
1320         }
1321
1322         return passiveMessengerListeners[priority].add(listener);
1323     }
1324
1325     /**
1326      * {@inheritDoc}
1327      */
1328     public boolean removeMessengerEventListener(MessengerEventListener listener, int prio) {
1329         int priority = prio;
1330
1331         if (priority > EndpointService.HighPrecedence) {
1332             priority = EndpointService.HighPrecedence;
1333         }
1334         if (priority < EndpointService.LowPrecedence) {
1335             priority = EndpointService.LowPrecedence;
1336         }
1337
1338         return passiveMessengerListeners[priority].remove(listener);
1339     }
1340
1341     /**
1342      * {@inheritDoc}
1343      */
1344     public boolean addIncomingMessageListener(EndpointListener listener, String serviceName, String serviceParam) {
1345
1346         if (null == listener) {
1347             throw new IllegalArgumentException("EndpointListener must be non-null");
1348         }
1349
1350         if (null == serviceName) {
1351             throw new IllegalArgumentException("serviceName must not be null");
1352         }
1353
1354         if (-1 != serviceName.indexOf('/')) {
1355             throw new IllegalArgumentException("serviceName may not contain '/' characters");
1356         }
1357
1358         String address = serviceName;
1359
1360         if (null != serviceParam) {
1361             address += "/" + serviceParam;
1362         }
1363
1364         synchronized (incomingMessageListeners) {
1365             if (incomingMessageListeners.containsKey(address)) {
1366                 return false;
1367             }
1368
1369             InboundMeter incomingMessageListenerMeter = null;
1370
1371             if (EndpointMeterBuildSettings.ENDPOINT_METERING && (endpointServiceMonitor != null)) {
1372                 incomingMessageListenerMeter = endpointServiceMonitor.getInboundMeter(serviceName, serviceParam);
1373             }
1374
1375             incomingMessageListeners.put(address, listener);
1376         }
1377
1378         if (parentEndpoint != null) {
1379             if (serviceName.startsWith(ChannelMessenger.InsertedServicePrefix)) {
1380                 // The listener name is already re-written.
1381                 // The listener is already a quota listener; we made extra sure of that before tucking it into our local map.
1382                 parentEndpoint.addIncomingMessageListener(listener, serviceName, serviceParam);
1383             } else {
1384                 parentEndpoint.addIncomingMessageListener(listener, myServiceName, address);
1385             }
1386         }
1387
1388         return true;
1389     }
1390
1391     /**
1392      * {@inheritDoc}
1393      */
1394     public EndpointListener getIncomingMessageListener(String serviceName, String serviceParam) {
1395
1396         if (null == serviceName) {
1397             throw new IllegalArgumentException("serviceName must not be null");
1398         }
1399
1400         EndpointListener listener = null;
1401
1402         if (null != serviceParam) {
1403             listener = incomingMessageListeners.get(serviceName + "/" + serviceParam);
1404         }
1405
1406         // Didn't find it with param, maybe there is a generic listener for the service
1407         if (listener == null) {
1408             listener = incomingMessageListeners.get(serviceName);
1409         }
1410
1411         // Didn't find it still, try the compatibility name.
1412         if (listener == null) {
1413             listener = incomingMessageListeners.get(serviceName + serviceParam);
1414
1415             if ((null != listener) && Logging.SHOW_WARNING && LOG.isLoggable(Level.WARNING)) {
1416                 LOG.warning("Found handler only via compatibility listener : " + serviceName + serviceParam);
1417             }
1418         }
1419
1420         return listener;
1421     }
1422
1423     /**
1424      * {@inheritDoc}
1425      */
1426     public EndpointListener removeIncomingMessageListener(String serviceName, String serviceParam) {
1427         if (null == serviceName) {
1428             throw new IllegalArgumentException("serviceName must not be null");
1429         }
1430
1431         if (-1 != serviceName.indexOf('/')) {
1432             throw new IllegalArgumentException("serviceName may not contain '/' characters");
1433         }
1434
1435         String address = serviceName;
1436
1437         if (null != serviceParam) {
1438             address += "/" + serviceParam;
1439         }
1440
1441         EndpointListener removedListener;
1442         synchronized (incomingMessageListeners) {
1443             removedListener = incomingMessageListeners.remove(address);
1444         }
1445
1446         if (parentEndpoint != null) {
1447             if (serviceName.startsWith(ChannelMessenger.InsertedServicePrefix)) {
1448                 parentEndpoint.removeIncomingMessageListener(serviceName, serviceParam);
1449             } else {
1450                 parentEndpoint.removeIncomingMessageListener(myServiceName, address);
1451             }
1452         }
1453         return removedListener;
1454     }
1455
1456     /**
1457      * Returns a local transport that can send to the given address. For now
1458      * this is based only on the protocol name.
1459      *
1460      * @param addr the endpoint address
1461      * @return the transport if the address protocol is supported by this transport
1462      */
1463     private MessageSender getLocalSenderForAddress(EndpointAddress addr) {
1464
1465         Iterator<MessageTransport> localTransports = getAllLocalTransports();
1466
1467         while (localTransports.hasNext()) {
1468             MessageTransport transpt = localTransports.next();
1469             if (!transpt.getProtocolName().equals(addr.getProtocolName())) {
1470                 continue;
1471             }
1472
1473             if (!(transpt instanceof MessageSender)) {
1474                 continue;
1475             }
1476             
1477             return (MessageSender) transpt;
1478         }
1479         return null;
1480     }
1481
1482     /**
1483      * {@inheritDoc}
1484      * <p/>
1485      * Note: canonical messenger itself does not do any address rewriting.
1486      * Any address rewriting must be specified when getting a channel. However,
1487      * canonical knows the default group redirection for its owning endpoint and
1488      * will automatically skip redirection if it is the same.
1489      */
1490
1491     public Messenger getCanonicalMessenger(EndpointAddress addr, Object hint) {
1492         if (addr == null) {
1493             throw new IllegalArgumentException("null endpoint address not allowed.");
1494         }
1495
1496         if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
1497             Throwable trace = new Throwable("Stack Trace");
1498             StackTraceElement elements[] = trace.getStackTrace();
1499
1500             int position = 1;
1501
1502             while (elements[position].getClassName().startsWith("net.jxta.impl.endpoint.EndpointService")) {
1503                 position++;
1504             }
1505
1506             if ((elements.length - 1) == position) {
1507                 position--;
1508             }
1509
1510             LOG.fine("Get Messenger for " + addr + " by " + elements[position]);
1511         }
1512
1513         // Check the canonical map.
1514         synchronized (messengerMap) {
1515             Reference<Messenger> ref = messengerMap.get(addr);
1516
1517             if (ref != null) {
1518                 Messenger found = ref.get();
1519
1520                 // If it is USABLE, return it.
1521                 if ((found != null) && ((found.getState() & Messenger.USABLE) != 0)) {
1522                     return found;
1523                 }
1524
1525                 // It has been GCed or is no longer USABLE. Make room for a new one.
1526                 messengerMap.remove(addr);
1527             }
1528
1529             if (getLocalSenderForAddress(addr) != null) {
1530                 OutboundMeter messengerMeter = null;
1531
1532                 if (EndpointMeterBuildSettings.ENDPOINT_METERING && (endpointServiceMonitor != null)) {
1533                     messengerMeter = endpointServiceMonitor.getOutboundMeter(addr);
1534                 }
1535
1536                 // The hint is saved in the canonical messenger and will be used
1537                 // when that virtual messenger first faces the need to create a
1538                 // transport messenger. As of now, the logical dest is unknown.
1539                 Messenger m = new CanonicalMessenger(vmQueueSize, addr, null, hint, messengerMeter);
1540
1541                 messengerMap.put(m.getDestinationAddress(), new SoftReference<Messenger>(m));
1542                 return m;
1543             }
1544         }
1545
1546         // If we're here, we do not have any such transport.
1547         // Try our ancestors enpoints, if any.
1548
1549         if (parentEndpoint == null) {
1550             if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
1551                 LOG.fine("Could not create messenger for : " + addr);
1552             }
1553             return null;
1554         }
1555
1556         return parentEndpoint.getCanonicalMessenger(addr, hint);
1557     }
1558
1559     /**
1560      * Return only the message transport registered locally.
1561      */
1562     protected Iterator<MessageTransport> getAllLocalTransports() {
1563         List<MessageTransport> transportList;
1564
1565         synchronized (messageTransports) {
1566             transportList = new ArrayList<MessageTransport>(messageTransports);
1567         }
1568
1569         return transportList.iterator();
1570     }
1571
1572     /**
1573      * Returns a messenger for the specified address from one of the Message
1574      * Transports registered with this very endpoint service. Message
1575      * Transports inherited from parent groups will not be used.
1576      *
1577      * @param addr The destination address of the desired Messenger.
1578      * @param hint A hint provided to the Message Transport which may assist it
1579      *             in creating the messenger.
1580      * @return A Messenger for the specified destination address or {@code null}
1581      *         if no Messenger could be created.
1582      */
1583     private Messenger getLocalTransportMessenger(EndpointAddress addr, Object hint) {
1584         MessageSender sender = getLocalSenderForAddress(addr);
1585         Messenger messenger = null;
1586
1587         if (sender != null) {
1588             if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
1589                 LOG.fine("Trying address \'" + addr + "\' with : " + sender);
1590             }
1591             messenger = sender.getMessenger(addr, hint);
1592         }
1593
1594         if (messenger == null) {
1595             if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
1596                 LOG.fine("Couldn\'t create messenger for : " + addr);
1597             }
1598         }
1599         return messenger;
1600     }
1601
1602     /**
1603      * {@inheritDoc}
1604      */
1605     public synchronized void addIncomingMessageFilterListener(MessageFilterListener listener, String namespace, String name) {
1606         if (null == listener) {
1607             throw new IllegalArgumentException("listener must be non-null");
1608         }
1609
1610         FilterListenerAndMask aFilter = new FilterListenerAndMask(listener, namespace, name);
1611
1612         incomingFilterListeners.add(aFilter);
1613     }
1614
1615     /**
1616      * {@inheritDoc}
1617      */
1618     public synchronized void addOutgoingMessageFilterListener(MessageFilterListener listener, String namespace, String name) {
1619         if (null == listener) {
1620             throw new IllegalArgumentException("listener must be non-null");
1621         }
1622
1623         FilterListenerAndMask aFilter = new FilterListenerAndMask(listener, namespace, name);
1624
1625         outgoingFilterListeners.add(aFilter);
1626     }
1627
1628     /**
1629      * {@inheritDoc}
1630      */
1631     public synchronized MessageFilterListener removeIncomingMessageFilterListener(MessageFilterListener listener, String namespace, String name) {
1632         Iterator<FilterListenerAndMask> eachListener = incomingFilterListeners.iterator();
1633
1634         while (eachListener.hasNext()) {
1635             FilterListenerAndMask aFilter = eachListener.next();
1636
1637             if (listener == aFilter.listener) {
1638                 eachListener.remove();
1639                 return listener;
1640             }
1641         }
1642
1643         return null;
1644     }
1645
1646     /**
1647      * {@inheritDoc}
1648      */
1649     public synchronized MessageFilterListener removeOutgoingMessageFilterListener(MessageFilterListener listener, String namespace, String name) {
1650         Iterator<FilterListenerAndMask> eachListener = outgoingFilterListeners.iterator();
1651
1652         while (eachListener.hasNext()) {
1653             FilterListenerAndMask aFilter = eachListener.next();
1654
1655             if ((listener == aFilter.listener)
1656                     && ((null != namespace) ? namespace.equals(aFilter.namespace) : (null == aFilter.namespace))
1657                     && ((null != name) ? name.equals(aFilter.name) : (null == aFilter.name))) {
1658                 eachListener.remove();
1659                 return listener;
1660             }
1661         }
1662
1663         return null;
1664     }
1665
1666     /**
1667      * {@inheritDoc}
1668      *
1669      * <p/>Redistribute the event to those interested.
1670      */
1671     public boolean messengerReady(MessengerEvent event) {
1672
1673         // FIXME - jice@jxta.org 20040413: now that we share messengers, we 
1674         // should be able to get rid of most of this mess, and in the router, 
1675         // and the relay too.
1676
1677         Messenger messenger = event.getMessenger();
1678         Messenger messengerForHere;
1679         EndpointAddress connAddr = event.getConnectionAddress();
1680
1681         if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
1682             LOG.fine("New " + messenger + " for : " +
1683                     messenger.getDestinationAddress() + " (" +
1684                     messenger.getLogicalDestinationAddress() + ")");
1685         }
1686
1687         int highestPrec = EndpointService.HighPrecedence;
1688         int lowestPrec = EndpointService.LowPrecedence;
1689
1690         // If there's no connection address we just pass the messenger around
1691         // everywhere; it is unspecified which group it is for.
1692         // Else, we must figure out if it is for this group, or must be
1693         // passed upStack (if any).
1694         if (connAddr != null) {
1695             String cgServiceName = connAddr.getServiceName();
1696
1697             // See if there is a mangling. If not, this means this was sent
1698             // within this group through a local xport, so it is for here. Else 
1699             // it may be for here (from below) or for upstack.
1700
1701             if (cgServiceName == null || !cgServiceName.startsWith(ChannelMessenger.InsertedServicePrefix)) {
1702                 // FIXME: jice@jxta.org - 20030512 If we restrict use to "here"
1703                 // we make most backchannels useless. So, the statement below is
1704                 // commented out.  Ideally we should not have to worry about the 
1705                 // group targetting of connections, only messages. However the
1706                 // way the relay and the router have to split messengers makes
1707                 // it necessary. This may only be fixed by re-organizing 
1708                 // globally the management of incoming messengers in the 
1709                 // endpoint, so that router and relay no-longer need to claim 
1710                 // exclusive use of messengers. Since relay clients set the 
1711                 // group properly, their messengers are not affected by this
1712                 // branch of the code.
1713                 // lowestPrec = EndpointService.LowPrecedence + 1;
1714             } else if (!myServiceName.equals(cgServiceName)) {
1715                 // This is for upstack only
1716                 highestPrec = EndpointService.LowPrecedence;
1717             } else {
1718                 // Mangling present and this is for here (and therefore this is
1719                 // from below). We must demangle. Wrapping is figured later, 
1720                 // since we may also have to wrap if there the
1721                 lowestPrec = EndpointService.LowPrecedence + 1;
1722
1723                 String serviceParam = connAddr.getServiceParameter();
1724                 String realService = null;
1725                 String realParam = null;
1726
1727                 if (null != serviceParam) {
1728                     int slashAt = serviceParam.indexOf('/');
1729
1730                     if (-1 == slashAt) {
1731                         realService = serviceParam;
1732                     } else {
1733                         realService = serviceParam.substring(0, slashAt);
1734                         realParam = serviceParam.substring(slashAt + 1);
1735                     }
1736                 }
1737
1738                 connAddr = new EndpointAddress(connAddr, realService, realParam);
1739             }
1740         }
1741
1742         // We make a channel in all cases, the channel will decide if the desired grp redirection
1743         // requires address rewriting or not.
1744
1745         // As for a MessageWatcher for implementing sendMessage-with-listener, we do not provide one
1746         // mostly because it is difficult to get a hold on the only appropriate one: that of the endpoint
1747         // service interface of the listener's owner. So, incoming messengers will not support the listener-based send API.
1748         // Unless someone adds a watcher by hand.
1749         messengerForHere = event.getMessenger().getChannelMessenger(group.getPeerGroupID(), null, null);
1750
1751         // Call the listener highest precedence first. The first one that claims
1752         // the messenger wins.
1753         for (int prec = highestPrec + 1; prec-- > lowestPrec;) {
1754             MessengerEvent newMessenger = new MessengerEvent(event.getSource(),
1755                     prec == EndpointService.LowPrecedence ? messenger : messengerForHere, connAddr);
1756
1757             // We need to grab the listeners and release the lock. Otherwise the
1758             // sometimes too long operations performed by the listener creates
1759             // an unnecessary contention.
1760             // The side effect is that a listener can in theory be called after
1761             // remove returned. It is unlikely to be a problem for messenger
1762             // events, but if it is, then we'll have to add reader-writer synch.
1763             Collection<MessengerEventListener> allML = new ArrayList<MessengerEventListener>(passiveMessengerListeners[prec]);
1764             for (MessengerEventListener listener : allML) {
1765                 try {
1766                     if (listener.messengerReady(newMessenger)) {
1767                         // A listener has taken the messenger. we're done.
1768                         if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
1769                             LOG.fine(newMessenger + " claimed by " + listener);
1770                         }
1771                         return true;
1772                     }
1773                 } catch (Throwable all) {
1774                     if (Logging.SHOW_WARNING && LOG.isLoggable(Level.WARNING)) {
1775                         LOG.log(Level.WARNING, "Uncaught Throwable in listener " + listener, all);
1776                     }
1777                 }
1778             }
1779         }
1780
1781         // Note that the messenger was not wanted.
1782         if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
1783             LOG.fine("Nobody cared about " + event);
1784         }
1785         return false;
1786     }
1787
1788     // public MessengerEventListener getMessengerEventListener() {
1789     // return this;
1790     // }
1791
1792     // try to find the relay service in our
1793     // hierachy of endpoints to register our listener group
1794     // since the group has to register into the relay service. This is not
1795     // very pretty, but the other way was even worth to register the relay
1796     // into the endpoint!
1797
1798     private void addActiveRelayListener(PeerGroup listeningGroup) {
1799         PeerGroup parentGroup = group.getParentGroup();
1800         while (parentGroup != null) {
1801             EndpointService parentEndpoint = parentGroup.getEndpointService();
1802
1803             for (Iterator<MessageTransport> it = parentEndpoint.getAllMessageTransports(); it.hasNext();) {
1804                 MessageTransport mt = it.next();
1805
1806                 if ((mt instanceof RelayClient)) {
1807                     ((RelayClient) mt).addActiveRelayListener(listeningGroup);
1808                     break;
1809                 }
1810             }
1811             parentGroup = parentGroup.getParentGroup();
1812         }
1813     }
1814
1815     // try to find the relay service in our
1816     // hierachy of endpoints to unregister our listener group
1817     private void removeActiveRelayListener(PeerGroup listeningGroup) {
1818         PeerGroup parentGroup = group.getParentGroup();
1819
1820         while (parentGroup != null) {
1821             EndpointService parentEndpoint = parentGroup.getEndpointService();
1822
1823             for (Iterator<MessageTransport> it = parentEndpoint.getAllMessageTransports(); it.hasNext();) {
1824                 MessageTransport mt = it.next();
1825
1826                 if ((mt instanceof RelayClient)) {
1827                     ((RelayClient) mt).removeActiveRelayListener(listeningGroup);
1828                     break;
1829                 }
1830             }
1831             parentGroup = parentGroup.getParentGroup();
1832         }
1833     }
1834
1835     /*
1836      * Convenience legacy methods. They are here to reduce the complexity of the class hierarchy but are not supposed to be used.
1837      */
1838
1839     /**
1840      * {@inheritDoc}
1841      *
1842      * @deprecated legacy method.
1843      */
1844     @Deprecated
1845     public boolean ping(EndpointAddress addr) {
1846         throw new UnsupportedOperationException("Legacy method not implemented. Use an interface object.");
1847     }
1848
1849     /**
1850      * {@inheritDoc}
1851      *
1852      * @deprecated legacy method.
1853      */
1854     @Deprecated
1855     public boolean getMessenger(MessengerEventListener listener, EndpointAddress addr, Object hint) {
1856         throw new UnsupportedOperationException("Legacy method not implemented. Use an interface object.");
1857     }
1858
1859     /**
1860      * {@inheritDoc}
1861      * <p/>
1862      * convenience method not supported here.
1863      */
1864     public Messenger getMessenger(EndpointAddress addr) {
1865         throw new UnsupportedOperationException("Convenience method not implemented. Use an interface object.");
1866     }
1867
1868     /**
1869      * {@inheritDoc}
1870      * <p/>
1871      * convenience method not supported here.
1872      */
1873     public Messenger getMessengerImmediate(EndpointAddress addr, Object hint) {
1874         throw new UnsupportedOperationException("Convenience method not implemented. Use an interface object.");
1875     }
1876
1877     /**
1878      * {@inheritDoc}
1879      * <p/>
1880      * convenience method not supported here.
1881      */
1882     public Messenger getMessenger(EndpointAddress addr, Object hint) {
1883         throw new UnsupportedOperationException("Convenience method not implemented. Use an interface object.");
1884     }
1885
1886     /**
1887      * Returns a Direct Messenger that may be used to send messages via  this endpoint
1888      * to the specified destination.
1889      *
1890      * @param address the destination address.
1891      * @param hint the messenger hint, if any, otherwise null.
1892      * @param exclusive if true avoids caching the messenger
1893      * @return The messenger or {@code null} is returned if the destination address is not reachable.
1894      * @throws IllegalArgumentException if hint is not of RouteAdvertisement, or PeerAdvertisement type.
1895      */
1896     public Messenger getDirectMessenger(EndpointAddress address, Object hint, boolean exclusive) {
1897
1898         if (!exclusive) {
1899             Reference<Messenger> reference = directMessengerMap.get(address);
1900             if (reference != null) {
1901                 Messenger messenger = reference.get();
1902                 if (messenger != null && !messenger.isClosed()) {
1903                     return messenger;
1904                 }
1905             }
1906         }
1907
1908         // We must have access to a TCP transport to create a direct messenger.
1909         TcpTransport tcpTransport = (TcpTransport) getMessageTransport("tcp");
1910
1911         if ((tcpTransport != null) && (hint != null)) {
1912             RouteAdvertisement route;
1913             EndpointAddress direct;
1914             Messenger messenger;
1915             if (hint instanceof RouteAdvertisement) {
1916                 route = (RouteAdvertisement) hint;
1917             } else if (hint instanceof PeerAdvertisement) {
1918                 route = EndpointUtils.extractRouteAdv((PeerAdvertisement) hint);
1919             } else {
1920                 throw new IllegalArgumentException("Unknown route hint object type" + hint);
1921             }
1922             
1923             for (EndpointAddress transportAddr : route.getDestEndpointAddresses()) {
1924                 if (transportAddr.getProtocolName().equals("tcp")) {                    
1925                     direct = createDirectAddress(transportAddr, address);
1926                     // direct messengers are non self destructive
1927                     messenger = tcpTransport.getMessenger(direct, route, false);
1928                     if (messenger != null) {
1929                         if (!exclusive) {
1930                             directMessengerMap.put(address, new WeakReference<Messenger>(messenger));
1931                         }
1932                         return messenger;
1933                     }
1934                 }
1935             }
1936         }
1937         
1938         return null;
1939     }
1940
1941     /**
1942      * Given a transport address and service address, create a mangled endpoint address
1943      *
1944      * @param transportAddr   the transport messenger address
1945      * @param serviceEndpoint the service endpoint
1946      * @return an composite endpoint address
1947      */
1948     private EndpointAddress createDirectAddress(EndpointAddress transportAddr, EndpointAddress serviceEndpoint) {
1949         //physical transport address
1950         StringBuilder destStr = new StringBuilder(transportAddr.toString()).append("/");
1951         // EndpointService
1952         destStr.append(ENDPOINTSERVICE_NAME);
1953         //Dest peergroup
1954         destStr.append(":").append(group.getPeerGroupID().getUniqueValue().toString()).append("/");
1955         //Service endpoint
1956         destStr.append(serviceEndpoint.getServiceName()).append("/").append(serviceEndpoint.getServiceParameter());
1957
1958         //return new EndpointAddress(transportAddr, serviceEndpoint.getServiceName(), serviceEndpoint.getServiceParameter());
1959         return new EndpointAddress(destStr.toString());
1960     }
1961 }