- better synchronization in SortStack

- better ThreadGroup organization
- less worker threads for media search (64 was too much...)


git-svn-id: https://svn.berlios.de/svnroot/repos/yacy/trunk@6497 6c8d7289-2bf4-0310-a012-ef5d649a1542
pull/1/head
orbiter 16 years ago
parent 7b1f5b0430
commit 23aef43786

@ -215,7 +215,7 @@ public class yacysearchitem {
if (s.length() <= length) return s; if (s.length() <= length) return s;
final int p = s.lastIndexOf('.'); final int p = s.lastIndexOf('.');
if (p < 0) return s.substring(0, length - 3) + "..."; if (p < 0) return s.substring(0, length - 3) + "...";
return s.substring(0, length - (s.length() - p) - 3) + "..." + s.substring(p); return s.substring(0, length - (s.length() - p) - 3) + "..." + s.substring(p); // TODO check oob
} }
private static String sizename(int size) { private static String sizename(int size) {

@ -58,6 +58,8 @@ public class DocumentIndex extends Segment {
private Worker[] worker; private Worker[] worker;
private CallbackListener callback; private CallbackListener callback;
private static final ThreadGroup workerThreadGroup = new ThreadGroup("workerThreadGroup");
public DocumentIndex(Log log, final File segmentPath, CallbackListener callback, int cachesize) throws IOException { public DocumentIndex(Log log, final File segmentPath, CallbackListener callback, int cachesize) throws IOException {
super(log, segmentPath, cachesize, targetFileSize * 4 - 1, false, false); super(log, segmentPath, cachesize, targetFileSize * 4 - 1, false, false);
int cores = Runtime.getRuntime().availableProcessors() + 1; int cores = Runtime.getRuntime().availableProcessors() + 1;
@ -65,7 +67,7 @@ public class DocumentIndex extends Segment {
this.queue = new LinkedBlockingQueue<File>(cores * 300); this.queue = new LinkedBlockingQueue<File>(cores * 300);
this.worker = new Worker[cores]; this.worker = new Worker[cores];
for (int i = 0; i < cores; i++) { for (int i = 0; i < cores; i++) {
this.worker[i] = new Worker(); this.worker[i] = new Worker(i);
this.worker[i].start(); this.worker[i].start();
} }
} }
@ -75,6 +77,10 @@ public class DocumentIndex extends Segment {
} }
class Worker extends Thread { class Worker extends Thread {
public Worker(int count) {
super(workerThreadGroup, "query-" + count);
}
public void run() { public void run() {
File f; File f;
URIMetadataRow resultRow; URIMetadataRow resultRow;

@ -245,6 +245,7 @@ public final class RankingProcess extends Thread {
// kick out entries that are too bad according to current findings // kick out entries that are too bad according to current findings
r = Long.valueOf(order.cardinal(fEntry)); r = Long.valueOf(order.cardinal(fEntry));
assert maxentries != 0;
if ((maxentries >= 0) && (stack.size() >= maxentries) && (stack.bottom(r.longValue()))) continue; if ((maxentries >= 0) && (stack.size() >= maxentries) && (stack.bottom(r.longValue()))) continue;
// insert // insert

@ -97,7 +97,7 @@ public class ResultFetcher {
// start worker threads to fetch urls and snippets // start worker threads to fetch urls and snippets
this.workerThreads = null; this.workerThreads = null;
deployWorker(query.itemsPerPage, query.neededResults()); deployWorker(Math.min(10, query.itemsPerPage), query.neededResults());
MemoryTracker.update("SEARCH", new ProfilingGraph.searchEvent(query.id(true), this.workerThreads.length + " online snippet fetch threads started", 0, 0), false); MemoryTracker.update("SEARCH", new ProfilingGraph.searchEvent(query.id(true), this.workerThreads.length + " online snippet fetch threads started", 0, 0), false);
} }
@ -302,7 +302,7 @@ public class ResultFetcher {
(this.rankedCache.size() > this.result.size()) (this.rankedCache.size() > this.result.size())
) { ) {
// start worker threads to fetch urls and snippets // start worker threads to fetch urls and snippets
deployWorker(query.itemsPerPage, query.neededResults()); deployWorker(Math.min(10, query.itemsPerPage), query.neededResults());
} }
// finally wait until enough results are there produced from the // finally wait until enough results are there produced from the

@ -443,7 +443,7 @@ public final class serverCore extends AbstractBusyThread implements BusyThread {
public List<Session> getJobList() { public List<Session> getJobList() {
final Thread[] threadList = new Thread[sessionThreadGroup.activeCount()]; final Thread[] threadList = new Thread[sessionThreadGroup.activeCount()];
serverCore.sessionThreadGroup.enumerate(threadList); serverCore.sessionThreadGroup.enumerate(threadList, false);
ArrayList<Session> l = new ArrayList<Session>(); ArrayList<Session> l = new ArrayList<Session>();
for (Thread t: threadList) { for (Thread t: threadList) {
if (t == null) continue; if (t == null) continue;

@ -45,6 +45,8 @@ import de.anomic.yacy.dht.PeerSelection;
public class yacySearch extends Thread { public class yacySearch extends Thread {
private static final ThreadGroup ysThreadGroup = new ThreadGroup("yacySearchThreadGroup");
final private String wordhashes, excludehashes, urlhashes, sitehash, authorhash; final private String wordhashes, excludehashes, urlhashes, sitehash, authorhash;
final private boolean global; final private boolean global;
final private int partitions; final private int partitions;
@ -79,7 +81,7 @@ public class yacySearch extends Thread {
final Blacklist blacklist, final Blacklist blacklist,
final RankingProfile rankingProfile, final RankingProfile rankingProfile,
final Bitfield constraint) { final Bitfield constraint) {
super("yacySearch_" + targetPeer.getName()); super(ysThreadGroup, "yacySearch_" + targetPeer.getName());
//System.out.println("DEBUG - yacySearch thread " + this.getName() + " initialized " + ((urlhashes.length() == 0) ? "(primary)" : "(secondary)")); //System.out.println("DEBUG - yacySearch thread " + this.getName() + " initialized " + ((urlhashes.length() == 0) ? "(primary)" : "(secondary)"));
assert wordhashes.length() >= 12; assert wordhashes.length() >= 12;
this.wordhashes = wordhashes; this.wordhashes = wordhashes;

@ -26,12 +26,12 @@
package net.yacy.kelondro.util; package net.yacy.kelondro.util;
import java.util.HashSet;
import java.util.Iterator; import java.util.Iterator;
import java.util.LinkedList; import java.util.LinkedList;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.TreeMap; import java.util.TreeMap;
import java.util.concurrent.ConcurrentHashMap;
public class SortStack<E> { public class SortStack<E> {
@ -39,15 +39,16 @@ public class SortStack<E> {
// objects pushed on the stack must implement the hashCode() method to provide a handle // objects pushed on the stack must implement the hashCode() method to provide a handle
// for a double-check. // for a double-check.
private static final Object PRESENT = new Object(); // Dummy value to associate with an Object in the backing Map
private TreeMap<Long, List<E>> onstack; // object within the stack private TreeMap<Long, List<E>> onstack; // object within the stack
private HashSet<E> instack; // keeps track which element has been on the stack private ConcurrentHashMap<E, Object> instack; // keeps track which element has been on the stack
protected int maxsize; protected int maxsize;
public SortStack(final int maxsize) { public SortStack(final int maxsize) {
// the maxsize is the maximum number of entries in the stack // the maxsize is the maximum number of entries in the stack
// if this is set to -1, the size is unlimited // if this is set to -1, the size is unlimited
this.onstack = new TreeMap<Long, List<E>>(); this.onstack = new TreeMap<Long, List<E>>();
this.instack = new HashSet<E>(); this.instack = new ConcurrentHashMap<E, Object>();
this.maxsize = maxsize; this.maxsize = maxsize;
} }
@ -61,7 +62,7 @@ public class SortStack<E> {
* @param weight * @param weight
*/ */
public void push(final E element, Long weight) { public void push(final E element, Long weight) {
if (!this.instack.add(element)) return; if (this.instack.put(element, PRESENT) != null) return;
// put the element on the stack // put the element on the stack
synchronized (this.onstack) { synchronized (this.onstack) {
@ -147,6 +148,7 @@ public class SortStack<E> {
public boolean bottom(final long weight) { public boolean bottom(final long weight) {
// returns true if the element with that weight would be on the bottom of the stack after inserting // returns true if the element with that weight would be on the bottom of the stack after inserting
if (this.onstack.size() == 0) return true;
Long l; Long l;
synchronized (this.onstack) { synchronized (this.onstack) {
l = this.onstack.lastKey(); l = this.onstack.lastKey();

@ -27,8 +27,8 @@
package net.yacy.kelondro.util; package net.yacy.kelondro.util;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.HashSet;
import java.util.Iterator; import java.util.Iterator;
import java.util.concurrent.ConcurrentHashMap;
/** /**
* extends the sortStack in such a way that it adds a list where objects, that had * extends the sortStack in such a way that it adds a list where objects, that had
@ -38,8 +38,9 @@ import java.util.Iterator;
*/ */
public class SortStore<E> extends SortStack<E> { public class SortStore<E> extends SortStack<E> {
private static final Object PRESENT = new Object(); // Dummy value to associate with an Object in the backing Map
private final ArrayList<stackElement> offstack; // objects that had been on the stack but had been removed private final ArrayList<stackElement> offstack; // objects that had been on the stack but had been removed
private HashSet<E> offset; // keeps track which element has been on the stack or is now in the offstack private ConcurrentHashMap<E, Object> offset; // keeps track which element has been on the stack or is now in the offstack
private long largest; private long largest;
public SortStore() { public SortStore() {
@ -50,7 +51,7 @@ public class SortStore<E> extends SortStack<E> {
super(maxsize); super(maxsize);
this.largest = Long.MIN_VALUE; this.largest = Long.MIN_VALUE;
this.offstack = new ArrayList<stackElement>(); this.offstack = new ArrayList<stackElement>();
this.offset = new HashSet<E>(); this.offset = new ConcurrentHashMap<E, Object>();
} }
public int size() { public int size() {
@ -61,8 +62,9 @@ public class SortStore<E> extends SortStack<E> {
return this.offstack.size(); return this.offstack.size();
} }
public synchronized void push(final E element, final Long weight) { public void push(final E element, final Long weight) {
if (this.offset.contains(element)) return; if (this.offset.containsKey(element)) return;
if (super.exists(element)) return;
super.push(element, weight); super.push(element, weight);
this.largest = Math.max(this.largest, weight.longValue()); this.largest = Math.max(this.largest, weight.longValue());
if (this.maxsize <= 0) return; if (this.maxsize <= 0) return;
@ -76,21 +78,20 @@ public class SortStore<E> extends SortStack<E> {
* it is removed and added to the offstack list * it is removed and added to the offstack list
* this is exactly the same as element(offstack.size()) * this is exactly the same as element(offstack.size())
*/ */
public synchronized stackElement pop() { public stackElement pop() {
final stackElement se = super.pop(); final stackElement se = super.pop();
if (se == null) return null; if (se == null) return null;
this.offset.put(se.element, PRESENT);
this.offstack.add(se); this.offstack.add(se);
this.offset.add(se.element);
return se; return se;
} }
public synchronized stackElement top() { public stackElement top() {
return super.top(); return super.top();
} }
public synchronized boolean exists(final E element) { public boolean exists(final E element) {
if (super.exists(element)) return true; return super.exists(element) || this.offset.containsKey(element);
return this.offset.contains(element);
} }
/** /**
@ -100,11 +101,11 @@ public class SortStore<E> extends SortStack<E> {
* @param position * @param position
* @return * @return
*/ */
public synchronized stackElement element(final int position) { public stackElement element(final int position) {
if (position < this.offstack.size()) { if (position < this.offstack.size()) {
return this.offstack.get(position); return this.offstack.get(position);
} }
if (position >= size()) return null; // we don't have that element if (position >= super.size() + this.offstack.size()) return null; // we don't have that element
while (position >= this.offstack.size()) this.pop(); while (position >= this.offstack.size()) this.pop();
return this.offstack.get(position); return this.offstack.get(position);
} }
@ -116,19 +117,20 @@ public class SortStore<E> extends SortStack<E> {
* @param count * @param count
* @return * @return
*/ */
public synchronized ArrayList<stackElement> list(final int count) { public ArrayList<stackElement> list(final int count) {
if (count < 0) { if (count < 0) {
// shift all elements // shift all elements
while (super.size() > 0) this.offstack.add(this.pop()); while (super.size() > 0) this.pop();
return this.offstack; return this.offstack;
} }
if (size() < count) throw new RuntimeException("list(" + count + ") exceeded avaiable number of elements (" + size() + ")"); if (count > super.size() + this.offstack.size()) throw new RuntimeException("list(" + count + ") exceeded avaiable number of elements (" + size() + ")");
while (this.offstack.size() < count) this.offstack.add(this.pop()); while (count > this.offstack.size()) this.pop();
return this.offstack; return this.offstack;
} }
public synchronized void remove(final E element) { public void remove(final E element) {
super.remove(element); super.remove(element);
synchronized (this.offstack) {
Iterator<stackElement> i = this.offstack.iterator(); Iterator<stackElement> i = this.offstack.iterator();
while (i.hasNext()) { while (i.hasNext()) {
if (i.next().element.equals(element)) { if (i.next().element.equals(element)) {
@ -137,6 +139,7 @@ public class SortStore<E> extends SortStack<E> {
} }
} }
} }
}
public synchronized boolean bottom(final long weight) { public synchronized boolean bottom(final long weight) {
if (super.bottom(weight)) return true; if (super.bottom(weight)) return true;
@ -150,12 +153,6 @@ public class SortStore<E> extends SortStack<E> {
a.push("abc", 2L); a.push("abc", 2L);
a.push("6s_7dfZk4xvc", 1L); a.push("6s_7dfZk4xvc", 1L);
a.push("6s_7dfZk4xvc", 1L); a.push("6s_7dfZk4xvc", 1L);
a.push("6s_7dfZk4xvc", 1L);
a.push("6s_7dfZk4xvc", 1L);
a.push("6s_7dfZk4xvc", 1L);
a.push("6s_7dfZk4xvc", 1L);
a.push("6s_7dfZk4xvc", 1L);
a.push("6s_7dfZk4xvc", 1L);
a.pop(); a.pop();
System.out.println("size = " + a.size()); System.out.println("size = " + a.size());
} }

Loading…
Cancel
Save