Skip to content

Commit

Permalink
Merge pull request #172 from goodenou/better_thread_care
Browse files Browse the repository at this point in the history
modification to thread treatment at shutdown
  • Loading branch information
kutschke committed Apr 10, 2020
2 parents 9af8004 + b07a87e commit c531e7f
Show file tree
Hide file tree
Showing 8 changed files with 81 additions and 75 deletions.
26 changes: 0 additions & 26 deletions Mu2eG4/fcl/MYg4test_potMT.fcl

This file was deleted.

3 changes: 1 addition & 2 deletions Mu2eG4/fcl/prolog.fcl
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ mu2eg4DefaultDebug: {
exportPDTEnd : false
warnEveryNewRun : false
diagLevel : 0
mtDebugOutput : false
mtDebugOutput : 0
trackingVerbosityLevel : 0
steppingVerbosityLevel : 0
navigatorVerbosityLevel : 0
Expand Down Expand Up @@ -185,7 +185,6 @@ Mu2eG4MT: {
g4run : {
@table::mu2eg4runDefaultSingleStage
module_type : "Mu2eG4MT"
// initialSeed : 8
}
}
}
Expand Down
2 changes: 1 addition & 1 deletion Mu2eG4/inc/MTMasterThread.hh
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ namespace mu2e {

private:

bool m_mtDebugOutput;
int m_mtDebugOutput;

enum class ThreadState { NotExist = 0, BeginRun = 1, EndRun = 2, Destruct = 3 };

Expand Down
2 changes: 1 addition & 1 deletion Mu2eG4/inc/Mu2eG4Config.hh
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ namespace mu2e {
fhicl::Atom<bool> navigatorCheckMode {Name("navigatorCheckMode"), false};
fhicl::Atom<int> navigatorVerbosityLevel {Name("navigatorVerbosityLevel"), 0};
fhicl::Atom<int> PiENuPolicyVerbosity {Name("PiENuPolicyVerbosity"), 0};
fhicl::Atom<bool> mtDebugOutput {Name("mtDebugOutput"), false};
fhicl::Atom<int> mtDebugOutput {Name("mtDebugOutput"), 0};

fhicl::Atom<int> checkFieldMap {Name("checkFieldMap"), 0 };

Expand Down
2 changes: 1 addition & 1 deletion Mu2eG4/inc/Mu2eG4WorkerRunManager.hh
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ namespace mu2e {

bool m_managerInitialized;
bool m_steppingVerbose;
bool m_mtDebugOutput;
int m_mtDebugOutput;
int rmvlevel_;

std::unique_ptr<Mu2eG4PerThreadStorage> perThreadObjects_;
Expand Down
40 changes: 20 additions & 20 deletions Mu2eG4/src/MTMasterThread.cc
Original file line number Diff line number Diff line change
Expand Up @@ -50,40 +50,40 @@ namespace mu2e {
while (true) {
// Signal main thread that it can proceed
m_mainCanProceed = true;
if (m_mtDebugOutput) {
if (m_mtDebugOutput > 0) {
G4cout << "Master thread: State loop, notify main thread" << G4endl;
}

m_notifyMainCV.notify_one();

// Wait until the main thread sends signal
m_masterCanProceed = false;
if (m_mtDebugOutput) {
if (m_mtDebugOutput > 0) {
G4cout << "Master thread: State loop, starting wait" << G4endl;
}
m_notifyMasterCV.wait(lk2, [&] { return m_masterCanProceed; });

// Act according to the state
if (m_mtDebugOutput) {
if (m_mtDebugOutput > 0) {
G4cout << "Master thread: Woke up, state is " << static_cast<int>(m_masterThreadState) << G4endl;
}

if (m_masterThreadState == ThreadState::BeginRun) {
// Initialize Geant4
if (m_mtDebugOutput) {
if (m_mtDebugOutput > 0) {
G4cout << "Master thread: Initializing Geant4" << G4endl;
}
masterRunManager->initializeG4(run_number);
isG4Alive = true;
} else if (m_masterThreadState == ThreadState::EndRun) {
// Stop Geant4
if (m_mtDebugOutput) {
if (m_mtDebugOutput > 0) {
G4cout << "Master thread: Stopping Geant4" << G4endl;
}
masterRunManager->stopG4();
isG4Alive = false;
} else if (m_masterThreadState == ThreadState::Destruct) {
if (m_mtDebugOutput) {
if (m_mtDebugOutput > 0) {
G4cout << "Master thread: Breaking out of state loop" << G4endl;
}
if (isG4Alive)
Expand All @@ -97,7 +97,7 @@ namespace mu2e {
}

// Cleanup
if (m_mtDebugOutput) {
if (m_mtDebugOutput > 0) {
G4cout << "MTMasterThread: start Mu2eG4MTRunManager destruction\n";
G4cout << "Master thread: Am I unique owner of masterRunManager? "
<< masterRunManager.unique() << G4endl;
Expand All @@ -106,25 +106,25 @@ namespace mu2e {
masterRunManager.reset();
//G4PhysicalVolumeStore::Clean();

if (m_mtDebugOutput) {
if (m_mtDebugOutput > 0) {
G4cout << "Master thread: reset shared_ptr" << G4endl;
}
lk2.unlock();
if (m_mtDebugOutput) {
if (m_mtDebugOutput > 0) {
G4cout << "MTMasterThread: Master thread is finished" << G4endl;
}
});

// Start waiting for a signal from the condition variable (releases the lock temporarily)
// First for initialization
m_mainCanProceed = false;
if (m_mtDebugOutput) {
if (m_mtDebugOutput > 0) {
G4cout << "Main thread: Signal master for initialization" << G4endl;
}
m_notifyMainCV.wait(lk, [&]() { return m_mainCanProceed; });

lk.unlock();
if (m_mtDebugOutput) {
if (m_mtDebugOutput > 0) {
G4cout << "MTMasterThread: Master thread is constructed" << G4endl;
}
}
Expand All @@ -146,14 +146,14 @@ namespace mu2e {
m_masterThreadState = ThreadState::BeginRun;
m_masterCanProceed = true;
m_mainCanProceed = false;
if (m_mtDebugOutput) {
if (m_mtDebugOutput > 0) {
G4cout << "MTMasterThread: Signal master for BeginRun" << G4endl;
}
m_notifyMasterCV.notify_one();
m_notifyMainCV.wait(lk2, [&]() { return m_mainCanProceed; });

lk2.unlock();
if (m_mtDebugOutput) {
if (m_mtDebugOutput > 0) {
G4cout << "MTMasterThread: finish BeginRun" << G4endl;
}
}
Expand All @@ -167,13 +167,13 @@ namespace mu2e {
m_masterThreadState = ThreadState::EndRun;
m_mainCanProceed = false;
m_masterCanProceed = true;
if (m_mtDebugOutput) {
if (m_mtDebugOutput > 0) {
G4cout << "MTMasterThread: Signal master for EndRun" <<G4endl;
}
m_notifyMasterCV.notify_one();
m_notifyMainCV.wait(lk2, [&]() { return m_mainCanProceed; });
lk2.unlock();
if (m_mtDebugOutput) {
if (m_mtDebugOutput > 0) {
G4cout << "MTMasterThread: finish EndRun" << G4endl;
}
}
Expand All @@ -183,7 +183,7 @@ namespace mu2e {
if (m_stopped) {
return;
}
if (m_mtDebugOutput) {
if (m_mtDebugOutput > 0) {
G4cout << "MTMasterThread::stopThread: stop main thread" << G4endl;
}

Expand All @@ -192,23 +192,23 @@ namespace mu2e {
// thread, and join it.
std::unique_lock<std::mutex> lk2(m_threadMutex);
m_masterRunManager.reset();
if (m_mtDebugOutput) {
if (m_mtDebugOutput > 0) {
G4cout << "Main thread: reset shared_ptr" << G4endl;
}

m_masterThreadState = ThreadState::Destruct;
m_masterCanProceed = true;
if (m_mtDebugOutput) {
if (m_mtDebugOutput > 0) {
G4cout << "MTMasterThread::stopThread: notify" << G4endl;
}
m_notifyMasterCV.notify_one();
lk2.unlock();

if (m_mtDebugOutput) {
if (m_mtDebugOutput > 0) {
G4cout << "Main thread: joining master thread" << G4endl;
}
m_masterThread.join();
if (m_mtDebugOutput) {
if (m_mtDebugOutput > 0) {
G4cout << "MTMasterThread::stopThread: main thread finished" << G4endl;
}
m_stopped = true;
Expand Down
73 changes: 53 additions & 20 deletions Mu2eG4/src/Mu2eG4MT_module.cc
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@

// TBB includes
#include "tbb/concurrent_hash_map.h"
#include "tbb/task_group.h"

using namespace std;

Expand Down Expand Up @@ -123,7 +124,7 @@ namespace mu2e {
std::unique_ptr<IMu2eG4Cut> commonCuts_;

int _rmvlevel;
bool _mtDebugOutput;
int _mtDebugOutput;

art::InputTag _generatorModuleLabel;

Expand Down Expand Up @@ -153,8 +154,7 @@ namespace mu2e {
int const num_threads{art::Globals::instance()->nthreads()};

typedef tbb::concurrent_hash_map< std::thread::id, std::unique_ptr<Mu2eG4WorkerRunManager> > WorkerRMMap;
WorkerRMMap myworkerRunManagerMap;

WorkerRMMap myworkerRunManagerMap;
}; // end G4 header


Expand Down Expand Up @@ -346,13 +346,10 @@ namespace mu2e {
WorkerRMMap::accessor access_workerMap;

if (!myworkerRunManagerMap.find(access_workerMap, tid)){
if (_mtDebugOutput){
if (_mtDebugOutput > 0){
G4cout << "FOR TID: " << tid << ", NO WORKER. We are making one.\n";
}
myworkerRunManagerMap.insert(access_workerMap, tid);
//std::ostringstream oss;
//oss << tid;
//std::string workerID = oss.str();
access_workerMap->second = std::make_unique<Mu2eG4WorkerRunManager>(conf_, tid);
}

Expand All @@ -364,7 +361,7 @@ namespace mu2e {
Mu2eG4WorkerRunManager* scheduleWorkerRM = (access_workerMap->second).get();
access_workerMap.release();

if (_mtDebugOutput){
if (_mtDebugOutput > 0){
G4cout << "FOR SchedID: " << schedID << ", TID=" << tid << ", workerRunManagers[schedID].get() is:" << scheduleWorkerRM << "\n";
}

Expand All @@ -378,7 +375,7 @@ namespace mu2e {
perThreadStore->initializeEventInfo(&event, &spHelper, &parentHelper, &genInputHits, _generatorModuleLabel);
scheduleWorkerRM->processEvent(&event);

if (_mtDebugOutput){
if (_mtDebugOutput > 0){
G4cout << "Current Event in RM is: " << scheduleWorkerRM->GetCurrentEvent()->GetEventID() << "\n";
}

Expand Down Expand Up @@ -420,25 +417,61 @@ namespace mu2e {

// Tell G4 that this run is over.
void Mu2eG4MT::endRun(art::Run & run, art::ProcessingFrame const& procFrame) {

if (_mtDebugOutput){
G4cout << "At endRun, we have " << myworkerRunManagerMap.size() << " members in the map\n";
}
WorkerRMMap::iterator it = myworkerRunManagerMap.begin();

while (it != myworkerRunManagerMap.end()) {
it->second.release();
++it;
if (_mtDebugOutput > 1){
G4cout << "At endRun pt1, we have " << myworkerRunManagerMap.size() << " members in the map "
<< "and are running " << num_threads << " threads.\n" ;
}

else if (num_threads < static_cast <int> (myworkerRunManagerMap.size()) && _mtDebugOutput > 0){
G4cout << "At endRun pt1, we have " << myworkerRunManagerMap.size() << " members in the map "
<< "and are running " << num_threads << " threads.\n" ;
}


if (storePhysicsTablesDir_!="") {
if ( _rmvlevel > 0 ) {
G4cout << __func__ << " Will write out physics tables to "
<< storePhysicsTablesDir_
<< G4endl;
<< storePhysicsTablesDir_
<< G4endl;
}
masterThread->masterRunManagerPtr()->getMasterPhysicsList()->StorePhysicsTable(storePhysicsTablesDir_);
}

std::atomic<int> threads_left = num_threads;
tbb::task_group g;
for (int i = 0; i < num_threads; ++i) {

auto destroy_worker = [&threads_left, i, this] {
WorkerRMMap::accessor access_workerMap;
std::thread::id this_tid = std::this_thread::get_id();

if (myworkerRunManagerMap.find(access_workerMap, this_tid)) {
access_workerMap->second.reset();
myworkerRunManagerMap.erase(access_workerMap);
}

access_workerMap.release();
--threads_left;
while (threads_left != 0) {}
return;
};
g.run(destroy_worker);
}//for
g.wait();

if (_mtDebugOutput > 0){
G4cout << "At endRun pt2, we have " << myworkerRunManagerMap.size() << " members in the map.\n";
}

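// Handle the worker run managers still left in the map, i.e. those belonging to threads
// that are no longer being used ('transient threads').
// Note: release() only gives up ownership of each run manager; it does not destroy it.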
WorkerRMMap::iterator it = myworkerRunManagerMap.begin();
while (it != myworkerRunManagerMap.end()) {
if (_mtDebugOutput > 0){
std::cout << "releasing RM for thread ID" << it->first << std::endl;
}
it->second.release();
++it;
}

G4cout << "at endRun: numExcludedEvents = " << numExcludedEvents << G4endl;
myworkerRunManagerMap.clear();
Expand Down
8 changes: 4 additions & 4 deletions Mu2eG4/src/Mu2eG4WorkerRunManager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ namespace mu2e {
steppingCuts_(createMu2eG4Cuts(conf.Mu2eG4SteppingOnlyCut.get<fhicl::ParameterSet>(), mu2elimits_)),
commonCuts_(createMu2eG4Cuts(conf.Mu2eG4CommonCut.get<fhicl::ParameterSet>(), mu2elimits_))
{
if (m_mtDebugOutput) {
if (m_mtDebugOutput > 0) {
G4cout << "WorkerRM on thread " << workerID_ << " is being created\n!";
//to see random number seeds for each event and other verbosity, uncomment this
SetPrintProgress(1);
Expand All @@ -93,7 +93,7 @@ namespace mu2e {

// Destructor of base is called automatically. No need to do anything.
Mu2eG4WorkerRunManager::~Mu2eG4WorkerRunManager(){
if (m_mtDebugOutput) {
if (m_mtDebugOutput > 0) {
G4cout << "WorkerRM on thread " << workerID_ << " is being destroyed\n!";
}
}
Expand All @@ -103,7 +103,7 @@ namespace mu2e {

masterRM = mRM;

if (m_mtDebugOutput) {
if (m_mtDebugOutput > 0) {
G4cout << "starting WorkerRM::initializeThread on thread: " << workerID_ << G4endl;
}

Expand Down Expand Up @@ -167,7 +167,7 @@ namespace mu2e {

//we have to do this so that the state is correct for RunInitialization
G4StateManager::GetStateManager()->SetNewState(G4State_Idle);
if (m_mtDebugOutput) {
if (m_mtDebugOutput > 0) {
G4cout << "completed WorkerRM::initializeThread on thread " << workerID_ << G4endl;
}
}
Expand Down

0 comments on commit c531e7f

Please sign in to comment.