Skip to content

Commit 4f4628a

Browse files
Periodically Check For Migration Opportunities (#214)
* proto: add field to set migration check period * scheduler: add function migration thread that sleeps for a fixed amount of time, and a simple test. * scheduler: add logic for check for migration opportunities method * scheduler: start/stop migration thread and fix race condition when removing from in-flight map * tests: add test for function migration thread and fix bug in thread shutdown * util: add method to deep copy a faabric message + test * function call server: add call to add a pending migration to remote hosts * tests: add further testing * mpi: add migration points, link with executor, and tests * tests: fix data race * pr: re-factor methods as suggested in the pr comments * executor: throw exception to properly shutdown executing task * executor: close the loop to alllow for function migration * scheduler: add UNDERFULL scheduling topology hint * Add migration exception, catch in executor * Remove manual message copying * Remove unnecessary migration APIs * Make getMemoryView public * scheduler: get functions to migrate properly * cleanup * mpi: use initLocalQueues() instead of repeating the logic, remove assertion from the method * mpi: remove unused getMpiMockedPendingMigrations * scheduler: remove unused function declarations * scheduler: factor out method to start the function migration thread if necessary * mpi: use a boolean flag to indicate that app has been migrated, and check for flag in barriers to remove used pendingMigrations in the scheduler * proto: make topologyHint a message field, add test for json serialisation, remove unnecessary field to callFunctions, and re-factor necessary calls to callFunctions Co-authored-by: Simon Shillaker <mail@simonshillaker.com>
1 parent 5b67a98 commit 4f4628a

24 files changed

+1213
-53
lines changed

include/faabric/scheduler/FunctionCallApi.h

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ enum FunctionCalls
77
ExecuteFunctions = 1,
88
Flush = 2,
99
Unregister = 3,
10-
GetResources = 4
10+
GetResources = 4,
11+
PendingMigrations = 5
1112
};
1213
}

include/faabric/scheduler/FunctionCallClient.h

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,9 @@ getBatchRequests();
2222
std::vector<std::pair<std::string, faabric::EmptyRequest>>
2323
getResourceRequests();
2424

25+
std::vector<std::pair<std::string, std::shared_ptr<faabric::PendingMigrations>>>
26+
getPendingMigrationsRequests();
27+
2528
std::vector<std::pair<std::string, faabric::UnregisterRequest>>
2629
getUnregisterRequests();
2730

@@ -42,6 +45,8 @@ class FunctionCallClient : public faabric::transport::MessageEndpointClient
4245

4346
faabric::HostResources getResources();
4447

48+
void sendPendingMigrations(std::shared_ptr<faabric::PendingMigrations> req);
49+
4550
void executeFunctions(std::shared_ptr<faabric::BatchExecuteRequest> req);
4651

4752
void unregister(faabric::UnregisterRequest& req);

include/faabric/scheduler/FunctionCallServer.h

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,10 @@ class FunctionCallServer final
2929
const uint8_t* buffer,
3030
size_t bufferSize);
3131

32+
std::unique_ptr<google::protobuf::Message> recvPendingMigrations(
33+
const uint8_t* buffer,
34+
size_t bufferSize);
35+
3236
void recvExecuteFunctions(const uint8_t* buffer, size_t bufferSize);
3337

3438
void recvUnregister(const uint8_t* buffer, size_t bufferSize);
Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
#pragma once
2+
3+
#include <condition_variable>
4+
#include <mutex>
5+
#include <thread>
6+
7+
namespace faabric::scheduler {
8+
// Start a background thread that, every wake up period, will check if there
9+
// are migration opportunities for in-flight apps that have opted in to
10+
// being checked for migrations.
11+
class FunctionMigrationThread
12+
{
13+
public:
14+
void start(int wakeUpPeriodSecondsIn);
15+
16+
void stop();
17+
18+
int wakeUpPeriodSeconds;
19+
20+
private:
21+
std::unique_ptr<std::thread> workThread = nullptr;
22+
std::mutex mx;
23+
std::condition_variable mustStopCv;
24+
std::atomic<bool> isShutdown;
25+
};
26+
}

