#pragma once #include "inc.h" #include "UVDelayer.h" #include #include namespace panda { namespace unievent { namespace backend { namespace uv { struct UVLoop : LoopImpl, IMetricsProvider { uv_loop_t* uvloop; UVLoop (Type type) : _delayer(this), _running(false) { switch (type) { case Type::GLOBAL: uvloop = uv_default_loop(); if (!uvloop) throw Error("[UVLoop] uv_default_loop() couldn't create a loop"); break; case Type::LOCAL: case Type::DEFAULT: uvloop = &_uvloop_body; int err = uv_loop_init(uvloop); if (err) throw Error(uvx_error(err)); } uvloop->data = this; } ~UVLoop () { _delayer.destroy(); _la_metrics.reset(); run(RunMode::NOWAIT); // finish all closing handles run(RunMode::NOWAIT); // finish all closing handles int err = uv_loop_close(uvloop); assert(!err); // unievent should have closed all handles } uint64_t now () const override { return uv_now(uvloop); } void update_time () override { uv_update_time(uvloop); } bool alive () const override { return uv_loop_alive(uvloop) != 0; } bool _run (RunMode mode) override { struct AutoResetFlag { bool& flag; AutoResetFlag(bool& flag) : flag(flag) {flag = true;} ~AutoResetFlag() {flag = false;} }; if (_running) { throw panda::exception("Loop::run recursion"); } AutoResetFlag autoflag(_running); uvloop->stop_flag = 0; // fix bug when UV immediately exits run() if stop() was called before run() switch (mode) { case RunMode::DEFAULT : return uv_run(uvloop, UV_RUN_DEFAULT); case RunMode::ONCE : return uv_run(uvloop, UV_RUN_ONCE); case RunMode::NOWAIT : return uv_run(uvloop, UV_RUN_NOWAIT); case RunMode::NOWAIT_FORCE : { uvloop->active_handles++; auto ret = uv_run(uvloop, UV_RUN_NOWAIT); uvloop->active_handles--; return ret; } } assert(0); } void stop () override { uv_stop(uvloop); } bool stopped () const override { return uvloop->stop_flag; } excepted handle_fork () override { int err = uv_loop_fork(uvloop); if (err) return make_unexpected(uvx_error(err)); return {}; } TimerImpl* new_timer (ITimerImplListener*) override; PrepareImpl* new_prepare (IPrepareImplListener*) override; CheckImpl* new_check (ICheckImplListener*) override; IdleImpl* new_idle (IIdleImplListener*) override; AsyncImpl* new_async (IAsyncImplListener*) override; SignalImpl* new_signal (ISignalImplListener*) override; PollImpl* new_poll_sock (IPollImplListener*, sock_t sock) override; PollImpl* new_poll_fd (IPollImplListener*, int fd) override; UdpImpl* new_udp (IUdpImplListener*, int domain, int flags) override; PipeImpl* new_pipe (IStreamImplListener*, bool ipc) override; TcpImpl* new_tcp (IStreamImplListener*, int domain) override; TtyImpl* new_tty (IStreamImplListener*, fd_t) override; WorkImpl* new_work (IWorkImplListener*) override; FsEventImpl* new_fs_event (IFsEventImplListener*) override; uint64_t delay (const delayed_fn& f, const iptr& guard = {}) override { return _delayer.add(f, guard); } void cancel_delay (uint64_t id) noexcept override { _delayer.cancel(id); } void track_load_average (uint32_t nsec) override { _la_metrics.reset(); if (nsec) { uvx_strict(uv_loop_configure(uvloop, UV_METRICS_IDLE_TIME)); _la_metrics = std::make_unique(this, this, nsec); } } double get_load_average () const override { return _la_metrics ? _la_metrics->get() : 0.0f; } uint64_t get_metrics_idle_time () const override { return uv_metrics_idle_time(uvloop); } void* get() override {return uvloop;} private: uv_loop_t _uvloop_body; UVDelayer _delayer; LAMetricsPtr _la_metrics; bool _running; }; }}}}