The Negligible Lab

Astray in the forest of electrical and electronic circuits. Adrift in the gap between time and frequency domains. 独立独歩を是とする。

Raspberry Piで地震計を作る ② リアルタイム処理編

はじめに

前回と今回で,Raspberry Piを用いたマイ地震計について書いております。前回は加速度の時系列データから計測震度を計算するPythonプログラムについて述べました。

前回挙げた開発課題を再掲します。

  1. 震度計算プログラムの実装
  2. システムコールsetitimerを活用したタイマ割り込み
  3. マルチプロセッシングとキュー
  4. タイマ割り込みとマルチプロセッシングの組み合わせ
  5. 加速度センサからの加速度取得
  6. OLEDディスプレイへの表示
  7. マイ地震計全体のプログラムの組み立て

今回は,上記2,3,4にあたる処理をPythonにて作って行きます。これは,マイ地震計を作るために必要なリアルタイム処理です。

目次

システムコールsetitimerを活用したタイマ割り込み

setitimerとSIGALRM

さて,加速度の帯域を50 Hzまで確保しようとすると,加速度センサから10 ms毎(100 Hz)にデータを取得する必要があります。気象庁が公開しているCSVファイルも同じく10 ms毎にサンプリングされた加速度を記載しています。

できる限り正確に10 msの周期を刻みながら一定の作業を繰り返すようなプログラムを作ろうと思うと,time.sleep関数などではうまく行かず,いわゆるタイマ割込みを使うのが定石だと思われます。Raspberry PiPythonでタイマ割り込みを作る方法については,POPOさんの下記のブログ記事を参考にさせて頂きました。

rc30-popo.hatenablog.com

仕組みとしては,Linuxシステムコールsetitimerを用いてインターバルタイマを作り,インターバルタイマがカウント終了する度にSIGALRMシグナルをプロセスに送ってもらいます。プロセスの側ではSIGALRMシグナルを受け取ったら,特定の関数が割り込みで実行されるようにシグナルハンドラとして設定しておく,という形になります。

Linuxシステムコールというと「C言語!」というイメージでしたが,Pythonでももちろん取り扱うことができます。筆者は,青木峰郎さんの「ふつうのLinuxプログラミング 第2版」(SBクリエイティブ,2017年)でC言語をベースに勉強させて頂いておりましたが,ここで役に立てることができました。

www.sbcr.jp

プログラム例と誤差の評価

setitimerとSIGALRMを用いたタイマ割込みのプログラム例を作ってみます。Pythonではsignalモジュールをimportすることで,setitimerの呼び出しとシグナルハンドラの設定が可能となります。

import time
import signal

TIMER = 0.01 # [s], interval to call handler

# Signal handler
def handler(signum, frame):
    global t_old
    t = time.time()
    t_diff = t - t_old
    print(t_diff)
    t_old = t

# Main routine
if __name__ == '__main__':

    # Set old timing as current time
    t_old = time.time()  # ---- (*1)

    # Set signal handler for SIGALRM and set interval timer
    signal.signal(signal.SIGALRM, handler)
    signal.setitimer(signal.ITIMER_REAL,  TIMER, TIMER)

    # Do nothing anymore
    while True:
        time.sleep(1)

21行目でsignal.signal(signal.SIGALRM, handler)としてhandler関数をSIGALRMシグナルのハンドラとして設定します。22行目のsignal.setitimer(signal.ITIMER_REAL, TIMER, TIMER)では,Linuxシステムコールsetitimerを呼び出し,実時間(signal.ITIMER_REAL)で10 ms毎にSIGALRMシグナルが発生するように設定します。以降,メインルーチンは何もしないので,while True:の無限ループに入ります。

シグナルハンドラとなったhandler関数では,現在のUNIX時刻をtとして取得し,かつ,以前に呼ばれた際のUNIX時刻t_oldとの差t_diffを求めて表示します。

もし,タイマ割込みの周期が正確に10 msであれば,ターミナルには0.0100000という数値がひたすら並ぶことでしょう。実際には誤差がありますので,表示される数値はバラつくはずです。Raspberry Pi 3 Model A+で実際にやってみると,下記のような数値が羅列されます。

0.010210275650024414
0.009965181350708008
0.00999903678894043
0.010001659393310547
0.009999752044677734
0.009999752044677734
0.009999990463256836
0.009999990463256836
0.009999752044677734
0.010000228881835938
0.009999990463256836
0.009999990463256836
0.009999752044677734
0.010000228881835938
(以下略)

