02-kubeflow 简单测试

1、背景

当训练新的 ML 模型时,大多数据科学家和 ML 工程师会开发一些新的 Python 脚本或交互式 notebook,以进行数据提取和预处理,来构建用于训练模型的数据集;

然后创建几个其他脚本或 notebook 来尝试不同类型的模型或机器学习框架;

最后收集、调试指标,评估每个模型在测试数据集上的运行情况,来确定要部署到生产中的模型。

上图极大的简化了真正的机器学习工作流程,在实际开发过程中需要大量的人参与,但是除了最初开发该方法的工程师之外,其他人都无法轻易重复使用其中的内容。但是我们可以使用 KubeFlow Pipelines 来解决这些问题。

与其将数据准备、模型训练、模型验证和模型部署视为特定模型中的单一代码库,不如将其视为一系列独立的模块化步骤,让每个步骤都专注于具体任务。下图是 KubeFlow Pipelines。

将机器学习工作流程建模为机器学习流水线有很多好处:

  • 自动化:通过消除手动干预的需求,我们可以安排流水线按照需求重新训练模型,从而确保模型能够适应随时间变化的训练数据。
  • 重复使用:由于流水线的步骤与流水线本身是分开的,所以我们可以轻松地在多个流水线中重复使用单个步骤。
  • 重复性:任何数据科学家或工程师都可以通过手动工作流程重新运行流水线,这样就很清楚需要以什么顺序运行不同的脚本或 notebook。
  • 环境解耦:通过保持机器学习流水线的步骤解耦,我们可以在不同类型的环境中运行不同的步骤。例如,某些数据准备步骤可能需要在大型计算机集群上运行,而模型部署步骤则可能在单个计算机上运行。

2、什么是 Kubeflow

Kubeflow 是一个基于 Kubernetes 的开源平台,旨在简化机器学习系统的开发和部署。Kubeflow 在官方文档中被称为 “Kubernetes 机器学习工具包”,它由几个组件(component)组成,这些组件跨越了机器学习开发生命周期的各个步骤,包括了 notebook developent environment、超参数调试、功能管理、模型服务以及 ML Pipelines。Kubeflow 中央仪表板如下图所示

3、Kubeflow Pipelines

Kubeflow 中的流水线由一个或多个组件(component)组成,它们代表流水线中的各个步骤。每个组件都在其自己的 Docker 容器中运行,这意味着流水线中的每个步骤都具有自己的一组依赖关系,与其他组件无关。

我们需要:

  • 对于开发的每个组件,我们创建一个单独的 Docker 镜像,该镜像会接收输入、执行操作、输出。
  • 我们要有一个单独的 Python 脚本,pipeline.py 脚本会从每个 Docker 镜像创建 Pipelines 组件,然后使用这些组件构造流水线(Pipeline)。

我们总共创建四个组件:

  • preprocess-data:该组件将从 sklearn.datasets 中加载 Boston Housing 数据集,然后将其拆分为训练集和测试集。
  • train-model:该组件将训练模型,以使用“Boston Housing”数据集来预测 Boston 房屋的中位数。
  • test-model:该组件会在测试数据集上计算并输出模型的均方误差。
  • deploy-model:在本文中,我们不会专注于模型的部署和服务,因此该组件将仅记录一条消息,指出它正在部署模型。实际情况下,这可能是将任何模型部署到 QA 或生产环境的通用组件。

4、环境

我们可以在任何安装了 Kubeflow 的 Kubernetes 集群上运行示例代码。本地唯一需要的依赖是 Kubeflow Pipelines SDK,使用 pip 安装:pip install kfp

5、具体步骤

Kubeflow 示例共分以下几步。

  1. 制作流水线中组件所使用的镜像。
  2. 编写组件的脚本。
  3. 使用流水线接口把组件的脚本组装起来。

5.1、制作docker 镜像

preprocess-data 镜像

第一个是 preprocess-data 用到的镜像。

首先创建一个 Python 脚本,脚本作用是用 sklearn.datasets 加载 Boston Housing 数据集,用 Sci-kit Learn.train_test_split 函数将此数据集分为训练集和测试集,然后用 np.save 将数据集保存到磁盘。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
# preprocess.py
import numpy as np
from sklearn import datasets
from sklearn.model_selection import train_test_split

def _preprocess_data():
X, y = datasets.load_boston(return_X_y=True)
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.33)
np.save('x_train.npy', X_train)
np.save('x_test.npy', X_test)
np.save('y_train.npy', y_train)
np.save('y_test.npy', y_test)


