-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathmain.py
More file actions
621 lines (505 loc) · 23.7 KB
/
Copy pathmain.py
File metadata and controls
621 lines (505 loc) · 23.7 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
from datetime import datetime
from typing import List, Optional, Dict, Any, Tuple
import logging
import json
import os
import random
import itertools
import time
import multiprocessing as mp
from concurrent.futures import ProcessPoolExecutor
import argparse
from utils import *
# Setup logging
logfile = os.path.join("logs", datetime.now().strftime("%Y-%m-%d-%H-%M-%S") + "_main_results.log")
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
handlers=[
logging.FileHandler(logfile),
logging.StreamHandler()
]
)
logger = logging.getLogger(__name__)
# Method mapping to match the utils.py functions
METHOD_MAPPING = {
"GiveErrorSummary": "give_error_summary",
"InductionAnnotation": "induction_annotation",
"InsertUnsafeCode": "insert_unsafe_code",
"FunctionClassRenamer": "rename_function_class_name",
"VariableRenamer": "rename_variable_name",
"StringObfuscator": "string_obfuscation",
"TryExceptWrapper": "try_except_wrapper",
"StopInference": "stop_inference"
}
LOCAL_MODEL_LIST = {
"deepseek-coder:6.7B",
"codegemma:7b-instruct",
"codellama:7b"
}
class Individual:
def __init__(self, sample: Dict, genes: List[str], obfuscated_code: str = ""):
self.sample = sample
self.genes = genes # List of method names in order
self.obfuscated_code = obfuscated_code
self.deobfuscated_code = {} # Dict mapping model_name to deobfuscated_code
self.quantification_scores = {}
self.adversarial_score = 0.0
def get_individual_info(self):
return {
"problem_id": self.sample.get('problem_id', 'unknown'),
"submission_id": self.sample.get('submission_id', 'unknown'),
"genes": self.genes,
}
def loss_function(quantification_scores: Dict) -> float:
score = 0.0
score_correct = 0.0
score_correct += quantification_scores.get('semantic_scores', 0)
score_correct += quantification_scores.get('syntax_scores', 0)
cc_scores = quantification_scores.get('cc_scores', 1)
length_ratio = quantification_scores.get('length_ratio', 1)
score_simplification = cc_scores + length_ratio
# Tends to make LLM analysis wrong
score = (0.7 * score_correct + 0.3 * score_simplification) / 2 # Normalize to [0, 1]
return score
def calculate_average_scores(all_scores: List[Dict[str, Any]]) -> Dict[str, Any]:
if not all_scores:
return {}
# Initialize average scores structure
avg_scores = {
"problem_id": all_scores[0].get("problem_id", "unknown"),
"submission_id": all_scores[0].get("submission_id", "unknown"),
"syntax_scores": 0.0,
"cc_scores": 0.0,
"length_ratio": 0.0,
"semantic_scores": 0.0,
"code_lengths": {
"original": all_scores[0].get("code_lengths", {}).get("original", 0),
"obfuscated": all_scores[0].get("code_lengths", {}).get("obfuscated", 0),
"deobfuscated": 0
},
"ratios": {
"obfuscation": all_scores[0].get("ratios", {}).get("obfuscation", 1.0),
"deobfuscation": 0.0,
"recovery": 0.0
}
}
# Sum up scores for averaging
valid_count = 0
for scores in all_scores:
if not scores:
continue
valid_count += 1
# Average numeric scores
avg_scores["syntax_scores"] += scores.get("syntax_scores", 0)
avg_scores["cc_scores"] += scores.get("cc_scores", 0)
avg_scores["length_ratio"] += scores.get("length_ratio", 0)
avg_scores["semantic_scores"] += scores.get("semantic_scores", 0)
# Average code lengths and ratios
code_lengths = scores.get("code_lengths", {})
avg_scores["code_lengths"]["deobfuscated"] += code_lengths.get("deobfuscated", 0)
ratios = scores.get("ratios", {})
avg_scores["ratios"]["deobfuscation"] += ratios.get("deobfuscation", 0)
avg_scores["ratios"]["recovery"] += ratios.get("recovery", 0)
# Calculate averages
if valid_count > 0:
avg_scores["syntax_scores"] /= valid_count
avg_scores["cc_scores"] /= valid_count
avg_scores["length_ratio"] /= valid_count
avg_scores["semantic_scores"] /= valid_count
avg_scores["code_lengths"]["deobfuscated"] = int(avg_scores["code_lengths"]["deobfuscated"] / valid_count)
avg_scores["ratios"]["deobfuscation"] /= valid_count
avg_scores["ratios"]["recovery"] /= valid_count
return avg_scores
def sample_selection(population: List[Individual], selection_ratio: float = 0.15) -> List[Individual]:
sorted_population = sorted(population, key=lambda x: x.adversarial_score, reverse=True)
# Select top percentage
selection_count = max(1, int(len(population) * selection_ratio))
return sorted_population[:selection_count]
def apply_obfuscation_sequence(sample: Dict, methods: List[str]) -> str:
logger.info(f"Applying obfuscation sequence: {methods}")
current_code = sample['code']
for method in methods:
mapped_method = METHOD_MAPPING[method]
logger.debug(f"Applying method: {method} -> {mapped_method}")
obfuscated_code = transform_code(sample, current_code, mapped_method)
if obfuscated_code is None:
logger.warning(f"Method {method} failed, using previous code")
continue
current_code = obfuscated_code
logger.debug(f"Obfuscated code: {obfuscated_code}")
logger.info(
"Completed obfuscation sequence for problem %s", sample.get('problem_id', 'unknown')
)
return current_code
def create_initial_population(sample: Dict, methods: List[str]) -> List[Individual]:
logger.info("Creating initial population")
population = []
for method_combination in itertools.combinations(methods, 1):
# For each combination, generate all permutations
for method_permutation in itertools.permutations(method_combination):
# Apply obfuscation sequence
obfuscated_code = apply_obfuscation_sequence(sample, list(method_permutation))
# logger.debug(f"Obfuscated code: {obfuscated_code}")
# Create individual
individual = Individual(sample, list(method_permutation), obfuscated_code)
population.append(individual)
logger.info(f"Created individual with genes: {method_permutation}")
logger.info(f"Created population of {len(population)} individuals")
return population
def deobfuscate_code_in_local_llms(code: str) -> Dict[str, str]:
deobfuscated_codes = {}
for model in LOCAL_MODEL_LIST:
try:
logger.info("Attempting deobfuscation with model %s", model)
is_local = model in LOCAL_MODEL_LIST
deobfuscated_code = deobfuscate_code(code, is_local=is_local, model_name=model)
if deobfuscated_code:
deobfuscated_codes[model] = deobfuscated_code
logger.info(f"Deobfuscation successful with model {model}")
logger.debug(f"Deobfuscated code: {deobfuscated_code}")
else:
deobfuscated_codes[model] = ""
logger.warning(f"Deobfuscation returned empty code with model {model}")
except Exception as e:
logger.error(f"Deobfuscation with model {model} failed: {e}")
continue
if deobfuscated_codes:
return deobfuscated_codes
else:
return {}
def save_population_evaluation_results(population: List[Individual], population_id: str):
# Create timestamp and date folder
now = datetime.now()
timestamp = now.strftime("%Y-%m-%d-%H-%M-%S")
date_folder = now.strftime("%Y-%m-%d")
# Get problem_id and submission_id for directory structure
problem_id = "unknown"
submission_id = "unknown"
if population and population[0].sample:
problem_id = population[0].sample.get('problem_id', 'unknown')
submission_id = population[0].sample.get('submission_id', 'unknown')
# Create directory structure: results/evaluate_population/YYYY-MM-DD/problem_id_submission_id/
sample_folder = f"{problem_id}_{submission_id}"
eval_results_dir = os.path.join("results", "evaluate_population", date_folder, sample_folder)
os.makedirs(eval_results_dir, exist_ok=True)
# Generate filename based on population_id and timestamp
filename = f"{population_id}_{timestamp}.jsonl"
filepath = os.path.join(eval_results_dir, filename)
# Use already extracted sample info
sample_info = {
"problem_id": problem_id,
"submission_id": submission_id
}
try:
with open(filepath, 'w', encoding='utf-8') as f:
# Write population metadata
population_metadata = {
"record_type": "population_metadata",
"population_id": population_id,
"timestamp": timestamp,
"total_individuals": len(population),
"sample_info": sample_info
}
f.write(json.dumps(population_metadata, ensure_ascii=False) + '\n')
# Write individual evaluation results
for i, individual in enumerate(population):
individual_data = {
"record_type": "individual_evaluation",
"population_id": population_id,
"timestamp": timestamp,
"individual_index": i,
"genes": individual.genes,
"obfuscated_code": individual.obfuscated_code,
"adversarial_score": individual.adversarial_score,
"quantification_scores": individual.quantification_scores,
"deobfuscated_code": individual.deobfuscated_code,
"sample_info": {
"problem_id": individual.sample.get('problem_id', 'unknown') if individual.sample else 'unknown',
"submission_id": individual.sample.get('submission_id', 'unknown') if individual.sample else 'unknown'
}
}
f.write(json.dumps(individual_data, ensure_ascii=False) + '\n')
logger.info(f"Saved population evaluation results to {filepath}")
logger.info(f"Population {population_id}: {len(population)} individuals evaluated")
except Exception as e:
logger.error(f"Error saving population evaluation results to {filepath}: {e}")
raise
def evaluate_population(population: List[Individual], population_id: str = "unknown") -> None:
logger.info(f"Evaluating population of {len(population)} individuals")
for i, individual in enumerate(population):
logger.debug(f"Evaluating individual {i+1}/{len(population)}")
if individual.obfuscated_code is None:
logger.warning(f"Individual {i} has no obfuscated code, skipping")
individual.adversarial_score = 0.0
continue
# Deobfuscate the code using multiple local LLMs
logger.debug(
"Starting deobfuscation for individual %d with genes %s", i, individual.genes
)
individual.deobfuscated_code = deobfuscate_code_in_local_llms(individual.obfuscated_code)
if individual.deobfuscated_code is None or not individual.deobfuscated_code:
logger.warning(f"Deobfuscation failed for individual {i}")
individual.adversarial_score = 1.0 # High score for failed deobfuscation
continue
else:
logger.info(
"Individual %d: produced %d deobfuscated variants",
i,
len(individual.deobfuscated_code)
)
# Quantify deobfuscation effect for each deobfuscated code and calculate average
if individual.deobfuscated_code:
all_scores = []
for model_name, deobf_code in individual.deobfuscated_code.items():
try:
scores = quantification_deobfuscation(
individual.sample, individual.obfuscated_code, deobf_code, model_name=model_name
)
all_scores.append(scores)
except Exception as e:
logger.warning(f"Quantification failed for model {model_name}: {e}")
continue
if all_scores:
# Calculate average scores across all deobfuscated versions
individual.quantification_scores = calculate_average_scores(all_scores)
else:
logger.warning(f"No valid quantification scores for individual {i}")
individual.adversarial_score = 0.0
continue
else:
logger.warning(f"Individual {i} has invalid deobfuscated_code format")
individual.adversarial_score = 0.0
continue
# Calculate adversarial score
individual.adversarial_score = loss_function(individual.quantification_scores)
logger.debug(f"Individual {i} adversarial score: {individual.adversarial_score}")
logger.info(
"Individual %d evaluation complete with score %.3f and genes %s",
i,
individual.adversarial_score,
individual.genes
)
# Save population evaluation results to JSONL
save_population_evaluation_results(population, population_id)
def generate_children_from_parent_genes(parent_sample: Dict, parent_genes: List[str], donor_genes: List[str]) -> List[Individual]:
children = []
restricted_genes = {"FunctionClassRenamer", "VariableRenamer", "StringObfuscator"}
for gene in donor_genes:
# Skip if gene is restricted and already exists in parent
if gene in restricted_genes and gene in parent_genes:
logger.debug(f"Skipping restricted gene {gene} as it already exists in parent genes")
continue
# Apply the gene to create a child
try:
child_code = apply_obfuscation_sequence(parent_sample, [gene])
child_genes = parent_genes + [gene]
child = Individual(parent_sample, child_genes, child_code)
children.append(child)
logger.debug(f"Created child with genes: {child_genes}")
except Exception as e:
logger.warning(f"Failed to create child with gene {gene}: {e}")
continue
return children
def crossover(base_parent: Individual, donor_parents: List[Individual]) -> List[Individual]:
logger.debug(
"Starting crossover for base parent with genes: %s",
base_parent.genes,
)
donor_genes = []
for donor in donor_parents:
if donor is base_parent:
continue
donor_genes.extend(donor.genes)
unique_donor_genes = []
seen_genes = set()
for gene in donor_genes:
if gene in seen_genes:
continue
seen_genes.add(gene)
unique_donor_genes.append(gene)
if not unique_donor_genes:
logger.debug(
"No unique donor genes available for base parent with genes: %s",
base_parent.genes,
)
return []
children = generate_children_from_parent_genes(
base_parent.sample,
base_parent.genes,
unique_donor_genes,
)
logger.debug(
"Base parent with genes %s generated %d children using %d unique donor genes",
base_parent.genes,
len(children),
len(unique_donor_genes),
)
return children
def genetic_algorithm(sample: Dict, methods: List[str], generations: int = 2) -> Individual:
logger.info(f"Starting genetic algorithm with {generations} generations")
# Step 1: Create initial population
population = create_initial_population(sample, methods)
original_population_size = 42 # Maintain original population size 42
# Step 2: Evaluate initial population
evaluate_population(population, "initial")
# Evolutionary loop
for generation in range(generations):
logger.info(f"Generation {generation + 1}/{generations}")
# Step 3: Select top 15% for reproduction
if generation == 0 and len(population) <= 20:
selection_ratio = 1.0
else:
selection_ratio = 0.15 # Select top 15% in subsequent generations
selected = sample_selection(population, selection_ratio=selection_ratio)
logger.info(f"Selected {len(selected)} individuals for crossover")
if len(selected) > 1:
aggregated_gene_count = len({gene for parent in selected for gene in parent.genes})
logger.info(
f"Will perform crossover per parent using {aggregated_gene_count} unique donor genes in total"
)
else:
logger.info("Only one parent selected; crossover will be skipped")
# Step 4: Create new generation through crossover
new_population = []
# Generate offspring using aggregated donor genes via crossover
for idx, base_parent in enumerate(selected):
children = crossover(base_parent, selected)
if not children:
logger.debug(f"Parent {idx} produced no children during crossover")
continue
new_population.extend(children)
logger.debug(
"Parent %d generated %d children during crossover",
idx,
len(children),
)
# Step 5: Evaluate new population
evaluate_population(new_population, f"generation_{generation + 1}")
# Step 6: Truncate population by adversarial score to maintain population size
# original_population_size = len(population)
new_population.sort(key=lambda x: x.adversarial_score, reverse=True)
population = new_population[:original_population_size]
logger.info(f"Generated {len(new_population)} children, kept top {len(population)} for next generation")
if population:
logger.info(f"Best adversarial score in generation: {population[0].adversarial_score}")
logger.info(f"Worst adversarial score in generation: {population[-1].adversarial_score}")
else:
logger.warning("Population exhausted after crossover evaluation")
break
# Step 7: Return best individual and the first 10 individuals for analysis
best_individual = sorted(population, key=lambda x: x.adversarial_score, reverse=True)[0]
save_best_individuals_to_results(best_individual)
logger.info(f"Best individual genes: {best_individual.genes}")
return best_individual
def process_sample_chunk(samples: List[Dict]):
process_id = mp.current_process().name
logger.info(f"Process {process_id} starting to process {len(samples)} samples")
for i, sample in enumerate(samples):
sample_progress = f"({i+1}/{len(samples)})"
problem_id = sample.get('problem_id', 'unknown')
logger.info(f"Process {process_id} processing sample {sample_progress}: {problem_id}")
try:
run_genetic_algorithm_on_sample(sample)
except Exception as e:
logger.error(f"Process {process_id} failed on sample {problem_id}: {e}")
continue
logger.info(f"Process {process_id} completed processing {len(samples)} samples")
def run_genetic_algorithm_on_sample(sample: Dict):
# Define methods
methods = [
"GiveErrorSummary",
"InductionAnnotation",
"InsertUnsafeCode",
"FunctionClassRenamer",
"VariableRenamer",
"StringObfuscator",
"TryExceptWrapper",
"StopInference"
]
logger.info(
"Selected sample with Problem ID %s, submission %s",
sample.get('problem_id', 'unknown'),
sample.get('submission_id', 'unknown')
)
try:
genetic_algorithm(sample, methods, generations=2)
except Exception as e:
logger.error(f"Error in genetic algorithm: {e}")
raise
def run_parallel_processing(dataset: List[Dict], start_idx: int = 0, end_idx: int = None, sample_count: int = None, num_processes: int = 1):
# Determine the sampling range
if end_idx is not None:
# Use explicit start and end indices
selected_samples = dataset[start_idx:end_idx]
logger.info(f"Processing samples from index {start_idx} to {end_idx-1} ({len(selected_samples)} samples)")
elif sample_count is not None:
# Use start_idx + sample_count
end_idx = min(start_idx + sample_count, len(dataset))
selected_samples = dataset[start_idx:end_idx]
logger.info(f"Processing {len(selected_samples)} samples from index {start_idx} to {end_idx-1}")
else:
# Process all samples from start_idx to end
selected_samples = dataset[start_idx:]
logger.info(f"Processing {len(selected_samples)} samples from index {start_idx} to end")
# Split into chunks for parallel processing
chunk_size = len(selected_samples) // num_processes
chunks = []
for i in range(num_processes):
start_idx = i * chunk_size
if i == num_processes - 1: # Last chunk gets remaining samples
end_idx = len(selected_samples)
else:
end_idx = (i + 1) * chunk_size
chunk = selected_samples[start_idx:end_idx]
chunks.append(chunk)
logger.info(f"Chunk {i+1}: {len(chunk)} samples (indices {start_idx}-{end_idx-1})")
# Process chunks in parallel
logger.info(f"Starting parallel processing with {num_processes} processes")
with ProcessPoolExecutor(max_workers=num_processes) as executor:
futures = [executor.submit(process_sample_chunk, chunk) for chunk in chunks]
# Track progress as processes complete
completed_processes = 0
total_processes = len(futures)
for i, future in enumerate(futures):
try:
future.result()
completed_processes += 1
logger.info(f"Process {i+1} completed successfully ({completed_processes}/{total_processes})")
except Exception as e:
completed_processes += 1
logger.error(f"Process {i+1} failed with error: {e}")
logger.info("All parallel processes completed")
def main():
# Parse command line arguments
parser = argparse.ArgumentParser(description='Run genetic algorithm for code obfuscation')
parser.add_argument('--start-idx', type=int, default=0, help='Starting index for sampling (default: 0)')
parser.add_argument('--end-idx', type=int, default=100, help='Ending index for sampling (default: 100)')
parser.add_argument('--sample-count', type=int, default=100, help='Number of samples to process from start-idx (default: 100)')
parser.add_argument('--processes', type=int, default=1, help='Number of parallel processes (default: 1)')
parser.add_argument('--dataset-path', type=str, default="datasets/raw_datasets.jsonl", help='Path to the dataset file (default: datasets/raw_datasets.jsonl)')
args = parser.parse_args()
logger.info("Starting genetic algorithm for code obfuscation")
logger.info(f"Parameters: start_idx={args.start_idx}, end_idx={args.end_idx}, sample_count={args.sample_count}, processes={args.processes}")
# Load dataset
dataset_path = args.dataset_path
logger.info("Loading dataset from %s", dataset_path)
with open(dataset_path, 'r') as f:
dataset = [json.loads(line) for line in f]
logger.info(f"Loaded dataset with {len(dataset)} samples")
# Run parallel processing on samples with parsed arguments
run_parallel_processing(
dataset,
start_idx=args.start_idx,
end_idx=args.end_idx,
sample_count=args.sample_count,
num_processes=args.processes
)
if __name__ == "__main__":
# Set multiprocessing start method
mp.set_start_method('spawn', force=True)
start_time = time.time()
main()
end_time = time.time()
time_used = (end_time - start_time) / 60
logger.info(f"Total time used: {time_used:.2f} minutes")