2014-11-26

Concurrent processing in Scheme

I'm trying to write a concurrency library for Scheme or, more precisely, aiming to make a SRFI for it if I can. There are two reasons for doing this. One is that handling shared memory manually is not something an ordinary human being like me can do. The other is that not all implementations support SRFI-18/21. So the first thing to resolve is which implementation supports which concurrency model. There are literally tons of implementations and my lifetime is not long enough to check all of them, so I've checked the ones listed on R6RS.org, the known R7RS Scheme implementations and some 'can't ignore them' R5RS implementations.

Starting with implementations which support SRFI-18:
  • Guile
  • Chicken (via Egg)
  • Sagittarius
  • Gambit
  • Gauche
  • Chibi Scheme
POSIX-like thread model:
  • Chez (not SRFI but very similar to POSIX)
  • Racket (can be similar to POSIX but not quite)
  • Scheme 48 (not SRFI but looks similar to POSIX)
  • Foment (not SRFI but looks similar to POSIX)
Message passing style concurrency model:
  • Racket (also supports message passing style)
  • Scheme 48 (can also be here I think)
  • Mosh
  • Ypsilon
Others:
  • Kawa (future? delay/force looks like syntax)
  • Racket (futures, places and so on... Racket is huge, man!)
No built-in concurrency support (I simply couldn't find any, so correct me if I'm wrong):
  • Vicare (Ikarus as well I believe)
  • Larceny
  • IronScheme (at least not on Scheme, maybe possible on .NET?)
  • Picrin
I think this is pretty much enough. To see how much these models differ, or rather how much the POSIX style and the message passing style differ, I've written a couple of bank accounts. Let's start with SRFI-18:
#!r6rs
(import (rnrs) (srfi :18))

(define (open-account initial-amount)
  (let ((lock (make-mutex))
        (balance initial-amount))
    (lambda (operation amount)
      (dynamic-wind
          (lambda () (mutex-lock! lock))
          (lambda ()
            (case operation
              ((withdrow)
               (if (< balance amount)
                   (error 'withdrow "invalid amount")
                   (begin
                     (set! balance (- balance amount))
                     (values amount balance))))
              ((deposit)
               (if (negative? amount)
                   (error 'deposit "invalid amount")
                   (begin
                     (set! balance (+ balance amount))
                     (values 0 balance))))
              (else (error 'account "invalid message"))))
          (lambda () (mutex-unlock! lock))))))

(define (print . args) (for-each display args) (newline))

(define client (open-account 1000))

(let-values (((money balance) (client 'withdrow 100)))
  (print money ":" balance))
(let-values (((money balance) (client 'deposit 100)))
  (print money ":" balance))

(print "do in parallel")

(let ((ts (map (lambda (msg amount)
                 (make-thread
                  (lambda ()
                    (thread-sleep! (inexact (/ amount 1000)))
                    (let-values (((money balance) (client msg amount)))
                      (print money ":" balance)))))
               '(withdrow deposit withdrow) '(1000 500 500))))
  (for-each thread-start! ts)
  (for-each thread-join! ts))
The next one is Racket. Racket has quite a lot of concurrency functionality, but for now I only used threads and asynchronous channels, and no semaphores. Thread mailboxes could be used, but they would be hard for me to integrate later.
#lang racket
(require racket/base)
(require racket/match)
(require racket/async-channel)

(define (open-account initial-amount out)
  (let ((mbox (make-async-channel)))
    (thread
     (lambda ()
       (define balance initial-amount)
       (let loop ()
         (match (async-channel-get mbox)
           ((list 'withdrow how-much)
            (if (< balance how-much)
                (begin (async-channel-put out "invalid amount") (loop))
                (begin
                  (set! balance (- balance how-much))
                  (async-channel-put out (cons how-much balance))
                  (loop))))
           ((list 'deposit a)
            (if (negative? a)
                (begin (async-channel-put out "invalid amount") (loop))
                (begin
                  (set! balance (+ balance a))
                  (async-channel-put out (cons 0 balance))
                  (loop))))
           ((list 'close) #t)
           (else "invalid message")))))
    mbox))

(define receipt (make-async-channel))
(define client (open-account 1000 receipt))

(async-channel-put client '(withdrow 100))
(async-channel-put client '(deposit 100))
(displayln (async-channel-get receipt))
(displayln (async-channel-get receipt))

(displayln "do in parallel")

(thread
 (lambda ()
   (sleep .2)
   (async-channel-put client '(withdrow 1000))
   (displayln (async-channel-get receipt))))

(thread
 (lambda ()
   (async-channel-put client '(deposit 500))
   (displayln (async-channel-get receipt))))

(thread
 (lambda ()
   (sleep .1)
   (async-channel-put client '(withdrow 500))
   (displayln (async-channel-get receipt))))

(sleep .5)
(async-channel-put client '(close))
Then Ypsilon. Ypsilon has almost a subset of what Racket has. I might need to use its shared queue/bag feature, which I have no idea how to use...
(import (rnrs) (concurrent) (match) (only (core) format usleep))

(define (open-account initial-amount out)
  (let ((mbox (make-mailbox)))
    ;; call-with-spawn is renamed to spawn* in the trunk code.
    ;; If you are using the trunk version, use spawn* instead;
    ;; it does the same as call-with-spawn.
    (call-with-spawn
     (lambda ()
       (define balance initial-amount)
       (let loop ()
         (match (recv mbox)
           (('withdrow how-much)
            (if (< balance how-much)
                (begin (send out "invalid amount") (loop))
                (begin
                  (set! balance (- balance how-much))
                  (send out (cons how-much balance))
                  (loop))))
           (('deposit a)
            (if (negative? a)
                (begin (send out "invalid amount") (loop))
                (begin
                  (set! balance (+ balance a))
                  (send out (cons 0 balance))
                  (loop))))
           (('close) #t)
           (else "invalid message"))))
     (lambda (retval)
       (shutdown-mailbox out)
       (shutdown-mailbox mbox)
       (format (current-error-port) "## account closed~%")))
    mbox))

(define receipt (make-mailbox))
(define client (open-account 1000 receipt))

(define (print . args) (for-each display args) (newline))

(send client '(withdrow 100))
(print (recv receipt))
(send client '(deposit 100))
(print (recv receipt))

(print "do in parallel")

(define count 100000)
(future
 ;; for some reason the thread didn't sleep with usleep...
 (let loop ((i 0) (r '()))
   (unless (= i count)
     (set! r (list i))
     (loop (+ i 1) r)))
 (send client '(withdrow 1000))
 (print (recv receipt)))
(future
 (send client '(deposit 500))
 (print (recv receipt)))
(future
 (send client '(withdrow 500))
 (print (recv receipt)))

(usleep 100000)
(send client '(close))
The last one is Mosh. The Mosh one is really not my cup of tea... Maybe it's only me, but it feels too restricted. The thunk passed to spawn can't refer to any free variables, not even global variables defined at the toplevel. But anyway, this is the bank account.
(import (rnrs) (mosh concurrent) (match))

(define (open-account initial-amount)
  (let ((pid (spawn (lambda (x)
                      (define balance (car x))
                      (let loop ()
                        (receive
                            (('withdrow from amount)
                             (if (< balance amount)
                                 (! from "invalid amount")
                                 (begin
                                   (set! balance (- balance amount))
                                   (! from (cons amount balance))))
                             (loop))
                            (('deposit from amount)
                             (if (negative? amount)
                                 (! from "invalid amount")
                                 (begin
                                   (set! balance (+ balance amount))
                                   (! from (cons 0 balance))))
                             (loop))
                            (('close from) (! from "closed"))
                            (else (error 'account "invalid message")))))
                    (list initial-amount)
                    '((rnrs) (mosh concurrent) (rnrs mutable-pairs)))))
    pid))

(define client (open-account 1000))
(define (print . args) (for-each display args) (newline))

(link client)

(! client `(withdrow ,(self) 100))
(receive ((money . balance) (print money ":" balance)))
(! client `(deposit ,(self) 100))
(receive ((money . balance) (print money ":" balance)))

(! client `(withdrow ,(self) 1500))
(receive ((money . balance) (print money ":" balance))
    (other (print other)))
In these small pieces of code there is no big difference, except that the message passing style always creates a thread when a user opens a new account. All the message passing style concurrency facilities hide the resource synchronisation primitives (mutexes/semaphores), which I think is what I want, so that I can avoid handling lock/unlock manually. (I have no idea how many times I've wanted to cry because of a deadlock or an incorrect state...)


I believe that as long as an implementation supports a POSIX style thread model, it's not so difficult to implement this message passing style. However, if I want to build a different concurrency model on top of other models, how much capability do those non-POSIX models have? Can we implement the Disruptor model on top of Ypsilon's concurrent library? (Though I don't thoroughly understand how Disruptor works...) Ultimately, which model would be *the* next model?
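
As a rough illustration of the first point, here is a minimal sketch of a mailbox built on SRFI-18 mutexes and condition variables, with the bank account rewritten on top of it. The names mailbox-send! and mailbox-recv!, the record layout and the message shape (operation reply-mailbox amount) are my own assumptions for this sketch and not part of any SRFI or implementation. The operation is spelled withdraw here. I'm assuming the implementation provides (srfi :18) as an R6RS library.

```scheme
#!r6rs
(import (rnrs) (srfi :18))

;; Hypothetical mailbox type: a FIFO list guarded by a mutex, plus a
;; condition variable that receivers wait on while the list is empty.
(define-record-type mailbox
  (fields (mutable messages) mutex condvar)
  (protocol
   (lambda (new)
     (lambda () (new '() (make-mutex) (make-condition-variable))))))

;; Append a message and wake up one waiting receiver.
;; (append is O(n); good enough for a sketch.)
(define (mailbox-send! mb msg)
  (mutex-lock! (mailbox-mutex mb))
  (mailbox-messages-set! mb (append (mailbox-messages mb) (list msg)))
  (condition-variable-signal! (mailbox-condvar mb))
  (mutex-unlock! (mailbox-mutex mb)))

;; Block until a message is available, then remove and return it.
(define (mailbox-recv! mb)
  (mutex-lock! (mailbox-mutex mb))
  (let loop ()
    (cond ((null? (mailbox-messages mb))
           ;; In SRFI-18, mutex-unlock! with a condition variable unlocks
           ;; the mutex and waits; the mutex must be re-locked afterwards.
           (mutex-unlock! (mailbox-mutex mb) (mailbox-condvar mb))
           (mutex-lock! (mailbox-mutex mb))
           (loop))
          (else
           (let ((msg (car (mailbox-messages mb))))
             (mailbox-messages-set! mb (cdr (mailbox-messages mb)))
             (mutex-unlock! (mailbox-mutex mb))
             msg)))))

;; The account is a thread that owns its balance; nobody else touches it.
;; Messages are lists: (operation reply-mailbox amount) or (close reply-mailbox).
(define (open-account initial-amount)
  (let ((mbox (make-mailbox)))
    (thread-start!
     (make-thread
      (lambda ()
        (let loop ((balance initial-amount))
          (let* ((msg (mailbox-recv! mbox))
                 (reply (cadr msg)))
            (case (car msg)
              ((withdraw)
               (let ((amount (caddr msg)))
                 (if (< balance amount)
                     (begin (mailbox-send! reply "invalid amount")
                            (loop balance))
                     (let ((rest (- balance amount)))
                       (mailbox-send! reply (cons amount rest))
                       (loop rest)))))
              ((deposit)
               (let ((amount (caddr msg)))
                 (if (negative? amount)
                     (begin (mailbox-send! reply "invalid amount")
                            (loop balance))
                     (let ((total (+ balance amount)))
                       (mailbox-send! reply (cons 0 total))
                       (loop total)))))
              ((close) (mailbox-send! reply "closed"))
              (else (mailbox-send! reply "invalid message")
                    (loop balance))))))))
    mbox))

(define receipt (make-mailbox))
(define client (open-account 1000))

(mailbox-send! client (list 'withdraw receipt 100))
(display (mailbox-recv! receipt)) (newline)
(mailbox-send! client (list 'deposit receipt 100))
(display (mailbox-recv! receipt)) (newline)
(mailbox-send! client (list 'close receipt))
(display (mailbox-recv! receipt)) (newline)
```

The only place a lock appears is inside the two mailbox procedures, so the account code itself never calls mutex-lock! or mutex-unlock!. Going the other way, building a mutex on top of a pure message passing library, is the direction I'm less sure about.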

Due to my lack of knowledge, I don't have any conclusion yet. If you have any suggestions or papers worth reading, please let me know.
