配置参考

网关服务器

class dask_gateway_server.app.DaskGateway(**kwargs: Any)

用于管理跨多个用户的 Dask 集群的网关

address c.DaskGateway.address = Unicode('')

私有 API 服务器应该侦听的地址。

应该采用 {hostname}:{port} 的形式

其中

  • hostname 设置要侦听的主机名。设置为 """0.0.0.0" 以侦听所有接口。

  • port 设置要侦听的端口。

默认为 127.0.0.1:0

authenticator_class c.DaskGateway.authenticator_class = Type('dask_gateway_server.auth.SimpleAuthenticator')

要使用的网关认证器类

backend_class c.DaskGateway.backend_class = Type('dask_gateway_server.backends.Backend')

要使用的网关后端类

config_file c.DaskGateway.config_file = Unicode('dask_gateway_config.py')

要加载的配置文件

log_datefmt c.DaskGateway.log_datefmt = Unicode('%Y-%m-%d %H:%M:%S')

日志格式化程序用于 %(asctime)s 的日期格式

log_format c.DaskGateway.log_format = Unicode('%(log_color)s[%(levelname)1.1s %(asctime)s.%(msecs).03d %(name)s]%(reset)s %(message)s')

日志格式模板

log_level c.DaskGateway.log_level = Enum('INFO')

按值或名称设置日志级别。

show_config c.DaskGateway.show_config = Bool(False)

将配置转储到标准输出,而不是启动应用程序

show_config_json c.DaskGateway.show_config_json = Bool(False)

将配置转储到标准输出(JSON 格式),而不是启动应用程序

认证

Kerberos 认证器

class dask_gateway_server.auth.KerberosAuthenticator(**kwargs: Any)

使用 Kerberos 的认证器

cache_max_age c.KerberosAuthenticator.cache_max_age = Int(300)

缓存认证信息的最大时间(秒)。

通过在请求之间缓存响应,有助于减轻后端认证服务的负载。在此时间后,用户需要重新认证才能发出额外请求(请注意,这对用户来说通常是透明的)。

cookie_name c.KerberosAuthenticator.cookie_name = Unicode('')

用于缓存认证信息的 cookie 名称。

keytab c.KerberosAuthenticator.keytab = Unicode('dask_gateway.keytab')

keytab 文件的路径

service_name c.KerberosAuthenticator.service_name = Unicode('HTTP')

服务的 Kerberos 主体名称。

这几乎总是 “HTTP”(默认值)

JupyterHub 认证器

class dask_gateway_server.auth.JupyterHubAuthenticator(**kwargs: Any)

使用 JupyterHub 执行认证的认证器

cache_max_age c.JupyterHubAuthenticator.cache_max_age = Int(300)

缓存认证信息的最大时间(秒)。

通过在请求之间缓存响应,有助于减轻后端认证服务的负载。在此时间后,用户需要重新认证才能发出额外请求(请注意,这对用户来说通常是透明的)。

cookie_name c.JupyterHubAuthenticator.cookie_name = Unicode('')

用于缓存认证信息的 cookie 名称。

jupyterhub_api_token c.JupyterHubAuthenticator.jupyterhub_api_token = Unicode('')

Dask Gateway 的 JupyterHub API 令牌,用于认证网关对 JupyterHub 的 API 请求。

默认情况下,这取决于 JUPYTERHUB_API_TOKEN 环境变量。

jupyterhub_api_url c.JupyterHubAuthenticator.jupyterhub_api_url = Unicode('')

JupyterHub 服务器的 API URL。

默认情况下,这取决于 JUPYTERHUB_API_URL 环境变量。

tls_ca c.JupyterHubAuthenticator.tls_ca = Unicode('')

TLS CA 文件的路径,用于验证对 JupyterHub 的 API 请求。

设置此项时,您还应该设置 tls_key 和 tls_cert。

tls_cert c.JupyterHubAuthenticator.tls_cert = Unicode('')

TLS 证书文件的路径,用于向 JupyterHub 发出 API 请求。

设置此项时,您还应该设置 tls_cert。

tls_key c.JupyterHubAuthenticator.tls_key = Unicode('')

TLS 密钥文件的路径,用于向 JupyterHub 发出 API 请求。

设置此项时,您还应该设置 tls_cert。

简单认证器

class dask_gateway_server.auth.SimpleAuthenticator(**kwargs: Any)

一个使用基本认证(Basic Auth)的简单认证器。

这是非常不安全的,仅用于测试!!!

cache_max_age c.SimpleAuthenticator.cache_max_age = Int(300)

缓存认证信息的最大时间(秒)。

通过在请求之间缓存响应,有助于减轻后端认证服务的负载。在此时间后,用户需要重新认证才能发出额外请求(请注意,这对用户来说通常是透明的)。

cookie_name c.SimpleAuthenticator.cookie_name = Unicode('')

用于缓存认证信息的 cookie 名称。

password c.SimpleAuthenticator.password = Unicode(None)

如果设置,所有用户必须提供的全局密码。

如果未设置(默认),则完全忽略密码字段。

集群后端

基类

ClusterConfig

class dask_gateway_server.backends.base.ClusterConfig(**kwargs: Any)

用于保存单个 Dask 集群配置的基类

adaptive_period c.ClusterConfig.adaptive_period = Float(3)

自适应伸缩检查之间的时间间隔(秒)。

较小的周期会降低响应集群负载变化时的伸缩延迟,但可能会导致网关服务器负载更高。

cluster_max_cores c.ClusterConfig.cluster_max_cores = Float(None)

此集群可用的最大核心数。

设置为 None 表示无核心限制(默认)。

cluster_max_memory c.ClusterConfig.cluster_max_memory = MemoryLimit(None)

此集群可用的最大内存量(字节)。允许使用以下后缀

  • K -> Kibibytes

  • M -> Mebibytes

  • G -> Gibibytes

  • T -> Tebibytes

设置为 None 表示无内存限制(默认)。

cluster_max_workers c.ClusterConfig.cluster_max_workers = Int(0)

此集群可用的最大 worker 数量。

请注意,这将在运行时与 cluster_max_corescluster_max_memory 结合使用,以确定此集群实际可用的最大 worker 数量。

environment c.ClusterConfig.environment = Dict()

为 worker 和 scheduler 进程设置的环境变量。

idle_timeout c.ClusterConfig.idle_timeout = Float(0)

空闲集群自动关闭前的时间间隔(秒)。

设置为 0(默认)表示无空闲超时。

scheduler_cmd c.ClusterConfig.scheduler_cmd = Command()

启动 Dask scheduler 的 shell 命令。

scheduler_cores c.ClusterConfig.scheduler_cores = Int(1)

Dask scheduler 可用的 CPU 核心数。

scheduler_memory c.ClusterConfig.scheduler_memory = MemoryLimit('2 G')

Dask scheduler 可用的字节数。允许使用以下后缀

  • K -> Kibibytes

  • M -> Mebibytes

  • G -> Gibibytes

  • T -> Tebibytes

worker_cmd c.ClusterConfig.worker_cmd = Command()

启动 Dask worker 的 shell 命令。

worker_cores c.ClusterConfig.worker_cores = Int(1)

Dask worker 可用的 CPU 核心数。

worker_memory c.ClusterConfig.worker_memory = MemoryLimit('2 G')

Dask worker 可用的字节数。允许使用以下后缀

  • K -> Kibibytes

  • M -> Mebibytes

  • G -> Gibibytes

  • T -> Tebibytes

worker_threads c.ClusterConfig.worker_threads = Int(0)

Dask worker 可用的线程数。

默认为 worker_cores

后端

class dask_gateway_server.backends.base.Backend(**kwargs: Any)

定义 dask-gateway 后端的基类。

子类应该实现以下方法

  • setup

  • cleanup

  • start_cluster

  • stop_cluster

  • on_cluster_heartbeat

api_url c.Backend.api_url = Unicode('')

内部组件(例如 Dask 集群)联系网关时将使用的地址。

cluster_config_class c.Backend.cluster_config_class = Type('dask_gateway_server.backends.base.ClusterConfig')

要使用的集群配置类

cluster_options c.Backend.cluster_options = Union()

配置单个集群的用户选项。

允许用户在创建新集群时指定配置覆盖。有关更多信息,请参阅文档

公开集群选项.

本地进程

LocalClusterConfig

class dask_gateway_server.backends.local.LocalClusterConfig(**kwargs: Any)

以本地进程运行时 Dask 集群的配置选项

adaptive_period c.LocalClusterConfig.adaptive_period = Float(3)

自适应伸缩检查之间的时间间隔(秒)。

较小的周期会降低响应集群负载变化时的伸缩延迟,但可能会导致网关服务器负载更高。

cluster_max_cores c.LocalClusterConfig.cluster_max_cores = Float(None)

此集群可用的最大核心数。

设置为 None 表示无核心限制(默认)。

cluster_max_memory c.LocalClusterConfig.cluster_max_memory = MemoryLimit(None)

此集群可用的最大内存量(字节)。允许使用以下后缀

  • K -> Kibibytes

  • M -> Mebibytes

  • G -> Gibibytes

  • T -> Tebibytes

设置为 None 表示无内存限制(默认)。

cluster_max_workers c.LocalClusterConfig.cluster_max_workers = Int(0)

此集群可用的最大 worker 数量。

请注意,这将在运行时与 cluster_max_corescluster_max_memory 结合使用,以确定此集群实际可用的最大 worker 数量。

environment c.LocalClusterConfig.environment = Dict()

为 worker 和 scheduler 进程设置的环境变量。

idle_timeout c.LocalClusterConfig.idle_timeout = Float(0)

空闲集群自动关闭前的时间间隔(秒)。

设置为 0(默认)表示无空闲超时。

scheduler_cmd c.LocalClusterConfig.scheduler_cmd = Command()

启动 Dask scheduler 的 shell 命令。

scheduler_cores c.LocalClusterConfig.scheduler_cores = Int(1)

Dask scheduler 可用的 CPU 核心数。

scheduler_memory c.LocalClusterConfig.scheduler_memory = MemoryLimit('2 G')

Dask scheduler 可用的字节数。允许使用以下后缀

  • K -> Kibibytes

  • M -> Mebibytes

  • G -> Gibibytes

  • T -> Tebibytes

worker_cmd c.LocalClusterConfig.worker_cmd = Command()

启动 Dask worker 的 shell 命令。

worker_cores c.LocalClusterConfig.worker_cores = Int(1)

Dask worker 可用的 CPU 核心数。

worker_memory c.LocalClusterConfig.worker_memory = MemoryLimit('2 G')

Dask worker 可用的字节数。允许使用以下后缀

  • K -> Kibibytes

  • M -> Mebibytes

  • G -> Gibibytes

  • T -> Tebibytes

worker_threads c.LocalClusterConfig.worker_threads = Int(0)

Dask worker 可用的线程数。

默认为 worker_cores

本地后端

class dask_gateway_server.backends.local.LocalBackend(**kwargs: Any)

启动本地进程的集群后端。

需要超级用户权限才能为请求用户名运行进程。

api_url c.LocalBackend.api_url = Unicode('')

内部组件(例如 Dask 集群)联系网关时将使用的地址。

默认为 {proxy_address}/{prefix}/api,如果应使用不同地址,则手动设置。

backoff_base_delay c.LocalBackend.backoff_base_delay = Float(0.1)

失败后重试时的退避基础延迟(秒)。

如果操作失败,将在计算出的退避时间后重试,计算方法如下

` min(backoff_max_delay, backoff_base_delay * 2 ** num_failures) `

backoff_max_delay c.LocalBackend.backoff_max_delay = Float(300)

失败后重试时的退避策略最大延迟(秒)。

check_timeouts_period c.LocalBackend.check_timeouts_period = Float(0.0)

超时检查之间的时间间隔(秒)。

这不应该太小(以保持开销较低),但应该小于 cluster_heartbeat_timeoutcluster_start_timeoutworker_start_timeout

cluster_config_class c.LocalBackend.cluster_config_class = Type('dask_gateway_server.backends.local.LocalClusterConfig')

要使用的集群配置类

cluster_heartbeat_period c.LocalBackend.cluster_heartbeat_period = Int(15)

集群到网关的心跳之间的时间间隔(秒)。

较小的周期会更快检测到失败的 worker,但会占用更多资源。较大的周期在出现故障时提供更慢的反馈。

cluster_heartbeat_timeout c.LocalBackend.cluster_heartbeat_timeout = Float(0.0)

在杀死未能发送心跳的 Dask 集群之前的超时时间(秒)。

这应该大于 cluster_heartbeat_period。默认为 2 * cluster_heartbeat_period

cluster_options c.LocalBackend.cluster_options = Union()

配置单个集群的用户选项。

允许用户在创建新集群时指定配置覆盖。有关更多信息,请参阅文档

公开集群选项.

cluster_start_timeout c.LocalBackend.cluster_start_timeout = Float(60)

放弃启动 Dask 集群之前的超时时间(秒)。

cluster_status_period c.LocalBackend.cluster_status_period = Float(30)

集群状态检查之间的时间间隔(秒)。

较小的周期会更快检测到失败的集群,但会占用更多资源。较大的周期在出现故障时提供更慢的反馈。

clusters_directory c.LocalBackend.clusters_directory = Unicode('')

集群工作目录的根目录。

为每个新集群创建一个子目录,该子目录将作为该集群的工作目录。集群关闭时,将移除该子目录。

如果未指定,将为每个集群使用一个临时目录。

db_cleanup_period c.LocalBackend.db_cleanup_period = Float(600)

数据库清理任务之间的时间间隔(秒)。

这设置了从数据库中移除旧记录的频率。这不应该太小(以保持开销较低),但应该小于 db_cluster_max_age(可能差一个数量级)。

db_cluster_max_age c.LocalBackend.db_cluster_max_age = Float(86400)

保留已完成集群记录的最大时间(秒)。

每过 db_cleanup_period,将从数据库中移除早于 db_cluster_max_age 的已完成集群。

db_debug c.LocalBackend.db_debug = Bool(False)

如果为 True,将记录所有数据库操作

db_encrypt_keys c.LocalBackend.db_encrypt_keys = List()

用于加密数据库中私有数据的密钥列表。也可以通过环境变量 DASK_GATEWAY_ENCRYPT_KEYS 设置,其值为以 ; 分隔的加密密钥字符串。

每个密钥应该是 base64 编码的 32 字节值,并且应该是密码学安全的随机值。如果没有其他选项,可以使用 openssl 通过以下命令生成单个密钥

$ openssl rand -base64 32

单个密钥有效,可以使用多个密钥来支持密钥轮换。

db_url c.LocalBackend.db_url = Unicode('sqlite:///:memory:')

数据库的 URL。默认值仅为内存中的数据库。

如果不在内存中,还必须设置 db_encrypt_keys

inherited_environment c.LocalBackend.inherited_environment = List()

scheduler 和 worker 进程从 Dask-Gateway 进程继承的环境变量白名单。

parallelism c.LocalBackend.parallelism = Int(20)

用于启动/停止集群的处理程序数量。

sigint_timeout c.LocalBackend.sigint_timeout = Int(10)

SIGINT 后等待进程停止的秒数。

在此时间后进程仍未停止,则发送 SIGTERM。

sigkill_timeout c.LocalBackend.sigkill_timeout = Int(5)

SIGKILL 后等待进程停止的秒数。

如果在此时间后进程仍未停止,则会记录警告,并将该进程视为僵尸进程。

sigterm_timeout c.LocalBackend.sigterm_timeout = Int(5)

SIGTERM 后等待进程停止的秒数。

如果在此时间后进程仍未停止,则发送 SIGKILL。

stop_clusters_on_shutdown c.LocalBackend.stop_clusters_on_shutdown = Bool(True)

是否在网关关闭时停止活动集群。

如果为 true,所有活动集群将在关闭网关前停止。设置为 False 将保留活动集群运行。

worker_start_failure_limit c.LocalBackend.worker_start_failure_limit = Int(3)

在集群被标记为失败之前,启动 worker 的失败尝试次数限制。

每个启动失败的 worker(超时除外)都会增加计数器。如果 worker 成功启动,计数器将重置。如果计数器超出此限制,集群将被标记为失败并关闭。

worker_start_timeout c.LocalBackend.worker_start_timeout = Float(60)

放弃启动 Dask worker 之前的超时时间(秒)。

worker_status_period c.LocalBackend.worker_status_period = Float(30)

worker 状态检查之间的时间间隔(秒)。

较小的周期会更快检测到失败的 worker,但会占用更多资源。较大的周期在出现故障时提供更慢的反馈。

不安全本地后端

class dask_gateway_server.backends.local.UnsafeLocalBackend(**kwargs: Any)

LocalBackend 的一个版本,不设置权限。

仅用于测试!这不提供用户隔离 - 集群运行的权限级别与网关相同。

api_url c.UnsafeLocalBackend.api_url = Unicode('')

内部组件(例如 Dask 集群)联系网关时将使用的地址。

默认为 {proxy_address}/{prefix}/api,如果应使用不同地址,则手动设置。

backoff_base_delay c.UnsafeLocalBackend.backoff_base_delay = Float(0.1)

失败后重试时的退避基础延迟(秒)。

如果操作失败,将在计算出的退避时间后重试,计算方法如下

` min(backoff_max_delay, backoff_base_delay * 2 ** num_failures) `

backoff_max_delay c.UnsafeLocalBackend.backoff_max_delay = Float(300)

失败后重试时的退避策略最大延迟(秒)。

check_timeouts_period c.UnsafeLocalBackend.check_timeouts_period = Float(0.0)

超时检查之间的时间间隔(秒)。

这不应该太小(以保持开销较低),但应该小于 cluster_heartbeat_timeoutcluster_start_timeoutworker_start_timeout

cluster_config_class c.UnsafeLocalBackend.cluster_config_class = Type('dask_gateway_server.backends.local.LocalClusterConfig')

要使用的集群配置类

cluster_heartbeat_period c.UnsafeLocalBackend.cluster_heartbeat_period = Int(15)

集群到网关的心跳之间的时间间隔(秒)。

较小的周期会更快检测到失败的 worker,但会占用更多资源。较大的周期在出现故障时提供更慢的反馈。

cluster_heartbeat_timeout c.UnsafeLocalBackend.cluster_heartbeat_timeout = Float(0.0)

在杀死未能发送心跳的 Dask 集群之前的超时时间(秒)。

这应该大于 cluster_heartbeat_period。默认为 2 * cluster_heartbeat_period

cluster_options c.UnsafeLocalBackend.cluster_options = Union()

配置单个集群的用户选项。

允许用户在创建新集群时指定配置覆盖。有关更多信息,请参阅文档

公开集群选项.

cluster_start_timeout c.UnsafeLocalBackend.cluster_start_timeout = Float(60)

放弃启动 Dask 集群之前的超时时间(秒)。

cluster_status_period c.UnsafeLocalBackend.cluster_status_period = Float(30)

集群状态检查之间的时间间隔(秒)。

较小的周期会更快检测到失败的集群,但会占用更多资源。较大的周期在出现故障时提供更慢的反馈。

clusters_directory c.UnsafeLocalBackend.clusters_directory = Unicode('')

集群工作目录的根目录。

为每个新集群创建一个子目录,该子目录将作为该集群的工作目录。集群关闭时,将移除该子目录。

如果未指定,将为每个集群使用一个临时目录。

db_cleanup_period c.UnsafeLocalBackend.db_cleanup_period = Float(600)

数据库清理任务之间的时间间隔(秒)。

这设置了从数据库中移除旧记录的频率。这不应该太小(以保持开销较低),但应该小于 db_cluster_max_age(可能差一个数量级)。

db_cluster_max_age c.UnsafeLocalBackend.db_cluster_max_age = Float(86400)

保留已完成集群记录的最大时间(秒)。

每过 db_cleanup_period,将从数据库中移除早于 db_cluster_max_age 的已完成集群。

db_debug c.UnsafeLocalBackend.db_debug = Bool(False)

如果为 True,将记录所有数据库操作

db_encrypt_keys c.UnsafeLocalBackend.db_encrypt_keys = List()

用于加密数据库中私有数据的密钥列表。也可以通过环境变量 DASK_GATEWAY_ENCRYPT_KEYS 设置,其值为以 ; 分隔的加密密钥字符串。

每个密钥应该是 base64 编码的 32 字节值,并且应该是密码学安全的随机值。如果没有其他选项,可以使用 openssl 通过以下命令生成单个密钥

$ openssl rand -base64 32

单个密钥有效,可以使用多个密钥来支持密钥轮换。

db_url c.UnsafeLocalBackend.db_url = Unicode('sqlite:///:memory:')

数据库的 URL。默认值仅为内存中的数据库。

如果不在内存中,还必须设置 db_encrypt_keys

inherited_environment c.UnsafeLocalBackend.inherited_environment = List()

scheduler 和 worker 进程从 Dask-Gateway 进程继承的环境变量白名单。

parallelism c.UnsafeLocalBackend.parallelism = Int(20)

用于启动/停止集群的处理程序数量。

sigint_timeout c.UnsafeLocalBackend.sigint_timeout = Int(10)

SIGINT 后等待进程停止的秒数。

在此时间后进程仍未停止,则发送 SIGTERM。

sigkill_timeout c.UnsafeLocalBackend.sigkill_timeout = Int(5)

SIGKILL 后等待进程停止的秒数。

如果在此时间后进程仍未停止,则会记录警告,并将该进程视为僵尸进程。

sigterm_timeout c.UnsafeLocalBackend.sigterm_timeout = Int(5)

SIGTERM 后等待进程停止的秒数。

如果在此时间后进程仍未停止,则发送 SIGKILL。

stop_clusters_on_shutdown c.UnsafeLocalBackend.stop_clusters_on_shutdown = Bool(True)

是否在网关关闭时停止活动集群。

如果为 true,所有活动集群将在关闭网关前停止。设置为 False 将保留活动集群运行。

worker_start_failure_limit c.UnsafeLocalBackend.worker_start_failure_limit = Int(3)

在集群被标记为失败之前,启动 worker 的失败尝试次数限制。

每个启动失败的 worker(超时除外)都会增加计数器。如果 worker 成功启动,计数器将重置。如果计数器超出此限制,集群将被标记为失败并关闭。

worker_start_timeout c.UnsafeLocalBackend.worker_start_timeout = Float(60)

放弃启动 Dask worker 之前的超时时间(秒)。

worker_status_period c.UnsafeLocalBackend.worker_status_period = Float(30)

worker 状态检查之间的时间间隔(秒)。

较小的周期会更快检测到失败的 worker,但会占用更多资源。较大的周期在出现故障时提供更慢的反馈。

YARN

YarnClusterConfig

class dask_gateway_server.backends.yarn.YarnClusterConfig(**kwargs: Any)

在 Hadoop/YARN 上运行时 Dask 集群的配置选项

adaptive_period c.YarnClusterConfig.adaptive_period = Float(3)

自适应伸缩检查之间的时间间隔(秒)。

较小的周期会降低响应集群负载变化时的伸缩延迟,但可能会导致网关服务器负载更高。

cluster_max_cores c.YarnClusterConfig.cluster_max_cores = Float(None)

此集群可用的最大核心数。

设置为 None 表示无核心限制(默认)。

cluster_max_memory c.YarnClusterConfig.cluster_max_memory = MemoryLimit(None)

此集群可用的最大内存量(字节)。允许使用以下后缀

  • K -> Kibibytes

  • M -> Mebibytes

  • G -> Gibibytes

  • T -> Tebibytes

设置为 None 表示无内存限制(默认)。

cluster_max_workers c.YarnClusterConfig.cluster_max_workers = Int(0)

此集群可用的最大 worker 数量。

请注意,这将在运行时与 cluster_max_corescluster_max_memory 结合使用,以确定此集群实际可用的最大 worker 数量。

environment c.YarnClusterConfig.environment = Dict()

为 worker 和 scheduler 进程设置的环境变量。

idle_timeout c.YarnClusterConfig.idle_timeout = Float(0)

空闲集群自动关闭前的时间间隔(秒)。

设置为 0(默认)表示无空闲超时。

localize_files c.YarnClusterConfig.localize_files = Dict()

额外文件,将分发给 worker 和 scheduler 容器。

这是从 local-nameresource 的映射。资源路径可以是本地的,也可以在 HDFS 中(如果是,请以 hdfs://... 为前缀)。如果是归档文件(.tar.gz.zip),资源将被解压为目录 local-name。为了更精细的控制,资源也可以指定为 skein.File 对象,或其对应的 dict

这可用于通过配置以下项来分发 conda/虚拟环境

c.YarnClusterConfig.localize_files = {
    'environment': {
        'source': 'hdfs:///path/to/archived/environment.tar.gz',
        'visibility': 'public'
    }
}
c.YarnClusterConfig.scheduler_setup = 'source environment/bin/activate'
c.YarnClusterConfig.worker_setup = 'source environment/bin/activate'

这些归档文件通常使用 conda-packvenv-pack 创建。有关分发文件的更多信息,请参阅 https://jcristharif.com/skein/distributing-files.html

queue c.YarnClusterConfig.queue = Unicode('default')

在哪个 YARN 队列下提交应用程序

scheduler_cmd c.YarnClusterConfig.scheduler_cmd = Command()

启动 Dask scheduler 的 shell 命令。

scheduler_cores c.YarnClusterConfig.scheduler_cores = Int(1)

Dask scheduler 可用的 CPU 核心数。

scheduler_memory c.YarnClusterConfig.scheduler_memory = MemoryLimit('2 G')

Dask scheduler 可用的字节数。允许使用以下后缀

  • K -> Kibibytes

  • M -> Mebibytes

  • G -> Gibibytes

  • T -> Tebibytes

scheduler_setup c.YarnClusterConfig.scheduler_setup = Unicode('')

在 Dask scheduler 启动前运行的脚本。

worker_cmd c.YarnClusterConfig.worker_cmd = Command()

启动 Dask worker 的 shell 命令。

worker_cores c.YarnClusterConfig.worker_cores = Int(1)

Dask worker 可用的 CPU 核心数。

worker_memory c.YarnClusterConfig.worker_memory = MemoryLimit('2 G')

Dask worker 可用的字节数。允许使用以下后缀

  • K -> Kibibytes

  • M -> Mebibytes

  • G -> Gibibytes

  • T -> Tebibytes

worker_setup c.YarnClusterConfig.worker_setup = Unicode('')

在 Dask worker 启动前运行的脚本。

worker_threads c.YarnClusterConfig.worker_threads = Int(0)

Dask worker 可用的线程数。

默认为 worker_cores

Yarn 后端

class dask_gateway_server.backends.yarn.YarnBackend(**kwargs: Any)

用于管理 Hadoop/YARN 上 Dask 集群的集群后端。

api_url c.YarnBackend.api_url = Unicode('')

内部组件(例如 Dask 集群)联系网关时将使用的地址。

默认为 {proxy_address}/{prefix}/api,如果应使用不同地址,则手动设置。

app_client_cache_max_size c.YarnBackend.app_client_cache_max_size = Int(10)

应用程序客户端缓存的最大大小。

较大的缓存会带来更好的性能,但也会占用更多资源。

backoff_base_delay c.YarnBackend.backoff_base_delay = Float(0.1)

失败后重试时的退避基础延迟(秒)。

如果操作失败,将在计算出的退避时间后重试,计算方法如下

` min(backoff_max_delay, backoff_base_delay * 2 ** num_failures) `

backoff_max_delay c.YarnBackend.backoff_max_delay = Float(300)

失败后重试时的退避策略最大延迟(秒)。

check_timeouts_period c.YarnBackend.check_timeouts_period = Float(0.0)

超时检查之间的时间间隔(秒)。

这不应该太小(以保持开销较低),但应该小于 cluster_heartbeat_timeoutcluster_start_timeoutworker_start_timeout

cluster_config_class c.YarnBackend.cluster_config_class = Type('dask_gateway_server.backends.yarn.YarnClusterConfig')

要使用的集群配置类

cluster_heartbeat_period c.YarnBackend.cluster_heartbeat_period = Int(15)

集群到网关的心跳之间的时间间隔(秒)。

较小的周期会更快检测到失败的 worker,但会占用更多资源。较大的周期在出现故障时提供更慢的反馈。

cluster_heartbeat_timeout c.YarnBackend.cluster_heartbeat_timeout = Float(0.0)

在杀死未能发送心跳的 Dask 集群之前的超时时间(秒)。

这应该大于 cluster_heartbeat_period。默认为 2 * cluster_heartbeat_period

cluster_options c.YarnBackend.cluster_options = Union()

配置单个集群的用户选项。

允许用户在创建新集群时指定配置覆盖。有关更多信息,请参阅文档

公开集群选项.

cluster_start_timeout c.YarnBackend.cluster_start_timeout = Float(60)

放弃启动 Dask 集群之前的超时时间(秒)。

cluster_status_period c.YarnBackend.cluster_status_period = Float(30)

集群状态检查之间的时间间隔(秒)。

较小的周期会更快检测到失败的集群,但会占用更多资源。较大的周期在出现故障时提供更慢的反馈。

db_cleanup_period c.YarnBackend.db_cleanup_period = Float(600)

数据库清理任务之间的时间间隔(秒)。

这设置了从数据库中移除旧记录的频率。这不应该太小(以保持开销较低),但应该小于 db_cluster_max_age(可能差一个数量级)。

db_cluster_max_age c.YarnBackend.db_cluster_max_age = Float(86400)

保留已完成集群记录的最大时间(秒)。

每过 db_cleanup_period,将从数据库中移除早于 db_cluster_max_age 的已完成集群。

db_debug c.YarnBackend.db_debug = Bool(False)

如果为 True,将记录所有数据库操作

db_encrypt_keys c.YarnBackend.db_encrypt_keys = List()

用于加密数据库中私有数据的密钥列表。也可以通过环境变量 DASK_GATEWAY_ENCRYPT_KEYS 设置,其值为以 ; 分隔的加密密钥字符串。

每个密钥应该是 base64 编码的 32 字节值,并且应该是密码学安全的随机值。如果没有其他选项,可以使用 openssl 通过以下命令生成单个密钥

$ openssl rand -base64 32

单个密钥有效,可以使用多个密钥来支持密钥轮换。

db_url c.YarnBackend.db_url = Unicode('sqlite:///:memory:')

数据库的 URL。默认值仅为内存中的数据库。

如果不在内存中,还必须设置 db_encrypt_keys

keytab c.YarnBackend.keytab = Unicode(None)

Dask Gateway 用户 Kerberos keytab 的路径

parallelism c.YarnBackend.parallelism = Int(20)

用于启动/停止集群的处理程序数量。

principal c.YarnBackend.principal = Unicode(None)

Dask Gateway 用户的 Kerberos 主体

stop_clusters_on_shutdown c.YarnBackend.stop_clusters_on_shutdown = Bool(True)

是否在网关关闭时停止活动集群。

如果为 true,所有活动集群将在关闭网关前停止。设置为 False 将保留活动集群运行。

worker_start_failure_limit c.YarnBackend.worker_start_failure_limit = Int(3)

在集群被标记为失败之前,启动 worker 的失败尝试次数限制。

每个启动失败的 worker(超时除外)都会增加计数器。如果 worker 成功启动,计数器将重置。如果计数器超出此限制,集群将被标记为失败并关闭。

worker_start_timeout c.YarnBackend.worker_start_timeout = Float(60)

放弃启动 Dask worker 之前的超时时间(秒)。

worker_status_period c.YarnBackend.worker_status_period = Float(30)

worker 状态检查之间的时间间隔(秒)。

较小的周期会更快检测到失败的 worker,但会占用更多资源。较大的周期在出现故障时提供更慢的反馈。

Kubernetes

KubeClusterConfig

class dask_gateway_server.backends.kubernetes.KubeClusterConfig(**kwargs: Any)

在 Kubernetes 上运行单个 Dask 集群的配置

adaptive_period c.KubeClusterConfig.adaptive_period = Float(3)

自适应伸缩检查之间的时间间隔(秒)。

较小的周期会降低响应集群负载变化时的伸缩延迟,但可能会导致网关服务器负载更高。

cluster_max_cores c.KubeClusterConfig.cluster_max_cores = Float(None)

此集群可用的最大核心数。

设置为 None 表示无核心限制(默认)。

cluster_max_memory c.KubeClusterConfig.cluster_max_memory = MemoryLimit(None)

此集群可用的最大内存量(字节)。允许使用以下后缀

  • K -> Kibibytes

  • M -> Mebibytes

  • G -> Gibibytes

  • T -> Tebibytes

设置为 None 表示无内存限制(默认)。

cluster_max_workers c.KubeClusterConfig.cluster_max_workers = Int(0)

此集群可用的最大 worker 数量。

请注意,这将在运行时与 cluster_max_corescluster_max_memory 结合使用,以确定此集群实际可用的最大 worker 数量。

environment c.KubeClusterConfig.environment = Dict()

为 worker 和 scheduler 进程设置的环境变量。

idle_timeout c.KubeClusterConfig.idle_timeout = Float(0)

空闲集群自动关闭前的时间间隔(秒)。

设置为 0(默认)表示无空闲超时。

image c.KubeClusterConfig.image = Unicode('daskgateway/dask-gateway:latest')

用于运行用户容器的 Docker 镜像。

image_pull_policy c.KubeClusterConfig.image_pull_policy = Unicode(None)

image 中指定的 Docker 镜像的拉取策略

image_pull_secrets c.KubeClusterConfig.image_pull_secrets = List()

访问私有镜像仓库的镜像拉取密钥。

应该是一个字典列表,每个字典包含一个名为 name 的键,以匹配 k8s 原生语法。

namespace c.KubeClusterConfig.namespace = Unicode('default')

在哪个 Kubernetes namespace 中启动 Pod。

如果在启用了服务帐户的 Kubernetes 集群中运行,则默认为当前 namespace。如果不是,则默认为 default

scheduler_cmd c.KubeClusterConfig.scheduler_cmd = Command()

启动 Dask scheduler 的 shell 命令。

scheduler_cores c.KubeClusterConfig.scheduler_cores = Float(1)

Dask scheduler 可用的 CPU 核心数。

scheduler_cores_limit c.KubeClusterConfig.scheduler_cores_limit = Float(0.0)

Dask scheduler 可用的最大 CPU 核心数。

默认为 scheduler_cores

scheduler_extra_container_config c.KubeClusterConfig.scheduler_extra_container_config = Dict()

scheduler 容器的任何额外配置。

此字典将在提交前与 scheduler 容器(一个 V1Container 对象)进行深度合并。键应与 kubernetes 规范中的键匹配,并且应为 camelCase。

有关更多信息,请参阅 worker_extra_container_config

scheduler_extra_pod_annotations c.KubeClusterConfig.scheduler_extra_pod_annotations = Dict()

应用于用户 scheduler Pod 的任何额外注解。

这些注解可以使用集群选项(参阅公开集群选项)设置,以便注入用户特定信息,例如根据用户的组或用户名添加注解。

此字典将在应用于用户 Pod 之前与 common_annotations 合并。

scheduler_extra_pod_config c.KubeClusterConfig.scheduler_extra_pod_config = Dict()

scheduler Pod 的任何额外配置。

此字典将在提交前与 scheduler Pod 规范(一个 V1PodSpec 对象)进行深度合并。键应与 kubernetes 规范中的键匹配,并且应为 camelCase。

有关更多信息,请参阅 worker_extra_pod_config

scheduler_extra_pod_labels c.KubeClusterConfig.scheduler_extra_pod_labels = Dict()

应用于用户 scheduler Pod 的任何额外标签。

这些标签可以使用集群选项(参阅公开集群选项)设置,以便注入用户特定信息,例如根据用户的组或用户名添加标签。

此字典将在应用于用户 Pod 之前与 common_labels 合并。

scheduler_memory c.KubeClusterConfig.scheduler_memory = MemoryLimit('2 G')

Dask scheduler 可用的字节数。允许使用以下后缀

  • K -> Kibibytes

  • M -> Mebibytes

  • G -> Gibibytes

  • T -> Tebibytes

scheduler_memory_limit c.KubeClusterConfig.scheduler_memory_limit = MemoryLimit(0)

Dask scheduler 可用内存的最大字节数。允许使用以下后缀:

  • K -> Kibibytes

  • M -> Mebibytes

  • G -> Gibibytes

  • T -> Tebibytes

默认为 scheduler_memory

worker_cmd c.KubeClusterConfig.worker_cmd = Command()

启动 Dask worker 的 shell 命令。

worker_cores c.KubeClusterConfig.worker_cores = Float(1)

Dask worker 可用的 CPU 核心数。

worker_cores_limit c.KubeClusterConfig.worker_cores_limit = Float(0.0)

Dask worker 可用 CPU 核心数上限。

默认为 worker_cores

worker_extra_container_config c.KubeClusterConfig.worker_extra_container_config = Dict()

worker 容器的任何额外配置。

在提交之前,此字典将与 worker 容器(一个 V1Container 对象)进行深度合并。键应与 kubernetes 规范中的键匹配,并且应使用 camelCase 命名法。

例如,这里我们将 secret 中的环境变量添加到 worker 容器:

c.KubeClusterConfig.worker_extra_container_config = {
    "envFrom": [
        {"secretRef": {"name": "my-env-secret"}}
    ]
}
worker_extra_pod_annotations c.KubeClusterConfig.worker_extra_pod_annotations = Dict()

应用于用户 worker Pod 的任何额外注解 (annotations)。

这些注解可以使用集群选项(参阅公开集群选项)设置,以便注入用户特定信息,例如根据用户的组或用户名添加注解。

此字典将在应用于用户 Pod 之前与 common_annotations 合并。

worker_extra_pod_config c.KubeClusterConfig.worker_extra_pod_config = Dict()

worker Pod 的任何额外配置。

在提交之前,此字典将与 worker Pod 规范(一个 V1PodSpec 对象)进行深度合并。键应与 kubernetes 规范中的键匹配,并且应使用 camelCase 命名法。

例如,这里我们为 worker Pod 添加一个容忍度 (toleration)。

c.KubeClusterConfig.worker_extra_pod_config = {
    "tolerations": [
        {
            "key": "key",
            "operator": "Equal",
            "value": "value",
            "effect": "NoSchedule",
        }
    ]
}
worker_extra_pod_labels c.KubeClusterConfig.worker_extra_pod_labels = Dict()

应用于用户 worker Pod 的任何额外标签 (labels)。

这些标签可以使用集群选项(参阅公开集群选项)设置,以便注入用户特定信息,例如根据用户的组或用户名添加标签。

此字典将在应用于用户 Pod 之前与 common_labels 合并。

worker_memory c.KubeClusterConfig.worker_memory = MemoryLimit('2 G')

Dask worker 可用的字节数。允许使用以下后缀

  • K -> Kibibytes

  • M -> Mebibytes

  • G -> Gibibytes

  • T -> Tebibytes

worker_memory_limit c.KubeClusterConfig.worker_memory_limit = MemoryLimit(0)

Dask worker 可用内存的最大字节数。允许使用以下后缀:

  • K -> Kibibytes

  • M -> Mebibytes

  • G -> Gibibytes

  • T -> Tebibytes

默认为 worker_memory

worker_threads c.KubeClusterConfig.worker_threads = Int(0)

Dask worker 可用的线程数。

默认为 worker_cores

KubeBackend

class dask_gateway_server.backends.kubernetes.KubeBackend(**kwargs: Any)

用于在 Kubernetes 上运行 Dask Gateway 的后端。

api_url c.KubeBackend.api_url = Unicode('')

内部组件(例如 Dask 集群)联系网关时将使用的地址。

cluster_config_class c.KubeBackend.cluster_config_class = Type('dask_gateway_server.backends.kubernetes.backend.KubeClusterConfig')

要使用的集群配置类

cluster_options c.KubeBackend.cluster_options = Union()

配置单个集群的用户选项。

允许用户在创建新集群时指定配置覆盖。有关更多信息,请参阅文档

公开集群选项.

common_annotations c.KubeBackend.common_annotations = Dict()

应用于 gateway 创建的所有对象的 Kubernetes 注解 (annotations)。

common_labels c.KubeBackend.common_labels = Dict()

应用于 gateway 创建的所有对象的 Kubernetes 标签 (labels)。

crd_version c.KubeBackend.crd_version = Unicode('v1alpha1')

DaskCluster CRD 的版本。

gateway_instance c.KubeBackend.gateway_instance = Unicode('')

此 dask-gateway 实例的唯一 ID。

控制器也必须配置相同的 ID。

label_selector c.KubeBackend.label_selector = Unicode('')

监视由 gateway 管理的对象时使用的标签选择器 (label selector)。

KubeController

class dask_gateway_server.backends.kubernetes.controller.KubeController(**kwargs: Any)

Dask Gateway 的 Kubernetes 控制器。

address c.KubeController.address = Unicode(':8000')

服务器应监听的地址。

api_url c.KubeController.api_url = Unicode('')

内部组件(例如 Dask 集群)联系网关时将使用的地址。

backoff_base_delay c.KubeController.backoff_base_delay = Float(0.1)

失败后重试时的退避基础延迟(秒)。

如果操作失败,将在计算出的退避时间后重试,计算方法如下

` min(backoff_max_delay, backoff_base_delay * 2 ** num_failures) `

backoff_max_delay c.KubeController.backoff_max_delay = Float(300)

失败后重试时的退避策略最大延迟(秒)。

common_annotations c.KubeController.common_annotations = Dict()

应用于 gateway 创建的所有对象的 Kubernetes 注解 (annotations)。

common_labels c.KubeController.common_labels = Dict()

应用于 gateway 创建的所有对象的 Kubernetes 标签 (labels)。

completed_cluster_cleanup_period c.KubeController.completed_cluster_cleanup_period = Float(600)

清理任务之间的间隔时间(秒)。

这设置了从 kubernetes 删除旧集群记录的频率。这个值不应该太小(以保持低开销),但应该小于 completed_cluster_max_age(可能小一个数量级)。

completed_cluster_max_age c.KubeController.completed_cluster_max_age = Float(86400)

保留已完成集群记录的最大时间(秒)。

每隔 completed_cluster_cleanup_period 秒,将从 kubernetes 中删除超过 completed_cluster_max_age 时间的已完成集群。

config_file c.KubeController.config_file = Unicode('dask_gateway_config.py')

要加载的配置文件

crd_version c.KubeController.crd_version = Unicode('v1alpha1')

DaskCluster CRD 的版本。

gateway_instance c.KubeController.gateway_instance = Unicode('')

此 dask-gateway 实例的唯一 ID。

控制器也必须配置相同的 ID。

k8s_api_rate_limit c.KubeController.k8s_api_rate_limit = Int(50)

每秒平均 Kubernetes API 调用次数的限制。

k8s_api_rate_limit_burst c.KubeController.k8s_api_rate_limit_burst = Int(100)

每秒最大 Kubernetes API 调用次数的限制。

label_selector c.KubeController.label_selector = Unicode('')

监视由 gateway 管理的对象时使用的标签选择器 (label selector)。

log_datefmt c.KubeController.log_datefmt = Unicode('%Y-%m-%d %H:%M:%S')

日志格式化程序用于 %(asctime)s 的日期格式

log_format c.KubeController.log_format = Unicode('%(log_color)s[%(levelname)1.1s %(asctime)s.%(msecs).03d %(name)s]%(reset)s %(message)s')

日志格式模板

log_level c.KubeController.log_level = Enum('INFO')

按值或名称设置日志级别。

parallelism c.KubeController.parallelism = Int(20)

用于协调 (reconciling) Kubernetes 对象的处理器 (handler) 数量。

proxy_prefix c.KubeController.proxy_prefix = Unicode('')

HTTP/HTTPS 代理应提供服务的路径前缀。

此前缀将添加到所有注册到代理的路由之前。

proxy_tcp_entrypoint c.KubeController.proxy_tcp_entrypoint = Unicode('tcp')

创建 ingressroutetcps 时使用的 Traefik 入口点 (entrypoint) 名称。

proxy_web_entrypoint c.KubeController.proxy_web_entrypoint = Unicode('web')

创建 ingressroutes 时使用的 Traefik 入口点 (entrypoint) 名称。

proxy_web_middlewares c.KubeController.proxy_web_middlewares = List()

应用于添加到代理的 web 路由的中间件 (middlewares) 列表。

show_config c.KubeController.show_config = Bool(False)

将配置转储到标准输出,而不是启动应用程序

show_config_json c.KubeController.show_config_json = Bool(False)

将配置转储到标准输出(JSON 格式),而不是启动应用程序

作业队列 (Job Queues)

PBSClusterConfig

class dask_gateway_server.backends.jobqueue.pbs.PBSClusterConfig(**kwargs: Any)

在 PBS 上运行时 Dask 集群的配置选项。

account c.PBSClusterConfig.account = Unicode('')

与每个作业关联的记账字符串 (accounting string)。

adaptive_period c.PBSClusterConfig.adaptive_period = Float(3)

自适应伸缩检查之间的时间间隔(秒)。

较小的周期会降低响应集群负载变化时的伸缩延迟,但可能会导致网关服务器负载更高。

cluster_max_cores c.PBSClusterConfig.cluster_max_cores = Float(None)

此集群可用的最大核心数。

设置为 None 表示无核心限制(默认)。

cluster_max_memory c.PBSClusterConfig.cluster_max_memory = MemoryLimit(None)

此集群可用的最大内存量(字节)。允许使用以下后缀

  • K -> Kibibytes

  • M -> Mebibytes

  • G -> Gibibytes

  • T -> Tebibytes

设置为 None 表示无内存限制(默认)。

cluster_max_workers c.PBSClusterConfig.cluster_max_workers = Int(0)

此集群可用的最大 worker 数量。

请注意,这将在运行时与 cluster_max_corescluster_max_memory 结合使用,以确定此集群实际可用的最大 worker 数量。

environment c.PBSClusterConfig.environment = Dict()

为 worker 和 scheduler 进程设置的环境变量。

idle_timeout c.PBSClusterConfig.idle_timeout = Float(0)

空闲集群自动关闭前的时间间隔(秒)。

设置为 0(默认)表示无空闲超时。

project c.PBSClusterConfig.project = Unicode('')

与每个作业关联的项目。

queue c.PBSClusterConfig.queue = Unicode('')

提交作业的队列。

scheduler_cmd c.PBSClusterConfig.scheduler_cmd = Command()

启动 Dask scheduler 的 shell 命令。

scheduler_cores c.PBSClusterConfig.scheduler_cores = Int(1)

Dask scheduler 可用的 CPU 核心数。

scheduler_memory c.PBSClusterConfig.scheduler_memory = MemoryLimit('2 G')

Dask scheduler 可用的字节数。允许使用以下后缀

  • K -> Kibibytes

  • M -> Mebibytes

  • G -> Gibibytes

  • T -> Tebibytes

scheduler_resource_list c.PBSClusterConfig.scheduler_resource_list = Unicode('select=1:ncpus={cores}:mem={memory}')

Scheduler 要使用的资源列表。

这是一个模板,接收以下字段:

  • cores

  • memory

scheduler_setup c.PBSClusterConfig.scheduler_setup = Unicode('')

在 Dask scheduler 启动前运行的脚本。

staging_directory c.PBSClusterConfig.staging_directory = Unicode('{home}/.dask-gateway/')

作业开始前用于存储文件的暂存目录 (staging directory)。

每个新集群都会创建一个子目录,用于存储该集群的临时文件。集群关闭时,该子目录将被移除。

此字段可以是模板,接收以下字段:

  • home(用户的主目录)

  • username(用户名)

use_stagein c.PBSClusterConfig.use_stagein = Bool(True)

如果为 true,上面创建的暂存目录将在运行时使用 -Wstagein 指令复制到作业工作目录中。

如果暂存目录位于网络文件系统上,您可以将其设置为 False,并依靠网络文件系统进行访问。

worker_cmd c.PBSClusterConfig.worker_cmd = Command()

启动 Dask worker 的 shell 命令。

worker_cores c.PBSClusterConfig.worker_cores = Int(1)

Dask worker 可用的 CPU 核心数。

worker_memory c.PBSClusterConfig.worker_memory = MemoryLimit('2 G')

Dask worker 可用的字节数。允许使用以下后缀

  • K -> Kibibytes

  • M -> Mebibytes

  • G -> Gibibytes

  • T -> Tebibytes

worker_resource_list c.PBSClusterConfig.worker_resource_list = Unicode('select=1:ncpus={cores}:mem={memory}')

Workers 要使用的资源列表。

这是一个模板,接收以下字段:

  • cores

  • memory

worker_setup c.PBSClusterConfig.worker_setup = Unicode('')

在 Dask worker 启动前运行的脚本。

worker_threads c.PBSClusterConfig.worker_threads = Int(0)

Dask worker 可用的线程数。

默认为 worker_cores

PBSBackend

class dask_gateway_server.backends.jobqueue.pbs.PBSBackend(**kwargs: Any)

用于在 PBS 集群上部署 Dask 的后端。

api_url c.PBSBackend.api_url = Unicode('')

内部组件(例如 Dask 集群)联系网关时将使用的地址。

默认为 {proxy_address}/{prefix}/api,如果应使用不同地址,则手动设置。

backoff_base_delay c.PBSBackend.backoff_base_delay = Float(0.1)

失败后重试时的退避基础延迟(秒)。

如果操作失败,将在计算出的退避时间后重试,计算方法如下

` min(backoff_max_delay, backoff_base_delay * 2 ** num_failures) `

backoff_max_delay c.PBSBackend.backoff_max_delay = Float(300)

失败后重试时的退避策略最大延迟(秒)。

cancel_command c.PBSBackend.cancel_command = Unicode('')

作业取消命令的路径。

check_timeouts_period c.PBSBackend.check_timeouts_period = Float(0.0)

超时检查之间的时间间隔(秒)。

这不应该太小(以保持开销较低),但应该小于 cluster_heartbeat_timeoutcluster_start_timeoutworker_start_timeout

cluster_config_class c.PBSBackend.cluster_config_class = Type('dask_gateway_server.backends.jobqueue.pbs.PBSClusterConfig')

要使用的集群配置类

cluster_heartbeat_period c.PBSBackend.cluster_heartbeat_period = Int(15)

集群到网关的心跳之间的时间间隔(秒)。

较小的周期会更快检测到失败的 worker,但会占用更多资源。较大的周期在出现故障时提供更慢的反馈。

cluster_heartbeat_timeout c.PBSBackend.cluster_heartbeat_timeout = Float(0.0)

在杀死未能发送心跳的 Dask 集群之前的超时时间(秒)。

这应该大于 cluster_heartbeat_period。默认为 2 * cluster_heartbeat_period

cluster_options c.PBSBackend.cluster_options = Union()

配置单个集群的用户选项。

允许用户在创建新集群时指定配置覆盖。有关更多信息,请参阅文档

公开集群选项.

cluster_start_timeout c.PBSBackend.cluster_start_timeout = Float(60)

放弃启动 Dask 集群之前的超时时间(秒)。

cluster_status_period c.PBSBackend.cluster_status_period = Float(30)

集群状态检查之间的时间间隔(秒)。

较小的周期会更快检测到失败的集群,但会占用更多资源。较大的周期在出现故障时提供更慢的反馈。

dask_gateway_jobqueue_launcher c.PBSBackend.dask_gateway_jobqueue_launcher = Unicode('')

`dask-gateway-jobqueue-launcher` 可执行文件的路径。

db_cleanup_period c.PBSBackend.db_cleanup_period = Float(600)

数据库清理任务之间的时间间隔(秒)。

这设置了从数据库中移除旧记录的频率。这不应该太小(以保持开销较低),但应该小于 db_cluster_max_age(可能差一个数量级)。

db_cluster_max_age c.PBSBackend.db_cluster_max_age = Float(86400)

保留已完成集群记录的最大时间(秒)。

每过 db_cleanup_period,将从数据库中移除早于 db_cluster_max_age 的已完成集群。

db_debug c.PBSBackend.db_debug = Bool(False)

如果为 True,将记录所有数据库操作

db_encrypt_keys c.PBSBackend.db_encrypt_keys = List()

用于加密数据库中私有数据的密钥列表。也可以通过环境变量 DASK_GATEWAY_ENCRYPT_KEYS 设置,其值为以 ; 分隔的加密密钥字符串。

每个密钥应该是 base64 编码的 32 字节值,并且应该是密码学安全的随机值。如果没有其他选项,可以使用 openssl 通过以下命令生成单个密钥

$ openssl rand -base64 32

单个密钥有效,可以使用多个密钥来支持密钥轮换。

db_url c.PBSBackend.db_url = Unicode('sqlite:///:memory:')

数据库的 URL。默认值仅为内存中的数据库。

如果不在内存中,还必须设置 db_encrypt_keys

gateway_hostname c.PBSBackend.gateway_hostname = Unicode('')

运行 gateway 的节点的主机名。用于在 PBS 指令中引用本地主机。

parallelism c.PBSBackend.parallelism = Int(20)

用于启动/停止集群的处理程序数量。

status_command c.PBSBackend.status_command = Unicode('')

作业状态命令的路径。

stop_clusters_on_shutdown c.PBSBackend.stop_clusters_on_shutdown = Bool(True)

是否在网关关闭时停止活动集群。

如果为 true,所有活动集群将在关闭网关前停止。设置为 False 将保留活动集群运行。

submit_command c.PBSBackend.submit_command = Unicode('')

作业提交命令的路径。

worker_start_failure_limit c.PBSBackend.worker_start_failure_limit = Int(3)

在集群被标记为失败之前,启动 worker 的失败尝试次数限制。

每个启动失败的 worker(超时除外)都会增加计数器。如果 worker 成功启动,计数器将重置。如果计数器超出此限制,集群将被标记为失败并关闭。

worker_start_timeout c.PBSBackend.worker_start_timeout = Float(60)

放弃启动 Dask worker 之前的超时时间(秒)。

worker_status_period c.PBSBackend.worker_status_period = Float(30)

worker 状态检查之间的时间间隔(秒)。

较小的周期会更快检测到失败的 worker,但会占用更多资源。较大的周期在出现故障时提供更慢的反馈。

SlurmClusterConfig

class dask_gateway_server.backends.jobqueue.slurm.SlurmClusterConfig(**kwargs: Any)

在 SLURM 上运行时 Dask 集群的配置选项。

account c.SlurmClusterConfig.account = Unicode('')

与每个作业关联的账号字符串 (Account string)。

adaptive_period c.SlurmClusterConfig.adaptive_period = Float(3)

自适应伸缩检查之间的时间间隔(秒)。

较小的周期会降低响应集群负载变化时的伸缩延迟,但可能会导致网关服务器负载更高。

cluster_max_cores c.SlurmClusterConfig.cluster_max_cores = Float(None)

此集群可用的最大核心数。

设置为 None 表示无核心限制(默认)。

cluster_max_memory c.SlurmClusterConfig.cluster_max_memory = MemoryLimit(None)

此集群可用的最大内存量(字节)。允许使用以下后缀

  • K -> Kibibytes

  • M -> Mebibytes

  • G -> Gibibytes

  • T -> Tebibytes

设置为 None 表示无内存限制(默认)。

cluster_max_workers c.SlurmClusterConfig.cluster_max_workers = Int(0)

此集群可用的最大 worker 数量。

请注意,这将在运行时与 cluster_max_corescluster_max_memory 结合使用,以确定此集群实际可用的最大 worker 数量。

environment c.SlurmClusterConfig.environment = Dict()

为 worker 和 scheduler 进程设置的环境变量。

idle_timeout c.SlurmClusterConfig.idle_timeout = Float(0)

空闲集群自动关闭前的时间间隔(秒)。

设置为 0(默认)表示无空闲超时。

partition c.SlurmClusterConfig.partition = Unicode('')

提交作业的分区。

qos c.SlurmClusterConfig.qos = Unicode('')

与每个作业关联的 QOS 字符串。

scheduler_cmd c.SlurmClusterConfig.scheduler_cmd = Command()

启动 Dask scheduler 的 shell 命令。

scheduler_cores c.SlurmClusterConfig.scheduler_cores = Int(1)

Dask scheduler 可用的 CPU 核心数。

scheduler_memory c.SlurmClusterConfig.scheduler_memory = MemoryLimit('2 G')

Dask scheduler 可用的字节数。允许使用以下后缀

  • K -> Kibibytes

  • M -> Mebibytes

  • G -> Gibibytes

  • T -> Tebibytes

scheduler_setup c.SlurmClusterConfig.scheduler_setup = Unicode('')

在 Dask scheduler 启动前运行的脚本。

staging_directory c.SlurmClusterConfig.staging_directory = Unicode('{home}/.dask-gateway/')

作业开始前用于存储文件的暂存目录 (staging directory)。

每个新集群都会创建一个子目录,用于存储该集群的临时文件。集群关闭时,该子目录将被移除。

此字段可以是模板,接收以下字段:

  • home(用户的主目录)

  • username(用户名)

worker_cmd c.SlurmClusterConfig.worker_cmd = Command()

启动 Dask worker 的 shell 命令。

worker_cores c.SlurmClusterConfig.worker_cores = Int(1)

Dask worker 可用的 CPU 核心数。

worker_memory c.SlurmClusterConfig.worker_memory = MemoryLimit('2 G')

Dask worker 可用的字节数。允许使用以下后缀

  • K -> Kibibytes

  • M -> Mebibytes

  • G -> Gibibytes

  • T -> Tebibytes

worker_setup c.SlurmClusterConfig.worker_setup = Unicode('')

在 Dask worker 启动前运行的脚本。

worker_threads c.SlurmClusterConfig.worker_threads = Int(0)

Dask worker 可用的线程数。

默认为 worker_cores

SlurmBackend

class dask_gateway_server.backends.jobqueue.slurm.SlurmBackend(**kwargs: Any)

用于在 Slurm 集群上部署 Dask 的后端。

api_url c.SlurmBackend.api_url = Unicode('')

内部组件(例如 Dask 集群)联系网关时将使用的地址。

默认为 {proxy_address}/{prefix}/api,如果应使用不同地址,则手动设置。

backoff_base_delay c.SlurmBackend.backoff_base_delay = Float(0.1)

失败后重试时的退避基础延迟(秒)。

如果操作失败,将在计算出的退避时间后重试,计算方法如下

` min(backoff_max_delay, backoff_base_delay * 2 ** num_failures) `

backoff_max_delay c.SlurmBackend.backoff_max_delay = Float(300)

失败后重试时的退避策略最大延迟(秒)。

cancel_command c.SlurmBackend.cancel_command = Unicode('')

作业取消命令的路径。

check_timeouts_period c.SlurmBackend.check_timeouts_period = Float(0.0)

超时检查之间的时间间隔(秒)。

这不应该太小(以保持开销较低),但应该小于 cluster_heartbeat_timeoutcluster_start_timeoutworker_start_timeout

cluster_config_class c.SlurmBackend.cluster_config_class = Type('dask_gateway_server.backends.jobqueue.slurm.SlurmClusterConfig')

要使用的集群配置类

cluster_heartbeat_period c.SlurmBackend.cluster_heartbeat_period = Int(15)

集群到网关的心跳之间的时间间隔(秒)。

较小的周期会更快检测到失败的 worker,但会占用更多资源。较大的周期在出现故障时提供更慢的反馈。

cluster_heartbeat_timeout c.SlurmBackend.cluster_heartbeat_timeout = Float(0.0)

在杀死未能发送心跳的 Dask 集群之前的超时时间(秒)。

这应该大于 cluster_heartbeat_period。默认为 2 * cluster_heartbeat_period

cluster_options c.SlurmBackend.cluster_options = Union()

配置单个集群的用户选项。

允许用户在创建新集群时指定配置覆盖。有关更多信息,请参阅文档

公开集群选项.

cluster_start_timeout c.SlurmBackend.cluster_start_timeout = Float(60)

放弃启动 Dask 集群之前的超时时间(秒)。

cluster_status_period c.SlurmBackend.cluster_status_period = Float(30)

集群状态检查之间的时间间隔(秒)。

较小的周期会更快检测到失败的集群,但会占用更多资源。较大的周期在出现故障时提供更慢的反馈。

dask_gateway_jobqueue_launcher c.SlurmBackend.dask_gateway_jobqueue_launcher = Unicode('')

`dask-gateway-jobqueue-launcher` 可执行文件的路径。

db_cleanup_period c.SlurmBackend.db_cleanup_period = Float(600)

数据库清理任务之间的时间间隔(秒)。

这设置了从数据库中移除旧记录的频率。这不应该太小(以保持开销较低),但应该小于 db_cluster_max_age(可能差一个数量级)。

db_cluster_max_age c.SlurmBackend.db_cluster_max_age = Float(86400)

保留已完成集群记录的最大时间(秒)。

每过 db_cleanup_period,将从数据库中移除早于 db_cluster_max_age 的已完成集群。

db_debug c.SlurmBackend.db_debug = Bool(False)

如果为 True,将记录所有数据库操作

db_encrypt_keys c.SlurmBackend.db_encrypt_keys = List()

用于加密数据库中私有数据的密钥列表。也可以通过环境变量 DASK_GATEWAY_ENCRYPT_KEYS 设置,其值为以 ; 分隔的加密密钥字符串。

每个密钥应该是 base64 编码的 32 字节值,并且应该是密码学安全的随机值。如果没有其他选项,可以使用 openssl 通过以下命令生成单个密钥

$ openssl rand -base64 32

单个密钥有效,可以使用多个密钥来支持密钥轮换。

db_url c.SlurmBackend.db_url = Unicode('sqlite:///:memory:')

数据库的 URL。默认值仅为内存中的数据库。

如果不在内存中,还必须设置 db_encrypt_keys

parallelism c.SlurmBackend.parallelism = Int(20)

用于启动/停止集群的处理程序数量。

status_command c.SlurmBackend.status_command = Unicode('')

作业状态命令的路径。

stop_clusters_on_shutdown c.SlurmBackend.stop_clusters_on_shutdown = Bool(True)

是否在网关关闭时停止活动集群。

如果为 true,所有活动集群将在关闭网关前停止。设置为 False 将保留活动集群运行。

submit_command c.SlurmBackend.submit_command = Unicode('')

作业提交命令的路径。

worker_start_failure_limit c.SlurmBackend.worker_start_failure_limit = Int(3)

在集群被标记为失败之前,启动 worker 的失败尝试次数限制。

每个启动失败的 worker(超时除外)都会增加计数器。如果 worker 成功启动,计数器将重置。如果计数器超出此限制,集群将被标记为失败并关闭。

worker_start_timeout c.SlurmBackend.worker_start_timeout = Float(60)

放弃启动 Dask worker 之前的超时时间(秒)。

worker_status_period c.SlurmBackend.worker_status_period = Float(30)

worker 状态检查之间的时间间隔(秒)。

较小的周期会更快检测到失败的 worker,但会占用更多资源。较大的周期在出现故障时提供更慢的反馈。

代理 (Proxy)

代理 (Proxy)

class dask_gateway_server.proxy.Proxy(**kwargs: Any)

dask-gateway-server 代理。

address c.Proxy.address = Unicode(':8000')

HTTP/HTTPS 代理应监听的地址。

格式应为 {hostname}:{port}

其中

  • hostname 设置要侦听的主机名。设置为 """0.0.0.0" 以侦听所有接口。

  • port 设置要侦听的端口。

