diff --git a/CHANGELOG.md b/CHANGELOG.md index 6774e4e10..0ec442fd7 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -32,6 +32,7 @@ Attention: The newest changes should be on top --> ### Added +- ENH: Add thrustcurve api integration to retrieve motor eng data [#870](https://github.com/RocketPy-Team/RocketPy/pull/870) - ENH: Custom Exception errors and messages [#285](https://github.com/RocketPy-Team/RocketPy/issues/285) ### Changed diff --git a/docs/user/motors/genericmotor.rst b/docs/user/motors/genericmotor.rst index f9da46fd0..8c5b40703 100644 --- a/docs/user/motors/genericmotor.rst +++ b/docs/user/motors/genericmotor.rst @@ -106,3 +106,30 @@ note that the user can still provide the parameters manually if needed. The ``load_from_eng_file`` method is a very useful tool for simulating motors \ when the user does not have all the information required to build a ``SolidMotor`` yet. +The ``load_from_thrustcurve_api`` method +---------------------------------------- + +The ``GenericMotor`` class provides a convenience loader that downloads a temporary +`.eng` file from the ThrustCurve.org public API and builds a ``GenericMotor`` +instance from it. This is useful when you know a motor designation (for example +``"M1670"``) but do not want to manually download and +save the `.eng` file. + +.. note:: + + This method performs network requests to the ThrustCurve API. Use it only + when you have network access. For automated testing or reproducible runs, + prefer using local `.eng` files. + +Example +------- + +.. jupyter-execute:: + + from rocketpy.motors import GenericMotor + + # Build a motor by name (requires network access) + motor = GenericMotor.load_from_thrustcurve_api("M1670") + + # Use the motor as usual + motor.info() diff --git a/rocketpy/motors/motor.py b/rocketpy/motors/motor.py index 7930ed52b..c81c713d4 100644 --- a/rocketpy/motors/motor.py +++ b/rocketpy/motors/motor.py @@ -1,11 +1,14 @@ +import base64 import re +import tempfile import warnings import xml.etree.ElementTree as ET from abc import ABC, abstractmethod from functools import cached_property -from os import path +from os import path, remove import numpy as np +import requests from ..mathutils.function import Function, funcify_method from ..plots.motor_plots import _MotorPlots @@ -1914,6 +1917,121 @@ def load_from_rse_file( coordinate_system_orientation=coordinate_system_orientation, ) + @staticmethod + def _call_thrustcurve_api(name: str): + """ + Download a .eng file from the ThrustCurve API + based on the given motor name. + + Parameters + ---------- + name : str + The motor name according to the API (e.g., "Cesaroni_M1670" or "M1670"). + Both manufacturer-prefixed and shorthand names are commonly used; if multiple + motors match the search, the first result is used. + + Returns + ------- + data_base64 : str + The .eng file of the motor in base64 + + Raises + ------ + ValueError + If no motor is found or if the downloaded .eng data is missing. + requests.exceptions.RequestException + If a network or HTTP error occurs during the API call. + """ + base_url = "https://www.thrustcurve.org/api/v1" + + # Step 1. Search motor + response = requests.get(f"{base_url}/search.json", params={"commonName": name}) + response.raise_for_status() + data = response.json() + + if not data.get("results"): + raise ValueError( + f"No motor found for name '{name}'. " + "Please verify the motor name format (e.g., 'Cesaroni_M1670' or 'M1670') and try again." + ) + + motor_info = data["results"][0] + motor_id = motor_info.get("motorId") + # NOTE: commented bc we don't use it, but keeping for possible future use + # designation = motor_info.get("designation", "").replace("/", "-") + # manufacturer = motor_info.get("manufacturer", "") + + # Step 2. Download the .eng file + dl_response = requests.get( + f"{base_url}/download.json", + params={"motorIds": motor_id, "format": "RASP", "data": "file"}, + ) + dl_response.raise_for_status() + dl_data = dl_response.json() + + if not dl_data.get("results"): + raise ValueError( + f"No .eng file found for motor '{name}' in the ThrustCurve API." + ) + + data_base64 = dl_data["results"][0].get("data") + if not data_base64: + raise ValueError( + f"Downloaded .eng data for motor '{name}' is empty or invalid." + ) + return data_base64 + + @staticmethod + def load_from_thrustcurve_api(name: str, **kwargs): + """ + Creates a Motor instance by downloading a .eng file from the ThrustCurve API + based on the given motor name. + + Parameters + ---------- + name : str + The motor name according to the API (e.g., "Cesaroni_M1670" or "M1670"). + Both manufacturer-prefixed and shorthand names are commonly used; if multiple + motors match the search, the first result is used. + **kwargs : + Additional arguments passed to the Motor constructor or loader, such as + dry_mass, nozzle_radius, etc. + + Returns + ------- + instance : GenericMotor + A new GenericMotor instance initialized using the downloaded .eng file. + + Raises + ------ + ValueError + If no motor is found or if the downloaded .eng data is missing. + requests.exceptions.RequestException + If a network or HTTP error occurs during the API call. + """ + + data_base64 = GenericMotor._call_thrustcurve_api(name) + data_bytes = base64.b64decode(data_base64) + + # Step 3. Create the motor from the .eng file + tmp_path = None + try: + # create a temporary file that persists until we explicitly remove it + with tempfile.NamedTemporaryFile(suffix=".eng", delete=False) as tmp_file: + tmp_file.write(data_bytes) + tmp_file.flush() + tmp_path = tmp_file.name + + return GenericMotor.load_from_eng_file(tmp_path, **kwargs) + finally: + # Ensuring the temporary file is removed + if tmp_path and path.exists(tmp_path): + try: + remove(tmp_path) + except OSError: + # If cleanup fails, don't raise: we don't want to mask prior exceptions. + pass + def all_info(self): """Prints out all data and graphs available about the Motor.""" # Print motor details diff --git a/tests/unit/motors/test_genericmotor.py b/tests/unit/motors/test_genericmotor.py index 776d7b691..3d0fbd766 100644 --- a/tests/unit/motors/test_genericmotor.py +++ b/tests/unit/motors/test_genericmotor.py @@ -1,5 +1,8 @@ +import base64 + import numpy as np import pytest +import requests import scipy.integrate from rocketpy import Function, Motor @@ -211,3 +214,122 @@ def test_load_from_rse_file(generic_motor): assert thrust_curve[0][1] == 0.0 # First thrust point assert thrust_curve[-1][0] == 2.2 # Last point of time assert thrust_curve[-1][1] == 0.0 # Last thrust point + + +class MockResponse: + """Mocked response for requests.""" + + def __init__(self, json_data): + self._json_data = json_data + + def json(self): + return self._json_data + + def raise_for_status(self): + return None + + +def _mock_get(search_results=None, download_results=None): + """Return a mock_get function with predefined search/download results.""" + + def _get(url, **_kwargs): + if "search.json" in url: + return MockResponse(search_results or {"results": []}) + if "download.json" in url: + return MockResponse(download_results or {"results": []}) + raise RuntimeError(f"Unexpected URL: {url}") + + return _get + + +# Module-level constant for expected motor specs +EXPECTED_MOTOR_SPECS = { + "burn_time": (0, 3.9), + "dry_mass": 2.130, + "propellant_initial_mass": 3.101, + "chamber_radius": 75 / 1000, + "chamber_height": 757 / 1000, + "nozzle_radius": (75 / 1000) * 0.85, + "average_thrust": 1545.218, + "total_impulse": 6026.350, + "max_thrust": 2200.0, + "exhaust_velocity": 1943.357, + "chamber_position": 0, +} + + +def assert_motor_specs(motor): + specs = EXPECTED_MOTOR_SPECS + assert motor.burn_time == specs["burn_time"] + assert motor.dry_mass == specs["dry_mass"] + assert motor.propellant_initial_mass == specs["propellant_initial_mass"] + assert motor.chamber_radius == specs["chamber_radius"] + assert motor.chamber_height == specs["chamber_height"] + assert motor.chamber_position == specs["chamber_position"] + assert motor.average_thrust == pytest.approx(specs["average_thrust"]) + assert motor.total_impulse == pytest.approx(specs["total_impulse"]) + assert motor.exhaust_velocity.average(*specs["burn_time"]) == pytest.approx( + specs["exhaust_velocity"] + ) + assert motor.max_thrust == pytest.approx(specs["max_thrust"]) + assert motor.nozzle_radius == pytest.approx(specs["nozzle_radius"]) + + +def test_load_from_thrustcurve_api(monkeypatch, generic_motor): + """Tests GenericMotor.load_from_thrustcurve_api with mocked API.""" + + eng_path = "data/motors/cesaroni/Cesaroni_M1670.eng" + with open(eng_path, "rb") as f: + encoded = base64.b64encode(f.read()).decode("utf-8") + + search_json = { + "results": [ + { + "motorId": "12345", + "designation": "Cesaroni_M1670", + "manufacturer": "Cesaroni", + } + ] + } + download_json = {"results": [{"data": encoded}]} + monkeypatch.setattr(requests, "get", _mock_get(search_json, download_json)) + monkeypatch.setattr(requests.Session, "get", _mock_get(search_json, download_json)) + + motor = type(generic_motor).load_from_thrustcurve_api("M1670") + + assert_motor_specs(motor) + + _, _, points = Motor.import_eng(eng_path) + assert motor.thrust.y_array == pytest.approx( + Function(points, "Time (s)", "Thrust (N)", "linear", "zero").y_array + ) + + error_cases = [ + ("No motor found", {"results": []}, None), + ( + "No .eng file found", + { + "results": [ + {"motorId": "123", "designation": "Fake", "manufacturer": "Test"} + ] + }, + {"results": []}, + ), + ( + "Downloaded .eng data", + { + "results": [ + {"motorId": "123", "designation": "Fake", "manufacturer": "Test"} + ] + }, + {"results": [{"data": ""}]}, + ), + ] + + for msg, search_res, download_res in error_cases: + monkeypatch.setattr(requests, "get", _mock_get(search_res, download_res)) + monkeypatch.setattr( + requests.Session, "get", _mock_get(search_res, download_res) + ) + with pytest.raises(ValueError, match=msg): + type(generic_motor).load_from_thrustcurve_api("FakeMotor")