Skip to content
Permalink

Comparing changes

Choose two branches to see what’s changed or to start a new pull request. If you need to, you can also or learn more about diff comparisons.

Open a pull request

Create a new pull request by comparing changes across two branches. If you need to, you can also . Learn more about diff comparisons here.
base repository: RoboDoig/pypulse
Failed to load repositories. Confirm that selected base ref is valid, then try again.
Loading
base: master
Choose a base ref
...
head repository: warnerwarner/pypulse
Failed to load repositories. Confirm that selected head ref is valid, then try again.
Loading
compare: master
Choose a head ref
Able to merge. These branches can be automatically merged.

Commits on Jan 17, 2019

  1. Add anti pulse

    warnerwarner committed Jan 17, 2019
    Copy the full SHA
    6ad20ca View commit details
  2. Debugging

    warnerwarner committed Jan 17, 2019
    Copy the full SHA
    397a4b5 View commit details
  3. add int casting

    warnerwarner committed Jan 17, 2019
    Copy the full SHA
    703754c View commit details
  4. fixed typo

    warnerwarner committed Jan 17, 2019
    Copy the full SHA
    814cefe View commit details
  5. casting to int

    warnerwarner committed Jan 17, 2019
    Copy the full SHA
    a1f4044 View commit details
  6. removed length prints

    warnerwarner committed Jan 17, 2019
    Copy the full SHA
    5b523b2 View commit details

Commits on Jun 18, 2019

  1. Add binary pulse

    warnerwarner committed Jun 18, 2019
    Copy the full SHA
    980f195 View commit details

Commits on Jun 19, 2019

  1. Add binary pulse type

    warnerwarner committed Jun 19, 2019
    Copy the full SHA
    ce65c13 View commit details
  2. Correct parmas to params

    warnerwarner committed Jun 19, 2019
    Copy the full SHA
    b58a4fe View commit details
  3. correct var names

    warnerwarner committed Jun 19, 2019
    Copy the full SHA
    43d11fc View commit details
  4. ditto

    warnerwarner committed Jun 19, 2019
    Copy the full SHA
    92a90d2 View commit details
  5. Copy the full SHA
    8994bd9 View commit details
  6. remove resampling

    warnerwarner committed Jun 19, 2019
    Copy the full SHA
    ab52c12 View commit details

Commits on Jul 12, 2019

  1. update binary pulse generation

    Now takes a value between 2^num_of_bins and 0, instead of binary array
    warnerwarner committed Jul 12, 2019
    Copy the full SHA
    216a53c View commit details

Commits on Feb 25, 2020

  1. Copy the full SHA
    d817f9d View commit details
  2. Copy the full SHA
    f732344 View commit details
  3. Verified

    This commit was created on GitHub.com and signed with GitHub’s verified signature. The key has expired.
    Copy the full SHA
    1d2cb75 View commit details

Commits on Feb 27, 2020

  1. remove print params

    warnerwarner committed Feb 27, 2020
    Copy the full SHA
    af07d80 View commit details

Commits on Jun 17, 2020

  1. Copy the full SHA
    4f6e631 View commit details
  2. more int castings

    warnerwarner committed Jun 17, 2020
    Copy the full SHA
    2c523e8 View commit details

Commits on Feb 19, 2021

  1. Add invert valve option

    warnerwarner committed Feb 19, 2021
    Copy the full SHA
    38846db View commit details
  2. Copy the full SHA
    3cbc6dd View commit details
  3. Copy the full SHA
    2e2da72 View commit details
  4. Copy the full SHA
    9927acf View commit details
  5. debug print

    warnerwarner committed Feb 19, 2021
    Copy the full SHA
    90787b6 View commit details
  6. invert full pulse

    warnerwarner committed Feb 19, 2021
    Copy the full SHA
    c567a0c View commit details
  7. attempt 2 at full invert

    warnerwarner committed Feb 19, 2021
    Copy the full SHA
    8aecb30 View commit details
  8. attempt numero three

    warnerwarner committed Feb 19, 2021
    Copy the full SHA
    9199280 View commit details
  9. 4

    warnerwarner committed Feb 19, 2021
    Copy the full SHA
    ce2acac View commit details
  10. retry 5

    warnerwarner committed Feb 19, 2021
    Copy the full SHA
    1cdc7b2 View commit details
  11. Copy the full SHA
    bea8223 View commit details

