Just read it and dispatch to the worker threads when appropriate.
I have no clue what kind of chunks you want to handle separately, so let's assume
you download all of the prime numbers from https://www.mathsisfun.com/includes/primes-to-100k.zip, read them in chunks, then do some work on each chunk of primes on separate threads.
What Is The Work?
Here's some lazy prime job:
void handle_batch(std::vector<size_t> params) {
if (!params.empty()) {
std::cout
<< "Batch n:" << params.size()
<< "\tRange [" << params.front() << ".." << params.back() << "]"
<< "\tSum:" << std::accumulate(begin(params), end(params), 0ull)
<< std::endl;
}
}
Yeah, we just print a description of the job params and their sum. We can doodle a bit on it to make it more lifelike, like making it take some time, and being aware that we are on worker threads, so we want to synchronize access to the console.
void handle_batch(std::vector<size_t> params) {
static std::mutex s_mx; // static: one mutex shared by all calls, otherwise nothing is synchronized
if (!params.empty()) {
// emulate some work, because I'm lazy
auto sum = std::accumulate(begin(params), end(params), 0ull);
// then wait some 100..200ms
{
using namespace std::chrono_literals;
std::mt19937 prng(std::random_device{}());
std::this_thread::sleep_for(
std::uniform_real_distribution<>(100,200)(prng)*1ms);
}
// simple thread id (thread::id displays ugly)
auto tid = std::hash<std::thread::id>{}(std::this_thread::get_id()) % 100;
// report results to stdout
std::lock_guard lk(s_mx); // make sure the output doesn't intermix
std::cout
<< "Thread #" << std::setw(2) << std::setfill('0') << tid
<< " Batch n:" << params.size()
<< "\tRange [" << params.front() << ".." << params.back() << "]"
<< "\tSum:" << sum
<< std::endl;
}
}
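The mutex only keeps lines from intermixing if every call locks the same object, which is why it needs static storage duration. Here is a minimal standard-library-only sketch of that idea. The names `report`, `run_reporters` and `g_out` are made up for illustration, and an `std::ostringstream` stands in for the console so the result can be inspected:

```cpp
#include <mutex>
#include <sstream>
#include <string>
#include <thread>
#include <vector>

// Hypothetical stand-in for std::cout, so the output can be checked afterwards.
static std::ostringstream g_out;

// Writes one line in many small pieces, like the chained << calls above.
void report(int worker, int pieces) {
    static std::mutex s_mx;               // one mutex shared by all callers
    std::lock_guard<std::mutex> lk(s_mx); // held until the line is complete
    g_out << "worker " << worker << ":";
    for (int i = 0; i < pieces; ++i)
        g_out << " x";
    g_out << "\n";
}

// Runs `workers` threads that each report once and returns the captured lines.
std::vector<std::string> run_reporters(int workers, int pieces) {
    g_out.str("");
    std::vector<std::thread> threads;
    for (int w = 0; w < workers; ++w)
        threads.emplace_back(report, w, pieces);
    for (auto& t : threads)
        t.join();

    std::vector<std::string> lines;
    std::istringstream in(g_out.str());
    for (std::string line; std::getline(in, line);)
        lines.push_back(line);
    return lines;
}
```

With a non-static mutex, each call would lock its own private object and the pieces of different lines could interleave.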
Okay, that's enough gore for the unimportant bits.
The Plan
Well, there's a slight complication with my chosen approach, because not only is that site using https (ugh), also it is serving up ZIP files (ugh). And we're using C++ (ugh?).
At least we can do the whole SSL connect business synchronously in not too much code. We want the reading to be asynchronous, though, because that way we can demonstrate that
- you can do a lot of intermixed IO on just the main thread using Boost Asio
- same goes for Boost Process to launch zcat as a child process to unzip the primes content (we'll assume a UNIX-like system with zcat installed)
- which means we'll be asynchronously writing to that child process stdin
- and also asynchronously reading from its stdout
- spawning off batch jobs along the way as soon as they're ready
This should be a pretty good model for your workload: the workers take more time than the IO, yet we do many IO tasks on a single thread without blocking.
Let's Get The Data
As said, we will use a single thread for IO, and a thread pool for the batch workers:
int main() {
net::io_context io; // main thread does all io
net::thread_pool pool(6); // worker threads
There. That's a start. Now, we want to have an SSL connection, and request that ZIP. Here it is:
http::response_parser<http::buffer_body> response_reader;
beast::flat_buffer lookahead; // for the response_reader
std::array<char,512> buf{0}; // for download content
auto ctx = ssl_context();
ssl::stream<tcp::socket> s(io, ctx);
{ // synchronously write request
std::string host = "www.mathsisfun.com";
connect_https(s, host, tcp::resolver{io}.resolve(host, "https"));
http::write(s, get_request(host, "/includes/primes-to-100k.zip"));
http::read_header(s, lookahead, response_reader);
//std::cerr << "Headers: " << response_reader.get().base() << std::endl;
}
Yup, that already did the reading of the response headers¹. Of course we cheated, because we need three helpers:
making an ssl context
auto ssl_context() {
ssl::context ctx{ssl::context::sslv23};
ctx.set_default_verify_paths();
ctx.set_verify_mode(ssl::verify_peer);
return ctx;
}
connecting over SSL
void connect_https(stream& s, std::string const& host, tcp::resolver::iterator eps) {
net::connect(s.lowest_layer(), eps);
s.lowest_layer().set_option(tcp::no_delay(true));
if (!SSL_set_tlsext_host_name(s.native_handle(), host.c_str())) {
throw system_error{ { (int)::ERR_get_error(), net::error::get_ssl_category() } };
}
s.handshake(stream::handshake_type::client);
}
making the HTTP request
auto get_request(std::string const& host, std::string const& path) {
using namespace http;
request<string_body> req;
req.version(11);
req.method(verb::get);
req.target("https://" + host + path);
req.set(field::user_agent, "test");
req.set(field::host, host);
std::cerr << req << std::endl;
return req;
}
Not bad, for C++.
Pipe It Into zcat
Now we start with the asynchrony: let's have a "pump" or "loop" that sends all the response data into a pipe:
// now, asynchronously read contents
process::async_pipe pipe_to_zcat(io);
std::function<void(error_code, size_t)> receive_zip;
receive_zip is what we call our loop. It's a self-chaining asynchronous operation: every time it is called, it pumps some data into the pipe and starts one more async_read for the HTTP response:
receive_zip = [&s, &response_reader, &pipe_to_zcat, &buf, &lookahead, &receive_zip]
(error_code ec, size_t /*ignore_this*/)
{
auto& res = response_reader.get();
auto& body = res.body();
if (body.data) {
auto n = sizeof(buf) - body.size;
net::write(pipe_to_zcat, net::buffer(buf, n));
}
// a real error (anything but need_buffer) or a complete response ends the loop
bool done = (ec && ec != http::error::need_buffer) || response_reader.is_done();
if (done) {
std::cerr << "receive_zip: " << ec.message() << std::endl;
pipe_to_zcat.close();
} else {
body.data = buf.data();
body.size = buf.size();
http::async_read(s, lookahead, response_reader, receive_zip);
}
};
This slightly complicated looking reading of a buffered response is almost literally from the documentation here.
Now, all we have to do is prime the pump:
// kick off receive loop
receive_zip(error_code{}, 0);
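If the self-chaining pattern looks opaque, it may help to see it without any networking. The following is only a toy sketch: `ToyLoop` and `pump` are invented names, the "loop" is a plain queue of callbacks rather than an Asio `io_context`, and the "read" just takes the next slice of a string. The control flow mirrors the loop above: the handler consumes the last result, then either stops or queues itself again.

```cpp
#include <algorithm>
#include <cstddef>
#include <functional>
#include <queue>
#include <string>
#include <utility>

// Toy stand-in for an event loop: queued callbacks run one at a time.
// This is NOT Asio, just enough machinery to show the control flow.
struct ToyLoop {
    std::queue<std::function<void()>> pending;
    void post(std::function<void()> f) { pending.push(std::move(f)); }
    void run() {
        while (!pending.empty()) {
            auto f = std::move(pending.front());
            pending.pop();
            f();
        }
    }
};

// Copies `source` into the returned string in slices of at most `chunk`
// bytes, using a handler that re-arms itself. Assumes chunk > 0.
std::string pump(std::string const& source, std::size_t chunk) {
    ToyLoop loop;
    std::string sink;
    std::size_t offset = 0;

    std::function<void(std::size_t)> on_chunk;
    on_chunk = [&](std::size_t n) {
        sink.append(source, offset, n); // consume what the last "read" produced
        offset += n;
        if (offset == source.size())
            return;                     // done: do not re-arm, so the loop drains
        std::size_t next = std::min(chunk, source.size() - offset);
        loop.post([&on_chunk, next] { on_chunk(next); }); // chain the next "read"
    };

    on_chunk(0); // prime the pump with an empty completion
    loop.run();  // the queued continuations only execute in here
    return sink;
}
```

Note how priming with a zero-byte completion lets the very first call go through the same code path as every later one.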
Intermezzo, Unzip
This is not the interesting part, so let's go: we launch a subprocess zcat and want a second pipe to read its output from:
process::async_pipe zcat_output(io);
process::child zcat(
process::search_path("zcat"),
process::std_in < pipe_to_zcat,
process::std_out > zcat_output,
process::on_exit([](int exitcode, std::error_code ec) {
std::cerr << "Child process exited with " << exitcode << " (" << ec.message() << ")\n";
}), io);
End of intermission :)
(We even threw in error reporting because, why not?)
Ah, The Good Stuff: Primes On Tap!
Now, we have another async read loop, this time to read back the uncompressed primes. This is where we will assemble batch jobs to be handled on the worker pool.
std::function<void(error_code, size_t)> receive_primes;
net::streambuf sb;
Like receive_zip before, receive_primes is our loop driver. The sb buffer is just a buffer that makes it easy to read using std::istream, as you'd normally do from std::cin.
receive_primes = [&zcat_output, &sb, &receive_primes, &pool](error_code ec, size_t /*transferred*/) {
{
std::istream is(&sb);
size_t n = std::count(net::buffers_begin(sb.data()), net::buffers_end(sb.data()), '\n');
std::vector<size_t> batch(n);
std::copy_n(std::istream_iterator<size_t>(is), n, batch.begin());
is.ignore(1, '\n'); // we know a newline is pending, eat it to keep invariant
post(pool, std::bind(handle_batch, std::move(batch)));
}
if (ec) {
std::cerr << "receive_primes: " << ec.message() << std::endl;
zcat_output.close();
} else {
net::async_read_until(zcat_output, sb, "\n", receive_primes);
}
};
Because async_read_until may read partial lines, we count the number (n) of full lines in the buffer and pack them into a vector. After we make sure that we eat the pending newline, we ... post the batch job, finally:
post(pool, std::bind(handle_batch, std::move(batch)));
We move ownership to the task because it will run on a separate thread, and the best way to handle concurrency is to minimize sharing.
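The "only take full lines, leave the partial tail for the next read" step can be tried in isolation. This is a hedged sketch, not the code from the listing: it works on a plain `std::string` instead of `net::streambuf`, and it finds the last newline instead of counting newlines. The function name `take_complete_lines` is made up.

```cpp
#include <cstddef>
#include <sstream>
#include <string>
#include <vector>

// Extracts the numbers on all complete (newline-terminated) lines from
// `buffer` and leaves any trailing partial line in place for the next read.
std::vector<std::size_t> take_complete_lines(std::string& buffer) {
    auto last_newline = buffer.rfind('\n');
    if (last_newline == std::string::npos)
        return {};                             // no complete line yet

    std::istringstream complete(buffer.substr(0, last_newline + 1));
    std::vector<std::size_t> batch;
    for (std::size_t value; complete >> value;)
        batch.push_back(value);

    buffer.erase(0, last_newline + 1);         // keep only the partial tail
    return batch;
}
```

For example, a buffer holding "2\n3\n5\n7" yields the batch {2, 3, 5} and keeps "7" until the rest of that line arrives.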
Again, priming the pump:
// kick off handler loop as well:
receive_primes(error_code{}, 0);
Putting It All Together
Well. Prepare for the anticlimax. With all the async chains set up, all we need to do is... wait.
io.run();
pool.join();
} // end of main
The io.run() keeps running both pumps and awaits the child process, all on the main thread, as we like.
The pool.join() waits for all batch jobs to be completed before stopping the thread pool. If you leave out that line, you might not run all the tasks, because the destructor of thread_pool calls stop() before it calls join().
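To see why the order of stop() and join() matters, here is a toy pool built only on the standard library. It is an illustration of the two shutdown flavors under my own assumptions, not how Asio implements `thread_pool`. `ToyPool` and `run_jobs` are invented names.

```cpp
#include <atomic>
#include <condition_variable>
#include <cstddef>
#include <deque>
#include <functional>
#include <mutex>
#include <thread>
#include <utility>
#include <vector>

// Toy pool (NOT Asio's implementation) with the two shutdown flavors.
class ToyPool {
    std::mutex mx_;
    std::condition_variable cv_;
    std::deque<std::function<void()>> queue_;
    bool closing_ = false;
    std::vector<std::thread> threads_;

    void worker() {
        for (;;) {
            std::function<void()> job;
            {
                std::unique_lock<std::mutex> lk(mx_);
                cv_.wait(lk, [this] { return closing_ || !queue_.empty(); });
                if (queue_.empty())
                    return;              // closing and nothing left to do
                job = std::move(queue_.front());
                queue_.pop_front();
            }
            job();                       // run outside the lock
        }
    }

public:
    explicit ToyPool(std::size_t n) {
        for (std::size_t i = 0; i < n; ++i)
            threads_.emplace_back([this] { worker(); });
    }
    void post(std::function<void()> job) {
        { std::lock_guard<std::mutex> lk(mx_); queue_.push_back(std::move(job)); }
        cv_.notify_one();
    }
    // Abandon jobs that have not started yet.
    void stop() {
        { std::lock_guard<std::mutex> lk(mx_); queue_.clear(); closing_ = true; }
        cv_.notify_all();
    }
    // Let the workers drain the queue, then wait for them to finish.
    void join() {
        { std::lock_guard<std::mutex> lk(mx_); closing_ = true; }
        cv_.notify_all();
        for (auto& t : threads_)
            if (t.joinable()) t.join();
    }
    ~ToyPool() { stop(); join(); }       // same order as described above
};

// Posts `jobs` counting tasks and returns how many actually ran.
std::size_t run_jobs(std::size_t jobs, bool explicit_join) {
    std::atomic<std::size_t> ran{0};
    {
        ToyPool pool(4);
        for (std::size_t i = 0; i < jobs; ++i)
            pool.post([&ran] { ++ran; });
        if (explicit_join)
            pool.join();  // without this, the destructor may drop pending jobs
    }
    return ran.load();
}
```

With the explicit join every posted job runs. Without it, the count depends on how far the workers got before the destructor cleared the queue.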
Toy around with the buffer size (512 bytes in my example) to see how large batches become. Note that those 512 bytes are compressed bytes.
"Unlive" Demo
Sadly no online compiler that I know of supports external network access, so you'll have to run this one yourself. For convenience, here's a full listing, and sample output from a run on my computer:
Live On Coliru
#include <boost/asio.hpp>
#include <boost/asio/ssl.hpp>
#include <boost/beast.hpp>
#include <boost/beast/http.hpp>
#include <boost/process.hpp>
#include <boost/process/async.hpp>
#include <iomanip>
#include <iostream>
void handle_batch(std: