Skip to content

Commit

Permalink
A large number of performance changes (related to faasm#156)
Browse files Browse the repository at this point in the history
  • Loading branch information
eigenraven committed Oct 29, 2021
1 parent 8a70efd commit a03d5fe
Show file tree
Hide file tree
Showing 32 changed files with 907 additions and 287 deletions.
10 changes: 8 additions & 2 deletions examples/server.cpp
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
#include <faabric/endpoint/FaabricEndpoint.h>
#include <faabric/endpoint/Endpoint.h>
#include <faabric/endpoint/FaabricEndpointHandler.h>
#include <faabric/runner/FaabricMain.h>
#include <faabric/scheduler/ExecutorFactory.h>
#include <faabric/transport/context.h>
#include <faabric/util/config.h>
#include <faabric/util/logging.h>

using namespace faabric::scheduler;
Expand Down Expand Up @@ -50,7 +52,11 @@ int main()

// Start endpoint (will also have multiple threads)
SPDLOG_INFO("Starting endpoint");
faabric::endpoint::FaabricEndpoint endpoint;
const auto& config = faabric::util::getSystemConfig();
faabric::endpoint::Endpoint endpoint(
config.endpointPort,
config.endpointNumThreads,
std::make_shared<faabric::endpoint::FaabricEndpointHandler>());
endpoint.start();

SPDLOG_INFO("Shutting down endpoint");
Expand Down
45 changes: 35 additions & 10 deletions include/faabric/endpoint/Endpoint.h
Original file line number Diff line number Diff line change
@@ -1,28 +1,53 @@
#pragma once

#include <functional>
#include <memory>

#include <faabric/proto/faabric.pb.h>
#include <faabric/util/asio.h>
#include <faabric/util/config.h>
#include <pistache/endpoint.h>
#include <pistache/http.h>

namespace faabric::endpoint {
namespace detail {
struct EndpointState;
}

struct HttpRequestContext
{
asio::io_context& ioc;
asio::any_io_executor executor;
std::function<void(faabric::util::BeastHttpResponse&&)> sendFunction;
};

class HttpRequestHandler
{
public:
virtual void onRequest(HttpRequestContext&& ctx,
faabric::util::BeastHttpRequest&& request) = 0;
};

class Endpoint
{
public:
Endpoint();
Endpoint() = delete;
Endpoint(const Endpoint&) = delete;
Endpoint(Endpoint&&) = delete;
Endpoint& operator=(const Endpoint&) = delete;
Endpoint& operator=(Endpoint&&) = delete;
virtual ~Endpoint();

Endpoint(int port, int threadCount);
Endpoint(int port,
int threadCount,
std::shared_ptr<HttpRequestHandler> requestHandlerIn);

void start(bool awaitSignal = true);

void stop();

virtual std::shared_ptr<Pistache::Http::Handler> getHandler() = 0;

private:
int port = faabric::util::getSystemConfig().endpointPort;
int threadCount = faabric::util::getSystemConfig().endpointNumThreads;

Pistache::Http::Endpoint httpEndpoint;
int port;
int threadCount;
std::unique_ptr<detail::EndpointState> state;
std::shared_ptr<HttpRequestHandler> requestHandler;
};
}
16 changes: 0 additions & 16 deletions include/faabric/endpoint/FaabricEndpoint.h

This file was deleted.

25 changes: 13 additions & 12 deletions include/faabric/endpoint/FaabricEndpointHandler.h
Original file line number Diff line number Diff line change
@@ -1,23 +1,24 @@
#pragma once

#include <faabric/endpoint/Endpoint.h>
#include <faabric/proto/faabric.pb.h>
#include <pistache/http.h>

namespace faabric::endpoint {
class FaabricEndpointHandler : public Pistache::Http::Handler
class FaabricEndpointHandler final
: public HttpRequestHandler
, public std::enable_shared_from_this<FaabricEndpointHandler>
{
public:
HTTP_PROTOTYPE(FaabricEndpointHandler)

void onTimeout(const Pistache::Http::Request& request,
Pistache::Http::ResponseWriter writer) override;

void onRequest(const Pistache::Http::Request& request,
Pistache::Http::ResponseWriter response) override;

std::pair<int, std::string> handleFunction(const std::string& requestStr);
void onRequest(HttpRequestContext&& ctx,
faabric::util::BeastHttpRequest&& request) override;

private:
std::pair<int, std::string> executeFunction(faabric::Message& msg);
void executeFunction(HttpRequestContext&& ctx,
faabric::util::BeastHttpResponse&& partialResponse,
faabric::Message&& msg);

void onFunctionResult(HttpRequestContext&& ctx,
faabric::util::BeastHttpResponse&& partialResponse,
faabric::Message& msg);
};
}
3 changes: 2 additions & 1 deletion include/faabric/mpi-native/MpiExecutor.h
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
#pragma once

#include <faabric/endpoint/FaabricEndpoint.h>
#include <faabric/endpoint/Endpoint.h>
#include <faabric/endpoint/FaabricEndpointHandler.h>
#include <faabric/scheduler/ExecutorFactory.h>
#include <faabric/scheduler/Scheduler.h>

Expand Down
1 change: 1 addition & 0 deletions include/faabric/scheduler/FunctionCallApi.h
Original file line number Diff line number Diff line change
Expand Up @@ -9,5 +9,6 @@ enum FunctionCalls
Unregister = 3,
GetResources = 4,
SetThreadResult = 5,
DirectResult = 6,
};
}
2 changes: 2 additions & 0 deletions include/faabric/scheduler/FunctionCallClient.h
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,8 @@ class FunctionCallClient : public faabric::transport::MessageEndpointClient
void executeFunctions(
const std::shared_ptr<faabric::BatchExecuteRequest> req);

