////////////////////////////////////////////////////////////////////////////////
//                                                                            //
//  Copyright (C) 2016-17, goatpig.                                           //
//  Distributed under the MIT license                                         //
//  See LICENSE-MIT or https://opensource.org/licenses/MIT                    //
//                                                                            //
////////////////////////////////////////////////////////////////////////////////

#include "BlockchainScanner.h"
#include "log.h"

////////////////////////////////////////////////////////////////////////////////
void BlockchainScanner::scan(int32_t scanFrom)
{
   scanFrom = check_merkle(scanFrom);
   if (scanFrom == INT32_MIN)
      return;

   scan_nocheck(scanFrom);
}

////////////////////////////////////////////////////////////////////////////////
int32_t BlockchainScanner::check_merkle(int32_t scanFrom)
{
   auto topBlock = blockchain_->top();

   scrAddrFilter_->updateAddressMerkleInDB();
   auto&& subsshSdbi = scrAddrFilter_->getSubSshSDBI();

   shared_ptr<BlockHeader> sdbiblock;

   //check if we need to scan anything
   try
   {
      sdbiblock =
         blockchain_->getHeaderByHash(subsshSdbi.topScannedBlkHash_);
   }
   catch (...)
   {
      sdbiblock = blockchain_->getHeaderByHeight(0);
   }

   if (sdbiblock->isMainBranch())
   {
      //this will set scanFrom to 0 before an initial scan
      if ((int)sdbiblock->getBlockHeight() > scanFrom)
         scanFrom = sdbiblock->getBlockHeight();

      if (scanFrom > (int)topBlock->getBlockHeight() ||
         scrAddrFilter_->getScrAddrMap()->size() == 0)
      {
         LOGINFO << "no history to scan";
         topScannedBlockHash_ = topBlock->getThisHash();
         return INT32_MIN;
      }
   }

   return scanFrom;
}
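
////////////////////////////////////////////////////////////////////////////////
// A minimal caller sketch (hypothetical, not part of this translation unit):
// it assumes a fully constructed BlockchainScanner and shows the intended
// call order of the entry points defined in this file.
//
//    BlockchainScanner scanner(/*...*/); //construction details omitted
//    scanner.scan(0);             //check_merkle clamps the start height,
//                                 //then scan_nocheck runs the batch pipeline
//    scanner.updateSSH(false, 0); //tally balances and txio counts per address
//    scanner.resolveTxHashes();   //resolve tx hashes the scan left unhinted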

////////////////////////////////////////////////////////////////////////////////
void BlockchainScanner::scan_nocheck(int32_t scanFrom)
{
   if (scanFrom > (int32_t)db_->blockchain()->top()->getBlockHeight())
      return;

   TIMER_RESTART("scan_nocheck");
   startAt_ = scanFrom;
   auto topBlock = blockchain_->top();

   preloadUtxos();
   auto scrRefMap = scrAddrFilter_->getOutScrRefMap();

   //lambdas
   auto commitLambda = [this](void)
   { writeBlockData(); };

   auto outputsLambda = [this](void)
   { processOutputs(); };

   auto inputsLambda = [this](void)
   { processInputs(); };

   //start threads
   auto commit_tID = thread(commitLambda);
   auto outputs_tID = thread(outputsLambda);
   auto inputs_tID = thread(inputsLambda);

   auto startHeight = scanFrom;
   unsigned endHeight = 0;
   vector<shared_future<bool>> completedFutures;
   unsigned _count = 0;
   completedBatches_.store(0, memory_order_relaxed);

   //loop until there are no more blocks available
   try
   {
      unsigned firstBlockFileID = UINT32_MAX;
      unsigned targetBlockFileID = UINT32_MAX;

      while (startHeight <= topBlock->getBlockHeight())
      {
         //figure out how many blocks to pull for this batch
         //batches try to grab up to BATCH_SIZE worth of block data
         unsigned targetHeight = 0;
         size_t targetSize = BATCH_SIZE;
         size_t tallySize;

         try
         {
            shared_ptr<BlockHeader> currentHeader =
               blockchain_->getHeaderByHeight(startHeight);
            firstBlockFileID = currentHeader->getBlockFileNum();

            targetBlockFileID = 0;
            targetHeight = startHeight;
            tallySize = currentHeader->getBlockSize();

            while (tallySize < targetSize)
            {
               currentHeader = blockchain_->getHeaderByHeight(++targetHeight);
               tallySize += currentHeader->getBlockSize();

               if (currentHeader->getBlockFileNum() < firstBlockFileID)
                  firstBlockFileID = currentHeader->getBlockFileNum();

               if (currentHeader->getBlockFileNum() > targetBlockFileID)
                  targetBlockFileID = currentHeader->getBlockFileNum();
            }
         }
         catch (range_error& e)
         {
            //if getHeaderByHeight throws before targetHeight is topBlock's
            //height, something went wrong. Otherwise we just hit the end of
            //the chain.
            if (targetHeight < topBlock->getBlockHeight())
            {
               LOGERR << e.what();
               throw e;
            }
            else
            {
               targetHeight = topBlock->getBlockHeight();
               if (targetBlockFileID < topBlock->getBlockFileNum())
                  targetBlockFileID = topBlock->getBlockFileNum();
            }
         }

         endHeight = targetHeight;

         //create batch
         auto&& batch = make_unique<ParserBatch>(
            startHeight, endHeight,
            firstBlockFileID, targetBlockFileID,
            scrRefMap);

         completedFutures.push_back(batch->completedPromise_.get_future());
         batch->count_ = _count;

         //post for txout parsing
         outputQueue_.push_back(move(batch));
         if (_count - completedBatches_.load(memory_order_relaxed) >=
            writeQueueDepth_)
         {
            try
            {
               auto futIter = completedFutures.begin() +
                  (_count - writeQueueDepth_);
               futIter->wait();
            }
            catch (future_error& e)
            {
               LOGERR << "future error";
               throw e;
            }
         }

         ++_count;
         startHeight = endHeight + 1;
      }
   }
   catch (range_error&)
   {
      LOGERR << "failed to grab block data starting height: " << startHeight;
      if (startHeight == scanFrom)
         LOGERR << "no block data was scanned";
   }
   catch (...)
   {
      LOGWARN << "scanning halted unexpectedly";
      //let the scan terminate
   }

   //mark all queues complete
   outputQueue_.completed();

   if (outputs_tID.joinable())
      outputs_tID.join();

   if (inputs_tID.joinable())
      inputs_tID.join();

   if (commit_tID.joinable())
      commit_tID.join();

   topScannedBlockHash_ = topBlock->getThisHash();

   TIMER_STOP("scan_nocheck");
   if (topBlock->getBlockHeight() - scanFrom > 100)
   {
      auto timeSpent = TIMER_READ_SEC("scan_nocheck");
      LOGINFO << "scanned transaction history in " << timeSpent << "s";
   }

   auto timeSpent = TIMER_READ_SEC("throttling");
   if (timeSpent > 5)
      LOGINFO << "throttling for " << timeSpent << "s";

   /*timeSpent = TIMER_READ_SEC("outputs");
   LOGINFO << "outputs: " << timeSpent << "s";

   timeSpent = TIMER_READ_SEC("inputs");
   LOGINFO << "inputs: " << timeSpent << "s";

   timeSpent = TIMER_READ_SEC("write");
   LOGINFO << "write: " << timeSpent << "s";

   timeSpent = TIMER_READ_SEC("preload");
   LOGINFO << "preload: " << timeSpent << "s";*/
}
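
////////////////////////////////////////////////////////////////////////////////
// scan_nocheck() above feeds ParserBatch objects through three queues, each
// drained by one of the threads it spawns:
//
//    outputQueue_ -> processOutputs() : parse txouts for tracked scripts
//    inputQueue_  -> processInputs()  : match txins against the utxo map
//    commitQueue_ -> writeBlockData() : serialize and commit to LMDB
//
// Back-pressure comes from writeQueueDepth_: the producer waits on the
// completedPromise_ of the batch writeQueueDepth_ slots behind it.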

////////////////////////////////////////////////////////////////////////////////
void BlockchainScanner::processOutputs()
{
   TIMER_RESET("throttling");
   TIMER_RESET("preload");
   TIMER_RESET("outputs");

   auto process_thread = [this](ParserBatch* batch)->void
   {
      this->processOutputsThread(batch);
   };

   map<unsigned, shared_ptr<BlockDataFileMap>> localFileMap;

   auto preloadBlockDataFiles = [&](ParserBatch* batch)->void
   {
      if (batch == nullptr)
         return;

      TIMER_START("preload");

      auto file_id = batch->startBlockFileID_;
      while (file_id <= batch->targetBlockFileID_)
      {
         auto local_iter = localFileMap.find(file_id);
         if (local_iter != localFileMap.end())
         {
            batch->fileMaps_.insert(
               make_pair(file_id, local_iter->second));
         }
         else
         {
            batch->fileMaps_.insert(
               make_pair(file_id, blockDataLoader_.get(file_id)));
         }

         ++file_id;
      }

      localFileMap = batch->fileMaps_;

      TIMER_STOP("preload");
   };

   //init batch
   unique_ptr<ParserBatch> batch;
   while (1)
   {
      try
      {
         batch = move(outputQueue_.pop_front());
         break;
      }
      catch (StopBlockingLoop&)
      {}
   }

   preloadBlockDataFiles(batch.get());

   while (1)
   {
      //start processing threads
      vector<thread> thr_vec;
      for (unsigned i = 0; i < totalThreadCount_; i++)
         thr_vec.push_back(thread(process_thread, batch.get()));

      unique_ptr<ParserBatch> nextBatch;

      TIMER_START("throttling");
      try
      {
         nextBatch = move(outputQueue_.pop_front());
      }
      catch (StopBlockingLoop&)
      {}
      TIMER_STOP("throttling");

      TIMER_START("outputs");

      //populate the next batch's file map while the first
      //batch is being processed
      preloadBlockDataFiles(nextBatch.get());

      //wait on threads
      for (auto& thr : thr_vec)
      {
         if (thr.joinable())
            thr.join();
      }

      //push first batch for input processing
      inputQueue_.push_back(move(batch));

      //exit loop condition
      if (nextBatch == nullptr)
      {
         TIMER_STOP("outputs");
         break;
      }

      //set batch for next iteration
      batch = move(nextBatch);
      TIMER_STOP("outputs");
   }

   //done with processing outputs, there won't be any more batches to push
   //to the input queue, we can mark it complete
   inputQueue_.completed();
}
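
////////////////////////////////////////////////////////////////////////////////
// processInputs() below drains inputQueue_ one batch at a time: a batch's
// freshly parsed outputs are merged into the scanner-wide utxoMap_ first, so
// txins in this batch can spend outputs created earlier in the same batch.
// Spent entries are purged from utxoMap_ once the worker threads return.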

////////////////////////////////////////////////////////////////////////////////
void BlockchainScanner::processInputs()
{
   TIMER_RESET("inputs");

   auto process_thread = [this](ParserBatch* batch)->void
   {
      this->processInputsThread(batch);
   };

   while (1)
   {
      unique_ptr<ParserBatch> batch;
      try
      {
         batch = move(inputQueue_.pop_front());
      }
      catch (StopBlockingLoop&)
      {
         //end condition
         break;
      }

      TIMER_START("inputs");

      //reset counter
      batch->blockCounter_.store(batch->start_, memory_order_relaxed);

      //merge utxo map from batch with global one
      //this data needs to be copied because we still have use for the
      //original map
      for (auto& hash_map : batch->outputMap_)
      {
         auto hash_iter = utxoMap_.find(hash_map.first);
         if (hash_iter == utxoMap_.end())
         {
            utxoMap_.insert(hash_map);
            continue;
         }

         hash_iter->second.insert(
            hash_map.second.begin(), hash_map.second.end());
      }

      //start processing threads
      vector<thread> thr_vec;
      for (unsigned i = 1; i < totalThreadCount_; i++)
         thr_vec.push_back(thread(process_thread, batch.get()));
      process_thread(batch.get());

      //wait on threads
      for (auto& thr : thr_vec)
      {
         if (thr.joinable())
            thr.join();
      }

      //purge spent outputs from global map
      for (auto& spent_txout : batch->spentOutputs_)
      {
         auto hash_iter = utxoMap_.find(spent_txout.parentHash_);
         if (hash_iter == utxoMap_.end())
         {
            LOGERR << "missing utxo";
            continue;
         }

         auto utxo_iter = hash_iter->second.find(spent_txout.txOutIndex_);
         if (utxo_iter == hash_iter->second.end())
         {
            LOGERR << "missing utxo";
            continue;
         }

         hash_iter->second.erase(utxo_iter);
         if (hash_iter->second.size() == 0)
            utxoMap_.erase(hash_iter);
      }

      //push for commit
      commitQueue_.push_back(move(batch));
      TIMER_STOP("inputs");
   }

   //done with processing inputs, there won't be any more batches to push
   //to the commit queue, we can mark it complete
   commitQueue_.completed();
}

////////////////////////////////////////////////////////////////////////////////
shared_ptr<BlockData> BlockchainScanner::getBlockData(
   ParserBatch* batch, unsigned height)
{
   //grab block file map
   auto blockheader = blockchain_->getHeaderByHeight(height);
   auto filenum = blockheader->getBlockFileNum();
   auto mapIter = batch->fileMaps_.find(filenum);
   if (mapIter == batch->fileMaps_.end())
   {
      LOGERR << "Missing file map for output scan, this is unexpected";

      LOGERR << "Has the following block files:";
      for (auto& file_pair : batch->fileMaps_)
         LOGERR << " --- #" << file_pair.first;

      LOGERR << "Was looking for id #" << filenum;

      throw runtime_error("missing file map");
   }

   auto filemap = mapIter->second.get();

   //find block and deserialize it
   auto getID = [blockheader](const BinaryData&)->unsigned int
   {
      return blockheader->getThisID();
   };

   auto bdata = make_shared<BlockData>();
   bdata->deserialize(
      filemap->getPtr() + blockheader->getOffset(),
      blockheader->getBlockSize(),
      blockheader, getID, false, false);

   return bdata;
}
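
////////////////////////////////////////////////////////////////////////////////
// processOutputsThread() walks raw txouts. For reference, a serialized txout
// is laid out as:
//
//    value    (8 bytes, little endian)
//    varint   (script length)
//    script   (script length bytes)
//
// which is why the parser advances 8 bytes, reads a var_int, then hands the
// script to BtcUtils::getTxOutScrAddrNoCopy().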

////////////////////////////////////////////////////////////////////////////////
void BlockchainScanner::processOutputsThread(ParserBatch* batch)
{
   map<unsigned, shared_ptr<BlockData>> blockMap;
   map<BinaryData, map<unsigned, StoredTxOut>> outputMap;
   map<BinaryData, map<BinaryData, StoredSubHistory>> sshMap;

   while (1)
   {
      auto currentBlock =
         batch->blockCounter_.fetch_add(1, memory_order_relaxed);

      if (currentBlock > batch->end_)
         break;

      auto blockdata = getBlockData(batch, currentBlock);
      if (!blockdata->isInitialized())
      {
         LOGERR << "Could not get block data for height #" << currentBlock;
         return;
      }

      blockMap.insert(make_pair(currentBlock, blockdata));

      //TODO: flag isMultisig
      const auto header = blockdata->header();

      //update processed height
      auto topHeight = header->getBlockHeight();

      auto& txns = blockdata->getTxns();
      for (unsigned i = 0; i < txns.size(); i++)
      {
         const BCTX& txn = *(txns[i].get());
         for (unsigned y = 0; y < txn.txouts_.size(); y++)
         {
            auto& txout = txn.txouts_[y];

            BinaryRefReader brr(
               txn.data_ + txout.first, txout.second);
            brr.advance(8);
            unsigned scriptSize = (unsigned)brr.get_var_int();
            auto&& scrRef = BtcUtils::getTxOutScrAddrNoCopy(
               brr.get_BinaryDataRef(scriptSize));

            auto saIter = batch->scriptRefMap_->find(scrRef);
            if (saIter == batch->scriptRefMap_->end())
               continue;

            if (saIter->second >= (int)blockdata->header()->getBlockHeight())
               continue;

            //if we got this far, this txout is ours
            //get tx hash
            auto& txHash = txn.getHash();
            auto&& scrAddr = scrRef.getScrAddr();

            //construct StoredTxOut
            StoredTxOut stxo;
            stxo.dataCopy_ = BinaryData(
               txn.data_ + txout.first, txout.second);
            stxo.parentHash_ = txHash;
            stxo.blockHeight_ = header->getBlockHeight();
            stxo.duplicateID_ = header->getDuplicateID();
            stxo.txIndex_ = i;
            stxo.txOutIndex_ = y;
            stxo.scrAddr_ = scrAddr;
            stxo.spentness_ = TXOUT_UNSPENT;
            stxo.parentTxOutCount_ = txn.txouts_.size();
            stxo.isCoinbase_ = txn.isCoinbase_;
            auto value = stxo.getValue();

            auto&& hgtx = DBUtils::heightAndDupToHgtx(
               stxo.blockHeight_, stxo.duplicateID_);
            auto&& txioKey = DBUtils::getBlkDataKeyNoPrefix(
               stxo.blockHeight_, stxo.duplicateID_, i, y);

            //update utxos_
            auto& stxoHashMap = outputMap[txHash];
            stxoHashMap.insert(make_pair(y, move(stxo)));

            //update ssh_
            auto& ssh = sshMap[scrAddr];
            auto& subssh = ssh[hgtx];

            //deal with txio count in subssh at serialization
            TxIOPair txio;
            txio.setValue(value);
            txio.setTxOut(txioKey);
            txio.setFromCoinbase(txn.isCoinbase_);
            subssh.txioMap_.insert(make_pair(txioKey, move(txio)));
         }
      }
   }

   //grab batch mutex and merge processed data in
   unique_lock<mutex> lock(batch->mergeMutex_);

   batch->blockMap_.insert(blockMap.begin(), blockMap.end());
   batch->outputMap_.insert(outputMap.begin(), outputMap.end());

   for (auto& ssh_pair : sshMap)
   {
      auto ssh_iter = batch->sshMap_.find(ssh_pair.first);
      if (ssh_iter == batch->sshMap_.end())
      {
         batch->sshMap_.insert(move(ssh_pair));
         continue;
      }

      for (auto& subssh_pair : ssh_pair.second)
         ssh_iter->second.insert(move(subssh_pair));
   }
}
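
////////////////////////////////////////////////////////////////////////////////
// processInputsThread() only needs the outpoint at the start of each raw
// txin: 32 bytes of parent tx hash followed by a 4 byte little endian txout
// index. A txin is ours if that outpoint resolves in the global utxoMap_.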

////////////////////////////////////////////////////////////////////////////////
void BlockchainScanner::processInputsThread(ParserBatch* batch)
{
   map<BinaryData, map<BinaryData, StoredSubHistory>> sshMap;
   vector<StoredTxOut> spentOutputs;

   while (1)
   {
      auto currentBlock =
         batch->blockCounter_.fetch_add(1, memory_order_relaxed);

      if (currentBlock > batch->end_)
         break;

      auto blockdata_iter = batch->blockMap_.find(currentBlock);
      if (blockdata_iter == batch->blockMap_.end())
      {
         LOGERR << "can't find block #" << currentBlock << " in batch";
         throw runtime_error("missing block");
      }

      auto blockdata = blockdata_iter->second;

      const auto header = blockdata->header();
      auto& txns = blockdata->getTxns();

      for (unsigned i = 0; i < txns.size(); i++)
      {
         const BCTX& txn = *(txns[i].get());

         for (unsigned y = 0; y < txn.txins_.size(); y++)
         {
            auto& txin = txn.txins_[y];
            BinaryDataRef outHash(
               txn.data_ + txin.first, 32);

            auto utxoIter = utxoMap_.find(outHash);
            if (utxoIter == utxoMap_.end())
               continue;

            unsigned txOutId = READ_UINT32_LE(
               txn.data_ + txin.first + 32);

            auto idIter = utxoIter->second.find(txOutId);
            if (idIter == utxoIter->second.end())
               continue;

            //if we got this far, this txin consumes one of our utxos

            //create spent txout
            auto&& hgtx = DBUtils::getBlkDataKeyNoPrefix(
               header->getBlockHeight(), header->getDuplicateID());

            auto&& txinkey = DBUtils::getBlkDataKeyNoPrefix(
               header->getBlockHeight(), header->getDuplicateID(),
               i, y);

            StoredTxOut stxo = idIter->second;
            stxo.spentness_ = TXOUT_SPENT;
            stxo.spentByTxInKey_ = txinkey;

            //set spenderHash and parentTxOutCount to count and hash tallying
            //of spent txouts
            stxo.spenderHash_ = txn.getHash();
            stxo.parentTxOutCount_ = txn.txouts_.size();

            //add to ssh_
            auto& ssh = sshMap[stxo.getScrAddress()];
            auto& subssh = ssh[hgtx];

            //deal with txio count in subssh at serialization
            TxIOPair txio;
            auto&& txoutkey = stxo.getDBKey(false);
            txio.setTxOut(txoutkey);
            txio.setTxIn(txinkey);
            txio.setValue(stxo.getValue());
            subssh.txioMap_[txoutkey] = move(txio);

            //add to spentTxOuts_
            spentOutputs.push_back(move(stxo));
         }
      }
   }

   //merge processed data into batch
   unique_lock<mutex> lock(batch->mergeMutex_);

   for (auto& ssh_pair : sshMap)
   {
      auto ssh_iter = batch->sshMap_.find(ssh_pair.first);
      if (ssh_iter == batch->sshMap_.end())
      {
         batch->sshMap_.insert(move(ssh_pair));
         continue;
      }

      for (auto& subssh_pair : ssh_pair.second)
      {
         auto txio_iter = ssh_iter->second.find(subssh_pair.first);
         if (txio_iter == ssh_iter->second.end())
         {
            ssh_iter->second.insert(move(subssh_pair));
            continue;
         }

         for (auto& txio_pair : subssh_pair.second.txioMap_)
         {
            txio_iter->second.txioMap_[txio_pair.first] =
               move(txio_pair.second);
         }
      }
   }

   batch->spentOutputs_.insert(batch->spentOutputs_.end(),
      spentOutputs.begin(), spentOutputs.end());
}

////////////////////////////////////////////////////////////////////////////////
void BlockchainScanner::writeBlockData()
{
   auto getGlobalOffsetForBlock = [&](unsigned height)->size_t
   {
      auto header = blockchain_->getHeaderByHeight(height);
      size_t val = header->getBlockFileNum();
      val *= 128 * 1024 * 1024;
      val += header->getOffset();
      return val;
   };

   ProgressCalculator calc(getGlobalOffsetForBlock(
      blockchain_->top()->getBlockHeight()));
   auto initVal = getGlobalOffsetForBlock(startAt_);
   calc.init(initVal);
   if (reportProgress_)
      progress_(BDMPhase_Rescan,
         calc.fractionCompleted(), UINT32_MAX,
         initVal);

   auto writeHintsLambda = [&](ParserBatch* batch_ref)->void
   {
      processAndCommitTxHints(batch_ref);
   };

   TIMER_RESET("write");

   while (1)
   {
      unique_ptr<ParserBatch> batch;
      try
      {
         batch = move(commitQueue_.pop_front());
      }
      catch (StopBlockingLoop&)
      {
         break;
      }

      TIMER_START("write");

      //sanity check, before spawning the hint writer thread so an empty
      //batch doesn't leave that thread unjoined
      if (batch->blockMap_.size() == 0)
         continue;

      //start txhint writer thread
      thread writeHintsThreadId = thread(writeHintsLambda, batch.get());

      //serialize data
      auto topheader = batch->blockMap_.rbegin()->second->getHeaderPtr();
      if (topheader == nullptr)
      {
         LOGERR << "empty top block header ptr, aborting scan";
         throw runtime_error("nullptr header");
      }

      auto topHeight = topheader->getBlockHeight();

      map<BinaryData, BinaryWriter> serializedSubSSH;
      map<BinaryData, BinaryWriter> serializedStxo;

      {
         for (auto& ssh : batch->sshMap_)
         {
            for (auto& subssh : ssh.second)
            {
               //TODO: modify subssh serialization to fit our needs

               BinaryWriter subsshkey;
               subsshkey.put_uint8_t(DB_PREFIX_SCRIPT);
               subsshkey.put_BinaryData(ssh.first);
               subsshkey.put_BinaryData(subssh.first);

               auto& bw = serializedSubSSH[subsshkey.getDataRef()];
               subssh.second.serializeDBValue(
                  bw, ARMORY_DB_BARE);
            }
         }

         for (auto& utxomap : batch->outputMap_)
         {
            for (auto& utxo : utxomap.second)
            {
               auto& bw = serializedStxo[utxo.second.getDBKey()];
               utxo.second.serializeDBValue(
                  bw, ARMORY_DB_BARE, true);
            }
         }
      }

      //we've serialized utxos, now let's do another pass for spent txouts
      //to make sure they overwrite utxos that were found and spent within
      //the same batch
      for (auto& stxo : batch->spentOutputs_)
      {
         auto& bw = serializedStxo[stxo.getDBKey()];
         if (bw.getSize() > 0)
            bw.reset();
         stxo.serializeDBValue(
            bw, ARMORY_DB_BARE, true);
      }

      //write data
      {
         //txouts
         auto&& tx = db_->beginTransaction(STXO, LMDB::ReadWrite);

         for (auto& stxo : serializedStxo)
         {
            //TODO: don't rewrite utxos, check if they are already in DB first
            db_->putValue(STXO,
               stxo.first.getRef(),
               stxo.second.getDataRef());
         }
      }

      {
         //subssh
         auto&& tx = db_->beginTransaction(SUBSSH, LMDB::ReadWrite);

         for (auto& subssh : serializedSubSSH)
         {
            db_->putValue(
               SUBSSH,
               subssh.first.getRef(),
               subssh.second.getDataRef());
         }

         //update SUBSSH sdbi
         auto&& sdbi = scrAddrFilter_->getSubSshSDBI();
         sdbi.topBlkHgt_ = topheader->getBlockHeight();
         sdbi.topScannedBlkHash_ = topheader->getThisHash();
         scrAddrFilter_->putSubSshSDBI(sdbi);
      }

      //wait on writeHintsThreadId
      if (writeHintsThreadId.joinable())
         writeHintsThreadId.join();

      if (batch->start_ != batch->end_)
      {
         LOGINFO << "scanned from block #" << batch->start_ <<
            " to #" << batch->end_;
      }
      else
      {
         LOGINFO << "scanned block #" << batch->start_;
      }

      size_t progVal = getGlobalOffsetForBlock(batch->end_);
      calc.advance(progVal);
      if (reportProgress_)
         progress_(BDMPhase_Rescan,
            calc.fractionCompleted(), calc.remainingSeconds(),
            progVal);

      topScannedBlockHash_ = topheader->getThisHash();
      completedBatches_.fetch_add(1, memory_order_relaxed);
      batch->completedPromise_.set_value(true);
      TIMER_STOP("write");
   }
}
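
////////////////////////////////////////////////////////////////////////////////
// Tx hints map a 4 byte tx hash prefix to the list of DB keys of transactions
// sharing that prefix; processAndCommitTxHints() below maintains them, along
// with a per-tx "count and hash" record (txout count + full hash) keyed by
// the tx's DB key.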

////////////////////////////////////////////////////////////////////////////////
void BlockchainScanner::processAndCommitTxHints(ParserBatch* batch)
{
   map<BinaryData, StoredTxHints> txHints;
   map<BinaryData, BinaryWriter> countAndHash;

   auto addTxHint =
      [&](StoredTxHints& stxh, const StoredTxOut& utxo)->void
   {
      auto&& utxokey = utxo.getDBKeyOfParentTx(false);

      //make sure key isn't already in there
      for (auto& key : stxh.dbKeyList_)
      {
         if (key == utxokey)
            return;
      }

      stxh.dbKeyList_.push_back(move(utxokey));
   };

   auto addTxHintMap =
      [&](const pair<BinaryData, map<unsigned, StoredTxOut>>& utxomap)->void
   {
      auto&& txHashPrefix = utxomap.first.getSliceCopy(0, 4);
      StoredTxHints& stxh = txHints[txHashPrefix];

      //pull txHint from DB first, don't want to override
      //existing hints
      if (stxh.isNull())
         db_->getStoredTxHints(stxh, txHashPrefix);

      for (auto& utxo : utxomap.second)
      {
         addTxHint(stxh, utxo.second);
      }

      stxh.preferredDBKey_ = stxh.dbKeyList_.front();

      //count and hash
      auto& stxo = utxomap.second.begin()->second;
      auto& bw = countAndHash[stxo.getDBKeyOfParentTx(true)];
      if (bw.getSize() != 0)
         return;

      bw.put_uint32_t(stxo.parentTxOutCount_);
      bw.put_BinaryData(utxomap.first);
   };

   {
      auto&& hintdbtx = db_->beginTransaction(TXHINTS, LMDB::ReadOnly);

      for (auto& utxomap : batch->outputMap_)
      {
         addTxHintMap(utxomap);
      }

      map<BinaryData, map<unsigned, StoredTxOut>> spentTxOutMap;
      for (auto& stxo : batch->spentOutputs_)
      {
         auto& stxomap = spentTxOutMap[stxo.spenderHash_];

         StoredTxOut spentstxo;
         spentstxo.parentHash_ = stxo.spenderHash_;
         spentstxo.blockHeight_ =
            DBUtils::hgtxToHeight(stxo.spentByTxInKey_.getSliceRef(0, 4));
         spentstxo.duplicateID_ =
            DBUtils::hgtxToDupID(stxo.spentByTxInKey_.getSliceRef(0, 4));

         spentstxo.txIndex_ =
            READ_UINT16_BE(stxo.spentByTxInKey_.getSliceRef(4, 2));
         spentstxo.txOutIndex_ =
            READ_UINT16_BE(stxo.spentByTxInKey_.getSliceRef(6, 2));

         spentstxo.parentTxOutCount_ = stxo.parentTxOutCount_;

         stxomap.insert(move(
            make_pair(spentstxo.txOutIndex_, move(spentstxo))));
      }

      for (auto& stxomap : spentTxOutMap)
         addTxHintMap(stxomap);
   }

   map<BinaryData, BinaryWriter> serializedHints;

   //serialize
   for (auto& txhint : txHints)
   {
      auto& bw = serializedHints[txhint.second.getDBKey()];
      txhint.second.serializeDBValue(bw);
   }

   //write
   {
      auto&& hintdbtx = db_->beginTransaction(TXHINTS, LMDB::ReadWrite);

      for (auto& txhint : serializedHints)
      {
         db_->putValue(TXHINTS,
            txhint.first.getRef(),
            txhint.second.getDataRef());
      }

      for (auto& cah : countAndHash)
      {
         db_->putValue(TXHINTS,
            cah.first.getRef(),
            cah.second.getDataRef());
      }
   }
}
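
////////////////////////////////////////////////////////////////////////////////
// updateSSH() is the tally pass: it folds per-script subssh entries into the
// top level SSH summaries (balance, txio count, per-height counts). In
// ARMORY_DB_FULL mode it also collects tx hashes referenced by tracked txins
// so resolveTxHashes() can look them up later.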

////////////////////////////////////////////////////////////////////////////////
void BlockchainScanner::updateSSH(bool force, int32_t startHeight)
{
   //loop over all subssh entries in the SUBSSH db,
   //compile balance, txio count and summary map for each address
   //now also resolves unhinted tx hashes

   if (force)
      startHeight = 0;

   if (startHeight > (int32_t)db_->blockchain()->top()->getBlockHeight())
      return;

   if (reportProgress_)
      progress_(BDMPhase_Balance, 0, 0, 0);

   StoredDBInfo sdbi = scrAddrFilter_->getSshSDBI();

   {
      shared_ptr<BlockHeader> sdbiblock;

      try
      {
         sdbiblock = blockchain_->getHeaderByHash(sdbi.topScannedBlkHash_);
      }
      catch (...)
      {
         sdbiblock = blockchain_->getHeaderByHeight(0);
      }

      if (sdbiblock->isMainBranch())
      {
         if (sdbi.topBlkHgt_ != 0 &&
            sdbi.topBlkHgt_ >= blockchain_->top()->getBlockHeight())
         {
            if (!force)
            {
               LOGINFO << "no SSH to scan";
               return;
            }
         }
      }
   }

   bool resolveHashes = false;
   {
      //check db mode against the HEADERS db since it's the only one that
      //doesn't change through rescans
      if (BlockDataManagerConfig::getDbType() == ARMORY_DB_FULL)
         resolveHashes = true;
   }

   set<BinaryData> txnsToResolve;

   //process ssh, list missing hashes for hash resolver
   map<BinaryData, StoredScriptHistory> sshMap;
   auto scrAddrMap = scrAddrFilter_->getScrAddrMap();

   {
      auto sshTx = db_->beginTransaction(SUBSSH, LMDB::ReadOnly);
      auto sshIter = db_->getIterator(SUBSSH);
      sshIter->seekToStartsWith(DB_PREFIX_SCRIPT);

      auto getDupForHeight = [this](unsigned height)->uint8_t
      {
         return this->db_->getValidDupIDForHeight(height);
      };

      auto scrAddrMapPtr = scrAddrFilter_->getScrAddrMap();
      auto&& subsshparser_result = parseSubSsh(
         move(sshIter), startHeight, resolveHashes,
         getDupForHeight,
         scrAddrMapPtr, BinaryData());

      //update SSH
      auto historyTx = db_->beginTransaction(SSH, LMDB::ReadOnly);

      for (auto& ssh : subsshparser_result.second)
      {
         auto& db_ssh = sshMap[ssh.first];
         db_->getStoredScriptHistorySummary(db_ssh, ssh.first);

         if (db_ssh.isInitialized())
         {
            db_ssh.totalUnspent_ += ssh.second.totalUnspent_;
            db_ssh.totalTxioCount_ += ssh.second.totalTxioCount_;
            db_ssh.subsshSummary_.insert(
               ssh.second.subsshSummary_.begin(),
               ssh.second.subsshSummary_.end());
         }
         else
         {
            db_ssh = move(ssh.second);
         }
      }

      txnsToResolve = move(subsshparser_result.first);
   }

   //build txHash refs from listed txins
   if (resolveHashes && txnsToResolve.size() > 0)
   {
      set<BinaryData> allMissingTxHashes;
      try
      {
         allMissingTxHashes = move(scrAddrFilter_->getMissingHashes());
      }
      catch (runtime_error&)
      {
         //no missing hashes entry yet, move on
      }

      for (auto& txid : txnsToResolve)
      {
         //grab tx
         Tx tx;
         try
         {
            tx = move(db_->getFullTxCopy(txid));
         }
         catch (exception&)
         {
            LOGERR << "failed to grab tx by key";
            continue;
         }

         //build list of all referred hashes in txins
         auto txinCount = tx.getNumTxIn();
         auto dataPtr = tx.getPtr();

         for (auto i = 0; i < txinCount; i++)
         {
            auto offset = tx.getTxInOffset(i);
            BinaryDataRef bdr(dataPtr + offset, 32);

            //skip coinbase txns
            if (bdr == BtcUtils::EmptyHash_)
               continue;

            allMissingTxHashes.insert(bdr);
         }
      }

      scrAddrFilter_->putMissingHashes(allMissingTxHashes);
   }

   //write ssh data
   shared_ptr<BlockHeader> topheader;
   try
   {
      topheader = blockchain_->getHeaderByHash(topScannedBlockHash_);
   }
   catch (exception& e)
   {
      LOGERR << e.what();
      throw e;
   }

   auto topheight = topheader->getBlockHeight();

   auto&& putsshtx = db_->beginTransaction(SSH, LMDB::ReadWrite);

   for (auto& scrAddr : *scrAddrMap)
   {
      auto& ssh = sshMap[scrAddr.second->scrAddr_];

      if (!ssh.isInitialized())
      {
         ssh.uniqueKey_ = scrAddr.first;
      }

      BinaryData&& sshKey = ssh.getDBKey();
      ssh.scanHeight_ = topheight;
      ssh.tallyHeight_ = topheight;

      BinaryWriter bw;
      ssh.serializeDBValue(bw, ARMORY_DB_BARE);
      db_->putValue(SSH, sshKey.getRef(), bw.getDataRef());
   }

   //update sdbi
   sdbi.topScannedBlkHash_ = topScannedBlockHash_;
   sdbi.topBlkHgt_ = topheight;

   scrAddrFilter_->putSshSDBI(sdbi);
}

////////////////////////////////////////////////////////////////////////////////
void BlockchainScanner::preloadUtxos()
{
   //TODO: check utxos pulled vs scrAddrFilter (to reduce dataset
   //for side scans)

   auto&& tx = db_->beginTransaction(STXO, LMDB::ReadOnly);
   auto dbIter = db_->getIterator(STXO);
   dbIter->seekToFirst();

   while (dbIter->advanceAndRead())
   {
      StoredTxOut stxo;
      stxo.unserializeDBKey(dbIter->getKeyRef());
      stxo.unserializeDBValue(dbIter->getValueRef());

      if (stxo.spentness_ == TXOUT_SPENT)
         continue;

      stxo.parentHash_ = move(db_->getTxHashForLdbKey(
         stxo.getDBKeyOfParentTx(false)));

      auto& idMap = utxoMap_[stxo.parentHash_];
      idMap.insert(make_pair(stxo.txOutIndex_, move(stxo)));
   }
}
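
////////////////////////////////////////////////////////////////////////////////
// undo() rolls the scan state back to the reorg branch point. Subssh entries
// are left alone (stale ones are filtered out by dupID at read time); only
// stxo spentness, SSH summaries and the SSH SDBI are reverted.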

////////////////////////////////////////////////////////////////////////////////
void BlockchainScanner::undo(Blockchain::ReorganizationState& reorgState)
{
   //don't undo subssh, these are skipped by dupID when loading history

   auto blockPtr = reorgState.prevTop_;
   map<uint32_t, shared_ptr<BlockDataFileMap>> fileMaps_;

   map<DB_SELECT, set<BinaryData>> keysToDelete;
   map<BinaryData, StoredScriptHistory> sshMap;
   set<BinaryData> undoSpentness; //TODO: add spentness DB

   //TODO: sanity checks on header ptrs from reorgState
   if (reorgState.prevTop_->getBlockHeight() <=
      reorgState.reorgBranchPoint_->getBlockHeight())
      throw runtime_error("invalid reorg state");

   auto scrAddrMap = scrAddrFilter_->getScrAddrMap();

   while (blockPtr != reorgState.reorgBranchPoint_)
   {
      int currentHeight = blockPtr->getBlockHeight();
      auto currentDupId = blockPtr->getDuplicateID();

      //create tx to pull subssh data
      auto&& sshTx = db_->beginTransaction(SUBSSH, LMDB::ReadOnly);

      //grab blocks from previous top until branch point
      if (blockPtr == nullptr)
         throw runtime_error("reorg failed while tracing back to "
            "branch point");

      auto filenum = blockPtr->getBlockFileNum();
      auto fileIter = fileMaps_.find(filenum);
      if (fileIter == fileMaps_.end())
      {
         fileIter = fileMaps_.insert(make_pair(
            filenum,
            blockDataLoader_.get(filenum))).first;
      }

      auto filemap = fileIter->second;

      auto getID = [blockPtr](const BinaryData&)->uint32_t
      {
         return blockPtr->getThisID();
      };

      BlockData bdata;
      bdata.deserialize(filemap.get()->getPtr() + blockPtr->getOffset(),
         blockPtr->getBlockSize(), blockPtr, getID, false, false);

      auto& txns = bdata.getTxns();
      for (unsigned i = 0; i < txns.size(); i++)
      {
         auto& txn = txns[i];

         //undo tx outs added by this block
         for (unsigned y = 0; y < txn->txouts_.size(); y++)
         {
            auto& txout = txn->txouts_[y];

            BinaryRefReader brr(
               txn->data_ + txout.first, txout.second);
            brr.advance(8);
            unsigned scriptSize = (unsigned)brr.get_var_int();
            auto&& scrAddr = BtcUtils::getTxOutScrAddr(
               brr.get_BinaryDataRef(scriptSize));

            auto saIter = scrAddrMap->find(scrAddr);
            if (saIter == scrAddrMap->end())
               continue;

            //update ssh value and txio count
            auto& ssh = sshMap[scrAddr];
            if (!ssh.isInitialized())
               db_->getStoredScriptHistorySummary(ssh, scrAddr);

            if (ssh.scanHeight_ < currentHeight)
               continue;

            brr.resetPosition();
            uint64_t value = brr.get_uint64_t();
            ssh.totalUnspent_ -= value;
            ssh.totalTxioCount_--;

            //mark stxo key for deletion
            auto&& txoutKey = DBUtils::getBlkDataKey(
               currentHeight, currentDupId, i, y);
            keysToDelete[STXO].insert(txoutKey);

            //decrement summary count at height, remove entry if necessary
            auto& sum = ssh.subsshSummary_[currentHeight];
            sum--;
            if (sum <= 0)
               ssh.subsshSummary_.erase(currentHeight);
         }

         //undo spends from this block
         for (unsigned y = 0; y < txn->txins_.size(); y++)
         {
            auto& txin = txn->txins_[y];

            BinaryDataRef outHash(
               txn->data_ + txin.first, 32);

            auto&& txKey = db_->getDBKeyForHash(outHash, currentDupId);
            if (txKey.getSize() != 6)
               continue;

            uint16_t txOutId = (uint16_t)READ_UINT32_LE(
               txn->data_ + txin.first + 32);
            txKey.append(WRITE_UINT16_BE(txOutId));

            StoredTxOut stxo;
            if (!db_->getStoredTxOut(stxo, txKey))
               continue;

            //update ssh value and txio count
            auto& scrAddr = stxo.getScrAddress();
            auto& ssh = sshMap[scrAddr];
            if (!ssh.isInitialized())
               db_->getStoredScriptHistorySummary(ssh, scrAddr);

            if (ssh.scanHeight_ < currentHeight)
               continue;

            ssh.totalUnspent_ += stxo.getValue();
            ssh.totalTxioCount_--;

            //mark txout key for undoing spentness
            undoSpentness.insert(txKey);

            //decrement summary count at height, remove entry if necessary
            auto& sum = ssh.subsshSummary_[currentHeight];
            sum--;
            if (sum <= 0)
               ssh.subsshSummary_.erase(currentHeight);
         }
      }

      //set blockPtr to prev block
      try
      {
         blockPtr = blockchain_->getHeaderByHash(blockPtr->getPrevHashRef());
      }
      catch (exception& e)
      {
         LOGERR << e.what();
         throw e;
      }
   }

   //at this point we have a map of updated ssh, as well as a
   //set of keys to delete from the DB and spentness to undo by stxo key

   //stxo
   {
      auto&& tx = db_->beginTransaction(STXO, LMDB::ReadWrite);

      //grab stxos and revert spentness
      map<BinaryData, StoredTxOut> stxos;
      for (auto& stxoKey : undoSpentness)
      {
         auto& stxo = stxos[stxoKey];
         if (!db_->getStoredTxOut(stxo, stxoKey))
            continue;

         stxo.spentByTxInKey_.clear();
         stxo.spentness_ = TXOUT_UNSPENT;
      }

      //put updated stxos
      for (auto& stxo : stxos)
      {
         if (stxo.second.isInitialized())
            db_->putStoredTxOut(stxo.second);
      }

      //delete invalidated stxos
      auto& stxoKeysToDelete = keysToDelete[STXO];
      for (auto& key : stxoKeysToDelete)
         db_->deleteValue(STXO, key);
   }

   int branchPointHeight =
      reorgState.reorgBranchPoint_->getBlockHeight();

   //ssh
   {
      auto&& tx = db_->beginTransaction(SSH, LMDB::ReadWrite);

      //go through all ssh in scrAddrFilter
      for (auto& scrAddr : *scrAddrMap)
      {
         auto& ssh = sshMap[scrAddr.second->scrAddr_];

         //if the ssh isn't in our map, pull it from DB
         if (!ssh.isInitialized())
         {
            db_->getStoredScriptHistorySummary(ssh, scrAddr.first);
            if (ssh.uniqueKey_.getSize() == 0)
            {
               sshMap.erase(scrAddr.second->scrAddr_);
               continue;
            }
         }

         //update alreadyScannedUpToBlk_ to branch point height
         if (ssh.scanHeight_ > branchPointHeight)
            ssh.scanHeight_ = branchPointHeight;

         if (ssh.tallyHeight_ > branchPointHeight)
            ssh.tallyHeight_ = branchPointHeight;
      }

      //write it all up
      for (auto& ssh : sshMap)
      {
         auto saIter = scrAddrMap->find(ssh.second.uniqueKey_);
         if (saIter == scrAddrMap->end())
         {
            LOGWARN << "invalid scrAddr during undo";
            continue;
         }

         BinaryWriter bw;
         ssh.second.serializeDBValue(bw, ARMORY_DB_BARE);
         db_->putValue(SSH,
            ssh.second.getDBKey().getRef(),
            bw.getDataRef());
      }

      //update SSH sdbi
      StoredDBInfo sdbi = scrAddrFilter_->getSshSDBI();
      sdbi.topScannedBlkHash_ = reorgState.reorgBranchPoint_->getThisHash();
      sdbi.topBlkHgt_ = branchPointHeight;
      scrAddrFilter_->putSshSDBI(sdbi);
   }
}
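
////////////////////////////////////////////////////////////////////////////////
// The two thread routines below implement hash resolution against the
// TXFILTERS db: getFilterHitsThread() matches missing hashes against each
// block file's filter pool, then processFilterHitsThread() deserializes the
// flagged blocks and confirms the actual tx hashes.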

////////////////////////////////////////////////////////////////////////////////
void BlockchainScanner::getFilterHitsThread(
   const set<BinaryData>& hashSet,
   atomic<int>& counter,
   map<uint32_t, set<TxFilterResults>>& resultMap)
{
   map<uint32_t, set<TxFilterResults>> localResults;

   {
      auto&& tx = db_->beginTransaction(TXFILTERS, LMDB::ReadOnly);

      while (1)
      {
         auto&& fileNum = counter.fetch_sub(1, memory_order_relaxed);
         if (fileNum < 0)
            break;

         try
         {
            auto&& pool = db_->getFilterPoolRefForFileNum(fileNum);

            for (auto& hash : hashSet)
            {
               auto&& blockKeys = pool.compare(hash);
               if (blockKeys.size() > 0)
               {
                  auto& fileNumEntry = localResults[fileNum];

                  TxFilterResults filterResult;
                  filterResult.hash_ = hash;
                  filterResult.filterHits_ = move(blockKeys);

                  fileNumEntry.insert(move(filterResult));
               }
            }
         }
         catch (runtime_error&)
         {
            LOGWARN << "couldn't get filter pool for file: " << fileNum;
            continue;
         }
      }
   }

   //merge results
   unique_lock<mutex> lock(resolverMutex_);
   resultMap.insert(localResults.begin(), localResults.end());
}

////////////////////////////////////////////////////////////////////////////////
void BlockchainScanner::processFilterHitsThread(
   map<uint32_t, map<uint32_t, set<const TxFilterResults*>>>&
      filtersResultMap,
   TransactionalSet<BinaryData>& missingHashes,
   atomic<int>& counter, map<BinaryData, BinaryData>& results,
   function<void(size_t)> prog)
{
   map<BinaryData, BinaryData> result;

   uint32_t missedBlocks = 0;

   auto resolveHashes = [&](
      uint32_t fileNum,
      map<uint32_t, set<const TxFilterResults*>> filterHit,
      set<BinaryData>& hashSet)->void
   {
      auto fileptr = blockDataLoader_.get(fileNum);

      for (auto& blockkey : filterHit)
      {
         shared_ptr<BlockHeader> headerPtr;
         try
         {
            headerPtr = blockchain_->getHeaderById(blockkey.first);
         }
         catch (range_error&)
         {
            //no block for this id, this is an orphan
            missedBlocks++;
            continue;
         }

         auto& filterSet = blockkey.second;

         auto getID = [headerPtr](const BinaryData&)->unsigned int
         {
            return headerPtr->getThisID();
         };

         //search the block
         BlockData bdata;
         try
         {
            bdata.deserialize(
               fileptr->getPtr() + headerPtr->getOffset(),
               headerPtr->getBlockSize(),
               headerPtr, getID, false, false);
         }
         catch (BlockDeserializingException& e)
         {
            LOGERR << "Block deser error while processing tx filters: ";
            LOGERR << "  " << e.what();
            LOGERR << "Skipping this block";
            continue;
         }

         auto txns = bdata.getTxns();
         for (auto& filterhit : filterSet)
         {
            auto iditer = filterhit->filterHits_.find(blockkey.first);
            if (iditer == filterhit->filterHits_.end())
               continue;

            auto& txids = iditer->second;
            for (auto& txid : txids)
            {
               if (txid >= txns.size())
                  continue;

               auto& txn = txns[txid];
               auto& txnHash = txn->getHash();

               auto hashIter = hashSet.begin();

               while (hashIter != hashSet.end())
               {
                  if (txnHash == *hashIter)
                  {
                     auto&& countAndHash = WRITE_UINT32_LE(txids.size());
                     countAndHash.append(txnHash);

                     result[countAndHash] = move(
                        DBUtils::getBlkDataKeyNoPrefix(
                           headerPtr->getBlockHeight(),
                           headerPtr->getDuplicateID(),
                           txid));

                     missingHashes.erase(txnHash);
                     auto count = missingHashes.size();
                     prog(count);

                     hashSet.erase(hashIter++);
                     continue;
                  }

                  ++hashIter;
               }
            }
         }
      }
   };

   vector<uint32_t> blkFileNums;
   for (auto& hitsPair : filtersResultMap)
      blkFileNums.push_back(hitsPair.first);

   while (1)
   {
      auto&& index = counter.fetch_sub(1, memory_order_relaxed);
      if (index < 0)
         break;

      auto& fileNum = blkFileNums[index];
      auto filterIter = filtersResultMap.find(fileNum);
      auto& blkHitsMap = filterIter->second;

      auto hashSet = missingHashes.get();

      auto hitsPairIter = blkHitsMap.begin();
      while (hitsPairIter != blkHitsMap.end())
      {
         auto& txFilterSet = hitsPairIter->second;

         auto hitsIter = txFilterSet.begin();
         while (hitsIter != txFilterSet.end())
         {
            auto hashIter = hashSet->find((*hitsIter)->hash_);
            if (hashIter != hashSet->end())
            {
               ++hitsIter;
               continue;
            }

            txFilterSet.erase(hitsIter++);
         }

         if (txFilterSet.size() > 0)
         {
            ++hitsPairIter;
            continue;
         }

         blkHitsMap.erase(hitsPairIter++);
      }

      if (blkHitsMap.size() == 0)
         continue;

      resolveHashes(filterIter->first, filterIter->second, *hashSet);
   }

   //merge results
   unique_lock<mutex> lock(resolverMutex_);
   results.insert(result.begin(), result.end());
}
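
////////////////////////////////////////////////////////////////////////////////
// resolveTxHashes() drives the two routines above. A rough sketch of the
// flow, with N = totalThreadCount_:
//
//    1. load missing hashes, drop those already resolvable via
//       getDBKeyForHash
//    2. run N getFilterHitsThread() workers over all block files
//    3. regroup the filter hits by file and block ID
//    4. run N processFilterHitsThread() workers to confirm the hashes
//    5. write updated tx hints, count-and-hash records, and the trimmed
//       missing hash set back to the DB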

////////////////////////////////////////////////////////////////////////////////
bool BlockchainScanner::resolveTxHashes()
{
   /***
   the missing hashes entry will always be empty if the db is not set
   to ARMORY_DB_FULL, no need to check dbType here
   ***/

   TIMER_RESTART("resolveHashes");

   if (reportProgress_)
      progress_(BDMPhase_SearchHashes, 0, UINT32_MAX, 0);

   set<BinaryData> missingHashes;
   try
   {
      missingHashes = move(scrAddrFilter_->getMissingHashes());
   }
   catch (runtime_error&)
   {
      //no missing hashes entry, return
      TIMER_STOP("resolveHashes");
      return true;
   }

   if (missingHashes.size() == 0)
      return true;

   set<BinaryData> resolvedHashes;
   auto originalMissingSet = missingHashes;

   auto hashIter = missingHashes.begin();
   while (hashIter != missingHashes.end())
   {
      auto&& dbkey = db_->getDBKeyForHash(*hashIter);
      if (dbkey.getSize() == 0)
      {
         ++hashIter;
         continue;
      }

      resolvedHashes.insert(*hashIter);
      missingHashes.erase(hashIter++);
   }

   TransactionalSet<BinaryData> missingHashSet;
   map<uint32_t, set<TxFilterResults>> relevantFilters;
   missingHashSet.insert(missingHashes);

   LOGINFO << "resolving txhashes";

   //check filters
   atomic<int> counter;
   counter.store((int)totalBlockFileCount_ - 1, memory_order_relaxed);
   vector<thread> filterThreads;
   map<uint32_t, set<TxFilterResults>> resultMap;

   auto filterThr = [&](void)->void
   {
      getFilterHitsThread(missingHashes, counter, resultMap);
   };

   for (unsigned i = 1; i < totalThreadCount_; i++)
      filterThreads.push_back(thread(filterThr));
   filterThr();

   for (auto& thr : filterThreads)
   {
      if (thr.joinable())
         thr.join();
   }

   set<unsigned> heights;
   map<uint32_t, map<uint32_t, set<const TxFilterResults*>>> resultsByHash;
   unsigned missingIDs = 0;

   for (auto& fileNumPair : resultMap)
   {
      auto& block_result_pair = resultsByHash[fileNumPair.first];
      for (auto& filterHits : fileNumPair.second)
      {
         for (auto& filterHit : filterHits.filterHits_)
         {
            try
            {
               auto header = blockchain_->getHeaderById(filterHit.first);
               auto height = header->getBlockHeight();
               heights.insert(height);
               block_result_pair[filterHit.first].insert(&filterHits);
            }
            catch (...)
            {
               ++missingIDs;
            }
         }
      }
   }

   /*if (missingIDs > 0)
   {
      LOGINFO << missingIDs << " missing block IDs";
      return false;
   }*/

   LOGINFO << heights.size() << " blocks hit by tx filters";
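
   //ProgressCalculator below is seeded with the count of hashes left to
   //resolve; the resolver callback only ever lets the reported count
   //decrease, and it drops updates when it cannot take the progress mutex,
   //so progress reporting stays cheap on the resolver's hot path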

   //process filter hits
   counter.store((int)resultMap.size() - 1, memory_order_relaxed);

   map<BinaryData, BinaryData> resolverResults;
   vector<thread> resolverThreads;

   auto hashCount = missingHashSet.size();
   ProgressCalculator calc(hashCount);
   mutex progressMutex;
   uint64_t topCount = hashCount;

   auto resolveProgress = [&](size_t count)->void
   {
      if (!reportProgress_)
         return;

      unique_lock<mutex> lock(progressMutex, defer_lock);
      if (!lock.try_lock())
         return;

      if (count > topCount)
         return;

      topCount = count;
      auto intprog = hashCount - count;
      calc.advance(intprog);
      this->progress_(BDMPhase_ResolveHashes,
         calc.fractionCompleted(), calc.remainingSeconds(),
         count);
   };

   auto resolverThr = [&](void)->void
   {
      processFilterHitsThread(resultsByHash, missingHashSet,
         counter, resolverResults, resolveProgress);
   };

   if (reportProgress_)
      progress_(BDMPhase_ResolveHashes, 0, UINT32_MAX, 0);

   for (unsigned i = 1; i < totalThreadCount_; i++)
      resolverThreads.push_back(thread(resolverThr));
   resolverThr();

   for (auto& thr : resolverThreads)
   {
      if (thr.joinable())
         thr.join();
   }

   //write the resolved hashes
   {
      map<BinaryDataRef, StoredTxHints> txHints;
      map<BinaryData, BinaryWriter> serializedHints;
      map<BinaryData, BinaryWriter> countAndHash;

      {
         auto&& hintTx = db_->beginTransaction(TXHINTS, LMDB::ReadOnly);

         for (auto& result : resolverResults)
         {
            //result keys are 4 bytes of txout count followed by the 32 byte
            //tx hash; store the bare hash so it matches originalMissingSet
            resolvedHashes.insert(result.first.getSliceCopy(4, 32));

            //get hashPrefix
            auto hashPrefix = result.first.getSliceRef(4, 4);
            auto& hintObj = txHints[hashPrefix];

            //pull hint if it's fresh
            if (hintObj.getNumHints() == 0)
               db_->getStoredTxHints(hintObj, hashPrefix);

            //append new key
            hintObj.dbKeyList_.push_back(result.second);

            //save hash and count under dbkey
            BinaryData dbkey;
            dbkey.append(WRITE_UINT8_LE(DB_PREFIX_TXDATA));
            dbkey.append(result.second);

            auto& bw = countAndHash[dbkey];
            bw.put_BinaryData(result.first);
         }
      }

      LOGINFO << "found " << resolverResults.size() << " missing hashes";

      for (auto& hint : txHints)
      {
         BinaryData hintKey(1);
         hintKey.getPtr()[0] = DB_PREFIX_TXHINTS;
         hintKey.append(hint.first);

         auto& bw = serializedHints[hintKey];
         hint.second.serializeDBValue(bw);
      }

      //write it
      {
         auto&& hintTx = db_->beginTransaction(TXHINTS, LMDB::ReadWrite);

         for (auto& toWrite : serializedHints)
         {
            db_->putValue(TXHINTS,
               toWrite.first.getRef(),
               toWrite.second.getDataRef());
         }

         for (auto& toWrite : countAndHash)
         {
            db_->putValue(TXHINTS,
               toWrite.first.getRef(),
               toWrite.second.getDataRef());
         }
      }
   }

   //clean up missing hashes in db
   missingHashes.clear();
   for (auto& hash : originalMissingSet)
   {
      auto resolvedIter = resolvedHashes.find(hash);
      if (resolvedIter == resolvedHashes.end())
         missingHashes.insert(hash);
   }

   scrAddrFilter_->putMissingHashes(missingHashes);

   TIMER_STOP("resolveHashes");
   auto timeElapsed = TIMER_READ_SEC("resolveHashes");
   LOGINFO << "Resolved missing hashes in " << timeElapsed << "s";

   if (missingHashes.size() > 0)
      return false;

   return true;
}