arduino-cliのgRPC連携をPythonで試してみた

概要

arduino-cliはコマンドとして実行する以外にgRPCを利用したデーモンモードをサポートしています。Arduino IDEなども内部ではデーモンモードで起動しているarduino-cliに対してgRPCをコマンドを投げているようでした。

ただし、PythonでのgRPC連携は若干クセがあるのと情報が少ないので苦戦しました。今回はボード一覧を取得して表示するところまでを検証してみました。

Pythonの環境をuvで構築

cd /tmp
uv init arduino-cli-grpc-test
cd arduino-cli-grpc-test

とりあえず/tmpにuvでPythonの新しい環境を作ってみます。

arduino-cliの準備

あらかじめ入っている場合には入れる必要がありません。

arduino-cli version

上記のコマンドを実行してみてバージョンが戻ってくれば大丈夫です。

Windowsの場合

winget install ArduinoSA.CLI

上記でインストールすることが可能です。ただし、プロジェクトフォルダの中にダウンロードした実行ファイルを置くだけでも動きます。

上記から最新版をダウンロードして、解答するだけでも大丈夫です。

Linux(wsl)の場合

curl -fsSL https://raw.githubusercontent.com/arduino/arduino-cli/master/install.sh | sh

上記が公式なインストールコマンドなのですが、ちょっと面倒なのでローカルにコピーして進めます。

wget https://downloads.arduino.cc/arduino-cli/arduino-cli_latest_Linux_64bit.tar.gz
tar zxvf arduino-cli_latest_Linux_64bit.tar.gz
rm arduino-cli_latest_Linux_64bit.tar.gz LICENSE.txt

最新版をコピーしてきて、解凍後に不要なファイルを削除しています。

arduino-cliの環境整備

デフォルトの状態だとなにもコアがインストールされていない状態なので、ボード一覧も表示されません。とりあえずESP32のコアをインストールしてみたいと思います。

./arduino-cli config add board_manager.additional_urls https://espressif.github.io/arduino-esp32/package_esp32_index.json
./arduino-cli core update-index
./arduino-cli core install esp32:esp32
./arduino-cli core upgrade
./arduino-cli board listall

最初にボードマネージャーのURLを追加してから、ESP32をインストールしています。Arduino IDEが入っている環境の場合には何もしなくても最後のboard listallで何らかのボード一覧が表示されると思います。

protoファイルの準備

gRPCはGoogleが作ったデータ交換方式で、リクエストに対してレスポンスを複数返せたり、ストリーミングで通信をすることなどができます。しかしながら事前にどんな連携が可能かをサーバー側とクライアント側が把握する必要があります。

連携可能な項目を設定するのがprotoファイルなのですが、事前に交換しておくことが基本なのですが、サーバー側から接続したクライアントに公開するリフレクション機能があるのですがarduino-cliはサポートしていませんでした。

そのため、GitHubから最新版のprotoファイルを取得しておく必要があります。

wget https://github.com/arduino/arduino-cli/releases/download/v1.2.2/arduino-cli_1.2.2_proto.zip
unzip arduino-cli_1.2.2_proto.zip
rm arduino-cli_1.2.2_proto.zip

上記でprotoファイルを取得しています。バージョンはそのときの最新のものを取得するようにしてください。

├─cc
│  └─arduino
│      └─cli
│          └─commands
│              └─v1
│                      board.proto
│                      commands.proto
│                      common.proto
│                      compile.proto
│                      core.proto
│                      debug.proto
│                      lib.proto
│                      monitor.proto
│                      port.proto
│                      settings.proto
│                      upload.proto
│
└─google
    └─rpc
            status.proto

上記のような構造で展開されたと思います。少し深いファイル構造になっているためいろいろ面倒です。そしてGoogleのprotoファイルも利用しているのがハマりポイントです。このprotoを利用して通信をするのですが、このままではPythonから呼び出しができません。クラスファイルに変換するツールがあるので次の項目で変換を実行します。

protoの変換

uv add grpcio grpcio-tools

まずuvで利用するツールを追加します。

