taolib.testing.email_service.providers.failover#
提供商故障转移管理器。
实现多提供商自动故障转移机制,包含简化的电路断路器逻辑。
Attributes#
Classes#
内部提供商状态跟踪。 |
|
提供商故障转移管理器。 |
Module Contents#
- taolib.testing.email_service.providers.failover.logger#
- class taolib.testing.email_service.providers.failover._ProviderState#
内部提供商状态跟踪。
- last_failure: datetime.datetime | None = None#
- cooldown_until: datetime.datetime | None = None#
- class taolib.testing.email_service.providers.failover.ProviderFailoverManager(providers: list[tuple[taolib.testing.email_service.providers.protocol.EmailProviderProtocol, int]], max_consecutive_failures: int = 3, cooldown_seconds: int = 60)#
提供商故障转移管理器。
按优先级排序提供商,自动在主提供商失败时切换到备用提供商。 每个提供商独立跟踪健康状态,失败超过阈值后进入冷却期。
- _states: list[_ProviderState]#
- _max_failures = 3#
- _cooldown_seconds = 60#
- async send(email: taolib.testing.email_service.models.email.EmailDocument) taolib.testing.email_service.providers.protocol.SendResult#
通过可用提供商发送邮件,自动故障转移。
- 参数:
email -- 邮件文档
- 返回:
发送结果
- 抛出:
AllProvidersFailedError -- 所有提供商均失败
- async send_bulk(emails: list[taolib.testing.email_service.models.email.EmailDocument]) list[taolib.testing.email_service.providers.protocol.SendResult]#
批量发送邮件。
- 参数:
emails -- 邮件文档列表
- 返回:
发送结果列表
- get_provider_statuses() list[taolib.testing.email_service.providers.protocol.ProviderHealthStatus]#
获取所有提供商状态。
- 返回:
提供商健康状态列表