软件源程序

项目的主程序:

项目中的 Dash 应用。

from dash_xinet.server import create_app

# 创建 Dash App
app = create_app(__name__,
                 title='Sanstyle Dash')

项目中的 Dash 应用运行主程序。

from dash import dcc, html
from dash.dependencies import Input, Output

from app import app
from layouts import index, record, watch, replay, about
# from examples.run import callback_example
from callbacks.record import *
from callbacks.watch import *
from callbacks.replay import *

layout = html.Article([
    dcc.Location(id='url', refresh=False),  # 定位地址栏
    html.Section(id='page-content'),  # 页面布局
])


@app.callback(Output('page-content', 'children'),
              Input('url', 'pathname'))
def display_page(pathname):
    if pathname == '/':
        return index.layout
    if pathname == '/record':
        return record.layout
    if pathname == '/watch':
        return watch.layout
    if pathname == '/replay':
        return replay.layout
    if pathname == '/about':
        return about.layout
    # elif pathname.startswith('/examples/'):
    #     return callback_example(pathname)
    # else:
    #     return '404'


app.config.suppress_callback_exceptions = True  # 用于支持多页应用

if __name__ == '__main__':
    import asyncio
    from dash_xinet.server import run_server

    port = 7777
    # app.run_server(debug=True, port=5555, threaded=True)
    # app.run_server(app, debug=True, port=5555, threaded=True)
    run = run_server(app, layout,
                     port=port, debug=True
                     )
    asyncio.run(run)
else:
    app.layout = layout
    server = app.server  # 用于 Dash 服务器部署

网页应用的主题

网页应用的 app CSS 主题。

.app-button {
    list-style-type: none;
    margin: 0;
    padding: 0;
    display: inline;
    
}

.app-button a {
    text-decoration-line: none;
}

网页应用的页面主题。

body {
    font-family: cursive;
}

h1 {
    color: hotpink
}

网页应用的布局 layouts/

网页应用的主页布局。

from dash import dcc, html
from utils.toml import load_option
from utils.nav import page_header

layout_options = load_option('options/index.toml')
header = page_header(title=layout_options.title)

layout = html.Article([header])

网页应用的 about 布局。

from dash import dcc, html
from utils.toml import load_option
from utils.nav import page_header
layout_options = load_option('options/about.toml')

header = page_header(title=layout_options.title)

layout = html.Article([header])

网页应用的 record 布局。

from dash.dash_table import DataTable
from dash import dcc, html

from utils.toml import load_option
from utils.nav import page_header
from utils.graph import create_graph

layout_options = load_option('options/record.toml')


# 与硬件通信
device = html.Aside([
    html.Button('打开', id='open-devide',
                n_clicks=0,
                className='w3-pale-green w3-round-xlarge'),
    html.Button('关闭', id='close-devide',
                n_clicks=0,
                className='w3-pale-green w3-round-xlarge'), 
    dcc.Input(id='live-update-text', type='text',
              readOnly=True, className='w3-right')
], className='w3-cell-row w3-pale-blue')

graph = create_graph('view-graph', 'feature-graph',
                     'memory-class', layout_options)

header = page_header(title=layout_options.title)
main = html.Main([dcc.Store(id='memory-frame'),  # 存储每帧数据
                  dcc.Store(id='memory-output'),  # 输出数据流
                  dcc.Store(id='memory-frames'),  # 输出筛选后的数据留
                  dcc.Interval(id='interval-frame',  # 用于数据更新
                 interval=300,
                 n_intervals=0),
                  graph,
                  # 显示数据框
                  DataTable(
    id='memory-table',
    columns=[{'name': i, 'id': i} for i in layout_options.columns]
)])

layout = html.Article([header, device, main])

网页应用的 replay 布局。

from dash import dcc, html

from utils.nav import page_header
from utils.toml import load_option
from utils.stream import Stream
from utils.graph import create_graph
# from utils.update import update_frame_layout
# 数据
layout_options = load_option('options/replay.toml')

stream = Stream(layout_options.save_path)

frame_section = html.Div([
        html.Div([
                  dcc.Input(id='replay-frame-start', value=0,
                            min=stream.min_id,
                            max=stream.max_id,
                            type="number", className='w3-col m1'),
                  dcc.Input(id='replay-frame-end', value=0,
                            min=stream.min_id,
                            max=stream.max_id,
                            readOnly='readonly',
                            type="number", className='w3-col m1')],
                 className='w3-row w3-pale-red')
    ])

