Skip to content

Commit 20e087b

Browse files
committed
pass callback argument by reference
1 parent 05a0cd2 commit 20e087b

40 files changed

+495
-847
lines changed

commons/zenoh-codec/benches/codec.rs

+24-96
Original file line numberDiff line numberDiff line change
@@ -80,22 +80,10 @@ fn criterion_benchmark(c: &mut Criterion) {
8080
ext_qos: zenoh_protocol::transport::frame::ext::QoSType::DEFAULT,
8181
};
8282

83-
let data = Push {
84-
wire_expr: WireExpr::empty(),
85-
ext_qos: ext::QoSType::DEFAULT,
86-
ext_tstamp: None,
87-
ext_nodeid: ext::NodeIdType::DEFAULT,
88-
payload: PushBody::Put(Put {
89-
timestamp: None,
90-
encoding: Encoding::empty(),
91-
ext_sinfo: None,
92-
#[cfg(feature = "shared-memory")]
93-
ext_shm: None,
94-
ext_attachment: None,
95-
ext_unknown: vec![],
96-
payload: ZBuf::from(vec![0u8; 8]),
97-
}),
98-
};
83+
let data = Push::from(Put {
84+
payload: ZBuf::from(vec![0u8; 8]),
85+
..Put::default()
86+
});
9987

10088
// Calculate the number of messages
10189
// let mut writer = buff.writer();
@@ -126,22 +114,10 @@ fn criterion_benchmark(c: &mut Criterion) {
126114
ext_qos: zenoh_protocol::transport::frame::ext::QoSType::DEFAULT,
127115
};
128116

129-
let data = Push {
130-
wire_expr: WireExpr::empty(),
131-
ext_qos: ext::QoSType::DEFAULT,
132-
ext_tstamp: None,
133-
ext_nodeid: ext::NodeIdType::DEFAULT,
134-
payload: PushBody::Put(Put {
135-
timestamp: None,
136-
encoding: Encoding::empty(),
137-
ext_sinfo: None,
138-
#[cfg(feature = "shared-memory")]
139-
ext_shm: None,
140-
ext_attachment: None,
141-
ext_unknown: vec![],
142-
payload: ZBuf::from(vec![0u8; 8]),
143-
}),
144-
};
117+
let data = Push::from(Put {
118+
payload: ZBuf::from(vec![0u8; 8]),
119+
..Put::default()
120+
});
145121

146122
let mut writer = buff.writer();
147123
codec.write(&mut writer, &frame).unwrap();
@@ -167,22 +143,10 @@ fn criterion_benchmark(c: &mut Criterion) {
167143
ext_qos: zenoh_protocol::transport::frame::ext::QoSType::DEFAULT,
168144
};
169145

170-
let data = Push {
171-
wire_expr: WireExpr::empty(),
172-
ext_qos: ext::QoSType::DEFAULT,
173-
ext_tstamp: None,
174-
ext_nodeid: ext::NodeIdType::DEFAULT,
175-
payload: PushBody::Put(Put {
176-
timestamp: None,
177-
encoding: Encoding::empty(),
178-
ext_sinfo: None,
179-
#[cfg(feature = "shared-memory")]
180-
ext_shm: None,
181-
ext_attachment: None,
182-
ext_unknown: vec![],
183-
payload: ZBuf::from(vec![0u8; 8]),
184-
}),
185-
};
146+
let data = Push::from(Put {
147+
payload: ZBuf::from(vec![0u8; 8]),
148+
..Put::default()
149+
});
186150

