FastAPIでWebサービスを保護してみる
FastAPIで作成するWebサービスに認証機能を追加してみましょう。
FastAPIは、すべてのセキュリティ仕様について学習する必要がなく、標準的な方法でセキュリティ処理を実装するためのついくつかのツールが提供されています。
FastAPIはOpenAPIに基づいていて、複数の自動インタラクティブドキュメントインターフェイス、コード生成などが可能になっています。
他にも次のような機能拡張が公開されています。
FastAPI-Login
単純なログイン認証機能であればこれで十分かもしれません。
FastAPI-Users
FastAPI-Usersは、FastAPIにJWT認証、Cookie認証の機能を提供します。Python3.7以降が必要。Pythonのバージョンに制限がなければこれをオススメします。 拡張可能なユーザモデル
ユーザ管理のCRUD操作APIを提供
サポートしているORM:SQLAlchemy, MongoDB, Tortorise ORM
FastApi-Admin
FastAPI-Security
FastAPI-Contrib
FastAPI-Contribは、再利用可能な便利な多数のライブラリを提供します。その中に認証や権限設定(アクセスコントロール)の機能が含まれます。サポートしているデータベースがMongoDBだけなのが欠点。 FastAPI-Login による実装
まずは、FastAPI-Login を使ったパスワード認証による 実装してみましょう。
インストール
code: bash
$ pip install fastapi-login
準備
アプリケーションのディレクトリを作成します。
code: bash
$ mkdir -p $HOME/fastapi/authdemo1
$ cd $HOME/fastapi/autodemo1
はじめにアクセスすると {"Hello": "World!"} を返すAPIを準備しましょう。
code: python
from fastapi import FastAPI
app = FastAPI()
@app.get('/')
def index():
return {"Hello": "World!"}
このビュー関数 index() を保護するようにしてみます。
JWT(JSON Web Token) で使用するシークレットキーを準備します。
code: bash
$ python -c 'import os; print(os.urandom(24).hex())'
8c135d24ed30d57f770967295653cc48adf3003ceedc95be
FastAPI-Login の初期化を行います。
code: python
from fastapi import FastAPI
from fastapi_login import LoginManager
SECRET = '8c135d24ed30d57f770967295653cc48adf3003ceedc95be'
app = FastAPI()
manager = LoginManager(SECRET, tokenUrl='/auth/token')
# ...
次に、LoginManagerにユーザーをロードする方法を提供する必要があります。 コールバック関数 user_loaderは、ユーザーオブジェクトまたはNoneを返す必要があります。
データベースからユーザ情報を取得してすることになる場合が多いのですが、ここでは単純にするため辞書型オブジェクトにユーザ情報を登録することにします。
code: python
fake_db = {'freddie@example.com': {'password': 'queen'}}
@manager.user_loader
def load_user(email: str):
user = fake_db.get(email)
@app.post('/auth/token')
def login(data: OAuth2PasswordRequestForm = Depends()):
email = data.username
password = data.password
user = load_user(email)
if not user:
raise InvalidCredentialsException
raise InvalidCredentialsException
access_token = manager.create_access_token(
data=dict(sub=email)
)
return {'access_token': access_token, 'token_type': 'bearer'}
これで、保護が必要なAPIではLoginManager()のインスタンスオブジェクトを依存性呼び出し(Depends)で与えます。
code: python
@app.get('/')
def index(user=Depends(manager)):
return {"Hello": "World!"}
認証せずにアクセスすると認証エラーが返されます。
code: bash
{
"detail": "Not authenticated"
}
code: bash
{
"access_token": "eyJ0eXAiOiJKV1QiLCJhbGciOiJIUzI1NiJ9.eyJzdWIiOiJmcmVkZGllQGV4YW1wbGUuY29tIiwiZXhwIjoxNTkwODA1MDA4fQ.H-3m01oVAGhD6fdQcLKRto7wdoZEqOpo1ZsolxQbprA",
"token_type": "bearer"
}
code: bash
{
"access_token": "eyJ0eXAiOiJKV1QiLCJhbGciOiJIUzI1NiJ9.eyJzdWIiOiJmcmVkZGllQGV4YW1wbGUuY29tIiwiZXhwIjoxNTkwODA1MDc2fQ.tSBbsoxt64WhmPqMis-Xng2RnmbqEYs2Ky0YoFamYRo",
"token_type": "bearer"
}
ここで取得したアクセストークンを ヘッダ Authorization: Bearer <ACESS_TOKEN> としてGETメソッドに与えると認証保護されたAPIにアクセスできるようになります。
code: bash
$ curl -s -X GET -H 'Authorization: Bearer eyJ0eXAiOiJKV1QiLCJhbGciOiJIUzI1NiJ9.eyJzdWIiOiJmcmVkZGllQGV4YW1wbGUuY29tIiwiZXhwIjoxNTkwODA1MDc2fQ.tSBbsoxt64WhmPqMis-Xng2RnmbqEYs2Ky0YoFamYRo' http://127.0.0.1:8080/ | python -m json.tool {
"Hello": "World!"
}
FastAPI-Login は Flast-Login と同じような設定方法で認証保護をAPIに追加することができることを確認できました。
FastAPI-Users による実装
FastAPI-Login は認証によるアクセス権限があるかどうかの処理は簡単になりますが、ユーザ情報の取得する処理は独自に作成する必要が’あります。これは、自由度があるので決して欠点ではありませんが、データベースにアクセスするような場合は開発工数が増えてしまいます。
そこで、FastAPI-Users を使ってみましょう。
FastAPI-Users には次のような機能があります。
拡張可能な基本的なユーザーモデル
すぐに使用できる登録、ログイン、パスワード忘れ、パスワードリセット
すぐに使えるOAuth2フロー
ユーザーをルーターに挿入するための依存性呼び出し可能オブジェクト
カスタマイズ可能なデータベースバックエンド
サポートしているORM:SQLAlchemy、mongodb、Tortoise
複数のカスタマイズ可能な認証バックエンド
JWT認証バックエンドが含まれています
Cookie認証バックエンド含まれています
複数の認証バックエンドがあっても、OpenAPIスキーマを完全にサポート
FastAPI-Users は Python3.7 以降が必要になります。
インストール
既にCONDA環境になっているのであれば抜けておきましょう。
code: bash
$ conda deactivate
conda環境を作ります。このとき python と curl をインストールしておきます。
code: bash
$ conda create -y -n fastapi_auth python=3.7 curl
$ conda activate fastapi_auth
FastAPIと関連パッケージをインストールしておきましょう。
code: bash
$ pip install fastapi uvicorn fastapi-users databases httpx_oauth aiosqlite
準備
アプリケーションのディレクトリを作成します。
code: bash
$ mkdir -p $HOME/fastapi/authdemo2
$ cd $HOME/fastapi/autodemo2
はじめにアクセスすると {"Hello": "World!"} を返すAPIを準備しましょう。
code: python
from fastapi import FastAPI
app = FastAPI()
@app.get('/')
def index():
return {"Hello": "World!"}
このビュー関数 index() を保護するようにしてみます。
データベースの準備
FastAPI-Users がサポートするORMは次のものです。
SQLAlchemy (databasesパッケージを利用)
MongoDB
Tortoise ORM
ここでは、SQLAlchemy を使ってユーザ情報を登録するようにしましょう。
ユーザモデル
FastAPI-Users は、認証のための最小限のユーザーモデルを提供します。
table: FastAPI-Usersのユーザモデルの構成フィールド
フィールド名 タイプ 説明
id UUID4 ユーザーの一意の識別子。デフォルトのタイプはUUID4です。
email STR ユーザーのメール。電子メールバリデーターによって検証されます。
is_active Bool ユーザー情報が有効かどうか。デフォルトはTrue
無効な場合ログインおよびパスワードを忘れの要求は拒否されます。
is_superuser Bool ユーザーが管理者権限を持つかどうか。デフォルトはFalseです。
管理ロジックの実装に役立ちます。
検証モデル
Pydantic モデルが4つ提供されます。
table: FastAPI-Usersの検証モデル
モデルクラス 説明
BaseUser 基本的なフィールドと検証を提供します
BaseCreateUser ユーザー登録専用の検証を提供します
Eメールを必須にし、必須のパスワードフィールドを追加します
BaseUpdateUser ユーザープロファイルの更新専用の検証を提供します
オプションのパスワードフィールドを追加します
BaseUserDSB データベース内のユーザーを表します
hashed_passwordフィールドを追加します
これらのモデルを継承して、検証モデルを定義する必要があります。
code: python
import databases
import sqlalchemy
from fastapi import FastAPI
from fastapi_users import models
from fastapi_users.db import SQLAlchemyBaseUserTable, SQLAlchemyUserDatabase
from sqlalchemy.ext.declarative import DeclarativeMeta, declarative_base
class User(models.BaseUser):
pass
class UserCreate(User, models.BaseUserCreate):
pass
class UserUpdate(User, models.BaseUserUpdate):
pass
class UserDB(User, models.BaseUserDB):
pass
DATABASE_URL = "sqlite:///./test.db"
database = databases.Database(DATABASE_URL)
Base: DeclarativeMeta = declarative_base()
class UserTable(Base, SQLAlchemyBaseUserTable):
pass
engine = sqlalchemy.create_engine(
DATABASE_URL, connect_args={"check_same_thread": False}
)
Base.metadata.create_all(engine)
users = UserTable.__table__
user_db = SQLAlchemyUserDatabase(UserDB, database, users)
app = FastAPI()
@app.on_event("startup")
async def startup():
await database.connect()
@app.on_event("shutdown")
async def shutdown():
await database.disconnect()
この例では、async def と awaite を使ってコルーチンの定義と呼び出して行っていますが、FastAPIは非同期処理をサポートしているためですが、今の時点ではこういうものだという理解でも問題ありません。
認証設定
FastAPI-Users は、次に認証方法をプラグインすることができます。
ブラウザベースのクエリのCookie認証
純粋なAPIクエリのJWT(JSON Web Token)認証。
FastAPI-Users が認証をチェックするときは、各メソッドが順次に実行され、最初にユーザーの権限を認めるメソッドがユーザを生成します。ユーザーを生成するメソッドがないときは、HTTPExceptionが発生します。
バックエンドごとのログイン/ログアウトの処理をルーターに追加できます。
JWTの設定
認証クラス JWTAuthenticationのインスタンス化はとても簡単で、トークンとトークンの存続時間(秒単位)をエンコードするために使用されるシークレットキーSECRETを定義するだけです。
code: pythonn
from fastapi_users.authentication import JWTAuthentication
SECRET = "YOU_HAVE_TO_CHANGE_THIS_SECRET_KEY"
auth_backends = []
jwt_auth = JWTAuthentication(secret=SECRET, lifetime_seconds=3600))
auth_backends.append(jwt_auth)
HTTPプロトコルと認証URI
FastAPI-Users では次のAPIが提供されています。
table: FastAPI-Users でのHTTPプロトコルと認証URIのマッピング
HTTPプロトコル 認証URI 説明
POST /login JWT認証でのログイン
POST /logout JWT認証でのログアウト
POST /register JWT認証でのユーザ登録
POST /forgot-password JWT認証でのパスワード忘れ
POST /reset-password JWT認証でのパスワードリセット
POST /authorize OAuth認証でのユーザ権限チェック
POST /callback OAuth認証でのコールバック処理
GET /me ログイン中のユーザ情報の取得
PATCH /me ログイン中のユーザ情報を更新
GET /{user_id} 指定したユーザIDのユーザ情報を取得
PATCH /{user_id} 指定したユーザIDのユーザ情報を更新
DELETE /{user_id} 指定したユーザIDのユーザ情報を削除
ルーター設定
code: python
from fastapi import FastAPI
from fastapi_users import FastAPIUsers
from fastapi_users.authentication import JWTAuthentication
SECRET = "SECRET"
jwt_auth = JWTAuthentication(secret=SECRET, lifetime_seconds=3600))
fastapi_users = FastAPIUsers(
user_db, jwt_auth, User, UserCreate, UserUpdate, UserDB, )
app = FastAPI()
app.include_router(
fastapi_users.get_auth_router(jwt_auth),
prefix="/auth/jwt",
)
ここで、tags は URIルーティングがOpenAPIに追加されてブラウザで参照できる自動ドキュメントーションでの分類に使用されます。
https://gyazo.com/2c41a677b743a19083f570af449f2fce
ソースコードの整理
いま時点で次のようなソースコードを用意します。
code: db.py
import sqlalchemy
import databases
from sqlalchemy.ext.declarative import declarative_base
from fastapi_users.db import SQLAlchemyUserDatabase
DATABASE_URL = "sqlite:///./test.db"
database = databases.Database(DATABASE_URL)
Base = declarative_base()
engine = sqlalchemy.create_engine(
DATABASE_URL, connect_args={"check_same_thread": False}
)
Base.metadata.create_all(engine)
code: models.py
import databases
import sqlalchemy
from fastapi_users import FastAPIUsers, models
from fastapi_users.db import (
sqlalchemy, SQLAlchemyBaseUserTable, SQLAlchemyUserDatabase
)
from sqlalchemy.ext.declarative import DeclarativeMeta, declarative_base
from db import Base, database
class User(models.BaseUser):
pass
class UserCreate(User, models.BaseUserCreate):
pass
class UserUpdate(User, models.BaseUserUpdate):
pass
class UserDB(User, models.BaseUserDB):
pass
class UserTable(Base, SQLAlchemyBaseUserTable):
pass
users = UserTable.__table__
user_db = SQLAlchemyUserDatabase(UserDB, database, users)
code: app.py
from fastapi import FastAPI, Request, Depends
from fastapi_users import FastAPIUsers, models
from fastapi_users.authentication import JWTAuthentication
from fastapi_users.db import SQLAlchemyBaseUserTable, SQLAlchemyUserDatabase
from sqlalchemy.ext.declarative import DeclarativeMeta, declarative_base
from models import (
User, UserCreate, UserUpdate, UserDB, UserTable, users, user_db
)
from db import Base, engine, database
SECRET = "SECRET"
def on_after_register(user: UserDB, request: Request):
print(f"User {user.id} has registered.")
def on_after_forgot_password(user: UserDB, token: str, request: Request):
print(f"User {user.id} has forgot their password. Reset token: {token}")
jwt_auth = JWTAuthentication(secret=SECRET, lifetime_seconds=3600)
app = FastAPI()
fastapi_users = FastAPIUsers(
user_db, jwt_auth, User, UserCreate, UserUpdate, UserDB, )
app.include_router(
fastapi_users.get_auth_router(jwt_auth),
prefix="/auth/jwt",
)
app.include_router(
fastapi_users.get_register_router(on_after_register),
prefix="/auth",
)
app.include_router(
fastapi_users.get_reset_password_router(
SECRET, after_forgot_password=on_after_forgot_password
),
prefix="/auth",
)
app.include_router(
fastapi_users.get_users_router(),
prefix="/users",
)
@app.on_event("startup")
async def startup():
await database.connect()
@app.on_event("shutdown")
async def shutdown():
await database.disconnect()
@app.get('/')
def index(user=Depends(fastapi_users.get_current_user)):
return {"Hello": "World!"}
保護したいAPIにはDepends(fastapi_users.get_current_user) として依存呼び出しを行うだけです。
Alembic でデータベースを作成
モデルクラスを使って自分でデータベースを作成してもよいのですが、
ここではAlembic でデータベースを作成しましょう。
code: bash
$ alembic init migrations
alembic.ini で以下の修正します。
code: python
sqlalchemy.url = sqlite:///test.db
migrations/env.py の target_metadata を修正します。
code: python
from models import Base
target_metadata = Base.metadata
これでデータベースを作成することができるようになります。
code: bash
$ export PYTHONPATH=.
$ alembic revision --autogenerate -m "db initialize"
migrations/version/マイグレーションファイル で次のように修正します。
code: python
from alembic import op
import fastapi_users # add
import sqlalchemy as sa
code: bash
$ alembic upgrade head
$ alembic stamp head
これでデータベースが作成されます。
code: bash
$ sqlite3 test.db
SQLite version 3.31.1 2020-01-27 19:55:54
Enter ".help" for usage hints.
sqlite> .schema
CREATE TABLE alembic_version (
version_num VARCHAR(32) NOT NULL,
CONSTRAINT alembic_version_pkc PRIMARY KEY (version_num)
);
CREATE TABLE user (
id CHAR(36) NOT NULL,
email VARCHAR NOT NULL,
hashed_password VARCHAR NOT NULL,
is_active BOOLEAN NOT NULL,
is_superuser BOOLEAN NOT NULL,
PRIMARY KEY (id),
CHECK (is_active IN (0, 1)),
CHECK (is_superuser IN (0, 1))
);
CREATE UNIQUE INDEX ix_user_email ON user (email);
sqlite>
ユーザを作ります。
code: bash
$ curl -X POST "http://127.0.0.1:8080/auth/register" -H "accept: application/json" -H "Content-Type: application/json" -d '{"id":"a2ad814c-4230-4873-a0cf-12d16c9ee9f8","email":"freddie@example.com","is_active":true,"is_superuser":false,"password":"queen"}" IDのフィールドは次のコードで作成したUUIDを使用します。
code: bash
$ python -c 'import uuid; print(uuid.uuid4())'
a2ad814c-4230-4873-a0cf-12d16c9ee9f8
ブラウザでAPIドキュメントを表示させて、POSTメソッドの /registerを参照してから、"Try It Out" をクリックするとあらわれるデータ領域を修正して execute をクリックする方が簡単かもしれません。
code: bash
$ curl -s -X GET -H 'Authorization: Bearer eyJ0eXAiOiJKV1QiLCJhbGciOiJIUzI1NiJ9.eyJ1c2VyX2lkIjoiNDA1ODlkZmItYjYxMi00YjA0LTlhNjItZmJlNTUwMDIzZmQwIiwiYXVkIjoiZmFzdGFwaS11c2VyczphdXRoIiwiZXhwIjoxNTkwODQ3MTA4fQ.EaDEKE0Asydg1pST6lgKJtmmnVHfNycwbv5yOWjnJqc' http://127.0.0.1:8080/ | python -m json.tool {
"Hello": "World!"
}
トークンの有効期限になると次のようにエラーになります。
code: bash
$ curl -s -X GET -H 'Authorization: Bearer eyJ0eXAiOiJKV1QiLCJhbGciOiJIUzI1NiJ9.eyJ1c2VyX2lkIjoiNDA1ODlkZmItYjYxMi00YjA0LTlhNjItZmJlNTUwMDIzZmQwIiwiYXVkIjoiZmFzdGFwaS11c2VyczphdXRoIiwiZXhwIjoxNTkwODQ3MTA4fQ.EaDEKE0Asydg1pST6lgKJtmmnVHfNycwbv5yOWjnJqc' http://127.0.0.1:8080/ | python -m json.tool {
"detail": "Unauthorized"
}