Skip to content

Add ttc sampling and expected value to AttackGraphNode #128

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 7 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion maltoolbox/attackgraph/node.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
if TYPE_CHECKING:
from typing import Any, Optional
from . import Attacker
from ..language import LanguageGraphAttackStep, Detector
from ..language import LanguageGraphAttackStep
from ..model import ModelAsset

class AttackGraphNode:
Expand Down
2 changes: 1 addition & 1 deletion maltoolbox/file_utils.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
"""Utily functions for file handling"""
"""Utilty functions for file handling"""

import json
import yaml
Expand Down
33 changes: 31 additions & 2 deletions maltoolbox/language/languagegraph.py
Original file line number Diff line number Diff line change
Expand Up @@ -654,6 +654,8 @@ class LanguageGraph():
"""Graph representation of a MAL language"""
def __init__(self, lang: Optional[dict] = None):
self.assets: dict = {}
self.predef_ttcs = load_dict_from_yaml_file(
'maltoolbox/language/predefined_ttcs.yml')
if lang is not None:
self._lang_spec: dict = lang
self.metadata = {
Expand Down Expand Up @@ -695,6 +697,30 @@ def from_mar_archive(cls, mar_archive: str) -> LanguageGraph:
return LanguageGraph(json.loads(langspec))


def replace_if_predef_ttc(self, ttc_entry: dict) -> dict:
"""
If the TTC provided is a predefined name replace it with the
probability distribution it corresponds to. Otherwise, simply return
the TTC distribution provided as is.

Arguments:
ttc_entry - the TTC entry to check for predefined names

Returns:
If the TTC entry provided contained a predefined name the TTC
probability distrubtion corresponding to it. Otherwise, the TTC
distribution provided as a parameter as is.
"""
if ttc_entry is None:
return None

ttc = self.predef_ttcs.get(ttc_entry.get('name'))
if ttc is not None:
return ttc
else:
return ttc_entry


def _to_dict(self):
"""Converts LanguageGraph into a dict"""

Expand Down Expand Up @@ -836,11 +862,13 @@ def _from_dict(cls, serialized_graph: dict) -> LanguageGraph:
asset_dict['name']
)
for attack_step_dict in asset_dict['attack_steps'].values():
ttc = lang_graph.replace_if_predef_ttc(
attack_step_dict['ttc'])
attack_step_node = LanguageGraphAttackStep(
name = attack_step_dict['name'],
type = attack_step_dict['type'],
asset = asset,
ttc = attack_step_dict['ttc'],
ttc = ttc,
overrides = attack_step_dict['overrides'],
children = {},
parents = {},
Expand Down Expand Up @@ -1432,11 +1460,12 @@ def _generate_graph(self) -> None:
attack_step_attribs['name']
)

