PIDUINO
Loading...
Searching...
No Matches
threadsafebuffer.h
1/* Copyright © 2014 Chris Desjardins, http://blog.chrisd.info cjd@chrisd.info
2 * Copyright © 2018-2025 Pascal JEAN, https://github.com/epsilonrt
3 * This file is part of the Piduino Library.
4 *
5 * The Piduino Library is free software; you can redistribute it and/or
6 * modify it under the terms of the GNU Lesser General Public
7 * License as published by the Free Software Foundation; either
8 * version 3 of the License, or (at your option) any later version.
9 *
10 * The Piduino Library is distributed in the hope that it will be useful,
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 * Lesser General Public License for more details.
14 *
15 * You should have received a copy of the GNU Lesser General Public License
16 * along with the Piduino Library; if not, see <http://www.gnu.org/licenses/>.
17 */
18#pragma once
19
20#include <list>
21#include <vector>
22#include <mutex>
23#include <condition_variable>
24
25namespace Piduino {
26
33 template <class T> class ThreadSafeBuffer {
34 public:
41
43 }
44
45 virtual void write (const T& data) {
46 {
47 // create a new scope for the mutex
48 std::unique_lock<std::mutex> lock (_queueMutex);
49 writeData (data);
50 _msgNotification.notify_all();
51 }
52 }
53
54 virtual void write (const T * buf, size_t len) {
55 {
56 // create a new scope for the mutex
57 std::unique_lock<std::mutex> lock (_queueMutex);
58 writeData (buf, len);
59 _msgNotification.notify_all();
60 }
61 }
62
63 virtual void write (const std::vector<T>& dataVec) {
64 write (dataVec.data(), dataVec.size());
65 }
66
67 virtual bool read (T& data, long msTimeout = 0) {
68 bool ret = false;
69 {
70 // create a new scope for the mutex
71 std::unique_lock<std::mutex> lock (_queueMutex);
72 waitForData (lock, msTimeout);
73 ret = readData (data);
74 }
75 return ret;
76 }
77
78 // Dequeue everything
79 virtual size_t read (std::vector<T>& dataVec, long msTimeout = 0) {
80 size_t size = 0;
81 {
82 // create a new scope for the mutex
83 std::unique_lock<std::mutex> lock (_queueMutex);
84 waitForData (lock, msTimeout);
85 size = readData (dataVec);
86 }
87 return size;
88 }
89
90 // Dequeue everything
91 virtual size_t read (T * buf, size_t max, long msTimeout = 0) {
92 size_t size = 0;
93 {
94 // create a new scope for the mutex
95 std::unique_lock<std::mutex> lock (_queueMutex);
96 waitForData (lock, msTimeout);
97 size = readData (buf, max);
98 }
99 return size;
100 }
101
102 virtual bool peek (T& data, long msTimeout = 0) {
103 bool ret = false;
104 {
105 // create a new scope for the mutex
106 std::unique_lock<std::mutex> lock (_queueMutex);
107 waitForData (lock, msTimeout);
108 ret = peekData (data);
109 }
110 return ret;
111 }
112
113 // Dequeue everything
114 virtual size_t peek (std::vector<T>& dataVec, long msTimeout = 0) {
115 size_t size = 0;
116 {
117 // create a new scope for the mutex
118 std::unique_lock<std::mutex> lock (_queueMutex);
119 waitForData (lock, msTimeout);
120 size = peekData (dataVec);
121 }
122 return size;
123 }
124
125 // Dequeue everything
126 virtual size_t peek (T * buf, size_t max, long msTimeout = 0) {
127 size_t size = 0;
128 {
129 // create a new scope for the mutex
130 std::unique_lock<std::mutex> lock (_queueMutex);
131 waitForData (lock, msTimeout);
132 size = peekData (buf, max);
133 }
134 return size;
135 }
136
137 size_t size() const {
138 return _numEnqueued;
139 }
140
141 /*
142 ** This function allows you to perform operations on the
143 ** vector in a thread safe way. The functor is a function
144 ** with the following signature:
145 ** int func(std::list<T> &);
146 ** The return value is the number of elements added or removed
147 ** from the list, for example if 5 elements were removed
148 ** and 3 new elements were added then the return value should
149 ** be -2.
150 */
151 template <typename Functor> void iterate (Functor functor) {
152 {
153 // create a new scope for the mutex
154 std::unique_lock<std::mutex> lock (_queueMutex);
155 // the return value of this functor is added to the _numEnqueued
156 // so if you add buffers then return the number of buffers added
157 // or if you remove buffers then return -number of buffers removed
158 int numChanged = functor (_queue);
159 _numEnqueued += numChanged;
160 }
161 }
162
163 protected:
164
165 void waitForData (std::unique_lock<std::mutex>& lock, long msTimeout) {
166 if (msTimeout != 0) {
167 // This function assumes that _queueMutex is locked already!
168 std::chrono::system_clock::time_point timeLimit = std::chrono::system_clock::now() +
169 std::chrono::milliseconds (msTimeout);
170 while (_queue.empty() == true) {
171 // if timeout is specified, then wait until the time is up
172 // otherwise wait forever (forever is msTimeout = -1)
173 if (msTimeout > 0) {
174 _msgNotification.wait_until (lock, timeLimit);
175 if (std::chrono::system_clock::now() >= timeLimit) {
176 break;
177 }
178 }
179 else {
180 _msgNotification.wait (lock);
181 }
182 }
183 }
184 }
185
186 void writeData (const T& data) {
187 // This function assumes that _queueMutex is locked already!
188 _queue.push_back (data);
189 _numEnqueued++;
190 }
191
192 void writeData (const T * buf, size_t len) {
193 // This function assumes that _queueMutex is locked already!
194 while (len--) {
195 writeData (*buf++);
196 }
197 }
198
199 bool readData (T& data) {
200 // This function assumes that _queueMutex is locked already!
201 bool ret = false;
202 if (_queue.empty() == false) {
203 data = _queue.front();
204 _queue.pop_front();
205 _numEnqueued--;
206 ret = true;
207 }
208 return ret;
209 }
210
211 size_t readData (std::vector<T>& dataVec) {
212 // This function assumes that _queueMutex is locked already!
213 size_t size = 0;
214 T data;
215 while (readData (data) == true) {
216 size += sizeOfData (data);
217 dataVec.push_back (data);
218 }
219 return size;
220 }
221
222 size_t readData (T * buf, size_t max) {
223 // This function assumes that _queueMutex is locked already!
224 max = std::min (_numEnqueued, max);
225 size_t size = 0;
226 T data;
227 while ( (readData (data) == true) && (size <= max)) {
228 *buf++ = data;
229 size += sizeOfData (data);
230 }
231 return size;
232 }
233
234 bool peekData (T& data) {
235 // This function assumes that _queueMutex is locked already!
236 bool ret = false;
237 if (_queue.empty() == false) {
238 data = _queue.front();
239 ret = true;
240 }
241 return ret;
242 }
243
244 size_t peekData (std::vector<T>& dataVec) {
245 // This function assumes that _queueMutex is locked already!
246 size_t size = 0;
247 T data;
248 while (peekData (data) == true) {
249 size += sizeOfData (data);
250 dataVec.push_back (data);
251 }
252 return size;
253 }
254
255 size_t peekData (T * buf, size_t max) {
256 // This function assumes that _queueMutex is locked already!
257 max = std::min (_numEnqueued, max);
258 size_t size = 0;
259 T data;
260 while ( (peekData (data) == true) && (size <= max)) {
261 *buf++ = data;
262 size += sizeOfData (data);
263 }
264 return size;
265 }
266
267 virtual size_t sizeOfData (const T&) const {
268 return sizeof (T);
269 }
270
271 std::list<T> _queue;
272 std::mutex _queueMutex;
273 std::condition_variable _msgNotification;
275 };
276}
277/* ========================================================================== */
This provides a thread safe buffer.
virtual bool read(T &data, long msTimeout=0)
virtual size_t read(std::vector< T > &dataVec, long msTimeout=0)
virtual void write(const std::vector< T > &dataVec)
void iterate(Functor functor)
virtual size_t peek(std::vector< T > &dataVec, long msTimeout=0)
void waitForData(std::unique_lock< std::mutex > &lock, long msTimeout)
virtual size_t read(T *buf, size_t max, long msTimeout=0)
void writeData(const T *buf, size_t len)
size_t readData(std::vector< T > &dataVec)
virtual size_t sizeOfData(const T &) const
void writeData(const T &data)
size_t peekData(std::vector< T > &dataVec)
virtual void write(const T &data)
virtual void write(const T *buf, size_t len)
size_t peekData(T *buf, size_t max)
size_t readData(T *buf, size_t max)
std::condition_variable _msgNotification
virtual size_t peek(T *buf, size_t max, long msTimeout=0)
virtual bool peek(T &data, long msTimeout=0)
Global namespace for Piduino.
Definition board.h:28