Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 11 additions & 2 deletions cactus/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,9 @@ if(APPLE)
set(CACTUS_CURL_LIBRARY_MACOS "${CACTUS_CURL_ROOT}/macos/libcurl.a")
cactus_require_vendored_curl("${CACTUS_CURL_INCLUDE_DIR}" "${CACTUS_CURL_LIBRARY_MACOS}" "macOS")
find_library(COREML_FRAMEWORK CoreML REQUIRED)
find_library(METAL_FRAMEWORK Metal REQUIRED)
find_library(MPS_FRAMEWORK MetalPerformanceShaders REQUIRED)
find_library(MPSGRAPH_FRAMEWORK MetalPerformanceShadersGraph REQUIRED)
find_library(FOUNDATION_FRAMEWORK Foundation REQUIRED)
find_library(ACCELERATE_FRAMEWORK Accelerate REQUIRED)
find_library(SECURITY_FRAMEWORK Security REQUIRED)
Expand Down Expand Up @@ -80,10 +83,12 @@ list(REMOVE_ITEM KERNEL_SOURCES "${I8MM_SOURCE}")

if(APPLE)
set(NPU_SOURCES "npu/npu_ane.mm")
set_source_files_properties(npu/npu_ane.mm PROPERTIES COMPILE_FLAGS "-fobjc-arc")
message(STATUS "Apple platform detected - NPU/ANE acceleration enabled")
set(MPS_SOURCES "kernel/kernel_mps.mm")
set_source_files_properties(npu/npu_ane.mm kernel/kernel_mps.mm PROPERTIES COMPILE_FLAGS "-fobjc-arc")
message(STATUS "Apple platform detected - NPU/ANE and Metal/MPS acceleration enabled")
else()
set(NPU_SOURCES "npu/npu.cpp")
set(MPS_SOURCES "")
endif()

