-
Notifications
You must be signed in to change notification settings - Fork 133
test: hookcmds integration tests for secret access and port management #2562
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
9587b36
56dddee
3500a65
cc34d9b
67f62e8
9ee0489
efe9466
b07fd7c
60b9c2a
5999260
40a1d6d
8b9212b
0720fe3
ff01007
af625b6
54d5010
c852e25
961d9d1
e64797b
a9ae994
1cfc0b6
bb35285
d5b4512
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,3 @@ | ||
| uv.lock | ||
| *.tar.gz | ||
| *.charm | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -23,11 +23,18 @@ | |
| from __future__ import annotations | ||
|
|
||
| import json | ||
| import os | ||
| import pathlib | ||
| import time | ||
| from typing import Any | ||
|
|
||
| import ops | ||
| import ops.hookcmds as hookcmds | ||
|
|
||
| _REBOOT_MARKER = ( | ||
| pathlib.Path(os.environ.get('JUJU_CHARM_DIR', '.')) / '.test-hookcmds.reboot-triggered' | ||
| ) | ||
|
|
||
|
|
||
| class TestHookcmdsCharm(ops.CharmBase): | ||
| def __init__(self, framework: ops.Framework): | ||
|
|
@@ -53,14 +60,19 @@ def __init__(self, framework: ops.Framework): | |
| self.on['trigger-relation-error'].action, self._on_trigger_relation_error | ||
| ) | ||
| framework.observe(self.on['test-credential-get'].action, self._on_test_credential_get) | ||
| framework.observe(self.on['test-juju-reboot'].action, self._on_test_juju_reboot) | ||
| framework.observe(self.on.config_changed, self._on_config_changed) | ||
| framework.observe(self.on['test-reboot-marker'].action, self._on_test_reboot_marker) | ||
| framework.observe( | ||
| self.on['test-relation-model-get'].action, self._on_test_relation_model_get | ||
| ) | ||
| framework.observe(self.on['test-resource-get'].action, self._on_test_resource_get) | ||
| framework.observe(self.on['test-secret-grant'].action, self._on_test_secret_grant) | ||
| framework.observe(self.on['test-secret-revoke'].action, self._on_test_secret_revoke) | ||
| framework.observe(self.on['test-storage-add'].action, self._on_test_storage_add) | ||
| framework.observe( | ||
| self.on['test-ports-endpoint-scoped'].action, | ||
| self._on_test_ports_endpoint_scoped, | ||
| ) | ||
|
|
||
| # Lifecycle | ||
|
|
||
|
|
@@ -343,23 +355,49 @@ def _on_test_credential_get(self, event: ops.ActionEvent): | |
| cloud = hookcmds.credential_get() | ||
| event.set_results({'cloud-type': cloud.type, 'cloud-name': cloud.name}) | ||
|
|
||
| # Reboot | ||
| # Reboot — driven by config-changed because juju forbids juju-reboot | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
That's interesting. Could you point me to the place where I can read more about this Juju behavior?
Collaborator
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. There's our docs and the juju docs, probably not much else.
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Ah great. Thank you ! |
||
| # from an action context. | ||
|
|
||
| def _on_test_juju_reboot(self, event: ops.ActionEvent): | ||
| """Queue a machine reboot via juju_reboot(now=False).""" | ||
| def _on_config_changed(self, event: ops.ConfigChangedEvent): | ||
| if self.config.get('reboot-trigger') != 'reboot-please': | ||
| return | ||
| if _REBOOT_MARKER.exists(): | ||
| # Already triggered this deployment; don't reboot again. | ||
| return | ||
| _REBOOT_MARKER.write_text('triggered') | ||
| hookcmds.juju_reboot(now=False) | ||
|
|
||
| def _on_test_reboot_marker(self, event: ops.ActionEvent): | ||
| # Compute the boot epoch as `now - uptime`. /proc/stat's `btime` is | ||
|
tromai marked this conversation as resolved.
|
||
| # the host kernel boot time and doesn't change in LXD containers, so | ||
| # we can't use it directly; /proc/uptime, however, is virtualised | ||
| # per-container by lxcfs and resets on juju-reboot. Comparing the | ||
| # derived boot epoch before/after proves the reboot took effect (the | ||
| # marker alone only proves config-changed ran the path). | ||
| with open('/proc/uptime') as f: | ||
| uptime_seconds = float(f.read().split()[0]) | ||
| boot_time = int(time.time() - uptime_seconds) | ||
| event.set_results({ | ||
| 'marker-exists': str(_REBOOT_MARKER.exists()).lower(), | ||
| 'boot-time': str(boot_time), | ||
| }) | ||
|
|
||
| # Relation model | ||
|
|
||
| def _on_test_relation_model_get(self, event: ops.ActionEvent): | ||
| """Return the model UUID for the peer relation.""" | ||
| """Return the model UUID for the peer relation, plus JUJU_MODEL_UUID.""" | ||
| ids = hookcmds.relation_ids('peer') | ||
| if not ids: | ||
| event.fail('No peer relation IDs - deploy 2+ units') | ||
| return | ||
| rel_id = int(ids[0].split(':')[-1]) | ||
| model = hookcmds.relation_model_get(id=rel_id, endpoint='peer') | ||
| event.set_results({'uuid': model.uuid}) | ||
| event.set_results({ | ||
| 'uuid': model.uuid, | ||
| # Emit the env-side model UUID alongside so the test can verify | ||
| # the two match without doing its own juju show-model dance. | ||
| 'env-model-uuid': os.environ.get('JUJU_MODEL_UUID', ''), | ||
| }) | ||
|
|
||
| # Resource | ||
|
|
||
|
|
@@ -368,28 +406,42 @@ def _on_test_resource_get(self, event: ops.ActionEvent): | |
| path = hookcmds.resource_get('test-file') | ||
| event.set_results({'path': str(path)}) | ||
|
|
||
| # Secret grant / revoke | ||
| # Cross-app secret grant / revoke | ||
|
|
||
| def _on_test_secret_grant(self, event: ops.ActionEvent): | ||
| """Create a secret and grant it to the peer relation.""" | ||
| ids = hookcmds.relation_ids('peer') | ||
| """Add a secret and grant it to the related any-charm app.""" | ||
| ids = hookcmds.relation_ids('anycharm') | ||
| if not ids: | ||
| event.fail('No peer relation - deploy 2+ units') | ||
| event.fail('No anycharm relation IDs - is any-charm deployed and integrated?') | ||
| return | ||
| rel_id = int(ids[0].split(':')[-1]) | ||
| secret_id = hookcmds.secret_add({'key': 'grant-test-value'}, label='grant-test') | ||
|
|
||
| secret_id = hookcmds.secret_add( | ||
| {'token': 'grant-test-token'}, | ||
| label='hookcmds-grant-test', | ||
| description='Created for grant/revoke integration test', | ||
| ) | ||
| hookcmds.secret_grant(secret_id, rel_id) | ||
| event.set_results({'secret-id': secret_id}) | ||
|
|
||
| event.set_results({ | ||
| 'secret-id': secret_id, | ||
| 'relation-id': str(rel_id), | ||
| 'granted': 'true', | ||
| }) | ||
|
|
||
| def _on_test_secret_revoke(self, event: ops.ActionEvent): | ||
| """Revoke access to a secret from the peer relation.""" | ||
| """Revoke the grant on a secret, then remove it entirely.""" | ||
| secret_id = event.params['secret-id'] | ||
| ids = hookcmds.relation_ids('peer') | ||
| ids = hookcmds.relation_ids('anycharm') | ||
| if not ids: | ||
| event.fail('No peer relation - deploy 2+ units') | ||
| event.fail('No anycharm relation IDs - is any-charm deployed and integrated?') | ||
| return | ||
| rel_id = int(ids[0].split(':')[-1]) | ||
|
|
||
| hookcmds.secret_revoke(secret_id, relation_id=rel_id) | ||
| hookcmds.secret_remove(secret_id) | ||
|
|
||
| event.set_results({'revoked': 'true'}) | ||
|
|
||
| # Storage add | ||
|
|
||
|
|
@@ -399,6 +451,37 @@ def _on_test_storage_add(self, event: ops.ActionEvent): | |
| hookcmds.storage_add({'data': 1}) | ||
| event.set_results({'count-before-add': str(len(current))}) | ||
|
|
||
| # Endpoint-scoped ports | ||
|
|
||
| def _on_test_ports_endpoint_scoped(self, event: ops.ActionEvent): | ||
| """Open a port scoped to the peer endpoint, verify, then close it.""" | ||
| port = int(event.params.get('port', 7766)) | ||
| endpoint = 'peer' | ||
|
|
||
| hookcmds.open_port('tcp', port, endpoints=endpoint) | ||
|
|
||
| all_ports = hookcmds.opened_ports(endpoints=True) | ||
| our_port = next( | ||
| (p for p in all_ports if p.port == port and p.protocol == 'tcp'), | ||
| None, | ||
| ) | ||
|
|
||
| port_found = our_port is not None | ||
| ep_list = (our_port.endpoints or []) if our_port else [] | ||
| endpoint_matches = port_found and endpoint in ep_list | ||
|
|
||
| hookcmds.close_port('tcp', port, endpoints=endpoint) | ||
|
|
||
| final = hookcmds.opened_ports(endpoints=True) | ||
| still_open = any(p.port == port and p.protocol == 'tcp' for p in final) | ||
|
|
||
| event.set_results({ | ||
| 'port-found-with-endpoint': str(port_found).lower(), | ||
| 'endpoints-list': ','.join(ep_list), | ||
| 'endpoint-matches': str(endpoint_matches).lower(), | ||
| 'closed-after': str(not still_open).lower(), | ||
| }) | ||
|
|
||
|
|
||
| if __name__ == '__main__': | ||
| ops.main(TestHookcmdsCharm) | ||
Uh oh!
There was an error while loading. Please reload this page.