Skip to content

Commit

Permalink
Migrate operations and requests to async
Browse files Browse the repository at this point in the history
  • Loading branch information
merlinz01 committed Dec 3, 2024
1 parent 5195651 commit 19fadb1
Show file tree
Hide file tree
Showing 21 changed files with 80 additions and 54 deletions.
8 changes: 8 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,14 @@

This is the changelog for RedPepper.

## [Unreleased]

### Changed

- **Breaking change:** Migrate operations and requests to be asynchronous.
- Non-asynchronous operations are still functional, but expect support for them to
be removed in a future release.

## [0.1.4]

### Changed
Expand Down
14 changes: 7 additions & 7 deletions docs/operations.md
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ Parameter validation and assigning parameters to attributes should be done here.

If an error is raised in the `__init__` method, it will stop all further operations and be reported to the user.

### `def __str__(self) -> str`
### `async def __str__(self) -> str`

Operations should define this method to provide
a concise, user-friendly indicator of the operation and its parameters.
Expand All @@ -43,7 +43,7 @@ This does not need to be a valid Python expression.

Example format: `file.Installed("/some/file" from "some-source-file.txt")`

### `def run(self, agent: Agent) -> Result`
### `async def run(self, agent: Agent) -> Result`

This method is generally where the operation is executed.

