Attempt to fix a deadlock situation where the IODispatcher did not work.

I suspect the dispatcher thread has crashed and queues filled so no indexing process was able to write data.
This fix tries to heal the problem, but I am unsure if it helps. To get a better view of the problem, some more log outputs have been inserted.
Also added a new attribute, indexer.threads, to control the default number of threads for the indexer (default is 1).


git-svn-id: https://svn.berlios.de/svnroot/repos/yacy/trunk@5866 6c8d7289-2bf4-0310-a012-ef5d649a1542
pull/1/head
orbiter 16 years ago
parent 09987e93fd
commit c10c257255

@ -633,8 +633,7 @@ javastart_priority=10
# wordCacheMaxLow/High is the number of word indexes that shall be held in the # wordCacheMaxLow/High is the number of word indexes that shall be held in the
# ram cache during indexing. When YaCy is shut down, this cache must be # ram cache during indexing. When YaCy is shut down, this cache must be
# flushed to disc; this may last some minutes. # flushed to disc; this may last some minutes.
wordCacheMaxCount = 30000 wordCacheMaxCount = 100000
wordCacheMaxCount__pro = 100000
# Specifies if yacy can be used as transparent http proxy. # Specifies if yacy can be used as transparent http proxy.
# #
@ -684,26 +683,23 @@ crawler.http.acceptEncoding=gzip
crawler.http.acceptLanguage=en-us,en;q=0.5 crawler.http.acceptLanguage=en-us,en;q=0.5
crawler.http.acceptCharset=ISO-8859-1,utf-8;q=0.7,*;q=0.7 crawler.http.acceptCharset=ISO-8859-1,utf-8;q=0.7,*;q=0.7
crawler.http.maxFileSize=262144 crawler.http.maxFileSize=262144
crawler.http.maxFileSize__pro=262144
# ftp crawler specific settings; size in bytes # ftp crawler specific settings; size in bytes
crawler.ftp.maxFileSize=262144 crawler.ftp.maxFileSize=262144
crawler.ftp.maxFileSize__pro=262144
# maximum number of crawler threads # maximum number of crawler threads
crawler.MaxActiveThreads = 30 crawler.MaxActiveThreads = 30
# maximum size of indexing queue # maximum size of indexing queue
indexer.slots = 40 indexer.slots = 40
indexer.slots__pro = 80 indexer.threads = 1
# maximum size of stacker queue # maximum size of stacker queue
stacker.slots = 2000 stacker.slots = 2000
# specifies if yacy should set it's own referer if no referer URL # specifies if yacy should set it's own referer if no referer URL
# was set by the client. # was set by the client.
useYacyReferer = true useYacyReferer = false
useYacyReferer__pro = false
# allow only 443(https-port) for https-proxy? # allow only 443(https-port) for https-proxy?
# if you want to tunnel other protocols, set to false # if you want to tunnel other protocols, set to false

