Skip to content

Commit

Permalink
[feat][azure] Azure Collector Genesis (#1710)
Browse files Browse the repository at this point in the history
  • Loading branch information
aquamatthias authored Jul 17, 2023
1 parent fb0ffeb commit c276b7d
Show file tree
Hide file tree
Showing 54 changed files with 7,689 additions and 306 deletions.
71 changes: 71 additions & 0 deletions .github/workflows/check_pr_plugin_azure.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
# Note: this workflow is automatically generated via the `create_pr` script in the same folder.
# Please do not change the file, but the script!

name: Check PR (Plugin azure)
on:
  # Runs for pushes to main and for version-like tags, ...
  push:
    tags:
      - "*.*.*"
    branches:
      - main
  # ... and for pull requests touching one of the listed paths.
  pull_request:
    paths:
      - 'resotolib/**'
      - 'plugins/azure/**'
      - '.github/**'
      - 'requirements-all.txt'

jobs:
  azure:
    name: "azure"
    runs-on: ubuntu-latest
    steps:
      - name: Checkout
        uses: actions/checkout@v2

      - name: Setup Python
        uses: actions/setup-python@v2
        with:
          python-version: '3.10'
          architecture: 'x64'

      # The pip cache is keyed on the hash of the plugin's pyproject.toml.
      - name: Restore dependency cache
        uses: actions/cache@v3
        with:
          path: ~/.cache/pip
          key: ${{runner.os}}-pip-${{hashFiles('./plugins/azure/pyproject.toml')}}
          restore-keys: |
            ${{runner.os}}-pip-

      # resotolib is installed in editable mode from the checkout.
      - name: Install Dependencies
        run: |
          python -m pip install --upgrade pip
          pip install --upgrade --editable resotolib/
          pip install tox wheel flake8 build

      - name: Run tests
        working-directory: ./plugins/azure
        run: tox

      - name: Archive code coverage results
        uses: actions/upload-artifact@v2
        with:
          name: plugin-azure-code-coverage-report
          path: ./plugins/azure/htmlcov/

      # The folded scalar (>-) joins the following lines into one command line.
      - name: Build a binary wheel and a source tarball
        working-directory: ./plugins/azure
        run: >-
          python -m
          build
          --sdist
          --wheel
          --outdir dist/

      # Publishing only happens when the workflow was triggered by a tag.
      - name: Publish distribution to PyPI
        if: github.ref_type == 'tag'
        uses: pypa/gh-action-pypi-publish@release/v1
        with:
          user: __token__
          password: ${{ secrets.PYPI_RESOTO_PLUGIN_AZURE }}
          packages_dir: ./plugins/azure/dist/
3 changes: 2 additions & 1 deletion .github/workflows/model_check.yml
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ on:
paths:
- 'resotolib/**'
- 'plugins/aws/**'
- 'plugins/azure/**'
- 'plugins/digitalocean/**'
- 'plugins/example_collector/**'
- 'plugins/gcp/**'
Expand Down Expand Up @@ -48,7 +49,7 @@ jobs:
run: |
python -m pip install --upgrade pip
python -m pip install -r requirements-test.txt
pip install resotolib/ plugins/aws/ plugins/digitalocean/ plugins/dockerhub/ plugins/example_collector/ plugins/gcp/ plugins/github/ plugins/k8s/ plugins/onelogin/ plugins/onprem/ plugins/posthog/ plugins/random/ plugins/scarf/ plugins/slack/ plugins/vsphere/
pip install resotolib/ plugins/aws/ plugins/azure/ plugins/digitalocean/ plugins/dockerhub/ plugins/example_collector/ plugins/gcp/ plugins/github/ plugins/k8s/ plugins/onelogin/ plugins/onprem/ plugins/posthog/ plugins/random/ plugins/scarf/ plugins/slack/ plugins/vsphere/
- name: Run tests
working-directory: ./resotolib
Expand Down
14 changes: 3 additions & 11 deletions plugins/aws/resoto_plugin_aws/collector.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,21 +38,14 @@
sns,
sqs,
)
from resoto_plugin_aws.resource.base import (
AwsAccount,
AwsApiSpec,
AwsRegion,
AwsResource,
ExecutorQueue,
GatherFutures,
GraphBuilder,
)
from resoto_plugin_aws.resource.base import AwsAccount, AwsApiSpec, AwsRegion, AwsResource, GraphBuilder

