{ "cells": [ { "cell_type": "markdown", "metadata": {}, "source": [ "# 后台回调\n", "\n", "参考:[background-callbacks](https://dash.plotly.com/background-callbacks)\n", "\n", "在 Dash 2.6 中引入了对 `@dash.callback` 的后台回调支持。如果您使用的是较早版本的 Dash 2.x,可以使用 `long_callback` 进行长时间运行的回调。\n", "\n", "大多数网络服务器默认的超时时间是 30 秒,这对于需要更长时间完成的回调来说是个问题。虽然您可以在网络服务器上增加超时时间,但这样做的风险在于,长时间的回调可能会占用您应用的所有工作线程,阻止其他请求的处理。后台回调通过在单独的后台队列中运行它们,为使用长时间运行的回调提供了可扩展的解决方案。在后台队列中,回调会按照它们进入的顺序逐一执行,由专门的队列工作线程(们)处理。" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "您可以通过在回调上设置 `background=True` 来配置回调在后台运行。设置了 `background=True` 的回调会使用您配置的后端来运行回调逻辑。目前有两个选项:\n", "- [`DiskCache`](http://www.grantjenks.com/docs/diskcache/index.html) 后端,它在单独的进程中运行回调逻辑,并使用 `diskcache` 库将结果存储到磁盘上。这是本地开发时最简单的后端,但不建议在生产环境中使用。(`pip install dash[diskcache]`)\n", "- [Celery](https://docs.celeryproject.org/en/stable/getting-started/introduction.html) 后端,它在 Celery worker 中运行回调逻辑,并通过像 [Redis](https://redis.io/) 这样的 Celery 代理将结果返回给 Dash 应用。这在生产环境中是推荐的,因为与 DiskCache 不同,它会排队后台回调,由专门的 Celery worker(们)按照接收到的顺序逐一运行。Celery 是一个广泛采用的、生产就绪的任务队列库。(`pip install dash[celery]`)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "要使用后台回调,您首先需要使用所选的后端配置管理器。`@dash.callback` 装饰器需要这个管理器实例。您可以将管理器实例作为 `background_callback_manager` 关键字参数提供给 `dash.Dash` `app` 构造函数,或者作为 `@dash.callback` 装饰器的 `manager` 参数。\n", "\n", "在接下来的五个示例中,我们将更详细地讨论如何实现后台回调。" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## 简单的后台回调\n", "\n", "这里是后台回调的简单示例,它使用按钮被点击的次数来更新 `html.P` 元素。该回调使用 `time.sleep` 来模拟长时间运行的操作。" ] }, { "cell_type": "code", "execution_count": 1, "metadata": {}, "outputs": [], "source": [ "import time\n", "import os\n", "\n", "from dash import Dash, DiskcacheManager, CeleryManager, Input, Output, html, callback\n", "\n", "if 'REDIS_URL' in os.environ:\n", " # Use Redis & Celery if REDIS_URL set as an env variable\n", " from celery import Celery\n", " celery_app = Celery(__name__, broker=os.environ['REDIS_URL'], backend=os.environ['REDIS_URL'])\n", " background_callback_manager = CeleryManager(celery_app)\n", "\n", "else:\n", " # Diskcache for non-production apps when developing locally\n", " import diskcache\n", " cache = diskcache.Cache(\"./cache\")\n", " background_callback_manager = DiskcacheManager(cache)\n", "\n", "app = Dash(__name__)\n", "\n", "app.layout = html.Div(\n", " [\n", " html.Div([html.P(id=\"paragraph_id\", children=[\"Button not clicked\"])]),\n", " html.Button(id=\"button_id\", children=\"Run Job!\"),\n", " ]\n", ")\n", "\n", "@callback(\n", " output=Output(\"paragraph_id\", \"children\"),\n", " inputs=Input(\"button_id\", \"n_clicks\"),\n", " background=True,\n", " manager=background_callback_manager,\n", ")\n", "def update_clicks(n_clicks):\n", " time.sleep(2.0)\n", " return [f\"Clicked {n_clicks} times\"]\n", "\n", "\n", "# if __name__ == \"__main__\":\n", "# app.run(debug=True)\n" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## 示例 2:在回调运行期间禁用按钮\n", "注意,在之前的例子中,没有视觉上的指示表明后台回调正在运行。用户可能会在原始任务完成之前多次点击 \"Run Job!\" 按钮。你也可以在回调运行时禁用按钮,并在回调完成后重新启用它。\n", "\n", "要做到这一点,可以使用 `@dash.callback` 的 `running` 参数。这个参数接受一个包含 3 个元素的元组列表。每个元组的第一个元素必须是一个引用 `app` 布局中组件属性的 `Output` 依赖对象。第二个元素是回调运行时属性应该设置的值,第三个元素是回调完成时属性应该设置的值。" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "这个例子使用 `running` 参数在回调运行时将按钮的 `disabled` 属性设置为 `True`,并在回调完成时设置为 `False`。\n", "\n", "注意:在这个例子中,`background_callback_manager` 是提供给 dash.Dash `app` 构造函数而不是 `@dash.callback` 装饰器。" ] }, { "cell_type": "code", "execution_count": 2, "metadata": {}, "outputs": [], "source": [ "import time\n", "import os\n", "\n", "from dash import Dash, DiskcacheManager, CeleryManager, Input, Output, html, callback\n", "\n", "if 'REDIS_URL' in os.environ:\n", " # Use Redis & Celery if REDIS_URL set as an env variable\n", " from celery import Celery\n", " celery_app = Celery(__name__, broker=os.environ['REDIS_URL'], backend=os.environ['REDIS_URL'])\n", " background_callback_manager = CeleryManager(celery_app)\n", "\n", "else:\n", " # Diskcache for non-production apps when developing locally\n", " import diskcache\n", " cache = diskcache.Cache(\"./cache\")\n", " background_callback_manager = DiskcacheManager(cache)\n", "\n", "app = Dash(__name__, background_callback_manager=background_callback_manager)\n", "\n", "app.layout = html.Div(\n", " [\n", " html.Div([html.P(id=\"paragraph_id\", children=[\"Button not clicked\"])]),\n", " html.Button(id=\"button_id\", children=\"Run Job!\"),\n", " ]\n", ")\n", "\n", "@callback(\n", " output=Output(\"paragraph_id\", \"children\"),\n", " inputs=Input(\"button_id\", \"n_clicks\"),\n", " background=True,\n", " running=[\n", " (Output(\"button_id\", \"disabled\"), True, False),\n", " ],\n", ")\n", "def update_clicks(n_clicks):\n", " time.sleep(2.0)\n", " return [f\"Clicked {n_clicks} times\"]\n", "\n", "\n", "# if __name__ == \"__main__\":\n", "# app.run(debug=True)\n" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## 示例 3:可取消的回调\n", "这个例子在之前的例子的基础上增加了支持取消长时间运行的回调,通过使用 `@dash.callback` 装饰器的 `cancel` 参数。我们将 `cancel` 参数设置为引用 `app` 布局中组件属性的 `Input` 依赖对象列表。当一个回调正在运行时,如果这个属性的值发生变化,回调将被取消。注意,属性的值并不重要——任何值的变化都会取消正在运行的任务(如果有的话)。" ] }, { "cell_type": "code", "execution_count": 3, "metadata": {}, "outputs": [], "source": [ "import time\n", "import os\n", "\n", "from dash import Dash, DiskcacheManager, CeleryManager, Input, Output, html, callback\n", "\n", "if 'REDIS_URL' in os.environ:\n", " # Use Redis & Celery if REDIS_URL set as an env variable\n", " from celery import Celery\n", " celery_app = Celery(__name__, broker=os.environ['REDIS_URL'], backend=os.environ['REDIS_URL'])\n", " background_callback_manager = CeleryManager(celery_app)\n", "\n", "else:\n", " # Diskcache for non-production apps when developing locally\n", " import diskcache\n", " cache = diskcache.Cache(\"./cache\")\n", " background_callback_manager = DiskcacheManager(cache)\n", "\n", "app = Dash(__name__, background_callback_manager=background_callback_manager)\n", "\n", "app.layout = html.Div(\n", " [\n", " html.Div([html.P(id=\"paragraph_id\", children=[\"Button not clicked\"])]),\n", " html.Button(id=\"button_id\", children=\"Run Job!\"),\n", " html.Button(id=\"cancel_button_id\", children=\"Cancel Running Job!\"),\n", " ]\n", ")\n", "\n", "@callback(\n", " output=Output(\"paragraph_id\", \"children\"),\n", " inputs=Input(\"button_id\", \"n_clicks\"),\n", " background=True,\n", " running=[\n", " (Output(\"button_id\", \"disabled\"), True, False),\n", " (Output(\"cancel_button_id\", \"disabled\"), False, True),\n", " ],\n", " cancel=[Input(\"cancel_button_id\", \"n_clicks\")],\n", ")\n", "def update_clicks(n_clicks):\n", " time.sleep(2.0)\n", " return [f\"Clicked {n_clicks} times\"]\n", "\n", "\n", "# if __name__ == \"__main__\":\n", "# app.run(debug=True)\n" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## 示例 4:进度条\n", "这个例子使用 `@dash.callback` 装饰器的 `progress` 参数来在回调运行时更新进度条。我们将 `progress` 参数设置为引用 `app` 布局中组件属性的 `Output` 依赖分组。\n", "\n", "当一个依赖分组被分配给 `@dash.callback `的 `progress` 参数时,被装饰的函数将以一个新的特殊参数作为函数的第一个参数被调用。这个特殊参数,在下面的示例中命名为 `set_progress`,是一个函数句柄,被装饰的函数调用它以便向 `app` 提供关于其当前进度的更新。`set_progress` 函数接受一个参数,该参数对应于传递给 `@dash.callback` 的 `progress` 参数的 `Output` 依赖分组中指定的属性分组。" ] }, { "cell_type": "code", "execution_count": 4, "metadata": {}, "outputs": [], "source": [ "import time\n", "import os\n", "\n", "from dash import Dash, DiskcacheManager, CeleryManager, Input, Output, html, callback\n", "\n", "if 'REDIS_URL' in os.environ:\n", " # Use Redis & Celery if REDIS_URL set as an env variable\n", " from celery import Celery\n", " celery_app = Celery(__name__, broker=os.environ['REDIS_URL'], backend=os.environ['REDIS_URL'])\n", " background_callback_manager = CeleryManager(celery_app)\n", "\n", "else:\n", " # Diskcache for non-production apps when developing locally\n", " import diskcache\n", " cache = diskcache.Cache(\"./cache\")\n", " background_callback_manager = DiskcacheManager(cache)\n", "\n", "app = Dash(__name__, background_callback_manager=background_callback_manager)\n", "\n", "app.layout = html.Div(\n", " [\n", " html.Div(\n", " [\n", " html.P(id=\"paragraph_id\", children=[\"Button not clicked\"]),\n", " html.Progress(id=\"progress_bar\", value=\"0\"),\n", " ]\n", " ),\n", " html.Button(id=\"button_id\", children=\"Run Job!\"),\n", " html.Button(id=\"cancel_button_id\", children=\"Cancel Running Job!\"),\n", " ]\n", ")\n", "\n", "@callback(\n", " output=Output(\"paragraph_id\", \"children\"),\n", " inputs=Input(\"button_id\", \"n_clicks\"),\n", " background=True,\n", " running=[\n", " (Output(\"button_id\", \"disabled\"), True, False),\n", " (Output(\"cancel_button_id\", \"disabled\"), False, True),\n", " (\n", " Output(\"paragraph_id\", \"style\"),\n", " {\"visibility\": \"hidden\"},\n", " {\"visibility\": \"visible\"},\n", " ),\n", " (\n", " Output(\"progress_bar\", \"style\"),\n", " {\"visibility\": \"visible\"},\n", " {\"visibility\": \"hidden\"},\n", " ),\n", " ],\n", " cancel=Input(\"cancel_button_id\", \"n_clicks\"),\n", " progress=[Output(\"progress_bar\", \"value\"), Output(\"progress_bar\", \"max\")],\n", " prevent_initial_call=True\n", ")\n", "def update_progress(set_progress, n_clicks):\n", " total = 5\n", " for i in range(total + 1):\n", " set_progress((str(i), str(total)))\n", " time.sleep(1)\n", "\n", " return f\"Clicked {n_clicks} times\"\n", "\n", "\n", "# if __name__ == \"__main__\":\n", "# app.run(debug=True)\n" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## 示例 5:进度条图表\n", "`@dash.callback` 装饰器的 `progress` 参数可以用来更新任意组件属性。这个例子创建并更新了 `Plotly` 柱状图,以显示当前的计算状态。\n", "\n", "这个例子还使用了 `progress_default` 参数来指定一组值,当回调不在进行时,这组值应该被分配给 progress 参数指定的组件。如果没有提供 `progress_default`,那么在回调不运行时,`progress` 中指定的所有依赖属性都被设置为 `None`。在这种情况下,`progress_default` 被设置为一个宽度为零的柱状图。" ] }, { "cell_type": "code", "execution_count": 5, "metadata": {}, "outputs": [], "source": [ "import time\n", "import os\n", "\n", "from dash import Dash, DiskcacheManager, CeleryManager, Input, Output, html, dcc, callback\n", "import plotly.graph_objects as go\n", "\n", "if 'REDIS_URL' in os.environ:\n", " # Use Redis & Celery if REDIS_URL set as an env variable\n", " from celery import Celery\n", " celery_app = Celery(__name__, broker=os.environ['REDIS_URL'], backend=os.environ['REDIS_URL'])\n", " background_callback_manager = CeleryManager(celery_app)\n", "\n", "else:\n", " # Diskcache for non-production apps when developing locally\n", " import diskcache\n", " cache = diskcache.Cache(\"./cache\")\n", " background_callback_manager = DiskcacheManager(cache)\n", "\n", "def make_progress_graph(progress, total):\n", " progress_graph = (\n", " go.Figure(data=[go.Bar(x=[progress])])\n", " .update_xaxes(range=[0, total])\n", " .update_yaxes(\n", " showticklabels=False,\n", " )\n", " .update_layout(height=100, margin=dict(t=20, b=40))\n", " )\n", " return progress_graph\n", "\n", "\n", "app = Dash(__name__, background_callback_manager=background_callback_manager)\n", "\n", "app.layout = html.Div(\n", " [\n", " html.Div(\n", " [\n", " html.P(id=\"paragraph_id\", children=[\"Button not clicked\"]),\n", " dcc.Graph(id=\"progress_bar_graph\", figure=make_progress_graph(0, 10)),\n", " ]\n", " ),\n", " html.Button(id=\"button_id\", children=\"Run Job!\"),\n", " html.Button(id=\"cancel_button_id\", children=\"Cancel Running Job!\"),\n", " ]\n", ")\n", "\n", "@callback(\n", " output=Output(\"paragraph_id\", \"children\"),\n", " inputs=Input(\"button_id\", \"n_clicks\"),\n", " background=True,\n", " running=[\n", " (Output(\"button_id\", \"disabled\"), True, False),\n", " (Output(\"cancel_button_id\", \"disabled\"), False, True),\n", " (\n", " Output(\"paragraph_id\", \"style\"),\n", " {\"visibility\": \"hidden\"},\n", " {\"visibility\": \"visible\"},\n", " ),\n", " (\n", " Output(\"progress_bar_graph\", \"style\"),\n", " {\"visibility\": \"visible\"},\n", " {\"visibility\": \"hidden\"},\n", " ),\n", " ],\n", " cancel=[Input(\"cancel_button_id\", \"n_clicks\")],\n", " progress=Output(\"progress_bar_graph\", \"figure\"),\n", " progress_default=make_progress_graph(0, 10)\n", ")\n", "def update_progress(set_progress, n_clicks):\n", " total = 10\n", " for i in range(total):\n", " time.sleep(0.5)\n", " set_progress(make_progress_graph(i, 10))\n", "\n", " return [f\"Clicked {n_clicks} times\"]\n", "\n", "\n", "# if __name__ == \"__main__\":\n", "# app.run(debug=True)\n" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## 为什么使用任务队列?\n", "当你的应用程序部署在生产环境中时,有限数量的 CPU 会为该应用程序处理请求。在生产环境中部署时,执行时间超过 30 秒的回调通常会经历超时。即使是执行时间少于 30 秒的回调,在多个用户同时访问你的应用程序时,也可能占用所有可用的服务器资源。当所有的 CPU 都在处理回调时,新访问者看到的是一个空白屏幕,最终会出现 \"服务器超时\" 的消息。" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "任务队列是解决这些超时问题的一个方案。就像为你的 Dash 应用程序服务的 web 进程一样,任务队列也由一组专用的 CPU 工作线程运行。这些工作线程会逐一处理任务,并且不会受到超时的限制。当任务队列的工作线程在处理数据时,为 Dash 应用程序服务的 web 进程和常规回调会显示信息加载屏幕、进度条以及任务队列的结果。最终用户永远不会看到超时,并且始终看到一个响应迅速的应用程序。" ] } ], "metadata": { "kernelspec": { "display_name": "py311", "language": "python", "name": "python3" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.11.7" } }, "nbformat": 4, "nbformat_minor": 2 }