ESP32のFreeRTOS入門 その5 キュー

概要

前回は割り込みと通知でした。今回は通知を少し便利にしたキューを説明したいと思います。

キューとは?

通知では、データの受け渡しや、呼び出し回数が重要な場合に使いにくい時があります。キューはデータの集まりであり、処理すべきデータのリストになります。

何かを処理したい場合にキューにデータを入れて依頼をし、別タスクなどでキューからデータを取り出して実行することなどが可能です。

キューの種類

  • 通常のキュー
  • メールボックス(最後のアイテムのみ保持)

基本的には同じものなのですが、保存できるデータが1個の場合にはメールボックスと呼ぶことがあるようです。現在値に更新するなどの用途であれば、最後の値だけを使えばいいのでメールボックスが便利そうです。

内部的には通常のキューとメールボックスで大きな違いがありません。メールボックス専用のヘルパー関数はあるので、ちょっとだけかんたんに使うことができます。

キューの長さ(大きさ)

メールボックスの場合には長さが1のリストですが、メモリが許すかぎり長いリストも作成可能です。処理によりますが、なるべくキューのリストが満杯にならない長さで作成したほうが安全です。

メールボックスの例

// Mailbox用のキュー
QueueHandle_t xQueueMailbox;
void setup() {
  // キューアイテム
  uint8_t data;
  uint8_t data2;
  BaseType_t ret;
  // デバッグ出力準備
  Serial.begin(115200);
  delay(50);
  Serial.println("==== Queue Mailbox Test ====");
  // メールボックス用途は長さ1のみ
  xQueueMailbox = xQueueCreate(1, sizeof(uint8_t));
  // キューの数確認
  Serial.println("\nキューの数確認");
  Serial.printf("uxQueueMessagesWaiting = %d\n", uxQueueMessagesWaiting(xQueueMailbox));
  // キューの追加可能数確認
  Serial.println("\nキューの追加可能数確認");
  Serial.printf("uxQueueSpacesAvailable = %d\n", uxQueueSpacesAvailable(xQueueMailbox));
  // 送信
  Serial.println("\n送信");
  data = 0;
  ret = xQueueSend(xQueueMailbox, &data, 0);
  Serial.printf("xQueueSend(%d) : ret = %d\n", data, ret);
  // キューの数確認
  Serial.println("\nキューの数確認");
  Serial.printf("uxQueueMessagesWaiting = %d\n", uxQueueMessagesWaiting(xQueueMailbox));
  // キューの追加可能数確認
  Serial.println("\nキューの追加可能数確認");
  Serial.printf("uxQueueSpacesAvailable = %d\n", uxQueueSpacesAvailable(xQueueMailbox));
  // 送信(キューが満杯なので送信失敗する)
  Serial.println("\n送信(キューが満杯なので送信失敗する)");
  data = 1;
  ret = xQueueSend(xQueueMailbox, &data, 0);
  Serial.printf("xQueueSend(%d) : ret = %d\n", data, ret);
  // 受信
  Serial.println("\n受信");
  data2 = -1;
  ret = xQueueReceive(xQueueMailbox, &data2, 0);
  Serial.printf("%d = xQueueReceive() : ret = %d\n", data2, ret);
  // 送信(キューが空いたので送信成功する)
  Serial.println("\n送信(キューが空いたので送信成功する)");
  data = 2;
  ret = xQueueSend(xQueueMailbox, &data, 0);
  Serial.printf("xQueueSend(%d) : ret = %d\n", data, ret);
  // 受信
  Serial.println("\n受信");
  data2 = -1;
  ret = xQueueReceive(xQueueMailbox, &data2, 0);
  Serial.printf("%d = xQueueReceive() : ret = %d\n", data2, ret);
  // 上書き送信(上書きは常に成功する)
  Serial.println("\n上書き送信(上書きは常に成功する)");
  data = 3;
  ret = xQueueOverwrite(xQueueMailbox, &data);
  Serial.printf("xQueueOverwrite(%d) : ret = %d\n", data, ret);
  // 上書き送信(上書きは常に成功する)
  Serial.println("\n上書き送信(上書きは常に成功する)");
  data = 4;
  ret = xQueueOverwrite(xQueueMailbox, &data);
  Serial.printf("xQueueOverwrite(%d) : ret = %d\n", data, ret);
  // 受信確認(キューを消さない受信)
  Serial.println("\n受信(キューを消さない受信)");
  data2 = -1;
  ret = xQueuePeek(xQueueMailbox, &data2, 0);
  Serial.printf("%d = xQueuePeek() : ret = %d\n", data2, ret);
  // 受信
  Serial.println("\n受信");
  data2 = -1;
  ret = xQueueReceive(xQueueMailbox, &data2, 0);
  Serial.printf("%d = xQueueReceive() : ret = %d\n", data2, ret);
  // 受信(キューがないのでエラーになる)
  Serial.println("\n受信(キューがないのでエラーになる)");
  data2 = -1;
  ret = xQueueReceive(xQueueMailbox, &data2, 0);
  Serial.printf("%d = xQueueReceive() : ret = %d\n", data2, ret);
  // キュークリア
  Serial.println("\nキュークリア");
  ret = xQueueReset(xQueueMailbox);
  Serial.printf("xQueueReset : ret = %d\n", ret);
  // キュー削除
  Serial.println("\nキュー削除");
  vQueueDelete(xQueueMailbox);
  Serial.printf("xQueueReset\n");
}
void loop() {
}

