diff --git a/source/de/anomic/crawler/CrawlQueues.java b/source/de/anomic/crawler/CrawlQueues.java
index 24588f2f2..63343cceb 100644
--- a/source/de/anomic/crawler/CrawlQueues.java
+++ b/source/de/anomic/crawler/CrawlQueues.java
@@ -53,10 +53,10 @@ import de.anomic.yacy.dht.PeerSelection;
 
 public class CrawlQueues {
 
-    plasmaSwitchboard sb;
-    Log log;
-    Map workers; // mapping from url hash to Worker thread object
-    ProtocolLoader loader;
+    private plasmaSwitchboard sb;
+    private Log log;
+    private Map workers; // mapping from url hash to Worker thread object
+    private ProtocolLoader loader;
     private final ArrayList remoteCrawlProviderHashes;
 
     public NoticedURL noticeURL;
@@ -117,6 +117,14 @@ public class CrawlQueues {
         return null;
     }
     
+    public void cleanup() {
+        // wait for all workers to finish
+        int timeout = (int) sb.getConfigLong("crawler.clientTimeout", 10000);
+        for (final crawlWorker w: workers.values()) {
+            if (w.age() > timeout) w.interrupt();
+        }
+    }
+    
     public void clear() {
         // wait for all workers to finish
         for (final crawlWorker w: workers.values()) {
@@ -278,14 +286,21 @@ public class CrawlQueues {
             //log.logDebug("GlobalCrawl: queue is empty");
             return false;
         }
+        
         if (sb.webIndex.queuePreStack.size() >= (int) sb.getConfigLong(plasmaSwitchboardConstants.INDEXER_SLOTS, 30)) {
             if (this.log.isFine()) log.logFine(type + "Crawl: too many processes in indexing queue, dismissed (" + "sbQueueSize=" + sb.webIndex.queuePreStack.size() + ")");
             return false;
         }
+        if (this.size() >= sb.getConfigLong(plasmaSwitchboardConstants.CRAWLER_THREADS_ACTIVE_MAX, 10)) {
+            // try a cleanup
+            this.cleanup();
+        }
+        // check again
         if (this.size() >= sb.getConfigLong(plasmaSwitchboardConstants.CRAWLER_THREADS_ACTIVE_MAX, 10)) {
             if (this.log.isFine()) log.logFine(type + "Crawl: too many processes in loader queue, dismissed (" + "cacheLoader=" + this.size() + ")");
             return false;
         }
+        
         if (sb.onlineCaution()) {
             if (this.log.isFine()) log.logFine(type + "Crawl: online caution, omitting processing");
             return false;
@@ -311,6 +326,11 @@ public class CrawlQueues {
             return false;
         }
         
+        if (this.size() >= sb.getConfigLong(plasmaSwitchboardConstants.CRAWLER_THREADS_ACTIVE_MAX, 10)) {
+            // try a cleanup
+            cleanup();
+        }
+        // check again
         if (this.size() >= sb.getConfigLong(plasmaSwitchboardConstants.CRAWLER_THREADS_ACTIVE_MAX, 10)) {
             if (this.log.isFine()) log.logFine("remoteCrawlLoaderJob: too many processes in loader queue, dismissed (" + "cacheLoader=" + this.size() + ")");
             return false;
@@ -510,8 +530,10 @@ public class CrawlQueues {
         
         private CrawlEntry entry;
         private final Integer code;
+        private long start;
         
         public crawlWorker(final CrawlEntry entry) {
+            this.start = System.currentTimeMillis();
             this.entry = entry;
             this.entry.setStatus("worker-initialized", serverProcessorJob.STATUS_INITIATED);
             this.code = Integer.valueOf(entry.hashCode());
@@ -521,6 +543,10 @@ public class CrawlQueues {
             }
         }
         
+        public long age() {
+            return System.currentTimeMillis() - start;
+        }
+        
         public void run() {
             try {
                 // checking robots.txt for http(s) resources
diff --git a/source/de/anomic/index/indexContainerCache.java b/source/de/anomic/index/indexContainerCache.java
index 67a752efb..a73b293c8 100755
--- a/source/de/anomic/index/indexContainerCache.java
+++ b/source/de/anomic/index/indexContainerCache.java
@@ -135,7 +135,7 @@ public final class indexContainerCache implements Iterable, inde
     
     public void dump(final File heapFile) throws IOException {
         assert this.cache != null;
-        Log.logInfo("indexContainerRAMHeap", "creating alternative rwi heap dump '" + heapFile.getName() + "', " + cache.size() + " rwi's");
+        Log.logInfo("indexContainerRAMHeap", "creating rwi heap dump '" + heapFile.getName() + "', " + cache.size() + " rwi's");
         if (heapFile.exists()) heapFile.delete();
         final HeapWriter dump = new HeapWriter(heapFile, payloadrow.primaryKeyLength, Base64Order.enhancedCoder);
         final long startTime = System.currentTimeMillis();