2 * Copyright (c) 2001-2007 Sun Microsystems, Inc. All rights reserved.
4 * The Sun Project JXTA(TM) Software License
6 * Redistribution and use in source and binary forms, with or without
7 * modification, are permitted provided that the following conditions are met:
9 * 1. Redistributions of source code must retain the above copyright notice,
10 * this list of conditions and the following disclaimer.
12 * 2. Redistributions in binary form must reproduce the above copyright notice,
13 * this list of conditions and the following disclaimer in the documentation
14 * and/or other materials provided with the distribution.
16 * 3. The end-user documentation included with the redistribution, if any, must
17 * include the following acknowledgment: "This product includes software
18 * developed by Sun Microsystems, Inc. for JXTA(TM) technology."
19 * Alternately, this acknowledgment may appear in the software itself, if
20 * and wherever such third-party acknowledgments normally appear.
22 * 4. The names "Sun", "Sun Microsystems, Inc.", "JXTA" and "Project JXTA" must
23 * not be used to endorse or promote products derived from this software
24 * without prior written permission. For written permission, please contact
25 * Project JXTA at http://www.jxta.org.
27 * 5. Products derived from this software may not be called "JXTA", nor may
28 * "JXTA" appear in their name, without prior written permission of Sun.
30 * THIS SOFTWARE IS PROVIDED ``AS IS'' AND ANY EXPRESSED OR IMPLIED WARRANTIES,
31 * INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND
32 * FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL SUN
33 * MICROSYSTEMS OR ITS CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
34 * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
35 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA,
36 * OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
37 * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
38 * NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE,
39 * EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
41 * JXTA is a registered trademark of Sun Microsystems, Inc. in the United
42 * States and other countries.
44 * Please see the license information page at :
45 * <http://www.jxta.org/project/www/license.html> for instructions on use of
46 * the license in source files.
48 * ====================================================================
50 * This software consists of voluntary contributions made by many individuals
51 * on behalf of Project JXTA. For more information on Project JXTA, please see
52 * http://www.jxta.org.
54 * This license is based on the BSD license adopted by the Apache Foundation.
56 package net.jxta.impl.cm;
import net.jxta.credential.Credential;
import net.jxta.document.MimeMediaType;
import net.jxta.document.StructuredDocument;
import net.jxta.id.ID;
import net.jxta.id.IDFactory;
import net.jxta.impl.protocol.ResolverSrdiMsgImpl;
import net.jxta.impl.protocol.SrdiMessageImpl;
import net.jxta.impl.util.JxtaHash;
import net.jxta.logging.Logging;
import net.jxta.membership.MembershipService;
import net.jxta.peer.PeerID;
import net.jxta.peergroup.PeerGroup;
import net.jxta.protocol.RdvAdvertisement;
import net.jxta.protocol.ResolverQueryMsg;
import net.jxta.protocol.ResolverSrdiMsg;
import net.jxta.protocol.SrdiMessage;
import net.jxta.rendezvous.RendezVousService;
import net.jxta.rendezvous.RendezVousStatus;
import net.jxta.rendezvous.RendezvousEvent;
import net.jxta.rendezvous.RendezvousListener;
import net.jxta.resolver.ResolverService;

import java.beans.PropertyChangeEvent;
import java.beans.PropertyChangeListener;
import java.math.BigInteger;
import java.net.URI;
import java.net.URISyntaxException;
import java.text.MessageFormat;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.SortedSet;
import java.util.TreeSet;
import java.util.Vector;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.logging.Level;
import java.util.logging.Logger;
/**
 * Srdi is a service which provides SRDI functionalities such as:
 * <p/>
 * <ul>
 * <li>pushing of SRDI messages to another peer, or propagating them</li>
 * <li>replication of an SRDI message to other peers in a given peerview</li>
 * <li>given an expression, SRDI provides an independently calculated starting point</li>
 * <li>forwarding a ResolverQuery, taking care of hopCount and random selection</li>
 * <li>registering with the RendezvousService to determine when to share SRDI entries,
 * and whether to push deltas or a full index</li>
 * <li>providing SrdiInterface, the callback through which the using service is asked
 * to push its SRDI entries</li>
 * </ul>
 * <p/>
 * If Srdi is started as a thread it performs periodic SRDI pushes of
 * indices and also has the ability to respond to rendezvous events.
 * <p/>
 * ResolverSrdiMessages define a ttl, to indicate to the receiving service
 * whether to replicate such a message or not.
 * <p/>
 * In addition, a ResolverQuery defines a hopCount to indicate how many
 * hops a query has been forwarded. This element can be used to detect and stop a
 * query forwarding loop: hopCount is checked to ensure a query is not
 * forwarded more than twice.
 *
 * @see <a href="https://jxta-spec.dev.java.net/nonav/JXTAProtocols.html#proto-prp" target="_blank">JXTA Protocols Specification : Peer Resolver Protocol</a>
 */
