Skip to content

Commit

Permalink
Significantly improve ooutput handling, opera version to 0.6.1.
Browse files Browse the repository at this point in the history
Contains a breaking API change.
  • Loading branch information
sstanovnik committed Oct 2, 2020
1 parent bfa7e2d commit 9abd221
Show file tree
Hide file tree
Showing 6 changed files with 114 additions and 123 deletions.
5 changes: 4 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ WIP.

## Development

Requires Python >= 3.7.

To begin:

```shell script
Expand All @@ -32,9 +34,10 @@ java -jar openapi-generator-cli-4.3.0.jar config-help --generator-name python-fl
With Docker:

```shell script
docker-compose up --build
docker-compose up --build -d
docker cp test.csar xopera-api_api_1:/app/
docker exec xopera-api_api_1 unzip test.csar
docker logs -f xopera-api_api_1
# prepare request inputs: service_template, inputs (in JSON object form, not a string)
curl -XPOST localhost:8080/validate -H "Content-Type: application/json" -d @inputs-request.json
curl -XPOST localhost:8080/deploy -H "Content-Type: application/json" -d @inputs-request.json
Expand Down
13 changes: 10 additions & 3 deletions openapi-spec.yml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
openapi: "3.0.0"
info:
version: 0.1.2
version: 0.2.0
title: xOpera API
license:
name: Apache-2.0
Expand Down Expand Up @@ -123,6 +123,7 @@ components:
required:
- id
- state
- operation
- timestamp
properties:
id:
Expand All @@ -134,6 +135,9 @@ components:
timestamp:
description: An ISO8601 timestamp of the invocation.
type: string
service_template:
description: The service template used for the invocation.
type: string
inputs:
description: Inputs provided for invocation.
type: object
Expand All @@ -143,11 +147,14 @@ components:
additionalProperties:
type: string
exception:
description: An internal xOpera error that occurred starting operation.
description: An internal xOpera error that occurred during the operation.
type: string
console_output:
stdout:
description: xOpera console output for operation.
type: string
stderr:
description: xOpera error output for operation.
type: string
InvocationHistory:
description: Invocation history ordered by timestamp ascending.
type: array
Expand Down
2 changes: 1 addition & 1 deletion requirements.txt
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
opera==0.5.9
opera==0.6.1

connexion >= 2.7.0; python_version>="3.6"
connexion >= 2.3.0; python_version=="3.5"
Expand Down
2 changes: 1 addition & 1 deletion setup.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ include_package_data = True
setup_requires =
setuptools_scm
install_requires =
opera == 0.5.9
opera == 0.6.1
connexion >= 2.7.0
python_dateutil >= 2.8.1
tornado >= 6.0.4
Expand Down
195 changes: 92 additions & 103 deletions src/opera/api/controllers/background_invocation.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,117 +2,98 @@
import json
import multiprocessing
import os
import sys
import traceback
import typing
import uuid
from io import StringIO
from pathlib import Path
from typing import List, Callable, Optional
from typing import List, Optional

from opera.commands.deploy import deploy as opera_deploy
from opera.commands.undeploy import undeploy as opera_undeploy
from opera.storage import Storage

from opera.api.log import get_logger
from opera.api.openapi.models import Invocation, InvocationState
from opera.api.openapi.models import Invocation, InvocationState, OperationType

logger = get_logger(__name__)


def get_instance_state():
json_dict = {}
for file_path in Path(os.path.join('.opera', 'instances')).glob("*"):
parsed = json.load(open(file_path, 'r'))
component_name = parsed['tosca_name']['data']
json_dict[component_name] = parsed['state']['data']
return json_dict


class StdoutCapture(object):
def __enter__(self):
self._stdout_backup = sys.stdout
self._stringio = StringIO()
sys.stdout = self._stringio
return self

def __exit__(self, *args):
self.value = self._stringio.getvalue()
del self._stringio # free up some memory
sys.stdout = self._stdout_backup

def get_value(self):
return self._stringio.getvalue()


class WrapperException(BaseException):
def __init__(self, invocation_uuid, wrapped_exception):
self.invocation_uuid = invocation_uuid
self.wrapped_exception = wrapped_exception


def wrapper_start(function, function_args, invocation_uuid):
logger.debug("Starting %s", invocation_uuid)

local_inv = InvocationService.load_invocation(invocation_uuid)
local_inv.state = InvocationState.IN_PROGRESS
InvocationService.write_invocation(local_inv)

