-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathtest.py
More file actions
229 lines (192 loc) · 9.33 KB
/
test.py
File metadata and controls
229 lines (192 loc) · 9.33 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
from legacy.contracts import LegacyUserContract, Wallet, Ethereum
from user_app import store_file_in_ipfs
from beneficiary_app import read_file_from_ipfs
from util.cipher import AESCipher
from util.util import load_object, say
from secretsharing import PlaintextToHexSecretSharer
from cryptography.fernet import Fernet
from hashlib import sha256
import os, re
import config
# ---------------------------------------------------------------------------
# Fixture parameters shared by create_test_scenario() and the __main__ driver
# ---------------------------------------------------------------------------

# Name under which the test contract is saved and later reloaded.
contract_name = 'test_contract'

# Address recorded as the owner of the contract.
owner = '0xnflu0c8ml2maxpc6h30r3am2qyaeok5labd1po5f'

# Secret sharing: the secret is split into n pieces; k is passed as the
# threshold argument (the driver expects 2 of the 3 pieces to be enough).
n = 3
k = 2

# Proof-of-life value handed to the contract (units are not visible here).
t_PoL = 90

# user has 100 eth to distribute
init_balance = 100

# Accounts of the beneficiaries. The "test_" prefix is what
# purge_test_scenario() matches on when it cleans the data directory.
addresses = [
    'test_0x41k2qiianhyajppzd5avvrp12wbn42y0m2k70y5c',
    'test_0x9q6v0r516hrk7kvn0ixojefu6diyoblvj5urlo70',
    'test_0xn8b77b6nmo14ozrr00gcy0ccirlpyw2hs31a9k0f'
]

# Each beneficiary's message file is named after its address
# (this is the very same list object, not a copy).
message_files = addresses

# One plaintext message per beneficiary, in the same order as `addresses`.
secret_messages = (
    "Luke, I'm your father",
    "Get my ETH. My private key is 3a1076bf45ab87712ad64ccb3b10217737f7faacbf2872e88fdd9a537d8fe266",
    "My computer password is 3KS014Q. Do whatever you see fit with that"
)

# Share of the funds assigned to each beneficiary, same order as `addresses`.
share_of_funds = ('50%', '25%', '25%')

