2 * Copyright (c) 2001-2007 Sun Microsystems, Inc. All rights reserved.
4 * The Sun Project JXTA(TM) Software License
6 * Redistribution and use in source and binary forms, with or without
7 * modification, are permitted provided that the following conditions are met:
9 * 1. Redistributions of source code must retain the above copyright notice,
10 * this list of conditions and the following disclaimer.
12 * 2. Redistributions in binary form must reproduce the above copyright notice,
13 * this list of conditions and the following disclaimer in the documentation
14 * and/or other materials provided with the distribution.
16 * 3. The end-user documentation included with the redistribution, if any, must
17 * include the following acknowledgment: "This product includes software
18 * developed by Sun Microsystems, Inc. for JXTA(TM) technology."
19 * Alternately, this acknowledgment may appear in the software itself, if
20 * and wherever such third-party acknowledgments normally appear.
22 * 4. The names "Sun", "Sun Microsystems, Inc.", "JXTA" and "Project JXTA" must
23 * not be used to endorse or promote products derived from this software
24 * without prior written permission. For written permission, please contact
25 * Project JXTA at http://www.jxta.org.
27 * 5. Products derived from this software may not be called "JXTA", nor may
28 * "JXTA" appear in their name, without prior written permission of Sun.
30 * THIS SOFTWARE IS PROVIDED ``AS IS'' AND ANY EXPRESSED OR IMPLIED WARRANTIES,
31 * INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND
32 * FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL SUN
33 * MICROSYSTEMS OR ITS CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
34 * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
35 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA,
36 * OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
37 * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
38 * NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE,
39 * EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
41 * JXTA is a registered trademark of Sun Microsystems, Inc. in the United
42 * States and other countries.
44 * Please see the license information page at :
45 * <http://www.jxta.org/project/www/license.html> for instructions on use of
46 * the license in source files.
48 * ====================================================================
50 * This software consists of voluntary contributions made by many individuals
51 * on behalf of Project JXTA. For more information on Project JXTA, please see
52 * http://www.jxta.org.
54 * This license is based on the BSD license adopted by the Apache Foundation.
56 package net.jxta.impl.endpoint;
58 import net.jxta.document.Advertisement;
59 import net.jxta.endpoint.ChannelMessenger;
60 import net.jxta.endpoint.EndpointAddress;
61 import net.jxta.endpoint.EndpointListener;
62 import net.jxta.endpoint.EndpointService;
63 import net.jxta.endpoint.ListenerAdaptor;
64 import net.jxta.endpoint.Message;
65 import net.jxta.endpoint.MessageFilterListener;
66 import net.jxta.endpoint.MessageTransport;
67 import net.jxta.endpoint.Messenger;
68 import net.jxta.endpoint.MessengerEventListener;
69 import net.jxta.id.ID;
70 import net.jxta.impl.util.TimeUtils;
71 import net.jxta.impl.peergroup.StdPeerGroup;
72 import net.jxta.peergroup.PeerGroup;
73 import net.jxta.protocol.ModuleImplAdvertisement;
75 import java.lang.ref.Reference;
76 import java.lang.ref.WeakReference;
77 import java.util.Iterator;
79 import java.util.WeakHashMap;
81 import net.jxta.platform.Module;
84 * Provides an interface object appropriate for applications using the endpoint
85 * service. The interface provides a number of convenience features and
86 * implementation necessary for legacy features.
88 class EndpointServiceInterface implements EndpointService {
91 * The service interface that we will be fronting.
93 private final EndpointServiceImpl theRealThing;
96 * The number of active instances of this class. We use this for deciding
97 * when to instantiate and shutdown the listener adaptor.
99 private static int activeInstanceCount = 0;
102 * Provides emulation of the legacy send-message-with-listener and get-messenger-with-listener APIs.
104 private static ListenerAdaptor listenerAdaptor;
107 * The cache of channels. If a given owner of this EndpointService interface
108 * object requests channels for the same exact destination multiple times,
109 * we will return the same channel object as much as possible. We keep
110 * channels in a weak map, so that when channels are discarded, they
111 * eventually disappear. Channels that have messages in them are always
112 * referenced. Therefore, this prevents the creation of more than one
113 * channel with messages in it for the same destination in the same context
114 * (owner of interface object - typically one module). This is required to
115 * properly support the common (and convenient) pattern:
117 * <code>m = endpointServiceInterface.getMessenger(); messenger.sendMessage(); m = null;</code>
119 * If that was not kept in check, it would be possible to inadvertently
120 * create an infinite number of channels with pending messages, thus an
121 * infinite number of messages too.
123 private final Map<EndpointAddress, Reference<Messenger>> channelCache = new WeakHashMap<EndpointAddress, Reference<Messenger>>();
126 * Builds a new interface object.
128 * @param endpointService the endpoint service that we will front.
130 public EndpointServiceInterface(EndpointServiceImpl endpointService) {
131 theRealThing = endpointService;
132 synchronized (this.getClass()) {
133 activeInstanceCount++;
134 if (1 == activeInstanceCount) {
135 listenerAdaptor = new ListenerAdaptor(Thread.currentThread().getThreadGroup(), ((StdPeerGroup) endpointService.getGroup()).getExecutor());
143 * This is rather heavy-weight if instances are frequently created and
144 * discarded since finalization significantly delays GC.
147 protected void finalize() throws Throwable {
148 synchronized (this.getClass()) {
149 activeInstanceCount--;
150 if (0 == activeInstanceCount) {
151 listenerAdaptor.shutdown();
152 listenerAdaptor = null;
161 * it is there only to satisfy the requirements of the interface that we
162 * implement. Ultimately, the API should define two levels of interfaces :
163 * one for the real service implementation and one for the interface object.
164 * Right now it feels a bit heavy to so that since the only different
165 * between the two would be init() and may-be getName().
167 public void init(PeerGroup peerGroup, ID id, Advertisement implAdv) {
173 * This is here for temporary class hierarchy reasons.
174 * it is ALWAYS ignored. By definition, the interface object
175 * protects the real object's start/stop methods from being called
177 public int startApp(String[] arg) {
178 return Module.START_OK;
184 * This is here for temporary class hierarchy reasons.
185 * it is ALWAYS ignored. By definition, the interface object
186 * protects the real object's start/stop methods from being called
188 * This request is currently ignored.
190 public void stopApp() {
196 public ModuleImplAdvertisement getImplAdvertisement() {
197 return theRealThing.getImplAdvertisement();
203 * Sort of absurd but this is part of the API we're implementing.
204 * We would not do a two-level API just for that.
206 public EndpointService getInterface() {
213 public PeerGroup getGroup() {
214 return theRealThing.getGroup();
220 public Messenger getCanonicalMessenger(EndpointAddress addr, Object hint) {
221 // XXX: maybe we should enforce the stripping of the address here.
222 // That would prevent application from making canonical messengers with a variety of service names and
223 // service params. On the other hand that would cost useless cloning of endp addrs and prevent future
224 // flexibility regarding QOS params, possibly. Be liberal for now.
225 return theRealThing.getCanonicalMessenger(addr, hint);
231 public Messenger getMessengerImmediate(EndpointAddress addr, Object hint) {
232 // Note: for now, the hint is not used for canonicalization (hint != QOS).
233 synchronized (channelCache) {
234 Reference<Messenger> existing = channelCache.get(addr);
236 if (existing != null) {
237 Messenger messenger = existing.get();
238 if ((messenger != null) && ((messenger.getState() & Messenger.USABLE) != 0)) {
244 // We do not have a good one at hand. Make a new one.
246 // Use the stripped address to get a canonical msngr; not doing so
247 // would reduce the sharing to almost nothing.
248 EndpointAddress plainAddr = new EndpointAddress(addr, null, null);
250 Messenger found = theRealThing.getCanonicalMessenger(plainAddr, hint);
252 // Address must not be a supported one.
257 // Get a channel for that servicename and serviceparam. redirect to this group.
259 // NOTE: This assumes that theRealThing.getGroup() is really the group where the application that obtained
260 // this interface object lives. This is the case today because all groups have their own endpoint service.
261 // In the future, interface objects may refer to a group context that is not necessarily the group where
262 // "therealThing" lives. When that happens, this interface object will have to know which group it works
263 // for without asking "theRealThing".
265 ChannelMessenger res = (ChannelMessenger) found.getChannelMessenger(theRealThing.getGroup().getPeerGroupID(),
266 addr.getServiceName(), addr.getServiceParameter());
268 synchronized (channelCache) {
269 // We have to check again. May be we did all that in parallel with some other thread and it beat
270 // us to the finish line. In which case, substitute the existing one and throw ours away.
271 Reference<Messenger> existing = channelCache.get(addr);
273 if (existing != null) {
274 Messenger messenger = existing.get();
275 if ((messenger != null) && ((messenger.getState() & Messenger.USABLE) != 0)) {
279 // The listenerAdaptor of this interface obj is used to support the sendMessage-with-listener API.
280 res.setMessageWatcher(listenerAdaptor);
281 channelCache.put(res.getDestinationAddress(), new WeakReference<Messenger>(res));
289 public Messenger getMessenger(EndpointAddress addr) {
290 return getMessenger(addr, null);
296 public Messenger getMessenger(EndpointAddress addr, Object hint) {
298 // Get an unresolved messenger (that's immediate).
299 Messenger messenger = getMessengerImmediate(addr, hint);
301 if (messenger == null) {
305 // Now ask the messenger to resolve: this legacy blocking API ensures
306 // that only successfully resolved messengers are ever returned.
309 messenger.waitState(Messenger.RESOLVED | Messenger.TERMINAL, TimeUtils.AMINUTE);
310 } catch (InterruptedException ie) {
311 Thread.interrupted();
315 int state = messenger.getState();
317 if ((state & Messenger.TERMINAL) != 0) {
320 if ((state & Messenger.RESOLVED) == 0) {
321 // Not failed yet. But too late for us.
330 public void propagate(Message msg, String serviceName, String serviceParam) {
331 theRealThing.propagate(msg, serviceName, serviceParam, Integer.MAX_VALUE);
337 public void propagate(Message msg, String serviceName, String serviceParam, int initialTTL) {
338 theRealThing.propagate(msg, serviceName, serviceParam, initialTTL);
344 public void demux(Message msg) {
345 theRealThing.demux(msg);
351 public void processIncomingMessage(Message message, EndpointAddress source, EndpointAddress destination) {
352 theRealThing.processIncomingMessage(message, source, destination);
359 public boolean ping(EndpointAddress addr) {
360 return null != getMessengerImmediate(addr, null);
366 public MessengerEventListener addMessageTransport(MessageTransport transpt) {
367 // FIXME TOO: We should probably make the interface refuse to do it.
368 // But that will have to wait until we have criteria to decide who
369 // gets an interface object and who gets the real thing. In the
370 // meantime just do it.
371 return theRealThing.addMessageTransport(transpt);
377 public boolean removeMessageTransport(MessageTransport transpt) {
378 // FIXME TOO: We should probably make the interface refuse to do it.
379 // But that will have to wait until we have criteria to decide who
380 // gets an interface object and who gets the real thing. In the
381 // meantime just do it.
382 return theRealThing.removeMessageTransport(transpt);
388 public Iterator<MessageTransport> getAllMessageTransports() {
389 return theRealThing.getAllMessageTransports();
395 public MessageTransport getMessageTransport(String name) {
396 return theRealThing.getMessageTransport(name);
402 public boolean addIncomingMessageListener(EndpointListener listener, String serviceName, String serviceParam) {
403 return theRealThing.addIncomingMessageListener(listener, serviceName, serviceParam);
409 public EndpointListener getIncomingMessageListener(String serviceName, String serviceParam) {
410 return theRealThing.getIncomingMessageListener(serviceName, serviceParam);
416 public void addIncomingMessageFilterListener(MessageFilterListener listener, String namespace, String name) {
417 theRealThing.addIncomingMessageFilterListener(listener, namespace, name);
423 public void addOutgoingMessageFilterListener(MessageFilterListener listener, String namespace, String name) {
424 theRealThing.addOutgoingMessageFilterListener(listener, namespace, name);
430 public MessageFilterListener removeIncomingMessageFilterListener(MessageFilterListener listener, String namespace, String name) {
431 return theRealThing.removeIncomingMessageFilterListener(listener, namespace, name);
437 public MessageFilterListener removeOutgoingMessageFilterListener(MessageFilterListener listener, String namespace, String name) {
438 return theRealThing.removeOutgoingMessageFilterListener(listener, namespace, name);
444 public EndpointListener removeIncomingMessageListener(String serviceName, String serviceParam) {
445 return theRealThing.removeIncomingMessageListener(serviceName, serviceParam);
451 public boolean addMessengerEventListener(MessengerEventListener listener, int prio) {
452 return theRealThing.addMessengerEventListener(listener, prio);
458 public boolean removeMessengerEventListener(MessengerEventListener listener, int prio) {
459 return theRealThing.removeMessengerEventListener(listener, prio);
465 * @deprecated legacy support
468 public boolean getMessenger(MessengerEventListener listener, EndpointAddress addr, Object hint) {
469 Messenger messenger = getMessengerImmediate(addr, hint);
470 if (messenger == null) {
474 if (!listenerAdaptor.watchMessenger(listener, messenger)) {
478 // Make sure that resolution is being attempted if not already in progress.
484 * Returns a Direct Messenger that may be used to send messages via this endpoint to the specified destination.
486 * @param addr the destination address.
487 * @param hint the messenger hint, if any, otherwise null.
488 * @param exclusive if true avoids caching the messenger
489 * @return The messenger or {@code null} is returned if the destination address is not reachable.
491 public Messenger getDirectMessenger(EndpointAddress addr, Object hint, boolean exclusive) {
492 return theRealThing.getDirectMessenger(addr, hint, exclusive);