2 * Copyright (c) 2001-2007 Sun Microsystems, Inc. All rights reserved.
4 * The Sun Project JXTA(TM) Software License
6 * Redistribution and use in source and binary forms, with or without
7 * modification, are permitted provided that the following conditions are met:
9 * 1. Redistributions of source code must retain the above copyright notice,
10 * this list of conditions and the following disclaimer.
12 * 2. Redistributions in binary form must reproduce the above copyright notice,
13 * this list of conditions and the following disclaimer in the documentation
14 * and/or other materials provided with the distribution.
16 * 3. The end-user documentation included with the redistribution, if any, must
17 * include the following acknowledgment: "This product includes software
18 * developed by Sun Microsystems, Inc. for JXTA(TM) technology."
19 * Alternately, this acknowledgment may appear in the software itself, if
20 * and wherever such third-party acknowledgments normally appear.
22 * 4. The names "Sun", "Sun Microsystems, Inc.", "JXTA" and "Project JXTA" must
23 * not be used to endorse or promote products derived from this software
24 * without prior written permission. For written permission, please contact
25 * Project JXTA at http://www.jxta.org.
27 * 5. Products derived from this software may not be called "JXTA", nor may
28 * "JXTA" appear in their name, without prior written permission of Sun.
30 * THIS SOFTWARE IS PROVIDED ``AS IS'' AND ANY EXPRESSED OR IMPLIED WARRANTIES,
31 * INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND
32 * FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL SUN
33 * MICROSYSTEMS OR ITS CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
34 * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
35 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA,
36 * OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
37 * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
38 * NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE,
39 * EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
41 * JXTA is a registered trademark of Sun Microsystems, Inc. in the United
42 * States and other countries.
44 * Please see the license information page at :
45 * <http://www.jxta.org/project/www/license.html> for instructions on use of
46 * the license in source files.
48 * ====================================================================
50 * This software consists of voluntary contributions made by many individuals
51 * on behalf of Project JXTA. For more information on Project JXTA, please see
52 * http://www.jxta.org.
54 * This license is based on the BSD license adopted by the Apache Foundation.
56 package net.jxta.impl.peergroup;
58 import java.util.concurrent.BlockingQueue;
59 import java.util.concurrent.Executor;
60 import java.util.concurrent.ThreadPoolExecutor;
61 import net.jxta.access.AccessService;
62 import net.jxta.discovery.DiscoveryService;
63 import net.jxta.document.Advertisement;
64 import net.jxta.document.AdvertisementFactory;
65 import net.jxta.document.Element;
66 import net.jxta.document.XMLElement;
67 import net.jxta.endpoint.EndpointService;
68 import net.jxta.exception.PeerGroupException;
69 import net.jxta.exception.ProtocolNotSupportedException;
70 import net.jxta.exception.ServiceNotFoundException;
71 import net.jxta.id.ID;
72 import net.jxta.id.IDFactory;
73 import net.jxta.impl.loader.RefJxtaLoader;
74 import net.jxta.impl.protocol.PSEConfigAdv;
75 import net.jxta.impl.protocol.PlatformConfig;
76 import net.jxta.impl.util.TimeUtils;
77 import net.jxta.logging.Logging;
78 import net.jxta.membership.MembershipService;
79 import net.jxta.peer.PeerID;
80 import net.jxta.peer.PeerInfoService;
81 import net.jxta.peergroup.PeerGroup;
82 import net.jxta.peergroup.PeerGroupID;
83 import net.jxta.pipe.PipeService;
84 import net.jxta.platform.JxtaLoader;
85 import net.jxta.platform.Module;
86 import net.jxta.platform.ModuleClassID;
87 import net.jxta.platform.ModuleSpecID;
88 import net.jxta.protocol.ConfigParams;
89 import net.jxta.protocol.ModuleImplAdvertisement;
90 import net.jxta.protocol.PeerAdvertisement;
91 import net.jxta.protocol.PeerGroupAdvertisement;
92 import net.jxta.rendezvous.RendezVousService;
93 import net.jxta.resolver.ResolverService;
94 import net.jxta.service.Service;
96 import java.io.IOException;
97 import java.lang.reflect.Method;
98 import java.lang.reflect.UndeclaredThrowableException;
101 import java.security.cert.Certificate;
102 import java.util.ArrayList;
103 import java.util.Collection;
104 import java.util.Collections;
105 import java.util.Enumeration;
106 import java.util.HashMap;
107 import java.util.Hashtable;
108 import java.util.Iterator;
109 import java.util.List;
110 import java.util.Map;
111 import java.util.NoSuchElementException;
112 import java.util.concurrent.ArrayBlockingQueue;
113 import java.util.concurrent.RejectedExecutionException;
114 import java.util.concurrent.RejectedExecutionHandler;
115 import java.util.concurrent.ScheduledExecutorService;
116 import java.util.concurrent.ScheduledThreadPoolExecutor;
117 import java.util.concurrent.ThreadFactory;
118 import java.util.concurrent.TimeUnit;
119 import java.util.concurrent.atomic.AtomicInteger;
120 import java.util.logging.Level;
121 import java.util.logging.Logger;
124 * Provides common services for most peer group implementations.
126 public abstract class GenericPeerGroup implements PeerGroup {
131 private final static transient Logger LOG = Logger.getLogger(GenericPeerGroup.class.getName());
134 * Holder for configuration parameters for groups in the process of being created.
136 private final static Map<ID, ConfigParams> group_configs = Collections.synchronizedMap(new HashMap<ID, ConfigParams>());
139 * The loader - use the getter and setter for modifying the ClassLoader for
140 * a security manager.
142 * This should eventually be group scoped rather than implementation
143 * scoped. We are currently allowing classes to be loaded into contexts in which
144 * they should not be known.
146 private final static JxtaLoader loader = new RefJxtaLoader(new URL[0], new CompatibilityEquater() {
// The loader is created with an empty URL array; compatibility checks are
// delegated to StdPeerGroup.isCompatible().
147 public boolean compatible(Element test) {
148 return StdPeerGroup.isCompatible(test);
153 * Shortcuts to well known services.
155 private EndpointService endpoint;
156 private ResolverService resolver;
157 private DiscoveryService discovery;
158 private PipeService pipe;
159 private MembershipService membership;
160 private RendezVousService rendezvous;
161 private PeerInfoService peerinfo;
162 private AccessService access;
165 * This peer's advertisement in this group.
167 private final PeerAdvertisement peerAdvertisement;
170 * This group's advertisement.
172 private PeerGroupAdvertisement peerGroupAdvertisement = null;
175 * This group's implAdvertisement.
177 protected ModuleImplAdvertisement implAdvertisement = null;
180 * This peer's config advertisement.
182 protected ConfigParams configAdvertisement = null;
185 * This service implements a group but, being a Service, it runs inside of
186 * some group. That's its home group.
188 * Exception: The platform peer group does not have a parent group. It
189 * has to be entirely self sufficient.
191 protected PeerGroup parentGroup = null;
194 * The location of our store
196 protected URI jxtaHome = null;
199 * The services that do the real work of the Peer Group.
201 private final Map<ModuleClassID, Service> services = new HashMap<ModuleClassID, Service>();
204 * {@code true} when we have decided to stop this group.
206 private volatile boolean stopping = false;
209 * {@code true} when the PG adv has been published.
211 private boolean published = false; // assume it hasn't
214 * Counts the number of times an interface to this group has been given out.
215 * This is decremented every time an interface object is GCed or
216 * its owner calls unref().
218 * <p/>When it reaches zero, it is time to tear down the group instance,
219 * no matter what the GC thinks. There are threads that need to be stopped
220 * before the group instance object ever becomes un-reachable.
222 private int masterRefCount = 0;
225 * Is {@code true} when at least one interface object has been generated AFTER
226 * initComplete has become true. If true, the group stops when its ref
227 * count reaches zero.
229 private boolean stopWhenUnreferenced = false;
232 * Is set to {@code true} when {@code init()} is completed enough that it
233 * makes sense to perform ref-counting.
235 protected volatile boolean initComplete = false;
238 * The thread group in which threads created by this group or services of
239 * this group should live. The thread group is used primarily for debugging
240 * and classification purposes--we don't try to use any of the fancy (and
241 * mostly useless) ThreadGroup features.
243 private ThreadGroup threadGroup = null;
246 * The minimum number of Threads our Executor will reserve. Once started
247 * these Threads will remain.
249 * todo convert these hardcoded settings into group config params.
251 private final int COREPOOLSIZE = 5;
255 * The number of seconds that Threads above {@code COREPOOLSIZE} will
256 * remain idle before terminating.
258 * todo convert these hardcoded settings into group config params.
260 private final long KEEPALIVETIME = 15;
264 * The intended upper bound on the number of threads we will allow our
265 * Executor to create. We will allow the pool to grow to twice this size if
266 * we run out of threads.
268 * todo convert these hardcoded settings into group config params.
270 private final int MAXPOOLSIZE = 100;
274 * Queue for tasks waiting to be run by our {@code Executor}.
276 private BlockingQueue<Runnable> taskQueue;
280 * The PeerGroup ThreadPool
282 private ThreadPoolExecutor threadPool;
285 * The PeerGroup ScheduledExecutor
287 private ScheduledThreadPoolExecutor scheduledExecutor;
293 * We do not want to count on the invoker to properly unreference the group
294 * object that we return; this call is often used in a loop and it is silly
295 * to increment and decrement ref-counts for references that are sure to
296 * live shorter than the referee.
298 * On the other hand it is dangerous for us to share our reference object to
299 * the parent group. That's where weak interface objects come in handy. We
300 * can safely make one and give it away.
302 public PeerGroup getParentGroup() {
// A group without a parent is handled by the null check below.
303 if (parentGroup == null) {
// Hand out the parent's weak interface rather than our own reference to it.
306 return parentGroup.getWeakInterface();
312 public URI getStoreHome() {
317 * Sets the root location for the store to be used by this peergroup.
319 * This should be set early in the peer group's life and then never
322 * @param newHome The new store location.
324 protected void setStoreHome(URI newHome) {
331 public static JxtaLoader getJxtaLoader() {
335 public GenericPeerGroup() {
336 // Start building our peer adv.
// The advertisement is created through AdvertisementFactory from the
// PeerAdvertisement advertisement type and stored in the final field.
337 peerAdvertisement = (PeerAdvertisement)
338 AdvertisementFactory.newAdvertisement(PeerAdvertisement.getAdvertisementType());
345 public boolean equals(Object target) {
// Only other PeerGroup instances are candidates for equality.
346 if (!(target instanceof PeerGroup)) {
350 PeerGroup targetAsPeerGroup = (PeerGroup) target;
352 // Parent groups must be both null or both non-null.
353 if ((null == parentGroup) && (null != targetAsPeerGroup.getParentGroup())) {
357 if ((null != parentGroup) && (null == targetAsPeerGroup.getParentGroup())) {
// When this group has a parent, it must equal the target's parent.
361 if ((null != parentGroup) && !parentGroup.equals(targetAsPeerGroup.getParentGroup())) {
365 // Finally, the peer group IDs must be equal.
366 return getPeerGroupID().equals(targetAsPeerGroup.getPeerGroupID());
373 public int hashCode() {
374 // Before init we must fail: hashing needs the peer advertisement and the peer group ID.
375 if ((null == peerAdvertisement) || (null == getPeerGroupID())) {
376 throw new IllegalStateException("PeerGroup not sufficiently initialized");
379 // XXX 20050907 bondolo including parentGroup would improve the hash.
// The hash is derived from the peer group ID only.
380 return getPeerGroupID().hashCode();
386 * An implementation suitable for debugging. <b>Don't try to parse
387 * this string!</b> All of the information is available from other sources.
390 public String toString() {
// Without a peer group ID, fall back to the superclass's toString().
391 if (null == getPeerGroupID()) {
392 return super.toString();
395 StringBuilder result = new StringBuilder();
// Group ID first, then the quoted group name when the group advertisement has one.
397 result.append(getPeerGroupID().toString());
398 String peerGroupName = peerGroupAdvertisement.getName();
400 if (null != peerGroupName) {
401 result.append(" \"");
402 result.append(peerGroupName);
// The current master reference count is part of the output.
407 result.append(masterRefCount);
// The parent's own string form is appended after a " / " separator.
410 if (null != parentGroup) {
411 result.append(" / ");
412 result.append(parentGroup.toString());
415 return result.toString();
421 public ThreadGroup getHomeThreadGroup() {
426 * Discover advertisements.
428 * @param discovery The discovery service to use.
429 * @param type the Discovery advertisement type.
430 * @param attr The attribute to search for or {@code null}.
431 * @param value The attribute value to match or {@code null}.
432 * @param seconds The number of seconds to search.
433 * @param thisClass The Advertisement class which the advertisement must
435 * @return a Collection of advertisements
437 private Collection<Advertisement> discoverSome(DiscoveryService discovery, int type, String attr, String value, int seconds, Class thisClass) {
// Deadline for the whole search, computed by TimeUtils.toAbsoluteTimeMillis()
// from the requested number of seconds.
438 long discoverUntil = TimeUtils.toAbsoluteTimeMillis(seconds * TimeUtils.ASECOND);
439 long lastRemoteAt = 0; // no previous remote discovery made.
441 List<Advertisement> results = new ArrayList<Advertisement>();
// Ask the supplied discovery service for locally available advertisements.
445 Enumeration<Advertisement> res = discovery.getLocalAdvertisements(type, attr, value);
447 while (res.hasMoreElements()) {
448 Advertisement a = res.nextElement();
// Only advertisements that are instances of the requested class are of interest.
450 if (thisClass.isInstance(a)) {
455 if (!results.isEmpty()) {
// Issue a remote query only when more than 30 * TimeUtils.ASECOND has
// elapsed since the previous one.
459 if (TimeUtils.toRelativeTimeMillis(TimeUtils.timeNow(), lastRemoteAt) > (30 * TimeUtils.ASECOND)) {
460 discovery.getRemoteAdvertisements(null, type, attr, value, 20);
461 lastRemoteAt = TimeUtils.timeNow();
464 // snooze waiting for responses to come in.
// Keep looping until the deadline computed above has passed.
466 } while (TimeUtils.timeNow() < discoverUntil);
467 } catch (Exception whatever) {
// Failures during discovery are logged at WARNING level.
468 if (Logging.SHOW_WARNING && LOG.isLoggable(Level.WARNING)) {
469 LOG.log(Level.WARNING, "Failure during discovery", whatever);
477 * Discover an advertisement within the local peer group.
479 * @param type the Discovery advertisement type.
480 * @param attr The attribute to search for or {@code null}.
481 * @param value The attribute value to match or {@code null}.
482 * @param seconds The number of seconds to search.
483 * @param thisClass The Advertisement class which the advertisement must match.
484 * @return a single advertisement taken from the discovery results
486 private Advertisement discoverOne(int type, String attr, String value, int seconds, Class thisClass) {
// Delegates to discoverSome() using this group's own discovery shortcut.
487 Iterator<Advertisement> res = discoverSome(discovery, type, attr, value, seconds, thisClass).iterator();
// Handles the case where nothing was discovered.
489 if (!res.hasNext()) {
496 * Shortcuts to the standard basic services.
498 * @param mcid The Module Class ID of the service.
499 * @param service The service instance to set as the shortcut or
500 * {@code null} to clear the shortcut.
502 private void setShortCut(ModuleClassID mcid, Service service) {
// Each well-known class ID is compared in turn; on a match the service is
// cast to the corresponding interface and stored in the matching shortcut field.
503 if (endpointClassID.equals(mcid)) {
504 endpoint = (EndpointService) service;
507 if (resolverClassID.equals(mcid)) {
508 resolver = (ResolverService) service;
511 if (discoveryClassID.equals(mcid)) {
512 discovery = (DiscoveryService) service;
515 if (pipeClassID.equals(mcid)) {
516 pipe = (PipeService) service;
519 if (membershipClassID.equals(mcid)) {
520 membership = (MembershipService) service;
523 if (peerinfoClassID.equals(mcid)) {
524 peerinfo = (PeerInfoService) service;
527 if (rendezvousClassID.equals(mcid)) {
528 rendezvous = (RendezVousService) service;
531 if (accessClassID.equals(mcid)) {
532 access = (AccessService) service;
537 * Add a service to the collection of known services.
539 * @param mcid The Module Class ID of the service.
540 * @param service The service instance to set as the shortcut or
542 protected synchronized void addService(ModuleClassID mcid, Service service) {
// A class ID may be registered only once; a duplicate is rejected with IllegalStateException.
547 if (services.containsKey(mcid)) {
548 throw new IllegalStateException("Service" + mcid + " already registered.");
// Record the service, then update the well-known shortcut for this class ID (if it is one).
551 services.put(mcid, service);
553 setShortCut(mcid, service);
559 synchronized public Service lookupService(ID mcid) throws ServiceNotFoundException {
// Look the service up in the services map by its class ID.
560 Service p = services.get(mcid);
563 throw new ServiceNotFoundException("Not found: " + mcid.toString());
// Callers receive whatever the registered service's getInterface() returns.
566 return p.getInterface();
572 * Group implementations do not have to support mapping.
573 * it would be nice to separate better Interfaces, so that
574 * Interface Objects can do things that the real service does
575 * not have to implement.
577 public Service lookupService(ID mcid, int roleIndex) throws ServiceNotFoundException {
579 // If the role number is != 0, it can't be honored: we
580 // do not have an explicit map.
582 if (roleIndex != 0) {
583 throw new ServiceNotFoundException("Not found: " + mcid + "[" + roleIndex + "]");
// Role 0 is resolved by the single-argument lookup.
586 return lookupService(mcid);
592 public Iterator getRoleMap(ID name) {
593 // No translation; use the given name in a singleton.
// The returned iterator yields exactly one element: the name that was passed in.
594 return Collections.singletonList(name).iterator();
598 * check that all required core services are registered
600 * @throws ServiceNotFoundException If a required service was not found.
602 protected void checkServices() throws ServiceNotFoundException {
// Each lookup below is performed only for its side effect: lookupService()
// signals a missing service with ServiceNotFoundException, which propagates
// to the caller. The endpoint, resolver, membership and access services are checked.
605 ignored = lookupService(endpointClassID);
606 ignored = lookupService(resolverClassID);
607 ignored = lookupService(membershipClassID);
608 ignored = lookupService(accessClassID);
611 * ignored = lookupService(discoveryClassID);
612 * ignored = lookupService(pipeClassID);
613 * ignored = lookupService(rendezvousClassID);
614 * ignored = lookupService(peerinfoClassID);
619 * Ask a group to unregister and unload a service
621 * @param mcid The service to be removed.
622 * @throws ServiceNotFoundException if service is not found
624 protected synchronized void removeService(ModuleClassID mcid) throws ServiceNotFoundException {
// The well-known shortcut (if any) is cleared before the entry is removed from the map.
625 setShortCut(mcid, null);
627 Service p = services.remove(mcid);
630 throw new ServiceNotFoundException("Not found: " + mcid.toString());
635 // service.terminate(); FIXME. We probably need a terminate()
637 // FIXME: [jice@jxta.org 20011013] to make sure the service is
638 // no longer referenced, we should always return interfaces, and
639 // have a way to cut the reference to the real service in the
640 // interfaces. One way of doing that would be to have two levels
641 // of indirection: we should keep one and return references to it.
642 // When we want to cut the service loose, we should clear the
643 // reference from the interface that we own before letting it go.
644 // We need to study the consequences of doing that before implementing
651 public Module loadModule(ID assigned, Advertisement impl) throws ProtocolNotSupportedException, PeerGroupException {
// The advertisement is cast to ModuleImplAdvertisement and loaded with privileged == false.
652 return loadModule(assigned, (ModuleImplAdvertisement) impl, false);
656 * Load a Module from a ModuleImplAdv.
658 * Compatibility is checked and load is attempted. If compatible and
659 * loaded successfully, the resulting Module is initialized and returned.
660 * In most cases the other loadModule() method should be preferred, since
661 * unlike this one, it will seek many compatible implementation
662 * advertisements and try them all until one works. The home group of the new
663 * module (its parent group if the new Module is a group) will be this group.
665 * @param assigned Id to be assigned to that module (usually its ClassID).
666 * @param implAdv An implementation advertisement for that module.
667 * @param privileged If {@code true} then the module is provided the true
668 * group obj instead of just an interface to the group object. This is
669 * normally used only for the group's defined services and applications.
670 * @return Module the module loaded and initialized.
671 * @throws ProtocolNotSupportedException The module is incompatible.
672 * @throws PeerGroupException The module could not be loaded or initialized
674 protected Module loadModule(ID assigned, ModuleImplAdvertisement implAdv, boolean privileged) throws ProtocolNotSupportedException, PeerGroupException {
// A compatibility statement is mandatory; its absence is reported as IllegalArgumentException.
676 Element compat = implAdv.getCompat();
678 if (null == compat) {
679 throw new IllegalArgumentException("No compatibility statement for : " + assigned);
// Incompatible implementations are logged and rejected with ProtocolNotSupportedException.
682 if (!compatible(compat)) {
683 if (Logging.SHOW_WARNING && LOG.isLoggable(Level.WARNING)) {
684 LOG.warning("Incompatible Module : " + assigned);
687 throw new ProtocolNotSupportedException("Incompatible Module : " + assigned);
690 Module newMod = null;
// Loading is attempted only when the advertisement carries both a Code and a URI element.
692 if ((null != implAdv.getCode()) && (null != implAdv.getUri())) {
// Ask the loader for a class already known under this spec ID; if that
// fails with ClassNotFoundException, define the class from the advertisement.
698 clazz = (Class<Module>) loader.findClass(implAdv.getModuleSpecID());
699 } catch (ClassNotFoundException notLoaded) {
700 clazz = (Class<Module>) loader.defineClass(implAdv);
704 throw new ClassNotFoundException("Cannot load class (" + implAdv.getCode() + ") : " + assigned);
707 newMod = clazz.newInstance();
// Privileged modules are initialized with this group object itself;
// all others receive the result of getInterface().
709 newMod.init(privileged ? this : (PeerGroup) getInterface(), assigned, implAdv);
711 if (Logging.SHOW_INFO && LOG.isLoggable(Level.INFO)) {
712 LOG.info( "Loaded" + (privileged ? " privileged" : "") +
713 " module : " + implAdv.getDescription() + " (" + implAdv.getCode() + ")");
715 } catch (Exception ex) {
718 } catch (Throwable ignored) {
719 // If this does not work, nothing needs to be done.
// The original exception is preserved as the cause of the PeerGroupException.
721 throw new PeerGroupException("Could not load module for : " + assigned + " (" + implAdv.getDescription() + ")", ex);
// Build a message naming the advertisement element that is missing.
// NOTE(review): this path is taken when Code or URI is null, so the final
// "missing both" assignment looks unreachable and a doubly-missing
// advertisement would be reported as "missing Code element" — confirm.
726 if (null == implAdv.getCode()) {
727 error = "ModuleImpAdvertisement missing Code element";
728 } else if (null == implAdv.getUri()) {
729 error = "ModuleImpAdvertisement missing URI element";
731 error = "ModuleImpAdvertisement missing both Code and URI elements";
733 throw new PeerGroupException("Can not load module : " + error + " for" + assigned);
736 // Publish or renew the lease of this adv since we're using it.
// Publication is skipped when no discovery shortcut is set; exceptions
// raised while publishing are deliberately ignored.
738 if (discovery != null) {
739 discovery.publish(implAdv, DEFAULT_LIFETIME, DEFAULT_EXPIRATION);
741 } catch (Exception ignored) {// ignored
744 // If we reached this point we're done.
751 public Module loadModule(ID assigned, ModuleSpecID specID, int where) {
// Public entry point: delegates to the four-argument variant with privileged == false.
752 return loadModule(assigned, specID, where, false);
756 * Load a module from a ModuleSpecID
758 * Advertisement is sought, compatibility is checked on all candidates and
759 * load is attempted. The first one that is compatible and loads
760 * successfully is initialized and returned.
762 * @param assignedID Id to be assigned to that module (usually its ClassID).
763 * @param specID The specID of this module.
764 * @param where May be one of: {@code Here}, {@code FromParent}, or
765 * {@code Both}, meaning that the implementation advertisement will be
766 * searched in this group, its parent or both. As a general guideline, the
767 * implementation advertisements of a group should be searched in its
768 * prospective parent (that is Here), the implementation advertisements of a
769 * group standard service should be searched in the same group than where
770 * this group's advertisement was found (that is, FromParent), while
771 * applications may be sought more freely (Both).
772 * @param privileged If {@code true} then the module is provided the true
773 * group obj instead of just an interface to the group object. This is
774 * normally used only for the group's defined services and applications.
775 * @return Module the new module, or {@code null} if no usable implementation was found.
777 protected Module loadModule(ID assignedID, ModuleSpecID specID, int where, boolean privileged) {
// Candidate implementation advertisements; they are tried in the order collected.
779 List<Advertisement> allModuleImplAdvs = new ArrayList<Advertisement>();
781 ModuleImplAdvertisement loadedImplAdv = loader.findModuleImplAdvertisement(specID);
782 if(null != loadedImplAdv) {
783 // We already have a module defined for this spec id. Use that.
784 allModuleImplAdvs.add(loadedImplAdv);
786 // Search for a module to use.
// 'where' selects the search scope: this group, the parent group, or both.
787 boolean fromHere = (where == Here || where == Both);
788 boolean fromParent = (where == FromParent || where == Both);
// Each discoverSome() call below searches on the "MSID" attribute for up to
// 120 seconds and only accepts ModuleImplAdvertisement instances.
790 if (fromHere && (null != discovery)) {
791 Collection<Advertisement> here = discoverSome(discovery, DiscoveryService.ADV,
792 "MSID", specID.toString(), 120, ModuleImplAdvertisement.class);
794 allModuleImplAdvs.addAll(here);
797 if (fromParent && (null != getParentGroup()) && (null != parentGroup.getDiscoveryService())) {
798 Collection<Advertisement> parent = discoverSome(parentGroup.getDiscoveryService(), DiscoveryService.ADV,
799 "MSID", specID.toString(), 120, ModuleImplAdvertisement.class);
801 allModuleImplAdvs.addAll(parent);
// Holds a failure (if any) for the rethrow logic that follows the loop.
805 Throwable recentFailure = null;
807 for (Advertisement eachAdv : allModuleImplAdvs) {
// Anything that is not a ModuleImplAdvertisement cannot be used here.
808 if( !(eachAdv instanceof ModuleImplAdvertisement) ) {
812 ModuleImplAdvertisement foundImpl = (ModuleImplAdvertisement) eachAdv;
815 // First check that the MSID is really the one we're looking for.
816 // It could have appeared somewhere else in the adv than where
817 // we're looking, and discovery doesn't know the difference.
818 if (!specID.equals(foundImpl.getModuleSpecID())) {
// Delegate to the advertisement-based loadModule(); its exceptions are
// handled per type in the catch clauses below.
822 Module newMod = loadModule(assignedID, foundImpl, privileged);
824 // If we reach that point, the module is good.
826 } catch (ProtocolNotSupportedException failed) {
827 // Incompatible implementation.
// NOTE(review): the log text below is misspelled ("Incompatbile"); it is a
// runtime string and is left untouched here.
828 if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
829 LOG.log(Level.FINE, "Incompatbile impl adv");
831 } catch (PeerGroupException failed) {
832 // Initialization failure.
833 if (Logging.SHOW_WARNING && LOG.isLoggable(Level.WARNING)) {
834 LOG.log(Level.WARNING, "Initialization failed", failed);
836 } catch (Throwable e) {
838 if (Logging.SHOW_WARNING && LOG.isLoggable(Level.WARNING)) {
839 LOG.log(Level.WARNING, "Not a usable impl adv: ", e);
844 // Throw an exception if there was a recent failure.
// Errors and RuntimeExceptions are rethrown as-is; any other Throwable is
// wrapped in UndeclaredThrowableException because this method declares no
// checked exceptions.
845 if (null != recentFailure) {
846 if (recentFailure instanceof Error) {
847 throw (Error) recentFailure;
848 } else if (recentFailure instanceof RuntimeException) {
849 throw (RuntimeException) recentFailure;
851 throw new UndeclaredThrowableException(recentFailure);
// No candidate could be loaded: log a warning that names the spec ID.
855 if (Logging.SHOW_WARNING && LOG.isLoggable(Level.WARNING)) {
856 LOG.warning("Could not find a loadable implementation for SpecID: " + specID);
865 public ConfigParams getConfigAdvertisement() {
// Returns the configAdvertisement field as-is; it may be null (the field is initialized to null).
866 return configAdvertisement;
870 * Sets the configuration advertisement for this peer group.
872 * @param config The configuration advertisement which will be used for
873 * this peer group or {@code null} if no configuration advertisement is to
876 protected void setConfigAdvertisement(ConfigParams config) {
// Plain assignment to the configAdvertisement field; no validation or copying is done.
877 configAdvertisement = config;
881 * Adds configuration parameters for the specified group. The configuration
882 * parameters remain cached until either the specified group is started or
883 * the parameters are replaced.
885 * @param groupid The group whose params are being provided.
886 * @param params The parameters to be provided to the peer group when it is
889 public static void setGroupConfigAdvertisement(ID groupid, ConfigParams params) {
// Non-null params are stored in the shared group_configs map under the group ID.
890 if( null != params) {
891 group_configs.put(groupid, params);
// Drops any cached params for the group (presumably the null-params branch — confirm).
893 group_configs.remove(groupid);
898 * Now comes the implementation of the public API, including the
899 * API mandated by the Service interface.
905 * It is not recommended to override this method. Instead, subclassers
906 * should override either or both of
907 * {@link #initFirst(PeerGroup,ID,Advertisement)} and {@link #initLast()}.
908 * If this method is to be overridden, the overriding method must
909 * invoke <code>super.init</code>.
911 * This method invokes <code>initFirst</code>
912 * with identical parameters. <code>initLast</code> does not take
913 * parameters since the relevant information can be obtained from the
914 * group following completion of the <code>initFirst</code> phase.
915 * The resulting values may be different from the parameters to
916 * <code>initFirst</code> since <code>initFirst</code> may
917 * be overridden and the overriding method may modify these parameters
918 * when calling <code>super.initFirst</code>. (See
919 * {@link net.jxta.impl.peergroup.Platform} for an example of such a case).
921 * Upon completion, the group object is marked as completely initialized
922 * in all cases. Once a group object is completely initialized, it becomes
923 * sensitive to reference counting.
925 * In the future this method may become final.
927 public void init(PeerGroup homeGroup, ID assignedID, Advertisement impl) throws PeerGroupException {
// First initialization phase, invoked with the same arguments this method received.
929 initFirst(homeGroup, assignedID, impl);
932 // This must be done in all cases.
938 * Performs all initialization steps that need to be performed
939 * before any subclass initialization is performed.
941 * Classes that override this method should always call
942 * <code>super.initFirst()</code> <strong>before</strong> doing
943 * any of their own work.
945 * @param homeGroup The group that serves as a parent to this group.
946 * @param assignedID The unique ID assigned to this module. For
947 * group this is the group ID or <code>null</code> if a group ID
948 * has not yet been assigned. If null is passed, GenericPeerGroup
949 * will generate a new group ID.
950 * @param impl The ModuleImplAdvertisement which defines this
951 * group's implementation.
952 * @throws PeerGroupException if a group initialization error occurs
954 protected void initFirst(PeerGroup homeGroup, ID assignedID, Advertisement impl) throws PeerGroupException {
956 this.implAdvertisement = (ModuleImplAdvertisement) impl;
957 this.parentGroup = homeGroup;
959 if (null != parentGroup) {
960 jxtaHome = parentGroup.getStoreHome();
963 // Set the peer configuration before we start.
964 if((null != assignedID) && (null == getConfigAdvertisement())) {
965 setConfigAdvertisement(group_configs.remove(assignedID));
969 // FIXME 20030919 bondolo@jxta.org This setup doesnt give us any
970 // capability to use seed material or parent group.
971 if (null == assignedID) {
972 if ("cbid".equals(IDFactory.getDefaultIDFormat())) {
973 throw new IllegalStateException("Cannot generate group id for cbid group");
975 assignedID = IDFactory.newPeerGroupID();
978 if (parentGroup != null) {
979 DiscoveryService disco = parentGroup.getDiscoveryService();
981 Enumeration found = disco.getLocalAdvertisements(DiscoveryService.GROUP, "GID", assignedID.toString());
982 if (found.hasMoreElements()) {
983 peerGroupAdvertisement = (PeerGroupAdvertisement) found.nextElement();
989 if (!(assignedID instanceof PeerGroupID)) {
990 throw new PeerGroupException("assignedID must be a peer group ID");
993 peerAdvertisement.setPeerGroupID((PeerGroupID) assignedID);
995 // // make sure the parent group is the required group
996 // if (null != peerAdvertisement.getPeerGroupID().getParentPeerGroupID()) {
997 // if (null == parentGroup) {
998 // throw new PeerGroupException("Group requires parent group : " + peerAdvertisement.getPeerGroupID().getParentPeerGroupID());
999 // } else if (!parentGroup.getPeerGroupID().equals(peerAdvertisement.getPeerGroupID().getParentPeerGroupID())) {
1000 // throw new PeerGroupException("Group requires parent group : " + peerAdvertisement.getPeerGroupID().getParentPeerGroupID() + ". Provided parent was : " + parentGroup.getPeerGroupID());
1004 // Do our part of the PeerAdv construction.
1005 if ((configAdvertisement != null) && (configAdvertisement instanceof PlatformConfig)) {
1006 PlatformConfig platformConfig = (PlatformConfig) configAdvertisement;
1008 // Normally there will be a peer ID and a peer name in the config.
1009 PeerID configPID = platformConfig.getPeerID();
1011 if ((null == configPID) || (ID.nullID == configPID)) {
1012 if ("cbid".equals(IDFactory.getDefaultIDFormat())) {
1013 // Get our peer-defined parameters in the configAdv
1014 XMLElement param = (XMLElement) platformConfig.getServiceParam(PeerGroup.membershipClassID);
1016 if (null == param) {
1017 throw new IllegalArgumentException(PSEConfigAdv.getAdvertisementType() + " could not be located");
1020 Advertisement paramsAdv = null;
1022 paramsAdv = AdvertisementFactory.newAdvertisement(param);
1023 } catch (NoSuchElementException noadv) {// ignored
1025 if (!(paramsAdv instanceof PSEConfigAdv)) {
1026 throw new IllegalArgumentException(
1027 "Provided Advertisement was not a " + PSEConfigAdv.getAdvertisementType());
1030 PSEConfigAdv config = (PSEConfigAdv) paramsAdv;
1031 Certificate clientRoot = config.getCertificate();
1032 byte[] pub_der = clientRoot.getPublicKey().getEncoded();
1034 platformConfig.setPeerID(IDFactory.newPeerID((PeerGroupID) assignedID, pub_der));
1036 platformConfig.setPeerID(IDFactory.newPeerID((PeerGroupID) assignedID));
1040 peerAdvertisement.setPeerID(platformConfig.getPeerID());
1041 peerAdvertisement.setName(platformConfig.getName());
1042 peerAdvertisement.setDesc(platformConfig.getDesc());
1044 if (null == parentGroup) {
1045 // If we did not get a valid peer id, we'll initialize it here.
1046 peerAdvertisement.setPeerID(IDFactory.newPeerID((PeerGroupID) assignedID));
1048 // We're not the world peer group, which is the authoritative source of these values.
1049 peerAdvertisement.setPeerID(parentGroup.getPeerAdvertisement().getPeerID());
1050 peerAdvertisement.setName(parentGroup.getPeerAdvertisement().getName());
1051 peerAdvertisement.setDesc(parentGroup.getPeerAdvertisement().getDesc());
1055 if (peerGroupAdvertisement == null) {
1056 // No existing gadv. OK then we're creating the group or we're
1057 // the platform, it seems. Start a grp adv with the essentials
1059 peerGroupAdvertisement = (PeerGroupAdvertisement)
1060 AdvertisementFactory.newAdvertisement(PeerGroupAdvertisement.getAdvertisementType());
1062 peerGroupAdvertisement.setPeerGroupID((PeerGroupID) assignedID);
1063 peerGroupAdvertisement.setModuleSpecID(implAdvertisement.getModuleSpecID());
1068 // If we still do not have a config adv, make one with the minimal info in it.
1069 // All groups but the Platform and the netPG are in that case.
1070 // In theory a plain ConfigParams would be enough for subgroups. But for now
1071 // GenericPeerGroup always has a full Platformconfig and there is no other concrete
1072 // ConfigParams subclass.
1073 if (configAdvertisement == null) {
1074 PlatformConfig conf = (PlatformConfig) AdvertisementFactory.newAdvertisement(PlatformConfig.getAdvertisementType());
1076 conf.setPeerID(peerAdvertisement.getPeerID());
1077 conf.setName(peerAdvertisement.getName());
1078 conf.setDesc(peerAdvertisement.getDesc());
1079 configAdvertisement = conf;
1082 // Merge service params with those specified by the group (if any). The only
1083 // policy, right now, is to give peer params the precedence over group params.
1084 Hashtable grpParams = peerGroupAdvertisement.getServiceParams();
1085 Enumeration keys = grpParams.keys();
1087 while (keys.hasMoreElements()) {
1088 ID key = (ID) keys.nextElement();
1089 Element e = (Element) grpParams.get(key);
1091 if (configAdvertisement.getServiceParam(key) == null) {
1092 configAdvertisement.putServiceParam(key, e);
1097 * Now seems like the right time to attempt to register the group.
1098 * The only trouble is that it could cause the group to
1099 * be used before all the services are initialized, but on the
1100 * other hand, we do not want to let a redundant group go through
1101 * its service initialization because that would cause irreparable
1102 * damage to the legitimate instance. There should be synchronization
1103 * on the get<service>() and lookupService() routines.
1105 if (!globalRegistry.registerInstance((PeerGroupID) assignedID, this)) {
1106 throw new PeerGroupException("Group already instantiated");
1108 } catch (Throwable any) {
1109 if (Logging.SHOW_SEVERE && LOG.isLoggable(Level.SEVERE)) {
1110 LOG.log(Level.SEVERE, "Group init failed", any);
1113 if (any instanceof Error) {
1115 } else if (any instanceof RuntimeException) {
1116 throw (RuntimeException) any;
1117 } else if (any instanceof PeerGroupException) {
1118 throw (PeerGroupException) any;
1121 throw new PeerGroupException("Group init failed", any);
1124 ThreadGroup parentThreadGroup = (null != this.parentGroup)
1125 ? parentGroup.getHomeThreadGroup()
1126 : Thread.currentThread().getThreadGroup();
1128 threadGroup = new ThreadGroup(parentThreadGroup, "Group " + peerGroupAdvertisement.getPeerGroupID());
1130 taskQueue = new ArrayBlockingQueue<Runnable>(COREPOOLSIZE * 2);
1131 threadPool = new ThreadPoolExecutor(COREPOOLSIZE, MAXPOOLSIZE,
1132 KEEPALIVETIME, TimeUnit.SECONDS,
1134 new PeerGroupThreadFactory("Executor", getHomeThreadGroup()),
1135 new CallerBlocksPolicy());
1137 // Try to allow core threads to idle out. (Requires a 1.6 method)
1139 Method allowCoreThreadTimeOut = threadPool.getClass().getMethod("allowCoreThreadTimeOut", boolean.class);
1141 allowCoreThreadTimeOut.invoke(threadPool, Boolean.TRUE);
1142 } catch(Throwable ohWell) {
1143 // Our attempt failed.
1144 if (Logging.SHOW_FINEST && LOG.isLoggable(Level.FINEST)) {
1145 LOG.log(Level.FINEST, "Failed to enable 'allowCoreThreadTimeOut'", ohWell);
1149 scheduledExecutor = new ScheduledThreadPoolExecutor(1,
1150 new PeerGroupThreadFactory("Scheduled Executor", getHomeThreadGroup()));
1153 * The rest of construction and initialization are left to the
1154 * group subclass, between here and the beginning of initLast.
1155 * That should include instantiating and setting the endpoint, and
1156 * finally supplying it with endpoint protocols.
1157 * That also includes instantiating the appropriate services
1158 * and registering them.
1159 * For an example, see the StdPeerGroup class.
1164 * Perform all initialization steps that need to be performed
1165 * after any subclass initialization is performed.
1167 * Classes that override this method should always call super.initLast
1168 * <strong>after</strong> doing any of their own work.
1169 * @throws PeerGroupException if a group initialization error occurs
1171 protected void initLast() throws PeerGroupException {
1172 if (Logging.SHOW_CONFIG && LOG.isLoggable(Level.CONFIG)) {
1173 StringBuilder configInfo = new StringBuilder("Configuring Group : " + getPeerGroupID());
1175 if (implAdvertisement != null) {
1176 configInfo.append("\n\tImplementation :");
1177 configInfo.append("\n\t\tModule Spec ID: ").append(implAdvertisement.getModuleSpecID());
1178 configInfo.append("\n\t\tImpl Description : ").append(implAdvertisement.getDescription());
1179 configInfo.append("\n\t\tImpl URI : ").append(implAdvertisement.getUri());
1180 configInfo.append("\n\t\tImpl Code : ").append(implAdvertisement.getCode());
1182 configInfo.append("\n\tGroup Params :");
1183 configInfo.append("\n\t\tModule Spec ID : ").append(implAdvertisement.getModuleSpecID());
1184 configInfo.append("\n\t\tPeer Group ID : ").append(getPeerGroupID());
1185 configInfo.append("\n\t\tGroup Name : ").append(getPeerGroupName());
1186 configInfo.append("\n\t\tPeer ID in Group : ").append(getPeerID());
1187 configInfo.append("\n\tConfiguration :");
1188 if (null == parentGroup) {
1189 configInfo.append("\n\t\tHome Group : (none)");
1191 configInfo.append("\n\t\tHome Group : \"").append(parentGroup.getPeerGroupName()).append("\" / ").append(
1192 parentGroup.getPeerGroupID());
1194 configInfo.append("\n\t\tServices :");
1195 for (Map.Entry<ModuleClassID, Service> anEntry : services.entrySet()) {
1196 ModuleClassID aMCID = anEntry.getKey();
1197 ModuleImplAdvertisement anImplAdv = (ModuleImplAdvertisement) anEntry.getValue().getImplAdvertisement();
1199 configInfo.append("\n\t\t\t").append(aMCID).append("\t").append(anImplAdv.getDescription());
1201 LOG.config(configInfo.toString());
1208 public int startApp(String[] arg) {
1209 return Module.START_OK;
1215 * PeerGroupInterface's stopApp() does nothing. Only a real reference to the
1216 * group object permits to stop it without going through ref counting.
// Shuts this group down using the steps visible below: remove every
// registered service, unregister the group from the global registry, release
// the parent group reference, terminate both executors and clear initComplete.
// NOTE(review): short source lines (closing braces, "try {" and possibly
// brief statements) are missing from this excerpt; consult the full file
// before changing the control flow.
1218 public void stopApp() {
// Work on a copy of the key set rather than iterating the live map view.
1221 Collection<ModuleClassID> allServices = new ArrayList<ModuleClassID>(services.keySet());
1223 // Stop and remove all remaining services.
1224 for (ModuleClassID aService : allServices) {
1226 removeService(aService);
1227 } catch (Exception failure) {
// A failure while removing a service is only logged at WARNING level.
1228 LOG.log(Level.WARNING, "Failure shutting down service : " + aService, failure);
// Anything still present in the map at this point was not removed above.
1232 if (!services.isEmpty()) {
1233 LOG.warning(services.size() + " services could not be shut down during peer group stop.");
1236 // remove everything (just in case);
// Drop this instance from the registry under its own peer group ID.
1239 globalRegistry.unRegisterInstance(peerGroupAdvertisement.getPeerGroupID(), this);
1241 // Explicitly unreference our parent group in order to allow it
1242 // to terminate if this group object was itself the last reference
1244 if (parentGroup != null) {
1245 parentGroup.unref();
1249 // shutdown the threadpool
// shutdownNow() is used on both executors, so queued tasks are not run.
1250 threadPool.shutdownNow();
1251 scheduledExecutor.shutdownNow();
1253 // No longer initialized.
1254 initComplete = false;
1260 * May be called by a module which has a direct reference to the group
1261 * object and wants to notify its abandoning it. Has no effect on the real
1264 public void unref() {}
1267 * Called every time an interface object that refers to this group
1268 * goes away, either by being finalized or by its unref() method being
1269 * invoked explicitly.
// Handles the disappearance of one interface object that referred to this group.
// NOTE(review): short source lines are missing from this excerpt. The
// statement that changes masterRefCount, the bodies of the two guards below,
// the end of the synchronized block and whatever surrounds the final
// assignment are not visible; confirm against the full file.
1271 protected void decRefCount() {
// The reference count is examined while holding this object's monitor.
1272 synchronized (this) {
1275 if (Logging.SHOW_INFO && LOG.isLoggable(Level.INFO)) {
// The Throwable is created only to capture the current call stack.
1276 Throwable trace = new Throwable("Stack Trace");
1277 StackTraceElement elements[] = trace.getStackTrace();
// elements[2] is reported as the origin of the change.
// NOTE(review): getStackTrace() may return fewer than three frames, in which
// case this index would throw; confirm whether a guard is wanted.
1278 LOG.info("[" + getPeerGroupID() + "] GROUP REF COUNT DECCREMENTED TO: " + masterRefCount + " by\n\t" + elements[2]);
// Guard on a non-zero count; the guarded statement is not visible here.
1281 if (masterRefCount != 0) {
// Guard on stopWhenUnreferenced being false; the guarded statement is not visible here.
1285 if (!stopWhenUnreferenced) {
1290 if (Logging.SHOW_INFO && LOG.isLoggable(Level.INFO)) {
1291 LOG.info("[" + getPeerGroupID() + "] STOPPING UNREFERENCED GROUP");
// Park the count at Integer.MIN_VALUE; getInterface() throws
// IllegalStateException whenever it sees a count below 1.
1295 masterRefCount = Integer.MIN_VALUE;
1299 * Implement the Service API so that we can make groups services when we
// Returns a new RefCountPeerGroupInterface wrapping this group, or throws
// IllegalStateException when masterRefCount is below 1.
// NOTE(review): short source lines are missing from this excerpt. The
// statement that changes masterRefCount and the condition guarding the
// stopWhenUnreferenced assignment are not visible; confirm against the full file.
1306 public Service getInterface() {
// The reference count is examined while holding this object's monitor.
1307 synchronized (this) {
1310 if (masterRefCount < 1) {
1311 throw new IllegalStateException("Group has been shutdown. getInterface() is not available");
1314 if (Logging.SHOW_INFO && LOG.isLoggable(Level.INFO)) {
// The Throwable is created only to capture the current call stack.
1315 Throwable trace = new Throwable("Stack Trace");
1316 StackTraceElement elements[] = trace.getStackTrace();
// elements[2] is reported as the origin of the change.
// NOTE(review): getStackTrace() may return fewer than three frames, in which
// case this index would throw; confirm whether a guard is wanted.
1318 LOG.info("[" + getPeerGroupID() + "] GROUP REF COUNT INCREMENTED TO: " + masterRefCount + " by\n\t" + elements[2]);
1322 // If init is complete the group can become sensitive to
1323 // its ref count reaching zero. Before there could be
1324 // transient references before there is a chance to give
1325 // a permanent reference to the invoker of newGroup.
1326 stopWhenUnreferenced = true;
// Each call hands out a fresh wrapper around this group.
1330 return new RefCountPeerGroupInterface(this);
1336 public PeerGroup getWeakInterface() {
1337 return new PeerGroupInterface(this);
1343 public ModuleImplAdvertisement getImplAdvertisement() {
1344 return implAdvertisement.clone();
// Stores the given name and description in this group's advertisement and
// publishes that advertisement through the parent group's discovery service.
// NOTE(review): several short source lines are missing from this excerpt,
// including everything between the signature and the first setter and the
// bodies of the two null checks; confirm against the full file.
1350 public void publishGroup(String name, String description) throws IOException {
1356 peerGroupAdvertisement.setName(name);
1357 peerGroupAdvertisement.setDescription(description);
// Check for a missing parent group; the guarded statement is not visible here.
1359 if (parentGroup == null) {
1363 DiscoveryService parentDiscovery = parentGroup.getDiscoveryService();
// Check for a missing parent discovery service; the guarded statement is not visible here.
1365 if (null == parentDiscovery) {
// Publish with the default lifetime and expiration constants.
1369 parentDiscovery.publish(peerGroupAdvertisement, DEFAULT_LIFETIME, DEFAULT_EXPIRATION);
// Instantiates (or finds) the group described by the given peer group
// advertisement and returns an interface object for it.
// Throws IllegalArgumentException when the advertisement carries no usable
// group ID and PeerGroupException when no implementation could be loaded.
// NOTE(review): short source lines are missing from this excerpt, including
// the body of the "already instantiated" check and the "try {" opener.
1376 public PeerGroup newGroup(Advertisement pgAdv) throws PeerGroupException {
// The argument is cast without a type check, so any other advertisement type
// fails here with a ClassCastException.
1378 PeerGroupAdvertisement adv = (PeerGroupAdvertisement) pgAdv;
1379 PeerGroupID gid = adv.getPeerGroupID();
1381 if ((gid == null) || ID.nullID.equals(gid)) {
1382 throw new IllegalArgumentException("Advertisement did not contain a peer group ID");
// Ask the global registry for an instance already registered under this ID.
1385 PeerGroup theNewGroup = globalRegistry.lookupInstance(gid);
// Already-instantiated case; the guarded statement is not visible here.
1387 if (theNewGroup != null) {
1391 // We do not know if the grp adv had been previously published or not... Since it may contain information essential to
1392 // the configuration of services, we need to make sure it is published locally, rather than letting the group publish
1393 // itself after the fact.
1395 // FIXME jice@jxta.org 20040713 : The downside is that we're publishing the adv even before making sure that this group
1396 // can really be instantiated. We're basically using the cm as a means to pass parameters to the module because it is a
1397 // group. We have the same parameter issue with the config adv. Eventually we need to find a clean way of passing
1398 // parameters specific to certain types of module.
1401 discovery.publish(adv, DEFAULT_LIFETIME, DEFAULT_EXPIRATION);
// A publication failure is logged at WARNING level and not rethrown here.
1402 } catch (Exception any) {
1403 if (Logging.SHOW_WARNING && LOG.isLoggable(Level.WARNING)) {
1404 LOG.log(Level.WARNING, "Could not publish the group advertisement: ", any);
// Load the group implementation identified by the advertisement's module spec ID.
1408 theNewGroup = (PeerGroup) loadModule(adv.getPeerGroupID(), adv.getModuleSpecID(), Here, false);
1410 if (theNewGroup == null) {
1411 throw new PeerGroupException("Could not find group implementation with " + adv.getModuleSpecID());
// Callers receive an interface object obtained from the loaded group.
1414 return (PeerGroup) theNewGroup.getInterface();
// Instantiates (or finds) a group from an explicit group ID and implementation
// advertisement, publishes it under the given name and description, and
// returns an interface object for it.
// Throws PeerGroupException when the implementation cannot be loaded.
// NOTE(review): short source lines are missing from this excerpt, including
// the condition around the registry lookup, the body of the "already
// instantiated" check and both "try {" openers.
1420 public PeerGroup newGroup(PeerGroupID gid, Advertisement impl, String name, String description) throws PeerGroupException {
1421 PeerGroup theNewGroup = null;
// Ask the global registry for an instance already registered under this ID.
1424 theNewGroup = globalRegistry.lookupInstance(gid);
// Already-instantiated case; the guarded statement is not visible here.
1427 if (theNewGroup != null) {
// impl is cast to ModuleImplAdvertisement without a type check; a failing cast
// is a Throwable and therefore ends up in the catch below.
1432 theNewGroup = (PeerGroup) loadModule(gid, (ModuleImplAdvertisement) impl, false);
// Any load failure is logged at SEVERE level and wrapped in a PeerGroupException.
1433 } catch (Throwable any) {
1434 if (Logging.SHOW_SEVERE && LOG.isLoggable(Level.SEVERE)) {
1435 LOG.log(Level.SEVERE, "Could not load group implementation", any);
1438 throw new PeerGroupException("Could not load group implementation", any);
1442 // The group adv definitely needs to be published.
1443 theNewGroup.publishGroup(name, description);
// A publication failure is logged at WARNING level and not rethrown here.
1444 } catch (Exception any) {
1445 if (Logging.SHOW_WARNING && LOG.isLoggable(Level.WARNING)) {
1446 LOG.log(Level.WARNING, "Could not publish group or implementation:", any);
// Callers receive an interface object obtained from the loaded group.
1449 return (PeerGroup) theNewGroup.getInterface();
// Instantiates (or finds) the group with the given ID by first locating its
// peer group advertisement and then delegating to newGroup(Advertisement).
// Throws IllegalArgumentException for a null or nullID group ID and
// PeerGroupException when the advertisement cannot be found.
// NOTE(review): short source lines are missing from this excerpt, including
// the body of the "already instantiated" check, the "try {" opener and the
// condition guarding the final throw.
1455 public PeerGroup newGroup(PeerGroupID gid) throws PeerGroupException {
1457 if ((gid == null) || ID.nullID.equals(gid)) {
1458 throw new IllegalArgumentException("Invalid peer group ID");
// Ask the global registry for an instance already registered under this ID.
1461 PeerGroup result = globalRegistry.lookupInstance(gid);
// Already-instantiated case; the guarded statement is not visible here.
1463 if (result != null) {
1467 PeerGroupAdvertisement adv;
// Look the advertisement up by its "GID" attribute.
// NOTE(review): 120 is presumably a timeout passed to discoverOne; confirm its unit.
1470 adv = (PeerGroupAdvertisement)
1471 discoverOne(DiscoveryService.GROUP, "GID", gid.toString(), 120, PeerGroupAdvertisement.class);
// Any lookup failure is wrapped in a PeerGroupException that keeps the cause.
1472 } catch (Throwable any) {
1473 throw new PeerGroupException("Failed finding group advertisement for " + gid, any);
1477 throw new PeerGroupException("Could not find group advertisement for group " + gid);
1480 return newGroup(adv);
// Returns a JxtaLoader.
// NOTE(review): the method body is not visible in this excerpt; confirm
// which loader instance is returned.
1486 public JxtaLoader getLoader() {
1493 public String getPeerName() {
1494 // before init we must fail.
1495 if (null == peerAdvertisement) {
1496 throw new IllegalStateException("PeerGroup not sufficiently initialized");
1498 return peerAdvertisement.getName();
1504 public String getPeerGroupName() {
1505 // before init we must fail.
1506 if (null == peerGroupAdvertisement) {
1507 throw new IllegalStateException("PeerGroup not sufficiently initialized");
1509 return peerGroupAdvertisement.getName();
1515 public PeerGroupID getPeerGroupID() {
1516 // before init we must fail.
1517 if (null == peerGroupAdvertisement) {
1518 throw new IllegalStateException("PeerGroup not sufficiently initialized");
1521 return peerGroupAdvertisement.getPeerGroupID();
1527 public PeerID getPeerID() {
1528 // before init we must fail.
1529 if (null == peerAdvertisement) {
1530 throw new IllegalStateException("PeerGroup not sufficiently initialized");
1532 return peerAdvertisement.getPeerID();
1538 public PeerAdvertisement getPeerAdvertisement() {
1539 return peerAdvertisement;
1545 public PeerGroupAdvertisement getPeerGroupAdvertisement() {
1546 return peerGroupAdvertisement;
1552 public boolean isRendezvous() {
1553 if (rendezvous == null) {
1554 if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
1555 LOG.fine("Rendezvous service null");
1558 return (rendezvous != null) && rendezvous.isRendezVous();
1562 * shortcuts to the well-known services, in order to avoid calls to lookup.
1568 public EndpointService getEndpointService() {
1569 if (endpoint == null) {
1572 return (EndpointService) endpoint.getInterface();
1578 public ResolverService getResolverService() {
1579 if (resolver == null) {
1582 return (ResolverService) resolver.getInterface();
1588 public DiscoveryService getDiscoveryService() {
1589 if (discovery == null) {
1592 return (DiscoveryService) discovery.getInterface();
1598 public PeerInfoService getPeerInfoService() {
1599 if (peerinfo == null) {
1602 return (PeerInfoService) peerinfo.getInterface();
1608 public MembershipService getMembershipService() {
1609 if (membership == null) {
1612 return (MembershipService) membership.getInterface();
// Shortcut to the pipe service: returns an interface object obtained from
// the pipe field.
// NOTE(review): three short lines between the signature and the return are
// missing from this excerpt. The sibling getters guard against a null service
// field at that position; confirm the same guard exists here.
1618 public PipeService getPipeService() {
1622 return (PipeService) pipe.getInterface();
1628 public RendezVousService getRendezVousService() {
1629 if (rendezvous == null) {
1632 return (RendezVousService) rendezvous.getInterface();
1638 public AccessService getAccessService() {
1639 if (access == null) {
1642 return (AccessService) access.getInterface();
1646 * Returns the executor pool
1648 * @return the executor pool
// Returns an Executor.
// NOTE(review): the method body is not visible in this excerpt; presumably it
// returns the threadPool field that stopApp() shuts down. Confirm.
1650 public Executor getExecutor() {
1655 * Returns the scheduled executor. The
1657 * @return the scheduled executor
1659 public ScheduledExecutorService getScheduledExecutor() {
1660 // FIXME 20070815 bondolo We should return a proxy object to disable shutdown()
1661 return scheduledExecutor;
1665 * Our rejected execution handler which has the effect of pausing the
1666 * caller until the task can be queued.
// RejectedExecutionHandler that keeps offering the rejected task to the
// executor's queue from the calling thread.
// NOTE(review): short source lines are missing from this excerpt, including
// the "try {" opener, the handling of 'pushed' and the closing braces.
1668 private static class CallerBlocksPolicy implements RejectedExecutionHandler {
// Private constructor: only the enclosing class creates instances.
1670 private CallerBlocksPolicy() {
// Called with a task the executor could not accept.
1676 public void rejectedExecution(Runnable runnable, ThreadPoolExecutor executor) {
1677 BlockingQueue<Runnable> queue = executor.getQueue();
// Keep retrying for as long as the executor has not been shut down.
1679 while (!executor.isShutdown()) {
// Wait up to 500 ms for room in the queue.
// NOTE(review): what happens with 'pushed' is not visible here; confirm the
// loop ends once the task has been queued.
1683 boolean pushed = queue.offer(runnable, 500, TimeUnit.MILLISECONDS);
1688 } catch (InterruptedException woken) {
// The interruption is reported to the caller as a RejectedExecutionException.
// NOTE(review): no call restoring the thread's interrupt status is visible
// before this throw; confirm whether Thread.currentThread().interrupt() is wanted.
1689 throw new RejectedExecutionException("Interrupted while attempting to enqueue", woken);
1696 * Our thread factory that adds the threads to our thread group and names
1697 * the thread to something recognizable.
1699 static class PeerGroupThreadFactory implements ThreadFactory {
1700 final AtomicInteger threadNumber = new AtomicInteger(1);
1702 final ThreadGroup threadgroup;
1704 PeerGroupThreadFactory(String name, ThreadGroup threadgroup) {
1706 this.threadgroup = threadgroup;
1709 public Thread newThread(Runnable runnable) {
1710 Thread thread = new Thread(threadgroup, runnable, name + " - " + threadNumber.getAndIncrement(), 0);
1711 if(thread.isDaemon()) {
1712 thread.setDaemon(false);
1714 if (thread.getPriority() != Thread.NORM_PRIORITY) {
1715 thread.setPriority(Thread.NORM_PRIORITY);