Commits on Mar 1, 2021

  1. switched valve inversion

    Updated valve inversion so that the pulses are unchanged but that the onset and offset are the only thing inverted
    warnerwarner committed Mar 1, 2021
    Copy the full SHA
    03bbcf1 View commit details
  2. print debug removal

    warnerwarner committed Mar 1, 2021
    Copy the full SHA
    1599680 View commit details

Commits on Mar 2, 2021

  1. Copy the full SHA
    db5800b View commit details
  2. Copy the full SHA
    6fc9d76 View commit details

Commits on May 3, 2021

  1. Copy the full SHA
    7068bdd View commit details

Commits on Feb 11, 2022

  1. fix inversion

    warnerwarner committed Feb 11, 2022
    Copy the full SHA
    47afe23 View commit details
Showing with 165 additions and 40 deletions.
  1. +131 −35 PulseGeneration.py
  2. +34 −5 PulseInterface.py
166 changes: 131 additions & 35 deletions PulseGeneration.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,14 @@
import scipy.signal as signal
import numpy as np
import matplotlib.pyplot as plt
import scipy.io as sio


def square_pulse(sampling_rate, duration, frequency, duty):
t = np.linspace(0, duration, sampling_rate * duration, endpoint=False)
return (np.array(signal.square(2 * np.pi * frequency * t, duty=duty)) / 2) + 0.5, t
def square_pulse(sampling_rate, duration, frequency, duty, inverted=False):
t = np.linspace(0, duration, int(sampling_rate * duration), endpoint=False)
square_pulse = (np.array(signal.square(2 * np.pi * frequency * t, duty=duty)) / 2) + 0.5
if inverted:
square_pulse = -square_pulse + 1
return square_pulse, t


def extended_square_pulse(sampling_rate, duration, frequency, duty):
@@ -25,17 +27,20 @@ def extended_square_pulse(sampling_rate, duration, frequency, duty):
return pulse, t


def shatter_pulse(sampling_rate, duration, frequency, duty, shatter_frequency, shatter_duty):
def shatter_pulse(sampling_rate, duration, frequency, duty, shatter_frequency, shatter_duty, inverted=False):

if shatter_frequency < frequency:
raise ValueError('Shatter frequency must not be lower than major frequency.')

t = np.linspace(0, duration, sampling_rate * duration, endpoint=False)

guide_pulse, _ = square_pulse(sampling_rate, duration, frequency, duty)
t = np.linspace(0, duration, int(sampling_rate * duration), endpoint=False)
if inverted:
shatter_duty = 1-shatter_duty
guide_pulse, _ = square_pulse(sampling_rate, duration, frequency, duty, inverted=False)
shattered_pulse = (np.array(signal.square(2 * np.pi * shatter_frequency * t, duty=shatter_duty)) / 2) + 0.5

return guide_pulse * shattered_pulse, t
out_pulse = guide_pulse * shattered_pulse
if inverted:
out_pulse = out_pulse * -1 + 1
return out_pulse, t


