【问题标题】:How to implement asynchronous code that looks synchronous mimicking async / await?如何实现看起来同步模仿 async / await 的异步代码?
【发布时间】:2019-10-12 12:20:37
【问题描述】:

或者说,我想依靠epoll(或类似的)来编写看起来像普通代码但不依赖回调的异步网络代码。

代码必须看起来像同步代码,但不像同步代码而不是阻塞等待网络io,它必须暂停当前的协程,并在文件描述符准备好时重新启动它。

【问题讨论】:

    标签: asynchronous networking scheme chez-scheme


    【解决方案1】:

    我最初的想法是依靠生成器和yield。但是this was a mistake 部分被python 滥用yield from 的事实误导了。

    无论如何,guile 纤维是一个伟大的灵感和I adapted it to chez scheme

    这是一个示例服务器代码:

    (define (handler request port)
      (values 200 #f (http-get "https://httpbin.davecheney.com/ip")))
    
    (untangle (lambda ()
                (run-server "127.0.0.1" 8888)))
    

    handler 根据 httpbin 服务返回其 IP。在 call/cc 实际上 call/1cc 的帮助下,代码看起来是同步的。

    untangle 将使用作为参数传递的 lambda 启动事件循环!

    这里是run-server的定义:

    (define (run-server ip port handler)
      (log 'info "HTTP server running at ~a:~a" ip port)
      (let* ((sock (socket 'inet 'stream 'ipv4)))
        (socket:setsockopt sock 1 2 1) ;; re-use address
        (socket:bind sock (make-address ip port))
        (socket:listen sock 1024)
        (let loop ()
          (let ((client (accept sock)))
            (let ((port (fd->port client)))
              (spawn (lambda () (run-once handler port)))
              (loop))))))
    

    如您所见,没有回调。与简单的同步 Web 服务器唯一不同的是 spawn 过程,它将在自己的协程中处理请求。特别是accept 是异步的。

    run-once 只会将方案请求传递给handler 并取其 3 个值来构建响应。不是很有趣。看起来是同步的,实际上是异步的部分是上面的http-get

    我只会解释一下,accept 是如何工作的,因为 http-get 需要引入自定义二进制端口,但我只想说这是相同的行为......

    (define (accept fd)
      (let ((out (socket:%accept fd 0 0)))
        (if (= out -1)
            (let ((code (socket:errno)))
              (if (= code EWOULDBLOCK)
                  (begin
                    (abort-to-prompt fd 'read)
                    (accept fd))
                  (error 'accept (socket:strerror code))))
            out)))
    

    如您所见,它调用了一个过程abort-to-prompt,我们可以简单地调用它pause,它将“停止”协程并调用提示处理程序。

    abort-to-promptcall-with-prompt 合作。

    由于 chez 方案没有提示,所以我使用两个单镜头延续来模拟它call/1cc

    (define %prompt #f)
    (define %abort (list 'abort))
    
    (define (call-with-prompt thunk handler)
      (call-with-values (lambda ()
                          (call/1cc
                           (lambda (k)
                             (set! %prompt k)
                             (thunk))))
        (lambda out
          (cond
           ((and (pair? out) (eq? (car out) %abort))
            (apply handler (cdr out)))
           (else (apply values out))))))
    
    (define (abort-to-prompt . args)
      (call/1cc
       (lambda (k)
         (let ((prompt %prompt))
           (set! %prompt #f)
           (apply prompt (cons %abort (cons k args)))))))
    

    call-with-prompt 将启动一个名为%prompt 的全局set! 延续,这意味着THUNK 有一个提示。如果延续参数OUTcall-with-values 的第二个 lambda,以唯一对象 %abort 开头,则意味着通过 abort-to-prompt 到达延续。它将使用abort-to-prompt 延续和传递给call-with-prompt 延续参数(即(apply handler (cons k (cdr out))))的任何参数调用HANDLER

    abort-to-promp 将在代码执行存储在%prompt 中的提示的延续之后启动一个新的延续以能够返回。

    call-with-prompt 是事件循环的核心。这是它,分为两部分:

    (define (exec epoll thunk waiting)
      (call-with-prompt
       thunk
       (lambda (k fd mode) ;; k is abort-to-prompt continuation that
                           ;; will allow to restart the coroutine
    
         ;; add fd to the correct epoll set
         (case mode
           ((write) (epoll-wait-write epoll fd))
           ((read) (epoll-wait-read epoll fd))
           (else (error 'untangle "mode not supported" mode)))
         (scheme:hash-table-set! waiting fd (make-event k mode)))))
    
    (define (event-loop-run-once epoll waiting)
      ;; execute every callback waiting in queue, 
      ;; call the above exec procedure 
      (let loop ()
        (unless (null? %queue)
          ;; XXX: This is done like that because, exec might spawn
          ;; new coroutine, so we need to cut %queue right now. 
          (let ((head (car %queue))
                (tail (cdr %queue)))
            (set! %queue tail)
            (exec epoll head waiting)
            (loop))))
    
        ;; wait for ONE event
        (let ((fd (epoll-wait-one epoll (inf))
          (let ((event (scheme:hash-table-ref waiting fd)))
            ;; the event is / will be processed, no need to keep around
            (scheme:hash-table-delete! waiting fd)
            (case (event-mode event)
              ((write) (epoll-ctl epoll 2 fd (make-epoll-event-out fd)))
              ((read) (epoll-ctl epoll 2 fd (make-epoll-event-in fd))))
            ;; here it will schedule the event continuation that is the
            ;; abort-to-prompt continuation that will be executed by the
            ;; next call the above event loop event-loop-run-once
            (spawn (event-continuation event))))))
    

    我想就是这样。

    【讨论】:

      【解决方案2】:

      如果您使用的是 chez-scheme,则有 chez-a-sync。它使用 POSIX poll 而不是 epoll(epoll 是特定于 linux 的)。 guile-a-sync2 也可用于 guile-2.2/3.0。

      【讨论】:

        猜你喜欢
        • 2014-09-28
        • 1970-01-01
        • 1970-01-01
        • 2020-10-05
        • 2012-03-22
        • 1970-01-01
        • 2017-09-17
        • 1970-01-01
        相关资源
        最近更新 更多