-
Notifications
You must be signed in to change notification settings - Fork 32
Expand file tree
/
Copy pathconcoredocker.py
More file actions
131 lines (112 loc) · 3.34 KB
/
concoredocker.py
File metadata and controls
131 lines (112 loc) · 3.34 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
import time
from ast import literal_eval
import re
import os
def safe_literal_eval(filename, defaultValue):
    """Parse the Python literal stored in *filename*.

    Args:
        filename: Path of the text file to read.
        defaultValue: Value handed back when the file cannot be opened or its
            content is not a valid literal.

    Returns:
        The evaluated literal, or ``defaultValue`` after printing the error.
    """
    try:
        with open(filename, "r") as handle:
            text = handle.read()
        result = literal_eval(text)
    except Exception as err:  # best-effort: any failure falls back to the default
        print(f"Error reading {filename}: {err}")
        result = defaultValue
    return result
# Port maps read from the working directory; an unreadable file yields {}.
iport = safe_literal_eval("concore.iport", dict())
oport = safe_literal_eval("concore.oport", dict())

# s accumulates the raw text returned by every read(); olds is the snapshot
# that unchanged() compares it against.
s = olds = ""

delay = 1       # seconds slept before each read attempt (doubled before str writes)
retrycount = 0  # running count of empty-read retries performed by read()
simtime = 0     # largest time stamp seen by read(), advanced by write()

# Base directories; read()/write() append the port number directly to these.
inpath = os.path.abspath("/in")
outpath = os.path.abspath("/out")
#9/21/22
def parse_params(sparams):
    """Turn the raw text of concore.params into a dict.

    The text is either a dict literal (``{'a': 1}``) or the shorthand
    ``a=1,b=2``, optionally wrapped in double quotes.

    Args:
        sparams: Raw file content.

    Returns:
        The parsed dict, or an empty dict when the text is empty or does not
        evaluate to a dict (a "bad params" message is printed in that case).
    """
    sparams = sparams.strip()
    if sparams.startswith('"'):  # windows keeps "" need to remove
        # Keep what lies between the opening quote and the next quote; with no
        # closing quote the whole remainder is kept instead of losing a char.
        sparams = sparams[1:].partition('"')[0]
    if not sparams:
        return dict()
    # Only the first character decides whether this already is a dict literal;
    # comparing the whole string to '{' would re-wrap valid literals.
    if sparams[0] != '{':
        print("converting sparams: "+sparams)
        sparams = "{'"+re.sub(',',",'",re.sub('=',"':",re.sub(' ','',sparams)))+"}"
        print("converted sparams: " + sparams)
    try:
        parsed = literal_eval(sparams)
    except (ValueError, TypeError, SyntaxError, MemoryError, RecursionError):
        print("bad params: "+sparams)
        return dict()
    if not isinstance(parsed, dict):
        # e.g. a set literal: tryparam() needs a mapping
        print("bad params: "+sparams)
        return dict()
    return parsed


try:
    with open(inpath+"1/concore.params") as _params_file:
        params = parse_params(_params_file.read())
except Exception:
    # Best-effort: a missing or unreadable params file just means no parameters.
    params = dict()
#9/30/22
def tryparam(n, i):
    """Return the value stored under key *n* in ``params``, or *i* if absent."""
    try:
        return params[n]
    except KeyError:
        return i
#9/12/21
def default_maxtime(default):
    """Load the global ``maxtime`` from the port-1 input directory.

    Args:
        default: Value stored in ``maxtime`` when ``concore.maxtime`` cannot be
            read or parsed.
    """
    global maxtime
    # Port directories are formed by appending the port number straight to
    # inpath (as read() and the concore.params loader do), so no separator
    # belongs between inpath and "1".
    maxtime = safe_literal_eval(os.path.join(inpath + "1", "concore.maxtime"), default)