with StdoutCapture() as capture:
try:
function(*function_args)
# we want the console output no matter what
except BaseException as e:
wrapped_exc = WrapperException(invocation_uuid, e)
raise wrapped_exc
finally:
local_inv = InvocationService.load_invocation(invocation_uuid)
local_inv.console_output = capture.get_value()
InvocationService.write_invocation(local_inv)

return invocation_uuid


def wrapper_error(error: WrapperException):
if not isinstance(error, WrapperException):
logger.error("Unexpected out-of-band error.")
raise error

logger.error("Error in %s", error.invocation_uuid, exc_info=error.wrapped_exception)

local_inv = InvocationService.load_invocation(error.invocation_uuid)
local_inv.state = InvocationState.FAILED
local_inv.exception = str(error)
InvocationService.write_invocation(local_inv)


# gets param as the result of wrapper_start
def wrapper_done(invocation_uuid):
logger.debug("Done with %s", invocation_uuid)

local_inv = InvocationService.load_invocation(invocation_uuid)
local_inv.state = InvocationState.SUCCESS
local_inv.instance_state = get_instance_state()
InvocationService.write_invocation(local_inv)


# necessary because we can't pickle the storage object and therefore can't submit upstream deploy to the pool
def opera_deploy_storage_proxy(service_template: str, inputs: typing.Optional[dict], num_workers: int):
opera_storage = Storage.create()
return opera_deploy(service_template, inputs, opera_storage, num_workers)


def opera_undeploy_storage_proxy(num_workers: int):
opera_storage = Storage.create()
opera_undeploy(opera_storage, num_workers)
class InvocationWorkerProcess(multiprocessing.Process):
IN_PROGRESS_STDOUT_FILE = "/tmp/xopera-api-inprogress-stdout.txt"
IN_PROGRESS_STDERR_FILE = "/tmp/xopera-api-inprogress-stderr.txt"

def __init__(self, work_queue: multiprocessing.Queue):
super(InvocationWorkerProcess, self).__init__(
group=None, target=self._run_internal, name="Invocation-Worker", args=(),
kwargs={
"work_queue": work_queue,
}, daemon=None)

@staticmethod
def _run_internal(work_queue: multiprocessing.Queue):
file_stdout = open(InvocationWorkerProcess.IN_PROGRESS_STDOUT_FILE, "w")
file_stderr = open(InvocationWorkerProcess.IN_PROGRESS_STDERR_FILE, "w")

os.dup2(file_stdout.fileno(), 1)
os.dup2(file_stderr.fileno(), 2)

while True:
inv: Invocation = work_queue.get(block=True)

inv.state = InvocationState.IN_PROGRESS
InvocationService.write_invocation(inv)

try:
if inv.operation == OperationType.DEPLOY:
InvocationWorkerProcess._deploy(inv.service_template, inv.inputs, num_workers=1)
elif inv.operation == OperationType.UNDEPLOY:
InvocationWorkerProcess._undeploy(num_workers=1)
else:
raise RuntimeError("Unknown operation type:" + str(inv.operation))

inv.state = InvocationState.SUCCESS
except BaseException as e:
if isinstance(e, RuntimeError):
raise e
inv.state = InvocationState.FAILED
inv.exception = "{}: {}\n\n{}".format(e.__class__.__name__, str(e), traceback.format_exc())

instance_state = InvocationService.get_instance_state()
stdout = InvocationWorkerProcess.read_file(InvocationWorkerProcess.IN_PROGRESS_STDOUT_FILE)
stderr = InvocationWorkerProcess.read_file(InvocationWorkerProcess.IN_PROGRESS_STDERR_FILE)
file_stdout.truncate()
file_stderr.truncate()

inv.instance_state = instance_state
inv.stdout = stdout
inv.stderr = stderr
InvocationService.write_invocation(inv)

@staticmethod
def _deploy(service_template: str, inputs: typing.Optional[dict], num_workers: int):
opera_storage = Storage.create()
opera_deploy(service_template, inputs, opera_storage,
verbose_mode=True, num_workers=num_workers, delete_existing_state=False)

@staticmethod
def _undeploy(num_workers: int):
opera_storage = Storage.create()
opera_undeploy(opera_storage, verbose_mode=True, num_workers=num_workers)

@staticmethod
def read_file(filename):
with open(filename, "r") as f:
return f.read()


class InvocationService:
def __init__(self):
# FIXME: should really be closed or used as a context manager
self.pool = multiprocessing.Pool(1) # one thing at a time
self.work_queue: multiprocessing.Queue = multiprocessing.Queue()
self.worker = InvocationWorkerProcess(self.work_queue)
self.worker.start()

