Feature/faster ccsds offsets #42

Merged
merged 8 commits into from Feb 20, 2025
5 changes: 2 additions & 3 deletions src/gribjump/CMakeLists.txt
@@ -122,9 +122,8 @@ if( HAVE_GRIBJUMP_LOCAL_EXTRACT )
compression/compressors/Ccsds.cc
compression/compressors/Ccsds.h

compression/compressors/algorithms/SimplePacking.h
compression/compressors/algorithms/LibEccodes.h
compression/compressors/algorithms/LibEccodes.cc
compression/compressors/SimplePacking.h
compression/compressors/SimplePacking.cc

FDBPlugin.cc
FDBPlugin.h
4 changes: 4 additions & 0 deletions src/gribjump/FDBPlugin.cc
@@ -39,8 +39,12 @@ FDBPlugin::FDBPlugin() {
static bool enableGribjump = eckit::Resource<bool>("fdbEnableGribjump;$FDB_ENABLE_GRIBJUMP", false);
static bool disableGribjump = eckit::Resource<bool>("fdbDisableGribjump;$FDB_DISABLE_GRIBJUMP", false); // Emergency off-switch
if (enableGribjump && !disableGribjump) {
LOG_DEBUG_LIB(LibGribJump) << "FDBPlugin has been enabled" << std::endl;
FDBPlugin::instance().addFDB(fdb);
}
else {
LOG_DEBUG_LIB(LibGribJump) << "FDBPlugin has been disabled" << std::endl;
}
});
}

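Note: the hunk above gates the plugin on two eckit::Resource flags. A minimal sketch of the same toggle pattern in isolation, assuming only that eckit's Resource header is on the include path (the helper name is illustrative):

#include "eckit/config/Resource.h"

// Returns true only when gribjump is switched on and the emergency off-switch is not set.
// Each flag resolves from a configuration key or the matching environment variable,
// defaulting to false, exactly as in the hunk above.
static bool gribjumpPluginEnabled() {
    static bool enable  = eckit::Resource<bool>("fdbEnableGribjump;$FDB_ENABLE_GRIBJUMP", false);
    static bool disable = eckit::Resource<bool>("fdbDisableGribjump;$FDB_DISABLE_GRIBJUMP", false);  // emergency off-switch
    return enable && !disable;
}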
4 changes: 0 additions & 4 deletions src/gribjump/GribJumpDataAccessor.h
@@ -21,10 +21,6 @@ class GribJumpDataAccessor : public mc::DataAccessor {
public:
GribJumpDataAccessor(eckit::DataHandle& dh, const mc::Block range) : dh_{dh}, data_section_range_{range} {}

void write(const eckit::Buffer& buffer, const size_t offset) const override {
NOTIMP;
}

eckit::Buffer read(const mc::Block& range) const override {
eckit::Offset offset = range.first;
eckit::Length size = range.second;
58 changes: 2 additions & 56 deletions src/gribjump/compression/DataAccessor.h
@@ -20,75 +20,21 @@
#include "eckit/exception/Exceptions.h"
#include "eckit/io/Buffer.h"

namespace mc {
namespace gribjump::mc {

class DataAccessor {
public:
virtual ~DataAccessor() = default;
virtual void write(const eckit::Buffer& buffer, const size_t offset) const = 0;
virtual eckit::Buffer read(const Block& range) const = 0;
virtual eckit::Buffer read() const = 0;
virtual size_t eof() const = 0;
};


class PosixAccessor : public DataAccessor {
public:
explicit PosixAccessor(const std::string& path) : ifs_{path, std::ifstream::binary} {}
~PosixAccessor() {
ifs_.close();
}

void write(const eckit::Buffer& buffer, const size_t offset) const override {
NOTIMP;
}

eckit::Buffer read(const Block& range) const override {
const auto [offset, size] = range;
eckit::Buffer buf(size);
ifs_.seekg(offset, std::ios::beg);
ifs_.read(reinterpret_cast<char*>(buf.data()), size);
if (!ifs_) {
std::stringstream ss;
ss << "Error: only " << ifs_.gcount() << " could be read";
throw eckit::ReadError(ss.str(), Here());
}
assert(size != 0);
return eckit::Buffer{buf.data(), size};
}

eckit::Buffer read() const override {
ifs_.seekg(0, std::ios::end);
size_t size = ifs_.tellg();
ifs_.seekg(0, std::ios::beg);
eckit::Buffer buf(size);
ifs_.read(reinterpret_cast<char*>(buf.data()), size);
if (!ifs_) {
std::stringstream ss;
ss << "Error: only " << ifs_.gcount() << " could be read";
throw eckit::ReadError(ss.str(), Here());
}
assert(size != 0);
return eckit::Buffer{buf.data(), size};
}

size_t eof() const override {
ifs_.seekg(0, std::ios::end);
return ifs_.tellg();
}
private:
mutable std::ifstream ifs_;
};


class MemoryAccessor : public DataAccessor {
public:
explicit MemoryAccessor(const eckit::Buffer& buffer) : buf_{buffer.data(), buffer.size()} {}

void write(const eckit::Buffer& buffer, const size_t offset) const override {
NOTIMP;
}

eckit::Buffer read(const Block& range) const override {
const auto [offset, size] = range;
if (offset + size > buf_.size()) {
@@ -110,4 +56,4 @@ class MemoryAccessor : public DataAccessor {
eckit::Buffer buf_;
};

} // namespace mc
} // namespace gribjump::mc
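Note: with write() dropped from the interface, a DataAccessor now only has to provide read(Block), read() and eof(). A minimal sketch of a conforming implementation, modelled on the MemoryAccessor kept above; the class name, error message and include path are illustrative assumptions:

#include <cstddef>
#include "eckit/exception/Exceptions.h"
#include "eckit/io/Buffer.h"
#include "gribjump/compression/DataAccessor.h"  // assumed include path

namespace gribjump::mc {

class BufferAccessor : public DataAccessor {
public:
    explicit BufferAccessor(const eckit::Buffer& buffer) : buf_{buffer.data(), buffer.size()} {}

    // Read an arbitrary (offset, size) block, bounds-checked against the buffer.
    eckit::Buffer read(const Block& range) const override {
        const auto [offset, size] = range;
        if (offset + size > buf_.size()) {
            throw eckit::Exception("BufferAccessor: read past end of buffer", Here());
        }
        return eckit::Buffer{static_cast<const char*>(buf_.data()) + offset, size};
    }

    // Read the whole buffer.
    eckit::Buffer read() const override { return eckit::Buffer{buf_.data(), buf_.size()}; }

    // Total size of the underlying data.
    size_t eof() const override { return buf_.size(); }

private:
    eckit::Buffer buf_;
};

}  // namespace gribjump::mc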
111 changes: 66 additions & 45 deletions src/gribjump/compression/NumericCompressor.h
@@ -18,66 +18,87 @@
#include <eckit/io/Buffer.h>
#include <unordered_map>

namespace mc {
namespace gribjump::mc {

template <typename ValueType>
class NumericDecompressor;

template <typename ValueType>
class NumericCompressor {
public:
using CompressedData = eckit::Buffer;
using Values = std::vector<ValueType>;
virtual std::pair<std::unique_ptr<NumericDecompressor<ValueType>>, CompressedData> encode(const Values&) = 0;
using CompressedData = eckit::Buffer;
using Values = std::vector<ValueType>;
virtual std::pair<std::unique_ptr<NumericDecompressor<ValueType>>, CompressedData> encode(const Values&) = 0;
};

template <typename ValueType>
class NumericDecompressor {
public:
using CompressedData = eckit::Buffer;
using Values = std::vector<ValueType>;
virtual Values decode(const CompressedData&) = 0;
virtual Values decode(const std::shared_ptr<DataAccessor>, const Block&) = 0;


virtual std::vector<Values> decode(const std::shared_ptr<DataAccessor>& accessor, const std::vector<mc::Block>& ranges) {
using Values = typename NumericDecompressor<ValueType>::Values;
std::vector<Values> result;
decode(accessor, ranges, result);
return result;
}

virtual void decode(const std::shared_ptr<DataAccessor>& accessor, const std::vector<mc::Block>& ranges, std::vector<Values>& result) {
using Values = typename NumericDecompressor<ValueType>::Values;

std::unordered_map<Block, std::pair<Block, std::shared_ptr<Values>>> ranges_map;

// find which sub_ranges are in which buckets
BlockBuckets buckets;
for (const auto& range : ranges) {
buckets << range;
using CompressedData = eckit::Buffer;
using Values = std::vector<ValueType>;
using Offsets = std::vector<size_t>;

virtual Values decode(const CompressedData&) = 0;
virtual Values decode(const std::shared_ptr<DataAccessor>, const Block&) = 0;
virtual Offsets decode_offsets(const CompressedData&) {NOTIMP;}

virtual std::vector<Values> decode(const std::shared_ptr<DataAccessor>& accessor, const std::vector<mc::Block>& ranges) {
using Values = typename NumericDecompressor<ValueType>::Values;
std::vector<Values> result;
decode(accessor, ranges, result);
return result;
}

// inverse buckets and associate data with ranges
for (const auto& [bucket_range, bucket_sub_ranges] : buckets) {
auto decoded = std::make_shared<Values>(decode(accessor, bucket_range));
for (const auto& bucket_sub_range : bucket_sub_ranges) {
ranges_map[bucket_sub_range] = std::make_pair(bucket_range, decoded);
}
virtual void decode(const std::shared_ptr<DataAccessor>& accessor, const std::vector<mc::Block>& ranges, std::vector<Values>& result) {
using Values = typename NumericDecompressor<ValueType>::Values;

std::unordered_map<Block, std::pair<Block, std::shared_ptr<Values>>> ranges_map;

// find which sub_ranges are in which buckets
BlockBuckets buckets;
for (const auto& range : ranges) {
buckets << range;
}

// inverse buckets and associate data with ranges
for (const auto& [bucket_range, bucket_sub_ranges] : buckets) {
auto decoded = std::make_shared<Values>(decode(accessor, bucket_range));
for (const auto& bucket_sub_range : bucket_sub_ranges) {
ranges_map[bucket_sub_range] = std::make_pair(bucket_range, decoded);
}
}

// assign data to ranges
for (const auto& user_range : ranges) {
const auto& [bucket_range, decoded] = ranges_map[user_range];
const auto& [bucket_range_offset, bucket_range_size] = bucket_range;
const auto& [user_range_offset, user_range_size] = user_range;

ValueType* data_start = reinterpret_cast<ValueType*>(decoded->data()) + (user_range_offset - bucket_range_offset);
Values values(data_start, data_start + user_range_size);
result.push_back(std::move(values));
}
}
};

// assign data to ranges
for (const auto& user_range : ranges) {
const auto& [bucket_range, decoded] = ranges_map[user_range];
const auto& [bucket_range_offset, bucket_range_size] = bucket_range;
const auto& [user_range_offset, user_range_size] = user_range;

ValueType* data_start = reinterpret_cast<ValueType*>(decoded->data()) + (user_range_offset - bucket_range_offset);
Values values(data_start, data_start + user_range_size);
result.push_back(std::move(values));
/// @todo: this is copy pasted from eccodes - do we *really* need this?
/* Return n to the power of s */
template <typename T>
constexpr T codes_power(long s, long n) {
T divisor = 1.0;
if (s == 0)
return 1.0;
if (s == 1)
return n;
while (s < 0) {
divisor /= n;
s++;
}
while (s > 0) {
divisor *= n;
s--;
}
return divisor;
}

}
};

} // namespace mc
} // namespace gribjump::mc
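Note: codes_power<T>(s, n) above computes n to the power s by repeated multiplication (or division for negative s). A quick illustrative check, assuming the helper sits at namespace scope in gribjump::mc as the new hunk shows and that the header is reachable as gribjump/compression/NumericCompressor.h:

#include <cassert>
#include <cmath>
#include "gribjump/compression/NumericCompressor.h"  // assumed include path

int main() {
    using gribjump::mc::codes_power;

    // Zero and small positive exponents on integers are exact in double precision.
    static_assert(codes_power<double>(0, 10) == 1.0, "n^0 == 1");
    static_assert(codes_power<double>(3, 2) == 8.0, "2^3 == 8");

    // Negative exponents accumulate floating-point rounding, so compare with a tolerance.
    assert(std::fabs(codes_power<double>(-2, 10) - 0.01) < 1e-12);
    return 0;
}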
27 changes: 15 additions & 12 deletions src/gribjump/compression/Range.cc
@@ -12,31 +12,33 @@
#include <algorithm>
#include <cassert>

std::pair<size_t, size_t> begin_end(const mc::Block& range)
namespace gribjump::mc{

std::pair<size_t, size_t> begin_end(const Block& range)
{
const auto [offset, size] = range;
return {offset, offset + size};
}


mc::Block operator+(const mc::Block& r1, const mc::Block& r2)
Block operator+(const Block& r1, const Block& r2)
{
auto [b1, e1] = begin_end(r1);
auto [b2, e2] = begin_end(r2);
assert(!((b1 > e2) && (b2 > e1)));
return mc::Block{std::min(b1, b2), std::max(e1, e2) - std::min(b1, b2)};
return Block{std::min(b1, b2), std::max(e1, e2) - std::min(b1, b2)};
}


std::ostream& operator<<(std::ostream& os, const mc::Block& range)
std::ostream& operator<<(std::ostream& os, const Block& range)
{
auto [rb, re] = begin_end(range);
os << "[" << rb << ", " << re << "]";
return os;
}


std::ostream& operator<<(std::ostream& os, const mc::BlockBucket& range)
std::ostream& operator<<(std::ostream& os, const BlockBucket& range)
{
os << range.first << std::endl;
for (const auto& r : range.second) {
@@ -47,20 +49,20 @@ std::ostream& operator<<(std::ostream& os, const mc::BlockBucket& range)
}


mc::BlockBuckets& operator<<(mc::BlockBuckets& buckets, const mc::Block& r) {
BlockBuckets& operator<<(BlockBuckets& buckets, const Block& r) {

const mc::Block sub_range{r};
const Block sub_range{r};
const auto [sub_start, sub_end] = begin_end(sub_range);

// Find the position where the range might be inserted
auto it = std::lower_bound(buckets.begin(), buckets.end(), sub_range,
[](const mc::BlockBucket& bucket, const mc::Block& range) {
[](const BlockBucket& bucket, const Block& range) {
const auto [bucket_start, bucket_end] = begin_end(bucket.first);
return bucket_end < range.first;
});

mc::Block merged_range = sub_range;
mc::SubBlocks merged_subranges = {sub_range};
Block merged_range = sub_range;
SubBlocks merged_subranges = {sub_range};

// Merge with any overlapping buckets before the insertion point
while (it != buckets.begin()) {
@@ -98,11 +100,12 @@ mc::BlockBuckets& operator<<(mc::BlockBuckets& buckets, const mc::Block& r) {
return buckets;
}

std::size_t std::hash<mc::Block>::operator() (const mc::Block& range) const
} // namespace gribjump::mc

std::size_t std::hash<gribjump::mc::Block>::operator() (const gribjump::mc::Block& range) const
{
static_assert(sizeof(std::size_t) == sizeof(std::uint64_t), "std::size_t must be 64 bits");
const auto [offset, size] = range;
return offset ^ (size << 32);
}
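Note: the bucket operator<< above coalesces overlapping (and adjacent) blocks so each bucket can be read and decoded in one go. An illustrative use; the include path is assumed and the expected output is derived from the merge logic shown in the diff:

#include <iostream>
#include "gribjump/compression/Range.h"  // assumed include path

int main() {
    using namespace gribjump::mc;

    BlockBuckets buckets;
    buckets << Block{0, 10};    // offset 0, size 10
    buckets << Block{5, 10};    // overlaps the first block, so the two share a bucket
    buckets << Block{100, 20};  // disjoint, so it gets its own bucket

    for (const auto& [bucket_range, sub_ranges] : buckets) {
        std::cout << bucket_range << " holds " << sub_ranges.size() << " sub-range(s)" << std::endl;
    }
    // Expected, given the coalescing described above:
    //   [0, 15] holds 2 sub-range(s)
    //   [100, 120] holds 1 sub-range(s)
    return 0;
}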


21 changes: 11 additions & 10 deletions src/gribjump/compression/Range.h
@@ -17,29 +17,30 @@
#include <iostream>
#include <tuple>
#include <cstdint>
namespace mc {
namespace gribjump::mc {
using Block = std::pair<size_t, size_t>;
}

template<>
struct std::hash<mc::Block> {
std::size_t operator()(const mc::Block& range) const;
struct std::hash<gribjump::mc::Block> {
std::size_t operator()(const gribjump::mc::Block& range) const;
};


namespace mc {
namespace gribjump::mc {

// A bucket is a continuous range of data that can be decoded in one go
std::pair<size_t, size_t> get_begin_end(const Block& range);
using SubBlock = Block;
using SubBlocks = std::vector<SubBlock>;
using BlockBucket = std::pair<Block, SubBlocks>;
using BlockBuckets = std::vector<BlockBucket>; // Sorted to allow binary search (std::lower_bound)
} // namespace mc

std::ostream& operator<<(std::ostream& os, const mc::Block& range);
std::ostream& operator<<(std::ostream& os, const mc::BlockBucket& bucket);
mc::BlockBuckets& operator<<(mc::BlockBuckets& buckets, const mc::Block& r);
mc::Block operator+(const mc::Block& r1, const mc::Block& r2);
std::ostream& operator<<(std::ostream& os, const Block& range);
std::ostream& operator<<(std::ostream& os, const BlockBucket& bucket);
BlockBuckets& operator<<(BlockBuckets& buckets, const Block& r);
Block operator+(const Block& r1, const Block& r2);

std::pair<size_t, size_t> begin_end(const mc::Block& range);
std::pair<size_t, size_t> begin_end(const Block& range);

} // namespace gribjump::mc
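Note: the std::hash<gribjump::mc::Block> specialisation declared above (and defined in Range.cc) is what lets Block key the std::unordered_map used in NumericDecompressor::decode. A minimal illustration; the include path is assumed and the program must link against the compression sources for the hash definition:

#include <string>
#include <unordered_map>
#include "gribjump/compression/Range.h"  // assumed include path

int main() {
    using gribjump::mc::Block;

    // Block is std::pair<size_t, size_t>, so operator== comes from the standard
    // library and hashing comes from the specialisation above.
    std::unordered_map<Block, std::string> labels;
    labels[Block{0, 64}]   = "header";
    labels[Block{64, 256}] = "payload";

    return labels.count(Block{64, 256}) == 1 ? 0 : 1;
}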