void sendDirectResult(faabric::Message msg);

void unregister(faabric::UnregisterRequest& req);

private:
Expand Down
2 changes: 2 additions & 0 deletions include/faabric/scheduler/FunctionCallServer.h
Original file line number Diff line number Diff line change
Expand Up @@ -32,5 +32,7 @@ class FunctionCallServer final
void recvExecuteFunctions(const uint8_t* buffer, size_t bufferSize);

void recvUnregister(const uint8_t* buffer, size_t bufferSize);

void recvDirectResult(const uint8_t* buffer, size_t bufferSize);
};
}
57 changes: 52 additions & 5 deletions include/faabric/scheduler/Scheduler.h
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,20 @@
#include <faabric/scheduler/FunctionCallClient.h>
#include <faabric/scheduler/InMemoryMessageQueue.h>
#include <faabric/snapshot/SnapshotClient.h>
#include <faabric/util/asio.h>
#include <faabric/util/config.h>
#include <faabric/util/func.h>
#include <faabric/util/queue.h>
#include <faabric/util/scheduling.h>
#include <faabric/util/snapshot.h>
#include <faabric/util/timing.h>

#include <atomic>
#include <condition_variable>
#include <cstdint>
#include <functional>
#include <future>
#include <optional>
#include <shared_mutex>

#define AVAILABLE_HOST_SET "available_hosts"
Expand Down Expand Up @@ -55,6 +61,8 @@ class Executor

void finish();

virtual void setup(faabric::Message& msg);

virtual void reset(faabric::Message& msg);

virtual int32_t executeTask(
Expand All @@ -73,6 +81,8 @@ class Executor
protected:
virtual void restore(faabric::Message& msg);

virtual void softShutdown();

virtual void postFinish();

faabric::Message boundMessage;
Expand All @@ -88,11 +98,37 @@ class Executor
std::vector<std::shared_ptr<std::thread>> threadPoolThreads;
std::vector<std::shared_ptr<std::thread>> deadThreads;

std::mutex setupMutex;
std::atomic_bool setupDone;

std::vector<faabric::util::Queue<ExecutorTask>> threadTaskQueues;

void threadPoolThread(int threadPoolIdx);
};

struct MessageLocalResult final
{
std::promise<std::unique_ptr<faabric::Message>> promise;
int event_fd = -1;

MessageLocalResult();
MessageLocalResult(const MessageLocalResult&) = delete;
inline MessageLocalResult(MessageLocalResult&& other)
{
this->operator=(std::move(other));
}
MessageLocalResult& operator=(const MessageLocalResult&) = delete;
inline MessageLocalResult& operator=(MessageLocalResult&& other)
{
this->promise = std::move(other.promise);
this->event_fd = other.event_fd;
other.event_fd = -1;
return *this;
}
~MessageLocalResult();
void set_value(std::unique_ptr<faabric::Message>&& msg);
};

