2 * Copyright (c) 2002-2007 Sun Micro//Systems, Inc. All rights reserved.
4 * The Sun Project JXTA(TM) Software License
6 * Redistribution and use in source and binary forms, with or without
7 * modification, are permitted provided that the following conditions are met:
9 * 1. Redistributions of source code must retain the above copyright notice,
10 * this list of conditions and the following disclaimer.
12 * 2. Redistributions in binary form must reproduce the above copyright notice,
13 * this list of conditions and the following disclaimer in the documentation
14 * and/or other materials provided with the distribution.
16 * 3. The end-user documentation included with the redistribution, if any, must
17 * include the following acknowledgment: "This product includes software
18 * developed by Sun Microsystems, Inc. for JXTA(TM) technology."
19 * Alternately, this acknowledgment may appear in the software itself, if
20 * and wherever such third-party acknowledgments normally appear.
22 * 4. The names "Sun", "Sun Microsystems, Inc.", "JXTA" and "Project JXTA" must
23 * not be used to endorse or promote products derived from this software
24 * without prior written permission. For written permission, please contact
25 * Project JXTA at http://www.jxta.org.
27 * 5. Products derived from this software may not be called "JXTA", nor may
28 * "JXTA" appear in their name, without prior written permission of Sun.
30 * THIS SOFTWARE IS PROVIDED ``AS IS'' AND ANY EXPRESSED OR IMPLIED WARRANTIES,
31 * INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND
32 * FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL SUN
33 * MICROSYSTEMS OR ITS CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
34 * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
35 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA,
36 * OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
37 * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
38 * NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE,
39 * EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
41 * JXTA is a registered trademark of Sun Microsystems, Inc. in the United
42 * States and other countries.
44 * Please see the license information page at :
45 * <http://www.jxta.org/project/www/license.html> for instructions on use of
46 * the license in source files.
48 * ====================================================================
50 * This software consists of voluntary contributions made by many individuals
51 * on behalf of Project JXTA. For more information on Project JXTA, please see
52 * http://www.jxta.org.
54 * This license is based on the BSD license adopted by the Apache Foundation.
56 package net.jxta.impl.rendezvous.rpv;
58 import java.io.IOException;
60 import java.net.URISyntaxException;
61 import java.util.ArrayList;
62 import java.util.Arrays;
63 import java.util.Collections;
64 import java.util.HashSet;
65 import java.util.Iterator;
66 import java.util.List;
67 import java.util.NoSuchElementException;
68 import java.util.Random;
70 import java.util.SortedSet;
71 import java.util.Timer;
72 import java.util.TimerTask;
73 import java.util.TreeSet;
74 import java.util.Vector;
75 import java.util.logging.Level;
76 import java.util.logging.Logger;
78 import net.jxta.discovery.DiscoveryService;
79 import net.jxta.document.Advertisement;
80 import net.jxta.document.AdvertisementFactory;
81 import net.jxta.document.MimeMediaType;
82 import net.jxta.document.StructuredDocumentFactory;
83 import net.jxta.document.XMLDocument;
84 import net.jxta.endpoint.EndpointAddress;
85 import net.jxta.endpoint.EndpointListener;
86 import net.jxta.endpoint.EndpointService;
87 import net.jxta.endpoint.Message;
88 import net.jxta.endpoint.MessageElement;
89 import net.jxta.endpoint.Messenger;
90 import net.jxta.endpoint.StringMessageElement;
91 import net.jxta.endpoint.TextDocumentMessageElement;
92 import net.jxta.id.ID;
93 import net.jxta.id.IDFactory;
94 import net.jxta.logging.Logging;
95 import net.jxta.peer.PeerID;
96 import net.jxta.peergroup.PeerGroup;
97 import net.jxta.pipe.InputPipe;
98 import net.jxta.pipe.OutputPipe;
99 import net.jxta.pipe.PipeID;
100 import net.jxta.pipe.PipeMsgEvent;
101 import net.jxta.pipe.PipeMsgListener;
102 import net.jxta.pipe.PipeService;
103 import net.jxta.protocol.ConfigParams;
104 import net.jxta.protocol.PeerAdvertisement;
105 import net.jxta.protocol.PipeAdvertisement;
106 import net.jxta.protocol.RdvAdvertisement;
107 import net.jxta.protocol.RouteAdvertisement;
108 import net.jxta.rendezvous.RendezvousEvent;
109 import net.jxta.rendezvous.RendezvousListener;
111 import net.jxta.impl.endpoint.EndpointUtils;
112 import net.jxta.impl.endpoint.relay.RelayReferralSeedingManager;
113 import net.jxta.impl.protocol.RdvConfigAdv;
114 import net.jxta.impl.rendezvous.RendezVousServiceImpl;
115 import net.jxta.impl.util.SeedingManager;
116 import net.jxta.impl.util.TimeUtils;
117 import net.jxta.impl.util.URISeedingManager;
120 * This class models a Rendezvous Peer View (RPV): an
121 * ordered collection of all other Rendezvous Peers visible to this peer.
124 * Presently this class implements a random "diffusion" algorithm
125 * where each Peer periodically selects a randomly selected peer advertisement
126 * from its view and sends it over to a randomly selected peer from its view.
127 * Over time, this causes every peer to learn about every other peer, resulting
128 * in a "converged" peer view.
130 * This diffusion process is bootstrapped by every peer sending its
131 * own peer advertisement to some well-known, stable, "seed" peers on startup.
134 public final class PeerView implements EndpointListener, RendezvousListener {
139 private static final transient Logger LOG = Logger.getLogger(PeerView.class.getName());
144 static final String SERVICE_NAME = "PeerView";
147 * Namespace used for rdv message elements.
149 static final String MESSAGE_NAMESPACE = "jxta";
152 * Element name of outgoing messages. Note that the element contains a
153 * RdvAdvertisement and <em>not</em> a Peer Advertisement.
155 static final String MESSAGE_ELEMENT_NAME = "PeerView.PeerAdv";
158 * Element name of responses. Note that the element contains a
159 * RdvAdvertisement and <em>not</em> a Peer Advertisement.
161 static final String RESPONSE_ELEMENT_NAME = "PeerView.PeerAdv.Response";
164 * Message element name for PeerView "Cached" Message Element
166 static final String CACHED_RADV_ELEMENT_NAME = "PeerView.Cached";
169 * Optional message element whose presence in a peerview message indicates
170 * that the referenced peer is not the provider of the RdvAdvertisement;
171 * the advertisement is instead a "hint" or referral from the sending peer.
174 * In practice, when sending its own RdvAdvertisement, a peer does not
175 * include this element, but when sending another peer's RdvAdvertisement,
176 * this element is included.
178 static final MessageElement CACHED_RADV_ELEMENT = new StringMessageElement(CACHED_RADV_ELEMENT_NAME, Boolean.TRUE.toString(), null);
181 * Message element name that specifies the route advertisement of the
182 * source of the message.
184 static final String SRCROUTEADV_ELEMENT_NAME = "PeerView.SrcRouteAdv";
187 * Message element name for PeerView "Edge" Message Element
189 static final String EDGE_ELEMENT_NAME = "PeerView.EdgePeer";
192 * Optional message element whose presence in a peerview message indicates
193 * that the referenced peer is an edge peer and not a member of the peerview.
196 static final MessageElement EDGE_ELEMENT = new StringMessageElement(EDGE_ELEMENT_NAME, Boolean.TRUE.toString(), null);
199 * Message element name for PeerView "Failure" Message Element
201 static final String FAILURE_ELEMENT_NAME = "PeerView.Failure";
204 * Optional message element whose presence in a peerview message indicates
205 * that the referenced peer has either failed or is quitting. If the
206 * "cached" element is also set then the error is being reported by a third party.
209 static final MessageElement FAILURE_ELEMENT = new StringMessageElement(FAILURE_ELEMENT_NAME, Boolean.TRUE.toString(), null);
212 * This is the interval between adv exchange in seconds. This is
213 * the main tunable runtime parameter for the diffusion
214 * process. An interval that is too low will improve view
215 * consistency at the expense of gratuitous network traffic. On
216 * the other hand, an interval that is too high will cause the
217 * view to become inconsistent. It is desirable to err on the side
220 private static final long DEFAULT_SEEDING_PERIOD = 5 * TimeUtils.ASECOND;
222 private static final long WATCHDOG_PERIOD = 30 * TimeUtils.ASECOND;
223 private static final long WATCHDOG_GRACE_DELAY = 5 * TimeUtils.AMINUTE;
225 private static final long DEFAULT_BOOTSTRAP_KICK_INTERVAL = 3 * TimeUtils.ASECOND;
227 private static final int MIN_BOOTLEVEL = 0;
228 private static final int BOOTLEVEL_INCREMENT = 1;
229 private static final int MAX_BOOTLEVEL = 6;
232 * DEFAULT_SEEDING_RDVPEERS
234 * This value is the maximum number of rendezvous peers that will be
235 * sent our own advertisement at boot time.
237 //private static final int DEFAULT_SEEDING_RDVPEERS = 5;
239 private final PeerGroup group;
242 * The group in which our propagate pipe will run.
244 private final PeerGroup advertisingGroup;
245 private final RendezVousServiceImpl rdvService;
246 private final EndpointService endpoint;
249 * The name of this PeerView.
251 * FIXME 20040623 bondolo This should be a CodatID.
253 private final String name;
256 * Delay in relative milliseconds to apply before contacting seeding rdvs.
257 * 0 is supposed to be reserved by RdvConfig to mean "use the default".
258 * However, it is in fact a valid value and also the one we want for the default.
259 * The only problem with that is that it is not possible to configure this value
260 * explicitly, should it one day not be the default. The issue is actually in RdvConfig.
262 private long seedingRdvConnDelay = 0;
264 private final boolean useOnlySeeds;
266 private final SeedingManager seedingManager;
269 * If the peerview is smaller than this we will try harder to find
270 * additional peerview members.
272 private int minHappyPeerView = 4;
275 * A single timer is used to periodically kick each PeerView
276 * into activity. For the Random PeerView, this activity consists
277 * of selecting a PeerViewElement at random from its view and
278 * sending it across to a randomly-selected peer from its view.
280 * FIXME 20021121 lomax
282 * The original idea of using a common timer in order to save threads IS a
283 * very good idea. However, limitations, and actually, bugs, in java.util.Timer
284 * create the following problems when using a static Timer:
287 * <li>Memory Leak: Canceling a TimerTask does not remove it from the
288 * execution queue of the Timer until the Timer is canceled or the
289 * TimerTask is fired. Since most of the TimerTasks are inner classes
290 * this can mean that the PeerView is held around for a long time.</li>
292 * <li>java.util.Timer is not only not real-time (which is more or less fine
293 * for the PeerView), but it also invokes tasks sequentially (only one Thread
294 * per Timer). As a result, a task that takes a long time to run delays the tasks behind it.</li>
298 * The PeerView would function better with a better Timer, but JDK does
299 * not provide a standard timer that would fulfill the needs of the
300 * PeerView. Maybe we should implement a JXTA Timer, since lots of the JXTA
301 * services, by being very asynchronous, rely on the same kind of timer
302 * semantics as the PeerView. Until then, creating a Timer per instance of
303 * the PeerView (i.e. per instance of joined PeerGroup) is the best
306 private final Timer timer;
309 * A random number generator.
311 private final static Random random = new Random();
314 * List of scheduled tasks
316 private final Set<TimerTask> scheduledTasks = Collections.synchronizedSet(new HashSet<TimerTask>());
319 * Describes the frequency and amount of effort we will spend updating
322 private int bootLevel = MIN_BOOTLEVEL;
325 * Earliest absolute time in milliseconds at which we will allow a reseed
328 private long earliestReseed = 0L;
330 private final String uniqueGroupId;
333 * Listeners for PeerView Events.
335 private final Set<PeerViewListener> rpvListeners = Collections.synchronizedSet(new HashSet<PeerViewListener>());
338 * Used for querying for pves.
340 private InputPipe wirePipeInputPipe = null;
343 * Used for querying for pves.
345 private OutputPipe wirePipeOutputPipe = null;
348 * Used for notifications about pve failures.
350 private InputPipe localGroupWirePipeInputPipe = null;
353 * Used for notifications about pve failures.
355 private OutputPipe localGroupWirePipeOutputPipe = null;
358 * A task which monitors the up and down peers in the peerview.
360 private WatchdogTask watchdogTask = null;
363 * This is the accumulated view by an instance of this class.
365 private final SortedSet<PeerViewDestination> localView = Collections.synchronizedSortedSet(new TreeSet<PeerViewDestination>());
370 * FIXME bondolo 20041015 This should be part of the local view.
372 private final PeerViewElement self;
373 private PeerViewElement upPeer = null;
374 private PeerViewElement downPeer = null;
376 private final PeerViewStrategy replyStrategy;
378 private final PeerViewStrategy kickRecipientStrategy;
380 private final PeerViewStrategy kickAdvertisementStrategy;
382 private final PeerViewStrategy refreshRecipientStrategy;
385 private PeerAdvertisement lastPeerAdv = null;
386 private int lastModCount = -1;
388 private final PipeAdvertisement localGroupWirePipeAdv;
389 private final PipeAdvertisement advGroupPropPipeAdv;
392 * If <code>true</code> then this Peer View instance is closed and is
395 private volatile boolean closed = false;
398 * Get an instance of PeerView for the specified PeerGroup and Service.
400 * @param group Peer Group in which this Peer View instance operates.
401 * @param advertisingGroup Peer Group in which this Peer View instance will
402 * advertise and broadcast its existence.
403 * @param rdvService The rdvService we are to use.
404 * @param name The identifying name for this Peer View instance.
406 public PeerView(PeerGroup group, PeerGroup advertisingGroup, RendezVousServiceImpl rdvService, String name) {
// Construction order, as far as the visible statements show:
//   1. cache collaborators (advertising group, rendezvous service, endpoint, unique group id),
//   2. create this instance's Timer,
//   3. load the RdvConfigAdv (or create a default one) and apply its settings,
//   4. build the seeding manager and the "self" peerview element,
//   5. register listeners, then create the strategies and wire pipe advertisements.
// NOTE(review): this listing omits some short source lines (closing braces,
// "try"/"else" keywords, simple assignments); the comments below describe only
// the statements that are visible.
408 this.advertisingGroup = advertisingGroup;
409 this.rdvService = rdvService;
412 this.endpoint = group.getEndpointService();
414 this.uniqueGroupId = group.getPeerGroupID().getUniqueValue().toString();
// One Timer per PeerView instance; the "true" argument makes its thread a daemon thread.
416 timer = new Timer("PeerView Timer for " + group.getPeerGroupID(), true);
418 Advertisement adv = null;
419 ConfigParams confAdv = group.getConfigAdvertisement();
421 // Get the config. If we do not have a config, we're done; we just keep
422 // the defaults (edge peer/no auto-rdv)
423 if (confAdv != null) {
// The service parameters are looked up under the rendezvous service's assigned ID.
425 XMLDocument configDoc = (XMLDocument) confAdv.getServiceParam(rdvService.getAssignedID());
427 if (null != configDoc) {
428 adv = AdvertisementFactory.newAdvertisement(configDoc);
430 } catch (java.util.NoSuchElementException failed) {// ignored
434 RdvConfigAdv rdvConfigAdv;
// Anything other than an RdvConfigAdv (including a null adv) is replaced by a
// freshly created RdvConfigAdv.
436 if (!(adv instanceof RdvConfigAdv)) {
437 if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
438 LOG.fine("Creating new RdvConfigAdv for defaults.");
441 rdvConfigAdv = (RdvConfigAdv) AdvertisementFactory.newAdvertisement(RdvConfigAdv.getAdvertisementType());
443 rdvConfigAdv = (RdvConfigAdv) adv;
// Only a positive configured delay overrides the field's initial value.
446 if (rdvConfigAdv.getSeedRendezvousConnectDelay() > 0) {
447 seedingRdvConnDelay = rdvConfigAdv.getSeedRendezvousConnectDelay();
450 useOnlySeeds = rdvConfigAdv.getUseOnlySeeds();
// Likewise, only a positive configured value overrides minHappyPeerView.
452 if (rdvConfigAdv.getMinHappyPeerView() > 0) {
453 minHappyPeerView = rdvConfigAdv.getMinHappyPeerView();
456 URISeedingManager seedingManager;
// The relay-referral variant is chosen only when there is no advertising group
// and the configuration asks for relay probing; the plain URISeedingManager
// assignment below is presumably the "else" branch (that line is not visible).
458 if ((null == advertisingGroup) && rdvConfigAdv.getProbeRelays()) {
459 seedingManager = new RelayReferralSeedingManager(rdvConfigAdv.getAclUri(), useOnlySeeds, group, name);
461 seedingManager = new URISeedingManager(rdvConfigAdv.getAclUri(), useOnlySeeds, group, name);
// Feed the configured seeding URIs and seed rendezvous URIs to the manager.
464 for (URI aSeeder : Arrays.asList(rdvConfigAdv.getSeedingURIs())) {
465 seedingManager.addSeedingURI(aSeeder);
468 for (URI aSeed : Arrays.asList(rdvConfigAdv.getSeedRendezvous())) {
469 seedingManager.addSeed(aSeed);
472 this.seedingManager = seedingManager;
// Remember the current peer advertisement and its modification count
// (presumably so later changes can be detected -- confirm against the users
// of lastPeerAdv/lastModCount).
474 lastPeerAdv = group.getPeerAdvertisement();
475 lastModCount = lastPeerAdv.getModCount();
477 // create a new local RdvAdvertisement and set it to self.
478 RdvAdvertisement radv = createRdvAdvertisement(lastPeerAdv, name);
480 self = new PeerViewElement(endpoint, radv);
482 // addPeerViewElement( self );
// NOTE(review): "this" is handed to the endpoint and to the rendezvous service
// before the strategy and pipe advertisement fields below are assigned. If
// either can call back on another thread before the constructor returns, the
// callback would observe unassigned fields -- confirm.
484 // setup endpoint listener
485 endpoint.addIncomingMessageListener(this, SERVICE_NAME, uniqueGroupId);
487 // add rendezvous listener
488 rdvService.addListener(this);
490 // initialize strategies
// All four strategies are backed by the same localView collection.
491 replyStrategy = new PeerViewRandomWithReplaceStrategy(localView);
493 kickRecipientStrategy = new PeerViewRandomStrategy(localView);
495 kickAdvertisementStrategy = new PeerViewRandomWithReplaceStrategy(localView);
497 refreshRecipientStrategy = new PeerViewSequentialStrategy(localView);
// The local-group wire pipe advertisement is always created; the advertising-group
// one is created only when an advertising group was supplied, otherwise it is null.
499 localGroupWirePipeAdv = makeWirePipeAdvertisement(group, group, name);
501 if (null != advertisingGroup) {
502 advGroupPropPipeAdv = makeWirePipeAdvertisement(advertisingGroup, group, name);
504 advGroupPropPipeAdv = null;
507 if (Logging.SHOW_INFO && LOG.isLoggable(Level.INFO)) {
508 LOG.info( "PeerView created for group \"" + group.getPeerGroupName() +
509 "\" [" + group.getPeerGroupID() + "] name \"" + name + "\"");
516 * Listener for "PeerView"/&lt;peergroup-unique-id&gt; and propagate pipes.
518 public void processIncomingMessage(Message msg, EndpointAddress srcAddr, EndpointAddress dstAddr) {
// Endpoint callback for PeerView messages. Visible flow:
//   1. find the advertisement element (request element first, then the response element),
//   2. parse it and require an RdvAdvertisement that carries a route,
//   3. if the message also carries the sender's route advertisement, stitch it in,
//   4. ignore messages that describe this peer,
//   5. read the failure / cached / edge flag elements and compute trust,
//   6. process a failure locally or hand over to handlePeerViewMessage.
// dstAddr is not referenced by any visible statement; srcAddr is only used for logging.
// NOTE(review): several "return"/brace/condition lines are missing from this
// listing, so the exact early-exit points are not visible here.
520 // check what kind of message this is (response or not).
521 boolean isResponse = false;
522 MessageElement me = msg.getMessageElement(MESSAGE_NAMESPACE, MESSAGE_ELEMENT_NAME);
525 me = msg.getMessageElement(MESSAGE_NAMESPACE, RESPONSE_ELEMENT_NAME);
527 if (Logging.SHOW_WARNING && LOG.isLoggable(Level.WARNING)) {
528 LOG.warning("Discarding damaged " + msg + ".");
// Rebuild the advertisement from the element; both unchecked and I/O failures
// are logged at WARNING with the same message.
539 XMLDocument asDoc = (XMLDocument) StructuredDocumentFactory.newStructuredDocument(me);
541 adv = AdvertisementFactory.newAdvertisement(asDoc);
542 } catch (RuntimeException failed) {
543 if (Logging.SHOW_WARNING && LOG.isLoggable(Level.WARNING)) {
544 LOG.log(Level.WARNING, "Failed building rdv advertisement from message element", failed);
547 } catch (IOException failed) {
548 if (Logging.SHOW_WARNING && LOG.isLoggable(Level.WARNING)) {
549 LOG.log(Level.WARNING, "Failed building rdv advertisement from message element", failed);
// NOTE(review): "instanceof" is also false for null, and the warning below
// dereferences adv. If newAdvertisement can ever return null this would throw
// a NullPointerException -- confirm.
554 if (!(adv instanceof RdvAdvertisement)) {
555 if (Logging.SHOW_WARNING && LOG.isLoggable(Level.WARNING)) {
556 LOG.warning("Response does not contain radv (" + adv.getAdvertisementType() + ")");
561 RdvAdvertisement radv = (RdvAdvertisement) adv;
// An RdvAdvertisement without a route advertisement is reported at WARNING.
563 if (null == radv.getRouteAdv()) {
564 if (Logging.SHOW_WARNING && LOG.isLoggable(Level.WARNING)) {
565 LOG.warning("Rdv Advertisement does not contain route.");
570 // See if we can find a src route adv in the message.
571 me = msg.getMessageElement(MESSAGE_NAMESPACE, SRCROUTEADV_ELEMENT_NAME);
574 XMLDocument asDoc = (XMLDocument) StructuredDocumentFactory.newStructuredDocument(me);
575 Advertisement routeAdv = AdvertisementFactory.newAdvertisement(asDoc);
577 if (!(routeAdv instanceof RouteAdvertisement)) {
578 if (Logging.SHOW_WARNING && LOG.isLoggable(Level.WARNING)) {
579 LOG.warning("Advertisement is not a RouteAdvertisement");
// Work on a clone so the route advertisement currently held by radv is not
// modified in place; the stitched clone then replaces it.
582 RouteAdvertisement rdvRouteAdv = radv.getRouteAdv().clone();
584 // XXX we stitch them together even if in the end it gets optimized away
585 RouteAdvertisement.stichRoute(rdvRouteAdv, (RouteAdvertisement) routeAdv);
586 radv.setRouteAdv(rdvRouteAdv);
// Failures while handling the source route are only logged.
588 } catch (RuntimeException failed) {
589 if (Logging.SHOW_WARNING && LOG.isLoggable(Level.WARNING)) {
590 LOG.log(Level.WARNING, "Failed building route adv from message element", failed);
592 } catch (IOException failed) {
593 if (Logging.SHOW_WARNING && LOG.isLoggable(Level.WARNING)) {
594 LOG.log(Level.WARNING, "Failed building route adv from message element", failed);
600 // Is this a message about ourself?
601 if (group.getPeerID().equals(radv.getPeerID())) {
602 if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
603 LOG.fine("Received a PeerView message about self. Discard.");
609 // Collect the various flags.
// Each flag is signalled purely by the presence of its message element.
611 boolean isFailure = (msg.getMessageElement(MESSAGE_NAMESPACE, FAILURE_ELEMENT_NAME) != null);
612 boolean isCached = (msg.getMessageElement(MESSAGE_NAMESPACE, CACHED_RADV_ELEMENT_NAME) != null);
613 boolean isFromEdge = (msg.getMessageElement(MESSAGE_NAMESPACE, EDGE_ELEMENT_NAME) != null);
// Edge peers are always treated as trusted; any other peer must be accepted by
// the seeding manager on the basis of its route advertisement.
614 boolean isTrusted = isFromEdge || seedingManager.isAcceptablePeer(radv.getRouteAdv());
// FINE-level logging only: for "jxta" source addresses, try to show the rdv
// name of a matching peerview element instead of the raw address.
616 if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
617 String srcPeer = srcAddr.toString();
619 if ("jxta".equals(srcAddr.getProtocolName())) {
621 String idstr = ID.URIEncodingName + ":" + ID.URNNamespace + ":" + srcAddr.getProtocolAddress();
623 ID asID = IDFactory.fromURI(new URI(idstr));
625 PeerViewElement pve = getPeerViewElement(asID);
628 srcPeer = "\"" + pve.getRdvAdvertisement().getName() + "\"";
630 } catch (URISyntaxException failed) {// ignored
635 "[" + group.getPeerGroupID() + "] Received a" + (isCached ? " cached" : "") + (isResponse ? " response" : "")
636 + (isFailure ? " failure" : "") + " message (" + msg.toString() + ")" + (isFromEdge ? " from edge" : "")
637 + " regarding \"" + radv.getName() + "\" from " + srcPeer);
// The condition guarding this rejection is not visible in this listing
// (presumably an untrusted, non-cached sender -- confirm).
641 if (Logging.SHOW_WARNING && LOG.isLoggable(Level.WARNING)) {
642 LOG.warning("Rejecting peerview message from " + radv.getPeerID());
647 // if this is a notification failure. All we have to do is locally
648 // process the failure
650 notifyFailure(radv.getPeerID(), false);
// Everything else is handled by the regular peerview message logic.
654 handlePeerViewMessage(isResponse, isCached, isFromEdge, isTrusted, radv);
658 * Following the extraction of a peerview message from a
660 private void handlePeerViewMessage(boolean isResponse, boolean isCached, boolean isFromEdge, boolean isTrusted, RdvAdvertisement radv) {
// Updates the local view from an already validated RdvAdvertisement and then
// decides which messages, if any, to send back.
//   isResponse - as computed by the caller (response vs. probe message)
//   isCached   - the advertisement is a referral, not sent by its owner
//   isFromEdge - the sender flagged itself as an edge peer
//   isTrusted  - the sender is an edge or was accepted by the seeding manager
//   radv       - the advertisement extracted from the message
// NOTE(review): declarations of "pve" and "status" and several branch lines are
// missing from this listing; comments cover only the visible statements.
662 // Figure out if we know that peer already. If we do, reuse the pve
664 boolean isNewbie = false;
665 boolean added = false;
// The lookup and any update of the view happen under the localView lock.
668 synchronized (localView) {
669 PeerViewElement newbie = new PeerViewElement(endpoint, radv);
671 pve = getPeerViewElement(newbie);
// A peer is added only when it is not an edge, speaks for itself (not cached)
// and is trusted.
678 if (!isFromEdge && !isCached && isTrusted) {
680 added = addPeerViewElement(pve);
682 pve.setRdvAdvertisement(radv);
687 if (!isNewbie && isFromEdge && !isCached) {
688 // The message stated that it is from an edge we believed was a
689 // peerview member. Best thing to do is tell everyone that it's no
690 // longer in peerview.
691 notifyFailure(pve, true);
692 // we continue processing because it's not the other peer's fault we had the wrong idea.
695 // Do the rest of the add related tasks out of synch.
696 // We must not nest any possibly synchronized ops in
697 // the LocalView lock; it's the lowest level.
700 // Notify local listeners
701 generateEvent(PeerViewEvent.ADD, pve);
705 * Now, see what if any message we have to send as a result.
706 * There are three kinds of messages we can send:
708 * - A response with ourselves, if we're being probed and we're
711 * - A probe to the peer whose adv we received, because we want
712 * confirmation that it's alive.
714 * - A response with a random adv from our cache if we're being probed
716 * We may send more than one message.
723 // Type 1: Respond to probe
725 // We are being probed by an edge peer or peerview member. We respond
726 // with our own advertisement.
// send(dest, pve, response, failure): here response=true, failure=false.
727 status = send(pve, self, true, false);
729 if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
730 LOG.fine("Type 1 (Respond with self PVE) : Sent to " + pve + " result =" + status);
733 // Type 3: Respond with random entry from our PV when we are probed.
735 // Respond with a strategized adv from our view.
736 PeerViewElement sendpve = replyStrategy.next();
// Skip the referral when the strategy yields nothing, the prober itself, or self.
738 if ((sendpve != null) && !pve.equals(sendpve) && !self.equals(sendpve)) {
739 status = send(pve, sendpve, true, false);
740 if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
741 LOG.fine("Type 3 (Respond with random PVE) : Sent " + sendpve + " to " + pve + " result=" + status);
745 // Heartbeat: do nothing.
747 } else if (isResponse) {
748 if (isNewbie && !useOnlySeeds && !isFromEdge) {
749 // Type 2: Probe a peer we have just learned about from a referral.
751 // If useOnlySeeds, we're not allowed to talk to peers other than our
752 // seeds, so do not probe anything we learn from 3rd party. (Probing of
753 // seeds happens as part of the "kick" strategy).
// A probe is sent with response=false.
754 status = send(pve, self, false, false);
756 if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
757 LOG.fine("Type 2 (Probe PVE) : Probed " + pve + " result=" + status);
760 // Already known or ignoring: do nothing.
763 // Invalid : do nothing.
770 @SuppressWarnings("fallsthrough")
// NOTE(review): javac's lint category is spelled "fallthrough"; "fallsthrough"
// is not a recognized key, so this annotation probably suppresses nothing --
// confirm and correct the spelling.
771 public void rendezvousEvent(RendezvousEvent event) {
// Rendezvous service callback. Under the instance lock it reacts to role
// changes (became rendezvous / became edge); the connect, reconnect, failed and
// disconnect events share one case group whose body is not visible in this
// listing. Any failure notification is sent after the lock has been released.
777 boolean notifyFailure = false;
779 synchronized (this) {
781 int theEventType = event.getType();
783 if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
784 LOG.fine("[" + group.getPeerGroupName() + "] Processing " + event);
// A role change invalidates whatever watchdog task is currently scheduled.
789 if ((RendezvousEvent.BECAMERDV == theEventType) || (RendezvousEvent.BECAMEEDGE == theEventType)) {
790 // kill any existing watchdog task
// Both steps are needed: removeTask only updates the bookkeeping set, cancel()
// stops the task itself.
791 if (null != watchdogTask) {
792 removeTask(watchdogTask);
793 watchdogTask.cancel();
798 switch (theEventType) {
799 case RendezvousEvent.RDVCONNECT:
800 case RendezvousEvent.RDVRECONNECT:
801 case RendezvousEvent.CLIENTCONNECT:
802 case RendezvousEvent.CLIENTRECONNECT:
803 case RendezvousEvent.RDVFAILED:
804 case RendezvousEvent.RDVDISCONNECT:
805 case RendezvousEvent.CLIENTFAILED:
806 case RendezvousEvent.CLIENTDISCONNECT:
809 case RendezvousEvent.BECAMERDV:
// As a rendezvous: start a fresh watchdog (first run after WATCHDOG_PERIOD,
// then every WATCHDOG_PERIOD) and reschedule the kick.
811 watchdogTask = new WatchdogTask();
812 addTask(watchdogTask, WATCHDOG_PERIOD, WATCHDOG_PERIOD);
813 rescheduleKick(true);
816 case RendezvousEvent.BECAMEEDGE:
// As an edge with a non-empty view: remember that a failure notice about
// ourselves has to go out once the lock is released.
818 if (!localView.isEmpty()) {
819 // FIXME bondolo 20040229 since we likely don't have a
820 // rendezvous connection, it is kind of silly to be sending
821 // this now. probably should wait until we get a rendezvous
823 notifyFailure = true;
825 rescheduleKick(true);
// Any other event type is reported at WARNING.
829 if (Logging.SHOW_WARNING && LOG.isLoggable(Level.WARNING)) {
830 LOG.warning("[" + group.getPeerGroupName() + "] Unexpected RDV event : " + event);
836 // we can't do the notification under synchronization.
// Presumably guarded by the notifyFailure flag set above (the guard line is
// not visible in this listing).
838 notifyFailure(self, true);
842 public void start() {// do nothing for now... all the good stuff happens as a result of
843 // rendezvous events.
// (rendezvousEvent is where the watchdog task and the kick get scheduled.)
848 synchronized (this) {
849 // Only one thread gets to perform the shutdown.
856 // notify other rendezvous peers that we are going down
857 notifyFailure(self, true);
859 // From now on we can nullify everything we want. Other threads check
860 // the closed flag where it matters.
861 synchronized (this) {
862 if (watchdogTask != null) {
863 removeTask(watchdogTask);
864 watchdogTask.cancel();
868 // Remove message listener.
869 endpoint.removeIncomingMessageListener(SERVICE_NAME, uniqueGroupId);
871 // Remove rendezvous listener.
872 rdvService.removeListener(this);
874 // Remove all our pending scheduled tasks
875 // Careful with the indices while removing: do it backwards, it's
876 // cheaper and simpler.
878 synchronized (scheduledTasks) {
879 Iterator<TimerTask> eachTask = scheduledTasks.iterator();
881 while (eachTask.hasNext()) {
883 TimerTask task = eachTask.next();
887 } catch (Exception ez1) {
888 if (Logging.SHOW_WARNING && LOG.isLoggable(Level.WARNING)) {
889 LOG.log(Level.WARNING, "Cannot cancel task: ", ez1);
895 // Make sure that we close our WirePipes
898 // Let go of the up and down peers.
905 rpvListeners.clear();
909 protected void addTask(TimerTask task, long delay, long interval) {
// Records the task in scheduledTasks and schedules it on this PeerView's timer.
//   task     - the task to track and schedule
//   delay    - delay before the first execution, passed to Timer.schedule
//   interval - period for repeated execution; callers in this file pass -1 for
//              one-shot tasks (the line that selects between the two schedule
//              calls below is not visible in this listing)
911 synchronized (scheduledTasks) {
// A task that is already tracked is reported at WARNING.
912 if (scheduledTasks.contains(task)) {
913 if (Logging.SHOW_WARNING && LOG.isLoggable(Level.WARNING)) {
914 LOG.warning("Task list already contains specified task.");
917 scheduledTasks.add(task);
// NOTE(review): Timer.schedule throws IllegalStateException when the task was
// already scheduled or cancelled, or when the timer was cancelled -- confirm
// callers never reuse a TimerTask instance.
// Repeating form.
921 timer.schedule(task, delay, interval);
// One-shot form.
923 timer.schedule(task, delay);
927 protected void removeTask(TimerTask task) {
// Only drops the task from the scheduledTasks bookkeeping set; it does not
// cancel the task. Callers in this file call task.cancel() themselves right
// after removeTask.
928 scheduledTasks.remove(task);
932 * Adds the specified URI to the list of seeds. Even if useOnlySeeds is in
933 * effect, this seed may now be used, as if it was part of the initial
936 * @param seed the URI of the seed rendezvous.
938 public void addSeed(URI seed) {
// Forwards the seed to the seeding manager when it is a URISeedingManager;
// otherwise the call has no visible effect. The constructor only ever assigns a
// URISeedingManager (or its relay-referral variant), so the check is defensive.
939 if (seedingManager instanceof URISeedingManager) {
940 ((URISeedingManager) seedingManager).addSeed(seed);
945 * Probe the specified peer immediately.
947 * Note: If "useOnlySeeds" is in effect and the peer is not a seed, any response to this probe will be ignored.
949 public boolean probeAddress(EndpointAddress address, RouteAdvertisement hint) {
// Sends a PeerView message (response=false, failure=false) straight to the
// given endpoint address.
//   address - destination to probe
//   hint    - route advertisement passed along to the address-based send
//   returns - whatever the address-based send(...) reports
951 PeerViewElement holdIt;
// holdIt is assigned while holding the localView lock; the assignment line is
// not visible in this listing (presumably it captures "self" -- confirm).
953 synchronized (localView) {
957 return send(address, hint, holdIt, false, false);
961 * Send our own advertisement to all of the seed rendezvous.
964 long reseedRemaining = earliestReseed - TimeUtils.timeNow();
966 if (reseedRemaining > 0) {
967 // Too early; the previous round is not even done.
968 if (Logging.SHOW_INFO && LOG.isLoggable(Level.INFO)) {
969 LOG.info("Still Seeding for " + reseedRemaining + "ms.");
974 if (Logging.SHOW_INFO && LOG.isLoggable(Level.INFO)) {
975 LOG.info("New Seeding...");
978 // Schedule sending propagated query to our local network neighbors.
979 send(null, null, self, false, false);
983 if (localView.size() < minHappyPeerView) {
984 // We only do these things if we don't have a "happy" Peer View.
985 // If the Peer View is already "happy" then we will use only
986 // Peer View referrals for learning of new entries.
988 List<RouteAdvertisement> seedRdvs = new ArrayList<RouteAdvertisement>(
989 Arrays.asList(seedingManager.getActiveSeedRoutes()));
991 while (!seedRdvs.isEmpty()) {
992 RouteAdvertisement aSeed = seedRdvs.remove(0);
994 if (null == aSeed.getDestPeerID()) {
995 // It is an incomplete route advertisement. We are going to assume that it is only a wrapper for a single ea.
996 Vector<String> seed_eas = aSeed.getDest().getVectorEndpointAddresses();
998 if (!seed_eas.isEmpty()) {
999 EndpointAddress aSeedHost = new EndpointAddress(seed_eas.get(0));
1001 // XXX 20061220 bondolo We could check all of our current PVEs to make sure that this address is not already known.
1003 send(aSeedHost, null, self, false, false);
1006 // We have a full route, send it to the virtual address of the route!
1007 // FIXME malveaux 20070816 Second part of conjunct can be removed once 'self' is included in the peerview
1008 if ((null == getPeerViewElement(aSeed.getDestPeerID())) && !group.getPeerID().equals(aSeed.getDestPeerID())) {
1009 EndpointAddress aSeedHost = new EndpointAddress("jxta", aSeed.getDestPeerID().getUniqueValue().toString(),
1012 send(aSeedHost, aSeed, self, false, false);
1017 if (!useOnlySeeds) {
1018 // If use only seeds, we're not allowed to put in the peerview
1019 // anything but our seed rdvs. So, we've done everything that
1022 // Schedule sending propagated query to our advertising group
1023 if (advertisingGroup != null) {
1024 // send it, but not immediately.
1025 scheduleAdvertisingGroupQuery(DEFAULT_SEEDING_PERIOD * 2);
1030 earliestReseed = TimeUtils.toAbsoluteTimeMillis(seedingRdvConnDelay + (DEFAULT_SEEDING_PERIOD * iterations));
1034 * Make sure that the PeerView properly changes behavior, when switching
1035 * from edge mode to rdv mode, and vice-versa.
1036 * Since openWirePipes() requires some other services such as the Pipe
1037 * Service, and since updateStatus is invoked this work must happen in
1038 * background, giving a chance to other services to be started.
1040 private class OpenPipesTask extends TimerTask {
// Timer task created by scheduleOpenPipes. Most of its body (including run())
// is missing from this listing; only the catch-all handler below is visible.
// Catching Throwable here presumably protects the shared timer thread, since
// java.util.Timer stops running tasks once one of them throws -- confirm.
1053 } catch (Throwable all) {
1054 if (Logging.SHOW_SEVERE && LOG.isLoggable(Level.SEVERE)) {
1055 LOG.log(Level.SEVERE, "Uncaught Throwable in thread: " + Thread.currentThread().getName(), all);
1063 private void scheduleOpenPipes(long delay) {
1065 if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
1066 LOG.fine("Scheduling open pipes attempt in " + delay + "ms.");
1069 addTask(new OpenPipesTask(), delay, -1);
1073 * Send a PeerView Message to the specified peer.
1075 * @param response indicates whether this is a response. Otherwise
1076 * we may create a distributed loop where peers keep perpetually
1077 * responding to each-other.
1078 * @param failure Construct the message as a failure notification.
1080 private boolean send(PeerViewElement dest, PeerViewElement pve, boolean response, boolean failure) {
1082 Message msg = makeMessage(pve, response, failure);
1084 boolean result = dest.sendMessage(msg, SERVICE_NAME, uniqueGroupId);
1086 if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
1087 LOG.fine("Sending " + msg + " to " + dest + " success = " + result);
1094 * Send a PeerView Message to the specified peer.
1096 * @param response indicates whether this is a response. Otherwise
1097 * we may create a distributed loop where peers keep perpetually
1098 * responding to each-other.
1099 * @param failure Construct the message as a failure notification.
1101 private boolean send(EndpointAddress dest, RouteAdvertisement hint, PeerViewElement pve, boolean response, boolean failure) {
1103 Message msg = makeMessage(pve, response, failure);
1106 EndpointAddress realAddr = new EndpointAddress(dest, SERVICE_NAME, uniqueGroupId);
1108 Messenger messenger = rdvService.endpoint.getMessengerImmediate(realAddr, hint);
1110 if (null != messenger) {
1112 boolean result = messenger.sendMessage(msg);
1114 if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
1115 LOG.fine("Sending " + msg + " to " + dest + " success = " + result);
1119 } catch (IOException failed) {
1120 if (Logging.SHOW_WARNING && LOG.isLoggable(Level.WARNING)) {
1121 LOG.log(Level.WARNING, "Could not send " + msg + " to " + dest, failed);
1126 if (Logging.SHOW_WARNING && LOG.isLoggable(Level.WARNING)) {
1127 LOG.warning("Could not get messenger for " + dest);
1133 // Else, propagate the message.
1135 endpoint.propagate(msg, SERVICE_NAME, uniqueGroupId);
1137 if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
1138 LOG.fine("Sent " + msg + " via propagate");
1141 } catch (IOException ez) {
1142 if (Logging.SHOW_WARNING && LOG.isLoggable(Level.WARNING)) {
1143 // Pretty strange. This has little basis for failure...
1144 LOG.log(Level.WARNING, "Could not propagate " + msg, ez);
1152 * Send a PeerView Message to the specified peer.
1154 * @param response indicates whether this is a response. Otherwise
1155 * we may create a distributed loop where peers keep perpetually
1156 * responding to each-other.
1157 * @param failure Construct the message as a failure notification.
1158 * @param dest destination output pipe
1159 * @param pve the peer view element
1160 * @return true if successful
1162 private boolean send(OutputPipe dest, PeerViewElement pve, boolean response, boolean failure) {
1164 Message msg = makeMessage(pve, response, failure);
1167 return dest.send(msg);
1168 } catch (IOException ez) {
1169 if (Logging.SHOW_WARNING && LOG.isLoggable(Level.WARNING)) {
1170 LOG.log(Level.WARNING, "Could not send " + msg, ez);
1177 * Make a PeerView Message
1179 * @param content the peer view element
1180 * @param response the response
1181 * @param failure whether to create a message based on a failure
1182 * @return the message
1184 private Message makeMessage(PeerViewElement content, boolean response, boolean failure) {
1186 Message msg = new Message();
1188 // // edge peers add an identifying element, RDV peers do not
1189 // if (!rdvService.isRendezVous()) {
1190 // msg.addMessageElement(MESSAGE_NAMESPACE, EDGE_ELEMENT);
1194 // This is a failure notification.
1195 msg.addMessageElement(MESSAGE_NAMESPACE, FAILURE_ELEMENT);
1200 RdvAdvertisement radv = content.getRdvAdvertisement();
1202 XMLDocument doc = (XMLDocument) radv.getDocument(MimeMediaType.XMLUTF8);
1203 String msgName = response ? RESPONSE_ELEMENT_NAME : MESSAGE_ELEMENT_NAME;
1205 MessageElement msge = new TextDocumentMessageElement(msgName, doc, null);
1207 msg.addMessageElement(MESSAGE_NAMESPACE, msge);
1209 if (!content.equals(self)) {
1210 // This is a cached RdvAdvertisement
1211 msg.addMessageElement(MESSAGE_NAMESPACE, CACHED_RADV_ELEMENT);
1213 // This message contains an RdvAdvertisement which is not ourself. In that
1214 // case, it is wise to also send the local route advertisement (as the optional
1215 // SrcRdvAdv) so the destination might have a better chance to access the "content"
1216 // RendezvousAdv (this peer will then act as a hop).
1218 RouteAdvertisement localra = EndpointUtils.extractRouteAdv(lastPeerAdv);
1220 if (localra != null) {
1222 XMLDocument radoc = (XMLDocument) localra.getDocument(MimeMediaType.XMLUTF8);
1224 msge = new TextDocumentMessageElement(SRCROUTEADV_ELEMENT_NAME, radoc, null);
1225 msg.addMessageElement(MESSAGE_NAMESPACE, msge);
1226 } catch (Exception ez1) {
1227 if (Logging.SHOW_WARNING && LOG.isLoggable(Level.WARNING)) {
1228 LOG.log(Level.WARNING, "Could not create optional src route adv for " + content, ez1);
1238 * Invoked by anyone in order to inform the PeerView of a failure
1239 * of one of the member peers.
1241 * @param pid ID of the peer which failed.
1242 * @param propagateFailure If <tt>true</tt>then broadcast the failure to
1243 * other peers otherwise only update the local peerview.
1245 public void notifyFailure(PeerID pid, boolean propagateFailure) {
1247 PeerViewElement pve = getPeerViewElement(pid);
1250 notifyFailure(pve, propagateFailure);
1255 * Invoked when a peerview member peer becomes unreachable.
1257 * @param pve The peer which failed.
1258 * @param propagateFailure If {@code true} then broadcast the failure to
1259 * other peers otherwise only update the local peerview.
1261 void notifyFailure(PeerViewElement pve, boolean propagateFailure) {
1263 if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
1264 LOG.fine("Notifying failure of " + pve);
1268 boolean removedFromPeerView = removePeerViewElement(pve);
1270 // only propagate if we actually knew of the peer
1271 propagateFailure &= (removedFromPeerView || (self == pve));
1273 // Notify local listeners
1274 if (removedFromPeerView) {
1275 generateEvent(PeerViewEvent.FAIL, pve);
1278 boolean emptyPeerView = localView.isEmpty();
1280 // If the local view has become empty, reset the kicker into
1282 if (emptyPeerView && removedFromPeerView) {
1283 rescheduleKick(true);
1286 if (propagateFailure) {
1287 // Notify other rendezvous peers that there has been a failure.
1288 OutputPipe op = localGroupWirePipeOutputPipe;
1291 if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
1292 LOG.fine("Propagating failure of " + pve);
1295 send(op, pve, true, true);
1298 } catch (Exception ez) {
1299 if (Logging.SHOW_WARNING && LOG.isLoggable(Level.WARNING)) {
1300 LOG.log(Level.WARNING, "Failure while generating noficiation of failure of PeerView : " + pve, ez);
1306 * Invoked by the Timer thread to cause each PeerView to initiate
1307 * a Peer Advertisement exchange.
1309 private void kick() {
1312 if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
1313 LOG.fine("Begun kick() in " + group.getPeerGroupID());
1316 // Use seed strategy. (it has its own throttling and resource limiting).
1319 // refresh ourself to a peer in our view
1320 PeerViewElement refreshee = refreshRecipientStrategy.next();
1322 if ((refreshee != null) && (self != refreshee)) {
1323 if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
1324 LOG.fine("Refresh " + refreshee);
1326 send(refreshee, self, false, false);
1329 // now share an adv from our local view to another peer from our
1332 PeerViewElement recipient = kickRecipientStrategy.next();
1334 if (recipient == null) {
1335 if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
1336 LOG.fine("No recipient to send adv ");
1341 PeerViewElement rpve = kickAdvertisementStrategy.next();
1344 if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
1345 LOG.fine("No adv to send");
1350 if (rpve.equals(recipient) || self.equals(recipient)) {
1351 // give up: no point in sending a peer its own adv
1352 if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
1353 LOG.fine("adv to send is same as recipient: Nothing to do.");
1358 if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
1359 LOG.fine("Sending adv " + rpve + " to " + recipient);
1362 send(recipient, rpve, true, false);
1364 rescheduleKick(false);
1369 * Choose a boot level appropriate for the current configuration and state.
1371 * @return the new boot level.
1373 private int adjustBootLevel() {
1375 boolean areWeHappy = localView.size() >= minHappyPeerView;
1377 // increment boot level faster if we have a reasonable peerview.
1378 int increment = areWeHappy ? BOOTLEVEL_INCREMENT : BOOTLEVEL_INCREMENT * 2;
1380 // if we don't have a reasonable peerview, we continue to try harder.
1381 int maxbootlevel = MAX_BOOTLEVEL - (areWeHappy ? 0 : BOOTLEVEL_INCREMENT);
1383 bootLevel = Math.min(maxbootlevel, bootLevel + increment);
1388 private synchronized void rescheduleKick(boolean now) {
1394 // Set the next iteration
1397 bootLevel = MIN_BOOTLEVEL;
1402 long tilNextKick = DEFAULT_BOOTSTRAP_KICK_INTERVAL * ((1L << bootLevel) - 1);
1404 if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
1406 "Scheduling kick in " + (tilNextKick / TimeUtils.ASECOND) + " seconds at bootLevel " + bootLevel
1407 + " in group " + group.getPeerGroupID());
1410 KickerTask task = new KickerTask();
1412 addTask(task, tilNextKick, -1);
1413 } catch (Exception ez1) {
1414 if (Logging.SHOW_SEVERE && LOG.isLoggable(Level.SEVERE)) {
1415 LOG.log(Level.SEVERE, "Cannot set timer. RPV will not work.", ez1);
1421 * Refresh the local copy of the peer advertisement and the rendezvous
1424 private void refreshSelf() {
1426 RdvAdvertisement radv;
1428 synchronized (this) {
1429 PeerAdvertisement newPadv = group.getPeerAdvertisement();
1430 int newModCount = newPadv.getModCount();
1432 if ((lastPeerAdv != newPadv) || (lastModCount != newModCount)) {
1433 lastPeerAdv = newPadv;
1434 lastModCount = newModCount;
1436 // create a new local RdvAdvertisement and set it to self.
1437 radv = createRdvAdvertisement(lastPeerAdv, name);
1440 self.setRdvAdvertisement(radv);
1446 static RdvAdvertisement createRdvAdvertisement(PeerAdvertisement padv, String serviceName) {
1449 // FIX ME: 10/19/2002 lomax@jxta.org. We need to properly set up the service ID. Unfortunately
1450 // this current implementation of the PeerView takes a String as a service name and not its ID.
1451 // Since currently, there is only PeerView per group (all peerviews share the same "service", this
1452 // is not a problem, but that will have to be fixed eventually.
1454 // create a new RdvAdvertisement
1455 RdvAdvertisement rdv = (RdvAdvertisement) AdvertisementFactory.newAdvertisement(
1456 RdvAdvertisement.getAdvertisementType());
1458 rdv.setPeerID(padv.getPeerID());
1459 rdv.setGroupID(padv.getPeerGroupID());
1460 rdv.setServiceName(serviceName);
1461 rdv.setName(padv.getName());
1463 RouteAdvertisement ra = EndpointUtils.extractRouteAdv(padv);
1465 // Insert it into the RdvAdvertisement.
1466 rdv.setRouteAdv(ra);
1469 } catch (Exception ez) {
1470 if (Logging.SHOW_WARNING && LOG.isLoggable(Level.WARNING)) {
1471 LOG.log(Level.WARNING, "Cannot create Local RdvAdvertisement: ", ez);
1478 * Add a listener for PeerViewEvent
1480 * @param listener An PeerViewListener to process the event.
1481 * @return true if successful
1483 public boolean addListener(PeerViewListener listener) {
1484 boolean added = rpvListeners.add(listener);
1486 if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
1487 LOG.fine("Registered PeerViewEvent Listener (" + listener.getClass().getName() + ")");
1494 * Removes a PeerViewEvent Listener previously added with addListener.
1496 * @param listener the PeerViewListener listener remove
1497 * @return whether successful or not
1499 public boolean removeListener(PeerViewListener listener) {
1500 boolean removed = rpvListeners.remove(listener);
1502 if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
1503 LOG.fine("Removed PeerViewEvent Listener (" + listener.getClass().getName() + ")");
1510 * Generate a PeerView Event and notify all listeners.
1512 * @param type the Event Type.
1513 * @param element The peer having the event.
1515 private void generateEvent(int type, PeerViewElement element) {
1517 PeerViewEvent newevent = new PeerViewEvent(this, type, element);
1519 if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
1520 LOG.fine("Calling listeners for " + newevent + " in group " + group.getPeerGroupID());
1523 for (Object o : Arrays.asList(rpvListeners.toArray())) {
1524 PeerViewListener pvl = (PeerViewListener) o;
1527 pvl.peerViewEvent(newevent);
1528 } catch (Throwable ignored) {
1529 if (Logging.SHOW_SEVERE && LOG.isLoggable(Level.SEVERE)) {
1530 LOG.log(Level.SEVERE, "Uncaught Throwable in PeerViewEvent listener : (" + pvl.getClass().getName() + ")"
1538 static PipeAdvertisement makeWirePipeAdvertisement(PeerGroup destGroup, PeerGroup group, String name) {
1540 PipeAdvertisement adv = (PipeAdvertisement) AdvertisementFactory.newAdvertisement(PipeAdvertisement.getAdvertisementType());
1542 // Create a pipe advertisement for this group.
1543 // Generate a well known but unique ID.
1544 // FIXME bondolo 20040507 The ID created is really poor, it has only
1545 // 2 unique bytes on average. it would be much better to hash something
1546 // also, since the the definition of how to use the seed bytes is not
1547 // fixed, it's not reliable.
1548 PipeID pipeId = IDFactory.newPipeID(destGroup.getPeerGroupID()
1550 (SERVICE_NAME + group.getPeerGroupID().getUniqueValue().toString() + name).getBytes());
1552 adv.setPipeID(pipeId);
1553 adv.setType(PipeService.PropagateType);
1554 adv.setName(SERVICE_NAME + " pipe for " + group.getPeerGroupID());
1559 private synchronized void openWirePipes() {
1561 PipeService pipes = group.getPipeService();
1563 if (null == pipes) {
1564 scheduleOpenPipes(TimeUtils.ASECOND); // Try again in one second.
1569 // First, listen to in our own PeerGroup
1570 if (null == localGroupWirePipeInputPipe) {
1571 localGroupWirePipeInputPipe = pipes.createInputPipe(localGroupWirePipeAdv, new WirePipeListener());
1574 if (null == localGroupWirePipeOutputPipe) {
1575 // Creates the OutputPipe - note that timeout is irrelevant for
1578 localGroupWirePipeOutputPipe = pipes.createOutputPipe(localGroupWirePipeAdv, 1 * TimeUtils.ASECOND);
1581 if (localGroupWirePipeOutputPipe == null) {
1582 if (Logging.SHOW_WARNING && LOG.isLoggable(Level.WARNING)) {
1583 LOG.warning("Cannot get OutputPipe for current group");
1586 } catch (Exception failed) {
1587 if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
1588 LOG.fine("PipeService not ready yet. Trying again in 1 second.");
1590 // Try again in one second.
1591 scheduleOpenPipes(TimeUtils.ASECOND);
1595 if (advertisingGroup != null) {
1597 pipes = advertisingGroup.getPipeService();
1599 if (null == pipes) {
1600 // Try again in one second.
1601 scheduleOpenPipes(TimeUtils.ASECOND);
1605 if (null == wirePipeInputPipe) {
1606 wirePipeInputPipe = pipes.createInputPipe(advGroupPropPipeAdv, new WirePipeListener());
1609 if (null == wirePipeOutputPipe) {
1610 wirePipeOutputPipe = pipes.createOutputPipe(advGroupPropPipeAdv, 1 * TimeUtils.ASECOND);
1613 if (wirePipeOutputPipe == null) {
1614 if (Logging.SHOW_WARNING && LOG.isLoggable(Level.WARNING)) {
1615 LOG.warning("Cannot get OutputPipe for current group");
1618 } catch (Exception failed) {
1619 if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
1620 LOG.fine("Could not open pipes in local group. Trying again in 1 second.");
1622 // Try again in one second.
1623 scheduleOpenPipes(TimeUtils.ASECOND);
1628 if (Logging.SHOW_INFO && LOG.isLoggable(Level.INFO)) {
1629 LOG.info("Propagate Pipes opened.");
1633 private synchronized void closeWirePipes() {
1635 if (localGroupWirePipeInputPipe != null) {
1636 localGroupWirePipeInputPipe.close();
1637 localGroupWirePipeInputPipe = null;
1640 if (localGroupWirePipeOutputPipe != null) {
1641 localGroupWirePipeOutputPipe.close();
1642 localGroupWirePipeOutputPipe = null;
1645 if (wirePipeInputPipe != null) {
1646 wirePipeInputPipe.close();
1647 wirePipeInputPipe = null;
1650 if (wirePipeOutputPipe != null) {
1651 wirePipeOutputPipe.close();
1652 wirePipeOutputPipe = null;
1655 if (Logging.SHOW_INFO && LOG.isLoggable(Level.INFO)) {
1656 LOG.info("Propagate Pipes closed.");
1661 * Adapter class for receiving wire pipe messages
1663 private class WirePipeListener implements PipeMsgListener {
1668 public void pipeMsgEvent(PipeMsgEvent event) {
1670 Message msg = event.getMessage();
1672 boolean failure = (null != msg.getMessageElement(MESSAGE_NAMESPACE, FAILURE_ELEMENT_NAME));
1673 boolean response = (null != msg.getMessageElement(MESSAGE_NAMESPACE, RESPONSE_ELEMENT_NAME));
1675 if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
1677 "Received a PeerView " + (failure ? "failure " : "") + (response ? "response " : "") + "message [" + msg
1678 + "] on propagated pipe " + event.getPipeID());
1681 if (!failure && !response) {
1683 // If this is not a failure message then decide if we will respond.
1685 // We play a game that is tuned by the view size so that the expectation of number of responses is equal to
1686 // minHappyPeerView. The game is to draw a number between 0 and the pv size. If the result is < minHappyPeerView,
1687 // then we win (respond) else we lose (stay silent). The probability of winning is HAPPY_SIZE/viewsize. If each of
1688 // the viewsize peers plays the same game, on average HAPPY_SIZE of them win (with a significant variance, but
1689 // that is good enough). If viewsize is <= HAPPY_SIZE, then all respond. This is approximate, of course, since
1690 // the view size is not always consistent among peers.
1692 int viewsize = PeerView.this.localView.size();
1694 if (viewsize > minHappyPeerView) {
1695 int randinview = random.nextInt(viewsize);
1697 if (randinview >= minHappyPeerView) {
1698 if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
1699 LOG.fine("Ignoring " + msg + " from pipe " + event.getPipeID());
1704 } // Else, we always win; don't bother playing.
1707 // Fabricate dummy src and dst addrs so that we can call processIncoming. These are
1708 // only used for traces. The merit of using the pipeID is that it is recognizable
1710 EndpointAddress src = new EndpointAddress(event.getPipeID(), SERVICE_NAME, null);
1711 EndpointAddress dest = new EndpointAddress(event.getPipeID(), SERVICE_NAME, null);
1714 // call the peerview.
1715 PeerView.this.processIncomingMessage(msg, src, dest);
1716 } catch (Throwable ez) {
1717 if (Logging.SHOW_WARNING && LOG.isLoggable(Level.WARNING)) {
1718 LOG.log(Level.WARNING, "Failed processing " + msg + " from pipe " + event.getPipeID(), ez);
1724 private synchronized void scheduleAdvertisingGroupQuery(long delay) {
1730 TimerTask task = new AdvertisingGroupQueryTask();
1732 addTask(task, delay, -1);
1736 * Class implementing the query request on the AdvertisingGroup
1738 private final class AdvertisingGroupQueryTask extends TimerTask {
1744 public boolean cancel() {
1745 boolean res = super.cancel();
1759 OutputPipe op = wirePipeOutputPipe;
1762 Message msg = makeMessage(self, false, false);
1766 } catch (Throwable all) {
1767 if (Logging.SHOW_SEVERE && LOG.isLoggable(Level.SEVERE)) {
1768 LOG.log(Level.SEVERE, "Uncaught Throwable in thread :" + Thread.currentThread().getName(), all);
1777 * Get a copy of the PeerView for this group.
1779 * @return A SortedSet which is the current local view of the peerview
1781 public SortedSet<PeerViewElement> getView() {
1782 synchronized (localView) {
1783 return new TreeSet<PeerViewElement>((SortedSet)localView);
1788 * Add the provided element to the local peerview.
1790 * @param pve the <code>PeerViewElement</code> to add.
1791 * @return <code>true</code> if the element was not present and added
1792 * otherwise <code>false</code>.
1794 private boolean addPeerViewElement(PeerViewElement pve) {
1797 if (null == pve.getRdvAdvertisement()) {
1798 throw new IllegalStateException("Cannot add a seed pve to local view");
1801 synchronized (localView) {
1802 added = localView.add(pve);
1805 // Refresh, if necessary, our up and down peers.
1806 updateUpAndDownPeers();
1811 pve.setPeerView(this);
1818 * Remove the provided element from the local peerview.
1820 * @param pve the <code>PeerViewElement</code> to remove.
1821 * @return <code>true</code> if the element was present and removed
1822 * otherwise <code>false</code>.
1824 private boolean removePeerViewElement(PeerViewElement pve) {
1827 synchronized (localView) {
1828 removed = localView.remove(pve);
1831 // Refresh, if necessary, our up and down peers.
1832 updateUpAndDownPeers();
1837 pve.setPeerView(null);
1844 * Return from the local view, the PeerViewElement that is equal to the
1845 * given PeerViewDestination, if one exists or <code>null</code> if it is
1846 * not present. Identity is defined by {@link PeerViewDestination#equals}
1847 * which only looks at the destination address. Thus a PeerViewDestination
1848 * is enough. A full PeerViewElement may be passed as well. This method
1849 * does not require external synchronization.
1851 * @param wanted PeerViewDestination matching the desired one.
1852 * @return the matching PeerViewElement or <code>null</code> if it could not
1855 public PeerViewElement getPeerViewElement(PeerViewDestination wanted) {
1858 PeerViewElement found = (PeerViewElement) localView.tailSet(wanted).first();
1860 if (wanted.equals(found)) {
1863 } catch (NoSuchElementException nse) {// This can happen if the tailset is empty. We could test for it,
1864 // but it could still become empty after the test, since it reflects
1865 // concurrent changes to localView. Not worth synchronizing for that
1866 // rare occurence. The end-result is still correct.
1873 * Get from the local view, the PeerViewElement for the given PeerID, if one
1874 * exists. Null otherwise. This method does not require external
1877 * @param pid the PeerID of the desired element.
1878 * @return the matching PeerViewElement null if it could not be found.
1880 public PeerViewElement getPeerViewElement(ID pid) {
1882 return getPeerViewElement(new PeerViewDestination(pid));
1886 * Get the down peer from the local peer.
1888 * @return the down PeerViewElement or null if there is no such peer.
1890 public PeerViewElement getDownPeer() {
1895 * Get the local peer.
1897 * @return the local PeerViewElement
1899 public PeerViewElement getSelf() {
1904 * Get the up peer from the local peer.
1906 * @return the up PeerViewElement or null if there is no such peer.
1908 public PeerViewElement getUpPeer() {
1913 * update Up and Down Peers
1915 private void updateUpAndDownPeers() {
1917 synchronized (localView) {
1918 final PeerViewElement oldDown = downPeer;
1919 final PeerViewElement oldUp = upPeer;
1921 SortedSet<PeerViewDestination> headSet = localView.headSet(self);
1923 if (!headSet.isEmpty()) {
1924 downPeer = (PeerViewElement) headSet.last();
1929 SortedSet<PeerViewDestination> tailSet = localView.tailSet(self);
1931 if (!tailSet.isEmpty()) {
1932 if (self.equals(tailSet.first())) {
1933 Iterator eachTail = tailSet.iterator();
1935 eachTail.next(); // self
1937 if (eachTail.hasNext()) {
1938 upPeer = (PeerViewElement) eachTail.next();
1943 upPeer = (PeerViewElement) tailSet.first();
1949 if ((oldDown != downPeer) && (downPeer != null)) {
1950 downPeer.setLastUpdateTime(TimeUtils.timeNow());
1953 if ((oldUp != upPeer) && (upPeer != null)) {
1954 upPeer.setLastUpdateTime(TimeUtils.timeNow());
1960 * A task that checks on upPeer and downPeer.
1962 private final class WatchdogTask extends TimerTask {
1965 * The number of iterations that the watchdog task has executed.
1981 if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
1982 LOG.fine("Watchdog task executing for group " + PeerView.this.group.getPeerGroupID());
1987 if(0 == (iterations % 5)) {
1988 DiscoveryService discovery = group.getDiscoveryService();
1989 if(null != discovery) {
1990 discovery.publish(self.getRdvAdvertisement(), WATCHDOG_PERIOD * 10, WATCHDOG_PERIOD * 5);
1994 PeerViewElement up = PeerView.this.getUpPeer();
1997 if (TimeUtils.toRelativeTimeMillis(TimeUtils.timeNow(), up.getLastUpdateTime()) > WATCHDOG_GRACE_DELAY) {
1998 if (Logging.SHOW_WARNING && LOG.isLoggable(Level.WARNING)) {
1999 LOG.warning("UP peer has gone MIA : " + up);
2002 notifyFailure(up, true);
2005 if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
2006 LOG.fine("Checking on UP peer : " + up);
2009 PeerView.this.send(up, PeerView.this.getSelf(), false, false);
2013 PeerViewElement down = PeerView.this.getDownPeer();
2016 if (TimeUtils.toRelativeTimeMillis(TimeUtils.timeNow(), down.getLastUpdateTime()) > WATCHDOG_GRACE_DELAY) {
2017 if (Logging.SHOW_WARNING && LOG.isLoggable(Level.WARNING)) {
2018 LOG.warning("DOWN peer has gone MIA : " + down);
2021 notifyFailure(down, true);
2024 if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
2025 LOG.fine("Checking on DOWN peer : " + down);
2028 PeerView.this.send(down, PeerView.this.getSelf(), false, false);
2031 } catch (Throwable all) {
2032 if (Logging.SHOW_SEVERE && LOG.isLoggable(Level.SEVERE)) {
2033 LOG.log(Level.SEVERE, "Uncaught Throwable in thread :" + Thread.currentThread().getName(), all);
2043 * Class implementing the kicker
2045 private final class KickerTask extends TimerTask {
2057 PeerView.this.kick();
2058 } catch (Throwable all) {
2059 if (Logging.SHOW_SEVERE && LOG.isLoggable(Level.SEVERE)) {
2060 LOG.log(Level.SEVERE, "Uncaught Throwable in thread : " + Thread.currentThread().getName(), all);