// Include guard for this header (closed by the #endif at the end of the file).
11 #ifndef __INPUTPARSER_H__
12 #define __INPUTPARSER_H__
// Default for the `size` argument of CInputParser::init(), which forwards it
// to the CParseBuffer<T> constructor.
// NOTE(review): the unit of this size is not visible in this listing — confirm against CParseBuffer.
24 #define PARSER_DEFAULT_BUFFSIZE 100
// Class template CInputParser<T>: reads examples (feature vectors of T, with an
// optional label) from a CStreamingFile on a separate pthread, stores them in
// a CParseBuffer<T> ring, and hands them out through get_next_example().
// NOTE(review): this listing omits lines (braces, access specifiers and some
// declarations such as start_parser/is_running/end_parser/exit_parser and the
// parsing_done/reading_done/ring_size members used elsewhere) — confirm
// against the complete header.
84 template <
class T>
class CInputParser
// Binds the parser to input_file, selects the labelled/unlabelled example
// type, allocates the example ring with `size` and resets the parser state.
111 void init(CStreamingFile* input_file,
bool is_labelled =
true, int32_t size = PARSER_DEFAULT_BUFFSIZE);
// Returns the stored number_of_features value.
126 int32_t get_number_of_features() {
return number_of_features; }
// Sets the CStreamingFile member function used to read an unlabelled vector.
139 void set_read_vector(
void (CStreamingFile::*func_ptr)(T* &vec, int32_t &len));
// Sets the CStreamingFile member function used to read a vector and its label.
152 void set_read_vector_and_label(
void (CStreamingFile::*func_ptr)(T* &vec, int32_t &len,
float64_t &label));
// Reads one vector and label from input_source via read_vector_and_label.
165 int32_t get_vector_and_label(T* &feature_vector,
// Reads one vector from input_source via read_vector.
179 int32_t get_vector_only(T* &feature_vector, int32_t &length);
// Sets the flag later passed to examples_ring->finalize_example().
187 void set_free_vector_after_release(
bool free_vec);
// Forwards `destroy` to examples_ring->set_free_vectors_on_destruct().
195 void set_free_vectors_on_destruct(
bool destroy);
// Body of the parse thread; reached through parse_loop_entry_point().
212 void* main_parse_loop(
void* params);
// Forwards `ex` to examples_ring->copy_example().
220 void copy_example_into_buffer(Example<T>* ex);
// Fetches an example from the ring; get_next_example() calls it while holding
// examples_state_lock.
228 Example<T>* retrieve_example();
// Two get_next_example overloads: per the definitions, the (fv, length) form
// forwards to the form that takes an additional label argument.
242 int32_t get_next_example(T* &feature_vector,
254 int32_t get_next_example(T* &feature_vector,
// Calls examples_ring->finalize_example(free_after_release).
264 void finalize_example();
// Returns ring_size.
// NOTE(review): the ring_size declaration is not visible in this listing.
281 int32_t get_ring_size() {
return ring_size; }
// Static start routine handed to pthread_create(); `params` is the parser object.
291 static void* parse_loop_entry_point(
void* params);
// Set by init() to E_LABELLED or E_UNLABELLED; selects the reader in main_parse_loop().
297 E_EXAMPLE_TYPE example_type;
// Reader for unlabelled vectors (see set_read_vector).
306 void (CStreamingFile::*read_vector) (T* &vec, int32_t &len);
// Reader for labelled vectors (see set_read_vector_and_label).
314 void (CStreamingFile::*read_vector_and_label) (T* &vec, int32_t &len,
float64_t &label);
// Input file the reader functions are invoked on.
317 CStreamingFile* input_source;
// Thread created by start_parser(), joined by end_parser(), cancelled by exit_parser().
320 pthread_t parse_thread;
// Ring of examples, allocated in init().
323 CParseBuffer<T>* examples_ring;
// Value returned by get_number_of_features(); no write to it is visible in this listing.
326 int32_t number_of_features;
// Incremented by main_parse_loop() under examples_state_lock for each parsed example.
329 int32_t number_of_vectors_parsed;
// Incremented by retrieve_example() for each example fetched from the ring.
332 int32_t number_of_vectors_read;
// Ring slot currently being filled by main_parse_loop().
335 Example<T>* current_example;
// Feature vector of the example currently being parsed.
338 T* current_feature_vector;
// Passed to examples_ring->finalize_example(); init() sets it to true.
347 bool free_after_release;
// Mutex held around the parsed/read counter updates and state checks.
353 pthread_mutex_t examples_state_lock;
// Condition variable signalled by the parse loop and retrieve_example(),
// and waited on in get_next_example().
356 pthread_cond_t examples_state_changed;
// Stores func_ptr in read_vector; get_vector_only() later invokes it on
// input_source to read one unlabelled feature vector.
361 void CInputParser<T>::set_read_vector(
void (CStreamingFile::*func_ptr)(T* &vec, int32_t &len))
364 read_vector=func_ptr;
// Stores func_ptr in read_vector_and_label; get_vector_and_label() later
// invokes it on input_source to read one feature vector together with its label.
368 void CInputParser<T>::set_read_vector_and_label(
void (CStreamingFile::*func_ptr)(T* &vec, int32_t &len,
float64_t &label))
371 read_vector_and_label=func_ptr;
// Constructor: initialises the state mutex and condition variable with
// default attributes (NULL attribute argument). The return codes of both
// init calls are not checked.
// NOTE(review): any further member initialisation is not visible in this listing.
375 CInputParser<T>::CInputParser()
380 pthread_mutex_init(&examples_state_lock, NULL);
381 pthread_cond_init(&examples_state_changed, NULL);
// Destructor: destroys the mutex and condition variable created by the
// constructor.
// NOTE(review): no release of examples_ring (allocated with `new` in init())
// is visible here — confirm it is freed in the complete source.
388 CInputParser<T>::~CInputParser()
390 pthread_mutex_destroy(&examples_state_lock);
391 pthread_cond_destroy(&examples_state_changed);
// Prepares the parser for a new input.
//   input_file  - file the reader functions will be invoked on
//   is_labelled - selects E_LABELLED when true
//   size        - forwarded to the CParseBuffer<T> constructor
397 void CInputParser<T>::init(CStreamingFile* input_file,
bool is_labelled, int32_t size)
399 input_source = input_file;
// NOTE(review): the E_UNLABELLED assignment is presumably the else branch;
// the `else` line is not visible in this listing.
401 if (is_labelled ==
true)
402 example_type = E_LABELLED;
404 example_type = E_UNLABELLED;
// Allocate a fresh example ring; a ring from an earlier init() call is not
// released here.
406 examples_ring =
new CParseBuffer<T>(size);
// Reset the state flags and counters.
409 parsing_done =
false;
410 reading_done =
false;
411 number_of_vectors_parsed = 0;
412 number_of_vectors_read = 0;
416 current_feature_vector = NULL;
// Default for the flag handed to examples_ring->finalize_example().
418 free_after_release=
true;
// Sets free_after_release, the flag that finalize_example() passes to
// examples_ring->finalize_example().
423 void CInputParser<T>::set_free_vector_after_release(
bool free_vec)
425 free_after_release=free_vec;
// Forwards `destroy` to the example ring. Dereferences examples_ring, which
// is assigned in init(), so init() must have been called first.
429 void CInputParser<T>::set_free_vectors_on_destruct(
bool destroy)
431 examples_ring->set_free_vectors_on_destruct(destroy);
// Starts the parse thread: creates parse_thread with parse_loop_entry_point
// as the start routine and this object as its argument.
435 void CInputParser<T>::start_parser()
437 SG_SDEBUG(
"entering CInputParser::start_parser()\n")
// NOTE(review): this error is presumably guarded by an "already running"
// check; the condition is not visible in this listing.
440 SG_SERROR(
"Parser thread is already running! Multiple parse threads not supported.\n")
// The return value of pthread_create() is not checked.
444 pthread_create(&parse_thread, NULL, parse_loop_entry_point, this);
446 SG_SDEBUG("leaving CInputParser::start_parser()\n")
// Static start routine passed to pthread_create() by start_parser().
// `params` is the CInputParser object; it is cast back and its
// main_parse_loop() is invoked with the same pointer as argument.
// NOTE(review): the return statement is not visible in this listing.
450 void* CInputParser<T>::parse_loop_entry_point(
void* params)
452 ((CInputParser *) params)->main_parse_loop(params);
// Reports whether the parser is running; the result `ret` is determined
// while holding examples_state_lock.
// NOTE(review): the statements that compute `ret` between the lock and
// unlock are not visible in this listing.
458 bool CInputParser<T>::is_running()
460 SG_SDEBUG(
"entering CInputParser::is_running()\n")
463 pthread_mutex_lock(&examples_state_lock);
473 pthread_mutex_unlock(&examples_state_lock);
475 SG_SDEBUG("leaving CInputParser::is_running(), returning %d\n", ret)
// Reads one labelled example by invoking the read_vector_and_label member
// function on input_source; feature_vector, length and label are filled in
// by that call.
// NOTE(review): the rest of the signature and the return statement are not
// visible in this listing.
480 int32_t CInputParser<T>::get_vector_and_label(T* &feature_vector,
484 (input_source->*read_vector_and_label)(feature_vector, length, label);
// Reads one unlabelled example by invoking the read_vector member function
// on input_source; feature_vector and length are filled in by that call.
// NOTE(review): the rest of the signature and the return statement are not
// visible in this listing.
496 int32_t CInputParser<T>::get_vector_only(T* &feature_vector,
499 (input_source->*read_vector)(feature_vector, length);
// Hands `ex` to the example ring via examples_ring->copy_example().
511 void CInputParser<T>::copy_example_into_buffer(Example<T>* ex)
513 examples_ring->copy_example(ex);
// Body of the parse thread: obtains a free slot from the example ring, reads
// an example from input_source into it, publishes it to the ring and
// increments number_of_vectors_parsed.
// NOTE(review): the loop construct, several conditions and the return
// statements are not visible in this listing — confirm the control flow
// against the complete source.
516 template <
class T>
void* CInputParser<T>::main_parse_loop(
void* params)
// `params` is the parser object. When entered through
// parse_loop_entry_point() it is the same object as `this`, so the
// assignment below copies input_source onto itself.
521 CInputParser* this_obj = (CInputParser *) params;
522 this->input_source = this_obj->input_source;
// NOTE(review): one lock followed by two unlocks — presumably one unlock
// sits in a conditional early-exit branch that is not shown.
526 pthread_mutex_lock(&examples_state_lock);
529 pthread_mutex_unlock(&examples_state_lock);
532 pthread_mutex_unlock(&examples_state_lock);
// Explicit cancellation point; exit_parser() calls pthread_cancel() on this thread.
534 pthread_testcancel();
// Take a free slot from the ring and load its current contents.
536 current_example = examples_ring->get_free_example();
537 current_feature_vector = current_example->fv;
538 current_len = current_example->length;
539 current_label = current_example->label;
// Read the next example with the reader matching the example type.
// NOTE(review): the unlabelled call is presumably the else branch (not shown).
541 if (example_type == E_LABELLED)
542 get_vector_and_label(current_feature_vector, current_len, current_label);
544 get_vector_only(current_feature_vector, current_len);
// NOTE(review): presumably an end-of-input path whose condition and state
// update are not shown; visibly it signals examples_state_changed under the lock.
548 pthread_mutex_lock(&examples_state_lock);
550 pthread_cond_signal(&examples_state_changed);
551 pthread_mutex_unlock(&examples_state_lock);
// Write the parsed values back into the slot and publish it to the ring.
555 current_example->label = current_label;
556 current_example->fv = current_feature_vector;
557 current_example->length = current_len;
559 examples_ring->copy_example(current_example);
// Count the parsed example and signal the condition variable, under the lock.
561 pthread_mutex_lock(&examples_state_lock);
562 number_of_vectors_parsed++;
563 pthread_cond_signal(&examples_state_changed);
564 pthread_mutex_unlock(&examples_state_lock);
// Fetches an unused example from the ring and increments
// number_of_vectors_read. get_next_example() calls this while holding
// examples_state_lock; no locking is done here in the visible lines.
// NOTE(review): the bodies of the conditions below (and the declaration and
// return of `ex`) are not visible in this listing.
570 template <
class T> Example<T>* CInputParser<T>::retrieve_example()
// Every parsed example has already been read.
577 if (number_of_vectors_read == number_of_vectors_parsed)
581 pthread_cond_signal(&examples_state_changed);
// Nothing has been parsed yet.
586 if (number_of_vectors_parsed <= 0)
589 if (number_of_vectors_read == number_of_vectors_parsed)
594 ex = examples_ring->get_unused_example();
595 number_of_vectors_read++;
// Returns the next parsed example to the caller (the overload that also
// yields a label). It takes examples_state_lock, calls retrieve_example(),
// and in one path waits on examples_state_changed.
// NOTE(review): the rest of the signature, the loop/branch structure and the
// return values are not visible in this listing; the three unlock calls
// presumably belong to separate branches.
600 template <
class T> int32_t CInputParser<T>::get_next_example(T* &fv,
615 pthread_mutex_lock(&examples_state_lock);
616 ex = retrieve_example();
623 pthread_mutex_unlock(&examples_state_lock);
// pthread_cond_wait() releases the mutex while blocked and re-acquires it
// before returning, hence the unlock that follows.
629 pthread_cond_wait(&examples_state_changed, &examples_state_lock);
630 pthread_mutex_unlock(&examples_state_lock);
637 pthread_mutex_unlock(&examples_state_lock);
// Overload without a label: forwards to the labelled overload, passing
// label_dummy for the label argument, and returns its result.
// NOTE(review): the declaration of label_dummy is not visible in this listing.
650 int32_t CInputParser<T>::get_next_example(T* &fv, int32_t &length)
654 return get_next_example(fv, length, label_dummy);
// Forwards to examples_ring->finalize_example(), passing the
// free_after_release flag.
658 void CInputParser<T>::finalize_example()
660 examples_ring->finalize_example(free_after_release);
// Blocks until the parse thread terminates (pthread_join on parse_thread).
// The thread's exit value is discarded (NULL result pointer) and the return
// code of pthread_join() is not checked.
663 template <
class T>
void CInputParser<T>::end_parser()
665 SG_SDEBUG(
"entering CInputParser::end_parser\n")
667 pthread_join(parse_thread, NULL);
668 SG_SDEBUG("leaving CInputParser::end_parser\n")
// Requests cancellation of the parse thread. This only sends the request:
// no join is performed here, and the return code of pthread_cancel() is not
// checked. main_parse_loop() calls pthread_testcancel() as a cancellation point.
671 template <class T>
void CInputParser<T>::exit_parser()
674 pthread_cancel(parse_thread);
680 #endif // __INPUTPARSER_H__
All classes and functions are contained in the shogun namespace.