nnet-batch-compute.cc
1 // nnet3/nnet-batch-compute.cc
2 
3 // Copyright 2012-2018 Johns Hopkins University (author: Daniel Povey)
4 // 2018 Hang Lyu
5 
6 // See ../../COPYING for clarification regarding multiple authors
7 //
8 // Licensed under the Apache License, Version 2.0 (the "License");
9 // you may not use this file except in compliance with the License.
10 // You may obtain a copy of the License at
11 //
12 // http://www.apache.org/licenses/LICENSE-2.0
13 //
14 // THIS CODE IS PROVIDED *AS IS* BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15 // KIND, EITHER EXPRESS OR IMPLIED, INCLUDING WITHOUT LIMITATION ANY IMPLIED
16 // WARRANTIES OR CONDITIONS OF TITLE, FITNESS FOR A PARTICULAR PURPOSE,
17 // MERCHANTABLITY OR NON-INFRINGEMENT.
18 // See the Apache 2 License for the specific language governing permissions and
19 // limitations under the License.
20 
21 #include <algorithm>
22 #include <iomanip>
23 #include "nnet3/nnet-batch-compute.h"
24 #include "nnet3/nnet-utils.h"
26 
27 namespace kaldi {
28 namespace nnet3 {
29 
30 
31 NnetBatchComputer::NnetBatchComputer(
32  const NnetBatchComputerOptions &opts,
33  const Nnet &nnet,
34  const VectorBase<BaseFloat> &priors):
35  opts_(opts),
36  nnet_(nnet),
37  compiler_(nnet_, opts.optimize_config),
38  log_priors_(priors),
39  num_full_minibatches_(0) {
40  log_priors_.ApplyLog();
43  ComputeSimpleNnetContext(nnet, &nnet_left_context_,
44  &nnet_right_context_);
45 
47  input_dim_ = nnet.InputDim("input");
48  ivector_dim_ = std::max<int32>(0, nnet.InputDim("ivector"));
49  output_dim_ = nnet.OutputDim("output");
50  KALDI_ASSERT(input_dim_ > 0 && output_dim_ > 0);
51 }
52 
53 void NnetBatchComputer::PrintMinibatchStats() {
54  int32 max_stats_to_print = 10;
55  int64 tot_tasks = 0, tot_minibatches = 0;
56  double tot_time = 0.0;
57  std::ostringstream os;
58  struct MinibatchStats {
59  int32 num_frames_out;
60  int32 num_frames_in;
61  int32 minibatch_size;
62  int32 num_done;
63  int32 percent_full;
64  BaseFloat seconds_taken;
65 
66  bool operator < (const MinibatchStats &other) const {
67  return seconds_taken > other.seconds_taken; // sort from most to least time.
68  }
69  };
70  std::vector<MinibatchStats> all_stats;
71  os << "Minibatch stats: seconds-taken,frames-in:frames-out*minibatch-size=num-done(percent-full%) ";
72 
73  for (MapType::const_iterator iter = tasks_.begin();
74  iter != tasks_.end(); ++iter) {
75  for (std::map<int32, MinibatchSizeInfo>::const_iterator
76  miter = iter->second.minibatch_info.begin();
77  miter != iter->second.minibatch_info.end(); ++miter) {
78  const ComputationGroupKey &key = iter->first;
79  const MinibatchSizeInfo &minfo = miter->second;
80  MinibatchStats stats;
81  stats.num_frames_in = key.num_input_frames;
82  stats.num_frames_out = key.num_output_frames;
83  stats.minibatch_size = miter->first;
84  stats.num_done = minfo.num_done;
85  stats.seconds_taken = minfo.seconds_taken;
86 
87  tot_tasks += minfo.tot_num_tasks;
88  tot_minibatches += minfo.num_done;
89  tot_time += minfo.seconds_taken;
90  stats.percent_full = int32(minfo.tot_num_tasks * 100.0 /
91  (stats.minibatch_size * stats.num_done));
92  all_stats.push_back(stats);
93  }
94  }
95 
96  std::sort(all_stats.begin(), all_stats.end());
97  os << std::fixed << std::setprecision(2);
98  int32 num_stats = all_stats.size();
99  for (int32 i = 0; i < std::min<int32>(num_stats, max_stats_to_print); i++) {
100  MinibatchStats &stats = all_stats[i];
101  os << stats.seconds_taken << ',' << stats.num_frames_in << ':'
102  << stats.num_frames_out << '*' << stats.minibatch_size
103  << '=' << stats.num_done << '(' << stats.percent_full << "%) ";
104  }
105  if (num_stats > max_stats_to_print)
106  os << "...";
107  KALDI_LOG << os.str();
108  KALDI_LOG << "Did " << tot_tasks << " tasks in " << tot_minibatches
109  << " minibatches, taking " << tot_time << " seconds.";
110 }
111 
113 NnetBatchComputer::~NnetBatchComputer() {
114  // the destructor shouldn't be called while the mutex is locked; if it is, it
115  // likely means the program has already crashed, or it's a programming error.
116  if (!mutex_.try_lock())
117  KALDI_ERR << "Destructor called while object locked.";
118  int32 num_pending_tasks = 0;
119  for (auto iter = tasks_.begin(); iter != tasks_.end(); ++iter)
120  num_pending_tasks += iter->second.tasks.size();
121  if (num_pending_tasks > 0)
122  KALDI_ERR << "Tasks are pending but object is being destroyed";
123  for (auto iter = no_more_than_n_minibatches_full_.begin();
124  iter != no_more_than_n_minibatches_full_.end(); ++iter) {
125  std::condition_variable *cond = iter->second;
126  // the next call will notify any threads that were waiting on this condition
127  // variable-- there shouldn't be any, though, as it would be a programming
128  // error, but better to wake them up so we can see any messages they print.
129  cond->notify_all();
130  delete cond;
131  }
132  KALDI_ASSERT(num_full_minibatches_ == 0); // failure would be a coding error.
133 }
134 
135 NnetBatchComputer::MinibatchSizeInfo*
136 NnetBatchComputer::GetHighestPriorityComputation(
137  bool allow_partial_minibatch,
138  int32 *minibatch_size_out,
139  std::vector<NnetInferenceTask*> *tasks) {
140  tasks->clear();
141  std::unique_lock<std::mutex> lock(mutex_);
142  MapType::iterator iter = tasks_.begin(), end = tasks_.end(),
143  best_iter = tasks_.end();
144  double highest_priority = -std::numeric_limits<double>::infinity();
145 
146  for (; iter != end; ++iter) {
147  ComputationGroupInfo &info = iter->second;
148  double this_priority = GetPriority(allow_partial_minibatch, info);
149  if (this_priority > highest_priority) {
150  highest_priority = this_priority;
151  best_iter = iter;
152  }
153  }
154  if (best_iter == tasks_.end()) {
155  // either allow_partial_minibatch == false and there were no full
156  // minibatches, or there were no pending tasks at all.
157  return NULL;
158  }
159  ComputationGroupInfo &info = best_iter->second;
160  int32 actual_minibatch_size = GetActualMinibatchSize(info);
161  *minibatch_size_out = actual_minibatch_size;
162  MinibatchSizeInfo *minfo = &(info.minibatch_info[actual_minibatch_size]);
163  if (minfo->computation == NULL)
164  minfo->computation = GetComputation(info, actual_minibatch_size);
165  GetHighestPriorityTasks(actual_minibatch_size, &info, tasks);
166  return minfo;
167 }
168 
169 
170 void NnetBatchComputer::GetHighestPriorityTasks(
171  int32 num_tasks_needed,
172  ComputationGroupInfo *info,
173  std::vector<NnetInferenceTask*> *tasks) {
174  int32 num_tasks_present = info->tasks.size(),
175  minibatch_size = GetMinibatchSize(*info);
176  KALDI_ASSERT(tasks->empty());
177  if (num_tasks_needed >= num_tasks_present) {
178  tasks->swap(info->tasks);
179  } else {
180  int32 num_tasks_not_needed = num_tasks_present - num_tasks_needed;
181  // We don't sort the tasks with a comparator that dereferences the pointers,
182  // because the priorities can change asynchronously, and we're concerned that
183  // something weird might happen in the sorting if the things it's comparing
184  // are changing.
185  std::vector<std::pair<double, NnetInferenceTask*> > pairs(num_tasks_present);
186  for (int32 i = 0; i < num_tasks_present; i++) {
187  pairs[i].first = info->tasks[i]->priority;
188  pairs[i].second = info->tasks[i];
189  }
190  std::nth_element(pairs.begin(), pairs.begin() + num_tasks_not_needed,
191  pairs.end());
192 
193  // The lowest-priority 'num_tasks_not_needed' stay in the 'info' struct.
194  info->tasks.clear();
195  for (int32 i = 0; i < num_tasks_not_needed; i++)
196  info->tasks.push_back(pairs[i].second);
197  // The highest-priority 'num_tasks_needed' tasks go to the output 'tasks'
198  // array.
199  for (int32 i = num_tasks_not_needed; i < num_tasks_present; i++)
200  tasks->push_back(pairs[i].second);
201  // The following assertion checks that the is_edge and is_irregular values
202  // are the same for the entire minibatch, which they should always be.
203  KALDI_ASSERT(GetMinibatchSize(*info) == minibatch_size);
204  }
205 
206  {
207  // This block updates num_full_minibatches_ and notifies threads waiting on
208  // any related condition variable.
209  int32 new_num_tasks_present = info->tasks.size(),
210  full_minibatch_reduction =
211  (num_tasks_present / minibatch_size) -
212  (new_num_tasks_present / minibatch_size);
213  for (int32 i = 0; i < full_minibatch_reduction; i++) {
214  num_full_minibatches_--;
216  std::unordered_map<int32, std::condition_variable*>::const_iterator
217  iter = no_more_than_n_minibatches_full_.find(num_full_minibatches_);
218  if (iter != no_more_than_n_minibatches_full_.end()) {
219  std::condition_variable *cond = iter->second;
220  cond->notify_all();
221  }
222  }
223  }
224 }
225 
226 
227 int32 NnetBatchComputer::GetMinibatchSize(
228  const ComputationGroupInfo &info) const {
229  if (info.tasks.empty()) {
230  return opts_.minibatch_size; // actually it shouldn't matter what we return
231  // in this case.
232  }
233  const NnetInferenceTask &task = *(info.tasks[0]);
234  if (task.is_irregular)
235  return 1;
236  else if (task.is_edge)
237  return opts_.edge_minibatch_size;
238  else
239  return opts_.minibatch_size;
240 }
241 
242 int32 NnetBatchComputer::GetActualMinibatchSize(
243  const ComputationGroupInfo &info) const {
244  KALDI_ASSERT(!info.tasks.empty());
245  int32 num_tasks = info.tasks.size(),
246  this_minibatch_size = GetMinibatchSize(info);
247  KALDI_ASSERT(num_tasks > 0);
248  while (num_tasks <
249  int32(opts_.partial_minibatch_factor * this_minibatch_size))
250  this_minibatch_size *= opts_.partial_minibatch_factor;
251  return int32(this_minibatch_size);
252 }
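// [Added illustrative note, not part of the original source.]
// Worked example of GetActualMinibatchSize(): suppose opts_.minibatch_size is
// 128 and opts_.partial_minibatch_factor is 0.5 (hypothetical values), and only
// 20 tasks are pending.  The loop shrinks the minibatch while it is less than
// half full: 128 -> 64 -> 32; since 20 >= 0.5 * 32, the function returns 32,
// so the compiled computation is sized closer to the number of tasks actually
// available.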
253 
254 
255 std::shared_ptr<const NnetComputation> NnetBatchComputer::GetComputation(
256  const ComputationGroupInfo &info,
257  int32 minibatch_size) {
258  KALDI_ASSERT(!info.tasks.empty());
259  // note: all the tasks will have the same structure, in the respects that
260  // would affect the computation.
261  NnetInferenceTask *example_task = info.tasks[0];
262  ComputationRequest request;
263  GetComputationRequest(*example_task, minibatch_size, &request);
264  return compiler_.Compile(request);
265 }
266 
267 
268 double NnetBatchComputer::GetPriority(bool allow_partial_minibatch,
269  const ComputationGroupInfo &info) const {
270  if (info.tasks.empty())
271  return -std::numeric_limits<double>::infinity();
272  int32 this_minibatch_size = GetMinibatchSize(info);
273  int32 num_tasks = info.tasks.size();
274 
275  if (!allow_partial_minibatch && num_tasks < this_minibatch_size)
276  return -std::numeric_limits<double>::infinity();
277 
278  // penalty_for_not_full will be negative if the minibatch is not full, with a
279  // magnitude of up to 10. The 10 is a heuristic; it could be changed.
280  // Note: the penalty is effectively infinite if allow_partial_minibatch == false;
281  // see the 'return' above.
282  double proportion_full = std::min<int32>(num_tasks, this_minibatch_size) /
283  double(this_minibatch_size),
284  penalty_for_not_full = 10.0 * (proportion_full - 1.0),
285  task_priority_sum = 0.0;
286 
287 
288  if (num_tasks > this_minibatch_size) {
289  // Get the average of the priorities of the highest-priority tasks (no more
290  // than 'minibatch_size' of them).
291  std::vector<double> priorities;
292  priorities.resize(num_tasks);
293  for (int32 i = 0; i < num_tasks; i++)
294  priorities[i] = info.tasks[i]->priority;
295  // sort from greatest to least.
296  std::nth_element(priorities.begin(),
297  priorities.begin() + this_minibatch_size,
298  priorities.end(),
299  std::greater<double>());
300  for (int32 i = 0; i < this_minibatch_size; i++)
301  task_priority_sum += priorities[i];
302  return penalty_for_not_full + task_priority_sum / this_minibatch_size;
303  } else {
304  for (int32 i = 0; i < num_tasks; i++)
305  task_priority_sum += info.tasks[i]->priority;
306  return penalty_for_not_full + task_priority_sum / num_tasks;
307  }
308 }
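// [Added illustrative note, not part of the original source.]
// Worked example of GetPriority(): with a (hypothetical) minibatch size of 128
// and 32 pending tasks, proportion_full = 32/128 = 0.25 and
// penalty_for_not_full = 10.0 * (0.25 - 1.0) = -7.5, which is then added to
// the average priority of the pending tasks.  A full group therefore always
// out-prioritizes a quarter-full group whose tasks have similar priorities.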
309 
310 
311 // static
312 void NnetBatchComputer::GetComputationRequest(
313  const NnetInferenceTask &task,
314  int32 minibatch_size,
315  ComputationRequest *request) {
316  request->need_model_derivative = false;
317  request->store_component_stats = false;
318  request->inputs.reserve(2);
319 
320  int32 num_input_frames = task.input.NumRows(),
321  first_input_t = task.first_input_t,
322  num_output_frames = task.num_output_frames,
323  output_t_stride = task.output_t_stride;
324  bool has_ivector = (task.ivector.Dim() != 0);
325 
326  std::vector<Index> input_indexes, ivector_indexes, output_indexes;
327  input_indexes.reserve(minibatch_size * num_input_frames);
328  output_indexes.reserve(minibatch_size * num_output_frames);
329  if (has_ivector)
330  ivector_indexes.reserve(minibatch_size);
331 
332  for (int32 n = 0; n < minibatch_size; n++) {
333  for (int32 t = first_input_t; t < first_input_t + num_input_frames; t++)
334  input_indexes.push_back(Index(n, t, 0));
335  if (has_ivector)
336  ivector_indexes.push_back(Index(n, 0, 0));
337  for (int32 t = 0; t < num_output_frames; t++)
338  output_indexes.push_back(Index(n, t * output_t_stride, 0));
339  }
340  request->inputs.push_back(IoSpecification("input", input_indexes));
341  if (has_ivector)
342  request->inputs.push_back(IoSpecification("ivector", ivector_indexes));
343  request->outputs.push_back(IoSpecification("output", output_indexes));
344 }
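// [Added illustrative note, not part of the original source.]
// Example of the indexes GetComputationRequest() generates, for hypothetical
// values minibatch_size = 2, first_input_t = -3, num_input_frames = 5,
// num_output_frames = 2, output_t_stride = 3, with an i-vector present:
//   "input":   (n=0, t=-3..1), (n=1, t=-3..1)           (10 indexes)
//   "ivector": (n=0, t=0), (n=1, t=0)                   (2 indexes)
//   "output":  (n=0, t=0), (n=0, t=3), (n=1, t=0), (n=1, t=3)
// Every sequence n in the minibatch gets the same time range, which is what
// lets one compiled computation be reused for many groups of tasks.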
345 
346 void NnetBatchComputer::FormatInputs(
347  int32 minibatch_size,
348  const std::vector<NnetInferenceTask*> &tasks,
349  CuMatrix<BaseFloat> *input,
350  CuMatrix<BaseFloat> *ivector) {
351  int32 num_input_frames = tasks[0]->input.NumRows(),
352  input_dim = tasks[0]->input.NumCols(),
353  ivector_dim = tasks[0]->ivector.Dim(),
354  num_tasks = tasks.size();
355  KALDI_ASSERT(num_tasks > 0 && num_tasks <= minibatch_size);
356 
357  // destination matrix
358  input->Resize(minibatch_size * num_input_frames, input_dim,
359  kUndefined);
360 
361 #if HAVE_CUDA == 1
362  if (CuDevice::Instantiate().Enabled()) {
363 
364  std::vector<const BaseFloat*> inputs(num_tasks);
365  std::vector<BaseFloat*> outputs(num_tasks);
366  std::vector<int32_t> ldi(num_tasks), ldo(num_tasks);
367  std::vector<int32_t> num_rows(num_tasks), num_cols(num_tasks);
368 
369  // compute matrix descriptions for each copy
370  for (int32 n = 0; n < num_tasks; n++) {
371  const CuMatrix<BaseFloat> &input_mat = tasks[n]->input;
372  CuSubMatrix<BaseFloat> output_mat = input->RowRange(
373  n * num_input_frames, num_input_frames);
374 
375  // create matrix batch description arrays
376  num_rows[n] = num_input_frames;
377  num_cols[n] = input_dim;
378  outputs[n] = output_mat.Data();
379  inputs[n] = input_mat.Data();
380  ldo[n] = output_mat.Stride();
381  ldi[n] = input_mat.Stride();
382  }
383 
384  // execute batched copy
385  cuda_batched_copy_mats(num_tasks, &num_rows[0], &num_cols[0], &inputs[0],
386  &ldi[0], &outputs[0], &ldo[0]);
387 
388  } else
389 #endif
390  {
391  for (int32 n = 0; n < num_tasks; n++) {
392  CuSubMatrix<BaseFloat> input_part(*input,
393  n * num_input_frames, num_input_frames,
394  0, input_dim);
395  input_part.CopyFromMat(tasks[n]->input);
396  }
397  }
398 
399  if (GetVerboseLevel() >= 2) {
400  if (num_tasks < minibatch_size) {
401  // The following will make things easier to debug if something fails, but
402  // shouldn't be strictly necessary.
403  // Zero the unused rows so their contents are deterministic.
404  input->RowRange(num_tasks * num_input_frames,
405  (minibatch_size - num_tasks) * num_input_frames).SetZero();
406  }
407  }
408 
409  if (ivector_dim != 0) {
410  ivector->Resize(minibatch_size, ivector_dim, kUndefined);
411 
412 #if HAVE_CUDA == 1
413  if (CuDevice::Instantiate().Enabled()) {
414 
415  // using the batched matrix copy routine for this. This isn't
416  // extremely efficient but the kernel takes a minimal amount of
417  // time so making a batched vector copy is not worth the effort.
418  std::vector<const BaseFloat*> inputs(num_tasks);
419  std::vector<BaseFloat*> outputs(num_tasks);
420  std::vector<int32_t> ldi(num_tasks), ldo(num_tasks);
421  std::vector<int32_t> num_rows(num_tasks), num_cols(num_tasks);
422 
423  // compute source pointers for each input
424  for (int32 n = 0; n < num_tasks; n++) {
425  const CuVector<BaseFloat> &input_vec = tasks[n]->ivector;
426  CuSubVector<BaseFloat> output_vec = ivector->Row(n);
427  // create matrix batch description arrays
428  num_rows[n] = 1;
429  num_cols[n] = ivector_dim;
430  outputs[n] = output_vec.Data();
431  inputs[n] = input_vec.Data();
432  ldo[n] = 1;
433  ldi[n] = 1;
434  }
435 
436  // execute batched copy
437  cuda_batched_copy_mats(num_tasks, &num_rows[0], &num_cols[0], &inputs[0], &ldi[0],
438  &outputs[0], &ldo[0]);
439 
440  } else
441 #endif
442  {
443  for (int32 n = 0; n < num_tasks; n++) {
444  ivector->Row(n).CopyFromVec(tasks[n]->ivector);
445  }
446  }
447 
448  if (GetVerboseLevel() >= 2) {
449  if (num_tasks < minibatch_size) {
450  // The following will make things easier to debug if something fails, but
451  // shouldn't be strictly necessary.
452  // Zero the unused rows so their contents are deterministic.
453  ivector->RowRange(num_tasks, minibatch_size - num_tasks).SetZero();
454  }
455  }
456  }
457 }
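// [Added illustrative note, not part of the original source.]
// Layout produced by FormatInputs(): the destination 'input' matrix has
// minibatch_size * num_input_frames rows, and task n occupies the row range
// [n * num_input_frames, (n + 1) * num_input_frames).  For example (made-up
// numbers), with num_input_frames = 21 and 3 tasks in a minibatch of 128,
// rows 0..20, 21..41 and 42..62 are filled and the remaining rows are left
// undefined (or zeroed when the verbose level is >= 2).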
458 
459 void NnetBatchComputer::FormatOutputs(
460  const CuMatrix<BaseFloat> &output,
461  const std::vector<NnetInferenceTask*> &tasks) {
462  KALDI_ASSERT(!tasks.empty());
463  int32 num_output_frames = tasks[0]->num_output_frames,
464  output_dim = output.NumCols(),
465  num_tasks = tasks.size();
466  bool did_output_to_gpu = false;
467 
468  // We don't bother zeroing frames of the output that are unused, but you could
469  // un-comment the commented lines of code below to do so and add equivalent
470  // calls to the cuda version.
471 
472 #if HAVE_CUDA == 1
473  if (CuDevice::Instantiate().Enabled()) {
474 
475  std::vector<const BaseFloat*> inputs(num_tasks);
476  std::vector<BaseFloat*> outputs(num_tasks);
477  std::vector<int32_t> ldi(num_tasks), ldo(num_tasks);
478  std::vector<int32_t> num_rows(num_tasks), num_cols(num_tasks);
479 
480  int b=0; // batch counter
481  for (int32 n = 0; n < num_tasks; n++) {
482  NnetInferenceTask *task = tasks[n];
483 
484  int32 left_unused = task->num_initial_unused_output_frames,
485  used = task->num_used_output_frames;
486  // int32 right_unused = num_output_frames - used - left_unused;
487 
488  // TODO do we really expect different tasks to output CPU or GPU?
489  // This adds a bit of code complexity. Perhaps output_to_cpu should
490  // be a property of the batch computer and not the tasks
491  if (task->output_to_cpu) {
492  task->output_cpu.Resize(num_output_frames, output_dim,
493  kUndefined);
494  // if (left_unused > 0)
495  // task->output_cpu.RowRange(0, left_unused).SetZero();
496  task->output_cpu.RowRange(left_unused, used).CopyFromMat(
497  output.RowRange(n * num_output_frames + left_unused, used));
498  // if (right_unused > 0)
499  // task->output_cpu.RowRange(
500  // 0, left_unused + used, right_unused).SetZero();
501 
502  } else {
503  did_output_to_gpu = true;
504  task->output.Resize(num_output_frames, output_dim,
505  kUndefined);
506 
507  CuSubMatrix<BaseFloat> output_mat = task->output.RowRange(
508  left_unused, used);
509  const CuSubMatrix<BaseFloat> input_mat = output.RowRange(
510  n * num_output_frames + left_unused, used);
511 
512  // create matrix batch description arrays
513  num_rows[b] = output_mat.NumRows();
514  num_cols[b] = output_mat.NumCols();
515  outputs[b] = output_mat.Data();
516  inputs[b] = input_mat.Data();
517  ldo[b] = output_mat.Stride();
518  ldi[b] = input_mat.Stride();
519  b++; // increase batch count
520  }
521  }
522 
523  // execute batched copy
524  cuda_batched_copy_mats(b, &num_rows[0], &num_cols[0], &inputs[0], &ldi[0],
525  &outputs[0], &ldo[0]);
526 
527  } else
528 #endif
529  {
530  //TODO i don't think all of these paths are actually possible. We should simplify this.
531  //Is it possible to output_to_gpu with HAVE_CUDA == 0 or when the device is disabled?
532  for (int32 n = 0; n < num_tasks; n++) {
533  NnetInferenceTask *task = tasks[n];
534 
535  int32 left_unused = task->num_initial_unused_output_frames,
536  used = task->num_used_output_frames;
537  // int32 right_unused = num_output_frames - used - left_unused;
538 
539  if (task->output_to_cpu) {
540  task->output_cpu.Resize(num_output_frames, output_dim,
541  kUndefined);
542  // if (left_unused > 0)
543  // task->output_cpu.RowRange(0, left_unused).SetZero();
544  task->output_cpu.RowRange(left_unused, used).CopyFromMat(
545  output.RowRange(n * num_output_frames + left_unused, used));
546  // if (right_unused > 0)
547  // task->output_cpu.RowRange(0, left_unused + used, right_unused).SetZero();
548  } else {
549  did_output_to_gpu = true;
550  task->output.Resize(num_output_frames, output_dim,
551  kUndefined);
552  // if (left_unused > 0)
553  // task->output.RowRange(0, left_unused).SetZero();
554  task->output.RowRange(left_unused, used).CopyFromMat(
555  output.RowRange(n * num_output_frames + left_unused, used));
556  // if (right_unused > 0)
557  // task->output.RowRange(0, left_unused + used, right_unused).SetZero();
558  }
559  }
560  }
561  // The output of this function will likely be consumed by another thread.
562  // The following call will make sure the relevant kernels complete before
563  // any kernels from the other thread use the output.
564  if (did_output_to_gpu)
565  SynchronizeGpu();
566 }
567 
568 void NnetBatchComputer::AcceptTask(NnetInferenceTask *task,
569  int32 max_minibatches_full) {
570  std::unique_lock<std::mutex> lock(mutex_);
571 
572  if (max_minibatches_full > 0 && num_full_minibatches_ > max_minibatches_full) {
573  std::unordered_map<int32, std::condition_variable*>::iterator
574  iter = no_more_than_n_minibatches_full_.find(max_minibatches_full);
575  std::condition_variable *cond;
576  if (iter != no_more_than_n_minibatches_full_.end()) {
577  cond = iter->second;
578  } else {
579  cond = new std::condition_variable();
580  no_more_than_n_minibatches_full_[max_minibatches_full] = cond;
581  }
582  while (num_full_minibatches_ > max_minibatches_full)
583  cond->wait(lock);
584  }
585  ComputationGroupKey key(*task);
586  ComputationGroupInfo &info = tasks_[key];
587  info.tasks.push_back(task);
588  int32 minibatch_size = GetMinibatchSize(info);
589  if (static_cast<int32>(info.tasks.size()) % minibatch_size == 0)
590  num_full_minibatches_++;
591 }
592 
593 bool NnetBatchComputer::Compute(bool allow_partial_minibatch) {
594  int32 minibatch_size;
595  std::vector<NnetInferenceTask*> tasks;
596  MinibatchSizeInfo *minfo =
597  GetHighestPriorityComputation(allow_partial_minibatch,
598  &minibatch_size,
599  &tasks);
600  if (minfo == NULL)
601  return false;
602 
603  Timer tim;
604  Nnet *nnet_to_update = NULL; // we're not doing any update
605  NnetComputer computer(opts_.compute_config, *(minfo->computation),
606  nnet_, nnet_to_update);
607 
608 
609  CuMatrix<BaseFloat> input;
610  CuMatrix<BaseFloat> ivector;
611  FormatInputs(minibatch_size, tasks, &input, &ivector);
612  computer.AcceptInput("input", &input);
613  if (ivector.NumRows() != 0)
614  computer.AcceptInput("ivector", &ivector);
615  computer.Run();
616  CuMatrix<BaseFloat> output;
617  computer.GetOutputDestructive("output", &output);
618  if (log_priors_.Dim() != 0) {
619  output.AddVecToRows(-1.0, log_priors_);
620  }
621  output.Scale(opts_.acoustic_scale);
622  FormatOutputs(output, tasks);
623 
624  // Update the stats, for diagnostics.
625  minfo->num_done++;
626  minfo->tot_num_tasks += static_cast<int64>(tasks.size());
627  minfo->seconds_taken += tim.Elapsed();
628 
629  SynchronizeGpu();
630 
631  for (size_t i = 0; i < tasks.size(); i++)
632  tasks[i]->semaphore.Signal();
633 
634  return true;
635 }
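// [Added illustrative sketch, not part of the original source.]  The snippet
// below shows one way a background thread could drive Compute(); the classes
// further down (NnetBatchInference, NnetBatchDecoder) do essentially this, but
// use a semaphore instead of sleeping.  'ExampleComputeLoop' and 'done_flag'
// are hypothetical names, not part of Kaldi.
#if 0
void ExampleComputeLoop(NnetBatchComputer *computer, const bool *done_flag) {
  // In real multi-threaded code 'done_flag' would need to be std::atomic<bool>
  // or otherwise synchronized.
  while (!*done_flag) {
    // Process as many full minibatches as are currently available.
    while (computer->Compute(/*allow_partial_minibatch=*/false));
    Sleep(0.01);  // a real driver would wait on a semaphore instead.
  }
  // No more input is coming: flush whatever is left, even if partial.
  while (computer->Compute(/*allow_partial_minibatch=*/true));
}
#endif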
636 
637 
640 // This namespace contains utilities used in the implementation of
641 // NnetBatchComputer::SplitUtteranceIntoTasks().
642 namespace utterance_splitting {
661 void GetOutputFrameInfoForTasks(
662  const NnetBatchComputerOptions &opts,
663  int32 num_subsampled_frames,
664  int32 num_subsampled_frames_per_chunk,
665  std::vector<NnetInferenceTask> *tasks) {
666  KALDI_ASSERT(num_subsampled_frames > 0);
667  int32 fpc = num_subsampled_frames_per_chunk;
668  int32 num_tasks = (num_subsampled_frames + fpc - 1) / fpc;
669  tasks->resize(num_tasks);
670  for (int32 i = 0; i < num_tasks; i++) {
671  (*tasks)[i].output_t_stride = opts.frame_subsampling_factor;
672  }
673  if (num_subsampled_frames <= fpc) { // there is one chunk.
674  KALDI_ASSERT(num_tasks == 1); // TODO: remove this.
675  NnetInferenceTask &task = (*tasks)[0];
677  if (opts.ensure_exact_final_context) {
678  task.num_output_frames = num_subsampled_frames;
679  task.num_initial_unused_output_frames = 0;
680  task.num_used_output_frames = num_subsampled_frames;
681  task.is_irregular = true;
682  } else {
683  task.num_output_frames = fpc;
684  task.num_initial_unused_output_frames = 0;
685  task.num_used_output_frames = num_subsampled_frames;
686  task.is_irregular = false;
687  }
688  } else {
689  for (int32 i = 0; i + 1 < num_tasks; i++) {
690  NnetInferenceTask &task = (*tasks)[i];
691  task.num_output_frames = fpc;
692  task.num_initial_unused_output_frames = 0;
693  task.num_used_output_frames = fpc;
694  task.first_used_output_frame_index = i * fpc;
695  task.is_irregular = false;
696  }
697  // The last chunk will end on the last frame of the file, but we won't use
698  // the part of its output that overlaps with the preceding chunk.
699  NnetInferenceTask &task = (*tasks)[num_tasks - 1];
700  task.num_output_frames = fpc;
701  task.num_initial_unused_output_frames = ((num_tasks - 1) * fpc) -
702  (num_subsampled_frames - fpc);
703  task.num_used_output_frames =
704  num_subsampled_frames - ((num_tasks - 1) * fpc);
705  task.first_used_output_frame_index = (num_tasks - 1) * fpc;
706  task.is_irregular = false;
707  }
708 
709  if (true) {
710  // Do some checking. TODO: remove this.
711  KALDI_ASSERT((*tasks)[0].first_used_output_frame_index == 0);
712  for (int32 i = 1; i < num_tasks; i++) {
713  KALDI_ASSERT((*tasks)[i].first_used_output_frame_index ==
714  (*tasks)[i-1].first_used_output_frame_index +
715  (*tasks)[i-1].num_used_output_frames);
716  }
717  KALDI_ASSERT((*tasks)[num_tasks-1].first_used_output_frame_index +
718  (*tasks)[num_tasks-1].num_used_output_frames ==
719  num_subsampled_frames);
720  for (int32 i = 0; i < num_tasks; i++) {
721  const NnetInferenceTask &task = (*tasks)[i];
722  KALDI_ASSERT(task.num_used_output_frames +
723  task.num_initial_unused_output_frames <=
724  task.num_output_frames);
725  }
726  }
727 }
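// [Added illustrative note, not part of the original source.]
// Worked example of GetOutputFrameInfoForTasks(): with (hypothetical)
// num_subsampled_frames_per_chunk = 50 and num_subsampled_frames = 104,
// num_tasks = 3.  Tasks 0 and 1 each produce 50 output frames, all of them
// used, starting at subsampled frames 0 and 50.  Task 2 is aligned to end on
// the last frame of the file: it also produces 50 frames, but
// num_initial_unused_output_frames = 2*50 - (104 - 50) = 46 and
// num_used_output_frames = 104 - 100 = 4, so only its last 4 output frames
// (subsampled indexes 100..103) are actually used.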
728 
729 void AddOnlineIvectorsToTasks(
730  const NnetBatchComputerOptions &opts,
731  const CuMatrix<BaseFloat> &online_ivectors,
732  int32 online_ivector_period,
733  std::vector<NnetInferenceTask> *tasks) {
734  int32 f = opts.frame_subsampling_factor,
735  num_tasks = tasks->size();
736  for (int32 i = 0; i < num_tasks; i++) {
737  NnetInferenceTask &task = (*tasks)[i];
738  // begin_output_t and end_output_t are the subsampled frame indexes at
739  // the output; you'd have to multiply them by f to get real frame indexes.
740  int32 begin_output_t = task.first_used_output_frame_index -
741  task.num_initial_unused_output_frames,
742  mid_output_t = begin_output_t + (task.num_output_frames / 2),
743  mid_input_t = mid_output_t * f,
744  ivector_frame = mid_input_t / online_ivector_period,
745  num_ivector_frames = online_ivectors.NumRows(),
746  margin_in_frames = 20,
747  margin_in_ivector_frames =
748  (margin_in_frames + online_ivector_period - 1) / online_ivector_period;
749  // the 'margin' is our tolerance for when the number of rows of
750  // 'online_ivectors' is less than what we expected; we allow 20 frames of
751  // tolerance in the numbering of the original (input) features.
752  if (ivector_frame >= num_ivector_frames) {
753  if (num_ivector_frames > 0 && ivector_frame > num_ivector_frames -
754  margin_in_ivector_frames) {
755  ivector_frame = num_ivector_frames - 1; // Just take the last available one.
756  } else {
757  KALDI_ERR << "Could not get iVector for frame " << ivector_frame
758  << ", online-ivectors matrix has "
759  << online_ivectors.NumRows()
760  << " rows. Mismatched --online-ivector-period?";
761  }
762  }
763  task.ivector = online_ivectors.Row(ivector_frame);
764  }
765 }
766 
767 
768 
779 static void SplitInputToTasks(const NnetBatchComputerOptions &opts,
780  int32 nnet_left_context,
781  int32 nnet_right_context,
782  const CuMatrix<BaseFloat> &input,
783  std::vector<NnetInferenceTask> *tasks) {
784  int32 num_input_frames = input.NumRows(),
785  f = opts.frame_subsampling_factor,
786  num_subsampled_frames = (num_input_frames + f - 1) / f,
787  extra_left_context_initial = (opts.extra_left_context_initial < 0 ?
788  opts.extra_left_context :
789  opts.extra_left_context_initial),
790  extra_right_context_final = (opts.extra_right_context_final < 0 ?
791  opts.extra_right_context :
792  opts.extra_right_context_final),
793  num_tasks = tasks->size();
794 
795  for (int32 i = 0; i < num_tasks; i++) {
796  NnetInferenceTask &task = (*tasks)[i];
797  // begin_output_t and end_output_t are the subsampled frame indexes at
798  // the output; you'd have to multiply them by f to get real frame indexes.
799  int32 begin_output_t = task.first_used_output_frame_index -
800  task.num_initial_unused_output_frames,
801  end_output_t = begin_output_t + task.num_output_frames;
802  // begin_input_t and end_input_t are the real 't' values corresponding to
803  // begin_output_t and end_output_t; they are the beginning and end
804  // (i.e. first and last-plus-one) frame indexes without any left or right
805  // context.
806  int32 begin_input_t = begin_output_t * f,
807  end_input_t = end_output_t * f;
808  // Detect whether the left and right edges touch (or pass over) the left
809  // and right boundaries. Note: we don't expect begin_output_t to ever be
810  // negative.
811  bool left_edge = (begin_output_t <= 0),
812  right_edge = (end_output_t >= num_subsampled_frames);
813  int32 tot_left_context = nnet_left_context +
814  (left_edge ? extra_left_context_initial : opts.extra_left_context),
815  tot_right_context = nnet_right_context +
816  (right_edge ? extra_right_context_final : opts.extra_right_context);
817 
818  // 'is_edge' is only true if it's an edge minibatch *and* its being an
819  // edge actually made a difference to the structure of the example.
820  task.is_edge =
821  (tot_left_context != nnet_left_context + opts.extra_left_context ||
822  tot_right_context != nnet_right_context + opts.extra_right_context);
823 
824  int32 begin_input_t_padded = begin_input_t - tot_left_context,
825  end_input_t_padded = end_input_t + tot_right_context;
826 
827  // 'task.first_input_t' is a representation of 'begin_input_t_padded' in a
828  // shifted/normalized numbering where the output time indexes start from
829  // zero.
830  task.first_input_t = begin_input_t_padded - (begin_output_t * f);
831 
832  task.input.Resize(end_input_t_padded - begin_input_t_padded,
833  input.NumCols(), kUndefined);
834 
835  // Copy from 'input' into the task's input, clamping row indexes at the edges.
836  task.input.CopyRangeFromMatClamped(input, begin_input_t_padded,
837  end_input_t_padded, 0, num_input_frames-1);
838  }
839 }
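// [Added illustrative note, not part of the original source.]
// Worked example of SplitInputToTasks(): suppose (hypothetically)
// frame_subsampling_factor = 3, nnet_left_context = nnet_right_context = 29,
// and all the extra-context options are 0.  A chunk covering output frames
// [0, 50) needs input frames [0, 150), padded to [-29, 179); since this is the
// first chunk, begin_output_t == 0, so task.first_input_t = -29, and the rows
// copied from 'input' are clamped to the valid range [0, num_input_frames - 1],
// i.e. the first frame is repeated to supply the missing left context.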
840 
841 } // namespace utterance_splitting
842 
843 void NnetBatchComputer::SplitUtteranceIntoTasks(
844  bool output_to_cpu,
845  const Matrix<BaseFloat> &input,
846  const Vector<BaseFloat> *h_ivector,
847  const Matrix<BaseFloat> *h_online_ivectors,
848  int32 online_ivector_period,
849  std::vector<NnetInferenceTask> *tasks) {
850 
851  // The computation expects its inputs in device memory, so create
852  // temporary device arrays and copy the host inputs into them.
853 
854  CuMatrix<BaseFloat> cu_input(input);
855  CuVector<BaseFloat> cu_ivector, *ivector = NULL;
856  CuMatrix<BaseFloat> cu_online_ivectors, *online_ivectors = NULL;
857 
858  if (h_ivector!=NULL) {
859  cu_ivector.Resize(h_ivector->Dim(), kUndefined);
860  cu_ivector.CopyFromVec(*h_ivector);
861  ivector = &cu_ivector;
862  }
863  if (h_online_ivectors!=NULL) {
864  cu_online_ivectors.Resize(h_online_ivectors->NumRows(), h_online_ivectors->NumCols(), kUndefined);
865  cu_online_ivectors.CopyFromMat(*h_online_ivectors);
866  online_ivectors = &cu_online_ivectors;
867  }
868 
869  SplitUtteranceIntoTasks(output_to_cpu, cu_input, ivector,
870  online_ivectors, online_ivector_period, tasks);
871 }
872 
873 void NnetBatchComputer::SplitUtteranceIntoTasks(
874  bool output_to_cpu,
875  const CuMatrix<BaseFloat> &input,
876  const CuVector<BaseFloat> *ivector,
877  const CuMatrix<BaseFloat> *online_ivectors,
878  int32 online_ivector_period,
879  std::vector<NnetInferenceTask> *tasks) {
880  using namespace utterance_splitting;
881 
882 
883  { // This block does some checking.
884  if (input.NumCols() != input_dim_) {
885  KALDI_ERR << "Input features did not have expected dimension: expected "
886  << input_dim_ << ", got " << input.NumCols();
887  }
888  int32 ivector_dim = (ivector != NULL ? ivector->Dim() :
889  (online_ivectors != NULL ?
890  online_ivectors->NumCols() : 0));
891  if (ivector_dim_ != 0 && ivector_dim == 0)
892  KALDI_ERR << "Model expects i-vectors but none were supplied";
893  else if (ivector_dim_ == 0 && ivector_dim != 0)
894  KALDI_ERR << "You supplied i-vectors but model does not expect them.";
895  else if (ivector_dim != ivector_dim_)
896  KALDI_ERR << "I-vector dimensions mismatch: model expects "
897  << ivector_dim_ << ", you supplied " << ivector_dim;
898  }
899 
900 
901  int32 num_input_frames = input.NumRows(),
902  f = opts_.frame_subsampling_factor,
903  num_subsampled_frames = (num_input_frames + f - 1) / f,
904  num_subsampled_frames_per_chunk = opts_.frames_per_chunk / f;
905 
906  GetOutputFrameInfoForTasks(opts_, num_subsampled_frames,
907  num_subsampled_frames_per_chunk,
908  tasks);
909 
910  SplitInputToTasks(opts_, nnet_left_context_, nnet_right_context_,
911  input, tasks);
912 
913 
914  if (ivector != NULL) {
915  KALDI_ASSERT(online_ivectors == NULL);
916 
917 #if HAVE_CUDA == 1
918  if (CuDevice::Instantiate().Enabled()) {
919  int32_t num_tasks = tasks->size();
920 
921  std::vector<const BaseFloat*> inputs(num_tasks);
922  std::vector<BaseFloat*> outputs(num_tasks);
923  std::vector<int32_t> ldi(num_tasks), ldo(num_tasks);
924  std::vector<int32_t> num_rows(num_tasks), num_cols(num_tasks);
925 
926  int b=0; // batch counter
927 
928  for (size_t i = 0; i < tasks->size(); i++) {
929  CuVector<BaseFloat> &output_vec = (*tasks)[i].ivector;
930  const CuVector<BaseFloat> &input_vec = *ivector;
931 
932  output_vec.Resize(input_vec.Dim(), kUndefined);
933 
934  // create matrix batch description arrays
935  num_rows[b] = 1;
936  num_cols[b] = output_vec.Dim();
937  outputs[b] = output_vec.Data();
938  inputs[b] = input_vec.Data();
939  ldo[b] = 0;
940  ldi[b] = 0;
941  b++; // increase batch count
942  }
943 
944  // execute batched copy
945  cuda_batched_copy_mats(b, &num_rows[0], &num_cols[0], &inputs[0], &ldi[0],
946  &outputs[0], &ldo[0]);
947  } else
948 #endif
949  {
950  for (size_t i = 0; i < tasks->size(); i++)
951  (*tasks)[i].ivector = *ivector;
952  }
953 
954  } else if (online_ivectors != NULL) {
955  AddOnlineIvectorsToTasks(opts_, *online_ivectors,
956  online_ivector_period, tasks);
957  }
958 
959  for (size_t i = 0; i < tasks->size(); i++) {
960  (*tasks)[i].output_to_cpu = output_to_cpu;
961  // The priority will be set by the user; this just avoids undefined
962  // behavior.
963  (*tasks)[i].priority = 0.0;
964  }
965 }
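// [Added illustrative sketch, not part of the original source.]  A minimal
// single-threaded way to compute one utterance with this class, assuming an
// NnetBatchComputer has already been constructed; 'ExampleComputeOneUtterance'
// is a hypothetical helper, not a Kaldi function.
#if 0
void ExampleComputeOneUtterance(NnetBatchComputer *computer,
                                const Matrix<BaseFloat> &feats,
                                const Vector<BaseFloat> *ivector,  // may be NULL
                                Matrix<BaseFloat> *nnet_output) {
  std::vector<NnetInferenceTask> tasks;
  computer->SplitUtteranceIntoTasks(/*output_to_cpu=*/true, feats, ivector,
                                    /*online_ivectors=*/NULL,
                                    /*online_ivector_period=*/0, &tasks);
  for (size_t i = 0; i < tasks.size(); i++)
    computer->AcceptTask(&(tasks[i]));
  // Normally Compute() runs in a separate thread; here we just drain the
  // queue in the calling thread.
  while (computer->Compute(/*allow_partial_minibatch=*/true));
  for (size_t i = 0; i < tasks.size(); i++)
    tasks[i].semaphore.Wait();
  MergeTaskOutput(tasks, nnet_output);
}
#endif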
966 
967 
968 void MergeTaskOutput(
969  const std::vector<NnetInferenceTask> &tasks,
970  Matrix<BaseFloat> *output) {
971  int32 num_tasks = tasks.size(),
972  num_output_frames = 0,
973  output_dim = -1;
974  for (int32 i = 0; i < num_tasks; i++) {
975  const NnetInferenceTask &task = tasks[i];
976  num_output_frames += task.num_used_output_frames;
977  if (i == 0) {
978  output_dim = (task.output_to_cpu ?
979  task.output_cpu.NumCols() :
980  task.output.NumCols());
981  }
982  }
983  KALDI_ASSERT(num_output_frames != 0 && output_dim != 0);
984  int32 cur_output_frame = 0;
985  output->Resize(num_output_frames, output_dim);
986  for (int32 i = 0; i < num_tasks; i++) {
987  const NnetInferenceTask &task = tasks[i];
988  int32 skip = task.num_initial_unused_output_frames,
989  num_used = task.num_used_output_frames;
990  KALDI_ASSERT(cur_output_frame == task.first_used_output_frame_index);
991  if (task.output_to_cpu) {
992  output->RowRange(cur_output_frame, num_used).CopyFromMat(
993  task.output_cpu.RowRange(skip, num_used));
994  } else {
995  output->RowRange(cur_output_frame, num_used).CopyFromMat(
996  task.output.RowRange(skip, num_used));
997  }
998  cur_output_frame += num_used;
999  }
1000  KALDI_ASSERT(cur_output_frame == num_output_frames);
1001 }
1002 void MergeTaskOutput(
1003  const std::vector<NnetInferenceTask> &tasks,
1004  CuMatrix<BaseFloat> *output) {
1005  int32 num_tasks = tasks.size(),
1006  num_output_frames = 0,
1007  output_dim = -1;
1008  for (int32 i = 0; i < num_tasks; i++) {
1009  const NnetInferenceTask &task = tasks[i];
1010  num_output_frames += task.num_used_output_frames;
1011  if (i == 0) {
1012  output_dim = (task.output_to_cpu ?
1013  task.output_cpu.NumCols() :
1014  task.output.NumCols());
1015  }
1016  }
1017  KALDI_ASSERT(num_output_frames != 0 && output_dim != 0);
1018  int32 cur_output_frame = 0;
1019  output->Resize(num_output_frames, output_dim, kUndefined);
1020 
1021 #if HAVE_CUDA == 1
1022  if (CuDevice::Instantiate().Enabled()) {
1023 
1024  std::vector<const BaseFloat*> inputs(num_tasks);
1025  std::vector<BaseFloat*> outputs(num_tasks);
1026  std::vector<int32_t> ldi(num_tasks), ldo(num_tasks);
1027  std::vector<int32_t> num_rows(num_tasks), num_cols(num_tasks);
1028 
1029  int b=0; // batch counter
1030  for (int32 i = 0; i < num_tasks; i++) {
1031  const NnetInferenceTask &task = tasks[i];
1032  int32 skip = task.num_initial_unused_output_frames,
1033  num_used = task.num_used_output_frames;
1034  KALDI_ASSERT(cur_output_frame == task.first_used_output_frame_index);
1035  if (task.output_to_cpu) {
1036  output->RowRange(cur_output_frame, num_used).CopyFromMat(
1037  task.output_cpu.RowRange(skip, num_used));
1038  } else {
1039  CuSubMatrix<BaseFloat> output_mat =
1040  output->RowRange(cur_output_frame, num_used);
1041  const CuSubMatrix<BaseFloat> input_mat =
1042  task.output.RowRange(skip, num_used);
1043 
1044  // create matrix batch description arrays
1045  num_rows[b] = output_mat.NumRows();
1046  num_cols[b] = output_mat.NumCols();
1047  outputs[b] = output_mat.Data();
1048  inputs[b] = input_mat.Data();
1049  ldo[b] = output_mat.Stride();
1050  ldi[b] = input_mat.Stride();
1051  b++; // increase batch count
1052  }
1053  cur_output_frame += num_used;
1054  }
1055 
1056  // execute batched copy
1057  cuda_batched_copy_mats(b, &num_rows[0], &num_cols[0], &inputs[0], &ldi[0],
1058  &outputs[0], &ldo[0]);
1059 
1060  } else
1061 #endif
1062  {
1063  for (int32 i = 0; i < num_tasks; i++) {
1064  const NnetInferenceTask &task = tasks[i];
1065  int32 skip = task.num_initial_unused_output_frames,
1066  num_used = task.num_used_output_frames;
1067  KALDI_ASSERT(cur_output_frame == task.first_used_output_frame_index);
1068  if (task.output_to_cpu) {
1069  output->RowRange(cur_output_frame, num_used).CopyFromMat(
1070  task.output_cpu.RowRange(skip, num_used));
1071  } else {
1072  output->RowRange(cur_output_frame, num_used).CopyFromMat(
1073  task.output.RowRange(skip, num_used));
1074  }
1075  cur_output_frame += num_used;
1076  }
1077  }
1078 
1079  KALDI_ASSERT(cur_output_frame == num_output_frames);
1080 }
1081 
1082 
1083 NnetBatchInference::NnetBatchInference(
1084  const NnetBatchComputerOptions &opts,
1085  const Nnet &nnet,
1086  const VectorBase<BaseFloat> &priors):
1087  computer_(opts, nnet, priors),
1088  is_finished_(false),
1089  utterance_counter_(0) {
1090  // 'thread_' will run the Compute() function in the background.
1091  compute_thread_ = std::thread(ComputeFunc, this);
1092 }
1093 
1094 
1095 void NnetBatchInference::AcceptInput(
1096  const std::string &utterance_id,
1097  const Matrix<BaseFloat> &input,
1098  const Vector<BaseFloat> *ivector,
1099  const Matrix<BaseFloat> *online_ivectors,
1100  int32 online_ivector_period) {
1101 
1102  UtteranceInfo *info = new UtteranceInfo();
1103  info->utterance_id = utterance_id;
1104  info->num_tasks_finished = 0;
1105  bool output_to_cpu = true; // This wrapper is for when you need the nnet
1106  // output on CPU, e.g. because you want it
1107  // written to disk. If this needs to be
1108  // configurable in the future, we can make changes
1109  // then.
1110  computer_.SplitUtteranceIntoTasks(
1111  output_to_cpu, input, ivector, online_ivectors,
1112  online_ivector_period, &(info->tasks));
1113 
1114  // Setting this to a nonzero value will cause the AcceptTask() call below to
1115  // hang until the computation thread has made some progress, if too much
1116  // data is already queued.
1117  int32 max_full_minibatches = 2;
1118 
1119  // Earlier utterances have higher priority, which is important to make sure
1120  // that their corresponding tasks are completed and they can be output to disk.
1121  double priority = -1.0 * (utterance_counter_++);
1122  for (size_t i = 0; i < info->tasks.size(); i++) {
1123  info->tasks[i].priority = priority;
1124  computer_.AcceptTask(&(info->tasks[i]), max_full_minibatches);
1125  }
1126  utts_.push_back(info);
1127  tasks_ready_semaphore_.Signal();
1128 }
1129 
1130 bool NnetBatchInference::GetOutput(std::string *utterance_id,
1131  Matrix<BaseFloat> *output) {
1132  if (utts_.empty())
1133  return false;
1134 
1135  UtteranceInfo *info = *utts_.begin();
1136  std::vector<NnetInferenceTask> &tasks = info->tasks;
1137  int32 num_tasks = tasks.size();
1138  for (; info->num_tasks_finished < num_tasks; ++info->num_tasks_finished) {
1139  Semaphore &semaphore = tasks[info->num_tasks_finished].semaphore;
1140  if (is_finished_) {
1141  semaphore.Wait();
1142  } else {
1143  if (!semaphore.TryWait()) {
1144  // If not all of the tasks of this utterance are ready yet,
1145  // just return false.
1146  return false;
1147  }
1148  }
1149  }
1150  MergeTaskOutput(tasks, output);
1151  *utterance_id = info->utterance_id;
1152  delete info;
1153  utts_.pop_front();
1154  return true;
1155 }
1156 
1157 NnetBatchInference::~NnetBatchInference() {
1158  if (!is_finished_)
1159  KALDI_ERR << "Object destroyed before Finished() was called.";
1160  if (!utts_.empty())
1161  KALDI_ERR << "You should get all output before destroying this object.";
1162  compute_thread_.join();
1163 }
1164 
1165 void NnetBatchInference::Finished() {
1166  is_finished_ = true;
1167  tasks_ready_semaphore_.Signal();
1168 }
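// [Added illustrative sketch, not part of the original source.]  Typical use
// of NnetBatchInference from a single driver thread; the rspecifier and
// wspecifier strings are made-up examples, and 'ExampleInferenceDriver' is a
// hypothetical helper, not a Kaldi function.
#if 0
void ExampleInferenceDriver(const NnetBatchComputerOptions &opts,
                            const Nnet &nnet,
                            const VectorBase<BaseFloat> &priors) {
  NnetBatchInference inference(opts, nnet, priors);
  SequentialBaseFloatMatrixReader feature_reader("ark:feats.ark");
  BaseFloatMatrixWriter output_writer("ark:nnet_output.ark");
  std::string utt;
  Matrix<BaseFloat> output;
  for (; !feature_reader.Done(); feature_reader.Next()) {
    inference.AcceptInput(feature_reader.Key(), feature_reader.Value(),
                          NULL, NULL, 0);
    // Collect whatever has already finished, without blocking.
    while (inference.GetOutput(&utt, &output))
      output_writer.Write(utt, output);
  }
  inference.Finished();
  // After Finished(), GetOutput() blocks until each utterance is done.
  while (inference.GetOutput(&utt, &output))
    output_writer.Write(utt, output);
}
#endif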
1169 
1170 // This is run as the thread of class NnetBatchInference.
1171 void NnetBatchInference::Compute() {
1172  bool allow_partial_minibatch = false;
1173  while (true) {
1174  // keep calling Compute() as long as it makes progress.
1175  while (computer_.Compute(allow_partial_minibatch));
1176 
1177  // ... then wait on tasks_ready_semaphore_.
1178  tasks_ready_semaphore_.Wait();
1179  if (is_finished_) {
1180  allow_partial_minibatch = true;
1181  while (computer_.Compute(allow_partial_minibatch));
1182  return;
1183  }
1184  }
1185 }
1186 
1187 
1188 NnetBatchDecoder::NnetBatchDecoder(
1189  const fst::Fst<fst::StdArc> &fst,
1190  const LatticeFasterDecoderConfig &decoder_opts,
1191  const TransitionModel &trans_model,
1192  const fst::SymbolTable *word_syms,
1193  bool allow_partial,
1194  int32 num_threads,
1195  NnetBatchComputer *computer):
1196  fst_(fst), decoder_opts_(decoder_opts),
1197  trans_model_(trans_model), word_syms_(word_syms),
1198  allow_partial_(allow_partial), computer_(computer),
1199  is_finished_(false), tasks_finished_(false), priority_offset_(0.0),
1200  tot_like_(0.0), frame_count_(0), num_success_(0), num_fail_(0),
1201  num_partial_(0) {
1202  KALDI_ASSERT(num_threads > 0);
1203  for (int32 i = 0; i < num_threads; i++)
1204  decode_threads_.push_back(new std::thread(DecodeFunc, this));
1205  compute_thread_ = std::thread(ComputeFunc, this);
1206 }
1207 
1208 void NnetBatchDecoder::SetPriorities(std::vector<NnetInferenceTask> *tasks) {
1209  size_t num_tasks = tasks->size();
1210  double priority_offset = priority_offset_;
1211  for (size_t i = 0; i < num_tasks; i++)
1212  (*tasks)[i].priority = priority_offset - (double)i;
1213 }
1214 
1215 void NnetBatchDecoder::UpdatePriorityOffset(double priority) {
1216  size_t num_tasks = decode_threads_.size(),
1217  new_weight = 1.0 / num_tasks,
1218  old_weight = 1.0 - new_weight;
1219  // The next line is vulnerable to a race condition but if it happened it
1220  // wouldn't matter.
1221  priority_offset_ = priority_offset_ * old_weight + priority * new_weight;
1222 }
1223 
1224 void NnetBatchDecoder::AcceptInput(
1225  const std::string &utterance_id,
1226  const Matrix<BaseFloat> &input,
1227  const Vector<BaseFloat> *ivector,
1228  const Matrix<BaseFloat> *online_ivectors,
1229  int32 online_ivector_period){
1230  // This function basically does a handshake with one of the decoder threads.
1231  // It may have to wait till one of the decoder threads becomes ready.
1232  input_utterance_.utterance_id = utterance_id;
1233  input_utterance_.input = &input;
1234  input_utterance_.ivector = ivector;
1235  input_utterance_.online_ivectors = online_ivectors;
1236  input_utterance_.online_ivector_period = online_ivector_period;
1237 
1238 
1239  UtteranceOutput *this_output = new UtteranceOutput();
1240  this_output->utterance_id = utterance_id;
1241  pending_utts_.push_back(this_output);
1242 
1245 }
1246 
1247 int32 NnetBatchDecoder::Finished() {
1248  is_finished_ = true;
1249  for (size_t i = 0; i < decode_threads_.size(); i++)
1251  for (size_t i = 0; i < decode_threads_.size(); i++) {
1252  decode_threads_[i]->join();
1253  delete decode_threads_[i];
1254  decode_threads_[i] = NULL;
1255  }
1256  // don't clear decode_threads_, since its size is needed in the destructor to
1257  // compute timing.
1258 
1259  tasks_finished_ = true;
1261  compute_thread_.join();
1262  return num_success_;
1263 }
1264 
1265 
1266 bool NnetBatchDecoder::GetOutput(
1267  std::string *utterance_id,
1268  CompactLattice *clat,
1269  std::string *sentence) {
1270  if (!decoder_opts_.determinize_lattice)
1271  KALDI_ERR << "Don't call this version of GetOutput if you are "
1272  "not determinizing.";
1273  while (true) {
1274  if (pending_utts_.empty())
1275  return false;
1276  if (!pending_utts_.front()->finished)
1277  return false;
1278  UtteranceOutput *this_output = pending_utts_.front();
1279  pending_utts_.pop_front();
1280  if (this_output->compact_lat.NumStates() == 0) {
1281  delete this_output;
1282  // ... and continue round the loop, without returning any output to the
1283  // user for this utterance. Something went wrong in decoding: for
1284  // example, the user specified allow_partial == false and no final-states
1285  // were active on the last frame, or something more unexpected. A warning
1286  // would have been printed in the decoder thread.
1287  } else {
1288  *clat = this_output->compact_lat;
1289  utterance_id->swap(this_output->utterance_id);
1290  sentence->swap(this_output->sentence);
1291  delete this_output;
1292  return true;
1293  }
1294  }
1295 }
1296 
1297 
1298 bool NnetBatchDecoder::GetOutput(
1299  std::string *utterance_id,
1300  Lattice *lat,
1301  std::string *sentence) {
1302  if (decoder_opts_.determinize_lattice)
1303  KALDI_ERR << "Don't call this version of GetOutput if you are "
1304  "determinizing.";
1305  while (true) {
1306  if (pending_utts_.empty())
1307  return false;
1308  if (!pending_utts_.front()->finished)
1309  return false;
1310  UtteranceOutput *this_output = pending_utts_.front();
1311  pending_utts_.pop_front();
1312  if (this_output->lat.NumStates() == 0) {
1313  delete this_output;
1314  // ... and continue round the loop, without returning any output to the
1315  // user for this utterance. Something went wrong in decoding: for
1316  // example, the user specified allow_partial == false and no final-states
1317  // were active on the last frame, or something more unexpected. A warning
1318  // would have been printed in the decoder thread.
1319  } else {
1320  *lat = this_output->lat; // OpenFST has shallow copy so no need to swap.
1321  utterance_id->swap(this_output->utterance_id);
1322  sentence->swap(this_output->sentence);
1323  delete this_output;
1324  return true;
1325  }
1326  }
1327 }
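// [Added illustrative sketch, not part of the original source.]  Typical use
// of NnetBatchDecoder, assuming lattice determinization is enabled (the
// default); the table names, thread count and 'ExampleDecodingDriver' name
// are made-up examples.
#if 0
void ExampleDecodingDriver(const fst::Fst<fst::StdArc> &decode_fst,
                           const LatticeFasterDecoderConfig &decoder_opts,
                           const TransitionModel &trans_model,
                           const fst::SymbolTable *word_syms,
                           NnetBatchComputer *computer) {
  NnetBatchDecoder decoder(decode_fst, decoder_opts, trans_model, word_syms,
                           /*allow_partial=*/true, /*num_threads=*/4, computer);
  SequentialBaseFloatMatrixReader feature_reader("ark:feats.ark");
  CompactLatticeWriter clat_writer("ark:lat.ark");
  std::string utt, sentence;
  CompactLattice clat;
  for (; !feature_reader.Done(); feature_reader.Next()) {
    decoder.AcceptInput(feature_reader.Key(), feature_reader.Value(),
                        NULL, NULL, 0);
    // Write out any lattices that are already finished.
    while (decoder.GetOutput(&utt, &clat, &sentence))
      clat_writer.Write(utt, clat);
  }
  int32 num_decoded = decoder.Finished();
  while (decoder.GetOutput(&utt, &clat, &sentence))
    clat_writer.Write(utt, clat);
  KALDI_LOG << "Decoded " << num_decoded << " utterances.";
}
#endif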
1328 
1329 void NnetBatchDecoder::Compute() {
1330  while (!tasks_finished_) {
1332  bool allow_partial_minibatch = true;
1333  while (computer_->Compute(allow_partial_minibatch));
1334  }
1335 }
1336 
1337 void NnetBatchDecoder::Decode() {
1338  while (true) {
1340  if (is_finished_)
1341  return;
1342 
1343  std::vector<NnetInferenceTask> tasks;
1344  std::string utterance_id;
1345  // we can be confident that the last element of 'pending_utts_' is the one
1346  // for this utterance, as we know exactly at what point in the code the main
1347  // thread will be in AcceptInput().
1348  UtteranceOutput *output_utterance = pending_utts_.back();
1349  {
1350  UtteranceInput input_utterance(input_utterance_);
1351  utterance_id = input_utterance.utterance_id;
1352  bool output_to_cpu = true;
1353  computer_->SplitUtteranceIntoTasks(output_to_cpu,
1354  *(input_utterance.input),
1355  input_utterance.ivector,
1356  input_utterance.online_ivectors,
1357  input_utterance.online_ivector_period,
1358  &tasks);
1359  KALDI_ASSERT(output_utterance->utterance_id == utterance_id);
1361  // Now let input_utterance go out of scope; it's no longer valid as it may
1362  // be overwritten by something else.
1363  }
1364 
1365  SetPriorities(&tasks);
1366  for (size_t i = 0; i < tasks.size(); i++)
1367  computer_->AcceptTask(&(tasks[i]));
1369 
1370  {
1371  int32 frame_offset = 0;
1372  LatticeFasterDecoder decoder(fst_, decoder_opts_);
1373  decoder.InitDecoding();
1374 
1375 
1376  for (size_t i = 0; i < tasks.size(); i++) {
1377  NnetInferenceTask &task = tasks[i];
1378  task.semaphore.Wait();
1379  UpdatePriorityOffset(task.priority);
1380 
1381  SubMatrix<BaseFloat> post(task.output_cpu,
1382  task.num_initial_unused_output_frames,
1383  task.num_used_output_frames,
1384  0, task.output_cpu.NumCols());
1385  DecodableMatrixMapped decodable(trans_model_, post, frame_offset);
1386  frame_offset += post.NumRows();
1387  decoder.AdvanceDecoding(&decodable);
1388  task.output.Resize(0, 0); // Free some memory.
1389  }
1390 
1391  bool use_final_probs = true;
1392  if (!decoder.ReachedFinal()) {
1393  if (allow_partial_) {
1394  KALDI_WARN << "Outputting partial output for utterance "
1395  << utterance_id << " since no final-state reached\n";
1396  use_final_probs = false;
1397  std::unique_lock<std::mutex> lock(stats_mutex_);
1398  num_partial_++;
1399  } else {
1400  KALDI_WARN << "Not producing output for utterance " << utterance_id
1401  << " since no final-state reached and "
1402  << "--allow-partial=false.\n";
1403  std::unique_lock<std::mutex> lock(stats_mutex_);
1404  num_fail_++;
1405  continue;
1406  }
1407  }
1408  // if we reached this point, we are getting a lattice.
1409  decoder.GetRawLattice(&output_utterance->lat, use_final_probs);
1410  // Let the decoder and the decodable object go out of scope, to save
1411  // memory.
1412  }
1413  ProcessOutputUtterance(output_utterance);
1414  }
1415 }
1416 
1417 
1418 void NnetBatchDecoder::UtteranceFailed() {
1419  std::unique_lock<std::mutex> lock(stats_mutex_);
1420  num_fail_++;
1421 }
1422 
1423 void NnetBatchDecoder::ProcessOutputUtterance(UtteranceOutput *output) {
1424  fst::Connect(&(output->lat));
1425  if (output->lat.NumStates() == 0) {
1426  KALDI_WARN << "Unexpected problem getting lattice for utterance "
1427  << output->utterance_id;
1428  std::unique_lock<std::mutex> lock(stats_mutex_);
1429  num_fail_++;
1430  return;
1431  }
1432 
1433  { // This block accumulates diagnostics, prints log messages, and sets
1434  // output->sentence.
1435  Lattice best_path;
1436  LatticeWeight weight;
1437  ShortestPath(output->lat, &best_path);
1438  std::vector<int32> alignment;
1439  std::vector<int32> words;
1440  GetLinearSymbolSequence(best_path, &alignment, &words, &weight);
1441  int32 num_frames = alignment.size();
1442  if (word_syms_ != NULL) {
1443  std::ostringstream os;
1444  for (size_t i = 0; i < words.size(); i++) {
1445  std::string s = word_syms_->Find(words[i]);
1446  if (s == "")
1447  KALDI_ERR << "Word-id " << words[i] << " not in symbol table.";
1448  os << s << ' ';
1449  }
1450  output->sentence = os.str();
1451  }
1452  double likelihood = -(weight.Value1() + weight.Value2());
1453  // Note: these logging messages will be out-of-order w.r.t. the transcripts
1454  // that are printed to cerr; we keep those transcripts in the same order
1455  // that the utterances were in, but these logging messages may be out of
1456  // order (due to multiple threads).
1457  KALDI_LOG << "Log-like per frame for utterance " << output->utterance_id
1458  << " is " << (likelihood / num_frames) << " over "
1459  << num_frames << " frames.";
1460  KALDI_VLOG(2) << "Cost for utterance " << output->utterance_id << " is "
1461  << weight.Value1() << " + " << weight.Value2();
1462 
1463  std::unique_lock<std::mutex> lock(stats_mutex_);
1464  tot_like_ += likelihood;
1465  frame_count_ += num_frames;
1466  num_success_ += 1;
1467  }
1468 
1469  if (decoder_opts_.determinize_lattice) {
1470  if (!DeterminizeLatticePhonePrunedWrapper(
1471  trans_model_,
1472  &output->lat,
1473  decoder_opts_.lattice_beam,
1474  &(output->compact_lat),
1475  decoder_opts_.det_opts))
1476  KALDI_WARN << "Determinization finished earlier than the beam for "
1477  << "utterance " << output->utterance_id;
1478  output->lat.DeleteStates(); // Save memory.
1479  }
1480 
1481  // We'll write the lattice without acoustic scaling, so we need to reverse
1482  // the scale that we applied when decoding.
1483  BaseFloat acoustic_scale = computer_->GetOptions().acoustic_scale;
1484  if (acoustic_scale != 0.0) {
1485  if (decoder_opts_.determinize_lattice)
1486  fst::ScaleLattice(fst::AcousticLatticeScale(1.0 / acoustic_scale),
1487  &(output->compact_lat));
1488  else
1489  fst::ScaleLattice(fst::AcousticLatticeScale(1.0 / acoustic_scale),
1490  &(output->lat));
1491  }
1492  output->finished = true;
1493 }
1494 
1495 
1496 
1497 NnetBatchDecoder::~NnetBatchDecoder() {
1498  if (!is_finished_ || !pending_utts_.empty()) {
1499  // At this point the application is bound to fail so raising another
1500  // exception is not a big problem.
1501  KALDI_ERR << "Destroying NnetBatchDecoder object without calling "
1502  "Finished() and consuming the remaining output";
1503  }
1504  // Print diagnostics.
1505 
1506  kaldi::int64 input_frame_count =
1507  frame_count_ * computer_->GetOptions().frame_subsampling_factor;
1508  int32 num_threads = static_cast<int32>(decode_threads_.size());
1509 
1510  KALDI_LOG << "Overall likelihood per frame was "
1511  << tot_like_ / std::max<int64>(1, frame_count_)
1512  << " over " << frame_count_ << " frames.";
1513 
1514  double elapsed = timer_.Elapsed();
1515  // the +1 below is just to avoid division-by-zero errors.
1516  KALDI_LOG << "Time taken "<< elapsed
1517  << "s: real-time factor assuming 100 frames/sec is "
1518  << (num_threads * elapsed * 100.0 /
1519  std::max<int64>(input_frame_count, 1))
1520  << " (per thread; with " << num_threads << " threads).";
1521  KALDI_LOG << "Done " << num_success_ << " utterances ("
1522  << num_partial_ << " forced out); failed for "
1523  << num_fail_;
1524 }
1525 
1526 
1527 } // namespace nnet3
1528 } // namespace kaldi
int32 words[kMaxOrder]
NnetBatchInference(const NnetBatchComputerOptions &opts, const Nnet &nnet, const VectorBase< BaseFloat > &priors)
void CopyFromMat(const MatrixBase< OtherReal > &src, MatrixTransposeType trans=kNoTrans)
Definition: cu-matrix.cc:344
This code computes Goodness of Pronunciation (GOP) and extracts phone-level pronunciation feature for...
Definition: chain.dox:20
MatrixIndexT Stride() const
Definition: cu-matrix.h:217
int32 InputDim(const std::string &input_name) const
Definition: nnet-nnet.cc:669
static void SplitInputToTasks(const NnetBatchComputerOptions &opts, int32 nnet_left_context, int32 nnet_right_context, const CuMatrix< BaseFloat > &input, std::vector< NnetInferenceTask > *tasks)
This function sets up the &#39;input&#39; and &#39;first_input_t&#39; and &#39;is_edge&#39; members of the &#39;tasks&#39; array; it ...
bool store_component_stats
you should set need_component_stats to true if you need the average-activation and average-derivative...
static void ComputeFunc(NnetBatchInference *object)
const CuSubVector< Real > Row(MatrixIndexT i) const
Definition: cu-matrix.h:670
bool GetRawLattice(Lattice *ofst, bool use_final_probs=true) const
Outputs an FST corresponding to the raw, state-level tracebacks.
MinibatchSizeInfo * GetHighestPriorityComputation(bool allow_partial_minibatch, int32 *minibatch_size, std::vector< NnetInferenceTask *> *tasks)
This function finds and returns the computation corresponding to the highest-priority group of tasks...
bool need_model_derivative
if need_model_derivative is true, then we&#39;ll be doing either model training or model-derivative compu...
void AddOnlineIvectorsToTasks(const NnetBatchComputerOptions &opts, const CuMatrix< BaseFloat > &online_ivectors, int32 online_ivector_period, std::vector< NnetInferenceTask > *tasks)
MatrixIndexT NumCols() const
Returns number of columns (or zero for empty matrix).
Definition: kaldi-matrix.h:67
For an extended explanation of the framework of which grammar-fsts are a part, please see Support for...
Definition: graph.dox:21
bool ReachedFinal() const
says whether a final-state was active on the last frame.
void GetOutputFrameInfoForTasks(const NnetBatchComputerOptions &opts, int32 num_subsampled_frames, int32 num_subsampled_frames_per_chunk, std::vector< NnetInferenceTask > *tasks)
This function figures out how many chunks are needed for this utterance, sets &#39;tasks&#39; to a vector wit...
int32 GetVerboseLevel()
Get verbosity level, usually set via command line &#39;–verbose=&#39; switch.
Definition: kaldi-error.h:60
void Signal()
increase the counter
kaldi::int32 int32
std::vector< IoSpecification > inputs
This class represents a matrix that&#39;s stored on the GPU if we have one, and in memory if not...
Definition: matrix-common.h:71
static void ComputeFunc(NnetBatchDecoder *object)
const fst::SymbolTable * word_syms_
void AcceptInput(const std::string &utterance_id, const Matrix< BaseFloat > &input, const Vector< BaseFloat > *ivector, const Matrix< BaseFloat > *online_ivectors, int32 online_ivector_period)
The user should call this one by one for the utterances that this class needs to compute (intersperse...
bool GetLinearSymbolSequence(const Fst< Arc > &fst, std::vector< I > *isymbols_out, std::vector< I > *osymbols_out, typename Arc::Weight *tot_weight_out)
GetLinearSymbolSequence gets the symbol sequence from a linear FST.
const fst::Fst< fst::StdArc > & fst_
bool TryWait()
Returns true if Wait() goes through.
int32 OutputDim(const std::string &output_name) const
Definition: nnet-nnet.cc:677
struct Index is intended to represent the various indexes by which we number the rows of the matrices...
Definition: nnet-common.h:44
This file contains some miscellaneous functions dealing with class Nnet.
int32 Modulus() const
[Relevant for clockwork RNNs and similar].
Definition: nnet-nnet.cc:658
std::map< int32, MinibatchSizeInfo > minibatch_info
void GetHighestPriorityTasks(int32 num_tasks, ComputationGroupInfo *info, std::vector< NnetInferenceTask *> *tasks)
void Scale(Real value)
Definition: cu-matrix.cc:644
std::vector< std::vector< double > > AcousticLatticeScale(double acwt)
void AcceptInput(const std::string &node_name, CuMatrix< BaseFloat > *input)
e.g.
void CopyFromVec(const CuVectorBase< Real > &src)
Copy functions; these will crash if the dimensions do not match.
Definition: cu-vector.cc:1078
void FormatInputs(int32 minibatch_size, const std::vector< NnetInferenceTask *> &tasks, CuMatrix< BaseFloat > *input, CuMatrix< BaseFloat > *ivector)
Formats the inputs to the computation and transfers them to the GPU.
class NnetInferenceTask represents a chunk of an utterance that is requested to be computed...
void InitDecoding()
InitDecoding initializes the decoding, and should only be used if you intend to call AdvanceDecoding(...
std::shared_ptr< const NnetComputation > computation
void SplitUtteranceIntoTasks(bool output_to_cpu, const Matrix< BaseFloat > &input, const Vector< BaseFloat > *ivector, const Matrix< BaseFloat > *online_ivectors, int32 online_ivector_period, std::vector< NnetInferenceTask > *tasks)
Split a single utterance into a list of separate tasks which can then be given to this class by Accep...
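A sketch of how these pieces might fit together for a single utterance, using only the SplitUtteranceIntoTasks(), AcceptTask(), Compute() and MergeTaskOutput() signatures listed on this page. This is an assumption about usage, not code from this file: in particular it drives Compute() in the calling thread and assumes Compute() returns false once nothing is left to do, whereas the real classes typically run the computation in a dedicated thread.

#include <vector>
#include "nnet3/nnet-batch-compute.h"

using namespace kaldi;
using namespace kaldi::nnet3;

// Illustrative helper: compute one utterance with no i-vectors, keeping the
// output on the CPU.  'computer' is assumed to be already constructed.
void ComputeOneUtterance(NnetBatchComputer *computer,
                         const Matrix<BaseFloat> &feats,
                         Matrix<BaseFloat> *output) {
  std::vector<NnetInferenceTask> tasks;
  computer->SplitUtteranceIntoTasks(true /* output_to_cpu */, feats,
                                    NULL /* ivector */,
                                    NULL /* online_ivectors */,
                                    0 /* online_ivector_period */, &tasks);
  for (size_t i = 0; i < tasks.size(); i++)
    computer->AcceptTask(&(tasks[i]));
  // Drain the queue in this thread (a dedicated compute thread would normally
  // do this); assume Compute() returns false when nothing remains to compute.
  while (computer->Compute(true /* allow_partial_minibatch */));
  MergeTaskOutput(tasks, output);
}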
void AddVecToRows(Real alpha, const CuVectorBase< Real > &row, Real beta=1.0)
(for each row r of *this), r = alpha * row + beta * r
Definition: cu-matrix.cc:1261
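For instance, with alpha = -1.0 and beta = 1.0 the call subtracts the same vector from every row. A minimal sketch of that use follows; the helper name and the idea of subtracting log-priors from per-frame log-likelihoods are illustrative, not taken from this file.

#include "cudamatrix/cu-matrix.h"
#include "cudamatrix/cu-vector.h"

using namespace kaldi;

// For each row r of 'loglikes':  r = -1.0 * log_priors + 1.0 * r,
// i.e. the same vector is subtracted from every frame.
void SubtractLogPriors(const CuVectorBase<BaseFloat> &log_priors,
                       CuMatrixBase<BaseFloat> *loglikes) {
  loglikes->AddVecToRows(-1.0, log_priors, 1.0);
}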
void ComputeSimpleNnetContext(const Nnet &nnet, int32 *left_context, int32 *right_context)
ComputeSimpleNnetContext computes the left-context and right-context of a nnet.
Definition: nnet-utils.cc:146
int32 GetActualMinibatchSize(const ComputationGroupInfo &info) const
void AcceptTask(NnetInferenceTask *task, int32 max_minibatches_full=-1)
Accepts a task, meaning the task will be queued.
void ScaleLattice(const std::vector< std::vector< ScaleFloat > > &scale, MutableFst< ArcTpl< Weight > > *fst)
Scales the pairs of weights in LatticeWeight or CompactLatticeWeight by viewing the pair (a...
void Finished()
The user should call this after the last input has been provided via AcceptInput().
void FormatOutputs(const CuMatrix< BaseFloat > &output, const std::vector< NnetInferenceTask *> &tasks)
struct rnnlm::@11::@12 n
void SynchronizeGpu()
The function SynchronizeGpu(), which for convenience is defined whether or not we have compiled for C...
Definition: cu-device.cc:638
static void DecodeFunc(NnetBatchDecoder *object)
fst::VectorFst< LatticeArc > Lattice
Definition: kaldi-lattice.h:44
void Resize(MatrixIndexT dim, MatrixResizeType t=kSetZero)
Allocate the memory.
Definition: cu-vector.cc:993
#define KALDI_ERR
Definition: kaldi-error.h:147
const TransitionModel & trans_model_
NnetBatchDecoder(const fst::Fst< fst::StdArc > &fst, const LatticeFasterDecoderConfig &decoder_config, const TransitionModel &trans_model, const fst::SymbolTable *word_syms, bool allow_partial, int32 num_threads, NnetBatchComputer *computer)
Constructor.
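A sketch of how this constructor might be wired up; only the constructor signature above is from this page, while the filenames, the thread count, and the pre-existing trans_model and computer objects are assumptions for illustration.

#include <string>
#include "decoder/lattice-faster-decoder.h"
#include "fstext/kaldi-fst-io.h"
#include "hmm/transition-model.h"
#include "nnet3/nnet-batch-compute.h"

using namespace kaldi;
using namespace kaldi::nnet3;

void RunBatchDecoder(const std::string &fst_rxfilename,
                     const std::string &word_syms_filename,
                     const TransitionModel &trans_model,
                     NnetBatchComputer *computer) {
  fst::Fst<fst::StdArc> *decode_fst = fst::ReadFstKaldiGeneric(fst_rxfilename);
  fst::SymbolTable *word_syms = fst::SymbolTable::ReadText(word_syms_filename);
  LatticeFasterDecoderConfig decoder_config;  // Normally filled in from options.
  NnetBatchDecoder decoder(*decode_fst, decoder_config, trans_model, word_syms,
                           true /* allow_partial */, 2 /* num_threads */,
                           computer);
  // Utterances would now be supplied via AcceptInput() and the results
  // collected via the lattice-returning GetOutput() overload listed further
  // down this page (cleanup of decode_fst and word_syms omitted here).
}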
#define KALDI_WARN
Definition: kaldi-error.h:150
This class is used for a piece of a CuMatrix.
Definition: matrix-common.h:70
std::shared_ptr< const NnetComputation > GetComputation(const ComputationGroupInfo &info, int32 minibatch_size)
MatrixIndexT Dim() const
Returns the dimension of the vector.
Definition: kaldi-vector.h:64
void UpdatePriorityOffset(double priority)
CuSubMatrix< Real > RowRange(const MatrixIndexT row_offset, const MatrixIndexT num_rows) const
Definition: cu-matrix.h:660
fst::VectorFst< CompactLatticeArc > CompactLattice
Definition: kaldi-lattice.h:46
void SetPriorities(std::vector< NnetInferenceTask > *tasks)
bool operator<(const Int32Pair &a, const Int32Pair &b)
Definition: cu-matrixdim.h:83
const Real * Data() const
Return data pointer (const).
Definition: cu-matrix.h:746
fst::DeterminizeLatticePhonePrunedOptions det_opts
This is the "normal" lattice-generating decoder.
std::shared_ptr< const NnetComputation > Compile(const ComputationRequest &request)
Does the compilation and returns a const pointer to the result, which is owned by this class...
void AcceptInput(const std::string &utterance_id, const Matrix< BaseFloat > &input, const Vector< BaseFloat > *ivector, const Matrix< BaseFloat > *online_ivectors, int32 online_ivector_period)
The user should call this one by one for the utterances that it needs to compute (interspersed with c...
std::list< UtteranceOutput * > pending_utts_
MatrixIndexT NumCols() const
Definition: cu-matrix.h:216
NnetBatchComputer(const NnetBatchComputerOptions &opts, const Nnet &nnet, const VectorBase< BaseFloat > &priors)
Constructor.
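A sketch of constructing a computer from a standard nnet3 acoustic model; this reflects an assumption about typical usage rather than code from this file, and the use of AmNnetSimple to obtain the raw Nnet and the priors, the test-mode calls, and the model filename are all illustrative.

#include "base/kaldi-common.h"
#include "util/common-utils.h"
#include "hmm/transition-model.h"
#include "nnet3/am-nnet-simple.h"
#include "nnet3/nnet-utils.h"
#include "nnet3/nnet-batch-compute.h"

int main(int argc, char *argv[]) {
  using namespace kaldi;
  using namespace kaldi::nnet3;
  NnetBatchComputerOptions opts;               // Normally registered with ParseOptions.
  std::string model_rxfilename = "final.mdl";  // Placeholder path.
  TransitionModel trans_model;
  AmNnetSimple am_nnet;
  {
    bool binary;
    Input ki(model_rxfilename, &binary);
    trans_model.Read(ki.Stream(), binary);
    am_nnet.Read(ki.Stream(), binary);
    // Put the network into 'test mode' for inference.
    SetBatchnormTestMode(true, &(am_nnet.GetNnet()));
    SetDropoutTestMode(true, &(am_nnet.GetNnet()));
  }
  NnetBatchComputer computer(opts, am_nnet.GetNnet(), am_nnet.Priors());
  // ... split utterances into tasks and feed them to 'computer', e.g. as in
  // the SplitUtteranceIntoTasks() sketch earlier on this page ...
  return 0;
}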
double GetPriority(bool allow_partial_minibatch, const ComputationGroupInfo &info) const
SubMatrix< Real > RowRange(const MatrixIndexT row_offset, const MatrixIndexT num_rows) const
Definition: kaldi-matrix.h:209
void AdvanceDecoding(DecodableInterface *decodable, int32 max_num_frames=-1)
This will decode until there are no more frames ready in the decodable object.
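Together with InitDecoding(), ReachedFinal() and GetRawLattice() listed elsewhere on this page, this suggests a per-utterance decoding loop roughly like the sketch below. It is an illustration, not code from this file; the decoder class name LatticeFasterDecoder and the assumption that the decodable object already has all frames available are ours.

#include "decoder/lattice-faster-decoder.h"
#include "itf/decodable-itf.h"
#include "lat/kaldi-lattice.h"

using namespace kaldi;

// Illustrative per-utterance decoding loop; 'decoder' and 'decodable' are
// assumed to have been constructed by the caller.
bool DecodeOneUtterance(LatticeFasterDecoder *decoder,
                        DecodableInterface *decodable,
                        Lattice *raw_lat) {
  decoder->InitDecoding();
  // Decodes until the decodable object has no more frames ready.
  decoder->AdvanceDecoding(decodable);
  if (!decoder->ReachedFinal())
    KALDI_WARN << "No final state was active on the last frame.";
  // Raw state-level lattice, using final-probs if a final state was reached.
  return decoder->GetRawLattice(raw_lat, true /* use_final_probs */);
}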
A class representing a vector.
Definition: kaldi-vector.h:406
class NnetComputer is responsible for executing the computation described in the "computation" object...
Definition: nnet-compute.h:59
#define KALDI_ASSERT(cond)
Definition: kaldi-error.h:185
CachingOptimizingCompiler compiler_
std::vector< IoSpecification > outputs
MatrixIndexT NumRows() const
Returns number of rows (or zero for empty matrix).
Definition: kaldi-matrix.h:64
NnetBatchComputerOptions opts_
Real * Data()
Returns a pointer to the start of the vector's data.
Definition: cu-vector.h:72
std::unordered_map< int32, std::condition_variable * > no_more_than_n_minibatches_full_
#define KALDI_VLOG(v)
Definition: kaldi-error.h:156
int32 GetMinibatchSize(const ComputationGroupInfo &info) const
bool Compute(bool allow_partial_minibatch)
Does some kind of computation, choosing the highest-priority thing to compute.
void Resize(const MatrixIndexT r, const MatrixIndexT c, MatrixResizeType resize_type=kSetZero, MatrixStrideType stride_type=kDefaultStride)
Sets matrix to a specified size (zero is OK as long as both r and c are zero).
This is like DecodableMatrixScaledMapped, but it doesn't support an acoustic scale, and it does support a frame offset, whereby you can state that the first row of 'likes' is actually the n'th row of the matrix of available log-likelihoods.
const NnetBatchComputerOptions & GetOptions()
std::list< UtteranceInfo * > utts_
void ProcessOutputUtterance(UtteranceOutput *output)
This class does neural net inference in a way that is optimized for GPU use: it combines chunks of mu...
void GetOutputDestructive(const std::string &output_name, CuMatrix< BaseFloat > *output)
MatrixIndexT NumRows() const
Dimensions.
Definition: cu-matrix.h:215
Provides a vector abstraction class.
Definition: kaldi-vector.h:41
const LatticeFasterDecoderConfig & decoder_opts_
#define KALDI_LOG
Definition: kaldi-error.h:153
double Elapsed() const
Returns time in seconds.
Definition: timer.h:74
void MergeTaskOutput(const std::vector< NnetInferenceTask > &tasks, Matrix< BaseFloat > *output)
Merges together the 'output_cpu' (if the 'output_to_cpu' members are true) or the 'output' members of...
std::vector< std::thread * > decode_threads_
Sub-matrix representation.
Definition: kaldi-matrix.h:988
bool GetOutput(std::string *utterance_id, Matrix< BaseFloat > *output)
The user should call this to obtain output.
bool GetOutput(std::string *utterance_id, CompactLattice *clat, std::string *sentence)
The user should call this to obtain output (This version should only be called if config...
bool DeterminizeLatticePhonePrunedWrapper(const kaldi::TransitionModel &trans_model, MutableFst< kaldi::LatticeArc > *ifst, double beam, MutableFst< kaldi::CompactLatticeArc > *ofst, DeterminizeLatticePhonePrunedOptions opts)
This function is a wrapper of DeterminizeLatticePhonePruned() that works for Lattice type FSTs...
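A sketch of typical lattice post-processing with this wrapper plus the ScaleLattice()/AcousticLatticeScale() functions listed above. It is an illustration of usage, not code from this file; the helper name, the beam and scale parameters, and the idea of removing the acoustic scale before storing the lattice are assumptions.

#include "fstext/lattice-utils.h"
#include "hmm/transition-model.h"
#include "lat/determinize-lattice-pruned.h"
#include "lat/kaldi-lattice.h"

using namespace kaldi;

// Illustrative helper: determinize a raw state-level lattice into a
// CompactLattice, then undo the acoustic scaling that was applied during
// decoding so that unscaled acoustic scores are stored.
void PostprocessLattice(const TransitionModel &trans_model,
                        const fst::DeterminizeLatticePhonePrunedOptions &det_opts,
                        BaseFloat lattice_beam, BaseFloat acoustic_scale,
                        Lattice *raw_lat, CompactLattice *clat) {
  if (!fst::DeterminizeLatticePhonePrunedWrapper(trans_model, raw_lat,
                                                 lattice_beam, clat, det_opts))
    KALDI_WARN << "Determinization did not complete within the beam.";
  if (acoustic_scale != 0.0)
    fst::ScaleLattice(fst::AcousticLatticeScale(1.0 / acoustic_scale), clat);
}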
void Wait()
Decrease the counter.
void Resize(MatrixIndexT rows, MatrixIndexT cols, MatrixResizeType resize_type=kSetZero, MatrixStrideType stride_type=kDefaultStride)
Allocate the memory.
Definition: cu-matrix.cc:50
static void GetComputationRequest(const NnetInferenceTask &task, int32 minibatch_size, ComputationRequest *request)
MatrixIndexT Dim() const
Dimensions.
Definition: cu-vector.h:69
void Run()
This does either the forward or backward computation, depending on when it is called (in a typical compu...