def invoke(self, function: Callable, function_args: list,
operation_type: str, inputs: Optional[dict]) -> Invocation:
def invoke(self, operation_type: OperationType, service_template: Optional[str], inputs: Optional[dict]) \
-> Invocation:
invocation_uuid = str(uuid.uuid4())
now = datetime.datetime.now(tz=datetime.timezone.utc)
logger.info("Invoking %s with ID %s at %s", operation_type, invocation_uuid, now.isoformat())
Expand All @@ -122,21 +103,15 @@ def invoke(self, function: Callable, function_args: list,
inv.state = InvocationState.PENDING
inv.operation = operation_type
inv.timestamp = now.isoformat()
inv.service_template = service_template
inv.inputs = inputs
inv.instance_state = None
inv.exception = None
inv.console_output = None
inv.stdout = None
inv.stderr = None
self.write_invocation(inv)

wrapper_kwargs = dict(
function=function,
function_args=function_args,
invocation_uuid=invocation_uuid
)

# the error callback is runtime correct, as we only throw one type of exception
# noinspection PyTypeChecker
self.pool.apply_async(wrapper_start, kwds=wrapper_kwargs, callback=wrapper_done, error_callback=wrapper_error)
self.work_queue.put(inv)
return inv

@classmethod
Expand All @@ -147,6 +122,11 @@ def invocation_history(cls) -> List[Invocation]:
for file_path in Path(".opera-api").glob('*.json'):
logger.debug(file_path)
invocation = Invocation.from_dict(json.load(open(file_path, 'r')))

if invocation.state == InvocationState.IN_PROGRESS:
invocation.stdout = InvocationWorkerProcess.read_file(InvocationWorkerProcess.IN_PROGRESS_STDOUT_FILE)
invocation.stderr = InvocationWorkerProcess.read_file(InvocationWorkerProcess.IN_PROGRESS_STDERR_FILE)

invocations.append(invocation)

if invocations:
Expand Down Expand Up @@ -182,3 +162,12 @@ def write_invocation(cls, inv: Invocation):
filename = "invocation-{}.json".format(inv.id)
dump = json.dumps(inv.to_dict())
storage.write(dump, filename)

@classmethod
def get_instance_state(cls):
json_dict = {}
for file_path in Path(os.path.join('.opera', 'instances')).glob("*"):
parsed = json.load(open(file_path, 'r'))
component_name = parsed['tosca_name']['data']
json_dict[component_name] = parsed['state']['data']
return json_dict
20 changes: 6 additions & 14 deletions src/opera/api/controllers/default.py
Original file line number Diff line number Diff line change
@@ -1,16 +1,16 @@
import traceback

from opera.commands.outputs import outputs as opera_outputs
from opera.commands.validate import validate as opera_validate
from opera.storage import Storage

from opera.api.controllers.background_invocation import InvocationService, opera_deploy_storage_proxy, \
opera_undeploy_storage_proxy
from opera.api.controllers.background_invocation import InvocationService
from opera.api.log import get_logger
from opera.api.openapi.models import ValidationResult, OperationType
from opera.api.openapi.models.deployment_input import DeploymentInput

logger = get_logger(__name__)

# must be created (pool) _after_ any functions are referenced, otherwise AttributeError: can't get attribute
invocation_service = InvocationService()


Expand All @@ -19,22 +19,14 @@ def deploy(body: DeploymentInput = None):
logger.debug(body)

deployment_input = DeploymentInput.from_dict(body)
result = invocation_service.invoke(
opera_deploy_storage_proxy, [deployment_input.service_template, deployment_input.inputs, 1],
OperationType.DEPLOY, deployment_input.inputs
)

result = invocation_service.invoke(OperationType.DEPLOY, deployment_input.service_template, deployment_input.inputs)
return result, 200


def undeploy():
logger.debug("Entry: undeploy")

result = invocation_service.invoke(
opera_undeploy_storage_proxy, [1],
OperationType.UNDEPLOY, None
)

result = invocation_service.invoke(OperationType.UNDEPLOY, None, None)
return result, 200


Expand Down Expand Up @@ -80,6 +72,6 @@ def validate(body: DeploymentInput = None):
result.success = True
except Exception as e:
result.success = False
result.message = str(e)
result.message = "{}: {}\n\n{}".format(e.__class__.__name__, str(e), traceback.format_exc())

return result, 200

0 comments on commit 9abd221

Please sign in to comment.