from resotolib.baseresources import Cloud, EdgeType
from resotolib.core.actions import CoreFeedback
from resotolib.core.progress import ProgressDone, ProgressTree
from resotolib.graph import Graph
from resotolib.proc import set_thread_name
from resotolib.threading import ExecutorQueue, GatherFutures
from resotolib.types import Json
from resotolib.json import value_in_path

Expand Down Expand Up @@ -166,7 +159,7 @@ def collect(self) -> None:
# The shared executor is used to spread work for the whole account.
# Note: only tasks_per_key threads are running max for each region.
tpk = self.config.shared_tasks_per_key([r.id for r in self.regions])
shared_queue = ExecutorQueue(executor, tasks_per_key=tpk, name=self.account.safe_name)
shared_queue = ExecutorQueue(executor, name=self.account.safe_name, tasks_per_key=tpk)

def get_last_run() -> Optional[datetime]:
td = self.task_data
Expand All @@ -180,7 +173,6 @@ def get_last_run() -> Optional[datetime]:
return datetime.fromtimestamp(timestamp, timezone.utc)

last_run = get_last_run()

global_builder = GraphBuilder(
self.graph,
self.cloud,
Expand Down
161 changes: 5 additions & 156 deletions plugins/aws/resoto_plugin_aws/resource/base.py
Original file line number Diff line number Diff line change
@@ -1,17 +1,14 @@
from __future__ import annotations

import concurrent
import logging
from abc import ABC
from collections import defaultdict, deque
from concurrent.futures import Executor, Future
from concurrent.futures import Future
from datetime import datetime, timezone, timedelta
from functools import lru_cache, reduce
from threading import Event, Lock
from typing import Any, Callable, ClassVar, Deque, Dict, Iterator, List, Optional, Tuple, Type, TypeVar
from functools import lru_cache
from typing import Any, Callable, ClassVar, Dict, Iterator, List, Optional, Type, TypeVar
from math import ceil

from attr import evolve, field
from attr import evolve
from attrs import define
from boto3.exceptions import Boto3Error

Expand All @@ -36,6 +33,7 @@
from resotolib.json_bender import Bender, bend
from resotolib.lock import RWLock
from resotolib.proc import set_thread_name
from resotolib.threading import ExecutorQueue
from resotolib.types import Json

log = logging.getLogger("resoto.plugins.aws")
Expand Down Expand Up @@ -320,155 +318,6 @@ class AwsEc2VolumeType(AwsResource, BaseVolumeType):
kind: ClassVar[str] = "aws_ec2_volume_type"


class CancelOnFirstError(Exception):
    """Signals that work was not started because another task of the same group already failed."""


class GatherFutures:
    """
    Combine a list of futures into a single future that completes once all of them are done.

    The combined future always resolves to None: neither results nor exceptions of the
    individual futures are propagated, they have to be read from the futures themselves.
    """

    def __init__(self, futures: List[Future[Any]]) -> None:
        """
        :param futures: the futures to wait for. An empty list yields an already completed future.
        """
        self._futures = futures
        # Guards the countdown: done callbacks may be invoked from different threads.
        self._lock = Lock()
        self._to_wait = len(futures)
        self._when_done: Future[None] = Future()
        if self._to_wait == 0:
            # Nothing to wait for: no callback would ever fire, so resolve right away
            # instead of handing out a future that never completes.
            self._when_done.set_result(None)
        for future in futures:
            future.add_done_callback(self._on_future_done)

    def _on_future_done(self, _: Future[Any]) -> None:
        """Count down by one and resolve the combined future once the last future is done."""
        with self._lock:
            self._to_wait -= 1
            if self._to_wait == 0:
                self._when_done.set_result(None)

    @staticmethod
    def all(futures: List[Future[Any]]) -> Future[None]:
        """
        Return a future that is resolved with None as soon as all given futures are done.

        :param futures: the futures to wait for.
        :return: the combined future. It is already done if `futures` is empty.
        """
        return GatherFutures(futures)._when_done


@define
class ExecutorQueueTask:
    """
    A unit of work: a callable with its arguments, the key it is grouped under
    and the future that receives the outcome of the call.
    """

    key: Any
    fn: Callable[..., T]
    args: Tuple[Any, ...]
    kwargs: Dict[str, Any]
    future: Future[Any]

    def __call__(self) -> T:  # type: ignore
        """
        Invoke the function and mirror the outcome into the future.

        :return: the value returned by the function.
        :raises Exception: whatever the function raises, after it has been set on the future.
        """
        call = self.fn
        try:
            outcome: T = call(*self.args, **self.kwargs)
            self.future.set_result(outcome)
        except Exception as error:
            # Publish the failure to whoever holds the future, then let it propagate.
            self.future.set_exception(error)
            raise
        return outcome


@define
class ExecutorQueue:
    """
    Use an underlying executor to perform work in parallel, but limit the number of tasks per key.
    If fail_on_first_exception_in_group is True, then the first exception in a group
    will not execute any more tasks in the same group.
    """

    executor: Executor
    tasks_per_key: Callable[[str], int]
    name: str
    fail_on_first_exception_in_group: bool = False
    # Lock and event are created per instance via a factory.
    # A plain default (`= Lock()` / `= Event()`) is evaluated once at class definition
    # and would be shared by every queue.
    _tasks_lock: Lock = field(factory=Lock)
    # Queued, not yet started tasks per key.
    _tasks: Dict[str, Deque[ExecutorQueueTask]] = field(factory=lambda: defaultdict(deque))
    # Number of tasks per key that have been handed to the executor and are not finished.
    _in_progress: Dict[str, int] = field(factory=lambda: defaultdict(int))
    # Executor futures of all tasks handed to the executor since the last wait.
    _futures: List[Future[Any]] = field(factory=list)
    # First exception seen per key (only recorded when fail_on_first_exception_in_group is set).
    _exceptions: Dict[Any, Exception] = field(factory=dict)
    # Set whenever a task finishes; used by wait_for_submitted_work to re-check the state.
    _task_finished: Event = field(factory=Event)

    def submit_work(self, key: Any, fn: Callable[..., T], *args: Any, **kwargs: Any) -> Future[T]:
        """
        Enqueue `fn(*args, **kwargs)` under the given key.

        :param key: the group of this task; the number of concurrently running tasks is limited per key.
        :param fn: the function to call.
        :return: a future that receives the result or the exception of the call.
        """
        future = Future[T]()
        task = ExecutorQueueTask(key=key, fn=fn, args=args, kwargs=kwargs, future=future)
        self.__append_work(task)
        return future

    def __append_work(self, task: ExecutorQueueTask) -> None:
        """Put the task into the queue of its key and start it right away if the limit allows."""
        with self._tasks_lock:
            self._tasks[task.key].appendleft(task)
            self.__check_queue(task.key)

    def __check_queue(self, key: Any) -> None:
        """Start the next queued task of the key, if the per-key limit is not reached."""
        # note: this method is not thread safe, it should only be called from within a lock
        in_progress = self._in_progress[key]
        tasks = self._tasks[key]

        if self.fail_on_first_exception_in_group and self._exceptions.get(key) is not None:
            # Fail all tasks in this group
            ex = CancelOnFirstError("Exception happened in another thread. Do not start work.")
            for task in tasks:
                task.future.set_exception(ex)
            # Clear the queue, so we don't execute them
            tasks.clear()

        if in_progress < self.tasks_per_key(key) and tasks:
            # Tasks are appended on the left and taken from the right: first in, first out.
            task = tasks.pop()
            self._in_progress[key] += 1
            self.__perform_task(task)

    def __perform_task(self, task: ExecutorQueueTask) -> Future[T]:
        """Hand the task over to the underlying executor and remember the executor future."""

        def only_start_when_no_error() -> T:
            # in case of exception let's fail fast and do not execute the function
            if self._exceptions.get(task.key) is None:
                try:
                    return task()
                except Exception as e:
                    # only store the first exception if we should fail on first future
                    if self._exceptions.get(task.key) is None:
                        self._exceptions[task.key] = e
                    raise
            else:
                raise CancelOnFirstError(
                    "Exception happened in another thread. Do not start work."
                ) from self._exceptions[task.key]

        def execute() -> T:
            try:
                return only_start_when_no_error() if self.fail_on_first_exception_in_group else task()
            finally:
                # Whatever the outcome: free the slot, signal waiters and start the next task of this key.
                with self._tasks_lock:
                    self._in_progress[task.key] -= 1
                    self._task_finished.set()
                    self.__check_queue(task.key)

        future = self.executor.submit(execute)

        self._futures.append(future)
        return future

    def wait_for_submitted_work(self) -> None:
        """
        Block until all submitted work is complete.

        CancelOnFirstError results are ignored.

        :raises Exception: the first other exception found in a completed task; it is logged before.
        """
        # wait until all futures are complete
        to_wait = []

        # step 1: wait until all tasks are committed to the executor
        while True:
            with self._tasks_lock:
                ip = sum(self._in_progress.values())
                if ip == 0:
                    to_wait = self._futures
                    self._futures = []
                    break
                else:
                    # safe inside the lock. clear this event and check when next task is done
                    self._task_finished.clear()
            self._task_finished.wait()

        # step 2: wait for all tasks to complete
        for future in concurrent.futures.as_completed(to_wait):
            try:
                future.result()
            except CancelOnFirstError:
                pass
            except Exception as ex:
                log.exception(f"Unhandled exception in {self.name}: {ex}")
                raise


class GraphBuilder:
def __init__(
self,
Expand Down
5 changes: 3 additions & 2 deletions plugins/aws/test/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,11 @@
from resoto_plugin_aws.collector import AwsAccountCollector
from resoto_plugin_aws.configuration import AwsConfig
from resoto_plugin_aws.aws_client import AwsClient
from resoto_plugin_aws.resource.base import AwsAccount, AwsRegion, GraphBuilder, ExecutorQueue
from resoto_plugin_aws.resource.base import AwsAccount, AwsRegion, GraphBuilder
from resotolib.baseresources import Cloud
from resotolib.core.actions import CoreFeedback
from resotolib.graph import Graph
from resotolib.threading import ExecutorQueue
from test.resources import BotoFileBasedSession


Expand All @@ -29,7 +30,7 @@ def aws_client(aws_config: AwsConfig) -> AwsClient:
@fixture
def builder(aws_client: AwsClient, no_feedback: CoreFeedback) -> Iterator[GraphBuilder]:
with ThreadPoolExecutor(1) as executor:
queue = ExecutorQueue(executor, lambda _: 1, "dummy")
queue = ExecutorQueue(executor, "dummy", lambda _: 1)
yield GraphBuilder(
Graph(), Cloud(id="aws"), AwsAccount(id="test"), AwsRegion(id="us-east-1"), aws_client, queue, no_feedback
)
Expand Down
2 changes: 1 addition & 1 deletion plugins/aws/test/resources/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,11 @@
AwsRegion,
AwsResource,
AwsApiSpec,
ExecutorQueue,
)
from resotolib.baseresources import Cloud
from resotolib.core.actions import CoreFeedback
from resotolib.graph import Graph
from resotolib.threading import ExecutorQueue


class BotoDummyStsClient:
Expand Down
5 changes: 3 additions & 2 deletions plugins/aws/test/resources/base_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,9 @@

from more_itertools import flatten

from resoto_plugin_aws.resource.base import ExecutorQueue, AwsRegion, GraphBuilder, GatherFutures
from resoto_plugin_aws.resource.base import AwsRegion, GraphBuilder
from resoto_plugin_aws.resource.ec2 import AwsEc2InstanceType
from resotolib.threading import ExecutorQueue, GatherFutures
from test import account_collector, builder, aws_client, aws_config, no_feedback # noqa: F401


Expand All @@ -30,7 +31,7 @@ def do_work(num: int) -> None:

with ThreadPoolExecutor(max_workers=workers) as executor:
queue = ExecutorQueue(
executor, lambda _: tasks_per_key, "test", fail_on_first_exception_in_group=fail_on_first_exception
executor, "test", lambda _: tasks_per_key, fail_on_first_exception_in_group=fail_on_first_exception
)
for key, idx in work:
queue.submit_work(key, do_work, idx)
Expand Down
4 changes: 2 additions & 2 deletions plugins/aws/test/resources/cloudtrail_test.py
Original file line number Diff line number Diff line change
@@ -1,17 +1,17 @@
from concurrent.futures import ThreadPoolExecutor

from resoto_plugin_aws.resource.base import ExecutorQueue
from resoto_plugin_aws.resource.cloudtrail import AwsCloudTrail
from resoto_plugin_aws.resource.kms import AwsKmsKey
from resoto_plugin_aws.resource.s3 import AwsS3Bucket
from resoto_plugin_aws.resource.sns import AwsSnsTopic
from resotolib.threading import ExecutorQueue
from test.resources import round_trip_for


def test_trails() -> None:
first, builder = round_trip_for(AwsCloudTrail, region_name="us-east-1")
with ThreadPoolExecutor(1) as executor:
builder.executor = ExecutorQueue(executor, lambda _: 1, "dummy")
builder.executor = ExecutorQueue(executor, "dummy")
AwsS3Bucket.collect_resources(builder)
AwsKmsKey.collect_resources(builder)
AwsSnsTopic.collect_resources(builder)
Expand Down
Loading

0 comments on commit c276b7d

Please sign in to comment.