我创建了一个脚本,用于将更新后的 Docker 镜像部署到 ECS 上的暂存(staging)服务,使相应的任务定义引用当前版本的 Docker 镜像。我不确定自己是否遵循了最佳实践,因此欢迎提供反馈。
要使脚本正常工作,您需要有一个空闲的 ECS 实例,或者设置合适的 deploymentConfiguration.minimumHealthyPercent 值,使 ECS 能够腾出一个实例来部署更新后的任务定义。
我的算法是这样的:
- 使用当前 Git 修订版本号,为任务定义中各容器对应的 Docker 镜像打标签。
- 将 Docker 镜像标签推送到相应的注册表。
- 取消注册任务定义系列中的旧任务定义。
- 注册新的任务定义,使其引用标记有当前 Git 修订版本号的 Docker 镜像。
- 更新服务以使用新的任务定义。
我的代码贴在下面:
deploy-ecs
#!/usr/bin/env python3
import subprocess
import sys
import os.path
import json
import re
import argparse
import tempfile
_root_dir = os.path.abspath(os.path.normpath(os.path.dirname(__file__)))
sys.path.insert(0, _root_dir)
from _common import *
def _run_ecs_command(args):
    """Run ``aws ecs`` followed by ``args`` via ``run_command``.

    Nothing is returned; use ``_get_ecs_output`` when the command's
    output is needed.
    """
    ecs_prefix = ['aws', 'ecs']
    run_command(ecs_prefix + args)
def _get_ecs_output(args):
    """Run ``aws ecs`` followed by ``args`` and return its stdout parsed as JSON."""
    ecs_prefix = ['aws', 'ecs']
    raw_output = run_command(ecs_prefix + args, return_stdout=True)
    return json.loads(raw_output)
def _tag_image(tag, qualified_image_name, purge):
    """Tag an image with ``tag`` and push that tag to its registry.

    The image is pulled first so that there is a local copy to tag, then
    tagged as ``<qualified_image_name>:<tag>`` and pushed.

    Args:
        tag: Tag to apply (the caller passes the short Git revision).
        qualified_image_name: Image name as given to ``docker pull``.
        purge: If true, remove the local ``:latest`` and ``:<tag>`` images
            once the push is done.
    """
    tagged_name = '{}:{}'.format(qualified_image_name, tag)

    log_info('Tagging image \'{}\' as \'{}\'...'.format(
        qualified_image_name, tag))
    log_info('Pulling image from registry in order to tag...')
    run_command(
        ['docker', 'pull', qualified_image_name], capture_stdout=False)
    # No '-f' here: the flag was deprecated and later removed from
    # `docker tag`, so passing it makes the command fail on current Docker
    # CLIs. Re-tagging an existing tag overwrites it without the flag.
    run_command(['docker', 'tag', qualified_image_name, tagged_name])

    log_info('Pushing image tag to registry...')
    run_command(['docker', 'push', tagged_name], capture_stdout=False)

    if purge:
        log_info('Deleting pulled image...')
        # NOTE(review): assumes qualified_image_name carries no explicit tag,
        # so the pull above fetched ':latest' - confirm against the task
        # definition files in use.
        run_command(
            ['docker', 'rmi', '{}:latest'.format(qualified_image_name)])
        run_command(['docker', 'rmi', tagged_name])
def _register_task_definition(task_definition_fpath, purge):
    """Register a task definition whose images are pinned to the Git revision.

    Every container image in the task definition file is tagged with the
    short hash of ``HEAD`` and pushed, the existing task definitions of the
    same family are deregistered, and the rewritten definition is registered.

    Args:
        task_definition_fpath: Path to the task definition JSON file. It must
            contain ``family`` and ``containerDefinitions``.
        purge: Passed through to ``_tag_image``; if true the local images are
            removed after pushing.

    Returns:
        The new task definition name as ``'<family>:<revision>'``.
    """
    with open(task_definition_fpath, 'rt') as f:
        task_definition = json.load(f)
    task_family = task_definition['family']

    tag = run_command(
        ['git', 'rev-parse', '--short', 'HEAD'], return_stdout=True).strip()

    for container_def in task_definition['containerDefinitions']:
        image_name = container_def['image']
        _tag_image(tag, image_name, purge)
        # Point the container at the revision-specific tag just pushed.
        container_def['image'] = '{}:{}'.format(image_name, tag)

    log_info('Finding existing task definitions of family \'{}\'...'.format(
        task_family
    ))
    # The family name is escaped and the whole ARN must match, so only
    # revisions of exactly this family are selected. (The previous pattern
    # also contained a stray quantifier, 'ecs+'.)
    family_arn_pattern = re.compile(
        r'arn:aws:ecs:[^:]+:[^:]+:task-definition/{}:\d+'.format(
            re.escape(task_family)))
    existing_task_definitions = _get_ecs_output(['list-task-definitions'])[
        'taskDefinitionArns']
    # NOTE(review): old revisions are deregistered before the new one is
    # registered; if registration fails the family is left without an active
    # revision. Consider registering first - confirm the intended order.
    for existing_task_definition in existing_task_definitions:
        if not family_arn_pattern.fullmatch(existing_task_definition):
            continue
        log_info('Deregistering task definition \'{}\'...'.format(
            existing_task_definition))
        _run_ecs_command([
            'deregister-task-definition', '--task-definition',
            existing_task_definition,
        ])

    # The AWS CLI reads the definition from a file, so write the rewritten
    # JSON to a temporary file that lives for the duration of the call.
    with tempfile.NamedTemporaryFile(mode='wt', suffix='.json') as f:
        f.write(json.dumps(task_definition))
        f.flush()  # make sure the CLI sees the full content on disk

        log_info('Registering task definition...')
        result = _get_ecs_output([
            'register-task-definition',
            '--cli-input-json', 'file://{}'.format(f.name),
        ])

    return '{}:{}'.format(task_family, result['taskDefinition']['revision'])