# The shared secret; it is lower-cased before being split and used as a key.
secret = "I've seen things you people wouldn't believe"
# params = {
# 'contract_name': 'test_contract',
# 'owner': '0xnflu0c8ml2maxpc6h30r3am2qyaeok5labd1po5f',
# 'addresses': [ # these are the accounts of the beneficiaries
# 'test_0x41k2qiianhyajppzd5avvrp12wbn42y0m2k70y5c',
# 'test_0x9q6v0r516hrk7kvn0ixojefu6diyoblvj5urlo70',
# 'test_0xn8b77b6nmo14ozrr00gcy0ccirlpyw2hs31a9k0f'
# ],
# 'message_files': addresses,
# 'n': 3,
# 'k': 2,
# 'secret_messages': (
# "Luke, I'm your father",
# "Get my ETH. My private key is 3a1076bf45ab87712ad64ccb3b10217737f7faacbf2872e88fdd9a537d8fe266",
# "My computer password is 3KS014Q. Do whatever you see fit with that"
# ),
# 'share_of_funds': ('50%', '25%', '25%'),
# 'secret': "I've seen things you people wouldn't believe",
# 't_PoL': 90,
# 'init_balance': 100 # user has 100 eth to distribute
# }
# def create_test_scenario(params):
# # retrieve parameters
# contract_name = params['contract_name']
# owner = params['owner']
# addresses = params['addresses']
# message_files = params['message_files']
# n = params['n']
# k = params['k']
# secret_messages = params['secret_messages']
# share_of_funds = params['share_of_funds']
# secret = params['secret']
# t_PoL = params['t_PoL']
# init_balance = params['init_balance']
# secret_low = secret.lower()
# secret_pieces = PlaintextToHexSecretSharer.split_secret(secret_low, k, n)
# # create encryption object based on shared secret
# aes_cipher = AESCipher(secret_low)
# beneficiaries = []
# personal_keys = []
# enc_messages = []
# doub_enc_messages = []
# for i in range(n):
# wallet_i = Wallet(addresses[i]) # creates a wallet with random address
# wallet_i.save()
# personal_keys.append(Fernet.generate_key())
# cipher_suite = Fernet(personal_keys[i])
# enc_messages.append(cipher_suite.encrypt(secret_messages[i]))
# doub_enc_messages.append(aes_cipher.encrypt(enc_messages[i]))
# message_url_i = store_file_in_ipfs(doub_enc_messages[i], message_files[i])
# funds_share_i = share_of_funds[i]
# beneficiaries.append({'wallet_address': wallet_i.address, 'message_url': message_url_i,
# 'funds_share': funds_share_i, 'secret_piece_hash': sha256(secret_pieces[i]).hexdigest()})
# user_contract = LegacyUserContract(k, n, t_PoL, init_balance, beneficiaries, owner, contract_name)
# user_contract.save(contract_name)
# contract_state = hash(user_contract)
# return contract_state, secret_pieces, enc_messages, doub_enc_messages, personal_keys, beneficiaries
def create_test_scenario():
    """Build and persist the test contract together with its beneficiaries.

    Uses the module-level fixture parameters. The lower-cased shared secret
    is split into ``n`` pieces (with ``k`` passed as the threshold) and also
    used to build the AES cipher. For each of the ``n`` beneficiaries a
    wallet is created from the configured address and saved, a fresh Fernet
    key is generated, the beneficiary's message is encrypted first with that
    key and then with the AES cipher, and the doubly encrypted message is
    stored through ``store_file_in_ipfs``. Finally the contract is created
    and saved under ``contract_name``.

    Returns:
        A 6-tuple ``(contract_state, secret_pieces, enc_messages,
        doub_enc_messages, personal_keys, beneficiaries)`` where
        ``contract_state`` is ``hash()`` of the contract object and
        ``beneficiaries`` is the list of dicts handed to the contract.
    """
    shared_secret = secret.lower()
    secret_pieces = PlaintextToHexSecretSharer.split_secret(shared_secret, k, n)

    # cipher keyed by the shared secret (outer encryption layer)
    aes_cipher = AESCipher(shared_secret)

    beneficiaries, personal_keys = [], []
    enc_messages, doub_enc_messages = [], []

    for idx in range(n):
        wallet = Wallet(addresses[idx])
        wallet.save()

        # inner layer: key known only to this beneficiary
        personal_key = Fernet.generate_key()
        personal_keys.append(personal_key)
        inner_layer = Fernet(personal_key).encrypt(secret_messages[idx])
        enc_messages.append(inner_layer)

        # outer layer: shared-secret cipher on top of the personal one
        outer_layer = aes_cipher.encrypt(inner_layer)
        doub_enc_messages.append(outer_layer)

        message_url = store_file_in_ipfs(outer_layer, message_files[idx])
        funds_share = share_of_funds[idx]

        # only the hash of the secret piece is recorded for the beneficiary
        beneficiaries.append({
            'wallet_address': wallet.address,
            'message_url': message_url,
            'funds_share': funds_share,
            'secret_piece_hash': sha256(secret_pieces[idx]).hexdigest(),
        })

    user_contract = LegacyUserContract(k, n, t_PoL, init_balance, beneficiaries, owner, contract_name)
    user_contract.save(contract_name)

    # hash taken after saving; the driver compares it with the reloaded object
    contract_state = hash(user_contract)
    return (contract_state, secret_pieces, enc_messages, doub_enc_messages,
            personal_keys, beneficiaries)
def purge_test_scenario():
    """Remove the test artifacts from the data directory.

    Deletes every regular file directly inside ``config.DATA_DIR`` whose
    name starts with ``test_``. Sub-directories and other files are left
    alone.
    """
    data_dir = config.DATA_DIR
    for entry in os.listdir(data_dir):
        if not entry.startswith("test_"):
            continue
        candidate = os.path.join(data_dir, entry)
        if os.path.isfile(candidate):
            os.remove(candidate)
