activated RWI distribution to DHT for senior peers (default redundancy 3), necessary now for network growth

git-svn-id: https://svn.berlios.de/svnroot/repos/yacy/trunk@438 6c8d7289-2bf4-0310-a012-ef5d649a1542
pull/1/head
orbiter 20 years ago
parent 2549d6e236
commit ad90f0ad13

@ -3,11 +3,11 @@ javacSource=1.4
javacTarget=1.4
# Release Configuration
releaseVersion=0.391
#releaseFile=yacy_dev_v${releaseVersion}_${DSTAMP}_${releaseNr}.tar.gz
releaseFile=yacy_v${releaseVersion}_${DSTAMP}_${releaseNr}.tar.gz
#releaseDir=yacy_dev_v${releaseVersion}_${DSTAMP}_${releaseNr}
releaseDir=yacy_v${releaseVersion}_${DSTAMP}_${releaseNr}
releaseVersion=0.392
releaseFile=yacy_dev_v${releaseVersion}_${DSTAMP}_${releaseNr}.tar.gz
#releaseFile=yacy_v${releaseVersion}_${DSTAMP}_${releaseNr}.tar.gz
releaseDir=yacy_dev_v${releaseVersion}_${DSTAMP}_${releaseNr}
#releaseDir=yacy_v${releaseVersion}_${DSTAMP}_${releaseNr}
releaseNr=$Revision$
# defining some file/directory access rights

