Skip to content

Commit

Permalink
WiP - refactor conductor layer
Browse files Browse the repository at this point in the history
  • Loading branch information
Dany9966 committed Nov 15, 2024
1 parent 3bad643 commit d7a96c7
Show file tree
Hide file tree
Showing 45 changed files with 1,393 additions and 1,456 deletions.
2 changes: 1 addition & 1 deletion coriolis/api/v1/replica_schedules.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
from coriolis.api import wsgi as api_wsgi
from coriolis import exception
from coriolis.policies import replica_schedules as schedules_policies
from coriolis.replica_cron import api
from coriolis.transfer_cron import api
from coriolis import schemas

import jsonschema
Expand Down
6 changes: 3 additions & 3 deletions coriolis/api/v1/replicas.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,8 @@
LOG = logging.getLogger(__name__)

SUPPORTED_REPLICA_SCENARIOS = [
constants.REPLICA_SCENARIO_REPLICA,
constants.REPLICA_SCENARIO_LIVE_MIGRATION]
constants.TRANSFER_SCENARIO_REPLICA,
constants.TRANSFER_SCENARIO_LIVE_MIGRATION]


class ReplicaController(api_wsgi.Controller):
Expand Down Expand Up @@ -79,7 +79,7 @@ def _validate_create_body(self, context, body):
f"'{scenario}', must be one of: "
f"{SUPPORTED_REPLICA_SCENARIOS}")
else:
scenario = constants.REPLICA_SCENARIO_REPLICA
scenario = constants.TRANSFER_SCENARIO_REPLICA
LOG.warn(
"No Replica 'scenario' field set in Replica body, "
f"defaulting to: '{scenario}'")
Expand Down
4 changes: 2 additions & 2 deletions coriolis/cmd/replica_cron.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
from oslo_config import cfg

from coriolis import constants
from coriolis.replica_cron.rpc import server as rpc_server
from coriolis.transfer_cron.rpc import server as rpc_server
from coriolis import service
from coriolis import utils

Expand All @@ -19,7 +19,7 @@ def main():
utils.setup_logging()

server = service.MessagingService(
constants.REPLICA_CRON_MAIN_MESSAGING_TOPIC,
constants.TRANSFER_CRON_MAIN_MESSAGING_TOPIC,
[rpc_server.ReplicaCronServerEndpoint()],
rpc_server.VERSION, worker_count=1)
launcher = service.service.launch(
Expand Down
155 changes: 78 additions & 77 deletions coriolis/conductor/rpc/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -128,49 +128,49 @@ def get_provider_schemas(self, ctxt, platform_name, provider_type):
platform_name=platform_name,
provider_type=provider_type)

def execute_replica_tasks(self, ctxt, replica_id,
shutdown_instances=False):
def execute_transfer_tasks(self, ctxt, transfer_id,
shutdown_instances=False):
return self._call(
ctxt, 'execute_replica_tasks', replica_id=replica_id,
ctxt, 'execute_transfer_tasks', transfer_id=transfer_id,
shutdown_instances=shutdown_instances)

def get_replica_tasks_executions(self, ctxt, replica_id,
include_tasks=False):
def get_transfer_tasks_executions(self, ctxt, transfer_id,
include_tasks=False):
return self._call(
ctxt, 'get_replica_tasks_executions',
replica_id=replica_id,
ctxt, 'get_transfer_tasks_executions',
transfer_id=transfer_id,
include_tasks=include_tasks)

def get_replica_tasks_execution(self, ctxt, replica_id, execution_id,
include_task_info=False):
def get_transfer_tasks_execution(self, ctxt, transfer_id, execution_id,
include_task_info=False):
return self._call(
ctxt, 'get_replica_tasks_execution', replica_id=replica_id,
ctxt, 'get_transfer_tasks_execution', transfer_id=transfer_id,
execution_id=execution_id, include_task_info=include_task_info)

def delete_replica_tasks_execution(self, ctxt, replica_id, execution_id):
def delete_transfer_tasks_execution(self, ctxt, transfer_id, execution_id):
return self._call(
ctxt, 'delete_replica_tasks_execution', replica_id=replica_id,
ctxt, 'delete_transfer_tasks_execution', transfer_id=transfer_id,
execution_id=execution_id)