ちょっと長めのスケッチですが、できるだけ使うときのパターン別に試してみました。

==== Queue Mailbox Test ====
キューの数確認
uxQueueMessagesWaiting = 0
キューの追加可能数確認
uxQueueSpacesAvailable = 1
送信
xQueueSend(0) : ret = 1
キューの数確認
uxQueueMessagesWaiting = 1
キューの追加可能数確認
uxQueueSpacesAvailable = 0
送信(キューが満杯なので送信失敗する)
xQueueSend(1) : ret = 0
受信
0 = xQueueReceive() : ret = 1
送信(キューが空いたので送信成功する)
xQueueSend(2) : ret = 1
受信
2 = xQueueReceive() : ret = 1
上書き送信(上書きは常に成功する)
xQueueOverwrite(3) : ret = 1
上書き送信(上書きは常に成功する)
xQueueOverwrite(4) : ret = 1
受信確認(キューを消さない受信)
4 = xQueuePeek() : ret = 1
受信
4 = xQueueReceive() : ret = 1
受信(キューがないのでエラーになる)
255 = xQueueReceive() : ret = 0
キュークリア
xQueueReset : ret = 1
キュー削除
xQueueReset

実行結果が上記です。

キューの作成

  // メールボックス用途は長さ1のみ
  xQueueMailbox = xQueueCreate(1, sizeof(uint8_t));

uint8_t型のキューを長さ1で作成しています。構造体をキューにすることもできます。メールボックなので、長さは1のみですが通常のキューであれば任意の長さで作ることができます。

キューの送信

 ret = xQueueOverwrite(xQueueMailbox, &data);

メールボックスの場合には、最後のキューだけ保存されていれば問題ないはずですので上書きで送信した方がよいと思います。通常の送信の場合にはキューが空の時以外にはエラーになってしまいます。

キューの確認

  // キューの数確認
  Serial.println("\nキューの数確認");
  Serial.printf("uxQueueMessagesWaiting = %d\n", uxQueueMessagesWaiting(xQueueMailbox));

uxQueueMessagesWaiting()関数で、受信待ちのキューの数を確認できます。メールボックスなので0か1が戻ってきます。

ただし、通知待ちと同じように、受信時に判定すればよいのでメールボックスの場合には確認関数よりは直接受信でもよいと思います。

キューの受信確認(キューを消さない受信)

  // 受信確認(キューを消さない受信)
  Serial.println("\n受信(キューを消さない受信)");
  data2 = -1;
  ret = xQueuePeek(xQueueMailbox, &data2, 0);
  Serial.printf("%d = xQueuePeek() : ret = %d\n", data2, ret);

