SHOGUN  v2.0.0
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
ParseBuffer.h
Go to the documentation of this file.
1 /*
2  * This program is free software; you can redistribute it and/or modify
3  * it under the terms of the GNU General Public License as published by
4  * the Free Software Foundation; either version 3 of the License, or
5  * (at your option) any later version.
6  *
7  * Written (W) 2011 Shashwat Lal Das
8  * Copyright (C) 2011 Berlin Institute of Technology and Max-Planck-Society
9  */
10 
11 #include <shogun/lib/common.h>
12 #include <shogun/lib/DataType.h>
13 #include <pthread.h>
14 
15 #ifndef __PARSEBUFFER_H__
16 #define __PARSEBUFFER_H__
17 
18 namespace shogun
19 {
20 
// Per-slot state values stored in CParseBuffer's ex_used[] array.
// NOTE(review): this listing is missing the line that opens the enum (its
// name) and the E_NOT_USED enumerator that other code in this file compares
// against and assigns -- confirm both against the upstream header.
24 {
25  E_EMPTY = 1, // state given to every slot by the CParseBuffer constructor
27  E_USED = 3 // state set by CParseBuffer::finalize_example()
28 };
29 
// Example: element type of the ring held by CParseBuffer<T>.
// NOTE(review): the member declarations are missing from this listing. Code
// elsewhere in this file reads and writes `label`, `fv.vector` and `fv.vlen`
// on Example<T> objects -- confirm the exact member types upstream.
39 template <class T>
40 class Example
41 {
42 public:
47 };
48 
// CParseBuffer: a fixed-size ring of Example<T> slots with one mutex and one
// condition variable per slot, plus one read-side and one write-side lock.
// NOTE(review): several lines of this class are missing from the listing
// (an inline method's signature, some member declarations, and the bodies of
// the index-increment helpers); the notes below only describe what is visible.
65 template <class T> class CParseBuffer: public CSGObject
66 {
67 public:
// Constructor; `size` is the number of ring slots (defaults to 1024).
73  CParseBuffer(int32_t size = 1024);
74 
// Destructor; defined below in this file.
79  ~CParseBuffer();
80 
// NOTE(review): the signature line of this inline method and the statement
// that defines `ex` (listing line 93) are missing from this listing.
// Visible behaviour: takes write_lock and the mutex of the slot at
// ex_write_index, waits on that slot's condition variable while the slot's
// state is E_NOT_USED, releases both locks and returns `ex`.
88  {
89  pthread_mutex_lock(write_lock);
90  pthread_mutex_lock(&ex_in_use_mutex[ex_write_index]);
91  while (ex_used[ex_write_index] == E_NOT_USED)
92  pthread_cond_wait(&ex_in_use_cond[ex_write_index], &ex_in_use_mutex[ex_write_index]);
94  pthread_mutex_unlock(&ex_in_use_mutex[ex_write_index]);
95  pthread_mutex_unlock(write_lock);
96 
97  return ex;
98  }
99 
// Stores the fields of `ex` into the slot at the write index; returns int32_t.
108  int32_t write_example(Example<T>* ex);
109 
// NOTE(review): declarations appear to be missing from the listing here.
116 
123 
// Takes an Example<T>* and returns int32_t; defined out of class.
132  int32_t copy_example(Example<T>* ex);
133 
// Marks the slot at the read index as E_USED, optionally deleting its
// vector, and advances the read index.
141  void finalize_example(bool free_after_release);
142 
// Sets the flag the destructor checks before deleting slot vectors.
152  void set_free_vectors_on_destruct(bool destroy) { free_vectors_on_destruct = destroy; }
153 
// NOTE(review): a declaration appears to be missing from the listing here.
159 
// Returns the object name string "ParseBuffer".
165  inline virtual const char* get_name() const { return "ParseBuffer"; }
166 
167 protected:
// Called by finalize_example() after it releases a slot.
// NOTE(review): the body statement (listing line 174) is missing here.
172  inline virtual void inc_read_index()
173  {
175  }
176 
// Called by write_example() after it fills a slot.
// NOTE(review): the body statement (listing line 183) is missing here.
181  inline virtual void inc_write_index()
182  {
184  }
185 
186 protected:
187 
// Number of slots; set from the constructor argument.
189  int32_t ring_size;
// NOTE(review): member declarations (e.g. ex_ring, ex_used, which the
// constructor allocates) are missing from the listing here.
192 
// Array of ring_size mutexes, one per slot.
196  pthread_mutex_t* ex_in_use_mutex;
// Array of ring_size condition variables, one per slot.
198  pthread_cond_t* ex_in_use_cond;
// Lock taken by the reader-side functions in this file.
200  pthread_mutex_t* read_lock;
// Lock taken by the writer-side functions in this file.
202  pthread_mutex_t* write_lock;
203 
// Index of the slot the next write goes to; starts at 0.
205  int32_t ex_write_index;
// Index of the slot the next read comes from; starts at 0.
207  int32_t ex_read_index;
208 
// NOTE(review): the declaration of free_vectors_on_destruct is missing here.
211 };
212 
213 template <class T> CParseBuffer<T>::CParseBuffer(int32_t size)
214 {
215  ring_size = size;
216  ex_ring = SG_CALLOC(Example<T>, ring_size);
217  ex_used = SG_MALLOC(E_IS_EXAMPLE_USED, ring_size);
218  ex_in_use_mutex = SG_MALLOC(pthread_mutex_t, ring_size);
219  ex_in_use_cond = SG_MALLOC(pthread_cond_t, ring_size);
220  read_lock = new pthread_mutex_t;
221  write_lock = new pthread_mutex_t;
222 
223  SG_SINFO("Initialized with ring size: %d.\n", ring_size);
224 
225  ex_write_index = 0;
226  ex_read_index = 0;
227 
228  for (int32_t i=0; i<ring_size; i++)
229  {
230  ex_used[i] = E_EMPTY;
231  ex_ring[i].fv.vector = new T();
232  ex_ring[i].fv.vlen = 1;
233  ex_ring[i].label = FLT_MAX;
234 
235  pthread_cond_init(&ex_in_use_cond[i], NULL);
236  pthread_mutex_init(&ex_in_use_mutex[i], NULL);
237  }
238  pthread_mutex_init(read_lock, NULL);
239  pthread_mutex_init(write_lock, NULL);
240 
241  free_vectors_on_destruct = true;
242 }
243 
244 template <class T> CParseBuffer<T>::~CParseBuffer()
245 {
246  for (int32_t i=0; i<ring_size; i++)
247  {
248  if (ex_ring[i].fv.vector != NULL && free_vectors_on_destruct)
249  delete ex_ring[i].fv.vector;
250  pthread_mutex_destroy(&ex_in_use_mutex[i]);
251  pthread_cond_destroy(&ex_in_use_cond[i]);
252  }
253  SG_FREE(ex_ring);
254  SG_FREE(ex_used);
255  SG_FREE(ex_in_use_mutex);
256  SG_FREE(ex_in_use_cond);
257 
258  delete read_lock;
259  delete write_lock;
260 }
261 
262 template <class T>
264 {
265  ex_ring[ex_write_index].label = ex->label;
266  ex_ring[ex_write_index].fv.vector = ex->fv.vector;
267  ex_ring[ex_write_index].fv.vlen = ex->fv.vlen;
268  ex_used[ex_write_index] = E_NOT_USED;
269  inc_write_index();
270 
271  return 1;
272 }
273 
274 template <class T>
276 {
277  if (ex_read_index >= 0)
278  return &ex_ring[ex_read_index];
279  else
280  return NULL;
281 }
282 
// Reader-side fetch: returns the slot at the read index if its state is
// E_NOT_USED, otherwise NULL. Holds read_lock for the whole call and the
// slot's mutex while the state is checked.
// NOTE(review): the signature line (listing line 284) is missing from this
// listing; from the body this is a CParseBuffer<T> member returning
// Example<T>* -- confirm its name and signature against the upstream header.
283 template <class T>
285 {
286  pthread_mutex_lock(read_lock);
287 
288  Example<T> *ex;
289  int32_t current_index = ex_read_index;
290  // Snapshot of the read index, so the same slot mutex is locked and unlocked below.
291 
292  pthread_mutex_lock(&ex_in_use_mutex[current_index]);
293 
294  if (ex_used[current_index] == E_NOT_USED)
295  ex = return_example_to_read();
296  else
297  ex = NULL;
298 
299  pthread_mutex_unlock(&ex_in_use_mutex[current_index]);
300 
301  pthread_mutex_unlock(read_lock);
302  return ex;
303 }
304 
305 template <class T>
307 {
308  pthread_mutex_lock(write_lock);
309  int32_t ret;
310  int32_t current_index = ex_write_index;
311 
312  pthread_mutex_lock(&ex_in_use_mutex[current_index]);
313  while (ex_used[ex_write_index] == E_NOT_USED)
314  {
315  pthread_cond_wait(&ex_in_use_cond[ex_write_index], &ex_in_use_mutex[ex_write_index]);
316  }
317 
318  ret = write_example(ex);
319 
320  pthread_mutex_unlock(&ex_in_use_mutex[current_index]);
321  pthread_mutex_unlock(write_lock);
322 
323  return ret;
324 }
325 
326 template <class T>
327 void CParseBuffer<T>::finalize_example(bool free_after_release)
328 {
329  pthread_mutex_lock(read_lock);
330  pthread_mutex_lock(&ex_in_use_mutex[ex_read_index]);
331  ex_used[ex_read_index] = E_USED;
332 
333  if (free_after_release)
334  {
335  SG_DEBUG("Freeing object in ring at index %d and address: %p.\n",
336  ex_read_index, ex_ring[ex_read_index].fv.vector);
337 
338  delete ex_ring[ex_read_index].fv.vector;
339  ex_ring[ex_read_index].fv.vector=NULL;
340  }
341 
342  pthread_cond_signal(&ex_in_use_cond[ex_read_index]);
343  pthread_mutex_unlock(&ex_in_use_mutex[ex_read_index]);
344  inc_read_index();
345 
346  pthread_mutex_unlock(read_lock);
347 }
348 
349 }
350 #endif // __PARSEBUFFER_H__

SHOGUN Machine Learning Toolbox - Documentation