1#ifndef KAORI_PROCESS_DATA_HPP
2#define KAORI_PROCESS_DATA_HPP
6#include "byteme/Reader.hpp"
// ChunkOfReads: one block of reads stored as flat character buffers plus
// offset tables. Read k's sequence occupies
// [sequence_offset[k], sequence_offset[k + 1]) within sequence_buffer; names
// use name_buffer/name_offset in the same way. Each offset table starts with a
// single value-initialized element (0), so the read count is the number of
// offsets minus one.
19template<
bool use_names>
21 ChunkOfReads() : sequence_offset(1), name_offset(1) {}
// Reset to an empty chunk: drop the stored sequence bytes and truncate the
// offset table back to its leading 0. The name offsets are truncated only
// when use_names is true.
24 sequence_buffer.clear();
25 sequence_offset.resize(1);
26 if constexpr(use_names) {
28 name_offset.resize(1);
// Append one read's sequence bytes and record its end offset.
32 void add_read_sequence(
const std::vector<char>& sequence) {
33 add_read_details(sequence, sequence_buffer, sequence_offset);
// Append one read's name bytes and record its end offset.
36 void add_read_name(
const std::vector<char>& name) {
37 add_read_details(name, name_buffer, name_offset);
// Number of reads currently stored (offset entries minus the leading 0).
41 return sequence_offset.size() - 1;
// Returns [begin, end) pointers to read i's sequence bytes. The bytes are not
// NUL-terminated and i is not bounds-checked (must be < the read count). The
// pointers may be invalidated by any later add_read_* call, because the
// underlying vector can reallocate.
44 std::pair<const char*, const char*> get_sequence(
size_t i)
const {
45 return get_details(i, sequence_buffer, sequence_offset);
// Same as get_sequence() but for the read name. Callers in this file only add
// names when use_names is true, so this is meaningful only in that case.
48 std::pair<const char*, const char*> get_name(
size_t i)
const {
49 return get_details(i, name_buffer, name_offset);
// Concatenated bytes of all reads, plus cumulative end offsets (leading 0).
53 std::vector<char> sequence_buffer;
54 std::vector<size_t> sequence_offset;
55 std::vector<char> name_buffer;
56 std::vector<size_t> name_offset;
// Append src to dst and push the new total length onto offset, so the new
// entry spans [previous back(), previous back() + src.size()) within dst.
58 static void add_read_details(
const std::vector<char>& src, std::vector<char>& dst, std::vector<size_t>& offset) {
59 dst.insert(dst.end(), src.begin(), src.end());
60 auto last = offset.back();
61 offset.push_back(last + src.size());
// Convert entry i of an offset table into a [begin, end) pointer pair into
// dest. Uses unchecked operator[], so i + 1 must be a valid offset index.
64 static std::pair<const char*, const char*> get_details(
size_t i,
const std::vector<char>& dest,
const std::vector<size_t>& offset) {
65 const char * base = dest.data();
66 return std::make_pair(base + offset[i], base + offset[i + 1]);
// process_single_end_data(input, handler, num_threads = 1, block_size = 100000):
// streams reads from `input` in blocks of up to `block_size` reads, hands each
// block to a worker thread, and merges each worker's state back into `handler`.
//
// Requirements on Handler visible in this body:
//   - Handler::use_names: compile-time bool selecting whether read names are
//     collected and passed to process();
//   - handler.initialize(): returns a fresh per-thread state object;
//   - process(state, seq) or process(state, name, seq): called from worker
//     threads through a const Handler&, where seq/name are
//     std::pair<const char*, const char*> [begin, end) ranges;
//   - handler.reduce(state): merges a worker's state into handler.
105template<
class Handler>
108 bool finished =
false;
// One slot per worker: its block of reads, thread handle, Handler state and
// captured error message.
110 std::vector<ChunkOfReads<Handler::use_names> > reads(num_threads);
111 std::vector<std::thread> jobs(num_threads);
112 std::vector<
decltype(handler.initialize())> states(num_threads);
113 std::vector<std::string> errs(num_threads);
// Finish worker i if it is joinable: a recorded error (errs[i]) is rethrown as
// std::runtime_error, otherwise its state is reduced into handler.
// NOTE(review): presumably this joins first and only throws when errs[i] is
// non-empty; confirm.
115 auto join = [&](
int i) ->
void {
116 if (jobs[i].joinable()) {
119 throw std::runtime_error(errs[i]);
121 handler.reduce(states[i]);
// Workers call process() through this const reference, so only const member
// overloads of Handler::process can be used from the threads.
127 const Handler& conhandler = handler;
// Dispatch loop: for each worker slot t, fill reads[t] with up to block_size
// reads (names too when use_names), then start a thread on that block with a
// fresh state.
131 for (
int t = 0; t < num_threads; ++t) {
134 auto& curreads = reads[t];
135 for (
int b = 0; b < block_size; ++b) {
142 if constexpr(Handler::use_names) {
143 curreads.add_read_name(fastq.
get_name());
147 states[t] = handler.initialize();
// The worker processes slot i (presumably started with i == t; confirm) and
// captures everything else by reference. std::exception-derived errors are
// stored as text in errs[i] instead of escaping the thread.
// NOTE(review): other exception types are not caught by this handler; one
// escaping a std::thread's function calls std::terminate.
148 jobs[t] = std::thread([&](
int i) ->
void {
150 auto& state = states[i];
151 const auto& curreads = reads[i];
152 size_t nreads = curreads.size();
154 if constexpr(!Handler::use_names) {
155 for (
size_t b = 0; b < nreads; ++b) {
156 conhandler.process(state, curreads.get_sequence(b));
159 for (
size_t b = 0; b < nreads; ++b) {
160 conhandler.process(state, curreads.get_name(b), curreads.get_sequence(b));
163 }
catch (std::exception& e) {
164 errs[i] = std::string(e.what());
// Visit the worker slots in rotating order, starting just after t and wrapping
// around modulo num_threads.
171 for (
int u = 0; u < num_threads; ++u) {
172 auto pos = (u + t + 1) % num_threads;
179 }
catch (std::exception& e) {
// On failure, walk every slot and handle threads that are still joinable
// (presumably by joining them, since destroying a joinable std::thread calls
// std::terminate; confirm).
181 for (
int t = 0; t < num_threads; ++t) {
182 if (jobs[t].joinable()) {
// process_paired_end_data(input1, input2, handler, num_threads, block_size):
// reads both inputs in lock-step blocks of up to block_size reads each, checks
// that the two files stay in sync, and processes each pair of blocks on a
// worker thread. Worker states are merged back via handler.reduce().
//
// Handler must provide Handler::use_names, initialize() and reduce(state).
// Its process() receives (state, seq1, seq2) when use_names is false. When
// use_names is true it receives the four ranges curreads1.get_name(b),
// curreads1.get_sequence(b), curreads2.get_name(b), curreads2.get_sequence(b).
// Every range is a std::pair<const char*, const char*> [begin, end).
224template<
class Handler>
225void process_paired_end_data(byteme::Reader* input1, byteme::Reader* input2, Handler& handler,
int num_threads = 1,
int block_size = 100000) {
228 bool finished =
false;
// One slot per worker: a block of reads from each file, the thread handle, the
// Handler state and the captured error message.
230 std::vector<ChunkOfReads<Handler::use_names> > reads1(num_threads), reads2(num_threads);
231 std::vector<std::thread> jobs(num_threads);
232 std::vector<
decltype(handler.initialize())> states(num_threads);
233 std::vector<std::string> errs(num_threads);
// Finish worker i if it is joinable: a recorded error (errs[i]) is rethrown as
// std::runtime_error, otherwise its state is reduced into handler.
// NOTE(review): presumably this joins first and only throws when errs[i] is
// non-empty; confirm.
235 auto join = [&](
int i) ->
void {
236 if (jobs[i].joinable()) {
239 throw std::runtime_error(errs[i]);
241 handler.reduce(states[i]);
// Dispatch loop: for each worker slot t, fill reads1[t] and reads2[t] with up
// to block_size reads from their respective files, then start a worker.
249 for (
int t = 0; t < num_threads; ++t) {
252 bool finished1 =
false;
254 auto& curreads = reads1[t];
255 for (
int b = 0; b < block_size; ++b) {
262 if constexpr(Handler::use_names) {
263 curreads.add_read_name(fastq1.
get_name());
268 bool finished2 =
false;
270 auto& curreads = reads2[t];
271 for (
int b = 0; b < block_size; ++b) {
278 if constexpr(Handler::use_names) {
279 curreads.add_read_name(fastq2.
get_name());
// The two files must produce the same number of reads in this block, and
// finished1/finished2 must agree (presumably end-of-input flags; confirm).
// Otherwise the read pairing is broken.
284 if (finished1 != finished2 || reads1[t].size() != reads2[t].size()) {
285 throw std::runtime_error(
"different number of reads in paired FASTQ files");
286 }
else if (finished1) {
290 states[t] = handler.initialize();
// NOTE(review): process_single_end_data calls process() through
// `const Handler& conhandler`, but this worker calls handler.process(...) on the
// non-const Handler shared by all threads. If Handler declares a non-const
// process() overload, that overload is chosen here and may run concurrently
// from several threads, which is a possible data race. Consider calling through
// a const reference here too (e.g. std::as_const(handler)); confirm the
// intended contract.
291 jobs[t] = std::thread([&](
int i) ->
void {
292 auto& state = states[i];
293 const auto& curreads1 = reads1[i];
294 const auto& curreads2 = reads2[i];
295 size_t nreads = curreads1.size();
298 if constexpr(!Handler::use_names) {
299 for (
size_t b = 0; b < nreads; ++b) {
300 handler.process(state, curreads1.get_sequence(b), curreads2.get_sequence(b));
303 for (
size_t b = 0; b < nreads; ++b) {
306 curreads1.get_name(b),
307 curreads1.get_sequence(b),
308 curreads2.get_name(b),
309 curreads2.get_sequence(b)
313 }
catch (std::exception& e) {
314 errs[i] = std::string(e.what());
// Visit the worker slots in rotating order, starting just after t and wrapping
// around modulo num_threads.
321 for (
int u = 0; u < num_threads; ++u) {
322 auto pos = (u + t + 1) % num_threads;
329 }
catch (std::exception& e) {
// On failure, walk every slot and handle threads that are still joinable
// (presumably by joining them, since destroying a joinable std::thread calls
// std::terminate; confirm).
331 for (
int t = 0; t < num_threads; ++t) {
332 if (jobs[t].joinable()) {
Defines the FastqReader class.
Stream reads from a FASTQ file.
Definition FastqReader.hpp:24
const std::vector< char > & get_sequence() const
Definition FastqReader.hpp:132
const std::vector< char > & get_name() const
Definition FastqReader.hpp:140
Namespace for the kaori barcode-matching library.
Definition BarcodePool.hpp:13
void process_single_end_data(byteme::Reader *input, Handler &handler, int num_threads=1, int block_size=100000)
Definition process_data.hpp:106
void process_paired_end_data(byteme::Reader *input1, byteme::Reader *input2, Handler &handler, int num_threads=1, int block_size=100000)
Definition process_data.hpp:225