Go to the documentation of this file.00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011 #ifndef __INPUTPARSER_H__
00012 #define __INPUTPARSER_H__
00013
00014 #include <shogun/io/SGIO.h>
00015 #include <shogun/io/StreamingFile.h>
00016 #include <shogun/lib/common.h>
00017 #include <shogun/io/ParseBuffer.h>
00018 #include <pthread.h>
00019
00020 #define PARSER_DEFAULT_BUFFSIZE 100
00021
00022 namespace shogun
00023 {
00026 enum E_EXAMPLE_TYPE
00027 {
00028 E_LABELLED = 1,
00029 E_UNLABELLED = 2
00030 };
00031
00080 template <class T> class CInputParser
00081 {
00082 public:
00083
00088 CInputParser();
00089
00094 ~CInputParser();
00095
00107 void init(CStreamingFile* input_file, bool is_labelled = true, int32_t size = PARSER_DEFAULT_BUFFSIZE);
00108
00114 bool is_running();
00115
00122 int32_t get_number_of_features() { return number_of_features; }
00123
00135 void set_read_vector(void (CStreamingFile::*func_ptr)(T* &vec, int32_t &len));
00136
00148 void set_read_vector_and_label(void (CStreamingFile::*func_ptr)(T* &vec, int32_t &len, float64_t &label));
00149
00161 int32_t get_vector_and_label(T* &feature_vector,
00162 int32_t &length,
00163 float64_t &label);
00164
00175 int32_t get_vector_only(T* &feature_vector, int32_t &length);
00176
00183 void set_free_vector_after_release(bool free_vec);
00184
00191 void set_free_vectors_on_destruct(bool destroy);
00192
00198 void start_parser();
00199
00208 void* main_parse_loop(void* params);
00209
00210
00216 void copy_example_into_buffer(Example<T>* ex);
00217
00224 Example<T>* retrieve_example();
00225
00238 int32_t get_next_example(T* &feature_vector,
00239 int32_t &length,
00240 float64_t &label);
00241
00250 int32_t get_next_example(T* &feature_vector,
00251 int32_t &length);
00252
00260 void finalize_example();
00261
00266 void end_parser();
00267
00270 void exit_parser();
00271
00277 int32_t get_ring_size() { return ring_size; }
00278
00279 private:
00287 static void* parse_loop_entry_point(void* params);
00288
00289 public:
00290 bool parsing_done;
00291 bool reading_done;
00293 E_EXAMPLE_TYPE example_type;
00295 protected:
00302 void (CStreamingFile::*read_vector) (T* &vec, int32_t &len);
00303
00310 void (CStreamingFile::*read_vector_and_label) (T* &vec, int32_t &len, float64_t &label);
00311
00313 CStreamingFile* input_source;
00314
00316 pthread_t parse_thread;
00317
00319 CParseBuffer<T>* examples_ring;
00320
00322 int32_t number_of_features;
00323
00325 int32_t number_of_vectors_parsed;
00326
00328 int32_t number_of_vectors_read;
00329
00331 Example<T>* current_example;
00332
00334 T* current_feature_vector;
00335
00337 float64_t current_label;
00338
00340 int32_t current_len;
00341
00343 bool free_after_release;
00344
00346 int32_t ring_size;
00347
00349 pthread_mutex_t examples_state_lock;
00350
00352 pthread_cond_t examples_state_changed;
00353 };
00354
00355 template <class T>
00356 void CInputParser<T>::set_read_vector(void (CStreamingFile::*func_ptr)(T* &vec, int32_t &len))
00357 {
00358
00359 read_vector=func_ptr;
00360 }
00361
00362 template <class T>
00363 void CInputParser<T>::set_read_vector_and_label(void (CStreamingFile::*func_ptr)(T* &vec, int32_t &len, float64_t &label))
00364 {
00365
00366 read_vector_and_label=func_ptr;
00367 }
00368
00369 template <class T>
00370 CInputParser<T>::CInputParser()
00371 {
00372
00373 }
00374
00375 template <class T>
00376 CInputParser<T>::~CInputParser()
00377 {
00378 end_parser();
00379
00380 pthread_mutex_destroy(&examples_state_lock);
00381 pthread_cond_destroy(&examples_state_changed);
00382
00383 delete examples_ring;
00384 }
00385
00386 template <class T>
00387 void CInputParser<T>::init(CStreamingFile* input_file, bool is_labelled, int32_t size)
00388 {
00389 input_source = input_file;
00390
00391 if (is_labelled == true)
00392 example_type = E_LABELLED;
00393 else
00394 example_type = E_UNLABELLED;
00395
00396 examples_ring = new CParseBuffer<T>(size);
00397
00398 parsing_done = false;
00399 reading_done = false;
00400 number_of_vectors_parsed = 0;
00401 number_of_vectors_read = 0;
00402
00403 current_len = -1;
00404 current_label = -1;
00405 current_feature_vector = NULL;
00406
00407 free_after_release=true;
00408 ring_size=size;
00409
00410 pthread_mutex_init(&examples_state_lock, NULL);
00411 pthread_cond_init(&examples_state_changed, NULL);
00412 }
00413
00414 template <class T>
00415 void CInputParser<T>::set_free_vector_after_release(bool free_vec)
00416 {
00417 free_after_release=free_vec;
00418 }
00419
00420 template <class T>
00421 void CInputParser<T>::set_free_vectors_on_destruct(bool destroy)
00422 {
00423 examples_ring->set_free_vectors_on_destruct(destroy);
00424 }
00425
00426 template <class T>
00427 void CInputParser<T>::start_parser()
00428 {
00429 if (is_running())
00430 {
00431 SG_SERROR("Parser thread is already running! Multiple parse threads not supported.\n");
00432 }
00433
00434 pthread_create(&parse_thread, NULL, parse_loop_entry_point, this);
00435 }
00436
00437 template <class T>
00438 void* CInputParser<T>::parse_loop_entry_point(void* params)
00439 {
00440 ((CInputParser *) params)->main_parse_loop(params);
00441
00442 return NULL;
00443 }
00444
00445 template <class T>
00446 bool CInputParser<T>::is_running()
00447 {
00448 bool ret;
00449
00450 pthread_mutex_lock(&examples_state_lock);
00451
00452 if (parsing_done)
00453 if (reading_done)
00454 ret = false;
00455 else
00456 ret = true;
00457 else
00458 ret = false;
00459
00460 pthread_mutex_unlock(&examples_state_lock);
00461 return ret;
00462 }
00463
00464 template <class T>
00465 int32_t CInputParser<T>::get_vector_and_label(T* &feature_vector,
00466 int32_t &length,
00467 float64_t &label)
00468 {
00469 (input_source->*read_vector_and_label)(feature_vector, length, label);
00470
00471 if (length < 1)
00472 {
00473
00474 return 0;
00475 }
00476
00477 return 1;
00478 }
00479
00480 template <class T>
00481 int32_t CInputParser<T>::get_vector_only(T* &feature_vector,
00482 int32_t &length)
00483 {
00484 (input_source->*read_vector)(feature_vector, length);
00485
00486 if (length < 1)
00487 {
00488
00489 return 0;
00490 }
00491
00492 return 1;
00493 }
00494
00495 template <class T>
00496 void CInputParser<T>::copy_example_into_buffer(Example<T>* ex)
00497 {
00498 examples_ring->copy_example(ex);
00499 }
00500
00501 template <class T> void* CInputParser<T>::main_parse_loop(void* params)
00502 {
00503
00504
00505
00506 CInputParser* this_obj = (CInputParser *) params;
00507 this->input_source = this_obj->input_source;
00508
00509 while (1)
00510 {
00511 pthread_mutex_lock(&examples_state_lock);
00512 if (parsing_done)
00513 {
00514 pthread_mutex_unlock(&examples_state_lock);
00515 return NULL;
00516 }
00517 pthread_mutex_unlock(&examples_state_lock);
00518
00519 pthread_testcancel();
00520
00521 current_example = examples_ring->get_free_example();
00522 current_feature_vector = current_example->fv.vector;
00523 current_len = current_example->fv.vlen;
00524 current_label = current_example->label;
00525
00526 if (example_type == E_LABELLED)
00527 get_vector_and_label(current_feature_vector, current_len, current_label);
00528 else
00529 get_vector_only(current_feature_vector, current_len);
00530
00531 if (current_len < 0)
00532 {
00533 pthread_mutex_lock(&examples_state_lock);
00534 parsing_done = true;
00535 pthread_cond_signal(&examples_state_changed);
00536 pthread_mutex_unlock(&examples_state_lock);
00537 return NULL;
00538 }
00539
00540 current_example->label = current_label;
00541 current_example->fv.vector = current_feature_vector;
00542 current_example->fv.vlen = current_len;
00543
00544 examples_ring->copy_example(current_example);
00545
00546 pthread_mutex_lock(&examples_state_lock);
00547 number_of_vectors_parsed++;
00548 pthread_cond_signal(&examples_state_changed);
00549 pthread_mutex_unlock(&examples_state_lock);
00550 }
00551
00552 return NULL;
00553 }
00554
00555 template <class T> Example<T>* CInputParser<T>::retrieve_example()
00556 {
00557
00558 Example<T> *ex;
00559
00560 if (parsing_done)
00561 {
00562 if (number_of_vectors_read == number_of_vectors_parsed)
00563 {
00564 reading_done = true;
00565
00566 pthread_cond_signal(&examples_state_changed);
00567 return NULL;
00568 }
00569 }
00570
00571 if (number_of_vectors_parsed <= 0)
00572 return NULL;
00573
00574 if (number_of_vectors_read == number_of_vectors_parsed)
00575 {
00576 return NULL;
00577 }
00578
00579 ex = examples_ring->get_unused_example();
00580 number_of_vectors_read++;
00581
00582 return ex;
00583 }
00584
00585 template <class T> int32_t CInputParser<T>::get_next_example(T* &fv,
00586 int32_t &length, float64_t &label)
00587 {
00588
00589
00590
00591
00592
00593 Example<T> *ex;
00594
00595 while (1)
00596 {
00597 if (reading_done)
00598 return 0;
00599
00600 pthread_mutex_lock(&examples_state_lock);
00601 ex = retrieve_example();
00602
00603 if (ex == NULL)
00604 {
00605 if (reading_done)
00606 {
00607
00608 pthread_mutex_unlock(&examples_state_lock);
00609 return 0;
00610 }
00611 else
00612 {
00613
00614 pthread_cond_wait(&examples_state_changed, &examples_state_lock);
00615 pthread_mutex_unlock(&examples_state_lock);
00616 continue;
00617 }
00618 }
00619 else
00620 {
00621
00622 pthread_mutex_unlock(&examples_state_lock);
00623 break;
00624 }
00625 }
00626
00627 fv = ex->fv.vector;
00628 length = ex->fv.vlen;
00629 label = ex->label;
00630
00631 return 1;
00632 }
00633
00634 template <class T>
00635 int32_t CInputParser<T>::get_next_example(T* &fv, int32_t &length)
00636 {
00637 float64_t label_dummy;
00638
00639 return get_next_example(fv, length, label_dummy);
00640 }
00641
00642 template <class T>
00643 void CInputParser<T>::finalize_example()
00644 {
00645 examples_ring->finalize_example(free_after_release);
00646 }
00647
00648 template <class T> void CInputParser<T>::end_parser()
00649 {
00650 pthread_join(parse_thread, NULL);
00651 }
00652
00653 template <class T> void CInputParser<T>::exit_parser()
00654 {
00655 pthread_cancel(parse_thread);
00656 }
00657 }
00658 #endif // __INPUTPARSER_H__