T
traeai
登录
返回首页
freeCodeCamp.org

QuRT:你手机处理器内的实时操作系统 [完整手册]

8.7Score
QuRT:你手机处理器内的实时操作系统 [完整手册]

TL;DR · AI 摘要

QuRT是高通Hexagon DSP上的实时操作系统,专为微秒级确定性调度设计,负责处理手机中的音频、传感器和AI等低延迟任务,通过FastRPC与主CPU通信。

核心要点

  • QuRT是高通Hexagon DSP的专用实时操作系统,支持优先级抢占式调度,确保微秒级响应。
  • 音频处理、唤醒词检测、传感器融合等关键功能在DSP上由QuRT独立运行,减轻主CPU负载。
  • 开发者可通过Hexagon SDK和FastRPC在QuRT上部署代码,使用模拟器或开发板进行调试。

结构提纲

按章节快速跳转。

  1. §引言:QuRT的作用

    介绍QuRT作为高通DSP上的实时操作系统的核心角色。

  2. 说明QuRT在SoC中与ARM CPU协同工作的双处理器架构。

  3. 配置Hexagon SDK、工具链及仿真器以开始QuRT开发。

  4. 使用SCons和.min文件管理QuRT项目的构建流程。

  5. 涵盖线程、同步、内存、中断和定时器等RTOS核心机制。

  6. 通过FastRPC实现ARM CPU与Hexagon DSP之间的函数调用。

思维导图

用一张图看清主题之间的关系。

查看大纲文本(无障碍 / 无 JS 友好)
  • QuRT 实时操作系统
    • 架构定位
      • 运行于 Hexagon DSP
      • 与 ARM CPU 协同
    • 核心特性
      • 优先级抢占调度
      • 微秒级确定性
      • POSIX 类接口
    • 开发支持
      • Hexagon SDK
      • FastRPC 通信
      • SCons + .min 构建

金句 / Highlights

值得收藏与分享的关键句。

  • The operating system orchestrating that work on the DSP is QuRT (Qualcomm Real-Time Operating System), a POSIX-like, priority-based, preemptive RTOS purpose-built for Qualcomm's Hexagon Digital Signal

    Introduction

    ⬇︎ 下载 PNG𝕏 分享到 X
  • Where Linux is a general-purpose operating system designed for flexibility, QuRT is a precision instrument designed for deterministic, microsecond-level scheduling.

    Why QuRT Matters

    ⬇︎ 下载 PNG𝕏 分享到 X
  • The two processors communicate through a framework called FastRPC. You write code for the DSP side using the Hexagon SDK, and QuRT is the OS that executes your code on the Hexagon processor.

    Where QuRT Fits in the System

    ⬇︎ 下载 PNG𝕏 分享到 X
  • The `setup_sdk_env.source` script configures your shell with paths to the compiler, simulator, and libraries.

    Installing the Hexagon SDK

    ⬇︎ 下载 PNG𝕏 分享到 X
  • Projects live inside the SDK tree and are configured through `.min` files, which are declarative build descriptors.

    Project Structure

    ⬇︎ 下载 PNG𝕏 分享到 X
  • Timing won't match real hardware, but the simulator is valuable for validating correctness during development.

    Verifying Your Setup

    ⬇︎ 下载 PNG𝕏 分享到 X
#QuRT#Hexagon DSP#实时操作系统#高通#FastRPC
打开原文