api_token c.Proxy.api_token = Unicode('')

代理 API 令牌。

一个 32 字节的十六进制编码随机字符串。通常使用 openssl CLI 创建。

$ openssl rand -hex 32

默认情况下从 DASK_GATEWAY_PROXY_TOKEN 环境变量加载。

externally_managed c.Proxy.externally_managed = Bool(False)

代理进程是否由外部管理。

如果为 False(默认),代理进程将由 gateway 进程启动和停止。如果代理将通过某个外部管理器(例如 supervisord)启动,请设置为 True。

gateway_url c.Proxy.gateway_url = Unicode('')

代理连接到 gateway 服务器应使用的基本 URL。

log_level c.Proxy.log_level = CaselessStrEnum('warn')

代理日志级别。

max_events c.Proxy.max_events = Int(100)

保留的最大事件数(代理更改)。

落后于此数量的代理服务器将需要进行完全刷新。

prefix c.Proxy.prefix = Unicode('')

HTTP/HTTPS 代理应提供服务的路径前缀。

此前缀将添加到所有注册到代理的路由之前。

proxy_status_period c.Proxy.proxy_status_period = Float(30)

代理状态检查之间的间隔时间(秒)。

仅当 externally_managed 为 False 时适用。

tcp_address c.Proxy.tcp_address = Unicode('')

