Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion DAQController.cc
Original file line number Diff line number Diff line change
Expand Up @@ -267,7 +267,7 @@ int DAQController::OpenThreads(){
fProcessingThreads.reserve(fNProcessingThreads);
for(int i=0; i<fNProcessingThreads; i++){
try {
fFormatters.emplace_back(std::make_unique<StraxFormatter>(fOptions, fLog));
fFormatters.emplace_back(std::make_unique<StraxFormatter>(fOptions, fLog, fDigitizers));
fProcessingThreads.emplace_back(&StraxFormatter::Process, fFormatters.back().get());
} catch(const std::exception& e) {
fLog->Entry(MongoLog::Warning, "Error opening processing threads: %s",
Expand Down
15 changes: 14 additions & 1 deletion DAXHelpers.hh
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,25 @@ public:
DAXHelpers(){};
~DAXHelpers(){};

static unsigned int StringToHex(std::string str){
static unsigned int StringToHexOld(std::string str){
// This function takes ~360ns to run
std::stringstream ss(str);
u_int32_t result;
return ss >> std::hex >> result ? result : 0;
};

static unsigned int StringToHex(const std::string& str) {
// this function takes ~12ns to run
uint32_t result = 0;
int i = 0;
for (auto it = str.rbegin(); it != str.rend(); ++it) {
bool is_letter = *it & 0x40; // see ascii
int val = (!is_letter)*(*it-'0') + is_letter*(9+(*it & 0xF)); // branchless is 2x faster
result += (val << 4*i++); // nibble by nibble
}
return result;
}

const static int Idle = 0;
const static int Arming = 1;
const static int Armed = 2;
Expand Down
2 changes: 1 addition & 1 deletion Options.cc
Original file line number Diff line number Diff line change
Expand Up @@ -308,7 +308,7 @@ int16_t Options::GetChannel(int bid, int cid){
return bson_options["channels"][boardstring][cid].get_int32().value;
}
catch(std::exception& e){
fLog->Entry(MongoLog::Error, "Failed to look up board %i ch %i", bid, cid);
fLog->Entry(MongoLog::Local, "Failed to look up board %i ch %i", bid, cid);
return -1;
}
}
Expand Down
65 changes: 24 additions & 41 deletions StraxFormatter.cc
Original file line number Diff line number Diff line change
Expand Up @@ -57,45 +57,44 @@ const std::map<std::string, std::function<long(std::shared_ptr<std::string>&, st
{"delete", compress_devnull}
};

StraxFormatter::StraxFormatter(std::shared_ptr<Options>& opts, std::shared_ptr<MongoLog>& log){
StraxFormatter::StraxFormatter(std::shared_ptr<Options>& opts, std::shared_ptr<MongoLog>& log, const std::map<int, std::vector<std::shared_ptr<V1724>>>& digis){
fActive = true;
fChunkNameLength=6;
fStraxHeaderSize=24;
fBytesProcessed = 0;
fInputBufferSize = 0;
fOutputBufferSize = 0;
fProcTimeDP = fProcTimeEv = fProcTimeCh = fCompTime = 0.;
fOptions = opts;
fChunkLength = long(fOptions->GetDouble("strax_chunk_length", 5)*1e9); // default 5s
fChunkOverlap = long(fOptions->GetDouble("strax_chunk_overlap", 0.5)*1e9); // default 0.5s
fFragmentBytes = fOptions->GetInt("strax_fragment_payload_bytes", 110*2);
fChunkLength = long(opts->GetDouble("strax_chunk_length", 5)*1e9); // default 5s
fChunkOverlap = long(opts->GetDouble("strax_chunk_overlap", 0.5)*1e9); // default 0.5s
fFragmentBytes = opts->GetInt("strax_fragment_payload_bytes", 110*2);
fFullFragmentSize = fFragmentBytes + fStraxHeaderSize;
try {
fCompressor = compressors.at(fOptions->GetString("compressor", "lz4"));
fCompressor = compressors.at(opts->GetString("compressor", "lz4"));
} catch (...) {
fLog->Entry(MongoLog::Error, "Invalid compressor specified");
throw std::runtime_error("Invalid compressor");
}
fFullChunkLength = fChunkLength+fChunkOverlap;
fHostname = fOptions->Hostname();
std::string run_name;
fHostname = opts->Hostname();
const int run_name_length = 6;
int run_num = fOptions->GetInt("number", -1);
if (run_num == -1) run_name = "run";
else {
run_name = std::to_string(run_num);
if (run_name.size() < run_name_length)
run_name.insert(0, run_name_length - run_name.size(), int('0'));
}
std::string run_name(run_name_length+1, '\0');
fRunNumber = opts->GetInt("number", -1);
if (fRunNumber == -1) run_name = "run";
else sprintf(run_name.data(), "06d", fRunNumber); // run name length is 6

// cache channel map
for (const auto& link : digis) for (const auto& digi: link) for (unsigned ch = 0; ch < digi->GetNumChannels; ++ch)
fChannelMap[digi->fBID()].push_back(Options->GetChannel(digi->fBID(), ch));

fEmptyVerified = 0;
fLog = log;

fBufferNumChunks = fOptions->GetInt("strax_buffer_num_chunks", 2);
fWarnIfChunkOlderThan = fOptions->GetInt("strax_chunk_phase_limit", 2);
fBufferNumChunks = opts->GetInt("strax_buffer_num_chunks", 2);
fWarnIfChunkOlderThan = opts->GetInt("strax_chunk_phase_limit", 2);
fMutexWaitTime.reserve(1<<20);

std::string output_path = fOptions->GetString("strax_output_path", "./");
std::string output_path = opts->GetString("strax_output_path", "./");
try{
fs::path op(output_path);
op /= run_name;
Expand Down Expand Up @@ -133,25 +132,6 @@ void StraxFormatter::GetDataPerChan(std::map<int, int>& ret) {
return;
}

void StraxFormatter::GenerateArtificialDeadtime(int64_t timestamp, const std::shared_ptr<V1724>& digi) {
std::string fragment;
fragment.reserve(fFullFragmentSize);
timestamp *= digi->GetClockWidth(); // TODO nv
int32_t length = fFragmentBytes>>1;
int16_t sw = digi->SampleWidth(), channel = digi->GetADChannel(), zero = 0;
fragment.append((char*)&timestamp, sizeof(timestamp));
fragment.append((char*)&length, sizeof(length));
fragment.append((char*)&sw, sizeof(sw));
fragment.append((char*)&channel, sizeof(channel));
fragment.append((char*)&length, sizeof(length));
fragment.append((char*)&zero, sizeof(zero)); // fragment_i
fragment.append((char*)&zero, sizeof(zero)); // baseline
for (; length > 0; length--)
fragment.append((char*)&zero, sizeof(zero)); // wf
AddFragmentToBuffer(std::move(fragment), 0, 0);
return;
}

void StraxFormatter::ProcessDatapacket(std::unique_ptr<data_packet> dp){
// Take a buffer and break it up into one document per channel
auto it = dp->buff.begin();
Expand All @@ -174,7 +154,7 @@ void StraxFormatter::ProcessDatapacket(std::unique_ptr<data_packet> dp){
missed = false;
// this happens quite rarely, the chance of overwriting ourselves is vanishing
// but it's nice to be able to know why we missed an event
std::string filename = std::to_string(fOptions->GetInt("number", -1)) + "_missed";
std::string filename = std::to_string(fRunNumber) + "_missed";
std::ofstream fout(filename, std::ios::out | std::ios::binary);
fout.write((char*)dp->buff.data(), dp->buff.size()*sizeof(dp->buff[0]));
fout.close();
Expand Down Expand Up @@ -233,7 +213,7 @@ int StraxFormatter::ProcessChannel(std::u32string_view buff, int words_in_event,
uint32_t samples_in_pulse = wf.size()*sizeof(char32_t)/sizeof(uint16_t);
uint16_t sw = dp->digi->SampleWidth();
int samples_per_frag= fFragmentBytes>>1;
int16_t global_ch = fOptions->GetChannel(dp->digi->bid(), channel);
int16_t global_ch = fChannelMap[dp->digi->bid()][channel];
// Failing to discern which channel we're getting data from seems serious enough to throw
if(global_ch==-1)
throw std::runtime_error("Failed to parse channel map. I'm gonna just kms now.");
Expand Down Expand Up @@ -298,11 +278,14 @@ void StraxFormatter::AddFragmentToBuffer(std::string fragment, uint32_t ts, int

fOutputBufferSize += fFullFragmentSize;

if(!overlap){
auto chunks = {&fChunks, &fOverlaps};
(*chunks[overlap])[chunk_id].emplace_back(std::move(fragment));

/* if(!overlap){
fChunks[chunk_id].emplace_back(std::move(fragment));
} else {
fOverlaps[chunk_id].emplace_back(std::move(fragment));
}
}*/
}

int StraxFormatter::ReceiveDatapackets(std::list<std::unique_ptr<data_packet>>& in, int bytes) {
Expand Down
6 changes: 3 additions & 3 deletions StraxFormatter.hh
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ class StraxFormatter{
*/

public:
StraxFormatter(std::shared_ptr<Options>&, std::shared_ptr<MongoLog>&);
StraxFormatter(std::shared_ptr<Options>&, std::shared_ptr<MongoLog>&, const std::map<int, std::vector<std::shared_ptr<V1724>>>&);
~StraxFormatter();

void Close(std::map<int,int>& ret);
Expand All @@ -71,7 +71,6 @@ private:
void WriteOutChunk(int);
void WriteOutChunks();
void End();
void GenerateArtificialDeadtime(int64_t, const std::shared_ptr<V1724>&);
void AddFragmentToBuffer(std::string, uint32_t, int);
std::vector<std::string> GetChunkNames(int);

Expand All @@ -89,10 +88,10 @@ private:
int fFullFragmentSize;
int fBufferNumChunks;
int fWarnIfChunkOlderThan;
int fRunNumber;
unsigned fChunkNameLength;
int64_t fFullChunkLength;
std::string fOutputPath, fHostname, fFullHostname;
std::shared_ptr<Options> fOptions;
std::shared_ptr<MongoLog> fLog;
std::atomic_bool fActive;
std::map<int, std::list<std::string>> fChunks, fOverlaps;
Expand All @@ -105,6 +104,7 @@ private:
std::map<int, long> fBytesPerChunk;
std::atomic_int fInputBufferSize, fOutputBufferSize;
long fBytesProcessed;
std::map<int, std::vector<int>> fChannelMap;

double fProcTimeDP, fProcTimeEv, fProcTimeCh, fCompTime;
std::thread::id fThreadId;
Expand Down
7 changes: 4 additions & 3 deletions V1724.cc
Original file line number Diff line number Diff line change
Expand Up @@ -334,9 +334,10 @@ std::tuple<int64_t, int, uint16_t, std::u32string_view> V1724::UnpackChannelHead
// More rollover logic here, because channels are independent and the
// processing is multithreaded. We leverage the fact that readout windows are
// short and polled frequently compared to the rollover timescale, so there
// will never be a large difference in timestamps in one data packet
if (ch_time > 15e8 && header_time < 5e8 && rollovers != 0) rollovers--;
else if (ch_time < 5e8 && header_time > 15e8) rollovers++;
// will never be a large difference in timestamps in one data packet.
// Allegedly
rollovers -= 1*(ch_time > 15e8 && header_time < 5e8 && rollovers != 0); // header rolled while channel hasn't
rollovers += 1*(ch_time < 5e8 && header_time > 15e8); // channel rolled while header hasn't
return {((rollovers<<31)+ch_time)*fClockCycle - fDelayPerCh[ch] - fPreTrigPerCh[ch], words, 0, sv.substr(2, words-2)};
}

Expand Down