xQueuePeek()関数は、受信したキューを消さず、中身だけ確認する関数です。通常の処理ではあまり使わない関数だと思います。

キューの受信

  // 受信
  Serial.println("\n受信");
  data2 = -1;
  ret = xQueueReceive(xQueueMailbox, &data2, 0);
  Serial.printf("%d = xQueueReceive() : ret = %d\n", data2, ret);

xQueueReceive()関数が通常の受信です。最後の0は受信待ちするTick数です。0の場合には即時終了になり、portMAX_DELAYの場合には通知と同じように受信するまでブロックして待ちます。

  while(1){
    if(xQueueReceive(xQueueMailbox, &data2, 0)==pdTRUE){
      // 受信
    }
    delay(1);
  }

実際には上記のようにタイムアウト0Tickのノンブロックをループさせるか、以下のようなブロックで処理するかになると思います。

  while(1){
    xQueueReceive(xQueueMailbox, &data2, portMAX_DELAY);
    // 受信
    delay(1);
  }

専用タスクであればportMAX_DELAYで待った方がシンプルになりますが、他の処理との共有タスクの場合にはノンブロックが必要になります。

キューの後始末

  // キュークリア
  Serial.println("\nキュークリア");
  ret = xQueueReset(xQueueMailbox);
  Serial.printf("xQueueReset : ret = %d\n", ret);
  // キュー削除
  Serial.println("\nキュー削除");
  vQueueDelete(xQueueMailbox);
  Serial.printf("xQueueReset\n");

あまりすることはないと思います。キューは動的に作ったり消したりをすると、消し忘れなどが発生するので、最初に作ったものをずーっと使いまわしたほうが安全です。

また、キューの削除には戻り値がないので、変な値を渡しても消えたかがわかりません。

通常のキューの例

// キューの大きさ
#define QUEUE_LENGTH 4
 
