kaori
A C++ library for barcode extraction and matching
Loading...
Searching...
No Matches
process_data.hpp
Go to the documentation of this file.
1#ifndef KAORI_PROCESS_DATA_HPP
2#define KAORI_PROCESS_DATA_HPP
3
4#include <vector>
5#include <thread>
6#include <mutex>
7#include <condition_variable>
8#include <stdexcept>
9#include <cstddef>
10
11#include "FastqReader.hpp"
12
13#include "byteme/byteme.hpp"
14
21namespace kaori {
22
26typedef std::size_t ReadIndex;
27
28class ChunkOfReads {
29public:
30 ChunkOfReads() : my_sequence_offset(1), my_name_offset(1) {} // zero is always the first element.
31
32 void clear(bool use_names) {
33 my_sequence_buffer.clear();
34 my_sequence_offset.resize(1);
35 if (use_names) {
36 my_name_buffer.clear();
37 my_name_offset.resize(1);
38 }
39 }
40
41 void add_read_sequence(const std::vector<char>& sequence) {
42 add_read_details(sequence, my_sequence_buffer, my_sequence_offset);
43 }
44
45 void add_read_name(const std::vector<char>& name) {
46 add_read_details(name, my_name_buffer, my_name_offset);
47 }
48
49 ReadIndex size() const {
50 return my_sequence_offset.size() - 1;
51 }
52
53 std::pair<const char*, const char*> get_sequence(ReadIndex i) const {
54 return get_details(i, my_sequence_buffer, my_sequence_offset);
55 }
56
57 std::pair<const char*, const char*> get_name(ReadIndex i) const {
58 return get_details(i, my_name_buffer, my_name_offset);
59 }
60
61private:
62 std::vector<char> my_sequence_buffer;
63 std::vector<std::size_t> my_sequence_offset;
64 std::vector<char> my_name_buffer;
65 std::vector<std::size_t> my_name_offset;
66
67 static void add_read_details(const std::vector<char>& src, std::vector<char>& dst, std::vector<std::size_t>& offset) {
68 dst.insert(dst.end(), src.begin(), src.end());
69 auto last = offset.back();
70 offset.push_back(last + src.size());
71 }
72
73 static std::pair<const char*, const char*> get_details(ReadIndex i, const std::vector<char>& dest, const std::vector<std::size_t>& offset) {
74 const char * base = dest.data();
75 return std::make_pair(base + offset[i], base + offset[i + 1]);
76 }
77};
78
79template<typename Workspace_>
80class ThreadPool {
81public:
82 template<typename RunJob_>
83 ThreadPool(RunJob_ run_job, int num_threads) : my_helpers(num_threads) {
84 std::mutex init_mut;
85 std::condition_variable init_cv;
86 int num_initialized = 0;
87
88 my_threads.reserve(num_threads);
89 for (int t = 0; t < num_threads; ++t) {
90 // Copy lambda as it will be gone once this constructor finishes.
91 my_threads.emplace_back([run_job,this,&init_mut,&init_cv,&num_initialized](int thread) -> void {
92 Helper env; // allocating this locally within each thread to reduce the risk of false sharing.
93 my_helpers[thread] = &env;
94 {
95 std::lock_guard lck(init_mut);
96 ++num_initialized;
97 init_cv.notify_one();
98 }
99
100 while (1) {
101 std::unique_lock lck(env.mut);
102 env.cv.wait(lck, [&]() -> bool { return env.input_ready; });
103 if (env.terminated) {
104 return;
105 }
106 env.input_ready = false;
107
108 try {
109 run_job(env.work);
110 } catch (...) {
111 std::lock_guard elck(my_error_mut);
112 if (!my_error) {
113 my_error = std::current_exception();
114 }
115 }
116
117 env.has_output = true;
118 env.available = true;
119 env.cv.notify_one();
120 }
121 }, t);
122 }
123
124 // Only returning once all threads (and their specific mutexes) are initialized.
125 {
126 std::unique_lock ilck(init_mut);
127 init_cv.wait(ilck, [&]() -> bool { return num_initialized == num_threads; });
128 }
129 }
130
131 ~ThreadPool() {
132 for (auto envptr : my_helpers) {
133 auto& env = *envptr;
134 {
135 std::lock_guard lck(env.mut);
136 env.terminated = true;
137 env.input_ready = true;
138 }
139 env.cv.notify_one();
140 }
141 for (auto& thread : my_threads) {
142 thread.join();
143 }
144 }
145
146private:
147 std::vector<std::thread> my_threads;
148
149 struct Helper {
150 std::mutex mut;
151 std::condition_variable cv;
152 bool input_ready = false;
153 bool available = true;
154 bool has_output = false;
155 bool terminated = false;
156 Workspace_ work;
157 };
158 std::vector<Helper*> my_helpers;
159
160 std::mutex my_error_mut;
161 std::exception_ptr my_error;
162
163public:
164 template<typename CreateJob_, typename MergeJob_>
165 void run(CreateJob_ create_job, MergeJob_ merge_job) {
166 auto num_threads = my_threads.size();
167 bool finished = false;
168 decltype(num_threads) thread = 0, finished_count = 0;
169
170 // We submit jobs by cycling through all threads, then we merge their results in order of submission.
171 // This is a less efficient worksharing scheme but it guarantees the same order of merges.
172 while (1) {
173 auto& env = (*my_helpers[thread]);
174 std::unique_lock lck(env.mut);
175 env.cv.wait(lck, [&]() -> bool { return env.available; });
176
177 {
178 std::lock_guard elck(my_error_mut);
179 if (my_error) {
180 std::rethrow_exception(my_error);
181 }
182 }
183 env.available = false;
184
185 if (env.has_output) {
186 merge_job(env.work);
187 env.has_output = false;
188 }
189
190 if (finished) {
191 // Go through all threads one last time, making sure all results are merged.
192 ++finished_count;
193 if (finished_count == num_threads) {
194 break;
195 }
196 } else {
197 // 'create_job' is responsible for parsing the FASTQ file and
198 // creating a ChunkOfReads. Unfortunately I can't figure out
199 // how to parallelize the parsing as it is very hard to jump
200 // into a middle of a FASTQ file and figure out where the next
201 // entry is; this is because (i) multiline sequences are legal
202 // and (ii) +/@ are not sufficient delimiters when they can
203 // show up in the quality scores.
204 finished = create_job(env.work);
205 env.input_ready = true;
206 lck.unlock();
207 env.cv.notify_one();
208 }
209
210 ++thread;
211 if (thread == num_threads) {
212 thread = 0;
213 }
214 }
215 }
216};
217
229 int num_threads = 1;
230
235 std::size_t buffer_size = 65535;
236
241 std::size_t block_size = 65535; // use the smallest maximum value for a size_t.
242};
243
277template<typename Pointer_, class Handler_>
278void process_single_end_data(Pointer_ input, Handler_& handler, const ProcessSingleEndDataOptions& options) {
279 struct SingleEndWorkspace {
280 ChunkOfReads reads;
281 decltype(handler.initialize()) state;
282 };
283
284 FastqReader<Pointer_> fastq(input, options.buffer_size);
285 const Handler_& conhandler = handler; // Safety measure to enforce const-ness within each thread.
286
287 ThreadPool<SingleEndWorkspace> tp(
288 [&](SingleEndWorkspace& work) -> void {
289 auto& state = work.state;
290 state = conhandler.initialize(); // reinitializing for simplicity and to avoid accumulation of reserved memory.
291 const auto& curreads = work.reads;
292 auto nreads = curreads.size();
293
294 if constexpr(!Handler_::use_names) {
295 for (decltype(nreads) b = 0; b < nreads; ++b) {
296 conhandler.process(state, curreads.get_sequence(b));
297 }
298 } else {
299 for (decltype(nreads) b = 0; b < nreads; ++b) {
300 conhandler.process(state, curreads.get_name(b), curreads.get_sequence(b));
301 }
302 }
303 },
304 options.num_threads
305 );
306
307 tp.run(
308 [&](SingleEndWorkspace& work) -> bool {
309 auto& curreads = work.reads;
310 for (ReadIndex b = 0; b < options.block_size; ++b) {
311 if (!fastq()) {
312 return true;
313 }
314
315 curreads.add_read_sequence(fastq.get_sequence());
316 if constexpr(Handler_::use_names) {
317 curreads.add_read_name(fastq.get_name());
318 }
319 }
320 return false;
321 },
322 [&](SingleEndWorkspace& work) -> void {
323 handler.reduce(work.state);
324 work.reads.clear(Handler_::use_names);
325 }
326 );
327}
328
336 int num_threads = 1;
337
342 std::size_t buffer_size = 65535;
343
348 std::size_t block_size = 100000;
349};
350
388template<class Pointer_, class Handler_>
389void process_paired_end_data(Pointer_ input1, Pointer_ input2, Handler_& handler, const ProcessPairedEndDataOptions& options) {
390 struct PairedEndWorkspace {
391 ChunkOfReads reads1, reads2;
392 decltype(handler.initialize()) state;
393 };
394
395 FastqReader<Pointer_> fastq1(input1, options.buffer_size);
396 FastqReader<Pointer_> fastq2(input2, options.buffer_size);
397 const Handler_& conhandler = handler; // Safety measure to enforce const-ness within each thread.
398
399 ThreadPool<PairedEndWorkspace> tp(
400 [&](PairedEndWorkspace& work) -> void {
401 auto& state = work.state;
402 state = conhandler.initialize(); // reinitializing for simplicity and to avoid accumulation of reserved memory.
403 const auto& curreads1 = work.reads1;
404 const auto& curreads2 = work.reads2;
405 ReadIndex nreads = curreads1.size();
406
407 if constexpr(!Handler_::use_names) {
408 for (ReadIndex b = 0; b < nreads; ++b) {
409 conhandler.process(state, curreads1.get_sequence(b), curreads2.get_sequence(b));
410 }
411 } else {
412 for (ReadIndex b = 0; b < nreads; ++b) {
413 conhandler.process(
414 state,
415 curreads1.get_name(b),
416 curreads1.get_sequence(b),
417 curreads2.get_name(b),
418 curreads2.get_sequence(b)
419 );
420 }
421 }
422 },
423 options.num_threads
424 );
425
426 tp.run(
427 [&](PairedEndWorkspace& work) -> bool {
428 bool finished1 = false;
429 {
430 auto& curreads = work.reads1;
431 for (ReadIndex b = 0; b < options.block_size; ++b) {
432 if (!fastq1()) {
433 finished1 = true;
434 break;
435 }
436
437 curreads.add_read_sequence(fastq1.get_sequence());
438 if constexpr(Handler_::use_names) {
439 curreads.add_read_name(fastq1.get_name());
440 }
441 }
442 }
443
444 bool finished2 = false;
445 {
446 auto& curreads = work.reads2;
447 for (ReadIndex b = 0; b < options.block_size; ++b) {
448 if (!fastq2()) {
449 finished2 = true;
450 break;
451 }
452
453 curreads.add_read_sequence(fastq2.get_sequence());
454 if constexpr(Handler_::use_names) {
455 curreads.add_read_name(fastq2.get_name());
456 }
457 }
458 }
459
460 if (finished1 != finished2 || work.reads1.size() != work.reads2.size()) {
461 throw std::runtime_error("different number of reads in paired FASTQ files");
462 }
463 return finished1;
464 },
465 [&](PairedEndWorkspace& work) -> void {
466 handler.reduce(work.state);
467 work.reads1.clear(Handler_::use_names);
468 work.reads2.clear(Handler_::use_names);
469 }
470 );
471}
472
473}
474
475#endif
Defines the FastqReader class.
Stream reads from a FASTQ file.
Definition FastqReader.hpp:32
const std::vector< char > & get_sequence() const
Definition FastqReader.hpp:154
const std::vector< char > & get_name() const
Definition FastqReader.hpp:163
Namespace for the kaori barcode-matching library.
Definition BarcodePool.hpp:16
void process_single_end_data(Pointer_ input, Handler_ &handler, const ProcessSingleEndDataOptions &options)
Definition process_data.hpp:278
void process_paired_end_data(Pointer_ input1, Pointer_ input2, Handler_ &handler, const ProcessPairedEndDataOptions &options)
Definition process_data.hpp:389
Options for process_paired_end_data().
Definition process_data.hpp:332
std::size_t buffer_size
Definition process_data.hpp:342
std::size_t block_size
Definition process_data.hpp:348
int num_threads
Definition process_data.hpp:336
Options for process_single_end_data().
Definition process_data.hpp:225
std::size_t buffer_size
Definition process_data.hpp:235
std::size_t block_size
Definition process_data.hpp:241
int num_threads
Definition process_data.hpp:229