Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
235 changes: 174 additions & 61 deletions src/keri/app/httping.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
# -*- encoding: utf-8 -*-
"""
keri.peer.httping module
keri.app.httping module

Provides utilities for sending and receiving KERI events over HTTP
"""
import datetime
import json
Expand All @@ -28,19 +29,39 @@


class SignatureValidationComponent(object):
""" Validate SKWA signatures """
"""Validates SKWA signatures on incoming requests.

Verifies that each request carries a ``Signature`` header whose value is a
valid signature over the JSON-encoded request body, produced by the
controller identified by ``pre``.

Attributes:
hby: Habery instance providing access to the local key state.
pre (str): qb64 identifier prefix of the expected signer.
"""

def __init__(self, hby, pre):
"""Initializes SignatureValidationComponent.

Args:
hby: Habery instance used to look up key state for ``pre``.
pre (str): qb64 identifier prefix of the controller whose
signature must be present on every request.
"""
self.hby = hby
self.pre = pre

def process_request(self, req, resp):
""" Process request to ensure has a valid signature from controller
"""Validates the ``Signature`` header against the request body.

Parameters:
req: Http request object
resp: Http response object
Reads the ``Signature`` header and the JSON-encoded media body,
then delegates to :meth:`validate`. Sets the response status to
``401 Unauthorized`` and marks the response complete if validation
fails, preventing further processing.

Args:
req (falcon.Request): Incoming HTTP request object.
resp (falcon.Response): Outgoing HTTP response object.
"""
sig = req.headers.get("SIGNATURE")
ked = req.media
Expand All @@ -51,6 +72,23 @@ def process_request(self, req, resp):
return

def validate(self, sig, ser):
"""Verifies a raw signature string against serialized data.

Parses the ``sig`` string into signage markers and checks each
indexed verfer in the current key state of ``self.pre`` against
the corresponding siger.

Args:
sig (str): Raw signature header value, parseable by
:func:`~keri.end.designature`.
ser (bytes): Serialized data that was signed.

Returns:
bool: ``True`` if all verfers successfully verify their
corresponding sigers; ``False`` if ``self.pre`` is absent
from the key state, a required index is missing, or any
signature fails verification.
"""
signages = designature(sig)
markers = signages[0].markers

Expand All @@ -73,18 +111,37 @@ def validate(self, sig, ser):

@dataclass
class CesrRequest:
"""Container for a parsed CESR HTTP request.

Attributes:
payload (dict): Decoded JSON body of the request.
attachments (str): Value of the ``CESR-ATTACHMENT`` header.
"""
payload: dict
attachments: str


def parseCesrHttpRequest(req):
"""
Parse Falcon HTTP request and create a CESR message from the body of the request and the two
CESR HTTP headers (Date, Attachment).

Parameters
req (falcon.Request) http request object in CESR format:

"""Parses a Falcon HTTP request in CESR format into a :class:`CesrRequest`.

Validates the ``Content-Type`` header, decodes the JSON body, and
extracts the required ``CESR-ATTACHMENT`` header.

Args:
req (falcon.Request): Incoming HTTP request object. Must have
``Content-Type: application/cesr`` and a valid JSON body.

Returns:
CesrRequest: Dataclass holding the decoded payload and the raw
attachment header value.

Raises:
falcon.HTTPError: With status ``406 Not Acceptable`` if the
content type is not ``application/cesr``.
falcon.HTTPError: With status ``400 Bad Request`` if the body
cannot be decoded as JSON.
falcon.HTTPError: With status ``412 Precondition Failed`` if the
``CESR-ATTACHMENT`` header is absent.
"""
if req.content_type != CESR_CONTENT_TYPE:
raise falcon.HTTPError(falcon.HTTP_NOT_ACCEPTABLE,
Expand Down Expand Up @@ -113,15 +170,24 @@ def parseCesrHttpRequest(req):


def createCESRRequest(msg, client, dest, path=None):
"""
Turns a KERI message into a CESR http request against the provided hio http Client

Parameters
msg: KERI message parsable as Serder.raw
dest (str): qb64 identifier prefix of destination controller
client: hio http Client that will send the message as a CESR request
path (str): path to post to

"""Converts a single KERI message into a CESR HTTP POST request.

Deserializes the leading event from ``msg`` using
:class:`~keri.core.SerderKERI`, strips it from the bytearray, treats
the remainder as the attachment, and issues a ``POST`` via ``client``.

Args:
msg (bytearray): Raw KERI message stream. The leading event is
consumed; remaining bytes become the attachment.
client: hio HTTP ``Client`` instance used to send the request.
dest (str): qb64 identifier prefix of the destination controller,
written to the ``CESR-DESTINATION`` header.
path (str, optional): URL path to post to. Defaults to ``"/"``.

Raises:
ExtractionError: If fewer bytes are available than the declared
event size (:class:`~keri.kering.ShortageError` is caught
internally and re-raised as :class:`~keri.kering.ExtractionError`).
"""
path = path if path is not None else "/"

Expand Down Expand Up @@ -152,18 +218,35 @@ def createCESRRequest(msg, client, dest, path=None):


def streamCESRRequests(client, ims, dest, path=None, headers=None):
"""
Turns a stream of KERI messages into CESR http requests against the provided hio http Client

Parameters
client (Client): hio http Client that will send the message as a CESR request
ims (bytearray): stream of KERI messages parsable as Serder.raw
dest (str): qb64 identifier prefix of destination controller
path (str): path to post to

Returns
int: Number of individual requests posted

"""Decomposes a KERI message stream into individual CESR HTTP POST requests.

Iterates over ``ims``, extracting one :class:`~keri.core.Sadder` event
at a time followed by its attachment bytes (everything up to the next
``0x7b`` / ``{`` byte). Each event-plus-attachment pair is dispatched
as a separate ``POST`` via ``client``.

Args:
client: hio HTTP ``Client`` instance that will send each request.
ims (bytearray): Stream of concatenated KERI messages. Consumed
in place as events and attachments are extracted.
dest (str): qb64 identifier prefix of the destination controller,
written to the ``CESR-DESTINATION`` header of every request.
path (str, optional): URL path to post to. Defaults to ``"/"``.
Joined with ``client.requester.path`` using
:func:`urllib.parse.urljoin`.
headers (Hict, optional): Additional headers merged into each
request after the standard CESR headers. Defaults to an
empty :class:`~hio.help.Hict`.

Returns:
int: Number of individual HTTP requests posted.

Raises:
ColdStartError: If the stream begins with a counter triplet
(``txt`` or ``bny`` cold-start indicator) rather than a
message.
ExtractionError: If a message cannot be fully extracted due to
insufficient bytes.
"""
path = path if path is not None else "/"
path = parse.urljoin(client.requester.path, path)
Expand Down Expand Up @@ -212,14 +295,23 @@ def streamCESRRequests(client, ims, dest, path=None, headers=None):


class Clienter(doing.DoDoer):
"""
Clienter is a DoDoer that manages hio HTTP clients using a ClientDoer for each HTTP request.
It executes HTTP requests using a HIO HTTP Client run by a ClientDoer. Once a request has
received a response then the corresponding Doer is removed from this Clienter.
"""DoDoer that manages a pool of hio HTTP clients, one per outbound request.

Each call to :meth:`request` creates a new :class:`~hio.core.http.clienting.Client`
and a corresponding :class:`~hio.core.http.clienting.ClientDoer`, both
tracked internally. A background coroutine (:meth:`clientDo`) periodically
removes clients whose responses have been pending longer than
:attr:`TimeoutClient` seconds.

Attributes:
TimeoutClient (int): Class-level timeout in seconds before a client
with no response is pruned. Default is ``300`` (5 minutes).
clients (list[tuple]): Active client records as
``(client, clientDoer, datetime)`` triples.

Doers:
- clientDo: Periodically checks for stale clients and removes them if they have not received a response
within the specified timeout period.
clientDo: Background generator that periodically scans for and
removes timed-out clients.
"""

TimeoutClient = 300 # seconds to wait for response before removing client, default is 5 minutes
Expand All @@ -238,17 +330,24 @@ def __init__(self):
super(Clienter, self).__init__(doers=doers)

def request(self, method, url, body=None, headers=None):
"""
Perform an HTTP request using a hio http Client and ClientDoer and returns the Client.
"""Issues an HTTP request and registers the client in the managed pool.

Parameters:
method (str): HTTP method to use (e.g., "GET", "POST")
url (str): URL to send the request to
body (str or bytes, optional): Body of the request, defaults to None
headers (dict, optional): Headers to include in the request, defaults to None
Parses ``url``, constructs a :class:`~hio.core.http.clienting.Client`,
sends the request, wraps the client in a
:class:`~hio.core.http.clienting.ClientDoer`, and appends both to the
internal ``clients`` list alongside the current UTC timestamp.

Args:
method (str): HTTP method (e.g., ``"GET"``, ``"POST"``).
url (str): Fully qualified URL including scheme, host, port, path,
and optional query string.
body (str or bytes, optional): Request body. ``str`` values are
encoded to UTF-8 before sending. Defaults to ``None``.
headers (dict, optional): Request headers. Defaults to ``None``.

Returns:
http.clienting.Client: The hio HTTP Client used for the request, or None if an error occurs.
hio.core.http.clienting.Client: The client used to send the request,
or ``None`` if the connection could not be established.
"""
purl = parse.urlparse(url)

Expand Down Expand Up @@ -279,11 +378,16 @@ def request(self, method, url, body=None, headers=None):
return client

def remove(self, client):
"""
Find a client tuple by hio HTTP Client and remove it and its Doer from the Clienter.
"""Removes a client and its associated doer from the managed pool.

Parameters:
client (http.clienting.Client): The hio HTTP Client to remove from the Clienter.
Looks up the first entry in ``self.clients`` whose client object
matches ``client``, removes it from the list, and delegates doer
removal to the parent :class:`~hio.base.doing.DoDoer`. No-ops if
``client`` is not found.

Args:
client (hio.core.http.clienting.Client): The client instance to
remove.
"""
doers = [(c, d, dt) for (c, d, dt) in self.clients if c == client]
if len(doers) == 0:
Expand All @@ -295,15 +399,24 @@ def remove(self, client):
super(Clienter, self).remove([doer])

def clientDo(self, tymth, tock=0.0, **kwa):
""" Periodically prune stale clients

Process existing clients and prune any that have receieved a response longer than timeout

Parameters:
tymth (function): injected function wrapper closure returned by .tymen() of
Tymist instance. Calling tymth() returns associated Tymist .tyme.
tock (float): injected initial tock value

"""Background coroutine that prunes timed-out clients.

Runs continuously, yielding between iterations. On each pass,
collects clients that have received a response and whose elapsed
time since creation exceeds :attr:`TimeoutClient`, then removes
them via :meth:`remove`.

Args:
tymth (callable): Injected closure returned by ``.tymen()`` on
the governing :class:`~hio.base.tyming.Tymist`. Calling
``tymth()`` returns the current tyme.
tock (float): Initial tock value injected by the DoDoer
framework. Controls the yield cadence.
**kwa: Additional keyword arguments (unused).

Yields:
float: ``self.tock`` on each iteration to cede control back to
the hio scheduler.
"""
self.wind(tymth)
self.tock = tock
Expand Down
Loading
Loading