# 布局
header = page_header(title=layout_options.title)

graph = create_graph('replay-view-graph', 'replay-feature-graph',
                     'replay-memory-class', layout_options)

main = html.Main([dcc.Store(id='replay-memory-frame'),  # 存储每帧数据
                  dcc.Store(id='replay-memory-output'),  # 输出数据流
                  dcc.Store(id='replay-memory-frames'),  # 输出筛选后的数据留
                  dcc.Interval(id='replay-interval-frame',  # 用于数据更新
                 interval=200,
                 n_intervals=0),
                 graph,
                 frame_section
                  ])

layout = html.Article([header, main])

网页应用的 watch 布局。

from dash import dcc, html

from utils.nav import page_header
from utils.toml import load_option
from utils.stream import Stream
from utils.graph import create_graph
from utils.update import update_frame_layout
# 数据
layout_options = load_option('options/watch.toml')

stream = Stream(layout_options.save_path)

frame_section = update_frame_layout('frame-slider', 'frame-start', 'frame-end', 'frame-run', 'frame-stop', stream)
# 布局
header = page_header(title=layout_options.title)

graph = create_graph('watch-view-graph', 'watch-feature-graph',
                     'watch-memory-class', layout_options)

main = html.Main([dcc.Store(id='watch-memory-frame'),  # 存储每帧数据
                  dcc.Store(id='watch-memory-output'),  # 输出数据流
                  dcc.Store(id='watch-memory-frames'),  # 输出筛选后的数据留
                  dcc.Interval(id='watch-interval-frame',  # 用于数据更新
                 interval=200,
                 n_intervals=0),
                 graph,
                 frame_section
                  ])

layout = html.Article([header, main])

网页应用的布局参数 options/

网页应用的主页布局参数。

class_names = [ "AEB", "CIPV", "ACC", "MCP",]
columns = [ "frame_id", "track_id", "origin", "class_name", "x", "y", "v_x", "v_y", "a_x", "a_y",]
title = "无人驾驶"

网页应用的 about 布局参数。

class_names = [ "AEB", "CIPV", "ACC", "MCP",]
columns = [ "frame_id", "track_id", "origin", "class_name", "x", "y", "v_x", "v_y", "a_x", "a_y",]
title = "APG UI"

网页应用的 record 布局参数。

save_path = "../out.h5"
class_names = [ "AEB", "CIPV", "ACC", "MCP",]
columns = [ "frame_id", "track_id", "origin", "class_name", "x", "y", "v_x", "v_y", "a_x", "a_y",]
title = "Dash UI"

网页应用的 replay 布局参数。

save_path = "../out3.h5"
class_names = [ "AEB", "CIPV", "ACC", "MCP",]
columns = [ "frame_id", "track_id", "origin", "class_name", "x", "y", "v_x", "v_y", "a_x", "a_y",]
title = "APG UI"

网页应用的 watch 布局参数。

save_path = "../out2.h5"
class_names = [ "AEB", "CIPV", "ACC", "MCP",]
columns = [ "frame_id", "track_id", "origin", "class_name", "x", "y", "v_x", "v_y", "a_x", "a_y",]
title = "APG UI"

网页应用的回调 callbacks/

网页应用的 record 回调。

import datetime
import pandas as pd
from dash.dependencies import Input, Output
from dash.exceptions import PreventUpdate

from utils.client import plot_frame, Canvas
from tools.frame import Shape

from utils.client import frame2pandas, simulate_shape
from app import app
from layouts.record import layout_options

frames = []

@app.callback(Output('live-update-text', 'value'),
              Input('interval-frame', 'n_intervals'))
def update_metrics(frame_id):
    '''⏲计时,更新时间和帧数'''
    now = datetime.datetime.now()
    now = now.strftime('%H:%M:%S')
    return f'{now} @ {frame_id}'


@app.callback(Output('memory-frame', 'data'),
              Output('memory-frames', 'data'),
              Input('interval-frame', 'n_intervals'))
def update_frame(frame_id):
    choice_class = layout_options.class_names + ['other']
    # shapes = [simulate_shape(frame_id, k, class_name)
    #           for k, class_name in enumerate(choice_class)]
    shapes = []
    k = 0
    for _ in range(10):
        for class_name in choice_class:
            shape = simulate_shape(frame_id, k, class_name)
            shapes.append(shape)
            # print(shape)
            k += 1
    
    frame = [shape.asdict() for shape in shapes]
    frame = frame2pandas(frame)
    frame.to_hdf(layout_options.save_path, key=f'frame_{frame_id}', mode='a')
    frame_dict = frame.to_dict('records')
    global frames
    frames.extend(frame_dict)
    if len(frames) > 500:
        frames = frames[-500:]
    return frame_dict, frames[1:]


