diff --git a/api/CMakeLists.txt b/api/CMakeLists.txt index 154e90d..31d4e75 100644 --- a/api/CMakeLists.txt +++ b/api/CMakeLists.txt @@ -51,6 +51,7 @@ target_sources(dsr_api dsr_inner_eigen_api.cpp dsr_rt_api.cpp dsr_utils.cpp + dsr_signal_emitter.cpp GHistorySaver.cpp ${GEOM_API_SOURCES} ${headers_to_moc} diff --git a/api/dsr_api.cpp b/api/dsr_api.cpp index 1c06d6e..6b3759a 100644 --- a/api/dsr_api.cpp +++ b/api/dsr_api.cpp @@ -5,6 +5,7 @@ #include "dsr/api/dsr_graph_settings.h" #include "dsr/core/topics/IDLGraph.hpp" #include "dsr/core/types/translator.h" +#include "include/dsr/api/dsr_signal_emitter.h" #include #include #include @@ -16,6 +17,7 @@ #include #include #include +#include #include @@ -44,7 +46,11 @@ DSRGraph::DSRGraph(GraphSettings settings) : qDebug() << "Agent name: " << QString::fromStdString(agent_name); utils = std::make_unique(this); - + if (settings.signal_mode == SignalMode::QT) { + set_qt_signals(); + } else { + set_queued_signals(); + } // RTPS Create participant auto[suc, participant_handle] = dsrparticipant.init(agent_id, agent_name, settings.same_host, ParticipantChangeFunctor(this, [&](DSR::DSRGraph *graph, @@ -122,8 +128,8 @@ DSRGraph::DSRGraph(GraphSettings settings) : qDebug() << __FUNCTION__ << "Constructor finished OK"; } -DSRGraph::DSRGraph(std::string name, uint32_t id, const std::string &dsr_input_file, bool all_same_host, int8_t domain_id) - : DSR::DSRGraph(GraphSettings {id, 5, 1, name, dsr_input_file, "", all_same_host, GraphSettings::LOGLEVEL::INFOL, domain_id}) +DSRGraph::DSRGraph(std::string name, uint32_t id, const std::string &dsr_input_file, bool all_same_host, int8_t domain_id, SignalMode mode) + : DSR::DSRGraph(GraphSettings {id, 5, 1, name, dsr_input_file, "", all_same_host, GraphSettings::LOGLEVEL::INFOL, domain_id, mode}) {} @@ -202,10 +208,11 @@ std::optional DSRGraph::insert_node(No &&node) if (delta.has_value()) { dsrpub_node.write(&delta.value()); - emit update_node_signal(node.id(), node.type(), SignalInfo{agent_id}); + DSR_LOG_DEBUG("[INSERT_NODE] emitting update_node_signal", node.id(), node.type()); + emitter.update_node_signal(node.id(), node.type(), SignalInfo{agent_id}); for (const auto &[k, v]: node.fano()) { - emit update_edge_signal(node.id(), k.first, k.second, SignalInfo{agent_id}); + emitter.update_edge_signal(node.id(), k.first, k.second, SignalInfo{agent_id}); } } } @@ -291,13 +298,14 @@ requires (std::is_same_v, DSR::Node>) if (!copy) { if (vec_node_attr.has_value()) { dsrpub_node_attrs.write(&vec_node_attr.value()); - emit update_node_signal(node.id(), node.type(), SignalInfo{agent_id}); + DSR_LOG_DEBUG("[UPDATE_NODE] emitting update_node_signal", node.id(), node.type()); + emitter.update_node_signal(node.id(), node.type(), SignalInfo{agent_id}); std::vector atts_names(vec_node_attr->size()); std::transform(std::make_move_iterator(vec_node_attr->begin()), std::make_move_iterator(vec_node_attr->end()), atts_names.begin(), [](auto &&x) { return x.attr_name(); }); - emit update_node_attr_signal(node.id(), atts_names, SignalInfo{agent_id}); + emitter.update_node_attr_signal(node.id(), atts_names, SignalInfo{agent_id}); } } @@ -383,16 +391,17 @@ bool DSRGraph::delete_node(const std::string &name) if (result) { if (!copy) { - emit del_node_signal(id.value(), SignalInfo{agent_id}); - if (node_signal) emit deleted_node_signal(*node_signal, SignalInfo{agent_id}); + DSR_LOG_DEBUG("[DELETE_NODE] emitting del_node_signal", id.value()); + emitter.del_node_signal(id.value(), SignalInfo{agent_id}); + if (node_signal) emitter.deleted_node_signal(*node_signal, SignalInfo{agent_id}); dsrpub_node.write(&deleted_node.value()); for (auto &a : delta_vec) { dsrpub_edge.write(&a); } for (auto &edge : deleted_edges) { - emit del_edge_signal(edge.from(), edge.to(), edge.type(), SignalInfo{ agent_id }); - emit deleted_edge_signal(edge); + emitter.del_edge_signal(edge.from(), edge.to(), edge.type(), SignalInfo{ agent_id }); + emitter.deleted_edge_signal(edge, SignalInfo{ agent_id }); } } return true; @@ -416,16 +425,17 @@ bool DSRGraph::delete_node(uint64_t id) if (result) { if (!copy) { - emit del_node_signal(id, SignalInfo{ agent_id }); - if (node_signal) emit deleted_node_signal(*node_signal, SignalInfo{agent_id}); + DSR_LOG_DEBUG("[DELETE_NODE] emitting del_node_signal", id); + emitter.del_node_signal(id, SignalInfo{ agent_id }); + if (node_signal) emitter.deleted_node_signal(*node_signal, SignalInfo{agent_id}); dsrpub_node.write(&deleted_node.value()); for (auto &a : delta_vec) { dsrpub_edge.write(&a); } for (auto &edge : deleted_edges) { - emit del_edge_signal(edge.from(), edge.to(), edge.type(), SignalInfo{ agent_id }); - emit deleted_edge_signal(edge); + emitter.del_edge_signal(edge.from(), edge.to(), edge.type(), SignalInfo{ agent_id }); + emitter.deleted_edge_signal(edge, SignalInfo{ agent_id }); } } return true; @@ -632,7 +642,8 @@ requires (std::is_same_v, DSR::Edge>) } if (result) { if (!copy) { - emit update_edge_signal(attrs.from(), attrs.to(), attrs.type(), SignalInfo{ agent_id }); + DSR_LOG_DEBUG("[INSERT_OR_ASSIGN_EDGE] emitting update_edge_signal", attrs.from(), attrs.to(), attrs.type()); + emitter.update_edge_signal(attrs.from(), attrs.to(), attrs.type(), SignalInfo{ agent_id }); if (delta_edge.has_value()) { //Insert dsrpub_edge.write(&delta_edge.value()); @@ -645,7 +656,7 @@ requires (std::is_same_v, DSR::Edge>) atts_names.begin(), [](auto &&x) { return x.attr_name(); }); - emit update_edge_attr_signal(attrs.from(), attrs.to(), attrs.type(), atts_names, SignalInfo{ agent_id }); + emitter.update_edge_attr_signal(attrs.from(), attrs.to(), attrs.type(), atts_names, SignalInfo{ agent_id }); } } @@ -687,9 +698,10 @@ bool DSRGraph::delete_edge(uint64_t from, uint64_t to, const std::string &key) if (delta.has_value()) { if (!copy) { - emit del_edge_signal(from, to, key, SignalInfo{ agent_id }); + DSR_LOG_DEBUG("[DELETE_EDGE] emitting del_edge_signal", from, to, key); + emitter.del_edge_signal(from, to, key, SignalInfo{ agent_id }); if (deleted_edge.has_value()) { - emit deleted_edge_signal(*deleted_edge); + emitter.deleted_edge_signal(*deleted_edge, SignalInfo{ agent_id }); } dsrpub_edge.write(&delta.value()); } @@ -718,9 +730,9 @@ bool DSRGraph::delete_edge(const std::string &from, const std::string &to, const if (delta.has_value()) { if (!copy) { - emit del_edge_signal(id_from.value(), id_to.value(), key, SignalInfo{ agent_id }); + emitter.del_edge_signal(id_from.value(), id_to.value(), key, SignalInfo{ agent_id }); if (deleted_edge.has_value()) { - emit deleted_edge_signal(*deleted_edge); + emitter.deleted_edge_signal(*deleted_edge, SignalInfo{ agent_id }); } dsrpub_edge.write(&delta.value()); } @@ -963,6 +975,7 @@ void DSRGraph::join_delta_node(IDL::MvregNode &&mvreg) auto id = mvreg.id(); uint64_t timestamp = mvreg.timestamp(); + DSR_LOG_DEBUG("[JOIN_NODE] id:", id, "timestamp:", timestamp, "agent:", mvreg.agent_id()); auto crdt_delta = IDLNode_to_CRDT(std::move(mvreg)); bool d_empty = crdt_delta.empty(); @@ -974,7 +987,7 @@ void DSRGraph::join_delta_node(IDL::MvregNode &&mvreg) { node_handle = unprocessed_delta_edge_from.extract(id); } - + std::erase_if(unprocessed_delta_edge_to, [&](auto &it){ return std::get<0>(it.second) == id;}); std::erase_if(unprocessed_delta_edge_att, @@ -983,13 +996,14 @@ void DSRGraph::join_delta_node(IDL::MvregNode &&mvreg) std::unordered_set,hash_tuple> map_new_to_edges = {}; + std::unordered_set,hash_tuple> map_new_from_edges = {}; auto consume_unprocessed_deltas = [&](){ decltype(unprocessed_delta_node_att)::node_type node_handle_node_att = unprocessed_delta_node_att.extract(id); while (!node_handle_node_att.empty()) { auto &[att_name, delta, timestamp_node_att] = node_handle_node_att.mapped(); - //std::cout << "node_att " << id<< ", " <fano()) { - //std::cout << "[JOIN NODE] delete edge FROM: "<< node.second.read_reg().from() << ", " << node.second.read_reg().to() << ", " << node.second.read_reg().type() << std::endl; - emit del_edge_signal(node.second.read_reg().from(), node.second.read_reg().to(), + DSR_LOG_DEBUG("[JOIN_NODE] delete edge FROM:", node.second.read_reg().from(), node.second.read_reg().to(), node.second.read_reg().type()); + emitter.del_edge_signal(node.second.read_reg().from(), node.second.read_reg().to(), node.second.read_reg().type(), SignalInfo{ mvreg.agent_id() }); - emit deleted_edge_signal(Edge(node.second.read_reg())); + Edge tmp_edge(node.second.read_reg()); + emitter.deleted_edge_signal(tmp_edge, SignalInfo{ agent_id }); } } //TODO: deleted_edge_signal. update_maps_node_delete was called before so the maps are probably wrong... for (const auto &[from, type] : cache_map_to_edges.value()) { - //std::cout << "[JOIN NODE] delete edge TO: "<< from << ", " << id << ", " << type << std::endl; - emit del_edge_signal(from, id, type, SignalInfo{ mvreg.agent_id() }); - //emit deleted_edge_signal(Edge(node.second.read_reg())); TODO: fix this + DSR_LOG_DEBUG("[JOIN_NODE] delete edge TO:", from, id, type); + emitter.del_edge_signal(from, id, type, SignalInfo{ mvreg.agent_id() }); + //emitter.deleted_edge_signal(Edge(node.second.read_reg())); TODO: fix this } } @@ -1132,6 +1156,7 @@ void DSRGraph::join_delta_edge(IDL::MvregEdge &&mvreg) auto from = mvreg.id(); auto to = mvreg.to(); std::string type = mvreg.type(); + DSR_LOG_DEBUG("[JOIN_EDGE] from:", from, "to:", to, "type:", type, "agent:", mvreg.agent_id()); uint64_t timestamp = mvreg.timestamp(); @@ -1192,7 +1217,7 @@ void DSRGraph::join_delta_edge(IDL::MvregEdge &&mvreg) for (auto [begin, end] = unprocessed_delta_edge_from.equal_range(from); begin != end; ++begin) { //There should not be many elements in this iteration if (std::get<0>(begin->second) == to && std::get<1>(begin->second) == type){ std::get<2>(begin->second).join(::mvreg(crdt_delta)); - //std::cout << "JOIN_DELTA_EDGE ID(" << from<< ", "<< to <<", "<< type << ") JOIN UNPROCESSED" << std::endl; + DSR_LOG_DEBUG("[JOIN_EDGE] JOIN UNPROCESSED (from)", from, to, type); find = true; break; } } @@ -1200,28 +1225,26 @@ void DSRGraph::join_delta_edge(IDL::MvregEdge &&mvreg) for (auto [begin, end] = unprocessed_delta_edge_from.equal_range(to); begin != end; ++begin) { //There should not be many elements in this iteration if (std::get<0>(begin->second) == from && std::get<1>(begin->second) == type){ std::get<2>(begin->second).join(std::move(crdt_delta)); - //std::cout << "JOIN_DELTA_EDGE ID(" << from<< ", "<< to <<", "<< type << ") JOIN UNPROCESSED" << std::endl; + DSR_LOG_DEBUG("[JOIN_EDGE] JOIN UNPROCESSED (to)", from, to, type); find = true; break; } } if (!find) { - //std::cout << "JOIN_DELTA_EDGE ID(" << from<< ", "<< to <<", "<< type << ") INSERT UNPROCESSED " ; - if (!cfrom) { - //std::cout << "No existe from (" << from << ") unprocessed_delta_edge_from" << std::endl; - unprocessed_delta_edge_from.emplace(from, std::tuple{to, type, crdt_delta, timestamp}); + if (!cfrom) { + DSR_LOG_DEBUG("[JOIN_EDGE] INSERT UNPROCESSED, no from", from, "unprocessed_delta_edge_from"); + unprocessed_delta_edge_from.emplace(from, std::tuple{to, type, crdt_delta, timestamp}); } - if (cfrom && !cto) + if (cfrom && !cto) { - //std::cout << "No existe to (" << to << ") unprocessed_delta_edge_to" << std::endl; + DSR_LOG_DEBUG("[JOIN_EDGE] INSERT UNPROCESSED, no to", to, "unprocessed_delta_edge_to"); unprocessed_delta_edge_to.emplace(to, std::tuple{from, type, std::move(crdt_delta), timestamp}); } } else { - // We should sync the deleted_nodes set too... - std::cout <<"TODO: Unhandled, we should sync the deleted_nodes set" << std::endl; + DSR_LOG_WARNING("Unhandled case, should sync deleted_nodes set"); } } else { - //std::cout << "THE EDGE IS PART OF A DELETED NODE, CLEAN FROM UNPROCESSED" << std::endl; + DSR_LOG_DEBUG("[JOIN_EDGE] edge is part of deleted node, cleaning unprocessed"); auto node_deleted = [&](uint64_t id){ unprocessed_delta_edge_from.erase(id); unprocessed_delta_node_att.erase(id); @@ -1237,13 +1260,13 @@ void DSRGraph::join_delta_edge(IDL::MvregEdge &&mvreg) if (joined) { if (signal) { - //std::cout << "[JOIN EDGE] add edge: "<< from << ", " << to << ", " << type << std::endl; - emit update_edge_signal(from, to, type, SignalInfo{ mvreg.agent_id() }); + DSR_LOG_DEBUG("[JOIN_EDGE] add edge:", from, to, type); + emitter.update_edge_signal(from, to, type, SignalInfo{ mvreg.agent_id() }); } else { - //std::cout << "[JOIN EDGE] delete edge: "<< from << ", " << to << ", " << type << std::endl; - emit del_edge_signal(from, to, type, SignalInfo{ mvreg.agent_id() }); + DSR_LOG_DEBUG("[JOIN_EDGE] delete edge:", from, to, type); + emitter.del_edge_signal(from, to, type, SignalInfo{ mvreg.agent_id() }); if (deleted_edge.has_value()) { - emit deleted_edge_signal(*deleted_edge); + emitter.deleted_edge_signal(*deleted_edge, SignalInfo{ agent_id }); } } } @@ -1274,6 +1297,7 @@ std::optional DSRGraph::join_delta_node_attr(IDL::MvregNodeAttr &&m auto id = mvreg.id(); std::string att_name = mvreg.attr_name(); uint64_t timestamp = mvreg.timestamp(); + DSR_LOG_DEBUG("[JOIN_NODE_ATTR] node:", id, "attr:", att_name); { auto crdt_delta = IDLNodeAttr_to_CRDT(std::move(mvreg)); std::unique_lock lock(_mutex); @@ -1292,6 +1316,7 @@ std::optional DSRGraph::join_delta_node_attr(IDL::MvregNodeAttr &&m } } if (!find) { + DSR_LOG_DEBUG("[JOIN_NODE_ATTR] storing to unprocessed cache, node", id, att_name); unprocessed_delta_node_att.emplace(id, std::tuple{att_name, std::move(crdt_delta), timestamp}); } } else { @@ -1337,6 +1362,7 @@ std::optional DSRGraph::join_delta_edge_attr(IDL::MvregEdgeAttr &&m std::string type = mvreg.type(); std::string att_name = mvreg.attr_name(); uint64_t timestamp = mvreg.timestamp(); + DSR_LOG_DEBUG("[JOIN_EDGE_ATTR] edge:", from, to, type, "attr:", att_name); { auto crdt_delta = IDLEdgeAttr_to_CRDT(std::move(mvreg)); @@ -1357,6 +1383,7 @@ std::optional DSRGraph::join_delta_edge_attr(IDL::MvregEdgeAttr &&m } } if (!find) { + DSR_LOG_DEBUG("[JOIN_EDGE_ATTR] storing to unprocessed cache, edge", from, to, type, att_name); unprocessed_delta_edge_att.emplace(std::tuple{from, to, type}, std::tuple{att_name, std::move(crdt_delta), timestamp}); } } else { //If the node is deleted @@ -1415,7 +1442,7 @@ void DSRGraph::join_full_graph(IDL::OrMap &&full_graph) while (!node_handle_edge.empty()) { auto &[to, type, delta, timestamp_edge] = node_handle_edge.mapped(); auto att_key = std::tuple{id, to, type}; - //std::cout << "Procesando delta edge almacenado (unprocessed_delta_edge_from) " << id<< ", " <attrs() != nodes[id].read_reg().attrs()) { - emit update_node_signal(id, nodes[id].read_reg().type(), SignalInfo{ agent_id_ch }); + emitter.update_node_signal(id, nodes[id].read_reg().type(), SignalInfo{ agent_id_ch }); } else if (nd.value() != nodes[id].read_reg()) { auto iter = nodes[id].read_reg().fano(); for (const auto &[k, v] : nd->fano()) { if (!iter.contains(k)) { - emit del_edge_signal(id, k.first, k.second, SignalInfo{ agent_id_ch }); + emitter.del_edge_signal(id, k.first, k.second, SignalInfo{ agent_id_ch }); if (v.dk.ds.size() > 0) { - emit deleted_edge_signal(Edge(v.read_reg())); + Edge tmp_edge(v.read_reg()); + emitter.deleted_edge_signal(tmp_edge, SignalInfo{ agent_id }); } } } for (const auto &[k, v] : iter) { if (auto it = nd->fano().find(k); it == nd->fano().end() or it->second != v) - emit update_edge_signal(id, k.first, k.second, SignalInfo{ agent_id_ch }); + emitter.update_edge_signal(id, k.first, k.second, SignalInfo{ agent_id_ch }); } } } else { - emit del_node_signal(id, SignalInfo{ agent_id_ch }); + emitter.del_node_signal(id, SignalInfo{ agent_id_ch }); if (nd.has_value()) { - emit deleted_node_signal(Node(*nd)); + Node tmp_node(*nd); + emitter.deleted_node_signal(tmp_node, SignalInfo{ agent_id_ch }); } } @@ -1550,7 +1579,7 @@ std::map DSRGraph::Map() return m; } -void print_sample_info(const eprosima::fastdds::dds::SampleInfo& info) { +void print_sample_info(DSR::GraphSettings::LOGLEVEL log_level, const eprosima::fastdds::dds::SampleInfo& info) { double milliseconds_send = static_cast(info.source_timestamp.seconds()) * 1000.0; milliseconds_send += static_cast(info.source_timestamp.nanosec()) / 1000000.0; @@ -1570,7 +1599,7 @@ void print_sample_info(const eprosima::fastdds::dds::SampleInfo& info) { oss << " ms diff reception: " << (milliseconds_recv - milliseconds_send) << "ms" << std::endl; oss << " ms taken from dds: " << (now - milliseconds_send) << "ms" << std::endl; oss << " Valid Data: " << (info.valid_data ? "true" : "false") << std::endl; - std::cout << oss.str(); + qDebug() << QString::fromStdString(oss.str()); } @@ -1586,7 +1615,7 @@ void DSRGraph::node_subscription_thread() eprosima::fastdds::dds::SampleInfo m_info; IDL::MvregNode sample; if (reader->take_next_sample(&sample, &m_info) == 0) { - if (showReceived == GraphSettings::LOGLEVEL::DEBUGL) print_sample_info(m_info); + if (showReceived == GraphSettings::LOGLEVEL::DEBUGL) print_sample_info(showReceived, m_info); if (m_info.valid_data) { if (sample.agent_id() != agent_id) { if (showReceived == GraphSettings::LOGLEVEL::DEBUGL) { @@ -1620,7 +1649,7 @@ void DSRGraph::edge_subscription_thread() eprosima::fastdds::dds::SampleInfo m_info; IDL::MvregEdge sample; if (reader->take_next_sample(&sample, &m_info) == 0) { - if (showReceived == GraphSettings::LOGLEVEL::DEBUGL) print_sample_info(m_info); + if (showReceived == GraphSettings::LOGLEVEL::DEBUGL) print_sample_info(showReceived, m_info); if (m_info.valid_data) { if (sample.agent_id() != agent_id) { if (showReceived == GraphSettings::LOGLEVEL::DEBUGL) { @@ -1655,7 +1684,7 @@ void DSRGraph::edge_attrs_subscription_thread() eprosima::fastdds::dds::SampleInfo m_info; IDL::MvregEdgeAttrVec samples; if (reader->take_next_sample(&samples, &m_info) == 0) { - if (showReceived == GraphSettings::LOGLEVEL::DEBUGL) print_sample_info(m_info); + if (showReceived == GraphSettings::LOGLEVEL::DEBUGL) print_sample_info(showReceived, m_info); if (m_info.valid_data) { if (showReceived == GraphSettings::LOGLEVEL::DEBUGL) { qDebug() << name << " Received:" << samples.vec().size() << " edge attr from: " @@ -1689,8 +1718,8 @@ void DSRGraph::edge_attrs_subscription_thread() } - emit update_edge_attr_signal(from, to, type, sig, SignalInfo{samples.vec().at(0).agent_id()}); - emit update_edge_signal(from, to, type, SignalInfo{samples.vec().at(0).agent_id()}); + emitter.update_edge_attr_signal(from, to, type, sig, SignalInfo{samples.vec().at(0).agent_id()}); + emitter.update_edge_signal(from, to, type, SignalInfo{samples.vec().at(0).agent_id()}); }); } @@ -1723,7 +1752,7 @@ void DSRGraph::node_attrs_subscription_thread() eprosima::fastdds::dds::SampleInfo m_info; IDL::MvregNodeAttrVec samples; if (reader->take_next_sample(&samples, &m_info) == 0) { - if (showReceived == GraphSettings::LOGLEVEL::DEBUGL) print_sample_info(m_info); + if (showReceived == GraphSettings::LOGLEVEL::DEBUGL) print_sample_info(showReceived, m_info); if (m_info.valid_data) { if (showReceived == GraphSettings::LOGLEVEL::DEBUGL) { qDebug() << name << " Received:" << samples.vec().size() << " node attrs from: " @@ -1759,8 +1788,8 @@ void DSRGraph::node_attrs_subscription_thread() sig.emplace_back(std::move(opt_str.value())); } - emit update_node_attr_signal(id, sig, SignalInfo{samples.vec().at(0).agent_id()}); - emit update_node_signal(id, type, SignalInfo{samples.vec().at(0).agent_id()}); + emitter.update_node_attr_signal(id, sig, SignalInfo{samples.vec().at(0).agent_id()}); + emitter.update_node_signal(id, type, SignalInfo{samples.vec().at(0).agent_id()}); }); } } @@ -1788,7 +1817,7 @@ void DSRGraph::fullgraph_server_thread() eprosima::fastdds::dds::SampleInfo m_info; IDL::GraphRequest sample; if (reader->take_next_sample(&sample, &m_info) == 0) { - if (log_level == GraphSettings::LOGLEVEL::DEBUGL) print_sample_info(m_info); + if (log_level == GraphSettings::LOGLEVEL::DEBUGL) print_sample_info(log_level, m_info); if (m_info.valid_data) { { std::unique_lock lck(participant_set_mutex); @@ -1844,7 +1873,7 @@ std::pair DSRGraph::fullgraph_request_thread() eprosima::fastdds::dds::SampleInfo m_info; IDL::OrMap sample; if (reader->take_next_sample(&sample, &m_info) == 0) { - if (log_level == GraphSettings::LOGLEVEL::DEBUGL) print_sample_info(m_info); + if (log_level == GraphSettings::LOGLEVEL::DEBUGL) print_sample_info(log_level, m_info); if (m_info.valid_data) { if (sample.id() != graph->get_agent_id()) { if (sample.id() != static_cast(-1)) { diff --git a/api/dsr_rt_api.cpp b/api/dsr_rt_api.cpp index 3998b8a..8fb43e8 100644 --- a/api/dsr_rt_api.cpp +++ b/api/dsr_rt_api.cpp @@ -341,12 +341,12 @@ void RT_API::insert_or_assign_edge_RT(Node &n, uint64_t to, const std::vectordsrpub_node_attrs.write(&node2.value()); - emit G->update_edge_attr_signal(n.id(), to, "RT" ,{"rt_rotation_euler_xyz", "rt_translation"}, SignalInfo{ G->agent_id }); - emit G->update_edge_signal(n.id(), to, "RT", SignalInfo{ G->agent_id }); + G->emitter.update_edge_attr_signal(n.id(), to, "RT" ,{"rt_rotation_euler_xyz", "rt_translation"}, SignalInfo{ G->agent_id }); + G->emitter.update_edge_signal(n.id(), to, "RT", SignalInfo{ G->agent_id }); if (!no_send) { - emit G->update_node_signal(to_n->id(), to_n->type(), SignalInfo{ G->agent_id }); - emit G->update_node_attr_signal(to_n->id(), {"level", "parent"}, SignalInfo{ G->agent_id }); + G->emitter.update_node_signal(to_n->id(), to_n->type(), SignalInfo{ G->agent_id }); + G->emitter.update_node_attr_signal(to_n->id(), {"level", "parent"}, SignalInfo{ G->agent_id }); } } } @@ -528,12 +528,12 @@ void RT_API::insert_or_assign_edge_RT(Node &n, uint64_t to, std::vector & if (!no_send and node2.has_value()) G->dsrpub_node_attrs.write(&node2.value()); - emit G->update_edge_attr_signal(n.id(), to, "RT",{"rt_rotation_euler_xyz", "rt_translation"}, SignalInfo{ G->agent_id }); - emit G->update_edge_signal(n.id(), to, "RT", SignalInfo{ G->agent_id }); + G->emitter.update_edge_attr_signal(n.id(), to, "RT",{"rt_rotation_euler_xyz", "rt_translation"}, SignalInfo{ G->agent_id }); + G->emitter.update_edge_signal(n.id(), to, "RT", SignalInfo{ G->agent_id }); if (!no_send) { - emit G->update_node_signal(to_n->id(), to_n->type(), SignalInfo{ G->agent_id }); - emit G->update_node_attr_signal(to_n->id(), {"level", "parent"}, SignalInfo{ G->agent_id }); + G->emitter.update_node_signal(to_n->id(), to_n->type(), SignalInfo{ G->agent_id }); + G->emitter.update_node_attr_signal(to_n->id(), {"level", "parent"}, SignalInfo{ G->agent_id }); } } } diff --git a/api/dsr_signal_emitter.cpp b/api/dsr_signal_emitter.cpp new file mode 100644 index 0000000..3ab6dfc --- /dev/null +++ b/api/dsr_signal_emitter.cpp @@ -0,0 +1,94 @@ +#include +#include +#include +#include + +void DSR::QueuedSignalRunner::run_update_node_signal(std::uint64_t a, + const std::string &b, + SignalInfo c) { + DSR_LOG_DEBUG_L(static_cast(log_level), "[SIGNAL] update_node id:", a, "type:", b); + tp.spawn_task([=, this] { + for (auto fn : uns_fns) { + if (fn) + fn(a, b); + } + }); +} +void DSR::QueuedSignalRunner::run_update_node_attr_signal( + std::uint64_t a, const std::vector &b, SignalInfo c) { + DSR_LOG_DEBUG_L(static_cast(log_level), "[SIGNAL] update_node_attr id:", a); + tp.spawn_task([=, this] { + for (auto fn : unas_fns) { + if (fn) + fn(a, b); + } + }); +} +void DSR::QueuedSignalRunner::run_update_edge_signal(std::uint64_t a, + std::uint64_t b, + const std::string &c, + SignalInfo d) { + DSR_LOG_DEBUG_L(static_cast(log_level), "[SIGNAL] update_edge from:", a, "to:", b, "type:", c); + tp.spawn_task([=, this] { + for (auto fn : ues_fns) { + if (fn) + fn(a, b, c); + } + }); +} + +void DSR::QueuedSignalRunner::run_update_edge_attr_signal( + std::uint64_t a, std::uint64_t b, const std::string &c, + const std::vector &d, SignalInfo e) { + DSR_LOG_DEBUG_L(static_cast(log_level), "[SIGNAL] update_edge_attr from:", a, "to:", b, "type:", c); + tp.spawn_task([=, this] { + for (auto fn : ueas_fns) { + if (fn) + fn(a, b, c, d); + } + }); +} + +void DSR::QueuedSignalRunner::run_del_edge_signal(std::uint64_t a, + std::uint64_t b, + const std::string &c, + SignalInfo d) { + DSR_LOG_DEBUG_L(static_cast(log_level), "[SIGNAL] del_edge from:", a, "to:", b, "type:", c); + tp.spawn_task([=, this] { + for (auto fn : des_fns) { + if (fn) + fn(a, b, c); + } + }); +} +void DSR::QueuedSignalRunner::run_del_node_signal(std::uint64_t a, + SignalInfo b) { + DSR_LOG_DEBUG_L(static_cast(log_level), "[SIGNAL] del_node id:", a); + tp.spawn_task([=, this] { + for (auto fn : den_fns) { + if (fn) + fn(a); + } + }); +} + +void DSR::QueuedSignalRunner::run_deleted_node_signal(const Node &a, + SignalInfo b) { + DSR_LOG_DEBUG_L(static_cast(log_level), "[SIGNAL] deleted_node name:", a.name(), "id:", a.id()); + tp.spawn_task([=, this] { + for (auto fn : dn_fns) { + if (fn) + fn(a); + } + }); +} +void DSR::QueuedSignalRunner::run_deleted_edge_signal(const Edge &a, + SignalInfo b) { + DSR_LOG_DEBUG_L(static_cast(log_level), "[SIGNAL] deleted_edge from:", a.from(), "to:", a.to(), "type:", a.type()); + tp.spawn_task([=, this] { + for (auto fn : de_fns) { + if (fn) + fn(a); + } + }); +} diff --git a/api/include/dsr/api/dsr_api.h b/api/include/dsr/api/dsr_api.h index 1766740..682b55b 100644 --- a/api/include/dsr/api/dsr_api.h +++ b/api/include/dsr/api/dsr_api.h @@ -30,11 +30,14 @@ #include "dsr/api/dsr_utils.h" #include "dsr/api/dsr_signal_info.h" #include "dsr/api/dsr_graph_settings.h" +#include "dsr/api/dsr_logging.h" +#include "dsr/api/dsr_signal_emitter.h" #include "dsr/core/types/type_checking/dsr_edge_type.h" #include "dsr/core/types/type_checking/dsr_node_type.h" #include "dsr/core/types/type_checking/dsr_attr_name.h" #include "dsr/core/utils.h" #include "dsr/core/id_generator.h" +#include "dsr_signal_emitter.h" #include "threadpool/threadpool.h" #include @@ -57,9 +60,9 @@ namespace DSR size_t size() const; DSRGraph(GraphSettings settings); - DSRGraph(std::string name, uint32_t id, const std::string& dsr_input_file = std::string(), bool all_same_host = true, int8_t domain_id=0); - [[deprecated("root parameter is not used anymore")]] DSRGraph(uint64_t root, std::string name, int id, const std::string& dsr_input_file = std::string(), bool all_same_host = true, int8_t domain_id=0) - : DSRGraph(name, id, dsr_input_file, all_same_host, domain_id) + DSRGraph(std::string name, uint32_t id, const std::string& dsr_input_file = std::string(), bool all_same_host = true, int8_t domain_id=0, SignalMode = SignalMode::QT); + [[deprecated("root parameter is not used anymore")]] DSRGraph(uint64_t root, std::string name, int id, const std::string& dsr_input_file = std::string(), bool all_same_host = true, int8_t domain_id=0, SignalMode mode = SignalMode::QT) + : DSRGraph(name, id, dsr_input_file, all_same_host, domain_id, mode) {} ~DSRGraph() override; @@ -539,6 +542,14 @@ namespace DSR return ret_vec; } + + ////////////////////////////////////////////////// + ///// QueuedSignals for python + ///////////////////////////////////////////////// + + QueuedSignalRunner* get_signal_runner() { + return emitter.runner.get(); + } private: DSRGraph(const DSRGraph& G); //Private constructor for DSRCopy @@ -557,6 +568,41 @@ namespace DSR bool same_host; id_generator generator; GraphSettings::LOGLEVEL log_level; + signals_fns emitter; + + ////////////////////////////////////////////////////////////////////////// + // Signal method + /////////////////////////////////////////////////////////////////////////// + + void set_qt_signals (){ + emitter = { + [this](std::uint64_t a, const std::string & b, SignalInfo c = {}) { DSR_LOG_DEBUG("[SIGNAL] update_node id:", a, "type:", b); emit update_node_signal(a, b, c); }, + [this](std::uint64_t a, const std::vector &b, SignalInfo c = {}) { DSR_LOG_DEBUG("[SIGNAL] update_node_attr id:", a); emit update_node_attr_signal(a, b, c); }, + [this](std::uint64_t a, std::uint64_t b, const std::string & c, SignalInfo d = {}) { DSR_LOG_DEBUG("[SIGNAL] update_edge from:", a, "to:", b, "type:", c); emit update_edge_signal(a, b, c, d); }, + [this](std::uint64_t a, std::uint64_t b, const std::string & c, const std::vector &d, SignalInfo e = {}) { DSR_LOG_DEBUG("[SIGNAL] update_edge_attr from:", a, "to:", b, "type:", c); emit update_edge_attr_signal(a, b, c, d, e); }, + [this](std::uint64_t a, std::uint64_t b, const std::string & c, SignalInfo d = {}) { DSR_LOG_DEBUG("[SIGNAL] del_edge from:", a, "to:", b, "type:", c); emit del_edge_signal(a, b, c, d); }, + [this](std::uint64_t a, SignalInfo b = {}) { DSR_LOG_DEBUG("[SIGNAL] del_node id:", a); emit del_node_signal(a, b); }, + [this](const Node& a, SignalInfo b = {}) { DSR_LOG_DEBUG("[SIGNAL] deleted_node name:", a.name(), "id:", a.id()); emit deleted_node_signal(a, b); }, + [this](const Edge& a, SignalInfo b = {}) { DSR_LOG_DEBUG("[SIGNAL] deleted_edge from:", a.from(), "to:", a.to(), "type:", a.type()); emit deleted_edge_signal(a, b); }, + nullptr + }; + } + + void set_queued_signals (){ + auto runner = new QueuedSignalRunner(); + runner->log_level = static_cast(log_level); + emitter = { + [runner](std::uint64_t a, const std::string & b, SignalInfo c = {}) { runner->run_update_node_signal(a, b, c); }, + [runner](std::uint64_t a, const std::vector &b, SignalInfo c = {}) { runner->run_update_node_attr_signal(a, b, c); }, + [runner](std::uint64_t a, std::uint64_t b, const std::string & c, SignalInfo d = {}) { runner->run_update_edge_signal(a, b, c, d); }, + [runner](std::uint64_t a, std::uint64_t b, const std::string & c, const std::vector &d, SignalInfo e = {}) { runner->run_update_edge_attr_signal(a, b, c, d, e); }, + [runner](std::uint64_t a, std::uint64_t b, const std::string & c, SignalInfo d = {}) { runner->run_del_edge_signal(a, b, c, d); }, + [runner](std::uint64_t a, SignalInfo b = {}) { runner->run_del_node_signal(a, b); }, + [runner](const Node& a, SignalInfo b = {}) { runner->run_deleted_node_signal(a, b); }, + [runner](const Edge& a, SignalInfo b = {}) { runner->run_deleted_edge_signal(a, b); }, + std::unique_ptr(runner) + }; + } ////////////////////////////////////////////////////////////////////////// // Cache maps diff --git a/api/include/dsr/api/dsr_graph_settings.h b/api/include/dsr/api/dsr_graph_settings.h index c9707d9..e26874d 100644 --- a/api/include/dsr/api/dsr_graph_settings.h +++ b/api/include/dsr/api/dsr_graph_settings.h @@ -2,6 +2,7 @@ #include #include +#include namespace DSR { @@ -17,6 +18,7 @@ struct GraphSettings { DEBUGL = 0, INFOL, WARNINGL, ERRORL } log_level {LOGLEVEL::INFOL}; int8_t domain_id = 0; + SignalMode signal_mode = QT; }; } \ No newline at end of file diff --git a/api/include/dsr/api/dsr_logging.h b/api/include/dsr/api/dsr_logging.h new file mode 100644 index 0000000..cfbb9a4 --- /dev/null +++ b/api/include/dsr/api/dsr_logging.h @@ -0,0 +1,69 @@ +#pragma once + +#include +#include +#include +#include + +// ANSI color codes +#define DSR_COLOR_RESET "\033[0m" +#define DSR_COLOR_DEBUG "\033[36m" // Cyan +#define DSR_COLOR_INFO "\033[32m" // Green +#define DSR_COLOR_WARNING "\033[33m" // Yellow +#define DSR_COLOR_ERROR "\033[31m" // Red + +// Logging macros for use inside DSRGraph (where log_level is a member) +#define DSR_LOG_DEBUG(...) do { if (log_level <= DSR::GraphSettings::LOGLEVEL::DEBUGL) { std::cout << DSR_COLOR_DEBUG "[DSR DEBUG] " DSR_COLOR_RESET; DSR_LOG_PRINT(__VA_ARGS__); } } while(0) +#define DSR_LOG_INFO(...) do { if (log_level <= DSR::GraphSettings::LOGLEVEL::INFOL) { std::cout << DSR_COLOR_INFO "[DSR INFO] " DSR_COLOR_RESET; DSR_LOG_PRINT(__VA_ARGS__); } } while(0) +#define DSR_LOG_WARNING(...) do { if (log_level <= DSR::GraphSettings::LOGLEVEL::WARNINGL) { std::cout << DSR_COLOR_WARNING "[DSR WARNING] " DSR_COLOR_RESET; DSR_LOG_PRINT(__VA_ARGS__); } } while(0) +#define DSR_LOG_ERROR(...) do { if (log_level <= DSR::GraphSettings::LOGLEVEL::ERRORL) { std::cout << DSR_COLOR_ERROR "[DSR ERROR] " DSR_COLOR_RESET; DSR_LOG_PRINT(__VA_ARGS__); } } while(0) + +// Logging macros with explicit log level parameter (for use outside DSRGraph, e.g. signal emitter) +#define DSR_LOG_DEBUG_L(lvl, ...) do { if ((lvl) <= DSR::GraphSettings::LOGLEVEL::DEBUGL) { std::cout << DSR_COLOR_DEBUG "[DSR DEBUG] " DSR_COLOR_RESET; DSR_LOG_PRINT(__VA_ARGS__); } } while(0) +#define DSR_LOG_INFO_L(lvl, ...) do { if ((lvl) <= DSR::GraphSettings::LOGLEVEL::INFOL) { std::cout << DSR_COLOR_INFO "[DSR INFO] " DSR_COLOR_RESET; DSR_LOG_PRINT(__VA_ARGS__); } } while(0) +#define DSR_LOG_WARNING_L(lvl, ...) do { if ((lvl) <= DSR::GraphSettings::LOGLEVEL::WARNINGL) { std::cout << DSR_COLOR_WARNING "[DSR WARNING] " DSR_COLOR_RESET; DSR_LOG_PRINT(__VA_ARGS__); } } while(0) + +// Internal: print helpers for composite types +namespace dsr_log_detail { + +template +inline void print_value(std::ostream& os, const std::pair& p) { + os << '(' << p.first << ", " << p.second << ')'; +} + +template +inline void print_value(std::ostream& os, const std::tuple& t) { + os << '('; + std::apply([&os](const auto&... args) { + size_t n = 0; + ((os << (n++ ? ", " : "") << args), ...); + }, t); + os << ')'; +} + +inline void print_value(std::ostream& os, bool v) { + os << (v ? "true" : "false"); +} + +template +inline void print_value(std::ostream& os, const T& v) { + os << v; +} + +} // namespace dsr_log_detail + +// Internal: print variadic args separated by spaces +inline void dsr_log_print_impl() { std::cout << std::endl; } + +template +inline void dsr_log_print_impl(const T& first, const Args&... rest) { + dsr_log_detail::print_value(std::cout, first); + if constexpr (sizeof...(rest) > 0) { + std::cout << ' '; + dsr_log_print_impl(rest...); + } else { + std::cout << std::endl; + } +} + +#define DSR_LOG_PRINT(...) dsr_log_print_impl(__VA_ARGS__) diff --git a/api/include/dsr/api/dsr_signal_emitter.h b/api/include/dsr/api/dsr_signal_emitter.h new file mode 100644 index 0000000..365443b --- /dev/null +++ b/api/include/dsr/api/dsr_signal_emitter.h @@ -0,0 +1,127 @@ +#pragma once + +#include +#include +#include +#include +#include + +namespace DSR { + +struct Node; +struct Edge; +struct SignalInfo; + +enum SignalMode { QT, Queue }; + +typedef std::function + update_node_signal_t; +typedef std::function &, + SignalInfo)> + update_node_attr_signal_t; +typedef std::function + update_edge_signal_t; +typedef std::function &, SignalInfo)> + update_edge_attr_signal_t; +typedef std::function + del_edge_signal_t; +typedef std::function del_node_signal_t; +typedef std::function deleted_node_signal_t; +typedef std::function deleted_edge_signal_t; + +typedef std::function + update_node_signal_noinfo_t; +typedef std::function &)> + update_node_attr_signal_noinfo_t; +typedef std::function + update_edge_signal_noinfo_t; +typedef std::function &)> + update_edge_attr_signal_noinfo_t; +typedef update_edge_signal_noinfo_t del_edge_signal_noinfo_t; +typedef std::function del_node_signal_noinfo_t; +typedef std::function deleted_node_signal_noinfo_t; +typedef std::function deleted_edge_signal_noinfo_t; + +typedef std::variant + signal_fn_ptr_t; + +struct QueuedSignalRunner { + ThreadPool tp; + uint8_t log_level{1}; // GraphSettings::LOGLEVEL as uint8_t (0=DEBUG, 1=INFO, 2=WARNING, 3=ERROR) + std::vector uns_fns; + std::vector unas_fns; + std::vector ues_fns; + std::vector ueas_fns; + std::vector des_fns; + std::vector den_fns; + std::vector dn_fns; + std::vector de_fns; + explicit QueuedSignalRunner() : tp(2) + {} + + void connect(signal_fn_ptr_t fn, const std::string& type) { + std::visit( + [&, this](auto &&arg) { + using T = std::decay_t; + if constexpr (std::is_same_v) + unas_fns.push_back(arg); + else if constexpr (std::is_same_v) + uns_fns.push_back(arg); + else if constexpr (std::is_same_v) { + if (type == "UPDATE_EDGE") + ues_fns.push_back(arg); + else + des_fns.push_back(arg); + } + else if constexpr (std::is_same_v) + ueas_fns.push_back(arg); + else if constexpr (std::is_same_v) + den_fns.push_back(arg); + else if constexpr (std::is_same_v) + dn_fns.push_back(arg); + else if constexpr (std::is_same_v) + de_fns.push_back(arg); + else + std::cerr << "Python signals don't use SignalInfo parameter\n"; + }, + fn); + } + + void run_update_node_signal(std::uint64_t a, const std::string &b, + SignalInfo c); + void run_update_node_attr_signal(std::uint64_t a, + const std::vector &b, + SignalInfo c); + void run_update_edge_signal(std::uint64_t a, std::uint64_t b, + const std::string &c, SignalInfo d); + void run_update_edge_attr_signal(std::uint64_t a, std::uint64_t b, + const std::string &c, + const std::vector &d, + SignalInfo e); + void run_del_edge_signal(std::uint64_t a, std::uint64_t b, + const std::string &c, SignalInfo d); + void run_del_node_signal(std::uint64_t a, SignalInfo b); + void run_deleted_node_signal(const Node &a, SignalInfo b); + void run_deleted_edge_signal(const Edge &a, SignalInfo b); +}; + +struct signals_fns { + update_node_signal_t update_node_signal; + update_node_attr_signal_t update_node_attr_signal; + update_edge_signal_t update_edge_signal; + update_edge_attr_signal_t update_edge_attr_signal; + del_edge_signal_t del_edge_signal; + del_node_signal_t del_node_signal; + deleted_node_signal_t deleted_node_signal; + deleted_edge_signal_t deleted_edge_signal; + std::unique_ptr runner; +}; +} +; // namespace DSR \ No newline at end of file diff --git a/gui/dsr_gui.cpp b/gui/dsr_gui.cpp index eeab83d..6931900 100644 --- a/gui/dsr_gui.cpp +++ b/gui/dsr_gui.cpp @@ -352,7 +352,7 @@ void DSRViewer::initialize_views(int options, view central) tree_widget, &TreeViewer::node_check_state_changed, graph_widget, - [=](int value, int id, const std::string &type, QTreeWidgetItem *) + [=](int value, uint64_t id, const std::string &type, QTreeWidgetItem *) { graph_widget->hide_show_node_SLOT(id, value == 2); }); diff --git a/gui/viewers/graph_viewer/graph_viewer.cpp b/gui/viewers/graph_viewer/graph_viewer.cpp index bd57d5e..b31b476 100644 --- a/gui/viewers/graph_viewer/graph_viewer.cpp +++ b/gui/viewers/graph_viewer/graph_viewer.cpp @@ -280,7 +280,7 @@ GraphEdge* GraphViewer::new_visual_edge(std::uint64_t from, std::uint64_t to, co void GraphViewer::del_edge_SLOT(std::uint64_t from, std::uint64_t to, const std::string &edge_tag) { - qDebug()<<__FUNCTION__<<":"<<__LINE__; + qDebug()<<__FUNCTION__<<"from:"< key = std::make_tuple(from, to, edge_tag); @@ -311,12 +311,14 @@ void GraphViewer::del_edge_SLOT(std::uint64_t from, std::uint64_t to, const std: // remove node from scene void GraphViewer::del_node_SLOT(uint64_t id) { - qDebug()<<__FUNCTION__<<":"<<__LINE__; + qDebug()<<__FUNCTION__<<"node id:"< 0) { auto item = gmap.at(id); scene.removeItem(item); + for (auto &[type, ids] : type_id_map) + ids.erase(id); delete item; gmap.erase(id); auto id_str = std::to_string(id); @@ -330,7 +332,12 @@ void GraphViewer::del_node_SLOT(uint64_t id) void GraphViewer::hide_show_node_SLOT(uint64_t id, bool visible) { - auto item = gmap[id]; + auto it = gmap.find(id); + if (it == gmap.end() || it->second == nullptr) { + qDebug() << __FUNCTION__ << "skipping missing node" << id; + return; + } + auto item = it->second; item->setVisible(visible); for (const auto &gedge: item->edgeList) { diff --git a/gui/viewers/tree_viewer/tree_viewer.cpp b/gui/viewers/tree_viewer/tree_viewer.cpp index 92bdfc0..222b3fb 100644 --- a/gui/viewers/tree_viewer/tree_viewer.cpp +++ b/gui/viewers/tree_viewer/tree_viewer.cpp @@ -71,6 +71,7 @@ void TreeViewer::add_or_assign_node_SLOT(uint64_t id, const std::string &type, auto node = G->get_node(id); if (tree_map.count(id)==0) { + qDebug() << __FUNCTION__ << "new node id:" << id << "type:" << QString::fromStdString(type); QTreeWidgetItem* symbol_widget = nullptr; if (types_map.count(type)) { symbol_widget = types_map[type]; @@ -131,8 +132,7 @@ void TreeViewer::del_edge_SLOT(const std::uint64_t from, const std::uint64_t to, void TreeViewer::del_node_SLOT(uint64_t id) { - - qDebug()<<__FUNCTION__<<":"<<__LINE__; + qDebug()<<__FUNCTION__<<"node id:"< 0) { auto item = tree_map[id]; this->invisibleRootItem()->removeChild(item); @@ -155,10 +155,12 @@ void TreeViewer::category_change_SLOT(int value, QTreeWidgetItem* parent) void TreeViewer::node_change_SLOT(int value, uint64_t id, const std::string &type, QTreeWidgetItem* parent) { QCheckBox* sender = qobject_cast(this->sender()); - if(sender) + if(sender and G->get_node(id).has_value()) { qDebug()<<"Emitting signal for "<(this->itemWidget(parent,0))->text(); emit node_check_state_changed(value, id, type, parent); + } else { + qDebug() << __FUNCTION__ << "skipping, node no longer exists in G, id:" << id; } } diff --git a/python-wrapper/python_api.cpp b/python-wrapper/python_api.cpp index 98b8e88..46d4adb 100644 --- a/python-wrapper/python_api.cpp +++ b/python-wrapper/python_api.cpp @@ -3,6 +3,8 @@ // #include "dsr/api/dsr_api.h" +#include +#include #include using namespace DSR; @@ -149,8 +151,6 @@ PYBIND11_MAKE_OPAQUE(std::map, Edge>) PYBIND11_MAKE_OPAQUE(std::map) -static QCoreApplication *app = nullptr; - PYBIND11_MODULE(pydsr, m) { py::bind_map, Edge>>(m, "MapStringEdge"); py::bind_dsr_map>(m, "MapStringAttribute"); @@ -159,30 +159,6 @@ PYBIND11_MODULE(pydsr, m) { uint64_t local_agent_id = -1; - /*std::thread signal_thread ([] { - int argc = 0; - char *argv[] = {nullptr}; - auto *QApp = new QCoreApplication(argc, argv); - app = QApp; - QApp->exec(); - }); - - signal_thread.detach();*/ - - // ugly busy wait until the QApp thread is running. - while (app != nullptr){} - - py::module::import("atexit").attr("register")( - py::cpp_function{ - [&]() -> void { - if (app) { - //QCoreApplication::quit(); - std::exit(0); - } - } - } - ); - //Disable messages from Qt. qInstallMessageHandler([](QtMsgType type, const QMessageLogContext &context, const QString &msg) { if (type == QtCriticalMsg || type == QtFatalMsg) { @@ -244,179 +220,18 @@ PYBIND11_MODULE(pydsr, m) { .export_values(); - sig.def("connect", [](DSRGraph *G, signal_type type, callback_types fn_callback) { - switch (type) { - case UPDATE_NODE: - try { - QObject::connect(G, &DSR::DSRGraph::update_node_signal, - std::get>(fn_callback)); + sig.def("connect", [&](DSRGraph *G, signal_type type, callback_types fn_callback) { - } catch (std::exception &e) { - std::cout << "Update Node Callback must be (int, str)\n " << std::endl; - throw e; - } - break; - case UPDATE_NODE_ATTR: - try { - QObject::connect(G, &DSR::DSRGraph::update_node_attr_signal, - std::get &)>>( - fn_callback)); - - } catch (std::exception &e) { - std::cout << "Update Node Attribute Callback must be (int, [str])\n " << std::endl; - throw e; - } - break; - case UPDATE_EDGE: - try { - QObject::connect(G, &DSR::DSRGraph::update_edge_signal, - std::get>( - fn_callback)); - } catch (std::exception &e) { - std::cout << "Update Edge Callback must be (int, int, str)\n " << std::endl; - throw e; - } - break; - case UPDATE_EDGE_ATTR: - try { - QObject::connect(G, &DSR::DSRGraph::update_edge_attr_signal, - std::get &)>>(fn_callback)); - } catch (std::exception &e) { - std::cout << "Update Edge Attribute Callback must be (int, int, str, [str])\n " << std::endl; - throw e; - } - break; - case DELETE_EDGE: - try { - QObject::connect(G, &DSR::DSRGraph::del_edge_signal, - std::get>( - fn_callback)); - } catch (std::exception &e) { - std::cout << "Delete Edge Callback must be (int, int, str)\n " << std::endl; - throw e; - } - break; - case DELETE_NODE: - try { - QObject::connect(G, &DSR::DSRGraph::del_node_signal, - std::get>(fn_callback)); - } catch (std::exception &e) { - std::cout << "Delete Node Callback must be (int)\n " << std::endl; - throw e; - } - break; - case DELETE_NODE_OBJ: - try { - QObject::connect(G, &DSR::DSRGraph::deleted_node_signal, - std::get>(fn_callback)); - } catch (std::exception &e) { - std::cout << "Delete Node Callback must be (pydsr.Node)\n " << std::endl; - throw e; - } - break; - case DELETE_EDGE_OBJ: - try { - QObject::connect(G, &DSR::DSRGraph::deleted_edge_signal, - std::get>(fn_callback)); - } catch (std::exception &e) { - std::cout << "Delete Node Callback must be (pydsr.Edge)\n " << std::endl; - throw e; - } - break; - default: - throw std::logic_error("Invalid signal type"); + auto runner = G->get_signal_runner(); + if (!runner) { + throw std::runtime_error("Signal runner doesn't exist"); } - }); - - sig.def("connect2", [&](DSRGraph *G, signal_type type, callback_types fn_callback) { - - switch (type) { - case UPDATE_NODE: - try { - QObject::connect(G, &DSR::DSRGraph::update_node_signal, app, - std::get>(fn_callback), - Qt::QueuedConnection); - - } catch (std::exception &e) { - std::cout << "Update Node Callback must be (int, str)\n " << std::endl; - throw e; - } - break; - case UPDATE_NODE_ATTR: - try { - QObject::connect(G, &DSR::DSRGraph::update_node_attr_signal, app, - std::get &)>>(fn_callback), - Qt::QueuedConnection); - - } catch (std::exception &e) { - std::cout << "Update Node Attribute Callback must be (int, [str])\n " << std::endl; - throw e; - } - break; - case UPDATE_EDGE: - try { - QObject::connect(G, &DSR::DSRGraph::update_edge_signal, app, - std::get>(fn_callback), - Qt::QueuedConnection); - } catch (std::exception &e) { - std::cout << "Update Edge Callback must be (int, int, str)\n " << std::endl; - throw e; - } - break; - case UPDATE_EDGE_ATTR: - try { - QObject::connect(G, &DSR::DSRGraph::update_edge_attr_signal, app, - std::get &)>>(fn_callback), - Qt::QueuedConnection); - } catch (std::exception &e) { - std::cout << "Update Edge Attribute Callback must be (int, int, str, [str])\n " << std::endl; - throw e; - } - break; - case DELETE_EDGE: - try { - QObject::connect(G, &DSR::DSRGraph::del_edge_signal, app, - std::get>(fn_callback), - Qt::QueuedConnection); - } catch (std::exception &e) { - std::cout << "Delete Edge Callback must be (int, int, str)\n " << std::endl; - throw e; - } - break; - case DELETE_NODE: - try { - QObject::connect(G, &DSR::DSRGraph::del_node_signal, app, - std::get>(fn_callback), - Qt::QueuedConnection); - } catch (std::exception &e) { - std::cout << "Delete Node Callback must be (int)\n " << std::endl; - throw e; - } - break; - case DELETE_NODE_OBJ: - try { - QObject::connect(G, &DSR::DSRGraph::deleted_node_signal, app, - std::get>(fn_callback), - Qt::QueuedConnection); - } catch (std::exception &e) { - std::cout << "Delete Node Callback must be (pydsr.Node)\n " << std::endl; - throw e; - } - break; - case DELETE_EDGE_OBJ: - try { - QObject::connect(G, &DSR::DSRGraph::deleted_edge_signal, app, - std::get>(fn_callback), - Qt::QueuedConnection); - } catch (std::exception &e) { - std::cout << "Delete Node Callback must be (pydsr.Edge)\n " << std::endl; - throw e; - } - break; - default: - throw std::logic_error("Invalid signal type"); + try { + runner->connect(fn_callback, std::to_string(type)); + } catch (std::exception &e) { + std::cout << "Update Node Callback must be (int, str)\n " << std::endl; + throw e; } }); @@ -737,7 +552,7 @@ PYBIND11_MODULE(pydsr, m) { const std::string &dsr_input_file = "", bool all_same_host = true, int8_t domain_id = 0) -> std::unique_ptr { local_agent_id = id; - auto g = std::make_unique(root, name, id, dsr_input_file, all_same_host, domain_id); + auto g = std::make_unique(root, name, id, dsr_input_file, all_same_host, domain_id, SignalMode::Queue); return g; }), "root"_a, "name"_a, "id"_a, "dsr_input_file"_a = "", "all_same_host"_a = true, "domain_id"_a=0, py::call_guard()) diff --git a/python-wrapper/test/TestSignals.py b/python-wrapper/test/TestSignals.py index 1f83557..4e3464a 100644 --- a/python-wrapper/test/TestSignals.py +++ b/python-wrapper/test/TestSignals.py @@ -96,3 +96,5 @@ def connect(self): result = g.delete_node(2) console.print(result) +import time +time.sleep(1) \ No newline at end of file diff --git a/python-wrapper/test/TestSignalsQueued.py b/python-wrapper/test/TestSignalsQueued.py index e2ece4c..f0800b1 100644 --- a/python-wrapper/test/TestSignalsQueued.py +++ b/python-wrapper/test/TestSignalsQueued.py @@ -57,14 +57,14 @@ def deleted_edge_obj(self, edge: Edge): console.print(f"DELETED EDGE OBJ: {edge}", style='green') def connect(self): - signals.connect2(g, signals.UPDATE_NODE_ATTR, self.update_node_att) - signals.connect2(g, signals.UPDATE_NODE, self.update_node) - signals.connect2(g, signals.DELETE_NODE, self.delete_node) - signals.connect2(g, signals.UPDATE_EDGE, self.update_edge) - signals.connect2(g, signals.UPDATE_EDGE_ATTR, self.update_edge_att) - signals.connect2(g, signals.DELETE_EDGE, self.delete_edge) - signals.connect2(g, signals.DELETE_NODE_OBJ, self.deleted_node_obj) - signals.connect2(g, signals.DELETE_EDGE_OBJ, self.deleted_edge_obj) + signals.connect(g, signals.UPDATE_NODE_ATTR, self.update_node_att) + signals.connect(g, signals.UPDATE_NODE, self.update_node) + signals.connect(g, signals.DELETE_NODE, self.delete_node) + signals.connect(g, signals.UPDATE_EDGE, self.update_edge) + signals.connect(g, signals.UPDATE_EDGE_ATTR, self.update_edge_att) + signals.connect(g, signals.DELETE_EDGE, self.delete_edge) + signals.connect(g, signals.DELETE_NODE_OBJ, self.deleted_node_obj) + signals.connect(g, signals.DELETE_EDGE_OBJ, self.deleted_edge_obj) app = QtWidgets.QApplication(sys.argv) @@ -101,14 +101,14 @@ def deleted_edge_obj(edge: Edge): agent.connect() -signals.connect2(g, signals.UPDATE_NODE_ATTR, update_node_att) -signals.connect2(g, signals.UPDATE_NODE, update_node) -signals.connect2(g, signals.DELETE_NODE, delete_node) -signals.connect2(g, signals.UPDATE_EDGE, update_edge) -signals.connect2(g, signals.UPDATE_EDGE_ATTR, update_edge_att) -signals.connect2(g, signals.DELETE_EDGE, delete_edge) -signals.connect2(g, signals.DELETE_NODE_OBJ, deleted_node_obj) -signals.connect2(g, signals.DELETE_EDGE_OBJ, deleted_edge_obj) +signals.connect(g, signals.UPDATE_NODE_ATTR, update_node_att) +signals.connect(g, signals.UPDATE_NODE, update_node) +signals.connect(g, signals.DELETE_NODE, delete_node) +signals.connect(g, signals.UPDATE_EDGE, update_edge) +signals.connect(g, signals.UPDATE_EDGE_ATTR, update_edge_att) +signals.connect(g, signals.DELETE_EDGE, delete_edge) +signals.connect(g, signals.DELETE_NODE_OBJ, deleted_node_obj) +signals.connect(g, signals.DELETE_EDGE_OBJ, deleted_edge_obj)