Expand Down Expand Up @@ -99,7 +99,7 @@ use the provided agent's `request_data()` method.
ok, data = agent.request_data('some.key.defined.in.the.YAML.files')
```

### `def test(self, agent: Agent) -> bool`
### `async def test(self, agent: Agent) -> bool`

This function is to determine if the operation needs to be executed,
or whether the desired outcome already exists.
Expand All @@ -109,15 +109,15 @@ Return True if the outcome already exists, or False if it does not and the comma
By default this function returns False,
so that the operation's `run()` method is called every time.

### `def ensure(self, agent: Agent) -> Result`
### `async def ensure(self, agent: Agent) -> Result`

Make sure the outcome exists by executing the operation if needed.
This is the method is called by RedPepper when a command or state is executed.

The default implementation is basically a combination of `test()` and `run()`.
Many operations will not need to reimplement this method,
as this default implementation is sufficient.
However, in some cases it may be better to skip an explicit test and simply execute the command every time.
However, in some cases it may be better to skip an explicit test and simply execute the command every time, checking the output of the command to determine if anything changed.

## Example

Expand All @@ -140,11 +140,11 @@ class WriteSomeConfigFile(Operation):
# Return a concise representation of the operation
return f'custom.WriteSomeConfigFile({self.username}:***@{self.host}:{self.port} to file {self.filename})'

def test(self, agent):
async def test(self, agent):
# Test if the file already exists
return os.path.isfile(self.filename)

def run(self, agent):
async def run(self, agent):
# Write the file
result = Result()
with open(self.filename, 'w') as out:
Expand Down
18 changes: 12 additions & 6 deletions src/common/redpepper/common/operations.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
"""Variables and functions for use by the operation modules."""

import traceback
from typing import TYPE_CHECKING
from typing import TYPE_CHECKING, Coroutine

if TYPE_CHECKING:
from redpepper.agent.agent import Agent # pragma: no cover
Expand All @@ -12,18 +12,24 @@ class Operation:

_no_changes_text = "No changes needed."

def run(self, agent: "Agent") -> "Result":
async def run(self, agent: "Agent") -> "Result":
"""Run the operation to ensure the condition exists. Assume test() returned False."""
raise NotImplementedError # pragma: no cover

def test(self, agent: "Agent") -> bool:
async def test(self, agent: "Agent") -> bool:
"""Return True if the condition created by this operation already exists."""
return False

def ensure(self, agent: "Agent") -> "Result":
async def ensure(self, agent: "Agent") -> "Result":
"""Ensure that the condition created by this operation exists, running the operation if the test returns False."""
if not self.test(agent):
return self.run(agent)
test = self.test(agent)
if isinstance(test, Coroutine):
test = await test
if not test:
res = self.run(agent)
if isinstance(res, Coroutine):
res = await res
return res
result = Result(self)
result += self._no_changes_text
return result
Expand Down
10 changes: 6 additions & 4 deletions src/operations/redpepper/operations/apt.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ def __init__(self, name, env={}):
def __str__(self):
return f"apt.Installed({self.name})"

def test(self, agent):
async def test(self, agent):
cmd = [
"dpkg-query",
"--showformat",
Expand All @@ -45,7 +45,7 @@ def test(self, agent):
logger.error("dpkg-query failed with return code %s", p.returncode)
raise subprocess.CalledProcessError(p.returncode, cmd, p.stdout, p.stderr)

def run(self, agent):
async def run(self, agent):
result = Result(self)
cmd = [
"apt-get",
Expand Down Expand Up @@ -73,9 +73,11 @@ class UnattendedUpgrade(Operation):
def __str__(self):
return "apt.UnattendedUpgrade()"

def run(self, agent):
async def run(self, agent):
result = Result(self)
if not result.update(Installed("unattended-upgrades").ensure(agent)).succeeded:
if not result.update(
await Installed("unattended-upgrades").ensure(agent)
).succeeded:
return result
p = subprocess.run(["unattended-upgrades"], capture_output=True, text=True)
if result.check_completed_process(p).succeeded:
Expand Down
6 changes: 3 additions & 3 deletions src/operations/redpepper/operations/command.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ def __init__(
def __str__(self):
return f'Run("{self.command}{"" if self.wait else " &"}"{" as " + self.user if self.user else ""})'

def run(self, agent):
async def run(self, agent):
result = Result(self)
kw = {}
kw["env"] = os.environ.copy()
Expand Down Expand Up @@ -102,10 +102,10 @@ def __init__(self, commands, **kw):
def __str__(self):
return f"RunMultiple({len(self.commands)} commands)"

def run(self, agent):
async def run(self, agent):
result = Result(self)
for cmd in self.commands:
result += f"Running {cmd}:"
if not result.update(cmd.run(agent)).succeeded:
if not result.update(await cmd.run(agent)).succeeded:
return result
return result
4 changes: 2 additions & 2 deletions src/operations/redpepper/operations/data.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,9 @@ def __init__(self, name):
def __str__(self):
return f"data.Show({self.name})"

def run(self, agent):
async def run(self, agent):
result = Result(self)
data = agent.request("data", name=self.name)
data = await agent.conn.rpc.call("custom", "data", name=self.name)
result.succeeded = True
result += data
return result
2 changes: 1 addition & 1 deletion src/operations/redpepper/operations/echo.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ def __init__(self, message, reverse=False):
def __str__(self):
return f'echo.Echo("{self.message}"{" reverse" if self.reverse else ""})'

def run(self, agent):
async def run(self, agent):
result = Result(self)
message = self.message
if self.reverse:
Expand Down
21 changes: 12 additions & 9 deletions src/operations/redpepper/operations/file.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ def __str__(self):
source = f'"{self.source}"'
return f'file.Installed("{self.path}" from {source})'

def ensure(self, agent):
async def ensure(self, agent):
result = Result(self)
# Open in binary writable mode without truncating the file
try:
Expand Down Expand Up @@ -75,7 +75,7 @@ def ensure(self, agent):
result.changed = True
if self.overwrite:
try:
nwritten, mtime = self.ensure_file_contents(agent, f)
nwritten, mtime = await self.ensure_file_contents(agent, f)
except Exception:
result.exception()
return result
Expand All @@ -89,7 +89,7 @@ def ensure(self, agent):
result += f'File "{self.path}" is already in the specified state.'
return result

def ensure_file_contents(self, agent, f: io.BufferedIOBase):
async def ensure_file_contents(self, agent, f: io.BufferedIOBase):
logger.debug("Comparing file using %s method", self.method)
rewrite = False
if self.method == "content":
Expand All @@ -107,7 +107,9 @@ def ensure_file_contents(self, agent, f: io.BufferedIOBase):
nwritten = len(shouldbe)
return nwritten, None
return None, None
remote_stat = agent.request("dataFileStat", path=self.source)
remote_stat = await agent.conn.rpc.call(
"custom", "dataFileStat", path=self.source
)
if self.method == "stat":
try:
existing_stat = os.fstat(f.fileno())
Expand All @@ -130,7 +132,7 @@ def ensure_file_contents(self, agent, f: io.BufferedIOBase):
or existing_size != remote_stat["size"]
)
elif self.method == "hash":
hash = agent.request("dataFileHash", path=self.source)
hash = await agent.conn.rpc.call("custom", "dataFileHash", path=self.source)
existing_hash = self.hash_file(f)
logger.debug("Hash of %s: %s vs. %s", self.path, existing_hash, hash)
rewrite = existing_hash != hash
Expand All @@ -140,7 +142,8 @@ def ensure_file_contents(self, agent, f: io.BufferedIOBase):
# leave the file in an inconsistent state if the connection is lost
contents = io.BytesIO()
while True:
data = agent.request(
data = await agent.conn.rpc.call(
"custom",
"dataFileContents",
filename=self.source,
offset=contents.tell(),
Expand Down Expand Up @@ -186,7 +189,7 @@ def test(self, agent):
existing_target = None
return existing_target == self.target

def run(self, agent):
async def run(self, agent):
result = Result(self)
try:
existing_target = os.readlink(self.path)
Expand Down Expand Up @@ -219,7 +222,7 @@ def __init__(self, path, user=None, group=None, mode=None):
def __str__(self):
return f'file.DirExists("{self.path}")'

def test(self, agent):
async def test(self, agent):
if not os.path.isdir(self.path):
return False
stat = os.stat(self.path)
Expand All @@ -231,7 +234,7 @@ def test(self, agent):
return False
return True

def run(self, agent):
async def run(self, agent):
result = Result(self)
try:
stat = os.stat(self.path)
Expand Down
2 changes: 1 addition & 1 deletion src/operations/redpepper/operations/git.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ def __init__(
def __str__(self):
return f'git.UpToDate({self.target} from "{self.repo}")'

def ensure(self, agent):
async def ensure(self, agent):
result = Result(self)
clone = False
if not os.path.isdir(self.target):
Expand Down
2 changes: 1 addition & 1 deletion src/operations/redpepper/operations/noop.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ def __init__(self):
def __str__(self):
return "no-op"

def ensure(self, agent):
async def ensure(self, agent):
result = Result(self)
result.succeeded = True
return result
2 changes: 1 addition & 1 deletion src/operations/redpepper/operations/py.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ def __str__(self):
cmd = cmd[:50] + "..."
return f'py.Exec("{cmd.strip()}")'

def ensure(self, agent):
async def ensure(self, agent):
result = Result(self)
try:
exec(self.script, self.env, locals())
Expand Down
8 changes: 4 additions & 4 deletions src/operations/redpepper/operations/request.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,9 @@ def __init__(self, request_name, **kwargs):
def __str__(self):
return f"request.Request({self.request_name})"

def run(self, agent):
async def run(self, agent):
result = Result(self)
data = agent.request(self.request_name, **self.kwargs)
data = await agent.conn.rpc.call(self.request_name, **self.kwargs)
result.succeeded = True
result += str(data)
return result
Expand All @@ -25,9 +25,9 @@ def __init__(self, request_name, **kwargs):
def __str__(self):
return f"request.StateRequest({self.request_name})"

def run(self, agent):
async def run(self, agent):
result = Result(self)
data = agent.request(self.request_name, **self.kwargs)
data = await agent.conn.rpc.call(self.request_name, **self.kwargs)
result.succeeded = data["succeeded"]
result.changed = data["changed"]
result += data["output"]
Expand Down
4 changes: 2 additions & 2 deletions src/operations/redpepper/operations/sysctl.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ def __init__(self, name, value):
def __str__(self):
return f"sysctl.Parameter({self.name} = {self.value})"

def test(self, agent):
async def test(self, agent):
if not os.path.exists(SYSCTL_CONF_PATH):
return False
with open(SYSCTL_CONF_PATH, "r") as f:
Expand All @@ -45,7 +45,7 @@ def test(self, agent):
return True
return False

def run(self, agent):
async def run(self, agent):
result = Result(self)
with open(SYSCTL_CONF_PATH, "a") as f:
f.write(f"\n{self.name} = {self.value}\n")
Expand Down
12 changes: 6 additions & 6 deletions src/operations/redpepper/operations/systemd.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,12 @@ def __init__(self, name):
def __str__(self):
return f"systemd.Running({self.name})"

def test(self, agent):
async def test(self, agent):
cmd = ["systemctl", "is-active", self.name]
p = subprocess.run(cmd, stdout=subprocess.DEVNULL)
return p.returncode == 0

def run(self, agent):
async def run(self, agent):
result = Result(self)
cmd = ["systemctl", "start", self.name]
p = subprocess.run(cmd, capture_output=True, text=True)
Expand All @@ -36,12 +36,12 @@ def __init__(self, name):
def __str__(self):
return f"systemd.Enabled({self.name})"

def test(self, agent):
async def test(self, agent):
cmd = ["systemctl", "is-enabled", self.name]
p = subprocess.run(cmd, stdout=subprocess.DEVNULL)
return p.returncode == 0

def run(self, agent):
async def run(self, agent):
result = Result(self)
cmd = ["systemctl", "enable", self.name]
p = subprocess.run(cmd, capture_output=True, text=True)
Expand All @@ -58,7 +58,7 @@ def __init__(self, name):
def __str__(self):
return f"systemd.Restart({self.name})"

def run(self, agent):
async def run(self, agent):
result = Result(self)
cmd = ["systemctl", "restart", self.name]
p = subprocess.run(cmd, capture_output=True, text=True)
Expand All @@ -75,7 +75,7 @@ def __init__(self, name):
def __str__(self):
return f"systemd.Reload({self.name})"

def run(self, agent):
async def run(self, agent):
result = Result(self)
cmd = ["systemctl", "reload", self.name]
p = subprocess.run(cmd, capture_output=True, text=True)
Expand Down
3 changes: 2 additions & 1 deletion src/requests/redpepper/requests/data.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,8 @@
from redpepper.requests import RequestError


def call(conn: AgentConnection, name: str):
async def call(conn: AgentConnection, name: str):
assert conn.agent_id
try:
return conn.manager.data_manager.get_data_for_agent(conn.agent_id, name)
except KeyError:
Expand Down
Loading

0 comments on commit 19fadb1

Please sign in to comment.