class Scheduler
{
public:
Expand Down Expand Up @@ -128,6 +164,12 @@ class Scheduler

faabric::Message getFunctionResult(unsigned int messageId, int timeout);

void getFunctionResultAsync(unsigned int messageId,
int timeoutMs,
asio::io_context& ioc,
asio::any_io_executor& executor,
std::function<void(faabric::Message&)> handler);

void setThreadResult(const faabric::Message& msg, int32_t returnValue);

void pushSnapshotDiffs(
Expand Down Expand Up @@ -183,7 +225,15 @@ class Scheduler

ExecGraph getFunctionExecGraph(unsigned int msgId);

void updateMonitoring();

std::atomic_int32_t monitorLocallyScheduledTasks;
std::atomic_int32_t monitorStartedTasks;
std::atomic_int32_t monitorWaitingTasks;

private:
int monitorFd = -1;

std::string thisHost;

faabric::util::SystemConfig& conf;
Expand All @@ -208,8 +258,7 @@ class Scheduler
std::set<std::string> availableHostsCache;
std::unordered_map<std::string, std::set<std::string>> registeredHosts;

std::unordered_map<uint32_t,
std::promise<std::unique_ptr<faabric::Message>>>
std::unordered_map<uint32_t, std::shared_ptr<MessageLocalResult>>
localResults;
std::mutex localResultsMutex;

Expand All @@ -221,9 +270,7 @@ class Scheduler
std::vector<std::string> getUnregisteredHosts(const std::string& funcStr,
bool noCache = false);

std::shared_ptr<Executor> claimExecutor(
faabric::Message& msg,
faabric::util::FullLock& schedulerLock);
std::shared_ptr<Executor> claimExecutor(faabric::Message& msg);

faabric::HostResources getHostResources(const std::string& host);

Expand Down
2 changes: 1 addition & 1 deletion include/faabric/snapshot/SnapshotRegistry.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ class SnapshotRegistry

bool snapshotExists(const std::string& key);

void mapSnapshot(const std::string& key, uint8_t* target);
uint8_t* mapSnapshot(const std::string& key, uint8_t* target);

void takeSnapshot(const std::string& key,
faabric::util::SnapshotData data,
Expand Down
14 changes: 14 additions & 0 deletions include/faabric/util/asio.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
#pragma once

#include <boost/asio.hpp>
#include <boost/beast/core.hpp>
#include <boost/beast/http.hpp>
#include <boost/beast/version.hpp>

namespace asio = boost::asio;
namespace beast = boost::beast;

namespace faabric::util {
using BeastHttpRequest = beast::http::request<beast::http::string_body>;
using BeastHttpResponse = beast::http::response<beast::http::string_body>;
}
3 changes: 3 additions & 0 deletions include/faabric/util/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,9 @@ class SystemConfig
int endpointPort;
int endpointNumThreads;

// Monitoring
std::string schedulerMonitorFile;

// Transport
int functionServerThreads;
int stateServerThreads;
Expand Down
13 changes: 8 additions & 5 deletions include/faabric/util/delta.h
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
#include <cstdint>
#include <functional>
#include <string>
#include <utility>
#include <vector>

namespace faabric::util {
Expand Down Expand Up @@ -40,11 +41,13 @@ enum DeltaCommand : uint8_t
DELTACMD_END = 0xFE,
};

std::vector<uint8_t> serializeDelta(const DeltaSettings& cfg,
const uint8_t* oldDataStart,
size_t oldDataLen,
const uint8_t* newDataStart,
size_t newDataLen);
std::vector<uint8_t> serializeDelta(
const DeltaSettings& cfg,
const uint8_t* oldDataStart,
size_t oldDataLen,
const uint8_t* newDataStart,
size_t newDataLen,
const std::vector<std::pair<uint32_t, uint32_t>>* excludedPtrLens = nullptr);

void applyDelta(const std::vector<uint8_t>& delta,
std::function<void(uint32_t)> setDataSize,
Expand Down
1 change: 0 additions & 1 deletion src/endpoint/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@

faabric_lib(endpoint
Endpoint.cpp
FaabricEndpoint.cpp
FaabricEndpointHandler.cpp
)

Expand Down
Loading

0 comments on commit a03d5fe

Please sign in to comment.