PYTHON: new remote storage example #841
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,110 @@ | ||
# NIXL Storage Transfer Sample | ||
|
||
A high-performance storage transfer system built on NIXL (NVIDIA Inference Xfer Library) that demonstrates local and remote storage operations using POSIX and GDS (GPU Direct Storage) backends. | ||
|
||
## Features | ||
|
||
- **Flexible Storage Backends** | ||
  - GDS (GPU Direct Storage) support for high-performance transfers | ||
  - POSIX fallback for standard storage operations | ||
  - Automatic backend selection based on availability | ||
|
||
- **Transfer Modes** | ||
  - Local memory-to-storage transfers | ||
  - Remote memory-to-storage transfers | ||
  - Bidirectional operations (READ/WRITE) | ||
  - Batch processing support | ||
|
||
- **Network Communication** | ||
  - UCX-based data transfer | ||
  - Metadata exchange between nodes | ||
  - Asynchronous notification system | ||
|
||
## Project Structure | ||
|
||
- `nixl_storage_utils/` - Core module providing storage utilities | ||
  - `__init__.py` - Module initialization and exports | ||
  - `common.py` - Common utilities and shared functionality | ||
- `nixl_p2p_storage_example.py` - Main application that can run as either client (initiator) or server (target) | ||
|
||
## Requirements | ||
|
||
- Python 3.6+ | ||
- NIXL library with the following plugins: | ||
  - GDS (optional) | ||
  - POSIX | ||
  - UCX | ||
|
||
## Usage | ||
|
||
The system operates in two modes: client and server. | ||
|
||
Each server waits for requests from clients, then READs from or WRITEs to its own storage on behalf of the remote client. | ||
|
||
The client initiates transfers and can perform both local storage operations and remote operations against storage servers. | ||
|
||
### Running as Client | ||
|
||
```bash | ||
python nixl_p2p_storage_example.py --role client \ | ||
    --agents_file <file path> \ | ||
    --fileprefix <path_prefix> \ | ||
    --name <name> \ | ||
    [--buf_size <size>] \ | ||
    [--batch_size <count>] | ||
``` | ||
|
||
Role specifies client or server. The agents file is a list of storage servers you want the client to connect to. | ||
|
||
The agents file should have agents separated by line, with "<agent name> <ip address> <port>" on each line. | ||
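
As a rough illustration of the agents file format described above, the following sketch parses `<agent name> <ip address> <port>` lines into structured entries. The names `AgentEntry` and `parse_agents_text` are hypothetical and not part of this PR. Unlike the sample script, which exits on any line that does not have three fields, this sketch assumes blank lines can be skipped.

```python
from typing import List, NamedTuple


class AgentEntry(NamedTuple):
    """One storage server listed in the agents file (hypothetical helper type)."""

    name: str
    ip: str
    port: int


def parse_agents_text(text: str) -> List[AgentEntry]:
    """Parse '<agent name> <ip address> <port>' lines into AgentEntry tuples."""
    entries = []
    for lineno, line in enumerate(text.splitlines(), start=1):
        parts = line.split()
        if not parts:
            # Assumption: blank lines are skipped rather than treated as errors.
            continue
        if len(parts) != 3:
            raise ValueError(f"line {lineno}: expected 3 fields, got {len(parts)}")
        name, ip, port = parts
        entries.append(AgentEntry(name, ip, int(port)))
    return entries


if __name__ == "__main__":
    # Same content as the one-line agents file included in this PR.
    print(parse_agents_text("target1 127.0.0.1 8888\n"))
```

Parsing into a typed tuple before connecting makes it easier to report a malformed line with its line number instead of failing midway through the metadata exchange.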
|
||
File prefix lets you specify a path to run local storage transfers on. | ||
|
||
Agent name (`--name`) is the name you want to give the NIXL agent on this client. | ||
|
||
You can optionally specify buf_size and batch_size to change how much data is transferred. | ||
|
||
### Running as Server | ||
|
||
```bash | ||
python nixl_p2p_storage_example.py --role server \ | ||
    --fileprefix <path_prefix> \ | ||
    --name <name> \ | ||
    [--port <port>] \ | ||
    [--buf_size <size>] \ | ||
    [--batch_size <count>] | ||
|
||
Parameters are the same as for the client, but the agent name and port must match the entry for this server in the client's agents file. | ||
|
||
Additionally, buf_size and batch_size must match what the client specifies. | ||
|
||
## Architecture | ||
|
||
 | ||
|
||
### Storage Module | ||
|
||
The `nixl_storage_utils` module provides core functionality: | ||
- Agent creation and plugin management | ||
- Memory and file resource handling | ||
- Transfer state monitoring | ||
- Common utilities and configuration | ||
|
||
### Storage Backends | ||
|
||
The system automatically selects the best available storage backend: | ||
1. Prefers GDS when available for high-performance GPU-direct storage operations | ||
2. Falls back to POSIX when GDS is unavailable | ||
3. Requires at least one storage backend to operate | ||
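
The preference order above can be sketched as a small pure-Python function. This is only an illustration of the selection rule, not the code in `common.py`: the function name `select_storage_backend` and the idea of passing in a list of available plugin names are assumptions made for the example.

```python
from typing import Iterable

# Preference order taken from the list above: GDS first, then POSIX.
PREFERRED_STORAGE_BACKENDS = ("GDS", "POSIX")


def select_storage_backend(available_plugins: Iterable[str]) -> str:
    """Return the preferred storage backend among the available plugin names."""
    available = set(available_plugins)
    for backend in PREFERRED_STORAGE_BACKENDS:
        if backend in available:
            return backend
    # Neither GDS nor POSIX is present, so storage transfers cannot run.
    raise RuntimeError("No storage backend available (need GDS or POSIX)")


if __name__ == "__main__":
    # UCX handles the network path and is not a storage backend.
    print(select_storage_backend(["UCX", "POSIX"]))
```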
|
||
### Transfer Process | ||
|
||
#### Local Transfers | ||
1. Register memory and file descriptors | ||
2. Perform direct memory-to-storage transfers | ||
3. Support both read and write operations | ||
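
A minimal sketch of the transfer handle lifecycle used for a local transfer, written against a stand-in agent so it runs without NIXL installed. The `initialize_xfer`, `transfer`, and `release_xfer_handle` calls mirror the ones in `execute_transfer` in this PR. The `check_xfer_state` polling call, the `"DONE"`/`"PROC"`/`"ERR"` state strings, the timeout, and the `FakeAgent` class are assumptions made for illustration, since the body of `wait_for_transfer` is not shown here.

```python
import time


class FakeAgent:
    """Stand-in for a NIXL agent (hypothetical); completes after a few polls."""

    def __init__(self, polls_until_done=2):
        self._polls_left = polls_until_done
        self.released = False

    def initialize_xfer(self, operation, local_descs, remote_descs, remote_name):
        return {"op": operation, "remote": remote_name}

    def transfer(self, handle):
        return "PROC"  # assumed state string: transfer posted, still in progress

    def check_xfer_state(self, handle):
        self._polls_left -= 1
        return "DONE" if self._polls_left <= 0 else "PROC"

    def release_xfer_handle(self, handle):
        self.released = True


def execute_transfer(agent, local_descs, remote_descs, remote_name, operation,
                     timeout_s=5.0, poll_interval_s=0.001):
    """Post one transfer, poll until it finishes, and always release the handle."""
    handle = agent.initialize_xfer(operation, local_descs, remote_descs, remote_name)
    try:
        state = agent.transfer(handle)
        deadline = time.monotonic() + timeout_s
        while state != "DONE":
            if state == "ERR":
                raise RuntimeError(f"{operation} transfer failed")
            if time.monotonic() > deadline:
                raise TimeoutError(f"{operation} transfer timed out")
            time.sleep(poll_interval_s)
            state = agent.check_xfer_state(handle)
    finally:
        # Release the handle even if the transfer failed or timed out.
        agent.release_xfer_handle(handle)


if __name__ == "__main__":
    agent = FakeAgent()
    execute_transfer(agent, [], [], "self", "WRITE")
    print("handle released:", agent.released)
```

Wrapping the wait in `try`/`finally` is a design choice of this sketch: it keeps a failed transfer from leaking its handle.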
|
||
#### Remote Transfers | ||
1. Initiator sends memory descriptors to target | ||
2. Target performs storage-to-memory or memory-to-storage operations | ||
3. Data is transferred between initiator and target memory |
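
The request that the initiator sends in step 1 is a notification whose first four bytes are an operation code (`b"READ"` or `b"WRTE"`, as in the sample script), followed by the serialized memory descriptors. The sketch below shows that framing in isolation. The helper names `encode_request` and `decode_request` are hypothetical, and the payload is treated as opaque bytes rather than a real NIXL descriptor list.

```python
from typing import Tuple

OPCODE_LEN = 4
# "WRITE" is shortened to "WRTE" so that every opcode is exactly four bytes.
OPCODES = {"READ": b"READ", "WRITE": b"WRTE"}
OPERATIONS = {code: name for name, code in OPCODES.items()}


def encode_request(operation: str, serialized_descs: bytes) -> bytes:
    """Prefix the serialized descriptors with the 4-byte operation code."""
    if operation not in OPCODES:
        raise ValueError(f"invalid operation: {operation!r}")
    return OPCODES[operation] + serialized_descs


def decode_request(message: bytes) -> Tuple[str, bytes]:
    """Split a request into (operation name, serialized descriptors)."""
    opcode, payload = message[:OPCODE_LEN], message[OPCODE_LEN:]
    if opcode not in OPERATIONS:
        raise ValueError(f"invalid opcode: {opcode!r}")
    return OPERATIONS[opcode], payload


if __name__ == "__main__":
    msg = encode_request("WRITE", b"<serialized descs>")
    print(decode_request(msg))
```

A fixed-width opcode lets the target slice the message at a known offset without a separator or length field.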
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1 @@ | ||
target1 127.0.0.1 8888 |
The Picture misses UCX plugin here to show the network interaction. A little too wordy and feels the point of converged storage with NIXL is getting lost in the picture. Can we simplify by doing the following
updated
@tstamler one more change can we use the same arrow type for storage data transfer too?
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,244 @@ | ||
# SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved. | ||
# SPDX-License-Identifier: Apache-2.0 | ||
# | ||
# Licensed under the Apache License, Version 2.0 (the "License"); | ||
# you may not use this file except in compliance with the License. | ||
# You may obtain a copy of the License at | ||
# | ||
# http://www.apache.org/licenses/LICENSE-2.0 | ||
# | ||
# Unless required by applicable law or agreed to in writing, software | ||
# distributed under the License is distributed on an "AS IS" BASIS, | ||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
# See the License for the specific language governing permissions and | ||
# limitations under the License. | ||
|
||
""" | ||
NIXL Peer-to-Peer Storage Example | ||
Demonstrates peer-to-peer storage transfers using NIXL with initiator and target modes. | ||
""" | ||
|
||
import time | ||
|
||
import nixl_storage_utils as nsu | ||
|
||
from nixl.logging import get_logger | ||
|
||
logger = get_logger(__name__) | ||
|
||
|
||
def execute_transfer(my_agent, local_descs, remote_descs, remote_name, operation): | ||
    handle = my_agent.initialize_xfer(operation, local_descs, remote_descs, remote_name) | ||
    my_agent.transfer(handle) | ||
    nsu.wait_for_transfer(my_agent, handle) | ||
    my_agent.release_xfer_handle(handle) | ||
|
||
|
||
def remote_storage_transfer(my_agent, my_mem_descs, operation, remote_agent_name): | ||
    """Initiate remote memory transfer.""" | ||
    if operation != "READ" and operation != "WRITE": | ||
        logger.error("Invalid operation, exiting") | ||
        exit(-1) | ||
|
||
    if operation == "WRITE": | ||
        operation = b"WRTE" | ||
    else: | ||
        operation = b"READ" | ||
|
||
    # Send the descriptors that you want to read into or write from | ||
    logger.info(f"Sending {operation} request to {remote_agent_name}") | ||
    test_descs_str = my_agent.get_serialized_descs(my_mem_descs) | ||
    my_agent.send_notif(remote_agent_name, operation + test_descs_str) | ||
|
||
    while not my_agent.check_remote_xfer_done(remote_agent_name, b"COMPLETE"): | ||
        continue | ||
|
||
|
||
def connect_to_agents(my_agent, agents_file): | ||
    target_agents = [] | ||
    with open(agents_file, "r") as f: | ||
        for line in f: | ||
            # Each line in file should be: "<agent_name> <ip> <port>" | ||
            parts = line.strip().split() | ||
            if len(parts) == 3: | ||
                target_agents.append(parts[0]) | ||
                my_agent.send_local_metadata(parts[1], int(parts[2])) | ||
                my_agent.fetch_remote_metadata(parts[0], parts[1], int(parts[2])) | ||
|
||
                while my_agent.check_remote_metadata(parts[0]) is False: | ||
                    logger.info(f"Waiting for remote metadata for {parts[0]}...") | ||
                    time.sleep(0.2) | ||
|
||
                logger.info(f"Remote metadata for {parts[0]} fetched") | ||
            else: | ||
                logger.error(f"Invalid line in {agents_file}: {line}") | ||
                exit(-1) | ||
|
||
    logger.info("All remote metadata fetched") | ||
|
||
    return target_agents | ||
|
||
|
||
def handle_remote_transfer_request(my_agent, my_mem_descs, my_file_descs): | ||
    """Handle remote memory and storage transfers as target.""" | ||
    # Wait for initiator to send list of memory descriptors | ||
    notifs = my_agent.get_new_notifs() | ||
|
||
    logger.info("Waiting for a remote transfer request...") | ||
|
||
    while len(notifs) == 0: | ||
        notifs = my_agent.get_new_notifs() | ||
|
||
    for req_agent in notifs: | ||
        recv_msg = notifs[req_agent][0] | ||
|
||
        operation = None | ||
        if recv_msg[:4] == b"READ": | ||
            operation = "READ" | ||
        elif recv_msg[:4] == b"WRTE": | ||
            operation = "WRITE" | ||
        else: | ||
            logger.error("Invalid operation, exiting") | ||
            exit(-1) | ||
|
||
        sent_descs = my_agent.deserialize_descs(recv_msg[4:]) | ||
|
||
        logger.info("Checking to ensure metadata is loaded...") | ||
        while my_agent.check_remote_metadata(req_agent, sent_descs) is False: | ||
            continue | ||
|
||
        if operation == "READ": | ||
            logger.info("Starting READ operation") | ||
|
||
            # Read from file first | ||
            execute_transfer( | ||
                my_agent, my_mem_descs, my_file_descs, my_agent.name, "READ" | ||
            ) | ||
            # Send to client | ||
            execute_transfer(my_agent, my_mem_descs, sent_descs, req_agent, "WRITE") | ||
|
||
        elif operation == "WRITE": | ||
            logger.info("Starting WRITE operation") | ||
|
||
            # Read from client first | ||
            execute_transfer(my_agent, my_mem_descs, sent_descs, req_agent, "READ") | ||
            # Write to storage | ||
            execute_transfer( | ||
                my_agent, my_mem_descs, my_file_descs, my_agent.name, "WRITE" | ||
            ) | ||
|
||
        # Send completion notification to initiator | ||
        my_agent.send_notif(req_agent, b"COMPLETE") | ||
|
||
    logger.info("One transfer test complete.") | ||
|
||
|
||
def run_client(my_agent, nixl_mem_reg_descs, nixl_file_reg_descs, agents_file): | ||
    logger.info("Client initialized, ready for local transfer test...") | ||
|
||
    # For sample purposes, write to and then read from local storage | ||
    logger.info("Starting local transfer test...") | ||
    execute_transfer( | ||
        my_agent, | ||
        nixl_mem_reg_descs.trim(), | ||
        nixl_file_reg_descs.trim(), | ||
        my_agent.name, | ||
        "WRITE", | ||
    ) | ||
    execute_transfer( | ||
        my_agent, | ||
        nixl_mem_reg_descs.trim(), | ||
        nixl_file_reg_descs.trim(), | ||
        my_agent.name, | ||
        "READ", | ||
    ) | ||
    logger.info("Local transfer test complete") | ||
|
||
    logger.info("Starting remote transfer test...") | ||
|
||
    target_agents = connect_to_agents(my_agent, agents_file) | ||
|
||
    # For sample purposes, write to and then read from each target agent | ||
    for target_agent in target_agents: | ||
        remote_storage_transfer( | ||
            my_agent, nixl_mem_reg_descs.trim(), "WRITE", target_agent | ||
        ) | ||
        remote_storage_transfer( | ||
            my_agent, nixl_mem_reg_descs.trim(), "READ", target_agent | ||
        ) | ||
|
||
    logger.info("Remote transfer test complete") | ||
|
||
|
||
def run_storage_server(my_agent, nixl_mem_reg_descs, nixl_file_reg_descs): | ||
    logger.info("Server initialized, ready for remote transfer test...") | ||
    while True: | ||
        handle_remote_transfer_request( | ||
            my_agent, nixl_mem_reg_descs.trim(), nixl_file_reg_descs.trim() | ||
        ) | ||
|
||
|
||
if __name__ == "__main__": | ||
    parser = nsu.get_base_parser() | ||
    parser.add_argument( | ||
        "--role", | ||
        type=str, | ||
        choices=["server", "client"], | ||
        required=True, | ||
        help="Role of this node (server or client)", | ||
    ) | ||
    parser.add_argument( | ||
        "--port", | ||
        type=int, | ||
        default=5555, | ||
        help="Port to listen on for remote transfers (only needed for server)", | ||
    ) | ||
    parser.add_argument("--name", type=str, help="NIXL agent name") | ||
    parser.add_argument( | ||
        "--agents_file", | ||
        type=str, | ||
        help="File containing list of target agents (only needed for client)", | ||
    ) | ||
    args = parser.parse_args() | ||
|
||
    my_agent = nsu.create_agent_with_plugins(args.name, args.port) | ||
|
||
    ( | ||
        my_mem_list, | ||
        my_file_list, | ||
        nixl_mem_reg_descs, | ||
        nixl_file_reg_descs, | ||
    ) = nsu.setup_memory_and_files( | ||
        my_agent, args.batch_size, args.buf_size, args.fileprefix | ||
    ) | ||
|
||
    if args.role == "client": | ||
        if not args.agents_file: | ||
            parser.error("--agents_file is required when role is client") | ||
        try: | ||
            run_client( | ||
                my_agent, nixl_mem_reg_descs, nixl_file_reg_descs, args.agents_file | ||
            ) | ||
        finally: | ||
            nsu.cleanup_resources( | ||
                my_agent, | ||
                nixl_mem_reg_descs, | ||
                nixl_file_reg_descs, | ||
                my_mem_list, | ||
                my_file_list, | ||
            ) | ||
    else: | ||
        if args.agents_file: | ||
            logger.warning("Warning: --agents_file is ignored when role is server") | ||
        try: | ||
            run_storage_server(my_agent, nixl_mem_reg_descs, nixl_file_reg_descs) | ||
        finally: | ||
            nsu.cleanup_resources( | ||
                my_agent, | ||
                nixl_mem_reg_descs, | ||
                nixl_file_reg_descs, | ||
                my_mem_list, | ||
                my_file_list, | ||
            ) | ||
|
||
    logger.info("Test Complete.") |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,35 @@ | ||
# SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved. | ||
# SPDX-License-Identifier: Apache-2.0 | ||
# | ||
# Licensed under the Apache License, Version 2.0 (the "License"); | ||
# you may not use this file except in compliance with the License. | ||
# You may obtain a copy of the License at | ||
# | ||
# http://www.apache.org/licenses/LICENSE-2.0 | ||
# | ||
# Unless required by applicable law or agreed to in writing, software | ||
# distributed under the License is distributed on an "AS IS" BASIS, | ||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
# See the License for the specific language governing permissions and | ||
# limitations under the License. | ||
|
||
""" | ||
NIXL Storage Utilities Module | ||
Provides utilities for high-performance storage transfers using NIXL. | ||
""" | ||
|
||
from .common import ( | ||
    cleanup_resources, | ||
    create_agent_with_plugins, | ||
    get_base_parser, | ||
    setup_memory_and_files, | ||
    wait_for_transfer, | ||
) | ||
|
||
__all__ = [ | ||
    "create_agent_with_plugins", | ||
    "setup_memory_and_files", | ||
    "cleanup_resources", | ||
    "get_base_parser", | ||
    "wait_for_transfer", | ||
] |
Add a line about how to install packages like gds installation/pip install nixl and provide links here
This is part of the NIXL repo now, so these instructions are already in the base README