【发布时间】:2022-01-05 05:08:39
【问题描述】:
我想将PipelineTask 的实例传递给PipelineTask.add,但是当我尝试时,我得到一个NameError,其中提到PipelineTask 未定义。
我相信这是因为 PipelineTask 仅在调用 PipelineTask.__init__() 之后才绑定。
class Task(BaseModel, abc.ABC):
id: str
@abc.abstractmethod
async def run(self):
pass
class PipelineTask(Task):
@abc.abstractmethod
async def run(self):
pass
def add(self, task: PipelineTask):
## TODO: how do a pass a instance of self here?
next = self
if self.next == None:
self.next = task
next = self.next
else:
current = self.next
while current != None:
current = current.next
next = current
current.next = next
return self
class Pipeline(BaseModel):
"""
Pipeline execute a sequence of tasks
...
init = PipelineTask(id=0)
pipeline = Pipeline(init=PrepareDataPipelineTask(id='prepare'))
pipeline.add(ExecutePipelineTask(id='execute')).add(CollectResultsPipelineTask(id='collect'))
pipeline.run()
...
"""
# The pipelines innitial task
init: PipelineTask
async def run(self):
await self.init.run()
has_next = self.init.next != None
next = self.init
while has_next:
next = next.next
await next.run()
has_next = next.next != None
## Adds a task to the end of the pipeline
async def add(self, next: PipelineTask):
"""add Task to the end of the pipeline"""
self.init.add(next)
class StdoutTask(PipelineTask):
async def run(self):
print(f"[Worker {self.id}] testing...")
async def test_create_pipeline():
tasks = (
StdoutTask(id=1, next=None)
.add(StdoutTask(id=2, next=None))
.add(StdoutTask(id=3, next=None))
)
pipeline = Pipeline(init=tasks)
await pipeline.run()
示例用法:
class StdoutTask(PipelineTask):
async def run(self):
print(f"[Worker {self.id}] testing...")
@pytest.mark.asyncio
async def test_create_pipeline():
tasks = StdoutTask(id=1).add(StdoutTask(id=2)).add(StdoutTask(id=3))
pipeline = Pipeline(init=tasks)
await pipeline.run()
pass
我尝试“取消”指定task 的类型,但随后task 缺少next 属性(例如AttributeError: 'XXXX' object has no attributed next)
def add(self, task):
...
我也尝试过修改task.__class__ = PipelineTask,它添加了额外的方法,但没有添加属性。
以下是单个文件可重现的示例
from pydantic import BaseModel
import abc
import asyncio
class Task(BaseModel, abc.ABC):
id: str
@abc.abstractmethod
async def run(self):
pass
class PipelineTask(Task):
@abc.abstractmethod
async def run(self):
pass
def add(self, task: PipelineTask):
## TODO: how do a pass a instance of self here?
next = self
if self.next == None:
self.next = task
next = self.next
else:
current = self.next
while current != None:
current = current.next
next = current
current.next = next
return self
class Pipeline(BaseModel):
"""
Pipeline execute a sequence of tasks
...
init = PipelineTask(id=0)
pipeline = Pipeline(init=PrepareDataPipelineTask(id='prepare'))
pipeline.add(ExecutePipelineTask(id='execute')).add(CollectResultsPipelineTask(id='collect'))
pipeline.run()
...
"""
# The pipelines innitial task
init: PipelineTask
async def run(self):
await self.init.run()
has_next = self.init.next != None
next = self.init
while has_next:
next = next.next
await next.run()
has_next = next.next != None
## Adds a task to the end of the pipeline
async def add(self, next: PipelineTask):
"""add Task to the end of the pipeline"""
self.init.add(next)
class StdoutTask(PipelineTask):
async def run(self):
print(f"[Worker {self.id}] testing...")
async def test_create_pipeline():
tasks = (
StdoutTask(id=1, next=None)
.add(StdoutTask(id=2, next=None))
.add(StdoutTask(id=3, next=None))
)
pipeline = Pipeline(init=tasks)
await pipeline.run()
解决方案
使用getattr
def add(self, task: "PipelineTask"):
next = getattr(self, "next", None)
if self.next == None:
self.next = task
next = self.next
else:
current = getattr(self, "next", None)
while current != None:
current = getattr(current, "next", None)
next = getattr(current, "next", None)
current.next = next
return self
【问题讨论】:
-
您可以将类型前向声明为字符串文字,例如
def add(self, task: 'PipelineTask'). -
感谢您的提示,但不幸的是它没有添加下一个属性,我仍然遇到同样的错误。
-
这就是你的全部代码吗?因为您在尝试在任何地方读取它之前没有分配
next属性。 -
我正在使用 pydantic,所以生成了
__init__参见:pydantic-docs.helpmanual.io/usage/models/#basic-model-usage
标签: python python-3.x pydantic