Go to the documentation of this file.00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011 #include <shogun/lib/common.h>
00012 #include <shogun/lib/DataType.h>
00013 #include <pthread.h>
00014
00015 #ifndef __PARSEBUFFER_H__
00016 #define __PARSEBUFFER_H__
00017
00018 namespace shogun
00019 {
00020
00023 enum E_IS_EXAMPLE_USED
00024 {
00025 E_EMPTY = 1,
00026 E_NOT_USED = 2,
00027 E_USED = 3
00028 };
00029
00039 template <class T>
00040 class Example
00041 {
00042 public:
00044 float64_t label;
00046 SGVector<T> fv;
00047 };
00048
00065 template <class T> class CParseBuffer: public CSGObject
00066 {
00067 public:
00073 CParseBuffer(int32_t size = 1024);
00074
00079 ~CParseBuffer();
00080
00087 Example<T>* get_free_example()
00088 {
00089 pthread_mutex_lock(write_lock);
00090 pthread_mutex_lock(&ex_in_use_mutex[ex_write_index]);
00091 while (ex_used[ex_write_index] == E_NOT_USED)
00092 pthread_cond_wait(&ex_in_use_cond[ex_write_index], &ex_in_use_mutex[ex_write_index]);
00093 Example<T>* ex=&ex_ring[ex_write_index];
00094 pthread_mutex_unlock(&ex_in_use_mutex[ex_write_index]);
00095 pthread_mutex_unlock(write_lock);
00096
00097 return ex;
00098 }
00099
00108 int32_t write_example(Example<T>* ex);
00109
00115 Example<T>* return_example_to_read();
00116
00122 Example<T>* get_unused_example();
00123
00132 int32_t copy_example(Example<T>* ex);
00133
00141 void finalize_example(bool free_after_release);
00142
00152 void set_free_vectors_on_destruct(bool destroy)
00153 {
00154 free_vectors_on_destruct = destroy;
00155 }
00156
00161 bool get_free_vectors_on_destruct()
00162 {
00163 return free_vectors_on_destruct;
00164 }
00165
00171 virtual const char* get_name() const { return "ParseBuffer"; }
00172
00173 protected:
00178 virtual void inc_read_index()
00179 {
00180 ex_read_index=(ex_read_index + 1) % ring_size;
00181 }
00182
00187 virtual void inc_write_index()
00188 {
00189 ex_write_index=(ex_write_index + 1) % ring_size;
00190 }
00191
00192 protected:
00193
00195 int32_t ring_size;
00197 Example<T>* ex_ring;
00198
00200 E_IS_EXAMPLE_USED* ex_used;
00202 pthread_mutex_t* ex_in_use_mutex;
00204 pthread_cond_t* ex_in_use_cond;
00206 pthread_mutex_t* read_lock;
00208 pthread_mutex_t* write_lock;
00209
00211 int32_t ex_write_index;
00213 int32_t ex_read_index;
00214
00216 bool free_vectors_on_destruct;
00217 };
00218
00219 template <class T> CParseBuffer<T>::CParseBuffer(int32_t size)
00220 {
00221 ring_size = size;
00222 ex_ring = SG_CALLOC(Example<T>, ring_size);
00223 ex_used = SG_MALLOC(E_IS_EXAMPLE_USED, ring_size);
00224 ex_in_use_mutex = SG_MALLOC(pthread_mutex_t, ring_size);
00225 ex_in_use_cond = SG_MALLOC(pthread_cond_t, ring_size);
00226 read_lock = SG_MALLOC(pthread_mutex_t, 1);
00227 write_lock = SG_MALLOC(pthread_mutex_t, 1);
00228
00229 SG_SINFO("Initialized with ring size: %d.\n", ring_size);
00230
00231 ex_write_index = 0;
00232 ex_read_index = 0;
00233
00234 for (int32_t i=0; i<ring_size; i++)
00235 {
00236 ex_used[i] = E_EMPTY;
00237
00238
00239
00240
00241
00242 ex_ring[i].label = FLT_MAX;
00243
00244 pthread_cond_init(&ex_in_use_cond[i], NULL);
00245 pthread_mutex_init(&ex_in_use_mutex[i], NULL);
00246 }
00247 pthread_mutex_init(read_lock, NULL);
00248 pthread_mutex_init(write_lock, NULL);
00249
00250 free_vectors_on_destruct = true;
00251 }
00252
00253 template <class T> CParseBuffer<T>::~CParseBuffer()
00254 {
00255 for (int32_t i=0; i<ring_size; i++)
00256 {
00257 if (ex_ring[i].fv.vector != NULL && free_vectors_on_destruct)
00258 {
00259 SG_DEBUG("%s::~%s(): destroying examples ring vector %d at %p\n",
00260 get_name(), get_name(), i, ex_ring[i].fv.vector);
00261 SG_FREE(ex_ring[i].fv.vector);
00262 }
00263 pthread_mutex_destroy(&ex_in_use_mutex[i]);
00264 pthread_cond_destroy(&ex_in_use_cond[i]);
00265 }
00266 SG_FREE(ex_ring);
00267 SG_FREE(ex_used);
00268 SG_FREE(ex_in_use_mutex);
00269 SG_FREE(ex_in_use_cond);
00270
00271 SG_FREE(read_lock);
00272 SG_FREE(write_lock);
00273 }
00274
00275 template <class T>
00276 int32_t CParseBuffer<T>::write_example(Example<T> *ex)
00277 {
00278 ex_ring[ex_write_index].label = ex->label;
00279 ex_ring[ex_write_index].fv.vector = ex->fv.vector;
00280 ex_ring[ex_write_index].fv.vlen = ex->fv.vlen;
00281 ex_used[ex_write_index] = E_NOT_USED;
00282 inc_write_index();
00283
00284 return 1;
00285 }
00286
00287 template <class T>
00288 Example<T>* CParseBuffer<T>::return_example_to_read()
00289 {
00290 if (ex_read_index >= 0)
00291 return &ex_ring[ex_read_index];
00292 else
00293 return NULL;
00294 }
00295
00296 template <class T>
00297 Example<T>* CParseBuffer<T>::get_unused_example()
00298 {
00299 pthread_mutex_lock(read_lock);
00300
00301 Example<T> *ex;
00302 int32_t current_index = ex_read_index;
00303
00304
00305 pthread_mutex_lock(&ex_in_use_mutex[current_index]);
00306
00307 if (ex_used[current_index] == E_NOT_USED)
00308 ex = return_example_to_read();
00309 else
00310 ex = NULL;
00311
00312 pthread_mutex_unlock(&ex_in_use_mutex[current_index]);
00313
00314 pthread_mutex_unlock(read_lock);
00315 return ex;
00316 }
00317
00318 template <class T>
00319 int32_t CParseBuffer<T>::copy_example(Example<T> *ex)
00320 {
00321 pthread_mutex_lock(write_lock);
00322 int32_t ret;
00323 int32_t current_index = ex_write_index;
00324
00325 pthread_mutex_lock(&ex_in_use_mutex[current_index]);
00326 while (ex_used[ex_write_index] == E_NOT_USED)
00327 {
00328 pthread_cond_wait(&ex_in_use_cond[ex_write_index], &ex_in_use_mutex[ex_write_index]);
00329 }
00330
00331 ret = write_example(ex);
00332
00333 pthread_mutex_unlock(&ex_in_use_mutex[current_index]);
00334 pthread_mutex_unlock(write_lock);
00335
00336 return ret;
00337 }
00338
00339 template <class T>
00340 void CParseBuffer<T>::finalize_example(bool free_after_release)
00341 {
00342 pthread_mutex_lock(read_lock);
00343 pthread_mutex_lock(&ex_in_use_mutex[ex_read_index]);
00344 ex_used[ex_read_index] = E_USED;
00345
00346 if (free_after_release)
00347 {
00348 SG_DEBUG("Freeing object in ring at index %d and address: %p.\n",
00349 ex_read_index, ex_ring[ex_read_index].fv.vector);
00350
00351 SG_FREE(ex_ring[ex_read_index].fv.vector);
00352 ex_ring[ex_read_index].fv.vector=NULL;
00353 }
00354
00355 pthread_cond_signal(&ex_in_use_cond[ex_read_index]);
00356 pthread_mutex_unlock(&ex_in_use_mutex[ex_read_index]);
00357 inc_read_index();
00358
00359 pthread_mutex_unlock(read_lock);
00360 }
00361
00362 }
00363 #endif // __PARSEBUFFER_H__