def random_shatter_pulse(sampling_rate, duration, frequency, duty, shatter_frequency, target_duty, amp_min, amp_max, extend=False):
@@ -51,7 +56,7 @@ def random_shatter_pulse(sampling_rate, duration, frequency, duty, shatter_frequ
else:
guide_pulse, _ = square_pulse(sampling_rate, duration, frequency, duty)

t = np.linspace(0, duration, sampling_rate * duration, endpoint=False)
t = np.linspace(0, duration, int(sampling_rate * duration), endpoint=False)

if target_duty == 1.0:
return guide_pulse, t
@@ -108,8 +113,12 @@ def random_simple_pulse(sampling_rate, params):
pulse, t = square_pulse(sampling_rate, duration, frequency, duty)

# Attach onset and offset
onset = np.zeros(int(sampling_rate * params['onset']))
offset = np.zeros(int(sampling_rate * params['offset']))
if params['inversion']:
onset = np.ones(int(sampling_rate * params['onset']))
offset = np.ones(int(sampling_rate * params['offset']))
else:
onset = np.zeros(int(sampling_rate * params['onset']))
offset = np.zeros(int(sampling_rate * params['offset']))

# if we want to shadow the pulse, add this in here (repeat the pulse at a compensating duty)
if params['shadow']:
@@ -180,8 +189,12 @@ def spec_time_pulse(sampling_rate, params):
pulse = pulse[::-1]

# Attach onset and offset
onset = np.zeros(int(sampling_rate * params['onset']))
offset = np.zeros(int(sampling_rate * params['offset']))
if params['inversion']:
onset = np.ones(int(sampling_rate * params['onset']))
offset = np.ones(int(sampling_rate * params['offset']))
else:
onset = np.zeros(int(sampling_rate * params['onset']))
offset = np.zeros(int(sampling_rate * params['offset']))

pulse = np.hstack((onset, pulse, offset))

@@ -212,21 +225,25 @@ def simple_pulse(sampling_rate, params):
duration = (1.0 / frequency) * params['repeats']

if params['isClean']:
pulse, t = square_pulse(sampling_rate, duration, frequency, duty)
pulse, t = square_pulse(sampling_rate, duration, frequency, duty, inverted=params['inversion'])
else:
assert params['isShatter']
pulse, t = shatter_pulse(sampling_rate, duration, frequency, duty, params['shatter_frequency'],
params['shatter_duty'])
params['shatter_duty'], inverted=params['inversion'])

# Attach onset and offset
onset = np.zeros(int(sampling_rate * params['onset']))
offset = np.zeros(int(sampling_rate * params['offset']))
if params['inversion']:
onset = np.ones(int(int(sampling_rate * params['onset'])))
offset = np.ones(int(int(sampling_rate * params['offset'])))
else:
onset = np.zeros(int(int(sampling_rate * params['onset'])))
offset = np.zeros(int(int(sampling_rate * params['offset'])))

pulse = np.hstack((onset, pulse, offset))

total_length = round(duration + params['onset'] + params['offset'], 10) # N.B. Have to round here due to floating point representation problem

return pulse, np.linspace(0, total_length, total_length*sampling_rate)
return pulse, np.linspace(0, total_length, int(total_length*sampling_rate))


def multi_simple_pulse(sampling_rate, global_onset, global_offset, params_list):
@@ -264,7 +281,7 @@ def noise_pulse(sampling_rate, params):
amp_min = params['amp_min']
amp_max = params['amp_max']

t = np.linspace(0, duration, sampling_rate * duration)
t = np.linspace(0, duration, int(sampling_rate * duration))
np.random.seed(int(params['seed']))
while len(guide_pulse) < len(t):
rand_param = np.random.uniform(amp_min, amp_max)
@@ -275,8 +292,12 @@ def noise_pulse(sampling_rate, params):
pulse = (np.array(signal.square(2 * np.pi * params['shatter_frequency'] * t, duty=guide_pulse)) / 2) + 0.5

# Attach onset and offset
onset = np.zeros(sampling_rate * params['onset'])
offset = np.zeros(sampling_rate * params['offset'])
if params['inversion']:
onset = np.ones(int(sampling_rate * params['onset']))
offset = np.ones(int(sampling_rate * params['offset']))
else:
onset = np.zeros(int(sampling_rate * params['onset']))
offset = np.zeros(int(sampling_rate * params['offset']))

total_length = round(duration + params['onset'] + params['offset'], 10)
return np.hstack((onset, pulse, offset)), np.linspace(0, total_length, total_length * sampling_rate)
@@ -287,24 +308,97 @@ def plume_pulse(sampling_rate, params):
plume = plume['plume'][0]

# resample to match sampling rate
resampled = signal.resample(plume, len(plume)*(sampling_rate / params['data_fs']))
resampled = signal.resample(plume, int(len(plume)*(sampling_rate / params['data_fs'])))
# zero out negative values
resampled[resampled < 0] = 0
# normalise
resampled = (resampled - min(resampled)) / (max(resampled) - min(resampled))
resampled = resampled * params['target_max']

duration = len(resampled) / sampling_rate
t = np.linspace(0, duration, sampling_rate * duration)
t = np.linspace(0, duration, int(sampling_rate * duration))
pulse = (np.array(signal.square(2 * np.pi * params['shatter_frequency'] * t, duty=resampled)) / 2) + 0.5
print(len(pulse))

