kaori
A C++ library for barcode extraction and matching
Loading...
Searching...
No Matches
process_data.hpp
Go to the documentation of this file.
1#ifndef KAORI_PROCESS_DATA_HPP
2#define KAORI_PROCESS_DATA_HPP
3
4#include <vector>
5#include <thread>
6#include <mutex>
7#include <condition_variable>
8#include <stdexcept>
9#include <cstddef>
10
11#include "FastqReader.hpp"
12
13#include "byteme/byteme.hpp"
14
21namespace kaori {
22
26typedef std::size_t ReadIndex;
27
28class ChunkOfReads {
29public:
30 ChunkOfReads() : my_sequence_offset(1), my_name_offset(1) {} // zero is always the first element.
31
32 void clear(bool use_names) {
33 my_sequence_buffer.clear();
34 my_sequence_offset.resize(1);
35 if (use_names) {
36 my_name_buffer.clear();
37 my_name_offset.resize(1);
38 }
39 }
40
41 void add_read_sequence(const std::vector<char>& sequence) {
42 add_read_details(sequence, my_sequence_buffer, my_sequence_offset);
43 }
44
45 void add_read_name(const std::vector<char>& name) {
46 add_read_details(name, my_name_buffer, my_name_offset);
47 }
48
49 ReadIndex size() const {
50 return my_sequence_offset.size() - 1;
51 }
52
53 std::pair<const char*, const char*> get_sequence(ReadIndex i) const {
54 return get_details(i, my_sequence_buffer, my_sequence_offset);
55 }
56
57 std::pair<const char*, const char*> get_name(ReadIndex i) const {
58 return get_details(i, my_name_buffer, my_name_offset);
59 }
60
61private:
62 std::vector<char> my_sequence_buffer;
63 std::vector<std::size_t> my_sequence_offset;
64 std::vector<char> my_name_buffer;
65 std::vector<std::size_t> my_name_offset;
66
67 static void add_read_details(const std::vector<char>& src, std::vector<char>& dst, std::vector<std::size_t>& offset) {
68 dst.insert(dst.end(), src.begin(), src.end());
69 auto last = offset.back();
70 offset.push_back(last + src.size());
71 }
72
73 static std::pair<const char*, const char*> get_details(ReadIndex i, const std::vector<char>& dest, const std::vector<std::size_t>& offset) {
74 const char * base = dest.data();
75 return std::make_pair(base + offset[i], base + offset[i + 1]);
76 }
77};
78
79template<typename Workspace_>
80class ThreadPool {
81public:
82 template<typename RunJob_>
83 ThreadPool(RunJob_ run_job, int num_threads) : my_helpers(num_threads) {
84 std::mutex init_mut;
85 std::condition_variable init_cv;
86 int num_initialized = 0;
87
88 my_threads.reserve(num_threads);
89 for (int t = 0; t < num_threads; ++t) {
90 // Copy lambda as it will be gone once this constructor finishes.
91 my_threads.emplace_back([run_job,this,&init_mut,&init_cv,&num_initialized](int thread) -> void {
92 Helper env; // allocating this locally within each thread to reduce the risk of false sharing.
93 my_helpers[thread] = &env;
94
95 {
96 std::lock_guard lck(init_mut);
97 ++num_initialized;
98 }
99 init_cv.notify_one(); // only notify once the lock is released for better performance.
100
101 while (1) {
102 std::unique_lock lck(env.mut);
103 env.cv.wait(lck, [&]() -> bool { return env.input_ready; });
104 if (env.terminated) {
105 return;
106 }
107 env.input_ready = false;
108
109 try {
110 run_job(env.work);
111 } catch (...) {
112 std::lock_guard elck(my_error_mut);
113 if (!my_error) {
114 my_error = std::current_exception();
115 }
116 }
117
118 env.has_output = true;
119 env.available = true;
120 lck.unlock(); // again, release the lock before notifying the waiting thread.
121 env.cv.notify_one();
122 }
123 }, t);
124 }
125
126 // Only returning once all threads (and their specific mutexes) are initialized.
127 {
128 std::unique_lock ilck(init_mut);
129 init_cv.wait(ilck, [&]() -> bool { return num_initialized == num_threads; });
130 }
131 }
132
133 ~ThreadPool() {
134 for (auto envptr : my_helpers) {
135 auto& env = *envptr;
136 {
137 std::lock_guard lck(env.mut);
138 env.terminated = true;
139 env.input_ready = true;
140 }
141 env.cv.notify_one();
142 }
143 for (auto& thread : my_threads) {
144 thread.join();
145 }
146 }
147
148private:
149 std::vector<std::thread> my_threads;
150
151 struct Helper {
152 std::mutex mut;
153 std::condition_variable cv;
154 bool input_ready = false;
155 bool available = true;
156 bool has_output = false;
157 bool terminated = false;
158 Workspace_ work;
159 };
160 std::vector<Helper*> my_helpers;
161
162 std::mutex my_error_mut;
163 std::exception_ptr my_error;
164
165public:
166 template<typename CreateJob_, typename MergeJob_>
167 void run(CreateJob_ create_job, MergeJob_ merge_job) {
168 auto num_threads = my_threads.size();
169 bool finished = false;
170 decltype(num_threads) thread = 0, finished_count = 0;
171
172 // We submit jobs by cycling through all threads, then we merge their results in order of submission.
173 // This is a less efficient worksharing scheme but it guarantees the same order of merges.
174 while (1) {
175 auto& env = (*my_helpers[thread]);
176 std::unique_lock lck(env.mut);
177 env.cv.wait(lck, [&]() -> bool { return env.available; });
178
179 {
180 std::lock_guard elck(my_error_mut);
181 if (my_error) {
182 std::rethrow_exception(my_error);
183 }
184 }
185 env.available = false;
186
187 if (env.has_output) {
188 merge_job(env.work);
189 env.has_output = false;
190 }
191
192 if (finished) {
193 // Go through all threads one last time, making sure all results are merged.
194 ++finished_count;
195 if (finished_count == num_threads) {
196 break;
197 }
198 } else {
199 // 'create_job' is responsible for parsing the FASTQ file and
200 // creating a ChunkOfReads. Unfortunately I can't figure out
201 // how to parallelize the parsing as it is very hard to jump
202 // into a middle of a FASTQ file and figure out where the next
203 // entry is; this is because (i) multiline sequences are legal
204 // and (ii) +/@ are not sufficient delimiters when they can
205 // show up in the quality scores.
206 finished = create_job(env.work);
207 env.input_ready = true;
208 lck.unlock();
209 env.cv.notify_one();
210 }
211
212 ++thread;
213 if (thread == num_threads) {
214 thread = 0;
215 }
216 }
217 }
218};
219
231 int num_threads = 1;
232
237 std::size_t buffer_size = 65535;
238
243 std::size_t block_size = 65535; // use the smallest maximum value for a size_t.
244};
245
279template<typename Pointer_, class Handler_>
280void process_single_end_data(Pointer_ input, Handler_& handler, const ProcessSingleEndDataOptions& options) {
281 struct SingleEndWorkspace {
282 ChunkOfReads reads;
283 decltype(handler.initialize()) state;
284 };
285
286 FastqReader<Pointer_> fastq(input, options.buffer_size);
287 const Handler_& conhandler = handler; // Safety measure to enforce const-ness within each thread.
288
289 ThreadPool<SingleEndWorkspace> tp(
290 [&](SingleEndWorkspace& work) -> void {
291 auto& state = work.state;
292 state = conhandler.initialize(); // reinitializing for simplicity and to avoid accumulation of reserved memory.
293 const auto& curreads = work.reads;
294 auto nreads = curreads.size();
295
296 if constexpr(!Handler_::use_names) {
297 for (decltype(nreads) b = 0; b < nreads; ++b) {
298 conhandler.process(state, curreads.get_sequence(b));
299 }
300 } else {
301 for (decltype(nreads) b = 0; b < nreads; ++b) {
302 conhandler.process(state, curreads.get_name(b), curreads.get_sequence(b));
303 }
304 }
305 },
306 options.num_threads
307 );
308
309 tp.run(
310 [&](SingleEndWorkspace& work) -> bool {
311 auto& curreads = work.reads;
312 for (ReadIndex b = 0; b < options.block_size; ++b) {
313 if (!fastq()) {
314 return true;
315 }
316
317 curreads.add_read_sequence(fastq.get_sequence());
318 if constexpr(Handler_::use_names) {
319 curreads.add_read_name(fastq.get_name());
320 }
321 }
322 return false;
323 },
324 [&](SingleEndWorkspace& work) -> void {
325 handler.reduce(work.state);
326 work.reads.clear(Handler_::use_names);
327 }
328 );
329}
330
338 int num_threads = 1;
339
344 std::size_t buffer_size = 65535;
345
350 std::size_t block_size = 100000;
351};
352
390template<class Pointer_, class Handler_>
391void process_paired_end_data(Pointer_ input1, Pointer_ input2, Handler_& handler, const ProcessPairedEndDataOptions& options) {
392 struct PairedEndWorkspace {
393 ChunkOfReads reads1, reads2;
394 decltype(handler.initialize()) state;
395 };
396
397 FastqReader<Pointer_> fastq1(input1, options.buffer_size);
398 FastqReader<Pointer_> fastq2(input2, options.buffer_size);
399 const Handler_& conhandler = handler; // Safety measure to enforce const-ness within each thread.
400
401 ThreadPool<PairedEndWorkspace> tp(
402 [&](PairedEndWorkspace& work) -> void {
403 auto& state = work.state;
404 state = conhandler.initialize(); // reinitializing for simplicity and to avoid accumulation of reserved memory.
405 const auto& curreads1 = work.reads1;
406 const auto& curreads2 = work.reads2;
407 ReadIndex nreads = curreads1.size();
408
409 if constexpr(!Handler_::use_names) {
410 for (ReadIndex b = 0; b < nreads; ++b) {
411 conhandler.process(state, curreads1.get_sequence(b), curreads2.get_sequence(b));
412 }
413 } else {
414 for (ReadIndex b = 0; b < nreads; ++b) {
415 conhandler.process(
416 state,
417 curreads1.get_name(b),
418 curreads1.get_sequence(b),
419 curreads2.get_name(b),
420 curreads2.get_sequence(b)
421 );
422 }
423 }
424 },
425 options.num_threads
426 );
427
428 tp.run(
429 [&](PairedEndWorkspace& work) -> bool {
430 bool finished1 = false;
431 {
432 auto& curreads = work.reads1;
433 for (ReadIndex b = 0; b < options.block_size; ++b) {
434 if (!fastq1()) {
435 finished1 = true;
436 break;
437 }
438
439 curreads.add_read_sequence(fastq1.get_sequence());
440 if constexpr(Handler_::use_names) {
441 curreads.add_read_name(fastq1.get_name());
442 }
443 }
444 }
445
446 bool finished2 = false;
447 {
448 auto& curreads = work.reads2;
449 for (ReadIndex b = 0; b < options.block_size; ++b) {
450 if (!fastq2()) {
451 finished2 = true;
452 break;
453 }
454
455 curreads.add_read_sequence(fastq2.get_sequence());
456 if constexpr(Handler_::use_names) {
457 curreads.add_read_name(fastq2.get_name());
458 }
459 }
460 }
461
462 if (finished1 != finished2 || work.reads1.size() != work.reads2.size()) {
463 throw std::runtime_error("different number of reads in paired FASTQ files");
464 }
465 return finished1;
466 },
467 [&](PairedEndWorkspace& work) -> void {
468 handler.reduce(work.state);
469 work.reads1.clear(Handler_::use_names);
470 work.reads2.clear(Handler_::use_names);
471 }
472 );
473}
474
475}
476
477#endif
Defines the FastqReader class.
Stream reads from a FASTQ file.
Definition FastqReader.hpp:32
const std::vector< char > & get_sequence() const
Definition FastqReader.hpp:154
const std::vector< char > & get_name() const
Definition FastqReader.hpp:163
Namespace for the kaori barcode-matching library.
Definition BarcodePool.hpp:16
void process_single_end_data(Pointer_ input, Handler_ &handler, const ProcessSingleEndDataOptions &options)
Definition process_data.hpp:280
void process_paired_end_data(Pointer_ input1, Pointer_ input2, Handler_ &handler, const ProcessPairedEndDataOptions &options)
Definition process_data.hpp:391
Options for process_paired_end_data().
Definition process_data.hpp:334
std::size_t buffer_size
Definition process_data.hpp:344
std::size_t block_size
Definition process_data.hpp:350
int num_threads
Definition process_data.hpp:338
Options for process_single_end_data().
Definition process_data.hpp:227
std::size_t buffer_size
Definition process_data.hpp:237
std::size_t block_size
Definition process_data.hpp:243
int num_threads
Definition process_data.hpp:231