diff --git a/source/net/yacy/crawler/HostBalancer.java b/source/net/yacy/crawler/HostBalancer.java index 6eaa377b8..be7fee676 100644 --- a/source/net/yacy/crawler/HostBalancer.java +++ b/source/net/yacy/crawler/HostBalancer.java @@ -65,7 +65,7 @@ public class HostBalancer implements Balancer { private final static ConcurrentLog log = new ConcurrentLog("HostBalancer"); public final static HandleMap depthCache = new RowHandleMap(Word.commonHashLength, Word.commonHashOrder, 2, 8 * 1024 * 1024, "HostBalancer.DepthCache"); - + private final File hostsPath; private final boolean exceed134217727; private final Map queues; @@ -84,7 +84,7 @@ public class HostBalancer implements Balancer { final boolean exceed134217727) { this(hostsPath, onDemandLimit, exceed134217727, true); } - + /** * Create a new instance and fills the queue by scanning the hostsPath directory. * @param hostsPath @@ -100,7 +100,7 @@ public class HostBalancer implements Balancer { this.hostsPath = hostsPath; this.onDemandLimit = onDemandLimit; this.exceed134217727 = exceed134217727; - + // create a stack for newly entered entries if (!(hostsPath.exists())) hostsPath.mkdirs(); // make the path this.queues = new ConcurrentHashMap(); @@ -114,7 +114,7 @@ public class HostBalancer implements Balancer { * return immediately (as large unfinished crawls may take longer to load) */ private void init(final boolean async) { - if(async) { + if(async) { Thread t = new Thread("HostBalancer.init") { @Override public void run() { @@ -122,10 +122,10 @@ public class HostBalancer implements Balancer { } }; - t.start(); - } else { - runInit(); - } + t.start(); + } else { + runInit(); + } } /** @@ -185,7 +185,7 @@ public class HostBalancer implements Balancer { } return c; } - + /** * delete all urls which are stored for given host hashes * @param hosthashes @@ -230,11 +230,11 @@ public class HostBalancer implements Balancer { return c; } - /** - * @return true when the URL is queued is this or any other HostBalancer - * instance (as {@link #depthCache} is shared between all HostBalancer - * instances) - */ + /** + * @return true when the URL is queued is this or any other HostBalancer + * instance (as {@link #depthCache} is shared between all HostBalancer + * instances) + */ @Override public boolean has(final byte[] urlhashb) { if (depthCache.has(urlhashb)) return true; @@ -313,7 +313,7 @@ public class HostBalancer implements Balancer { tryagain: while (true) try { HostQueue rhq = null; String rhh = null; - + synchronized (this) { if (this.roundRobinHostHashes.size() == 0) { // refresh the round-robin cache @@ -331,14 +331,21 @@ public class HostBalancer implements Balancer { if (size <= 10) {smallStacksExist = true; break smallsearch;} } } - if (singletonStacksExist || smallStacksExist) { - Iterator i = this.roundRobinHostHashes.iterator(); - smallstacks: while (i.hasNext()) { - if (this.roundRobinHostHashes.size() <= 10) break smallstacks; // don't shrink the hosts until nothing is left - String s = i.next(); - HostQueue hq = this.queues.get(s); - if (hq == null) {i.remove(); continue smallstacks;} - int delta = Latency.waitingRemainingGuessed(hq.getHost(), hq.getPort(), s, robots, ClientIdentification.yacyInternetCrawlerAgent); + Set freshhosts = new HashSet<>(); + Iterator i = this.roundRobinHostHashes.iterator(); + smallstacks: while (i.hasNext()) { + if (this.roundRobinHostHashes.size() <= 10) break smallstacks; // don't shrink the hosts until nothing is left + String hosthash = i.next(); + HostQueue hq = this.queues.get(hosthash); + if (hq == null) {i.remove(); continue smallstacks;} + int delta = Latency.waitingRemainingGuessed(hq.getHost(), hq.getPort(), hosthash, robots, ClientIdentification.yacyInternetCrawlerAgent); + if (delta == Integer.MIN_VALUE) { + // never-crawled hosts; we do not want to have too many of them in here. Loading new hosts means: waiting for robots.txt to load + freshhosts.add(hosthash); + i.remove(); + continue smallstacks; + } + if (singletonStacksExist || smallStacksExist) { if (delta < 0) continue; // keep all non-waiting stacks; they are useful to speed up things // to protect all small stacks which have a fast throughput, remove all with long waiting time if (delta >= 1000) {i.remove(); continue smallstacks;} @@ -350,6 +357,10 @@ public class HostBalancer implements Balancer { } } } + // put at least one of the fresh hosts back + if (freshhosts.size() > 0) this.roundRobinHostHashes.add(freshhosts.iterator().next()); + + // result if (this.roundRobinHostHashes.size() == 1) { if (log.isFine()) log.fine("(re-)initialized the round-robin queue with one host"); } else { @@ -357,13 +368,13 @@ public class HostBalancer implements Balancer { } } if (this.roundRobinHostHashes.size() == 0) return null; - + // if the queue size is 1, just take that if (this.roundRobinHostHashes.size() == 1) { rhh = this.roundRobinHostHashes.iterator().next(); rhq = this.queues.get(rhh); } - + if (rhq == null) { // mixed minimum sleep time / largest queue strategy: // create a map of sleep time / queue relations with a fuzzy sleep time (ms / 500). @@ -449,7 +460,7 @@ public class HostBalancer implements Balancer { } */ } - + if (rhq == null) { this.roundRobinHostHashes.clear(); // force re-initialization continue tryagain; @@ -458,7 +469,7 @@ public class HostBalancer implements Balancer { long timestamp = System.currentTimeMillis(); Request request = rhq.pop(delay, cs, robots); // this pop is outside of synchronization to prevent blocking of pushes long actualwaiting = System.currentTimeMillis() - timestamp; - + if (actualwaiting > 1000) { synchronized (this) { // to prevent that this occurs again, remove all stacks with positive delay times (which may be less after that waiting) @@ -473,7 +484,7 @@ public class HostBalancer implements Balancer { } } } - + if (rhq.isEmpty()) { synchronized (this) { this.queues.remove(rhh); @@ -545,7 +556,7 @@ public class HostBalancer implements Balancer { @Override public List getDomainStackReferences(String host, int maxcount, long maxtime) { if (host == null) { - return Collections.emptyList(); + return Collections.emptyList(); } try { HostQueue hq = this.queues.get(DigestURL.hosthash(host, host.startsWith("ftp.") ? 21 : 80));