See More

#ifndef CPU_ONLY #include #endif #include #include #include #include #include #include #include #include #include #include "boost/thread.hpp" #include "caffe/caffe.hpp" #include "caffe/parallel.hpp" namespace caffe { enum Op { copy, replace_cpu, replace_gpu, replace_cpu_diff, replace_gpu_diff }; template static void apply_buffers(const vector*>& blobs, Dtype* buffer, size_t total_size, Op op) { Dtype* ptr = buffer; for (int i = 0; i < blobs.size(); ++i) { int size = blobs[i]->count(); switch (op) { case copy: { // Init buffer to current values of blobs caffe_copy(size, reinterpret_cast(blobs[i]->data()->cpu_data()), ptr); break; } case replace_cpu: blobs[i]->data()->set_cpu_data(ptr); break; case replace_gpu: blobs[i]->data()->set_gpu_data(ptr); break; case replace_cpu_diff: blobs[i]->diff()->set_cpu_data(ptr); break; case replace_gpu_diff: blobs[i]->diff()->set_gpu_data(ptr); break; } ptr += size; } // total_size is at least one byte CHECK_EQ(total_size, (ptr == buffer ? 1 : ptr - buffer)); } // Buffer size necessary to store given blobs template static size_t total_size(const vector*>& params) { size_t size = 0; for (int i = 0; i < params.size(); ++i) size += params[i]->count(); // Size have at least one byte, otherwise cudaMalloc fails if net has no // learnable parameters. return (size > 0) ? size : 1; } template Params::Params(shared_ptr > root_solver) : size_(total_size(root_solver->net()->learnable_params())), data_(), diff_() { } template GPUParams::GPUParams(shared_ptr > root_solver, int device) : Params(root_solver) { #ifndef CPU_ONLY int initial_device; CUDA_CHECK(cudaGetDevice(&initial_device)); // Allocate device buffers CUDA_CHECK(cudaSetDevice(device)); CUDA_CHECK(cudaMalloc(&data_, size_ * sizeof(Dtype))); // Copy blob values const vector*>& net = root_solver->net()->learnable_params(); apply_buffers(net, data_, size_, copy); CUDA_CHECK(cudaMalloc(&diff_, size_ * sizeof(Dtype))); caffe_gpu_set(size_, Dtype(0), diff_); CUDA_CHECK(cudaSetDevice(initial_device)); #else NO_GPU; #endif } template GPUParams::~GPUParams() { #ifndef CPU_ONLY CUDA_CHECK(cudaFree(data_)); CUDA_CHECK(cudaFree(diff_)); #endif } template void GPUParams::configure(Solver* solver) const { const vector*>& net = solver->net()->learnable_params(); apply_buffers(net, data_, size_, replace_gpu); apply_buffers(net, diff_, size_, replace_gpu_diff); } void DevicePair::compute(const vector devices, vector* pairs) { #ifndef CPU_ONLY vector remaining(devices); // Depth for reduction tree int remaining_depth = static_cast(ceil(log2(remaining.size()))); // Group GPUs by board for (int d = 0; d < remaining_depth; ++d) { for (int i = 0; i < remaining.size(); ++i) { for (int j = i + 1; j < remaining.size(); ++j) { cudaDeviceProp a, b; CUDA_CHECK(cudaGetDeviceProperties(&a, remaining[i])); CUDA_CHECK(cudaGetDeviceProperties(&b, remaining[j])); if (a.isMultiGpuBoard && b.isMultiGpuBoard) { if (a.multiGpuBoardGroupID == b.multiGpuBoardGroupID) { pairs->push_back(DevicePair(remaining[i], remaining[j])); DLOG(INFO) << "GPU board: " << remaining[i] << ":" << remaining[j]; remaining.erase(remaining.begin() + j); break; } } } } } ostringstream s; for (int i = 0; i < remaining.size(); ++i) { s << (i ? ", " : "") << remaining[i]; } DLOG(INFO) << "GPUs paired by boards, remaining: " << s.str(); // Group by P2P accessibility remaining_depth = ceil(log2(remaining.size())); for (int d = 0; d < remaining_depth; ++d) { for (int i = 0; i < remaining.size(); ++i) { for (int j = i + 1; j < remaining.size(); ++j) { int access; CUDA_CHECK( cudaDeviceCanAccessPeer(&access, remaining[i], remaining[j])); if (access) { pairs->push_back(DevicePair(remaining[i], remaining[j])); DLOG(INFO) << "P2P pair: " << remaining[i] << ":" << remaining[j]; remaining.erase(remaining.begin() + j); break; } } } } s.str(""); for (int i = 0; i < remaining.size(); ++i) { s << (i ? ", " : "") << remaining[i]; } DLOG(INFO) << "GPUs paired by P2P access, remaining: " << s.str(); // Group remaining remaining_depth = ceil(log2(remaining.size())); for (int d = 0; d < remaining_depth; ++d) { for (int i = 0; i < remaining.size(); ++i) { pairs->push_back(DevicePair(remaining[i], remaining[i + 1])); DLOG(INFO) << "Remaining pair: " << remaining[i] << ":" << remaining[i + 1]; remaining.erase(remaining.begin() + i + 1); } } // Should only be the parent node remaining CHECK_EQ(remaining.size(), 1); pairs->insert(pairs->begin(), DevicePair(-1, remaining[0])); CHECK(pairs->size() == devices.size()); for (int i = 0; i < pairs->size(); ++i) { CHECK((*pairs)[i].parent() != (*pairs)[i].device()); for (int j = i + 1; j < pairs->size(); ++j) { CHECK((*pairs)[i].device() != (*pairs)[j].device()); } } #else NO_GPU; #endif } // template P2PSync::P2PSync(shared_ptr > root_solver, P2PSync* parent, const SolverParameter& param) : GPUParams(root_solver, param.device_id()), parent_(parent), children_(), queue_(), initial_iter_(root_solver->iter()), solver_() { #ifndef CPU_ONLY int initial_device; CUDA_CHECK(cudaGetDevice(&initial_device)); const int self = param.device_id(); CUDA_CHECK(cudaSetDevice(self)); if (parent == NULL) { solver_ = root_solver; } else { Caffe::set_root_solver(false); solver_.reset(new WorkerSolver(param, root_solver.get())); Caffe::set_root_solver(true); } this->configure(solver_.get()); solver_->add_callback(this); if (parent) { // Enable p2p access between devices const int peer = parent->solver_->param().device_id(); int access; CUDA_CHECK(cudaDeviceCanAccessPeer(&access, self, peer)); if (access) { CUDA_CHECK(cudaDeviceEnablePeerAccess(peer, 0)); } else { LOG(INFO)<< "GPU " << self << " does not have p2p access to GPU " << peer; } // Allocate receiving buffer on parent CUDA_CHECK(cudaSetDevice(peer)); CUDA_CHECK(cudaMalloc(&parent_grads_, size_ * sizeof(Dtype))); CUDA_CHECK(cudaSetDevice(self)); } CUDA_CHECK(cudaSetDevice(initial_device)); #else NO_GPU; #endif } template P2PSync::~P2PSync() { #ifndef CPU_ONLY int initial_device; CUDA_CHECK(cudaGetDevice(&initial_device)); const int self = solver_->param().device_id(); CUDA_CHECK(cudaSetDevice(self)); if (parent_) { CUDA_CHECK(cudaFree(parent_grads_)); const int peer = parent_->solver_->param().device_id(); int access; CUDA_CHECK(cudaDeviceCanAccessPeer(&access, self, peer)); if (access) { CUDA_CHECK(cudaDeviceDisablePeerAccess(peer)); } } CUDA_CHECK(cudaSetDevice(initial_device)); #endif } template void P2PSync::InternalThreadEntry() { Caffe::SetDevice(solver_->param().device_id()); CHECK(Caffe::root_solver()); Caffe::set_root_solver(false); // See if there is a defined seed and reset random state if so if (solver_->param().random_seed() >= 0) { // Fetch random seed and modulate by device ID to make sure // everyone doesn't have the same seed. We seem to have some // solver instability if we have everyone with the same seed Caffe::set_random_seed( solver_->param().random_seed() + solver_->param().device_id()); } solver_->Step(solver_->param().max_iter() - initial_iter_); } template void P2PSync::on_start() { #ifndef CPU_ONLY #ifdef DEBUG int device; CUDA_CHECK(cudaGetDevice(&device)); CHECK(device == solver_->param().device_id()); #else // CHECK(false); #endif // Wait for update from parent if (parent_) { P2PSync *parent = queue_.pop(); CHECK(parent == parent_); } // Update children for (int i = children_.size() - 1; i >= 0; i--) { Dtype* src = data_; Dtype* dst = children_[i]->data_; #ifdef DEBUG cudaPointerAttributes attributes; CUDA_CHECK(cudaPointerGetAttributes(&attributes, src)); CHECK(attributes.device == device); CUDA_CHECK(cudaPointerGetAttributes(&attributes, dst)); CHECK(attributes.device == children_[i]->solver_->param().device_id()); #endif CUDA_CHECK(cudaMemcpyAsync(dst, src, size_ * sizeof(Dtype), cudaMemcpyDeviceToDevice, cudaStreamDefault)); CUDA_CHECK(cudaStreamSynchronize(cudaStreamDefault)); children_[i]->queue_.push(this); } #endif } template void P2PSync::on_gradients_ready() { #ifndef CPU_ONLY #ifdef DEBUG int device; CUDA_CHECK(cudaGetDevice(&device)); CHECK(device == solver_->param().device_id()); #endif // Sum children gradients as they appear in the queue for (int i = 0; i < children_.size(); ++i) { P2PSync *child = queue_.pop(); Dtype* src = child->parent_grads_; Dtype* dst = diff_; #ifdef DEBUG bool ok = false; for (int j = 0; j < children_.size(); ++j) { if (child == children_[j]) { ok = true; } } CHECK(ok); cudaPointerAttributes attributes; CUDA_CHECK(cudaPointerGetAttributes(&attributes, src)); CHECK(attributes.device == device); CUDA_CHECK(cudaPointerGetAttributes(&attributes, dst)); CHECK(attributes.device == device); #endif caffe_gpu_add(size_, src, dst, dst); } // Send gradients to parent if (parent_) { Dtype* src = diff_; Dtype* dst = parent_grads_; #ifdef DEBUG cudaPointerAttributes attributes; CUDA_CHECK(cudaPointerGetAttributes(&attributes, src)); CHECK(attributes.device == device); CUDA_CHECK(cudaPointerGetAttributes(&attributes, dst)); CHECK(attributes.device == parent_->solver_->param().device_id()); #endif CUDA_CHECK(cudaMemcpyAsync(dst, src, size_ * sizeof(Dtype), // cudaMemcpyDeviceToDevice, cudaStreamDefault)); CUDA_CHECK(cudaStreamSynchronize(cudaStreamDefault)); parent_->queue_.push(this); } else { // Loss functions divide gradients by the batch size, so to compensate // for split batch, the root solver divides by number of solvers. caffe_gpu_scal(size_, Dtype(1.0 / Caffe::solver_count()), diff_); } #endif } template void P2PSync::run(const vector& gpus) { // Pair devices for map-reduce synchronization vector pairs; DevicePair::compute(gpus, &pairs); ostringstream s; for (int i = 1; i < pairs.size(); ++i) { s << (i == 1 ? "" : ", ") << pairs[i].parent() << ":" << pairs[i].device(); } LOG(INFO)<< "GPUs pairs " << s.str(); SolverParameter param(solver_->param()); vector > > syncs(gpus.size()); // Build the GPU tree by finding the parent for each solver for (int attempts = 0; attempts < pairs.size(); ++attempts) { for (int i = 1; i < pairs.size(); ++i) { if (!syncs[i].get()) { P2PSync* parent = NULL; for (int j = 0; j < syncs.size(); ++j) { P2PSync* sync = j == 0 ? this : syncs[j].get(); if (sync) { const SolverParameter& p = sync->solver()->param(); if (p.device_id() == pairs[i].parent()) { parent = sync; } } } if (parent) { param.set_device_id(pairs[i].device()); syncs[i].reset(new P2PSync(solver_, parent, param)); parent->children_.push_back((P2PSync*) syncs[i].get()); } } } } LOG(INFO)<< "Starting Optimization"; for (int i = 1; i < syncs.size(); ++i) { syncs[i]->StartInternalThread(); } // Run root solver on current thread solver_->Solve(); for (int i = 1; i < syncs.size(); ++i) { syncs[i]->StopInternalThread(); } } INSTANTIATE_CLASS(Params); INSTANTIATE_CLASS(GPUParams); INSTANTIATE_CLASS(P2PSync); } // namespace caffe