From f40cefee2f3be24eb05ad0695a6806518fbad0fc Mon Sep 17 00:00:00 2001 From: Remington Rohel Date: Tue, 26 Mar 2024 11:07:35 -0600 Subject: [PATCH] Changes to serve rawacf data always. * Rawacf data is always generated, so it can always be served even in non-writing experiments. * Refactored write_correlations() function in data_write.py into multiple functions so reshaping arrays and serving data can be called separately. --- src/data_write.py | 69 +++++++++++++++++++++++++++++------------------ 1 file changed, 43 insertions(+), 26 deletions(-) diff --git a/src/data_write.py b/src/data_write.py index e9a0a250a..06fefd16b 100644 --- a/src/data_write.py +++ b/src/data_write.py @@ -722,6 +722,29 @@ def two_hr_ceiling(dt): self.next_boundary = two_hr_ceiling(time_now) + def send_rawacf_to_realtime(aveperiod_data): + """ + Sends rawacf data for the aveperiod to realtime. + + :param aveperiod_data: Dict of SequenceData for each slice. + :type aveperiod_data: dict + """ + group = {} + for relevant_field in SliceData.type_fields('rawacf'): + data = getattr(aveperiod_data, relevant_field) + + # Massage the data into the correct types + if isinstance(data, dict): + data = str(data) + elif isinstance(data, list): + if isinstance(data[0], str): + data = np.bytes_(data) + else: + data = np.array(data) + group[relevant_field] = data + full_dict = {epoch_milliseconds: group} + so.send_bytes(rt_dw['socket'], rt_dw['iden'], pickle.dumps(full_dict)) + def write_file(tmp_file, aveperiod_data, two_hr_file_with_type, file_type): """ Writes the final data out to the location based on the type of file extension required @@ -766,42 +789,20 @@ def write_file(tmp_file, aveperiod_data, two_hr_file_with_type, file_type): self.write_hdf5_file(full_two_hr_file, aveperiod_data, epoch_milliseconds, file_type) - # Send rawacf data to realtime (if there is any) - if file_type == 'rawacf': - group = {} - for relevant_field in SliceData.type_fields(file_type): - data = getattr(aveperiod_data, relevant_field) - - # Massage the data into the correct types - if isinstance(data, dict): - data = str(data) - elif isinstance(data, list): - if isinstance(data[0], str): - data = np.bytes_(data) - else: - data = np.array(data) - group[relevant_field] = data - full_dict = {epoch_milliseconds: group} - so.send_bytes(rt_dw['socket'], rt_dw['iden'], pickle.dumps(full_dict)) - elif file_ext == 'json': self.write_json_file(tmp_file, aveperiod_data, file_type) elif file_ext == 'dmap': self.write_dmap_file(tmp_file, aveperiod_data) - def write_correlations(aveperiod_data): + def reshape_correlations(aveperiod_data): """ - Parses out any possible correlation data from message and writes to file. Some variables - are captured from outer scope. - main_acfs, intf_acfs, and xcfs are all passed to data_write for all sequences individually. At this point, they will be combined into data for a single integration - time via averaging. + time via averaging. Modifies the dictionary passed in. :param aveperiod_data: Dict of SequenceData for each slice. :type aveperiod_data: dict """ - main_acfs = data_parsing.mainacfs_accumulator xcfs = data_parsing.xcfs_accumulator intf_acfs = data_parsing.intfacfs_accumulator @@ -864,6 +865,19 @@ def find_expectation_value(x): slice_data.data_dimensions = np.array([len(slice_data.beam_nums), slice_data.num_ranges, slice_data.lags.shape[0]], dtype=np.uint32) + def write_correlations(aveperiod_data): + """ + Parses out any possible correlation data from message and writes to file. Some variables + are captured from outer scope. + + main_acfs, intf_acfs, and xcfs are all passed to data_write for all sequences + individually. At this point, they will be combined into data for a single integration + time via averaging. + + :param aveperiod_data: Dict of SequenceData for each slice. + :type aveperiod_data: dict + """ + for slice_num, slice_data in aveperiod_data.items(): name = dataset_name.format(sliceid=slice_num, dformat="rawacf") output_file = dataset_location.format(name=name) two_hr_file_with_type = self.slice_filenames[slice_num].format(ext="rawacf") @@ -1110,8 +1124,11 @@ def write_tx_data(): all_slice_data[rx_channel.slice_id] = parameters - if write_rawacf and data_parsing.mainacfs_available: - write_correlations(all_slice_data) + if data_parsing.mainacfs_available: + reshape_correlations(all_slice_data) + send_rawacf_to_realtime(all_slice_data) + if write_rawacf: + write_correlations(all_slice_data) if write_bfiq and data_parsing.bfiq_available: write_bfiq_params(all_slice_data) if write_antenna_iq and data_parsing.antenna_iq_available: