2 * Copyright (c) 2001-2007 Sun Microsystems, Inc. All rights reserved.
4 * The Sun Project JXTA(TM) Software License
6 * Redistribution and use in source and binary forms, with or without
7 * modification, are permitted provided that the following conditions are met:
9 * 1. Redistributions of source code must retain the above copyright notice,
10 * this list of conditions and the following disclaimer.
12 * 2. Redistributions in binary form must reproduce the above copyright notice,
13 * this list of conditions and the following disclaimer in the documentation
14 * and/or other materials provided with the distribution.
16 * 3. The end-user documentation included with the redistribution, if any, must
17 * include the following acknowledgment: "This product includes software
18 * developed by Sun Microsystems, Inc. for JXTA(TM) technology."
19 * Alternately, this acknowledgment may appear in the software itself, if
20 * and wherever such third-party acknowledgments normally appear.
22 * 4. The names "Sun", "Sun Microsystems, Inc.", "JXTA" and "Project JXTA" must
23 * not be used to endorse or promote products derived from this software
24 * without prior written permission. For written permission, please contact
25 * Project JXTA at http://www.jxta.org.
27 * 5. Products derived from this software may not be called "JXTA", nor may
28 * "JXTA" appear in their name, without prior written permission of Sun.
30 * THIS SOFTWARE IS PROVIDED ``AS IS'' AND ANY EXPRESSED OR IMPLIED WARRANTIES,
31 * INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND
32 * FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL SUN
33 * MICROSYSTEMS OR ITS CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
34 * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
35 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA,
36 * OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
37 * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
38 * NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE,
39 * EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
41 * JXTA is a registered trademark of Sun Microsystems, Inc. in the United
42 * States and other countries.
44 * Please see the license information page at :
45 * <http://www.jxta.org/project/www/license.html> for instructions on use of
46 * the license in source files.
48 * ====================================================================
50 * This software consists of voluntary contributions made by many individuals
51 * on behalf of Project JXTA. For more information on Project JXTA, please see
52 * http://www.jxta.org.
54 * This license is based on the BSD license adopted by the Apache Foundation.
56 package net.jxta.impl.endpoint.router;
58 import net.jxta.credential.Credential;
59 import net.jxta.document.*;
60 import net.jxta.endpoint.EndpointAddress;
61 import net.jxta.endpoint.OutgoingMessageEvent;
62 import net.jxta.exception.PeerGroupException;
63 import net.jxta.id.ID;
64 import net.jxta.impl.cm.Srdi;
65 import net.jxta.impl.cm.Srdi.SrdiInterface;
66 import net.jxta.impl.cm.SrdiIndex;
67 import net.jxta.impl.protocol.*;
68 import net.jxta.impl.util.TimeUtils;
69 import net.jxta.membership.MembershipService;
70 import net.jxta.peer.PeerID;
71 import net.jxta.peergroup.PeerGroup;
72 import net.jxta.platform.Module;
73 import net.jxta.protocol.*;
74 import net.jxta.resolver.QueryHandler;
75 import net.jxta.resolver.ResolverService;
76 import net.jxta.resolver.SrdiHandler;
77 import java.util.logging.Level;
78 import net.jxta.logging.Logging;
79 import java.util.logging.Logger;
81 import java.beans.PropertyChangeEvent;
82 import java.beans.PropertyChangeListener;
83 import java.io.IOException;
84 import java.io.Reader;
85 import java.io.StringReader;
87 import java.util.concurrent.atomic.AtomicInteger;
90 * Handles dynamic route resolution.
92 class RouteResolver implements Module, QueryHandler, SrdiHandler, SrdiInterface {
97 private final static transient Logger LOG = Logger.getLogger(RouteResolver.class.getName());
100 * Router Service Name
102 public final static String routerSName = "EndpointRouter";
104 private final static String srdiIndexerFileName = "routerSrdi";
107 * Negative Route query acknowledgment
109 private final static int NACKROUTE_QUERYID = -1;
112 * Bad route expiration. Amount of time we consider a route bad
114 private final static long BADROUTE_EXPIRATION = 2L * TimeUtils.AMINUTE;
117 * Default dynamic route resolution configuration preference.
119 private final static boolean USE_ROUTE_RESOLVER_DEFAULT = true;
122 * Configuration property that disables the usage
123 * of dynamic route resolution. Dynamic routes
124 * will not be discovered. set to true by default
125 * can be overwritten via ConfigParams
127 private boolean useRouteResolver = USE_ROUTE_RESOLVER_DEFAULT;
130 * PeerGroup Service Handle
132 private PeerGroup group = null;
135 * Resolver service handle
137 private ResolverService resolver = null;
142 private MembershipService membership = null;
145 * EndpointRouter pointer
147 private EndpointRouter router = null;
150 * local peer ID as a endpointAddress.
152 private EndpointAddress localPeerAddr = null;
157 private ID localPeerId = null;
160 * Route CM Persistent cache
162 private RouteCM routeCM = null;
165 * The current resolver query ID. static to make debugging easier.
167 private final static AtomicInteger qid = new AtomicInteger(0);
172 private SrdiIndex srdiIndex = null;
177 private Srdi srdi = null;
180 * Encapsulates current Membership Service credential.
182 final static class CurrentCredential {
185 * The current default credential
187 final Credential credential;
190 * The current default credential in serialized XML form.
192 final XMLDocument credentialDoc;
194 CurrentCredential(Credential credential, XMLDocument credentialDoc) {
195 this.credential = credential;
196 this.credentialDoc = credentialDoc;
201 * The current Membership service default credential.
203 CurrentCredential currentCredential;
206 * Listener we use for membership property events.
208 private class CredentialListener implements PropertyChangeListener {
211 * Standard Constructor
213 CredentialListener() {}
218 public void propertyChange(PropertyChangeEvent evt) {
219 if (MembershipService.DEFAULT_CREDENTIAL_PROPERTY.equals(evt.getPropertyName())) {
220 if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
221 LOG.fine("New default credential event");
224 synchronized (RouteResolver.this) {
225 Credential cred = (Credential) evt.getNewValue();
226 XMLDocument credentialDoc;
229 credentialDoc = (XMLDocument) cred.getDocument(MimeMediaType.XMLUTF8);
230 currentCredential = new CurrentCredential(cred, credentialDoc);
231 } catch (Exception all) {
232 if (Logging.SHOW_WARNING && LOG.isLoggable(Level.WARNING)) {
233 LOG.log(Level.WARNING, "Could not generate credential document", all);
235 currentCredential = null;
238 currentCredential = null;
245 final CredentialListener membershipCredListener = new CredentialListener();
248 * @param router the router
250 RouteResolver(EndpointRouter router) {
251 this.router = router;
255 * initialize routeResolver
257 public void init(PeerGroup group, ID assignedID, Advertisement impl) throws PeerGroupException {
259 ModuleImplAdvertisement implAdvertisement = (ModuleImplAdvertisement) impl;
261 // extract Router service configuration properties
262 ConfigParams confAdv = group.getConfigAdvertisement();
263 XMLElement paramBlock = null;
265 if (confAdv != null) {
266 paramBlock = (XMLElement) confAdv.getServiceParam(assignedID);
269 if (paramBlock != null) {
270 // get our tunable router parameter
273 param = paramBlock.getChildren("useRouteResolver");
274 if (param.hasMoreElements()) {
275 useRouteResolver = Boolean.getBoolean(((XMLElement) param.nextElement()).getTextValue());
281 localPeerId = group.getPeerID();
283 localPeerAddr = EndpointRouter.pid2addr(group.getPeerID());
285 if (Logging.SHOW_CONFIG && LOG.isLoggable(Level.CONFIG)) {
286 StringBuilder configInfo = new StringBuilder("Configuring Router Transport Resolver : " + assignedID);
288 if (implAdvertisement != null) {
289 configInfo.append("\n\tImplementation :");
290 configInfo.append("\n\t\tModule Spec ID: ").append(implAdvertisement.getModuleSpecID());
291 configInfo.append("\n\t\tImpl Description : ").append(implAdvertisement.getDescription());
292 configInfo.append("\n\t\tImpl URI : ").append(implAdvertisement.getUri());
293 configInfo.append("\n\t\tImpl Code : ").append(implAdvertisement.getCode());
296 configInfo.append("\n\tGroup Params :");
297 configInfo.append("\n\t\tGroup : ").append(group);
298 configInfo.append("\n\t\tPeer ID : ").append(group.getPeerID());
300 configInfo.append("\n\tConfiguration:");
301 configInfo.append("\n\t\tUse Route Resolver : ").append(useRouteResolver());
302 LOG.config(configInfo.toString());
309 public int startApp(String[] arg) {
311 resolver = group.getResolverService();
313 if (null == resolver) {
314 if (Logging.SHOW_WARNING && LOG.isLoggable(Level.WARNING)) {
315 LOG.warning("Endpoint Router start stalled until resolver service available");
317 return Module.START_AGAIN_STALLED;
320 membership = group.getMembershipService();
322 if (null == membership) {
323 if (Logging.SHOW_WARNING && LOG.isLoggable(Level.WARNING)) {
324 LOG.warning("Endpoint Router start stalled until membership service available");
326 return Module.START_AGAIN_STALLED;
329 resolver.registerHandler(routerSName, this);
330 // create and register the srdi service
331 srdiIndex = new SrdiIndex(group, srdiIndexerFileName);
332 // Srdi is a thread but we are not going to start,
333 // since the service is reactive.
334 srdi = new Srdi(group, routerSName, this, srdiIndex, 0, 0);
335 resolver.registerSrdiHandler(routerSName, this);
337 synchronized (this) {
338 // register our credential listener.
339 membership.addPropertyChangeListener(MembershipService.DEFAULT_CREDENTIAL_PROPERTY, membershipCredListener);
342 // set the initial version of the default credential.
343 currentCredential = null;
344 Credential credential = membership.getDefaultCredential();
345 XMLDocument credentialDoc;
347 if (null != credential) {
348 credentialDoc = (XMLDocument) credential.getDocument(MimeMediaType.XMLUTF8);
349 currentCredential = new CurrentCredential(credential, credentialDoc);
351 } catch (Exception all) {
352 if (Logging.SHOW_WARNING && LOG.isLoggable(Level.WARNING)) {
353 LOG.log(Level.WARNING, "could not get default credential", all);
358 // get the RouteCM cache service
359 routeCM = router.getRouteCM();
361 return Module.START_OK;
367 * Careful that stopApp() could in theory be called before startApp().
369 public void stopApp() {
371 resolver.unregisterHandler(routerSName);
374 resolver.unregisterSrdiHandler(routerSName);
377 membership.removePropertyChangeListener("defaultCredential", membershipCredListener);
378 currentCredential = null;
386 * return routeResolver usage
388 * @return routeResolver usage
390 boolean useRouteResolver() {
391 return useRouteResolver;
395 * enable routeResolver usage
396 * @param enable if true, enables route resolver
398 void enableRouteResolver(boolean enable) {
399 useRouteResolver = enable;
403 * issue a new route discovery resolver request
405 * @param peer the destination as a logical endpoint address
407 protected void findRoute(EndpointAddress peer) {
409 RouteAdvertisement myRoute = router.getMyLocalRoute();
411 // No need to pursue further if we haven't initialized our own route as
412 // responding peers are not going to be able to respond to us.
413 if (myRoute == null) {
414 if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
415 LOG.fine("Cannot issue a find route if we don\'t know our own route");
420 if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
421 LOG.fine("Find route for peer = " + peer);
425 // create a new RouteQuery message
428 // check if we have some bad route information
429 // for that peer, in that case pass the bad hop count
432 badRoute = router.getBadRoute(peer);
434 if (badRoute != null) {
435 // ok we have a bad route
436 // pass the bad hops info as part of the query
437 if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
438 LOG.fine("findRoute sends query: known bad Hops" + badRoute);
440 doc = new RouteQuery(EndpointRouter.addr2pid(peer), myRoute, badRoute.getBadHops());
442 doc = new RouteQuery(EndpointRouter.addr2pid(peer), myRoute, null);
445 if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
446 LOG.fine("Sending query for peer : " + peer);
449 XMLDocument credentialDoc;
450 CurrentCredential current = currentCredential;
452 if (null != current) {
453 credentialDoc = current.credentialDoc;
455 credentialDoc = null;
458 ResolverQuery query = new ResolverQuery(routerSName, credentialDoc, localPeerId.toString(), doc.toString(), qid.incrementAndGet());
460 // only run SRDI if we are a rendezvous
461 // FIXME 20060106 bondolo This is not dynamic enough. The route
462 // resolver needs to respond to changes in rendezvous configuration
464 if (group.isRendezvous()) {
466 // check where to send the query via SRDI
467 List<PeerID> results;
469 if (srdiIndex != null) {
470 // try to find a least 10 entries, will pick up one
471 // randomly. This will protect against retry. It is
472 // likely that a number of RDV will know about a route
473 results = srdiIndex.query("route", RouteAdvertisement.DEST_PID_TAG, EndpointRouter.addr2pid(peer).toString(), 10);
475 if (results != null && !results.isEmpty()) {
476 // use SRDI to send the query
477 // remove any non rdv peers from the candidate list
478 // and garbage collect the index in the process
479 List<PeerID> clean = cleanupAnyEdges(query.getSrcPeer(), results);
481 if (!clean.isEmpty()) {
482 // The purpose of incrementing the hopcount
483 // when an SRDI index match is found (we got a
484 // pointer to a rdv that should have the route) is to
485 // restrict any further forwarding. The increment
486 // count is only done when a matching SRDI index is
487 // found. Not when the replica is selected as we
488 // still need to forward the query. This restriction
489 // is purposelly done to avoid too many longjumps
491 query.incrementHopCount();
493 srdi.forwardQuery(clean, query, 1);
494 if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
495 LOG.fine("found an srdi entry forwarding query to SRDI peer");
500 // it is not in our cache, look for the replica peer
501 // we need to send the query
502 PeerID destPeer = srdi.getReplicaPeer(EndpointRouter.addr2pid(peer).toString());
504 if (destPeer != null && !destPeer.equals(localPeerId)) {
505 // don't push anywhere if we do not have a replica
506 // or we are trying to push to ourself
507 if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
508 LOG.fine("processQuery srdiIndex DHT forward :" + destPeer);
511 srdi.forwardQuery(destPeer, query);
514 LOG.fine("processQuery srdiIndex DHT forward resulted in no op");
520 // if we reach that point then we just use the resolver walk
521 resolver = group.getResolverService();
522 if (resolver != null) {
523 resolver.sendQuery(null, query);
524 if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
525 LOG.fine("find route query sent");
528 if (Logging.SHOW_WARNING && LOG.isLoggable(Level.WARNING)) {
529 LOG.warning("cannot get the resolver service");
532 } catch (Exception ee) {
533 if (Logging.SHOW_WARNING && LOG.isLoggable(Level.WARNING)) {
534 LOG.log(Level.WARNING, "Exception in findRoute", ee);
542 * This is called by the Generic ResolverServiceImpl when processing a
543 * response to a query.
545 public void processResponse(ResolverResponseMsg response) {
547 if (!useRouteResolver) { // Route resolver disabled
551 if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
552 LOG.fine("processResponse got a response");
555 // convert the response into a RouteResponse
556 RouteResponse doc = null;
559 Reader ip = new StringReader(response.getResponse());
561 XMLDocument asDoc = (XMLDocument)
562 StructuredDocumentFactory.newStructuredDocument(MimeMediaType.XMLUTF8, ip);
564 doc = new RouteResponse(asDoc);
565 } catch (Exception e) {
566 if (Logging.SHOW_WARNING && LOG.isLoggable(Level.WARNING)) {
567 LOG.log(Level.WARNING, "malformed response - discard", e);
572 RouteAdvertisement dstRoute = doc.getDestRoute();
573 RouteAdvertisement srcRoute = doc.getSrcRoute();
574 int queryId = response.getQueryId();
576 EndpointAddress routingPeer = EndpointRouter.pid2addr(srcRoute.getDestPeerID());
577 EndpointAddress destPeer = EndpointRouter.pid2addr(dstRoute.getDestPeerID());
579 // check if we have a negative route response
580 if (queryId == NACKROUTE_QUERYID) {
581 AccessPointAdvertisement badHop = dstRoute.nextHop(EndpointRouter.addr2pid(routingPeer));
585 if (badHop != null) {
586 badPeer = badHop.getPeerID();
587 } else { // the bad hop is the final destination
588 badPeer = dstRoute.getDestPeerID();
591 processBadRoute(badPeer, dstRoute);
595 // This is not our own peer adv, so we must not keep it
596 // for more than its expiration time.
597 // we only need to publish this route if
598 // we don't know about it yet
599 // XXX: here is where we could be more conservative and use isNormallyReachable() instead, thus excluding
600 // incoming messengers.
601 if ((!router.isLocalRoute(EndpointRouter.pid2addr(srcRoute.getDestPeerID())))
602 && (!router.isRoutedRoute(srcRoute.getDestPeerID()))) {
603 router.updateRouteAdv(srcRoute);
606 if (destPeer.equals(routingPeer)) {
607 // The dest peer itself managed to respond to us. That means we
608 // learned the route from the reverseRoute in the message
609 // itself. So, there's nothing we need to do.
610 if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
611 LOG.fine("learn route directly from the destination");
614 if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
615 LOG.fine("learn route:" + routingPeer);
619 // build the candidate route using the
620 // route response from the respondant peer
621 RouteAdvertisement candidateRoute = RouteAdvertisement.newRoute(EndpointRouter.addr2pid(destPeer),
622 EndpointRouter.addr2pid(routingPeer),(Vector) dstRoute.getVectorHops().clone());
624 // cleanup the candidate route from any loop and remove the local peer extra
626 RouteAdvertisement.cleanupLoop(candidateRoute, (PeerID) localPeerId);
628 // Is there anything left in that route (or did the respondant
629 // believe that we are the last hop on the route - which
630 // obviously we are not.
631 if (candidateRoute.size() == 0) {
632 if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
633 LOG.fine("Route response outdated: NACK responder");
635 generateNACKRoute(EndpointRouter.addr2pid(routingPeer), EndpointRouter.addr2pid(destPeer), dstRoute.getVectorHops());
639 // get the address of the first hop in the route to verify that
640 // we have a route (direct or long) to the first hop, so the route
642 EndpointAddress candidateRouter = EndpointRouter.pid2addr(candidateRoute.getFirstHop().getPeerID());
644 // check that we have a direct connection to the first hop
645 if (router.ensureLocalRoute(candidateRouter, null) == null) {
646 // If we do not have a direct route to the candidate router check
647 // for a long route in that case stich the route
648 RouteAdvertisement routeToRouter = router.getRoute(candidateRouter, false);
650 if (routeToRouter == null) {
651 if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
652 LOG.fine("Route response useless: no route to next router hop");
657 // stich the route removing any loops and localPeer cycle
658 if (RouteAdvertisement.stichRoute(candidateRoute, routeToRouter, (PeerID) localPeerId)) {
659 router.setRoute(candidateRoute, false);
661 if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
662 LOG.fine("Route response error stiching route response");
667 // we have a direct connection with the first hop of the candidate route
668 // set the new route, which starts with the peer that replied to us.
669 router.setRoute(candidateRoute, false);
671 } catch (Exception ex) {
672 if (Logging.SHOW_WARNING && LOG.isLoggable(Level.WARNING)) {
673 LOG.log(Level.WARNING, "Failure building response route", ex);
674 LOG.warning(" bad dstRoute: " + dstRoute.display());
678 if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
679 LOG.fine("finish process route response successfully");
685 * bad route, so let's remove everything we have so
686 * we can start from scratch. We are maintaining a
687 * bad route up to DEFAULT_ROUTE expiration after
688 * that we consider it to be ok to retry the same route
689 * We are removing both the route and peer advertisement
690 * to force a new route query
692 * @param badHop source PeerID of NACK route info
693 * @param dest original route information
695 private void processBadRoute(PeerID badHop, RouteAdvertisement dest) {
697 EndpointAddress addr = EndpointRouter.pid2addr(dest.getDestPeerID());
700 if (Logging.SHOW_WARNING && LOG.isLoggable(Level.WARNING)) {
701 LOG.warning("remove bad route has a bad route info - discard");
706 if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
707 LOG.fine("remove bad route info for dest " + dest.display());
708 if (badHop != null) {
709 LOG.fine("remove bad route bad hop " + badHop);
715 // check first that we still have the same route, we may already
717 RouteAdvertisement currentRoute = router.getRoute(addr, false);
719 if (currentRoute == null) { // we already cleanup the route info
723 // check if we still have the old bad route, we may have
724 // already updated the route
725 if (!currentRoute.equals(dest)) {
727 // check if the bad hop is not the destination
728 // if it is then we still have a bad route
729 if (badHop == null) {
730 // we could get the bad hop, so consider the route ok
733 if (badHop.equals(EndpointRouter.addr2pid(addr))) {
734 // check if the new route may still contain the bad hop
735 // the known bad hop is the hop after the src peer that
736 // responded with a NACK route
737 // In this case we also consider the route bad
738 if (!currentRoute.containsHop(badHop)) {
741 if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
742 LOG.fine("current route is bad because it contains known bad hop" + badHop);
746 if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
747 LOG.fine("current route is bad because it contains known bad destination" + badHop);
753 // keep the bad one in a cache table so we don't retry them
754 // right away. We use the default route timeout
755 BadRoute badRoute = (router.getBadRoute(addr));
757 if (badRoute != null) {
758 if (badRoute.getExpiration() > TimeUtils.timeNow()) {// nothing to do. the information is still valid
760 // It is ancient knowlege update it
761 badRoute.setExpiration(TimeUtils.toAbsoluteTimeMillis(BADROUTE_EXPIRATION));
764 // check if we have to add a new bad hop
766 if (badHop != null) {
767 badRoute.addBadHop(badHop);
768 badRoute.setExpiration(TimeUtils.toAbsoluteTimeMillis(BADROUTE_EXPIRATION));
771 router.setBadRoute(addr, badRoute);
774 // create a new NACK route entry
777 if (badHop != null) {
778 badHops = Collections.singleton(badHop);
780 badHops = Collections.emptySet();
783 badRoute = new BadRoute(dest, TimeUtils.toAbsoluteTimeMillis(BADROUTE_EXPIRATION), badHops);
784 router.setBadRoute(addr, badRoute);
787 // remove route from route CM
788 routeCM.flushRoute(EndpointRouter.addr2pid(addr));
790 // let's remove the remote route info from the routing table
791 // we do this after we removed the entries from the CM
792 // to avoid that another thread is putting back the entry
793 router.removeRoute(EndpointRouter.addr2pid(addr));
794 } catch (Exception ex) {
795 if (Logging.SHOW_WARNING && LOG.isLoggable(Level.WARNING)) {
796 LOG.log(Level.WARNING, "exception during bad route removal", ex);
802 * Process the Query, and generate response
804 * @param query the query to process
806 public int processQuery(ResolverQueryMsg query) {
808 if (!useRouteResolver) { // Route resolver disabled
809 return ResolverService.OK;
812 if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
813 LOG.fine("processQuery starts");
816 RouteQuery routeQuery;
818 Reader ip = new StringReader(query.getQuery());
819 XMLDocument asDoc = (XMLDocument) StructuredDocumentFactory.newStructuredDocument(MimeMediaType.XMLUTF8, ip);
820 routeQuery = new RouteQuery(asDoc);
821 } catch (RuntimeException e) {
822 if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
823 LOG.log(Level.FINE, "Malformed Route query ", e);
825 return ResolverService.OK;
826 } catch (IOException e) {
827 if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
828 LOG.log(Level.FINE, "Malformed Route query ", e);
830 return ResolverService.OK;
833 PeerID pId = routeQuery.getDestPeerID();
835 if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
836 LOG.fine("Looking for route to " + pId);
839 RouteAdvertisement srcRoute = routeQuery.getSrcRoute();
840 Collection<PeerID> badHops = routeQuery.getBadHops();
842 if (Logging.SHOW_FINER && LOG.isLoggable(Level.FINER)) {
843 StringBuilder badHopsDump = new StringBuilder("bad Hops :\n");
845 for (ID aBadHop : badHops) {
846 badHopsDump.append('\t').append(aBadHop);
849 LOG.finer(badHopsDump.toString());
852 // if our source route is not null, then publish it
853 if (srcRoute != null) {
854 if (!(srcRoute.getDestPeerID()).equals(localPeerId)) {
855 // This is not our own peer adv so we must not keep it
856 // longer than its expiration time.
858 if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
859 LOG.fine("Publishing sender route info " + srcRoute.getDestPeerID());
862 // we only need to publish this route if
863 // we don't know about it yet
864 // XXX: here is where we could be more conservative and use isNormallyReachable() instead, thus excluding
865 // incoming messengers.
866 if ((!router.isLocalRoute(EndpointRouter.pid2addr(srcRoute.getDestPeerID())))
867 && (!router.isRoutedRoute(srcRoute.getDestPeerID()))) {
868 routeCM.publishRoute(srcRoute);
870 } catch (Exception e) {
871 if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
872 LOG.log(Level.FINE, "Could not publish Route Adv from query - discard", e);
874 return ResolverService.OK;
878 if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
879 LOG.fine("No src Route in route query - discard ");
881 return ResolverService.OK;
885 if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
886 LOG.fine("Malformed route query request, no PeerId - discard");
888 return ResolverService.OK;
891 // We have more luck with that one because, since it is part of OUR
892 // message, and not part of the resolver protocol, it is in OUR
894 EndpointAddress qReqAddr = EndpointRouter.pid2addr(pId);
896 RouteAdvertisement route;
898 // check if this peer has a route to the destination
900 boolean found = false;
902 if (qReqAddr.equals(localPeerAddr)) {
904 // return the route that is my local route
905 route = router.getMyLocalRoute();
907 // only rendezvous can respond to route requests
908 // if not we are generating too much traffic
909 // XXX: here is where we could be more conservative and use isNormallyReachable() instead, thus excluding
910 // incoming messengers.
911 if (router.isLocalRoute(qReqAddr)) {
912 if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
913 LOG.fine("Peer has direct route to destination ");
915 // we should set the route to something :-)
919 // this peer has a direct route to the destination
920 // return the short route advertisement we know for this peer
921 // (For us it is zero hop, and we advertise ourself as the routing
922 // peer in the response. The stiching is done by whoever gets that
923 // response). May be there are more than one hop advertised in-there...
924 // alternate routing peers...should we leave them ?
925 // For now, we keep the full dest, but wack the hops.
927 route = (RouteAdvertisement) AdvertisementFactory.newAdvertisement(RouteAdvertisement.getAdvertisementType());
929 AccessPointAdvertisement ap = (AccessPointAdvertisement)
930 AdvertisementFactory.newAdvertisement(AccessPointAdvertisement.getAdvertisementType());
935 route = router.getRoute(qReqAddr, false);
938 // check if we were given some bad hops info
939 // and see if the found route contains
940 // any of these bad hops. In that case, we need
941 // to mark this route as bad
942 for (PeerID aBadHop : badHops) {
943 // destination is known to be bad
944 if (EndpointRouter.addr2pid(qReqAddr).equals(aBadHop)) {
945 processBadRoute(aBadHop, route);
950 if (route.containsHop(aBadHop)) {
951 if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
952 LOG.fine("Peer has bad route due to " + aBadHop);
954 processBadRoute(aBadHop, route);
964 // discard the request if we are not a rendezvous
965 // else forward to the next peers
966 if (!group.isRendezvous()) {
967 if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
968 LOG.fine("discard query forwarding as not a rendezvous");
970 return ResolverService.OK;
973 // did not find a route, check our srdi cache
974 // make sure we protect against out of sync
977 // srdi forwarding is only involved once the Index entry has
978 // been found and we forwarded the resolver query. Afterward a
979 // normal walk proceeds from the initial SRDI index pointing
980 // rdv. This is done to protect against potential loopback
981 // entries in the SRDI cache index due to out of sync peerview
983 if (query.getHopCount() < 2) {
985 // check local SRDI cache to see if we have the entry
986 // we look for 10 entries, will pickup one randomly
987 List<PeerID> results = srdiIndex.query("route", RouteAdvertisement.DEST_PID_TAG, pId.toString(), 10);
989 if (results.size() > 0) {
990 if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
991 LOG.fine("processQuery srdiIndex lookup match :" + results.size());
994 // remove any non-rdv peers to avoid sending
995 // to a non-rdv peers and garbage collect the SRDI
996 // index in the process
997 List<PeerID> clean = cleanupAnyEdges(query.getSrcPeer(), results);
999 if (clean.size() > 0) {
1001 if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
1002 LOG.fine("found an srdi entry forwarding query to SRDI peer");
1005 // The purpose of incrementing the hopcount
1006 // when an SRDI index match is found (we got a
1007 // pointer to a rdv that should have the route) is to
1008 // restrict any further forwarding. The increment
1009 // count is only done when a matching SRDI index is
1010 // found. Not when the replica is selected as we
1011 // still need to forward the query. This restriction
1012 // is purposelly done to avoid too many longjumps
1014 query.incrementHopCount();
1016 // Note: this forwards the query to 1 peer randomly
1017 // selected from the result
1018 srdi.forwardQuery(clean, query, 1);
1020 // tell the resolver no further action is needed.
1021 return ResolverService.OK;
1026 if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
1027 LOG.fine("did not find a route or SRDI index");
1031 return ResolverService.Repropagate;
1034 // we found a route send the response
1036 if (route == null) {
1037 if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
1038 LOG.fine("we should have had a route at this point");
1040 return ResolverService.OK;
1043 if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
1044 LOG.fine("we have a route build route response" + route.display());
1047 RouteAdvertisement myRoute = router.getMyLocalRoute();
1049 // make sure we initialized our local
1050 // route info as we will need it to respond. We may
1051 // not have our route if we are still
1052 // waiting for a relay connection.
1053 if (myRoute == null) {
1054 return ResolverService.OK;
1057 RouteResponse routeResponse = new RouteResponse();
1059 routeResponse.setDestRoute(route);
1060 routeResponse.setSrcRoute(myRoute);
1062 if (routeResponse == null) {
1063 if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
1064 LOG.fine("error creating route response");
1066 return ResolverService.OK;
1069 // construct a response from the query
1070 ResolverResponseMsg res = query.makeResponse();
1072 CurrentCredential current = currentCredential;
1074 if (null != current) {
1075 res.setCredential(current.credentialDoc);
1077 res.setResponse(routeResponse.toString());
1079 resolver.sendResponse(query.getSrcPeer().toString(), res);
1080 return ResolverService.OK;
1082 } catch (Exception ee) {
1083 if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
1084 LOG.log(Level.FINE, "processQuery: error while processing query ", ee);
1086 return ResolverService.OK;
1091 * Return a route error in case a route was found to be invalid
1092 * as the current hop cannot find a way to forward the message to the
1093 * destination or any other hops in the forward part of the route.
1094 * In that case a negative route response is forwarded
1095 * to the original source of the message. Now of course we
1096 * do not have any way to guarantee that the NACK message will be
1097 * received by the sender, but the best we can do is to try :-)
1099 * we send a query ID to NACKROUTE_QUERYID to indicate
1100 * this is a bad Route
1102 * @param src original source of the message
1103 * @param dest original destination of the message
1104 * @param origHops original hops
1106 protected void generateNACKRoute(PeerID src, PeerID dest, Vector<AccessPointAdvertisement> origHops) {
1108 // As long as the group is partially initialized, do not bother
1109 // trying to send NACKS. We can't: it just causes NPEs.
1110 if (resolver == null) {
1114 if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
1115 LOG.fine("generate NACK Route response " + src);
1118 // check first, if we are not already in process of looking for a
1119 // route to the destination peer of the NACK. We should not try to
1120 // send a NACK to that destination at that point as this will block
1121 // our incoming processing thread while it is looking for a route to
1122 // that destination. If there a no pending route requests to that
1123 // destination then we can go ahead an attempt to send the NACK. At
1124 // the maximum we should have only one thread block while looking for
1125 // a specific destination. When we find a route to the destination,
1126 // the next NACK processing will be sent.
1128 if (router.isPendingRouteQuery(src)) {
1129 if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
1130 LOG.fine("drop NACK due to pending route discovery " + src);
1135 // Generate a route response
1136 RouteAdvertisement route = (RouteAdvertisement)
1137 AdvertisementFactory.newAdvertisement(RouteAdvertisement.getAdvertisementType());
1139 AccessPointAdvertisement ap = (AccessPointAdvertisement)
1140 AdvertisementFactory.newAdvertisement(AccessPointAdvertisement.getAdvertisementType());
1144 route.setHops(origHops);
1146 // set the the route of the peer that
1147 // detected the bad route
1148 RouteAdvertisement routeSrc = (RouteAdvertisement)
1149 AdvertisementFactory.newAdvertisement(RouteAdvertisement.getAdvertisementType());
1151 AccessPointAdvertisement apSrc = (AccessPointAdvertisement)
1152 AdvertisementFactory.newAdvertisement(AccessPointAdvertisement.getAdvertisementType());
1154 apSrc.setPeerID((PeerID) localPeerId);
1155 routeSrc.setDest(apSrc);
1157 RouteResponse routeResponse = new RouteResponse();
1159 routeResponse.setDestRoute(route);
1160 routeResponse.setSrcRoute(routeSrc);
1162 ResolverResponse res = new ResolverResponse();
1164 res.setHandlerName(routerSName);
1166 CurrentCredential current = currentCredential;
1168 if (null != current) {
1169 res.setCredential(current.credentialDoc);
1172 res.setQueryId(NACKROUTE_QUERYID);
1173 res.setResponse(routeResponse.toString());
1175 // send the NACK response back to the originator
1176 resolver.sendResponse(src.toString(), res);
1180 * process an SRDI message request
1182 * @param message SRDI resolver message
1184 public boolean processSrdi(ResolverSrdiMsg message) {
1185 if(!group.isRendezvous()) {
1190 SrdiMessage srdiMsg;
1193 if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
1194 LOG.fine("Received a SRDI messsage in group" + group.getPeerGroupName());
1197 XMLDocument asDoc = (XMLDocument) StructuredDocumentFactory.newStructuredDocument(MimeMediaType.XMLUTF8, new StringReader(message.getPayload()));
1198 srdiMsg = new SrdiMessageImpl(asDoc);
1199 } catch (Exception e) {
1200 // we don't understand this msg, let's skip it
1201 if (Logging.SHOW_WARNING && LOG.isLoggable(Level.WARNING)) {
1202 LOG.log(Level.WARNING, "corrupted SRDI message", e);
1207 PeerID pid = srdiMsg.getPeerID();
1209 // filter messages that contain messages
1210 // about the local peer, so we don't enter
1212 if (pid.equals(localPeerId)) {
1216 for (SrdiMessage.Entry entry : srdiMsg.getEntries()) {
1217 // drop any information about ourself
1218 if (entry.key.equals(localPeerId.toString())) {
1221 value = entry.value;
1222 if (value == null) {
1226 // Expiration of entries is taken care of by SrdiIdex, so we always add
1227 // FIXME hamada 20030314
1228 // All routes are added under the secondary key 'DstPID', it would be more correct to
1229 // Specify it in the message, but since versioning is not yet supported the following is
1230 // acceptable, since it is localized
1231 srdiIndex.add(srdiMsg.getPrimaryKey(), RouteAdvertisement.DEST_PID_TAG, entry.key, pid, entry.expiration);
1232 if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
1233 LOG.fine("Primary Key [" + srdiMsg.getPrimaryKey() + "] key [RouteAdvertisement.DEST_PID_TAG]" + " value [" + entry.key + "] exp [" + entry.expiration + "]");
1243 public void pushEntries(boolean all) {
1244 // only send to the replica
1245 pushSrdi(null, all);
1249 * push all srdi entries to the rednezvous SRDI cache (new connection)
1251 *@param all if true push all entries, otherwise just deltas
1253 protected void pushSrdi(String peer, boolean all) {
1255 SrdiMessage srdiMsg;
1256 Vector<SrdiMessage.Entry> routeIx = new Vector<SrdiMessage.Entry>();
1258 // 20021018 tra:Route info don't expire unless the peer disappears
1259 // This approach is used to limit the SRDI traffic. The key
1260 // point here is that SRDI is used to tell a peer that another
1261 // has a route to the destination it is looking for. The information
1262 // that SRDI cache is not so much the specific route info but rather
1263 // the fact that a peer has knowledge of a route to another peer
1264 // We don't want to update the SRDI cache on every route update.
1265 // The SRDI cache will be flushed when the peer disconnect from
1268 // We cannot support concurrent modification of the map while we
1269 // do that: we must synchronize...
1270 for (Iterator<ID> each = router.getAllRoutedRouteAddresses(); each.hasNext();) {
1271 ID pid = each.next();
1272 SrdiMessage.Entry entry = new SrdiMessage.Entry(pid.toString(), "", Long.MAX_VALUE);
1273 routeIx.addElement(entry);
1277 // check if we have anything to send
1278 if (routeIx.size() == 0) {
1282 srdiMsg = new SrdiMessageImpl(group.getPeerID(),
1287 if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
1288 LOG.fine("Sending a SRDI messsage of [All=" + all + "] routes");
1290 // this will replicate entry to the SRDI replica peers
1291 srdi.replicateEntries(srdiMsg);
1292 } catch (Exception e) {
1293 if (Logging.SHOW_WARNING && LOG.isLoggable(Level.WARNING)) {
1294 LOG.log(Level.WARNING, "SRDI Push failed", e);
1300 * push srdi entries to the SRDI rendezvous cache
1301 * @param all if true push all entries, otherwise just deltas
1303 protected void pushSrdi(ID peer, PeerID id) {
1305 SrdiMessage srdiMsg;
1308 srdiMsg = new SrdiMessageImpl(group.getPeerID(), 1, // only one hop
1309 "route", id.toString(), null, Long.MAX_VALUE); // maximum expiration
1310 // 20021018 tra:Route info don't expire unless the peer disappears
1311 // This approach is used to limit the SRDI traffic. The key
1312 // point here is that SRDI is used to tell a peer that another
1313 // has a route to the destination it is looking for. The information
1314 // that SRDI cache is not so much the specific route info but rather
1315 // the fact that a peer has knowledge of a route to another peer
1316 // We don't want to update the SRDI cache on every route update.
1317 // The SRDI cache will be flushed when the peer disconnect from
1319 if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
1320 LOG.fine("sending a router SRDI message add route " + id);
1323 PeerID destPeer = srdi.getReplicaPeer(id.toString());
1326 // don't push anywhere if we do not have a replica
1327 // or we are trying to send the query to ourself
1328 if (!localPeerId.equals(peer)) {
1329 srdi.pushSrdi(peer, srdiMsg);
1331 } catch (Exception e) {
1332 if (Logging.SHOW_WARNING && LOG.isLoggable(Level.WARNING)) {
1333 LOG.log(Level.WARNING, "SRDI push failed", e);
1339 * remove a SRDI cache entry
1341 * @param peer peer id we send the request, null for sending to all
1342 * @param id peer id of the SRDI route that we want to remove
1345 protected void removeSrdi(String peer, PeerID id) {
1347 SrdiMessage srdiMsg;
1350 srdiMsg = new SrdiMessageImpl(group.getPeerID(), 1, // only one hop
1351 "route", id.toString(), null, // 0 means remove
1354 if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
1355 LOG.fine("sending a router SRDI message delete route " + id);
1359 PeerID destPeer = srdi.getReplicaPeer(id.toString());
1361 // don't push anywhere if we do not have replica
1362 // or we are trying to push to ouself
1363 if (destPeer != null && (!destPeer.equals(localPeerId))) {
1364 srdi.pushSrdi(destPeer, srdiMsg);
1367 } catch (Exception e) {
1368 if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
1369 LOG.log(Level.FINE, "Removing srdi entry failed", e);
1377 public void messageSendFailed(PeerID peerid, OutgoingMessageEvent e) {
1378 // when the resolver failed to send, we get a notification and
1379 // flush the SRDI cache entries for that destination
1380 removeSrdiIndex(peerid);
1384 * cleanup any edge peers when trying to forward an SRDI query so we are
1385 * guaranteed to the best of our knowledge that the peer is a rendezvous.
1386 * This is not perfect, as it may take time for the peerview to converge but
1387 * at least we can remove any peers that is not a rendezvous.
1390 * @param results vector of PeerIDs
1391 * @return cleaned up vector of PeerIDs
1393 protected List<PeerID> cleanupAnyEdges(ID src, List<PeerID> results) {
1394 List<PeerID> clean = new ArrayList<PeerID>(results.size());
1396 // put the peerview as a vector of PIDs
1397 List<PeerID> rpvId = srdi.getGlobalPeerView();
1399 // remove any peers not in the current peerview
1400 // these peers may be gone or have become edges
1401 for (PeerID pid : results) {
1402 // eliminate the src of the query so we don't resend
1403 // the query to whom send it to us
1404 if (src.equals(pid)) {
1407 // remove the local also, so we don't send to ourself
1408 if (localPeerId.equals(pid)) {
1411 if (rpvId.contains(pid)) {
1412 if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
1413 LOG.fine("valid rdv for SRDI forward " + pid);
1417 // cleanup our SRDI cache for that peer
1418 srdiIndex.remove(pid);
1427 * @param pid of the index to be removed
1429 protected void removeSrdiIndex(PeerID pid) {
1430 srdiIndex.remove(pid);