TCP (scheduler) 代理应监听的地址。

应该采用 {hostname}:{port} 的形式

其中

  • hostname 设置要侦听的主机名。设置为 """0.0.0.0" 以侦听所有接口。

  • port 设置要侦听的端口。

如果未指定,将默认为 address

tls_cert c.Proxy.tls_cert = Unicode('')

HTTP 代理公共 URL 的 TLS 证书文件路径。

设置此项时,您也应该设置 tls_key

tls_key c.Proxy.tls_key = Unicode('')

HTTP 代理公共 URL 的 TLS 密钥文件路径。

设置此项时,您还应该设置 tls_cert。

集群管理器选项 (Cluster Manager Options)

class dask_gateway_server.options.Options(*fields, handler=None)

暴露的集群选项的声明性规范。

参数 (Parameters)
  • *fields (Field) – 零个或多个可配置字段。

  • handler (可调用对象, 可选) – 一个签名形如 handler(options)handler(options, user) 的可调用对象,其中 options 是经过验证的用户选项字典,user 是该用户的 User 模型。应返回一个配置覆盖字典,以便转发给集群管理器。如果未提供,默认行为是返回未更改的 options。

示例 (Example)

这里我们暴露选项供用户配置 c.Backend.worker_coresc.Backend.worker_memory。我们对每个资源设置了界限,以防止用户请求过大的 worker。handler 用于将用户指定的 GiB 内存转换为字节(正如 c.Backend.worker_memory 所期望的)。

from dask_gateway_server.options import Options, Integer, Float

def options_handler(options):
    return {
        "worker_cores": options.worker_cores,
        "worker_memory": int(options.worker_memory * 2 ** 30)
    }

c.Backend.DaskGateway.cluster_options = Options(
    Integer("worker_cores", default=1, min=1, max=4, label="Worker Cores"),
    Float("worker_memory", default=1, min=1, max=8, label="Worker Memory (GiB)"),
    handler=options_handler,
)
class dask_gateway_server.options.Integer(field, default=0, min=None, max=None, label=None, target=None)

一个整数字段,带有可选界限。

参数 (Parameters)
  • field (str) – 要使用的字段名称。必须是有效的 Python 变量名。这将是用户以编程方式设置此字段时使用的关键字(例如 "worker_cores")。

  • default (int, 可选) – 默认值。默认为 0。

  • min (int, 可选) – 最小有效值(包含)。如果未设置则无界限。

  • max (int, 可选) – 最大有效值(包含)。如果未设置则无界限。

  • label (str, 可选) – 一个人类可读的标签,将用于 GUI 表示(例如 "Worker Cores")。如果未提供,将使用 field

  • target (str, 可选) – 在处理后的选项字典中要设置的目标参数。必须是有效的 Python 变量名。如果未提供,将使用 field

class dask_gateway_server.options.Float(field, default=0, min=None, max=None, label=None, target=None)

一个浮点数字段,带有可选界限。

参数 (Parameters)
  • field (str) – 要使用的字段名称。必须是有效的 Python 变量名。这将是用户以编程方式设置此字段时使用的关键字(例如 "worker_cores")。

  • default (float, 可选) – 默认值。默认为 0。

  • min (float, 可选) – 最小有效值(包含)。如果未设置则无界限。

  • max (float, 可选) – 最大有效值(包含)。如果未设置则无界限。

  • label (str, 可选) – 一个人类可读的标签,将用于 GUI 表示(例如 "Worker Cores")。如果未提供,将使用 field

  • target (str, 可选) – 在处理后的选项字典中要设置的目标参数。必须是有效的 Python 变量名。如果未提供,将使用 field

class dask_gateway_server.options.String(field, default='', label=None, target=None)

一个字符串字段。

参数 (Parameters)
  • field (str) – 要使用的字段名称。必须是有效的 Python 变量名。这将是用户以编程方式设置此字段时使用的关键字(例如 "worker_cores")。

  • default (str, optional) – 默认值。默认为空字符串 ("")。

  • label (str, 可选) – 一个人类可读的标签,将用于 GUI 表示(例如 "Worker Cores")。如果未提供,将使用 field

  • target (str, 可选) – 在处理后的选项字典中要设置的目标参数。必须是有效的 Python 变量名。如果未提供,将使用 field

class dask_gateway_server.options.Bool(field, default=False, label=None, target=None)

一个布尔字段。

参数 (Parameters)
  • field (str) – 要使用的字段名称。必须是有效的 Python 变量名。这将是用户以编程方式设置此字段时使用的关键字(例如 "worker_cores")。

  • default (bool, optional) – 默认值。默认为 False。

  • label (str, 可选) – 一个人类可读的标签,将用于 GUI 表示(例如 "Worker Cores")。如果未提供,将使用 field

  • target (str, 可选) – 在处理后的选项字典中要设置的目标参数。必须是有效的 Python 变量名。如果未提供,将使用 field

class dask_gateway_server.options.Select(field, options, default=None, label=None, target=None)

一个选择字段,允许用户从几个选项中选择。

参数 (Parameters)
  • field (str) – 要使用的字段名称。必须是有效的 Python 变量名。这将是用户以编程方式设置此字段时使用的关键字(例如 "worker_cores")。

  • options (list) – 有效选项的列表。元素可以是 (key, value) 元组 ((key, value)),或者只是 key (这种情况下 value 与 key 相同)。Value 可以是任何 Python 对象,key 必须是字符串。

  • default (str, optional) – 默认选项的 key。默认为列表中第一个选项。

  • label (str, 可选) – 一个人类可读的标签,将用于 GUI 表示(例如 "Worker Cores")。如果未提供,将使用 field

  • target (str, 可选) – 在处理后的选项字典中要设置的目标参数。必须是有效的 Python 变量名。如果未提供,将使用 field

class dask_gateway_server.options.Mapping(field, default=None, label=None, target=None)

一个映射字段。

参数 (Parameters)
  • field (str) – 要使用的字段名称。必须是有效的 Python 变量名。这将是用户以编程方式设置此字段时使用的关键字(例如 "worker_cores")。

  • default (dict, optional) – 默认值。默认为空字典 ({})。

  • label (str, 可选) – 一个人类可读的标签,将用于 GUI 表示(例如 "Worker Cores")。如果未提供,将使用 field

  • target (str, 可选) – 在处理后的选项字典中要设置的目标参数。必须是有效的 Python 变量名。如果未提供,将使用 field

模型

用户

class dask_gateway_server.models.User(name, groups=(), admin=False)

一个用户记录。

参数 (Parameters)
  • name (str) – 用户名

  • groups (sequence, optional) – 用户所属的组集合。默认为不属于任何组。

  • admin (bool, optional) – 用户是否是管理员用户。默认为 False。

集群

class dask_gateway_server.models.Cluster(name, username, token, options, config, status, scheduler_address='', dashboard_address='', api_address='', tls_cert=b'', tls_key=b'', start_time=None, stop_time=None)

一个集群记录。

参数 (Parameters)
  • name (str) – 集群名称。一个可用于在网关中标识集群的唯一字符串。

  • username (str) – 启动此集群的用户名。

  • token (str) – 与此集群关联的 API token。用于对集群与网关进行认证。

  • options (dict) – 启动此集群时提供的规范化配置选项集合。这些值面向用户,不一定与后端上的 ClusterConfig 选项相对应。

  • config (dict) – 此集群的 ClusterConfig 的序列化版本。

  • status (ClusterStatus) – 集群的状态。

  • scheduler_address (str) – 调度器地址。如果集群未运行,则为空字符串。

  • dashboard_address (str) – 仪表盘地址。如果集群未运行,或集群上没有运行仪表盘,则为空字符串。

  • api_address (str) – 集群的 API 地址。如果集群未运行,则为空字符串。

  • tls_cert (bytes) – 与集群关联的 TLS 证书凭据。

  • tls_key (bytes) – 与集群关联的 TLS 密钥凭据。

  • start_time (int or None) – 自 epoch 以来的启动时间(毫秒),如果未启动则为 None。

  • stop_time (int or None) – 自 epoch 以来的停止时间(毫秒),如果未停止则为 None。