Skip to content

Commit 86e298d

Browse files
capturing async output from asynctopolis
1 parent 237c123 commit 86e298d

File tree

4 files changed

+248
-154
lines changed

4 files changed

+248
-154
lines changed

deps.edn

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@
1010
clj-fuzzy/clj-fuzzy {:mvn/version "0.4.1"}
1111
clj-thamil/clj-thamil {:mvn/version "0.2.0"}
1212
org.scicloj/clay {:git/url "https://github.com/scicloj/clay"
13-
:git/sha "d64df566e3dd0e90ac9360d86a481a7be7587eaf"}
13+
:git/sha "d8bbf8b3cce134c56faadb97410e7dfc766f24be"}
1414
org.eclipse.elk/org.eclipse.elk.core {:mvn/version "0.10.0"}
1515
org.eclipse.elk/org.eclipse.elk.graph {:mvn/version "0.10.0"}
1616
org.eclipse.elk/org.eclipse.elk.graph.json {:mvn/version "0.10.0"}

src/core/async/flow/example/asynctopolis.clj

Lines changed: 134 additions & 71 deletions
Original file line numberDiff line numberDiff line change
@@ -19,9 +19,9 @@
1919
;; Here, unseen agents pass messages, track patterns, and sound alarms when the moment calls.
2020
;; No one oversees the whole city, yet everything flows.
2121
;;
22-
;; Beneath it all hums the Stats Core Async Flow,
22+
;; Beneath it all hums the Core Async Flow,
2323
;; a network of processes working together without ever meeting.
24-
;; Today, you'll meet the agents of this asynchronous allegiance.
24+
;; Today, you'll observe the agents of this asynchronous allegiance.
2525

2626
;; This code is adapted from [Alex's stats flow example](https://github.com/puredanger/flow-example),
2727
;; used for his video walkthrough.
@@ -30,6 +30,13 @@
3030
{:youtube-id "lXFwf3O4BVY"
3131
:iframe-width "100%"}
3232

33+
;; In Asynctopolis, there are often races to print messages,
34+
;; which for the sake of clarity we shall serialize.
35+
36+
(defn println* [& args]
37+
(locking println*
38+
(apply println args)))
39+
3340
;; Above us in the sky flies Talon the Stat Hawk.
3441
;; Sleek, silent, and tireless.
3542
;; With a glint in his eye and wings tipped in probability,
@@ -41,30 +48,30 @@
4148

4249
(defn Talon
4350
"Generates a random value between min (inclusive) and max (exclusive)
44-
and writes it to out chan, waiting wait ms between until stop-atom is flagged."
45-
([out min max wait stop-atom]
46-
(loop []
47-
(let [val (+ min (rand-int (- max min)))
48-
put (a/>!! out val)]
49-
(when (and put (not @stop-atom))
50-
(^[long] Thread/sleep wait)
51-
(recur))))))
51+
and writes it to out chan, waiting wait ms between until running becomes false."
52+
[out min max wait flying]
53+
(while @flying
54+
(let [val (+ min (rand-int (- max min)))]
55+
(println* "SQUARK:" val)
56+
(if (a/>!! out val)
57+
(Thread/sleep ^long wait)
58+
(reset! flying false)))))
5259

5360
;; Born of wind and randomness, Talon is no ordinary bird.
5461
;; He executes his mission with the rhythm and the grace of chance incarnate.
55-
;; Talon embodies the eternal recurrence of the loop.
62+
;; Talon embodies an ongoing loop.
5663
;; An autonomous creature of purpose, relentless and unthinking.
5764
;; To be a process is to endure.
5865
;; Ever watchful, speaking in channels.
5966

6067
;; Fly Talon! Collect samples. Let's see what distribution you bring.
6168

