Skip to content

Commit

Permalink
Restructure state file format for better execution flow control
Browse files Browse the repository at this point in the history
  • Loading branch information
merlinz01 committed Dec 4, 2024
1 parent 8cb5d79 commit 6cfea45
Show file tree
Hide file tree
Showing 9 changed files with 301 additions and 91 deletions.
7 changes: 7 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,13 @@

This is the changelog for RedPepper.

## [Unreleased]

### Changed

- **Breaking change:** Restructure state file format for better execution flow control.
- This is not backwards compatible. All state files must be updated.

## [0.2.0]

### Changed
Expand Down
36 changes: 18 additions & 18 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -48,24 +48,24 @@ See [Configuration](docs/configuration.md) for more info.
### Sample state file

```yaml
Server installed:
type: package.Installed
name: nginx

Config file installed:
type: file.Installed
source: file-stored-on-manager.conf
path: /etc/nginx/installed-by-redpepper.conf
user: nginx
group: nginx
mode: 0600
if:
- py: not sys.platform.startswith('win')
- not file exists: /some/other/file

Server running:
type: service.Running
name: nginx
- Nginx:
- Packages:
type: package.Installed
name: nginx
- Config file:
type: file.Installed
source: file-stored-on-manager.conf
path: /etc/nginx/installed-by-redpepper.conf
user: nginx
group: nginx
mode: 0600
if:
all:
- py: not sys.platform.startswith('win')
- not file exists: /some/other/file
- Server running:
type: service.Running
name: nginx
```
## Security
Expand Down
6 changes: 3 additions & 3 deletions config/data/state/common.yml
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
Fail2ban installed:
type: command.Run
command: sleep 1 && echo "Fail2ban installed!"
- Fail2ban installed:
type: command.Run
command: sleep 1 && echo "Fail2ban installed!"
6 changes: 3 additions & 3 deletions config/data/state/vpn.yml
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
Wireguard installed:
type: command.Run
command: sleep 2 && echo "Wireguard installed!"
- Wireguard installed:
type: command.Run
command: sleep 1 && echo "Wireguard installed!"
92 changes: 72 additions & 20 deletions config/data/state/webservers.yml
Original file line number Diff line number Diff line change
@@ -1,21 +1,73 @@
NTP installed:
type: command.Run
command: sleep 1 && echo "NTP installed!"
- NTP installed:
type: command.Run
command: sleep 1 && echo "NTP installed!"

Go installed:
# type: command.RunMultiple
# commands:
# - wget -q https://golang.org/dl/go1.22.2.linux-amd64.tar.gz -O /tmp/go.tar.gz
# - sudo tar -C /usr/local -xzf /tmp/go.tar.gz
# - rm /tmp/go.tar.gz
# if:
# - not any:
# - not file exists: /usr/local/go/bin/go
# - cmd 0,2: go version
# - not py: print("Go version is installed") or True
# - false
# - not all:
# - not cmd: go version
# - file exists: /usr/local/go/bin/go
type: command.Run
command: cowsay "Installed Go!"
# Please note: these all use echo so that no changes are actually made to your system.
# In real life, the shell commands would be run directly
# and the function-style commands would replace command.Run.
- Web server:
- System user:
- Group:
type: command.Run
command: echo "groupadd webserver"
if: { not cmd: false && getent group webserver }
- User:
type: command.Run
command: echo "useradd -g webserver -m -d /home/webserver webserver"
if: { not cmd: false && getent passwd webserver }
- Prerequisites:
- Python:
type: command.Run
command: echo "package.Installed(name=python3)"
- Venv:
type: command.Run
command: echo "package.Installed(name=python3-venv)"
- Git SSH trust:
- SSH dir:
type: command.Run
command: echo "file.Dir(path=/home/webserver/.ssh)"
- Deployment key:
type: command.Run
command: echo "file.Installed(path=/home/webserver/.ssh/id_rsa)"
- Git server trusted:
type: command.Run
command: echo "ssh-keyscan my.gitea.server >> /home/webserver/.ssh/known_hosts"
if: { not cmd: false && grep my.gitea.server /home/webserver/.ssh/known_hosts }
- Code synced:
type: command.Run
command: echo "git.UpToDate(target=/home/webserver/src, [email protected]/repo)"
- Virtual environment:
type: command.Run
command: echo "python3 -m venv /home/webserver/src/.venv"
if: { not path isdir: /home/webserver/src/.venv }
- Python dependencies:
type: command.Run
command: echo "bash -c \"source .venv/bin/activate && pip3 install -q -r requirements.txt\""
if: { changed: Code synced }
- Config file:
type: command.Run
command: echo "file.Installed(path=/home/webserver/src/config.yml, source=webserver-config.yml)"
- Database migrations:
type: command.Run
command: echo "bash -c \"source .venv/bin/activate && aerich upgrade\""
- Systemd service file:
type: command.Run
command: echo "file.Symlinked(path=/etc/systemd/system/webserver.service, target=/home/webserver/src/systemd/webserver.service)"
- Systemd reloaded:
type: command.Run
command: echo "systemd daemon-reload"
if: { changed: Systemd service file }
- Service restarted:
type: command.Run
command: echo "service.Restart(name=webserver)"
if:
any:
- changed: Code synced
- changed: Config file
- Service enabled:
type: command.Run
command: echo "service.Enabled(name=webserver)"
- Server running:
type: command.Run
command: echo "service.Running(name=webserver)"

