@@ -63,8 +63,15 @@ void ShmQueueMeta::Initialize(size_t max_block_num, size_t max_buf_size) {
63
63
read_block_id_ = 0 ;
64
64
alloc_offset_ = 0 ;
65
65
released_offset_ = 0 ;
66
- sem_init (&alloc_lock_, 1 , 1 );
67
- sem_init (&release_lock_, 1 , 1 );
66
+ pthread_mutexattr_t mutex_attr;
67
+ pthread_mutexattr_init (&mutex_attr);
68
+ pthread_mutexattr_setpshared (&mutex_attr, PTHREAD_PROCESS_SHARED);
69
+ pthread_mutex_init (&mutex_, &mutex_attr);
70
+ pthread_condattr_t cond_attr;
71
+ pthread_condattr_init (&cond_attr);
72
+ pthread_condattr_setpshared (&cond_attr, PTHREAD_PROCESS_SHARED);
73
+ pthread_cond_init (&alloc_cond_, &cond_attr);
74
+ pthread_cond_init (&release_cond_, &cond_attr);
68
75
for (size_t i = 0 ; i < max_block_num_; i++) {
69
76
GetBlockMeta (i).Initialize ();
70
77
}
@@ -74,16 +81,17 @@ void ShmQueueMeta::Finalize() {
74
81
for (size_t i = 0 ; i < max_block_num_; i++) {
75
82
GetBlockMeta (i).Finalize ();
76
83
}
77
- sem_destroy (&alloc_lock_);
78
- sem_destroy (&release_lock_);
84
+ pthread_mutex_destroy (&mutex_);
85
+ pthread_cond_destroy (&alloc_cond_);
86
+ pthread_cond_destroy (&release_cond_);
79
87
}
80
88
81
89
size_t ShmQueueMeta::GetBlockToWrite (size_t size,
82
90
size_t * begin_offset,
83
91
size_t * data_offset,
84
92
size_t * end_offset) {
85
- sem_wait (&alloc_lock_ );
86
- auto id = write_block_id_++;
93
+ pthread_mutex_lock (&mutex_ );
94
+ size_t id = write_block_id_++;
87
95
auto ring_offset = alloc_offset_ % max_buf_size_;
88
96
auto tail_frag_size = max_buf_size_ - ring_offset;
89
97
*begin_offset = alloc_offset_;
@@ -94,29 +102,78 @@ size_t ShmQueueMeta::GetBlockToWrite(size_t size,
94
102
alloc_offset_ += size;
95
103
*end_offset = alloc_offset_;
96
104
Check (*end_offset - *begin_offset < max_buf_size_, " message is too large!" );
97
- sem_post (&alloc_lock_);
105
+ pthread_mutex_unlock (&mutex_);
106
+
107
+ // Notify one reader thread
108
+ pthread_cond_signal (&alloc_cond_);
109
+
110
+ // Wait until no conflict
111
+ pthread_mutex_lock (&mutex_);
112
+ auto condition = [this , id, end_offset] {
113
+ return (id < read_block_id_ + max_block_num_) &&
114
+ (*end_offset < released_offset_ + max_buf_size_);
115
+ };
116
+ while (!condition ()) {
117
+ pthread_cond_wait (&release_cond_, &mutex_);
118
+ }
119
+ pthread_mutex_unlock (&mutex_);
120
+
98
121
return id;
99
122
}
100
123
101
- size_t ShmQueueMeta::GetBlockToRead () {
102
- return __sync_fetch_and_add (&read_block_id_, 1 );
124
+ size_t ShmQueueMeta::GetBlockToRead (uint32_t timeout_ms) {
125
+ auto condition = [this ] {
126
+ if (read_block_id_ >= write_block_id_) {
127
+ return false ;
128
+ }
129
+ return true ;
130
+ };
131
+ pthread_mutex_lock (&mutex_);
132
+ if (timeout_ms == 0 ) {
133
+ while (!condition ()) {
134
+ pthread_cond_wait (&alloc_cond_, &mutex_);
135
+ }
136
+ } else {
137
+ struct timespec until {};
138
+ clock_gettime (CLOCK_REALTIME, &until);
139
+ until.tv_sec += timeout_ms / 1000 ;
140
+ until.tv_nsec += (timeout_ms % 1000 ) * 1000000 ;
141
+ while (!condition ()) {
142
+ int ret = pthread_cond_timedwait (&alloc_cond_, &mutex_, &until);
143
+ if (ret == ETIMEDOUT) {
144
+ throw QueueTimeoutError ();
145
+ }
146
+ }
147
+ }
148
+ auto id = read_block_id_++;
149
+ pthread_mutex_unlock (&mutex_);
150
+
151
+ // Notify all waiting writer thread
152
+ pthread_cond_broadcast (&release_cond_);
153
+
154
+ return id;
103
155
}
104
156
105
157
void ShmQueueMeta::ReleaseBlock (size_t id) {
106
- sem_wait (&release_lock_ );
158
+ pthread_mutex_lock (&mutex_ );
107
159
GetBlockMeta (id).release = true ;
160
+ bool release_some = false ;
108
161
while (id < read_block_id_) {
109
162
auto & block = GetBlockMeta (id);
110
163
if (block.release && block.begin == released_offset_) {
111
164
released_offset_ = block.end ;
112
165
block.release = false ;
113
166
block.NotifyToWrite ();
167
+ release_some = true ;
114
168
} else {
115
169
break ;
116
170
}
117
171
id++;
118
172
}
119
- sem_post (&release_lock_);
173
+ pthread_mutex_unlock (&mutex_);
174
+ if (release_some) {
175
+ pthread_cond_broadcast (&release_cond_);
176
+ }
120
177
}
121
178
122
179
ShmQueueMeta::BlockMeta& ShmQueueMeta::GetBlockMeta (size_t id) {
@@ -192,13 +249,8 @@ void ShmQueue::Enqueue(const void* data, size_t size) {
192
249
193
250
void ShmQueue::Enqueue (size_t size, WriteFunc func) {
194
251
size_t begin_offset, data_offset, end_offset;
195
- auto block_id = meta_->GetBlockToWrite (
196
- size, &begin_offset, &data_offset, &end_offset);
197
- // Check for ring buffer conflicts.
198
- while (block_id >= meta_->read_block_id_ + max_block_num_ ||
199
- end_offset >= meta_->released_offset_ + max_buf_size_) {
200
- std::this_thread::sleep_for (std::chrono::milliseconds (1 ));
201
- }
252
+ auto block_id =
253
+ meta_->GetBlockToWrite (size, &begin_offset, &data_offset, &end_offset);
202
254
203
255
auto & block = meta_->GetBlockMeta (block_id);
204
256
block.WaitForWriting ();
@@ -213,19 +265,8 @@ void ShmQueue::Enqueue(size_t size, WriteFunc func) {
213
265
block.NotifyToRead ();
214
266
}
215
267
216
- ShmData ShmQueue::Dequeue (unsigned int timeout_ms) {
217
- auto timeout_duration = std::chrono::milliseconds (timeout_ms);
218
- auto start_time = std::chrono::steady_clock::now ();
219
- while (meta_->read_block_id_ >= meta_->write_block_id_ ) {
220
- if (timeout_ms > 0 ) {
221
- auto elapsed_time = std::chrono::steady_clock::now () - start_time;
222
- if (elapsed_time > timeout_duration) {
223
- throw QueueTimeoutError ();
224
- }
225
- }
226
- std::this_thread::sleep_for (std::chrono::milliseconds (1 ));
227
- }
228
- auto block_id = meta_->GetBlockToRead ();
268
+ ShmData ShmQueue::Dequeue (uint32_t timeout_ms) {
269
+ auto block_id = meta_->GetBlockToRead (timeout_ms);
229
270
230
271
auto & block = meta_->GetBlockMeta (block_id);
231
272
block.WaitForReading ();
0 commit comments