![图片 1:手机处理器中的实时操作系统 QuRT [完整手册]](https://cdn.hashnode.com/uploads/covers/5e1e335a7a1d3fcc59028c64/e20376ee-713a-473e-946c-5c837eef0b12.png) 每部高通芯片驱动的手机中的 Hexagon DSP 负责处理唤醒词检测、传感器数据处理、降噪以及蓝牙音频流——而所有这些工作进行时,主 ARM CPU 正在运行 Android 系统。

在 DSP 上协调这些任务的操作系统就是 QuRT(高通实时操作系统),这是一种类 POSIX、基于优先级、可抢占式的实时操作系统(RTOS),专为高通的 Hexagon 数字信号处理器(DSP)设计。

本文是一份关于高通实时操作系统的实用指南。内容涵盖 QuRT 的方方面面:架构、线程创建、同步原语、内存管理、中断处理、定时器、通过 FastRPC 实现的处理器间通信,以及完整的传感器融合流水线。每个概念都配有可运行的代码示例,并解释其底层工作原理。

目录

为什么 QuRT 很重要

设想一次手机通话过程中的场景:设备同时对麦克风音频执行降噪处理、运行神经网络进行唤醒词识别、以每秒 400 次的频率读取加速度计数据,并管理蓝牙音频流。

这些任务没有一项是在主 ARM CPU 上运行的。它们全部发生在高通的 Hexagon DSP 上,而负责协调这一切的操作系统正是 QuRT

QuRT(高通实时操作系统)是一种类 POSIX、基于优先级、可抢占式的 RTOS,运行于高通 Hexagon 数字信号处理器之上。如果说 Linux 是一种为灵活性设计的通用操作系统,那么 QuRT 就是一个为确定性、微秒级调度而生的精密工具。

QuRT 在系统中的位置

图片 2:高通 SoC 内部的双处理器架构

此图展示了高通 SoC 内部的双处理器架构。左侧的 ARM CPU 运行 Android 或 Linux,负责通用应用逻辑;右侧的 Hexagon DSP 运行 QuRT,处理对延迟敏感的工作负载:音频处理、传感器融合、机器学习推理和计算卸载。

两个处理器通过名为 FastRPC 的框架进行通信。你可以使用 Hexagon SDK 编写 DSP 端的代码,而 QuRT 就是负责在 Hexagon 处理器上执行你代码的操作系统。

搭建开发环境

在编写任何 QuRT 代码之前,你需要准备工具链以及模拟器或实际硬件。

前提条件

你需要安装 Hexagon SDK(版本 3.5+ 或 4.x),这是高通官方提供的 SDK,包含 Hexagon Tools 编译器工具链。

为了运行你的代码,你可以选择使用高通开发板(例如 Robotics RB5 或 SM8250 HDK),或者使用 SDK 自带的模拟器。推荐使用运行 Ubuntu 18.04 或 20.04 的 Linux 主机进行开发。

安装 Hexagon SDK

code
# 从高通开发者门户下载 Hexagon SDK
# https://developer.qualcomm.com/software/hexagon-dsp-sdk

# 解压并运行安装程序
chmod +x qualcomm_hexagon_sdk_4_x_x_x.bin
./qualcomm_hexagon_sdk_4_x_x_x.bin

# 设置环境变量
export HEXAGON_SDK_ROOT=~/Qualcomm/Hexagon_SDK/4.x.x.x
export HEXAGON_TOOLS_ROOT=~/Qualcomm/Hexagon_SDK/4.x.x.x/tools
source $HEXAGON_SDK_ROOT/setup_sdk_env.source

该步骤将 SDK 安装到你的主目录,并设置构建系统和模拟器所需的环境变量。setup_sdk_env.source 脚本会配置你的 shell,使其包含编译器、模拟器和库文件的路径。

验证你的环境

code
# 检查 Hexagon 编译器
hexagon-clang --version

# 你应该看到类似如下输出:
# Qualcomm Hexagon Clang version 8.x.xx

# 运行 QuRT 模拟器以确认其正常工作
$HEXAGON_SDK_ROOT/tools/HEXAGON_Tools/8.x.xx/Tools/bin/hexagon-sim \
    --simulated_returnval --cosim_file \
    $HEXAGON_SDK_ROOT/libs/common/qurt/computev66/sdksim_bin/osam.cfg \
    -- $HEXAGON_SDK_ROOT/libs/common/qurt/computev66/sdksim_bin/bootimg.pbn

第一条命令确认 Hexagon Clang 编译器已正确安装并可访问。第二条命令启动 QuRT 模拟器,它类似于 Android 模拟器:让你无需物理硬件即可测试 QuRT 程序。虽然时序无法完全匹配真实硬件,但模拟器在开发过程中对验证功能正确性非常有价值。

项目结构

Hexagon SDK 使用 SCons 作为其底层构建系统。项目位于 SDK 目录树内,并通过 .min 文件进行配置,这些文件是声明式构建描述符,由 SDK 的 SCons 架构解析。

一个最小化的项目结构如下所示:

code
$HEXAGON_SDK_ROOT/examples/my_qurt_project/
├── src/
│   └── main.c              # 你的 QuRT 应用代码
├── inc/
│   └── my_module.h         # 头文件
├── hexagon.min              # 针对 Hexagon DSP 端的 SCons 构建配置
└── android.min              # 针对 ARM 端的 SCons 构建配置(若使用 FastRPC)

hexagon.min 文件用于配置 DSP 端的构建,而 android.min 则在使用 FastRPC 进行跨处理器通信时处理 ARM 端的构建。这两个文件都会被位于 $HEXAGON_SDK_ROOT/SConstruct 的 SDK 顶层 SConstruct 文件读取。在 SDK 目录树内的项目不需要单独的 MakefileSConscript 文件。

使用 SCons 进行构建配置

一个最简化的 hexagon.min 构建文件如下所示:

code
# hexagon.min - DSP 端的 SCons 构建描述符

BUILD_LIBS = libmy_qurt_app

# 源文件
libmy_qurt_app_C_SRCS = src/main.c

# QuRT 操作系统库
libmy_qurt_app_LIBS = atomic rpcmem

编译器标志

libmy_qurt_app_HEXAGON_CFLAGS = -O2 -Wall

链接 QuRT 库

libmy_qurt_app_DLLS = libmy_qurt_app_skel

code

`.min` 文件格式是 Hexagon SDK 的 SCons 构建系统专用的。`BUILD_LIBS` 指定库的目标名称,`C_SRCS` 列出源文件,`LIBS` 指定要链接的库,`HEXAGON_CFLAGS` 设置编译器标志,而 `DLLS` 定义共享库的输出名称,其中 `_skel` 后缀是 FastRPC 为 DSP 端实现定义的约定。

在底层,SDK 的 `SConstruct` 会遍历项目树,读取每个 `.min` 文件,并将其声明转换为 SCons 构建目标。你在构建时传入的 `V`(变体)参数用于选择目标架构、构建类型和工具链版本。例如,`V=hexagon_Release_dynamic_toolv84_v66` 表示:针对 Hexagon 架构、发布模式、动态链接,使用 v84 工具链并面向 v66 DSP 架构进行构建。

对于需要比 `.min` 格式更精细控制的项目,你可以编写一个独立的 `SConscript` 文件:

SConscript - 面向 QuRT 项目的独立 SCons 构建脚本

Import('env')

env = env.Clone()

添加头文件路径

env.Append(CPPPATH = ['inc'])

编译器标志

env.Append(CCFLAGS = ['-O2', '-Wall'])

构建共享库

sources = ['src/main.c'] libs = ['atomic', 'rpcmem']

env.SharedLibrary( target = 'libmy_qurt_app_skel', source = sources, LIBS = libs )

code

使用 `SConscript` 方法可以完全访问 SCons 的功能:条件编译、自定义构建步骤、依赖扫描以及多变体构建。`Import('env')` 调用导入由 SDK 顶层 `SConstruct` 配置的构建环境,该环境已经包含了 Hexagon 编译器路径、QuRT 头文件和系统库的信息。`env.Clone()` 创建一个副本,确保你的修改不会影响项目树中的其他项目。

## QuRT 编程模型

QuRT 编程的核心思想非常简单明了:

**QuRT 是一个基于优先级的抢占式实时操作系统(RTOS)。** 这意味着所有代码都在线程中运行(不存在裸机主循环)。高优先级线程总是会立即抢占低优先级线程,无需协商。相同优先级的线程则采用时间片轮转调度。

调度器是无滴答(tick-less)的,意味着它不会周期性唤醒,仅在发生状态变化时运行,例如线程阻塞、信号被设置或更高优先级线程变为就绪状态。

优先级等级(0-255,数值越小优先级越高)

000 ┃ ████ 中断处理程序(请勿修改) 001 ┃ ████ 关键系统任务 ... ┃ 064 ┃ ████ 你的高优先级音频处理 ... ┃ 128 ┃ ████ 你的中等优先级传感器融合 ... ┃ 192 ┃ ████ 你的低优先级日志记录/上报 ... ┃ 255 ┃ ████ 空闲线程(QuRT 内建后台任务)

code

此优先级图展示了 QuRT 的 256 个优先级级别的典型分配方式。优先级 0 是**最高**优先级,255 是**最低**优先级。这与 FreeRTOS 相反,在 FreeRTOS 中数值越大表示优先级越高。

中断处理程序占据最高优先级范围,系统任务紧随其后,用户线程通常位于中间范围。优先级为 255 的空闲线程仅在没有其他任务就绪时运行。

## 创建你的第一个 QuRT 线程

最简单的 QuRT 程序是创建一个线程,打印消息后退出。

/* main.c - 第一个 QuRT 程序 */

#include <stdio.h> #include <stdlib.h> #include <qurt.h>

#define STACK_SIZE 4096

/* 线程栈必须 8 字节对齐 */ static char thread_stack[STACK_SIZE] __attribute__((aligned(8)));

void my_thread_func(void *arg) { int thread_id = (int)(uintptr_t)arg;

printf("Hello from QuRT thread %d!\n", thread_id); printf("My thread ID: %lu\n", qurt_thread_get_id());

/* 线程必须显式退出 */ qurt_thread_exit(QURT_EOK); }

int main(void) { qurt_thread_t thread_id; qurt_thread_attr_t attr;

printf("Main thread starting on QuRT!\n");

/* 初始化线程属性 */ qurt_thread_attr_init(&attr);

/* 配置线程 */ qurt_thread_attr_set_name(&attr, "my_first_thread"); qurt_thread_attr_set_stack_addr(&attr, thread_stack); qurt_thread_attr_set_stack_size(&attr, STACK_SIZE); qurt_thread_attr_set_priority(&attr, 128); /* 中等优先级 */

/* 创建并启动线程 */ int result = qurt_thread_create(&thread_id, &attr, my_thread_func, (void *)42);

if (result != QURT_EOK) { printf("Thread creation failed with error: %d\n", result); return -1; }

printf("Thread created successfully! ID: %lu\n", thread_id);

/* 等待线程结束 */ int status; qurt_thread_join(thread_id, &status);

printf("Thread finished with status: %d\n", status); return 0; }

code

该程序演示了 QuRT 中线程创建的四个步骤。第一步,调用 `qurt_thread_attr_init()` 初始化线程属性结构体;第二步,程序通过设置调试名称(出现在崩溃转储中)、栈地址、栈大小和优先级来配置线程;第三步,调用 `qurt_thread_create()` 创建并立即启动线程,传入函数指针和参数;第四步,调用 `qurt_thread_join()` 阻塞当前线程,直到新线程调用 `qurt_thread_exit()` 结束。

有两个细节至关重要:QuRT 不会自动为你分配栈内存,你必须提供一个静态分配且 8 字节对齐的缓冲区;此外,每个线程在返回前都必须调用 `qurt_thread_exit()`。如果线程函数直接返回而不调用 exit,其行为是未定义的。

### 线程创建流程

qurt_thread_attr_init() │ ▼ ┌─────────────────────┐ │ 设置名称 │ │ 设置栈地址 │ │ 设置栈大小 │ │ 设置优先级 │ └─────────────────────┘ │ ▼ qurt_thread_create() │ ▼ 线程开始运行 ──► my_thread_func() │ │ ▼ ▼ qurt_thread_join() qurt_thread_exit() (等待线程退出) (通知“我已完成”)

code

此流程展示了单个线程的生命周期。属性结构体充当配置对象:你先设置所有线程参数,然后将其传递给 `qurt_thread_create()`。一旦创建完成,线程便开始执行其入口函数。当入口函数调用 `qurt_thread_exit()` 时,线程终止,任何在 `qurt_thread_join()` 中阻塞的线程将被唤醒,并接收到退出状态码。

## 线程创建的内部工作原理

大多数教程都会跳过 `qurt_thread_create()` 内部发生的事情。理解其内部机制有助于更清晰地进行调试和优先级设计决策。

### 内核在线程创建期间的操作

当你调用 `qurt_thread_create()` 时,实际上是在向 QuRT 内核发起一个**系统调用**。内核会按顺序执行五个步骤:

你的代码调用 qurt_thread_create() │ ▼ ┌──────────────────────────────────────────────────────────┐ │ 1. 验证 │ │ • 栈指针是否非空且对齐? │ │ • 栈大小是否 ≥ 最小值(通常为 2KB)? │ │ • 优先级是否在 0-255 范围内? │ │ • 入口函数指针是否非空? │ │ (若任一检查失败 → 返回 QURT_EINVALID) │ ├──────────────────────────────────────────────────────────┤ │ 2. 分配线程控制块(TCB) │ │ • QuRT 分配一个内核侧的数据结构 │ │ • 该结构包含:线程 ID、优先级、状态、保存的寄存器、 │ │ 信号掩码、互斥锁等待列表等 │ ├──────────────────────────────────────────────────────────┤ │ 3. 初始化栈帧 │ │ • 内核在你提供的栈内存顶部建立一个模拟的栈帧 │ │ • 它写入初始寄存器值: │ │ ┌──────────────────────────────────────┐ │ │ │ 栈顶(高地址) │ │ │ │ ┌──────────────────────────────────┐│ │ │ │ │ PC = my_thread_func (入口函数) ││ │ │ │ │ SP = stack_addr + stack_size ││ │ │ │ │ R0 = arg (你的 void* 参数) ││ │ │ │ │ LR = qurt_thread_exit ││ │ │ │ │ SR = 默认状态寄存器 ││ │ │ │ │ R1-R31 = 0 ││ │ │ │ └──────────────────────────────────┘│ │ │ │ ...(栈其余部分未被触及)... │ │ │ │ 栈底(低地址) │ │ │ └──────────────────────────────────────┘ │ ├──────────────────────────────────────────────────────────┤ │ 4. 插入就绪队列 │ │ • 将 TCB 添加到调度器的就绪队列中 │ │ 对应的优先级层级 │ │ • 将线程状态设为 READY(就绪) │ ├──────────────────────────────────────────────────────────┤ │ 5. 触发一次重新调度 │ │ • 调度器检查:“这个新线程的优先级是否高于当前 │ │ 正在运行的线程?” │ │ • 如果是:立即发生上下文切换 │ │ (调用线程被抢占) │ │ • 如果否:新线程将在就绪队列中等待,直到成为 │ │ 优先级最高的可运行线程 │ └──────────────────────────────────────────────────────────┘ │ ▼ qurt_thread_create() 返回给调用者 (但新线程可能已经正在运行!)

code

这一流程中最令人意外的是第 5 步。如果新线程的优先级高于创建它的线程,那么**新线程会在**`qurt_thread_create()`**返回之前就开始运行**。创建线程在调用中途即被抢占。这正是“抢占式”调度的实际含义:调度器不会等待合适时机,而是立即强制执行优先级排序。

### 栈帧如何启动你的函数

当调度器首次进行上下文切换以运行一个全新线程时,它所做的操作与任何上下文切换完全相同:从 TCB 中恢复已保存的寄存器,并跳转到保存的程序计数器(PC)。

对于一个新线程,这些寄存器是在第 3 步中由内核人为构造出来的。**PC(程序计数器)**被设为 `my_thread_func`,因此处理器跳转到你的函数。**R0** 被设为你的 `arg` 参数,因此你的函数能将其作为第一个参数接收(遵循 Hexagon 调用约定)。**SP(栈指针)**被设为你的栈顶,因此你的函数拥有可用的栈空间。而**LR(链接寄存器)**被设为 `qurt_thread_exit`,因此如果你的函数正常返回(你不应依赖这一点),它将自动进入 `qurt_thread_exit`。

表象: ────────────── 对你的线程函数而言,看起来就像有人 “正常调用”了它,并传入了你指定的参数。

真相: ────────────── 调度器恢复了一组人工构造的寄存器, 让处理器“以为”它正从一次函数调用中返回, 从而跳转至你的入口点。

code

就像在一个你从未进入过的房间里醒来,  
但有人把一切都安排得如此完美,  
以至于你没有意识到自己并非从门口走进来的。  

此图对比了程序员的心理模型(普通函数调用)与硬件层面实际发生的情况(通过寄存器恢复模拟一次函数调用)。线程函数无法区分这两种情况,而这正是关键所在。内核制造了一种无缝的错觉。

上下文切换过程详解

考虑一个具体例子:线程 A(优先级 128)创建了线程 B(优先级 64,更高优先级)。以下时间线展示了每一步发生的事情:

code
时间 ──────────────────────────────────────────────►

线程 A (优先级 128)          内核/调度器               线程 B (优先级 64)
────────────────           ────────────────           ────────────────
调用                      
qurt_thread_create()       
   │                       
   ├─► 系统调用 ──────►     验证参数
                            分配 TCB
                            设置栈帧
                            将 B 插入就绪队列
                            
                            “B (64) > A (128)? 是。”
                            
                            保存 A 的寄存器   ──┐
                            到 A 的 TCB       │
                                               │
                            加载 B 的寄存器 ◄─┘
                            来自 B 的 TCB
                            (合成的寄存器值)
                            
                            跳转到 PC ────────► my_thread_func(arg)
                                               │
                                               │ 执行工作...
                                               │ 调用 qurt_thread_exit()
                                               │
                            B 从就绪队列移除 ◄─── 退出系统调用
                            
                            “下一个是谁?A。”
                            
                            加载 A 的寄存器
   │                        跳转到 A 的 PC
   │◄──────────────────────
   │
   ├─► qurt_thread_create()
   │   返回 QURT_EOK
   │
   ▼ 继续执行...

从线程 A 的角度来看,qurt_thread_create() 只是一个耗时较长的函数调用。线程 A 完全不知道自己曾被挂起,也不知道在那段时间里线程 B 已经运行完毕。

调度器使得抢占对被抢占的线程完全透明。这是抢占式调度的一个基本特性:线程无需相互协作,甚至不必知道彼此的存在。

线程控制块内容

TCB(Thread Control Block)是内核用于跟踪每个线程的内部数据结构。你永远不会直接访问它,但理解其内容有助于解释许多 QuRT 的行为:

code
/* 概念上的 TCB 布局(简化版,非实际 QuRT 源码) */
struct qurt_tcb {
    /* 身份信息 */
    qurt_thread_t   thread_id;
    char            name[16];
    
    /* 调度相关 */
    uint8_t         base_priority;
    uint8_t         effective_priority; /* 可能因优先级继承而不同 */
    uint8_t         state;             /* READY, RUNNING, BLOCKED, SUSPENDED */
    
    /* 保存的 CPU 上下文(在上下文切换时填充) */
    uint32_t        saved_regs[32];
    uint32_t        saved_pc;
    uint32_t        saved_sp;
    uint32_t        saved_sr;
    
    /* 栈信息(用于调试和溢出检测) */
    void           *stack_base;
    size_t          stack_size;
    
    /* 阻塞信息 */
    void           *wait_object;  /* 正在等待的互斥锁/信号/管道 */
    uint32_t        wait_mask;    /* 正在等待的信号位 */
    
    /* 链表指针 */
    struct qurt_tcb *next_ready;
    struct qurt_tcb *next_waiting;
    
    /* Join 支持 */
    int             exit_status;  /* 传递给 qurt_thread_exit() 的值 */
    qurt_thread_t   joiner;      /* 在 qurt_thread_join() 中等待的线程 */
};

TCB 存储了调度器所需的一切信息:身份信息(线程 ID 和调试名称)、调度状态(基础和有效优先级、当前状态)、保存的 CPU 上下文(全部 32 个通用寄存器以及 PC、SP 和状态寄存器)、栈边界、阻塞信息(线程正在等待什么)、就绪队列和等待队列的链表指针,以及支持 join 操作的字段。

当启用优先级继承时,effective_priority 字段可能与 base_priority 不同,这将在同步章节中详细说明。

线程状态机

一个 QuRT 线程始终处于以下四种状态之一:

code
qurt_thread_create()
                           │
                           ▼
                    ┌──────────┐
          ┌─────────│  READY   │◄──────────────────────────┐
          │         └──────────┘                           │
          │              │ ▲                               │
          │  调度器选择   │ │ 被更高优先级                  │
          │  该线程      │ │ 线程抢占                       │
          │              │ │                              │
          │              ▼ │                               │
          │         ┌──────────┐     信号/互斥锁/          │
          │         │ RUNNING  │     定时器事件            │
          │         └──────────┘     唤醒线程               │
          │              │                                 │
          │  线程调用    │                                 │
          │  阻塞式 API: │                                 │
          │  - mutex_lock│                                 │
          │  - signal_   │                                 │
          │    wait      │                                 │
          │  - pipe_     │                                 │
          │    receive   ▼                                 │
          │         ┌──────────┐                           │
          │         │ BLOCKED  │───────────────────────────┘
          │         └──────────┘
          │
          │  qurt_thread_exit()
          │         │
          │         ▼
          │    ┌──────────┐
          └───►│  DEAD    │
               └──────────┘
  • READY 表示线程可以运行,正在等待一个硬件线程槽位。
  • RUNNING 表示线程当前正在某个硬件线程上执行(每个硬件线程槽位在同一时间只能有一个线程处于此状态)。
  • BLOCKED 表示线程正在等待某个外部事件:例如互斥锁被释放、信号被触发或定时器超时。
  • DEAD 表示线程已调用 qurt_thread_exit()。如果有其他线程对该线程调用了 qurt_thread_join(),则该线程将接收到退出状态。

硬件线程槽位

Hexagon DSP 是一种支持硬件多线程的处理器,每个核心包含多个硬件线程槽位(通常为 2 到 4 个)。这意味着 QuRT 可以在单个核心上真正同时运行多个线程,而不仅仅是通过时间片轮转实现并发。

code
┌─────────────────────────────────────────┐
│          Hexagon DSP 核心               │
│                                         │
│  ┌───────────┐  ┌───────────┐           │
│  │ 硬件线程  │  │ 硬件线程  │           │
│  │ 槽位 0    │  │ 槽位 1    │  ...      │
│  │           │  │           │           │
│  │ 线程 A    │  │ 线程 B    │           │
│  │ (运行中)  │  │ (运行中)  │           │
│  └───────────┘  └───────────┘           │
│                                         │
│  就绪队列: [C, D, E, F, ...]            │
│  调度器使用最高优先级的就绪线程         │
│  填充硬件槽位                           │
└─────────────────────────────────────────┘

此图展示了一个具有两个硬件线程槽位的 Hexagon 核心。每个槽位可独立且同时地执行一个线程。调度器会用最高优先级的就绪线程填充这些硬件槽位。当软件线程数量超过硬件槽位数量时,较低优先级的线程会被时间片调度。但最高优先级的线程将独占硬件槽位,完全无需上下文切换。

在典型的 Hexagon v66 架构中(支持 4 个硬件线程),前 4 个最高优先级的线程各自拥有独立的执行流水线。仅当某个线程阻塞,或有更高优先级的线程被唤醒并抢占其槽位时,才会发生上下文切换。这正是 QuRT 实现极低调度延迟的原因。

完整的线程生命周期

以下代码展示了完整的线程生命周期,并附有注释说明 QuRT 在每一步的操作:

code
static char stack[8192] __attribute__((aligned(8)));

void my_func(void *arg)
{
    /* 状态:RUNNING。栈已初始化,R0 寄存器包含 arg 参数 */
    int val = *(int *)arg;

    qurt_mutex_lock(&some_mutex);
    /* 如果互斥锁已被占用:状态变为 BLOCKED,直到持有者解锁 */

    shared_data = val;
    qurt_mutex_unlock(&some_mutex);

    qurt_thread_exit(QURT_EOK);
    /* 状态变为 DEAD。等待 join 的线程(若有)将被唤醒 */
}

int main(void)
{
    qurt_thread_t tid;
    qurt_thread_attr_t attr;
    int my_arg = 42;

    qurt_thread_attr_init(&attr);
    qurt_thread_attr_set_stack_addr(&attr, stack);
    qurt_thread_attr_set_stack_size(&attr, sizeof(stack));
    qurt_thread_attr_set_priority(&attr, 100);

    qurt_thread_create(&tid, &attr, my_func, &my_arg);
    /* 若 my_func 的优先级(100)高于 main:main 在此处被抢占 */

    int status;
    qurt_thread_join(tid, &status);
    /* 阻塞直到 my_func 退出;若已退出则立即返回 */
    
    return 0;
}

my_func 开始运行时,内核已经设置好其寄存器,使得 arg 包含指向 my_arg 的指针。此时线程状态为 RUNNING。

当它调用 qurt_mutex_lock() 时,可能发生两种情况之一:如果互斥锁可用,线程成功获取并继续执行;如果互斥锁正被其他线程持有,则调用线程的状态变为 BLOCKED,其寄存器内容被保存到 TCB(线程控制块)中,调度器会选择下一个最高优先级的就绪线程运行。

当持有互斥锁的线程调用 qurt_mutex_unlock() 后,被阻塞的线程将重新进入 READY 状态,调度器会重新评估优先级并决定是否切换。

main 函数一侧,qurt_thread_create() 是否立即返回取决于 my_func 是否完成。如果 my_func 的优先级高于 main,调度器会立即抢占 main,此时 qurt_thread_create() 要等到 my_func 执行完毕(或阻塞)后才会返回。而 qurt_thread_join() 会阻塞 main 直到 my_func 退出,或者如果 my_func 已经退出,则立即返回。

关于栈大小的一个重要提示:如果你将 STACK_SIZE 设置得过小(例如 256 字节),而你的线程调用了 printf,就会导致 栈溢出。QuRT 不会为你检测栈溢出,崩溃将是静默的且难以诊断。始终为你的线程分配至少 8192 字节的栈空间,并在后续通过性能分析后再进行优化。

在模拟器上构建和运行

Hexagon SDK 提供了一个 make 包装脚本,其底层调用 SCons。以下两个命令效果相同:

code
# 选项 1:使用 make 包装脚本(内部调用 SCons)
cd $HEXAGON_SDK_ROOT
make V=hexagon_Release_dynamic_toolv84_v66 \
     tree=my_qurt_project

# 选项 2:直接调用 SCons
cd $HEXAGON_SDK_ROOT
python tools/build/scons/scons.py \
    V=hexagon_Release_dynamic_toolv84_v66 \
    my_qurt_project

这两个命令都会使用 v84 工具链以发布模式为 Hexagon v66 架构构建项目。make 包装脚本只是一个便利层:它解析 V=tree= 参数并将其转发给 SCons。直接使用 SCons 可以访问更多选项,例如用于并行构建的 --jobs=N 和用于输出完整编译器命令的 --verbose

code
# 在模拟器上运行
hexagon-sim --simulated_returnval \
    --cosim_file osam.cfg \
    -- bootimg.pbn \
    -- my_qurt_app.so

hexagon-sim 命令启动 QuRT 模拟器并加载你已编译的应用程序。--simulated_returnval 标志用于捕获你的 main 函数的返回值,--cosim_file 指向 QuRT 操作系统配置文件。

使用多个线程

真正的 QuRT 应用通常包含多个同时运行的线程。生产者-消费者模式是 DSP 编程中最常见的模式之一:一个线程从硬件读取数据,另一个线程处理这些数据。

code
#include <stdio.h>
#include <qurt.h>

#define STACK_SIZE    8192
#define BUFFER_SIZE   16
#define NUM_ITEMS     100

/* 线程栈 */
static char producer_stack[STACK_SIZE] __attribute__((aligned(8)));
static char consumer_stack[STACK_SIZE] __attribute__((aligned(8)));

/* 共享缓冲区 */
static int buffer[BUFFER_SIZE];
static int head = 0;
static int tail = 0;
static int count = 0;

/* 同步原语 */
qurt_mutex_t buffer_mutex;
qurt_cond_t  not_full;
qurt_cond_t  not_empty;

void producer_thread(void *arg)
{
    for (int i = 0; i < NUM_ITEMS; i++) {
        qurt_mutex_lock(&buffer_mutex);

        /* 等待缓冲区有空位 */
        while (count == BUFFER_SIZE) {
            qurt_cond_wait(&not_full, &buffer_mutex);
        }

        /* 生产一个项目 */
        buffer[head] = i;
        head = (head + 1) % BUFFER_SIZE;
        count++;

        printf("[Producer] Put item %d (buffer count: %d)\n", i, count);

        /* 通知消费者已有数据可用 */
        qurt_cond_signal(&not_empty);
        qurt_mutex_unlock(&buffer_mutex);
    }

    qurt_thread_exit(QURT_EOK);
}

void consumer_thread(void *arg)
{
    for (int i = 0; i < NUM_ITEMS; i++) {
        qurt_mutex_lock(&buffer_mutex);

        /* 等待缓冲区中有数据 */
        while (count == 0) {
            qurt_cond_wait(&not_empty, &buffer_mutex);
        }

        /* 消费一个项目 */
        int item = buffer[tail];
        tail = (tail + 1) % BUFFER_SIZE;
        count--;

        printf("[Consumer] Got item %d (buffer count: %d)\n", item, count);

        /* 通知生产者已有空间可用 */
        qurt_cond_signal(&not_full);
        qurt_mutex_unlock(&buffer_mutex);
    }

    qurt_thread_exit(QURT_EOK);
}

int main(void)
{
    qurt_thread_t producer, consumer;
    qurt_thread_attr_t attr;

    /* 在创建线程前初始化同步原语 */
    qurt_mutex_init(&buffer_mutex);
    qurt_cond_init(&not_full);
    qurt_cond_init(&not_empty);

    /* 创建生产者(更高优先级) */
    qurt_thread_attr_init(&attr);
    qurt_thread_attr_set_name(&attr, "producer");
    qurt_thread_attr_set_stack_addr(&attr, producer_stack);
    qurt_thread_attr_set_stack_size(&attr, STACK_SIZE);
    qurt_thread_attr_set_priority(&attr, 100);
    qurt_thread_create(&producer, &attr, producer_thread, NULL);

    /* 创建消费者(较低优先级) */
    qurt_thread_attr_init(&attr);
    qurt_thread_attr_set_name(&attr, "consumer");
    qurt_thread_attr_set_stack_addr(&attr, consumer_stack);
    qurt_thread_attr_set_stack_size(&attr, STACK_SIZE);
    qurt_thread_attr_set_priority(&attr, 110);
    qurt_thread_create(&consumer, &attr, consumer_thread, NULL);

    /* 等待两个线程完成 */
    int status;
    qurt_thread_join(producer, &status);
    qurt_thread_join(consumer, &status);

    /* 清理资源 */
    qurt_mutex_destroy(&buffer_mutex);
    qurt_cond_destroy(&not_full);
    qurt_cond_destroy(&not_empty);

    printf("All done! Produced and consumed %d items.\n", NUM_ITEMS);
    return 0;
}

此代码实现了一个经典的有界缓冲区生产者-消费者模式。共享缓冲区是一个由互斥锁保护的、包含 16 个整数的循环数组。生产者将项目写入缓冲区,消费者从中读取。

当缓冲区满时,生产者会在 not_full 条件变量上阻塞;当缓冲区为空时,消费者会在 not_empty 上阻塞。每一方在修改缓冲区后都会通知另一方。

生产者的优先级(100)高于消费者(110),这是有意为之。在实际的 DSP 场景中,生产者通常从硬件(如麦克风或传感器)读取数据。如果生产者错过了某个硬件采样,该数据将永久丢失。而消费者总可以稍后处理数据。这是一个通用的 RTOS 设计原则:绝不能让你面向硬件的线程饥饿。

同步原语

QuRT 提供了五种主要的同步机制:互斥锁(mutexes)、条件变量(condition variables)、信号(signals)、屏障(barriers)和信号量(semaphores)。

code
┌──────────────┬────────────────────────────────────────────────────┐
│ 原语         │ 使用场景                                           │
├──────────────┼────────────────────────────────────────────────────┤
│ 互斥锁       │ 保护共享数据免受并发访问                           │
│ 条件变量     │ “等待直到 X 为真”(始终与互斥锁配合使用)          │
│ 信号         │ 一个线程通知另一个线程(类似于轻拍某人提醒)       │
│ 屏障         │ “所有线程在此等待,直到全部到达”                   │
├──────────────┼────────────────────────────────────────────────────┤
│ 信号量       │ 控制对有限资源池的访问                             │
│              │ (例如,10 个线程共享 4 个 DMA 通道)              │
└──────────────┴────────────────────────────────────────────────────┘

此表格总结了每种原语及其主要用途。互斥锁用于确保对共享数据的独占访问。条件变量允许线程休眠,直到某个特定的数据条件为真,并且总是与互斥锁一起使用。信号提供线程间轻量级的一对一通知机制。屏障用于在某个共同点同步一组线程。信号量用于控制对 N 个相同资源组成的资源池的访问。

互斥锁

互斥锁确保同一时间只有一个线程可以进入临界区。QuRT 的互斥锁还支持通过 qurt_mutex_try_lock() 实现非阻塞获取。

code
qurt_mutex_t my_mutex;

void init_example(void)
{
    /* 使用前必须先初始化 */
    qurt_mutex_init(&my_mutex);
}

void critical_section_example(void)
{
    qurt_mutex_lock(&my_mutex);

    /* 同一时间仅有一个线程能执行此处代码 */
    shared_counter++;
    shared_buffer[index] = new_value;

    qurt_mutex_unlock(&my_mutex);
}

/* 非阻塞版本 */
void try_lock_example(void)
{
    int result = qurt_mutex_try_lock(&my_mutex);

    if (result == QURT_EOK) {
        shared_counter++;
        qurt_mutex_unlock(&my_mutex);
    } else {
        printf("忙,稍后重试\n");
    }
}

void cleanup_example(void)
{
    qurt_mutex_destroy(&my_mutex);
}

调用 qurt_mutex_lock() 会使当前线程阻塞,直到互斥锁可用并成功获取为止。qurt_mutex_try_lock() 尝试获取互斥锁,若成功则立即返回 QURT_EOK,否则返回错误码表示该锁已被占用。使用完互斥锁后,务必调用 qurt_mutex_destroy() 进行销毁。

QuRT 的互斥锁实现了 优先级继承。如果高优先级线程正在等待一个由低优先级线程持有的互斥锁,那么低优先级线程会临时提升至高优先级。这可以防止 优先级反转 —— 这个经典问题曾导致火星探路者号航天器在任务期间反复重启。

QuRT 会自动处理优先级继承,但你应意识到这一机制的存在,以免在调试时因线程优先级的意外变化而感到困惑。

信号

QuRT 中的信号是一种轻量级的通知机制。一个线程等待特定的信号位被置位,而另一个线程(或中断服务程序 ISR)则设置这些位以唤醒等待线程。

code
#include <qurt.h>

#define SIGNAL_DATA_READY   0x01
#define SIGNAL_STOP         0x02
#define SIGNAL_ERROR        0x04

qurt_signal_t my_signal;

void signal_init(void)
{
    qurt_signal_init(&my_signal);
}

/* 等待线程 */
void waiter_thread(void *arg)
{
    unsigned int received_signals;

    while (1) {
        /* 等待以下任意一个信号 */
        received_signals = qurt_signal_wait(
            &my_signal,
            SIGNAL_DATA_READY | SIGNAL_STOP | SIGNAL_ERROR,
            QURT_SIGNAL_ATTR_WAIT_ANY
        );

        if (received_signals & SIGNAL_STOP) {
            printf("收到停止信号,退出。\n");
            break;
        }

        if (received_signals & SIGNAL_DATA_READY) {
            printf("数据已就绪!开始处理...\n");
            process_data();
            /* 处理完成后清除信号位 */
            qurt_signal_clear(&my_signal, SIGNAL_DATA_READY);
        }

        if (received_signals & SIGNAL_ERROR) {
            printf("发生错误!正在处理...\n");
            handle_error();
            qurt_signal_clear(&my_signal, SIGNAL_ERROR);
        }
    }

    qurt_signal_destroy(&my_signal);
    qurt_thread_exit(QURT_EOK);
}

/* 发送信号的线程(或 ISR) */
void sender_thread(void *arg)
{
    prepare_data();
    qurt_signal_set(&my_signal, SIGNAL_DATA_READY);

    /* 稍后发送停止信号 */
    qurt_signal_set(&my_signal, SIGNAL_STOP);

    qurt_thread_exit(QURT_EOK);
}

等待线程调用 qurt_signal_wait() 并传入关心的信号位掩码。QURT_SIGNAL_ATTR_WAIT_ANY 表示只要任意一个指定的信号位被置位,线程就会被唤醒。发送方线程调用 qurt_signal_set() 来设置一个或多个信号位。处理完信号后,等待线程必须调用 qurt_signal_clear() 清除对应位。如果忘记清除信号位,下一次调用 qurt_signal_wait() 会立即返回,导致线程重复处理同一事件。

选择使用信号还是条件变量取决于具体场景。当需要在无关线程之间传递通知,或从中断服务程序发出通知时,信号更合适,因为它们更简单、开销更小。而当通知与特定数据状态相关(如缓冲区满、队列为空),并且需要互斥锁保护数据检查时,应使用条件变量。

屏障

屏障会阻塞所有参与的线程,直到每个线程都到达屏障点。这种机制在将计算划分为多个阶段、且每个阶段依赖前一阶段结果时非常有用。

code
#define NUM_WORKER_THREADS  4

qurt_barrier_t sync_barrier;

void worker_thread(void *arg)
{
    int thread_num = (int)(uintptr_t)arg;

    /* 第一阶段:每个线程独立计算部分结果 */
    printf("线程 %d:正在计算第一阶段...\n", thread_num);
    compute_partial_result(thread_num);

/* 所有线程在此等待,直到所有线程完成第一阶段 */ qurt_barrier_wait(&sync_barrier);

/* 第二阶段:所有部分结果已准备就绪,进行合并 */ printf("线程 %d:正在执行第二阶段...\n", thread_num); combine_results(thread_num);

qurt_thread_exit(QURT_EOK); }

int main(void) { qurt_barrier_init(&sync_barrier, NUM_WORKER_THREADS);

/* 创建工作线程 */ for (int i = 0; i < NUM_WORKER_THREADS; i++) { create_worker(i); }

join_all_workers();

qurt_barrier_destroy(&sync_barrier); return 0; }

code

屏障使用参与的线程数量进行初始化。每个线程在到达同步点时调用 `qurt_barrier_wait()`。该调用会阻塞,直到所有线程都到达。当最后一个线程调用 `qurt_barrier_wait()` 后,所有线程将同时被释放,并继续进入第二阶段。

### 信号量

信号量用于控制对 N 个相同资源池的访问。与互斥锁(即 N=1 的信号量)不同,信号量允许多达 N 个线程同时持有它。

#define MAX_DMA_CHANNELS 4

qurt_sem_t dma_semaphore;

void init_dma_pool(void) { /* 提供 4 个 DMA 通道 */ qurt_sem_init_val(&dma_semaphore, MAX_DMA_CHANNELS); }

void thread_needing_dma(void *arg) { /* 获取一个 DMA 通道(如果全部 4 个都在使用中则阻塞) */ qurt_sem_down(&dma_semaphore);

int channel = allocate_dma_channel(); perform_dma_transfer(channel); release_dma_channel(channel);

/* 释放信号量槽位 */ qurt_sem_up(&dma_semaphore);

qurt_thread_exit(QURT_EOK); }

code

信号量初始计数值为 4,对应于 DMA 通道的数量。每次调用 `qurt_sem_down()` 都会使计数减一,若计数变为零则阻塞。每次调用 `qurt_sem_up()` 都会使计数加一,并唤醒一个等待中的线程(如果有)。这确保了同时使用 DMA 通道的线程不会超过 4 个。

## 内存管理

DSP 上的内存是有限的。典型的 Hexagon DSP 拥有 256 KB 到 2 MB 的紧耦合内存(TCM),并可访问 DDR。QuRT 提供了有效管理这两种内存的工具。

### 内存布局

┌───────────────────────────────────┐ 高地址 │ DDR (与 ARM 共享) │ │ - 大型缓冲区 │ │ - 神经网络权重 │ │ - 音频/视频帧 │ ├───────────────────────────────────┤ │ QuRT 虚拟内存 │ │ - 用户堆 │ │ - 线程栈 │ ├───────────────────────────────────┤ │ L2 缓存 (TCM 模式) │ │ - 频繁访问的缓冲区 │ │ - 查找表 │ ├───────────────────────────────────┤ │ QuRT 内核 │ │ - 调度器、ISR 处理程序 │ │ - 系统数据结构 │ └───────────────────────────────────┘ 低地址

code

此图展示了从低地址到高地址的 Hexagon DSP 内存布局。QuRT 内核位于最低地址区域,用户代码无法访问。其上方是配置为 TCM 模式的 L2 缓存,用于存放热点数据。虚拟内存区域用于存放用户堆和线程栈。最上方的 DDR 与 ARM CPU 共享,用于存储大型数据缓冲区、机器学习模型权重和媒体帧。DDR 的容量远大于 TCM,但延迟更高。

### 动态内存分配

#include <qurt.h> #include <stdlib.h>

void memory_examples(void) { /* 标准 malloc/free 可正常工作(QuRT 提供了堆) */ int *data = (int *)malloc(1024 * sizeof(int)); if (!data) { printf("malloc 失败!堆内存不足。\n"); return; }

for (int i = 0; i < 1024; i++) { data[i] = i * 2; }

free(data); }

code

QuRT 提供了标准 C 堆,因此 `malloc` 和 `free` 可按预期工作。但 `malloc` 的执行时间不可预测,因为它可能需要搜索空闲链表、分割块或合并相邻的空闲区域。这使得它不适合实时关键路径,因为这些路径要求执行时间必须是确定性的。应仅在初始化和销毁阶段使用 `malloc`,避免在每帧或每个采样点进行分配。

### 缓存管理

在 Hexagon DSP 上,当与 ARM CPU 共享内存时,显式的缓存管理至关重要。

#include <qurt.h>

void cache_management_example(void) { void *buffer; size_t buffer_size = 4096;

/* 分配物理连续且缓存对齐的内存 */ int result = qurt_mem_region_create( &buffer, buffer_size, qurt_mem_default_pool, QURT_MEM_REGION_SHARED );

if (result != QURT_EOK) { printf("内存区域创建失败\n"); return; }

/* 在读取其他处理器(例如 ARM)写入的数据之前: */ qurt_mem_cache_clean(buffer, buffer_size, QURT_MEM_CACHE_INVALIDATE);

/* 从缓冲区读取数据... */

/* 在写入其他处理器将要读取的数据之后: */ fill_buffer_with_results(buffer, buffer_size); qurt_mem_cache_clean(buffer, buffer_size, QURT_MEM_CACHE_FLUSH); }

code

`qurt_mem_region_create()` 调用分配了一个物理连续的内存区域,适合与其他处理器共享。`QURT_MEM_REGION_SHARED` 标志将其标记为可用于跨处理器通信。

共享内存的缓存规则简单却至关重要:

1.  **读取前失效(Invalidate)**:确保你看到的是 ARM CPU 最新写入的数据,而不是陈旧的缓存项。

2.  **写入后刷新(Flush)**:确保你的更改能被 ARM CPU 看到,而不是主存中的旧内容。

忽略这些操作会导致逻辑正确但运行异常的 bug,因为代码操作的是过时的数据。

### 用于可预测分配的内存池

内存池提供 O(1) 时间复杂度的分配,适用于实时关键路径。

#include <qurt.h>

#define BLOCK_SIZE 256 #define NUM_BLOCKS 32

c
/* 内存池的内存是静态分配的,以保证确定性 */
static char pool_memory[BLOCK_SIZE * NUM_BLOCKS] __attribute__((aligned(8)));
static qurt_mem_pool_t my_pool;

void pool_init(void)
{
    qurt_mem_pool_create(&my_pool, pool_memory,
                          BLOCK_SIZE * NUM_BLOCKS,
                          BLOCK_SIZE);
}

void *pool_alloc(void)
{
    void *block = qurt_mem_pool_alloc(&my_pool);
    if (!block) {
        printf("内存池已耗尽!\n");
    }
    return block;
}

void pool_free(void *block)
{
    qurt_mem_pool_free(&my_pool, block);
}

这段代码创建了一个包含 32 个块的内存池,每个块大小为 256 字节。内存池的内存是静态分配的,从而避免在运行时依赖 malloc

qurt_mem_pool_alloc() 能在常数时间内分配一个内存块,qurt_mem_pool_free() 也能在常数时间内释放它。如果内存池已耗尽,分配操作将返回 NULL,而不会阻塞或在其他地方搜索内存。

这种确定性使得内存池成为音频处理循环、传感器数据处理器以及任何有严格截止时间要求的代码的理想选择。

定时器与时间控制

QuRT 提供了基于硬件的定时器,用于实现精确计时。这对 DSP 处理至关重要:例如,如果你以 48 kHz 的频率处理音频,则必须每 10.67 毫秒获取一个新的缓冲区,不能有任何例外。

单次定时器(One-Shot Timer)

code
#include <qurt.h>
#include <qurt_timer.h>

qurt_timer_t my_timer;
qurt_signal_t timer_signal;

#define TIMER_EXPIRED_SIGNAL  0x01

void timer_example(void)
{
    qurt_signal_init(&timer_signal);

    qurt_timer_attr_t attr;
    qurt_timer_attr_init(&attr);

    /* 设置定时器持续时间:10 毫秒 */
    qurt_timer_attr_set_duration(&attr,
        qurt_timer_convert_time_to_ticks(10000,  /* 微秒 */
                                          QURT_TIME_USEC));

    /* 设置定时器到期时触发的信号 */
    qurt_timer_attr_set_signal(&attr, &timer_signal);
    qurt_timer_attr_set_signal_mask(&attr, TIMER_EXPIRED_SIGNAL);

    /* 单次触发:仅触发一次 */
    qurt_timer_attr_set_type(&attr, QURT_TIMER_ONESHOT);

    /* 创建并启动定时器 */
    qurt_timer_create(&my_timer, &attr);

    /* 等待定时器到期 */
    qurt_signal_wait(&timer_signal,
                      TIMER_EXPIRED_SIGNAL,
                      QURT_SIGNAL_ATTR_WAIT_ANY);

    printf("定时器已到期!10ms 已过去。\n");
    qurt_signal_clear(&timer_signal, TIMER_EXPIRED_SIGNAL);

    /* 清理资源 */
    qurt_timer_delete(my_timer);
    qurt_signal_destroy(&timer_signal);
}

该代码创建一个单次定时器,在 10 毫秒后触发。定时器通过属性结构体进行配置,指定其持续时间、要通知的信号对象、信号掩码以及定时器类型(QURT_TIMER_ONESHOT)。当定时器到期时,会设置指定的信号位,唤醒在 qurt_signal_wait() 中阻塞的线程。事件处理完成后,线程清除信号并清理定时器资源。

周期性定时器(Periodic Timer)

code
void periodic_timer_thread(void *arg)
{
    qurt_timer_t periodic_timer;
    qurt_signal_t periodic_signal;
    qurt_timer_attr_t attr;

    qurt_signal_init(&periodic_signal);
    qurt_timer_attr_init(&attr);

    /* 每 1 毫秒触发一次 */
    qurt_timer_attr_set_duration(&attr,
        qurt_timer_convert_time_to_ticks(1000, QURT_TIME_USEC));
    qurt_timer_attr_set_signal(&attr, &periodic_signal);
    qurt_timer_attr_set_signal_mask(&attr, 0x01);
    qurt_timer_attr_set_type(&attr, QURT_TIMER_PERIODIC);

    qurt_timer_create(&periodic_timer, &attr);

    int iteration = 0;
    while (iteration < 1000) {
        qurt_signal_wait(&periodic_signal, 0x01,
                          QURT_SIGNAL_ATTR_WAIT_ANY);
        qurt_signal_clear(&periodic_signal, 0x01);

        /* 每毫秒执行一次 */
        process_audio_frame(iteration);
        iteration++;
    }

    qurt_timer_delete(periodic_timer);
    qurt_signal_destroy(&periodic_signal);
    qurt_thread_exit(QURT_EOK);
}

周期性定时器使用 QURT_TIMER_PERIODIC 而非 QURT_TIMER_ONESHOT,会在设定的时间间隔内重复触发。本例中,定时器以 1 毫秒为间隔运行 1000 次迭代,每次“滴答”处理一帧音频数据。每次迭代后必须清除信号,否则下一次 qurt_signal_wait() 将立即返回。

读取当前时间

code
void timing_example(void)
{
    unsigned long long start_ticks = qurt_sysclock_get_hw_ticks();

    heavy_computation();

    unsigned long long end_ticks = qurt_sysclock_get_hw_ticks();
    unsigned long long elapsed_ticks = end_ticks - start_ticks;

    unsigned long long elapsed_us =
        qurt_timer_convert_ticks_to_time(elapsed_ticks, QURT_TIME_USEC);

    printf("计算耗时 %llu 微秒\n", elapsed_us);
}

qurt_sysclock_get_hw_ticks() 读取硬件周期计数器,提供 DSP 上可用的最高精度时间信息。qurt_timer_convert_ticks_to_time() 将原始的周期数转换为人类可读的单位(本例中为微秒)。使用此模式对单个函数进行性能分析,识别性能瓶颈。

中断处理

在 DSP 上,中断是硬件表明其需要关注的方式。QuRT 提供了一种基于线程的中断模型,相比裸机环境下的传统 ISR 处理程序更加结构化。

code
#include <qurt.h>
#include <qurt_interrupt.h>

#define MY_SENSOR_IRQ      42
#define IRQ_SIGNAL         0x01

static qurt_signal_t irq_signal;

void sensor_isr_thread(void *arg)
{
    int irq = MY_SENSOR_IRQ;

    /* 将此线程注册为 IRQ 42 的处理程序 */
    qurt_interrupt_register(irq, &irq_signal, IRQ_SIGNAL);

    printf("传感器 ISR 线程就绪,等待中断...\n");

    while (1) {
        /* 阻塞,直到硬件中断触发 */
        unsigned int sigs = qurt_signal_wait(
            &irq_signal, IRQ_SIGNAL, QURT_SIGNAL_ATTR_WAIT_ANY);

        if (sigs & IRQ_SIGNAL) {
            qurt_signal_clear(&irq_signal, IRQ_SIGNAL);

/* 快速读取传感器数据 */ int sensor_value = read_sensor_register();

/* 将数据放入队列供处理线程使用 */ enqueue_sensor_data(sensor_value);

/* 通知处理线程 */ qurt_signal_set(&processing_signal, DATA_READY);

/* 重新使能中断 */ qurt_interrupt_acknowledge(irq); } } }

code

QuRT 的中断服务例程(ISR)与裸机环境下的 ISR 不同。它们运行在专用的线程上下文中,这意味着你可以在其中使用互斥锁和信号量。但 ISR 线程仍应尽可能少地工作:读取硬件寄存器、将数据入队、通知处理线程并确认中断。所有开销较大的计算都应在单独的、优先级较低的处理线程中进行。

硬件中断 │ ▼ ISR 线程(高优先级) 处理线程(中等优先级) ┌──────────────────┐ ┌──────────────────────────┐ │ 读取硬件寄存器 │ │ 等待 DATA_READY │ │ 数据入队 │ ──► │ 出队数据 │ │ 发送“就绪”信号 │ │ 执行 FFT / 滤波 / 推理等 │ │ 确认中断 │ │ 写入结果 │ └──────────────────┘ └──────────────────────────┘

code

该图展示了 ISR 卸载模式。左侧的 ISR 线程以最小延迟响应硬件中断:读取传感器寄存器、将原始数据入队、通知处理线程,并确认中断以便下次触发。右侧的处理线程则以较低优先级执行耗时操作(如 FFT、滤波、机器学习推理)。

这种设计确保了即使处理线程仍在处理前一个样本,ISR 线程也能随时响应下一个硬件中断。

## 管道与消息队列

QuRT 提供了内置的管道支持,用于安全、结构化的线程间通信。管道是固定大小的消息队列,支持阻塞式的发送和接收操作。

#include <qurt.h> #include <qurt_pipe.h>

#define PIPE_ELEMENTS 16 #define ELEMENT_SIZE sizeof(sensor_msg_t)

typedef struct { int sensor_id; int value; unsigned long long timestamp; } sensor_msg_t;

/* 管道缓冲区需由用户分配 */ static char pipe_buffer[PIPE_ELEMENTS * ELEMENT_SIZE] __attribute__((aligned(8)));

qurt_pipe_t sensor_pipe;

void pipe_init(void) { qurt_pipe_attr_t attr; qurt_pipe_attr_init(&attr); qurt_pipe_attr_set_buffer(&attr, pipe_buffer); qurt_pipe_attr_set_buffer_partition(&attr, PIPE_ELEMENTS); qurt_pipe_attr_set_elements(&attr, PIPE_ELEMENTS); qurt_pipe_attr_set_element_size(&attr, ELEMENT_SIZE);

qurt_pipe_create(&sensor_pipe, &attr); }

/* 生产者:将传感器数据发送到管道中 */ void sensor_reader_thread(void *arg) { while (1) { sensor_msg_t msg; msg.sensor_id = 1; msg.value = read_accelerometer(); msg.timestamp = qurt_sysclock_get_hw_ticks();

/* 阻塞式发送:如果管道已满则等待 */ qurt_pipe_send(&sensor_pipe, (char *)&msg, ELEMENT_SIZE); } }

/* 消费者:从管道中接收传感器数据 */ void data_processor_thread(void *arg) { sensor_msg_t msg;

while (1) { /* 阻塞式接收:如果管道为空则等待 */ qurt_pipe_receive(&sensor_pipe, (char *)&msg, ELEMENT_SIZE);

printf("传感器 %d: 值=%d 时间戳=%llu\n", msg.sensor_id, msg.value, msg.timestamp);

process_sensor_reading(&msg); } }

code

QuRT 管道通过静态分配的缓冲区、元素数量和元素大小进行配置。与栈类似,缓冲区内存由开发者负责管理。`qurt_pipe_send()` 将消息复制进管道,若管道已满则阻塞;`qurt_pipe_receive()` 从管道复制出消息,若管道为空则阻塞。管道内部自动处理所有同步逻辑,因此无需额外的互斥锁。

管道非常适用于此处所示的传感器数据模式:读取线程以固定频率采样硬件并将消息推入管道,而处理线程从中取出消息并进行处理。管道自动提供缓冲和背压机制。

## QuRT 与 FastRPC

在真实的高通设备中,你很少会单独使用 QuRT。通常情况下,运行在 ARM CPU 上的 Android 或 Linux 应用程序会通过 **FastRPC**(快速远程过程调用)将计算密集型任务卸载到 DSP 上执行。下图展示了完整的处理流水线:

┌───────────────────────────────────────────────────────────────┐ │ ARM CPU 侧 │ │ │ │ your_app.c │ │ ┌───────────────────────────────────────────────────┐ │ │ │ #include "my_dsp_module.h" // 自动生成 │ │ │ │ │ │ │ │ // 这看起来像一个普通的函数调用, │ │ │ │ // 但实际上它是在 DSP 上执行的! │ │ │ │ result = my_dsp_module_process_audio( │ │ │ │ input_buffer, output_buffer, num_samples); │ │ │ └───────────────────┬───────────────────────────────┘ │ │ │ FastRPC │ └───────────────────────┼───────────────────────────────────────┘ (跨越处理器边界) ┌───────────────────────┼───────────────────────────────────────┐ │ ▼ │ │ DSP 侧 (QuRT) │ │ my_dsp_module_skel.c // 自动生成的骨架文件 │ │ ┌───────────────────────────────────────────────────┐ │ │ │ int my_dsp_module_process_audio( │ │ │ │ const int16_t *input, │ │ │ │ int16_t *output, │ │ │ │ int num_samples) │ │ │ │ { │ │ │ │ // 在 QuRT 下的 Hexagon DSP 上运行 │ │ │ │ apply_noise_reduction(input, output, │ │ │ │ num_samples); │ │ │ │ return 0; │ │ │ │ } │ │ │ └───────────────────────────────────────────────────┘ │ └───────────────────────────────────────────────────────────────┘

code

此图展示了 FastRPC 架构。在 ARM CPU 一侧,你的应用程序调用一个看似普通 C 函数的接口。实际上,FastRPC 会将参数序列化,通过处理器边界发送到 Hexagon DSP,在 QuRT 环境下执行该函数,并将结果返回。对程序员而言,这就像一次透明的远程过程调用。

### 步骤 1:定义接口(IDL 文件)

创建一个 `.idl` 文件,描述 ARM 可以在 DSP 上调用的函数:

/* my_dsp_module.idl */ #include "remote.idl" #include "AEEStdDef.idl"

interface my_dsp_module {

/* 简单计算 */ long process_audio( in sequence<short> input, rout sequence<short> output, in long num_samples );

/* 矩阵乘法卸载 */ long matrix_multiply( in sequence<float> mat_a, in sequence<float> mat_b, rout sequence<float> result, in long rows_a, in long cols_a, in long cols_b ); };

code

IDL(接口定义语言)文件定义了跨处理器的 API。每个函数通过方向修饰符指定参数流向:`in` 表示数据从 ARM 流向 DSP,`rout` 表示数据从 DSP 返回给 ARM。`sequence<type>` 语法表示可变长度数组。Hexagon SDK 的 IDL 编译器会根据此定义为 ARM 侧生成桩代码(stub),为 DSP 侧生成骨架代码(skeleton)。

### 步骤 2:实现 DSP 端

/* my_dsp_module_imp.c - DSP 实现 */

#include "my_dsp_module.h" #include <qurt.h> #include <stdio.h>

int my_dsp_module_process_audio( const int16_t *input, int input_len, int16_t *output, int output_len, int num_samples) { if (!input || !output || num_samples <= 0) { return -1; }

/* 使缓存失效:ARM 写入了这些数据 */ qurt_mem_cache_clean((void *)input, num_samples * sizeof(int16_t), QURT_MEM_CACHE_INVALIDATE);

/* 在 DSP 上处理 */ for (int i = 0; i < num_samples; i++) { /* 简单的噪声门限 */ if (abs(input[i]) < 100) { output[i] = 0; } else { output[i] = input[i]; } }

/* 刷新缓存:ARM 将读取这些数据 */ qurt_mem_cache_clean(output, num_samples * sizeof(int16_t), QURT_MEM_CACHE_FLUSH);

return 0; }

code

DSP 实现接收由 ARM CPU 写入的输入缓冲区。在读取之前,代码使缓存失效,以确保 DSP 能从主存中获取最新数据,而不是使用过时的缓存内容。写入输出后,代码刷新缓存,以确保 ARM CPU 能看到 DSP 的结果。实际处理逻辑(本例中是一个简单的噪声门)位于缓存操作之间执行。

### 步骤 3:实现 ARM 端

/* main_arm.c - ARM/Android 应用程序 */

#include <stdio.h> #include <stdlib.h> #include <rpcmem.h> #include "my_dsp_module.h"

int main(void) { int num_samples = 1024;

/* 使用 ION 内存实现与 DSP 的零拷贝共享 */ rpcmem_init();

int16_t *input = (int16_t *)rpcmem_alloc( RPCMEM_HEAP_ID_SYSTEM, RPCMEM_DEFAULT_FLAGS, num_samples * sizeof(int16_t));

int16_t *output = (int16_t *)rpcmem_alloc( RPCMEM_HEAP_ID_SYSTEM, RPCMEM_DEFAULT_FLAGS, num_samples * sizeof(int16_t));

if (!input || !output) { printf("rpcmem_alloc 失败!\n"); return -1; }

/* 填充音频数据 */ for (int i = 0; i < num_samples; i++) { input[i] = (int16_t)(i % 256); }

/* 此调用通过 FastRPC 发送到 DSP */ int result = my_dsp_module_process_audio( input, num_samples, output, num_samples, num_samples);

code

if (result != 0) {
        printf("DSP 处理失败: %d\n", result);
    } else {
        printf("DSP 处理成功!\n");
        printf("前 10 个输出样本: ");
        for (int i = 0; i < 10; i++) {
            printf("%d ", output[i]);
        }
        printf("\n");
    }

    rpcmem_free(input);
    rpcmem_free(output);
    rpcmem_deinit();

    return 0;
}

ARM 端使用 rpcmem_alloc() 分配 ION 内存,这是一种可由 ARM CPU 和 Hexagon DSP 共同访问的共享内存区域,无需数据拷贝。对 my_dsp_module_process_audio() 的调用看起来像普通的函数调用,但 FastRPC 会透明地将其路由到 DSP 上执行。当调用返回时,输出缓冲区中即包含 DSP 的处理结果。

构建完整项目

一个 FastRPC 项目需要进行两次 SCons 构建:一次针对 ARM CPU 端,另一次针对 Hexagon DSP 端。每一端都有自己的 .min 文件(android.minhexagon.min),均由 SDK 的 SConstruct 脚本处理。

code
cd $HEXAGON_SDK_ROOT

# 通过 make 包装器构建 ARM 目标(Android)
make V=android_Release tree=my_dsp_module

# 通过 make 包装器构建 Hexagon DSP 目标
make V=hexagon_Release_dynamic_toolv84_v66 tree=my_dsp_module

# 或者直接调用 SCons 构建两个变体
python tools/build/scons/scons.py \
    V=android_Release \
    V=hexagon_Release_dynamic_toolv84_v66 \
    my_dsp_module

# 推送到设备
adb push android_Release/ship/my_dsp_module /data/local/tmp/
adb push hexagon_Release_dynamic_toolv84_v66/ship/libmy_dsp_module_skel.so \
    /data/local/tmp/

# 运行程序
adb shell "cd /data/local/tmp && ./my_dsp_module"

构建过程会产生两个输出:一个是 ARM 可执行文件(由桩代码和你的 main_arm.c 编译而成),另一个是 Hexagon 共享库(即 _skel.so 文件,由你的 DSP 实现代码编译而成)。SCons 会自动处理 IDL 编译步骤:检测 .idl 文件,生成对应的桩和骨架 C 源文件,并将其包含在相应的构建变体中。两个输出都需要推送到设备上。

当 ARM 可执行文件运行并调用 FastRPC 函数时,系统会将骨架库加载到 DSP 上,并通过该库转发调用。

构建传感器融合流水线

本节将线程、同步、定时器和信号结合在一起,构建一个完整且贴近实际的 QuRT 应用程序。该流水线从三个模拟传感器(加速度计、陀螺仪、磁力计)读取数据,使用互补滤波器融合数据,并以 100 Hz 的频率报告姿态信息。

code
/*
 * sensor_fusion.c - 在 QuRT 上实现多传感器融合流水线
 *
 * 架构:
 *   [Accel ISR] ──► [Fusion Thread] ──► [Report Thread]
 *   [Gyro ISR]  ──►       ▲
 *   [Mag ISR]   ──►       │
 *                    [Timer Thread]
 *                    (每 10ms 触发一次融合)
 */

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <qurt.h>
#include <qurt_timer.h>

/* 配置 */
#define STACK_SIZE          8192
#define FUSION_PERIOD_US    10000   /* 10ms = 100Hz 融合频率 */
#define QUEUE_DEPTH         32

/* 数据类型 */
typedef struct {
    float x, y, z;
    unsigned long long timestamp;
} vec3_sample_t;

typedef struct {
    vec3_sample_t accel;
    vec3_sample_t gyro;
    vec3_sample_t mag;
    float roll, pitch, yaw;
} fused_state_t;

/* 线程栈 */
static char accel_stack[STACK_SIZE]  __attribute__((aligned(8)));
static char gyro_stack[STACK_SIZE]   __attribute__((aligned(8)));
static char mag_stack[STACK_SIZE]    __attribute__((aligned(8)));
static char fusion_stack[STACK_SIZE] __attribute__((aligned(8)));
static char report_stack[STACK_SIZE] __attribute__((aligned(8)));

/* 共享状态 */
static vec3_sample_t latest_accel;
static vec3_sample_t latest_gyro;
static vec3_sample_t latest_mag;
static fused_state_t latest_fused;

static qurt_mutex_t sensor_mutex;
static qurt_mutex_t fused_mutex;
static qurt_signal_t fusion_signal;
static qurt_signal_t report_signal;

#define SIG_FUSION_TICK        0x01
#define SIG_NEW_FUSED_DATA     0x01
#define SIG_SHUTDOWN           0x80

static volatile int running = 1;

/* 模拟传感器读数 */
static void read_accelerometer(vec3_sample_t *sample)
{
    sample->x = 0.01f;
    sample->y = 0.02f;
    sample->z = 9.81f;
    sample->timestamp = qurt_sysclock_get_hw_ticks();
}

static void read_gyroscope(vec3_sample_t *sample)
{
    sample->x = 0.001f;
    sample->y = -0.002f;
    sample->z = 0.0005f;
    sample->timestamp = qurt_sysclock_get_hw_ticks();
}

static void read_magnetometer(vec3_sample_t *sample)
{
    sample->x = 25.0f;
    sample->y = -5.0f;
    sample->z = 40.0f;
    sample->timestamp = qurt_sysclock_get_hw_ticks();
}

/* 加速度计线程 */
void accel_thread(void *arg)
{
    printf("[Accel] 线程已启动\n");

    while (running) {
        vec3_sample_t sample;
        read_accelerometer(&sample);

        qurt_mutex_lock(&sensor_mutex);
        latest_accel = sample;
        qurt_mutex_unlock(&sensor_mutex);

        /* ~400Hz 采样率 */
        qurt_timer_sleep(2500);
    }

    printf("[Accel] 线程退出\n");
    qurt_thread_exit(QURT_EOK);
}

/* 陀螺仪线程 */
void gyro_thread(void *arg)
{
    printf("[Gyro] 线程已启动\n");

    while (running) {
        vec3_sample_t sample;
        read_gyroscope(&sample);

        qurt_mutex_lock(&sensor_mutex);
        latest_gyro = sample;
        qurt_mutex_unlock(&sensor_mutex);

        /* 1kHz 采样率 */
        qurt_timer_sleep(1000);
    }

    printf("[Gyro] 线程退出\n");
    qurt_thread_exit(QURT_EOK);
}

/* 磁力计线程 */
void mag_thread(void *arg)
{
    printf("[Mag] 线程已启动\n");

    while (running) {
        vec3_sample_t sample;
        read_magnetometer(&sample);

        qurt_mutex_lock(&sensor_mutex);
        latest_mag = sample;
        qurt_mutex_unlock(&sensor_mutex);

        /* 100Hz 采样率 */
        qurt_timer_sleep(10000);
    }

    printf("[Mag] 线程退出\n");
    qurt_thread_exit(QURT_EOK);
}
c
/* 简化的互补滤波器 */
static void compute_orientation(
    const vec3_sample_t *accel,
    const vec3_sample_t *gyro,
    const vec3_sample_t *mag,
    fused_state_t *state)
{
    float dt = 0.01f;

    float accel_roll = atan2f(accel->y, accel->z) * 57.2958f;
    float accel_pitch = atan2f(-accel->x,
        sqrtf(accel->y * accel->y + accel->z * accel->z)) * 57.2958f;

    /* 短期信任陀螺仪,长期信任加速度计 */
    state->roll = 0.98f * (state->roll + gyro->x * dt * 57.2958f)
                + 0.02f * accel_roll;
    state->pitch = 0.98f * (state->pitch + gyro->y * dt * 57.2958f)
                 + 0.02f * accel_pitch;

    state->yaw = atan2f(mag->y, mag->x) * 57.2958f;

    state->accel = *accel;
    state->gyro = *gyro;
    state->mag = *mag;
}

/* 融合线程(每10ms运行一次) */
void fusion_thread(void *arg)
{
    qurt_timer_t fusion_timer;
    qurt_timer_attr_t timer_attr;

    printf("[Fusion] 线程已启动\n");

    qurt_timer_attr_init(&timer_attr);
    qurt_timer_attr_set_duration(&timer_attr,
        qurt_timer_convert_time_to_ticks(FUSION_PERIOD_US,
                                          QURT_TIME_USEC));
    qurt_timer_attr_set_signal(&timer_attr, &fusion_signal);
    qurt_timer_attr_set_signal_mask(&timer_attr, SIG_FUSION_TICK);
    qurt_timer_attr_set_type(&timer_attr, QURT_TIMER_PERIODIC);

    qurt_timer_create(&fusion_timer, &timer_attr);

    while (running) {
        unsigned int sigs = qurt_signal_wait(
            &fusion_signal,
            SIG_FUSION_TICK | SIG_SHUTDOWN,
            QURT_SIGNAL_ATTR_WAIT_ANY);

        if (sigs & SIG_SHUTDOWN) break;

        qurt_signal_clear(&fusion_signal, SIG_FUSION_TICK);

        /* 在锁保护下获取传感器数据快照 */
        vec3_sample_t a, g, m;
        qurt_mutex_lock(&sensor_mutex);
        a = latest_accel;
        g = latest_gyro;
        m = latest_mag;
        qurt_mutex_unlock(&sensor_mutex);

        /* 运行融合算法(无需锁,使用本地数据) */
        fused_state_t state;
        qurt_mutex_lock(&fused_mutex);
        state = latest_fused;
        qurt_mutex_unlock(&fused_mutex);

        compute_orientation(&a, &g, &m, &state);

        /* 发布融合结果 */
        qurt_mutex_lock(&fused_mutex);
        latest_fused = state;
        qurt_mutex_unlock(&fused_mutex);

        /* 通知上报线程 */
        qurt_signal_set(&report_signal, SIG_NEW_FUSED_DATA);
    }

    qurt_timer_delete(fusion_timer);
    printf("[Fusion] 线程退出\n");
    qurt_thread_exit(QURT_EOK);
}

/* 上报线程 */
void report_thread(void *arg)
{
    int report_count = 0;

    printf("[Report] 线程已启动\n");

    while (running) {
        unsigned int sigs = qurt_signal_wait(
            &report_signal,
            SIG_NEW_FUSED_DATA | SIG_SHUTDOWN,
            QURT_SIGNAL_ATTR_WAIT_ANY);

        if (sigs & SIG_SHUTDOWN) break;

        qurt_signal_clear(&report_signal, SIG_NEW_FUSED_DATA);

        fused_state_t state;
        qurt_mutex_lock(&fused_mutex);
        state = latest_fused;
        qurt_mutex_unlock(&fused_mutex);

        /* 每100次更新上报一次(在100Hz下为每秒一次) */
        if (++report_count % 100 == 0) {
            printf("[Report] 姿态 - 横滚: %.2f  俯仰: %.2f  "
                   "偏航: %.2f  (更新 #%d)\n",
                   state.roll, state.pitch, state.yaw, report_count);
        }
    }

    printf("[Report] 线程退出\n");
    qurt_thread_exit(QURT_EOK);
}

/* 主函数 */
int main(void)
{
    qurt_thread_t threads[5];
    qurt_thread_attr_t attr;
    int status;

    printf("=== 传感器融合流水线启动 ===\n");

    /* 初始化同步原语 */
    qurt_mutex_init(&sensor_mutex);
    qurt_mutex_init(&fused_mutex);
    qurt_signal_init(&fusion_signal);
    qurt_signal_init(&report_signal);
    memset(&latest_fused, 0, sizeof(latest_fused));

    struct {
        const char *name;
        char *stack;
        int priority;
        void (*func)(void *);
    } thread_configs[] = {
        {"accel_reader", accel_stack,  60, accel_thread},
        {"gyro_reader",  gyro_stack,   60, gyro_thread},
        {"mag_reader",   mag_stack,    70, mag_thread},
        {"fusion",       fusion_stack, 80, fusion_thread},
        {"reporter",     report_stack, 120, report_thread},
    };

    /* 创建所有线程 */
    for (int i = 0; i < 5; i++) {
        qurt_thread_attr_init(&attr);
        qurt_thread_attr_set_name(&attr, thread_configs[i].name);
        qurt_thread_attr_set_stack_addr(&attr, thread_configs[i].stack);
        qurt_thread_attr_set_stack_size(&attr, STACK_SIZE);
        qurt_thread_attr_set_priority(&attr, thread_configs[i].priority);

        int result = qurt_thread_create(&threads[i], &attr,
                                         thread_configs[i].func, NULL);
        if (result != QURT_EOK) {
            printf("创建线程 '%s' 失败: %d\n",
                   thread_configs[i].name, result);
            return -1;
        }
        printf("创建线程 '%s' (优先级 %d)\n",
               thread_configs[i].name, thread_configs[i].priority);
    }

    /* 运行10秒钟 */
    printf("流水线运行10秒钟...\n");
    qurt_timer_sleep(10000000);

    /* 关闭系统 */
    printf("正在关闭...\n");
    running = 0;
    qurt_signal_set(&fusion_signal, SIG_SHUTDOWN);
    qurt_signal_set(&report_signal, SIG_SHUTDOWN);

    /* 等待所有线程结束 */
    for (int i = 0; i < 5; i++) {
        qurt_thread_join(threads[i], &status);
    }

    /* 清理资源 */
    qurt_mutex_destroy(&sensor_mutex);
    qurt_mutex_destroy(&fused_mutex);
    qurt_signal_destroy(&fusion_signal);
    qurt_signal_destroy(&report_signal);

    printf("=== 传感器融合流水线完成 ===\n");
    return 0;
}

该流水线展示了多个 QuRT 模式协同工作的情况。

三个传感器读取线程以最高优先级运行(加速度计和陀螺仪为 60,较慢的磁力计为 70),并在互斥锁保护下持续将最新的采样数据写入共享状态。

一个融合线程由周期性定时器每 10 毫秒触发一次,它会捕获所有三个传感器的读数,运行互补滤波器来计算横滚(roll)、俯仰(pitch)和偏航(yaw),然后发布融合后的结果。

一个报告线程以最低优先级(120)运行,每次有新的融合数据可用时都会收到信号,并每秒记录一次姿态信息。

优先级分配

code
优先级 60:传感器读取线程(最高优先级,绝不丢失硬件数据)
优先级 80:融合引擎(每 10ms 运行一次,必须快速完成)
优先级 120:报告线程(最低优先级,仅用于日志记录)

优先级分配遵循一条严格规则:越接近硬件的线程获得越高的优先级。如果融合线程执行时间过长,报告线程会等待,这是可以接受的,因为延迟的日志消息不会造成实时影响。但如果传感器读取被延迟,则融合算法将基于过期数据进行运算。

在控制无人机或机器人的实际应用中,过期的 IMU 数据意味着错误的姿态估计,可能导致物理上的故障。

调试 QuRT 应用程序

QuRT 的调试能力比 Linux 调试更有限。没有带 TUI 的 gdb,崩溃时的错误消息通常也无济于事。以下技术构成了一套实用的调试工具包。

使用 Printf 调试

code
#include <stdio.h>

void debug_example(void)
{
    printf("[%s:%d] value = %d\n", __func__, __LINE__, some_var);
}

QuRT 通过半主机机制支持 printf。在模拟器上,输出会发送到 stdout;在硬件上,则进入 DIAG 缓冲区(类似于 Android 的 logcat)。这是 QuRT 开发中最常见的调试方法。

QuRT 错误码

code
switch (result) {
    case QURT_EOK:
        break;
    case QURT_EINVALID:
        printf("无效参数\n");
        break;
    case QURT_EFAILED:
        printf("通用失败\n");
        break;
    case QURT_EMEM:
        printf("内存不足\n");
        break;
    case QURT_ENOTALLOWED:
        printf("操作不允许(检查权限)\n");
        break;
    case QURT_ETIMEOUT:
        printf("操作超时\n");
        break;
    default:
        printf("未知错误: %d\n", result);
}

始终检查 QuRT API 调用的返回值。这些是你最常遇到的错误代码。

QURT_EINVALID 通常表示参数错误(栈未对齐、空指针、优先级超出范围等)。QURT_EMEM 表示内核用于内部结构的内存已耗尽。QURT_ENOTALLOWED 通常表明硬件上存在权限问题。

线程状态检查

code
void dump_thread_info(void)
{
    qurt_thread_t tid = qurt_thread_get_id();
    char name[QURT_THREAD_ATTR_NAME_MAXLEN];

    qurt_thread_get_name(name, sizeof(name));

    printf("线程: %s (ID: %lu)\n", name, tid);
}

此函数打印当前线程的名称和 ID,在多个线程向同一日志输出写入内容且需要区分每条消息来源时非常有用。

栈溢出检测

code
#define STACK_CANARY 0xDEADBEEF

static char my_stack[STACK_SIZE] __attribute__((aligned(8)));

void init_stack_canary(void)
{
    /* 在栈底写入金丝雀值 */
    ((unsigned int *)my_stack)[0] = STACK_CANARY;
    ((unsigned int *)my_stack)[1] = STACK_CANARY;
}

void check_stack_canary(void)
{
    if (((unsigned int *)my_stack)[0] != STACK_CANARY ||
        ((unsigned int *)my_stack)[1] != STACK_CANARY) {
        printf("检测到栈溢出!\n");
    }
}

QuRT 不会自动检测栈溢出。这种“金丝雀”模式在线程启动前在栈底部写入一个已知值。如果栈向下增长并超出其边界,就会覆盖该金丝雀值。定期检查金丝雀(或在线程退出时检查)可以捕获此类溢出,否则这些溢出会导致难以追踪的、看似无关的崩溃。

使用 Hexagon 模拟器

code
# 启用指令跟踪运行
hexagon-sim --timing --pmu_statsfile stats.txt \
    --cosim_file osam.cfg \
    -- bootimg.pbn -- my_app.so

# 统计文件提供以下信息:
# - 总周期数
# - 缓存命中/未命中率
# - 停顿周期数
# - 每周期指令数(IPC)

--timing 标志启用周期精确模拟,--pmu_statsfile 将性能计数器数据写入文件。统计文件报告总周期数、缓存命中与未命中率、停顿周期以及每周期指令数(IPC)。这些数据对于判断瓶颈是计算密集型、内存密集型还是停顿密集型至关重要。

常见陷阱

陷阱 1:忘记退出线程

code
/* 错误:线程函数返回但未退出 */
void bad_thread(void *arg) {
    do_work();
    return;  /* 崩溃或未定义行为 */
}

/* 正确 */
void good_thread(void *arg) {
    do_work();
    qurt_thread_exit(QURT_EOK);
}

QuRT 线程若从入口函数返回而未调用 qurt_thread_exit(),会导致未定义行为。内核在线程创建期间会将链接寄存器设为 qurt_thread_exit 作为安全措施,但你不应依赖这一点。务必显式调用 qurt_thread_exit()

陷阱 2:栈分配在错误的作用域内

code
/* 错误:栈位于调用线程的栈上 */
void create_thread_bad(void) {
    char stack[4096];
    qurt_thread_attr_set_stack_addr(&attr, stack);
    qurt_thread_create(&tid, &attr, func, NULL);
}   /* 此处 stack 被销毁,新线程崩溃 */

/* 正确:使用静态或堆分配 */
static char stack[4096] __attribute__((aligned(8)));
void create_thread_good(void) {
    qurt_thread_attr_set_stack_addr(&attr, stack);
    qurt_thread_create(&tid, &attr, func, NULL);
}

栈内存的生命周期必须长于使用它的线程。如果你在函数中将栈作为局部变量分配,那么当函数返回时该栈会被释放,但此时线程可能仍在运行。应使用静态分配(如示例所示)或通过谨慎的生命周期管理进行堆分配。

坑点 3:未意识到优先级反转问题

code
/* BAD: 手动自旋锁,无优先级继承 */
volatile int lock = 0;
while (__sync_lock_test_and_set(&lock, 1)) { /* 自旋 */ }

/* GOOD: 支持优先级继承的 QuRT 互斥锁 */
qurt_mutex_lock(&my_mutex);

如果一个高优先级线程在一个被低优先级线程持有的手动自旋锁上自旋,而此时一个中等优先级线程抢占了持有锁的低优先级线程,那么高优先级线程实际上就被中等优先级线程阻塞了。

QuRT 互斥锁通过自动优先级继承机制解决此问题:持有锁的线程会临时提升至最高等待线程的优先级。而手动自旋锁不具备这种处理能力。

坑点 4:内存未对齐

code
/* BAD */
char stack[4096];

/* GOOD */
char stack[4096] __attribute__((aligned(8)));

/* 对于 DMA 缓冲区,通常需要 256 字节对齐 */
char dma_buffer[1024] __attribute__((aligned(256)));

线程栈必须 8 字节对齐。DMA 缓冲区通常要求 256 字节对齐。在 Hexagon 架构上,未对齐的内存访问会导致硬故障,并且几乎不产生任何诊断信息。

坑点 5:在中断服务程序(ISR)上下文中阻塞

code
/* BAD: mutex_lock 可能无限期阻塞 */
void isr_handler(void *arg) {
    qurt_mutex_lock(&some_mutex);
    qurt_mutex_unlock(&some_mutex);
}

/* GOOD: 使用非阻塞 try_lock 并设置回退机制 */
void isr_handler(void *arg) {
    if (qurt_mutex_try_lock(&some_mutex) == QURT_EOK) {
        /* 快速更新 */
        qurt_mutex_unlock(&some_mutex);
    } else {
        /* 推迟任务给处理线程 */
        qurt_signal_set(&deferred_signal, DEFERRED_WORK);
    }
}

尽管 QuRT 的 ISR 线程在技术上调用阻塞 API 是允许的,但在高优先级中断处理程序中这样做会使中断处理冻结,直到阻塞条件解除。应使用 qurt_mutex_try_lock() 进行非阻塞尝试;若无法获取锁,则通过信号将工作推迟到低优先级线程处理。

性能优化

使用 HVX(Hexagon 向量扩展)

code
#include <hexagon_types.h>
#include <hvx_hexagon_protos.h>

/* 一次性处理 128 字节数据 */
void vectorized_gain(int16_t *audio, int num_samples, int16_t gain)
{
    HVX_Vector *vptr = (HVX_Vector *)audio;
    HVX_Vector vgain = Q6_Vh_vsplat_R(gain);
    int num_vectors = num_samples * sizeof(int16_t) / sizeof(HVX_Vector);

    for (int i = 0; i < num_vectors; i++) {
        vptr[i] = Q6_Vh_vmpy_VhVh_sat(vptr[i], vgain);
    }
}

HVX 在 Hexagon DSP 上提供 128 字节的 SIMD 操作。Q6_Vh_vsplat_R 内建函数将一个标量值广播到向量寄存器的所有通道中。Q6_Vh_vmpy_VhVh_sat 对两个半字向量执行饱和乘法运算。单条 HVX 指令即可处理 64 个 16 位样本,在音频和信号处理负载中相比标量代码可提升一个数量级的性能。

将热点数据锁定在 L2 缓存中

code
void lock_cache_example(void)
{
    extern float fft_twiddle_factors[];
    size_t twiddle_size = 1024 * sizeof(float);

    /* 锁定数据在 L2 中,防止被逐出 */
    qurt_mem_l2cache_lock((unsigned int)fft_twiddle_factors,
                           twiddle_size);

    /* 使用完成后解锁: */
    qurt_mem_l2cache_unlock((unsigned int)fft_twiddle_factors,
                             twiddle_size);
}

qurt_mem_l2cache_lock() 将一段内存区域固定在 L2 缓存中,避免其被其他缓存访问所驱逐。这对于频繁访问的查找表或常量数据非常有用(例如 FFT 蝶形因子)。

过度锁定 L2 数据会减少其他线程可用的缓存空间,因此应有选择性地使用此技术。

避免在热点路径中使用动态内存

code
/* BAD: 在音频处理循环中调用 malloc */
void process_audio_bad(void) {
    while (1) {
        float *temp = malloc(1024 * sizeof(float));
        process(temp);
        free(temp);
    }
}

/* GOOD: 预先分配所有资源 */
static float temp_buffer[1024];
void process_audio_good(void) {
    while (1) {
        process(temp_buffer);
    }
}

mallocfree 的执行时间是非确定性的,因为它们可能遍历空闲链表、分割或合并内存块,最坏情况下还需向内核申请更多内存。

在以 48 kHz 运行的实时音频处理循环中,一次缓慢的内存分配就可能导致可听见的卡顿。应在初始化阶段预先分配所有缓冲区并在后续重复使用。

API 快速参考

code
┌─────────────────────────────────────────────────────────────────┐
│                    QuRT API 快速参考                              │
├─────────────────┬───────────────────────────────────────────────┤
│ 线程            │                                               │
│  创建           │ qurt_thread_create(&id, &attr, func, arg)     │
│  退出           │ qurt_thread_exit(status)                      │
│  等待结束       │ qurt_thread_join(id, &status)                 │
│  获取 ID        │ qurt_thread_get_id()                          │
│  休眠           │ qurt_timer_sleep(usec)                        │
├─────────────────┼───────────────────────────────────────────────┤
│ 互斥锁          │                                               │
│  初始化         │ qurt_mutex_init(&mutex)                       │
│  加锁           │ qurt_mutex_lock(&mutex)                       │
│  尝试加锁       │ qurt_mutex_try_lock(&mutex)                   │
│  解锁           │ qurt_mutex_unlock(&mutex)                     │
│  销毁           │ qurt_mutex_destroy(&mutex)                    │
├─────────────────┼───────────────────────────────────────────────┤
│ 信号量          │                                               │
│  初始化         │ qurt_signal_init(&signal)                     │
│  等待           │ qurt_signal_wait(&sig, mask, attr)            │
│  设置           │ qurt_signal_set(&signal, mask)                │
│  清除           │ qurt_signal_clear(&signal, mask)              │
│  销毁           │ qurt_signal_destroy(&signal)                  │
├─────────────────┼───────────────────────────────────────────────┤
│ 定时器          │                                               │
│  创建           │ qurt_timer_create(&timer, &attr)              │
│  删除           │ qurt_timer_delete(timer)                      │
│  休眠           │ qurt_timer_sleep(usec)                        │
│  时钟周期       │ qurt_sysclock_get_hw_ticks()                  │
├─────────────────┼───────────────────────────────────────────────┤
│ 内存            │                                               │
│  缓存刷新       │ qurt_mem_cache_clean(addr, sz, FLUSH)         │
│  缓存失效       │ qurt_mem_cache_clean(addr, sz, INVALIDATE)    │
│  L2 锁定        │ qurt_mem_l2cache_lock(addr, size)             │
│  L2 解锁        │ qurt_mem_l2cache_unlock(addr, size)           │
├─────────────────┼───────────────────────────────────────────────┤
│ 信号量(Semaphore)│                                           │
│  初始化         │ qurt_sem_init_val(&sem, count)                │
│  down (等待)    │ qurt_sem_down(&sem)                           │
│  up (发布)      │ qurt_sem_up(&sem)                             │
│  销毁           │ qurt_sem_destroy(&sem)                        │
├─────────────────┼───────────────────────────────────────────────┤
│ 屏障(Barrier) │                                               │
│  初始化         │ qurt_barrier_init(&barrier, count)            │
│  等待           │ qurt_barrier_wait(&barrier)                   │
│  销毁           │ qurt_barrier_destroy(&barrier)                │
└─────────────────┴───────────────────────────────────────────────┘

此表格按类别列出了最常用的 QuRT API 函数。左侧列为操作名称,右侧列为函数签名。

  • 线程操作包括创建、终止、等待结束和休眠。
  • 互斥锁操作提供加锁、尝试加锁和解锁功能。
  • 信号量操作支持基于位掩码的通知机制,包含等待、设置和清除操作。定时器操作涵盖创建、删除、休眠以及读取硬件时钟周期计数器。
  • 内存操作包括缓存刷新与失效(对跨处理器缓冲区至关重要),以及针对性能关键数据的 L2 缓存锁定。
  • 信号量和屏障操作完善了同步原语的功能。

后续步骤

本手册涵盖了 QuRT 编程的基础知识:线程管理、同步、内存、定时器、中断、管道、FastRPC 以及多传感器融合流水线。深入学习的下一步遵循自然的学习路径。

首先下载 Hexagon SDK,并在模拟器上运行其中包含的示例项目。位于 $HEXAGON_SDK_ROOT/examples/ 中的示例展示了通过 FastRPC 实现的真实 ARM-DSP 通信模式,是了解完整可运行项目的最佳方式。

阅读 $HEXAGON_SDK_ROOT/docs/ 中的 QuRT 用户指南。该文档详细介绍了本文讨论的所有 API,还包括许多未涵盖的内容(例如 QuRT 的 TLB 管理和电源管理接口)。

尝试使用 HVX(Hexagon 向量扩展)。HVX 是 Hexagon DSP 性能的核心所在,掌握编写向量化 DSP 代码的能力,是你所能使用的最大性能优化手段。

最后,获取一块开发板(例如 Qualcomm RB5),并在真实硬件上运行你的代码。模拟器可以验证正确性,但只有真实硬件才能揭示时序行为、缓存效应以及你的代码与 DSP 上其他软件之间的交互情况。

推荐阅读

Hexagon SDK 文档位于 \(HEXAGON_SDK_ROOT/docs/。QuRT API 参考文档位于 \)HEXAGON_SDK_ROOT/docs/qurt/。高通开发者网络(developer.qualcomm.com)提供额外资源、论坛和技术应用笔记。Hexagon DSP 架构参考手册是了解该硬件本身的权威指南。

QuRT 是一种精密工具。它不会引导你前进,但它赋予你对世界上最强大的 DSP 架构之一进行微秒级实时处理的控制能力。学习曲线虽然陡峭,但一旦跨越,你就会明白为何数十亿设备都将其最关键的实时任务托付给这个小巧的操作系统。

  • * *
  • * *

免费学习编程。freeCodeCamp 的开源课程已帮助超过 40,000 人找到开发者工作。开始学习

AI 可能会生成不准确的信息,请核实重要内容