set(COMMON_SOURCES
Expand All @@ -93,6 +98,7 @@ set(COMMON_SOURCES
${FFI_SOURCES}
${MODEL_SOURCES}
${NPU_SOURCES}
${MPS_SOURCES}
${TELEMETRY_SOURCES}
)

Expand Down Expand Up @@ -123,6 +129,9 @@ function(configure_cactus_target target_name)
if(APPLE)
target_link_libraries(${target_name} PUBLIC
${COREML_FRAMEWORK}
${METAL_FRAMEWORK}
${MPS_FRAMEWORK}
${MPSGRAPH_FRAMEWORK}
${FOUNDATION_FRAMEWORK}
${ACCELERATE_FRAMEWORK}
${SECURITY_FRAMEWORK}
Expand Down
24 changes: 21 additions & 3 deletions cactus/graph/graph.h
Original file line number Diff line number Diff line change
Expand Up @@ -242,7 +242,9 @@ struct BufferDesc {
std::unique_ptr<char[]> owned_scales;

bool is_interleaved = false;
size_t original_N = 0;
size_t original_N = 0;

bool pending_gpu_write = false;

void* activation_scales_data = nullptr;
std::unique_ptr<char[]> owned_activation_scales;
Expand All @@ -261,11 +263,20 @@ struct BufferDesc {
void* get_data();
const void* get_data() const;

void flush_if_pending();
void flush_if_pending() const;

template<typename T>
T* data_as() { flush_if_pending(); return static_cast<T*>(get_data()); }

template<typename T>
T* data_as() { return static_cast<T*>(get_data()); }
const T* data_as() const { flush_if_pending(); return static_cast<const T*>(get_data()); }

template<typename T>
const T* data_as() const { return static_cast<const T*>(get_data()); }
T* data_ptr_raw() { return static_cast<T*>(get_data()); }

template<typename T>
const T* data_ptr_raw() const { return static_cast<const T*>(get_data()); }

const __fp16* scales_as_fp16() const {
return reinterpret_cast<const __fp16*>(scales_data);
Expand Down Expand Up @@ -752,6 +763,11 @@ namespace GraphFile {
void prefetch_pages();

private:
enum class StorageMode {
MappedFile,
OwnedRam,
};

int fd_;
void* mapped_data_;
size_t file_size_, data_offset_;
Expand All @@ -766,9 +782,11 @@ namespace GraphFile {

bool is_interleaved_ = false;
size_t original_N_ = 0;
StorageMode storage_mode_ = StorageMode::MappedFile;

void parse_header();
void apply_madvise_hints();
void load_file_into_ram();
};
}

Expand Down
19 changes: 19 additions & 0 deletions cactus/graph/graph_core.cpp
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
#include "graph.h"
#include "../kernel/kernel.h"
#include <algorithm>
#include <stdexcept>
#include <cstring>
Expand Down Expand Up @@ -155,6 +156,24 @@ const void* BufferDesc::get_data() const {
return data.get();
}

void BufferDesc::flush_if_pending() {
#ifdef __APPLE__
if (pending_gpu_write) {
cactus_mps_synchronize();
pending_gpu_write = false;
}
#endif
}

void BufferDesc::flush_if_pending() const {
#ifdef __APPLE__
if (pending_gpu_write) {
cactus_mps_synchronize();
const_cast<BufferDesc*>(this)->pending_gpu_write = false;
}
#endif
}

void BufferDesc::allocate() {
if (!data && !external_data && !pooled_data) {
data = std::make_unique<char[]>(byte_size);
Expand Down
5 changes: 5 additions & 0 deletions cactus/graph/graph_execute.cpp
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
#include "graph.h"
#include "../kernel/kernel.h"
#include "../kernel/kernel_utils.h"
#include <algorithm>
#include <chrono>
Expand Down Expand Up @@ -700,6 +701,10 @@ void CactusGraph::execute(const std::string& profile_file) {
}
}

#ifdef __APPLE__
cactus_mps_flush();
#endif

if (enable_profiling) {
auto total_end = std::chrono::high_resolution_clock::now();
auto total_duration = std::chrono::duration_cast<std::chrono::microseconds>(total_end - total_start);
Expand Down
140 changes: 126 additions & 14 deletions cactus/graph/graph_io.cpp
Original file line number Diff line number Diff line change
@@ -1,5 +1,10 @@
#include "graph.h"
#include "graph_param_io.h"
#include "../kernel/kernel.h"
#include <algorithm>
#include <cerrno>
#include <cctype>
#include <cstdlib>
#include <fstream>
#include <stdexcept>
#include <sys/mman.h>
Expand Down Expand Up @@ -28,6 +33,31 @@ namespace {
return offset + (alignment - remainder);
}

std::string normalize_storage_mode(std::string value) {
std::transform(value.begin(), value.end(), value.begin(),
[](unsigned char c) { return static_cast<char>(std::tolower(c)); });
return value;
}

bool should_avoid_mmap_for_tensor_files() {
const char* env_mode = std::getenv("CACTUS_WEIGHT_STORAGE");
if (env_mode && *env_mode) {
const std::string mode = normalize_storage_mode(env_mode);
if (mode == "ram" || mode == "copy" || mode == "heap") {
return true;
}
if (mode == "mmap" || mode == "map" || mode == "mapped") {
return false;
}
}

#ifdef __APPLE__
return cactus_mps_enabled() && cactus_mps_available();
#else
return false;
#endif
}

inline void write_u32(std::ostream& out, uint32_t v) {
out.write(reinterpret_cast<const char*>(&v), sizeof(v));
}
Expand Down Expand Up @@ -598,24 +628,49 @@ MappedFile::MappedFile(const std::string& filename)
}
file_size_ = static_cast<size_t>(st.st_size);

mapped_data_ = mmap(nullptr, file_size_, PROT_READ, MAP_SHARED, fd_, 0);
if (mapped_data_ == MAP_FAILED) {
close(fd_);
throw std::runtime_error("Cannot map file: " + filename);
}
try {
if (should_avoid_mmap_for_tensor_files()) {
storage_mode_ = StorageMode::OwnedRam;
load_file_into_ram();
} else {
mapped_data_ = mmap(nullptr, file_size_, PROT_READ, MAP_SHARED, fd_, 0);
if (mapped_data_ == MAP_FAILED) {
throw std::runtime_error("Cannot map file: " + filename);
}
storage_mode_ = StorageMode::MappedFile;
}

close(fd_);
fd_ = -1;
close(fd_);
fd_ = -1;

parse_header();
apply_madvise_hints();
parse_header();
apply_madvise_hints();
} catch (...) {
if (fd_ != -1) {
close(fd_);
fd_ = -1;
}
if (storage_mode_ == StorageMode::MappedFile &&
mapped_data_ != nullptr && mapped_data_ != MAP_FAILED) {
munmap(mapped_data_, file_size_);
mapped_data_ = nullptr;
} else if (storage_mode_ == StorageMode::OwnedRam && mapped_data_ != nullptr) {
free(mapped_data_);
mapped_data_ = nullptr;
}
throw;
}
}

MappedFile::~MappedFile() {
if (mapped_data_ != nullptr && mapped_data_ != MAP_FAILED) {
if (storage_mode_ == StorageMode::MappedFile &&
mapped_data_ != nullptr && mapped_data_ != MAP_FAILED) {
madvise(mapped_data_, file_size_, MADV_DONTNEED);
munmap(mapped_data_, file_size_);
mapped_data_ = nullptr;
} else if (storage_mode_ == StorageMode::OwnedRam && mapped_data_ != nullptr) {
free(mapped_data_);
mapped_data_ = nullptr;
}
if (fd_ != -1) {
close(fd_);
Expand All @@ -631,18 +686,23 @@ MappedFile::MappedFile(MappedFile&& other) noexcept
scales_offset_(other.scales_offset_), scales_bytes_(other.scales_bytes_),
alignment_(other.alignment_),
is_interleaved_(other.is_interleaved_),
original_N_(other.original_N_) {
original_N_(other.original_N_),
storage_mode_(other.storage_mode_) {
other.fd_ = -1;
other.mapped_data_ = nullptr;
other.file_size_ = 0;
other.is_interleaved_ = false;
other.original_N_ = 0;
other.storage_mode_ = StorageMode::MappedFile;
}

MappedFile& MappedFile::operator=(MappedFile&& other) noexcept {
if (this != &other) {
if (mapped_data_ != nullptr && mapped_data_ != MAP_FAILED) {
if (storage_mode_ == StorageMode::MappedFile &&
mapped_data_ != nullptr && mapped_data_ != MAP_FAILED) {
munmap(mapped_data_, file_size_);
} else if (storage_mode_ == StorageMode::OwnedRam && mapped_data_ != nullptr) {
free(mapped_data_);
}
if (fd_ != -1) {
close(fd_);
Expand All @@ -662,11 +722,13 @@ MappedFile& MappedFile::operator=(MappedFile&& other) noexcept {
alignment_ = other.alignment_;
is_interleaved_ = other.is_interleaved_;
original_N_ = other.original_N_;
storage_mode_ = other.storage_mode_;
other.fd_ = -1;
other.mapped_data_ = nullptr;
other.file_size_ = 0;
other.is_interleaved_ = false;
other.original_N_ = 0;
other.storage_mode_ = StorageMode::MappedFile;
}
return *this;
}
Expand Down Expand Up @@ -771,6 +833,10 @@ void MappedFile::parse_header() {
}

void MappedFile::apply_madvise_hints() {
if (storage_mode_ != StorageMode::MappedFile) {
return;
}

if (scales_bytes_ > 0 && scales_offset_ > 0) {
madvise(static_cast<char*>(mapped_data_) + scales_offset_, scales_bytes_, MADV_WILLNEED);
}
Expand All @@ -783,7 +849,10 @@ void MappedFile::apply_madvise_hints() {
}

void MappedFile::release_pages() {
if (mapped_data_ == nullptr || mapped_data_ == MAP_FAILED) return;
if (storage_mode_ != StorageMode::MappedFile ||
mapped_data_ == nullptr || mapped_data_ == MAP_FAILED) {
return;
}

if (scales_bytes_ > 0 && scales_offset_ > 0) {
madvise(static_cast<char*>(mapped_data_) + scales_offset_, scales_bytes_, MADV_DONTNEED);
Expand All @@ -792,14 +861,57 @@ void MappedFile::release_pages() {
}

void MappedFile::prefetch_pages() {
if (mapped_data_ == nullptr || mapped_data_ == MAP_FAILED) return;
if (storage_mode_ != StorageMode::MappedFile ||
mapped_data_ == nullptr || mapped_data_ == MAP_FAILED) {
return;
}

if (scales_bytes_ > 0 && scales_offset_ > 0) {
madvise(static_cast<char*>(mapped_data_) + scales_offset_, scales_bytes_, MADV_WILLNEED);
}
madvise(static_cast<char*>(mapped_data_) + data_offset_, byte_size_, MADV_WILLNEED);
}

void MappedFile::load_file_into_ram() {
if (file_size_ == 0) {
throw std::runtime_error("Cannot load empty tensor file into RAM");
}

long page_size = sysconf(_SC_PAGESIZE);
if (page_size <= 0) {
page_size = 4096;
}

void* buffer = nullptr;
const int alloc_rc = posix_memalign(&buffer, static_cast<size_t>(page_size), file_size_);
if (alloc_rc != 0 || buffer == nullptr) {
throw std::runtime_error("Cannot allocate RAM buffer for tensor file");
}

char* dst = static_cast<char*>(buffer);
size_t total_read = 0;
constexpr size_t kMaxReadChunk = static_cast<size_t>(1) << 30; // Keep macOS read() calls well below INT_MAX.
while (total_read < file_size_) {
size_t remaining = file_size_ - total_read;
size_t chunk_size = std::min(remaining, kMaxReadChunk);
ssize_t bytes_read = read(fd_, dst + total_read, chunk_size);
if (bytes_read < 0) {
if (errno == EINTR) {
continue;
}
free(buffer);
throw std::runtime_error("Cannot read tensor file into RAM");
}
if (bytes_read == 0) {
free(buffer);
throw std::runtime_error("Unexpected EOF while reading tensor file into RAM");
}
total_read += static_cast<size_t>(bytes_read);
}

mapped_data_ = buffer;
}

template const int8_t* MappedFile::typed_data<int8_t>() const;
template const float* MappedFile::typed_data<float>() const;
template const uint16_t* MappedFile::typed_data<uint16_t>() const;
Expand Down
Loading
Loading