Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 13 additions & 1 deletion poetry.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ cvxpy = { version = ">=1.1.0", optional = true }
graphviz = { version = ">=0.15", optional = true }
matplotlib = { version = ">=3.0.0", optional = true }
numpy = { version = ">=1.19.0", optional = true }
networkx = { version = ">=2.5", optional = true }

# gateway dependencies
flask = { version = "^2.1.2", optional = true }
Expand All @@ -70,7 +71,7 @@ gcp = ["google-api-python-client", "google-auth", "google-cloud-compute", "googl
ibm = ["ibm-cloud-sdk-core", "ibm-cos-sdk", "ibm-vpc"]
all = ["boto3", "azure-identity", "azure-mgmt-authorization", "azure-mgmt-compute", "azure-mgmt-network", "azure-mgmt-resource", "azure-mgmt-storage", "azure-mgmt-subscription", "azure-storage-blob", "google-api-python-client", "google-auth", "google-cloud-compute", "google-cloud-storage", "ibm-cloud-sdk-core", "ibm-cos-sdk", "ibm-vpc"]
gateway = ["flask", "lz4", "pynacl", "pyopenssl", "werkzeug"]
solver = ["cvxpy", "graphviz", "matplotlib", "numpy"]
solver = ["networkx", "cvxpy", "graphviz", "matplotlib", "numpy"]

[tool.poetry.dev-dependencies]
pytest = ">=6.0.0"
Expand Down
1 change: 1 addition & 0 deletions scripts/requirements-gateway.txt
Original file line number Diff line number Diff line change
Expand Up @@ -33,3 +33,4 @@ numpy
pandas
pyarrow
typer
networkx
4 changes: 0 additions & 4 deletions skyplane/api/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,15 +5,11 @@
from typing import TYPE_CHECKING, Optional

from skyplane.api.config import TransferConfig
from skyplane.api.dataplane import Dataplane
from skyplane.api.provisioner import Provisioner
from skyplane.api.obj_store import ObjectStore
from skyplane.api.usage import get_clientid
from skyplane.obj_store.object_store_interface import ObjectStoreInterface
from skyplane.planner.planner import MulticastDirectPlanner
from skyplane.utils import logger
from skyplane.utils.definitions import tmp_log_dir
from skyplane.utils.path import parse_path

from skyplane.api.pipeline import Pipeline

Expand Down
2 changes: 1 addition & 1 deletion skyplane/api/config.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
from dataclasses import dataclass

from typing import Optional, List
from typing import Optional

from skyplane import compute

Expand Down
8 changes: 6 additions & 2 deletions skyplane/api/dataplane.py
Original file line number Diff line number Diff line change
@@ -1,20 +1,21 @@
import json
import os
import threading
from collections import defaultdict, Counter
from collections import defaultdict
from datetime import datetime
from functools import partial
from datetime import datetime

import nacl.secret
import nacl.utils
import typer
import urllib3
from pathlib import Path
from typing import TYPE_CHECKING, Dict, List, Optional

from skyplane import compute
from skyplane.api.tracker import TransferProgressTracker, TransferHook
from skyplane.api.transfer_job import CopyJob, SyncJob, TransferJob
from skyplane.api.transfer_job import TransferJob
from skyplane.api.config import TransferConfig
from skyplane.planner.topology import TopologyPlan, TopologyPlanGateway
from skyplane.utils import logger
Expand Down Expand Up @@ -218,6 +219,9 @@ def provision(
def copy_gateway_logs(self):
# copy logs from all gateways in parallel
def copy_log(instance):
typer.secho(f"Downloading log: {self.transfer_dir}/gateway_{instance.uuid()}.stdout", fg="bright_black")
typer.secho(f"Downloading log: {self.transfer_dir}/gateway_{instance.uuid()}.stderr", fg="bright_black")

instance.run_command("sudo docker logs -t skyplane_gateway 2> /tmp/gateway.stderr > /tmp/gateway.stdout")
instance.download_file("/tmp/gateway.stdout", self.transfer_dir / f"gateway_{instance.uuid()}.stdout")
instance.download_file("/tmp/gateway.stderr", self.transfer_dir / f"gateway_{instance.uuid()}.stderr")
Expand Down
35 changes: 22 additions & 13 deletions skyplane/api/pipeline.py
Original file line number Diff line number Diff line change
@@ -1,27 +1,25 @@
import json
import time
import os
import threading
from collections import defaultdict, Counter
from datetime import datetime
from functools import partial
from datetime import datetime

import nacl.secret
import nacl.utils
import urllib3
from typing import TYPE_CHECKING, Dict, List, Optional

from skyplane import compute
from skyplane.api.tracker import TransferProgressTracker, TransferHook
from skyplane.api.tracker import TransferProgressTracker
from skyplane.api.transfer_job import CopyJob, SyncJob, TransferJob
from skyplane.api.config import TransferConfig

from skyplane.planner.planner import MulticastDirectPlanner
from skyplane.planner.planner import (
MulticastDirectPlanner,
UnicastDirectPlanner,
UnicastILPPlanner,
MulticastILPPlanner,
MulticastMDSTPlanner,
)
from skyplane.planner.topology import TopologyPlanGateway
from skyplane.utils import logger
from skyplane.utils.definitions import gateway_docker_image, tmp_log_dir
from skyplane.utils.fn import PathLike, do_parallel
from skyplane.utils.definitions import tmp_log_dir

from skyplane.api.dataplane import Dataplane

Expand All @@ -39,6 +37,7 @@ def __init__(
transfer_config: TransferConfig,
# cloud_regions: dict,
max_instances: Optional[int] = 1,
num_connections: Optional[int] = 32,
planning_algorithm: Optional[str] = "direct",
debug: Optional[bool] = False,
):
Expand Down Expand Up @@ -67,8 +66,18 @@ def __init__(

# planner
self.planning_algorithm = planning_algorithm

if self.planning_algorithm == "direct":
self.planner = MulticastDirectPlanner(self.max_instances, 32)
# TODO: should find some ways to merge direct / Ndirect
self.planner = UnicastDirectPlanner(self.max_instances, num_connections)
elif self.planning_algorithm == "Ndirect":
self.planner = MulticastDirectPlanner(self.max_instances, num_connections)
elif self.planning_algorithm == "MDST":
self.planner = MulticastMDSTPlanner(self.max_instances, num_connections)
elif self.planning_algorithm == "ILP":
self.planning_algorithm = MulticastILPPlanner(self.max_instances, num_connections)
elif self.planning_algorithm == "UnicastILP":
self.planning_algorithm = UnicastILPPlanner(self.max_instances, num_connections)
else:
raise ValueError(f"No such planning algorithm {planning_algorithm}")

Expand Down Expand Up @@ -112,7 +121,7 @@ def start(self, debug=False, progress=False):
# copy gateway logs
if debug:
dp.copy_gateway_logs()
except Exception as e:
except Exception:
dp.copy_gateway_logs()
dp.deprovision(spinner=True)
return dp
Expand Down
5 changes: 2 additions & 3 deletions skyplane/api/tracker.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
import functools
from pprint import pprint
import json
import time
from abc import ABC
Expand Down Expand Up @@ -214,7 +213,7 @@ def monitor_single_dst_helper(dst_region):
}
self.hooks.on_transfer_end()

start_time = int(time.time())
int(time.time())
try:
for job in self.jobs.values():
logger.fs.debug(f"[TransferProgressTracker] Finalizing job {job.uuid}")
Expand All @@ -229,7 +228,7 @@ def monitor_single_dst_helper(dst_region):
session_start_timestamp_ms,
)
raise e
end_time = int(time.time())
int(time.time())

# verify transfer
try:
Expand Down
17 changes: 11 additions & 6 deletions skyplane/api/transfer_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,14 +12,14 @@
from queue import Queue
from typing import TYPE_CHECKING, Callable, Generator, List, Optional, Tuple, TypeVar, Dict

from abc import ABC, abstractmethod
from abc import ABC

import urllib3
from rich import print as rprint

from skyplane import exceptions
from skyplane.api.config import TransferConfig
from skyplane.chunk import Chunk, ChunkRequest
from skyplane.chunk import Chunk
from skyplane.obj_store.azure_blob_interface import AzureBlobObject
from skyplane.obj_store.gcs_interface import GCSObject
from skyplane.obj_store.storage_interface import StorageInterface
Expand Down Expand Up @@ -97,6 +97,7 @@ def _run_multipart_chunk_thread(
src_object = transfer_pair.src_obj
dest_objects = transfer_pair.dst_objs
dest_key = transfer_pair.dst_key
print("dest_key: ", dest_key)
if isinstance(self.src_iface, ObjectStoreInterface):
mime_type = self.src_iface.get_obj_mime_type(src_object.key)
# create multipart upload request per destination
Expand Down Expand Up @@ -269,10 +270,10 @@ def transfer_pair_generator(
dest_provider, dest_region = dst_iface.region_tag().split(":")
try:
dest_key = self.map_object_key_prefix(src_prefix, obj.key, dst_prefix, recursive=recursive)
assert (
dest_key[: len(dst_prefix)] == dst_prefix
), f"Destination key {dest_key} does not start with destination prefix {dst_prefix}"
dest_keys.append(dest_key[len(dst_prefix) :])
# TODO: why is it changed here?
# dest_keys.append(dest_key[len(dst_prefix) :])

dest_keys.append(dest_key)
except exceptions.MissingObjectException as e:
logger.fs.exception(e)
raise e from None
Expand Down Expand Up @@ -471,8 +472,12 @@ def dst_prefixes(self) -> List[str]:
if not hasattr(self, "_dst_prefix"):
if self.transfer_type == "unicast":
self._dst_prefix = [str(parse_path(self.dst_paths[0])[2])]
print("return dst_prefixes for unicast", self._dst_prefix)
else:
for path in self.dst_paths:
print("Parsing result for multicast", parse_path(path))
self._dst_prefix = [str(parse_path(path)[2]) for path in self.dst_paths]
print("return dst_prefixes for multicast", self._dst_prefix)
return self._dst_prefix

@property
Expand Down
2 changes: 1 addition & 1 deletion skyplane/api/usage.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@

import requests
from rich import print as rprint
from typing import Optional, Dict, List
from typing import Optional, Dict

import skyplane
from skyplane.utils.definitions import tmp_log_dir
Expand Down
2 changes: 1 addition & 1 deletion skyplane/cli/impl/progress_bar.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
from rich.progress import Progress, SpinnerColumn, TextColumn, BarColumn, DownloadColumn, TransferSpeedColumn, TimeRemainingColumn
from skyplane import exceptions
from skyplane.chunk import Chunk
from skyplane.cli.impl.common import console, print_stats_completed
from skyplane.cli.impl.common import console
from skyplane.utils.definitions import format_bytes
from skyplane.api.tracker import TransferHook

Expand Down
3 changes: 0 additions & 3 deletions skyplane/compute/server.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
import json
import logging
from pprint import pprint
import os
import socket
from contextlib import closing
Expand All @@ -14,12 +13,10 @@
from skyplane import compute
from skyplane.compute.const_cmds import make_autoshutdown_script, make_dozzle_command, make_sysctl_tcp_tuning_command
from skyplane.config_paths import config_path, cloud_config, __config_root__
from skyplane.gateway.gateway_program import GatewayProgram
from skyplane.utils import logger
from skyplane.utils.fn import PathLike, wait_for
from skyplane.utils.retry import retry_backoff
from skyplane.utils.timer import Timer
from skyplane.planner.topology import TopologyPlanGateway

tmp_log_dir = Path("/tmp/skyplane")

Expand Down
6 changes: 3 additions & 3 deletions skyplane/gateway/gateway_daemon.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
from multiprocessing import Event, Queue
from os import PathLike
from pathlib import Path
from typing import Dict, List, Optional
from typing import Dict, List

from skyplane.utils import logger

Expand Down Expand Up @@ -218,11 +218,11 @@ def create_gateway_operators_helper(input_queue, program: List[Dict], partition_
elif op["op_type"] == "send":
# TODO: handle private ips for GCP->GCP
target_gateway_info = self.gateway_info[op["target_gateway_id"]]
print("Gateway sender sending to ", target_gateway_info["private_ip_address"])
print("Gateway sender sending to ", target_gateway_info["public_ip_address"])
operators[handle] = GatewaySender(
handle,
region=self.region,
ip_addr=target_gateway_info["private_ip_address"],
ip_addr=target_gateway_info["public_ip_address"],
input_queue=input_queue,
output_queue=output_queue,
error_event=self.error_event,
Expand Down
3 changes: 1 addition & 2 deletions skyplane/gateway/gateway_daemon_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,7 @@
from multiprocessing.managers import DictProxy
from queue import Empty
from traceback import TracebackException
from typing import Dict, List, Tuple, Optional
import json
from typing import Dict, List, Tuple
from flask import Flask, jsonify, request
from werkzeug.serving import make_server

Expand Down
3 changes: 0 additions & 3 deletions skyplane/gateway/gateway_onprem.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,3 @@
import psutil
from multiprocessing import Process

# TODO: migrate to programmable gateways
# from skyplane.gateway.gateway_sender import GatewaySender
#
Expand Down
13 changes: 8 additions & 5 deletions skyplane/gateway/gateway_program.py
Original file line number Diff line number Diff line change
Expand Up @@ -104,15 +104,15 @@ def __init__(self):
def get_operators(self) -> List[GatewayOperator]:
return list(self._ops.values())

def add_operators(self, ops: List[GatewayOperator], parent_handle: Optional[str] = None, partition_id: Optional[int] = 0):
def add_operators(self, ops: List[GatewayOperator], parent_handle: Optional[str] = None, partition_id: Optional[Tuple] = None):
parent_op = self._ops[parent_handle] if parent_handle else None
ops_handles = []
for op in ops:
ops_handles.append(self.add_operator(op, parent_op, partition_id))
ops_handles.append(self.add_operator(op, parent_op.handle, partition_id))

return ops_handles

def add_operator(self, op: GatewayOperator, parent_handle: Optional[str] = None, partition_id: Optional[int] = 0):
def add_operator(self, op: GatewayOperator, parent_handle: Optional[str] = None, partition_id: Optional[Tuple] = None):
parent_op = self._ops[parent_handle] if parent_handle else None
if not parent_op: # root operation
self._plan[partition_id].append(op)
Expand All @@ -129,6 +129,8 @@ def to_dict(self):
"""
program_all = []
for partition_id, op_list in self._plan.items():
partition_id = list(partition_id) # convert tuple to list

# build gateway program representation
program = []
for op in op_list:
Expand All @@ -138,11 +140,12 @@ def to_dict(self):
exists = False
for p in program_all:
if p["value"] == program: # equivalent partition exists
p["partitions"].append(partition_id)
for pid in partition_id:
p["partitions"].append(pid)
exists = True
break
if not exists:
program_all.append({"value": program, "partitions": [partition_id]})
program_all.append({"value": program, "partitions": partition_id})

return program_all

Expand Down
3 changes: 1 addition & 2 deletions skyplane/obj_store/object_store_interface.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,8 @@
from dataclasses import dataclass

from typing import Iterator, List, Optional, Tuple
from typing import List, Optional, Tuple

from skyplane.obj_store.storage_interface import StorageInterface
from skyplane.utils import logger


@dataclass
Expand Down
Loading