2 * Copyright (c) 2004-2007 Sun Microsystems, Inc. All rights reserved.
4 * The Sun Project JXTA(TM) Software License
6 * Redistribution and use in source and binary forms, with or without
7 * modification, are permitted provided that the following conditions are met:
9 * 1. Redistributions of source code must retain the above copyright notice,
10 * this list of conditions and the following disclaimer.
12 * 2. Redistributions in binary form must reproduce the above copyright notice,
13 * this list of conditions and the following disclaimer in the documentation
14 * and/or other materials provided with the distribution.
16 * 3. The end-user documentation included with the redistribution, if any, must
17 * include the following acknowledgment: "This product includes software
18 * developed by Sun Microsystems, Inc. for JXTA(TM) technology."
19 * Alternately, this acknowledgment may appear in the software itself, if
20 * and wherever such third-party acknowledgments normally appear.
22 * 4. The names "Sun", "Sun Microsystems, Inc.", "JXTA" and "Project JXTA" must
23 * not be used to endorse or promote products derived from this software
24 * without prior written permission. For written permission, please contact
25 * Project JXTA at http://www.jxta.org.
27 * 5. Products derived from this software may not be called "JXTA", nor may
28 * "JXTA" appear in their name, without prior written permission of Sun.
30 * THIS SOFTWARE IS PROVIDED ``AS IS'' AND ANY EXPRESSED OR IMPLIED WARRANTIES,
31 * INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND
32 * FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL SUN
33 * MICROSYSTEMS OR ITS CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
34 * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
35 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA,
36 * OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
37 * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
38 * NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE,
39 * EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
41 * JXTA is a registered trademark of Sun Microsystems, Inc. in the United
42 * States and other countries.
44 * Please see the license information page at :
45 * <http://www.jxta.org/project/www/license.html> for instructions on use of
46 * the license in source files.
48 * ====================================================================
50 * This software consists of voluntary contributions made by many individuals
51 * on behalf of Project JXTA. For more information on Project JXTA, please see
52 * http://www.jxta.org.
54 * This license is based on the BSD license adopted by the Apache Foundation.
57 package net.jxta.endpoint;
60 import net.jxta.util.AbstractSimpleSelectable;
61 import net.jxta.util.SimpleSelectable;
63 import java.io.IOException;
64 import java.io.InterruptedIOException;
68 * An AbstractMessenger is used to implement messengers (for example, by transport modules).
69 * It supplies the convenience, bw compatible, obvious, or otherwise rarely changed methods.
70 * Many method cannot be overloaded in order to ensure standard behaviour.
71 * The rest is left to implementations.
73 * @see net.jxta.endpoint.EndpointService
74 * @see net.jxta.endpoint.EndpointAddress
75 * @see net.jxta.endpoint.Message
77 public abstract class AbstractMessenger extends AbstractSimpleSelectable implements Messenger {
80 * The default Maximum Transmission Unit.
82 protected static final long DEFAULT_MTU = Long.parseLong(System.getProperty("net.jxta.MTU", "65536"));
85 * The destination address of messages sent on this messenger.
87 protected final EndpointAddress dstAddress;
90 * The stateLock that we share with the implementation.
91 * This permits to implement waitState in a totally generic way: waitState depends only on the lock
92 * (provided at construction), and on getState(), supplied by the implementation.
94 private Object stateLock;
97 * Create a new abstract messenger.
99 * Warning: This class needs to know the object on which to waitState must synchronize. It is generally impossible
100 * to pass it at construction because it is not yet constructed. Instead implementations MUST call {@link #setStateLock}
101 * from their constructor.
103 * @param dest who messages should be addressed to
105 public AbstractMessenger(EndpointAddress dest) {
112 * A simple implementation for debugging. Do not depend upon the format.
115 public String toString() {
116 return super.toString() + " {" + dstAddress + "}";
120 * Specifies the object on which waitState must synchronize.
122 * @param stateLock The object on which waitState must synchronize. This has to be the object that gets notified when the
123 * implementation changes its state. Changing state is defined as "any operation that causes the result of the
124 * <code>getState</code> method to change". Implementations that use the MessengerState state machine should typically use the
125 * MessengerState object as their state lock, but it is not assumed.
127 protected void setStateLock(Object stateLock) {
128 this.stateLock = stateLock;
132 * Messenger methods implementations.
138 * This is here for backward compatibility reasons. The notion of long term unemployment still exists, but is no-longer part
139 * of the API. Self closing for unemployment is now a built-in feature of messengers.
142 public final boolean isIdle() {
150 public final boolean isSynchronous() {
157 public final EndpointAddress getDestinationAddress() {
165 public final EndpointAddress getDestinationAddressObject() {
171 * <p/>It is not always enforced. At least this much can always be sent.
173 public long getMTU() {
180 * This is a minimal implementation. It may not detect closure
181 * initiated by the other side unless the messenger was actually used
182 * since. A more accurate (but not mandatory implementation) would
183 * actually go and check the underlying connection, if relevant...unless
184 * breakage initiated by the other side is actually reported asynchronously
185 * when it happens. Breakage detection from the other side need not
186 * be reported atomically with its occurrence. This not very important
187 * since we canonicalize transport messengers and so do not need to
188 * aggressively collect closed ones. When not used, messengers die by themselves.
190 public boolean isClosed() {
191 return (getState() & USABLE) == 0;
197 public final void flush() throws IOException {
198 int currentState = 0;
201 currentState = waitState(IDLE, 0);
202 } catch (InterruptedException ie) {
203 InterruptedIOException iio = new InterruptedIOException("flush() interrupted");
209 if ((currentState & (CLOSED | USABLE)) != 0) {
213 throw new IOException("Messenger was unexpectedly closed.");
219 public final boolean sendMessage(Message msg) throws IOException {
220 return sendMessage(msg, null, null);
227 public void sendMessage(Message msg, String service, String serviceParam, OutgoingMessageEventListener listener) {
228 throw new UnsupportedOperationException("This legacy method is not supported by this messenger.");
234 public final boolean sendMessage(Message msg, String rService, String rServiceParam) throws IOException {
236 // We have to retrieve the failure from the message and throw it if its an IOException, this is what the API
237 // says that this method does.
239 if (sendMessageN(msg, rService, rServiceParam)) {
243 Object failed = msg.getMessageProperty(Messenger.class);
245 if ((failed == null) || !(failed instanceof OutgoingMessageEvent)) {
250 Throwable t = ((OutgoingMessageEvent) failed).getFailure();
253 // Must be saturation, then. (No throw for that).
257 // Now see how we can manage to throw it.
258 if (t instanceof IOException) {
259 throw (IOException) t;
260 } else if (t instanceof RuntimeException) {
261 throw (RuntimeException) t;
262 } else if (t instanceof Error) {
266 IOException failure = new IOException("Failure sending message");
268 failure.initCause(t);
276 * This method synchronizes on the lock object supplied at construction.
278 public final int waitState(int wantedStates, long timeout) throws InterruptedException {
279 synchronized (stateLock) {
281 while ((wantedStates & getState()) == 0) {
288 stateLock.wait(timeout); // let it throw the appropriate error.
291 long start = System.currentTimeMillis();
292 long end = start + timeout;
295 end = Long.MAX_VALUE;
297 long left = end - start;
299 while ((left > 0) && (wantedStates & getState()) == 0) {
301 stateLock.wait(left);
303 left = end - System.currentTimeMillis();
311 * SimpleSelectable implementation.
315 * Implements a default for all AbstractMessengers: mirror the event to our selectors. This is what is needed by all the
316 * known AbstractMessengers that register themselves somewhere. (That is ChannelMessengers).
317 * FIXME - jice@jxta.org 20040413: Not sure that this is the best default.
319 * @param changedObject Ignored.
321 public void itemChanged(SimpleSelectable changedObject) {