kaldi-thread.h
Go to the documentation of this file.
1 // util/kaldi-thread.h
2 
3 // Copyright 2012 Johns Hopkins University (Author: Daniel Povey)
4 // Frantisek Skala
5 // 2017 University of Southern California (Author: Dogan Can)
6 
7 // See ../../COPYING for clarification regarding multiple authors
8 //
9 // Licensed under the Apache License, Version 2.0 (the "License");
10 // you may not use this file except in compliance with the License.
11 // You may obtain a copy of the License at
12 //
13 // http://www.apache.org/licenses/LICENSE-2.0
14 //
15 // THIS CODE IS PROVIDED *AS IS* BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
16 // KIND, EITHER EXPRESS OR IMPLIED, INCLUDING WITHOUT LIMITATION ANY IMPLIED
17 // WARRANTIES OR CONDITIONS OF TITLE, FITNESS FOR A PARTICULAR PURPOSE,
18 // MERCHANTABLITY OR NON-INFRINGEMENT.
19 // See the Apache 2 License for the specific language governing permissions and
20 // limitations under the License.
21 
22 #ifndef KALDI_THREAD_KALDI_THREAD_H_
23 #define KALDI_THREAD_KALDI_THREAD_H_ 1
24 
25 #include <thread>
26 #include <algorithm>
27 #include "itf/options-itf.h"
28 #include "util/kaldi-semaphore.h"
29 
30 // This header provides convenient mechanisms for parallelization.
31 //
32 // The class MultiThreader, and the function RunMultiThreaded provide a
33 // mechanism to run a specified number of jobs in parallel and wait for them
34 // all to finish. They accept objects of some class C that derives from the
35 // base class MultiThreadable. C needs to define the operator () that takes
36 // no arguments. See ExampleClass below.
37 //
38 // The class TaskSequencer addresses a different problem typically encountered
39 // in Kaldi command-line programs that process a sequence of items. The items
40 // to be processed are coming in. They are all of different sizes, e.g.
41 // utterances with different numbers of frames. We would like them to be
42 // processed in parallel to make good use of the threads available but they
43 // must be output in the same order they came in. Here, we again accept objects
44 // of some class C with an operator () that takes no arguments. C may also have
45 // a destructor with side effects (typically some kind of output).
46 // TaskSequencer is responsible for running the jobs in parallel. It has a
47 // function Run() that will accept a new object of class C; this will block
48 // until a thread is free, at which time it will spawn a thread that starts
49 // running the operator () of the object. When threads are finished running,
50 // the objects will be deleted. TaskSequencer guarantees that the destructors
51 // will be called sequentially (not in parallel) and in the same order the
52 // objects were given to the Run() function, so that it is safe for the
53 // destructor to have side effects such as outputting data.
54 // Note: the destructor of TaskSequencer will wait for any remaining jobs that
55 // are still running and will call the destructors.
56 
57 
58 namespace kaldi {
59 
60 extern int32 g_num_threads; // Maximum number of threads (for programs that
61 // use threads, which is not many of them, e.g. the SGMM update program does).
62 // This is 8 by default. You can change this on the command line, where
63 // used, with --num-threads. Programs that think they will use threads
64 // should register it with their ParseOptions, as something like:
65 // po.Register("num-threads", &g_num_threads, "Number of threads to use.");
66 
68  // To create a function object that does part of the job, inherit from this
69  // class, implement a copy constructor calling the default copy constructor
70  // of this base class (so that thread_id_ and num_threads_ are copied to new
71  // instances), and finally implement the operator() that does part of the job
72  // based on thread_id_ and num_threads_ variables.
73  // Note: example implementations are in util/kaldi-thread-test.cc
74  public:
75  virtual void operator() () = 0;
76  // Does the main function of the class
77  // Subclasses have to redefine this
78  virtual ~MultiThreadable();
79  // Optional destructor. Note: the destructor of the object passed by the user
80  // will also be called, so watch out.
81 
82  public:
83  // Do not redeclare thread_id_ and num_threads_ in derived classes.
84  int32 thread_id_; // 0 <= thread_id_ < num_threads_
86 
87  private:
88  // Have additional member variables as needed.
89 };
90 
91 
93  public:
94  ExampleClass(int32 *foo); // Typically there will be an initializer that
95  // takes arguments.
96 
97  ExampleClass(const ExampleClass &other); // A copy constructor is also needed;
98  // some example classes use the default version of this.
99 
100  void operator() () {
101  // Does the main function of the class. This
102  // function will typically want to look at the values of the
103  // member variables thread_id_ and num_threads_, inherited
104  // from MultiThreadable.
105  }
107  // Optional destructor. Sometimes useful things happen here,
108  // for example summing up of certain quantities. See code
109  // that uses RunMultiThreaded for examples.
110  }
111  private:
112  // Have additional member variables as needed.
113 };
114 
115 
116 template<class C>
118  public:
119  MultiThreader(int32 num_threads, const C &c_in) :  // Makes one copy of c_in per worker and runs each copy's operator().
120  threads_(std::max<int32>(1, num_threads)),  // always at least one slot, even when num_threads <= 0
121  cvec_(std::max<int32>(1, num_threads), c_in) {
122  if (num_threads == 0) {
123  // Special case: num_threads == 0 behaves like num_threads == 1, except
124  // that no extra thread is created; the job runs in the calling thread.
125  // This can be useful in GPU computations where threads cannot be used.
126  cvec_[0].thread_id_ = 0;
127  cvec_[0].num_threads_ = 1;
128  (cvec_[0])();  // synchronous call; threads_[0] stays default-constructed (not joinable)
129  } else {
130  for (size_t i = 0; i < threads_.size(); i++) {  // size_t index: same type as threads_.size(), as in the join loop
131  cvec_[i].thread_id_ = static_cast<int32>(i);  // 0 <= thread_id_ < num_threads_
132  cvec_[i].num_threads_ = threads_.size();
133  threads_[i] = std::thread(std::ref(cvec_[i]));  // std::ref: the thread invokes the element held in cvec_, not a copy of it
134  }
135  }
136  }
138  for (size_t i = 0; i < threads_.size(); i++)
139  if (threads_[i].joinable())
140  threads_[i].join();
141  }
142  private:
143  std::vector<std::thread> threads_;
144  std::vector<C> cvec_;
145 };
146 
151 template<class C> void RunMultiThreaded(const C &c_in) {  // Runs c_in's job using g_num_threads as the thread count.
152  MultiThreader<C> m(g_num_threads, c_in);  // constructing m copies c_in and starts the work
153 }  // m is destroyed on return; NOTE(review): the join loop in MultiThreader is presumably its destructor, so this waits for all threads -- confirm
154 
155 
159  TaskSequencerConfig(): num_threads(1), num_threads_total(0) { }  // Defaults: num_threads = 1; num_threads_total = 0, which the option help text describes as "defaults to --num-threads plus 20".
160  void Register(OptionsItf *opts) {  // Registers num_threads and num_threads_total on opts as "num-threads" and "num-threads-total".
161  opts->Register("num-threads", &num_threads, "Number of actively processing "
162  "threads to run in parallel");
163  opts->Register("num-threads-total", &num_threads_total, "Total number of "
164  "threads, including those that are waiting on other threads "
165  "to produce their output. Controls memory use. If <= 0, "
166  "defaults to --num-threads plus 20. Otherwise, must "
167  "be >= num-threads.");
168  }
169 };
170 
171 // C should have an operator () taking no arguments, that does some kind
172 // of computation, and a destructor that produces some kind of output (the
173 // destructors will be run sequentially, in the same order Run was called).
174 template<class C>
176  public:
178  num_threads_(config.num_threads),
179  threads_avail_(config.num_threads),
180  tot_threads_avail_(config.num_threads_total > 0 ? config.num_threads_total :
181  config.num_threads + 20),
182  thread_list_(NULL) {
183  KALDI_ASSERT((config.num_threads_total <= 0 ||
184  config.num_threads_total >= config.num_threads) &&
185  "num-threads-total, if specified, must be >= num-threads");
186  }
187 
190  void Run(C *c) {  // Takes ownership of c: runs (*c)() and later deletes c. May block on the two semaphores below.
191  // With num_threads_ == 0, run the task synchronously in the calling thread.
192  if (num_threads_ == 0) {
193  (*c)();
194  delete c;  // deleted immediately, so ordering holds without any threads
195  return;
196  }
197 
198  threads_avail_.Wait(); // wait until a thread for computation is free.
199  tot_threads_avail_.Wait(); // this ensures we don't have too many threads
200  // waiting on I/O, and consume too much memory.
201 
202  // Put the new RunTaskArgsList object at the head of the singly linked
203  // list thread_list_; the previous head is passed in as the new node's tail.
204  thread_list_ = new RunTaskArgsList(this, c, thread_list_);
205  thread_list_->thread = std::thread(TaskSequencer<C>::RunTask,  // the new thread receives its own list node as argument
206  thread_list_);
207  }
208 
209  void Wait() { // You call this at the end if it's more convenient
210  // than waiting for the destructor. It waits for all tasks to finish.
211  if (thread_list_ != NULL) {  // NULL means nothing is pending
212  thread_list_->thread.join();  // joining the newest thread is enough: in RunTask each thread joins its predecessor before finishing
213  KALDI_ASSERT(thread_list_->tail == NULL); // thread would not
214  // have exited without setting tail to NULL.
215  delete thread_list_;  // free the last remaining list node
216  thread_list_ = NULL;  // makes a repeated Wait() a no-op
217  }
218  }
219 
222  Wait();
223  }
224  private:
226  TaskSequencer *me; // Think of this as a "this" pointer.
227  C *c; // Clist element of the task we're expected
228  std::thread thread;
231  me(me), c(c), tail(tail) {}
232  };
233  // This static function gets run in the threads that we create.
234  static void RunTask(RunTaskArgsList *args) {  // Thread entry point: runs args->c, then deletes it after the previous task's thread has finished.
235  // (1) run the job.
236  (*(args->c))(); // call operator () on args->c, which does the computation.
237  args->me->threads_avail_.Signal(); // Signal that the compute-intensive
238  // part of the thread is done (we want to run no more than
239  // config_.num_threads of these.)
240 
241  // (2) we want to destroy the object "c" now, by deleting it. But for
242  // correct sequencing (this is the whole point of this class, it
243  // is intended to ensure the output of the program is in correct order),
244  // we first wait till the previous thread, whose details will be in "tail",
245  // is finished.
246  if (args->tail != NULL) {
247  args->tail->thread.join();
248  }
249 
250  delete args->c; // delete the object "c". This may cause some output,
251  // e.g. to a stream. We don't need to worry about concurrent access to
252  // the output stream, because each thread waits for the previous thread
253  // to be done, before doing this. So there is no risk of concurrent
254  // access.
255  args->c = NULL;
256 
257  if (args->tail != NULL) {
258  KALDI_ASSERT(args->tail->tail == NULL); // Because we already
259  // did join on args->tail->thread, which means that
260  // thread was done, and before it exited, it would have
261  // deleted and set to NULL its tail (which is the next line of code).
262  delete args->tail;
263  args->tail = NULL;
264  }
265  // At this point we are exiting from the thread. Signal the
266  // "tot_threads_avail_" semaphore, which is used to limit the total number of threads that are alive, including
267  // not only those that are in active computation in c->operator (), but those
268  // that are waiting on I/O or other threads.
269  args->me->tot_threads_avail_.Signal();
270  }
271 
272  int32 num_threads_; // copy of config.num_threads (since Semaphore doesn't store original count)
273 
274  Semaphore threads_avail_; // Initialized to the number of threads we are
275  // supposed to run with; the function Run() waits on this.
276 
277  Semaphore tot_threads_avail_; // We use this semaphore to ensure we don't
278  // consume too much memory...
280 
281 };
282 
283 } // namespace kaldi
284 
285 #endif // KALDI_THREAD_KALDI_THREAD_H_
virtual void operator()()=0
This code computes Goodness of Pronunciation (GOP) and extracts phone-level pronunciation feature for...
Definition: chain.dox:20
TaskSequencer(const TaskSequencerConfig &config)
Definition: kaldi-thread.h:177
void Run(C *c)
This function takes ownership of the pointer "c", and will delete it in the same sequence as Run was ...
Definition: kaldi-thread.h:190
static void RunTask(RunTaskArgsList *args)
Definition: kaldi-thread.h:234
void Signal()
increase the counter
int32 g_num_threads
Definition: kaldi-thread.cc:25
MultiThreader(int32 num_threads, const C &c_in)
Definition: kaldi-thread.h:119
kaldi::int32 int32
Semaphore tot_threads_avail_
Definition: kaldi-thread.h:277
virtual void Register(const std::string &name, bool *ptr, const std::string &doc)=0
RunTaskArgsList * thread_list_
Definition: kaldi-thread.h:279
void RunMultiThreaded(const C &c_in)
Here, class C should inherit from MultiThreadable.
Definition: kaldi-thread.h:151
std::vector< C > cvec_
Definition: kaldi-thread.h:144
RunTaskArgsList(TaskSequencer *me, C *c, RunTaskArgsList *tail)
Definition: kaldi-thread.h:230
#define KALDI_ASSERT(cond)
Definition: kaldi-error.h:185
~TaskSequencer()
The destructor waits for the last thread to exit.
Definition: kaldi-thread.h:221
Semaphore threads_avail_
Definition: kaldi-thread.h:274
void Register(OptionsItf *opts)
Definition: kaldi-thread.h:160
std::vector< std::thread > threads_
Definition: kaldi-thread.h:143