SCIRun  5.0
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
Mailbox.h
Go to the documentation of this file.
1 /*
2  For more information, please see: http://software.sci.utah.edu
3 
4  The MIT License
5 
6  Copyright (c) 2009 Scientific Computing and Imaging Institute,
7  University of Utah.
8 
9 
10  Permission is hereby granted, free of charge, to any person obtaining a
11  copy of this software and associated documentation files (the "Software"),
12  to deal in the Software without restriction, including without limitation
13  the rights to use, copy, modify, merge, publish, distribute, sublicense,
14  and/or sell copies of the Software, and to permit persons to whom the
15  Software is furnished to do so, subject to the following conditions:
16 
17  The above copyright notice and this permission notice shall be included
18  in all copies or substantial portions of the Software.
19 
20  THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
21  OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
22  FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL
23  THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
24  LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
25  FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
26  DEALINGS IN THE SOFTWARE.
27 */
28 
29 
30 
31 ///
32 ///@file Mailbox.h
33 ///@brief Threadsafe FIFO
34 ///
35 ///@author Steve Parker
36 /// Department of Computer Science
37 /// University of Utah
38 ///@date June 1997
39 ///
40 
41 #ifndef Core_Thread_Mailbox_h
42 #define Core_Thread_Mailbox_h
43 
44 #include <Core/Thread/Legacy/Legacy/ConditionVariable.h>
45 #include <Core/Thread/Legacy/Legacy/Mutex.h>
46 #include <Core/Thread/Legacy/Legacy/Semaphore.h>
47 #include <Core/Thread/Legacy/Legacy/Thread.h>
48 
49 #include <vector>
50 
51 
52 namespace SCIRun {
53 /**************************************
54 
55 @class
56  Mailbox
57 
58  KEYWORDS
59  Thread, FIFO
60 
61 @details
62  A thread-safe, fixed-length FIFO queue which allows multiple
63  concurrent senders and receivers. Multiple threads send <b>Item</b>s
64  to the mailbox, and multiple threads may receive <b>Item</b>s from the
65  mailbox. Items are typically pointers to a message structure.
66 
67 ****************************************/
68 template<class Item> class Mailbox {
69 public:
70  //////////
71  /// Create a mailbox with a maximum queue size of <i>size</i>
72  /// items. If size is zero, then the mailbox will use
73  /// <i>rendezvous semantics</i>, where a sender will block
74  /// until a receiver is waiting for the item. The item will
75  /// be handed off synchronously. <i>name</i> should be a
76  /// static string which describes the primitive for debugging
77  /// purposes.
78  Mailbox(const char* name, int size);
79 
80  //////////
81  /// Destroy the mailbox. All items in the queue are silently
82  /// dropped.
83  ~Mailbox();
84 
85  //////////
86  /// Puts <i>msg</i> in the queue. If the queue is full, the
87  /// thread will be blocked until there is room in the queue.
88  /// Messages from the same thread will be placed in the
89  /// queue in a first-in/first-out order. Multiple threads may
90  /// call <i>send</i> concurrently, and the messages will be
91  /// placed in the queue in an arbitrary order.
92  void send(const Item& msg);
93 
94  //////////
95  /// Attempt to send <i>msg</i> to the queue. If the queue is
96  /// full, the thread will not be blocked, and <i>trySend</i>
97  /// will return false. Otherwise, <i>trySend</i> will return
98  /// true. With a size of zero it also returns false when no receiver is
99  /// waiting, so it may never succeed if the receiver only uses <i>tryReceive</i>.
100  bool trySend(const Item& msg);
101 
102 
103  //////////
104  /// Send <i>msg</i> to the queue only if the queue is empty or the
105  /// <i>checker</i> function returns true when called with the most
106  /// recently queued item and <i>msg</i>. This is useful if we want to
107  /// make certain that at least one of a particular message (like a viewer
108  /// resize redraw) gets put on the mailbox without spamming it. This
109  /// blocks like <i>send</i> does. It returns true if <i>msg</i> was added
110  /// to the queue and false if it wasn't.
111  bool sendIfNotSentLast(const Item& msg,
112  bool (*checker)(const Item &a, const Item &b));
113 
114  //////////
115  /// Receive an item from the queue. If the queue is empty,
116  /// the thread will block until another thread sends an item.
117  /// Multiple threads may call <i>receive</i> concurrently, but
118  /// no guarantee is made as to which thread will receive the
119  /// next token. However, implementors should give preference
120  /// to the thread that has been waiting the longest.
121  Item receive();
122 
123  //////////
124  /// Attempt to receive <i>item</i> from the mailbox. If the
125  /// queue is empty, the thread is not blocked and <i>tryReceive</i>
126  /// will return false. Otherwise, <i>tryReceive</i> returns true.
127  bool tryReceive(Item& item);
128 
129  //////////
130  /// Return the maximum size of the mailbox queue, as given in the
131  /// constructor.
132  int size() const;
133 
134  //////////
135  /// Return the number of items currently in the queue.
136  int numItems() const;
137 
138 private:
139  const char* name_; ///< debugging name, passed to Thread::couldBlock
140  Mutex mutex_; ///< guards every field below
141  std::vector<Item> ring_buffer_; ///< storage; one slot when the size is zero
142  int head_; ///< index of the oldest queued item
143  int len_; ///< number of items currently queued
144  int max_; ///< size given to the constructor; 0 selects rendezvous mode
145  ConditionVariable empty_; ///< receivers wait here while len_ is 0
146  ConditionVariable full_; ///< senders wait here while the buffer is full
147  Semaphore rendezvous_; ///< used only when max_ is 0: senders call down(), receivers call up()
148  int send_wait_; ///< number of senders currently waiting on full_
149  int recv_wait_; ///< number of receivers currently waiting on empty_
150  inline int ringNext(int inc); ///< buffer index inc slots past head_, wrapped at max_
151 
152  // Copying is disallowed: both operations are declared private.
153  Mailbox(const Mailbox<Item>&);
154  Mailbox<Item> operator=(const Mailbox<Item>&);
155 };
156 
157 template<class Item> inline
158 int
159 Mailbox<Item>::ringNext(int inc) // buffer index `inc` slots past head_, wrapped at max_
160 {
161  return max_==0?0:((head_+inc)%max_); // max_==0 (rendezvous mode) has a single slot at index 0; the guard also avoids % 0
162 }
163 
164 template<class Item>
165 Mailbox<Item>::Mailbox(const char* name, int size)
166  : name_(name), mutex_("Mailbox lock"), ring_buffer_(size==0?1:size), // a size of 0 still gets one slot for the hand-off
167  empty_("Mailbox empty condition"), full_("Mailbox full condition"),
168  rendezvous_("Mailbox rendezvous semaphore", 0)
169 {
170  head_=0; // queue starts empty, with the oldest-item index at slot 0
171  len_=0;
172  send_wait_=0; // no senders or receivers are waiting yet
173  recv_wait_=0;
174  max_=size; // kept as given; 0 selects rendezvous mode in send/receive
175 }
176 
177 template<class Item>
178 Mailbox<Item>::~Mailbox()
179 {
180  mutex_.tryLock(); // result is ignored. NOTE(review): if another thread holds the lock, the unlock below releases a lock this thread did not take - confirm intent
181  mutex_.unlock();
182 
183  // Waiting threads are NOT released here: both broadcasts below are disabled.
184  // empty_.conditionBroadcast();
185  // full_.conditionBroadcast();
186 }
187 
188 template<class Item>
189 void
190 Mailbox<Item>::send(const Item& msg)
191 {
192  int s=Thread::couldBlock(name_); // token handed back to couldBlockDone on exit
193  mutex_.lock();
194  // See if the message buffer is full...
195  int rmax=max_==0?1:max_; // rendezvous mode (max_==0) uses a single hand-off slot
196  while(len_ == rmax){ // loop: re-check the condition after every wake-up
197  send_wait_++;
198  full_.wait(mutex_);
199  send_wait_--;
200  }
201  ring_buffer_[ringNext(len_)]=msg; // first free slot after the queued items
202  len_++;
203  if(recv_wait_) // only signal when a receiver is actually waiting
204  empty_.conditionSignal();
205  mutex_.unlock();
206  if(max_==0)
207  rendezvous_.down(); // rendezvous mode: wait for the receiver's rendezvous_.up()
208  Thread::couldBlockDone(s);
209 }
210 
211 template<class Item>
212 bool
213 Mailbox<Item>::trySend(const Item& msg)
214 {
215  mutex_.lock();
216  // See if the message buffer is full...
217  int rmax=max_==0?1:max_; // rendezvous mode (max_==0) uses a single hand-off slot
218  if(len_ == rmax){
219  mutex_.unlock();
220  return false;
221  }
222  if(max_ == 0 && recv_wait_==0){
223  // No receivers waiting, so rendezvous will fail. Return now.
224  mutex_.unlock();
225  return false;
226  }
227 
228  ring_buffer_[ringNext(len_)]=msg; // first free slot after the queued items
229  len_++;
230  if(recv_wait_) // only signal when a receiver is actually waiting
231  empty_.conditionSignal();
232  mutex_.unlock();
233  if(max_==0)
234  rendezvous_.down(); // Won't block for long, since a receiver
235  // will wake us up
236  return true;
237 }
238 
239 
240 template<class Item>
241 bool
242 Mailbox<Item>::sendIfNotSentLast(const Item& msg,
243  bool (*checker)(const Item &a, const Item &b))
244 {
245  int s = Thread::couldBlock(name_); // token handed back to couldBlockDone on every exit path
246  mutex_.lock();
247  // See if the message buffer is full...
248  int rmax=max_==0?1:max_; // rendezvous mode (max_==0) uses a single hand-off slot
249  while(len_ == rmax){ // loop: re-check the condition after every wake-up
250  send_wait_++;
251  full_.wait(mutex_);
252  send_wait_--;
253  }
254  if (len_ < 1 || checker(ring_buffer_[ringNext(len_-1)], msg)) // queue empty, or checker(most recent item, msg) is true
255  {
256  ring_buffer_[ringNext(len_)]=msg;
257  len_++;
258  if(recv_wait_) // only signal when a receiver is actually waiting
259  empty_.conditionSignal();
260  mutex_.unlock();
261  if(max_==0)
262  rendezvous_.down(); // rendezvous mode: wait for the receiver's rendezvous_.up()
263  Thread::couldBlockDone(s);
264  return true;
265  }
266  else
267  {
268  mutex_.unlock(); // msg is dropped; the queue is left unchanged
269  Thread::couldBlockDone(s);
270  return false;
271  }
272 }
273 
274 
275 template<class Item>
276 Item
277 Mailbox<Item>::receive()
278 {
279  int s=Thread::couldBlock(name_); // token handed back to couldBlockDone on exit
280  mutex_.lock();
281  while(len_ == 0){ // loop: re-check the condition after every wake-up
282  recv_wait_++;
283  empty_.wait(mutex_);
284  recv_wait_--;
285  }
286  Item val=ring_buffer_[head_]; // oldest queued item
287  ring_buffer_[head_] = 0; //if it is a handle clear it. (requires Item to be assignable from 0)
288  head_=ringNext(1);
289  len_--;
290  if(send_wait_) // only signal when a sender is actually waiting
291  full_.conditionSignal();
292  mutex_.unlock();
293  if(max_==0)
294  rendezvous_.up(); // rendezvous mode: pairs with the sender's rendezvous_.down()
295  Thread::couldBlockDone(s);
296  return val;
297 }
298 
299 template<class Item>
300 bool
301 Mailbox<Item>::tryReceive(Item& item)
302 {
303  mutex_.lock();
304  if(len_ == 0){ // nothing queued: return at once instead of waiting
305  mutex_.unlock();
306  return false;
307  }
308  item=ring_buffer_[head_]; // oldest queued item
309  ring_buffer_[head_] = 0; //if it is a handle clear it. (requires Item to be assignable from 0)
310  head_=ringNext(1);
311  len_--;
312  if(send_wait_) // only signal when a sender is actually waiting
313  full_.conditionSignal();
314  mutex_.unlock();
315  if(max_==0)
316  rendezvous_.up(); // rendezvous mode: pairs with the sender's rendezvous_.down()
317  return true;
318 }
319 
320 template<class Item>
321 int
322 Mailbox<Item>::size() const // maximum queue size as passed to the constructor (0 = rendezvous mode)
323 {
324  return max_;
325 }
326 
327 template<class Item>
328 int
329 Mailbox<Item>::numItems() const // number of items currently queued
330 {
331  return len_; // read without taking mutex_, so the value may be stale by the time the caller uses it
332 }
333 
334 } // End namespace SCIRun
335 #endif
336 
337 
#define msg(m)
Definition: PiecewiseInterp.h:55
Item receive()
Definition: Mailbox.h:277
int numItems() const
Return the number of items currently in the queue.
Definition: Mailbox.h:329
bool sendIfNotSentLast(const Item &msg, bool(*checker)(const Item &a, const Item &b))
Definition: Mailbox.h:242
bool tryReceive(Item &item)
Definition: Mailbox.h:301
Definition: ConditionVariable.h:80
Definition: Mutex.h:65
static void couldBlockDone(int restore)
~Mailbox()
Definition: Mailbox.h:178
const char * name[]
Definition: BoostGraphExampleTests.cc:87
int size() const
Definition: Mailbox.h:322
Definition: Mailbox.h:68
Definition: Semaphore.h:67
bool trySend(const Item &msg)
Definition: Mailbox.h:213
Mailbox(const char *name, int size)
Definition: Mailbox.h:165
static int couldBlock(const char *why)
void send(const Item &msg)
Definition: Mailbox.h:190
int size
Definition: eabLatVolData.py:2