1#ifndef KAORI_PROCESS_DATA_HPP
2#define KAORI_PROCESS_DATA_HPP
7#include <condition_variable>
/**
 * Integer type used to index reads within a chunk.
 */
typedef std::size_t ReadIndex;

/**
 * Stores the sequences and names of a batch of reads in two flat character
 * buffers. Each buffer is paired with an offset vector whose first element is
 * always zero, so that read `i` occupies `[offset[i], offset[i + 1])`.
 */
class ChunkOfReads {
public:
    /**
     * Creates an empty chunk. Both offset vectors start with a single zero
     * so that `offset.back()` is valid before any read is added.
     */
    ChunkOfReads() : my_sequence_offset(1), my_name_offset(1) {}

    /**
     * Discards all stored reads while keeping the leading zero offset.
     *
     * @param use_names Whether the name buffer should be reset as well.
     */
    void clear(bool use_names) {
        my_sequence_buffer.clear();
        my_sequence_offset.resize(1);

        // NOTE(review): the listing this block was restored from omits the line
        // between the sequence reset and the name reset; the guard is inferred
        // from the otherwise-unused `use_names` parameter -- confirm upstream.
        if (use_names) {
            my_name_buffer.clear();
            my_name_offset.resize(1);
        }
    }

    /**
     * Appends the sequence of a new read.
     *
     * @param sequence Characters of the read sequence; may be empty.
     */
    void add_read_sequence(const std::vector<char>& sequence) {
        add_read_details(sequence, my_sequence_buffer, my_sequence_offset);
    }

    /**
     * Appends the name of a new read.
     *
     * @param name Characters of the read name; may be empty.
     */
    void add_read_name(const std::vector<char>& name) {
        add_read_details(name, my_name_buffer, my_name_offset);
    }

    /**
     * @return Number of reads, counted from the sequence offsets.
     */
    ReadIndex size() const {
        return my_sequence_offset.size() - 1;
    }

    /**
     * @param i Index of the read; must be less than `size()` (not checked).
     * @return Pointers to the start and one-past-the-end of the sequence of read `i`.
     * These point into the internal buffer, so they are invalidated by any
     * later call that modifies the chunk.
     */
    std::pair<const char*, const char*> get_sequence(ReadIndex i) const {
        return get_details(i, my_sequence_buffer, my_sequence_offset);
    }

    /**
     * @param i Index of the read; must be less than the number of stored names (not checked).
     * @return Pointers to the start and one-past-the-end of the name of read `i`.
     * These point into the internal buffer, so they are invalidated by any
     * later call that modifies the chunk.
     */
    std::pair<const char*, const char*> get_name(ReadIndex i) const {
        return get_details(i, my_name_buffer, my_name_offset);
    }

private:
    std::vector<char> my_sequence_buffer;
    std::vector<std::size_t> my_sequence_offset;
    std::vector<char> my_name_buffer;
    std::vector<std::size_t> my_name_offset;

    // Appends `src` to `dst` and records the new end position in `offset`.
    static void add_read_details(const std::vector<char>& src, std::vector<char>& dst, std::vector<std::size_t>& offset) {
        dst.insert(dst.end(), src.begin(), src.end());
        auto last = offset.back();
        offset.push_back(last + src.size());
    }

