Skip to content

Commit ea7c75b

Browse files
committed
Makes the serializers compatible with async I/O
1 parent f337faa commit ea7c75b

File tree

6 files changed

+164
-70
lines changed

6 files changed

+164
-70
lines changed

README.md

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -29,16 +29,15 @@ assert_eq!(reader.read_next_event()?, JsonEvent::Eof);
2929
Writer example:
3030

3131
```rust
32-
use json_event_parser::{JsonWriter, JsonEvent};
32+
use json_event_parser::{ToWriteJsonWriter, JsonEvent};
3333

34-
let mut buffer = Vec::new();
35-
let mut writer = JsonWriter::from_writer(&mut buffer);
34+
let mut writer = ToWriteJsonWriter::new(Vec::new());
3635
writer.write_event(JsonEvent::StartObject)?;
3736
writer.write_event(JsonEvent::ObjectKey("foo".into()))?;
3837
writer.write_event(JsonEvent::Number("1".into()))?;
3938
writer.write_event(JsonEvent::EndObject)?;
4039

41-
assert_eq!(buffer.as_slice(), b"{\"foo\":1}");
40+
assert_eq!(writer.finish()?.as_slice(), b"{\"foo\":1}");
4241
# std::io::Result::Ok(())
4342
```
4443

fuzz/fuzz_targets/parse.rs

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
#![no_main]
22

33
use json_event_parser::{
4-
JsonEvent, JsonWriter, LowLevelJsonReader, LowLevelJsonReaderResult, SyntaxError,
4+
JsonEvent, LowLevelJsonReader, LowLevelJsonReaderResult, SyntaxError, ToWriteJsonWriter,
55
};
66
use libfuzzer_sys::fuzz_target;
77

