Skip to content

Commit 8712a46

Browse files
committed
Merge branch 'bugfix/moving-mask'
2 parents 7e52a5c + dd84099 commit 8712a46

16 files changed

+547
-49
lines changed

src/multio/action/statistics/OperationWindow.cc

+53-5
Original file line numberDiff line numberDiff line change
@@ -98,6 +98,7 @@ OperationWindow::OperationWindow(std::shared_ptr<StatisticsIO>& IOmanager, const
9898
lastFlush_{eckit::Date{0}, eckit::Time{0}},
9999
timeStepInSeconds_{0},
100100
count_{0},
101+
counts_{},
101102
type_{0} {
102103
load(IOmanager, opt);
103104
return;
@@ -115,25 +116,43 @@ OperationWindow::OperationWindow(const eckit::DateTime& epochPoint, const eckit:
115116
lastFlush_{epochPoint},
116117
timeStepInSeconds_{timeStepInSeconds},
117118
count_{0},
119+
counts_{},
118120
type_{windowType} {}
119121

120122

121123
long OperationWindow::count() const {
122124
return count_;
123125
}
124126

127+
const std::vector<long>& OperationWindow::counts() const {
128+
return counts_;
129+
}
130+
131+
template <typename T>
132+
void OperationWindow::updateCounts(const T* values, size_t size, double missingValue) const {
133+
initCountsLazy(size);
134+
std::transform(counts_.begin(), counts_.end(), values, counts_.begin(),
135+
[missingValue](long c, T v) { return v == missingValue ? c : c + 1; });
136+
return;
137+
}
138+
template void OperationWindow::updateCounts(const float* values, size_t size, double missingValue) const;
139+
template void OperationWindow::updateCounts(const double* values, size_t size, double missingValue) const;
140+
125141
void OperationWindow::dump(std::shared_ptr<StatisticsIO>& IOmanager, const StatisticsOptions& opt) const {
126-
IOBuffer restartState{IOmanager->getBuffer(restartSize())};
142+
const size_t writeSize = restartSize();
143+
IOBuffer restartState{IOmanager->getBuffer(writeSize)};
127144
restartState.zero();
128145
serialize(restartState, IOmanager->getCurrentDir() + "/operationWindow_dump.txt", opt);
129-
IOmanager->write("operationWindow", static_cast<size_t>(16), restartSize());
146+
IOmanager->write("operationWindow", writeSize, writeSize);
130147
IOmanager->flush();
131148
return;
132149
}
133150

134151
void OperationWindow::load(std::shared_ptr<StatisticsIO>& IOmanager, const StatisticsOptions& opt) {
135-
IOBuffer restartState{IOmanager->getBuffer(restartSize())};
136-
IOmanager->read("operationWindow", restartSize());
152+
size_t readSize;
153+
IOmanager->readSize("operationWindow", readSize);
154+
IOBuffer restartState{IOmanager->getBuffer(readSize)};
155+
IOmanager->read("operationWindow", readSize);
137156
deserialize(restartState, IOmanager->getCurrentDir() + "/operationWindow_load.txt", opt);
138157
restartState.zero();
139158
return;
@@ -157,6 +176,7 @@ void OperationWindow::updateWindow(const eckit::DateTime& startPoint, const ecki
157176
prevPoint_ = startPoint;
158177
endPoint_ = endPoint;
159178
count_ = 0;
179+
counts_.clear();
160180
return;
161181
}
162182

@@ -434,6 +454,20 @@ long OperationWindow::lastFlushInSteps() const {
434454
return (lastFlush_ - epochPoint_) / timeStepInSeconds_;
435455
}
436456

457+
void OperationWindow::initCountsLazy(size_t size) const {
458+
if (counts_.size() == size) {
459+
return;
460+
}
461+
if (counts_.size() == 0) {
462+
counts_.resize(size, 0);
463+
return;
464+
}
465+
466+
std::ostringstream os;
467+
os << *this << " : counts array is already initialized with a different size" << std::endl;
468+
throw eckit::SeriousBug(os.str(), Here());
469+
}
470+
437471
void OperationWindow::serialize(IOBuffer& currState, const std::string& fname, const StatisticsOptions& opt) const {
438472

439473
if (opt.debugRestart()) {
@@ -447,6 +481,7 @@ void OperationWindow::serialize(IOBuffer& currState, const std::string& fname, c
447481
outFile << "lastFlush_ :: " << lastFlush_ << std::endl;
448482
outFile << "timeStepInSeconds_ :: " << timeStepInSeconds_ << std::endl;
449483
outFile << "count_ :: " << count_ << std::endl;
484+
outFile << "counts_.size() :: " << counts_.size() << std::endl;
450485
outFile << "type_ :: " << type_ << std::endl;
451486
outFile.close();
452487
}
@@ -476,6 +511,12 @@ void OperationWindow::serialize(IOBuffer& currState, const std::string& fname, c
476511
currState[15] = static_cast<std::uint64_t>(count_);
477512
currState[16] = static_cast<std::uint64_t>(type_);
478513

514+
const size_t countsSize = counts_.size();
515+
currState[17] = static_cast<std::uint64_t>(countsSize);
516+
for (size_t i = 0; i < countsSize; ++i) {
517+
currState[i+18] = static_cast<std::uint64_t>(counts_[i]);
518+
}
519+
479520
currState.computeChecksum();
480521

481522
return;
@@ -495,6 +536,12 @@ void OperationWindow::deserialize(const IOBuffer& currState, const std::string&
495536
count_ = static_cast<long>(currState[15]);
496537
type_ = static_cast<long>(currState[16]);
497538

539+
const auto countsSize = static_cast<size_t>(currState[17]);
540+
counts_.resize(countsSize);
541+
for (size_t i = 0; i < countsSize; ++i) {
542+
counts_[i] = static_cast<long>(currState[i+18]);
543+
}
544+
498545
if (opt.debugRestart()) {
499546
std::ofstream outFile(fname);
500547
outFile << "epochPoint_ :: " << epochPoint_ << std::endl;
@@ -506,6 +553,7 @@ void OperationWindow::deserialize(const IOBuffer& currState, const std::string&
506553
outFile << "lastFlush_ :: " << lastFlush_ << std::endl;
507554
outFile << "timeStepInSeconds_ :: " << timeStepInSeconds_ << std::endl;
508555
outFile << "count_ :: " << count_ << std::endl;
556+
outFile << "counts_.size() :: " << counts_.size() << std::endl;
509557
outFile << "type_ :: " << type_ << std::endl;
510558
outFile.close();
511559
}
@@ -514,7 +562,7 @@ void OperationWindow::deserialize(const IOBuffer& currState, const std::string&
514562
}
515563

516564
size_t OperationWindow::restartSize() const {
517-
return static_cast<size_t>(18);
565+
return 18 + counts_.size() + 1; // values + counts + checksum
518566
}
519567

520568
void OperationWindow::print(std::ostream& os) const {

src/multio/action/statistics/OperationWindow.h

+7
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,10 @@ class OperationWindow {
2424
long windowType);
2525

2626
long count() const;
27+
const std::vector<long>& counts() const;
28+
29+
template <typename T>
30+
void updateCounts(const T* values, size_t size, double missingValue) const;
2731

2832
void updateData(const eckit::DateTime& currentPoint);
2933
void updateWindow(const eckit::DateTime& startPoint, const eckit::DateTime& endPoint);
@@ -114,8 +118,11 @@ class OperationWindow {
114118

115119
long timeStepInSeconds_;
116120
long count_;
121+
mutable std::vector<long> counts_;
117122
long type_;
118123

124+
void initCountsLazy(size_t sz) const;
125+
119126
void serialize(IOBuffer& currState, const std::string& fname, const StatisticsOptions& opt) const;
120127
void deserialize(const IOBuffer& currState, const std::string& fname, const StatisticsOptions& opt);
121128

src/multio/action/statistics/cfg/StatisticsOptions.cc

+26-1
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,8 @@ StatisticsOptions::StatisticsOptions(const config::ComponentConfiguration& compC
1919
restartLib_{"fstream_io"},
2020
logPrefix_{"Plan"},
2121
windowType_{"forward-offset"},
22-
accumulatedFieldsResetFreqency_{"month"} {
22+
accumulatedFieldsResetFreqency_{"month"},
23+
valueCountThreshold_{} {
2324
// Dump usage
2425
if (compConf.parsedConfig().has("help")) {
2526
usage();
@@ -45,6 +46,7 @@ StatisticsOptions::StatisticsOptions(const config::ComponentConfiguration& compC
4546
parseLogPrefix(compConf, options);
4647
parseWindowType(compConf, options);
4748
parseSolverResetAccumulatedFields(compConf, options);
49+
parseValueCountThreshold(compConf, options);
4850
}
4951

5052

@@ -218,6 +220,24 @@ void StatisticsOptions::parseSolverResetAccumulatedFields(const config::Componen
218220
return;
219221
};
220222

223+
void StatisticsOptions::parseValueCountThreshold(const config::ComponentConfiguration& compConf,
224+
const eckit::LocalConfiguration& cfg) {
225+
long threshold = stol(compConf.multioConfig().replaceCurly(cfg.getString("value-count-threshold", "-1")));
226+
227+
if (threshold == -1) {
228+
valueCountThreshold_ = std::nullopt;
229+
return;
230+
}
231+
if (threshold > 0) {
232+
valueCountThreshold_ = threshold;
233+
return;
234+
}
235+
236+
std::ostringstream os;
237+
os << "Invalid value count threshold :: " << threshold << " (must be unset, -1 or positive value)" << std::endl;
238+
throw eckit::UserError(os.str(), Here());
239+
}
240+
221241

222242
const std::string& StatisticsOptions::logPrefix() const {
223243
return logPrefix_;
@@ -287,6 +307,11 @@ const std::string& StatisticsOptions::solverResetAccumulatedFields() const {
287307
};
288308

289309

310+
std::optional<long> StatisticsOptions::valueCountThreshold() const {
311+
return valueCountThreshold_;
312+
}
313+
314+
290315
void StatisticsOptions::dumpOptions() {
291316
// TODO: Implement this function
292317
return;

src/multio/action/statistics/cfg/StatisticsOptions.h

+5
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,8 @@ class StatisticsOptions {
3333
std::string windowType_;
3434
std::string accumulatedFieldsResetFreqency_;
3535

36+
std::optional<long> valueCountThreshold_;
37+
3638
private:
3739
void parseUseDateTime(const eckit::LocalConfiguration& cfg);
3840
void parseCheckMissingValues(const eckit::LocalConfiguration& cfg);
@@ -51,6 +53,7 @@ class StatisticsOptions {
5153
void parseWindowType(const config::ComponentConfiguration& compConf, const eckit::LocalConfiguration& cfg);
5254
void parseSolverResetAccumulatedFields(const config::ComponentConfiguration& compConf,
5355
const eckit::LocalConfiguration& cfg);
56+
void parseValueCountThreshold(const config::ComponentConfiguration& compConf, const eckit::LocalConfiguration& cfg);
5457

5558
void dumpOptions();
5659
void usage();
@@ -80,6 +83,8 @@ class StatisticsOptions {
8083
const std::string& restartPrefix() const;
8184
const std::string& restartLib() const;
8285
const std::string& solverResetAccumulatedFields() const;
86+
87+
std::optional<long> valueCountThreshold() const;
8388
};
8489

8590
} // namespace multio::action

src/multio/action/statistics/operations/Accumulate.h

+1-1
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,7 @@ class Accumulate final : public OperationWithData<T> {
4949
void updateWithMissing(const T* val, const StatisticsConfiguration& cfg) {
5050
double m = cfg.missingValue();
5151
std::transform(values_.begin(), values_.end(), val, values_.begin(),
52-
[m](T v1, T v2) { return static_cast<T>(m == v2 ? m : v1 + v2); });
52+
[m](T v1, T v2) { return static_cast<T>(m == v1 || m == v2 ? m : v1 + v2); });
5353
return;
5454
}
5555

src/multio/action/statistics/operations/Average.h

+41-7
Original file line numberDiff line numberDiff line change
@@ -26,33 +26,67 @@ class Average final : public OperationWithData<T> {
2626
void compute(eckit::Buffer& buf, const StatisticsConfiguration& cfg) override {
2727
checkTimeInterval(cfg);
2828
LOG_DEBUG_LIB(LibMultio) << logHeader_ << ".compute().count=" << win_.count() << std::endl;
29-
buf.copy(values_.data(), values_.size() * sizeof(T));
29+
auto val = static_cast<T*>(buf.data());
30+
cfg.bitmapPresent() && cfg.options().valueCountThreshold() ? computeWithThreshold(val, cfg) : computeWithoutThreshold(val, cfg);
3031
return;
3132
}
3233

3334
void updateData(const void* data, long sz, const StatisticsConfiguration& cfg) override {
3435
checkSize(sz, cfg);
3536
LOG_DEBUG_LIB(LibMultio) << logHeader_ << ".update().count=" << win_.count() << std::endl;
3637
const T* val = static_cast<const T*>(data);
37-
cfg.bitmapPresent() ? updateWithMissing(val, cfg) : updateWithoutMissing(val, cfg);
38+
cfg.bitmapPresent() ? (!cfg.options().valueCountThreshold() ? updateWithMissing(val, cfg) : updateWithMissingAndCounters(val, cfg)) : updateWithoutMissing(val, cfg);
3839
return;
3940
}
4041

4142
private:
43+
void computeWithoutThreshold(T* buf, const StatisticsConfiguration& cfg) {
44+
std::copy(values_.begin(), values_.end(), buf);
45+
return;
46+
}
47+
48+
void computeWithThreshold(T* buf, const StatisticsConfiguration& cfg) {
49+
const long t = cfg.options().valueCountThreshold().value();
50+
const double m = cfg.missingValue();
51+
const std::vector<long>& counts = win_.counts();
52+
std::transform(values_.begin(), values_.end(), counts.begin(), buf,
53+
[t, m](T v, long c) { return static_cast<T>(c < t ? m : v); });
54+
return;
55+
}
56+
4257
void updateWithoutMissing(const T* val, const StatisticsConfiguration& cfg) {
43-
const double c2 = icntpp(), c1 = sc(c2);
58+
const auto c2 = icntpp(win_.count());
59+
const auto c1 = sc(c2, win_.count());
4460
std::transform(values_.begin(), values_.end(), val, values_.begin(),
4561
[c1, c2](T v1, T v2) { return static_cast<T>(v1 * c1 + v2 * c2); });
4662
return;
4763
}
4864
void updateWithMissing(const T* val, const StatisticsConfiguration& cfg) {
49-
const double c2 = icntpp(), c1 = sc(c2), m = cfg.missingValue();
65+
const auto c2 = icntpp(win_.count());
66+
const auto c1 = sc(c2, win_.count());
67+
const auto m = cfg.missingValue();
5068
std::transform(values_.begin(), values_.end(), val, values_.begin(),
51-
[c1, c2, m](T v1, T v2) { return static_cast<T>(m == v2 ? m : v1 * c1 + v2 * c2); });
69+
[c1, c2, m](T v1, T v2) { return static_cast<T>(m == v1 || m == v2 ? m : v1 * c1 + v2 * c2); });
5270
return;
5371
}
54-
double icntpp() const { return double(1.0) / double(win_.count()); };
55-
double sc(double v) const { return double(win_.count() - 1) * v; };
72+
void updateWithMissingAndCounters(const T* val, const StatisticsConfiguration& cfg) {
73+
const double m = cfg.missingValue();
74+
win_.updateCounts(val, values_.size(), m);
75+
const std::vector<long>& counts = win_.counts();
76+
77+
for (size_t i = 0; i < values_.size(); ++i) {
78+
if (val[i] == m) { continue; }
79+
const auto c = counts[i];
80+
const auto c2 = icntpp(c);
81+
const auto c1 = sc(c2, c);
82+
values_[i] = values_[i] * c1 + val[i] * c2;
83+
}
84+
return;
85+
}
86+
87+
double icntpp(long c) const { return static_cast<double>(1.0) / static_cast<double>(c); };
88+
double sc(double v, long c) const { return static_cast<double>(c - 1) * v; };
89+
5690
void print(std::ostream& os) const override { os << logHeader_; }
5791
};
5892

src/multio/action/statistics/operations/DeAccumulate.h

+2-16
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@ class DeAccumulate final : public OperationWithDeaccumulatedData<T> {
3737
checkSize(sz, cfg);
3838
LOG_DEBUG_LIB(LibMultio) << logHeader_ << ".update().count=" << win_.count() << std::endl;
3939
const T* val = static_cast<const T*>(data);
40-
cfg.bitmapPresent() ? updateWithMissing(val, cfg) : updateWithoutMissing(val, cfg);
40+
std::copy(val, val + (sz / sizeof(T)), values_.begin());
4141
return;
4242
}
4343

@@ -51,21 +51,7 @@ class DeAccumulate final : public OperationWithDeaccumulatedData<T> {
5151
void computeWithMissing(T* val, const StatisticsConfiguration& cfg) {
5252
double m = cfg.missingValue();
5353
std::transform(values_.begin(), values_.end(), initValues_.begin(), val,
54-
[m](T v1, T v2) { return static_cast<T>(m == v1 ? m : v1 - v2); });
55-
return;
56-
}
57-
58-
59-
void updateWithoutMissing(const T* val, const StatisticsConfiguration& cfg) {
60-
std::transform(values_.begin(), values_.end(), val, values_.begin(),
61-
[](T v1, T v2) { return static_cast<T>(v2); });
62-
return;
63-
}
64-
65-
void updateWithMissing(const T* val, const StatisticsConfiguration& cfg) {
66-
double m = cfg.missingValue();
67-
std::transform(values_.begin(), values_.end(), val, values_.begin(),
68-
[m](T v1, T v2) { return static_cast<T>(m == v2 ? m : v2); });
54+
[m](T v1, T v2) { return static_cast<T>(m == v1 || m == v2 ? m : v1 - v2); });
6955
return;
7056
}
7157

src/multio/action/statistics/operations/FixedWindowFluxAverage.h

+1-1
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,7 @@ class FixedWindowFluxAverage final : public OperationWithDeaccumulatedData<T> {
4646
const double m = cfg.missingValue();
4747
const double c = static_cast<double>(1.0) / static_cast<double>(win_.count() * cfg.stepFreq() * cfg.timeStep());
4848
std::transform(values_.begin(), values_.end(), initValues_.begin(), buf,
49-
[c, m](T v1, T v2) { return static_cast<T>(m == v1 ? m : (v1 - v2) * c); });
49+
[c, m](T v1, T v2) { return static_cast<T>(m == v1 || m == v2 ? m : (v1 - v2) * c); });
5050
return;
5151
}
5252

0 commit comments

Comments
 (0)