-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathutils.py
More file actions
173 lines (129 loc) · 5.67 KB
/
utils.py
File metadata and controls
173 lines (129 loc) · 5.67 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
import numpy as np
import matplotlib.pyplot as plt
import matplotlib.ticker as ticker
from functools import reduce
from scipy import signal
from scipy.fftpack import fftshift
#################
# FM demodulation
#################
def fm_demod(iq_samples):
    """Polar-discriminator FM demodulation.

    Multiplies each IQ sample by the conjugate of its predecessor and takes
    the angle, yielding the per-sample phase increment. The output has the
    same length as the input: element 0 is zero-padded since the first
    sample has no predecessor.
    """
    phase_deltas = np.angle(iq_samples[1:] * np.conj(iq_samples[:-1]))
    return np.concatenate(([0], phase_deltas))
#######################
# Frame synchronization
#######################
def frame_start_correlations(signal, sync_word, samples_per_symbol):
    """Slide a symbol-spaced sync-word correlator over the signal.

    For every candidate start offset (stepped one sample at a time), take
    one signal sample per sync symbol — samples_per_symbol may be
    fractional, positions are truncated to integers — and dot-product those
    samples with sync_word. Returns one correlation value per start offset.
    """
    n_symbols = len(sync_word)
    correlations = []
    start = 0
    while start + n_symbols * samples_per_symbol < len(signal):
        taps = []
        pos = start
        for _ in range(n_symbols):
            taps.append(signal[int(pos)])
            # accumulate the (possibly fractional) symbol spacing
            pos += samples_per_symbol
        correlations.append(sum(tap * sym for tap, sym in zip(taps, sync_word)))
        start += 1
    return np.array(correlations)
def frame_start_correlations_vect(signal, sync_word, samples_per_symbol, relative=False):
    """Vectorized equivalent of frame_start_correlations.

    Requires samples_per_symbol to be (sufficiently close to) an integer.
    The signal is split into samples_per_symbol interleaved polyphase
    streams; each stream is correlated against sync_word at C speed, and
    the per-stream results are re-interleaved so that output index m is the
    correlation for start offset m, matching the scalar implementation.

    If relative is True, each correlation is normalized by the energy of
    the signal samples it covers.
    """
    samples_per_symbol = int(samples_per_symbol)
    correlations_size = sum(len(signal[i::samples_per_symbol]) - len(sync_word) + 1
                            for i in range(samples_per_symbol))
    correlations = np.zeros(correlations_size)
    for i in range(samples_per_symbol):
        signal_downsampled = signal[i::samples_per_symbol]
        # np.correlate (NOT np.convolve) matches the plain dot product used
        # by frame_start_correlations; convolve would flip sync_word and
        # produce wrong values for non-palindromic sync words.
        selected_correlations = np.correlate(signal_downsampled, sync_word, mode='valid')
        if relative:
            selected_correlations = selected_correlations / np.correlate(
                signal_downsampled ** 2, np.ones(len(sync_word)), mode='valid')
        correlations[i::samples_per_symbol] = selected_correlations
    return correlations
##########
# Plotting
##########
def lowpass(input_signal, sampling_freq_hz, cutoff_freq_hz, N=10):
    """Apply an N-th order digital Butterworth low-pass filter.

    Designs the filter for the given sampling rate and cutoff (both in Hz)
    and runs input_signal through it with a one-directional lfilter pass.
    """
    numer, denom = signal.butter(N=N, Wn=cutoff_freq_hz, fs=sampling_freq_hz,
                                 btype='low', analog=False)
    filtered = signal.lfilter(numer, denom, input_signal)
    return filtered
def welch(samples, sample_rate, nper=1024, fsize=(20, 10)):
    """Plot the Welch PSD estimate of samples, centered on 0 Hz, log scale.

    Detrending is disabled (identity detrend) so DC content is kept.
    The x axis is in kHz with 31 evenly spaced ticks across the band.
    """
    freqs, psd_est = signal.welch(samples, sample_rate, nperseg=nper,
                                  detrend=lambda x: x)
    freqs, psd_est = fftshift(freqs), fftshift(psd_est)
    # sort so the frequency axis is strictly ascending before plotting
    order = np.argsort(freqs)
    freqs = np.take_along_axis(freqs, order, axis=0)
    psd_est = np.take_along_axis(psd_est, order, axis=0)
    plt.figure(figsize=fsize)
    plt.semilogy(freqs / 1e3, psd_est)
    plt.xlabel('f [kHz]')
    plt.ylabel('PSD [Power/Hz]')
    plt.grid()
    half_band_khz = sample_rate / 2e3
    plt.xticks(np.linspace(-half_band_khz, half_band_khz, 31))
    plt.xlim(-half_band_khz, half_band_khz)
def read_iq(f_name):
    """Read interleaved signed 8-bit I/Q samples from f_name.

    Even bytes are the in-phase component, odd bytes quadrature; returns a
    complex array of half the file's byte length.
    """
    raw = np.fromfile(f_name, dtype=np.int8)
    i_part = raw[0::2]
    q_part = raw[1::2]
    return i_part + 1j * q_part
def read_iq_gqrx(f_name):
    """Read a GQRX raw capture: interleaved 32-bit float I/Q samples.

    Even floats are in-phase, odd floats quadrature; returns a complex
    array of half the file's float count.
    """
    raw = np.fromfile(f_name, dtype=np.float32)
    i_part = raw[0::2]
    q_part = raw[1::2]
    return i_part + 1j * q_part