@app.callback(Output('view-graph', 'figure'),
              Input('memory-frame', 'data'))
def update_view_graph_frame(frame):
    '''更新鸟瞰图'''
    canvas = Canvas()
    shapes = [Shape(**shape) for shape in frame]
    shapes = [canvas.to_shape(*shape.view) for shape in shapes]
    canvas.view.update_layout(shapes=shapes)
    canvas.update_base()
    return canvas.view


@app.callback(Output('memory-output', 'data'),
              Input('memory-frames', 'data'),
              Input('memory-class', 'value'))
def store_frame(frames, class_selected):
    df = pd.DataFrame.from_records(frames)
    if frames == None:
        raise PreventUpdate
    filtered = df[df['class_name'] == class_selected]
    return filtered.to_dict('records')


@app.callback(Output('memory-table', 'data'),
              Input('memory-output', 'data'))
def on_data_set_table(data):
    if data is None:
        raise PreventUpdate
    return data


@app.callback(Output('feature-graph', 'figure'),
              Input('memory-output', 'data'))
def on_data_set_graph(data):
    if data is None:
        raise PreventUpdate
    filtered = pd.DataFrame.from_records(data)
    fig = plot_frame(filtered)
    return fig

网页应用的 replay 回调。

import datetime
import pandas as pd
from dash.dependencies import Input, Output, State
from dash.exceptions import PreventUpdate

from utils.client import plot_frame, Canvas
from tools.frame import Shape

from app import app
from layouts.replay import stream

frames = []
frame_id = -1

@app.callback(
              Output('replay-frame-end', 'value'),
              Input('replay-interval-frame', 'n_intervals'),
              Input('replay-frame-start', 'value'))
def replay_frame_end(n_intervals, frame_start):
    global frame_id
    if frame_start is None or frame_start>stream.max_id or frame_start < stream.min_id or len(stream)==0 or frame_start+frame_id>stream.max_id:
        frame_id = 0
        raise PreventUpdate
    else:
        frame_id += 1
        return frame_start+frame_id


@app.callback(Output('replay-memory-frame', 'data'),
              Output('replay-memory-frames', 'data'),
              Input('replay-frame-end', 'value'))
def replay_frame(frame_end):
    if frame_end > stream.max_id or stream.max_id==0 or len(stream)==0:
        frame_end=0
        raise PreventUpdate
    else:
        global frames, frame_id
        frame = stream[frame_end]
        if isinstance(frame, str):
            raise PreventUpdate
        frame_dict = frame.to_dict('records')
        frames.extend(frame_dict)
        if len(frames) > 500:
            frames = frames[-500:]
        return frame_dict, frames[1:]

@app.callback(Output('replay-view-graph', 'figure'),
              Input('replay-memory-frame', 'data'))
def replay_view_graph_frame(frame):
    '''更新鸟瞰图'''
    if len(stream)==0:
        raise PreventUpdate
    canvas = Canvas()
    shapes = [Shape(**shape) for shape in frame]
    shapes = [canvas.to_shape(*shape.view) for shape in shapes]
    canvas.view.update_layout(shapes=shapes)
    canvas.update_base()
    return canvas.view


@app.callback(Output('replay-memory-output', 'data'),
              Input('replay-memory-frames', 'data'),
              Input('replay-memory-class', 'value'))
def replay_store_frame(frames, class_selected):
    if len(stream)==0:
        raise PreventUpdate
    else:
        df = pd.DataFrame.from_records(frames)
        if frames == None:
            raise PreventUpdate
        filtered = df[df['class_name'] == class_selected]
        return filtered.to_dict('records')


@app.callback(Output('replay-feature-graph', 'figure'),
              Input('replay-memory-output', 'data'))
def replay_on_data_set_graph(data):
    if data is None:
        raise PreventUpdate
    else:
        filtered = pd.DataFrame.from_records(data)
        fig = plot_frame(filtered)
        return fig

网页应用的 watch 回调。

import pandas as pd
from dash.dependencies import Input, Output, State
from dash.exceptions import PreventUpdate
from dash import callback_context

