Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions framework/proto/flwr/proto/run.proto
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ message Run {
uint64 bytes_recv = 14;
double clientapp_runtime = 15;
string run_type = 16;
optional uint64 primary_task_id = 17;
}

message RunStatus {
Expand Down
5 changes: 5 additions & 0 deletions framework/py/flwr/common/serde.py
Original file line number Diff line number Diff line change
Expand Up @@ -639,6 +639,8 @@ def run_to_proto(run: typing.Run) -> ProtoRun:
clientapp_runtime=run.clientapp_runtime,
run_type=run.run_type,
)
if run.primary_task_id is not None:
proto.primary_task_id = run.primary_task_id
return proto


Expand All @@ -657,6 +659,9 @@ def run_from_proto(run_proto: ProtoRun) -> typing.Run:
status=run_status_from_proto(run_proto.status),
flwr_aid=run_proto.flwr_aid,
federation=run_proto.federation,
primary_task_id=(
run_proto.primary_task_id if run_proto.HasField("primary_task_id") else None
),
bytes_sent=run_proto.bytes_sent,
bytes_recv=run_proto.bytes_recv,
clientapp_runtime=run_proto.clientapp_runtime,
Expand Down
1 change: 1 addition & 0 deletions framework/py/flwr/common/serde_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -486,6 +486,7 @@ def test_run_serialization_deserialization() -> None:
status=typing.RunStatus(status="running", sub_status="", details="OK"),
flwr_aid="user123",
federation="mock-fed",
primary_task_id=42,
bytes_sent=2048,
bytes_recv=1024,
clientapp_runtime=3.14,
Expand Down
2 changes: 2 additions & 0 deletions framework/py/flwr/common/typing.py
Original file line number Diff line number Diff line change
Expand Up @@ -237,6 +237,7 @@ class Run: # pylint: disable=too-many-instance-attributes
status: RunStatus
flwr_aid: str
federation: str
primary_task_id: int | None
bytes_sent: int
bytes_recv: int
clientapp_runtime: float
Expand All @@ -258,6 +259,7 @@ def create_empty(cls, run_id: int) -> "Run":
status=RunStatus(status="", sub_status="", details=""),
flwr_aid="",
federation="",
primary_task_id=None,
bytes_sent=0,
bytes_recv=0,
clientapp_runtime=0.0,
Expand Down
36 changes: 18 additions & 18 deletions framework/py/flwr/proto/run_pb2.py

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

8 changes: 6 additions & 2 deletions framework/py/flwr/proto/run_pb2.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ class Run(google.protobuf.message.Message):
BYTES_RECV_FIELD_NUMBER: builtins.int
CLIENTAPP_RUNTIME_FIELD_NUMBER: builtins.int
RUN_TYPE_FIELD_NUMBER: builtins.int
PRIMARY_TASK_ID_FIELD_NUMBER: builtins.int
run_id: builtins.int
fab_id: builtins.str
fab_version: builtins.str
Expand All @@ -81,6 +82,7 @@ class Run(google.protobuf.message.Message):
bytes_recv: builtins.int
clientapp_runtime: builtins.float
run_type: builtins.str
primary_task_id: builtins.int
@property
def override_config(self) -> google.protobuf.internal.containers.MessageMap[builtins.str, flwr.proto.transport_pb2.Scalar]: ...
@property
Expand All @@ -104,9 +106,11 @@ class Run(google.protobuf.message.Message):
bytes_recv: builtins.int = ...,
clientapp_runtime: builtins.float = ...,
run_type: builtins.str = ...,
primary_task_id: builtins.int | None = ...,
) -> None: ...
def HasField(self, field_name: typing.Literal["status", b"status"]) -> builtins.bool: ...
def ClearField(self, field_name: typing.Literal["bytes_recv", b"bytes_recv", "bytes_sent", b"bytes_sent", "clientapp_runtime", b"clientapp_runtime", "fab_hash", b"fab_hash", "fab_id", b"fab_id", "fab_version", b"fab_version", "federation", b"federation", "finished_at", b"finished_at", "flwr_aid", b"flwr_aid", "override_config", b"override_config", "pending_at", b"pending_at", "run_id", b"run_id", "run_type", b"run_type", "running_at", b"running_at", "starting_at", b"starting_at", "status", b"status"]) -> None: ...
def HasField(self, field_name: typing.Literal["_primary_task_id", b"_primary_task_id", "primary_task_id", b"primary_task_id", "status", b"status"]) -> builtins.bool: ...
def ClearField(self, field_name: typing.Literal["_primary_task_id", b"_primary_task_id", "bytes_recv", b"bytes_recv", "bytes_sent", b"bytes_sent", "clientapp_runtime", b"clientapp_runtime", "fab_hash", b"fab_hash", "fab_id", b"fab_id", "fab_version", b"fab_version", "federation", b"federation", "finished_at", b"finished_at", "flwr_aid", b"flwr_aid", "override_config", b"override_config", "pending_at", b"pending_at", "primary_task_id", b"primary_task_id", "run_id", b"run_id", "run_type", b"run_type", "running_at", b"running_at", "starting_at", b"starting_at", "status", b"status"]) -> None: ...
def WhichOneof(self, oneof_group: typing.Literal["_primary_task_id", b"_primary_task_id"]) -> typing.Literal["primary_task_id"] | None: ...

