后台回调#
在 Dash 2.6 中引入了对 @dash.callback
的后台回调支持。如果您使用的是较早版本的 Dash 2.x,可以使用 long_callback
进行长时间运行的回调。
大多数网络服务器默认的超时时间是 30 秒,这对于需要更长时间完成的回调来说是个问题。虽然您可以在网络服务器上增加超时时间,但这样做的风险在于,长时间的回调可能会占用您应用的所有工作线程,阻止其他请求的处理。后台回调通过在单独的后台队列中运行它们,为使用长时间运行的回调提供了可扩展的解决方案。在后台队列中,回调会按照它们进入的顺序逐一执行,由专门的队列工作线程(们)处理。
您可以通过在回调上设置 background=True
来配置回调在后台运行。设置了 background=True
的回调会使用您配置的后端来运行回调逻辑。目前有两个选项:
DiskCache
后端,它在单独的进程中运行回调逻辑,并使用diskcache
库将结果存储到磁盘上。这是本地开发时最简单的后端,但不建议在生产环境中使用。(pip install dash[diskcache]
)Celery 后端,它在 Celery worker 中运行回调逻辑,并通过像 Redis 这样的 Celery 代理将结果返回给 Dash 应用。这在生产环境中是推荐的,因为与 DiskCache 不同,它会排队后台回调,由专门的 Celery worker(们)按照接收到的顺序逐一运行。Celery 是一个广泛采用的、生产就绪的任务队列库。(
pip install dash[celery]
)
要使用后台回调,您首先需要使用所选的后端配置管理器。@dash.callback
装饰器需要这个管理器实例。您可以将管理器实例作为 background_callback_manager
关键字参数提供给 dash.Dash
app
构造函数,或者作为 @dash.callback
装饰器的 manager
参数。
在接下来的五个示例中,我们将更详细地讨论如何实现后台回调。
简单的后台回调#
这里是后台回调的简单示例,它使用按钮被点击的次数来更新 html.P
元素。该回调使用 time.sleep
来模拟长时间运行的操作。
import time
import os
from dash import Dash, DiskcacheManager, CeleryManager, Input, Output, html, callback
if 'REDIS_URL' in os.environ:
# Use Redis & Celery if REDIS_URL set as an env variable
from celery import Celery
celery_app = Celery(__name__, broker=os.environ['REDIS_URL'], backend=os.environ['REDIS_URL'])
background_callback_manager = CeleryManager(celery_app)
else:
# Diskcache for non-production apps when developing locally
import diskcache
cache = diskcache.Cache("./cache")
background_callback_manager = DiskcacheManager(cache)
app = Dash(__name__)
app.layout = html.Div(
[
html.Div([html.P(id="paragraph_id", children=["Button not clicked"])]),
html.Button(id="button_id", children="Run Job!"),
]
)
@callback(
output=Output("paragraph_id", "children"),
inputs=Input("button_id", "n_clicks"),
background=True,
manager=background_callback_manager,
)
def update_clicks(n_clicks):
time.sleep(2.0)
return [f"Clicked {n_clicks} times"]
# if __name__ == "__main__":
# app.run(debug=True)
示例 2:在回调运行期间禁用按钮#
注意,在之前的例子中,没有视觉上的指示表明后台回调正在运行。用户可能会在原始任务完成之前多次点击 “Run Job!” 按钮。你也可以在回调运行时禁用按钮,并在回调完成后重新启用它。
要做到这一点,可以使用 @dash.callback
的 running
参数。这个参数接受一个包含 3 个元素的元组列表。每个元组的第一个元素必须是一个引用 app
布局中组件属性的 Output
依赖对象。第二个元素是回调运行时属性应该设置的值,第三个元素是回调完成时属性应该设置的值。
这个例子使用 running
参数在回调运行时将按钮的 disabled
属性设置为 True
,并在回调完成时设置为 False
。
注意:在这个例子中,background_callback_manager
是提供给 dash.Dash app
构造函数而不是 @dash.callback
装饰器。
import time
import os
from dash import Dash, DiskcacheManager, CeleryManager, Input, Output, html, callback
if 'REDIS_URL' in os.environ:
# Use Redis & Celery if REDIS_URL set as an env variable
from celery import Celery
celery_app = Celery(__name__, broker=os.environ['REDIS_URL'], backend=os.environ['REDIS_URL'])
background_callback_manager = CeleryManager(celery_app)
else:
# Diskcache for non-production apps when developing locally
import diskcache
cache = diskcache.Cache("./cache")
background_callback_manager = DiskcacheManager(cache)
app = Dash(__name__, background_callback_manager=background_callback_manager)
app.layout = html.Div(
[
html.Div([html.P(id="paragraph_id", children=["Button not clicked"])]),
html.Button(id="button_id", children="Run Job!"),
]
)
@callback(
output=Output("paragraph_id", "children"),
inputs=Input("button_id", "n_clicks"),
background=True,
running=[
(Output("button_id", "disabled"), True, False),
],
)
def update_clicks(n_clicks):
time.sleep(2.0)
return [f"Clicked {n_clicks} times"]
# if __name__ == "__main__":
# app.run(debug=True)
示例 3:可取消的回调#
这个例子在之前的例子的基础上增加了支持取消长时间运行的回调,通过使用 @dash.callback
装饰器的 cancel
参数。我们将 cancel
参数设置为引用 app
布局中组件属性的 Input
依赖对象列表。当一个回调正在运行时,如果这个属性的值发生变化,回调将被取消。注意,属性的值并不重要——任何值的变化都会取消正在运行的任务(如果有的话)。
import time
import os
from dash import Dash, DiskcacheManager, CeleryManager, Input, Output, html, callback
if 'REDIS_URL' in os.environ:
# Use Redis & Celery if REDIS_URL set as an env variable
from celery import Celery
celery_app = Celery(__name__, broker=os.environ['REDIS_URL'], backend=os.environ['REDIS_URL'])
background_callback_manager = CeleryManager(celery_app)
else:
# Diskcache for non-production apps when developing locally
import diskcache
cache = diskcache.Cache("./cache")
background_callback_manager = DiskcacheManager(cache)
app = Dash(__name__, background_callback_manager=background_callback_manager)
app.layout = html.Div(
[
html.Div([html.P(id="paragraph_id", children=["Button not clicked"])]),
html.Button(id="button_id", children="Run Job!"),
html.Button(id="cancel_button_id", children="Cancel Running Job!"),
]
)
@callback(
output=Output("paragraph_id", "children"),
inputs=Input("button_id", "n_clicks"),
background=True,
running=[
(Output("button_id", "disabled"), True, False),
(Output("cancel_button_id", "disabled"), False, True),
],
cancel=[Input("cancel_button_id", "n_clicks")],
)
def update_clicks(n_clicks):
time.sleep(2.0)
return [f"Clicked {n_clicks} times"]
# if __name__ == "__main__":
# app.run(debug=True)
示例 4:进度条#
这个例子使用 @dash.callback
装饰器的 progress
参数来在回调运行时更新进度条。我们将 progress
参数设置为引用 app
布局中组件属性的 Output
依赖分组。
当一个依赖分组被分配给 @dash.callback
的 progress
参数时,被装饰的函数将以一个新的特殊参数作为函数的第一个参数被调用。这个特殊参数,在下面的示例中命名为 set_progress
,是一个函数句柄,被装饰的函数调用它以便向 app
提供关于其当前进度的更新。set_progress
函数接受一个参数,该参数对应于传递给 @dash.callback
的 progress
参数的 Output
依赖分组中指定的属性分组。
import time
import os
from dash import Dash, DiskcacheManager, CeleryManager, Input, Output, html, callback
if 'REDIS_URL' in os.environ:
# Use Redis & Celery if REDIS_URL set as an env variable
from celery import Celery
celery_app = Celery(__name__, broker=os.environ['REDIS_URL'], backend=os.environ['REDIS_URL'])
background_callback_manager = CeleryManager(celery_app)
else:
# Diskcache for non-production apps when developing locally
import diskcache
cache = diskcache.Cache("./cache")
background_callback_manager = DiskcacheManager(cache)
app = Dash(__name__, background_callback_manager=background_callback_manager)
app.layout = html.Div(
[
html.Div(
[
html.P(id="paragraph_id", children=["Button not clicked"]),
html.Progress(id="progress_bar", value="0"),
]
),
html.Button(id="button_id", children="Run Job!"),
html.Button(id="cancel_button_id", children="Cancel Running Job!"),
]
)
@callback(
output=Output("paragraph_id", "children"),
inputs=Input("button_id", "n_clicks"),
background=True,
running=[
(Output("button_id", "disabled"), True, False),
(Output("cancel_button_id", "disabled"), False, True),
(
Output("paragraph_id", "style"),
{"visibility": "hidden"},
{"visibility": "visible"},
),
(
Output("progress_bar", "style"),
{"visibility": "visible"},
{"visibility": "hidden"},
),
],
cancel=Input("cancel_button_id", "n_clicks"),
progress=[Output("progress_bar", "value"), Output("progress_bar", "max")],
prevent_initial_call=True
)
def update_progress(set_progress, n_clicks):
total = 5
for i in range(total + 1):
set_progress((str(i), str(total)))
time.sleep(1)
return f"Clicked {n_clicks} times"
# if __name__ == "__main__":
# app.run(debug=True)
示例 5:进度条图表#
@dash.callback
装饰器的 progress
参数可以用来更新任意组件属性。这个例子创建并更新了 Plotly
柱状图,以显示当前的计算状态。
这个例子还使用了 progress_default
参数来指定一组值,当回调不在进行时,这组值应该被分配给 progress 参数指定的组件。如果没有提供 progress_default
,那么在回调不运行时,progress
中指定的所有依赖属性都被设置为 None
。在这种情况下,progress_default
被设置为一个宽度为零的柱状图。
import time
import os
from dash import Dash, DiskcacheManager, CeleryManager, Input, Output, html, dcc, callback
import plotly.graph_objects as go
if 'REDIS_URL' in os.environ:
# Use Redis & Celery if REDIS_URL set as an env variable
from celery import Celery
celery_app = Celery(__name__, broker=os.environ['REDIS_URL'], backend=os.environ['REDIS_URL'])
background_callback_manager = CeleryManager(celery_app)
else:
# Diskcache for non-production apps when developing locally
import diskcache
cache = diskcache.Cache("./cache")
background_callback_manager = DiskcacheManager(cache)
def make_progress_graph(progress, total):
progress_graph = (
go.Figure(data=[go.Bar(x=[progress])])
.update_xaxes(range=[0, total])
.update_yaxes(
showticklabels=False,
)
.update_layout(height=100, margin=dict(t=20, b=40))
)
return progress_graph
app = Dash(__name__, background_callback_manager=background_callback_manager)
app.layout = html.Div(
[
html.Div(
[
html.P(id="paragraph_id", children=["Button not clicked"]),
dcc.Graph(id="progress_bar_graph", figure=make_progress_graph(0, 10)),
]
),
html.Button(id="button_id", children="Run Job!"),
html.Button(id="cancel_button_id", children="Cancel Running Job!"),
]
)
@callback(
output=Output("paragraph_id", "children"),
inputs=Input("button_id", "n_clicks"),
background=True,
running=[
(Output("button_id", "disabled"), True, False),
(Output("cancel_button_id", "disabled"), False, True),
(
Output("paragraph_id", "style"),
{"visibility": "hidden"},
{"visibility": "visible"},
),
(
Output("progress_bar_graph", "style"),
{"visibility": "visible"},
{"visibility": "hidden"},
),
],
cancel=[Input("cancel_button_id", "n_clicks")],
progress=Output("progress_bar_graph", "figure"),
progress_default=make_progress_graph(0, 10)
)
def update_progress(set_progress, n_clicks):
total = 10
for i in range(total):
time.sleep(0.5)
set_progress(make_progress_graph(i, 10))
return [f"Clicked {n_clicks} times"]
# if __name__ == "__main__":
# app.run(debug=True)
为什么使用任务队列?#
当你的应用程序部署在生产环境中时,有限数量的 CPU 会为该应用程序处理请求。在生产环境中部署时,执行时间超过 30 秒的回调通常会经历超时。即使是执行时间少于 30 秒的回调,在多个用户同时访问你的应用程序时,也可能占用所有可用的服务器资源。当所有的 CPU 都在处理回调时,新访问者看到的是一个空白屏幕,最终会出现 “服务器超时” 的消息。
任务队列是解决这些超时问题的一个方案。就像为你的 Dash 应用程序服务的 web 进程一样,任务队列也由一组专用的 CPU 工作线程运行。这些工作线程会逐一处理任务,并且不会受到超时的限制。当任务队列的工作线程在处理数据时,为 Dash 应用程序服务的 web 进程和常规回调会显示信息加载屏幕、进度条以及任务队列的结果。最终用户永远不会看到超时,并且始终看到一个响应迅速的应用程序。