from utils.client import plot_frame, Canvas
from tools.frame import Shape

from app import app
from layouts.watch import stream


@app.callback(
    Output('frame-slider', 'value'),
    Output('frame-start', 'value'),
    Output('frame-end', 'value'),
    Input('frame-slider', 'value'),
    Input('frame-start', 'value'),
    Input('frame-end', 'value'))
def callback(slider_value, frame_start, frame_end):
    ctx = callback_context
    trigger = ctx.triggered[0]
    value = trigger['value']
    if not value:
        raise PreventUpdate
    else:
        trigger_id = trigger["prop_id"].split(".")[0]
        if trigger_id == 'frame-slider':
            slider_value = value
        elif trigger_id == 'frame-start':
            slider_value[0] = value
        elif trigger_id == 'frame-end':
            slider_value[1] = value
        frame_start = min(slider_value)
        frame_end = max(slider_value)
        slider_value = frame_start, frame_end
    return slider_value, frame_start, frame_end


@app.callback(Output('watch-view-graph', 'figure'),
              Input('frame-end', 'value'))
def replay_view_graph_frame(frame_end):
    '''更新鸟瞰图'''
    if len(stream)==0:
        raise PreventUpdate
    frame = stream[frame_end]
    canvas = Canvas()
    shapes = [Shape(**shape) for shape in frame.to_dict('records')]
    shapes = [canvas.to_shape(*shape.view) for shape in shapes]
    canvas.view.update_layout(shapes=shapes)
    canvas.update_base()
    return canvas.view


@app.callback(Output('watch-memory-output', 'data'),
              Input('frame-start', 'value'),
              Input('frame-end', 'value'),
              Input('watch-memory-class', 'value'))
def store_frame(frame_start, frame_end, class_selected):
    if frame_start == frame_end or len(stream)==0:
        raise PreventUpdate
    df = stream[frame_start:frame_end+1]
    filtered = df[df['class_name'] == class_selected]
    return filtered.to_dict('records')

@app.callback(Output('watch-feature-graph', 'figure'),
              Input('watch-memory-output', 'data'))
def on_data_set_graph(data):
    if data is None or len(stream)==0:
        raise PreventUpdate
    filtered = pd.DataFrame.from_records(data)
    fig = plot_frame(filtered)
    return fig

鸟瞰图

网页应用的鸟瞰图 view 回调。

import plotly.graph_objects as go


