diff --git a/examples/python/remote_storage_example/README.md b/examples/python/remote_storage_example/README.md new file mode 100644 index 000000000..cc5e39ee9 --- /dev/null +++ b/examples/python/remote_storage_example/README.md @@ -0,0 +1,110 @@ +# NIXL Storage Transfer Sample + +A high-performance storage transfer system built on NIXL (NVIDIA Inference Xfer Library) that demonstrates local and remote storage operations using POSIX and GDS (GPU Direct Storage) backends. + +## Features + +- **Flexible Storage Backends** + - GDS (GPU Direct Storage) support for high-performance transfers + - POSIX fallback for standard storage operations + - Automatic backend selection based on availability + +- **Transfer Modes** + - Local memory-to-storage transfers + - Remote memory-to-storage transfers + - Bidirectional operations (READ/WRITE) + - Batch processing support + +- **Network Communication** + - UCX-based data transfer + - Metadata exchange between nodes + - Asynchronous notification system + +## Project Structure + +- `nixl_storage_utils/` - Core module providing storage utilities + - `__init__.py` - Module initialization and exports + - `common.py` - Common utilities and shared functionality +- `nixl_p2p_storage_example.py` - Main application that can run as either initiator or target + +## Requirements + +- Python 3.6+ +- NIXL library with the following plugins: + - GDS (optional) + - POSIX + - UCX + +## Usage + +The system operates in two modes: client and server. + +The servers wait for requests from clients so it can READ/WRITE from its storage to a remote node. + +The initiator initiates transfers and can perform both local and remote operations with storage servers. + +### Running as Client + +```bash +python nixl_p2p_storage_example.py --role client \ + --agents_file \ + --fileprefix \ + --agent_name \ + [--buf_size ] \ + [--batch_size ] +``` + +Role specifies client or server. The agents file is a list of storage servers you want the client to connect to. + +The agents file should have agents separated by line, with " " on each line. + +File prefix lets you specify a path to run local storage transfers on. + +Agent name is the name you want to give the NIXL agent on this client. + +You can optionally specifically buf_size and batch_size to change how much data is transferred. + +### Running as Server + +```bash +python nixl_p2p_storage_example.py --role server \ + --fileprefix \ + --agent_name \ + [--buf_size ] \ + [--batch_size ] +``` + +Parameters are same as before, but names must match what is in the client agents file. + +Additionally, buf_size and batch_size must match what the client specifies. + +## Architecture + +![Client/Server Interaction](client_server_diagram.png) + +### Storage Module + +The `nixl_storage_utils` module provides core functionality: +- Agent creation and plugin management +- Memory and file resource handling +- Transfer state monitoring +- Common utilities and configuration + +### Storage Backends + +The system automatically selects the best available storage backend: +1. Prefers GDS when available for high-performance GPU-direct storage operations +2. Falls back to POSIX when GDS is unavailable +3. Requires at least one storage backend to operate + +### Transfer Process + +#### Local Transfers +1. Register memory and file descriptors +2. Perform direct memory-to-storage transfers +3. Support both read and write operations + +#### Remote Transfers +1. Initiator sends memory descriptors to target +2. Target performs storage-to-memory or memory-to-storage operations +3. Data is transferred between initiator and target memory diff --git a/examples/python/remote_storage_example/agent_file.in b/examples/python/remote_storage_example/agent_file.in new file mode 100644 index 000000000..45c91d2e1 --- /dev/null +++ b/examples/python/remote_storage_example/agent_file.in @@ -0,0 +1 @@ +target1 127.0.0.1 8888 diff --git a/examples/python/remote_storage_example/client_server_diagram.png b/examples/python/remote_storage_example/client_server_diagram.png new file mode 100644 index 000000000..651995d89 Binary files /dev/null and b/examples/python/remote_storage_example/client_server_diagram.png differ diff --git a/examples/python/remote_storage_example/nixl_p2p_storage_example.py b/examples/python/remote_storage_example/nixl_p2p_storage_example.py new file mode 100644 index 000000000..1a63648bf --- /dev/null +++ b/examples/python/remote_storage_example/nixl_p2p_storage_example.py @@ -0,0 +1,244 @@ +# SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved. +# SPDX-License-Identifier: Apache-2.0 +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the for the specific language governing permissions and +# limitations under the License. + +""" +NIXL Peer-to-Peer Storage Example +Demonstrates peer-to-peer storage transfers using NIXL with initiator and target modes. +""" + +import time + +import nixl_storage_utils as nsu + +from nixl.logging import get_logger + +logger = get_logger(__name__) + + +def execute_transfer(my_agent, local_descs, remote_descs, remote_name, operation): + handle = my_agent.initialize_xfer(operation, local_descs, remote_descs, remote_name) + my_agent.transfer(handle) + nsu.wait_for_transfer(my_agent, handle) + my_agent.release_xfer_handle(handle) + + +def remote_storage_transfer(my_agent, my_mem_descs, operation, remote_agent_name): + """Initiate remote memory transfer.""" + if operation != "READ" and operation != "WRITE": + logger.error("Invalid operation, exiting") + exit(-1) + + if operation == "WRITE": + operation = b"WRTE" + else: + operation = b"READ" + + # Send the descriptors that you want to read into or write from + logger.info(f"Sending {operation} request to {remote_agent_name}") + test_descs_str = my_agent.get_serialized_descs(my_mem_descs) + my_agent.send_notif(remote_agent_name, operation + test_descs_str) + + while not my_agent.check_remote_xfer_done(remote_agent_name, b"COMPLETE"): + continue + + +def connect_to_agents(my_agent, agents_file): + target_agents = [] + with open(agents_file, "r") as f: + for line in f: + # Each line in file should be: " " + parts = line.strip().split() + if len(parts) == 3: + target_agents.append(parts[0]) + my_agent.send_local_metadata(parts[1], int(parts[2])) + my_agent.fetch_remote_metadata(parts[0], parts[1], int(parts[2])) + + while my_agent.check_remote_metadata(parts[0]) is False: + logger.info(f"Waiting for remote metadata for {parts[0]}...") + time.sleep(0.2) + + logger.info(f"Remote metadata for {parts[0]} fetched") + else: + logger.error(f"Invalid line in {agents_file}: {line}") + exit(-1) + + logger.info("All remote metadata fetched") + + return target_agents + + +def handle_remote_transfer_request(my_agent, my_mem_descs, my_file_descs): + """Handle remote memory and storage transfers as target.""" + # Wait for initiator to send list of memory descriptors + notifs = my_agent.get_new_notifs() + + logger.info("Waiting for a remote transfer request...") + + while len(notifs) == 0: + notifs = my_agent.get_new_notifs() + + for req_agent in notifs: + recv_msg = notifs[req_agent][0] + + operation = None + if recv_msg[:4] == b"READ": + operation = "READ" + elif recv_msg[:4] == b"WRTE": + operation = "WRITE" + else: + logger.error("Invalid operation, exiting") + exit(-1) + + sent_descs = my_agent.deserialize_descs(recv_msg[4:]) + + logger.info("Checking to ensure metadata is loaded...") + while my_agent.check_remote_metadata(req_agent, sent_descs) is False: + continue + + if operation == "READ": + logger.info("Starting READ operation") + + # Read from file first + execute_transfer( + my_agent, my_mem_descs, my_file_descs, my_agent.name, "READ" + ) + # Send to client + execute_transfer(my_agent, my_mem_descs, sent_descs, req_agent, "WRITE") + + elif operation == "WRITE": + logger.info("Starting WRITE operation") + + # Read from client first + execute_transfer(my_agent, my_mem_descs, sent_descs, req_agent, "READ") + # Write to storage + execute_transfer( + my_agent, my_mem_descs, my_file_descs, my_agent.name, "WRITE" + ) + + # Send completion notification to initiator + my_agent.send_notif(req_agent, b"COMPLETE") + + logger.info("One transfer test complete.") + + +def run_client(my_agent, nixl_mem_reg_descs, nixl_file_reg_descs, agents_file): + logger.info("Client initialized, ready for local transfer test...") + + # For sample purposes, write to and then read from local storage + logger.info("Starting local transfer test...") + execute_transfer( + my_agent, + nixl_mem_reg_descs.trim(), + nixl_file_reg_descs.trim(), + my_agent.name, + "WRITE", + ) + execute_transfer( + my_agent, + nixl_mem_reg_descs.trim(), + nixl_file_reg_descs.trim(), + my_agent.name, + "READ", + ) + logger.info("Local transfer test complete") + + logger.info("Starting remote transfer test...") + + target_agents = connect_to_agents(my_agent, agents_file) + + # For sample purposes, write to and then read from each target agent + for target_agent in target_agents: + remote_storage_transfer( + my_agent, nixl_mem_reg_descs.trim(), "WRITE", target_agent + ) + remote_storage_transfer( + my_agent, nixl_mem_reg_descs.trim(), "READ", target_agent + ) + + logger.info("Remote transfer test complete") + + +def run_storage_server(my_agent, nixl_mem_reg_descs, nixl_file_reg_descs): + logger.info("Server initialized, ready for remote transfer test...") + while True: + handle_remote_transfer_request( + my_agent, nixl_mem_reg_descs.trim(), nixl_file_reg_descs.trim() + ) + + +if __name__ == "__main__": + parser = nsu.get_base_parser() + parser.add_argument( + "--role", + type=str, + choices=["server", "client"], + required=True, + help="Role of this node (server or client)", + ) + parser.add_argument( + "--port", + type=int, + default=5555, + help="Port to listen on for remote transfers (only needed for server)", + ) + parser.add_argument("--name", type=str, help="NIXL agent name") + parser.add_argument( + "--agents_file", + type=str, + help="File containing list of target agents (only needed for client)", + ) + args = parser.parse_args() + + my_agent = nsu.create_agent_with_plugins(args.name, args.port) + + ( + my_mem_list, + my_file_list, + nixl_mem_reg_descs, + nixl_file_reg_descs, + ) = nsu.setup_memory_and_files( + my_agent, args.batch_size, args.buf_size, args.fileprefix + ) + + if args.role == "client": + if not args.agents_file: + parser.error("--agents_file is required when role is client") + try: + run_client( + my_agent, nixl_mem_reg_descs, nixl_file_reg_descs, args.agents_file + ) + finally: + nsu.cleanup_resources( + my_agent, + nixl_mem_reg_descs, + nixl_file_reg_descs, + my_mem_list, + my_file_list, + ) + else: + if args.agents_file: + logger.warning("Warning: --agents_file is ignored when role is server") + try: + run_storage_server(my_agent, nixl_mem_reg_descs, nixl_file_reg_descs) + finally: + nsu.cleanup_resources( + my_agent, + nixl_mem_reg_descs, + nixl_file_reg_descs, + my_mem_list, + my_file_list, + ) + + logger.info("Test Complete.") diff --git a/examples/python/remote_storage_example/nixl_storage_utils/__init__.py b/examples/python/remote_storage_example/nixl_storage_utils/__init__.py new file mode 100644 index 000000000..04088c2e2 --- /dev/null +++ b/examples/python/remote_storage_example/nixl_storage_utils/__init__.py @@ -0,0 +1,35 @@ +# SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved. +# SPDX-License-Identifier: Apache-2.0 +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +""" +NIXL Storage Utilities Module +Provides utilities for high-performance storage transfers using NIXL. +""" + +from .common import ( + cleanup_resources, + create_agent_with_plugins, + get_base_parser, + setup_memory_and_files, + wait_for_transfer, +) + +__all__ = [ + "create_agent_with_plugins", + "setup_memory_and_files", + "cleanup_resources", + "get_base_parser", + "wait_for_transfer", +] diff --git a/examples/python/remote_storage_example/nixl_storage_utils/common.py b/examples/python/remote_storage_example/nixl_storage_utils/common.py new file mode 100644 index 000000000..5d25f8acd --- /dev/null +++ b/examples/python/remote_storage_example/nixl_storage_utils/common.py @@ -0,0 +1,122 @@ +# SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved. +# SPDX-License-Identifier: Apache-2.0 +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +""" +Common utilities for NIXL storage operations. +Provides core functionality for memory management and storage operations. +""" + +import argparse +import os + +import nixl._utils as nixl_utils +from nixl._api import nixl_agent, nixl_agent_config +from nixl._bindings import DRAM_SEG +from nixl.logging import get_logger + +logger = get_logger(__name__) + + +def create_agent_with_plugins(agent_name, port): + """Create a NIXL agent with required plugins.""" + agent_config = nixl_agent_config(True, True, port, backends=[]) + new_nixl_agent = nixl_agent(agent_name, agent_config) + + plugin_list = new_nixl_agent.get_plugin_list() + + if "GDS" in plugin_list: + new_nixl_agent.create_backend("GDS") + logger.info("Using GDS storage backend") + if "POSIX" in plugin_list: + new_nixl_agent.create_backend("POSIX") + logger.info("Using POSIX storage backend") + + if "GDS" not in plugin_list and "POSIX" not in plugin_list: + logger.error("No storage backends available, exiting") + exit(-1) + + if "UCX" not in plugin_list: + logger.error("UCX not available for transfer, exiting") + exit(-1) + else: + new_nixl_agent.create_backend("UCX") + + logger.info("Initialized backends") + return new_nixl_agent + + +def setup_memory_and_files(agent, batch_size, buf_size, fileprefix): + """Setup memory and file resources.""" + my_mem_list = [] + my_file_list = [] + nixl_mem_reg_list = [] + nixl_file_reg_list = [] + + for i in range(batch_size): + my_mem_list.append(nixl_utils.malloc_passthru(buf_size)) + my_file_list.append(os.open(f"{fileprefix}_{i}", os.O_RDWR | os.O_CREAT)) + nixl_mem_reg_list.append((my_mem_list[-1], buf_size, 0, str(i))) + nixl_file_reg_list.append((0, buf_size, my_file_list[-1], str(i))) + + nixl_mem_reg_descs = agent.register_memory(nixl_mem_reg_list, "DRAM") + nixl_file_reg_descs = agent.register_memory(nixl_file_reg_list, "FILE") + + assert nixl_mem_reg_descs is not None + assert nixl_file_reg_descs is not None + + return my_mem_list, my_file_list, nixl_mem_reg_descs, nixl_file_reg_descs + + +def cleanup_resources(agent, mem_reg_descs, file_reg_descs, mem_list, file_list): + """Cleanup memory and file resources.""" + agent.deregister_memory(mem_reg_descs) + + if mem_reg_descs.getType() == DRAM_SEG: + agent.deregister_memory(file_reg_descs, backends=["POSIX"]) + + for mem in mem_list: + nixl_utils.free_passthru(mem) + else: + agent.deregister_memory(file_reg_descs, backends=["GDS"]) + # TODO: cudaFree + + for file in file_list: + os.close(file) + + +def get_base_parser(): + """Get base argument parser with common arguments.""" + parser = argparse.ArgumentParser(description="NIXL Storage Sample") + parser.add_argument("--fileprefix", type=str, help="Path to the files for testing") + parser.add_argument( + "--buf_size", + type=int, + default=4096, + help="Buffer size in bytes (default: 4096)", + ) + parser.add_argument( + "--batch_size", type=int, default=1, help="Batch size (default: 1)" + ) + return parser + + +def wait_for_transfer(agent, handle): + """Wait for transfer to complete.""" + status = agent.check_xfer_state(handle) + while status != "DONE": + if status == "ERR": + logger.error("Transfer got to Error state.") + exit() + status = agent.check_xfer_state(handle)