InputParser.h

Go to the documentation of this file.
00001 /*
00002  * This program is free software; you can redistribute it and/or modify
00003  * it under the terms of the GNU General Public License as published by
00004  * the Free Software Foundation; either version 3 of the License, or
00005  * (at your option) any later version.
00006  *
00007  * Written (W) 2011 Shashwat Lal Das
00008  * Copyright (C) 2011 Berlin Institute of Technology and Max-Planck-Society
00009  */
00010 
00011 #ifndef __INPUTPARSER_H__
00012 #define __INPUTPARSER_H__
00013 
00014 #include <shogun/io/SGIO.h>
00015 #include <shogun/io/StreamingFile.h>
00016 #include <shogun/lib/common.h>
00017 #include <shogun/io/ParseBuffer.h>
00018 #include <pthread.h>
00019 
00020 #define PARSER_DEFAULT_BUFFSIZE 100
00021 
00022 namespace shogun
00023 {
/** Kind of example the parser is asked to read. */
enum E_EXAMPLE_TYPE
{
    /** every example is read together with a label */
    E_LABELLED = 1,

    /** examples are read as a bare feature vector */
    E_UNLABELLED = 2
};
00031 
00080 template <class T> class CInputParser
00081 {
00082 public:
00083 
00088     CInputParser();
00089 
00094     ~CInputParser();
00095 
00107     void init(CStreamingFile* input_file, bool is_labelled = true, int32_t size = PARSER_DEFAULT_BUFFSIZE);
00108 
00114     bool is_running();
00115 
00122     int32_t get_number_of_features() { return number_of_features; }
00123 
00135     void set_read_vector(void (CStreamingFile::*func_ptr)(T* &vec, int32_t &len));
00136 
00148     void set_read_vector_and_label(void (CStreamingFile::*func_ptr)(T* &vec, int32_t &len, float64_t &label));
00149 
00161     int32_t get_vector_and_label(T* &feature_vector,
00162                      int32_t &length,
00163                      float64_t &label);
00164 
00175     int32_t get_vector_only(T* &feature_vector, int32_t &length);
00176 
00183     void set_free_vector_after_release(bool free_vec);
00184 
00191     void set_free_vectors_on_destruct(bool destroy);
00192 
00198     void start_parser();
00199 
00208     void* main_parse_loop(void* params);
00209 
00210 
00216     void copy_example_into_buffer(Example<T>* ex);
00217 
00224     Example<T>* retrieve_example();
00225 
00238     int32_t get_next_example(T* &feature_vector,
00239                  int32_t &length,
00240                  float64_t &label);
00241 
00250     int32_t get_next_example(T* &feature_vector,
00251                  int32_t &length);
00252 
00260     void finalize_example();
00261 
00266     void end_parser();
00267 
00270     void exit_parser();
00271 
00277     int32_t get_ring_size() { return ring_size; }
00278 
00279 private:
00287     static void* parse_loop_entry_point(void* params);
00288 
00289 public:
00290     bool parsing_done;  
00291     bool reading_done;  
00293     E_EXAMPLE_TYPE example_type; 
00295 protected:
00302     void (CStreamingFile::*read_vector) (T* &vec, int32_t &len);
00303 
00310     void (CStreamingFile::*read_vector_and_label) (T* &vec, int32_t &len, float64_t &label);
00311 
00313     CStreamingFile* input_source;
00314 
00316     pthread_t parse_thread;
00317 
00319     CParseBuffer<T>* examples_ring;
00320 
00322     int32_t number_of_features;
00323 
00325     int32_t number_of_vectors_parsed;
00326 
00328     int32_t number_of_vectors_read;
00329 
00331     Example<T>* current_example;
00332 
00334     T* current_feature_vector;
00335 
00337     float64_t current_label;
00338 
00340     int32_t current_len;
00341 
00343     bool free_after_release;
00344 
00346     int32_t ring_size;
00347 
00349     pthread_mutex_t examples_state_lock;
00350 
00352     pthread_cond_t examples_state_changed;
00353 };
00354 
00355 template <class T>
00356     void CInputParser<T>::set_read_vector(void (CStreamingFile::*func_ptr)(T* &vec, int32_t &len))
00357 {
00358     // Set read_vector to point to the function passed as arg
00359     read_vector=func_ptr;
00360 }
00361 
00362 template <class T>
00363     void CInputParser<T>::set_read_vector_and_label(void (CStreamingFile::*func_ptr)(T* &vec, int32_t &len, float64_t &label))
00364 {
00365     // Set read_vector_and_label to point to the function passed as arg
00366     read_vector_and_label=func_ptr;
00367 }
00368 
00369 template <class T>
00370     CInputParser<T>::CInputParser()
00371 {
00372     //init(NULL, true, PARSER_DEFAULT_BUFFSIZE);
00373 }
00374 
00375 template <class T>
00376     CInputParser<T>::~CInputParser()
00377 {
00378     end_parser();
00379 
00380     pthread_mutex_destroy(&examples_state_lock);
00381     pthread_cond_destroy(&examples_state_changed);
00382 
00383     delete examples_ring;
00384 }
00385 
00386 template <class T>
00387     void CInputParser<T>::init(CStreamingFile* input_file, bool is_labelled, int32_t size)
00388 {
00389     input_source = input_file;
00390 
00391     if (is_labelled == true)
00392         example_type = E_LABELLED;
00393     else
00394         example_type = E_UNLABELLED;
00395 
00396     examples_ring = new CParseBuffer<T>(size);
00397 
00398     parsing_done = false;
00399     reading_done = false;
00400     number_of_vectors_parsed = 0;
00401     number_of_vectors_read = 0;
00402 
00403     current_len = -1;
00404     current_label = -1;
00405     current_feature_vector = NULL;
00406 
00407     free_after_release=true;
00408     ring_size=size;
00409 
00410     pthread_mutex_init(&examples_state_lock, NULL);
00411     pthread_cond_init(&examples_state_changed, NULL);
00412 }
00413 
00414 template <class T>
00415     void CInputParser<T>::set_free_vector_after_release(bool free_vec)
00416 {
00417     free_after_release=free_vec;
00418 }
00419 
00420 template <class T>
00421     void CInputParser<T>::set_free_vectors_on_destruct(bool destroy)
00422 {
00423     examples_ring->set_free_vectors_on_destruct(destroy);
00424 }
00425 
00426 template <class T>
00427     void CInputParser<T>::start_parser()
00428 {
00429     if (is_running())
00430     {
00431         SG_SERROR("Parser thread is already running! Multiple parse threads not supported.\n");
00432     }
00433 
00434     pthread_create(&parse_thread, NULL, parse_loop_entry_point, this);
00435 }
00436 
00437 template <class T>
00438     void* CInputParser<T>::parse_loop_entry_point(void* params)
00439 {
00440     ((CInputParser *) params)->main_parse_loop(params);
00441 
00442     return NULL;
00443 }
00444 
00445 template <class T>
00446     bool CInputParser<T>::is_running()
00447 {
00448     bool ret;
00449 
00450     pthread_mutex_lock(&examples_state_lock);
00451 
00452     if (parsing_done)
00453         if (reading_done)
00454             ret = false;
00455         else
00456             ret = true;
00457     else
00458         ret = false;
00459 
00460     pthread_mutex_unlock(&examples_state_lock);
00461     return ret;
00462 }
00463 
00464 template <class T>
00465     int32_t CInputParser<T>::get_vector_and_label(T* &feature_vector,
00466                               int32_t &length,
00467                               float64_t &label)
00468 {
00469     (input_source->*read_vector_and_label)(feature_vector, length, label);
00470 
00471     if (length < 1)
00472     {
00473         // Problem reading the example
00474         return 0;
00475     }
00476 
00477     return 1;
00478 }
00479 
00480 template <class T>
00481     int32_t CInputParser<T>::get_vector_only(T* &feature_vector,
00482                          int32_t &length)
00483 {
00484     (input_source->*read_vector)(feature_vector, length);
00485 
00486     if (length < 1)
00487     {
00488         // Problem reading the example
00489         return 0;
00490     }
00491 
00492     return 1;
00493 }
00494 
00495 template <class T>
00496     void CInputParser<T>::copy_example_into_buffer(Example<T>* ex)
00497 {
00498     examples_ring->copy_example(ex);
00499 }
00500 
00501 template <class T> void* CInputParser<T>::main_parse_loop(void* params)
00502 {
00503     // Read the examples into current_* objects
00504     // Instead of allocating mem for new objects each time
00505 
00506     CInputParser* this_obj = (CInputParser *) params;
00507     this->input_source = this_obj->input_source;
00508 
00509     while (1)
00510     {
00511         pthread_mutex_lock(&examples_state_lock);
00512         if (parsing_done)
00513         {
00514             pthread_mutex_unlock(&examples_state_lock);
00515             return NULL;
00516         }
00517         pthread_mutex_unlock(&examples_state_lock);
00518 
00519         pthread_testcancel();
00520 
00521     current_example = examples_ring->get_free_example();
00522     current_feature_vector = current_example->fv.vector;
00523     current_len = current_example->fv.vlen;
00524     current_label = current_example->label;
00525 
00526         if (example_type == E_LABELLED)
00527             get_vector_and_label(current_feature_vector, current_len, current_label);
00528         else
00529             get_vector_only(current_feature_vector, current_len);
00530 
00531         if (current_len < 0)
00532         {
00533             pthread_mutex_lock(&examples_state_lock);
00534             parsing_done = true;
00535             pthread_cond_signal(&examples_state_changed);
00536             pthread_mutex_unlock(&examples_state_lock);
00537             return NULL;
00538         }
00539 
00540         current_example->label = current_label;
00541         current_example->fv.vector = current_feature_vector;
00542         current_example->fv.vlen = current_len;
00543 
00544         examples_ring->copy_example(current_example);
00545 
00546         pthread_mutex_lock(&examples_state_lock);
00547         number_of_vectors_parsed++;
00548         pthread_cond_signal(&examples_state_changed);
00549         pthread_mutex_unlock(&examples_state_lock);
00550     }
00551 
00552     return NULL;
00553 }
00554 
00555 template <class T> Example<T>* CInputParser<T>::retrieve_example()
00556 {
00557     /* This function should be guarded by mutexes while calling  */
00558     Example<T> *ex;
00559 
00560     if (parsing_done)
00561     {
00562         if (number_of_vectors_read == number_of_vectors_parsed)
00563         {
00564             reading_done = true;
00565             /* Signal to waiting threads that no more examples are left */
00566             pthread_cond_signal(&examples_state_changed);
00567             return NULL;
00568         }
00569     }
00570 
00571     if (number_of_vectors_parsed <= 0)
00572         return NULL;
00573 
00574     if (number_of_vectors_read == number_of_vectors_parsed)
00575     {
00576         return NULL;
00577     }
00578 
00579     ex = examples_ring->get_unused_example();
00580     number_of_vectors_read++;
00581 
00582     return ex;
00583 }
00584 
00585 template <class T> int32_t CInputParser<T>::get_next_example(T* &fv,
00586         int32_t &length, float64_t &label)
00587 {
00588     /* if reading is done, no more examples can be fetched. return 0
00589        else, if example can be read, get the example and return 1.
00590        otherwise, wait for further parsing, get the example and
00591        return 1 */
00592 
00593     Example<T> *ex;
00594 
00595     while (1)
00596     {
00597         if (reading_done)
00598             return 0;
00599 
00600         pthread_mutex_lock(&examples_state_lock);
00601         ex = retrieve_example();
00602 
00603         if (ex == NULL)
00604         {
00605             if (reading_done)
00606             {
00607                 /* No more examples left, return */
00608                 pthread_mutex_unlock(&examples_state_lock);
00609                 return 0;
00610             }
00611             else
00612             {
00613                 /* Examples left, wait for one to become ready */
00614                 pthread_cond_wait(&examples_state_changed, &examples_state_lock);
00615                 pthread_mutex_unlock(&examples_state_lock);
00616                 continue;
00617             }
00618         }
00619         else
00620         {
00621             /* Example ready, return the example */
00622             pthread_mutex_unlock(&examples_state_lock);
00623             break;
00624         }
00625     }
00626 
00627     fv = ex->fv.vector;
00628     length = ex->fv.vlen;
00629     label = ex->label;
00630 
00631     return 1;
00632 }
00633 
00634 template <class T>
00635     int32_t CInputParser<T>::get_next_example(T* &fv, int32_t &length)
00636 {
00637     float64_t label_dummy;
00638 
00639     return get_next_example(fv, length, label_dummy);
00640 }
00641 
00642 template <class T>
00643     void CInputParser<T>::finalize_example()
00644 {
00645     examples_ring->finalize_example(free_after_release);
00646 }
00647 
00648 template <class T> void CInputParser<T>::end_parser()
00649 {
00650     pthread_join(parse_thread, NULL);
00651 }
00652 
00653 template <class T> void CInputParser<T>::exit_parser()
00654 {
00655     pthread_cancel(parse_thread);
00656 }
00657 }
00658 #endif // __INPUTPARSER_H__
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Defines

SHOGUN Machine Learning Toolbox - Documentation