软件源程序¶
项目的主程序:
项目中的 Dash 应用。
from dash_xinet.server import create_app
# 创建 Dash App
app = create_app(__name__,
title='Sanstyle Dash')
项目中的 Dash 应用运行主程序。
from dash import dcc, html
from dash.dependencies import Input, Output
from app import app
from layouts import index, record, watch, replay, about
# from examples.run import callback_example
from callbacks.record import *
from callbacks.watch import *
from callbacks.replay import *
layout = html.Article([
dcc.Location(id='url', refresh=False), # 定位地址栏
html.Section(id='page-content'), # 页面布局
])
@app.callback(Output('page-content', 'children'),
Input('url', 'pathname'))
def display_page(pathname):
if pathname == '/':
return index.layout
if pathname == '/record':
return record.layout
if pathname == '/watch':
return watch.layout
if pathname == '/replay':
return replay.layout
if pathname == '/about':
return about.layout
# elif pathname.startswith('/examples/'):
# return callback_example(pathname)
# else:
# return '404'
app.config.suppress_callback_exceptions = True # 用于支持多页应用
if __name__ == '__main__':
import asyncio
from dash_xinet.server import run_server
port = 7777
# app.run_server(debug=True, port=5555, threaded=True)
# app.run_server(app, debug=True, port=5555, threaded=True)
run = run_server(app, layout,
port=port, debug=True
)
asyncio.run(run)
else:
app.layout = layout
server = app.server # 用于 Dash 服务器部署
网页应用的主题¶
网页应用的 app CSS 主题。
.app-button {
list-style-type: none;
margin: 0;
padding: 0;
display: inline;
}
.app-button a {
text-decoration-line: none;
}
网页应用的页面主题。
body {
font-family: cursive;
}
h1 {
color: hotpink
}
网页应用的布局 layouts/¶
网页应用的主页布局。
from dash import dcc, html
from utils.toml import load_option
from utils.nav import page_header
layout_options = load_option('options/index.toml')
header = page_header(title=layout_options.title)
layout = html.Article([header])
网页应用的 about 布局。
from dash import dcc, html
from utils.toml import load_option
from utils.nav import page_header
layout_options = load_option('options/about.toml')
header = page_header(title=layout_options.title)
layout = html.Article([header])
网页应用的 record 布局。
from dash.dash_table import DataTable
from dash import dcc, html
from utils.toml import load_option
from utils.nav import page_header
from utils.graph import create_graph
layout_options = load_option('options/record.toml')
# 与硬件通信
device = html.Aside([
html.Button('打开', id='open-devide',
n_clicks=0,
className='w3-pale-green w3-round-xlarge'),
html.Button('关闭', id='close-devide',
n_clicks=0,
className='w3-pale-green w3-round-xlarge'),
dcc.Input(id='live-update-text', type='text',
readOnly=True, className='w3-right')
], className='w3-cell-row w3-pale-blue')
graph = create_graph('view-graph', 'feature-graph',
'memory-class', layout_options)
header = page_header(title=layout_options.title)
main = html.Main([dcc.Store(id='memory-frame'), # 存储每帧数据
dcc.Store(id='memory-output'), # 输出数据流
dcc.Store(id='memory-frames'), # 输出筛选后的数据留
dcc.Interval(id='interval-frame', # 用于数据更新
interval=300,
n_intervals=0),
graph,
# 显示数据框
DataTable(
id='memory-table',
columns=[{'name': i, 'id': i} for i in layout_options.columns]
)])
layout = html.Article([header, device, main])
网页应用的 replay 布局。
from dash import dcc, html
from utils.nav import page_header
from utils.toml import load_option
from utils.stream import Stream
from utils.graph import create_graph
# from utils.update import update_frame_layout
# 数据
layout_options = load_option('options/replay.toml')
stream = Stream(layout_options.save_path)
frame_section = html.Div([
html.Div([
dcc.Input(id='replay-frame-start', value=0,
min=stream.min_id,
max=stream.max_id,
type="number", className='w3-col m1'),
dcc.Input(id='replay-frame-end', value=0,
min=stream.min_id,
max=stream.max_id,
readOnly='readonly',
type="number", className='w3-col m1')],
className='w3-row w3-pale-red')
])
# 布局
header = page_header(title=layout_options.title)
graph = create_graph('replay-view-graph', 'replay-feature-graph',
'replay-memory-class', layout_options)
main = html.Main([dcc.Store(id='replay-memory-frame'), # 存储每帧数据
dcc.Store(id='replay-memory-output'), # 输出数据流
dcc.Store(id='replay-memory-frames'), # 输出筛选后的数据留
dcc.Interval(id='replay-interval-frame', # 用于数据更新
interval=200,
n_intervals=0),
graph,
frame_section
])
layout = html.Article([header, main])
网页应用的 watch 布局。
from dash import dcc, html
from utils.nav import page_header
from utils.toml import load_option
from utils.stream import Stream
from utils.graph import create_graph
from utils.update import update_frame_layout
# 数据
layout_options = load_option('options/watch.toml')
stream = Stream(layout_options.save_path)
frame_section = update_frame_layout('frame-slider', 'frame-start', 'frame-end', 'frame-run', 'frame-stop', stream)
# 布局
header = page_header(title=layout_options.title)
graph = create_graph('watch-view-graph', 'watch-feature-graph',
'watch-memory-class', layout_options)
main = html.Main([dcc.Store(id='watch-memory-frame'), # 存储每帧数据
dcc.Store(id='watch-memory-output'), # 输出数据流
dcc.Store(id='watch-memory-frames'), # 输出筛选后的数据留
dcc.Interval(id='watch-interval-frame', # 用于数据更新
interval=200,
n_intervals=0),
graph,
frame_section
])
layout = html.Article([header, main])
网页应用的布局参数 options/¶
网页应用的主页布局参数。
class_names = [ "AEB", "CIPV", "ACC", "MCP",]
columns = [ "frame_id", "track_id", "origin", "class_name", "x", "y", "v_x", "v_y", "a_x", "a_y",]
title = "无人驾驶"
网页应用的 about 布局参数。
class_names = [ "AEB", "CIPV", "ACC", "MCP",]
columns = [ "frame_id", "track_id", "origin", "class_name", "x", "y", "v_x", "v_y", "a_x", "a_y",]
title = "APG UI"
网页应用的 record 布局参数。
save_path = "../out.h5"
class_names = [ "AEB", "CIPV", "ACC", "MCP",]
columns = [ "frame_id", "track_id", "origin", "class_name", "x", "y", "v_x", "v_y", "a_x", "a_y",]
title = "Dash UI"
网页应用的 replay 布局参数。
save_path = "../out3.h5"
class_names = [ "AEB", "CIPV", "ACC", "MCP",]
columns = [ "frame_id", "track_id", "origin", "class_name", "x", "y", "v_x", "v_y", "a_x", "a_y",]
title = "APG UI"
网页应用的 watch 布局参数。
save_path = "../out2.h5"
class_names = [ "AEB", "CIPV", "ACC", "MCP",]
columns = [ "frame_id", "track_id", "origin", "class_name", "x", "y", "v_x", "v_y", "a_x", "a_y",]
title = "APG UI"
网页应用的回调 callbacks/¶
网页应用的 record 回调。
import datetime
import pandas as pd
from dash.dependencies import Input, Output
from dash.exceptions import PreventUpdate
from utils.client import plot_frame, Canvas
from tools.frame import Shape
from utils.client import frame2pandas, simulate_shape
from app import app
from layouts.record import layout_options
frames = []
@app.callback(Output('live-update-text', 'value'),
Input('interval-frame', 'n_intervals'))
def update_metrics(frame_id):
'''⏲计时,更新时间和帧数'''
now = datetime.datetime.now()
now = now.strftime('%H:%M:%S')
return f'{now} @ {frame_id}'
@app.callback(Output('memory-frame', 'data'),
Output('memory-frames', 'data'),
Input('interval-frame', 'n_intervals'))
def update_frame(frame_id):
choice_class = layout_options.class_names + ['other']
# shapes = [simulate_shape(frame_id, k, class_name)
# for k, class_name in enumerate(choice_class)]
shapes = []
k = 0
for _ in range(10):
for class_name in choice_class:
shape = simulate_shape(frame_id, k, class_name)
shapes.append(shape)
# print(shape)
k += 1
frame = [shape.asdict() for shape in shapes]
frame = frame2pandas(frame)
frame.to_hdf(layout_options.save_path, key=f'frame_{frame_id}', mode='a')
frame_dict = frame.to_dict('records')
global frames
frames.extend(frame_dict)
if len(frames) > 500:
frames = frames[-500:]
return frame_dict, frames[1:]
@app.callback(Output('view-graph', 'figure'),
Input('memory-frame', 'data'))
def update_view_graph_frame(frame):
'''更新鸟瞰图'''
canvas = Canvas()
shapes = [Shape(**shape) for shape in frame]
shapes = [canvas.to_shape(*shape.view) for shape in shapes]
canvas.view.update_layout(shapes=shapes)
canvas.update_base()
return canvas.view
@app.callback(Output('memory-output', 'data'),
Input('memory-frames', 'data'),
Input('memory-class', 'value'))
def store_frame(frames, class_selected):
df = pd.DataFrame.from_records(frames)
if frames == None:
raise PreventUpdate
filtered = df[df['class_name'] == class_selected]
return filtered.to_dict('records')
@app.callback(Output('memory-table', 'data'),
Input('memory-output', 'data'))
def on_data_set_table(data):
if data is None:
raise PreventUpdate
return data
@app.callback(Output('feature-graph', 'figure'),
Input('memory-output', 'data'))
def on_data_set_graph(data):
if data is None:
raise PreventUpdate
filtered = pd.DataFrame.from_records(data)
fig = plot_frame(filtered)
return fig
网页应用的 replay 回调。
import datetime
import pandas as pd
from dash.dependencies import Input, Output, State
from dash.exceptions import PreventUpdate
from utils.client import plot_frame, Canvas
from tools.frame import Shape
from app import app
from layouts.replay import stream
frames = []
frame_id = -1
@app.callback(
Output('replay-frame-end', 'value'),
Input('replay-interval-frame', 'n_intervals'),
Input('replay-frame-start', 'value'))
def replay_frame_end(n_intervals, frame_start):
global frame_id
if frame_start is None or frame_start>stream.max_id or frame_start < stream.min_id or len(stream)==0 or frame_start+frame_id>stream.max_id:
frame_id = 0
raise PreventUpdate
else:
frame_id += 1
return frame_start+frame_id
@app.callback(Output('replay-memory-frame', 'data'),
Output('replay-memory-frames', 'data'),
Input('replay-frame-end', 'value'))
def replay_frame(frame_end):
if frame_end > stream.max_id or stream.max_id==0 or len(stream)==0:
frame_end=0
raise PreventUpdate
else:
global frames, frame_id
frame = stream[frame_end]
if isinstance(frame, str):
raise PreventUpdate
frame_dict = frame.to_dict('records')
frames.extend(frame_dict)
if len(frames) > 500:
frames = frames[-500:]
return frame_dict, frames[1:]
@app.callback(Output('replay-view-graph', 'figure'),
Input('replay-memory-frame', 'data'))
def replay_view_graph_frame(frame):
'''更新鸟瞰图'''
if len(stream)==0:
raise PreventUpdate
canvas = Canvas()
shapes = [Shape(**shape) for shape in frame]
shapes = [canvas.to_shape(*shape.view) for shape in shapes]
canvas.view.update_layout(shapes=shapes)
canvas.update_base()
return canvas.view
@app.callback(Output('replay-memory-output', 'data'),
Input('replay-memory-frames', 'data'),
Input('replay-memory-class', 'value'))
def replay_store_frame(frames, class_selected):
if len(stream)==0:
raise PreventUpdate
else:
df = pd.DataFrame.from_records(frames)
if frames == None:
raise PreventUpdate
filtered = df[df['class_name'] == class_selected]
return filtered.to_dict('records')
@app.callback(Output('replay-feature-graph', 'figure'),
Input('replay-memory-output', 'data'))
def replay_on_data_set_graph(data):
if data is None:
raise PreventUpdate
else:
filtered = pd.DataFrame.from_records(data)
fig = plot_frame(filtered)
return fig
网页应用的 watch 回调。
import pandas as pd
from dash.dependencies import Input, Output, State
from dash.exceptions import PreventUpdate
from dash import callback_context
from utils.client import plot_frame, Canvas
from tools.frame import Shape
from app import app
from layouts.watch import stream
@app.callback(
Output('frame-slider', 'value'),
Output('frame-start', 'value'),
Output('frame-end', 'value'),
Input('frame-slider', 'value'),
Input('frame-start', 'value'),
Input('frame-end', 'value'))
def callback(slider_value, frame_start, frame_end):
ctx = callback_context
trigger = ctx.triggered[0]
value = trigger['value']
if not value:
raise PreventUpdate
else:
trigger_id = trigger["prop_id"].split(".")[0]
if trigger_id == 'frame-slider':
slider_value = value
elif trigger_id == 'frame-start':
slider_value[0] = value
elif trigger_id == 'frame-end':
slider_value[1] = value
frame_start = min(slider_value)
frame_end = max(slider_value)
slider_value = frame_start, frame_end
return slider_value, frame_start, frame_end
@app.callback(Output('watch-view-graph', 'figure'),
Input('frame-end', 'value'))
def replay_view_graph_frame(frame_end):
'''更新鸟瞰图'''
if len(stream)==0:
raise PreventUpdate
frame = stream[frame_end]
canvas = Canvas()
shapes = [Shape(**shape) for shape in frame.to_dict('records')]
shapes = [canvas.to_shape(*shape.view) for shape in shapes]
canvas.view.update_layout(shapes=shapes)
canvas.update_base()
return canvas.view
@app.callback(Output('watch-memory-output', 'data'),
Input('frame-start', 'value'),
Input('frame-end', 'value'),
Input('watch-memory-class', 'value'))
def store_frame(frame_start, frame_end, class_selected):
if frame_start == frame_end or len(stream)==0:
raise PreventUpdate
df = stream[frame_start:frame_end+1]
filtered = df[df['class_name'] == class_selected]
return filtered.to_dict('records')
@app.callback(Output('watch-feature-graph', 'figure'),
Input('watch-memory-output', 'data'))
def on_data_set_graph(data):
if data is None or len(stream)==0:
raise PreventUpdate
filtered = pd.DataFrame.from_records(data)
fig = plot_frame(filtered)
return fig
鸟瞰图¶
网页应用的鸟瞰图 view 回调。
import plotly.graph_objects as go
class CanvasMeta:
def __init__(self):
self.x_range = [-7.5, 7.5]
self.y_range = [-30, 210]
self.y_scale = 270/15
self._fig_height = 750
self.view = go.Figure()
self.view.update_layout(
plot_bgcolor='lightgrey',
paper_bgcolor='lightslategrey',
font_color='goldenrod'
)
@property
def base_shapes(self):
center_vline = {'x': 0,
'line_width': 5,
'line_dash': 'dash',
'line_color': 'green',
'annotation_text': '中心线',
'annotation': {'font_size': 12, 'font_family': '宋体'},
'opacity': 0.25}
vline1 = {'x': 3,
'line_width': 5,
'line_dash': 'dash',
'line_color': 'green',
'annotation_text': '3',
'annotation': {'font_size': 12, 'font_family': '宋体'},
'opacity': 0.2}
vline2 = {'x': -3,
'line_width': 5,
'line_dash': 'dash',
'line_color': 'green',
'annotation_text': '-3',
'annotation': {'font_size': 12, 'font_family': '宋体'},
'opacity': 0.2}
hline1 = {'y': 0,
'line_width': 5,
'line_dash': 'dash',
'line_color': 'red',
'annotation_text': '0',
'opacity': 0.3}
hline2 = {'y': 60,
'line_width': 2,
'line_dash': 'dash',
'line_color': 'red',
'annotation_text': '60',
'opacity': 0.3}
hline3 = {'y': 120,
'line_width': 2,
'line_dash': 'dash',
'line_color': 'yellow',
'annotation_text': '120',
'opacity': 0.3}
hline4 = {'y': 180,
'line_width': 2,
'line_dash': 'dash',
'line_color': 'blue',
'annotation_text': '180',
'opacity': 0.3}
# 本车
car = {'type': 'rect',
'xref': 'x',
'yref': 'y',
'x0': -0.25,
'x1': 0.25,
'y0': -3.5,
'y1': 3.5,
'fillcolor': 'blue',
'opacity': 0.8,
'line': {'color': 'Lightgreen', 'width': 1}}
return {
'center_vline': center_vline,
'vline1': vline1,
'vline2': vline2,
'hline1': hline1,
'hline2': hline2,
'hline3': hline3,
'hline4': hline4,
'host_vehicle': car
}
def update_base(self):
base_shapes = self.base_shapes
self.view.add_vline(**base_shapes['center_vline'])
self.view.add_vline(**base_shapes['vline1'])
self.view.add_vline(**base_shapes['vline2'])
self.view.add_hline(**base_shapes['hline1'])
self.view.add_hline(**base_shapes['hline2'])
self.view.add_hline(**base_shapes['hline3'])
self.view.add_hline(**base_shapes['hline4'])
self.view.add_shape(**base_shapes['host_vehicle'])
self.view.update_xaxes(range=self.x_range,
showline=True,
linewidth=2,
linecolor='grey',
mirror=True
)
self.view.update_yaxes(range=self.y_range,
showline=True,
linewidth=2,
linecolor='grey',
mirror=True
)
self.view.update_layout(
margin={'l': 20, 'r': 20, 't': 20, 'b': 10},
height=self._fig_height,
)
def to_bbox(self, x, y, w, h):
return {
'x0': x-w,
'x1': x+w,
'y0': y-h,
'y1': y+h
}
def to_obj(self, _type, x, y, w, h,
fillcolor, opacity,
line_color='LightSeaGreen'):
'''视觉目标'''
obj = ({
'type': _type,
'xref': "x", 'yref': "y",
**self.to_bbox(x, y, w, h),
'fillcolor': fillcolor,
'opacity': opacity,
'line': {
'color': line_color,
'width': 2,
}
})
return obj
def to_visual_obj(self, x, y):
'''视觉目标'''
return self.to_obj('circle', x, y, 0.27, 2.8, 'orange', 0.8, 'grey')
def to_radar_obj(self, x, y):
'''视觉目标'''
return self.to_obj('rect', x, y, 0.3, 3, 'white', 0.4, 'black')
def to_fusion_obj(self, x, y):
'''融合目标'''
obj = self.to_obj('rect', x, y, 0.36, 3.8, 'yellow', 0.4, 'red')
return obj
def to_shape(self, _type, x, y):
if _type == 'visual':
obj = self.to_visual_obj(x, y)
elif _type == 'radar':
obj = self.to_radar_obj(x, y)
elif _type == 'fusion':
obj = self.to_fusion_obj(x, y)
else:
obj = {}
return obj
工具¶
网页应用的鸟瞰图 view 回调。
from dataclasses import dataclass, asdict
@dataclass
class Shape:
frame_id: int # 帧号
track_id: int
origin: str # 目标物来源,取值 'visual', 'radar', 'fusion'
class_name: str # ACC主目标,AEB主目标,CIPV目标,MCP目标,other
x: float # 横向距离
y: float # 纵向距离
v_x: float # 横向速度
v_y: float # 纵向速度
a_x: float # 横向加速度
a_y: float # 纵向加速度
def __repr__(self):
obj = f'{self.origin.capitalize()}' \
f'(track_id={self.track_id}, class_name={self.class_name}, x={self.x}, y={self.y},' \
f'v_x={self.v_x}, v_y={self.v_y}, ' \
f'a_x={self.a_x}, a_y={self.a_y})'
return obj
@property
def view(self):
return self.origin, self.x, self.y
@property
def prop(self):
v = {'x': self.x,
'y': self.y,
'v_x': self.v_x,
'v_y': self.v_y,
'a_x': self.a_x,
'a_y': self.a_y}
return v
def asdict(self):
return asdict(self)
class Frame:
def __init__(self, shapes):
self.shapes = shapes
@property
def bunch(self):
_bunch = {}
for shape in self.shapes:
class_name = shape.class_name
_bunch.setdefault(class_name, []).append(shape)
return _bunch