// 通常のキュー
QueueHandle_t xQueue;
void setup() {
  // キューアイテム
  uint8_t data;
  uint8_t data2;
  BaseType_t ret;
  // デバッグ出力準備
  Serial.begin(115200);
  delay(50);
  Serial.println("==== Normal Queue Test ====");
  // キュー作成
  xQueue = xQueueCreate(QUEUE_LENGTH, sizeof(uint8_t));
  // キューの数確認
  Serial.println("\nキューの数確認");
  Serial.printf("uxQueueMessagesWaiting = %d\n", uxQueueMessagesWaiting(xQueue));
  // キューの追加可能数確認
  Serial.println("\nキューの追加可能数確認");
  Serial.printf("uxQueueSpacesAvailable = %d\n", uxQueueSpacesAvailable(xQueue));
  // 送信
  Serial.println("\n送信");
  data = 0;
  ret = xQueueSend(xQueue, &data, 0);
  Serial.printf("xQueueSend(%d) : ret = %d\n", data, ret);
  // キューの数確認
  Serial.println("\nキューの数確認");
  Serial.printf("uxQueueMessagesWaiting = %d\n", uxQueueMessagesWaiting(xQueue));
  // キューの追加可能数確認
  Serial.println("\nキューの追加可能数確認");
  Serial.printf("uxQueueSpacesAvailable = %d\n", uxQueueSpacesAvailable(xQueue));
  // 送信
  Serial.println("\n送信");
  data = 1;
  ret = xQueueSend(xQueue, &data, 0);
  Serial.printf("xQueueSend(%d) : ret = %d\n", data, ret);
  // 送信
  Serial.println("\n送信");
  data = 2;
  ret = xQueueSend(xQueue, &data, 0);
  Serial.printf("xQueueSend(%d) : ret = %d\n", data, ret);
  // 送信
  Serial.println("\n送信");
  data = 3;
  ret = xQueueSend(xQueue, &data, 0);
  Serial.printf("xQueueSend(%d) : ret = %d\n", data, ret);
  // 送信(キューが満杯なので送信失敗する)
  Serial.println("\n送信(キューが満杯なので送信失敗する)");
  data = 4;
  ret = xQueueSend(xQueue, &data, 0);
  Serial.printf("xQueueSend(%d) : ret = %d\n", data, ret);
  // 受信
  Serial.println("\n受信");
  data2 = -1;
  ret = xQueueReceive(xQueue, &data2, 0);
  Serial.printf("%d = xQueueReceive() : ret = %d\n", data2, ret);
  // 送信(キューが空いたので送信成功する)
  Serial.println("\n送信(キューが空いたので送信成功する)");
  data = 5;
  ret = xQueueSend(xQueue, &data, 0);
  Serial.printf("xQueueSend(%d) : ret = %d\n", data, ret);
  // 受信
  Serial.println("\n受信");
  data2 = -1;
  ret = xQueueReceive(xQueue, &data2, 0);
  Serial.printf("%d = xQueueReceive() : ret = %d\n", data2, ret);
  // キューの数確認
  Serial.println("\nキューの数確認");
  Serial.printf("uxQueueMessagesWaiting = %d\n", uxQueueMessagesWaiting(xQueue));
  // 受信確認(キューを消さない受信)
  Serial.println("\n受信(キューを消さない受信)");
  data2 = -1;
  ret = xQueuePeek(xQueue, &data2, 0);
  Serial.printf("%d = xQueuePeek() : ret = %d\n", data2, ret);
  // 受信
  Serial.println("\n受信");
  data2 = -1;
  ret = xQueueReceive(xQueue, &data2, 0);
  Serial.printf("%d = xQueueReceive() : ret = %d\n", data2, ret);
  // 送信(キューの先頭に追加)
  Serial.println("\n送信(キューの先頭に追加)");
  data = 6;
  ret = xQueueSendToFront(xQueue, &data, 0);
  Serial.printf("xQueueSend(%d) : ret = %d\n", data, ret);
  // 受信
  Serial.println("\n受信");
  data2 = -1;
  ret = xQueueReceive(xQueue, &data2, 0);
  Serial.printf("%d = xQueueReceive() : ret = %d\n", data2, ret);
  // 受信
  Serial.println("\n受信");
  data2 = -1;
  ret = xQueueReceive(xQueue, &data2, 0);
  Serial.printf("%d = xQueueReceive() : ret = %d\n", data2, ret);
  // 受信
  Serial.println("\n受信");
  data2 = -1;
  ret = xQueueReceive(xQueue, &data2, 0);
  Serial.printf("%d = xQueueReceive() : ret = %d\n", data2, ret);
  // 受信(キューがないのでエラーになる)
  Serial.println("\n受信(キューがないのでエラーになる)");
  data2 = -1;
  ret = xQueueReceive(xQueue, &data2, 0);
  Serial.printf("%d = xQueueReceive() : ret = %d\n", data2, ret);
  // Serialのフラッシュ
  Serial.flush();
  // キュークリア
  Serial.println("\nキュークリア");
  ret = xQueueReset(xQueue);
  Serial.printf("xQueueReset : ret = %d\n", ret);
  // キュー削除
  Serial.println("\nキュー削除");
  vQueueDelete(xQueue);
  Serial.printf("xQueueReset\n");
}
void loop() {
}

ごめんなさい、こちらも長くなりました。また、シリアル出力が長くなりすぎて、出力が止まってしまったので、Serial.flush()が途中に入っています。

==== Normal Queue Test ====
キューの数確認
uxQueueMessagesWaiting = 0
キューの追加可能数確認
uxQueueSpacesAvailable = 4
送信
xQueueSend(0) : ret = 1
キューの数確認
uxQueueMessagesWaiting = 1
キューの追加可能数確認
uxQueueSpacesAvailable = 3
送信
xQueueSend(1) : ret = 1
送信
xQueueSend(2) : ret = 1
送信
xQueueSend(3) : ret = 1
送信(キューが満杯なので送信失敗する)
xQueueSend(4) : ret = 0
受信
0 = xQueueReceive() : ret = 1
送信(キューが空いたので送信成功する)
xQueueSend(5) : ret = 1
受信
1 = xQueueReceive() : ret = 1
キューの数確認
uxQueueMessagesWaiting = 3
受信(キューを消さない受信)
2 = xQueuePeek() : ret = 1
受信
2 = xQueueReceive() : ret = 1
送信(キューの先頭に追加)
xQueueSend(6) : ret = 1
受信
6 = xQueueReceive() : ret = 1
受信
3 = xQueueReceive() : ret = 1
受信
5 = xQueueReceive() : ret = 1
受信(キューがないのでエラーになる)
255 = xQueueReceive() : ret = 0
キュークリア
xQueueReset : ret = 1
キュー削除
xQueueReset

