diff --git a/NeoML/Python/src/PyDnnDistributed.cpp b/NeoML/Python/src/PyDnnDistributed.cpp index 95d72f5fb..16b56cbe9 100644 --- a/NeoML/Python/src/PyDnnDistributed.cpp +++ b/NeoML/Python/src/PyDnnDistributed.cpp @@ -1,4 +1,5 @@ -/* Copyright © 2017-2023 ABBYY +/* Copyright © 2017-2024 ABBYY + Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at diff --git a/NeoML/Python/src/PyDnnDistributed.h b/NeoML/Python/src/PyDnnDistributed.h index 725440687..2d5a90973 100644 --- a/NeoML/Python/src/PyDnnDistributed.h +++ b/NeoML/Python/src/PyDnnDistributed.h @@ -1,4 +1,5 @@ -/* Copyright © 2017-2023 ABBYY +/* Copyright © 2017-2024 ABBYY + Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at @@ -20,7 +21,7 @@ limitations under the License. class CPyDistributedDataset : public IDistributedDataset { public: - CPyDistributedDataset( const py::object& data ) : getData( data ) {}; + CPyDistributedDataset( const py::object& data ) : getData( data ) {} int SetInputBatch( CDnn& dnn, int thread ) override; private: py::object getData; @@ -29,13 +30,14 @@ class CPyDistributedDataset : public IDistributedDataset { class CPyDistributedTraining : public CDistributedTraining { public: CPyDistributedTraining( CDnn& dnn, int count, TDistributedInitializer initializer, int seed ) - : CDistributedTraining( dnn, count, initializer, seed ) {}; + : CDistributedTraining( dnn, count, initializer, seed ) {} CPyDistributedTraining( CArchive& archive, int count, TDistributedInitializer initializer, int seed ) - : CDistributedTraining( archive, count, initializer, seed ) {}; + : CDistributedTraining( archive, count, initializer, seed ) {} CPyDistributedTraining( CDnn& dnn, const CArray& cudaDevs, TDistributedInitializer initializer, int seed ) - : CDistributedTraining( dnn, cudaDevs, initializer, seed ) {}; + : CDistributedTraining( dnn, cudaDevs, initializer, seed ) {} CPyDistributedTraining( CArchive& archive, const CArray& cudaDevs, TDistributedInitializer initializer, int seed ) - : CDistributedTraining( archive, cudaDevs, initializer, seed ) {}; + : CDistributedTraining( archive, cudaDevs, initializer, seed ) {} + void Run( const py::object& data ); void RunAndBackward( const py::object& data ); void Learn( const py::object& data ); @@ -46,4 +48,4 @@ class CPyDistributedTraining : public CDistributedTraining { void Save( const std::string& path ); }; -void InitializeDistributedTraining(py::module& m); +void InitializeDistributedTraining( py::module& m ); diff --git a/NeoML/include/NeoML/Dnn/DnnDistributed.h b/NeoML/include/NeoML/Dnn/DnnDistributed.h index 52f6c4f14..b135001a9 100644 --- a/NeoML/include/NeoML/Dnn/DnnDistributed.h +++ b/NeoML/include/NeoML/Dnn/DnnDistributed.h @@ -63,7 +63,7 @@ class NEOML_API CDistributedTraining { virtual ~CDistributedTraining(); // Gets the number of models in disitrbuted traning - int GetModelCount() const { return cnns.Size(); } + int GetModelCount() const { return threadPool->Size(); } // Sets the solver for all of the models void SetSolver( CArchive& archive ); // Sets the learning rate for all of the models @@ -101,11 +101,13 @@ class NEOML_API CDistributedTraining { private: struct CThreadParams; + // Run neural networks passes types + enum class TRunType { Invalid, RunOnce, RunBackwardOnce, Train }; - // Either multi-threads on a CPU or multi-devices GPU - const bool isCpu; // If multi-threads on a CPU, it is an operator of worker threads - IThreadPool* const threadPool; + CPtrOwner threadPool; + // Params to transfer to all threads function + CPtrOwner threadParams; // Separate mathEngine for each thread or device both for CPU and GPU training // Cannot use CPointerArray, as CreateDistributedCpuMathEngines requires a raw array to initialize engines CArray mathEngines; @@ -113,15 +115,12 @@ class NEOML_API CDistributedTraining { CPointerArray rands; // Separate dnn for each thread CPointerArray cnns; - // Separate `batchSize` for each dnn (may be empty) in a thread - CArray batchSize; - // `Train()` cannot be called if it `isFirstRun` - // `batchSize` may not be equal 0, if it `isFirstRun` for `RunOnce`, `RunAndBackwardOnce` or `RunAndLearnOnce`. - bool isFirstRun = true; - // Containers for errors if it happened - CArray errorMessages; - - void initialize( CArchive& archive, int count, TDistributedInitializer initializer, int seed ); + + void initialize( CArchive& archive, int count, + TDistributedInitializer initializer, int seed, size_t memoryLimit, const int* cudaDevs = nullptr ); + void serializeDnn( CDnn& dnn, int count, + TDistributedInitializer initializer, int seed, size_t memoryLimit, const int* cudaDevs = nullptr ); + void run( IDistributedDataset*, TRunType ); friend class CLoraSerializer; }; diff --git a/NeoML/src/Dnn/DnnDistributed.cpp b/NeoML/src/Dnn/DnnDistributed.cpp index edc51f693..3b98d4b71 100644 --- a/NeoML/src/Dnn/DnnDistributed.cpp +++ b/NeoML/src/Dnn/DnnDistributed.cpp @@ -161,36 +161,50 @@ static CPtr createInitializer( TDistributedInitializer type, CR //--------------------------------------------------------------------------------------------------------------------- +// Params to transfer to all threads function struct CDistributedTraining::CThreadParams final { - bool* const IsFirstRun; - IDistributedDataset* const Data; - CPointerArray& Dnns; - CArray& BatchSize; - const bool IsCpu; - CArray& ErrorMessages; - bool IsErrorHappened = false; - int TotalBatch = 0; - - // RunOnce and RunAndBackwardOnce - CThreadParams( bool* isFirstRun, IDistributedDataset* data, CPointerArray& dnns, - CArray& batchSize, bool isCpu, CArray& errorMessages ) : - IsFirstRun( isFirstRun ), - Data( data ), - Dnns( dnns ), - BatchSize( batchSize ), - IsCpu( isCpu ), - ErrorMessages( errorMessages ) - {} - - // solver.Train - CThreadParams( CPointerArray& dnns, - CArray& batchSize, int totalBatch, bool isCpu, CArray& errorMessages ) : - CThreadParams( nullptr, nullptr, dnns, batchSize, isCpu, errorMessages ) - { TotalBatch = totalBatch; } + // `Train()` cannot be called if it `isFirstRun` + // `batchSize` may not be equal 0, if it `isFirstRun` for `RunOnce`, `RunAndBackwardOnce` or `RunAndLearnOnce`. + bool IsFirstRun = true; + + IDistributedDataset* Data = nullptr; // Pointer to data for the inference for all dnns + const bool IsCpu; // Either multi-threads on a CPU or multi-devices GPU + CPointerArray& Dnns; // Separate dnn for each thread + CArray BatchSize; // Separate batch size for each dnn (may be empty) in a thread + CArray ErrorMessages; // Containers for errors if it happened + bool IsErrorHappened = false; // Flag indicates the error happened + TRunType Type; // The type of multi-threaded operation performed + int TotalBatch = 0; // The sum of all BatchSizes + CThreadParams( bool isCpu, CPointerArray& dnns ); + + void SetData( IDistributedDataset* data = nullptr, TRunType type = TRunType::Invalid ); void SetErrorMessage( int threadIndex, CString message ); + void CallRun( int threadIndex ); }; +CDistributedTraining::CThreadParams::CThreadParams( bool isCpu, CPointerArray& dnns ) : + IsCpu( isCpu ), + Dnns( dnns ) +{ + BatchSize.Add( 0, Dnns.Size() ); + ErrorMessages.Add( {}, Dnns.Size() ); +} + +void CDistributedTraining::CThreadParams::SetData( IDistributedDataset* data, TRunType type ) +{ + IsErrorHappened = false; + Type = type; + Data = data; + TotalBatch = 0; + if( type == TRunType::Train ) { + NeoAssert( !IsFirstRun ); + for( int i = 0; i < BatchSize.Size(); ++i ) { + TotalBatch += BatchSize[i]; + } + } +} + void CDistributedTraining::CThreadParams::SetErrorMessage( int threadIndex, CString message ) { IsErrorHappened = true; @@ -202,14 +216,50 @@ void CDistributedTraining::CThreadParams::SetErrorMessage( int threadIndex, CStr // For dnn.RunOnce or dnn.RunBackwardOnce other threads will not stop } +void CDistributedTraining::CThreadParams::CallRun( int threadIndex ) +{ + CThreadGroupSwitcher groupSwitcher( IsCpu, threadIndex, Dnns.Size() ); + + if( Type == TRunType::Train ) { + const float distributedCoeff = BatchSize[threadIndex] * Dnns.Size() / static_cast( TotalBatch ); + Dnns[threadIndex]->GetSolver()->Train( distributedCoeff ); + BatchSize[threadIndex] = 0; + } else { + const int currBatchSize = Data->SetInputBatch( *Dnns[threadIndex], threadIndex ); + NeoAssert( currBatchSize > 0 || ( currBatchSize == 0 && !IsFirstRun ) ); + if( currBatchSize > 0 ) { + BatchSize[threadIndex] += currBatchSize; + switch( Type ) { + case TRunType::RunOnce: + Dnns[threadIndex]->RunOnce(); + break; + case TRunType::RunBackwardOnce: + Dnns[threadIndex]->RunAndBackwardOnce(); + break; + default: + NeoAssert( false ); + } + } + IsFirstRun = false; + } +} + //--------------------------------------------------------------------------------------------------------------------- -void CDistributedTraining::initialize( CArchive& archive, int count, TDistributedInitializer initializer, int seed ) +void CDistributedTraining::initialize( CArchive& archive, int count, + TDistributedInitializer initializer, int seed, size_t memoryLimit, const int* cudaDevs ) { NeoAssert( archive.IsLoading() ); + mathEngines.SetSize( count ); + if( cudaDevs == nullptr ) { + initThreadGroupInfo(); + CreateDistributedCpuMathEngines( mathEngines.GetPtr(), count, memoryLimit ); + } else { + CreateDistributedCudaMathEngines( mathEngines.GetPtr(), count, cudaDevs, memoryLimit ); + } rands.SetBufferSize( count ); cnns.SetBufferSize( count ); - for( int i = 0; i < count; i++ ){ + for( int i = 0; i < count; ++i ) { rands.Add( new CRandom( seed ) ); cnns.Add( new CDnn( *rands[i], *mathEngines[i] ) ); cnns[i]->SetInitializer( createInitializer( initializer, *rands[i] ) ); @@ -217,35 +267,25 @@ void CDistributedTraining::initialize( CArchive& archive, int count, TDistribute archive.Serialize( *cnns[i] ); archive.Seek( 0, static_cast( 0 ) ); } - batchSize.Add( 0, count ); - errorMessages.Add( {}, count ); } -CDistributedTraining::CDistributedTraining( const CDnn& dnn, int count, - TDistributedInitializer initializer, int seed, size_t memoryLimit ) : - isCpu( true ), - threadPool( CreateThreadPool( count ) ) +void CDistributedTraining::serializeDnn( CDnn& dnn, int count, + TDistributedInitializer initializer, int seed, size_t memoryLimit, const int* cudaDevs ) { - // if count was <= 0 the pool has been initialized with the number of available CPU cores - count = threadPool->Size(); - - initThreadGroupInfo(); - mathEngines.SetSize( count ); - CreateDistributedCpuMathEngines( mathEngines.GetPtr(), count, memoryLimit ); CMemoryFile file; CArchive archive( &file, CArchive::SD_Storing ); - const_cast( dnn ).Serialize( archive ); + dnn.Serialize( archive ); archive.Close(); file.SeekToBegin(); archive.Open( &file, CArchive::SD_Loading ); - initialize( archive, count, initializer, seed ); + initialize( archive, count, initializer, seed, memoryLimit, cudaDevs ); archive.Close(); file.SeekToBegin(); archive.Open( &file, CArchive::SD_Storing ); - CPtr solver = const_cast( dnn ).GetSolver(); - SerializeSolver( archive, const_cast( dnn ), solver ); + CPtr solver = dnn.GetSolver(); + SerializeSolver( archive, dnn, solver ); archive.Close(); file.SeekToBegin(); @@ -253,65 +293,46 @@ CDistributedTraining::CDistributedTraining( const CDnn& dnn, int count, SetSolver( archive ); } -CDistributedTraining::CDistributedTraining( CArchive& archive, int count, +CDistributedTraining::CDistributedTraining( const CDnn& dnn, int count, TDistributedInitializer initializer, int seed, size_t memoryLimit ) : - isCpu( true ), threadPool( CreateThreadPool( count ) ) { - // if count was <= 0 the pool has been initialized with the number of available CPU cores - count = threadPool->Size(); + // if count was <= 0 the threadPool->Size() has been initialized with the number of available CPU cores + serializeDnn( const_cast( dnn ), threadPool->Size(), initializer, seed, memoryLimit ); + threadParams = new CThreadParams( /*isCpu*/true, cnns ); +} - initThreadGroupInfo(); - mathEngines.SetSize( count ); - CreateDistributedCpuMathEngines( mathEngines.GetPtr(), count, memoryLimit ); - initialize( archive, count, initializer, seed ); +CDistributedTraining::CDistributedTraining( CArchive& archive, int count, + TDistributedInitializer initializer, int seed, size_t memoryLimit ) : + threadPool( CreateThreadPool( count ) ) +{ + // if count was <= 0 the threadPool->Size() has been initialized with the number of available CPU cores + initialize( archive, threadPool->Size(), initializer, seed, memoryLimit ); + threadParams = new CThreadParams( /*isCpu*/true, cnns ); } CDistributedTraining::CDistributedTraining( const CDnn& dnn, const CArray& cudaDevs, TDistributedInitializer initializer, int seed, size_t memoryLimit ) : - isCpu( false ), - threadPool( CreateThreadPool(cudaDevs.Size()) ) + threadPool( CreateThreadPool( cudaDevs.Size() ) ) { - mathEngines.SetSize( cudaDevs.Size() ); - CreateDistributedCudaMathEngines( mathEngines.GetPtr(), cudaDevs.Size(), cudaDevs.GetPtr(), memoryLimit ); - CMemoryFile file; - CArchive archive( &file, CArchive::SD_Storing ); - const_cast( dnn ).Serialize( archive ); - archive.Close(); - file.SeekToBegin(); - - archive.Open( &file, CArchive::SD_Loading ); - initialize( archive, cudaDevs.Size(), initializer, seed ); - archive.Close(); - file.SeekToBegin(); - - archive.Open( &file, CArchive::SD_Storing ); - CPtr solver = const_cast( dnn ).GetSolver(); - SerializeSolver( archive, const_cast( dnn ), solver ); - archive.Close(); - file.SeekToBegin(); - - archive.Open( &file, CArchive::SD_Loading ); - SetSolver( archive ); + serializeDnn( const_cast( dnn ), cudaDevs.Size(), initializer, seed, memoryLimit, cudaDevs.GetPtr() ); + threadParams = new CThreadParams( /*isCpu*/false, cnns ); } CDistributedTraining::CDistributedTraining( CArchive& archive, const CArray& cudaDevs, TDistributedInitializer initializer, int seed, size_t memoryLimit ) : - isCpu( false ), - threadPool( CreateThreadPool(cudaDevs.Size()) ) + threadPool( CreateThreadPool( cudaDevs.Size() ) ) { - mathEngines.SetSize( cudaDevs.Size() ); - CreateDistributedCudaMathEngines( mathEngines.GetPtr(), cudaDevs.Size(), cudaDevs.GetPtr(), memoryLimit ); - initialize( archive, cudaDevs.Size(), initializer, seed ); + initialize( archive, cudaDevs.Size(), initializer, seed, memoryLimit, cudaDevs.GetPtr() ); + threadParams = new CThreadParams( /*isCpu*/false, cnns ); } CDistributedTraining::~CDistributedTraining() { - delete threadPool; cnns.DeleteAll(); rands.DeleteAll(); // As mathEngines are owned, there are no buffers in pools left for any thread - for( int i = 0; i < mathEngines.Size(); ++i ){ + for( int i = 0; i < mathEngines.Size(); ++i ) { delete mathEngines[i]; } } @@ -340,72 +361,40 @@ float CDistributedTraining::GetLearningRate() const return cnns[0]->GetSolver()->GetLearningRate(); } -void CDistributedTraining::RunOnce( IDistributedDataset& data ) +void CDistributedTraining::run( IDistributedDataset* data, TRunType type ) { - CThreadParams function_params( &isFirstRun, &data, cnns, batchSize, isCpu, errorMessages ); + threadParams->SetData( data, type ); - IThreadPool::TFunction f = [](int threadIndex, void* ptr) + IThreadPool::TFunction f = []( int threadIndex, void* ptr ) { - CThreadParams& function_params = *static_cast( ptr ); - CPointerArray& cnns = function_params.Dnns; - CArray& batchSize = function_params.BatchSize; + CThreadParams& threadParams = *static_cast( ptr ); try { - CThreadGroupSwitcher groupSwitcher( function_params.IsCpu, threadIndex, cnns.Size() ); - const int currBatchSize = function_params.Data->SetInputBatch( *cnns[threadIndex], threadIndex ); - NeoAssert( currBatchSize > 0 || ( currBatchSize == 0 && !( *function_params.IsFirstRun ) ) ); - if( currBatchSize > 0 ) { - batchSize[threadIndex] += currBatchSize; - cnns[threadIndex]->RunOnce(); - } - *function_params.IsFirstRun = false; + threadParams.CallRun( threadIndex ); } catch( std::exception& e ) { - function_params.SetErrorMessage( threadIndex, e.what() ); + threadParams.SetErrorMessage( threadIndex, e.what() ); } #ifdef NEOML_USE_FINEOBJ catch( CException* e ) { - function_params.SetErrorMessage( threadIndex, e->MessageText().CreateString() ); + threadParams.SetErrorMessage( threadIndex, e->MessageText().CreateString() ); delete e; } #endif // NEOML_USE_FINEOBJ }; - NEOML_NUM_THREADS( *threadPool, &function_params, f ); + NEOML_NUM_THREADS( *threadPool, threadParams, f ); - CheckArchitecture( !function_params.IsErrorHappened, "DistributedTraining", - JoinStrings( function_params.ErrorMessages ) ); + CheckArchitecture( !threadParams->IsErrorHappened, "DistributedTraining", + JoinStrings( threadParams->ErrorMessages ) ); + threadParams->SetData(); } -void CDistributedTraining::RunAndBackwardOnce( IDistributedDataset& data ) +void CDistributedTraining::RunOnce( IDistributedDataset& data ) { - CThreadParams function_params( &isFirstRun, &data, cnns, batchSize, isCpu, errorMessages ); - - IThreadPool::TFunction f = [](int threadIndex, void* ptr) - { - CThreadParams& function_params = *static_cast( ptr ); - CPointerArray& cnns = function_params.Dnns; - CArray& batchSize = function_params.BatchSize; - try { - CThreadGroupSwitcher groupSwitcher( function_params.IsCpu, threadIndex, cnns.Size() ); - const int currBatchSize = function_params.Data->SetInputBatch( *cnns[threadIndex], threadIndex ); - NeoAssert( currBatchSize > 0 || ( currBatchSize == 0 && !( *function_params.IsFirstRun ) ) ); - if( currBatchSize > 0 ) { - batchSize[threadIndex] += currBatchSize; - cnns[threadIndex]->RunAndBackwardOnce(); - } - *function_params.IsFirstRun = false; - } catch( std::exception& e ) { - function_params.SetErrorMessage( threadIndex, e.what() ); - } -#ifdef NEOML_USE_FINEOBJ - catch( CException* e ) { - function_params.SetErrorMessage( threadIndex, e->MessageText().CreateString() ); - delete e; - } -#endif // NEOML_USE_FINEOBJ - }; - NEOML_NUM_THREADS( *threadPool, &function_params, f ); + run( &data, TRunType::RunOnce ); +} - CheckArchitecture( !function_params.IsErrorHappened, "DistributedTraining", - JoinStrings( function_params.ErrorMessages ) ); +void CDistributedTraining::RunAndBackwardOnce( IDistributedDataset& data ) +{ + run( &data, TRunType::RunBackwardOnce ); } void CDistributedTraining::RunAndLearnOnce( IDistributedDataset& data ) @@ -416,40 +405,7 @@ void CDistributedTraining::RunAndLearnOnce( IDistributedDataset& data ) void CDistributedTraining::Train() { - NeoAssert( !isFirstRun ); - int totalBatch = 0; - for( int i = 0; i < batchSize.Size(); ++i ) { - totalBatch += batchSize[i]; - } - - CThreadParams function_params( cnns, batchSize, totalBatch, isCpu, errorMessages ); - - IThreadPool::TFunction f = [](int threadIndex, void* ptr) - { - CThreadParams& function_params = *static_cast( ptr ); - CPointerArray& cnns = function_params.Dnns; - CArray& batchSize = function_params.BatchSize; - - try { - CThreadGroupSwitcher groupSwitcher( function_params.IsCpu, threadIndex, cnns.Size() ); - const float distributedCoeff - = batchSize[threadIndex] * cnns.Size() / static_cast( function_params.TotalBatch ); - cnns[threadIndex]->GetSolver()->Train( distributedCoeff ); - batchSize[threadIndex] = 0; - } catch( std::exception& e ) { - function_params.SetErrorMessage( threadIndex, e.what() ); - } -#ifdef NEOML_USE_FINEOBJ - catch( CException* e ) { - function_params.SetErrorMessage( threadIndex, e->MessageText().CreateString() ); - delete e; - } -#endif // NEOML_USE_FINEOBJ - }; - NEOML_NUM_THREADS( *threadPool, &function_params, f ); - - CheckArchitecture( !function_params.IsErrorHappened, "DistributedTraining", - JoinStrings( function_params.ErrorMessages ) ); + run( nullptr, TRunType::Train ); } void CDistributedTraining::GetLastLoss( const CString& layerName, CArray& losses ) const @@ -482,10 +438,7 @@ void CDistributedTraining::GetLastBlob( const CString& layerName, CObjectArray& blobs ) const { - blobs.SetSize( cnns.Size() ); - for( int i = 0; i < cnns.Size(); ++i ) { - blobs[i] = CheckCast( cnns[i]->GetLayer( layerName ) )->GetBlob(); - } + GetLastBlob( layerName, reinterpret_cast&>( blobs ) ); } void CDistributedTraining::Serialize( CArchive& archive ) @@ -514,10 +467,13 @@ struct CDistributedInference::CThreadParams final { CObjectArray Refs; // Separate dnn for each thread CArray IsDnnInferenced; // Indicates for what dnns the inference was performed CArray ErrorMessages; // Containers for errors if it happened - bool IsErrorHappened = false; + bool IsErrorHappened = false; // Flag indicates the error happened CThreadParams( int threadsCount, CReferenceDnnFactory& referenceDnnFactory ); + void Initialize( IDistributedDataset& data ); + void DropData() { Data = nullptr; } + void SetErrorMessage( int threadIndex, CString message ); }; CDistributedInference::CThreadParams::CThreadParams( int threadsCount, CReferenceDnnFactory& referenceDnnFactory ) @@ -546,6 +502,12 @@ void CDistributedInference::CThreadParams::Initialize( IDistributedDataset& data IsErrorHappened = false; } +void CDistributedInference::CThreadParams::SetErrorMessage( int threadIndex, CString message ) +{ + IsErrorHappened = true; + ErrorMessages[threadIndex] = std::move( message ); +} + //--------------------------------------------------------------------------------------------------------------------- CDistributedInference::CDistributedInference( const CDnn& dnn, int threadsCount, @@ -553,7 +515,7 @@ CDistributedInference::CDistributedInference( const CDnn& dnn, int threadsCount, threadPool( CreateThreadPool( threadsCount ) ), mathEngine( CreateCpuMathEngine( memoryLimit ) ), referenceDnnFactory( new CReferenceDnnFactory( *mathEngine, dnn, optimizeDnn ) ), - // if count was <= 0 the pool has been initialized with the number of available CPU cores + // if count was <= 0 the threadPool->Size() has been initialized with the number of available CPU cores threadParams( new CThreadParams( threadPool->Size(), *referenceDnnFactory ) ) { } @@ -563,7 +525,7 @@ CDistributedInference::CDistributedInference( CArchive& archive, int threadsCoun threadPool( CreateThreadPool( threadsCount ) ), mathEngine( CreateCpuMathEngine( memoryLimit ) ), referenceDnnFactory( new CReferenceDnnFactory( *mathEngine, archive, seed, optimizeDnn ) ), - // if count was <= 0 the pool has been initialized with the number of available CPU cores + // if count was <= 0 the threadPool->Size() has been initialized with the number of available CPU cores threadParams( new CThreadParams( threadPool->Size(), *referenceDnnFactory ) ) { } @@ -588,13 +550,11 @@ void CDistributedInference::RunOnce( IDistributedDataset& data ) threadParams.IsDnnInferenced[threadIndex] = true; } } catch( std::exception& e ) { - threadParams.IsErrorHappened = true; - threadParams.ErrorMessages[threadIndex] = e.what(); + threadParams.SetErrorMessage( threadIndex, e.what() ); } #ifdef NEOML_USE_FINEOBJ catch( CException* e ) { - threadParams.IsErrorHappened = true; - threadParams.ErrorMessages[threadIndex] = e->MessageText().CreateString(); + threadParams.SetErrorMessage( threadIndex, e->MessageText().CreateString() ); delete e; } #endif // NEOML_USE_FINEOBJ @@ -603,7 +563,7 @@ void CDistributedInference::RunOnce( IDistributedDataset& data ) CheckArchitecture( !threadParams->IsErrorHappened, "DistributedTraining", JoinStrings( threadParams->ErrorMessages ) ); - threadParams->Data = nullptr; + threadParams->DropData(); } void CDistributedInference::GetLastBlob( const CString& layerName, CObjectArray& blobs ) const