global___Run = Run

Expand Down
1 change: 1 addition & 0 deletions framework/py/flwr/server/grid/inmemory_grid_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,7 @@ def setUp(self) -> None:
status=RunStatus(status=Status.PENDING, sub_status="", details=""),
flwr_aid="user123",
federation="mock-fed",
primary_task_id=None,
bytes_sent=0,
bytes_recv=0,
clientapp_runtime=0.0,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,7 @@ def register_messages_into_state(
),
flwr_aid="user123",
federation="mock-fed",
primary_task_id=None,
bytes_sent=0,
bytes_recv=0,
clientapp_runtime=0.0,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -571,6 +571,7 @@ def create_run(
),
flwr_aid=flwr_aid if flwr_aid else "",
federation=federation,
primary_task_id=None,
bytes_sent=0,
bytes_recv=0,
clientapp_runtime=0.0,
Expand Down Expand Up @@ -755,12 +756,12 @@ def acknowledge_node_heartbeat(
return True
return False

def _on_tokens_expired(self, expired_records: list[tuple[int, float]]) -> None:
def _on_tokens_expired(self, expired_records: list[tuple[int, int]]) -> None:
"""Transition runs with expired tokens to failed status.

Parameters
----------
expired_records : list[tuple[int, float]]
expired_records : list[tuple[int, int]]
List of tuples containing (run_id, active_until timestamp)
for expired tokens.
"""
Expand Down
21 changes: 14 additions & 7 deletions framework/py/flwr/server/superlink/linkstate/sql_linkstate.py
Original file line number Diff line number Diff line change
Expand Up @@ -815,14 +815,15 @@ def create_run( # pylint: disable=R0913, R0914, R0917
query = """
INSERT INTO run
(run_id, fab_id, fab_version, fab_hash, override_config, federation,
federation_config, run_type, pending_at, starting_at, running_at,
finished_at, usage_reported_at, sub_status, details, flwr_aid,
bytes_sent, bytes_recv, clientapp_runtime)
primary_task_id, federation_config, run_type, pending_at,
starting_at, running_at, finished_at, usage_reported_at,
sub_status, details, flwr_aid, bytes_sent, bytes_recv,
clientapp_runtime)
VALUES (:run_id, :fab_id, :fab_version, :fab_hash, :override_config,
:federation, :federation_config, :run_type, :pending_at,
:starting_at, :running_at, :finished_at, :usage_reported_at,
:sub_status, :details, :flwr_aid, :bytes_sent, :bytes_recv,
:clientapp_runtime)
:federation, :primary_task_id, :federation_config, :run_type,
:pending_at, :starting_at, :running_at, :finished_at,
:usage_reported_at, :sub_status, :details, :flwr_aid,
:bytes_sent, :bytes_recv, :clientapp_runtime)
"""
override_config_json = json.dumps(override_config)
params = {
Expand All @@ -832,6 +833,7 @@ def create_run( # pylint: disable=R0913, R0914, R0917
"fab_hash": fab_hash or "",
"override_config": override_config_json,
"federation": federation,
"primary_task_id": None,
"federation_config": fed_config_json,
"run_type": run_type,
"pending_at": now().isoformat(),
Expand Down Expand Up @@ -954,6 +956,11 @@ def get_run_info( # pylint: disable=too-many-arguments, too-many-branches
),
flwr_aid=row["flwr_aid"],
federation=row["federation"],
primary_task_id=(
int64_to_uint64(row["primary_task_id"])
if row["primary_task_id"] is not None
else None
),
bytes_sent=row["bytes_sent"],
bytes_recv=row["bytes_recv"],
clientapp_runtime=row["clientapp_runtime"],
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
)
from flwr.server.superlink.linkstate import LinkStateFactory
from flwr.supercore.interceptors import (
create_serverappio_runtime_version_server_interceptor,
create_serverappio_superexec_auth_server_interceptor,
create_serverappio_token_auth_server_interceptor,
)
Expand Down Expand Up @@ -69,6 +70,7 @@ def run_serverappio_api_grpc( # pylint: disable=R0913,R0917
master_secret=superexec_auth_secret,
)
)
interceptors.append(create_serverappio_runtime_version_server_interceptor())
serverappio_add_servicer_to_server_fn = add_ServerAppIoServicer_to_server
serverappio_grpc_server = generic_create_grpc_server(
servicer_and_add_fn=(
Expand Down
10 changes: 8 additions & 2 deletions framework/py/flwr/simulation/simulationio_connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,10 @@
from flwr.common.logger import log
from flwr.common.retry_invoker import make_simple_grpc_retry_invoker, wrap_stub
from flwr.proto.serverappio_pb2_grpc import ServerAppIoStub # pylint: disable=E0611
from flwr.supercore.interceptors import AppIoTokenClientInterceptor
from flwr.supercore.interceptors import (
AppIoTokenClientInterceptor,
RuntimeVersionClientInterceptor,
)


class SimulationIoConnection:
Expand Down Expand Up @@ -85,7 +88,10 @@ def _connect(self) -> None:
server_address=self._addr,
insecure=self._insecure,
root_certificates=self._cert,
interceptors=[AppIoTokenClientInterceptor(token=self._token)],
interceptors=[
RuntimeVersionClientInterceptor(component_name="flwr-simulation"),
AppIoTokenClientInterceptor(token=self._token),
],
)
self._channel.subscribe(on_channel_state_change)
self._grpc_stub = ServerAppIoStub(self._channel)
Expand Down
14 changes: 9 additions & 5 deletions framework/py/flwr/simulation/simulationio_connection_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,10 @@
import unittest
from unittest.mock import Mock, patch

from flwr.supercore.interceptors import AppIoTokenClientInterceptor
from flwr.supercore.interceptors import (
AppIoTokenClientInterceptor,
RuntimeVersionClientInterceptor,
)

from .simulationio_connection import SimulationIoConnection

Expand All @@ -29,13 +32,13 @@ class TestSimulationIoConnection(unittest.TestCase):
@patch("flwr.simulation.simulationio_connection.wrap_stub")
@patch("flwr.simulation.simulationio_connection.ServerAppIoStub")
@patch("flwr.simulation.simulationio_connection.create_channel")
def test_connect_adds_client_interceptor(
def test_connect_adds_client_interceptors(
self,
mock_create_channel: Mock,
_mock_serverappio_stub: Mock,
_mock_wrap_stub: Mock,
) -> None:
"""`_connect` should pass the token interceptor to create_channel."""
"""`_connect` should pass version and token interceptors to create_channel."""
mock_create_channel.return_value = Mock()
conn = SimulationIoConnection(token="test-token")

Expand All @@ -45,8 +48,9 @@ def test_connect_adds_client_interceptor(
interceptors = kwargs["interceptors"]
self.assertIsNotNone(interceptors)
assert interceptors is not None
self.assertEqual(len(interceptors), 1)
self.assertIsInstance(interceptors[0], AppIoTokenClientInterceptor)
self.assertEqual(len(interceptors), 2)
self.assertIsInstance(interceptors[0], RuntimeVersionClientInterceptor)
self.assertIsInstance(interceptors[1], AppIoTokenClientInterceptor)

def test_init_requires_token(self) -> None:
"""`SimulationIoConnection` should require token values."""
Expand Down
5 changes: 5 additions & 0 deletions framework/py/flwr/supercore/constant.py
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,11 @@
MIN_TIMESTAMP_DIFF_SECONDS = -SYSTEM_TIME_TOLERANCE
MAX_TIMESTAMP_DIFF_SECONDS = TIMESTAMP_TOLERANCE + SYSTEM_TIME_TOLERANCE

# Constants for Flower runtime version metadata
FLWR_PACKAGE_NAME_METADATA_KEY = "flwr-package-name"
FLWR_PACKAGE_VERSION_METADATA_KEY = "flwr-package-version"
FLWR_COMPONENT_NAME_METADATA_KEY = "flwr-component-name"
VERSION_INCOMPATIBILITY_MESSAGE_METADATA_KEY = "flwr-version-incompatibility-message"

# System message type
SYSTEM_MESSAGE_TYPE = "system"
Expand Down
Loading