SHOGUN  v3.0.0
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
InputParser.h
Go to the documentation of this file.
1 /*
2  * This program is free software; you can redistribute it and/or modify
3  * it under the terms of the GNU General Public License as published by
4  * the Free Software Foundation; either version 3 of the License, or
5  * (at your option) any later version.
6  *
7  * Written (W) 2011 Shashwat Lal Das
8  * Copyright (C) 2011 Berlin Institute of Technology and Max-Planck-Society
9  */
10 
11 #ifndef __INPUTPARSER_H__
12 #define __INPUTPARSER_H__
13 
14 #include <shogun/lib/common.h>
15 #ifdef HAVE_PTHREAD
16 
17 #include <shogun/io/SGIO.h>
20 #include <pthread.h>
21 
22 #define PARSER_DEFAULT_BUFFSIZE 100
23 
24 namespace shogun
25 {
/** Tells the parser whether the examples it reads carry a label. */
enum E_EXAMPLE_TYPE
{
	/** each example consists of a feature vector and a label */
	E_LABELLED = 1,
	/** each example consists of a feature vector only */
	E_UNLABELLED = 2
};
33 
82 template <class T> class CInputParser
83 {
84 public:
85 
90  CInputParser();
91 
96  ~CInputParser();
97 
109  void init(CStreamingFile* input_file, bool is_labelled = true, int32_t size = PARSER_DEFAULT_BUFFSIZE);
110 
116  bool is_running();
117 
124  int32_t get_number_of_features() { return number_of_features; }
125 
137  void set_read_vector(void (CStreamingFile::*func_ptr)(T* &vec, int32_t &len));
138 
150  void set_read_vector_and_label(void (CStreamingFile::*func_ptr)(T* &vec, int32_t &len, float64_t &label));
151 
163  int32_t get_vector_and_label(T* &feature_vector,
164  int32_t &length,
165  float64_t &label);
166 
177  int32_t get_vector_only(T* &feature_vector, int32_t &length);
178 
185  void set_free_vector_after_release(bool free_vec);
186 
193  void set_free_vectors_on_destruct(bool destroy);
194 
200  void start_parser();
201 
210  void* main_parse_loop(void* params);
211 
212 
218  void copy_example_into_buffer(Example<T>* ex);
219 
226  Example<T>* retrieve_example();
227 
240  int32_t get_next_example(T* &feature_vector,
241  int32_t &length,
242  float64_t &label);
243 
252  int32_t get_next_example(T* &feature_vector,
253  int32_t &length);
254 
262  void finalize_example();
263 
268  void end_parser();
269 
272  void exit_parser();
273 
279  int32_t get_ring_size() { return ring_size; }
280 
281 private:
289  static void* parse_loop_entry_point(void* params);
290 
291 public:
292  bool parsing_done;
293  bool reading_done;
295  E_EXAMPLE_TYPE example_type;
297 protected:
304  void (CStreamingFile::*read_vector) (T* &vec, int32_t &len);
305 
312  void (CStreamingFile::*read_vector_and_label) (T* &vec, int32_t &len, float64_t &label);
313 
315  CStreamingFile* input_source;
316 
318  pthread_t parse_thread;
319 
321  CParseBuffer<T>* examples_ring;
322 
324  int32_t number_of_features;
325 
327  int32_t number_of_vectors_parsed;
328 
330  int32_t number_of_vectors_read;
331 
333  Example<T>* current_example;
334 
336  T* current_feature_vector;
337 
339  float64_t current_label;
340 
342  int32_t current_len;
343 
345  bool free_after_release;
346 
348  int32_t ring_size;
349 
351  pthread_mutex_t examples_state_lock;
352 
354  pthread_cond_t examples_state_changed;
355 
356 };
357 
358 template <class T>
359  void CInputParser<T>::set_read_vector(void (CStreamingFile::*func_ptr)(T* &vec, int32_t &len))
360 {
361  // Set read_vector to point to the function passed as arg
362  read_vector=func_ptr;
363 }
364 
365 template <class T>
366  void CInputParser<T>::set_read_vector_and_label(void (CStreamingFile::*func_ptr)(T* &vec, int32_t &len, float64_t &label))
367 {
368  // Set read_vector_and_label to point to the function passed as arg
369  read_vector_and_label=func_ptr;
370 }
371 
372 template <class T>
373  CInputParser<T>::CInputParser()
374 {
375  /* this line was commented out when I found it. However, the mutex locks
376  * have to be initialised. Otherwise uninitialised memory error */
377  //init(NULL, true, PARSER_DEFAULT_BUFFSIZE);
378  pthread_mutex_init(&examples_state_lock, NULL);
379  pthread_cond_init(&examples_state_changed, NULL);
380  examples_ring=NULL;
381  parsing_done=true;
382  reading_done=true;
383 }
384 
385 template <class T>
386  CInputParser<T>::~CInputParser()
387 {
388  pthread_mutex_destroy(&examples_state_lock);
389  pthread_cond_destroy(&examples_state_changed);
390 
391  SG_UNREF(examples_ring);
392 }
393 
394 template <class T>
395  void CInputParser<T>::init(CStreamingFile* input_file, bool is_labelled, int32_t size)
396 {
397  input_source = input_file;
398 
399  if (is_labelled == true)
400  example_type = E_LABELLED;
401  else
402  example_type = E_UNLABELLED;
403 
404  examples_ring = new CParseBuffer<T>(size);
405  SG_REF(examples_ring);
406 
407  parsing_done = false;
408  reading_done = false;
409  number_of_vectors_parsed = 0;
410  number_of_vectors_read = 0;
411 
412  current_len = -1;
413  current_label = -1;
414  current_feature_vector = NULL;
415 
416  free_after_release=true;
417  ring_size=size;
418 }
419 
420 template <class T>
421  void CInputParser<T>::set_free_vector_after_release(bool free_vec)
422 {
423  free_after_release=free_vec;
424 }
425 
426 template <class T>
427  void CInputParser<T>::set_free_vectors_on_destruct(bool destroy)
428 {
429  examples_ring->set_free_vectors_on_destruct(destroy);
430 }
431 
432 template <class T>
433  void CInputParser<T>::start_parser()
434 {
435  SG_SDEBUG("entering CInputParser::start_parser()\n")
436  if (is_running())
437  {
438  SG_SERROR("Parser thread is already running! Multiple parse threads not supported.\n")
439  }
440 
441  SG_SDEBUG("creating parse thread\n")
442  pthread_create(&parse_thread, NULL, parse_loop_entry_point, this);
443 
444  SG_SDEBUG("leaving CInputParser::start_parser()\n")
445 }
446 
447 template <class T>
448  void* CInputParser<T>::parse_loop_entry_point(void* params)
449 {
450  ((CInputParser *) params)->main_parse_loop(params);
451 
452  return NULL;
453 }
454 
455 template <class T>
456  bool CInputParser<T>::is_running()
457 {
458  SG_SDEBUG("entering CInputParser::is_running()\n")
459  bool ret;
460 
461  pthread_mutex_lock(&examples_state_lock);
462 
463  if (parsing_done)
464  if (reading_done)
465  ret = false;
466  else
467  ret = true;
468  else
469  ret = false;
470 
471  pthread_mutex_unlock(&examples_state_lock);
472 
473  SG_SDEBUG("leaving CInputParser::is_running(), returning %d\n", ret)
474  return ret;
475 }
476 
477 template <class T>
478  int32_t CInputParser<T>::get_vector_and_label(T* &feature_vector,
479  int32_t &length,
480  float64_t &label)
481 {
482  (input_source->*read_vector_and_label)(feature_vector, length, label);
483 
484  if (length < 1)
485  {
486  // Problem reading the example
487  return 0;
488  }
489 
490  return 1;
491 }
492 
493 template <class T>
494  int32_t CInputParser<T>::get_vector_only(T* &feature_vector,
495  int32_t &length)
496 {
497  (input_source->*read_vector)(feature_vector, length);
498 
499  if (length < 1)
500  {
501  // Problem reading the example
502  return 0;
503  }
504 
505  return 1;
506 }
507 
508 template <class T>
509  void CInputParser<T>::copy_example_into_buffer(Example<T>* ex)
510 {
511  examples_ring->copy_example(ex);
512 }
513 
514 template <class T> void* CInputParser<T>::main_parse_loop(void* params)
515 {
516  // Read the examples into current_* objects
517  // Instead of allocating mem for new objects each time
518 #ifdef HAVE_PTHREAD
519  CInputParser* this_obj = (CInputParser *) params;
520  this->input_source = this_obj->input_source;
521 
522  while (1)
523  {
524  pthread_mutex_lock(&examples_state_lock);
525  if (parsing_done)
526  {
527  pthread_mutex_unlock(&examples_state_lock);
528  return NULL;
529  }
530  pthread_mutex_unlock(&examples_state_lock);
531 
532  pthread_testcancel();
533 
534  current_example = examples_ring->get_free_example();
535  current_feature_vector = current_example->fv;
536  current_len = current_example->length;
537  current_label = current_example->label;
538 
539  if (example_type == E_LABELLED)
540  get_vector_and_label(current_feature_vector, current_len, current_label);
541  else
542  get_vector_only(current_feature_vector, current_len);
543 
544  if (current_len < 0)
545  {
546  pthread_mutex_lock(&examples_state_lock);
547  parsing_done = true;
548  pthread_cond_signal(&examples_state_changed);
549  pthread_mutex_unlock(&examples_state_lock);
550  return NULL;
551  }
552 
553  current_example->label = current_label;
554  current_example->fv = current_feature_vector;
555  current_example->length = current_len;
556 
557  examples_ring->copy_example(current_example);
558 
559  pthread_mutex_lock(&examples_state_lock);
560  number_of_vectors_parsed++;
561  pthread_cond_signal(&examples_state_changed);
562  pthread_mutex_unlock(&examples_state_lock);
563  }
564 #endif /* HAVE_PTHREAD */
565  return NULL;
566 }
567 
568 template <class T> Example<T>* CInputParser<T>::retrieve_example()
569 {
570  /* This function should be guarded by mutexes while calling */
571  Example<T> *ex;
572 
573  if (parsing_done)
574  {
575  if (number_of_vectors_read == number_of_vectors_parsed)
576  {
577  reading_done = true;
578  /* Signal to waiting threads that no more examples are left */
579  pthread_cond_signal(&examples_state_changed);
580  return NULL;
581  }
582  }
583 
584  if (number_of_vectors_parsed <= 0)
585  return NULL;
586 
587  if (number_of_vectors_read == number_of_vectors_parsed)
588  {
589  return NULL;
590  }
591 
592  ex = examples_ring->get_unused_example();
593  number_of_vectors_read++;
594 
595  return ex;
596 }
597 
598 template <class T> int32_t CInputParser<T>::get_next_example(T* &fv,
599  int32_t &length, float64_t &label)
600 {
601  /* if reading is done, no more examples can be fetched. return 0
602  else, if example can be read, get the example and return 1.
603  otherwise, wait for further parsing, get the example and
604  return 1 */
605 
606  Example<T> *ex;
607 
608  while (1)
609  {
610  if (reading_done)
611  return 0;
612 
613  pthread_mutex_lock(&examples_state_lock);
614  ex = retrieve_example();
615 
616  if (ex == NULL)
617  {
618  if (reading_done)
619  {
620  /* No more examples left, return */
621  pthread_mutex_unlock(&examples_state_lock);
622  return 0;
623  }
624  else
625  {
626  /* Examples left, wait for one to become ready */
627  pthread_cond_wait(&examples_state_changed, &examples_state_lock);
628  pthread_mutex_unlock(&examples_state_lock);
629  continue;
630  }
631  }
632  else
633  {
634  /* Example ready, return the example */
635  pthread_mutex_unlock(&examples_state_lock);
636  break;
637  }
638  }
639 
640  fv = ex->fv;
641  length = ex->length;
642  label = ex->label;
643 
644  return 1;
645 }
646 
647 template <class T>
648  int32_t CInputParser<T>::get_next_example(T* &fv, int32_t &length)
649 {
650  float64_t label_dummy;
651 
652  return get_next_example(fv, length, label_dummy);
653 }
654 
655 template <class T>
656  void CInputParser<T>::finalize_example()
657 {
658  examples_ring->finalize_example(free_after_release);
659 }
660 
661 template <class T> void CInputParser<T>::end_parser()
662 {
663  SG_SDEBUG("entering CInputParser::end_parser\n")
664  SG_SDEBUG("joining parse thread\n")
665  pthread_join(parse_thread, NULL);
666  SG_SDEBUG("leaving CInputParser::end_parser\n")
667 }
668 
669 template <class T> void CInputParser<T>::exit_parser()
670 {
671  SG_SDEBUG("cancelling parse thread\n")
672  pthread_cancel(parse_thread);
673 }
674 }
675 
676 #endif /* HAVE_PTHREAD */
677 
678 #endif // __INPUTPARSER_H__

SHOGUN Machine Learning Toolbox - Documentation