]> sjero.net Git - linphone/blob - p2pproxy/dependencies-src/jxse-src-2.5/impl/src/net/jxta/impl/proxy/ProxyService.java
81e1cd9fee693025f71a4fa9c4730eab53329867
[linphone] / p2pproxy / dependencies-src / jxse-src-2.5 / impl / src / net / jxta / impl / proxy / ProxyService.java
1 /*
2  * Copyright (c) 2005-2007 Sun Microsystems, Inc.  All rights reserved.
3  *  
4  *  The Sun Project JXTA(TM) Software License
5  *  
6  *  Redistribution and use in source and binary forms, with or without 
7  *  modification, are permitted provided that the following conditions are met:
8  *  
9  *  1. Redistributions of source code must retain the above copyright notice,
10  *     this list of conditions and the following disclaimer.
11  *  
12  *  2. Redistributions in binary form must reproduce the above copyright notice, 
13  *     this list of conditions and the following disclaimer in the documentation 
14  *     and/or other materials provided with the distribution.
15  *  
16  *  3. The end-user documentation included with the redistribution, if any, must 
17  *     include the following acknowledgment: "This product includes software 
18  *     developed by Sun Microsystems, Inc. for JXTA(TM) technology." 
19  *     Alternately, this acknowledgment may appear in the software itself, if 
20  *     and wherever such third-party acknowledgments normally appear.
21  *  
22  *  4. The names "Sun", "Sun Microsystems, Inc.", "JXTA" and "Project JXTA" must 
23  *     not be used to endorse or promote products derived from this software 
24  *     without prior written permission. For written permission, please contact 
25  *     Project JXTA at http://www.jxta.org.
26  *  
27  *  5. Products derived from this software may not be called "JXTA", nor may 
28  *     "JXTA" appear in their name, without prior written permission of Sun.
29  *  
30  *  THIS SOFTWARE IS PROVIDED ``AS IS'' AND ANY EXPRESSED OR IMPLIED WARRANTIES,
31  *  INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND 
32  *  FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL SUN 
33  *  MICROSYSTEMS OR ITS CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, 
34  *  INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT 
35  *  LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, 
36  *  OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF 
37  *  LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING 
38  *  NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, 
39  *  EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
40  *  
41  *  JXTA is a registered trademark of Sun Microsystems, Inc. in the United 
42  *  States and other countries.
43  *  
44  *  Please see the license information page at :
45  *  <http://www.jxta.org/project/www/license.html> for instructions on use of 
46  *  the license in source files.
47  *  
48  *  ====================================================================
49  *  
50  *  This software consists of voluntary contributions made by many individuals 
51  *  on behalf of Project JXTA. For more information on Project JXTA, please see 
52  *  http://www.jxta.org.
53  *  
54  *  This license is based on the BSD license adopted by the Apache Foundation. 
55  */
56 package net.jxta.impl.proxy;
57
58 import net.jxta.discovery.DiscoveryEvent;
59 import net.jxta.discovery.DiscoveryService;
60 import net.jxta.document.Advertisement;
61 import net.jxta.document.AdvertisementFactory;
62 import net.jxta.document.MimeMediaType;
63 import net.jxta.endpoint.EndpointAddress;
64 import net.jxta.endpoint.EndpointListener;
65 import net.jxta.endpoint.EndpointService;
66 import net.jxta.endpoint.Message;
67 import net.jxta.endpoint.MessageElement;
68 import net.jxta.endpoint.StringMessageElement;
69 import net.jxta.exception.PeerGroupException;
70 import net.jxta.id.ID;
71 import net.jxta.id.IDFactory;
72 import net.jxta.impl.util.Cache;
73 import net.jxta.impl.util.CacheEntry;
74 import net.jxta.impl.util.CacheEntryListener;
75 import net.jxta.impl.util.LRUCache;
76 import net.jxta.peer.PeerID;
77 import net.jxta.peergroup.PeerGroup;
78 import net.jxta.peergroup.PeerGroupID;
79 import net.jxta.pipe.InputPipe;
80 import net.jxta.pipe.OutputPipe;
81 import net.jxta.pipe.OutputPipeEvent;
82 import net.jxta.pipe.OutputPipeListener;
83 import net.jxta.pipe.PipeMsgEvent;
84 import net.jxta.pipe.PipeMsgListener;
85 import net.jxta.pipe.PipeService;
86 import net.jxta.protocol.DiscoveryResponseMsg;
87 import net.jxta.protocol.ModuleImplAdvertisement;
88 import net.jxta.protocol.PeerAdvertisement;
89 import net.jxta.protocol.PeerGroupAdvertisement;
90 import net.jxta.protocol.PipeAdvertisement;
91 import net.jxta.service.Service;
92 import java.util.logging.Level;
93 import net.jxta.logging.Logging;
94 import java.util.logging.Logger;
95
96 import java.io.IOException;
97 import java.io.StringReader;
98 import java.net.URI;
99 import java.net.URISyntaxException;
100 import java.util.Enumeration;
101 import java.util.HashMap;
102 import java.util.Iterator;
103 import java.util.LinkedList;
104 import java.util.Map;
105 import java.util.TreeMap;
106
107 import net.jxta.platform.Module;
108
109
110 // FIXME: jice@jxta.org - 20020515
111 // All public methods are synchronized.
112 // None of them does anything blocking so that should be about OK, however
113 // first it is not 100% sure, second eventhough non-blocking, some of these
114 // operations could take a significant amount of time, which may be unfair
115 // to other threads that wish to enter for a quick operation.
116 // Making the locking finer-grain would require significant code rework, so
117 // it will have to do for now.
118
119 public class ProxyService implements Service, EndpointListener, PipeMsgListener, OutputPipeListener, CacheEntryListener {
120
121     private final static Logger LOG = Logger.getLogger(ProxyService.class.getName());
122
123     public final static int DEFAULT_THRESHOLD = 2;
124     public final static int DEFAULT_LIFETIME = 1000 * 60 * 30; // 30 minutes
125
126     /**
127      * *********************************************************************
128      * Define the proxy message tags
129      * ********************************************************************
130      */
131     public static final String REQUEST_TAG = "request";
132     public static final String RESPONSE_TAG = "response";
133
134     static final String REQUESTID_TAG = "requestId";
135     static final String TYPE_TAG = "type";
136     static final String NAME_TAG = "name";
137     static final String ID_TAG = "id";
138     static final String ARG_TAG = "arg";
139     static final String ATTRIBUTE_TAG = "attr";
140     static final String VALUE_TAG = "value";
141     static final String THRESHOLD_TAG = "threshold";
142     static final String ERROR_MESSAGE_TAG = "error";
143     static final String PROXYNS = "proxy";
144
145     /**
146      * *********************************************************************
147      * Define the proxy request types
148      * ********************************************************************
149      */
150     public static final String REQUEST_JOIN = "join";
151     public static final String REQUEST_CREATE = "create";
152     public static final String REQUEST_SEARCH = "search";
153     public static final String REQUEST_LISTEN = "listen";
154     public static final String REQUEST_CLOSE = "close";
155     public static final String REQUEST_SEND = "send";
156
157     /**
158      * *********************************************************************
159      * Define the proxy response types
160      * ********************************************************************
161      */
162     public static final String RESPONSE_SUCCESS = "success";
163     public static final String RESPONSE_ERROR = "error";
164     public static final String RESPONSE_INFO = "info";
165     public static final String RESPONSE_RESULT = "result";
166     public static final String RESPONSE_MESSAGE = "data";
167
168     /**
169      * *********************************************************************
170      * Define the proxy type tags
171      * ********************************************************************
172      */
173     public static final String TYPE_PEER = "PEER";
174     public static final String TYPE_GROUP = "GROUP";
175     public static final String TYPE_PIPE = "PIPE";
176
177     private PeerGroup group = null;
178     private ID assignedID = null;
179     private String serviceName = null;
180     private String serviceParameter = null;
181     private EndpointService endpoint = null;
182     private DiscoveryService discovery = null;
183     private PipeService pipe = null;
184     private ModuleImplAdvertisement implAdvertisement = null;
185
186     private final LRUCache<Integer, Requestor> searchRequests = new LRUCache<Integer, Requestor>(25); // Currently unused
187     private final Map<String, PipeListenerList> pipeListeners = new TreeMap<String, PipeListenerList>();
188
189     /**
190      * Pending pipes cost only memory, so it is not a problrm to
191      * wait for the GC to cleanup things. No CacheEntryListener.
192      */
193     private final Cache pendingPipes = new Cache(200, null);
194     private Cache resolvedPipes;
195
196     private static Map<String, PeerGroup> proxiedGroups = new HashMap<String, PeerGroup>(16);
197     private static Map<String, String> passwords = new HashMap<String, String>(16);
198
199     /**
200      * {@inheritDoc}
201      */
202     public void init(PeerGroup group, ID assignedID, Advertisement implAdv) throws PeerGroupException {
203         this.group = group;
204         this.assignedID = assignedID;
205         this.serviceName = assignedID.toString();
206         this.implAdvertisement = (ModuleImplAdvertisement) implAdv;
207
208         serviceParameter = group.getPeerGroupID().toString();
209
210         // Resolved pipes cost non-memory resources, so we need to close
211         // them as early as we forget them. Need a CacheEntryListener (this).
212         resolvedPipes = new Cache(200, this);
213
214         if (Logging.SHOW_CONFIG && LOG.isLoggable(Level.CONFIG)) {
215             StringBuilder configInfo = new StringBuilder("Configuring JXME Proxy Service : " + assignedID);
216
217             configInfo.append("\n\tImplementation :");
218             configInfo.append("\n\t\tModule Spec ID: ").append(implAdvertisement.getModuleSpecID());
219             configInfo.append("\n\t\tImpl Description : ").append(implAdvertisement.getDescription());
220             configInfo.append("\n\t\tImpl URI : ").append(implAdvertisement.getUri());
221             configInfo.append("\n\t\tImpl Code : ").append(implAdvertisement.getCode());
222             configInfo.append("\n\tGroup Params :");
223             configInfo.append("\n\t\tGroup : ").append(group.getPeerGroupName());
224             configInfo.append("\n\t\tGroup ID : ").append(group.getPeerGroupID());
225             configInfo.append("\n\t\tPeer ID : ").append(group.getPeerID());
226             LOG.config(configInfo.toString());
227         }
228     }
229
230     /**
231      * {@inheritDoc}
232      */
233     public int startApp(String[] args) {
234
235         Service needed = group.getEndpointService();
236
237         if (null == needed) {
238             if (Logging.SHOW_WARNING && LOG.isLoggable(Level.WARNING)) {
239                 LOG.warning("Stalled until there is a endpoint service");
240             }
241
242             return START_AGAIN_STALLED;
243         }
244
245         needed = group.getDiscoveryService();
246         if (null == needed) {
247             if (Logging.SHOW_WARNING && LOG.isLoggable(Level.WARNING)) {
248                 LOG.warning("Stalled until there is a discovery service");
249             }
250
251             return START_AGAIN_STALLED;
252         }
253
254         needed = group.getPipeService();
255         if (null == needed) {
256             if (Logging.SHOW_WARNING && LOG.isLoggable(Level.WARNING)) {
257                 LOG.warning("Stalled until there is a pipe service");
258             }
259
260             return START_AGAIN_STALLED;
261         }
262
263         endpoint = group.getEndpointService();
264         discovery = group.getDiscoveryService();
265         pipe = group.getPipeService();
266
267         if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
268             LOG.fine("addListener " + serviceName + serviceParameter);
269         }
270
271         endpoint.addIncomingMessageListener(this, serviceName, serviceParameter);
272         if (Logging.SHOW_INFO && LOG.isLoggable(Level.INFO)) {
273             LOG.info("JXME Proxy Service started.");
274         }
275         return Module.START_OK;
276     }
277
278     /**
279      * {@inheritDoc}
280      */
281     public void stopApp() {
282         if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
283             LOG.fine("removeListener " + serviceName + serviceParameter);
284         }
285
286         endpoint.removeIncomingMessageListener(serviceName, serviceParameter);
287
288         if (Logging.SHOW_INFO && LOG.isLoggable(Level.INFO)) {
289             LOG.info("JXME Proxy Service stopped.");
290         }
291     }
292
293     /**
294      * {@inheritDoc}
295      */
296     public ModuleImplAdvertisement getImplAdvertisement() {
297         return implAdvertisement;
298     }
299
300     /**
301      * {@inheritDoc}
302      */
303     public ProxyService getInterface() {
304         return this;
305     }
306
307     /**
308      * {@inheritDoc}
309      */
310     public synchronized void processIncomingMessage(Message message, EndpointAddress srcAddr, EndpointAddress dstAddr) {
311
312         logMessage(message, LOG);
313
314         Requestor requestor = null;
315
316         try {
317             // requestor = Requestor.createRequestor(group, message, srcAddr);
318             // Commented out the above line and added the following three lines.
319             // The change allows to reduce the traffice going to a JXME peer
320             // by able to remove ERM completly. As a side effect (severe one)
321             // JXTA Proxy and JXTA relay need to be running on the same peer.
322             // This changes should be pulled out as soon as ERM is implemented
323             // in a more inteligent and effective way so that it doesn't
324             // have any impact on JXME peers.
325             EndpointAddress relayAddr = new EndpointAddress("relay", srcAddr.getProtocolAddress(), srcAddr.getServiceName(),
326                     srcAddr.getServiceParameter());
327
328             requestor = Requestor.createRequestor(group, message, relayAddr, 0);
329         } catch (IOException e) {
330             if (Logging.SHOW_WARNING && LOG.isLoggable(Level.WARNING)) {
331                 LOG.log(Level.WARNING, "could not create requestor", e);
332             }
333         }
334
335         String request = popString(REQUEST_TAG, message);
336
337         if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
338             LOG.fine("request = " + request + " requestor " + requestor);
339         }
340
341         if (request != null && requestor != null) {
342             if (REQUEST_JOIN.equals(request)) {
343                 handleJoinRequest(requestor, popString(ID_TAG, message), popString(ARG_TAG, message));
344             } else if (REQUEST_CREATE.equals(request)) {
345                 handleCreateRequest(requestor, popString(TYPE_TAG, message), popString(NAME_TAG, message),
346                         popString(ID_TAG, message), popString(ARG_TAG, message));
347             } else if (REQUEST_SEARCH.equals(request)) {
348                 handleSearchRequest(requestor, popString(TYPE_TAG, message), popString(ATTRIBUTE_TAG, message),
349                         popString(VALUE_TAG, message), popString(THRESHOLD_TAG, message));
350             } else if ("listen".equals(request)) {
351                 handleListenRequest(requestor, popString(ID_TAG, message));
352             } else if ("close".equals(request)) {
353                 handleCloseRequest(requestor, popString(ID_TAG, message));
354             } else if ("send".equals(request)) {
355                 handleSendRequest(requestor, popString(ID_TAG, message), message);
356             }
357         }
358     }
359
360     // Right now there's a security hole: passwd come in clear.
361     // And not much is done for stopping clients to use the new group
362     // without being authenticated. We also never get rid of these
363     // additional groups.
364     private synchronized void handleJoinRequest(Requestor requestor, String grpId, String passwd) {
365
366         PeerGroup g = proxiedGroups.get(grpId);
367
368         if (g != null) {
369             if (g == this.group) {
370                 requestor.notifyError("Same group");
371             } else if (!passwords.get(grpId).equals(passwd)) {
372                 requestor.notifyError("Incorrect password");
373             } else {
374                 requestor.notifySuccess();
375             }
376             return;
377         }
378
379         try {
380             g = group.newGroup((PeerGroupID) IDFactory.fromURI(new URI(grpId)));
381             g.getRendezVousService().startRendezVous();
382         } catch (Exception ge) {
383             requestor.notifyError(ge.getMessage());
384             return;
385         }
386
387         // XXX check membership here. (would work only for single passwd grps)
388         // For now, assume join is always welcome.
389
390         // So far so good. Try to start a proxy in that grp.
391         try {
392             // Fork this proxy into the new grp.
393             ProxyService proxyService = new ProxyService();
394             proxyService.init(g, assignedID, implAdvertisement);
395             proxyService.startApp(null);
396         } catch (Exception e) {
397             requestor.notifyError(e.getMessage());
398             return;
399         }
400         // set non-deft passwd
401         passwords.put(grpId, passwd);
402         proxiedGroups.put(grpId, g);
403         requestor.notifySuccess();
404     }
405
406     private void handleCreateRequest(Requestor requestor, String type, String name, String id, String arg) {
407
408         if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
409             LOG.fine("handleCreateRequest type=" + type + " name=" + name + " id=" + id + " arg=" + arg);
410         }
411
412         if (name == null) {
413             name = ""; // default name
414         }
415
416         if (TYPE_PEER.equals(type)) {
417             PeerAdvertisement adv = createPeerAdvertisement(name, id);
418
419             if (adv != null) {
420                 try {
421                     discovery.publish(adv);
422                 } catch (Exception e) {
423                     if (Logging.SHOW_WARNING && LOG.isLoggable(Level.WARNING)) {
424                         LOG.log(Level.WARNING, "Could not publish peer advertisement", e);
425                     }
426                 }
427                 requestor.send(adv, RESPONSE_SUCCESS);
428             } else {
429                 requestor.notifyError("could not create advertisement");
430             }
431         } else if (TYPE_GROUP.equals(type)) {
432             PeerGroupAdvertisement adv = createGroupAdvertisement(name, id);
433
434             if (adv != null) {
435                 requestor.send(adv, RESPONSE_SUCCESS);
436             } else {
437                 requestor.notifyError("could not create advertisement");
438             }
439         } else if (TYPE_PIPE.equals(type)) {
440             if (arg == null) {
441                 arg = PipeService.UnicastType; // default pipe type
442             }
443
444             PipeAdvertisement adv = createPipeAdvertisement(name, id, arg);
445
446             if (adv != null) {
447                 try {
448                     discovery.publish(adv);
449                 } catch (Exception e) {
450                     if (Logging.SHOW_WARNING && LOG.isLoggable(Level.WARNING)) {
451                         LOG.log(Level.WARNING, "Could not publish pipe advertisement", e);
452                     }
453                 }
454
455                 requestor.send(adv, RESPONSE_SUCCESS);
456             } else {
457                 requestor.notifyError("could not create advertisement");
458             }
459         } else {
460             requestor.notifyError("unsupported type");
461         }
462     }
463
464     private void handleSearchRequest(Requestor requestor, String type, String attribute, String value, String threshold) {
465
466         if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
467             LOG.fine("handleSearchRequest type=" + type + " attribute=" + attribute + " value=" + value + " threshold=" + threshold);
468         }
469
470         int discoveryType;
471         int thr = DEFAULT_THRESHOLD;
472         try {
473             thr = Integer.parseInt(threshold);
474         } catch (NumberFormatException nex) {
475             if (Logging.SHOW_WARNING && LOG.isLoggable(Level.WARNING)) {
476                 LOG.warning("handleSearchRequest failed to parse threshold " + threshold + ", using default " + DEFAULT_THRESHOLD);
477             }
478         }
479         requestor.setThreshold(thr);
480
481         if (TYPE_PEER.equals(type)) {
482             discoveryType = DiscoveryService.PEER;
483         } else if (TYPE_GROUP.equals(type)) {
484             discoveryType = DiscoveryService.GROUP;
485         } else {
486             discoveryType = DiscoveryService.ADV;
487         }
488
489         Enumeration<Advertisement> each = null;
490
491         try {
492             each = discovery.getLocalAdvertisements(discoveryType, attribute, value);
493         } catch (IOException e) {
494             requestor.notifyError("could not search locally");
495         }
496
497         int i = 0;
498         while (each.hasMoreElements() && i < thr) {
499             Advertisement adv = each.nextElement();
500
501             // notify the requestor of the result
502             // FIXME this can be optimized by sending all adv's in a
503             // single message
504             requestor.send(adv, RESPONSE_RESULT);
505             i++;
506         }
507
508         // start the query
509         int queryId = discovery.getRemoteAdvertisements(null, discoveryType, attribute, value, thr);
510
511         // register the query
512         searchRequests.put(queryId, requestor);
513     }
514
515     /**
516      * Finds a JXTA Pipe and starts listening to it.
517      *
518      * @param requestor the peer sending listen request.
519      * @param id        the id of the Pipe.
520      */
521     private void handleListenRequest(Requestor requestor, String id) {
522         if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
523             LOG.fine("handleListenRequest id=" + id);
524         }
525
526         if (id == null) {
527             requestor.notifyError("Pipe ID not specified");
528             return;
529         }
530
531         PipeAdvertisement pipeAdv = findPipeAdvertisement(null, id, null);
532
533         if (pipeAdv == null) {
534             requestor.notifyError("Pipe Advertisement not found");
535             return;
536         }
537
538         String pipeId = pipeAdv.getPipeID().toString();
539
540         if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
541             LOG.fine("listen to pipe name=" + pipeAdv.getName() + " id=" + pipeAdv.getPipeID() + " type=" + pipeAdv.getType());
542         }
543
544         // check to see if the input pipe already exist
545         PipeListenerList list = pipeListeners.get(pipeId);
546
547         if (list == null) {
548             if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
549                 LOG.fine("first listener, create input pipe");
550             }
551
552             // create an input pipe
553             try {
554                 list = new PipeListenerList(pipe.createInputPipe(pipeAdv, this), pipeListeners, pipeId);
555             } catch (IOException e) {
556                 if (Logging.SHOW_WARNING && LOG.isLoggable(Level.WARNING)) {
557                     LOG.log(Level.WARNING, "could not listen to pipe", e);
558                 }
559                 requestor.notifyError("could not listen to pipe");
560                 return;
561             }
562             pipeListeners.put(pipeId, list);
563         }
564
565         // add requestor to list
566         list.add(requestor);
567
568         if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
569             LOG.fine("add requestor=" + requestor + " id=" + pipeId + " list=" + list);
570             LOG.fine("publish PipeAdvertisement");
571         }
572         // advertise the pipe locally
573         try {
574             discovery.publish(pipeAdv);
575         } catch (IOException e) {
576             if (Logging.SHOW_WARNING && LOG.isLoggable(Level.WARNING)) {
577                 LOG.log(Level.WARNING, "Could not publish pipe advertisement", e);
578             }
579         }
580
581         if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
582             LOG.fine("done with listen request");
583         }
584
585         // notify requestor of success
586         requestor.notifySuccess();
587     }
588
589     private void handleCloseRequest(Requestor requestor, String id) {
590         if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
591             LOG.fine("handleCloseRequest id=" + id);
592         }
593
594         PipeListenerList list = pipeListeners.get(id);
595
596         if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
597             LOG.fine("handleCloseRequest list = " + list);
598         }
599         if (list != null) {
600             list.remove(requestor);
601             if (list.size() == 0) {
602                 pipeListeners.remove(id);
603             }
604         }
605
606         // notify requestor of success
607         requestor.notifySuccess();
608     }
609
610     // Send the given message to the given pipe.
611     private void sendToPipe(Requestor req, Message mess, OutputPipe out) {
612         try {
613             out.send(mess);
614             if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
615                 LOG.fine("output pipe send end");
616             }
617             // notify requestor of success
618             req.notifySuccess();
619         } catch (IOException e) {
620             req.notifyError("could not send to pipe");
621             if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
622                 LOG.log(Level.FINE, "could not send to pipe", e);
623             }
624         }
625     }
626
627     class ClientMessage {
628         private Requestor requestor;
629         private Message message;
630
631         public ClientMessage(Requestor req, Message mess) {
632             requestor = req;
633             message = mess;
634         }
635
636         // Send this (pending) message
637         public void send(OutputPipe out) {
638             sendToPipe(requestor, message, out);
639         }
640
641     }
642
643
644     class PendingPipe {
645         private ClientMessage pending;
646
647         public PendingPipe() {
648             pending = null;
649         }
650
651         // Just got resolved ! Will send the pending message(s).
652         public void sendPending(OutputPipe out) {
653             pending.send(out);
654             pending = null;
655         }
656
657         // Enqueue a new pending message.
658         // (for now we only enqueue 1; others get trashed)
659         public void enqueue(Requestor req, Message mess) {
660             if (pending != null) {
661                 return;
662             }
663             pending = new ClientMessage(req, mess);
664         }
665     }
666
667     private void handleSendRequest(Requestor requestor, String id, Message message) {
668
669         if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
670             LOG.fine("handleSendRequest id=" + id);
671         }
672
673         PipeAdvertisement pipeAdv = findPipeAdvertisement(null, id, null);
674
675         if (pipeAdv == null) {
676             requestor.notifyError("Could not find pipe");
677             return;
678         }
679
680         String pipeId = pipeAdv.getPipeID().toString();
681
682         if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
683             LOG.fine(
684                     "send to pipe name=" + pipeAdv.getName() + " id=" + pipeAdv.getPipeID().toString() + " arg="
685                     + pipeAdv.getType());
686         }
687
688         // check if there are local listeners
689
690         PipeListenerList list = pipeListeners.get(pipeId);
691         if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
692             LOG.fine("local listener list " + list);
693         }
694
695         if (list != null && PipeService.UnicastType.equals(pipeAdv.getType())) {
696
697             if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
698                 LOG.fine("start sending to each requestor");
699             }
700
701             list.send(message, pipeId);
702             if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
703                 LOG.fine("end sending to each requestor");
704             }
705             // notify requestor of success
706             requestor.notifySuccess();
707             return;
708         }
709
710         // NOTE: This part is NOT exercised by the load test because all
711         // clients are local. To exercise this part, comment out the
712         // optimization above.
713
714         // This is not a unicast pipe with at least one local listener
715         // so we need to fingure out where the message should go.
716         // This may take a while and has to be done asynchronously...
717         // Carefull that the resolution can occur synchronously by this
718         // very thread, and java lock will not prevent re-entry.
719
720         if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
721             LOG.fine("output pipe creation begin");
722         }
723
724         // Look for the pipe in the resolved list. If not found
725         // look in the pending list or add it there.
726         OutputPipe out = (OutputPipe) resolvedPipes.get(pipeId);
727
728         if (out != null) {
729             sendToPipe(requestor, message, out);
730             return;
731         }
732         PendingPipe p = (PendingPipe) pendingPipes.get(pipeId);
733
734         if (p != null) {
735             p.enqueue(requestor, message);
736             return;
737         }
738
739         try {
740             p = new PendingPipe();
741             p.enqueue(requestor, message);
742             pendingPipes.put(pipeId, p);
743             pipe.createOutputPipe(pipeAdv, this);
744         } catch (IOException e) {
745             pendingPipes.remove(pipeId);
746             requestor.notifyError("could not create output pipe");
747         }
748     }
749
750     // TBD: DO WE NEED THIS FUNCTIONALITY FOR JXME?
751     private PeerAdvertisement createPeerAdvertisement(String name, String id) {
752         PeerAdvertisement adv = null;
753
754         PeerID pid = null;
755
756         if (id != null) {
757             try {
758                 ID tempId = IDFactory.fromURI(new URI(id));
759
760                 pid = (PeerID) tempId;
761             } catch (URISyntaxException e) {
762                 if (Logging.SHOW_WARNING && LOG.isLoggable(Level.WARNING)) {
763                     LOG.log(Level.WARNING, "Could not parse peerId from url", e);
764                 }
765             } catch (ClassCastException e) {
766                 if (Logging.SHOW_WARNING && LOG.isLoggable(Level.WARNING)) {
767                     LOG.log(Level.WARNING, "id was not a peerid", e);
768                 }
769             }
770         }
771
772         if (pid == null) {
773             pid = IDFactory.newPeerID(group.getPeerGroupID());
774         }
775
776         if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
777             LOG.fine("newPeerAdvertisement name=" + name + " id=" + pid.toString());
778         }
779
780         try {
781             // Create a pipe advertisement for this pipe.
782             adv = (PeerAdvertisement) AdvertisementFactory.newAdvertisement(PeerAdvertisement.getAdvertisementType());
783
784             adv.setPeerID(pid);
785             adv.setPeerGroupID(group.getPeerGroupID());
786             adv.setName(name);
787             adv.setDescription("Peer Advertisement created for a jxme device");
788         } catch (Exception e) {
789             if (Logging.SHOW_WARNING && LOG.isLoggable(Level.WARNING)) {
790                 LOG.log(Level.WARNING, "newPeerAdvertisement Exception", e);
791             }
792         }
793
794         return adv;
795     }
796
797     private PeerGroupAdvertisement createGroupAdvertisement(String name, String id) {
798         PeerGroupAdvertisement adv;
799
800         PeerGroupID gid = null;
801
802         if (id != null) {
803             try {
804                 ID tempId = IDFactory.fromURI(new URI(id));
805
806                 gid = (PeerGroupID) tempId;
807             } catch (URISyntaxException e) {
808                 if (Logging.SHOW_WARNING && LOG.isLoggable(Level.WARNING)) {
809                     LOG.log(Level.WARNING, "Invalid peergroupId", e);
810                 }
811             } catch (ClassCastException e) {
812                 if (Logging.SHOW_WARNING && LOG.isLoggable(Level.WARNING)) {
813                     LOG.log(Level.WARNING, "id was not a peergroup id", e);
814                 }
815             }
816         }
817
818         if (gid == null) {
819             gid = IDFactory.newPeerGroupID();
820         }
821
822         if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
823             LOG.fine("newPeerGroupAdvertisement name=" + name + " id=" + gid.toString());
824         }
825
826         adv = group.getPeerGroupAdvertisement().clone();
827
828         try {
829             // Create a PeerGroup Advertisement for this pipe.
830             adv = (PeerGroupAdvertisement) AdvertisementFactory.newAdvertisement(PeerGroupAdvertisement.getAdvertisementType());
831             adv.setName(name);
832             adv.setPeerGroupID(gid);
833             adv.setModuleSpecID(PeerGroup.allPurposePeerGroupSpecID);
834             adv.setDescription("PeerGroup Advertisement created for a jxme device");
835             ModuleImplAdvertisement groupImplAdv = group.getAllPurposePeerGroupImplAdvertisement();
836
837             discovery.publish(groupImplAdv);
838             discovery.publish(adv);
839         } catch (Exception e) {
840             if (Logging.SHOW_WARNING && LOG.isLoggable(Level.WARNING)) {
841                 LOG.log(Level.WARNING, "newPeerGroupAdvertisement Exception", e);
842             }
843         }
844
845         return adv;
846     }
847
848     private PipeAdvertisement createPipeAdvertisement(String pipeName, String pipeId, String pipeType) {
849         PipeAdvertisement adv = null;
850
851         if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
852             LOG.fine("newPipeAdvertisement name=" + pipeName + " pipeId=" + pipeId + " pipeType=" + pipeType);
853         }
854
855         if (pipeType == null || pipeType.length() == 0) {
856             pipeType = PipeService.UnicastType;
857         }
858
859         if (pipeId == null) {
860             pipeId = IDFactory.newPipeID(group.getPeerGroupID()).toString();
861         }
862
863         try {
864             // Create a pipe advertisement for this pipe.
865             adv = (PipeAdvertisement) AdvertisementFactory.newAdvertisement(PipeAdvertisement.getAdvertisementType());
866
867             adv.setName(pipeName);
868             adv.setPipeID(IDFactory.fromURI(new URI(pipeId)));
869             adv.setType(pipeType);
870         } catch (Exception e) {
871             if (Logging.SHOW_WARNING && LOG.isLoggable(Level.WARNING)) {
872                 LOG.log(Level.WARNING, "newPipeAdvertisement Exception", e);
873             }
874         }
875
876         return adv;
877     }
878
879     private PipeAdvertisement findPipeAdvertisement(String name, String id, String arg) {
880         String attribute, value;
881
882         if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
883             LOG.fine("findPipeAdvertisement name=" + name + " id=" + id + " arg=" + arg);
884         }
885
886         if (id != null) {
887             attribute = PipeAdvertisement.IdTag;
888             value = id;
889         } else if (name != null) {
890             attribute = PipeAdvertisement.NameTag;
891             value = name;
892         } else {
893             // the id or the name must be specified
894             return null;
895         }
896
897         Enumeration<Advertisement> each;
898
899         try {
900             each = discovery.getLocalAdvertisements(DiscoveryService.ADV, attribute, value);
901         } catch (IOException e) {
902             if (Logging.SHOW_WARNING && LOG.isLoggable(Level.WARNING)) {
903                 LOG.log(Level.WARNING, "IOException in getLocalAdvertisements()", e);
904             }
905             return null;
906         }
907
908         PipeAdvertisement pipeAdv = null;
909
910         while (each.hasMoreElements()) {
911             Advertisement adv = each.nextElement();
912
913             // take the first match
914             if (adv instanceof PipeAdvertisement) {
915                 pipeAdv = (PipeAdvertisement) adv;
916                 if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
917                     LOG.fine("found PipeAdvertisement = " + pipeAdv);
918                 }
919                 break;
920             }
921         }
922
923         return pipeAdv;
924     }
925
926     public synchronized void discoveryEvent(DiscoveryEvent event) {
927
928         if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
929             LOG.fine("discoveryEvent " + event);
930         }
931
932         Requestor requestor = searchRequests.get(event.getQueryID());
933         if (requestor == null) {
934             return;
935         }
936
937         DiscoveryResponseMsg response = event.getResponse();
938         if (response == null) {
939             return;
940         }
941
942         Enumeration<Advertisement> each = response.getAdvertisements();
943         if (each == null || !each.hasMoreElements()) {
944             return;
945         }
946         // we have a response remove it from the LRUCache
947         searchRequests.remove(event.getQueryID());
948         int i = 0;
949
950         while (each.hasMoreElements() && i < requestor.getThreshold()) {
951             try {
952                 requestor.send(each.nextElement(), RESPONSE_RESULT);
953             } catch (Exception e) {
954                 // this should not happen unless a bad result is returned
955                 if (Logging.SHOW_WARNING && LOG.isLoggable(Level.WARNING)) {
956                     LOG.log(Level.WARNING, "Bad result returned by DiscoveryService", e);
957                 }
958             }
959         }
960     }
961
962     /**
963      * {@inheritDoc}
964      */
965     public synchronized void pipeMsgEvent(PipeMsgEvent event) {
966         if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
967             LOG.fine("pipeMsgEvent " + event.getPipeID());
968         }
969
970         String id = event.getPipeID().toString();
971
972         PipeListenerList list = pipeListeners.get(id);
973
974         if (list != null) {
975             Message message = event.getMessage();
976
977             if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
978                 LOG.fine("pipeMsgEvent: start sending to each requestor");
979             }
980             list.send(message.clone(), id);
981             if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
982                 LOG.fine("pipeMsgEvent: end sending to each requestor");
983             }
984         } else {
985             // there are no listeners, close the input pipe
986             ((InputPipe) event.getSource()).close();
987             if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
988                 LOG.fine("close pipe id=" + id);
989             }
990         }
991     }
992
993     /**
994      * {@inheritDoc}
995      */
996     public synchronized void outputPipeEvent(OutputPipeEvent event) {
997
998         if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
999             LOG.fine("outputPipeEvent " + event);
1000         }
1001         PendingPipe p = (PendingPipe) pendingPipes.remove(event.getPipeID());
1002
1003         // No one cares (anylonger). TBD should it be removed then??
1004         if (p == null) {
1005             event.getOutputPipe().close();
1006             return;
1007         }
1008
1009         resolvedPipes.put(event.getPipeID(), event.getOutputPipe());
1010         p.sendPending(event.getOutputPipe());
1011     }
1012
1013     private static String popString(String name, Message message) {
1014         MessageElement el = message.getMessageElement(PROXYNS, name);
1015
1016         if (el != null) {
1017             message.removeMessageElement(el);
1018             return el.toString();
1019         }
1020         return null;
1021     }
1022
1023     static class PipeListenerList {
1024         LinkedList<Requestor> list = new LinkedList<Requestor>();
1025         InputPipe inputPipe = null;
1026         Map<String, PipeListenerList> pipeListeners = null;
1027         String id = null;
1028
1029         PipeListenerList(InputPipe inputPipe, Map<String, PipeListenerList> pipeListeners, String id) {
1030             this.inputPipe = inputPipe;
1031             this.pipeListeners = pipeListeners;
1032             this.id = id;
1033
1034             if (pipeListeners != null) {
1035                 if (Logging.SHOW_CONFIG && LOG.isLoggable(Level.CONFIG)) {
1036                     LOG.config("number of pipeListeners = " + pipeListeners.size());
1037                 }
1038             }
1039         }
1040
1041         void add(Requestor requestor) {
1042             if (Logging.SHOW_INFO && LOG.isLoggable(Level.INFO)) {
1043                 LOG.info("add " + requestor + " from " + toString());
1044             }
1045
1046             if (!list.contains(requestor)) {
1047                 if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
1048                     LOG.fine("requestor add");
1049                 }
1050                 list.add(requestor);
1051             } else {
1052                 if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
1053                     LOG.fine("requestor exits already");
1054                 }
1055             }
1056         }
1057
1058         void remove(Requestor requestor) {
1059             if (Logging.SHOW_INFO && LOG.isLoggable(Level.INFO)) {
1060                 LOG.info("remove " + requestor + " from " + toString());
1061             }
1062
1063             if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
1064                 LOG.fine("removed = " + list.remove(requestor));
1065             }
1066
1067             if (list.size() == 0) {
1068                 // close the pipe and remove from the listenerList
1069                 if (inputPipe != null) {
1070                     inputPipe.close();
1071                 }
1072
1073                 if (id != null && pipeListeners != null) {
1074                     pipeListeners.remove(id);
1075                 }
1076             }
1077         }
1078
1079         int size() {
1080             int size = list.size();
1081             if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
1082                 LOG.fine("size " + size);
1083             }
1084             return size;
1085         }
1086
1087         private static StringMessageElement sme = new StringMessageElement(RESPONSE_TAG, RESPONSE_MESSAGE, null);
1088
1089         void send(Message message, String id) {
1090             LOG.fine("send list.size = " + list.size());
1091
1092             message.addMessageElement(PROXYNS, sme);
1093             message.addMessageElement(PROXYNS, new StringMessageElement(ID_TAG, id, null));
1094
1095             // removed all element that are known to be not needed
1096             Iterator<MessageElement> elements = message.getMessageElements();
1097
1098             while (elements.hasNext()) {
1099                 MessageElement el = elements.next();
1100                 String name = el.getElementName();
1101
1102                 if (name.startsWith("RendezVousPropagate")) {
1103                     if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
1104                         LOG.fine("removeMessageElement " + name);
1105                     }
1106                     elements.remove();
1107                 } else if (name.startsWith("JxtaWireHeader")) {
1108                     if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
1109                         LOG.fine("removeMessageElement " + name);
1110                     }
1111                     elements.remove();
1112                 } else if (name.startsWith("RdvIncarnjxta")) {
1113                     if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
1114                         LOG.fine("removeMessageElement " + name);
1115                     }
1116                     elements.remove();
1117                 } else if (name.startsWith("JxtaEndpointRouter")) {
1118                     if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
1119                         LOG.fine("removeMessageElement " + name);
1120                     }
1121                     elements.remove();
1122                 } else if (name.startsWith("EndpointRouterMsg")) {
1123                     if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
1124                         LOG.fine("removeMessageElement " + name);
1125                     }
1126                     elements.remove();
1127                 } else if (name.startsWith("EndpointHeaderSrcPeer")) {
1128                     if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
1129                         LOG.fine("removeMessageElement " + name);
1130                     }
1131                     elements.remove();
1132                 }
1133             }
1134
1135             Iterator<Requestor> iterator = list.iterator();
1136             try {
1137                 while (iterator.hasNext()) {
1138                     Requestor requestor = iterator.next();
1139
1140                     if (!requestor.send(message.clone())) {
1141                         // could not send to listener, remove them from the list
1142                         remove(requestor);
1143                     }
1144                 }
1145             } catch (Exception ex) {
1146                 if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
1147                     LOG.fine("Error sending" + ex);
1148                 }
1149             }
1150         }
1151
1152         /**
1153          * {@inheritDoc}
1154          */
1155         @Override
1156         public String toString() {
1157             return "PipeListenerList size=" + list.size();
1158         }
1159     }
1160
1161     protected static void logMessage(Message message, Logger log) {
1162         if (!Logging.SHOW_FINER || !log.isLoggable(Level.FINER)) {
1163             return;
1164         }
1165
1166         StringBuilder out = new StringBuilder("\n**************** begin ****************\n");
1167
1168         Message.ElementIterator elements = message.getMessageElements();
1169
1170         while (elements.hasNext()) {
1171             MessageElement element = elements.next();
1172
1173             out.append("[").append(elements.getNamespace()).append(",").append(element.getElementName()).append("]=").append(element.toString()).append(
1174                     "\n");
1175         }
1176
1177         out.append("****************  end  ****************\n");
1178         log.finer(out.toString());
1179     }
1180
1181     /**
1182      * {@inheritDoc}
1183      */
1184     public void purged(CacheEntry ce) {
1185         // A resolved pipe was purged from the cache because we have to
1186         // many pre-resolved pipes hanging around. Close it, because
1187         // it may be holding critical resources that the GC will not be
1188         // sensitive to.
1189         ((OutputPipe) (ce.getValue())).close();
1190     }
1191 }