187151
let mut writer = buff.writer();
188152
codec.write(&mut writer, &frame).unwrap();
@@ -208,22 +172,10 @@ fn criterion_benchmark(c: &mut Criterion) {
208172
let mut buff = ZBuf::empty();
209173
let codec = Zenoh080::new();
210174

211-
let data = Push {
212-
wire_expr: WireExpr::empty(),
213-
ext_qos: ext::QoSType::DEFAULT,
214-
ext_tstamp: None,
215-
ext_nodeid: ext::NodeIdType::DEFAULT,
216-
payload: PushBody::Put(Put {
217-
timestamp: None,
218-
encoding: Encoding::empty(),
219-
ext_sinfo: None,
220-
#[cfg(feature = "shared-memory")]
221-
ext_shm: None,
222-
ext_attachment: None,
223-
ext_unknown: vec![],
224-
payload: ZBuf::from(vec![0u8; 1_000_000]),
225-
}),
226-
};
175+
let data = Push::from(Put {
176+
payload: ZBuf::from(vec![0u8; 1_000_000]),
177+
..Put::default()
178+
});
227179

228180
c.bench_function("Fragmentation ZBuf Write", |b| {
229181
b.iter(|| {
@@ -236,22 +188,10 @@ fn criterion_benchmark(c: &mut Criterion) {
236188
let mut buff = vec![];
237189
let codec = Zenoh080::new();
238190

239-
let data = Push {
240-
wire_expr: WireExpr::empty(),
241-
ext_qos: ext::QoSType::DEFAULT,
242-
ext_tstamp: None,
243-
ext_nodeid: ext::NodeIdType::DEFAULT,
244-
payload: PushBody::Put(Put {
245-
timestamp: None,
246-
encoding: Encoding::empty(),
247-
ext_sinfo: None,
248-
#[cfg(feature = "shared-memory")]
249-
ext_shm: None,
250-
ext_attachment: None,
251-
ext_unknown: vec![],
252-
payload: ZBuf::from(vec![0u8; 1_000_000]),
253-
}),
254-
};
191+
let data = Push::from(Put {
192+
payload: ZBuf::from(vec![0u8; 1_000_000]),
193+
..Put::default()
194+
});
255195

256196
let mut writer = buff.writer();
257197
codec.write(&mut writer, &data).unwrap();
@@ -275,22 +215,10 @@ fn criterion_benchmark(c: &mut Criterion) {
275215
let mut buff = vec![];
276216
let codec = Zenoh080::new();
277217

278-
let data = Push {
279-
wire_expr: WireExpr::empty(),
280-
ext_qos: ext::QoSType::DEFAULT,
281-
ext_tstamp: None,
282-
ext_nodeid: ext::NodeIdType::DEFAULT,
283-
payload: PushBody::Put(Put {
284-
timestamp: None,
285-
encoding: Encoding::empty(),
286-
ext_sinfo: None,
287-
#[cfg(feature = "shared-memory")]
288-
ext_shm: None,
289-
ext_attachment: None,
290-
ext_unknown: vec![],
291-
payload: ZBuf::from(vec![0u8; 1_000_000]),
292-
}),
293-
};
218+
let data = Push::from(Put {
219+
payload: ZBuf::from(vec![0u8; 1_000_000]),
220+
..Put::default()
221+
});
294222

295223
let mut writer = buff.writer();
296224
codec.write(&mut writer, &data).unwrap();

commons/zenoh-protocol/src/network/push.rs

+38-1
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,10 @@
1111
// Contributors:
1212
// ZettaScale Zenoh Team, <[email protected]>
1313
//
14-
use crate::{core::WireExpr, zenoh::PushBody};
14+
use crate::{
15+
core::WireExpr,
16+
zenoh::{Del, PushBody, Put},
17+
};
1518

1619
pub mod flag {
1720
pub const N: u8 = 1 << 5; // 0x20 Named if N==1 then the key expr has name/suffix
@@ -84,3 +87,37 @@ impl Push {
8487
}
8588
}
8689
}
90+
91+
impl From<PushBody> for Push {
92+
fn from(value: PushBody) -> Self {
93+
Self {
94+
wire_expr: WireExpr::empty(),
95+
ext_qos: ext::QoSType::DEFAULT,
96+
ext_tstamp: None,
97+
ext_nodeid: ext::NodeIdType::DEFAULT,
98+
payload: value,
99+
}
100+
}
101+
}
102+
103+
impl From<Put> for Push {
104+
fn from(value: Put) -> Self {
105+
PushBody::from(value).into()
106+
}
107+
}
108+
109+
impl From<Del> for Push {
110+
fn from(value: Del) -> Self {
111+
PushBody::from(value).into()
112+
}
113+
}
114+
115+
#[cfg(test)]
116+
impl From<Vec<u8>> for Push {
117+
fn from(value: Vec<u8>) -> Self {
118+
Self::from(Put {
119+
payload: value.into(),
120+
..Put::default()
121+
})
122+
}
123+
}

commons/zenoh-protocol/src/zenoh/del.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,7 @@ pub mod flag {
4040
pub const Z: u8 = 1 << 7; // 0x80 Extensions if Z==1 then an extension will follow
4141
}
4242

43-
#[derive(Debug, Clone, PartialEq, Eq)]
43+
#[derive(Debug, Default, Clone, PartialEq, Eq)]
4444
pub struct Del {
4545
pub timestamp: Option<Timestamp>,
4646
pub ext_sinfo: Option<ext::SourceInfoType>,

commons/zenoh-protocol/src/zenoh/put.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,7 @@ pub mod flag {
4545
pub const Z: u8 = 1 << 7; // 0x80 Extensions if Z==1 then an extension will follow
4646
}
4747

48-
#[derive(Debug, Clone, PartialEq, Eq)]
48+
#[derive(Debug, Default, Clone, PartialEq, Eq)]
4949
pub struct Put {
5050
pub timestamp: Option<Timestamp>,
5151
pub encoding: Encoding,

io/zenoh-transport/src/common/batch.rs

+3-15
Original file line numberDiff line numberDiff line change
@@ -607,23 +607,11 @@ mod tests {
607607
let mut batch = WBatch::new(config);
608608

609609
let tmsg: TransportMessage = KeepAlive.into();
610-
let mut nmsg: NetworkMessage = Push {
610+
let mut nmsg = NetworkMessage::from(Push {
611611
wire_expr: WireExpr::empty(),
612612
ext_qos: ext::QoSType::new(Priority::DEFAULT, CongestionControl::Block, false),
613-
ext_tstamp: None,
614-
ext_nodeid: ext::NodeIdType::DEFAULT,
615-
payload: PushBody::Put(Put {
616-
timestamp: None,
617-
encoding: Encoding::empty(),
618-
ext_sinfo: None,
619-
#[cfg(feature = "shared-memory")]
620-
ext_shm: None,
621-
ext_attachment: None,
622-
ext_unknown: vec![],
623-
payload: ZBuf::from(vec![0u8; 8]),
624-
}),
625-
}
626-
.into();
613+
..Push::from(vec![0u8; 8])
614+
});
627615

628616
let mut tmsgs_in = vec![];
629617
let mut nmsgs_in = vec![];

io/zenoh-transport/src/common/pipeline.rs

+12-63
Original file line numberDiff line numberDiff line change
@@ -1047,25 +1047,12 @@ mod tests {
10471047
fn schedule(queue: TransmissionPipelineProducer, num_msg: usize, payload_size: usize) {
10481048
// Send reliable messages
10491049
let key = "test".into();
1050-
let payload = ZBuf::from(vec![0_u8; payload_size]);
10511050

1052-
let message: NetworkMessage = Push {
1051+
let message = NetworkMessage::from(Push {
10531052
wire_expr: key,
10541053
ext_qos: ext::QoSType::new(Priority::Control, CongestionControl::Block, false),
1055-
ext_tstamp: None,
1056-
ext_nodeid: ext::NodeIdType::DEFAULT,
1057-
payload: PushBody::Put(Put {
1058-
timestamp: None,
1059-
encoding: Encoding::empty(),
1060-
ext_sinfo: None,
1061-
#[cfg(feature = "shared-memory")]
1062-
ext_shm: None,
1063-
ext_attachment: None,
1064-
ext_unknown: vec![],
1065-
payload,
1066-
}),
1067-
}
1068-
.into();
1054+
..Push::from(vec![0_u8; payload_size])
1055+
});
10691056

10701057
println!(
10711058
"Pipeline Flow [>>>]: Sending {num_msg} messages with payload size of {payload_size} bytes"
@@ -1175,25 +1162,12 @@ mod tests {
11751162

11761163
// Send reliable messages
11771164
let key = "test".into();
1178-
let payload = ZBuf::from(vec![0_u8; payload_size]);
11791165

1180-
let message: NetworkMessage = Push {
1166+
let message = NetworkMessage::from(Push {
11811167
wire_expr: key,
11821168
ext_qos: ext::QoSType::new(Priority::Control, CongestionControl::Block, false),
1183-
ext_tstamp: None,
1184-
ext_nodeid: ext::NodeIdType::DEFAULT,
1185-
payload: PushBody::Put(Put {
1186-
timestamp: None,
1187-
encoding: Encoding::empty(),
1188-
ext_sinfo: None,
1189-
#[cfg(feature = "shared-memory")]
1190-
ext_shm: None,
1191-
ext_attachment: None,
1192-
ext_unknown: vec![],
1193-
payload,
1194-
}),
1195-
}
1196-
.into();
1169+
..Push::from(vec![0_u8; payload_size])
1170+
});
11971171

11981172
// The last push should block since there shouldn't any more batches
11991173
// available for serialization.
@@ -1289,29 +1263,16 @@ mod tests {
12891263

12901264
// Send reliable messages
12911265
let key = "pipeline/thr".into();
1292-
let payload = ZBuf::from(vec![0_u8; *size]);
12931266

1294-
let message: NetworkMessage = Push {
1267+
let message = NetworkMessage::from(Push {
12951268
wire_expr: key,
12961269
ext_qos: ext::QoSType::new(
12971270
Priority::Control,
12981271
CongestionControl::Block,
12991272
false,
13001273
),
1301-
ext_tstamp: None,
1302-
ext_nodeid: ext::NodeIdType::DEFAULT,
1303-
payload: PushBody::Put(Put {
1304-
timestamp: None,
1305-
encoding: Encoding::empty(),
1306-
ext_sinfo: None,
1307-
#[cfg(feature = "shared-memory")]
1308-
ext_shm: None,
1309-
ext_attachment: None,
1310-
ext_unknown: vec![],
1311-
payload,
1312-
}),
1313-
}
1314-
.into();
1274+
..Push::from(vec![0_u8; *size])
1275+
});
13151276

13161277
let duration = Duration::from_millis(5_500);
13171278
let start = Instant::now();
@@ -1354,23 +1315,11 @@ mod tests {
13541315
// Drop consumer to close the pipeline
13551316
drop(consumer);
13561317

1357-
let message: NetworkMessage = Push {
1318+
let message = NetworkMessage::from(Push {
13581319
wire_expr: "test".into(),
13591320
ext_qos: ext::QoSType::new(Priority::Control, CongestionControl::Block, true),
1360-
ext_tstamp: None,
1361-
ext_nodeid: ext::NodeIdType::DEFAULT,
1362-
payload: PushBody::Put(Put {
1363-
timestamp: None,
1364-
encoding: Encoding::empty(),
1365-
ext_sinfo: None,
1366-
#[cfg(feature = "shared-memory")]
1367-
ext_shm: None,
1368-
ext_attachment: None,
1369-
ext_unknown: vec![],
1370-
payload: vec![42u8].into(),
1371-
}),
1372-
}
1373-
.into();
1321+
..Push::from(vec![42u8])
1322+
});
13741323
// First message should not be rejected as the is one batch available in the queue
13751324
assert!(producer.push_network_message(message.as_ref()).is_ok());
13761325
// Second message should be rejected

0 commit comments

Comments
 (0)