2 * Copyright (c) 2004-2007 Sun Microsystems, Inc. All rights reserved.
4 * The Sun Project JXTA(TM) Software License
6 * Redistribution and use in source and binary forms, with or without
7 * modification, are permitted provided that the following conditions are met:
9 * 1. Redistributions of source code must retain the above copyright notice,
10 * this list of conditions and the following disclaimer.
12 * 2. Redistributions in binary form must reproduce the above copyright notice,
13 * this list of conditions and the following disclaimer in the documentation
14 * and/or other materials provided with the distribution.
16 * 3. The end-user documentation included with the redistribution, if any, must
17 * include the following acknowledgment: "This product includes software
18 * developed by Sun Microsystems, Inc. for JXTA(TM) technology."
19 * Alternately, this acknowledgment may appear in the software itself, if
20 * and wherever such third-party acknowledgments normally appear.
22 * 4. The names "Sun", "Sun Microsystems, Inc.", "JXTA" and "Project JXTA" must
23 * not be used to endorse or promote products derived from this software
24 * without prior written permission. For written permission, please contact
25 * Project JXTA at http://www.jxta.org.
27 * 5. Products derived from this software may not be called "JXTA", nor may
28 * "JXTA" appear in their name, without prior written permission of Sun.
30 * THIS SOFTWARE IS PROVIDED ``AS IS'' AND ANY EXPRESSED OR IMPLIED WARRANTIES,
31 * INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND
32 * FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL SUN
33 * MICROSYSTEMS OR ITS CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
34 * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
35 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA,
36 * OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
37 * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
38 * NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE,
39 * EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
41 * JXTA is a registered trademark of Sun Microsystems, Inc. in the United
42 * States and other countries.
44 * Please see the license information page at :
45 * <http://www.jxta.org/project/www/license.html> for instructions on use of
46 * the license in source files.
48 * ====================================================================
50 * This software consists of voluntary contributions made by many individuals
51 * on behalf of Project JXTA. For more information on Project JXTA, please see
52 * http://www.jxta.org.
54 * This license is based on the BSD license adopted by the Apache Foundation.
57 package net.jxta.impl.endpoint.router;
60 import net.jxta.endpoint.EndpointAddress;
61 import net.jxta.endpoint.EndpointService;
62 import net.jxta.endpoint.Messenger;
63 import net.jxta.impl.util.TimeUtils;
64 import java.util.logging.Level;
65 import net.jxta.logging.Logging;
66 import java.util.logging.Logger;
68 import java.lang.ref.Reference;
69 import java.lang.ref.SoftReference;
74 * This class is a repository of wisdom regarding destinations. It also provides
75 * a messenger if there is one. Currently, the wisdom is very limited and is
76 * only about direct destinations (for which a messenger once existed). The
77 * wisdom that can be obtained is:
80 * <li> is there a messenger at hand (incoming or otherwise).</li>
82 * <li> is it likely that one can be made from this end, should the one we
83 * have break. (the last attempt succeeded, not only incoming, and that was not long ago).</li>
86 * <li> is either of the above true, (are we confident we can get a
87 * messenger as of right now one way or the other).</li>
89 * <li> are we supposed to send a welcome to that destination (we can't
90 * remember having done it).</li>
93 * <p/>This could be extended to manage more of the life cycle, such as knowing
94 * about messengers being resolved or having failed to. This primitive interface
95 * is temporary; it is only meant to replace messengerPool without having to
96 * change the router too much.
104 private final static transient Logger LOG = Logger.getLogger(Destinations.class.getName());
107 * Shared Timer which handles cleanup of expired Wisdom.
109 private final static transient Timer cleanup = new Timer("Endpoint Destinations GC", true);
111 private final Map<EndpointAddress, Wisdom> wisdoms = new HashMap<EndpointAddress, Wisdom>(64);
114 * If {@code true} then we are shutting down.
116 private volatile boolean stopped = false;
119 * The endpoint service we are working for.
121 private final EndpointService endpoint;
124 * The wisdom GC task for this instance.
126 private final WisdomGCTask wisdomGC;
129 * Stores knowledge about one particular destination.
131 * <p/>It does not provide any synchronization. This is provided by the Destinations class.
136 * How long we consider that a past outgoingMessenger is an indication
137 * that one is possible in the future.
139 static final long EXPIRATION = 10 * TimeUtils.AMINUTE;
142 * The channel we last used, if any. They disappear faster than the
143 * canonical, but, as long as the canonical is around, they can be
144 * obtained at a near-zero cost.
146 private Reference<Messenger> outgoingMessenger;
149 * The channel we last used if it happens to be an incoming messenger. We keep
150 * a strong reference to it.
152 private Messenger incomingMessenger;
155 * The transport destination address of the messenger we're caching (if
158 private EndpointAddress xportDest;
161 * This tells when the outgoing messenger information expires. Incoming
162 * messengers have no expiration per se. We draw no conclusion from
163 * their past presence; only current presence. A wisdom is totally
164 * expired (and may thus be removed) when its outgoing messenger
165 * information is expired AND it has no incoming messenger.
167 private long expiresAt = 0;
170 * When a new destination is added, we're supposed to send our welcome
171 * along with the first message. This tells whether isWelcomeNeeded was
172 * once invoked or not.
174 private boolean welcomeNeeded = true;
/**
 * Creates a Wisdom seeded with the first messenger known for its destination.
 *
 * @param messenger The messenger to cache information about.
 * @param incoming  If true, this is an incoming messenger, which means
 *                  that if the channel is lost it cannot be re-obtained. It must be
 *                  strongly referenced until it closes for disuse, or breaks.
 */