125 public class Srdi implements Runnable, RendezvousListener {
130 private final static Logger LOG = Logger.getLogger(Srdi.class.getName());
132 private PeerGroup group = null;
133 private String handlername = null;
134 private SrdiInterface srdiService = null;
135 private SrdiIndex srdiIndex;
136 private long connectPollInterval = 0;
137 private long pushInterval = 0;
139 private volatile boolean stop = false;
140 private AtomicBoolean republishSignal = new AtomicBoolean(false);
142 private ResolverService resolver;
143 private MembershipService membership;
144 private final JxtaHash jxtaHash = new JxtaHash();
145 private CredentialListener membershipCredListener = null;
146 private Credential credential = null;
147 private StructuredDocument credentialDoc = null;
148 private final String rdvEventLock;
151 * Random number generator used for random result selection
153 private final static Random random = new Random();
155 // This ought be to configurable/based on a function applied to the rpv size
157 * Replication threshold (minimum number of rdv's in peer view before replication)
159 public final static int RPV_REPLICATION_THRESHOLD = 3;
162 * Listener we use for membership property events.
164 private class CredentialListener implements PropertyChangeListener {
169 public void propertyChange(PropertyChangeEvent evt) {
170 if ("defaultCredential".equals(evt.getPropertyName())) {
171 if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
172 LOG.fine("New default credential event");
175 synchronized (Srdi.this) {
176 credential = (Credential) evt.getNewValue();
177 credentialDoc = null;
178 if (null != credential) {
180 credentialDoc = credential.getDocument(MimeMediaType.XMLUTF8);
181 } catch (Exception all) {
182 if (Logging.SHOW_WARNING && LOG.isLoggable(Level.WARNING)) {
183 LOG.log(Level.WARNING, "Could not generate credential document", all);
/**
 * Callback implemented by the service that uses a Srdi, through which it is
 * asked to push its entries.
 */
public interface SrdiInterface {

    /**
     * Pushes SRDI entries.
     *
     * @param all if {@code true} push all entries, otherwise push only those
     *            which have changed since the last push.
     */
    void pushEntries(boolean all);
}
/**
 * Creates the Srdi helper for a service. The instance registers itself as a
 * rendezvous listener of the group, registers a listener for changes of the
 * membership service's default credential, and captures the current default
 * credential. Periodic pushing only happens once {@link #run()} is executed.
 *
 * @param group               group context to operate in
 * @param handlername         the SRDI handlername
 * @param srdiService         the service utilizing this Srdi, for purposes of
 *                            callback push entries on events such as rdv connect/disconnect, etc.
 * @param srdiIndex           The index instance associated with this service
 * @param connectPollInterval initial timeout before the very first push of entries in milliseconds
 * @param pushInterval        the Interval at which the deltas are pushed in milliseconds
 */
public Srdi(PeerGroup group, String handlername, SrdiInterface srdiService, SrdiIndex srdiIndex, long connectPollInterval, long pushInterval) {

    // every other method dereferences the group field, so it must be set first
    this.group = group;
    this.handlername = handlername;
    this.srdiService = srdiService;
    this.srdiIndex = srdiIndex;
    this.connectPollInterval = connectPollInterval;
    this.pushInterval = pushInterval;
    // a fresh String instance: the monitor belongs to this object only and is
    // not shared with whoever else holds the (possibly interned) handler name
    this.rdvEventLock = new String(handlername);
    membership = group.getMembershipService();

    resolver = group.getResolverService();

    group.getRendezVousService().addListener(this);

    synchronized (this) {
        membershipCredListener = new CredentialListener();
        membership.addPropertyChangeListener("defaultCredential", membershipCredListener);

        try {
            credential = membership.getDefaultCredential();
            credentialDoc = (null != credential) ? credential.getDocument(MimeMediaType.XMLUTF8) : null;
        } catch (Exception all) {
            // best effort: without a credential the messages are still sent
            if (Logging.SHOW_WARNING && LOG.isLoggable(Level.WARNING)) {
                LOG.log(Level.WARNING, "could not get credential", all);
            }
        }
    }
}
256 * stop the current running thread
258 public synchronized void stop() {
266 RendezVousService rendezvous = group.getRendezVousService();
268 if (null != rendezvous) {
269 rendezvous.removeListener(this);
272 membership.removePropertyChangeListener("defaultCredential", membershipCredListener);
273 membershipCredListener = null;
276 synchronized (rdvEventLock) {
277 rdvEventLock.notify();
282 * Replicates a SRDI message to other rendezvous'
283 * entries are replicated by breaking out entries out of the message
284 * and sorted out into rdv distribution bins. after which smaller messages
285 * are sent to other rdv's
287 * @param srdiMsg srdi message to replicate
290 public void replicateEntries(SrdiMessage srdiMsg) {
292 List<PeerID> rpv = getGlobalPeerView();
294 if (srdiMsg.getScope() < SrdiMessage.REPLICATE || !group.isRendezvous() || rpv.size() < RPV_REPLICATION_THRESHOLD) {
298 Iterator allEntries = srdiMsg.getEntries().iterator();
299 Map<PeerID, SrdiMessageImpl> bins = new HashMap<PeerID, SrdiMessageImpl>(rpv.size());
301 while (allEntries.hasNext()) {
302 SrdiMessage.Entry entry = (SrdiMessage.Entry) allEntries.next();
303 PeerID destPeer = getReplicaPeer(srdiMsg.getPrimaryKey() + entry.key + entry.value);
305 if (destPeer == null || destPeer.equals(group.getPeerID())) {
306 // don't replicate message back to ourselves
309 SrdiMessageImpl sm = bins.get(destPeer);
312 sm = new SrdiMessageImpl();
313 sm.setPrimaryKey(srdiMsg.getPrimaryKey());
314 sm.setPeerID(srdiMsg.getPeerID());
315 bins.put(destPeer, sm);
320 for (PeerID destPeer : bins.keySet()) {
321 SrdiMessageImpl msg = bins.get(destPeer);
323 if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
324 LOG.fine("[" + group.getPeerGroupName() + " / " + handlername + "] Forwarding replica Srdi to " + destPeer);
326 pushSrdi(destPeer, msg);
331 * Push an SRDI message to a peer
332 * ttl is 1, and therefore services receiving this message could
333 * choose to replicate this message
335 * @param peer peer to push message to, if peer is null it is
336 * the message is propagated
337 * @param srdi SRDI message to send
339 public void pushSrdi(ID peer, SrdiMessage srdi) {
341 ResolverSrdiMsg resSrdi = new ResolverSrdiMsgImpl(handlername, credential, srdi.toString());
344 resolver.sendSrdi(null, resSrdi);
346 resolver.sendSrdi(peer.toString(), resSrdi);
348 } catch (Exception e) {
349 if (Logging.SHOW_WARNING && LOG.isLoggable(Level.WARNING)) {
350 LOG.log(Level.WARNING, "Failed to send srdi message", e);
356 * Forwards a Query to a specific peer
357 * hopCount is incremented to indicate this query is forwarded
359 * @param peer peerid to forward query to
360 * @param query The query
362 public void forwardQuery(PeerID peer, ResolverQueryMsg query) {
364 query.incrementHopCount();
365 if (query.getHopCount() > 2) {
366 if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
367 LOG.fine("hopCount exceeded. Not forwarding query " + query.getHopCount());
369 // query has been forwarded too many times
372 if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
373 LOG.fine(MessageFormat.format("[{0} / {1}] Forwarding Query to {2}",
374 group.getPeerGroupName(), handlername, peer));
376 resolver.sendQuery(peer.toString(), query);
380 * Forwards a Query to a list of peers
381 * hopCount is incremented to indicate this query is forwarded
383 * @param peers The peerids to forward query to
384 * @param query The query
386 public void forwardQuery(List<PeerID> peers, ResolverQueryMsg query) {
388 query.incrementHopCount();
389 if (query.getHopCount() > 2) {
390 if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
391 LOG.fine(MessageFormat.format("hopCount exceeded not forwarding query {0}", query.getHopCount()));
393 // query has been forwarded too many times
397 for (PeerID destPeer : peers) {
398 if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
399 LOG.fine(MessageFormat.format("[{0} / {1}] Forwarding Query to {2}",
400 group.getPeerGroupName(), handlername, destPeer));
402 resolver.sendQuery(destPeer.toString(), query);
407 * Forwards a Query to a list of peers
408 * if the list of peers exceeds threshold, and random threshold is picked
409 * from <code>peers</code>
410 * hopCount is incremented to indicate this query is forwarded
412 * @param peers The peerids to forward query to
413 * @param query The query
414 * @param threshold number of peers to forward the query to
416 public void forwardQuery(List<PeerID> peers, ResolverQueryMsg query, int threshold) {
418 if (query.getHopCount() > 2) {
419 if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
420 LOG.fine(MessageFormat.format("[{0} / {1}] hopCount exceeded ({2}) not forwarding query.",
421 group.getPeerGroupName(), handlername, query.getHopCount()));
423 // query has been forwarded too many times
426 if (peers.size() <= threshold) {
427 forwardQuery(peers, query);
429 // pick some random entries out of the list
430 List<PeerID> newPeers = randomResult(peers, threshold);
431 forwardQuery(newPeers, query);
436 * returns a random List(threshold) from a given list
438 * @param result starting set
439 * @param threshold sub-set desired
440 * @return sub-list of result
442 protected List<PeerID> randomResult(List<PeerID> result, int threshold) {
443 if (threshold < result.size()) {
444 List<PeerID> res = new ArrayList<PeerID>(threshold);
445 for (int i = 0; i < threshold; i++) {
446 int rand = random.nextInt(result.size());
447 res.add(result.get(rand));
456 * Given an expression return a peer from the list peers in the peerview
457 * this function is used to to give a replication point, and entry point
460 * @param expression expression to derive the mapping from
461 * @return The replicaPeer value
463 public PeerID getReplicaPeer(String expression) {
465 List<PeerID> rpv = getGlobalPeerView();
467 if (rpv.size() >= RPV_REPLICATION_THRESHOLD) {
470 synchronized (jxtaHash) {
471 jxtaHash.update(expression);
472 digest = jxtaHash.getDigestInteger().abs();
474 BigInteger sizeOfSpace = java.math.BigInteger.valueOf(rpv.size());
475 BigInteger sizeOfHashSpace = BigInteger.ONE.shiftLeft(8 * digest.toByteArray().length);
476 int pos = (digest.multiply(sizeOfSpace)).divide(sizeOfHashSpace).intValue();
479 if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
480 LOG.fine(MessageFormat.format("[{0} / {1}] Found a direct peer {2}", group.getPeerGroupName(), handlername, pid));
489 * forward srdi message to another peer
491 * @param peerid PeerID to forward query to
492 * @param srcPid The source originator
493 * @param primaryKey primary key
494 * @param secondarykey secondary key
495 * @param value value of the entry
496 * @param expiration expiration in ms
498 public void forwardSrdiMessage(PeerID peerid, PeerID srcPid, String primaryKey, String secondarykey, String value, long expiration) {
501 SrdiMessageImpl srdi = new SrdiMessageImpl(srcPid, // ttl of 0, avoids additional replication
502 0, primaryKey, secondarykey, value, expiration);
504 ResolverSrdiMsgImpl resSrdi = new ResolverSrdiMsgImpl(handlername, credential, srdi.toString());
506 if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
507 LOG.fine(MessageFormat.format("[{0} / {1}] Forwarding a SRDI messsage of type {2} to {3}", group.getPeerGroupName(),
508 handlername, primaryKey, peerid));
510 resolver.sendSrdi(peerid.toString(), resSrdi);
511 } catch (Exception e) {
512 if (Logging.SHOW_WARNING && LOG.isLoggable(Level.WARNING)) {
513 LOG.log(Level.WARNING, "Failed forwarding SRDI Message", e);
521 @SuppressWarnings("fallthrough")
522 public void rendezvousEvent(RendezvousEvent event) {
524 int theEventType = event.getType();
526 if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
527 LOG.fine(MessageFormat.format("[{0} / {1}] Processing {2}", group.getPeerGroupName(), handlername, event));
530 switch (theEventType) {
532 case RendezvousEvent.RDVCONNECT:
533 synchronized (rdvEventLock) {
534 // wake up the publish thread now.
535 rdvEventLock.notify();
541 case RendezvousEvent.RDVRECONNECT:
542 // No need to wake up the publish thread; reconnect should not force indices to be published.
545 case RendezvousEvent.CLIENTCONNECT:
546 case RendezvousEvent.CLIENTRECONNECT:
547 case RendezvousEvent.BECAMERDV:
548 case RendezvousEvent.BECAMEEDGE:
549 // XXX 20031110 bondolo perhaps becoming edge one should cause it to wake up so that run() switch to
550 // don't do anything.
553 case RendezvousEvent.RDVFAILED:
554 case RendezvousEvent.RDVDISCONNECT:
555 republishSignal.set(true);
558 case RendezvousEvent.CLIENTFAILED:
559 case RendezvousEvent.CLIENTDISCONNECT:
560 // we should flush the cache for the peer
561 synchronized (rdvEventLock) {
562 if (group.isRendezvous() && (srdiIndex != null)) {
563 srdiIndex.remove((PeerID) event.getPeerID());
569 if (Logging.SHOW_WARNING && LOG.isLoggable(Level.WARNING)) {
571 MessageFormat.format("[{0} / {1}] Unexpected RDV event {2}", group.getPeerGroupName(), handlername, event));
580 * Main processing method for the SRDI Worker thread
581 * Send all entries, wait for pushInterval, then send deltas
585 boolean waitingForRdv;
586 boolean republish = true;
590 // upon connection we will have to republish
591 republish |= republishSignal.compareAndSet(true, false);
592 waitingForRdv = group.isRendezvous() || !group.getRendezVousService().isConnectedToRendezVous() ||
593 group.getRendezVousService().getRendezVousStatus() == RendezVousStatus.ADHOC;
595 if (!waitingForRdv) {
596 if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
597 LOG.fine("[" + group.getPeerGroupName() + " / " + handlername + "] Pushing "
598 + (republish ? "all entries" : "deltas"));
601 srdiService.pushEntries(republish);
605 synchronized (rdvEventLock) {
607 rdvEventLock.wait(waitingForRdv ? connectPollInterval : pushInterval);
608 } catch (InterruptedException e) {
609 Thread.interrupted();
613 } catch (Throwable all) {
614 if (Logging.SHOW_SEVERE && LOG.isLoggable(Level.SEVERE)) {
615 LOG.log(Level.SEVERE,
616 "Uncaught Throwable in " + Thread.currentThread().getName() + "[" + group.getPeerGroupName() + " / "
617 + handlername + "]",all);
623 * get the global peerview as the rendezvous service only returns
624 * the peerview without the local RDV peer. We need this
625 * consistent view for the SRDI index if not each RDV will have a
626 * different peerview, off setting the index even when the peerview
629 * @return the sorted list
631 public Vector<PeerID> getGlobalPeerView() {
633 Vector<PeerID> global = new Vector<PeerID>();
634 SortedSet<String> set = new TreeSet<String>();
637 // get the local peerview
638 List<RdvAdvertisement> rpv = group.getRendezVousService().getLocalWalkView();
640 for (RdvAdvertisement padv : rpv) {
641 set.add(padv.getPeerID().toString());
645 set.add(group.getPeerID().toString());
647 // produce a vector of Peer IDs
648 for (String aSet : set) {
650 PeerID peerID = (PeerID) IDFactory.fromURI(new URI(aSet));
652 } catch (URISyntaxException badID) {
653 throw new IllegalArgumentException("Bad PeerID ID in advertisement");
654 } catch (ClassCastException badID) {
655 throw new IllegalArgumentException("ID was not a peerID");
658 } catch (Exception ex) {
659 if (Logging.SHOW_WARNING && LOG.isLoggable(Level.WARNING)) {
660 LOG.log(Level.WARNING, "Failure generating the global view", ex);