default_maxtime(100)
def unchanged():
    """Tell whether the accumulated input text ``s`` equals the snapshot ``olds``.

    When they match, ``s`` is cleared and True is returned; otherwise the
    snapshot is refreshed from ``s`` and False is returned.
    """
    global olds, s
    same = (s == olds)
    if same:
        s = ''
    else:
        olds = s
    return same
def read(port, name, initstr):
    """Read the value list stored in an input port file.

    Sleeps ``delay`` seconds, then reads ``<inpath><port>/<name>``. The content
    is evaluated as a literal whose first element is a time stamp; that stamp
    is folded into the global ``simtime`` (keeping the maximum) and the
    remaining elements are returned. The raw text is appended to the global
    ``s``.

    Args:
        port: Input port number, appended directly to ``inpath``.
        name: File name inside the port directory.
        initstr: Literal string in the same ``[time, v1, ...]`` form, used when
            the file is missing, unreadable, still empty after the retries, or
            not parseable.

    Returns:
        The elements after the time stamp. On every fallback path these are
        the elements parsed from ``initstr``; ``initstr`` itself is returned
        only when it cannot be parsed either.
    """
    global s, simtime, retrycount
    max_retries = 5

    def fallback():
        # The success path returns the parsed values, so the failure paths
        # must return initstr's values too, not the raw string.
        try:
            return literal_eval(initstr)[1:]
        except (ValueError, TypeError, SyntaxError, MemoryError, RecursionError):
            return initstr

    time.sleep(delay)
    file_path = os.path.join(inpath+str(port), name)
    try:
        with open(file_path, "r") as infile:
            ins = infile.read()
    except FileNotFoundError:
        print(f"File {file_path} not found, using default value.")
        ins = initstr
    except Exception as e:  # best-effort: any other read failure uses the default
        print(f"Error reading {file_path}: {e}")
        return fallback()

    # An empty file is re-read a bounded number of times before giving up.
    attempts = 0
    while not ins and attempts < max_retries:
        time.sleep(delay)
        try:
            with open(file_path, "r") as infile:
                ins = infile.read()
        except Exception as e:
            print(f"Retry {attempts + 1}: Error reading {file_path} - {e}")
        attempts += 1
        retrycount += 1
    if not ins:
        print(f"Max retries reached for {file_path}, using default value.")
        return fallback()

    s += ins
    try:
        inval = literal_eval(ins)
        simtime = max(simtime, inval[0])
        return inval[1:]
    except Exception as e:
        print(f"Error parsing {ins}: {e}")
        return fallback()
def write(port, name, val, delta=0):
    """Write *val* to ``<outpath><port>/<name>``.

    A list is written as ``str([simtime + delta] + val)`` and, once written,
    the global ``simtime`` is advanced by *delta*. A string is written
    verbatim after sleeping ``2 * delay`` seconds. Any other type prints a
    message and writes nothing. Write errors are printed, not raised.

    Args:
        port: Output port number, appended directly to ``outpath``.
        name: File name inside the port directory.
        val: List of values or a ready-made string.
        delta: Offset added to ``simtime`` for list payloads.
    """
    global simtime
    target = os.path.join(outpath + str(port), name)
    writing_list = isinstance(val, list)
    if not writing_list:
        if not isinstance(val, str):
            print("write must have list or str")
            return
        time.sleep(2 * delay)
    try:
        with open(target, "w") as sink:
            if not writing_list:
                sink.write(val)
            else:
                sink.write(str([simtime + delta] + val))
                simtime += delta
    except Exception as err:
        print(f"Error writing to {target}: {err}")
def initval(simtime_val):
    """Seed the global ``simtime`` from a literal string and return its values.

    Args:
        simtime_val: Literal string whose first element is the time stamp,
            e.g. ``"[0.0, 1.0, 2.0]"``.

    Returns:
        Everything after the first element, or ``[]`` (after printing the
        error) when the string cannot be parsed or indexed.
    """
    global simtime
    try:
        parsed = literal_eval(simtime_val)
        simtime = parsed[0]
        remainder = parsed[1:]
    except Exception as err:
        print(f"Error parsing simtime_val: {err}")
        remainder = []
    return remainder