Wisdom(Messenger messenger, boolean incoming) {
    if (incoming) {
        addIncomingMessenger(messenger);
    } else {
        addOutgoingMessenger(messenger);
    }
}
191 * Reports whether a welcome message is needed. The first time we're
192 * asked we say "true". Subsequently, always "false".
194 * <p/>This assumes that the welcome was sent following the first test.
195 * (so, ask only if you'll send the welcome when told to).
197 * @return {@code true} if this is the first time this method is invoked.
199 synchronized boolean isWelcomeNeeded() {
200 boolean res = welcomeNeeded;
202 welcomeNeeded = false;
206 boolean addIncomingMessenger(Messenger m) {
208 // If we have no other incoming, we take it. No questions asked.
209 Messenger currentIncoming = getIncoming();
211 if (currentIncoming == null) {
212 if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
213 LOG.fine("Accepted new incoming messenger for " + m.getDestinationAddress());
216 incomingMessenger = m;
220 // Now, check reachability (0 port number means no incoming connections).
221 // If the old one looks better, prefer it.
223 // Compute reachability of the new one.
224 String originAddr = m.getDestinationAddress().getProtocolAddress();
226 int index = originAddr.lastIndexOf(':');
227 int srcPort = (index != -1) ? Integer.parseInt(originAddr.substring(index + 1)) : 0;
228 boolean reachable = (srcPort != 0);
230 // Compute reachability of the old one.
231 originAddr = currentIncoming.getDestinationAddress().getProtocolAddress();
233 index = originAddr.lastIndexOf(':');
234 srcPort = (index != -1) ? Integer.parseInt(originAddr.substring(index + 1)) : 0;
235 boolean currentReachable = (srcPort != 0);
237 // The new one is less reachable than the old one. Keep the old one.
238 if (currentReachable && !reachable) {
242 incomingMessenger = m;
244 if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
245 LOG.fine("Accepted new incoming messenger for " + m.getDestinationAddress());
251 boolean addOutgoingMessenger(Messenger m) {
252 if (getOutgoing() != null) {
255 this.outgoingMessenger = new SoftReference<Messenger>(m);
256 xportDest = m.getDestinationAddress();
257 expiresAt = TimeUtils.toAbsoluteTimeMillis(EXPIRATION);
259 if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
260 LOG.fine("Accepted new outgoing messenger for " + xportDest);
265 void noOutgoingMessenger() {
266 outgoingMessenger = null;
272 * Returns an incoming messenger is there is a working one available.
274 * @return an incoming messenger, null if there's none
276 private Messenger getIncoming() {
277 if (incomingMessenger != null) {
278 if ((incomingMessenger.getState() & Messenger.USABLE) != 0) {
279 return incomingMessenger;
282 // forget about this broken messenger.
283 incomingMessenger = null;
289 * Returns an outgoingMessenger if there is one or one can be made without delay.
290 * Renews a broken one if it can be. Refreshes expiration time if a messenger is returned.
292 * @return an outgoing messenger, null if there's none
294 private Messenger getOutgoing() {
296 if (outgoingMessenger == null) {
300 // (If messenger is not null, it means that we also have a xportDest).
302 Messenger messenger = outgoingMessenger.get();
304 // If it is gone or broken, try and get a new one.
305 if ((messenger == null) || ((messenger.getState() & Messenger.USABLE) == 0)) {
307 messenger = endpoint.getMessengerImmediate(xportDest, null);
309 // If this fails, it is hopeless: the address is bad or something like that. Make ourselves expired right away.
310 if (messenger == null) {
311 outgoingMessenger = null;
317 // Renew the ref. The xportDest is the same.
318 outgoingMessenger = new SoftReference<Messenger>(messenger);
321 // So we had one or could renew. But, does it work ?
322 if ((messenger.getState() & (Messenger.USABLE & Messenger.RESOLVED)) == 0) {
323 // We no-longer have the underlying connection. Let ourselves expire. Do not renew the expiration time.
324 outgoingMessenger = null;
329 // Ok, we do have an outgoing messenger at the ready after all.
330 expiresAt = TimeUtils.toAbsoluteTimeMillis(EXPIRATION);
335 * Returns a channel for this destination if one is there or can be obtained
338 * <p/>We prefer the incoming connection to the outgoing for two
341 * <li>The remote peer was able to reach us. We cannot be sure that
342 * we can reach the remote peer.</li>
343 * <li>The remote peer initiated the connection. It has a better
344 * sense of when the connection should be closed or reopened than
347 * @return a channel for this destination
349 Messenger getCurrentMessenger() {
350 Messenger res = getIncoming();
356 return getOutgoing();
360 * @return true if we do have an outgoing messenger or, failing that, we had one not too long ago.
362 boolean isNormallyReachable() {
363 return ((getOutgoing() != null) || (TimeUtils.toRelativeTimeMillis(expiresAt) >= 0));
367 * We think the destination is reachable somehow. Not sure how long.
369 * @return true if we have any kind of messenger or, failing that, we had an outgoing one not too long ago.
371 boolean isCurrentlyReachable() {
372 return ((getIncoming() != null) || (getOutgoing() != null) || (TimeUtils.toRelativeTimeMillis(expiresAt) >= 0));
376 * @return true if this wisdom carries no positive information whatsoever.
378 boolean isExpired() {
379 return !isCurrentlyReachable();
384 * Internal mechanisms
388 * Return any Wisdom for the specified destination. The address will
389 * be normalized to the base form.
391 * @param destination The address of the wisdom that is sought.
392 * @return The Wisdom for this address or {@code null} if no Wisdom found.
394 private Wisdom getWisdom(EndpointAddress destination) {
395 if (destination.getServiceName() != null) {
396 destination = new EndpointAddress(destination, null, null);
398 return wisdoms.get(destination);
402 * Add a Wisdom for the specified destination. The address will
403 * be normalized to the base form.
405 * @param destination The address of the Wisdom that is being added.
406 * @param wisdom The Wisdom for this address to be added to the map.
408 private void addWisdom(EndpointAddress destination, Wisdom wisdom) {
409 destination = new EndpointAddress(destination, null, null);
410 wisdoms.put(destination, wisdom);
414 * General house keeping.
417 public Destinations(EndpointService endpoint) {
419 this.endpoint = endpoint;
421 wisdomGC = new WisdomGCTask();
423 cleanup.schedule(wisdomGC, TimeUtils.AMINUTE, TimeUtils.AMINUTE);
427 * Shutdown this cache. (stop the gc)
429 public synchronized void close() {
432 // forget everything.
439 * Handles cleanup of expired wisdoms
441 class WisdomGCTask extends TimerTask {
446 * <p/>garbage collector. We use soft references to messengers, but we use
447 * a strong hashmap to keep the wisdom around in a more predictable
448 * manner. Entries are simply removed when they no-longer carry
449 * relevant information; so there's no change in the total meaning of
450 * the map when an entry is removed.
455 synchronized (Destinations.this) {
456 Iterator<Wisdom> eachWisdom = wisdoms.values().iterator();
458 while (eachWisdom.hasNext()) {
459 Wisdom w = eachWisdom.next();
466 } catch (Throwable all) {
467 if (Logging.SHOW_SEVERE && Destinations.LOG.isLoggable(Level.SEVERE)) {
468 LOG.log(Level.SEVERE, "Uncaught Throwable in TimerTask :" + Thread.currentThread().getName(), all);
474 public synchronized Collection<EndpointAddress> allDestinations() {
476 Set<EndpointAddress> allKeys = wisdoms.keySet();
477 List<EndpointAddress> res = new ArrayList<EndpointAddress>(allKeys);
487 * If there is a messenger at hand (incoming or otherwise), return it.
489 * @param destination The destination as an endpoint address (is automatically normalized to protocol and address only).
490 * @return A messenger to that destination if a resolved and usable one is available or can be made instantly. null otherwise.
492 public synchronized Messenger getCurrentMessenger(EndpointAddress destination) {
493 Wisdom wisdom = getWisdom(destination);
495 if (wisdom == null) {
498 return wisdom.getCurrentMessenger();
502 * Is it likely that one can be made from this end. (the last attempt succeeded, not only incoming, and that was not long ago) ?
503 * This is a conservative test. It means that declaring that we can route to that destination is a very safe bet, as opposed
504 * to isNormallyReachable and getCurrentMessenger, which could be misleading if the only messenger we can ever get is incoming.
505 * Not currently used. Should likely be.
507 * @param destination The destination as an endpoint address (is automatically normalized to protocol and address only).
508 * @return true if it is likely that we can get a messenger to that destination in the future.
510 public synchronized boolean isNormallyReachable(EndpointAddress destination) {
511 Wisdom wisdom = getWisdom(destination);
513 return ((wisdom != null) && wisdom.isNormallyReachable());
517 * Do we already have a messenger or is it likely that we can make one?
518 * We is will return {@code true} more often than
519 * {@code isNormallyReachable()} since it can be true even when all we have
520 * is an incoming messenger.
522 * <p/>Just testing that there is an entry is no-longer the same because we
523 * may keep the entries beyond the point where we would keep them before, so
524 * that we can add some longer-lived information in the future, and do not
525 * interfere as much with the gc thread.
527 * @param destination The destination as an endpoint address (is automatically normalized to protocol and address only).
528 * @return true is we are confident that we can obtain a messenger, either because we can get one instantly, or because
529 * this destination is normally reachable. (So, it is ok to try and route to that destination, now).
531 public synchronized boolean isCurrentlyReachable(EndpointAddress destination) {
532 Wisdom wisdom = getWisdom(destination);
534 return ((wisdom != null) && wisdom.isCurrentlyReachable());
538 * Are we supposed to send a welcome to that destination (we can't remember having done it).
539 * It is assumed that once true was returned, it will be acted upon. So, true is not returned a second time.
541 * @param destination The destination as an endpoint address (is automatically normalized to protocol and address only).
542 * @return true if this a destination to whish we can't remember sending a welcome message.
544 public synchronized boolean isWelcomeNeeded(EndpointAddress destination) {
545 Wisdom wisdom = getWisdom(destination);
547 return ((wisdom != null) && wisdom.isWelcomeNeeded());
555 * Here is a messenger that we were able to obtain.
557 * @param destination The destination as an endpoint address (is automatically normalized to protocol and address only).
558 * @param messenger The incoming messenger for that destination.
559 * @return true if this messenger was added (keep it open). false otherwise (do what you want with it).
561 public synchronized boolean addOutgoingMessenger(EndpointAddress destination, Messenger messenger) {
562 Wisdom wisdom = getWisdom(destination);
564 if (wisdom != null) {
565 return wisdom.addOutgoingMessenger(messenger);
567 addWisdom(destination, new Wisdom(messenger, false));
572 * Here is an incoming messenger that just popped out.
574 * @param destination The destination as an endpoint address (is automatically normalized to protocol and address only).
575 * @param messenger The incoming messenger for that destination.
576 * @return true if this messenger was added (keep it open). false otherwise (do what you want with it).
578 public synchronized boolean addIncomingMessenger(EndpointAddress destination, Messenger messenger) {
579 Wisdom wisdom = getWisdom(destination);
581 if (wisdom != null) {
582 return wisdom.addIncomingMessenger(messenger);
584 addWisdom(destination, new Wisdom(messenger, true));
589 * We tried to get a messenger but could not. We know that we do not have connectivity from our end, for now. we may still
590 * have an incoming. However, if we had to try and make a messenger, there probably isn't an incoming, but that's not our
591 * business here. isNormallyReachable becomes false; but we can still try when solicited.
593 * @param destination The destination as an endpoint address (is automatically normalized to protocol and address only).
595 public synchronized void noOutgoingMessenger(EndpointAddress destination) {
596 Wisdom wisdom = getWisdom(destination);
598 if (wisdom != null) {
599 wisdom.noOutgoingMessenger();