Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Periodically Check For Migration Opportunities #214

Merged
merged 27 commits into from
Feb 7, 2022
Merged
Show file tree
Hide file tree
Changes from 9 commits
Commits
Show all changes
27 commits
Select commit Hold shift + click to select a range
867f41f
proto: add field to set migration check period
csegarragonz Jan 7, 2022
4eef2d3
scheduler: add function migration thread that sleeps for a fixed amou…
csegarragonz Jan 7, 2022
0a971c8
scheduler: add logic for check for migration opportunities method
csegarragonz Jan 7, 2022
ff175df
scheduler: start/stop migration thread and fix race condition when re…
csegarragonz Jan 8, 2022
aa9fcc8
tests: add test for function migration thread and fix bug in thread s…
csegarragonz Jan 8, 2022
4efe4ca
util: add method to deep copy a faabric message + test
csegarragonz Jan 9, 2022
13560b3
function call server: add call to add a pending migration to remote h…
csegarragonz Jan 9, 2022
0b02aba
tests: add further testing
csegarragonz Jan 9, 2022
1b98e6b
mpi: add migration points, link with executor, and tests
csegarragonz Jan 10, 2022
6308962
tests: fix data race
csegarragonz Jan 10, 2022
5192100
pr: re-factor methods as suggested in the pr comments
csegarragonz Jan 10, 2022
b3b50d5
executor: throw exception to properly shutdown executing task
csegarragonz Jan 10, 2022
fe07c6f
executor: close the loop to alllow for function migration
csegarragonz Jan 10, 2022
a45cc00
scheduler: add UNDERFULL scheduling topology hint
csegarragonz Jan 10, 2022
c177e6a
Add migration exception, catch in executor
Shillaker Jan 12, 2022
f5fa773
Remove manual message copying
Shillaker Jan 12, 2022
ec38aad
Remove unnecessary migration APIs
Shillaker Jan 12, 2022
fefbe12
Make getMemoryView public
Shillaker Jan 12, 2022
07709f0
scheduler: get functions to migrate properly
csegarragonz Jan 12, 2022
6123721
migration working
csegarragonz Jan 12, 2022
446e90a
cleanup
csegarragonz Feb 2, 2022
c090a16
mpi: use initLocalQueues() instead of repeating the logic, remove ass…
csegarragonz Feb 3, 2022
51dcc95
mpi: remove unused getMpiMockedPendingMigrations
csegarragonz Feb 3, 2022
ba44bdd
scheduler: remove unused function declarations
csegarragonz Feb 3, 2022
28d143e
scheduler: factor out method to start the function migration thread i…
csegarragonz Feb 3, 2022
54ab7b4
mpi: use a boolean flag to indicate that app has been migrated, and c…
csegarragonz Feb 3, 2022
3bc2089
proto: make topologyHint a message field, add test for json serialisa…
csegarragonz Feb 3, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion include/faabric/scheduler/FunctionCallApi.h
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ enum FunctionCalls
ExecuteFunctions = 1,
Flush = 2,
Unregister = 3,
GetResources = 4
GetResources = 4,
AddPendingMigration = 5
csegarragonz marked this conversation as resolved.
Show resolved Hide resolved
};
}
6 changes: 6 additions & 0 deletions include/faabric/scheduler/FunctionCallClient.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,9 @@ getBatchRequests();
std::vector<std::pair<std::string, faabric::EmptyRequest>>
getResourceRequests();

std::vector<std::pair<std::string, std::shared_ptr<faabric::PendingMigrations>>>
getAddPendingMigrationRequests();

std::vector<std::pair<std::string, faabric::UnregisterRequest>>
getUnregisterRequests();

Expand All @@ -42,6 +45,9 @@ class FunctionCallClient : public faabric::transport::MessageEndpointClient

faabric::HostResources getResources();

void sendAddPendingMigration(
std::shared_ptr<faabric::PendingMigrations> req);
csegarragonz marked this conversation as resolved.
Show resolved Hide resolved

