forked from ravinet/mahimahi
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathbytestream_queue.cc
91 lines (71 loc) · 2.73 KB
/
bytestream_queue.cc
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
/* -*-mode:c++; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */
#include <cassert>
#include <cstring>
#include "bytestream_queue.hh"
using namespace std;
/* Build a queue backed by a buffer constructed as buffer_( size, 0 ),
   with both cursors starting at offset zero.

   space_available and non_empty are callables that report whether
   available_to_push() / available_to_pop() is currently nonzero. They
   capture `this`, so they refer to this particular object. */
ByteStreamQueue::ByteStreamQueue( const size_t size )
    : buffer_( size, 0 ),
      next_byte_to_push( 0 ),
      next_byte_to_pop( 0 ),
      space_available( [this] { return available_to_push() != 0; } ),
      non_empty( [this] { return available_to_pop() != 0; } )
{
    /* available_to_push() always holds one slot back, so a buffer of
       fewer than two bytes could never accept any data */
    assert( size > 1 );
}
/* Number of bytes stored in the queue and not yet popped. */
size_t ByteStreamQueue::available_to_pop( void ) const
{
    /* when the pop cursor is ahead of the push cursor this difference
       wraps around; adding the buffer size below brings it back */
    const size_t gap = next_byte_to_push - next_byte_to_pop;

    if ( next_byte_to_pop > next_byte_to_push ) {
        /* stored data wraps past the end of the buffer */
        return gap + buffer_.size();
    }

    return gap;
}
/* Number of bytes that can still be pushed. One slot is always held
   back (the "- 1"), so the result is zero when only that slot is left. */
size_t ByteStreamQueue::available_to_push( void ) const
{
    /* when the pop cursor is not ahead of the push cursor this value
       wraps around; adding the buffer size below brings it back */
    const size_t gap = next_byte_to_pop - next_byte_to_push - 1;

    if ( next_byte_to_pop > next_byte_to_push ) {
        /* free space is the single run between push and pop cursors */
        return gap;
    }

    return gap + buffer_.size();
}
/* Perform one read from fd and append the result to the queue.

   The request passed to fd.read() is limited to the free space that is
   contiguous from the push cursor up to the end of the buffer, so one
   call never wraps around; call again to fill space at the front.

   Precondition: space_available() is true (asserted).

   Returns Result::EndOfFile if fd.read() yields an empty string (the
   queue is left untouched), Result::Success otherwise. */
ByteStreamQueue::Result ByteStreamQueue::push( FileDescriptor & fd )
{
    /* will need to handle case if it's possible that more than
       one action might push to this queue */
    assert( space_available() );

    /* clamp the request so that it stops at the end of the buffer */
    size_t contiguous_space_to_push = available_to_push();
    if ( next_byte_to_push + contiguous_space_to_push >= buffer_.size() ) {
        contiguous_space_to_push = buffer_.size() - next_byte_to_push;
    }

    assert( contiguous_space_to_push > 0 );

    /* read from the fd */
    const string new_chunk = fd.read( contiguous_space_to_push );
    if ( new_chunk.empty() ) {
        return Result::EndOfFile;
    }

    /* the chunk must fit between the push cursor and the buffer's end
       (presumably fd.read() never returns more than requested -- this
       assert is the only check of that in this file) */
    assert( new_chunk.size() <= buffer_.size() - next_byte_to_push );
    std::memcpy( &buffer_.at( next_byte_to_push ), new_chunk.data(), new_chunk.size() );

    /* advance the cursor, wrapping to the front on reaching the end */
    next_byte_to_push += new_chunk.size();
    assert( next_byte_to_push <= buffer_.size() );
    if ( next_byte_to_push == buffer_.size() ) {
        next_byte_to_push = 0;
    }

    assert( non_empty() );

    return Result::Success;
}
/* Offer fd the stored bytes that are contiguous from the pop cursor
   (up to the end of the buffer at most) through a single write_some()
   call, then move the pop cursor to the iterator write_some() returns.

   Precondition: non_empty() is true (asserted). */
void ByteStreamQueue::pop( FileDescriptor & fd )
{
    /* will need to handle case if it's possible that more than
       one action might pop from this queue */
    assert( non_empty() );

    /* stop at the end of the buffer rather than wrapping around */
    const size_t stored = available_to_pop();
    const size_t run_length = ( next_byte_to_pop + stored < buffer_.size() )
        ? stored
        : buffer_.size() - next_byte_to_pop;

    decltype(buffer_)::const_iterator run_cursor = buffer_.begin() + next_byte_to_pop;
    auto run_end = run_cursor + run_length;
    assert( run_end >= run_cursor );

    /* write_some() hands back the position the pop cursor resumes from */
    run_cursor = fd.write_some( run_cursor, run_end );
    next_byte_to_pop = run_cursor - buffer_.begin();

    /* wrap to the front once the cursor reaches the end of the buffer */
    assert( next_byte_to_pop <= buffer_.size() );
    if ( next_byte_to_pop == buffer_.size() ) {
        next_byte_to_pop = 0;
    }
}
/* True exactly when r is ByteStreamQueue::Result::EndOfFile. */
bool eof( const ByteStreamQueue::Result & r )
{
    const bool hit_end_of_file = ( r == ByteStreamQueue::Result::EndOfFile );
    return hit_end_of_file;
}