SHOGUN  v2.0.0
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
InputParser.h
Go to the documentation of this file.
1 /*
2  * This program is free software; you can redistribute it and/or modify
3  * it under the terms of the GNU General Public License as published by
4  * the Free Software Foundation; either version 3 of the License, or
5  * (at your option) any later version.
6  *
7  * Written (W) 2011 Shashwat Lal Das
8  * Copyright (C) 2011 Berlin Institute of Technology and Max-Planck-Society
9  */
10 
11 #ifndef __INPUTPARSER_H__
12 #define __INPUTPARSER_H__
13 
14 #include <shogun/io/SGIO.h>
16 #include <shogun/lib/common.h>
18 #include <pthread.h>
19 
20 #define PARSER_DEFAULT_BUFFSIZE 100
21 
22 namespace shogun
23 {
27  {
30  };
31 
/**
 * Parser that reads examples from a CStreamingFile in a separate pthread and
 * hands them to a consumer through a CParseBuffer<T> ring buffer.
 * Producer side: start_parser() -> main_parse_loop().
 * Consumer side: get_next_example() followed by finalize_example().
 *
 * NOTE(review): this listing is missing several declarations. The member
 * definitions below also use input_source, example_type, examples_ring,
 * parsing_done, reading_done, number_of_vectors_parsed,
 * number_of_vectors_read, current_example, current_feature_vector,
 * current_label and free_after_release, and call retrieve_example(); none of
 * these is declared in the visible lines of this class.
 */
80 template <class T> class CInputParser
81 {
82 public:
83 
/** Constructor. Does not call init(), so init() must be called before use. */
88  CInputParser();
89 
/** Destructor: joins the parser thread (end_parser()), destroys the mutex and
 * condition variable, and deletes the ring buffer. */
94  ~CInputParser();
95 
/** Binds the parser to input_file, selects labelled/unlabelled reading,
 * allocates a ring buffer of `size` examples, resets all counters/flags and
 * initialises the mutex and condition variable.
 * @param input_file source the examples are read from
 * @param is_labelled whether each example carries a label
 * @param size number of examples the ring buffer holds
 */
107  void init(CStreamingFile* input_file, bool is_labelled = true, int32_t size = PARSER_DEFAULT_BUFFSIZE);
108 
/** @return true only when parsing has finished but not every parsed example
 * has been read yet; false otherwise (including while parsing is ongoing). */
114  bool is_running();
115 
123 
/** Sets the CStreamingFile member function used to read an unlabelled
 * vector (used by get_vector_only()). */
135  void set_read_vector(void (CStreamingFile::*func_ptr)(T* &vec, int32_t &len));
136 
/** Sets the CStreamingFile member function used to read a vector together
 * with its label (used by get_vector_and_label()). */
148  void set_read_vector_and_label(void (CStreamingFile::*func_ptr)(T* &vec, int32_t &len, float64_t &label));
149 
/** Reads one vector and its label through read_vector_and_label.
 * @return 1 if the resulting length is at least 1, 0 otherwise */
161  int32_t get_vector_and_label(T* &feature_vector,
162  int32_t &length,
163  float64_t &label);
164 
/** Reads one vector through read_vector.
 * @return 1 if the resulting length is at least 1, 0 otherwise */
175  int32_t get_vector_only(T* &feature_vector, int32_t &length);
176 
/** Sets the flag passed to the ring buffer's finalize_example(). */
183  void set_free_vector_after_release(bool free_vec);
184 
/** Forwards the flag to examples_ring->set_free_vectors_on_destruct(). */
191  void set_free_vectors_on_destruct(bool destroy);
192 
/** Starts the parser thread running main_parse_loop(); reports an error via
 * SG_SERROR if is_running() returns true. */
198  void start_parser();
199 
/** Parser thread body: fills free ring-buffer slots with examples until the
 * reader reports a negative length or parsing_done is set.
 * @param params the CInputParser instance
 * @return NULL */
208  void* main_parse_loop(void* params);
209 
210 
217 
225 
/** Blocks until a parsed example is available and returns it through the
 * output parameters.
 * @return 1 if an example was returned, 0 once all examples have been read */
238  int32_t get_next_example(T* &feature_vector,
239  int32_t &length,
240  float64_t &label);
241 
/** As the three-argument overload, discarding the label. */
250  int32_t get_next_example(T* &feature_vector,
251  int32_t &length);
252 
/** Releases the current example in the ring buffer, passing
 * free_after_release to CParseBuffer::finalize_example(). */
260  void finalize_example();
261 
/** Joins the parser thread (pthread_join). */
266  void end_parser();
267 
/** Requests cancellation of the parser thread (pthread_cancel). */
270  void exit_parser();
271 
/** @return the ring buffer size passed to init() */
277  int32_t get_ring_size() { return ring_size; }
278 
279 private:
/** Static pthread start routine; casts params back to CInputParser and
 * calls main_parse_loop(). */
287  static void* parse_loop_entry_point(void* params);
288 
// NOTE(review): the public members declared in this section of the original
// header are missing from this listing.
289 public:
295 protected:
/** Reader used for unlabelled examples. */
302  void (CStreamingFile::*read_vector) (T* &vec, int32_t &len);
303 
/** Reader used for labelled examples. */
310  void (CStreamingFile::*read_vector_and_label) (T* &vec, int32_t &len, float64_t &label);
311 
314 
/** Handle of the parser thread created by start_parser(). */
316  pthread_t parse_thread;
317 
320 
323 
326 
329 
332 
335 
338 
/** Length of the vector currently being parsed (-1 after init()). */
340  int32_t current_len;
341 
344 
/** Ring buffer size passed to init(). */
346  int32_t ring_size;
347 
/** Protects parsing_done, reading_done and the parsed/read counters. */
349  pthread_mutex_t examples_state_lock;
350 
/** Signalled when an example is parsed or parsing/reading finishes;
 * get_next_example() waits on it. */
352  pthread_cond_t examples_state_changed;
353 };
354 
355 template <class T>
356  void CInputParser<T>::set_read_vector(void (CStreamingFile::*func_ptr)(T* &vec, int32_t &len))
357 {
358  // Set read_vector to point to the function passed as arg
359  read_vector=func_ptr;
360 }
361 
362 template <class T>
363  void CInputParser<T>::set_read_vector_and_label(void (CStreamingFile::*func_ptr)(T* &vec, int32_t &len, float64_t &label))
364 {
365  // Set read_vector_and_label to point to the function passed as arg
366  read_vector_and_label=func_ptr;
367 }
368 
369 template <class T>
371 {
372  //init(NULL, true, PARSER_DEFAULT_BUFFSIZE);
373 }
374 
375 template <class T>
377 {
378  end_parser();
379 
380  pthread_mutex_destroy(&examples_state_lock);
381  pthread_cond_destroy(&examples_state_changed);
382 
383  delete examples_ring;
384 }
385 
386 template <class T>
387  void CInputParser<T>::init(CStreamingFile* input_file, bool is_labelled, int32_t size)
388 {
389  input_source = input_file;
390 
391  if (is_labelled == true)
392  example_type = E_LABELLED;
393  else
394  example_type = E_UNLABELLED;
395 
396  examples_ring = new CParseBuffer<T>(size);
397 
398  parsing_done = false;
399  reading_done = false;
400  number_of_vectors_parsed = 0;
401  number_of_vectors_read = 0;
402 
403  current_len = -1;
404  current_label = -1;
405  current_feature_vector = NULL;
406 
407  free_after_release=true;
408  ring_size=size;
409 
410  pthread_mutex_init(&examples_state_lock, NULL);
411  pthread_cond_init(&examples_state_changed, NULL);
412 }
413 
414 template <class T>
416 {
417  free_after_release=free_vec;
418 }
419 
420 template <class T>
422 {
423  examples_ring->set_free_vectors_on_destruct(destroy);
424 }
425 
426 template <class T>
428 {
429  if (is_running())
430  {
431  SG_SERROR("Parser thread is already running! Multiple parse threads not supported.\n");
432  }
433 
434  pthread_create(&parse_thread, NULL, parse_loop_entry_point, this);
435 }
436 
437 template <class T>
438  void* CInputParser<T>::parse_loop_entry_point(void* params)
439 {
440  ((CInputParser *) params)->main_parse_loop(params);
441 
442  return NULL;
443 }
444 
445 template <class T>
447 {
448  bool ret;
449 
450  pthread_mutex_lock(&examples_state_lock);
451 
452  if (parsing_done)
453  if (reading_done)
454  ret = false;
455  else
456  ret = true;
457  else
458  ret = false;
459 
460  pthread_mutex_unlock(&examples_state_lock);
461  return ret;
462 }
463 
464 template <class T>
465  int32_t CInputParser<T>::get_vector_and_label(T* &feature_vector,
466  int32_t &length,
467  float64_t &label)
468 {
469  (input_source->*read_vector_and_label)(feature_vector, length, label);
470 
471  if (length < 1)
472  {
473  // Problem reading the example
474  return 0;
475  }
476 
477  return 1;
478 }
479 
480 template <class T>
481  int32_t CInputParser<T>::get_vector_only(T* &feature_vector,
482  int32_t &length)
483 {
484  (input_source->*read_vector)(feature_vector, length);
485 
486  if (length < 1)
487  {
488  // Problem reading the example
489  return 0;
490  }
491 
492  return 1;
493 }
494 
495 template <class T>
497 {
498  examples_ring->copy_example(ex);
499 }
500 
501 template <class T> void* CInputParser<T>::main_parse_loop(void* params)
502 {
503  // Read the examples into current_* objects
504  // Instead of allocating mem for new objects each time
505 #ifdef HAVE_PTHREAD
506  CInputParser* this_obj = (CInputParser *) params;
507  this->input_source = this_obj->input_source;
508 
509  while (1)
510  {
511  pthread_mutex_lock(&examples_state_lock);
512  if (parsing_done)
513  {
514  pthread_mutex_unlock(&examples_state_lock);
515  return NULL;
516  }
517  pthread_mutex_unlock(&examples_state_lock);
518 
519  pthread_testcancel();
520 
521  current_example = examples_ring->get_free_example();
522  current_feature_vector = current_example->fv.vector;
523  current_len = current_example->fv.vlen;
524  current_label = current_example->label;
525 
526  if (example_type == E_LABELLED)
527  get_vector_and_label(current_feature_vector, current_len, current_label);
528  else
529  get_vector_only(current_feature_vector, current_len);
530 
531  if (current_len < 0)
532  {
533  pthread_mutex_lock(&examples_state_lock);
534  parsing_done = true;
535  pthread_cond_signal(&examples_state_changed);
536  pthread_mutex_unlock(&examples_state_lock);
537  return NULL;
538  }
539 
540  current_example->label = current_label;
541  current_example->fv.vector = current_feature_vector;
542  current_example->fv.vlen = current_len;
543 
544  examples_ring->copy_example(current_example);
545 
546  pthread_mutex_lock(&examples_state_lock);
547  number_of_vectors_parsed++;
548  pthread_cond_signal(&examples_state_changed);
549  pthread_mutex_unlock(&examples_state_lock);
550  }
551 #endif /* HAVE_PTHREAD */
552  return NULL;
553 }
554 
556 {
557  /* This function should be guarded by mutexes while calling */
558  Example<T> *ex;
559 
560  if (parsing_done)
561  {
562  if (number_of_vectors_read == number_of_vectors_parsed)
563  {
564  reading_done = true;
565  /* Signal to waiting threads that no more examples are left */
566  pthread_cond_signal(&examples_state_changed);
567  return NULL;
568  }
569  }
570 
571  if (number_of_vectors_parsed <= 0)
572  return NULL;
573 
574  if (number_of_vectors_read == number_of_vectors_parsed)
575  {
576  return NULL;
577  }
578 
579  ex = examples_ring->get_unused_example();
580  number_of_vectors_read++;
581 
582  return ex;
583 }
584 
585 template <class T> int32_t CInputParser<T>::get_next_example(T* &fv,
586  int32_t &length, float64_t &label)
587 {
588  /* if reading is done, no more examples can be fetched. return 0
589  else, if example can be read, get the example and return 1.
590  otherwise, wait for further parsing, get the example and
591  return 1 */
592 
593  Example<T> *ex;
594 
595  while (1)
596  {
597  if (reading_done)
598  return 0;
599 
600  pthread_mutex_lock(&examples_state_lock);
601  ex = retrieve_example();
602 
603  if (ex == NULL)
604  {
605  if (reading_done)
606  {
607  /* No more examples left, return */
608  pthread_mutex_unlock(&examples_state_lock);
609  return 0;
610  }
611  else
612  {
613  /* Examples left, wait for one to become ready */
614  pthread_cond_wait(&examples_state_changed, &examples_state_lock);
615  pthread_mutex_unlock(&examples_state_lock);
616  continue;
617  }
618  }
619  else
620  {
621  /* Example ready, return the example */
622  pthread_mutex_unlock(&examples_state_lock);
623  break;
624  }
625  }
626 
627  fv = ex->fv.vector;
628  length = ex->fv.vlen;
629  label = ex->label;
630 
631  return 1;
632 }
633 
634 template <class T>
635  int32_t CInputParser<T>::get_next_example(T* &fv, int32_t &length)
636 {
637  float64_t label_dummy;
638 
639  return get_next_example(fv, length, label_dummy);
640 }
641 
642 template <class T>
644 {
645  examples_ring->finalize_example(free_after_release);
646 }
647 
648 template <class T> void CInputParser<T>::end_parser()
649 {
650  pthread_join(parse_thread, NULL);
651 }
652 
653 template <class T> void CInputParser<T>::exit_parser()
654 {
655  pthread_cancel(parse_thread);
656 }
657 }
658 #endif // __INPUTPARSER_H__

SHOGUN Machine Learning Toolbox - Documentation