插件模板Workflow HooksHera: Argo 工作流的一个新的 Python SDK调试暂停Pod 名称包括模板名称SSO+RBAC Namespace Delegation将默认执行器更改为 Emissary支持 Java 和 Python 客户端库升级到 v3.3翻译自关注
摘要: Argo Workflows v3.3 发布,支持插件、调试模式、修改默认执行器,多租户支持、引入 Python SDK
新特性:插件模板允许任何开发人员使用任何语言编写扩展到他们的工作流
新特性:使用工作流钩子基于条件执行模板
新的 SDK: Hera 是一个新的用于指定 Argo 工作流的 Python SDK
新特性:使用ARGO_DEBUG_PAUSE将任务置于调试模式
增强:Pod 名称包括模板名称
增强:多租户支持SSO+RBAC
增强:将默认执行器更改为Emissary
增强:Java 和 Python 客户端库加入了核心的 Argo 工作流代码库
插件模板目前,工作流中的每个任务要么运行一个 pod(例如“容器”或“脚本”),要么发出一个 HTTP 请求。插件模板允许您编写自己的 HTTP 服务器,插入您的任何工作流来完成任务。
插件的一大好处是,你不需要学习 Golang,也不需要等待 Argo 团队添加功能。你可以自己用 Python 来做,然后立即部署它,这样你就可以在工作流程中使用它了。
插件有很多用例:
发送 Slack 或电子邮件信息更新 Tello board启动 Spark EMR 或 Tekton job与 Airflow 或任何类似的系统集成向报表系统发送数据
插件被实现为 HTTP 服务器。下面是一个用 Python 编写的发送 Slack 消息的程序:
import jsonimport osfrom http.server import baseHTTPRequestHandler, HTTPServerfrom urllib.request import urlopen, Requestclass Plugin(baseHTTPRequestHandler): def args(self): return json.loads(self.rfile.read(int(self.headers.get('Content-Length')))) def reply(self, reply): self.send_response(200) self.end_headers() self.wfile.write(json.dumps(reply).encode("UTF-8")) def unsupported(self): self.send_response(404) self.end_headers() def do_POST(self): if self.path == '/api/v1/template.execute': args = self.args() if 'slack' in args['template'].get('plugin', {}): x = urlopen( Request(os.getenv('URL'), data=json.dumps({'text': args['template']['plugin']['slack']['text']}).encode())) if x.status != 200: raise Exception("not 200") self.reply({'node': {'phase': 'Succeeded', 'message': 'Slack message sent'}}) else: self.reply({}) else: self.unsupported()if __name__ == '__main__': httpd = HTTPServer(('', 7522), Plugin) httpd.serve_forever()
一旦你写了一个插件,就可以把插件打包成一个 configmap。用kubectl apply安装它,会自动加载插件:
argo executor-plugin build ./slack-pluginkubectl apply ./slack-plugin/slack-executor-plugin-configmap.yaml
最后,你可以使用新插件运行工作流:
apiVersion: argoproj.io/v1alpha1kind: Workflowmetadata: generateName: slack-example-spec: entrypoint: main templates: - name: main plugin: slack: text: "{{workflow.name}} finished!"
插件将改变用户使用 Argo 工作流构建平台的方式。了解更多关于插件模板在文档。
Workflow Hooks工作流钩子在满足配置的表达式时执行模板。工作流钩子就像带有条件的退出处理程序。钩子可以在工作流级和模板级配置。
钩子可以用来根据工作流状态的改变或步骤/任务状态的改变来配置通知,就像下面的例子:
apiVersion: argoproj.io/v1alpha1kind: Workflowmetadata: generateName: lifecycle-hook-spec: entrypoint: main hooks: exit: template: http running: expression: workflow.status == "Running" template: http templates: - name: main steps: - - name: step1 template: heads - name: heads container: image: alpine:3.6 command: [sh, -c] args: ["echo \"it was heads\""] - name: http http: url: [http://dummy.restapiexample.com/api/v1/employees](http://dummy.restapiexample.com/api/v1/employees)
Hera: Argo 工作流的一个新的 Python SDKHera (Hera -workflow)是一个新的高效的 SDK,用于在 Python 中指定 Argo 工作流。Hera 的目标是为 Python 开发人员提供一种更简单的方式来构建和提交实验工作流,特别是机器学习。
Hera 是围绕 Argo 工作流的两个核心概念构建的:
Task:保存用于远程执行的 Python 函数的对象工作流:任务的集合
下面是一个使用 Hera 的 DAG 工作流示例:
from hera.task import Taskfrom hera.workflow import Workflowfrom hera.workflow_service import WorkflowServicedef say(message: str): """ This can be anything as long as the Docker image satisfies the dependencies、You can import anything Python that is in your container e.g torch, tensorflow, scipy, biopython, etc - just provide an image to the task! """ print(message)ws = WorkflowService('my-argo-domain.com', 'my-argo-server-token')w = Workflow('diamond', ws)a = Task('A', say, [{'message': 'This is task A!'}])b = Task('B', say, [{'message': 'This is task B!'}])c = Task('C', say, [{'message': 'This is task C!'}])d = Task('D', say, [{'message': 'This is task D!'}])a.next(b).next(d) # a >> b >> da.next(c).next(d) # a >> c >> dw.add_tasks(a, b, c, d)w.submit()
调试暂停许多用户都要求改进调试功能。到目前为止,还不能将任务设置为调试模式。现在有了ARGO_DEBUG_PAUSE, Argo 将暂停你的任务执行器,这样你就可以调试它。指定一些环境变量,选择是否在任务之前或之后暂停,然后将kubctl exec放入容器以调试它。
Pod 名称包括模板名称在 v3.2 中,pod 名称是通过接受工作流名称并根据任务 ID 添加散列后缀来生成的。在 v3.3 中,pod 名称还包含模板的名称。这使得在使用kubectl get pod时,更容易看到哪个 pod 是哪个任务:
Before (v1):
NAME READY STATUS AGEcoinflip-jjzd8–1241984900 0/2 Completed 0coinflip-jjzd8–2544588297 0/2 Completed 0
After (v2):
NAME READY STATUS RESTARTScoinflip-lg6w4-flip-coin-1886328558 0/2 Completed 0coinflip-lg6w4-heads-661049787 0/2 Completed 0
这个特性是可选择的。增加POD_NAMES=v2标志启动控制器.
SSO+RBAC Namespace Delegation在 v3.2 中,必须在argo Namespace 中设置 SSO+RBAC 特性。这对于小团队来说很有效。但是,在每个团队都有自己的 Namespace 的多租户系统中,这可能会变得笨拙。
在 v3.3 中,我们支持在user Namespace 中设置 RBAC。这个更改允许每个团队设置自己的 RBAC,当有许多团队时,可以更容易地管理 RBAC。
将默认执行器更改为 EmissaryKubernetes 对 Docker 的支持正在消失见之前的帖子。我们将用Argo Emissary 执行器取代它。
Emissary 执行器提供几个优势:
比现有的执行器更安全比现有的执行器更快,甚至比 PNS(Process Namespace Sharing)执行器更快支持 ContainerSet 模板(允许您运行更快的步骤和降低成本)
支持新的“调试暂停”功能(帮助调试工作流程中的容器)
支持 Java 和 Python 客户端库您可能已经使用了社区维护的客户端库之一,将 Argo 工作流集成到您的应用程序中。然而,保持这些最新的总是一个挑战。
现在,我们在核心代码库中包含了 Java 和 Python 客户端库,这样它们就可以与 Argo 工作流同步进行维护和发布。我们的目标是确保它们始终是最新的和功能齐全的。
在 Github 这里找到这些Argo sdk。
升级到 v3.3查看GitHub 上最新的 Argo 工作流版本。
在升级到 3.3 版本之前,请确保在这里查看所有更改。
翻译自What’s new in Argo Workflows v3.3
关注本文首发于微信公众号【我的小碗汤】,扫左侧码关注,了解更多咨询,更有免费资源供您学习