配置参考
目录
配置参考¶
网关服务器¶
- class dask_gateway_server.app.DaskGateway(**kwargs: Any)¶
用于管理跨多个用户的 Dask 集群的网关
- address c.DaskGateway.address = Unicode('')¶
私有 API 服务器应该侦听的地址。
应该采用
{hostname}:{port}
的形式其中
hostname
设置要侦听的主机名。设置为""
或"0.0.0.0"
以侦听所有接口。port
设置要侦听的端口。
默认为
127.0.0.1:0
。
- authenticator_class c.DaskGateway.authenticator_class = Type('dask_gateway_server.auth.SimpleAuthenticator')¶
要使用的网关认证器类
- backend_class c.DaskGateway.backend_class = Type('dask_gateway_server.backends.Backend')¶
要使用的网关后端类
- config_file c.DaskGateway.config_file = Unicode('dask_gateway_config.py')¶
要加载的配置文件
- log_datefmt c.DaskGateway.log_datefmt = Unicode('%Y-%m-%d %H:%M:%S')¶
日志格式化程序用于 %(asctime)s 的日期格式
- log_format c.DaskGateway.log_format = Unicode('%(log_color)s[%(levelname)1.1s %(asctime)s.%(msecs).03d %(name)s]%(reset)s %(message)s')¶
日志格式模板
- log_level c.DaskGateway.log_level = Enum('INFO')¶
按值或名称设置日志级别。
- show_config c.DaskGateway.show_config = Bool(False)¶
将配置转储到标准输出,而不是启动应用程序
- show_config_json c.DaskGateway.show_config_json = Bool(False)¶
将配置转储到标准输出(JSON 格式),而不是启动应用程序
认证¶
Kerberos 认证器¶
- class dask_gateway_server.auth.KerberosAuthenticator(**kwargs: Any)¶
使用 Kerberos 的认证器
- cache_max_age c.KerberosAuthenticator.cache_max_age = Int(300)¶
缓存认证信息的最大时间(秒)。
通过在请求之间缓存响应,有助于减轻后端认证服务的负载。在此时间后,用户需要重新认证才能发出额外请求(请注意,这对用户来说通常是透明的)。
- cookie_name c.KerberosAuthenticator.cookie_name = Unicode('')¶
用于缓存认证信息的 cookie 名称。
- keytab c.KerberosAuthenticator.keytab = Unicode('dask_gateway.keytab')¶
keytab 文件的路径
- service_name c.KerberosAuthenticator.service_name = Unicode('HTTP')¶
服务的 Kerberos 主体名称。
这几乎总是 “HTTP”(默认值)
JupyterHub 认证器¶
- class dask_gateway_server.auth.JupyterHubAuthenticator(**kwargs: Any)¶
使用 JupyterHub 执行认证的认证器
- cache_max_age c.JupyterHubAuthenticator.cache_max_age = Int(300)¶
缓存认证信息的最大时间(秒)。
通过在请求之间缓存响应,有助于减轻后端认证服务的负载。在此时间后,用户需要重新认证才能发出额外请求(请注意,这对用户来说通常是透明的)。
- cookie_name c.JupyterHubAuthenticator.cookie_name = Unicode('')¶
用于缓存认证信息的 cookie 名称。
- jupyterhub_api_token c.JupyterHubAuthenticator.jupyterhub_api_token = Unicode('')¶
Dask Gateway 的 JupyterHub API 令牌,用于认证网关对 JupyterHub 的 API 请求。
默认情况下,这取决于
JUPYTERHUB_API_TOKEN
环境变量。
- jupyterhub_api_url c.JupyterHubAuthenticator.jupyterhub_api_url = Unicode('')¶
JupyterHub 服务器的 API URL。
默认情况下,这取决于
JUPYTERHUB_API_URL
环境变量。
- tls_ca c.JupyterHubAuthenticator.tls_ca = Unicode('')¶
TLS CA 文件的路径,用于验证对 JupyterHub 的 API 请求。
设置此项时,您还应该设置 tls_key 和 tls_cert。
- tls_cert c.JupyterHubAuthenticator.tls_cert = Unicode('')¶
TLS 证书文件的路径,用于向 JupyterHub 发出 API 请求。
设置此项时,您还应该设置 tls_cert。
- tls_key c.JupyterHubAuthenticator.tls_key = Unicode('')¶
TLS 密钥文件的路径,用于向 JupyterHub 发出 API 请求。
设置此项时,您还应该设置 tls_cert。
简单认证器¶
- class dask_gateway_server.auth.SimpleAuthenticator(**kwargs: Any)¶
一个使用基本认证(Basic Auth)的简单认证器。
这是非常不安全的,仅用于测试!!!
- cache_max_age c.SimpleAuthenticator.cache_max_age = Int(300)¶
缓存认证信息的最大时间(秒)。
通过在请求之间缓存响应,有助于减轻后端认证服务的负载。在此时间后,用户需要重新认证才能发出额外请求(请注意,这对用户来说通常是透明的)。
- cookie_name c.SimpleAuthenticator.cookie_name = Unicode('')¶
用于缓存认证信息的 cookie 名称。
- password c.SimpleAuthenticator.password = Unicode(None)¶
如果设置,所有用户必须提供的全局密码。
如果未设置(默认),则完全忽略密码字段。
集群后端¶
基类¶
ClusterConfig¶
- class dask_gateway_server.backends.base.ClusterConfig(**kwargs: Any)¶
用于保存单个 Dask 集群配置的基类
- adaptive_period c.ClusterConfig.adaptive_period = Float(3)¶
自适应伸缩检查之间的时间间隔(秒)。
较小的周期会降低响应集群负载变化时的伸缩延迟,但可能会导致网关服务器负载更高。
- cluster_max_cores c.ClusterConfig.cluster_max_cores = Float(None)¶
此集群可用的最大核心数。
设置为
None
表示无核心限制(默认)。
- cluster_max_memory c.ClusterConfig.cluster_max_memory = MemoryLimit(None)¶
此集群可用的最大内存量(字节)。允许使用以下后缀
K -> Kibibytes
M -> Mebibytes
G -> Gibibytes
T -> Tebibytes
设置为
None
表示无内存限制(默认)。
- cluster_max_workers c.ClusterConfig.cluster_max_workers = Int(0)¶
此集群可用的最大 worker 数量。
请注意,这将在运行时与
cluster_max_cores
和cluster_max_memory
结合使用,以确定此集群实际可用的最大 worker 数量。
- environment c.ClusterConfig.environment = Dict()¶
为 worker 和 scheduler 进程设置的环境变量。
- idle_timeout c.ClusterConfig.idle_timeout = Float(0)¶
空闲集群自动关闭前的时间间隔(秒)。
设置为 0(默认)表示无空闲超时。
- scheduler_cmd c.ClusterConfig.scheduler_cmd = Command()¶
启动 Dask scheduler 的 shell 命令。
- scheduler_cores c.ClusterConfig.scheduler_cores = Int(1)¶
Dask scheduler 可用的 CPU 核心数。
- scheduler_memory c.ClusterConfig.scheduler_memory = MemoryLimit('2 G')¶
Dask scheduler 可用的字节数。允许使用以下后缀
K -> Kibibytes
M -> Mebibytes
G -> Gibibytes
T -> Tebibytes
- worker_cmd c.ClusterConfig.worker_cmd = Command()¶
启动 Dask worker 的 shell 命令。
- worker_cores c.ClusterConfig.worker_cores = Int(1)¶
Dask worker 可用的 CPU 核心数。
- worker_memory c.ClusterConfig.worker_memory = MemoryLimit('2 G')¶
Dask worker 可用的字节数。允许使用以下后缀
K -> Kibibytes
M -> Mebibytes
G -> Gibibytes
T -> Tebibytes
- worker_threads c.ClusterConfig.worker_threads = Int(0)¶
Dask worker 可用的线程数。
默认为
worker_cores
。
后端¶
- class dask_gateway_server.backends.base.Backend(**kwargs: Any)¶
定义 dask-gateway 后端的基类。
子类应该实现以下方法
setup
cleanup
start_cluster
stop_cluster
on_cluster_heartbeat
- api_url c.Backend.api_url = Unicode('')¶
内部组件(例如 Dask 集群)联系网关时将使用的地址。
- cluster_config_class c.Backend.cluster_config_class = Type('dask_gateway_server.backends.base.ClusterConfig')¶
要使用的集群配置类
本地进程¶
LocalClusterConfig¶
- class dask_gateway_server.backends.local.LocalClusterConfig(**kwargs: Any)¶
以本地进程运行时 Dask 集群的配置选项
- adaptive_period c.LocalClusterConfig.adaptive_period = Float(3)¶
自适应伸缩检查之间的时间间隔(秒)。
较小的周期会降低响应集群负载变化时的伸缩延迟,但可能会导致网关服务器负载更高。
- cluster_max_cores c.LocalClusterConfig.cluster_max_cores = Float(None)¶
此集群可用的最大核心数。
设置为
None
表示无核心限制(默认)。
- cluster_max_memory c.LocalClusterConfig.cluster_max_memory = MemoryLimit(None)¶
此集群可用的最大内存量(字节)。允许使用以下后缀
K -> Kibibytes
M -> Mebibytes
G -> Gibibytes
T -> Tebibytes
设置为
None
表示无内存限制(默认)。
- cluster_max_workers c.LocalClusterConfig.cluster_max_workers = Int(0)¶
此集群可用的最大 worker 数量。
请注意,这将在运行时与
cluster_max_cores
和cluster_max_memory
结合使用,以确定此集群实际可用的最大 worker 数量。
- environment c.LocalClusterConfig.environment = Dict()¶
为 worker 和 scheduler 进程设置的环境变量。
- idle_timeout c.LocalClusterConfig.idle_timeout = Float(0)¶
空闲集群自动关闭前的时间间隔(秒)。
设置为 0(默认)表示无空闲超时。
- scheduler_cmd c.LocalClusterConfig.scheduler_cmd = Command()¶
启动 Dask scheduler 的 shell 命令。
- scheduler_cores c.LocalClusterConfig.scheduler_cores = Int(1)¶
Dask scheduler 可用的 CPU 核心数。
- scheduler_memory c.LocalClusterConfig.scheduler_memory = MemoryLimit('2 G')¶
Dask scheduler 可用的字节数。允许使用以下后缀
K -> Kibibytes
M -> Mebibytes
G -> Gibibytes
T -> Tebibytes
- worker_cmd c.LocalClusterConfig.worker_cmd = Command()¶
启动 Dask worker 的 shell 命令。
- worker_cores c.LocalClusterConfig.worker_cores = Int(1)¶
Dask worker 可用的 CPU 核心数。
- worker_memory c.LocalClusterConfig.worker_memory = MemoryLimit('2 G')¶
Dask worker 可用的字节数。允许使用以下后缀
K -> Kibibytes
M -> Mebibytes
G -> Gibibytes
T -> Tebibytes
- worker_threads c.LocalClusterConfig.worker_threads = Int(0)¶
Dask worker 可用的线程数。
默认为
worker_cores
。
本地后端¶
- class dask_gateway_server.backends.local.LocalBackend(**kwargs: Any)¶
启动本地进程的集群后端。
需要超级用户权限才能为请求用户名运行进程。
- api_url c.LocalBackend.api_url = Unicode('')¶
内部组件(例如 Dask 集群)联系网关时将使用的地址。
默认为 {proxy_address}/{prefix}/api,如果应使用不同地址,则手动设置。
- backoff_base_delay c.LocalBackend.backoff_base_delay = Float(0.1)¶
失败后重试时的退避基础延迟(秒)。
如果操作失败,将在计算出的退避时间后重试,计算方法如下
` min(backoff_max_delay, backoff_base_delay * 2 ** num_failures) `
- backoff_max_delay c.LocalBackend.backoff_max_delay = Float(300)¶
失败后重试时的退避策略最大延迟(秒)。
- check_timeouts_period c.LocalBackend.check_timeouts_period = Float(0.0)¶
超时检查之间的时间间隔(秒)。
这不应该太小(以保持开销较低),但应该小于
cluster_heartbeat_timeout
、cluster_start_timeout
和worker_start_timeout
。
- cluster_config_class c.LocalBackend.cluster_config_class = Type('dask_gateway_server.backends.local.LocalClusterConfig')¶
要使用的集群配置类
- cluster_heartbeat_period c.LocalBackend.cluster_heartbeat_period = Int(15)¶
集群到网关的心跳之间的时间间隔(秒)。
较小的周期会更快检测到失败的 worker,但会占用更多资源。较大的周期在出现故障时提供更慢的反馈。
- cluster_heartbeat_timeout c.LocalBackend.cluster_heartbeat_timeout = Float(0.0)¶
在杀死未能发送心跳的 Dask 集群之前的超时时间(秒)。
这应该大于
cluster_heartbeat_period
。默认为2 * cluster_heartbeat_period
。
- cluster_options c.LocalBackend.cluster_options = Union()¶
配置单个集群的用户选项。
允许用户在创建新集群时指定配置覆盖。有关更多信息,请参阅文档
- cluster_start_timeout c.LocalBackend.cluster_start_timeout = Float(60)¶
放弃启动 Dask 集群之前的超时时间(秒)。
- cluster_status_period c.LocalBackend.cluster_status_period = Float(30)¶
集群状态检查之间的时间间隔(秒)。
较小的周期会更快检测到失败的集群,但会占用更多资源。较大的周期在出现故障时提供更慢的反馈。
- clusters_directory c.LocalBackend.clusters_directory = Unicode('')¶
集群工作目录的根目录。
为每个新集群创建一个子目录,该子目录将作为该集群的工作目录。集群关闭时,将移除该子目录。
如果未指定,将为每个集群使用一个临时目录。
- db_cleanup_period c.LocalBackend.db_cleanup_period = Float(600)¶
数据库清理任务之间的时间间隔(秒)。
这设置了从数据库中移除旧记录的频率。这不应该太小(以保持开销较低),但应该小于
db_cluster_max_age
(可能差一个数量级)。
- db_cluster_max_age c.LocalBackend.db_cluster_max_age = Float(86400)¶
保留已完成集群记录的最大时间(秒)。
每过
db_cleanup_period
,将从数据库中移除早于db_cluster_max_age
的已完成集群。
- db_debug c.LocalBackend.db_debug = Bool(False)¶
如果为 True,将记录所有数据库操作
- db_encrypt_keys c.LocalBackend.db_encrypt_keys = List()¶
用于加密数据库中私有数据的密钥列表。也可以通过环境变量
DASK_GATEWAY_ENCRYPT_KEYS
设置,其值为以;
分隔的加密密钥字符串。每个密钥应该是 base64 编码的 32 字节值,并且应该是密码学安全的随机值。如果没有其他选项,可以使用 openssl 通过以下命令生成单个密钥
$ openssl rand -base64 32
单个密钥有效,可以使用多个密钥来支持密钥轮换。
- db_url c.LocalBackend.db_url = Unicode('sqlite:///:memory:')¶
数据库的 URL。默认值仅为内存中的数据库。
如果不在内存中,还必须设置
db_encrypt_keys
。
- inherited_environment c.LocalBackend.inherited_environment = List()¶
scheduler 和 worker 进程从 Dask-Gateway 进程继承的环境变量白名单。
- parallelism c.LocalBackend.parallelism = Int(20)¶
用于启动/停止集群的处理程序数量。
- sigint_timeout c.LocalBackend.sigint_timeout = Int(10)¶
SIGINT 后等待进程停止的秒数。
在此时间后进程仍未停止,则发送 SIGTERM。
- sigkill_timeout c.LocalBackend.sigkill_timeout = Int(5)¶
SIGKILL 后等待进程停止的秒数。
如果在此时间后进程仍未停止,则会记录警告,并将该进程视为僵尸进程。
- sigterm_timeout c.LocalBackend.sigterm_timeout = Int(5)¶
SIGTERM 后等待进程停止的秒数。
如果在此时间后进程仍未停止,则发送 SIGKILL。
- stop_clusters_on_shutdown c.LocalBackend.stop_clusters_on_shutdown = Bool(True)¶
是否在网关关闭时停止活动集群。
如果为 true,所有活动集群将在关闭网关前停止。设置为 False 将保留活动集群运行。
- worker_start_failure_limit c.LocalBackend.worker_start_failure_limit = Int(3)¶
在集群被标记为失败之前,启动 worker 的失败尝试次数限制。
每个启动失败的 worker(超时除外)都会增加计数器。如果 worker 成功启动,计数器将重置。如果计数器超出此限制,集群将被标记为失败并关闭。
- worker_start_timeout c.LocalBackend.worker_start_timeout = Float(60)¶
放弃启动 Dask worker 之前的超时时间(秒)。
- worker_status_period c.LocalBackend.worker_status_period = Float(30)¶
worker 状态检查之间的时间间隔(秒)。
较小的周期会更快检测到失败的 worker,但会占用更多资源。较大的周期在出现故障时提供更慢的反馈。
不安全本地后端¶
- class dask_gateway_server.backends.local.UnsafeLocalBackend(**kwargs: Any)¶
LocalBackend 的一个版本,不设置权限。
仅用于测试!这不提供用户隔离 - 集群运行的权限级别与网关相同。
- api_url c.UnsafeLocalBackend.api_url = Unicode('')¶
内部组件(例如 Dask 集群)联系网关时将使用的地址。
默认为 {proxy_address}/{prefix}/api,如果应使用不同地址,则手动设置。
- backoff_base_delay c.UnsafeLocalBackend.backoff_base_delay = Float(0.1)¶
失败后重试时的退避基础延迟(秒)。
如果操作失败,将在计算出的退避时间后重试,计算方法如下
` min(backoff_max_delay, backoff_base_delay * 2 ** num_failures) `
- backoff_max_delay c.UnsafeLocalBackend.backoff_max_delay = Float(300)¶
失败后重试时的退避策略最大延迟(秒)。
- check_timeouts_period c.UnsafeLocalBackend.check_timeouts_period = Float(0.0)¶
超时检查之间的时间间隔(秒)。
这不应该太小(以保持开销较低),但应该小于
cluster_heartbeat_timeout
、cluster_start_timeout
和worker_start_timeout
。
- cluster_config_class c.UnsafeLocalBackend.cluster_config_class = Type('dask_gateway_server.backends.local.LocalClusterConfig')¶
要使用的集群配置类
- cluster_heartbeat_period c.UnsafeLocalBackend.cluster_heartbeat_period = Int(15)¶
集群到网关的心跳之间的时间间隔(秒)。
较小的周期会更快检测到失败的 worker,但会占用更多资源。较大的周期在出现故障时提供更慢的反馈。
- cluster_heartbeat_timeout c.UnsafeLocalBackend.cluster_heartbeat_timeout = Float(0.0)¶
在杀死未能发送心跳的 Dask 集群之前的超时时间(秒)。
这应该大于
cluster_heartbeat_period
。默认为2 * cluster_heartbeat_period
。
- cluster_options c.UnsafeLocalBackend.cluster_options = Union()¶
配置单个集群的用户选项。
允许用户在创建新集群时指定配置覆盖。有关更多信息,请参阅文档
- cluster_start_timeout c.UnsafeLocalBackend.cluster_start_timeout = Float(60)¶
放弃启动 Dask 集群之前的超时时间(秒)。
- cluster_status_period c.UnsafeLocalBackend.cluster_status_period = Float(30)¶
集群状态检查之间的时间间隔(秒)。
较小的周期会更快检测到失败的集群,但会占用更多资源。较大的周期在出现故障时提供更慢的反馈。
- clusters_directory c.UnsafeLocalBackend.clusters_directory = Unicode('')¶
集群工作目录的根目录。
为每个新集群创建一个子目录,该子目录将作为该集群的工作目录。集群关闭时,将移除该子目录。
如果未指定,将为每个集群使用一个临时目录。
- db_cleanup_period c.UnsafeLocalBackend.db_cleanup_period = Float(600)¶
数据库清理任务之间的时间间隔(秒)。
这设置了从数据库中移除旧记录的频率。这不应该太小(以保持开销较低),但应该小于
db_cluster_max_age
(可能差一个数量级)。
- db_cluster_max_age c.UnsafeLocalBackend.db_cluster_max_age = Float(86400)¶
保留已完成集群记录的最大时间(秒)。
每过
db_cleanup_period
,将从数据库中移除早于db_cluster_max_age
的已完成集群。
- db_debug c.UnsafeLocalBackend.db_debug = Bool(False)¶
如果为 True,将记录所有数据库操作
- db_encrypt_keys c.UnsafeLocalBackend.db_encrypt_keys = List()¶
用于加密数据库中私有数据的密钥列表。也可以通过环境变量
DASK_GATEWAY_ENCRYPT_KEYS
设置,其值为以;
分隔的加密密钥字符串。每个密钥应该是 base64 编码的 32 字节值,并且应该是密码学安全的随机值。如果没有其他选项,可以使用 openssl 通过以下命令生成单个密钥
$ openssl rand -base64 32
单个密钥有效,可以使用多个密钥来支持密钥轮换。
- db_url c.UnsafeLocalBackend.db_url = Unicode('sqlite:///:memory:')¶
数据库的 URL。默认值仅为内存中的数据库。
如果不在内存中,还必须设置
db_encrypt_keys
。
- inherited_environment c.UnsafeLocalBackend.inherited_environment = List()¶
scheduler 和 worker 进程从 Dask-Gateway 进程继承的环境变量白名单。
- parallelism c.UnsafeLocalBackend.parallelism = Int(20)¶
用于启动/停止集群的处理程序数量。
- sigint_timeout c.UnsafeLocalBackend.sigint_timeout = Int(10)¶
SIGINT 后等待进程停止的秒数。
在此时间后进程仍未停止,则发送 SIGTERM。
- sigkill_timeout c.UnsafeLocalBackend.sigkill_timeout = Int(5)¶
SIGKILL 后等待进程停止的秒数。
如果在此时间后进程仍未停止,则会记录警告,并将该进程视为僵尸进程。
- sigterm_timeout c.UnsafeLocalBackend.sigterm_timeout = Int(5)¶
SIGTERM 后等待进程停止的秒数。
如果在此时间后进程仍未停止,则发送 SIGKILL。
- stop_clusters_on_shutdown c.UnsafeLocalBackend.stop_clusters_on_shutdown = Bool(True)¶
是否在网关关闭时停止活动集群。
如果为 true,所有活动集群将在关闭网关前停止。设置为 False 将保留活动集群运行。
- worker_start_failure_limit c.UnsafeLocalBackend.worker_start_failure_limit = Int(3)¶
在集群被标记为失败之前,启动 worker 的失败尝试次数限制。
每个启动失败的 worker(超时除外)都会增加计数器。如果 worker 成功启动,计数器将重置。如果计数器超出此限制,集群将被标记为失败并关闭。
- worker_start_timeout c.UnsafeLocalBackend.worker_start_timeout = Float(60)¶
放弃启动 Dask worker 之前的超时时间(秒)。
- worker_status_period c.UnsafeLocalBackend.worker_status_period = Float(30)¶
worker 状态检查之间的时间间隔(秒)。
较小的周期会更快检测到失败的 worker,但会占用更多资源。较大的周期在出现故障时提供更慢的反馈。
YARN¶
YarnClusterConfig¶
- class dask_gateway_server.backends.yarn.YarnClusterConfig(**kwargs: Any)¶
在 Hadoop/YARN 上运行时 Dask 集群的配置选项
- adaptive_period c.YarnClusterConfig.adaptive_period = Float(3)¶
自适应伸缩检查之间的时间间隔(秒)。
较小的周期会降低响应集群负载变化时的伸缩延迟,但可能会导致网关服务器负载更高。
- cluster_max_cores c.YarnClusterConfig.cluster_max_cores = Float(None)¶
此集群可用的最大核心数。
设置为
None
表示无核心限制(默认)。
- cluster_max_memory c.YarnClusterConfig.cluster_max_memory = MemoryLimit(None)¶
此集群可用的最大内存量(字节)。允许使用以下后缀
K -> Kibibytes
M -> Mebibytes
G -> Gibibytes
T -> Tebibytes
设置为
None
表示无内存限制(默认)。
- cluster_max_workers c.YarnClusterConfig.cluster_max_workers = Int(0)¶
此集群可用的最大 worker 数量。
请注意,这将在运行时与
cluster_max_cores
和cluster_max_memory
结合使用,以确定此集群实际可用的最大 worker 数量。
- environment c.YarnClusterConfig.environment = Dict()¶
为 worker 和 scheduler 进程设置的环境变量。
- idle_timeout c.YarnClusterConfig.idle_timeout = Float(0)¶
空闲集群自动关闭前的时间间隔(秒)。
设置为 0(默认)表示无空闲超时。
- localize_files c.YarnClusterConfig.localize_files = Dict()¶
额外文件,将分发给 worker 和 scheduler 容器。
这是从
local-name
到resource
的映射。资源路径可以是本地的,也可以在 HDFS 中(如果是,请以hdfs://...
为前缀)。如果是归档文件(.tar.gz
或.zip
),资源将被解压为目录local-name
。为了更精细的控制,资源也可以指定为skein.File
对象,或其对应的dict
。这可用于通过配置以下项来分发 conda/虚拟环境
c.YarnClusterConfig.localize_files = { 'environment': { 'source': 'hdfs:///path/to/archived/environment.tar.gz', 'visibility': 'public' } } c.YarnClusterConfig.scheduler_setup = 'source environment/bin/activate' c.YarnClusterConfig.worker_setup = 'source environment/bin/activate'
这些归档文件通常使用
conda-pack
或venv-pack
创建。有关分发文件的更多信息,请参阅 https://jcristharif.com/skein/distributing-files.html。
- queue c.YarnClusterConfig.queue = Unicode('default')¶
在哪个 YARN 队列下提交应用程序
- scheduler_cmd c.YarnClusterConfig.scheduler_cmd = Command()¶
启动 Dask scheduler 的 shell 命令。
- scheduler_cores c.YarnClusterConfig.scheduler_cores = Int(1)¶
Dask scheduler 可用的 CPU 核心数。
- scheduler_memory c.YarnClusterConfig.scheduler_memory = MemoryLimit('2 G')¶
Dask scheduler 可用的字节数。允许使用以下后缀
K -> Kibibytes
M -> Mebibytes
G -> Gibibytes
T -> Tebibytes
- scheduler_setup c.YarnClusterConfig.scheduler_setup = Unicode('')¶
在 Dask scheduler 启动前运行的脚本。
- worker_cmd c.YarnClusterConfig.worker_cmd = Command()¶
启动 Dask worker 的 shell 命令。
- worker_cores c.YarnClusterConfig.worker_cores = Int(1)¶
Dask worker 可用的 CPU 核心数。
- worker_memory c.YarnClusterConfig.worker_memory = MemoryLimit('2 G')¶
Dask worker 可用的字节数。允许使用以下后缀
K -> Kibibytes
M -> Mebibytes
G -> Gibibytes
T -> Tebibytes
- worker_setup c.YarnClusterConfig.worker_setup = Unicode('')¶
在 Dask worker 启动前运行的脚本。
- worker_threads c.YarnClusterConfig.worker_threads = Int(0)¶
Dask worker 可用的线程数。
默认为
worker_cores
。
Yarn 后端¶
- class dask_gateway_server.backends.yarn.YarnBackend(**kwargs: Any)¶
用于管理 Hadoop/YARN 上 Dask 集群的集群后端。
- api_url c.YarnBackend.api_url = Unicode('')¶
内部组件(例如 Dask 集群)联系网关时将使用的地址。
默认为 {proxy_address}/{prefix}/api,如果应使用不同地址,则手动设置。
- app_client_cache_max_size c.YarnBackend.app_client_cache_max_size = Int(10)¶
应用程序客户端缓存的最大大小。
较大的缓存会带来更好的性能,但也会占用更多资源。
- backoff_base_delay c.YarnBackend.backoff_base_delay = Float(0.1)¶
失败后重试时的退避基础延迟(秒)。
如果操作失败,将在计算出的退避时间后重试,计算方法如下
` min(backoff_max_delay, backoff_base_delay * 2 ** num_failures) `
- backoff_max_delay c.YarnBackend.backoff_max_delay = Float(300)¶
失败后重试时的退避策略最大延迟(秒)。
- check_timeouts_period c.YarnBackend.check_timeouts_period = Float(0.0)¶
超时检查之间的时间间隔(秒)。
这不应该太小(以保持开销较低),但应该小于
cluster_heartbeat_timeout
、cluster_start_timeout
和worker_start_timeout
。
- cluster_config_class c.YarnBackend.cluster_config_class = Type('dask_gateway_server.backends.yarn.YarnClusterConfig')¶
要使用的集群配置类
- cluster_heartbeat_period c.YarnBackend.cluster_heartbeat_period = Int(15)¶
集群到网关的心跳之间的时间间隔(秒)。
较小的周期会更快检测到失败的 worker,但会占用更多资源。较大的周期在出现故障时提供更慢的反馈。
- cluster_heartbeat_timeout c.YarnBackend.cluster_heartbeat_timeout = Float(0.0)¶
在杀死未能发送心跳的 Dask 集群之前的超时时间(秒)。
这应该大于
cluster_heartbeat_period
。默认为2 * cluster_heartbeat_period
。
- cluster_options c.YarnBackend.cluster_options = Union()¶
配置单个集群的用户选项。
允许用户在创建新集群时指定配置覆盖。有关更多信息,请参阅文档
- cluster_start_timeout c.YarnBackend.cluster_start_timeout = Float(60)¶
放弃启动 Dask 集群之前的超时时间(秒)。
- cluster_status_period c.YarnBackend.cluster_status_period = Float(30)¶
集群状态检查之间的时间间隔(秒)。
较小的周期会更快检测到失败的集群,但会占用更多资源。较大的周期在出现故障时提供更慢的反馈。
- db_cleanup_period c.YarnBackend.db_cleanup_period = Float(600)¶
数据库清理任务之间的时间间隔(秒)。
这设置了从数据库中移除旧记录的频率。这不应该太小(以保持开销较低),但应该小于
db_cluster_max_age
(可能差一个数量级)。
- db_cluster_max_age c.YarnBackend.db_cluster_max_age = Float(86400)¶
保留已完成集群记录的最大时间(秒)。
每过
db_cleanup_period
,将从数据库中移除早于db_cluster_max_age
的已完成集群。
- db_debug c.YarnBackend.db_debug = Bool(False)¶
如果为 True,将记录所有数据库操作
- db_encrypt_keys c.YarnBackend.db_encrypt_keys = List()¶
用于加密数据库中私有数据的密钥列表。也可以通过环境变量
DASK_GATEWAY_ENCRYPT_KEYS
设置,其值为以;
分隔的加密密钥字符串。每个密钥应该是 base64 编码的 32 字节值,并且应该是密码学安全的随机值。如果没有其他选项,可以使用 openssl 通过以下命令生成单个密钥
$ openssl rand -base64 32
单个密钥有效,可以使用多个密钥来支持密钥轮换。
- db_url c.YarnBackend.db_url = Unicode('sqlite:///:memory:')¶
数据库的 URL。默认值仅为内存中的数据库。
如果不在内存中,还必须设置
db_encrypt_keys
。
- keytab c.YarnBackend.keytab = Unicode(None)¶
Dask Gateway 用户 Kerberos keytab 的路径
- parallelism c.YarnBackend.parallelism = Int(20)¶
用于启动/停止集群的处理程序数量。
- principal c.YarnBackend.principal = Unicode(None)¶
Dask Gateway 用户的 Kerberos 主体
- stop_clusters_on_shutdown c.YarnBackend.stop_clusters_on_shutdown = Bool(True)¶
是否在网关关闭时停止活动集群。
如果为 true,所有活动集群将在关闭网关前停止。设置为 False 将保留活动集群运行。
- worker_start_failure_limit c.YarnBackend.worker_start_failure_limit = Int(3)¶
在集群被标记为失败之前,启动 worker 的失败尝试次数限制。
每个启动失败的 worker(超时除外)都会增加计数器。如果 worker 成功启动,计数器将重置。如果计数器超出此限制,集群将被标记为失败并关闭。
- worker_start_timeout c.YarnBackend.worker_start_timeout = Float(60)¶
放弃启动 Dask worker 之前的超时时间(秒)。
- worker_status_period c.YarnBackend.worker_status_period = Float(30)¶
worker 状态检查之间的时间间隔(秒)。
较小的周期会更快检测到失败的 worker,但会占用更多资源。较大的周期在出现故障时提供更慢的反馈。
Kubernetes¶
KubeClusterConfig¶
- class dask_gateway_server.backends.kubernetes.KubeClusterConfig(**kwargs: Any)¶
在 Kubernetes 上运行单个 Dask 集群的配置
- adaptive_period c.KubeClusterConfig.adaptive_period = Float(3)¶
自适应伸缩检查之间的时间间隔(秒)。
较小的周期会降低响应集群负载变化时的伸缩延迟,但可能会导致网关服务器负载更高。
- cluster_max_cores c.KubeClusterConfig.cluster_max_cores = Float(None)¶
此集群可用的最大核心数。
设置为
None
表示无核心限制(默认)。
- cluster_max_memory c.KubeClusterConfig.cluster_max_memory = MemoryLimit(None)¶
此集群可用的最大内存量(字节)。允许使用以下后缀
K -> Kibibytes
M -> Mebibytes
G -> Gibibytes
T -> Tebibytes
设置为
None
表示无内存限制(默认)。
- cluster_max_workers c.KubeClusterConfig.cluster_max_workers = Int(0)¶
此集群可用的最大 worker 数量。
请注意,这将在运行时与
cluster_max_cores
和cluster_max_memory
结合使用,以确定此集群实际可用的最大 worker 数量。
- environment c.KubeClusterConfig.environment = Dict()¶
为 worker 和 scheduler 进程设置的环境变量。
- idle_timeout c.KubeClusterConfig.idle_timeout = Float(0)¶
空闲集群自动关闭前的时间间隔(秒)。
设置为 0(默认)表示无空闲超时。
- image c.KubeClusterConfig.image = Unicode('daskgateway/dask-gateway:latest')¶
用于运行用户容器的 Docker 镜像。
- image_pull_policy c.KubeClusterConfig.image_pull_policy = Unicode(None)¶
image
中指定的 Docker 镜像的拉取策略
- image_pull_secrets c.KubeClusterConfig.image_pull_secrets = List()¶
访问私有镜像仓库的镜像拉取密钥。
应该是一个字典列表,每个字典包含一个名为
name
的键,以匹配 k8s 原生语法。
- namespace c.KubeClusterConfig.namespace = Unicode('default')¶
在哪个 Kubernetes namespace 中启动 Pod。
如果在启用了服务帐户的 Kubernetes 集群中运行,则默认为当前 namespace。如果不是,则默认为 default
- scheduler_cmd c.KubeClusterConfig.scheduler_cmd = Command()¶
启动 Dask scheduler 的 shell 命令。
- scheduler_cores c.KubeClusterConfig.scheduler_cores = Float(1)¶
Dask scheduler 可用的 CPU 核心数。
- scheduler_cores_limit c.KubeClusterConfig.scheduler_cores_limit = Float(0.0)¶
Dask scheduler 可用的最大 CPU 核心数。
默认为
scheduler_cores
。
- scheduler_extra_container_config c.KubeClusterConfig.scheduler_extra_container_config = Dict()¶
scheduler 容器的任何额外配置。
此字典将在提交前与 scheduler 容器(一个
V1Container
对象)进行深度合并。键应与 kubernetes 规范中的键匹配,并且应为 camelCase。有关更多信息,请参阅
worker_extra_container_config
。
- scheduler_extra_pod_annotations c.KubeClusterConfig.scheduler_extra_pod_annotations = Dict()¶
应用于用户 scheduler Pod 的任何额外注解。
这些注解可以使用集群选项(参阅公开集群选项)设置,以便注入用户特定信息,例如根据用户的组或用户名添加注解。
此字典将在应用于用户 Pod 之前与
common_annotations
合并。
- scheduler_extra_pod_config c.KubeClusterConfig.scheduler_extra_pod_config = Dict()¶
scheduler Pod 的任何额外配置。
此字典将在提交前与 scheduler Pod 规范(一个
V1PodSpec
对象)进行深度合并。键应与 kubernetes 规范中的键匹配,并且应为 camelCase。有关更多信息,请参阅
worker_extra_pod_config
。
- scheduler_extra_pod_labels c.KubeClusterConfig.scheduler_extra_pod_labels = Dict()¶
应用于用户 scheduler Pod 的任何额外标签。
这些标签可以使用集群选项(参阅公开集群选项)设置,以便注入用户特定信息,例如根据用户的组或用户名添加标签。
此字典将在应用于用户 Pod 之前与
common_labels
合并。
- scheduler_memory c.KubeClusterConfig.scheduler_memory = MemoryLimit('2 G')¶
Dask scheduler 可用的字节数。允许使用以下后缀
K -> Kibibytes
M -> Mebibytes
G -> Gibibytes
T -> Tebibytes
- scheduler_memory_limit c.KubeClusterConfig.scheduler_memory_limit = MemoryLimit(0)¶
Dask scheduler 可用内存的最大字节数。允许使用以下后缀:
K -> Kibibytes
M -> Mebibytes
G -> Gibibytes
T -> Tebibytes
默认为
scheduler_memory
。
- worker_cmd c.KubeClusterConfig.worker_cmd = Command()¶
启动 Dask worker 的 shell 命令。
- worker_cores c.KubeClusterConfig.worker_cores = Float(1)¶
Dask worker 可用的 CPU 核心数。
- worker_cores_limit c.KubeClusterConfig.worker_cores_limit = Float(0.0)¶
Dask worker 可用 CPU 核心数上限。
默认为
worker_cores
。
- worker_extra_container_config c.KubeClusterConfig.worker_extra_container_config = Dict()¶
worker 容器的任何额外配置。
在提交之前,此字典将与 worker 容器(一个
V1Container
对象)进行深度合并。键应与 kubernetes 规范中的键匹配,并且应使用 camelCase 命名法。例如,这里我们将 secret 中的环境变量添加到 worker 容器:
c.KubeClusterConfig.worker_extra_container_config = { "envFrom": [ {"secretRef": {"name": "my-env-secret"}} ] }
- worker_extra_pod_annotations c.KubeClusterConfig.worker_extra_pod_annotations = Dict()¶
应用于用户 worker Pod 的任何额外注解 (annotations)。
这些注解可以使用集群选项(参阅公开集群选项)设置,以便注入用户特定信息,例如根据用户的组或用户名添加注解。
此字典将在应用于用户 Pod 之前与
common_annotations
合并。
- worker_extra_pod_config c.KubeClusterConfig.worker_extra_pod_config = Dict()¶
worker Pod 的任何额外配置。
在提交之前,此字典将与 worker Pod 规范(一个
V1PodSpec
对象)进行深度合并。键应与 kubernetes 规范中的键匹配,并且应使用 camelCase 命名法。例如,这里我们为 worker Pod 添加一个容忍度 (toleration)。
c.KubeClusterConfig.worker_extra_pod_config = { "tolerations": [ { "key": "key", "operator": "Equal", "value": "value", "effect": "NoSchedule", } ] }
- worker_extra_pod_labels c.KubeClusterConfig.worker_extra_pod_labels = Dict()¶
应用于用户 worker Pod 的任何额外标签 (labels)。
这些标签可以使用集群选项(参阅公开集群选项)设置,以便注入用户特定信息,例如根据用户的组或用户名添加标签。
此字典将在应用于用户 Pod 之前与
common_labels
合并。
- worker_memory c.KubeClusterConfig.worker_memory = MemoryLimit('2 G')¶
Dask worker 可用的字节数。允许使用以下后缀
K -> Kibibytes
M -> Mebibytes
G -> Gibibytes
T -> Tebibytes
- worker_memory_limit c.KubeClusterConfig.worker_memory_limit = MemoryLimit(0)¶
Dask worker 可用内存的最大字节数。允许使用以下后缀:
K -> Kibibytes
M -> Mebibytes
G -> Gibibytes
T -> Tebibytes
默认为
worker_memory
。
- worker_threads c.KubeClusterConfig.worker_threads = Int(0)¶
Dask worker 可用的线程数。
默认为
worker_cores
。
KubeBackend¶
- class dask_gateway_server.backends.kubernetes.KubeBackend(**kwargs: Any)¶
用于在 Kubernetes 上运行 Dask Gateway 的后端。
- api_url c.KubeBackend.api_url = Unicode('')¶
内部组件(例如 Dask 集群)联系网关时将使用的地址。
- cluster_config_class c.KubeBackend.cluster_config_class = Type('dask_gateway_server.backends.kubernetes.backend.KubeClusterConfig')¶
要使用的集群配置类
- cluster_options c.KubeBackend.cluster_options = Union()¶
配置单个集群的用户选项。
允许用户在创建新集群时指定配置覆盖。有关更多信息,请参阅文档
- common_annotations c.KubeBackend.common_annotations = Dict()¶
应用于 gateway 创建的所有对象的 Kubernetes 注解 (annotations)。
- common_labels c.KubeBackend.common_labels = Dict()¶
应用于 gateway 创建的所有对象的 Kubernetes 标签 (labels)。
- crd_version c.KubeBackend.crd_version = Unicode('v1alpha1')¶
DaskCluster CRD 的版本。
- gateway_instance c.KubeBackend.gateway_instance = Unicode('')¶
此 dask-gateway 实例的唯一 ID。
控制器也必须配置相同的 ID。
- label_selector c.KubeBackend.label_selector = Unicode('')¶
监视由 gateway 管理的对象时使用的标签选择器 (label selector)。
KubeController¶
- class dask_gateway_server.backends.kubernetes.controller.KubeController(**kwargs: Any)¶
Dask Gateway 的 Kubernetes 控制器。
- address c.KubeController.address = Unicode(':8000')¶
服务器应监听的地址。
- api_url c.KubeController.api_url = Unicode('')¶
内部组件(例如 Dask 集群)联系网关时将使用的地址。
- backoff_base_delay c.KubeController.backoff_base_delay = Float(0.1)¶
失败后重试时的退避基础延迟(秒)。
如果操作失败,将在计算出的退避时间后重试,计算方法如下
` min(backoff_max_delay, backoff_base_delay * 2 ** num_failures) `
- backoff_max_delay c.KubeController.backoff_max_delay = Float(300)¶
失败后重试时的退避策略最大延迟(秒)。
- common_annotations c.KubeController.common_annotations = Dict()¶
应用于 gateway 创建的所有对象的 Kubernetes 注解 (annotations)。
- common_labels c.KubeController.common_labels = Dict()¶
应用于 gateway 创建的所有对象的 Kubernetes 标签 (labels)。
- completed_cluster_cleanup_period c.KubeController.completed_cluster_cleanup_period = Float(600)¶
清理任务之间的间隔时间(秒)。
这设置了从 kubernetes 删除旧集群记录的频率。这个值不应该太小(以保持低开销),但应该小于
completed_cluster_max_age
(可能小一个数量级)。
- completed_cluster_max_age c.KubeController.completed_cluster_max_age = Float(86400)¶
保留已完成集群记录的最大时间(秒)。
每隔
completed_cluster_cleanup_period
秒,将从 kubernetes 中删除超过completed_cluster_max_age
时间的已完成集群。
- config_file c.KubeController.config_file = Unicode('dask_gateway_config.py')¶
要加载的配置文件
- crd_version c.KubeController.crd_version = Unicode('v1alpha1')¶
DaskCluster CRD 的版本。
- gateway_instance c.KubeController.gateway_instance = Unicode('')¶
此 dask-gateway 实例的唯一 ID。
控制器也必须配置相同的 ID。
- k8s_api_rate_limit c.KubeController.k8s_api_rate_limit = Int(50)¶
每秒平均 Kubernetes API 调用次数的限制。
- k8s_api_rate_limit_burst c.KubeController.k8s_api_rate_limit_burst = Int(100)¶
每秒最大 Kubernetes API 调用次数的限制。
- label_selector c.KubeController.label_selector = Unicode('')¶
监视由 gateway 管理的对象时使用的标签选择器 (label selector)。
- log_datefmt c.KubeController.log_datefmt = Unicode('%Y-%m-%d %H:%M:%S')¶
日志格式化程序用于 %(asctime)s 的日期格式
- log_format c.KubeController.log_format = Unicode('%(log_color)s[%(levelname)1.1s %(asctime)s.%(msecs).03d %(name)s]%(reset)s %(message)s')¶
日志格式模板
- log_level c.KubeController.log_level = Enum('INFO')¶
按值或名称设置日志级别。
- parallelism c.KubeController.parallelism = Int(20)¶
用于协调 (reconciling) Kubernetes 对象的处理器 (handler) 数量。
- proxy_prefix c.KubeController.proxy_prefix = Unicode('')¶
HTTP/HTTPS 代理应提供服务的路径前缀。
此前缀将添加到所有注册到代理的路由之前。
- proxy_tcp_entrypoint c.KubeController.proxy_tcp_entrypoint = Unicode('tcp')¶
创建 ingressroutetcps 时使用的 Traefik 入口点 (entrypoint) 名称。
- proxy_web_entrypoint c.KubeController.proxy_web_entrypoint = Unicode('web')¶
创建 ingressroutes 时使用的 Traefik 入口点 (entrypoint) 名称。
- proxy_web_middlewares c.KubeController.proxy_web_middlewares = List()¶
应用于添加到代理的 web 路由的中间件 (middlewares) 列表。
- show_config c.KubeController.show_config = Bool(False)¶
将配置转储到标准输出,而不是启动应用程序
- show_config_json c.KubeController.show_config_json = Bool(False)¶
将配置转储到标准输出(JSON 格式),而不是启动应用程序
作业队列 (Job Queues)¶
PBSClusterConfig¶
- class dask_gateway_server.backends.jobqueue.pbs.PBSClusterConfig(**kwargs: Any)¶
在 PBS 上运行时 Dask 集群的配置选项。
- account c.PBSClusterConfig.account = Unicode('')¶
与每个作业关联的记账字符串 (accounting string)。
- adaptive_period c.PBSClusterConfig.adaptive_period = Float(3)¶
自适应伸缩检查之间的时间间隔(秒)。
较小的周期会降低响应集群负载变化时的伸缩延迟,但可能会导致网关服务器负载更高。
- cluster_max_cores c.PBSClusterConfig.cluster_max_cores = Float(None)¶
此集群可用的最大核心数。
设置为
None
表示无核心限制(默认)。
- cluster_max_memory c.PBSClusterConfig.cluster_max_memory = MemoryLimit(None)¶
此集群可用的最大内存量(字节)。允许使用以下后缀
K -> Kibibytes
M -> Mebibytes
G -> Gibibytes
T -> Tebibytes
设置为
None
表示无内存限制(默认)。
- cluster_max_workers c.PBSClusterConfig.cluster_max_workers = Int(0)¶
此集群可用的最大 worker 数量。
请注意,这将在运行时与
cluster_max_cores
和cluster_max_memory
结合使用,以确定此集群实际可用的最大 worker 数量。
- environment c.PBSClusterConfig.environment = Dict()¶
为 worker 和 scheduler 进程设置的环境变量。
- idle_timeout c.PBSClusterConfig.idle_timeout = Float(0)¶
空闲集群自动关闭前的时间间隔(秒)。
设置为 0(默认)表示无空闲超时。
- project c.PBSClusterConfig.project = Unicode('')¶
与每个作业关联的项目。
- queue c.PBSClusterConfig.queue = Unicode('')¶
提交作业的队列。
- scheduler_cmd c.PBSClusterConfig.scheduler_cmd = Command()¶
启动 Dask scheduler 的 shell 命令。
- scheduler_cores c.PBSClusterConfig.scheduler_cores = Int(1)¶
Dask scheduler 可用的 CPU 核心数。
- scheduler_memory c.PBSClusterConfig.scheduler_memory = MemoryLimit('2 G')¶
Dask scheduler 可用的字节数。允许使用以下后缀
K -> Kibibytes
M -> Mebibytes
G -> Gibibytes
T -> Tebibytes
- scheduler_resource_list c.PBSClusterConfig.scheduler_resource_list = Unicode('select=1:ncpus={cores}:mem={memory}')¶
Scheduler 要使用的资源列表。
这是一个模板,接收以下字段:
cores
memory
- scheduler_setup c.PBSClusterConfig.scheduler_setup = Unicode('')¶
在 Dask scheduler 启动前运行的脚本。
- staging_directory c.PBSClusterConfig.staging_directory = Unicode('{home}/.dask-gateway/')¶
作业开始前用于存储文件的暂存目录 (staging directory)。
每个新集群都会创建一个子目录,用于存储该集群的临时文件。集群关闭时,该子目录将被移除。
此字段可以是模板,接收以下字段:
home(用户的主目录)
username(用户名)
- use_stagein c.PBSClusterConfig.use_stagein = Bool(True)¶
如果为 true,上面创建的暂存目录将在运行时使用
-Wstagein
指令复制到作业工作目录中。如果暂存目录位于网络文件系统上,您可以将其设置为 False,并依靠网络文件系统进行访问。
- worker_cmd c.PBSClusterConfig.worker_cmd = Command()¶
启动 Dask worker 的 shell 命令。
- worker_cores c.PBSClusterConfig.worker_cores = Int(1)¶
Dask worker 可用的 CPU 核心数。
- worker_memory c.PBSClusterConfig.worker_memory = MemoryLimit('2 G')¶
Dask worker 可用的字节数。允许使用以下后缀
K -> Kibibytes
M -> Mebibytes
G -> Gibibytes
T -> Tebibytes
- worker_resource_list c.PBSClusterConfig.worker_resource_list = Unicode('select=1:ncpus={cores}:mem={memory}')¶
Workers 要使用的资源列表。
这是一个模板,接收以下字段:
cores
memory
- worker_setup c.PBSClusterConfig.worker_setup = Unicode('')¶
在 Dask worker 启动前运行的脚本。
- worker_threads c.PBSClusterConfig.worker_threads = Int(0)¶
Dask worker 可用的线程数。
默认为
worker_cores
。
PBSBackend¶
- class dask_gateway_server.backends.jobqueue.pbs.PBSBackend(**kwargs: Any)¶
用于在 PBS 集群上部署 Dask 的后端。
- api_url c.PBSBackend.api_url = Unicode('')¶
内部组件(例如 Dask 集群)联系网关时将使用的地址。
默认为 {proxy_address}/{prefix}/api,如果应使用不同地址,则手动设置。
- backoff_base_delay c.PBSBackend.backoff_base_delay = Float(0.1)¶
失败后重试时的退避基础延迟(秒)。
如果操作失败,将在计算出的退避时间后重试,计算方法如下
` min(backoff_max_delay, backoff_base_delay * 2 ** num_failures) `
- backoff_max_delay c.PBSBackend.backoff_max_delay = Float(300)¶
失败后重试时的退避策略最大延迟(秒)。
- cancel_command c.PBSBackend.cancel_command = Unicode('')¶
作业取消命令的路径。
- check_timeouts_period c.PBSBackend.check_timeouts_period = Float(0.0)¶
超时检查之间的时间间隔(秒)。
这不应该太小(以保持开销较低),但应该小于
cluster_heartbeat_timeout
、cluster_start_timeout
和worker_start_timeout
。
- cluster_config_class c.PBSBackend.cluster_config_class = Type('dask_gateway_server.backends.jobqueue.pbs.PBSClusterConfig')¶
要使用的集群配置类
- cluster_heartbeat_period c.PBSBackend.cluster_heartbeat_period = Int(15)¶
集群到网关的心跳之间的时间间隔(秒)。
较小的周期会更快检测到失败的 worker,但会占用更多资源。较大的周期在出现故障时提供更慢的反馈。
- cluster_heartbeat_timeout c.PBSBackend.cluster_heartbeat_timeout = Float(0.0)¶
在杀死未能发送心跳的 Dask 集群之前的超时时间(秒)。
这应该大于
cluster_heartbeat_period
。默认为2 * cluster_heartbeat_period
。
- cluster_options c.PBSBackend.cluster_options = Union()¶
配置单个集群的用户选项。
允许用户在创建新集群时指定配置覆盖。有关更多信息,请参阅文档
- cluster_start_timeout c.PBSBackend.cluster_start_timeout = Float(60)¶
放弃启动 Dask 集群之前的超时时间(秒)。
- cluster_status_period c.PBSBackend.cluster_status_period = Float(30)¶
集群状态检查之间的时间间隔(秒)。
较小的周期会更快检测到失败的集群,但会占用更多资源。较大的周期在出现故障时提供更慢的反馈。
- dask_gateway_jobqueue_launcher c.PBSBackend.dask_gateway_jobqueue_launcher = Unicode('')¶
`dask-gateway-jobqueue-launcher` 可执行文件的路径。
- db_cleanup_period c.PBSBackend.db_cleanup_period = Float(600)¶
数据库清理任务之间的时间间隔(秒)。
这设置了从数据库中移除旧记录的频率。这不应该太小(以保持开销较低),但应该小于
db_cluster_max_age
(可能差一个数量级)。
- db_cluster_max_age c.PBSBackend.db_cluster_max_age = Float(86400)¶
保留已完成集群记录的最大时间(秒)。
每过
db_cleanup_period
,将从数据库中移除早于db_cluster_max_age
的已完成集群。
- db_debug c.PBSBackend.db_debug = Bool(False)¶
如果为 True,将记录所有数据库操作
- db_encrypt_keys c.PBSBackend.db_encrypt_keys = List()¶
用于加密数据库中私有数据的密钥列表。也可以通过环境变量
DASK_GATEWAY_ENCRYPT_KEYS
设置,其值为以;
分隔的加密密钥字符串。每个密钥应该是 base64 编码的 32 字节值,并且应该是密码学安全的随机值。如果没有其他选项,可以使用 openssl 通过以下命令生成单个密钥
$ openssl rand -base64 32
单个密钥有效,可以使用多个密钥来支持密钥轮换。
- db_url c.PBSBackend.db_url = Unicode('sqlite:///:memory:')¶
数据库的 URL。默认值仅为内存中的数据库。
如果不在内存中,还必须设置
db_encrypt_keys
。
- gateway_hostname c.PBSBackend.gateway_hostname = Unicode('')¶
运行 gateway 的节点的主机名。用于在 PBS 指令中引用本地主机。
- parallelism c.PBSBackend.parallelism = Int(20)¶
用于启动/停止集群的处理程序数量。
- status_command c.PBSBackend.status_command = Unicode('')¶
作业状态命令的路径。
- stop_clusters_on_shutdown c.PBSBackend.stop_clusters_on_shutdown = Bool(True)¶
是否在网关关闭时停止活动集群。
如果为 true,所有活动集群将在关闭网关前停止。设置为 False 将保留活动集群运行。
- submit_command c.PBSBackend.submit_command = Unicode('')¶
作业提交命令的路径。
- worker_start_failure_limit c.PBSBackend.worker_start_failure_limit = Int(3)¶
在集群被标记为失败之前,启动 worker 的失败尝试次数限制。
每个启动失败的 worker(超时除外)都会增加计数器。如果 worker 成功启动,计数器将重置。如果计数器超出此限制,集群将被标记为失败并关闭。
- worker_start_timeout c.PBSBackend.worker_start_timeout = Float(60)¶
放弃启动 Dask worker 之前的超时时间(秒)。
- worker_status_period c.PBSBackend.worker_status_period = Float(30)¶
worker 状态检查之间的时间间隔(秒)。
较小的周期会更快检测到失败的 worker,但会占用更多资源。较大的周期在出现故障时提供更慢的反馈。
SlurmClusterConfig¶
- class dask_gateway_server.backends.jobqueue.slurm.SlurmClusterConfig(**kwargs: Any)¶
在 SLURM 上运行时 Dask 集群的配置选项。
- account c.SlurmClusterConfig.account = Unicode('')¶
与每个作业关联的账号字符串 (Account string)。
- adaptive_period c.SlurmClusterConfig.adaptive_period = Float(3)¶
自适应伸缩检查之间的时间间隔(秒)。
较小的周期会降低响应集群负载变化时的伸缩延迟,但可能会导致网关服务器负载更高。
- cluster_max_cores c.SlurmClusterConfig.cluster_max_cores = Float(None)¶
此集群可用的最大核心数。
设置为
None
表示无核心限制(默认)。
- cluster_max_memory c.SlurmClusterConfig.cluster_max_memory = MemoryLimit(None)¶
此集群可用的最大内存量(字节)。允许使用以下后缀
K -> Kibibytes
M -> Mebibytes
G -> Gibibytes
T -> Tebibytes
设置为
None
表示无内存限制(默认)。
- cluster_max_workers c.SlurmClusterConfig.cluster_max_workers = Int(0)¶
此集群可用的最大 worker 数量。
请注意,这将在运行时与
cluster_max_cores
和cluster_max_memory
结合使用,以确定此集群实际可用的最大 worker 数量。
- environment c.SlurmClusterConfig.environment = Dict()¶
为 worker 和 scheduler 进程设置的环境变量。
- idle_timeout c.SlurmClusterConfig.idle_timeout = Float(0)¶
空闲集群自动关闭前的时间间隔(秒)。
设置为 0(默认)表示无空闲超时。
- partition c.SlurmClusterConfig.partition = Unicode('')¶
提交作业的分区。
- qos c.SlurmClusterConfig.qos = Unicode('')¶
与每个作业关联的 QOS 字符串。
- scheduler_cmd c.SlurmClusterConfig.scheduler_cmd = Command()¶
启动 Dask scheduler 的 shell 命令。
- scheduler_cores c.SlurmClusterConfig.scheduler_cores = Int(1)¶
Dask scheduler 可用的 CPU 核心数。
- scheduler_memory c.SlurmClusterConfig.scheduler_memory = MemoryLimit('2 G')¶
Dask scheduler 可用的字节数。允许使用以下后缀
K -> Kibibytes
M -> Mebibytes
G -> Gibibytes
T -> Tebibytes
- scheduler_setup c.SlurmClusterConfig.scheduler_setup = Unicode('')¶
在 Dask scheduler 启动前运行的脚本。
- staging_directory c.SlurmClusterConfig.staging_directory = Unicode('{home}/.dask-gateway/')¶
作业开始前用于存储文件的暂存目录 (staging directory)。
每个新集群都会创建一个子目录,用于存储该集群的临时文件。集群关闭时,该子目录将被移除。
此字段可以是模板,接收以下字段:
home(用户的主目录)
username(用户名)
- worker_cmd c.SlurmClusterConfig.worker_cmd = Command()¶
启动 Dask worker 的 shell 命令。
- worker_cores c.SlurmClusterConfig.worker_cores = Int(1)¶
Dask worker 可用的 CPU 核心数。
- worker_memory c.SlurmClusterConfig.worker_memory = MemoryLimit('2 G')¶
Dask worker 可用的字节数。允许使用以下后缀
K -> Kibibytes
M -> Mebibytes
G -> Gibibytes
T -> Tebibytes
- worker_setup c.SlurmClusterConfig.worker_setup = Unicode('')¶
在 Dask worker 启动前运行的脚本。
- worker_threads c.SlurmClusterConfig.worker_threads = Int(0)¶
Dask worker 可用的线程数。
默认为
worker_cores
。
SlurmBackend¶
- class dask_gateway_server.backends.jobqueue.slurm.SlurmBackend(**kwargs: Any)¶
用于在 Slurm 集群上部署 Dask 的后端。
- api_url c.SlurmBackend.api_url = Unicode('')¶
内部组件(例如 Dask 集群)联系网关时将使用的地址。
默认为 {proxy_address}/{prefix}/api,如果应使用不同地址,则手动设置。
- backoff_base_delay c.SlurmBackend.backoff_base_delay = Float(0.1)¶
失败后重试时的退避基础延迟(秒)。
如果操作失败,将在计算出的退避时间后重试,计算方法如下
` min(backoff_max_delay, backoff_base_delay * 2 ** num_failures) `
- backoff_max_delay c.SlurmBackend.backoff_max_delay = Float(300)¶
失败后重试时的退避策略最大延迟(秒)。
- cancel_command c.SlurmBackend.cancel_command = Unicode('')¶
作业取消命令的路径。
- check_timeouts_period c.SlurmBackend.check_timeouts_period = Float(0.0)¶
超时检查之间的时间间隔(秒)。
这不应该太小(以保持开销较低),但应该小于
cluster_heartbeat_timeout
、cluster_start_timeout
和worker_start_timeout
。
- cluster_config_class c.SlurmBackend.cluster_config_class = Type('dask_gateway_server.backends.jobqueue.slurm.SlurmClusterConfig')¶
要使用的集群配置类
- cluster_heartbeat_period c.SlurmBackend.cluster_heartbeat_period = Int(15)¶
集群到网关的心跳之间的时间间隔(秒)。
较小的周期会更快检测到失败的 worker,但会占用更多资源。较大的周期在出现故障时提供更慢的反馈。
- cluster_heartbeat_timeout c.SlurmBackend.cluster_heartbeat_timeout = Float(0.0)¶
在杀死未能发送心跳的 Dask 集群之前的超时时间(秒)。
这应该大于
cluster_heartbeat_period
。默认为2 * cluster_heartbeat_period
。
- cluster_options c.SlurmBackend.cluster_options = Union()¶
配置单个集群的用户选项。
允许用户在创建新集群时指定配置覆盖。有关更多信息,请参阅文档
- cluster_start_timeout c.SlurmBackend.cluster_start_timeout = Float(60)¶
放弃启动 Dask 集群之前的超时时间(秒)。
- cluster_status_period c.SlurmBackend.cluster_status_period = Float(30)¶
集群状态检查之间的时间间隔(秒)。
较小的周期会更快检测到失败的集群,但会占用更多资源。较大的周期在出现故障时提供更慢的反馈。
- dask_gateway_jobqueue_launcher c.SlurmBackend.dask_gateway_jobqueue_launcher = Unicode('')¶
`dask-gateway-jobqueue-launcher` 可执行文件的路径。
- db_cleanup_period c.SlurmBackend.db_cleanup_period = Float(600)¶
数据库清理任务之间的时间间隔(秒)。
这设置了从数据库中移除旧记录的频率。这不应该太小(以保持开销较低),但应该小于
db_cluster_max_age
(可能差一个数量级)。
- db_cluster_max_age c.SlurmBackend.db_cluster_max_age = Float(86400)¶
保留已完成集群记录的最大时间(秒)。
每过
db_cleanup_period
,将从数据库中移除早于db_cluster_max_age
的已完成集群。
- db_debug c.SlurmBackend.db_debug = Bool(False)¶
如果为 True,将记录所有数据库操作
- db_encrypt_keys c.SlurmBackend.db_encrypt_keys = List()¶
用于加密数据库中私有数据的密钥列表。也可以通过环境变量
DASK_GATEWAY_ENCRYPT_KEYS
设置,其值为以;
分隔的加密密钥字符串。每个密钥应该是 base64 编码的 32 字节值,并且应该是密码学安全的随机值。如果没有其他选项,可以使用 openssl 通过以下命令生成单个密钥
$ openssl rand -base64 32
单个密钥有效,可以使用多个密钥来支持密钥轮换。
- db_url c.SlurmBackend.db_url = Unicode('sqlite:///:memory:')¶
数据库的 URL。默认值仅为内存中的数据库。
如果不在内存中,还必须设置
db_encrypt_keys
。
- parallelism c.SlurmBackend.parallelism = Int(20)¶
用于启动/停止集群的处理程序数量。
- status_command c.SlurmBackend.status_command = Unicode('')¶
作业状态命令的路径。
- stop_clusters_on_shutdown c.SlurmBackend.stop_clusters_on_shutdown = Bool(True)¶
是否在网关关闭时停止活动集群。
如果为 true,所有活动集群将在关闭网关前停止。设置为 False 将保留活动集群运行。
- submit_command c.SlurmBackend.submit_command = Unicode('')¶
作业提交命令的路径。
- worker_start_failure_limit c.SlurmBackend.worker_start_failure_limit = Int(3)¶
在集群被标记为失败之前,启动 worker 的失败尝试次数限制。
每个启动失败的 worker(超时除外)都会增加计数器。如果 worker 成功启动,计数器将重置。如果计数器超出此限制,集群将被标记为失败并关闭。
- worker_start_timeout c.SlurmBackend.worker_start_timeout = Float(60)¶
放弃启动 Dask worker 之前的超时时间(秒)。
- worker_status_period c.SlurmBackend.worker_status_period = Float(30)¶
worker 状态检查之间的时间间隔(秒)。
较小的周期会更快检测到失败的 worker,但会占用更多资源。较大的周期在出现故障时提供更慢的反馈。
代理 (Proxy)¶
代理 (Proxy)¶
- class dask_gateway_server.proxy.Proxy(**kwargs: Any)¶
dask-gateway-server 代理。
- address c.Proxy.address = Unicode(':8000')¶
HTTP/HTTPS 代理应监听的地址。
格式应为
{hostname}:{port}
。其中
hostname
设置要侦听的主机名。设置为""
或"0.0.0.0"
以侦听所有接口。port
设置要侦听的端口。
- api_token c.Proxy.api_token = Unicode('')¶
代理 API 令牌。
一个 32 字节的十六进制编码随机字符串。通常使用
openssl
CLI 创建。$ openssl rand -hex 32
默认情况下从
DASK_GATEWAY_PROXY_TOKEN
环境变量加载。
- externally_managed c.Proxy.externally_managed = Bool(False)¶
代理进程是否由外部管理。
如果为 False(默认),代理进程将由 gateway 进程启动和停止。如果代理将通过某个外部管理器(例如
supervisord
)启动,请设置为 True。
- gateway_url c.Proxy.gateway_url = Unicode('')¶
代理连接到 gateway 服务器应使用的基本 URL。
- log_level c.Proxy.log_level = CaselessStrEnum('warn')¶
代理日志级别。
- max_events c.Proxy.max_events = Int(100)¶
保留的最大事件数(代理更改)。
落后于此数量的代理服务器将需要进行完全刷新。
- prefix c.Proxy.prefix = Unicode('')¶
HTTP/HTTPS 代理应提供服务的路径前缀。
此前缀将添加到所有注册到代理的路由之前。
- proxy_status_period c.Proxy.proxy_status_period = Float(30)¶
代理状态检查之间的间隔时间(秒)。
仅当
externally_managed
为 False 时适用。
- tcp_address c.Proxy.tcp_address = Unicode('')¶
TCP (scheduler) 代理应监听的地址。
应该采用
{hostname}:{port}
的形式其中
hostname
设置要侦听的主机名。设置为""
或"0.0.0.0"
以侦听所有接口。port
设置要侦听的端口。
如果未指定,将默认为
address
。
- tls_cert c.Proxy.tls_cert = Unicode('')¶
HTTP 代理公共 URL 的 TLS 证书文件路径。
设置此项时,您也应该设置
tls_key
。
- tls_key c.Proxy.tls_key = Unicode('')¶
HTTP 代理公共 URL 的 TLS 密钥文件路径。
设置此项时,您还应该设置 tls_cert。
集群管理器选项 (Cluster Manager Options)¶
- class dask_gateway_server.options.Options(*fields, handler=None)¶
暴露的集群选项的声明性规范。
- 参数 (Parameters)
*fields (Field) – 零个或多个可配置字段。
handler (可调用对象, 可选) – 一个签名形如
handler(options)
或handler(options, user)
的可调用对象,其中options
是经过验证的用户选项字典,user
是该用户的User
模型。应返回一个配置覆盖字典,以便转发给集群管理器。如果未提供,默认行为是返回未更改的 options。
示例 (Example)
这里我们暴露选项供用户配置
c.Backend.worker_cores
和c.Backend.worker_memory
。我们对每个资源设置了界限,以防止用户请求过大的 worker。handler 用于将用户指定的 GiB 内存转换为字节(正如c.Backend.worker_memory
所期望的)。from dask_gateway_server.options import Options, Integer, Float def options_handler(options): return { "worker_cores": options.worker_cores, "worker_memory": int(options.worker_memory * 2 ** 30) } c.Backend.DaskGateway.cluster_options = Options( Integer("worker_cores", default=1, min=1, max=4, label="Worker Cores"), Float("worker_memory", default=1, min=1, max=8, label="Worker Memory (GiB)"), handler=options_handler, )
- class dask_gateway_server.options.Integer(field, default=0, min=None, max=None, label=None, target=None)¶
一个整数字段,带有可选界限。
- 参数 (Parameters)
field (str) – 要使用的字段名称。必须是有效的 Python 变量名。这将是用户以编程方式设置此字段时使用的关键字(例如
"worker_cores"
)。default (int, 可选) – 默认值。默认为 0。
min (int, 可选) – 最小有效值(包含)。如果未设置则无界限。
max (int, 可选) – 最大有效值(包含)。如果未设置则无界限。
label (str, 可选) – 一个人类可读的标签,将用于 GUI 表示(例如
"Worker Cores"
)。如果未提供,将使用field
。target (str, 可选) – 在处理后的选项字典中要设置的目标参数。必须是有效的 Python 变量名。如果未提供,将使用
field
。
- class dask_gateway_server.options.Float(field, default=0, min=None, max=None, label=None, target=None)¶
一个浮点数字段,带有可选界限。
- 参数 (Parameters)
field (str) – 要使用的字段名称。必须是有效的 Python 变量名。这将是用户以编程方式设置此字段时使用的关键字(例如
"worker_cores"
)。default (float, 可选) – 默认值。默认为 0。
min (float, 可选) – 最小有效值(包含)。如果未设置则无界限。
max (float, 可选) – 最大有效值(包含)。如果未设置则无界限。
label (str, 可选) – 一个人类可读的标签,将用于 GUI 表示(例如
"Worker Cores"
)。如果未提供,将使用field
。target (str, 可选) – 在处理后的选项字典中要设置的目标参数。必须是有效的 Python 变量名。如果未提供,将使用
field
。
- class dask_gateway_server.options.String(field, default='', label=None, target=None)¶
一个字符串字段。
- 参数 (Parameters)
field (str) – 要使用的字段名称。必须是有效的 Python 变量名。这将是用户以编程方式设置此字段时使用的关键字(例如
"worker_cores"
)。default (str, optional) – 默认值。默认为空字符串 (
""
)。label (str, 可选) – 一个人类可读的标签,将用于 GUI 表示(例如
"Worker Cores"
)。如果未提供,将使用field
。target (str, 可选) – 在处理后的选项字典中要设置的目标参数。必须是有效的 Python 变量名。如果未提供,将使用
field
。
- class dask_gateway_server.options.Bool(field, default=False, label=None, target=None)¶
一个布尔字段。
- 参数 (Parameters)
field (str) – 要使用的字段名称。必须是有效的 Python 变量名。这将是用户以编程方式设置此字段时使用的关键字(例如
"worker_cores"
)。default (bool, optional) – 默认值。默认为 False。
label (str, 可选) – 一个人类可读的标签,将用于 GUI 表示(例如
"Worker Cores"
)。如果未提供,将使用field
。target (str, 可选) – 在处理后的选项字典中要设置的目标参数。必须是有效的 Python 变量名。如果未提供,将使用
field
。
- class dask_gateway_server.options.Select(field, options, default=None, label=None, target=None)¶
一个选择字段,允许用户从几个选项中选择。
- 参数 (Parameters)
field (str) – 要使用的字段名称。必须是有效的 Python 变量名。这将是用户以编程方式设置此字段时使用的关键字(例如
"worker_cores"
)。options (list) – 有效选项的列表。元素可以是 (key, value) 元组 (
(key, value)
),或者只是key
(这种情况下 value 与 key 相同)。Value 可以是任何 Python 对象,key 必须是字符串。default (str, optional) – 默认选项的 key。默认为列表中第一个选项。
label (str, 可选) – 一个人类可读的标签,将用于 GUI 表示(例如
"Worker Cores"
)。如果未提供,将使用field
。target (str, 可选) – 在处理后的选项字典中要设置的目标参数。必须是有效的 Python 变量名。如果未提供,将使用
field
。
- class dask_gateway_server.options.Mapping(field, default=None, label=None, target=None)¶
一个映射字段。
- 参数 (Parameters)
field (str) – 要使用的字段名称。必须是有效的 Python 变量名。这将是用户以编程方式设置此字段时使用的关键字(例如
"worker_cores"
)。default (dict, optional) – 默认值。默认为空字典 (
{}
)。label (str, 可选) – 一个人类可读的标签,将用于 GUI 表示(例如
"Worker Cores"
)。如果未提供,将使用field
。target (str, 可选) – 在处理后的选项字典中要设置的目标参数。必须是有效的 Python 变量名。如果未提供,将使用
field
。
模型¶
用户¶
- class dask_gateway_server.models.User(name, groups=(), admin=False)¶
一个用户记录。
- 参数 (Parameters)
name (str) – 用户名
groups (sequence, optional) – 用户所属的组集合。默认为不属于任何组。
admin (bool, optional) – 用户是否是管理员用户。默认为 False。
集群¶
- class dask_gateway_server.models.Cluster(name, username, token, options, config, status, scheduler_address='', dashboard_address='', api_address='', tls_cert=b'', tls_key=b'', start_time=None, stop_time=None)¶
一个集群记录。
- 参数 (Parameters)
name (str) – 集群名称。一个可用于在网关中标识集群的唯一字符串。
username (str) – 启动此集群的用户名。
token (str) – 与此集群关联的 API token。用于对集群与网关进行认证。
options (dict) – 启动此集群时提供的规范化配置选项集合。这些值面向用户,不一定与后端上的
ClusterConfig
选项相对应。config (dict) – 此集群的
ClusterConfig
的序列化版本。status (ClusterStatus) – 集群的状态。
scheduler_address (str) – 调度器地址。如果集群未运行,则为空字符串。
dashboard_address (str) – 仪表盘地址。如果集群未运行,或集群上没有运行仪表盘,则为空字符串。
api_address (str) – 集群的 API 地址。如果集群未运行,则为空字符串。
tls_cert (bytes) – 与集群关联的 TLS 证书凭据。
tls_key (bytes) – 与集群关联的 TLS 密钥凭据。
start_time (int or None) – 自 epoch 以来的启动时间(毫秒),如果未启动则为 None。
stop_time (int or None) – 自 epoch 以来的停止时间(毫秒),如果未停止则为 None。