]> sjero.net Git - linphone/blob - p2pproxy/dependencies-src/jxse-src-2.5/impl/src/net/jxta/impl/rendezvous/StdRendezVousService.java
f0c1153f9a2587a24d630c537013eb2e97cfe758
[linphone] / p2pproxy / dependencies-src / jxse-src-2.5 / impl / src / net / jxta / impl / rendezvous / StdRendezVousService.java
1 /*
2  * Copyright (c) 2001-2007 Sun Microsystems, Inc.  All rights reserved.
3  *  
4  *  The Sun Project JXTA(TM) Software License
5  *  
6  *  Redistribution and use in source and binary forms, with or without 
7  *  modification, are permitted provided that the following conditions are met:
8  *  
9  *  1. Redistributions of source code must retain the above copyright notice,
10  *     this list of conditions and the following disclaimer.
11  *  
12  *  2. Redistributions in binary form must reproduce the above copyright notice, 
13  *     this list of conditions and the following disclaimer in the documentation 
14  *     and/or other materials provided with the distribution.
15  *  
16  *  3. The end-user documentation included with the redistribution, if any, must 
17  *     include the following acknowledgment: "This product includes software 
18  *     developed by Sun Microsystems, Inc. for JXTA(TM) technology." 
19  *     Alternately, this acknowledgment may appear in the software itself, if 
20  *     and wherever such third-party acknowledgments normally appear.
21  *  
22  *  4. The names "Sun", "Sun Microsystems, Inc.", "JXTA" and "Project JXTA" must 
23  *     not be used to endorse or promote products derived from this software 
24  *     without prior written permission. For written permission, please contact 
25  *     Project JXTA at http://www.jxta.org.
26  *  
27  *  5. Products derived from this software may not be called "JXTA", nor may 
28  *     "JXTA" appear in their name, without prior written permission of Sun.
29  *  
30  *  THIS SOFTWARE IS PROVIDED ``AS IS'' AND ANY EXPRESSED OR IMPLIED WARRANTIES,
31  *  INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND 
32  *  FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL SUN 
33  *  MICROSYSTEMS OR ITS CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, 
34  *  INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT 
35  *  LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, 
36  *  OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF 
37  *  LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING 
38  *  NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, 
39  *  EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
40  *  
41  *  JXTA is a registered trademark of Sun Microsystems, Inc. in the United 
42  *  States and other countries.
43  *  
44  *  Please see the license information page at :
45  *  <http://www.jxta.org/project/www/license.html> for instructions on use of 
46  *  the license in source files.
47  *  
48  *  ====================================================================
49  *  
50  *  This software consists of voluntary contributions made by many individuals 
51  *  on behalf of Project JXTA. For more information on Project JXTA, please see 
52  *  http://www.jxta.org.
53  *  
54  *  This license is based on the BSD license adopted by the Apache Foundation. 
55  */
56 package net.jxta.impl.rendezvous;
57
58 import net.jxta.endpoint.EndpointAddress;
59 import net.jxta.endpoint.EndpointListener;
60 import net.jxta.endpoint.Message;
61 import net.jxta.endpoint.Messenger;
62 import net.jxta.endpoint.TextDocumentMessageElement;
63 import net.jxta.id.ID;
64 import net.jxta.id.IDFactory;
65 import net.jxta.impl.endpoint.EndpointUtils;
66 import net.jxta.impl.rendezvous.rdv.RdvPeerRdvService;
67 import net.jxta.impl.rendezvous.rendezvousMeter.RendezvousMeterBuildSettings;
68 import net.jxta.impl.rendezvous.rpv.PeerViewElement;
69 import net.jxta.logging.Logging;
70 import net.jxta.peergroup.PeerGroup;
71 import net.jxta.protocol.PeerAdvertisement;
72 import net.jxta.protocol.RouteAdvertisement;
73
74 import java.io.IOException;
75 import java.net.URI;
76 import java.net.URISyntaxException;
77 import java.util.Arrays;
78 import java.util.Enumeration;
79 import java.util.List;
80 import java.util.Timer;
81 import java.util.logging.Level;
82 import java.util.logging.Logger;
83
84 /**
85  * Base class for providers which implement the JXTA Standard Rendezvous
86  * Protocol.
87  *
88  * @see <a href="https://jxta-spec.dev.java.net/nonav/JXTAProtocols.html#proto-rvp" target="_blank">JXTA Protocols Specification : Rendezvous Protocol</a>
89  */
90 public abstract class StdRendezVousService extends RendezVousServiceProvider {
91
92     /**
93      * Logger
94      */
95     private final static Logger LOG = Logger.getLogger(StdRendezVousService.class.getName());
96
97     public final static String ConnectRequest = "Connect";
98     public final static String DisconnectRequest = "Disconnect";
99     public final static String ConnectedPeerReply = "ConnectedPeer";
100     public final static String ConnectedLeaseReply = "ConnectedLease";
101     public final static String ConnectedRdvAdvReply = "RdvAdvReply";
102
103     /**
104      * Default Maximum TTL.
105      */
106     protected static final int DEFAULT_MAX_TTL = 200;
107
108     protected final String pName;
109     protected final String pParam;
110
111     /**
112      * The registered handler for messages using the Standard Rendezvous
113      * Protocol.
114      *
115      * @see <a href="https://jxta-spec.dev.java.net/nonav/JXTAProtocols.html#proto-rvp" target="_blank">JXTA Protocols Specification : Rendezvous Protocol
116      */
117     private StdRdvProtocolListener handler;
118
119     protected final Timer timer;
120
121     /**
122      * Interface for listeners to : &lt;assignedID>/<group-unique>
123      */
124     protected interface StdRdvProtocolListener extends EndpointListener {}
125
126     /**
127      * Constructor
128      *
129      * @param group      the PeerGroup
130      * @param rdvService the parent rendezvous service
131      */
132     protected StdRendezVousService(PeerGroup group, RendezVousServiceImpl rdvService) {
133
134         super(group, rdvService);
135
136         MAX_TTL = DEFAULT_MAX_TTL;
137
138         pName = rdvService.getAssignedID().toString();
139         pParam = group.getPeerGroupID().getUniqueValue().toString();
140
141         timer = new Timer("StdRendezVousService Timer for " + group.getPeerGroupID(), true);
142     }
143
144     /**
145      * {@inheritDoc}
146      */
147     protected int startApp(String[] argv, StdRdvProtocolListener handler) {
148
149         this.handler = handler;
150
151         rdvService.endpoint.addIncomingMessageListener(handler, pName, null);
152
153         return super.startApp(argv);
154     }
155
156     /**
157      * {@inheritDoc}
158      */
159     @Override
160     public void stopApp() {
161         EndpointListener shouldbehandler = rdvService.endpoint.removeIncomingMessageListener(pName, null);
162
163         if (handler != shouldbehandler) {
164             if (Logging.SHOW_WARNING && LOG.isLoggable(Level.WARNING)) {
165                 LOG.warning("Unregistered listener was not as expected." + handler + " != " + shouldbehandler);
166             }
167         }
168
169         timer.cancel();
170
171         super.stopApp();
172     }
173
174     /**
175      * {@inheritDoc}
176      */
177     @Override
178     public void processReceivedMessage(Message message, RendezVousPropagateMessage propHdr, EndpointAddress srcAddr, EndpointAddress dstAddr) {
179
180         if (srcAddr.getProtocolName().equalsIgnoreCase("jxta")) {
181             String idstr = ID.URIEncodingName + ":" + ID.URNNamespace + ":" + srcAddr.getProtocolAddress();
182
183             ID peerid;
184             try {
185                 peerid = IDFactory.fromURI(new URI(idstr));
186             } catch (URISyntaxException badID) {
187                 if (Logging.SHOW_WARNING && LOG.isLoggable(Level.WARNING)) {
188                     LOG.log(Level.WARNING, "Bad ID in message", badID);
189                 }
190                 return;
191             }
192
193             if (!group.getPeerID().equals(peerid)) {
194                 PeerConnection pConn = getPeerConnection(peerid);
195
196                 if (null == pConn) {
197                     PeerViewElement pve;
198
199                     if (this instanceof RdvPeerRdvService) {
200                         // cheap hack....
201                         pve = ((RdvPeerRdvService) this).rpv.getPeerViewElement(peerid);
202                     } else {
203                         pve = null;
204                     }
205
206                     if (null == pve) {
207                         if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
208                             LOG.fine("Received " + message + " (" + propHdr.getMsgId() + ") from unrecognized peer : " + peerid);
209                         }
210
211                         propHdr.setTTL(Math.min(propHdr.getTTL(), 3)); // will be reduced during repropagate stage.
212
213                         // FIXME 20040503 bondolo need to add tombstones so that we don't end up spamming disconnects.
214                         if (rdvService.isRendezVous() || (getPeerConnections().length > 0)) {
215                             // edge peers with no rdv should not send disconnect.
216                             sendDisconnect(peerid, null);
217                         }
218                     } else {
219                         if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
220                             LOG.fine("Received " + message + " (" + propHdr.getMsgId() + ") from " + pve);
221                         }
222                     }
223                 } else {
224                     if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
225                         LOG.fine("Received " + message + " (" + propHdr.getMsgId() + ") from " + pConn);
226                     }
227                 }
228             } else {
229                 if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
230                     LOG.fine("Received " + message + " (" + propHdr.getMsgId() + ") from loopback.");
231                 }
232             }
233         } else {
234             if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
235                 LOG.fine("Received " + message + " (" + propHdr.getMsgId() + ") from network -- repropagating with TTL 2");
236             }
237
238             propHdr.setTTL(Math.min(propHdr.getTTL(), 3)); // will be reduced during repropagate stage.
239         }
240         super.processReceivedMessage(message, propHdr, srcAddr, dstAddr);
241     }
242
243     /**
244      * {@inheritDoc}
245      */
246     @Override
247     public void propagate(Enumeration<? extends ID> destPeerIDs, Message msg, String serviceName, String serviceParam, int initialTTL) {
248         msg = msg.clone();
249         int useTTL = Math.min(initialTTL, MAX_TTL);
250
251         if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
252             LOG.fine(
253                     "Propagating " + msg + "(TTL=" + useTTL + ") to :" + "\n\tsvc name:" + serviceName + "\tsvc params:"
254                     + serviceParam);
255         }
256
257         RendezVousPropagateMessage propHdr = updatePropHeader(msg, getPropHeader(msg), serviceName, serviceParam, useTTL);
258
259         if (null != propHdr) {
260             int numPeers = 0;
261
262             try {
263                 while (destPeerIDs.hasMoreElements()) {
264                     ID dest = destPeerIDs.nextElement();
265
266                     try {
267                         PeerConnection pConn = getPeerConnection(dest);
268
269                         // TODO: make use of PeerView connections as well
270                         if (null == pConn) {
271                             if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
272                                 LOG.fine("Sending " + msg + " (" + propHdr.getMsgId() + ") to " + dest);
273                             }
274
275                             EndpointAddress addr = mkAddress(dest, PropSName, PropPName);
276
277                             Messenger messenger = rdvService.endpoint.getMessengerImmediate(addr, null);
278
279                             if (null != messenger) {
280                                 try {
281                                     messenger.sendMessage(msg);
282                                 } catch (IOException ignored) {
283                                     continue;
284                                 }
285                             } else {
286                                 continue;
287                             }
288                         } else {
289                             if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
290                                 LOG.fine("Sending " + msg + " (" + propHdr.getMsgId() + ") to " + pConn);
291                             }
292
293                             if (pConn.isConnected()) {
294                                 pConn.sendMessage(msg.clone(), PropSName, PropPName);
295                             } else {
296                                 continue;
297                             }
298                         }
299                         numPeers++;
300                     } catch (Exception failed) {
301                         if (Logging.SHOW_WARNING && LOG.isLoggable(Level.WARNING)) {
302                             LOG.warning("Failed to send " + msg + " (" + propHdr.getMsgId() + ") to " + dest);
303                         }
304                     }
305                 }
306             } finally {
307                 if (RendezvousMeterBuildSettings.RENDEZVOUS_METERING && (rendezvousMeter != null)) {
308                     rendezvousMeter.propagateToPeers(numPeers);
309                 }
310
311                 if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
312                     LOG.fine("Propagated " + msg + " (" + propHdr.getMsgId() + ") to " + numPeers + " peers.");
313                 }
314             }
315         } else {
316             if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
317                 LOG.fine("Declined to send " + msg + " ( no propHdr )");
318             }
319         }
320     }
321
322     /**
323      * {@inheritDoc}
324      */
325     @Override
326     public void propagateToNeighbors(Message msg, String serviceName, String serviceParam, int initialTTL) throws IOException {
327         msg = msg.clone();
328         int useTTL = Math.min(initialTTL, MAX_TTL);
329
330         if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
331             LOG.fine("Propagating " + msg + "(TTL=" + useTTL + ") to neighbors to :" + "\n\tsvc name:" + serviceName+ "\tsvc params:" + serviceParam);
332         }
333
334         RendezVousPropagateMessage propHdr = updatePropHeader(msg, getPropHeader(msg), serviceName, serviceParam, useTTL);
335
336         if (null != propHdr) {
337             try {
338                 sendToNetwork(msg, propHdr);
339
340                 if (RendezvousMeterBuildSettings.RENDEZVOUS_METERING && (rendezvousMeter != null)) {
341                     rendezvousMeter.propagateToNeighbors();
342                 }
343             } catch (IOException failed) {
344                 if (RendezvousMeterBuildSettings.RENDEZVOUS_METERING && (rendezvousMeter != null)) {
345                     rendezvousMeter.propagateToNeighborsFailed();
346                 }
347
348                 throw failed;
349             }
350         }
351     }
352
353     /**
354      * {@inheritDoc}
355      */
356     @Override
357     protected void repropagate(Message msg, RendezVousPropagateMessage propHdr, String serviceName, String serviceParam) {
358         msg = msg.clone();
359
360         if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
361             LOG.fine("Repropagating " + msg + " (" + propHdr.getMsgId() + ")");
362         }
363
364         if (RendezvousMeterBuildSettings.RENDEZVOUS_METERING && (rendezvousMeter != null)) {
365             rendezvousMeter.receivedMessageRepropagatedInGroup();
366         }
367
368         try {
369             propHdr = updatePropHeader(msg, propHdr, serviceName, serviceParam, MAX_TTL);
370
371             if (null != propHdr) {
372                 // Note (hamada): This is an unnecessary operation, and serves
373                 // no purpose other than the additional loads it imposes on the
374                 // rendezvous.  Local subnet network operations should be (and are)
375                 // sufficient to achieve the goal.
376                 // sendToEachConnection(msg, propHdr);
377                 sendToNetwork(msg, propHdr);
378             } else {
379                 if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
380                     LOG.fine("No propagate header, declining to repropagate " + msg + ")");
381                 }
382             }
383         } catch (Exception ez1) {
384             // Not much we can do
385             if (Logging.SHOW_WARNING && LOG.isLoggable(Level.WARNING)) {
386                 if (propHdr != null) {
387                     LOG.log(Level.WARNING, "Failed to repropagate " + msg + " (" + propHdr.getMsgId() + ")", ez1);
388                 } else {
389                     LOG.log(Level.WARNING, "Could to repropagate " + msg, ez1);
390                 }
391             }
392         }
393     }
394
395     /**
396      * Returns the peer connection or null if not present.
397      *
398      * @param id the node ID
399      * @return PeerConnection the peer connection or null if not present.
400      */
401     public abstract PeerConnection getPeerConnection(ID id);
402
403     /**
404      * Returns an array of the current peer connections.
405      *
406      * @return An array of the current peer connections.
407      */
408     protected abstract PeerConnection[] getPeerConnections();
409
410     /**
411      * Sends to all connected peers.
412      * <p/>
413      * Note: The original msg is not modified and may be reused upon return.
414      *
415      * @param msg     The message to be sent.
416      * @param propHdr The propagation header associated with the message.
417      * @return the number of nodes the message was sent to
418      */
419     protected int sendToEachConnection(Message msg, RendezVousPropagateMessage propHdr) {
420         List<PeerConnection> peers = Arrays.asList(getPeerConnections());
421         int sentToPeers = 0;
422
423         if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
424             LOG.fine("Sending " + msg + "(" + propHdr.getMsgId() + ") to " + peers.size() + " peers.");
425         }
426
427         for (PeerConnection pConn : peers) {
428             // Check if this rendezvous has already processed this propagated message.
429             if (!pConn.isConnected()) {
430                 if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
431                     LOG.fine("Skipping " + pConn + " for " + msg + "(" + propHdr.getMsgId() + ") -- disconnected.");
432                 }
433                 // next!
434                 continue;
435             }
436
437             if (propHdr.isVisited(pConn.getPeerID().toURI())) {
438                 if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
439                     LOG.fine("Skipping " + pConn + " for " + msg + "(" + propHdr.getMsgId() + ") -- already visited.");
440                 }
441                 // next!
442                 continue;
443             }
444
445             if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
446                 LOG.fine("Sending " + msg + "(" + propHdr.getMsgId() + ") to " + pConn);
447             }
448
449             if (pConn.sendMessage(msg.clone(), PropSName, PropPName)) {
450                 sentToPeers++;
451             }
452         }
453
454         if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
455             LOG.fine("Sent " + msg + "(" + propHdr.getMsgId() + ") to " + sentToPeers + " of " + peers.size() + " peers.");
456         }
457
458         return sentToPeers;
459     }
460
461     /**
462      * Sends a disconnect message to the specified peer.
463      *
464      * @param peerid The peer to be disconnected.
465      * @param padv   The peer to be disconnected.
466      */
467     protected void sendDisconnect(ID peerid, PeerAdvertisement padv) {
468
469         Message msg = new Message();
470
471         // The request simply includes the local peer advertisement.
472         try {
473             msg.replaceMessageElement("jxta", new TextDocumentMessageElement(DisconnectRequest, getPeerAdvertisementDoc(), null));
474
475             EndpointAddress addr = mkAddress(peerid, null, null);
476
477             RouteAdvertisement hint = null;
478
479             if (null != padv) {
480                 hint = EndpointUtils.extractRouteAdv(padv);
481             }
482
483             Messenger messenger = rdvService.endpoint.getMessengerImmediate(addr, hint);
484
485             if (null == messenger) {
486                 if (Logging.SHOW_WARNING && LOG.isLoggable(Level.WARNING)) {
487                     LOG.warning("Could not get messenger for " + peerid);
488                 }
489                 return;
490             }
491
492             messenger.sendMessage(msg, pName, pParam);
493         } catch (Exception e) {
494             if (Logging.SHOW_WARNING && LOG.isLoggable(Level.WARNING)) {
495                 LOG.log(Level.WARNING, "sendDisconnect failed", e);
496             }
497         }
498     }
499
500     /**
501      * Sends a disconnect message to the specified peer.
502      *
503      * @param pConn The peer to be disconnected.
504      */
505     protected void sendDisconnect(PeerConnection pConn) {
506
507         Message msg = new Message();
508
509         // The request simply includes the local peer advertisement.
510         try {
511             msg.replaceMessageElement("jxta", new TextDocumentMessageElement(DisconnectRequest, getPeerAdvertisementDoc(), null));
512
513             pConn.sendMessage(msg, pName, pParam);
514         } catch (Exception e) {
515             if (Logging.SHOW_WARNING && LOG.isLoggable(Level.WARNING)) {
516                 LOG.log(Level.WARNING, "sendDisconnect failed", e);
517             }
518         }
519     }
520 }