実行結果は上記になります。

キューの作成

// キューの大きさ
#define QUEUE_LENGTH 4
// キュー作成
xQueue = xQueueCreate(QUEUE_LENGTH, sizeof(uint8_t));

通常のキューですので、好きな長さでキューを作成することができます。この例では4で作成しています。

キューの状態

  // キューの数確認
  Serial.println("\nキューの数確認");
  Serial.printf("uxQueueMessagesWaiting = %d\n", uxQueueMessagesWaiting(xQueue));
  // キューの追加可能数確認
  Serial.println("\nキューの追加可能数確認");
  Serial.printf("uxQueueSpacesAvailable = %d\n", uxQueueSpacesAvailable(xQueue));

メールボックスと同じ関数で状態が取得できます。送信時には追加可能数を確認して、追加する空きがあるか、受信時にはキューの数を確認して、受信すべきキューがあるかを確認します。

キューの送信

  // 送信
  Serial.println("\n送信");
  data = 0;
  ret = xQueueSend(xQueue, &data, 0);
  Serial.printf("xQueueSend(%d) : ret = %d\n", data, ret);

キューを送信するときには、キューのどこに追加するのかと、キューに空きがあるかを意識する必要があります。

また、xQueueOverwrite()関数で上書きしようとすると、キューのサイズが1のメールボックス以外ではパニックが発生して、リブートしてしまうので注意してください。

通常のxQueueSend()関数の場合には、キューの最後に追加されます。一般的なFIFO(First In, First Out)のキューで、入れた順にキューが並んでいます。昔に入れたものから出てくるキューになります。

  // 送信(キューの先頭に追加)
  Serial.println("\n送信(キューの先頭に追加)");
  data = 6;
  ret = xQueueSendToFront(xQueue, &data, 0);
  Serial.printf("xQueueSend(%d) : ret = %d\n", data, ret);

xQueueSendToFront()関数はキューの先頭に追加します。キューが処理してほしいリストの場合には、先頭に追加することで、最優先にそのデータが実行されることになります。

キューに空きがあるかは、uxQueueSpacesAvailable()関数で1以上の数字が戻ってくれば送信できます。ただし、マルチタスクや割り込みなど、送信箇所が複数ある場合にはuxQueueSpacesAvailable()関数の戻り値が0以外の場合でも、実際に送信するまでの間にキューが満杯になってしまうことがあり得るので注意してください。

送信関数の最後の引数が送信待ち時間です。通常は0でキューが満杯の場合には即時エラーになりますが、受信と同じようにportMAX_DELAYを指定することで、送信できるまで待機します。

 ret = xQueueSend(xQueue, &data, portMAX_DELAY);

マルチタスクなどで、他のタスクが常にキューを処理している場合には、portMAX_DELAYで送信待ちをしていても、すぐにキューが空くので送信できると思います。

しかしながら、受信側の処理が止まってしまうと、送信側も永遠に待ち続けるためにハングアップしたような状況になってしまいます。送信側でportMAX_DELAYを使うときには、かなり注意して使わないと危険だと思います。

  for(int i=0;i<5;i++){
      ret = xQueueSend(xQueue, &data, 1000);
      if(ret==pdTRUE){
        break;
      }
      delay(1);
  }
  if(i==5){
    // Timeout
  }

上記のように1000Tick(1000ミリ秒=1秒)を5回チャレンジして、だめだったらタイムアウトみたいな処理の方が安心だと思います。

  for(int i=0;i<5;i++){
      ret = xQueueSend(xQueue, &data, 0);
      if(ret==pdTRUE){
        break;
      }
      delay(1000);
  }
  if(i==5){
    // Timeout
  }

