【问题标题】:Calling asynchronous socket.io io.emit() in synchronous way以同步方式调用异步socket.io io.emit()
【发布时间】:2021-11-27 12:24:43
【问题描述】:

这里对 socket io 完全陌生。

我想发出在我的服务器上运行的循环中运行的数据。循环运行一段时间(几秒钟,最多几分钟),我想从我的节点服务器发出数据,然后我想在浏览器的客户端可视化(使用图表 js)。基本上是实时数据图表。

出现的问题是,显然只有在循环完成后才会触发发射。据我了解,因为 emit 函数是异步的(非阻塞的)。而且由于循环本身会阻塞直到完成(最初使用 while 循环),所以异步调用只会在之后触发。对吗?

while (CONDITION) {
  ...
io.emit('data update', DATA);
}

现在我想出了自己的解决方案,但感觉很笨拙。我想知道是否有更好的,更多的书本方法来做到这一点。我现在正在做的是递归调用循环步骤(最后我调用发射)作为对持续时间设置为 0 的 setTimeout() 的回调。

function loopStep() {
  if (CONDITION) {
    ...
    io.emit('data update', DATA);
    setTimeout(loopStep,0)
  }
}

setTimeout(loopStep,0);

我的问题是,还有其他更好的方法吗?

编辑:

我被要求添加完整的代码。

这是早期(循环)版本的完整代码代码(据我所知,不得不在这里快速重构):

let stp = 0;
while (stp < maxLogStep) {
    let new_av_reward;

        var smallStep = 0;
        

        while (smallStep < maxSmallStep) {


            let ran1 = get_random();
            let cum_prob = 0;

            let chosen_action = pref.findIndex((el, i) => {
                let pro = prob(i);
                let result = pro + cum_prob > ran1;
                cum_prob += pro;
                return result;
            })

            let cur_reward = get_reward(chosen_action);
            if (stp*maxSmallStep + smallStep == 0) {
                new_av_reward = cur_reward
            } else {
                new_av_reward = prev_av_reward + (cur_reward - prev_av_reward) / (stp * maxSmallStep + smallStep)
            }
            let cur_prob = prob(chosen_action);

            pref.forEach((element, index, array) => {
                if (chosen_action === index) {
                    array[index] = element + stepSize * (cur_reward - new_av_reward) * (1 - cur_prob)
                } else {
                    array[index] = element - stepSize * (cur_reward - new_av_reward) * (cur_prob)
                }
            });
            prev_av_reward = new_av_reward;
            smallStep++
        }


        io.emit('graph update', {
            learnedProb: [prob(0), prob(1), prob(2)],
            averageReward: new_av_reward,
            step: stp * maxSmallStep + smallStep
        });
        stp++
    };

这里是带有 setTimeout 的递归函数:

function learnStep(stp, prev_av_reward) {
    let new_av_reward;
    if (stp < maxLogStep) {
        var smallStep = 0;
        

        while (smallStep < maxSmallStep) {


            let ran1 = get_random();
            let cum_prob = 0;

            let chosen_action = pref.findIndex((el, i) => {
                let pro = prob(i);
                let result = pro + cum_prob > ran1;
                cum_prob += pro;
                return result;
            })

            let cur_reward = get_reward(chosen_action);
            if (stp*maxSmallStep + smallStep == 0) {
                new_av_reward = cur_reward
            } else {
                new_av_reward = prev_av_reward + (cur_reward - prev_av_reward) / (stp * maxSmallStep + smallStep)
            }
            let cur_prob = prob(chosen_action);

            pref.forEach((element, index, array) => {
                if (chosen_action === index) {
                    array[index] = element + stepSize * (cur_reward - new_av_reward) * (1 - cur_prob)
                } else {
                    array[index] = element - stepSize * (cur_reward - new_av_reward) * (cur_prob)
                }
            });
            prev_av_reward = new_av_reward;
            smallStep++
        }


        io.emit('graph update', {
            learnedProb: [prob(0), prob(1), prob(2)],
            averageReward: new_av_reward,
            step: stp * maxSmallStep + smallStep
        });
        setTimeout(learnStep, 0, stp + 1, new_av_reward);
    };

【问题讨论】:

  • 您的意思是使用变量的getter/setter 作为WINDOWS 对象的属性,并从setter 内部调用emit()?我认为这不会改变任何事情,因为 io.emit() 仍然是异步的,并且只会在循环完成后处理。
  • 查看emit 的源代码我不认为它是异步的。你能发布你的while循环的完整代码吗?
  • 我添加了它。我担心这会更加混乱。相关位在我认为顶部的代码 sn-p 中。希望 while 版本是正确的,因为我是从内存中重建的。
  • @tromgy 你有给我发出的源代码的链接吗?

标签: javascript socket.io


【解决方案1】:

如果您的目标是确保在发出完成并且客户端接收并确认该数据之前保持您的 while 循环,您可能需要考虑使用 Promises 和 socket.io 确认功能,如下例所示:

服务器端

io.on("connection", async (socket) => {

  // ...
  while (CONDITION) {
    //  ...
    await new Promise((resolve, reject) => {
      io.emit('data update', data, (ack) => {
        //client acknowledged received
        resolve(true)
      });        
    });
  }
  
});

客户端

socket.on("data update", (data, fn) => {
  console.log(data)
  fn("ack");
});

【讨论】:

    猜你喜欢
    • 2011-08-22
    • 2019-05-26
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多