Welcome to OStack Knowledge Sharing Community for programmer and developer-Open, Learning and Share
Welcome To Ask or Share your Answers For Others

c++ - simultaneous read and write to child's stdio using boost.process

I am trying to write to and read from a child's stdio using Boost.Process, with something like this:

boost::asio::io_service writeService, readService;
bp::async_pipe in{writeService};
bp::async_pipe out{readService};

bp::child process(CompressCmd.c_str(), bp::std_in < in, bp::std_out > out);
Buffer src;
src.reserve(4 * 1024 * 1024);
integer_type read = 0;
//std::atomic_int64_t totalWrite{0};
integer_type totalWrite = 0;
while (callback(CallbackActions::NeedMoreInput, src, read)) {
    in.async_write_some(
        boost::asio::buffer(src.data(), read),
        [](const boost::system::error_code &e, std::size_t) { });
    // written data is not important, that's why using same buffer
    out.async_read_some(boost::asio::buffer(src.data(), src.capacity()),
                        [&](const boost::system::error_code &e,
                           std::size_t byte_transferred) { totalWrite += byte_transferred; });
}
writeService.run();
in.close();
readService.run();

All read and write operations are reported as successful, but the value of totalWrite is wrong: for example, it reports 29356032 when the real value should be around 50000000.
I noticed that the program terminates halfway through.
Using process.wait() after readService.run() freezes the child.
Using an atomic int produces the same behaviour.

For now I only need to know how much data is actually written, which is why I am using the same buffer.

1 Answer

  1. This pattern:

    while (callback(CallbackActions::NeedMoreInput, src, read)) {
        in.async_write_some(...);
        out.async_read_some(...);
    }
    

    is most likely misguided (async operations always immediately return, so you'd simply keep adding more async operations without giving them a chance to run).

  2. Also misguided is the fact that you have separate services for the pipes, but you're running them in total exclusion, so no read operation will ever run until the writeService completes.

  3. Atomic types are misguided since there's no access from multiple threads.

  4. What are you trying to do? You reserve a large buffer but never put any data into it (reserve != resize). Therefore you can only hope to write nothing.

    Even more ironically, you are reading into the exact same buffer, at the exact same spot. However, that's immediately Undefined Behaviour [1] because you pass it src.capacity() when you know that src.size()==0.

    Even without that error, how can you "simultaneously" read and write from exactly the same bytes in memory and still know what the expected outcome would be?

  5. You are not passing your own io_service to Boost Process.
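
Point 4 above turns on the difference between reserve and resize. Here is a minimal sketch using only the standard library. It assumes the question's Buffer behaves like std::vector<char>, which the question does not actually show; the names Sizes, after_reserve and after_resize are made up for illustration.

```cpp
#include <cassert>
#include <cstddef>
#include <vector>

// Hypothetical stand-in for the question's Buffer type (assumed to be vector-like).
using Buffer = std::vector<char>;

struct Sizes {
    std::size_t size;
    std::size_t capacity;
};

// reserve() only allocates storage; the vector still holds zero elements,
// so there is nothing in it that may legitimately be read or written.
Sizes after_reserve(std::size_t n) {
    Buffer b;
    b.reserve(n);
    return {b.size(), b.capacity()};
}

// resize() creates n value-initialized elements, so data() points at n usable bytes.
Sizes after_resize(std::size_t n) {
    Buffer b;
    b.resize(n);
    assert(b.size() == n);
    return {b.size(), b.capacity()};
}
```

With these helpers, after_reserve(4096).size is 0 while after_resize(4096).size is 4096. A buffer handed to a read operation should therefore be sized with resize (or be a fixed-size array, as in the demos), not merely reserved.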

A Working Demo

Here's a working sample. Of course I had to guess what you actually want to do.

I opted to make the program send its own source (main.cpp) to stdin, and read stdout iteratively, recording the total_received bytes. It then prints the exit code and that total.

As a makeshift compressor I used '/usr/bin/xxd' because it's available, and its output could even be usefully printed to std::cout for debugging.

#include <boost/asio.hpp>
#include <boost/function.hpp>
#include <boost/process.hpp>
#include <boost/process/async.hpp>
#include <array>
#include <iostream>
#include <string>
#include <vector>
std::vector<char> read_file(std::string const&);

namespace bp = boost::process;
using boost::system::error_code;

using Loop = boost::function<void()>;
using Buffer = std::array<char, 4*1024>;

int main() {
    boost::asio::io_service svc;

    std::string const CompressCmd = "/usr/bin/xxd";

    bp::async_pipe in{svc}, out{svc};
    bp::child process(CompressCmd, bp::std_in < in, bp::std_out > out, svc);

    auto data = read_file("main.cpp");

    Loop read_loop, write_loop;

    Buffer recv_buffer;
    std::size_t total_received = 0;
    read_loop = [&read_loop, &out, &recv_buffer, &total_received] {
        out.async_read_some(boost::asio::buffer(recv_buffer),
            [&](error_code ec, size_t transferred) {
                std::cout << "ReadLoop: " << ec.message() << " got " << transferred << " bytes\n";
                total_received += transferred;
                if (!ec)
                    read_loop(); // continue reading
            });
    };

    boost::asio::async_write(in, boost::asio::buffer(data),
        [&](error_code ec, size_t transferred) {
            std::cout << "WriteLoop: " << ec.message() << " done, " << transferred << " bytes\n";
            in.close(ec);
            std::cout << "WriteLoop: closed pipe (" << ec.message() << ")\n";
        }); // async

    read_loop(); // async

    svc.run(); // Await all async operations

    std::cout << "Process exitcode " << process.exit_code() << ", total_received=" << total_received << "\n";
}

#include <fstream>
#include <iterator>
std::vector<char> read_file(std::string const& fname) {
    std::ifstream ifs(fname);
    return {std::istreambuf_iterator<char>(ifs), {}};
}

Printing

WriteLoop: Success done, 1787 bytes
WriteLoop: closed pipe (Success)
ReadLoop: Success got 4096 bytes
ReadLoop: Success got 3515 bytes
ReadLoop: End of file got 0 bytes
Process exitcode 0, total_received=7611

Explanations, Simplifications

Note that we do all of the writing without a loop. That's because boost::asio::async_write is a composed operation (it hides the loop).
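
To illustrate what "hides the loop" means, here is a synchronous toy model. ChunkySink and write_all are made-up names, not Asio API. As far as I know, the real boost::asio::async_write does the equivalent by chaining async_write_some calls from completion handlers, so treat this only as a sketch of the idea.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <string>

// Hypothetical sink that, like a pipe, may accept only part of what it is offered.
struct ChunkySink {
    std::size_t max_per_call;  // upper bound on bytes accepted per write_some call
    std::string received;      // everything accepted so far
    std::size_t calls;         // number of write_some invocations

    explicit ChunkySink(std::size_t max) : max_per_call(max), calls(0) {}

    // Accepts at most max_per_call bytes and reports how many were taken.
    std::size_t write_some(char const* data, std::size_t len) {
        std::size_t n = std::min(len, max_per_call);
        received.append(data, n);
        ++calls;
        return n;
    }
};

// The "composed" operation: keep calling write_some until everything is consumed.
std::size_t write_all(ChunkySink& sink, std::string const& data) {
    assert(sink.max_per_call > 0);  // otherwise the loop could never make progress
    std::size_t done = 0;
    while (done < data.size())
        done += sink.write_some(data.data() + done, data.size() - done);
    return done;
}
```

Writing ten bytes through a sink that accepts four at a time takes three write_some calls, but the caller of write_all sees a single operation that reports ten bytes.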

Likewise, if you can "afford" to store the whole received data in memory, you can simplify by using boost::asio::streambuf and using a similar composed operation:

#include <boost/asio.hpp>
#include <boost/process.hpp>
#include <boost/process/async.hpp>
#include <iostream>
#include <string>
#include <vector>
std::vector<char> read_file(std::string const&);

namespace bp = boost::process;
using boost::system::error_code;

int main() {
    boost::asio::io_service svc;

    std::string const CompressCmd = "/usr/bin/xxd";

    bp::async_pipe in{svc}, out{svc};
    bp::child process(CompressCmd, bp::std_in < in, bp::std_out > out, svc);

    auto data = read_file("main.cpp");

    boost::asio::streambuf recv_buffer;
    boost::asio::async_read(out, recv_buffer,
            [&](error_code ec, size_t transferred) {
                std::cout << "ReadLoop: " << ec.message() << " got " << transferred << " bytes\n";
            });

    boost::asio::async_write(in, boost::asio::buffer(data),
        [&](error_code ec, size_t transferred) {
            std::cout << "WriteLoop: " << ec.message() << " done, " << transferred << " bytes\n";
            in.close(ec);
            std::cout << "WriteLoop: closed pipe (" << ec.message() << ")\n";
        }); // async

    svc.run(); // Await all async operations

    std::cout << "Process exitcode " << process.exit_code() << ", total_received=" << recv_buffer.size() << "\n";
}

#include <fstream>
#include <iterator>
std::vector<char> read_file(std::string const& fname) {
    std::ifstream ifs(fname);
    return {std::istreambuf_iterator<char>(ifs), {}};
}

Conversely, if you cannot afford to have all the data in memory before sending, you can create a loop to send input block-wise.
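
The block-wise part relies on std::istream::read together with gcount(). Here is a small standard-library-only sketch of just that piece, with no Boost involved; block_sizes is a hypothetical helper that records how large each block turned out to be.

```cpp
#include <array>
#include <cassert>
#include <cstddef>
#include <istream>
#include <sstream>
#include <string>
#include <vector>

// Hypothetical helper: reads the stream in blocks of at most N bytes and
// returns the size of every non-empty block, in order.
template <std::size_t N>
std::vector<std::size_t> block_sizes(std::istream& is) {
    std::array<char, N> block;
    std::vector<std::size_t> sizes;
    while (is.good()) {
        is.read(block.data(), block.size());
        // gcount() reports how many bytes the last read() actually stored,
        // which is less than N for the final, partial block.
        auto n = static_cast<std::size_t>(is.gcount());
        assert(n <= N);
        if (n == 0)
            break;  // input length was an exact multiple of N
        sizes.push_back(n);
    }
    return sizes;
}
```

For an input of 1134 bytes and N = 500 this yields blocks of 500, 500 and 134 bytes. In an asynchronous sender, each of those blocks would be handed to one async_write, with the next read issued from its completion handler.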

Two Asynchronous Loops, With Delays

Let's do that, and make it more entertaining by delaying a second before writing each block. What you'd expect to see is alternating reads/writes happening because of the delays:

#include <boost/asio.hpp>
#include <boost/asio/high_resolution_timer.hpp>
#include <boost/function.hpp>
#include <boost/process.hpp>
#include <boost/process/async.hpp>
#include <array>
#include <chrono>
#include <fstream>
#include <iostream>
#include <string>

namespace bp = boost::process;
using boost::system::error_code;
using namespace std::chrono_literals;

using Loop = boost::function<void()>;
using Buffer = std::array<char, 500>;

int main() {
    boost::asio::io_service svc;
    auto on_exit = [](int code, std::error_code ec) {
            std::cout << "Exited " << code << " (" << ec.message() << ")\n";
        };

    std::string const CompressCmd = "/usr/bin/xxd";

    bp::async_pipe in{svc}, out{svc};
    bp::child process(CompressCmd, bp::std_in < in, bp::std_out > out, svc, bp::on_exit(on_exit));

    Loop read_loop, write_loop;

    Buffer recv_buffer;
    std::size_t total_received = 0;
    read_loop = [&read_loop, &out, &recv_buffer, &total_received] {
        out.async_read_some(boost::asio::buffer(recv_buffer),
            [&](error_code ec, size_t transferred) {
                std::cout << "ReadLoop: " << ec.message() << " got " << transferred << " bytes\n";
                total_received += transferred;
                if (!ec)
                    read_loop(); // continue reading
            });
    };

    std::ifstream ifs("main.cpp");
    std::size_t total_written = 0;
    Buffer send_buffer;
    boost::asio::high_resolution_timer send_delay(svc);
    write_loop = [&write_loop, &in, &ifs, &send_buffer, &total_written, &send_delay] {
        if (!ifs.good())
        {
            error_code ec;
            in.close(ec);
            std::cout << "WriteLoop: closed stdin (" << ec.message() << ")\n";
            return;
        }
        ifs.read(send_buffer.data(), send_buffer.size());

        boost::asio::async_write(in, boost::asio::buffer(send_buffer.data(), ifs.gcount()),
            [&](error_code ec, size_t transferred) {
                std::cout << "WriteLoop: " << ec.message() << " sent " << transferred << " bytes\n";
                total_written += transferred;
                if (!ec) {
                    send_delay.expires_from_now(1s);
                    send_delay.async_wait([&write_loop](error_code ec) {
                        std::cout << "WriteLoop: send delay " << ec.message() << "\n";
                        if (!ec) write_loop(); // continue writing
                    });
                }
            });
    };

    read_loop(); // async
    write_loop(); // async

    svc.run(); // Await all async operations

    std::cout << "Process exitcode " << process.exit_code() << ", total_received=" << total_received << "\n";
}

Prints

WriteLoop: Success sent 500 bytes
WriteLoop: send delay Success
WriteLoop: Success sent 500 bytes
ReadLoop: Success got 500 bytes
ReadLoop: Success got 500 bytes
ReadLoop: Success got 500 bytes
ReadLoop: Success got 500 bytes
ReadLoop: Success got 500 bytes
ReadLoop: Success got 500 bytes
ReadLoop: Success got 500 bytes
ReadLoop: Success got 500 bytes
ReadLoop: Success got 96 bytes
WriteLoop: send delay Success
WriteLoop: Success sent 500 bytes
WriteLoop: send delay Success
WriteLoop: Success sent 500 bytes
ReadLoop: Success got 500 bytes
ReadLoop: Success got 500 bytes
ReadLoop: Success got 500 bytes
ReadLoop: Success got 500 bytes
ReadLoop: Success got 500 bytes
ReadLoop: Success got 500 bytes
ReadLoop: Success got 500 bytes
ReadLoop: Success got 500 bytes
ReadLoop: Success got 96 bytes
WriteLoop: send delay Success
WriteLoop: Success sent 500 bytes
WriteLoop: send delay Success
WriteLoop: Success sent 134 bytes
WriteLoop: send delay Success
WriteLoop: closed stdin (Success)
ReadLoop: Success got 500 bytes
ReadLoop: Success got 500 bytes
ReadLoop: Success got 500 bytes
ReadLoop: Success got 500 bytes
ReadLoop: Success got 500 bytes
ReadLoop: Success got 22 bytes
Exited 0 (Success)
ReadLoop: End of file got 0 bytes
Process exitcode 0, total_received=11214

[1] Perhaps just unspecified; I'm not inclined to find out the difference right now.

