From d168be61999130d0c20cc31b3e6105647d928347 Mon Sep 17 00:00:00 2001 From: juancarlosgg Date: Fri, 31 Oct 2025 12:11:41 +0100 Subject: [PATCH 1/5] [python] add alternative queued signal implementation --- api/CMakeLists.txt | 1 + api/dsr_api.cpp | 95 ++++++----- api/dsr_rt_api.cpp | 16 +- api/dsr_signal_emitter.cpp | 86 ++++++++++ api/include/dsr/api/dsr_api.h | 50 +++++- api/include/dsr/api/dsr_graph_settings.h | 2 + api/include/dsr/api/dsr_signal_emitter.h | 126 ++++++++++++++ python-wrapper/python_api.cpp | 209 ++--------------------- python-wrapper/test/TestSignals.py | 2 + python-wrapper/test/TestSignalsQueued.py | 32 ++-- 10 files changed, 352 insertions(+), 267 deletions(-) create mode 100644 api/dsr_signal_emitter.cpp create mode 100644 api/include/dsr/api/dsr_signal_emitter.h diff --git a/api/CMakeLists.txt b/api/CMakeLists.txt index 154e90d..31d4e75 100644 --- a/api/CMakeLists.txt +++ b/api/CMakeLists.txt @@ -51,6 +51,7 @@ target_sources(dsr_api dsr_inner_eigen_api.cpp dsr_rt_api.cpp dsr_utils.cpp + dsr_signal_emitter.cpp GHistorySaver.cpp ${GEOM_API_SOURCES} ${headers_to_moc} diff --git a/api/dsr_api.cpp b/api/dsr_api.cpp index 1c06d6e..d7f837f 100644 --- a/api/dsr_api.cpp +++ b/api/dsr_api.cpp @@ -5,6 +5,7 @@ #include "dsr/api/dsr_graph_settings.h" #include "dsr/core/topics/IDLGraph.hpp" #include "dsr/core/types/translator.h" +#include "include/dsr/api/dsr_signal_emitter.h" #include #include #include @@ -44,7 +45,11 @@ DSRGraph::DSRGraph(GraphSettings settings) : qDebug() << "Agent name: " << QString::fromStdString(agent_name); utils = std::make_unique(this); - + if (settings.signal_mode == SignalMode::QT) { + set_qt_signals(); + } else { + set_queued_signals(); + } // RTPS Create participant auto[suc, participant_handle] = dsrparticipant.init(agent_id, agent_name, settings.same_host, ParticipantChangeFunctor(this, [&](DSR::DSRGraph *graph, @@ -122,8 +127,8 @@ DSRGraph::DSRGraph(GraphSettings settings) : qDebug() << __FUNCTION__ << "Constructor finished OK"; } -DSRGraph::DSRGraph(std::string name, uint32_t id, const std::string &dsr_input_file, bool all_same_host, int8_t domain_id) - : DSR::DSRGraph(GraphSettings {id, 5, 1, name, dsr_input_file, "", all_same_host, GraphSettings::LOGLEVEL::INFOL, domain_id}) +DSRGraph::DSRGraph(std::string name, uint32_t id, const std::string &dsr_input_file, bool all_same_host, int8_t domain_id, SignalMode mode) + : DSR::DSRGraph(GraphSettings {id, 5, 1, name, dsr_input_file, "", all_same_host, GraphSettings::LOGLEVEL::INFOL, domain_id, mode}) {} @@ -202,10 +207,10 @@ std::optional DSRGraph::insert_node(No &&node) if (delta.has_value()) { dsrpub_node.write(&delta.value()); - emit update_node_signal(node.id(), node.type(), SignalInfo{agent_id}); + emitter.update_node_signal(node.id(), node.type(), SignalInfo{agent_id}); for (const auto &[k, v]: node.fano()) { - emit update_edge_signal(node.id(), k.first, k.second, SignalInfo{agent_id}); + emitter.update_edge_signal(node.id(), k.first, k.second, SignalInfo{agent_id}); } } } @@ -291,13 +296,13 @@ requires (std::is_same_v, DSR::Node>) if (!copy) { if (vec_node_attr.has_value()) { dsrpub_node_attrs.write(&vec_node_attr.value()); - emit update_node_signal(node.id(), node.type(), SignalInfo{agent_id}); + emitter.update_node_signal(node.id(), node.type(), SignalInfo{agent_id}); std::vector atts_names(vec_node_attr->size()); std::transform(std::make_move_iterator(vec_node_attr->begin()), std::make_move_iterator(vec_node_attr->end()), atts_names.begin(), [](auto &&x) { return x.attr_name(); }); - emit update_node_attr_signal(node.id(), atts_names, SignalInfo{agent_id}); + emitter.update_node_attr_signal(node.id(), atts_names, SignalInfo{agent_id}); } } @@ -383,16 +388,16 @@ bool DSRGraph::delete_node(const std::string &name) if (result) { if (!copy) { - emit del_node_signal(id.value(), SignalInfo{agent_id}); - if (node_signal) emit deleted_node_signal(*node_signal, SignalInfo{agent_id}); + emitter.del_node_signal(id.value(), SignalInfo{agent_id}); + if (node_signal) emitter.deleted_node_signal(*node_signal, SignalInfo{agent_id}); dsrpub_node.write(&deleted_node.value()); for (auto &a : delta_vec) { dsrpub_edge.write(&a); } for (auto &edge : deleted_edges) { - emit del_edge_signal(edge.from(), edge.to(), edge.type(), SignalInfo{ agent_id }); - emit deleted_edge_signal(edge); + emitter.del_edge_signal(edge.from(), edge.to(), edge.type(), SignalInfo{ agent_id }); + emitter.deleted_edge_signal(edge, SignalInfo{ agent_id }); } } return true; @@ -416,16 +421,16 @@ bool DSRGraph::delete_node(uint64_t id) if (result) { if (!copy) { - emit del_node_signal(id, SignalInfo{ agent_id }); - if (node_signal) emit deleted_node_signal(*node_signal, SignalInfo{agent_id}); + emitter.del_node_signal(id, SignalInfo{ agent_id }); + if (node_signal) emitter.deleted_node_signal(*node_signal, SignalInfo{agent_id}); dsrpub_node.write(&deleted_node.value()); for (auto &a : delta_vec) { dsrpub_edge.write(&a); } for (auto &edge : deleted_edges) { - emit del_edge_signal(edge.from(), edge.to(), edge.type(), SignalInfo{ agent_id }); - emit deleted_edge_signal(edge); + emitter.del_edge_signal(edge.from(), edge.to(), edge.type(), SignalInfo{ agent_id }); + emitter.deleted_edge_signal(edge, SignalInfo{ agent_id }); } } return true; @@ -632,7 +637,7 @@ requires (std::is_same_v, DSR::Edge>) } if (result) { if (!copy) { - emit update_edge_signal(attrs.from(), attrs.to(), attrs.type(), SignalInfo{ agent_id }); + emitter.update_edge_signal(attrs.from(), attrs.to(), attrs.type(), SignalInfo{ agent_id }); if (delta_edge.has_value()) { //Insert dsrpub_edge.write(&delta_edge.value()); @@ -645,7 +650,7 @@ requires (std::is_same_v, DSR::Edge>) atts_names.begin(), [](auto &&x) { return x.attr_name(); }); - emit update_edge_attr_signal(attrs.from(), attrs.to(), attrs.type(), atts_names, SignalInfo{ agent_id }); + emitter.update_edge_attr_signal(attrs.from(), attrs.to(), attrs.type(), atts_names, SignalInfo{ agent_id }); } } @@ -687,9 +692,9 @@ bool DSRGraph::delete_edge(uint64_t from, uint64_t to, const std::string &key) if (delta.has_value()) { if (!copy) { - emit del_edge_signal(from, to, key, SignalInfo{ agent_id }); + emitter.del_edge_signal(from, to, key, SignalInfo{ agent_id }); if (deleted_edge.has_value()) { - emit deleted_edge_signal(*deleted_edge); + emitter.deleted_edge_signal(*deleted_edge, SignalInfo{ agent_id }); } dsrpub_edge.write(&delta.value()); } @@ -718,9 +723,9 @@ bool DSRGraph::delete_edge(const std::string &from, const std::string &to, const if (delta.has_value()) { if (!copy) { - emit del_edge_signal(id_from.value(), id_to.value(), key, SignalInfo{ agent_id }); + emitter.del_edge_signal(id_from.value(), id_to.value(), key, SignalInfo{ agent_id }); if (deleted_edge.has_value()) { - emit deleted_edge_signal(*deleted_edge); + emitter.deleted_edge_signal(*deleted_edge, SignalInfo{ agent_id }); } dsrpub_edge.write(&delta.value()); } @@ -1069,34 +1074,36 @@ void DSRGraph::join_delta_node(IDL::MvregNode &&mvreg) if (joined) { if (signal) { - emit update_node_signal(id, nodes.at(id).read_reg().type(), SignalInfo{ mvreg.agent_id() }); + emitter.update_node_signal(id, nodes.at(id).read_reg().type(), SignalInfo{ mvreg.agent_id() }); for (const auto &[k, v] : nodes.at(id).read_reg().fano()) { //std::cout << "[JOIN NODE] add edge FROM: "<< id << ", " << k.first << ", " << k.second << std::endl; - emit update_edge_signal(id, k.first, k.second, SignalInfo{ mvreg.agent_id() }); + emitter.update_edge_signal(id, k.first, k.second, SignalInfo{ mvreg.agent_id() }); } for (const auto &[k, v]: map_new_to_edges) { //std::cout << "[JOIN NODE] add edge TO: "<< k << ", " << id << ", " << v << std::endl; - emit update_edge_signal(k, id, v, SignalInfo{ mvreg.agent_id() }); + emitter.update_edge_signal(k, id, v, SignalInfo{ mvreg.agent_id() }); } } else { - emit del_node_signal(id, SignalInfo{ mvreg.agent_id() }); + emitter.del_node_signal(id, SignalInfo{ mvreg.agent_id() }); if (maybe_deleted_node.has_value()) { - emit deleted_node_signal(Node(*maybe_deleted_node)); + Node tmp_node(*maybe_deleted_node); + emitter.deleted_node_signal(tmp_node, SignalInfo{ agent_id }); for (const auto &node: maybe_deleted_node->fano()) { //std::cout << "[JOIN NODE] delete edge FROM: "<< node.second.read_reg().from() << ", " << node.second.read_reg().to() << ", " << node.second.read_reg().type() << std::endl; - emit del_edge_signal(node.second.read_reg().from(), node.second.read_reg().to(), + emitter.del_edge_signal(node.second.read_reg().from(), node.second.read_reg().to(), node.second.read_reg().type(), SignalInfo{ mvreg.agent_id() }); - emit deleted_edge_signal(Edge(node.second.read_reg())); + Edge tmp_edge(node.second.read_reg()); + emitter.deleted_edge_signal(tmp_edge, SignalInfo{ agent_id }); } } //TODO: deleted_edge_signal. update_maps_node_delete was called before so the maps are probably wrong... for (const auto &[from, type] : cache_map_to_edges.value()) { //std::cout << "[JOIN NODE] delete edge TO: "<< from << ", " << id << ", " << type << std::endl; - emit del_edge_signal(from, id, type, SignalInfo{ mvreg.agent_id() }); - //emit deleted_edge_signal(Edge(node.second.read_reg())); TODO: fix this + emitter.del_edge_signal(from, id, type, SignalInfo{ mvreg.agent_id() }); + //emitter.deleted_edge_signal(Edge(node.second.read_reg())); TODO: fix this } } @@ -1238,12 +1245,12 @@ void DSRGraph::join_delta_edge(IDL::MvregEdge &&mvreg) if (joined) { if (signal) { //std::cout << "[JOIN EDGE] add edge: "<< from << ", " << to << ", " << type << std::endl; - emit update_edge_signal(from, to, type, SignalInfo{ mvreg.agent_id() }); + emitter.update_edge_signal(from, to, type, SignalInfo{ mvreg.agent_id() }); } else { //std::cout << "[JOIN EDGE] delete edge: "<< from << ", " << to << ", " << type << std::endl; - emit del_edge_signal(from, to, type, SignalInfo{ mvreg.agent_id() }); + emitter.del_edge_signal(from, to, type, SignalInfo{ mvreg.agent_id() }); if (deleted_edge.has_value()) { - emit deleted_edge_signal(*deleted_edge); + emitter.deleted_edge_signal(*deleted_edge, SignalInfo{ agent_id }); } } } @@ -1491,26 +1498,28 @@ void DSRGraph::join_full_graph(IDL::OrMap &&full_graph) if (signal) { //check what change is joined if (!nd.has_value() || nd->attrs() != nodes[id].read_reg().attrs()) { - emit update_node_signal(id, nodes[id].read_reg().type(), SignalInfo{ agent_id_ch }); + emitter.update_node_signal(id, nodes[id].read_reg().type(), SignalInfo{ agent_id_ch }); } else if (nd.value() != nodes[id].read_reg()) { auto iter = nodes[id].read_reg().fano(); for (const auto &[k, v] : nd->fano()) { if (!iter.contains(k)) { - emit del_edge_signal(id, k.first, k.second, SignalInfo{ agent_id_ch }); + emitter.del_edge_signal(id, k.first, k.second, SignalInfo{ agent_id_ch }); if (v.dk.ds.size() > 0) { - emit deleted_edge_signal(Edge(v.read_reg())); + Edge tmp_edge(v.read_reg()); + emitter.deleted_edge_signal(tmp_edge, SignalInfo{ agent_id }); } } } for (const auto &[k, v] : iter) { if (auto it = nd->fano().find(k); it == nd->fano().end() or it->second != v) - emit update_edge_signal(id, k.first, k.second, SignalInfo{ agent_id_ch }); + emitter.update_edge_signal(id, k.first, k.second, SignalInfo{ agent_id_ch }); } } } else { - emit del_node_signal(id, SignalInfo{ agent_id_ch }); + emitter.del_node_signal(id, SignalInfo{ agent_id_ch }); if (nd.has_value()) { - emit deleted_node_signal(Node(*nd)); + Node tmp_node(*nd); + emitter.deleted_node_signal(tmp_node, SignalInfo{ agent_id_ch }); } } @@ -1689,8 +1698,8 @@ void DSRGraph::edge_attrs_subscription_thread() } - emit update_edge_attr_signal(from, to, type, sig, SignalInfo{samples.vec().at(0).agent_id()}); - emit update_edge_signal(from, to, type, SignalInfo{samples.vec().at(0).agent_id()}); + emitter.update_edge_attr_signal(from, to, type, sig, SignalInfo{samples.vec().at(0).agent_id()}); + emitter.update_edge_signal(from, to, type, SignalInfo{samples.vec().at(0).agent_id()}); }); } @@ -1759,8 +1768,8 @@ void DSRGraph::node_attrs_subscription_thread() sig.emplace_back(std::move(opt_str.value())); } - emit update_node_attr_signal(id, sig, SignalInfo{samples.vec().at(0).agent_id()}); - emit update_node_signal(id, type, SignalInfo{samples.vec().at(0).agent_id()}); + emitter.update_node_attr_signal(id, sig, SignalInfo{samples.vec().at(0).agent_id()}); + emitter.update_node_signal(id, type, SignalInfo{samples.vec().at(0).agent_id()}); }); } } diff --git a/api/dsr_rt_api.cpp b/api/dsr_rt_api.cpp index 3998b8a..8fb43e8 100644 --- a/api/dsr_rt_api.cpp +++ b/api/dsr_rt_api.cpp @@ -341,12 +341,12 @@ void RT_API::insert_or_assign_edge_RT(Node &n, uint64_t to, const std::vectordsrpub_node_attrs.write(&node2.value()); - emit G->update_edge_attr_signal(n.id(), to, "RT" ,{"rt_rotation_euler_xyz", "rt_translation"}, SignalInfo{ G->agent_id }); - emit G->update_edge_signal(n.id(), to, "RT", SignalInfo{ G->agent_id }); + G->emitter.update_edge_attr_signal(n.id(), to, "RT" ,{"rt_rotation_euler_xyz", "rt_translation"}, SignalInfo{ G->agent_id }); + G->emitter.update_edge_signal(n.id(), to, "RT", SignalInfo{ G->agent_id }); if (!no_send) { - emit G->update_node_signal(to_n->id(), to_n->type(), SignalInfo{ G->agent_id }); - emit G->update_node_attr_signal(to_n->id(), {"level", "parent"}, SignalInfo{ G->agent_id }); + G->emitter.update_node_signal(to_n->id(), to_n->type(), SignalInfo{ G->agent_id }); + G->emitter.update_node_attr_signal(to_n->id(), {"level", "parent"}, SignalInfo{ G->agent_id }); } } } @@ -528,12 +528,12 @@ void RT_API::insert_or_assign_edge_RT(Node &n, uint64_t to, std::vector & if (!no_send and node2.has_value()) G->dsrpub_node_attrs.write(&node2.value()); - emit G->update_edge_attr_signal(n.id(), to, "RT",{"rt_rotation_euler_xyz", "rt_translation"}, SignalInfo{ G->agent_id }); - emit G->update_edge_signal(n.id(), to, "RT", SignalInfo{ G->agent_id }); + G->emitter.update_edge_attr_signal(n.id(), to, "RT",{"rt_rotation_euler_xyz", "rt_translation"}, SignalInfo{ G->agent_id }); + G->emitter.update_edge_signal(n.id(), to, "RT", SignalInfo{ G->agent_id }); if (!no_send) { - emit G->update_node_signal(to_n->id(), to_n->type(), SignalInfo{ G->agent_id }); - emit G->update_node_attr_signal(to_n->id(), {"level", "parent"}, SignalInfo{ G->agent_id }); + G->emitter.update_node_signal(to_n->id(), to_n->type(), SignalInfo{ G->agent_id }); + G->emitter.update_node_attr_signal(to_n->id(), {"level", "parent"}, SignalInfo{ G->agent_id }); } } } diff --git a/api/dsr_signal_emitter.cpp b/api/dsr_signal_emitter.cpp new file mode 100644 index 0000000..bc8e1ee --- /dev/null +++ b/api/dsr_signal_emitter.cpp @@ -0,0 +1,86 @@ +#include +#include +#include + +void DSR::QueuedSignalRunner::run_update_node_signal(std::uint64_t a, + const std::string &b, + SignalInfo c) { + + tp.spawn_task([=, this] { + for (auto fn : uns_fns) { + if (fn) + fn(a, b); + } + }); +} +void DSR::QueuedSignalRunner::run_update_node_attr_signal( + std::uint64_t a, const std::vector &b, SignalInfo c) { + tp.spawn_task([=, this] { + for (auto fn : unas_fns) { + if (fn) + fn(a, b); + } + }); +} +void DSR::QueuedSignalRunner::run_update_edge_signal(std::uint64_t a, + std::uint64_t b, + const std::string &c, + SignalInfo d) { + tp.spawn_task([=, this] { + for (auto fn : ues_fns) { + if (fn) + fn(a, b, c); + } + }); +} + +void DSR::QueuedSignalRunner::run_update_edge_attr_signal( + std::uint64_t a, std::uint64_t b, const std::string &c, + const std::vector &d, SignalInfo e) { + tp.spawn_task([=, this] { + for (auto fn : ueas_fns) { + if (fn) + fn(a, b, c, d); + } + }); +} + +void DSR::QueuedSignalRunner::run_del_edge_signal(std::uint64_t a, + std::uint64_t b, + const std::string &c, + SignalInfo d) { + tp.spawn_task([=, this] { + for (auto fn : des_fns) { + if (fn) + fn(a, b, c); + } + }); +} +void DSR::QueuedSignalRunner::run_del_node_signal(std::uint64_t a, + SignalInfo b) { + tp.spawn_task([=, this] { + for (auto fn : den_fns) { + if (fn) + fn(a); + } + }); +} + +void DSR::QueuedSignalRunner::run_deleted_node_signal(const Node &a, + SignalInfo b) { + tp.spawn_task([=, this] { + for (auto fn : dn_fns) { + if (fn) + fn(a); + } + }); +} +void DSR::QueuedSignalRunner::run_deleted_edge_signal(const Edge &a, + SignalInfo b) { + tp.spawn_task([=, this] { + for (auto fn : de_fns) { + if (fn) + fn(a); + } + }); +} \ No newline at end of file diff --git a/api/include/dsr/api/dsr_api.h b/api/include/dsr/api/dsr_api.h index 1766740..b48dd06 100644 --- a/api/include/dsr/api/dsr_api.h +++ b/api/include/dsr/api/dsr_api.h @@ -30,11 +30,13 @@ #include "dsr/api/dsr_utils.h" #include "dsr/api/dsr_signal_info.h" #include "dsr/api/dsr_graph_settings.h" +#include "dsr/api/dsr_signal_emitter.h" #include "dsr/core/types/type_checking/dsr_edge_type.h" #include "dsr/core/types/type_checking/dsr_node_type.h" #include "dsr/core/types/type_checking/dsr_attr_name.h" #include "dsr/core/utils.h" #include "dsr/core/id_generator.h" +#include "dsr_signal_emitter.h" #include "threadpool/threadpool.h" #include @@ -57,9 +59,9 @@ namespace DSR size_t size() const; DSRGraph(GraphSettings settings); - DSRGraph(std::string name, uint32_t id, const std::string& dsr_input_file = std::string(), bool all_same_host = true, int8_t domain_id=0); - [[deprecated("root parameter is not used anymore")]] DSRGraph(uint64_t root, std::string name, int id, const std::string& dsr_input_file = std::string(), bool all_same_host = true, int8_t domain_id=0) - : DSRGraph(name, id, dsr_input_file, all_same_host, domain_id) + DSRGraph(std::string name, uint32_t id, const std::string& dsr_input_file = std::string(), bool all_same_host = true, int8_t domain_id=0, SignalMode = SignalMode::QT); + [[deprecated("root parameter is not used anymore")]] DSRGraph(uint64_t root, std::string name, int id, const std::string& dsr_input_file = std::string(), bool all_same_host = true, int8_t domain_id=0, SignalMode mode = SignalMode::QT) + : DSRGraph(name, id, dsr_input_file, all_same_host, domain_id, mode) {} ~DSRGraph() override; @@ -539,6 +541,14 @@ namespace DSR return ret_vec; } + + ////////////////////////////////////////////////// + ///// QueuedSignals for python + ///////////////////////////////////////////////// + + QueuedSignalRunner* get_signal_runner() { + return emitter.runner.get(); + } private: DSRGraph(const DSRGraph& G); //Private constructor for DSRCopy @@ -557,6 +567,40 @@ namespace DSR bool same_host; id_generator generator; GraphSettings::LOGLEVEL log_level; + signals_fns emitter; + + ////////////////////////////////////////////////////////////////////////// + // Signal method + /////////////////////////////////////////////////////////////////////////// + + void set_qt_signals (){ + emitter = { + [this](std::uint64_t a, const std::string & b, SignalInfo c = {}) { emit update_node_signal(a, b, c); }, + [this](std::uint64_t a, const std::vector &b, SignalInfo c = {}) { emit update_node_attr_signal(a, b, c); }, + [this](std::uint64_t a, std::uint64_t b, const std::string & c, SignalInfo d = {}) { emit update_edge_signal(a, b, c, d); }, + [this](std::uint64_t a, std::uint64_t b, const std::string & c, const std::vector &d, SignalInfo e = {}) { emit update_edge_attr_signal(a, b, c, d, e); }, + [this](std::uint64_t a, std::uint64_t b, const std::string & c, SignalInfo d = {}) { emit del_edge_signal(a, b, c, d); }, + [this](std::uint64_t a, SignalInfo b = {}) { emit del_node_signal(a, b); }, + [this](const Node& a, SignalInfo b = {}) { emit deleted_node_signal(a, b); }, + [this](const Edge& a, SignalInfo b = {}) { emit deleted_edge_signal(a, b); }, + nullptr + }; + } + + void set_queued_signals (){ + auto runner = new QueuedSignalRunner(); + emitter = { + [runner](std::uint64_t a, const std::string & b, SignalInfo c = {}) { runner->run_update_node_signal(a, b, c); }, + [runner](std::uint64_t a, const std::vector &b, SignalInfo c = {}) { runner->run_update_node_attr_signal(a, b, c); }, + [runner](std::uint64_t a, std::uint64_t b, const std::string & c, SignalInfo d = {}) { runner->run_update_edge_signal(a, b, c, d); }, + [runner](std::uint64_t a, std::uint64_t b, const std::string & c, const std::vector &d, SignalInfo e = {}) { runner->run_update_edge_attr_signal(a, b, c, d, e); }, + [runner](std::uint64_t a, std::uint64_t b, const std::string & c, SignalInfo d = {}) { runner->run_del_edge_signal(a, b, c, d); }, + [runner](std::uint64_t a, SignalInfo b = {}) { runner->run_del_node_signal(a, b); }, + [runner](const Node& a, SignalInfo b = {}) { runner->run_deleted_node_signal(a, b); }, + [runner](const Edge& a, SignalInfo b = {}) { runner->run_deleted_edge_signal(a, b); }, + std::unique_ptr(runner) + }; + } ////////////////////////////////////////////////////////////////////////// // Cache maps diff --git a/api/include/dsr/api/dsr_graph_settings.h b/api/include/dsr/api/dsr_graph_settings.h index c9707d9..e26874d 100644 --- a/api/include/dsr/api/dsr_graph_settings.h +++ b/api/include/dsr/api/dsr_graph_settings.h @@ -2,6 +2,7 @@ #include #include +#include namespace DSR { @@ -17,6 +18,7 @@ struct GraphSettings { DEBUGL = 0, INFOL, WARNINGL, ERRORL } log_level {LOGLEVEL::INFOL}; int8_t domain_id = 0; + SignalMode signal_mode = QT; }; } \ No newline at end of file diff --git a/api/include/dsr/api/dsr_signal_emitter.h b/api/include/dsr/api/dsr_signal_emitter.h new file mode 100644 index 0000000..1392e6a --- /dev/null +++ b/api/include/dsr/api/dsr_signal_emitter.h @@ -0,0 +1,126 @@ +#pragma once + +#include +#include +#include +#include +#include + +namespace DSR { + +struct Node; +struct Edge; +struct SignalInfo; + +enum SignalMode { QT, Queue }; + +typedef std::function + update_node_signal_t; +typedef std::function &, + SignalInfo)> + update_node_attr_signal_t; +typedef std::function + update_edge_signal_t; +typedef std::function &, SignalInfo)> + update_edge_attr_signal_t; +typedef std::function + del_edge_signal_t; +typedef std::function del_node_signal_t; +typedef std::function deleted_node_signal_t; +typedef std::function deleted_edge_signal_t; + +typedef std::function + update_node_signal_noinfo_t; +typedef std::function &)> + update_node_attr_signal_noinfo_t; +typedef std::function + update_edge_signal_noinfo_t; +typedef std::function &)> + update_edge_attr_signal_noinfo_t; +typedef update_edge_signal_noinfo_t del_edge_signal_noinfo_t; +typedef std::function del_node_signal_noinfo_t; +typedef std::function deleted_node_signal_noinfo_t; +typedef std::function deleted_edge_signal_noinfo_t; + +typedef std::variant + signal_fn_ptr_t; + +struct QueuedSignalRunner { + ThreadPool tp; + std::vector uns_fns; + std::vector unas_fns; + std::vector ues_fns; + std::vector ueas_fns; + std::vector des_fns; + std::vector den_fns; + std::vector dn_fns; + std::vector de_fns; + explicit QueuedSignalRunner() : tp(2) + {} + + void connect(signal_fn_ptr_t fn, const std::string& type) { + std::visit( + [&, this](auto &&arg) { + using T = std::decay_t; + if constexpr (std::is_same_v) + unas_fns.push_back(arg); + else if constexpr (std::is_same_v) + uns_fns.push_back(arg); + else if constexpr (std::is_same_v) { + if (type == "UPDATE_EDGE") + ues_fns.push_back(arg); + else + des_fns.push_back(arg); + } + else if constexpr (std::is_same_v) + ueas_fns.push_back(arg); + else if constexpr (std::is_same_v) + den_fns.push_back(arg); + else if constexpr (std::is_same_v) + dn_fns.push_back(arg); + else if constexpr (std::is_same_v) + de_fns.push_back(arg); + else + std::cerr << "Python signals don't use SignalInfo parameter\n"; + }, + fn); + } + + void run_update_node_signal(std::uint64_t a, const std::string &b, + SignalInfo c); + void run_update_node_attr_signal(std::uint64_t a, + const std::vector &b, + SignalInfo c); + void run_update_edge_signal(std::uint64_t a, std::uint64_t b, + const std::string &c, SignalInfo d); + void run_update_edge_attr_signal(std::uint64_t a, std::uint64_t b, + const std::string &c, + const std::vector &d, + SignalInfo e); + void run_del_edge_signal(std::uint64_t a, std::uint64_t b, + const std::string &c, SignalInfo d); + void run_del_node_signal(std::uint64_t a, SignalInfo b); + void run_deleted_node_signal(const Node &a, SignalInfo b); + void run_deleted_edge_signal(const Edge &a, SignalInfo b); +}; + +struct signals_fns { + update_node_signal_t update_node_signal; + update_node_attr_signal_t update_node_attr_signal; + update_edge_signal_t update_edge_signal; + update_edge_attr_signal_t update_edge_attr_signal; + del_edge_signal_t del_edge_signal; + del_node_signal_t del_node_signal; + deleted_node_signal_t deleted_node_signal; + deleted_edge_signal_t deleted_edge_signal; + std::unique_ptr runner; +}; +} +; // namespace DSR \ No newline at end of file diff --git a/python-wrapper/python_api.cpp b/python-wrapper/python_api.cpp index 98b8e88..46d4adb 100644 --- a/python-wrapper/python_api.cpp +++ b/python-wrapper/python_api.cpp @@ -3,6 +3,8 @@ // #include "dsr/api/dsr_api.h" +#include +#include #include using namespace DSR; @@ -149,8 +151,6 @@ PYBIND11_MAKE_OPAQUE(std::map, Edge>) PYBIND11_MAKE_OPAQUE(std::map) -static QCoreApplication *app = nullptr; - PYBIND11_MODULE(pydsr, m) { py::bind_map, Edge>>(m, "MapStringEdge"); py::bind_dsr_map>(m, "MapStringAttribute"); @@ -159,30 +159,6 @@ PYBIND11_MODULE(pydsr, m) { uint64_t local_agent_id = -1; - /*std::thread signal_thread ([] { - int argc = 0; - char *argv[] = {nullptr}; - auto *QApp = new QCoreApplication(argc, argv); - app = QApp; - QApp->exec(); - }); - - signal_thread.detach();*/ - - // ugly busy wait until the QApp thread is running. - while (app != nullptr){} - - py::module::import("atexit").attr("register")( - py::cpp_function{ - [&]() -> void { - if (app) { - //QCoreApplication::quit(); - std::exit(0); - } - } - } - ); - //Disable messages from Qt. qInstallMessageHandler([](QtMsgType type, const QMessageLogContext &context, const QString &msg) { if (type == QtCriticalMsg || type == QtFatalMsg) { @@ -244,179 +220,18 @@ PYBIND11_MODULE(pydsr, m) { .export_values(); - sig.def("connect", [](DSRGraph *G, signal_type type, callback_types fn_callback) { - switch (type) { - case UPDATE_NODE: - try { - QObject::connect(G, &DSR::DSRGraph::update_node_signal, - std::get>(fn_callback)); + sig.def("connect", [&](DSRGraph *G, signal_type type, callback_types fn_callback) { - } catch (std::exception &e) { - std::cout << "Update Node Callback must be (int, str)\n " << std::endl; - throw e; - } - break; - case UPDATE_NODE_ATTR: - try { - QObject::connect(G, &DSR::DSRGraph::update_node_attr_signal, - std::get &)>>( - fn_callback)); - - } catch (std::exception &e) { - std::cout << "Update Node Attribute Callback must be (int, [str])\n " << std::endl; - throw e; - } - break; - case UPDATE_EDGE: - try { - QObject::connect(G, &DSR::DSRGraph::update_edge_signal, - std::get>( - fn_callback)); - } catch (std::exception &e) { - std::cout << "Update Edge Callback must be (int, int, str)\n " << std::endl; - throw e; - } - break; - case UPDATE_EDGE_ATTR: - try { - QObject::connect(G, &DSR::DSRGraph::update_edge_attr_signal, - std::get &)>>(fn_callback)); - } catch (std::exception &e) { - std::cout << "Update Edge Attribute Callback must be (int, int, str, [str])\n " << std::endl; - throw e; - } - break; - case DELETE_EDGE: - try { - QObject::connect(G, &DSR::DSRGraph::del_edge_signal, - std::get>( - fn_callback)); - } catch (std::exception &e) { - std::cout << "Delete Edge Callback must be (int, int, str)\n " << std::endl; - throw e; - } - break; - case DELETE_NODE: - try { - QObject::connect(G, &DSR::DSRGraph::del_node_signal, - std::get>(fn_callback)); - } catch (std::exception &e) { - std::cout << "Delete Node Callback must be (int)\n " << std::endl; - throw e; - } - break; - case DELETE_NODE_OBJ: - try { - QObject::connect(G, &DSR::DSRGraph::deleted_node_signal, - std::get>(fn_callback)); - } catch (std::exception &e) { - std::cout << "Delete Node Callback must be (pydsr.Node)\n " << std::endl; - throw e; - } - break; - case DELETE_EDGE_OBJ: - try { - QObject::connect(G, &DSR::DSRGraph::deleted_edge_signal, - std::get>(fn_callback)); - } catch (std::exception &e) { - std::cout << "Delete Node Callback must be (pydsr.Edge)\n " << std::endl; - throw e; - } - break; - default: - throw std::logic_error("Invalid signal type"); + auto runner = G->get_signal_runner(); + if (!runner) { + throw std::runtime_error("Signal runner doesn't exist"); } - }); - - sig.def("connect2", [&](DSRGraph *G, signal_type type, callback_types fn_callback) { - - switch (type) { - case UPDATE_NODE: - try { - QObject::connect(G, &DSR::DSRGraph::update_node_signal, app, - std::get>(fn_callback), - Qt::QueuedConnection); - - } catch (std::exception &e) { - std::cout << "Update Node Callback must be (int, str)\n " << std::endl; - throw e; - } - break; - case UPDATE_NODE_ATTR: - try { - QObject::connect(G, &DSR::DSRGraph::update_node_attr_signal, app, - std::get &)>>(fn_callback), - Qt::QueuedConnection); - - } catch (std::exception &e) { - std::cout << "Update Node Attribute Callback must be (int, [str])\n " << std::endl; - throw e; - } - break; - case UPDATE_EDGE: - try { - QObject::connect(G, &DSR::DSRGraph::update_edge_signal, app, - std::get>(fn_callback), - Qt::QueuedConnection); - } catch (std::exception &e) { - std::cout << "Update Edge Callback must be (int, int, str)\n " << std::endl; - throw e; - } - break; - case UPDATE_EDGE_ATTR: - try { - QObject::connect(G, &DSR::DSRGraph::update_edge_attr_signal, app, - std::get &)>>(fn_callback), - Qt::QueuedConnection); - } catch (std::exception &e) { - std::cout << "Update Edge Attribute Callback must be (int, int, str, [str])\n " << std::endl; - throw e; - } - break; - case DELETE_EDGE: - try { - QObject::connect(G, &DSR::DSRGraph::del_edge_signal, app, - std::get>(fn_callback), - Qt::QueuedConnection); - } catch (std::exception &e) { - std::cout << "Delete Edge Callback must be (int, int, str)\n " << std::endl; - throw e; - } - break; - case DELETE_NODE: - try { - QObject::connect(G, &DSR::DSRGraph::del_node_signal, app, - std::get>(fn_callback), - Qt::QueuedConnection); - } catch (std::exception &e) { - std::cout << "Delete Node Callback must be (int)\n " << std::endl; - throw e; - } - break; - case DELETE_NODE_OBJ: - try { - QObject::connect(G, &DSR::DSRGraph::deleted_node_signal, app, - std::get>(fn_callback), - Qt::QueuedConnection); - } catch (std::exception &e) { - std::cout << "Delete Node Callback must be (pydsr.Node)\n " << std::endl; - throw e; - } - break; - case DELETE_EDGE_OBJ: - try { - QObject::connect(G, &DSR::DSRGraph::deleted_edge_signal, app, - std::get>(fn_callback), - Qt::QueuedConnection); - } catch (std::exception &e) { - std::cout << "Delete Node Callback must be (pydsr.Edge)\n " << std::endl; - throw e; - } - break; - default: - throw std::logic_error("Invalid signal type"); + try { + runner->connect(fn_callback, std::to_string(type)); + } catch (std::exception &e) { + std::cout << "Update Node Callback must be (int, str)\n " << std::endl; + throw e; } }); @@ -737,7 +552,7 @@ PYBIND11_MODULE(pydsr, m) { const std::string &dsr_input_file = "", bool all_same_host = true, int8_t domain_id = 0) -> std::unique_ptr { local_agent_id = id; - auto g = std::make_unique(root, name, id, dsr_input_file, all_same_host, domain_id); + auto g = std::make_unique(root, name, id, dsr_input_file, all_same_host, domain_id, SignalMode::Queue); return g; }), "root"_a, "name"_a, "id"_a, "dsr_input_file"_a = "", "all_same_host"_a = true, "domain_id"_a=0, py::call_guard()) diff --git a/python-wrapper/test/TestSignals.py b/python-wrapper/test/TestSignals.py index 1f83557..4e3464a 100644 --- a/python-wrapper/test/TestSignals.py +++ b/python-wrapper/test/TestSignals.py @@ -96,3 +96,5 @@ def connect(self): result = g.delete_node(2) console.print(result) +import time +time.sleep(1) \ No newline at end of file diff --git a/python-wrapper/test/TestSignalsQueued.py b/python-wrapper/test/TestSignalsQueued.py index e2ece4c..f0800b1 100644 --- a/python-wrapper/test/TestSignalsQueued.py +++ b/python-wrapper/test/TestSignalsQueued.py @@ -57,14 +57,14 @@ def deleted_edge_obj(self, edge: Edge): console.print(f"DELETED EDGE OBJ: {edge}", style='green') def connect(self): - signals.connect2(g, signals.UPDATE_NODE_ATTR, self.update_node_att) - signals.connect2(g, signals.UPDATE_NODE, self.update_node) - signals.connect2(g, signals.DELETE_NODE, self.delete_node) - signals.connect2(g, signals.UPDATE_EDGE, self.update_edge) - signals.connect2(g, signals.UPDATE_EDGE_ATTR, self.update_edge_att) - signals.connect2(g, signals.DELETE_EDGE, self.delete_edge) - signals.connect2(g, signals.DELETE_NODE_OBJ, self.deleted_node_obj) - signals.connect2(g, signals.DELETE_EDGE_OBJ, self.deleted_edge_obj) + signals.connect(g, signals.UPDATE_NODE_ATTR, self.update_node_att) + signals.connect(g, signals.UPDATE_NODE, self.update_node) + signals.connect(g, signals.DELETE_NODE, self.delete_node) + signals.connect(g, signals.UPDATE_EDGE, self.update_edge) + signals.connect(g, signals.UPDATE_EDGE_ATTR, self.update_edge_att) + signals.connect(g, signals.DELETE_EDGE, self.delete_edge) + signals.connect(g, signals.DELETE_NODE_OBJ, self.deleted_node_obj) + signals.connect(g, signals.DELETE_EDGE_OBJ, self.deleted_edge_obj) app = QtWidgets.QApplication(sys.argv) @@ -101,14 +101,14 @@ def deleted_edge_obj(edge: Edge): agent.connect() -signals.connect2(g, signals.UPDATE_NODE_ATTR, update_node_att) -signals.connect2(g, signals.UPDATE_NODE, update_node) -signals.connect2(g, signals.DELETE_NODE, delete_node) -signals.connect2(g, signals.UPDATE_EDGE, update_edge) -signals.connect2(g, signals.UPDATE_EDGE_ATTR, update_edge_att) -signals.connect2(g, signals.DELETE_EDGE, delete_edge) -signals.connect2(g, signals.DELETE_NODE_OBJ, deleted_node_obj) -signals.connect2(g, signals.DELETE_EDGE_OBJ, deleted_edge_obj) +signals.connect(g, signals.UPDATE_NODE_ATTR, update_node_att) +signals.connect(g, signals.UPDATE_NODE, update_node) +signals.connect(g, signals.DELETE_NODE, delete_node) +signals.connect(g, signals.UPDATE_EDGE, update_edge) +signals.connect(g, signals.UPDATE_EDGE_ATTR, update_edge_att) +signals.connect(g, signals.DELETE_EDGE, delete_edge) +signals.connect(g, signals.DELETE_NODE_OBJ, deleted_node_obj) +signals.connect(g, signals.DELETE_EDGE_OBJ, deleted_edge_obj) From 67485e0114361135d2dc552a886b6c622862f27a Mon Sep 17 00:00:00 2001 From: juancarlosgg Date: Mon, 16 Feb 2026 12:15:35 +0100 Subject: [PATCH 2/5] fix: tree viewer null pointer dereference also possible id truncating issue --- gui/dsr_gui.cpp | 2 +- gui/viewers/graph_viewer/graph_viewer.cpp | 7 ++++++- gui/viewers/tree_viewer/tree_viewer.cpp | 2 +- 3 files changed, 8 insertions(+), 3 deletions(-) diff --git a/gui/dsr_gui.cpp b/gui/dsr_gui.cpp index eeab83d..6931900 100644 --- a/gui/dsr_gui.cpp +++ b/gui/dsr_gui.cpp @@ -352,7 +352,7 @@ void DSRViewer::initialize_views(int options, view central) tree_widget, &TreeViewer::node_check_state_changed, graph_widget, - [=](int value, int id, const std::string &type, QTreeWidgetItem *) + [=](int value, uint64_t id, const std::string &type, QTreeWidgetItem *) { graph_widget->hide_show_node_SLOT(id, value == 2); }); diff --git a/gui/viewers/graph_viewer/graph_viewer.cpp b/gui/viewers/graph_viewer/graph_viewer.cpp index bd57d5e..94e4d6b 100644 --- a/gui/viewers/graph_viewer/graph_viewer.cpp +++ b/gui/viewers/graph_viewer/graph_viewer.cpp @@ -317,6 +317,8 @@ void GraphViewer::del_node_SLOT(uint64_t id) while (gmap.count(id) > 0) { auto item = gmap.at(id); scene.removeItem(item); + for (auto &[type, ids] : type_id_map) + ids.erase(id); delete item; gmap.erase(id); auto id_str = std::to_string(id); @@ -330,7 +332,10 @@ void GraphViewer::del_node_SLOT(uint64_t id) void GraphViewer::hide_show_node_SLOT(uint64_t id, bool visible) { - auto item = gmap[id]; + auto it = gmap.find(id); + if (it == gmap.end() || it->second == nullptr) + return; + auto item = it->second; item->setVisible(visible); for (const auto &gedge: item->edgeList) { diff --git a/gui/viewers/tree_viewer/tree_viewer.cpp b/gui/viewers/tree_viewer/tree_viewer.cpp index 92bdfc0..4a7ce72 100644 --- a/gui/viewers/tree_viewer/tree_viewer.cpp +++ b/gui/viewers/tree_viewer/tree_viewer.cpp @@ -155,7 +155,7 @@ void TreeViewer::category_change_SLOT(int value, QTreeWidgetItem* parent) void TreeViewer::node_change_SLOT(int value, uint64_t id, const std::string &type, QTreeWidgetItem* parent) { QCheckBox* sender = qobject_cast(this->sender()); - if(sender) + if(sender and G->get_node(id).has_value()) { qDebug()<<"Emitting signal for "<(this->itemWidget(parent,0))->text(); emit node_check_state_changed(value, id, type, parent); From fb64f379dd73a55d53f9f584bb2a1435dc2ec3ee Mon Sep 17 00:00:00 2001 From: juancarlosgg Date: Mon, 16 Feb 2026 13:08:13 +0100 Subject: [PATCH 3/5] debug: add sync messages --- api/dsr_api.cpp | 91 ++++++++++++++++------- api/dsr_signal_emitter.cpp | 12 ++- gui/viewers/graph_viewer/graph_viewer.cpp | 8 +- gui/viewers/tree_viewer/tree_viewer.cpp | 6 +- 4 files changed, 82 insertions(+), 35 deletions(-) diff --git a/api/dsr_api.cpp b/api/dsr_api.cpp index d7f837f..c206c10 100644 --- a/api/dsr_api.cpp +++ b/api/dsr_api.cpp @@ -207,6 +207,8 @@ std::optional DSRGraph::insert_node(No &&node) if (delta.has_value()) { dsrpub_node.write(&delta.value()); + if (log_level <= GraphSettings::LOGLEVEL::DEBUGL) + qDebug() << "[INSERT_NODE] emitting update_node_signal" << node.id() << QString::fromStdString(node.type()); emitter.update_node_signal(node.id(), node.type(), SignalInfo{agent_id}); for (const auto &[k, v]: node.fano()) { @@ -296,6 +298,8 @@ requires (std::is_same_v, DSR::Node>) if (!copy) { if (vec_node_attr.has_value()) { dsrpub_node_attrs.write(&vec_node_attr.value()); + if (log_level <= GraphSettings::LOGLEVEL::DEBUGL) + qDebug() << "[UPDATE_NODE] emitting update_node_signal" << node.id() << QString::fromStdString(node.type()); emitter.update_node_signal(node.id(), node.type(), SignalInfo{agent_id}); std::vector atts_names(vec_node_attr->size()); std::transform(std::make_move_iterator(vec_node_attr->begin()), @@ -388,6 +392,8 @@ bool DSRGraph::delete_node(const std::string &name) if (result) { if (!copy) { + if (log_level <= GraphSettings::LOGLEVEL::DEBUGL) + qDebug() << "[DELETE_NODE] emitting del_node_signal" << id.value(); emitter.del_node_signal(id.value(), SignalInfo{agent_id}); if (node_signal) emitter.deleted_node_signal(*node_signal, SignalInfo{agent_id}); dsrpub_node.write(&deleted_node.value()); @@ -421,6 +427,8 @@ bool DSRGraph::delete_node(uint64_t id) if (result) { if (!copy) { + if (log_level <= GraphSettings::LOGLEVEL::DEBUGL) + qDebug() << "[DELETE_NODE] emitting del_node_signal" << id; emitter.del_node_signal(id, SignalInfo{ agent_id }); if (node_signal) emitter.deleted_node_signal(*node_signal, SignalInfo{agent_id}); dsrpub_node.write(&deleted_node.value()); @@ -637,6 +645,8 @@ requires (std::is_same_v, DSR::Edge>) } if (result) { if (!copy) { + if (log_level <= GraphSettings::LOGLEVEL::DEBUGL) + qDebug() << "[INSERT_OR_ASSIGN_EDGE] emitting update_edge_signal" << attrs.from() << attrs.to() << QString::fromStdString(attrs.type()); emitter.update_edge_signal(attrs.from(), attrs.to(), attrs.type(), SignalInfo{ agent_id }); if (delta_edge.has_value()) { //Insert @@ -692,6 +702,8 @@ bool DSRGraph::delete_edge(uint64_t from, uint64_t to, const std::string &key) if (delta.has_value()) { if (!copy) { + if (log_level <= GraphSettings::LOGLEVEL::DEBUGL) + qDebug() << "[DELETE_EDGE] emitting del_edge_signal" << from << to << QString::fromStdString(key); emitter.del_edge_signal(from, to, key, SignalInfo{ agent_id }); if (deleted_edge.has_value()) { emitter.deleted_edge_signal(*deleted_edge, SignalInfo{ agent_id }); @@ -979,7 +991,7 @@ void DSRGraph::join_delta_node(IDL::MvregNode &&mvreg) { node_handle = unprocessed_delta_edge_from.extract(id); } - + std::erase_if(unprocessed_delta_edge_to, [&](auto &it){ return std::get<0>(it.second) == id;}); std::erase_if(unprocessed_delta_edge_att, @@ -994,7 +1006,8 @@ void DSRGraph::join_delta_node(IDL::MvregNode &&mvreg) while (!node_handle_node_att.empty()) { auto &[att_name, delta, timestamp_node_att] = node_handle_node_att.mapped(); - //std::cout << "node_att " << id<< ", " <fano()) { - //std::cout << "[JOIN NODE] delete edge FROM: "<< node.second.read_reg().from() << ", " << node.second.read_reg().to() << ", " << node.second.read_reg().type() << std::endl; + if (log_level <= GraphSettings::LOGLEVEL::DEBUGL) + qDebug() << "[JOIN_NODE] delete edge FROM:" << node.second.read_reg().from() << node.second.read_reg().to() << QString::fromStdString(node.second.read_reg().type()); emitter.del_edge_signal(node.second.read_reg().from(), node.second.read_reg().to(), node.second.read_reg().type(), SignalInfo{ mvreg.agent_id() }); Edge tmp_edge(node.second.read_reg()); @@ -1101,7 +1121,8 @@ void DSRGraph::join_delta_node(IDL::MvregNode &&mvreg) //TODO: deleted_edge_signal. update_maps_node_delete was called before so the maps are probably wrong... for (const auto &[from, type] : cache_map_to_edges.value()) { - //std::cout << "[JOIN NODE] delete edge TO: "<< from << ", " << id << ", " << type << std::endl; + if (log_level <= GraphSettings::LOGLEVEL::DEBUGL) + qDebug() << "[JOIN_NODE] delete edge TO:" << from << id << QString::fromStdString(type); emitter.del_edge_signal(from, id, type, SignalInfo{ mvreg.agent_id() }); //emitter.deleted_edge_signal(Edge(node.second.read_reg())); TODO: fix this } @@ -1199,7 +1220,8 @@ void DSRGraph::join_delta_edge(IDL::MvregEdge &&mvreg) for (auto [begin, end] = unprocessed_delta_edge_from.equal_range(from); begin != end; ++begin) { //There should not be many elements in this iteration if (std::get<0>(begin->second) == to && std::get<1>(begin->second) == type){ std::get<2>(begin->second).join(::mvreg(crdt_delta)); - //std::cout << "JOIN_DELTA_EDGE ID(" << from<< ", "<< to <<", "<< type << ") JOIN UNPROCESSED" << std::endl; + if (log_level <= GraphSettings::LOGLEVEL::DEBUGL) + qDebug() << "[JOIN_EDGE] JOIN UNPROCESSED (from)" << from << to << QString::fromStdString(type); find = true; break; } } @@ -1207,28 +1229,31 @@ void DSRGraph::join_delta_edge(IDL::MvregEdge &&mvreg) for (auto [begin, end] = unprocessed_delta_edge_from.equal_range(to); begin != end; ++begin) { //There should not be many elements in this iteration if (std::get<0>(begin->second) == from && std::get<1>(begin->second) == type){ std::get<2>(begin->second).join(std::move(crdt_delta)); - //std::cout << "JOIN_DELTA_EDGE ID(" << from<< ", "<< to <<", "<< type << ") JOIN UNPROCESSED" << std::endl; + if (log_level <= GraphSettings::LOGLEVEL::DEBUGL) + qDebug() << "[JOIN_EDGE] JOIN UNPROCESSED (to)" << from << to << QString::fromStdString(type); find = true; break; } } if (!find) { - //std::cout << "JOIN_DELTA_EDGE ID(" << from<< ", "<< to <<", "<< type << ") INSERT UNPROCESSED " ; - if (!cfrom) { - //std::cout << "No existe from (" << from << ") unprocessed_delta_edge_from" << std::endl; - unprocessed_delta_edge_from.emplace(from, std::tuple{to, type, crdt_delta, timestamp}); + if (!cfrom) { + if (log_level <= GraphSettings::LOGLEVEL::DEBUGL) + qDebug() << "[JOIN_EDGE] INSERT UNPROCESSED, no from" << from << "unprocessed_delta_edge_from"; + unprocessed_delta_edge_from.emplace(from, std::tuple{to, type, crdt_delta, timestamp}); } - if (cfrom && !cto) + if (cfrom && !cto) { - //std::cout << "No existe to (" << to << ") unprocessed_delta_edge_to" << std::endl; + if (log_level <= GraphSettings::LOGLEVEL::DEBUGL) + qDebug() << "[JOIN_EDGE] INSERT UNPROCESSED, no to" << to << "unprocessed_delta_edge_to"; unprocessed_delta_edge_to.emplace(to, std::tuple{from, type, std::move(crdt_delta), timestamp}); } } else { - // We should sync the deleted_nodes set too... - std::cout <<"TODO: Unhandled, we should sync the deleted_nodes set" << std::endl; + if (log_level <= GraphSettings::LOGLEVEL::WARNINGL) + qDebug() << "WARNING: Unhandled case, should sync deleted_nodes set"; } } else { - //std::cout << "THE EDGE IS PART OF A DELETED NODE, CLEAN FROM UNPROCESSED" << std::endl; + if (log_level <= GraphSettings::LOGLEVEL::DEBUGL) + qDebug() << "[JOIN_EDGE] edge is part of deleted node, cleaning unprocessed"; auto node_deleted = [&](uint64_t id){ unprocessed_delta_edge_from.erase(id); unprocessed_delta_node_att.erase(id); @@ -1244,10 +1269,12 @@ void DSRGraph::join_delta_edge(IDL::MvregEdge &&mvreg) if (joined) { if (signal) { - //std::cout << "[JOIN EDGE] add edge: "<< from << ", " << to << ", " << type << std::endl; + if (log_level <= GraphSettings::LOGLEVEL::DEBUGL) + qDebug() << "[JOIN_EDGE] add edge:" << from << to << QString::fromStdString(type); emitter.update_edge_signal(from, to, type, SignalInfo{ mvreg.agent_id() }); } else { - //std::cout << "[JOIN EDGE] delete edge: "<< from << ", " << to << ", " << type << std::endl; + if (log_level <= GraphSettings::LOGLEVEL::DEBUGL) + qDebug() << "[JOIN_EDGE] delete edge:" << from << to << QString::fromStdString(type); emitter.del_edge_signal(from, to, type, SignalInfo{ mvreg.agent_id() }); if (deleted_edge.has_value()) { emitter.deleted_edge_signal(*deleted_edge, SignalInfo{ agent_id }); @@ -1299,6 +1326,8 @@ std::optional DSRGraph::join_delta_node_attr(IDL::MvregNodeAttr &&m } } if (!find) { + if (log_level <= GraphSettings::LOGLEVEL::DEBUGL) + qDebug() << "[JOIN_NODE_ATTR] storing to unprocessed cache, node" << id << QString::fromStdString(att_name); unprocessed_delta_node_att.emplace(id, std::tuple{att_name, std::move(crdt_delta), timestamp}); } } else { @@ -1364,6 +1393,8 @@ std::optional DSRGraph::join_delta_edge_attr(IDL::MvregEdgeAttr &&m } } if (!find) { + if (log_level <= GraphSettings::LOGLEVEL::DEBUGL) + qDebug() << "[JOIN_EDGE_ATTR] storing to unprocessed cache, edge" << from << to << QString::fromStdString(type) << QString::fromStdString(att_name); unprocessed_delta_edge_att.emplace(std::tuple{from, to, type}, std::tuple{att_name, std::move(crdt_delta), timestamp}); } } else { //If the node is deleted @@ -1422,7 +1453,8 @@ void DSRGraph::join_full_graph(IDL::OrMap &&full_graph) while (!node_handle_edge.empty()) { auto &[to, type, delta, timestamp_edge] = node_handle_edge.mapped(); auto att_key = std::tuple{id, to, type}; - //std::cout << "Procesando delta edge almacenado (unprocessed_delta_edge_from) " << id<< ", " < #include #include +#include void DSR::QueuedSignalRunner::run_update_node_signal(std::uint64_t a, const std::string &b, SignalInfo c) { - + qDebug() << "[SIGNAL] update_node id:" << a << "type:" << QString::fromStdString(b); tp.spawn_task([=, this] { for (auto fn : uns_fns) { if (fn) @@ -15,6 +16,7 @@ void DSR::QueuedSignalRunner::run_update_node_signal(std::uint64_t a, } void DSR::QueuedSignalRunner::run_update_node_attr_signal( std::uint64_t a, const std::vector &b, SignalInfo c) { + qDebug() << "[SIGNAL] update_node_attr id:" << a; tp.spawn_task([=, this] { for (auto fn : unas_fns) { if (fn) @@ -26,6 +28,7 @@ void DSR::QueuedSignalRunner::run_update_edge_signal(std::uint64_t a, std::uint64_t b, const std::string &c, SignalInfo d) { + qDebug() << "[SIGNAL] update_edge from:" << a << "to:" << b << "type:" << QString::fromStdString(c); tp.spawn_task([=, this] { for (auto fn : ues_fns) { if (fn) @@ -37,6 +40,7 @@ void DSR::QueuedSignalRunner::run_update_edge_signal(std::uint64_t a, void DSR::QueuedSignalRunner::run_update_edge_attr_signal( std::uint64_t a, std::uint64_t b, const std::string &c, const std::vector &d, SignalInfo e) { + qDebug() << "[SIGNAL] update_edge_attr from:" << a << "to:" << b << "type:" << QString::fromStdString(c); tp.spawn_task([=, this] { for (auto fn : ueas_fns) { if (fn) @@ -49,6 +53,7 @@ void DSR::QueuedSignalRunner::run_del_edge_signal(std::uint64_t a, std::uint64_t b, const std::string &c, SignalInfo d) { + qDebug() << "[SIGNAL] del_edge from:" << a << "to:" << b << "type:" << QString::fromStdString(c); tp.spawn_task([=, this] { for (auto fn : des_fns) { if (fn) @@ -58,6 +63,7 @@ void DSR::QueuedSignalRunner::run_del_edge_signal(std::uint64_t a, } void DSR::QueuedSignalRunner::run_del_node_signal(std::uint64_t a, SignalInfo b) { + qDebug() << "[SIGNAL] del_node id:" << a; tp.spawn_task([=, this] { for (auto fn : den_fns) { if (fn) @@ -68,6 +74,7 @@ void DSR::QueuedSignalRunner::run_del_node_signal(std::uint64_t a, void DSR::QueuedSignalRunner::run_deleted_node_signal(const Node &a, SignalInfo b) { + qDebug() << "[SIGNAL] deleted_node name:" << QString::fromStdString(a.name()) << "id:" << a.id(); tp.spawn_task([=, this] { for (auto fn : dn_fns) { if (fn) @@ -77,10 +84,11 @@ void DSR::QueuedSignalRunner::run_deleted_node_signal(const Node &a, } void DSR::QueuedSignalRunner::run_deleted_edge_signal(const Edge &a, SignalInfo b) { + qDebug() << "[SIGNAL] deleted_edge from:" << a.from() << "to:" << a.to() << "type:" << QString::fromStdString(a.type()); tp.spawn_task([=, this] { for (auto fn : de_fns) { if (fn) fn(a); } }); -} \ No newline at end of file +} diff --git a/gui/viewers/graph_viewer/graph_viewer.cpp b/gui/viewers/graph_viewer/graph_viewer.cpp index 94e4d6b..b31b476 100644 --- a/gui/viewers/graph_viewer/graph_viewer.cpp +++ b/gui/viewers/graph_viewer/graph_viewer.cpp @@ -280,7 +280,7 @@ GraphEdge* GraphViewer::new_visual_edge(std::uint64_t from, std::uint64_t to, co void GraphViewer::del_edge_SLOT(std::uint64_t from, std::uint64_t to, const std::string &edge_tag) { - qDebug()<<__FUNCTION__<<":"<<__LINE__; + qDebug()<<__FUNCTION__<<"from:"< key = std::make_tuple(from, to, edge_tag); @@ -311,7 +311,7 @@ void GraphViewer::del_edge_SLOT(std::uint64_t from, std::uint64_t to, const std: // remove node from scene void GraphViewer::del_node_SLOT(uint64_t id) { - qDebug()<<__FUNCTION__<<":"<<__LINE__; + qDebug()<<__FUNCTION__<<"node id:"< 0) { @@ -333,8 +333,10 @@ void GraphViewer::del_node_SLOT(uint64_t id) void GraphViewer::hide_show_node_SLOT(uint64_t id, bool visible) { auto it = gmap.find(id); - if (it == gmap.end() || it->second == nullptr) + if (it == gmap.end() || it->second == nullptr) { + qDebug() << __FUNCTION__ << "skipping missing node" << id; return; + } auto item = it->second; item->setVisible(visible); for (const auto &gedge: item->edgeList) diff --git a/gui/viewers/tree_viewer/tree_viewer.cpp b/gui/viewers/tree_viewer/tree_viewer.cpp index 4a7ce72..222b3fb 100644 --- a/gui/viewers/tree_viewer/tree_viewer.cpp +++ b/gui/viewers/tree_viewer/tree_viewer.cpp @@ -71,6 +71,7 @@ void TreeViewer::add_or_assign_node_SLOT(uint64_t id, const std::string &type, auto node = G->get_node(id); if (tree_map.count(id)==0) { + qDebug() << __FUNCTION__ << "new node id:" << id << "type:" << QString::fromStdString(type); QTreeWidgetItem* symbol_widget = nullptr; if (types_map.count(type)) { symbol_widget = types_map[type]; @@ -131,8 +132,7 @@ void TreeViewer::del_edge_SLOT(const std::uint64_t from, const std::uint64_t to, void TreeViewer::del_node_SLOT(uint64_t id) { - - qDebug()<<__FUNCTION__<<":"<<__LINE__; + qDebug()<<__FUNCTION__<<"node id:"< 0) { auto item = tree_map[id]; this->invisibleRootItem()->removeChild(item); @@ -159,6 +159,8 @@ void TreeViewer::node_change_SLOT(int value, uint64_t id, const std::string &typ { qDebug()<<"Emitting signal for "<(this->itemWidget(parent,0))->text(); emit node_check_state_changed(value, id, type, parent); + } else { + qDebug() << __FUNCTION__ << "skipping, node no longer exists in G, id:" << id; } } From d2d1aeac76ffa0c8b7eb2b5e9dafde88cb3e987a Mon Sep 17 00:00:00 2001 From: juancarlosgg Date: Mon, 16 Feb 2026 13:34:05 +0100 Subject: [PATCH 4/5] fix: send unordered edge signal after sync --- api/dsr_api.cpp | 10 +++++++++- 1 file changed, 9 insertions(+), 1 deletion(-) diff --git a/api/dsr_api.cpp b/api/dsr_api.cpp index c206c10..351bf01 100644 --- a/api/dsr_api.cpp +++ b/api/dsr_api.cpp @@ -1000,6 +1000,7 @@ void DSRGraph::join_delta_node(IDL::MvregNode &&mvreg) std::unordered_set,hash_tuple> map_new_to_edges = {}; + std::unordered_set,hash_tuple> map_new_from_edges = {}; auto consume_unprocessed_deltas = [&](){ decltype(unprocessed_delta_node_att)::node_type node_handle_node_att = unprocessed_delta_node_att.extract(id); @@ -1047,7 +1048,7 @@ void DSRGraph::join_delta_node(IDL::MvregNode &&mvreg) if (log_level <= GraphSettings::LOGLEVEL::DEBUGL) qDebug() << "[JOIN_NODE] unprocessed_delta_edge_to" << from << id << QString::fromStdString(type) << (timestamp < timestamp_edge); if (timestamp < timestamp_edge) { - process_delta_edge(from, id, type, std::move(delta)); + if (process_delta_edge(from, id, type, std::move(delta))) map_new_from_edges.emplace(from, id, type); } if (nodes.contains(from) and nodes.at(from).read_reg().fano().contains({id, type})) { decltype(unprocessed_delta_edge_att)::node_type node_handle_edge_att = unprocessed_delta_edge_att.extract(att_key); @@ -1104,6 +1105,13 @@ void DSRGraph::join_delta_node(IDL::MvregNode &&mvreg) qDebug() << "[JOIN_NODE] add edge TO:" << k << id << QString::fromStdString(v); emitter.update_edge_signal(k, id, v, SignalInfo{ mvreg.agent_id() }); } + + for (const auto &[from, to, type]: map_new_from_edges) + { + if (log_level <= GraphSettings::LOGLEVEL::DEBUGL) + qDebug() << "[JOIN_NODE] add edge FROM (unprocessed_to):" << from << to << QString::fromStdString(type); + emitter.update_edge_signal(from, to, type, SignalInfo{ mvreg.agent_id() }); + } } else { emitter.del_node_signal(id, SignalInfo{ mvreg.agent_id() }); if (maybe_deleted_node.has_value()) { From 3163fb7c92f7cdc247936ec1cb101a6a0813daa7 Mon Sep 17 00:00:00 2001 From: juancarlosgg Date: Mon, 16 Feb 2026 14:53:01 +0100 Subject: [PATCH 5/5] debug: improve logging --- api/dsr_api.cpp | 111 +++++++++-------------- api/dsr_signal_emitter.cpp | 18 ++-- api/include/dsr/api/dsr_api.h | 18 ++-- api/include/dsr/api/dsr_logging.h | 69 ++++++++++++++ api/include/dsr/api/dsr_signal_emitter.h | 3 +- 5 files changed, 134 insertions(+), 85 deletions(-) create mode 100644 api/include/dsr/api/dsr_logging.h diff --git a/api/dsr_api.cpp b/api/dsr_api.cpp index 351bf01..6b3759a 100644 --- a/api/dsr_api.cpp +++ b/api/dsr_api.cpp @@ -17,6 +17,7 @@ #include #include #include +#include #include @@ -207,8 +208,7 @@ std::optional DSRGraph::insert_node(No &&node) if (delta.has_value()) { dsrpub_node.write(&delta.value()); - if (log_level <= GraphSettings::LOGLEVEL::DEBUGL) - qDebug() << "[INSERT_NODE] emitting update_node_signal" << node.id() << QString::fromStdString(node.type()); + DSR_LOG_DEBUG("[INSERT_NODE] emitting update_node_signal", node.id(), node.type()); emitter.update_node_signal(node.id(), node.type(), SignalInfo{agent_id}); for (const auto &[k, v]: node.fano()) { @@ -298,8 +298,7 @@ requires (std::is_same_v, DSR::Node>) if (!copy) { if (vec_node_attr.has_value()) { dsrpub_node_attrs.write(&vec_node_attr.value()); - if (log_level <= GraphSettings::LOGLEVEL::DEBUGL) - qDebug() << "[UPDATE_NODE] emitting update_node_signal" << node.id() << QString::fromStdString(node.type()); + DSR_LOG_DEBUG("[UPDATE_NODE] emitting update_node_signal", node.id(), node.type()); emitter.update_node_signal(node.id(), node.type(), SignalInfo{agent_id}); std::vector atts_names(vec_node_attr->size()); std::transform(std::make_move_iterator(vec_node_attr->begin()), @@ -392,8 +391,7 @@ bool DSRGraph::delete_node(const std::string &name) if (result) { if (!copy) { - if (log_level <= GraphSettings::LOGLEVEL::DEBUGL) - qDebug() << "[DELETE_NODE] emitting del_node_signal" << id.value(); + DSR_LOG_DEBUG("[DELETE_NODE] emitting del_node_signal", id.value()); emitter.del_node_signal(id.value(), SignalInfo{agent_id}); if (node_signal) emitter.deleted_node_signal(*node_signal, SignalInfo{agent_id}); dsrpub_node.write(&deleted_node.value()); @@ -427,8 +425,7 @@ bool DSRGraph::delete_node(uint64_t id) if (result) { if (!copy) { - if (log_level <= GraphSettings::LOGLEVEL::DEBUGL) - qDebug() << "[DELETE_NODE] emitting del_node_signal" << id; + DSR_LOG_DEBUG("[DELETE_NODE] emitting del_node_signal", id); emitter.del_node_signal(id, SignalInfo{ agent_id }); if (node_signal) emitter.deleted_node_signal(*node_signal, SignalInfo{agent_id}); dsrpub_node.write(&deleted_node.value()); @@ -645,8 +642,7 @@ requires (std::is_same_v, DSR::Edge>) } if (result) { if (!copy) { - if (log_level <= GraphSettings::LOGLEVEL::DEBUGL) - qDebug() << "[INSERT_OR_ASSIGN_EDGE] emitting update_edge_signal" << attrs.from() << attrs.to() << QString::fromStdString(attrs.type()); + DSR_LOG_DEBUG("[INSERT_OR_ASSIGN_EDGE] emitting update_edge_signal", attrs.from(), attrs.to(), attrs.type()); emitter.update_edge_signal(attrs.from(), attrs.to(), attrs.type(), SignalInfo{ agent_id }); if (delta_edge.has_value()) { //Insert @@ -702,8 +698,7 @@ bool DSRGraph::delete_edge(uint64_t from, uint64_t to, const std::string &key) if (delta.has_value()) { if (!copy) { - if (log_level <= GraphSettings::LOGLEVEL::DEBUGL) - qDebug() << "[DELETE_EDGE] emitting del_edge_signal" << from << to << QString::fromStdString(key); + DSR_LOG_DEBUG("[DELETE_EDGE] emitting del_edge_signal", from, to, key); emitter.del_edge_signal(from, to, key, SignalInfo{ agent_id }); if (deleted_edge.has_value()) { emitter.deleted_edge_signal(*deleted_edge, SignalInfo{ agent_id }); @@ -980,6 +975,7 @@ void DSRGraph::join_delta_node(IDL::MvregNode &&mvreg) auto id = mvreg.id(); uint64_t timestamp = mvreg.timestamp(); + DSR_LOG_DEBUG("[JOIN_NODE] id:", id, "timestamp:", timestamp, "agent:", mvreg.agent_id()); auto crdt_delta = IDLNode_to_CRDT(std::move(mvreg)); bool d_empty = crdt_delta.empty(); @@ -1007,8 +1003,7 @@ void DSRGraph::join_delta_node(IDL::MvregNode &&mvreg) while (!node_handle_node_att.empty()) { auto &[att_name, delta, timestamp_node_att] = node_handle_node_att.mapped(); - if (log_level <= GraphSettings::LOGLEVEL::DEBUGL) - qDebug() << "[JOIN_NODE] node_att" << id << QString::fromStdString(att_name) << (timestamp < timestamp_node_att); + DSR_LOG_DEBUG("[JOIN_NODE] node_att", id, att_name, (timestamp < timestamp_node_att)); if (timestamp < timestamp_node_att) { process_delta_node_attr(id, att_name,std::move(delta)); } @@ -1019,8 +1014,7 @@ void DSRGraph::join_delta_node(IDL::MvregNode &&mvreg) while (!node_handle_edge.empty()) { auto &[to, type, delta, timestamp_edge] = node_handle_edge.mapped(); auto att_key = std::tuple{id, to, type}; - if (log_level <= GraphSettings::LOGLEVEL::DEBUGL) - qDebug() << "[JOIN_NODE] unprocessed_delta_edge_from" << id << to << QString::fromStdString(type) << (timestamp < timestamp_edge); + DSR_LOG_DEBUG("[JOIN_NODE] unprocessed_delta_edge_from", id, to, type, (timestamp < timestamp_edge)); if (timestamp < timestamp_edge) { if (process_delta_edge(id, to, type, std::move(delta))) map_new_to_edges.emplace(to, type); } @@ -1028,8 +1022,7 @@ void DSRGraph::join_delta_node(IDL::MvregNode &&mvreg) decltype(unprocessed_delta_edge_att)::node_type node_handle_edge_att = unprocessed_delta_edge_att.extract(att_key); while (!node_handle_edge_att.empty()) { auto &[att_name, delta, timestamp_edge_att] = node_handle_edge_att.mapped(); - if (log_level <= GraphSettings::LOGLEVEL::DEBUGL) - qDebug() << "[JOIN_NODE] edge_att" << id << to << QString::fromStdString(type) << QString::fromStdString(att_name) << (timestamp < timestamp_edge); + DSR_LOG_DEBUG("[JOIN_NODE] edge_att", id, to, type, att_name, (timestamp < timestamp_edge)); if (timestamp < timestamp_edge_att) { process_delta_edge_attr(id, to, type, att_name, std::move(delta)); } @@ -1045,8 +1038,7 @@ void DSRGraph::join_delta_node(IDL::MvregNode &&mvreg) while (!node_handle_edge.empty()) { auto &[from, type, delta, timestamp_edge] = node_handle_edge.mapped(); auto att_key = std::tuple{from, id, type}; - if (log_level <= GraphSettings::LOGLEVEL::DEBUGL) - qDebug() << "[JOIN_NODE] unprocessed_delta_edge_to" << from << id << QString::fromStdString(type) << (timestamp < timestamp_edge); + DSR_LOG_DEBUG("[JOIN_NODE] unprocessed_delta_edge_to", from, id, type, (timestamp < timestamp_edge)); if (timestamp < timestamp_edge) { if (process_delta_edge(from, id, type, std::move(delta))) map_new_from_edges.emplace(from, id, type); } @@ -1054,8 +1046,7 @@ void DSRGraph::join_delta_node(IDL::MvregNode &&mvreg) decltype(unprocessed_delta_edge_att)::node_type node_handle_edge_att = unprocessed_delta_edge_att.extract(att_key); while (!node_handle_edge_att.empty()) { auto &[att_name, delta, timestamp_edge_att] = node_handle_edge_att.mapped(); - if (log_level <= GraphSettings::LOGLEVEL::DEBUGL) - qDebug() << "[JOIN_NODE] edge_att" << from << id << QString::fromStdString(type) << QString::fromStdString(att_name) << (timestamp < timestamp_edge); + DSR_LOG_DEBUG("[JOIN_NODE] edge_att", from, id, type, att_name, (timestamp < timestamp_edge)); if (timestamp < timestamp_edge_att) { process_delta_edge_attr(from, id, type, att_name, std::move(delta)); } @@ -1092,34 +1083,32 @@ void DSRGraph::join_delta_node(IDL::MvregNode &&mvreg) if (joined) { if (signal) { + DSR_LOG_DEBUG("[JOIN_NODE] node inserted/updated:", id, nodes.at(id).read_reg().type()); emitter.update_node_signal(id, nodes.at(id).read_reg().type(), SignalInfo{ mvreg.agent_id() }); for (const auto &[k, v] : nodes.at(id).read_reg().fano()) { - if (log_level <= GraphSettings::LOGLEVEL::DEBUGL) - qDebug() << "[JOIN_NODE] add edge FROM:" << id << k.first << QString::fromStdString(k.second); + DSR_LOG_DEBUG("[JOIN_NODE] add edge FROM:", id, k.first, k.second); emitter.update_edge_signal(id, k.first, k.second, SignalInfo{ mvreg.agent_id() }); } for (const auto &[k, v]: map_new_to_edges) { - if (log_level <= GraphSettings::LOGLEVEL::DEBUGL) - qDebug() << "[JOIN_NODE] add edge TO:" << k << id << QString::fromStdString(v); + DSR_LOG_DEBUG("[JOIN_NODE] add edge TO:", k, id, v); emitter.update_edge_signal(k, id, v, SignalInfo{ mvreg.agent_id() }); } for (const auto &[from, to, type]: map_new_from_edges) { - if (log_level <= GraphSettings::LOGLEVEL::DEBUGL) - qDebug() << "[JOIN_NODE] add edge FROM (unprocessed_to):" << from << to << QString::fromStdString(type); + DSR_LOG_DEBUG("[JOIN_NODE] add edge FROM (unprocessed_to):", from, to, type); emitter.update_edge_signal(from, to, type, SignalInfo{ mvreg.agent_id() }); } } else { + DSR_LOG_DEBUG("[JOIN_NODE] node deleted:", id); emitter.del_node_signal(id, SignalInfo{ mvreg.agent_id() }); if (maybe_deleted_node.has_value()) { Node tmp_node(*maybe_deleted_node); emitter.deleted_node_signal(tmp_node, SignalInfo{ agent_id }); for (const auto &node: maybe_deleted_node->fano()) { - if (log_level <= GraphSettings::LOGLEVEL::DEBUGL) - qDebug() << "[JOIN_NODE] delete edge FROM:" << node.second.read_reg().from() << node.second.read_reg().to() << QString::fromStdString(node.second.read_reg().type()); + DSR_LOG_DEBUG("[JOIN_NODE] delete edge FROM:", node.second.read_reg().from(), node.second.read_reg().to(), node.second.read_reg().type()); emitter.del_edge_signal(node.second.read_reg().from(), node.second.read_reg().to(), node.second.read_reg().type(), SignalInfo{ mvreg.agent_id() }); Edge tmp_edge(node.second.read_reg()); @@ -1129,8 +1118,7 @@ void DSRGraph::join_delta_node(IDL::MvregNode &&mvreg) //TODO: deleted_edge_signal. update_maps_node_delete was called before so the maps are probably wrong... for (const auto &[from, type] : cache_map_to_edges.value()) { - if (log_level <= GraphSettings::LOGLEVEL::DEBUGL) - qDebug() << "[JOIN_NODE] delete edge TO:" << from << id << QString::fromStdString(type); + DSR_LOG_DEBUG("[JOIN_NODE] delete edge TO:", from, id, type); emitter.del_edge_signal(from, id, type, SignalInfo{ mvreg.agent_id() }); //emitter.deleted_edge_signal(Edge(node.second.read_reg())); TODO: fix this } @@ -1168,6 +1156,7 @@ void DSRGraph::join_delta_edge(IDL::MvregEdge &&mvreg) auto from = mvreg.id(); auto to = mvreg.to(); std::string type = mvreg.type(); + DSR_LOG_DEBUG("[JOIN_EDGE] from:", from, "to:", to, "type:", type, "agent:", mvreg.agent_id()); uint64_t timestamp = mvreg.timestamp(); @@ -1228,8 +1217,7 @@ void DSRGraph::join_delta_edge(IDL::MvregEdge &&mvreg) for (auto [begin, end] = unprocessed_delta_edge_from.equal_range(from); begin != end; ++begin) { //There should not be many elements in this iteration if (std::get<0>(begin->second) == to && std::get<1>(begin->second) == type){ std::get<2>(begin->second).join(::mvreg(crdt_delta)); - if (log_level <= GraphSettings::LOGLEVEL::DEBUGL) - qDebug() << "[JOIN_EDGE] JOIN UNPROCESSED (from)" << from << to << QString::fromStdString(type); + DSR_LOG_DEBUG("[JOIN_EDGE] JOIN UNPROCESSED (from)", from, to, type); find = true; break; } } @@ -1237,31 +1225,26 @@ void DSRGraph::join_delta_edge(IDL::MvregEdge &&mvreg) for (auto [begin, end] = unprocessed_delta_edge_from.equal_range(to); begin != end; ++begin) { //There should not be many elements in this iteration if (std::get<0>(begin->second) == from && std::get<1>(begin->second) == type){ std::get<2>(begin->second).join(std::move(crdt_delta)); - if (log_level <= GraphSettings::LOGLEVEL::DEBUGL) - qDebug() << "[JOIN_EDGE] JOIN UNPROCESSED (to)" << from << to << QString::fromStdString(type); + DSR_LOG_DEBUG("[JOIN_EDGE] JOIN UNPROCESSED (to)", from, to, type); find = true; break; } } if (!find) { if (!cfrom) { - if (log_level <= GraphSettings::LOGLEVEL::DEBUGL) - qDebug() << "[JOIN_EDGE] INSERT UNPROCESSED, no from" << from << "unprocessed_delta_edge_from"; + DSR_LOG_DEBUG("[JOIN_EDGE] INSERT UNPROCESSED, no from", from, "unprocessed_delta_edge_from"); unprocessed_delta_edge_from.emplace(from, std::tuple{to, type, crdt_delta, timestamp}); } if (cfrom && !cto) { - if (log_level <= GraphSettings::LOGLEVEL::DEBUGL) - qDebug() << "[JOIN_EDGE] INSERT UNPROCESSED, no to" << to << "unprocessed_delta_edge_to"; + DSR_LOG_DEBUG("[JOIN_EDGE] INSERT UNPROCESSED, no to", to, "unprocessed_delta_edge_to"); unprocessed_delta_edge_to.emplace(to, std::tuple{from, type, std::move(crdt_delta), timestamp}); } } else { - if (log_level <= GraphSettings::LOGLEVEL::WARNINGL) - qDebug() << "WARNING: Unhandled case, should sync deleted_nodes set"; + DSR_LOG_WARNING("Unhandled case, should sync deleted_nodes set"); } } else { - if (log_level <= GraphSettings::LOGLEVEL::DEBUGL) - qDebug() << "[JOIN_EDGE] edge is part of deleted node, cleaning unprocessed"; + DSR_LOG_DEBUG("[JOIN_EDGE] edge is part of deleted node, cleaning unprocessed"); auto node_deleted = [&](uint64_t id){ unprocessed_delta_edge_from.erase(id); unprocessed_delta_node_att.erase(id); @@ -1277,12 +1260,10 @@ void DSRGraph::join_delta_edge(IDL::MvregEdge &&mvreg) if (joined) { if (signal) { - if (log_level <= GraphSettings::LOGLEVEL::DEBUGL) - qDebug() << "[JOIN_EDGE] add edge:" << from << to << QString::fromStdString(type); + DSR_LOG_DEBUG("[JOIN_EDGE] add edge:", from, to, type); emitter.update_edge_signal(from, to, type, SignalInfo{ mvreg.agent_id() }); } else { - if (log_level <= GraphSettings::LOGLEVEL::DEBUGL) - qDebug() << "[JOIN_EDGE] delete edge:" << from << to << QString::fromStdString(type); + DSR_LOG_DEBUG("[JOIN_EDGE] delete edge:", from, to, type); emitter.del_edge_signal(from, to, type, SignalInfo{ mvreg.agent_id() }); if (deleted_edge.has_value()) { emitter.deleted_edge_signal(*deleted_edge, SignalInfo{ agent_id }); @@ -1316,6 +1297,7 @@ std::optional DSRGraph::join_delta_node_attr(IDL::MvregNodeAttr &&m auto id = mvreg.id(); std::string att_name = mvreg.attr_name(); uint64_t timestamp = mvreg.timestamp(); + DSR_LOG_DEBUG("[JOIN_NODE_ATTR] node:", id, "attr:", att_name); { auto crdt_delta = IDLNodeAttr_to_CRDT(std::move(mvreg)); std::unique_lock lock(_mutex); @@ -1334,8 +1316,7 @@ std::optional DSRGraph::join_delta_node_attr(IDL::MvregNodeAttr &&m } } if (!find) { - if (log_level <= GraphSettings::LOGLEVEL::DEBUGL) - qDebug() << "[JOIN_NODE_ATTR] storing to unprocessed cache, node" << id << QString::fromStdString(att_name); + DSR_LOG_DEBUG("[JOIN_NODE_ATTR] storing to unprocessed cache, node", id, att_name); unprocessed_delta_node_att.emplace(id, std::tuple{att_name, std::move(crdt_delta), timestamp}); } } else { @@ -1381,6 +1362,7 @@ std::optional DSRGraph::join_delta_edge_attr(IDL::MvregEdgeAttr &&m std::string type = mvreg.type(); std::string att_name = mvreg.attr_name(); uint64_t timestamp = mvreg.timestamp(); + DSR_LOG_DEBUG("[JOIN_EDGE_ATTR] edge:", from, to, type, "attr:", att_name); { auto crdt_delta = IDLEdgeAttr_to_CRDT(std::move(mvreg)); @@ -1401,8 +1383,7 @@ std::optional DSRGraph::join_delta_edge_attr(IDL::MvregEdgeAttr &&m } } if (!find) { - if (log_level <= GraphSettings::LOGLEVEL::DEBUGL) - qDebug() << "[JOIN_EDGE_ATTR] storing to unprocessed cache, edge" << from << to << QString::fromStdString(type) << QString::fromStdString(att_name); + DSR_LOG_DEBUG("[JOIN_EDGE_ATTR] storing to unprocessed cache, edge", from, to, type, att_name); unprocessed_delta_edge_att.emplace(std::tuple{from, to, type}, std::tuple{att_name, std::move(crdt_delta), timestamp}); } } else { //If the node is deleted @@ -1461,8 +1442,7 @@ void DSRGraph::join_full_graph(IDL::OrMap &&full_graph) while (!node_handle_edge.empty()) { auto &[to, type, delta, timestamp_edge] = node_handle_edge.mapped(); auto att_key = std::tuple{id, to, type}; - if (log_level <= GraphSettings::LOGLEVEL::DEBUGL) - qDebug() << "[JOIN_FULL] unprocessed_delta_edge_from" << id << to << QString::fromStdString(type) << (timestamp < timestamp_edge); + DSR_LOG_DEBUG("[JOIN_FULL] unprocessed_delta_edge_from", id, to, type, (timestamp < timestamp_edge)); if (timestamp < timestamp_edge) { //TODO: este se debería hacer después de insertar el nodo? process_delta_edge(id, to, type, std::move(delta)); @@ -1471,8 +1451,7 @@ void DSRGraph::join_full_graph(IDL::OrMap &&full_graph) decltype(unprocessed_delta_edge_att)::node_type node_handle_edge_att = unprocessed_delta_edge_att.extract(att_key); while (!node_handle_edge_att.empty()) { auto &[att_name, delta, timestamp_edge_att] = node_handle_edge_att.mapped(); - if (log_level <= GraphSettings::LOGLEVEL::DEBUGL) - qDebug() << "[JOIN_FULL] edge_att" << id << to << QString::fromStdString(type) << QString::fromStdString(att_name) << (timestamp < timestamp_edge); + DSR_LOG_DEBUG("[JOIN_FULL] edge_att", id, to, type, att_name, (timestamp < timestamp_edge)); if (timestamp < timestamp_edge_att) { process_delta_edge_attr(id, to, type, att_name, std::move(delta)); } @@ -1490,8 +1469,7 @@ void DSRGraph::join_full_graph(IDL::OrMap &&full_graph) while (!node_handle_edge.empty()) { auto &[from, type, delta, timestamp_edge] = node_handle_edge.mapped(); auto att_key = std::tuple{from, id, type}; - if (log_level <= GraphSettings::LOGLEVEL::DEBUGL) - qDebug() << "[JOIN_FULL] unprocessed_delta_edge_to" << from << id << QString::fromStdString(type) << (timestamp < timestamp_edge); + DSR_LOG_DEBUG("[JOIN_FULL] unprocessed_delta_edge_to", from, id, type, (timestamp < timestamp_edge)); if (timestamp < timestamp_edge) { process_delta_edge(from, id, type, std::move(delta)); } @@ -1499,8 +1477,7 @@ void DSRGraph::join_full_graph(IDL::OrMap &&full_graph) decltype(unprocessed_delta_edge_att)::node_type node_handle_edge_att = unprocessed_delta_edge_att.extract(att_key); while (!node_handle_edge_att.empty()) { auto &[att_name, delta, timestamp_edge_att] = node_handle_edge_att.mapped(); - if (log_level <= GraphSettings::LOGLEVEL::DEBUGL) - qDebug() << "[JOIN_FULL] edge_att" << from << id << QString::fromStdString(type) << QString::fromStdString(att_name) << (timestamp < timestamp_edge); + DSR_LOG_DEBUG("[JOIN_FULL] edge_att", from, id, type, att_name, (timestamp < timestamp_edge)); if (timestamp < timestamp_edge_att) { process_delta_edge_attr(from, id, type, att_name, std::move(delta)); } @@ -1602,7 +1579,7 @@ std::map DSRGraph::Map() return m; } -void print_sample_info(const eprosima::fastdds::dds::SampleInfo& info) { +void print_sample_info(DSR::GraphSettings::LOGLEVEL log_level, const eprosima::fastdds::dds::SampleInfo& info) { double milliseconds_send = static_cast(info.source_timestamp.seconds()) * 1000.0; milliseconds_send += static_cast(info.source_timestamp.nanosec()) / 1000000.0; @@ -1638,7 +1615,7 @@ void DSRGraph::node_subscription_thread() eprosima::fastdds::dds::SampleInfo m_info; IDL::MvregNode sample; if (reader->take_next_sample(&sample, &m_info) == 0) { - if (showReceived == GraphSettings::LOGLEVEL::DEBUGL) print_sample_info(m_info); + if (showReceived == GraphSettings::LOGLEVEL::DEBUGL) print_sample_info(showReceived, m_info); if (m_info.valid_data) { if (sample.agent_id() != agent_id) { if (showReceived == GraphSettings::LOGLEVEL::DEBUGL) { @@ -1672,7 +1649,7 @@ void DSRGraph::edge_subscription_thread() eprosima::fastdds::dds::SampleInfo m_info; IDL::MvregEdge sample; if (reader->take_next_sample(&sample, &m_info) == 0) { - if (showReceived == GraphSettings::LOGLEVEL::DEBUGL) print_sample_info(m_info); + if (showReceived == GraphSettings::LOGLEVEL::DEBUGL) print_sample_info(showReceived, m_info); if (m_info.valid_data) { if (sample.agent_id() != agent_id) { if (showReceived == GraphSettings::LOGLEVEL::DEBUGL) { @@ -1707,7 +1684,7 @@ void DSRGraph::edge_attrs_subscription_thread() eprosima::fastdds::dds::SampleInfo m_info; IDL::MvregEdgeAttrVec samples; if (reader->take_next_sample(&samples, &m_info) == 0) { - if (showReceived == GraphSettings::LOGLEVEL::DEBUGL) print_sample_info(m_info); + if (showReceived == GraphSettings::LOGLEVEL::DEBUGL) print_sample_info(showReceived, m_info); if (m_info.valid_data) { if (showReceived == GraphSettings::LOGLEVEL::DEBUGL) { qDebug() << name << " Received:" << samples.vec().size() << " edge attr from: " @@ -1775,7 +1752,7 @@ void DSRGraph::node_attrs_subscription_thread() eprosima::fastdds::dds::SampleInfo m_info; IDL::MvregNodeAttrVec samples; if (reader->take_next_sample(&samples, &m_info) == 0) { - if (showReceived == GraphSettings::LOGLEVEL::DEBUGL) print_sample_info(m_info); + if (showReceived == GraphSettings::LOGLEVEL::DEBUGL) print_sample_info(showReceived, m_info); if (m_info.valid_data) { if (showReceived == GraphSettings::LOGLEVEL::DEBUGL) { qDebug() << name << " Received:" << samples.vec().size() << " node attrs from: " @@ -1840,7 +1817,7 @@ void DSRGraph::fullgraph_server_thread() eprosima::fastdds::dds::SampleInfo m_info; IDL::GraphRequest sample; if (reader->take_next_sample(&sample, &m_info) == 0) { - if (log_level == GraphSettings::LOGLEVEL::DEBUGL) print_sample_info(m_info); + if (log_level == GraphSettings::LOGLEVEL::DEBUGL) print_sample_info(log_level, m_info); if (m_info.valid_data) { { std::unique_lock lck(participant_set_mutex); @@ -1896,7 +1873,7 @@ std::pair DSRGraph::fullgraph_request_thread() eprosima::fastdds::dds::SampleInfo m_info; IDL::OrMap sample; if (reader->take_next_sample(&sample, &m_info) == 0) { - if (log_level == GraphSettings::LOGLEVEL::DEBUGL) print_sample_info(m_info); + if (log_level == GraphSettings::LOGLEVEL::DEBUGL) print_sample_info(log_level, m_info); if (m_info.valid_data) { if (sample.id() != graph->get_agent_id()) { if (sample.id() != static_cast(-1)) { diff --git a/api/dsr_signal_emitter.cpp b/api/dsr_signal_emitter.cpp index 02bad8e..3ab6dfc 100644 --- a/api/dsr_signal_emitter.cpp +++ b/api/dsr_signal_emitter.cpp @@ -1,12 +1,12 @@ #include #include #include -#include +#include void DSR::QueuedSignalRunner::run_update_node_signal(std::uint64_t a, const std::string &b, SignalInfo c) { - qDebug() << "[SIGNAL] update_node id:" << a << "type:" << QString::fromStdString(b); + DSR_LOG_DEBUG_L(static_cast(log_level), "[SIGNAL] update_node id:", a, "type:", b); tp.spawn_task([=, this] { for (auto fn : uns_fns) { if (fn) @@ -16,7 +16,7 @@ void DSR::QueuedSignalRunner::run_update_node_signal(std::uint64_t a, } void DSR::QueuedSignalRunner::run_update_node_attr_signal( std::uint64_t a, const std::vector &b, SignalInfo c) { - qDebug() << "[SIGNAL] update_node_attr id:" << a; + DSR_LOG_DEBUG_L(static_cast(log_level), "[SIGNAL] update_node_attr id:", a); tp.spawn_task([=, this] { for (auto fn : unas_fns) { if (fn) @@ -28,7 +28,7 @@ void DSR::QueuedSignalRunner::run_update_edge_signal(std::uint64_t a, std::uint64_t b, const std::string &c, SignalInfo d) { - qDebug() << "[SIGNAL] update_edge from:" << a << "to:" << b << "type:" << QString::fromStdString(c); + DSR_LOG_DEBUG_L(static_cast(log_level), "[SIGNAL] update_edge from:", a, "to:", b, "type:", c); tp.spawn_task([=, this] { for (auto fn : ues_fns) { if (fn) @@ -40,7 +40,7 @@ void DSR::QueuedSignalRunner::run_update_edge_signal(std::uint64_t a, void DSR::QueuedSignalRunner::run_update_edge_attr_signal( std::uint64_t a, std::uint64_t b, const std::string &c, const std::vector &d, SignalInfo e) { - qDebug() << "[SIGNAL] update_edge_attr from:" << a << "to:" << b << "type:" << QString::fromStdString(c); + DSR_LOG_DEBUG_L(static_cast(log_level), "[SIGNAL] update_edge_attr from:", a, "to:", b, "type:", c); tp.spawn_task([=, this] { for (auto fn : ueas_fns) { if (fn) @@ -53,7 +53,7 @@ void DSR::QueuedSignalRunner::run_del_edge_signal(std::uint64_t a, std::uint64_t b, const std::string &c, SignalInfo d) { - qDebug() << "[SIGNAL] del_edge from:" << a << "to:" << b << "type:" << QString::fromStdString(c); + DSR_LOG_DEBUG_L(static_cast(log_level), "[SIGNAL] del_edge from:", a, "to:", b, "type:", c); tp.spawn_task([=, this] { for (auto fn : des_fns) { if (fn) @@ -63,7 +63,7 @@ void DSR::QueuedSignalRunner::run_del_edge_signal(std::uint64_t a, } void DSR::QueuedSignalRunner::run_del_node_signal(std::uint64_t a, SignalInfo b) { - qDebug() << "[SIGNAL] del_node id:" << a; + DSR_LOG_DEBUG_L(static_cast(log_level), "[SIGNAL] del_node id:", a); tp.spawn_task([=, this] { for (auto fn : den_fns) { if (fn) @@ -74,7 +74,7 @@ void DSR::QueuedSignalRunner::run_del_node_signal(std::uint64_t a, void DSR::QueuedSignalRunner::run_deleted_node_signal(const Node &a, SignalInfo b) { - qDebug() << "[SIGNAL] deleted_node name:" << QString::fromStdString(a.name()) << "id:" << a.id(); + DSR_LOG_DEBUG_L(static_cast(log_level), "[SIGNAL] deleted_node name:", a.name(), "id:", a.id()); tp.spawn_task([=, this] { for (auto fn : dn_fns) { if (fn) @@ -84,7 +84,7 @@ void DSR::QueuedSignalRunner::run_deleted_node_signal(const Node &a, } void DSR::QueuedSignalRunner::run_deleted_edge_signal(const Edge &a, SignalInfo b) { - qDebug() << "[SIGNAL] deleted_edge from:" << a.from() << "to:" << a.to() << "type:" << QString::fromStdString(a.type()); + DSR_LOG_DEBUG_L(static_cast(log_level), "[SIGNAL] deleted_edge from:", a.from(), "to:", a.to(), "type:", a.type()); tp.spawn_task([=, this] { for (auto fn : de_fns) { if (fn) diff --git a/api/include/dsr/api/dsr_api.h b/api/include/dsr/api/dsr_api.h index b48dd06..682b55b 100644 --- a/api/include/dsr/api/dsr_api.h +++ b/api/include/dsr/api/dsr_api.h @@ -30,6 +30,7 @@ #include "dsr/api/dsr_utils.h" #include "dsr/api/dsr_signal_info.h" #include "dsr/api/dsr_graph_settings.h" +#include "dsr/api/dsr_logging.h" #include "dsr/api/dsr_signal_emitter.h" #include "dsr/core/types/type_checking/dsr_edge_type.h" #include "dsr/core/types/type_checking/dsr_node_type.h" @@ -575,20 +576,21 @@ namespace DSR void set_qt_signals (){ emitter = { - [this](std::uint64_t a, const std::string & b, SignalInfo c = {}) { emit update_node_signal(a, b, c); }, - [this](std::uint64_t a, const std::vector &b, SignalInfo c = {}) { emit update_node_attr_signal(a, b, c); }, - [this](std::uint64_t a, std::uint64_t b, const std::string & c, SignalInfo d = {}) { emit update_edge_signal(a, b, c, d); }, - [this](std::uint64_t a, std::uint64_t b, const std::string & c, const std::vector &d, SignalInfo e = {}) { emit update_edge_attr_signal(a, b, c, d, e); }, - [this](std::uint64_t a, std::uint64_t b, const std::string & c, SignalInfo d = {}) { emit del_edge_signal(a, b, c, d); }, - [this](std::uint64_t a, SignalInfo b = {}) { emit del_node_signal(a, b); }, - [this](const Node& a, SignalInfo b = {}) { emit deleted_node_signal(a, b); }, - [this](const Edge& a, SignalInfo b = {}) { emit deleted_edge_signal(a, b); }, + [this](std::uint64_t a, const std::string & b, SignalInfo c = {}) { DSR_LOG_DEBUG("[SIGNAL] update_node id:", a, "type:", b); emit update_node_signal(a, b, c); }, + [this](std::uint64_t a, const std::vector &b, SignalInfo c = {}) { DSR_LOG_DEBUG("[SIGNAL] update_node_attr id:", a); emit update_node_attr_signal(a, b, c); }, + [this](std::uint64_t a, std::uint64_t b, const std::string & c, SignalInfo d = {}) { DSR_LOG_DEBUG("[SIGNAL] update_edge from:", a, "to:", b, "type:", c); emit update_edge_signal(a, b, c, d); }, + [this](std::uint64_t a, std::uint64_t b, const std::string & c, const std::vector &d, SignalInfo e = {}) { DSR_LOG_DEBUG("[SIGNAL] update_edge_attr from:", a, "to:", b, "type:", c); emit update_edge_attr_signal(a, b, c, d, e); }, + [this](std::uint64_t a, std::uint64_t b, const std::string & c, SignalInfo d = {}) { DSR_LOG_DEBUG("[SIGNAL] del_edge from:", a, "to:", b, "type:", c); emit del_edge_signal(a, b, c, d); }, + [this](std::uint64_t a, SignalInfo b = {}) { DSR_LOG_DEBUG("[SIGNAL] del_node id:", a); emit del_node_signal(a, b); }, + [this](const Node& a, SignalInfo b = {}) { DSR_LOG_DEBUG("[SIGNAL] deleted_node name:", a.name(), "id:", a.id()); emit deleted_node_signal(a, b); }, + [this](const Edge& a, SignalInfo b = {}) { DSR_LOG_DEBUG("[SIGNAL] deleted_edge from:", a.from(), "to:", a.to(), "type:", a.type()); emit deleted_edge_signal(a, b); }, nullptr }; } void set_queued_signals (){ auto runner = new QueuedSignalRunner(); + runner->log_level = static_cast(log_level); emitter = { [runner](std::uint64_t a, const std::string & b, SignalInfo c = {}) { runner->run_update_node_signal(a, b, c); }, [runner](std::uint64_t a, const std::vector &b, SignalInfo c = {}) { runner->run_update_node_attr_signal(a, b, c); }, diff --git a/api/include/dsr/api/dsr_logging.h b/api/include/dsr/api/dsr_logging.h new file mode 100644 index 0000000..cfbb9a4 --- /dev/null +++ b/api/include/dsr/api/dsr_logging.h @@ -0,0 +1,69 @@ +#pragma once + +#include +#include +#include +#include + +// ANSI color codes +#define DSR_COLOR_RESET "\033[0m" +#define DSR_COLOR_DEBUG "\033[36m" // Cyan +#define DSR_COLOR_INFO "\033[32m" // Green +#define DSR_COLOR_WARNING "\033[33m" // Yellow +#define DSR_COLOR_ERROR "\033[31m" // Red + +// Logging macros for use inside DSRGraph (where log_level is a member) +#define DSR_LOG_DEBUG(...) do { if (log_level <= DSR::GraphSettings::LOGLEVEL::DEBUGL) { std::cout << DSR_COLOR_DEBUG "[DSR DEBUG] " DSR_COLOR_RESET; DSR_LOG_PRINT(__VA_ARGS__); } } while(0) +#define DSR_LOG_INFO(...) do { if (log_level <= DSR::GraphSettings::LOGLEVEL::INFOL) { std::cout << DSR_COLOR_INFO "[DSR INFO] " DSR_COLOR_RESET; DSR_LOG_PRINT(__VA_ARGS__); } } while(0) +#define DSR_LOG_WARNING(...) do { if (log_level <= DSR::GraphSettings::LOGLEVEL::WARNINGL) { std::cout << DSR_COLOR_WARNING "[DSR WARNING] " DSR_COLOR_RESET; DSR_LOG_PRINT(__VA_ARGS__); } } while(0) +#define DSR_LOG_ERROR(...) do { if (log_level <= DSR::GraphSettings::LOGLEVEL::ERRORL) { std::cout << DSR_COLOR_ERROR "[DSR ERROR] " DSR_COLOR_RESET; DSR_LOG_PRINT(__VA_ARGS__); } } while(0) + +// Logging macros with explicit log level parameter (for use outside DSRGraph, e.g. signal emitter) +#define DSR_LOG_DEBUG_L(lvl, ...) do { if ((lvl) <= DSR::GraphSettings::LOGLEVEL::DEBUGL) { std::cout << DSR_COLOR_DEBUG "[DSR DEBUG] " DSR_COLOR_RESET; DSR_LOG_PRINT(__VA_ARGS__); } } while(0) +#define DSR_LOG_INFO_L(lvl, ...) do { if ((lvl) <= DSR::GraphSettings::LOGLEVEL::INFOL) { std::cout << DSR_COLOR_INFO "[DSR INFO] " DSR_COLOR_RESET; DSR_LOG_PRINT(__VA_ARGS__); } } while(0) +#define DSR_LOG_WARNING_L(lvl, ...) do { if ((lvl) <= DSR::GraphSettings::LOGLEVEL::WARNINGL) { std::cout << DSR_COLOR_WARNING "[DSR WARNING] " DSR_COLOR_RESET; DSR_LOG_PRINT(__VA_ARGS__); } } while(0) + +// Internal: print helpers for composite types +namespace dsr_log_detail { + +template +inline void print_value(std::ostream& os, const std::pair& p) { + os << '(' << p.first << ", " << p.second << ')'; +} + +template +inline void print_value(std::ostream& os, const std::tuple& t) { + os << '('; + std::apply([&os](const auto&... args) { + size_t n = 0; + ((os << (n++ ? ", " : "") << args), ...); + }, t); + os << ')'; +} + +inline void print_value(std::ostream& os, bool v) { + os << (v ? "true" : "false"); +} + +template +inline void print_value(std::ostream& os, const T& v) { + os << v; +} + +} // namespace dsr_log_detail + +// Internal: print variadic args separated by spaces +inline void dsr_log_print_impl() { std::cout << std::endl; } + +template +inline void dsr_log_print_impl(const T& first, const Args&... rest) { + dsr_log_detail::print_value(std::cout, first); + if constexpr (sizeof...(rest) > 0) { + std::cout << ' '; + dsr_log_print_impl(rest...); + } else { + std::cout << std::endl; + } +} + +#define DSR_LOG_PRINT(...) dsr_log_print_impl(__VA_ARGS__) diff --git a/api/include/dsr/api/dsr_signal_emitter.h b/api/include/dsr/api/dsr_signal_emitter.h index 1392e6a..365443b 100644 --- a/api/include/dsr/api/dsr_signal_emitter.h +++ b/api/include/dsr/api/dsr_signal_emitter.h @@ -54,6 +54,7 @@ typedef std::variant uns_fns; std::vector unas_fns; std::vector ues_fns; @@ -62,7 +63,7 @@ struct QueuedSignalRunner { std::vector den_fns; std::vector dn_fns; std::vector de_fns; - explicit QueuedSignalRunner() : tp(2) + explicit QueuedSignalRunner() : tp(2) {} void connect(signal_fn_ptr_t fn, const std::string& type) {