kaori
A C++ library for barcode extraction and matching
Loading...
Searching...
No Matches
process_data.hpp
Go to the documentation of this file.
1#ifndef KAORI_PROCESS_DATA_HPP
2#define KAORI_PROCESS_DATA_HPP
3
4#include <thread>
5#include "FastqReader.hpp"
6#include "byteme/Reader.hpp"
7
14namespace kaori {
15
19template<bool use_names>
20struct ChunkOfReads {
21 ChunkOfReads() : sequence_offset(1), name_offset(1) {} // zero is always the first element.
22
23 void clear() {
24 sequence_buffer.clear();
25 sequence_offset.resize(1);
26 if constexpr(use_names) {
27 name_buffer.clear();
28 name_offset.resize(1);
29 }
30 }
31
32 void add_read_sequence(const std::vector<char>& sequence) {
33 add_read_details(sequence, sequence_buffer, sequence_offset);
34 }
35
36 void add_read_name(const std::vector<char>& name) {
37 add_read_details(name, name_buffer, name_offset);
38 }
39
40 size_t size() const {
41 return sequence_offset.size() - 1;
42 }
43
44 std::pair<const char*, const char*> get_sequence(size_t i) const {
45 return get_details(i, sequence_buffer, sequence_offset);
46 }
47
48 std::pair<const char*, const char*> get_name(size_t i) const {
49 return get_details(i, name_buffer, name_offset);
50 }
51
52private:
53 std::vector<char> sequence_buffer;
54 std::vector<size_t> sequence_offset;
55 std::vector<char> name_buffer;
56 std::vector<size_t> name_offset;
57
58 static void add_read_details(const std::vector<char>& src, std::vector<char>& dst, std::vector<size_t>& offset) {
59 dst.insert(dst.end(), src.begin(), src.end());
60 auto last = offset.back();
61 offset.push_back(last + src.size());
62 }
63
64 static std::pair<const char*, const char*> get_details(size_t i, const std::vector<char>& dest, const std::vector<size_t>& offset) {
65 const char * base = dest.data();
66 return std::make_pair(base + offset[i], base + offset[i + 1]);
67 }
68};
105template<class Handler>
106void process_single_end_data(byteme::Reader* input, Handler& handler, int num_threads = 1, int block_size = 100000) {
107 FastqReader fastq(input);
108 bool finished = false;
109
110 std::vector<ChunkOfReads<Handler::use_names> > reads(num_threads);
111 std::vector<std::thread> jobs(num_threads);
112 std::vector<decltype(handler.initialize())> states(num_threads);
113 std::vector<std::string> errs(num_threads);
114
115 auto join = [&](int i) -> void {
116 if (jobs[i].joinable()) {
117 jobs[i].join();
118 if (errs[i] != "") {
119 throw std::runtime_error(errs[i]);
120 }
121 handler.reduce(states[i]);
122 reads[i].clear();
123 }
124 };
125
126 // Safety measure to enforce const-ness within each thread.
127 const Handler& conhandler = handler;
128
129 try {
130 while (!finished) {
131 for (int t = 0; t < num_threads; ++t) {
132 join(t);
133
134 auto& curreads = reads[t];
135 for (int b = 0; b < block_size; ++b) {
136 if (!fastq()) {
137 finished = true;
138 break;
139 }
140
141 curreads.add_read_sequence(fastq.get_sequence());
142 if constexpr(Handler::use_names) {
143 curreads.add_read_name(fastq.get_name());
144 }
145 }
146
147 states[t] = handler.initialize();
148 jobs[t] = std::thread([&](int i) -> void {
149 try {
150 auto& state = states[i];
151 const auto& curreads = reads[i];
152 size_t nreads = curreads.size();
153
154 if constexpr(!Handler::use_names) {
155 for (size_t b = 0; b < nreads; ++b) {
156 conhandler.process(state, curreads.get_sequence(b));
157 }
158 } else {
159 for (size_t b = 0; b < nreads; ++b) {
160 conhandler.process(state, curreads.get_name(b), curreads.get_sequence(b));
161 }
162 }
163 } catch (std::exception& e) {
164 errs[i] = std::string(e.what());
165 }
166 }, t);
167
168 if (finished) {
169 // We won't get a future iteration to join the previous threads
170 // that we kicked off, so we do it now.
171 for (int u = 0; u < num_threads; ++u) {
172 auto pos = (u + t + 1) % num_threads;
173 join(pos);
174 }
175 break;
176 }
177 }
178 }
179 } catch (std::exception& e) {
180 // Mopping up any loose threads, so to speak.
181 for (int t = 0; t < num_threads; ++t) {
182 if (jobs[t].joinable()) {
183 jobs[t].join();
184 }
185 }
186 throw;
187 }
188
189 return;
190}
191
224template<class Handler>
225void process_paired_end_data(byteme::Reader* input1, byteme::Reader* input2, Handler& handler, int num_threads = 1, int block_size = 100000) {
226 FastqReader fastq1(input1);
227 FastqReader fastq2(input2);
228 bool finished = false;
229
230 std::vector<ChunkOfReads<Handler::use_names> > reads1(num_threads), reads2(num_threads);
231 std::vector<std::thread> jobs(num_threads);
232 std::vector<decltype(handler.initialize())> states(num_threads);
233 std::vector<std::string> errs(num_threads);
234
235 auto join = [&](int i) -> void {
236 if (jobs[i].joinable()) {
237 jobs[i].join();
238 if (errs[i] != "") {
239 throw std::runtime_error(errs[i]);
240 }
241 handler.reduce(states[i]);
242 reads1[i].clear();
243 reads2[i].clear();
244 }
245 };
246
247 try {
248 while (!finished) {
249 for (int t = 0; t < num_threads; ++t) {
250 join(t);
251
252 bool finished1 = false;
253 {
254 auto& curreads = reads1[t];
255 for (int b = 0; b < block_size; ++b) {
256 if (!fastq1()) {
257 finished1 = true;
258 break;
259 }
260
261 curreads.add_read_sequence(fastq1.get_sequence());
262 if constexpr(Handler::use_names) {
263 curreads.add_read_name(fastq1.get_name());
264 }
265 }
266 }
267
268 bool finished2 = false;
269 {
270 auto& curreads = reads2[t];
271 for (int b = 0; b < block_size; ++b) {
272 if (!fastq2()) {
273 finished2 = true;
274 break;
275 }
276
277 curreads.add_read_sequence(fastq2.get_sequence());
278 if constexpr(Handler::use_names) {
279 curreads.add_read_name(fastq2.get_name());
280 }
281 }
282 }
283
284 if (finished1 != finished2 || reads1[t].size() != reads2[t].size()) {
285 throw std::runtime_error("different number of reads in paired FASTQ files");
286 } else if (finished1) {
287 finished = true;
288 }
289
290 states[t] = handler.initialize();
291 jobs[t] = std::thread([&](int i) -> void {
292 auto& state = states[i];
293 const auto& curreads1 = reads1[i];
294 const auto& curreads2 = reads2[i];
295 size_t nreads = curreads1.size();
296
297 try {
298 if constexpr(!Handler::use_names) {
299 for (size_t b = 0; b < nreads; ++b) {
300 handler.process(state, curreads1.get_sequence(b), curreads2.get_sequence(b));
301 }
302 } else {
303 for (size_t b = 0; b < nreads; ++b) {
304 handler.process(
305 state,
306 curreads1.get_name(b),
307 curreads1.get_sequence(b),
308 curreads2.get_name(b),
309 curreads2.get_sequence(b)
310 );
311 }
312 }
313 } catch (std::exception& e) {
314 errs[i] = std::string(e.what());
315 }
316 }, t);
317
318 if (finished) {
319 // We won't get a future iteration to join the previous threads
320 // that we kicked off, so we do it now.
321 for (int u = 0; u < num_threads; ++u) {
322 auto pos = (u + t + 1) % num_threads;
323 join(pos);
324 }
325 break;
326 }
327 }
328 }
329 } catch (std::exception& e) {
330 // Mopping up any loose threads, so to speak.
331 for (int t = 0; t < num_threads; ++t) {
332 if (jobs[t].joinable()) {
333 jobs[t].join();
334 }
335 }
336 throw;
337 }
338
339 return;
340}
341
342
343}
344
345#endif
Defines the FastqReader class.
Stream reads from a FASTQ file.
Definition FastqReader.hpp:24
const std::vector< char > & get_sequence() const
Definition FastqReader.hpp:132
const std::vector< char > & get_name() const
Definition FastqReader.hpp:140
Namespace for the kaori barcode-matching library.
Definition BarcodePool.hpp:13
void process_single_end_data(byteme::Reader *input, Handler &handler, int num_threads=1, int block_size=100000)
Definition process_data.hpp:106
void process_paired_end_data(byteme::Reader *input1, byteme::Reader *input2, Handler &handler, int num_threads=1, int block_size=100000)
Definition process_data.hpp:225