Skip to content

Commit

Permalink
cleanup
Browse files Browse the repository at this point in the history
  • Loading branch information
csegarragonz committed Feb 2, 2022
1 parent 6123721 commit 7f81c7c
Show file tree
Hide file tree
Showing 5 changed files with 8 additions and 34 deletions.
8 changes: 2 additions & 6 deletions include/faabric/scheduler/Scheduler.h
Original file line number Diff line number Diff line change
Expand Up @@ -90,17 +90,13 @@ class Executor
faabric::Message& msg,
bool createIfNotExists = false);

// TODO - maybe remove me
bool doMigration(
std::shared_ptr<faabric::PendingMigrations> pendingMigrations);

virtual std::span<uint8_t> getMemoryView();

protected:
virtual void restore(const std::string& snapshotKey);

virtual void postFinish();

virtual std::span<uint8_t> getMemoryView();

virtual void setMemorySize(size_t newSize);

faabric::Message boundMessage;
Expand Down
5 changes: 3 additions & 2 deletions src/scheduler/Executor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -545,10 +545,11 @@ void Executor::threadPoolThread(int threadPoolIdx)
returnValue = -99;

// MPI migration
if(msg.ismpi()) {
if (msg.ismpi()) {
// TODO - when should we delete the pending migration?
auto& mpiWorld =
faabric::scheduler::getMpiWorldRegistry().getWorld(msg.mpiworldid());
faabric::scheduler::getMpiWorldRegistry().getWorld(
msg.mpiworldid());
mpiWorld.destroy();
}
} catch (const std::exception& ex) {
Expand Down
3 changes: 2 additions & 1 deletion src/scheduler/MpiWorld.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1802,7 +1802,8 @@ void MpiWorld::prepareMigration(
// TODO - merge with initLocalQueues
for (const int sendRank : ranksForHost[thisHost]) {
for (const int recvRank : ranksForHost[thisHost]) {
if (localQueues[getIndexForRanks(sendRank, recvRank)] == nullptr) {
if (localQueues[getIndexForRanks(sendRank, recvRank)] ==
nullptr) {
localQueues[getIndexForRanks(sendRank, recvRank)] =
std::make_shared<InMemoryMpiQueue>();
}
Expand Down
1 change: 0 additions & 1 deletion src/scheduler/Scheduler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1371,7 +1371,6 @@ Scheduler::doCheckForMigrationOpportunities(
&(*(req->mutable_messages()->begin() +
std::distance(originalDecision.hosts.begin(), right)));
auto* migrationMsgPtr = migration->mutable_msg();
// faabric::util::copyMessage(msgPtr, migrationMsgPtr);
*migrationMsgPtr = *msgPtr;
// Decrement by one the availability, and check for more
// possible sources of migration
Expand Down
25 changes: 1 addition & 24 deletions tests/test/scheduler/test_function_migration.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@
#include <faabric/scheduler/FunctionMigrationThread.h>
#include <faabric/scheduler/Scheduler.h>
#include <faabric/util/config.h>
#include <faabric/util/message.h>
#include <faabric/util/testing.h>

using namespace faabric::scheduler;
Expand Down Expand Up @@ -86,8 +85,7 @@ class FunctionMigrationTestFixture : public SchedulerTestFixture
for (auto pair : migrations) {
auto* migration = expected.add_migrations();
auto* migrationMsg = migration->mutable_msg();
faabric::util::copyMessage(&req->mutable_messages()->at(pair.first),
migrationMsg);
*migrationMsg = req->mutable_messages()->at(pair.first);
migration->set_srchost(hosts.at(pair.first));
migration->set_dsthost(hosts.at(pair.second));
}
Expand Down Expand Up @@ -449,32 +447,11 @@ TEST_CASE_METHOD(FunctionMigrationTestFixture,
checkPendingMigrationsExpectation(
expectedMigrations, actualMigrations, hosts, true);

// Check that certain MPI calls actually do the migration
SECTION("MPI barrier triggers a migration point") { world.barrier(0); }

SECTION("MPI all reduce triggers a migration point")
{
std::vector<int> messageData = { 0, 1, 2 };
world.allReduce(0,
BYTES(messageData.data()),
BYTES(messageData.data()),
MPI_INT,
messageData.size(),
MPI_SUM);
}

// When performing the migration, MPI will remove it from the pending
// migrations map
REQUIRE(sch.getPendingAppMigrations(appId) == nullptr);
checkPendingMigrationsExpectation(
expectedMigrations, getMpiMockedPendingMigrations().front(), hosts, true);

faabric::Message res =
sch.getFunctionResult(firstMsg->id(), 2 * timeToSleep);
REQUIRE(res.returnvalue() == 0);

// Clean up
world.destroy();
}

}

0 comments on commit 7f81c7c

Please sign in to comment.