I found it! Aaand I'll report the bug to the GCP Composer team.
So, if webserver.reload_on_plugin_change=True is configured, the CLI enters this section:
https://github.com/apache/airflow/blob/4aec433e48dcc66c9c7b74947c499260ab6be9e9/airflow/bin/cli.py#L1118-L1138
    # if we should check the directory with the plugin,
    if self.reload_on_plugin_change:
        # compare the previous and current contents of the directory
        new_state = self._generate_plugin_state()
        # If changed, wait until its content is fully saved.
        if new_state != self._last_plugin_state:
            self.log.debug(
                '[%d / %d] Plugins folder changed. The gunicorn will be restarted the next time the '
                'plugin directory is checked, if there is no change in it.',
                num_ready_workers_running, num_workers_running
            )
            self._restart_on_next_plugin_check = True
            self._last_plugin_state = new_state
        elif self._restart_on_next_plugin_check:
            self.log.debug(
                '[%d / %d] Starts reloading the gunicorn configuration.',
                num_ready_workers_running, num_workers_running
            )
            self._restart_on_next_plugin_check = False
            self._last_refresh_time = time.time()
            self._reload_gunicorn()

    def _generate_plugin_state(self):
        """
        Generate dict of filenames and last modification time of all files in settings.PLUGINS_FOLDER
        directory.
        """
        if not settings.PLUGINS_FOLDER:
            return {}

        all_filenames = []
        for (root, _, filenames) in os.walk(settings.PLUGINS_FOLDER):
            all_filenames.extend(os.path.join(root, f) for f in filenames)
        plugin_state = {f: self._get_file_hash(f) for f in sorted(all_filenames)}
        return plugin_state

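It builds the list of files to check by calling os.walk(settings.PLUGINS_FOLDER), and only afterwards hashes each file in that list.
In the meantime, gcsfuse decides to delete some of those files,
and the error occurs: file not found.
So disabling webserver.reload_on_plugin_change works, but this option is really handy, so I'll open a bug ticket with Google.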
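
To make the race concrete, here is a minimal standalone sketch of what I think is happening. It is not the Airflow code: `file_hash`, `list_files` and `tolerant_state` are names I made up for illustration, I'm assuming the real `_get_file_hash` opens and reads each file, and the `os.remove` call just stands in for gcsfuse removing a file between the listing and the hashing. The tolerant variant is only one possible workaround, not an upstream fix.

```python
import hashlib
import os
import tempfile


def file_hash(path):
    """Hash a file's contents; raises FileNotFoundError if the file is gone."""
    with open(path, "rb") as fh:
        return hashlib.sha256(fh.read()).hexdigest()


def list_files(folder):
    """Collect every file path under folder, like the os.walk loop above."""
    paths = []
    for root, _, filenames in os.walk(folder):
        paths.extend(os.path.join(root, name) for name in filenames)
    return sorted(paths)


def tolerant_state(paths):
    """Hypothetical variant: skip files that vanished after being listed."""
    state = {}
    for path in paths:
        try:
            state[path] = file_hash(path)
        except FileNotFoundError:
            continue  # deleted between listing and hashing
    return state


with tempfile.TemporaryDirectory() as folder:
    for name in ("a.py", "b.py"):
        with open(os.path.join(folder, name), "w") as fh:
            fh.write("# plugin\n")

    listed = list_files(folder)
    # Stand-in for gcsfuse removing a file after the listing was taken.
    os.remove(os.path.join(folder, "b.py"))

    try:
        naive_state = {p: file_hash(p) for p in listed}
        naive_failed = False
    except FileNotFoundError:
        naive_failed = True

    safe_state = tolerant_state(listed)
    safe_names = sorted(os.path.basename(p) for p in safe_state)

print(naive_failed, safe_names)  # True ['a.py']
```

The dict comprehension over the stale listing fails the same way the webserver does, while the variant that catches FileNotFoundError simply leaves the vanished file out of the state, so the next check would see the folder as changed instead of crashing.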