diff --git a/pkgs/cli/src/main.zig b/pkgs/cli/src/main.zig index e243f0ac7..2ad1874b9 100644 --- a/pkgs/cli/src/main.zig +++ b/pkgs/cli/src/main.zig @@ -326,7 +326,7 @@ pub fn main() !void { .node => |leancmd| { var logger = utils_lib.getLogger(console_log_level, utils_lib.FileBehaviourParams{ .fileActiveLevel = log_file_active_level, .filePath = log_filepath, .fileName = log_filename }); - var start_options: node.StartNodeOptions = .{ + var start_options: node.NodeOptions = .{ .node_id = leancmd.node_id, .metrics_enable = leancmd.metrics_enable, .metrics_port = leancmd.metrics_port, @@ -341,7 +341,11 @@ pub fn main() !void { try node.buildStartOptions(allocator, leancmd, &start_options); - try node.startNode(allocator, &start_options); + var lean_node: node.Node = undefined; + + try lean_node.init(allocator, &start_options); + defer lean_node.deinit(); + try lean_node.run(); }, } } diff --git a/pkgs/cli/src/metrics_server.zig b/pkgs/cli/src/metrics_server.zig index 703df44ed..f4daf6888 100644 --- a/pkgs/cli/src/metrics_server.zig +++ b/pkgs/cli/src/metrics_server.zig @@ -5,6 +5,7 @@ const metrics = @import("@zeam/metrics"); pub fn startMetricsServer(allocator: std.mem.Allocator, port: u16) !void { // Create a simple HTTP server context const ctx = try allocator.create(SimpleMetricsServer); + errdefer allocator.destroy(ctx); ctx.* = .{ .allocator = allocator, .port = port, @@ -23,6 +24,8 @@ const SimpleMetricsServer = struct { port: u16, fn run(self: *SimpleMetricsServer) !void { + // `startMetricsServer` creates this, so we need to free it here + defer self.allocator.destroy(self); const address = try std.net.Address.parseIp4("0.0.0.0", self.port); var server = try address.listen(.{}); defer server.deinit(); diff --git a/pkgs/cli/src/node.zig b/pkgs/cli/src/node.zig index e0ae0e773..668725067 100644 --- a/pkgs/cli/src/node.zig +++ b/pkgs/cli/src/node.zig @@ -23,7 +23,7 @@ const NodeCommand = @import("main.zig").NodeCommand; const prefix = "zeam_"; -pub const StartNodeOptions = struct { +pub const NodeOptions = struct { node_id: u32, bootnodes: []const []const u8, validator_indices: []usize, @@ -33,7 +33,7 @@ pub const StartNodeOptions = struct { local_priv_key: []const u8, logger: *Logger, - pub fn deinit(self: *StartNodeOptions, allocator: std.mem.Allocator) void { + pub fn deinit(self: *NodeOptions, allocator: std.mem.Allocator) void { for (self.bootnodes) |b| allocator.free(b); allocator.free(self.bootnodes); allocator.free(self.validator_indices); @@ -41,11 +41,158 @@ pub const StartNodeOptions = struct { } }; +/// A Node that encapsulates the networking, blockchain, and validator functionalities. +/// It manages the event loop, network interface, clock, and beam node. +pub const Node = struct { + loop: xev.Loop, + network: networks.EthLibp2p, + beam_node: BeamNode, + clock: Clock, + enr: ENR, + options: *const NodeOptions, + allocator: std.mem.Allocator, + + const Self = @This(); + + pub fn init(self: *Self, allocator: std.mem.Allocator, options: *const NodeOptions) !void { + self.allocator = allocator; + self.options = options; + + const node_id = options.node_id; + + if (options.metrics_enable) { + try metrics.init(allocator); + try metrics_server.startMetricsServer(allocator, options.metrics_port); + } + + // some base mainnet spec would be loaded to build this up + const chain_spec = + \\{"preset": "mainnet", "name": "beamdev"} + ; + const json_options = json.ParseOptions{ + .ignore_unknown_fields = true, + .allocate = .alloc_if_needed, + }; + var chain_options = (try json.parseFromSlice(ChainOptions, allocator, chain_spec, json_options)).value; + + chain_options.genesis_time = options.genesis_spec.genesis_time; + chain_options.num_validators = options.genesis_spec.num_validators; + const chain_config = try ChainConfig.init(Chain.custom, chain_options); + var anchorState = try sft.genGenesisState(allocator, chain_config.genesis); + errdefer anchorState.deinit(allocator); + + // TODO we seem to be needing one loop because then the events added to loop are not being fired + // in the order to which they have been added even with the an appropriate delay added + // behavior of this further needs to be investigated but for now we will share the same loop + self.loop = try xev.Loop.init(.{}); + + const addresses = try self.constructMultiaddrs(); + self.network = try networks.EthLibp2p.init(allocator, &self.loop, .{ .networkId = 0, .listen_addresses = addresses.listen_addresses, .connect_peers = addresses.connect_peers, .local_private_key = options.local_priv_key }, options.logger); + errdefer self.network.deinit(); + self.clock = try Clock.init(allocator, chain_config.genesis.genesis_time, &self.loop); + errdefer self.clock.deinit(allocator); + + self.beam_node = try BeamNode.init(allocator, .{ + // options + .nodeId = node_id, + .config = chain_config, + .anchorState = anchorState, + .backend = self.network.getNetworkInterface(), + .clock = &self.clock, + .db = .{}, + .validator_ids = options.validator_indices, + .logger = options.logger, + }); + } + + pub fn deinit(self: *Self) void { + self.clock.deinit(self.allocator); + self.beam_node.deinit(); + self.network.deinit(); + self.enr.deinit(); + self.loop.deinit(); + } + + pub fn run(self: *Node) !void { + try self.network.run(); + try self.beam_node.run(); + + const ascii_art = + \\ + \\ ███████╗███████╗ █████╗ ███╗ ███╗ + \\ ╚══███╔╝██╔════╝██╔══██╗████╗ ████║ + \\ ███╔╝ █████╗ ███████║██╔████╔██║ + \\ ███╔╝ ██╔══╝ ██╔══██║██║╚██╔╝██║ + \\ ███████╗███████╗██║ ██║██║ ╚═╝ ██║ + \\ ╚══════╝╚══════╝╚═╝ ╚═╝╚═╝ ╚═╝ + \\ + \\ a blazing fast lean consensus client + ; + + var encoded_txt_buf: [1000]u8 = undefined; + const encoded_txt = try self.enr.encodeToTxt(&encoded_txt_buf); + + const quic_port = try self.enr.getQUIC(); + + // Use logger.info instead of std.debug.print + self.options.logger.info("\n{s}", .{ascii_art}); + self.options.logger.info("════════════════════════════════════════════════════════", .{}); + self.options.logger.info(" 🚀 Zeam Lean Node Started Successfully!", .{}); + self.options.logger.info("════════════════════════════════════════════════════════", .{}); + self.options.logger.info(" Node ID: {d}", .{self.options.node_id}); + self.options.logger.info(" Listening on QUIC port: {?d}", .{quic_port}); + self.options.logger.info(" ENR: {s}", .{encoded_txt}); + self.options.logger.info("────────────────────────────────────────────────────────", .{}); + + try self.clock.run(); + } + + fn constructMultiaddrs(self: *Self) !struct { listen_addresses: []const Multiaddr, connect_peers: []const Multiaddr } { + const self_node_index = self.options.validator_indices[0]; + try ENR.decodeTxtInto(&self.enr, self.options.bootnodes[self_node_index]); + + // Overriding the IP to 0.0.0.0 to listen on all interfaces + try self.enr.kvs.put("ip", "\x00\x00\x00\x00"); + + var node_multiaddrs = try self.enr.multiaddrP2PQUIC(self.allocator); + defer node_multiaddrs.deinit(self.allocator); + // move the ownership to the `EthLibp2p`, will be freed in its deinit + const listen_addresses = try node_multiaddrs.toOwnedSlice(self.allocator); + errdefer { + for (listen_addresses) |addr| addr.deinit(); + self.allocator.free(listen_addresses); + } + var connect_peer_list: std.ArrayListUnmanaged(Multiaddr) = .empty; + defer connect_peer_list.deinit(self.allocator); + + for (self.options.bootnodes, 0..) |n, i| { + if (i != self_node_index) { + var n_enr: ENR = undefined; + try ENR.decodeTxtInto(&n_enr, n); + var peer_multiaddr_list = try n_enr.multiaddrP2PQUIC(self.allocator); + defer peer_multiaddr_list.deinit(self.allocator); + const peer_multiaddrs = try peer_multiaddr_list.toOwnedSlice(self.allocator); + defer self.allocator.free(peer_multiaddrs); + try connect_peer_list.appendSlice(self.allocator, peer_multiaddrs); + } + } + + // move the ownership to the `EthLibp2p`, will be freed in its deinit + const connect_peers = try connect_peer_list.toOwnedSlice(self.allocator); + errdefer { + for (connect_peers) |addr| addr.deinit(); + self.allocator.free(connect_peers); + } + + return .{ .listen_addresses = listen_addresses, .connect_peers = connect_peers }; + } +}; + /// Builds the start options for a node based on the provided command and options. /// It loads the necessary configuration files, parses them, and populates the /// `StartNodeOptions` structure. /// The caller is responsible for freeing the allocated resources in `StartNodeOptions`. -pub fn buildStartOptions(allocator: std.mem.Allocator, node_cmd: NodeCommand, opts: *StartNodeOptions) !void { +pub fn buildStartOptions(allocator: std.mem.Allocator, node_cmd: NodeCommand, opts: *NodeOptions) !void { try utils_lib.checkDIRExists(node_cmd.custom_genesis); const config_filepath = try std.mem.concat(allocator, u8, &[_][]const u8{ node_cmd.custom_genesis, "/config.yaml" }); @@ -68,11 +215,20 @@ pub fn buildStartOptions(allocator: std.mem.Allocator, node_cmd: NodeCommand, op defer parsed_validators.deinit(allocator); const bootnodes = try nodesFromYAML(allocator, parsed_bootnodes); - + errdefer { + for (bootnodes) |b| allocator.free(b); + allocator.free(bootnodes); + } + if (bootnodes.len == 0) { + return error.InvalidNodesConfig; + } const genesis_spec = try configs.genesisConfigFromYAML(parsed_config, node_cmd.override_genesis_time); const validator_indices = try validatorIndicesFromYAML(allocator, opts.node_id, parsed_validators); - + errdefer allocator.free(validator_indices); + if (validator_indices.len == 0) { + return error.InvalidValidatorConfig; + } try utils_lib.checkDIRExists(node_cmd.network_dir); const local_priv_key_filepath = try std.mem.concat(allocator, u8, &[_][]const u8{ node_cmd.network_dir, "/key" }); defer allocator.free(local_priv_key_filepath); @@ -84,102 +240,6 @@ pub fn buildStartOptions(allocator: std.mem.Allocator, node_cmd: NodeCommand, op opts.genesis_spec = genesis_spec; } -/// Starts a node with the given options. -/// This function does not return until the node is stopped. -/// It initializes the metrics server if enabled, sets up the network, -/// and starts the Beam node with the provided configuration. -pub fn startNode(allocator: std.mem.Allocator, options: *const StartNodeOptions) !void { - const node_id = options.node_id; - - if (options.metrics_enable) { - try metrics.init(allocator); - try metrics_server.startMetricsServer(allocator, options.metrics_port); - } - - // some base mainnet spec would be loaded to build this up - const chain_spec = - \\{"preset": "mainnet", "name": "beamdev"} - ; - const json_options = json.ParseOptions{ - .ignore_unknown_fields = true, - .allocate = .alloc_if_needed, - }; - var chain_options = (try json.parseFromSlice(ChainOptions, allocator, chain_spec, json_options)).value; - - chain_options.genesis_time = options.genesis_spec.genesis_time; - chain_options.num_validators = options.genesis_spec.num_validators; - const chain_config = try ChainConfig.init(Chain.custom, chain_options); - const anchorState = try sft.genGenesisState(allocator, chain_config.genesis); - - // TODO we seem to be needing one loop because then the events added to loop are not being fired - // in the order to which they have been added even with the an appropriate delay added - // behavior of this further needs to be investigated but for now we will share the same loop - const loop = try allocator.create(xev.Loop); - loop.* = try xev.Loop.init(.{}); - - const self_node_index = options.validator_indices[0]; - var network = try allocator.create(networks.EthLibp2p); - var node_enr: ENR = undefined; - defer node_enr.deinit(); - try ENR.decodeTxtInto(&node_enr, options.bootnodes[self_node_index]); - - // Overriding the IP to 0.0.0.0 to listen on all interfaces - try node_enr.kvs.put("ip", "\x00\x00\x00\x00"); - - var node_multiaddrs = try node_enr.multiaddrP2PQUIC(allocator); - defer node_multiaddrs.deinit(allocator); - const listen_addresses = try node_multiaddrs.toOwnedSlice(allocator); - // these addresses are converted to a slice in the `run` function of `EthLibp2p` so it can be freed safely after `run` returns - defer { - for (listen_addresses) |addr| addr.deinit(); - allocator.free(listen_addresses); - } - - var connect_peer_list: std.ArrayListUnmanaged(Multiaddr) = .empty; - defer connect_peer_list.deinit(allocator); - - for (options.bootnodes, 0..) |n, i| { - if (i != self_node_index) { - var n_enr: ENR = undefined; - try ENR.decodeTxtInto(&n_enr, n); - var peer_multiaddr_list = try n_enr.multiaddrP2PQUIC(allocator); - defer peer_multiaddr_list.deinit(allocator); - const peer_multiaddrs = try peer_multiaddr_list.toOwnedSlice(allocator); - defer allocator.free(peer_multiaddrs); - try connect_peer_list.appendSlice(allocator, peer_multiaddrs); - } - } - - const connect_peers = try connect_peer_list.toOwnedSlice(allocator); - defer { - for (connect_peers) |addr| addr.deinit(); - allocator.free(connect_peers); - } - - network.* = try networks.EthLibp2p.init(allocator, loop, .{ .networkId = 0, .listen_addresses = listen_addresses, .connect_peers = connect_peers, .local_private_key = options.local_priv_key }, options.logger); - try network.run(); - const backend = network.getNetworkInterface(); - - var clock = try allocator.create(Clock); - clock.* = try Clock.init(allocator, chain_config.genesis.genesis_time, loop); - - var beam_node = try BeamNode.init(allocator, .{ - // options - .nodeId = node_id, - .config = chain_config, - .anchorState = anchorState, - .backend = backend, - .clock = clock, - .db = .{}, - .validator_ids = options.validator_indices, - .logger = options.logger, - }); - - try beam_node.run(); - std.debug.print("Lean node {d} listened on {?d}\n", .{ node_id, try node_enr.getQUIC() }); - try clock.run(); -} - /// Parses the nodes from a YAML configuration. /// Expects a YAML structure like: /// ```yaml @@ -187,7 +247,7 @@ pub fn startNode(allocator: std.mem.Allocator, options: *const StartNodeOptions) /// - enr2... /// ``` /// Returns a set of ENR strings. The caller is responsible for freeing the returned slice. -pub fn nodesFromYAML(allocator: std.mem.Allocator, nodes_config: Yaml) ![]const []const u8 { +fn nodesFromYAML(allocator: std.mem.Allocator, nodes_config: Yaml) ![]const []const u8 { const temp_nodes = try nodes_config.parse(allocator, [][]const u8); defer allocator.free(temp_nodes); @@ -216,7 +276,7 @@ pub fn nodesFromYAML(allocator: std.mem.Allocator, nodes_config: Yaml) ![]const /// ``` /// where `node_{node_id}` is the key for the node's validator indices. /// Returns a set of validator indices. The caller is responsible for freeing the returned slice. -pub fn validatorIndicesFromYAML(allocator: std.mem.Allocator, node_id: u32, validators_config: Yaml) ![]usize { +fn validatorIndicesFromYAML(allocator: std.mem.Allocator, node_id: u32, validators_config: Yaml) ![]usize { var validator_indices: std.ArrayListUnmanaged(usize) = .empty; defer validator_indices.deinit(allocator); diff --git a/pkgs/network/src/ethlibp2p.zig b/pkgs/network/src/ethlibp2p.zig index 8268e8ce2..04ee65862 100644 --- a/pkgs/network/src/ethlibp2p.zig +++ b/pkgs/network/src/ethlibp2p.zig @@ -132,6 +132,16 @@ pub const EthLibp2p = struct { return Self{ .allocator = allocator, .params = params, .gossipHandler = try interface.GenericGossipHandler.init(allocator, loop, params.networkId, logger), .logger = logger }; } + pub fn deinit(self: *Self) void { + for (self.params.listen_addresses) |addr| addr.deinit(); + self.allocator.free(self.params.listen_addresses); + + if (self.params.connect_peers) |peers| { + for (peers) |addr| addr.deinit(); + self.allocator.free(peers); + } + } + pub fn run(self: *Self) !void { const listen_addresses_str = try multiaddrsToString(self.allocator, self.params.listen_addresses); const connect_peers_str = if (self.params.connect_peers) |peers| diff --git a/pkgs/node/src/chain.zig b/pkgs/node/src/chain.zig index e95e04600..330291aee 100644 --- a/pkgs/node/src/chain.zig +++ b/pkgs/node/src/chain.zig @@ -57,6 +57,14 @@ pub const BeamChain = struct { }; } + pub fn deinit(self: *Self) void { + var it = self.states.iterator(); + while (it.next()) |entry| { + entry.value_ptr.deinit(self.allocator); + } + self.states.deinit(); + } + pub fn registerValidatorIds(self: *Self, validator_ids: []usize) void { // right now it's simple assignment but eventually it should be a set // tacking registrations and keeping it alive for 3*2=6 slots diff --git a/pkgs/node/src/clock.zig b/pkgs/node/src/clock.zig index 74625b1be..20a0d38ab 100644 --- a/pkgs/node/src/clock.zig +++ b/pkgs/node/src/clock.zig @@ -44,6 +44,14 @@ pub const Clock = struct { }; } + pub fn deinit(self: *Self, allocator: Allocator) void { + self.timer.deinit(); + for (self.on_interval_cbs.items) |cbWrapper| { + allocator.destroy(cbWrapper); + } + self.on_interval_cbs.deinit(); + } + pub fn tickInterval(self: *Self) void { const time_now_ms: isize = @intCast(std.time.milliTimestamp()); while (self.current_interval_time_ms + constants.SECONDS_PER_INTERVAL_MS < time_now_ms + CLOCK_DISPARITY_MS) { diff --git a/pkgs/node/src/node.zig b/pkgs/node/src/node.zig index 99201ba8b..ec7a59c9d 100644 --- a/pkgs/node/src/node.zig +++ b/pkgs/node/src/node.zig @@ -62,6 +62,10 @@ pub const BeamNode = struct { }; } + pub fn deinit(self: *Self) void { + self.allocator.destroy(self.chain); + } + pub fn onGossip(ptr: *anyopaque, data: *const networks.GossipMessage) anyerror!void { const self: *Self = @ptrCast(@alignCast(ptr)); diff --git a/pkgs/state-transition/src/transition.zig b/pkgs/state-transition/src/transition.zig index 25e0cdf57..5a4ce2954 100644 --- a/pkgs/state-transition/src/transition.zig +++ b/pkgs/state-transition/src/transition.zig @@ -269,10 +269,23 @@ fn process_attestations(allocator: Allocator, state: *types.BeamState, attestati var justifications_roots = std.ArrayList(types.Root).init(allocator); var justifications_validators = std.ArrayList(u8).init(allocator); + + // First, collect all keys var iterator = justifications.iterator(); while (iterator.next()) |kv| { try justifications_roots.append(kv.key_ptr.*); - try justifications_validators.appendSlice(kv.value_ptr.*); + } + + // Sort the roots + std.mem.sortUnstable(types.Root, justifications_roots.items, {}, struct { + fn lessThanFn(_: void, a: types.Root, b: types.Root) bool { + return std.mem.order(u8, &a, &b) == .lt; + } + }.lessThanFn); + + // Now iterate over sorted roots and flatten validators in order + for (justifications_roots.items) |root| { + try justifications_validators.appendSlice(justifications.get(root) orelse unreachable); } allocator.free(state.justifications_roots); diff --git a/pkgs/types/src/lib.zig b/pkgs/types/src/lib.zig index 66fac9f6e..4b264be25 100644 --- a/pkgs/types/src/lib.zig +++ b/pkgs/types/src/lib.zig @@ -12,7 +12,7 @@ pub const ValidatorIndex = u64; pub const Bytes48 = [48]u8; //update signature size to 4000 after ssz is fixed -pub const SIGSIZE = 40; +pub const SIGSIZE = 4000; pub const Bytes4000 = [SIGSIZE]u8; pub const Root = Bytes32; @@ -122,6 +122,13 @@ pub const BeamState = struct { // a flat representation of the justifications map justifications_roots: []Root, justifications_validators: []u8, + + pub fn deinit(self: *BeamState, allocator: Allocator) void { + // historical_block_hashes and justified_slots are slices so need to be freed + // justifications_roots and justifications_validators not freed for now as they are not allocated + allocator.free(self.historical_block_hashes); + allocator.free(self.justified_slots); + } }; // non ssz types, difference is the variable list doesn't need upper boundaries @@ -189,7 +196,7 @@ test "ssz seralize/deserialize signed beam block" { var serialized_signed_block = std.ArrayList(u8).init(std.testing.allocator); defer serialized_signed_block.deinit(); try ssz.serialize(SignedBeamBlock, signed_block, &serialized_signed_block); - std.debug.print("\n\n\nserialized_signed_block ({d})=\n{any}", .{ serialized_signed_block.items.len, serialized_signed_block.items }); + std.debug.print("\n\n\nserialized_signed_block ({d})", .{serialized_signed_block.items.len}); var deserialized_signed_block: SignedBeamBlock = undefined; try ssz.deserialize(SignedBeamBlock, serialized_signed_block.items[0..], &deserialized_signed_block, std.testing.allocator); @@ -323,9 +330,12 @@ test "ssz seralize/deserialize signed stf prover input" { var serialized = std.ArrayList(u8).init(arena_allocator.allocator()); defer serialized.deinit(); try ssz.serialize(BeamSTFProverInput, prover_input, &serialized); - std.debug.print("\n\n\nprove transition ----------- serialized({d})=\n{any}\n", .{ serialized.items.len, serialized.items }); var prover_input_deserialized: BeamSTFProverInput = undefined; try ssz.deserialize(BeamSTFProverInput, serialized.items[0..], &prover_input_deserialized, arena_allocator.allocator()); - std.debug.print("should deserialize to={any}", .{prover_input_deserialized}); + + // TODO create a sszEql fn in ssz to recursively compare two ssz structures + // for now inspect two items + try std.testing.expect(std.mem.eql(u8, &prover_input.block.signature, &prover_input_deserialized.block.signature)); + try std.testing.expect(std.mem.eql(u8, &prover_input.state.latest_block_header.state_root, &prover_input_deserialized.state.latest_block_header.state_root)); }