Skip to content

Commit 7e8090e

Browse files
committed
feat: add shared memory transport to concoredocker.hpp (#486)
1 parent 01e0ec0 commit 7e8090e

1 file changed

Lines changed: 196 additions & 1 deletion

File tree

concoredocker.hpp

Lines changed: 196 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,10 +14,25 @@
1414
#include <regex>
1515
#include <algorithm>
1616
#include <map>
17+
#include <cstring>
18+
19+
#ifdef __linux__
20+
#include <sys/ipc.h>
21+
#include <sys/shm.h>
22+
#include <unistd.h>
23+
#endif
1724

1825
#include "concore_base.hpp"
1926

2027
class Concore {
28+
private:
29+
int shmId_create = -1;
30+
int shmId_get = -1;
31+
char* sharedData_create = nullptr;
32+
char* sharedData_get = nullptr;
33+
int communication_iport = 0; // iport refers to input port
34+
int communication_oport = 0; // oport refers to output port
35+
2136
public:
2237
std::unordered_map<std::string, std::string> iport;
2338
std::unordered_map<std::string, std::string> oport;
@@ -55,13 +70,40 @@ class Concore {
5570
oport = safe_literal_eval("concore.oport", {});
5671
default_maxtime(100);
5772
load_params();
73+
74+
#ifdef __linux__
75+
int iport_number = -1;
76+
int oport_number = -1;
77+
78+
if (!iport.empty())
79+
iport_number = ExtractNumeric(iport.begin()->first);
80+
if (!oport.empty())
81+
oport_number = ExtractNumeric(oport.begin()->first);
82+
83+
if (oport_number != -1) {
84+
communication_oport = 1;
85+
createSharedMemory(oport_number);
86+
}
87+
if (iport_number != -1) {
88+
communication_iport = 1;
89+
getSharedMemory(iport_number);
90+
}
91+
#endif
5892
}
5993

6094
~Concore() {
6195
#ifdef CONCORE_USE_ZMQ
6296
for (auto& kv : zmq_ports)
6397
delete kv.second;
6498
zmq_ports.clear();
99+
#endif
100+
#ifdef __linux__
101+
if (communication_oport == 1 && sharedData_create != nullptr)
102+
shmdt(sharedData_create);
103+
if (communication_iport == 1 && sharedData_get != nullptr)
104+
shmdt(sharedData_get);
105+
if (shmId_create != -1)
106+
shmctl(shmId_create, IPC_RMID, nullptr);
65107
#endif
66108
}
67109

@@ -74,11 +116,20 @@ class Concore {
74116
delay(other.delay), retrycount(other.retrycount),
75117
inpath(std::move(other.inpath)), outpath(std::move(other.outpath)),
76118
simtime(other.simtime), maxtime(other.maxtime),
77-
params(std::move(other.params))
119+
params(std::move(other.params)),
120+
shmId_create(other.shmId_create), shmId_get(other.shmId_get),
121+
sharedData_create(other.sharedData_create), sharedData_get(other.sharedData_get),
122+
communication_iport(other.communication_iport), communication_oport(other.communication_oport)
78123
{
79124
#ifdef CONCORE_USE_ZMQ
80125
zmq_ports = std::move(other.zmq_ports);
81126
#endif
127+
other.shmId_create = -1;
128+
other.shmId_get = -1;
129+
other.sharedData_create = nullptr;
130+
other.sharedData_get = nullptr;
131+
other.communication_iport = 0;
132+
other.communication_oport = 0;
82133
}
83134

84135
Concore& operator=(Concore&& other) noexcept
@@ -91,6 +142,14 @@ class Concore {
91142
delete kv.second;
92143
zmq_ports = std::move(other.zmq_ports);
93144
#endif
145+
#ifdef __linux__
146+
if (communication_oport == 1 && sharedData_create != nullptr)
147+
shmdt(sharedData_create);
148+
if (communication_iport == 1 && sharedData_get != nullptr)
149+
shmdt(sharedData_get);
150+
if (shmId_create != -1)
151+
shmctl(shmId_create, IPC_RMID, nullptr);
152+
#endif
94153

95154
iport = std::move(other.iport);
96155
oport = std::move(other.oport);
@@ -103,6 +162,19 @@ class Concore {
103162
simtime = other.simtime;
104163
maxtime = other.maxtime;
105164
params = std::move(other.params);
165+
shmId_create = other.shmId_create;
166+
shmId_get = other.shmId_get;
167+
sharedData_create = other.sharedData_create;
168+
sharedData_get = other.sharedData_get;
169+
communication_iport = other.communication_iport;
170+
communication_oport = other.communication_oport;
171+
172+
other.shmId_create = -1;
173+
other.shmId_get = -1;
174+
other.sharedData_create = nullptr;
175+
other.sharedData_get = nullptr;
176+
other.communication_iport = 0;
177+
other.communication_oport = 0;
106178

107179
return *this;
108180
}
@@ -131,6 +203,57 @@ class Concore {
131203
inpath + "/1/concore.maxtime", defaultValue);
132204
}
133205

206+
key_t ExtractNumeric(const std::string& str) {
207+
std::string numberString;
208+
size_t numDigits = 0;
209+
while (numDigits < str.length() && std::isdigit(str[numDigits])) {
210+
numberString += str[numDigits];
211+
++numDigits;
212+
}
213+
if (numDigits == 0)
214+
return -1;
215+
if (numDigits == 1 && std::stoi(numberString) <= 0)
216+
return -1;
217+
return std::stoi(numberString);
218+
}
219+
220+
#ifdef __linux__
221+
void createSharedMemory(key_t key) {
222+
shmId_create = shmget(key, 256, IPC_CREAT | 0666);
223+
if (shmId_create == -1) {
224+
std::cerr << "Failed to create shared memory segment.\n";
225+
return;
226+
}
227+
sharedData_create = static_cast<char*>(shmat(shmId_create, NULL, 0));
228+
if (sharedData_create == reinterpret_cast<char*>(-1)) {
229+
std::cerr << "Failed to attach shared memory segment.\n";
230+
sharedData_create = nullptr;
231+
}
232+
}
233+
234+
void getSharedMemory(key_t key) {
235+
int retry = 0;
236+
const int MAX_RETRY = 100;
237+
while (retry < MAX_RETRY) {
238+
shmId_get = shmget(key, 256, 0666);
239+
if (shmId_get != -1)
240+
break;
241+
std::cout << "Shared memory does not exist. Make sure the writer process is running.\n";
242+
sleep(1);
243+
retry++;
244+
}
245+
if (shmId_get == -1) {
246+
std::cerr << "Failed to get shared memory segment after max retries.\n";
247+
return;
248+
}
249+
sharedData_get = static_cast<char*>(shmat(shmId_get, NULL, 0));
250+
if (sharedData_get == reinterpret_cast<char*>(-1)) {
251+
std::cerr << "Failed to attach shared memory segment.\n";
252+
sharedData_get = nullptr;
253+
}
254+
}
255+
#endif
256+
134257
bool unchanged() {
135258
if (olds == s) {
136259
s.clear();
@@ -141,6 +264,10 @@ class Concore {
141264
}
142265

143266
std::vector<double> read(int port, const std::string& name, const std::string& initstr) {
267+
#ifdef __linux__
268+
if (communication_iport == 1)
269+
return read_SM(port, name, initstr);
270+
#endif
144271
std::this_thread::sleep_for(std::chrono::seconds(delay));
145272
std::string file_path = inpath + "/" + std::to_string(port) + "/" + name;
146273
std::ifstream infile(file_path);
@@ -178,7 +305,56 @@ class Concore {
178305
return inval;
179306
}
180307

308+
#ifdef __linux__
309+
std::vector<double> read_SM(int port, const std::string& name, const std::string& initstr) {
310+
std::this_thread::sleep_for(std::chrono::seconds(delay));
311+
std::string ins;
312+
try {
313+
if (shmId_get != -1 && sharedData_get && sharedData_get[0] != '\0')
314+
ins = std::string(sharedData_get, strnlen(sharedData_get, 256));
315+
else
316+
throw 505;
317+
} catch (...) {
318+
ins = initstr;
319+
}
320+
321+
int retry = 0;
322+
const int MAX_RETRY = 100;
323+
while ((int)ins.length() == 0 && retry < MAX_RETRY) {
324+
std::this_thread::sleep_for(std::chrono::seconds(delay));
325+
try {
326+
if (shmId_get != -1 && sharedData_get) {
327+
ins = std::string(sharedData_get, strnlen(sharedData_get, 256));
328+
retrycount++;
329+
} else {
330+
retrycount++;
331+
throw 505;
332+
}
333+
} catch (...) {
334+
std::cerr << "Read error\n";
335+
}
336+
retry++;
337+
}
338+
339+
s += ins;
340+
std::vector<double> inval = concore_base::parselist_double(ins);
341+
if (inval.empty())
342+
inval = concore_base::parselist_double(initstr);
343+
if (inval.empty())
344+
return inval;
345+
simtime = simtime > inval[0] ? simtime : inval[0];
346+
inval.erase(inval.begin());
347+
return inval;
348+
}
349+
#endif
350+
181351
void write(int port, const std::string& name, const std::vector<double>& val, int delta = 0) {
352+
#ifdef __linux__
353+
if (communication_oport == 1) {
354+
write_SM(port, name, val, delta);
355+
return;
356+
}
357+
#endif
182358
std::string file_path = outpath + "/" + std::to_string(port) + "/" + name;
183359
std::ofstream outfile(file_path);
184360
if (!outfile) {
@@ -195,6 +371,25 @@ class Concore {
195371
}
196372
}
197373

374+
#ifdef __linux__
375+
void write_SM(int port, const std::string& name, std::vector<double> val, int delta = 0) {
376+
try {
377+
if (shmId_create == -1)
378+
throw 505;
379+
val.insert(val.begin(), simtime + delta);
380+
std::ostringstream outfile;
381+
outfile << '[';
382+
for (size_t i = 0; i < val.size() - 1; i++)
383+
outfile << val[i] << ',';
384+
outfile << val[val.size() - 1] << ']';
385+
std::string result = outfile.str();
386+
std::strncpy(sharedData_create, result.c_str(), 256 - 1);
387+
} catch (...) {
388+
std::cerr << "skipping +" << outpath << port << "/" << name << "\n";
389+
}
390+
}
391+
#endif
392+
198393
#ifdef CONCORE_USE_ZMQ
199394
void init_zmq_port(const std::string& port_name, const std::string& port_type,
200395
const std::string& address, const std::string& socket_type_str) {

0 commit comments

Comments
 (0)