00-kubeflow pipeline dsl

SDK 1.8.13 official documentation: https://kubeflow-pipelines.readthedocs.io/en/1.8.13/source/kfp.dsl.html

kfp.dsl package

kfp.dsl.RUN_ID_PLACEHOLDER

kfp.dsl.EXECUTION_ID_PLACEHOLDER

kfp.dsl.BaseOp

source

class kfp.dsl.BaseOp(name: str, init_containers: List[kfp.dsl._container_op.UserContainer] = None, sidecars: List[kfp.dsl._container_op.Sidecar] = None, is_exit_handler: bool = False)
Bases: object

Base operator.

Parameters:
name – the name of the op. It does not have to be unique within a pipeline because the pipeline generates a unique new name in case of conflicts.
init_containers – the list of UserContainer objects describing the InitContainer to deploy before the main container.
sidecars – the list of Sidecar objects describing the sidecar containers to deploy together with the main container.
is_exit_handler – Deprecated.

add_affinity(affinity: kubernetes.client.models.v1_affinity.V1Affinity)[source]

Add K8s Affinity.

Parameters:
  • affinity – Kubernetes affinity. For the detailed spec, see the affinity definition: https://github.com/kubernetes-client/python/blob/master/kubernetes/client/models/v1_affinity.py

Example:

V1Affinity(
    node_affinity=V1NodeAffinity(
        required_during_scheduling_ignored_during_execution=V1NodeSelector(
            node_selector_terms=[V1NodeSelectorTerm(
                match_expressions=[V1NodeSelectorRequirement(
                    key='beta.kubernetes.io/instance-type',
                    operator='In',
                    values=['p2.xlarge'])])])))

add_init_container(init_container: kfp.dsl._container_op.UserContainer)[source]

Add an init container to the Op.

Parameters:init_container – UserContainer object.

add_node_selector_constraint(label_name: Union[str, kfp.dsl._pipeline_param.PipelineParam], value: Union[str, kfp.dsl._pipeline_param.PipelineParam])[source]

Add a constraint for nodeSelector.

Each constraint is a key-value pair label. For the container to be eligible to run on a node, the node must have each of the constraints present as labels.

Parameters:
  • label_name (Union[str, PipelineParam]) – The name of the constraint label.
  • value (Union[str, PipelineParam]) – The value of the constraint label.
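
The matching rule described above can be sketched in plain Python. This is only an illustration of the "every constraint must be present as a label" logic, not the scheduler's or the SDK's implementation; the function name `node_is_eligible` and the label values are hypothetical.

```python
from typing import Dict


def node_is_eligible(node_labels: Dict[str, str],
                     constraints: Dict[str, str]) -> bool:
    """Return True when every constraint is present as a label on the node."""
    return all(node_labels.get(key) == value
               for key, value in constraints.items())


# Hypothetical constraints, as they might be added one by one with
# add_node_selector_constraint(label_name, value).
constraints = {"disktype": "ssd", "zone": "us-east-1a"}

node_a = {"disktype": "ssd", "zone": "us-east-1a", "team": "ml"}
node_b = {"disktype": "ssd"}  # missing the "zone" label

print(node_is_eligible(node_a, constraints))
print(node_is_eligible(node_b, constraints))
```

Extra labels on a node do not matter; a single missing or mismatched label makes the node ineligible.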

add_pod_annotation(name: str, value: str)[source]

Adds a pod’s metadata annotation.

Parameters:
  • name – The name of the annotation.
  • value – The value of the annotation.

add_pod_label(name: str, value: str)[source]

Adds a pod’s metadata label.

Parameters:
  • name – The name of the label.
  • value – The value of the label.

add_sidecar(sidecar: kfp.dsl._container_op.Sidecar)[source]

Add a sidecar to the Op.

Parameters:sidecar – Sidecar object.

add_toleration(tolerations: kubernetes.client.models.v1_toleration.V1Toleration)[source]

Add K8s tolerations.

Parameters:tolerations – Kubernetes toleration. For the detailed spec, see the toleration definition: https://github.com/kubernetes-client/python/blob/master/kubernetes/client/models/v1_toleration.py

add_volume(volume)[source]

Add K8s volume to the container.

Parameters:
  • volume – Kubernetes volumes. For the detailed spec, see the volume definition: https://github.com/kubernetes-client/python/blob/master/kubernetes/client/models/v1_volume.py

after(*ops)[source]

Specify explicit dependency on other ops.

apply(mod_func)[source]

Applies a modifier function to self.

The function should return the passed object. This is needed to chain "extension methods" to this class.

Example:

from kfp.gcp import use_gcp_secret

task = (
    train_op(...)
    .set_memory_request('1G')
    .apply(use_gcp_secret('user-gcp-sa'))
    .set_memory_limit('2G')
)
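
To make the chaining mechanics concrete, here is a minimal standalone sketch of the modifier-function pattern. It does not import kfp: the `Task` class and the `use_secret` helper are hypothetical stand-ins, and the fallback to `self` when the modifier returns nothing is an assumption of this sketch.

```python
class Task:
    """Hypothetical stand-in for an op with chainable setters."""

    def __init__(self, name):
        self.name = name
        self.settings = {}

    def set_memory_request(self, memory):
        self.settings["memory_request"] = memory
        return self  # returning self is what makes chaining possible

    def set_memory_limit(self, memory):
        self.settings["memory_limit"] = memory
        return self

    def apply(self, mod_func):
        # Call the modifier on this object; fall back to self if the
        # modifier forgot to return it (an assumption of this sketch).
        return mod_func(self) or self


def use_secret(secret_name):
    """Build a modifier function that records a secret on the task."""
    def _modifier(task):
        task.settings["secret"] = secret_name
        return task
    return _modifier


task = (
    Task("train")
    .set_memory_request("1G")
    .apply(use_secret("user-gcp-sa"))
    .set_memory_limit("2G")
)
print(task.settings)
```

Because `apply` hands back the same object, a modifier written outside the class slots into the chain exactly like a built-in setter.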

attrs_with_pipelineparams = ['node_selector', 'volumes', 'pod_annotations', 'pod_labels', 'num_retries', 'init_containers', 'sidecars', 'tolerations']

inputs

List of PipelineParams that will be converted into input parameters (io.argoproj.workflow.v1alpha1.Inputs) for the argo workflow.

set_caching_options(enable_caching: bool) → kfp.dsl._container_op.BaseOp[source]

Sets caching options for the Op.

Parameters:enable_caching – Whether or not to enable caching for this task.
Returns:Self return to allow chained setting calls.

set_display_name(name: str)[source]

set_retry(num_retries: int, policy: Optional[str] = None, backoff_duration: Optional[str] = None, backoff_factor: Optional[float] = None, backoff_max_duration: Optional[str] = None)[source]

Sets the number of times the task is retried until it’s declared failed.

Parameters:
  • num_retries – Number of times to retry on failures.
  • policy – Retry policy name.
  • backoff_duration – The time interval between retries. Defaults to an immediate retry. In case you specify a simple number, the unit defaults to seconds. You can also specify a different unit, for instance, 2m (2 minutes), 1h (1 hour).
  • backoff_factor – The exponential backoff factor applied to backoff_duration. For example, if backoff_duration="60" (60 seconds) and backoff_factor=2, the first retry will happen after 60 seconds, then after 120, 240, and so on.
  • backoff_max_duration – The maximum interval that can be reached with the backoff strategy.
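
The backoff arithmetic above can be worked through with a small standalone sketch. The helpers `parse_duration` and `retry_delays` are hypothetical and only mirror the parameter descriptions; in particular, treating backoff_max_duration as a cap on each interval follows the wording above and is an assumption about how the workflow engine actually behaves.

```python
from typing import List, Optional

# Unit suffixes mentioned in the description: plain numbers are seconds.
_UNIT_SECONDS = {"s": 1, "m": 60, "h": 3600}


def parse_duration(text: str) -> int:
    """Convert '60', '2m' or '1h' into a number of seconds."""
    text = text.strip()
    if text[-1] in _UNIT_SECONDS:
        return int(text[:-1]) * _UNIT_SECONDS[text[-1]]
    return int(text)


def retry_delays(num_retries: int,
                 backoff_duration: str = "0",
                 backoff_factor: float = 1,
                 backoff_max_duration: Optional[str] = None) -> List[float]:
    """Return the wait before each retry, in seconds."""
    base = parse_duration(backoff_duration)
    cap = parse_duration(backoff_max_duration) if backoff_max_duration else None
    delays = []
    for attempt in range(num_retries):
        delay = base * backoff_factor ** attempt  # exponential growth
        if cap is not None:
            delay = min(delay, cap)  # assumed per-interval cap
        delays.append(delay)
    return delays


print(retry_delays(3, backoff_duration="60", backoff_factor=2))
print(retry_delays(4, backoff_duration="1m", backoff_factor=2,
                   backoff_max_duration="3m"))
```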

set_timeout(seconds: int)[source]

Sets the timeout for the task in seconds.

Parameters:seconds – Number of seconds.

kfp.dsl.Condition

source

class kfp.dsl.Condition(condition, name=None)

Bases: `kfp.dsl._ops_group.OpsGroup`

Represents a condition group with a condition.

Parameters:
condition (ConditionOperator) – the condition.
name (str) – name of the condition

Example:

with Condition(param1=='pizza', '[param1 is pizza]'):
    op1 = ContainerOp(...)
    op2 = ContainerOp(...)

Methods:

after(*ops)
Specify explicit dependency on other ops.


remove_op_recursive(op)

kfp.dsl.ContainerOp: the key API for working with containers

source

The ContainerOp class attaches every public method of the Container class to itself, so a ContainerOp instance can call all of Container's methods. The relevant source excerpt:

class ContainerOp(BaseOp):

    def __init__(...):  # signature abridged in this excerpt
        self._container = Container(
            image=image, args=arguments, command=command, **container_kwargs)

        # decorator func to proxy a method in `Container` into `ContainerOp`
        def _proxy(proxy_attr):
            """Decorator func to proxy to ContainerOp.container."""

            def _decorated(*args, **kwargs):
                # execute method
                ret = getattr(self._container, proxy_attr)(*args, **kwargs)
                if ret == self._container:
                    return self
                return ret

            return deprecation_warning(_decorated, proxy_attr, proxy_attr)

        # iter thru container and attach a proxy func to the container method
        for attr_to_proxy in dir(self._container):
            func = getattr(self._container, attr_to_proxy)
            # ignore private methods, and bypass methods overridden by subclasses.
            if (not hasattr(self, attr_to_proxy) and
                    hasattr(func, '__call__') and (attr_to_proxy[0] != '_') and
                    (attr_to_proxy not in ignore_set)):
                # only proxy public callables
                setattr(self, attr_to_proxy, _proxy(attr_to_proxy))
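
The excerpt above cannot run on its own, so the following is a reduced, runnable sketch of the same proxying idea. `MiniContainer` and `MiniOp` are hypothetical names, and the deprecation warning and ignore set from the excerpt are left out for brevity.

```python
class MiniContainer:
    """Hypothetical inner object with chainable methods."""

    def __init__(self, image):
        self.image = image
        self.env = {}

    def add_env(self, name, value):
        self.env[name] = value
        return self  # chainable on the container

    def get_image(self):
        return self.image


class MiniOp:
    """Hypothetical outer object that proxies the container's public methods."""

    def __init__(self, image):
        self._container = MiniContainer(image)

        def _proxy(attr_name):
            def _decorated(*args, **kwargs):
                ret = getattr(self._container, attr_name)(*args, **kwargs)
                # If the container returned itself, return the op instead
                # so that a call chain stays on the outer object.
                return self if ret is self._container else ret
            return _decorated

        for attr_name in dir(self._container):
            func = getattr(self._container, attr_name)
            # Only proxy public callables that the op does not already define.
            if (not attr_name.startswith("_") and callable(func)
                    and not hasattr(self, attr_name)):
                setattr(self, attr_name, _proxy(attr_name))


op = MiniOp("busybox")
result = op.add_env("HOST", "foo.bar").add_env("PORT", "80")
print(result is op, op.get_image())
```

Swapping the container for the op in the return value is the detail that lets container setters be chained directly on the op.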

class kfp.dsl.ContainerOp(name: str, image: str, command: Union[str, List[str], None] = None, arguments: Union[str, int, float, bool, kfp.dsl._pipeline_param.PipelineParam, List[T], None] = None, init_containers: Optional[List[kfp.dsl._container_op.UserContainer]] = None, sidecars: Optional[List[kfp.dsl._container_op.Sidecar]] = None, container_kwargs: Optional[Dict[KT, VT]] = None, artifact_argument_paths: Optional[List[kfp.dsl._container_op.InputArgumentPath]] = None, file_outputs: Optional[Dict[str, str]] = None, output_artifact_paths: Optional[Dict[str, str]] = None, is_exit_handler: bool = False, pvolumes: Optional[Dict[str, kubernetes.client.models.v1_volume.V1Volume]] = None)

Bases: kfp.dsl._container_op.BaseOp

Represents an op implemented by a container image.

Parameters:
name – the name of the op. It does not have to be unique within a pipeline because the pipeline generates a unique new name in case of conflicts.
image – the container image name, such as 'python:3.5-jessie'
command – the command to run in the container. If None, uses the default CMD defined in the container image.
arguments – the arguments of the command. The command can include "%s" and supply a PipelineParam as the string replacement. For example, ('echo %s' % input_param). At container run time the argument will be 'echo param_value'.
init_containers – the list of UserContainer objects describing the InitContainer to deploy before the main container.
sidecars – the list of Sidecar objects describing the sidecar containers to deploy together with the main container.
container_kwargs – the dict of additional keyword arguments to pass to the op’s Container definition.
artifact_argument_paths – Optional. Maps input artifact arguments (values or references) to the local file paths where they’ll be placed. At pipeline run time, the value of the artifact argument is saved to a local file with specified path. This parameter is only needed when the input file paths are hard-coded in the program. Otherwise it’s better to pass input artifact placement paths by including artifact arguments in the command-line using the InputArgumentPath class instances.
file_outputs – Maps output names to container local output file paths. The system will take the data from those files and will make it available for passing to downstream tasks. For each output in the file_outputs map there will be a corresponding output reference available in the task.outputs dictionary. These output references can be passed to the other tasks as arguments. The following output names are handled specially by the frontend and backend: "mlpipeline-ui-metadata" and "mlpipeline-metrics".
output_artifact_paths – Deprecated. Maps output artifact labels to local artifact file paths. Use file_outputs instead; it now supports big data outputs.
is_exit_handler – Deprecated. This is no longer needed.
pvolumes – Dictionary for the user to match a path on the op's fs with a V1Volume or its inherited type. E.g. {"/my/path": vol, "/mnt": other_op.pvolumes["/output"]}.
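
As a rough mental model of file_outputs, the sketch below reads each mapped file after a step has finished and exposes its content under the output name. It is a standalone illustration that runs locally; `collect_outputs` and the `accuracy` output are hypothetical, and the real system resolves outputs inside the workflow engine rather than with code like this.

```python
import os
import tempfile
from typing import Dict


def collect_outputs(file_outputs: Dict[str, str]) -> Dict[str, str]:
    """Read each output file and key its content by output name."""
    outputs = {}
    for name, path in file_outputs.items():
        with open(path, "r", encoding="utf-8") as handle:
            outputs[name] = handle.read()
    return outputs


with tempfile.TemporaryDirectory() as workdir:
    # Pretend the container wrote its result to a local file.
    accuracy_path = os.path.join(workdir, "accuracy.txt")
    with open(accuracy_path, "w", encoding="utf-8") as handle:
        handle.write("0.93")

    # Same shape as file_outputs: output name -> local file path.
    outputs = collect_outputs({"accuracy": accuracy_path})

print(outputs)
```

A downstream task would then receive `outputs["accuracy"]` as an argument, which is the role the task.outputs references play in a pipeline.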

Example:

from kfp import dsl
from kubernetes.client.models import V1EnvVar, V1SecretKeySelector


@dsl.pipeline(
    name='foo',
    description='hello world')
def foo_pipeline(tag: str, pull_image_policy: str):
    # any attributes can be parameterized (both serialized string or actual PipelineParam)
    op = dsl.ContainerOp(name='foo',
                         image='busybox:%s' % tag,
                         # pass in init_container list
                         init_containers=[dsl.UserContainer('print', 'busybox:latest', command='echo "hello"')],
                         # pass in sidecars list
                         sidecars=[dsl.Sidecar('print', 'busybox:latest', command='echo "hello"')],
                         # pass in k8s container kwargs
                         container_kwargs={'env': [V1EnvVar('foo', 'bar')]},
                         )
    # set `imagePullPolicy` property for `container` with `PipelineParam`
    op.container.set_image_pull_policy(pull_image_policy)
    # add sidecar with parameterized image tag
    # sidecar follows the argo sidecar swagger spec
    op.add_sidecar(dsl.Sidecar('redis', 'redis:%s' % tag).set_image_pull_policy('Always'))

Methods:

add_affinity(affinity: kubernetes.client.models.v1_affinity.V1Affinity)
Add K8s Affinity.
Parameters:
affinity – Kubernetes affinity. For the detailed spec, see the affinity definition: https://github.com/kubernetes-client/python/blob/master/kubernetes/client/models/v1_affinity.py
Example:
V1Affinity(
    node_affinity=V1NodeAffinity(
        required_during_scheduling_ignored_during_execution=V1NodeSelector(
            node_selector_terms=[V1NodeSelectorTerm(
                match_expressions=[V1NodeSelectorRequirement(
                    key='beta.kubernetes.io/instance-type',
                    operator='In',
                    values=['p2.xlarge'])])])))


add_init_container(init_container: kfp.dsl._container_op.UserContainer)
Add an init container to the Op.
Parameters:init_container – UserContainer object.


add_node_selector_constraint(label_name: Union[str, kfp.dsl._pipeline_param.PipelineParam], value: Union[str, kfp.dsl._pipeline_param.PipelineParam]) → kfp.dsl._container_op.ContainerOp
Sets accelerator type requirement for this task.
When compiling for v2, this function can be optionally used with set_gpu_limit to set the number of accelerator required. Otherwise, by default the number requested will be 1.
Parameters:
label_name – The name of the constraint label. For v2, only ‘cloud.google.com/gke-accelerator’ is supported now.
value – The name of the accelerator. For v2, available values include ‘nvidia-tesla-k80’, ‘tpu-v3’.
Returns:
self return to allow chained call with other resource specification.


add_pod_annotation(name: str, value: str)
Adds a pod’s metadata annotation.
Parameters:
name – The name of the annotation.
value – The value of the annotation.


add_pod_label(name: str, value: str)
Adds a pod’s metadata label.
Parameters:
name – The name of the label.
value – The value of the label.


add_pvolumes(pvolumes: Dict[str, kubernetes.client.models.v1_volume.V1Volume] = None)
Updates the existing pvolumes dict, extends volumes and volume_mounts and redefines the pvolume attribute.
Parameters:pvolumes – Dictionary. Keys are mount paths, values are Kubernetes volumes or inherited types (e.g. PipelineVolumes).


add_sidecar(sidecar: kfp.dsl._container_op.Sidecar)
Add a sidecar to the Op.
Parameters:sidecar – Sidecar object.


add_toleration(tolerations: kubernetes.client.models.v1_toleration.V1Toleration)
Add K8s tolerations.
Parameters:tolerations – Kubernetes toleration. For the detailed spec, see the toleration definition: https://github.com/kubernetes-client/python/blob/master/kubernetes/client/models/v1_toleration.py


add_volume(volume)
Add K8s volume to the container.
Parameters:
volume – Kubernetes volumes. For the detailed spec, see the volume definition: https://github.com/kubernetes-client/python/blob/master/kubernetes/client/models/v1_volume.py


after(*ops)
Specify explicit dependency on other ops.


apply(mod_func)
Applies a modifier function to self.
The function should return the passed object. This is needed to chain "extension methods" to this class.
Example:
from kfp.gcp import use_gcp_secret

task = (
    train_op(...)
    .set_memory_request('1G')
    .apply(use_gcp_secret('user-gcp-sa'))
    .set_memory_limit('2G')
)


arguments


attrs_with_pipelineparams = ['node_selector', 'volumes', 'pod_annotations', 'pod_labels', 'num_retries', 'init_containers', 'sidecars', 'tolerations']


command


container
Container object that represents the container property in io.argoproj.workflow.v1alpha1.Template. Can be used to update the container configurations.
Example:
import kfp.dsl as dsl
from kubernetes.client.models import V1EnvVar

@dsl.pipeline(name='example_pipeline')
def immediate_value_pipeline():
    op1 = (dsl.ContainerOp(name='example', image='nginx:alpine')
           .container
           .add_env_variable(V1EnvVar(name='HOST',
                                      value='foo.bar'))
           .add_env_variable(V1EnvVar(name='PORT', value='80'))
           .parent  # return the parent `ContainerOp`
           )


container_spec


env_variables


image


inputs
List of PipelineParams that will be converted into input parameters (io.argoproj.workflow.v1alpha1.Inputs) for the argo workflow.


is_v2


set_caching_options(enable_caching: bool) → kfp.dsl._container_op.BaseOp
Sets caching options for the Op.
Parameters:enable_caching – Whether or not to enable caching for this task.
Returns:Self return to allow chained setting calls.


set_display_name(name: str)


set_retry(num_retries: int, policy: Optional[str] = None, backoff_duration: Optional[str] = None, backoff_factor: Optional[float] = None, backoff_max_duration: Optional[str] = None)
Sets the number of times the task is retried until it’s declared failed.
Parameters:
num_retries – Number of times to retry on failures.
policy – Retry policy name.
backoff_duration – The time interval between retries. Defaults to an immediate retry. In case you specify a simple number, the unit defaults to seconds. You can also specify a different unit, for instance, 2m (2 minutes), 1h (1 hour).
backoff_factor – The exponential backoff factor applied to backoff_duration. For example, if backoff_duration="60" (60 seconds) and backoff_factor=2, the first retry will happen after 60 seconds, then after 120, 240, and so on.
backoff_max_duration – The maximum interval that can be reached with the backoff strategy.


set_timeout(seconds: int)
Sets the timeout for the task in seconds.
Parameters:seconds – Number of seconds.

kfp.dsl.ExitHandler

source

class kfp.dsl.ExitHandler(exit_op: kfp.dsl._container_op.ContainerOp, name: Optional[str] = None)[source]
Bases: kfp.dsl._ops_group.OpsGroup

Represents an exit handler that is invoked upon exiting a group of ops.

Parameters:exit_op – An operator invoked at exiting a group of ops.

Raises:ValueError – Raised if the exit_op is invalid.

Example

exit_op = ContainerOp(...)
with ExitHandler(exit_op):
    op1 = ContainerOp(...)
    op2 = ContainerOp(...)
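
One way to picture the exit handler is as a `finally` clause around the group of ops. The sketch below is a plain-Python analogy under that assumption; `run_with_exit_handler`, `op1`, `op2`, and `cleanup` are hypothetical, and the real handler is scheduled by the workflow engine rather than by Python control flow.

```python
def run_with_exit_handler(ops, exit_op, log):
    """Run ops in order; always run exit_op afterwards, even on failure."""
    try:
        for op in ops:
            op(log)
    except Exception as error:
        log.append("failed: %s" % error)
    finally:
        exit_op(log)  # runs whether the group succeeded or not


def op1(log):
    log.append("op1")


def op2(log):
    raise RuntimeError("boom")  # simulate a failing step


def cleanup(log):
    log.append("cleanup")


events = []
run_with_exit_handler([op1, op2], cleanup, events)
print(events)
```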

Methods:

after(*ops)
Specify explicit dependency on other ops.


remove_op_recursive(op)

kfp.dsl.InputArgumentPath

source

class kfp.dsl.InputArgumentPath(argument, input=None, path=None)

Bases: object

kfp.dsl.ParallelFor

class kfp.dsl.ParallelFor(loop_args: Union[List[Union[int, float, str, Dict[str, Any]]], kfp.dsl._pipeline_param.PipelineParam], parallelism: int = None)[source]

Bases: kfp.dsl._ops_group.OpsGroup

Represents a parallel for loop over a static set of items.

Example

In this case op1 would be executed twice, once with args=['echo 1'] and once with args=['echo 2']:

with dsl.ParallelFor([{'a': 1, 'b': 10}, {'a': 2, 'b': 20}]) as item:
    op1 = ContainerOp(..., args=['echo {}'.format(item.a)])
    op2 = ContainerOp(..., args=['echo {}'.format(item.b)])
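
To see what each iteration receives, the loop can be unrolled by hand in plain Python. This is only a sequential illustration of the argument expansion; `expand_loop` is a hypothetical helper, and in a real pipeline the iterations are scheduled by the workflow engine and may run in parallel.

```python
from types import SimpleNamespace


def expand_loop(loop_args):
    """Return the args each op would get for every loop item."""
    runs = []
    for raw_item in loop_args:
        item = SimpleNamespace(**raw_item)  # allows item.a / item.b access
        runs.append({
            "op1": ["echo {}".format(item.a)],
            "op2": ["echo {}".format(item.b)],
        })
    return runs


runs = expand_loop([{"a": 1, "b": 10}, {"a": 2, "b": 20}])
for run in runs:
    print(run)
```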

TYPE_NAME = 'for_loop'

after(*ops)

Specify explicit dependency on other ops.

remove_op_recursive(op)

kfp.dsl.PipelineConf

source

class kfp.dsl.PipelineConf
Bases: object

PipelineConf contains pipeline level settings.

add_op_transformer(transformer)[source]

Configures the op_transformers which will be applied to all ops in the pipeline. The ops can be ResourceOp, VolumeOp, or ContainerOp.

Parameters:transformer – A function that takes a kfp Op as input and returns a kfp Op

data_passing_method

set_default_pod_node_selector(label_name: str, value: str)[source]

Add a constraint for nodeSelector for a pipeline.

Each constraint is a key-value pair label.

For the container to be eligible to run on a node, the node must have each of the constraints present as labels.

Parameters:
  • label_name – The name of the constraint label.
  • value – The value of the constraint label.

set_dns_config(dns_config: kubernetes.client.models.v1_pod_dns_config.V1PodDNSConfig)[source]

Set the dnsConfig to be given to each pod.

Parameters:dns_config – Kubernetes V1PodDNSConfig For detailed description, check Kubernetes V1PodDNSConfig definition https://github.com/kubernetes-client/python/blob/master/kubernetes/docs/V1PodDNSConfig.md

Example

import kfp
from kubernetes.client.models import V1PodDNSConfig, V1PodDNSConfigOption

pipeline_conf = kfp.dsl.PipelineConf()
pipeline_conf.set_dns_config(dns_config=V1PodDNSConfig(
    nameservers=["1.2.3.4"],
    options=[V1PodDNSConfigOption(name="ndots", value="2")],
))

set_image_pull_policy(policy: str)[source]

Configures the default image pull policy.

Parameters:policy – the pull policy, has to be one of: Always, Never, IfNotPresent. For more info: https://github.com/kubernetes-client/python/blob/10a7f95435c0b94a6d949ba98375f8cc85a70e5a/kubernetes/docs/V1Container.md

set_image_pull_secrets(image_pull_secrets)[source]

Configures the pipeline level imagepullsecret.

Parameters:image_pull_secrets – a list of Kubernetes V1LocalObjectReference For detailed description, check Kubernetes V1LocalObjectReference definition https://github.com/kubernetes-client/python/blob/master/kubernetes/docs/V1LocalObjectReference.md

set_parallelism(max_num_pods: int)[source]

Configures the max number of total parallel pods that can execute at the same time in a workflow.

Parameters:max_num_pods – max number of total parallel pods.

set_pod_disruption_budget(min_available: Union[int, str])[source]

PodDisruptionBudget holds the number of concurrent disruptions that you allow for pipeline Pods.

Parameters:min_available (Union[int, str]) – An eviction is allowed if at least “minAvailable” pods selected by “selector” will still be available after the eviction, i.e. even in the absence of the evicted pod. So for example you can prevent all voluntary evictions by specifying “100%”. “minAvailable” can be either an absolute number or a percentage.
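
The eviction rule can be worked through with a little arithmetic. The sketch below is a standalone illustration: `required_available` and `eviction_allowed` are hypothetical helpers, and rounding a percentage up to a whole pod is an assumption of this sketch rather than something stated above.

```python
import math
from typing import Union


def required_available(min_available: Union[int, str], total_pods: int) -> int:
    """Translate min_available (count or 'N%') into a pod count."""
    if isinstance(min_available, str) and min_available.endswith("%"):
        percent = int(min_available[:-1])
        # Assumption: a fractional pod count is rounded up.
        return math.ceil(total_pods * percent / 100)
    return int(min_available)


def eviction_allowed(min_available: Union[int, str],
                     total_pods: int,
                     healthy_pods: int) -> bool:
    """An eviction is allowed if enough pods remain after removing one."""
    return healthy_pods - 1 >= required_available(min_available, total_pods)


print(eviction_allowed("100%", total_pods=4, healthy_pods=4))
print(eviction_allowed(2, total_pods=4, healthy_pods=4))
```

With "100%", removing any pod would drop below the requirement, which is why that setting blocks all voluntary evictions.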

set_timeout(seconds: int)[source]

Configures the pipeline level timeout.

Parameters:seconds – number of seconds for timeout

set_ttl_seconds_after_finished(seconds: int)[source]

Configures the ttl after the pipeline has finished.

Parameters:seconds – number of seconds for the workflow to be garbage collected after it is finished.

class kfp.dsl.PipelineExecutionMode[source]

Bases: enum.Enum

An enumeration.

V1_LEGACY = 1

V2_COMPATIBLE = 2

V2_ENGINE = 3

class kfp.dsl.PipelineParam(name: str, op_name: Optional[str] = None, value: Optional[str] = None, param_type: Union[str, Dict[KT, VT], None] = None, pattern: Optional[str] = None)[source]

Bases: object

Representing a future value that is passed between pipeline components.

A PipelineParam object can be used as a pipeline function argument so that it will be a pipeline parameter that shows up in ML Pipelines system UI. It can also represent an intermediate value passed between components.

Parameters:
  • name – name of the pipeline parameter.
  • op_name – the name of the operation that produces the PipelineParam. None means it is not produced by any operator, so if None, either user constructs it directly (for providing an immediate value), or it is a pipeline function argument.
  • value – The actual value of the PipelineParam. If provided, the PipelineParam is “resolved” immediately. For now, we support string only.
  • param_type – the type of the PipelineParam.
  • pattern – the serialized string regex pattern this pipeline parameter created from.

Raises:ValueError – if name or op_name contains invalid characters, or both op_name and value are set.
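
The idea of a "future value" that can be embedded in strings and later recovered with a regex pattern can be illustrated as follows. The placeholder layout used here is an assumption chosen for illustration and may not match the SDK's exact serialized form; `to_placeholder` and `extract_params` are hypothetical helpers.

```python
import re

# Assumed placeholder layout for this sketch only.
PLACEHOLDER_TEMPLATE = "{{pipelineparam:op=%s;name=%s}}"
PLACEHOLDER_PATTERN = re.compile(
    r"\{\{pipelineparam:op=([\w-]*);name=([\w-]+)\}\}")


def to_placeholder(name, op_name=None):
    """Serialize a parameter reference into a string placeholder."""
    return PLACEHOLDER_TEMPLATE % (op_name or "", name)


def extract_params(text):
    """Find every (op_name, name) pair embedded in a string."""
    return PLACEHOLDER_PATTERN.findall(text)


# A pipeline argument has no producing op, so op_name is empty.
image = "busybox:%s" % to_placeholder("tag")
# An intermediate value names the op that produces it.
argument = "echo %s" % to_placeholder("output", op_name="train")

print(image)
print(extract_params(image), extract_params(argument))
```

This is why an expression such as 'busybox:%s' % tag works at pipeline definition time: the string carries a placeholder that is substituted with the real value only when the workflow runs.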

full_name

Unique name in the argo yaml for the PipelineParam.

ignore_type()[source]

ignore_type ignores the type information such that type checking would also pass.

to_struct()[source]

class kfp.dsl.PipelineVolume(pvc: str = None, volume: kubernetes.client.models.v1_volume.V1Volume = None, **kwargs)[source]

Bases: kubernetes.client.models.v1_volume.V1Volume

Representing a volume that is passed between pipeline operators and is to be mounted by a ContainerOp or its inherited type.

A PipelineVolume object can be used as an extension of the pipeline function's filesystem. It may then be passed between ContainerOps, exposing dependencies.

TODO(https://github.com/kubeflow/pipelines/issues/4822): Determine the stability level of this feature.

Parameters:
  • pvc – The name of an existing PVC
  • volume – Create a deep copy out of a V1Volume or PipelineVolume with no deps
Raises:ValueError –
  • If volume is not None and kwargs is not None
  • If pvc is not None and kwargs.pop("name") is not None

after(*ops)[source]

Creates a duplicate of self with the required dependencies, excluding the redundant dependencies.

Parameters:*ops – Pipeline operators to add as dependencies

attribute_map = {'aws_elastic_block_store': 'awsElasticBlockStore', 'azure_disk': 'azureDisk', 'azure_file': 'azureFile', 'cephfs': 'cephfs', 'cinder': 'cinder', 'config_map': 'configMap', 'csi': 'csi', 'downward_api': 'downwardAPI', 'empty_dir': 'emptyDir', 'fc': 'fc', 'flex_volume': 'flexVolume', 'flocker': 'flocker', 'gce_persistent_disk': 'gcePersistentDisk', 'git_repo': 'gitRepo', 'glusterfs': 'glusterfs', 'host_path': 'hostPath', 'iscsi': 'iscsi', 'name': 'name', 'nfs': 'nfs', 'persistent_volume_claim': 'persistentVolumeClaim', 'photon_persistent_disk': 'photonPersistentDisk', 'portworx_volume': 'portworxVolume', 'projected': 'projected', 'quobyte': 'quobyte', 'rbd': 'rbd', 'scale_io': 'scaleIO', 'secret': 'secret', 'storageos': 'storageos', 'vsphere_volume': 'vsphereVolume'}

aws_elastic_block_store

Gets the aws_elastic_block_store of this V1Volume.
Returns:The aws_elastic_block_store of this V1Volume.
Return type:V1AWSElasticBlockStoreVolumeSource

azure_disk

Gets the azure_disk of this V1Volume.
Returns:The azure_disk of this V1Volume.
Return type:V1AzureDiskVolumeSource

azure_file

Gets the azure_file of this V1Volume.
Returns:The azure_file of this V1Volume.
Return type:V1AzureFileVolumeSource

cephfs

Gets the cephfs of this V1Volume.
Returns:The cephfs of this V1Volume.
Return type:V1CephFSVolumeSource

cinder

Gets the cinder of this V1Volume.
Returns:The cinder of this V1Volume.
Return type:V1CinderVolumeSource

config_map

Gets the config_map of this V1Volume.
Returns:The config_map of this V1Volume.
Return type:V1ConfigMapVolumeSource

csi

Gets the csi of this V1Volume.
Returns:The csi of this V1Volume.
Return type:V1CSIVolumeSource

downward_api

Gets the downward_api of this V1Volume.
Returns:The downward_api of this V1Volume.
Return type:V1DownwardAPIVolumeSource

empty_dir

Gets the empty_dir of this V1Volume.
Returns:The empty_dir of this V1Volume.
Return type:V1EmptyDirVolumeSource

fc

Gets the fc of this V1Volume.
Returns:The fc of this V1Volume.
Return type:V1FCVolumeSource

flex_volume

Gets the flex_volume of this V1Volume.
Returns:The flex_volume of this V1Volume.
Return type:V1FlexVolumeSource

flocker

Gets the flocker of this V1Volume.
Returns:The flocker of this V1Volume.
Return type:V1FlockerVolumeSource

gce_persistent_disk

Gets the gce_persistent_disk of this V1Volume.
Returns:The gce_persistent_disk of this V1Volume.
Return type:V1GCEPersistentDiskVolumeSource

git_repo

Gets the git_repo of this V1Volume.
Returns:The git_repo of this V1Volume.
Return type:V1GitRepoVolumeSource

glusterfs

Gets the glusterfs of this V1Volume.
Returns:The glusterfs of this V1Volume.
Return type:V1GlusterfsVolumeSource

host_path

Gets the host_path of this V1Volume.
Returns:The host_path of this V1Volume.
Return type:V1HostPathVolumeSource

iscsi

Gets the iscsi of this V1Volume.
Returns:The iscsi of this V1Volume.
Return type:V1ISCSIVolumeSource

name

Gets the name of this V1Volume.
Volume's name. Must be a DNS_LABEL and unique within the pod. More info: https://kubernetes.io/docs/concepts/overview/working-with-objects/names/#names
Returns:The name of this V1Volume.
Return type:str

nfs

Gets the nfs of this V1Volume.
Returns:The nfs of this V1Volume.
Return type:V1NFSVolumeSource

openapi_types = {'aws_elastic_block_store': 'V1AWSElasticBlockStoreVolumeSource', 'azure_disk': 'V1AzureDiskVolumeSource', 'azure_file': 'V1AzureFileVolumeSource', 'cephfs': 'V1CephFSVolumeSource', 'cinder': 'V1CinderVolumeSource', 'config_map': 'V1ConfigMapVolumeSource', 'csi': 'V1CSIVolumeSource', 'downward_api': 'V1DownwardAPIVolumeSource', 'empty_dir': 'V1EmptyDirVolumeSource', 'fc': 'V1FCVolumeSource', 'flex_volume': 'V1FlexVolumeSource', 'flocker': 'V1FlockerVolumeSource', 'gce_persistent_disk': 'V1GCEPersistentDiskVolumeSource', 'git_repo': 'V1GitRepoVolumeSource', 'glusterfs': 'V1GlusterfsVolumeSource', 'host_path': 'V1HostPathVolumeSource', 'iscsi': 'V1ISCSIVolumeSource', 'name': 'str', 'nfs': 'V1NFSVolumeSource', 'persistent_volume_claim': 'V1PersistentVolumeClaimVolumeSource', 'photon_persistent_disk': 'V1PhotonPersistentDiskVolumeSource', 'portworx_volume': 'V1PortworxVolumeSource', 'projected': 'V1ProjectedVolumeSource', 'quobyte': 'V1QuobyteVolumeSource', 'rbd': 'V1RBDVolumeSource', 'scale_io': 'V1ScaleIOVolumeSource', 'secret': 'V1SecretVolumeSource', 'storageos': 'V1StorageOSVolumeSource', 'vsphere_volume': 'V1VsphereVirtualDiskVolumeSource'}

persistent_volume_claim

Gets the persistent_volume_claim of this V1Volume.
Returns:The persistent_volume_claim of this V1Volume.
Return type:V1PersistentVolumeClaimVolumeSource

photon_persistent_disk

Gets the photon_persistent_disk of this V1Volume.
Returns:The photon_persistent_disk of this V1Volume.
Return type:V1PhotonPersistentDiskVolumeSource

portworx_volume

Gets the portworx_volume of this V1Volume.
Returns:The portworx_volume of this V1Volume.
Return type:V1PortworxVolumeSource

projected

Gets the projected of this V1Volume.
Returns:The projected of this V1Volume.
Return type:V1ProjectedVolumeSource

quobyte

Gets the quobyte of this V1Volume.
Returns:The quobyte of this V1Volume.
Return type:V1QuobyteVolumeSource

rbd

Gets the rbd of this V1Volume.
Returns:The rbd of this V1Volume.
Return type:V1RBDVolumeSource

scale_io

Gets the scale_io of this V1Volume.
Returns:The scale_io of this V1Volume.
Return type:V1ScaleIOVolumeSource

secret

Gets the secret of this V1Volume.
Returns:The secret of this V1Volume.
Return type:V1SecretVolumeSource

storageos

Gets the storageos of this V1Volume.
Returns:The storageos of this V1Volume.
Return type:V1StorageOSVolumeSource

to_dict()

Returns the model properties as a dict

to_str()

Returns the string representation of the model

vsphere_volume

Gets the vsphere_volume of this V1Volume.
Returns:The vsphere_volume of this V1Volume.
Return type:V1VsphereVirtualDiskVolumeSource

class kfp.dsl.ResourceOp(k8s_resource=None, action: str = 'create', merge_strategy: str = None, success_condition: str = None, failure_condition: str = None, set_owner_reference: bool = None, attribute_outputs: Optional[Dict[str, str]] = None, flags: Optional[List[str]] = None, **kwargs)[source]

Bases: kfp.dsl._container_op.BaseOp

Represents an op which will be translated into a resource template.

TODO(https://github.com/kubeflow/pipelines/issues/4822): Determine the stability level of this feature.

Parameters:
  • k8s_resource – A k8s resource which will be submitted to the cluster
  • action – One of "create"/"delete"/"apply"/"patch" (default is "create")
  • merge_strategy – The merge strategy for the “apply” action
  • success_condition – The successCondition of the template
  • failure_condition – The failureCondition of the template For more info see: https://github.com/argoproj/argo-workflows/blob/master/examples/k8s-jobs.yaml
  • attribute_outputs – Maps output labels to resource’s json paths, similarly to file_outputs of ContainerOp
  • kwargs – name, sidecars. See BaseOp definition
Raises:ValueError –
  • if not inside a pipeline
  • if the name is an invalid string
  • if no k8s_resource is provided
  • if merge_strategy is set without "apply" action

add_affinity(affinity: kubernetes.client.models.v1_affinity.V1Affinity)

Add K8s Affinity.

Parameters:
  • affinity – Kubernetes affinity. For detailed spec, check the affinity definition: https://github.com/kubernetes-client/python/blob/master/kubernetes/client/models/v1_affinity.py

Example:

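from kubernetes.client.models import (
    V1Affinity, V1NodeAffinity, V1NodeSelector,
    V1NodeSelectorRequirement, V1NodeSelectorTerm)

# Require nodes whose instance-type label is 'p2.xlarge'.
V1Affinity(
    node_affinity=V1NodeAffinity(
        required_during_scheduling_ignored_during_execution=V1NodeSelector(
            node_selector_terms=[V1NodeSelectorTerm(
                match_expressions=[V1NodeSelectorRequirement(
                    key='beta.kubernetes.io/instance-type',
                    operator='In',
                    values=['p2.xlarge'])])])))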

add_init_container(init_container: kfp.dsl._container_op.UserContainer)

Add an init container to the Op.

Parameters:init_container – UserContainer object.

add_node_selector_constraint(label_name: Union[str, kfp.dsl._pipeline_param.PipelineParam], value: Union[str, kfp.dsl._pipeline_param.PipelineParam])

Add a constraint for nodeSelector.

Each constraint is a key-value label pair. For the container to be eligible to run on a node, the node must carry every constraint as a label.

Parameters:
  • label_name (Union[str, PipelineParam]) – The name of the constraint label.
  • value (Union[str, PipelineParam]) – The value of the constraint label.
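
The eligibility rule described above amounts to a subset check on labels. The following is a minimal sketch in plain Python. `node_is_eligible` and the label names are hypothetical and are not part of kfp or Kubernetes, where the scheduler performs the real matching.

```python
from typing import Dict


def node_is_eligible(constraints: Dict[str, str],
                     node_labels: Dict[str, str]) -> bool:
    """Return True when every constraint appears as an identical node label."""
    return all(node_labels.get(name) == value
               for name, value in constraints.items())


# Illustrative labels only.
constraints = {"accelerator": "gpu", "disk": "ssd"}
gpu_node = {"accelerator": "gpu", "disk": "ssd", "zone": "a"}
cpu_node = {"disk": "ssd", "zone": "a"}

print(node_is_eligible(constraints, gpu_node))  # True
print(node_is_eligible(constraints, cpu_node))  # False
```

Extra labels on the node do not matter. A single missing or mismatched label makes the node ineligible.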

add_pod_annotation(name: str, value: str)

Adds a pod’s metadata annotation.

Parameters:
  • name – The name of the annotation.
  • value – The value of the annotation.

add_pod_label(name: str, value: str)

Adds a pod’s metadata label.

Parameters:
  • name – The name of the label.
  • value – The value of the label.

add_sidecar(sidecar: kfp.dsl._container_op.Sidecar)

Add a sidecar to the Op.

Parameters:sidecar – Sidecar object.

add_toleration(tolerations: kubernetes.client.models.v1_toleration.V1Toleration)

Add K8s tolerations.

Parameters:tolerations – Kubernetes toleration. For detailed spec, check the toleration definition: https://github.com/kubernetes-client/python/blob/master/kubernetes/client/models/v1_toleration.py

add_volume(volume)

Add K8s volume to the container.

Parameters:
  • volume – Kubernetes volume. For detailed spec, check the volume definition: https://github.com/kubernetes-client/python/blob/master/kubernetes/client/models/v1_volume.py

after(*ops)

Specify explicit dependency on other ops.

apply(mod_func)

Applies a modifier function to self.

The function should return the passed object. This is needed to chain "extension methods" to this class.

Example:

from kfp.gcp import use_gcp_secret

# train_op stands for any component factory defined elsewhere.
task = (
    train_op(...)
    .set_memory_request('1G')
    .apply(use_gcp_secret('user-gcp-sa'))
    .set_memory_limit('2G')
)
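
To make the "return the passed object" requirement concrete, here is a minimal sketch of the chaining pattern using a stand-in class. `FakeOp` and `add_team_label` are hypothetical names, not kfp APIs. The `or self` fallback in `apply` is an assumption of this sketch, not a statement about the SDK's implementation.

```python
class FakeOp:
    """Stand-in for an op; not a kfp class."""

    def __init__(self):
        self.settings = {}

    def set_memory_request(self, memory):
        self.settings["memory_request"] = memory
        return self  # returning self is what makes chaining possible

    def apply(self, mod_func):
        # Assumption: fall back to self if the modifier returns nothing.
        return mod_func(self) or self


def add_team_label(team):
    """Hypothetical modifier factory, shaped like use_gcp_secret(...)."""
    def _modifier(op):
        op.settings["team"] = team
        return op  # returning the op keeps the chain going
    return _modifier


task = FakeOp().set_memory_request("1G").apply(add_team_label("ml"))
print(task.settings)  # {'memory_request': '1G', 'team': 'ml'}
```

A modifier written this way can be reused across many ops, which is the main reason to prefer `apply` over repeating the same setter calls.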

attrs_with_pipelineparams = ['node_selector', 'volumes', 'pod_annotations', 'pod_labels', 'num_retries', 'init_containers', 'sidecars', 'tolerations']

delete(flags: Optional[List[str]] = None)[source]

Returns a ResourceOp which deletes the resource.

inputs

List of PipelineParams that will be converted into input parameters (io.argoproj.workflow.v1alpha1.Inputs) for the argo workflow.

resource

Resource object that represents the resource property in io.argoproj.workflow.v1alpha1.Template.

set_caching_options(enable_caching: bool) → kfp.dsl._container_op.BaseOp

Sets caching options for the Op.

Parameters:enable_caching – Whether or not to enable caching for this task.
Returns:Self return to allow chained setting calls.

set_display_name(name: str)

set_retry(num_retries: int, policy: Optional[str] = None, backoff_duration: Optional[str] = None, backoff_factor: Optional[float] = None, backoff_max_duration: Optional[str] = None)

Sets the number of times the task is retried until it’s declared failed.

Parameters:
  • num_retries – Number of times to retry on failures.
  • policy – Retry policy name.
  • backoff_duration – The time interval between retries. Defaults to an immediate retry. In case you specify a simple number, the unit defaults to seconds. You can also specify a different unit, for instance, 2m (2 minutes), 1h (1 hour).
  • backoff_factor – The exponential backoff factor applied to backoff_duration. For example, if backoff_duration="60" (60 seconds) and backoff_factor=2, the first retry will happen after 60 seconds, then after 120, 240, and so on.
  • backoff_max_duration – The maximum interval that can be reached with the backoff strategy.
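
The backoff arithmetic described above can be checked with a short sketch. `parse_duration` and `retry_delays` are hypothetical helpers, not part of kfp. The sketch takes the parameter descriptions at face value and, in particular, treats backoff_max_duration as a cap on each interval. The workflow engine's exact behaviour may differ.

```python
from typing import List, Optional

# Units mentioned in the parameter description, plus explicit seconds.
_UNIT_SECONDS = {"s": 1, "m": 60, "h": 3600}


def parse_duration(text: str) -> int:
    """Convert '60', '2m' or '1h' to seconds; a bare number means seconds."""
    text = text.strip()
    if text[-1] in _UNIT_SECONDS:
        return int(text[:-1]) * _UNIT_SECONDS[text[-1]]
    return int(text)


def retry_delays(num_retries: int, backoff_duration: str,
                 backoff_factor: float = 1.0,
                 backoff_max_duration: Optional[str] = None) -> List[float]:
    """Seconds waited before each retry, under this sketch's assumptions."""
    delay = float(parse_duration(backoff_duration))
    cap = (float(parse_duration(backoff_max_duration))
           if backoff_max_duration else None)
    delays = []
    for _ in range(num_retries):
        delays.append(min(delay, cap) if cap is not None else delay)
        delay *= backoff_factor
    return delays


print(retry_delays(4, "60", backoff_factor=2))
# [60.0, 120.0, 240.0, 480.0]
print(retry_delays(4, "1m", backoff_factor=2, backoff_max_duration="3m"))
# [60.0, 120.0, 180.0, 180.0]
```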

set_timeout(seconds: int)

Sets the timeout for the task in seconds.

Parameters:seconds – Number of seconds.

kfp.dsl.Sidecar

source

Inheritance: kfp.dsl.Sidecar --> kfp.dsl.UserContainer --> kfp.dsl.Container --> kubernetes.client.models.V1Container

class kfp.dsl.Sidecar(name: str, image: str, command: Union[str, List[str]] = None, args: Union[str, List[str]] = None, mirror_volume_mounts: bool = None, **kwargs)

Bases: kfp.dsl._container_op.UserContainer

Creates a new instance of Sidecar.

Parameters:
name – unique name for the sidecar container
image – image to use for the sidecar container, e.g. redis:alpine
command – entrypoint array. Not executed within a shell.
args – arguments to the entrypoint.
mirror_volume_mounts – MirrorVolumeMounts will mount the same volumes specified in the main container to the sidecar (including artifacts), at the same mountPaths. This enables dind daemon to partially see the same filesystem as the main container in order to use features such as docker volume binding
**kwargs – keyword arguments available for Container

method:

add_env_from(env_from) → kfp.dsl._container_op.Container
Add a source to populate environment variables in the container.

Parameters:
env_from – Kubernetes environment from source. For detailed spec, check the environment from source definition: https://github.com/kubernetes-client/python/blob/master/kubernetes/client/models/v1_env_var_source.py


add_env_variable(env_variable) → kfp.dsl._container_op.Container
Add environment variable to the container.

Parameters:
env_variable – Kubernetes environment variable. For detailed spec, check the environment variable definition: https://github.com/kubernetes-client/python/blob/master/kubernetes/client/models/v1_env_var.py


add_port(container_port) → kfp.dsl._container_op.Container
Add a container port to the container.

Parameters:
container_port – Kubernetes container port. For detailed spec, check the container port definition: https://github.com/kubernetes-client/python/blob/master/kubernetes/client/models/v1_container_port.py


add_resource_limit(resource_name, value) → kfp.dsl._container_op.Container
Add the resource limit of the container.

Parameters:
resource_name – The name of the resource. It can be cpu, memory, etc.
value – The string value of the limit.


add_resource_request(resource_name, value) → kfp.dsl._container_op.Container
Add the resource request of the container.

Parameters:
resource_name – The name of the resource. It can be cpu, memory, etc.
value – The string value of the request.


add_volume_devices(volume_device) → kfp.dsl._container_op.Container
Add a block device to be used by the container.

Parameters:
volume_device – Kubernetes volume device. For detailed spec, check the volume device definition: https://github.com/kubernetes-client/python/blob/master/kubernetes/client/models/v1_volume_device.py


add_volume_mount(volume_mount) → kfp.dsl._container_op.Container
Add volume to the container.

Parameters:
volume_mount – Kubernetes volume mount. For detailed spec, check the volume mount definition: https://github.com/kubernetes-client/python/blob/master/kubernetes/client/models/v1_volume_mount.py


args
Gets the args of this V1Container.
Arguments to the entrypoint. The docker image’s CMD is used if this is not provided. Variable references $(VAR_NAME) are expanded using the container’s environment. If a variable cannot be resolved, the reference in the input string will be unchanged. The $(VAR_NAME) syntax can be escaped with a double $$, i.e. $$(VAR_NAME). Escaped references will never be expanded, regardless of whether the variable exists or not. Cannot be updated. More info: https://kubernetes.io/docs/tasks/inject-data-application/define-command-argument-container/#running-a-command-in-a-shell

Returns: The args of this V1Container.
Return type: list[str]
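
The $(VAR_NAME) rules above (expand when defined, leave unchanged when unresolved, and treat $$(VAR_NAME) as an escape) can be illustrated with a simplified model. `expand_args` is a hypothetical helper written for this page. The real expansion is performed by Kubernetes and covers more edge cases than this regex does.

```python
import re
from typing import Dict, List

# Matches an escaped reference $$(NAME) or a plain reference $(NAME).
_REFERENCE = re.compile(r"\$\$\((\w+)\)|\$\((\w+)\)")


def expand_args(args: List[str], env: Dict[str, str]) -> List[str]:
    """Simplified model of $(VAR_NAME) expansion in command/args."""
    def _replace(match):
        escaped, plain = match.group(1), match.group(2)
        if escaped is not None:
            return f"$({escaped})"   # $$(NAME) becomes a literal $(NAME)
        if plain in env:
            return env[plain]        # resolved reference
        return match.group(0)        # unresolved: left unchanged
    return [_REFERENCE.sub(_replace, arg) for arg in args]


env = {"MODEL_DIR": "/models"}
print(expand_args(["--out=$(MODEL_DIR)", "$(MISSING)", "$$(MODEL_DIR)"], env))
# ['--out=/models', '$(MISSING)', '$(MODEL_DIR)']
```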

attribute_map = {'args': 'args', 'command': 'command', 'env': 'env', 'env_from': 'envFrom', 'image': 'image', 'image_pull_policy': 'imagePullPolicy', 'lifecycle': 'lifecycle', 'liveness_probe': 'livenessProbe', 'mirror_volume_mounts': 'mirrorVolumeMounts', 'name': 'name', 'ports': 'ports', 'readiness_probe': 'readinessProbe', 'resources': 'resources', 'security_context': 'securityContext', 'startup_probe': 'startupProbe', 'stdin': 'stdin', 'stdin_once': 'stdinOnce', 'termination_message_path': 'terminationMessagePath', 'termination_message_policy': 'terminationMessagePolicy', 'tty': 'tty', 'volume_devices': 'volumeDevices', 'volume_mounts': 'volumeMounts', 'working_dir': 'workingDir'}


command
Gets the command of this V1Container.
Entrypoint array. Not executed within a shell. The docker image’s ENTRYPOINT is used if this is not provided. Variable references $(VAR_NAME) are expanded using the container’s environment. If a variable cannot be resolved, the reference in the input string will be unchanged. The $(VAR_NAME) syntax can be escaped with a double $$, i.e. $$(VAR_NAME). Escaped references will never be expanded, regardless of whether the variable exists or not. Cannot be updated. More info: https://kubernetes.io/docs/tasks/inject-data-application/define-command-argument-container/#running-a-command-in-a-shell

Returns: The command of this V1Container.
Return type: list[str]


env
Gets the env of this V1Container.
List of environment variables to set in the container. Cannot be updated.

Returns: The env of this V1Container.
Return type: list[V1EnvVar]


env_from
Gets the env_from of this V1Container.
List of sources to populate environment variables in the container. The keys defined within a source must be a C_IDENTIFIER. All invalid keys will be reported as an event when the container is starting. When a key exists in multiple sources, the value associated with the last source will take precedence. Values defined by an Env with a duplicate key will take precedence. Cannot be updated.

Returns: The env_from of this V1Container.
Return type: list[V1EnvFromSource]


get_resource_limit(resource_name: str) → Optional[str]
Get the resource limit of the container.
Parameters:resource_name – The name of the resource. It can be cpu, memory, etc.


get_resource_request(resource_name: str) → Optional[str]
Get the resource request of the container.
Parameters: resource_name – The name of the resource. It can be cpu, memory, etc.


image
Gets the image of this V1Container.
Docker image name. More info: https://kubernetes.io/docs/concepts/containers/images This field is optional to allow higher level config management to default or override container images in workload controllers like Deployments and StatefulSets.

Returns: The image of this V1Container.
Return type: str


image_pull_policy
Gets the image_pull_policy of this V1Container.
Image pull policy. One of Always, Never, IfNotPresent. Defaults to Always if :latest tag is specified, or IfNotPresent otherwise. Cannot be updated. More info: https://kubernetes.io/docs/concepts/containers/images#updating-images

Returns: The image_pull_policy of this V1Container.
Return type: str
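
The defaulting rule stated above can be written out as a small function. `default_image_pull_policy` is a hypothetical helper that follows the sentence above literally. It is a sketch for reasoning about the rule, not the logic Kubernetes runs, and it does not model untagged images or digests specially.

```python
def default_image_pull_policy(image: str) -> str:
    """Apply the documented default: Always for :latest, else IfNotPresent."""
    # Only inspect the last path segment so a registry port such as
    # localhost:5000 is not mistaken for a tag.
    last_segment = image.rsplit("/", 1)[-1]
    _, _, tag = last_segment.partition(":")
    return "Always" if tag == "latest" else "IfNotPresent"


print(default_image_pull_policy("busybox:latest"))          # Always
print(default_image_pull_policy("redis:alpine"))            # IfNotPresent
print(default_image_pull_policy("localhost:5000/app:1.2"))  # IfNotPresent
```

Setting the policy explicitly with set_image_pull_policy avoids depending on the default at all.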


inputs
A list of PipelineParam found in the UserContainer object.


lifecycle
Gets the lifecycle of this V1Container.

Returns: The lifecycle of this V1Container.
Return type: V1Lifecycle


liveness_probe
Gets the liveness_probe of this V1Container.

Returns: The liveness_probe of this V1Container.
Return type: V1Probe


name
Gets the name of this V1Container.
Name of the container specified as a DNS_LABEL. Each container in a pod must have a unique name (DNS_LABEL). Cannot be updated.

Returns: The name of this V1Container.
Return type: str


openapi_types = {'args': 'list[str]', 'command': 'list[str]', 'env': 'list[V1EnvVar]', 'env_from': 'list[V1EnvFromSource]', 'image': 'str', 'image_pull_policy': 'str', 'lifecycle': 'V1Lifecycle', 'liveness_probe': 'V1Probe', 'mirror_volume_mounts': 'bool', 'name': 'str', 'ports': 'list[V1ContainerPort]', 'readiness_probe': 'V1Probe', 'resources': 'V1ResourceRequirements', 'security_context': 'V1SecurityContext', 'startup_probe': 'V1Probe', 'stdin': 'bool', 'stdin_once': 'bool', 'termination_message_path': 'str', 'termination_message_policy': 'str', 'tty': 'bool', 'volume_devices': 'list[V1VolumeDevice]', 'volume_mounts': 'list[V1VolumeMount]', 'working_dir': 'str'}


ports
Gets the ports of this V1Container.
List of ports to expose from the container. Exposing a port here gives the system additional information about the network connections a container uses, but is primarily informational. Not specifying a port here DOES NOT prevent that port from being exposed. Any port which is listening on the default "0.0.0.0" address inside a container will be accessible from the network. Cannot be updated.

Returns: The ports of this V1Container.
Return type: list[V1ContainerPort]


readiness_probe
Gets the readiness_probe of this V1Container.

Returns: The readiness_probe of this V1Container.
Return type: V1Probe


resources
Gets the resources of this V1Container.

Returns: The resources of this V1Container.
Return type: V1ResourceRequirements


security_context
Gets the security_context of this V1Container.

Returns: The security_context of this V1Container.
Return type: V1SecurityContext


set_cpu_limit(cpu: Union[str, kfp.dsl._pipeline_param.PipelineParam]) → kfp.dsl._container_op.Container
Set cpu limit (maximum) for this operator.
Parameters:cpu (Union[str, PipelineParam]) – A string which can be a number or a number followed by “m”, which means 1/1000.


set_cpu_request(cpu: Union[str, kfp.dsl._pipeline_param.PipelineParam]) → kfp.dsl._container_op.Container
Set cpu request (minimum) for this operator.
Parameters:cpu (Union[str, PipelineParam]) – A string which can be a number or a number followed by “m”, which means 1/1000.
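
As a quick illustration of the CPU string format, the sketch below converts a CPU value to a number of cores. `cpu_to_cores` is a hypothetical helper, not a kfp function. It assumes the input is already a plain string rather than a PipelineParam.

```python
def cpu_to_cores(cpu: str) -> float:
    """Convert '2', '0.5' or '500m' to a number of cores."""
    if cpu.endswith("m"):
        # The "m" suffix means thousandths of a core.
        return int(cpu[:-1]) / 1000
    return float(cpu)


print(cpu_to_cores("500m"))  # 0.5
print(cpu_to_cores("2"))     # 2.0
```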


set_env_variable(name: str, value: str) → kfp.dsl._container_op.Container
Sets environment variable to the container (v2 only).

Parameters:
name – The name of the environment variable.
value – The value of the environment variable.


set_ephemeral_storage_limit(size) → kfp.dsl._container_op.Container
Set ephemeral-storage limit (maximum) for this operator.

Parameters:size – a string which can be a number or a number followed by one of “E”, “P”, “T”, “G”, “M”, “K”.


set_ephemeral_storage_request(size) → kfp.dsl._container_op.Container
Set ephemeral-storage request (minimum) for this operator.

Parameters:size – a string which can be a number or a number followed by one of “E”, “P”, “T”, “G”, “M”, “K”.


set_gpu_limit(gpu: Union[str, kfp.dsl._pipeline_param.PipelineParam], vendor: Union[str, kfp.dsl._pipeline_param.PipelineParam] = 'nvidia') → kfp.dsl._container_op.Container
Set gpu limit for the operator.
This function adds '<vendor>.com/gpu' to the resource limits. Note that there is no need to add a GPU request. GPUs are only supposed to be specified in the limits section. See https://kubernetes.io/docs/tasks/manage-gpus/scheduling-gpus/.

Parameters:
gpu (Union[str, PipelineParam]) – A string which must be a positive number.
vendor (Union[str, PipelineParam]) – Optional. A string which is the vendor of the requested gpu. The supported values are: ‘nvidia’ (default), and ‘amd’. The value is ignored in v2.
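
The resource key that results from the '<vendor>.com/gpu' pattern can be sketched as follows. `gpu_limit_entry` is a hypothetical helper that only builds the limits entry. The validation it performs is an assumption based on the parameter descriptions above, not a copy of the SDK's checks.

```python
from typing import Dict


def gpu_limit_entry(gpu: str, vendor: str = "nvidia") -> Dict[str, str]:
    """Build the limits entry '<vendor>.com/gpu': '<count>'."""
    if vendor not in ("nvidia", "amd"):
        raise ValueError("vendor must be 'nvidia' or 'amd'")
    if not gpu.isdigit() or int(gpu) <= 0:
        raise ValueError("gpu must be a positive number given as a string")
    return {f"{vendor}.com/gpu": gpu}


print(gpu_limit_entry("1"))         # {'nvidia.com/gpu': '1'}
print(gpu_limit_entry("2", "amd"))  # {'amd.com/gpu': '2'}
```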


set_image_pull_policy(image_pull_policy) → kfp.dsl._container_op.Container
Set image pull policy for the container.
Parameters:image_pull_policy – One of Always, Never, IfNotPresent.


set_lifecycle(lifecycle) → kfp.dsl._container_op.Container
Setup a lifecycle config for the container.

Parameters:
lifecycle – Kubernetes lifecycle. For detailed spec, check the lifecycle definition: https://github.com/kubernetes-client/python/blob/master/kubernetes/client/models/v1_lifecycle.py


set_liveness_probe(liveness_probe) → kfp.dsl._container_op.Container
Set a liveness probe for the container.

Parameters:
liveness_probe – Kubernetes liveness probe. For detailed spec, check the probe definition: https://github.com/kubernetes-client/python/blob/master/kubernetes/client/models/v1_probe.py


set_memory_limit(memory: Union[str, kfp.dsl._pipeline_param.PipelineParam]) → kfp.dsl._container_op.Container
Set memory limit (maximum) for this operator.

Parameters: memory (Union[str, PipelineParam]) – a string which can be a number or a number followed by one of “E”, “P”, “T”, “G”, “M”, “K”.


set_memory_request(memory: Union[str, kfp.dsl._pipeline_param.PipelineParam]) → kfp.dsl._container_op.Container
Set memory request (minimum) for this operator.

Parameters: memory (Union[str, PipelineParam]) – a string which can be a number or a number followed by one of “E”, “P”, “T”, “G”, “M”, “K”.
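
The quantity format accepted by the memory setters can be illustrated with a small parser. `memory_to_bytes` is a hypothetical helper. It accepts only the suffixes listed above and assumes they are decimal multiples (K = 10^3, M = 10^6, and so on), which is an assumption of this sketch.

```python
import re

# Decimal multipliers for the suffixes listed in the parameter description.
_DECIMAL_SUFFIXES = {"K": 10**3, "M": 10**6, "G": 10**9,
                     "T": 10**12, "P": 10**15, "E": 10**18}
_QUANTITY = re.compile(r"^(\d+)([EPTGMK]?)$")


def memory_to_bytes(memory: str) -> int:
    """Convert '512', '100M' or '2G' to bytes, rejecting other formats."""
    match = _QUANTITY.match(memory)
    if match is None:
        raise ValueError(f"invalid memory quantity: {memory!r}")
    number, suffix = match.groups()
    return int(number) * _DECIMAL_SUFFIXES.get(suffix, 1)


print(memory_to_bytes("2G"))   # 2000000000
print(memory_to_bytes("512"))  # 512
```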


set_mirror_volume_mounts(mirror_volume_mounts=True)
Setting mirrorVolumeMounts to true will mount the same volumes specified in the main container to the container (including artifacts), at the same mountPaths. This enables dind daemon to partially see the same filesystem as the main container in order to use features such as docker volume binding.

Parameters: mirror_volume_mounts – boolean flag


set_readiness_probe(readiness_probe) → kfp.dsl._container_op.Container
Set a readiness probe for the container.

Parameters:
readiness_probe – Kubernetes readiness probe. For detailed spec, check the probe definition: https://github.com/kubernetes-client/python/blob/master/kubernetes/client/models/v1_probe.py


set_security_context(security_context) → kfp.dsl._container_op.Container
Set security configuration to be applied on the container.

Parameters:
security_context – Kubernetes security context. For detailed spec, check the security context definition: https://github.com/kubernetes-client/python/blob/master/kubernetes/client/models/v1_security_context.py


set_stdin(stdin=True) → kfp.dsl._container_op.Container
Whether this container should allocate a buffer for stdin in the container runtime. If this is not set, reads from stdin in the container will always result in EOF.

Parameters: stdin – boolean flag


set_stdin_once(stdin_once=True) → kfp.dsl._container_op.Container
Whether the container runtime should close the stdin channel after it has been opened by a single attach. When stdin is true the stdin stream will remain open across multiple attach sessions. If stdinOnce is set to true, stdin is opened on container start, is empty until the first client attaches to stdin, and then remains open and accepts data until the client disconnects, at which time stdin is closed and remains closed until the container is restarted. If this flag is false, a container process that reads from stdin will never receive an EOF.

Parameters: stdin_once – boolean flag


set_termination_message_path(termination_message_path) → kfp.dsl._container_op.Container
Path at which the file to which the container’s termination message will be written is mounted into the container’s filesystem. Message written is intended to be brief final status, such as an assertion failure message. Will be truncated by the node if greater than 4096 bytes. The total message length across all containers will be limited to 12kb.

Parameters: termination_message_path – path for the termination message


set_termination_message_policy(termination_message_policy) → kfp.dsl._container_op.Container
Indicate how the termination message should be populated. File will use the contents of terminationMessagePath to populate the container status message on both success and failure. FallbackToLogsOnError will use the last chunk of container log output if the termination message file is empty and the container exited with an error. The log output is limited to 2048 bytes or 80 lines, whichever is smaller.

Parameters: termination_message_policy – File or FallbackToLogsOnError
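
The two policies can be compared with a small model of how the status message is chosen. `termination_message` is a hypothetical function written for illustration. The kubelet implements the real behaviour, and this sketch only follows the description above (file contents by default; the last 80 lines or 2048 bytes of logs, whichever is smaller, as the fallback).

```python
MAX_LOG_BYTES = 2048
MAX_LOG_LINES = 80


def termination_message(policy: str, file_content: str,
                        log_output: str, exit_code: int) -> str:
    """Pick the status message according to the termination message policy."""
    use_logs = (policy == "FallbackToLogsOnError"
                and not file_content
                and exit_code != 0)
    if not use_logs:
        # "File" policy, or the fallback conditions are not met.
        return file_content
    # Keep the last 80 lines, then at most the last 2048 bytes of those.
    tail = "\n".join(log_output.splitlines()[-MAX_LOG_LINES:])
    return tail.encode("utf-8")[-MAX_LOG_BYTES:].decode("utf-8", errors="ignore")


logs = "\n".join(f"line {i}" for i in range(100))
message = termination_message("FallbackToLogsOnError", "", logs, exit_code=1)
print(message.splitlines()[0])  # line 20
```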


set_tty(tty: bool = True) → kfp.dsl._container_op.Container
Whether this container should allocate a TTY for itself, also requires ‘stdin’ to be true.

Parameters: tty – boolean flag


startup_probe
Gets the startup_probe of this V1Container.

Returns: The startup_probe of this V1Container.
Return type: V1Probe


stdin
Gets the stdin of this V1Container.
Whether this container should allocate a buffer for stdin in the container runtime. If this is not set, reads from stdin in the container will always result in EOF. Default is false.

Returns: The stdin of this V1Container.
Return type: bool


stdin_once
Gets the stdin_once of this V1Container.
Whether the container runtime should close the stdin channel after it has been opened by a single attach. When stdin is true the stdin stream will remain open across multiple attach sessions. If stdinOnce is set to true, stdin is opened on container start, is empty until the first client attaches to stdin, and then remains open and accepts data until the client disconnects, at which time stdin is closed and remains closed until the container is restarted. If this flag is false, a container process that reads from stdin will never receive an EOF. Default is false.

Returns: The stdin_once of this V1Container.
Return type: bool


termination_message_path
Gets the termination_message_path of this V1Container.
Optional: Path at which the file to which the container’s termination message will be written is mounted into the container’s filesystem. Message written is intended to be brief final status, such as an assertion failure message. Will be truncated by the node if greater than 4096 bytes. The total message length across all containers will be limited to 12kb. Defaults to /dev/termination-log. Cannot be updated.

Returns: The termination_message_path of this V1Container.
Return type: str


termination_message_policy
Gets the termination_message_policy of this V1Container.
Indicate how the termination message should be populated. File will use the contents of terminationMessagePath to populate the container status message on both success and failure. FallbackToLogsOnError will use the last chunk of container log output if the termination message file is empty and the container exited with an error. The log output is limited to 2048 bytes or 80 lines, whichever is smaller. Defaults to File. Cannot be updated.

Returns: The termination_message_policy of this V1Container.
Return type: str


to_dict()
Returns the model properties as a dict

to_str()
Returns the string representation of the model

tty
Gets the tty of this V1Container.
Whether this container should allocate a TTY for itself, also requires 'stdin' to be true. Default is false.

Returns: The tty of this V1Container.
Return type: bool


volume_devices
Gets the volume_devices of this V1Container.
volumeDevices is the list of block devices to be used by the container.

Returns: The volume_devices of this V1Container.
Return type: list[V1VolumeDevice]


volume_mounts
Gets the volume_mounts of this V1Container.
Pod volumes to mount into the container’s filesystem. Cannot be updated.

Returns: The volume_mounts of this V1Container.
Return type: list[V1VolumeMount]


working_dir
Gets the working_dir of this V1Container.
Container’s working directory. If not specified, the container runtime’s default will be used, which might be configured in the container image. Cannot be updated.

Returns: The working_dir of this V1Container.
Return type: str

kfp.dsl.SubGraph

class kfp.dsl.SubGraph(parallelism: int)[source]

Bases: kfp.dsl._ops_group.OpsGroup

TYPE_NAME = 'subgraph'

after(*ops)

Specify explicit dependency on other ops.

remove_op_recursive(op)

class kfp.dsl.UserContainer(name: str, image: str, command: Union[str, List[str]] = None, args: Union[str, List[str]] = None, mirror_volume_mounts: bool = None, **kwargs)[source]

Bases: kfp.dsl._container_op.Container

Represents an argo workflow UserContainer (io.argoproj.workflow.v1alpha1.UserContainer) to be used in UserContainer property in argo’s workflow template (io.argoproj.workflow.v1alpha1.Template).

UserContainer inherits from Container class with an addition of mirror_volume_mounts attribute (mirrorVolumeMounts property).

See https://github.com/argoproj/argo-workflows/blob/master/api/openapi-spec/swagger.json

Parameters:
  • name – unique name for the user container
  • image – image to use for the user container, e.g. redis:alpine
  • command – entrypoint array. Not executed within a shell.
  • args – arguments to the entrypoint.
  • mirror_volume_mounts – MirrorVolumeMounts will mount the same volumes specified in the main container to the container (including artifacts), at the same mountPaths. This enables dind daemon to partially see the same filesystem as the main container in order to use features such as docker volume binding
  • **kwargs – keyword arguments available for Container

swagger_types

The key is attribute name and the value is attribute type.

Type:dict

Example

from kfp.dsl import ContainerOp, UserContainer

# Creates a `ContainerOp` and adds a redis init container.
op = (ContainerOp(name='foo-op', image='busybox:latest')
      .add_init_container(UserContainer(name='redis', image='redis:alpine')))

add_env_from(env_from) → kfp.dsl._container_op.Container

Add a source to populate environment variables in the container.

Parameters:
  • env_from – Kubernetes environment from source. For detailed spec, check the environment from source definition: https://github.com/kubernetes-client/python/blob/master/kubernetes/client/models/v1_env_var_source.py

add_env_variable(env_variable) → kfp.dsl._container_op.Container

Add environment variable to the container.

Parameters:
  • env_variable – Kubernetes environment variable. For detailed spec, check the environment variable definition: https://github.com/kubernetes-client/python/blob/master/kubernetes/client/models/v1_env_var.py

add_port(container_port) → kfp.dsl._container_op.Container

Add a container port to the container.

Parameters:
  • container_port – Kubernetes container port. For detailed spec, check the container port definition: https://github.com/kubernetes-client/python/blob/master/kubernetes/client/models/v1_container_port.py

add_resource_limit(resource_name, value) → kfp.dsl._container_op.Container

Add the resource limit of the container.

Parameters:
  • resource_name – The name of the resource. It can be cpu, memory, etc.
  • value – The string value of the limit.

add_resource_request(resource_name, value) → kfp.dsl._container_op.Container

Add the resource request of the container.

Parameters:
  • resource_name – The name of the resource. It can be cpu, memory, etc.
  • value – The string value of the request.

add_volume_devices(volume_device) → kfp.dsl._container_op.Container

Add a block device to be used by the container.

Parameters:
  • volume_device – Kubernetes volume device. For detailed spec, check the volume device definition: https://github.com/kubernetes-client/python/blob/master/kubernetes/client/models/v1_volume_device.py

add_volume_mount(volume_mount) → kfp.dsl._container_op.Container

Add volume to the container.

Parameters:
  • volume_mount – Kubernetes volume mount. For detailed spec, check the volume mount definition: https://github.com/kubernetes-client/python/blob/master/kubernetes/client/models/v1_volume_mount.py

args

E501

Arguments to the entrypoint. The docker image’s CMD is used if this is not provided. Variable references $(VAR_NAME) are expanded using the container’s environment. If a variable cannot be resolved, the reference in the input string will be unchanged. The $(VAR_NAME) syntax can be escaped with a double $$, ie: $$(VAR_NAME). Escaped references will never be expanded, regardless of whether the variable exists or not. Cannot be updated. More info: https://kubernetes.io/docs/tasks/inject-data-application/define-command-argument-container/#running-a-command-in-a-shell # noqa: E501

Returns:The args of this V1Container. # noqa: E501
Return type:list[str]
Type:Gets the args of this V1Container. # noqa

attribute_map = {‘args’: ‘args’, ‘command’: ‘command’, ‘env’: ‘env’, ‘env_from’: ‘envFrom’, ‘image’: ‘image’, ‘image_pull_policy’: ‘imagePullPolicy’, ‘lifecycle’: ‘lifecycle’, ‘liveness_probe’: ‘livenessProbe’, ‘mirror_volume_mounts’: ‘mirrorVolumeMounts’, ‘name’: ‘name’, ‘ports’: ‘ports’, ‘readiness_probe’: ‘readinessProbe’, ‘resources’: ‘resources’, ‘security_context’: ‘securityContext’, ‘startup_probe’: ‘startupProbe’, ‘stdin’: ‘stdin’, ‘stdin_once’: ‘stdinOnce’, ‘termination_message_path’: ‘terminationMessagePath’, ‘termination_message_policy’: ‘terminationMessagePolicy’, ‘tty’: ‘tty’, ‘volume_devices’: ‘volumeDevices’, ‘volume_mounts’: ‘volumeMounts’, ‘working_dir’: ‘workingDir’}

command

E501

Entrypoint array. Not executed within a shell. The docker image’s ENTRYPOINT is used if this is not provided. Variable references $(VAR_NAME) are expanded using the container’s environment. If a variable cannot be resolved, the reference in the input string will be unchanged. The $(VAR_NAME) syntax can be escaped with a double $$, ie: $$(VAR_NAME). Escaped references will never be expanded, regardless of whether the variable exists or not. Cannot be updated. More info: https://kubernetes.io/docs/tasks/inject-data-application/define-command-argument-container/#running-a-command-in-a-shell # noqa: E501

Returns:The command of this V1Container. # noqa: E501
Return type:list[str]
Type:Gets the command of this V1Container. # noqa

env

E501

List of environment variables to set in the container. Cannot be updated. # noqa: E501

Returns:The env of this V1Container. # noqa: E501
Return type:list[V1EnvVar]
Type:Gets the env of this V1Container. # noqa

env_from

Gets the env_from of this V1Container.

List of sources to populate environment variables in the container. The keys defined within a source must be a C_IDENTIFIER. All invalid keys will be reported as an event when the container is starting. When a key exists in multiple sources, the value associated with the last source will take precedence. Values defined by an Env with a duplicate key will take precedence. Cannot be updated.

Returns:The env_from of this V1Container.
Return type:list[V1EnvFromSource]
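The precedence rules for env_from and env can be sketched in plain Python. This is only an illustration under simplifying assumptions: sources are modeled as ordinary dicts rather than V1EnvFromSource objects, and the helper name `resolve_env` is hypothetical.

```python
import re

# A C_IDENTIFIER: a letter or underscore followed by letters, digits, underscores.
_C_IDENTIFIER = re.compile(r"[A-Za-z_][A-Za-z0-9_]*")


def resolve_env(env_from_sources, env):
    """Return (resolved variables, invalid keys) for the given sources."""
    resolved, invalid = {}, []
    for source in env_from_sources:
        for key, value in source.items():
            if _C_IDENTIFIER.fullmatch(key) is None:
                invalid.append(key)  # would be reported as an event
                continue
            resolved[key] = value  # a later source overrides an earlier one
    resolved.update(env)  # explicit env entries win over every source
    return resolved, invalid


variables, bad_keys = resolve_env(
    [{"A": "1", "B": "1", "not-valid": "x"}, {"B": "2"}],
    {"A": "3"},
)
print(variables, bad_keys)
```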

get_resource_limit(resource_name: str) → Optional[str]

Get the resource limit of the container.

Parameters:resource_name – The name of the resource. It can be cpu, memory, etc.

get_resource_request(resource_name: str) → Optional[str]

Get the resource request of the container.

Parameters:resource_name – The name of the resource. It can be cpu, memory, etc.

image

Gets the image of this V1Container.

Docker image name. More info: https://kubernetes.io/docs/concepts/containers/images This field is optional to allow higher level config management to default or override container images in workload controllers like Deployments and StatefulSets.

Returns:The image of this V1Container.
Return type:str

image_pull_policy

Gets the image_pull_policy of this V1Container.

Image pull policy. One of Always, Never, IfNotPresent. Defaults to Always if :latest tag is specified, or IfNotPresent otherwise. Cannot be updated. More info: https://kubernetes.io/docs/concepts/containers/images#updating-images

Returns:The image_pull_policy of this V1Container.
Return type:str
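The defaulting rule can be written down as a one-line sketch. The helper name `default_image_pull_policy` is hypothetical, and the sketch follows only the wording above; it does not cover images referenced without a tag or by digest.

```python
def default_image_pull_policy(image):
    """Return the pull policy that applies when none is set explicitly."""
    # Only an explicit ":latest" tag selects "Always" in this sketch.
    return "Always" if image.endswith(":latest") else "IfNotPresent"


print(default_image_pull_policy("nginx:latest"))
print(default_image_pull_policy("nginx:1.25"))
```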

inputs

A list of PipelineParam found in the UserContainer object.

lifecycle

Gets the lifecycle of this V1Container.

Returns:The lifecycle of this V1Container.
Return type:V1Lifecycle

liveness_probe

Gets the liveness_probe of this V1Container.

Returns:The liveness_probe of this V1Container.
Return type:V1Probe

name

Gets the name of this V1Container.

Name of the container specified as a DNS_LABEL. Each container in a pod must have a unique name (DNS_LABEL). Cannot be updated.

Returns:The name of this V1Container.
Return type:str

openapi_types = {'args': 'list[str]', 'command': 'list[str]', 'env': 'list[V1EnvVar]', 'env_from': 'list[V1EnvFromSource]', 'image': 'str', 'image_pull_policy': 'str', 'lifecycle': 'V1Lifecycle', 'liveness_probe': 'V1Probe', 'mirror_volume_mounts': 'bool', 'name': 'str', 'ports': 'list[V1ContainerPort]', 'readiness_probe': 'V1Probe', 'resources': 'V1ResourceRequirements', 'security_context': 'V1SecurityContext', 'startup_probe': 'V1Probe', 'stdin': 'bool', 'stdin_once': 'bool', 'termination_message_path': 'str', 'termination_message_policy': 'str', 'tty': 'bool', 'volume_devices': 'list[V1VolumeDevice]', 'volume_mounts': 'list[V1VolumeMount]', 'working_dir': 'str'}

ports

Gets the ports of this V1Container.

List of ports to expose from the container. Exposing a port here gives the system additional information about the network connections a container uses, but is primarily informational. Not specifying a port here DOES NOT prevent that port from being exposed. Any port which is listening on the default “0.0.0.0” address inside a container will be accessible from the network. Cannot be updated.

Returns:The ports of this V1Container.
Return type:list[V1ContainerPort]

readiness_probe

Gets the readiness_probe of this V1Container.

Returns:The readiness_probe of this V1Container.
Return type:V1Probe

resources

Gets the resources of this V1Container.

Returns:The resources of this V1Container.
Return type:V1ResourceRequirements

security_context

Gets the security_context of this V1Container.

Returns:The security_context of this V1Container.
Return type:V1SecurityContext

set_cpu_limit(cpu: Union[str, kfp.dsl._pipeline_param.PipelineParam]) → kfp.dsl._container_op.Container

Set cpu limit (maximum) for this operator.

Parameters:cpu (Union[str, PipelineParam]) – A string which can be a number or a number followed by “m”, which means 1/1000.

set_cpu_request(cpu: Union[str, kfp.dsl._pipeline_param.PipelineParam]) → kfp.dsl._container_op.Container

Set cpu request (minimum) for this operator.

Parameters:cpu (Union[str, PipelineParam]) – A string which can be a number or a number followed by “m”, which means 1/1000.
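As an illustration of the CPU string format, the sketch below converts such a string into a number of cores. The helper name `cpu_to_cores` is hypothetical and not part of kfp.

```python
def cpu_to_cores(cpu):
    """Convert a CPU string such as "2", "0.5" or "500m" to cores."""
    if cpu.endswith("m"):
        # The "m" suffix means thousandths of a core.
        return int(cpu[:-1]) / 1000
    return float(cpu)


print(cpu_to_cores("500m"))
print(cpu_to_cores("2"))
```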

set_env_variable(name: str, value: str) → kfp.dsl._container_op.Container

Sets environment variable to the container (v2 only).

Parameters:
  • name – The name of the environment variable.
  • value – The value of the environment variable.

set_ephemeral_storage_limit(size) → kfp.dsl._container_op.Container

Set ephemeral-storage limit (maximum) for this operator.

Parameters:size – a string which can be a number or a number followed by one of “E”, “P”, “T”, “G”, “M”, “K”.

set_ephemeral_storage_request(size) → kfp.dsl._container_op.Container

Set ephemeral-storage request (minimum) for this operator.

Parameters:size – a string which can be a number or a number followed by one of “E”, “P”, “T”, “G”, “M”, “K”.

set_gpu_limit(gpu: Union[str, kfp.dsl._pipeline_param.PipelineParam], vendor: Union[str, kfp.dsl._pipeline_param.PipelineParam] = 'nvidia') → kfp.dsl._container_op.Container

Set gpu limit for the operator.

This function adds '<vendor>.com/gpu' to the resource limits. Note that there is no need to add a GPU request. GPUs are only supposed to be specified in the limits section. See https://kubernetes.io/docs/tasks/manage-gpus/scheduling-gpus/.

Parameters:
  • gpu (Union[str, PipelineParam]) – A string which must be a positive number.
  • vendor (Union[str, PipelineParam]) – Optional. A string which is the vendor of the requested gpu. The supported values are: 'nvidia' (default), and 'amd'. The value is ignored in v2.
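A minimal sketch of the resulting limits entry, assuming the resource key has the form `<vendor>.com/gpu` (for example `nvidia.com/gpu`). The helper name `gpu_limit_entry` is hypothetical; it only mirrors the parameter rules listed above and is not the kfp implementation.

```python
def gpu_limit_entry(gpu, vendor="nvidia"):
    """Build the limits entry for a GPU request (illustrative only)."""
    if vendor not in ("nvidia", "amd"):
        raise ValueError("vendor must be 'nvidia' or 'amd'")
    if not gpu.isdigit() or int(gpu) <= 0:
        raise ValueError("gpu must be a string holding a positive number")
    # GPUs appear only under limits; no separate request entry is built.
    return {vendor + ".com/gpu": gpu}


print(gpu_limit_entry("1"))
print(gpu_limit_entry("2", vendor="amd"))
```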

set_image_pull_policy(image_pull_policy) → kfp.dsl._container_op.Container

Set image pull policy for the container.

Parameters:image_pull_policy – One of Always, Never, IfNotPresent.

set_lifecycle(lifecycle) → kfp.dsl._container_op.Container

Set up a lifecycle config for the container.

Parameters:lifecycle – Kubernetes lifecycle. For the detailed spec, check the lifecycle definition: https://github.com/kubernetes-client/python/blob/master/kubernetes/client/models/v1_lifecycle.py

set_liveness_probe(liveness_probe) → kfp.dsl._container_op.Container

Set a liveness probe for the container.

Parameters:liveness_probe – Kubernetes liveness probe. For the detailed spec, check the probe definition: https://github.com/kubernetes-client/python/blob/master/kubernetes/client/models/v1_probe.py

set_memory_limit(memory: Union[str, kfp.dsl._pipeline_param.PipelineParam]) → kfp.dsl._container_op.Container

Set memory limit (maximum) for this operator.

Parameters:memory (Union[str, PipelineParam]) – a string which can be a number or a number followed by one of “E”, “P”, “T”, “G”, “M”, “K”.

set_memory_request(memory: Union[str, kfp.dsl._pipeline_param.PipelineParam]) → kfp.dsl._container_op.Container

Set memory request (minimum) for this operator.

Parameters:memory (Union[str, PipelineParam]) – a string which can be a number or a number followed by one of “E”, “P”, “T”, “G”, “M”, “K”.
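To make the size format concrete, the sketch below converts such a string to bytes. It assumes the single-letter suffixes are decimal multipliers (powers of 1000) and accepts only the suffixes named above; the helper name `memory_to_bytes` is hypothetical.

```python
import re

# Decimal multipliers for the suffixes listed in the parameter description.
_MULTIPLIERS = {
    "": 1,
    "K": 10**3,
    "M": 10**6,
    "G": 10**9,
    "T": 10**12,
    "P": 10**15,
    "E": 10**18,
}


def memory_to_bytes(memory):
    """Convert a string such as "512" or "1G" to a number of bytes."""
    match = re.fullmatch(r"(\d+)([EPTGMK]?)", memory)
    if match is None:
        raise ValueError("invalid memory string: " + repr(memory))
    return int(match.group(1)) * _MULTIPLIERS[match.group(2)]


print(memory_to_bytes("1G"))
print(memory_to_bytes("512"))
```

The same format is described for the ephemeral-storage setters, so the sketch applies to those strings as well.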

set_mirror_volume_mounts(mirror_volume_mounts=True)[source]

Setting mirrorVolumeMounts to true will mount the same volumes specified in the main container to the container (including artifacts), at the same mountPaths. This enables dind daemon to partially see the same filesystem as the main container in order to use features such as docker volume binding.

Parameters:mirror_volume_mounts – boolean flag

set_readiness_probe(readiness_probe) → kfp.dsl._container_op.Container

Set a readiness probe for the container.

Parameters:readiness_probe – Kubernetes readiness probe. For the detailed spec, check the probe definition: https://github.com/kubernetes-client/python/blob/master/kubernetes/client/models/v1_probe.py

set_security_context(security_context) → kfp.dsl._container_op.Container

Set security configuration to be applied on the container.

Parameters:security_context – Kubernetes security context. For the detailed spec, check the security context definition: https://github.com/kubernetes-client/python/blob/master/kubernetes/client/models/v1_security_context.py

set_stdin(stdin=True) → kfp.dsl._container_op.Container

Whether this container should allocate a buffer for stdin in the container runtime. If this is not set, reads from stdin in the container will always result in EOF.

Parameters:stdin – boolean flag

set_stdin_once(stdin_once=True) → kfp.dsl._container_op.Container

Whether the container runtime should close the stdin channel after it has been opened by a single attach. When stdin is true the stdin stream will remain open across multiple attach sessions. If stdinOnce is set to true, stdin is opened on container start, is empty until the first client attaches to stdin, and then remains open and accepts data until the client disconnects, at which time stdin is closed and remains closed until the container is restarted. If this flag is false, a container process that reads from stdin will never receive an EOF.

Parameters:stdin_once – boolean flag

set_termination_message_path(termination_message_path) → kfp.dsl._container_op.Container

Path at which the file to which the container’s termination message will be written is mounted into the container’s filesystem. Message written is intended to be brief final status, such as an assertion failure message. Will be truncated by the node if greater than 4096 bytes. The total message length across all containers will be limited to 12kb.

Parameters:termination_message_path – path for the termination message

set_termination_message_policy(termination_message_policy) → kfp.dsl._container_op.Container

Indicate how the termination message should be populated. File will use the contents of terminationMessagePath to populate the container status message on both success and failure. FallbackToLogsOnError will use the last chunk of container log output if the termination message file is empty and the container exited with an error. The log output is limited to 2048 bytes or 80 lines, whichever is smaller.

Parameters:termination_message_policy – File or FallbackToLogsOnError
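The two policies can be compared with a short sketch. It models the message file and the container log as plain strings and uses a hypothetical helper name, `termination_message`; the real behavior is implemented by the kubelet, not by kfp.

```python
MAX_LOG_BYTES = 2048
MAX_LOG_LINES = 80


def termination_message(policy, file_contents, exit_code, logs):
    """Pick the status message according to the termination message policy."""
    use_logs = (
        policy == "FallbackToLogsOnError"
        and not file_contents
        and exit_code != 0
    )
    if not use_logs:
        # "File" always uses the file, on success and on failure.
        return file_contents
    # Keep the last 80 lines, then the last 2048 bytes, whichever is smaller.
    tail = "\n".join(logs.splitlines()[-MAX_LOG_LINES:])
    return tail.encode("utf-8")[-MAX_LOG_BYTES:].decode("utf-8", errors="ignore")


print(termination_message("FallbackToLogsOnError", "", 1, "step 1\nboom"))
```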

set_tty(tty: bool = True) → kfp.dsl._container_op.Container

Whether this container should allocate a TTY for itself, also requires ‘stdin’ to be true.

Parameters:tty – boolean flag

startup_probe

Gets the startup_probe of this V1Container.

Returns:The startup_probe of this V1Container.
Return type:V1Probe

stdin

Gets the stdin of this V1Container.

Whether this container should allocate a buffer for stdin in the container runtime. If this is not set, reads from stdin in the container will always result in EOF. Default is false.

Returns:The stdin of this V1Container.
Return type:bool

stdin_once

Gets the stdin_once of this V1Container.

Whether the container runtime should close the stdin channel after it has been opened by a single attach. When stdin is true the stdin stream will remain open across multiple attach sessions. If stdinOnce is set to true, stdin is opened on container start, is empty until the first client attaches to stdin, and then remains open and accepts data until the client disconnects, at which time stdin is closed and remains closed until the container is restarted. If this flag is false, a container process that reads from stdin will never receive an EOF. Default is false.

Returns:The stdin_once of this V1Container.
Return type:bool

termination_message_path

Gets the termination_message_path of this V1Container.

Optional: Path at which the file to which the container’s termination message will be written is mounted into the container’s filesystem. Message written is intended to be brief final status, such as an assertion failure message. Will be truncated by the node if greater than 4096 bytes. The total message length across all containers will be limited to 12kb. Defaults to /dev/termination-log. Cannot be updated.

Returns:The termination_message_path of this V1Container.
Return type:str

termination_message_policy

Gets the termination_message_policy of this V1Container.

Indicate how the termination message should be populated. File will use the contents of terminationMessagePath to populate the container status message on both success and failure. FallbackToLogsOnError will use the last chunk of container log output if the termination message file is empty and the container exited with an error. The log output is limited to 2048 bytes or 80 lines, whichever is smaller. Defaults to File. Cannot be updated.

Returns:The termination_message_policy of this V1Container.
Return type:str

to_dict()

Returns the model properties as a dict

to_str()

Returns the string representation of the model

tty

Gets the tty of this V1Container.

Whether this container should allocate a TTY for itself, also requires ‘stdin’ to be true. Default is false.

Returns:The tty of this V1Container.
Return type:bool

volume_devices

Gets the volume_devices of this V1Container.

volumeDevices is the list of block devices to be used by the container.

Returns:The volume_devices of this V1Container.
Return type:list[V1VolumeDevice]

volume_mounts

Gets the volume_mounts of this V1Container.

Pod volumes to mount into the container’s filesystem. Cannot be updated.

Returns:The volume_mounts of this V1Container.
Return type:list[V1VolumeMount]

working_dir

Gets the working_dir of this V1Container.

Container’s working directory. If not specified, the container runtime’s default will be used, which might be configured in the container image. Cannot be updated.

Returns:The working_dir of this V1Container.
Return type:str

kfp.dsl.VolumeOp: an op that is converted into a resource template that creates a PVC.

class kfp.dsl.VolumeOp(resource_name: str = None, size: str = None, storage_class: str = None, modes: List[str] = None, annotations: Dict[str, str] = None, data_source=None, volume_name=None, generate_unique_name: bool = True, **kwargs)

Bases: kfp.dsl._resource_op.ResourceOp

Represents an op which will be translated into a resource template that creates a PVC.

TODO(https://github.com/kubeflow/pipelines/issues/4822): Determine the stability level of this feature.

Parameters:
  • resource_name – A desired name for the PVC which will be created
  • size – The size of the PVC which will be created
  • storage_class – The storage class to use for the dynamically created PVC
  • modes – The access modes for the PVC
  • annotations – Annotations to be patched in the PVC
  • data_source – May be a V1TypedLocalObjectReference, and then it is used in the data_source field of the PVC as is. Can also be a string/PipelineParam, and in that case it will be used as a VolumeSnapshot name (Alpha feature)
  • volume_name – VolumeName is the binding reference to the PersistentVolume backing this claim.
  • generate_unique_name – Generate unique name for the PVC
  • kwargs – See kfp.dsl.ResourceOp
Raises:ValueError –
  • if k8s_resource is provided along with other arguments
  • if k8s_resource is not a V1PersistentVolumeClaim
  • if size is None
  • if size is an invalid memory string (when not a PipelineParam)
  • if data_source is not one of (str, PipelineParam, V1TypedLocalObjectReference)
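To show how the constructor parameters relate to the PVC that gets created, here is a rough sketch that assembles a PersistentVolumeClaim manifest as a plain dict. The helper name `build_pvc_manifest` is hypothetical, `modes` is made a required argument for simplicity, and the real op works with V1PersistentVolumeClaim objects rather than dicts.

```python
def build_pvc_manifest(resource_name, size, modes, storage_class=None,
                       annotations=None, volume_name=None):
    """Assemble a PersistentVolumeClaim manifest as a dict (illustrative)."""
    if size is None:
        raise ValueError("size is required")
    spec = {
        "accessModes": list(modes),
        "resources": {"requests": {"storage": size}},
    }
    if storage_class is not None:
        spec["storageClassName"] = storage_class
    if volume_name is not None:
        spec["volumeName"] = volume_name
    metadata = {"name": resource_name}
    if annotations:
        metadata["annotations"] = dict(annotations)
    return {
        "apiVersion": "v1",
        "kind": "PersistentVolumeClaim",
        "metadata": metadata,
        "spec": spec,
    }


manifest = build_pvc_manifest("my-pvc", "1Gi", ["ReadWriteOnce"])
print(manifest["spec"])
```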

add_affinity(affinity: kubernetes.client.models.v1_affinity.V1Affinity)

Add K8s Affinity.

Parameters:affinity – Kubernetes affinity. For the detailed spec, check the affinity definition: https://github.com/kubernetes-client/python/blob/master/kubernetes/client/models/v1_affinity.py

Example:

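from kubernetes.client import (V1Affinity, V1NodeAffinity, V1NodeSelector,
                               V1NodeSelectorRequirement, V1NodeSelectorTerm)

# Require nodes whose instance-type label is p2.xlarge.
V1Affinity(
    node_affinity=V1NodeAffinity(
        required_during_scheduling_ignored_during_execution=V1NodeSelector(
            node_selector_terms=[V1NodeSelectorTerm(
                match_expressions=[V1NodeSelectorRequirement(
                    key='beta.kubernetes.io/instance-type',
                    operator='In',
                    values=['p2.xlarge'])])])))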

add_init_container(init_container: kfp.dsl._container_op.UserContainer)

Add an init container to the Op.

Parameters:init_container – UserContainer object.

add_node_selector_constraint(label_name: Union[str, kfp.dsl._pipeline_param.PipelineParam], value: Union[str, kfp.dsl._pipeline_param.PipelineParam])

Add a constraint for nodeSelector.

Each constraint is a key-value pair label. For the container to be eligible to run on a node, the node must carry every constraint as one of its labels.

Parameters:
  • label_name (Union[str, PipelineParam]) – The name of the constraint label.
  • value (Union[str, PipelineParam]) – The value of the constraint label.
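The eligibility rule can be expressed as a small check. Node labels and constraints are modeled as plain dicts here, and the helper name `node_is_eligible` is hypothetical; the actual matching is done by the Kubernetes scheduler.

```python
def node_is_eligible(node_labels, constraints):
    """Return True when the node carries every constraint as a label."""
    return all(node_labels.get(key) == value
               for key, value in constraints.items())


labels = {"disktype": "ssd", "zone": "us-central1-a"}
print(node_is_eligible(labels, {"disktype": "ssd"}))
print(node_is_eligible(labels, {"disktype": "ssd", "gpu": "true"}))
```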

add_pod_annotation(name: str, value: str)

Adds a pod’s metadata annotation.

Parameters:
  • name – The name of the annotation.
  • value – The value of the annotation.

add_pod_label(name: str, value: str)

Adds a pod’s metadata label.

Parameters:
  • name – The name of the label.
  • value – The value of the label.

add_sidecar(sidecar: kfp.dsl._container_op.Sidecar)

Add a sidecar to the Op.

Parameters:sidecar – Sidecar object.

add_toleration(tolerations: kubernetes.client.models.v1_toleration.V1Toleration)

Add K8s tolerations.

Parameters:tolerations – Kubernetes toleration. For the detailed spec, check the toleration definition: https://github.com/kubernetes-client/python/blob/master/kubernetes/client/models/v1_toleration.py

add_volume(volume)

Add K8s volume to the container.

Parameters:volume – Kubernetes volumes. For the detailed spec, check the volume definition: https://github.com/kubernetes-client/python/blob/master/kubernetes/client/models/v1_volume.py

after(*ops)

Specify explicit dependency on other ops.

apply(mod_func)

Applies a modifier function to self.

The function should return the passed object. This is needed to chain “extension methods” to this class.

Example:

from kfp.gcp import use_gcp_secret

task = (
    train_op(...)
    .set_memory_request('1G')
    .apply(use_gcp_secret('user-gcp-sa'))
    .set_memory_limit('2G')
)
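Why the modifier function has to return the op becomes clearer with a toy model of the chaining pattern. The classes and functions below (`ToyOp`, `add_label`) are hypothetical stand-ins, not kfp classes, and the sketch does not claim to reproduce the kfp implementation.

```python
class ToyOp:
    """A minimal stand-in for an op with chainable setters."""

    def __init__(self):
        self.settings = {}

    def set_memory_request(self, memory):
        self.settings["memory_request"] = memory
        return self  # returning self keeps the chain going

    def set_memory_limit(self, memory):
        self.settings["memory_limit"] = memory
        return self

    def apply(self, mod_func):
        # The chain continues with whatever the modifier returns.
        return mod_func(self)


def add_label(name, value):
    """Build a modifier that records a label on the op it receives."""
    def _modifier(op):
        op.settings.setdefault("labels", {})[name] = value
        return op  # must return the op, otherwise the chain breaks
    return _modifier


task = (
    ToyOp()
    .set_memory_request("1G")
    .apply(add_label("team", "ml"))
    .set_memory_limit("2G")
)
print(task.settings)
```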

attrs_with_pipelineparams = ['node_selector', 'volumes', 'pod_annotations', 'pod_labels', 'num_retries', 'init_containers', 'sidecars', 'tolerations']

delete(flags: Optional[List[str]] = None)

Returns a ResourceOp which deletes the resource.

inputs

List of PipelineParams that will be converted into input parameters (io.argoproj.workflow.v1alpha1.Inputs) for the argo workflow.

resource

Resource object that represents the resource property in io.argoproj.workflow.v1alpha1.Template.

set_caching_options(enable_caching: bool) → kfp.dsl._container_op.BaseOp

Sets caching options for the Op.

Parameters:enable_caching – Whether or not to enable caching for this task.
Returns:Self, to allow chained setter calls.

set_display_name(name: str)

set_retry(num_retries: int, policy: Optional[str] = None, backoff_duration: Optional[str] = None, backoff_factor: Optional[float] = None, backoff_max_duration: Optional[str] = None)

Sets the number of times the task is retried until it’s declared failed.

Parameters:
  • num_retries – Number of times to retry on failures.
  • policy – Retry policy name.
  • backoff_duration – The time interval between retries. Defaults to an immediate retry. In case you specify a simple number, the unit defaults to seconds. You can also specify a different unit, for instance, 2m (2 minutes), 1h (1 hour).
  • backoff_factor – The exponential backoff factor applied to backoff_duration. For example, if backoff_duration=”60” (60 seconds) and backoff_factor=2, the first retry will happen after 60 seconds, then after 120, 240, and so on.
  • backoff_max_duration – The maximum interval that can be reached with the backoff strategy.
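The backoff arithmetic described above can be worked through in a few lines. This sketch reads backoff_max_duration as a cap on each individual interval, which is how the parameter description reads; the workflow engine may interpret it differently. The helper names and the set of accepted units (s, m, h) are assumptions.

```python
_UNIT_SECONDS = {"s": 1, "m": 60, "h": 3600}


def duration_to_seconds(duration):
    """Parse "60", "2m" or "1h"; a bare number is taken as seconds."""
    if duration[-1] in _UNIT_SECONDS:
        return int(duration[:-1]) * _UNIT_SECONDS[duration[-1]]
    return int(duration)


def retry_delays(num_retries, backoff_duration, backoff_factor=1,
                 backoff_max_duration=None):
    """Return the wait in seconds before each retry."""
    base = duration_to_seconds(backoff_duration)
    cap = (duration_to_seconds(backoff_max_duration)
           if backoff_max_duration is not None else None)
    delays = []
    for attempt in range(num_retries):
        delay = base * backoff_factor ** attempt
        if cap is not None:
            delay = min(delay, cap)
        delays.append(delay)
    return delays


print(retry_delays(3, "60", backoff_factor=2))
```

With backoff_duration="60" and backoff_factor=2 the first three waits are 60, 120 and 240 seconds, matching the example in the parameter description.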

set_timeout(seconds: int)

Sets the timeout for the task in seconds.

Parameters:seconds – Number of seconds.

class kfp.dsl.VolumeSnapshotOp(resource_name: str = None, pvc: str = None, snapshot_class: str = None, annotations: Dict[str, str] = None, volume: kubernetes.client.models.v1_volume.V1Volume = None, api_version: str = 'snapshot.storage.k8s.io/v1alpha1', **kwargs)[source]

Bases: kfp.dsl._resource_op.ResourceOp

Represents an op which will be translated into a resource template that creates a VolumeSnapshot.

TODO(https://github.com/kubeflow/pipelines/issues/4822): Determine the stability level of this feature.

Parameters:
  • resource_name – A desired name for the VolumeSnapshot which will be created
  • pvc – The name of the PVC which will be snapshotted
  • snapshot_class – The snapshot class to use for the dynamically created VolumeSnapshot
  • annotations – Annotations to be patched in the VolumeSnapshot
  • volume – An instance of V1Volume
  • kwargs – See kfp.dsl.ResourceOp
Raises:ValueError –
  • if k8s_resource is provided along with other arguments
  • if k8s_resource is not a VolumeSnapshot
  • if pvc and volume are None
  • if pvc and volume are not None
  • if volume does not reference a PVC
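The pvc/volume conditions amount to "exactly one of the two, and a volume must point at a PVC". A minimal sketch of that check follows; it models the volume as a plain dict instead of a V1Volume, and the helper name `snapshot_source_pvc` is hypothetical.

```python
def snapshot_source_pvc(pvc=None, volume=None):
    """Return the name of the PVC to snapshot, or raise ValueError."""
    if pvc is None and volume is None:
        raise ValueError("one of pvc or volume is required")
    if pvc is not None and volume is not None:
        raise ValueError("pvc and volume are mutually exclusive")
    if volume is not None:
        claim = volume.get("persistentVolumeClaim")
        if not claim:
            raise ValueError("volume does not reference a PVC")
        return claim["claimName"]
    return pvc


print(snapshot_source_pvc(pvc="my-pvc"))
print(snapshot_source_pvc(
    volume={"name": "data", "persistentVolumeClaim": {"claimName": "my-pvc"}}))
```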

add_affinity(affinity: kubernetes.client.models.v1_affinity.V1Affinity)

Add K8s Affinity.

Parameters:affinity – Kubernetes affinity. For the detailed spec, check the affinity definition: https://github.com/kubernetes-client/python/blob/master/kubernetes/client/models/v1_affinity.py

Example:

V1Affinity(
    node_affinity=V1NodeAffinity(
        required_during_scheduling_ignored_during_execution=V1NodeSelector(
            node_selector_terms=[V1NodeSelectorTerm(
                match_expressions=[V1NodeSelectorRequirement(
                    key='beta.kubernetes.io/instance-type',
                    operator='In',
                    values=['p2.xlarge'])])])))

add_init_container(init_container: kfp.dsl._container_op.UserContainer)

Add an init container to the Op.

Parameters:init_container – UserContainer object.

add_node_selector_constraint(label_name: Union[str, kfp.dsl._pipeline_param.PipelineParam], value: Union[str, kfp.dsl._pipeline_param.PipelineParam])

Add a constraint for nodeSelector.

Each constraint is a key-value pair label. For the container to be eligible to run on a node, the node must carry every constraint as one of its labels.

Parameters:
  • label_name (Union[str, PipelineParam]) – The name of the constraint label.
  • value (Union[str, PipelineParam]) – The value of the constraint label.

add_pod_annotation(name: str, value: str)

Adds a pod’s metadata annotation.

Parameters:
  • name – The name of the annotation.
  • value – The value of the annotation.

add_pod_label(name: str, value: str)

Adds a pod’s metadata label.

Parameters:
  • name – The name of the label.
  • value – The value of the label.

add_sidecar(sidecar: kfp.dsl._container_op.Sidecar)

Add a sidecar to the Op.

Parameters:sidecar – Sidecar object.

add_toleration(tolerations: kubernetes.client.models.v1_toleration.V1Toleration)

Add K8s tolerations.

Parameters:tolerations – Kubernetes toleration. For the detailed spec, check the toleration definition: https://github.com/kubernetes-client/python/blob/master/kubernetes/client/models/v1_toleration.py

add_volume(volume)

Add K8s volume to the container.

Parameters:volume – Kubernetes volumes. For the detailed spec, check the volume definition: https://github.com/kubernetes-client/python/blob/master/kubernetes/client/models/v1_volume.py

after(*ops)

Specify explicit dependency on other ops.

apply(mod_func)

Applies a modifier function to self.

The function should return the passed object. This is needed to chain “extension methods” to this class.

Example:

from kfp.gcp import use_gcp_secret

task = (
    train_op(...)
    .set_memory_request('1G')
    .apply(use_gcp_secret('user-gcp-sa'))
    .set_memory_limit('2G')
)

attrs_with_pipelineparams = ['node_selector', 'volumes', 'pod_annotations', 'pod_labels', 'num_retries', 'init_containers', 'sidecars', 'tolerations']

delete(flags: Optional[List[str]] = None)

Returns a ResourceOp which deletes the resource.

inputs

List of PipelineParams that will be converted into input parameters (io.argoproj.workflow.v1alpha1.Inputs) for the argo workflow.

resource

Resource object that represents the resource property in io.argoproj.workflow.v1alpha1.Template.

set_caching_options(enable_caching: bool) → kfp.dsl._container_op.BaseOp

Sets caching options for the Op.

Parameters:enable_caching – Whether or not to enable caching for this task.
Returns:Self, to allow chained setter calls.

set_display_name(name: str)

set_retry(num_retries: int, policy: Optional[str] = None, backoff_duration: Optional[str] = None, backoff_factor: Optional[float] = None, backoff_max_duration: Optional[str] = None)

Sets the number of times the task is retried until it’s declared failed.

Parameters:
  • num_retries – Number of times to retry on failures.
  • policy – Retry policy name.
  • backoff_duration – The time interval between retries. Defaults to an immediate retry. In case you specify a simple number, the unit defaults to seconds. You can also specify a different unit, for instance, 2m (2 minutes), 1h (1 hour).
  • backoff_factor – The exponential backoff factor applied to backoff_duration. For example, if backoff_duration=”60” (60 seconds) and backoff_factor=2, the first retry will happen after 60 seconds, then after 120, 240, and so on.
  • backoff_max_duration – The maximum interval that can be reached with the backoff strategy.

set_timeout(seconds: int)

Sets the timeout for the task in seconds.

Parameters:seconds – Number of seconds.

kfp.dsl.component(func)[source]

Decorator for component functions that returns a ContainerOp.

This is useful to enable type checking in the DSL compiler.

Example

@dsl.component
def foobar(model: TFModel(), step: MLStep()):
    return dsl.ContainerOp()

kfp.dsl.get_pipeline_conf()[source]

Configure the pipeline-level settings of the current pipeline.

Note: call this function inside the user-defined pipeline function.

kfp.dsl.graph_component(func)[source]

Decorator for graph component functions.

This decorator returns an ops_group.

Example

# Warning: caching is tricky when recursion is involved. Please be careful
# and set proper max_cache_staleness in case of infinite loop.
import kfp.dsl as dsl

@dsl.graph_component
def flip_component(flip_result):
    print_flip = PrintOp(flip_result)
    flipA = FlipCoinOp().after(print_flip)
    flipA.execution_options.caching_strategy.max_cache_staleness = "P0D"
    with dsl.Condition(flipA.output == 'heads'):
        flip_component(flipA.output)
    return {'flip_result': flipA.output}

kfp.dsl.importer(*args, **kwargs)[source]

kfp.dsl.pipeline(name: Optional[str] = None, description: Optional[str] = None, pipeline_root: Optional[str] = None)[source]

Decorator of pipeline functions.

Example

@pipeline(
    name='my-pipeline',
    description='My ML Pipeline.',
    pipeline_root='gs://my-bucket/my-output-path'
)
def my_pipeline(a: PipelineParam, b: PipelineParam):
    ...

Parameters:
  • name – The pipeline name. Default to a sanitized version of the function name.
  • description – Optionally, a human-readable description of the pipeline.
  • pipeline_root – The root directory to generate input/output URI under this pipeline. This is required if input/output URI placeholder is used in this pipeline.

kfp.dsl.python_component(name, description=None, base_image=None, target_component_file: str = None)[source]

Decorator for Python component functions.

This decorator adds the metadata to the function object itself.

Parameters:
  • name – Human-readable name of the component.
  • description – Optional. Description of the component.
  • base_image – Optional. Docker container image to use as the base of the component. Needs to have Python 3.5+ installed.
  • target_component_file – Optional. Local file to store the component definition. The file can then be used for sharing.
Returns:The same function (with some metadata fields set).

Example:

@dsl.python_component(
    name='my awesome component',
    description="Come, Let's play",
    base_image='tensorflow/tensorflow:1.11.0-py3',
)
def my_component(a: str, b: int) -> str:
    ...

Deprecated since version 0.2.6: This decorator does not seem to be used, so we deprecate it. If you need this decorator, please create an issue at https://github.com/kubeflow/pipelines/issues


https://flepeng.github.io/042-云原生-04-kubeflow-30-SDK-API-1-8-13-00-kubeflow-pipeline-dsl/
Author: Lepeng
Published: March 27, 2024
License: