taolib.remote.probe_runner 源代码

"""远端探测执行器。

默认流程:
1. 连接远端并执行 uname_cmd,验证连接并提取基础信息;
2. 在 prefix( tools_env_cmd + conda_activate_cmd ) 的叠加上下文中:

   - 执行 check_conda_cmd 判断 conda 是否可用;
   - conda 可用时执行 probe_cmd。
"""
from collections.abc import Mapping
from dataclasses import dataclass
from typing import Any

from .connection import ConnectionFactory, ConnectionLike, RunResult, fabric_connection_factory
from .errors import RemoteConfigError, RemoteExecutionError
from .probe_models import RemoteProbeCommands, RemoteProbeReport, RemoteProbeRunOptions
from .session import remote_prefixes


[文档] def validate_ssh_config_minimal(ssh_config: Mapping[str, Any]) -> None: """校验最小可用 SSH 配置。 该校验仅保证能建立最基础的 SSH 连接: - host: 非空字符串 - user: 非空字符串 """ host = ssh_config.get("host") user = ssh_config.get("user") if not isinstance(host, str) or not host.strip(): raise RemoteConfigError("SSH 配置缺少 host") if not isinstance(user, str) or not user.strip(): raise RemoteConfigError("SSH 配置缺少 user")
[文档] def run_remote_handling_interrupt(connection: ConnectionLike, command: str, **kwargs: Any) -> RunResult: """执行远端命令并尽可能将 Windows 下的中断异常归一化为 KeyboardInterrupt。 某些 Windows 终端场景下,中断可能以 `ValueError: I/O operation on closed file` 形式抛出。 为保持调用侧统一处理逻辑,这里将其映射为 KeyboardInterrupt。 """ try: return connection.run(command, **kwargs) except KeyboardInterrupt: raise except ValueError as exc: message = str(exc).lower() if "closed file" in message: raise KeyboardInterrupt from exc raise
@dataclass(frozen=True, slots=True)
[文档] class RemoteProber: """远端探测执行器(可注入连接工厂,便于测试与扩展)。"""
[文档] connection_factory: ConnectionFactory | None = None
[文档] commands: RemoteProbeCommands = RemoteProbeCommands()
[文档] options: RemoteProbeRunOptions = RemoteProbeRunOptions()
[文档] def probe(self, ssh_config: Mapping[str, Any]) -> RemoteProbeReport: validate_ssh_config_minimal(ssh_config) ssh_kwargs = dict(ssh_config) merged_run_kwargs = self.options.merged_run_kwargs() factory = self.connection_factory if factory is None or factory is fabric_connection_factory: factory = fabric_connection_factory() with factory(**ssh_kwargs) as conn: uname_result = run_remote_handling_interrupt( conn, self.commands.uname_cmd, **{"hide": True, **merged_run_kwargs}, ) if not bool(getattr(uname_result, "ok", False)): raise RemoteExecutionError( "uname 执行失败,可能连接失败或命令错误", command=self.commands.uname_cmd, ) uname = (getattr(uname_result, "stdout", "") or "").strip() if not uname: raise RemoteExecutionError( "uname 输出为空,可能连接失败或命令未执行", command=self.commands.uname_cmd, ) with remote_prefixes(conn, self.commands.tools_env_cmd, self.commands.conda_activate_cmd): combined_probe_cmd = ( f"{self.commands.check_conda_cmd} >/dev/null 2>&1 || exit 127; {self.commands.probe_cmd}" ) combined_result = run_remote_handling_interrupt( conn, combined_probe_cmd, **{"warn": True, **merged_run_kwargs}, ) if getattr(combined_result, "ok", False): return RemoteProbeReport( uname=uname, conda_available=True, probe_attempted=True, probe_ok=True, ) exited = getattr(combined_result, "exited", None) if exited == 127: if self.options.raise_on_conda_missing: raise RemoteExecutionError( "conda 不可用或未找到", command=self.commands.check_conda_cmd, ) return RemoteProbeReport( uname=uname, conda_available=False, probe_attempted=False, probe_ok=None, ) if exited is not None: if self.options.raise_on_probe_failure: raise RemoteExecutionError("探测命令执行失败", command=self.commands.probe_cmd) return RemoteProbeReport( uname=uname, conda_available=True, probe_attempted=True, probe_ok=False, ) conda_result = run_remote_handling_interrupt( conn, self.commands.check_conda_cmd, **{"warn": True, "hide": True, **merged_run_kwargs}, ) if not getattr(conda_result, "ok", False): if self.options.raise_on_conda_missing: raise RemoteExecutionError( "conda 不可用或未找到", command=self.commands.check_conda_cmd, ) return RemoteProbeReport( uname=uname, conda_available=False, probe_attempted=False, probe_ok=None, ) if self.options.raise_on_probe_failure: raise RemoteExecutionError("探测命令执行失败", command=self.commands.probe_cmd) return RemoteProbeReport( uname=uname, conda_available=True, probe_attempted=True, probe_ok=False, )
[文档] def probe_remote( ssh_config: Mapping[str, Any], *, tools_env_cmd: str, conda_activate_cmd: str, probe_cmd: str, encoding: str, check_conda_cmd: str, uname_cmd: str, connection_factory: ConnectionFactory | None, run_kwargs: Mapping[str, Any] | None, raise_on_conda_missing: bool, raise_on_probe_failure: bool, ) -> RemoteProbeReport: """兼容接口:连接远端并执行探测命令,返回结构化结果。 该函数保持既有 `taolib.remote.probe.probe_remote` 形态不变。 更推荐的扩展方式是直接使用 RemoteProber,并通过 RemoteProbeCommands/RemoteProbeRunOptions 注入自定义行为。 """ commands = RemoteProbeCommands( tools_env_cmd=tools_env_cmd, conda_activate_cmd=conda_activate_cmd, probe_cmd=probe_cmd, check_conda_cmd=check_conda_cmd, uname_cmd=uname_cmd, ) options = RemoteProbeRunOptions( encoding=encoding, run_kwargs=run_kwargs, raise_on_conda_missing=raise_on_conda_missing, raise_on_probe_failure=raise_on_probe_failure, ) factory = connection_factory or fabric_connection_factory() return RemoteProber(connection_factory=factory, commands=commands, options=options).probe(ssh_config)