]> sjero.net Git - linphone/blob - p2pproxy/dependencies-src/jxse-src-2.5/impl/src/net/jxta/impl/endpoint/router/RouteResolver.java
6f3cfd173bf27a5603c2ad65186bed497a96ea28
[linphone] / p2pproxy / dependencies-src / jxse-src-2.5 / impl / src / net / jxta / impl / endpoint / router / RouteResolver.java
1 /*
2  * Copyright (c) 2001-2007 Sun Microsystems, Inc.  All rights reserved.
3  *  
4  *  The Sun Project JXTA(TM) Software License
5  *  
6  *  Redistribution and use in source and binary forms, with or without 
7  *  modification, are permitted provided that the following conditions are met:
8  *  
9  *  1. Redistributions of source code must retain the above copyright notice,
10  *     this list of conditions and the following disclaimer.
11  *  
12  *  2. Redistributions in binary form must reproduce the above copyright notice, 
13  *     this list of conditions and the following disclaimer in the documentation 
14  *     and/or other materials provided with the distribution.
15  *  
16  *  3. The end-user documentation included with the redistribution, if any, must 
17  *     include the following acknowledgment: "This product includes software 
18  *     developed by Sun Microsystems, Inc. for JXTA(TM) technology." 
19  *     Alternately, this acknowledgment may appear in the software itself, if 
20  *     and wherever such third-party acknowledgments normally appear.
21  *  
22  *  4. The names "Sun", "Sun Microsystems, Inc.", "JXTA" and "Project JXTA" must 
23  *     not be used to endorse or promote products derived from this software 
24  *     without prior written permission. For written permission, please contact 
25  *     Project JXTA at http://www.jxta.org.
26  *  
27  *  5. Products derived from this software may not be called "JXTA", nor may 
28  *     "JXTA" appear in their name, without prior written permission of Sun.
29  *  
30  *  THIS SOFTWARE IS PROVIDED ``AS IS'' AND ANY EXPRESSED OR IMPLIED WARRANTIES,
31  *  INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND 
32  *  FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL SUN 
33  *  MICROSYSTEMS OR ITS CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, 
34  *  INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT 
35  *  LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, 
36  *  OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF 
37  *  LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING 
38  *  NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, 
39  *  EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
40  *  
41  *  JXTA is a registered trademark of Sun Microsystems, Inc. in the United 
42  *  States and other countries.
43  *  
44  *  Please see the license information page at :
45  *  <http://www.jxta.org/project/www/license.html> for instructions on use of 
46  *  the license in source files.
47  *  
48  *  ====================================================================
49  *  
50  *  This software consists of voluntary contributions made by many individuals 
51  *  on behalf of Project JXTA. For more information on Project JXTA, please see 
52  *  http://www.jxta.org.
53  *  
54  *  This license is based on the BSD license adopted by the Apache Foundation. 
55  */
56 package net.jxta.impl.endpoint.router;
57
58 import net.jxta.credential.Credential;
59 import net.jxta.document.*;
60 import net.jxta.endpoint.EndpointAddress;
61 import net.jxta.endpoint.OutgoingMessageEvent;
62 import net.jxta.exception.PeerGroupException;
63 import net.jxta.id.ID;
64 import net.jxta.impl.cm.Srdi;
65 import net.jxta.impl.cm.Srdi.SrdiInterface;
66 import net.jxta.impl.cm.SrdiIndex;
67 import net.jxta.impl.protocol.*;
68 import net.jxta.impl.util.TimeUtils;
69 import net.jxta.membership.MembershipService;
70 import net.jxta.peer.PeerID;
71 import net.jxta.peergroup.PeerGroup;
72 import net.jxta.platform.Module;
73 import net.jxta.protocol.*;
74 import net.jxta.resolver.QueryHandler;
75 import net.jxta.resolver.ResolverService;
76 import net.jxta.resolver.SrdiHandler;
77 import java.util.logging.Level;
78 import net.jxta.logging.Logging;
79 import java.util.logging.Logger;
80
81 import java.beans.PropertyChangeEvent;
82 import java.beans.PropertyChangeListener;
83 import java.io.IOException;
84 import java.io.Reader;
85 import java.io.StringReader;
86 import java.util.*;
87 import java.util.concurrent.atomic.AtomicInteger;
88
89 /**
90  * Handles dynamic route resolution.
91  */
92 class RouteResolver implements Module, QueryHandler, SrdiHandler, SrdiInterface {
93
94     /**
95      * Logger
96      */
97     private final static transient Logger LOG = Logger.getLogger(RouteResolver.class.getName());
98
99     /**
100      * Router Service Name
101      */
102     public final static String routerSName = "EndpointRouter";
103
104     private final static String srdiIndexerFileName = "routerSrdi";
105
106     /**
107      * Negative Route query acknowledgment
108      */
109     private final static int NACKROUTE_QUERYID = -1;
110
111     /**
112      * Bad route expiration. Amount of time we consider a route bad
113      */
114     private final static long BADROUTE_EXPIRATION = 2L * TimeUtils.AMINUTE;
115
116     /**
117      * Default dynamic route resolution configuration preference.
118      */
119     private final static boolean USE_ROUTE_RESOLVER_DEFAULT = true;
120
121     /**
122      * Configuration property that disables the usage
123      * of dynamic route resolution. Dynamic routes
124      * will not be discovered. set to true by default
125      * can be overwritten via ConfigParams
126      */
127     private boolean useRouteResolver = USE_ROUTE_RESOLVER_DEFAULT;
128
129     /**
130      * PeerGroup Service Handle
131      */
132     private PeerGroup group = null;
133
134     /**
135      * Resolver service handle
136      */
137     private ResolverService resolver = null;
138
139     /**
140      * membership service
141      */
142     private MembershipService membership = null;
143
144     /**
145      * EndpointRouter pointer
146      */
147     private EndpointRouter router = null;
148
149     /**
150      * local peer ID as a endpointAddress.
151      */
152     private EndpointAddress localPeerAddr = null;
153
154     /**
155      * local Peer ID
156      */
157     private ID localPeerId = null;
158
159     /**
160      * Route CM Persistent cache
161      */
162     private RouteCM routeCM = null;
163
164     /**
165      *  The current resolver query ID. static to make debugging easier.
166      */
167     private final static AtomicInteger qid = new AtomicInteger(0);
168
169     /**
170      * SRDI route index
171      */
172     private SrdiIndex srdiIndex = null;
173
174     /**
175      * SRDI Index
176      */
177     private Srdi srdi = null;
178
179     /**
180      *  Encapsulates current Membership Service credential.
181      */
182     final static class CurrentCredential {
183
184         /**
185          *      The current default credential
186          */
187         final Credential credential;
188         
189         /**
190          *      The current default credential in serialized XML form.
191          */
192         final XMLDocument credentialDoc;
193         
194         CurrentCredential(Credential credential, XMLDocument credentialDoc) {
195             this.credential = credential;
196             this.credentialDoc = credentialDoc;
197         }
198     }
199     
200     /**
201      *   The current Membership service default credential.
202      */
203     CurrentCredential currentCredential;
204     
205     /**
206      *  Listener we use for membership property events.
207      */
208     private class CredentialListener implements PropertyChangeListener {
209
210         /**
211          *  Standard Constructor
212          */
213         CredentialListener() {}
214         
215         /**
216          *  {@inheritDoc}
217          */
218         public void propertyChange(PropertyChangeEvent evt) {
219             if (MembershipService.DEFAULT_CREDENTIAL_PROPERTY.equals(evt.getPropertyName())) {
220                 if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
221                     LOG.fine("New default credential event");
222                 }
223
224                 synchronized (RouteResolver.this) {
225                     Credential cred = (Credential) evt.getNewValue();
226                     XMLDocument credentialDoc;
227                     if (null != cred) {
228                         try {
229                             credentialDoc = (XMLDocument) cred.getDocument(MimeMediaType.XMLUTF8);
230                             currentCredential = new CurrentCredential(cred, credentialDoc);
231                         } catch (Exception all) {
232                             if (Logging.SHOW_WARNING && LOG.isLoggable(Level.WARNING)) {
233                                 LOG.log(Level.WARNING, "Could not generate credential document", all);
234                             }
235                             currentCredential = null;
236                         }
237                     } else {
238                         currentCredential = null;
239                     }
240                 }
241             }
242         }
243     }
244
245     final CredentialListener membershipCredListener = new CredentialListener();
246
247     /**
248      * @param router the router
249      */
250     RouteResolver(EndpointRouter router) {
251         this.router = router;
252     }
253
254     /**
255      * initialize  routeResolver
256      */
257     public void init(PeerGroup group, ID assignedID, Advertisement impl) throws PeerGroupException {
258
259         ModuleImplAdvertisement implAdvertisement = (ModuleImplAdvertisement) impl;
260
261         // extract Router service configuration properties
262         ConfigParams confAdv = group.getConfigAdvertisement();
263         XMLElement paramBlock = null;
264
265         if (confAdv != null) {
266             paramBlock = (XMLElement) confAdv.getServiceParam(assignedID);
267         }
268
269         if (paramBlock != null) {
270             // get our tunable router parameter
271             Enumeration param;
272
273             param = paramBlock.getChildren("useRouteResolver");
274             if (param.hasMoreElements()) {
275                 useRouteResolver = Boolean.getBoolean(((XMLElement) param.nextElement()).getTextValue());
276             }
277         }
278
279         this.group = group;
280
281         localPeerId = group.getPeerID();
282
283         localPeerAddr = EndpointRouter.pid2addr(group.getPeerID());
284
285         if (Logging.SHOW_CONFIG && LOG.isLoggable(Level.CONFIG)) {
286             StringBuilder configInfo = new StringBuilder("Configuring Router Transport Resolver : " + assignedID);
287
288             if (implAdvertisement != null) {
289                 configInfo.append("\n\tImplementation :");
290                 configInfo.append("\n\t\tModule Spec ID: ").append(implAdvertisement.getModuleSpecID());
291                 configInfo.append("\n\t\tImpl Description : ").append(implAdvertisement.getDescription());
292                 configInfo.append("\n\t\tImpl URI : ").append(implAdvertisement.getUri());
293                 configInfo.append("\n\t\tImpl Code : ").append(implAdvertisement.getCode());
294             }
295
296             configInfo.append("\n\tGroup Params :");
297             configInfo.append("\n\t\tGroup : ").append(group);
298             configInfo.append("\n\t\tPeer ID : ").append(group.getPeerID());
299
300             configInfo.append("\n\tConfiguration:");
301             configInfo.append("\n\t\tUse Route Resolver : ").append(useRouteResolver());
302             LOG.config(configInfo.toString());
303         }
304     }
305
306     /**
307      * {@inheritDoc}
308      */
309     public int startApp(String[] arg) {
310
311         resolver = group.getResolverService();
312
313         if (null == resolver) {
314             if (Logging.SHOW_WARNING && LOG.isLoggable(Level.WARNING)) {
315                 LOG.warning("Endpoint Router start stalled until resolver service available");
316             }
317             return Module.START_AGAIN_STALLED;
318         }
319
320         membership = group.getMembershipService();
321
322         if (null == membership) {
323             if (Logging.SHOW_WARNING && LOG.isLoggable(Level.WARNING)) {
324                 LOG.warning("Endpoint Router start stalled until membership service available");
325             }
326             return Module.START_AGAIN_STALLED;
327         }
328
329         resolver.registerHandler(routerSName, this);
330         // create and register the srdi service
331         srdiIndex = new SrdiIndex(group, srdiIndexerFileName);
332         // Srdi is a thread but we are not going to start,
333         // since the service is reactive.
334         srdi = new Srdi(group, routerSName, this, srdiIndex, 0, 0);
335         resolver.registerSrdiHandler(routerSName, this);
336
337         synchronized (this) {
338             // register our credential listener.
339             membership.addPropertyChangeListener(MembershipService.DEFAULT_CREDENTIAL_PROPERTY, membershipCredListener);
340             
341             try {
342                 // set the initial version of the default credential.
343                 currentCredential = null;
344                 Credential credential = membership.getDefaultCredential();
345                 XMLDocument credentialDoc;
346
347                 if (null != credential) {
348                     credentialDoc = (XMLDocument) credential.getDocument(MimeMediaType.XMLUTF8);
349                     currentCredential = new CurrentCredential(credential, credentialDoc);
350                 }
351             } catch (Exception all) {
352                 if (Logging.SHOW_WARNING && LOG.isLoggable(Level.WARNING)) {
353                     LOG.log(Level.WARNING, "could not get default credential", all);
354                 }
355             }
356         }
357
358         // get the RouteCM cache service
359         routeCM = router.getRouteCM();
360
361         return Module.START_OK;
362     }
363
364     /**
365      * {@inheritDoc}
366      * <p/>
367      * Careful that stopApp() could in theory be called before startApp().
368      */
369     public void stopApp() {
370
371         resolver.unregisterHandler(routerSName);
372
373         // unregister SRDI
374         resolver.unregisterSrdiHandler(routerSName);
375         srdiIndex.stop();
376
377         membership.removePropertyChangeListener("defaultCredential", membershipCredListener);
378         currentCredential = null;
379
380         resolver = null;
381         srdi = null;
382         membership = null;
383     }
384
385     /**
386      * return routeResolver usage
387      *
388      * @return routeResolver usage
389      */
390     boolean useRouteResolver() {
391         return useRouteResolver;
392     }
393
394     /**
395      * enable routeResolver usage
396      * @param enable if true, enables route resolver
397      */
398     void enableRouteResolver(boolean enable) {
399         useRouteResolver = enable;
400     }
401
402     /**
403      * issue a new route discovery resolver request
404      *
405      * @param peer the destination as a logical endpoint address
406      */
407     protected void findRoute(EndpointAddress peer) {
408
409         RouteAdvertisement myRoute = router.getMyLocalRoute();
410
411         // No need to pursue further if we haven't initialized our own route as
412         // responding peers are not going to be able to respond to us.
413         if (myRoute == null) {
414             if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
415                 LOG.fine("Cannot issue a find route if we don\'t know our own route");
416             }
417             return;
418         }
419
420         if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
421             LOG.fine("Find route for peer = " + peer);
422         }
423
424         try {
425             // create a new RouteQuery message
426             RouteQuery doc;
427
428             // check if we have some bad route information
429             // for that peer, in that case pass the bad hop count
430             BadRoute badRoute;
431
432             badRoute = router.getBadRoute(peer);
433
434             if (badRoute != null) {
435                 // ok we have a bad route
436                 // pass the bad hops info as part of the query
437                 if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
438                     LOG.fine("findRoute sends query: known bad Hops" + badRoute);
439                 }
440                 doc = new RouteQuery(EndpointRouter.addr2pid(peer), myRoute, badRoute.getBadHops());
441             } else {
442                 doc = new RouteQuery(EndpointRouter.addr2pid(peer), myRoute, null);
443             }
444
445             if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
446                 LOG.fine("Sending query for peer : " + peer);
447             }
448
449             XMLDocument credentialDoc;
450             CurrentCredential current = currentCredential;
451
452             if (null != current) {
453                 credentialDoc = current.credentialDoc;
454             } else {
455                 credentialDoc = null;
456             }
457           
458             ResolverQuery query = new ResolverQuery(routerSName, credentialDoc, localPeerId.toString(), doc.toString(), qid.incrementAndGet());
459
460             // only run SRDI if we are a rendezvous
461             // FIXME 20060106 bondolo This is not dynamic enough. The route 
462             // resolver needs to respond to changes in rendezvous configuration 
463             // at runtime.
464             if (group.isRendezvous()) {
465
466                 // check where to send the query via SRDI
467                 List<PeerID> results;
468
469                 if (srdiIndex != null) {
470                     // try to find a least 10 entries, will pick up one
471                     // randomly. This will protect against retry. It is
472                     // likely that a number of RDV will know about a route
473                     results = srdiIndex.query("route", RouteAdvertisement.DEST_PID_TAG, EndpointRouter.addr2pid(peer).toString(), 10);
474
475                     if (results != null && !results.isEmpty()) {
476                         // use SRDI to send the query
477                         // remove any non rdv peers from the candidate list
478                         // and garbage collect the index in the process
479                         List<PeerID> clean = cleanupAnyEdges(query.getSrcPeer(), results);
480
481                         if (!clean.isEmpty()) {
482                             // The purpose of incrementing the hopcount
483                             // when an SRDI index match is found (we got a
484                             // pointer to a rdv that should have the route) is to
485                             // restrict any further forwarding. The increment
486                             // count is only done when a matching SRDI index is
487                             // found. Not when the replica is selected as we
488                             // still need to forward the query.  This restriction
489                             // is purposelly done to avoid too many longjumps
490                             // within a walk.
491                             query.incrementHopCount();
492
493                             srdi.forwardQuery(clean, query, 1);
494                             if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
495                                 LOG.fine("found an srdi entry forwarding query to SRDI peer");
496                             }
497                             return;
498                         }
499                     } else {
500                         // it is not in our cache, look for the replica peer
501                         // we need to send the query
502                         PeerID destPeer = srdi.getReplicaPeer(EndpointRouter.addr2pid(peer).toString());
503
504                         if (destPeer != null && !destPeer.equals(localPeerId)) {
505                             // don't push anywhere if we do not have a replica
506                             // or we are trying to push to ourself
507                             if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
508                                 LOG.fine("processQuery srdiIndex DHT forward :" + destPeer);
509                             }
510
511                             srdi.forwardQuery(destPeer, query);
512                             return;
513                         } else {
514                             LOG.fine("processQuery srdiIndex DHT forward resulted in no op");
515                         }
516                     }
517                 }
518             }
519
520             // if we reach that point then we just use the resolver walk
521             resolver = group.getResolverService();
522             if (resolver != null) {
523                 resolver.sendQuery(null, query);
524                 if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
525                     LOG.fine("find route query sent");
526                 }
527             } else {
528                 if (Logging.SHOW_WARNING && LOG.isLoggable(Level.WARNING)) {
529                     LOG.warning("cannot get the resolver service");
530                 }
531             }
532         } catch (Exception ee) {
533             if (Logging.SHOW_WARNING && LOG.isLoggable(Level.WARNING)) {
534                 LOG.log(Level.WARNING, "Exception in findRoute", ee);
535             }
536         }
537     }
538
539     /**
540      * {@inheritDoc}
541      * <p/>
542      * This is called by the Generic ResolverServiceImpl when processing a
543      * response to a query.
544      */
545     public void processResponse(ResolverResponseMsg response) {
546
547         if (!useRouteResolver) { // Route resolver disabled
548             return;
549         }
550
551         if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
552             LOG.fine("processResponse got a response");
553         }
554
555         // convert the response into a RouteResponse
556         RouteResponse doc = null;
557
558         try {
559             Reader ip = new StringReader(response.getResponse());
560
561             XMLDocument asDoc = (XMLDocument)
562                     StructuredDocumentFactory.newStructuredDocument(MimeMediaType.XMLUTF8, ip);
563
564             doc = new RouteResponse(asDoc);
565         } catch (Exception e) {
566             if (Logging.SHOW_WARNING && LOG.isLoggable(Level.WARNING)) {
567                 LOG.log(Level.WARNING, "malformed response - discard", e);
568             }
569             return;
570         }
571
572         RouteAdvertisement dstRoute = doc.getDestRoute();
573         RouteAdvertisement srcRoute = doc.getSrcRoute();
574         int queryId = response.getQueryId();
575
576         EndpointAddress routingPeer = EndpointRouter.pid2addr(srcRoute.getDestPeerID());
577         EndpointAddress destPeer = EndpointRouter.pid2addr(dstRoute.getDestPeerID());
578
579         // check if we have a negative route response
580         if (queryId == NACKROUTE_QUERYID) {
581             AccessPointAdvertisement badHop = dstRoute.nextHop(EndpointRouter.addr2pid(routingPeer));
582
583             PeerID badPeer;
584
585             if (badHop != null) {
586                 badPeer = badHop.getPeerID();
587             } else { // the bad hop is the final destination
588                 badPeer = dstRoute.getDestPeerID();
589             }
590
591             processBadRoute(badPeer, dstRoute);
592             return;
593         }
594
595         // This is not our own peer adv, so we must not keep it
596         // for more than its expiration time.
597         // we only need to publish this route if
598         // we don't know about it yet
599         // XXX: here is where we could be more conservative and use isNormallyReachable() instead, thus excluding
600         // incoming messengers.
601         if ((!router.isLocalRoute(EndpointRouter.pid2addr(srcRoute.getDestPeerID())))
602                 && (!router.isRoutedRoute(srcRoute.getDestPeerID()))) {
603             router.updateRouteAdv(srcRoute);
604         }
605
606         if (destPeer.equals(routingPeer)) {
607             // The dest peer itself managed to respond to us. That means we
608             // learned the route from the reverseRoute in the message
609             // itself. So, there's nothing we need to do.
610             if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
611                 LOG.fine("learn route directly from the destination");
612             }
613         } else {
614             if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
615                 LOG.fine("learn route:" + routingPeer);
616             }
617
618             try {
619                 // build the candidate route using the
620                 // route response from the respondant peer
621                 RouteAdvertisement candidateRoute = RouteAdvertisement.newRoute(EndpointRouter.addr2pid(destPeer),
622                         EndpointRouter.addr2pid(routingPeer),(Vector) dstRoute.getVectorHops().clone());
623
624                 // cleanup the candidate route from any loop and remove the local peer extra
625                 // cycle
626                 RouteAdvertisement.cleanupLoop(candidateRoute, (PeerID) localPeerId);
627
628                 // Is there anything left in that route (or did the respondant
629                 // believe that we are the last hop on the route - which
630                 // obviously we are not.
631                 if (candidateRoute.size() == 0) {
632                     if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
633                         LOG.fine("Route response outdated: NACK responder");
634                     }
635                     generateNACKRoute(EndpointRouter.addr2pid(routingPeer), EndpointRouter.addr2pid(destPeer), dstRoute.getVectorHops());
636                     return;
637                 }
638
639                 // get the address of the first hop in the route to verify that
640                 // we have a route (direct or long) to the first hop, so the route
641                 // is valid
642                 EndpointAddress candidateRouter = EndpointRouter.pid2addr(candidateRoute.getFirstHop().getPeerID());
643
644                 // check that we have a direct connection to the first hop
645                 if (router.ensureLocalRoute(candidateRouter, null) == null) {
646                     // If we do not have a direct route to the candidate router check
647                     // for a long route in that case stich the route
648                     RouteAdvertisement routeToRouter = router.getRoute(candidateRouter, false);
649
650                     if (routeToRouter == null) {
651                         if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
652                             LOG.fine("Route response useless: no route to next router hop");
653                         }
654                         return;
655                     }
656
657                     // stich the route removing any loops and localPeer cycle
658                     if (RouteAdvertisement.stichRoute(candidateRoute, routeToRouter, (PeerID) localPeerId)) {
659                         router.setRoute(candidateRoute, false);
660                     } else {
661                         if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
662                             LOG.fine("Route response error stiching route response");
663                         }
664                         return;
665                     }
666                 } else {
667                     // we have a direct connection with the first hop of the candidate route
668                     // set the new route, which starts with the peer that replied to us.
669                     router.setRoute(candidateRoute, false);
670                 }
671             } catch (Exception ex) {
672                 if (Logging.SHOW_WARNING && LOG.isLoggable(Level.WARNING)) {
673                     LOG.log(Level.WARNING, "Failure building response route", ex);
674                     LOG.warning("               bad dstRoute: " + dstRoute.display());
675                 }
676             }
677
678             if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
679                 LOG.fine("finish process route response successfully");
680             }
681         }
682     }
683
684     /**
685      * bad route, so let's remove everything we have so
686      * we can start from scratch. We are maintaining a
687      * bad route up to DEFAULT_ROUTE expiration after
688      * that we consider it to be ok to retry the same route
689      * We are removing both the route and peer advertisement
690      * to force a new route query
691      *
692      * @param badHop source PeerID of NACK route info
693      * @param dest   original route information
694      */
695     private void processBadRoute(PeerID badHop, RouteAdvertisement dest) {
696
697         EndpointAddress addr = EndpointRouter.pid2addr(dest.getDestPeerID());
698
699         if (addr == null) {
700             if (Logging.SHOW_WARNING && LOG.isLoggable(Level.WARNING)) {
701                 LOG.warning("remove bad route has a bad route info - discard");
702             }
703             return;
704         }
705
706         if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
707             LOG.fine("remove bad route info for dest " + dest.display());
708             if (badHop != null) {
709                 LOG.fine("remove bad route bad hop " + badHop);
710             }
711         }
712
713         try {
714
715             // check first that we still have the same route, we may already
716             // using a new route
717             RouteAdvertisement currentRoute = router.getRoute(addr, false);
718
719             if (currentRoute == null) { // we already cleanup the route info
720                 return;
721             }
722
723             // check if we still have the old bad route, we may have
724             // already updated the route
725             if (!currentRoute.equals(dest)) {
726
727                 // check if the bad hop is not the destination
728                 // if it is then we still have a bad route
729                 if (badHop == null) {
730                     // we could get the bad hop, so consider the route ok
731                     return;
732                 }
733                 if (badHop.equals(EndpointRouter.addr2pid(addr))) {
734                     // check if the new route may still contain the bad hop
735                     // the known bad hop is the hop after the src peer that
736                     // responded with a NACK route
737                     // In this case we also consider the route bad
738                     if (!currentRoute.containsHop(badHop)) {
739                         return; // we are ok
740                     } else {
741                         if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
742                             LOG.fine("current route is bad because it contains known bad hop" + badHop);
743                         }
744                     }
745                 } else {
746                     if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
747                         LOG.fine("current route is bad because it contains known bad destination" + badHop);
748                     }
749                 }
750
751             }
752
753             // keep the bad one in a cache table so we don't retry them
754             // right away. We use the default route timeout
755             BadRoute badRoute = (router.getBadRoute(addr));
756
757             if (badRoute != null) {
758                 if (badRoute.getExpiration() > TimeUtils.timeNow()) {// nothing to do. the information is still valid
759                 } else {
760                     // It is ancient knowlege update it
761                     badRoute.setExpiration(TimeUtils.toAbsoluteTimeMillis(BADROUTE_EXPIRATION));
762                 }
763
764                 // check if we have to add a new bad hop
765                 // to our bad route
766                 if (badHop != null) {
767                     badRoute.addBadHop(badHop);
768                     badRoute.setExpiration(TimeUtils.toAbsoluteTimeMillis(BADROUTE_EXPIRATION));
769                 }
770
771                 router.setBadRoute(addr, badRoute);
772                 return;
773             } else {
774                 // create a new NACK route entry
775                 Set<PeerID> badHops;
776
777                 if (badHop != null) {
778                     badHops = Collections.singleton(badHop);
779                 } else {
780                     badHops = Collections.emptySet();
781                 }
782
783                 badRoute = new BadRoute(dest, TimeUtils.toAbsoluteTimeMillis(BADROUTE_EXPIRATION), badHops);
784                 router.setBadRoute(addr, badRoute);
785             }
786
787             // remove route from route CM
788             routeCM.flushRoute(EndpointRouter.addr2pid(addr));
789
790             // let's remove the remote route info from the routing table
791             // we do this after we removed the entries from the CM
792             // to avoid that another thread is putting back the entry
793             router.removeRoute(EndpointRouter.addr2pid(addr));
794         } catch (Exception ex) {
795             if (Logging.SHOW_WARNING && LOG.isLoggable(Level.WARNING)) {
796                 LOG.log(Level.WARNING, "exception during bad route removal", ex);
797             }
798         }
799     }
800
801     /**
802      * Process the Query, and generate response
803      *
804      * @param query the query to process
805      */
806     public int processQuery(ResolverQueryMsg query) {
807
808         if (!useRouteResolver) { // Route resolver disabled
809             return ResolverService.OK;
810         }
811
812         if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
813             LOG.fine("processQuery starts");
814         }
815
816         RouteQuery routeQuery;
817         try {
818             Reader ip = new StringReader(query.getQuery());
819             XMLDocument asDoc = (XMLDocument) StructuredDocumentFactory.newStructuredDocument(MimeMediaType.XMLUTF8, ip);
820             routeQuery = new RouteQuery(asDoc);
821         } catch (RuntimeException e) {
822             if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
823                 LOG.log(Level.FINE, "Malformed Route query ", e);
824             }
825             return ResolverService.OK;
826         } catch (IOException e) {
827             if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
828                 LOG.log(Level.FINE, "Malformed Route query ", e);
829             }
830             return ResolverService.OK;
831         } 
832
833         PeerID pId = routeQuery.getDestPeerID();
834
835         if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
836             LOG.fine("Looking for route to " + pId);
837         }
838
839         RouteAdvertisement srcRoute = routeQuery.getSrcRoute();
840         Collection<PeerID> badHops = routeQuery.getBadHops();
841
842         if (Logging.SHOW_FINER && LOG.isLoggable(Level.FINER)) {
843             StringBuilder badHopsDump = new StringBuilder("bad Hops :\n");
844
845             for (ID aBadHop : badHops) {
846                 badHopsDump.append('\t').append(aBadHop);
847             }
848
849             LOG.finer(badHopsDump.toString());
850         }
851
852         // if our source route is not null, then publish it
853         if (srcRoute != null) {
854             if (!(srcRoute.getDestPeerID()).equals(localPeerId)) {
855                 // This is not our own peer adv so we must not keep it
856                 // longer than its expiration time.
857                 try {
858                     if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
859                         LOG.fine("Publishing sender route info " + srcRoute.getDestPeerID());
860                     }
861
862                     // we only need to publish this route if
863                     // we don't know about it yet
864                     // XXX: here is where we could be more conservative and use isNormallyReachable() instead, thus excluding
865                     // incoming messengers.
866                     if ((!router.isLocalRoute(EndpointRouter.pid2addr(srcRoute.getDestPeerID())))
867                             && (!router.isRoutedRoute(srcRoute.getDestPeerID()))) {
868                         routeCM.publishRoute(srcRoute);
869                     }
870                 } catch (Exception e) {
871                     if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
872                         LOG.log(Level.FINE, "Could not publish Route Adv from query - discard", e);
873                     }
874                     return ResolverService.OK;
875                 }
876             }
877         } else {
878             if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
879                 LOG.fine("No src Route in route query - discard ");
880             }
881             return ResolverService.OK;
882         }
883
884         if (pId == null) {
885             if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
886                 LOG.fine("Malformed route query request, no PeerId - discard");
887             }
888             return ResolverService.OK;
889         }
890
891         // We have more luck with that one because, since it is part of OUR
892         // message, and not part of the resolver protocol, it is in OUR
893         // format.
894         EndpointAddress qReqAddr = EndpointRouter.pid2addr(pId);
895
896         RouteAdvertisement route;
897
898         // check if this peer has a route to the destination
899         // requested
900         boolean found = false;
901
902         if (qReqAddr.equals(localPeerAddr)) {
903             found = true;
904             // return the route that is my local route
905             route = router.getMyLocalRoute();
906         } else {
907             // only rendezvous can respond to route requests
908             // if not we are generating too much traffic
909             // XXX: here is where we could be more conservative and use isNormallyReachable() instead, thus excluding
910             // incoming messengers.
911             if (router.isLocalRoute(qReqAddr)) {
912                 if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
913                     LOG.fine("Peer has direct route to destination ");
914                 }
915                 // we should set the route to something  :-)
916
917                 found = true;
918
919                 // this peer has a direct route to the destination
920                 // return the short route advertisement we know for this peer
921                 // (For us it is zero hop, and we advertise ourself as the routing
922                 // peer in the response. The stiching is done by whoever gets that
923                 // response). May be there are more than one hop advertised in-there...
924                 // alternate routing peers...should we leave them ?
925                 // For now, we keep the full dest, but wack the hops.
926
927                 route = (RouteAdvertisement) AdvertisementFactory.newAdvertisement(RouteAdvertisement.getAdvertisementType());
928
929                 AccessPointAdvertisement ap = (AccessPointAdvertisement)
930                         AdvertisementFactory.newAdvertisement(AccessPointAdvertisement.getAdvertisementType());
931
932                 ap.setPeerID(pId);
933                 route.setDest(ap);
934             } else {
935                 route = router.getRoute(qReqAddr, false);
936                 if (route != null) {
937                     found = true;
938                     // check if we were given some bad hops info
939                     // and see if the found route contains
940                     // any of these bad hops. In that case, we need
941                     // to mark this route as bad
942                     for (PeerID aBadHop : badHops) {
943                         // destination is known to be bad
944                         if (EndpointRouter.addr2pid(qReqAddr).equals(aBadHop)) {
945                             processBadRoute(aBadHop, route);
946                             found = false;
947                             break;
948                         }
949
950                         if (route.containsHop(aBadHop)) {
951                             if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
952                                 LOG.fine("Peer has bad route due to " + aBadHop);
953                             }
954                             processBadRoute(aBadHop, route);
955                             found = false;
956                             break;
957                         }
958                     }
959                 }
960             }
961         }
962
963         if (!found) {
964             // discard the request if we are not a rendezvous
965             // else forward to the next peers
966             if (!group.isRendezvous()) {
967                 if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
968                     LOG.fine("discard query forwarding as not a rendezvous");
969                 }
970                 return ResolverService.OK;
971             }
972
973             // did not find a route, check our srdi cache
974             // make sure we protect against out of sync
975             // SRDI index
976
977             // srdi forwarding is only involved once the Index entry has
978             // been found and we forwarded the resolver query. Afterward a
979             // normal walk proceeds from the initial SRDI index pointing
980             // rdv. This is done to protect against potential loopback
981             // entries in the SRDI cache index due to out of sync peerview
982             // and index.
983             if (query.getHopCount() < 2) {
984
985                 // check local SRDI cache to see if we have the entry
986                 // we look for 10 entries, will pickup one randomly
987                 List<PeerID> results = srdiIndex.query("route", RouteAdvertisement.DEST_PID_TAG, pId.toString(), 10);
988
989                 if (results.size() > 0) {
990                     if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
991                         LOG.fine("processQuery srdiIndex lookup match :" + results.size());
992                     }
993
994                     // remove any non-rdv peers to avoid sending
995                     // to a non-rdv peers and garbage collect the SRDI
996                     // index in the process
997                     List<PeerID> clean = cleanupAnyEdges(query.getSrcPeer(), results);
998
999                     if (clean.size() > 0) {
1000
1001                         if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
1002                             LOG.fine("found an srdi entry forwarding query to SRDI peer");
1003                         }
1004
1005                         // The purpose of incrementing the hopcount
1006                         // when an SRDI index match is found (we got a
1007                         // pointer to a rdv that should have the route) is to
1008                         // restrict any further forwarding. The increment
1009                         // count is only done when a matching SRDI index is
1010                         // found. Not when the replica is selected as we
1011                         // still need to forward the query.  This restriction
1012                         // is purposelly done to avoid too many longjumps
1013                         // within a walk.
1014                         query.incrementHopCount();
1015
1016                         // Note: this forwards the query to 1 peer randomly
1017                         // selected from the result
1018                         srdi.forwardQuery(clean, query, 1);
1019
1020                         // tell the resolver no further action is needed.
1021                         return ResolverService.OK;
1022                     }
1023                 }
1024             }
1025
1026             if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
1027                 LOG.fine("did not find a route or SRDI index");
1028             }
1029
1030             // force a walk
1031             return ResolverService.Repropagate;
1032         }
1033
1034         // we found a route send the response
1035         try {
1036             if (route == null) {
1037                 if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
1038                     LOG.fine("we should have had a route at this point");
1039                 }
1040                 return ResolverService.OK;
1041             }
1042
1043             if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
1044                 LOG.fine("we have a route build route response" + route.display());
1045             }
1046
1047             RouteAdvertisement myRoute = router.getMyLocalRoute();
1048
1049             // make sure we initialized our local
1050             // route info as we will need it to respond. We may
1051             // not have our route if we are still
1052             // waiting for a relay connection.
1053             if (myRoute == null) {
1054                 return ResolverService.OK;
1055             }
1056
1057             RouteResponse routeResponse = new RouteResponse();
1058
1059             routeResponse.setDestRoute(route);
1060             routeResponse.setSrcRoute(myRoute);
1061
1062             if (routeResponse == null) {
1063                 if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
1064                     LOG.fine("error creating route response");
1065                 }
1066                 return ResolverService.OK;
1067             }
1068
1069             // construct a response from the query
1070             ResolverResponseMsg res = query.makeResponse();
1071
1072             CurrentCredential current = currentCredential;
1073
1074             if (null != current) {
1075                 res.setCredential(current.credentialDoc);
1076             }
1077             res.setResponse(routeResponse.toString());
1078
1079             resolver.sendResponse(query.getSrcPeer().toString(), res);
1080             return ResolverService.OK;
1081
1082         } catch (Exception ee) {
1083             if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
1084                 LOG.log(Level.FINE, "processQuery: error while processing query ", ee);
1085             }
1086             return ResolverService.OK;
1087         }
1088     }
1089
1090     /**
1091      * Return a route error in case a route was found to be invalid
1092      * as the current hop cannot find a way to forward the message to the
1093      * destination or any other hops in the forward part of the route.
1094      * In that case a negative route response is forwarded
1095      * to the original source of the message. Now of course we
1096      * do not have any way to guarantee that the NACK message will be
1097      * received by the sender, but the best we can do is to try :-)
1098      * <p/>
1099      * we send a query ID to NACKROUTE_QUERYID to indicate
1100      * this is a bad Route
1101      *
1102      * @param src      original source of the message
1103      * @param dest     original destination of the message
1104      * @param origHops original hops
1105      */
1106     protected void generateNACKRoute(PeerID src, PeerID dest, Vector<AccessPointAdvertisement> origHops) {
1107
1108         // As long as the group is partially initialized, do not bother
1109         // trying to send NACKS. We can't: it just causes NPEs.
1110         if (resolver == null) {
1111             return;
1112         }
1113
1114         if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
1115             LOG.fine("generate NACK Route response " + src);
1116         }
1117
1118         // check first, if we are not already in process of looking for a
1119         // route to the destination peer of the NACK. We should not try to
1120         // send a NACK to that destination at that point as this will block
1121         // our incoming processing thread while it is looking for a route to
1122         // that destination. If there a no pending route requests to that
1123         // destination then we can go ahead an attempt to send the NACK. At
1124         // the maximum we should have only one thread block while looking for
1125         // a specific destination. When we find a route to the destination,
1126         // the next NACK processing will be sent.
1127
1128         if (router.isPendingRouteQuery(src)) {
1129             if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
1130                 LOG.fine("drop NACK due to pending route discovery " + src);
1131             }
1132             return;
1133         }
1134
1135         // Generate a route response
1136         RouteAdvertisement route = (RouteAdvertisement)
1137                 AdvertisementFactory.newAdvertisement(RouteAdvertisement.getAdvertisementType());
1138
1139         AccessPointAdvertisement ap = (AccessPointAdvertisement)
1140                 AdvertisementFactory.newAdvertisement(AccessPointAdvertisement.getAdvertisementType());
1141
1142         ap.setPeerID(dest);
1143         route.setDest(ap);
1144         route.setHops(origHops);
1145
1146         // set the the route of the peer that
1147         // detected the bad route
1148         RouteAdvertisement routeSrc = (RouteAdvertisement)
1149                 AdvertisementFactory.newAdvertisement(RouteAdvertisement.getAdvertisementType());
1150
1151         AccessPointAdvertisement apSrc = (AccessPointAdvertisement)
1152                 AdvertisementFactory.newAdvertisement(AccessPointAdvertisement.getAdvertisementType());
1153
1154         apSrc.setPeerID((PeerID) localPeerId);
1155         routeSrc.setDest(apSrc);
1156
1157         RouteResponse routeResponse = new RouteResponse();
1158
1159         routeResponse.setDestRoute(route);
1160         routeResponse.setSrcRoute(routeSrc);
1161         
1162         ResolverResponse res = new ResolverResponse();
1163
1164         res.setHandlerName(routerSName);
1165         
1166         CurrentCredential current = currentCredential;
1167
1168         if (null != current) {
1169             res.setCredential(current.credentialDoc); 
1170         } 
1171         
1172         res.setQueryId(NACKROUTE_QUERYID);        
1173         res.setResponse(routeResponse.toString());
1174
1175         // send the NACK response back to the originator
1176         resolver.sendResponse(src.toString(), res);
1177     }
1178
1179     /**
1180      * process an SRDI message request
1181      *
1182      * @param message SRDI resolver message
1183      */
1184     public boolean processSrdi(ResolverSrdiMsg message) {
1185         if(!group.isRendezvous()) {
1186             return true;
1187         }
1188         
1189         String value;
1190         SrdiMessage srdiMsg;
1191
1192         try {
1193             if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
1194                 LOG.fine("Received a SRDI messsage in group" + group.getPeerGroupName());
1195             }
1196
1197             XMLDocument asDoc = (XMLDocument) StructuredDocumentFactory.newStructuredDocument(MimeMediaType.XMLUTF8, new StringReader(message.getPayload()));
1198             srdiMsg = new SrdiMessageImpl(asDoc);
1199         } catch (Exception e) {
1200             // we don't understand this msg, let's skip it
1201             if (Logging.SHOW_WARNING && LOG.isLoggable(Level.WARNING)) {
1202                 LOG.log(Level.WARNING, "corrupted SRDI message", e);
1203             }
1204             return false;
1205         }
1206
1207         PeerID pid = srdiMsg.getPeerID();
1208
1209         // filter messages that contain messages
1210         // about the local peer, so we don't enter
1211         // self-reference
1212         if (pid.equals(localPeerId)) {
1213             return false;
1214         }
1215
1216         for (SrdiMessage.Entry entry : srdiMsg.getEntries()) {
1217             // drop any information  about ourself
1218             if (entry.key.equals(localPeerId.toString())) {
1219                 continue;
1220             }
1221             value = entry.value;
1222             if (value == null) {
1223                 value = "";
1224             }
1225
1226             // Expiration of entries is taken care of by SrdiIdex, so we always add
1227             // FIXME hamada 20030314
1228             // All routes are added under the secondary key 'DstPID', it would be more correct to
1229             // Specify it in the message, but since versioning is not yet supported the following is
1230             // acceptable, since it is localized
1231             srdiIndex.add(srdiMsg.getPrimaryKey(), RouteAdvertisement.DEST_PID_TAG, entry.key, pid, entry.expiration);
1232             if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
1233                 LOG.fine("Primary Key [" + srdiMsg.getPrimaryKey() + "] key [RouteAdvertisement.DEST_PID_TAG]" + " value [" + entry.key + "] exp [" + entry.expiration + "]");
1234             }
1235         }
1236
1237         return true;
1238     }
1239
1240     /**
1241      * {@inheritDoc}
1242      */
1243     public void pushEntries(boolean all) {
1244         // only send to the replica
1245         pushSrdi(null, all);
1246     }
1247
1248     /*
1249      * push all srdi entries to the rednezvous SRDI cache (new connection)
1250      *
1251      *@param all if true push all entries, otherwise just deltas
1252      */
1253     protected void pushSrdi(String peer, boolean all) {
1254
1255         SrdiMessage srdiMsg;
1256         Vector<SrdiMessage.Entry> routeIx = new Vector<SrdiMessage.Entry>();
1257
1258         // 20021018 tra:Route info don't expire unless the peer disappears
1259         // This approach is used to limit the SRDI traffic. The key
1260         // point here is that SRDI is used to tell a peer that another
1261         // has a route to the destination it is looking for. The information
1262         // that SRDI cache is not so much the specific route info but rather
1263         // the fact that a peer has knowledge of a route to another peer
1264         // We don't want to update the SRDI cache on every route update.
1265         // The SRDI cache will be flushed when the peer disconnect from
1266         // the rendezvous.
1267
1268         // We cannot support concurrent modification of the map while we
1269         // do that: we must synchronize...
1270         for (Iterator<ID> each = router.getAllRoutedRouteAddresses(); each.hasNext();) {
1271             ID pid = each.next();
1272             SrdiMessage.Entry entry = new SrdiMessage.Entry(pid.toString(), "", Long.MAX_VALUE);
1273             routeIx.addElement(entry);
1274         }
1275
1276         try {
1277             // check if we have anything to send
1278             if (routeIx.size() == 0) {
1279                 return;
1280             }
1281
1282             srdiMsg = new SrdiMessageImpl(group.getPeerID(),
1283                     // one hop
1284                     1,
1285                     "route", routeIx);
1286
1287             if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
1288                 LOG.fine("Sending a SRDI messsage of [All=" + all + "] routes");
1289             }
1290             // this will replicate entry to the  SRDI replica peers
1291             srdi.replicateEntries(srdiMsg);
1292         } catch (Exception e) {
1293             if (Logging.SHOW_WARNING && LOG.isLoggable(Level.WARNING)) {
1294                 LOG.log(Level.WARNING, "SRDI Push failed", e);
1295             }
1296         }
1297     }
1298
1299     /*
1300      * push srdi entries to the SRDI rendezvous cache
1301      * @param all if true push all entries, otherwise just deltas
1302      */
1303     protected void pushSrdi(ID peer, PeerID id) {
1304
1305         SrdiMessage srdiMsg;
1306
1307         try {
1308             srdiMsg = new SrdiMessageImpl(group.getPeerID(), 1, // only one hop
1309                     "route", id.toString(), null, Long.MAX_VALUE); // maximum expiration
1310             // 20021018 tra:Route info don't expire unless the peer disappears
1311             // This approach is used to limit the SRDI traffic. The key
1312             // point here is that SRDI is used to tell a peer that another
1313             // has a route to the destination it is looking for. The information
1314             // that SRDI cache is not so much the specific route info but rather
1315             // the fact that a peer has knowledge of a route to another peer
1316             // We don't want to update the SRDI cache on every route update.
1317             // The SRDI cache will be flushed when the peer disconnect from
1318             // the rendezvous.
1319             if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
1320                 LOG.fine("sending a router SRDI message add route " + id);
1321             }
1322             if (peer == null) {
1323                 PeerID destPeer = srdi.getReplicaPeer(id.toString());
1324                 peer = destPeer;
1325             }
1326             // don't push anywhere if we do not have a replica
1327             // or we are trying to send the query to ourself
1328             if (!localPeerId.equals(peer)) {
1329                 srdi.pushSrdi(peer, srdiMsg);
1330             }
1331         } catch (Exception e) {
1332             if (Logging.SHOW_WARNING && LOG.isLoggable(Level.WARNING)) {
1333                 LOG.log(Level.WARNING, "SRDI push failed", e);
1334             }
1335         }
1336     }
1337
1338     /**
1339      * remove a SRDI cache entry
1340      *
1341      * @param peer peer id we send the request, null for sending to all
1342      * @param id   peer id of the SRDI route that we want to remove
1343      *             from the cache
1344      */
1345     protected void removeSrdi(String peer, PeerID id) {
1346
1347         SrdiMessage srdiMsg;
1348
1349         try {
1350             srdiMsg = new SrdiMessageImpl(group.getPeerID(), 1, // only one hop
1351                     "route", id.toString(), null, // 0 means remove
1352                     0);
1353
1354             if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
1355                 LOG.fine("sending a router SRDI message delete route " + id);
1356             }
1357
1358             if (peer == null) {
1359                 PeerID destPeer = srdi.getReplicaPeer(id.toString());
1360
1361                 // don't push anywhere if we do not have replica
1362                 // or we are trying to push to ouself
1363                 if (destPeer != null && (!destPeer.equals(localPeerId))) {
1364                     srdi.pushSrdi(destPeer, srdiMsg);
1365                 }
1366             }
1367         } catch (Exception e) {
1368             if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
1369                 LOG.log(Level.FINE, "Removing srdi entry failed", e);
1370             }
1371         }
1372     }
1373
1374     /**
1375      * {@inheritDoc}
1376      */
1377     public void messageSendFailed(PeerID peerid, OutgoingMessageEvent e) {
1378         // when the resolver failed to send, we get a notification and
1379         // flush the SRDI cache entries for that destination
1380         removeSrdiIndex(peerid);
1381     }
1382
1383     /**
1384      * cleanup any edge peers when trying to forward an SRDI query so we are
1385      * guaranteed to the best of our knowledge that the peer is a rendezvous.
1386      * This is not perfect, as it may take time for the peerview to converge but
1387      * at least we can remove any peers that is not a rendezvous.
1388      *
1389      * @param src     source
1390      * @param results vector of PeerIDs
1391      * @return cleaned up vector of PeerIDs
1392      */
1393     protected List<PeerID> cleanupAnyEdges(ID src, List<PeerID> results) {
1394         List<PeerID> clean = new ArrayList<PeerID>(results.size());
1395
1396         // put the peerview as a vector of PIDs
1397         List<PeerID> rpvId = srdi.getGlobalPeerView();
1398
1399         // remove any peers not in the current peerview
1400         // these peers may be gone or have become edges
1401         for (PeerID pid : results) {
1402             // eliminate the src of the query so we don't resend
1403             // the query to whom send it to us
1404             if (src.equals(pid)) {
1405                 continue;
1406             }
1407             // remove the local also, so we don't send to ourself
1408             if (localPeerId.equals(pid)) {
1409                 continue;
1410             }
1411             if (rpvId.contains(pid)) {
1412                 if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
1413                     LOG.fine("valid rdv for SRDI forward " + pid);
1414                 }
1415                 clean.add(pid);
1416             } else {
1417                 // cleanup our SRDI cache for that peer
1418                 srdiIndex.remove(pid);
1419             }
1420         }
1421         return clean;
1422     }
1423
1424     /**
1425      * remove SRDI index
1426      *
1427      * @param pid of the index to be removed
1428      */
1429     protected void removeSrdiIndex(PeerID pid) {
1430         srdiIndex.remove(pid);
1431     }
1432 }