// Copyright Sebastian Jeckel 2014.
// Distributed under the Boost Software License, Version 1.0.
// (See accompanying file LICENSE_1_0.txt or copy at
// http://www.boost.org/LICENSE_1_0.txt)
#include "react/engine/ToposortEngine.h"
#include "tbb/parallel_for.h"
/***************************************/ REACT_IMPL_BEGIN /**************************************/
namespace toposort {
///////////////////////////////////////////////////////////////////////////////////////////////////
/// SeqTurn
///////////////////////////////////////////////////////////////////////////////////////////////////
SeqTurn::SeqTurn(TurnIdT id, TransactionFlagsT flags) :
TurnBase( id, flags )
{}
///////////////////////////////////////////////////////////////////////////////////////////////////
/// ParTurn
///////////////////////////////////////////////////////////////////////////////////////////////////
ParTurn::ParTurn(TurnIdT id, TransactionFlagsT flags) :
TurnBase( id, flags )
{}
///////////////////////////////////////////////////////////////////////////////////////////////////
/// EngineBase
///////////////////////////////////////////////////////////////////////////////////////////////////
template
void EngineBase::OnNodeAttach(TNode& node, TNode& parent)
{
parent.Successors.Add(node);
if (node.Level <= parent.Level)
node.Level = parent.Level + 1;
}
template
void EngineBase::OnNodeDetach(TNode& node, TNode& parent)
{
parent.Successors.Remove(node);
}
template
void EngineBase::OnInputChange(TNode& node, TTurn& turn)
{
processChildren(node, turn);
}
template
void EngineBase::OnNodePulse(TNode& node, TTurn& turn)
{
processChildren(node, turn);
}
// Explicit instantiation
template class EngineBase;
template class EngineBase;
///////////////////////////////////////////////////////////////////////////////////////////////////
/// SeqEngineBase
///////////////////////////////////////////////////////////////////////////////////////////////////
void SeqEngineBase::Propagate(SeqTurn& turn)
{
while (scheduledNodes_.FetchNext())
{
for (auto* curNode : scheduledNodes_.NextValues())
{
if (curNode->Level < curNode->NewLevel)
{
curNode->Level = curNode->NewLevel;
invalidateSuccessors(*curNode);
scheduledNodes_.Push(curNode);
continue;
}
curNode->Queued = false;
curNode->Tick(&turn);
}
}
}
void SeqEngineBase::OnDynamicNodeAttach(SeqNode& node, SeqNode& parent, SeqTurn& turn)
{
this->OnNodeAttach(node, parent);
invalidateSuccessors(node);
// Re-schedule this node
node.Queued = true;
scheduledNodes_.Push(&node);
}
void SeqEngineBase::OnDynamicNodeDetach(SeqNode& node, SeqNode& parent, SeqTurn& turn)
{
this->OnNodeDetach(node, parent);
}
void SeqEngineBase::processChildren(SeqNode& node, SeqTurn& turn)
{
// Add children to queue
for (auto* succ : node.Successors)
{
if (!succ->Queued)
{
succ->Queued = true;
scheduledNodes_.Push(succ);
}
}
}
void SeqEngineBase::invalidateSuccessors(SeqNode& node)
{
for (auto* succ : node.Successors)
{
if (succ->NewLevel <= node.Level)
succ->NewLevel = node.Level + 1;
}
}
///////////////////////////////////////////////////////////////////////////////////////////////////
/// ParEngineBase
///////////////////////////////////////////////////////////////////////////////////////////////////
void ParEngineBase::Propagate(ParTurn& turn)
{
while (topoQueue_.FetchNext())
{
//using RangeT = tbb::blocked_range::const_iterator>;
using RangeT = ParEngineBase::TopoQueueT::NextRangeT;
// Iterate all nodes of current level and start processing them in parallel
tbb::parallel_for(
topoQueue_.NextRange(),
[&] (const RangeT& range)
{
for (const auto& e : range)
{
auto* curNode = e.first;
if (curNode->Level < curNode->NewLevel)
{
curNode->Level = curNode->NewLevel;
invalidateSuccessors(*curNode);
topoQueue_.Push(curNode);
continue;
}
curNode->Collected = false;
// Tick -> if changed: OnNodePulse -> adds child nodes to the queue
curNode->Tick(&turn);
}
}
);
if (dynRequests_.size() > 0)
{
for (auto req : dynRequests_)
{
if (req.ShouldAttach)
applyDynamicAttach(*req.Node, *req.Parent, turn);
else
applyDynamicDetach(*req.Node, *req.Parent, turn);
}
dynRequests_.clear();
}
}
}
void ParEngineBase::OnDynamicNodeAttach(ParNode& node, ParNode& parent, ParTurn& turn)
{
DynRequestData data{ true, &node, &parent };
dynRequests_.push_back(data);
}
void ParEngineBase::OnDynamicNodeDetach(ParNode& node, ParNode& parent, ParTurn& turn)
{
DynRequestData data{ false, &node, &parent };
dynRequests_.push_back(data);
}
void ParEngineBase::applyDynamicAttach(ParNode& node, ParNode& parent, ParTurn& turn)
{
this->OnNodeAttach(node, parent);
invalidateSuccessors(node);
// Re-schedule this node
node.Collected = true;
topoQueue_.Push(&node);
}
void ParEngineBase::applyDynamicDetach(ParNode& node, ParNode& parent, ParTurn& turn)
{
this->OnNodeDetach(node, parent);
}
void ParEngineBase::processChildren(ParNode& node, ParTurn& turn)
{
// Add children to queue
for (auto* succ : node.Successors)
if (!succ->Collected.exchange(true, std::memory_order_relaxed))
topoQueue_.Push(succ);
}
void ParEngineBase::invalidateSuccessors(ParNode& node)
{
for (auto* succ : node.Successors)
{// succ->InvalidateMutex
ParNode::InvalidateMutexT::scoped_lock lock(succ->InvalidateMutex);
if (succ->NewLevel <= node.Level)
succ->NewLevel = node.Level + 1;
}// ~succ->InvalidateMutex
}
} // ~namespace toposort
/****************************************/ REACT_IMPL_END /***************************************/