def cancel_replica_tasks_execution(self, ctxt, replica_id, execution_id,
force):
def cancel_transfer_tasks_execution(self, ctxt, transfer_id, execution_id,
force):
return self._call(
ctxt, 'cancel_replica_tasks_execution', replica_id=replica_id,
ctxt, 'cancel_transfer_tasks_execution', transfer_id=transfer_id,
execution_id=execution_id, force=force)

def create_instances_replica(self, ctxt,
replica_scenario,
origin_endpoint_id,
destination_endpoint_id,
origin_minion_pool_id,
destination_minion_pool_id,
instance_osmorphing_minion_pool_mappings,
source_environment, destination_environment,
instances, network_map, storage_mappings,
notes=None, user_scripts=None):
return self._call(
ctxt, 'create_instances_replica',
replica_scenario=replica_scenario,
def create_instances_transfer(self, ctxt,
transfer_scenario,
origin_endpoint_id,
destination_endpoint_id,
origin_minion_pool_id,
destination_minion_pool_id,
instance_osmorphing_minion_pool_mappings,
source_environment, destination_environment,
instances, network_map, storage_mappings,
notes=None, user_scripts=None):
return self._call(
ctxt, 'create_instances_transfer',
transfer_scenario=transfer_scenario,
origin_endpoint_id=origin_endpoint_id,
destination_endpoint_id=destination_endpoint_id,
origin_minion_pool_id=origin_minion_pool_id,
Expand All @@ -185,25 +185,25 @@ def create_instances_replica(self, ctxt,
source_environment=source_environment,
user_scripts=user_scripts)

def get_replicas(self, ctxt, include_tasks_executions=False,
include_task_info=False):
def get_transfers(self, ctxt, include_tasks_executions=False,
include_task_info=False):
return self._call(
ctxt, 'get_replicas',
ctxt, 'get_transfers',
include_tasks_executions=include_tasks_executions,
include_task_info=include_task_info)

def get_replica(self, ctxt, replica_id, include_task_info=False):
def get_transfer(self, ctxt, transfer_id, include_task_info=False):
return self._call(
ctxt, 'get_replica', replica_id=replica_id,
ctxt, 'get_transfer', transfer_id=transfer_id,
include_task_info=include_task_info)

def delete_replica(self, ctxt, replica_id):
def delete_transfer(self, ctxt, transfer_id):
self._call(
ctxt, 'delete_replica', replica_id=replica_id)
ctxt, 'delete_transfer', transfer_id=transfer_id)

def delete_replica_disks(self, ctxt, replica_id):
def delete_transfer_disks(self, ctxt, transfer_id):
return self._call(
ctxt, 'delete_replica_disks', replica_id=replica_id)
ctxt, 'delete_transfer_disks', transfer_id=transfer_id)

def get_deployments(self, ctxt, include_tasks=False,
include_task_info=False):
Expand All @@ -216,12 +216,12 @@ def get_deployment(self, ctxt, deployment_id, include_task_info=False):
ctxt, 'get_deployment', deployment_id=deployment_id,
include_task_info=include_task_info)

