WorkflowProcess is forced to make small pauses if shortMemoryStatus is

reached.
pull/1/head
Michael Peter Christen 13 years ago
parent b7bb84c0bb
commit 49be60a7c8

@ -262,6 +262,7 @@ public class Table implements Index, Iterable<Row.Entry> {
} }
} }
@Override
public long mem() { public long mem() {
return this.index.mem() + ((this.table == null) ? 0 : this.table.mem()); return this.index.mem() + ((this.table == null) ? 0 : this.table.mem());
} }
@ -271,10 +272,12 @@ public class Table implements Index, Iterable<Row.Entry> {
return MemoryControl.shortStatus() || MemoryControl.available() < this.minmemremaining; return MemoryControl.shortStatus() || MemoryControl.available() < this.minmemremaining;
} }
@Override
public byte[] smallestKey() { public byte[] smallestKey() {
return this.index.smallestKey(); return this.index.smallestKey();
} }
@Override
public byte[] largestKey() { public byte[] largestKey() {
return this.index.largestKey(); return this.index.largestKey();
} }
@ -348,6 +351,7 @@ public class Table implements Index, Iterable<Row.Entry> {
} }
} }
@Override
public synchronized void addUnique(final Entry row) throws IOException, RowSpaceExceededException { public synchronized void addUnique(final Entry row) throws IOException, RowSpaceExceededException {
assert this.file.size() == this.index.size() : "file.size() = " + this.file.size() + ", index.size() = " + this.index.size(); assert this.file.size() == this.index.size() : "file.size() = " + this.file.size() + ", index.size() = " + this.index.size();
assert this.table == null || this.table.size() == this.index.size() : "table.size() = " + this.table.size() + ", index.size() = " + this.index.size(); assert this.table == null || this.table.size() == this.index.size() : "table.size() = " + this.table.size() + ", index.size() = " + this.index.size();
@ -395,6 +399,7 @@ public class Table implements Index, Iterable<Row.Entry> {
* and * and
* @throws * @throws
*/ */
@Override
public synchronized List<RowCollection> removeDoubles() throws IOException, RowSpaceExceededException { public synchronized List<RowCollection> removeDoubles() throws IOException, RowSpaceExceededException {
assert this.file.size() == this.index.size() : "file.size() = " + this.file.size() + ", index.size() = " + this.index.size(); assert this.file.size() == this.index.size() : "file.size() = " + this.file.size() + ", index.size() = " + this.index.size();
final List<RowCollection> report = new ArrayList<RowCollection>(); final List<RowCollection> report = new ArrayList<RowCollection>();
@ -447,6 +452,7 @@ public class Table implements Index, Iterable<Row.Entry> {
return report; return report;
} }
@Override
public void close() { public void close() {
if (this.file != null) this.file.close(); if (this.file != null) this.file.close();
this.file = null; this.file = null;
@ -461,10 +467,12 @@ public class Table implements Index, Iterable<Row.Entry> {
if (this.file != null) close(); if (this.file != null) close();
} }
@Override
public String filename() { public String filename() {
return this.file.filename().toString(); return this.file.filename().toString();
} }
@Override
public Entry get(final byte[] key, final boolean _forcecopy) throws IOException { public Entry get(final byte[] key, final boolean _forcecopy) throws IOException {
if (this.file == null || this.index == null) return null; if (this.file == null || this.index == null) return null;
Entry e = get0(key); Entry e = get0(key);
@ -506,6 +514,7 @@ public class Table implements Index, Iterable<Row.Entry> {
return this.rowdef.newEntry(b); return this.rowdef.newEntry(b);
} }
@Override
public Map<byte[], Row.Entry> get(final Collection<byte[]> keys, final boolean forcecopy) throws IOException, InterruptedException { public Map<byte[], Row.Entry> get(final Collection<byte[]> keys, final boolean forcecopy) throws IOException, InterruptedException {
final Map<byte[], Row.Entry> map = new TreeMap<byte[], Row.Entry>(row().objectOrder); final Map<byte[], Row.Entry> map = new TreeMap<byte[], Row.Entry>(row().objectOrder);
Row.Entry entry; Row.Entry entry;
@ -516,15 +525,18 @@ public class Table implements Index, Iterable<Row.Entry> {
return map; return map;
} }
@Override
public boolean has(final byte[] key) { public boolean has(final byte[] key) {
if (this.index == null) return false; if (this.index == null) return false;
return this.index.has(key); return this.index.has(key);
} }
@Override
public synchronized CloneableIterator<byte[]> keys(final boolean up, final byte[] firstKey) throws IOException { public synchronized CloneableIterator<byte[]> keys(final boolean up, final byte[] firstKey) throws IOException {
return this.index.keys(up, firstKey); return this.index.keys(up, firstKey);
} }
@Override
public Entry replace(final Entry row) throws IOException, RowSpaceExceededException { public Entry replace(final Entry row) throws IOException, RowSpaceExceededException {
assert row != null; assert row != null;
if (this.file == null || row == null) return null; if (this.file == null || row == null) return null;

@ -32,9 +32,11 @@ public abstract class AbstractBlockingThread<J extends WorkflowJob> extends Abst
private WorkflowProcessor<J> manager = null; private WorkflowProcessor<J> manager = null;
private final static Log log = new Log("BlockingThread"); private final static Log log = new Log("BlockingThread");
@Override
public void setManager(final WorkflowProcessor<J> manager) { public void setManager(final WorkflowProcessor<J> manager) {
this.manager = manager; this.manager = manager;
} }
@Override
public WorkflowProcessor<J> getManager() { public WorkflowProcessor<J> getManager() {
return this.manager; return this.manager;
} }
@ -49,8 +51,18 @@ public abstract class AbstractBlockingThread<J extends WorkflowJob> extends Abst
long memstamp0, memstamp1; long memstamp0, memstamp1;
long busyCycles = 0; long busyCycles = 0;
while (running) { while (this.running) {
try { try {
// check memory status
if (!shutdownInProgress() && MemoryControl.shortStatus()) {
// try to idle a bit to get out of that problem somehow without making it worse
for (int i = 0; i < 5; i++) {
try {Thread.sleep(200);} catch (final InterruptedException e) {break;}
if (shutdownInProgress() || !MemoryControl.shortStatus()) {
break;
}
}
}
// do job // do job
timestamp = System.currentTimeMillis(); timestamp = System.currentTimeMillis();
memstamp0 = MemoryControl.used(); memstamp0 = MemoryControl.used();
@ -64,17 +76,21 @@ public abstract class AbstractBlockingThread<J extends WorkflowJob> extends Abst
break; break;
} }
final J out = this.job(in); final J out = this.job(in);
if (out != null) this.manager.passOn(out); if (out != null) {
this.manager.passOn(out);
}
// do memory and busy/idle-count/time monitoring // do memory and busy/idle-count/time monitoring
memstamp1 = MemoryControl.used(); memstamp1 = MemoryControl.used();
if (memstamp1 >= memstamp0) { if (memstamp1 >= memstamp0) {
// no GC in between. this is not shure but most probable // no GC in between. this is not sure but most probable
memuse += memstamp1 - memstamp0; this.memuse += memstamp1 - memstamp0;
} else { } else {
// GC was obviously in between. Add an average as simple heuristic // GC was obviously in between. Add an average as simple heuristic
if (busyCycles > 0) memuse += memuse / busyCycles; if (busyCycles > 0) {
this.memuse += this.memuse / busyCycles;
}
} }
busytime += System.currentTimeMillis() - timestamp; this.busytime += System.currentTimeMillis() - timestamp;
} catch (final InterruptedException e) { } catch (final InterruptedException e) {
// don't ignore this: shut down // don't ignore this: shut down
this.running = false; this.running = false;
@ -93,7 +109,10 @@ public abstract class AbstractBlockingThread<J extends WorkflowJob> extends Abst
} }
private void logSystem(final String text) { private void logSystem(final String text) {
if (log == null) Log.logConfig("THREAD-CONTROL", text); if (log == null) {
else log.logConfig(text); Log.logConfig("THREAD-CONTROL", text);
} else {
log.logConfig(text);
}
} }
} }

@ -40,13 +40,18 @@ import net.yacy.kelondro.logging.Log;
public abstract class AbstractThread extends Thread implements WorkflowThread { public abstract class AbstractThread extends Thread implements WorkflowThread {
private static Log log = new Log("WorkflowThread"); private static Log log = new Log("WorkflowThread");
protected boolean running = true; protected boolean running = true, announcedShutdown = false;
protected long busytime = 0, memuse = 0; protected long busytime = 0, memuse = 0;
private long blockPause = 0; private long blockPause = 0;
private String shortDescr = "", longDescr = ""; private String shortDescr = "", longDescr = "";
private String monitorURL = null; private String monitorURL = null;
private long threadBlockTimestamp = System.currentTimeMillis(); private long threadBlockTimestamp = System.currentTimeMillis();
protected final boolean announceShutdown() {
return this.announcedShutdown;
}
protected final void announceThreadBlockApply() { protected final void announceThreadBlockApply() {
// shall only be used, if a thread blocks for an important reason // shall only be used, if a thread blocks for an important reason
// like a socket connect and must renew the timestamp to correct // like a socket connect and must renew the timestamp to correct
@ -67,6 +72,7 @@ public abstract class AbstractThread extends Thread implements WorkflowThread {
this.busytime += millis; this.busytime += millis;
} }
@Override
public final void setDescription(final String shortText, final String longText, final String monitorURL) { public final void setDescription(final String shortText, final String longText, final String monitorURL) {
// sets a visible description string // sets a visible description string
this.shortDescr = shortText; this.shortDescr = shortText;
@ -74,37 +80,45 @@ public abstract class AbstractThread extends Thread implements WorkflowThread {
this.monitorURL = monitorURL; this.monitorURL = monitorURL;
} }
@Override
public final String getShortDescription() { public final String getShortDescription() {
return this.shortDescr; return this.shortDescr;
} }
@Override
public final String getLongDescription() { public final String getLongDescription() {
return this.longDescr; return this.longDescr;
} }
@Override
public String getMonitorURL() { public String getMonitorURL() {
return this.monitorURL; return this.monitorURL;
} }
@Override
public final long getBlockTime() { public final long getBlockTime() {
// returns the total time that this thread has been blocked so far // returns the total time that this thread has been blocked so far
return this.blockPause; return this.blockPause;
} }
@Override
public final long getExecTime() { public final long getExecTime() {
// returns the total time that this thread has worked so far // returns the total time that this thread has worked so far
return this.busytime; return this.busytime;
} }
@Override
public long getMemoryUse() { public long getMemoryUse() {
// returns the sum of all memory usage differences before and after one busy job // returns the sum of all memory usage differences before and after one busy job
return this.memuse; return this.memuse;
} }
@Override
public boolean shutdownInProgress() { public boolean shutdownInProgress() {
return !this.running || Thread.currentThread().isInterrupted(); return !this.running || this.announcedShutdown || Thread.currentThread().isInterrupted();
} }
@Override
public void terminate(final boolean waitFor) { public void terminate(final boolean waitFor) {
// after calling this method, the thread shall terminate // after calling this method, the thread shall terminate
this.running = false; this.running = false;
@ -121,10 +135,14 @@ public abstract class AbstractThread extends Thread implements WorkflowThread {
} }
private final void logError(final String text,final Throwable thrown) { private final void logError(final String text,final Throwable thrown) {
if (log == null) Log.logSevere("THREAD-CONTROL", text, thrown); if (log == null) {
else log.logSevere(text,thrown); Log.logSevere("THREAD-CONTROL", text, thrown);
} else {
log.logSevere(text,thrown);
}
} }
@Override
public void jobExceptionHandler(final Exception e) { public void jobExceptionHandler(final Exception e) {
if (!(e instanceof ClosedByInterruptException)) { if (!(e instanceof ClosedByInterruptException)) {
// default handler for job exceptions. shall be overridden for own handler // default handler for job exceptions. shall be overridden for own handler
@ -132,7 +150,9 @@ public abstract class AbstractThread extends Thread implements WorkflowThread {
} }
} }
@Override
public void open() {}; // dummy definition; should be overriden public void open() {}; // dummy definition; should be overriden
@Override
public void close() {}; // dummy definition; should be overriden public void close() {}; // dummy definition; should be overriden
} }

@ -97,7 +97,9 @@ public class WorkflowProcessor<J extends WorkflowJob> {
public J take() throws InterruptedException { public J take() throws InterruptedException {
// read from the input queue // read from the input queue
if (this.input == null) return null; if (this.input == null) {
return null;
}
final long t = System.currentTimeMillis(); final long t = System.currentTimeMillis();
final J j = this.input.take(); final J j = this.input.take();
this.blockTime += System.currentTimeMillis() - t; this.blockTime += System.currentTimeMillis() - t;
@ -107,24 +109,34 @@ public class WorkflowProcessor<J extends WorkflowJob> {
public void passOn(final J next) throws InterruptedException { public void passOn(final J next) throws InterruptedException {
// don't mix this method up with enQueue()! // don't mix this method up with enQueue()!
// this method enqueues into the _next_ queue, not this queue! // this method enqueues into the _next_ queue, not this queue!
if (this.output == null) return; if (this.output == null) {
return;
}
final long t = System.currentTimeMillis(); final long t = System.currentTimeMillis();
this.output.enQueue(next); this.output.enQueue(next);
this.passOnTime += System.currentTimeMillis() - t; this.passOnTime += System.currentTimeMillis() - t;
} }
public void clear() { public void clear() {
if (this.input != null) this.input.clear(); if (this.input != null) {
this.input.clear();
}
} }
public synchronized void relaxCapacity() { public synchronized void relaxCapacity() {
if (this.input.isEmpty()) return; if (this.input.isEmpty()) {
if (this.input.remainingCapacity() > 1000) return; return;
}
if (this.input.remainingCapacity() > 1000) {
return;
}
final BlockingQueue<J> i = new LinkedBlockingQueue<J>(); final BlockingQueue<J> i = new LinkedBlockingQueue<J>();
J e; J e;
while (!this.input.isEmpty()) { while (!this.input.isEmpty()) {
e = this.input.poll(); e = this.input.poll();
if (e == null) break; if (e == null) {
break;
}
i.add(e); i.add(e);
} }
this.input = i; this.input = i;
@ -138,7 +150,9 @@ public class WorkflowProcessor<J extends WorkflowJob> {
//Log.logWarning("PROCESSOR", "executing job " + environment.getClass().getName() + "." + methodName + " serialized"); //Log.logWarning("PROCESSOR", "executing job " + environment.getClass().getName() + "." + methodName + " serialized");
try { try {
final J out = (J) InstantBlockingThread.execMethod(this.environment, this.methodName).invoke(this.environment, new Object[]{in}); final J out = (J) InstantBlockingThread.execMethod(this.environment, this.methodName).invoke(this.environment, new Object[]{in});
if (out != null && this.output != null) this.output.enQueue(out); if (out != null && this.output != null) {
this.output.enQueue(out);
}
} catch (final IllegalArgumentException e) { } catch (final IllegalArgumentException e) {
Log.logException(e); Log.logException(e);
} catch (final IllegalAccessException e) { } catch (final IllegalAccessException e) {
@ -161,8 +175,12 @@ public class WorkflowProcessor<J extends WorkflowJob> {
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
public void announceShutdown() { public void announceShutdown() {
if (this.executor == null) return; if (this.executor == null) {
if (this.executor.isShutdown()) return; return;
}
if (this.executor.isShutdown()) {
return;
}
// before we put pills into the queue, make sure that they will take them // before we put pills into the queue, make sure that they will take them
relaxCapacity(); relaxCapacity();
// put poison pills into the queue // put poison pills into the queue

Loading…
Cancel
Save