ttc = self.replace_if_predef_ttc(attack_step_attribs['ttc'])
attack_step_node = LanguageGraphAttackStep(
name = attack_step_attribs['name'],
type = attack_step_attribs['type'],
asset = asset,
ttc = attack_step_attribs['ttc'],
ttc = ttc,
overrides = attack_step_attribs['reaches']['overrides'] \
if attack_step_attribs['reaches'] else False,
children = {},
Expand Down
59 changes: 59 additions & 0 deletions maltoolbox/language/predefined_ttcs.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
EasyAndUncertain:
arguments:
- 0.5
name: Bernoulli
type: function
HardAndUncertain:
lhs:
arguments:
- 0.1
name: Exponential
type: function
rhs:
arguments:
- 0.5
name: Bernoulli
type: function
type: multiplication
VeryHardAndUncertain:
lhs:
arguments:
- 0.01
name: Exponential
type: function
rhs:
arguments:
- 0.5
name: Bernoulli
type: function
type: multiplication
EasyAndCertain:
arguments:
- 1
name: Exponential
type: function
HardAndCertain:
arguments:
- 0.1
name: Exponential
type: function
VeryHardAndCertain:
arguments:
- 0.01
name: Exponential
type: function
Enabled:
arguments:
- 1.0
name: Bernoulli
type: function
Instant:
arguments:
- 1.0
name: Bernoulli
type: function
Disabled:
arguments:
- 0.0
name: Bernoulli
type: function
175 changes: 175 additions & 0 deletions maltoolbox/probs_utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,175 @@
"""Utility functions for handling probabilities"""

import logging
import math
import random
from enum import Enum

logger = logging.getLogger(__name__)

class ProbCalculationMethod(Enum):
SAMPLE = 1
EXPECTED = 2

def sample_prob(probs_dict):
"""Calculate the sampled value from a probability distribution function
Arguments:
probs_dict - a dictionary containing the probability distribution
function

Return:
The float value obtained from calculating the sampled value corresponding
to the function provided.
"""
if probs_dict['type'] != 'function':
raise ValueError('Sample probability method requires a function '
f'probability distribution, but got "{probs_dict["type"]}"')

match(probs_dict['name']):
case 'Bernoulli':
value = random.random()
threshold = 1.0 - float(probs_dict['arguments'][0])
return math.inf if value < threshold else 0.0

case 'Exponential':
lambd = float(probs_dict['arguments'][0])
return random.expovariate(lambd)

case 'Binomial':
n = int(probs_dict['arguments'][0])
p = float(probs_dict['arguments'][1])
# TODO: Someone with basic probabilities competences should
# actually check if this is correct.
return random.binomialvariate(n, p)
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

My linter tells me this method does not exist in module random

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am also not the correct person to say if these sampling methods are correct :P

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

https://docs.python.org/3/library/random.html#random.binomialvariate

I think the issue is that it was introduced in python 3.12

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh alright, that would make mal-toolbox require 3.12 I guess so maybe it is worth to see if there is a different option.

Copy link
Collaborator

@andrewbwm andrewbwm Apr 7, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There isn't and something somewhere(maybe in the mal simulator) already requires 3.12. Some people trying to use it ran into it. So, I think we should just specify 3.12 as a requirement.


case 'Gamma':
alpha = float(probs_dict['arguments'][0])
beta = float(probs_dict['arguments'][1])
return random.gammavariate(alpha, beta)

case 'LogNormal':
mu = float(probs_dict['arguments'][0])
sigma = float(probs_dict['arguments'][1])
return random.lognormvariate(mu, sigma)

case 'Uniform':
a = float(probs_dict['arguments'][0])
b = float(probs_dict['arguments'][1])
return random.uniform(a, b)

case 'Pareto' | 'Truncated Normal':
raise NotImplementedError('f{probs_dict["name"]} '
'probability distribution is not currently '
'supported!')

case _:
raise ValueError('Unknown probability distribution '
f'function encountered {probs_dict["name"]}!')


def expected_prob(probs_dict):
"""Calculate the expected value from a probability distribution function
Arguments:
probs_dict - a dictionary containing the probability distribution
function

Return:
The float value obtained from calculating the expected value corresponding
to the function provided.
"""
if probs_dict['type'] != 'function':
raise ValueError('Expected value probability method requires a '
'function probability distribution, but got '
f'"{probs_dict["type"]}"')

match(probs_dict['name']):
case 'Bernoulli':
threshold = 1 - float(probs_dict['arguments'][0])
return threshold

case 'Exponential':
lambd = float(probs_dict['arguments'][0])
return 1/lambd

case 'Binomial':
n = int(probs_dict['arguments'][0])
p = float(probs_dict['arguments'][1])
# TODO: Someone with basic probabilities competences should
# actually check if this is correct.
return n * p

case 'Gamma':
alpha = float(probs_dict['arguments'][0])
beta = float(probs_dict['arguments'][1])
return alpha * beta

case 'LogNormal':
mu = float(probs_dict['arguments'][0])
sigma = float(probs_dict['arguments'][1])
return pow(math.e, (mu + (pow(sigma, 2)/2)))

case 'Uniform':
a = float(probs_dict['arguments'][0])
b = float(probs_dict['arguments'][1])
return (a + b)/2

case 'Pareto' | 'Truncated Normal':
raise NotImplementedError('f{probs_dict["name"]} '
'probability distribution is not currently '
'supported!')

case _:
raise ValueError('Unknown probability distribution '
f'function encountered {probs_dict["name"]}!')


def calculate_prob(probs_dict: dict, method: ProbCalculationMethod) -> float:
"""Calculate the value from a probability distribution
Arguments:
probs_dict - a dictionary containing the probability distribution
function
method - the method to use in calculating the probability
values(currently supporting sampled or expected values)

Return:
The float value obtained from calculating the probability distribution.

TTC Distributions in MAL:
https://mal-lang.org/mal-langspec/apidocs/org.mal_lang.langspec/org/mal_lang/langspec/ttc/TtcDistribution.html
"""
if probs_dict is None:
return math.nan

match(probs_dict['type']):
case 'addition' | 'subtraction' | 'multiplication' | \
'division' | 'exponentiation':
lv = calculate_prob(probs_dict['lhs'], method)
rv = calculate_prob(probs_dict['rhs'], method)
match(probs_dict['type']):
case 'addition':
return lv + rv
case 'subtraction':
return lv - rv
case 'multiplication':
return lv * rv
case 'division':
return lv / rv
case 'exponentiation':
return pow(lv, rv)
case _:
raise ValueError('Unknown probability distribution type '
f'encountered {probs_dict["type"]}!')

case 'function':
match(method):
case ProbCalculationMethod.SAMPLE:
return sample_prob(probs_dict)
case ProbCalculationMethod.EXPECTED:
return expected_prob(probs_dict)
case _:
raise ValueError('Unknown Probability Calculation method '
f'encountered {method}!')

case _:
raise ValueError('Unknown probability distribution type '
f'encountered {probs_dict["type"]}!')
4 changes: 4 additions & 0 deletions tests/language/test_languagegraph.py
Original file line number Diff line number Diff line change
Expand Up @@ -148,3 +148,7 @@ def test_attackstep_override():
# one another.
# def test_mallib_mal():
# LanguageGraph(MalCompiler().compile('tests/testdata/mallib_test.mal'))

def test_probability_distributions():
lg = LanguageGraph(MalCompiler().compile('tests/testdata/prob_dists.mal'))
lg.save_to_file('logs/prob_dists_lg.yml')
31 changes: 31 additions & 0 deletions tests/test_probs_utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
"""Unit tests for the probabilities utilities module"""

import pytest

from maltoolbox.model import Model
from maltoolbox.attackgraph.attackgraph import AttackGraph
from maltoolbox.probs_utils import calculate_prob, ProbCalculationMethod

def test_probs_utils(model: Model):
"""Test TTC calculation for nodes"""

app = model.add_asset('Application')
creds = model.add_asset('Credentials')
user = model.add_asset('User')
identity = model.add_asset('Identity')
vuln = model.add_asset('SoftwareVulnerability')

identity.add_associated_assets('credentials', {creds})
user.add_associated_assets('userIds', {identity})
app.add_associated_assets('highPrivAppIAMs', {identity})
app.add_associated_assets('vulnerabilities', {vuln})

attack_graph = AttackGraph(model.lang_graph, model)

for node in attack_graph.nodes.values():
#TODO: Actually check some of the results
calculate_prob(node.ttc, ProbCalculationMethod.SAMPLE)

for node in attack_graph.nodes.values():
#TODO: Actually check some of the results
calculate_prob(node.ttc, ProbCalculationMethod.EXPECTED)
16 changes: 16 additions & 0 deletions tests/testdata/prob_dists.mal
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
// vim:ft=mal
#id: "test"
#version: "0.0.0"

category Test{
asset Dummy {
| expo [Exponential(0.1)]
| berni [Bernoulli(0.5)]
| composite [Exponential(0.1) * Bernoulli(0.5)]
| compositeComplex [Bernoulli(0.5) * (Binomial(10, 0.1) + Exponential(0.2) - Gamma(0.1, 0.2) * LogNormal(5, 0.5) / Pareto(7, 0.25) ^ TruncatedNormal(6, 0.3) + Uniform(0.1, 0.9))]
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@nkakouros just as a heads up, we don't need it right now since it will not be used any time soon, but the compiler does not handle the / correctly. The expression simply terminates when it encounters it.

| compositeComplex2 [Bernoulli(0.5) * (Binomial(10, 0.1) + Exponential(0.2) - Gamma(0.1, 0.2) * LogNormal(5, 0.5) + Pareto(7, 0.25) ^ TruncatedNormal(6, 0.3) + Uniform(0.1, 0.9))]
| hardAndUncertain [HardAndUncertain]
}

}

Loading