From a0bc2224394afb541612de62050a3772e721848d Mon Sep 17 00:00:00 2001 From: Peter M Date: Tue, 25 Feb 2025 17:16:16 +0100 Subject: [PATCH] Built-in esptool flash (via Pythonx) Optional use built-in esptool install via Pythonx. Signed-off-by: Peter M --- lib/esptool_helper.ex | 241 +++++++++++++++++++++++++++++ lib/mix/tasks/esp32.erase_flash.ex | 27 ++++ lib/mix/tasks/esp32.info.ex | 106 +++++++++++++ lib/mix/tasks/esp32.install.ex | 171 ++++++++++++++++++++ lib/mix/tasks/esp32_flash.ex | 17 +- mix.exs | 6 +- 6 files changed, 563 insertions(+), 5 deletions(-) create mode 100644 lib/esptool_helper.ex create mode 100644 lib/mix/tasks/esp32.erase_flash.ex create mode 100644 lib/mix/tasks/esp32.info.ex create mode 100644 lib/mix/tasks/esp32.install.ex diff --git a/lib/esptool_helper.ex b/lib/esptool_helper.ex new file mode 100644 index 0000000..e6a042d --- /dev/null +++ b/lib/esptool_helper.ex @@ -0,0 +1,241 @@ +defmodule ExAtomVM.EsptoolHelper do + @moduledoc """ + Module for setting up and using esptool through Pythonx. + """ + + @doc """ + Initializes Python environment with project configuration. + We use locked main branch esptool version, pending a stable 5.x release, + as we need the read to memory (instead of only to file) features. + """ + def setup do + case Code.ensure_loaded(Pythonx) do + {:module, Pythonx} -> + Application.ensure_all_started(:pythonx) + + Pythonx.uv_init(""" + [project] + name = "project" + version = "0.0.0" + requires-python = "==3.13.*" + dependencies = [ + "esptool @ git+https://github.com/espressif/esptool.git@6f0d779" + ] + """) + + _ -> + {:error, :pythonx_not_available, + "The :pythonx dependency is not available. Please add it to your mix.exs dependencies.\n{:pythonx, \"~> 0.4.0\"}"} + end + end + + def flash_pythonx(tool_args) do + # https://github.com/espressif/esptool/blob/master/docs/en/esptool/scripting.rst + + tool_args = + if not Enum.member?(tool_args, "--port") do + selected_device = select_device() + + ["--port", selected_device["port"]] ++ tool_args + else + tool_args + end + + {_result, globals} = + try do + Pythonx.eval( + """ + import esptool + import sys + + command = [arg.decode('utf-8') for arg in tool_args] + + def flash_esp(): + esptool.main(command) + + if __name__ == "__main__": + try: + result = flash_esp() + result = True + except SystemExit as e: + exit_code = int(str(e)) + result = exit_code == 0 + except Exception as e: + print(f"Warning: {e}") + result = True + + """, + %{"tool_args" => tool_args} + ) + rescue + e in Pythonx.Error -> + IO.inspect("Pythonx error occurred: #{inspect(e)}") + exit({:shutdown, 1}) + end + + Pythonx.decode(globals["result"]) + end + + @doc """ + Erases flash of an ESP32 device. + --after "no-reset" is needed for keeping USB-OTG devices like esp32-S2 in a good state. + """ + def erase_flash(tool_args \\ ["--chip", "auto", "--after", "no-reset"]) do + tool_args = + if not Enum.member?(tool_args, "--port") do + selected_device = select_device() + + confirmation = + IO.gets( + "\nAre you sure you want to erase the flash of\n#{selected_device["chip_family_name"]} - Port: #{selected_device["port"]} MAC: #{selected_device["mac_address"]} ? [N/y]: " + ) + + case String.trim(confirmation) do + input when input in ["Y", "y"] -> + IO.puts("Erasing..") + + _ -> + IO.puts("Flash erase cancelled.") + exit({:shutdown, 0}) + end + + ["--port", selected_device["port"]] ++ tool_args ++ ["erase-flash"] + else + tool_args ++ ["erase-flash"] + end + + {_result, globals} = + try do + Pythonx.eval( + """ + import esptool + + command = [arg.decode('utf-8') for arg in tool_args] + + def flash_esp(): + esptool.main(command) + + if __name__ == "__main__": + try: + result = flash_esp() + result = True + except SystemExit as e: + exit_code = int(str(e)) + result = exit_code == 0 + except Exception as e: + print(f"Warning: {e}") + result = False + """, + %{"tool_args" => tool_args} + ) + rescue + e in Pythonx.Error -> + IO.inspect("Pythonx error occurred: #{inspect(e)}") + exit({:shutdown, 1}) + end + + Pythonx.decode(globals["result"]) + end + + def connected_devices do + {_result, globals} = + try do + Pythonx.eval( + """ + from esptool.cmds import (detect_chip, read_flash, attach_flash) + import serial.tools.list_ports as list_ports + import re + + ports = [] + for port in list_ports.comports(): + if port.vid is None: + continue + ports.append(port.device) + + result = [] + for port in ports: + try: + with detect_chip(port) as esp: + description = esp.get_chip_description() + features = esp.get_chip_features() + mac_addr = ':'.join(['%02X' % b for b in esp.read_mac()]) + + # chips like esp32-s2 can have more specific names, so we call this chip family + # https://github.com/espressif/esptool/blob/807d02b0c5eb07ba46f871a492c84395fb9f37be/esptool/targets/esp32s2.py#L167 + chip_family_name = esp.CHIP_NAME + + # read 128 bytes at 0x10030 + attach_flash(esp) + app_header = read_flash(esp, 0x10030, 128, None) + app_header_strings = [s for s in re.split('\\x00', app_header.decode('utf-8', errors='replace')) if s] + + usb_mode = esp.get_usb_mode() + + # this is needed to keep USB-OTG boards like esp32-S2 in a good state + esp.run_stub() + + result.append({"port": port, "chip_family_name": chip_family_name, + "features": features, "build_info": app_header_strings, + "mac_address": mac_addr, "usb_mode": usb_mode + }) + except Exception as e: + print(f"Error: {e}") + result = [] + """, + %{} + ) + rescue + e in Pythonx.Error -> + {:error, "Pythonx error occurred: #{inspect(e)}"} + end + + Pythonx.decode(globals["result"]) + |> Enum.map(fn device -> + Map.put(device, "atomvm_installed", Enum.member?(device["build_info"], "atomvm-esp32")) + end) + end + + def select_device do + devices = connected_devices() + + selected_device = + case length(devices) do + 0 -> + IO.puts( + "Found no esp32 devices..\nYou may have to hold BOOT button down while plugging in the device" + ) + + exit({:shutdown, 1}) + + 1 -> + hd(devices) + + _ -> + IO.puts("\nMultiple ESP32 devices found:") + + devices + |> Enum.with_index(1) + |> Enum.each(fn {device, index} -> + IO.puts( + "#{index}. #{device["chip_family_name"]} - Port: #{device["port"]} MAC: #{device["mac_address"]}" + ) + end) + + selected = + IO.gets("\nSelect device (1-#{length(devices)}): ") + |> String.trim() + |> Integer.parse() + + case selected do + {num, _} when num > 0 and num <= length(devices) -> + Enum.at(devices, num - 1) + + _ -> + IO.puts("Invalid selection.") + exit({:shutdown, 1}) + end + end + + selected_device + end +end diff --git a/lib/mix/tasks/esp32.erase_flash.ex b/lib/mix/tasks/esp32.erase_flash.ex new file mode 100644 index 0000000..f3fa515 --- /dev/null +++ b/lib/mix/tasks/esp32.erase_flash.ex @@ -0,0 +1,27 @@ +defmodule Mix.Tasks.Atomvm.Esp32.EraseFlash do + @moduledoc """ + Mix task to get erase the flash of an connected ESP32 devices. + """ + use Mix.Task + + @shortdoc "Erase flash of ESP32" + + @impl Mix.Task + def run(_args) do + with :ok <- ExAtomVM.EsptoolHelper.setup(), + result <- ExAtomVM.EsptoolHelper.erase_flash() do + case result do + true -> exit({:shutdown, 0}) + false -> exit({:shutdown, 1}) + end + else + {:error, :pythonx_not_available, message} -> + IO.puts("\nError: #{message}") + exit({:shutdown, 1}) + + {:error, reason} -> + IO.puts("Error: #{reason}") + exit({:shutdown, 1}) + end + end +end diff --git a/lib/mix/tasks/esp32.info.ex b/lib/mix/tasks/esp32.info.ex new file mode 100644 index 0000000..21bd6af --- /dev/null +++ b/lib/mix/tasks/esp32.info.ex @@ -0,0 +1,106 @@ +defmodule Mix.Tasks.Atomvm.Esp32.Info do + @moduledoc """ + Mix task to get information about connected ESP32 devices. + """ + use Mix.Task + + @shortdoc "Get information about connected ESP32 devices" + + @impl Mix.Task + def run(_args) do + with :ok <- ExAtomVM.EsptoolHelper.setup(), + devices <- ExAtomVM.EsptoolHelper.connected_devices() do + case length(devices) do + 0 -> + IO.puts( + "Found no esp32 devices..\nYou may have to hold BOOT button down while plugging in the device" + ) + + count -> + IO.puts("Found #{count} connected esp32:") + end + + if length(devices) > 1 do + Enum.each(devices, fn device -> + IO.puts( + "#{format_atomvm_status(device["atomvm_installed"])}#{device["chip_family_name"]} - Port: #{device["port"]}" + ) + end) + end + + Enum.each(devices, fn device -> + IO.puts("\n━━━━━━━━━━━━━━━━━━━━━━") + + IO.puts( + "#{format_atomvm_status(device["atomvm_installed"])}#{device["chip_family_name"]} - Port: #{device["port"]}" + ) + + IO.puts("USB_MODE: #{device["usb_mode"]}") + IO.puts("MAC: #{device["mac_address"]}") + IO.puts("AtomVM installed: #{device["atomvm_installed"]}") + + IO.puts("\nBuild Information:") + + Enum.each(format_build_info(device["build_info"]), fn build_info -> + IO.puts(build_info) + end) + + IO.puts("\nFeatures:") + + Enum.each(device["features"], fn feature -> + IO.puts(" · #{feature}") + end) + end) + + IO.puts("\n") + else + {:error, :pythonx_not_available, message} -> + IO.puts("\nError: #{message}") + exit({:shutdown, 1}) + + {:error, reason} -> + IO.puts("\nError: Failed to get ESP32 device information") + IO.puts("Reason: #{reason}") + exit({:shutdown, 1}) + end + end + + defp format_build_info(build_info) when is_list(build_info) and length(build_info) == 5 do + [version, target, time, date, sdk] = + build_info + |> Enum.map(&sanitize_string/1) + + [ + " Version: #{version}", + " Target: #{target}", + " Built: #{time} #{date}", + " SDK: #{sdk}" + ] + end + + defp format_build_info(build_info) when is_list(build_info) do + build_info + |> Enum.map(&sanitize_string/1) + |> Enum.with_index(1) + |> Enum.map(fn {info, index} -> " Info #{index}: #{info}" end) + end + + defp format_build_info(_) do + [" Build info not available or corrupted"] + end + + defp sanitize_string(str) when is_binary(str) do + str + # Remove non-printable characters while preserving spaces + |> String.replace(~r/[^\x20-\x7E\s]/u, "") + |> case do + "" -> "" + sanitized -> sanitized + end + end + + defp sanitize_string(_), do: "" + + defp format_atomvm_status(true), do: "✅" + defp format_atomvm_status(_), do: "❌" +end diff --git a/lib/mix/tasks/esp32.install.ex b/lib/mix/tasks/esp32.install.ex new file mode 100644 index 0000000..5974f2f --- /dev/null +++ b/lib/mix/tasks/esp32.install.ex @@ -0,0 +1,171 @@ +defmodule Mix.Tasks.Atomvm.Esp32.Install do + @moduledoc """ + Mix task for erasing flash and installing the latest AtomVM release to connected device. + + Takes an optional --baud option to set the baud rate of the flashing. + Defaults to 921600, use 115200 for slower devices. + + After install, your project can be flashed with: + mix atomvm.esp32.flash + """ + use Mix.Task + + @shortdoc "Install latest AtomVM release on ESP32" + + alias ExAtomVM.EsptoolHelper + + @impl Mix.Task + def run(args) do + {opts, _} = OptionParser.parse!(args, strict: [baud: :string]) + baud = Keyword.get(opts, :baud, "921600") + + with :ok <- check_req_dependency(), + :ok <- EsptoolHelper.setup(), + selected_device <- EsptoolHelper.select_device(), + release_file <- get_latest_release(selected_device["chip_family_name"]), + :ok <- confirm_erase_and_flash(selected_device, release_file), + true <- + EsptoolHelper.erase_flash([ + "--port", + selected_device["port"], + "--chip", + "auto", + "--after", + "no-reset" + ]), + :timer.sleep(3000), + true <- flash_release(selected_device, release_file, baud) do + IO.puts(""" + + Successfully installed AtomVM on #{selected_device["chip_family_name"]} Port: #{selected_device["port"]} MAC: #{selected_device["mac_address"]} + + Your project can now be flashed with: + mix atomvm.esp32.flash + + """) + else + {:error, :req_not_available, message} -> + IO.puts("\nError: #{message}") + exit({:shutdown, 1}) + + {:error, :pythonx_not_available, message} -> + IO.puts("\nError: #{message}") + exit({:shutdown, 1}) + + {:error, reason} -> + IO.puts("Error: #{reason}") + exit({:shutdown, 1}) + end + end + + defp confirm_erase_and_flash(selected_device, release_file) do + confirmation = + IO.gets(""" + + Are you sure you want to erase the flash of + #{selected_device["chip_family_name"]} - Port: #{selected_device["port"]} MAC: #{selected_device["mac_address"]} + And install AtomVM: #{release_file} + ? [N/y]: + + """) + + case String.trim(confirmation) do + input when input in ["Y", "y"] -> + IO.puts("Erasing and flashing") + :ok + + _ -> + IO.puts("Install cancelled.") + exit({:shutdown, 0}) + end + end + + defp check_req_dependency do + case Code.ensure_loaded(Req) do + {:module, _} -> + :ok + + {:error, _} -> + {:error, :req_not_available, + "\nError: The 'req' package is not available. Please ensure it is listed in your dependencies.\n{:req, \"~> 0.5.0\", runtime: false}"} + end + end + + defp get_latest_release(chip_family) do + cache_dir = + if Code.ensure_loaded?(Mix.Project) do + Path.join(Path.dirname(Mix.Project.build_path()), "atomvm_binaries") + else + Path.expand("_build/atomvm_binaries") + end + + File.mkdir_p!(cache_dir) + + {:ok, _} = Application.ensure_all_started(:req) + + with {:ok, response} <- Req.get("https://api.github.com/repos/atomvm/atomvm/releases/latest"), + %{status: 200, body: body} <- response, + assets <- body["assets"] || [], + asset when not is_nil(asset) <- Enum.find(assets, &matches_chip_family?(&1, chip_family)) do + cached_file = Path.join(cache_dir, asset["name"]) + + if !File.exists?(cached_file) do + IO.puts("\nDownloading #{asset["name"]}, may take a while...") + {:ok, _response} = Req.get(asset["browser_download_url"], into: File.stream!(cached_file)) + cached_file + else + IO.puts("\nUsing cached #{asset["name"]}") + cached_file + end + else + {:error, reason} -> + raise "Failed to fetch release: #{inspect(reason)}" + + %{status: status} -> + raise "GitHub API returned status #{status}" + + nil -> + raise "No matching release found for #{chip_family}" + end + end + + defp matches_chip_family?(%{"name" => name}, chip_family) do + name = String.downcase(name) + + chip_family = + String.downcase(chip_family) + |> String.replace("-", "") + |> String.replace(" ", "") + + String.contains?(name, [chip_family]) && String.contains?(name, ["elixir"]) && + String.ends_with?(name, ".img") + end + + defp flash_release(device, release_file, baud) do + flash_offset = + %{ + "ESP32" => "0x1000", + "ESP32-S2" => "0x1000", + "ESP32-S3" => "0x0", + "ESP32-C2" => "0x0", + "ESP32-C3" => "0x0", + "ESP32-C6" => "0x0", + "ESP32-H2" => "0x0", + "ESP32-P4" => "0x2000" + }[device["chip_family_name"]] || "0x0" + + tool_args = [ + "--chip", + "auto", + "--port", + device["port"], + "--baud", + baud, + "write-flash", + flash_offset, + release_file + ] + + EsptoolHelper.flash_pythonx(tool_args) + end +end diff --git a/lib/mix/tasks/esp32_flash.ex b/lib/mix/tasks/esp32_flash.ex index 4920f36..de4ba7e 100644 --- a/lib/mix/tasks/esp32_flash.ex +++ b/lib/mix/tasks/esp32_flash.ex @@ -59,8 +59,21 @@ defmodule Mix.Tasks.Atomvm.Esp32.Flash do tool_args = if port == "auto", do: tool_args, else: ["--port", port] ++ tool_args - tool_full_path = get_esptool_path(idf_path) - System.cmd(tool_full_path, tool_args, stderr_to_stdout: true, into: IO.stream(:stdio, 1)) + case Code.ensure_loaded(Pythonx) do + {:module, Pythonx} -> + IO.inspect("Flashing using Pythonx installed esptool..") + ExAtomVM.EsptoolHelper.setup() + + case ExAtomVM.EsptoolHelper.flash_pythonx(tool_args) do + true -> exit({:shutdown, 0}) + false -> exit({:shutdown, 1}) + end + + _ -> + IO.inspect("Flashing using esptool..") + tool_full_path = get_esptool_path(idf_path) + System.cmd(tool_full_path, tool_args, stderr_to_stdout: true, into: IO.stream(:stdio, 1)) + end end defp get_esptool_path(<<"">>) do diff --git a/mix.exs b/mix.exs index 141bf37..088a8f7 100644 --- a/mix.exs +++ b/mix.exs @@ -21,9 +21,9 @@ defmodule ExAtomVM.MixProject do # Run "mix help deps" to learn about dependencies. defp deps do [ - {:uf2tool, "1.1.0"} - # {:dep_from_hexpm, "~> 0.3.0"}, - # {:dep_from_git, git: "https://github.com/elixir-lang/my_dep.git", tag: "0.1.0"} + {:uf2tool, "1.1.0"}, + {:pythonx, "~> 0.4.0", runtime: false, optional: true}, + {:req, "~> 0.5.0", runtime: false, optional: true} ] end end