2 * Copyright (c) 2004-2007 Sun Microsystems, Inc. All rights reserved.
4 * The Sun Project JXTA(TM) Software License
6 * Redistribution and use in source and binary forms, with or without
7 * modification, are permitted provided that the following conditions are met:
9 * 1. Redistributions of source code must retain the above copyright notice,
10 * this list of conditions and the following disclaimer.
12 * 2. Redistributions in binary form must reproduce the above copyright notice,
13 * this list of conditions and the following disclaimer in the documentation
14 * and/or other materials provided with the distribution.
16 * 3. The end-user documentation included with the redistribution, if any, must
17 * include the following acknowledgment: "This product includes software
18 * developed by Sun Microsystems, Inc. for JXTA(TM) technology."
19 * Alternately, this acknowledgment may appear in the software itself, if
20 * and wherever such third-party acknowledgments normally appear.
22 * 4. The names "Sun", "Sun Microsystems, Inc.", "JXTA" and "Project JXTA" must
23 * not be used to endorse or promote products derived from this software
24 * without prior written permission. For written permission, please contact
25 * Project JXTA at http://www.jxta.org.
27 * 5. Products derived from this software may not be called "JXTA", nor may
28 * "JXTA" appear in their name, without prior written permission of Sun.
30 * THIS SOFTWARE IS PROVIDED ``AS IS'' AND ANY EXPRESSED OR IMPLIED WARRANTIES,
31 * INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND
32 * FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL SUN
33 * MICROSYSTEMS OR ITS CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
34 * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
35 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA,
36 * OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
37 * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
38 * NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE,
39 * EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
41 * JXTA is a registered trademark of Sun Microsystems, Inc. in the United
42 * States and other countries.
44 * Please see the license information page at :
45 * <http://www.jxta.org/project/www/license.html> for instructions on use of
46 * the license in source files.
48 * ====================================================================
50 * This software consists of voluntary contributions made by many individuals
51 * on behalf of Project JXTA. For more information on Project JXTA, please see
52 * http://www.jxta.org.
54 * This license is based on the BSD license adopted by the Apache Foundation.
56 package net.jxta.endpoint;
58 import net.jxta.peergroup.PeerGroupID;
60 import java.io.IOException;
61 import java.io.InterruptedIOException;
62 import java.util.concurrent.ArrayBlockingQueue;
63 import java.util.concurrent.BlockingQueue;
66 * Extends Channel Messenger behaviour to provide asynchronous message sending
69 public abstract class AsyncChannelMessenger extends ChannelMessenger {
73 * private final static transient Logger LOG = Logger.getLogger(AsyncChannelMessenger.class.getName());
77 * {@code true} if we have deliberately closed our one message input queue.
79 private boolean inputClosed = false;
82 * {@code true} if we have deliberately stopped sending.
84 private boolean outputClosed = false;
87 * Actions that we defer to after returning from event methods. In other
88 * words, they cannot be done with the lock held, or they require calling
91 private enum DeferredAction {
98 * Must send the current message.
102 * Must report failure to connect.
108 * The current deferred action.
110 private DeferredAction deferredAction = DeferredAction.ACTION_NONE;
113 * The messages queue.
115 private final BlockingQueue<PendingMessage> queue;
118 * State lock and engine.
120 private final AsyncChannelMessengerState stateMachine;
123 * Our statemachine implementation; just connects the standard MessengerState action methods to
126 private class AsyncChannelMessengerState extends MessengerState {
128 protected AsyncChannelMessengerState(boolean connected) {
133 * The required action methods.
140 protected void connectAction() {
141 deferredAction = DeferredAction.ACTION_CONNECT;
148 protected void startAction() {
149 deferredAction = DeferredAction.ACTION_SEND;
156 protected void closeInputAction() {
157 // We're synchronized here. (invoked from stateMachine)
165 protected void closeOutputAction() {
166 // We're synchronized here. (invoked from stateMachine)
174 protected void failAllAction() {
176 // The queue is now closed, so we can rest assured that the last
177 // message is really the last one. This is a synchronous action. The
178 // state machine assumes that it is done when we return. There is no
179 // need to signal completion with an idleEvent.
180 PendingMessage theMsg;
185 synchronized (stateMachine) {
186 theMsg = queue.poll();
189 if (theMsg == null) {
193 Message currentMsg = theMsg.msg;
194 Throwable currentFailure = theMsg.failure;
196 if (currentFailure == null) {
197 currentFailure = new IOException("Messenger unexpectedly closed");
200 OutgoingMessageEvent event = new OutgoingMessageEvent(currentMsg, currentFailure);
202 currentMsg.setMessageProperty(Messenger.class, event);
209 * The representation of a queued message. It is shared between this
210 * abstract class and any implementation.
212 protected static class PendingMessage {
214 final String service;
218 PendingMessage(Message msg, String service, String param) {
220 this.service = service;
227 * Create a new AsyncChannelMessenger.
229 * @param baseAddress The network address messages go to; regardless of
230 * service, param, or group.
231 * @param redirection Group to which the messages must be redirected. This
232 * is used to implement the automatic group segregation which has become a
233 * de-facto standard. If not null, the unique portion of the specified
234 * groupID is prepended with {@link #InsertedServicePrefix} and inserted in
235 * every message's destination address in place of the the original service
236 * name, which gets shifted into the beginning of the service parameter. The
237 * opposite is done on arrival to restore the original destination address
238 * before the message is delivered to the listener in the the specified
239 * group. Messages that already bear a group redirection are not affected.
240 * @param origService The default destination service for messages sent
241 * without specifying a different service.
242 * @param origServiceParam The default destination service parameter for
243 * messages sent without specifying a different service parameter.
244 * @param queueSize the queue size that channels should have.
245 * @param connected true if the channel is created in the connected state.
247 public AsyncChannelMessenger(EndpointAddress baseAddress, PeerGroupID redirection, String origService, String origServiceParam, int queueSize, boolean connected) {
249 super(baseAddress, redirection, origService, origServiceParam);
251 stateMachine = new AsyncChannelMessengerState(connected);
253 queue = new ArrayBlockingQueue<PendingMessage>(queueSize);
255 // We synchronize our state with the sharedMessenger's stateMachine.
256 // Logic would dictate that we pass it to super(), but it is not itself
257 // constructed until super() returns. No way around it.
259 setStateLock(stateMachine);
265 public final void close() {
266 DeferredAction action;
268 synchronized (stateMachine) {
269 stateMachine.closeEvent();
270 action = eventCalled(true);
273 // We called an event. State may have changed.
276 performDeferredAction(action);
280 * This internal method does the common part of sending the message on
281 * behalf of both sendMessageN and sendMessageB.
282 * <p/>It is not quite possible to implement sendMessageB as a wrapper
283 * around sendMessageN without some internal cooperation. At least not in
284 * an efficient manner. sendMessageB must not set the message property:
285 * either it fails and throws, or it returns successfully and the property
286 * is set later. This is required so that messages can be retried when
287 * failing synchronously (through a blocking messenger typically, but the
288 * semantic has to be uniform).
289 * <p/>Each of sendMessageB and sendMessageN takes care of status reporting
292 * @param msg the message to send
293 * @param rService destination service
294 * @param rServiceParam destination param
295 * @return The outcome from that one attempt. {@code true} means done.
296 * {@code false} means saturated. When {@code true} is returned, it means
297 * that the fate of the message will be decided asynchronously, so we do
298 * not have any details, yet.
299 * @throws IOException is thrown if this messenger is closed.
300 * @throws InterruptedException if interrupted
302 private boolean sendMessageCommon(Message msg, String rService, String rServiceParam) throws IOException, InterruptedException {
304 String service = effectiveService(rService);
305 String serviceParam = effectiveParam(rService, rServiceParam);
306 boolean queued = true;
307 boolean change = false;
308 DeferredAction action = DeferredAction.ACTION_NONE;
310 synchronized (stateMachine) {
312 throw new IOException("This messenger is closed. It cannot be used to send messages.");
315 boolean wasEmpty = queue.isEmpty();
317 if (queue.remainingCapacity() > 1) {
318 queue.put(new PendingMessage(msg, service, serviceParam));
320 // Still not saturated. If we weren't idle either, then nothing worth mentionning.
323 stateMachine.msgsEvent();
324 action = eventCalled(false);
326 } else if (1 == queue.remainingCapacity()) {
327 queue.put(new PendingMessage(msg, service, serviceParam));
330 stateMachine.saturatedEvent();
331 action = eventCalled(false);
334 // Was already saturated.
339 if (queued && change) {
340 // If not queued, there was no change of condition as far as
341 // outsiders are concerned. (redundant saturatedEvent, only
342 // defensive; to guarantee statemachine in sync). else, if the
343 // saturation state did not change, we have no state change to
348 performDeferredAction(action);
350 // Before we return, make sure that this channel remains referenced if
351 // it has messages. It could become unreferenced if it is not yet
352 // resolved and the application lets go of it after sending messages.
353 // This means that we may need to do something only in the resolpending
354 // and resolsaturated cases. The way we do this test, there can be false
355 // positives. They're dealt with as part of the action that is carried
357 if ((stateMachine.getState() & (Messenger.RESOLPENDING | Messenger.RESOLSATURATED)) != 0) {
367 public final boolean sendMessageN(Message msg, String rService, String rServiceParam) {
370 if (sendMessageCommon(msg, rService, rServiceParam)) {
371 // If it worked the message is queued; the outcome will be notified later.
374 // Non-blocking and queue full: report overflow.
375 msg.setMessageProperty(Messenger.class, OutgoingMessageEvent.OVERFLOW);
376 } catch (IOException oie) {
377 msg.setMessageProperty(Messenger.class, new OutgoingMessageEvent(msg, oie));
378 } catch (InterruptedException interrupted) {
379 msg.setMessageProperty(Messenger.class, new OutgoingMessageEvent(msg, interrupted));
387 public final void sendMessageB(Message msg, String rService, String rServiceParam) throws IOException {
391 // if sendMessageCommon says "true" it worked.
392 if (sendMessageCommon(msg, rService, rServiceParam)) {
395 // Do a shallow check on the queue. If it seems empty (without getting into a critical section to
396 // verify it), then yielding is good bet. It is a lot cheaper and smoother than waiting.
397 // Note the message should be enqueued now. yielding makes sense now if the queue is empty
398 if (queue.isEmpty()) {
402 // If we reached this far, it is neither closed, nor ok. So it was saturated.
403 synchronized (stateMachine) {
404 // Cheaper than waitState. sendMessageCommon already does the relevant state checks.
408 } catch (InterruptedException ie) {
409 InterruptedIOException iie = new InterruptedIOException("Message send interrupted");
419 public final void resolve() {
420 DeferredAction action;
422 synchronized (stateMachine) {
423 stateMachine.resolveEvent();
424 action = eventCalled(true);
427 performDeferredAction(action); // we expect connect but let the state machine decide.
433 public final int getState() {
434 return stateMachine.getState();
441 public final Messenger getChannelMessenger(PeerGroupID redirection, String service, String serviceParam) {
442 // Channels don't make channels.
447 * Three exposed methods may need to inject new events in the system:
448 * sendMessageN, close, and shutdown.
449 * Since they can all cause actions, and since connectAction and
450 * startAction are deferred, it seems possible that one of the actions
451 * caused by send, close, or shutdown be called while connectAction or
452 * startAction are in progress.
453 * <p/>However, the state machine gives us a few guarantees: connectAction
454 * and startAction can never nest. We will not be asked to perform one while
455 * still performing the other. Only the synchronous actions closeInput,
456 * closeOutput, or failAll can possibly be requested in the interval. We
457 * could make more assumptions and simplify the code, but rather keep at
458 * least some flexibility.
460 * DEAD LOCK WARNING: the implementor's method invoke some of our call backs
461 * while synchronized. Then our call backs synchronize on the state machine
462 * in here. This nesting order must always be respected. As a result, we can
463 * never invoke implementors methods while synchronized. Hence the
464 * deferredAction processing.
466 * @param action the action
468 private void performDeferredAction(DeferredAction action) {
481 * A shortHand for a frequently used sequence. MUST be called while
482 * synchronized on stateMachine.
484 * @param notifyAll If {@code true} then this is a life-cycle event and all
485 * waiters on the stateMachine should be notified. If {@code false} then
486 * only a single waiter will be notified for simple activity events.
487 * @return the deferred action.
489 private DeferredAction eventCalled(boolean notifyAll) {
490 DeferredAction action = deferredAction;
492 deferredAction = DeferredAction.ACTION_NONE;
494 stateMachine.notifyAll();
496 stateMachine.notify();
502 * Implement the methods that our shared messenger will use to report progress.
506 * The implementation will invoke this method when it becomes resolved,
507 * after connectImpl was invoked.
509 protected void up() {
510 DeferredAction action;
512 synchronized (stateMachine) {
513 stateMachine.upEvent();
514 action = eventCalled(true);
517 performDeferredAction(action); // we expect start but let the state machine decide.
521 * The implementation invokes this method when it becomes broken.
523 protected void down() {
524 DeferredAction action;
526 synchronized (stateMachine) {
527 stateMachine.downEvent();
528 action = eventCalled(true);
531 performDeferredAction(action); // we expect connect but let the state machine decide.
535 * Here, we behave like a queue to the shared messenger. When we report
536 * being empty, though, we're automatically removed from the active queues
537 * list. We'll go back there the next time we have something to send by
540 * @return pending message
542 protected PendingMessage peek() {
544 PendingMessage theMsg;
545 DeferredAction action = DeferredAction.ACTION_NONE;
547 synchronized (stateMachine) {
548 // We like the msg to keep occupying space in the queue until it's
549 // out the door. That way, idleness (that is, not currently working
550 // on a message), is always consistent with queue emptyness.
552 theMsg = queue.peek();
553 if (theMsg == null) {
554 stateMachine.idleEvent();
555 action = eventCalled(false);
557 // We do not notifyChange, here, because, if the queue is empty,
558 // it was already notified when the last message was popped. The
559 // call to idleEvent is only defensive programming to make extra
560 // sure the state machine is in sync.
566 // We've been asked to stop sending. Which, if we were sending,
567 // must be notified by either an idle event or a down
568 // event. Nothing needs to happen to the shared messenger. We're
570 stateMachine.downEvent();
571 action = eventCalled(true);
577 performDeferredAction(action); // we expect none but let the state machine decide.
582 * Returns the number of elements in this collection. If this collection
583 * contains more than <tt>Integer.MAX_VALUE</tt> elements, returns
584 * <tt>Integer.MAX_VALUE</tt>.
586 * @return the number of elements in this collection
588 protected int size() {
593 * One message done. Update the saturated/etc state accordingly.
595 * @return true if there are more messages after the one we removed.
597 protected boolean poll() {
600 DeferredAction action;
602 synchronized (stateMachine) {
605 if (queue.peek() == null) {
606 stateMachine.idleEvent();
607 action = eventCalled(false);
610 stateMachine.msgsEvent();
611 action = eventCalled(false);
617 performDeferredAction(action); // we expect none but let the state machine decide.
623 * We invoke this method to be placed on the list of channels that have
626 * NOTE that it is the shared messenger responsibility to synchronize so
627 * that we cannot be added to the active list just before we get removed
628 * due to reporting an empty queue in parallel. So, if we report an empty
629 * queue and have a new message to send before the shared messenger removes
630 * us form the active list, startImpl will block until the removal is done.
631 * Then we'll be added back.
633 * If it cannot be done, it means that the shared messenger is no longer
634 * usable. It may call down() in sequence. Out of defensiveness, it should
635 * do so without holding its lock.
637 protected abstract void startImpl();
640 * We invoke this method to be placed on the list of channels that are
641 * waiting for resolution.
643 * If it cannot be done, it means that the shared messenger is no longer
644 * usable. It may call down() in sequence. Out of defensiveness, it should
645 * do so without holding its lock. If the messenger is already resolved it
646 * may call up() in sequence. Same wisdom applies. It is a good idea to
647 * create channels in the resolved state if the shared messenger is already
648 * resolved. That avoids this extra contortion.
650 protected abstract void connectImpl();
653 * This is invoked to inform the implementation that this channel is now in
654 * the resolPending or resolSaturated state. This is specific to this type
655 * of channels. The shared messenger must make sure that this channel
656 * remains strongly referenced, even though it is not resolved, because
657 * there are messages in it. It is valid for an application to let go of a
658 * channel after sending a message, even if the channel is not yet
659 * resolved. The message will go if/when the channel resolves. This method
660 * may be invoked redundantly and even once the channel is no longer among
661 * the one awaiting resolution. The implementation must be careful to
664 protected abstract void resolPendingImpl();