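[Question title]: Spray reverse proxy: keep transferring data after client has disconnected
[Posted]: 2016-04-14 14:05:44
[Question]:

I am trying to implement a reverse HTTP proxy with Spray/Akka, but I have run into trouble. In some cases, my proxy server keeps receiving data from the upstream server even after the client has disconnected.

Here is how I implemented my Spray proxy directive (just a slight modification of bthuillier's implementation):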

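import akka.actor.ActorSystem
import akka.io.IO
import spray.can.Http
import spray.http.{HttpHeaders, HttpRequest, Uri}
import spray.routing.{RequestContext, Route}

trait ProxyDirectives {

  // Send the re-shaped request through IO(Http); the upstream response goes
  // straight back to the client's responder.
  private def sending(f: RequestContext ⇒ HttpRequest)(implicit system: ActorSystem): Route = {
    val transport = IO(Http)(system)
    ctx ⇒ transport.tell(f(ctx), ctx.responder)
  }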

  /**
    * Re-shape the original request, to match the destination server.
    */
  private def reShapeRequest(req: HttpRequest, uri: Uri): HttpRequest = {
    req.copy(
      uri = uri,
      headers = req.headers.map {
        // Point the Host header at the upstream server instead of the proxy.
        case _: HttpHeaders.Host => HttpHeaders.Host(uri.authority.host.address, uri.authority.port)
        case x => x
      }
    )
  }

  /**
    * proxy the request to the specified uri
    *
    */
  def proxyTo(uri: Uri)(implicit system: ActorSystem): Route = {
    sending(ctx => reShapeRequest(ctx.request, uri))
  }
}

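This reverse proxy works fine if I put one proxy layer between the client and the server (i.e. client -> proxyTo -> server), but it runs into trouble if I put two layers between them. For example, suppose I have the following simple Python HTTP server: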

import socket
import time

# Python 3 module names; on Python 2 these were BaseHTTPServer and SocketServer.
from http.server import BaseHTTPRequestHandler, HTTPServer
from socketserver import ThreadingMixIn


class MyHTTPHandler(BaseHTTPRequestHandler):
    protocol_version = 'HTTP/1.1'

    def do_GET(self):
        self.send_response(200)
        self.send_header('Transfer-Encoding', 'chunked')
        self.end_headers()

        for i in range(100):
            data = ('%s\n' % i).encode('utf-8')
            self.wfile.write(hex(len(data))[2:].encode('utf-8'))
            self.wfile.write(b'\r\n')
            self.wfile.write(data)
            self.wfile.write(b'\r\n')
            time.sleep(1)
        self.wfile.write(b'0\r\n\r\n')


class MyServer(ThreadingMixIn, HTTPServer):
    def server_bind(self):
        HTTPServer.server_bind(self)
        self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)

    def server_close(self):
        HTTPServer.server_close(self)


if __name__ == '__main__':
    server = MyServer(('127.0.0.1', 8080), MyHTTPHandler)
    server.serve_forever()
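
The chunk framing that `do_GET` writes by hand (hex size, CRLF, payload, CRLF) can be pulled into a small helper. The following is only a minimal sketch for Python 3; `encode_chunk` and `LAST_CHUNK` are hypothetical names introduced here and are not part of the original post.

```python
def encode_chunk(data):
    """Frame one HTTP/1.1 chunk: hex size, CRLF, payload, CRLF."""
    if not data:
        # A zero-length chunk would be read as the end of the body.
        raise ValueError("use LAST_CHUNK to terminate the response")
    size_line = "{:x}\r\n".format(len(data)).encode("ascii")
    return size_line + data + b"\r\n"


# Terminating chunk: size 0 followed by an empty trailer section.
LAST_CHUNK = b"0\r\n\r\n"
```

With such a helper, the loop body in `do_GET` would reduce to `self.wfile.write(encode_chunk(data))` and the final write to `self.wfile.write(LAST_CHUNK)`.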

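The Python server does nothing except open a chunked response (deliberately long-running, so that we can observe the problem). Now, if I chain two proxy layers like this: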

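import akka.actor.{Actor, ActorSystem, Props}
import akka.io.IO
import spray.can.Http
import spray.http.{HttpRequest, HttpResponse, StatusCodes, Timedout}
import spray.routing.{HttpService, Route}

class TestActor(val target: String)(implicit val system: ActorSystem) extends Actor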
  with HttpService
  with ProxyDirectives
{
  // we use the enclosing ActorContext's or ActorSystem's dispatcher for our Futures and Scheduler
  implicit private def executionContext = actorRefFactory.dispatcher

  // the HttpService trait defines only one abstract member, which
  // connects the services environment to the enclosing actor or test
  def actorRefFactory = context

  val serviceRoute: Route = {
    get {
      proxyTo(target)
    }
  }

  // runs the service routes.
  def receive = runRoute(serviceRoute) orElse handleTimeouts

  private def handleTimeouts: Receive = {
    case Timedout(x: HttpRequest) =>
      sender ! HttpResponse(StatusCodes.InternalServerError, "Request timed out.")
  }
}

object DebugMain extends App {
  val actorName = "TestActor"
  implicit val system = ActorSystem(actorName)

  // create and start our service actor
  val service = system.actorOf(
    Props { new TestActor("http://127.0.0.1:8080") },
    s"${actorName}Service"
  )
  val service2 = system.actorOf(
    Props { new TestActor("http://127.0.0.1:8081") },
    s"${actorName}2Service"
  )

  IO(Http) ! Http.Bind(service, "::0", port = 8081)
  IO(Http) ! Http.Bind(service2, "::0", port = 8082)
}

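Connect to the proxy server with curl http://localhost:8082, and you will see that the Akka system keeps transferring data after curl has been killed (you can enable DEBUG-level logging to see the details).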
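
For a repeatable alternative to killing curl by hand, a small client can read part of the response and then drop the connection. This is a rough sketch for Python 3 under stated assumptions: `read_then_disconnect` and its parameters are hypothetical names introduced here, and the port comes from the setup described above.

```python
import socket


def read_then_disconnect(host, port, min_bytes=64, timeout=5.0):
    """Send a GET request, read at least `min_bytes` of the response
    (or until the peer closes), then drop the connection."""
    request = (
        "GET / HTTP/1.1\r\n"
        "Host: {}:{}\r\n"
        "Accept: */*\r\n"
        "\r\n"
    ).format(host, port).encode("ascii")

    received = b""
    with socket.create_connection((host, port), timeout=timeout) as sock:
        sock.sendall(request)
        while len(received) < min_bytes:
            chunk = sock.recv(4096)
            if not chunk:  # the peer closed the connection first
                break
            received += chunk
    # Leaving the `with` block closes the socket mid-response,
    # which is roughly what happens when curl is killed.
    return received
```

Calling `read_then_disconnect("localhost", 8082)` while the Python server and both proxy layers are running should leave the proxies with a client that has gone away partway through the response, so the DEBUG logs can be checked for continued upstream traffic.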

How should I deal with this? Thanks.

[Comments]:

    Tags: scala akka spray akka-http


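    [Solution 1]:

    Well, it turns out this is a fairly complicated problem, and my solution takes nearly 100 lines of code.

    In fact, the problem does not only occur when I stack two proxy layers. It is also there with a single layer, but no log output is printed in that case, so I had not noticed it before.

    The key problem is that sending a request with IO(Http) ! HttpRequest goes through spray-can's request-level API, which is built on top of the host-level API. Connections at that level are managed by the Spray HttpManager, which our code cannot access. So there is nothing we can do with that particular connection, short of sending Http.CloseAll to IO(Http), which would close all upstream connections.

    (If anyone knows how to obtain the connection from HttpManager, please tell me.)

    We have to use spray-can's connection-level API to handle this situation. So I came up with something like this: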

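    // Imports this snippet relies on, in addition to those of ProxyDirectives.
    // Failure is assumed to be akka.actor.Status.Failure; the answer does not say which one it uses.
    import java.util.concurrent.TimeUnit
    import java.util.concurrent.atomic.AtomicBoolean
    import akka.actor.{Actor, ActorLogging, ActorRef, Props}
    import akka.actor.Status.Failure
    import akka.util.Timeout
    import scala.concurrent.duration.FiniteDuration
    import spray.http.{ChunkedMessageEnd, HttpResponse, StatusCodes}

    /**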
      * Proxy to upstream server, where the server response may be a long connection.
      *
      * @param uri Target URI, where to proxy to.
      * @param system Akka actor system.
      */
    def proxyToLongConnection(uri: Uri)(implicit system: ActorSystem): Route = {
      val io = IO(Http)(system)
    
      ctx => {
        val request = reShapeRequest(ctx.request, uri)
    
        // Spawn a per-request actor that opens its own upstream connection and relays the response.
        actorRefFactory.actorOf {
          Props {
            new Actor with ActorLogging {
              private var upstream: ActorRef = null
              private val upstreamClosed = new AtomicBoolean(false)
              private val clientClosed = new AtomicBoolean(false)
              private val contextStopped = new AtomicBoolean(false)
    
              // Connect to the upstream server.
              {
                implicit val timeout = Timeout(FiniteDuration(10, TimeUnit.SECONDS))
                io ! Http.Connect(
                  request.uri.authority.host.toString,
                  request.uri.effectivePort,
                  sslEncryption = request.uri.scheme == "https"
                )
                context.become(connecting)
              }
    
              def connecting: Receive = {
                case _: Http.Connected =>
                  upstream = sender()
                  upstream ! request
                  context.unbecome()  // Restore the context to [[receive]]
    
                case Http.CommandFailed(Http.Connect(address, _, _, _, _)) =>
                  log.warning("Could not connect to {}", address)
                  complete(StatusCodes.GatewayTimeout)(ctx)
                  closeBothSide()
    
                case x: Http.ConnectionClosed =>
                  closeBothSide()
              }
    
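              override def receive: Receive = {
                // ContinueSend is a custom ack message; its definition is not shown in this answer.
                // The ack arrives once the final response part has been sent, and triggers the cleanup.
                case x: HttpResponse =>
                  ctx.responder ! x.withAck(ContinueSend(0))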
    
                case x: ChunkedMessageEnd =>
                  ctx.responder ! x.withAck(ContinueSend(0))
    
                case x: ContinueSend =>
                  closeBothSide()
    
                case x: Failure =>
                  closeBothSide()
    
                case x: Http.ConnectionClosed =>
                  closeBothSide()
    
                case x =>
                  // Proxy everything else from server to the client.
                  ctx.responder ! x
              }
    
              private def closeBothSide(): Unit = {
                if (upstream != null) {
                  if (!upstreamClosed.getAndSet(true)) {
                    upstream ! Http.Close
                  }
                }
                if (!clientClosed.getAndSet(true)) {
                  ctx.responder ! Http.Close
                }
                if (!contextStopped.getAndSet(true)) {
                  context.stop(self)
                }
              }
            } // new Actor
          } // Props
        } // actorOf
      } // (ctx: RequestContext) => Unit
    }
    

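    The code is a bit long, and I suspect there is a cleaner and simpler implementation (I am actually not that familiar with Akka). Still, this code works, so I am posting it here. If you find a better solution, feel free to post it as an answer to this question.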