include/faabric/scheduler/MpiWorld.h

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -207,6 +207,12 @@ class MpiWorld
207207

208208
void setMsgForRank(faabric::Message& msg);
209209

210+
/* Function Migration */
211+
212+
void prepareMigration(
213+
int thisRank,
214+
std::shared_ptr<faabric::PendingMigrations> pendingMigrations);
215+
210216
private:
211217
int id = -1;
212218
int size = -1;
@@ -283,5 +289,8 @@ class MpiWorld
283289
MPI_Status* status,
284290
faabric::MPIMessage::MPIMessageType messageType =
285291
faabric::MPIMessage::NORMAL);
292+
293+
/* Function migration */
294+
bool hasBeenMigrated = false;
286295
};
287296
}

include/faabric/scheduler/Scheduler.h

Lines changed: 47 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
#include <faabric/proto/faabric.pb.h>
44
#include <faabric/scheduler/ExecGraph.h>
55
#include <faabric/scheduler/FunctionCallClient.h>
6+
#include <faabric/scheduler/FunctionMigrationThread.h>
67
#include <faabric/scheduler/InMemoryMessageQueue.h>
78
#include <faabric/snapshot/SnapshotClient.h>
89
#include <faabric/snapshot/SnapshotRegistry.h>
@@ -22,6 +23,10 @@
2223

2324
namespace faabric::scheduler {
2425

26+
typedef std::pair<std::shared_ptr<BatchExecuteRequest>,
27+
std::shared_ptr<faabric::util::SchedulingDecision>>
28+
InFlightPair;
29+
2530
class Scheduler;
2631

2732
Scheduler& getScheduler();
@@ -77,13 +82,13 @@ class Executor
7782
faabric::Message& msg,
7883
bool createIfNotExists = false);
7984

85+
virtual std::span<uint8_t> getMemoryView();
86+
8087
protected:
8188
virtual void restore(const std::string& snapshotKey);
8289

8390
virtual void postFinish();
8491

85-
virtual std::span<uint8_t> getMemoryView();
86-
8792
virtual void setMemorySize(size_t newSize);
8893

8994
faabric::Message boundMessage;
@@ -131,9 +136,7 @@ class Scheduler
131136
void callFunction(faabric::Message& msg, bool forceLocal = false);
132137

133138
faabric::util::SchedulingDecision callFunctions(
134-
std::shared_ptr<faabric::BatchExecuteRequest> req,
135-
faabric::util::SchedulingTopologyHint =
136-
faabric::util::SchedulingTopologyHint::NORMAL);
139+
std::shared_ptr<faabric::BatchExecuteRequest> req);
137140

