2 * Copyright (c) 2005-2007 Sun Microsystems, Inc. All rights reserved.
4 * The Sun Project JXTA(TM) Software License
6 * Redistribution and use in source and binary forms, with or without
7 * modification, are permitted provided that the following conditions are met:
9 * 1. Redistributions of source code must retain the above copyright notice,
10 * this list of conditions and the following disclaimer.
12 * 2. Redistributions in binary form must reproduce the above copyright notice,
13 * this list of conditions and the following disclaimer in the documentation
14 * and/or other materials provided with the distribution.
16 * 3. The end-user documentation included with the redistribution, if any, must
17 * include the following acknowledgment: "This product includes software
18 * developed by Sun Microsystems, Inc. for JXTA(TM) technology."
19 * Alternately, this acknowledgment may appear in the software itself, if
20 * and wherever such third-party acknowledgments normally appear.
22 * 4. The names "Sun", "Sun Microsystems, Inc.", "JXTA" and "Project JXTA" must
23 * not be used to endorse or promote products derived from this software
24 * without prior written permission. For written permission, please contact
25 * Project JXTA at http://www.jxta.org.
27 * 5. Products derived from this software may not be called "JXTA", nor may
28 * "JXTA" appear in their name, without prior written permission of Sun.
30 * THIS SOFTWARE IS PROVIDED ``AS IS'' AND ANY EXPRESSED OR IMPLIED WARRANTIES,
31 * INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND
32 * FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL SUN
33 * MICROSYSTEMS OR ITS CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
34 * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
35 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA,
36 * OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
37 * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
38 * NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE,
39 * EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
41 * JXTA is a registered trademark of Sun Microsystems, Inc. in the United
42 * States and other countries.
44 * Please see the license information page at :
45 * <http://www.jxta.org/project/www/license.html> for instructions on use of
46 * the license in source files.
48 * ====================================================================
50 * This software consists of voluntary contributions made by many individuals
51 * on behalf of Project JXTA. For more information on Project JXTA, please see
52 * http://www.jxta.org.
54 * This license is based on the BSD license adopted by the Apache Foundation.
56 package net.jxta.impl.proxy;
58 import net.jxta.discovery.DiscoveryEvent;
59 import net.jxta.discovery.DiscoveryService;
60 import net.jxta.document.Advertisement;
61 import net.jxta.document.AdvertisementFactory;
62 import net.jxta.document.MimeMediaType;
63 import net.jxta.endpoint.EndpointAddress;
64 import net.jxta.endpoint.EndpointListener;
65 import net.jxta.endpoint.EndpointService;
66 import net.jxta.endpoint.Message;
67 import net.jxta.endpoint.MessageElement;
68 import net.jxta.endpoint.StringMessageElement;
69 import net.jxta.exception.PeerGroupException;
70 import net.jxta.id.ID;
71 import net.jxta.id.IDFactory;
72 import net.jxta.impl.util.Cache;
73 import net.jxta.impl.util.CacheEntry;
74 import net.jxta.impl.util.CacheEntryListener;
75 import net.jxta.impl.util.LRUCache;
76 import net.jxta.peer.PeerID;
77 import net.jxta.peergroup.PeerGroup;
78 import net.jxta.peergroup.PeerGroupID;
79 import net.jxta.pipe.InputPipe;
80 import net.jxta.pipe.OutputPipe;
81 import net.jxta.pipe.OutputPipeEvent;
82 import net.jxta.pipe.OutputPipeListener;
83 import net.jxta.pipe.PipeMsgEvent;
84 import net.jxta.pipe.PipeMsgListener;
85 import net.jxta.pipe.PipeService;
86 import net.jxta.protocol.DiscoveryResponseMsg;
87 import net.jxta.protocol.ModuleImplAdvertisement;
88 import net.jxta.protocol.PeerAdvertisement;
89 import net.jxta.protocol.PeerGroupAdvertisement;
90 import net.jxta.protocol.PipeAdvertisement;
91 import net.jxta.service.Service;
92 import java.util.logging.Level;
93 import net.jxta.logging.Logging;
94 import java.util.logging.Logger;
96 import java.io.IOException;
97 import java.io.StringReader;
99 import java.net.URISyntaxException;
100 import java.util.Enumeration;
101 import java.util.HashMap;
102 import java.util.Iterator;
103 import java.util.LinkedList;
104 import java.util.Map;
105 import java.util.TreeMap;
107 import net.jxta.platform.Module;
110 // FIXME: jice@jxta.org - 20020515
111 // All public methods are synchronized.
112 // None of them does anything blocking so that should be about OK, however
113 // first it is not 100% sure, second eventhough non-blocking, some of these
114 // operations could take a significant amount of time, which may be unfair
115 // to other threads that wish to enter for a quick operation.
116 // Making the locking finer-grain would require significant code rework, so
117 // it will have to do for now.
119 public class ProxyService implements Service, EndpointListener, PipeMsgListener, OutputPipeListener, CacheEntryListener {
121 private final static Logger LOG = Logger.getLogger(ProxyService.class.getName());
123 public final static int DEFAULT_THRESHOLD = 2;
124 public final static int DEFAULT_LIFETIME = 1000 * 60 * 30; // 30 minutes
127 * *********************************************************************
128 * Define the proxy message tags
129 * ********************************************************************
131 public static final String REQUEST_TAG = "request";
132 public static final String RESPONSE_TAG = "response";
134 static final String REQUESTID_TAG = "requestId";
135 static final String TYPE_TAG = "type";
136 static final String NAME_TAG = "name";
137 static final String ID_TAG = "id";
138 static final String ARG_TAG = "arg";
139 static final String ATTRIBUTE_TAG = "attr";
140 static final String VALUE_TAG = "value";
141 static final String THRESHOLD_TAG = "threshold";
142 static final String ERROR_MESSAGE_TAG = "error";
143 static final String PROXYNS = "proxy";
146 * *********************************************************************
147 * Define the proxy request types
148 * ********************************************************************
150 public static final String REQUEST_JOIN = "join";
151 public static final String REQUEST_CREATE = "create";
152 public static final String REQUEST_SEARCH = "search";
153 public static final String REQUEST_LISTEN = "listen";
154 public static final String REQUEST_CLOSE = "close";
155 public static final String REQUEST_SEND = "send";
158 * *********************************************************************
159 * Define the proxy response types
160 * ********************************************************************
162 public static final String RESPONSE_SUCCESS = "success";
163 public static final String RESPONSE_ERROR = "error";
164 public static final String RESPONSE_INFO = "info";
165 public static final String RESPONSE_RESULT = "result";
166 public static final String RESPONSE_MESSAGE = "data";
169 * *********************************************************************
170 * Define the proxy type tags
171 * ********************************************************************
173 public static final String TYPE_PEER = "PEER";
174 public static final String TYPE_GROUP = "GROUP";
175 public static final String TYPE_PIPE = "PIPE";
177 private PeerGroup group = null;
178 private ID assignedID = null;
179 private String serviceName = null;
180 private String serviceParameter = null;
181 private EndpointService endpoint = null;
182 private DiscoveryService discovery = null;
183 private PipeService pipe = null;
184 private ModuleImplAdvertisement implAdvertisement = null;
186 private final LRUCache<Integer, Requestor> searchRequests = new LRUCache<Integer, Requestor>(25); // Currently unused
187 private final Map<String, PipeListenerList> pipeListeners = new TreeMap<String, PipeListenerList>();
190 * Pending pipes cost only memory, so it is not a problrm to
191 * wait for the GC to cleanup things. No CacheEntryListener.
193 private final Cache pendingPipes = new Cache(200, null);
194 private Cache resolvedPipes;
196 private static Map<String, PeerGroup> proxiedGroups = new HashMap<String, PeerGroup>(16);
197 private static Map<String, String> passwords = new HashMap<String, String>(16);
202 public void init(PeerGroup group, ID assignedID, Advertisement implAdv) throws PeerGroupException {
204 this.assignedID = assignedID;
205 this.serviceName = assignedID.toString();
206 this.implAdvertisement = (ModuleImplAdvertisement) implAdv;
208 serviceParameter = group.getPeerGroupID().toString();
210 // Resolved pipes cost non-memory resources, so we need to close
211 // them as early as we forget them. Need a CacheEntryListener (this).
212 resolvedPipes = new Cache(200, this);
214 if (Logging.SHOW_CONFIG && LOG.isLoggable(Level.CONFIG)) {
215 StringBuilder configInfo = new StringBuilder("Configuring JXME Proxy Service : " + assignedID);
217 configInfo.append("\n\tImplementation :");
218 configInfo.append("\n\t\tModule Spec ID: ").append(implAdvertisement.getModuleSpecID());
219 configInfo.append("\n\t\tImpl Description : ").append(implAdvertisement.getDescription());
220 configInfo.append("\n\t\tImpl URI : ").append(implAdvertisement.getUri());
221 configInfo.append("\n\t\tImpl Code : ").append(implAdvertisement.getCode());
222 configInfo.append("\n\tGroup Params :");
223 configInfo.append("\n\t\tGroup : ").append(group.getPeerGroupName());
224 configInfo.append("\n\t\tGroup ID : ").append(group.getPeerGroupID());
225 configInfo.append("\n\t\tPeer ID : ").append(group.getPeerID());
226 LOG.config(configInfo.toString());
233 public int startApp(String[] args) {
235 Service needed = group.getEndpointService();
237 if (null == needed) {
238 if (Logging.SHOW_WARNING && LOG.isLoggable(Level.WARNING)) {
239 LOG.warning("Stalled until there is a endpoint service");
242 return START_AGAIN_STALLED;
245 needed = group.getDiscoveryService();
246 if (null == needed) {
247 if (Logging.SHOW_WARNING && LOG.isLoggable(Level.WARNING)) {
248 LOG.warning("Stalled until there is a discovery service");
251 return START_AGAIN_STALLED;
254 needed = group.getPipeService();
255 if (null == needed) {
256 if (Logging.SHOW_WARNING && LOG.isLoggable(Level.WARNING)) {
257 LOG.warning("Stalled until there is a pipe service");
260 return START_AGAIN_STALLED;
263 endpoint = group.getEndpointService();
264 discovery = group.getDiscoveryService();
265 pipe = group.getPipeService();
267 if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
268 LOG.fine("addListener " + serviceName + serviceParameter);
271 endpoint.addIncomingMessageListener(this, serviceName, serviceParameter);
272 if (Logging.SHOW_INFO && LOG.isLoggable(Level.INFO)) {
273 LOG.info("JXME Proxy Service started.");
275 return Module.START_OK;
281 public void stopApp() {
282 if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
283 LOG.fine("removeListener " + serviceName + serviceParameter);
286 endpoint.removeIncomingMessageListener(serviceName, serviceParameter);
288 if (Logging.SHOW_INFO && LOG.isLoggable(Level.INFO)) {
289 LOG.info("JXME Proxy Service stopped.");
296 public ModuleImplAdvertisement getImplAdvertisement() {
297 return implAdvertisement;
303 public ProxyService getInterface() {
310 public synchronized void processIncomingMessage(Message message, EndpointAddress srcAddr, EndpointAddress dstAddr) {
312 logMessage(message, LOG);
314 Requestor requestor = null;
317 // requestor = Requestor.createRequestor(group, message, srcAddr);
318 // Commented out the above line and added the following three lines.
319 // The change allows to reduce the traffice going to a JXME peer
320 // by able to remove ERM completly. As a side effect (severe one)
321 // JXTA Proxy and JXTA relay need to be running on the same peer.
322 // This changes should be pulled out as soon as ERM is implemented
323 // in a more inteligent and effective way so that it doesn't
324 // have any impact on JXME peers.
325 EndpointAddress relayAddr = new EndpointAddress("relay", srcAddr.getProtocolAddress(), srcAddr.getServiceName(),
326 srcAddr.getServiceParameter());
328 requestor = Requestor.createRequestor(group, message, relayAddr, 0);
329 } catch (IOException e) {
330 if (Logging.SHOW_WARNING && LOG.isLoggable(Level.WARNING)) {
331 LOG.log(Level.WARNING, "could not create requestor", e);
335 String request = popString(REQUEST_TAG, message);
337 if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
338 LOG.fine("request = " + request + " requestor " + requestor);
341 if (request != null && requestor != null) {
342 if (REQUEST_JOIN.equals(request)) {
343 handleJoinRequest(requestor, popString(ID_TAG, message), popString(ARG_TAG, message));
344 } else if (REQUEST_CREATE.equals(request)) {
345 handleCreateRequest(requestor, popString(TYPE_TAG, message), popString(NAME_TAG, message),
346 popString(ID_TAG, message), popString(ARG_TAG, message));
347 } else if (REQUEST_SEARCH.equals(request)) {
348 handleSearchRequest(requestor, popString(TYPE_TAG, message), popString(ATTRIBUTE_TAG, message),
349 popString(VALUE_TAG, message), popString(THRESHOLD_TAG, message));
350 } else if ("listen".equals(request)) {
351 handleListenRequest(requestor, popString(ID_TAG, message));
352 } else if ("close".equals(request)) {
353 handleCloseRequest(requestor, popString(ID_TAG, message));
354 } else if ("send".equals(request)) {
355 handleSendRequest(requestor, popString(ID_TAG, message), message);
360 // Right now there's a security hole: passwd come in clear.
361 // And not much is done for stopping clients to use the new group
362 // without being authenticated. We also never get rid of these
363 // additional groups.
364 private synchronized void handleJoinRequest(Requestor requestor, String grpId, String passwd) {
366 PeerGroup g = proxiedGroups.get(grpId);
369 if (g == this.group) {
370 requestor.notifyError("Same group");
371 } else if (!passwords.get(grpId).equals(passwd)) {
372 requestor.notifyError("Incorrect password");
374 requestor.notifySuccess();
380 g = group.newGroup((PeerGroupID) IDFactory.fromURI(new URI(grpId)));
381 g.getRendezVousService().startRendezVous();
382 } catch (Exception ge) {
383 requestor.notifyError(ge.getMessage());
387 // XXX check membership here. (would work only for single passwd grps)
388 // For now, assume join is always welcome.
390 // So far so good. Try to start a proxy in that grp.
392 // Fork this proxy into the new grp.
393 ProxyService proxyService = new ProxyService();
394 proxyService.init(g, assignedID, implAdvertisement);
395 proxyService.startApp(null);
396 } catch (Exception e) {
397 requestor.notifyError(e.getMessage());
400 // set non-deft passwd
401 passwords.put(grpId, passwd);
402 proxiedGroups.put(grpId, g);
403 requestor.notifySuccess();
406 private void handleCreateRequest(Requestor requestor, String type, String name, String id, String arg) {
408 if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
409 LOG.fine("handleCreateRequest type=" + type + " name=" + name + " id=" + id + " arg=" + arg);
413 name = ""; // default name
416 if (TYPE_PEER.equals(type)) {
417 PeerAdvertisement adv = createPeerAdvertisement(name, id);
421 discovery.publish(adv);
422 } catch (Exception e) {
423 if (Logging.SHOW_WARNING && LOG.isLoggable(Level.WARNING)) {
424 LOG.log(Level.WARNING, "Could not publish peer advertisement", e);
427 requestor.send(adv, RESPONSE_SUCCESS);
429 requestor.notifyError("could not create advertisement");
431 } else if (TYPE_GROUP.equals(type)) {
432 PeerGroupAdvertisement adv = createGroupAdvertisement(name, id);
435 requestor.send(adv, RESPONSE_SUCCESS);
437 requestor.notifyError("could not create advertisement");
439 } else if (TYPE_PIPE.equals(type)) {
441 arg = PipeService.UnicastType; // default pipe type
444 PipeAdvertisement adv = createPipeAdvertisement(name, id, arg);
448 discovery.publish(adv);
449 } catch (Exception e) {
450 if (Logging.SHOW_WARNING && LOG.isLoggable(Level.WARNING)) {
451 LOG.log(Level.WARNING, "Could not publish pipe advertisement", e);
455 requestor.send(adv, RESPONSE_SUCCESS);
457 requestor.notifyError("could not create advertisement");
460 requestor.notifyError("unsupported type");
464 private void handleSearchRequest(Requestor requestor, String type, String attribute, String value, String threshold) {
466 if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
467 LOG.fine("handleSearchRequest type=" + type + " attribute=" + attribute + " value=" + value + " threshold=" + threshold);
471 int thr = DEFAULT_THRESHOLD;
473 thr = Integer.parseInt(threshold);
474 } catch (NumberFormatException nex) {
475 if (Logging.SHOW_WARNING && LOG.isLoggable(Level.WARNING)) {
476 LOG.warning("handleSearchRequest failed to parse threshold " + threshold + ", using default " + DEFAULT_THRESHOLD);
479 requestor.setThreshold(thr);
481 if (TYPE_PEER.equals(type)) {
482 discoveryType = DiscoveryService.PEER;
483 } else if (TYPE_GROUP.equals(type)) {
484 discoveryType = DiscoveryService.GROUP;
486 discoveryType = DiscoveryService.ADV;
489 Enumeration<Advertisement> each = null;
492 each = discovery.getLocalAdvertisements(discoveryType, attribute, value);
493 } catch (IOException e) {
494 requestor.notifyError("could not search locally");
498 while (each.hasMoreElements() && i < thr) {
499 Advertisement adv = each.nextElement();
501 // notify the requestor of the result
502 // FIXME this can be optimized by sending all adv's in a
504 requestor.send(adv, RESPONSE_RESULT);
509 int queryId = discovery.getRemoteAdvertisements(null, discoveryType, attribute, value, thr);
511 // register the query
512 searchRequests.put(queryId, requestor);
516 * Finds a JXTA Pipe and starts listening to it.
518 * @param requestor the peer sending listen request.
519 * @param id the id of the Pipe.
521 private void handleListenRequest(Requestor requestor, String id) {
522 if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
523 LOG.fine("handleListenRequest id=" + id);
527 requestor.notifyError("Pipe ID not specified");
531 PipeAdvertisement pipeAdv = findPipeAdvertisement(null, id, null);
533 if (pipeAdv == null) {
534 requestor.notifyError("Pipe Advertisement not found");
538 String pipeId = pipeAdv.getPipeID().toString();
540 if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
541 LOG.fine("listen to pipe name=" + pipeAdv.getName() + " id=" + pipeAdv.getPipeID() + " type=" + pipeAdv.getType());
544 // check to see if the input pipe already exist
545 PipeListenerList list = pipeListeners.get(pipeId);
548 if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
549 LOG.fine("first listener, create input pipe");
552 // create an input pipe
554 list = new PipeListenerList(pipe.createInputPipe(pipeAdv, this), pipeListeners, pipeId);
555 } catch (IOException e) {
556 if (Logging.SHOW_WARNING && LOG.isLoggable(Level.WARNING)) {
557 LOG.log(Level.WARNING, "could not listen to pipe", e);
559 requestor.notifyError("could not listen to pipe");
562 pipeListeners.put(pipeId, list);
565 // add requestor to list
568 if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
569 LOG.fine("add requestor=" + requestor + " id=" + pipeId + " list=" + list);
570 LOG.fine("publish PipeAdvertisement");
572 // advertise the pipe locally
574 discovery.publish(pipeAdv);
575 } catch (IOException e) {
576 if (Logging.SHOW_WARNING && LOG.isLoggable(Level.WARNING)) {
577 LOG.log(Level.WARNING, "Could not publish pipe advertisement", e);
581 if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
582 LOG.fine("done with listen request");
585 // notify requestor of success
586 requestor.notifySuccess();
589 private void handleCloseRequest(Requestor requestor, String id) {
590 if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
591 LOG.fine("handleCloseRequest id=" + id);
594 PipeListenerList list = pipeListeners.get(id);
596 if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
597 LOG.fine("handleCloseRequest list = " + list);
600 list.remove(requestor);
601 if (list.size() == 0) {
602 pipeListeners.remove(id);
606 // notify requestor of success
607 requestor.notifySuccess();
610 // Send the given message to the given pipe.
611 private void sendToPipe(Requestor req, Message mess, OutputPipe out) {
614 if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
615 LOG.fine("output pipe send end");
617 // notify requestor of success
619 } catch (IOException e) {
620 req.notifyError("could not send to pipe");
621 if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
622 LOG.log(Level.FINE, "could not send to pipe", e);
627 class ClientMessage {
628 private Requestor requestor;
629 private Message message;
631 public ClientMessage(Requestor req, Message mess) {
636 // Send this (pending) message
637 public void send(OutputPipe out) {
638 sendToPipe(requestor, message, out);
645 private ClientMessage pending;
647 public PendingPipe() {
651 // Just got resolved ! Will send the pending message(s).
652 public void sendPending(OutputPipe out) {
657 // Enqueue a new pending message.
658 // (for now we only enqueue 1; others get trashed)
659 public void enqueue(Requestor req, Message mess) {
660 if (pending != null) {
663 pending = new ClientMessage(req, mess);
667 private void handleSendRequest(Requestor requestor, String id, Message message) {
669 if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
670 LOG.fine("handleSendRequest id=" + id);
673 PipeAdvertisement pipeAdv = findPipeAdvertisement(null, id, null);
675 if (pipeAdv == null) {
676 requestor.notifyError("Could not find pipe");
680 String pipeId = pipeAdv.getPipeID().toString();
682 if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
684 "send to pipe name=" + pipeAdv.getName() + " id=" + pipeAdv.getPipeID().toString() + " arg="
685 + pipeAdv.getType());
688 // check if there are local listeners
690 PipeListenerList list = pipeListeners.get(pipeId);
691 if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
692 LOG.fine("local listener list " + list);
695 if (list != null && PipeService.UnicastType.equals(pipeAdv.getType())) {
697 if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
698 LOG.fine("start sending to each requestor");
701 list.send(message, pipeId);
702 if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
703 LOG.fine("end sending to each requestor");
705 // notify requestor of success
706 requestor.notifySuccess();
710 // NOTE: This part is NOT exercised by the load test because all
711 // clients are local. To exercise this part, comment out the
712 // optimization above.
714 // This is not a unicast pipe with at least one local listener
715 // so we need to fingure out where the message should go.
716 // This may take a while and has to be done asynchronously...
717 // Carefull that the resolution can occur synchronously by this
718 // very thread, and java lock will not prevent re-entry.
720 if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
721 LOG.fine("output pipe creation begin");
724 // Look for the pipe in the resolved list. If not found
725 // look in the pending list or add it there.
726 OutputPipe out = (OutputPipe) resolvedPipes.get(pipeId);
729 sendToPipe(requestor, message, out);
732 PendingPipe p = (PendingPipe) pendingPipes.get(pipeId);
735 p.enqueue(requestor, message);
740 p = new PendingPipe();
741 p.enqueue(requestor, message);
742 pendingPipes.put(pipeId, p);
743 pipe.createOutputPipe(pipeAdv, this);
744 } catch (IOException e) {
745 pendingPipes.remove(pipeId);
746 requestor.notifyError("could not create output pipe");
750 // TBD: DO WE NEED THIS FUNCTIONALITY FOR JXME?
751 private PeerAdvertisement createPeerAdvertisement(String name, String id) {
752 PeerAdvertisement adv = null;
758 ID tempId = IDFactory.fromURI(new URI(id));
760 pid = (PeerID) tempId;
761 } catch (URISyntaxException e) {
762 if (Logging.SHOW_WARNING && LOG.isLoggable(Level.WARNING)) {
763 LOG.log(Level.WARNING, "Could not parse peerId from url", e);
765 } catch (ClassCastException e) {
766 if (Logging.SHOW_WARNING && LOG.isLoggable(Level.WARNING)) {
767 LOG.log(Level.WARNING, "id was not a peerid", e);
773 pid = IDFactory.newPeerID(group.getPeerGroupID());
776 if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
777 LOG.fine("newPeerAdvertisement name=" + name + " id=" + pid.toString());
781 // Create a pipe advertisement for this pipe.
782 adv = (PeerAdvertisement) AdvertisementFactory.newAdvertisement(PeerAdvertisement.getAdvertisementType());
785 adv.setPeerGroupID(group.getPeerGroupID());
787 adv.setDescription("Peer Advertisement created for a jxme device");
788 } catch (Exception e) {
789 if (Logging.SHOW_WARNING && LOG.isLoggable(Level.WARNING)) {
790 LOG.log(Level.WARNING, "newPeerAdvertisement Exception", e);
797 private PeerGroupAdvertisement createGroupAdvertisement(String name, String id) {
798 PeerGroupAdvertisement adv;
800 PeerGroupID gid = null;
804 ID tempId = IDFactory.fromURI(new URI(id));
806 gid = (PeerGroupID) tempId;
807 } catch (URISyntaxException e) {
808 if (Logging.SHOW_WARNING && LOG.isLoggable(Level.WARNING)) {
809 LOG.log(Level.WARNING, "Invalid peergroupId", e);
811 } catch (ClassCastException e) {
812 if (Logging.SHOW_WARNING && LOG.isLoggable(Level.WARNING)) {
813 LOG.log(Level.WARNING, "id was not a peergroup id", e);
819 gid = IDFactory.newPeerGroupID();
822 if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
823 LOG.fine("newPeerGroupAdvertisement name=" + name + " id=" + gid.toString());
826 adv = group.getPeerGroupAdvertisement().clone();
829 // Create a PeerGroup Advertisement for this pipe.
830 adv = (PeerGroupAdvertisement) AdvertisementFactory.newAdvertisement(PeerGroupAdvertisement.getAdvertisementType());
832 adv.setPeerGroupID(gid);
833 adv.setModuleSpecID(PeerGroup.allPurposePeerGroupSpecID);
834 adv.setDescription("PeerGroup Advertisement created for a jxme device");
835 ModuleImplAdvertisement groupImplAdv = group.getAllPurposePeerGroupImplAdvertisement();
837 discovery.publish(groupImplAdv);
838 discovery.publish(adv);
839 } catch (Exception e) {
840 if (Logging.SHOW_WARNING && LOG.isLoggable(Level.WARNING)) {
841 LOG.log(Level.WARNING, "newPeerGroupAdvertisement Exception", e);
848 private PipeAdvertisement createPipeAdvertisement(String pipeName, String pipeId, String pipeType) {
849 PipeAdvertisement adv = null;
851 if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
852 LOG.fine("newPipeAdvertisement name=" + pipeName + " pipeId=" + pipeId + " pipeType=" + pipeType);
855 if (pipeType == null || pipeType.length() == 0) {
856 pipeType = PipeService.UnicastType;
859 if (pipeId == null) {
860 pipeId = IDFactory.newPipeID(group.getPeerGroupID()).toString();
864 // Create a pipe advertisement for this pipe.
865 adv = (PipeAdvertisement) AdvertisementFactory.newAdvertisement(PipeAdvertisement.getAdvertisementType());
867 adv.setName(pipeName);
868 adv.setPipeID(IDFactory.fromURI(new URI(pipeId)));
869 adv.setType(pipeType);
870 } catch (Exception e) {
871 if (Logging.SHOW_WARNING && LOG.isLoggable(Level.WARNING)) {
872 LOG.log(Level.WARNING, "newPipeAdvertisement Exception", e);
879 private PipeAdvertisement findPipeAdvertisement(String name, String id, String arg) {
880 String attribute, value;
882 if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
883 LOG.fine("findPipeAdvertisement name=" + name + " id=" + id + " arg=" + arg);
887 attribute = PipeAdvertisement.IdTag;
889 } else if (name != null) {
890 attribute = PipeAdvertisement.NameTag;
893 // the id or the name must be specified
897 Enumeration<Advertisement> each;
900 each = discovery.getLocalAdvertisements(DiscoveryService.ADV, attribute, value);
901 } catch (IOException e) {
902 if (Logging.SHOW_WARNING && LOG.isLoggable(Level.WARNING)) {
903 LOG.log(Level.WARNING, "IOException in getLocalAdvertisements()", e);
908 PipeAdvertisement pipeAdv = null;
910 while (each.hasMoreElements()) {
911 Advertisement adv = each.nextElement();
913 // take the first match
914 if (adv instanceof PipeAdvertisement) {
915 pipeAdv = (PipeAdvertisement) adv;
916 if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
917 LOG.fine("found PipeAdvertisement = " + pipeAdv);
926 public synchronized void discoveryEvent(DiscoveryEvent event) {
928 if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
929 LOG.fine("discoveryEvent " + event);
932 Requestor requestor = searchRequests.get(event.getQueryID());
933 if (requestor == null) {
937 DiscoveryResponseMsg response = event.getResponse();
938 if (response == null) {
942 Enumeration<Advertisement> each = response.getAdvertisements();
943 if (each == null || !each.hasMoreElements()) {
946 // we have a response remove it from the LRUCache
947 searchRequests.remove(event.getQueryID());
950 while (each.hasMoreElements() && i < requestor.getThreshold()) {
952 requestor.send(each.nextElement(), RESPONSE_RESULT);
953 } catch (Exception e) {
954 // this should not happen unless a bad result is returned
955 if (Logging.SHOW_WARNING && LOG.isLoggable(Level.WARNING)) {
956 LOG.log(Level.WARNING, "Bad result returned by DiscoveryService", e);
965 public synchronized void pipeMsgEvent(PipeMsgEvent event) {
966 if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
967 LOG.fine("pipeMsgEvent " + event.getPipeID());
970 String id = event.getPipeID().toString();
972 PipeListenerList list = pipeListeners.get(id);
975 Message message = event.getMessage();
977 if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
978 LOG.fine("pipeMsgEvent: start sending to each requestor");
980 list.send(message.clone(), id);
981 if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
982 LOG.fine("pipeMsgEvent: end sending to each requestor");
985 // there are no listeners, close the input pipe
986 ((InputPipe) event.getSource()).close();
987 if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
988 LOG.fine("close pipe id=" + id);
996 public synchronized void outputPipeEvent(OutputPipeEvent event) {
998 if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
999 LOG.fine("outputPipeEvent " + event);
1001 PendingPipe p = (PendingPipe) pendingPipes.remove(event.getPipeID());
1003 // No one cares (anylonger). TBD should it be removed then??
1005 event.getOutputPipe().close();
1009 resolvedPipes.put(event.getPipeID(), event.getOutputPipe());
1010 p.sendPending(event.getOutputPipe());
1013 private static String popString(String name, Message message) {
1014 MessageElement el = message.getMessageElement(PROXYNS, name);
1017 message.removeMessageElement(el);
1018 return el.toString();
1023 static class PipeListenerList {
1024 LinkedList<Requestor> list = new LinkedList<Requestor>();
1025 InputPipe inputPipe = null;
1026 Map<String, PipeListenerList> pipeListeners = null;
1029 PipeListenerList(InputPipe inputPipe, Map<String, PipeListenerList> pipeListeners, String id) {
1030 this.inputPipe = inputPipe;
1031 this.pipeListeners = pipeListeners;
1034 if (pipeListeners != null) {
1035 if (Logging.SHOW_CONFIG && LOG.isLoggable(Level.CONFIG)) {
1036 LOG.config("number of pipeListeners = " + pipeListeners.size());
1041 void add(Requestor requestor) {
1042 if (Logging.SHOW_INFO && LOG.isLoggable(Level.INFO)) {
1043 LOG.info("add " + requestor + " from " + toString());
1046 if (!list.contains(requestor)) {
1047 if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
1048 LOG.fine("requestor add");
1050 list.add(requestor);
1052 if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
1053 LOG.fine("requestor exits already");
1058 void remove(Requestor requestor) {
1059 if (Logging.SHOW_INFO && LOG.isLoggable(Level.INFO)) {
1060 LOG.info("remove " + requestor + " from " + toString());
1063 if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
1064 LOG.fine("removed = " + list.remove(requestor));
1067 if (list.size() == 0) {
1068 // close the pipe and remove from the listenerList
1069 if (inputPipe != null) {
1073 if (id != null && pipeListeners != null) {
1074 pipeListeners.remove(id);
1080 int size = list.size();
1081 if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
1082 LOG.fine("size " + size);
1087 private static StringMessageElement sme = new StringMessageElement(RESPONSE_TAG, RESPONSE_MESSAGE, null);
1089 void send(Message message, String id) {
1090 LOG.fine("send list.size = " + list.size());
1092 message.addMessageElement(PROXYNS, sme);
1093 message.addMessageElement(PROXYNS, new StringMessageElement(ID_TAG, id, null));
1095 // removed all element that are known to be not needed
1096 Iterator<MessageElement> elements = message.getMessageElements();
1098 while (elements.hasNext()) {
1099 MessageElement el = elements.next();
1100 String name = el.getElementName();
1102 if (name.startsWith("RendezVousPropagate")) {
1103 if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
1104 LOG.fine("removeMessageElement " + name);
1107 } else if (name.startsWith("JxtaWireHeader")) {
1108 if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
1109 LOG.fine("removeMessageElement " + name);
1112 } else if (name.startsWith("RdvIncarnjxta")) {
1113 if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
1114 LOG.fine("removeMessageElement " + name);
1117 } else if (name.startsWith("JxtaEndpointRouter")) {
1118 if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
1119 LOG.fine("removeMessageElement " + name);
1122 } else if (name.startsWith("EndpointRouterMsg")) {
1123 if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
1124 LOG.fine("removeMessageElement " + name);
1127 } else if (name.startsWith("EndpointHeaderSrcPeer")) {
1128 if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
1129 LOG.fine("removeMessageElement " + name);
1135 Iterator<Requestor> iterator = list.iterator();
1137 while (iterator.hasNext()) {
1138 Requestor requestor = iterator.next();
1140 if (!requestor.send(message.clone())) {
1141 // could not send to listener, remove them from the list
1145 } catch (Exception ex) {
1146 if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
1147 LOG.fine("Error sending" + ex);
1156 public String toString() {
1157 return "PipeListenerList size=" + list.size();
1161 protected static void logMessage(Message message, Logger log) {
1162 if (!Logging.SHOW_FINER || !log.isLoggable(Level.FINER)) {
1166 StringBuilder out = new StringBuilder("\n**************** begin ****************\n");
1168 Message.ElementIterator elements = message.getMessageElements();
1170 while (elements.hasNext()) {
1171 MessageElement element = elements.next();
1173 out.append("[").append(elements.getNamespace()).append(",").append(element.getElementName()).append("]=").append(element.toString()).append(
1177 out.append("**************** end ****************\n");
1178 log.finer(out.toString());
1184 public void purged(CacheEntry ce) {
1185 // A resolved pipe was purged from the cache because we have to
1186 // many pre-resolved pipes hanging around. Close it, because
1187 // it may be holding critical resources that the GC will not be
1189 ((OutputPipe) (ce.getValue())).close();