SHOGUN  4.1.0
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Modules Pages
InputParser.h
Go to the documentation of this file.
1 /*
2  * This program is free software; you can redistribute it and/or modify
3  * it under the terms of the GNU General Public License as published by
4  * the Free Software Foundation; either version 3 of the License, or
5  * (at your option) any later version.
6  *
7  * Written (W) 2011 Shashwat Lal Das
8  * Copyright (C) 2011 Berlin Institute of Technology and Max-Planck-Society
9  */
10 
11 #ifndef __INPUTPARSER_H__
12 #define __INPUTPARSER_H__
13 
14 #include <shogun/lib/config.h>
15 
16 #include <shogun/lib/common.h>
17 #ifdef HAVE_PTHREAD
18 
19 #include <shogun/io/SGIO.h>
22 #include <pthread.h>
23 
24 #define PARSER_DEFAULT_BUFFSIZE 100
25 
26 namespace shogun
27 {
/** Kind of example the parser reads: with or without a label. */
enum E_EXAMPLE_TYPE
{
	/** each example carries a feature vector and a label */
	E_LABELLED = 1,
	/** each example carries a feature vector only */
	E_UNLABELLED = 2
};
35 
84 template <class T> class CInputParser
85 {
86 public:
87 
92  CInputParser();
93 
98  ~CInputParser();
99 
111  void init(CStreamingFile* input_file, bool is_labelled = true, int32_t size = PARSER_DEFAULT_BUFFSIZE);
112 
118  bool is_running();
119 
126  int32_t get_number_of_features() { return number_of_features; }
127 
139  void set_read_vector(void (CStreamingFile::*func_ptr)(T* &vec, int32_t &len));
140 
152  void set_read_vector_and_label(void (CStreamingFile::*func_ptr)(T* &vec, int32_t &len, float64_t &label));
153 
165  int32_t get_vector_and_label(T* &feature_vector,
166  int32_t &length,
167  float64_t &label);
168 
179  int32_t get_vector_only(T* &feature_vector, int32_t &length);
180 
187  void set_free_vector_after_release(bool free_vec);
188 
195  void set_free_vectors_on_destruct(bool destroy);
196 
202  void start_parser();
203 
212  void* main_parse_loop(void* params);
213 
214 
220  void copy_example_into_buffer(Example<T>* ex);
221 
228  Example<T>* retrieve_example();
229 
242  int32_t get_next_example(T* &feature_vector,
243  int32_t &length,
244  float64_t &label);
245 
254  int32_t get_next_example(T* &feature_vector,
255  int32_t &length);
256 
264  void finalize_example();
265 
270  void end_parser();
271 
274  void exit_parser();
275 
281  int32_t get_ring_size() { return ring_size; }
282 
283 private:
291  static void* parse_loop_entry_point(void* params);
292 
293 public:
294  bool parsing_done;
295  bool reading_done;
297  E_EXAMPLE_TYPE example_type;
299 protected:
306  void (CStreamingFile::*read_vector) (T* &vec, int32_t &len);
307 
314  void (CStreamingFile::*read_vector_and_label) (T* &vec, int32_t &len, float64_t &label);
315 
317  CStreamingFile* input_source;
318 
320  pthread_t parse_thread;
321 
323  CParseBuffer<T>* examples_ring;
324 
326  int32_t number_of_features;
327 
329  int32_t number_of_vectors_parsed;
330 
332  int32_t number_of_vectors_read;
333 
335  Example<T>* current_example;
336 
338  T* current_feature_vector;
339 
341  float64_t current_label;
342 
344  int32_t current_len;
345 
347  bool free_after_release;
348 
350  int32_t ring_size;
351 
353  pthread_mutex_t examples_state_lock;
354 
356  pthread_cond_t examples_state_changed;
357 
358 };
359 
360 template <class T>
361  void CInputParser<T>::set_read_vector(void (CStreamingFile::*func_ptr)(T* &vec, int32_t &len))
362 {
363  // Set read_vector to point to the function passed as arg
364  read_vector=func_ptr;
365 }
366 
367 template <class T>
368  void CInputParser<T>::set_read_vector_and_label(void (CStreamingFile::*func_ptr)(T* &vec, int32_t &len, float64_t &label))
369 {
370  // Set read_vector_and_label to point to the function passed as arg
371  read_vector_and_label=func_ptr;
372 }
373 
374 template <class T>
375  CInputParser<T>::CInputParser()
376 {
377  /* this line was commented out when I found it. However, the mutex locks
378  * have to be initialised. Otherwise uninitialised memory error */
379  //init(NULL, true, PARSER_DEFAULT_BUFFSIZE);
380  pthread_mutex_init(&examples_state_lock, NULL);
381  pthread_cond_init(&examples_state_changed, NULL);
382  examples_ring=NULL;
383  parsing_done=true;
384  reading_done=true;
385 }
386 
387 template <class T>
388  CInputParser<T>::~CInputParser()
389 {
390  pthread_mutex_destroy(&examples_state_lock);
391  pthread_cond_destroy(&examples_state_changed);
392 
393  SG_UNREF(examples_ring);
394 }
395 
396 template <class T>
397  void CInputParser<T>::init(CStreamingFile* input_file, bool is_labelled, int32_t size)
398 {
399  input_source = input_file;
400 
401  if (is_labelled == true)
402  example_type = E_LABELLED;
403  else
404  example_type = E_UNLABELLED;
405 
406  examples_ring = new CParseBuffer<T>(size);
407  SG_REF(examples_ring);
408 
409  parsing_done = false;
410  reading_done = false;
411  number_of_vectors_parsed = 0;
412  number_of_vectors_read = 0;
413 
414  current_len = -1;
415  current_label = -1;
416  current_feature_vector = NULL;
417 
418  free_after_release=true;
419  ring_size=size;
420 }
421 
422 template <class T>
423  void CInputParser<T>::set_free_vector_after_release(bool free_vec)
424 {
425  free_after_release=free_vec;
426 }
427 
428 template <class T>
429  void CInputParser<T>::set_free_vectors_on_destruct(bool destroy)
430 {
431  examples_ring->set_free_vectors_on_destruct(destroy);
432 }
433 
434 template <class T>
435  void CInputParser<T>::start_parser()
436 {
437  SG_SDEBUG("entering CInputParser::start_parser()\n")
438  if (is_running())
439  {
440  SG_SERROR("Parser thread is already running! Multiple parse threads not supported.\n")
441  }
442 
443  SG_SDEBUG("creating parse thread\n")
444  pthread_create(&parse_thread, NULL, parse_loop_entry_point, this);
445 
446  SG_SDEBUG("leaving CInputParser::start_parser()\n")
447 }
448 
449 template <class T>
450  void* CInputParser<T>::parse_loop_entry_point(void* params)
451 {
452  ((CInputParser *) params)->main_parse_loop(params);
453 
454  return NULL;
455 }
456 
457 template <class T>
458  bool CInputParser<T>::is_running()
459 {
460  SG_SDEBUG("entering CInputParser::is_running()\n")
461  bool ret;
462 
463  pthread_mutex_lock(&examples_state_lock);
464 
465  if (parsing_done)
466  if (reading_done)
467  ret = false;
468  else
469  ret = true;
470  else
471  ret = false;
472 
473  pthread_mutex_unlock(&examples_state_lock);
474 
475  SG_SDEBUG("leaving CInputParser::is_running(), returning %d\n", ret)
476  return ret;
477 }
478 
479 template <class T>
480  int32_t CInputParser<T>::get_vector_and_label(T* &feature_vector,
481  int32_t &length,
482  float64_t &label)
483 {
484  (input_source->*read_vector_and_label)(feature_vector, length, label);
485 
486  if (length < 1)
487  {
488  // Problem reading the example
489  return 0;
490  }
491 
492  return 1;
493 }
494 
495 template <class T>
496  int32_t CInputParser<T>::get_vector_only(T* &feature_vector,
497  int32_t &length)
498 {
499  (input_source->*read_vector)(feature_vector, length);
500 
501  if (length < 1)
502  {
503  // Problem reading the example
504  return 0;
505  }
506 
507  return 1;
508 }
509 
510 template <class T>
511  void CInputParser<T>::copy_example_into_buffer(Example<T>* ex)
512 {
513  examples_ring->copy_example(ex);
514 }
515 
516 template <class T> void* CInputParser<T>::main_parse_loop(void* params)
517 {
518  // Read the examples into current_* objects
519  // Instead of allocating mem for new objects each time
520 #ifdef HAVE_PTHREAD
521  CInputParser* this_obj = (CInputParser *) params;
522  this->input_source = this_obj->input_source;
523 
524  while (1)
525  {
526  pthread_mutex_lock(&examples_state_lock);
527  if (parsing_done)
528  {
529  pthread_mutex_unlock(&examples_state_lock);
530  return NULL;
531  }
532  pthread_mutex_unlock(&examples_state_lock);
533 
534  pthread_testcancel();
535 
536  current_example = examples_ring->get_free_example();
537  current_feature_vector = current_example->fv;
538  current_len = current_example->length;
539  current_label = current_example->label;
540 
541  if (example_type == E_LABELLED)
542  get_vector_and_label(current_feature_vector, current_len, current_label);
543  else
544  get_vector_only(current_feature_vector, current_len);
545 
546  if (current_len < 0)
547  {
548  pthread_mutex_lock(&examples_state_lock);
549  parsing_done = true;
550  pthread_cond_signal(&examples_state_changed);
551  pthread_mutex_unlock(&examples_state_lock);
552  return NULL;
553  }
554 
555  current_example->label = current_label;
556  current_example->fv = current_feature_vector;
557  current_example->length = current_len;
558 
559  examples_ring->copy_example(current_example);
560 
561  pthread_mutex_lock(&examples_state_lock);
562  number_of_vectors_parsed++;
563  pthread_cond_signal(&examples_state_changed);
564  pthread_mutex_unlock(&examples_state_lock);
565  }
566 #endif /* HAVE_PTHREAD */
567  return NULL;
568 }
569 
570 template <class T> Example<T>* CInputParser<T>::retrieve_example()
571 {
572  /* This function should be guarded by mutexes while calling */
573  Example<T> *ex;
574 
575  if (parsing_done)
576  {
577  if (number_of_vectors_read == number_of_vectors_parsed)
578  {
579  reading_done = true;
580  /* Signal to waiting threads that no more examples are left */
581  pthread_cond_signal(&examples_state_changed);
582  return NULL;
583  }
584  }
585 
586  if (number_of_vectors_parsed <= 0)
587  return NULL;
588 
589  if (number_of_vectors_read == number_of_vectors_parsed)
590  {
591  return NULL;
592  }
593 
594  ex = examples_ring->get_unused_example();
595  number_of_vectors_read++;
596 
597  return ex;
598 }
599 
600 template <class T> int32_t CInputParser<T>::get_next_example(T* &fv,
601  int32_t &length, float64_t &label)
602 {
603  /* if reading is done, no more examples can be fetched. return 0
604  else, if example can be read, get the example and return 1.
605  otherwise, wait for further parsing, get the example and
606  return 1 */
607 
608  Example<T> *ex;
609 
610  while (1)
611  {
612  if (reading_done)
613  return 0;
614 
615  pthread_mutex_lock(&examples_state_lock);
616  ex = retrieve_example();
617 
618  if (ex == NULL)
619  {
620  if (reading_done)
621  {
622  /* No more examples left, return */
623  pthread_mutex_unlock(&examples_state_lock);
624  return 0;
625  }
626  else
627  {
628  /* Examples left, wait for one to become ready */
629  pthread_cond_wait(&examples_state_changed, &examples_state_lock);
630  pthread_mutex_unlock(&examples_state_lock);
631  continue;
632  }
633  }
634  else
635  {
636  /* Example ready, return the example */
637  pthread_mutex_unlock(&examples_state_lock);
638  break;
639  }
640  }
641 
642  fv = ex->fv;
643  length = ex->length;
644  label = ex->label;
645 
646  return 1;
647 }
648 
649 template <class T>
650  int32_t CInputParser<T>::get_next_example(T* &fv, int32_t &length)
651 {
652  float64_t label_dummy;
653 
654  return get_next_example(fv, length, label_dummy);
655 }
656 
657 template <class T>
658  void CInputParser<T>::finalize_example()
659 {
660  examples_ring->finalize_example(free_after_release);
661 }
662 
663 template <class T> void CInputParser<T>::end_parser()
664 {
665  SG_SDEBUG("entering CInputParser::end_parser\n")
666  SG_SDEBUG("joining parse thread\n")
667  pthread_join(parse_thread, NULL);
668  SG_SDEBUG("leaving CInputParser::end_parser\n")
669 }
670 
671 template <class T> void CInputParser<T>::exit_parser()
672 {
673  SG_SDEBUG("cancelling parse thread\n")
674  pthread_cancel(parse_thread);
675 }
676 }
677 
678 #endif /* HAVE_PTHREAD */
679 
680 #endif // __INPUTPARSER_H__
#define SG_REF(x)
Definition: SGObject.h:51
double float64_t
Definition: common.h:50
#define SG_UNREF(x)
Definition: SGObject.h:52
All classes and functions are contained in the shogun namespace
Definition: class_list.h:18
#define SG_SDEBUG(...)
Definition: SGIO.h:168
#define SG_SERROR(...)
Definition: SGIO.h:179

SHOGUN Machine Learning Toolbox - Documentation