11 #ifndef __INPUTPARSER_H__
12 #define __INPUTPARSER_H__
// Default value of the `size` argument of CInputParser::init(), which is
// forwarded to the CParseBuffer<T> constructor.
22 #define PARSER_DEFAULT_BUFFSIZE 100
// CInputParser<T>: reads vectors of T from a CStreamingFile through
// user-supplied member-function pointers and stores them as Example<T>
// objects in a CParseBuffer<T> ring. Parsing runs on a separate pthread
// (see start_parser / main_parse_loop); consumers fetch examples with
// get_next_example() and release them with finalize_example().
// NOTE(review): parts of this declaration (braces, access specifiers, some
// parameter lists) are not visible here — confirm against the full header.
82 template <
class T>
class CInputParser
// Sets the input source and the example type (labelled or unlabelled),
// allocates the examples ring with `size`, and resets the parser state.
109 void init(CStreamingFile* input_file,
bool is_labelled =
true, int32_t size = PARSER_DEFAULT_BUFFSIZE);
// Returns the number_of_features member.
124 int32_t get_number_of_features() {
return number_of_features; }
// Stores the CStreamingFile member function that get_vector_only() invokes.
137 void set_read_vector(
void (CStreamingFile::*func_ptr)(T* &vec, int32_t &len));
// Stores the CStreamingFile member function that get_vector_and_label()
// invokes.
150 void set_read_vector_and_label(
void (CStreamingFile::*func_ptr)(T* &vec, int32_t &len,
float64_t &label));
// Reads one vector together with its label from input_source through
// read_vector_and_label.
163 int32_t get_vector_and_label(T* &feature_vector,
// Reads one vector from input_source through read_vector.
177 int32_t get_vector_only(T* &feature_vector, int32_t &length);
// Sets free_after_release, the flag handed to the ring's finalize_example().
185 void set_free_vector_after_release(
bool free_vec);
// Forwards `destroy` to examples_ring->set_free_vectors_on_destruct().
193 void set_free_vectors_on_destruct(
bool destroy);
// Body of the parse thread; `params` is the parser object itself.
210 void* main_parse_loop(
void* params);
// Hands `ex` to examples_ring->copy_example().
218 void copy_example_into_buffer(Example<T>* ex);
// Takes the next unused example from the ring and counts it as read.
226 Example<T>* retrieve_example();
// Consumer-side fetch of the next parsed example (with label).
240 int32_t get_next_example(T* &feature_vector,
// Consumer-side fetch of the next parsed example (label discarded).
252 int32_t get_next_example(T* &feature_vector,
// Releases the current example via examples_ring->finalize_example().
262 void finalize_example();
// Returns ring_size.
279 int32_t get_ring_size() {
return ring_size; }
// Static start routine passed to pthread_create(); dispatches to
// main_parse_loop() on the object given in `params`.
289 static void* parse_loop_entry_point(
void* params);
// E_LABELLED or E_UNLABELLED; chosen in init() from is_labelled.
295 E_EXAMPLE_TYPE example_type;
// Reader for unlabelled vectors; set by set_read_vector().
304 void (CStreamingFile::*read_vector) (T* &vec, int32_t &len);
// Reader for labelled vectors; set by set_read_vector_and_label().
312 void (CStreamingFile::*read_vector_and_label) (T* &vec, int32_t &len,
float64_t &label);
// File object the reader function pointers are invoked on; set in init().
315 CStreamingFile* input_source;
// Parse thread handle: created in start_parser(), joined in end_parser(),
// cancelled in exit_parser().
318 pthread_t parse_thread;
// Ring buffer of examples; allocated in init().
321 CParseBuffer<T>* examples_ring;
// Value returned by get_number_of_features().
324 int32_t number_of_features;
// Incremented by the parse thread (under examples_state_lock) after each
// example is copied into the ring.
327 int32_t number_of_vectors_parsed;
// Incremented in retrieve_example() for each example handed to the consumer.
330 int32_t number_of_vectors_read;
// Ring slot currently being filled by the parse thread.
333 Example<T>* current_example;
// Feature vector of the example currently being parsed.
336 T* current_feature_vector;
// Passed to examples_ring->finalize_example(); init() sets it to true.
345 bool free_after_release;
// Mutex locked around updates of the parsed counter and around
// retrieve_example() calls.
351 pthread_mutex_t examples_state_lock;
// Condition variable signalled by the parse thread and waited on in
// get_next_example().
354 pthread_cond_t examples_state_changed;
// Stores func_ptr in read_vector; get_vector_only() later invokes it on
// input_source to read an unlabelled vector.
359 void CInputParser<T>::set_read_vector(
void (CStreamingFile::*func_ptr)(T* &vec, int32_t &len))
362 read_vector=func_ptr;
// Stores func_ptr in read_vector_and_label; get_vector_and_label() later
// invokes it on input_source to read a vector together with its label.
366 void CInputParser<T>::set_read_vector_and_label(
void (CStreamingFile::*func_ptr)(T* &vec, int32_t &len,
float64_t &label))
369 read_vector_and_label=func_ptr;
// Constructor: initializes the mutex and condition variable used to
// coordinate the parse thread with consumers. NULL selects the default
// pthread attributes. The remaining members are set up in init().
373 CInputParser<T>::CInputParser()
378 pthread_mutex_init(&examples_state_lock, NULL);
379 pthread_cond_init(&examples_state_changed, NULL);
// Destructor: destroys the synchronization primitives created in the
// constructor.
// NOTE(review): release of examples_ring is not visible in this block —
// confirm it is freed elsewhere.
386 CInputParser<T>::~CInputParser()
388 pthread_mutex_destroy(&examples_state_lock);
389 pthread_cond_destroy(&examples_state_changed);
// Prepares the parser for use.
//   input_file  - file object the reader function pointers will be invoked on
//   is_labelled - selects E_LABELLED (true) or E_UNLABELLED example type
//   size        - argument passed to the CParseBuffer<T> constructor
395 void CInputParser<T>::init(CStreamingFile* input_file,
bool is_labelled, int32_t size)
397 input_source = input_file;
// Record whether the examples carry labels; main_parse_loop() picks the
// matching reader based on this.
399 if (is_labelled ==
true)
400 example_type = E_LABELLED;
402 example_type = E_UNLABELLED;
// NOTE(review): a new ring is allocated unconditionally; a previously
// assigned examples_ring is not released in the visible code — confirm
// init() is only called once per parser.
404 examples_ring =
new CParseBuffer<T>(size);
// Reset progress flags and counters.
407 parsing_done =
false;
408 reading_done =
false;
409 number_of_vectors_parsed = 0;
410 number_of_vectors_read = 0;
414 current_feature_vector = NULL;
// Default for the flag passed to examples_ring->finalize_example(); can be
// changed via set_free_vector_after_release().
416 free_after_release=
true;
// Sets free_after_release, the flag finalize_example() passes to
// examples_ring->finalize_example().
421 void CInputParser<T>::set_free_vector_after_release(
bool free_vec)
423 free_after_release=free_vec;
// Forwards `destroy` to the ring buffer's set_free_vectors_on_destruct().
// Dereferences examples_ring, which is assigned in init().
427 void CInputParser<T>::set_free_vectors_on_destruct(
bool destroy)
429 examples_ring->set_free_vectors_on_destruct(destroy);
// Starts the parse thread, which runs parse_loop_entry_point() with this
// object as its argument.
433 void CInputParser<T>::start_parser()
435 SG_SDEBUG(
"entering CInputParser::start_parser()\n")
// NOTE(review): the condition guarding this error is not visible here;
// presumably it checks is_running() — confirm.
438 SG_SERROR(
"Parser thread is already running! Multiple parse threads not supported.\n")
// `this` is handed to the thread so the static entry point can dispatch to
// main_parse_loop(). NOTE(review): the return value of pthread_create() is
// not checked.
442 pthread_create(&parse_thread, NULL, parse_loop_entry_point, this);
444 SG_SDEBUG("leaving CInputParser::start_parser()\n")
// Thread start routine given to pthread_create() in start_parser().
// `params` is the CInputParser object; it is cast back and its
// main_parse_loop() is run, with `params` passed through unchanged.
448 void* CInputParser<T>::parse_loop_entry_point(
void* params)
450 ((CInputParser *) params)->main_parse_loop(params);
// Reports whether the parser is running; the result is logged as `ret`.
456 bool CInputParser<T>::is_running()
458 SG_SDEBUG(
"entering CInputParser::is_running()\n")
// examples_state_lock is held between this lock and the unlock below.
// NOTE(review): presumably `ret` is computed inside this critical
// section — confirm.
461 pthread_mutex_lock(&examples_state_lock);
471 pthread_mutex_unlock(&examples_state_lock);
473 SG_SDEBUG("leaving CInputParser::is_running(), returning %d\n", ret)
// Reads one labelled vector: invokes the member function stored in
// read_vector_and_label on input_source, which fills feature_vector, length
// and label through its reference parameters.
478 int32_t CInputParser<T>::get_vector_and_label(T* &feature_vector,
482 (input_source->*read_vector_and_label)(feature_vector, length, label);
// Reads one unlabelled vector: invokes the member function stored in
// read_vector on input_source, which fills feature_vector and length through
// its reference parameters.
494 int32_t CInputParser<T>::get_vector_only(T* &feature_vector,
497 (input_source->*read_vector)(feature_vector, length);
// Hands `ex` to the ring buffer by calling examples_ring->copy_example().
509 void CInputParser<T>::copy_example_into_buffer(Example<T>* ex)
511 examples_ring->copy_example(ex);
// Producer side of the parser, executed on the parse thread: obtains a free
// slot from the ring, reads one vector (with or without label) from
// input_source, copies the example into the ring, and signals
// examples_state_changed so that waiting consumers wake up.
// NOTE(review): the loop and branch structure around these statements is not
// visible here — confirm against the full source.
514 template <
class T>
void* CInputParser<T>::main_parse_loop(
void* params)
// `params` is the object this method was invoked on (see
// parse_loop_entry_point), so the assignment below copies input_source from
// the same object. NOTE(review): looks like a self-assignment — confirm.
519 CInputParser* this_obj = (CInputParser *) params;
520 this->input_source = this_obj->input_source;
// NOTE(review): two unlock calls follow a single lock; presumably they sit on
// different branches — confirm.
524 pthread_mutex_lock(&examples_state_lock);
527 pthread_mutex_unlock(&examples_state_lock);
530 pthread_mutex_unlock(&examples_state_lock);
// Cancellation point: lets a pthread_cancel() issued by exit_parser() take
// effect here.
532 pthread_testcancel();
// Take a free slot from the ring and load its fields into the working
// members.
534 current_example = examples_ring->get_free_example();
535 current_feature_vector = current_example->fv;
536 current_len = current_example->length;
537 current_label = current_example->label;
// Read the next vector with the reader that matches the example type.
539 if (example_type == E_LABELLED)
540 get_vector_and_label(current_feature_vector, current_len, current_label);
542 get_vector_only(current_feature_vector, current_len);
// Signal waiters while holding the lock.
546 pthread_mutex_lock(&examples_state_lock);
548 pthread_cond_signal(&examples_state_changed);
549 pthread_mutex_unlock(&examples_state_lock);
// Write the values produced by the reader back into the example, then copy
// it into the ring.
553 current_example->label = current_label;
554 current_example->fv = current_feature_vector;
555 current_example->length = current_len;
557 examples_ring->copy_example(current_example);
// Publish the new example: bump the parsed counter under the lock and wake a
// consumer waiting in get_next_example().
559 pthread_mutex_lock(&examples_state_lock);
560 number_of_vectors_parsed++;
561 pthread_cond_signal(&examples_state_changed);
562 pthread_mutex_unlock(&examples_state_lock);
// Consumer-side helper: compares the read and parsed counters and, when an
// example is available, takes the next unused example from the ring and
// increments number_of_vectors_read.
// get_next_example() calls this with examples_state_lock held.
// NOTE(review): the bodies of the checks below are not visible here —
// confirm what is returned when no example is available.
568 template <
class T> Example<T>* CInputParser<T>::retrieve_example()
// Every parsed vector has already been read.
575 if (number_of_vectors_read == number_of_vectors_parsed)
579 pthread_cond_signal(&examples_state_changed);
// Nothing has been parsed yet.
584 if (number_of_vectors_parsed <= 0)
587 if (number_of_vectors_read == number_of_vectors_parsed)
// An unread example exists: fetch it and count it as read.
592 ex = examples_ring->get_unused_example();
593 number_of_vectors_read++;
// Fetches the next parsed example for the consumer. retrieve_example() is
// called with examples_state_lock held; the function may block on
// examples_state_changed, which the parse thread signals.
// NOTE(review): the branch and loop structure between these statements is
// not visible here — confirm against the full source.
598 template <
class T> int32_t CInputParser<T>::get_next_example(T* &fv,
613 pthread_mutex_lock(&examples_state_lock);
614 ex = retrieve_example();
621 pthread_mutex_unlock(&examples_state_lock);
// pthread_cond_wait() releases the mutex while blocked and re-acquires it
// before returning, hence the unlock that follows.
627 pthread_cond_wait(&examples_state_changed, &examples_state_lock);
628 pthread_mutex_unlock(&examples_state_lock);
635 pthread_mutex_unlock(&examples_state_lock);
// Overload without a label parameter: delegates to the three-argument
// get_next_example() with a throwaway label variable and returns its result.
648 int32_t CInputParser<T>::get_next_example(T* &fv, int32_t &length)
652 return get_next_example(fv, length, label_dummy);
// Releases the current example by calling examples_ring->finalize_example(),
// passing free_after_release (set in init() or via
// set_free_vector_after_release()).
656 void CInputParser<T>::finalize_example()
658 examples_ring->finalize_example(free_after_release);
// Waits for the parse thread to terminate; pthread_join() blocks until it
// has. The thread's return value is discarded (NULL).
661 template <
class T>
void CInputParser<T>::end_parser()
663 SG_SDEBUG(
"entering CInputParser::end_parser\n")
665 pthread_join(parse_thread, NULL);
666 SG_SDEBUG("leaving CInputParser::end_parser\n")
// Requests cancellation of the parse thread. main_parse_loop() calls
// pthread_testcancel(), which is a point where the request can take effect.
// This call only issues the request; it does not join the thread.
669 template <
class T>
void CInputParser<T>::exit_parser()
672 pthread_cancel(parse_thread);
678 #endif // __INPUTPARSER_H__