uv run python -m grpc_tools.protoc -I. --python_out=. --grpc_python_out=. cc/arduino/cli/commands/v1/*.proto
uv run python -m grpc_tools.protoc -I. --python_out=. --grpc_python_out=. google/rpc/*.proto

その後にgrpc_tools.protocで変換を行います。注意しないといけないのはすべてのprocファイルを変換する必要があります。

google
└─rpc
    │  status.proto
    │  status_pb2.py
    │  status_pb2_grpc.py
    │
    └─__pycache__
            status_pb2.cpython-313.pyc

分量が多いのでgoogleフォルダのみですが、上記のようにprotoから_pb2.pyと_pb2_grpc.pyが生成されています。実際にPythonからはこちらのクラスを呼び出す形となります。しかしながらprotoファイルを人がみて判断しないといけないところがあるので、protoファイルも残しておいたほうがいいと思います。

Pythonのプログラム作成

import json
import subprocess
import sys
import shutil
import os
import grpc
import platform
from cc.arduino.cli.commands.v1 import commands_pb2
from cc.arduino.cli.commands.v1 import commands_pb2_grpc
from cc.arduino.cli.commands.v1 import board_pb2

MAX_MESSAGE_LENGTH = 100 * 1024 * 1024  # 100MB


def find_arduino_cli():
    system = platform.system()
    exe_name = "arduino-cli.exe" if system == "Windows" else "arduino-cli"

    current_dir_path = os.path.join(os.getcwd(), exe_name)
    if os.path.isfile(current_dir_path) and os.access(current_dir_path, os.X_OK):
        return current_dir_path

    path_cli = shutil.which(exe_name)
    if path_cli:
        return path_cli

    return None


def arduino_cli():
    # Check if arduino-cli is available in the system PATH
    cmd = find_arduino_cli()
    if cmd is None:
        print("arduino-cli is NOT installed or not in PATH.")
        sys.exit(1)
    print("exec        :" + cmd)

    # Start the arduino-cli daemon process and capture its output
    process = subprocess.Popen(
        [cmd, "daemon", "--port", "0"],
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
        text=True,
        bufsize=1,
    )
    # Read the output line by line to extract IP and Port information
    for line in process.stdout:
        try:
            log_data = json.loads(line.strip())
            if "IP" in log_data and "Port" in log_data:
                ip = log_data["IP"]
                port = log_data["Port"]
                break  # Exit loop when required information is found
        except json.JSONDecodeError:
            continue  # Ignore lines that are not valid JSON

    # If IP or Port could not be obtained, exit with an error
    if ip is None or port is None:
        print("Failed to get IP and Port from Arduino CLI daemon.")
        sys.exit(1)

    # Return the address in the format "IP:Port"
    return process, f"{ip}:{port}"


def main():
    # Start Arduino CLI and get the gRPC server address
    process, target = arduino_cli()
    print("target      :" + target)

    # Connect to the gRPC server using the obtained address
    with grpc.insecure_channel(
        target, options=[("grpc.max_receive_message_length", MAX_MESSAGE_LENGTH)]
    ) as channel:
        stub = commands_pb2_grpc.ArduinoCoreServiceStub(channel)

        # Request the version of Arduino CLI
        request = commands_pb2.VersionRequest()
        response = stub.Version(request)
        print("version     :" + response.version)

        # Create a new Arduino CLI instance
        request = commands_pb2.CreateRequest()
        response = stub.Create(request)
        id = response.instance.id
        print("instance id :" + str(id))

        # Initialize the created instance
        request = commands_pb2.InitRequest(instance={"id": id})
        for chunk in stub.Init(request):
            print("init        :" + str(chunk))

        # List all available boards
        print("board list  :")
        request = board_pb2.BoardListAllRequest(instance={"id": id})
        response = stub.BoardListAll(request)
        for board in response.boards:
            print(board.name.ljust(50) + " " + board.fqbn)

        # Terminate the arduino-cli daemon process
        process.terminate()


if __name__ == "__main__":
    # Entry point of the script
    main()

短いので全文をいきなり乗せます。

import json
import subprocess
import sys
import shutil
import os
import grpc
import platform

ここは実行ファイルのPathを調べたり、デーモン起動するために利用しているシステム系のimportになります。

from cc.arduino.cli.commands.v1 import commands_pb2
from cc.arduino.cli.commands.v1 import commands_pb2_grpc
from cc.arduino.cli.commands.v1 import board_pb2

これが先程生成したクラスファイルです。レスポンスはcommands_pb2_grpcになるのですが、リクエストは個別のprotoファイルと同じものを呼び出す必要がありcommandsとboardを読み込んでいます。

MAX_MESSAGE_LENGTH = 100 * 1024 * 1024  # 100MB

上記はメッセージ長の指定になります。デフォルトだと4MBに設定されており、ボード一覧などの大量データを受信しようとするとエラーになるのでかなり大きな値を設定しています。

def find_arduino_cli():
    system = platform.system()
    exe_name = "arduino-cli.exe" if system == "Windows" else "arduino-cli"

    current_dir_path = os.path.join(os.getcwd(), exe_name)
    if os.path.isfile(current_dir_path) and os.access(current_dir_path, os.X_OK):
        return current_dir_path

    path_cli = shutil.which(exe_name)
    if path_cli:
        return path_cli

    return None

arduino-cliの実行ファイルを探す関数です。Windowsだとarduino-cli.exe、それ以外だとarduino-cliを探しています。Pathからのみ実行するのであればシンプルなのですが、arduino-cliだと./arduino-cliが含まれないのでこの関数でコマンドのPathを調べています。

def arduino_cli():
    # Check if arduino-cli is available in the system PATH
    cmd = find_arduino_cli()
    if cmd is None:
        print("arduino-cli is NOT installed or not in PATH.")
        sys.exit(1)
    print("exec        :" + cmd)

さっきの関数を呼び出して、Pathを調べています。本当は発見できなかったらダウンロードしてくるのが親切ですが、今回はエラーを表示するだけにしました。

    # Start the arduino-cli daemon process and capture its output
    process = subprocess.Popen(
        [cmd, "daemon", "--port", "0"],
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
        text=True,
        bufsize=1,
    )

デーモンモードで実行しています。「arduino-cli daemon –port 0」を実行するだけなのですが、いろいろな環境を考えるとPathを調べる必要がありました。また、port指定を0ですると空いているポート番号で起動するようになります。デフォルトは50051なのですが、ほかで利用していると起動エラーになるのでランダムにしました。

    # Read the output line by line to extract IP and Port information
    for line in process.stdout:
        try:
            log_data = json.loads(line.strip())
            if "IP" in log_data and "Port" in log_data:
                ip = log_data["IP"]
                port = log_data["Port"]
                break  # Exit loop when required information is found
        except json.JSONDecodeError:
            continue  # Ignore lines that are not valid JSON

ここも面倒な処理なのですが、デーモンモードで起動すると標準出力に待受をしているIPアドレスとポート番号を表示します。

>arduino-cli daemon --port 0
Daemon is now listening on 127.0.0.1:64326
{"IP":"127.0.0.1","Port":"64326"}

上記のような表示になるので、標準出力を受信してIPアドレスとポート番号をパースするコードが先ほどのものになります。

    # If IP or Port could not be obtained, exit with an error
    if ip is None or port is None:
        print("Failed to get IP and Port from Arduino CLI daemon.")
        sys.exit(1)

    # Return the address in the format "IP:Port"
    return process, f"{ip}:{port}"

ポート番号が取得できなければエラー、取得できたならバックグランドで実行しているプロセスと取得した情報を返却しています。

def main():
    # Start Arduino CLI and get the gRPC server address
    process, target = arduino_cli()
    print("target      :" + target)

やっとmain関数まできました。最初にデーモンモードで起動をして、実行したarduino-cliの情報を表示しています。

    # Connect to the gRPC server using the obtained address
    with grpc.insecure_channel(
        target, options=[("grpc.max_receive_message_length", MAX_MESSAGE_LENGTH)]
    ) as channel:
        stub = commands_pb2_grpc.ArduinoCoreServiceStub(channel)

デーモンの情報を利用してgRPCで接続を行います。ここで最大受信メッセージサイズを指定しています。stubがレスポンスが返ってくるクラスとなります。

        # Request the version of Arduino CLI
        request = commands_pb2.VersionRequest()
        response = stub.Version(request)
        print("version     :" + response.version)

最初にarduino-cliのバージョン番号を取得してみます。commands.protoで宣言されていますのでcommands_pb2のVersionRequest()を呼び出します。受信はstubのVersion(request)になります。

  // Get the version of Arduino CLI in use.
  rpc Version(VersionRequest) returns (VersionResponse) {}

ちなみにprotoファイルだと上記の指定になっています。VersionRequestを投げるとVersionResponseが返ってくる構造です。

message VersionRequest {}

message VersionResponse {
  // The version of Arduino CLI in use.
  string version = 1;
}

VersionRequestとVersionResponseは下の方に定義されており、VersionRequestはパラメーター無しで、VersionResponseは文字列でバージョン情報のみが戻ってきます。

        # Create a new Arduino CLI instance
        request = commands_pb2.CreateRequest()
        response = stub.Create(request)
        id = response.instance.id
        print("instance id :" + str(id))

arduino-cliのgRPCでは呼び出し順番が決まっており、バージョン情報以外はまずはCreateを呼び出す必要があります。この関数を呼び出すとinstance idが取得でき、今後の関数はすべてこのinstance idを引数として投げることとなります。

        # Initialize the created instance
        request = commands_pb2.InitRequest(instance={"id": id})
        for chunk in stub.Init(request):
            print("init        :" + str(chunk))

こちらが非常にハマったポイントです。まずCreateを呼び出したあとにInitで初期化をする必要があります。この初期化を除くと後続はすべて空の結果を返すことになります。エラーを返すの出ればわかりやすいのですが、空だと何が悪いのかの原因特定が難しいです。

  // Initializes an existing Arduino Core instance by loading platforms and
  // libraries.
  rpc Init(InitRequest) returns (stream InitResponse) {}

また、protoファイルをみてみると戻り値がstreamのInitResponseになります。ストリーミングでの戻り値になるので、普通に受信すると初期化が完了していない状態で受信をして、後続の処理がすべて失敗します。ストリーミングが終わるまでforで待つことで初期化待ちをすることとなります。

ちなみにこの初期化ですがWindowsは非常に時間がかかります。コマンド実行でもWindowsは遅いのでこの初期化を毎回内部で実行しているのが原因だと思います。gRPCを利用すると初期化は最初に一度だけ実行して、後続は連続して実行できるようになるのでバッチ処理でもgRPCを利用するメリットはあるかもしれません。

        # List all available boards
        print("board list  :")
        request = board_pb2.BoardListAllRequest(instance={"id": id})
        response = stub.BoardListAll(request)
        for board in response.boards:
            print(board.name.ljust(50) + " " + board.fqbn)

つぎにやっとボード一覧の取得になります。commands.protoではなくboard.protoで定義されている関数のため、呼び出しクラスも異なります。こちらは非常にでかいデータが帰ってきますがストリーミングではなく、通常の呼び出しとなっています。

        # Terminate the arduino-cli daemon process
        process.terminate()

最後にデーモンモードで起動したarduino-cliを終了させます。Windows上のPythonだとスクリプトが終了してもsubprocessで起動したarduino-cliが終了しなかったので、明示的に終了をさせる必要がありました。

if __name__ == "__main__":
    # Entry point of the script
    main()

こちらは単独で実行された場合にはmain()関数を実行するとの定型処理となります。

実行結果

exec        :/mnt/c/tmp/arduino-cli-grpc-test/arduino-cli
target      :127.0.0.1:38723
version     :1.2.2
instance id :1
board list  :
ESP32P4 Dev Module                                 esp32:esp32:esp32p4
ESP32H2 Dev Module                                 esp32:esp32:esp32h2
ESP32C6 Dev Module                                 esp32:esp32:esp32c6
ESP32S3 Dev Module                                 esp32:esp32:esp32s3
ESP32C3 Dev Module                                 esp32:esp32:esp32c3
(以下省略)

上記のような実行結果になりました。

まとめ

やっている処理は単純なのですが、情報がすくなくて苦労しました。Windows環境の場合Initが結構時間かかるので、複数の処理をまとめて実行するとかであればgRPCは便利そうです。

本当はVSCodeとかでgRPC経由でビルドしてくれる拡張とかがあれば便利なのですがあまり活用できている事例が少ないようでした。

コメント