后台回调#

参考:background-callbacks

在 Dash 2.6 中引入了对 @dash.callback 的后台回调支持。如果您使用的是较早版本的 Dash 2.x,可以使用 long_callback 进行长时间运行的回调。

大多数网络服务器默认的超时时间是 30 秒,这对于需要更长时间完成的回调来说是个问题。虽然您可以在网络服务器上增加超时时间,但这样做的风险在于,长时间的回调可能会占用您应用的所有工作线程,阻止其他请求的处理。后台回调通过在单独的后台队列中运行它们,为使用长时间运行的回调提供了可扩展的解决方案。在后台队列中,回调会按照它们进入的顺序逐一执行,由专门的队列工作线程(们)处理。

您可以通过在回调上设置 background=True 来配置回调在后台运行。设置了 background=True 的回调会使用您配置的后端来运行回调逻辑。目前有两个选项:

  • DiskCache 后端,它在单独的进程中运行回调逻辑,并使用 diskcache 库将结果存储到磁盘上。这是本地开发时最简单的后端,但不建议在生产环境中使用。(pip install dash[diskcache])

  • Celery 后端,它在 Celery worker 中运行回调逻辑,并通过像 Redis 这样的 Celery 代理将结果返回给 Dash 应用。这在生产环境中是推荐的,因为与 DiskCache 不同,它会排队后台回调,由专门的 Celery worker(们)按照接收到的顺序逐一运行。Celery 是一个广泛采用的、生产就绪的任务队列库。(pip install dash[celery])

要使用后台回调,您首先需要使用所选的后端配置管理器。@dash.callback 装饰器需要这个管理器实例。您可以将管理器实例作为 background_callback_manager 关键字参数提供给 dash.Dash app 构造函数,或者作为 @dash.callback 装饰器的 manager 参数。

在接下来的五个示例中,我们将更详细地讨论如何实现后台回调。

简单的后台回调#

这里是后台回调的简单示例,它使用按钮被点击的次数来更新 html.P 元素。该回调使用 time.sleep 来模拟长时间运行的操作。

import time
import os

from dash import Dash, DiskcacheManager, CeleryManager, Input, Output, html, callback

if 'REDIS_URL' in os.environ:
    # Use Redis & Celery if REDIS_URL set as an env variable
    from celery import Celery
    celery_app = Celery(__name__, broker=os.environ['REDIS_URL'], backend=os.environ['REDIS_URL'])
    background_callback_manager = CeleryManager(celery_app)

else:
    # Diskcache for non-production apps when developing locally
    import diskcache
    cache = diskcache.Cache("./cache")
    background_callback_manager = DiskcacheManager(cache)

app = Dash(__name__)

app.layout = html.Div(
    [
        html.Div([html.P(id="paragraph_id", children=["Button not clicked"])]),
        html.Button(id="button_id", children="Run Job!"),
    ]
)

@callback(
    output=Output("paragraph_id", "children"),
    inputs=Input("button_id", "n_clicks"),
    background=True,
    manager=background_callback_manager,
)
def update_clicks(n_clicks):
    time.sleep(2.0)
    return [f"Clicked {n_clicks} times"]


# if __name__ == "__main__":
#     app.run(debug=True)

示例 2:在回调运行期间禁用按钮#

注意,在之前的例子中,没有视觉上的指示表明后台回调正在运行。用户可能会在原始任务完成之前多次点击 “Run Job!” 按钮。你也可以在回调运行时禁用按钮,并在回调完成后重新启用它。

要做到这一点,可以使用 @dash.callbackrunning 参数。这个参数接受一个包含 3 个元素的元组列表。每个元组的第一个元素必须是一个引用 app 布局中组件属性的 Output 依赖对象。第二个元素是回调运行时属性应该设置的值,第三个元素是回调完成时属性应该设置的值。

这个例子使用 running 参数在回调运行时将按钮的 disabled 属性设置为 True,并在回调完成时设置为 False

注意:在这个例子中,background_callback_manager 是提供给 dash.Dash app 构造函数而不是 @dash.callback 装饰器。

import time
import os

from dash import Dash, DiskcacheManager, CeleryManager, Input, Output, html, callback

if 'REDIS_URL' in os.environ:
    # Use Redis & Celery if REDIS_URL set as an env variable
    from celery import Celery
    celery_app = Celery(__name__, broker=os.environ['REDIS_URL'], backend=os.environ['REDIS_URL'])
    background_callback_manager = CeleryManager(celery_app)

