Skip to content

Commit

Permalink
[TransferEngine] Refactor code to hide transport logics from user APIs (
Browse files Browse the repository at this point in the history
#51)


Co-authored-by: doujiang24 <doujiang24@gmail.com>
  • Loading branch information
alogfans and doujiang24 authored Jan 2, 2025
1 parent bf5f56d commit 64ddda4
Show file tree
Hide file tree
Showing 37 changed files with 1,937 additions and 1,386 deletions.
2 changes: 1 addition & 1 deletion doc/en/p2p-store.md
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ func NewP2PStore(metadataUri string, localSegmentName string, nicPriorityMatrix
Creates an instance of `P2PStore`, which internally starts a Transfer Engine service.
- `metadataUri`: The hostname or IP address of the metadata server/etcd service.
- `localSegmentName`: The local server name (hostname/IP address:port), ensuring uniqueness within the cluster.
- `nicPriorityMatrix`: The network interface card priority order matrix, see the related description in the Transfer Engine API documentation (`TransferEngine::installOrGetTransport`).
- `nicPriorityMatrix`: The network interface card priority order matrix, see the related description in the Transfer Engine API documentation (`TransferEngine::installTransport`).
- Return value: If successful, returns a pointer to the `P2PStore` instance, otherwise returns `error`.

```go
Expand Down
62 changes: 33 additions & 29 deletions doc/en/transfer-engine.md
Original file line number Diff line number Diff line change
Expand Up @@ -122,17 +122,18 @@ After successfully compiling Transfer Engine, the test program `transfer_engine_
# This is 10.0.0.2
export MC_GID_INDEX=n
./transfer_engine_bench --mode=target \
--metadata_server=10.0.0.1:2379 \
--metadata_server=etcd://10.0.0.1:2379 \
--local_server_name=10.0.0.2:12345 \
--device_name=erdma_0
```
The meanings of the various parameters are as follows:
- The default value of the parameter corresponding to the environment variable `MC_GID_INDEX` is 0, which means that the Transfer Engine selects a GID that is most likely to be connected.
If the connection is hung, the user still needs to set the value of such a environment variable manually.
- The default value of the parameter corresponding to the environment variable `MC_GID_INDEX` is 0, which means that the Transfer Engine selects a GID that is most likely to be connected. Since this parameter depends on the specific network environment, the user has to set the value of the environment variable manually if the connection is hung. The environment variable `NCCL_IB_GID_INDEX` is equivalent to this function.
- `--mode=target` indicates the start of the target node. The target node does not initiate read/write requests; it passively supplies or writes data as required by the initiator node.
> Note: In actual applications, there is no need to distinguish between target nodes and initiator nodes; each node can freely initiate read/write requests to other nodes in the cluster.
- `--metadata_server` is the address of the metadata server (the full address of the etcd service).
> Change `--metadata_server` to `--metadata_server=http://10.0.0.1:8080/metadata` and add `--metadata_type=http` when using `http` as the `metadata` service.
- `--metadata_server` is the address of the metadata server. Its form is `[proto]://[hostname:port]`. For example, the following addresses are VALID:
- Use `etcd` as metadata storage: `"10.0.0.1:2379"`, `"etcd://10.0.0.1:2379"` or `"etcd://10.0.0.1:2379,10.0.0.2:2379"`
- Use `redis` as metadata storage: `"redis://10.0.0.1:6379"`
- Use `http` as metadata storage: `"http://10.0.0.1:8080/metadata"`
- `--local_server_name` represents the address of this machine, which does not need to be set in most cases. If this option is not set, the value is equivalent to the hostname of this machine (i.e., `hostname(2)`). Other nodes in the cluster will use this address to attempt out-of-band communication with this node to establish RDMA connections.
> Note: If out-of-band communication fails, the connection cannot be established. Therefore, if necessary, you need to modify the `/etc/hosts` file on all nodes in the cluster to locate the correct node through the hostname.
- `--device_name` indicates the name of the RDMA network card used in the transfer process.
Expand Down Expand Up @@ -168,11 +169,11 @@ Transfer Engine provides interfaces through the `TransferEngine` class (located

### Data Transfer

#### Transport::TransferRequest
#### TransferEngine::TransferRequest

The core API provided by Mooncake Transfer Engine is submitting a group of asynchronous `Transport::TransferRequest` tasks through the `Transport::submitTransfer` interface, and querying their status through the `Transport::getTransferStatus` interface. Each `Transport::TransferRequest` specifies reading or writing a continuous data space of `length` starting from the local starting address `source`, to the position starting at `target_offset` in the segment corresponding to `target_id`.
The core API provided by Mooncake Transfer Engine is submitting a group of asynchronous `TransferRequest` tasks through the `submitTransfer` interface, and querying their status through the `getTransferStatus` interface. Each `TransferRequest` specifies reading or writing a continuous data space of `length` starting from the local starting address `source`, to the position starting at `target_offset` in the segment corresponding to `target_id`.

The `Transport::TransferRequest` structure is defined as follows:
The `TransferRequest` structure is defined as follows:

```cpp
using SegmentID = int32_t;
Expand All @@ -194,7 +195,7 @@ struct TransferRequest
- NVMeOF space type, where each file corresponds to a segment. In this case, the segment name passed to the `openSegment` interface is equivalent to the unique identifier of the file. `target_offset` is the offset of the target file.
- `length` represents the amount of data transferred. TransferEngine may further split this into multiple read/write requests internally.
#### Transport::allocateBatchID
#### TransferEngine::allocateBatchID
```cpp
BatchID allocateBatchID(size_t batch_size);
Expand All @@ -205,7 +206,7 @@ Allocates a `BatchID`. A maximum of `batch_size` `TransferRequest`s can be submi
- `batch_size`: The maximum number of `TransferRequest`s that can be submitted under the same `BatchID`;
- Return value: If successful, returns `BatchID` (non-negative); otherwise, returns a negative value.
#### Transport::submitTransfer
#### TransferEngine::submitTransfer
```cpp
int submitTransfer(BatchID batch_id, const std::vector<TransferRequest> &entries);
Expand All @@ -217,7 +218,7 @@ Submits new `TransferRequest` tasks to `batch_id`. The task is asynchronously su
- `entries`: Array of `TransferRequest`;
- Return value: If successful, returns 0; otherwise, returns a negative value.
#### Transport::getTransferStatus
#### TransferEngine::getTransferStatus
```cpp
enum TaskStatus
Expand All @@ -244,7 +245,7 @@ Obtains the running status of the `TransferRequest` with `task_id` in `batch_id`
- `status`: Output Transfer status;
- Return value: If successful, returns 0; otherwise, returns a negative value.
#### Transport::freeBatchID
#### TransferEngine::freeBatchID
```cpp
int freeBatchID(BatchID batch_id);
Expand All @@ -258,9 +259,9 @@ Recycles `BatchID`, and subsequent operations on `submitTransfer` and `getTransf
### Multi-Transport Management
The `TransferEngine` class internally manages multiple backend `Transport` classes, and users can load or unload `Transport` for different backends in `TransferEngine`.
#### TransferEngine::installOrGetTransport
#### TransferEngine::installTransport
```cpp
Transport* installOrGetTransport(const std::string& proto, void** args);
Transport* installTransport(const std::string& proto, void** args);
```
Registers `Transport` in `TransferEngine`. If a `Transport` for a certain protocol already exists, it returns that `Transport`.
Expand All @@ -272,7 +273,7 @@ Registers `Transport` in `TransferEngine`. If a `Transport` for a certain protoc
##### TCP Transfer Mode
For TCP transfer mode, there is no need to pass `args` objects when registering the `Transport` object.
```cpp
engine->installOrGetTransport("tcp", nullptr);
engine->installTransport("tcp", nullptr);
```
##### RDMA Transfer Mode
Expand All @@ -281,7 +282,7 @@ For RDMA transfer mode, the network card priority marrix must be specified throu
void** args = (void**) malloc(2 * sizeof(void*));
args[0] = /* topology matrix */;
args[1] = nullptr;
engine->installOrGetTransport("rdma", args);
engine->installTransport("rdma", args);
```
The network card priority marrix is a JSON string indicating the storage medium name and the list of network cards to be used preferentially, as shown in the example below:
```json
Expand All @@ -302,7 +303,7 @@ For NVMeOF transfer mode, the file path must be specified through `args` during
void** args = (void**) malloc(2 * sizeof(void*));
args[0] = /* topology matrix */;
args[1] = nullptr;
engine->installOrGetTransport("nvmeof", args);
engine->installTransport("nvmeof", args);
```
#### TransferEngine::uninstallTransport
Expand All @@ -328,7 +329,7 @@ Registers a space starting at address `addr` with a length of `size` on the loca
- `addr`: The starting address of the registration space;
- `size`: The length of the registration space;
- `location`: The `device` corresponding to this memory segment, such as `cuda:0` indicating the GPU device, `cpu:0` indicating the CPU socket, by matching with the network card priority order table (see `installOrGetTransport`), the preferred network card is identified.
- `location`: The `device` corresponding to this memory segment, such as `cuda:0` indicating the GPU device, `cpu:0` indicating the CPU socket, by matching with the network card priority order table (see `installTransport`), the preferred network card is identified.
- `remote_accessible`: Indicates whether this memory can be accessed by remote nodes.
- Return value: If successful, returns 0; otherwise, returns a negative value.
Expand Down Expand Up @@ -442,20 +443,23 @@ For specific implementation, refer to the demo service implemented in Golang at
### Initialization
TransferEngine needs to initializing by calling the `init` method before further actions:
```cpp
TransferEngine(std::unique_ptr<TransferMetadata> metadata_client);
TransferMetadata(const std::string &metadata_server, const std::string &protocol = "etcd");
```
- Pointer to a `TransferMetadata` object, which abstracts the communication logic between the TransferEngine framework and the metadata server. We currently support `etcd`, `redis` and `http` protocols, while `metadata_server` represents the IP address or hostname of the etcd or redis server, or the base HTTP URI of http server.
TransferEngine();
For easy exception handling, TransferEngine needs to call the init function for secondary construction after construction:
```cpp
int init(std::string& server_name, std::string& connectable_name, uint64_t rpc_port = 12345);
int init(const std::string &metadata_conn_string,
const std::string &local_server_name,
const std::string &ip_or_host_name,
uint64_t rpc_port = 12345);
```
- `server_name`: The local server name, ensuring uniqueness within the cluster. It also serves as the name of the RAM Segment that other nodes refer to the current instance (i.e., Segment Name).
- `connectable_name`: The name used for other clients to connect, which can be a hostname or IP address.
- `metadata_conn_string`: Connecting string of metadata storage servers, i.e., the IP address/hostname of `etcd`/`redis` or the URI of the http service.
The general form is `[proto]://[hostname:port]`. For example, the following metadata server addresses are legal:
- Using `etcd` as a metadata storage service: `“10.0.0.1:2379”` or `“etcd://10.0.0.1:2379”`.
- Using `redis` as a metadata storage service: `“redis://10.0.0.1:6379”`
- Using `http` as a metadata storage service: `“http://10.0.0.1:8080/metadata”`
- `local_server_name`: The local server name, ensuring uniqueness within the cluster. It also serves as the name of the RAM Segment that other nodes refer to the current instance (i.e., Segment Name).
- `ip_or_host_name`: The name used for other clients to connect, which can be a hostname or IP address.
- `rpc_port`: The rpc port used for interaction with other clients.
- Return value: If successful, returns 0; if TransferEngine has already been init, returns -1.
Expand Down
2 changes: 1 addition & 1 deletion doc/zh/p2p-store.md
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ func NewP2PStore(metadataUri string, localSegmentName string, nicPriorityMatrix
创建 P2PStore 实例,该实例内部会启动一个 Transfer Engine 服务。
- `metadataUri`:元数据服务器/etcd服务所在主机名或 IP 地址。
- `localSegmentName`:本地的服务器名称(主机名/IP地址:端口号),保证在集群内唯一。
- `nicPriorityMatrix`:网卡优先级顺序表,参见位于 Transfer Engine API 文档的相关描述(`TransferEngine::installOrGetTransport`)。
- `nicPriorityMatrix`:网卡优先级顺序表,参见位于 Transfer Engine API 文档的相关描述(`TransferEngine::installTransport`)。
- 返回值:若成功则返回 `P2PStore` 实例指针,否则返回 `error`。

```go
Expand Down
Loading

0 comments on commit 64ddda4

Please sign in to comment.