if __name__ == '__main__':
print('Preprocessing data...')
_preprocess_data()

有了脚本之后,我们需要创建一个执行该脚本的 Docker 镜像,Dockerfile 如下:

1
2
3
4
5
FROM python:3.7-slim
WORKDIR /app
RUN pip install -U scikit-learn numpy
COPY preprocess.py ./preprocess.py
ENTRYPOINT [ "python", "preprocess.py" ]

然后将镜像上传到 Kubernetes 集群可以访问的镜像仓库。

1
2
3
4
5
# 构建镜像
docker build -t lepeng/boston_pipeline_preprocess:v1 -f Dockerfile .

# 推送镜像到远程仓库
docker push lepeng/boston_pipeline_preprocess:v1

train 镜像

第二个是 train 用到的镜像。

创建一个 Python 脚本,该脚本使用 Sci-kit Learn 来训练回归模型。这类似于 preprocess-data 的 Python 脚本,其中最大的区别是这里用 argparse 来接受训练数据的文件路径以作为命令行参数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
# train.py
import argparse
import joblib
import numpy as np
from sklearn.linear_model import SGDRegressor

def train_model(x_train, y_train):
x_train_data = np.load(x_train)
y_train_data = np.load(y_train)

model = SGDRegressor(verbose=1)
model.fit(x_train_data, y_train_data)

joblib.dump(model, 'model.pkl')


if __name__ == '__main__':
parser = argparse.ArgumentParser()
parser.add_argument('--x_train')
parser.add_argument('--y_train')
args = parser.parse_args()
train_model(args.x_train, args.y_train)

创建执行该脚本的 Docker 镜像,Dockerfile 如下:

1
2
3
4
5
FROM python:3.7-slim
WORKDIR /app
RUN pip install -U scikit-learn numpy
COPY train.py ./train.py
ENTRYPOINT [ "python", "train.py" ]
1
2
docker build -t lepeng/boston_pipeline_train:v1  -f Dockerfile .
docker push lepeng/boston_pipeline_train:v1

test 镜像

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
import argparse
import joblib
import numpy as np
from sklearn.metrics import mean_squared_error

def test_model(x_test, y_test, model_path):
x_test_data = np.load(x_test)
y_test_data = np.load(y_test)

model = joblib.load(model_path)
y_pred = model.predict(x_test_data)

err = mean_squared_error(y_test_data, y_pred)

with open('output.txt', 'a') as f:
f.write(str(err))


if __name__ == '__main__':
parser = argparse.ArgumentParser()
parser.add_argument('--x_test')
parser.add_argument('--y_test')
parser.add_argument('--model')
args = parser.parse_args()
test_model(args.x_test, args.y_test, args.model)

Dockerfile 如下:

1
2
3
4
5
FROM python:3.7-slim
WORKDIR /app
RUN pip install -U scikit-learn numpy
COPY test.py ./test.py
ENTRYPOINT [ "python", "test.py" ]
1
2
docker build -t lepeng/boston_pipeline_test:v1  -f Dockerfile .
docker push lepeng/boston_pipeline_test:v1

deploy_model 镜像

1
2
3
4
5
6
7
8
9
10
11
12
13
# deploy_model.py
import argparse

def deploy_model(model_path):
# 改成你自己
print(f'deploying model {model_path}...')


if __name__ == '__main__':
parser = argparse.ArgumentParser()
parser.add_argument('--model')
args = parser.parse_args()
deploy_model(args.model)

Dockerfile 如下:

1
2
3
4
FROM python:3.7-slim
WORKDIR /app
COPY deploy_model.py ./deploy_model.py
ENTRYPOINT [ "python", "deploy_model.py" ]
1
2
docker build -t lepeng/boston_pipeline_deploy_model:v1  -f Dockerfile .
docker push lepeng/boston_pipeline_deploy_model:v1

5.2、构建组件

现在开始构建流水线所需的组件。每个组件都要定义为一个返回 ContainerOp 类型的对象(object)。preprocess-data 组件定义如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
# pipeline.py
import kfp
from kfp import dsl


def preprocess_op():
return dsl.ContainerOp(
name='Preprocess Data',
image='wintfru/boston_pipeline_preprocess:v1', # 上文 Dockerfile 定义的 Docker 镜像的名称
arguments=[],
file_outputs={ # file_outputs 参数指定了 Python 脚本组件保存到磁盘的四个 .npy 文件的文件路径。通过将这四个文件指定为“File Output”,我们可以将它们用于流水线中的其他组件。
'x_train': '/app/x_train.npy',
'x_test': '/app/x_test.npy',
'y_train': '/app/y_train.npy',
'y_test': '/app/y_test.npy',
}
)


