Pytest で mocker
や freezer
使うサンプルを紹介します.
はじめに
ここ最近 Pytest でテストコードをずっと書いていたのですが,自分の用途で参考になるようなコードがなかなか無かったので,自分なりにまとめてみました.
コード全体は下記に置いてあります.
関数の返値の置き換え
テスト対象
テスト対象として使用するのは,以下の a.py
と b.py
.値を返すだけのシンプル関数です.
1 2 |
def a_func(): return "called a" |
1 2 3 4 5 6 7 8 9 |
import a from a import a_func def b_func(): return a.a_func() def b_func2(): return a_func() |
固定値で置き換え
常に特定の値を返すように関数を置き換えます.
1 2 3 4 5 6 7 8 9 10 11 12 13 |
def test_mock_return_value_1(mocker): import a # 置き換え前 assert a.a_func() == "called a_func" # 「TEST」を返すように置き換え mocker.patch("a.a_func", return_value="T") # 置き換え後 assert a.a_func() == "T" assert a.a_func() == "T" assert a.a_func() == "T" |
呼び出しの度に変化させる
呼び出しの度に異なる値を返すように関数を置き換えます.side_effect
で指定した配列サイズ以上の回数呼び出すとエラーになってしまいますので,使いどころは限られるかもしれません.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 |
def test_mock_return_value_2(mocker): import a # 置き換え前 assert a.a_func() == "called a_func" # 「T1」~「T3」を順次返すように置き換え mocker.patch("a.a_func", side_effect=["T0", "T1", "T2"]) # 置き換え後 assert a.a_func() == "T0" assert a.a_func() == "T1" assert a.a_func() == "T2" # もう1回追加で呼び出すとエラーになる |
呼び出し回数で変化させる
前の例は,関数の呼び出し回数が有限でしたが,side_effect
の引数を関数にすると,柔軟に返値を変えられるようになります.関数は引数を受け取れるので,引数に応じて値を変えることも簡単にできます.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 |
def test_mock_return_value_3(mocker): import a # 置き換え前 assert a.a_func() == "called a_func" # 「T1」~「T3」を順次返すように置き換え def a_mock(): a_mock.i += 1 return "T{i}".format(i=a_mock.i % 3) a_mock.i = 2 mocker.patch("a.a_func", side_effect=a_mock) # 置き換え後 assert a.a_func() == "T0" assert a.a_func() == "T1" assert a.a_func() == "T2" assert a.a_func() == "T0" |
特定の関数から呼び出されたときのみ変化させる
テストのカバレッジを確保する際に結構役立つのがこちらの方法.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 |
def test_mock_return_value_4(mocker): import a import b # 置き換え前 assert a.a_func() == "called a_func" assert b.b_func() == "called a_func" import inspect from a import a_func # オリジナルを import def a_mock(): caller = inspect.stack()[4].function if caller == "b_func": return "T0" else: return a_func() mocker.patch("a.a_func", side_effect=a_mock) # 置き換え後 assert a.a_func() == "called a_func" assert b.b_func() == "T0" |
例外を投げる
値を返すのではなく,例外を投げる場合は side_effect
に例外を指定します.
1 2 3 4 5 6 7 8 9 10 |
def test_mock_return_value_5(mocker): import a # 置き換え前 assert a.a_func() == "called a_func" mocker.patch("a.a_func", side_effect=RuntimeError) with pytest.raises(RuntimeError): a.a_func() |
注意点
from x import y
という形で import
された関数を置き換える場合は注意が必要です.以下のように, import
した後の関数を指定して置き換える必要があります.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 |
def test_mock_return_value_6(mocker): import a import b # 置き換え前 assert a.a_func() == "called a_func" assert b.b_func() == "called a_func" assert b.b_func2() == "called a_func" mocker.patch("a.a_func", return_value="T") # 置き換え後 assert a.a_func() == "T" assert b.b_func() == "T" assert b.b_func2() == "called a_func" # 置き換わらない mocker.patch("b.a_func", return_value="T2") assert a.a_func() == "T" assert b.b_func() == "T" assert b.b_func2() == "T2" |
インスタンスの置き換え
テスト対象
テスト対象として使用するのは以下の c.py
.プロパティとメソッドを各1個持つシンプルなクラスです.
1 2 3 4 5 6 7 8 9 10 |
class C: def __init__(self): self.prop_ = "prop C" @property def prop(self): return self.prop_ def c_func(self): return "called C.c_func" |
メソッドとプロパティの置き換え
メソッドとプロパティでやり方が違うので注意.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 |
def test_mock_object_1(mocker): import c c_obj = c.C() assert c_obj.c_func() == "called C.c_func" assert c_obj.prop == "prop C" mocker.patch.object(c.C, "c_func", return_value="T") mocker.patch.object(c.C, "prop", new_callable=mocker.PropertyMock, return_value="prop T") c_obj = c.C() assert c_obj.c_func() == "T" assert c_obj.prop == "prop T" |
時刻の置き換え
時刻を操ってテストする方法を紹介します.
テスト対象
テスト対象として使用するのは,schedule
モジュールを使って Cron のように指定時刻にジョブを実行するコード.
scheduler
で 1分後にジョブを実行するようにスケジューリングし,1分後に実行されたことをチェックしています.
テストの実行に1分かかってしまっているので,これを FreezeGun を使って時刻を操作することで短縮します.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 |
scheduler_terminate = False def schedule_worker(): import schedule import time global scheduler_terminate while True: if scheduler_terminate: break schedule.run_pending() time.sleep(0.1) # NOTE: 時間の操作 - scheduler モジュール def test_schedule_1(): import schedule import datetime import time import threading import pytz global scheduler_terminate scheduler_terminate = False thread = threading.Thread(target=schedule_worker) thread.start() job_hist = [] def job(): job_hist.append(".") schedule.clear() tz = datetime.timezone(datetime.timedelta(hours=+9), "JST") time_str = ((datetime.datetime.now(tz) + datetime.timedelta(minutes=+1))).strftime("%H:%M") schedule.every().day.at(time_str, pytz.timezone("Asia/Tokyo")).do(job) assert schedule.idle_seconds() <= 60 time.sleep(60) assert job_hist == ["."] scheduler_terminate = True thread.join() |
時刻の操作
時刻を 12:00 にセットしたところで,スケジューリングを行い,その直後に時刻を 12:01 に進めてジョブが実行されたことを確認しています.
time.sleep(0.2)
は schedule
モジュールや schedule_worker
スレッドが処理を進めるのを待つためのウェイトです.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 |
def test_schedule_2(freezer): import schedule import datetime import time import threading global scheduler_terminate scheduler_terminate = False thread = threading.Thread(target=schedule_worker) thread.start() job_hist = [] def job(): job_hist.append(".") schedule.clear() freezer.move_to(datetime.datetime.now().replace(hour=12, minute=0, second=0)) schedule.every().day.at("12:01").do(job) assert schedule.idle_seconds() <= 60 time.sleep(0.2) freezer.move_to(datetime.datetime.now().replace(hour=12, minute=1, second=0)) time.sleep(0.2) assert job_hist == ["."] scheduler_terminate = True thread.join() |
注意点
multiprocessing.Queue
を使っているコードで FreezeGun を使うと,Queue 操作の際に以下のようなエラーが発生することがあります.
1 2 3 4 5 6 7 8 9 10 11 12 |
if not queue.empty(): File "/usr/lib/python3.10/multiprocessing/queues.py", line 129, in empty return not self._poll() File "/usr/lib/python3.10/multiprocessing/connection.py", line 257, in poll return self._poll(timeout) File "/usr/lib/python3.10/multiprocessing/connection.py", line 424, in _poll r = wait([self], timeout) File "/usr/lib/python3.10/multiprocessing/connection.py", line 931, in wait ready = selector.select(timeout) File "/usr/lib/python3.10/selectors.py", line 416, in select fd_event_list = self._selector.poll(timeout) OverflowError: timeout is too large |
そのため,Queue 操作の際には OverflowError
をキャッチするようにしておく必要があります.
応用編
特定のユースケースでの記述例について紹介します.
open の置き換え
特定のファイルの時のみ,MagicMock を返すように組み込み関数の open を置き換えます.
このサンプルではやっていませんが,MagickMock に対して追加設定を行うことで任意のデータを読み出せる架空のファイルとかが作れるようになります.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 |
def test_mock_open_1(mocker): import builtins import string import random # このファイル名が指定されたときに置き換えることにする target_file = "".join(random.sample(string.ascii_lowercase, 10)) open_orig = builtins.open # オリジナルの関数を保存する file_mock = mocker.MagicMock() file_mock.read.return_value = "T" def open_mock( file, mode="r", buffering=-1, encoding=None, errors=None, newline=None, closefd=True, opener=None, ): # ファイル名が一致する場合のみ置換 if file == target_file: return file_mock else: return open_orig(file, mode, buffering, encoding, errors, newline, closefd, opener) mocker.patch("builtins.open", side_effect=open_mock) # 存在しないファイルを読み書き f = open(target_file) f.write("T") assert f.read() == "T" # ファイル名がターゲットと異なる場合はちゃんとエラーが出る with pytest.raises(FileNotFoundError): f = open(target_file + "_") |
open の置き換え (context manager 使用)
with open():
に対応する場合は以下のようにします.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 |
def test_mock_open_2(mocker): import builtins import string import random # このファイル名が指定されたときに置き換えることにする target_file = "".join(random.sample(string.ascii_lowercase, 10)) file_mock = mocker.MagicMock() file_mock.__enter__.return_value.read.return_value = "T" open_orig = builtins.open def open_mock( file, mode="r", buffering=-1, encoding=None, errors=None, newline=None, closefd=True, opener=None, ): if file == target_file: return file_mock else: return open_orig(file, mode, buffering, encoding, errors, newline, closefd, opener) mocker.patch("builtins.open", side_effect=open_mock) # NOTE: 存在しないファイルを読み書き with open(target_file) as f: f.write("T") assert f.read() == "T" |
requests.get の置き換え
requests には,レスポンスデータのモデルを生成できる requests.models.Response()
という関数があるので,これを利用するのがオススメです.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 |
def test_mock_request(mocker): import requests res = requests.models.Response() res._content_consumed = True res._content = b'{"T": true}' res.encoding = "utf-8" res.status_code = 200 mocker.patch("requests.get", return_value=res) assert requests.get("DUMMY").status_code == 200 assert requests.get("DUMMY").text == '{"T": true}' assert requests.get("DUMMY").json() == {"T": True} |
環境変数の設定
autouse=True
の fixture
を使うことで,全てのテストの前に処理を走らせることができます.これを利用すると,全てのテストの実行時に環境変数をセットすることができます.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 |
@pytest.fixture(scope="session", autouse=True) def env_mock(): with mock.patch.dict( "os.environ", { "TEST": "true", }, ) as fixture: yield fixture # NOTE: 環境変数のセット def test_env(mocker): import os assert os.environ.get("TEST", "NONE") == "true" |
Slack 通知の無効化
先ほどと同じ仕組みを利用し,次のようにするとテスト実行時における Slack 通知を無効化できます.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 |
@pytest.fixture(scope="session", autouse=True) def slack_mock(): with mock.patch( "slack_sdk.web.client.WebClient.chat_postMessage", retunr_value=True, ) as fixture: yield fixture def test_mock_slack(mocker): import slack_sdk client = slack_sdk.WebClient(token="DUMMY") client.chat_postMessage(channel="DUMMY", text="DUMMY") |
コメント