> 文章列表 > Zephyr mailbox

Zephyr mailbox

Zephyr mailbox

文章目录

  • 简介
  • 数据结构
    • k_mbox
    • k_mbox_msg
  • 同步模式和异步模式
    • 同步模式
    • 异步模式
      • 异步消息描述符
      • 异步消息描述符内存池
        • 异步消息描述符管理
  • 发送邮件
      • 同步发送邮件
      • 异步发送邮件
  • 接收邮件

简介

  • mailbox 是Zephyr 中的一个内核对象,它提供了增强的消息队列功能,超越了消息队列对象的能力。邮箱允许线程同步或异步地发送和接收任何大小的消息。
  • 信箱允许线程,但不允许 ISR,交换消息。发送消息的线程被称为发送线程,而接收消息的线程则被称为接收线程。每个消息只能由一个线程接收(即不支持点对多点和广播消息)。
  • 使用邮箱交换的消息是以非匿名方式处理的,参与交换的两个线程知道(甚至指定)另一个线程的身份。
  • 消息描述符是一个数据结构,它指定了消息数据的位置,以及消息如何被邮箱处理。发送线程和接收线程在访问邮箱时都提供一个消息描述符。信箱使用消息描述符在兼容的发送和接收线程之间进行消息交换。在交换过程中,邮箱也会更新某些消息描述符字段,让两个线程知道发生了什么。
  • 一个邮箱消息包含零或多个字节的消息数据。消息数据的大小和格式是由应用程序定义的,并且可以从一个消息到另一个消息有所不同。
  • 消息缓冲区是由发送或接收消息数据的线程提供的一个内存区域。一个数组或结构变量通常可用于此目的。
  • 一个既没有消息数据形式的消息被称为空消息。

数据结构

k_mbox

struct k_mbox {/** Transmit messages queue */_wait_q_t tx_msg_queue;/** Receive message queue */_wait_q_t rx_msg_queue;struct k_spinlock lock;SYS_PORT_TRACING_TRACKING_FIELD(k_mbox)
};

k_mbox_msg

struct k_mbox_msg {/** internal use only - needed for legacy API support */uint32_t _mailbox;/** size of message (in bytes) */size_t size;/** application-defined information value */uint32_t info;/** sender's message data buffer */void *tx_data;/** internal use only - needed for legacy API support */void *_rx_data;/** message data block descriptor */struct k_mem_block tx_block;/** source thread id */k_tid_t rx_source_thread;/** target thread id */k_tid_t tx_target_thread;/** internal use only - thread waiting on send (may be a dummy) */k_tid_t _syncing_thread;
#if (CONFIG_NUM_MBOX_ASYNC_MSGS > 0)/** internal use only - semaphore used during asynchronous send */struct k_sem *_async_sem;
#endif
};
  • size
    • 发送消息的长度
  • info
    • 应用程序定义的消息值,发送成功会将发送者和接收者的 info 值进行交换。
  • tx_data
    • 发送者的消息缓冲区,可以用于保存需要发送的数据。
  • tx_block
    • 消息数据块描述符,当 tx_data 使用时,tx_block 无用,当 tx_data 为空且 tx_block.data 不为空时, tx_block.data 作为数据拷贝的来源
  • _syncing_thread
    • 因发送陷入等待的线程,如果采用模式,该指针指向调用 k_mbox_put 的线程。
    • 如果采用异步模式,该指针指向异步消息描述符中的 dummy 线程。
    • _syncing_thread 所指向的线程会被挂起等待。
  • tx_target_thread
    • 发送线程 tcb 指针
  • rx_source_thread
    • 接收线程 tcb 指针

同步模式和异步模式

同步模式

  • 在同步交换中,发送线程会阻塞,直到消息被接收线程完全处理。
  • 同步交换技术提供了一种隐含的流量控制形式,防止发送线程生成消息的速度超过接收线程的消费速度。