void executeFunctions(std::shared_ptr<faabric::BatchExecuteRequest> req);

void unregister(faabric::UnregisterRequest& req);
Expand Down
4 changes: 4 additions & 0 deletions include/faabric/scheduler/FunctionCallServer.h
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,10 @@ class FunctionCallServer final
const uint8_t* buffer,
size_t bufferSize);

std::unique_ptr<google::protobuf::Message> recvAddPendingMigration(
const uint8_t* buffer,
size_t bufferSize);

void recvExecuteFunctions(const uint8_t* buffer, size_t bufferSize);

void recvUnregister(const uint8_t* buffer, size_t bufferSize);
Expand Down
26 changes: 26 additions & 0 deletions include/faabric/scheduler/FunctionMigrationThread.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
#pragma once

#include <condition_variable>
#include <mutex>
#include <thread>

namespace faabric::scheduler {
class FunctionMigrationThread
{
public:
// Start a background thread that, every wake up period, will check if there
// are migration opportunities for in-flight apps that have opted in to
// being checked for migrations.
csegarragonz marked this conversation as resolved.
Show resolved Hide resolved
void start(int wakeUpPeriodSecondsIn);

void stop();

int wakeUpPeriodSeconds;

private:
std::unique_ptr<std::thread> workThread = nullptr;
std::mutex mx;
std::condition_variable mustStopCv;
Shillaker marked this conversation as resolved.
Show resolved Hide resolved
std::atomic<bool> isShutdown;
};
}
13 changes: 13 additions & 0 deletions include/faabric/scheduler/MpiWorld.h
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,9 @@ std::vector<faabric::MpiHostsToRanksMessage> getMpiHostsToRanksMessages();
std::vector<std::shared_ptr<faabric::MPIMessage>> getMpiMockedMessages(
int sendRank);

std::vector<std::shared_ptr<faabric::PendingMigrations>>
getMpiMockedPendingMigrations();

typedef faabric::util::FixedCapacityQueue<std::shared_ptr<faabric::MPIMessage>>
InMemoryMpiQueue;

Expand Down Expand Up @@ -271,6 +274,16 @@ class MpiWorld
int recvRank,
int batchSize = 0);

/* Function Migration */

void tryMigrate(int thisRank);

void prepareMigration(
int thisRank,
std::shared_ptr<faabric::PendingMigrations> pendingMigrations);

void finishMigration(int thisRank);

/* Helper methods */

void checkRanksRange(int sendRank, int recvRank);
Expand Down
36 changes: 36 additions & 0 deletions include/faabric/scheduler/Scheduler.h
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
#include <faabric/proto/faabric.pb.h>
#include <faabric/scheduler/ExecGraph.h>
#include <faabric/scheduler/FunctionCallClient.h>
#include <faabric/scheduler/FunctionMigrationThread.h>
#include <faabric/scheduler/InMemoryMessageQueue.h>
#include <faabric/snapshot/SnapshotClient.h>
#include <faabric/snapshot/SnapshotRegistry.h>
Expand All @@ -22,6 +23,10 @@

namespace faabric::scheduler {

typedef std::pair<std::shared_ptr<BatchExecuteRequest>,
std::shared_ptr<faabric::util::SchedulingDecision>>
InFlightPair;

class Scheduler;

Scheduler& getScheduler();
Expand Down Expand Up @@ -77,6 +82,9 @@ class Executor
faabric::Message& msg,
bool createIfNotExists = false);

void doMigration(
std::shared_ptr<faabric::PendingMigrations> pendingMigrations);

protected:
virtual void restore(const std::string& snapshotKey);

Expand All @@ -96,6 +104,8 @@ class Executor

uint32_t threadPoolSize = 0;

void migrateFunction(const faabric::Message& msg, const std::string& host);

private:
std::atomic<bool> claimed = false;

Expand Down Expand Up @@ -220,6 +230,18 @@ class Scheduler

ExecGraph getFunctionExecGraph(unsigned int msgId);

// ----------------------------------
// Function Migration
// ----------------------------------
void checkForMigrationOpportunities();