else:
    # Diskcache for non-production apps when developing locally
    import diskcache
    cache = diskcache.Cache("./cache")
    background_callback_manager = DiskcacheManager(cache)

app = Dash(__name__, background_callback_manager=background_callback_manager)

app.layout = html.Div(
    [
        html.Div([html.P(id="paragraph_id", children=["Button not clicked"])]),
        html.Button(id="button_id", children="Run Job!"),
    ]
)

@callback(
    output=Output("paragraph_id", "children"),
    inputs=Input("button_id", "n_clicks"),
    background=True,
    running=[
        (Output("button_id", "disabled"), True, False),
    ],
)
def update_clicks(n_clicks):
    time.sleep(2.0)
    return [f"Clicked {n_clicks} times"]


# if __name__ == "__main__":
#     app.run(debug=True)

示例 3:可取消的回调#

这个例子在之前的例子的基础上增加了支持取消长时间运行的回调,通过使用 @dash.callback 装饰器的 cancel 参数。我们将 cancel 参数设置为引用 app 布局中组件属性的 Input 依赖对象列表。当一个回调正在运行时,如果这个属性的值发生变化,回调将被取消。注意,属性的值并不重要——任何值的变化都会取消正在运行的任务(如果有的话)。

import time
import os

from dash import Dash, DiskcacheManager, CeleryManager, Input, Output, html, callback

if 'REDIS_URL' in os.environ:
    # Use Redis & Celery if REDIS_URL set as an env variable
    from celery import Celery
    celery_app = Celery(__name__, broker=os.environ['REDIS_URL'], backend=os.environ['REDIS_URL'])
    background_callback_manager = CeleryManager(celery_app)

else:
    # Diskcache for non-production apps when developing locally
    import diskcache
    cache = diskcache.Cache("./cache")
    background_callback_manager = DiskcacheManager(cache)

app = Dash(__name__, background_callback_manager=background_callback_manager)

app.layout = html.Div(
    [
        html.Div([html.P(id="paragraph_id", children=["Button not clicked"])]),
        html.Button(id="button_id", children="Run Job!"),
        html.Button(id="cancel_button_id", children="Cancel Running Job!"),
    ]
)

@callback(
    output=Output("paragraph_id", "children"),
    inputs=Input("button_id", "n_clicks"),
    background=True,
    running=[
        (Output("button_id", "disabled"), True, False),
        (Output("cancel_button_id", "disabled"), False, True),
    ],
    cancel=[Input("cancel_button_id", "n_clicks")],
)
def update_clicks(n_clicks):
    time.sleep(2.0)
    return [f"Clicked {n_clicks} times"]


# if __name__ == "__main__":
#     app.run(debug=True)

示例 4:进度条#

这个例子使用 @dash.callback 装饰器的 progress 参数来在回调运行时更新进度条。我们将 progress 参数设置为引用 app 布局中组件属性的 Output 依赖分组。

当一个依赖分组被分配给 @dash.callback progress 参数时,被装饰的函数将以一个新的特殊参数作为函数的第一个参数被调用。这个特殊参数,在下面的示例中命名为 set_progress,是一个函数句柄,被装饰的函数调用它以便向 app 提供关于其当前进度的更新。set_progress 函数接受一个参数,该参数对应于传递给 @dash.callbackprogress 参数的 Output 依赖分组中指定的属性分组。

import time
import os

from dash import Dash, DiskcacheManager, CeleryManager, Input, Output, html, callback

if 'REDIS_URL' in os.environ:
    # Use Redis & Celery if REDIS_URL set as an env variable
    from celery import Celery
    celery_app = Celery(__name__, broker=os.environ['REDIS_URL'], backend=os.environ['REDIS_URL'])
    background_callback_manager = CeleryManager(celery_app)

else:
    # Diskcache for non-production apps when developing locally
    import diskcache
    cache = diskcache.Cache("./cache")
    background_callback_manager = DiskcacheManager(cache)

app = Dash(__name__, background_callback_manager=background_callback_manager)

app.layout = html.Div(
    [
        html.Div(
            [
                html.P(id="paragraph_id", children=["Button not clicked"]),
                html.Progress(id="progress_bar", value="0"),
            ]
        ),
        html.Button(id="button_id", children="Run Job!"),
        html.Button(id="cancel_button_id", children="Cancel Running Job!"),
    ]
)

