Skip to content

Commit 6f18462

Browse files
gh-23: Stabilize PARFOR.
1 parent 56b595e commit 6f18462

File tree

5 files changed

+205
-58
lines changed

5 files changed

+205
-58
lines changed

src/builtins.c

Lines changed: 33 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -1556,6 +1556,9 @@ static void ser_value(JsonBuf* jb, SerCtx* ctx, Interpreter* interp, Value v) {
15561556
}
15571557
case VAL_THR: {
15581558
Thr* th = v.as.thr;
1559+
Value thv = value_null();
1560+
thv.type = VAL_THR;
1561+
thv.as.thr = th;
15591562
const char* id = ser_thr_id(ctx, th);
15601563
jb_append_char(jb, '{');
15611564
bool first = true;
@@ -1564,15 +1567,15 @@ static void ser_value(JsonBuf* jb, SerCtx* ctx, Interpreter* interp, Value v) {
15641567
json_obj_field(jb, &first, "id");
15651568
jb_append_json_string(jb, id);
15661569
json_obj_field(jb, &first, "state");
1567-
if (th->finished) jb_append_json_string(jb, "finished");
1568-
else if (th->paused) jb_append_json_string(jb, "paused");
1570+
if (value_thr_get_finished(thv)) jb_append_json_string(jb, "finished");
1571+
else if (value_thr_get_paused(thv)) jb_append_json_string(jb, "paused");
15691572
else jb_append_json_string(jb, "running");
15701573
json_obj_field(jb, &first, "paused");
1571-
jb_append_str(jb, th->paused ? "true" : "false");
1574+
jb_append_str(jb, value_thr_get_paused(thv) ? "true" : "false");
15721575
json_obj_field(jb, &first, "finished");
1573-
jb_append_str(jb, th->finished ? "true" : "false");
1576+
jb_append_str(jb, value_thr_get_finished(thv) ? "true" : "false");
15741577
json_obj_field(jb, &first, "stop");
1575-
jb_append_str(jb, th->finished ? "true" : "false");
1578+
jb_append_str(jb, value_thr_get_finished(thv) ? "true" : "false");
15761579
json_obj_field(jb, &first, "env");
15771580
ser_env(jb, ctx, interp, th->env);
15781581
json_obj_field(jb, &first, "block");
@@ -2269,22 +2272,21 @@ static Value deser_val(JsonValue* obj, UnserCtx* ctx, Interpreter* interp, const
22692272
if (id) {
22702273
Thr* existing = unser_thr_get(ctx, id);
22712274
if (existing) {
2272-
Value ret; ret.type = VAL_THR; ret.as.thr = existing; existing->refcount++;
2273-
return ret;
2275+
Value ret; ret.type = VAL_THR; ret.as.thr = existing;
2276+
return value_copy(ret);
22742277
}
22752278
}
22762279
Value thr = value_thr_new();
2277-
Thr* th = thr.as.thr;
2278-
th->finished = 1;
2279-
th->paused = json_obj_get(obj, "paused") && json_obj_get(obj, "paused")->type == JSON_BOOL ? json_obj_get(obj, "paused")->as.boolean : 0;
2280-
th->started = 0;
2281-
th->body = NULL;
2282-
th->env = NULL;
2280+
value_thr_set_finished(thr, 1);
2281+
value_thr_set_paused(thr, json_obj_get(obj, "paused") && json_obj_get(obj, "paused")->type == JSON_BOOL ? json_obj_get(obj, "paused")->as.boolean : 0);
2282+
value_thr_set_started(thr, 0);
2283+
thr.as.thr->body = NULL;
2284+
thr.as.thr->env = NULL;
22832285
JsonValue* blk = json_obj_get(obj, "block");
22842286
JsonValue* envv = json_obj_get(obj, "env");
2285-
if (blk && blk->type == JSON_OBJ) th->body = deser_stmt(blk, ctx, interp, err);
2286-
if (envv && envv->type == JSON_OBJ) th->env = deser_env(envv, ctx, interp, err);
2287-
if (id) unser_thr_set(ctx, id, th);
2287+
if (blk && blk->type == JSON_OBJ) thr.as.thr->body = deser_stmt(blk, ctx, interp, err);
2288+
if (envv && envv->type == JSON_OBJ) thr.as.thr->env = deser_env(envv, ctx, interp, err);
2289+
if (id) unser_thr_set(ctx, id, thr.as.thr);
22882290
return thr;
22892291
}
22902292

@@ -6390,16 +6392,15 @@ static Value builtin_await(Interpreter* interp, Value* args, int argc, Expr** ar
63906392
// where the worker could free the Thr between the check and
63916393
// the join).
63926394
Value ret = value_copy(args[0]);
6393-
Thr* th = ret.as.thr;
6394-
if (!th->started) {
6395+
if (!value_thr_get_started(ret)) {
63956396
return ret;
63966397
}
63976398
// Wait for worker to mark finished; yield while spinning to be cooperative
6398-
while (!th->finished) {
6399+
while (!value_thr_get_finished(ret)) {
63996400
thrd_yield();
64006401
}
64016402
// Join to reclaim thread resources; ignore join errors
6402-
thrd_join(th->thread, NULL);
6403+
thrd_join(ret.as.thr->thread, NULL);
64036404
return ret;
64046405
}
64056406

@@ -6420,7 +6421,7 @@ static int pause_timer_worker(void* arg) {
64206421
thrd_sleep(&ts, NULL);
64216422
}
64226423
if (pt->thr_val.type == VAL_THR && pt->thr_val.as.thr) {
6423-
pt->thr_val.as.thr->paused = 0;
6424+
value_thr_set_paused(pt->thr_val, 0);
64246425
}
64256426
value_free(pt->thr_val);
64266427
free(pt);
@@ -6436,11 +6437,10 @@ static Value builtin_pause(Interpreter* interp, Value* args, int argc, Expr** ar
64366437
if (args[0].type != VAL_THR || !args[0].as.thr) {
64376438
RUNTIME_ERROR(interp, "PAUSE expects THR argument", line, col);
64386439
}
6439-
Thr* th = args[0].as.thr;
6440-
if (th->finished) {
6440+
if (value_thr_get_finished(args[0])) {
64416441
RUNTIME_ERROR(interp, "Cannot pause finished thread", line, col);
64426442
}
6443-
if (th->paused) {
6443+
if (value_thr_get_paused(args[0])) {
64446444
RUNTIME_ERROR(interp, "Thread already paused", line, col);
64456445
}
64466446

@@ -6455,7 +6455,7 @@ static Value builtin_pause(Interpreter* interp, Value* args, int argc, Expr** ar
64556455
}
64566456
}
64576457

6458-
th->paused = 1;
6458+
value_thr_set_paused(args[0], 1);
64596459

64606460
if (seconds >= 0) {
64616461
PauseTimer* pt = malloc(sizeof(PauseTimer));
@@ -6466,7 +6466,7 @@ static Value builtin_pause(Interpreter* interp, Value* args, int argc, Expr** ar
64666466
if (thrd_create(&t, pause_timer_worker, pt) != thrd_success) {
64676467
value_free(pt->thr_val);
64686468
free(pt);
6469-
th->paused = 0;
6469+
value_thr_set_paused(args[0], 0);
64706470
RUNTIME_ERROR(interp, "Failed to schedule resume", line, col);
64716471
}
64726472
thrd_detach(t);
@@ -6484,11 +6484,10 @@ static Value builtin_resume(Interpreter* interp, Value* args, int argc, Expr** a
64846484
if (args[0].type != VAL_THR || !args[0].as.thr) {
64856485
RUNTIME_ERROR(interp, "RESUME expects THR argument", line, col);
64866486
}
6487-
Thr* th = args[0].as.thr;
6488-
if (!th->paused) {
6487+
if (!value_thr_get_paused(args[0])) {
64896488
RUNTIME_ERROR(interp, "Thread is not paused", line, col);
64906489
}
6491-
th->paused = 0;
6490+
value_thr_set_paused(args[0], 0);
64926491
return value_copy(args[0]);
64936492
}
64946493

@@ -6501,7 +6500,7 @@ static Value builtin_paused(Interpreter* interp, Value* args, int argc, Expr** a
65016500
if (args[0].type != VAL_THR || !args[0].as.thr) {
65026501
RUNTIME_ERROR(interp, "PAUSED expects THR argument", line, col);
65036502
}
6504-
return value_int(args[0].as.thr->paused ? 1 : 0);
6503+
return value_int(value_thr_get_paused(args[0]) ? 1 : 0);
65056504
}
65066505

65076506
// STOP(THR: thread):THR — cooperatively stop a running thread and mark finished
@@ -6513,12 +6512,11 @@ static Value builtin_stop(Interpreter* interp, Value* args, int argc, Expr** arg
65136512
if (args[0].type != VAL_THR || !args[0].as.thr) {
65146513
RUNTIME_ERROR(interp, "STOP expects THR argument", line, col);
65156514
}
6516-
Thr* th = args[0].as.thr;
6517-
if (th->finished) {
6515+
if (value_thr_get_finished(args[0])) {
65186516
return value_copy(args[0]);
65196517
}
6520-
th->paused = 0;
6521-
th->finished = 1;
6518+
value_thr_set_paused(args[0], 0);
6519+
value_thr_set_finished(args[0], 1);
65226520
return value_copy(args[0]);
65236521
}
65246522

@@ -6535,7 +6533,7 @@ static Value builtin_restart(Interpreter* interp, Value* args, int argc, Expr**
65356533
if (!th->body || !th->env) {
65366534
RUNTIME_ERROR(interp, "Cannot restart: no stored thread body/env", line, col);
65376535
}
6538-
if (!th->finished) {
6536+
if (!value_thr_get_finished(args[0])) {
65396537
RUNTIME_ERROR(interp, "Cannot restart running thread", line, col);
65406538
}
65416539
// Delegate to interpreter helper that knows how to launch thr_worker

0 commit comments

Comments
 (0)