ParseBuffer.h

Go to the documentation of this file.
00001 /*
00002  * This program is free software; you can redistribute it and/or modify
00003  * it under the terms of the GNU General Public License as published by
00004  * the Free Software Foundation; either version 3 of the License, or
00005  * (at your option) any later version.
00006  *
00007  * Written (W) 2011 Shashwat Lal Das
00008  * Copyright (C) 2011 Berlin Institute of Technology and Max-Planck-Society
00009  */
00010 
00011 #include <shogun/lib/common.h>
00012 #include <shogun/lib/DataType.h>
00013 #include <pthread.h>
00014 
00015 #ifndef __PARSEBUFFER_H__
00016 #define __PARSEBUFFER_H__
00017 
00018 namespace shogun
00019 {
00020 
/** State of one slot in the CParseBuffer example ring. */
enum E_IS_EXAMPLE_USED
{
    /** State given to every slot by the CParseBuffer constructor. */
    E_EMPTY = 1,
    /** Set by write_example(): the slot holds an example that has not
     * been finalized yet. (Value 2.) */
    E_NOT_USED,
    /** Set by finalize_example(): the example in the slot has been
     * released. (Value 3.) */
    E_USED
};
00029 
00039 template <class T>
00040 class Example
00041 {
00042 public:
00044     float64_t label;
00046     SGVector<T> fv;
00047 };
00048 
00065 template <class T> class CParseBuffer: public CSGObject
00066 {
00067 public:
00073     CParseBuffer(int32_t size = 1024);
00074 
00079     ~CParseBuffer();
00080 
00087     Example<T>* get_free_example()
00088     {
00089         pthread_mutex_lock(write_lock);
00090         pthread_mutex_lock(&ex_in_use_mutex[ex_write_index]);
00091         while (ex_used[ex_write_index] == E_NOT_USED)
00092             pthread_cond_wait(&ex_in_use_cond[ex_write_index], &ex_in_use_mutex[ex_write_index]);
00093         Example<T>* ex=&ex_ring[ex_write_index];
00094         pthread_mutex_unlock(&ex_in_use_mutex[ex_write_index]);
00095         pthread_mutex_unlock(write_lock);
00096 
00097         return ex;
00098     }
00099 
00108     int32_t write_example(Example<T>* ex);
00109 
00115     Example<T>* return_example_to_read();
00116 
00122     Example<T>* get_unused_example();
00123 
00132     int32_t copy_example(Example<T>* ex);
00133 
00141     void finalize_example(bool free_after_release);
00142 
00152     void set_free_vectors_on_destruct(bool destroy)
00153     {
00154         free_vectors_on_destruct = destroy;
00155     }
00156 
00161     bool get_free_vectors_on_destruct()
00162     {
00163         return free_vectors_on_destruct;
00164     }
00165 
00171     virtual const char* get_name() const { return "ParseBuffer"; }
00172 
00173 protected:
00178     virtual void inc_read_index()
00179     {
00180         ex_read_index=(ex_read_index + 1) % ring_size;
00181     }
00182 
00187     virtual void inc_write_index()
00188     {
00189         ex_write_index=(ex_write_index + 1) % ring_size;
00190     }
00191 
00192 protected:
00193 
00195     int32_t ring_size;
00197     Example<T>* ex_ring;
00198 
00200     E_IS_EXAMPLE_USED* ex_used;
00202     pthread_mutex_t* ex_in_use_mutex;
00204     pthread_cond_t* ex_in_use_cond;
00206     pthread_mutex_t* read_lock;
00208     pthread_mutex_t* write_lock;
00209 
00211     int32_t ex_write_index;
00213     int32_t ex_read_index;
00214 
00216     bool free_vectors_on_destruct;
00217 };
00218 
00219 template <class T> CParseBuffer<T>::CParseBuffer(int32_t size)
00220 {
00221     ring_size = size;
00222     ex_ring = SG_CALLOC(Example<T>, ring_size);
00223     ex_used = SG_MALLOC(E_IS_EXAMPLE_USED, ring_size);
00224     ex_in_use_mutex = SG_MALLOC(pthread_mutex_t, ring_size);
00225     ex_in_use_cond = SG_MALLOC(pthread_cond_t, ring_size);
00226     read_lock = SG_MALLOC(pthread_mutex_t, 1);
00227     write_lock = SG_MALLOC(pthread_mutex_t, 1);
00228 
00229     SG_SINFO("Initialized with ring size: %d.\n", ring_size);
00230 
00231     ex_write_index = 0;
00232     ex_read_index = 0;
00233 
00234     for (int32_t i=0; i<ring_size; i++)
00235     {
00236         ex_used[i] = E_EMPTY;
00237 
00238         /* this closes a memory leak, seems to have no bad consequences,
00239          * but I am not completely sure due to lack of any tests */
00240 //      ex_ring[i].fv.vector = SG_MALLOC(T, 1);
00241 //      ex_ring[i].fv.vlen = 1;
00242         ex_ring[i].label = FLT_MAX;
00243 
00244         pthread_cond_init(&ex_in_use_cond[i], NULL);
00245         pthread_mutex_init(&ex_in_use_mutex[i], NULL);
00246     }
00247     pthread_mutex_init(read_lock, NULL);
00248     pthread_mutex_init(write_lock, NULL);
00249 
00250     free_vectors_on_destruct = true;
00251 }
00252 
00253 template <class T> CParseBuffer<T>::~CParseBuffer()
00254 {
00255     for (int32_t i=0; i<ring_size; i++)
00256     {
00257         if (ex_ring[i].fv.vector != NULL && free_vectors_on_destruct)
00258         {
00259             SG_DEBUG("%s::~%s(): destroying examples ring vector %d at %p\n",
00260                     get_name(), get_name(), i, ex_ring[i].fv.vector);
00261             SG_FREE(ex_ring[i].fv.vector);
00262         }
00263         pthread_mutex_destroy(&ex_in_use_mutex[i]);
00264         pthread_cond_destroy(&ex_in_use_cond[i]);
00265     }
00266     SG_FREE(ex_ring);
00267     SG_FREE(ex_used);
00268     SG_FREE(ex_in_use_mutex);
00269     SG_FREE(ex_in_use_cond);
00270 
00271     SG_FREE(read_lock);
00272     SG_FREE(write_lock);
00273 }
00274 
00275 template <class T>
00276 int32_t CParseBuffer<T>::write_example(Example<T> *ex)
00277 {
00278     ex_ring[ex_write_index].label = ex->label;
00279     ex_ring[ex_write_index].fv.vector = ex->fv.vector;
00280     ex_ring[ex_write_index].fv.vlen = ex->fv.vlen;
00281     ex_used[ex_write_index] = E_NOT_USED;
00282     inc_write_index();
00283 
00284     return 1;
00285 }
00286 
00287 template <class T>
00288 Example<T>* CParseBuffer<T>::return_example_to_read()
00289 {
00290     if (ex_read_index >= 0)
00291         return &ex_ring[ex_read_index];
00292     else
00293         return NULL;
00294 }
00295 
00296 template <class T>
00297 Example<T>* CParseBuffer<T>::get_unused_example()
00298 {
00299     pthread_mutex_lock(read_lock);
00300 
00301     Example<T> *ex;
00302     int32_t current_index = ex_read_index;
00303     // Because read index will change after return_example_to_read
00304 
00305     pthread_mutex_lock(&ex_in_use_mutex[current_index]);
00306 
00307     if (ex_used[current_index] == E_NOT_USED)
00308         ex = return_example_to_read();
00309     else
00310         ex = NULL;
00311 
00312     pthread_mutex_unlock(&ex_in_use_mutex[current_index]);
00313 
00314     pthread_mutex_unlock(read_lock);
00315     return ex;
00316 }
00317 
00318 template <class T>
00319 int32_t CParseBuffer<T>::copy_example(Example<T> *ex)
00320 {
00321     pthread_mutex_lock(write_lock);
00322     int32_t ret;
00323     int32_t current_index = ex_write_index;
00324 
00325     pthread_mutex_lock(&ex_in_use_mutex[current_index]);
00326     while (ex_used[ex_write_index] == E_NOT_USED)
00327     {
00328         pthread_cond_wait(&ex_in_use_cond[ex_write_index], &ex_in_use_mutex[ex_write_index]);
00329     }
00330 
00331     ret = write_example(ex);
00332 
00333     pthread_mutex_unlock(&ex_in_use_mutex[current_index]);
00334     pthread_mutex_unlock(write_lock);
00335 
00336     return ret;
00337 }
00338 
00339 template <class T>
00340 void CParseBuffer<T>::finalize_example(bool free_after_release)
00341 {
00342     pthread_mutex_lock(read_lock);
00343     pthread_mutex_lock(&ex_in_use_mutex[ex_read_index]);
00344     ex_used[ex_read_index] = E_USED;
00345 
00346     if (free_after_release)
00347     {
00348         SG_DEBUG("Freeing object in ring at index %d and address: %p.\n",
00349              ex_read_index, ex_ring[ex_read_index].fv.vector);
00350 
00351         SG_FREE(ex_ring[ex_read_index].fv.vector);
00352         ex_ring[ex_read_index].fv.vector=NULL;
00353     }
00354 
00355     pthread_cond_signal(&ex_in_use_cond[ex_read_index]);
00356     pthread_mutex_unlock(&ex_in_use_mutex[ex_read_index]);
00357     inc_read_index();
00358 
00359     pthread_mutex_unlock(read_lock);
00360 }
00361 
00362 }
00363 #endif // __PARSEBUFFER_H__
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Defines

SHOGUN Machine Learning Toolbox - Documentation