@ -30,6 +30,7 @@ import java.util.concurrent.ArrayBlockingQueue;
import de.anomic.kelondro.blob.BLOBArray; import de.anomic.kelondro.blob.BLOBArray;
import de.anomic.kelondro.index.Row; import de.anomic.kelondro.index.Row;
import de.anomic.kelondro.util.Log;
/** /**
* this is a concurrent merger that can merge single files that are queued for merging. * this is a concurrent merger that can merge single files that are queued for merging.
@ -63,7 +64,7 @@ public class IODispatcher <ReferenceType extends Reference> extends Thread {
} }
public synchronized void terminate() { public synchronized void terminate() {
if (termQueue != null && this.isAlive()) { if (termQueue != null && controlQueue != null && this.isAlive()) {
try { try {
controlQueue.put(poison); controlQueue.put(poison);
} catch (InterruptedException e) { } catch (InterruptedException e) {
@ -79,7 +80,7 @@ public class IODispatcher <ReferenceType extends Reference> extends Thread {
} }
public synchronized void dump(ReferenceContainerCache<ReferenceType> cache, File file, ReferenceContainerArray<ReferenceType> array) { public synchronized void dump(ReferenceContainerCache<ReferenceType> cache, File file, ReferenceContainerArray<ReferenceType> array) {
if (dumpQueue == null || !this.isAlive()) { if (dumpQueue == null || controlQueue == null || !this.isAlive()) {
cache.dump(file); cache.dump(file);
} else { } else {
DumpJob job = new DumpJob(cache, file, array); DumpJob job = new DumpJob(cache, file, array);
@ -94,11 +95,11 @@ public class IODispatcher <ReferenceType extends Reference> extends Thread {
} }
public synchronized int queueLength() { public synchronized int queueLength() {
return controlQueue.size(); return (controlQueue == null) ? 0 : controlQueue.size();
} }
public synchronized void merge(File f1, File f2, BLOBArray array, Row payloadrow, File newFile) { public synchronized void merge(File f1, File f2, BLOBArray array, Row payloadrow, File newFile) {
if (mergeQueue == null || !this.isAlive()) { if (mergeQueue == null || controlQueue == null || !this.isAlive()) {
try { try {
array.mergeMount(f1, f2, factory, payloadrow, newFile); array.mergeMount(f1, f2, factory, payloadrow, newFile);
} catch (IOException e) { } catch (IOException e) {
@ -127,21 +128,34 @@ public class IODispatcher <ReferenceType extends Reference> extends Thread {
loop: while (controlQueue.take() != poison) { loop: while (controlQueue.take() != poison) {
// prefer dump actions to flush memory to disc // prefer dump actions to flush memory to disc
if (dumpQueue.size() > 0) { if (dumpQueue.size() > 0) {
try {
dumpJob = dumpQueue.take(); dumpJob = dumpQueue.take();
dumpJob.dump(); dumpJob.dump();
} catch (InterruptedException e) {
e.printStackTrace();
Log.logSevere("IODispatcher", "main run job was interrupted (1)", e);
}
continue loop; continue loop;
} }
// otherwise do a merge operation // otherwise do a merge operation
if (mergeQueue.size() > 0) { if (mergeQueue.size() > 0) {
try {
mergeJob = mergeQueue.take(); mergeJob = mergeQueue.take();
mergeJob.merge(); mergeJob.merge();
} catch (InterruptedException e) {
e.printStackTrace();
Log.logSevere("IODispatcher", "main run job was interrupted (2)", e);
}
continue loop; continue loop;
} }
assert false; // this should never happen assert false; // this should never happen
} }
} catch (InterruptedException e) { } catch (InterruptedException e) {
e.printStackTrace(); e.printStackTrace();
Log.logSevere("IODispatcher", "main run job was interrupted (3)", e);
} finally { } finally {
Log.logInfo("IODispatcher", "terminating run job");
controlQueue = null;
try { try {
termQueue.put(poison); termQueue.put(poison);
} catch (InterruptedException e) { } catch (InterruptedException e) {

@ -466,12 +466,13 @@ public final class ReferenceContainerCache<ReferenceType extends Reference> exte
return 0; return 0;
} }
public synchronized void add(final ReferenceContainer<ReferenceType> container) { public void add(final ReferenceContainer<ReferenceType> container) {
// this puts the entries into the cache // this puts the entries into the cache
if (this.cache == null || container == null || container.size() == 0) return; if (this.cache == null || container == null || container.size() == 0) return;
// put new words into cache // put new words into cache
ByteArray tha = new ByteArray(container.getTermHash()); ByteArray tha = new ByteArray(container.getTermHash());
synchronized (this) {
ReferenceContainer<ReferenceType> entries = cache.get(tha); // null pointer exception? wordhash != null! must be cache==null ReferenceContainer<ReferenceType> entries = cache.get(tha); // null pointer exception? wordhash != null! must be cache==null
int added = 0; int added = 0;
if (entries == null) { if (entries == null) {
@ -486,15 +487,18 @@ public final class ReferenceContainerCache<ReferenceType extends Reference> exte
entries = null; entries = null;
return; return;
} }
}
public synchronized void add(final byte[] termHash, final ReferenceType newEntry) { public void add(final byte[] termHash, final ReferenceType newEntry) {
assert this.cache != null; assert this.cache != null;
ByteArray tha = new ByteArray(termHash); ByteArray tha = new ByteArray(termHash);
synchronized (this) {
ReferenceContainer<ReferenceType> container = cache.get(tha); ReferenceContainer<ReferenceType> container = cache.get(tha);
if (container == null) container = new ReferenceContainer<ReferenceType>(factory, termHash, this.payloadrow, 1); if (container == null) container = new ReferenceContainer<ReferenceType>(factory, termHash, this.payloadrow, 1);
container.put(newEntry); container.put(newEntry);
cache.put(tha, container); cache.put(tha, container);
} }
}
public int minMem() { public int minMem() {
return 0; return 0;

@ -596,11 +596,12 @@ public final class plasmaSwitchboard extends serverAbstractSwitch<IndexingStack.
this.clusterhashes = this.webIndex.peers().clusterHashes(getConfig("cluster.peers.yacydomain", "")); this.clusterhashes = this.webIndex.peers().clusterHashes(getConfig("cluster.peers.yacydomain", ""));
// deploy blocking threads // deploy blocking threads
int indexerThreads = (int) this.getConfigLong(plasmaSwitchboardConstants.INDEXER_THREADS, 1);
indexingStorageProcessor = new serverProcessor<indexingQueueEntry>( indexingStorageProcessor = new serverProcessor<indexingQueueEntry>(
"storeDocumentIndex", "storeDocumentIndex",
"This is the sequencing step of the indexing queue: no concurrency is wanted here, because the access of the indexer works better if it is not concurrent. Files are written as streams, councurrency would destroy IO performance. In this process the words are written to the RWI cache, which flushes if it is full.", "This is the sequencing step of the indexing queue: no concurrency is wanted here, because the access of the indexer works better if it is not concurrent. Files are written as streams, councurrency would destroy IO performance. In this process the words are written to the RWI cache, which flushes if it is full.",
new String[]{"RWI/Cache/Collections"}, new String[]{"RWI/Cache/Collections"},
this, "storeDocumentIndex", serverProcessor.useCPU + 40, null, 1); this, "storeDocumentIndex", serverProcessor.useCPU + 40, null, indexerThreads);
indexingAnalysisProcessor = new serverProcessor<indexingQueueEntry>( indexingAnalysisProcessor = new serverProcessor<indexingQueueEntry>(
"webStructureAnalysis", "webStructureAnalysis",
"This just stores the link structure of the document into a web structure database.", "This just stores the link structure of the document into a web structure database.",

@ -123,6 +123,7 @@ public final class plasmaSwitchboardConstants {
public static final String INDEXER_METHOD_JOBCOUNT = "queueSize"; public static final String INDEXER_METHOD_JOBCOUNT = "queueSize";
public static final String INDEXER_METHOD_FREEMEM = "deQueueFreeMem"; public static final String INDEXER_METHOD_FREEMEM = "deQueueFreeMem";
public static final String INDEXER_SLOTS = "indexer.slots"; public static final String INDEXER_SLOTS = "indexer.slots";
public static final String INDEXER_THREADS = "indexer.threads";
// 90_cleanup // 90_cleanup
/** /**
* <p><code>public static final String <strong>CLEANUP</strong> = "90_cleanup"</code></p> * <p><code>public static final String <strong>CLEANUP</strong> = "90_cleanup"</code></p>

Loading…
Cancel
Save