2 * Copyright (c) 2001-2007 Sun Microsystems, Inc. All rights reserved.
4 * The Sun Project JXTA(TM) Software License
6 * Redistribution and use in source and binary forms, with or without
7 * modification, are permitted provided that the following conditions are met:
9 * 1. Redistributions of source code must retain the above copyright notice,
10 * this list of conditions and the following disclaimer.
12 * 2. Redistributions in binary form must reproduce the above copyright notice,
13 * this list of conditions and the following disclaimer in the documentation
14 * and/or other materials provided with the distribution.
16 * 3. The end-user documentation included with the redistribution, if any, must
17 * include the following acknowledgment: "This product includes software
18 * developed by Sun Microsystems, Inc. for JXTA(TM) technology."
19 * Alternately, this acknowledgment may appear in the software itself, if
20 * and wherever such third-party acknowledgments normally appear.
22 * 4. The names "Sun", "Sun Microsystems, Inc.", "JXTA" and "Project JXTA" must
23 * not be used to endorse or promote products derived from this software
24 * without prior written permission. For written permission, please contact
25 * Project JXTA at http://www.jxta.org.
27 * 5. Products derived from this software may not be called "JXTA", nor may
28 * "JXTA" appear in their name, without prior written permission of Sun.
30 * THIS SOFTWARE IS PROVIDED ``AS IS'' AND ANY EXPRESSED OR IMPLIED WARRANTIES,
31 * INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND
32 * FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL SUN
33 * MICROSYSTEMS OR ITS CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
34 * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
35 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA,
36 * OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
37 * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
38 * NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE,
39 * EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
41 * JXTA is a registered trademark of Sun Microsystems, Inc. in the United
42 * States and other countries.
44 * Please see the license information page at :
45 * <http://www.jxta.org/project/www/license.html> for instructions on use of
46 * the license in source files.
48 * ====================================================================
50 * This software consists of voluntary contributions made by many individuals
51 * on behalf of Project JXTA. For more information on Project JXTA, please see
52 * http://www.jxta.org.
54 * This license is based on the BSD license adopted by the Apache Foundation.
56 package net.jxta.impl.peer;
58 import java.util.Enumeration;
59 import java.util.Iterator;
60 import java.util.Hashtable;
61 import java.util.Random;
62 import java.util.Timer;
63 import java.util.TimerTask;
64 import java.util.logging.Logger;
65 import java.util.logging.Level;
66 import net.jxta.logging.Logging;
68 import net.jxta.document.Element;
69 import net.jxta.protocol.PeerInfoQueryMessage;
70 import net.jxta.protocol.PeerInfoResponseMessage;
71 import net.jxta.peergroup.PeerGroup;
72 import net.jxta.exception.PeerGroupException;
73 import net.jxta.peer.PeerInfoService;
74 import net.jxta.peergroup.PeerGroupID;
76 import net.jxta.meter.MonitorEvent;
77 import net.jxta.meter.MonitorException;
78 import net.jxta.meter.MonitorFilter;
79 import net.jxta.meter.MonitorFilterException;
80 import net.jxta.meter.MonitorListener;
81 import net.jxta.meter.MonitorReport;
82 import net.jxta.meter.MonitorResources;
83 import net.jxta.meter.PeerMonitorInfo;
84 import net.jxta.meter.PeerMonitorInfoEvent;
85 import net.jxta.meter.PeerMonitorInfoListener;
86 import net.jxta.meter.ServiceMonitorFilter;
87 import net.jxta.peer.PeerID;
88 import net.jxta.util.documentSerializable.DocumentSerializableUtilities;
89 import net.jxta.util.documentSerializable.DocumentSerializationException;
90 import net.jxta.impl.util.TimerThreadNamer;
// Package-private PeerInfoHandler implementation; it passes the handler name "Monitor" with everything it sends.
// NOTE(review): this listing appears to be missing short source lines (lone braces, brief declarations); confirm against the original file.
92 class RemoteMonitorPeerInfoHandler implements PeerInfoHandler {
// Handler name handed to the PeerInfoMessenger with every request and response sent by this class.
93 public static final String MONITOR_HANDLER_NAME = "Monitor";
// Upper and lower bounds (milliseconds) that getLeaseTime() clamps a requested lease to.
94 public static final int MAX_LEASE = 5 * 60 * 1000; // 5 minutes
95 public static final int MIN_LEASE = 60 * 1000; // 1 minute
// Shared generator for lease ids; getNextLeaseId() locks on it while drawing a value.
97 private static final Random rand = new Random();
98 private final static Logger LOG = Logger.getLogger(RemoteMonitorPeerInfoHandler.class.getName());
// Requests issued by this peer, keyed by the query id they were registered under.
100 private Hashtable<Integer, RequestInfo> requestInfos = new Hashtable<Integer, RequestInfo>();
// Leases granted to remote peers, keyed by lease id.
101 private Hashtable<Integer, LeaseInfo> leaseInfos = new Hashtable<Integer, LeaseInfo>();
// Absolute deadlines (ms) per query id; written by resetTimeout() and read by getTimeout().
102 private Hashtable<Integer, Long> timeouts = new Hashtable<Integer, Long>();
103 private PeerGroup peerGroup;
104 private PeerInfoServiceImpl peerInfoServiceImpl;
// Timer constructed with isDaemon = true; the timer.schedule(...) calls in this class all go through it.
105 private Timer timer = new Timer(true);
107 RemoteMonitorPeerInfoHandler(PeerGroup peerGroup, PeerInfoServiceImpl peerInfoServiceImpl) {
108 this.peerGroup = peerGroup;
109 this.peerInfoServiceImpl = peerInfoServiceImpl;
110 timer.schedule(new TimerThreadNamer("RemoteMonitorPeerInfo timer for " + peerGroup.getPeerGroupID()), 0);
117 private int getNextLeaseId() {
119 synchronized (rand) {
120 id = rand.nextInt(Integer.MAX_VALUE);
// Book-keeping record for one request this peer has sent.
// NOTE(review): peerId, queryId, timeout, validUntil, origRequestId and requestedLease are used by this class but their declarations are not visible in this listing.
125 private class RequestInfo {
// Creation time of the record in ms; scheduleLeaseRenewal() reads it.
126 long requestTime = System.currentTimeMillis();
// Only one of the two listeners is assigned, depending on which constructor was used.
130 MonitorListener monitorListener;
131 PeerMonitorInfoListener peerMonitorInfoListener;
// Set to true by processResponse() once a response for this record's query id is seen.
134 boolean responseReceived = false;
135 int leaseId; // other guys leaseId.
// Messenger the request was sent through; renewLease() sends renewals through it as well.
137 PeerInfoMessenger peerInfoMessenger;
// Request whose outcome is delivered to a MonitorListener.
138 RequestInfo(PeerID peerId, int queryId, MonitorListener monitorListener, long timeout, PeerInfoMessenger peerInfoMessenger) {
139 this(peerId, queryId, timeout, peerInfoMessenger);
140 this.monitorListener = monitorListener;
// Request whose outcome is delivered to a PeerMonitorInfoListener.
143 RequestInfo(PeerID peerId, int queryId, PeerMonitorInfoListener peerMonitorInfoListener, long timeout, PeerInfoMessenger peerInfoMessenger) {
144 this(peerId, queryId, timeout, peerInfoMessenger);
145 this.peerMonitorInfoListener = peerMonitorInfoListener;
// Shared initialisation: stores the arguments and sets validUntil to now + timeout (absolute ms).
148 RequestInfo(PeerID peerId, int queryId, long timeout, PeerInfoMessenger peerInfoMessenger) {
149 this.peerId = peerId;
150 this.queryId = queryId;
151 this.timeout = timeout;
152 this.peerInfoMessenger = peerInfoMessenger;
153 this.validUntil = System.currentTimeMillis() + timeout;
// Record of one monitoring lease this peer has granted to a remote peer.
// NOTE(review): leaseId and validUntil are assigned below but their declarations are not visible in this listing.
158 private class LeaseInfo {
160 PeerID peerID; // Peer that requested the lease
161 int queryId; // The other guy's query Id
// Listener created in handleRegisterMonitorQuery(); it sends each monitor report back to the requesting peer.
162 MonitorListener monitorListener;
// Set by handleRegisterMonitorQuery() when the listener was also attached to a world-group PeerInfoService.
164 boolean listenerAddedToWorldGroup = false;
165 PeerGroup worldGroup;
166 PeerInfoMessenger peerInfoMessenger;
// Stores the arguments; validUntil becomes now + leaseLength (absolute ms).
167 LeaseInfo(int leaseId, PeerID peerID, int queryId, MonitorListener monitorListener, long leaseLength, PeerInfoMessenger peerInfoMessenger) {
168 this.leaseId = leaseId;
169 this.peerID = peerID;
170 this.queryId = queryId;
171 this.monitorListener = monitorListener;
172 this.peerInfoMessenger = peerInfoMessenger;
173 validUntil = System.currentTimeMillis() + leaseLength;
// Sends a peer-monitor-info query to peerID and arranges for peerMonitorInfoListener to learn the outcome.
// A fresh query id is taken from peerInfoServiceImpl, the query is sent through peerInfoMessenger, and the request
// is then recorded in requestInfos under that id.
// NOTE(review): the request is sent before it is recorded; whether a response can be processed in between depends on the messenger's threading — confirm.
178 public void getPeerMonitorInfo(final PeerID peerID, PeerMonitorInfoListener peerMonitorInfoListener, long timeout, PeerInfoMessenger peerInfoMessenger) throws MonitorException {
179 int queryId = peerInfoServiceImpl.getNextQueryId();
181 RemoteMonitorQuery remoteMonitorQuery = RemoteMonitorQuery.createPeerMonitorInfoQuery();
182 peerInfoMessenger.sendPeerInfoRequest(queryId, peerID, MONITOR_HANDLER_NAME, remoteMonitorQuery);
183 final RequestInfo requestInfo = new RequestInfo(peerID, queryId, peerMonitorInfoListener, timeout, peerInfoMessenger);
184 requestInfos.put(queryId, requestInfo);
// Timer task: when no response has been flagged on requestInfo, the listener receives peerMonitorInfoNotReceived()
// with an event built from peerID and a null second argument, and the request record is removed.
// NOTE(review): the task's run() header and the delay argument are not visible in this listing — presumably `timeout`; confirm.
185 timer.schedule(new TimerTask() {
188 if (!requestInfo.responseReceived) {
189 PeerMonitorInfoEvent peerMonitorInfoEvent = new PeerMonitorInfoEvent(peerID, null);
191 requestInfo.peerMonitorInfoListener.peerMonitorInfoNotReceived(peerMonitorInfoEvent);
192 requestInfos.remove(requestInfo.queryId);
// Sends a cumulative-report query built from monitorFilter to peerID; reports are delivered to monitorListener by processResponse().
// The request is recorded in requestInfos under a fresh query id after the query has been sent.
198 public void getCumulativeMonitorReport(PeerID peerID, MonitorFilter monitorFilter, MonitorListener monitorListener, long timeout, PeerInfoMessenger peerInfoMessenger) throws MonitorException {
199 int queryId = peerInfoServiceImpl.getNextQueryId();
200 RemoteMonitorQuery remoteMonitorQuery = RemoteMonitorQuery.createGetCumulativeReportQuery(monitorFilter);
201 peerInfoMessenger.sendPeerInfoRequest(queryId, peerID, MONITOR_HANDLER_NAME, remoteMonitorQuery);
202 final RequestInfo requestInfo = new RequestInfo(peerID, queryId, monitorListener, timeout, peerInfoMessenger);
203 requestInfos.put(queryId, requestInfo);
// Timer task: when no response has been flagged, the request record is dropped.
// NOTE(review): in the visible lines the listener is not told about the timeout, unlike addRemoteMonitorListener(), which reports MonitorEvent.TIMEOUT — confirm whether that is intended.
// NOTE(review): the delay argument is not visible in this listing — presumably `timeout`; confirm.
204 timer.schedule(new TimerTask() {
207 if (!requestInfo.responseReceived) {
208 requestInfos.remove(requestInfo.queryId);
// Asks peerID to register a monitor on behalf of monitorListener.
// Builds a register-monitor query from includeCumulative, monitorFilter, reportRate and lease, sends it under a fresh
// query id, and records the request (remembering the requested lease for later renewals).
214 public void addRemoteMonitorListener(PeerID peerID, MonitorFilter monitorFilter, long reportRate, boolean includeCumulative, MonitorListener monitorListener, long lease, long timeout, PeerInfoMessenger peerInfoMessenger) throws MonitorException {
215 int queryId = peerInfoServiceImpl.getNextQueryId();
216 RemoteMonitorQuery remoteMonitorQuery = RemoteMonitorQuery.createRegisterMonitorQuery(includeCumulative, monitorFilter, reportRate, lease);
217 peerInfoMessenger.sendPeerInfoRequest(queryId, peerID, MONITOR_HANDLER_NAME, remoteMonitorQuery);
218 final RequestInfo requestInfo = new RequestInfo(peerID, queryId, monitorListener, timeout, peerInfoMessenger);
219 requestInfo.requestedLease = lease;
220 requestInfos.put(queryId, requestInfo);
// Timer task: when no response has been flagged, report a TIMEOUT failure to the listener and drop the request record.
// NOTE(review): the delay argument is not visible in this listing — presumably `timeout`; confirm.
221 timer.schedule(new TimerTask() {
224 if (!requestInfo.responseReceived) {
225 MonitorEvent monitorEvent = MonitorEvent.createFailureEvent(MonitorEvent.TIMEOUT, requestInfo.peerId,
226 requestInfo.queryId);
228 requestInfo.monitorListener.monitorRequestFailed(monitorEvent);
229 requestInfos.remove(requestInfo.queryId);
// Additionally start the deadline check implemented by scheduleTimeout().
233 scheduleTimeout(requestInfo);
// Asks peerID to stop reporting to monitorListener.
// Scans requestInfos for a record whose monitorListener is the same instance (== comparison); the visible test
// compares the listener only, not the peer id.
// NOTE(review): the statements executed on a match (presumably storing the record in oldRequestInfo) are not visible in this listing.
236 public void removeRemoteMonitorListener(PeerID peerID, MonitorListener monitorListener, long timeout, PeerInfoMessenger peerInfoMessenger) throws MonitorException {
237 int queryId = peerInfoServiceImpl.getNextQueryId();
239 RequestInfo oldRequestInfo = null;
240 for (Enumeration<RequestInfo> e = requestInfos.elements(); e.hasMoreElements();) {
241 RequestInfo ri = e.nextElement();
243 if (ri.monitorListener == monitorListener) {
// When a record was found: send a remove-listener query quoting the lease id stored on it, and record the new
// request with origRequestId pointing at the found record's query id.
249 if (oldRequestInfo != null) {
250 RemoteMonitorQuery remoteMonitorQuery = RemoteMonitorQuery.createRemoveMonitorListenerQuery(oldRequestInfo.leaseId);
251 peerInfoMessenger.sendPeerInfoRequest(queryId, peerID, MONITOR_HANDLER_NAME, remoteMonitorQuery);
252 final RequestInfo requestInfo = new RequestInfo(peerID, queryId, monitorListener, timeout, peerInfoMessenger);
253 requestInfo.origRequestId = oldRequestInfo.queryId;
254 requestInfos.put(queryId, requestInfo);
// NOTE(review): it is not visible here whether the lines below sit inside the null check above; if they do not,
// requestInfo can be null when the task dereferences it — confirm.
257 final RequestInfo requestInfo = oldRequestInfo;
258 timer.schedule(new TimerTask() {
// Task body: drops the record of the registration that was found above.
262 requestInfos.remove(new Integer(requestInfo.queryId));
267 public void removeRemoteMonitorListener(MonitorListener monitorListener, long timeout, PeerInfoMessenger peerInfoMessenger) throws MonitorException {
268 for (Enumeration<RequestInfo> e = requestInfos.elements(); e.hasMoreElements();) {
269 RequestInfo requestInfo = e.nextElement();
271 if (requestInfo.monitorListener == monitorListener) {
272 removeRemoteMonitorListener(requestInfo.peerId, monitorListener, timeout, peerInfoMessenger);
277 public void processRequest(int queryId, PeerID requestSourceID, PeerInfoQueryMessage peerInfoQueryMessage, Element requestElement, PeerInfoMessenger peerInfoMessenger) {
279 RemoteMonitorQuery remoteMonitorQuery = (RemoteMonitorQuery) DocumentSerializableUtilities.getDocumentSerializable(requestElement, RemoteMonitorQuery.class);
281 if (remoteMonitorQuery.isRegisterMonitorQuery()) {
282 handleRegisterMonitorQuery(queryId, requestSourceID, remoteMonitorQuery, peerInfoMessenger);
284 } else if (remoteMonitorQuery.isCumulativeReportQuery()) {
285 handleCumulativeReportQuery(queryId, requestSourceID, remoteMonitorQuery.getMonitorFilter(), peerInfoMessenger);
287 } else if (remoteMonitorQuery.isRemoveMonitorQuery()) {
288 handleRemoveMonitorQuery(queryId, requestSourceID, remoteMonitorQuery, peerInfoMessenger);
290 } else if (remoteMonitorQuery.isPeerMonitorInfoQuery()) {
291 handlePeerMonitorInfoQuery(queryId, requestSourceID, peerInfoMessenger);
293 } else if (remoteMonitorQuery.isLeaseRenewal()) {
294 handleLeaseRenewalQuery(queryId, requestSourceID, remoteMonitorQuery, peerInfoMessenger);
297 } catch (Exception e) {
298 if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
299 LOG.log(Level.FINE, "Monitor failed in processQuery", e);
// Handles a response received for the given query id.
// The response element is deserialized into a RemoteMonitorResponse, the RequestInfo recorded under queryId is looked
// up, and the response is dispatched on its type. Nothing visible here handles a response with no matching record.
// A DocumentSerializationException is caught and logged at FINE.
304 public void processResponse(int queryId, PeerInfoResponseMessage peerInfoResponseMessage, Element responseElement, PeerInfoMessenger peerInfoMessenger) {
306 RemoteMonitorResponse remoteMonitorResponse;
309 remoteMonitorResponse = (RemoteMonitorResponse) DocumentSerializableUtilities.getDocumentSerializable(responseElement, RemoteMonitorResponse.class);
310 RequestInfo requestInfo = requestInfos.get(new Integer(queryId));
312 if (requestInfo != null) {
// Mark the request as answered (read by the timeout tasks) and push its deadline forward.
313 requestInfo.responseReceived = true;
315 resetTimeout(requestInfo);
// Registration accepted: remember the remote lease id and schedule the first renewal.
316 if (remoteMonitorResponse.isMonitorRegistered()) {
317 int leaseId = remoteMonitorResponse.getLeaseId();
318 long leaseLength = remoteMonitorResponse.getLease();
320 requestInfo.leaseId = leaseId;
321 scheduleLeaseRenewal(requestInfo, leaseLength);
// Removal confirmed: drop both the original registration and the removal request itself.
323 } else if (remoteMonitorResponse.isMonitorRemoved()) {
324 requestInfos.remove(new Integer(requestInfo.origRequestId));
325 requestInfos.remove(new Integer(queryId));
// Report (cumulative or periodic): wrap it in an event and hand it to the listener; the record is kept.
327 } else if (remoteMonitorResponse.isCumulativeReport() || remoteMonitorResponse.isMonitorReport()) {
328 MonitorReport monitorReport = remoteMonitorResponse.getMonitorReport();
329 MonitorEvent monitorEvent = MonitorEvent.createRemoteMonitorReportEvent(requestInfo.peerId, requestInfo.queryId, monitorReport);
330 requestInfo.monitorListener.processMonitorReport(monitorEvent);
// Failure responses: notify the listener with the matching failure code and drop the request.
331 } else if (remoteMonitorResponse.isInvalidFilter()) {
332 MonitorEvent monitorEvent = MonitorEvent.createFailureEvent(MonitorEvent.INVALID_MONITOR_FILTER, requestInfo.peerId, requestInfo.queryId);
333 requestInfo.monitorListener.monitorRequestFailed(monitorEvent);
334 requestInfos.remove(new Integer(queryId));
335 } else if (remoteMonitorResponse.isInvalidReportRate()) {
336 MonitorEvent monitorEvent = MonitorEvent.createFailureEvent(MonitorEvent.INVALID_REPORT_RATE, requestInfo.peerId, requestInfo.queryId);
337 requestInfo.monitorListener.monitorRequestFailed(monitorEvent);
338 requestInfos.remove(new Integer(queryId));
339 } else if (remoteMonitorResponse.isMeteringNotSupported()) {
340 MonitorEvent monitorEvent = MonitorEvent.createFailureEvent(MonitorEvent.REFUSED, requestInfo.peerId, requestInfo.queryId);
341 requestInfo.monitorListener.monitorRequestFailed(monitorEvent);
342 requestInfos.remove(new Integer(queryId));
// NOTE(review): unlike the failure branches above, no removal of the request record is visible for a denied request — confirm whether that is intended.
343 } else if (remoteMonitorResponse.isRequestDenied()) {
344 MonitorEvent monitorEvent = MonitorEvent.createFailureEvent(MonitorEvent.REFUSED, requestInfo.peerId, requestInfo.queryId);
345 requestInfo.monitorListener.monitorRequestFailed(monitorEvent);
// Peer monitor info: deliver it to the PeerMonitorInfoListener and drop the request.
346 } else if (remoteMonitorResponse.isPeerMonitorInfo()) {
347 PeerMonitorInfoEvent peerMonitorInfoEvent = new PeerMonitorInfoEvent(requestInfo.peerId, remoteMonitorResponse.getPeerMonitorInfo());
348 requestInfo.peerMonitorInfoListener.peerMonitorInfoReceived(peerMonitorInfoEvent);
349 requestInfos.remove(new Integer(queryId));
// Lease renewed: schedule the next renewal against the original registration and drop the renewal request.
// NOTE(review): origRequest is not checked for null before scheduleLeaseRenewal() dereferences it; a missing
// original record would raise a NullPointerException that the catch below does not cover — confirm.
350 } else if (remoteMonitorResponse.isLeaseRenewed()) {
351 long lease = remoteMonitorResponse.getLease();
352 int origRequestId = requestInfo.origRequestId;
353 RequestInfo origRequest = requestInfos.get(new Integer(origRequestId));
354 scheduleLeaseRenewal(origRequest, lease);
355 requestInfos.remove(new Integer(queryId));
358 } catch (DocumentSerializationException e) {
359 if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
360 LOG.log(Level.FINE, "Document Serialization Failed", e);
365 private void resetTimeout(RequestInfo requestInfo) {
366 timeouts.put(requestInfo.queryId, requestInfo.timeout + System.currentTimeMillis());
369 private long getTimeout(int queryId) {
370 return timeouts.get(queryId);
// Installs the deadline check for a request.
// NOTE(review): the timer.schedule(...) call and the task header are not visible in this listing; the trailing
// arguments on the last line show a delay and a period that are both requestInfo.timeout.
373 private void scheduleTimeout(final RequestInfo requestInfo) {
374 final int queryId = requestInfo.queryId;
// Task body: only acts while the request is still recorded.
380 if (requestInfos.containsKey(new Integer(queryId))) {
// Past the deadline stored by resetTimeout(): report a TIMEOUT failure to the listener.
// getTimeout() unboxes the stored value, so a query id without a stored deadline raises an exception here; the catch below receives it.
382 if (System.currentTimeMillis() > getTimeout(queryId)) {
383 MonitorEvent monitorEvent = MonitorEvent.createFailureEvent(MonitorEvent.TIMEOUT, requestInfo.peerId, queryId);
384 requestInfo.monitorListener.monitorRequestFailed(monitorEvent);
386 } catch (Exception e) {
393 }, requestInfo.timeout, requestInfo.timeout);
// Schedules a renewal of the lease attached to requestInfo, some time before leaseLength elapses.
// The renewal is only scheduled when the computed delay exceeds MIN_LEASE.
396 private void scheduleLeaseRenewal(RequestInfo requestInfo, long leaseLength) {
// NOTE(review): requestTime is taken when the RequestInfo is created, so requestTime - now is zero or negative;
// subtracting it below therefore ADDS the elapsed time to the delay instead of deducting it. The operands look
// reversed. Callers pass the original registration's record, whose requestTime is never refreshed in the visible
// code, so a fix probably needs to account for that as well — confirm.
397 long roundTrip = requestInfo.requestTime - System.currentTimeMillis();
398 long renewTime = leaseLength - roundTrip - 30 * 1000L; // 30s comfort
400 final int queryId = requestInfo.queryId;
402 if (renewTime > MIN_LEASE) {
// NOTE(review): the task body and the delay argument are not visible in this listing — presumably renewLease(queryId) after renewTime; confirm.
403 timer.schedule(new TimerTask() {
409 } catch (Exception e) {
410 if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
411 LOG.log(Level.FINE, "Lease Renewal Failed", e);
// Serves a remote peer's request to register a monitor.
// Creates a listener that sends each report back to the requester, creates a lease for it, attaches the listener to
// the relevant PeerInfoService(s) and answers with a "monitor registered" response carrying the lease id and length.
// A MonitorFilterException is answered with an "invalid filter" response; a MonitorException is only logged at FINE.
419 private void handleRegisterMonitorQuery(final int queryId, final PeerID requestSourceID, RemoteMonitorQuery remoteMonitorQuery, final PeerInfoMessenger peerInfoMessenger) {
420 MonitorFilter monitorFilter = remoteMonitorQuery.getMonitorFilter();
421 long lease = remoteMonitorQuery.getLease();
422 long reportRate = remoteMonitorQuery.getReportRate();
423 boolean includeCumulative = remoteMonitorQuery.isIncludeCumulative();
// Listener acting for the remote peer: every report is wrapped in a monitor-report response sent under the requester's query id.
425 MonitorListener monitorListener = new MonitorListener() {
427 public void processMonitorReport(MonitorEvent monitorEvent) {
428 MonitorReport monitorReport = monitorEvent.getMonitorReport();
430 RemoteMonitorResponse remoteMonitorResponse = RemoteMonitorResponse.createMonitorReportResponse(queryId, monitorReport);
431 peerInfoMessenger.sendPeerInfoResponse(queryId, requestSourceID, MONITOR_HANDLER_NAME, remoteMonitorResponse);
432 } catch (Exception e) {
433 if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
434 LOG.fine(e.toString());
// The two remaining callbacks are unimplemented and throw if invoked.
439 public void monitorReportingCancelled(MonitorEvent monitorEvent) {
440 throw new RuntimeException("METHOD NOT IMPLEMENTED");
443 public void monitorRequestFailed(MonitorEvent monitorEvent) {
444 throw new RuntimeException("METHOD NOT IMPLEMENTED");
448 int leaseId = getNextLeaseId();
// NOTE(review): the LeaseInfo is built with the requested `lease`, while the expiry timer and the response use the
// clamped `leaseTime`. When the request exceeds MAX_LEASE, validUntil lies after the only scheduled expiry check,
// which then does nothing — the lease would outlive its advertised length unless renewed. Passing leaseTime looks
// intended; confirm.
449 final LeaseInfo leaseInfo = new LeaseInfo(leaseId, requestSourceID, queryId, monitorListener, lease, peerInfoMessenger);
450 long leaseTime = getLeaseTime(lease);
452 setupLeaseTimeout(leaseInfo.leaseId, leaseTime);
456 * Currently we can neither ask peers in the netgroup for transport
457 * metrics, nor discover peers in the world group. Therefore we're
458 * asking peers in the netgroup to send TransportMetrics, but that
459 * peer is actually attaching the MonitorFilter to it's WorldGroup
// Transport-service filters are wrapped in a separate filter and attached to the world group's PeerInfoService.
// NOTE(review): some statements of this loop are not visible in this listing (for example the last argument of the
// addMonitorListener call) — confirm against the original file.
462 for (Iterator i = monitorFilter.getServiceMonitorFilters(); i.hasNext();) {
463 ServiceMonitorFilter serviceMonitorFilter = (ServiceMonitorFilter) i.next();
465 if (serviceMonitorFilter.getModuleClassID().equals(MonitorResources.transportServiceMonitorClassID)) {
467 MonitorFilter worldGroupFilter = new MonitorFilter("worldGroupFilter");
469 worldGroupFilter.addServiceMonitorFilter(serviceMonitorFilter);
471 PeerGroup worldGroup = peerGroup.newGroup(PeerGroupID.worldPeerGroupID);
472 PeerInfoService worldService = worldGroup.getPeerInfoService();
474 worldService.addMonitorListener(worldGroupFilter, remoteMonitorQuery.getReportRate(), includeCumulative,
// Remember the world group so the listener can be detached from it when the lease ends.
476 leaseInfo.listenerAddedToWorldGroup = true;
477 leaseInfo.worldGroup = worldGroup;
478 } catch (PeerGroupException e) {
479 if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
480 LOG.fine(e.toString());
// Whatever service filters remain are attached to this group's own service.
485 if (monitorFilter.getServiceMonitorFilterCount() > 0) {
486 peerInfoServiceImpl.addMonitorListener(monitorFilter, reportRate, includeCumulative, monitorListener);
// Record the lease and tell the requester its lease id and granted length.
488 leaseInfos.put(leaseId, leaseInfo);
489 RemoteMonitorResponse remoteMonitorResponse = RemoteMonitorResponse.createMonitorRegisteredResponse(queryId, leaseId, leaseTime);
490 peerInfoMessenger.sendPeerInfoResponse(queryId, requestSourceID, MONITOR_HANDLER_NAME, remoteMonitorResponse);
491 } catch (MonitorFilterException e) {
492 RemoteMonitorResponse remoteMonitorResponse = RemoteMonitorResponse.createInvalidFilterResponse(queryId);
494 peerInfoMessenger.sendPeerInfoResponse(queryId, requestSourceID, MONITOR_HANDLER_NAME, remoteMonitorResponse);
495 } catch (MonitorException e) {
496 if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
497 LOG.fine(e.toString());
502 private void handleRemoveMonitorQuery(int queryId, PeerID requestSourceID, RemoteMonitorQuery remoteMonitorQuery, PeerInfoMessenger peerInfoMessenger) {
504 int leaseId = remoteMonitorQuery.getLeaseId();
505 LeaseInfo leaseInfo = leaseInfos.get(new Integer(leaseId));
507 if (leaseInfo != null) {
508 MonitorListener monitorListener = leaseInfo.monitorListener;
510 peerInfoServiceImpl.removeMonitorListener(monitorListener);
511 if (leaseInfo.listenerAddedToWorldGroup) {
512 PeerInfoService peerInfoService = leaseInfo.worldGroup.getPeerInfoService();
513 peerInfoService.removeMonitorListener(monitorListener);
516 RemoteMonitorResponse remoteMonitorResponse = RemoteMonitorResponse.createMonitorRemovedResponse(queryId);
517 peerInfoMessenger.sendPeerInfoResponse(queryId, requestSourceID, MONITOR_HANDLER_NAME, remoteMonitorResponse);
519 } catch (MonitorException e) {
520 // Currently not thrown by MonitorManager.removeMonitorListener()
521 if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
522 LOG.fine(e.toString());
527 private void handleCumulativeReportQuery(int queryId, PeerID requestSourceID, MonitorFilter monitorFilter, PeerInfoMessenger peerInfoMessenger) throws MonitorException, DocumentSerializationException {
528 MonitorReport monitorReport = peerInfoServiceImpl.getCumulativeMonitorReport(monitorFilter);
529 RemoteMonitorResponse remoteMonitorResponse = RemoteMonitorResponse.createCumulativeReportResponse(queryId, monitorReport);
530 peerInfoMessenger.sendPeerInfoResponse(queryId, requestSourceID, MONITOR_HANDLER_NAME, remoteMonitorResponse);
533 private void handlePeerMonitorInfoQuery(int queryId, PeerID requestSourceID, PeerInfoMessenger peerInfoMessenger) throws DocumentSerializationException {
535 /* Asking the NetGroup Peer won't tell me if it supports transport
536 * monitoring or not, but asking the world group guy gives me
537 * everything I need because as currently implemented you can't turn
538 * monitoring on or off at the PeerGroup level, only the device level.
541 PeerGroup worldGroup = peerGroup.newGroup(PeerGroupID.worldPeerGroupID);
542 PeerInfoService worldService = worldGroup.getPeerInfoService();
544 PeerMonitorInfo peerMonitorInfo = worldService.getPeerMonitorInfo();
545 RemoteMonitorResponse remoteMonitorResponse = RemoteMonitorResponse.createPeerMonitorInfoResponse(queryId, peerMonitorInfo);
546 peerInfoMessenger.sendPeerInfoResponse(queryId, requestSourceID, MONITOR_HANDLER_NAME, remoteMonitorResponse);
547 } catch (PeerGroupException e) {
548 if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
549 LOG.fine(e.toString());
554 private void handleLeaseRenewalQuery(int queryId, PeerID requestSourceID, RemoteMonitorQuery remoteMonitorQuery, PeerInfoMessenger peerInfoMessenger) throws DocumentSerializationException {
555 int leaseId = remoteMonitorQuery.getLeaseId();
556 LeaseInfo leaseInfo = leaseInfos.get(new Integer(leaseId));
558 if (leaseInfo != null) {
559 long reqLease = remoteMonitorQuery.getLease();
560 long lease = getLeaseTime(reqLease);
562 leaseInfo.validUntil = System.currentTimeMillis() + lease;
563 setupLeaseTimeout(leaseInfo.leaseId, lease);
565 RemoteMonitorResponse remoteMonitorResponse = RemoteMonitorResponse.createLeaseRenewedResponse(queryId, leaseInfo.leaseId, lease);
566 peerInfoMessenger.sendPeerInfoResponse(queryId, requestSourceID, MONITOR_HANDLER_NAME, remoteMonitorResponse);
568 RemoteMonitorResponse remoteMonitorResponse = RemoteMonitorResponse.createDeniedResponse(queryId);
569 peerInfoMessenger.sendPeerInfoResponse(queryId, requestSourceID, MONITOR_HANDLER_NAME, remoteMonitorResponse);
573 long getLeaseTime(long requestedLease) {
574 long leaseTime = requestedLease < MAX_LEASE ? requestedLease : MAX_LEASE;
576 leaseTime = leaseTime > MIN_LEASE ? leaseTime : MIN_LEASE;
580 private void cancelLease(LeaseInfo leaseInfo) throws MonitorException, DocumentSerializationException {
581 if (leaseInfo.listenerAddedToWorldGroup) {
582 leaseInfo.worldGroup.getPeerInfoService().removeMonitorListener(leaseInfo.monitorListener);
585 RemoteMonitorResponse remoteMonitorResponse = RemoteMonitorResponse.createLeaseEndedResponse(leaseInfo.queryId, leaseInfo.leaseId);
586 leaseInfo.peerInfoMessenger.sendPeerInfoResponse(leaseInfo.queryId, leaseInfo.peerID, MONITOR_HANDLER_NAME, remoteMonitorResponse);
589 private void renewLease(int queryId) {
591 RequestInfo requestInfo = requestInfos.get(new Integer(queryId));
593 if (requestInfo != null) {
594 int renewalQueryId = peerInfoServiceImpl.getNextQueryId();
595 PeerID peerID = requestInfo.peerId;
596 long timeout = requestInfo.timeout;
598 RemoteMonitorQuery remoteMonitorQuery = RemoteMonitorQuery.createLeaseRenewalQuery(requestInfo.leaseId, requestInfo.requestedLease);
600 requestInfo.peerInfoMessenger.sendPeerInfoRequest(queryId, peerID, MONITOR_HANDLER_NAME, remoteMonitorQuery);
601 final RequestInfo renewalRequestInfo = new RequestInfo(peerID, queryId, timeout, requestInfo.peerInfoMessenger);
603 renewalRequestInfo.requestedLease = requestInfo.requestedLease;
604 renewalRequestInfo.origRequestId = queryId;
605 requestInfos.put(renewalQueryId, renewalRequestInfo);
607 } catch (Exception e) {
608 if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
609 LOG.log(Level.FINE, "error while attempting Monitor lease renewal", e);
// Schedules a timer task that expires the lease recorded under leaseId.
// When the task runs it looks the lease up again; if the record still exists and its validUntil is not later than
// the current time, cancelLease() is called on it.
// NOTE(review): the task's run() header, the delay passed to timer.schedule (presumably `lease`) and the end of this
// method are not visible in this listing.
614 private void setupLeaseTimeout(final int leaseId, long lease) {
616 timer.schedule(new TimerTask() {
620 LeaseInfo leaseInfo = leaseInfos.get(new Integer(leaseId));
622 if (leaseInfo != null) {
623 long currentTime = System.currentTimeMillis();
625 if (leaseInfo.validUntil <= currentTime) {
627 cancelLease(leaseInfo);
628 } catch (Exception e) {
// NOTE(review): leaseInfos is keyed by lease id elsewhere in this class (put(leaseId, ...), get(leaseId)), but this
// removes by leaseInfo.queryId — the requester's query id — so the expired record is probably left in the table, or
// an unrelated lease whose id happens to equal that query id is dropped. leaseInfo.leaseId looks like the intended
// key; confirm and fix.
631 leaseInfos.remove(leaseInfo.queryId);