Paste # 93349
2019-03-14 09:10:27

Fork as a new paste.

Paste viewed 191 times.


(define (batched on-batch #:wait-msec wait #:max-count max-count)
  (define ch (make-async-channel))
  (define (put v) (async-channel-put ch v))
  (define exit-sym (gensym 'batched-exit-message-))
  (define thd (with-thread
                (let recur ([vs    '()]
                            [count 0]
                            [timer never-evt])
                  (define v (sync timer ch))
                  (cond [(equal? v exit-sym)
                         (unless (empty? vs)
                           (on-batch (reverse vs)))]
                        [(equal? v timer) ;timer expired: send accumulated
                         (unless (empty? vs)
                           (on-batch (reverse vs)))
                         (recur '() 0 never-evt)]
                        [(< (add1 count) max-count) ;accumulate and wait
                         (recur (cons v vs)
                                (add1 count)
                                (alarm-evt (+ (current-inexact-milliseconds)
                        [else ;max count: send accumulated
                         (on-batch (reverse (cons v vs)))
                         (recur '() 0 never-evt)]))))
  (plumber-add-flush! (current-plumber)
                      (λ (_flush-handle)
                        (put exit-sym)
                        (sync thd)))

(when (and (current-log-aws-host)
  ;; Create the receiver first so available earlier in startup
  (define lls (map string->symbol (string-split (current-log-levels))))
  (define receiver (apply make-log-receiver (list* (current-logger) lls)))
  ;; PutLogEvents has a limit of 5 requests per second (per log stream)
  ;; so accumulate/batch messages.
  (define aws-put-one (batched #:wait-msec 2000
                               #:max-count 1000
    (let loop ()
      (match (sync receiver)
        [(vector level message _v topic)
         (aws-put-one (hasheq 'message   @~a{[@level] @(sanitize message)}
                              'timestamp (* 1000 (current-seconds))))]
        [_ (void)])
(define (sanitize s)
  (regexp-replace* #px"#:pass \".*?\""
                   "#:pass \"******\""))