def train_op(x_train, y_train):
return dsl.ContainerOp(
name='Train Model',
image='wintfru/boston_pipeline_train:v1',
arguments=[ # arguments 指定接受参数,x_train 和 y_train,其将作为命令行参数传递给容器并进行解析。
'--x_train', x_train,
'--y_train', y_train
],
file_outputs={
'model': '/app/model.pkl'
}
)


def test_op(x_test, y_test, model):
return dsl.ContainerOp(
name='Test Model',
image='wintfru/boston_pipeline_test:v1',
arguments=[
'--x_test', x_test,
'--y_test', y_test,
'--model', model
],
file_outputs={
'mean_squared_error': '/app/output.txt'
}
)


def deploy_model_op(model):
return dsl.ContainerOp(
name='Deploy Model',
image='wintfru/boston_pipeline_deploy:v1',
arguments=[
'--model', model
]
)

注意:在组件中对文件路径进行硬编码不是一个很好的做法,就如上面的代码中那样,这要求创建组件定义的人员要了解有关组件实现的特定细节。这会让组件接受文件路径作为命令行参数更加干净,定义组件的人员也可以完全控制输出文件的位置。

5.3、定义流水线

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
# pipeline.py
@dsl.pipeline(
name='Boston Housing Pipeline',
description='An example pipeline that trains and logs a regression model.'
)
def boston_pipeline():
_preprocess_op = preprocess_op() # 要访问 preprocess-data 组件的输出,可以使用 _preprocess_op.outputs['NAME_OF_OUTPUT']。

_train_op = train_op( # 默认情况下,当我们从组件访问 file_outputs 时,我们获取的是文件内容而不是文件路径。在本文中,由于这些不是纯文本文件,因此我们不能仅将文件内容作为命令行参数传递到 Docker 容器组件中。要访问文件路径,我们需要使用 dsl.InputArgumentPath() 并传入组件输出。
dsl.InputArgumentPath(_preprocess_op.outputs['x_train']),
dsl.InputArgumentPath(_preprocess_op.outputs['y_train'])
).after(_preprocess_op)

_test_op = test_op(
dsl.InputArgumentPath(_preprocess_op.outputs['x_test']),
dsl.InputArgumentPath(_preprocess_op.outputs['y_test']),
dsl.InputArgumentPath(_train_op.outputs['model'])
).after(_train_op)

deploy_model_op(
dsl.InputArgumentPath(_train_op.outputs['model'])
).after(_test_op)


if __name__ == '__main__':
# 提交运行pipelines有2种方法:
# 第一种:使用sdk提交pipelines至服务中心,直接可以在UI中查看pipelines实验运行进度。
client = kfp.Client()
client.create_run_from_pipeline_func(boston_pipeline, arguments={})

# 第二种:把脚本编译成 Kubernetes 的 yaml 文件,然后提交。打包成 yaml 有两种实现方式:
# 第一种是在命令行使用 dsl-compile --py pipeline.py --output pipeline.tar.gz 命令生成 yaml 文件并打包
# 第二种实现方式是使用 sdk 打包,打包命令如下:
# kfp.compiler.Compiler().compile(boston_pipeline, __file__ + '.yaml')
# 运行方式也许有两种:
# 第一种 通过 kubeflow 的 ui 界面上传 yaml 文件来执行任务。
# 第二种使用如下命令进行提交
# exp = client.get_experiment(experiment_name=pipeline_name)
# client.run_pipeline(exp.id, job_name, __file__ + '.yaml')

流水线由一个注解 @dsl.pipeline 修饰的 Python 函数定义。在函数内,我们可以像使用其他任何函数一样使用组件。为了运行流水线,我们创建一个 kfp.Client 对象,再调用 create_run_from_pipeline_func 函数,并传入定义流水线的函数。

现在,如果我们执行 pipeline.py 并导航到 Kubeflow 中央仪表板中的“Pipelines UI”,我们可以看到流水线图中显示的所有四个组件。我们也可以查看每个组件的输入和输出结果以及控制台日志等。


02-kubeflow 简单测试
https://flepeng.github.io/042-云原生-04-kubeflow-02-kubeflow-简单测试/
作者
Lepeng
发布于
2023年8月28日
许可协议