]> sjero.net Git - linphone/blob - p2pproxy/dependencies-src/jxse-src-2.5/impl/src/net/jxta/impl/pipe/PipeServiceImpl.java
e36833d0b7e4f6508503baf2ed5952791e87b9c0
[linphone] / p2pproxy / dependencies-src / jxse-src-2.5 / impl / src / net / jxta / impl / pipe / PipeServiceImpl.java
1 /*
2  * Copyright (c) 2001-2007 Sun Microsystems, Inc.  All rights reserved.
3  *  
4  *  The Sun Project JXTA(TM) Software License
5  *  
6  *  Redistribution and use in source and binary forms, with or without 
7  *  modification, are permitted provided that the following conditions are met:
8  *  
9  *  1. Redistributions of source code must retain the above copyright notice,
10  *     this list of conditions and the following disclaimer.
11  *  
12  *  2. Redistributions in binary form must reproduce the above copyright notice, 
13  *     this list of conditions and the following disclaimer in the documentation 
14  *     and/or other materials provided with the distribution.
15  *  
16  *  3. The end-user documentation included with the redistribution, if any, must 
17  *     include the following acknowledgment: "This product includes software 
18  *     developed by Sun Microsystems, Inc. for JXTA(TM) technology." 
19  *     Alternately, this acknowledgment may appear in the software itself, if 
20  *     and wherever such third-party acknowledgments normally appear.
21  *  
22  *  4. The names "Sun", "Sun Microsystems, Inc.", "JXTA" and "Project JXTA" must 
23  *     not be used to endorse or promote products derived from this software 
24  *     without prior written permission. For written permission, please contact 
25  *     Project JXTA at http://www.jxta.org.
26  *  
27  *  5. Products derived from this software may not be called "JXTA", nor may 
28  *     "JXTA" appear in their name, without prior written permission of Sun.
29  *  
30  *  THIS SOFTWARE IS PROVIDED ``AS IS'' AND ANY EXPRESSED OR IMPLIED WARRANTIES,
31  *  INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND 
32  *  FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL SUN 
33  *  MICROSYSTEMS OR ITS CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, 
34  *  INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT 
35  *  LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, 
36  *  OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF 
37  *  LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING 
38  *  NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, 
39  *  EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
40  *  
41  *  JXTA is a registered trademark of Sun Microsystems, Inc. in the United 
42  *  States and other countries.
43  *  
44  *  Please see the license information page at :
45  *  <http://www.jxta.org/project/www/license.html> for instructions on use of 
46  *  the license in source files.
47  *  
48  *  ====================================================================
49  *  
50  *  This software consists of voluntary contributions made by many individuals 
51  *  on behalf of Project JXTA. For more information on Project JXTA, please see 
52  *  http://www.jxta.org.
53  *  
54  *  This license is based on the BSD license adopted by the Apache Foundation. 
55  */
56
57 package net.jxta.impl.pipe;
58
59
60 import net.jxta.document.Advertisement;
61 import net.jxta.id.ID;
62 import net.jxta.id.IDFactory;
63 import net.jxta.impl.util.TimeUtils;
64 import net.jxta.logging.Logging;
65 import net.jxta.peergroup.PeerGroup;
66 import net.jxta.pipe.InputPipe;
67 import net.jxta.pipe.OutputPipe;
68 import net.jxta.pipe.OutputPipeEvent;
69 import net.jxta.pipe.OutputPipeListener;
70 import net.jxta.pipe.PipeID;
71 import net.jxta.pipe.PipeMsgListener;
72 import net.jxta.pipe.PipeService;
73 import net.jxta.platform.Module;
74 import net.jxta.protocol.ModuleImplAdvertisement;
75 import net.jxta.protocol.PipeAdvertisement;
76 import net.jxta.service.Service;
77 import net.jxta.peer.PeerID;
78
79 import java.io.IOException;
80 import java.net.URI;
81 import java.net.URISyntaxException;
82 import java.util.Collection;
83 import java.util.Collections;
84 import java.util.HashMap;
85 import java.util.Map;
86 import java.util.Set;
87 import java.util.logging.Level;
88 import java.util.logging.Logger;
89 import java.text.MessageFormat;
90
91
92 /**
93  * A JXTA {@link net.jxta.pipe.PipeService} implementation which implements the
94  * standard JXTA Pipe Resolver Protocol (PRP).
95  * <p/>
96  * This class provides implementation for Unicast, unicast secure and
97  * (indirectly) propagate pipes.
98  *
99  * @see net.jxta.pipe.PipeService
100  * @see net.jxta.pipe.InputPipe
101  * @see net.jxta.pipe.OutputPipe
102  * @see net.jxta.endpoint.Message
103  * @see net.jxta.protocol.PipeAdvertisement
104  * @see net.jxta.protocol.PipeResolverMessage
105  * @see <a href="https://jxta-spec.dev.java.net/nonav/JXTAProtocols.html#proto-pbp" target="_blank">JXTA Protocols Specification : Pipe Binding Protocol</a>
106  */
107 public class PipeServiceImpl implements PipeService, PipeResolver.Listener {
108
109     /**
110      * The Logger
111      */
112     private final static Logger LOG = Logger.getLogger(PipeServiceImpl.class.getName());
113
114     /**
115      * the interval at which we verify that a pipe is still resolved at a
116      * remote peer.
117      */
118     static final long VERIFYINTERVAL = 20 * TimeUtils.AMINUTE;
119
120     /**
121      * The group this PipeService is working for.
122      */
123     private PeerGroup group = null;
124
125     /**
126      * Our resolver handler.
127      */
128     private PipeResolver pipeResolver = null;
129
130     /**
131      * Link to wire pipe impl.
132      */
133     private WirePipeImpl wirePipe = null;
134
135     /**
136      * the interface object we will hand out.
137      */
138     private PipeService myInterface = null;
139
140     /**
141      * the impl advertisement for this impl.
142      */
143     private ModuleImplAdvertisement implAdvertisement = null;
144
145     /**
146      * Table of listeners for asynchronous output pipe creation.
147      * <p/>
148      * <ul>
149      * <li>keys are {@link net.jxta.pipe.PipeID}</li>
150      * <li>values are {@link java.util.Map}</li>
151      * </ul>
152      * Within the value Map:
153      * <ul>
154      * <li>keys are {@link java.lang.Integer} representing queryid</li>
155      * <li>values are {@link OutputPipeHolder}</li>
156      * </ul>
157      */
158     private final Map<PipeID, Map<Integer, OutputPipeHolder>> outputPipeListeners = new HashMap<PipeID, Map<Integer, OutputPipeHolder>>();
159
160     /**
161      * Has the pipe service been started?
162      */
163     private volatile boolean started = false;
164
165     /**
166      * holds a pipe adv and a listener which will be called for resolutions
167      * of the pipe.
168      */
169     private static class OutputPipeHolder {
170         final PipeAdvertisement adv;
171         final Set<? extends ID> peers;
172         final OutputPipeListener listener;
173         final int queryid;
174
175         OutputPipeHolder(PipeAdvertisement adv, Set<? extends ID> peers, OutputPipeListener listener, int queryid) {
176             this.adv = adv;
177             this.peers = peers;
178             this.listener = listener;
179             this.queryid = queryid;
180         }
181     }
182
183
184     /**
185      * A listener useful for implementing synchronous behaviour.
186      */
187     private static class syncListener implements OutputPipeListener {
188
189         volatile OutputPipeEvent event = null;
190
191         syncListener() {}
192
193         /**
194          * Called when a input pipe has been located for a previously registered
195          * pipe. The event contains an {@link net.jxta.pipe.OutputPipe} which can
196          * be used to communicate with the remote peer.
197          *
198          * @param event <code>net.jxta.pipe.outputPipeEvent</code> event
199          */
200         public synchronized void outputPipeEvent(OutputPipeEvent event) {
201             // we only accept the first event.
202             if (null == this.event) {
203                 this.event = event;
204                 notifyAll();
205             }
206         }
207     }
208
209     /**
210      * Default Constructor (don't delete)
211      */
212     public PipeServiceImpl() {// What is reason for this constructor???
213         // the same is automatically generated.
214     }
215
216     /**
217      * {@inheritDoc}
218      * <p/>
219      * We create only a single interface object and return it over and over
220      * again.
221      */
222     public synchronized PipeService getInterface() {
223         if (null == myInterface) {
224             myInterface = new PipeServiceInterface(this);
225         }
226         return myInterface;
227     }
228
229     /**
230      * {@inheritDoc}
231      */
232     public ModuleImplAdvertisement getImplAdvertisement() {
233         return implAdvertisement;
234     }
235
236     /**
237      * {@inheritDoc}
238      */
239     public synchronized void init(PeerGroup group, ID assignedID, Advertisement impl) {
240
241         this.group = group;
242         implAdvertisement = (ModuleImplAdvertisement) impl;
243
244         if (Logging.SHOW_CONFIG && LOG.isLoggable(Level.CONFIG)) {
245             StringBuilder configInfo = new StringBuilder("Configuring Pipe Service : " + assignedID);
246
247             if (implAdvertisement != null) {
248                 configInfo.append("\n\tImplementation :");
249                 configInfo.append("\n\t\tModule Spec ID: ").append(implAdvertisement.getModuleSpecID());
250                 configInfo.append("\n\t\tImpl Description : ").append(implAdvertisement.getDescription());
251                 configInfo.append("\n\t\tImpl URI : ").append(implAdvertisement.getUri());
252                 configInfo.append("\n\t\tImpl Code : ").append(implAdvertisement.getCode());
253             }
254
255             configInfo.append("\n\tGroup Params :");
256             configInfo.append("\n\t\tGroup : ").append(group);
257             configInfo.append("\n\t\tPeer ID : ").append(group.getPeerID());
258
259             configInfo.append("\n\tConfiguration :");
260             configInfo.append("\n\t\tVerify Interval : " + VERIFYINTERVAL + "ms");
261             LOG.config(configInfo.toString());
262         }
263     }
264
265     /**
266      * {@inheritDoc}
267      * <p/>
268      * Currently this service does not expect arguments.
269      */
270     public synchronized int startApp(String[] args) {
271
272         Service needed = group.getEndpointService();
273
274         if (null == needed) {
275             if (Logging.SHOW_WARNING && LOG.isLoggable(Level.WARNING)) {
276                 LOG.warning("Stalled until there is an endpoint service");
277             }
278             return START_AGAIN_STALLED;
279         }
280
281         needed = group.getResolverService();
282         if (null == needed) {
283             if (Logging.SHOW_WARNING && LOG.isLoggable(Level.WARNING)) {
284                 LOG.warning("Stalled until there is a resolver service");
285             }
286             return START_AGAIN_STALLED;
287         }
288
289         needed = group.getMembershipService();
290         if (null == needed) {
291             if (Logging.SHOW_WARNING && LOG.isLoggable(Level.WARNING)) {
292                 LOG.warning("Stalled until there is a membership service");
293             }
294             return START_AGAIN_STALLED;
295         }
296
297         needed = group.getRendezVousService();
298         if (null == needed) {
299             if (Logging.SHOW_WARNING && LOG.isLoggable(Level.WARNING)) {
300                 LOG.warning("Stalled until there is a rendezvous service");
301             }
302             return START_AGAIN_STALLED;
303         }
304
305         // create our resolver handler; it will register itself w/ the resolver.
306         pipeResolver = new PipeResolver(group);
307
308         // Create the WirePipe (propagated pipe)
309         wirePipe = new WirePipeImpl(group, pipeResolver);
310
311         // XXX 20061221 We could check the result of this.
312         wirePipe.startApp(args);
313
314         started = true;
315
316         return Module.START_OK;
317     }
318
319     /**
320      * {@inheritDoc}
321      */
322     public synchronized void stopApp() {
323         started = false;
324
325         try {
326             if (wirePipe != null) {
327                 wirePipe.stopApp();
328             }
329         } catch (Throwable failed) {
330             LOG.log(Level.SEVERE, "Failed to stop wire pipe", failed);
331         } finally {
332             wirePipe = null;
333         }
334
335         try {
336             if (pipeResolver != null) {
337                 pipeResolver.stop();
338             }
339         } catch (Throwable failed) {
340             LOG.log(Level.SEVERE, "Failed to stop pipe resolver", failed);
341         } finally {
342             pipeResolver = null;
343         }
344
345         // Avoid cross-reference problem with GC
346         group = null;
347         myInterface = null;
348
349         // clear outputPipeListeners
350         Collection<Map<Integer, OutputPipeHolder>> values = outputPipeListeners.values();
351
352         for (Map<Integer, OutputPipeHolder> value : values) {
353             value.clear();
354         }
355         outputPipeListeners.clear();
356     }
357
358     /**
359      * {@inheritDoc}
360      */
361     public InputPipe createInputPipe(PipeAdvertisement adv) throws IOException {
362         return createInputPipe(adv, null);
363     }
364
365     /**
366      * {@inheritDoc}
367      */
368     public InputPipe createInputPipe(PipeAdvertisement adv, PipeMsgListener listener) throws IOException {
369
370         if (!started) {
371             throw new IllegalStateException("Pipe Service has not been started or has been stopped");
372         }
373
374         String type = adv.getType();
375
376         if (type == null) {
377             throw new IllegalArgumentException("PipeAdvertisement type may not be null");
378         }
379
380         PipeID pipeId = (PipeID) adv.getPipeID();
381
382         if (pipeId == null) {
383             throw new IllegalArgumentException("PipeAdvertisement PipeID may not be null");
384         }
385
386         if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
387             LOG.fine("Create " + type + " InputPipe for " + pipeId);
388         }
389
390         InputPipe inputPipe;
391         // create an InputPipe.
392         if (type.equals(PipeService.UnicastType)) {
393             inputPipe = new InputPipeImpl(pipeResolver, adv, listener);
394         } else if (type.equals(PipeService.UnicastSecureType)) {
395             inputPipe = new SecureInputPipeImpl(pipeResolver, adv, listener);
396         } else if (type.equals(PipeService.PropagateType)) {
397             if (wirePipe != null) {
398                 inputPipe = wirePipe.createInputPipe(adv, listener);
399             } else {
400                 throw new IOException("No propagated pipe servive available");
401             }
402         } else {
403             // Unknown type
404             if (Logging.SHOW_SEVERE && LOG.isLoggable(Level.SEVERE)) {
405                 LOG.severe("Cannot create pipe for unknown type : " + type);
406             }
407             throw new IOException("Cannot create pipe for unknown type : " + type);
408         }
409         return inputPipe;
410     }
411
412     /**
413      * {@inheritDoc}
414      */
415     public OutputPipe createOutputPipe(PipeAdvertisement pipeAdv, long timeout) throws IOException {
416         return createOutputPipe(pipeAdv, Collections.<ID>emptySet(), timeout);
417     }
418
419     /**
420      * {@inheritDoc}
421      */
422     public OutputPipe createOutputPipe(PipeAdvertisement adv, Set<? extends ID> resolvablePeers, long timeout) throws IOException {
423         // convert zero to max value.
424         if (0 == timeout) {
425             timeout = Long.MAX_VALUE;
426         }
427
428         long absoluteTimeOut = TimeUtils.toAbsoluteTimeMillis(timeout);
429
430         // Make a listener, start async resolution and then wait until the timeout expires.
431         syncListener localListener = new syncListener();
432
433         int queryid = PipeResolver.getNextQueryID();
434
435         createOutputPipe(adv, resolvablePeers, localListener, queryid);
436
437         if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
438             LOG.fine("Waiting synchronously for " + timeout + "ms to resolve OutputPipe for " + adv.getPipeID());
439         }
440
441         try {
442             synchronized (localListener) {
443                 while ((null == localListener.event) && (TimeUtils.toRelativeTimeMillis(TimeUtils.timeNow(), absoluteTimeOut) < 0)) {
444                     try {
445                         localListener.wait(TimeUtils.ASECOND);
446                     } catch (InterruptedException woken) {
447                         Thread.interrupted();
448                     }
449                 }
450             }
451         } finally {
452             // remove the listener we installed.
453             removeOutputPipeListener(adv.getPipeID().toString(), queryid);
454         }
455
456         if (null != localListener.event) {
457             return localListener.event.getOutputPipe();
458         } else {
459             throw new IOException("Output Pipe could not be resolved after " + timeout + "ms.");
460         }
461     }
462
463     /**
464      * {@inheritDoc}
465      */
466     public void createOutputPipe(PipeAdvertisement pipeAdv, OutputPipeListener listener) throws IOException {
467         createOutputPipe(pipeAdv, Collections.<ID>emptySet(), listener);
468     }
469
470     /**
471      * {@inheritDoc}
472      */
473     public void createOutputPipe(PipeAdvertisement pipeAdv, Set<? extends ID> resolvablePeers, OutputPipeListener listener) throws IOException {
474         createOutputPipe(pipeAdv, resolvablePeers, listener, PipeResolver.getNextQueryID());
475     }
476
477     private void createOutputPipe(PipeAdvertisement pipeAdv, Set<? extends ID> resolvablePeers, OutputPipeListener listener, int queryid) throws IOException {
478
479         if (!started) {
480             throw new IOException("Pipe Service has not been started or has been stopped");
481         }
482
483         // Recover the PipeId from the PipeServiceImpl Advertisement
484         PipeID pipeId = (PipeID) pipeAdv.getPipeID();
485         String type = pipeAdv.getType();
486
487         if (null == type) {
488             IllegalArgumentException failed = new IllegalArgumentException("Pipe type was not set");
489             if (Logging.SHOW_SEVERE && LOG.isLoggable(Level.SEVERE)) {
490                 LOG.log(Level.SEVERE, failed.getMessage(), failed);
491             }
492             throw failed;
493         }
494
495         if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
496             LOG.fine("Create " + type + " OutputPipe for " + pipeId);
497         }
498
499         if (PipeService.PropagateType.equals(type)) {
500             OutputPipe op;
501
502             if (resolvablePeers.size() == 1) {
503                 op = new BlockingWireOutputPipe(group, pipeAdv, (PeerID) resolvablePeers.iterator().next());
504             } else {
505                 if (wirePipe != null) {
506                     op = wirePipe.createOutputPipe(pipeAdv, resolvablePeers);
507                 } else {
508                     throw new IOException("No propagated pipe service available");
509                 }
510             }
511
512             if (null != op) {
513                 OutputPipeEvent newevent = new OutputPipeEvent(this.getInterface(), op, pipeId.toString(), PipeResolver.ANYQUERY);
514
515                 try {
516                     listener.outputPipeEvent(newevent);
517                 } catch (Throwable ignored) {
518                     if (Logging.SHOW_SEVERE && LOG.isLoggable(Level.SEVERE)) {
519                         LOG.log(Level.SEVERE,
520                                 "Uncaught Throwable in listener for " + pipeId + " (" + listener.getClass().getName() + ")",
521                                 ignored);
522                     }
523                 }
524             }
525         } else if (PipeService.UnicastType.equals(type) || PipeService.UnicastSecureType.equals(type)) {
526
527             addOutputPipeListener(pipeId, new OutputPipeHolder(pipeAdv, resolvablePeers, listener, queryid));
528             pipeResolver.addListener(pipeId, this, queryid);
529             pipeResolver.sendPipeQuery(pipeAdv, resolvablePeers, queryid);
530
531             // look locally for the pipe
532             if (resolvablePeers.isEmpty() || resolvablePeers.contains(group.getPeerID())) {
533                 InputPipe local = pipeResolver.findLocal(pipeId);
534
535                 // if we have a local instance, make sure the local instance is of the same type.
536                 if (null != local) {
537                     if (local.getType().equals(pipeAdv.getType())) {
538                         pipeResolver.callListener(queryid, pipeId, local.getType(), group.getPeerID(), false);
539                     } else {
540                         if (Logging.SHOW_WARNING && LOG.isLoggable(Level.WARNING)) {
541                             LOG.warning(MessageFormat.format("rejecting local pipe ({0}) because type is not ({1})", local.getType(),
542                                     pipeAdv.getType()));
543                         }
544                     }
545                 }
546             }
547         } else {
548             // Unknown type
549             if (Logging.SHOW_SEVERE && LOG.isLoggable(Level.SEVERE)) {
550                 LOG.severe("createOutputPipe: cannot create pipe for unknown type : " + type);
551             }
552             throw new IOException("cannot create pipe for unknown type : " + type);
553         }
554     }
555
556     /*
557      * Add an output listener for the given pipeId.
558      */
559     private void addOutputPipeListener(PipeID pipeId, OutputPipeHolder pipeHolder) {
560         synchronized (outputPipeListeners) {
561             Map<Integer, OutputPipeHolder> perpipelisteners = outputPipeListeners.get(pipeId);
562
563             if (perpipelisteners == null) {
564                 perpipelisteners = new HashMap<Integer, OutputPipeHolder>();
565                 outputPipeListeners.put(pipeId, perpipelisteners);
566             }
567             if (perpipelisteners.get(pipeHolder.queryid) != null) {
568                 LOG.warning("Clobbering output pipe listener for query " + pipeHolder.queryid);
569             }
570             if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
571                 LOG.fine("Adding pipe listener for pipe " + pipeId + " and query " + pipeHolder.queryid);
572             }
573             perpipelisteners.put(pipeHolder.queryid, pipeHolder);
574         }
575     }
576
577     /**
578      * {@inheritDoc}
579      */
580     public OutputPipeListener removeOutputPipeListener(String pipeID, OutputPipeListener listener) {
581         throw new UnsupportedOperationException("Legacy method not supported. Use interface object if you need this method.");
582     }
583
584     /**
585      * {@inheritDoc}
586      */
587     public OutputPipeListener removeOutputPipeListener(ID pipeID, OutputPipeListener listener) {
588
589         // remove all instances of this listener, regardless of queryid
590         if (pipeResolver == null) {
591             return null;
592         }
593
594         if (!(pipeID instanceof PipeID)) {
595             throw new IllegalArgumentException("pipeID must be a PipeID.");
596         }
597
598         synchronized (outputPipeListeners) {
599             Map<Integer, OutputPipeHolder> perpipelisteners = outputPipeListeners.get(pipeID);
600
601             if (perpipelisteners != null) {
602                 Set<Map.Entry<Integer, OutputPipeHolder>> entries = perpipelisteners.entrySet();
603
604                 for (Map.Entry<Integer, OutputPipeHolder> entry : entries) {
605                     OutputPipeHolder pl = entry.getValue();
606
607                     if (pl.listener == listener) {
608                         pipeResolver.removeListener((PipeID) pipeID, pl.queryid);
609                         if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
610                             LOG.fine("Removing listener for query " + pl.queryid);
611                         }
612                         perpipelisteners.remove(entry.getKey());
613                     }
614                 }
615                 // clean up the map if there are no more listeners for the pipe
616                 if (perpipelisteners.isEmpty()) {
617                     outputPipeListeners.remove(pipeID);
618                 }
619             }
620         }
621         return listener;
622     }
623
624     private OutputPipeListener removeOutputPipeListener(String opID, int queryID) {
625
626         if (pipeResolver == null) {
627             return null;
628         }
629
630         PipeID pipeID;
631         try {
632             URI aPipeID = new URI(opID);
633             pipeID = (PipeID) IDFactory.fromURI(aPipeID);
634         } catch (URISyntaxException badID) {
635             throw new IllegalArgumentException("Bad pipe ID: " + opID);
636         } catch (ClassCastException badID) {
637             throw new IllegalArgumentException("id was not a pipe id: " + opID);
638         }
639
640         synchronized (outputPipeListeners) {
641             Map<Integer, OutputPipeHolder> perpipelisteners = outputPipeListeners.get(pipeID);
642
643             if (perpipelisteners != null) {
644                 OutputPipeHolder pipeHolder = perpipelisteners.get(queryID);
645
646                 perpipelisteners.remove(queryID);
647                 if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
648                     LOG.fine("Removing listener for query " + queryID);
649                 }
650                 // clean up the map if there are no more listeners for the pipe
651                 if (perpipelisteners.isEmpty()) {
652                     outputPipeListeners.remove(pipeID);
653                 }
654                 pipeResolver.removeListener(pipeID, queryID);
655                 if (pipeHolder != null) {
656                     return pipeHolder.listener;
657                 }
658             }
659
660         }
661         return null;
662     }
663
664     /**
665      * {@inheritDoc}
666      */
667     public boolean pipeResolveEvent(PipeResolver.Event event) {
668
669         try {
670             ID peerID = event.getPeerID();
671             ID pipeID = event.getPipeID();
672             int queryID = event.getQueryID();
673             OutputPipeHolder pipeHolder;
674
675             synchronized (outputPipeListeners) {
676                 Map<Integer, OutputPipeHolder> perpipelisteners = outputPipeListeners.get(pipeID);
677
678                 if (perpipelisteners == null) {
679                     if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
680                         LOG.fine("No listener for event for pipe " + pipeID);
681                     }
682                     return false;
683                 }
684                 pipeHolder = perpipelisteners.get(queryID);
685                 if (pipeHolder == null) {
686                     if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
687                         LOG.fine("No listener for event for query " + queryID);
688                     }
689                     return false;
690                 }
691             }
692
693             // check if they wanted a resolve from a specific peer.
694             if (!pipeHolder.peers.isEmpty() && !pipeHolder.peers.contains(peerID)) {
695                 if (Logging.SHOW_WARNING && LOG.isLoggable(Level.WARNING)) {
696                     LOG.warning("Event was for wrong peer \'" + peerID + "\'. Discarding.");
697                 }
698                 return false;
699             }
700
701             // create op
702             String type = pipeHolder.adv.getType();
703             OutputPipe op;
704
705             if (PipeService.UnicastType.equals(type)) {
706                 op = new NonBlockingOutputPipe(group, pipeResolver, pipeHolder.adv, peerID, pipeHolder.peers);
707             } else if (PipeService.UnicastSecureType.equals(type)) {
708                 op = new SecureOutputPipe(group, pipeResolver, pipeHolder.adv, peerID, pipeHolder.peers);
709             } else {
710                 if (Logging.SHOW_WARNING && LOG.isLoggable(Level.WARNING)) {
711                     LOG.warning("Could not create output pipe of type \'" + type + "\'. Discarding.");
712                 }
713                 return false;
714             }
715
716             // Generate an event when the output pipe was succesfully opened.
717             OutputPipeEvent newevent = new OutputPipeEvent(this.getInterface(), op, pipeID.toString(), queryID);
718             try {
719                 pipeHolder.listener.outputPipeEvent(newevent);
720             } catch (Throwable ignored) {
721                 if (Logging.SHOW_SEVERE && LOG.isLoggable(Level.SEVERE)) {
722                     LOG.log(Level.SEVERE
723                             ,
724                             "Uncaught Throwable in listener for " + pipeID + "(" + pipeHolder.getClass().getName() + ")", ignored);
725                 }
726             }
727             removeOutputPipeListener(pipeID.toString(), queryID);
728             return true;
729         } catch (IOException ie) {
730             if (Logging.SHOW_SEVERE && LOG.isLoggable(Level.SEVERE)) {
731                 LOG.log(Level.SEVERE, "Error creating output pipe " + event.getPipeID(), ie);
732             }
733         }
734         if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
735             LOG.fine("No listener for event for " + event.getPipeID());
736         }
737         return false;
738     }
739
740     /**
741      * {@inheritDoc}
742      * <p/>
743      * We don't do anything with NAKs (yet)
744      */
745     public boolean pipeNAKEvent(PipeResolver.Event event) {
746         return false;
747     }
748 }