2 * Copyright (c) 2001-2007 Sun Microsystems, Inc. All rights reserved.
4 * The Sun Project JXTA(TM) Software License
6 * Redistribution and use in source and binary forms, with or without
7 * modification, are permitted provided that the following conditions are met:
9 * 1. Redistributions of source code must retain the above copyright notice,
10 * this list of conditions and the following disclaimer.
12 * 2. Redistributions in binary form must reproduce the above copyright notice,
13 * this list of conditions and the following disclaimer in the documentation
14 * and/or other materials provided with the distribution.
16 * 3. The end-user documentation included with the redistribution, if any, must
17 * include the following acknowledgment: "This product includes software
18 * developed by Sun Microsystems, Inc. for JXTA(TM) technology."
19 * Alternately, this acknowledgment may appear in the software itself, if
20 * and wherever such third-party acknowledgments normally appear.
22 * 4. The names "Sun", "Sun Microsystems, Inc.", "JXTA" and "Project JXTA" must
23 * not be used to endorse or promote products derived from this software
24 * without prior written permission. For written permission, please contact
25 * Project JXTA at http://www.jxta.org.
27 * 5. Products derived from this software may not be called "JXTA", nor may
28 * "JXTA" appear in their name, without prior written permission of Sun.
30 * THIS SOFTWARE IS PROVIDED ``AS IS'' AND ANY EXPRESSED OR IMPLIED WARRANTIES,
31 * INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND
32 * FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL SUN
33 * MICROSYSTEMS OR ITS CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
34 * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
35 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA,
36 * OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
37 * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
38 * NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE,
39 * EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
41 * JXTA is a registered trademark of Sun Microsystems, Inc. in the United
42 * States and other countries.
44 * Please see the license information page at :
45 * <http://www.jxta.org/project/www/license.html> for instructions on use of
46 * the license in source files.
48 * ====================================================================
50 * This software consists of voluntary contributions made by many individuals
51 * on behalf of Project JXTA. For more information on Project JXTA, please see
52 * http://www.jxta.org.
54 * This license is based on the BSD license adopted by the Apache Foundation.
56 package net.jxta.impl.rendezvous;
58 import net.jxta.endpoint.EndpointAddress;
59 import net.jxta.endpoint.EndpointListener;
60 import net.jxta.endpoint.Message;
61 import net.jxta.endpoint.Messenger;
62 import net.jxta.endpoint.TextDocumentMessageElement;
63 import net.jxta.id.ID;
64 import net.jxta.id.IDFactory;
65 import net.jxta.impl.endpoint.EndpointUtils;
66 import net.jxta.impl.rendezvous.rdv.RdvPeerRdvService;
67 import net.jxta.impl.rendezvous.rendezvousMeter.RendezvousMeterBuildSettings;
68 import net.jxta.impl.rendezvous.rpv.PeerViewElement;
69 import net.jxta.logging.Logging;
70 import net.jxta.peergroup.PeerGroup;
71 import net.jxta.protocol.PeerAdvertisement;
72 import net.jxta.protocol.RouteAdvertisement;
74 import java.io.IOException;
76 import java.net.URISyntaxException;
77 import java.util.Arrays;
78 import java.util.Enumeration;
79 import java.util.List;
80 import java.util.Timer;
81 import java.util.logging.Level;
82 import java.util.logging.Logger;
85 * Base class for providers which implement the JXTA Standard Rendezvous
88 * @see <a href="https://jxta-spec.dev.java.net/nonav/JXTAProtocols.html#proto-rvp" target="_blank">JXTA Protocols Specification : Rendezvous Protocol</a>
90 public abstract class StdRendezVousService extends RendezVousServiceProvider {
95 private final static Logger LOG = Logger.getLogger(StdRendezVousService.class.getName());
97 public final static String ConnectRequest = "Connect";
98 public final static String DisconnectRequest = "Disconnect";
99 public final static String ConnectedPeerReply = "ConnectedPeer";
100 public final static String ConnectedLeaseReply = "ConnectedLease";
101 public final static String ConnectedRdvAdvReply = "RdvAdvReply";
104 * Default Maximum TTL.
106 protected static final int DEFAULT_MAX_TTL = 200;
108 protected final String pName;
109 protected final String pParam;
112 * The registered handler for messages using the Standard Rendezvous
115 * @see <a href="https://jxta-spec.dev.java.net/nonav/JXTAProtocols.html#proto-rvp" target="_blank">JXTA Protocols Specification : Rendezvous Protocol
117 private StdRdvProtocolListener handler;
119 protected final Timer timer;
122 * Interface for listeners to : <assignedID>/<group-unique>
124 protected interface StdRdvProtocolListener extends EndpointListener {}
129 * @param group the PeerGroup
130 * @param rdvService the parent rendezvous service
132 protected StdRendezVousService(PeerGroup group, RendezVousServiceImpl rdvService) {
134 super(group, rdvService);
136 MAX_TTL = DEFAULT_MAX_TTL;
138 pName = rdvService.getAssignedID().toString();
139 pParam = group.getPeerGroupID().getUniqueValue().toString();
141 timer = new Timer("StdRendezVousService Timer for " + group.getPeerGroupID(), true);
147 protected int startApp(String[] argv, StdRdvProtocolListener handler) {
149 this.handler = handler;
151 rdvService.endpoint.addIncomingMessageListener(handler, pName, null);
153 return super.startApp(argv);
160 public void stopApp() {
161 EndpointListener shouldbehandler = rdvService.endpoint.removeIncomingMessageListener(pName, null);
163 if (handler != shouldbehandler) {
164 if (Logging.SHOW_WARNING && LOG.isLoggable(Level.WARNING)) {
165 LOG.warning("Unregistered listener was not as expected." + handler + " != " + shouldbehandler);
178 public void processReceivedMessage(Message message, RendezVousPropagateMessage propHdr, EndpointAddress srcAddr, EndpointAddress dstAddr) {
180 if (srcAddr.getProtocolName().equalsIgnoreCase("jxta")) {
181 String idstr = ID.URIEncodingName + ":" + ID.URNNamespace + ":" + srcAddr.getProtocolAddress();
185 peerid = IDFactory.fromURI(new URI(idstr));
186 } catch (URISyntaxException badID) {
187 if (Logging.SHOW_WARNING && LOG.isLoggable(Level.WARNING)) {
188 LOG.log(Level.WARNING, "Bad ID in message", badID);
193 if (!group.getPeerID().equals(peerid)) {
194 PeerConnection pConn = getPeerConnection(peerid);
199 if (this instanceof RdvPeerRdvService) {
201 pve = ((RdvPeerRdvService) this).rpv.getPeerViewElement(peerid);
207 if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
208 LOG.fine("Received " + message + " (" + propHdr.getMsgId() + ") from unrecognized peer : " + peerid);
211 propHdr.setTTL(Math.min(propHdr.getTTL(), 3)); // will be reduced during repropagate stage.
213 // FIXME 20040503 bondolo need to add tombstones so that we don't end up spamming disconnects.
214 if (rdvService.isRendezVous() || (getPeerConnections().length > 0)) {
215 // edge peers with no rdv should not send disconnect.
216 sendDisconnect(peerid, null);
219 if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
220 LOG.fine("Received " + message + " (" + propHdr.getMsgId() + ") from " + pve);
224 if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
225 LOG.fine("Received " + message + " (" + propHdr.getMsgId() + ") from " + pConn);
229 if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
230 LOG.fine("Received " + message + " (" + propHdr.getMsgId() + ") from loopback.");
234 if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
235 LOG.fine("Received " + message + " (" + propHdr.getMsgId() + ") from network -- repropagating with TTL 2");
238 propHdr.setTTL(Math.min(propHdr.getTTL(), 3)); // will be reduced during repropagate stage.
240 super.processReceivedMessage(message, propHdr, srcAddr, dstAddr);
247 public void propagate(Enumeration<? extends ID> destPeerIDs, Message msg, String serviceName, String serviceParam, int initialTTL) {
249 int useTTL = Math.min(initialTTL, MAX_TTL);
251 if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
253 "Propagating " + msg + "(TTL=" + useTTL + ") to :" + "\n\tsvc name:" + serviceName + "\tsvc params:"
257 RendezVousPropagateMessage propHdr = updatePropHeader(msg, getPropHeader(msg), serviceName, serviceParam, useTTL);
259 if (null != propHdr) {
263 while (destPeerIDs.hasMoreElements()) {
264 ID dest = destPeerIDs.nextElement();
267 PeerConnection pConn = getPeerConnection(dest);
269 // TODO: make use of PeerView connections as well
271 if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
272 LOG.fine("Sending " + msg + " (" + propHdr.getMsgId() + ") to " + dest);
275 EndpointAddress addr = mkAddress(dest, PropSName, PropPName);
277 Messenger messenger = rdvService.endpoint.getMessengerImmediate(addr, null);
279 if (null != messenger) {
281 messenger.sendMessage(msg);
282 } catch (IOException ignored) {
289 if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
290 LOG.fine("Sending " + msg + " (" + propHdr.getMsgId() + ") to " + pConn);
293 if (pConn.isConnected()) {
294 pConn.sendMessage(msg.clone(), PropSName, PropPName);
300 } catch (Exception failed) {
301 if (Logging.SHOW_WARNING && LOG.isLoggable(Level.WARNING)) {
302 LOG.warning("Failed to send " + msg + " (" + propHdr.getMsgId() + ") to " + dest);
307 if (RendezvousMeterBuildSettings.RENDEZVOUS_METERING && (rendezvousMeter != null)) {
308 rendezvousMeter.propagateToPeers(numPeers);
311 if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
312 LOG.fine("Propagated " + msg + " (" + propHdr.getMsgId() + ") to " + numPeers + " peers.");
316 if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
317 LOG.fine("Declined to send " + msg + " ( no propHdr )");
326 public void propagateToNeighbors(Message msg, String serviceName, String serviceParam, int initialTTL) throws IOException {
328 int useTTL = Math.min(initialTTL, MAX_TTL);
330 if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
331 LOG.fine("Propagating " + msg + "(TTL=" + useTTL + ") to neighbors to :" + "\n\tsvc name:" + serviceName+ "\tsvc params:" + serviceParam);
334 RendezVousPropagateMessage propHdr = updatePropHeader(msg, getPropHeader(msg), serviceName, serviceParam, useTTL);
336 if (null != propHdr) {
338 sendToNetwork(msg, propHdr);
340 if (RendezvousMeterBuildSettings.RENDEZVOUS_METERING && (rendezvousMeter != null)) {
341 rendezvousMeter.propagateToNeighbors();
343 } catch (IOException failed) {
344 if (RendezvousMeterBuildSettings.RENDEZVOUS_METERING && (rendezvousMeter != null)) {
345 rendezvousMeter.propagateToNeighborsFailed();
357 protected void repropagate(Message msg, RendezVousPropagateMessage propHdr, String serviceName, String serviceParam) {
360 if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
361 LOG.fine("Repropagating " + msg + " (" + propHdr.getMsgId() + ")");
364 if (RendezvousMeterBuildSettings.RENDEZVOUS_METERING && (rendezvousMeter != null)) {
365 rendezvousMeter.receivedMessageRepropagatedInGroup();
369 propHdr = updatePropHeader(msg, propHdr, serviceName, serviceParam, MAX_TTL);
371 if (null != propHdr) {
372 // Note (hamada): This is an unnecessary operation, and serves
373 // no purpose other than the additional loads it imposes on the
374 // rendezvous. Local subnet network operations should be (and are)
375 // sufficient to achieve the goal.
376 // sendToEachConnection(msg, propHdr);
377 sendToNetwork(msg, propHdr);
379 if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
380 LOG.fine("No propagate header, declining to repropagate " + msg + ")");
383 } catch (Exception ez1) {
384 // Not much we can do
385 if (Logging.SHOW_WARNING && LOG.isLoggable(Level.WARNING)) {
386 if (propHdr != null) {
387 LOG.log(Level.WARNING, "Failed to repropagate " + msg + " (" + propHdr.getMsgId() + ")", ez1);
389 LOG.log(Level.WARNING, "Could to repropagate " + msg, ez1);
396 * Returns the peer connection or null if not present.
398 * @param id the node ID
399 * @return PeerConnection the peer connection or null if not present.
401 public abstract PeerConnection getPeerConnection(ID id);
404 * Returns an array of the current peer connections.
406 * @return An array of the current peer connections.
408 protected abstract PeerConnection[] getPeerConnections();
411 * Sends to all connected peers.
413 * Note: The original msg is not modified and may be reused upon return.
415 * @param msg The message to be sent.
416 * @param propHdr The propagation header associated with the message.
417 * @return the number of nodes the message was sent to
419 protected int sendToEachConnection(Message msg, RendezVousPropagateMessage propHdr) {
420 List<PeerConnection> peers = Arrays.asList(getPeerConnections());
423 if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
424 LOG.fine("Sending " + msg + "(" + propHdr.getMsgId() + ") to " + peers.size() + " peers.");
427 for (PeerConnection pConn : peers) {
428 // Check if this rendezvous has already processed this propagated message.
429 if (!pConn.isConnected()) {
430 if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
431 LOG.fine("Skipping " + pConn + " for " + msg + "(" + propHdr.getMsgId() + ") -- disconnected.");
437 if (propHdr.isVisited(pConn.getPeerID().toURI())) {
438 if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
439 LOG.fine("Skipping " + pConn + " for " + msg + "(" + propHdr.getMsgId() + ") -- already visited.");
445 if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
446 LOG.fine("Sending " + msg + "(" + propHdr.getMsgId() + ") to " + pConn);
449 if (pConn.sendMessage(msg.clone(), PropSName, PropPName)) {
454 if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
455 LOG.fine("Sent " + msg + "(" + propHdr.getMsgId() + ") to " + sentToPeers + " of " + peers.size() + " peers.");
462 * Sends a disconnect message to the specified peer.
464 * @param peerid The peer to be disconnected.
465 * @param padv The peer to be disconnected.
467 protected void sendDisconnect(ID peerid, PeerAdvertisement padv) {
469 Message msg = new Message();
471 // The request simply includes the local peer advertisement.
473 msg.replaceMessageElement("jxta", new TextDocumentMessageElement(DisconnectRequest, getPeerAdvertisementDoc(), null));
475 EndpointAddress addr = mkAddress(peerid, null, null);
477 RouteAdvertisement hint = null;
480 hint = EndpointUtils.extractRouteAdv(padv);
483 Messenger messenger = rdvService.endpoint.getMessengerImmediate(addr, hint);
485 if (null == messenger) {
486 if (Logging.SHOW_WARNING && LOG.isLoggable(Level.WARNING)) {
487 LOG.warning("Could not get messenger for " + peerid);
492 messenger.sendMessage(msg, pName, pParam);
493 } catch (Exception e) {
494 if (Logging.SHOW_WARNING && LOG.isLoggable(Level.WARNING)) {
495 LOG.log(Level.WARNING, "sendDisconnect failed", e);
501 * Sends a disconnect message to the specified peer.
503 * @param pConn The peer to be disconnected.
505 protected void sendDisconnect(PeerConnection pConn) {
507 Message msg = new Message();
509 // The request simply includes the local peer advertisement.
511 msg.replaceMessageElement("jxta", new TextDocumentMessageElement(DisconnectRequest, getPeerAdvertisementDoc(), null));
513 pConn.sendMessage(msg, pName, pParam);
514 } catch (Exception e) {
515 if (Logging.SHOW_WARNING && LOG.isLoggable(Level.WARNING)) {
516 LOG.log(Level.WARNING, "sendDisconnect failed", e);