@@ -10,7 +10,7 @@ fn parse_chunks(chunks: &[&[u8]]) -> (String, Option<SyntaxError>) {
1010
let mut input_cursor = 0;
1111
let mut output_buffer = Vec::new();
1212
let mut reader = LowLevelJsonReader::new();
13-
let mut writer = JsonWriter::from_writer(&mut output_buffer);
13+
let mut writer = ToWriteJsonWriter::new(&mut output_buffer);
1414
for (i, chunk) in chunks.iter().enumerate() {
1515
input_buffer.extend_from_slice(chunk);
1616
loop {
@@ -21,7 +21,8 @@ fn parse_chunks(chunks: &[&[u8]]) -> (String, Option<SyntaxError>) {
2121
input_cursor += consumed_bytes;
2222
match event {
2323
Some(Ok(JsonEvent::Eof)) => {
24-
return (String::from_utf8(output_buffer).unwrap(), None)
24+
writer.finish().unwrap();
25+
return (String::from_utf8(output_buffer).unwrap(), None);
2526
}
2627
Some(Ok(event)) => writer.write_event(event).unwrap(),
2728
Some(Err(e)) => return (String::from_utf8(output_buffer).unwrap(), Some(e)),

src/lib.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,9 @@ pub use crate::read::{
1919
FromBufferJsonReader, FromReadJsonReader, LowLevelJsonReader, LowLevelJsonReaderResult,
2020
ParseError, SyntaxError, TextPosition,
2121
};
22-
pub use crate::write::JsonWriter;
22+
#[cfg(feature = "async-tokio")]
23+
pub use crate::write::ToTokioAsyncWriteJsonWriter;
24+
pub use crate::write::{LowLevelJsonWriter, ToWriteJsonWriter};
2325
use std::borrow::Cow;
2426

2527
/// Possible events during JSON parsing.

src/read.rs

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -37,9 +37,9 @@ pub struct FromReadJsonReader<R: Read> {
3737
}
3838

3939
impl<R: Read> FromReadJsonReader<R> {
40-
pub fn new(read: R) -> Self {
40+
pub const fn new(read: R) -> Self {
4141
Self {
42-
input_buffer: Vec::with_capacity(MIN_BUFFER_SIZE),
42+
input_buffer: Vec::new(),
4343
input_buffer_start: 0,
4444
input_buffer_end: 0,
4545
max_buffer_size: MAX_BUFFER_SIZE,
@@ -140,9 +140,9 @@ pub struct FromTokioAsyncReadJsonReader<R: AsyncRead + Unpin> {
140140

141141
#[cfg(feature = "async-tokio")]
142142
impl<R: AsyncRead + Unpin> FromTokioAsyncReadJsonReader<R> {
143-
pub fn new(read: R) -> Self {
143+
pub const fn new(read: R) -> Self {
144144
Self {
145-
input_buffer: Vec::with_capacity(MIN_BUFFER_SIZE),
145+
input_buffer: Vec::new(),
146146
input_buffer_start: 0,
147147
input_buffer_end: 0,
148148
max_buffer_size: MAX_BUFFER_SIZE,
@@ -234,7 +234,7 @@ pub struct FromBufferJsonReader<'a> {
234234
}
235235

236236
impl<'a> FromBufferJsonReader<'a> {
237-
pub fn new(buffer: &'a [u8]) -> Self {
237+
pub const fn new(buffer: &'a [u8]) -> Self {
238238
Self {
239239
input_buffer: buffer,
240240
parser: LowLevelJsonReader::new(),

src/write.rs

Lines changed: 144 additions & 50 deletions
Original file line numberDiff line numberDiff line change
@@ -1,71 +1,149 @@
11
use crate::JsonEvent;
22
use std::io::{Error, ErrorKind, Result, Write};
3+
#[cfg(feature = "async-tokio")]
4+
use tokio::io::{AsyncWrite, AsyncWriteExt};
35

4-
/// A JSON streaming writer.
6+
/// A JSON streaming writer writing to a [`Write`] implementation.
57
///
68
/// ```
7-
/// use json_event_parser::{JsonWriter, JsonEvent};
9+
/// use json_event_parser::{ToWriteJsonWriter, JsonEvent};
810
///
9-
/// let mut buffer = Vec::new();
10-
/// let mut writer = JsonWriter::from_writer(&mut buffer);
11+
/// let mut writer = ToWriteJsonWriter::new(Vec::new());
1112
/// writer.write_event(JsonEvent::StartObject)?;
1213
/// writer.write_event(JsonEvent::ObjectKey("foo".into()))?;
1314
/// writer.write_event(JsonEvent::Number("1".into()))?;
1415
/// writer.write_event(JsonEvent::EndObject)?;
1516
///
16-
/// assert_eq!(buffer.as_slice(), b"{\"foo\":1}");
17+
/// assert_eq!(writer.finish()?.as_slice(), b"{\"foo\":1}");
18+
/// # std::io::Result::Ok(())
19+
/// ```
20+
pub struct ToWriteJsonWriter<W: Write> {
21+
write: W,
22+
writer: LowLevelJsonWriter,
23+
}
24+
25+
impl<W: Write> ToWriteJsonWriter<W> {
26+
pub const fn new(write: W) -> Self {
27+
Self {
28+
write,
29+
writer: LowLevelJsonWriter::new(),
30+
}
31+
}
32+
33+
pub fn write_event(&mut self, event: JsonEvent<'_>) -> Result<()> {
34+
self.writer.write_event(event, &mut self.write)
35+
}
36+
37+
pub fn finish(self) -> Result<W> {
38+
self.writer.validate_eof()?;
39+
Ok(self.write)
40+
}
41+
}
42+
43+
/// A JSON streaming writer writing to an [`AsyncWrite`] implementation.
44+
///
45+
/// ```
46+
/// use json_event_parser::{ToTokioAsyncWriteJsonWriter, JsonEvent};
1747
///
48+
/// # #[tokio::main(flavor = "current_thread")]
49+
/// # async fn main() -> ::std::io::Result<()> {
50+
/// let mut writer = ToTokioAsyncWriteJsonWriter::new(Vec::new());
51+
/// writer.write_event(JsonEvent::StartObject).await?;
52+
/// writer.write_event(JsonEvent::ObjectKey("foo".into())).await?;
53+
/// writer.write_event(JsonEvent::Number("1".into())).await?;
54+
/// writer.write_event(JsonEvent::EndObject).await?;
55+
/// assert_eq!(writer.finish()?.as_slice(), b"{\"foo\":1}");
56+
/// # Ok(())
57+
/// # }
58+
/// ```
59+
#[cfg(feature = "async-tokio")]
60+
pub struct ToTokioAsyncWriteJsonWriter<W: AsyncWrite + Unpin> {
61+
write: W,
62+
writer: LowLevelJsonWriter,
63+
buffer: Vec<u8>,
64+
}
65+
66+
#[cfg(feature = "async-tokio")]
67+
impl<W: AsyncWrite + Unpin> ToTokioAsyncWriteJsonWriter<W> {
68+
pub const fn new(write: W) -> Self {
69+
Self {
70+
write,
71+
writer: LowLevelJsonWriter::new(),
72+
buffer: Vec::new(),
73+
}
74+
}
75+
76+
pub async fn write_event(&mut self, event: JsonEvent<'_>) -> Result<()> {
77+
self.writer.write_event(event, &mut self.buffer)?;
78+
self.write.write_all(&self.buffer).await?;
79+
self.buffer.clear();
80+
Ok(())
81+
}
82+
83+
pub fn finish(self) -> Result<W> {
84+
self.writer.validate_eof()?;
85+
Ok(self.write)
86+
}
87+
}
88+
89+
/// A low-level JSON streaming writer writing to a [`Write`] implementation.
90+
///
91+
/// YOu probably want to use [`ToWriteJsonWriter`] instead.
92+
///
93+
/// ```
94+
/// use json_event_parser::{JsonEvent, LowLevelJsonWriter};
95+
///
96+
/// let mut writer = LowLevelJsonWriter::new();
97+
/// let mut output = Vec::new();
98+
/// writer.write_event(JsonEvent::StartObject, &mut output)?;
99+
/// writer.write_event(JsonEvent::ObjectKey("foo".into()), &mut output)?;
100+
/// writer.write_event(JsonEvent::Number("1".into()), &mut output)?;
101+
/// writer.write_event(JsonEvent::EndObject, &mut output)?;
102+
///
103+
/// assert_eq!(output.as_slice(), b"{\"foo\":1}");
18104
/// # std::io::Result::Ok(())
19105
/// ```
20-
pub struct JsonWriter<W: Write> {
21-
writer: W,
106+
107+
#[derive(Default)]
108+
pub struct LowLevelJsonWriter {
22109
state_stack: Vec<JsonState>,
23110
element_written: bool,
24111
}
25112

26-
impl<W: Write> JsonWriter<W> {
27-
pub fn from_writer(writer: W) -> Self {
113+
impl LowLevelJsonWriter {
114+
pub const fn new() -> Self {
28115
Self {
29-
writer,
30116
state_stack: Vec::new(),
31117
element_written: false,
32118
}
33119
}
34120

35-
pub fn into_inner(self) -> W {
36-
self.writer
37-
}
38-
39-
pub fn inner(&mut self) -> &mut W {
40-
&mut self.writer
41-
}
42-
43-
pub fn write_event(&mut self, event: JsonEvent<'_>) -> Result<()> {
121+
pub fn write_event(&mut self, event: JsonEvent<'_>, mut write: impl Write) -> Result<()> {
44122
match event {
45123
JsonEvent::String(s) => {
46-
self.before_value()?;
47-
write_escaped_json_string(&s, &mut self.writer)
124+
self.before_value(&mut write)?;
125+
write_escaped_json_string(&s, write)
48126
}
49127
JsonEvent::Number(number) => {
50-
self.before_value()?;
51-
self.writer.write_all(number.as_bytes())
128+
self.before_value(&mut write)?;
129+
write.write_all(number.as_bytes())
52130
}
53131
JsonEvent::Boolean(b) => {
54-
self.before_value()?;
55-
self.writer.write_all(if b { b"true" } else { b"false" })
132+
self.before_value(&mut write)?;
133+
write.write_all(if b { b"true" } else { b"false" })
56134
}
57135
JsonEvent::Null => {
58-
self.before_value()?;
59-
self.writer.write_all(b"null")
136+
self.before_value(&mut write)?;
137+
write.write_all(b"null")
60138
}
61139
JsonEvent::StartArray => {
62-
self.before_value()?;
140+
self.before_value(&mut write)?;
63141
self.state_stack.push(JsonState::OpenArray);
64-
self.writer.write_all(b"[")
142+
write.write_all(b"[")
65143
}
66144
JsonEvent::EndArray => match self.state_stack.pop() {
67145
Some(JsonState::OpenArray) | Some(JsonState::ContinuationArray) => {
68-
self.writer.write_all(b"]")
146+
write.write_all(b"]")
69147
}
70148
Some(s) => {
71149
self.state_stack.push(s);
@@ -80,13 +158,13 @@ impl<W: Write> JsonWriter<W> {
80158
)),
81159
},
82160
JsonEvent::StartObject => {
83-
self.before_value()?;
161+
self.before_value(&mut write)?;
84162
self.state_stack.push(JsonState::OpenObject);
85-
self.writer.write_all(b"{")
163+
write.write_all(b"{")
86164
}
87165
JsonEvent::EndObject => match self.state_stack.pop() {
88166
Some(JsonState::OpenObject) | Some(JsonState::ContinuationObject) => {
89-
self.writer.write_all(b"}")
167+
write.write_all(b"}")
90168
}
91169
Some(s) => {
92170
self.state_stack.push(s);
@@ -103,7 +181,7 @@ impl<W: Write> JsonWriter<W> {
103181
JsonEvent::ObjectKey(key) => {
104182
match self.state_stack.pop() {
105183
Some(JsonState::OpenObject) => (),
106-
Some(JsonState::ContinuationObject) => self.writer.write_all(b",")?,
184+
Some(JsonState::ContinuationObject) => write.write_all(b",")?,
107185
_ => {
108186
return Err(Error::new(
109187
ErrorKind::InvalidInput,
@@ -113,8 +191,8 @@ impl<W: Write> JsonWriter<W> {
113191
}
114192
self.state_stack.push(JsonState::ContinuationObject);
115193
self.state_stack.push(JsonState::ObjectValue);
116-
write_escaped_json_string(&key, &mut self.writer)?;
117-
self.writer.write_all(b":")
194+
write_escaped_json_string(&key, &mut write)?;
195+
write.write_all(b":")
118196
}
119197
JsonEvent::Eof => Err(Error::new(
120198
ErrorKind::InvalidInput,
@@ -123,15 +201,15 @@ impl<W: Write> JsonWriter<W> {
123201
}
124202
}
125203

126-
fn before_value(&mut self) -> Result<()> {
204+
fn before_value(&mut self, mut write: impl Write) -> Result<()> {
127205
match self.state_stack.pop() {
128206
Some(JsonState::OpenArray) => {
129207
self.state_stack.push(JsonState::ContinuationArray);
130208
Ok(())
131209
}
132210
Some(JsonState::ContinuationArray) => {
133211
self.state_stack.push(JsonState::ContinuationArray);
134-
self.writer.write_all(b",")?;
212+
write.write_all(b",")?;
135213
Ok(())
136214
}
137215
Some(last_state @ JsonState::OpenObject)
@@ -156,6 +234,22 @@ impl<W: Write> JsonWriter<W> {
156234
}
157235
}
158236
}
237+
238+
fn validate_eof(&self) -> Result<()> {
239+
if !self.state_stack.is_empty() {
240+
return Err(Error::new(
241+
ErrorKind::InvalidInput,
242+
"The written JSON is not balanced: an object or an array has not been closed",
243+
));
244+
}
245+
if !self.element_written {
246+
return Err(Error::new(
247+
ErrorKind::InvalidInput,
248+
"A JSON file can't be empty",
249+
));
250+
}
251+
Ok(())
252+
}
159253
}
160254

161255
enum JsonState {
@@ -166,37 +260,37 @@ enum JsonState {
166260
ObjectValue,
167261
}
168262

169-
fn write_escaped_json_string(s: &str, sink: &mut impl Write) -> Result<()> {
170-
sink.write_all(b"\"")?;
263+
fn write_escaped_json_string(s: &str, mut write: impl Write) -> Result<()> {
264+
write.write_all(b"\"")?;
171265
let mut buffer = [b'\\', b'u', 0, 0, 0, 0];
172266
for c in s.chars() {
173267
match c {
174-
'\\' => sink.write_all(b"\\\\"),
175-
'"' => sink.write_all(b"\\\""),
268+
'\\' => write.write_all(b"\\\\"),
269+
'"' => write.write_all(b"\\\""),
176270
c => {
177271
if c < char::from(32) {
178272
match c {
179-
'\u{08}' => sink.write_all(b"\\b"),
180-
'\u{0C}' => sink.write_all(b"\\f"),
181-
'\n' => sink.write_all(b"\\n"),
182-
'\r' => sink.write_all(b"\\r"),
183-
'\t' => sink.write_all(b"\\t"),
273+
'\u{08}' => write.write_all(b"\\b"),
274+
'\u{0C}' => write.write_all(b"\\f"),
275+
'\n' => write.write_all(b"\\n"),
276+
'\r' => write.write_all(b"\\r"),
277+
'\t' => write.write_all(b"\\t"),
184278
c => {
185279
let mut c = c as u8;
186280
for i in (2..6).rev() {
187281
let ch = c % 16;
188282
buffer[i] = if ch < 10 { b'0' + ch } else { b'A' + ch - 10 };
189283
c /= 16;
190284
}
191-
sink.write_all(&buffer)
285+
write.write_all(&buffer)
192286
}
193287
}
194288
} else {
195-
sink.write_all(c.encode_utf8(&mut buffer[2..]).as_bytes())
289+
write.write_all(c.encode_utf8(&mut buffer[2..]).as_bytes())
196290
}
197291
}
198292
}?;
199293
}
200-
sink.write_all(b"\"")?;
294+
write.write_all(b"\"")?;
201295
Ok(())
202296
}

0 commit comments

Comments
 (0)