1#ifndef KAORI_PROCESS_DATA_HPP
2#define KAORI_PROCESS_DATA_HPP
7#include <condition_variable>
/**
 * Index type used to address individual reads, e.g. the return type of
 * ChunkOfReads::size() and the argument to ChunkOfReads::get_sequence()/get_name().
 */
using ReadIndex = std::size_t;
30 ChunkOfReads() : my_sequence_offset(1), my_name_offset(1) {}
32 void clear(
bool use_names) {
33 my_sequence_buffer.clear();
34 my_sequence_offset.resize(1);
36 my_name_buffer.clear();
37 my_name_offset.resize(1);
41 void add_read_sequence(
const std::vector<char>& sequence) {
42 add_read_details(sequence, my_sequence_buffer, my_sequence_offset);
45 void add_read_name(
const std::vector<char>& name) {
46 add_read_details(name, my_name_buffer, my_name_offset);
49 ReadIndex size()
const {
50 return my_sequence_offset.size() - 1;
53 std::pair<const char*, const char*> get_sequence(ReadIndex i)
const {
54 return get_details(i, my_sequence_buffer, my_sequence_offset);
57 std::pair<const char*, const char*> get_name(ReadIndex i)
const {
58 return get_details(i, my_name_buffer, my_name_offset);
62 std::vector<char> my_sequence_buffer;
63 std::vector<std::size_t> my_sequence_offset;
64 std::vector<char> my_name_buffer;
65 std::vector<std::size_t> my_name_offset;
67 static void add_read_details(
const std::vector<char>& src, std::vector<char>& dst, std::vector<std::size_t>& offset) {
68 dst.insert(dst.end(), src.begin(), src.end());
69 auto last = offset.back();
70 offset.push_back(last + src.size());
73 static std::pair<const char*, const char*> get_details(ReadIndex i,
const std::vector<char>& dest,
const std::vector<std::size_t>& offset) {
74 const char * base = dest.data();
75 return std::make_pair(base + offset[i], base + offset[i + 1]);
79template<
typename Workspace_>
82 template<
typename RunJob_>
83 ThreadPool(RunJob_ run_job,
int num_threads) : my_helpers(num_threads) {
85 std::condition_variable init_cv;
86 int num_initialized = 0;
88 my_threads.reserve(num_threads);
89 for (
int t = 0; t < num_threads; ++t) {
91 my_threads.emplace_back([run_job,
this,&init_mut,&init_cv,&num_initialized](
int thread) ->
void {
93 my_helpers[thread] = &env;
96 std::lock_guard lck(init_mut);
102 std::unique_lock lck(env.mut);
103 env.cv.wait(lck, [&]() ->
bool {
return env.input_ready; });
104 if (env.terminated) {
107 env.input_ready =
false;
112 std::lock_guard elck(my_error_mut);
114 my_error = std::current_exception();
118 env.has_output =
true;
119 env.available =
true;
128 std::unique_lock ilck(init_mut);
129 init_cv.wait(ilck, [&]() ->
bool {
return num_initialized == num_threads; });
134 for (
auto envptr : my_helpers) {
137 std::lock_guard lck(env.mut);
138 env.terminated =
true;
139 env.input_ready =
true;
143 for (
auto& thread : my_threads) {
149 std::vector<std::thread> my_threads;
153 std::condition_variable cv;
154 bool input_ready =
false;
155 bool available =
true;
156 bool has_output =
false;
157 bool terminated =
false;
160 std::vector<Helper*> my_helpers;
162 std::mutex my_error_mut;
163 std::exception_ptr my_error;
166 template<
typename CreateJob_,
typename MergeJob_>
167 void run(CreateJob_ create_job, MergeJob_ merge_job) {
168 auto num_threads = my_threads.size();
169 bool finished =
false;
170 decltype(num_threads) thread = 0, finished_count = 0;
175 auto& env = (*my_helpers[thread]);
176 std::unique_lock lck(env.mut);
177 env.cv.wait(lck, [&]() ->
bool {
return env.available; });
180 std::lock_guard elck(my_error_mut);
182 std::rethrow_exception(my_error);
185 env.available =
false;
187 if (env.has_output) {
189 env.has_output =
false;
195 if (finished_count == num_threads) {
206 finished = create_job(env.work);
207 env.input_ready =
true;
213 if (thread == num_threads) {
279template<
typename Po
inter_,
class Handler_>
281 struct SingleEndWorkspace {
283 decltype(handler.initialize()) state;
287 const Handler_& conhandler = handler;
289 ThreadPool<SingleEndWorkspace> tp(
290 [&](SingleEndWorkspace& work) ->
void {
291 auto& state = work.state;
292 state = conhandler.initialize();
293 const auto& curreads = work.reads;
294 auto nreads = curreads.size();
296 if constexpr(!Handler_::use_names) {
297 for (
decltype(nreads) b = 0; b < nreads; ++b) {
298 conhandler.process(state, curreads.get_sequence(b));
301 for (
decltype(nreads) b = 0; b < nreads; ++b) {
302 conhandler.process(state, curreads.get_name(b), curreads.get_sequence(b));
310 [&](SingleEndWorkspace& work) ->
bool {
311 auto& curreads = work.reads;
312 for (ReadIndex b = 0; b < options.
block_size; ++b) {
318 if constexpr(Handler_::use_names) {
319 curreads.add_read_name(fastq.
get_name());
324 [&](SingleEndWorkspace& work) ->
void {
325 handler.reduce(work.state);
326 work.reads.clear(Handler_::use_names);
390template<
class Po
inter_,
class Handler_>
392 struct PairedEndWorkspace {
393 ChunkOfReads reads1, reads2;
394 decltype(handler.initialize()) state;
399 const Handler_& conhandler = handler;
401 ThreadPool<PairedEndWorkspace> tp(
402 [&](PairedEndWorkspace& work) ->
void {
403 auto& state = work.state;
404 state = conhandler.initialize();
405 const auto& curreads1 = work.reads1;
406 const auto& curreads2 = work.reads2;
407 ReadIndex nreads = curreads1.size();
409 if constexpr(!Handler_::use_names) {
410 for (ReadIndex b = 0; b < nreads; ++b) {
411 conhandler.process(state, curreads1.get_sequence(b), curreads2.get_sequence(b));
414 for (ReadIndex b = 0; b < nreads; ++b) {
417 curreads1.get_name(b),
418 curreads1.get_sequence(b),
419 curreads2.get_name(b),
420 curreads2.get_sequence(b)
429 [&](PairedEndWorkspace& work) ->
bool {
430 bool finished1 =
false;
432 auto& curreads = work.reads1;
433 for (ReadIndex b = 0; b < options.
block_size; ++b) {
440 if constexpr(Handler_::use_names) {
441 curreads.add_read_name(fastq1.
get_name());
446 bool finished2 =
false;
448 auto& curreads = work.reads2;
449 for (ReadIndex b = 0; b < options.
block_size; ++b) {
456 if constexpr(Handler_::use_names) {
457 curreads.add_read_name(fastq2.
get_name());
462 if (finished1 != finished2 || work.reads1.size() != work.reads2.size()) {
463 throw std::runtime_error(
"different number of reads in paired FASTQ files");
467 [&](PairedEndWorkspace& work) ->
void {
468 handler.reduce(work.state);
469 work.reads1.clear(Handler_::use_names);
470 work.reads2.clear(Handler_::use_names);
Defines the FastqReader class.
Stream reads from a FASTQ file.
Definition FastqReader.hpp:32
const std::vector< char > & get_sequence() const
Definition FastqReader.hpp:154
const std::vector< char > & get_name() const
Definition FastqReader.hpp:163
Namespace for the kaori barcode-matching library.
Definition BarcodePool.hpp:16
void process_single_end_data(Pointer_ input, Handler_ &handler, const ProcessSingleEndDataOptions &options)
Definition process_data.hpp:280
void process_paired_end_data(Pointer_ input1, Pointer_ input2, Handler_ &handler, const ProcessPairedEndDataOptions &options)
Definition process_data.hpp:391
Options for process_paired_end_data().
Definition process_data.hpp:334
std::size_t buffer_size
Definition process_data.hpp:344
std::size_t block_size
Definition process_data.hpp:350
int num_threads
Definition process_data.hpp:338
Options for process_single_end_data().
Definition process_data.hpp:227
std::size_t buffer_size
Definition process_data.hpp:237
std::size_t block_size
Definition process_data.hpp:243
int num_threads
Definition process_data.hpp:231