]> sjero.net Git - linphone/blob - p2pproxy/dependencies-src/jxse-src-2.5/impl/src/net/jxta/impl/endpoint/relay/RelayServer.java
c62b150c853f303d77883ccc83e1f3ea8c4dc3b5
[linphone] / p2pproxy / dependencies-src / jxse-src-2.5 / impl / src / net / jxta / impl / endpoint / relay / RelayServer.java
1 /*
2  * Copyright (c) 2001-2007 Sun Microsystems, Inc.  All rights reserved.
3  *  
4  *  The Sun Project JXTA(TM) Software License
5  *  
6  *  Redistribution and use in source and binary forms, with or without 
7  *  modification, are permitted provided that the following conditions are met:
8  *  
9  *  1. Redistributions of source code must retain the above copyright notice,
10  *     this list of conditions and the following disclaimer.
11  *  
12  *  2. Redistributions in binary form must reproduce the above copyright notice, 
13  *     this list of conditions and the following disclaimer in the documentation 
14  *     and/or other materials provided with the distribution.
15  *  
16  *  3. The end-user documentation included with the redistribution, if any, must 
17  *     include the following acknowledgment: "This product includes software 
18  *     developed by Sun Microsystems, Inc. for JXTA(TM) technology." 
19  *     Alternately, this acknowledgment may appear in the software itself, if 
20  *     and wherever such third-party acknowledgments normally appear.
21  *  
22  *  4. The names "Sun", "Sun Microsystems, Inc.", "JXTA" and "Project JXTA" must 
23  *     not be used to endorse or promote products derived from this software 
24  *     without prior written permission. For written permission, please contact 
25  *     Project JXTA at http://www.jxta.org.
26  *  
27  *  5. Products derived from this software may not be called "JXTA", nor may 
28  *     "JXTA" appear in their name, without prior written permission of Sun.
29  *  
30  *  THIS SOFTWARE IS PROVIDED ``AS IS'' AND ANY EXPRESSED OR IMPLIED WARRANTIES,
31  *  INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND 
32  *  FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL SUN 
33  *  MICROSYSTEMS OR ITS CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, 
34  *  INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT 
35  *  LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, 
36  *  OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF 
37  *  LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING 
38  *  NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, 
39  *  EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
40  *  
41  *  JXTA is a registered trademark of Sun Microsystems, Inc. in the United 
42  *  States and other countries.
43  *  
44  *  Please see the license information page at :
45  *  <http://www.jxta.org/project/www/license.html> for instructions on use of 
46  *  the license in source files.
47  *  
48  *  ====================================================================
49  *  
50  *  This software consists of voluntary contributions made by many individuals 
51  *  on behalf of Project JXTA. For more information on Project JXTA, please see 
52  *  http://www.jxta.org.
53  *  
54  *  This license is based on the BSD license adopted by the Apache Foundation. 
55  */
56 package net.jxta.impl.endpoint.relay;
57
58 import java.io.File;
59 import java.io.IOException;
60 import java.net.URI;
61 import java.util.ArrayList;
62 import java.util.Arrays;
63 import java.util.Collections;
64 import java.util.HashMap;
65 import java.util.List;
66 import java.util.Map;
67 import java.util.NoSuchElementException;
68 import java.util.Random;
69
70 import net.jxta.discovery.DiscoveryService;
71 import net.jxta.document.Advertisement;
72 import net.jxta.document.AdvertisementFactory;
73 import net.jxta.document.MimeMediaType;
74 import net.jxta.document.XMLDocument;
75 import net.jxta.endpoint.EndpointAddress;
76 import net.jxta.endpoint.EndpointService;
77 import net.jxta.endpoint.Message;
78 import net.jxta.endpoint.MessageElement;
79 import net.jxta.endpoint.MessageSender;
80 import net.jxta.endpoint.Messenger;
81 import net.jxta.endpoint.MessengerEvent;
82 import net.jxta.endpoint.MessengerEventListener;
83 import net.jxta.endpoint.TextDocumentMessageElement;
84 import net.jxta.id.ID;
85 import net.jxta.id.IDFactory;
86 import net.jxta.impl.access.AccessList;
87 import net.jxta.impl.protocol.RelayConfigAdv;
88 import net.jxta.impl.util.TimeUtils;
89 import net.jxta.peer.PeerID;
90 import net.jxta.peergroup.PeerGroup;
91 import net.jxta.peergroup.PeerGroupID;
92 import net.jxta.pipe.InputPipe;
93 import net.jxta.pipe.OutputPipe;
94 import net.jxta.pipe.PipeMsgEvent;
95 import net.jxta.pipe.PipeMsgListener;
96 import net.jxta.pipe.PipeService;
97 import net.jxta.protocol.PeerAdvertisement;
98 import net.jxta.protocol.PipeAdvertisement;
99 import net.jxta.protocol.RdvAdvertisement;
100 import net.jxta.protocol.RouteAdvertisement;
101
102 import java.util.logging.Level;
103 import net.jxta.logging.Logging;
104 import java.util.logging.Logger;
105 import net.jxta.impl.endpoint.EndpointUtils;
106
107 /**
108  * Relay server that maintains outgoing message queues, leases, etc.
109  */
110 public class RelayServer implements MessageSender, MessengerEventListener, Runnable {
111     
112     /**
113      *  Logger
114      */
115     private final static transient Logger LOG = Logger.getLogger(RelayServer.class.getName());
116     
117     private final static int MAX_CACHED_SERVERS = 20;
118     
119     /**
120      * The EndpointService for the RelayService
121      */
122     private EndpointService endpointService;
123     
124     /**
125      * The DiscoveryService for the RelayService
126      */
127     private DiscoveryService discoveryService;
128     
129     /**
130      * The public address is of the form relay://peerId
131      */
132     private final EndpointAddress publicAddress;
133     
134     /**
135      *  Map of the current clients
136      */
137     private final Map<String, RelayServerClient> relayedClients = new HashMap<String, RelayServerClient>();
138     
139     protected final PeerGroup group;
140     protected final String serviceName;
141     private final int maxClients;
142     private final long maxLeaseDuration;
143     private final long stallTimeout;
144     private final int clientQueueSize;
145     private final long minBroadcastInterval;
146     
147     protected final String peerId;
148
149     protected final AccessList acl;
150     protected File aclFile;
151     protected long refreshTime = 0;
152
153     protected long aclFileLastModified = 0;
154     private static final long ACL_REFRESH_PERIOD = 30 * TimeUtils.AMINUTE;
155     
156     protected RelayServerCache relayServerCache;
157     
158     private Thread gcThread = null;
159
160     private MessengerEventListener messengerEventListener = null;
161     
162     /**
163      * constructor
164      */
165     public RelayServer(PeerGroup group, String serviceName, RelayConfigAdv relayConfigAdv) {
166         
167         this.group = group;
168         peerId = group.getPeerID().getUniqueValue().toString();
169         publicAddress = new EndpointAddress(RelayTransport.protocolName, peerId, null, null);
170         
171         this.serviceName = serviceName;
172         
173         this.maxClients = (-1 != relayConfigAdv.getMaxClients())
174                 ? relayConfigAdv.getMaxClients()
175                 : RelayTransport.DEFAULT_MAX_CLIENTS;
176         this.clientQueueSize = (-1 != relayConfigAdv.getClientMessageQueueSize())
177                 ? relayConfigAdv.getClientMessageQueueSize()
178                 : RelayTransport.DEFAULT_CLIENT_QUEUE_SIZE;
179         this.maxLeaseDuration = (-1 != relayConfigAdv.getServerLeaseDuration())
180                 ? relayConfigAdv.getServerLeaseDuration()
181                 : RelayTransport.DEFAULT_LEASE;
182         this.minBroadcastInterval = (-1 != relayConfigAdv.getAnnounceInterval())
183                 ? relayConfigAdv.getAnnounceInterval()
184                 : RelayTransport.DEFAULT_BROADCAST_INTERVAL;
185         this.stallTimeout = (-1 != relayConfigAdv.getStallTimeout())
186                 ? relayConfigAdv.getStallTimeout()
187                 : RelayTransport.DEFAULT_STALL_TIMEOUT;
188
189         aclFile = new File(new File(group.getStoreHome()), "relayACL.xml");
190         aclFileLastModified = aclFile.lastModified();
191         this.acl = new AccessList();
192         try {
193             acl.init(aclFile);
194             this.refreshTime = System.currentTimeMillis() + ACL_REFRESH_PERIOD;
195         } catch (IOException io) {
196             acl.setGrantAll(true);
197             this.refreshTime = Long.MAX_VALUE;
198             if (Logging.SHOW_INFO && LOG.isLoggable(Level.INFO)) {
199                 LOG.info("RelayServer Access Control granting all permissions");
200             }
201         }
202         
203         if (Logging.SHOW_CONFIG && LOG.isLoggable(Level.CONFIG)) {
204             StringBuilder configInfo = new StringBuilder("Configuring Relay Server");
205
206             configInfo.append("\n\tGroup Params :");
207             configInfo.append("\n\t\tGroup : ").append(group);
208             configInfo.append("\n\t\tPeer ID : ").append(group.getPeerID());
209
210             configInfo.append("\n\tConfiguration :");
211             configInfo.append("\n\t\tService Name : ").append(serviceName);
212             configInfo.append("\n\t\tMax Relay Clients : ").append(maxClients);
213             configInfo.append("\n\t\tMax Lease Length : ").append(maxLeaseDuration).append("ms.");
214             configInfo.append("\n\t\tBroadcast Interval : ").append(minBroadcastInterval).append("ms.");
215             configInfo.append("\n\t\tStall Timeout : ").append(stallTimeout).append("ms.");
216             
217             LOG.config(configInfo.toString());
218         }
219     }
220     
221     /**
222      * Debug routine: returns the list of relayedClients with details.
223      */
224     public List<String> getRelayedClients() {
225         List<String> res = new ArrayList<String>();
226
227         for (Object o : Arrays.asList(relayedClients.values().toArray())) {
228             String client = o.toString();
229
230             res.add(client);
231         }
232
233         return res;
234     }
235     
236     public boolean startServer() {
237         
238         if (Logging.SHOW_INFO && LOG.isLoggable(Level.INFO)) {
239             LOG.info("Starting " + publicAddress.toString());
240         }
241         
242         endpointService = group.getEndpointService();
243         discoveryService = group.getDiscoveryService();
244         
245         if ((messengerEventListener = endpointService.addMessageTransport(this)) == null) {
246             if (Logging.SHOW_SEVERE && LOG.isLoggable(Level.SEVERE)) {
247                 LOG.severe("Transport registration refused");
248             }
249             return false;
250         }
251         
252         try {
253             discoveryService.publish(createRdvAdvertisement(group.getPeerAdvertisement(), serviceName));
254         } catch (IOException e) {
255             if (Logging.SHOW_WARNING && LOG.isLoggable(Level.WARNING)) {
256                 LOG.log(Level.WARNING, "Could not publish Relay RdvAdvertisement", e);
257             }
258         }
259         
260         relayServerCache = new RelayServerCache(this);
261         
262         // start cache relay servers
263         relayServerCache.startCache();
264         
265         endpointService.addMessengerEventListener(this, EndpointService.HighPrecedence);
266         
267         if (Logging.SHOW_INFO && LOG.isLoggable(Level.INFO)) {
268             LOG.info("Relay Server started");
269         }
270         return true;
271     }
272
273     public void stopServer() {
274         // stop cache relay servers
275         relayServerCache.stopCache();
276         relayServerCache = null;
277         
278         // remove messenger events listener since we do not have any clients
279         endpointService.removeMessengerEventListener(this, EndpointService.HighPrecedence);
280         
281         if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
282             LOG.fine("Messenger Event Listener removed " + serviceName);
283         }
284         
285         // Close all clients.
286         // Get a list of the clients but leave them in the map;
287         // they remove themselves by calling removeClient(this).
288         // That's why we do not iterate through the real map to close them.
289         
290         RelayServerClient[] oldClients;
291         
292         synchronized (relayedClients) {
293             oldClients = relayedClients.values().toArray(new RelayServerClient[0]);
294         }
295         
296         int i = oldClients.length;
297         
298         while (i-- > 0) {
299             oldClients[i].closeClient();
300         }
301         
302         if (Logging.SHOW_INFO && LOG.isLoggable(Level.INFO)) {
303             LOG.info("Stopped " + publicAddress);
304         }        
305     }
306     
307     /*
308      * Methods inherited from MessageSender
309      */
310     
311     /**
312      * {@inheritDoc}
313      */
314     public EndpointAddress getPublicAddress() {
315         return publicAddress;
316     }
317     
318     /**
319      * {@inheritDoc}
320      */
321     public boolean isConnectionOriented() {
322         return true;
323     }
324     
325     /**
326      * {@inheritDoc}
327      */
328     public boolean allowsRouting() {
329         return true;
330     }
331     
332     /**
333      * {@inheritDoc}
334      */
335     public Object transportControl(Object operation, Object Value) {
336         return null;
337     }
338     
339     /**
340      * {@inheritDoc}
341      */
342     public Messenger getMessenger(EndpointAddress destAddr, Object hintIgnored) {
343         Messenger messenger = null;
344         
345         if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
346             LOG.fine("getMessenger for dest " + destAddr);
347         }
348         
349         if (!RelayTransport.protocolName.equals(destAddr.getProtocolName())) {
350             if (Logging.SHOW_WARNING && LOG.isLoggable(Level.WARNING)) {
351                 LOG.warning("could not make messenger for protocol :" + destAddr.getProtocolName());
352             }
353             
354             return null;
355         }
356         
357         // check if we have a queue for this client
358         RelayServerClient handler = getClient(destAddr.getProtocolAddress());
359         
360         if (handler != null) {
361             messenger = handler.getMessenger(publicAddress, destAddr, false);
362         }
363         
364         if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
365             LOG.fine("messenger for " + destAddr.getProtocolAddress() + " is " + messenger);
366         }
367         
368         return messenger;
369     }
370
371     /**
372      * {@inheritDoc}
373      */
374     @Deprecated
375     public boolean ping(EndpointAddress addr) {
376         
377         synchronized (relayedClients) {
378             return (null != relayedClients.get(addr.getProtocolAddress()));
379         }
380     }
381     
382     /*
383      * Methods inherited from MessageTransport
384      */
385     
386     /**
387      * {@inheritDoc}
388      */
389     public String getProtocolName() {
390         return RelayTransport.protocolName;
391     }
392     
393     /**
394      * {@inheritDoc}
395      */
396     public EndpointService getEndpointService() {
397         return endpointService;
398     }
399     
400     /**
401      * {@inheritDoc}
402      */
403     public boolean messengerReady(MessengerEvent event) {
404         if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
405             LOG.fine("messengerReady");
406         }
407         
408         Messenger newMessenger = event.getMessenger();
409         Object source = event.getSource();
410         EndpointAddress connectionAddress = event.getConnectionAddress();
411         
412         // Sanity check, this should not happen
413         if (newMessenger == null || source == null || connectionAddress == null) {
414             if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
415                 LOG.fine("there was not a messenger or not enough information");
416             }
417             
418             return false;
419         }
420         
421         // We do not grab just any messenger; that would replace the
422         // existing one and then we could have a fight between the
423         // front channel and the back channel from the same peer.
424         // We only grab back-channel messengers that where explicitly
425         // directed to the relay.
426         if (!serviceName.equals(connectionAddress.getServiceName())) {
427             return false;
428         }
429         
430         // make sure that it is not a higher level messenger
431         if (source instanceof MessageSender && !((MessageSender) source).allowsRouting()) {
432             if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
433                 LOG.fine("this is a higher level messenger");
434             }
435             
436             return false;
437         }
438         
439         // make sure that this is not one of our own.
440         if (source == this || newMessenger instanceof RelayServerClient.RelayMessenger) {
441             if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
442                 LOG.fine("this is a relay messenger");
443             }
444             
445             return false;
446         }
447         
448         // make sure that the messenger matches a possible client address
449         EndpointAddress destAddr = newMessenger.getLogicalDestinationAddress();
450         
451         if (destAddr == null || !"jxta".equals(destAddr.getProtocolName())) {
452             if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
453                 LOG.fine("LogicalDestinationAddress is not a \"jxta\" protocol");
454             }
455             
456             return false;
457         }
458         
459         // check if we have a queue for this client
460         // In that case, we just give it the handler and be done.
461         // We must not process the lease request that comes with a messenger
462         // for an existing client. If we did, we would reply with a lease
463         // response. Some connections can carry only one message and then
464         // close. In that case, the client has to re-establish the connection
465         // every time we respond. So, if we repond to all incoming connections
466         // we're going nowhere. In some cases, the client realy wants a
467         // response because it believes it is an initial connection while
468         // we still have it from a previous session. In that case, the client
469         // must try to send an explicit lease renewal message. (To which
470         // we do respond).
471         
472         String clientPeerId = destAddr.getProtocolAddress();
473         RelayServerClient handler = getClient(clientPeerId);
474         
475         if (handler != null) {
476             return handler.addMessenger(newMessenger);
477         }
478         
479         // Non-existent client. We want to process the
480         // connection request and respond.
481         // handleRequest may do whatever, but we always keep the
482         // messenger. It was meant for us anyway.
483         handleRequest(newMessenger, connectionAddress);
484         return true;
485     }
486
487     protected void handleRequest(Messenger messenger, EndpointAddress connectionAddress) {
488         
489         if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
490             LOG.fine("handleRequest from messenger");
491         }
492         
493         // In this case, the request comes within the messenger's destination.
494         String request = connectionAddress.getServiceParameter();
495         
496         // make sure that the messenger shows a client logical address
497         EndpointAddress clientAddr = messenger.getLogicalDestinationAddress();
498         
499         if (clientAddr == null || !"jxta".equals(clientAddr.getProtocolName())) {
500             if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
501                 LOG.fine("LogicalDestinationAddress is not a \"jxta\" protocol");
502             }
503             
504             return;
505         }
506         
507         String clientPeerId = clientAddr.getProtocolAddress();
508         
509         handleRequest(request, clientPeerId, messenger);
510     }
511     
512     protected void handleRequest(Message message, EndpointAddress dstAddr) {
513         
514         if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
515             LOG.fine("handleRequest from message");
516         }
517         
518         String request = RelayTransport.getString(message, RelayTransport.REQUEST_ELEMENT);
519         String clientPeerId = dstAddr.getServiceParameter();
520         
521         handleRequest(request, clientPeerId, null);
522     }
523     
524     void handleRequest(String request, String clientPeerId, Messenger messenger) {
525         // This request may come along with a messenger (if it is a renewal
526         // post-disconnection or an initial lease request).
527         
528         if (request == null) {
529             return;
530         }
531         
532         request = request.toLowerCase();
533         
534         if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
535             LOG.fine("request = " + request);
536         }
537         
538         // only process the request if a client peer id was sent
539         if (clientPeerId == null) {
540             return;
541         }
542         
543         if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
544             LOG.fine("clientPeerId = " + clientPeerId);
545         }
546         
547         // The only valid anonymous request is a request to obtain a real pid.
548         if ((clientPeerId.equals("unknown-unknown")) && (!request.startsWith(RelayTransport.PID_REQUEST))) {    
549             return;
550         }
551         
552         Message responseMessage = null;
553         
554         RelayServerClient closingHandler = null;
555         boolean rawMessenger = false;
556         boolean closeMessenger = false;
557         
558         // Figure out which request it is
559         if (request.startsWith(RelayTransport.CONNECT_REQUEST)) {
560             // Connect Request
561             if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
562                 LOG.fine("connect clientPeerId = " + clientPeerId);
563             }
564             
565             long requestedLease = maxLeaseDuration;
566             boolean returnRelayAdv = false;
567             boolean returnOtherRelayAdv = false;
568             boolean flushQueue = false;
569             
570             String requestedLeaseString = null;
571             
572             // check if a lease value was specified
573             int startIdx = request.indexOf(',');
574             
575             if (startIdx != -1) {
576                 // find the end of the lease value
577                 int endIdx = request.indexOf(',', startIdx + 1);
578                 
579                 if (endIdx == -1) {
580                     requestedLeaseString = request.substring(startIdx + 1);
581                 } else {
582                     requestedLeaseString = request.substring(startIdx + 1, endIdx);
583                     String flags = request.substring(endIdx + 1);
584                     
585                     if (flags.endsWith("true")) {
586                         returnRelayAdv = true;
587                     } else if (flags.endsWith("other")) {
588                         // This is an addition to the protocol. Newer
589                         // clients will always set that in connection requests
590                         // when not setting true. Only older clients use to
591                         // set nothing at all.
592                         returnOtherRelayAdv = true;
593                     }
594                     // Only two flag positions for now
595                     // The inserted first position is another extention.
596                     // Only newer clients use it. Older servers will not
597                     // notice it because they only check how the request ends.
598                     // So, new clients are also compatible with old servers.
599                     if (flags.startsWith("flush")) {
600                         flushQueue = true;
601                     }
602                 }
603             }
604             
605             if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
606                 LOG.fine(
607                         "request lease string = " + requestedLeaseString + "\treturn relay adv = " + returnRelayAdv
608                         + "\n\treturn other relay adv = " + returnOtherRelayAdv + "\tflush queue = " + flushQueue);
609             }
610             
611             if (requestedLeaseString != null) {
612                 try {
613                     requestedLease = Long.parseLong(requestedLeaseString);
614                 } catch (NumberFormatException e) {
615                     if (Logging.SHOW_INFO && LOG.isLoggable(Level.INFO)) {
616                         LOG.info("could not parse requested lease string");
617                     }
618                 }
619                 
620                 if (requestedLease > maxLeaseDuration) {
621                     requestedLease = maxLeaseDuration;
622                 }
623             }
624             
625             // process the connect request
626             EndpointAddress clientAddr = new EndpointAddress("jxta", clientPeerId, serviceName, peerId);
627             
628             // If we have a messenger, the clientHandler gets it.
629             // If the client handler did not already exist, it will be
630             // created only if we pass a messenger. We can no-longer create
631             // new clients without an incoming messenger. We used to get one
632             // from the router but no-longer. Now initial lease requests must
633             // come as part of the messenger creation.
634             
635             RelayServerClient handler = addClient(clientPeerId, requestedLease, messenger, flushQueue);
636             
637             if (handler != null) {
638                 
639                 // the client was added, send a connected response
640                 if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
641                     LOG.fine("added client " + clientPeerId);
642                 }
643                 
644                 // Now get a messenger that goes through the handler and
645                 // sends messages out-of-band (and internal perk).
646                 // jice@jxta.org - 20021227 all this code is getting ridiculous
647                 // it has to be re-organized. Addind the outOfBand feature
648                 // to all RelayMessengers just for that is overkill. This
649                 // just a temporary patch. The real fix would be to respond
650                 // straight with the messenger we have. Unfortunately, sometimes
651                 // we have to respond without a messenger in our hands because
652                 // sending a message over an explicit connection is the only
653                 // way for existing clients to ask for a response when they
654                 // reconnect. We would need to change the protocol and add an
655                 // "initial connection" request type to fix that.
656                 
657                 messenger = handler.getMessenger(publicAddress, clientAddr, true);
658                 responseMessage = RelayTransport.createConnectedMessage(handler.getLeaseRemaining());
659                 // For protocol compatibility reasons, returnRelayAdv realy
660                 // means "return your own because I do not know it".
661                 // If returnOtherRelayAdv is true, then, we will return one
662                 // selected among those we know, for the enlightenment of the
663                 // other party.
664                 // If neither is true, we'll return no adv at all in order not
665                 // to confuse existing clients.
666                 
667                 RdvAdvertisement relayAdv = null;
668                 
669                 if (returnRelayAdv) {
670                     relayAdv = createRdvAdvertisement(group.getPeerAdvertisement(), serviceName);
671                 } else if (returnOtherRelayAdv) {
672                     relayAdv = relayServerCache.getRandomCacheAdv();
673                 }
674                 if (relayAdv != null) {
675                     XMLDocument asDoc = (XMLDocument) relayAdv.getDocument(MimeMediaType.XMLUTF8);
676                     
677                     MessageElement relayAdvElement = new TextDocumentMessageElement(RelayTransport.RELAY_ADV_ELEMENT, asDoc, null);
678
679                     responseMessage.addMessageElement(RelayTransport.RELAY_NS, relayAdvElement);
680                 }
681             } else {
682                 // We can't keep the messenger.
683                 // the client was not added, send a disconnected response
684                 if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
685                     LOG.fine("could not add client " + clientPeerId);
686                 }
687                 
688                 // We do not get a messenger for ourselves here, so
689                 // just get one from the router ourselves, if we have to.
690                 // and can.
691                 if (messenger == null) {
692                     // If we did not get one and manage to obtain one
693                     // from the endpoint, we can use it in-line, but
694                     // we must close it. (The only case).
695                     messenger = endpointService.getMessenger(clientAddr);
696                     if (messenger != null) {
697                         closeMessenger = true;
698                     }
699                     
700                 } else {
701                     
702                     // This is the incoming messenger. We cannot use it
703                     // synchronously. See, the use of BGSend, below.
704                     
705                     rawMessenger = true;
706                 }
707                 
708                 responseMessage = RelayTransport.createDisconnectedMessage();
709                 
710                 // add the relay advertisement of another know relay for the client to try
711                 RdvAdvertisement relayAdv = relayServerCache.getRandomCacheAdv();
712                 
713                 if (relayAdv != null) {
714                     XMLDocument asDoc = (XMLDocument) relayAdv.getDocument(MimeMediaType.XMLUTF8);
715                     
716                     MessageElement relayAdvElement = new TextDocumentMessageElement(RelayTransport.RELAY_ADV_ELEMENT, asDoc, null);
717
718                     responseMessage.addMessageElement(RelayTransport.RELAY_NS, relayAdvElement);
719                 }
720             }
721         } else if (RelayTransport.DISCONNECT_REQUEST.equals(request)) {
722             // Disconnect Request, don't send a response
723             if (clientPeerId != null) {
724                 closingHandler = removeClient(clientPeerId);
725                 if (closingHandler != null) {
726                     if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
727                         LOG.fine("removed client " + clientPeerId);
728                     }
729                 }
730             }
731         } else if (RelayTransport.PID_REQUEST.equals(request)) {
732             
733             // Generate a PeerID in the same group as our PeerID.
734             // The group which my peerID stems from is not necessarily
735             // the group where I am running (more likely it is the net peer
736             // group). Rather than guessing, get the group from our own PID.
737             
738             PeerGroupID groupOfMyPid = (PeerGroupID) group.getPeerID().getPeerGroupID();
739             
740             String pidStr = IDFactory.newPeerID(groupOfMyPid).toString();
741             
742             responseMessage = RelayTransport.createPIDResponseMessage(pidStr);
743             
744             // If there is a raw incoming messenger, that's what we
745             // use. Else, we won't respond.
746             rawMessenger = true;
747         }
748         
749         // if there is a messenger and a response, send it
750         if (messenger != null && responseMessage != null) {
751             if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
752                 LOG.fine("sending response to client " + clientPeerId);
753             }
754             
755             // If rawMessenger, then this is the incoming
756             // messenger brought in by messengerReady. In that case,
757             // be carefull. It is synchronous and it could block this
758             // here thread until the message can be sent. Which could
759             // possibly imply that this here method returns...dead lock.
760             // See HttpMessageServlet: messengerReady is called by
761             // the same thread that subsequently picks up messages from
762             // the BCMessenger. So, spawn a thread to reply.
763             // FIXME: eventualy we should start replacing some listener
764             // based code with state machines and event queues.
765             
766             if (rawMessenger) {
767                 
768                 // BGSend will *not* close the messenger after use
769                 // Because incoming messengers do not need to be closed.
770                 new BGSend(messenger, responseMessage, serviceName, peerId);
771             } else {
772                 try {
773                     messenger.sendMessage(responseMessage, serviceName, peerId);
774                 } catch (IOException e) {
775                     if (Logging.SHOW_WARNING && LOG.isLoggable(Level.WARNING)) {
776                         LOG.log(Level.WARNING, "Could not send response message to " + clientPeerId, e);
777                     }
778                 }
779             }
780         }
781         
782         if (closeMessenger) {
783             messenger.close();
784         }
785         
786         if (closingHandler != null) {
787             closingHandler.closeClient();
788         }
789         
790         if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
791             LOG.fine("relayedClients.size()=" + relayedClients.size());
792         }
793     }
794     
795     private RelayServerClient getClient(String clientPeerId) {
796         RelayServerClient handler;
797         
798         synchronized (relayedClients) {
799             handler = relayedClients.get(clientPeerId);
800         }
801         
802         if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
803             LOG.fine("getClient(" + clientPeerId + ") = " + handler);
804         }
805         
806         return handler;
807     }
808     
809     // Add client is idempotent. It can be called for a client that already
810     // exists. The flushqueue option instructs to clear the queue if the client
811     // exists.
812     private RelayServerClient addClient(String clientPeerId, long requestedLease, Messenger messenger, boolean flushQueue) {
813         RelayServerClient handler;
814         boolean isNewClient = false;
815         
816         if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
817             LOG.fine("addClient(" + clientPeerId + ")");
818         }
819         
820         synchronized (relayedClients) {
821             // check if this client is already registered
822             handler = relayedClients.get(clientPeerId);
823             if (handler == null) {
824                 // make sure the maximum number of clients has not been reached
825                 // and make sure that we have a messenger to give to the new
826                 // clientHandler.
827                 if ((relayedClients.size() < maxClients) && (messenger != null) && !messenger.isClosed()) {
828                     
829                     // create a new handler
830                     handler = new RelayServerClient(this, clientPeerId, requestedLease, stallTimeout, clientQueueSize);
831                     
832                     // add the handler to the list
833                     relayedClients.put(clientPeerId, handler);
834                     isNewClient = true;
835                     
836                     // check if this is the first client added
837                     if (relayedClients.size() == 1) {
838                         // start the gcThread if it is not already started
839                         if (gcThread == null) {
840                             gcThread = new Thread(group.getHomeThreadGroup(), this, "GC Thread for Relay at " + publicAddress);
841                             gcThread.setDaemon(true);
842                             gcThread.start();
843                         }
844                     }
845                 } else {
846                     if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
847                         LOG.fine(
848                                 "new client denied. nb clients: " + relayedClients.size() + "/" + maxClients + ", messenger: "
849                                 + messenger);
850                     }
851                 }
852             }
853         }
854         
855         if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
856             LOG.fine("added = " + (handler != null));
857         }
858         
859         if (handler == null) {
860             return null;
861         }
862         
863         // renew the lease on the old handler
864         // Watchout. The handler might have expired since we got it from the
865         // map. RenewLease will tell us. In that case, tough luck. We don't
866         // make a new one. FIXME: it's not nice to the client, but in no way
867         // a disaster (and very rare).
868         
869         if (!handler.renewLease()) {
870             return null;
871         }
872         
873         if (flushQueue) {
874             handler.flushQueue();
875         }
876         
877         if (messenger != null) {
878             handler.addMessenger(messenger);
879             
880             // We must force the router to learn the new relay connection as
881             // a direct route, so that it replies to route queries even if we
882             // never start talking to the client otherwise.
883             // Here we do something rather acrobatic. We invoke messengerReady
884             // recursively with a new relay messenger that the router will
885             // catch as if it where an incoming messenger (which it is, sort
886             // of). The cleaner alternative: call getMessenger with a hint
887             // causes too much commotion: sometimes an unreachable tcp address
888             // is tried before the hint, which blocks getMessenger for long.
889             
890             if (isNewClient) {
891                 EndpointAddress ear = new EndpointAddress(RelayTransport.protocolName, clientPeerId, null, null);
892                 
893                 MessengerEvent me = new MessengerEvent(this, handler.getMessenger(publicAddress, ear, false), null);
894                 
895                 messengerEventListener.messengerReady(me);
896             }
897         }
898         
899         return handler;
900     }
901     
902     private RelayServerClient removeClient(String clientPeerId) {
903         RelayServerClient handler;
904         
905         if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
906             LOG.fine("removeClient(" + clientPeerId + ")");
907         }
908         
909         synchronized (relayedClients) {
910             handler = relayedClients.remove(clientPeerId);
911             
912             // check if there are any clients
913             if (relayedClients.size() == 0) {
914                 // stop the gcThread
915                 if (gcThread != null) {
916                     try {
917                         gcThread.interrupt();
918                     } catch (SecurityException e) {
919                         // ignore this exception
920                         if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
921                             LOG.fine(e.toString());
922                         }
923                     }
924                 }
925             }
926         }
927         
928         return handler;
929     }
930     
931     // this is only used by the RelayServerClient when it is closing and needs to remove itself
932     protected void removeClient(String clientPeerId, RelayServerClient handler) {
933         if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
934             LOG.fine("removeClient(" + clientPeerId + "," + handler + ")");
935         }
936         
937         synchronized (relayedClients) {
938             RelayServerClient currentHandler = relayedClients.get(clientPeerId);
939             
940             // only remove the client if the current handler matches the passed one
941             if (currentHandler == handler) {
942                 relayedClients.remove(clientPeerId);
943             }
944             
945             // check if there are any clients
946             if (relayedClients.size() == 0) {
947                 // stop the gcThread
948                 Thread temp = gcThread;
949                 
950                 if (temp != null) {
951                     try {
952                         temp.interrupt();
953                     } catch (SecurityException e) {
954                         // ignore this exception
955                         if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
956                             LOG.fine(e.toString());
957                         }
958                     }
959                 }
960             }
961         }
962     }
963     
964     /**
965      *  {@inheritDoc}
966      */
967     public void run() {
968         if (Logging.SHOW_INFO && LOG.isLoggable(Level.INFO)) {
969             LOG.info("Starting lease gc thread");
970         }
971         
972         try {
973             while (true) {
974                 // check if there are any client handlers left
975                 synchronized (relayedClients) {
976                     if (relayedClients.size() == 0) {
977                         break;
978                     }
979                 }
980                 
981                 // do the lease gc
982                 doClientGC();
983                 
984                 // check if there are any client handlers left
985                 synchronized (relayedClients) {
986                     if (relayedClients.size() == 0) {
987                         break;
988                     }
989                 }
990                 
991                 // sleep for a while.
992                 try {
993                     Thread.sleep(stallTimeout);
994                 } catch (InterruptedException e) {
995                     Thread.interrupted();
996                 }
997             }
998         } catch (Throwable all) {
999             if (Logging.SHOW_SEVERE && LOG.isLoggable(Level.SEVERE)) {
1000                 LOG.log(Level.SEVERE, "Uncaught Throwable in thread :" + Thread.currentThread().getName(), all);
1001             }
1002         } finally {
1003             gcThread = null;
1004             
1005             if (Logging.SHOW_INFO && LOG.isLoggable(Level.INFO)) {
1006                 LOG.info("stopping lease gc thread");
1007             }
1008         }
1009     }
1010     
1011     // checks for expired client handlers
1012     private void doClientGC() {
1013         if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
1014             LOG.fine("start: check for expired client handler. # clients = " + relayedClients.size());
1015         }
1016         
1017         // get a snapshot of the client handlers
1018         RelayServerClient[] handlers;
1019         
1020         synchronized (relayedClients) {
1021             handlers = relayedClients.values().toArray(new RelayServerClient[0]);
1022         }
1023         
1024         // run through the client handlers
1025         int i = handlers.length;
1026         
1027         while (i-- > 0) {
1028             try {
1029                 // simply calling isExpired will cause the handler to check
1030                 // if it is expired and remove itself if expired
1031                 handlers[i].isExpired();
1032             } catch (Exception e) {
1033                 if (Logging.SHOW_WARNING && LOG.isLoggable(Level.WARNING)) {
1034                     LOG.log(Level.WARNING, "Exception during client gc", e);
1035                 }
1036             }
1037         }
1038         
1039         if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
1040             LOG.fine("stop: check for expired client handler. # clients = " + relayedClients.size());
1041         }
1042     }
1043     
1044     private static class RelayServerCache implements PipeMsgListener, Runnable {
1045         final static ID pipeID = ID.create(
1046                 URI.create("urn:jxta:uuid-59616261646162614E50472050325033DEADBEEFDEAFBABAFEEDBABE0000000F04"));
1047         
1048         final RelayServer server;
1049         final PipeAdvertisement pipeAdv;
1050         InputPipe inputPipe = null;
1051         
1052         volatile boolean doRun = false;
1053         Thread cacheThread = null;
1054         
1055         final Map<String, RdvAdvertisement> relayAdvCache = new HashMap<String, RdvAdvertisement>();
1056         
1057         final Random rand = new Random();
1058         
1059         protected RelayServerCache(RelayServer server) {
1060             this.server = server;
1061             
1062             pipeAdv = (PipeAdvertisement) AdvertisementFactory.newAdvertisement(PipeAdvertisement.getAdvertisementType());
1063             pipeAdv.setPipeID(pipeID);
1064             pipeAdv.setType(PipeService.PropagateType);
1065         }
1066         
1067         private int relayAdvCacheSize() {
1068             synchronized (relayAdvCache) {
1069                 return relayAdvCache.size();
1070             }
1071         }
1072         
1073         protected RdvAdvertisement getRandomCacheAdv() {
1074             synchronized (relayAdvCache) {
1075                 RdvAdvertisement[] items = relayAdvCache.values().toArray(new RdvAdvertisement[0]);
1076                 
1077                 if (items.length == 0) {
1078                     return null;
1079                 }
1080                 
1081                 return items[rand.nextInt(items.length)];
1082             }
1083         }
1084         
1085         private boolean putCacheAdv(String peerId, RdvAdvertisement adv) {
1086             if (!server.acl.isAllowed(adv.getPeerID())) {
1087                 if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
1088                     LOG.fine("Rejected cache entry for : " + peerId);
1089                 }
1090                 return false;
1091             }
1092             synchronized (relayAdvCache) {
1093                 boolean replaced = (null != relayAdvCache.put(peerId, adv));
1094                 
1095                 if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
1096                     LOG.fine((replaced ? "Updated" : "Created") + " cache entry for : " + peerId);
1097                 }
1098                 
1099                 if (relayAdvCache.size() >= MAX_CACHED_SERVERS) {
1100                     // New entry and map full. Remove one at random.
1101                     String[] keys = relayAdvCache.keySet().toArray(new String[0]);
1102                     
1103                     relayAdvCache.remove(keys[rand.nextInt(keys.length)]);
1104                 }
1105                 
1106                 return replaced;
1107             }
1108         }
1109         
1110         /**
1111          *  {@inheritDoc}
1112          */
1113         public void pipeMsgEvent(PipeMsgEvent event) {
1114             Message message = event.getMessage();
1115             
1116             if (message == null) {
1117                 return;
1118             }
1119             
1120             boolean isResponse = (RelayTransport.getString(message, RelayTransport.RESPONSE_ELEMENT) != null);
1121             String peerId = RelayTransport.getString(message, RelayTransport.PEERID_ELEMENT);
1122             
1123             if (peerId == null || peerId.equals(server.peerId)) {
1124                 if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
1125                     LOG.fine("pipeMsgEvent() discarding message no response PID defined, or loopback ");
1126                 }
1127                 return;
1128             }
1129             
1130             if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
1131                 LOG.fine("pipeMsgEvent() from " + peerId);
1132             }
1133             
1134             MessageElement me = message.getMessageElement(RelayTransport.RELAY_NS, RelayTransport.RELAY_ADV_ELEMENT);
1135             
1136             if (null == me) {
1137                 return;
1138             }
1139             
1140             Advertisement adv;
1141             try {
1142                 // XXX bondolo 20041207 Force parsing of MessageElement as 
1143                 // XMLUTF8 rather than the actual mime type associated with the
1144                 // MessageElement since the advertisement is often incorrectly
1145                 // stored as a String by older JXTA implementations.
1146                 adv = AdvertisementFactory.newAdvertisement(MimeMediaType.XMLUTF8, me.getStream());
1147             } catch (IOException failed) {
1148                 if (Logging.SHOW_WARNING && LOG.isLoggable(Level.WARNING)) {
1149                     LOG.log(Level.WARNING, "Failed building relay advertisement", failed);
1150                 }
1151                 return;
1152             } catch (NoSuchElementException failed) {
1153                 if (Logging.SHOW_WARNING && LOG.isLoggable(Level.WARNING)) {
1154                     LOG.log(Level.WARNING, "Could not build relay advertisement", failed);
1155                 }
1156                 return;
1157             }
1158             
1159             if (!(adv instanceof RdvAdvertisement)) {
1160                 if (Logging.SHOW_WARNING && LOG.isLoggable(Level.WARNING)) {
1161                     LOG.warning("Response does not contain relay advertisement (" + adv.getAdvType() + ")");
1162                 }
1163                 return;
1164             }
1165             
1166             RdvAdvertisement radv = (RdvAdvertisement) adv;
1167             
1168             if (putCacheAdv(peerId, radv)) {
1169                 
1170                 // New entry, we might want to respond.
1171                 // "someone" should respond; on average, one response
1172                 // is all we want. And that response obviously should be
1173                 // unicast.
1174                 // We achieve an approximation of that by making a computation
1175                 // that will result in "true" on average on only one peer
1176                 // of the set, based on our idea of what the set is.
1177                 // If we know very few other relays compared to what other
1178                 // relays know, we are more likely to respond than they are.
1179                 // So this is very approximate. We want to keep it simple
1180                 // until we have time replace this lazy junk with something
1181                 // sensible.
1182                 
1183                 // If it's a response already, the story stops here !
1184                 if (isResponse) {
1185                     return;
1186                 }
1187                 
1188                 // Here we go:
1189                 int i = relayAdvCacheSize();
1190                 long magic = server.peerId.hashCode() % i;
1191                 
1192                 if (rand.nextInt(i) == magic) {
1193                     
1194                     // Our number came out. Respond.
1195                     
1196                     // See if we have amunition to respond anyway.
1197                     // Very defensive. I care a lot more not to break anything
1198                     // at this stage, than to have optimal functionality.
1199                     
1200                     RdvAdvertisement myAdv = RelayServer.createRdvAdvertisement(server.group.getPeerAdvertisement(), server.serviceName);
1201
1202                     // Need to convert the other party's string pid into
1203                     // a real pid.
1204                     PeerID otherPid = null;
1205                     try {
1206                         otherPid = (PeerID) IDFactory.fromURI(new URI(ID.URIEncodingName, ID.URNNamespace + ":" + peerId, null));
1207                     } catch (Exception ex) {
1208                         if (Logging.SHOW_WARNING && LOG.isLoggable(Level.WARNING)) {
1209                             LOG.log(Level.WARNING, "Bad peerid : " + peerId, ex);
1210                         }
1211                         
1212                     }
1213
1214                     PipeService pipeService = server.group.getPipeService();
1215                     if (pipeService == null) {
1216                         return; // Funny. We're receiving messages, after all.
1217                     }
1218                     
1219                     // FIXME: jice@jxta.org 20030131 - We're making a rather
1220                     // unorthodox use of the peer-subset feature of propagate
1221                     // pipes. Basically what this does is to send the message
1222                     // in unicast so that it is received on the propagate
1223                     // input pipe of the specified peer.
1224                     // The correct API, if it existed, would be respond().
1225                     
1226                     OutputPipe retPipe = null;
1227                     try {
1228                         retPipe = pipeService.createOutputPipe(pipeAdv, Collections.singleton(otherPid), 2 * TimeUtils.ASECOND);
1229                         if (retPipe == null) {
1230                             return;
1231                         }
1232                         
1233                         // create a new cache message
1234                         message = new Message();
1235                         
1236                         // String version of unique portion only. Per the protocol.
1237                         RelayTransport.setString(message, RelayTransport.PEERID_ELEMENT, server.peerId);
1238                         // Our own adv.
1239                         RelayTransport.setString(message, RelayTransport.RELAY_ADV_ELEMENT, myAdv.toString());
1240                         
1241                         // This is a response. New servers: do not respond! Old
1242                         // servers won't respond anyway.
1243                         RelayTransport.setString(message, RelayTransport.RESPONSE_ELEMENT, "t");
1244                         
1245                         retPipe.send(message);
1246                         
1247                         if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
1248                             LOG.fine("Responded");
1249                         }
1250                         
1251                     } catch (IOException e) {
1252                         if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
1253                             LOG.log(Level.FINE, "Could not send reply on pipe ", e);
1254                         }
1255                     }
1256                     
1257                     if (retPipe != null) {
1258                         retPipe.close();
1259                     }
1260                 }
1261             }
1262         }
1263         
1264         /**
1265          *  {@inheritDoc}
1266          */
1267         public void run() {
1268             try {
1269                 OutputPipe outputPipe = null;
1270                 PipeService pipeService = server.group.getPipeService();
1271                 
1272                 while (doRun && inputPipe == null) {
1273                     try {
1274                         inputPipe = pipeService.createInputPipe(pipeAdv, this);
1275                     } catch (IOException e) {
1276                         if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
1277                             LOG.fine("Could not create input pipe, try again");
1278                         }
1279                     } catch (IllegalStateException e) {
1280                         if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
1281                             LOG.fine("Pipe Service not ready yet, try again");
1282                         }
1283                     }
1284                     
1285                     try {
1286                         Thread.sleep(TimeUtils.ASECOND);
1287                     } catch (InterruptedException e) {
1288                         if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
1289                             LOG.fine("wait interrupted");
1290                         }
1291                     }
1292                 }
1293                 
1294                 if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
1295                     LOG.fine("Created input pipe");
1296                 }
1297                 
1298                 while (doRun && outputPipe == null) {
1299                     try {
1300                         outputPipe = pipeService.createOutputPipe(pipeAdv, 5 * TimeUtils.ASECOND);
1301                     } catch (IOException e) {
1302                         if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
1303                             LOG.fine("Could not create output pipe, try again");
1304                         }
1305                     } catch (IllegalStateException e) {
1306                         if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
1307                             LOG.fine("Pipe Service not ready yet, try again");
1308                         }
1309                     }
1310                     
1311                     try {
1312                         Thread.sleep(TimeUtils.ASECOND);
1313                     } catch (InterruptedException e) {
1314                         if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
1315                             LOG.fine("wait interrupted ");
1316                         }
1317                     }
1318                 }
1319                 
1320                 if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
1321                     LOG.fine("Created output pipe");
1322                 }
1323                 
1324                 // Wait a little before mcasting our hello.
1325                 // We depend on the rendezvous infrastructure for it to
1326                 // work. It's pretty important to get the first one out
1327                 // so that we may get a response from others. After that
1328                 // the interval is very long (and its computation an total
1329                 // nonsense) and so others do not talk much
1330                 // either. We want to learn at least one other relay early on.
1331                 // FIXME: jice@jxta.org 20030131 - We realy need to switch to
1332                 // using peerview. It does all of that correctly.
1333                 
1334                 try {
1335                     Thread.sleep(10 * TimeUtils.ASECOND);
1336                 } catch (InterruptedException e) {
1337                     if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
1338                         LOG.fine("wait interrupted");
1339                     }
1340                 }
1341                 
1342                 while (doRun) {
1343                     RdvAdvertisement adv = RelayServer.createRdvAdvertisement(server.group.getPeerAdvertisement(), server.serviceName);
1344                     
1345                     // Make sure that the version that can be discovered
1346                     // is consistent.
1347                     try {
1348                         server.discoveryService.publish(adv);
1349                     } catch (IOException e) {
1350                         if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
1351                             LOG.log(Level.FINE, "Could not publish Relay RdvAdvertisement", e);
1352                         }
1353                     }
1354                     
1355                     if (adv != null) {
1356                         // create a new cache message
1357                         Message message = new Message();
1358                         
1359                         RelayTransport.setString(message, RelayTransport.PEERID_ELEMENT, server.peerId);
1360                         RelayTransport.setString(message, RelayTransport.RELAY_ADV_ELEMENT, adv.toString());
1361                         
1362                         try {
1363                             outputPipe.send(message);
1364                         } catch (IOException e) {
1365                             if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
1366                                 LOG.log(Level.FINE, "Could not send message on pipe ", e);
1367                             }
1368                         }
1369                     }
1370                     
1371                     long sleepTime = server.minBroadcastInterval
1372                             + ((server.relayedClients.size() + 1) * 100 / (server.maxClients + 1)) * server.minBroadcastInterval;
1373                     
1374                     if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
1375                         LOG.fine("sleepTime=" + sleepTime);
1376                     }
1377                     
1378                     try {
1379                         Thread.sleep(sleepTime);
1380                     } catch (InterruptedException e) {
1381                         Thread.interrupted();
1382                     }
1383                 }
1384                 outputPipe.close();
1385                 if (System.currentTimeMillis() > server.refreshTime) {
1386                     server.refreshTime = System.currentTimeMillis() + ACL_REFRESH_PERIOD;
1387                     if (server.aclFile.lastModified() > server.aclFileLastModified) {
1388                         server.aclFileLastModified = server.aclFile.lastModified();
1389                         server.acl.refresh(server.aclFile);
1390                     }
1391                 }
1392             } catch (Throwable all) {
1393                 if (Logging.SHOW_SEVERE && LOG.isLoggable(Level.SEVERE)) {
1394                     LOG.log(Level.SEVERE, "Uncaught Throwable in thread :" + Thread.currentThread().getName(), all);
1395                 }
1396             } finally {
1397                 cacheThread = null;
1398                 if (Logging.SHOW_INFO && LOG.isLoggable(Level.INFO)) {
1399                     LOG.info("Cache thread quitting.");
1400                 }
1401             }
1402         }
1403         
1404         protected void startCache() {
1405             doRun = true;
1406             cacheThread = new Thread(server.group.getHomeThreadGroup(), this, "RelayCache Worker Thread for " + server.publicAddress);
1407             cacheThread.setDaemon(true);
1408             cacheThread.start();
1409         }
1410         
1411         protected void stopCache() {
1412             doRun = false;
1413             
1414             if (inputPipe != null) {
1415                 inputPipe.close();
1416                 inputPipe = null;
1417             }
1418             cacheThread.interrupt();
1419         }
1420     }
1421     
1422
1423     /**
1424      *  Sends a message on an synchronous messenger.
1425      */
1426     static class BGSend extends Thread {
1427         
1428         Messenger mr;
1429         Message ms;
1430         String sn;
1431         String ps;
1432         
1433         BGSend(Messenger mr, Message ms, String sn, String ps) {
1434             super("Relay Background Sender");
1435             this.mr = mr;
1436             this.ms = ms;
1437             this.sn = sn;
1438             this.ps = ps;
1439             setDaemon(true);
1440             start();
1441         }
1442         
1443         /**
1444          *  {@inheritDoc}
1445          */
1446         @Override
1447         public void run() {
1448             try {
1449                 mr.sendMessage(ms, sn, ps);
1450             } catch (IOException e) {
1451                 if (Logging.SHOW_WARNING && LOG.isLoggable(Level.WARNING)) {
1452                     LOG.log(Level.WARNING, "Failed sending response " + ms + " to " + ps, e);
1453                 }
1454             } catch (Throwable all) {
1455                 if (Logging.SHOW_SEVERE && LOG.isLoggable(Level.SEVERE)) {
1456                     LOG.log(Level.SEVERE, "Uncaught Throwable in thread :" + Thread.currentThread().getName(), all);
1457                 }
1458             }
1459             
1460         }
1461     }
1462     
1463     private static RdvAdvertisement createRdvAdvertisement(PeerAdvertisement padv, String name) {
1464         try {
1465             // FIX ME: 10/19/2002 lomax@jxta.org. We need to properly set up the service ID. Unfortunately
1466             // this current implementation of the PeerView takes a String as a service name and not its ID.
1467             // Since currently, there is only PeerView per group (all peerviews share the same "service", this
1468             // is not a problem, but that will have to be fixed eventually.
1469             
1470             // create a new RdvAdvertisement
1471             RdvAdvertisement rdv = (RdvAdvertisement) AdvertisementFactory.newAdvertisement(
1472                     RdvAdvertisement.getAdvertisementType());
1473             
1474             rdv.setPeerID(padv.getPeerID());
1475             rdv.setGroupID(padv.getPeerGroupID());
1476             rdv.setServiceName(name);
1477             rdv.setName(padv.getName());
1478             
1479             RouteAdvertisement ra = EndpointUtils.extractRouteAdv(padv);
1480             
1481             if (null == ra) {
1482                 // No route available
1483                 return null;
1484             }
1485             
1486             // Insert it into the RdvAdvertisement.
1487             rdv.setRouteAdv(ra);
1488             
1489             return rdv;
1490         } catch (Exception ez) {
1491             if (Logging.SHOW_WARNING && LOG.isLoggable(Level.WARNING)) {
1492                 LOG.log(Level.WARNING, "Cannot create Local RdvAdvertisement: ", ez);
1493             }
1494             return null;
1495         }
1496     }
1497         
1498 }