std::shared_ptr<faabric::PendingMigrations> canAppBeMigrated(
csegarragonz marked this conversation as resolved.
Show resolved Hide resolved
uint32_t appId);

void addPendingMigration(std::shared_ptr<faabric::PendingMigrations> msg);

void removePendingMigration(uint32_t appId);

private:
std::string thisHost;

Expand Down Expand Up @@ -290,6 +312,20 @@ class Scheduler

// ---- Point-to-point ----
faabric::transport::PointToPointBroker& broker;

// ---- Function migration ----
FunctionMigrationThread functionMigrationThread;
std::unordered_map<uint32_t, InFlightPair> inFlightRequests;
std::unordered_map<uint32_t, std::shared_ptr<faabric::PendingMigrations>>
pendingMigrations;

std::vector<std::shared_ptr<faabric::PendingMigrations>>
doCheckForMigrationOpportunities(
faabric::util::MigrationStrategy migrationStrategy =
faabric::util::MigrationStrategy::BIN_PACK);

void broadcastAddPendingMigration(
std::shared_ptr<faabric::PendingMigrations> pendingMigrations);
};

}
7 changes: 7 additions & 0 deletions include/faabric/util/message.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
#pragma once

#include <faabric/proto/faabric.pb.h>

namespace faabric::util {
void copyMessage(const faabric::Message* src, faabric::Message* dst);
Shillaker marked this conversation as resolved.
Show resolved Hide resolved
}
12 changes: 12 additions & 0 deletions include/faabric/util/scheduling.h
Original file line number Diff line number Diff line change
Expand Up @@ -54,4 +54,16 @@ enum SchedulingTopologyHint
FORCE_LOCAL,
NEVER_ALONE
};

// Migration strategies help the scheduler decide wether the scheduling decision
// for a batch request could be changed with the new set of available resources.
// - BIN_PACK: sort hosts by the number of functions from the batch they are
// running. Bin-pack batches in increasing order to hosts in
// decreasing order.
// - EMPTY_HOSTS: pack batches in increasing order to empty hosts.
enum MigrationStrategy
{
BIN_PACK,
EMPTY_HOSTS
};
}
20 changes: 20 additions & 0 deletions src/proto/faabric.proto
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,9 @@ message Message {
bool recordExecGraph = 41;
map<string, int32> intExecGraphDetails = 42;
map<string, string> execGraphDetails = 43;

// Function migration
int32 migrationCheckPeriod = 44;
}

// ---------------------------------------------
Expand Down Expand Up @@ -242,3 +245,20 @@ message PointToPointMappings {

repeated PointToPointMapping mappings = 3;
}

// ---------------------------------------------
// FUNCTION MIGRATIONS
// ---------------------------------------------

message PendingMigrations {
int32 appId = 1;
int32 groupId = 2;

message PendingMigration {
Message msg = 1;
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

AFAICT this is the message that's sent to worker hosts to tell it to migrate a function across hosts. If that's right, doesn't that host already have the messge, so we just need the message ID here?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

All functions in the app will read the same PendingMigrations message. They:

  • Iterate over the PendingMigration repeated field (migrations).
  • If the message corresponds to their message id, they themselves must trigger their own migration.
  • To do so, they need to know where they are headed dstHost.

Maybe the srcHost field is not strictly needed, but it can be asserted/makes printing and logging easier.

string srcHost = 2;
string dstHost = 3;
}

repeated PendingMigration migrations = 3;
}
1 change: 1 addition & 0 deletions src/scheduler/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ faabric_lib(scheduler
Executor.cpp
FunctionCallClient.cpp
FunctionCallServer.cpp
FunctionMigrationThread.cpp
MpiContext.cpp
MpiMessageBuffer.cpp
MpiWorld.cpp
Expand Down
23 changes: 23 additions & 0 deletions src/scheduler/Executor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -735,6 +735,29 @@ void Executor::releaseClaim()
claimed.store(false);
}

