more concurrency for postprocessing

pull/1/head
orbiter 11 years ago
parent a83cf26c38
commit 2bc6199408

@ -163,17 +163,18 @@ public abstract class AbstractSolrConnector implements SolrConnector {
final String ... fields) { final String ... fields) {
assert buffersize > 0; assert buffersize > 0;
if (!prefetchIDs) return concurrentDocumentsByQueryNoPrefetch(querystring, sort, offset, maxcount, maxtime, buffersize, concurrency, fields); if (!prefetchIDs) return concurrentDocumentsByQueryNoPrefetch(querystring, sort, offset, maxcount, maxtime, buffersize, concurrency, fields);
final BlockingQueue<String> idQueue = concurrentIDsByQuery(querystring, sort, offset, maxcount, maxtime, Math.min(maxcount, 10000000), 1); final BlockingQueue<String> idQueue = concurrentIDsByQuery(querystring, sort, offset, maxcount, maxtime, Math.min(maxcount, 10000000), concurrency);
final BlockingQueue<SolrDocument> queue = buffersize <= 0 ? new LinkedBlockingQueue<SolrDocument>() : new ArrayBlockingQueue<SolrDocument>(buffersize); final BlockingQueue<SolrDocument> queue = buffersize <= 0 ? new LinkedBlockingQueue<SolrDocument>() : new ArrayBlockingQueue<SolrDocument>(Math.max(buffersize, concurrency));
final long endtime = maxtime == Long.MAX_VALUE ? Long.MAX_VALUE : System.currentTimeMillis() + maxtime; // we know infinity! final long endtime = maxtime == Long.MAX_VALUE ? Long.MAX_VALUE : System.currentTimeMillis() + maxtime; // we know infinity!
final Thread t = new Thread() { final Thread[] t = new Thread[concurrency];
for (int i = 0; i < concurrency; i++) {
t[i] = new Thread() {
@Override @Override
public void run() { public void run() {
this.setName("AbstractSolrConnector:concurrentDocumentsByQueryWithPrefetch(" + querystring + ")"); this.setName("AbstractSolrConnector:concurrentDocumentsByQueryWithPrefetch(" + querystring + ")");
String nextID; String nextID;
try { try {
while (System.currentTimeMillis() < endtime && (nextID = idQueue.take()) != AbstractSolrConnector.POISON_ID) { while (System.currentTimeMillis() < endtime && (nextID = idQueue.take()) != AbstractSolrConnector.POISON_ID) {
try { try {
SolrDocument d = getDocumentById(nextID, fields); SolrDocument d = getDocumentById(nextID, fields);
// document may be null if another process has deleted the document meanwhile // document may be null if another process has deleted the document meanwhile
@ -189,12 +190,11 @@ public abstract class AbstractSolrConnector implements SolrConnector {
} catch (InterruptedException e) { } catch (InterruptedException e) {
ConcurrentLog.severe("AbstractSolrConnector", "interrupted concurrentDocumentsByQuery: " + e.getMessage()); ConcurrentLog.severe("AbstractSolrConnector", "interrupted concurrentDocumentsByQuery: " + e.getMessage());
} }
for (int i = 0; i < Math.max(1, concurrency); i++) {
try {queue.put(AbstractSolrConnector.POISON_DOCUMENT);} catch (final InterruptedException e1) {} try {queue.put(AbstractSolrConnector.POISON_DOCUMENT);} catch (final InterruptedException e1) {}
} }
}
}; };
t.start(); t[i].start();
}
return queue; return queue;
} }

Loading…
Cancel
Save