@ -70,11 +70,15 @@ public class IndexCreateWWWGlobalQueue_p {
if (post != null) {
if (post.containsKey("clearcrawlqueue")) {
String urlHash;
int c = switchboard.urlPool.noticeURL.stackSize(plasmaCrawlNURL.STACK_TYPE_LIMIT);
switchboard.urlPool.noticeURL.clear(plasmaCrawlNURL.STACK_TYPE_LIMIT);
/*
int c = 0;
while (switchboard.urlPool.noticeURL.stackSize(plasmaCrawlNURL.STACK_TYPE_LIMIT) > 0) {
urlHash = switchboard.urlPool.noticeURL.pop(plasmaCrawlNURL.STACK_TYPE_LIMIT).hash();
if (urlHash != null) { switchboard.urlPool.noticeURL.remove(urlHash); c++; }
}
*/
prop.put("info", 3);//crawling queue cleared
prop.put("info_numEntries", c);
}

@ -70,11 +70,8 @@ public class IndexCreateWWWLocalQueue_p {
if (post != null) {
if (post.containsKey("clearcrawlqueue")) {
String urlHash;
int c = 0;
while (switchboard.urlPool.noticeURL.stackSize(plasmaCrawlNURL.STACK_TYPE_CORE) > 0) {
urlHash = switchboard.urlPool.noticeURL.pop(plasmaCrawlNURL.STACK_TYPE_CORE).hash();
if (urlHash != null) { switchboard.urlPool.noticeURL.remove(urlHash); c++; }
}
int c = switchboard.urlPool.noticeURL.stackSize(plasmaCrawlNURL.STACK_TYPE_CORE);
switchboard.urlPool.noticeURL.clear(plasmaCrawlNURL.STACK_TYPE_CORE);
prop.put("info", 3);//crawling queue cleared
prop.put("info_numEntries", c);
}

@ -12,7 +12,8 @@
This is the control page for web pages that your peer has indexed during the current application run-time
as result of proxy fetch/prefetch.
<b>No personal or protected page is indexed</b>;
those pages are detected by Cookie-Use or POST-Parameters (either in URL or as HTTP protocol)
those pages are detected by properties in the HTTP header (like Cookie-Use, or HTTP Authorization)
or by POST-Parameters (either in URL or as HTTP protocol)
and automatically excluded from indexing.
</p>
@ -54,46 +55,8 @@ Please delete that file and restart.</b><br>
<br><b>An error has occurred: #[error]#.</b><br>
#(/info)#
<p>Snapshot of recently indexed web pages that passed the proxy:<br>
#(table4)#
<i>The stack is empty.</i>
::
<i>
#(size)#
Showing all #[all]# entries in this stack.
::
Showing latest #[count]# lines from a stack of #[all]# entries.
#(/size)#
</i>
<table border="0" cellpadding="2" cellspacing="1" width="100%">
<tr class="TableHeader">
<td class="small"><form action="#[feedbackpage]#" method="post" enctype="multipart/form-data">
<input type="submit" name="clearlist#[tabletype]#" value="clear list"></form></td>
#(showInit)#::<td class="small"><b>Initiator</b></td>#(/showInit)#
#(showExec)#::<td class="small"><b>Executor</b></td>#(/showExec)#
<td class="small"><b>Modified Date</b></td>
<td class="small"><b>#Words</b></td>
<td class="small"><b>Title</b></td>
<td class="small"><b>URL</b></td>
</tr>
#{indexed}#
<tr class="TableCell#(dark)#Light::Dark#(/dark)#" class="small">
<td width="30" class="small">
<form action="#[feedbackpage]#" method="post" enctype="multipart/form-data">
<input type="submit" name="deleteentry" value="delete" class="small">
<input type="hidden" name="table" value="#[tabletype]#">
<input type="hidden" name="hash" value="#[urlhash]#">
</form></td>
#(showInit)#::<td width="60" class="small">#[initiatorSeed]#</td>#(/showInit)#
#(showExec)#::<td width="60" class="small">#[executorSeed]#</td>#(/showExec)#
<td width="80" class="small">#[moddate]#</td>
<td width="40" class="small">#[wordcount]#</td>
<td width="180" class="small">#[urldescr]#</td>
<td class="small">#[url]#</td>
</tr>
#{/indexed}#
</table><br>
#(/table4)#
<!-- Relative link: the monitor page is then requested from whatever host and port this page was served from, instead of a fixed localhost:8000. -->
<p>You can see a snapshot of recently indexed pages
on the <a href="IndexMonitor.html?process=4">Proxy Index Monitor</a> Page.
</p>
#[footer]#

@ -74,21 +74,7 @@ public class ProxyIndexingMonitor_p {
prop.put("info_message", "");
if (post != null) {
if (post.containsKey("clearlist4")) switchboard.urlPool.loadedURL.clearStack(4); // local: by proxy crawl
if (post.containsKey("deleteentry")) {
String hash = post.get("hash", null);
if (hash != null) {
// delete from database
switchboard.urlPool.loadedURL.remove(hash);
}
}
if (post.containsKey("moreIndexed")) {
showIndexedCount = Integer.parseInt(post.get("showIndexed", "40"));
}
if (post.get("se") != null) se = true;
if (post.containsKey("proxyprofileset")) try {
// read values and put them in global settings
int newProxyPrefetchDepth = Integer.parseInt((String) post.get("proxyPrefetchDepth", "0"));
@ -121,10 +107,6 @@ public class ProxyIndexingMonitor_p {
}
}
// create tables
String myname = yacyCore.seedDB.mySeed.getName();
prop.putAll(switchboard.urlPool.loadedURL.genTableProps(4, showIndexedCount, false, false, "proxy", null, "ProxyIndexingMonitor_p.html", true));
prop.put("proxyPrefetchDepth", env.getConfig("proxyPrefetchDepth", "0"));
prop.put("proxyStoreHTCacheChecked", env.getConfig("proxyStoreHTCache", "").equals("true") ? 1 : 0);
// return rewrite properties

@ -12,6 +12,7 @@
<item>
<title>#[description]#</title>
<link>#[url]#</link>
#(snippet)#::<description>#[text]#</description>#(/snippet)#
<pubDate>#[date]#</pubDate>
</item>
#{/results}#

@ -563,6 +563,7 @@ public final class httpdProxyHandler extends httpdAbstractHandler implements htt
res.writeContent(hfos, cacheFile);
if (hfos instanceof htmlFilterOutputStream) ((htmlFilterOutputStream) hfos).finalize();
this.theLogger.logDebug("for write-file of " + url + ": contentLength = " + contentLength + ", sizeBeforeDelete = " + sizeBeforeDelete);
cacheManager.writeFileAnnouncement(cacheFile);
if (sizeBeforeDelete == -1) {
// totally fresh file
//cacheEntry.status = plasmaHTCache.CACHE_FILL; // it's an insert

@ -44,6 +44,7 @@ package de.anomic.kelondro;
import java.util.Comparator;
import java.util.Iterator;
import java.util.Set;
import java.util.ConcurrentModificationException;
public class kelondroMergeIterator implements Iterator {
@ -72,10 +73,18 @@ public class kelondroMergeIterator implements Iterator {
}
// Advances iterator a by one step and stores the result in na;
// na becomes null when a has no further element.
private void nexta() {
// NOTE(review): this unguarded advance duplicates the one inside the try block below;
// it looks like the earlier form of the same statement - confirm that only one of them is meant to run.
if (a.hasNext()) na = (String) a.next(); else na = null;
try {
if (a.hasNext()) na = (String) a.next(); else na = null;
} catch (ConcurrentModificationException e) {
// a ConcurrentModificationException while advancing a is not propagated;
// na gets the same null value that is used when a has no further element
na = null;
}
}
// Advances iterator b by one step and stores the result in nb;
// nb becomes null when b has no further element.
private void nextb() {
// NOTE(review): this unguarded advance duplicates the one inside the try block below;
// it looks like the earlier form of the same statement - confirm that only one of them is meant to run.
if (b.hasNext()) nb = (String) b.next(); else nb = null;
try {
if (b.hasNext()) nb = (String) b.next(); else nb = null;
} catch (ConcurrentModificationException e) {
// a ConcurrentModificationException while advancing b is not propagated;
// nb gets the same null value that is used when b has no further element
nb = null;
}
}
public boolean hasNext() {

@ -239,6 +239,17 @@ public class kelondroRecords {
// thats it!
}
// Removes all mappings from this map.
// The used counter and the free counter are set to 0 and the free handle is
// replaced by a new Handle built from NUL; afterwards the three values are
// written to the entry file at POS_USEDC, POS_FREEC and POS_FREEH.
// Throws IOException if seeking or writing in the entry file fails.
// NOTE(review): only these three values are touched here; whether stored
// records or caches need additional treatment cannot be seen from this method.
public void clear() throws IOException {
    // step 1: reset the in-memory bookkeeping
    this.USEDC = 0;
    this.FREEC = 0;
    this.FREEH = new Handle(NUL);

    // step 2: write each value to its POS_* position in the entry file
    entryFile.seek(POS_USEDC);
    entryFile.writeInt(this.USEDC);

    entryFile.seek(POS_FREEC);
    entryFile.writeInt(this.FREEC);

    entryFile.seek(POS_FREEH);
    entryFile.writeInt(this.FREEH.index);
}
public kelondroRecords(File file, long buffersize) throws IOException{
// opens an existing tree
if (!file.exists()) throw new IOException("kelondroRecords: file " + file.getAbsoluteFile().toString() + " does not exist");
@ -776,11 +787,6 @@ public class kelondroRecords {
return TXTPROPS[pos];
}
// Removes all mappings from this map (optional operation).
public void clear() {
throw new UnsupportedOperationException("clear not supported");
}
// Returns true if this map contains no key-value mappings.
public boolean isEmpty() {
return (USEDC == 0);

@ -82,6 +82,12 @@ public class kelondroStack extends kelondroRecords {
super(file, buffersize);
}
// Empties this stack: first runs the clear() of the superclass, then sets
// the handles stored under root and toor to null.
// Throws IOException if the superclass clear() or one of the handle updates fails.
// NOTE(review): root and toor are presumably the two ends of the stack - confirm against their declarations.
public void clear() throws IOException {
super.clear();
setHandle(root, null); // reset the root value
setHandle(toor, null); // reset the toor value
}
public class Counter implements Iterator {
Handle nextHandle = null;
public Counter() throws IOException {

@ -119,6 +119,11 @@ public class kelondroTree extends kelondroRecords implements Comparator {
super(ra, buffersize);
}
// Empties this tree: first runs the clear() of the superclass, then sets
// the handle stored under root to null.
// Throws IOException if the superclass clear() or the handle update fails.
public void clear() throws IOException {
super.clear();
setHandle(root, null); // reset the root value
}
// Returns the value to which this map maps the specified key.
public synchronized byte[][] get(byte[] key) throws IOException {
//System.out.println("kelondroTree.get " + new String(key) + " in " + filename);

@ -250,6 +250,21 @@ public class plasmaCrawlNURL extends plasmaURL {
}
}
// Empties the stack that belongs to the given stack type.
// stackType is compared against the STACK_TYPE_* constants; a value that
// matches none of them leaves every stack untouched.
// NOTE(review): an IOException raised while clearing is dropped without any
// report, so callers cannot tell whether the stack was really emptied.
public void clear(int stackType) {
    try {
        if (stackType == STACK_TYPE_CORE) {
            coreStack.clear();
        } else if (stackType == STACK_TYPE_LIMIT) {
            limitStack.clear();
        } else if (stackType == STACK_TYPE_OVERHANG) {
            overhangStack.clear();
        } else if (stackType == STACK_TYPE_REMOTE) {
            remoteStack.clear();
        } else if (stackType == STACK_TYPE_IMAGE) {
            imageStack.clear();
        } else if (stackType == STACK_TYPE_MOVIE) {
            movieStack.clear();
        } else if (stackType == STACK_TYPE_MUSIC) {
            musicStack.clear();
        }
        // any other stack type: nothing to clear
    } catch (IOException ignored) {
        // best effort: a failed clear is not reported
    }
}
private Entry pop(kelondroStack stack) {
// this is a filo - pop
try {

@ -254,7 +254,8 @@ public final class plasmaHTCache {
" FILES = " + currCacheSize/1048576 + "MB, OLDEST IS " +
((ageHours < 24) ? (ageHours + " HOURS") : ((ageHours / 24) + " DAYS")) +
" OLD");
cleanup();
// start to prefetch ip's from dns
String dom;
long start = System.currentTimeMillis();

@ -380,9 +380,9 @@ public final class plasmaSwitchboard extends serverAbstractSwitch implements ser
indexDistribution = new plasmaWordIndexDistribution(urlPool, wordIndex, log,
getConfig("allowDistributeIndex", "false").equals("true"));
indexDistribution.setCounts(100, 1, 8000);
deployThread("20_dhtdistribution", "DHT Distribution (currently by juniors only)", "selection, transfer and deletion of index entries that are not searched on your peer, but on others", null,
new serverInstantThread(indexDistribution, "job", null), 120000);
indexDistribution.setCounts(100, 1, 3, 8000);
deployThread("20_dhtdistribution", "DHT Distribution", "selection, transfer and deletion of index entries that are not searched on your peer, but on others", null,
new serverInstantThread(indexDistribution, "job", null), 12000);
// init migration from 0.37 -> 0.38
classicCache = new plasmaWordIndexClassicCacheMigration(plasmaPath, wordIndex);

@ -17,75 +17,99 @@ import de.anomic.kelondro.kelondroException;
public class plasmaWordIndexDistribution {
// distributes parts of the index to other peers
// stops as soon as an error occurs
private int indexCount;
private int peerCount;
private long maxTime;
private plasmaURLPool urlPool;
private plasmaWordIndex wordIndex;
private serverLog log;
private boolean enabled;
public plasmaWordIndexDistribution(plasmaURLPool urlPool, plasmaWordIndex wordIndex, serverLog log,
boolean enable) {
this.urlPool = urlPool;
this.wordIndex = wordIndex;
setCounts(100 /*indexCount*/, 1 /*peerCount*/, 8000);
}
// distributes parts of the index to other peers
// stops as soon as an error occurs
private int indexCount;
private int juniorPeerCount, seniorPeerCount;
private long maxTime;
private plasmaURLPool urlPool;
private plasmaWordIndex wordIndex;
private serverLog log;
private boolean enabled;
public plasmaWordIndexDistribution(plasmaURLPool urlPool, plasmaWordIndex wordIndex, serverLog log,
boolean enable) {
this.urlPool = urlPool;
this.wordIndex = wordIndex;
this.enabled = enable;
this.log = log;
setCounts(100 /*indexCount*/, 1 /*juniorPeerCount*/, 3 /*seniorPeerCount*/, 8000);
}
public void enable() {
enabled = true;
}
public void disable() {
enabled = false;
}
public boolean job() {
public void enable() {
enabled = true;
if (yacyCore.seedDB == null) {
log.logDebug("no word distribution: seedDB == null");
return false;
}
public void disable() {
enabled = false;
if (yacyCore.seedDB.mySeed == null) {
log.logDebug("no word distribution: mySeed == null");
return false;
}
if (yacyCore.seedDB.mySeed.isVirgin()) {
log.logDebug("no word distribution: status is virgin");
return false;
}
if (!(enabled)) {
log.logDebug("no word distribution: not enabled");
return false;
}
if (urlPool.loadedURL.size() < 10) {
log.logDebug("no word distribution: loadedURL.size() = " + urlPool.loadedURL.size());
return false;
}
if (wordIndex.size() < 100) {
log.logDebug("no word distribution: not enough words - wordIndex.size() = " + wordIndex.size());
return false;
}
if (urlPool.noticeURL.stackSize() > 0) {
log.logDebug("no word distribution: crawl in progress - noticeURL.stackSize() = " + urlPool.noticeURL.stackSize());
return false;
}
public boolean job() {
if ((yacyCore.seedDB == null) ||
(yacyCore.seedDB.mySeed == null) ||
(yacyCore.seedDB.mySeed.isVirgin()) ||
(urlPool.loadedURL.size() < 10) ||
(wordIndex.size() < 100) ||
(!(yacyCore.seedDB.mySeed.isJunior()))) return false;
// do the transfer
int peerCount = (yacyCore.seedDB.mySeed.isJunior()) ? juniorPeerCount : seniorPeerCount;
long starttime = System.currentTimeMillis();
int transferred = performTransferIndex(indexCount, peerCount, true);
if (transferred <= 0) {
log.logDebug("no word distribution: transfer failed");
return false;
}
int transferred;
long starttime = System.currentTimeMillis();
try {
if (
(urlPool.noticeURL.stackSize() == 0) &&
(enabled) &&
((transferred = performTransferIndex(indexCount, peerCount, true)) > 0)) {
indexCount = transferred;
if ((System.currentTimeMillis() - starttime) > (maxTime * peerCount)) indexCount--; else indexCount++;
if (indexCount < 30) indexCount = 30;
return true;
} else {
// make a long pause
return false;
}
} catch (IllegalArgumentException ee) {
// this is a bug that occurs if an unfixable data inconsistency in the table structure was detected
// make a long pause
log.logError("very bad data inconsistency: " + ee.getMessage());
//ee.printStackTrace();
return false;
}
}
// adopt transfer count
if ((System.currentTimeMillis() - starttime) > (maxTime * peerCount))
indexCount--;
else
indexCount++;
if (indexCount < 30) indexCount = 30;
public void setCounts(int indexCount, int peerCount, long maxTimePerTransfer) {
this.maxTime = maxTimePerTransfer;
this.indexCount = indexCount;
if (indexCount < 30) indexCount = 30;
this.peerCount = peerCount;
}
// show success
return true;
}
public void setCounts(int indexCount, int juniorPeerCount, int seniorPeerCount, long maxTimePerTransfer) {
this.maxTime = maxTimePerTransfer;
this.indexCount = indexCount;
if (indexCount < 30) indexCount = 30;
this.juniorPeerCount = juniorPeerCount;
this.seniorPeerCount = seniorPeerCount;
}
public int performTransferIndex(int indexCount, int peerCount, boolean delete) {
if ((yacyCore.seedDB == null) || (yacyCore.seedDB.sizeConnected() == 0)) return -1;
public int performTransferIndex(int indexCount, int peerCount, boolean delete) {
if ((yacyCore.seedDB == null) || (yacyCore.seedDB.sizeConnected() == 0)) return -1;
// collect index
//String startPointHash = yacyCore.seedCache.mySeed.hash;
String startPointHash = serverCodings.encodeMD5B64("" + System.currentTimeMillis(), true).substring(0, yacySeedDB.commonHashLength);
@ -140,18 +164,18 @@ public class plasmaWordIndexDistribution {
return -1;
}
} else {
// simply close the indexEntities
for (int i = 0; i < indexEntities.length; i++) try {
indexEntities[i].close();
} catch (IOException ee) {}
}
// simply close the indexEntities
for (int i = 0; i < indexEntities.length; i++) try {
indexEntities[i].close();
} catch (IOException ee) {}
}
return indexCount;
} else {
log.logError("Index distribution failed. Too less peers (" + hc + ") received the index, not deleted locally.");
return -1;
}
}
private plasmaWordIndexEntity[] selectTransferIndexes(String hash, int count) {
Vector tmpEntities = new Vector();
String nexthash = "";
@ -161,7 +185,7 @@ public class plasmaWordIndexDistribution {
Enumeration urlEnum;
plasmaWordIndexEntry indexEntry;
while ((count > 0) && (wordHashIterator.hasNext()) &&
((nexthash = (String) wordHashIterator.next()) != null) && (nexthash.trim().length() > 0)) {
((nexthash = (String) wordHashIterator.next()) != null) && (nexthash.trim().length() > 0)) {
indexEntity = wordIndex.getEntity(nexthash, true);
if (indexEntity.size() == 0) {
indexEntity.deleteComplete();
@ -229,7 +253,7 @@ public class plasmaWordIndexDistribution {
/*
if (wordIndex.getEntity(indexEntities[i].wordHash()).deleteComplete())
System.out.println("DEBUG: trial delete of partial word index " + indexEntities[i].wordHash() + " SUCCESSFULL");
else
else
System.out.println("DEBUG: trial delete of partial word index " + indexEntities[i].wordHash() + " FAILED");
*/
// end debug
@ -238,7 +262,7 @@ public class plasmaWordIndexDistribution {
// delete complete file
if (indexEntities[i].deleteComplete()) {
indexEntities[i].close();
} else {
} else {
indexEntities[i].close();
// have another try...
if (!(plasmaWordIndexEntity.wordHash2path(wordIndex.getRoot() /*PLASMADB*/, indexEntities[i].wordHash()).delete())) {
@ -247,7 +271,7 @@ public class plasmaWordIndexDistribution {
}
}
}
indexEntities[i] = null;
indexEntities[i] = null;
}
return success;
}

@ -12,7 +12,7 @@
# INFO regular action information (i.e. any httpd request URL)
# FINEST in-function status debug output
PARSER.level = INFO
YACY.level = INFO
YACY.level = FINEST
HTCACHE.level = INFO
PLASMA.level = FINEST
SERVER.level = INFO

Loading…
Cancel
Save