Skip to content

Commit

Permalink
Changes for new pricing system (#199)
Browse files Browse the repository at this point in the history
- Move/improve flow code parts from CLI to SDK
- Add utils functions
- Add `make_instance_content` and `make_program_content`
- Refactor `create_instance` and `create_program`
- Add `get_estimated_price`
- Fixes for mypy/ruff/pytest
- Minor improvements
- Remove firecracker rootfs hashes for instances
  • Loading branch information
philogicae authored and Antonyjin committed Feb 25, 2025
1 parent 5ce940e commit 7c942c7
Show file tree
Hide file tree
Showing 15 changed files with 588 additions and 231 deletions.
26 changes: 24 additions & 2 deletions src/aleph/sdk/chains/ethereum.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,19 +15,21 @@
from web3.types import TxParams, TxReceipt

from aleph.sdk.exceptions import InsufficientFundsError
from aleph.sdk.types import TokenType

from ..conf import settings
from ..connectors.superfluid import Superfluid
from ..evm_utils import (
BALANCEOF_ABI,
MIN_ETH_BALANCE,
MIN_ETH_BALANCE_WEI,
FlowUpdate,
from_wei_token,
get_chain_id,
get_chains_with_super_token,
get_rpc,
get_super_token_address,
get_token_address,
to_human_readable_token,
)
from ..exceptions import BadSignatureError
from ..utils import bytes_from_hex
Expand Down Expand Up @@ -106,8 +108,9 @@ def can_transact(self, block=True) -> bool:
valid = balance > MIN_ETH_BALANCE_WEI if self.chain else False
if not valid and block:
raise InsufficientFundsError(
token_type=TokenType.GAS,
required_funds=MIN_ETH_BALANCE,
available_funds=to_human_readable_token(balance),
available_funds=float(from_wei_token(balance)),
)
return valid

Expand Down Expand Up @@ -162,6 +165,12 @@ def get_super_token_balance(self) -> Decimal:
return Decimal(contract.functions.balanceOf(self.get_address()).call())
return Decimal(0)

def can_start_flow(self, flow: Decimal) -> bool:
"""Check if the account has enough funds to start a Superfluid flow of the given size."""
if not self.superfluid_connector:
raise ValueError("Superfluid connector is required to check a flow")
return self.superfluid_connector.can_start_flow(flow)

def create_flow(self, receiver: str, flow: Decimal) -> Awaitable[str]:
"""Creat a Superfluid flow between this account and the receiver address."""
if not self.superfluid_connector:
Expand All @@ -188,6 +197,19 @@ def delete_flow(self, receiver: str) -> Awaitable[str]:
raise ValueError("Superfluid connector is required to delete a flow")
return self.superfluid_connector.delete_flow(receiver=receiver)

def manage_flow(
self,
receiver: str,
flow: Decimal,
update_type: FlowUpdate,
) -> Awaitable[Optional[str]]:
"""Manage the Superfluid flow between this account and the receiver address."""
if not self.superfluid_connector:
raise ValueError("Superfluid connector is required to manage a flow")
return self.superfluid_connector.manage_flow(
receiver=receiver, flow=flow, update_type=update_type
)


def get_fallback_account(
path: Optional[Path] = None, chain: Optional[Chain] = None
Expand Down
9 changes: 9 additions & 0 deletions src/aleph/sdk/chains/evm.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
from aleph_message.models import Chain
from eth_account import Account # type: ignore

from ..evm_utils import FlowUpdate
from .common import get_fallback_private_key
from .ethereum import ETHAccount

Expand All @@ -29,6 +30,9 @@ def get_token_balance(self) -> Decimal:
def get_super_token_balance(self) -> Decimal:
raise ValueError(f"Super token not implemented for this chain {self.CHAIN}")

def can_start_flow(self, flow: Decimal) -> bool:
raise ValueError(f"Flow checking not implemented for this chain {self.CHAIN}")

def create_flow(self, receiver: str, flow: Decimal) -> Awaitable[str]:
raise ValueError(f"Flow creation not implemented for this chain {self.CHAIN}")

Expand All @@ -41,6 +45,11 @@ def update_flow(self, receiver: str, flow: Decimal) -> Awaitable[str]:
def delete_flow(self, receiver: str) -> Awaitable[str]:
raise ValueError(f"Flow deletion not implemented for this chain {self.CHAIN}")

def manage_flow(
self, receiver: str, flow: Decimal, update_type: FlowUpdate
) -> Awaitable[Optional[str]]:
raise ValueError(f"Flow management not implemented for this chain {self.CHAIN}")


def get_fallback_account(
path: Optional[Path] = None, chain: Optional[Chain] = None
Expand Down
80 changes: 46 additions & 34 deletions src/aleph/sdk/client/abstract.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,9 @@

from aleph_message.models import (
AlephMessage,
ExecutableContent,
ItemHash,
ItemType,
MessagesResponse,
MessageType,
Payment,
PostMessage,
Expand All @@ -41,7 +41,7 @@
from aleph.sdk.utils import extended_json_encoder

from ..query.filters import MessageFilter, PostFilter
from ..query.responses import PostsResponse, PriceResponse
from ..query.responses import MessagesResponse, PostsResponse, PriceResponse
from ..types import GenericMessage, StorageEnum
from ..utils import Writable, compute_sha256

Expand Down Expand Up @@ -110,7 +110,7 @@ async def get_posts_iterator(
)
page += 1
for post in resp.posts:
yield post
yield post # type: ignore

@abstractmethod
async def download_file(self, file_hash: str) -> bytes:
Expand Down Expand Up @@ -242,6 +242,18 @@ def watch_messages(
"""
raise NotImplementedError("Did you mean to import `AlephHttpClient`?")

@abstractmethod
def get_estimated_price(
self,
content: ExecutableContent,
) -> Coroutine[Any, Any, PriceResponse]:
"""
Get Instance/Program content estimated price
:param content: Instance or Program content
"""
raise NotImplementedError("Did you mean to import `AlephHttpClient`?")

@abstractmethod
def get_program_price(
self,
Expand All @@ -265,7 +277,7 @@ async def create_post(
post_type: str,
ref: Optional[str] = None,
address: Optional[str] = None,
channel: Optional[str] = None,
channel: Optional[str] = settings.DEFAULT_CHANNEL,
inline: bool = True,
storage_engine: StorageEnum = StorageEnum.storage,
sync: bool = False,
Expand All @@ -290,9 +302,9 @@ async def create_post(
async def create_aggregate(
self,
key: str,
content: Mapping[str, Any],
content: dict[str, Any],
address: Optional[str] = None,
channel: Optional[str] = None,
channel: Optional[str] = settings.DEFAULT_CHANNEL,
inline: bool = True,
sync: bool = False,
) -> Tuple[AlephMessage, MessageStatus]:
Expand All @@ -302,7 +314,7 @@ async def create_aggregate(
:param key: Key to use to store the content
:param content: Content to store
:param address: Address to use to sign the message
:param channel: Channel to use (Default: "TEST")
:param channel: Channel to use (Default: "ALEPH-CLOUDSOLUTIONS")
:param inline: Whether to write content inside the message (Default: True)
:param sync: If true, waits for the message to be processed by the API server (Default: False)
"""
Expand All @@ -321,7 +333,7 @@ async def create_store(
ref: Optional[str] = None,
storage_engine: StorageEnum = StorageEnum.storage,
extra_fields: Optional[dict] = None,
channel: Optional[str] = None,
channel: Optional[str] = settings.DEFAULT_CHANNEL,
sync: bool = False,
) -> Tuple[AlephMessage, MessageStatus]:
"""
Expand Down Expand Up @@ -350,45 +362,45 @@ async def create_program(
program_ref: str,
entrypoint: str,
runtime: str,
environment_variables: Optional[Mapping[str, str]] = None,
storage_engine: StorageEnum = StorageEnum.storage,
channel: Optional[str] = None,
metadata: Optional[dict[str, Any]] = None,
address: Optional[str] = None,
sync: bool = False,
memory: Optional[int] = None,
vcpus: Optional[int] = None,
memory: Optional[int] = None,
timeout_seconds: Optional[float] = None,
persistent: bool = False,
allow_amend: bool = False,
internet: bool = True,
allow_amend: bool = False,
aleph_api: bool = True,
encoding: Encoding = Encoding.zip,
persistent: bool = False,
volumes: Optional[List[Mapping]] = None,
subscriptions: Optional[List[Mapping]] = None,
metadata: Optional[Mapping[str, Any]] = None,
environment_variables: Optional[dict[str, str]] = None,
subscriptions: Optional[List[dict]] = None,
sync: bool = False,
channel: Optional[str] = settings.DEFAULT_CHANNEL,
storage_engine: StorageEnum = StorageEnum.storage,
) -> Tuple[AlephMessage, MessageStatus]:
"""
Post a (create) PROGRAM message.
:param program_ref: Reference to the program to run
:param entrypoint: Entrypoint to run
:param runtime: Runtime to use
:param environment_variables: Environment variables to pass to the program
:param storage_engine: Storage engine to use (Default: "storage")
:param channel: Channel to use (Default: "TEST")
:param metadata: Metadata to attach to the message
:param address: Address to use (Default: account.get_address())
:param sync: If true, waits for the message to be processed by the API server
:param memory: Memory in MB for the VM to be allocated (Default: 128)
:param vcpus: Number of vCPUs to allocate (Default: 1)
:param memory: Memory in MB for the VM to be allocated (Default: 128)
:param timeout_seconds: Timeout in seconds (Default: 30.0)
:param persistent: Whether the program should be persistent or not (Default: False)
:param allow_amend: Whether the deployed VM image may be changed (Default: False)
:param internet: Whether the VM should have internet connectivity. (Default: True)
:param allow_amend: Whether the deployed VM image may be changed (Default: False)
:param aleph_api: Whether the VM needs access to Aleph messages API (Default: True)
:param encoding: Encoding to use (Default: Encoding.zip)
:param persistent: Whether the program should be persistent or not (Default: False)
:param volumes: Volumes to mount
:param environment_variables: Environment variables to pass to the program
:param subscriptions: Patterns of aleph.im messages to forward to the program's event receiver
:param metadata: Metadata to attach to the message
:param sync: If true, waits for the message to be processed by the API server
:param channel: Channel to use (Default: "ALEPH-CLOUDSOLUTIONS")
:param storage_engine: Storage engine to use (Default: "storage")
"""
raise NotImplementedError(
"Did you mean to import `AuthenticatedAlephHttpClient`?"
Expand All @@ -400,9 +412,9 @@ async def create_instance(
rootfs: str,
rootfs_size: int,
payment: Optional[Payment] = None,
environment_variables: Optional[Mapping[str, str]] = None,
environment_variables: Optional[dict[str, str]] = None,
storage_engine: StorageEnum = StorageEnum.storage,
channel: Optional[str] = None,
channel: Optional[str] = settings.DEFAULT_CHANNEL,
address: Optional[str] = None,
sync: bool = False,
memory: Optional[int] = None,
Expand All @@ -416,7 +428,7 @@ async def create_instance(
volumes: Optional[List[Mapping]] = None,
volume_persistence: str = "host",
ssh_keys: Optional[List[str]] = None,
metadata: Optional[Mapping[str, Any]] = None,
metadata: Optional[dict[str, Any]] = None,
requirements: Optional[HostRequirements] = None,
) -> Tuple[AlephMessage, MessageStatus]:
"""
Expand All @@ -427,7 +439,7 @@ async def create_instance(
:param payment: Payment method used to pay for the instance
:param environment_variables: Environment variables to pass to the program
:param storage_engine: Storage engine to use (Default: "storage")
:param channel: Channel to use (Default: "TEST")
:param channel: Channel to use (Default: "ALEPH-CLOUDSOLUTIONS")
:param address: Address to use (Default: account.get_address())
:param sync: If true, waits for the message to be processed by the API server
:param memory: Memory in MB for the VM to be allocated (Default: 2048)
Expand Down Expand Up @@ -455,7 +467,7 @@ async def forget(
hashes: List[ItemHash],
reason: Optional[str],
storage_engine: StorageEnum = StorageEnum.storage,
channel: Optional[str] = None,
channel: Optional[str] = settings.DEFAULT_CHANNEL,
address: Optional[str] = None,
sync: bool = False,
) -> Tuple[AlephMessage, MessageStatus]:
Expand All @@ -468,7 +480,7 @@ async def forget(
:param hashes: Hashes of the messages to forget
:param reason: Reason for forgetting the messages
:param storage_engine: Storage engine to use (Default: "storage")
:param channel: Channel to use (Default: "TEST")
:param channel: Channel to use (Default: "ALEPH-CLOUDSOLUTIONS")
:param address: Address to use (Default: account.get_address())
:param sync: If true, waits for the message to be processed by the API server (Default: False)
"""
Expand All @@ -490,7 +502,7 @@ async def generate_signed_message(
:param message_type: Type of the message (PostMessage, ...)
:param content: User-defined content of the message
:param channel: Channel to use (Default: "TEST")
:param channel: Channel to use (Default: "ALEPH-CLOUDSOLUTIONS")
:param allow_inlining: Whether to allow inlining the content of the message (Default: True)
:param storage_engine: Storage engine to use (Default: "storage")
"""
Expand Down Expand Up @@ -537,7 +549,7 @@ async def submit(
self,
content: Dict[str, Any],
message_type: MessageType,
channel: Optional[str] = None,
channel: Optional[str] = settings.DEFAULT_CHANNEL,
storage_engine: StorageEnum = StorageEnum.storage,
allow_inlining: bool = True,
sync: bool = False,
Expand All @@ -549,7 +561,7 @@ async def submit(
:param content: Content of the message
:param message_type: Type of the message
:param channel: Channel to use (Default: "TEST")
:param channel: Channel to use (Default: "ALEPH-CLOUDSOLUTIONS")
:param storage_engine: Storage engine to use (Default: "storage")
:param allow_inlining: Whether to allow inlining the content of the message (Default: True)
:param sync: If true, waits for the message to be processed by the API server (Default: False)
Expand Down
Loading

0 comments on commit 7c942c7

Please sign in to comment.