]> sjero.net Git - linphone/blob - p2pproxy/dependencies-src/jxse-src-2.5/impl/src/net/jxta/impl/endpoint/router/Destinations.java
remove mediastreamer2 and add it as a submodule instead.
[linphone] / p2pproxy / dependencies-src / jxse-src-2.5 / impl / src / net / jxta / impl / endpoint / router / Destinations.java
1 /*
2  * Copyright (c) 2004-2007 Sun Microsystems, Inc.  All rights reserved.
3  *  
4  *  The Sun Project JXTA(TM) Software License
5  *  
6  *  Redistribution and use in source and binary forms, with or without 
7  *  modification, are permitted provided that the following conditions are met:
8  *  
9  *  1. Redistributions of source code must retain the above copyright notice,
10  *     this list of conditions and the following disclaimer.
11  *  
12  *  2. Redistributions in binary form must reproduce the above copyright notice, 
13  *     this list of conditions and the following disclaimer in the documentation 
14  *     and/or other materials provided with the distribution.
15  *  
16  *  3. The end-user documentation included with the redistribution, if any, must 
17  *     include the following acknowledgment: "This product includes software 
18  *     developed by Sun Microsystems, Inc. for JXTA(TM) technology." 
19  *     Alternately, this acknowledgment may appear in the software itself, if 
20  *     and wherever such third-party acknowledgments normally appear.
21  *  
22  *  4. The names "Sun", "Sun Microsystems, Inc.", "JXTA" and "Project JXTA" must 
23  *     not be used to endorse or promote products derived from this software 
24  *     without prior written permission. For written permission, please contact 
25  *     Project JXTA at http://www.jxta.org.
26  *  
27  *  5. Products derived from this software may not be called "JXTA", nor may 
28  *     "JXTA" appear in their name, without prior written permission of Sun.
29  *  
30  *  THIS SOFTWARE IS PROVIDED ``AS IS'' AND ANY EXPRESSED OR IMPLIED WARRANTIES,
31  *  INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND 
32  *  FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL SUN 
33  *  MICROSYSTEMS OR ITS CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, 
34  *  INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT 
35  *  LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, 
36  *  OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF 
37  *  LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING 
38  *  NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, 
39  *  EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
40  *  
41  *  JXTA is a registered trademark of Sun Microsystems, Inc. in the United 
42  *  States and other countries.
43  *  
44  *  Please see the license information page at :
45  *  <http://www.jxta.org/project/www/license.html> for instructions on use of 
46  *  the license in source files.
47  *  
48  *  ====================================================================
49  *  
50  *  This software consists of voluntary contributions made by many individuals 
51  *  on behalf of Project JXTA. For more information on Project JXTA, please see 
52  *  http://www.jxta.org.
53  *  
54  *  This license is based on the BSD license adopted by the Apache Foundation. 
55  */
56
57 package net.jxta.impl.endpoint.router;
58
59
60 import net.jxta.endpoint.EndpointAddress;
61 import net.jxta.endpoint.EndpointService;
62 import net.jxta.endpoint.Messenger;
63 import net.jxta.impl.util.TimeUtils;
64 import java.util.logging.Level;
65 import net.jxta.logging.Logging;
66 import java.util.logging.Logger;
67
68 import java.lang.ref.Reference;
69 import java.lang.ref.SoftReference;
70 import java.util.*;
71
72
73 /**
74  * This class is a repository of wisdom regarding destinations. It also provides
75  * a messenger if there is one. Currently, the wisdom is very limited and is
76  * only about direct destinations (for which a messenger once existed). The
77  * wisdom that can be obtained is:
78  *
79  * <p/><ul>
80  * <li> is there a messenger at hand (incoming or otherwise).</li>
81  * 
82  * <li> is it likely that one can be made from this end, should the one we
83  * have break. (the last attempt succeeded, not only incoming, and that was
84  * not long ago).</li>
85  * 
86  * <li> is either of the above true, (are we confident we can get a
87  * messenger as of right now one way or the other).</li>
88  * 
89  * <li> are we supposed to send a welcome to that destination (we can't
90  * remember having done it).</li>
91  * </ul>
92  * 
93  * <p/>This could be extended to manage more of the life cycle, such as knowing
94  * about messengers being resolved or having failed to. This primitive interface
95  * is temporary; it is only meant to replace messengerPool without having to
96  * change the router too much.
97  */
98
99 class Destinations {
100
101     /**
102      * Logger
103      */
104     private final static transient Logger LOG = Logger.getLogger(Destinations.class.getName());
105
106     /**
107      * Shared Timer which handles cleanup of expired Wisdom.
108      */
109     private final static transient Timer cleanup = new Timer("Endpoint Destinations GC", true);
110
111     private final Map<EndpointAddress, Wisdom> wisdoms = new HashMap<EndpointAddress, Wisdom>(64);
112
113     /**
114      * If {@code true} then we are shutting down.
115      */
116     private volatile boolean stopped = false;
117
118     /**
119      * The endpoint service we are working for.
120      */
121     private final EndpointService endpoint;
122
123     /**
124      * The wisdom GC task for this instance.
125      */
126     private final WisdomGCTask wisdomGC;
127
128     /**
129      * Stores knowledge about one particular destination.
130      * 
131      * <p/>It does not provide any synchronization. This is provided by the Destinations class.
132      */
133     final class Wisdom {
134
135         /**
136          * How long we consider that a past outgoingMessenger is an indication 
137          * that one is possible in the future.
138          */
139         static final long EXPIRATION = 10 * TimeUtils.AMINUTE;
140
141         /**
142          * The channel we last used, if any. They disappear faster than the 
143          * canonical, but, as long as the canonical is around, they can be 
144          * obtained at a near-zero cost.
145          */
146         private Reference<Messenger> outgoingMessenger;
147
148         /**
149          * The channel we last used if it happens to be an incoming messenger. We keep
150          * a strong reference to it.
151          */
152         private Messenger incomingMessenger;
153
154         /**
155          * The transport destination address of the messenger we're caching (if 
156          * not incoming).
157          */
158         private EndpointAddress xportDest;
159
160         /**
161          * This tells when the outgoing messenger information expires. Incoming
162          * messengers have no expiration per se.  We draw no conclusion from
163          * their past presence; only current presence. A wisdom is totally
164          * expired (and may thus be removed) when its outgoing messenger
165          * information is expired AND it has no incoming messenger.
166          */
167         private long expiresAt = 0;
168
169         /**
170          * When a new destination is added, we're supposed to send our welcome 
171          * along with the first message. This tells whether isWelcomeNeeded was 
172          * once invoked or not.
173          */
174         private boolean welcomeNeeded = true;
175
176         /**
177          * @param messenger The messenger to cache information about.
178          * @param incoming  If true, this is an incoming messenger, which means 
179          * that if the channel is lost it cannot be re-obtained. It must 
180          * strongly referenced until it closes for disuse, or breaks.
181          */
182         Wisdom(Messenger messenger, boolean incoming) {
183             if (incoming) {
184                 addIncomingMessenger(messenger);
185             } else {
186                 addOutgoingMessenger(messenger);
187             }
188         }
189
190         /**
191          * Reports whether a welcome message is needed. The first time we're 
192          * asked we say "true". Subsequently, always "false".
193          *
194          * <p/>This assumes that the welcome was sent following the first test.
195          * (so, ask only if you'll send the welcome when told to).
196          *
197          * @return {@code true} if this is the first time this method is invoked.
198          */
199         synchronized boolean isWelcomeNeeded() {
200             boolean res = welcomeNeeded;
201
202             welcomeNeeded = false;
203             return res;
204         }
205
206         boolean addIncomingMessenger(Messenger m) {
207
208             // If we have no other incoming, we take it. No questions asked.
209             Messenger currentIncoming = getIncoming();
210
211             if (currentIncoming == null) {
212                 if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
213                     LOG.fine("Accepted new incoming messenger for " + m.getDestinationAddress());
214                 }
215
216                 incomingMessenger = m;
217                 return true;
218             }
219
220             // Now, check reachability (0 port number means no incoming connections).
221             // If the old one looks better, prefer it.
222
223             // Compute reachability of the new one.
224             String originAddr = m.getDestinationAddress().getProtocolAddress();
225
226             int index = originAddr.lastIndexOf(':');
227             int srcPort = (index != -1) ? Integer.parseInt(originAddr.substring(index + 1)) : 0;
228             boolean reachable = (srcPort != 0);
229
230             // Compute reachability of the old one.
231             originAddr = currentIncoming.getDestinationAddress().getProtocolAddress();
232
233             index = originAddr.lastIndexOf(':');
234             srcPort = (index != -1) ? Integer.parseInt(originAddr.substring(index + 1)) : 0;
235             boolean currentReachable = (srcPort != 0);
236
237             // The new one is less reachable than the old one. Keep the old one.
238             if (currentReachable && !reachable) {
239                 return false;
240             }
241
242             incomingMessenger = m;
243
244             if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
245                 LOG.fine("Accepted new incoming messenger for " + m.getDestinationAddress());
246             }
247
248             return true;
249         }
250
251         boolean addOutgoingMessenger(Messenger m) {
252             if (getOutgoing() != null) {
253                 return false;
254             }
255             this.outgoingMessenger = new SoftReference<Messenger>(m);
256             xportDest = m.getDestinationAddress();
257             expiresAt = TimeUtils.toAbsoluteTimeMillis(EXPIRATION);
258
259             if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
260                 LOG.fine("Accepted new outgoing messenger for " + xportDest);
261             }
262             return true;
263         }
264
265         void noOutgoingMessenger() {
266             outgoingMessenger = null;
267             xportDest = null;
268             expiresAt = 0;
269         }
270
271         /**
272          * Returns an incoming messenger is there is a working one available.
273          *
274          * @return an incoming messenger, null if there's none
275          */
276         private Messenger getIncoming() {
277             if (incomingMessenger != null) {
278                 if ((incomingMessenger.getState() & Messenger.USABLE) != 0) {
279                     return incomingMessenger;
280                 }
281                 
282                 // forget about this broken messenger.
283                 incomingMessenger = null;
284             }
285             return null;
286         }
287
288         /**
289          * Returns an outgoingMessenger if there is one or one can be made without delay.
290          * Renews a broken one if it can be. Refreshes expiration time if a messenger is returned.
291          *
292          * @return an outgoing messenger, null if there's none
293          */
294         private Messenger getOutgoing() {
295
296             if (outgoingMessenger == null) {
297                 return null;
298             }
299
300             // (If messenger is not null, it means that we also have a xportDest).
301
302             Messenger messenger = outgoingMessenger.get();
303
304             // If it is gone or broken, try and get a new one.
305             if ((messenger == null) || ((messenger.getState() & Messenger.USABLE) == 0)) {
306
307                 messenger = endpoint.getMessengerImmediate(xportDest, null);
308
309                 // If this fails, it is hopeless: the address is bad or something like that. Make ourselves expired right away.
310                 if (messenger == null) {
311                     outgoingMessenger = null;
312                     xportDest = null;
313                     expiresAt = 0;
314                     return null;
315                 }
316
317                 // Renew the ref. The xportDest is the same.
318                 outgoingMessenger = new SoftReference<Messenger>(messenger);
319             }
320
321             // So we had one or could renew. But, does it work ?
322             if ((messenger.getState() & (Messenger.USABLE & Messenger.RESOLVED)) == 0) {
323                 // We no-longer have the underlying connection. Let ourselves expire. Do not renew the expiration time.
324                 outgoingMessenger = null;
325                 xportDest = null;
326                 return null;
327             }
328
329             // Ok, we do have an outgoing messenger at the ready after all.
330             expiresAt = TimeUtils.toAbsoluteTimeMillis(EXPIRATION);
331             return messenger;
332         }
333
334         /**
335          * Returns a channel for this destination if one is there or can be obtained
336          * readily and works.
337          * <p/>
338          * <p/>We prefer the incoming connection to the outgoing for two
339          * reasons:
340          * <ul>
341          * <li>The remote peer was able to reach us. We cannot be sure that
342          * we can reach the remote peer.</li>
343          * <li>The remote peer initiated the connection. It has a better
344          * sense of when the connection should be closed or reopened than
345          * we do.</li>
346          *
347          * @return a channel for this destination
348          */
349         Messenger getCurrentMessenger() {
350             Messenger res = getIncoming();
351
352             if (res != null) {
353                 return res;
354             }
355
356             return getOutgoing();
357         }
358
359         /**
360          * @return true if we do have an outgoing messenger or, failing that, we had one not too long ago.
361          */
362         boolean isNormallyReachable() {
363             return ((getOutgoing() != null) || (TimeUtils.toRelativeTimeMillis(expiresAt) >= 0));
364         }
365
366         /**
367          * We think the destination is reachable somehow. Not sure how long.
368          *
369          * @return true if we have any kind of messenger or, failing that, we had an outgoing one not too long ago.
370          */
371         boolean isCurrentlyReachable() {
372             return ((getIncoming() != null) || (getOutgoing() != null) || (TimeUtils.toRelativeTimeMillis(expiresAt) >= 0));
373         }
374
375         /**
376          * @return true if this wisdom carries no positive information whatsoever.
377          */
378         boolean isExpired() {
379             return !isCurrentlyReachable();
380         }
381     }
382
383     /*
384      * Internal mechanisms
385      */
386
387     /**
388      *  Return any Wisdom for the specified destination. The address will
389      *  be normalized to the base form.
390      *
391      *  @param destination The address of the wisdom that is sought.
392      *  @return The Wisdom for this address or {@code null} if no Wisdom found.
393      */
394     private Wisdom getWisdom(EndpointAddress destination) {
395         if (destination.getServiceName() != null) {
396             destination = new EndpointAddress(destination, null, null);
397         }
398         return wisdoms.get(destination);
399     }
400
401     /**
402      *  Add a Wisdom for the specified destination. The address will
403      *  be normalized to the base form.
404      *
405      *  @param destination The address of the Wisdom that is being added.
406      *  @param wisdom The Wisdom for this address to be added to the map.
407      */
408     private void addWisdom(EndpointAddress destination, Wisdom wisdom) {
409         destination = new EndpointAddress(destination, null, null);
410         wisdoms.put(destination, wisdom);
411     }
412
413     /*
414      * General house keeping.
415      */
416
417     public Destinations(EndpointService endpoint) {
418
419         this.endpoint = endpoint;
420
421         wisdomGC = new WisdomGCTask();
422
423         cleanup.schedule(wisdomGC, TimeUtils.AMINUTE, TimeUtils.AMINUTE);
424     }
425
426     /**
427      * Shutdown this cache. (stop the gc)
428      */
429     public synchronized void close() {
430         stopped = true;
431
432         // forget everything.
433         wisdoms.clear();
434
435         wisdomGC.cancel();
436     }
437
438     /**
439      * Handles cleanup of expired wisdoms
440      */
441     class WisdomGCTask extends TimerTask {
442
443         /**
444          * {@inheritDoc}
445          * 
446          * <p/>garbage collector. We use soft references to messengers, but we use
447          * a strong hashmap to keep the wisdom around in a more predictable
448          * manner. Entries are simply removed when they no-longer carry
449          * relevant information; so there's no change in the total meaning of
450          * the map when an entry is removed.
451          */
452         @Override
453         public void run() {
454             try {
455                 synchronized (Destinations.this) {
456                     Iterator<Wisdom> eachWisdom = wisdoms.values().iterator();
457
458                     while (eachWisdom.hasNext()) {
459                         Wisdom w = eachWisdom.next();
460
461                         if (w.isExpired()) {
462                             eachWisdom.remove();
463                         }
464                     }
465                 }
466             } catch (Throwable all) {
467                 if (Logging.SHOW_SEVERE && Destinations.LOG.isLoggable(Level.SEVERE)) {
468                     LOG.log(Level.SEVERE, "Uncaught Throwable in TimerTask :" + Thread.currentThread().getName(), all);
469                 }
470             }
471         }
472     }
473
474     public synchronized Collection<EndpointAddress> allDestinations() {
475
476         Set<EndpointAddress> allKeys = wisdoms.keySet();
477         List<EndpointAddress> res = new ArrayList<EndpointAddress>(allKeys);
478
479         return res;
480     }
481
482     /*
483      * information output
484      */
485
486     /**
487      * If there is a messenger at hand (incoming or otherwise), return it.
488      *
489      * @param destination The destination as an endpoint address (is automatically normalized to protocol and address only).
490      * @return A messenger to that destination if a resolved and usable one is available or can be made instantly. null otherwise.
491      */
492     public synchronized Messenger getCurrentMessenger(EndpointAddress destination) {
493         Wisdom wisdom = getWisdom(destination);
494
495         if (wisdom == null) {
496             return null;
497         }
498         return wisdom.getCurrentMessenger();
499     }
500
501     /**
502      * Is it likely that one can be made from this end. (the last attempt succeeded, not only incoming, and that was not long ago) ?
503      * This is a conservative test. It means that declaring that we can route to that destination is a very safe bet, as opposed
504      * to isNormallyReachable and getCurrentMessenger, which could be misleading if the only messenger we can ever get is incoming.
505      * Not currently used. Should likely be.
506      *
507      * @param destination The destination as an endpoint address (is automatically normalized to protocol and address only).
508      * @return true if it is likely that we can get a messenger to that destination in the future.
509      */
510     public synchronized boolean isNormallyReachable(EndpointAddress destination) {
511         Wisdom wisdom = getWisdom(destination);
512
513         return ((wisdom != null) && wisdom.isNormallyReachable());
514     }
515
516     /**
517      * Do we already have a messenger or is it likely that we can make one? 
518      * We is will return {@code true} more often than 
519      * {@code isNormallyReachable()} since it can be true even when all we have 
520      * is an incoming messenger.
521      * 
522      * <p/>Just testing that there is an entry is no-longer the same because we
523      * may keep the entries beyond the point where we would keep them before, so
524      * that we can add some longer-lived information in the future, and do not 
525      * interfere as much with the gc thread.
526      *
527      * @param destination The destination as an endpoint address (is automatically normalized to protocol and address only).
528      * @return true is we are confident that we can obtain a messenger, either because we can get one instantly, or because
529      *         this destination is normally reachable. (So, it is ok to try and route to that destination, now).
530      */
531     public synchronized boolean isCurrentlyReachable(EndpointAddress destination) {
532         Wisdom wisdom = getWisdom(destination);
533
534         return ((wisdom != null) && wisdom.isCurrentlyReachable());
535     }
536
537     /**
538      * Are we supposed to send a welcome to that destination (we can't remember having done it).
539      * It is assumed that once true was returned, it will be acted upon. So, true is not returned a second time.
540      *
541      * @param destination The destination as an endpoint address (is automatically normalized to protocol and address only).
542      * @return true if this a destination to whish we can't remember sending a welcome message.
543      */
544     public synchronized boolean isWelcomeNeeded(EndpointAddress destination) {
545         Wisdom wisdom = getWisdom(destination);
546
547         return ((wisdom != null) && wisdom.isWelcomeNeeded());
548     }
549
550     /*
551      * information input.
552      */
553
554     /**
555      * Here is a messenger that we were able to obtain.
556      *
557      * @param destination The destination as an endpoint address (is automatically normalized to protocol and address only).
558      * @param messenger   The incoming messenger for that destination.
559      * @return true if this messenger was added (keep it open). false otherwise (do what you want with it).
560      */
561     public synchronized boolean addOutgoingMessenger(EndpointAddress destination, Messenger messenger) {
562         Wisdom wisdom = getWisdom(destination);
563
564         if (wisdom != null) {
565             return wisdom.addOutgoingMessenger(messenger);
566         }
567         addWisdom(destination, new Wisdom(messenger, false));
568         return true;
569     }
570
571     /**
572      * Here is an incoming messenger that just popped out.
573      *
574      * @param destination The destination as an endpoint address (is automatically normalized to protocol and address only).
575      * @param messenger   The incoming messenger for that destination.
576      * @return true if this messenger was added (keep it open). false otherwise (do what you want with it).
577      */
578     public synchronized boolean addIncomingMessenger(EndpointAddress destination, Messenger messenger) {
579         Wisdom wisdom = getWisdom(destination);
580
581         if (wisdom != null) {
582             return wisdom.addIncomingMessenger(messenger);
583         }
584         addWisdom(destination, new Wisdom(messenger, true));
585         return true;
586     }
587
588     /**
589      * We tried to get a messenger but could not. We know that we do not have connectivity from our end, for now.  we may still
590      * have an incoming. However, if we had to try and make a messenger, there probably isn't an incoming, but that's not our
591      * business here. isNormallyReachable becomes false; but we can still try when solicited.
592      *
593      * @param destination The destination as an endpoint address (is automatically normalized to protocol and address only).
594      */
595     public synchronized void noOutgoingMessenger(EndpointAddress destination) {
596         Wisdom wisdom = getWisdom(destination);
597
598         if (wisdom != null) {
599             wisdom.noOutgoingMessenger();
600         }
601     }
602 }