def _check_double_decryption(aes_cipher, doub_enc_message, enc_message, personal_key, plain_message):
    """Undo both encryption layers of one message and report any mismatch.

    Args:
        aes_cipher: cipher built from the recovered shared secret.
        doub_enc_message: the doubly encrypted message to check.
        enc_message: expected result of the first decryption step.
        personal_key: Fernet key of the beneficiary.
        plain_message: expected result of the second decryption step.
    """
    # first decryption step
    enc_message_prime = aes_cipher.decrypt(doub_enc_message)
    if enc_message_prime != enc_message:
        say("Error, decrypted message using shared secret doesn't match original", 2)
        # single-argument print(...) behaves the same as the print statement
        print(enc_message_prime + '\n' + enc_message + '\n')

    # second decryption step
    message_prime = Fernet(personal_key).decrypt(enc_message_prime)
    if message_prime != plain_message:
        say("Error, decrypted message using personal key doesn't match the original one", 2)
        print(message_prime + '\n' + plain_message + '\n')


if __name__ == '__main__':
    # NOTE(review): whether say(..., 2) aborts the run is not visible from
    # this file; the checks below are written to be correct either way.
    try:
        (contract_state, secret_pieces, enc_messages, doub_enc_messages,
         personal_keys, beneficiaries) = create_test_scenario()

        # test object load
        user_contract = LegacyUserContract.load(contract_name)
        if contract_state != hash(user_contract):
            say("An error occurred while loading the contract object.", 2)

        # try to recover secret with 1 out of 3 secret shares (should fail).
        # The share is passed as a one-element list: handing over the bare
        # string would fail on malformed input regardless of the threshold
        # and prove nothing. Failing may mean either an exception or a
        # result that differs from the real secret.
        try:
            lone_share_secret = PlaintextToHexSecretSharer.recover_secret(secret_pieces[0:1])
        except Exception:
            lone_share_secret = None
        if lone_share_secret == secret.lower():
            say("This is weird. It shouldn't be possible to recover a secret with only one piece", 2)

        # try to recover secret with 2 out of 3 secret shares (should work)
        try:
            recov_secret = PlaintextToHexSecretSharer.recover_secret(secret_pieces[0:2])
        except Exception:
            say("Could not recover secret.", 2)
            # nothing below can run without the secret; surface the real
            # cause instead of a NameError on the next line
            raise

        # check if recovered secret matches original one
        if recov_secret != secret.lower():
            say('Error. The recovered secret does not match the original one', 2)

        # save secrets in contract and recover secret
        for i in range(n):
            user_contract.save_secret_piece(secret_pieces[i], beneficiaries[i]['wallet_address'])
        user_contract.save(contract_name)
        # drop the in-memory object so the checks below use the reloaded one
        del user_contract
        user_contract = LegacyUserContract.load(contract_name)

        # after saving, secret should have been recovered
        if recov_secret != user_contract.secret:
            say("Error. Recovered secret mismatch.", 2)

        # testing proof of life related functions
        # since we just created the contract, user should be alive
        if not user_contract.is_active():
            say("Error. User should be alive", 2)
        t0 = user_contract.PoL_limit
        user_contract.proof_of_life()
        # proof_of_life() is expected to move the limit by exactly t_PoL
        if user_contract.PoL_limit - t0 != user_contract.t_PoL:
            say("Error with proof_of_life() method", 2)

        aes_cipher = AESCipher(recov_secret)

        # decrypt messages (from array in memory)
        for i in range(n):
            _check_double_decryption(aes_cipher, doub_enc_messages[i], enc_messages[i],
                                     personal_keys[i], secret_messages[i])

        # decrypt messages (from stored files)
        # tests read_file_from_ipfs
        for i in range(n):
            doub_enc_message = read_file_from_ipfs(message_files[i])
            _check_double_decryption(aes_cipher, doub_enc_message, enc_messages[i],
                                     personal_keys[i], secret_messages[i])

        # testing method get_beneficiary in contract:
        wanted_address = 'test_0x41k2qiianhyajppzd5avvrp12wbn42y0m2k70y5c'
        b = user_contract.get_beneficiary(wanted_address)
        if b['wallet_address'] != wanted_address:
            say('Error in function get_beneficiary() of class LegacyUserContract', 2)

        say("success!", 1)
    finally:
        # clean up data directory, even when a check above aborted the run
        purge_test_scenario()