2 * Copyright (c) 2001-2007 Sun Microsystems, Inc. All rights reserved.
4 * The Sun Project JXTA(TM) Software License
6 * Redistribution and use in source and binary forms, with or without
7 * modification, are permitted provided that the following conditions are met:
9 * 1. Redistributions of source code must retain the above copyright notice,
10 * this list of conditions and the following disclaimer.
12 * 2. Redistributions in binary form must reproduce the above copyright notice,
13 * this list of conditions and the following disclaimer in the documentation
14 * and/or other materials provided with the distribution.
16 * 3. The end-user documentation included with the redistribution, if any, must
17 * include the following acknowledgment: "This product includes software
18 * developed by Sun Microsystems, Inc. for JXTA(TM) technology."
19 * Alternately, this acknowledgment may appear in the software itself, if
20 * and wherever such third-party acknowledgments normally appear.
22 * 4. The names "Sun", "Sun Microsystems, Inc.", "JXTA" and "Project JXTA" must
23 * not be used to endorse or promote products derived from this software
24 * without prior written permission. For written permission, please contact
25 * Project JXTA at http://www.jxta.org.
27 * 5. Products derived from this software may not be called "JXTA", nor may
28 * "JXTA" appear in their name, without prior written permission of Sun.
30 * THIS SOFTWARE IS PROVIDED ``AS IS'' AND ANY EXPRESSED OR IMPLIED WARRANTIES,
31 * INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND
32 * FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL SUN
33 * MICROSYSTEMS OR ITS CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
34 * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
35 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA,
36 * OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
37 * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
38 * NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE,
39 * EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
41 * JXTA is a registered trademark of Sun Microsystems, Inc. in the United
42 * States and other countries.
44 * Please see the license information page at :
45 * <http://www.jxta.org/project/www/license.html> for instructions on use of
46 * the license in source files.
48 * ====================================================================
50 * This software consists of voluntary contributions made by many individuals
51 * on behalf of Project JXTA. For more information on Project JXTA, please see
52 * http://www.jxta.org.
54 * This license is based on the BSD license adopted by the Apache Foundation.
56 package net.jxta.impl.endpoint.tcp;
58 import java.io.IOException;
59 import java.io.InterruptedIOException;
60 import java.net.InetAddress;
61 import java.net.InetSocketAddress;
62 import java.net.UnknownHostException;
63 import java.nio.channels.CancelledKeyException;
64 import java.nio.channels.ClosedChannelException;
65 import java.nio.channels.ClosedSelectorException;
66 import java.nio.channels.IllegalBlockingModeException;
67 import java.nio.channels.SelectionKey;
68 import java.nio.channels.Selector;
69 import java.nio.channels.SocketChannel;
70 import java.nio.channels.spi.SelectorProvider;
71 import java.text.MessageFormat;
72 import java.util.ArrayList;
73 import java.util.Collections;
74 import java.util.Comparator;
75 import java.util.EmptyStackException;
76 import java.util.Enumeration;
77 import java.util.HashSet;
78 import java.util.Iterator;
79 import java.util.List;
81 import java.util.NoSuchElementException;
83 import java.util.Stack;
84 import java.util.concurrent.ConcurrentHashMap;
85 import java.util.concurrent.Executor;
86 import java.util.concurrent.RejectedExecutionException;
87 import java.util.logging.Level;
88 import java.util.logging.Logger;
90 import net.jxta.document.Advertisement;
91 import net.jxta.document.AdvertisementFactory;
92 import net.jxta.document.Attribute;
93 import net.jxta.document.XMLElement;
94 import net.jxta.endpoint.EndpointAddress;
95 import net.jxta.endpoint.EndpointService;
96 import net.jxta.endpoint.MessageReceiver;
97 import net.jxta.endpoint.MessageSender;
98 import net.jxta.endpoint.Messenger;
99 import net.jxta.endpoint.MessengerEvent;
100 import net.jxta.endpoint.MessengerEventListener;
101 import net.jxta.exception.PeerGroupException;
102 import net.jxta.id.ID;
103 import net.jxta.logging.Logging;
104 import net.jxta.meter.MonitorResources;
105 import net.jxta.peer.PeerID;
106 import net.jxta.peergroup.PeerGroup;
107 import net.jxta.platform.Module;
108 import net.jxta.protocol.ConfigParams;
109 import net.jxta.protocol.ModuleImplAdvertisement;
110 import net.jxta.protocol.TransportAdvertisement;
112 import net.jxta.impl.endpoint.IPUtils;
113 import net.jxta.impl.endpoint.LoopbackMessenger;
114 import net.jxta.impl.endpoint.transportMeter.TransportBindingMeter;
115 import net.jxta.impl.endpoint.transportMeter.TransportMeter;
116 import net.jxta.impl.endpoint.transportMeter.TransportMeterBuildSettings;
117 import net.jxta.impl.endpoint.transportMeter.TransportServiceMonitor;
118 import net.jxta.impl.meter.MonitorManager;
119 import net.jxta.impl.peergroup.StdPeerGroup;
120 import net.jxta.impl.protocol.TCPAdv;
121 import net.jxta.impl.util.TimeUtils;
125 * This class implements the TCP Message Transport.
127 * @see net.jxta.endpoint.MessageTransport
128 * @see net.jxta.endpoint.MessagePropagater
129 * @see net.jxta.endpoint.MessageReceiver
130 * @see net.jxta.endpoint.MessageSender
131 * @see net.jxta.endpoint.EndpointService
132 * @see <a href="http://spec.jxta.org/v1.0/docbook/JXTAProtocols.html#trans-tcpipt">JXTA Protocols Specification : Standard JXTA Transport Bindings</a>
134 public class TcpTransport implements Module, MessageSender, MessageReceiver {
139 private static final Logger LOG = Logger.getLogger(TcpTransport.class.getName());
142 * The TCP send buffer size.
143 * The size of the buffer used to store outgoing messages
144 * This should be set to the maximum message size (smaller is allowed).
146 static final int SendBufferSize = 64 * 1024; // 64 KBytes
149 * The TCP receive buffer size
151 static final int RecvBufferSize = 64 * 1024; // 64 KBytes
154 * The amount of time the socket "lingers" after we close it locally.
155 * Linger enables the remote socket to finish receiving any pending data
157 * Note: LingerDelay time unit is seconds
159 static final int LingerDelay = 2 * 60;
163 * use the same system property defined by URLconnection, otherwise default to 10 seconds.
165 static int connectionTimeOut = 10 * (int) TimeUtils.ASECOND;
167 // Java's default is 50
168 static final int MaxAcceptCnxBacklog = 50;
170 private String serverName = null;
171 private final List<EndpointAddress> publicAddresses = new ArrayList<EndpointAddress>();
172 private EndpointAddress publicAddress = null;
174 private String interfaceAddressStr;
175 InetAddress usingInterface;
176 private int serverSocketPort;
177 private int restrictionPort = -1;
178 private IncomingUnicastServer unicastServer = null;
180 private boolean isClosed = false;
182 private long messagesSent = 0;
183 private long messagesReceived = 0;
184 private long bytesSent = 0;
185 private long bytesReceived = 0;
186 private long connectionsAccepted = 0;
188 PeerGroup group = null;
189 EndpointService endpoint = null;
192 private String protocolName = "tcp";
193 private TransportMeter unicastTransportMeter;
194 private TransportMeter multicastTransportMeter;
196 private boolean publicAddressOnly = false;
198 private MessengerEventListener messengerEventListener = null;
200 private Thread messengerSelectorThread;
201 Selector messengerSelector = null;
203 private final Map<TcpMessenger, SocketChannel> regisMap = new ConcurrentHashMap<TcpMessenger, SocketChannel>();
204 private final Set<SocketChannel> unregisMap = Collections.synchronizedSet(new HashSet<SocketChannel>());
207 * This is the thread group into which we will place all of the threads
208 * we create. THIS HAS NO EFFECT ON SCHEDULING. Java thread groups are
209 * only for organization and naming.
211 ThreadGroup myThreadGroup = null;
214 * The maximum number of write selectors we will maintain in our cache per
215 * transport instance.
217 protected final static int MAX_WRITE_SELECTORS = 50;
220 * A cache we maintain for selectors writing messages to the socket.
222 private final static Stack<Selector> writeSelectorCache = new Stack<Selector>();
225 * The number of excess write selectors believed to be in the pool.
227 private int extraWriteSelectors = 0;
230 * Construct a new TcpTransport instance
232 public TcpTransport() {
233 // Add some selectors to the pool.
235 for (int i = 0; i < MAX_WRITE_SELECTORS; i++) {
236 writeSelectorCache.add(Selector.open());
238 } catch (IOException ex) {
239 if (Logging.SHOW_SEVERE && LOG.isLoggable(Level.SEVERE)) {
240 LOG.severe("Failed adding selector to write selector pool");
245 String connectTOStr = System.getProperty("sun.net.client.defaultConnectTimeout");
247 if (connectTOStr != null) {
248 connectionTimeOut = Integer.parseInt(connectTOStr);
250 } catch (Exception e) {
251 if (Logging.SHOW_WARNING && LOG.isLoggable(Level.WARNING)) {
252 LOG.warning("Could not parse system property: sun.net.client.defaultConnectTimeout");
258 * Gets the number of 'connectionsAccepted'.
260 * @return the number of 'connectionsAccepted'.
262 public long getConnectionsAccepted() {
263 return connectionsAccepted;
267 * increment the number of connectionsAccepted sent by 1
269 public void incrementConnectionsAccepted() {
270 connectionsAccepted++;
274 * increment the number of messages sent by 1
276 public void incrementMessagesSent() {
281 * increment the number of messages received by 1
283 public void incrementMessagesReceived() {
288 * increment the number of bytes sent
290 * @param bytes the number of bytes to be added
292 public void incrementBytesSent(long bytes) {
297 * increment the number of bytes received
299 * @param bytes the number of bytes to be added
301 public void incrementBytesReceived(long bytes) {
302 bytesReceived += bytes;
306 * Gets the number of 'messagesSent'.
308 * @return the number of 'messagesSent'.
310 public long getMessagesSent() {
315 * Gets the number of 'messagesReceived'.
317 * @return the number of 'messagesReceived'.
319 public long getMessagesReceived() {
320 return messagesReceived;
324 * Gets the number of 'bytesSent'.
326 * @return the number of 'bytesSent'.
328 public long getBytesSent() {
333 * Gets the number of 'bytesReceived'.
335 * @return the number of 'bytesReceived'.
337 public long getBytesReceived() {
338 return bytesReceived;
344 public boolean equals(Object target) {
345 if (this == target) {
349 if (null == target) {
353 if (target instanceof TcpTransport) {
354 TcpTransport likeMe = (TcpTransport) target;
356 if (!getProtocolName().equals(likeMe.getProtocolName())) {
360 Iterator<EndpointAddress> itsAddrs = likeMe.publicAddresses.iterator();
362 for (EndpointAddress publicAddress1 : publicAddresses) {
363 if (!itsAddrs.hasNext()) {
365 } // it has fewer than i do.
367 EndpointAddress mine = publicAddress1;
368 EndpointAddress its = itsAddrs.next();
370 if (!mine.equals(its)) {
371 // content didnt match
375 // ran out at the same time?
376 return (!itsAddrs.hasNext());
384 public int hashCode() {
385 return getPublicAddress().hashCode();
391 public void init(PeerGroup group, ID assignedID, Advertisement impl) throws PeerGroupException {
394 ModuleImplAdvertisement implAdvertisement = (ModuleImplAdvertisement) impl;
396 this.executor = ((StdPeerGroup) group).getExecutor();
398 ConfigParams configAdv = group.getConfigAdvertisement();
400 // Get out invariable parameters from the implAdv
401 XMLElement param = (XMLElement) implAdvertisement.getParam();
404 Enumeration<XMLElement> list = param.getChildren("Proto");
406 if (list.hasMoreElements()) {
407 XMLElement pname = list.nextElement();
408 protocolName = pname.getTextValue();
412 // Get our peer-defined parameters in the configAdv
413 param = (XMLElement) configAdv.getServiceParam(assignedID);
415 throw new IllegalArgumentException(TransportAdvertisement.getAdvertisementType() + " could not be located.");
418 Enumeration<XMLElement> tcpChilds = param.getChildren(TransportAdvertisement.getAdvertisementType());
420 // get the TransportAdv
421 if (tcpChilds.hasMoreElements()) {
422 param = tcpChilds.nextElement();
423 Attribute typeAttr = param.getAttribute("type");
425 if (!TCPAdv.getAdvertisementType().equals(typeAttr.getValue())) {
426 throw new IllegalArgumentException("transport adv is not a " + TCPAdv.getAdvertisementType());
429 if (tcpChilds.hasMoreElements()) {
430 throw new IllegalArgumentException("Multiple transport advs detected for " + assignedID);
433 throw new IllegalArgumentException(TransportAdvertisement.getAdvertisementType() + " could not be located.");
436 Advertisement paramsAdv = null;
439 paramsAdv = AdvertisementFactory.newAdvertisement(param);
440 } catch (NoSuchElementException notThere) {
441 if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
442 LOG.log(Level.FINE, "Could not find parameter document", notThere);
446 if (!(paramsAdv instanceof TCPAdv)) {
447 throw new IllegalArgumentException("Provided Advertisement was not a " + TCPAdv.getAdvertisementType());
450 TCPAdv adv = (TCPAdv) paramsAdv;
452 // determine the local interface to use. If the user specifies
453 // one, use that. Otherwise, use the all the available interfaces.
454 interfaceAddressStr = adv.getInterfaceAddress();
455 if (interfaceAddressStr != null) {
457 usingInterface = InetAddress.getByName(interfaceAddressStr);
458 } catch (UnknownHostException failed) {
459 if (Logging.SHOW_WARNING && LOG.isLoggable(Level.WARNING)) {
460 LOG.warning("Invalid address for local interface address, using default");
462 usingInterface = IPUtils.ANYADDRESS;
465 usingInterface = IPUtils.ANYADDRESS;
468 serverName = adv.getServer();
470 // Even when server is not enabled, we use the serverSocketPort as a
471 // discriminant for the simulated network partitioning, human readable
472 // messages, and a few things of that sort.
473 serverSocketPort = adv.getPort();
475 // should we expose other than a public address if one was specified?
476 publicAddressOnly = adv.getPublicAddressOnly();
479 if (adv.isServerEnabled()) {
481 unicastServer = new IncomingUnicastServer(this, usingInterface, serverSocketPort, adv.getStartPort(), adv.getEndPort());
482 } catch (IOException failed) {
483 throw new PeerGroupException("Failed to open server socket.", failed);
486 InetSocketAddress boundAddress = unicastServer.getLocalSocketAddress();
488 // TODO bondolo 20040628 Save the port back as a preference to TCPAdv
490 if(-1 != adv.getStartPort()) {
491 adv.setPort(boundAddress.getPort());
495 // Build the publicAddresses :
496 // first in the list is the "public server name". We don't try to
497 // resolve this since it might not be resolvable in the context we
498 // are running in, we just assume it's good.
499 if (serverName != null) {
500 // use speced server name.
501 EndpointAddress newAddr = new EndpointAddress(protocolName, serverName, null, null);
502 publicAddresses.add(newAddr);
505 // then add the rest of the local interfaces as appropriate. Unless
506 // we find an non-loopback interface, we're in local only mode.
507 boolean localOnly = true;
509 if (usingInterface.equals(IPUtils.ANYADDRESS)) {
511 Iterator eachLocal = IPUtils.getAllLocalAddresses();
512 List<EndpointAddress> wildAddrs = new ArrayList<EndpointAddress>();
514 while (eachLocal.hasNext()) {
515 InetAddress anAddress = (InetAddress) eachLocal.next();
516 String hostAddress = IPUtils.getHostAddress(anAddress);
517 EndpointAddress newAddr = new EndpointAddress(protocolName,
518 hostAddress + ":" + Integer.toString(boundAddress.getPort()), null, null);
520 // don't add it if its already in the list
521 if (!anAddress.isLoopbackAddress()) {
525 if (!publicAddresses.contains(newAddr)) {
526 wildAddrs.add(newAddr);
530 // we sort them so that later equals() will be deterministic.
531 // the result of IPUtils.getAllLocalAddresses() is not known to
533 Collections.sort(wildAddrs, new Comparator<EndpointAddress>() {
534 public int compare(EndpointAddress one, EndpointAddress two) {
535 return one.toString().compareTo(two.toString());
538 public boolean equals(Object that) {
539 return (this == that);
543 // Add public addresses:
544 // don't add them if we have a hand-set public address and the
545 // publicAddressOnly property is set.
546 if (!(serverName != null && publicAddressOnly)) {
547 publicAddresses.addAll(wildAddrs);
550 // use specified interface
551 if (!usingInterface.isLoopbackAddress()) {
555 String hostAddress = IPUtils.getHostAddress(usingInterface);
556 EndpointAddress newAddr = new EndpointAddress(protocolName,
557 hostAddress + ":" + Integer.toString(boundAddress.getPort()), null, null);
559 // Add public address:
560 // don't add it if its already in the list
561 // don't add it if specified as public address and publicAddressOnly
562 if (!(serverName != null && publicAddressOnly)) {
563 if (!publicAddresses.contains(newAddr)) {
564 publicAddresses.add(newAddr);
569 // If the only available interface is LOOPBACK, then make sure we
570 // use only that (that includes resetting the outgoing/listening
571 // interface from ANYADDRESS to LOOPBACK).
574 usingInterface = IPUtils.LOOPBACK;
575 publicAddresses.clear();
576 String hostAddress = IPUtils.getHostAddress(usingInterface);
577 EndpointAddress pubAddr = new EndpointAddress(protocolName,
578 hostAddress + ":" + Integer.toString(boundAddress.getPort()), null, null);
580 publicAddresses.add(pubAddr);
583 // Set the "preferred" public address. This is the address we will
584 // use for identifying outgoing requests.
585 publicAddress = publicAddresses.get(0);
587 // Only the outgoing interface matters.
588 // Verify that ANY interface does not in fact mean LOOPBACK only. If
589 // that's the case, we want to make that explicit, so that
590 // consistency checks regarding the allowed use of that interface
592 if (usingInterface.equals(IPUtils.ANYADDRESS)) {
593 boolean localOnly = true;
594 Iterator eachLocal = IPUtils.getAllLocalAddresses();
596 while (eachLocal.hasNext()) {
597 InetAddress anAddress = (InetAddress) eachLocal.next();
599 if (!anAddress.isLoopbackAddress()) {
606 usingInterface = IPUtils.LOOPBACK;
610 // The "public" address is just an internal label
611 // it is not usefull to anyone outside.
612 // IMPORTANT: we set the port to zero, to signify that this address
613 // is not realy usable.
614 String hostAddress = IPUtils.getHostAddress(usingInterface);
616 publicAddress = new EndpointAddress(protocolName, hostAddress + ":0", null, null);
619 // Tell tell the world about our configuration.
620 if (Logging.SHOW_CONFIG && LOG.isLoggable(Level.CONFIG)) {
621 StringBuilder configInfo = new StringBuilder("Configuring TCP Message Transport : " + assignedID);
623 if (implAdvertisement != null) {
624 configInfo.append("\n\tImplementation :");
625 configInfo.append("\n\t\tModule Spec ID: ").append(implAdvertisement.getModuleSpecID());
626 configInfo.append("\n\t\tImpl Description : ").append(implAdvertisement.getDescription());
627 configInfo.append("\n\t\tImpl URI : ").append(implAdvertisement.getUri());
628 configInfo.append("\n\t\tImpl Code : ").append(implAdvertisement.getCode());
631 configInfo.append("\n\tGroup Params:");
632 configInfo.append("\n\t\tGroup : ").append(group);
633 configInfo.append("\n\t\tPeer ID: ").append(group.getPeerID());
635 configInfo.append("\n\tConfiguration:");
636 configInfo.append("\n\t\tProtocol: ").append(protocolName);
637 configInfo.append("\n\t\tPublic address: ").append(serverName == null ? "(unspecified)" : serverName);
638 configInfo.append("\n\t\tInterface address: ").append(
639 interfaceAddressStr == null ? "(unspecified)" : interfaceAddressStr);
641 configInfo.append("\n\tConfiguration :");
642 configInfo.append("\n\t\tUsing Interface: ").append(usingInterface.getHostAddress());
644 if (null != unicastServer) {
645 if (-1 == unicastServer.getStartPort()) {
646 configInfo.append("\n\t\tUnicast Server Bind Addr: ").append(usingInterface.getHostAddress()).append(":").append(
649 configInfo.append("\n\t\tUnicast Server Bind Addr: ").append(usingInterface.getHostAddress()).append(":").append(serverSocketPort).append(" [").append(unicastServer.getStartPort()).append("-").append(unicastServer.getEndPort()).append(
652 configInfo.append("\n\t\tUnicast Server Bound Addr: ").append(unicastServer.getLocalSocketAddress());
654 configInfo.append("\n\t\tUnicast Server : disabled");
657 configInfo.append("\n\t\tPublic Addresses: ");
658 configInfo.append("\n\t\t\tDefault Endpoint Addr : ").append(publicAddress);
660 for (EndpointAddress anAddr : publicAddresses) {
661 configInfo.append("\n\t\t\tEndpoint Addr : ").append(anAddr);
663 LOG.config(configInfo.toString());
670 public synchronized int startApp(String[] arg) {
671 endpoint = group.getEndpointService();
673 if (null == endpoint) {
674 if (Logging.SHOW_WARNING && LOG.isLoggable(Level.WARNING)) {
675 LOG.warning("Stalled until there is an endpoint service");
677 return Module.START_AGAIN_STALLED;
681 messengerSelector = SelectorProvider.provider().openSelector();
682 } catch (IOException e) {
683 if (Logging.SHOW_WARNING && LOG.isLoggable(Level.WARNING)) {
684 LOG.log(Level.WARNING, "Could not create a messenger selector", e);
688 messengerSelectorThread = new Thread(group.getHomeThreadGroup(), new MessengerSelectorThread(), "TCP Transport MessengerSelectorThread for " + this);
689 messengerSelectorThread.setDaemon(true);
690 messengerSelectorThread.start();
692 // We're fully ready to function.
693 messengerEventListener = endpoint.addMessageTransport(this);
695 if (messengerEventListener == null) {
696 if (Logging.SHOW_SEVERE && LOG.isLoggable(Level.SEVERE)) {
697 LOG.severe("Transport registration refused");
702 // Cannot start before registration, we could be announcing new messengers while we
703 // do not exist yet ! (And get an NPE because we do not have the messenger listener set).
705 if (unicastServer != null) {
706 if (!unicastServer.start()) {
707 if (Logging.SHOW_SEVERE && LOG.isLoggable(Level.SEVERE)) {
708 LOG.severe("Unable to start TCP Unicast Server");
714 if (TransportMeterBuildSettings.TRANSPORT_METERING) {
715 TransportServiceMonitor transportServiceMonitor = (TransportServiceMonitor) MonitorManager.getServiceMonitor(group,
716 MonitorResources.transportServiceMonitorClassID);
718 if (transportServiceMonitor != null) {
719 unicastTransportMeter = transportServiceMonitor.createTransportMeter("TCP", publicAddress);
725 if (Logging.SHOW_INFO && LOG.isLoggable(Level.INFO)) {
726 LOG.info("TCP Message Transport started.");
728 return Module.START_OK;
734 public synchronized void stopApp() {
741 if (unicastServer != null) {
742 unicastServer.stop();
743 unicastServer = null;
746 Thread temp = messengerSelectorThread;
751 messengerSelector.close();
752 } catch (IOException failed) {
753 if (Logging.SHOW_SEVERE && LOG.isLoggable(Level.SEVERE)) {
754 LOG.log(Level.SEVERE, "IO error occured while closing server socket", failed);
759 // Inform the pool that we don't need as many write selectors.
760 synchronized (writeSelectorCache) {
761 extraWriteSelectors += MAX_WRITE_SELECTORS;
764 endpoint.removeMessageTransport(this);
769 if (Logging.SHOW_INFO && LOG.isLoggable(Level.INFO)) {
770 LOG.info(MessageFormat.format("Total bytes sent : {0}", getBytesSent()));
771 LOG.info(MessageFormat.format("Total Messages sent : {0}", getMessagesSent()));
772 LOG.info(MessageFormat.format("Total bytes received : {0}", getBytesReceived()));
773 LOG.info(MessageFormat.format("Total Messages received : {0}", getMessagesReceived()));
774 LOG.info(MessageFormat.format("Total connections accepted : {0}", getConnectionsAccepted()));
776 LOG.info("TCP Message Transport shut down.");
783 public String getProtocolName() {
790 public EndpointAddress getPublicAddress() {
791 return publicAddress;
797 public EndpointService getEndpointService() {
798 return (EndpointService) endpoint.getInterface();
804 public Object transportControl(Object operation, Object Value) {
811 public Iterator<EndpointAddress> getPublicAddresses() {
812 return Collections.unmodifiableList(publicAddresses).iterator();
818 public boolean isConnectionOriented() {
825 public boolean allowsRouting() {
829 public Messenger getMessenger(EndpointAddress dst, Object hintIgnored) {
830 return getMessenger(dst, hintIgnored, true);
836 public Messenger getMessenger(EndpointAddress dst, Object hintIgnored, boolean selfDestruct) {
838 if (!dst.getProtocolName().equalsIgnoreCase(getProtocolName())) {
839 if (Logging.SHOW_WARNING && LOG.isLoggable(Level.WARNING)) {
840 LOG.warning("Cannot make messenger for protocol: " + dst.getProtocolName());
845 EndpointAddress plainAddr = new EndpointAddress(dst, null, null);
847 // If the destination is one of our addresses including loopback, we
848 // return a loopback messenger.
849 if (publicAddresses.contains(plainAddr)) {
850 if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
851 LOG.fine("return LoopbackMessenger for addr : " + dst);
853 return new LoopbackMessenger(group, endpoint, getPublicAddress(), dst,
854 new EndpointAddress("jxta", group.getPeerID().getUniqueValue().toString(), null, null));
858 // Right now we do not want to "announce" outgoing messengers because they get pooled and so must
859 // not be grabbed by a listener. If "announcing" is to be done, that should be by the endpoint
860 // and probably with a subtely different interface.
861 return new TcpMessenger(dst, this, selfDestruct);
862 } catch (Exception caught) {
863 if (Logging.SHOW_WARNING && LOG.isLoggable(Level.WARNING)) {
864 if (Logging.SHOW_FINER && LOG.isLoggable(Level.FINER)) {
865 LOG.log(Level.FINER, "Could not get messenger for " + dst, caught);
867 LOG.warning("Could not get messenger for " + dst + " : " + caught.getMessage());
870 if (caught instanceof RuntimeException) {
871 throw (RuntimeException) caught;
880 * This implementation tries to open a connection, and after tests the
883 public boolean ping(EndpointAddress addr) {
884 boolean result = false;
885 EndpointAddress endpointAddress;
886 long pingStartTime = 0;
888 if (TransportMeterBuildSettings.TRANSPORT_METERING) {
889 pingStartTime = System.currentTimeMillis();
892 endpointAddress = new EndpointAddress(addr, null, null);
895 // Too bad that this one will not get pooled. On the other hand ping is
896 // not here too stay.
897 TcpMessenger tcpMessenger = new TcpMessenger(endpointAddress, this);
899 if (TransportMeterBuildSettings.TRANSPORT_METERING) {
900 TransportBindingMeter transportBindingMeter = tcpMessenger.getTransportBindingMeter();
902 if (transportBindingMeter != null) {
903 transportBindingMeter.ping(System.currentTimeMillis() - pingStartTime);
907 } catch (Throwable e) {
908 if (Logging.SHOW_WARNING && LOG.isLoggable(Level.WARNING)) {
909 LOG.log(Level.WARNING, "failure pinging " + addr.toString(), e);
911 if (TransportMeterBuildSettings.TRANSPORT_METERING) {
912 TransportBindingMeter transportBindingMeter = getUnicastTransportBindingMeter(null, endpointAddress);
914 if (transportBindingMeter != null) {
915 transportBindingMeter.pingFailed(System.currentTimeMillis() - pingStartTime);
920 if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
921 LOG.fine("ping to " + addr.toString() + " == " + result);
927 * Getter for property 'restrictionPort'.
929 * @return Value for property 'restrictionPort'.
931 int getRestrictionPort() {
932 return restrictionPort;
935 TransportBindingMeter getUnicastTransportBindingMeter(PeerID peerID, EndpointAddress destinationAddress) {
936 if (unicastTransportMeter != null) {
937 return unicastTransportMeter.getTransportBindingMeter(
938 (peerID != null) ? peerID.toString() : TransportMeter.UNKNOWN_PEER, destinationAddress);
944 void messengerReadyEvent(Messenger newMessenger, EndpointAddress connAddr) {
945 messengerEventListener.messengerReady(new MessengerEvent(this, newMessenger, connAddr));
949 * Getter for property 'server'.
951 * @return Value for property 'server'.
953 IncomingUnicastServer getServer() {
954 return unicastServer;
959 * Get a write selector from the cache.
961 * @return A write selector.
962 * @throws InterruptedException If interrupted while waiting for a selector
963 * to become available.
965 Selector getSelector() throws InterruptedException {
966 synchronized (writeSelectorCache) {
967 Selector selector = null;
969 if (!writeSelectorCache.isEmpty()) {
970 selector = writeSelectorCache.pop();
972 } catch (EmptyStackException ese) {
973 if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
974 LOG.fine("No write selector available, waiting for one");
979 while (selector == null && attempts < 2) {
980 writeSelectorCache.wait(connectionTimeOut);
982 if (!writeSelectorCache.isEmpty()) {
983 selector = writeSelectorCache.pop();
985 } catch (EmptyStackException ese) {
986 if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
987 LOG.log(Level.FINE, "Failed to get a write selector available, waiting for one", ese);
998 * Return the <code>Selector</code> to the cache
1000 * @param selector the selector to put back into the pool
1002 void returnSelector(Selector selector) {
1003 synchronized (writeSelectorCache) {
1004 if (extraWriteSelectors > 0) {
1005 // Allow the selector to be discarded.
1006 extraWriteSelectors--;
1008 writeSelectorCache.push(selector);
1009 // it does not hurt to notify, even if there are no waiters
1010 writeSelectorCache.notify();
1016 * Waits for incoming data on channels and sends it to the appropriate
1019 private class MessengerSelectorThread implements Runnable {
1026 if (Logging.SHOW_INFO && LOG.isLoggable(Level.INFO)) {
1027 LOG.info("MessengerSelectorThread polling started");
1032 int selectedKeys = 0;
1034 // Update channel registerations.
1035 updateChannelRegisterations();
1038 // this can be interrupted through wakeup
1039 selectedKeys = messengerSelector.select();
1040 } catch (CancelledKeyException cke) {
1041 if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
1042 LOG.log(Level.FINE, "Key was cancelled", cke);
1046 if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
1047 LOG.fine(MessageFormat.format("MessengerSelector has {0} selected keys", selectedKeys));
1050 if (selectedKeys == 0 && messengerSelector.selectNow() == 0) {
1051 // We were probably just woken.
1055 Set<SelectionKey> keySet = messengerSelector.selectedKeys();
1057 if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
1058 LOG.fine(MessageFormat.format("KeySet has {0} selected keys", keySet.size()));
1061 Iterator<SelectionKey> it = keySet.iterator();
1063 while (it.hasNext()) {
1064 SelectionKey key = it.next();
1066 // remove it from the SelectedKeys Set
1069 if (key.isValid()) {
1071 if (key.isReadable() && key.channel().isOpen()) {
1072 // ensure this channel is not selected again until the thread is done with it
1073 // TcpMessenger is expected to reset the interestOps back to OP_READ
1074 // Without this, expect multiple threads to execute on the same event, until
1075 // the first thread completes reading all data available
1076 key.interestOps(key.interestOps() & ~SelectionKey.OP_READ);
1078 // get the messenger
1079 TcpMessenger msgr = (TcpMessenger) key.attachment();
1083 executor.execute(msgr);
1084 } catch (RejectedExecutionException re) {
1085 if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
1087 MessageFormat.format("Executor rejected task for messenger :{0}", msgr.toString()), re);
1091 } catch (CancelledKeyException cce) {
1092 //in case the key was canceled after the selection
1095 // unregister it, no need to keep invalid/closed channels around
1097 key.channel().close();
1098 } catch (IOException io) {
1099 // avoids breaking out of the selector loop
1105 } catch (ClosedSelectorException cse) {
1106 if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
1107 LOG.fine("IO Selector closed");
1109 } catch (InterruptedIOException woken) {
1110 if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
1111 LOG.log(Level.FINE, "Thread inturrupted", woken);
1113 } catch (IOException e1) {
1114 if (Logging.SHOW_WARNING && LOG.isLoggable(Level.WARNING)) {
1115 LOG.log(Level.WARNING, "An exception occurred while selecting keys", e1);
1117 } catch (SecurityException e2) {
1118 if (Logging.SHOW_WARNING && LOG.isLoggable(Level.WARNING)) {
1119 LOG.log(Level.WARNING, "A security exception occurred while selecting keys", e2);
1124 // XXX 20070205 bondolo What should we do about the channels
1125 // that are still registered with the selector and any pending
1128 } catch (Throwable all) {
1129 if (Logging.SHOW_SEVERE && Logging.SHOW_SEVERE) {
1130 LOG.log(Level.SEVERE, "Uncaught Throwable", all);
1133 messengerSelectorThread = null;
1139 * Registers the channel with the Read selector and attaches the messenger to the channel
1141 * @param channel the socket channel.
1142 * @param messenger the messenger to attach to the channel.
1144 void register(SocketChannel channel, TcpMessenger messenger) {
1145 regisMap.put(messenger, channel);
1146 messengerSelector.wakeup();
1150 * Unregisters the channel with the Read selector
1152 * @param channel the socket channel.
1154 void unregister(SocketChannel channel) {
1155 unregisMap.add(channel);
1156 messengerSelector.wakeup();
1160 * Registers all newly accepted and returned (by TcpMessenger) channels.
1161 * Removes all closing TcpMessengers.
1163 private synchronized void updateChannelRegisterations() {
1165 if (!regisMap.isEmpty() && Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
1166 LOG.fine(MessageFormat.format("Registering {0} channels with MessengerSelectorThread", regisMap.size()));
1169 if (!regisMap.isEmpty()) {
1170 Iterator<Map.Entry<TcpMessenger, SocketChannel>> eachMsgr = regisMap.entrySet().iterator();
1172 while (eachMsgr.hasNext()) {
1173 Map.Entry<TcpMessenger, SocketChannel> anEntry = eachMsgr.next();
1174 TcpMessenger msgr = anEntry.getKey();
1175 SocketChannel channel = anEntry.getValue();
1176 SelectionKey key = channel.keyFor(messengerSelector);
1180 key = channel.register(messengerSelector, SelectionKey.OP_READ, msgr);
1182 key.interestOps(key.interestOps() | SelectionKey.OP_READ);
1183 if (Logging.SHOW_FINER && LOG.isLoggable(Level.FINER)) {
1184 LOG.finer(MessageFormat.format("Key interestOps on channel {0}, bit set :{1}", channel, key.interestOps()));
1186 } catch (ClosedChannelException e) {
1187 if (Logging.SHOW_WARNING && LOG.isLoggable(Level.FINE)) {
1188 LOG.log(Level.FINE, "Failed to register Channel with messenger selector", e);
1190 // it's best a new messenger is created when a new messenger is requested
1192 } catch (CancelledKeyException e) {
1193 if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
1194 LOG.log(Level.FINE, "Key is already cancelled, removing key from registeration map", e);
1196 } catch (IllegalBlockingModeException e) {
1197 if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
1198 LOG.log(Level.FINE, "Invalid blocking channel mode, closing messenger", e);
1200 // messenger state is unknown
1203 // remove it from the table
1208 // Unregister and close channels.
1209 if (!unregisMap.isEmpty() && Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
1210 LOG.fine(MessageFormat.format("Unregistering {0} channels with MessengerSelectorThread", unregisMap.size()));
1212 if (!unregisMap.isEmpty()) {
1213 Iterator<SocketChannel> eachChannel;
1215 synchronized (unregisMap) {
1216 List<SocketChannel> allChannels = new ArrayList<SocketChannel>(unregisMap);
1218 eachChannel = allChannels.iterator();
1221 while (eachChannel.hasNext()) {
1222 SocketChannel aChannel = eachChannel.next();
1223 SelectionKey key = aChannel.keyFor(messengerSelector);
1227 } catch (CancelledKeyException e) {
1228 if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
1229 LOG.log(Level.FINE, "Key is already cancelled, removing key from registeration map", e);