6269
(let [c (a/chan)
63-
stop (atom false)
64-
n 100]
65-
(future (Talon c 0 20 0 stop))
70+
flying (atom true)
71+
n 20]
72+
(future (Talon c 0 12 0 flying))
6673
(let [samples (vec (repeatedly n (fn [] (a/<!! c))))]
67-
(reset! stop true)
74+
(reset! flying false)
6875
(-> (tc/dataset {:index (range n)
6976
:sample samples})
7077
(plotly/base {:=x :index
@@ -98,33 +105,43 @@
98105
(defn Randomius
99106
"Source proc for random stats"
100107
;; describe
101-
([] {:params {:min "Min value to generate"
102-
:max "Max value to generate"
103-
:wait "Time in ms to wait between generating"}
104-
:outs {:out "Output channel for stats"}})
108+
([]
109+
(println* "I am Randomius, I take numbers from Talon and send them out.")
110+
{:params {:min "Min value to generate"
111+
:max "Max value to generate"
112+
:wait "Time in ms to wait between generating"}
113+
:outs {:out "Output channel for stats"}})
105114

106115
;; init
107116
([args]
117+
(println* "Randomius initialing")
108118
(assoc args
109119
:clojure.core.async.flow/in-ports {:stat (a/chan 100)}
110-
:stop (atom false)))
120+
:flying (atom false)))
111121

112122
;; transition
113-
([{:keys [min max wait :clojure.core.async.flow/in-ports] :as state} transition]
123+
([{:keys [min max wait flying clojure.core.async.flow/in-ports] :as state} transition]
124+
(println* "Randomius transitioning" transition)
114125
(case transition
115126
:clojure.core.async.flow/resume
116-
(let [stop-atom (atom false)]
117-
(future (Talon (:stat in-ports) min max wait stop-atom))
118-
(assoc state :stop stop-atom))
127+
(do
128+
(when (not @flying)
129+
(println* "Talon, set flight!")
130+
(reset! flying true)
131+
(future (Talon (:stat in-ports) min max wait flying)))
132+
state)
119133

120134
(:clojure.core.async.flow/pause :clojure.core.async.flow/stop)
121135
(do
122-
(reset! (:stop state) true)
136+
(when @flying
137+
(println* "Talon, rest!")
138+
(reset! flying false))
123139
state)))
124140

125141
;; transform
126-
([state in msg]
127-
[state (when (= in :stat) {:out [msg]})]))
142+
([state input-id msg]
143+
(println* "Randomius transform" msg "from" input-id "to" :out)
144+
[state {:out [msg]}]))
128145

129146
;; Randomius, describe your duties!
130147
(Randomius)
@@ -146,18 +163,20 @@
146163
(a/<!!))
147164

148165
;; Transform with purpose.
166+
149167
(swap! state
150168
(fn [state]
151169
(let [[state step] (Randomius state :stat "I transform, therefore I am")]
152-
(println step)
170+
(println* step)
153171
state)))
154-
;; I see you wish to send a message to `stat`.
155-
;; Be wary in the future, speak only numbers to those who seek stats.
156172

173+
;; I see you wish to send a missive to `out`.
174+
;; Be wary in the future, send only numbers to those who seek stats.
157175

158176
;; Well done, Randomius.
159177
;; You are a true citizen.
160178
;; Now rest.
179+
161180
(swap! state Randomius :clojure.core.async.flow/stop)
162181

163182

@@ -178,21 +197,28 @@
178197

179198
(defn Tallystrix
180199
;; describe
181-
([] {:params {:min "Min value, alert if lower"
182-
:max "Max value, alert if higher"}
183-
:ins {:stat "Channel to receive stat values"
184-
:poke "Channel to poke when it is time to report a window of data to the log"}
185-
:outs {:alert "Notify of value out of range {:val value, :error :high|:low"}
186-
:workload :compute})
200+
([]
201+
(println* "I am Tallystrix, I take from stats or poke, and put to alert and notify")
202+
{:params {:min "Min value, alert if lower"
203+
:max "Max value, alert if higher"}
204+
:ins {:stat "Channel to receive stat values"
205+
:poke "Channel to poke when it is time to report a window of data to the log"}
206+
:outs {:alert "Notify of value out of range {:val value, :error :high|:low"}
207+
:workload :compute})
187208

188209
;; init
189-
([args] (assoc args :vals []))
210+
([args]
211+
(println* "Tallystrix initializing")
212+
(assoc args :vals []))
190213

191214
;; transition
192-
([state transition] state)
215+
([state transition]
216+
(println* "Tallystrix transitioning" transition)
217+
state)
193218

194219
;; transform
195220
([{:keys [min max vals] :as state} input-id msg]
221+
(println* "Tallystrix transforming" input-id msg)
196222
(case input-id
197223
:stat (let [state' (assoc state :vals (conj vals msg))
198224
msgs (cond
@@ -204,13 +230,12 @@
204230
{:clojure.core.async.flow/report (if (empty? vals)
205231
[{:count 0}]
206232
[{:avg (/ (double (reduce + vals)) (count vals))
207-
:count (count vals)}])}]
208-
[state nil])))
233+
:count (count vals)}])}])))
209234

210235
;; Tallystrix, what messages have you?
211236

212237
(let [state {:min 1 :max 5 :vals []}
213-
[state' msgs'] (Tallystrix state :stat 100)]
238+
[state' msgs'] (Tallystrix state :stat 7)]
214239
msgs')
215240

216241
;; Well alerted.
@@ -229,38 +254,47 @@
229254

230255
(defn Chronon
231256
;; describe
232-
([] {:params {:wait "Time to wait between pokes"}
233-
:outs {:out "Poke channel, will send true when the alarm goes off"}})
257+
([]
258+
(println* "I am Chronon, I poke out periodically")
259+
{:params {:wait "Time to wait between pokes"}
260+
:outs {:out "Poke channel, will send true when the alarm goes off"}})
234261

235262
;; init
236263
([args]
264+
(println* "Chronon initializing")
237265
(assoc args
238266
:clojure.core.async.flow/in-ports {:alarm (a/chan 10)}
239-
:stop (atom false)))
267+
:running (atom false)))
240268

