Skip to content

Commit 4cc4008

Browse files
committed
feat(pkt-meta): Add initial flow table implementation
This is the first cut for a flow table. There is more work to be done, but this is a good checkpoint. Interfaces and organization may change as we start using this with the stateful NAT code. Signed-off-by: Manish Vachharajani <[email protected]>
1 parent 4e12054 commit 4cc4008

File tree

10 files changed

+1718
-1
lines changed

10 files changed

+1718
-1
lines changed

Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

pkt-meta/Cargo.toml

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,11 +6,15 @@ publish = false
66
license = "Apache-2.0"
77

88
[features]
9+
shuttle = ["concurrency/shuttle"]
10+
std = []
911
testing = []
12+
bolero = ["dep:bolero", "net/bolero"]
1013

1114
[dependencies]
1215
ahash = { workspace = true }
1316
atomic-instant-full = { workspace = true }
17+
bolero = { workspace = true, optional = true }
1418
lpm = { workspace = true }
1519
concurrency = { workspace = true }
1620
config = { workspace = true }
@@ -27,5 +31,6 @@ tracing = { workspace = true }
2731
[dev-dependencies]
2832
bolero = { workspace = true, default-features = false }
2933
lpm = { workspace = true, features = ["testing"] }
34+
net = { workspace = true, features = ["bolero"] }
3035
tracing-test = { workspace = true, features = [] }
31-
36+
shuttle = { workspace = true }

