I am new to programming, as well as to Common Lisp. I am trying to solve the following problem:
There are three stream sources (S1, S2, S3), two processors (P1, P2) (by processor I don't mean a CPU, but rather processing functions/subsystems), and four stream destinations (D1, D2, D3, D4). By stream, I mean that data arrives continuously, but possibly intermittently, so read/write actions might block. The data from each stream needs to be aggregated and then, depending on what kind it is, processed by one of the processors. Each processor produces (from its input aggregate) four kinds of sub-aggregates, each of which is then sent to one of the four aforementioned destinations.
The pseudo-code I have for the problem is like so:
Three loops, one each for S1, S2, S3, for data aggregation and dispatch. For a particular source (say S2), the pseudo-code is:

    loop-2: read from S2,
        when sufficient data has aggregated,
            if the aggregate is kind-1, send it to P1
            else send it to P2.

Two loops, one each for P1, P2, for processing and de-aggregation. For a particular processor (say P1), the pseudo-code is:

    In a loop for P1: take an aggregate of kind-1,
        process it to produce one or more of the four sub-aggregates.
        case: sub-aggregate 1 -> serialize and send/write to D1,
              sub-aggregate 2 -> serialize and send/write to D2,
              sub-aggregate 3 -> serialize and send/write to D3,
              sub-aggregate 4 -> serialize and send/write to D4.

Between each processor and destination I want to use another channel/queue, but I have left that out to keep the question simpler.
In my understanding, there are about nine blocking points: three when data is not available at the sources (S1, S2, S3), two when there is insufficient data at the processors (P1, P2), and four when the destinations (D1, D2, D3, D4) are not ready for writing.
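To make the blocking points concrete, here is a minimal sketch of one way the loops could be kept from blocking each other: one thread per loop, connected by blocking queues, so that a blocked read or write stalls only its own thread. It assumes Quicklisp and the bordeaux-threads library. The `bqueue` struct, `bq-push`, `bq-pop`, `source-loop`, `processor-loop` and `run-pipeline` are hypothetical names made up for illustration, as is the rule that an even sum counts as kind-1. A list stands in for the source stream and a single queue stands in for the destinations.

```lisp
;; Assumes Quicklisp is installed. Load this file with LOAD (or evaluate it form
;; by form) so that the BT package exists before the later forms are read.
(ql:quickload "bordeaux-threads" :silent t)

;; Hypothetical helper: a small unbounded blocking FIFO queue.
(defstruct bqueue
  (items '())
  (lock (bt:make-lock))
  (ready (bt:make-condition-variable)))

(defun bq-push (queue item)
  "Append ITEM to QUEUE and wake one waiting consumer."
  (bt:with-lock-held ((bqueue-lock queue))
    ;; APPEND is O(n); acceptable for a sketch, not for heavy traffic.
    (setf (bqueue-items queue) (append (bqueue-items queue) (list item)))
    (bt:condition-notify (bqueue-ready queue))))

(defun bq-pop (queue)
  "Remove and return the oldest item; blocks only the calling thread while QUEUE is empty."
  (bt:with-lock-held ((bqueue-lock queue))
    (loop while (null (bqueue-items queue))
          do (bt:condition-wait (bqueue-ready queue) (bqueue-lock queue)))
    (pop (bqueue-items queue))))

(defun source-loop (items p1-in p2-in)
  "Stand-in for loop-2: aggregate ITEMS (an even-length list) in pairs and dispatch.
Assumption for illustration: an even sum is kind-1 and goes to P1-IN, otherwise P2-IN."
  (loop for (a b) on items by #'cddr
        for aggregate = (+ a b)
        do (bq-push (if (evenp aggregate) p1-in p2-in) aggregate))
  ;; :DONE is a sentinel so that this demo terminates; a real loop would run forever.
  (bq-push p1-in :done)
  (bq-push p2-in :done))

(defun processor-loop (name in out)
  "Stand-in for a processor loop: take aggregates from IN, tag them with NAME, write to OUT."
  (loop for aggregate = (bq-pop in)
        until (eq aggregate :done)
        do (bq-push out (list name aggregate)))
  (bq-push out :done))

(defun run-pipeline (items)
  "Run one source loop and two processor loops in separate threads; return what reached OUT."
  (let* ((p1-in (make-bqueue))
         (p2-in (make-bqueue))
         (out (make-bqueue))
         (threads (list (bt:make-thread (lambda () (source-loop items p1-in p2-in)))
                        (bt:make-thread (lambda () (processor-loop :p1 p1-in out)))
                        (bt:make-thread (lambda () (processor-loop :p2 p2-in out))))))
    (mapc #'bt:join-thread threads)
    ;; Collect results until both processors have reported :DONE.
    (loop with done = 0
          for item = (bq-pop out)
          if (eq item :done) do (incf done) else collect item
          until (= done 2))))
```

Calling `(run-pipeline '(1 3 2 5 4 4 1 2))` returns the four tagged aggregates. Items from the same processor keep their order, but the interleaving between P1 and P2 can differ from run to run.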
So my question: How do I run all these loops at the same time (is "concurrently" the word?) without them blocking each other, or the other things being done in the program?
Prior work: I have looked at this SO question, and at pollers and notifiers from cl-async. Looking at Cliki-Concurrency, I also saw threading-queue, not to mention cl-flow and green-threads. cl-flow describes itself as a "Library for asynchronous non-blocking concurrency in Common Lisp.". Is there a way I could use a queue library with cl-async or cl-flow? For example, flow:concurrently seems like the right thing. But can it be used with the green threads of cl-async to run non-terminating loops?
My Difficulty: I have tried to read each link mentioned above and to understand as much as I could. But, to be honest, I am unable to make out where or how to even begin. Thank you for your help.
Edit: Feb 28, 2020.
I tried using cl-flow as follows. I first initialized the flow system:

    (ql:quickload "simple-flow-dispatcher")
    (ql:quickload "cl-muth")
    (ql:quickload "cl-flow")
    (ql:quickload "bt-semaphore")

    (use-package :cl-flow :bt-semaphore)

    (defvar *output* *standard-output*)

    (defun print-error (e)
      (format *output* "~A" e))

    (defvar *dispatcher* (simple-flow-dispatcher:make-simple-dispatcher
                          :threads 4
                          :error-handler #'print-error))

    (defun run-flow (flow)
      (run *dispatcher* flow))

I used the following functions:

    (defun hello-world (n)
      (sleep n)
      (format t (concatenate 'string (gap n) "Loop-" (write-to-string n) "~%")))

    (defun gap (n)
      (if (= n 0)
          ""
          (concatenate 'string " " (gap (- n 1)))))

    (defun loop-hello (n)
      (loop
        (hello-world n)))

    (defun test-flow ()
      (run-flow
       (flow:concurrently
         (flow:asynchronously
           (loop-hello 3))
         (flow:asynchronously
           (loop-hello 2))
         (flow:asynchronously
           (loop-hello 1)))))

hello-world is a simple function, and I use the function gap to insert some empty space before the text. As output, I was expecting three columns of printed rows, interleaved (concurrently, asynchronously, etc.), but I got only one column (from the first loop-hello).
My current understanding is as follows (please correct me): both loop and sleep are synchronous, so execution gets blocked. So I searched for async versions of loop and sleep. I am wondering if I can do this using cl-async.
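For comparison, here is a minimal sketch of the same three loops with one thread per loop, where `loop` and `sleep` block only the thread that runs them. It does not use cl-flow and makes no claim about why `test-flow` printed a single column. It assumes Quicklisp and bordeaux-threads. `run-hello-loops`, `*print-lock*`, the `repeats` bound and the explicit `stream` argument are additions for illustration, and the delay is shortened to tenths of a second so the demo finishes quickly.

```lisp
;; Assumes Quicklisp is installed. Load this file with LOAD (or evaluate it form
;; by form) so that the BT package exists before the later forms are read.
(ql:quickload "bordeaux-threads" :silent t)

(defvar *print-lock* (bt:make-lock)
  "Serializes writes so lines from different threads do not mix mid-line.")

(defun loop-hello (n repeats stream)
  "Write \"Loop-N\", indented by N spaces, to STREAM every N tenths of a second, REPEATS times."
  (dotimes (i repeats)
    (sleep (/ n 10))                    ; blocks this thread only
    (bt:with-lock-held (*print-lock*)
      (format stream "~ALoop-~D~%"
              (make-string n :initial-element #\Space)
              n))))

(defun run-hello-loops (stream &key (repeats 3))
  "Start loops 3, 2 and 1 in separate threads and wait until all of them finish."
  (let ((threads (loop for n in '(3 2 1)
                       collect (let ((n n)) ; fresh binding for each closure
                                 (bt:make-thread
                                  (lambda () (loop-hello n repeats stream))
                                  :name (format nil "loop-~D" n))))))
    (mapc #'bt:join-thread threads)
    nil))
```

Evaluating `(run-hello-loops *standard-output*)` at the REPL should print three indented columns whose rows interleave. The stream is passed explicitly because, depending on the environment, `*standard-output*` inside a new thread may not be the REPL's stream.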
In the meantime, I was trying to simplify my problem. Yesterday, I also stumbled upon this question and its answer. I am trying to understand it.
Further, since I posted my original question using pseudo-code, I was wondering how I would like (say) a library to provide this functionality for me (even if it is nonexistent as of now).
I tried searching for async code on Google, and most results show JavaScript libraries. So far I could vaguely grasp only two approaches: Python and Clojure (Java). Consider a small function that reads asynchronously from q1, processes the value read (v1) into v2, and writes v2 to q2 (q1 and q2 being queues). In Python, it appears to me that I could write something like this:
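
    import asyncio

    async def p1(v1):
        # compute v2 from v1 and return v2 (placeholder: pass the value through)
        v2 = v1
        return v2

    async def keep_doing_p1(q1, q2):
        # q1 and q2 are assumed here to be asyncio.Queue instances
        while True:
            v1 = await q1.get()
            v2 = await p1(v1)  # p1 is a coroutine function, so its result is awaited
            await q2.put(v2)
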
Of course, suitable implementations of q1 and q2 will have to be there. And in Clojure(Java), using the core.async lib appropriately:
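
    (require '[clojure.core.async :as async])

    (def q1 (async/chan))
    (def q2 (async/chan))

    (defn p1 [v1]
      ;; compute v2 from v1 and return it (placeholder: pass the value through);
      ;; a transducer version is not needed when taking and putting explicitly
      v1)

    ;; Repeatedly take from q1, apply p1, and put the result on q2.
    (async/go-loop []
      (async/>! q2 (p1 (async/<! q1)))
      (recur))
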
The code I have posted above could be wrong. I have written it based on my (very likely) superficial understanding. I have no Python or Clojure (Java) experience. The examples are not meant to incite any flame wars either. I think all languages can be (are) good, powerful, and convenient, and cater to different human programmers. I would like to solve the problem in CL. I am a senior person (by age) but a junior, in fact a novice, in programming. I am really loving writing it in CL. It is not that I have become or will soon become a great programmer. But with CL I feel, OMG, even I can write programs!
Be that as it may, coming back to the question: I would like a library to provide syntax/functionality like so:

    (cc-forever q1 q2 p1
      (push q2 (p1 (pop q1))))
    ;;; many other such cc-forever, operating concurrently

As you could have guessed, cc-forever means (ConCurrently-run FOREVER).
So I am looking for help in making such a function/library. Please suggest better/simpler alternatives if my thought process is still complicated. Thanks.
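One possible starting point is a small macro that wraps its body in an endless loop on its own thread. This is only a sketch under assumptions, not an existing library. It assumes Quicklisp and bordeaux-threads, and `bqueue`, `bq-push`, `bq-pop` and `cc-forever-demo` are hypothetical helpers defined here. It also departs from the wished-for syntax in two ways: the leading `q1 q2 p1` arguments are dropped because the body already names them, and `bq-push`/`bq-pop` are used because `push` and `pop` are already standard Common Lisp macros that operate on lists.

```lisp
;; Assumes Quicklisp is installed. Load this file with LOAD (or evaluate it form
;; by form) so that the BT package exists before the later forms are read.
(ql:quickload "bordeaux-threads" :silent t)

;; Hypothetical helper: a small unbounded blocking FIFO queue.
(defstruct bqueue
  (items '())
  (lock (bt:make-lock))
  (ready (bt:make-condition-variable)))

(defun bq-push (queue item)
  "Append ITEM to QUEUE and wake one waiting consumer."
  (bt:with-lock-held ((bqueue-lock queue))
    (setf (bqueue-items queue) (append (bqueue-items queue) (list item)))
    (bt:condition-notify (bqueue-ready queue))))

(defun bq-pop (queue)
  "Remove and return the oldest item; blocks only the calling thread while QUEUE is empty."
  (bt:with-lock-held ((bqueue-lock queue))
    (loop while (null (bqueue-items queue))
          do (bt:condition-wait (bqueue-ready queue) (bqueue-lock queue)))
    (pop (bqueue-items queue))))

(defmacro cc-forever (&body body)
  "Start a thread that evaluates BODY over and over; return the thread object."
  `(bt:make-thread (lambda () (loop ,@body))))

(defun cc-forever-demo ()
  "Chain two cc-forever stages through queues, feed three values, return the three results."
  (let* ((q1 (make-bqueue))
         (q2 (make-bqueue))
         (q3 (make-bqueue))
         (workers (list (cc-forever (bq-push q2 (1+ (bq-pop q1))))
                        (cc-forever (bq-push q3 (* 10 (bq-pop q2)))))))
    (dolist (x '(1 2 3))
      (bq-push q1 x))
    (prog1 (list (bq-pop q3) (bq-pop q3) (bq-pop q3))
      ;; The loops never end on their own, so the demo stops them explicitly.
      (mapc #'bt:destroy-thread workers))))
```

Because each stage is a single thread reading from a FIFO queue, the results come out in the order the inputs went in, so `(cc-forever-demo)` returns `(20 30 40)`. Threads are a heavier mechanism than the green threads of cl-async, which may or may not matter at this scale.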
Will post my attempts again, later.
Edit: 27-May-2020
I am continuing to explore cl-async and cl-flow. I recently came across the with-green-thread macro, implemented on top of cl-async. As a newbie, I face a dual problem: these libraries use different syntax, and likely use it to do semantically different things. Currently I feel encouraged about with-green-thread and cl-flow's repeatedly, though sometimes the code throws an error (it could be my mishandling of some Quicklisp issue or something). I will keep posting. Any feedback will be helpful.