]> sjero.net Git - linphone/blob - p2pproxy/dependencies-src/jxse-src-2.5/impl/src/net/jxta/impl/endpoint/tcp/TcpTransport.java
remove mediastreamer2 and add it as a submodule instead.
[linphone] / p2pproxy / dependencies-src / jxse-src-2.5 / impl / src / net / jxta / impl / endpoint / tcp / TcpTransport.java
1 /*
2  * Copyright (c) 2001-2007 Sun Microsystems, Inc.  All rights reserved.
3  *  
4  *  The Sun Project JXTA(TM) Software License
5  *  
6  *  Redistribution and use in source and binary forms, with or without 
7  *  modification, are permitted provided that the following conditions are met:
8  *  
9  *  1. Redistributions of source code must retain the above copyright notice,
10  *     this list of conditions and the following disclaimer.
11  *  
12  *  2. Redistributions in binary form must reproduce the above copyright notice, 
13  *     this list of conditions and the following disclaimer in the documentation 
14  *     and/or other materials provided with the distribution.
15  *  
16  *  3. The end-user documentation included with the redistribution, if any, must 
17  *     include the following acknowledgment: "This product includes software 
18  *     developed by Sun Microsystems, Inc. for JXTA(TM) technology." 
19  *     Alternately, this acknowledgment may appear in the software itself, if 
20  *     and wherever such third-party acknowledgments normally appear.
21  *  
22  *  4. The names "Sun", "Sun Microsystems, Inc.", "JXTA" and "Project JXTA" must 
23  *     not be used to endorse or promote products derived from this software 
24  *     without prior written permission. For written permission, please contact 
25  *     Project JXTA at http://www.jxta.org.
26  *  
27  *  5. Products derived from this software may not be called "JXTA", nor may 
28  *     "JXTA" appear in their name, without prior written permission of Sun.
29  *  
30  *  THIS SOFTWARE IS PROVIDED ``AS IS'' AND ANY EXPRESSED OR IMPLIED WARRANTIES,
31  *  INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND 
32  *  FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL SUN 
33  *  MICROSYSTEMS OR ITS CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, 
34  *  INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT 
35  *  LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, 
36  *  OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF 
37  *  LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING 
38  *  NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, 
39  *  EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
40  *  
41  *  JXTA is a registered trademark of Sun Microsystems, Inc. in the United 
42  *  States and other countries.
43  *  
44  *  Please see the license information page at :
45  *  <http://www.jxta.org/project/www/license.html> for instructions on use of 
46  *  the license in source files.
47  *  
48  *  ====================================================================
49  *  
50  *  This software consists of voluntary contributions made by many individuals 
51  *  on behalf of Project JXTA. For more information on Project JXTA, please see 
52  *  http://www.jxta.org.
53  *  
54  *  This license is based on the BSD license adopted by the Apache Foundation. 
55  */
56 package net.jxta.impl.endpoint.tcp;
57
58 import java.io.IOException;
59 import java.io.InterruptedIOException;
60 import java.net.InetAddress;
61 import java.net.InetSocketAddress;
62 import java.net.UnknownHostException;
63 import java.nio.channels.CancelledKeyException;
64 import java.nio.channels.ClosedChannelException;
65 import java.nio.channels.ClosedSelectorException;
66 import java.nio.channels.IllegalBlockingModeException;
67 import java.nio.channels.SelectionKey;
68 import java.nio.channels.Selector;
69 import java.nio.channels.SocketChannel;
70 import java.nio.channels.spi.SelectorProvider;
71 import java.text.MessageFormat;
72 import java.util.ArrayList;
73 import java.util.Collections;
74 import java.util.Comparator;
75 import java.util.EmptyStackException;
76 import java.util.Enumeration;
77 import java.util.HashSet;
78 import java.util.Iterator;
79 import java.util.List;
80 import java.util.Map;
81 import java.util.NoSuchElementException;
82 import java.util.Set;
83 import java.util.Stack;
84 import java.util.concurrent.ConcurrentHashMap;
85 import java.util.concurrent.Executor;
86 import java.util.concurrent.RejectedExecutionException;
87 import java.util.logging.Level;
88 import java.util.logging.Logger;
89
90 import net.jxta.document.Advertisement;
91 import net.jxta.document.AdvertisementFactory;
92 import net.jxta.document.Attribute;
93 import net.jxta.document.XMLElement;
94 import net.jxta.endpoint.EndpointAddress;
95 import net.jxta.endpoint.EndpointService;
96 import net.jxta.endpoint.MessageReceiver;
97 import net.jxta.endpoint.MessageSender;
98 import net.jxta.endpoint.Messenger;
99 import net.jxta.endpoint.MessengerEvent;
100 import net.jxta.endpoint.MessengerEventListener;
101 import net.jxta.exception.PeerGroupException;
102 import net.jxta.id.ID;
103 import net.jxta.logging.Logging;
104 import net.jxta.meter.MonitorResources;
105 import net.jxta.peer.PeerID;
106 import net.jxta.peergroup.PeerGroup;
107 import net.jxta.platform.Module;
108 import net.jxta.protocol.ConfigParams;
109 import net.jxta.protocol.ModuleImplAdvertisement;
110 import net.jxta.protocol.TransportAdvertisement;
111
112 import net.jxta.impl.endpoint.IPUtils;
113 import net.jxta.impl.endpoint.LoopbackMessenger;
114 import net.jxta.impl.endpoint.transportMeter.TransportBindingMeter;
115 import net.jxta.impl.endpoint.transportMeter.TransportMeter;
116 import net.jxta.impl.endpoint.transportMeter.TransportMeterBuildSettings;
117 import net.jxta.impl.endpoint.transportMeter.TransportServiceMonitor;
118 import net.jxta.impl.meter.MonitorManager;
119 import net.jxta.impl.peergroup.StdPeerGroup;
120 import net.jxta.impl.protocol.TCPAdv;
121 import net.jxta.impl.util.TimeUtils;
122
123
124 /**
125  * This class implements the TCP Message Transport.
126  *
127  * @see net.jxta.endpoint.MessageTransport
128  * @see net.jxta.endpoint.MessagePropagater
129  * @see net.jxta.endpoint.MessageReceiver
130  * @see net.jxta.endpoint.MessageSender
131  * @see net.jxta.endpoint.EndpointService
132  * @see <a href="http://spec.jxta.org/v1.0/docbook/JXTAProtocols.html#trans-tcpipt">JXTA Protocols Specification : Standard JXTA Transport Bindings</a>
133  */
134 public class TcpTransport implements Module, MessageSender, MessageReceiver {
135
136     /**
137      * Logger
138      */
139     private static final Logger LOG = Logger.getLogger(TcpTransport.class.getName());
140
141     /**
142      * The TCP send buffer size.
143      * The size of the buffer used to store outgoing messages
144      * This should be set to the maximum message size (smaller is allowed).
145      */
146     static final int SendBufferSize = 64 * 1024; // 64 KBytes
147
148     /**
149      * The TCP receive buffer size
150      */
151     static final int RecvBufferSize = 64 * 1024; // 64 KBytes
152
153     /**
154      * The amount of time the socket "lingers" after we close it locally.
155      * Linger enables the remote socket to finish receiving any pending data
156      * at its own rate.
157      * Note: LingerDelay time unit is seconds
158      */
159     static final int LingerDelay = 2 * 60;
160
161     /**
162      * Connection  timeout
163      * use the same system property defined by URLconnection, otherwise default to 10 seconds.
164      */
165     static int connectionTimeOut = 10 * (int) TimeUtils.ASECOND;
166
167     // Java's default is 50
168     static final int MaxAcceptCnxBacklog = 50;
169
170     private String serverName = null;
171     private final List<EndpointAddress> publicAddresses = new ArrayList<EndpointAddress>();
172     private EndpointAddress publicAddress = null;
173
174     private String interfaceAddressStr;
175     InetAddress usingInterface;
176     private int serverSocketPort;
177     private int restrictionPort = -1;
178     private IncomingUnicastServer unicastServer = null;
179
180     private boolean isClosed = false;
181
182     private long messagesSent = 0;
183     private long messagesReceived = 0;
184     private long bytesSent = 0;
185     private long bytesReceived = 0;
186     private long connectionsAccepted = 0;
187
188     PeerGroup group = null;
189     EndpointService endpoint = null;
190     Executor executor;
191
192     private String protocolName = "tcp";
193     private TransportMeter unicastTransportMeter;
194     private TransportMeter multicastTransportMeter;
195
196     private boolean publicAddressOnly = false;
197
198     private MessengerEventListener messengerEventListener = null;
199
200     private Thread messengerSelectorThread;
201     Selector messengerSelector = null;
202
203     private final Map<TcpMessenger, SocketChannel> regisMap = new ConcurrentHashMap<TcpMessenger, SocketChannel>();
204     private final Set<SocketChannel> unregisMap = Collections.synchronizedSet(new HashSet<SocketChannel>());
205
206     /**
207      * This is the thread group into which we will place all of the threads
208      * we create. THIS HAS NO EFFECT ON SCHEDULING. Java thread groups are
209      * only for organization and naming.
210      */
211     ThreadGroup myThreadGroup = null;
212
213     /**
214      * The maximum number of write selectors we will maintain in our cache per
215      * transport instance.
216      */
217     protected final static int MAX_WRITE_SELECTORS = 50;
218
219     /**
220      * A cache we maintain for selectors writing messages to the socket.
221      */
222     private final static Stack<Selector> writeSelectorCache = new Stack<Selector>();
223
224     /**
225      * The number of excess write selectors believed to be in the pool.
226      */
227     private int extraWriteSelectors = 0;
228
229     /**
230      * Construct a new TcpTransport instance
231      */
232     public TcpTransport() {
233         // Add some selectors to the pool.
234         try {
235             for (int i = 0; i < MAX_WRITE_SELECTORS; i++) {
236                 writeSelectorCache.add(Selector.open());
237             }
238         } catch (IOException ex) {
239             if (Logging.SHOW_SEVERE && LOG.isLoggable(Level.SEVERE)) {
240                 LOG.severe("Failed adding selector to  write selector pool");
241             }
242         }
243
244         try {
245             String connectTOStr = System.getProperty("sun.net.client.defaultConnectTimeout");
246
247             if (connectTOStr != null) {
248                 connectionTimeOut = Integer.parseInt(connectTOStr);
249             }
250         } catch (Exception e) {
251             if (Logging.SHOW_WARNING && LOG.isLoggable(Level.WARNING)) {
252                 LOG.warning("Could not parse system property: sun.net.client.defaultConnectTimeout");
253             }
254         }
255     }
256
257     /**
258      * Gets the number of 'connectionsAccepted'.
259      *
260      * @return the number of 'connectionsAccepted'.
261      */
262     public long getConnectionsAccepted() {
263         return connectionsAccepted;
264     }
265
266     /**
267      * increment the number of connectionsAccepted sent by 1
268      */
269     public void incrementConnectionsAccepted() {
270         connectionsAccepted++;
271     }
272
273     /**
274      * increment the number of messages sent by 1
275      */
276     public void incrementMessagesSent() {
277         messagesSent++;
278     }
279
280     /**
281      * increment the number of messages received by 1
282      */
283     public void incrementMessagesReceived() {
284         messagesReceived++;
285     }
286
287     /**
288      * increment the number of bytes sent
289      *
290      * @param bytes the number of bytes to be added
291      */
292     public void incrementBytesSent(long bytes) {
293         bytesSent += bytes;
294     }
295
296     /**
297      * increment the number of bytes received
298      *
299      * @param bytes the number of bytes to be added
300      */
301     public void incrementBytesReceived(long bytes) {
302         bytesReceived += bytes;
303     }
304
305     /**
306      * Gets the number of 'messagesSent'.
307      *
308      * @return the number of 'messagesSent'.
309      */
310     public long getMessagesSent() {
311         return messagesSent;
312     }
313
314     /**
315      * Gets the number of 'messagesReceived'.
316      *
317      * @return the number of 'messagesReceived'.
318      */
319     public long getMessagesReceived() {
320         return messagesReceived;
321     }
322
323     /**
324      * Gets the number of 'bytesSent'.
325      *
326      * @return the number of 'bytesSent'.
327      */
328     public long getBytesSent() {
329         return bytesSent;
330     }
331
332     /**
333      * Gets the number of 'bytesReceived'.
334      *
335      * @return the number of 'bytesReceived'.
336      */
337     public long getBytesReceived() {
338         return bytesReceived;
339     }
340
341     /**
342      * {@inheritDoc}
343      */
344     public boolean equals(Object target) {
345         if (this == target) {
346             return true;
347         }
348
349         if (null == target) {
350             return false;
351         }
352
353         if (target instanceof TcpTransport) {
354             TcpTransport likeMe = (TcpTransport) target;
355
356             if (!getProtocolName().equals(likeMe.getProtocolName())) {
357                 return false;
358             }
359
360             Iterator<EndpointAddress> itsAddrs = likeMe.publicAddresses.iterator();
361
362             for (EndpointAddress publicAddress1 : publicAddresses) {
363                 if (!itsAddrs.hasNext()) {
364                     return false;
365                 } // it has fewer than i do.
366
367                 EndpointAddress mine = publicAddress1;
368                 EndpointAddress its = itsAddrs.next();
369
370                 if (!mine.equals(its)) {
371                     // content didnt match
372                     return false;
373                 }
374             }
375             // ran out at the same time?
376             return (!itsAddrs.hasNext());
377         }
378         return false;
379     }
380
381     /**
382      * {@inheritDoc}
383      */
384     public int hashCode() {
385         return getPublicAddress().hashCode();
386     }
387
388     /**
389      * {@inheritDoc}
390      */
391     public void init(PeerGroup group, ID assignedID, Advertisement impl) throws PeerGroupException {
392
393         this.group = group;
394         ModuleImplAdvertisement implAdvertisement = (ModuleImplAdvertisement) impl;
395
396         this.executor = ((StdPeerGroup) group).getExecutor();
397
398         ConfigParams configAdv = group.getConfigAdvertisement();
399
400         // Get out invariable parameters from the implAdv
401         XMLElement param = (XMLElement) implAdvertisement.getParam();
402
403         if (param != null) {
404             Enumeration<XMLElement> list = param.getChildren("Proto");
405
406             if (list.hasMoreElements()) {
407                 XMLElement pname = list.nextElement();
408                 protocolName = pname.getTextValue();
409             }
410         }
411
412         // Get our peer-defined parameters in the configAdv
413         param = (XMLElement) configAdv.getServiceParam(assignedID);
414         if (null == param) {
415             throw new IllegalArgumentException(TransportAdvertisement.getAdvertisementType() + " could not be located.");
416         }
417
418         Enumeration<XMLElement> tcpChilds = param.getChildren(TransportAdvertisement.getAdvertisementType());
419
420         // get the TransportAdv
421         if (tcpChilds.hasMoreElements()) {
422             param = tcpChilds.nextElement();
423             Attribute typeAttr = param.getAttribute("type");
424
425             if (!TCPAdv.getAdvertisementType().equals(typeAttr.getValue())) {
426                 throw new IllegalArgumentException("transport adv is not a " + TCPAdv.getAdvertisementType());
427             }
428
429             if (tcpChilds.hasMoreElements()) {
430                 throw new IllegalArgumentException("Multiple transport advs detected for " + assignedID);
431             }
432         } else {
433             throw new IllegalArgumentException(TransportAdvertisement.getAdvertisementType() + " could not be located.");
434         }
435
436         Advertisement paramsAdv = null;
437
438         try {
439             paramsAdv = AdvertisementFactory.newAdvertisement(param);
440         } catch (NoSuchElementException notThere) {
441             if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
442                 LOG.log(Level.FINE, "Could not find parameter document", notThere);
443             }
444         }
445
446         if (!(paramsAdv instanceof TCPAdv)) {
447             throw new IllegalArgumentException("Provided Advertisement was not a " + TCPAdv.getAdvertisementType());
448         }
449
450         TCPAdv adv = (TCPAdv) paramsAdv;
451
452         // determine the local interface to use. If the user specifies
453         // one, use that. Otherwise, use the all the available interfaces.
454         interfaceAddressStr = adv.getInterfaceAddress();
455         if (interfaceAddressStr != null) {
456             try {
457                 usingInterface = InetAddress.getByName(interfaceAddressStr);
458             } catch (UnknownHostException failed) {
459                 if (Logging.SHOW_WARNING && LOG.isLoggable(Level.WARNING)) {
460                     LOG.warning("Invalid address for local interface address, using default");
461                 }
462                 usingInterface = IPUtils.ANYADDRESS;
463             }
464         } else {
465             usingInterface = IPUtils.ANYADDRESS;
466         }
467
468         serverName = adv.getServer();
469
470         // Even when server is not enabled, we use the serverSocketPort as a 
471         // discriminant for the simulated network partitioning, human readable
472         // messages, and a few things of that sort.
473         serverSocketPort = adv.getPort();
474
475         // should we expose other than a public address if one was specified?
476         publicAddressOnly = adv.getPublicAddressOnly();
477
478         // Start the servers
479         if (adv.isServerEnabled()) {
480             try {
481                 unicastServer = new IncomingUnicastServer(this, usingInterface, serverSocketPort, adv.getStartPort(), adv.getEndPort());
482             } catch (IOException failed) {
483                 throw new PeerGroupException("Failed to open server socket.", failed);
484             }
485
486             InetSocketAddress boundAddress = unicastServer.getLocalSocketAddress();
487
488             // TODO bondolo 20040628 Save the port back as a preference to TCPAdv
489             /*
490             if(-1 != adv.getStartPort()) {
491                 adv.setPort(boundAddress.getPort());
492             }
493             */
494
495             // Build the publicAddresses :
496             // first in the list is the "public server name". We don't try to
497             // resolve this since it might not be resolvable in the context we 
498             // are running in, we just assume it's good.
499             if (serverName != null) {
500                 // use speced server name.
501                 EndpointAddress newAddr = new EndpointAddress(protocolName, serverName, null, null);
502                 publicAddresses.add(newAddr);
503             }
504
505             // then add the rest of the local interfaces as appropriate. Unless
506             // we find an non-loopback interface, we're in local only mode.
507             boolean localOnly = true;
508
509             if (usingInterface.equals(IPUtils.ANYADDRESS)) {
510                 // its wildcarded
511                 Iterator eachLocal = IPUtils.getAllLocalAddresses();
512                 List<EndpointAddress> wildAddrs = new ArrayList<EndpointAddress>();
513
514                 while (eachLocal.hasNext()) {
515                     InetAddress anAddress = (InetAddress) eachLocal.next();
516                     String hostAddress = IPUtils.getHostAddress(anAddress);
517                     EndpointAddress newAddr = new EndpointAddress(protocolName,
518                             hostAddress + ":" + Integer.toString(boundAddress.getPort()), null, null);
519
520                     // don't add it if its already in the list
521                     if (!anAddress.isLoopbackAddress()) {
522                         localOnly = false;
523                     }
524
525                     if (!publicAddresses.contains(newAddr)) {
526                         wildAddrs.add(newAddr);
527                     }
528                 }
529
530                 // we sort them so that later equals() will be deterministic.
531                 // the result of IPUtils.getAllLocalAddresses() is not known to 
532                 // be sorted.
533                 Collections.sort(wildAddrs, new Comparator<EndpointAddress>() {
534                     public int compare(EndpointAddress one, EndpointAddress two) {
535                         return one.toString().compareTo(two.toString());
536                     }
537
538                     public boolean equals(Object that) {
539                         return (this == that);
540                     }
541                 });
542
543                 // Add public addresses:
544                 // don't add them if we have a hand-set public address and the
545                 // publicAddressOnly property is set.
546                 if (!(serverName != null && publicAddressOnly)) {
547                     publicAddresses.addAll(wildAddrs);
548                 }
549             } else {
550                 // use specified interface
551                 if (!usingInterface.isLoopbackAddress()) {
552                     localOnly = false;
553                 }
554
555                 String hostAddress = IPUtils.getHostAddress(usingInterface);
556                 EndpointAddress newAddr = new EndpointAddress(protocolName,
557                         hostAddress + ":" + Integer.toString(boundAddress.getPort()), null, null);
558
559                 // Add public address:
560                 // don't add it if its already in the list
561                 // don't add it if specified as public address and publicAddressOnly
562                 if (!(serverName != null && publicAddressOnly)) {
563                     if (!publicAddresses.contains(newAddr)) {
564                         publicAddresses.add(newAddr);
565                     }
566                 }
567             }
568
569             // If the only available interface is LOOPBACK, then make sure we 
570             // use only that (that includes resetting the outgoing/listening 
571             // interface from ANYADDRESS to LOOPBACK).
572
573             if (localOnly) {
574                 usingInterface = IPUtils.LOOPBACK;
575                 publicAddresses.clear();
576                 String hostAddress = IPUtils.getHostAddress(usingInterface);
577                 EndpointAddress pubAddr = new EndpointAddress(protocolName,
578                         hostAddress + ":" + Integer.toString(boundAddress.getPort()), null, null);
579
580                 publicAddresses.add(pubAddr);
581             }
582
583             // Set the "preferred" public address. This is the address we will 
584             // use for identifying outgoing requests.
585             publicAddress = publicAddresses.get(0);
586         } else {
587             // Only the outgoing interface matters.
588             // Verify that ANY interface does not in fact mean LOOPBACK only. If
589             // that's the case, we want to make that explicit, so that 
590             // consistency checks regarding the allowed use of that interface
591             // work properly.
592             if (usingInterface.equals(IPUtils.ANYADDRESS)) {
593                 boolean localOnly = true;
594                 Iterator eachLocal = IPUtils.getAllLocalAddresses();
595
596                 while (eachLocal.hasNext()) {
597                     InetAddress anAddress = (InetAddress) eachLocal.next();
598
599                     if (!anAddress.isLoopbackAddress()) {
600                         localOnly = false;
601                         break;
602                     }
603                 }
604
605                 if (localOnly) {
606                     usingInterface = IPUtils.LOOPBACK;
607                 }
608             }
609
610             // The "public" address is just an internal label
611             // it is not usefull to anyone outside.
612             // IMPORTANT: we set the port to zero, to signify that this address
613             // is not realy usable.
614             String hostAddress = IPUtils.getHostAddress(usingInterface);
615
616             publicAddress = new EndpointAddress(protocolName, hostAddress + ":0", null, null);
617         }
618
619         // Tell tell the world about our configuration.
620         if (Logging.SHOW_CONFIG && LOG.isLoggable(Level.CONFIG)) {
621             StringBuilder configInfo = new StringBuilder("Configuring TCP Message Transport : " + assignedID);
622
623             if (implAdvertisement != null) {
624                 configInfo.append("\n\tImplementation :");
625                 configInfo.append("\n\t\tModule Spec ID: ").append(implAdvertisement.getModuleSpecID());
626                 configInfo.append("\n\t\tImpl Description : ").append(implAdvertisement.getDescription());
627                 configInfo.append("\n\t\tImpl URI : ").append(implAdvertisement.getUri());
628                 configInfo.append("\n\t\tImpl Code : ").append(implAdvertisement.getCode());
629             }
630
631             configInfo.append("\n\tGroup Params:");
632             configInfo.append("\n\t\tGroup : ").append(group);
633             configInfo.append("\n\t\tPeer ID: ").append(group.getPeerID());
634
635             configInfo.append("\n\tConfiguration:");
636             configInfo.append("\n\t\tProtocol: ").append(protocolName);
637             configInfo.append("\n\t\tPublic address: ").append(serverName == null ? "(unspecified)" : serverName);
638             configInfo.append("\n\t\tInterface address: ").append(
639                     interfaceAddressStr == null ? "(unspecified)" : interfaceAddressStr);
640
641             configInfo.append("\n\tConfiguration :");
642             configInfo.append("\n\t\tUsing Interface: ").append(usingInterface.getHostAddress());
643
644             if (null != unicastServer) {
645                 if (-1 == unicastServer.getStartPort()) {
646                     configInfo.append("\n\t\tUnicast Server Bind Addr: ").append(usingInterface.getHostAddress()).append(":").append(
647                             serverSocketPort);
648                 } else {
649                     configInfo.append("\n\t\tUnicast Server Bind Addr: ").append(usingInterface.getHostAddress()).append(":").append(serverSocketPort).append(" [").append(unicastServer.getStartPort()).append("-").append(unicastServer.getEndPort()).append(
650                             "]");
651                 }
652                 configInfo.append("\n\t\tUnicast Server Bound Addr: ").append(unicastServer.getLocalSocketAddress());
653             } else {
654                 configInfo.append("\n\t\tUnicast Server : disabled");
655             }
656
657             configInfo.append("\n\t\tPublic Addresses: ");
658             configInfo.append("\n\t\t\tDefault Endpoint Addr : ").append(publicAddress);
659
660             for (EndpointAddress anAddr : publicAddresses) {
661                 configInfo.append("\n\t\t\tEndpoint Addr : ").append(anAddr);
662             }
663             LOG.config(configInfo.toString());
664         }
665     }
666
667     /**
668      * {@inheritDoc}
669      */
670     public synchronized int startApp(String[] arg) {
671         endpoint = group.getEndpointService();
672
673         if (null == endpoint) {
674             if (Logging.SHOW_WARNING && LOG.isLoggable(Level.WARNING)) {
675                 LOG.warning("Stalled until there is an endpoint service");
676             }
677             return Module.START_AGAIN_STALLED;
678         }
679
680         try {
681             messengerSelector = SelectorProvider.provider().openSelector();
682         } catch (IOException e) {
683             if (Logging.SHOW_WARNING && LOG.isLoggable(Level.WARNING)) {
684                 LOG.log(Level.WARNING, "Could not create a messenger selector", e);
685             }
686         }
687
688         messengerSelectorThread = new Thread(group.getHomeThreadGroup(), new MessengerSelectorThread(), "TCP Transport MessengerSelectorThread for " + this);
689         messengerSelectorThread.setDaemon(true);
690         messengerSelectorThread.start();
691
692         // We're fully ready to function.
693         messengerEventListener = endpoint.addMessageTransport(this);
694
695         if (messengerEventListener == null) {
696             if (Logging.SHOW_SEVERE && LOG.isLoggable(Level.SEVERE)) {
697                 LOG.severe("Transport registration refused");
698             }
699             return -1;
700         }
701
702         // Cannot start before registration, we could be announcing new messengers while we
703         // do not exist yet ! (And get an NPE because we do not have the messenger listener set).
704
705         if (unicastServer != null) {
706             if (!unicastServer.start()) {
707                 if (Logging.SHOW_SEVERE && LOG.isLoggable(Level.SEVERE)) {
708                     LOG.severe("Unable to start TCP Unicast Server");
709                 }
710                 return -1;
711             }
712         }
713
714         if (TransportMeterBuildSettings.TRANSPORT_METERING) {
715             TransportServiceMonitor transportServiceMonitor = (TransportServiceMonitor) MonitorManager.getServiceMonitor(group,
716                     MonitorResources.transportServiceMonitorClassID);
717
718             if (transportServiceMonitor != null) {
719                 unicastTransportMeter = transportServiceMonitor.createTransportMeter("TCP", publicAddress);
720             }
721         }
722
723         isClosed = false;
724
725         if (Logging.SHOW_INFO && LOG.isLoggable(Level.INFO)) {
726             LOG.info("TCP Message Transport started.");
727         }
728         return Module.START_OK;
729     }
730
731     /**
732      * {@inheritDoc}
733      */
734     public synchronized void stopApp() {
735         if (isClosed) {
736             return;
737         }
738
739         isClosed = true;
740
741         if (unicastServer != null) {
742             unicastServer.stop();
743             unicastServer = null;
744         }
745
746         Thread temp = messengerSelectorThread;
747
748         if (null != temp) {
749             temp.interrupt();
750             try {
751                 messengerSelector.close();
752             } catch (IOException failed) {
753                 if (Logging.SHOW_SEVERE && LOG.isLoggable(Level.SEVERE)) {
754                     LOG.log(Level.SEVERE, "IO error occured while closing server socket", failed);
755                 }
756             }
757         }
758
759         // Inform the pool that we don't need as many write selectors.
760         synchronized (writeSelectorCache) {
761             extraWriteSelectors += MAX_WRITE_SELECTORS;
762         }
763
764         endpoint.removeMessageTransport(this);
765
766         endpoint = null;
767         group = null;
768
769         if (Logging.SHOW_INFO && LOG.isLoggable(Level.INFO)) {
770             LOG.info(MessageFormat.format("Total bytes sent : {0}", getBytesSent()));
771             LOG.info(MessageFormat.format("Total Messages sent : {0}", getMessagesSent()));
772             LOG.info(MessageFormat.format("Total bytes received : {0}", getBytesReceived()));
773             LOG.info(MessageFormat.format("Total Messages received : {0}", getMessagesReceived()));
774             LOG.info(MessageFormat.format("Total connections accepted : {0}", getConnectionsAccepted()));
775
776             LOG.info("TCP Message Transport shut down.");
777         }
778     }
779
780     /**
781      * {@inheritDoc}
782      */
783     public String getProtocolName() {
784         return protocolName;
785     }
786
787     /**
788      * {@inheritDoc}
789      */
790     public EndpointAddress getPublicAddress() {
791         return publicAddress;
792     }
793
794     /**
795      * {@inheritDoc}
796      */
797     public EndpointService getEndpointService() {
798         return (EndpointService) endpoint.getInterface();
799     }
800
801     /**
802      * {@inheritDoc}
803      */
804     public Object transportControl(Object operation, Object Value) {
805         return null;
806     }
807
808     /**
809      * {@inheritDoc}
810      */
811     public Iterator<EndpointAddress> getPublicAddresses() {
812         return Collections.unmodifiableList(publicAddresses).iterator();
813     }
814
815     /**
816      * {@inheritDoc}
817      */
818     public boolean isConnectionOriented() {
819         return true;
820     }
821
822     /**
823      * {@inheritDoc}
824      */
825     public boolean allowsRouting() {
826         return true;
827     }
828
829     public Messenger getMessenger(EndpointAddress dst, Object hintIgnored) {
830         return getMessenger(dst, hintIgnored, true);
831     }
832
833     /**
834      * {@inheritDoc}
835      */
836     public Messenger getMessenger(EndpointAddress dst, Object hintIgnored, boolean selfDestruct) {
837
838         if (!dst.getProtocolName().equalsIgnoreCase(getProtocolName())) {
839             if (Logging.SHOW_WARNING && LOG.isLoggable(Level.WARNING)) {
840                 LOG.warning("Cannot make messenger for protocol: " + dst.getProtocolName());
841             }
842             return null;
843         }
844
845         EndpointAddress plainAddr = new EndpointAddress(dst, null, null);
846
847         // If the destination is one of our addresses including loopback, we 
848         // return a loopback messenger.
849         if (publicAddresses.contains(plainAddr)) {
850             if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
851                 LOG.fine("return LoopbackMessenger for addr : " + dst);
852             }
853             return new LoopbackMessenger(group, endpoint, getPublicAddress(), dst,
854                     new EndpointAddress("jxta", group.getPeerID().getUniqueValue().toString(), null, null));
855         }
856
857         try {
858             // Right now we do not want to "announce" outgoing messengers because they get pooled and so must
859             // not be grabbed by a listener. If "announcing" is to be done, that should be by the endpoint
860             // and probably with a subtely different interface.
861             return new TcpMessenger(dst, this, selfDestruct);
862         } catch (Exception caught) {
863             if (Logging.SHOW_WARNING && LOG.isLoggable(Level.WARNING)) {
864                 if (Logging.SHOW_FINER && LOG.isLoggable(Level.FINER)) {
865                     LOG.log(Level.FINER, "Could not get messenger for " + dst, caught);
866                 } else {
867                     LOG.warning("Could not get messenger for " + dst + " : " + caught.getMessage());
868                 }
869             }
870             if (caught instanceof RuntimeException) {
871                 throw (RuntimeException) caught;
872             }
873             return null;
874         }
875     }
876
877     /**
878      * {@inheritDoc}
879      * <p/>
880      * This implementation tries to open a connection, and after tests the
881      * result.
882      */
883     public boolean ping(EndpointAddress addr) {
884         boolean result = false;
885         EndpointAddress endpointAddress;
886         long pingStartTime = 0;
887
888         if (TransportMeterBuildSettings.TRANSPORT_METERING) {
889             pingStartTime = System.currentTimeMillis();
890         }
891
892         endpointAddress = new EndpointAddress(addr, null, null);
893
894         try {
895             // Too bad that this one will not get pooled. On the other hand ping is
896             // not here too stay.
897             TcpMessenger tcpMessenger = new TcpMessenger(endpointAddress, this);
898
899             if (TransportMeterBuildSettings.TRANSPORT_METERING) {
900                 TransportBindingMeter transportBindingMeter = tcpMessenger.getTransportBindingMeter();
901
902                 if (transportBindingMeter != null) {
903                     transportBindingMeter.ping(System.currentTimeMillis() - pingStartTime);
904                 }
905             }
906             result = true;
907         } catch (Throwable e) {
908             if (Logging.SHOW_WARNING && LOG.isLoggable(Level.WARNING)) {
909                 LOG.log(Level.WARNING, "failure pinging " + addr.toString(), e);
910             }
911             if (TransportMeterBuildSettings.TRANSPORT_METERING) {
912                 TransportBindingMeter transportBindingMeter = getUnicastTransportBindingMeter(null, endpointAddress);
913
914                 if (transportBindingMeter != null) {
915                     transportBindingMeter.pingFailed(System.currentTimeMillis() - pingStartTime);
916                 }
917             }
918         }
919
920         if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
921             LOG.fine("ping to " + addr.toString() + " == " + result);
922         }
923         return result;
924     }
925
926     /**
927      * Getter for property 'restrictionPort'.
928      *
929      * @return Value for property 'restrictionPort'.
930      */
931     int getRestrictionPort() {
932         return restrictionPort;
933     }
934
935     TransportBindingMeter getUnicastTransportBindingMeter(PeerID peerID, EndpointAddress destinationAddress) {
936         if (unicastTransportMeter != null) {
937             return unicastTransportMeter.getTransportBindingMeter(
938                     (peerID != null) ? peerID.toString() : TransportMeter.UNKNOWN_PEER, destinationAddress);
939         } else {
940             return null;
941         }
942     }
943
944     void messengerReadyEvent(Messenger newMessenger, EndpointAddress connAddr) {
945         messengerEventListener.messengerReady(new MessengerEvent(this, newMessenger, connAddr));
946     }
947
948     /**
949      * Getter for property 'server'.
950      *
951      * @return Value for property 'server'.
952      */
953     IncomingUnicastServer getServer() {
954         return unicastServer;
955
956     }
957
958     /**
959      * Get a write selector from the cache.
960      *
961      * @return A write selector.
962      * @throws InterruptedException If interrupted while waiting for a selector
963      *                              to become available.
964      */
965     Selector getSelector() throws InterruptedException {
966         synchronized (writeSelectorCache) {
967             Selector selector = null;
968             try {
969                 if (!writeSelectorCache.isEmpty()) {
970                     selector = writeSelectorCache.pop();
971                 }
972             } catch (EmptyStackException ese) {
973                 if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
974                     LOG.fine("No write selector available, waiting for one");
975                 }
976             }
977
978             int attempts = 0;
979             while (selector == null && attempts < 2) {
980                 writeSelectorCache.wait(connectionTimeOut);
981                 try {
982                     if (!writeSelectorCache.isEmpty()) {
983                         selector = writeSelectorCache.pop();
984                     }
985                 } catch (EmptyStackException ese) {
986                     if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
987                         LOG.log(Level.FINE, "Failed to get a write selector available, waiting for one", ese);
988                     }
989                 }
990                 attempts++;
991             }
992
993             return selector;
994         }
995     }
996
997     /**
998      * Return the <code>Selector</code> to the cache
999      *
1000      * @param selector the selector to put back into the pool
1001      */
1002     void returnSelector(Selector selector) {
1003         synchronized (writeSelectorCache) {
1004             if (extraWriteSelectors > 0) {
1005                 // Allow the selector to be discarded.
1006                 extraWriteSelectors--;
1007             } else {
1008                 writeSelectorCache.push(selector);
1009                 // it does not hurt to notify, even if there are no waiters
1010                 writeSelectorCache.notify();
1011             }
1012         }
1013     }
1014
1015     /**
1016      * Waits for incoming data on channels and sends it to the appropriate
1017      * messenger object.
1018      */
1019     private class MessengerSelectorThread implements Runnable {
1020
1021         /**
1022          * {@inheritDoc}
1023          */
1024         public void run() {
1025             try {
1026                 if (Logging.SHOW_INFO && LOG.isLoggable(Level.INFO)) {
1027                     LOG.info("MessengerSelectorThread polling started");
1028                 }
1029
1030                 while (!isClosed) {
1031                     try {
1032                         int selectedKeys = 0;
1033
1034                         // Update channel registerations.
1035                         updateChannelRegisterations();
1036
1037                         try {
1038                             // this can be interrupted through wakeup
1039                             selectedKeys = messengerSelector.select();
1040                         } catch (CancelledKeyException cke) {
1041                             if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
1042                                 LOG.log(Level.FINE, "Key was cancelled", cke);
1043                             }
1044                         }
1045
1046                         if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
1047                             LOG.fine(MessageFormat.format("MessengerSelector has {0} selected keys", selectedKeys));
1048                         }
1049
1050                         if (selectedKeys == 0 && messengerSelector.selectNow() == 0) {
1051                             // We were probably just woken.
1052                             continue;
1053                         }
1054
1055                         Set<SelectionKey> keySet = messengerSelector.selectedKeys();
1056
1057                         if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
1058                             LOG.fine(MessageFormat.format("KeySet has {0} selected keys", keySet.size()));
1059                         }
1060
1061                         Iterator<SelectionKey> it = keySet.iterator();
1062
1063                         while (it.hasNext()) {
1064                             SelectionKey key = it.next();
1065
1066                             // remove it from the SelectedKeys Set
1067                             it.remove();
1068
1069                             if (key.isValid()) {
1070                                 try {
1071                                     if (key.isReadable() && key.channel().isOpen()) {
1072                                         // ensure this channel is not selected again until the thread is done with it
1073                                         // TcpMessenger is expected to reset the interestOps back to OP_READ
1074                                         // Without this, expect multiple threads to execute on the same event, until
1075                                         // the first thread completes reading all data available
1076                                         key.interestOps(key.interestOps() & ~SelectionKey.OP_READ);
1077
1078                                         // get the messenger
1079                                         TcpMessenger msgr = (TcpMessenger) key.attachment();
1080
1081                                         // process the data
1082                                         try {
1083                                             executor.execute(msgr);
1084                                         } catch (RejectedExecutionException re) {
1085                                             if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
1086                                                 LOG.log(Level.FINE,
1087                                                         MessageFormat.format("Executor rejected task for messenger :{0}", msgr.toString()), re);
1088                                             }
1089                                         }
1090                                     }
1091                                 } catch (CancelledKeyException cce) {
1092                                     //in case the key was canceled after the selection
1093                                 }
1094                             } else {
1095                                 // unregister it, no need to keep invalid/closed channels around
1096                                 try {
1097                                     key.channel().close();
1098                                 } catch (IOException io) {
1099                                     // avoids breaking out of the selector loop
1100                                 }
1101                                 key.cancel();
1102                                 key = null;
1103                             }
1104                         }
1105                     } catch (ClosedSelectorException cse) {
1106                         if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
1107                             LOG.fine("IO Selector closed");
1108                         }
1109                     } catch (InterruptedIOException woken) {
1110                         if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
1111                             LOG.log(Level.FINE, "Thread inturrupted", woken);
1112                         }
1113                     } catch (IOException e1) {
1114                         if (Logging.SHOW_WARNING && LOG.isLoggable(Level.WARNING)) {
1115                             LOG.log(Level.WARNING, "An exception occurred while selecting keys", e1);
1116                         }
1117                     } catch (SecurityException e2) {
1118                         if (Logging.SHOW_WARNING && LOG.isLoggable(Level.WARNING)) {
1119                             LOG.log(Level.WARNING, "A security exception occurred while selecting keys", e2);
1120                         }
1121                     }
1122                 }
1123
1124                 // XXX 20070205 bondolo What should we do about the channels 
1125                 // that are still registered with the selector and any pending 
1126                 // updates?
1127
1128             } catch (Throwable all) {
1129                 if (Logging.SHOW_SEVERE && Logging.SHOW_SEVERE) {
1130                     LOG.log(Level.SEVERE, "Uncaught Throwable", all);
1131                 }
1132             } finally {
1133                 messengerSelectorThread = null;
1134             }
1135         }
1136     }
1137
1138     /**
1139      * Registers the channel with the Read selector and attaches the messenger to the channel
1140      *
1141      * @param channel   the socket channel.
1142      * @param messenger the messenger to attach to the channel.
1143      */
1144     void register(SocketChannel channel, TcpMessenger messenger) {
1145         regisMap.put(messenger, channel);
1146         messengerSelector.wakeup();
1147     }
1148
1149     /**
1150      * Unregisters the channel with the Read selector
1151      *
1152      * @param channel the socket channel.
1153      */
1154     void unregister(SocketChannel channel) {
1155         unregisMap.add(channel);
1156         messengerSelector.wakeup();
1157     }
1158
1159     /**
1160      * Registers all newly accepted and returned (by TcpMessenger) channels.
1161      * Removes all closing TcpMessengers.
1162      */
1163     private synchronized void updateChannelRegisterations() {
1164
1165         if (!regisMap.isEmpty() && Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
1166             LOG.fine(MessageFormat.format("Registering {0} channels with MessengerSelectorThread", regisMap.size()));
1167         }
1168
1169         if (!regisMap.isEmpty()) {
1170             Iterator<Map.Entry<TcpMessenger, SocketChannel>> eachMsgr = regisMap.entrySet().iterator();
1171
1172             while (eachMsgr.hasNext()) {
1173                 Map.Entry<TcpMessenger, SocketChannel> anEntry = eachMsgr.next();
1174                 TcpMessenger msgr = anEntry.getKey();
1175                 SocketChannel channel = anEntry.getValue();
1176                 SelectionKey key = channel.keyFor(messengerSelector);
1177
1178                 try {
1179                     if (key == null) {
1180                         key = channel.register(messengerSelector, SelectionKey.OP_READ, msgr);
1181                     }
1182                     key.interestOps(key.interestOps() | SelectionKey.OP_READ);
1183                     if (Logging.SHOW_FINER && LOG.isLoggable(Level.FINER)) {
1184                         LOG.finer(MessageFormat.format("Key interestOps on channel {0}, bit set :{1}", channel, key.interestOps()));
1185                     }
1186                 } catch (ClosedChannelException e) {
1187                     if (Logging.SHOW_WARNING && LOG.isLoggable(Level.FINE)) {
1188                         LOG.log(Level.FINE, "Failed to register Channel with messenger selector", e);
1189                     }
1190                     // it's best a new messenger is created when a new messenger is requested
1191                     msgr.close();
1192                 } catch (CancelledKeyException e) {
1193                     if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
1194                         LOG.log(Level.FINE, "Key is already cancelled, removing key from registeration map", e);
1195                     }
1196                 } catch (IllegalBlockingModeException e) {
1197                     if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
1198                         LOG.log(Level.FINE, "Invalid blocking channel mode, closing messenger", e);
1199                     }
1200                     // messenger state is unknown
1201                     msgr.close();
1202                 }
1203                 // remove it from the table
1204                 eachMsgr.remove();
1205             }
1206         }
1207
1208         // Unregister and close channels.
1209         if (!unregisMap.isEmpty() && Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
1210             LOG.fine(MessageFormat.format("Unregistering {0} channels with MessengerSelectorThread", unregisMap.size()));
1211         }
1212         if (!unregisMap.isEmpty()) {
1213             Iterator<SocketChannel> eachChannel;
1214
1215             synchronized (unregisMap) {
1216                 List<SocketChannel> allChannels = new ArrayList<SocketChannel>(unregisMap);
1217                 unregisMap.clear();
1218                 eachChannel = allChannels.iterator();
1219             }
1220
1221             while (eachChannel.hasNext()) {
1222                 SocketChannel aChannel = eachChannel.next();
1223                 SelectionKey key = aChannel.keyFor(messengerSelector);
1224                 if (null != key) {
1225                     try {
1226                         key.cancel();
1227                     } catch (CancelledKeyException e) {
1228                         if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
1229                             LOG.log(Level.FINE, "Key is already cancelled, removing key from registeration map", e);
1230                         }
1231                     }
1232                 }
1233             }
1234         }
1235     }
1236 }