kubeflow-2: Introduction and Application Examples of Pipelines
1 Overview
1.1 Pipeline functionality
The sample Pipelines that Kubeflow provides by default are all based on GCP (Google Cloud Platform), but we want to deploy on our own cluster, so this article summarizes the process of building a Pipeline based on the official documentation.
First of all, data scientists already spend their time in a few key stages: pulling data, training, saving the model, and deploying the model. Pipelines provides a friendly UI for data scientists to define this whole process, and the whole process runs on a K8S cluster. For companies that care about resource utilization, serving online applications and these machine learning workloads on one unified K8S layer is quite attractive.
By defining a Pipeline you define a machine learning workflow whose steps feed into one another. There are plenty of similar products on the market, for example from Alibaba Cloud and Tencent Cloud, but not all of them are built on K8S. Pipelines also provides tools for defining the Pipeline, all of them in Python, which of course is not a problem for data scientists.
Finally, since Pipelines lives in the Kubeflow ecosystem and integrates with Notebooks, data scientists never even have to leave Kubeflow for anything else; the whole end-to-end flow is handled in one place.
A pipeline is an abstraction of a machine learning workflow. The workflow can be as small as a single function, or as large as the whole machine learning process: data loading, transformation, cleaning, feature construction, model training, and so on.
In Kubeflow, this component records, drives, and reports on experiments, tasks, and every individual run through a UI. Once the pipeline's step components are built, a DAG (directed acyclic graph) is constructed from the component dependencies that were defined in advance.
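As a minimal illustration of how declared dependencies become DAG edges (the step names and image here are arbitrary), an explicit ordering between two container steps can be expressed with .after():
from kfp import dsl

@dsl.pipeline(name='dag-demo', description='two steps with an explicit ordering')
def dag_demo():
    # two independent container steps
    step1 = dsl.ContainerOp(name='step1', image='library/bash:4.4.23',
                            command=['sh', '-c'], arguments=['echo "step 1"'])
    step2 = dsl.ContainerOp(name='step2', image='library/bash:4.4.23',
                            command=['sh', '-c'], arguments=['echo "step 2"'])
    # step2 runs only after step1 finishes, which creates the edge step1 -> step2
    step2.after(step1)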
Before building each pipeline component, the business code for that step has to be packaged into a Docker image (in Kubeflow, all business code runs as containers).
1.2 Steps to build a pipeline
Building and uploading your own Pipeline mainly involves the following steps:
(1) Install the dedicated SDK.
(2) Define the Pipeline in Python.
(3) Build the pipeline package with the SDK, then upload it through the UI.
The pipeline components are built from your existing code.
There are currently two ways to submit and run pipelines; both ultimately use the SDK to compile the pipeline components (a minimal sketch of both follows the list below).
(1) Use the SDK in a notebook to submit the pipeline to the service, then follow the experiment's progress directly in the UI.
(2) Package the pipeline components into a zip file and upload it to the service through the UI; progress can likewise be followed in the UI.
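A minimal sketch of the two submission paths, assuming a pipeline function named mnist_pipeline has already been defined (the function name, archive name, and arguments are placeholders):
import kfp
import kfp.compiler as compiler

# Method 1: submit directly from a notebook and follow the run in the UI
client = kfp.Client()   # when running outside the cluster, pass host='<pipelines endpoint>'
client.create_run_from_pipeline_func(mnist_pipeline, arguments={})

# Method 2: compile to an archive and upload it manually through the Pipelines UI
compiler.Compiler().compile(mnist_pipeline, 'mnist_pipeline.zip')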
Although Kubeflow has been open source for a while, some features are still heavily coupled to the Google cloud platform, and the official examples have plenty of pitfalls when run on a local K8S cluster, which also makes them hard for beginners to follow. The classic TensorFlow handwritten-digit-recognition example is used here to practice pipelines on a local K8S cluster.
2 Installing and configuring the environment
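Environment setup is not detailed here; the only hard requirement for the rest of this article is the Kubeflow Pipelines SDK, the kfp Python package, which can be installed with pip install kfp in the notebook image or local Python environment (pin a version compatible with your cluster if necessary).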
3 pipeline sdk
3.1 @kfp.dsl.component
component(func)
Decorator for component functions that returns a ContainerOp.
**********************************************************
Example:
@kfp.dsl.component
def echo_op():
    return kfp.dsl.ContainerOp(
        name='echo',
        image='library/bash:4.4.23',
        command=['sh', '-c'],
        arguments=['echo "hello world"']
    )
**********************************************************
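A function decorated this way is then called inside a pipeline function like any other op factory; a minimal sketch (the pipeline name is arbitrary):
@kfp.dsl.pipeline(name='echo-pipeline', description='calls the echo component once')
def echo_pipeline():
    # calling the decorated function returns its ContainerOp,
    # so the step is wired into the pipeline's DAG
    echo_task = echo_op()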
3.2 kfp.dsl.ContainerOp
Represents an op implemented by a container image:
class ContainerOp(BaseOp)
Parameters:
Commonly used (1) name: the name of the op.
It does not have to be unique within a pipeline,
because the pipeline will generate a unique new name in case of conflicts.
Commonly used (2) image: the container image name, such as 'python:3.5-jessie'
Commonly used (3) command: the command to run in the container.
If None, uses the default CMD defined in the container.
Commonly used (4) arguments: the arguments of the command.
The command can include "%s" and supply a PipelineParam as the string replacement.
For example, ('echo %s' % input_param).
At container run time the argument will be 'echo param_value'.
(5)init_containers: the list of `UserContainer` objects
describing the InitContainer to deploy before the `main` container.
(6)sidecars: the list of `Sidecar` objects
describing the sidecar containers to deploy together with the `main` container.
(7)container_kwargs: the dict of additional keyword arguments to pass to the op's `Container` definition.
(8)artifact_argument_paths: Optional. Maps input artifact arguments (values or references) to the local file paths where they'll be placed.
At pipeline run time, the value of the artifact argument is saved to a local file with specified path.
This parameter is only needed when the input file paths are hard-coded in the program.
Otherwise it's better to pass input artifact placement paths by including artifact arguments in the command-line using InputArgumentPath class instances.
Commonly used (9) file_outputs: Maps output names to container local output file paths.
The system will take the data from those files and will make it available for passing to downstream tasks.
For each output in the file_outputs map there will be a corresponding output reference available in the task.outputs dictionary.
These output references can be passed to the other tasks as arguments.
The following output names are handled specially by the frontend and backend: "mlpipeline-ui-metadata" and "mlpipeline-metrics".
(10)output_artifact_paths: Deprecated. Maps output artifact labels to local artifact file paths. Deprecated: Use file_outputs instead. It now supports big data outputs.
(11)is_exit_handler: Deprecated. This is no longer needed.
(12)pvolumes: Dictionary for the user to match a path on the op's fs with a
V1Volume or its inherited type.
E.g. {"/my/path": vol, "/mnt": other_op.pvolumes["/output"]}.
**********************************************************
Example:
from kfp import dsl
from kubernetes.client.models import V1EnvVar, V1SecretKeySelector

@dsl.pipeline(
    name='foo',
    description='hello world')
def foo_pipeline(tag: str, pull_image_policy: str):
    # any attributes can be parameterized (both serialized string or actual PipelineParam)
    op = dsl.ContainerOp(name='foo',
                         image='busybox:%s' % tag,
                         # pass in init_container list
                         init_containers=[dsl.UserContainer('print', 'busybox:latest', command='echo "hello"')],
                         # pass in sidecars list
                         sidecars=[dsl.Sidecar('print', 'busybox:latest', command='echo "hello"')],
                         # pass in k8s container kwargs
                         container_kwargs={'env': [V1EnvVar('foo', 'bar')]},
    )
    # set `imagePullPolicy` property for `container` with `PipelineParam`
    op.container.set_image_pull_policy(pull_image_policy)
    # add sidecar with parameterized image tag
    # sidecar follows the argo sidecar swagger spec
    op.add_sidecar(dsl.Sidecar('redis', 'redis:%s' % tag).set_image_pull_policy('Always'))
**********************************************************
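To make the file_outputs parameter described above concrete, here is a minimal sketch (the image and file paths are chosen only for illustration) in which one op writes a file and a downstream op receives its contents through the producer's outputs dictionary:
from kfp import dsl

@dsl.pipeline(name='file-outputs-demo', description='pass data between ops')
def file_outputs_demo():
    # the producer writes /tmp/result.txt and exposes it under the output name 'result'
    producer = dsl.ContainerOp(
        name='producer',
        image='library/bash:4.4.23',
        command=['sh', '-c'],
        arguments=['echo "42" > /tmp/result.txt'],
        file_outputs={'result': '/tmp/result.txt'})
    # referencing producer.outputs['result'] passes the value downstream
    # and also makes the consumer depend on the producer in the DAG
    consumer = dsl.ContainerOp(
        name='consumer',
        image='library/bash:4.4.23',
        command=['sh', '-c'],
        arguments=['echo received: %s' % producer.outputs['result']])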
3.3 @kfp.dsl.pipeline
pipeline(name:Union[str, NoneType]=None, description:Union[str, NoneType]=None, output_directory:Union[str, NoneType]=None)
Decorator of pipeline functions.
**********************************************************
Example:
@pipeline(
    name='my awesome pipeline',
    description='Is it really awesome?',
    output_directory='gs://my-bucket/my-output-path'
)
def my_pipeline(a: PipelineParam, b: PipelineParam):
    ...
**********************************************************
Commonly used (1) name: The pipeline name. Default to a sanitized version of the function name.
Commonly used (2) description: Optionally, a human-readable description of the pipeline.
Commonly used (3) output_directory: The root directory to generate input/output URI under this pipeline.
This is required if input/output URI placeholder is used in this pipeline.
3.4 Compiler.compile
Compiles a workflow function decorated with @kfp.dsl.pipeline into a compressed file containing the workflow's K8S .yaml configuration:
class Compiler(object):
"""DSL Compiler.
It compiles DSL pipeline functions into workflow yaml.
**********************************************************
Example:
First define the pipeline function:
@dsl.pipeline(
name='name',
description='description'
)
def my_pipeline(a: int = 1, b: str = "default value"):
...
Then compile it:
Compiler().compile(my_pipeline, 'path/to/workflow.yaml')
**********************************************************
def compile(self, pipeline_func, package_path, type_check=True, pipeline_conf: dsl.PipelineConf = None):
"""Compile the given pipeline function into workflow yaml.
Args:
Commonly used (1) pipeline_func: pipeline functions with @dsl.pipeline decorator.
Commonly used (2) package_path: the output file path. for example, "~/"
(3)type_check: whether to enable the type check or not, default: True.
(4)pipeline_conf: PipelineConf instance. Can specify op transforms, image pull secrets and other pipeline-level configuration options. Overrides any configuration that may be set by the pipeline.
"""
3.5 kfp.Client.create_experiment
Creates a pipeline experiment and returns the experiment's ID and name:
create_experiment(self, name, description=None, namespace=None)
Create a new experiment.
Parameters:
(1)name: The name of the experiment.
(2)description: Description of the experiment.
(3)namespace: Kubernetes namespace where the experiment should be created. For single user deployment, leave it as None;
For multi user, input a namespace where the user is authorized.
Returns:
An Experiment object. Most important field is id.
***************************************************************
Example:
client = kfp.Client()
try:
    experiment = client.create_experiment("shiyan")
except Exception:
    experiment = client.get_experiment(experiment_name="shiyan")
print(experiment)
Output:
Experiment link here
{'created_at': datetime.datetime(2021, 2, 11, 2, 39, 42, tzinfo=tzlocal()),
'description': None,
'id':'9603479a-0e30-4975-b634-eee298c46975',
'name':'shiyan',
'resource_references': None,
'storage_state': None}
3.6 kfp.Client.run_pipeline
Runs the pipeline once within an experiment:
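A hedged sketch that continues from the create_experiment example in 3.5 (the package path, run name, and parameter values are placeholders):
client = kfp.Client()
run = client.run_pipeline(
    experiment_id=experiment.id,                  # the experiment created in 3.5
    job_name='shiyan-run-1',                      # display name for this run (placeholder)
    pipeline_package_path='mnist_pipeline.zip',   # archive produced by Compiler().compile
    params={'a': 1})                              # values for the pipeline's parameters
print(run.id)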