pkt-meta/src/flow_table/README.md

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
# Flow Table
2+
3+
The current implementation of flow table uses `dash_map` and per-thread priority queue's (for timeouts) along with `Arc` and `Weak` to get a reasonable flow table with timeouts.
4+
However, it leaves a lot of room for optimizations.
5+
6+
## Flow Table Implementation
7+
8+
The main `DashMap` holds `Weak` references to all the flow entries so that the memory gets automatically deallocated when the entry times out.
9+
10+
The priority queue's hold `Arc` references to the flow entries to keep them alive when they are not in any packet meta data.
11+
When the entry times-out and is removed from the priority queue and the last packet referencing that flow is dropped, the memory for the entry is freed.
12+
13+
Note that in the current implementation, a flow is not removed from the flow table until the last Arc to the flow_info is dropped or the flow entry is replaced. This can be changed if needed, or even have it be an option on the flow as to whether timeout removes the flow or not.
14+
15+
## Optimizations
16+
17+
In the current implementation, there has to be periodic or on-timeout reaping the Weak reference in the hash table.
18+
This is better done by having a version of `DashMap` that can reap the dead `Weak` reference as it walks the table on lookups, instead of waiting for key collisions.
19+
The hope, for now, is that the entries in the hash table array will contain a small pointer and not take up too much extra memory.
20+
Those dead `Weak` pointers will prevent shrinking of the hash table though, if the implementation supports that.
21+
22+
Second, the `priority_queue` crate uses a `HashMap` inside the queue in order to allow fast removal and re-insertion.
23+
However, this wastes space and requires extra hashes.
24+
The better way to do this is to have a custom priority queue integrated with the custom weak-reaping hash map so that the same hash table can be used for both operations.
25+
This improves cache locality, reduces memory utlization, and avoids multiple hash table lookups in many cases.
26+
27+
However, in the interest of time to completion for the code, this module currently uses existing data structures instead of full custom implementations of everything.
28+
However, the interface should be able to hide a change from the current to the optimized implementation.
Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
// SPDX-License-Identifier: Apache-2.0
2+
// Copyright Open Network Fabric Authors
3+
4+
use atomic_instant_full;
5+
use std::fmt::{Debug, Formatter};
6+
use std::ops::Deref;
7+
use std::sync::atomic::Ordering;
8+
use std::time::Instant;
9+
10+
#[repr(transparent)]
11+
pub struct AtomicInstant(atomic_instant_full::AtomicInstant);
12+
13+
impl Debug for AtomicInstant {
14+
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
15+
write!(f, "({:?})", self.0.load(Ordering::Relaxed))
16+
}
17+
}
18+
19+
impl Deref for AtomicInstant {
20+
type Target = atomic_instant_full::AtomicInstant;
21+
22+
fn deref(&self) -> &Self::Target {
23+
&self.0
24+
}
25+
}
26+
27+
impl AtomicInstant {
28+
#[must_use]
29+
pub fn new(instant: Instant) -> Self {
30+
Self(atomic_instant_full::AtomicInstant::new(instant))
31+
}
32+
}
Lines changed: 180 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,180 @@
1+
// SPDX-License-Identifier: Apache-2.0
2+
// Copyright Open Network Fabric Authors
3+
4+
use std::fmt::Debug;
5+
use std::time::{Duration, Instant};
6+
7+
use concurrency::sync::RwLock;
8+
use net::packet::VpcDiscriminant;
9+
10+
use crate::flow_table::AtomicInstant;
11+
12+
use std::sync::atomic::{AtomicU8, Ordering};
13+
14+
#[derive(Debug, thiserror::Error)]
15+
pub enum FlowInfoError {
16+
#[error("flow expired")]
17+
FlowExpired(Instant),
18+
#[error("no such status")]
19+
NoSuchStatus(u8),
20+
}
21+
22+
#[repr(u8)]
23+
#[derive(Clone, Copy, Debug, PartialEq)]
24+
pub enum FlowStatus {
25+
Active = 0,
26+
Expired = 1,
27+
Removed = 2,
28+
}
29+
30+
impl TryFrom<u8> for FlowStatus {
31+
type Error = FlowInfoError;
32+
33+
fn try_from(value: u8) -> Result<Self, Self::Error> {
34+
match value {
35+
0 => Ok(FlowStatus::Active),
36+
1 => Ok(FlowStatus::Expired),
37+
2 => Ok(FlowStatus::Removed),
38+
v => Err(FlowInfoError::NoSuchStatus(v)),
39+
}
40+
}
41+
}
42+
43+
impl From<FlowStatus> for u8 {
44+
fn from(status: FlowStatus) -> Self {
45+
status as u8
46+
}
47+
}
48+
49+
pub struct AtomicFlowStatus(AtomicU8);
50+
51+
impl Debug for AtomicFlowStatus {
52+
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
53+
write!(f, "{:?}", self.load(std::sync::atomic::Ordering::Relaxed))
54+
}
55+
}
56+
57+
impl AtomicFlowStatus {
58+
/// Load the flow status.
59+
///
60+
/// # Panics
61+
///
62+
/// Panics if the the stored flow status is invalid, which should never happen.
63+
///
64+
#[must_use]
65+
pub fn load(&self, ordering: Ordering) -> FlowStatus {
66+
let value = self.0.load(ordering);
67+
FlowStatus::try_from(value).expect("Invalid enum state")
68+
}
69+
70+
pub fn store(&self, state: FlowStatus, ordering: Ordering) {
71+
self.0.store(u8::from(state), ordering);
72+
}
73+
74+
/// Atomic compare and exchange of the flow status.
75+
///
76+
/// # Errors
77+
///
78+
/// Returns previous `FlowStatus` if the compare and exchange fails.
79+
///
80+
/// # Panics
81+
///
82+
/// Panics if the the stored flow status is invalid, which should never happen.
83+
///
84+
pub fn compare_exchange(
85+
&self,
86+
current: FlowStatus,
87+
new: FlowStatus,
88+
success: Ordering,
89+
failure: Ordering,
90+
) -> Result<FlowStatus, FlowStatus> {
91+
match self
92+
.0
93+
.compare_exchange(current as u8, new as u8, success, failure)
94+
{
95+
Ok(prev) => Ok(FlowStatus::try_from(prev).expect("Invalid enum state")),
96+
Err(prev) => Err(FlowStatus::try_from(prev).expect("Invalid enum state")),
97+
}
98+
}
99+
}
100+
101+
impl From<FlowStatus> for AtomicFlowStatus {
102+
fn from(status: FlowStatus) -> Self {
103+
Self(AtomicU8::new(status as u8))
104+
}
105+
}
106+
107+
#[derive(Debug, Clone)]
108+
pub struct FlowInfoLocked {
109+
pub dst_vpcd: Option<VpcDiscriminant>,
110+
}
111+
112+
#[derive(Debug)]
113+
pub struct FlowInfo {
114+
expires_at: AtomicInstant,
115+
status: AtomicFlowStatus,
116+
pub locked: RwLock<FlowInfoLocked>,
117+
}
118+
119+
// TODO: We need a way to stuff an Arc<FlowInfo> into the packet
120+
// meta data. That means this has to move to net or we need a generic
121+
// meta data extension method.
122+
impl FlowInfo {
123+
#[must_use]
124+
pub fn new(expires_at: Instant) -> Self {
125+
Self {
126+
expires_at: AtomicInstant::new(expires_at),
127+
status: AtomicFlowStatus::from(FlowStatus::Active),
128+
locked: RwLock::new(FlowInfoLocked { dst_vpcd: None }),
129+
}
130+
}
131+
132+
pub fn expires_at(&self) -> Instant {
133+
self.expires_at.load(std::sync::atomic::Ordering::Relaxed)
134+
}
135+
136+
/// Extend the expiry of the flow if it is not expired.
137+
///
138+
/// # Errors
139+
///
140+
/// Returns `FlowInfoError::FlowExpired` if the flow is expired with the expiry `Instant`
141+
///
142+
pub fn extend_expiry(&self, duration: Duration) -> Result<(), FlowInfoError> {
143+
if self.status.load(std::sync::atomic::Ordering::Relaxed) == FlowStatus::Expired {
144+
return Err(FlowInfoError::FlowExpired(self.expires_at()));
145+
}
146+
self.extend_expiry_unchecked(duration);
147+
Ok(())
148+
}
149+
150+
/// Extend the expiry of the flow without checking if it is already expired.
151+
///
152+
/// # Thread Safety
153+
///
154+
/// This method is thread-safe.
155+
///
156+
pub fn extend_expiry_unchecked(&self, duration: Duration) {
157+
self.expires_at
158+
.fetch_add(duration, std::sync::atomic::Ordering::Relaxed);
159+
}
160+
161+
pub fn status(&self) -> FlowStatus {
162+
self.status.load(std::sync::atomic::Ordering::Relaxed)
163+
}
164+
165+
/// Update the flow status.
166+
///
167+
/// # Thread Safety
168+
///
169+
/// This method is thread-safe.
170+
///
171+
/// # Errors
172+
///
173+
/// Returns an error if the status transition is invalid.
174+
///
175+
pub fn update_status(&self, status: FlowStatus) -> Result<(), FlowInfoError> {
176+
self.status
177+
.store(status, std::sync::atomic::Ordering::Relaxed);
178+
Ok(())
179+
}
180+
}

0 commit comments

Comments
 (0)