106 changes: 64 additions & 42 deletions src/agent/redpepper/agent/agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,8 @@
import ssl
import subprocess
import traceback
from typing import Any, Coroutine, Sequence
from collections import OrderedDict
from typing import Any, Coroutine, Generator, Sequence

import trio

Expand Down Expand Up @@ -136,8 +137,8 @@ async def _run_command(
state_data = await self.conn.rpc.call(
"custom", "stateDefinition", state_name=state_name
)
if not isinstance(state_data, dict):
raise ValueError(f"State {state_name} is not a dictionary")
if not isinstance(state_data, list):
raise ValueError(f"State {state_name} is not a list")
result = await self.run_state(
state_name, state_data, commandID=commandID
)
Expand Down Expand Up @@ -165,7 +166,7 @@ async def _run_command(
await self.conn.send_message(res)

async def do_operation(
self, cmdtype: str, args: Sequence[Any], kw: dict[str, Any]
self, cmdtype: str, args: Sequence[Any], kw: dict[str, Any], changed: dict = {}
) -> Result:
# In this function we can raise errors because callers will catch them
logger.debug("Running operation %s", cmdtype)
Expand Down Expand Up @@ -230,7 +231,7 @@ async def do_operation(
# Check if the operation is prevented by a condition
condition = kw.pop("if", None)
logger.debug("Checking operation condition %r", condition)
if not self.evaluate_condition(condition):
if not self.evaluate_condition(condition, changed=changed):
logger.debug("Operation condition not met for %s, not running", cmdtype)
result = Result(cmdtype)
result.succeeded = True
Expand All @@ -256,29 +257,35 @@ async def do_operation(
logger.debug("Operation result: %s", result)
return result

def _walk_state_tree(
self, items: list, parents: tuple = ()
) -> Generator[tuple[tuple, dict], None, None]:
for item in items:
if not isinstance(item, dict) or len(item) != 1:
raise ValueError("State group or name not a single-key dict")
key = next(iter(item))
value = item[key]
path = (*parents, key)
if isinstance(value, list):
yield from self._walk_state_tree(value, path)
else:
yield path, value

async def run_state(
self,
state_name: str,
state_data: dict[str, dict | list | None],
state_data: list,
commandID: str | None = None,
) -> Result:
# For now we can raise errors because we don't have any previous output to return.
# Arrange the state entries into a list of OperationSpec objects
tasks: list[OperationSpec] = []
for state_task_name, state_definition in state_data.items():
if isinstance(state_definition, list):
for i, item in enumerate(state_definition, 1):
if not isinstance(item, dict):
raise TypeError(
f"State {state_data} task {state_task_name} item {i} is not a dictionary"
)
tasks.append(OperationSpec(f"{state_task_name} #{i}", item))
elif isinstance(state_definition, dict):
tasks.append(OperationSpec(state_task_name, state_definition))
else:
for state_path, state_definition in self._walk_state_tree(state_data):
if not isinstance(state_definition, dict):
raise TypeError(
f"State {state_data} task {state_task_name} is not a dictionary or list"
f"State {state_data} task {':'.join(state_path)} is not a dictionary or list"
)
tasks.append(OperationSpec(":".join(state_path), state_definition))

# Task counter
i = 0
Expand All @@ -289,17 +296,27 @@ async def run_state(
await self.send_command_progress(
commandID, 0, len(tasks), f"Starting {state_name}..."
)
# Keep track of what changed
changed = OrderedDict()
# Run the tasks
for task in tasks:
# Give other tasks a chance to run
await trio.sleep(0)
# Send the progress message
if commandID is not None:
await self.send_command_progress(
commandID, i, len(tasks), f"Running {task.name}"
)
# Update the result with the operation name
result += f"\nRunning state {task.name}:"
# Extract the parameters
kwargs = task.data
cmdtype = kwargs.pop("type")
onchange = kwargs.pop("onchange", None)
# Run the operation
try:
cmd_result = await self.do_operation(cmdtype, [], kwargs)
cmd_result = await self.do_operation(
cmdtype, [], kwargs, changed=changed
)
except Exception:
cmd_result = Result(task.name)
cmd_result.fail(
Expand All @@ -312,23 +329,11 @@ async def run_state(
# Stop if the operation failed
if not result.succeeded:
break
# Run the onchange operation if the operation succeeded and an onchange operation is defined
if onchange and cmd_result.changed:
onchange_name = task.name + " onchange"
try:
onchange_result = await self.run_state(
onchange_name, {onchange_name: onchange}
)
except Exception:
onchange_result = Result(onchange_name)
onchange_result.fail(
f"Failed to execute state {onchange_name}:\n{traceback.format_exc()}"
)
# Update the result with the onchange operation output
result.update(onchange_result, raw_output=True)
# Stop if the onchange operation failed
if not result.succeeded:
break
changed[task.name] = cmd_result.changed
if cmd_result.changed:
for item in changed:
if task.name.startswith(item + ":"):
changed[item] = True
# Send the progress message
if commandID is not None:
await self.send_command_progress(
Expand All @@ -351,7 +356,7 @@ async def send_command_progress(
)
await self.conn.send_message(message)

def evaluate_condition(self, condition: Any) -> bool:
def evaluate_condition(self, condition: Any, changed: dict = {}) -> bool:
if condition is None:
return True
if isinstance(condition, bool):
Expand All @@ -365,14 +370,14 @@ def evaluate_condition(self, condition: Any) -> bool:
if isinstance(condition, dict) and len(condition) > 1:
raise ValueError("Use a list for multiple conditions")
if isinstance(condition, list):
return all(self.evaluate_condition(c) for c in condition)
return all(self.evaluate_condition(c, changed) for c in condition)
if not isinstance(condition, dict):
raise ValueError("Condition must be a single key-value pair")
k = next(iter(condition))
v = condition[k]
logger.debug("Evaluating condition %r: %r", k, v)
if k == "not":
return not self.evaluate_condition(v)
return not self.evaluate_condition(v, changed)
negate = False
words = k.split()
if words[0] == "not":
Expand All @@ -397,13 +402,30 @@ def evaluate_condition(self, condition: Any) -> bool:
raise ValueError("Invalid condition name: {k!r}")
if not isinstance(v, list):
raise ValueError("Value for all condition must be a list")
return not negate if all(self.evaluate_condition(c) for c in v) else negate
return (
not negate
if all(self.evaluate_condition(c, changed) for c in v)
else negate
)
if ctype == "any":
if words:
raise ValueError("Invalid condition name: {k!r}")
if not isinstance(v, list):
raise ValueError("Value for any condition must be a list")
return not negate if any(self.evaluate_condition(c) for c in v) else negate
return (
not negate
if any(self.evaluate_condition(c, changed) for c in v)
else negate
)
if ctype == "changed":
if words:
raise ValueError("Invalid condition name: {k!r}")
if not isinstance(v, str):
raise ValueError("Value for changed condition must be a string")
for item in reversed(changed):
if changed[item]:
return not negate
return negate
if ctype == "py":
if words:
raise ValueError("Invalid condition name: {k!r}")
Expand Down
Loading

0 comments on commit 6cfea45

Please sign in to comment.