図8にhandler関数1,000回分の試行結果をヒストグラムとして描いてみます。ただし,1行目は18行目(*1)で設定したt_oldを使った計算をしているので無視しています。

f:id:s-inoue2010:20211116164241p:plain
図8: PythonからsetitimerとSIGALRMを用いたタイマ割込み間隔のヒストグラム(Raspberry Pi 3 Model A+)

バラツキが小さいので縦軸を対数としました。9.97 ms ~ 10.03 msの間に99.5%が集中しており,極めて正確に10 ms毎の定期実行を実現できていることが判ります。

マルチプロセッシングとキュー

マルチプロセッシングの動機

前述のタイマ割込みにて10 ms毎に加速度をサンプリングしますが,加速度データが300点集まった時点で(つまり,3秒毎に)短時間計測震度を計算します。計測震度の計算とその後の標準出力やOLEDディスプレイへの情報表示にミリ秒以上の時間を要することは確実で,場合によっては10 msを超えるかもしれません。すると,次の加速度のサンプリングとタイミングが重なってしまうかもしれません。

そのような状況下で,10 msのタイマ割込みを可能な限り正確に継続しようとすると,加速度のサンプリングと計測震度計算を別のプロセスとすることが妥当かと思われます*1

Pythonにも他のプログラミング言語と同様に,マルチプロセッシングを実現する仕組みがあります。ここでは,(筆者の勉強のため)マルチプロセッシングのプログラム例を作ってみます。

子プロセスを作るプログラム例

筆者の頭脳はとにかく貧弱なので,簡単すぎる最小構成例でなければ理解できません。そこで,子プロセスを作って実行するだけのプログラム例を示します。マルチプロセッシングの機能を使うには,multiprocessingモジュールをimportします。

import multiprocessing as mp
import os
import time

# Function run as child process
def proc(message):

    # Show process ID
    pid = os.getpid()
    print(f'This is child process, PID: {pid}')

    # Show message passed from parent process as argument
    print(f'Message from parent process: {message}')

    # Wait for 2 seconds
    time.sleep(2)

# Main routine run as parent process
if __name__ == '__main__':

    # Show process ID 
    pid = os.getpid()
    print(f'This is parent process, PID: {pid}')

    # Create and start child process
    p = mp.Process(target = proc, args = ('Hello world.',))
    p.start()

    # Wait for child process to end
    p.join()
    print('Child process has ended.')

    # End program, parent process
    print('Parent process ends.')

6行目以降で子プロセスとして呼ばれるproc関数を定義します。

19行目以降,メインルーチンは親プロセスとなり,プログラムはここから始まります。22,23行目で親プロセスのプロセスIDを表示します。26行目でp = mp.Process(target = proc, args = ('Hello world.', ))として,子プロセスとなるproc関数を設定します。親プロセスからはpという変数を通じて子プロセスを操作します(プロセスもオブジェクトして扱われるので,pは子プロセスのインスタンスです)。また,argsにてタプルとして引数を渡すことができ,ここでは文字列'Hello world.'を渡しています。次の行でpにstartメソッドを適用して子プロセスを開始します。

子プロセスは9,10行目にて自分のプロセスIDを表示します。親プロセスのプロセスIDとは異なっているはずです。また,引数として受け取ったmessageとして文字列'Hello world.'を表示します。最後に16行目で子プロセスは2秒間待機します。

親プロセスでは30行目のp.join()にて子プロセスが終了するのを待ち,その後,プログラム全体が終了します。

このプログラムを実行すると,下記のように出力されます。

This is parent process, PID: 27625
This is child process, PID: 27626
Message from parent process: Hello world.
Child process has ended.
Parent process ends.

親プロセスと子プロセスでは想定通りプロセスIDが異なっていますね。また,子プロセス(proc関数)では親プロセスから受け取った'Hello world.'を表示することができています。

キューでデータを受け渡すプログラム例

マルチプロセッシングで子プロセスを作ると,子プロセスのメモリ空間は親プロセスとは独立となります*2。したがって,親プロセスで作っておいたグローバル変数を子プロセス側で書き換えても,もはや別プロセスとなっている親プロセス側には影響しません。これは,プロセス間で意図しないメモリの破壊を起こさないという点でメリットでもありますが,プロセス間の通信には特別な仕組みが必要と言うことになります。その仕組みの1つがキュー(queue)です。