138141
faabric::util::SchedulingDecision callFunctions(
139142
std::shared_ptr<faabric::BatchExecuteRequest> req,
@@ -220,6 +223,27 @@ class Scheduler
220223

221224
ExecGraph getFunctionExecGraph(unsigned int msgId);
222225

226+
// ----------------------------------
227+
// Function Migration
228+
// ----------------------------------
229+
void checkForMigrationOpportunities();
230+
231+
std::shared_ptr<faabric::PendingMigrations> getPendingAppMigrations(
232+
uint32_t appId);
233+
234+
void addPendingMigration(std::shared_ptr<faabric::PendingMigrations> msg);
235+
236+
void removePendingMigration(uint32_t appId);
237+
238+
// ----------------------------------
239+
// Clients
240+
// ----------------------------------
241+
faabric::scheduler::FunctionCallClient& getFunctionCallClient(
242+
const std::string& otherHost);
243+
244+
faabric::snapshot::SnapshotClient& getSnapshotClient(
245+
const std::string& otherHost);
246+
223247
private:
224248
std::string thisHost;
225249

@@ -244,13 +268,6 @@ class Scheduler
244268

245269
std::mutex localResultsMutex;
246270

247-
// ---- Clients ----
248-
faabric::scheduler::FunctionCallClient& getFunctionCallClient(
249-
const std::string& otherHost);
250-
251-
faabric::snapshot::SnapshotClient& getSnapshotClient(
252-
const std::string& otherHost);
253-
254271
// ---- Host resources and hosts ----
255272
faabric::HostResources thisHostResources;
256273
std::atomic<int32_t> thisHostUsedSlots = 0;
@@ -290,6 +307,24 @@ class Scheduler
290307

291308
// ---- Point-to-point ----
292309
faabric::transport::PointToPointBroker& broker;
310+
311+
// ---- Function migration ----
312+
FunctionMigrationThread functionMigrationThread;
313+
std::unordered_map<uint32_t, InFlightPair> inFlightRequests;
314+
std::unordered_map<uint32_t, std::shared_ptr<faabric::PendingMigrations>>
315+
pendingMigrations;
316+
317+
std::vector<std::shared_ptr<faabric::PendingMigrations>>
318+
doCheckForMigrationOpportunities(
319+
faabric::util::MigrationStrategy migrationStrategy =
320+
faabric::util::MigrationStrategy::BIN_PACK);
321+
322+
void broadcastPendingMigrations(
323+
std::shared_ptr<faabric::PendingMigrations> pendingMigrations);
324+
325+
void doStartFunctionMigrationThread(
326+
std::shared_ptr<faabric::BatchExecuteRequest> req,
327+
faabric::util::SchedulingDecision& decision);
293328
};
294329

295330
}

include/faabric/util/func.h

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,14 @@
77

