Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[stdlib] Adds functionality to spawn and manage processes from exec. file #3998

Open
wants to merge 3 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 34 additions & 0 deletions stdlib/src/builtin/file_descriptor.mojo
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,13 @@ f.close()
```

"""
from sys import os_is_macos, os_is_linux
from sys._amdgpu import printf_append_string_n, printf_begin
from sys.ffi import external_call
from sys.info import is_amd_gpu, is_nvidia_gpu

from builtin.io import _printf
from builtin.os import abort
from memory import Span, UnsafePointer


Expand Down Expand Up @@ -85,6 +87,38 @@ struct FileDescriptor(Writer):
written,
)

@always_inline
fn read_bytes(mut self, size: Int) raises -> Span[Byte, MutableAnyOrigin]:
"""
Read a Span of bytes from the file.

Args:
size: Number of bytes to read.

Returns:
A list of bytes.
"""

@parameter
if is_nvidia_gpu():
constrained[False, "Nvidia GPU read bytes not implemented"]()
return abort[Span[Byte, MutableAnyOrigin]]()
elif is_amd_gpu():
constrained[False, "AMD GPU read bytes not implemented"]()
return abort[Span[Byte, MutableAnyOrigin]]()
elif os_is_macos() or os_is_linux():
var buf = UnsafePointer[UInt8].alloc(size)
read = external_call["read", Int](self.value, buf, size)

if read < 0:
buf.free()
raise Error("Failed to read bytes.")

return Span[Byte, MutableAnyOrigin](ptr=buf, length=size)
else:
constrained[False, "Unknown platform read bytes not implemented"]()
return abort[Span[Byte, MutableAnyOrigin]]()

fn write[*Ts: Writable](mut self, *args: *Ts):
"""Write a sequence of Writable arguments to the provided Writer.

Expand Down
1 change: 1 addition & 0 deletions stdlib/src/os/__init__.mojo
Original file line number Diff line number Diff line change
Expand Up @@ -31,3 +31,4 @@ from .os import (
unlink,
)
from .pathlike import PathLike
from .process import Process
7 changes: 6 additions & 1 deletion stdlib/src/os/os.mojo
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,12 @@ from os import listdir

from collections import InlineArray, List
from collections.string import StringSlice
from sys import external_call, is_gpu, os_is_linux, os_is_windows
from sys import (
external_call,
is_gpu,
os_is_linux,
os_is_windows,
)
from sys.ffi import OpaquePointer, c_char

from memory import UnsafePointer
Expand Down
291 changes: 291 additions & 0 deletions stdlib/src/os/process.mojo
Original file line number Diff line number Diff line change
@@ -0,0 +1,291 @@
# ===----------------------------------------------------------------------=== #
# Copyright (c) 2025, Modular Inc. All rights reserved.
#
# Licensed under the Apache License v2.0 with LLVM Exceptions:
# https://llvm.org/LICENSE.txt
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ===----------------------------------------------------------------------=== #
"""Implements os methods for dealing with processes.

Example:

```mojo
from os import Process
```
"""
from collections import Optional
from collections.string import StringSlice

from sys import (
os_is_linux,
os_is_macos,
os_is_windows,
)
from sys._libc import (
vfork,
execvp,
exit,
kill,
SignalCodes,
pipe,
fcntl,
FcntlCommands,
FcntlFDFlags,
close,
)
from sys.ffi import c_char, c_int, c_str_ptr
from sys.os import sep

from memory import Span, UnsafePointer


# ===----------------------------------------------------------------------=== #
# Process comm.
# ===----------------------------------------------------------------------=== #
struct Pipe:
"""Create a pipe for interprocess communication.

Example usage:
```
pipe().write_bytes("TEST".as_bytes())
```
"""

var fd_in: Optional[FileDescriptor]
"""File descriptor for pipe input."""
var fd_out: Optional[FileDescriptor]
"""File descriptor for pipe output."""

fn __init__(
mut self,
in_close_on_exec: Bool = False,
out_close_on_exec: Bool = False,
) raises:
"""Struct to manage interprocess pipe comms.

Args:
in_close_on_exec: Close the read side of pipe if `exec` sys. call is issued in process.
out_close_on_exec: Close the write side of pipe if `exec` sys. call is issued in process.
"""
var pipe_fds = UnsafePointer[c_int].alloc(2)
if pipe(pipe_fds) < 0:
pipe_fds.free()
raise Error("Failed to create pipe")

if in_close_on_exec:
if not self._set_close_on_exec(pipe_fds[0]):
pipe_fds.free()
raise Error("Failed to configure input pipe close on exec")

if out_close_on_exec:
if not self._set_close_on_exec(pipe_fds[1]):
pipe_fds.free()
raise Error("Failed to configure output pipe close on exec")

self.fd_in = FileDescriptor(Int(pipe_fds[0]))
self.fd_out = FileDescriptor(Int(pipe_fds[1]))
pipe_fds.free()

fn __del__(owned self):
"""Ensures pipes input and output file descriptors are closed, when the object is destroyed.
"""
self.set_input_only()
self.set_output_only()

@staticmethod
fn _set_close_on_exec(fd: c_int) -> Bool:
return (
fcntl(
fd,
FcntlCommands.F_SETFD,
fcntl(fd, FcntlCommands.F_GETFD, 0) | FcntlFDFlags.FD_CLOEXEC,
)
== 0
)

@always_inline
fn set_input_only(mut self):
"""Close the output descriptor/ channel for this side of the pipe."""
if self.fd_out:
_ = close(rebind[Int](self.fd_out.value()))
self.fd_out = None

@always_inline
fn set_output_only(mut self):
"""Close the input descriptor/ channel for this side of the pipe."""
if self.fd_in:
_ = close(rebind[Int](self.fd_in.value()))
self.fd_in = None

@always_inline
fn write_bytes(mut self, bytes: Span[Byte, _]) raises:
"""
Write a span of bytes to the pipe.

Args:
bytes: The byte span to write to this pipe.

"""
if self.fd_out:
self.fd_out.value().write_bytes(bytes)
else:
raise Error("Can not write from read only side of pipe")

@always_inline
fn read_bytes(mut self, size: Int) raises -> Span[Byte, MutableAnyOrigin]:
"""
Read a span of bytes from this pipe.

Args:
size: The number of bytes to read from this pipe.

Returns:
Span of bytes with len=size read from this pipe.
"""
if self.fd_in:
return self.fd_in.value().read_bytes(size)

raise Error("Can not read from write only side of pipe")


# ===----------------------------------------------------------------------=== #
# Process execution
# ===----------------------------------------------------------------------=== #


struct Process:
"""Create and manage child processes from file executables.

Example usage:
```
child_process = Process.run("ls", List[String]("-lha"))
if child_process.interrupt():
print("Successfully interrupted.")
```
"""

var child_pid: c_int
"""Child process id."""

fn __init__(mut self, child_pid: c_int):
"""Struct to manage metadata about child process.
Use the `run` static method to create new process.

Args:
child_pid: The pid of child processed returned by `vfork` that the struct will manage.
"""

self.child_pid = child_pid

fn _kill(self, signal: Int) -> Bool:
# `kill` returns 0 on success and -1 on failure
return kill(self.child_pid, signal) > -1

fn hangup(self) -> Bool:
"""Send the Hang up signal to the managed child process.

Returns:
Upon successful completion, True is returned else False.
"""
return self._kill(SignalCodes.HUP)

fn interrupt(self) -> Bool:
"""Send the Interrupt signal to the managed child process.

Returns:
Upon successful completion, True is returned else False.
"""
return self._kill(SignalCodes.INT)

fn kill(self) -> Bool:
"""Send the Kill signal to the managed child process.

Returns:
Upon successful completion, True is returned else False.
"""
return self._kill(SignalCodes.KILL)

@staticmethod
fn run(path: String, argv: List[String]) raises -> Process:
"""Spawn new process from file executable.

Args:
path: The path to the file.
argv: A list of string arguments to be passed to executable.

Returns:
An instance of `Process` struct.
"""

@parameter
if os_is_linux() or os_is_macos():
var file_name = path.split(sep)[-1]
var pipe = Pipe(out_close_on_exec=True)
var exec_err_code = String("EXEC_ERR")

var pid = vfork()

if pid == 0:
"""Child process."""
pipe.set_output_only()

var arg_count = len(argv)
var argv_array_ptr_cstr_ptr = UnsafePointer[c_str_ptr].alloc(
arg_count + 2
)
var offset = 0
# Arg 0 in `argv` ptr array should be the file name
argv_array_ptr_cstr_ptr[offset] = file_name.unsafe_cstr_ptr()
offset += 1

for arg in argv:
argv_array_ptr_cstr_ptr[offset] = arg[].unsafe_cstr_ptr()
offset += 1

# `argv` ptr array terminates with NULL PTR
argv_array_ptr_cstr_ptr[offset] = c_str_ptr()

_ = execvp(path.unsafe_cstr_ptr(), argv_array_ptr_cstr_ptr)

# This will only get reached if exec call fails to replace currently executing code
argv_array_ptr_cstr_ptr.free()

# Canonical fork/ exec error handling pattern of using a pipe that closes on exec is
# used to signal error to parent process `https://cr.yp.to/docs/selfpipe.html`
pipe.write_bytes(exec_err_code.as_bytes())

exit(1)

elif pid < 0:
raise Error("Unable to fork parent")

pipe.set_input_only()
var err: Optional[Span[Byte, MutableAnyOrigin]]
try:
err = pipe.read_bytes(exec_err_code.byte_length())
except e:
err = None

if (
err
and len(err.value()) > 0
and StringSlice(unsafe_from_utf8=err.value()) == exec_err_code
):
raise Error("Failed to execute " + path)

return Process(child_pid=pid)
elif os_is_windows():
constrained[
False, "Windows process execution currently not implemented"
]()
return abort[Process]()
else:
constrained[
False, "Unknown platform process execution not implemented"
]()
return abort[Process]()
Loading
Loading