generated from ansible-community/project-template
-
Notifications
You must be signed in to change notification settings - Fork 3
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
1 parent
b7d625c
commit e411283
Showing
2 changed files
with
121 additions
and
2 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,106 @@ | ||
from importlib.metadata import EntryPoint, entry_points | ||
import inspect | ||
|
||
from awx_plugins.interfaces._temporary_private_api import ManagedCredentialType, CredentialPlugin | ||
from awx_plugins.interfaces._temporary_private_licensing_api import ( | ||
detect_server_product_name, | ||
) | ||
|
||
|
||
def detect_server_product_name(): | ||
return 'NO_AWX' | ||
|
||
|
||
class BasePluginRegistry: | ||
def _get_all_entry_points_for(self, | ||
entry_point_subsections: list[str], /) -> dict[str, EntryPoint]: | ||
return { | ||
ep.name: ep | ||
for entry_point_category in entry_point_subsections | ||
for ep in entry_points(group=f'awx_plugins.{entry_point_category}') | ||
} | ||
|
||
|
||
class BaseCredentialTypeRegistry(BasePluginRegistry): | ||
"""Load and track ManagedCredentialType plugins.""" | ||
_registry: dict[str, ManagedCredentialType] = {} | ||
|
||
def add(self, credential_type: ManagedCredentialType) -> None: | ||
"""Add the credential type to the registry | ||
:param credential_type: ManagedCredentialType to add to the registery | ||
""" | ||
namespace = credential_type.namespace | ||
if namespace in self._registry: | ||
raise ValueError('a ManagedCredentialType with namespace={} is already defined in {}'.format( | ||
namespace, inspect.getsourcefile(self._registry[namespace].__class__) | ||
)) | ||
self._registry[namespace] = credential_type | ||
|
||
def get(self, namespace: str) -> ManagedCredentialType: | ||
"""Access ManagedCredentialType by namespace. | ||
:param namespace: The namespace name of plugin. | ||
:returns: The ManagedCredentialType object or None if not found | ||
""" | ||
return self._registry.get(namespace, None) | ||
|
||
def get_keys(self) -> list[str]: | ||
"""All loaded plugin names. | ||
:returns: A list of plugin names. | ||
""" | ||
return self._registry.keys() | ||
|
||
def get_all(self) -> list[ManagedCredentialType]: | ||
"""Access all loaded credential types. | ||
:returns: All loaded credential types | ||
""" | ||
return self._registry.values() | ||
|
||
def _discover_all(self) -> list[EntryPoint]: | ||
"""Find all entry points to be loaded.""" | ||
raise ValueError("Implement me") | ||
|
||
def load_all(self): | ||
"""Load all the discovered plugins""" | ||
for entry_point_name, entry_point in self._discover_all().items(): | ||
credential_type = entry_point.load() | ||
self.add(credential_type) | ||
|
||
|
||
class _ManagedCredentialTypeRegistry(BaseCredentialTypeRegistry): | ||
def _discover_all(self) -> list[EntryPoint]: | ||
"""Find all relevant awx managed credential entry points.""" | ||
is_awx = detect_server_product_name() == 'AWX' | ||
return self._get_all_entry_points_for(['managed_credentials'] if is_awx else ['managed_credentials', 'managed_credentials.supported']) | ||
|
||
|
||
class _CredentialPluginRegistry(BaseCredentialTypeRegistry): | ||
"""Load and manage lookup credential plugin types""" | ||
_registry: dict[str, CredentialPlugin] = {} | ||
|
||
def get_all(self) -> list[CredentialPlugin]: | ||
"""Access all loaded credential plugins. | ||
:returns: All loaded credential plugins | ||
""" | ||
return self._registry.values() | ||
|
||
def _discover_all(self) -> list[EntryPoint]: | ||
"""Find all user managed credential entry points.""" | ||
return self._get_all_entry_points_for(['credentials',]) | ||
|
||
def load_all(self): | ||
"""Load all the user discovered plugins""" | ||
for entry_point_name, entry_point in self._discover_all().items(): | ||
cred_plugin = entry_point.load() | ||
self._registry[entry_point_name] = CredentialPlugin( | ||
name=cred_plugin.name, | ||
inputs=cred_plugin.inputs, | ||
backend=cred_plugin.backend | ||
) | ||
|
||
ManagedCredentialTypeRegistry = _ManagedCredentialTypeRegistry() | ||
CredentialPluginRegistry = _CredentialPluginRegistry() |