PasteRack.org
Paste # 28883
2020-05-23 12:44:05

Fork as a new paste.

Paste viewed 351 times.


Embed:

#lang racket

(require racket/future
         racket/fixnum
         future-visualizer)

;; A FIFO queue intended to be shared between futures.
;;   head        - first node of the mcons-linked list, or #f when empty (mutable)
;;   tail        - last node of the linked list, or #f when empty (mutable)
;;   fsema-count - fsemaphore that is posted once per enqueued item
;;   fsema-lock  - fsemaphore used as a mutex around head/tail updates
(struct mfqueue
  ([head #:mutable]
   [tail #:mutable]
   fsema-count
   fsema-lock)
  #:transparent)

;; Creates an empty mfqueue: no head, no tail, an item counter starting
;; at 0 and a lock semaphore starting at 1 (i.e. unlocked).
(define (make-mfqueue)
  (let ([item-count (make-fsemaphore 0)]
        [mutex (make-fsemaphore 1)])
    (mfqueue #f #f item-count mutex)))

;; Appends v to the end of mfq and signals one waiting consumer.
;; Returns (void).
(define (mfqueue-enqueue! mfq v)
  (define mutex (mfqueue-fsema-lock mfq))
  (fsemaphore-wait mutex)
  (let ([cell (mcons v #f)])
    ;; Link the new cell after the current tail, or make it the head
    ;; when the queue is empty; either way it becomes the new tail.
    (if (mfqueue-head mfq)
        (set-mcdr! (mfqueue-tail mfq) cell)
        (set-mfqueue-head! mfq cell))
    (set-mfqueue-tail! mfq cell))
  ;; Announce the new item before releasing the lock.
  (fsemaphore-post (mfqueue-fsema-count mfq))
  (fsemaphore-post mutex)
  (void))

;; Removes and returns the oldest item of mfq, waiting on the item
;; counter until at least one item is available.
(define (mfqueue-dequeue! mfq)
  (define mutex (mfqueue-fsema-lock mfq))
  ;; Reserve an item first, then take the lock to unlink it.
  (fsemaphore-wait (mfqueue-fsema-count mfq))
  (fsemaphore-wait mutex)
  (let* ([front (mfqueue-head mfq)]
         [item (mcar front)]
         [next (mcdr front)])
    ;; next is #f when front was the only node, which empties the queue.
    (set-mfqueue-head! mfq next)
    (unless next
      ;; Not strictly needed, but lets the last node be collected.
      (set-mfqueue-tail! mfq #f))
    (fsemaphore-post mutex)
    item))

;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;

;; Work queue shared by the main thread (producer) and the worker futures.
(define mfq (make-mfqueue))

;; Depth of the binary tree of futures built by start-workers: the smallest
;; d such that 2^d >= (processor-count).  For n >= 1, ceil(log2 n) equals
;; (integer-length (- n 1)); computing it with exact integers avoids the
;; floating-point quotient of logarithms, which can land just above an
;; integer and make `ceiling` round one level too high.
(define max-depth (integer-length (- (processor-count) 1)))
(define num-futures (expt 2 max-depth))
(displayln (~a "max-depth=" max-depth " num-futures=" num-futures))

;; One counter slot per worker, indexed by the worker's fid.
(define counters (make-fxvector num-futures))

;; Recursively forks a binary tree of futures until depth reaches
;; max-depth, then runs the worker loop at each leaf.
;;   depth - current level in the tree (0 at the root)
;;   fid   - worker id accumulated so far; bit `depth` is set on the branch
;;           that continues in the current future and left clear on the
;;           branch handed to a new future
;; A leaf repeatedly dequeues actions from mfq: action 1 increments
;; counters[fid]; any other action ends the loop.
(define (start-workers [depth 0] [fid 0])
  (if (fx>= depth max-depth)
      ;; worker code
      (let consume ()
        (when (fx= (mfqueue-dequeue! mfq) 1)
          (fxvector-set! counters fid (fx+ (fxvector-ref counters fid) 1))
          (consume)))
      (let* ([next-depth (fx+ depth 1)]
             [sibling (future
                       (λ ()
                         (start-workers next-depth fid)))])
        ;; Run the other half of the subtree in the current future, then
        ;; wait for the forked half to finish.
        (start-workers next-depth (fxior fid (fxlshift 1 depth)))
        (touch sibling))))

;; Launches the whole worker tree inside a future and returns that future;
;; touching it waits for the workers to finish.
(define (do-start-workers)
  (future (thunk (start-workers))))

;; Prints the current values of both fsemaphores of mfq, followed by the
;; (transparent) queue struct itself.
(define (print-mfqueue mfq)
  (printf "count = ~a\n" (fsemaphore-count (mfqueue-fsema-count mfq)))
  (printf "lock = ~a\n" (fsemaphore-count (mfqueue-fsema-lock mfq)))
  (displayln mfq))

(displayln "starting")
(define workers (do-start-workers))
(displayln "started")

;; Queue 10000 "increment" actions for the workers to consume.
(for ([_n (in-range 10000)])
  (mfqueue-enqueue! mfq 1))

;; Queue one "stop" action per worker so every worker loop ends.
(for ([_n (in-range num-futures)])
  (mfqueue-enqueue! mfq 0))

;; Snapshot of the queue semaphores and counters while workers may still
;; be running.
(displayln (fsemaphore-count (mfqueue-fsema-count mfq)))
(displayln (fsemaphore-count (mfqueue-fsema-lock mfq)))
(displayln counters)

;; Wait for the worker tree; on a user break, dump the queue state instead
;; of propagating the break.
(with-handlers ([exn:break? (λ (_ex)
                              (newline)
                              (print-mfqueue mfq))])
  (touch workers))

(displayln "done")
(print-mfqueue mfq)
(displayln counters)