2 * Copyright (c) 2001-2007 Sun Microsystems, Inc. All rights reserved.
4 * The Sun Project JXTA(TM) Software License
6 * Redistribution and use in source and binary forms, with or without
7 * modification, are permitted provided that the following conditions are met:
9 * 1. Redistributions of source code must retain the above copyright notice,
10 * this list of conditions and the following disclaimer.
12 * 2. Redistributions in binary form must reproduce the above copyright notice,
13 * this list of conditions and the following disclaimer in the documentation
14 * and/or other materials provided with the distribution.
16 * 3. The end-user documentation included with the redistribution, if any, must
17 * include the following acknowledgment: "This product includes software
18 * developed by Sun Microsystems, Inc. for JXTA(TM) technology."
19 * Alternately, this acknowledgment may appear in the software itself, if
20 * and wherever such third-party acknowledgments normally appear.
22 * 4. The names "Sun", "Sun Microsystems, Inc.", "JXTA" and "Project JXTA" must
23 * not be used to endorse or promote products derived from this software
24 * without prior written permission. For written permission, please contact
25 * Project JXTA at http://www.jxta.org.
27 * 5. Products derived from this software may not be called "JXTA", nor may
28 * "JXTA" appear in their name, without prior written permission of Sun.
30 * THIS SOFTWARE IS PROVIDED ``AS IS'' AND ANY EXPRESSED OR IMPLIED WARRANTIES,
31 * INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND
32 * FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL SUN
33 * MICROSYSTEMS OR ITS CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
34 * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
35 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA,
36 * OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
37 * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
38 * NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE,
39 * EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
41 * JXTA is a registered trademark of Sun Microsystems, Inc. in the United
42 * States and other countries.
44 * Please see the license information page at :
45 * <http://www.jxta.org/project/www/license.html> for instructions on use of
46 * the license in source files.
48 * ====================================================================
50 * This software consists of voluntary contributions made by many individuals
51 * on behalf of Project JXTA. For more information on Project JXTA, please see
52 * http://www.jxta.org.
54 * This license is based on the BSD license adopted by the Apache Foundation.
56 package net.jxta.impl.endpoint.relay;
import java.io.File;
import java.io.IOException;
import java.net.URI;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.Random;
import java.util.logging.Level;
import java.util.logging.Logger;

import net.jxta.discovery.DiscoveryService;
import net.jxta.document.Advertisement;
import net.jxta.document.AdvertisementFactory;
import net.jxta.document.MimeMediaType;
import net.jxta.document.XMLDocument;
import net.jxta.endpoint.EndpointAddress;
import net.jxta.endpoint.EndpointService;
import net.jxta.endpoint.Message;
import net.jxta.endpoint.MessageElement;
import net.jxta.endpoint.MessageSender;
import net.jxta.endpoint.Messenger;
import net.jxta.endpoint.MessengerEvent;
import net.jxta.endpoint.MessengerEventListener;
import net.jxta.endpoint.TextDocumentMessageElement;
import net.jxta.id.ID;
import net.jxta.id.IDFactory;
import net.jxta.impl.access.AccessList;
import net.jxta.impl.endpoint.EndpointUtils;
import net.jxta.impl.protocol.RelayConfigAdv;
import net.jxta.impl.util.TimeUtils;
import net.jxta.logging.Logging;
import net.jxta.peer.PeerID;
import net.jxta.peergroup.PeerGroup;
import net.jxta.peergroup.PeerGroupID;
import net.jxta.pipe.InputPipe;
import net.jxta.pipe.OutputPipe;
import net.jxta.pipe.PipeMsgEvent;
import net.jxta.pipe.PipeMsgListener;
import net.jxta.pipe.PipeService;
import net.jxta.protocol.PeerAdvertisement;
import net.jxta.protocol.PipeAdvertisement;
import net.jxta.protocol.RdvAdvertisement;
import net.jxta.protocol.RouteAdvertisement;
/**
 * Relay server that maintains outgoing message queues, leases, etc.
 */
