【问题标题】:Pass and access request response object in child process using fork使用 fork 在子进程中传递和访问请求响应对象
【发布时间】:2021-06-15 04:29:51
【问题描述】:

我想知道如何使用 fork 将请求响应对象从父进程传递到子进程。 我做的是

var child = cp.fork(__dirname + '/childProcess.js', req, res);
child.on('message', function(m) 
{
    console.log("child process returned data " + m);
});
child.send("hello");

childProcess.js

var req = process.argv[2];
var res = process.argv[3];
process.on('message', (msg) => 
{
    console.log("req object :-" + req );
    console.log("res object :-" + res);
}
process.send("callparent");

这让我在子进程中未定义。我也试过 child.send("hello", req.socket ) 。但后来我无法在子进程中访问 req 的方法。它显示为圆形结构。

【问题讨论】:

    标签: node.js


    【解决方案1】:

    根据docs

    child.send(message[, sendHandle][, callback])

    可传递给child.send() 的可选sendHandle 参数用于将TCP 服务器或套接字对象传递给子进程。子进程将接收对象作为第二个参数传递给在process.on('message') 事件上注册的回调函数。

    这建议您应该在child.send() 函数中发送reqreq.socket 对象。

    另外,您没有从注册到process.on('message') 事件的函数的第二个参数访问reqreq.socket 对象。

    【讨论】:

      【解决方案2】:

      其他答案是告诉您将套接字发送给孩子并让孩子听并处理请求,这不是被问到的问题(如何向孩子发送 req/res)。如果您的目标是将套接字传递给孩子并让孩子听并处理请求,那么这些答案就可以了。如果您的目标是在主进程中侦听和预处理请求,并将这些请求传递给子进程进行进一步处理,请参见下文:

      简短的回答是,您实际上必须明确地将请求和响应发送到子进程(它们不会神奇地出现在子进程中)。但这也存在一些挑战。

      在我们的应用程序中,我们遇到了类似的问题。我们的主请求处理器检查请求以查看哪个子进程应该处理每个请求,然后将该请求发送到子进程(通过child.send())。

      在父子之间传递请求/响应的问题是您必须能够序列化使用send 来回发送的对象,尤其是请求对象无法序列化(它充满了循环引用,具有对许多其他对象的引用等)。

      所以我们所做的就是从子进程需要的请求中提取数据子集,然后我们通过child.send() 将那个“轻量级请求”对象发送给子进程,同时保留对原始请求的引用父进程上的队列。子进程完成处理后,通过parent.send()将其响应数据发送回父进程,父进程将其与待处理队列中的原始请求匹配,并使用响应数据满足请求。

      这种方法的优点(与仅在父/子之间建立自定义协议相反)是子请求处理代码仍在对请求/响应对象(只是它们的轻量级版本)进行操作。在我们的例子中,这意味着我们可以使用相同的请求处理代码,无论是在子进程中调用它,还是让进程直接监听和处理请求。

      例如,这里是如何制作一个轻量级、可序列化的请求对象并将其发送给子进程:

      var requestData = 
      {
          httpVersion: request.httpVersion,
          httpVersionMajor: request.httpVersionMajor,
          httpVersionMinor: request.httpVersionMinor,
          method: request.method,
          url: request.url,
          headers: request.headers,
          body: request.body
      }
      child.send(requestData);
      

      您也可以只使用 lodash pick 之类的东西来获取您想要的字段。

      此解决方案的棘手之处在于,如果您需要根据请求/响应调用任何方法,则必须在父级中执行此操作(实际对象所在的位置 - 子级所拥有的只是选定的数据)。

      【讨论】:

      【解决方案3】:

      您创建一个服务器,并将句柄发送给客户端。 例如,如果你使用 express,你可以这样做:

      Server.js

      child= require('child_process').fork('child.js');    
      const server = require('net').createServer();
      server.on('connection', function(socket)  {    
          child.send('handle', socket);
      
      }).listen(80);
      

      然后你创建一个孩子,并接受句柄:

      Child.js

      var app=require('express')()
      app.get('/',function(req,res){
           console.log(req.headers)
           res.send('Working!')
      })  
      
      var myServer=require('http').createServer(app)
      process.on('message', function(m, server)  {
        if (m === 'handle') myServer.listen(handle)
      });
      

      更多信息:

      【讨论】:

      • 感谢您的建议.. 我按照您所说的做了,但我怎么能在子进程中访问 req res 对象.. 另外我想要子进程中的 request.headers。你知道怎么做吗??
      • 像任何快递应用程序一样常规。 app.get('/',function(req,res){})。关于请求标头,您可以提出不同的问题 :) 您可以使用 req.headers 来获取它。
      • 编辑并添加了有关 res、res 和 headers 的信息
      • 这个答案很好,应该说明响应流表示你正在写入套接字,请求流是从套接字读取的。
      【解决方案4】:

      下面的示例可能有助于更好地说明这一点。 这有一个父母和一个孩子,父母创建一个服务器,并在客户端连接时,将套接字句柄传递给孩子。

      孩子只有一个消息循环,它在其中接收句柄,并且能够通过它到达客户端。

      $ cat foo.js

      const cp = require('child_process')
      const n = require('net')
      
      if (process.argv[2] === 'child') {
        process.on('message', (m, h) =>  {
          h.end('ok')
        })
      }
      else {
        const c = cp.fork(process.argv[1], ['child'])
        const s = n.createServer();
        s.on('connection', (h) => {
          c.send({}, h)
        }).listen(12000, () => {
        const m = n.connect(12000)
        m.on('data', (d) => {
          console.log(d.toString())
        })
       })
      }
      

      strace 与握手的相关部分。

      [pid 12055] sendmsg(11, {msg_name(0)=NULL, msg_iov(1)=[{"{\"cmd\":\"NODE_HANDLE\",\"type\":\"net.Socket\",\"msg\":{},\"key\":\"6::::12000\"}\n", 70}], msg_controllen=24, {cmsg_len=20, cmsg_level=SOL_SOCKET, cmsg_type=SCM_RIGHTS, {14}}, msg_flags=0}, 0) = 70
      
      [pid 12061] recvmsg(3, {msg_name(0)=NULL, msg_iov(1)=[{"{\"cmd\":\"NODE_HANDLE\",\"type\":\"net.Socket\",\"msg\":{},\"key\":\"6::::12000\"}\n", 65536}], msg_controllen=24, {cmsg_len=20, cmsg_level=SOL_SOCKET, cmsg_type=SCM_RIGHTS, {12}}, msg_flags=MSG_CMSG_CLOEXEC}, MSG_CMSG_CLOEXEC) = 70
      
      [pid 12061] getsockname(12, {sa_family=AF_INET6, sin6_port=htons(12000), inet_pton(AF_INET6, "::ffff:127.0.0.1", &sin6_addr), sin6_flowinfo=0, sin6_scope_id=0}, [28]) = 0
      
      [pid 12061] getsockopt(12, SOL_SOCKET, SO_TYPE, [1], [4]) = 0
      
      [pid 12061] write(3, "{\"cmd\":\"NODE_HANDLE_ACK\"}\n", 26) = 26
      
      [pid 12055] <... epoll_wait resumed> {{EPOLLIN, {u32=11, u64=11}}}, 1024, -1) = 1
      
      [pid 12055] recvmsg(11, {msg_name(0)=NULL, msg_iov(1)=[{"{\"cmd\":\"NODE_HANDLE_ACK\"}\n", 65536}], msg_controllen=0, msg_flags=MSG_CMSG_CLOEXEC}, MSG_CMSG_CLOEXEC) = 26
      

      如您所见,父子进程参与传输套接字句柄的协议,子进程根据接收到的句柄信息重新创建 net.Socket 对象。随后,子进程能够处理此连接下的客户端。

      此协议以及跨进程传输句柄和工作负载的方式是cluster 模块的核心和灵魂。

      【讨论】:

        【解决方案5】:

        您不能(至少在 Node.js

        首先用这个内容创建一个package.json文件:

        {
          "type": "module"
        }
        

        然后使用以下内容创建文件server.js

        import * as http from 'http'
        import * as child_process from 'child_process'
        import * as net from 'net'
        import * as fs from 'fs'
        import * as crypto from 'crypto'
        
        const ipcPrefix = (process.platform != 'win32' ? '/tmp/' : '\\\\.\\pipe\\') 
                        + crypto.randomBytes(8).toString('hex')
        const subprocess = child_process.fork('subprocess.js', {
          stdio: ['pipe', 'pipe', 'inherit', 'ipc']
        })
        let requestNumber = 0
        
        const server = http.createServer()
        .listen(8080)
        .on('request', (request, response) => {
          if (!subprocess.connected) {
            console.error('Subprocess not connected for: '+request.url)
            response.statusCode = 500
            response.end()
            return
          }
          const {headers, url, method} = request
          const ipcPath = ipcPrefix + requestNumber++ // a unique IPC path
          try {fs.unlinkSync(ipcPath)} catch{} // delete the socket file if it exist already
          let headerChunks = [], headerSize, sizeGotten = 0
          net.createServer() // the IPC server
          .listen(ipcPath, () => subprocess.send({cmd: 'req', headers, url, method, ipcPath}))
          .on('connection', socket => {
            socket.on('close', () => {
              response.end()
              fs.unlinkSync(ipcPath) // clean it up
            })
            socket.once('data', async chunk => {
              headerSize = chunk.readUInt32LE()
              headerChunks.push(chunk)
              sizeGotten += chunk.byteLength
              while (sizeGotten < headerSize) {
                let timer
                const timedOut = await Promise.race([
                  // race next packet or timeout timer
                  new Promise(resolve => {
                    socket.once('data', chunk => {
                      headerChunks.push(chunk)
                      sizeGotten += chunk.byteLength
                      resolve(false)
                    })
                  }),
                  new Promise(resolve => {timer = setTimeout(resolve, 2000, true)})
                ])
                clearTimeout(timer) // to not leave it hanging
                if (timedOut) {
                  response.statusCode = 500
                  // response.write('lol, bye') // optional error page
                  socket.destroy() // this causes the close event which ends the response
                  console.error('Subprocess response timed out...')
                  return // break the loop and exit function
                }
              }
              const data = Buffer.concat(headerChunks)
              const headerData = data.slice(4, 4+headerSize)
              const header = JSON.parse(headerData.toString())
              response.writeHead(header.statusCode, header.headers)
              if (data.byteLength - 4 - headerSize > 0) {
                response.write(data.slice(4 + headerSize))
              }
              socket.pipe(response) // just pipe it through
              //socket.on('data', response.write.bind(response)) // or do this
              // but set a timeout on the socket to close it if inactive
              socket.setTimeout(2000)
              socket.on('timeout', () => {
                socket.destroy()
                console.log('Subprocess response timed out...')
              })
            })
          })
        })
        

        然后创建文件subprocess.js。包含以下内容:

        import * as net from 'net'
        
        process.on('message', msg => {
          switch (msg.cmd) {
            case 'req': {
              const socket = net.createConnection({path: msg.ipcPath})
              // depending on {headers, url, method} in msg do your thing below
              writeHead(socket, 200, {'testHeader': 'something'})
              socket.end('hello world '+msg.url)
            } break
          }
        })
        
        function writeHead(socket, statusCode, headers) {
          const headerBuffer = new TextEncoder().encode(JSON.stringify({statusCode, headers}))
          const headerSize = Buffer.allocUnsafe(4)
          headerSize.writeUInt32LE(headerBuffer.byteLength)
          socket.write(headerSize)
          socket.write(headerBuffer)
        }
        

        现在运行服务器 node server 并在浏览器中转到 localhost:8080 以查看它的运行情况。

        如果你喜欢我的回答,请随时在 GitHub 上赞助我。

        【讨论】:

        • 我不明白为什么有人对我的答案投了反对票...这是最好的答案,它甚至可以与安全的 HTTPS 一起使用。
        猜你喜欢
        • 1970-01-01
        • 2018-07-16
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 2016-12-10
        • 2018-01-12
        • 1970-01-01
        相关资源
        最近更新 更多