class CanvasMeta:
    def __init__(self):
        self.x_range = [-7.5, 7.5]
        self.y_range = [-30, 210]
        self.y_scale = 270/15
        self._fig_height = 750
        self.view = go.Figure()
        self.view.update_layout(
            plot_bgcolor='lightgrey',
            paper_bgcolor='lightslategrey',
            font_color='goldenrod'
        )

    @property
    def base_shapes(self):
        center_vline = {'x': 0,
                        'line_width': 5,
                        'line_dash': 'dash',
                        'line_color': 'green',
                        'annotation_text': '中心线',
                        'annotation': {'font_size': 12, 'font_family': '宋体'},
                        'opacity': 0.25}
        vline1 = {'x': 3,
                 'line_width': 5,
                 'line_dash': 'dash',
                 'line_color': 'green',
                 'annotation_text': '3',
                 'annotation': {'font_size': 12, 'font_family': '宋体'},
                 'opacity': 0.2}
        vline2 = {'x': -3,
                 'line_width': 5,
                 'line_dash': 'dash',
                 'line_color': 'green',
                 'annotation_text': '-3',
                 'annotation': {'font_size': 12, 'font_family': '宋体'},
                 'opacity': 0.2}
        hline1 = {'y': 0,
                  'line_width': 5,
                  'line_dash': 'dash',
                  'line_color': 'red',
                  'annotation_text': '0',
                  'opacity': 0.3}
        hline2 = {'y': 60,
                  'line_width': 2,
                  'line_dash': 'dash',
                  'line_color': 'red',
                  'annotation_text': '60',
                  'opacity': 0.3}
        hline3 = {'y': 120,
                  'line_width': 2,
                  'line_dash': 'dash',
                  'line_color': 'yellow',
                  'annotation_text': '120',
                  'opacity': 0.3}
        hline4 = {'y': 180,
                  'line_width': 2,
                  'line_dash': 'dash',
                  'line_color': 'blue',
                  'annotation_text': '180',
                  'opacity': 0.3}
        # 本车
        car = {'type': 'rect',
               'xref': 'x',
               'yref': 'y',
               'x0': -0.25,
               'x1': 0.25,
               'y0': -3.5,
               'y1': 3.5,
               'fillcolor': 'blue',
               'opacity': 0.8,
               'line': {'color': 'Lightgreen', 'width': 1}}
        return {
            'center_vline': center_vline,
            'vline1': vline1,
            'vline2': vline2,
            'hline1': hline1,
            'hline2': hline2,
            'hline3': hline3,
            'hline4': hline4,
            'host_vehicle': car
        }

    def update_base(self):
        base_shapes = self.base_shapes
        self.view.add_vline(**base_shapes['center_vline'])
        self.view.add_vline(**base_shapes['vline1'])
        self.view.add_vline(**base_shapes['vline2'])
        self.view.add_hline(**base_shapes['hline1'])
        self.view.add_hline(**base_shapes['hline2'])
        self.view.add_hline(**base_shapes['hline3'])
        self.view.add_hline(**base_shapes['hline4'])
        self.view.add_shape(**base_shapes['host_vehicle'])
        self.view.update_xaxes(range=self.x_range,
                               showline=True,
                               linewidth=2,
                               linecolor='grey',
                               mirror=True
                               )

        self.view.update_yaxes(range=self.y_range,
                               showline=True,
                               linewidth=2,
                               linecolor='grey',
                               mirror=True
                               )
        self.view.update_layout(
            margin={'l': 20, 'r': 20, 't': 20, 'b': 10},
            height=self._fig_height,
        )

    def to_bbox(self, x, y, w, h):
        return {
            'x0': x-w,
            'x1': x+w,
            'y0': y-h,
            'y1': y+h
        }

    def to_obj(self, _type, x, y, w, h,
               fillcolor, opacity,
               line_color='LightSeaGreen'):
        '''视觉目标'''
        obj = ({
            'type': _type,
            'xref': "x", 'yref': "y",
            **self.to_bbox(x, y, w, h),
            'fillcolor': fillcolor,
            'opacity': opacity,
            'line': {
                'color': line_color,
                'width': 2,

            }
        })
        return obj

    def to_visual_obj(self, x, y):
        '''视觉目标'''
        return self.to_obj('circle', x, y, 0.27, 2.8, 'orange', 0.8, 'grey')

    def to_radar_obj(self, x, y):
        '''视觉目标'''
        return self.to_obj('rect', x, y, 0.3, 3, 'white', 0.4, 'black')

    def to_fusion_obj(self, x, y):
        '''融合目标'''
        obj = self.to_obj('rect', x, y, 0.36, 3.8, 'yellow', 0.4, 'red')
        return obj

    def to_shape(self, _type, x, y):
        if _type == 'visual':
            obj = self.to_visual_obj(x, y)
        elif _type == 'radar':
            obj = self.to_radar_obj(x, y)
        elif _type == 'fusion':
            obj = self.to_fusion_obj(x, y)
        else:
            obj = {}
        return obj

工具

网页应用的鸟瞰图 view 回调。

from dataclasses import dataclass, asdict


@dataclass
class Shape:
    frame_id: int # 帧号
    track_id: int
    origin: str  # 目标物来源,取值 'visual', 'radar', 'fusion'
    class_name: str  # ACC主目标,AEB主目标,CIPV目标,MCP目标,other
    x: float  # 横向距离
    y: float  # 纵向距离
    v_x: float  # 横向速度
    v_y: float  # 纵向速度
    a_x: float  # 横向加速度
    a_y: float  # 纵向加速度

    def __repr__(self):
        obj = f'{self.origin.capitalize()}' \
            f'(track_id={self.track_id}, class_name={self.class_name}, x={self.x}, y={self.y},' \
            f'v_x={self.v_x}, v_y={self.v_y}, ' \
            f'a_x={self.a_x}, a_y={self.a_y})'
        return obj

    @property
    def view(self):
        return self.origin, self.x, self.y

    @property
    def prop(self):
        v = {'x': self.x,
             'y': self.y,
             'v_x': self.v_x,
             'v_y': self.v_y,
             'a_x': self.a_x,
             'a_y': self.a_y}
        return v

    def asdict(self):
        return asdict(self)


class Frame:
    def __init__(self, shapes):
        self.shapes = shapes

    @property
    def bunch(self):
        _bunch = {}
        for shape in self.shapes:
            class_name = shape.class_name
            _bunch.setdefault(class_name, []).append(shape)
        return _bunch