package de.anomic.plasma;

import java.io.IOException;
import java.util.Enumeration;
import java.util.Iterator;
import java.util.Vector;

import de.anomic.kelondro.kelondroException;
import de.anomic.server.serverCodings;
import de.anomic.server.logging.serverLog;
import de.anomic.yacy.yacyClient;
import de.anomic.yacy.yacyCore;
import de.anomic.yacy.yacySeed;
import de.anomic.yacy.yacySeedDB;

public class plasmaWordIndexDistribution {

    // distributes parts of the index to other peers
    // stops as soon as an error occurs

    private int indexCount;
    private int peerCount;
    private long maxTime;

    private plasmaURLPool urlPool;
    private plasmaWordIndex wordIndex;
    private serverLog log;
    private boolean enabled;

    public plasmaWordIndexDistribution(plasmaURLPool urlPool, plasmaWordIndex wordIndex, serverLog log, boolean enable) {
        this.urlPool = urlPool;
        this.wordIndex = wordIndex;
        this.log = log;
        this.enabled = enable;
        setCounts(100 /*indexCount*/, 1 /*peerCount*/, 8000 /*maxTimePerTransfer*/);
    }

    public void enable() {
        enabled = true;
    }

    public void disable() {
        enabled = false;
    }

    public boolean job() {
        if ((yacyCore.seedDB == null) ||
            (yacyCore.seedDB.mySeed == null) ||
            (yacyCore.seedDB.mySeed.isVirgin()) ||
            (urlPool.loadedURL.size() < 10) ||
            (wordIndex.size() < 100) ||
            (!(yacyCore.seedDB.mySeed.isJunior()))) return false;

        int transferred;
        long starttime = System.currentTimeMillis();
        try {
            if ((urlPool.noticeURL.stackSize() == 0) &&
                (enabled) &&
                ((transferred = performTransferIndex(indexCount, peerCount, true)) > 0)) {
                indexCount = transferred;
                if ((System.currentTimeMillis() - starttime) > (maxTime * peerCount)) indexCount--; else indexCount++;
                if (indexCount < 30) indexCount = 30;
                return true;
            } else {
                // make a long pause
                return false;
            }
        } catch (IllegalArgumentException ee) {
            // this happens if a non-fixable data inconsistency in the table structure was detected
            // make a long pause
            log.logError("very bad data inconsistency: " + ee.getMessage());
            //ee.printStackTrace();
            return false;
        }
    }

    public void setCounts(int indexCount, int peerCount, long maxTimePerTransfer) {
        this.maxTime = maxTimePerTransfer;
        if (indexCount < 30) indexCount = 30;
        this.indexCount = indexCount;
        this.peerCount = peerCount;
    }

    /**
     * Selects up to indexCount words starting at a pseudo-random hash position
     * and transfers them to up to peerCount DHT peers that accept remote
     * indexes; peers that fail the transfer are disconnected. If delete is set
     * and enough peers received the index, the transferred entries are removed
     * from the local index afterwards.
     *
     * @return the number of transferred word entries, or -1 on failure
     */
    public int performTransferIndex(int indexCount, int peerCount, boolean delete) {
        if ((yacyCore.seedDB == null) || (yacyCore.seedDB.sizeConnected() == 0)) return -1;

        // collect index
        //String startPointHash = yacyCore.seedCache.mySeed.hash;
        String startPointHash = serverCodings.encodeMD5B64("" + System.currentTimeMillis(), true).substring(0, yacySeedDB.commonHashLength);
        plasmaWordIndexEntity[] indexEntities = selectTransferIndexes(startPointHash, indexCount);
        if ((indexEntities == null) || (indexEntities.length == 0)) {
            log.logDebug("No index available for index transfer, hash start-point " + startPointHash);
            return -1;
        }

        // count the indexes again, can be smaller than expected
        indexCount = 0;
        for (int i = 0; i < indexEntities.length; i++) indexCount += indexEntities[i].size();

        // find the start point for the DHT selection
        String keyhash = indexEntities[indexEntities.length - 1].wordHash();

        // iterate over DHT peers and send away the indexes
        yacySeed seed;
        int hc = 0;
        Enumeration e = yacyCore.dhtAgent.getAcceptRemoteIndexSeeds(keyhash);
        String error;
        String peerNames = "";
        while ((e.hasMoreElements()) && (hc < peerCount)) {
            seed = (yacySeed) e.nextElement();
            if (seed != null) {
                error = yacyClient.transferIndex(seed, indexEntities, urlPool.loadedURL);
                if (error == null) {
                    log.logInfo("Index transfer of " + indexCount + " words [" + indexEntities[0].wordHash() + " .. " + indexEntities[indexEntities.length - 1].wordHash() + "] to peer " + seed.getName() + ":" + seed.hash + " successful");
                    peerNames += ", " + seed.getName();
                    hc++;
                } else {
                    log.logWarning("Index transfer to peer " + seed.getName() + ":" + seed.hash + " failed: '" + error + "', disconnecting peer");
                    yacyCore.peerActions.peerDeparture(seed);
                }
            }
        }
        if (peerNames.length() > 0) peerNames = peerNames.substring(2); // remove the leading comma

        // clean up and finish with deletion of indexes
        if (hc >= peerCount) {
            // success
            if (delete) {
                try {
                    if (deleteTransferIndexes(indexEntities)) {
                        log.logDebug("Deleted all transferred whole-word indexes locally");
                        return indexCount;
                    } else {
                        log.logError("Could not delete all transferred whole-word indexes");
                        return -1;
                    }
                } catch (IOException ee) {
                    log.logError("Deletion of indexes not possible: " + ee.getMessage());
                    ee.printStackTrace();
                    return -1;
                }
            } else {
                // simply close the indexEntities
                for (int i = 0; i < indexEntities.length; i++) {
                    try {
                        indexEntities[i].close();
                    } catch (IOException ee) {}
                }
            }
            return indexCount;
        } else {
            log.logError("Index distribution failed. Too few peers (" + hc + ") received the index, not deleted locally.");
            return -1;
        }
    }

    /**
     * Collects word index entities for transfer, starting at the given hash.
     * Whole entities are taken as long as they fit into the remaining count;
     * larger entities are copied partially into a temporary on-the-fly entity.
     * Empty entities found on the way are deleted. Returns an empty array if
     * an I/O error or a database corruption is detected.
     */
    private plasmaWordIndexEntity[] selectTransferIndexes(String hash, int count) {
        Vector tmpEntities = new Vector();
        String nexthash = "";
        try {
            Iterator wordHashIterator = wordIndex.wordHashes(hash, true, true);
            plasmaWordIndexEntity indexEntity, tmpEntity;
            Enumeration urlEnum;
            plasmaWordIndexEntry indexEntry;
            while ((count > 0) &&
                   (wordHashIterator.hasNext()) &&
                   ((nexthash = (String) wordHashIterator.next()) != null) &&
                   (nexthash.trim().length() > 0)) {
                indexEntity = wordIndex.getEntity(nexthash, true);
                if (indexEntity.size() == 0) {
                    indexEntity.deleteComplete();
                } else if (indexEntity.size() <= count) {
                    // take the whole entity
                    tmpEntities.add(indexEntity);
                    log.logDebug("Selected whole index (" + indexEntity.size() + " URLs) for word " + indexEntity.wordHash());
                    count -= indexEntity.size();
                } else {
                    // make an on-the-fly entity and insert values
                    tmpEntity = new plasmaWordIndexEntity(indexEntity.wordHash());
                    urlEnum = indexEntity.elements(true);
                    while ((urlEnum.hasMoreElements()) && (count > 0)) {
                        indexEntry = (plasmaWordIndexEntry) urlEnum.nextElement();
                        tmpEntity.addEntry(indexEntry);
                        count--;
                    }
                    urlEnum = null;
                    log.logDebug("Selected partial index (" + tmpEntity.size() + " of " + indexEntity.size() + " URLs) for word " + tmpEntity.wordHash());
                    tmpEntities.add(tmpEntity);
                    indexEntity.close(); // important: is not closed elsewhere and cannot be deleted afterwards
                    indexEntity = null;
                }
            }

            // transfer to array
            plasmaWordIndexEntity[] indexEntities = new plasmaWordIndexEntity[tmpEntities.size()];
            for (int i = 0; i < tmpEntities.size(); i++) indexEntities[i] = (plasmaWordIndexEntity) tmpEntities.elementAt(i);
            return indexEntities;
        } catch (IOException e) {
            log.logError("selectTransferIndexes IO-Error (hash=" + nexthash + "): " + e.getMessage());
            e.printStackTrace();
            return new plasmaWordIndexEntity[0];
        } catch (kelondroException e) {
            log.logError("selectTransferIndexes database corrupted: " + e.getMessage());
            e.printStackTrace();
            return new plasmaWordIndexEntity[0];
        }
    }
    /**
     * Removes the transferred entries from the local index: temporary (partial)
     * entities are deleted entry by entry, whole-word entities are deleted
     * completely. Returns false if at least one whole-word index could not be
     * deleted.
     */
    private boolean deleteTransferIndexes(plasmaWordIndexEntity[] indexEntities) throws IOException {
        Enumeration urlEnum;
        plasmaWordIndexEntry indexEntry;
        plasmaWordIndexEntity indexEntity;
        String[] urlHashes;
        int sz;
        boolean success = true;
        for (int i = 0; i < indexEntities.length; i++) {
            if (indexEntities[i].isTMPEntity()) {
                // delete entries separately
                int c = 0;
                urlHashes = new String[indexEntities[i].size()];
                urlEnum = indexEntities[i].elements(true);
                while (urlEnum.hasMoreElements()) {
                    indexEntry = (plasmaWordIndexEntry) urlEnum.nextElement();
                    urlHashes[c++] = indexEntry.getUrlHash();
                }
                wordIndex.removeEntries(indexEntities[i].wordHash(), urlHashes, true);
                indexEntity = wordIndex.getEntity(indexEntities[i].wordHash(), true);
                sz = indexEntity.size();
                indexEntity.close();
                log.logDebug("Deleted partial index (" + c + " URLs) for word " + indexEntities[i].wordHash() + "; " + sz + " entries left");
                // DEBUG: now try to delete the remaining index. If this works, this routine is fine
                /*
                if (wordIndex.getEntity(indexEntities[i].wordHash()).deleteComplete())
                    System.out.println("DEBUG: trial delete of partial word index " + indexEntities[i].wordHash() + " SUCCESSFUL");
                else
                    System.out.println("DEBUG: trial delete of partial word index " + indexEntities[i].wordHash() + " FAILED");
                */
                // end debug
                indexEntities[i].close();
            } else {
                // delete the complete file
                if (indexEntities[i].deleteComplete()) {
                    indexEntities[i].close();
                } else {
                    indexEntities[i].close();
                    // have another try...
                    if (!(plasmaWordIndexEntity.wordHash2path(wordIndex.getRoot() /*PLASMADB*/, indexEntities[i].wordHash()).delete())) {
                        success = false;
                        log.logError("Could not delete whole index for word " + indexEntities[i].wordHash());
                    }
                }
            }
            indexEntities[i] = null;
        }
        return success;
    }
}
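// Minimal usage sketch (hypothetical helper for illustration only; the class and
// method names below are assumptions, not existing YaCy API): shows how a caller
// that already owns the url pool, word index and logger could drive one
// distribution cycle with the same default counts the class itself uses.
class plasmaWordIndexDistributionSketch {

    static void runOneCycle(plasmaURLPool urlPool, plasmaWordIndex wordIndex, serverLog log) {
        // construct the distribution in enabled state and apply the defaults:
        // 100 words, 1 peer, 8000 ms per transfer
        plasmaWordIndexDistribution indexDistribution =
            new plasmaWordIndexDistribution(urlPool, wordIndex, log, true);
        indexDistribution.setCounts(100 /*indexCount*/, 1 /*peerCount*/, 8000 /*maxTimePerTransfer*/);

        // job() returns true only if a transfer took place; on false the caller
        // is expected to make a longer pause before the next cycle
        if (!indexDistribution.job()) {
            log.logDebug("index distribution idle, nothing transferred in this cycle");
        }
    }
}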