SHOGUN  v3.0.0
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
ParseBuffer.h
Go to the documentation of this file.
1 /*
2  * This program is free software; you can redistribute it and/or modify
3  * it under the terms of the GNU General Public License as published by
4  * the Free Software Foundation; either version 3 of the License, or
5  * (at your option) any later version.
6  *
7  * Written (W) 2011 Shashwat Lal Das
8  * Copyright (C) 2011 Berlin Institute of Technology and Max-Planck-Society
9  */
10 #ifndef __PARSEBUFFER_H__
11 #define __PARSEBUFFER_H__
12 
13 #include <shogun/lib/common.h>
14 #ifdef HAVE_PTHREAD
15 
16 #include <shogun/lib/DataType.h>
17 #include <pthread.h>
18 
19 namespace shogun
20 {
21 
/** Life-cycle state of one slot in the CParseBuffer example ring.
 *
 * The numeric values are kept explicit because they are part of the
 * existing interface.
 */
enum E_IS_EXAMPLE_USED
{
	/** initial state of every slot right after the ring is constructed */
	E_EMPTY = 1,
	/** an example has been written into the slot and not yet finalized */
	E_NOT_USED = 2,
	/** the example in the slot has been finalized; the slot may be rewritten */
	E_USED = 3
};
30 
40 template <class T>
41 class Example
42 {
43 public:
45  float64_t label;
47  T* fv;
48  index_t length;
49 };
50 
67 template <class T> class CParseBuffer: public CSGObject
68 {
69 public:
75  CParseBuffer(int32_t size = 1024);
76 
81  ~CParseBuffer();
82 
89  Example<T>* get_free_example()
90  {
91  pthread_mutex_lock(write_lock);
92  pthread_mutex_lock(&ex_in_use_mutex[ex_write_index]);
93  while (ex_used[ex_write_index] == E_NOT_USED)
94  pthread_cond_wait(&ex_in_use_cond[ex_write_index], &ex_in_use_mutex[ex_write_index]);
95  Example<T>* ex=&ex_ring[ex_write_index];
96  pthread_mutex_unlock(&ex_in_use_mutex[ex_write_index]);
97  pthread_mutex_unlock(write_lock);
98 
99  return ex;
100  }
101 
110  int32_t write_example(Example<T>* ex);
111 
117  Example<T>* return_example_to_read();
118 
124  Example<T>* get_unused_example();
125 
134  int32_t copy_example(Example<T>* ex);
135 
143  void finalize_example(bool free_after_release);
144 
154  void set_free_vectors_on_destruct(bool destroy)
155  {
156  free_vectors_on_destruct = destroy;
157  }
158 
163  bool get_free_vectors_on_destruct()
164  {
165  return free_vectors_on_destruct;
166  }
167 
173  virtual const char* get_name() const { return "ParseBuffer"; }
174 
175 protected:
180  virtual void inc_read_index()
181  {
182  ex_read_index=(ex_read_index + 1) % ring_size;
183  }
184 
189  virtual void inc_write_index()
190  {
191  ex_write_index=(ex_write_index + 1) % ring_size;
192  }
193 
194 protected:
195 
197  int32_t ring_size;
199  Example<T>* ex_ring;
200 
202  E_IS_EXAMPLE_USED* ex_used;
204  pthread_mutex_t* ex_in_use_mutex;
206  pthread_cond_t* ex_in_use_cond;
208  pthread_mutex_t* read_lock;
210  pthread_mutex_t* write_lock;
211 
213  int32_t ex_write_index;
215  int32_t ex_read_index;
216 
218  bool free_vectors_on_destruct;
219 };
220 
221 template <class T> CParseBuffer<T>::CParseBuffer(int32_t size)
222 {
223  ring_size = size;
224  ex_ring = SG_CALLOC(Example<T>, ring_size);
225  ex_used = SG_MALLOC(E_IS_EXAMPLE_USED, ring_size);
226  ex_in_use_mutex = SG_MALLOC(pthread_mutex_t, ring_size);
227  ex_in_use_cond = SG_MALLOC(pthread_cond_t, ring_size);
228  read_lock = SG_MALLOC(pthread_mutex_t, 1);
229  write_lock = SG_MALLOC(pthread_mutex_t, 1);
230 
231  SG_SINFO("Initialized with ring size: %d.\n", ring_size)
232 
233  ex_write_index = 0;
234  ex_read_index = 0;
235 
236  for (int32_t i=0; i<ring_size; i++)
237  {
238  ex_used[i] = E_EMPTY;
239 
240  /* this closes a memory leak, seems to have no bad consequences,
241  * but I am not completely sure due to lack of any tests */
242  //ex_ring[i].fv = SG_MALLOC(T, 1);
243  //ex_ring[i].length = 1;
244  ex_ring[i].label = FLT_MAX;
245 
246  pthread_cond_init(&ex_in_use_cond[i], NULL);
247  pthread_mutex_init(&ex_in_use_mutex[i], NULL);
248  }
249  pthread_mutex_init(read_lock, NULL);
250  pthread_mutex_init(write_lock, NULL);
251 
252  free_vectors_on_destruct = true;
253 }
254 
255 template <class T> CParseBuffer<T>::~CParseBuffer()
256 {
257  for (int32_t i=0; i<ring_size; i++)
258  {
259  if (ex_ring[i].fv != NULL && free_vectors_on_destruct)
260  {
261  SG_DEBUG("%s::~%s(): destroying examples ring vector %d at %p\n",
262  get_name(), get_name(), i, ex_ring[i].fv);
263  SG_FREE(ex_ring[i].fv);
264  }
265  pthread_mutex_destroy(&ex_in_use_mutex[i]);
266  pthread_cond_destroy(&ex_in_use_cond[i]);
267  }
268  SG_FREE(ex_ring);
269  SG_FREE(ex_used);
270  SG_FREE(ex_in_use_mutex);
271  SG_FREE(ex_in_use_cond);
272 
273  SG_FREE(read_lock);
274  SG_FREE(write_lock);
275 }
276 
277 template <class T>
278 int32_t CParseBuffer<T>::write_example(Example<T> *ex)
279 {
280  ex_ring[ex_write_index].label = ex->label;
281  ex_ring[ex_write_index].fv = ex->fv;
282  ex_ring[ex_write_index].length = ex->length;
283  ex_used[ex_write_index] = E_NOT_USED;
284  inc_write_index();
285 
286  return 1;
287 }
288 
289 template <class T>
290 Example<T>* CParseBuffer<T>::return_example_to_read()
291 {
292  if (ex_read_index >= 0)
293  return &ex_ring[ex_read_index];
294  else
295  return NULL;
296 }
297 
298 template <class T>
299 Example<T>* CParseBuffer<T>::get_unused_example()
300 {
301  pthread_mutex_lock(read_lock);
302 
303  Example<T> *ex;
304  int32_t current_index = ex_read_index;
305  // Because read index will change after return_example_to_read
306 
307  pthread_mutex_lock(&ex_in_use_mutex[current_index]);
308 
309  if (ex_used[current_index] == E_NOT_USED)
310  ex = return_example_to_read();
311  else
312  ex = NULL;
313 
314  pthread_mutex_unlock(&ex_in_use_mutex[current_index]);
315 
316  pthread_mutex_unlock(read_lock);
317  return ex;
318 }
319 
320 template <class T>
321 int32_t CParseBuffer<T>::copy_example(Example<T> *ex)
322 {
323  pthread_mutex_lock(write_lock);
324  int32_t ret;
325  int32_t current_index = ex_write_index;
326 
327  pthread_mutex_lock(&ex_in_use_mutex[current_index]);
328  while (ex_used[ex_write_index] == E_NOT_USED)
329  {
330  pthread_cond_wait(&ex_in_use_cond[ex_write_index], &ex_in_use_mutex[ex_write_index]);
331  }
332 
333  ret = write_example(ex);
334 
335  pthread_mutex_unlock(&ex_in_use_mutex[current_index]);
336  pthread_mutex_unlock(write_lock);
337 
338  return ret;
339 }
340 
341 template <class T>
342 void CParseBuffer<T>::finalize_example(bool free_after_release)
343 {
344  pthread_mutex_lock(read_lock);
345  pthread_mutex_lock(&ex_in_use_mutex[ex_read_index]);
346  ex_used[ex_read_index] = E_USED;
347 
348  if (free_after_release)
349  {
350  SG_DEBUG("Freeing object in ring at index %d and address: %p.\n",
351  ex_read_index, ex_ring[ex_read_index].fv);
352 
353  SG_FREE(ex_ring[ex_read_index].fv);
354  ex_ring[ex_read_index].fv=NULL;
355  }
356 
357  pthread_cond_signal(&ex_in_use_cond[ex_read_index]);
358  pthread_mutex_unlock(&ex_in_use_mutex[ex_read_index]);
359  inc_read_index();
360 
361  pthread_mutex_unlock(read_lock);
362 }
363 
364 }
365 #endif // HAVE_PTHREAD
366 #endif // __PARSEBUFFER_H__

SHOGUN Machine Learning Toolbox - Documentation