InputParser.h

Go to the documentation of this file.
00001 /*
00002  * This program is free software; you can redistribute it and/or modify
00003  * it under the terms of the GNU General Public License as published by
00004  * the Free Software Foundation; either version 3 of the License, or
00005  * (at your option) any later version.
00006  *
00007  * Written (W) 2011 Shashwat Lal Das
00008  * Copyright (C) 2011 Berlin Institute of Technology and Max-Planck-Society
00009  */
00010 
00011 #ifndef __INPUTPARSER_H__
00012 #define __INPUTPARSER_H__
00013 
00014 #include <shogun/io/SGIO.h>
00015 #include <shogun/io/streaming/StreamingFile.h>
00016 #include <shogun/lib/common.h>
00017 #include <shogun/io/streaming/ParseBuffer.h>
00018 #include <pthread.h>
00019 
00020 #define PARSER_DEFAULT_BUFFSIZE 100
00021 
00022 namespace shogun
00023 {
00026     enum E_EXAMPLE_TYPE
00027     {
00028         E_LABELLED = 1,
00029         E_UNLABELLED = 2
00030     };
00031 
00080 template <class T> class CInputParser
00081 {
00082 public:
00083 
00088     CInputParser();
00089 
00094     ~CInputParser();
00095 
00107     void init(CStreamingFile* input_file, bool is_labelled = true, int32_t size = PARSER_DEFAULT_BUFFSIZE);
00108 
00114     bool is_running();
00115 
00122     int32_t get_number_of_features() { return number_of_features; }
00123 
00135     void set_read_vector(void (CStreamingFile::*func_ptr)(T* &vec, int32_t &len));
00136 
00148     void set_read_vector_and_label(void (CStreamingFile::*func_ptr)(T* &vec, int32_t &len, float64_t &label));
00149 
00161     int32_t get_vector_and_label(T* &feature_vector,
00162                      int32_t &length,
00163                      float64_t &label);
00164 
00175     int32_t get_vector_only(T* &feature_vector, int32_t &length);
00176 
00183     void set_free_vector_after_release(bool free_vec);
00184 
00191     void set_free_vectors_on_destruct(bool destroy);
00192 
00198     void start_parser();
00199 
00208     void* main_parse_loop(void* params);
00209 
00210 
00216     void copy_example_into_buffer(Example<T>* ex);
00217 
00224     Example<T>* retrieve_example();
00225 
00238     int32_t get_next_example(T* &feature_vector,
00239                  int32_t &length,
00240                  float64_t &label);
00241 
00250     int32_t get_next_example(T* &feature_vector,
00251                  int32_t &length);
00252 
00260     void finalize_example();
00261 
00266     void end_parser();
00267 
00270     void exit_parser();
00271 
00277     int32_t get_ring_size() { return ring_size; }
00278 
00279 private:
00287     static void* parse_loop_entry_point(void* params);
00288 
00289 public:
00290     bool parsing_done;  
00291     bool reading_done;  
00293     E_EXAMPLE_TYPE example_type; 
00295 protected:
00302     void (CStreamingFile::*read_vector) (T* &vec, int32_t &len);
00303 
00310     void (CStreamingFile::*read_vector_and_label) (T* &vec, int32_t &len, float64_t &label);
00311 
00313     CStreamingFile* input_source;
00314 
00316     pthread_t parse_thread;
00317 
00319     CParseBuffer<T>* examples_ring;
00320 
00322     int32_t number_of_features;
00323 
00325     int32_t number_of_vectors_parsed;
00326 
00328     int32_t number_of_vectors_read;
00329 
00331     Example<T>* current_example;
00332 
00334     T* current_feature_vector;
00335 
00337     float64_t current_label;
00338 
00340     int32_t current_len;
00341 
00343     bool free_after_release;
00344 
00346     int32_t ring_size;
00347 
00349     pthread_mutex_t examples_state_lock;
00350 
00352     pthread_cond_t examples_state_changed;
00353 
00354 };
00355 
00356 template <class T>
00357     void CInputParser<T>::set_read_vector(void (CStreamingFile::*func_ptr)(T* &vec, int32_t &len))
00358 {
00359     // Set read_vector to point to the function passed as arg
00360     read_vector=func_ptr;
00361 }
00362 
00363 template <class T>
00364     void CInputParser<T>::set_read_vector_and_label(void (CStreamingFile::*func_ptr)(T* &vec, int32_t &len, float64_t &label))
00365 {
00366     // Set read_vector_and_label to point to the function passed as arg
00367     read_vector_and_label=func_ptr;
00368 }
00369 
00370 template <class T>
00371     CInputParser<T>::CInputParser()
00372 {
00373     /* this line was commented out when I found it. However, the mutex locks
00374      * have to be initialised. Otherwise uninitialised memory error */
00375     //init(NULL, true, PARSER_DEFAULT_BUFFSIZE);
00376     pthread_mutex_init(&examples_state_lock, NULL);
00377     pthread_cond_init(&examples_state_changed, NULL);
00378     examples_ring=NULL;
00379     parsing_done=true;
00380     reading_done=true;
00381 }
00382 
00383 template <class T>
00384     CInputParser<T>::~CInputParser()
00385 {
00386     pthread_mutex_destroy(&examples_state_lock);
00387     pthread_cond_destroy(&examples_state_changed);
00388 
00389     SG_UNREF(examples_ring);
00390 }
00391 
00392 template <class T>
00393     void CInputParser<T>::init(CStreamingFile* input_file, bool is_labelled, int32_t size)
00394 {
00395     input_source = input_file;
00396 
00397     if (is_labelled == true)
00398         example_type = E_LABELLED;
00399     else
00400         example_type = E_UNLABELLED;
00401 
00402     examples_ring = new CParseBuffer<T>(size);
00403     SG_REF(examples_ring);
00404 
00405     parsing_done = false;
00406     reading_done = false;
00407     number_of_vectors_parsed = 0;
00408     number_of_vectors_read = 0;
00409 
00410     current_len = -1;
00411     current_label = -1;
00412     current_feature_vector = NULL;
00413 
00414     free_after_release=true;
00415     ring_size=size;
00416 
00417     pthread_mutex_init(&examples_state_lock, NULL);
00418     pthread_cond_init(&examples_state_changed, NULL);
00419 }
00420 
00421 template <class T>
00422     void CInputParser<T>::set_free_vector_after_release(bool free_vec)
00423 {
00424     free_after_release=free_vec;
00425 }
00426 
00427 template <class T>
00428     void CInputParser<T>::set_free_vectors_on_destruct(bool destroy)
00429 {
00430     examples_ring->set_free_vectors_on_destruct(destroy);
00431 }
00432 
00433 template <class T>
00434     void CInputParser<T>::start_parser()
00435 {
00436     SG_SDEBUG("entering CInputParser::start_parser()\n");
00437     if (is_running())
00438     {
00439         SG_SERROR("Parser thread is already running! Multiple parse threads not supported.\n");
00440     }
00441 
00442     SG_SDEBUG("creating parse thread\n");
00443     pthread_create(&parse_thread, NULL, parse_loop_entry_point, this);
00444 
00445     SG_SDEBUG("leaving CInputParser::start_parser()\n");
00446 }
00447 
00448 template <class T>
00449     void* CInputParser<T>::parse_loop_entry_point(void* params)
00450 {
00451     ((CInputParser *) params)->main_parse_loop(params);
00452 
00453     return NULL;
00454 }
00455 
00456 template <class T>
00457     bool CInputParser<T>::is_running()
00458 {
00459     SG_SDEBUG("entering CInputParser::is_running()\n");
00460     bool ret;
00461 
00462     pthread_mutex_lock(&examples_state_lock);
00463 
00464     if (parsing_done)
00465         if (reading_done)
00466             ret = false;
00467         else
00468             ret = true;
00469     else
00470         ret = false;
00471 
00472     pthread_mutex_unlock(&examples_state_lock);
00473 
00474     SG_SDEBUG("leaving CInputParser::is_running(), returnung %d\n", ret);
00475     return ret;
00476 }
00477 
00478 template <class T>
00479     int32_t CInputParser<T>::get_vector_and_label(T* &feature_vector,
00480                               int32_t &length,
00481                               float64_t &label)
00482 {
00483     (input_source->*read_vector_and_label)(feature_vector, length, label);
00484 
00485     if (length < 1)
00486     {
00487         // Problem reading the example
00488         return 0;
00489     }
00490 
00491     return 1;
00492 }
00493 
00494 template <class T>
00495     int32_t CInputParser<T>::get_vector_only(T* &feature_vector,
00496                          int32_t &length)
00497 {
00498     (input_source->*read_vector)(feature_vector, length);
00499 
00500     if (length < 1)
00501     {
00502         // Problem reading the example
00503         return 0;
00504     }
00505 
00506     return 1;
00507 }
00508 
00509 template <class T>
00510     void CInputParser<T>::copy_example_into_buffer(Example<T>* ex)
00511 {
00512     examples_ring->copy_example(ex);
00513 }
00514 
00515 template <class T> void* CInputParser<T>::main_parse_loop(void* params)
00516 {
00517     // Read the examples into current_* objects
00518     // Instead of allocating mem for new objects each time
00519 #ifdef HAVE_PTHREAD
00520     CInputParser* this_obj = (CInputParser *) params;
00521     this->input_source = this_obj->input_source;
00522 
00523     while (1)
00524     {
00525         pthread_mutex_lock(&examples_state_lock);
00526         if (parsing_done)
00527         {
00528             pthread_mutex_unlock(&examples_state_lock);
00529             return NULL;
00530         }
00531         pthread_mutex_unlock(&examples_state_lock);
00532 
00533         pthread_testcancel();
00534 
00535     current_example = examples_ring->get_free_example();
00536     current_feature_vector = current_example->fv.vector;
00537     current_len = current_example->fv.vlen;
00538     current_label = current_example->label;
00539 
00540         if (example_type == E_LABELLED)
00541             get_vector_and_label(current_feature_vector, current_len, current_label);
00542         else
00543             get_vector_only(current_feature_vector, current_len);
00544 
00545         if (current_len < 0)
00546         {
00547             pthread_mutex_lock(&examples_state_lock);
00548             parsing_done = true;
00549             pthread_cond_signal(&examples_state_changed);
00550             pthread_mutex_unlock(&examples_state_lock);
00551             return NULL;
00552         }
00553 
00554         current_example->label = current_label;
00555         current_example->fv.vector = current_feature_vector;
00556         current_example->fv.vlen = current_len;
00557 
00558         examples_ring->copy_example(current_example);
00559 
00560         pthread_mutex_lock(&examples_state_lock);
00561         number_of_vectors_parsed++;
00562         pthread_cond_signal(&examples_state_changed);
00563         pthread_mutex_unlock(&examples_state_lock);
00564     }
00565 #endif /* HAVE_PTHREAD */
00566     return NULL;
00567 }
00568 
00569 template <class T> Example<T>* CInputParser<T>::retrieve_example()
00570 {
00571     /* This function should be guarded by mutexes while calling  */
00572     Example<T> *ex;
00573 
00574     if (parsing_done)
00575     {
00576         if (number_of_vectors_read == number_of_vectors_parsed)
00577         {
00578             reading_done = true;
00579             /* Signal to waiting threads that no more examples are left */
00580             pthread_cond_signal(&examples_state_changed);
00581             return NULL;
00582         }
00583     }
00584 
00585     if (number_of_vectors_parsed <= 0)
00586         return NULL;
00587 
00588     if (number_of_vectors_read == number_of_vectors_parsed)
00589     {
00590         return NULL;
00591     }
00592 
00593     ex = examples_ring->get_unused_example();
00594     number_of_vectors_read++;
00595 
00596     return ex;
00597 }
00598 
00599 template <class T> int32_t CInputParser<T>::get_next_example(T* &fv,
00600         int32_t &length, float64_t &label)
00601 {
00602     /* if reading is done, no more examples can be fetched. return 0
00603        else, if example can be read, get the example and return 1.
00604        otherwise, wait for further parsing, get the example and
00605        return 1 */
00606 
00607     Example<T> *ex;
00608 
00609     while (1)
00610     {
00611         if (reading_done)
00612             return 0;
00613 
00614         pthread_mutex_lock(&examples_state_lock);
00615         ex = retrieve_example();
00616 
00617         if (ex == NULL)
00618         {
00619             if (reading_done)
00620             {
00621                 /* No more examples left, return */
00622                 pthread_mutex_unlock(&examples_state_lock);
00623                 return 0;
00624             }
00625             else
00626             {
00627                 /* Examples left, wait for one to become ready */
00628                 pthread_cond_wait(&examples_state_changed, &examples_state_lock);
00629                 pthread_mutex_unlock(&examples_state_lock);
00630                 continue;
00631             }
00632         }
00633         else
00634         {
00635             /* Example ready, return the example */
00636             pthread_mutex_unlock(&examples_state_lock);
00637             break;
00638         }
00639     }
00640 
00641     fv = ex->fv.vector;
00642     length = ex->fv.vlen;
00643     label = ex->label;
00644 
00645     return 1;
00646 }
00647 
00648 template <class T>
00649     int32_t CInputParser<T>::get_next_example(T* &fv, int32_t &length)
00650 {
00651     float64_t label_dummy;
00652 
00653     return get_next_example(fv, length, label_dummy);
00654 }
00655 
00656 template <class T>
00657     void CInputParser<T>::finalize_example()
00658 {
00659     examples_ring->finalize_example(free_after_release);
00660 }
00661 
00662 template <class T> void CInputParser<T>::end_parser()
00663 {
00664     SG_SDEBUG("entering CInputParser::end_parser\n");
00665     SG_SDEBUG("joining parse thread\n");
00666     pthread_join(parse_thread, NULL);
00667     SG_SDEBUG("leaving CInputParser::end_parser\n");
00668 }
00669 
00670 template <class T> void CInputParser<T>::exit_parser()
00671 {
00672     SG_SDEBUG("cancelling parse thread\n");
00673     pthread_cancel(parse_thread);
00674 }
00675 }
00676 #endif // __INPUTPARSER_H__
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Defines

SHOGUN Machine Learning Toolbox - Documentation