キューを使って子プロセス側で生成したオブジェクトを親プロセスに渡すプログラム例を示します。

import multiprocessing as mp
import os
import time

# Function run as child process
def proc(q, message):  # ----(*1)

    # Show process ID
    pid = os.getpid()
    print(f'This is child process, PID: {pid}')

    # Show message passed from parent process as argument
    print(f'Message from parent process: {message}')

    # Wait for 2 seconds
    time.sleep(2)

    # Create an object in child process and put it into queue ----(*2)
    i = 100
    print(f'i in child process to be put it into queue: {i}')
    q.put(i)

# Main routine run as parent process
if __name__ == '__main__':

    # Show process ID
    pid = os.getpid()
    print(f'This is parent process, PID: {pid}')

    # Define queue
    q = mp.Queue()  # ----(*3)

    # Create and start child process
    p = mp.Process(target = proc, args = (q, 'Hello world.',))  # ----(*4)
    p.start()

    # Get object from queue
    j = q.get()  # ----(*5)
    print(f'j in parent process, got out of queue: {j}')

    # Wait for child process to end
    p.join()
    print('Child process has ended.')

    # End program, parent process
    print('Parent process ends.')

前節のプログラムから(*1) ~ (*5)の5ヶ所が追加・変更されています。まず,親プロセス側の31行目(*3)にてキューのインスタンスqを作成します。33行目(*4)にて子プロセスにproc関数を設定するのですが,引数にqを追加しています。また,6行目(*1)のようにqをproc関数の仮引数に追加しています。

子プロセス側では,18行目以降(*2)にて変数iに整数100を代入し,これをq.put(i)としてキューに置いています。親プロセス側では37行目(*5)にてj = q.get()として,変数jにキューに置かれた整数100を受け取っています。

このプログラムを実行すると,下記のように出力されます。

This is parent process, PID: 28238
This is child process, PID: 28239
Message from parent process: Hello world.
i in child process to be put it into queue: 100
j in parent process, got out of queue: 100
Child process has ended.
Parent process ends.

子プロセスで生成されたi = 100という値が親プロセスにj = 100としてプロセス間通信されました。このi = 100の代わりに加速度センサから取得したデータを入れてあげれば,子プロセスから親プロセスに加速度データを渡すことができそうです。キューはFIFOバッファとなっているので,親プロセスでq.get()されるまで溜まり続け,古いものからq.get()で取り出されます。

タイマ割込みとマルチプロセッシングの組み合わせ

どのように組み合わせるか

これまでに述べたタイマ割込みとマルチプロセッシングを組み合わせることで,タイマ割込みによる処理を子プロセスとして独立させます。これにより,子プロセスで加速度データを取得しつつ,メインルーチン(親プロセス)で計測震度を計算するような並列処理を作れます。

割込みルーチン,つまり,SIGALRMシグナルのハンドラとなるのはdef文で定義される1つの関数です。また,子プロセスを作る際にtargetとして指定するのもdef文で定義される1つの関数です。今回の各プログラム例では,それぞれhandler関数とproc関数と名付けています。この2つの関数をどう組み合わせるかで少し悩みました。

いろいろ試行錯誤しましたが,結論としてはhandler関数をproc関数の関数内関数として定義すると,handler関数内からproc関数内の変数にも,もちろんグローバル変数*3にもアクセスできるため,使い易いのではないかと筆者は考えました*4。ある意味,子プロセスの中ではproc関数が言わば「メインルーチン」となっていると見なせば,自然かと思われます。

プログラム例

具体的に,プログラム例を作ってみます。なお,これまでのプログラム例にあったプロセスIDを表示する箇所や,文字列'Hello world.'を親プロセスから子プロセスに渡す部分を省いています。

import multiprocessing as mp
import time
import signal

# Function run as child process
def proc(q):

    # Define signal handler as nested function
    def handler(signum, frame):  # ----(*1)
        nonlocal i  # ----(*2)
        print(f'i in signal handler, in child process: {i}')
        q.put(i)
        i += 1

    # Set signal handler for SIGALRM and set interval timer ----(*3)
    i = 100
    signal.signal(signal.SIGALRM, handler)
    signal.setitimer(signal.ITIMER_REAL, 0.5, 0.5)  # Every 1 second

    # Do nothing anymore
    while True:
        pass

