Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 6 additions & 2 deletions pkgs/cli/src/main.zig
Original file line number Diff line number Diff line change
Expand Up @@ -326,7 +326,7 @@ pub fn main() !void {
.node => |leancmd| {
var logger = utils_lib.getLogger(console_log_level, utils_lib.FileBehaviourParams{ .fileActiveLevel = log_file_active_level, .filePath = log_filepath, .fileName = log_filename });

var start_options: node.StartNodeOptions = .{
var start_options: node.NodeOptions = .{
.node_id = leancmd.node_id,
.metrics_enable = leancmd.metrics_enable,
.metrics_port = leancmd.metrics_port,
Expand All @@ -341,7 +341,11 @@ pub fn main() !void {

try node.buildStartOptions(allocator, leancmd, &start_options);

try node.startNode(allocator, &start_options);
var lean_node: node.Node = undefined;

try lean_node.init(allocator, &start_options);
defer lean_node.deinit();
try lean_node.run();
},
}
}
Expand Down
3 changes: 3 additions & 0 deletions pkgs/cli/src/metrics_server.zig
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ const metrics = @import("@zeam/metrics");
pub fn startMetricsServer(allocator: std.mem.Allocator, port: u16) !void {
// Create a simple HTTP server context
const ctx = try allocator.create(SimpleMetricsServer);
errdefer allocator.destroy(ctx);
ctx.* = .{
.allocator = allocator,
.port = port,
Expand All @@ -23,6 +24,8 @@ const SimpleMetricsServer = struct {
port: u16,

fn run(self: *SimpleMetricsServer) !void {
// `startMetricsServer` creates this, so we need to free it here
Copy link

Copilot AI Sep 17, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[nitpick] The comment references startMetricsServer but this function name is not visible in the current diff context. Consider making the comment more explicit about where this allocation occurs or use a more generic description.

Suggested change
// `startMetricsServer` creates this, so we need to free it here
// This struct was heap-allocated and must be freed here to avoid memory leaks

Copilot uses AI. Check for mistakes.
defer self.allocator.destroy(self);
const address = try std.net.Address.parseIp4("0.0.0.0", self.port);
var server = try address.listen(.{});
defer server.deinit();
Expand Down
266 changes: 163 additions & 103 deletions pkgs/cli/src/node.zig
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ const NodeCommand = @import("main.zig").NodeCommand;

const prefix = "zeam_";

pub const StartNodeOptions = struct {
pub const NodeOptions = struct {
node_id: u32,
bootnodes: []const []const u8,
validator_indices: []usize,
Expand All @@ -33,19 +33,166 @@ pub const StartNodeOptions = struct {
local_priv_key: []const u8,
logger: *Logger,

pub fn deinit(self: *StartNodeOptions, allocator: std.mem.Allocator) void {
pub fn deinit(self: *NodeOptions, allocator: std.mem.Allocator) void {
for (self.bootnodes) |b| allocator.free(b);
allocator.free(self.bootnodes);
allocator.free(self.validator_indices);
allocator.free(self.local_priv_key);
}
};

/// A Node that encapsulates the networking, blockchain, and validator functionalities.
/// It manages the event loop, network interface, clock, and beam node.
pub const Node = struct {
loop: xev.Loop,
network: networks.EthLibp2p,
beam_node: BeamNode,
clock: Clock,
enr: ENR,
options: *const NodeOptions,
allocator: std.mem.Allocator,

const Self = @This();

pub fn init(self: *Self, allocator: std.mem.Allocator, options: *const NodeOptions) !void {
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is a weird form of init where one passes the reference to a self which is unitialized

need @gballet 's opinion on this

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ok, then we need to move a whole lot of inits to this form

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

think it also depends on how large the struct is as the link mentions, if the struct isn't large in size then this form is not necessary.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

right, most of structs in node pkgs are large

self.allocator = allocator;
self.options = options;

const node_id = options.node_id;

if (options.metrics_enable) {
try metrics.init(allocator);
try metrics_server.startMetricsServer(allocator, options.metrics_port);
}

// some base mainnet spec would be loaded to build this up
const chain_spec =
\\{"preset": "mainnet", "name": "beamdev"}
;
const json_options = json.ParseOptions{
.ignore_unknown_fields = true,
.allocate = .alloc_if_needed,
};
var chain_options = (try json.parseFromSlice(ChainOptions, allocator, chain_spec, json_options)).value;

chain_options.genesis_time = options.genesis_spec.genesis_time;
chain_options.num_validators = options.genesis_spec.num_validators;
const chain_config = try ChainConfig.init(Chain.custom, chain_options);
var anchorState = try sft.genGenesisState(allocator, chain_config.genesis);
errdefer anchorState.deinit(allocator);

// TODO we seem to be needing one loop because then the events added to loop are not being fired
// in the order to which they have been added even with the an appropriate delay added
// behavior of this further needs to be investigated but for now we will share the same loop
self.loop = try xev.Loop.init(.{});

const addresses = try self.constructMultiaddrs();
self.network = try networks.EthLibp2p.init(allocator, &self.loop, .{ .networkId = 0, .listen_addresses = addresses.listen_addresses, .connect_peers = addresses.connect_peers, .local_private_key = options.local_priv_key }, options.logger);
errdefer self.network.deinit();
self.clock = try Clock.init(allocator, chain_config.genesis.genesis_time, &self.loop);
errdefer self.clock.deinit(allocator);

self.beam_node = try BeamNode.init(allocator, .{
// options
.nodeId = node_id,
.config = chain_config,
.anchorState = anchorState,
.backend = self.network.getNetworkInterface(),
.clock = &self.clock,
.db = .{},
.validator_ids = options.validator_indices,
.logger = options.logger,
});
}

pub fn deinit(self: *Self) void {
self.clock.deinit(self.allocator);
self.beam_node.deinit();
self.network.deinit();
self.enr.deinit();
self.loop.deinit();
}

pub fn run(self: *Node) !void {
try self.network.run();
try self.beam_node.run();

const ascii_art =
\\
\\ ███████╗███████╗ █████╗ ███╗ ███╗
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

cool art ❤️

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we can probably also ascii convert our logo and add it suitably, @noopur23 create an issue for this

\\ ╚══███╔╝██╔════╝██╔══██╗████╗ ████║
\\ ███╔╝ █████╗ ███████║██╔████╔██║
\\ ███╔╝ ██╔══╝ ██╔══██║██║╚██╔╝██║
\\ ███████╗███████╗██║ ██║██║ ╚═╝ ██║
\\ ╚══════╝╚══════╝╚═╝ ╚═╝╚═╝ ╚═╝
\\
\\ a blazing fast lean consensus client
;

var encoded_txt_buf: [1000]u8 = undefined;
const encoded_txt = try self.enr.encodeToTxt(&encoded_txt_buf);

const quic_port = try self.enr.getQUIC();

// Use logger.info instead of std.debug.print
self.options.logger.info("\n{s}", .{ascii_art});
self.options.logger.info("════════════════════════════════════════════════════════", .{});
self.options.logger.info(" 🚀 Zeam Lean Node Started Successfully!", .{});
self.options.logger.info("════════════════════════════════════════════════════════", .{});
self.options.logger.info(" Node ID: {d}", .{self.options.node_id});
self.options.logger.info(" Listening on QUIC port: {?d}", .{quic_port});
self.options.logger.info(" ENR: {s}", .{encoded_txt});
self.options.logger.info("────────────────────────────────────────────────────────", .{});

try self.clock.run();
}

fn constructMultiaddrs(self: *Self) !struct { listen_addresses: []const Multiaddr, connect_peers: []const Multiaddr } {
const self_node_index = self.options.validator_indices[0];
try ENR.decodeTxtInto(&self.enr, self.options.bootnodes[self_node_index]);

// Overriding the IP to 0.0.0.0 to listen on all interfaces
try self.enr.kvs.put("ip", "\x00\x00\x00\x00");

var node_multiaddrs = try self.enr.multiaddrP2PQUIC(self.allocator);
defer node_multiaddrs.deinit(self.allocator);
// move the ownership to the `EthLibp2p`, will be freed in its deinit
const listen_addresses = try node_multiaddrs.toOwnedSlice(self.allocator);
errdefer {
for (listen_addresses) |addr| addr.deinit();
self.allocator.free(listen_addresses);
}
var connect_peer_list: std.ArrayListUnmanaged(Multiaddr) = .empty;
defer connect_peer_list.deinit(self.allocator);

for (self.options.bootnodes, 0..) |n, i| {
if (i != self_node_index) {
var n_enr: ENR = undefined;
try ENR.decodeTxtInto(&n_enr, n);
var peer_multiaddr_list = try n_enr.multiaddrP2PQUIC(self.allocator);
defer peer_multiaddr_list.deinit(self.allocator);
const peer_multiaddrs = try peer_multiaddr_list.toOwnedSlice(self.allocator);
defer self.allocator.free(peer_multiaddrs);
try connect_peer_list.appendSlice(self.allocator, peer_multiaddrs);
}
}

// move the ownership to the `EthLibp2p`, will be freed in its deinit
const connect_peers = try connect_peer_list.toOwnedSlice(self.allocator);
errdefer {
for (connect_peers) |addr| addr.deinit();
self.allocator.free(connect_peers);
}

return .{ .listen_addresses = listen_addresses, .connect_peers = connect_peers };
}
};

/// Builds the start options for a node based on the provided command and options.
/// It loads the necessary configuration files, parses them, and populates the
/// `StartNodeOptions` structure.
/// The caller is responsible for freeing the allocated resources in `StartNodeOptions`.
pub fn buildStartOptions(allocator: std.mem.Allocator, node_cmd: NodeCommand, opts: *StartNodeOptions) !void {
pub fn buildStartOptions(allocator: std.mem.Allocator, node_cmd: NodeCommand, opts: *NodeOptions) !void {
try utils_lib.checkDIRExists(node_cmd.custom_genesis);

const config_filepath = try std.mem.concat(allocator, u8, &[_][]const u8{ node_cmd.custom_genesis, "/config.yaml" });
Expand All @@ -68,11 +215,20 @@ pub fn buildStartOptions(allocator: std.mem.Allocator, node_cmd: NodeCommand, op
defer parsed_validators.deinit(allocator);

const bootnodes = try nodesFromYAML(allocator, parsed_bootnodes);

errdefer {
for (bootnodes) |b| allocator.free(b);
allocator.free(bootnodes);
}
if (bootnodes.len == 0) {
return error.InvalidNodesConfig;
}
const genesis_spec = try configs.genesisConfigFromYAML(parsed_config, node_cmd.override_genesis_time);

const validator_indices = try validatorIndicesFromYAML(allocator, opts.node_id, parsed_validators);

errdefer allocator.free(validator_indices);
if (validator_indices.len == 0) {
return error.InvalidValidatorConfig;
}
try utils_lib.checkDIRExists(node_cmd.network_dir);
const local_priv_key_filepath = try std.mem.concat(allocator, u8, &[_][]const u8{ node_cmd.network_dir, "/key" });
defer allocator.free(local_priv_key_filepath);
Expand All @@ -84,110 +240,14 @@ pub fn buildStartOptions(allocator: std.mem.Allocator, node_cmd: NodeCommand, op
opts.genesis_spec = genesis_spec;
}

/// Starts a node with the given options.
/// This function does not return until the node is stopped.
/// It initializes the metrics server if enabled, sets up the network,
/// and starts the Beam node with the provided configuration.
pub fn startNode(allocator: std.mem.Allocator, options: *const StartNodeOptions) !void {
const node_id = options.node_id;

if (options.metrics_enable) {
try metrics.init(allocator);
try metrics_server.startMetricsServer(allocator, options.metrics_port);
}

// some base mainnet spec would be loaded to build this up
const chain_spec =
\\{"preset": "mainnet", "name": "beamdev"}
;
const json_options = json.ParseOptions{
.ignore_unknown_fields = true,
.allocate = .alloc_if_needed,
};
var chain_options = (try json.parseFromSlice(ChainOptions, allocator, chain_spec, json_options)).value;

chain_options.genesis_time = options.genesis_spec.genesis_time;
chain_options.num_validators = options.genesis_spec.num_validators;
const chain_config = try ChainConfig.init(Chain.custom, chain_options);
const anchorState = try sft.genGenesisState(allocator, chain_config.genesis);

// TODO we seem to be needing one loop because then the events added to loop are not being fired
// in the order to which they have been added even with the an appropriate delay added
// behavior of this further needs to be investigated but for now we will share the same loop
const loop = try allocator.create(xev.Loop);
loop.* = try xev.Loop.init(.{});

const self_node_index = options.validator_indices[0];
var network = try allocator.create(networks.EthLibp2p);
var node_enr: ENR = undefined;
defer node_enr.deinit();
try ENR.decodeTxtInto(&node_enr, options.bootnodes[self_node_index]);

// Overriding the IP to 0.0.0.0 to listen on all interfaces
try node_enr.kvs.put("ip", "\x00\x00\x00\x00");

var node_multiaddrs = try node_enr.multiaddrP2PQUIC(allocator);
defer node_multiaddrs.deinit(allocator);
const listen_addresses = try node_multiaddrs.toOwnedSlice(allocator);
// these addresses are converted to a slice in the `run` function of `EthLibp2p` so it can be freed safely after `run` returns
defer {
for (listen_addresses) |addr| addr.deinit();
allocator.free(listen_addresses);
}

var connect_peer_list: std.ArrayListUnmanaged(Multiaddr) = .empty;
defer connect_peer_list.deinit(allocator);

for (options.bootnodes, 0..) |n, i| {
if (i != self_node_index) {
var n_enr: ENR = undefined;
try ENR.decodeTxtInto(&n_enr, n);
var peer_multiaddr_list = try n_enr.multiaddrP2PQUIC(allocator);
defer peer_multiaddr_list.deinit(allocator);
const peer_multiaddrs = try peer_multiaddr_list.toOwnedSlice(allocator);
defer allocator.free(peer_multiaddrs);
try connect_peer_list.appendSlice(allocator, peer_multiaddrs);
}
}

const connect_peers = try connect_peer_list.toOwnedSlice(allocator);
defer {
for (connect_peers) |addr| addr.deinit();
allocator.free(connect_peers);
}

network.* = try networks.EthLibp2p.init(allocator, loop, .{ .networkId = 0, .listen_addresses = listen_addresses, .connect_peers = connect_peers, .local_private_key = options.local_priv_key }, options.logger);
try network.run();
const backend = network.getNetworkInterface();

var clock = try allocator.create(Clock);
clock.* = try Clock.init(allocator, chain_config.genesis.genesis_time, loop);

var beam_node = try BeamNode.init(allocator, .{
// options
.nodeId = node_id,
.config = chain_config,
.anchorState = anchorState,
.backend = backend,
.clock = clock,
.db = .{},
.validator_ids = options.validator_indices,
.logger = options.logger,
});

try beam_node.run();
std.debug.print("Lean node {d} listened on {?d}\n", .{ node_id, try node_enr.getQUIC() });
try clock.run();
}

/// Parses the nodes from a YAML configuration.
/// Expects a YAML structure like:
/// ```yaml
/// - enr1...
/// - enr2...
/// ```
/// Returns a set of ENR strings. The caller is responsible for freeing the returned slice.
pub fn nodesFromYAML(allocator: std.mem.Allocator, nodes_config: Yaml) ![]const []const u8 {
fn nodesFromYAML(allocator: std.mem.Allocator, nodes_config: Yaml) ![]const []const u8 {
const temp_nodes = try nodes_config.parse(allocator, [][]const u8);
defer allocator.free(temp_nodes);

Expand Down Expand Up @@ -216,7 +276,7 @@ pub fn nodesFromYAML(allocator: std.mem.Allocator, nodes_config: Yaml) ![]const
/// ```
/// where `node_{node_id}` is the key for the node's validator indices.
/// Returns a set of validator indices. The caller is responsible for freeing the returned slice.
pub fn validatorIndicesFromYAML(allocator: std.mem.Allocator, node_id: u32, validators_config: Yaml) ![]usize {
fn validatorIndicesFromYAML(allocator: std.mem.Allocator, node_id: u32, validators_config: Yaml) ![]usize {
var validator_indices: std.ArrayListUnmanaged(usize) = .empty;
defer validator_indices.deinit(allocator);

Expand Down
10 changes: 10 additions & 0 deletions pkgs/network/src/ethlibp2p.zig
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,16 @@ pub const EthLibp2p = struct {
return Self{ .allocator = allocator, .params = params, .gossipHandler = try interface.GenericGossipHandler.init(allocator, loop, params.networkId, logger), .logger = logger };
}

pub fn deinit(self: *Self) void {
for (self.params.listen_addresses) |addr| addr.deinit();
self.allocator.free(self.params.listen_addresses);

if (self.params.connect_peers) |peers| {
for (peers) |addr| addr.deinit();
self.allocator.free(peers);
}
}

pub fn run(self: *Self) !void {
const listen_addresses_str = try multiaddrsToString(self.allocator, self.params.listen_addresses);
const connect_peers_str = if (self.params.connect_peers) |peers|
Expand Down
8 changes: 8 additions & 0 deletions pkgs/node/src/chain.zig
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,14 @@ pub const BeamChain = struct {
};
}

pub fn deinit(self: *Self) void {
var it = self.states.iterator();
while (it.next()) |entry| {
entry.value_ptr.deinit(self.allocator);
}
self.states.deinit();
}

pub fn registerValidatorIds(self: *Self, validator_ids: []usize) void {
// right now it's simple assignment but eventually it should be a set
// tacking registrations and keeping it alive for 3*2=6 slots
Expand Down
Loading
Loading