【问题标题】:Using context for timeouts in a channel based pipeline在基于通道的管道中使用上下文超时
【发布时间】:2020-09-23 22:40:01
【问题描述】:

我的设计利用了工作人员之间的通道管道。基本设计是将任务放置在扇出通道上,由长期以来喜欢的 gorouteans 充当工人,消耗来自通道的工作并处理它们。

+-------------+                        +--------+
| task source |  == fan out channels =>| worker |
+-------------+                        +--------+

每个工作人员 goroutean 都在进程的生命周期中存在,并将处理多个作业。

但是,由于新的要求,我现在必须实现超时以完成通过系统处理任务。我的理解是,跨函数的事情的超时通常是通过上下文来实现的。

但是我不确定如何在通过通道连接的工作人员之间传播上下文。

或者我问错了问题,他们是另一种更适合我要求的解决方案吗?

【问题讨论】:

  • 在创建所有 goroutine 时将相同的上下文传递给它们。
  • 这取决于基于工作的超时还是工人超时?能不能加个流程图让它更清楚
  • @ShubhamSrivastava 我已经做出改变,希望能澄清这一点。超时基于作业。
  • 将上下文作为作业的一部分传递(即您通过通道发送给工作人员的消息)。

标签: go


【解决方案1】:

传入goroutine通道和上下文就可以了,像这样:

// ctx created earlier for all goroutines

for i := 0; i < workersCount; i++ {
    go worker(ctx, jobsChan)
}

所以现在您可以让所有工作人员超时。

但是可能您的超时与 1 个工作人员中的 1 个特定工作有关,与其他工作人员和工作无关,因此您需要为每个工作处理创建上下文,如下所示:

for i := 0; i < workersCount; i++ {
    go worker(jobsChan)
}

func worker(jobs <-chan Job) {
   // read from chan
   // and create ctx for job
   ctx, cancel := context.WithTimeout(context.Background(), 500*time.Millisecond)
   // ...
}

或者(如果我得到@Adrian 的观点正确)你可以这样做:

ctx, cancel := context.WithTimeout(context.Background(), 500*time.Millisecond)
job.ctx = ctx
job.ctxCancel = cancel
// put job int chan

for i := 0; i < workersCount; i++ {
    go worker(jobsChan)
}

func worker(jobs <-chan Job) {
   // read from chan
   // deal with job.ctx
   // ...
}

【讨论】:

  • 这仅适用于所有作业的超时时间相同的情况,并且超时时间是从作业被提取时开始,而不是从提交时开始(这很不寻常) .
  • @Adrian 关于何时提交 - 只需添加工作信息,如带 unix 时间戳的 submited_at 和工作人员中,当工作被拾起时,只需进行数学计算并计算实际时间差......
  • 这意味着您可能正在创建一个已经过期的上下文,并将该责任置于工作人员身上;只为作业创建上下文并将上下文包含在作业中似乎更简洁,而不是在作业中包含时间戳并在其他地方创建上下文。
  • @Adrian 如果重点是为工作分配上下文 - 我同意,它也是有效的选项,并且在创建工作后的超时方面更精确。更新了我的答案,将其添加为另一个选项。
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 2014-06-28
  • 1970-01-01
  • 2016-06-18
  • 1970-01-01
  • 2021-03-27
  • 2012-08-10
  • 1970-01-01
相关资源
最近更新 更多