void Executor::doMigration(
std::shared_ptr<faabric::PendingMigrations> pendingMigrations)
{
for (int i = 0; i < pendingMigrations->migrations_size(); i++) {
auto m = pendingMigrations->mutable_migrations()->at(i);
if (m.msg().id() == boundMessage.id()) {
migrateFunction(m.msg(), m.dsthost());
// TODO: terminate current executing thread
}
}
}

void Executor::migrateFunction(const faabric::Message& msg,
const std::string& host)
{
SPDLOG_INFO("Executor received request to migrate message {} from host {}"
" to host {}",
msg.id(),
msg.executedhost(),
host);
SPDLOG_ERROR("Executor::migrate() not implemented");
}

// ------------------------------------------
// HOOKS
// ------------------------------------------
Expand Down
28 changes: 28 additions & 0 deletions src/scheduler/FunctionCallClient.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,10 @@ static std::unordered_map<std::string,
faabric::util::Queue<faabric::HostResources>>
queuedResourceResponses;

static std::vector<
std::pair<std::string, std::shared_ptr<faabric::PendingMigrations>>>
addPendingMigrationRequests;

static std::vector<std::pair<std::string, faabric::UnregisterRequest>>
unregisterRequests;

Expand Down Expand Up @@ -57,6 +61,13 @@ std::vector<std::pair<std::string, faabric::EmptyRequest>> getResourceRequests()
return resourceRequests;
}

std::vector<std::pair<std::string, std::shared_ptr<faabric::PendingMigrations>>>
getAddPendingMigrationRequests()
{
faabric::util::UniqueLock lock(mockMutex);
return addPendingMigrationRequests;
}

std::vector<std::pair<std::string, faabric::UnregisterRequest>>
getUnregisterRequests()
{
Expand All @@ -76,6 +87,7 @@ void clearMockRequests()
functionCalls.clear();
batchMessages.clear();
resourceRequests.clear();
addPendingMigrationRequests.clear();
unregisterRequests.clear();

for (auto& p : queuedResourceResponses) {
Expand Down Expand Up @@ -128,6 +140,22 @@ faabric::HostResources FunctionCallClient::getResources()
return response;
}

void FunctionCallClient::sendAddPendingMigration(
std::shared_ptr<PendingMigrations> req)
{
faabric::PendingMigrations request;
faabric::EmptyResponse response;

if (faabric::util::isMockMode()) {
faabric::util::UniqueLock lock(mockMutex);
addPendingMigrationRequests.emplace_back(host, req);
} else {
syncSend(faabric::scheduler::FunctionCalls::AddPendingMigration,
csegarragonz marked this conversation as resolved.
Show resolved Hide resolved
req.get(),
&response);
}
}

void FunctionCallClient::executeFunctions(
const std::shared_ptr<faabric::BatchExecuteRequest> req)
{
Expand Down
16 changes: 16 additions & 0 deletions src/scheduler/FunctionCallServer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,9 @@ std::unique_ptr<google::protobuf::Message> FunctionCallServer::doSyncRecv(
case faabric::scheduler::FunctionCalls::GetResources: {
return recvGetResources(buffer, bufferSize);
}
case faabric::scheduler::FunctionCalls::AddPendingMigration: {
return recvAddPendingMigration(buffer, bufferSize);
}
default: {
throw std::runtime_error(
fmt::format("Unrecognized sync call header: {}", header));
Expand Down Expand Up @@ -100,4 +103,17 @@ std::unique_ptr<google::protobuf::Message> FunctionCallServer::recvGetResources(
scheduler.getThisHostResources());
return response;
}

std::unique_ptr<google::protobuf::Message>
FunctionCallServer::recvAddPendingMigration(const uint8_t* buffer,
size_t bufferSize)
{
PARSE_MSG(faabric::PendingMigrations, buffer, bufferSize);

auto msgPtr = std::make_shared<faabric::PendingMigrations>(msg);

scheduler.addPendingMigration(msgPtr);

return std::make_unique<faabric::EmptyResponse>();
}
}
Loading