2 * Copyright (c) 2001-2007 Sun Microsystems, Inc. All rights reserved.
4 * The Sun Project JXTA(TM) Software License
6 * Redistribution and use in source and binary forms, with or without
7 * modification, are permitted provided that the following conditions are met:
9 * 1. Redistributions of source code must retain the above copyright notice,
10 * this list of conditions and the following disclaimer.
12 * 2. Redistributions in binary form must reproduce the above copyright notice,
13 * this list of conditions and the following disclaimer in the documentation
14 * and/or other materials provided with the distribution.
16 * 3. The end-user documentation included with the redistribution, if any, must
17 * include the following acknowledgment: "This product includes software
18 * developed by Sun Microsystems, Inc. for JXTA(TM) technology."
19 * Alternately, this acknowledgment may appear in the software itself, if
20 * and wherever such third-party acknowledgments normally appear.
22 * 4. The names "Sun", "Sun Microsystems, Inc.", "JXTA" and "Project JXTA" must
23 * not be used to endorse or promote products derived from this software
24 * without prior written permission. For written permission, please contact
25 * Project JXTA at http://www.jxta.org.
27 * 5. Products derived from this software may not be called "JXTA", nor may
28 * "JXTA" appear in their name, without prior written permission of Sun.
30 * THIS SOFTWARE IS PROVIDED ``AS IS'' AND ANY EXPRESSED OR IMPLIED WARRANTIES,
31 * INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND
32 * FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL SUN
33 * MICROSYSTEMS OR ITS CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
34 * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
35 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA,
36 * OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
37 * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
38 * NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE,
39 * EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
41 * JXTA is a registered trademark of Sun Microsystems, Inc. in the United
42 * States and other countries.
44 * Please see the license information page at :
45 * <http://www.jxta.org/project/www/license.html> for instructions on use of
46 * the license in source files.
48 * ====================================================================
50 * This software consists of voluntary contributions made by many individuals
51 * on behalf of Project JXTA. For more information on Project JXTA, please see
52 * http://www.jxta.org.
54 * This license is based on the BSD license adopted by the Apache Foundation.
57 package net.jxta.impl.pipe;
60 import net.jxta.document.Advertisement;
61 import net.jxta.id.ID;
62 import net.jxta.id.IDFactory;
63 import net.jxta.impl.util.TimeUtils;
64 import net.jxta.logging.Logging;
65 import net.jxta.peergroup.PeerGroup;
66 import net.jxta.pipe.InputPipe;
67 import net.jxta.pipe.OutputPipe;
68 import net.jxta.pipe.OutputPipeEvent;
69 import net.jxta.pipe.OutputPipeListener;
70 import net.jxta.pipe.PipeID;
71 import net.jxta.pipe.PipeMsgListener;
72 import net.jxta.pipe.PipeService;
73 import net.jxta.platform.Module;
74 import net.jxta.protocol.ModuleImplAdvertisement;
75 import net.jxta.protocol.PipeAdvertisement;
76 import net.jxta.service.Service;
77 import net.jxta.peer.PeerID;
79 import java.io.IOException;
81 import java.net.URISyntaxException;
82 import java.util.Collection;
83 import java.util.Collections;
84 import java.util.HashMap;
87 import java.util.logging.Level;
88 import java.util.logging.Logger;
89 import java.text.MessageFormat;
93 * A JXTA {@link net.jxta.pipe.PipeService} implementation which implements the
94 * standard JXTA Pipe Resolver Protocol (PRP).
96 * This class provides implementation for Unicast, unicast secure and
97 * (indirectly) propagate pipes.
99 * @see net.jxta.pipe.PipeService
100 * @see net.jxta.pipe.InputPipe
101 * @see net.jxta.pipe.OutputPipe
102 * @see net.jxta.endpoint.Message
103 * @see net.jxta.protocol.PipeAdvertisement
104 * @see net.jxta.protocol.PipeResolverMessage
105 * @see <a href="https://jxta-spec.dev.java.net/nonav/JXTAProtocols.html#proto-pbp" target="_blank">JXTA Protocols Specification : Pipe Binding Protocol</a>
107 public class PipeServiceImpl implements PipeService, PipeResolver.Listener {
112 private final static Logger LOG = Logger.getLogger(PipeServiceImpl.class.getName());
115 * the interval at which we verify that a pipe is still resolved at a
118 static final long VERIFYINTERVAL = 20 * TimeUtils.AMINUTE;
121 * The group this PipeService is working for.
123 private PeerGroup group = null;
126 * Our resolver handler.
128 private PipeResolver pipeResolver = null;
131 * Link to wire pipe impl.
133 private WirePipeImpl wirePipe = null;
136 * the interface object we will hand out.
138 private PipeService myInterface = null;
141 * the impl advertisement for this impl.
143 private ModuleImplAdvertisement implAdvertisement = null;
146 * Table of listeners for asynchronous output pipe creation.
149 * <li>keys are {@link net.jxta.pipe.PipeID}</li>
150 * <li>values are {@link java.util.Map}</li>
152 * Within the value Map:
154 * <li>keys are {@link java.lang.Integer} representing queryid</li>
155 * <li>values are {@link OutputPipeHolder}</li>
158 private final Map<PipeID, Map<Integer, OutputPipeHolder>> outputPipeListeners = new HashMap<PipeID, Map<Integer, OutputPipeHolder>>();
161 * Has the pipe service been started?
163 private volatile boolean started = false;
166 * holds a pipe adv and a listener which will be called for resolutions
169 private static class OutputPipeHolder {
170 final PipeAdvertisement adv;
171 final Set<? extends ID> peers;
172 final OutputPipeListener listener;
175 OutputPipeHolder(PipeAdvertisement adv, Set<? extends ID> peers, OutputPipeListener listener, int queryid) {
178 this.listener = listener;
179 this.queryid = queryid;
185 * A listener useful for implementing synchronous behaviour.
187 private static class syncListener implements OutputPipeListener {
189 volatile OutputPipeEvent event = null;
194 * Called when an input pipe has been located for a previously registered
195 * pipe. The event contains an {@link net.jxta.pipe.OutputPipe} which can
196 * be used to communicate with the remote peer.
198 * @param event <code>net.jxta.pipe.OutputPipeEvent</code> event
200 public synchronized void outputPipeEvent(OutputPipeEvent event) {
201 // we only accept the first event.
202 if (null == this.event) {
210 * Default Constructor (don't delete)
212 public PipeServiceImpl() {// What is reason for this constructor???
213 // the same is automatically generated.
219 * We create only a single interface object and return it over and over
222 public synchronized PipeService getInterface() {
223 if (null == myInterface) {
224 myInterface = new PipeServiceInterface(this);
232 public ModuleImplAdvertisement getImplAdvertisement() {
233 return implAdvertisement;
239 public synchronized void init(PeerGroup group, ID assignedID, Advertisement impl) {
242 implAdvertisement = (ModuleImplAdvertisement) impl;
244 if (Logging.SHOW_CONFIG && LOG.isLoggable(Level.CONFIG)) {
245 StringBuilder configInfo = new StringBuilder("Configuring Pipe Service : " + assignedID);
247 if (implAdvertisement != null) {
248 configInfo.append("\n\tImplementation :");
249 configInfo.append("\n\t\tModule Spec ID: ").append(implAdvertisement.getModuleSpecID());
250 configInfo.append("\n\t\tImpl Description : ").append(implAdvertisement.getDescription());
251 configInfo.append("\n\t\tImpl URI : ").append(implAdvertisement.getUri());
252 configInfo.append("\n\t\tImpl Code : ").append(implAdvertisement.getCode());
255 configInfo.append("\n\tGroup Params :");
256 configInfo.append("\n\t\tGroup : ").append(group);
257 configInfo.append("\n\t\tPeer ID : ").append(group.getPeerID());
259 configInfo.append("\n\tConfiguration :");
260 configInfo.append("\n\t\tVerify Interval : " + VERIFYINTERVAL + "ms");
261 LOG.config(configInfo.toString());
268 * Currently this service does not expect arguments.
270 public synchronized int startApp(String[] args) {
272 Service needed = group.getEndpointService();
274 if (null == needed) {
275 if (Logging.SHOW_WARNING && LOG.isLoggable(Level.WARNING)) {
276 LOG.warning("Stalled until there is an endpoint service");
278 return START_AGAIN_STALLED;
281 needed = group.getResolverService();
282 if (null == needed) {
283 if (Logging.SHOW_WARNING && LOG.isLoggable(Level.WARNING)) {
284 LOG.warning("Stalled until there is a resolver service");
286 return START_AGAIN_STALLED;
289 needed = group.getMembershipService();
290 if (null == needed) {
291 if (Logging.SHOW_WARNING && LOG.isLoggable(Level.WARNING)) {
292 LOG.warning("Stalled until there is a membership service");
294 return START_AGAIN_STALLED;
297 needed = group.getRendezVousService();
298 if (null == needed) {
299 if (Logging.SHOW_WARNING && LOG.isLoggable(Level.WARNING)) {
300 LOG.warning("Stalled until there is a rendezvous service");
302 return START_AGAIN_STALLED;
305 // create our resolver handler; it will register itself w/ the resolver.
306 pipeResolver = new PipeResolver(group);
308 // Create the WirePipe (propagated pipe)
309 wirePipe = new WirePipeImpl(group, pipeResolver);
311 // XXX 20061221 We could check the result of this.
312 wirePipe.startApp(args);
316 return Module.START_OK;
322 public synchronized void stopApp() {
326 if (wirePipe != null) {
329 } catch (Throwable failed) {
330 LOG.log(Level.SEVERE, "Failed to stop wire pipe", failed);
336 if (pipeResolver != null) {
339 } catch (Throwable failed) {
340 LOG.log(Level.SEVERE, "Failed to stop pipe resolver", failed);
345 // Avoid cross-reference problem with GC
349 // clear outputPipeListeners
350 Collection<Map<Integer, OutputPipeHolder>> values = outputPipeListeners.values();
352 for (Map<Integer, OutputPipeHolder> value : values) {
355 outputPipeListeners.clear();
361 public InputPipe createInputPipe(PipeAdvertisement adv) throws IOException {
362 return createInputPipe(adv, null);
368 public InputPipe createInputPipe(PipeAdvertisement adv, PipeMsgListener listener) throws IOException {
371 throw new IllegalStateException("Pipe Service has not been started or has been stopped");
374 String type = adv.getType();
377 throw new IllegalArgumentException("PipeAdvertisement type may not be null");
380 PipeID pipeId = (PipeID) adv.getPipeID();
382 if (pipeId == null) {
383 throw new IllegalArgumentException("PipeAdvertisement PipeID may not be null");
386 if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
387 LOG.fine("Create " + type + " InputPipe for " + pipeId);
391 // create an InputPipe.
392 if (type.equals(PipeService.UnicastType)) {
393 inputPipe = new InputPipeImpl(pipeResolver, adv, listener);
394 } else if (type.equals(PipeService.UnicastSecureType)) {
395 inputPipe = new SecureInputPipeImpl(pipeResolver, adv, listener);
396 } else if (type.equals(PipeService.PropagateType)) {
397 if (wirePipe != null) {
398 inputPipe = wirePipe.createInputPipe(adv, listener);
400 throw new IOException("No propagated pipe servive available");
404 if (Logging.SHOW_SEVERE && LOG.isLoggable(Level.SEVERE)) {
405 LOG.severe("Cannot create pipe for unknown type : " + type);
407 throw new IOException("Cannot create pipe for unknown type : " + type);
415 public OutputPipe createOutputPipe(PipeAdvertisement pipeAdv, long timeout) throws IOException {
416 return createOutputPipe(pipeAdv, Collections.<ID>emptySet(), timeout);
422 public OutputPipe createOutputPipe(PipeAdvertisement adv, Set<? extends ID> resolvablePeers, long timeout) throws IOException {
423 // convert zero to max value.
425 timeout = Long.MAX_VALUE;
428 long absoluteTimeOut = TimeUtils.toAbsoluteTimeMillis(timeout);
430 // Make a listener, start async resolution and then wait until the timeout expires.
431 syncListener localListener = new syncListener();
433 int queryid = PipeResolver.getNextQueryID();
435 createOutputPipe(adv, resolvablePeers, localListener, queryid);
437 if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
438 LOG.fine("Waiting synchronously for " + timeout + "ms to resolve OutputPipe for " + adv.getPipeID());
442 synchronized (localListener) {
443 while ((null == localListener.event) && (TimeUtils.toRelativeTimeMillis(TimeUtils.timeNow(), absoluteTimeOut) < 0)) {
445 localListener.wait(TimeUtils.ASECOND);
446 } catch (InterruptedException woken) {
447 Thread.interrupted();
452 // remove the listener we installed.
453 removeOutputPipeListener(adv.getPipeID().toString(), queryid);
456 if (null != localListener.event) {
457 return localListener.event.getOutputPipe();
459 throw new IOException("Output Pipe could not be resolved after " + timeout + "ms.");
466 public void createOutputPipe(PipeAdvertisement pipeAdv, OutputPipeListener listener) throws IOException {
467 createOutputPipe(pipeAdv, Collections.<ID>emptySet(), listener);
473 public void createOutputPipe(PipeAdvertisement pipeAdv, Set<? extends ID> resolvablePeers, OutputPipeListener listener) throws IOException {
474 createOutputPipe(pipeAdv, resolvablePeers, listener, PipeResolver.getNextQueryID());
477 private void createOutputPipe(PipeAdvertisement pipeAdv, Set<? extends ID> resolvablePeers, OutputPipeListener listener, int queryid) throws IOException {
480 throw new IOException("Pipe Service has not been started or has been stopped");
483 // Recover the PipeID from the pipe advertisement
484 PipeID pipeId = (PipeID) pipeAdv.getPipeID();
485 String type = pipeAdv.getType();
488 IllegalArgumentException failed = new IllegalArgumentException("Pipe type was not set");
489 if (Logging.SHOW_SEVERE && LOG.isLoggable(Level.SEVERE)) {
490 LOG.log(Level.SEVERE, failed.getMessage(), failed);
495 if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
496 LOG.fine("Create " + type + " OutputPipe for " + pipeId);
499 if (PipeService.PropagateType.equals(type)) {
502 if (resolvablePeers.size() == 1) {
503 op = new BlockingWireOutputPipe(group, pipeAdv, (PeerID) resolvablePeers.iterator().next());
505 if (wirePipe != null) {
506 op = wirePipe.createOutputPipe(pipeAdv, resolvablePeers);
508 throw new IOException("No propagated pipe service available");
513 OutputPipeEvent newevent = new OutputPipeEvent(this.getInterface(), op, pipeId.toString(), PipeResolver.ANYQUERY);
516 listener.outputPipeEvent(newevent);
517 } catch (Throwable ignored) {
518 if (Logging.SHOW_SEVERE && LOG.isLoggable(Level.SEVERE)) {
519 LOG.log(Level.SEVERE,
520 "Uncaught Throwable in listener for " + pipeId + " (" + listener.getClass().getName() + ")",
525 } else if (PipeService.UnicastType.equals(type) || PipeService.UnicastSecureType.equals(type)) {
527 addOutputPipeListener(pipeId, new OutputPipeHolder(pipeAdv, resolvablePeers, listener, queryid));
528 pipeResolver.addListener(pipeId, this, queryid);
529 pipeResolver.sendPipeQuery(pipeAdv, resolvablePeers, queryid);
531 // look locally for the pipe
532 if (resolvablePeers.isEmpty() || resolvablePeers.contains(group.getPeerID())) {
533 InputPipe local = pipeResolver.findLocal(pipeId);
535 // if we have a local instance, make sure the local instance is of the same type.
537 if (local.getType().equals(pipeAdv.getType())) {
538 pipeResolver.callListener(queryid, pipeId, local.getType(), group.getPeerID(), false);
540 if (Logging.SHOW_WARNING && LOG.isLoggable(Level.WARNING)) {
541 LOG.warning(MessageFormat.format("rejecting local pipe ({0}) because type is not ({1})", local.getType(),
549 if (Logging.SHOW_SEVERE && LOG.isLoggable(Level.SEVERE)) {
550 LOG.severe("createOutputPipe: cannot create pipe for unknown type : " + type);
552 throw new IOException("cannot create pipe for unknown type : " + type);
557 * Add an output listener for the given pipeId.
559 private void addOutputPipeListener(PipeID pipeId, OutputPipeHolder pipeHolder) {
560 synchronized (outputPipeListeners) {
561 Map<Integer, OutputPipeHolder> perpipelisteners = outputPipeListeners.get(pipeId);
563 if (perpipelisteners == null) {
564 perpipelisteners = new HashMap<Integer, OutputPipeHolder>();
565 outputPipeListeners.put(pipeId, perpipelisteners);
567 if (perpipelisteners.get(pipeHolder.queryid) != null) {
568 LOG.warning("Clobbering output pipe listener for query " + pipeHolder.queryid);
570 if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
571 LOG.fine("Adding pipe listener for pipe " + pipeId + " and query " + pipeHolder.queryid);
573 perpipelisteners.put(pipeHolder.queryid, pipeHolder);
580 public OutputPipeListener removeOutputPipeListener(String pipeID, OutputPipeListener listener) {
581 throw new UnsupportedOperationException("Legacy method not supported. Use interface object if you need this method.");
587 public OutputPipeListener removeOutputPipeListener(ID pipeID, OutputPipeListener listener) {
589 // remove all instances of this listener, regardless of queryid
590 if (pipeResolver == null) {
594 if (!(pipeID instanceof PipeID)) {
595 throw new IllegalArgumentException("pipeID must be a PipeID.");
598 synchronized (outputPipeListeners) {
599 Map<Integer, OutputPipeHolder> perpipelisteners = outputPipeListeners.get(pipeID);
601 if (perpipelisteners != null) {
602 Set<Map.Entry<Integer, OutputPipeHolder>> entries = perpipelisteners.entrySet();
604 for (Map.Entry<Integer, OutputPipeHolder> entry : entries) {
605 OutputPipeHolder pl = entry.getValue();
607 if (pl.listener == listener) {
608 pipeResolver.removeListener((PipeID) pipeID, pl.queryid);
609 if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
610 LOG.fine("Removing listener for query " + pl.queryid);
612 perpipelisteners.remove(entry.getKey());
615 // clean up the map if there are no more listeners for the pipe
616 if (perpipelisteners.isEmpty()) {
617 outputPipeListeners.remove(pipeID);
624 private OutputPipeListener removeOutputPipeListener(String opID, int queryID) {
626 if (pipeResolver == null) {
632 URI aPipeID = new URI(opID);
633 pipeID = (PipeID) IDFactory.fromURI(aPipeID);
634 } catch (URISyntaxException badID) {
635 throw new IllegalArgumentException("Bad pipe ID: " + opID);
636 } catch (ClassCastException badID) {
637 throw new IllegalArgumentException("id was not a pipe id: " + opID);
640 synchronized (outputPipeListeners) {
641 Map<Integer, OutputPipeHolder> perpipelisteners = outputPipeListeners.get(pipeID);
643 if (perpipelisteners != null) {
644 OutputPipeHolder pipeHolder = perpipelisteners.get(queryID);
646 perpipelisteners.remove(queryID);
647 if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
648 LOG.fine("Removing listener for query " + queryID);
650 // clean up the map if there are no more listeners for the pipe
651 if (perpipelisteners.isEmpty()) {
652 outputPipeListeners.remove(pipeID);
654 pipeResolver.removeListener(pipeID, queryID);
655 if (pipeHolder != null) {
656 return pipeHolder.listener;
667 public boolean pipeResolveEvent(PipeResolver.Event event) {
670 ID peerID = event.getPeerID();
671 ID pipeID = event.getPipeID();
672 int queryID = event.getQueryID();
673 OutputPipeHolder pipeHolder;
675 synchronized (outputPipeListeners) {
676 Map<Integer, OutputPipeHolder> perpipelisteners = outputPipeListeners.get(pipeID);
678 if (perpipelisteners == null) {
679 if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
680 LOG.fine("No listener for event for pipe " + pipeID);
684 pipeHolder = perpipelisteners.get(queryID);
685 if (pipeHolder == null) {
686 if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
687 LOG.fine("No listener for event for query " + queryID);
693 // check if they wanted a resolve from a specific peer.
694 if (!pipeHolder.peers.isEmpty() && !pipeHolder.peers.contains(peerID)) {
695 if (Logging.SHOW_WARNING && LOG.isLoggable(Level.WARNING)) {
696 LOG.warning("Event was for wrong peer \'" + peerID + "\'. Discarding.");
702 String type = pipeHolder.adv.getType();
705 if (PipeService.UnicastType.equals(type)) {
706 op = new NonBlockingOutputPipe(group, pipeResolver, pipeHolder.adv, peerID, pipeHolder.peers);
707 } else if (PipeService.UnicastSecureType.equals(type)) {
708 op = new SecureOutputPipe(group, pipeResolver, pipeHolder.adv, peerID, pipeHolder.peers);
710 if (Logging.SHOW_WARNING && LOG.isLoggable(Level.WARNING)) {
711 LOG.warning("Could not create output pipe of type \'" + type + "\'. Discarding.");
716 // Generate an event when the output pipe was successfully opened.
717 OutputPipeEvent newevent = new OutputPipeEvent(this.getInterface(), op, pipeID.toString(), queryID);
719 pipeHolder.listener.outputPipeEvent(newevent);
720 } catch (Throwable ignored) {
721 if (Logging.SHOW_SEVERE && LOG.isLoggable(Level.SEVERE)) {
724 "Uncaught Throwable in listener for " + pipeID + "(" + pipeHolder.getClass().getName() + ")", ignored);
727 removeOutputPipeListener(pipeID.toString(), queryID);
729 } catch (IOException ie) {
730 if (Logging.SHOW_SEVERE && LOG.isLoggable(Level.SEVERE)) {
731 LOG.log(Level.SEVERE, "Error creating output pipe " + event.getPipeID(), ie);
734 if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
735 LOG.fine("No listener for event for " + event.getPipeID());
743 * We don't do anything with NAKs (yet)
745 public boolean pipeNAKEvent(PipeResolver.Event event) {