InfluxDB をバージョンアップする際の Python API の以降について紹介します.
はじめに
時系列データベースとしてよく使われる InfluxDB はバージョン 2.0 でデータベースの構造もアクセス方法も大きく変更になっています.今のところ日本語での情報があまりみあたらないので,簡単に移行のポイントについて紹介したいと思います.
変更概要
変更があった項目は,大きく次の3点です.
- データベースアクセスに必要なパラメータ
- クエリ言語
- Python API
以降で,順に説明します.
データベースアクセスに必要なパラメータ
InfluxDB 1.x でデータベースアクセスに最低限必要だったのは,次の2項目でした.
- サーバアドレス
- データベース名
認証を付けている場合,ユーザ名やパスワードも必要だったかもしれません.
これに対して,InfluxDB 2.x では次の4項目が必要になります.
- URL
- アクセストークン
- バケット (Bucket)
- 組織 (Organization)
大きな変化点としては,アクセストークンが必須になっていることです.アクセストークンは,InfluxDB にブラウザでアクセスして「↑」マークのボタン,「API Tokens」をクリックすると生成できます.
「バケット」は,InfluxDB 1.x のデータベース名に相当するものです.「組織」は InfluxDB を初期化したときに指定したものを使います.
クエリ言語
InfluxDB 1.x ではデータの取得に SQL に似た InfluxQL を使っていたと思いますが,InfluxDB 2.x では Flux という言語に置き換わっています.
例えば,period
期間中の,tag
というタグがついている,ホスト名が hostname
の labe
のデータを取得する場合,InfluxDB 1.x では次のようなクエリを使っていたと思います.
1 |
SELECT mean("{label}") FROM "{tag}" WHERE ("hostname" = \'{hostname}\') AND time >= now() - {period} GROUP BY time(3m) |
これに対し,InfluxDB 1.x で次のようにします.クエリの中で,対象のデータベースである bucket
も指定します.
1 2 3 4 5 6 |
from(bucket: "{bucket}") |> range(start: -{period}) |> filter(fn:(r) => r._measurement == "{tag}") |> filter(fn: (r) => r.hostname == "{hostname}") |> filter(fn: (r) => r["_field"] == "{label}") |> aggregateWindow(every: 3m, fn: mean, createEmpty: false) |
最初は見慣れず戸惑うかもしれませんが,ストリームに対して加工していく感じで自然に記述できるので,慣れてくると下記のドキュメントをみながらサクサク書けるようになると思います.
Python API
InfluxDB が 2.x になると Python でアクセスするときに使用するライブラリも変更が必要になります.
InfluxDB 1.x のときは InfluxDBClient を使っていたと思いますが,InflixDB 2.x からは influxdb_client を使うことになります.
Ubuntu 22.04 の場合,influxdb_client は Apt にはないので,下記のようにして pip ~インストールします.
1 |
% pip3 install 'influxdb-client[ciso]' |
ぞれぞれのライブラリで同じようにデータを取得するコードを紹介します.
InfluxDBClient を使って InfluxDB 1.x にアクセスする場合はこんな感じ.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 |
from influxdb import InfluxDBClient import datetime import dateutil.parser INFLUXDB_QUERY = """ SELECT mean("{label}") FROM "{tag}" WHERE ("hostname" = \'{hostname}\') AND time >= now() - {period} GROUP BY time(3m) """ def fetch_data(config, tag, hostname, label, period='30h'): client = InfluxDBClient( host=config['ADDR'], port=config['PORT'], database=config['DB'] ) result = client.query(INFLUXDB_QUERY.format( tag=tag, hostname=hostname, label=label, period=period) ) data = list(map(lambda x: x['mean'], result.get_points())) localtime_offset = datetime.timedelta(hours=9) time = list(map(lambda x: dateutil.parser.parse(x['time'])+localtime_offset, result.get_points())) return { 'value': data, 'time': time, 'valid': len(time) != 0 } |
対して,influxdb_client を使って InfluxDB 2.x にアクセスする場合は次のようになります.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 |
import influxdb_client import datetime FLUX_QUERY = """ from(bucket: "{bucket}") |> range(start: -{period}) |> filter(fn:(r) => r._measurement == "{tag}") |> filter(fn: (r) => r.hostname == "{hostname}") |> filter(fn: (r) => r["_field"] == "{label}") |> aggregateWindow(every: 3m, fn: mean, createEmpty: false) """ def fetch_data(config, tag, hostname, label, period="30h"): query = FLUX_QUERY.format( bucket=config["BUCKET"], tag=tag, hostname=hostname, label=param, period=period, ) client = influxdb_client.InfluxDBClient( url=config["URL"], token=config["TOKEN"], org=config["ORG"] ) query_api = client.query_api() table_list = query_api.query(query=query) data = [] time = [] localtime_offset = datetime.timedelta(hours=9) if len(table_list) != 0: for record in table_list[0].records: data.append(record.get_value()) time.append(record.get_time() + localtime_offset) return {"value": data, "time": time, "valid": len(time) != 0} |
コメント