@callback(
    output=Output("paragraph_id", "children"),
    inputs=Input("button_id", "n_clicks"),
    background=True,
    running=[
        (Output("button_id", "disabled"), True, False),
        (Output("cancel_button_id", "disabled"), False, True),
        (
            Output("paragraph_id", "style"),
            {"visibility": "hidden"},
            {"visibility": "visible"},
        ),
        (
            Output("progress_bar", "style"),
            {"visibility": "visible"},
            {"visibility": "hidden"},
        ),
    ],
    cancel=Input("cancel_button_id", "n_clicks"),
    progress=[Output("progress_bar", "value"), Output("progress_bar", "max")],
    prevent_initial_call=True
)
def update_progress(set_progress, n_clicks):
    total = 5
    for i in range(total + 1):
        set_progress((str(i), str(total)))
        time.sleep(1)

    return f"Clicked {n_clicks} times"


# if __name__ == "__main__":
#     app.run(debug=True)

示例 5:进度条图表#

@dash.callback 装饰器的 progress 参数可以用来更新任意组件属性。这个例子创建并更新了 Plotly 柱状图,以显示当前的计算状态。

这个例子还使用了 progress_default 参数来指定一组值,当回调不在进行时,这组值应该被分配给 progress 参数指定的组件。如果没有提供 progress_default,那么在回调不运行时,progress 中指定的所有依赖属性都被设置为 None。在这种情况下,progress_default 被设置为一个宽度为零的柱状图。

import time
import os

from dash import Dash, DiskcacheManager, CeleryManager, Input, Output, html, dcc, callback
import plotly.graph_objects as go

if 'REDIS_URL' in os.environ:
    # Use Redis & Celery if REDIS_URL set as an env variable
    from celery import Celery
    celery_app = Celery(__name__, broker=os.environ['REDIS_URL'], backend=os.environ['REDIS_URL'])
    background_callback_manager = CeleryManager(celery_app)

else:
    # Diskcache for non-production apps when developing locally
    import diskcache
    cache = diskcache.Cache("./cache")
    background_callback_manager = DiskcacheManager(cache)

def make_progress_graph(progress, total):
    progress_graph = (
        go.Figure(data=[go.Bar(x=[progress])])
        .update_xaxes(range=[0, total])
        .update_yaxes(
            showticklabels=False,
        )
        .update_layout(height=100, margin=dict(t=20, b=40))
    )
    return progress_graph


app = Dash(__name__, background_callback_manager=background_callback_manager)

app.layout = html.Div(
    [
        html.Div(
            [
                html.P(id="paragraph_id", children=["Button not clicked"]),
                dcc.Graph(id="progress_bar_graph", figure=make_progress_graph(0, 10)),
            ]
        ),
        html.Button(id="button_id", children="Run Job!"),
        html.Button(id="cancel_button_id", children="Cancel Running Job!"),
    ]
)

@callback(
    output=Output("paragraph_id", "children"),
    inputs=Input("button_id", "n_clicks"),
    background=True,
    running=[
        (Output("button_id", "disabled"), True, False),
        (Output("cancel_button_id", "disabled"), False, True),
        (
            Output("paragraph_id", "style"),
            {"visibility": "hidden"},
            {"visibility": "visible"},
        ),
        (
            Output("progress_bar_graph", "style"),
            {"visibility": "visible"},
            {"visibility": "hidden"},
        ),
    ],
    cancel=[Input("cancel_button_id", "n_clicks")],
    progress=Output("progress_bar_graph", "figure"),
    progress_default=make_progress_graph(0, 10)
)
def update_progress(set_progress, n_clicks):
    total = 10
    for i in range(total):
        time.sleep(0.5)
        set_progress(make_progress_graph(i, 10))

    return [f"Clicked {n_clicks} times"]


# if __name__ == "__main__":
#     app.run(debug=True)

为什么使用任务队列?#

当你的应用程序部署在生产环境中时,有限数量的 CPU 会为该应用程序处理请求。在生产环境中部署时,执行时间超过 30 秒的回调通常会经历超时。即使是执行时间少于 30 秒的回调,在多个用户同时访问你的应用程序时,也可能占用所有可用的服务器资源。当所有的 CPU 都在处理回调时,新访问者看到的是一个空白屏幕,最终会出现 “服务器超时” 的消息。

任务队列是解决这些超时问题的一个方案。就像为你的 Dash 应用程序服务的 web 进程一样,任务队列也由一组专用的 CPU 工作线程运行。这些工作线程会逐一处理任务,并且不会受到超时的限制。当任务队列的工作线程在处理数据时,为 Dash 应用程序服务的 web 进程和常规回调会显示信息加载屏幕、进度条以及任务队列的结果。最终用户永远不会看到超时,并且始终看到一个响应迅速的应用程序。