kaori
A C++ library for barcode extraction and matching
Loading...
Searching...
No Matches
process_data.hpp
Go to the documentation of this file.
1#ifndef KAORI_PROCESS_DATA_HPP
2#define KAORI_PROCESS_DATA_HPP
3
4#include <vector>
5#include <thread>
6#include <mutex>
7#include <condition_variable>
8#include <stdexcept>
9#include <cstddef>
10
11#include "FastqReader.hpp"
12
13#include "byteme/Reader.hpp"
14
21namespace kaori {
22
26typedef std::size_t ReadIndex;
27
28class ChunkOfReads {
29public:
30 ChunkOfReads() : my_sequence_offset(1), my_name_offset(1) {} // zero is always the first element.
31
32 void clear(bool use_names) {
33 my_sequence_buffer.clear();
34 my_sequence_offset.resize(1);
35 if (use_names) {
36 my_name_buffer.clear();
37 my_name_offset.resize(1);
38 }
39 }
40
41 void add_read_sequence(const std::vector<char>& sequence) {
42 add_read_details(sequence, my_sequence_buffer, my_sequence_offset);
43 }
44
45 void add_read_name(const std::vector<char>& name) {
46 add_read_details(name, my_name_buffer, my_name_offset);
47 }
48
49 ReadIndex size() const {
50 return my_sequence_offset.size() - 1;
51 }
52
53 std::pair<const char*, const char*> get_sequence(ReadIndex i) const {
54 return get_details(i, my_sequence_buffer, my_sequence_offset);
55 }
56
57 std::pair<const char*, const char*> get_name(ReadIndex i) const {
58 return get_details(i, my_name_buffer, my_name_offset);
59 }
60
61private:
62 std::vector<char> my_sequence_buffer;
63 std::vector<std::size_t> my_sequence_offset;
64 std::vector<char> my_name_buffer;
65 std::vector<std::size_t> my_name_offset;
66
67 static void add_read_details(const std::vector<char>& src, std::vector<char>& dst, std::vector<std::size_t>& offset) {
68 dst.insert(dst.end(), src.begin(), src.end());
69 auto last = offset.back();
70 offset.push_back(last + src.size());
71 }
72
73 static std::pair<const char*, const char*> get_details(ReadIndex i, const std::vector<char>& dest, const std::vector<std::size_t>& offset) {
74 const char * base = dest.data();
75 return std::make_pair(base + offset[i], base + offset[i + 1]);
76 }
77};
78
79template<typename Workspace_>
80class ThreadPool {
81public:
82 template<typename RunJob_>
83 ThreadPool(RunJob_ run_job, int num_threads) : my_helpers(num_threads) {
84 std::mutex init_mut;
85 std::condition_variable init_cv;
86 int num_initialized = 0;
87
88 my_threads.reserve(num_threads);
89 for (int t = 0; t < num_threads; ++t) {
90 // Copy lambda as it will be gone once this constructor finishes.
91 my_threads.emplace_back([run_job,this,&init_mut,&init_cv,&num_initialized](int thread) -> void {
92 Helper env; // allocating this locally within each thread to reduce the risk of false sharing.
93 my_helpers[thread] = &env;
94 {
95 std::lock_guard lck(init_mut);
96 ++num_initialized;
97 init_cv.notify_one();
98 }
99
100 while (1) {
101 std::unique_lock lck(env.mut);
102 env.cv.wait(lck, [&]() -> bool { return env.input_ready; });
103 if (env.terminated) {
104 return;
105 }
106 env.input_ready = false;
107
108 try {
109 run_job(env.work);
110 } catch (...) {
111 std::lock_guard elck(my_error_mut);
112 if (!my_error) {
113 my_error = std::current_exception();
114 }
115 }
116
117 env.has_output = true;
118 env.available = true;
119 env.cv.notify_one();
120 }
121 }, t);
122 }
123
124 // Only returning once all threads (and their specific mutexes) are initialized.
125 {
126 std::unique_lock ilck(init_mut);
127 init_cv.wait(ilck, [&]() -> bool { return num_initialized == num_threads; });
128 }
129 }
130
131 ~ThreadPool() {
132 for (auto envptr : my_helpers) {
133 auto& env = *envptr;
134 {
135 std::lock_guard lck(env.mut);
136 env.terminated = true;
137 env.input_ready = true;
138 }
139 env.cv.notify_one();
140 }
141 for (auto& thread : my_threads) {
142 thread.join();
143 }
144 }
145
146private:
147 std::vector<std::thread> my_threads;
148
149 struct Helper {
150 std::mutex mut;
151 std::condition_variable cv;
152 bool input_ready = false;
153 bool available = true;
154 bool has_output = false;
155 bool terminated = false;
156 Workspace_ work;
157 };
158 std::vector<Helper*> my_helpers;
159
160 std::mutex my_error_mut;
161 std::exception_ptr my_error;
162
163public:
164 template<typename CreateJob_, typename MergeJob_>
165 void run(CreateJob_ create_job, MergeJob_ merge_job) {
166 auto num_threads = my_threads.size();
167 bool finished = false;
168 decltype(num_threads) thread = 0, finished_count = 0;
169
170 // We submit jobs by cycling through all threads, then we merge their results in order of submission.
171 // This is a less efficient worksharing scheme but it guarantees the same order of merges.
172 while (1) {
173 auto& env = (*my_helpers[thread]);
174 std::unique_lock lck(env.mut);
175 env.cv.wait(lck, [&]() -> bool { return env.available; });
176
177 {
178 std::lock_guard elck(my_error_mut);
179 if (my_error) {
180 std::rethrow_exception(my_error);
181 }
182 }
183 env.available = false;
184
185 if (env.has_output) {
186 merge_job(env.work);
187 env.has_output = false;
188 }
189
190 if (finished) {
191 // Go through all threads one last time, making sure all results are merged.
192 ++finished_count;
193 if (finished_count == num_threads) {
194 break;
195 }
196 } else {
197 // 'create_job' is responsible for parsing the FASTQ file and
198 // creating a ChunkOfReads. Unfortunately I can't figure out
199 // how to parallelize the parsing as it is very hard to jump
200 // into a middle of a FASTQ file and figure out where the next
201 // entry is; this is because (i) multiline sequences are legal
202 // and (ii) +/@ are not sufficient delimiters when they can
203 // show up in the quality scores.
204 finished = create_job(env.work);
205 env.input_ready = true;
206 lck.unlock();
207 env.cv.notify_one();
208 }
209
210 ++thread;
211 if (thread == num_threads) {
212 thread = 0;
213 }
214 }
215 }
216};
217
229 int num_threads = 1;
230
234 std::size_t block_size = 65535; // use the smallest maximum value for a size_t.
235};
236
270template<typename Pointer_, class Handler_>
271void process_single_end_data(Pointer_ input, Handler_& handler, const ProcessSingleEndDataOptions& options) {
272 struct SingleEndWorkspace {
273 ChunkOfReads reads;
274 decltype(handler.initialize()) state;
275 };
276
277 FastqReader<Pointer_> fastq(input);
278 const Handler_& conhandler = handler; // Safety measure to enforce const-ness within each thread.
279
280 ThreadPool<SingleEndWorkspace> tp(
281 [&](SingleEndWorkspace& work) -> void {
282 auto& state = work.state;
283 state = conhandler.initialize(); // reinitializing for simplicity and to avoid accumulation of reserved memory.
284 const auto& curreads = work.reads;
285 auto nreads = curreads.size();
286
287 if constexpr(!Handler_::use_names) {
288 for (decltype(nreads) b = 0; b < nreads; ++b) {
289 conhandler.process(state, curreads.get_sequence(b));
290 }
291 } else {
292 for (decltype(nreads) b = 0; b < nreads; ++b) {
293 conhandler.process(state, curreads.get_name(b), curreads.get_sequence(b));
294 }
295 }
296 },
297 options.num_threads
298 );
299
300 tp.run(
301 [&](SingleEndWorkspace& work) -> bool {
302 auto& curreads = work.reads;
303 for (ReadIndex b = 0; b < options.block_size; ++b) {
304 if (!fastq()) {
305 return true;
306 }
307
308 curreads.add_read_sequence(fastq.get_sequence());
309 if constexpr(Handler_::use_names) {
310 curreads.add_read_name(fastq.get_name());
311 }
312 }
313 return false;
314 },
315 [&](SingleEndWorkspace& work) -> void {
316 handler.reduce(work.state);
317 work.reads.clear(Handler_::use_names);
318 }
319 );
320}
321
329 int num_threads = 1;
330
334 std::size_t block_size = 100000;
335};
336
374template<class Pointer_, class Handler_>
375void process_paired_end_data(Pointer_ input1, Pointer_ input2, Handler_& handler, const ProcessPairedEndDataOptions& options) {
376 struct PairedEndWorkspace {
377 ChunkOfReads reads1, reads2;
378 decltype(handler.initialize()) state;
379 };
380
381 FastqReader<Pointer_> fastq1(input1);
382 FastqReader<Pointer_> fastq2(input2);
383 const Handler_& conhandler = handler; // Safety measure to enforce const-ness within each thread.
384
385 ThreadPool<PairedEndWorkspace> tp(
386 [&](PairedEndWorkspace& work) -> void {
387 auto& state = work.state;
388 state = conhandler.initialize(); // reinitializing for simplicity and to avoid accumulation of reserved memory.
389 const auto& curreads1 = work.reads1;
390 const auto& curreads2 = work.reads2;
391 ReadIndex nreads = curreads1.size();
392
393 if constexpr(!Handler_::use_names) {
394 for (ReadIndex b = 0; b < nreads; ++b) {
395 conhandler.process(state, curreads1.get_sequence(b), curreads2.get_sequence(b));
396 }
397 } else {
398 for (ReadIndex b = 0; b < nreads; ++b) {
399 conhandler.process(
400 state,
401 curreads1.get_name(b),
402 curreads1.get_sequence(b),
403 curreads2.get_name(b),
404 curreads2.get_sequence(b)
405 );
406 }
407 }
408 },
409 options.num_threads
410 );
411
412 tp.run(
413 [&](PairedEndWorkspace& work) -> bool {
414 bool finished1 = false;
415 {
416 auto& curreads = work.reads1;
417 for (ReadIndex b = 0; b < options.block_size; ++b) {
418 if (!fastq1()) {
419 finished1 = true;
420 break;
421 }
422
423 curreads.add_read_sequence(fastq1.get_sequence());
424 if constexpr(Handler_::use_names) {
425 curreads.add_read_name(fastq1.get_name());
426 }
427 }
428 }
429
430 bool finished2 = false;
431 {
432 auto& curreads = work.reads2;
433 for (ReadIndex b = 0; b < options.block_size; ++b) {
434 if (!fastq2()) {
435 finished2 = true;
436 break;
437 }
438
439 curreads.add_read_sequence(fastq2.get_sequence());
440 if constexpr(Handler_::use_names) {
441 curreads.add_read_name(fastq2.get_name());
442 }
443 }
444 }
445
446 if (finished1 != finished2 || work.reads1.size() != work.reads2.size()) {
447 throw std::runtime_error("different number of reads in paired FASTQ files");
448 }
449 return finished1;
450 },
451 [&](PairedEndWorkspace& work) -> void {
452 handler.reduce(work.state);
453 work.reads1.clear(Handler_::use_names);
454 work.reads2.clear(Handler_::use_names);
455 }
456 );
457}
458
459}
460
461#endif
Defines the FastqReader class.
Stream reads from a FASTQ file.
Definition FastqReader.hpp:32
const std::vector< char > & get_sequence() const
Definition FastqReader.hpp:147
const std::vector< char > & get_name() const
Definition FastqReader.hpp:156
Namespace for the kaori barcode-matching library.
Definition BarcodePool.hpp:16
void process_single_end_data(Pointer_ input, Handler_ &handler, const ProcessSingleEndDataOptions &options)
Definition process_data.hpp:271
void process_paired_end_data(Pointer_ input1, Pointer_ input2, Handler_ &handler, const ProcessPairedEndDataOptions &options)
Definition process_data.hpp:375
Options for process_paired_end_data().
Definition process_data.hpp:325
std::size_t block_size
Definition process_data.hpp:334
int num_threads
Definition process_data.hpp:329
Options for process_single_end_data().
Definition process_data.hpp:225
std::size_t block_size
Definition process_data.hpp:234
int num_threads
Definition process_data.hpp:229