异步模式

  • 在异步交换中,发送线程不会等到消息被另一个线程收到后才继续;这允许发送线程在消息被交给接收线程并完全处理之前做其他工作(例如收集将在下一条消息中使用的数据)。用于特定消息交换的技术是由发送线程决定的。
  • 异步交换技术提供了一种明确的流量控制形式,它允许发送线程在发送后续消息之前确定先前发送的消息是否仍然存在。

异步消息描述符

struct k_mbox_async {struct _thread_base thread;		/* dummy thread object */struct k_mbox_msg tx_msg;	/* transmit message descriptor */
};
  • thread
    • 在异步模式下,发送者为了不被挂起,需要提供一个 dummy 线程(Zephyr 中 dummy 线程永远不会被调度执行,但是它可用于参与上下文切换,可以被阻塞,被挂起),将 dummy 线程设置为发送者,替换真实的发送者被阻塞,从而实现异步发送。
  • tx_msg
    • 发送的消息,由于异步模式下发送者不会被阻塞,如果发送消息位于栈上,发送者继续向下运行肯定会破坏数据,因此需要将用户数据拷贝到 tx_msg 中。

异步消息描述符内存池

  • 异步模式下存在一个内存池,用于提供异步消息描述符所需要的内存,内存池中的一项就是一个异步消息描述符,需要将其中的每一项的 thread 初始化为 dummy 类型,从而避免其被调度器调度运行使系统崩溃。
  • 为了在邮箱使用前将这段内存初始化,使用了Zephyr中的模块自动初始化机制,在内核启动前调用 init_mbox_module 将 async_msg 初始化,初始化后将每一个节点放入栈中,该函数运行完之后,栈中保存的就是空闲未使用的异步消息描述符。

使用异步模式初始化内存池:

#if (CONFIG_NUM_MBOX_ASYNC_MSGS > 0)static int init_mbox_module(const struct device *dev)
{ARG_UNUSED(dev);/* 创建异步消息描述符的内存池 */static struct k_mbox_async __noinit async_msg[CONFIG_NUM_MBOX_ASYNC_MSGS];#if (CONFIG_NUM_MBOX_ASYNC_MSGS > 0)/* dummy 线程仅需要执行最小初始化,未分配栈空间,* 未指定入口地址,初始状态设置为 _THREAD_DUMMY,表明其本身不是一个真实的线程* 初始化完成后将每一个描述符添加到栈中,栈中的描述符即为剩余未使用的描述符。*/int i;for (i = 0; i < CONFIG_NUM_MBOX_ASYNC_MSGS; i++) {z_init_thread_base(&async_msg[i].thread, 0, _THREAD_DUMMY, 0);k_stack_push(&async_msg_free, (stack_data_t)&async_msg[i]);}
#endif /* CONFIG_NUM_MBOX_ASYNC_MSGS > 0 */return 0;
}SYS_INIT(init_mbox_module, PRE_KERNEL_1, CONFIG_KERNEL_INIT_PRIORITY_OBJECTS);#endif /* CONFIG_NUM_MBOX_ASYNC_MSGS */

异步消息描述符管理

  • 异步发送时需要从内存池取出一个描述符,接收时需要将描述符归还。
  • 此处使用栈来执行内存管理,分配时从栈中 pop 一个节点,归还时向栈中 push 一个节点。
  • 接收者会判断发送者是否为 dummy 线程,如果是则将内存进行回收。
/* stack of unused asynchronous message descriptors */
K_STACK_DEFINE(async_msg_free, CONFIG_NUM_MBOX_ASYNC_MSGS);/* allocate an asynchronous message descriptor */
static inline void mbox_async_alloc(struct k_mbox_async **async)
{(void)k_stack_pop(&async_msg_free, (stack_data_t *)async, K_FOREVER);
}/* free an asynchronous message descriptor */
static inline void mbox_async_free(struct k_mbox_async *async)
{k_stack_push(&async_msg_free, (stack_data_t)async);
}