241269
;; transition
242-
([{:keys [wait :clojure.core.async.flow/in-ports] :as state} transition]
270+
([{:keys [wait running clojure.core.async.flow/in-ports] :as state} transition]
271+
(println* "Chronon transitioning" transition)
243272
(case transition
244273
:clojure.core.async.flow/resume
245-
(let [stop-atom (atom false)]
246-
(future (loop []
247-
(let [put (a/>!! (:alarm in-ports) true)]
248-
(when (and put (not @stop-atom))
249-
(^[long] Thread/sleep wait)
250-
(recur)))))
251-
(assoc state :stop stop-atom))
274+
(do
275+
(when (not @running)
276+
(println* "Chronon running")
277+
(reset! running true)
278+
(future (while @running
279+
(if (a/>!! (:alarm in-ports) true)
280+
(Thread/sleep ^long wait)
281+
(reset! running false)))))
282+
state)
252283

253284
(:clojure.core.async.flow/pause :clojure.core.async.flow/stop)
254285
(do
255-
(reset! (:stop state) true)
286+
(when @running
287+
(println* "Chronon rests.")
288+
(reset! running false))
256289
state)))
257290

258291
;; transform
259-
([state in msg]
260-
[state (when (= in :alarm) {:out [true]})]))
292+
([state input-id msg]
293+
(println* "Chronon transforms" input-id msg "to" :out)
294+
[state {:out [msg]}]))
261295

262296
;; Chronon has no familiar to do his work,
263-
;; and listens to no-one.
297+
;; and listens only to himself.
264298

265299
;; ## Meet Claxxus, the Notifier, the Herald
266300

@@ -276,18 +310,25 @@
276310

277311
(defn Claxxus
278312
;; describe
279-
([] {:params {:prefix "Log message prefix"}
280-
:ins {:in "Channel to receive messages"}})
313+
([]
314+
(println* "I am Claxxus, I shout what I hear from in")
315+
{:params {:prefix "Log message prefix"}
316+
:ins {:in "Channel to receive messages"}})
281317

282318
;; init
283-
([state] state)
319+
([state]
320+
(println* "Claxxus initializing")
321+
state)
284322

285323
;; transition
286-
([state _transition] state)
324+
([state transition]
325+
(println* "Claxxus transitioning" transition)
326+
state)
287327

288328
;; transform
289-
([{:keys [prefix] :as state} _in msg]
290-
(println prefix msg)
329+
([{:keys [prefix] :as state} input-id msg]
330+
(println* "Claxxus transforming" input-id msg)
331+
(println* prefix msg)
291332
[state nil]))
292333

293334
;; Cursed to know only how to shout.
@@ -338,8 +379,6 @@
338379

339380
;; The city is ready, but not yet in action.
340381

341-
(datafy/datafy flow)
342-
343382
(def chs (flow/start flow))
344383

345384
chs
@@ -348,29 +387,53 @@ chs
348387
;; Tallystrix sends her summaries to `report`, dutifully.
349388
;; When something breaks it flows to `error`.
350389

351-
;; Claxxus does not speak of such failures.
352-
;; He is for alerts.
353-
;; Thresholds breached, events of note, things the city must hear.
390+
(a/poll! (:report-chan chs))
354391

355-
;; The city breathes, the asynchronous allegiance stirs.
392+
;; Start initialized, but transitioning has not occurred yet.
356393
;; Transition with order.
357394

358395
(flow/resume flow)
396+
(Thread/sleep 1)
397+
398+
;; The city breathes, the asynchronous allegiance stirs.
359399

360400
;; Transform with purpose.
361401

402+
(a/poll! (:report-chan chs))
403+
404+
(a/poll! (:report-chan chs))
405+
362406
(flow/inject flow [:Tallystrix :poke] [true])
363-
(flow/inject flow [:Tallystrix :stat] ["abc1000"]) ;; trigger an alert
364-
(flow/inject flow [:Claxxus :in] [:sandwich])
365407

366408
(a/poll! (:report-chan chs))
409+
410+
;; Mischief is afoot.
411+
412+
(flow/inject flow [:Tallystrix :stat] ["abc1000"])
367413
(a/poll! (:error-chan chs))
368414

369-
;; The flow can coordinate peace.
415+
;; Claxxus does not speak of such failures.
416+
;; He is for alerts.
417+
;; Thresholds breached, events of note, things the city must hear.
418+
419+
(flow/inject flow [:Claxxus :in] [:sandwich])
420+
(Thread/sleep 1)
421+
422+
;; Fluxus est graphum, fluxus est processus, fluxus est data.
423+
424+
(datafy/datafy flow)
425+
426+
;; The flow may coordinate peace.
370427

371428
(flow/pause flow)
429+
(Thread/sleep 1)
430+
431+
;; Pax optima rerum.
432+
433+
;; The flow may cease.
372434

373435
(flow/stop flow)
436+
(Thread/sleep 1)
374437

375438
;; The city falls silent.
376439

0 commit comments

Comments
 (0)