2 * Copyright (c) 2001-2007 Sun Microsystems, Inc. All rights reserved.
4 * The Sun Project JXTA(TM) Software License
6 * Redistribution and use in source and binary forms, with or without
7 * modification, are permitted provided that the following conditions are met:
9 * 1. Redistributions of source code must retain the above copyright notice,
10 * this list of conditions and the following disclaimer.
12 * 2. Redistributions in binary form must reproduce the above copyright notice,
13 * this list of conditions and the following disclaimer in the documentation
14 * and/or other materials provided with the distribution.
16 * 3. The end-user documentation included with the redistribution, if any, must
17 * include the following acknowledgment: "This product includes software
18 * developed by Sun Microsystems, Inc. for JXTA(TM) technology."
19 * Alternately, this acknowledgment may appear in the software itself, if
20 * and wherever such third-party acknowledgments normally appear.
22 * 4. The names "Sun", "Sun Microsystems, Inc.", "JXTA" and "Project JXTA" must
23 * not be used to endorse or promote products derived from this software
24 * without prior written permission. For written permission, please contact
25 * Project JXTA at http://www.jxta.org.
27 * 5. Products derived from this software may not be called "JXTA", nor may
28 * "JXTA" appear in their name, without prior written permission of Sun.
30 * THIS SOFTWARE IS PROVIDED ``AS IS'' AND ANY EXPRESSED OR IMPLIED WARRANTIES,
31 * INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND
32 * FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL SUN
33 * MICROSYSTEMS OR ITS CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
34 * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
35 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA,
36 * OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
37 * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
38 * NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE,
39 * EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
41 * JXTA is a registered trademark of Sun Microsystems, Inc. in the United
42 * States and other countries.
44 * Please see the license information page at :
45 * <http://www.jxta.org/project/www/license.html> for instructions on use of
46 * the license in source files.
48 * ====================================================================
50 * This software consists of voluntary contributions made by many individuals
51 * on behalf of Project JXTA. For more information on Project JXTA, please see
52 * http://www.jxta.org.
54 * This license is based on the BSD license adopted by the Apache Foundation.
56 package net.jxta.impl.endpoint.servlethttp;
58 import java.util.Timer;
59 import java.util.TimerTask;
61 import java.io.IOException;
63 import java.util.logging.Level;
64 import net.jxta.logging.Logging;
65 import java.util.logging.Logger;
67 import net.jxta.endpoint.EndpointAddress;
68 import net.jxta.endpoint.Message;
69 import net.jxta.endpoint.MessageElement;
70 import net.jxta.endpoint.StringMessageElement;
71 import net.jxta.peergroup.PeerGroupID;
73 import net.jxta.impl.endpoint.BlockingMessenger;
74 import net.jxta.impl.endpoint.EndpointServiceImpl;
75 import net.jxta.impl.util.TimeUtils;
78 * Simple messenger that waits for a message to give back to the requesting client.
80 * <p/>This messenger is not entirely thread-safe. You should not use any
81 * of the <code>sendMessage</code> methods from more than one thread.
84 final class HttpServletMessenger extends BlockingMessenger {
89 private final static transient Logger LOG = Logger.getLogger(HttpServletMessenger.class.getName());
91 // We need an explicit idle state. outgoingMessage being null is not enough
92 // because there is an intermediate state where the http servlet must know
93 // that there is nothing new to be sent but the relay side must also know
94 // that status has not been collected yet (the messenger is not ready to be
95 // reused). We use outgoingMessage == null to know that the message
96 // has already been picked-up by the servlet thread, and
97 // sendResult != SEND_IDLE to know that the result has not yet been collected
98 // by the relay sender thread.
100 private final static int SEND_IDLE = 0;
101 private final static int SEND_INPROGRESS = 1;
102 private final static int SEND_SUCCESS = 2;
103 private final static int SEND_FAIL = 3;
104 private final static int SEND_TOOLONG = 4;
106 private final static long MAX_SENDING_BLOCK = 2 * TimeUtils.AMINUTE;
107 private final static long MAX_SENDING_WAIT = 3 * TimeUtils.ASECOND;
109 private final static EndpointAddress nullEndpointAddr = new EndpointAddress("http", "0.0.0.0:0", null, null);
111 private final static Timer closeMessengerTimer = new Timer("HttpServletMessenger Expiration timer", true);
113 private final EndpointAddress logicalAddress;
114 private final MessageElement srcAddressElement;
115 private ScheduledExipry expirationTask;
118 * The message "queue"
120 private Message outgoingMessage = null;
122 private int sendResult = SEND_IDLE;
123 private long sendingSince = 0;
126 * Allows us to schedule the closing of a messenger.
128 private static class ScheduledExipry extends TimerTask {
131 * The messenger we will be expiring.
133 HttpServletMessenger messenger;
135 ScheduledExipry(HttpServletMessenger toExpire) {
136 messenger = toExpire;
143 public boolean cancel() {
144 // It is important we clear the messenger because Timer doesn't
145 // remove cancelled TimerTasks from its queue until they fire. This
146 // means that the Messenger would not be GCed until the TimerTask
149 boolean result = super.cancel();
151 // take the opportunity to also purge canceled tasks
152 closeMessengerTimer.purge();
162 HttpServletMessenger temp = messenger;
169 } catch (Throwable all) {
170 if (Logging.SHOW_SEVERE && LOG.isLoggable(Level.SEVERE)) {
171 LOG.log(Level.SEVERE, "Uncaught Throwable in timer task :" + Thread.currentThread().getName(), all);
178 * Standard constructor.
180 * @param peerGroupID the peer group id
181 * @param srcAddress source address
182 * @param logicalAddress logical address
183 * @param validFor validity in milliseconds
185 HttpServletMessenger(PeerGroupID peerGroupID, EndpointAddress srcAddress, EndpointAddress logicalAddress, long validFor) {
187 // We do not use self destruction.
188 super(peerGroupID, nullEndpointAddr, false);
190 this.logicalAddress = logicalAddress;
192 this.srcAddressElement = new StringMessageElement(EndpointServiceImpl.MESSAGE_SOURCE_NAME, srcAddress.toString(), null);
194 if ((0 != validFor) && (validFor < Long.MAX_VALUE)) {
195 expirationTask = new ScheduledExipry(this);
197 closeMessengerTimer.schedule(expirationTask, validFor);
200 if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
201 LOG.fine("HttpServletMessenger\n\t" + toString());
209 public synchronized void closeImpl() {
210 if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
211 LOG.fine("close\n\t" + toString());
214 ScheduledExipry cancelExpire = expirationTask;
216 expirationTask = null;
217 if (null != cancelExpire) {
218 cancelExpire.cancel();
230 public EndpointAddress getLogicalDestinationImpl() {
231 return logicalAddress;
238 public boolean isIdleImpl() {
239 // We do not use self destruction.
244 * Send messages. Messages are queued and processed by a thread
245 * running HttpClientConnection.
248 public synchronized void sendMessageBImpl(Message message, String service, String serviceParam) throws IOException {
250 if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
251 LOG.fine("Send " + message + " to " + dstAddress.toString() + "\n\t" + toString());
255 IOException failure = new IOException("Messenger was closed, it cannot be used to send messages.");
257 if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
258 LOG.log(Level.FINE, failure.getMessage(), failure);
264 // Set the message with the appropriate src and dest address
265 message.replaceMessageElement(EndpointServiceImpl.MESSAGE_SOURCE_NS, srcAddressElement);
267 EndpointAddress destAddressToUse = getDestAddressToUse(service, serviceParam);
269 MessageElement dstAddressElement = new StringMessageElement(EndpointServiceImpl.MESSAGE_DESTINATION_NAME,
270 destAddressToUse.toString(), null);
272 message.replaceMessageElement(EndpointServiceImpl.MESSAGE_DESTINATION_NS, dstAddressElement);
274 // doSend returns false only when this messenger is closed.
275 if (!doSend(message)) {
276 // send message failed
277 IOException failure = new IOException("Messenger was closed, it cannot be used to send messages.");
279 if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
280 LOG.log(Level.FINE, "sendMessage failed (messenger closed).\n\t" + toString(), failure);
286 if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
287 LOG.fine("sendMessage successful for " + message + "\n\t" + toString());
291 // Must be called from synchronized context only.
292 private boolean doSend(Message message) {
294 // No need to wait for the messenger to be free. Transport
295 // messengers have no obligation to behave nicely if they're
296 // used by multiple threads. If a thread comes here while
297 // we're already busy sending, then that's a congestion. Just
298 // drop the new message (pretend it went out).
299 // This is not even so nasty, because jetty has a sizeable
300 // output buffer. As long as that buffer is not full, sending
301 // is instantaneous. If sending starts blocking, then we can honestly
308 long now = TimeUtils.timeNow();
310 if (sendResult != SEND_IDLE) {
311 // check if that connection is a lemon
312 if ((sendResult == SEND_TOOLONG) && (now > (sendingSince + MAX_SENDING_BLOCK))) {
318 // put the message on the outgoing "queue" of size 1
319 outgoingMessage = message;
320 sendResult = SEND_INPROGRESS;
323 if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
324 LOG.fine("Queued " + message);
327 // notify the servlet if it was waiting for a message
330 // wait for the result of the send. Since there is ample
331 // buffering underneath, we're not supposed to wait for long;
332 // unless we outdo the connection, in which case, we might as
333 // well start dropping. No point in blocking.
334 // FIXME: jice@jxta.org - this is a rudimentary fix. We need
335 // something like watchedOutputStream instead. Being forced
336 // to do a two-way handshake with a servlet thread is pretty
337 // ridiculous too. We need a simpler http transport.
339 long absoluteTimeOut = TimeUtils.toAbsoluteTimeMillis(MAX_SENDING_WAIT);
341 while (!isClosed()) {
342 if (sendResult != SEND_INPROGRESS) {
346 long waitfor = TimeUtils.toRelativeTimeMillis(absoluteTimeOut);
354 } catch (InterruptedException e) {
355 Thread.interrupted();
356 if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
357 LOG.log(Level.FINE, "InterruptedException timeout = " + MAX_SENDING_WAIT + "\n\t" + toString(), e);
363 if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
364 LOG.fine("Got result\n\t" + toString());
367 if (isClosed() && (SEND_INPROGRESS == sendResult)) {
371 // If the operation completed just confirm that we're done
372 // reading the result too. Else mark that we don't care.
373 // When it completes the result will be discarded. By default,
374 // we return ok. If the operation did not complete fast
375 // enough, that's what we return.
377 boolean result = (sendResult != SEND_FAIL);
379 if (sendResult == SEND_INPROGRESS) {
380 sendResult = SEND_TOOLONG;
381 outgoingMessage = null;
383 sendResult = SEND_IDLE;
386 // Let another contending thread use this messenger.
393 * Retrieve a message from the "queue" of messages for the servlet.
395 * @param timeout Number of milliseconds to wait for a message. Per Java
396 * convention 0 (zero) means wait forever.
397 * @return the message or <code>null</code> if no message was available
398 * before the timeout was reached.
399 * @throws InterruptedException If the thread is interrupted while waiting.
401 protected synchronized Message waitForMessage(long timeout) throws InterruptedException {
402 if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
403 LOG.fine("Waiting (" + (0 == timeout ? "forever" : Long.toString(timeout)) + ") for message\n\t" + toString());
408 timeout = Long.MAX_VALUE;
411 long absoluteTimeOut = TimeUtils.toAbsoluteTimeMillis(timeout);
413 while (!isClosed() && (null == outgoingMessage)) {
414 long waitfor = TimeUtils.toRelativeTimeMillis(absoluteTimeOut);
424 Message result = outgoingMessage;
426 outgoingMessage = null; // Msg can only be picked-up once.
428 if (!isClosed() && (result == null)) {
429 // ABSURD, but seems to happen: the message sent was NULL
430 // and the sender thread is waiting for the completion event.
431 // There would be no such thing, but we can make sure it stops
432 // waiting and still leave the messenger in a sane state if there
433 // was no such absurdity going on.
435 sendResult = SEND_IDLE;
439 if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
440 LOG.fine("Returning " + result + "\n\t" + toString());
446 protected synchronized void messageSent(boolean wasSuccessful) {
447 if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
448 LOG.fine("messageSent(" + wasSuccessful + ")\n\t" + toString());
451 if (SEND_TOOLONG == sendResult) {
452 // No-one cares for the result any more. Let the next send go.
453 sendResult = SEND_IDLE;
455 sendResult = wasSuccessful ? SEND_SUCCESS : SEND_FAIL;
464 * <p/>An implementation for debugging. Do not depend on the format.
467 public String toString() {
468 return "[" + super.toString() + "] isClosed=" + isClosed() + " sendResult=" + sendResult + " outmsg=" + outgoingMessage;