def frequency_shift(samples, offset, sample_rate):
    """Shift the spectrum of samples by offset Hz via complex mixing.

    Multiplies by exp(j*2*pi*offset*n/fs). The time base is n/sample_rate;
    the previous np.linspace(0, N/fs, N) version included the endpoint, so
    the effective sample period was N/((N-1)*fs) instead of 1/fs, slightly
    mis-tuning the shift (worse for short buffers).
    """
    t_s = np.arange(len(samples)) / sample_rate
    return samples * np.exp(2j * np.pi * offset * t_s)
def psd(signal, n_samples, offset=0):
    """Return the PSD in dB of a Hamming-windowed slice of the signal.

    Takes n_samples starting at offset, applies a Hamming window, and
    returns 20*log10 of the FFT magnitude normalized by n_samples,
    fftshifted so DC sits in the center bin.
    """
    windowed = signal[offset:offset + n_samples] * np.hamming(n_samples)
    spectrum = np.fft.fftshift(np.fft.fft(windowed))
    magnitude = np.abs(spectrum) / n_samples
    return 20 * np.log10(magnitude)
def plot_signals(signals, start=0, width=None, figsize=(20, 10), normalize=True):
    """Plot several 1-D signals on a single axis.

    signals: iterable of 1-D arrays. start/width select the sample window;
    width=None plots through the last sample (the previous [start:-1] slice
    silently dropped the final sample). When normalize is True each signal
    is scaled to unit peak magnitude before plotting.
    """
    fig, ax = plt.subplots()
    fig.set_size_inches(figsize)
    stop = None if width is None else start + width
    for s in signals:
        n_s = s / np.max(np.abs(s)) if normalize else s
        ax.plot(n_s[start:stop])
def plot_psd(psd_samples, fs):
    """Plot PSD samples (dB) against a frequency axis spanning [-fs/2, fs/2)."""
    bin_width = fs / len(psd_samples)
    freq_axis = np.arange(-fs / 2, fs / 2, bin_width)
    plt.plot(freq_axis, psd_samples)
    plt.xlabel('Frequency [Hz]')
    plt.ylabel('PSD [dB]')
def plot_correlations(correlations, start=0, width=None, max_indexes=None):
    """Plot correlation values, optionally marking peak indices with stars.

    start/width select the plotted window; width=None plots through the
    final sample (the previous [start:-1] slice dropped it). Entries of
    max_indexes outside the window are skipped.

    NOTE(review): when start != 0 the plotted line is re-indexed from 0 but
    the star markers use absolute indices, so they appear shifted right by
    `start` — confirm whether callers ever pass a nonzero start.
    """
    fig, ax = plt.subplots()
    fig.set_size_inches((20, 10))
    stop = None if width is None else start + width
    ax.plot(correlations[start:stop])
    if max_indexes is not None:
        in_window = [idx for idx in max_indexes
                     if idx >= start and (width is None or idx <= start + width)]
        ax.plot(in_window, [correlations[idx] for idx in in_window], '*')
# find best approximating indices in source for target values
# so, e.g. find_positions(x-axis labels we want, x-axis values in plot)
#
# O(n) because assumes both source and target sorted ascending
def find_positions(source, target):
    """For each target value, return the index of the closest source value.

    Both source and target must be sorted ascending, which allows a single
    left-to-right pass: O(len(source) + len(target)). Ties between two
    equally close source values resolve to the later index (matching the
    original <= comparison).

    Fixes an off-by-one in the previous scan: a target whose best match was
    the final source element was never appended, so trailing axis labels
    were silently dropped by the plotting callers.
    """
    positions = []
    if len(source) == 0:
        return positions
    best = 0  # best-match index so far; only ever moves right (sorted inputs)
    for t in target:
        # advance while the next source element is at least as close
        while (best + 1 < len(source)
               and np.abs(t - source[best + 1]) <= np.abs(t - source[best])):
            best += 1
        positions.append(best)
    return positions
def plot_spectrogram(signal, fs, chunk=1024):
    """Render a coarse spectrogram: one PSD row every chunk * t_step samples.

    signal: 1-D sample array; fs: sample rate in Hz; chunk: FFT size per row.
    Axis ticks are placed via find_positions so labels land on round
    frequency/time values.
    """
    t_step = 100  # rows are spaced t_step chunks apart to keep the image small
    # Stop early enough that every slice holds exactly `chunk` samples: the
    # previous range(0, len(signal), ...) could hand psd() a short tail
    # slice, which fails against the length-`chunk` Hamming window.
    rows = [psd(signal[i:i + chunk], chunk)
            for i in range(0, len(signal) - chunk + 1, chunk * t_step)]
    fig, ax = plt.subplots(1, 1)
    ax.imshow(np.stack(rows))
    # x axis: symmetric frequency labels at round multiples of a tenth of
    # the band's order of magnitude
    freq_order = 10 ** round(np.log10(fs / 2))
    xlabels_pos = np.arange(0, fs / 2, freq_order / 10)
    xlabels = np.concatenate([-np.flip(xlabels_pos)[:-1], xlabels_pos])
    freqs = (np.arange(0, chunk) - chunk // 2) * fs / chunk
    xticks = find_positions(freqs, xlabels)
    ax.xaxis.set_major_locator(ticker.FixedLocator(xticks))
    ax.set_xticklabels((xlabels / 1000).astype(int))
    ax.set_xlabel('Frequency [kHz]')
    # y axis: ten evenly spaced integer-second time labels
    t_max = len(signal) / fs
    ylabels = np.arange(0, t_max, t_max / 10).astype(int)
    times = np.arange(0, t_max, chunk * t_step / fs)
    yticks = find_positions(times, ylabels)
    ax.yaxis.set_major_locator(ticker.FixedLocator(yticks))
    ax.set_yticklabels(ylabels)
    ax.set_ylabel('Time [s]')