#ifndef CPU_ONLY
#include <cuda_runtime.h>
#endif
#include <glog/logging.h>
#include <stdio.h>

#include <cmath>
#include <cstdlib>
#include <sstream>
#include <string>
#include <utility>
#include <vector>

#include "boost/thread.hpp"
#include "caffe/caffe.hpp"
#include "caffe/parallel.hpp"
namespace caffe {
// Operations that apply_buffers() can perform on a list of blobs:
// either copy blob values into a flat buffer, or redirect a blob's
// data/diff storage (CPU or GPU side) to point into a flat buffer.
enum Op {
  copy,
  replace_cpu,
  replace_gpu,
  replace_cpu_diff,
  replace_gpu_diff
};
template
static void apply_buffers(const vector*>& blobs,
Dtype* buffer, size_t total_size, Op op) {
Dtype* ptr = buffer;
for (int i = 0; i < blobs.size(); ++i) {
int size = blobs[i]->count();
switch (op) {
case copy: {
// Init buffer to current values of blobs
caffe_copy(size,
reinterpret_cast(blobs[i]->data()->cpu_data()),
ptr);
break;
}
case replace_cpu:
blobs[i]->data()->set_cpu_data(ptr);
break;
case replace_gpu:
blobs[i]->data()->set_gpu_data(ptr);
break;
case replace_cpu_diff:
blobs[i]->diff()->set_cpu_data(ptr);
break;
case replace_gpu_diff:
blobs[i]->diff()->set_gpu_data(ptr);
break;
}
ptr += size;
}
// total_size is at least one byte
CHECK_EQ(total_size, (ptr == buffer ? 1 : ptr - buffer));
}
// Buffer size necessary to store given blobs
template
static size_t total_size(const vector*>& params) {
size_t size = 0;
for (int i = 0; i < params.size(); ++i)
size += params[i]->count();
// Size have at least one byte, otherwise cudaMalloc fails if net has no
// learnable parameters.
return (size > 0) ? size : 1;
}
template
Params::Params(shared_ptr > root_solver)
: size_(total_size(root_solver->net()->learnable_params())),
data_(),
diff_() {
}
template
GPUParams::GPUParams(shared_ptr > root_solver, int device)
: Params(root_solver) {
#ifndef CPU_ONLY
int initial_device;
CUDA_CHECK(cudaGetDevice(&initial_device));
// Allocate device buffers
CUDA_CHECK(cudaSetDevice(device));
CUDA_CHECK(cudaMalloc(&data_, size_ * sizeof(Dtype)));
// Copy blob values
const vector*>& net =
root_solver->net()->learnable_params();
apply_buffers(net, data_, size_, copy);
CUDA_CHECK(cudaMalloc(&diff_, size_ * sizeof(Dtype)));
caffe_gpu_set(size_, Dtype(0), diff_);
CUDA_CHECK(cudaSetDevice(initial_device));
#else
NO_GPU;
#endif
}
template
GPUParams::~GPUParams() {
#ifndef CPU_ONLY
CUDA_CHECK(cudaFree(data_));
CUDA_CHECK(cudaFree(diff_));
#endif
}
template
void GPUParams::configure(Solver* solver) const {
const vector*>& net =
solver->net()->learnable_params();
apply_buffers(net, data_, size_, replace_gpu);
apply_buffers(net, diff_, size_, replace_gpu_diff);
}
void DevicePair::compute(const vector devices, vector* pairs) {
#ifndef CPU_ONLY
vector remaining(devices);
// Depth for reduction tree
int remaining_depth = static_cast(ceil(log2(remaining.size())));
// Group GPUs by board
for (int d = 0; d < remaining_depth; ++d) {
for (int i = 0; i < remaining.size(); ++i) {
for (int j = i + 1; j < remaining.size(); ++j) {
cudaDeviceProp a, b;
CUDA_CHECK(cudaGetDeviceProperties(&a, remaining[i]));
CUDA_CHECK(cudaGetDeviceProperties(&b, remaining[j]));
if (a.isMultiGpuBoard && b.isMultiGpuBoard) {
if (a.multiGpuBoardGroupID == b.multiGpuBoardGroupID) {
pairs->push_back(DevicePair(remaining[i], remaining[j]));
DLOG(INFO) << "GPU board: " << remaining[i] << ":" << remaining[j];
remaining.erase(remaining.begin() + j);
break;
}
}
}
}
}
ostringstream s;
for (int i = 0; i < remaining.size(); ++i) {
s << (i ? ", " : "") << remaining[i];
}
DLOG(INFO) << "GPUs paired by boards, remaining: " << s.str();
// Group by P2P accessibility
remaining_depth = ceil(log2(remaining.size()));
for (int d = 0; d < remaining_depth; ++d) {
for (int i = 0; i < remaining.size(); ++i) {
for (int j = i + 1; j < remaining.size(); ++j) {
int access;
CUDA_CHECK(
cudaDeviceCanAccessPeer(&access, remaining[i], remaining[j]));
if (access) {
pairs->push_back(DevicePair(remaining[i], remaining[j]));
DLOG(INFO) << "P2P pair: " << remaining[i] << ":" << remaining[j];
remaining.erase(remaining.begin() + j);
break;
}
}
}
}
s.str("");
for (int i = 0; i < remaining.size(); ++i) {
s << (i ? ", " : "") << remaining[i];
}
DLOG(INFO) << "GPUs paired by P2P access, remaining: " << s.str();
// Group remaining
remaining_depth = ceil(log2(remaining.size()));
for (int d = 0; d < remaining_depth; ++d) {
for (int i = 0; i < remaining.size(); ++i) {
pairs->push_back(DevicePair(remaining[i], remaining[i + 1]));
DLOG(INFO) << "Remaining pair: " << remaining[i] << ":"
<< remaining[i + 1];
remaining.erase(remaining.begin() + i + 1);
}
}
// Should only be the parent node remaining
CHECK_EQ(remaining.size(), 1);
pairs->insert(pairs->begin(), DevicePair(-1, remaining[0]));
CHECK(pairs->size() == devices.size());
for (int i = 0; i < pairs->size(); ++i) {
CHECK((*pairs)[i].parent() != (*pairs)[i].device());
for (int j = i + 1; j < pairs->size(); ++j) {
CHECK((*pairs)[i].device() != (*pairs)[j].device());
}
}
#else
NO_GPU;
#endif
}
//
template
P2PSync::P2PSync(shared_ptr > root_solver,
P2PSync* parent, const SolverParameter& param)
: GPUParams(root_solver, param.device_id()),
parent_(parent),
children_(),
queue_(),
initial_iter_(root_solver->iter()),
solver_() {
#ifndef CPU_ONLY
int initial_device;
CUDA_CHECK(cudaGetDevice(&initial_device));
const int self = param.device_id();
CUDA_CHECK(cudaSetDevice(self));
if (parent == NULL) {
solver_ = root_solver;
} else {
Caffe::set_root_solver(false);
solver_.reset(new WorkerSolver(param, root_solver.get()));
Caffe::set_root_solver(true);
}
this->configure(solver_.get());
solver_->add_callback(this);
if (parent) {
// Enable p2p access between devices
const int peer = parent->solver_->param().device_id();
int access;
CUDA_CHECK(cudaDeviceCanAccessPeer(&access, self, peer));
if (access) {
CUDA_CHECK(cudaDeviceEnablePeerAccess(peer, 0));
} else {
LOG(INFO)<< "GPU " << self << " does not have p2p access to GPU " << peer;
}
// Allocate receiving buffer on parent
CUDA_CHECK(cudaSetDevice(peer));
CUDA_CHECK(cudaMalloc(&parent_grads_, size_ * sizeof(Dtype)));
CUDA_CHECK(cudaSetDevice(self));
}
CUDA_CHECK(cudaSetDevice(initial_device));
#else
NO_GPU;
#endif
}
template
P2PSync::~P2PSync() {
#ifndef CPU_ONLY
int initial_device;
CUDA_CHECK(cudaGetDevice(&initial_device));
const int self = solver_->param().device_id();
CUDA_CHECK(cudaSetDevice(self));
if (parent_) {
CUDA_CHECK(cudaFree(parent_grads_));
const int peer = parent_->solver_->param().device_id();
int access;
CUDA_CHECK(cudaDeviceCanAccessPeer(&access, self, peer));
if (access) {
CUDA_CHECK(cudaDeviceDisablePeerAccess(peer));
}
}
CUDA_CHECK(cudaSetDevice(initial_device));
#endif
}
template
void P2PSync::InternalThreadEntry() {
Caffe::SetDevice(solver_->param().device_id());
CHECK(Caffe::root_solver());
Caffe::set_root_solver(false);
// See if there is a defined seed and reset random state if so
if (solver_->param().random_seed() >= 0) {
// Fetch random seed and modulate by device ID to make sure
// everyone doesn't have the same seed. We seem to have some
// solver instability if we have everyone with the same seed
Caffe::set_random_seed(
solver_->param().random_seed() + solver_->param().device_id());
}
solver_->Step(solver_->param().max_iter() - initial_iter_);
}
template
void P2PSync::on_start() {
#ifndef CPU_ONLY
#ifdef DEBUG
int device;
CUDA_CHECK(cudaGetDevice(&device));
CHECK(device == solver_->param().device_id());
#else
// CHECK(false);
#endif
// Wait for update from parent
if (parent_) {
P2PSync *parent = queue_.pop();
CHECK(parent == parent_);
}
// Update children
for (int i = children_.size() - 1; i >= 0; i--) {
Dtype* src = data_;
Dtype* dst = children_[i]->data_;
#ifdef DEBUG
cudaPointerAttributes attributes;
CUDA_CHECK(cudaPointerGetAttributes(&attributes, src));
CHECK(attributes.device == device);
CUDA_CHECK(cudaPointerGetAttributes(&attributes, dst));
CHECK(attributes.device == children_[i]->solver_->param().device_id());
#endif
CUDA_CHECK(cudaMemcpyAsync(dst, src, size_ * sizeof(Dtype),
cudaMemcpyDeviceToDevice, cudaStreamDefault));
CUDA_CHECK(cudaStreamSynchronize(cudaStreamDefault));
children_[i]->queue_.push(this);
}
#endif
}
template
void P2PSync::on_gradients_ready() {
#ifndef CPU_ONLY
#ifdef DEBUG
int device;
CUDA_CHECK(cudaGetDevice(&device));
CHECK(device == solver_->param().device_id());
#endif
// Sum children gradients as they appear in the queue
for (int i = 0; i < children_.size(); ++i) {
P2PSync *child = queue_.pop();
Dtype* src = child->parent_grads_;
Dtype* dst = diff_;
#ifdef DEBUG
bool ok = false;
for (int j = 0; j < children_.size(); ++j) {
if (child == children_[j]) {
ok = true;
}
}
CHECK(ok);
cudaPointerAttributes attributes;
CUDA_CHECK(cudaPointerGetAttributes(&attributes, src));
CHECK(attributes.device == device);
CUDA_CHECK(cudaPointerGetAttributes(&attributes, dst));
CHECK(attributes.device == device);
#endif
caffe_gpu_add(size_, src, dst, dst);
}
// Send gradients to parent
if (parent_) {
Dtype* src = diff_;
Dtype* dst = parent_grads_;
#ifdef DEBUG
cudaPointerAttributes attributes;
CUDA_CHECK(cudaPointerGetAttributes(&attributes, src));
CHECK(attributes.device == device);
CUDA_CHECK(cudaPointerGetAttributes(&attributes, dst));
CHECK(attributes.device == parent_->solver_->param().device_id());
#endif
CUDA_CHECK(cudaMemcpyAsync(dst, src, size_ * sizeof(Dtype), //
cudaMemcpyDeviceToDevice, cudaStreamDefault));
CUDA_CHECK(cudaStreamSynchronize(cudaStreamDefault));
parent_->queue_.push(this);
} else {
// Loss functions divide gradients by the batch size, so to compensate
// for split batch, the root solver divides by number of solvers.
caffe_gpu_scal(size_, Dtype(1.0 / Caffe::solver_count()), diff_);
}
#endif
}
template
void P2PSync::run(const vector& gpus) {
// Pair devices for map-reduce synchronization
vector pairs;
DevicePair::compute(gpus, &pairs);
ostringstream s;
for (int i = 1; i < pairs.size(); ++i) {
s << (i == 1 ? "" : ", ") << pairs[i].parent() << ":" << pairs[i].device();
}
LOG(INFO)<< "GPUs pairs " << s.str();
SolverParameter param(solver_->param());
vector > > syncs(gpus.size());
// Build the GPU tree by finding the parent for each solver
for (int attempts = 0; attempts < pairs.size(); ++attempts) {
for (int i = 1; i < pairs.size(); ++i) {
if (!syncs[i].get()) {
P2PSync* parent = NULL;
for (int j = 0; j < syncs.size(); ++j) {
P2PSync* sync = j == 0 ? this : syncs[j].get();
if (sync) {
const SolverParameter& p = sync->solver()->param();
if (p.device_id() == pairs[i].parent()) {
parent = sync;
}
}
}
if (parent) {
param.set_device_id(pairs[i].device());
syncs[i].reset(new P2PSync(solver_, parent, param));
parent->children_.push_back((P2PSync*) syncs[i].get());
}
}
}
}
LOG(INFO)<< "Starting Optimization";
for (int i = 1; i < syncs.size(); ++i) {
syncs[i]->StartInternalThread();
}
// Run root solver on current thread
solver_->Solve();
for (int i = 1; i < syncs.size(); ++i) {
syncs[i]->StopInternalThread();
}
}
// Explicit template instantiations for the supported Dtypes (float, double).
INSTANTIATE_CLASS(Params);
INSTANTIATE_CLASS(GPUParams);
INSTANTIATE_CLASS(P2PSync);
} // namespace caffe