]> sjero.net Git - linphone/blob - p2pproxy/dependencies-src/jxse-src-2.5/impl/src/net/jxta/impl/cm/SrdiIndex.java
remove mediastreamer2 and add it as a submodule instead.
[linphone] / p2pproxy / dependencies-src / jxse-src-2.5 / impl / src / net / jxta / impl / cm / SrdiIndex.java
1 /*
2  * Copyright (c) 2001-2007 Sun Microsystems, Inc.  All rights reserved.
3  *
4  *  The Sun Project JXTA(TM) Software License
5  *
6  *  Redistribution and use in source and binary forms, with or without
7  *  modification, are permitted provided that the following conditions are met:
8  *
9  *  1. Redistributions of source code must retain the above copyright notice,
10  *     this list of conditions and the following disclaimer.
11  *
12  *  2. Redistributions in binary form must reproduce the above copyright notice,
13  *     this list of conditions and the following disclaimer in the documentation
14  *     and/or other materials provided with the distribution.
15  *
16  *  3. The end-user documentation included with the redistribution, if any, must
17  *     include the following acknowledgment: "This product includes software
18  *     developed by Sun Microsystems, Inc. for JXTA(TM) technology."
19  *     Alternately, this acknowledgment may appear in the software itself, if
20  *     and wherever such third-party acknowledgments normally appear.
21  *
22  *  4. The names "Sun", "Sun Microsystems, Inc.", "JXTA" and "Project JXTA" must
23  *     not be used to endorse or promote products derived from this software
24  *     without prior written permission. For written permission, please contact
25  *     Project JXTA at http://www.jxta.org.
26  *
27  *  5. Products derived from this software may not be called "JXTA", nor may
28  *     "JXTA" appear in their name, without prior written permission of Sun.
29  *
30  *  THIS SOFTWARE IS PROVIDED ``AS IS'' AND ANY EXPRESSED OR IMPLIED WARRANTIES,
31  *  INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND
32  *  FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL SUN
33  *  MICROSYSTEMS OR ITS CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
34  *  INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
35  *  LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA,
36  *  OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
37  *  LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
38  *  NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE,
39  *  EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
40  *
41  *  JXTA is a registered trademark of Sun Microsystems, Inc. in the United
42  *  States and other countries.
43  *
44  *  Please see the license information page at :
45  *  <http://www.jxta.org/project/www/license.html> for instructions on use of
46  *  the license in source files.
47  *
48  *  ====================================================================
49  *
50  *  This software consists of voluntary contributions made by many individuals
51  *  on behalf of Project JXTA. For more information on Project JXTA, please see
52  *  http://www.jxta.org.
53  *
54  *  This license is based on the BSD license adopted by the Apache Foundation.
55  */
56 package net.jxta.impl.cm;
57
58 import net.jxta.id.IDFactory;
59 import net.jxta.impl.util.TimeUtils;
60 import net.jxta.impl.xindice.core.DBException;
61 import net.jxta.impl.xindice.core.data.Key;
62 import net.jxta.impl.xindice.core.data.Record;
63 import net.jxta.impl.xindice.core.data.Value;
64 import net.jxta.impl.xindice.core.filer.BTreeCallback;
65 import net.jxta.impl.xindice.core.filer.BTreeFiler;
66 import net.jxta.impl.xindice.core.indexer.IndexQuery;
67 import net.jxta.impl.xindice.core.indexer.NameIndexer;
68 import net.jxta.logging.Logging;
69 import net.jxta.peer.PeerID;
70 import net.jxta.peergroup.PeerGroup;
71
72 import java.io.ByteArrayOutputStream;
73 import java.io.DataInputStream;
74 import java.io.DataOutputStream;
75 import java.io.EOFException;
76 import java.io.File;
77 import java.io.IOException;
78 import java.io.InputStream;
79 import java.lang.reflect.UndeclaredThrowableException;
80 import java.net.URI;
81 import java.net.URISyntaxException;
82 import java.util.ArrayList;
83 import java.util.Collections;
84 import java.util.HashMap;
85 import java.util.HashSet;
86 import java.util.Iterator;
87 import java.util.List;
88 import java.util.Map;
89 import java.util.Set;
90 import java.util.logging.Level;
91 import java.util.logging.Logger;
92
93 /**
94  * SrdiIndex
95  */
96 public class SrdiIndex implements Runnable {
97     
98     /**
99      * Logger
100      */
101     private final static transient Logger LOG = Logger.getLogger(SrdiIndex.class.getName());
102     
103     private long interval = 1000 * 60 * 10;
104     private volatile boolean stop = false;
105     private final Indexer srdiIndexer;
106     private final BTreeFiler cacheDB;
107     private Thread gcThread = null;
108     private final Set<PeerID> gcPeerTBL = new HashSet<PeerID>();
109     
110     private final String indexName;
111     
112     /**
113      * Constructor for the SrdiIndex
114      *
115      * @param group     group
116      * @param indexName the index name
117      */
118     public SrdiIndex(PeerGroup group, String indexName) {
119         this.indexName = indexName;
120         
121         try {
122             String pgdir = null;
123             File storeHome;
124             
125             if (group == null) {
126                 pgdir = "srdi-index";
127                 storeHome = new File(".jxta");
128             } else {
129                 pgdir = group.getPeerGroupID().getUniqueValue().toString();
130                 storeHome = new File(group.getStoreHome());
131             }
132             
133             File rootDir = new File(new File(storeHome, "cm"), pgdir);
134             
135             rootDir = new File(rootDir, "srdi");
136             if (!rootDir.exists()) {
137                 // We need to create the directory
138                 if (!rootDir.mkdirs()) {
139                     throw new RuntimeException("Cm cannot create directory " + rootDir);
140                 }
141             }
142             // peerid database
143             // Storage
144             cacheDB = new BTreeFiler();
145             // lazy checkpoint
146             cacheDB.setSync(false);
147             cacheDB.setLocation(rootDir.getCanonicalPath(), indexName);
148             
149             if (!cacheDB.open()) {
150                 cacheDB.create();
151                 // now open it
152                 cacheDB.open();
153             }
154             
155             // index
156             srdiIndexer = new Indexer(false);
157             srdiIndexer.setLocation(rootDir.getCanonicalPath(), indexName);
158             if (!srdiIndexer.open()) {
159                 srdiIndexer.create();
160                 // now open it
161                 srdiIndexer.open();
162             }
163             
164             if (Logging.SHOW_INFO && LOG.isLoggable(Level.INFO)) {
165                 LOG.info("[" + ((group == null) ? "none" : group.toString()) + "] : Initialized " + indexName);
166             }
167         } catch (DBException de) {
168             if (Logging.SHOW_SEVERE && LOG.isLoggable(Level.SEVERE)) {
169                 LOG.log(Level.SEVERE, "Unable to Initialize databases", de);
170             }
171             
172             throw new UndeclaredThrowableException(de, "Unable to Initialize databases");
173         } catch (Throwable e) {
174             if (Logging.SHOW_SEVERE && LOG.isLoggable(Level.SEVERE)) {
175                 LOG.log(Level.SEVERE, "Unable to create Cm", e);
176             }
177             
178             if (e instanceof Error) {
179                 throw (Error) e;
180             } else if (e instanceof RuntimeException) {
181                 throw (RuntimeException) e;
182             } else {
183                 throw new UndeclaredThrowableException(e, "Unable to create Cm");
184             }
185         }
186     }
187     
188     /**
189      * Construct a SrdiIndex and starts a GC thread which runs every "interval"
190      * milliseconds
191      *
192      * @param interval  the interval at which the gc will run in milliseconds
193      * @param group     group context
194      * @param indexName SrdiIndex name
195      */
196     
197     public SrdiIndex(PeerGroup group, String indexName, long interval) {
198         this(group, indexName);
199         this.interval = interval;
200         startGC(group, indexName, interval);
201     }
202     
203     /**
204      * Start the GC thread
205      *
206      * @param group the PeerGroup
207      * @param indexName index name
208      * @param interval interval in milliseconds
209      */
210     protected void startGC(PeerGroup group, String indexName, long interval) {
211         if (Logging.SHOW_INFO && LOG.isLoggable(Level.INFO)) {
212             LOG.info("[" + ((group == null) ? "none" : group.toString()) + "] : Starting SRDI GC Thread for " + indexName);
213         }
214         
215         gcThread = new Thread(group.getHomeThreadGroup(), this, "SrdiIndex GC :" + indexName + " every " + interval + "ms");
216         gcThread.setDaemon(true);
217         gcThread.start();
218     }
219     
220     /**
221      * Returns the name of this srdi index.
222      *
223      * @return index name.
224      */
225     public String getIndexName() {
226         return indexName;
227     }
228     
229     /**
230      * add an index entry
231      *
232      * @param primaryKey primary key
233      * @param attribute  Attribute String to query on
234      * @param value      value of the attribute string
235      * @param expiration expiration associated with this entry relative time in
236      *                   milliseconds
237      * @param pid        peerid reference
238      */
239     public synchronized void add(String primaryKey, String attribute, String value, PeerID pid, long expiration) {
240         
241         if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
242             LOG.fine("[" + indexName + "] Adding " + primaryKey + "/" + attribute + " = \'" + value + "\' for " + pid);
243         }
244         
245         try {
246             Key key = new Key(primaryKey + attribute + value);
247             long expiresin = TimeUtils.toAbsoluteTimeMillis(expiration);
248             
249             // update the record if it exists
250             synchronized (cacheDB) {
251                 // FIXME hamada 10/14/04 it is possible a peer re-appears with
252                 // a different set of indexes since it's been marked for garbage
253                 // collection.  will address this issue in a subsequent patch
254                 gcPeerTBL.remove(pid);
255                 
256                 Record record = cacheDB.readRecord(key);
257                 List<Entry> old;
258                 
259                 if (record != null) {
260                     old = readRecord(record).list;
261                 } else {
262                     old = new ArrayList<Entry>();
263                 }
264                 Entry entry = new Entry(pid, expiresin);
265                 
266                 if (!old.contains(entry)) {
267                     old.add(entry);
268                 } else {
269                     // entry exists, replace it (effectively updating expiration)
270                     old.remove(old.indexOf(entry));
271                     old.add(entry);
272                 }
273                 // no sense in keeping expired entries.
274                 old = removeExpired(old);
275                     long t0 = TimeUtils.timeNow();
276                 byte[] data = getData(key, old);
277                 
278                 // if (LOG.isLoggable(Level.FINE)) {
279                 // LOG.fine("Serialized result in : " + (TimeUtils.timeNow() - t0) + "ms.");
280                 // }
281                 if (data == null) {
282                     if (Logging.SHOW_SEVERE && LOG.isLoggable(Level.SEVERE)) {
283                         LOG.severe("Failed to serialize data");
284                     }
285                     return;
286                 }
287                 Value recordValue = new Value(data);
288                 long pos = cacheDB.writeRecord(key, recordValue);
289                 Map<String, String> indexables = getIndexMap(primaryKey + attribute, value);
290                 
291                 srdiIndexer.addToIndex(indexables, pos);
292             }
293         } catch (IOException de) {
294             if (Logging.SHOW_WARNING && LOG.isLoggable(Level.WARNING)) {
295                 LOG.log(Level.WARNING, "Failed to add SRDI", de);
296             }
297         } catch (DBException de) {
298             if (Logging.SHOW_WARNING && LOG.isLoggable(Level.WARNING)) {
299                 LOG.log(Level.WARNING, "Failed to add SRDI", de);
300             }
301         }
302     }
303     
304     /**
305      * retrieves a record
306      *
307      * @param pkey  primary key
308      * @param skey  secondary key
309      * @param value value
310      * @return List of Entry objects
311      */
312     public List<Entry> getRecord(String pkey, String skey, String value) {
313         Record record = null;
314         
315         try {
316             Key key = new Key(pkey + skey + value);
317             
318             synchronized (cacheDB) {
319                 record = cacheDB.readRecord(key);
320             }
321         } catch (DBException de) {
322             if (Logging.SHOW_WARNING && LOG.isLoggable(Level.WARNING)) {
323                 LOG.log(Level.WARNING, "Failed to retrieve SrdiIndex record", de);
324             }
325         }
326         // if record is null, readRecord returns an empty list
327         return readRecord(record).list;
328         
329     }
330     
331     /**
332      * inserts a pkey into a map with a value of value
333      *
334      * @param primaryKey primary key
335      * @param value      value
336      * @return The Map
337      */
338     
339     private Map<String, String> getIndexMap(String primaryKey, String value) {
340         if (primaryKey == null) {
341             return null;
342         }
343         if (value == null) {
344             value = "";
345         }
346         Map<String, String> map = new HashMap<String, String>(1);
347         
348         map.put(primaryKey, value);
349         return map;
350     }
351     
352     /**
353      * remove entries pointing to peer id from cache
354      *
355      * @param pid peer id to remove
356      */
357     public synchronized void remove(PeerID pid) {
358         if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
359             LOG.fine(" Adding " + pid + " to peer GC table");
360         }
361         gcPeerTBL.add(pid);
362     }
363     
364     /**
365      * Query SrdiIndex
366      *
367      * @param attribute Attribute String to query on
368      * @param value     value of the attribute string
369      * @return an enumeration of canonical paths
370      * @param primaryKey primary key
371      * @param threshold max number of results
372      */
373     public synchronized List<PeerID> query(String primaryKey, String attribute, String value, int threshold) {
374         
375         if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
376             LOG.fine("[" + indexName + "] Querying for " + threshold + " " + primaryKey + "/" + attribute + " = \'" + value + "\'");
377         }
378         
379         // return nothing
380         if (primaryKey == null) {
381             return Collections.emptyList();
382         }
383         
384         List<PeerID> res;
385         
386         // a blind query
387         if (attribute == null) {
388             res = query(primaryKey);
389         } else {
390             res = new ArrayList<PeerID>();
391             
392             IndexQuery iq = Cm.getIndexQuery(value);
393             
394             try {
395                 srdiIndexer.search(iq, primaryKey + attribute, new SearchCallback(cacheDB, res, threshold, gcPeerTBL));
396             } catch (Exception ex) {
397                 if (Logging.SHOW_WARNING && LOG.isLoggable(Level.WARNING)) {
398                     LOG.log(Level.WARNING, "Exception while searching in index", ex);
399                 }
400             }
401         }
402         
403         if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
404             LOG.fine( "[" + indexName + "] Returning " + res.size() + " results for " + primaryKey + "/" + attribute + " = \'"
405                     + value + "\'");
406         }
407         
408         return res;
409     }
410     
411     /**
412      * Query SrdiIndex
413      *
414      * @param primaryKey primary key
415      * @return A list of Peer IDs.
416      */
417     public synchronized List<PeerID> query(String primaryKey) {
418         if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
419             LOG.fine("[" + indexName + "] Querying for " + primaryKey);
420         }
421         
422         List<PeerID> res = new ArrayList<PeerID>();
423         
424         try {
425             Map<String, NameIndexer> map = srdiIndexer.getIndexers();
426             
427             for (Map.Entry<String, NameIndexer> index : map.entrySet()) {
428                 String indexName = index.getKey();
429                 // seperate the index name from attribute
430                 if (indexName.startsWith(primaryKey)) {
431                     NameIndexer idxr = index.getValue();
432                     idxr.query(null, new SearchCallback(cacheDB, res, Integer.MAX_VALUE, gcPeerTBL));
433                 }
434             }
435         } catch (Exception ex) {
436             if (Logging.SHOW_WARNING && LOG.isLoggable(Level.WARNING)) {
437                 LOG.log(Level.WARNING, "Exception while searching in index", ex);
438             }
439         }
440         
441         if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
442             LOG.fine("[" + indexName + "] Returning " + res.size() + " results for " + primaryKey);
443         }
444         
445         return res;
446     }
447     
448     private static final class SearchCallback implements BTreeCallback {
449         private final BTreeFiler cacheDB;
450         private final int threshold;
451         private final List<PeerID> results;
452         private final Set<PeerID> excludeTable;
453         
454         SearchCallback(BTreeFiler cacheDB, List<PeerID> results, int threshold, Set<PeerID> excludeTable) {
455             this.cacheDB = cacheDB;
456             this.threshold = threshold;
457             this.results = results;
458             this.excludeTable = excludeTable;
459         }
460         
461         /**
462          * @inheritDoc
463          */
464         public boolean indexInfo(Value val, long pos) {
465             
466             if (results.size() >= threshold) {
467                 if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
468                     LOG.fine("SearchCallback.indexInfo reached Threshold :" + threshold);
469                 }
470                 return false;
471             }
472             if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
473                 LOG.fine("Found " + val);
474             }
475             Record record = null;
476             
477             try {
478                 record = cacheDB.readRecord(pos);
479             } catch (DBException ex) {
480                 if (Logging.SHOW_WARNING && LOG.isLoggable(Level.WARNING)) {
481                     LOG.log(Level.WARNING, "Exception while reading indexed", ex);
482                 }
483                 return false;
484             }
485             
486             if (record != null) {
487                 long t0 = TimeUtils.timeNow();
488                 SrdiIndexRecord rec = readRecord(record);
489
490                 if (Logging.SHOW_FINEST && LOG.isLoggable(Level.FINEST)) {
491                     LOG.finest("Got result back in : " + (TimeUtils.timeNow() - t0) + "ms.");
492                 }
493
494                 copyIntoList(results, rec.list, excludeTable);
495             }
496             
497             return results.size() < threshold;
498         }
499     }
500     
501     
502     private static final class GcCallback implements BTreeCallback {
503         private final BTreeFiler cacheDB;
504         private final Indexer idxr;
505         private final List<Long> list;
506         private final Set<PeerID> table;
507         
508         GcCallback(BTreeFiler cacheDB, Indexer idxr, List<Long> list, Set<PeerID> table) {
509             this.cacheDB = cacheDB;
510             this.idxr = idxr;
511             this.list = list;
512             this.table = table;
513         }
514         
515         /**
516          * @inheritDoc
517          */
518         public boolean indexInfo(Value val, long pos) {
519             
520             Record record = null;
521             synchronized (cacheDB) {
522                 try {
523                     record = cacheDB.readRecord(pos);
524                 } catch (DBException ex) {
525                     if (Logging.SHOW_WARNING && LOG.isLoggable(Level.WARNING)) {
526                         LOG.log(Level.WARNING, "Exception while reading indexed", ex);
527                     }
528                     return false;
529                 }
530                 if (record == null) {
531                     return true;
532                 }
533                 SrdiIndexRecord rec = readRecord(record);
534                 List<Entry> res = rec.list;
535                 boolean changed = false;
536                 
537                 Iterator<Entry> eachEntry = res.iterator();
538                 while(eachEntry.hasNext()) {
539                     Entry entry = eachEntry.next();
540                     
541                     if (entry.isExpired() || table.contains(entry.peerid)) {
542                         changed = true;
543                         eachEntry.remove();
544                     }
545                 }
546                 if (changed) {
547                     if (res.isEmpty()) {
548                         try {
549                             cacheDB.deleteRecord(rec.key);
550                             list.add(pos);
551                         } catch (DBException e) {
552                             if (Logging.SHOW_WARNING && LOG.isLoggable(Level.WARNING)) {
553                                 LOG.log(Level.WARNING, "Exception while deleting empty record", e);
554                             }
555                         }                        
556                     } else {
557                         // write it back
558                         byte[] data = getData(rec.key, res);
559                         Value recordValue = new Value(data);
560                         
561                         try {
562                             cacheDB.writeRecord(pos, recordValue);
563                         } catch (DBException ex) {
564                             if (Logging.SHOW_WARNING && LOG.isLoggable(Level.WARNING)) {
565                                 LOG.log(Level.WARNING, "Exception while writing back record", ex);
566                             }
567                         }
568                     }
569                 }
570             }
571             return true;
572         }
573     }
574     
575     /**
576      * copies the content of List into a list. Expired entries are not
577      * copied
578      *
579      * @param to   list to copy into
580      * @param from list to copy from
581      * @param table table of PeerID's
582      */
583     private static void copyIntoList(List<PeerID> to, List<Entry> from, Set<PeerID> table) {
584         for (Entry entry : from) {
585             boolean expired = entry.isExpired();
586             
587             if (Logging.SHOW_FINER && LOG.isLoggable(Level.FINER)) {
588                 LOG.finer("Entry peerid : " + entry.peerid + (expired ? " EXPIRED " : (" Expires at : " + entry.expiration)));
589             }
590             
591             if (!to.contains(entry.peerid) && !expired) {
592                 if (!table.contains(entry.peerid)) {
593                     if (Logging.SHOW_FINER && LOG.isLoggable(Level.FINER)) {
594                         LOG.finer("adding Entry :" + entry.peerid + " to list");
595                     }
596                     to.add(entry.peerid);
597                 } else {
598                     if (Logging.SHOW_FINER && LOG.isLoggable(Level.FINER)) {
599                         LOG.finer("Skipping gc marked entry :" + entry.peerid);
600                     }
601                 }
602             } else {
603                 if (Logging.SHOW_FINER && LOG.isLoggable(Level.FINER)) {
604                     LOG.finer("Skipping expired Entry :" + entry.peerid);
605                 }
606             }
607         }
608     }
609     
610     /**
611      * Converts a List of {@link Entry} into a byte[]
612      *
613      * @param key  record key
614      * @param list List to convert
615      * @return byte []
616      */
617     private static byte[] getData(Key key, List<Entry> list) {
618         try {
619             ByteArrayOutputStream bos = new ByteArrayOutputStream();
620             DataOutputStream dos = new DataOutputStream(bos);
621             
622             dos.writeUTF(key.toString());
623             dos.writeInt(list.size());
624             for (Entry anEntry : list) {
625                 dos.writeUTF(anEntry.peerid.toString());
626                 dos.writeLong(anEntry.expiration);
627             }
628             dos.close();
629             return bos.toByteArray();
630         } catch (IOException ie) {
631             if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
632                 LOG.log(Level.FINE, "Exception while reading Entry", ie);
633             }
634         }
635         return null;
636     }
637     
638     /**
639      * Reads the content of a record into List
640      *
641      * @param record Btree Record
642      * @return List of entries
643      */
644     public static SrdiIndexRecord readRecord(Record record) {
645         List<Entry> result = new ArrayList<Entry>();
646         Key key = null;
647         
648         if (record == null) {
649             return new SrdiIndexRecord(null, result);
650         }
651         if (record.getValue().getLength() <= 0) {
652             return new SrdiIndexRecord(null, result);
653         }
654         InputStream is = record.getValue().getInputStream();
655         
656         try {
657             DataInputStream ois = new DataInputStream(is);
658             
659             key = new Key(ois.readUTF());
660             int size = ois.readInt();
661             
662             for (int i = 0; i < size; i++) {
663                 try {
664                     String idstr = ois.readUTF();
665                     PeerID pid = (PeerID) IDFactory.fromURI(new URI(idstr));
666                     long exp = ois.readLong();
667                     Entry entry = new Entry(pid, exp);
668                     
669                     result.add(entry);
670                 } catch (URISyntaxException badID) {
671                     // ignored
672                 }
673             }
674             ois.close();
675         } catch (EOFException eofe) {
676             if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
677                 LOG.log(Level.FINE, "Empty record", eofe);
678             }
679         } catch (IOException ie) {
680             if (Logging.SHOW_WARNING && LOG.isLoggable(Level.WARNING)) {
681                 LOG.log(Level.WARNING, "Exception while reading Entry", ie);
682             }
683         }
684         return new SrdiIndexRecord(key, result);
685     }
686     
687     /**
688      * Empties the index completely.
689      * The entries are abandoned to the GC.
690      */
691     public synchronized void clear() {
692         // FIXME changing the behavior a bit
693         // instead of dropping all srdi entries, we let them expire
694         // if that is not a desired behavior the indexer could be dropped
695         // simply close it, and remove all index db created
696         try {
697             srdiIndexer.close();
698             cacheDB.close();
699         } catch (Exception e) {
700             // bad bits we are done
701             if (Logging.SHOW_WARNING && LOG.isLoggable(Level.WARNING)) {
702                 LOG.log(Level.WARNING, "failed to close index", e);
703             }
704         }
705     }
706     
707     /**
708      * Garbage Collect expired entries
709      */
710     public void garbageCollect() {
711         try {
712             Map<String, NameIndexer> map = srdiIndexer.getIndexers();
713             
714             for(NameIndexer idxr : map.values()) {
715                 List<Long> list = new ArrayList<Long>();
716                 
717                 if(stop) {
718                     break;
719                 }
720                 
721                 synchronized(this) {
722                     idxr.query(null, new GcCallback(cacheDB, srdiIndexer, list, gcPeerTBL));
723                     srdiIndexer.purge(list);
724                 }
725             }
726             gcPeerTBL.clear();
727         } catch (Exception ex) {
728             if (Logging.SHOW_WARNING && LOG.isLoggable(Level.WARNING)) {
729                 LOG.log(Level.WARNING, "Failure during SRDI Garbage Collect", ex);
730             }
731         }
732     }
733     
734     /**
735      * Remove expired entries from a List
736      *
737      * @param list A list of entries.
738      * @return The same list with the expired entries removed.
739      */
740     private static List<Entry> removeExpired(List<Entry> list) {
741         Iterator<Entry> eachEntry = list.iterator();
742         
743         while(eachEntry.hasNext()) {
744             Entry entry = eachEntry.next();
745             
746             if (entry.isExpired()) {
747                 eachEntry.remove();
748                 if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
749                     LOG.fine("Removing expired Entry peerid :" + entry.peerid + " Expires at :" + entry.expiration);
750                 }
751             }
752         }
753         return list;
754     }
755     
756     private static boolean isExpired(long expiration) {
757         return (TimeUtils.timeNow() > expiration);
758     }
759     
760     /**
761      * stop the current running thread
762      */
763     public synchronized void stop() {
764         if(stop) {
765             return;
766         }
767         
768         stop = true;
769         
770         // wakeup and die
771         try {
772             Thread temp = gcThread;
773             if (temp != null) {
774                 synchronized (temp) {
775                     temp.notify();
776                 }
777             }
778         } catch (Exception ignored) {
779             // ignored
780         }
781         
782         // Stop the database
783         
784         try {
785             srdiIndexer.close();
786             cacheDB.close();
787             gcPeerTBL.clear();
788         } catch (Exception ex) {
789             if (Logging.SHOW_SEVERE && LOG.isLoggable(Level.SEVERE)) {
790                 LOG.log(Level.SEVERE, "Unable to stop the Srdi Indexer", ex);
791             }
792         }
793     }
794     
795     /**
796      * {@inheritDoc}
797      * <p/>
798      * Periodic thread for GC
799      */
800     public void run() {
801         try {
802             while (!stop) {
803                 try {
804                     if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
805                         LOG.fine("Waiting for " + interval + "ms before garbage collection");
806                     }
807                     synchronized (gcThread) {
808                         gcThread.wait(interval);
809                     }
810                 } catch (InterruptedException woken) {
811                     // The only reason we are woken is to stop.
812                     Thread.interrupted();
813                     continue;
814                 }
815                 
816                 if (stop) {
817                     break;
818                 }
819                 
820                 if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
821                     LOG.fine("Garbage collection started");
822                 }
823                 
824                 garbageCollect();
825                 
826                 if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
827                     LOG.fine("Garbage collection completed");
828                 }
829             }
830         } catch (Throwable all) {
831             if (Logging.SHOW_SEVERE && LOG.isLoggable(Level.SEVERE)) {
832                 LOG.log(Level.SEVERE, "Uncaught Throwable in thread :" + Thread.currentThread().getName(), all);
833             }
834         } finally {
835             synchronized (this) {
836                 gcThread = null;
837             }
838         }
839     }
840     
841     /**
842      * Flushes the Srdi directory for a specified group
843      * this method should only be called before initialization of a given group
844      * calling this method on a running group would have undefined results
845      *
846      * @param group group context
847      */
848     public static void clearSrdi(PeerGroup group) {
849         
850         if (Logging.SHOW_INFO && LOG.isLoggable(Level.INFO)) {
851             LOG.info("Clearing SRDI for " + group.getPeerGroupName());
852         }
853         
854         try {
855             String pgdir = null;
856             
857             if (group == null) {
858                 pgdir = "srdi-index";
859             } else {
860                 pgdir = group.getPeerGroupID().getUniqueValue().toString();
861             }
862             File rootDir = null;
863             
864             if (group != null) {
865                 rootDir = new File(new File(new File(group.getStoreHome()), "cm"), pgdir);
866             }
867             
868             rootDir = new File(rootDir, "srdi");
869             if (rootDir.exists()) {
870                 // remove it along with it's content
871                 String[] list = rootDir.list();
872                 
873                 for (String aList : list) {
874                     if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
875                         LOG.fine("Removing : " + aList);
876                     }
877                     File file = new File(rootDir, aList);
878                     
879                     if (!file.delete()) {
880                         if (Logging.SHOW_WARNING && LOG.isLoggable(Level.WARNING)) {
881                             LOG.warning("Unable to delete the file");
882                         }
883                     }
884                 }
885                 rootDir.delete();
886             }
887         } catch (Throwable t) {
888             if (Logging.SHOW_WARNING && LOG.isLoggable(Level.WARNING)) {
889                 LOG.log(Level.WARNING, "Unable to clear Srdi", t);
890             }
891         }
892     }
893     
894     /**
895      * An entry in the index tables.
896      */
897     public final static class Entry {
898         
899         public final PeerID peerid;
900         public final long expiration;
901         
902         /**
903          * Peer Pointer reference
904          *
905          * @param peerid     PeerID for this entry
906          * @param expiration the expiration for this entry
907          */
908         public Entry(PeerID peerid, long expiration) {
909             this.peerid = peerid;
910             this.expiration = expiration;
911         }
912         
913         /**
914          * {@inheritDoc}
915          */
916         @Override
917         public boolean equals(Object obj) {
918             return obj instanceof Entry && (peerid.equals(((Entry) obj).peerid));
919         }
920         
921         /**
922          * {@inheritDoc}
923          */
924         @Override
925         public int hashCode() {
926             return peerid.hashCode();
927         }
928         
929         /**
930          *  Return the absolute time in milliseconds at which this entry will
931          *  expire.
932          *
933          *  @return The absolute time in milliseconds at which this entry will
934          *  expire.
935          */
936         public long getExpiration() {
937             return expiration;
938         }
939         
940         /**
941          *  Return {@code true} if this entry is expired.
942          *
943          *  @return {@code true} if this entry is expired otherwise {@code false}.
944          */
945         public boolean isExpired() {
946             return TimeUtils.timeNow() > expiration;
947         }
948     }
949     
950     
951     /**
952      * an SrdiIndexRecord wrapper
953      */
954     public final static class SrdiIndexRecord {
955         
956         public final Key key;
957         public final List<Entry> list;
958         
959         /**
960          * SrdiIndex record
961          *
962          * @param key  record key
963          * @param list record entries
964          */
965         public SrdiIndexRecord(Key key,List<Entry> list) {
966             this.key = key;
967             this.list = list;
968         }
969         
970         /**
971          * {@inheritDoc}
972          */
973         @Override
974         public boolean equals(Object obj) {
975             return obj instanceof SrdiIndexRecord && (key.equals(((SrdiIndexRecord) obj).key));
976         }
977         
978         /**
979          * {@inheritDoc}
980          */
981         @Override
982         public int hashCode() {
983             return key.hashCode();
984         }
985     }
986 }
987