]> sjero.net Git - linphone/blob - p2pproxy/dependencies-src/jxse-src-2.5/api/src/net/jxta/endpoint/AsyncChannelMessenger.java
remove mediastreamer2 and add it as a submodule instead.
[linphone] / p2pproxy / dependencies-src / jxse-src-2.5 / api / src / net / jxta / endpoint / AsyncChannelMessenger.java
1 /*
2  * Copyright (c) 2004-2007 Sun Microsystems, Inc.  All rights reserved.
3  *  
4  *  The Sun Project JXTA(TM) Software License
5  *  
6  *  Redistribution and use in source and binary forms, with or without 
7  *  modification, are permitted provided that the following conditions are met:
8  *  
9  *  1. Redistributions of source code must retain the above copyright notice,
10  *     this list of conditions and the following disclaimer.
11  *  
12  *  2. Redistributions in binary form must reproduce the above copyright notice, 
13  *     this list of conditions and the following disclaimer in the documentation 
14  *     and/or other materials provided with the distribution.
15  *  
16  *  3. The end-user documentation included with the redistribution, if any, must 
17  *     include the following acknowledgment: "This product includes software 
18  *     developed by Sun Microsystems, Inc. for JXTA(TM) technology." 
19  *     Alternately, this acknowledgment may appear in the software itself, if 
20  *     and wherever such third-party acknowledgments normally appear.
21  *  
22  *  4. The names "Sun", "Sun Microsystems, Inc.", "JXTA" and "Project JXTA" must 
23  *     not be used to endorse or promote products derived from this software 
24  *     without prior written permission. For written permission, please contact 
25  *     Project JXTA at http://www.jxta.org.
26  *  
27  *  5. Products derived from this software may not be called "JXTA", nor may 
28  *     "JXTA" appear in their name, without prior written permission of Sun.
29  *  
30  *  THIS SOFTWARE IS PROVIDED ``AS IS'' AND ANY EXPRESSED OR IMPLIED WARRANTIES,
31  *  INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND 
32  *  FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL SUN 
33  *  MICROSYSTEMS OR ITS CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, 
34  *  INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT 
35  *  LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, 
36  *  OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF 
37  *  LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING 
38  *  NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, 
39  *  EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
40  *  
41  *  JXTA is a registered trademark of Sun Microsystems, Inc. in the United 
42  *  States and other countries.
43  *  
44  *  Please see the license information page at :
45  *  <http://www.jxta.org/project/www/license.html> for instructions on use of 
46  *  the license in source files.
47  *  
48  *  ====================================================================
49  *  
50  *  This software consists of voluntary contributions made by many individuals 
51  *  on behalf of Project JXTA. For more information on Project JXTA, please see 
52  *  http://www.jxta.org.
53  *  
54  *  This license is based on the BSD license adopted by the Apache Foundation. 
55  */
56 package net.jxta.endpoint;
57
58 import net.jxta.peergroup.PeerGroupID;
59
60 import java.io.IOException;
61 import java.io.InterruptedIOException;
62 import java.util.concurrent.ArrayBlockingQueue;
63 import java.util.concurrent.BlockingQueue;
64
65 /**
66  * Extends Channel Messenger behaviour to provide asynchronous message sending
67  * via queuing.
68  */
69 public abstract class AsyncChannelMessenger extends ChannelMessenger {
70
71     /*
72      *  Logger
73      * private final static transient Logger LOG = Logger.getLogger(AsyncChannelMessenger.class.getName());
74      */
75
76     /**
77      * {@code true} if we have deliberately closed our one message input queue.
78      */
79     private boolean inputClosed = false;
80
81     /**
82      * {@code true} if we have deliberately stopped sending.
83      */
84     private boolean outputClosed = false;
85
86     /**
87      * Actions that we defer to after returning from event methods. In other
88      * words, they cannot be done with the lock held, or they require calling
89      * more event methods.
90      */
91     private enum DeferredAction {
92
93         /**
94          * No action deferred.
95          */
96         ACTION_NONE,
97         /**
98          * Must send the current message.
99          */
100         ACTION_SEND,
101         /**
102          * Must report failure to connect.
103          */
104         ACTION_CONNECT
105     }
106
107     /**
108      * The current deferred action.
109      */
110     private DeferredAction deferredAction = DeferredAction.ACTION_NONE;
111
112     /**
113      * The messages queue.
114      */
115     private final BlockingQueue<PendingMessage> queue;
116
117     /**
118      * State lock and engine.
119      */
120     private final AsyncChannelMessengerState stateMachine;
121
122     /**
123      * Our statemachine implementation; just connects the standard MessengerState action methods to
124      * this object.
125      */
126     private class AsyncChannelMessengerState extends MessengerState {
127
128         protected AsyncChannelMessengerState(boolean connected) {
129             super(connected);
130         }
131
132         /*
133          * The required action methods.
134          */
135
136         /**
137          * {@inheritDoc}
138          */
139         @Override
140         protected void connectAction() {
141             deferredAction = DeferredAction.ACTION_CONNECT;
142         }
143
144         /**
145          * {@inheritDoc}
146          */
147         @Override
148         protected void startAction() {
149             deferredAction = DeferredAction.ACTION_SEND;
150         }
151
152         /**
153          * {@inheritDoc}
154          */
155         @Override
156         protected void closeInputAction() {
157             // We're synchronized here. (invoked from stateMachine)
158             inputClosed = true;
159         }
160
161         /**
162          * {@inheritDoc}
163          */
164         @Override
165         protected void closeOutputAction() {
166             // We're synchronized here. (invoked from stateMachine)
167             outputClosed = true;
168         }
169
170         /**
171          * {@inheritDoc}
172          */
173         @Override
174         protected void failAllAction() {
175
176             // The queue is now closed, so we can rest assured that the last
177             // message is really the last one. This is a synchronous action. The
178             // state machine assumes that it is done when we return. There is no
179             // need to signal completion with an idleEvent.
180             PendingMessage theMsg;
181
182             while (true) {
183                 theMsg = null;
184
185                 synchronized (stateMachine) {
186                     theMsg = queue.poll();
187                 }
188
189                 if (theMsg == null) {
190                     return;
191                 }
192
193                 Message currentMsg = theMsg.msg;
194                 Throwable currentFailure = theMsg.failure;
195
196                 if (currentFailure == null) {
197                     currentFailure = new IOException("Messenger unexpectedly closed");
198                 }
199
200                 OutgoingMessageEvent event = new OutgoingMessageEvent(currentMsg, currentFailure);
201
202                 currentMsg.setMessageProperty(Messenger.class, event);
203             }
204         }
205     }
206
207
208     /**
209      * The representation of a queued message. It is shared between this
210      * abstract class and any implementation.
211      */
212     protected static class PendingMessage {
213         final Message msg;
214         final String service;
215         final String param;
216         Throwable failure;
217
218         PendingMessage(Message msg, String service, String param) {
219             this.msg = msg;
220             this.service = service;
221             this.param = param;
222             this.failure = null;
223         }
224     }
225
226     /**
227      * Create a new AsyncChannelMessenger.
228      *
229      * @param baseAddress      The network address messages go to; regardless of
230      *                         service, param, or group.
231      * @param redirection      Group to which the messages must be redirected. This
232      *                         is used to implement the automatic group segregation which has become a
233      *                         de-facto standard. If not null, the unique portion of the specified
234      *                         groupID is prepended with {@link #InsertedServicePrefix} and inserted in
235      *                         every message's destination address in place of the the original service
236      *                         name, which gets shifted into the beginning of the service parameter. The
237      *                         opposite is done on arrival to restore the original destination address
238      *                         before the message is delivered to the listener in the the specified
239      *                         group. Messages that already bear a group redirection are not affected.
240      * @param origService      The default destination service for messages sent
241      *                         without specifying a different service.
242      * @param origServiceParam The default destination service parameter for
243      *                         messages sent without specifying a different service parameter.
244      * @param queueSize        the queue size that channels should have.
245      * @param connected        true if the channel is created in the connected state.
246      */
247     public AsyncChannelMessenger(EndpointAddress baseAddress, PeerGroupID redirection, String origService, String origServiceParam, int queueSize, boolean connected) {
248
249         super(baseAddress, redirection, origService, origServiceParam);
250
251         stateMachine = new AsyncChannelMessengerState(connected);
252
253         queue = new ArrayBlockingQueue<PendingMessage>(queueSize);
254
255         // We synchronize our state with the sharedMessenger's stateMachine.
256         // Logic would dictate that we pass it to super(), but it is not itself
257         // constructed until super() returns. No way around it.
258
259         setStateLock(stateMachine);
260     }
261
262     /**
263      * {@inheritDoc}
264      */
265     public final void close() {
266         DeferredAction action;
267
268         synchronized (stateMachine) {
269             stateMachine.closeEvent();
270             action = eventCalled(true);
271         }
272
273         // We called an event. State may have changed.
274         notifyChange();
275
276         performDeferredAction(action);
277     }
278
279     /**
280      * This internal method does the common part of sending the message on
281      * behalf of both sendMessageN and sendMessageB.
282      * <p/>It is not quite possible to implement sendMessageB as a wrapper
283      * around sendMessageN without some internal cooperation. At least not in
284      * an efficient manner. sendMessageB must not set the message property:
285      * either it fails and throws, or it returns successfully and the property
286      * is set later. This is required so that messages can be retried when
287      * failing synchronously (through a blocking messenger typically, but the
288      * semantic has to be uniform).
289      * <p/>Each of sendMessageB and sendMessageN takes care of status reporting
290      * on its own terms.
291      *
292      * @param msg           the message to send
293      * @param rService      destination service
294      * @param rServiceParam destination param
295      * @return The outcome from that one attempt. {@code true} means done.
296      *         {@code false} means saturated.  When {@code true} is returned, it means
297      *         that the fate of the message will be decided asynchronously, so we do
298      *         not have any details, yet.
299      * @throws IOException          is thrown if this messenger is closed.
300      * @throws InterruptedException if interrupted
301      */
302     private boolean sendMessageCommon(Message msg, String rService, String rServiceParam) throws IOException, InterruptedException {
303
304         String service = effectiveService(rService);
305         String serviceParam = effectiveParam(rService, rServiceParam);
306         boolean queued = true;
307         boolean change = false;
308         DeferredAction action = DeferredAction.ACTION_NONE;
309
310         synchronized (stateMachine) {
311             if (inputClosed) {
312                 throw new IOException("This messenger is closed. It cannot be used to send messages.");
313             }
314
315             boolean wasEmpty = queue.isEmpty();
316
317             if (queue.remainingCapacity() > 1) {
318                 queue.put(new PendingMessage(msg, service, serviceParam));
319
320                 // Still not saturated. If we weren't idle either, then nothing worth mentionning.
321                 if (wasEmpty) {
322                     change = true;
323                     stateMachine.msgsEvent();
324                     action = eventCalled(false);
325                 }
326             } else if (1 == queue.remainingCapacity()) {
327                 queue.put(new PendingMessage(msg, service, serviceParam));
328
329                 // Now saturated.
330                 stateMachine.saturatedEvent();
331                 action = eventCalled(false);
332                 change = true;
333             } else {
334                 // Was already saturated.
335                 queued = false;
336             }
337         }
338
339         if (queued && change) {
340             // If not queued, there was no change of condition as far as
341             // outsiders are concerned. (redundant saturatedEvent, only
342             // defensive; to guarantee statemachine in sync). else, if the
343             // saturation state did not change, we have no state change to
344             // notify.
345             notifyChange();
346         }
347
348         performDeferredAction(action);
349
350         // Before we return, make sure that this channel remains referenced if
351         // it has messages. It could become unreferenced if it is not yet
352         // resolved and the application lets go of it after sending messages.
353         // This means that we may need to do something only in the resolpending
354         // and resolsaturated cases. The way we do this test, there can be false
355         // positives. They're dealt with as part of the action that is carried
356         // out.
357         if ((stateMachine.getState() & (Messenger.RESOLPENDING | Messenger.RESOLSATURATED)) != 0) {
358             resolPendingImpl();
359         }
360
361         return queued;
362     }
363
364     /**
365      * {@inheritDoc}
366      */
367     public final boolean sendMessageN(Message msg, String rService, String rServiceParam) {
368
369         try {
370             if (sendMessageCommon(msg, rService, rServiceParam)) {
371                 // If it worked the message is queued; the outcome will be notified later.
372                 return true;
373             }
374             // Non-blocking and queue full: report overflow.
375             msg.setMessageProperty(Messenger.class, OutgoingMessageEvent.OVERFLOW);
376         } catch (IOException oie) {
377             msg.setMessageProperty(Messenger.class, new OutgoingMessageEvent(msg, oie));
378         } catch (InterruptedException interrupted) {
379             msg.setMessageProperty(Messenger.class, new OutgoingMessageEvent(msg, interrupted));
380         }
381         return false;
382     }
383
384     /**
385      * {@inheritDoc}
386      */
387     public final void sendMessageB(Message msg, String rService, String rServiceParam) throws IOException {
388
389         try {
390             while (true) {
391                 // if sendMessageCommon says "true" it worked.
392                 if (sendMessageCommon(msg, rService, rServiceParam)) {
393                     return;
394                 }
395                 // Do a shallow check on the queue. If it seems empty (without getting into a critical section to
396                 // verify it), then yielding is good bet. It is a lot cheaper and smoother than waiting.
397                 // Note the message should be enqueued now. yielding makes sense now if the queue is empty
398                 if (queue.isEmpty()) {
399                     Thread.yield();
400                 }
401
402                 // If we reached this far, it is neither closed, nor ok. So it was saturated.
403                 synchronized (stateMachine) {
404                     // Cheaper than waitState. sendMessageCommon already does the relevant state checks.
405                     stateMachine.wait();
406                 }
407             }
408         } catch (InterruptedException ie) {
409             InterruptedIOException iie = new InterruptedIOException("Message send interrupted");
410
411             iie.initCause(ie);
412             throw iie;
413         }
414     }
415
416     /**
417      * {@inheritDoc}
418      */
419     public final void resolve() {
420         DeferredAction action;
421
422         synchronized (stateMachine) {
423             stateMachine.resolveEvent();
424             action = eventCalled(true);
425         }
426         notifyChange();
427         performDeferredAction(action); // we expect connect but let the state machine decide.
428     }
429
430     /**
431      * {@inheritDoc}
432      */
433     public final int getState() {
434         return stateMachine.getState();
435     }
436
437     /**
438      * {@inheritDoc}
439      */
440     @Override
441     public final Messenger getChannelMessenger(PeerGroupID redirection, String service, String serviceParam) {
442         // Channels don't make channels.
443         return null;
444     }
445
446     /**
447      * Three exposed methods may need to inject new events in the system:
448      * sendMessageN, close, and shutdown.
449      * Since they can all cause actions, and since connectAction and
450      * startAction are deferred, it seems possible that one of the actions
451      * caused by send, close, or shutdown be called while connectAction or
452      * startAction are in progress.
453      * <p/>However, the state machine gives us a few guarantees: connectAction
454      * and startAction can never nest. We will not be asked to perform one while
455      * still performing the other. Only the synchronous actions closeInput,
456      * closeOutput, or failAll can possibly be requested in the interval. We
457      * could make more assumptions and simplify the code, but rather keep at
458      * least some flexibility.
459      * <p/>
460      * DEAD LOCK WARNING: the implementor's method invoke some of our call backs
461      * while synchronized. Then our call backs synchronize on the state machine
462      * in here. This nesting order must always be respected. As a result, we can
463      * never invoke implementors methods while synchronized. Hence the
464      * deferredAction processing.
465      *
466      * @param action the action
467      */
468     private void performDeferredAction(DeferredAction action) {
469         switch (action) {
470         case ACTION_SEND:
471             startImpl();
472             break;
473
474         case ACTION_CONNECT:
475             connectImpl();
476             break;
477         }
478     }
479
480     /**
481      * A shortHand for a frequently used sequence. MUST be called while
482      * synchronized on stateMachine.
483      *
484      * @param notifyAll If {@code true} then this is a life-cycle event and all
485      *                  waiters on the stateMachine should be notified. If {@code false} then
486      *                  only a single waiter will be notified for simple activity events.
487      * @return the deferred action.
488      */
489     private DeferredAction eventCalled(boolean notifyAll) {
490         DeferredAction action = deferredAction;
491
492         deferredAction = DeferredAction.ACTION_NONE;
493         if (notifyAll) {
494             stateMachine.notifyAll();
495         } else {
496             stateMachine.notify();
497         }
498         return action;
499     }
500
501     /*
502      * Implement the methods that our shared messenger will use to report progress.
503      */
504
505     /**
506      * The implementation will invoke this method when it becomes resolved,
507      * after connectImpl was invoked.
508      */
509     protected void up() {
510         DeferredAction action;
511
512         synchronized (stateMachine) {
513             stateMachine.upEvent();
514             action = eventCalled(true);
515         }
516         notifyChange();
517         performDeferredAction(action); // we expect start but let the state machine decide.
518     }
519
520     /**
521      * The implementation invokes this method when it becomes broken.
522      */
523     protected void down() {
524         DeferredAction action;
525
526         synchronized (stateMachine) {
527             stateMachine.downEvent();
528             action = eventCalled(true);
529         }
530         notifyChange();
531         performDeferredAction(action); // we expect connect but let the state machine decide.
532     }
533
534     /**
535      * Here, we behave like a queue to the shared messenger. When we report
536      * being empty, though, we're automatically removed from the active queues
537      * list. We'll go back there the next time we have something to send by
538      * calling startImpl.
539      *
540      * @return pending message
541      */
542     protected PendingMessage peek() {
543
544         PendingMessage theMsg;
545         DeferredAction action = DeferredAction.ACTION_NONE;
546
547         synchronized (stateMachine) {
548             // We like the msg to keep occupying space in the queue until it's
549             // out the door. That way, idleness (that is, not currently working
550             // on a message), is always consistent with queue emptyness.
551
552             theMsg = queue.peek();
553             if (theMsg == null) {
554                 stateMachine.idleEvent();
555                 action = eventCalled(false);
556
557                 // We do not notifyChange, here, because, if the queue is empty,
558                 // it was already notified when the last message was popped. The
559                 // call to idleEvent is only defensive programming to make extra
560                 // sure the state machine is in sync.
561
562                 return null;
563             }
564
565             if (outputClosed) {
566                 // We've been asked to stop sending. Which, if we were sending,
567                 // must be notified by either an idle event or a down
568                 // event. Nothing needs to happen to the shared messenger. We're
569                 // just a channel.
570                 stateMachine.downEvent();
571                 action = eventCalled(true);
572                 theMsg = null;
573             }
574         }
575
576         notifyChange();
577         performDeferredAction(action); // we expect none but let the state machine decide.
578         return theMsg;
579     }
580
581     /**
582      * Returns the number of elements in this collection.  If this collection
583      * contains more than <tt>Integer.MAX_VALUE</tt> elements, returns
584      * <tt>Integer.MAX_VALUE</tt>.
585      *
586      * @return the number of elements in this collection
587      */
588     protected int size() {
589         return queue.size();
590     }
591     
592     /**
593      * One message done. Update the saturated/etc state accordingly.
594      *
595      * @return true if there are more messages after the one we removed.
596      */
597     protected boolean poll() {
598
599         boolean result;
600         DeferredAction action;
601
602         synchronized (stateMachine) {
603             queue.poll();
604
605             if (queue.peek() == null) {
606                 stateMachine.idleEvent();
607                 action = eventCalled(false);
608                 result = false;
609             } else {
610                 stateMachine.msgsEvent();
611                 action = eventCalled(false);
612                 result = true;
613             }
614         }
615
616         notifyChange();
617         performDeferredAction(action); // we expect none but let the state machine decide.
618
619         return result;
620     }
621
622     /**
623      * We invoke this method to be placed on the list of channels that have
624      * message to send.
625      * <p/>
626      * NOTE that it is the shared messenger responsibility to synchronize so
627      * that we cannot be added to the active list just before we get removed
628      * due to reporting an empty queue in parallel. So, if we report an empty
629      * queue and have a new message to send before the shared messenger removes
630      * us form the active list, startImpl will block until the removal is done.
631      * Then we'll be added back.
632      * <p/>
633      * If it cannot be done, it means that the shared messenger is no longer
634      * usable. It may call down() in sequence. Out of defensiveness, it should
635      * do so without holding its lock.
636      */
637     protected abstract void startImpl();
638
639     /**
640      * We invoke this method to be placed on the list of channels that are
641      * waiting for resolution.
642      * <p/>
643      * If it cannot be done, it means that the shared messenger is no longer
644      * usable. It may call down() in sequence. Out of defensiveness, it should
645      * do so without holding its lock. If the messenger is already resolved it
646      * may call up() in sequence. Same wisdom applies. It is a good idea to
647      * create channels in the resolved state if the shared messenger is already
648      * resolved. That avoids this extra contortion.
649      */
650     protected abstract void connectImpl();
651
652     /**
653      * This is invoked to inform the implementation that this channel is now in
654      * the resolPending or resolSaturated state. This is specific to this type
655      * of channels. The shared messenger must make sure that this channel
656      * remains strongly referenced, even though it is not resolved, because
657      * there are messages in it. It is valid for an application to let go of a
658      * channel after sending a message, even if the channel is not yet
659      * resolved. The message will go if/when the channel resolves. This method
660      * may be invoked redundantly and even once the channel is no longer among
661      * the one awaiting resolution. The implementation must be careful to
662      * ignore such calls.
663      */
664     protected abstract void resolPendingImpl();
665 }