def _update_service(service_fpath, task_def_name):
    """Point the configured ECS service at a new task definition.

    The service name is read from the ``serviceName`` key of the service JSON
    file; every listed service whose ARN names exactly that service is
    updated via ``aws ecs update-service``.

    Args:
        service_fpath: Path to the service JSON file.
        task_def_name: Task definition to deploy, e.g. ``'family:3'``.
    """
    with open(service_fpath, 'rt') as f:
        service_config = json.load(f)
    service_name = service_config['serviceName']

    # Match the whole ARN with the name escaped, so that service 'web' does
    # not also select 'web-prod'. The optional segment accepts the long ARN
    # format, which puts the cluster name between 'service/' and the name.
    service_arn_pattern = re.compile(
        r'arn:aws:ecs:[^:]+:[^:]+:service/(?:[^/]+/)?{}'.format(
            re.escape(service_name)))

    services = _get_ecs_output(['list-services'])['serviceArns']
    matching_services = [
        s for s in services if service_arn_pattern.fullmatch(s)]
    if not matching_services:
        # Previously this case was silent, which made a no-op deployment
        # indistinguishable from a successful one.
        log_info('No service named \'{}\' found; nothing updated.'.format(
            service_name))
        return

    for service in matching_services:
        log_info('Updating service with new task definition...')
        _run_ecs_command([
            'update-service', '--service', service,
            '--task-definition', task_def_name,
        ])
# Command-line interface: two positional JSON files plus an optional flag.
parser = argparse.ArgumentParser(
    description="""Deploy latest Docker image to staging server.
The task definition file is used as the task definition, whereas
the service file is used to configure the service.
""")
parser.add_argument(
    'task_definition_file',
    help='Your task definition JSON file',
)
parser.add_argument(
    'service_file',
    help='Your service JSON file',
)
parser.add_argument(
    '--purge_image',
    action='store_true',
    help='Purge Docker image after tagging?',
)
args = parser.parse_args()

# Resolve both paths before changing directory, so that relative paths given
# on the command line keep referring to the caller's working directory.
task_definition_file = os.path.abspath(args.task_definition_file)
service_file = os.path.abspath(args.service_file)
os.chdir(_root_dir)

# Register the new task definition, then roll the service over to it.
task_def_name = _register_task_definition(
    task_definition_file,
    args.purge_image,
)
_update_service(service_file, task_def_name)
_common.py
import sys
import subprocess
__all__ = ['log_info', 'handle_error', 'run_command', ]
def log_info(msg):
    """Write ``msg`` to stdout as a ``* ``-prefixed line and flush at once."""
    out = sys.stdout
    line = '* {}\n'.format(msg)
    out.write(line)
    out.flush()
def handle_error(msg):
    """Write ``msg`` to stderr as a ``* ``-prefixed line and exit with status 1.

    Raises:
        SystemExit: Always, with exit code 1.
    """
    line = '* {}\n'.format(msg)
    sys.stderr.write(line)
    raise SystemExit(1)
def run_command(
        command, ignore_error=False, return_stdout=False, capture_stdout=True):
    """Run an external command, logging it first.

    Args:
        command: Argument list (list or tuple of strings), or a single
            string naming the executable.
        ignore_error: If true, a non-zero exit status is ignored and ``None``
            is returned; otherwise ``handle_error`` is called, which exits.
        return_stdout: If true, return the command's stdout decoded to
            ``str``. Implies capturing stdout.
        capture_stdout: If false (and ``return_stdout`` is false), the
            command's stdout is passed through rather than captured.

    Returns:
        The decoded stdout when ``return_stdout`` is true and the command
        succeeded, otherwise ``None``.
    """
    if not isinstance(command, (list, tuple)):
        command = [command]
    log_info('Running command {}'.format(' '.join(command)))

    try:
        # Returning stdout requires capturing it. Previously
        # return_stdout=True with capture_stdout=False crashed with an
        # AttributeError when calling .decode() on None.
        if capture_stdout or return_stdout:
            stdout = subprocess.check_output(command)
        else:
            subprocess.check_call(command)
            stdout = None
    except subprocess.CalledProcessError as err:
        if not ignore_error:
            handle_error('Command failed: {}'.format(err))
        # Only reached when the failure is being ignored.
        return None

    return stdout.decode() if return_stdout else None