发送邮件

  • 发送邮件是通过 mbox_message_put 实现的,针对同步发送和异步发送有所区别。
  • 下面是 mbox_message_put 实现:
static int mbox_message_put(struct k_mbox *mbox, struct k_mbox_msg *tx_msg,k_timeout_t timeout)
{struct k_thread *sending_thread;struct k_thread *receiving_thread;struct k_mbox_msg *rx_msg;k_spinlock_key_t key;/* 在发送邮件时设置发送者ID */tx_msg->rx_source_thread = _current;/* 将当前线程的 swap_data 指向 tx_msg,以便接收者从 swap_data 读取数据 */sending_thread = tx_msg->_syncing_thread;sending_thread->base.swap_data = tx_msg;/* search mailbox's rx queue for a compatible receiver */key = k_spin_lock(&mbox->lock);SYS_PORT_TRACING_OBJ_FUNC_ENTER(k_mbox, message_put, mbox, timeout);_WAIT_Q_FOR_EACH(&mbox->rx_msg_queue, receiving_thread) {rx_msg = (struct k_mbox_msg *)receiving_thread->base.swap_data;/* 从接收等待队列中查找符合要求的接收线程(接收地址和发送地址要匹配) * 然后将消息从发送者拷贝到接收者,接收者会通过判断 _syncing_thread * 是否为dummy 线程决定是否需要释放内存。*/if (mbox_message_match(tx_msg, rx_msg) == 0) {/* 将接收线程从接收队列中取出并唤醒 */z_unpend_thread(receiving_thread);/* 将接收线程切换为就绪状态,设置返回值0 */arch_thread_return_value_set(receiving_thread, 0);z_ready_thread(receiving_thread);#if (CONFIG_NUM_MBOX_ASYNC_MSGS > 0)/* 如果发送者为 dummy 线程,此处会触发一次上下文切换立即唤醒接收者 */if ((sending_thread->base.thread_state & _THREAD_DUMMY)!= 0U) {z_reschedule(&mbox->lock, key);return 0;}
#endifSYS_PORT_TRACING_OBJ_FUNC_BLOCKING(k_mbox, message_put, mbox, timeout);/* 如果使用同步发送,将会把当前线程挂起,直到邮件被接收完成 */int ret = z_pend_curr(&mbox->lock, key, NULL, K_FOREVER);SYS_PORT_TRACING_OBJ_FUNC_EXIT(k_mbox, message_put, mbox, timeout, ret);return ret;}}/* 找不到接收者,等待时间为0,返回-ENOMSG */if (K_TIMEOUT_EQ(timeout, K_NO_WAIT)) {SYS_PORT_TRACING_OBJ_FUNC_EXIT(k_mbox, message_put, mbox, timeout, -ENOMSG);k_spin_unlock(&mbox->lock, key);return -ENOMSG;}#if (CONFIG_NUM_MBOX_ASYNC_MSGS > 0)/* 如果使用异步发送,由于此时没有接收者, 需要将 dummy 线程添加到 tx_msg_queue 中,* 发送者可以继续向下执行,返回0。*/if ((sending_thread->base.thread_state & _THREAD_DUMMY) != 0U) {z_pend_thread(sending_thread, &mbox->tx_msg_queue, K_FOREVER);k_spin_unlock(&mbox->lock, key);return 0;}
#endifSYS_PORT_TRACING_OBJ_FUNC_BLOCKING(k_mbox, message_put, mbox, timeout);/* 同步发送时,发送者会被阻塞一段时间,直到超时或者被接收,之后会从 tx_msg_queue 中取出 */int ret = z_pend_curr(&mbox->lock, key, &mbox->tx_msg_queue, timeout);SYS_PORT_TRACING_OBJ_FUNC_EXIT(k_mbox, message_put, mbox, timeout, ret);return ret;
}
  • 当 rx_msg_queue 中存在接收者等待的情况下,无论是采用同步发送还是异步发送邮件,当前线程都有可能暂停,下面是两种方式的区别:
    • 当采用异步方式发送,会触发一次上下文切换,如果接收者优先级更高,当前线程会停止运行被移入就绪队列中,然后运行接收线程, 接收线程会回收 dummy线程的内存,发送线程直到下一次调度才能继续运行。
    • 采用同步模式时,会直接将当前线程设置为 _THREAD_PENDING 状态,然后运行接收线程,由于同步模式发送者为真实线程,因此不会回收内存,但是会将发送线程从挂起转换为就绪态,发送线程重新开始运行。

同步发送邮件

int k_mbox_put(struct k_mbox *mbox, struct k_mbox_msg *tx_msg,k_timeout_t timeout)
{/* 采用同步发送模式时,被阻塞的线程为调用线程本身 */tx_msg->_syncing_thread = _current;SYS_PORT_TRACING_OBJ_FUNC_ENTER(k_mbox, put, mbox, timeout);int ret = mbox_message_put(mbox, tx_msg, timeout);SYS_PORT_TRACING_OBJ_FUNC_EXIT(k_mbox, put, mbox, timeout, ret);return ret;
}

异步发送邮件

#if (CONFIG_NUM_MBOX_ASYNC_MSGS > 0)
void k_mbox_async_put(struct k_mbox *mbox, struct k_mbox_msg *tx_msg,struct k_sem *sem)
{struct k_mbox_async *async;SYS_PORT_TRACING_OBJ_FUNC_ENTER(k_mbox, async_put, mbox, sem);/* 从内存池分配异步消息描述符,然后将 dummy 线程设置为被阻塞对象,调用者继续向后执行  */mbox_async_alloc(&async);async->thread.prio = _current->base.prio;async->tx_msg = *tx_msg;async->tx_msg._syncing_thread = (struct k_thread *)&async->thread;async->tx_msg._async_sem = sem;(void)mbox_message_put(mbox, &async->tx_msg, K_FOREVER);SYS_PORT_TRACING_OBJ_FUNC_EXIT(k_mbox, async_put, mbox, sem);
}

接收邮件

  • 接收函数只有一个,接收者可通过 thread_state 的 dummy 位判断发送方式为同步或者异步。
  • dummy 线程在接收完成后会被回收资源,而真实线程会由挂起状态转换为就绪态。
void k_mbox_data_get(struct k_mbox_msg *rx_msg, void *buffer)
{/* handle case where data is to be discarded */if (buffer == NULL) {rx_msg->size = 0;mbox_message_dispose(rx_msg);return;}/* copy message data to buffer, then dispose of message */if ((rx_msg->tx_data != NULL) && (rx_msg->size > 0U)) {(void)memcpy(buffer, rx_msg->tx_data, rx_msg->size);}mbox_message_dispose(rx_msg);
}static void mbox_message_dispose(struct k_mbox_msg *rx_msg)
{struct k_thread *sending_thread;struct k_mbox_msg *tx_msg;/* do nothing if message was disposed of when it was received */if (rx_msg->_syncing_thread == NULL) {return;}if (rx_msg->tx_block.data != NULL) {rx_msg->tx_block.data = NULL;}/* recover sender info */sending_thread = rx_msg->_syncing_thread;rx_msg->_syncing_thread = NULL;tx_msg = (struct k_mbox_msg *)sending_thread->base.swap_data;/* update data size field for sender */tx_msg->size = rx_msg->size;#if (CONFIG_NUM_MBOX_ASYNC_MSGS > 0)/* 回收资源 */if ((sending_thread->base.thread_state & _THREAD_DUMMY) != 0U) {struct k_sem *async_sem = tx_msg->_async_sem;mbox_async_free((struct k_mbox_async *)sending_thread);if (async_sem != NULL) {k_sem_give(async_sem);}return;}
#endif/* 唤醒发送线程*/arch_thread_return_value_set(sending_thread, 0);z_mark_thread_as_not_pending(sending_thread);z_ready_thread(sending_thread);z_reschedule_unlocked();
}

彭州一中资讯