Skip to content
Draft
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
69 changes: 43 additions & 26 deletions src/data_write.py
Original file line number Diff line number Diff line change
Expand Up @@ -722,6 +722,29 @@ def two_hr_ceiling(dt):

self.next_boundary = two_hr_ceiling(time_now)

def send_rawacf_to_realtime(aveperiod_data):
"""
Sends rawacf data for the aveperiod to realtime.

:param aveperiod_data: Dict of SequenceData for each slice.
:type aveperiod_data: dict
"""
group = {}
for relevant_field in SliceData.type_fields('rawacf'):
data = getattr(aveperiod_data, relevant_field)

# Massage the data into the correct types
if isinstance(data, dict):
data = str(data)
elif isinstance(data, list):
if isinstance(data[0], str):
data = np.bytes_(data)
else:
data = np.array(data)
group[relevant_field] = data
full_dict = {epoch_milliseconds: group}
so.send_bytes(rt_dw['socket'], rt_dw['iden'], pickle.dumps(full_dict))

def write_file(tmp_file, aveperiod_data, two_hr_file_with_type, file_type):
"""
Writes the final data out to the location based on the type of file extension required
Expand Down Expand Up @@ -766,42 +789,20 @@ def write_file(tmp_file, aveperiod_data, two_hr_file_with_type, file_type):

self.write_hdf5_file(full_two_hr_file, aveperiod_data, epoch_milliseconds, file_type)

# Send rawacf data to realtime (if there is any)
if file_type == 'rawacf':
group = {}
for relevant_field in SliceData.type_fields(file_type):
data = getattr(aveperiod_data, relevant_field)

# Massage the data into the correct types
if isinstance(data, dict):
data = str(data)
elif isinstance(data, list):
if isinstance(data[0], str):
data = np.bytes_(data)
else:
data = np.array(data)
group[relevant_field] = data
full_dict = {epoch_milliseconds: group}
so.send_bytes(rt_dw['socket'], rt_dw['iden'], pickle.dumps(full_dict))

elif file_ext == 'json':
self.write_json_file(tmp_file, aveperiod_data, file_type)
elif file_ext == 'dmap':
self.write_dmap_file(tmp_file, aveperiod_data)

def write_correlations(aveperiod_data):
def reshape_correlations(aveperiod_data):
"""
Parses out any possible correlation data from message and writes to file. Some variables
are captured from outer scope.

main_acfs, intf_acfs, and xcfs are all passed to data_write for all sequences
individually. At this point, they will be combined into data for a single integration
time via averaging.
time via averaging. Modifies the dictionary passed in.

:param aveperiod_data: Dict of SequenceData for each slice.
:type aveperiod_data: dict
"""

main_acfs = data_parsing.mainacfs_accumulator
xcfs = data_parsing.xcfs_accumulator
intf_acfs = data_parsing.intfacfs_accumulator
Expand Down Expand Up @@ -864,6 +865,19 @@ def find_expectation_value(x):
slice_data.data_dimensions = np.array([len(slice_data.beam_nums), slice_data.num_ranges,
slice_data.lags.shape[0]], dtype=np.uint32)

def write_correlations(aveperiod_data):
"""
Parses out any possible correlation data from message and writes to file. Some variables
are captured from outer scope.

main_acfs, intf_acfs, and xcfs are all passed to data_write for all sequences
individually. At this point, they will be combined into data for a single integration
time via averaging.

:param aveperiod_data: Dict of SequenceData for each slice.
:type aveperiod_data: dict
"""
for slice_num, slice_data in aveperiod_data.items():
name = dataset_name.format(sliceid=slice_num, dformat="rawacf")
output_file = dataset_location.format(name=name)
two_hr_file_with_type = self.slice_filenames[slice_num].format(ext="rawacf")
Expand Down Expand Up @@ -1110,8 +1124,11 @@ def write_tx_data():

all_slice_data[rx_channel.slice_id] = parameters

if write_rawacf and data_parsing.mainacfs_available:
write_correlations(all_slice_data)
if data_parsing.mainacfs_available:
reshape_correlations(all_slice_data)
send_rawacf_to_realtime(all_slice_data)
if write_rawacf:
write_correlations(all_slice_data)
if write_bfiq and data_parsing.bfiq_available:
write_bfiq_params(all_slice_data)
if write_antenna_iq and data_parsing.antenna_iq_available:
Expand Down