2 * Copyright (c) 2001-2007 Sun Microsystems, Inc. All rights reserved.
4 * The Sun Project JXTA(TM) Software License
6 * Redistribution and use in source and binary forms, with or without
7 * modification, are permitted provided that the following conditions are met:
9 * 1. Redistributions of source code must retain the above copyright notice,
10 * this list of conditions and the following disclaimer.
12 * 2. Redistributions in binary form must reproduce the above copyright notice,
13 * this list of conditions and the following disclaimer in the documentation
14 * and/or other materials provided with the distribution.
16 * 3. The end-user documentation included with the redistribution, if any, must
17 * include the following acknowledgment: "This product includes software
18 * developed by Sun Microsystems, Inc. for JXTA(TM) technology."
19 * Alternately, this acknowledgment may appear in the software itself, if
20 * and wherever such third-party acknowledgments normally appear.
22 * 4. The names "Sun", "Sun Microsystems, Inc.", "JXTA" and "Project JXTA" must
23 * not be used to endorse or promote products derived from this software
24 * without prior written permission. For written permission, please contact
25 * Project JXTA at http://www.jxta.org.
27 * 5. Products derived from this software may not be called "JXTA", nor may
28 * "JXTA" appear in their name, without prior written permission of Sun.
30 * THIS SOFTWARE IS PROVIDED ``AS IS'' AND ANY EXPRESSED OR IMPLIED WARRANTIES,
31 * INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND
32 * FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL SUN
33 * MICROSYSTEMS OR ITS CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
34 * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
35 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA,
36 * OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
37 * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
38 * NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE,
39 * EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
41 * JXTA is a registered trademark of Sun Microsystems, Inc. in the United
42 * States and other countries.
44 * Please see the license information page at :
45 * <http://www.jxta.org/project/www/license.html> for instructions on use of
46 * the license in source files.
48 * ====================================================================
50 * This software consists of voluntary contributions made by many individuals
51 * on behalf of Project JXTA. For more information on Project JXTA, please see
52 * http://www.jxta.org.
54 * This license is based on the BSD license adopted by the Apache Foundation.
56 package net.jxta.socket;
58 import net.jxta.credential.Credential;
59 import net.jxta.credential.CredentialValidator;
60 import net.jxta.document.AdvertisementFactory;
61 import net.jxta.document.StructuredDocumentFactory;
62 import net.jxta.document.XMLDocument;
63 import net.jxta.endpoint.Message;
64 import net.jxta.endpoint.MessageElement;
65 import net.jxta.logging.Logging;
66 import net.jxta.peergroup.PeerGroup;
67 import net.jxta.pipe.InputPipe;
68 import net.jxta.pipe.PipeMsgEvent;
69 import net.jxta.pipe.PipeMsgListener;
70 import net.jxta.pipe.PipeService;
71 import net.jxta.protocol.PeerAdvertisement;
72 import net.jxta.protocol.PipeAdvertisement;
74 import java.io.IOException;
75 import java.net.ServerSocket;
76 import java.net.Socket;
77 import java.net.SocketAddress;
78 import java.net.SocketException;
79 import java.net.SocketTimeoutException;
80 import java.util.concurrent.ArrayBlockingQueue;
81 import java.util.concurrent.BlockingQueue;
82 import java.util.concurrent.TimeUnit;
83 import java.util.logging.Level;
84 import java.util.logging.Logger;
87 * JxtaServerSocket is a bi-directional Pipe that behaves very much like
88 * ServerSocket. It creates an inputpipe and listens for pipe connection
89 * requests. JxtaServerSocket also defines its own protocol. Requests arrive as
90 * a JXTA Message with the following elements:
92 * <Cred> Credentials which can be used to determine trust </Cred>
94 * <reqPipe> requestor's pipe advertisement </reqPipe>
96 * <remPipe> Remote pipe advertisement </remPipe>
98 * <remPeer> Remote peer advertisement </remPeer>
100 * <stream> determine whether the connection is reliable, or not </stream>
102 * <close> close request </close>
104 * <data> Data </data>
106 * JxtaServerSocket then creates a new private pipe, listens for messages on that pipe,
107 * resolves the requestor's pipe, and sends a <remPipe> private pipe created </remPipe>
108 * advertisement back, where the remote side is resolved.
110 * The {@code accept()} backlog defaults to 50 requests.
112 * The timeout defaults to 60 seconds, i.e. {@code accept()} blocks for at most that long.
114 public class JxtaServerSocket extends ServerSocket implements PipeMsgListener {
116 private static final Logger LOG = Logger.getLogger(JxtaServerSocket.class.getName());
118 protected static final String MSG_ELEMENT_NAMESPACE = "JXTASOC";
119 protected static final String credTag = "Cred";
120 protected static final String reqPipeTag = "reqPipe";
121 protected static final String remPeerTag = "remPeer";
122 protected static final String remPipeTag = "remPipe";
123 protected static final String dataTag = "data";
124 protected static final String closeTag = "close";
125 protected final static String closeReqValue = "close";
126 protected final static String closeAckValue = "closeACK";
127 protected static final String streamTag = "stream";
129 private final static int DEFAULT_BACKLOG = 50;
130 private final static long DEFAULT_TIMEOUT = 60 * 1000L;
133 * QUEUE_END_MESSAGE is used to signal that the queue has been closed.
135 protected static final Message QUEUE_END_MESSAGE = new Message();
140 protected PeerGroup group;
143 * The pipe advertisement we are serving.
145 protected PipeAdvertisement pipeAdv;
148 * The input pipe on which we listen for connect requests.
150 protected InputPipe serverPipe;
153 * The credential we will present to connect requests.
155 protected Credential localCredential = null;
158 * The number of connect requests we will allow to become backlogged.
160 protected int backlog = DEFAULT_BACKLOG;
163 * The timeout for accept operations.
165 protected long timeout = DEFAULT_TIMEOUT;
167 protected BlockingQueue<Message> queue = null;
168 protected volatile boolean bound = false;
169 protected volatile boolean closed = false;
170 private CredentialValidator credValidator = null;
/**
 * Creates an unbound server socket.
 * <p/>
 * A call to one of the {@code bind()} methods is needed to finish
 * initializing this object.
 *
 * @throws IOException if an io error occurs
 */
public JxtaServerSocket() throws IOException {
    super();
}
/**
 * Creates a server socket bound to the given address, using the default
 * backlog.
 *
 * @param address an instance of JxtaSocketAddress
 * @throws IOException if an io error occurs
 * @see net.jxta.socket.JxtaSocketAddress
 */
public JxtaServerSocket(SocketAddress address) throws IOException {
    this(address, DEFAULT_BACKLOG);
}
/**
 * Creates a server socket bound to the given pipe, using the default
 * backlog.
 *
 * @param group   JXTA PeerGroup
 * @param pipeAdv PipeAdvertisement on which pipe requests are accepted
 * @throws IOException if an I/O error occurs
 */
public JxtaServerSocket(PeerGroup group, PipeAdvertisement pipeAdv) throws IOException {
    this(group, pipeAdv, DEFAULT_BACKLOG);
}
/**
 * Creates a server socket bound to the given address, using the default
 * timeout.
 *
 * @param address an instance of JxtaSocketAddress
 * @param backlog the size of the backlog queue
 * @throws IOException if an I/O error occurs
 * @see net.jxta.socket.JxtaSocketAddress
 */
public JxtaServerSocket(SocketAddress address, int backlog) throws IOException {
    this(address, backlog, (int) DEFAULT_TIMEOUT);
}
/**
 * Creates a server socket bound to the given pipe, using the default
 * timeout.
 *
 * @param group   JXTA PeerGroup
 * @param pipeAdv PipeAdvertisement on which pipe requests are accepted
 * @param backlog the maximum length of the queue.
 * @throws IOException if an I/O error occurs
 */
public JxtaServerSocket(PeerGroup group, PipeAdvertisement pipeAdv, int backlog) throws IOException {
    this(group, pipeAdv, backlog, (int) DEFAULT_TIMEOUT);
}
/**
 * Creates a server socket with the given accept timeout and binds it to
 * the given address.
 *
 * @param address an instance of JxtaSocketAddress
 * @param backlog the size of the backlog queue
 * @param timeout connection timeout in milliseconds
 * @throws IOException if an I/O error occurs
 * @see net.jxta.socket.JxtaSocketAddress
 */
public JxtaServerSocket(SocketAddress address, int backlog, int timeout) throws IOException {
    super();
    // The timeout is applied before binding, exactly as the pipe-based constructor does.
    setSoTimeout(timeout);
    bind(address, backlog);
}
/**
 * Creates a server socket bound to the given pipe, without a credential
 * validator.
 *
 * @param group   JXTA PeerGroup
 * @param pipeAdv PipeAdvertisement on which pipe requests are accepted
 * @param backlog the maximum length of the queue.
 * @param timeout the specified timeout, in milliseconds
 * @throws IOException if an I/O error occurs
 */
public JxtaServerSocket(PeerGroup group, PipeAdvertisement pipeAdv, int backlog, int timeout) throws IOException {
    this(group, pipeAdv, backlog, timeout, null);
}
/**
 * Creates a server socket with the given accept timeout and credential
 * validator, and binds it to the given pipe.
 *
 * @param group         JXTA PeerGroup
 * @param pipeAdv       PipeAdvertisement on which pipe requests are accepted
 * @param backlog       the maximum length of the queue.
 * @param timeout       the specified timeout, in milliseconds
 * @param credValidator the CredentialValidator, or {@code null} to accept
 *                      any credential
 * @throws IOException if an I/O error occurs
 */
public JxtaServerSocket(PeerGroup group, PipeAdvertisement pipeAdv, int backlog, int timeout, CredentialValidator credValidator) throws IOException {
    super();
    setSoTimeout(timeout);
    // The validator must be in place before bind() starts delivering requests.
    this.credValidator = credValidator;
    bind(group, pipeAdv, backlog);
}
276 * Closes the JxtaServerPipe.
279 protected void finalize() throws Throwable {
282 if (Logging.SHOW_WARNING && LOG.isLoggable(Level.WARNING)) {
283 LOG.warning("JxtaServerSocket is being finalized without being previously closed. This is likely an application level bug.");
293 public Socket accept() throws IOException {
295 throw new SocketException("Socket is not bound yet");
299 if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
300 LOG.fine("Waiting for a connection");
305 throw new SocketException("Socket is closed");
307 Message msg = queue.poll(timeout, TimeUnit.MILLISECONDS);
310 throw new SocketException("Socket is closed");
313 throw new SocketTimeoutException("Timeout reached");
316 if (QUEUE_END_MESSAGE == msg) {
317 throw new SocketException("Socket is closed.");
320 JxtaSocket socket = processMessage(msg);
322 // make sure we have a socket returning
323 if (socket != null) {
324 if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
325 LOG.fine("New socket connection " + socket);
328 } else if (Logging.SHOW_WARNING && LOG.isLoggable(Level.WARNING)) {
329 LOG.warning("No connection.");
332 } catch (InterruptedException ie) {
333 SocketException interrupted = new SocketException("interrupted");
335 interrupted.initCause(ie);
341 * Binds the <code>JxtaServerSocket</code> to a specific pipe advertisement
343 * @param group JXTA PeerGroup
344 * @param pipeAdv PipeAdvertisement on which pipe requests are accepted
345 * @throws IOException if an I/O error occurs
347 public void bind(PeerGroup group, PipeAdvertisement pipeAdv) throws IOException {
348 bind(group, pipeAdv, DEFAULT_BACKLOG);
352 * Binds the <code>JxtaServerSocket</code> to a specific pipe advertisement
354 * @param group JXTA PeerGroup
355 * @param pipeadv PipeAdvertisement on which pipe requests are accepted
356 * @param backlog the maximum length of the queue.
357 * @throws IOException if an I/O error occurs
359 public void bind(PeerGroup group, PipeAdvertisement pipeadv, int backlog) throws IOException {
360 if (PipeService.PropagateType.equals(pipeadv.getType())) {
361 throw new IOException("Propagate pipe advertisements are not supported");
365 throw new IllegalArgumentException("backlog must be > 0");
368 this.backlog = backlog;
369 queue = new ArrayBlockingQueue<Message>(backlog);
371 this.pipeAdv = pipeadv;
372 PipeService pipeSvc = group.getPipeService();
374 serverPipe = pipeSvc.createInputPipe(pipeadv, this);
381 * Used to bind a JxtaServerSocket created with the no-arg constructor.
384 public void bind(SocketAddress endpoint) throws IOException {
385 bind(endpoint, backlog);
391 * Used to bind a JxtaServerSocket created with the no-arg constructor.
394 public void bind(SocketAddress endpoint, int backlog) throws IOException {
395 if (endpoint instanceof JxtaSocketAddress) {
396 JxtaSocketAddress socketAddress = (JxtaSocketAddress) endpoint;
397 PeerGroup pg = PeerGroup.globalRegistry.lookupInstance(socketAddress.getPeerGroupId());
400 throw new IOException(
401 "Can't connect socket in PeerGroup with id " + socketAddress.getPeerGroupId()
402 + ". No running instance of the group is registered.");
404 bind(pg.getWeakInterface(), socketAddress.getPipeAdv(), backlog);
407 throw new IllegalArgumentException("Unsupported subclass of SocketAddress; " + "use JxtaSocketAddress instead.");
415 public void close() throws IOException {
423 // close all the pipe
431 queue.put(QUEUE_END_MESSAGE);
432 // end queue message is now on the queue, we are done.
434 } catch (InterruptedException woken) {
435 // We MUST put the terminal message onto the queue before
436 // finishing. We won't have a second chance.
437 Thread.interrupted();
440 if (Logging.SHOW_INFO && LOG.isLoggable(Level.INFO)) {
441 LOG.info("Closed : " + this);
446 * @return the server socket's JxtaSocketAddress
447 * @see java.net.ServerSocket#getLocalSocketAddress()
450 public SocketAddress getLocalSocketAddress() {
451 return new JxtaSocketAddress(getGroup(), getPipeAdv());
458 public int getSoTimeout() throws IOException {
460 throw new SocketException("Socket is closed");
463 if (timeout > Integer.MAX_VALUE) {
466 return (int) timeout;
474 public void setSoTimeout(int timeout) throws SocketException {
476 throw new SocketException("Socket is closed");
480 throw new IllegalArgumentException("timeout must be >= 0");
484 this.timeout = Long.MAX_VALUE;
486 this.timeout = (long) timeout;
494 public boolean isBound() {
502 public boolean isClosed() {
507 * Sets whether this socket is currently bound or not. A socket is
508 * considered bound if the local resources required in order to interact
509 * with a remote peer are allocated and open.
511 * @param boundState The new bound state.
513 private synchronized void setBound(boolean boundState) {
514 this.bound = boundState;
518 * Gets the group associated with this JxtaServerSocket object
520 * @return The group value
522 public PeerGroup getGroup() {
527 * Gets the PipeAdvertisement associated with this JxtaServerSocket object
529 * @return The pipeAdv value
531 public PipeAdvertisement getPipeAdv() {
538 public void pipeMsgEvent(PipeMsgEvent event) {
540 // deal with messages as they come in
541 Message message = event.getMessage();
543 if (message == null) {
547 boolean pushed = false;
549 pushed = queue.offer(message, timeout, TimeUnit.MILLISECONDS);
550 } catch (InterruptedException woken) {
551 if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
552 LOG.log(Level.FINE, "Interrupted", woken);
556 if (!pushed && Logging.SHOW_WARNING && LOG.isLoggable(Level.WARNING)) {
557 LOG.warning("backlog queue full, connect request dropped");
562 * processMessage is the main mechanism in establishing bi-directional connections
564 * It accepts connection messages and constructs a JxtaSocket with a ephemeral
565 * InputPipe and a messenger.
567 * @param msg The client connection request (assumed not null)
568 * @return JxtaSocket Which may be null if an error occurs.
570 private JxtaSocket processMessage(Message msg) {
572 PipeAdvertisement remoteEphemeralPipeAdv = null;
573 PeerAdvertisement remotePeerAdv = null;
574 Credential credential = null;
576 if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
577 LOG.fine("Processing a connection message : " + msg);
581 MessageElement el = msg.getMessageElement(MSG_ELEMENT_NAMESPACE, reqPipeTag);
583 XMLDocument pipeAdvDoc = (XMLDocument) StructuredDocumentFactory.newStructuredDocument(el);
584 remoteEphemeralPipeAdv = (PipeAdvertisement) AdvertisementFactory.newAdvertisement(pipeAdvDoc);
587 el = msg.getMessageElement(MSG_ELEMENT_NAMESPACE, remPeerTag);
589 XMLDocument peerAdvDoc = (XMLDocument) StructuredDocumentFactory.newStructuredDocument(el);
590 remotePeerAdv = (PeerAdvertisement) AdvertisementFactory.newAdvertisement(peerAdvDoc);
593 el = msg.getMessageElement(MSG_ELEMENT_NAMESPACE, credTag);
596 XMLDocument credDoc = (XMLDocument) StructuredDocumentFactory.newStructuredDocument(el);
597 credential = group.getMembershipService().makeCredential(credDoc);
598 if (!checkCred(credential)) {
599 if (Logging.SHOW_WARNING && LOG.isLoggable(Level.WARNING)) {
600 LOG.log(Level.WARNING, "Invalid credential");
604 } catch (Exception ignored) {
609 boolean isReliable = false;
611 el = msg.getMessageElement(MSG_ELEMENT_NAMESPACE, streamTag);
613 isReliable = Boolean.valueOf(el.toString());
616 if ((null != remoteEphemeralPipeAdv) && (null != remotePeerAdv)) {
617 return createEphemeralSocket(group, pipeAdv, remoteEphemeralPipeAdv, remotePeerAdv, localCredential, credential, isReliable);
619 if (Logging.SHOW_WARNING && LOG.isLoggable(Level.WARNING)) {
620 LOG.warning("Connection message did not contain valid connection information.");
624 } catch (IOException e) {
625 // deal with the error
626 if (Logging.SHOW_WARNING && LOG.isLoggable(Level.WARNING)) {
627 LOG.log(Level.WARNING, "IOException occured", e);
629 } catch (RuntimeException e) {
630 // deal with the error
631 if (Logging.SHOW_WARNING && LOG.isLoggable(Level.WARNING)) {
632 LOG.log(Level.WARNING, "Exception occured", e);
639 * Invokes the specified CredentialValidator to very a credential
640 * @param cred the credential
641 * @return <code>true</code> if valid, or if no validator is specified
643 private boolean checkCred(Credential cred) {
644 return credValidator == null || credValidator.checkCred(cred);
648 * Construct the emphemeral socket result from accept. This method exists
649 * primarily so that sub-classes can substitute a different JxtaSocket
652 * @param group The peer group for the socket.
653 * @param pipeAdv The public pipe advertisement.
654 * @param remoteEphemeralPipeAdv The pipe advertisement of the remote peer's
656 * @param remotePeerAdv The peer advertisement of the remote peer.
657 * @param localCredential Our credential.
658 * @param credential The credential of the remote peer.
659 * @param isReliable if true, uses the reliability library in non-direct mode
660 * @return The new JxtaSocket instance.
661 * @throws IOException if an io error occurs
663 protected JxtaSocket createEphemeralSocket(PeerGroup group, PipeAdvertisement pipeAdv, PipeAdvertisement remoteEphemeralPipeAdv, PeerAdvertisement remotePeerAdv, Credential localCredential, Credential credential, boolean isReliable) throws IOException {
664 return new JxtaSocket(group, pipeAdv, remoteEphemeralPipeAdv, remotePeerAdv, localCredential, credential, isReliable);
668 * Sets the credential to be used by this socket connection. If no
669 * credentials are set, the default group credential will be used.
671 * @param localCredential The credential to be used for connection responses
672 * or <tt>null</tt> if the default credential is to be used.
674 public void setCredential(Credential localCredential) {
675 this.localCredential = localCredential;
681 * This output is suitable for debugging but should not be parsed. All
682 * of the information is available through other means.
685 public String toString() {
686 StringBuilder result = new StringBuilder();
688 result.append(getClass().getName());
690 result.append(System.identityHashCode(this));
693 result.append(pipeAdv.getPipeID());
696 result.append(isClosed() ? " CLOSED :" : " OPEN :");
697 result.append(isBound() ? " BOUND " : " UNBOUND ");
698 return result.toString();