    // Converts the offsets bracketing entry `i` into a pointer pair into `dest`.
    static std::pair<const char*, const char*> get_details(ReadIndex i, const std::vector<char>& dest, const std::vector<std::size_t>& offset) {
        const char * base = dest.data();
        return std::make_pair(base + offset[i], base + offset[i + 1]);
    }
};
// Fragment of a class template parameterised on the per-thread workspace type.
// NOTE(review): this excerpt omits a number of listing lines (the class header,
// closing braces, and the declarations of `env` and `init_mut` that are used
// below), so the comments here describe only the statements that are visible.
79template<
typename Workspace_>
// Constructor: accepts the callable `run_job` and a thread count, sizes
// `my_helpers` to `num_threads`, and starts one std::thread per index.
82 template<
typename RunJob_>
83 ThreadPool(RunJob_ run_job,
int num_threads) : my_helpers(num_threads) {
// Start-up synchronisation: the constructor waits on init_cv (listing line 127)
// until num_initialized equals num_threads.
85 std::condition_variable init_cv;
86 int num_initialized = 0;
88 my_threads.reserve(num_threads);
89 for (
int t = 0; t < num_threads; ++t) {
// Worker lambda: run_job is captured by value; `this` and the start-up
// synchronisation objects are captured by pointer/reference.
91 my_threads.emplace_back([run_job,
this,&init_mut,&init_cv,&num_initialized](
int thread) ->
void {
// Publish the address of this worker's state object; run() later reaches it
// through my_helpers[thread].
93 my_helpers[thread] = &env;
// Lock taken on init_mut; presumably guards an increment of num_initialized
// (the statement itself is not in this excerpt -- TODO confirm).
95 std::lock_guard lck(init_mut);
// The worker waits on its own condition variable until input_ready is set.
101 std::unique_lock lck(env.mut);
102 env.cv.wait(lck, [&]() ->
bool {
return env.input_ready; });
// Termination is checked after wake-up; the branch body is not shown here.
103 if (env.terminated) {
// Reset the flag that woke the worker.
106 env.input_ready =
false;
// my_error is assigned under my_error_mut; the use of std::current_exception()
// suggests this sits inside a catch handler (not visible -- TODO confirm).
111 std::lock_guard elck(my_error_mut);
113 my_error = std::current_exception();
// Mark this worker as having output and as available again.
117 env.has_output =
true;
118 env.available =
true;
// Back in the constructor: block until every worker has reported start-up.
126 std::unique_lock ilck(init_mut);
127 init_cv.wait(ilck, [&]() ->
bool {
return num_initialized == num_threads; });
// NOTE(review): the header of the function containing the next two loops is
// not shown; it looks like the destructor -- confirm. For each helper, under
// its mutex, both `terminated` and `input_ready` are set to true.
132 for (
auto envptr : my_helpers) {
135 std::lock_guard lck(env.mut);
136 env.terminated =
true;
137 env.input_ready =
true;
// Loop over every worker thread (body not shown -- presumably a join).
141 for (
auto& thread : my_threads) {
// Worker threads owned by the pool.
147 std::vector<std::thread> my_threads;
// Per-worker state. The enclosing struct header is not shown; my_helpers holds
// Helper*, so these are presumably members of `Helper`. `available` is the only
// flag that starts out true.
151 std::condition_variable cv;
152 bool input_ready =
false;
153 bool available =
true;
154 bool has_output =
false;
155 bool terminated =
false;
// One pointer per worker, filled in by the worker lambda in the constructor.
158 std::vector<Helper*> my_helpers;
// Exception captured from a worker, together with the mutex used around it.
160 std::mutex my_error_mut;
161 std::exception_ptr my_error;
// run(): takes a job-creating callable and a job-merging callable. Only the
// create_job call is visible in this excerpt; the merge_job call site is not.
164 template<
typename CreateJob_,
typename MergeJob_>
165 void run(CreateJob_ create_job, MergeJob_ merge_job) {
166 auto num_threads = my_threads.size();
167 bool finished =
false;
168 decltype(num_threads) thread = 0, finished_count = 0;
// Wait until the worker at index `thread` reports that it is available.
173 auto& env = (*my_helpers[thread]);
174 std::unique_lock lck(env.mut);
175 env.cv.wait(lck, [&]() ->
bool {
return env.available; });
// The stored exception is rethrown under my_error_mut; the condition guarding
// the rethrow is not visible in this excerpt.
178 std::lock_guard elck(my_error_mut);
180 std::rethrow_exception(my_error);
// Claim the worker.
183 env.available =
false;
// If the worker has output, the flag is cleared (the statements between the
// check and the reset are not shown).
185 if (env.has_output) {
187 env.has_output =
false;
// Comparison of finished_count against the thread count; body not shown.
193 if (finished_count == num_threads) {
// create_job receives the worker's workspace; its return value is stored in
// `finished`, after which the worker is flagged as having input ready.
204 finished = create_job(env.work);
205 env.input_ready =
true;
// Check for the thread index reaching num_threads; presumably wraps it back
// to zero (body not shown -- TODO confirm).
211 if (thread == num_threads) {
// Fragment of the single-end processing function. Its signature line is absent
// from this excerpt; the index entry near the end of this file gives it as
// process_single_end_data(Pointer_ input, Handler_ &handler,
// const ProcessSingleEndDataOptions &options).
// NOTE(review): `Po` / `inter_` below is a single identifier (Pointer_ in the
// index entry) that the extraction split across two lines.
270template<
typename Po
inter_,
class Handler_>
// Per-thread workspace. Only `state` is declared in the visible lines; the
// lambdas below also use a `reads` member.
272 struct SingleEndWorkspace {
274 decltype(handler.initialize()) state;
// Const reference to the handler, used for initialize() and process() inside
// the worker lambda.
278 const Handler_& conhandler = handler;
// Lambda passed to the ThreadPool constructor: re-initialises the workspace
// state, then calls process() once per read in the chunk.
280 ThreadPool<SingleEndWorkspace> tp(
281 [&](SingleEndWorkspace& work) ->
void {
282 auto& state = work.state;
283 state = conhandler.initialize();
284 const auto& curreads = work.reads;
285 auto nreads = curreads.size();
// Compile-time choice: without names, process() receives only the sequence.
287 if constexpr(!Handler_::use_names) {
288 for (
decltype(nreads) b = 0; b < nreads; ++b) {
289 conhandler.process(state, curreads.get_sequence(b));
// With names, process() receives the name followed by the sequence.
292 for (
decltype(nreads) b = 0; b < nreads; ++b) {
293 conhandler.process(state, curreads.get_name(b), curreads.get_sequence(b));
// Lambda returning bool: loops up to options.block_size times and, when the
// handler uses names, stores fastq.get_name() in the chunk. The statements
// that advance the reader and store the sequence are not in this excerpt.
301 [&](SingleEndWorkspace& work) ->
bool {
302 auto& curreads = work.reads;
303 for (ReadIndex b = 0; b < options.
block_size; ++b) {
309 if constexpr(Handler_::use_names) {
310 curreads.add_read_name(fastq.
get_name());
// Lambda returning void: passes the workspace state to handler.reduce() (the
// non-const handler) and then empties the chunk.
315 [&](SingleEndWorkspace& work) ->
void {
316 handler.reduce(work.state);
317 work.reads.clear(Handler_::use_names);
// Fragment of the paired-end processing function. Its signature line is absent
// from this excerpt; the index entry near the end of this file gives it as
// process_paired_end_data(Pointer_ input1, Pointer_ input2, Handler_ &handler,
// const ProcessPairedEndDataOptions &options).
// NOTE(review): `Po` / `inter_` below is a single identifier (Pointer_ in the
// index entry) that the extraction split across two lines.
374template<
class Po
inter_,
class Handler_>
// Per-thread workspace: one chunk of reads per input file plus the handler state.
376 struct PairedEndWorkspace {
377 ChunkOfReads reads1, reads2;
378 decltype(handler.initialize()) state;
// Const reference to the handler, used for initialize() and process() inside
// the worker lambda.
383 const Handler_& conhandler = handler;
// Lambda passed to the ThreadPool constructor: re-initialises the workspace
// state, then calls process() once per read pair.
385 ThreadPool<PairedEndWorkspace> tp(
386 [&](PairedEndWorkspace& work) ->
void {
387 auto& state = work.state;
388 state = conhandler.initialize();
389 const auto& curreads1 = work.reads1;
390 const auto& curreads2 = work.reads2;
// The read count is taken from the first chunk only; the size check at listing
// line 446 throws if the two chunks differ in size.
391 ReadIndex nreads = curreads1.size();
// Compile-time choice: without names, process() receives the two sequences.
393 if constexpr(!Handler_::use_names) {
394 for (ReadIndex b = 0; b < nreads; ++b) {
395 conhandler.process(state, curreads1.get_sequence(b), curreads2.get_sequence(b));
// With names, the visible arguments are name and sequence of read 1 followed by
// name and sequence of read 2 (the call line itself is not in this excerpt).
398 for (ReadIndex b = 0; b < nreads; ++b) {
401 curreads1.get_name(b),
402 curreads1.get_sequence(b),
403 curreads2.get_name(b),
404 curreads2.get_sequence(b)
// Lambda returning bool: fills reads1 from fastq1 and reads2 from fastq2, each
// in a loop of up to options.block_size iterations. The statements that advance
// the readers, store the sequences and set finished1/finished2 are not shown.
413 [&](PairedEndWorkspace& work) ->
bool {
414 bool finished1 =
false;
416 auto& curreads = work.reads1;
417 for (ReadIndex b = 0; b < options.
block_size; ++b) {
424 if constexpr(Handler_::use_names) {
425 curreads.add_read_name(fastq1.
get_name());
430 bool finished2 =
false;
432 auto& curreads = work.reads2;
433 for (ReadIndex b = 0; b < options.
block_size; ++b) {
440 if constexpr(Handler_::use_names) {
441 curreads.add_read_name(fastq2.
get_name());
// Throws std::runtime_error when the two `finished` flags disagree or the two
// chunks hold different numbers of reads.
446 if (finished1 != finished2 || work.reads1.size() != work.reads2.size()) {
447 throw std::runtime_error(
"different number of reads in paired FASTQ files");
// Lambda returning void: passes the workspace state to handler.reduce() (the
// non-const handler) and then empties both chunks.
451 [&](PairedEndWorkspace& work) ->
void {
452 handler.reduce(work.state);
453 work.reads1.clear(Handler_::use_names);
454 work.reads2.clear(Handler_::use_names);
Defines the FastqReader class.
Stream reads from a FASTQ file.
Definition FastqReader.hpp:32
const std::vector< char > & get_sequence() const
Definition FastqReader.hpp:147
const std::vector< char > & get_name() const
Definition FastqReader.hpp:156
Namespace for the kaori barcode-matching library.
Definition BarcodePool.hpp:16
void process_single_end_data(Pointer_ input, Handler_ &handler, const ProcessSingleEndDataOptions &options)
Definition process_data.hpp:271
void process_paired_end_data(Pointer_ input1, Pointer_ input2, Handler_ &handler, const ProcessPairedEndDataOptions &options)
Definition process_data.hpp:375
Options for process_paired_end_data().
Definition process_data.hpp:325
std::size_t block_size
Definition process_data.hpp:334
int num_threads
Definition process_data.hpp:329
Options for process_single_end_data().
Definition process_data.hpp:225
std::size_t block_size
Definition process_data.hpp:234
int num_threads
Definition process_data.hpp:229