SHOGUN  4.1.0
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Modules Pages
ParseBuffer.h
Go to the documentation of this file.
1 /*
2  * This program is free software; you can redistribute it and/or modify
3  * it under the terms of the GNU General Public License as published by
4  * the Free Software Foundation; either version 3 of the License, or
5  * (at your option) any later version.
6  *
7  * Written (W) 2011 Shashwat Lal Das
8  * Copyright (C) 2011 Berlin Institute of Technology and Max-Planck-Society
9  */
10 #ifndef __PARSEBUFFER_H__
11 #define __PARSEBUFFER_H__
12 
13 #include <shogun/lib/config.h>
14 
15 #include <shogun/lib/common.h>
16 #ifdef HAVE_PTHREAD
17 
18 #include <shogun/lib/DataType.h>
19 #include <pthread.h>
20 
21 namespace shogun
22 {
23 
/** Usage state of one slot in the example ring of a CParseBuffer. */
enum E_IS_EXAMPLE_USED
{
    /** state given to every slot by the CParseBuffer constructor */
    E_EMPTY = 1,

    /** slot has been written (set by write_example) and not yet finalized */
    E_NOT_USED = 2,

    /** slot has been released (set by finalize_example) */
    E_USED = 3
};
32 
42 template <class T>
43 class Example
44 {
45 public:
47  float64_t label;
49  T* fv;
50  index_t length;
51 };
52 
69 template <class T> class CParseBuffer: public CSGObject
70 {
71 public:
77  CParseBuffer(int32_t size = 1024);
78 
83  ~CParseBuffer();
84 
91  Example<T>* get_free_example()
92  {
93  pthread_mutex_lock(write_lock);
94  pthread_mutex_lock(&ex_in_use_mutex[ex_write_index]);
95  while (ex_used[ex_write_index] == E_NOT_USED)
96  pthread_cond_wait(&ex_in_use_cond[ex_write_index], &ex_in_use_mutex[ex_write_index]);
97  Example<T>* ex=&ex_ring[ex_write_index];
98  pthread_mutex_unlock(&ex_in_use_mutex[ex_write_index]);
99  pthread_mutex_unlock(write_lock);
100 
101  return ex;
102  }
103 
112  int32_t write_example(Example<T>* ex);
113 
119  Example<T>* return_example_to_read();
120 
126  Example<T>* get_unused_example();
127 
136  int32_t copy_example(Example<T>* ex);
137 
145  void finalize_example(bool free_after_release);
146 
156  void set_free_vectors_on_destruct(bool destroy)
157  {
158  free_vectors_on_destruct = destroy;
159  }
160 
165  bool get_free_vectors_on_destruct()
166  {
167  return free_vectors_on_destruct;
168  }
169 
175  virtual const char* get_name() const { return "ParseBuffer"; }
176 
177 protected:
182  virtual void inc_read_index()
183  {
184  ex_read_index=(ex_read_index + 1) % ring_size;
185  }
186 
191  virtual void inc_write_index()
192  {
193  ex_write_index=(ex_write_index + 1) % ring_size;
194  }
195 
196 protected:
197 
199  int32_t ring_size;
201  Example<T>* ex_ring;
202 
204  E_IS_EXAMPLE_USED* ex_used;
206  pthread_mutex_t* ex_in_use_mutex;
208  pthread_cond_t* ex_in_use_cond;
210  pthread_mutex_t* read_lock;
212  pthread_mutex_t* write_lock;
213 
215  int32_t ex_write_index;
217  int32_t ex_read_index;
218 
220  bool free_vectors_on_destruct;
221 };
222 
223 template <class T> CParseBuffer<T>::CParseBuffer(int32_t size)
224 {
225  ring_size = size;
226  ex_ring = SG_CALLOC(Example<T>, ring_size);
227  ex_used = SG_MALLOC(E_IS_EXAMPLE_USED, ring_size);
228  ex_in_use_mutex = SG_MALLOC(pthread_mutex_t, ring_size);
229  ex_in_use_cond = SG_MALLOC(pthread_cond_t, ring_size);
230  read_lock = SG_MALLOC(pthread_mutex_t, 1);
231  write_lock = SG_MALLOC(pthread_mutex_t, 1);
232 
233  SG_SINFO("Initialized with ring size: %d.\n", ring_size)
234 
235  ex_write_index = 0;
236  ex_read_index = 0;
237 
238  for (int32_t i=0; i<ring_size; i++)
239  {
240  ex_used[i] = E_EMPTY;
241 
242  /* this closes a memory leak, seems to have no bad consequences,
243  * but I am not completely sure due to lack of any tests */
244  //ex_ring[i].fv = SG_MALLOC(T, 1);
245  //ex_ring[i].length = 1;
246  ex_ring[i].label = FLT_MAX;
247 
248  pthread_cond_init(&ex_in_use_cond[i], NULL);
249  pthread_mutex_init(&ex_in_use_mutex[i], NULL);
250  }
251  pthread_mutex_init(read_lock, NULL);
252  pthread_mutex_init(write_lock, NULL);
253 
254  free_vectors_on_destruct = true;
255 }
256 
257 template <class T> CParseBuffer<T>::~CParseBuffer()
258 {
259  for (int32_t i=0; i<ring_size; i++)
260  {
261  if (ex_ring[i].fv != NULL && free_vectors_on_destruct)
262  {
263  SG_DEBUG("%s::~%s(): destroying examples ring vector %d at %p\n",
264  get_name(), get_name(), i, ex_ring[i].fv);
265  SG_FREE(ex_ring[i].fv);
266  }
267  pthread_mutex_destroy(&ex_in_use_mutex[i]);
268  pthread_cond_destroy(&ex_in_use_cond[i]);
269  }
270  SG_FREE(ex_ring);
271  SG_FREE(ex_used);
272  SG_FREE(ex_in_use_mutex);
273  SG_FREE(ex_in_use_cond);
274 
275  SG_FREE(read_lock);
276  SG_FREE(write_lock);
277 }
278 
279 template <class T>
280 int32_t CParseBuffer<T>::write_example(Example<T> *ex)
281 {
282  ex_ring[ex_write_index].label = ex->label;
283  ex_ring[ex_write_index].fv = ex->fv;
284  ex_ring[ex_write_index].length = ex->length;
285  ex_used[ex_write_index] = E_NOT_USED;
286  inc_write_index();
287 
288  return 1;
289 }
290 
291 template <class T>
292 Example<T>* CParseBuffer<T>::return_example_to_read()
293 {
294  if (ex_read_index >= 0)
295  return &ex_ring[ex_read_index];
296  else
297  return NULL;
298 }
299 
300 template <class T>
301 Example<T>* CParseBuffer<T>::get_unused_example()
302 {
303  pthread_mutex_lock(read_lock);
304 
305  Example<T> *ex;
306  int32_t current_index = ex_read_index;
307  // Because read index will change after return_example_to_read
308 
309  pthread_mutex_lock(&ex_in_use_mutex[current_index]);
310 
311  if (ex_used[current_index] == E_NOT_USED)
312  ex = return_example_to_read();
313  else
314  ex = NULL;
315 
316  pthread_mutex_unlock(&ex_in_use_mutex[current_index]);
317 
318  pthread_mutex_unlock(read_lock);
319  return ex;
320 }
321 
322 template <class T>
323 int32_t CParseBuffer<T>::copy_example(Example<T> *ex)
324 {
325  pthread_mutex_lock(write_lock);
326  int32_t ret;
327  int32_t current_index = ex_write_index;
328 
329  pthread_mutex_lock(&ex_in_use_mutex[current_index]);
330  while (ex_used[ex_write_index] == E_NOT_USED)
331  {
332  pthread_cond_wait(&ex_in_use_cond[ex_write_index], &ex_in_use_mutex[ex_write_index]);
333  }
334 
335  ret = write_example(ex);
336 
337  pthread_mutex_unlock(&ex_in_use_mutex[current_index]);
338  pthread_mutex_unlock(write_lock);
339 
340  return ret;
341 }
342 
343 template <class T>
344 void CParseBuffer<T>::finalize_example(bool free_after_release)
345 {
346  pthread_mutex_lock(read_lock);
347  pthread_mutex_lock(&ex_in_use_mutex[ex_read_index]);
348  ex_used[ex_read_index] = E_USED;
349 
350  if (free_after_release)
351  {
352  SG_DEBUG("Freeing object in ring at index %d and address: %p.\n",
353  ex_read_index, ex_ring[ex_read_index].fv);
354 
355  SG_FREE(ex_ring[ex_read_index].fv);
356  ex_ring[ex_read_index].fv=NULL;
357  }
358 
359  pthread_cond_signal(&ex_in_use_cond[ex_read_index]);
360  pthread_mutex_unlock(&ex_in_use_mutex[ex_read_index]);
361  inc_read_index();
362 
363  pthread_mutex_unlock(read_lock);
364 }
365 
366 }
367 #endif // HAVE_PTHREAD
368 #endif // __PARSEBUFFER_H__
int32_t index_t
Definition: common.h:62
double float64_t
Definition: common.h:50
#define SG_DEBUG(...)
Definition: SGIO.h:107
All classes and functions are contained in the shogun namespace.
Definition: class_list.h:18
#define SG_SINFO(...)
Definition: SGIO.h:173

SHOGUN Machine Learning Toolbox - Documentation