# Attach onset and offset
onset = np.zeros(int(sampling_rate * params['onset']))
offset = np.zeros(int(sampling_rate * params['offset']))
if params['inversion']:
onset = np.ones(int(sampling_rate * params['onset']))
offset = np.ones(int(sampling_rate * params['offset']))
else:
onset = np.zeros(int(sampling_rate * params['onset']))
offset = np.zeros(int(sampling_rate * params['offset']))

total_length = round(params['onset'] + params['offset'] + len(pulse) / sampling_rate, 10)
return np.hstack((onset, pulse, offset)), np.linspace(0, total_length, total_length * sampling_rate)
return np.hstack((onset, pulse, offset)), np.linspace(0, total_length, int(total_length * sampling_rate))

def anti_plume_pulse(sampling_rate, params):
plume = sio.loadmat(params['data_path'])
plume = plume['plume'][0]

# resample to match sampling rate
resampled = signal.resample(plume, int(len(plume)*(sampling_rate / params['data_fs'])))
# zero out negative values
resampled[resampled < 0] = 0
# normalise
resampled = (resampled - min(resampled)) / (max(resampled) - min(resampled))
resampled = resampled * params['target_max']

duration = len(resampled) / sampling_rate
t = np.linspace(0, duration, int(sampling_rate * duration))
pulse = (np.array(signal.square(2 * np.pi * params['shatter_frequency'] * t, duty=resampled)) / 2) + 0.5
anti_pulse = [1 - i for i in pulse]

# Attach onset and offset
if params['inversion']:
onset = np.ones(int(int(sampling_rate * params['onset'])))
offset = np.ones(int(int(sampling_rate * params['offset'])))
else:
onset = np.zeros(int(int(sampling_rate * params['onset'])))
offset = np.zeros(int(int(sampling_rate * params['offset'])))

total_length = round(params['onset'] + params['offset'] + len(pulse) / sampling_rate, 10)
return np.hstack((onset, anti_pulse, offset)), np.linspace(0, total_length, int(total_length * sampling_rate))


def binary_pulse(sampling_rate, params):
'''
Creates a pulse from a value passed to it
'''

binary = params['value_to_binarise']

assert binary >= 0, 'Binary minimum should be zero'
assert binary <= 2**params['num_of_bins'] - 1, 'Binary maximum should be less than %s' % str(2**params['num_of_bins'])
bin_width = params['bin_size']*sampling_rate
binned = bin(binary)[2:]
while len(binned) < params['num_of_bins']:
binned = '0' + binned
bin_output = []
bin_pulse = [int(i) for i in binned for j in range(int(bin_width))]


duration = len(bin_pulse) / sampling_rate
t = np.linspace(0, duration, int(sampling_rate * duration))

if 'isShatter' in params:
if params['isShatter'] is True:
if params['inversion']:
shatter_duty = 1 - params['shatter_duty']
else:
shatter_duty = params['shatter_duty']
shattered_pulse = (np.array(signal.square(2 * np.pi * params["shatter_frequency"] * t, duty=shatter_duty)) / 2) + 0.5
bin_pulse = bin_pulse * shattered_pulse

bin_pulse = np.array(bin_pulse)
if params['inversion']:
onset = np.ones(int(sampling_rate * params['onset']))
offset = np.ones(int(sampling_rate * params['offset']))
bin_pulse = bin_pulse * -1 + 1
else:
onset = np.zeros(int(sampling_rate * params['onset']))
offset = np.zeros(int(sampling_rate * params['offset']))

total_length = round(params['onset'] + params['offset'] + len(bin_pulse)/sampling_rate, 10)
return np.hstack((onset, bin_pulse, offset)), np.linspace(0, total_length, int(total_length * sampling_rate))


def dummy_noise_pulse(sampling_rate, params):
@@ -322,18 +416,22 @@ def dummy_noise_pulse(sampling_rate, params):
amp_min = params['amp_min']
amp_max = params['amp_max']

t = np.linspace(0, duration, sampling_rate * duration)
t = np.linspace(0, duration, int(sampling_rate * duration))

guide_pulse = np.ones(sampling_rate*duration)

