00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011 #ifndef __INPUTPARSER_H__
00012 #define __INPUTPARSER_H__
00013
00014 #include <shogun/io/SGIO.h>
00015 #include <shogun/io/streaming/StreamingFile.h>
00016 #include <shogun/lib/common.h>
00017 #include <shogun/io/streaming/ParseBuffer.h>
00018 #include <pthread.h>
00019
00020 #define PARSER_DEFAULT_BUFFSIZE 100
00021
00022 namespace shogun
00023 {
/** Whether parsed examples carry a label (see CInputParser::init()). */
enum E_EXAMPLE_TYPE
{
	/** each example is a feature vector plus a float64_t label */
	E_LABELLED = 1,
	/** each example is a bare feature vector */
	E_UNLABELLED = E_LABELLED + 1
};
00031
00080 template <class T> class CInputParser
00081 {
00082 public:
00083
00088 CInputParser();
00089
00094 ~CInputParser();
00095
00107 void init(CStreamingFile* input_file, bool is_labelled = true, int32_t size = PARSER_DEFAULT_BUFFSIZE);
00108
00114 bool is_running();
00115
00122 int32_t get_number_of_features() { return number_of_features; }
00123
00135 void set_read_vector(void (CStreamingFile::*func_ptr)(T* &vec, int32_t &len));
00136
00148 void set_read_vector_and_label(void (CStreamingFile::*func_ptr)(T* &vec, int32_t &len, float64_t &label));
00149
00161 int32_t get_vector_and_label(T* &feature_vector,
00162 int32_t &length,
00163 float64_t &label);
00164
00175 int32_t get_vector_only(T* &feature_vector, int32_t &length);
00176
00183 void set_free_vector_after_release(bool free_vec);
00184
00191 void set_free_vectors_on_destruct(bool destroy);
00192
00198 void start_parser();
00199
00208 void* main_parse_loop(void* params);
00209
00210
00216 void copy_example_into_buffer(Example<T>* ex);
00217
00224 Example<T>* retrieve_example();
00225
00238 int32_t get_next_example(T* &feature_vector,
00239 int32_t &length,
00240 float64_t &label);
00241
00250 int32_t get_next_example(T* &feature_vector,
00251 int32_t &length);
00252
00260 void finalize_example();
00261
00266 void end_parser();
00267
00270 void exit_parser();
00271
00277 int32_t get_ring_size() { return ring_size; }
00278
00279 private:
00287 static void* parse_loop_entry_point(void* params);
00288
00289 public:
00290 bool parsing_done;
00291 bool reading_done;
00293 E_EXAMPLE_TYPE example_type;
00295 protected:
00302 void (CStreamingFile::*read_vector) (T* &vec, int32_t &len);
00303
00310 void (CStreamingFile::*read_vector_and_label) (T* &vec, int32_t &len, float64_t &label);
00311
00313 CStreamingFile* input_source;
00314
00316 pthread_t parse_thread;
00317
00319 CParseBuffer<T>* examples_ring;
00320
00322 int32_t number_of_features;
00323
00325 int32_t number_of_vectors_parsed;
00326
00328 int32_t number_of_vectors_read;
00329
00331 Example<T>* current_example;
00332
00334 T* current_feature_vector;
00335
00337 float64_t current_label;
00338
00340 int32_t current_len;
00341
00343 bool free_after_release;
00344
00346 int32_t ring_size;
00347
00349 pthread_mutex_t examples_state_lock;
00350
00352 pthread_cond_t examples_state_changed;
00353
00354 };
00355
00356 template <class T>
00357 void CInputParser<T>::set_read_vector(void (CStreamingFile::*func_ptr)(T* &vec, int32_t &len))
00358 {
00359
00360 read_vector=func_ptr;
00361 }
00362
00363 template <class T>
00364 void CInputParser<T>::set_read_vector_and_label(void (CStreamingFile::*func_ptr)(T* &vec, int32_t &len, float64_t &label))
00365 {
00366
00367 read_vector_and_label=func_ptr;
00368 }
00369
00370 template <class T>
00371 CInputParser<T>::CInputParser()
00372 {
00373
00374
00375
00376 pthread_mutex_init(&examples_state_lock, NULL);
00377 pthread_cond_init(&examples_state_changed, NULL);
00378 examples_ring=NULL;
00379 parsing_done=true;
00380 reading_done=true;
00381 }
00382
00383 template <class T>
00384 CInputParser<T>::~CInputParser()
00385 {
00386 pthread_mutex_destroy(&examples_state_lock);
00387 pthread_cond_destroy(&examples_state_changed);
00388
00389 SG_UNREF(examples_ring);
00390 }
00391
00392 template <class T>
00393 void CInputParser<T>::init(CStreamingFile* input_file, bool is_labelled, int32_t size)
00394 {
00395 input_source = input_file;
00396
00397 if (is_labelled == true)
00398 example_type = E_LABELLED;
00399 else
00400 example_type = E_UNLABELLED;
00401
00402 examples_ring = new CParseBuffer<T>(size);
00403 SG_REF(examples_ring);
00404
00405 parsing_done = false;
00406 reading_done = false;
00407 number_of_vectors_parsed = 0;
00408 number_of_vectors_read = 0;
00409
00410 current_len = -1;
00411 current_label = -1;
00412 current_feature_vector = NULL;
00413
00414 free_after_release=true;
00415 ring_size=size;
00416
00417 pthread_mutex_init(&examples_state_lock, NULL);
00418 pthread_cond_init(&examples_state_changed, NULL);
00419 }
00420
00421 template <class T>
00422 void CInputParser<T>::set_free_vector_after_release(bool free_vec)
00423 {
00424 free_after_release=free_vec;
00425 }
00426
00427 template <class T>
00428 void CInputParser<T>::set_free_vectors_on_destruct(bool destroy)
00429 {
00430 examples_ring->set_free_vectors_on_destruct(destroy);
00431 }
00432
00433 template <class T>
00434 void CInputParser<T>::start_parser()
00435 {
00436 SG_SDEBUG("entering CInputParser::start_parser()\n");
00437 if (is_running())
00438 {
00439 SG_SERROR("Parser thread is already running! Multiple parse threads not supported.\n");
00440 }
00441
00442 SG_SDEBUG("creating parse thread\n");
00443 pthread_create(&parse_thread, NULL, parse_loop_entry_point, this);
00444
00445 SG_SDEBUG("leaving CInputParser::start_parser()\n");
00446 }
00447
00448 template <class T>
00449 void* CInputParser<T>::parse_loop_entry_point(void* params)
00450 {
00451 ((CInputParser *) params)->main_parse_loop(params);
00452
00453 return NULL;
00454 }
00455
00456 template <class T>
00457 bool CInputParser<T>::is_running()
00458 {
00459 SG_SDEBUG("entering CInputParser::is_running()\n");
00460 bool ret;
00461
00462 pthread_mutex_lock(&examples_state_lock);
00463
00464 if (parsing_done)
00465 if (reading_done)
00466 ret = false;
00467 else
00468 ret = true;
00469 else
00470 ret = false;
00471
00472 pthread_mutex_unlock(&examples_state_lock);
00473
00474 SG_SDEBUG("leaving CInputParser::is_running(), returnung %d\n", ret);
00475 return ret;
00476 }
00477
00478 template <class T>
00479 int32_t CInputParser<T>::get_vector_and_label(T* &feature_vector,
00480 int32_t &length,
00481 float64_t &label)
00482 {
00483 (input_source->*read_vector_and_label)(feature_vector, length, label);
00484
00485 if (length < 1)
00486 {
00487
00488 return 0;
00489 }
00490
00491 return 1;
00492 }
00493
00494 template <class T>
00495 int32_t CInputParser<T>::get_vector_only(T* &feature_vector,
00496 int32_t &length)
00497 {
00498 (input_source->*read_vector)(feature_vector, length);
00499
00500 if (length < 1)
00501 {
00502
00503 return 0;
00504 }
00505
00506 return 1;
00507 }
00508
00509 template <class T>
00510 void CInputParser<T>::copy_example_into_buffer(Example<T>* ex)
00511 {
00512 examples_ring->copy_example(ex);
00513 }
00514
00515 template <class T> void* CInputParser<T>::main_parse_loop(void* params)
00516 {
00517
00518
00519 #ifdef HAVE_PTHREAD
00520 CInputParser* this_obj = (CInputParser *) params;
00521 this->input_source = this_obj->input_source;
00522
00523 while (1)
00524 {
00525 pthread_mutex_lock(&examples_state_lock);
00526 if (parsing_done)
00527 {
00528 pthread_mutex_unlock(&examples_state_lock);
00529 return NULL;
00530 }
00531 pthread_mutex_unlock(&examples_state_lock);
00532
00533 pthread_testcancel();
00534
00535 current_example = examples_ring->get_free_example();
00536 current_feature_vector = current_example->fv.vector;
00537 current_len = current_example->fv.vlen;
00538 current_label = current_example->label;
00539
00540 if (example_type == E_LABELLED)
00541 get_vector_and_label(current_feature_vector, current_len, current_label);
00542 else
00543 get_vector_only(current_feature_vector, current_len);
00544
00545 if (current_len < 0)
00546 {
00547 pthread_mutex_lock(&examples_state_lock);
00548 parsing_done = true;
00549 pthread_cond_signal(&examples_state_changed);
00550 pthread_mutex_unlock(&examples_state_lock);
00551 return NULL;
00552 }
00553
00554 current_example->label = current_label;
00555 current_example->fv.vector = current_feature_vector;
00556 current_example->fv.vlen = current_len;
00557
00558 examples_ring->copy_example(current_example);
00559
00560 pthread_mutex_lock(&examples_state_lock);
00561 number_of_vectors_parsed++;
00562 pthread_cond_signal(&examples_state_changed);
00563 pthread_mutex_unlock(&examples_state_lock);
00564 }
00565 #endif
00566 return NULL;
00567 }
00568
00569 template <class T> Example<T>* CInputParser<T>::retrieve_example()
00570 {
00571
00572 Example<T> *ex;
00573
00574 if (parsing_done)
00575 {
00576 if (number_of_vectors_read == number_of_vectors_parsed)
00577 {
00578 reading_done = true;
00579
00580 pthread_cond_signal(&examples_state_changed);
00581 return NULL;
00582 }
00583 }
00584
00585 if (number_of_vectors_parsed <= 0)
00586 return NULL;
00587
00588 if (number_of_vectors_read == number_of_vectors_parsed)
00589 {
00590 return NULL;
00591 }
00592
00593 ex = examples_ring->get_unused_example();
00594 number_of_vectors_read++;
00595
00596 return ex;
00597 }
00598
00599 template <class T> int32_t CInputParser<T>::get_next_example(T* &fv,
00600 int32_t &length, float64_t &label)
00601 {
00602
00603
00604
00605
00606
00607 Example<T> *ex;
00608
00609 while (1)
00610 {
00611 if (reading_done)
00612 return 0;
00613
00614 pthread_mutex_lock(&examples_state_lock);
00615 ex = retrieve_example();
00616
00617 if (ex == NULL)
00618 {
00619 if (reading_done)
00620 {
00621
00622 pthread_mutex_unlock(&examples_state_lock);
00623 return 0;
00624 }
00625 else
00626 {
00627
00628 pthread_cond_wait(&examples_state_changed, &examples_state_lock);
00629 pthread_mutex_unlock(&examples_state_lock);
00630 continue;
00631 }
00632 }
00633 else
00634 {
00635
00636 pthread_mutex_unlock(&examples_state_lock);
00637 break;
00638 }
00639 }
00640
00641 fv = ex->fv.vector;
00642 length = ex->fv.vlen;
00643 label = ex->label;
00644
00645 return 1;
00646 }
00647
00648 template <class T>
00649 int32_t CInputParser<T>::get_next_example(T* &fv, int32_t &length)
00650 {
00651 float64_t label_dummy;
00652
00653 return get_next_example(fv, length, label_dummy);
00654 }
00655
00656 template <class T>
00657 void CInputParser<T>::finalize_example()
00658 {
00659 examples_ring->finalize_example(free_after_release);
00660 }
00661
00662 template <class T> void CInputParser<T>::end_parser()
00663 {
00664 SG_SDEBUG("entering CInputParser::end_parser\n");
00665 SG_SDEBUG("joining parse thread\n");
00666 pthread_join(parse_thread, NULL);
00667 SG_SDEBUG("leaving CInputParser::end_parser\n");
00668 }
00669
00670 template <class T> void CInputParser<T>::exit_parser()
00671 {
00672 SG_SDEBUG("cancelling parse thread\n");
00673 pthread_cancel(parse_thread);
00674 }
00675 }
00676 #endif // __INPUTPARSER_H__