]> sjero.net Git - linphone/blob - p2pproxy/dependencies-src/jxse-src-2.5/impl/src/net/jxta/impl/rendezvous/rpv/PeerView.java
44b006d83172ae8be5c9b5b429c163dad91c4f8d
[linphone] / p2pproxy / dependencies-src / jxse-src-2.5 / impl / src / net / jxta / impl / rendezvous / rpv / PeerView.java
1 /*
2  * Copyright (c) 2002-2007 Sun Micro//Systems, Inc.  All rights reserved.
3  *  
4  *  The Sun Project JXTA(TM) Software License
5  *  
6  *  Redistribution and use in source and binary forms, with or without 
7  *  modification, are permitted provided that the following conditions are met:
8  *  
9  *  1. Redistributions of source code must retain the above copyright notice,
10  *     this list of conditions and the following disclaimer.
11  *  
12  *  2. Redistributions in binary form must reproduce the above copyright notice, 
13  *     this list of conditions and the following disclaimer in the documentation 
14  *     and/or other materials provided with the distribution.
15  *  
16  *  3. The end-user documentation included with the redistribution, if any, must 
17  *     include the following acknowledgment: "This product includes software 
18  *     developed by Sun Microsystems, Inc. for JXTA(TM) technology." 
19  *     Alternately, this acknowledgment may appear in the software itself, if 
20  *     and wherever such third-party acknowledgments normally appear.
21  *  
22  *  4. The names "Sun", "Sun Microsystems, Inc.", "JXTA" and "Project JXTA" must 
23  *     not be used to endorse or promote products derived from this software 
24  *     without prior written permission. For written permission, please contact 
25  *     Project JXTA at http://www.jxta.org.
26  *  
27  *  5. Products derived from this software may not be called "JXTA", nor may 
28  *     "JXTA" appear in their name, without prior written permission of Sun.
29  *  
30  *  THIS SOFTWARE IS PROVIDED ``AS IS'' AND ANY EXPRESSED OR IMPLIED WARRANTIES,
31  *  INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND 
32  *  FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL SUN 
33  *  MICROSYSTEMS OR ITS CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, 
34  *  INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT 
35  *  LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, 
36  *  OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF 
37  *  LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING 
38  *  NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, 
39  *  EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
40  *  
41  *  JXTA is a registered trademark of Sun Microsystems, Inc. in the United 
42  *  States and other countries.
43  *  
44  *  Please see the license information page at :
45  *  <http://www.jxta.org/project/www/license.html> for instructions on use of 
46  *  the license in source files.
47  *  
48  *  ====================================================================
49  *  
50  *  This software consists of voluntary contributions made by many individuals 
51  *  on behalf of Project JXTA. For more information on Project JXTA, please see 
52  *  http://www.jxta.org.
53  *  
54  *  This license is based on the BSD license adopted by the Apache Foundation. 
55  */
56 package net.jxta.impl.rendezvous.rpv;
57
58 import java.io.IOException;
59 import java.net.URI;
60 import java.net.URISyntaxException;
61 import java.util.ArrayList;
62 import java.util.Arrays;
63 import java.util.Collections;
64 import java.util.HashSet;
65 import java.util.Iterator;
66 import java.util.List;
67 import java.util.NoSuchElementException;
68 import java.util.Random;
69 import java.util.Set;
70 import java.util.SortedSet;
71 import java.util.Timer;
72 import java.util.TimerTask;
73 import java.util.TreeSet;
74 import java.util.Vector;
75 import java.util.logging.Level;
76 import java.util.logging.Logger;
77
78 import net.jxta.discovery.DiscoveryService;
79 import net.jxta.document.Advertisement;
80 import net.jxta.document.AdvertisementFactory;
81 import net.jxta.document.MimeMediaType;
82 import net.jxta.document.StructuredDocumentFactory;
83 import net.jxta.document.XMLDocument;
84 import net.jxta.endpoint.EndpointAddress;
85 import net.jxta.endpoint.EndpointListener;
86 import net.jxta.endpoint.EndpointService;
87 import net.jxta.endpoint.Message;
88 import net.jxta.endpoint.MessageElement;
89 import net.jxta.endpoint.Messenger;
90 import net.jxta.endpoint.StringMessageElement;
91 import net.jxta.endpoint.TextDocumentMessageElement;
92 import net.jxta.id.ID;
93 import net.jxta.id.IDFactory;
94 import net.jxta.logging.Logging;
95 import net.jxta.peer.PeerID;
96 import net.jxta.peergroup.PeerGroup;
97 import net.jxta.pipe.InputPipe;
98 import net.jxta.pipe.OutputPipe;
99 import net.jxta.pipe.PipeID;
100 import net.jxta.pipe.PipeMsgEvent;
101 import net.jxta.pipe.PipeMsgListener;
102 import net.jxta.pipe.PipeService;
103 import net.jxta.protocol.ConfigParams;
104 import net.jxta.protocol.PeerAdvertisement;
105 import net.jxta.protocol.PipeAdvertisement;
106 import net.jxta.protocol.RdvAdvertisement;
107 import net.jxta.protocol.RouteAdvertisement;
108 import net.jxta.rendezvous.RendezvousEvent;
109 import net.jxta.rendezvous.RendezvousListener;
110
111 import net.jxta.impl.endpoint.EndpointUtils;
112 import net.jxta.impl.endpoint.relay.RelayReferralSeedingManager;
113 import net.jxta.impl.protocol.RdvConfigAdv;
114 import net.jxta.impl.rendezvous.RendezVousServiceImpl;
115 import net.jxta.impl.util.SeedingManager;
116 import net.jxta.impl.util.TimeUtils;
117 import net.jxta.impl.util.URISeedingManager;
118
119 /**
120  * This class models a Rendezvous Peer View (RPV):
121  * ordered collection of all other Rendezvous Peers visible to
122  * this peer.
123  * <p/>
124  * Presently this class implements a random "diffusion" algorithm
125  * where each Peer periodically selects a randomly selected peer advertisement
126  * from its view and sends it over to a randomly selected peer from its view.
127  * Over time, this causes every peer to learn about every other peer, resulting
128  * in a "converged" peer view.
129  * <p/>
130  * This diffusion process is bootstrapped by every peer sending their
131  * own peer advertisements to some well-known, stable, "seed" peers on
132  * startup.
133  */
134 public final class PeerView implements EndpointListener, RendezvousListener {
135
136     /**
137      * Logger
138      */
139     private static final transient Logger LOG = Logger.getLogger(PeerView.class.getName());
140
141     /**
142      * Our service name
143      */
144     static final String SERVICE_NAME = "PeerView";
145
146     /**
147      * Namespace used for rdv message elements.
148      */
149     static final String MESSAGE_NAMESPACE = "jxta";
150
151     /**
152      * Element name of outgoing messages. Note that the element contains a
153      * RdvAvertisement and <emphasis>not</emphasis> a Peer Advertisement.
154      */
155     static final String MESSAGE_ELEMENT_NAME = "PeerView.PeerAdv";
156
157     /**
158      * Element name of responses. Note that the element contains a
159      * RdvAvertisement and <emphasis>not</emphasis> a Peer Advertisement.
160      */
161     static final String RESPONSE_ELEMENT_NAME = "PeerView.PeerAdv.Response";
162
163     /**
164      * Message element name for PeerView "Cached" Message Element
165      */
166     static final String CACHED_RADV_ELEMENT_NAME = "PeerView.Cached";
167
168     /**
169      * Optional message element that specifies by its presence in a peerview
170      * message that the referenced peer is not the provider of the
171      * RdvAdvertisement and the advertisement is a "hint" or referral from the
172      * responding peer.
173      * <p/>
174      * In practice, when sending its own RdvAdvertisement, a peer does not
175      * include this element, but when sending another peer's RdvAdvertisement,
176      * this element is included.
177      */
178     static final MessageElement CACHED_RADV_ELEMENT = new StringMessageElement(CACHED_RADV_ELEMENT_NAME, Boolean.TRUE.toString(), null);
179
180     /**
181      * Message element name that specifies the route advertisement of the
182      * source of the message.
183      */
184     static final String SRCROUTEADV_ELEMENT_NAME = "PeerView.SrcRouteAdv";
185
186     /**
187      * Message element name for PeerView "Edge" Message Element
188      */
189     static final String EDGE_ELEMENT_NAME = "PeerView.EdgePeer";
190
191     /**
192      * Optional message element that specifies by its presence in a peerview
193      * message that the referenced peer is an edge peer and not a member of the
194      * peerview.
195      */
196     static final MessageElement EDGE_ELEMENT = new StringMessageElement(EDGE_ELEMENT_NAME, Boolean.TRUE.toString(), null);
197
198     /**
199      * Message element name for PeerView "Failure" Message Element
200      */
201     static final String FAILURE_ELEMENT_NAME = "PeerView.Failure";
202
203     /**
204      * Optional message element that specifies by its presence in a peerview
205      * message that the referenced peer has either failed or is quitting. If the
206      * "cached" element is also set then the error is being reported by a third
207      * party.
208      */
209     static final MessageElement FAILURE_ELEMENT = new StringMessageElement(FAILURE_ELEMENT_NAME, Boolean.TRUE.toString(), null);
210
211     /**
212      * This is the interval between adv exchange in seconds. This is
213      * the main tunable runtime parameter for the diffusion
214      * process. An interval that is too low will improve view
215      * consistency at the expense of gratuitous network traffic. On
216      * the other hand, an interval that is too high will cause the
217      * view to become inconsistent. It is desirable to err on the side
218      * of extra traffic.
219      */
220     private static final long DEFAULT_SEEDING_PERIOD = 5 * TimeUtils.ASECOND;
221
222     private static final long WATCHDOG_PERIOD = 30 * TimeUtils.ASECOND;
223     private static final long WATCHDOG_GRACE_DELAY = 5 * TimeUtils.AMINUTE;
224
225     private static final long DEFAULT_BOOTSTRAP_KICK_INTERVAL = 3 * TimeUtils.ASECOND;
226
227     private static final int MIN_BOOTLEVEL = 0;
228     private static final int BOOTLEVEL_INCREMENT = 1;
229     private static final int MAX_BOOTLEVEL = 6;
230
231     /**
232      * DEFAULT_SEEDING_RDVPEERS
233      * <p/>
234      * This value is the maximum number of rendezvous peers that will be
235      * send our own advertisement at boot time.
236      */
237     //private static final int DEFAULT_SEEDING_RDVPEERS = 5;
238
239     private final PeerGroup group;
240     
241     /**
242      *  The group in which our propagate pipe will run.
243      */
244     private final PeerGroup advertisingGroup;
245     private final RendezVousServiceImpl rdvService;
246     private final EndpointService endpoint;
247
248     /**
249      * The name of this PeerView.
250      * <p/>
251      * FIXME 20040623 bondolo This should be a CodatID.
252      */
253     private final String name;
254
255     /**
256      * Delay in relative milliseconds to apply before contacting seeding rdvs.
257      * 0 is supposed to be reserved by RdvConfig to mean "use the default".
258      * However, it is in fact a valid value and also the one we want for the default.
259      * The only problem with that is that it is not possible to configure this value
260      * explicitly, should it one day not be the default. The issue is actually in RdvConfig.
261      */
262     private long seedingRdvConnDelay = 0;
263
264     private final boolean useOnlySeeds;
265
266     private final SeedingManager seedingManager;
267
268     /**
269      * If the peerview is smaller than this we will try harder to find
270      * additional peerview members.
271      */
272     private int minHappyPeerView = 4;
273
274     /**
275      * A single timer is used to periodically kick each PeerView
276      * into activity. For the Random PeerView, this activity consists
277      * of selecting a PeerViewElement at random from its view and
278      * sending it across to a randomly-selected peer from its view.
279      * <p/>
280      * FIXME 20021121 lomax
281      * <p/>
282      * The original idea of using a common timer in order to save threads IS a
283      * very good idea. However, limitations, and actually, bugs, in java.util.Timer
284      * creates the following problems when using a static Timer:
285      * <p/>
286      * <ul>
287      * <li>Memory Leak: Canceling a TimerTask does not remove it from the
288      * execution queue of the Timer until the Timer is canceled or the
289      * TimerTask is fired. Since most of the TimerTasks are inner classes
290      * this can mean that the PeerView is held around for a long time.</li>
291      * <p/>
292      * <li>java.util.Timer is not only not real-time (which is more or less fine
293      * for the PeerView, but it sequentially invokes tasks (only one Thread
294      * per Timer). As a result, tasks that takes a long time to run delays
295      * other tasks.</li>
296      * </ul>
297      * <p/>
298      * The PeerView would function better with a better Timer, but JDK does
299      * not provide a standard timer that would fulfill the needs of the
300      * PeerView. Maybe we should implement a JXTA Timer, since lots of the JXTA
301      * services, by being very asynchronous, rely on the same kind of timer
302      * semantics as the PeerView. Until then, creating a Timer per instance of
303      * the PeerView (i.e. per instance of joined PeerGroup) is the best
304      * workaround.
305      */
306     private final Timer timer;
307
308     /**
309      * A random number generator.
310      */
311     private final static Random random = new Random();
312
313     /**
314      * List of scheduled tasks
315      */
316     private final Set<TimerTask> scheduledTasks = Collections.synchronizedSet(new HashSet<TimerTask>());
317
318     /**
319      * Describes the frequency and amount of effort we will spend updating
320      * the peerview.
321      */
322     private int bootLevel = MIN_BOOTLEVEL;
323
324     /**
325      * Earliest absolute time in milliseconds at which we will allow a reseed
326      * to take place.
327      */
328     private long earliestReseed = 0L;
329
330     private final String uniqueGroupId;
331
332     /**
333      * Listeners for PeerView Events.
334      */
335     private final Set<PeerViewListener> rpvListeners = Collections.synchronizedSet(new HashSet<PeerViewListener>());
336
337     /**
338      * Used for querying for pves.
339      */
340     private InputPipe wirePipeInputPipe = null;
341
342     /**
343      * Used for querying for pves.
344      */
345     private OutputPipe wirePipeOutputPipe = null;
346
347     /**
348      * Used for notifications about pve failures.
349      */
350     private InputPipe localGroupWirePipeInputPipe = null;
351
352     /**
353      * Used for notifications about pve failures.
354      */
355     private OutputPipe localGroupWirePipeOutputPipe = null;
356
357     /**
358      * A task which monitors the up and down peers in the peerview.
359      */
360     private WatchdogTask watchdogTask = null;
361
362     /**
363      * This is the accumulated view by an instance of this class.
364      */
365     private final SortedSet<PeerViewDestination> localView = Collections.synchronizedSortedSet(new TreeSet<PeerViewDestination>());
366
367     /**
368      * PVE for ourself.
369      * <p/>
370      * FIXME bondolo 20041015 This should be part of the local view.
371      */
372     private final PeerViewElement self;
373     private PeerViewElement upPeer = null;
374     private PeerViewElement downPeer = null;
375
376     private final PeerViewStrategy replyStrategy;
377
378     private final PeerViewStrategy kickRecipientStrategy;
379
380     private final PeerViewStrategy kickAdvertisementStrategy;
381
382     private final PeerViewStrategy refreshRecipientStrategy;
383
384     // PeerAdv tracking.
385     private PeerAdvertisement lastPeerAdv = null;
386     private int lastModCount = -1;
387
388     private final PipeAdvertisement localGroupWirePipeAdv;
389     private final PipeAdvertisement advGroupPropPipeAdv;
390
391     /**
392      * If <code>true</code> then this Peer View instance is closed and is
393      * shutting down.
394      */
395     private volatile boolean closed = false;
396
397     /**
398      * Get an instance of PeerView for the specified PeerGroup and Service.
399      *
400      * @param group            Peer Group in which this Peer View instance operates.
401      * @param advertisingGroup Peer Group in which this Peer View instance will
402      *                         advertise and broadcast its existence.
403      * @param rdvService       The rdvService we are to use.
404      * @param name             The identifying name for this Peer View instance.
405      */
406     public PeerView(PeerGroup group, PeerGroup advertisingGroup, RendezVousServiceImpl rdvService, String name) {
407         this.group = group;
408         this.advertisingGroup = advertisingGroup;
409         this.rdvService = rdvService;
410         this.name = name;
411
412         this.endpoint = group.getEndpointService();
413
414         this.uniqueGroupId = group.getPeerGroupID().getUniqueValue().toString();
415
416         timer = new Timer("PeerView Timer for " + group.getPeerGroupID(), true);
417
418         Advertisement adv = null;
419         ConfigParams confAdv = group.getConfigAdvertisement();
420
421         // Get the config. If we do not have a config, we're done; we just keep
422         // the defaults (edge peer/no auto-rdv)
423         if (confAdv != null) {
424             try {
425                 XMLDocument configDoc = (XMLDocument) confAdv.getServiceParam(rdvService.getAssignedID());
426
427                 if (null != configDoc) {
428                     adv = AdvertisementFactory.newAdvertisement(configDoc);
429                 }
430             } catch (java.util.NoSuchElementException failed) {// ignored
431             }
432         }
433
434         RdvConfigAdv rdvConfigAdv;
435
436         if (!(adv instanceof RdvConfigAdv)) {
437             if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
438                 LOG.fine("Creating new RdvConfigAdv for defaults.");
439             }
440
441             rdvConfigAdv = (RdvConfigAdv) AdvertisementFactory.newAdvertisement(RdvConfigAdv.getAdvertisementType());
442         } else {
443             rdvConfigAdv = (RdvConfigAdv) adv;
444         }
445
446         if (rdvConfigAdv.getSeedRendezvousConnectDelay() > 0) {
447             seedingRdvConnDelay = rdvConfigAdv.getSeedRendezvousConnectDelay();
448         }
449
450         useOnlySeeds = rdvConfigAdv.getUseOnlySeeds();
451
452         if (rdvConfigAdv.getMinHappyPeerView() > 0) {
453             minHappyPeerView = rdvConfigAdv.getMinHappyPeerView();
454         }
455
456         URISeedingManager seedingManager;
457
458         if ((null == advertisingGroup) && rdvConfigAdv.getProbeRelays()) {
459             seedingManager = new RelayReferralSeedingManager(rdvConfigAdv.getAclUri(), useOnlySeeds, group, name);
460         } else {
461             seedingManager = new URISeedingManager(rdvConfigAdv.getAclUri(), useOnlySeeds, group, name);
462         }
463
464         for (URI aSeeder : Arrays.asList(rdvConfigAdv.getSeedingURIs())) {
465             seedingManager.addSeedingURI(aSeeder);
466         }
467
468         for (URI aSeed : Arrays.asList(rdvConfigAdv.getSeedRendezvous())) {
469             seedingManager.addSeed(aSeed);
470         }
471
472         this.seedingManager = seedingManager;
473
474         lastPeerAdv = group.getPeerAdvertisement();
475         lastModCount = lastPeerAdv.getModCount();
476
477         // create a new local RdvAdvertisement and set it to self.
478         RdvAdvertisement radv = createRdvAdvertisement(lastPeerAdv, name);
479
480         self = new PeerViewElement(endpoint, radv);
481
482         // addPeerViewElement( self );
483
484         // setup endpoint listener
485         endpoint.addIncomingMessageListener(this, SERVICE_NAME, uniqueGroupId);
486
487         // add rendezvous listener
488         rdvService.addListener(this);
489
490         // initialize strategies
491         replyStrategy = new PeerViewRandomWithReplaceStrategy(localView);
492
493         kickRecipientStrategy = new PeerViewRandomStrategy(localView);
494
495         kickAdvertisementStrategy = new PeerViewRandomWithReplaceStrategy(localView);
496
497         refreshRecipientStrategy = new PeerViewSequentialStrategy(localView);
498
499         localGroupWirePipeAdv = makeWirePipeAdvertisement(group, group, name);
500
501         if (null != advertisingGroup) {
502             advGroupPropPipeAdv = makeWirePipeAdvertisement(advertisingGroup, group, name);
503         } else {
504             advGroupPropPipeAdv = null;
505         }
506
507         if (Logging.SHOW_INFO && LOG.isLoggable(Level.INFO)) {
508             LOG.info( "PeerView created for group \"" + group.getPeerGroupName() +
509                     "\" [" + group.getPeerGroupID() + "] name \"" + name + "\"");
510         }
511     }
512
513     /**
514      * {@inheritDoc}
515      * <p/>
516      * Listener for "PeerView"/&lt;peergroup-unique-id> and propagate pipes.
517      */
518     public void processIncomingMessage(Message msg, EndpointAddress srcAddr, EndpointAddress dstAddr) {
519
520         // check what kind of message this is (response or not).
521         boolean isResponse = false;
522         MessageElement me = msg.getMessageElement(MESSAGE_NAMESPACE, MESSAGE_ELEMENT_NAME);
523
524         if (me == null) {
525             me = msg.getMessageElement(MESSAGE_NAMESPACE, RESPONSE_ELEMENT_NAME);
526             if (me == null) {
527                 if (Logging.SHOW_WARNING && LOG.isLoggable(Level.WARNING)) {
528                     LOG.warning("Discarding damaged " + msg + ".");
529                 }
530                 return;
531             } else {
532                 isResponse = true;
533             }
534         }
535
536         Advertisement adv;
537
538         try {
539             XMLDocument asDoc = (XMLDocument) StructuredDocumentFactory.newStructuredDocument(me);
540
541             adv = AdvertisementFactory.newAdvertisement(asDoc);
542         } catch (RuntimeException failed) {
543             if (Logging.SHOW_WARNING && LOG.isLoggable(Level.WARNING)) {
544                 LOG.log(Level.WARNING, "Failed building rdv advertisement from message element", failed);
545             }
546             return;
547         } catch (IOException failed) {
548             if (Logging.SHOW_WARNING && LOG.isLoggable(Level.WARNING)) {
549                 LOG.log(Level.WARNING, "Failed building rdv advertisement from message element", failed);
550             }
551             return;
552         }
553
554         if (!(adv instanceof RdvAdvertisement)) {
555             if (Logging.SHOW_WARNING && LOG.isLoggable(Level.WARNING)) {
556                 LOG.warning("Response does not contain radv (" + adv.getAdvertisementType() + ")");
557             }
558             return;
559         }
560
561         RdvAdvertisement radv = (RdvAdvertisement) adv;
562
563         if (null == radv.getRouteAdv()) {
564             if (Logging.SHOW_WARNING && LOG.isLoggable(Level.WARNING)) {
565                 LOG.warning("Rdv Advertisement does not contain route.");
566             }
567             return;
568         }
569
570         // See if we can find a src route adv in the message.
571         me = msg.getMessageElement(MESSAGE_NAMESPACE, SRCROUTEADV_ELEMENT_NAME);
572         if (me != null) {
573             try {
574                 XMLDocument asDoc = (XMLDocument) StructuredDocumentFactory.newStructuredDocument(me);
575                 Advertisement routeAdv = AdvertisementFactory.newAdvertisement(asDoc);
576
577                 if (!(routeAdv instanceof RouteAdvertisement)) {
578                     if (Logging.SHOW_WARNING && LOG.isLoggable(Level.WARNING)) {
579                         LOG.warning("Advertisement is not a RouteAdvertisement");
580                     }
581                 } else {
582                     RouteAdvertisement rdvRouteAdv = radv.getRouteAdv().clone();
583
584                     // XXX we stich them together even if in the end it gets optimized away
585                     RouteAdvertisement.stichRoute(rdvRouteAdv, (RouteAdvertisement) routeAdv);
586                     radv.setRouteAdv(rdvRouteAdv);
587                 }
588             } catch (RuntimeException failed) {
589                 if (Logging.SHOW_WARNING && LOG.isLoggable(Level.WARNING)) {
590                     LOG.log(Level.WARNING, "Failed building route adv from message element", failed);
591                 }
592             } catch (IOException failed) {
593                 if (Logging.SHOW_WARNING && LOG.isLoggable(Level.WARNING)) {
594                     LOG.log(Level.WARNING, "Failed building route adv from message element", failed);
595                 }
596             }
597         }
598         me = null;
599
600         // Is this a message about ourself?
601         if (group.getPeerID().equals(radv.getPeerID())) {
602             if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
603                 LOG.fine("Received a PeerView message about self. Discard.");
604             }
605
606             return;
607         }
608
609         // Collect the various flags.
610
611         boolean isFailure = (msg.getMessageElement(MESSAGE_NAMESPACE, FAILURE_ELEMENT_NAME) != null);
612         boolean isCached = (msg.getMessageElement(MESSAGE_NAMESPACE, CACHED_RADV_ELEMENT_NAME) != null);
613         boolean isFromEdge = (msg.getMessageElement(MESSAGE_NAMESPACE, EDGE_ELEMENT_NAME) != null);
614         boolean isTrusted = isFromEdge || seedingManager.isAcceptablePeer(radv.getRouteAdv());
615
616         if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
617             String srcPeer = srcAddr.toString();
618
619             if ("jxta".equals(srcAddr.getProtocolName())) {
620                 try {
621                     String idstr = ID.URIEncodingName + ":" + ID.URNNamespace + ":" + srcAddr.getProtocolAddress();
622
623                     ID asID = IDFactory.fromURI(new URI(idstr));
624
625                     PeerViewElement pve = getPeerViewElement(asID);
626
627                     if (null != pve) {
628                         srcPeer = "\"" + pve.getRdvAdvertisement().getName() + "\"";
629                     }
630                 } catch (URISyntaxException failed) {// ignored
631                 }
632             }
633
634             LOG.fine(
635                     "[" + group.getPeerGroupID() + "] Received a" + (isCached ? " cached" : "") + (isResponse ? " response" : "")
636                     + (isFailure ? " failure" : "") + " message (" + msg.toString() + ")" + (isFromEdge ? " from edge" : "")
637                     + " regarding \"" + radv.getName() + "\" from " + srcPeer);
638         }
639
640         if (!isTrusted) {
641             if (Logging.SHOW_WARNING && LOG.isLoggable(Level.WARNING)) {
642                 LOG.warning("Rejecting peerview message from " + radv.getPeerID());
643             }
644             return;
645         }
646
647         // if this is a notification failure. All we have to do is locally
648         // process the failure
649         if (isFailure) {
650             notifyFailure(radv.getPeerID(), false);
651             return;
652         }
653
654         handlePeerViewMessage(isResponse, isCached, isFromEdge, isTrusted, radv);
655     }
656
657     /**
658      * Following the extraction of a peerview message from a
659      */
660     private void handlePeerViewMessage(boolean isResponse, boolean isCached, boolean isFromEdge, boolean isTrusted, RdvAdvertisement radv) {
661
662         // Figure out if we know that peer already. If we do, reuse the pve
663         // that we have.
664         boolean isNewbie = false;
665         boolean added = false;
666         PeerViewElement pve;
667
668         synchronized (localView) {
669             PeerViewElement newbie = new PeerViewElement(endpoint, radv);
670
671             pve = getPeerViewElement(newbie);
672
673             if (null == pve) {
674                 pve = newbie;
675                 isNewbie = true;
676             }
677
678             if (!isFromEdge && !isCached && isTrusted) {
679                 if (isNewbie) {
680                     added = addPeerViewElement(pve);
681                 } else {
682                     pve.setRdvAdvertisement(radv);
683                 }
684             }
685         }
686
687         if (!isNewbie && isFromEdge && !isCached) {
688             // The message stated that it is from an edge we believed was a 
689             // peerview member. Best thing to do is tell everyone that it's no 
690             // longer in peerview.
691             notifyFailure(pve, true);
692             // we continue processing because it's not the other peer's fault we had the wrong idea.
693         }
694
695         // Do the rest of the add related tasks out of synch.
696         // We must not nest any possibly synchronized ops in
697         // the LocalView lock; it's the lowest level.
698
699         if (added) {
700             // Notify local listeners
701             generateEvent(PeerViewEvent.ADD, pve);
702         }
703
704         /*
705          * Now, see what if any message we have to send as a result.
706          * There are three kinds of messages we can send:
707          *
708          * - A response with ourselves, if we're being probed and we're
709          * a rdv.
710          *
711          * - A probe to the peer whose adv we received, because we want
712          * confirmation that it's alive.
713          *
714          * - A response with a random adv from our cache if we're being probed
715          *
716          * We may send more than one message.
717          */
718
719         boolean status;
720
721         if (!isCached) {
722             if (!isResponse) {
723                 // Type 1: Respond to probe
724                 //
725                 // We are being probed by an edge peer or peerview member. We respond
726                 // with our own advertisement.
727                 status = send(pve, self, true, false);
728
729                 if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
730                     LOG.fine("Type 1 (Respond with self PVE) : Sent to " + pve + " result =" + status);
731                 }
732
733                 // Type 3: Respond with random entry from our PV when we are probed.
734                 //
735                 // Respond with a strategized adv from our view.
736                 PeerViewElement sendpve = replyStrategy.next();
737
738                 if ((sendpve != null) && !pve.equals(sendpve) && !self.equals(sendpve)) {
739                     status = send(pve, sendpve, true, false);
740                     if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
741                         LOG.fine("Type 3 (Respond with random PVE) : Sent " + sendpve + " to " + pve + " result=" + status);
742                     }
743                 }
744             } else {
745                 // Heartbeat: do nothing.
746             }
747         } else if (isResponse) {
748             if (isNewbie && !useOnlySeeds && !isFromEdge) {
749                 // Type 2: Probe a peer we have just learned about from a referral.
750                 //
751                 // If useOnlySeeds, we're not allowed to talk to peers other than our 
752                 // seeds, so do not probe anything we learn from 3rd party. (Probing of
753                 // seeds happens as part of the "kick" strategy).
754                 status = send(pve, self, false, false);
755
756                 if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
757                     LOG.fine("Type 2 (Probe PVE) : Probed " + pve + " result=" + status);
758                 }
759             } else {
760                 // Already known or ignoring: do nothing.
761             }
762         } else {
763             // Invalid : do nothing.
764         }
765     }
766
767     /**
768      * {@inheritDoc}
769      */
770     @SuppressWarnings("fallsthrough")
771     public void rendezvousEvent(RendezvousEvent event) {
772
773         if (closed) {
774             return;
775         }
776
777         boolean notifyFailure = false;
778
779         synchronized (this) {
780
781             int theEventType = event.getType();
782
783             if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
784                 LOG.fine("[" + group.getPeerGroupName() + "] Processing  " + event);
785             }
786
787             refreshSelf();
788
789             if ((RendezvousEvent.BECAMERDV == theEventType) || (RendezvousEvent.BECAMEEDGE == theEventType)) {
790                 // kill any existing watchdog task
791                 if (null != watchdogTask) {
792                     removeTask(watchdogTask);
793                     watchdogTask.cancel();
794                     watchdogTask = null;
795                 }
796             }
797
798             switch (theEventType) {
799             case RendezvousEvent.RDVCONNECT:
800             case RendezvousEvent.RDVRECONNECT:
801             case RendezvousEvent.CLIENTCONNECT:
802             case RendezvousEvent.CLIENTRECONNECT:
803             case RendezvousEvent.RDVFAILED:
804             case RendezvousEvent.RDVDISCONNECT:
805             case RendezvousEvent.CLIENTFAILED:
806             case RendezvousEvent.CLIENTDISCONNECT:
807                 break;
808
809             case RendezvousEvent.BECAMERDV:
810                 openWirePipes();
811                 watchdogTask = new WatchdogTask();
812                 addTask(watchdogTask, WATCHDOG_PERIOD, WATCHDOG_PERIOD);
813                 rescheduleKick(true);
814                 break;
815
816             case RendezvousEvent.BECAMEEDGE:
817                 openWirePipes();
818                 if (!localView.isEmpty()) {
819                     // FIXME bondolo 20040229 since we likely don't have a
820                     // rendezvous connection, it is kind of silly to be sending
821                     // this now. probably should wait until we get a rendezvous
822                     // connection.
823                     notifyFailure = true;
824                 }
825                 rescheduleKick(true);
826                 break;
827
828             default:
829                 if (Logging.SHOW_WARNING && LOG.isLoggable(Level.WARNING)) {
830                     LOG.warning("[" + group.getPeerGroupName() + "] Unexpected RDV event : " + event);
831                 }
832                 break;
833             }
834         }
835
836         // we can't do the notification under synchronization.
837         if (notifyFailure) {
838             notifyFailure(self, true);
839         }
840     }
841
842     public void start() {// do nothing for now... all the good stuff happens as a result of
843         // rendezvous events.
844     }
845
846     public void stop() {
847
848         synchronized (this) {
849             // Only one thread gets to perform the shutdown.
850             if (closed) {
851                 return;
852             }
853             closed = true;
854         }
855
856         // notify other rendezvous peers that we are going down
857         notifyFailure(self, true);
858
859         // From now on we can nullify everything we want. Other threads check
860         // the closed flag where it matters.
861         synchronized (this) {
862             if (watchdogTask != null) {
863                 removeTask(watchdogTask);
864                 watchdogTask.cancel();
865                 watchdogTask = null;
866             }
867
868             // Remove message listener.
869             endpoint.removeIncomingMessageListener(SERVICE_NAME, uniqueGroupId);
870
871             // Remove rendezvous listener.
872             rdvService.removeListener(this);
873
874             // Remove all our pending scheduled tasks
875             // Carefull with the indices while removing: do it backwards, it's
876             // cheaper and simpler.
877
878             synchronized (scheduledTasks) {
879                 Iterator<TimerTask> eachTask = scheduledTasks.iterator();
880
881                 while (eachTask.hasNext()) {
882                     try {
883                         TimerTask task = eachTask.next();
884
885                         task.cancel();
886                         eachTask.remove();
887                     } catch (Exception ez1) {
888                         if (Logging.SHOW_WARNING && LOG.isLoggable(Level.WARNING)) {
889                             LOG.log(Level.WARNING, "Cannot cancel task: ", ez1);
890                         }
891                     }
892                 }
893             }
894
895             // Make sure that we close our WirePipes
896             closeWirePipes();
897
898             // Let go of the up and down peers.
899             downPeer = null;
900             upPeer = null;
901             localView.clear();
902
903             timer.cancel();
904
905             rpvListeners.clear();
906         }
907     }
908
909     protected void addTask(TimerTask task, long delay, long interval) {
910
911         synchronized (scheduledTasks) {
912             if (scheduledTasks.contains(task)) {
913                 if (Logging.SHOW_WARNING && LOG.isLoggable(Level.WARNING)) {
914                     LOG.warning("Task list already contains specified task.");
915                 }
916             }
917             scheduledTasks.add(task);
918         }
919
920         if (interval >= 1) {
921             timer.schedule(task, delay, interval);
922         } else {
923             timer.schedule(task, delay);
924         }
925     }
926
927     protected void removeTask(TimerTask task) {
928         scheduledTasks.remove(task);
929     }
930
931     /**
932      * Adds the specified URI to the list of seeds. Even if useOnlySeeds is in
933      * effect, this seed may now be used, as if it was part of the initial
934      * configuration.
935      *
936      * @param seed the URI of the seed rendezvous.
937      */
938     public void addSeed(URI seed) {
939         if (seedingManager instanceof URISeedingManager) {
940             ((URISeedingManager) seedingManager).addSeed(seed);
941         }
942     }
943
944     /**
945      * Probe the specified peer immediately.
946      * <p/>
947      * Note: If "useOnlySeeds" is in effect and the peer is not a seed, any response to this probe will be ignored.
948      */
949     public boolean probeAddress(EndpointAddress address, RouteAdvertisement hint) {
950
951         PeerViewElement holdIt;
952
953         synchronized (localView) {
954             holdIt = self;
955         }
956
957         return send(address, hint, holdIt, false, false);
958     }
959
960     /**
961      * Send our own advertisement to all of the seed rendezvous.
962      */
963     public void seed() {
964         long reseedRemaining = earliestReseed - TimeUtils.timeNow();
965
966         if (reseedRemaining > 0) {
967             // Too early; the previous round is not even done.
968             if (Logging.SHOW_INFO && LOG.isLoggable(Level.INFO)) {
969                 LOG.info("Still Seeding for " + reseedRemaining + "ms.");
970             }
971             return;
972         }
973
974         if (Logging.SHOW_INFO && LOG.isLoggable(Level.INFO)) {
975             LOG.info("New Seeding...");
976         }
977
978         // Schedule sending propagated query to our local network neighbors.
979         send(null, null, self, false, false);
980
981         long iterations = 0;
982
983         if (localView.size() < minHappyPeerView) {
984             // We only do these things if we don't have a "happy" Peer View.
985             // If the Peer View is already "happy" then we will use only
986             // Peer View referrals for learning of new entires.
987
988             List<RouteAdvertisement> seedRdvs = new ArrayList<RouteAdvertisement>(
989                     Arrays.asList(seedingManager.getActiveSeedRoutes()));
990
991             while (!seedRdvs.isEmpty()) {
992                 RouteAdvertisement aSeed = seedRdvs.remove(0);
993
994                 if (null == aSeed.getDestPeerID()) {
995                     // It is an incomplete route advertisement. We are going to assume that it is only a wrapper for a single ea.
996                     Vector<String> seed_eas = aSeed.getDest().getVectorEndpointAddresses();
997
998                     if (!seed_eas.isEmpty()) {
999                         EndpointAddress aSeedHost = new EndpointAddress(seed_eas.get(0));
1000
1001                         // XXX 20061220 bondolo We could check all of our current PVEs to make sure that this address is not already known.
1002
1003                         send(aSeedHost, null, self, false, false);
1004                     }
1005                 } else {
1006                     // We have a full route, send it to the virtual address of the route!
1007                     // FIXME malveaux 20070816 Second part of conjunct can be removed once 'self' is included in the peerview
1008                     if ((null == getPeerViewElement(aSeed.getDestPeerID())) && !group.getPeerID().equals(aSeed.getDestPeerID())) {
1009                         EndpointAddress aSeedHost = new EndpointAddress("jxta", aSeed.getDestPeerID().getUniqueValue().toString(),
1010                                 null, null);
1011
1012                         send(aSeedHost, aSeed, self, false, false);
1013                     }
1014                 }
1015             }
1016
1017             if (!useOnlySeeds) {
1018                 // If use only seeds, we're not allowed to put in the peerview
1019                 // anything but our seed rdvs. So, we've done everything that
1020                 // was required.
1021
1022                 // Schedule sending propagated query to our advertising group
1023                 if (advertisingGroup != null) {
1024                     // send it, but not immediately.
1025                     scheduleAdvertisingGroupQuery(DEFAULT_SEEDING_PERIOD * 2);
1026                 }
1027             }
1028         }
1029
1030         earliestReseed = TimeUtils.toAbsoluteTimeMillis(seedingRdvConnDelay + (DEFAULT_SEEDING_PERIOD * iterations));
1031     }
1032
1033     /**
1034      * Make sure that the PeerView properly changes behavior, when switching
1035      * from edge mode to rdv mode, and vice-versa.
1036      * Since openWirePipes() requires some other services such as the Pipe
1037      * Service, and since updateStatus is invoked this work must happen in
1038      * background, giving a chance to other services to be started.
1039      */
1040     private class OpenPipesTask extends TimerTask {
1041
1042         /**
1043          * {@inheritDoc}
1044          */
1045         @Override
1046         public void run() {
1047             try {
1048                 if (closed) {
1049                     return;
1050                 }
1051
1052                 openWirePipes();
1053             } catch (Throwable all) {
1054                 if (Logging.SHOW_SEVERE && LOG.isLoggable(Level.SEVERE)) {
1055                     LOG.log(Level.SEVERE, "Uncaught Throwable in thread: " + Thread.currentThread().getName(), all);
1056                 }
1057             } finally {
1058                 removeTask(this);
1059             }
1060         }
1061     }
1062
1063     private void scheduleOpenPipes(long delay) {
1064
1065         if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
1066             LOG.fine("Scheduling open pipes attempt in " + delay + "ms.");
1067         }
1068
1069         addTask(new OpenPipesTask(), delay, -1);
1070     }
1071
1072     /**
1073      * Send a PeerView Message to the specified peer.
1074      *
1075      * @param response indicates whether this is a response. Otherwise
1076      *                 we may create a distributed loop where peers keep perpetually
1077      *                 responding to each-other.
1078      * @param failure  Construct the message as a failure notification.
1079      */
1080     private boolean send(PeerViewElement dest, PeerViewElement pve, boolean response, boolean failure) {
1081
1082         Message msg = makeMessage(pve, response, failure);
1083
1084         boolean result = dest.sendMessage(msg, SERVICE_NAME, uniqueGroupId);
1085
1086         if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
1087             LOG.fine("Sending " + msg + " to " + dest + " success = " + result);
1088         }
1089
1090         return result;
1091     }
1092
1093     /**
1094      * Send a PeerView Message to the specified peer.
1095      *
1096      * @param response indicates whether this is a response. Otherwise
1097      *                 we may create a distributed loop where peers keep perpetually
1098      *                 responding to each-other.
1099      * @param failure  Construct the message as a failure notification.
1100      */
1101     private boolean send(EndpointAddress dest, RouteAdvertisement hint, PeerViewElement pve, boolean response, boolean failure) {
1102
1103         Message msg = makeMessage(pve, response, failure);
1104
1105         if (null != dest) {
1106             EndpointAddress realAddr = new EndpointAddress(dest, SERVICE_NAME, uniqueGroupId);
1107
1108             Messenger messenger = rdvService.endpoint.getMessengerImmediate(realAddr, hint);
1109
1110             if (null != messenger) {
1111                 try {
1112                     boolean result = messenger.sendMessage(msg);
1113
1114                     if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
1115                         LOG.fine("Sending " + msg + " to " + dest + " success = " + result);
1116                     }
1117
1118                     return result;
1119                 } catch (IOException failed) {
1120                     if (Logging.SHOW_WARNING && LOG.isLoggable(Level.WARNING)) {
1121                         LOG.log(Level.WARNING, "Could not send " + msg + " to " + dest, failed);
1122                     }
1123                     return false;
1124                 }
1125             } else {
1126                 if (Logging.SHOW_WARNING && LOG.isLoggable(Level.WARNING)) {
1127                     LOG.warning("Could not get messenger for " + dest);
1128                 }
1129
1130                 return false;
1131             }
1132         } else {
1133             // Else, propagate the message.
1134             try {
1135                 endpoint.propagate(msg, SERVICE_NAME, uniqueGroupId);
1136
1137                 if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
1138                     LOG.fine("Sent " + msg + " via propagate");
1139                 }
1140                 return true;
1141             } catch (IOException ez) {
1142                 if (Logging.SHOW_WARNING && LOG.isLoggable(Level.WARNING)) {
1143                     // Pretty strange. This has little basis for failure...
1144                     LOG.log(Level.WARNING, "Could not propagate " + msg, ez);
1145                 }
1146                 return false;
1147             }
1148         }
1149     }
1150
1151     /**
1152      * Send a PeerView Message to the specified peer.
1153      *
1154      * @param response indicates whether this is a response. Otherwise
1155      *                 we may create a distributed loop where peers keep perpetually
1156      *                 responding to each-other.
1157      * @param failure  Construct the message as a failure notification.
1158      * @param dest destination output pipe
1159      * @param pve the peer view element
1160      * @return  true if successful
1161      */
1162     private boolean send(OutputPipe dest, PeerViewElement pve, boolean response, boolean failure) {
1163
1164         Message msg = makeMessage(pve, response, failure);
1165
1166         try {
1167             return dest.send(msg);
1168         } catch (IOException ez) {
1169             if (Logging.SHOW_WARNING && LOG.isLoggable(Level.WARNING)) {
1170                 LOG.log(Level.WARNING, "Could not send " + msg, ez);
1171             }
1172             return false;
1173         }
1174     }
1175
1176     /**
1177      * Make a PeerView Message
1178      *
1179      * @param content the peer view element
1180      * @param response the response
1181      * @param failure whether to create a message based on a failure
1182      * @return the message
1183      */
1184     private Message makeMessage(PeerViewElement content, boolean response, boolean failure) {
1185
1186         Message msg = new Message();
1187
1188         // // edge peers add an identifying element, RDV peers do not
1189         // if (!rdvService.isRendezVous()) {
1190         // msg.addMessageElement(MESSAGE_NAMESPACE, EDGE_ELEMENT);
1191         // }
1192         //
1193         if (failure) {
1194             // This is a failure notification.
1195             msg.addMessageElement(MESSAGE_NAMESPACE, FAILURE_ELEMENT);
1196         }
1197
1198         refreshSelf();
1199
1200         RdvAdvertisement radv = content.getRdvAdvertisement();
1201
1202         XMLDocument doc = (XMLDocument) radv.getDocument(MimeMediaType.XMLUTF8);
1203         String msgName = response ? RESPONSE_ELEMENT_NAME : MESSAGE_ELEMENT_NAME;
1204
1205         MessageElement msge = new TextDocumentMessageElement(msgName, doc, null);
1206
1207         msg.addMessageElement(MESSAGE_NAMESPACE, msge);
1208
1209         if (!content.equals(self)) {
1210             // This is a cached RdvAdvertisement
1211             msg.addMessageElement(MESSAGE_NAMESPACE, CACHED_RADV_ELEMENT);
1212
1213             // This message contains an RdvAdvertisement which is not ourself. In that
1214             // case, it is wise to also send the local route advertisement (as the optional
1215             // SrcRdvAdv) so the destination might have a better change to access the "content"
1216             // RendezvousAdv (this peer will then act as a hop).
1217
1218             RouteAdvertisement localra = EndpointUtils.extractRouteAdv(lastPeerAdv);
1219
1220             if (localra != null) {
1221                 try {
1222                     XMLDocument radoc = (XMLDocument) localra.getDocument(MimeMediaType.XMLUTF8);
1223
1224                     msge = new TextDocumentMessageElement(SRCROUTEADV_ELEMENT_NAME, radoc, null);
1225                     msg.addMessageElement(MESSAGE_NAMESPACE, msge);
1226                 } catch (Exception ez1) {
1227                     if (Logging.SHOW_WARNING && LOG.isLoggable(Level.WARNING)) {
1228                         LOG.log(Level.WARNING, "Could not create optional src route adv for " + content, ez1);
1229                     }
1230                 }
1231             }
1232         }
1233
1234         return msg;
1235     }
1236
1237     /**
1238      * Invoked by anyone in order to inform the PeerView of a failure
1239      * of one of the member peers.
1240      *
1241      * @param pid              ID of the peer which failed.
1242      * @param propagateFailure If <tt>true</tt>then broadcast the failure to
1243      *                         other peers otherwise only update the local peerview.
1244      */
1245     public void notifyFailure(PeerID pid, boolean propagateFailure) {
1246
1247         PeerViewElement pve = getPeerViewElement(pid);
1248
1249         if (null != pve) {
1250             notifyFailure(pve, propagateFailure);
1251         }
1252     }
1253
1254     /**
1255      * Invoked when a peerview member peer becomes unreachable.
1256      *
1257      * @param pve              The peer which failed.
1258      * @param propagateFailure If {@code true} then broadcast the failure to
1259      *                         other peers otherwise only update the local peerview.
1260      */
1261     void notifyFailure(PeerViewElement pve, boolean propagateFailure) {
1262
1263         if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
1264             LOG.fine("Notifying failure of " + pve);
1265         }
1266
1267         try {
1268             boolean removedFromPeerView = removePeerViewElement(pve);
1269
1270             // only propagate if we actually knew of the peer
1271             propagateFailure &= (removedFromPeerView || (self == pve));
1272
1273             // Notify local listeners
1274             if (removedFromPeerView) {
1275                 generateEvent(PeerViewEvent.FAIL, pve);
1276             }
1277
1278             boolean emptyPeerView = localView.isEmpty();
1279
1280             // If the local view has become empty, reset the kicker into
1281             // a seeding mode.
1282             if (emptyPeerView && removedFromPeerView) {
1283                 rescheduleKick(true);
1284             }
1285
1286             if (propagateFailure) {
1287                 // Notify other rendezvous peers that there has been a failure.
1288                 OutputPipe op = localGroupWirePipeOutputPipe;
1289
1290                 if (null != op) {
1291                     if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
1292                         LOG.fine("Propagating failure of " + pve);
1293                     }
1294
1295                     send(op, pve, true, true);
1296                 }
1297             }
1298         } catch (Exception ez) {
1299             if (Logging.SHOW_WARNING && LOG.isLoggable(Level.WARNING)) {
1300                 LOG.log(Level.WARNING, "Failure while generating noficiation of failure of PeerView : " + pve, ez);
1301             }
1302         }
1303     }
1304
1305     /**
1306      * Invoked by the Timer thread to cause each PeerView to initiate
1307      * a Peer Advertisement exchange.
1308      */
1309     private void kick() {
1310
1311         try {
1312             if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
1313                 LOG.fine("Begun kick() in " + group.getPeerGroupID());
1314             }
1315
1316             // Use seed strategy. (it has its own throttling and resource limiting).
1317             seed();
1318
1319             // refresh ourself to a peer in our view
1320             PeerViewElement refreshee = refreshRecipientStrategy.next();
1321
1322             if ((refreshee != null) && (self != refreshee)) {
1323                 if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
1324                     LOG.fine("Refresh " + refreshee);
1325                 }
1326                 send(refreshee, self, false, false);
1327             }
1328
1329             // now share an adv from our local view to another peer from our
1330             // local view.
1331
1332             PeerViewElement recipient = kickRecipientStrategy.next();
1333
1334             if (recipient == null) {
1335                 if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
1336                     LOG.fine("No recipient to send adv ");
1337                 }
1338                 return;
1339             }
1340
1341             PeerViewElement rpve = kickAdvertisementStrategy.next();
1342
1343             if (rpve == null) {
1344                 if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
1345                     LOG.fine("No adv to send");
1346                 }
1347                 return;
1348             }
1349
1350             if (rpve.equals(recipient) || self.equals(recipient)) {
1351                 // give up: no point in sending a peer its own adv
1352                 if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
1353                     LOG.fine("adv to send is same as recipient: Nothing to do.");
1354                 }
1355                 return;
1356             }
1357
1358             if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
1359                 LOG.fine("Sending adv " + rpve + " to " + recipient);
1360             }
1361
1362             send(recipient, rpve, true, false);
1363         } finally {
1364             rescheduleKick(false);
1365         }
1366     }
1367
1368     /**
1369      * Choose a boot level appropriate for the current configuration and state.
1370      *
1371      * @return the new boot level.
1372      */
1373     private int adjustBootLevel() {
1374
1375         boolean areWeHappy = localView.size() >= minHappyPeerView;
1376
1377         // increment boot level faster if we have a reasonable peerview.
1378         int increment = areWeHappy ? BOOTLEVEL_INCREMENT : BOOTLEVEL_INCREMENT * 2;
1379
1380         // if we don't have a reasonable peerview, we continue to try harder.
1381         int maxbootlevel = MAX_BOOTLEVEL - (areWeHappy ? 0 : BOOTLEVEL_INCREMENT);
1382
1383         bootLevel = Math.min(maxbootlevel, bootLevel + increment);
1384
1385         return bootLevel;
1386     }
1387
1388     private synchronized void rescheduleKick(boolean now) {
1389
1390         if (closed) {
1391             return;
1392         }
1393
1394         // Set the next iteration
1395         try {
1396             if (now) {
1397                 bootLevel = MIN_BOOTLEVEL;
1398             } else {
1399                 adjustBootLevel();
1400             }
1401
1402             long tilNextKick = DEFAULT_BOOTSTRAP_KICK_INTERVAL * ((1L << bootLevel) - 1);
1403
1404             if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
1405                 LOG.fine(
1406                         "Scheduling kick in " + (tilNextKick / TimeUtils.ASECOND) + " seconds at bootLevel " + bootLevel
1407                         + " in group " + group.getPeerGroupID());
1408             }
1409
1410             KickerTask task = new KickerTask();
1411
1412             addTask(task, tilNextKick, -1);
1413         } catch (Exception ez1) {
1414             if (Logging.SHOW_SEVERE && LOG.isLoggable(Level.SEVERE)) {
1415                 LOG.log(Level.SEVERE, "Cannot set timer. RPV will not work.", ez1);
1416             }
1417         }
1418     }
1419
1420     /**
1421      * Refresh the local copy of the peer advertisement and the rendezvous
1422      * advertisement.
1423      */
1424     private void refreshSelf() {
1425
1426         RdvAdvertisement radv;
1427
1428         synchronized (this) {
1429             PeerAdvertisement newPadv = group.getPeerAdvertisement();
1430             int newModCount = newPadv.getModCount();
1431
1432             if ((lastPeerAdv != newPadv) || (lastModCount != newModCount)) {
1433                 lastPeerAdv = newPadv;
1434                 lastModCount = newModCount;
1435
1436                 // create a new local RdvAdvertisement and set it to self.
1437                 radv = createRdvAdvertisement(lastPeerAdv, name);
1438
1439                 if (radv != null) {
1440                     self.setRdvAdvertisement(radv);
1441                 }
1442             }
1443         }
1444     }
1445
1446     static RdvAdvertisement createRdvAdvertisement(PeerAdvertisement padv, String serviceName) {
1447
1448         try {
1449             // FIX ME: 10/19/2002 lomax@jxta.org. We need to properly set up the service ID. Unfortunately
1450             // this current implementation of the PeerView takes a String as a service name and not its ID.
1451             // Since currently, there is only PeerView per group (all peerviews share the same "service", this
1452             // is not a problem, but that will have to be fixed eventually.
1453
1454             // create a new RdvAdvertisement
1455             RdvAdvertisement rdv = (RdvAdvertisement) AdvertisementFactory.newAdvertisement(
1456                     RdvAdvertisement.getAdvertisementType());
1457
1458             rdv.setPeerID(padv.getPeerID());
1459             rdv.setGroupID(padv.getPeerGroupID());
1460             rdv.setServiceName(serviceName);
1461             rdv.setName(padv.getName());
1462
1463             RouteAdvertisement ra = EndpointUtils.extractRouteAdv(padv);
1464
1465             // Insert it into the RdvAdvertisement.
1466             rdv.setRouteAdv(ra);
1467
1468             return rdv;
1469         } catch (Exception ez) {
1470             if (Logging.SHOW_WARNING && LOG.isLoggable(Level.WARNING)) {
1471                 LOG.log(Level.WARNING, "Cannot create Local RdvAdvertisement: ", ez);
1472             }
1473             return null;
1474         }
1475     }
1476
1477     /**
1478      * Add a listener for PeerViewEvent
1479      *
1480      * @param listener An PeerViewListener to process the event.
1481      * @return  true if successful
1482      */
1483     public boolean addListener(PeerViewListener listener) {
1484         boolean added = rpvListeners.add(listener);
1485
1486         if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
1487             LOG.fine("Registered PeerViewEvent Listener (" + listener.getClass().getName() + ")");
1488         }
1489
1490         return added;
1491     }
1492
1493     /**
1494      * Removes a PeerViewEvent Listener previously added with addListener.
1495      *
1496      * @param listener the PeerViewListener listener remove
1497      * @return whether successful or not
1498      */
1499     public boolean removeListener(PeerViewListener listener) {
1500         boolean removed = rpvListeners.remove(listener);
1501
1502         if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
1503             LOG.fine("Removed PeerViewEvent Listener (" + listener.getClass().getName() + ")");
1504         }
1505
1506         return removed;
1507     }
1508
1509     /**
1510      * Generate a PeerView Event and notify all listeners.
1511      *
1512      * @param type    the Event Type.
1513      * @param element The peer having the event.
1514      */
1515     private void generateEvent(int type, PeerViewElement element) {
1516
1517         PeerViewEvent newevent = new PeerViewEvent(this, type, element);
1518
1519         if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
1520             LOG.fine("Calling listeners for " + newevent + " in group " + group.getPeerGroupID());
1521         }
1522
1523         for (Object o : Arrays.asList(rpvListeners.toArray())) {
1524             PeerViewListener pvl = (PeerViewListener) o;
1525
1526             try {
1527                 pvl.peerViewEvent(newevent);
1528             } catch (Throwable ignored) {
1529                 if (Logging.SHOW_SEVERE && LOG.isLoggable(Level.SEVERE)) {
1530                     LOG.log(Level.SEVERE, "Uncaught Throwable in PeerViewEvent listener : (" + pvl.getClass().getName() + ")"
1531                             ,
1532                             ignored);
1533                 }
1534             }
1535         }
1536     }
1537
1538     static PipeAdvertisement makeWirePipeAdvertisement(PeerGroup destGroup, PeerGroup group, String name) {
1539
1540         PipeAdvertisement adv = (PipeAdvertisement) AdvertisementFactory.newAdvertisement(PipeAdvertisement.getAdvertisementType());
1541
1542         // Create a pipe advertisement for this group.
1543         // Generate a well known but unique ID.
1544         // FIXME bondolo 20040507 The ID created is really poor, it has only
1545         // 2 unique bytes on average. it would be much better to hash something
1546         // also, since the the definition of how to use the seed bytes is not
1547         // fixed, it's not reliable.
1548         PipeID pipeId = IDFactory.newPipeID(destGroup.getPeerGroupID()
1549                 ,
1550                 (SERVICE_NAME + group.getPeerGroupID().getUniqueValue().toString() + name).getBytes());
1551
1552         adv.setPipeID(pipeId);
1553         adv.setType(PipeService.PropagateType);
1554         adv.setName(SERVICE_NAME + " pipe for " + group.getPeerGroupID());
1555
1556         return adv;
1557     }
1558
1559     private synchronized void openWirePipes() {
1560
1561         PipeService pipes = group.getPipeService();
1562
1563         if (null == pipes) {
1564             scheduleOpenPipes(TimeUtils.ASECOND); // Try again in one second.
1565             return;
1566         }
1567
1568         try {
1569             // First, listen to in our own PeerGroup
1570             if (null == localGroupWirePipeInputPipe) {
1571                 localGroupWirePipeInputPipe = pipes.createInputPipe(localGroupWirePipeAdv, new WirePipeListener());
1572             }
1573
1574             if (null == localGroupWirePipeOutputPipe) {
1575                 // Creates the OutputPipe - note that timeout is irrelevant for
1576                 // propagated pipe.
1577
1578                 localGroupWirePipeOutputPipe = pipes.createOutputPipe(localGroupWirePipeAdv, 1 * TimeUtils.ASECOND);
1579             }
1580
1581             if (localGroupWirePipeOutputPipe == null) {
1582                 if (Logging.SHOW_WARNING && LOG.isLoggable(Level.WARNING)) {
1583                     LOG.warning("Cannot get OutputPipe for current group");
1584                 }
1585             }
1586         } catch (Exception failed) {
1587             if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
1588                 LOG.fine("PipeService not ready yet. Trying again in 1 second.");
1589             }
1590             // Try again in one second.
1591             scheduleOpenPipes(TimeUtils.ASECOND);
1592             return;
1593         }
1594
1595         if (advertisingGroup != null) {
1596             try {
1597                 pipes = advertisingGroup.getPipeService();
1598
1599                 if (null == pipes) {
1600                     // Try again in one second.
1601                     scheduleOpenPipes(TimeUtils.ASECOND);
1602                     return;
1603                 }
1604
1605                 if (null == wirePipeInputPipe) {
1606                     wirePipeInputPipe = pipes.createInputPipe(advGroupPropPipeAdv, new WirePipeListener());
1607                 }
1608
1609                 if (null == wirePipeOutputPipe) {
1610                     wirePipeOutputPipe = pipes.createOutputPipe(advGroupPropPipeAdv, 1 * TimeUtils.ASECOND);
1611                 }
1612
1613                 if (wirePipeOutputPipe == null) {
1614                     if (Logging.SHOW_WARNING && LOG.isLoggable(Level.WARNING)) {
1615                         LOG.warning("Cannot get OutputPipe for current group");
1616                     }
1617                 }
1618             } catch (Exception failed) {
1619                 if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
1620                     LOG.fine("Could not open pipes in local group. Trying again in 1 second.");
1621                 }
1622                 // Try again in one second.
1623                 scheduleOpenPipes(TimeUtils.ASECOND);
1624                 return;
1625             }
1626         }
1627
1628         if (Logging.SHOW_INFO && LOG.isLoggable(Level.INFO)) {
1629             LOG.info("Propagate Pipes opened.");
1630         }
1631     }
1632
1633     private synchronized void closeWirePipes() {
1634
1635         if (localGroupWirePipeInputPipe != null) {
1636             localGroupWirePipeInputPipe.close();
1637             localGroupWirePipeInputPipe = null;
1638         }
1639
1640         if (localGroupWirePipeOutputPipe != null) {
1641             localGroupWirePipeOutputPipe.close();
1642             localGroupWirePipeOutputPipe = null;
1643         }
1644
1645         if (wirePipeInputPipe != null) {
1646             wirePipeInputPipe.close();
1647             wirePipeInputPipe = null;
1648         }
1649
1650         if (wirePipeOutputPipe != null) {
1651             wirePipeOutputPipe.close();
1652             wirePipeOutputPipe = null;
1653         }
1654
1655         if (Logging.SHOW_INFO && LOG.isLoggable(Level.INFO)) {
1656             LOG.info("Propagate Pipes closed.");
1657         }
1658     }
1659
1660     /**
1661      * Adapter class for receiving wire pipe messages
1662      */
1663     private class WirePipeListener implements PipeMsgListener {
1664
1665         /**
1666          * {@inheritDoc}
1667          */
1668         public void pipeMsgEvent(PipeMsgEvent event) {
1669
1670             Message msg = event.getMessage();
1671
1672             boolean failure = (null != msg.getMessageElement(MESSAGE_NAMESPACE, FAILURE_ELEMENT_NAME));
1673             boolean response = (null != msg.getMessageElement(MESSAGE_NAMESPACE, RESPONSE_ELEMENT_NAME));
1674
1675             if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
1676                 LOG.fine(
1677                         "Received a PeerView " + (failure ? "failure " : "") + (response ? "response " : "") + "message [" + msg
1678                         + "] on propagated pipe " + event.getPipeID());
1679             }
1680
1681             if (!failure && !response) {
1682
1683                 // If this is not a failure message then decide if we will respond.
1684                 //
1685                 // We play a game that is tuned by the view size so that the expectation of number of responses is equal to
1686                 // minHappyPeerView. The game is to draw a number between 0 and the pv size.  If the result is < minHappyPeerView,
1687                 // then we win (respond) else we lose (stay silent). The probability of winning is HAPPY_SIZE/viewsize. If each of
1688                 // the viewsize peers plays the same game, on average HAPPY_SIZE of them win (with a significant variance, but
1689                 // that is good enough). If viewsize is <= HAPPY_SIZE, then all respond.  This is approximate, of course, since
1690                 // the view size is not always consistent among peers.
1691
1692                 int viewsize = PeerView.this.localView.size();
1693
1694                 if (viewsize > minHappyPeerView) {
1695                     int randinview = random.nextInt(viewsize);
1696
1697                     if (randinview >= minHappyPeerView) {
1698                         if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
1699                             LOG.fine("Ignoring " + msg + " from pipe " + event.getPipeID());
1700                         }
1701                         // We "lose".
1702                         return;
1703                     }
1704                 } // Else, we always win; don't bother playing.
1705             }
1706
1707             // Fabricate dummy src and dst addrs so that we can call processIncoming. These are
1708             // only used for traces. The merit of using the pipeID is that it is recognizable
1709             // in these traces.
1710             EndpointAddress src = new EndpointAddress(event.getPipeID(), SERVICE_NAME, null);
1711             EndpointAddress dest = new EndpointAddress(event.getPipeID(), SERVICE_NAME, null);
1712
1713             try {
1714                 // call the peerview.
1715                 PeerView.this.processIncomingMessage(msg, src, dest);
1716             } catch (Throwable ez) {
1717                 if (Logging.SHOW_WARNING && LOG.isLoggable(Level.WARNING)) {
1718                     LOG.log(Level.WARNING, "Failed processing " + msg + " from pipe " + event.getPipeID(), ez);
1719                 }
1720             }
1721         }
1722     }
1723
1724     private synchronized void scheduleAdvertisingGroupQuery(long delay) {
1725
1726         if (closed) {
1727             return;
1728         }
1729
1730         TimerTask task = new AdvertisingGroupQueryTask();
1731
1732         addTask(task, delay, -1);
1733     }
1734
1735     /**
1736      * Class implementing the query request on the AdvertisingGroup
1737      */
1738     private final class AdvertisingGroupQueryTask extends TimerTask {
1739
1740         /**
1741          * {@inheritDoc}
1742          */
1743         @Override
1744         public boolean cancel() {
1745             boolean res = super.cancel();
1746             return res;
1747         }
1748
1749         /**
1750          * {@inheritDoc}
1751          */
1752         @Override
1753         public void run() {
1754             try {
1755                 if (closed) {
1756                     return;
1757                 }
1758
1759                 OutputPipe op = wirePipeOutputPipe;
1760
1761                 if (null != op) {
1762                     Message msg = makeMessage(self, false, false);
1763
1764                     op.send(msg);
1765                 }
1766             } catch (Throwable all) {
1767                 if (Logging.SHOW_SEVERE && LOG.isLoggable(Level.SEVERE)) {
1768                     LOG.log(Level.SEVERE, "Uncaught Throwable in thread :" + Thread.currentThread().getName(), all);
1769                 }
1770             } finally {
1771                 removeTask(this);
1772             }
1773         }
1774     }
1775
1776     /**
1777      * Get a copy of the PeerView for this group.
1778      *
1779      * @return A SortedSet which is the current local view of the peerview
1780      */
1781     public SortedSet<PeerViewElement> getView() {
1782         synchronized (localView) {
1783             return new TreeSet<PeerViewElement>((SortedSet)localView);
1784         }
1785     }
1786
1787     /**
1788      * Add the provided element to the local peerview.
1789      *
1790      * @param pve the <code>PeerViewElement</code> to add.
1791      * @return <code>true</true> if the element was not present and added
1792      *         otherwise <code>false</code>.
1793      */
1794     private boolean addPeerViewElement(PeerViewElement pve) {
1795         boolean added;
1796
1797         if (null == pve.getRdvAdvertisement()) {
1798             throw new IllegalStateException("Cannot add a seed pve to local view");
1799         }
1800
1801         synchronized (localView) {
1802             added = localView.add(pve);
1803
1804             if (added) {
1805                 // Refresh, if necessary, our up and down peers.
1806                 updateUpAndDownPeers();
1807             }
1808         }
1809
1810         if (added) {
1811             pve.setPeerView(this);
1812         }
1813
1814         return added;
1815     }
1816
1817     /**
1818      * Remove the provided element from the local peerview.
1819      *
1820      * @param pve the <code>PeerViewElement</code> to remove.
1821      * @return <code>true</true> if the element was present and removed
1822      *         otherwise <code>false</code>.
1823      */
1824     private boolean removePeerViewElement(PeerViewElement pve) {
1825         boolean removed;
1826
1827         synchronized (localView) {
1828             removed = localView.remove(pve);
1829
1830             if (removed) {
1831                 // Refresh, if necessary, our up and down peers.
1832                 updateUpAndDownPeers();
1833             }
1834         }
1835
1836         if (removed) {
1837             pve.setPeerView(null);
1838         }
1839
1840         return removed;
1841     }
1842
1843     /**
1844      * Return from the local view, the PeerViewElement that is equal to the
1845      * given PeerViewDestination, if one exists or <code>null</code> if it is
1846      * not present. Identity is defined by {@link PeerViewDestination#equals}
1847      * which only looks at the destination address. Thus a PeerViewDestination
1848      * is enough. A full PeerViewElement may be passed as well.  This method
1849      * does not require external synchronization.
1850      *
1851      * @param wanted PeerViewDestination matching the desired one.
1852      * @return the matching PeerViewElement or <code>null</code> if it could not
1853      *         be found.
1854      */
1855     public PeerViewElement getPeerViewElement(PeerViewDestination wanted) {
1856
1857         try {
1858             PeerViewElement found = (PeerViewElement) localView.tailSet(wanted).first();
1859
1860             if (wanted.equals(found)) {
1861                 return found;
1862             }
1863         } catch (NoSuchElementException nse) {// This can happen if the tailset is empty. We could test for it,
1864             // but it could still become empty after the test, since it reflects
1865             // concurrent changes to localView. Not worth synchronizing for that
1866             // rare occurence. The end-result is still correct.
1867         }
1868
1869         return null;
1870     }
1871
1872     /**
1873      * Get from the local view, the PeerViewElement for the given PeerID, if one
1874      * exists. Null otherwise. This method does not require external
1875      * synchronization.
1876      *
1877      * @param pid the PeerID of the desired element.
1878      * @return the matching PeerViewElement null if it could not be found.
1879      */
1880     public PeerViewElement getPeerViewElement(ID pid) {
1881
1882         return getPeerViewElement(new PeerViewDestination(pid));
1883     }
1884
1885     /**
1886      * Get the down peer from the local peer.
1887      *
1888      * @return the down PeerViewElement or null if there is no such peer.
1889      */
1890     public PeerViewElement getDownPeer() {
1891         return downPeer;
1892     }
1893
1894     /**
1895      * Get the local peer.
1896      *
1897      * @return the local PeerViewElement
1898      */
1899     public PeerViewElement getSelf() {
1900         return self;
1901     }
1902
1903     /**
1904      * Get the up peer from the local peer.
1905      *
1906      * @return the up PeerViewElement or null if there is no such peer.
1907      */
1908     public PeerViewElement getUpPeer() {
1909         return upPeer;
1910     }
1911
1912     /**
1913      * update Up and Down Peers
1914      */
1915     private void updateUpAndDownPeers() {
1916
1917         synchronized (localView) {
1918             final PeerViewElement oldDown = downPeer;
1919             final PeerViewElement oldUp = upPeer;
1920
1921             SortedSet<PeerViewDestination> headSet = localView.headSet(self);
1922
1923             if (!headSet.isEmpty()) {
1924                 downPeer = (PeerViewElement) headSet.last();
1925             } else {
1926                 downPeer = null;
1927             }
1928
1929             SortedSet<PeerViewDestination> tailSet = localView.tailSet(self);
1930
1931             if (!tailSet.isEmpty()) {
1932                 if (self.equals(tailSet.first())) {
1933                     Iterator eachTail = tailSet.iterator();
1934
1935                     eachTail.next(); // self
1936
1937                     if (eachTail.hasNext()) {
1938                         upPeer = (PeerViewElement) eachTail.next();
1939                     } else {
1940                         upPeer = null;
1941                     }
1942                 } else {
1943                     upPeer = (PeerViewElement) tailSet.first();
1944                 }
1945             } else {
1946                 upPeer = null;
1947             }
1948
1949             if ((oldDown != downPeer) && (downPeer != null)) {
1950                 downPeer.setLastUpdateTime(TimeUtils.timeNow());
1951             }
1952
1953             if ((oldUp != upPeer) && (upPeer != null)) {
1954                 upPeer.setLastUpdateTime(TimeUtils.timeNow());
1955             }
1956         }
1957     }
1958
1959     /**
1960      * A task that checks on upPeer and downPeer.
1961      */
1962     private final class WatchdogTask extends TimerTask {
1963
1964         /**
1965          *  The number of iterations that the watchdog task has executed.
1966          */
1967         int iterations = 0;
1968         
1969         WatchdogTask() {}
1970
1971         /**
1972          * {@inheritDoc}
1973          */
1974         @Override
1975         public void run() {
1976             try {
1977                 if (closed) {
1978                     return;
1979                 }
1980
1981                 if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
1982                     LOG.fine("Watchdog task executing for group " + PeerView.this.group.getPeerGroupID());
1983                 }
1984
1985                 refreshSelf();
1986                 
1987                 if(0 == (iterations % 5)) {
1988                     DiscoveryService discovery = group.getDiscoveryService();
1989                     if(null != discovery) {
1990                         discovery.publish(self.getRdvAdvertisement(), WATCHDOG_PERIOD * 10, WATCHDOG_PERIOD * 5);
1991                     }
1992                 }
1993                 
1994                 PeerViewElement up = PeerView.this.getUpPeer();
1995
1996                 if (up != null) {
1997                     if (TimeUtils.toRelativeTimeMillis(TimeUtils.timeNow(), up.getLastUpdateTime()) > WATCHDOG_GRACE_DELAY) {
1998                         if (Logging.SHOW_WARNING && LOG.isLoggable(Level.WARNING)) {
1999                             LOG.warning("UP peer has gone MIA : " + up);
2000                         }
2001
2002                         notifyFailure(up, true);
2003
2004                     } else {
2005                         if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
2006                             LOG.fine("Checking on UP peer : " + up);
2007                         }
2008
2009                         PeerView.this.send(up, PeerView.this.getSelf(), false, false);
2010                     }
2011                 }
2012
2013                 PeerViewElement down = PeerView.this.getDownPeer();
2014
2015                 if (down != null) {
2016                     if (TimeUtils.toRelativeTimeMillis(TimeUtils.timeNow(), down.getLastUpdateTime()) > WATCHDOG_GRACE_DELAY) {
2017                         if (Logging.SHOW_WARNING && LOG.isLoggable(Level.WARNING)) {
2018                             LOG.warning("DOWN peer has gone MIA : " + down);
2019                         }
2020
2021                         notifyFailure(down, true);
2022
2023                     } else {
2024                         if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
2025                             LOG.fine("Checking on DOWN peer : " + down);
2026                         }
2027
2028                         PeerView.this.send(down, PeerView.this.getSelf(), false, false);
2029                     }
2030                 }
2031             } catch (Throwable all) {
2032                 if (Logging.SHOW_SEVERE && LOG.isLoggable(Level.SEVERE)) {
2033                     LOG.log(Level.SEVERE, "Uncaught Throwable in thread :" + Thread.currentThread().getName(), all);
2034                 }
2035             }
2036             
2037             iterations++;
2038         }
2039     }
2040
2041
2042     /**
2043      * Class implementing the kicker
2044      */
2045     private final class KickerTask extends TimerTask {
2046
2047         /**
2048          * {@inheritDoc}
2049          */
2050         @Override
2051         public void run() {
2052             try {
2053                 if (closed) {
2054                     return;
2055                 }
2056
2057                 PeerView.this.kick();
2058             } catch (Throwable all) {
2059                 if (Logging.SHOW_SEVERE && LOG.isLoggable(Level.SEVERE)) {
2060                     LOG.log(Level.SEVERE, "Uncaught Throwable in thread : " + Thread.currentThread().getName(), all);
2061                 }
2062             } finally {
2063                 removeTask(this);
2064             }
2065         }
2066     }
2067 }