From e92a1d5aaf8ae66a0ba866bec9627a40613a4ba4 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Matthias=20D=C3=B6tsch?= Date: Thu, 2 Oct 2025 14:37:50 +0200 Subject: [PATCH 1/2] parse_query: strip query --- adminapi/CLAUDE.md | 156 +++++++++++++++++++++++++++ adminapi/parse.py | 4 +- adminapi/tests/test_parse.py | 197 +++++++++++++++++++++++++++++++++++ 3 files changed, 355 insertions(+), 2 deletions(-) create mode 100644 adminapi/CLAUDE.md create mode 100644 adminapi/tests/test_parse.py diff --git a/adminapi/CLAUDE.md b/adminapi/CLAUDE.md new file mode 100644 index 00000000..26cb395e --- /dev/null +++ b/adminapi/CLAUDE.md @@ -0,0 +1,156 @@ +## Overview + +`adminapi` is a Python library for interacting with Serveradmin, a system for querying and managing server attributes. It provides both a Python API and a CLI tool for server management operations. + +Documentation: https://serveradmin.readthedocs.io/en/latest/python-api.html + +## Development Commands + +### Setup +```bash +# Install from parent directory, like +cd ~/projects/serveradmin +pip install -e . +``` + +### Testing +```bash +# Run all tests +python -m unittest discover adminapi/tests + +# Run specific test file +python -m unittest adminapi.tests.test_cli +python -m unittest adminapi.tests.test_dataset + +# Run single test +python -m unittest adminapi.tests.test_cli.TestCommandlineInterface.test_one_argument +``` + +### CLI Usage +```bash +# Query servers +adminapi 'hostname=web01' +adminapi 'project=adminapi' --attr hostname --attr state + +# Update servers +adminapi 'hostname=web01' --update state=maintenance +``` + +### Python API Examples + +**Query and modify servers** +```python +from adminapi.dataset import Query + +# Query servers matching criteria +servers = Query({ + 'servertype': 'vm', + 'project': 'test_project', + 'state': 'offline', +}) + +# Modify server attributes +for server in servers: + print(f"Bringing {server['hostname']} online") + server['state'] = 'online' + +# Commit changes to persist them +servers.commit() +``` + +**Query with restricted attributes** +```python +from adminapi.dataset import Query + +# Only fetch specific attributes to reduce payload +servers = Query( + {'project': 'test_project', 'state': 'maintenance'}, + restrict=['hostname', 'state'] +) + +for server in servers: + print(f"{server['hostname']}: {server['state']}") +``` + +**Get single server** +```python +from adminapi.dataset import Query + +# Use get() when expecting exactly one result +server = Query({'hostname': 'web01'}).get() +server['state'] = 'maintenance' +server.commit() +``` + +## Architecture + +### Core Components + +**Query System** (`dataset.py`) +- `Query`: Main class for filtering and fetching server objects from Serveradmin +- `BaseQuery`: Base class with common query operations (filtering, ordering, committing) +- Queries are lazy-loaded and cached until explicitly fetched +- Always ensures `object_id` is included in restrict lists for correlation during commits + +**Data Objects** (`dataset.py`) +- `DatasetObject`: Dict-like wrapper for server data with change tracking +- `MultiAttr`: Set-like wrapper for multi-value attributes with automatic change propagation +- Track state: `created`, `changed`, `deleted`, or `consistent` +- Old values stored in `old_values` dict for rollback/commit operations + +**Filters** (`filters.py`) +- `BaseFilter`: Exact match filter (default) +- Comparison: `GreaterThan`, `LessThan`, `GreaterThanOrEquals`, `LessThanOrEquals` +- Pattern: `Regexp`, `StartsWith`, `Contains`, `ContainedBy`, `Overlaps` +- Logical: `Any` (OR), `All` (AND), `Not` +- Special: `Empty` (null check), `ContainedOnlyBy` (IP network containment) + +**Request Layer** (`request.py`) +- Handles HTTP communication with Serveradmin server +- Authentication via SSH keys (paramiko), SSH agent, or auth tokens (HMAC-SHA1) +- Automatic retry logic (3 attempts, 5s interval) for network failures +- Gzip compression support for responses +- Environment variables: `SERVERADMIN_BASE_URL`, `SERVERADMIN_KEY_PATH`, `SERVERADMIN_TOKEN` + +**Query Parser** (`parse.py`) +- Converts string queries to filter dictionaries +- Supports function syntax: `attribute=Function(value)` +- Regex detection: triggers `Regexp` filter for patterns with `.*`, `.+`, `[`, `]`, etc. +- Hostname shorthand: first token without `=` treated as hostname filter + +**API Calls** (`api.py`) +- `FunctionGroup`: Dynamic proxy for calling Serveradmin API functions +- Pattern: `FunctionGroup('group_name').function_name(*args, **kwargs)` +- Example: `api.get('nagios').commit('push', 'user', project='foo')` + +### Data Flow + +1. **Query Construction**: Create `Query` with filters dict, restrict list, order_by +2. **Lazy Fetch**: Results fetched on first iteration/access via `_get_results()` +3. **Modification**: Change `DatasetObject` attributes, tracked in `old_values` +4. **Commit**: Build commit object with created/changed/deleted arrays, send to server +5. **Confirm**: Clear `old_values`, update object state after successful commit + +### Change Tracking + +- **Single attributes**: Save original value on first modification to `old_values` +- **Multi attributes**: `MultiAttr` operations create new sets, trigger parent `__setitem__` +- **Validation**: Type checking against existing attribute types (bool, multi, datatype) +- **Serialization**: Changed objects serialize with action (`update` or `multi`) and old/new values + +### Authentication Flow + +1. Check for `SERVERADMIN_KEY_PATH` → load private key file +2. Else check for `SERVERADMIN_TOKEN` → use HMAC authentication +3. Else try SSH agent (`paramiko.agent.Agent`) +4. Sign timestamp + request body, include signature in `X-Signatures` header +5. Server validates signature using stored public key + +## Important Patterns + +- Always use `commit()` after modifying objects to persist changes +- Use `restrict` parameter to limit fetched attributes (reduces payload size) +- `get()` expects exactly one result; use for single-server queries +- Multi-value attributes must be modified via `MultiAttr` methods or reassignment +- Query filters are immutable; changes create new filter instances +- The API is marked as draft and may change in future versions diff --git a/adminapi/parse.py b/adminapi/parse.py index 247e5bed..6b552091 100644 --- a/adminapi/parse.py +++ b/adminapi/parse.py @@ -10,8 +10,8 @@ def parse_query(term, hostname=None): # NOQA: C901 - # Ignore newlines to allow queries across multiple lines - term = term.replace('\n', '') + # Replace newlines with spaces to allow queries across multiple lines + term = term.replace('\n', ' ').strip() parsed_args = parse_function_string(term, strict=True) if not parsed_args: diff --git a/adminapi/tests/test_parse.py b/adminapi/tests/test_parse.py new file mode 100644 index 00000000..801244c5 --- /dev/null +++ b/adminapi/tests/test_parse.py @@ -0,0 +1,197 @@ +import unittest + +from adminapi.datatype import DatatypeError +from adminapi.filters import ( + BaseFilter, + Regexp, + Any, + GreaterThan, +) +from adminapi.parse import parse_query, parse_function_string + + +def assert_filters_equal(test_case, result, expected): + """Compare filter dictionaries by their repr, which includes structure and values.""" + test_case.assertEqual(sorted(result.keys()), sorted(expected.keys())) + for key in expected: + test_case.assertEqual(repr(result[key]), repr(expected[key])) + + +class TestParseQuery(unittest.TestCase): + def test_simple_attribute(self): + result = parse_query("hostname=web01") + expected = {"hostname": BaseFilter("web01")} + assert_filters_equal(self, result, expected) + + def test_whitespace_handling(self): + result = parse_query(" hostname=test ") + expected = {"hostname": BaseFilter("test")} + assert_filters_equal(self, result, expected) + + def test_multiple_attributes(self): + result = parse_query("hostname=web01 state=online") + expected = { + "hostname": BaseFilter("web01"), + "state": BaseFilter("online"), + } + assert_filters_equal(self, result, expected) + + def test_hostname_shorthand(self): + result = parse_query("web01 state=online") + expected = { + "hostname": BaseFilter("web01"), + "state": BaseFilter("online"), + } + assert_filters_equal(self, result, expected) + + def test_hostname_shorthand_with_regexp(self): + # Hostname shortcuts automatically detect regex patterns + result = parse_query("web.*") + expected = {"hostname": Regexp("web.*")} + assert_filters_equal(self, result, expected) + + def test_regexp_pattern_as_literal(self): + # Regex patterns in attribute values are treated as literals + # Use Regexp() function for actual regex filtering + result = parse_query("hostname=web.*") + expected = {"hostname": BaseFilter("web.*")} + assert_filters_equal(self, result, expected) + + def test_explicit_regexp_function(self): + # Use explicit Regexp() function for regex filtering + result = parse_query("hostname=Regexp(web.*)") + expected = {"hostname": Regexp("web.*")} + assert_filters_equal(self, result, expected) + + def test_function_filter(self): + result = parse_query("num_cores=GreaterThan(4)") + expected = {"num_cores": GreaterThan(4)} + assert_filters_equal(self, result, expected) + + def test_function_with_multiple_args(self): + result = parse_query("hostname=Any(web01 web02)") + expected = {"hostname": Any("web01", "web02")} + assert_filters_equal(self, result, expected) + + def test_empty_query(self): + result = parse_query("") + self.assertEqual(result, {}) + + def test_whitespace_only_query(self): + result = parse_query(" ") + self.assertEqual(result, {}) + + def test_newline_in_query(self): + result = parse_query("hostname=web01\nstate=online") + expected = { + "hostname": BaseFilter("web01"), + "state": BaseFilter("online"), + } + assert_filters_equal(self, result, expected) + + def test_any_filter_with_duplicate_hostname(self): + # Hostname shorthand triggers regex, but explicit attribute assignment doesn't + # Order: explicit assignment value comes first, then shorthand value + result = parse_query("web.* hostname=db.*") + expected = {"hostname": Any(BaseFilter("db.*"), Regexp("web.*"))} + assert_filters_equal(self, result, expected) + + def test_invalid_function(self): + with self.assertRaisesRegex(DatatypeError, r"Invalid function InvalidFunc"): + parse_query("hostname=InvalidFunc(test)") + + + def test_top_level_literal_error(self): + with self.assertRaisesRegex( + DatatypeError, r"Invalid term: Top level literals are not allowed" + ): + parse_query("hostname=test value") + + def test_top_level_function_as_hostname(self): + # Function syntax without key is treated as hostname shorthand + result = parse_query("GreaterThan(4)") + expected = {"hostname": BaseFilter("GreaterThan(4)")} + assert_filters_equal(self, result, expected) + + def test_garbled_hostname_error(self): + with self.assertRaisesRegex(DatatypeError, r"Garbled hostname: db01"): + parse_query("web01", hostname="db01") + + +class TestParseFunctionString(unittest.TestCase): + def test_simple_key_value(self): + result = parse_function_string("hostname=web01") + self.assertEqual(result, [("key", "hostname"), ("literal", "web01")]) + + def test_quoted_string(self): + result = parse_function_string('hostname="web 01"') + self.assertEqual(result, [("key", "hostname"), ("literal", "web 01")]) + + def test_single_quoted_string(self): + result = parse_function_string("hostname='web 01'") + self.assertEqual(result, [("key", "hostname"), ("literal", "web 01")]) + + def test_escaped_quote(self): + result = parse_function_string('hostname="web\\"01"') + # Note: string_buf stores the actual chars, but the result is the original slice + self.assertEqual(result[1][0], "literal") + + def test_function_call(self): + result = parse_function_string("num_cores=GreaterThan(4)") + expected = [ + ("key", "num_cores"), + ("func", "GreaterThan"), + ("literal", 4), + ("endfunc", ""), + ] + self.assertEqual(result, expected) + + def test_nested_function(self): + result = parse_function_string("attr=Func1(Func2(value))") + self.assertEqual(result[0], ("key", "attr")) + self.assertEqual(result[1], ("func", "Func1")) + self.assertEqual(result[2], ("func", "Func2")) + + def test_multiple_values(self): + result = parse_function_string("host1 host2 host3") + expected = [ + ("literal", "host1"), + ("literal", "host2"), + ("literal", "host3"), + ] + self.assertEqual(result, expected) + + def test_datatype_conversion(self): + result = parse_function_string("count=42") + self.assertEqual(result, [("key", "count"), ("literal", 42)]) + + def test_unterminated_string(self): + with self.assertRaisesRegex(DatatypeError, r"Unterminated string"): + parse_function_string('hostname="web01', strict=True) + + def test_invalid_escape(self): + with self.assertRaisesRegex(DatatypeError, r"Invalid escape"): + parse_function_string('hostname="web\\01"', strict=True) + + + def test_empty_string(self): + result = parse_function_string("") + self.assertEqual(result, []) + + def test_whitespace_only(self): + result = parse_function_string(" ") + self.assertEqual(result, []) + + def test_parentheses_handling(self): + result = parse_function_string("func(a b)") + expected = [ + ("func", "func"), + ("literal", "a"), + ("literal", "b"), + ("endfunc", ""), + ] + self.assertEqual(result, expected) + + +if __name__ == "__main__": + unittest.main() From 638dba37d7b28cd5ddb780b1518b9331bc5c0c1c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Matthias=20D=C3=B6tsch?= Date: Wed, 5 Nov 2025 08:32:25 +0100 Subject: [PATCH 2/2] parse_query: cleanup tests --- adminapi/CLAUDE.md | 156 ----------------------------------- adminapi/tests/test_parse.py | 12 +-- 2 files changed, 1 insertion(+), 167 deletions(-) delete mode 100644 adminapi/CLAUDE.md diff --git a/adminapi/CLAUDE.md b/adminapi/CLAUDE.md deleted file mode 100644 index 26cb395e..00000000 --- a/adminapi/CLAUDE.md +++ /dev/null @@ -1,156 +0,0 @@ -## Overview - -`adminapi` is a Python library for interacting with Serveradmin, a system for querying and managing server attributes. It provides both a Python API and a CLI tool for server management operations. - -Documentation: https://serveradmin.readthedocs.io/en/latest/python-api.html - -## Development Commands - -### Setup -```bash -# Install from parent directory, like -cd ~/projects/serveradmin -pip install -e . -``` - -### Testing -```bash -# Run all tests -python -m unittest discover adminapi/tests - -# Run specific test file -python -m unittest adminapi.tests.test_cli -python -m unittest adminapi.tests.test_dataset - -# Run single test -python -m unittest adminapi.tests.test_cli.TestCommandlineInterface.test_one_argument -``` - -### CLI Usage -```bash -# Query servers -adminapi 'hostname=web01' -adminapi 'project=adminapi' --attr hostname --attr state - -# Update servers -adminapi 'hostname=web01' --update state=maintenance -``` - -### Python API Examples - -**Query and modify servers** -```python -from adminapi.dataset import Query - -# Query servers matching criteria -servers = Query({ - 'servertype': 'vm', - 'project': 'test_project', - 'state': 'offline', -}) - -# Modify server attributes -for server in servers: - print(f"Bringing {server['hostname']} online") - server['state'] = 'online' - -# Commit changes to persist them -servers.commit() -``` - -**Query with restricted attributes** -```python -from adminapi.dataset import Query - -# Only fetch specific attributes to reduce payload -servers = Query( - {'project': 'test_project', 'state': 'maintenance'}, - restrict=['hostname', 'state'] -) - -for server in servers: - print(f"{server['hostname']}: {server['state']}") -``` - -**Get single server** -```python -from adminapi.dataset import Query - -# Use get() when expecting exactly one result -server = Query({'hostname': 'web01'}).get() -server['state'] = 'maintenance' -server.commit() -``` - -## Architecture - -### Core Components - -**Query System** (`dataset.py`) -- `Query`: Main class for filtering and fetching server objects from Serveradmin -- `BaseQuery`: Base class with common query operations (filtering, ordering, committing) -- Queries are lazy-loaded and cached until explicitly fetched -- Always ensures `object_id` is included in restrict lists for correlation during commits - -**Data Objects** (`dataset.py`) -- `DatasetObject`: Dict-like wrapper for server data with change tracking -- `MultiAttr`: Set-like wrapper for multi-value attributes with automatic change propagation -- Track state: `created`, `changed`, `deleted`, or `consistent` -- Old values stored in `old_values` dict for rollback/commit operations - -**Filters** (`filters.py`) -- `BaseFilter`: Exact match filter (default) -- Comparison: `GreaterThan`, `LessThan`, `GreaterThanOrEquals`, `LessThanOrEquals` -- Pattern: `Regexp`, `StartsWith`, `Contains`, `ContainedBy`, `Overlaps` -- Logical: `Any` (OR), `All` (AND), `Not` -- Special: `Empty` (null check), `ContainedOnlyBy` (IP network containment) - -**Request Layer** (`request.py`) -- Handles HTTP communication with Serveradmin server -- Authentication via SSH keys (paramiko), SSH agent, or auth tokens (HMAC-SHA1) -- Automatic retry logic (3 attempts, 5s interval) for network failures -- Gzip compression support for responses -- Environment variables: `SERVERADMIN_BASE_URL`, `SERVERADMIN_KEY_PATH`, `SERVERADMIN_TOKEN` - -**Query Parser** (`parse.py`) -- Converts string queries to filter dictionaries -- Supports function syntax: `attribute=Function(value)` -- Regex detection: triggers `Regexp` filter for patterns with `.*`, `.+`, `[`, `]`, etc. -- Hostname shorthand: first token without `=` treated as hostname filter - -**API Calls** (`api.py`) -- `FunctionGroup`: Dynamic proxy for calling Serveradmin API functions -- Pattern: `FunctionGroup('group_name').function_name(*args, **kwargs)` -- Example: `api.get('nagios').commit('push', 'user', project='foo')` - -### Data Flow - -1. **Query Construction**: Create `Query` with filters dict, restrict list, order_by -2. **Lazy Fetch**: Results fetched on first iteration/access via `_get_results()` -3. **Modification**: Change `DatasetObject` attributes, tracked in `old_values` -4. **Commit**: Build commit object with created/changed/deleted arrays, send to server -5. **Confirm**: Clear `old_values`, update object state after successful commit - -### Change Tracking - -- **Single attributes**: Save original value on first modification to `old_values` -- **Multi attributes**: `MultiAttr` operations create new sets, trigger parent `__setitem__` -- **Validation**: Type checking against existing attribute types (bool, multi, datatype) -- **Serialization**: Changed objects serialize with action (`update` or `multi`) and old/new values - -### Authentication Flow - -1. Check for `SERVERADMIN_KEY_PATH` → load private key file -2. Else check for `SERVERADMIN_TOKEN` → use HMAC authentication -3. Else try SSH agent (`paramiko.agent.Agent`) -4. Sign timestamp + request body, include signature in `X-Signatures` header -5. Server validates signature using stored public key - -## Important Patterns - -- Always use `commit()` after modifying objects to persist changes -- Use `restrict` parameter to limit fetched attributes (reduces payload size) -- `get()` expects exactly one result; use for single-server queries -- Multi-value attributes must be modified via `MultiAttr` methods or reassignment -- Query filters are immutable; changes create new filter instances -- The API is marked as draft and may change in future versions diff --git a/adminapi/tests/test_parse.py b/adminapi/tests/test_parse.py index 801244c5..aa7e5e52 100644 --- a/adminapi/tests/test_parse.py +++ b/adminapi/tests/test_parse.py @@ -91,7 +91,6 @@ def test_newline_in_query(self): def test_any_filter_with_duplicate_hostname(self): # Hostname shorthand triggers regex, but explicit attribute assignment doesn't - # Order: explicit assignment value comes first, then shorthand value result = parse_query("web.* hostname=db.*") expected = {"hostname": Any(BaseFilter("db.*"), Regexp("web.*"))} assert_filters_equal(self, result, expected) @@ -100,7 +99,6 @@ def test_invalid_function(self): with self.assertRaisesRegex(DatatypeError, r"Invalid function InvalidFunc"): parse_query("hostname=InvalidFunc(test)") - def test_top_level_literal_error(self): with self.assertRaisesRegex( DatatypeError, r"Invalid term: Top level literals are not allowed" @@ -127,14 +125,11 @@ def test_quoted_string(self): result = parse_function_string('hostname="web 01"') self.assertEqual(result, [("key", "hostname"), ("literal", "web 01")]) - def test_single_quoted_string(self): result = parse_function_string("hostname='web 01'") self.assertEqual(result, [("key", "hostname"), ("literal", "web 01")]) - def test_escaped_quote(self): result = parse_function_string('hostname="web\\"01"') - # Note: string_buf stores the actual chars, but the result is the original slice - self.assertEqual(result[1][0], "literal") + self.assertEqual(result[1], ("literal", 'web\\"01')) def test_function_call(self): result = parse_function_string("num_cores=GreaterThan(4)") @@ -173,7 +168,6 @@ def test_invalid_escape(self): with self.assertRaisesRegex(DatatypeError, r"Invalid escape"): parse_function_string('hostname="web\\01"', strict=True) - def test_empty_string(self): result = parse_function_string("") self.assertEqual(result, []) @@ -191,7 +185,3 @@ def test_parentheses_handling(self): ("endfunc", ""), ] self.assertEqual(result, expected) - - -if __name__ == "__main__": - unittest.main()