]> sjero.net Git - linphone/blob - p2pproxy/dependencies-src/jxse-src-2.5/impl/src/net/jxta/impl/endpoint/servlethttp/HttpServletMessenger.java
3d72bf21d6bcd2dffd79b5f923d608d78c9350f6
[linphone] / p2pproxy / dependencies-src / jxse-src-2.5 / impl / src / net / jxta / impl / endpoint / servlethttp / HttpServletMessenger.java
1 /*
2  * Copyright (c) 2001-2007 Sun Microsystems, Inc.  All rights reserved.
3  *  
4  *  The Sun Project JXTA(TM) Software License
5  *  
6  *  Redistribution and use in source and binary forms, with or without 
7  *  modification, are permitted provided that the following conditions are met:
8  *  
9  *  1. Redistributions of source code must retain the above copyright notice,
10  *     this list of conditions and the following disclaimer.
11  *  
12  *  2. Redistributions in binary form must reproduce the above copyright notice, 
13  *     this list of conditions and the following disclaimer in the documentation 
14  *     and/or other materials provided with the distribution.
15  *  
16  *  3. The end-user documentation included with the redistribution, if any, must 
17  *     include the following acknowledgment: "This product includes software 
18  *     developed by Sun Microsystems, Inc. for JXTA(TM) technology." 
19  *     Alternately, this acknowledgment may appear in the software itself, if 
20  *     and wherever such third-party acknowledgments normally appear.
21  *  
22  *  4. The names "Sun", "Sun Microsystems, Inc.", "JXTA" and "Project JXTA" must 
23  *     not be used to endorse or promote products derived from this software 
24  *     without prior written permission. For written permission, please contact 
25  *     Project JXTA at http://www.jxta.org.
26  *  
27  *  5. Products derived from this software may not be called "JXTA", nor may 
28  *     "JXTA" appear in their name, without prior written permission of Sun.
29  *  
30  *  THIS SOFTWARE IS PROVIDED ``AS IS'' AND ANY EXPRESSED OR IMPLIED WARRANTIES,
31  *  INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND 
32  *  FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL SUN 
33  *  MICROSYSTEMS OR ITS CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, 
34  *  INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT 
35  *  LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, 
36  *  OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF 
37  *  LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING 
38  *  NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, 
39  *  EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
40  *  
41  *  JXTA is a registered trademark of Sun Microsystems, Inc. in the United 
42  *  States and other countries.
43  *  
44  *  Please see the license information page at :
45  *  <http://www.jxta.org/project/www/license.html> for instructions on use of 
46  *  the license in source files.
47  *  
48  *  ====================================================================
49  *  
50  *  This software consists of voluntary contributions made by many individuals 
51  *  on behalf of Project JXTA. For more information on Project JXTA, please see 
52  *  http://www.jxta.org.
53  *  
54  *  This license is based on the BSD license adopted by the Apache Foundation. 
55  */
56 package net.jxta.impl.endpoint.servlethttp;
57
58 import java.util.Timer;
59 import java.util.TimerTask;
60
61 import java.io.IOException;
62
63 import java.util.logging.Level;
64 import net.jxta.logging.Logging;
65 import java.util.logging.Logger;
66
67 import net.jxta.endpoint.EndpointAddress;
68 import net.jxta.endpoint.Message;
69 import net.jxta.endpoint.MessageElement;
70 import net.jxta.endpoint.StringMessageElement;
71 import net.jxta.peergroup.PeerGroupID;
72
73 import net.jxta.impl.endpoint.BlockingMessenger;
74 import net.jxta.impl.endpoint.EndpointServiceImpl;
75 import net.jxta.impl.util.TimeUtils;
76
77 /**
78  * Simple messenger that waits for a message to give back to the requesting client
79  *
80  * <p/>This messenger is not entirely thread-safe. You should not use any
81  * of the <code>sendMessage</code> methods from more than one thread.
82  *
83  */
84 final class HttpServletMessenger extends BlockingMessenger {
85     
86     /**
87      *  Logger
88      */
89     private final static transient Logger LOG = Logger.getLogger(HttpServletMessenger.class.getName());
90     
91     // We need an explicit idle state. outgoingMessage being null is not enough
92     // because there is an intermediate state where the http servlet must know
93     // that there is nothing new to be sent but the relay side must also know
94     // that status has not been collected yet (the messenger is not ready to be
95     // reused). We use outgoingMessage == null to know that the message
96     // has already been picked-up by the servlet thread, and
97     // sendResult != IDLE to know that the result has not yet been collected
98     // by the relay sender thread.
99     
100     private final static int SEND_IDLE = 0;
101     private final static int SEND_INPROGRESS = 1;
102     private final static int SEND_SUCCESS = 2;
103     private final static int SEND_FAIL = 3;
104     private final static int SEND_TOOLONG = 4;
105     
106     private final static long MAX_SENDING_BLOCK = 2 * TimeUtils.AMINUTE;
107     private final static long MAX_SENDING_WAIT = 3 * TimeUtils.ASECOND;
108     
109     private final static EndpointAddress nullEndpointAddr = new EndpointAddress("http", "0.0.0.0:0", null, null);
110     
111     private final static Timer closeMessengerTimer = new Timer("HttpServletMessenger Expiration timer", true);
112     
113     private final EndpointAddress logicalAddress;
114     private final MessageElement srcAddressElement;
115     private ScheduledExipry expirationTask;
116     
117     /**
118      *  The message "queue"
119      */
120     private Message outgoingMessage = null;
121     
122     private int sendResult = SEND_IDLE;
123     private long sendingSince = 0;
124     
125     /**
126      *  Allows us to schedule the closing of a messenger.
127      */
128     private static class ScheduledExipry extends TimerTask {
129
130         /**
131          *  The messenger we will be expiring.
132          */
133         HttpServletMessenger messenger;
134         
135         ScheduledExipry(HttpServletMessenger toExpire) {
136             messenger = toExpire;
137         }
138         
139         /**
140          *  {@inheritDoc}
141          */
142         @Override
143         public boolean cancel() {
144             // It is important we clear the messenger because Timer doesn't
145             // remove cancelled TimerTasks from it's queue until they fire. This
146             // means that the Messenger would not be GCed until the TimerTask
147             // fired.
148             messenger = null;
149             boolean result = super.cancel();
150
151             // take the opportunity to also purge canceled tasks
152             closeMessengerTimer.purge();
153             return result;
154         }
155         
156         /**
157          *  {@inheritDoc}
158          */
159         @Override
160         public void run() {
161             try {
162                 HttpServletMessenger temp = messenger;
163
164                 messenger = null;
165                 
166                 if (null != temp) {
167                     temp.close();
168                 }
169             } catch (Throwable all) {
170                 if (Logging.SHOW_SEVERE && LOG.isLoggable(Level.SEVERE)) {
171                     LOG.log(Level.SEVERE, "Uncaught Throwable in timer task :" + Thread.currentThread().getName(), all);
172                 }
173             }
174         }
175     }
176     
177     /**
178      *  Standard constructor.
179      *
180      * @param peerGroupID the peer group id
181      * @param srcAddress  source address
182      * @param logicalAddress logical address
183      * @param validFor validity in milliseconds
184      */
185     HttpServletMessenger(PeerGroupID peerGroupID, EndpointAddress srcAddress, EndpointAddress logicalAddress, long validFor) {
186         
187         // We do not use self destruction.
188         super(peerGroupID, nullEndpointAddr, false);
189         
190         this.logicalAddress = logicalAddress;
191         
192         this.srcAddressElement = new StringMessageElement(EndpointServiceImpl.MESSAGE_SOURCE_NAME, srcAddress.toString(), null);
193         
194         if ((0 != validFor) && (validFor < Long.MAX_VALUE)) {
195             expirationTask = new ScheduledExipry(this);
196             
197             closeMessengerTimer.schedule(expirationTask, validFor);
198         }
199         
200         if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
201             LOG.fine("HttpServletMessenger\n\t" + toString());
202         }
203     }
204     
205     /**
206      * {@inheritDoc}
207      */
208     @Override
209     public synchronized void closeImpl() {
210         if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
211             LOG.fine("close\n\t" + toString());
212         }
213         
214         ScheduledExipry cancelExpire = expirationTask;
215
216         expirationTask = null;
217         if (null != cancelExpire) {
218             cancelExpire.cancel();
219         }
220         
221         super.close();
222         
223         notifyAll();
224     }
225     
226     /**
227      * {@inheritDoc}
228      */
229     @Override
230     public EndpointAddress getLogicalDestinationImpl() {
231         return logicalAddress;
232     }
233     
234     /**
235      * {@inheritDoc}
236      */
237     @Override
238     public boolean isIdleImpl() {
239         // We do not use self destruction.
240         return false;
241     }
242     
243     /**
244      * Send messages. Messages are queued and processed by a thread
245      * running HttpClientConnection.
246      */
247     @Override
248     public synchronized void sendMessageBImpl(Message message, String service, String serviceParam) throws IOException {
249         
250         if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
251             LOG.fine("Send " + message + " to " + dstAddress.toString() + "\n\t" + toString());
252         }
253         
254         if (isClosed()) {
255             IOException failure = new IOException("Messenger was closed, it cannot be used to send messages.");
256             
257             if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
258                 LOG.log(Level.FINE, failure.getMessage(), failure);
259             }
260             
261             throw failure;
262         }
263         
264         // Set the message with the appropriate src and dest address
265         message.replaceMessageElement(EndpointServiceImpl.MESSAGE_SOURCE_NS, srcAddressElement);
266         
267         EndpointAddress destAddressToUse = getDestAddressToUse(service, serviceParam);
268         
269         MessageElement dstAddressElement = new StringMessageElement(EndpointServiceImpl.MESSAGE_DESTINATION_NAME,
270                 destAddressToUse.toString(), null);
271         
272         message.replaceMessageElement(EndpointServiceImpl.MESSAGE_DESTINATION_NS, dstAddressElement);
273         
274         // doSend returns false only when this messenger is closed.
275         if (!doSend(message)) {
276             // send message failed
277             IOException failure = new IOException("Messenger was closed, it cannot be used to send messages.");
278             
279             if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
280                 LOG.log(Level.FINE, "sendMessage failed (messenger closed).\n\t" + toString(), failure);
281             }
282             
283             throw failure;
284         }
285         
286         if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
287             LOG.fine("sendMessage successful for " + message + "\n\t" + toString());
288         }
289     }
290     
291     // Must be called from synchronized context only.
292     private boolean doSend(Message message) {
293         
294         // No need to wait for the messenger to be free. Transport
295         // messengers have no obligation to behave nicely if they're
296         // used by mltiple threads. If a thread comes here while
297         // we're already busy sending, then that's a congestion. Just
298         // drop the new message (pretend it went out).
299         // This is not even so nasty, because jetty has a sizeable
300         // output buffer. As long as that buffer is not full, sending
301         // is instantaneou. If sending starts blocking, then we can honestly
302         // drop messages.
303         
304         if (isClosed()) {
305             return false;
306         }
307         
308         long now = TimeUtils.timeNow();
309         
310         if (sendResult != SEND_IDLE) {
311             // check if that connection is a lemon
312             if ((sendResult == SEND_TOOLONG) && (now > (sendingSince + MAX_SENDING_BLOCK))) {
313                 close();
314             }
315             return true;
316         }
317         
318         // put the message on the outgoing "queue" of size 1
319         outgoingMessage = message;
320         sendResult = SEND_INPROGRESS;
321         sendingSince = now;
322         
323         if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
324             LOG.fine("Queued " + message);
325         }
326         
327         // notify the servlet if it was waiting for a message
328         notifyAll();
329         
330         // wait for the result of the send Since there is ample
331         // buffering underneath, we're not supposed to wait for long;
332         // unless we outdo the connection, in which case, we might as
333         // well start dropping. No point in blocking.
334         // FIXME: jice@jxta.org - this is a rudimentary fix. We need
335         // something like watchedOutputStream instead. Being forced
336         // to do a two-way handshake with a servlet thread is pretty
337         // ridiculous too. We need a simpler http transport.
338         
339         long absoluteTimeOut = TimeUtils.toAbsoluteTimeMillis(MAX_SENDING_WAIT);
340         
341         while (!isClosed()) {
342             if (sendResult != SEND_INPROGRESS) {
343                 break;
344             }
345             
346             long waitfor = TimeUtils.toRelativeTimeMillis(absoluteTimeOut);
347             
348             if (waitfor <= 0) {
349                 break;
350             }
351             
352             try {
353                 wait(waitfor);
354             } catch (InterruptedException e) {
355                 Thread.interrupted();
356                 if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
357                     LOG.log(Level.FINE, "InterruptedException timeout = " + MAX_SENDING_WAIT + "\n\t" + toString(), e);
358                 }
359                 break;
360             }
361         }
362         
363         if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
364             LOG.fine("Got result\n\t" + toString());
365         }
366         
367         if (isClosed() && (SEND_INPROGRESS == sendResult)) {
368             return false;
369         }
370         
371         // If the operation completed just confirm that we're done
372         // reading the result too. Else mark that we don't care.
373         // When it completes the result will be discarded. By default,
374         // the we return ok. If the operation did not complete fast
375         // enough, that's what we return.
376         
377         boolean result = (sendResult != SEND_FAIL);
378         
379         if (sendResult == SEND_INPROGRESS) {
380             sendResult = SEND_TOOLONG;
381             outgoingMessage = null;
382         } else {
383             sendResult = SEND_IDLE;
384         }
385         
386         // Let another contending thread use this messenger.
387         notifyAll();
388         
389         return result;
390     }
391     
392     /**
393      *  Retrieve a message from the "queue" of messages for the servlet.
394      *
395      *  @param timeout Number of milliseconds to wait for a message. Per Java
396      *  convention 0 (zero) means wait forever.
397      *  @return the message or <code>null</code> if no message was available
398      *  before the timeout was reached.
399      *  @throws InterruptedException If the thread is interrupted while waiting.
400      */
401     protected synchronized Message waitForMessage(long timeout) throws InterruptedException {
402         if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
403             LOG.fine("Waiting (" + (0 == timeout ? "forever" : Long.toString(timeout)) + ") for message\n\t" + toString());
404         }
405         
406         if (0 == timeout) {
407             // it's forever
408             timeout = Long.MAX_VALUE;
409         }
410         
411         long absoluteTimeOut = TimeUtils.toAbsoluteTimeMillis(timeout);
412         
413         while (!isClosed() && (null == outgoingMessage)) {
414             long waitfor = TimeUtils.toRelativeTimeMillis(absoluteTimeOut);
415             
416             if (waitfor <= 0) {
417                 break;
418             }
419             
420             wait(waitfor);
421         }
422         
423         // get the message
424         Message result = outgoingMessage;
425         
426         outgoingMessage = null; // Msg can only be picked-up once.
427         
428         if (!isClosed() && (result == null)) {
429             // ABSURD, but seems to happen: the message sent was NULL
430             // and the sender thread is waiting for the completion event.
431             // There would be no such thing, but we can make sure it stops
432             // waiting and still leave the messenger in a sane state if there
433             // was no such absurdity going on.
434             
435             sendResult = SEND_IDLE;
436             notifyAll();
437         }
438         
439         if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
440             LOG.fine("Returning " + result + "\n\t" + toString());
441         }
442         
443         return result;
444     }
445     
446     protected synchronized void messageSent(boolean wasSuccessful) {
447         if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
448             LOG.fine("messageSent(" + wasSuccessful + ")\n\t" + toString());
449         }
450         
451         if (SEND_TOOLONG == sendResult) {
452             // No-one cares for the result any more. Let the next send go.
453             sendResult = SEND_IDLE;
454         } else {
455             sendResult = wasSuccessful ? SEND_SUCCESS : SEND_FAIL;
456         }
457         
458         notifyAll();
459     }
460     
461     /**
462      * {@inheritDoc}
463      *
464      *  <p/>An implementation for debugging. Do not depend on the format.
465      */
466     @Override
467     public String toString() {
468         return "[" + super.toString() + "] isClosed=" + isClosed() + " sendResult=" + sendResult + " outmsg=" + outgoingMessage;
469     }
470 }