Skip to content

Commit

Permalink
additional changes on thread treatment at shutdown of MT module. addi…
Browse files Browse the repository at this point in the history
…ng more complex verbosity for MT running
  • Loading branch information
Lisa Goodenough committed Apr 10, 2020
1 parent 9af8004 commit b07a87e
Show file tree
Hide file tree
Showing 8 changed files with 81 additions and 75 deletions.
26 changes: 0 additions & 26 deletions Mu2eG4/fcl/MYg4test_potMT.fcl

This file was deleted.

3 changes: 1 addition & 2 deletions Mu2eG4/fcl/prolog.fcl
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ mu2eg4DefaultDebug: {
exportPDTEnd : false
warnEveryNewRun : false
diagLevel : 0
mtDebugOutput : false
mtDebugOutput : 0
trackingVerbosityLevel : 0
steppingVerbosityLevel : 0
navigatorVerbosityLevel : 0
Expand Down Expand Up @@ -185,7 +185,6 @@ Mu2eG4MT: {
g4run : {
@table::mu2eg4runDefaultSingleStage
module_type : "Mu2eG4MT"
// initialSeed : 8
}
}
}
Expand Down
2 changes: 1 addition & 1 deletion Mu2eG4/inc/MTMasterThread.hh
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ namespace mu2e {

private:

bool m_mtDebugOutput;
int m_mtDebugOutput;

enum class ThreadState { NotExist = 0, BeginRun = 1, EndRun = 2, Destruct = 3 };

Expand Down
2 changes: 1 addition & 1 deletion Mu2eG4/inc/Mu2eG4Config.hh
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ namespace mu2e {
fhicl::Atom<bool> navigatorCheckMode {Name("navigatorCheckMode"), false};
fhicl::Atom<int> navigatorVerbosityLevel {Name("navigatorVerbosityLevel"), 0};
fhicl::Atom<int> PiENuPolicyVerbosity {Name("PiENuPolicyVerbosity"), 0};
fhicl::Atom<bool> mtDebugOutput {Name("mtDebugOutput"), false};
fhicl::Atom<int> mtDebugOutput {Name("mtDebugOutput"), 0};

fhicl::Atom<int> checkFieldMap {Name("checkFieldMap"), 0 };

Expand Down
2 changes: 1 addition & 1 deletion Mu2eG4/inc/Mu2eG4WorkerRunManager.hh
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ namespace mu2e {

bool m_managerInitialized;
bool m_steppingVerbose;
bool m_mtDebugOutput;
int m_mtDebugOutput;
int rmvlevel_;

std::unique_ptr<Mu2eG4PerThreadStorage> perThreadObjects_;
Expand Down
40 changes: 20 additions & 20 deletions Mu2eG4/src/MTMasterThread.cc
Original file line number Diff line number Diff line change
Expand Up @@ -50,40 +50,40 @@ namespace mu2e {
while (true) {
// Signal main thread that it can proceed
m_mainCanProceed = true;
if (m_mtDebugOutput) {
if (m_mtDebugOutput > 0) {
G4cout << "Master thread: State loop, notify main thread" << G4endl;
}

m_notifyMainCV.notify_one();

// Wait until the main thread sends signal
m_masterCanProceed = false;
if (m_mtDebugOutput) {
if (m_mtDebugOutput > 0) {
G4cout << "Master thread: State loop, starting wait" << G4endl;
}
m_notifyMasterCV.wait(lk2, [&] { return m_masterCanProceed; });

// Act according to the state
if (m_mtDebugOutput) {
if (m_mtDebugOutput > 0) {
G4cout << "Master thread: Woke up, state is " << static_cast<int>(m_masterThreadState) << G4endl;
}

if (m_masterThreadState == ThreadState::BeginRun) {
// Initialize Geant4
if (m_mtDebugOutput) {
if (m_mtDebugOutput > 0) {
G4cout << "Master thread: Initializing Geant4" << G4endl;
}
masterRunManager->initializeG4(run_number);
isG4Alive = true;
} else if (m_masterThreadState == ThreadState::EndRun) {
// Stop Geant4
if (m_mtDebugOutput) {
if (m_mtDebugOutput > 0) {
G4cout << "Master thread: Stopping Geant4" << G4endl;
}
masterRunManager->stopG4();
isG4Alive = false;
} else if (m_masterThreadState == ThreadState::Destruct) {
if (m_mtDebugOutput) {
if (m_mtDebugOutput > 0) {
G4cout << "Master thread: Breaking out of state loop" << G4endl;
}
if (isG4Alive)
Expand All @@ -97,7 +97,7 @@ namespace mu2e {
}

// Cleanup
if (m_mtDebugOutput) {
if (m_mtDebugOutput > 0) {
G4cout << "MTMasterThread: start Mu2eG4MTRunManager destruction\n";
G4cout << "Master thread: Am I unique owner of masterRunManager? "
<< masterRunManager.unique() << G4endl;
Expand All @@ -106,25 +106,25 @@ namespace mu2e {
masterRunManager.reset();
//G4PhysicalVolumeStore::Clean();

if (m_mtDebugOutput) {
if (m_mtDebugOutput > 0) {
G4cout << "Master thread: reset shared_ptr" << G4endl;
}
lk2.unlock();
if (m_mtDebugOutput) {
if (m_mtDebugOutput > 0) {
G4cout << "MTMasterThread: Master thread is finished" << G4endl;
}
});

// Start waiting for a signal from the condition variable (releases the lock temporarily)
// First for initialization
m_mainCanProceed = false;
if (m_mtDebugOutput) {
if (m_mtDebugOutput > 0) {
G4cout << "Main thread: Signal master for initialization" << G4endl;
}
m_notifyMainCV.wait(lk, [&]() { return m_mainCanProceed; });

lk.unlock();
if (m_mtDebugOutput) {
if (m_mtDebugOutput > 0) {
G4cout << "MTMasterThread: Master thread is constructed" << G4endl;
}
}
Expand All @@ -146,14 +146,14 @@ namespace mu2e {
m_masterThreadState = ThreadState::BeginRun;
m_masterCanProceed = true;
m_mainCanProceed = false;
if (m_mtDebugOutput) {
if (m_mtDebugOutput > 0) {
G4cout << "MTMasterThread: Signal master for BeginRun" << G4endl;
}
m_notifyMasterCV.notify_one();
m_notifyMainCV.wait(lk2, [&]() { return m_mainCanProceed; });

lk2.unlock();
if (m_mtDebugOutput) {
if (m_mtDebugOutput > 0) {
G4cout << "MTMasterThread: finish BeginRun" << G4endl;
}
}
Expand All @@ -167,13 +167,13 @@ namespace mu2e {
m_masterThreadState = ThreadState::EndRun;
m_mainCanProceed = false;
m_masterCanProceed = true;
if (m_mtDebugOutput) {
if (m_mtDebugOutput > 0) {
G4cout << "MTMasterThread: Signal master for EndRun" <<G4endl;
}
m_notifyMasterCV.notify_one();
m_notifyMainCV.wait(lk2, [&]() { return m_mainCanProceed; });
lk2.unlock();
if (m_mtDebugOutput) {
if (m_mtDebugOutput > 0) {
G4cout << "MTMasterThread: finish EndRun" << G4endl;
}
}
Expand All @@ -183,7 +183,7 @@ namespace mu2e {
if (m_stopped) {
return;
}
if (m_mtDebugOutput) {
if (m_mtDebugOutput > 0) {
G4cout << "MTMasterThread::stopThread: stop main thread" << G4endl;
}

Expand All @@ -192,23 +192,23 @@ namespace mu2e {
// thread, and join it.
std::unique_lock<std::mutex> lk2(m_threadMutex);
m_masterRunManager.reset();
if (m_mtDebugOutput) {
if (m_mtDebugOutput > 0) {
G4cout << "Main thread: reset shared_ptr" << G4endl;
}

m_masterThreadState = ThreadState::Destruct;
m_masterCanProceed = true;
if (m_mtDebugOutput) {
if (m_mtDebugOutput > 0) {
G4cout << "MTMasterThread::stopThread: notify" << G4endl;
}
m_notifyMasterCV.notify_one();
lk2.unlock();

if (m_mtDebugOutput) {
if (m_mtDebugOutput > 0) {
G4cout << "Main thread: joining master thread" << G4endl;
}
m_masterThread.join();
if (m_mtDebugOutput) {
if (m_mtDebugOutput > 0) {
G4cout << "MTMasterThread::stopThread: main thread finished" << G4endl;
}
m_stopped = true;
Expand Down
73 changes: 53 additions & 20 deletions Mu2eG4/src/Mu2eG4MT_module.cc
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@

// TBB includes
#include "tbb/concurrent_hash_map.h"
#include "tbb/task_group.h"

using namespace std;

Expand Down Expand Up @@ -123,7 +124,7 @@ namespace mu2e {
std::unique_ptr<IMu2eG4Cut> commonCuts_;

int _rmvlevel;
bool _mtDebugOutput;
int _mtDebugOutput;

art::InputTag _generatorModuleLabel;

Expand Down Expand Up @@ -153,8 +154,7 @@ namespace mu2e {
int const num_threads{art::Globals::instance()->nthreads()};

typedef tbb::concurrent_hash_map< std::thread::id, std::unique_ptr<Mu2eG4WorkerRunManager> > WorkerRMMap;
WorkerRMMap myworkerRunManagerMap;

WorkerRMMap myworkerRunManagerMap;
}; // end G4 header


Expand Down Expand Up @@ -346,13 +346,10 @@ namespace mu2e {
WorkerRMMap::accessor access_workerMap;

if (!myworkerRunManagerMap.find(access_workerMap, tid)){
if (_mtDebugOutput){
if (_mtDebugOutput > 0){
G4cout << "FOR TID: " << tid << ", NO WORKER. We are making one.\n";
}
myworkerRunManagerMap.insert(access_workerMap, tid);
//std::ostringstream oss;
//oss << tid;
//std::string workerID = oss.str();
access_workerMap->second = std::make_unique<Mu2eG4WorkerRunManager>(conf_, tid);
}

Expand All @@ -364,7 +361,7 @@ namespace mu2e {
Mu2eG4WorkerRunManager* scheduleWorkerRM = (access_workerMap->second).get();
access_workerMap.release();

if (_mtDebugOutput){
if (_mtDebugOutput > 0){
G4cout << "FOR SchedID: " << schedID << ", TID=" << tid << ", workerRunManagers[schedID].get() is:" << scheduleWorkerRM << "\n";
}

Expand All @@ -378,7 +375,7 @@ namespace mu2e {
perThreadStore->initializeEventInfo(&event, &spHelper, &parentHelper, &genInputHits, _generatorModuleLabel);
scheduleWorkerRM->processEvent(&event);

if (_mtDebugOutput){
if (_mtDebugOutput > 0){
G4cout << "Current Event in RM is: " << scheduleWorkerRM->GetCurrentEvent()->GetEventID() << "\n";
}

Expand Down Expand Up @@ -420,25 +417,61 @@ namespace mu2e {

// Tell G4 that this run is over.
void Mu2eG4MT::endRun(art::Run & run, art::ProcessingFrame const& procFrame) {

if (_mtDebugOutput){
G4cout << "At endRun, we have " << myworkerRunManagerMap.size() << " members in the map\n";
}
WorkerRMMap::iterator it = myworkerRunManagerMap.begin();

while (it != myworkerRunManagerMap.end()) {
it->second.release();
++it;
if (_mtDebugOutput > 1){
G4cout << "At endRun pt1, we have " << myworkerRunManagerMap.size() << " members in the map "
<< "and are running " << num_threads << " threads.\n" ;
}

else if (num_threads < static_cast <int> (myworkerRunManagerMap.size()) && _mtDebugOutput > 0){
G4cout << "At endRun pt1, we have " << myworkerRunManagerMap.size() << " members in the map "
<< "and are running " << num_threads << " threads.\n" ;
}


if (storePhysicsTablesDir_!="") {
if ( _rmvlevel > 0 ) {
G4cout << __func__ << " Will write out physics tables to "
<< storePhysicsTablesDir_
<< G4endl;
<< storePhysicsTablesDir_
<< G4endl;
}
masterThread->masterRunManagerPtr()->getMasterPhysicsList()->StorePhysicsTable(storePhysicsTablesDir_);
}

std::atomic<int> threads_left = num_threads;
tbb::task_group g;
for (int i = 0; i < num_threads; ++i) {

auto destroy_worker = [&threads_left, i, this] {
WorkerRMMap::accessor access_workerMap;
std::thread::id this_tid = std::this_thread::get_id();

if (myworkerRunManagerMap.find(access_workerMap, this_tid)) {
access_workerMap->second.reset();
myworkerRunManagerMap.erase(access_workerMap);
}

access_workerMap.release();
--threads_left;
while (threads_left != 0) {}
return;
};
g.run(destroy_worker);
}//for
g.wait();

if (_mtDebugOutput > 0){
G4cout << "At endRun pt2, we have " << myworkerRunManagerMap.size() << " members in the map.\n";
}

//This cleans up the worker run managers that are in threads no longer being used, i.e. 'transient threads'
WorkerRMMap::iterator it = myworkerRunManagerMap.begin();
while (it != myworkerRunManagerMap.end()) {
if (_mtDebugOutput > 0){
std::cout << "releasing RM for thread ID" << it->first << std::endl;
}
it->second.release();
++it;
}

G4cout << "at endRun: numExcludedEvents = " << numExcludedEvents << G4endl;
myworkerRunManagerMap.clear();
Expand Down
8 changes: 4 additions & 4 deletions Mu2eG4/src/Mu2eG4WorkerRunManager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ namespace mu2e {
steppingCuts_(createMu2eG4Cuts(conf.Mu2eG4SteppingOnlyCut.get<fhicl::ParameterSet>(), mu2elimits_)),
commonCuts_(createMu2eG4Cuts(conf.Mu2eG4CommonCut.get<fhicl::ParameterSet>(), mu2elimits_))
{
if (m_mtDebugOutput) {
if (m_mtDebugOutput > 0) {
G4cout << "WorkerRM on thread " << workerID_ << " is being created\n!";
//to see random number seeds for each event and other verbosity, uncomment this
SetPrintProgress(1);
Expand All @@ -93,7 +93,7 @@ namespace mu2e {

// Destructor of base is called automatically. No need to do anything.
Mu2eG4WorkerRunManager::~Mu2eG4WorkerRunManager(){
if (m_mtDebugOutput) {
if (m_mtDebugOutput > 0) {
G4cout << "WorkerRM on thread " << workerID_ << " is being destroyed\n!";
}
}
Expand All @@ -103,7 +103,7 @@ namespace mu2e {

masterRM = mRM;

if (m_mtDebugOutput) {
if (m_mtDebugOutput > 0) {
G4cout << "starting WorkerRM::initializeThread on thread: " << workerID_ << G4endl;
}

Expand Down Expand Up @@ -167,7 +167,7 @@ namespace mu2e {

//we have to do this so that the state is correct for RunInitialization
G4StateManager::GetStateManager()->SetNewState(G4State_Idle);
if (m_mtDebugOutput) {
if (m_mtDebugOutput > 0) {
G4cout << "completed WorkerRM::initializeThread on thread " << workerID_ << G4endl;
}
}
Expand Down

0 comments on commit b07a87e

Please sign in to comment.