victimsnino/ReactivePlusPlus

libuv / uvw integration

Closed this issue · 10 comments

Hi @victimsnino,

I would like to come up with another request :)
As mentioned before, I replaced Qt with libuv (specifically uvw, a C++ wrapper for libuv).
Do you plan any further main loop integrations (besides Qt)?

That would be great. Thanks a lot!

Hi @mincequi,

Thanks for the request :)

To be honest, I'm not sure I want to add integrations for every possible library inside rpp 😅😅😅 I expect rpp to have an easy-to-use interface/wrappers so that any external library can be plugged in with a few lines of code. But at least I can help you with the implementation.

Could you provide a bit more detail about your request? (Some examples, etc.) :)

Hi @victimsnino,
OK, I agree that rpp cannot contain all possible event loop solutions like asio, Qt, SDL, libuv, libev...

Currently, I am doing the following to synchronize two observables:

    _pvPower.get_observable().combine_latest([](int pvPower, int gridPower) {
        const auto now = std::chrono::system_clock::now();
        const auto seconds = std::chrono::duration_cast<std::chrono::seconds>(now.time_since_epoch()).count();
        return Site::SiteData { (int)seconds, pvPower, gridPower };
    }, _gridPower.get_observable())
    .debounce(std::chrono::milliseconds(cfg->valueOr<int>("site", Config::Key::debounce, 400)), rpp::schedulers::new_thread{})
    .observe_on(rppqt::schedulers::main_thread_scheduler{})
    .subscribe(_siteDataSubject.get_subscriber());

First of all, does this look sane to you? (It actually works perfectly with my Qt-based application.)
Instead of that, I now want to use uvw's event loop (with its timers) to schedule the timer for the debounce operator.
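
As a rough illustration of why debounce needs a timer from the event loop at all, here is a small offline model of the operator using only the standard library. It is not rpp's implementation: the function name `debounce`, the `(time, value)` input format, and the boundary rule (a value survives if the next one arrives at or after the end of the window) are assumptions made for this sketch, and it assumes the stream stays open after the last value.

```cpp
#include <chrono>
#include <cstddef>
#include <utility>
#include <vector>

using ms = std::chrono::milliseconds;

// Offline model of debounce. Input: (arrival time, value) pairs sorted by time.
// A value is emitted `window` after its arrival, but only if no newer value
// arrived before that quiet period elapsed.
template <typename T>
std::vector<std::pair<ms, T>> debounce(const std::vector<std::pair<ms, T>>& events, ms window) {
    std::vector<std::pair<ms, T>> out;
    for (std::size_t i = 0; i < events.size(); ++i) {
        const bool is_last = i + 1 == events.size();
        // Assumption: a successor arriving exactly at the window end does not cancel.
        if (is_last || events[i + 1].first - events[i].first >= window)
            out.emplace_back(events[i].first + window, events[i].second);
    }
    return out;
}
```

With a 400 ms window, values arriving at 0, 100, 600 and 700 ms would produce two emissions in this model: the second value at 500 ms and the fourth at 1100 ms. Each emission corresponds to one timer that a real scheduler has to arm and fire.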

My assumption is that a new scheduler has to be implemented for this. Is it really only a matter of adapting the timer creation from rppqt to a libuv-based solution?

                const auto application = QCoreApplication::instance();
                if (!application)
                    throw utils::no_active_qapplication{"Pointer to application is null. Create QApplication before using main_thread_scheduler!"};

                QTimer::singleShot(std::chrono::duration_cast<std::chrono::milliseconds>(duration), application, [fn = std::forward<Fn>(fn), handler = std::forward<Handler>(handler), ... args = std::forward<Args>(args)]() mutable {
                    if (const auto new_duration = fn(handler, args...))
                        defer_for(new_duration->value, std::move(fn), std::move(handler), std::move(args)...);
                });
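
The pattern in the snippet above (arm a one-shot timer, run the schedulable, re-arm if it returns a new delay) does not depend on Qt. A minimal sketch with only the standard library follows; `toy_loop`, `single_shot`, and this `defer_for` signature are hypothetical stand-ins invented for the illustration, and time is simulated so the behaviour is deterministic.

```cpp
#include <chrono>
#include <functional>
#include <optional>
#include <queue>
#include <utility>
#include <vector>

// Hypothetical stand-in for an event loop with one-shot timers (Qt, libuv, ...).
class toy_loop {
public:
    using duration = std::chrono::milliseconds;

    // Run `task` once, `delay` after the loop's current (simulated) time.
    void single_shot(duration delay, std::function<void()> task) {
        m_timers.push(timer{m_now + delay, m_next_id++, std::move(task)});
    }

    // Fire all pending timers in order of their due time.
    void run() {
        while (!m_timers.empty()) {
            timer t = m_timers.top();
            m_timers.pop();
            m_now = t.due;
            t.task();
        }
    }

    duration now() const { return m_now; }

private:
    struct timer {
        duration due;
        unsigned id; // tie-breaker: equal due times fire in insertion order
        std::function<void()> task;
        bool operator>(const timer& other) const {
            return due != other.due ? due > other.due : id > other.id;
        }
    };

    std::priority_queue<timer, std::vector<timer>, std::greater<timer>> m_timers;
    duration m_now{0};
    unsigned m_next_id = 0;
};

// A schedulable returns the delay until its next run, or nullopt when it is done.
using schedulable = std::function<std::optional<toy_loop::duration>()>;

// Same shape as the Qt snippet: arm a timer, run, re-arm if asked to.
void defer_for(toy_loop& loop, toy_loop::duration delay, schedulable fn) {
    loop.single_shot(delay, [&loop, fn = std::move(fn)]() mutable {
        if (const auto next = fn())
            defer_for(loop, *next, std::move(fn));
    });
}
```

Porting to another event loop then mostly means replacing `single_shot` with that loop's timer API, which is what the question above is getting at.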

UPDATE: don't invest any time in that. I guess I am almost there. However, I can share my code, just in case you want to provide it :)

First of all, does this look sane to you? (It actually works perfectly with my Qt-based application.) Instead of that, I now want to use uvw's event loop (with its timers) to schedule the timer for the debounce operator.

Yeah, it looks totally fine to me.

My assumption is that a new scheduler has to be implemented for this. Is it really only a matter of adapting the timer creation from rppqt to a libuv-based solution?

Yeah, actually you should only have to implement one function for scheduling schedulables. But the current Qt scheduler for v2 is a bit outdated because it does not handle all possible schedulables (for now, a schedulable can defer itself in 3 different ways):

/**
 * @brief Timepoint of next execution would be calculated from NOW timepoint (time of returning from schedulable)
 *
 * @details Implementation looks like this
 * \code{.cpp}
 * const auto duration_from_now = schedulable();
 * schedule(now() + duration_from_now, schedulable);
 * \endcode
 */
struct delay_from_now
{
    explicit delay_from_now(duration duration = {})
        : value{duration}
    {
    }
    duration value;
};
/**
 * @brief Timepoint of next execution would be calculated from timepoint of current scheduling
 *
 * @details Implementation looks like this
 * \code{.cpp}
 * const auto timepoint_for_schedulable = schedulable->get_timepoint();
 * sleep_until(timepoint_for_schedulable);
 * const auto duration_from_this_timepoint = schedulable();
 * schedule(timepoint_for_schedulable + duration_from_this_timepoint, schedulable);
 * \endcode
 */
struct delay_from_this_timepoint
{
    explicit delay_from_this_timepoint(duration duration = {})
        : value{duration}
    {
    }
    duration value;
};
/**
 * @brief Provide timepoint of next execution explicitly
 */
struct delay_to
{
    explicit delay_to(time_point timepoint = {})
        : value{timepoint}
    {
    }
    time_point value;
};
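
To make the difference between the three result types concrete, here is a small sketch of how a scheduler might turn each of them into the next execution time. The structs are simplified copies of the ones quoted above; the `delay_result` variant and the helper `next_time_point` are assumptions for this illustration and not rpp's actual internals.

```cpp
#include <chrono>
#include <variant>

using clock_type = std::chrono::steady_clock;
using duration   = clock_type::duration;
using time_point = clock_type::time_point;

// Simplified copies of the three result types.
struct delay_from_now            { duration value{}; };
struct delay_from_this_timepoint { duration value{}; };
struct delay_to                  { time_point value{}; };

// Assumption for this sketch: a schedulable reports one of the three.
using delay_result = std::variant<delay_from_now, delay_from_this_timepoint, delay_to>;

// Hypothetical helper. `scheduled_at` is the time point the current run was
// scheduled for; `now` is the time at which the schedulable returned.
time_point next_time_point(const delay_result& result, time_point scheduled_at, time_point now) {
    struct visitor {
        time_point scheduled_at;
        time_point now;
        // Relative to the moment the schedulable finished.
        time_point operator()(const delay_from_now& d) const { return now + d.value; }
        // Relative to the planned time point, so late runs do not accumulate drift.
        time_point operator()(const delay_from_this_timepoint& d) const { return scheduled_at + d.value; }
        // Absolute time point, used as-is.
        time_point operator()(const delay_to& d) const { return d.value; }
    };
    return std::visit(visitor{scheduled_at, now}, result);
}
```

A scheduler that only understands plain durations covers the first case; handling all three requires knowing both the planned time point and the current time.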

I'm going to fix it in the near future =)

However, I can share my code, just in case you want to provide it :)

Sure, I want to see it =)

Ok, here we go. This is my working rpp-uvw integration:

rpp_uvw.h:

#pragma once

#include <rpp/schedulers/details/worker.hpp>

#include <uvw/loop.h>
#include <uvw/timer.h>

namespace rpp_uvw::schedulers {

class main_thread_scheduler final : public rpp::schedulers::details::scheduler_tag {
private:
    class worker_strategy;
    using main_thread_schedulable = rpp::schedulers::schedulable_wrapper<worker_strategy>;

    class worker_strategy {
    public:
        worker_strategy(const rpp::subscription_base& sub)
            : m_sub{sub} {}

        bool is_subscribed() const { return m_sub.is_subscribed(); }

        void defer_at(rpp::schedulers::time_point time_point, rpp::schedulers::constraint::schedulable_fn auto&& fn) const {
            defer_at(time_point, main_thread_schedulable{*this, time_point, std::forward<decltype(fn)>(fn)});
        }

        void defer_at(rpp::schedulers::time_point time_point, main_thread_schedulable&& fn) const {
            if (!m_sub.is_subscribed())
                return;

            auto timer = uvw::loop::get_default()->resource<uvw::timer_handle>();
            timer->on<uvw::timer_event>([fn = std::move(fn)](const auto&, auto &handle) {
                    const_cast<main_thread_schedulable&&>(fn)();
                    handle.stop();
                    handle.close();
                });
            const auto duration = std::max(std::chrono::milliseconds{0}, std::chrono::duration_cast<std::chrono::milliseconds>(time_point - now()));
            timer->start(duration, uvw::timer_handle::time{0});
        }

        static rpp::schedulers::time_point now() { return rpp::schedulers::clock_type::now(); }
    private:
        rpp::subscription_base m_sub;
    };

public:
    static auto create_worker(const rpp::subscription_base& sub = {}) {
        return rpp::schedulers::worker<worker_strategy>{sub};
    }
};

} // namespace rpp_uvw::schedulers
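
One detail of `defer_at` worth isolating is the delay computation: the target time point may already be in the past, and the timer is started with a non-negative millisecond delay, so the remaining time is clamped to zero. A minimal sketch of just that step, using only the standard library (the helper name `timer_delay` is made up for this example):

```cpp
#include <algorithm>
#include <chrono>

using clock_type = std::chrono::steady_clock;

// Hypothetical helper mirroring the std::max(...) expression in defer_at:
// time left until `target`, truncated to milliseconds and never negative.
std::chrono::milliseconds timer_delay(clock_type::time_point target, clock_type::time_point now) {
    const auto remaining = std::chrono::duration_cast<std::chrono::milliseconds>(target - now);
    return std::max(std::chrono::milliseconds{0}, remaining);
}
```

A past-due schedulable therefore runs on the next loop iteration instead of being passed to the timer with a negative delay.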

Oh, it is still a v1 implementation =)
Anyway, it looks pretty correct.

Yes, since I depend on behavior_subject, I am still at v1. Once it is ready, I will switch :)

Yes, since I depend on behavior_subject, I am still at v1. Once it is ready, I will switch :)

Ohhhhhh, I can't understand how I missed it O_O
I will add it in the near future =)

@mincequi, behavior_subject added =)

@victimsnino awesome, will try it very soon.