ブロック関数はちょっと怖いので、上記のようにdelay()で時間調整をしてもよいと思います。このへんは好みだと思います。送信が一箇所しかないのであればuxQueueMessagesWaiting()関数で送信空きができるまでdelay()するループの方が丁寧かもしれません。

キューの受信と後始末

この辺はメールボックスのときと全く同じです。キューが複数あるので、処理する順番が重要になりますが、送信側の設定になるので受信側は気にする必要はありません。

キューセット

キューを複数まとめて管理するキューセットという機能があります。受信するタスクが1つで、複数のキューを受信する場合にportMAX_DELAYで受信待ちができません。

  while(1){
    // 取得可能なキューを取得する
    QueueHandle_t queue = xQueueSelectFromSet(xQueueSet, portMAX_DELAY);
 
    // キューの種類を調べる
    if(queue == xQueue1){
      Serial.print("xQueue1 ");
    } else if(queue == xQueue2) {
      Serial.print("xQueue2 ");
    } else {
      Serial.print("? ");
    }
     
    // 受信
    ret = xQueueReceive(queue, &data2, 0);
    // 処理
  }

あらかじめキューセットを作っておくと、上記のように処理ができます。

  while(1){
    if(xQueueReceive(xQueue1, &data2, 0)==pdTRUE){
      // キュー1受信
    }
    if(xQueueReceive(xQueue2, &data2, 0)==pdTRUE){
      // キュー2受信
    }
    delay(1);
  }

ただし、多くの場合上記のようにノンブロックで処理をすれば問題ないと思います。時系列でどちらが早く送信されたかが重要な場合には、上記の場合ほぼ同時に受信した場合にxQueue1が優先されることがあるので、問題がでると思います。

個人的にはあまり複雑なことをするよりは、単純に処理をしたほうがいいのでキューセットはおすすめしません。

割り込みからの呼び出し

#include <M5StickC.h>
// キューの大きさ
#define QUEUE_LENGTH 16
// 通常のキュー
QueueHandle_t xQueue;
TaskHandle_t taskHandle;
void IRAM_ATTR onButton() {
  uint8_t data = 0;
  uint8_t ret;
  ret = xQueueSendFromISR(xQueue, &data, 0);
  ret = xQueueSendFromISR(xQueue, &data, 0);
  ret = xQueueSendFromISR(xQueue, &data, 0);
  ret = xQueueSendFromISR(xQueue, &data, 0);
  ret = xQueueSendFromISR(xQueue, &data, 0);
}
void task1(void *pvParameters) {
  while (1) {
    uint8_t data;
    if (xQueueReceive(xQueue, &data, 0)==pdTRUE) {
      Serial.println(M5.Axp.GetBatVoltage());
    }
    delay(1);
  }
}
void setup() {
  M5.begin();
  // キュー作成
  xQueue = xQueueCreate(QUEUE_LENGTH, sizeof(uint8_t));
  // Core1の優先度1で通知受信用タスク起動
  xTaskCreateUniversal(
    task1,
    "task1",
    8192,
    NULL,
    1,
    &taskHandle,
    APP_CPU_NUM
  );
  pinMode(GPIO_NUM_37, INPUT);
  attachInterrupt(GPIO_NUM_37, onButton, FALLING);
}
void loop() {
  delay(1);
}

通知のときに利用したM5StickC用スケッチをキューに書き換えてみました。通知の場合には連続して送信しても、1つしか受信しないことがありましたが、キューの場合にはキューが満杯にならない限り、送信した数だけ受信することができます。

資料

日本語リファレンス

上記は、私がよくわからないときに訳しているので、ちょっとおかしいところがあるかもしれません。

関連ブログ

まとめ

処理自体はそれほど複雑ではないと思いますが、スケッチ例が長くなってしまいました。通知とキューの違いはわかったでしょうか?

さて、次回は本来の意味での排他制御を説明したいと思います。

続編

コメント