pulse = (np.array(signal.square(2 * np.pi * params['shatter_frequency'] * t, duty=guide_pulse)) / 2) + 0.5

# Attach onset and offset
onset = np.zeros(sampling_rate * params['onset'])
offset = np.zeros(sampling_rate * params['offset'])
if params['inversion']:
onset = np.ones(int(sampling_rate * params['onset']))
offset = np.ones(int(sampling_rate * params['offset']))
else:
onset = np.zeros(int(sampling_rate * params['onset']))
offset = np.zeros(int(sampling_rate * params['offset']))

total_length = round(duration + params['onset'] + params['offset'], 10)
return np.hstack((onset, pulse, offset)), np.linspace(0, total_length, total_length * sampling_rate)
return np.hstack((onset, pulse, offset)), np.linspace(0, total_length, int(total_length * sampling_rate))


def multi_noise_pulse(sampling_rate, global_onset, global_offset, params_list):
@@ -371,5 +469,3 @@ def multi_noise_pulse(sampling_rate, global_onset, global_offset, params_list):
# plt.xlim((-0.1, 2.1))
# plt.ylim((-0.1, 1.1))
# plt.show()


39 changes: 34 additions & 5 deletions PulseInterface.py
Original file line number Diff line number Diff line change
@@ -2,11 +2,15 @@
import numpy as np


def make_pulse(sampling_rate, global_onset, global_offset, params_list):
def make_pulse(sampling_rate, global_onset, global_offset, params_list, *,invert_chan_list=[]):
longest_t = []
pulses = list()

for params in params_list:
for param_index, params in enumerate(params_list):
if param_index in invert_chan_list:
params['inversion'] = True
else:
params['inversion'] = False
if params['type'] == 'Simple':
this_pulse, t = PulseGeneration.simple_pulse(sampling_rate, params)
elif params['type'] == 'Noise':
@@ -19,18 +23,43 @@ def make_pulse(sampling_rate, global_onset, global_offset, params_list):
this_pulse, t = PulseGeneration.plume_pulse(sampling_rate, params)
elif params['type'] == 'ContCorr':
this_pulse, t = PulseGeneration.spec_time_pulse(sampling_rate, params)
elif params['type'] == 'Anti Plume':
this_pulse, t = PulseGeneration.anti_plume_pulse(sampling_rate, params)
elif params['type'] == 'Binary':
this_pulse, t = PulseGeneration.binary_pulse(sampling_rate, params)
else:
raise ValueError

pulses.append(this_pulse)
if len(t) > len(longest_t):
longest_t = t

# pulse_matrix = []
# print(invert_chan_list)
# for pulse_index, this_pulse in enumerate(pulses):
# if pulse_index in invert_chan_list:
# full_pulse = np.ones(len(longest_t)+ int((global_onset + global_offset) * sampling_rate))
# else:
# full_pulse = np.zeros(len(longest_t)+ int((global_onset + global_offset) * sampling_rate))
# pulse_matrix.append(full_pulse)
# pulse_matrix = np.array(pulse_matrix)

pulse_matrix = np.zeros((len(pulses), len(longest_t) + int((global_onset + global_offset) * sampling_rate)))

# invert_vect = np.ones(len(pulses))
# offset_vect = np.zeros(len(pulses))
# for i in invert_chan_list:
# if i < len(invert_vect):
# invert_vect[i] = -1
# offset_vect[i] = 1
pulse_matrix = []
for pulse_index, pulse in enumerate(pulses):
if pulse_index in invert_chan_list:
pulse_matrix.append(np.ones(len(longest_t) + int((global_onset + global_offset) * sampling_rate)))
else:
pulse_matrix.append(np.zeros(len(longest_t) + int((global_onset + global_offset) * sampling_rate)))
pulse_matrix = np.array(pulse_matrix)
for p, pulse in enumerate(pulses):
pulse_matrix[p][int(global_onset * sampling_rate):int(global_onset * sampling_rate)+len(pulse)] = pulse

# pulse_matrix = pulse_matrix * invert_vect[:, np.newaxis] + offset_vect[:, np.newaxis]
t = np.linspace(0, pulse_matrix.shape[1] / sampling_rate, pulse_matrix.shape[1])

return pulse_matrix, t