88
namespace faabric::util {
99

10+
class FunctionMigratedException : public faabric::util::FaabricException
11+
{
12+
public:
13+
explicit FunctionMigratedException(std::string message)
14+
: FaabricException(std::move(message))
15+
{}
16+
};
17+
1018
std::string funcToString(const faabric::Message& msg, bool includeId);
1119

1220
std::string funcToString(

include/faabric/util/scheduling.h

Lines changed: 37 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22

33
#include <cstdint>
44
#include <string>
5+
#include <unordered_map>
56
#include <vector>
67

78
#include <faabric/proto/faabric.pb.h>
@@ -46,12 +47,47 @@ class SchedulingDecision
4647
// requests in a batch.
4748
// - NORMAL: bin-packs requests to slots in hosts starting from the master
4849
// host, and overloadds the master if it runs out of resources.
50+
// - FORCE_LOCAL: force local execution irrespective of the available
51+
// resources.
4952
// - NEVER_ALONE: never allocates a single (non-master) request to a host
5053
// without other requests of the batch.
54+
// - UNDERFULL: schedule up to 50% of the master hosts' capacity to force
55+
// migration opportunities to appear.
5156
enum SchedulingTopologyHint
5257
{
5358
NORMAL,
5459
FORCE_LOCAL,
55-
NEVER_ALONE
60+
NEVER_ALONE,
61+
UNDERFULL,
62+
};
63+
64+
// Map to convert input strings to scheduling topology hints and the other way
65+
// around
66+
const std::unordered_map<std::string, SchedulingTopologyHint>
67+
strToTopologyHint = {
68+
{ "NORMAL", SchedulingTopologyHint::NORMAL },
69+
{ "FORCE_LOCAL", SchedulingTopologyHint::FORCE_LOCAL },
70+
{ "NEVER_ALONE", SchedulingTopologyHint::NEVER_ALONE },
71+
{ "UNDERFULL", SchedulingTopologyHint::UNDERFULL },
72+
};
73+
74+
const std::unordered_map<SchedulingTopologyHint, std::string>
75+
topologyHintToStr = {
76+
{ SchedulingTopologyHint::NORMAL, "NORMAL" },
77+
{ SchedulingTopologyHint::FORCE_LOCAL, "FORCE_LOCAL" },
78+
{ SchedulingTopologyHint::NEVER_ALONE, "NEVER_ALONE" },
79+
{ SchedulingTopologyHint::UNDERFULL, "UNDERFULL" },
80+
};
81+
82+
// Migration strategies help the scheduler decide wether the scheduling decision
83+
// for a batch request could be changed with the new set of available resources.
84+
// - BIN_PACK: sort hosts by the number of functions from the batch they are
85+
// running. Bin-pack batches in increasing order to hosts in
86+
// decreasing order.
87+
// - EMPTY_HOSTS: pack batches in increasing order to empty hosts.
88+
enum MigrationStrategy
89+
{
90+
BIN_PACK,
91+
EMPTY_HOSTS
5692
};
5793
}

src/proto/faabric.proto

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ message BatchExecuteRequest {
2525
FUNCTIONS = 0;
2626
THREADS = 1;
2727
PROCESSES = 2;
28+
MIGRATION = 3;
2829
}
2930

3031
BatchExecuteType type = 2;
@@ -164,6 +165,12 @@ message Message {
164165
bool recordExecGraph = 41;
165166
map<string, int32> intExecGraphDetails = 42;
166167
map<string, string> execGraphDetails = 43;
168+
169+
// Function migration
170+
int32 migrationCheckPeriod = 44;
171+
172+
// Scheduling
173+
string topologyHint = 45;
167174
}
168175

169176
// ---------------------------------------------
@@ -242,3 +249,20 @@ message PointToPointMappings {
242249

243250
repeated PointToPointMapping mappings = 3;
244251
}
252+
253+
// ---------------------------------------------
254+
// FUNCTION MIGRATIONS
255+
// ---------------------------------------------
256+
257+
message PendingMigrations {
258+
int32 appId = 1;
259+
int32 groupId = 2;
260+
261+
message PendingMigration {
262+
Message msg = 1;
263+
string srcHost = 2;
264+
string dstHost = 3;
265+
}
266+
267+
repeated PendingMigration migrations = 3;
268+
}

src/scheduler/CMakeLists.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ faabric_lib(scheduler
44
Executor.cpp
55
FunctionCallClient.cpp
66
FunctionCallServer.cpp
7+
FunctionMigrationThread.cpp
78
MpiContext.cpp
89
MpiMessageBuffer.cpp
910
MpiWorld.cpp

src/scheduler/Executor.cpp

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
#include <faabric/proto/faabric.pb.h>
2+
#include <faabric/scheduler/MpiWorldRegistry.h>
23
#include <faabric/scheduler/Scheduler.h>
34
#include <faabric/snapshot/SnapshotRegistry.h>
45
#include <faabric/state/State.h>
@@ -528,9 +529,28 @@ void Executor::threadPoolThread(int threadPoolIdx)
528529

529530
// Execute the task
530531
int32_t returnValue;
532+
bool migrated = false;
531533
try {
532534
returnValue =
533535
executeTask(threadPoolIdx, task.messageIndex, task.req);
536+
} catch (const faabric::util::FunctionMigratedException& ex) {
537+
SPDLOG_DEBUG(
538+
"Task {} migrated, shutting down executor {}", msg.id(), id);
539+
540+
// Note that when a task has been migrated, we need to perform all
541+
// the normal executor shutdown, but we must NOT set the result for
542+
// the call.
543+
migrated = true;
544+
selfShutdown = true;
545+
returnValue = -99;
546+
547+
// MPI migration
548+
if (msg.ismpi()) {
549+
auto& mpiWorld =
550+
faabric::scheduler::getMpiWorldRegistry().getWorld(
551+
msg.mpiworldid());
552+
mpiWorld.destroy();
553+
}
534554
} catch (const std::exception& ex) {
535555
returnValue = 1;
536556

@@ -667,6 +687,12 @@ void Executor::threadPoolThread(int threadPoolIdx)
667687
// executor.
668688
sch.vacateSlot();
669689

690+
// If the function has been migrated, we drop out here and shut down the
691+
// executor
692+
if (migrated) {
693+
break;
694+
}
695+
670696
// Finally set the result of the task, this will allow anything
671697
// waiting on its result to continue execution, therefore must be
672698
// done once the executor has been reset, otherwise the executor may

0 commit comments

Comments
 (0)