110 public class RelayServer implements MessageSender, MessengerEventListener, Runnable {
115 private final static transient Logger LOG = Logger.getLogger(RelayServer.class.getName());
117 private final static int MAX_CACHED_SERVERS = 20;
120 * The EndpointService for the RelayService
122 private EndpointService endpointService;
125 * The DiscoveryService for the RelayService
127 private DiscoveryService discoveryService;
130 * The public address is of the form relay://peerId
132 private final EndpointAddress publicAddress;
135 * Map of the current clients
137 private final Map<String, RelayServerClient> relayedClients = new HashMap<String, RelayServerClient>();
139 protected final PeerGroup group;
140 protected final String serviceName;
141 private final int maxClients;
142 private final long maxLeaseDuration;
143 private final long stallTimeout;
144 private final int clientQueueSize;
145 private final long minBroadcastInterval;
147 protected final String peerId;
149 protected final AccessList acl;
150 protected File aclFile;
151 protected long refreshTime = 0;
153 protected long aclFileLastModified = 0;
154 private static final long ACL_REFRESH_PERIOD = 30 * TimeUtils.AMINUTE;
156 protected RelayServerCache relayServerCache;
158 private Thread gcThread = null;
160 private MessengerEventListener messengerEventListener = null;
/**
 * Builds a relay server for the given group and reads its tunables from the
 * relay configuration advertisement. A configuration value of {@code -1} means
 * "not set" and selects the corresponding {@code RelayTransport} default.
 *
 * NOTE(review): several short lines of this constructor were missing from the
 * text this was restored from (the assignment of the final {@code group}
 * field, the {@code try} opener and the ACL load call). They were reinstated
 * from context; confirm against the upstream source.
 *
 * @param group          the peer group this relay server serves
 * @param serviceName    the service name relay requests are addressed to
 * @param relayConfigAdv the relay configuration
 */
public RelayServer(PeerGroup group, String serviceName, RelayConfigAdv relayConfigAdv) {

    // 'group' is a final field and is read by the other methods of this class.
    this.group = group;
    peerId = group.getPeerID().getUniqueValue().toString();
    publicAddress = new EndpointAddress(RelayTransport.protocolName, peerId, null, null);

    this.serviceName = serviceName;

    this.maxClients = (-1 != relayConfigAdv.getMaxClients())
            ? relayConfigAdv.getMaxClients()
            : RelayTransport.DEFAULT_MAX_CLIENTS;
    this.clientQueueSize = (-1 != relayConfigAdv.getClientMessageQueueSize())
            ? relayConfigAdv.getClientMessageQueueSize()
            : RelayTransport.DEFAULT_CLIENT_QUEUE_SIZE;
    this.maxLeaseDuration = (-1 != relayConfigAdv.getServerLeaseDuration())
            ? relayConfigAdv.getServerLeaseDuration()
            : RelayTransport.DEFAULT_LEASE;
    this.minBroadcastInterval = (-1 != relayConfigAdv.getAnnounceInterval())
            ? relayConfigAdv.getAnnounceInterval()
            : RelayTransport.DEFAULT_BROADCAST_INTERVAL;
    this.stallTimeout = (-1 != relayConfigAdv.getStallTimeout())
            ? relayConfigAdv.getStallTimeout()
            : RelayTransport.DEFAULT_STALL_TIMEOUT;

    aclFile = new File(new File(group.getStoreHome()), "relayACL.xml");
    aclFileLastModified = aclFile.lastModified();
    this.acl = new AccessList();
    try {
        // NOTE(review): presumed load call (the statement was lost); it must be
        // one that can throw IOException for the catch below to compile.
        acl.init(aclFile);
        this.refreshTime = System.currentTimeMillis() + ACL_REFRESH_PERIOD;
    } catch (IOException io) {
        // No usable ACL: grant everything and never schedule a refresh.
        acl.setGrantAll(true);
        this.refreshTime = Long.MAX_VALUE;
        if (Logging.SHOW_INFO && LOG.isLoggable(Level.INFO)) {
            LOG.info("RelayServer Access Control granting all permissions");
        }
    }

    if (Logging.SHOW_CONFIG && LOG.isLoggable(Level.CONFIG)) {
        StringBuilder configInfo = new StringBuilder("Configuring Relay Server");

        configInfo.append("\n\tGroup Params :");
        configInfo.append("\n\t\tGroup : ").append(group);
        configInfo.append("\n\t\tPeer ID : ").append(group.getPeerID());

        configInfo.append("\n\tConfiguration :");
        configInfo.append("\n\t\tService Name : ").append(serviceName);
        configInfo.append("\n\t\tMax Relay Clients : ").append(maxClients);
        configInfo.append("\n\t\tMax Lease Length : ").append(maxLeaseDuration).append("ms.");
        configInfo.append("\n\t\tBroadcast Interval : ").append(minBroadcastInterval).append("ms.");
        configInfo.append("\n\t\tStall Timeout : ").append(stallTimeout).append("ms.");

        LOG.config(configInfo.toString());
    }
}
222 * Debug routine: returns the list of relayedClients with details.
224 public List<String> getRelayedClients() {
225 List<String> res = new ArrayList<String>();
227 for (Object o : Arrays.asList(relayedClients.values().toArray())) {
228 String client = o.toString();
236 public boolean startServer() {
238 if (Logging.SHOW_INFO && LOG.isLoggable(Level.INFO)) {
239 LOG.info("Starting " + publicAddress.toString());
242 endpointService = group.getEndpointService();
243 discoveryService = group.getDiscoveryService();
245 if ((messengerEventListener = endpointService.addMessageTransport(this)) == null) {
246 if (Logging.SHOW_SEVERE && LOG.isLoggable(Level.SEVERE)) {
247 LOG.severe("Transport registration refused");
253 discoveryService.publish(createRdvAdvertisement(group.getPeerAdvertisement(), serviceName));
254 } catch (IOException e) {
255 if (Logging.SHOW_WARNING && LOG.isLoggable(Level.WARNING)) {
256 LOG.log(Level.WARNING, "Could not publish Relay RdvAdvertisement", e);
260 relayServerCache = new RelayServerCache(this);
262 // start cache relay servers
263 relayServerCache.startCache();
265 endpointService.addMessengerEventListener(this, EndpointService.HighPrecedence);
267 if (Logging.SHOW_INFO && LOG.isLoggable(Level.INFO)) {
268 LOG.info("Relay Server started");
273 public void stopServer() {
274 // stop cache relay servers
275 relayServerCache.stopCache();
276 relayServerCache = null;
278 // remove messenger events listener since we do not have any clients
279 endpointService.removeMessengerEventListener(this, EndpointService.HighPrecedence);
281 if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
282 LOG.fine("Messenger Event Listener removed " + serviceName);
285 // Close all clients.
286 // Get a list of the clients but leave them in the map;
287 // they remove themselves by calling removeClient(this).
288 // That's why we do not iterate through the real map to close them.
290 RelayServerClient[] oldClients;
292 synchronized (relayedClients) {
293 oldClients = relayedClients.values().toArray(new RelayServerClient[0]);
296 int i = oldClients.length;
299 oldClients[i].closeClient();
302 if (Logging.SHOW_INFO && LOG.isLoggable(Level.INFO)) {
303 LOG.info("Stopped " + publicAddress);
/*
 * Methods inherited from MessageSender
 */
314 public EndpointAddress getPublicAddress() {
315 return publicAddress;
// NOTE(review): only the signature line of this method survives here; its body
// (and therefore the value it returns) is not visible. Restore it from the
// upstream source rather than guessing.
321 public boolean isConnectionOriented() {
// NOTE(review): only the signature line of this method survives here; its body
// (and therefore the value it returns) is not visible. messengerReady() in this
// class consults allowsRouting() on other MessageSenders to reject higher level
// messengers, so the returned value matters — restore it from the upstream
// source rather than guessing.
328 public boolean allowsRouting() {
// NOTE(review): only the signature line of this method survives here; its body
// is not visible, so nothing can be said about how 'operation' and 'Value' are
// interpreted or what is returned. Restore it from the upstream source.
335 public Object transportControl(Object operation, Object Value) {
342 public Messenger getMessenger(EndpointAddress destAddr, Object hintIgnored) {
343 Messenger messenger = null;
345 if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
346 LOG.fine("getMessenger for dest " + destAddr);
349 if (!RelayTransport.protocolName.equals(destAddr.getProtocolName())) {
350 if (Logging.SHOW_WARNING && LOG.isLoggable(Level.WARNING)) {
351 LOG.warning("could not make messenger for protocol :" + destAddr.getProtocolName());
357 // check if we have a queue for this client
358 RelayServerClient handler = getClient(destAddr.getProtocolAddress());
360 if (handler != null) {
361 messenger = handler.getMessenger(publicAddress, destAddr, false);
364 if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
365 LOG.fine("messenger for " + destAddr.getProtocolAddress() + " is " + messenger);
375 public boolean ping(EndpointAddress addr) {
377 synchronized (relayedClients) {
378 return (null != relayedClients.get(addr.getProtocolAddress()));
/*
 * Methods inherited from MessageTransport
 */
389 public String getProtocolName() {
390 return RelayTransport.protocolName;
396 public EndpointService getEndpointService() {
397 return endpointService;
403 public boolean messengerReady(MessengerEvent event) {
404 if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
405 LOG.fine("messengerReady");
408 Messenger newMessenger = event.getMessenger();
409 Object source = event.getSource();
410 EndpointAddress connectionAddress = event.getConnectionAddress();
412 // Sanity check, this should not happen
413 if (newMessenger == null || source == null || connectionAddress == null) {
414 if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
415 LOG.fine("there was not a messenger or not enough information");
421 // We do not grab just any messenger; that would replace the
422 // existing one and then we could have a fight between the
423 // front channel and the back channel from the same peer.
424 // We only grab back-channel messengers that where explicitly
425 // directed to the relay.
426 if (!serviceName.equals(connectionAddress.getServiceName())) {
430 // make sure that it is not a higher level messenger
431 if (source instanceof MessageSender && !((MessageSender) source).allowsRouting()) {
432 if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
433 LOG.fine("this is a higher level messenger");
439 // make sure that this is not one of our own.
440 if (source == this || newMessenger instanceof RelayServerClient.RelayMessenger) {
441 if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
442 LOG.fine("this is a relay messenger");
448 // make sure that the messenger matches a possible client address
449 EndpointAddress destAddr = newMessenger.getLogicalDestinationAddress();
451 if (destAddr == null || !"jxta".equals(destAddr.getProtocolName())) {
452 if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
453 LOG.fine("LogicalDestinationAddress is not a \"jxta\" protocol");
459 // check if we have a queue for this client
460 // In that case, we just give it the handler and be done.
461 // We must not process the lease request that comes with a messenger
462 // for an existing client. If we did, we would reply with a lease
463 // response. Some connections can carry only one message and then
464 // close. In that case, the client has to re-establish the connection
465 // every time we respond. So, if we repond to all incoming connections
466 // we're going nowhere. In some cases, the client realy wants a
467 // response because it believes it is an initial connection while
468 // we still have it from a previous session. In that case, the client
469 // must try to send an explicit lease renewal message. (To which
472 String clientPeerId = destAddr.getProtocolAddress();
473 RelayServerClient handler = getClient(clientPeerId);
475 if (handler != null) {
476 return handler.addMessenger(newMessenger);
479 // Non-existent client. We want to process the
480 // connection request and respond.
481 // handleRequest may do whatever, but we always keep the
482 // messenger. It was meant for us anyway.
483 handleRequest(newMessenger, connectionAddress);
487 protected void handleRequest(Messenger messenger, EndpointAddress connectionAddress) {
489 if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
490 LOG.fine("handleRequest from messenger");
493 // In this case, the request comes within the messenger's destination.
494 String request = connectionAddress.getServiceParameter();
496 // make sure that the messenger shows a client logical address
497 EndpointAddress clientAddr = messenger.getLogicalDestinationAddress();
499 if (clientAddr == null || !"jxta".equals(clientAddr.getProtocolName())) {
500 if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
501 LOG.fine("LogicalDestinationAddress is not a \"jxta\" protocol");
507 String clientPeerId = clientAddr.getProtocolAddress();
509 handleRequest(request, clientPeerId, messenger);
512 protected void handleRequest(Message message, EndpointAddress dstAddr) {
514 if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
515 LOG.fine("handleRequest from message");
518 String request = RelayTransport.getString(message, RelayTransport.REQUEST_ELEMENT);
519 String clientPeerId = dstAddr.getServiceParameter();
521 handleRequest(request, clientPeerId, null);
524 void handleRequest(String request, String clientPeerId, Messenger messenger) {
525 // This request may come along with a messenger (if it is a renewal
526 // post-disconnection or an initial lease request).
528 if (request == null) {
532 request = request.toLowerCase();
534 if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
535 LOG.fine("request = " + request);
538 // only process the request if a client peer id was sent
539 if (clientPeerId == null) {
543 if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
544 LOG.fine("clientPeerId = " + clientPeerId);
547 // The only valid anonymous request is a request to obtain a real pid.
548 if ((clientPeerId.equals("unknown-unknown")) && (!request.startsWith(RelayTransport.PID_REQUEST))) {
552 Message responseMessage = null;
554 RelayServerClient closingHandler = null;
555 boolean rawMessenger = false;
556 boolean closeMessenger = false;
558 // Figure out which request it is
559 if (request.startsWith(RelayTransport.CONNECT_REQUEST)) {
561 if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
562 LOG.fine("connect clientPeerId = " + clientPeerId);
565 long requestedLease = maxLeaseDuration;
566 boolean returnRelayAdv = false;
567 boolean returnOtherRelayAdv = false;
568 boolean flushQueue = false;
570 String requestedLeaseString = null;
572 // check if a lease value was specified
573 int startIdx = request.indexOf(',');
575 if (startIdx != -1) {
576 // find the end of the lease value
577 int endIdx = request.indexOf(',', startIdx + 1);
580 requestedLeaseString = request.substring(startIdx + 1);
582 requestedLeaseString = request.substring(startIdx + 1, endIdx);
583 String flags = request.substring(endIdx + 1);
585 if (flags.endsWith("true")) {
586 returnRelayAdv = true;
587 } else if (flags.endsWith("other")) {
588 // This is an addition to the protocol. Newer
589 // clients will always set that in connection requests
590 // when not setting true. Only older clients use to
591 // set nothing at all.
592 returnOtherRelayAdv = true;
594 // Only two flag positions for now
595 // The inserted first position is another extention.
596 // Only newer clients use it. Older servers will not
597 // notice it because they only check how the request ends.
598 // So, new clients are also compatible with old servers.
599 if (flags.startsWith("flush")) {
605 if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
607 "request lease string = " + requestedLeaseString + "\treturn relay adv = " + returnRelayAdv
608 + "\n\treturn other relay adv = " + returnOtherRelayAdv + "\tflush queue = " + flushQueue);
611 if (requestedLeaseString != null) {
613 requestedLease = Long.parseLong(requestedLeaseString);
614 } catch (NumberFormatException e) {
615 if (Logging.SHOW_INFO && LOG.isLoggable(Level.INFO)) {
616 LOG.info("could not parse requested lease string");
620 if (requestedLease > maxLeaseDuration) {
621 requestedLease = maxLeaseDuration;
625 // process the connect request
626 EndpointAddress clientAddr = new EndpointAddress("jxta", clientPeerId, serviceName, peerId);
628 // If we have a messenger, the clientHandler gets it.
629 // If the client handler did not already exist, it will be
630 // created only if we pass a messenger. We can no-longer create
631 // new clients without an incoming messenger. We used to get one
632 // from the router but no-longer. Now initial lease requests must
633 // come as part of the messenger creation.
635 RelayServerClient handler = addClient(clientPeerId, requestedLease, messenger, flushQueue);
637 if (handler != null) {
639 // the client was added, send a connected response
640 if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
641 LOG.fine("added client " + clientPeerId);
644 // Now get a messenger that goes through the handler and
645 // sends messages out-of-band (and internal perk).
646 // jice@jxta.org - 20021227 all this code is getting ridiculous
647 // it has to be re-organized. Addind the outOfBand feature
648 // to all RelayMessengers just for that is overkill. This
649 // just a temporary patch. The real fix would be to respond
650 // straight with the messenger we have. Unfortunately, sometimes
651 // we have to respond without a messenger in our hands because
652 // sending a message over an explicit connection is the only
653 // way for existing clients to ask for a response when they
654 // reconnect. We would need to change the protocol and add an
655 // "initial connection" request type to fix that.
657 messenger = handler.getMessenger(publicAddress, clientAddr, true);
658 responseMessage = RelayTransport.createConnectedMessage(handler.getLeaseRemaining());
659 // For protocol compatibility reasons, returnRelayAdv realy
660 // means "return your own because I do not know it".
661 // If returnOtherRelayAdv is true, then, we will return one
662 // selected among those we know, for the enlightenment of the
664 // If neither is true, we'll return no adv at all in order not
665 // to confuse existing clients.
667 RdvAdvertisement relayAdv = null;
669 if (returnRelayAdv) {
670 relayAdv = createRdvAdvertisement(group.getPeerAdvertisement(), serviceName);
671 } else if (returnOtherRelayAdv) {
672 relayAdv = relayServerCache.getRandomCacheAdv();
674 if (relayAdv != null) {
675 XMLDocument asDoc = (XMLDocument) relayAdv.getDocument(MimeMediaType.XMLUTF8);
677 MessageElement relayAdvElement = new TextDocumentMessageElement(RelayTransport.RELAY_ADV_ELEMENT, asDoc, null);
679 responseMessage.addMessageElement(RelayTransport.RELAY_NS, relayAdvElement);
682 // We can't keep the messenger.
683 // the client was not added, send a disconnected response
684 if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
685 LOG.fine("could not add client " + clientPeerId);
688 // We do not get a messenger for ourselves here, so
689 // just get one from the router ourselves, if we have to.
691 if (messenger == null) {
692 // If we did not get one and manage to obtain one
693 // from the endpoint, we can use it in-line, but
694 // we must close it. (The only case).
695 messenger = endpointService.getMessenger(clientAddr);
696 if (messenger != null) {
697 closeMessenger = true;
702 // This is the incoming messenger. We cannot use it
703 // synchronously. See, the use of BGSend, below.
708 responseMessage = RelayTransport.createDisconnectedMessage();
710 // add the relay advertisement of another know relay for the client to try
711 RdvAdvertisement relayAdv = relayServerCache.getRandomCacheAdv();
713 if (relayAdv != null) {
714 XMLDocument asDoc = (XMLDocument) relayAdv.getDocument(MimeMediaType.XMLUTF8);
716 MessageElement relayAdvElement = new TextDocumentMessageElement(RelayTransport.RELAY_ADV_ELEMENT, asDoc, null);
718 responseMessage.addMessageElement(RelayTransport.RELAY_NS, relayAdvElement);
721 } else if (RelayTransport.DISCONNECT_REQUEST.equals(request)) {
722 // Disconnect Request, don't send a response
723 if (clientPeerId != null) {
724 closingHandler = removeClient(clientPeerId);
725 if (closingHandler != null) {
726 if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
727 LOG.fine("removed client " + clientPeerId);
731 } else if (RelayTransport.PID_REQUEST.equals(request)) {
733 // Generate a PeerID in the same group as our PeerID.
734 // The group which my peerID stems from is not necessarily
735 // the group where I am running (more likely it is the net peer
736 // group). Rather than guessing, get the group from our own PID.
738 PeerGroupID groupOfMyPid = (PeerGroupID) group.getPeerID().getPeerGroupID();
740 String pidStr = IDFactory.newPeerID(groupOfMyPid).toString();
742 responseMessage = RelayTransport.createPIDResponseMessage(pidStr);
744 // If there is a raw incoming messenger, that's what we
745 // use. Else, we won't respond.
749 // if there is a messenger and a response, send it
750 if (messenger != null && responseMessage != null) {
751 if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
752 LOG.fine("sending response to client " + clientPeerId);
755 // If rawMessenger, then this is the incoming
756 // messenger brought in by messengerReady. In that case,
757 // be carefull. It is synchronous and it could block this
758 // here thread until the message can be sent. Which could
759 // possibly imply that this here method returns...dead lock.
760 // See HttpMessageServlet: messengerReady is called by
761 // the same thread that subsequently picks up messages from
762 // the BCMessenger. So, spawn a thread to reply.
763 // FIXME: eventualy we should start replacing some listener
764 // based code with state machines and event queues.
768 // BGSend will *not* close the messenger after use
769 // Because incoming messengers do not need to be closed.
770 new BGSend(messenger, responseMessage, serviceName, peerId);
773 messenger.sendMessage(responseMessage, serviceName, peerId);
774 } catch (IOException e) {
775 if (Logging.SHOW_WARNING && LOG.isLoggable(Level.WARNING)) {
776 LOG.log(Level.WARNING, "Could not send response message to " + clientPeerId, e);
782 if (closeMessenger) {
786 if (closingHandler != null) {
787 closingHandler.closeClient();
790 if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
791 LOG.fine("relayedClients.size()=" + relayedClients.size());
795 private RelayServerClient getClient(String clientPeerId) {
796 RelayServerClient handler;
798 synchronized (relayedClients) {
799 handler = relayedClients.get(clientPeerId);
802 if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
803 LOG.fine("getClient(" + clientPeerId + ") = " + handler);
809 // Add client is idempotent. It can be called for a client that already
810 // exists. The flushqueue option instructs to clear the queue if the client
812 private RelayServerClient addClient(String clientPeerId, long requestedLease, Messenger messenger, boolean flushQueue) {
813 RelayServerClient handler;
814 boolean isNewClient = false;
816 if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
817 LOG.fine("addClient(" + clientPeerId + ")");
820 synchronized (relayedClients) {
821 // check if this client is already registered
822 handler = relayedClients.get(clientPeerId);
823 if (handler == null) {
824 // make sure the maximum number of clients has not been reached
825 // and make sure that we have a messenger to give to the new
827 if ((relayedClients.size() < maxClients) && (messenger != null) && !messenger.isClosed()) {
829 // create a new handler
830 handler = new RelayServerClient(this, clientPeerId, requestedLease, stallTimeout, clientQueueSize);
832 // add the handler to the list
833 relayedClients.put(clientPeerId, handler);
836 // check if this is the first client added
837 if (relayedClients.size() == 1) {
838 // start the gcThread if it is not already started
839 if (gcThread == null) {
840 gcThread = new Thread(group.getHomeThreadGroup(), this, "GC Thread for Relay at " + publicAddress);
841 gcThread.setDaemon(true);
846 if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
848 "new client denied. nb clients: " + relayedClients.size() + "/" + maxClients + ", messenger: "
855 if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
856 LOG.fine("added = " + (handler != null));
859 if (handler == null) {
863 // renew the lease on the old handler
864 // Watchout. The handler might have expired since we got it from the
865 // map. RenewLease will tell us. In that case, tough luck. We don't
866 // make a new one. FIXME: it's not nice to the client, but in no way
867 // a disaster (and very rare).
869 if (!handler.renewLease()) {
874 handler.flushQueue();
877 if (messenger != null) {
878 handler.addMessenger(messenger);
880 // We must force the router to learn the new relay connection as
881 // a direct route, so that it replies to route queries even if we
882 // never start talking to the client otherwise.
883 // Here we do something rather acrobatic. We invoke messengerReady
884 // recursively with a new relay messenger that the router will
885 // catch as if it where an incoming messenger (which it is, sort
886 // of). The cleaner alternative: call getMessenger with a hint
887 // causes too much commotion: sometimes an unreachable tcp address
888 // is tried before the hint, which blocks getMessenger for long.
891 EndpointAddress ear = new EndpointAddress(RelayTransport.protocolName, clientPeerId, null, null);
893 MessengerEvent me = new MessengerEvent(this, handler.getMessenger(publicAddress, ear, false), null);
895 messengerEventListener.messengerReady(me);
902 private RelayServerClient removeClient(String clientPeerId) {
903 RelayServerClient handler;
905 if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
906 LOG.fine("removeClient(" + clientPeerId + ")");
909 synchronized (relayedClients) {
910 handler = relayedClients.remove(clientPeerId);
912 // check if there are any clients
913 if (relayedClients.size() == 0) {
915 if (gcThread != null) {
917 gcThread.interrupt();
918 } catch (SecurityException e) {
919 // ignore this exception
920 if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
921 LOG.fine(e.toString());
931 // this is only used by the RelayServerClient when it is closing and needs to remove itself
932 protected void removeClient(String clientPeerId, RelayServerClient handler) {
933 if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
934 LOG.fine("removeClient(" + clientPeerId + "," + handler + ")");
937 synchronized (relayedClients) {
938 RelayServerClient currentHandler = relayedClients.get(clientPeerId);
940 // only remove the client if the current handler matches the passed one
941 if (currentHandler == handler) {
942 relayedClients.remove(clientPeerId);
945 // check if there are any clients
946 if (relayedClients.size() == 0) {
948 Thread temp = gcThread;
953 } catch (SecurityException e) {
954 // ignore this exception
955 if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
956 LOG.fine(e.toString());
968 if (Logging.SHOW_INFO && LOG.isLoggable(Level.INFO)) {
969 LOG.info("Starting lease gc thread");
974 // check if there are any client handlers left
975 synchronized (relayedClients) {
976 if (relayedClients.size() == 0) {
984 // check if there are any client handlers left
985 synchronized (relayedClients) {
986 if (relayedClients.size() == 0) {
991 // sleep for a while.
993 Thread.sleep(stallTimeout);
994 } catch (InterruptedException e) {
995 Thread.interrupted();
998 } catch (Throwable all) {
999 if (Logging.SHOW_SEVERE && LOG.isLoggable(Level.SEVERE)) {
1000 LOG.log(Level.SEVERE, "Uncaught Throwable in thread :" + Thread.currentThread().getName(), all);
1005 if (Logging.SHOW_INFO && LOG.isLoggable(Level.INFO)) {
1006 LOG.info("stopping lease gc thread");
1011 // checks for expired client handlers
1012 private void doClientGC() {
1013 if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
1014 LOG.fine("start: check for expired client handler. # clients = " + relayedClients.size());
1017 // get a snapshot of the client handlers
1018 RelayServerClient[] handlers;
1020 synchronized (relayedClients) {
1021 handlers = relayedClients.values().toArray(new RelayServerClient[0]);
1024 // run through the client handlers
1025 int i = handlers.length;
1029 // simply calling isExpired will cause the handler to check
1030 // if it is expired and remove itself if expired
1031 handlers[i].isExpired();
1032 } catch (Exception e) {
1033 if (Logging.SHOW_WARNING && LOG.isLoggable(Level.WARNING)) {
1034 LOG.log(Level.WARNING, "Exception during client gc", e);
1039 if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
1040 LOG.fine("stop: check for expired client handler. # clients = " + relayedClients.size());
// Cache of relay advertisements (RdvAdvertisement) learned from other relay
// servers. As a PipeMsgListener it receives cache messages on a propagate-type
// pipe; as a Runnable it creates the input and output pipes, publishes this
// relay's own advertisement and sends it on the output pipe.
1044 private static class RelayServerCache implements PipeMsgListener, Runnable {
// Fixed ID of the pipe described by pipeAdv.
1045 final static ID pipeID = ID.create(
1046 URI.create("urn:jxta:uuid-59616261646162614E50472050325033DEADBEEFDEAFBABAFEEDBABE0000000F04"));
// Owning relay server; supplies the group, ACL, peer id and broadcast settings used below.
1048 final RelayServer server;
// Propagate-type pipe advertisement, built in the constructor from pipeID.
1049 final PipeAdvertisement pipeAdv;
// Input pipe on pipeAdv; created by the worker with this object as its listener.
1050 InputPipe inputPipe = null;
// Loop condition of the pipe-creation loops in the worker; volatile, so a write
// from another thread is visible to the worker.
1052 volatile boolean doRun = false;
// Worker thread created by startCache() and interrupted by stopCache().
1053 Thread cacheThread = null;
// Cached advertisements keyed by peer id string. Every access visible in this
// class is made while synchronized on the map itself.
1055 final Map<String, RdvAdvertisement> relayAdvCache = new HashMap<String, RdvAdvertisement>();
// Randomness for picking a cached entry, evicting one, and deciding whether to respond.
1057 final Random rand = new Random();
// Binds the cache to its relay server and builds the propagate-type pipe
// advertisement identified by pipeID.
1059 protected RelayServerCache(RelayServer server) {
1060 this.server = server;
1062 pipeAdv = (PipeAdvertisement) AdvertisementFactory.newAdvertisement(PipeAdvertisement.getAdvertisementType());
1063 pipeAdv.setPipeID(pipeID);
1064 pipeAdv.setType(PipeService.PropagateType);
// Returns the number of cached advertisements, read while holding the
// relayAdvCache monitor.
1067 private int relayAdvCacheSize() {
1068 synchronized (relayAdvCache) {
1069 return relayAdvCache.size();
// Copies the cached advertisements into an array while holding the
// relayAdvCache monitor and returns the element at index rand.nextInt(items.length).
// NOTE(review): the result for an empty cache is not visible here; presumably
// null. Confirm before relying on it.
1073 protected RdvAdvertisement getRandomCacheAdv() {
1074 synchronized (relayAdvCache) {
1075 RdvAdvertisement[] items = relayAdvCache.values().toArray(new RdvAdvertisement[0]);
1077 if (items.length == 0) {
1081 return items[rand.nextInt(items.length)];
// Stores adv under peerId unless the server ACL rejects adv's peer ID (the
// rejection is logged at FINE). While holding the relayAdvCache monitor it
// inserts or replaces the entry, logs which of the two happened, and, once the
// map holds MAX_CACHED_SERVERS entries or more, removes one key picked at random.
// NOTE(review): the eviction candidates are read after the put, so the random
// pick may remove the entry that was just stored; confirm this is intended.
1085 private boolean putCacheAdv(String peerId, RdvAdvertisement adv) {
1086 if (!server.acl.isAllowed(adv.getPeerID())) {
1087 if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
1088 LOG.fine("Rejected cache entry for : " + peerId);
1092 synchronized (relayAdvCache) {
// Map.put returns the previous value, so non-null means an existing entry was replaced.
1093 boolean replaced = (null != relayAdvCache.put(peerId, adv));
1095 if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
1096 LOG.fine((replaced ? "Updated" : "Created") + " cache entry for : " + peerId);
1099 if (relayAdvCache.size() >= MAX_CACHED_SERVERS) {
1100 // New entry and map full. Remove one at random.
1101 String[] keys = relayAdvCache.keySet().toArray(new String[0]);
1103 relayAdvCache.remove(keys[rand.nextInt(keys.length)]);
// Pipe listener callback for cache messages. Reads the sender's peer id and
// relay advertisement from the message, stores the advertisement through
// putCacheAdv(), and may answer the sender with this relay's own advertisement
// in a message marked with RESPONSE_ELEMENT.
1113 public void pipeMsgEvent(PipeMsgEvent event) {
1114 Message message = event.getMessage();
1116 if (message == null) {
// A message that carries RESPONSE_ELEMENT is itself a reply to an earlier message.
1120 boolean isResponse = (RelayTransport.getString(message, RelayTransport.RESPONSE_ELEMENT) != null);
1121 String peerId = RelayTransport.getString(message, RelayTransport.PEERID_ELEMENT);
// Messages without a peer id, and messages carrying our own peer id, are not processed.
1123 if (peerId == null || peerId.equals(server.peerId)) {
1124 if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
1125 LOG.fine("pipeMsgEvent() discarding message no response PID defined, or loopback ");
1130 if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
1131 LOG.fine("pipeMsgEvent() from " + peerId);
1134 MessageElement me = message.getMessageElement(RelayTransport.RELAY_NS, RelayTransport.RELAY_ADV_ELEMENT);
1142 // XXX bondolo 20041207 Force parsing of MessageElement as
1143 // XMLUTF8 rather than the actual mime type associated with the
1144 // MessageElement since the advertisement is often incorrectly
1145 // stored as a String by older JXTA implementations.
1146 adv = AdvertisementFactory.newAdvertisement(MimeMediaType.XMLUTF8, me.getStream());
1147 } catch (IOException failed) {
1148 if (Logging.SHOW_WARNING && LOG.isLoggable(Level.WARNING)) {
1149 LOG.log(Level.WARNING, "Failed building relay advertisement", failed);
1152 } catch (NoSuchElementException failed) {
1153 if (Logging.SHOW_WARNING && LOG.isLoggable(Level.WARNING)) {
1154 LOG.log(Level.WARNING, "Could not build relay advertisement", failed);
// NOTE(review): null is never an instance of RdvAdvertisement, so a null adv
// would enter this branch and adv.getAdvType() would throw NullPointerException
// when WARNING logging is enabled; confirm newAdvertisement() cannot return null.
1159 if (!(adv instanceof RdvAdvertisement)) {
1160 if (Logging.SHOW_WARNING && LOG.isLoggable(Level.WARNING)) {
1161 LOG.warning("Response does not contain relay advertisement (" + adv.getAdvType() + ")");
1166 RdvAdvertisement radv = (RdvAdvertisement) adv;
1168 if (putCacheAdv(peerId, radv)) {
1170 // New entry, we might want to respond.
1171 // "someone" should respond; on average, one response
1172 // is all we want. And that response obviously should be
1174 // We achieve an approximation of that by making a computation
1175 // that will result in "true" on average on only one peer
1176 // of the set, based on our idea of what the set is.
1177 // If we know very few other relays compared to what other
1178 // relays know, we are more likely to respond than they are.
1179 // So this is very approximate. We want to keep it simple
1180 // until we have time to replace this lazy junk with something
1183 // If it's a response already, the story stops here !
// NOTE(review): hashCode() may be negative and Java's % keeps the sign of the
// dividend, so magic can be negative while rand.nextInt(i) never is; in that
// case this peer can never respond. An i of 0 would make both the % and
// nextInt(i) throw. Confirm whether either case can occur and is acceptable.
1189 int i = relayAdvCacheSize();
1190 long magic = server.peerId.hashCode() % i;
1192 if (rand.nextInt(i) == magic) {
1194 // Our number came out. Respond.
1196 // See if we have ammunition to respond anyway.
1197 // Very defensive. I care a lot more not to break anything
1198 // at this stage, than to have optimal functionality.
1200 RdvAdvertisement myAdv = RelayServer.createRdvAdvertisement(server.group.getPeerAdvertisement(), server.serviceName);
1202 // Need to convert the other party's string pid into
1204 PeerID otherPid = null;
1206 otherPid = (PeerID) IDFactory.fromURI(new URI(ID.URIEncodingName, ID.URNNamespace + ":" + peerId, null));
1207 } catch (Exception ex) {
1208 if (Logging.SHOW_WARNING && LOG.isLoggable(Level.WARNING)) {
1209 LOG.log(Level.WARNING, "Bad peerid : " + peerId, ex);
1214 PipeService pipeService = server.group.getPipeService();
1215 if (pipeService == null) {
1216 return; // Funny. We're receiving messages, after all.
1219 // FIXME: jice@jxta.org 20030131 - We're making a rather
1220 // unorthodox use of the peer-subset feature of propagate
1221 // pipes. Basically what this does is to send the message
1222 // in unicast so that it is received on the propagate
1223 // input pipe of the specified peer.
1224 // The correct API, if it existed, would be respond().
1226 OutputPipe retPipe = null;
// Output pipe restricted to the single peer otherPid, with a timeout of 2 * TimeUtils.ASECOND.
1228 retPipe = pipeService.createOutputPipe(pipeAdv, Collections.singleton(otherPid), 2 * TimeUtils.ASECOND);
1229 if (retPipe == null) {
1233 // create a new cache message
1234 message = new Message();
1236 // String version of unique portion only. Per the protocol.
1237 RelayTransport.setString(message, RelayTransport.PEERID_ELEMENT, server.peerId);
1239 RelayTransport.setString(message, RelayTransport.RELAY_ADV_ELEMENT, myAdv.toString());
1241 // This is a response. New servers: do not respond! Old
1242 // servers won't respond anyway.
1243 RelayTransport.setString(message, RelayTransport.RESPONSE_ELEMENT, "t");
1245 retPipe.send(message);
1247 if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
1248 LOG.fine("Responded");
1251 } catch (IOException e) {
1252 if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
1253 LOG.log(Level.FINE, "Could not send reply on pipe ", e);
1257 if (retPipe != null) {
// Worker body of the cache thread. In order: creates the input pipe, then the
// output pipe, retrying while doRun is set; waits 10 * TimeUtils.ASECOND;
// publishes this relay's RdvAdvertisement through the discovery service; sends
// it in a cache message on the output pipe; sleeps for a load-dependent
// interval; and reloads the ACL when its file has changed.
1269 OutputPipe outputPipe = null;
1270 PipeService pipeService = server.group.getPipeService();
1272 while (doRun && inputPipe == null) {
// Registers this object as the listener, so incoming messages reach pipeMsgEvent().
1274 inputPipe = pipeService.createInputPipe(pipeAdv, this);
1275 } catch (IOException e) {
1276 if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
1277 LOG.fine("Could not create input pipe, try again");
1279 } catch (IllegalStateException e) {
1280 if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
1281 LOG.fine("Pipe Service not ready yet, try again");
1286 Thread.sleep(TimeUtils.ASECOND);
1287 } catch (InterruptedException e) {
1288 if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
1289 LOG.fine("wait interrupted");
1294 if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
1295 LOG.fine("Created input pipe");
1298 while (doRun && outputPipe == null) {
1300 outputPipe = pipeService.createOutputPipe(pipeAdv, 5 * TimeUtils.ASECOND);
1301 } catch (IOException e) {
1302 if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
1303 LOG.fine("Could not create output pipe, try again");
1305 } catch (IllegalStateException e) {
1306 if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
1307 LOG.fine("Pipe Service not ready yet, try again");
1312 Thread.sleep(TimeUtils.ASECOND);
1313 } catch (InterruptedException e) {
1314 if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
1315 LOG.fine("wait interrupted ");
1320 if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
1321 LOG.fine("Created output pipe");
1324 // Wait a little before mcasting our hello.
1325 // We depend on the rendezvous infrastructure for it to
1326 // work. It's pretty important to get the first one out
1327 // so that we may get a response from others. After that
1328 // the interval is very long (and its computation a total
1329 // nonsense) and so others do not talk much
1330 // either. We want to learn at least one other relay early on.
1331 // FIXME: jice@jxta.org 20030131 - We really need to switch to
1332 // using peerview. It does all of that correctly.
1335 Thread.sleep(10 * TimeUtils.ASECOND);
1336 } catch (InterruptedException e) {
1337 if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
1338 LOG.fine("wait interrupted");
1343 RdvAdvertisement adv = RelayServer.createRdvAdvertisement(server.group.getPeerAdvertisement(), server.serviceName);
1345 // Make sure that the version that can be discovered
1348 server.discoveryService.publish(adv);
1349 } catch (IOException e) {
1350 if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
1351 LOG.log(Level.FINE, "Could not publish Relay RdvAdvertisement", e);
1356 // create a new cache message
// Unlike the reply built in pipeMsgEvent(), this message carries no RESPONSE_ELEMENT.
1357 Message message = new Message();
1359 RelayTransport.setString(message, RelayTransport.PEERID_ELEMENT, server.peerId);
1360 RelayTransport.setString(message, RelayTransport.RELAY_ADV_ELEMENT, adv.toString());
1363 outputPipe.send(message);
1364 } catch (IOException e) {
1365 if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
1366 LOG.log(Level.FINE, "Could not send message on pipe ", e);
// Sleep for minBroadcastInterval plus minBroadcastInterval multiplied by the
// integer quotient (relayedClients.size() + 1) * 100 / (maxClients + 1).
// NOTE(review): relayedClients.size() is read here without the relayedClients
// monitor that other code in this file takes; confirm that is acceptable.
1371 long sleepTime = server.minBroadcastInterval
1372 + ((server.relayedClients.size() + 1) * 100 / (server.maxClients + 1)) * server.minBroadcastInterval;
1374 if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
1375 LOG.fine("sleepTime=" + sleepTime);
1379 Thread.sleep(sleepTime);
1380 } catch (InterruptedException e) {
// Thread.interrupted() tests and clears the interrupt flag; it does not re-assert it.
1381 Thread.interrupted();
// Periodic ACL reload: once refreshTime has passed, schedule the next check
// ACL_REFRESH_PERIOD later and refresh the ACL from aclFile if the file's
// modification time is newer than the recorded one.
1385 if (System.currentTimeMillis() > server.refreshTime) {
1386 server.refreshTime = System.currentTimeMillis() + ACL_REFRESH_PERIOD;
1387 if (server.aclFile.lastModified() > server.aclFileLastModified) {
1388 server.aclFileLastModified = server.aclFile.lastModified();
1389 server.acl.refresh(server.aclFile);
1392 } catch (Throwable all) {
1393 if (Logging.SHOW_SEVERE && LOG.isLoggable(Level.SEVERE)) {
1394 LOG.log(Level.SEVERE, "Uncaught Throwable in thread :" + Thread.currentThread().getName(), all);
1398 if (Logging.SHOW_INFO && LOG.isLoggable(Level.INFO)) {
1399 LOG.info("Cache thread quitting.");
// Creates the worker as a daemon thread in the group's home thread group, with
// this object as its Runnable, and starts it.
1404 protected void startCache() {
1406 cacheThread = new Thread(server.group.getHomeThreadGroup(), this, "RelayCache Worker Thread for " + server.publicAddress);
1407 cacheThread.setDaemon(true);
1408 cacheThread.start();
// Stops the worker: the visible statements test inputPipe for null and
// interrupt cacheThread.
// NOTE(review): cacheThread is null until startCache() assigns it; confirm
// stopCache() cannot be reached before startCache().
1411 protected void stopCache() {
1414 if (inputPipe != null) {
1418 cacheThread.interrupt();
1424 * Sends a message on a synchronous messenger.
// Thread subclass named "Relay Background Sender". Its visible body calls
// mr.sendMessage(ms, sn, ps), presumably so that the send does not block the
// thread that created it (confirm against the callers).
1426 static class BGSend extends Thread {
// mr is the messenger to send on and ms the message to send. sn and ps are
// passed straight through to sendMessage(); presumably the destination service
// name and service parameter. TODO confirm.
1433 BGSend(Messenger mr, Message ms, String sn, String ps) {
1434 super("Relay Background Sender");
1449 mr.sendMessage(ms, sn, ps);
1450 } catch (IOException e) {
// A failed send is logged at WARNING together with the message and ps.
1451 if (Logging.SHOW_WARNING && LOG.isLoggable(Level.WARNING)) {
1452 LOG.log(Level.WARNING, "Failed sending response " + ms + " to " + ps, e);
1454 } catch (Throwable all) {
// Any other Throwable is caught here and logged at SEVERE with the thread name.
1455 if (Logging.SHOW_SEVERE && LOG.isLoggable(Level.SEVERE)) {
1456 LOG.log(Level.SEVERE, "Uncaught Throwable in thread :" + Thread.currentThread().getName(), all);
// Builds an RdvAdvertisement for the peer described by padv: copies its peer
// ID, peer group ID and name, sets the service name to the given name, and
// attaches the route advertisement extracted from padv by
// EndpointUtils.extractRouteAdv(). An Exception raised while building it is
// caught and logged at WARNING.
// NOTE(review): what is returned when no route is available, or after an
// exception has been caught, is not visible here; confirm how callers handle
// that case.
1463 private static RdvAdvertisement createRdvAdvertisement(PeerAdvertisement padv, String name) {
1465 // FIX ME: 10/19/2002 lomax@jxta.org. We need to properly set up the service ID. Unfortunately
1466 // this current implementation of the PeerView takes a String as a service name and not its ID.
1467 // Since currently, there is only PeerView per group (all peerviews share the same "service", this
1468 // is not a problem, but that will have to be fixed eventually.
1470 // create a new RdvAdvertisement
1471 RdvAdvertisement rdv = (RdvAdvertisement) AdvertisementFactory.newAdvertisement(
1472 RdvAdvertisement.getAdvertisementType());
1474 rdv.setPeerID(padv.getPeerID());
1475 rdv.setGroupID(padv.getPeerGroupID());
1476 rdv.setServiceName(name);
1477 rdv.setName(padv.getName());
1479 RouteAdvertisement ra = EndpointUtils.extractRouteAdv(padv);
1482 // No route available
1486 // Insert it into the RdvAdvertisement.
1487 rdv.setRouteAdv(ra);
1490 } catch (Exception ez) {
1491 if (Logging.SHOW_WARNING && LOG.isLoggable(Level.WARNING)) {
1492 LOG.log(Level.WARNING, "Cannot create Local RdvAdvertisement: ", ez);