SHOGUN  4.2.0
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Modules Pages
InputParser.h
Go to the documentation of this file.
1 /*
2  * This program is free software; you can redistribute it and/or modify
3  * it under the terms of the GNU General Public License as published by
4  * the Free Software Foundation; either version 3 of the License, or
5  * (at your option) any later version.
6  *
7  * Written (W) 2011 Shashwat Lal Das
8  * Copyright (C) 2011 Berlin Institute of Technology and Max-Planck-Society
9  */
10 
11 #ifndef __INPUTPARSER_H__
12 #define __INPUTPARSER_H__
13 
14 #include <shogun/lib/config.h>
15 
16 #include <shogun/lib/common.h>
17 #ifdef HAVE_PTHREAD
18 
19 #include <shogun/io/SGIO.h>
22 #include <pthread.h>
23 
24 #define PARSER_DEFAULT_BUFFSIZE 100
25 
26 namespace shogun
27 {
30  enum E_EXAMPLE_TYPE
31  {
32  E_LABELLED = 1,
33  E_UNLABELLED = 2
34  };
35 
84 template <class T> class CInputParser
85 {
86 public:
87 
92  CInputParser();
93 
98  ~CInputParser();
99 
111  void init(CStreamingFile* input_file, bool is_labelled = true, int32_t size = PARSER_DEFAULT_BUFFSIZE);
112 
118  bool is_running();
119 
126  int32_t get_number_of_features() { return number_of_features; }
127 
139  void set_read_vector(void (CStreamingFile::*func_ptr)(T* &vec, int32_t &len));
140 
152  void set_read_vector_and_label(void (CStreamingFile::*func_ptr)(T* &vec, int32_t &len, float64_t &label));
153 
165  int32_t get_vector_and_label(T* &feature_vector,
166  int32_t &length,
167  float64_t &label);
168 
179  int32_t get_vector_only(T* &feature_vector, int32_t &length);
180 
187  void set_free_vector_after_release(bool free_vec);
188 
195  void set_free_vectors_on_destruct(bool destroy);
196 
202  void start_parser();
203 
212  void* main_parse_loop(void* params);
213 
214 
220  void copy_example_into_buffer(Example<T>* ex);
221 
228  Example<T>* retrieve_example();
229 
242  int32_t get_next_example(T* &feature_vector,
243  int32_t &length,
244  float64_t &label);
245 
254  int32_t get_next_example(T* &feature_vector,
255  int32_t &length);
256 
264  void finalize_example();
265 
270  void end_parser();
271 
274  void exit_parser();
275 
281  int32_t get_ring_size() { return ring_size; }
282 
283 private:
291  static void* parse_loop_entry_point(void* params);
292 
293 public:
294  bool parsing_done;
295  bool reading_done;
297  E_EXAMPLE_TYPE example_type;
299 protected:
306  void (CStreamingFile::*read_vector) (T* &vec, int32_t &len);
307 
314  void (CStreamingFile::*read_vector_and_label) (T* &vec, int32_t &len, float64_t &label);
315 
317  CStreamingFile* input_source;
318 
320  pthread_t parse_thread;
321 
323  CParseBuffer<T>* examples_ring;
324 
326  int32_t number_of_features;
327 
329  int32_t number_of_vectors_parsed;
330 
332  int32_t number_of_vectors_read;
333 
335  Example<T>* current_example;
336 
338  T* current_feature_vector;
339 
341  float64_t current_label;
342 
344  int32_t current_len;
345 
347  bool free_after_release;
348 
350  int32_t ring_size;
351 
353  pthread_mutex_t examples_state_lock;
354 
356  pthread_cond_t examples_state_changed;
357 
358 };
359 
360 template <class T>
361  void CInputParser<T>::set_read_vector(void (CStreamingFile::*func_ptr)(T* &vec, int32_t &len))
362 {
363  // Set read_vector to point to the function passed as arg
364  read_vector=func_ptr;
365 }
366 
367 template <class T>
368  void CInputParser<T>::set_read_vector_and_label(void (CStreamingFile::*func_ptr)(T* &vec, int32_t &len, float64_t &label))
369 {
370  // Set read_vector_and_label to point to the function passed as arg
371  read_vector_and_label=func_ptr;
372 }
373 
374 template <class T>
375  CInputParser<T>::CInputParser()
376 {
377  /* this line was commented out when I found it. However, the mutex locks
378  * have to be initialised. Otherwise uninitialised memory error */
379  //init(NULL, true, PARSER_DEFAULT_BUFFSIZE);
380  pthread_mutex_init(&examples_state_lock, NULL);
381  pthread_cond_init(&examples_state_changed, NULL);
382  examples_ring=NULL;
383  parsing_done=true;
384  reading_done=true;
385 }
386 
387 template <class T>
388  CInputParser<T>::~CInputParser()
389 {
390  pthread_mutex_destroy(&examples_state_lock);
391  pthread_cond_destroy(&examples_state_changed);
392 
393  SG_UNREF(examples_ring);
394 }
395 
396 template <class T>
397  void CInputParser<T>::init(CStreamingFile* input_file, bool is_labelled, int32_t size)
398 {
399  input_source = input_file;
400 
401  if (is_labelled == true)
402  example_type = E_LABELLED;
403  else
404  example_type = E_UNLABELLED;
405 
406  examples_ring = new CParseBuffer<T>(size);
407  SG_REF(examples_ring);
408 
409  parsing_done = false;
410  reading_done = false;
411  number_of_vectors_parsed = 0;
412  number_of_vectors_read = 0;
413 
414  current_len = -1;
415  current_label = -1;
416  current_feature_vector = NULL;
417 
418  free_after_release=true;
419  ring_size=size;
420 }
421 
422 template <class T>
423  void CInputParser<T>::set_free_vector_after_release(bool free_vec)
424 {
425  free_after_release=free_vec;
426 }
427 
428 template <class T>
429  void CInputParser<T>::set_free_vectors_on_destruct(bool destroy)
430 {
431  examples_ring->set_free_vectors_on_destruct(destroy);
432 }
433 
434 template <class T>
435  void CInputParser<T>::start_parser()
436 {
437  SG_SDEBUG("entering CInputParser::start_parser()\n")
438  if (is_running())
439  {
440  SG_SERROR("Parser thread is already running! Multiple parse threads not supported.\n")
441  }
442 
443  SG_SDEBUG("creating parse thread\n")
444  examples_ring->init_vector();
445  pthread_create(&parse_thread, NULL, parse_loop_entry_point, this);
446 
447  SG_SDEBUG("leaving CInputParser::start_parser()\n")
448 }
449 
450 template <class T>
451  void* CInputParser<T>::parse_loop_entry_point(void* params)
452 {
453  ((CInputParser *) params)->main_parse_loop(params);
454 
455  return NULL;
456 }
457 
458 template <class T>
459  bool CInputParser<T>::is_running()
460 {
461  SG_SDEBUG("entering CInputParser::is_running()\n")
462  bool ret;
463 
464  pthread_mutex_lock(&examples_state_lock);
465 
466  if (parsing_done)
467  if (reading_done)
468  ret = false;
469  else
470  ret = true;
471  else
472  ret = false;
473 
474  pthread_mutex_unlock(&examples_state_lock);
475 
476  SG_SDEBUG("leaving CInputParser::is_running(), returning %d\n", ret)
477  return ret;
478 }
479 
480 template <class T>
481  int32_t CInputParser<T>::get_vector_and_label(T* &feature_vector,
482  int32_t &length,
483  float64_t &label)
484 {
485  (input_source->*read_vector_and_label)(feature_vector, length, label);
486 
487  if (length < 1)
488  {
489  // Problem reading the example
490  return 0;
491  }
492 
493  return 1;
494 }
495 
496 template <class T>
497  int32_t CInputParser<T>::get_vector_only(T* &feature_vector,
498  int32_t &length)
499 {
500  (input_source->*read_vector)(feature_vector, length);
501 
502  if (length < 1)
503  {
504  // Problem reading the example
505  return 0;
506  }
507 
508  return 1;
509 }
510 
511 template <class T>
512  void CInputParser<T>::copy_example_into_buffer(Example<T>* ex)
513 {
514  examples_ring->copy_example(ex);
515 }
516 
/** Body of the background parse thread (producer side).
 *
 * Loop: take a free ring slot, fill it by calling the registered
 * CStreamingFile reader, hand it to the ring, and bump
 * number_of_vectors_parsed. Exits when parsing_done is already set, or when
 * the reader reports a negative length (end of input).
 *
 * @param params a CInputParser*; parse_loop_entry_point() passes this very
 *        object. Its input_source is copied into this object.
 * @return always NULL
 */
template <class T> void* CInputParser<T>::main_parse_loop(void* params)
{
	// Read the examples into current_* objects
	// Instead of allocating mem for new objects each time
#ifdef HAVE_PTHREAD
	CInputParser* this_obj = (CInputParser *) params;
	// When entered via parse_loop_entry_point() this_obj == this, so this is a
	// self-assignment; it only has an effect if a caller passes another parser.
	this->input_source = this_obj->input_source;

	while (1)
	{
		// Leave promptly if parsing has been flagged as finished.
		pthread_mutex_lock(&examples_state_lock);
		if (parsing_done)
		{
			pthread_mutex_unlock(&examples_state_lock);
			return NULL;
		}
		pthread_mutex_unlock(&examples_state_lock);

		// Explicit cancellation point, reached outside the state lock, so
		// exit_parser()'s pthread_cancel() can take effect between examples.
		pthread_testcancel();

		// Reuse the storage of a free ring slot instead of allocating.
		// NOTE(review): presumably blocks until a slot is free — that behaviour
		// lives in CParseBuffer::get_free_example(); confirm.
		current_example = examples_ring->get_free_example();
		current_feature_vector = current_example->fv;
		current_len = current_example->length;
		current_label = current_example->label;

		// Fill the slot with the reader matching the configured example type.
		// The 0/1 return value is ignored; only current_len is inspected below.
		if (example_type == E_LABELLED)
			get_vector_and_label(current_feature_vector, current_len, current_label);
		else
			get_vector_only(current_feature_vector, current_len);

		// Negative length: input exhausted. Mark parsing finished and wake a
		// consumer waiting in get_next_example().
		if (current_len < 0)
		{
			pthread_mutex_lock(&examples_state_lock);
			parsing_done = true;
			pthread_cond_signal(&examples_state_changed);
			pthread_mutex_unlock(&examples_state_lock);
			return NULL;
		}

		// Store the (possibly reallocated) result back into the slot.
		// Zero-length examples are not filtered and are still published.
		current_example->label = current_label;
		current_example->fv = current_feature_vector;
		current_example->length = current_len;

		examples_ring->copy_example(current_example);

		// Publish: count it and wake a consumer waiting for an example.
		pthread_mutex_lock(&examples_state_lock);
		number_of_vectors_parsed++;
		pthread_cond_signal(&examples_state_changed);
		pthread_mutex_unlock(&examples_state_lock);
	}
#endif /* HAVE_PTHREAD */
	return NULL;
}
570 
571 template <class T> Example<T>* CInputParser<T>::retrieve_example()
572 {
573  /* This function should be guarded by mutexes while calling */
574  Example<T> *ex;
575 
576  if (parsing_done)
577  {
578  if (number_of_vectors_read == number_of_vectors_parsed)
579  {
580  reading_done = true;
581  /* Signal to waiting threads that no more examples are left */
582  pthread_cond_signal(&examples_state_changed);
583  return NULL;
584  }
585  }
586 
587  if (number_of_vectors_parsed <= 0)
588  return NULL;
589 
590  if (number_of_vectors_read == number_of_vectors_parsed)
591  {
592  return NULL;
593  }
594 
595  ex = examples_ring->get_unused_example();
596  number_of_vectors_read++;
597 
598  return ex;
599 }
600 
601 template <class T> int32_t CInputParser<T>::get_next_example(T* &fv,
602  int32_t &length, float64_t &label)
603 {
604  /* if reading is done, no more examples can be fetched. return 0
605  else, if example can be read, get the example and return 1.
606  otherwise, wait for further parsing, get the example and
607  return 1 */
608 
609  Example<T> *ex;
610 
611  while (1)
612  {
613  if (reading_done)
614  return 0;
615 
616  pthread_mutex_lock(&examples_state_lock);
617  ex = retrieve_example();
618 
619  if (ex == NULL)
620  {
621  if (reading_done)
622  {
623  /* No more examples left, return */
624  pthread_mutex_unlock(&examples_state_lock);
625  return 0;
626  }
627  else
628  {
629  /* Examples left, wait for one to become ready */
630  pthread_cond_wait(&examples_state_changed, &examples_state_lock);
631  pthread_mutex_unlock(&examples_state_lock);
632  continue;
633  }
634  }
635  else
636  {
637  /* Example ready, return the example */
638  pthread_mutex_unlock(&examples_state_lock);
639  break;
640  }
641  }
642 
643  fv = ex->fv;
644  length = ex->length;
645  label = ex->label;
646 
647  return 1;
648 }
649 
650 template <class T>
651  int32_t CInputParser<T>::get_next_example(T* &fv, int32_t &length)
652 {
653  float64_t label_dummy;
654 
655  return get_next_example(fv, length, label_dummy);
656 }
657 
658 template <class T>
659  void CInputParser<T>::finalize_example()
660 {
661  examples_ring->finalize_example(free_after_release);
662 }
663 
664 template <class T> void CInputParser<T>::end_parser()
665 {
666  SG_SDEBUG("entering CInputParser::end_parser\n")
667  SG_SDEBUG("joining parse thread\n")
668  pthread_join(parse_thread, NULL);
669  SG_SDEBUG("leaving CInputParser::end_parser\n")
670 }
671 
672 template <class T> void CInputParser<T>::exit_parser()
673 {
674  SG_SDEBUG("cancelling parse thread\n")
675  pthread_cancel(parse_thread);
676 }
677 }
678 
679 #endif /* HAVE_PTHREAD */
680 
681 #endif // __INPUTPARSER_H__
#define SG_REF(x)
Definition: SGObject.h:54
double float64_t
Definition: common.h:50
#define SG_UNREF(x)
Definition: SGObject.h:55
all of classes and functions are contained in the shogun namespace
Definition: class_list.h:18
#define SG_SDEBUG(...)
Definition: SGIO.h:168
#define SG_SERROR(...)
Definition: SGIO.h:179

SHOGUN Machine Learning Toolbox - Documentation