def deploy_replica_instances(
self, ctxt, replica_id,
def deploy_transfer_instances(
self, ctxt, transfer_id,
instance_osmorphing_minion_pool_mappings=None, clone_disks=False,
force=False, skip_os_morphing=False, user_scripts=None):
return self._call(
ctxt, 'deploy_replica_instances', replica_id=replica_id,
ctxt, 'deploy_transfer_instances', transfer_id=transfer_id,
instance_osmorphing_minion_pool_mappings=(
instance_osmorphing_minion_pool_mappings),
clone_disks=clone_disks, force=force,
Expand Down Expand Up @@ -283,48 +283,48 @@ def update_task_progress_update(
new_current_step=new_current_step,
new_total_steps=new_total_steps, new_message=new_message)

def create_replica_schedule(self, ctxt, replica_id,
schedule, enabled, exp_date,
shutdown_instance):
def create_transfer_schedule(self, ctxt, transfer_id,
schedule, enabled, exp_date,
shutdown_instance):
return self._call(
ctxt, 'create_replica_schedule',
replica_id=replica_id,
ctxt, 'create_transfer_schedule',
transfer_id=transfer_id,
schedule=schedule,
enabled=enabled,
exp_date=exp_date,
shutdown_instance=shutdown_instance)

def update_replica_schedule(self, ctxt, replica_id, schedule_id,
updated_values):
def update_transfer_schedule(self, ctxt, transfer_id, schedule_id,
updated_values):
return self._call(
ctxt, 'update_replica_schedule',
replica_id=replica_id,
ctxt, 'update_transfer_schedule',
transfer_id=transfer_id,
schedule_id=schedule_id,
updated_values=updated_values)

def delete_replica_schedule(self, ctxt, replica_id, schedule_id):
def delete_transfer_schedule(self, ctxt, transfer_id, schedule_id):
return self._call(
ctxt, 'delete_replica_schedule',
replica_id=replica_id,
ctxt, 'delete_transfer_schedule',
transfer_id=transfer_id,
schedule_id=schedule_id)

def get_replica_schedules(self, ctxt, replica_id=None, expired=True):
def get_transfer_schedules(self, ctxt, transfer_id=None, expired=True):
return self._call(
ctxt, 'get_replica_schedules',
replica_id=replica_id, expired=expired)
ctxt, 'get_transfer_schedules',
transfer_id=transfer_id, expired=expired)

def get_replica_schedule(self, ctxt, replica_id,
schedule_id, expired=True):
def get_transfer_schedule(self, ctxt, transfer_id,
schedule_id, expired=True):
return self._call(
ctxt, 'get_replica_schedule',
replica_id=replica_id,
ctxt, 'get_transfer_schedule',
transfer_id=transfer_id,
schedule_id=schedule_id,
expired=expired)

def update_replica(self, ctxt, replica_id, updated_properties):
def update_transfer(self, ctxt, transfer_id, updated_properties):
return self._call(
ctxt, 'update_replica',
replica_id=replica_id,
ctxt, 'update_transfer',
transfer_id=transfer_id,
updated_properties=updated_properties)

def get_diagnostics(self, ctxt):
Expand Down Expand Up @@ -391,31 +391,32 @@ def delete_service(self, ctxt, service_id):
return self._call(
ctxt, 'delete_service', service_id=service_id)

def confirm_replica_minions_allocation(
self, ctxt, replica_id, minion_machine_allocations):
def confirm_transfer_minions_allocation(
self, ctxt, transfer_id, minion_machine_allocations):
self._call(
ctxt, 'confirm_replica_minions_allocation', replica_id=replica_id,
ctxt, 'confirm_transfer_minions_allocation',
transfer_id=transfer_id,
minion_machine_allocations=minion_machine_allocations)

def report_replica_minions_allocation_error(
self, ctxt, replica_id, minion_allocation_error_details):
def report_transfer_minions_allocation_error(
self, ctxt, transfer_id, minion_allocation_error_details):
self._call(
ctxt, 'report_replica_minions_allocation_error',
replica_id=replica_id,
ctxt, 'report_transfer_minions_allocation_error',
transfer_id=transfer_id,
minion_allocation_error_details=minion_allocation_error_details)

def confirm_migration_minions_allocation(
self, ctxt, migration_id, minion_machine_allocations):
def confirm_deployment_minions_allocation(
self, ctxt, deployment_id, minion_machine_allocations):
self._call(
ctxt, 'confirm_migration_minions_allocation',
migration_id=migration_id,
ctxt, 'confirm_deployment_minions_allocation',
deployment_id=deployment_id,
minion_machine_allocations=minion_machine_allocations)

def report_migration_minions_allocation_error(
self, ctxt, migration_id, minion_allocation_error_details):
def report_deployment_minions_allocation_error(
self, ctxt, deployment_id, minion_allocation_error_details):
self._call(
ctxt, 'report_migration_minions_allocation_error',
migration_id=migration_id,
ctxt, 'report_deployment_minions_allocation_error',
deployment_id=deployment_id,
minion_allocation_error_details=minion_allocation_error_details)


Expand Down
Loading

0 comments on commit d7a96c7

Please sign in to comment.