# Main routine run as parent process
if __name__ == '__main__':

    # Define queue
    q = mp.Queue()

    # Create and start child process
    p = mp.Process(target = proc, args = (q,))
    p.start()

    # Get object from queue, 10 times
    for _ in range(10):  # ----(*4)
        j = q.get()
        print(f'j in parent process, get out of queue: {j}')

    # Terminate child process
    p.terminate()  # ----(*5)
    print('Terminated child process.')

    # End program, parent process
    print('Parent process ends.')

この例では,子プロセスの中の15行目以降(*3)で,変数iを100として定義し,シグナルハンドラとsetitimerを設定しています。

9行目(*1)のdef文でシグナルハンドラとなるhandler関数を定義していますが,これはproc関数の関数内関数であり,言い換えればdef文がネストされています。このようにすると,10行目(*2)のnonlocal文でproc関数内の変数iを書き換えられるようになります。

もし,handler関数がproc関数の中に入っておらず,__main__内に同格の関数としてdef文で定義されいてた場合,handler関数内からproc関数の中で作った変数を参照したり変更したりすることはできません。というのも,シグナルハンドラとなる関数にはsigumとframeの2つの引数しか持たせることができないため,proc関数の中で作った変数をhandler関数に渡すことができないためです*5

メインルーチン(親プロセス)ではキューを介してproc関数内のhandler関数からオブジェクトを受け取ります。ここでは子プロセス内のタイマ割込みによってhandler関数内で100からカウントアップしていく整数です。35行目(*4)のfor文のようにキューから変数jとしてオブジェクトを受け取ることを10回繰り返し,その後,40行目(*5)のように,子プロセスを停止(terminate)します*6。ここでは10回としましたが,マイ地震計としては,加速度または計測震度が小さくなって地震が終わったと見なした際に,p.terminate()をすれば,加速度の定周期サンプリングを終了できます。

このプログラムを実行すると,下記のように出力されます。

i in signal handler, in child process: 100
j in parent process, get out of queue: 100
i in signal handler, in child process: 101
j in parent process, get out of queue: 101
i in signal handler, in child process: 102
j in parent process, get out of queue: 102
i in signal handler, in child process: 103
j in parent process, get out of queue: 103
i in signal handler, in child process: 104
j in parent process, get out of queue: 104
i in signal handler, in child process: 105
j in parent process, get out of queue: 105
i in signal handler, in child process: 106
j in parent process, get out of queue: 106
i in signal handler, in child process: 107
j in parent process, get out of queue: 107
i in signal handler, in child process: 108
j in parent process, get out of queue: 108
i in signal handler, in child process: 109
j in parent process, get out of queue: 109
Terminated child process.
Parent process ends.

整数が100から109まで10回カウントアップされ,子プロセスから親プロセスにキューを介して渡され(プロセス間通信され)ていることが判ります。

まとめ

以上でタイマ割込みと,マルチプロセッシング,その組み合わせをPythonで実現方法について,プログラム例を作りながら述べました。これによって,マイ地震計に必要となるリアルタイム処理を実現できるようになります。特に,最後に示したプログラム例は,マイ地震計全体としてのプログラムの骨格となります。

このようなリアルタイム性を求められる処理をプログラミングしようとすると,やはり,OSと様々なやり取りをしなければならず,プログラミング言語の文法について知っているだけでなく,OSとのインターフェースについて知識が必要であると今回は痛感しました*7Linuxカーネルの作りについて,概略でも少しずつ勉強したいという思いです(ご安心ください。積読はしてありますよ!)。

*1:PythonにはGlobal Interpreter Lock, GILという仕組みがあり,マルチスレッディングでは,GILを取得している1つのスレッドしか同時に実行できないようです。

*2:マルチスレッディングでは同じプロセス,同じメモリ空間に留まります。

*3:ただし,前述の通り,書き換えても親プロセスには反映されません。

*4:とは言っても筆者はプログラミングのド素人なので,玄人の皆さまからの忌憚なきご意見をお待ちしております。

*5:スタックフレームオブジェクトframeをうまく活用すればできるのかもしれません。

*6:この方法は適切ではないかもしれませんが,筆者は他の方法を知りません…。

*7:なお,今回のプログラム例はWindows上のPythonでは動きません。シグナルやマルチプロセッシングの作りがLinux(をはじめとするUNIX系OS)と異なるためです。