swagger – How to use bearer authentication in openapi-codegen generated python code – Stack Overflow

About jwt¶

JWT means “JSON Web Tokens”.

It’s a standard to codify a JSON object in a long dense string without spaces. It looks like this:

eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJzdWIiOiIxMjM0NTY3ODkwIiwibmFtZSI6IkpvaG4gRG9lIiwiaWF0IjoxNTE2MjM5MDIyfQ.SflKxwRJSMeKKF2QT4fwpMeJf36POk6yJV_adQssw5c

It is not encrypted, so, anyone could recover the information from the contents.

But it’s signed. So, when you receive a token that you emitted, you can verify that you actually emitted it.

Advanced usage with scopes¶

OAuth2 has the notion of “scopes”.

You can use them to add a specific set of permissions to a JWT token.

Api wrappers

With respect to Python, API wrappers are essentially libraries/packages which can be installed using pip. These libraries help communicate with APIs in a syntactically cleaner way. Under the hood, the libraries still make use of requests and headers to make requests. However, the wrappers make your code look cleaner.

The Twilio API we discussed earlier has a wrapper. It can be installed using pip

pip install twilio

Let’s try to do the same thing we did in the previous section with Twilio

from twilio.rest import Client
from dotenv import load_dotenv
import os 


load_dotenv()
TWILIO_ACCOUNT_SID = os.environ.get("TWILIO_ACCOUNT_SID")
TWILIO_ACCOUNT_TOKEN = os.environ.get("TWILIO_ACCOUNT_TOKEN")
client = Client(TWILIO_ACCOUNT_SID , TWILIO_ACCOUNT_TOKEN)

calls = client.calls.list(limit=5)

for idx, record in enumerate(calls):
    print(f"{idx}. {record.duration}")

As you can see, the code is a few lines shorter and looks much cleaner.

Unfortunately, not all APIs have a wrapper. However, a lot of them do. Before a consumer, an API directly, try searching for a wrapper around it. This will make it significantly easier to work with the API.

Handle jwt tokens¶

Import the modules installed.

Create a random secret key that will be used to sign the JWT tokens.

To generate a secure random secret key use the command:

And copy the output to the variable SECRET_KEY (don’t use the one in the example).

Create a variable ALGORITHM with the algorithm used to sign the JWT token and set it to “HS256”.

Create a variable for the expiration of the token.

Define a Pydantic Model that will be used in the token endpoint for the response.

Create a utility function to generate a new access token.

fromdatetimeimportdatetime,timedeltafromtypingimportUnionfromfastapiimportDepends,FastAPI,HTTPException,statusfromfastapi.securityimportOAuth2PasswordBearer,OAuth2PasswordRequestFormfromjoseimportJWTError,jwtfrompasslib.contextimportCryptContextfrompydanticimportBaseModel# to get a string like this run:# openssl rand -hex 32SECRET_KEY="09d25e094faa6ca2556c818166b7a9563b93f7099f6f0f4caa6cf63b88e8d3e7"ALGORITHM="HS256"ACCESS_TOKEN_EXPIRE_MINUTES=30fake_users_db={"johndoe":{"username":"johndoe","full_name":"John Doe","email":"johndoe@example.com","hashed_password":"$2b$12$EixZaYVK1fsbw1ZfbX3OXePaWxn96p36WQoeG6Lruj3vjPGga31lW","disabled":False,}}classToken(BaseModel):access_token:strtoken_type:strclassTokenData(BaseModel):username:Union[str,None]=NoneclassUser(BaseModel):username:stremail:Union[str,None]=Nonefull_name:Union[str,None]=Nonedisabled:Union[bool,None]=NoneclassUserInDB(User):hashed_password:strpwd_context=CryptContext(schemes=["bcrypt"],deprecated="auto")oauth2_scheme=OAuth2PasswordBearer(tokenUrl="token")app=FastAPI()defverify_password(plain_password,hashed_password):returnpwd_context.verify(plain_password,hashed_password)defget_password_hash(password):returnpwd_context.hash(password)defget_user(db,username:str):ifusernameindb:user_dict=db[username]returnUserInDB(**user_dict)defauthenticate_user(fake_db,username:str,password:str):user=get_user(fake_db,username)ifnotuser:returnFalseifnotverify_password(password,user.hashed_password):returnFalsereturnuserdefcreate_access_token(data:dict,expires_delta:Union[timedelta,None]=None):to_encode=data.copy()ifexpires_delta:expire=datetime.utcnow() expires_deltaelse:expire=datetime.utcnow() timedelta(minutes=15)to_encode.update({"exp":expire})encoded_jwt=jwt.encode(to_encode,SECRET_KEY,algorithm=ALGORITHM)returnencoded_jwtasyncdefget_current_user(token:str=Depends(oauth2_scheme)):credentials_exception=HTTPException(status_code=status.HTTP_401_UNAUTHORIZED,detail="Could not validate credentials",headers={"WWW-Authenticate":"Bearer"},)try:payload=jwt.decode(token,SECRET_KEY,algorithms=[ALGORITHM])username:str=payload.get("sub")ifusernameisNone:raisecredentials_exceptiontoken_data=TokenData(username=username)exceptJWTError:raisecredentials_exceptionuser=get_user(fake_users_db,username=token_data.username)ifuserisNone:raisecredentials_exceptionreturnuserasyncdefget_current_active_user(current_user:User=Depends(get_current_user)):ifcurrent_user.disabled:raiseHTTPException(status_code=400,detail="Inactive user")returncurrent_user@app.post("/token",response_model=Token)asyncdeflogin_for_access_token(form_data:OAuth2PasswordRequestForm=Depends()):user=authenticate_user(fake_users_db,form_data.username,form_data.password)ifnotuser:raiseHTTPException(status_code=status.HTTP_401_UNAUTHORIZED,detail="Incorrect username or password",headers={"WWW-Authenticate":"Bearer"},)access_token_expires=timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)access_token=create_access_token(data={"sub":user.username},expires_delta=access_token_expires)return{"access_token":access_token,"token_type":"bearer"}@app.get("/users/me/",response_model=User)asyncdefread_users_me(current_user:User=Depends(get_current_active_user)):returncurrent_user@app.get("/users/me/items/")asyncdefread_own_items(current_user:User=Depends(get_current_active_user)):return[{"item_id":"Foo","owner":current_user.username}]
fromdatetimeimportdatetime,timedeltafromfastapiimportDepends,FastAPI,HTTPException,statusfromfastapi.securityimportOAuth2PasswordBearer,OAuth2PasswordRequestFormfromjoseimportJWTError,jwtfrompasslib.contextimportCryptContextfrompydanticimportBaseModel# to get a string like this run:# openssl rand -hex 32SECRET_KEY="09d25e094faa6ca2556c818166b7a9563b93f7099f6f0f4caa6cf63b88e8d3e7"ALGORITHM="HS256"ACCESS_TOKEN_EXPIRE_MINUTES=30fake_users_db={"johndoe":{"username":"johndoe","full_name":"John Doe","email":"johndoe@example.com","hashed_password":"$2b$12$EixZaYVK1fsbw1ZfbX3OXePaWxn96p36WQoeG6Lruj3vjPGga31lW","disabled":False,}}classToken(BaseModel):access_token:strtoken_type:strclassTokenData(BaseModel):username:str|None=NoneclassUser(BaseModel):username:stremail:str|None=Nonefull_name:str|None=Nonedisabled:bool|None=NoneclassUserInDB(User):hashed_password:strpwd_context=CryptContext(schemes=["bcrypt"],deprecated="auto")oauth2_scheme=OAuth2PasswordBearer(tokenUrl="token")app=FastAPI()defverify_password(plain_password,hashed_password):returnpwd_context.verify(plain_password,hashed_password)defget_password_hash(password):returnpwd_context.hash(password)defget_user(db,username:str):ifusernameindb:user_dict=db[username]returnUserInDB(**user_dict)defauthenticate_user(fake_db,username:str,password:str):user=get_user(fake_db,username)ifnotuser:returnFalseifnotverify_password(password,user.hashed_password):returnFalsereturnuserdefcreate_access_token(data:dict,expires_delta:timedelta|None=None):to_encode=data.copy()ifexpires_delta:expire=datetime.utcnow() expires_deltaelse:expire=datetime.utcnow() timedelta(minutes=15)to_encode.update({"exp":expire})encoded_jwt=jwt.encode(to_encode,SECRET_KEY,algorithm=ALGORITHM)returnencoded_jwtasyncdefget_current_user(token:str=Depends(oauth2_scheme)):credentials_exception=HTTPException(status_code=status.HTTP_401_UNAUTHORIZED,detail="Could not validate credentials",headers={"WWW-Authenticate":"Bearer"},)try:payload=jwt.decode(token,SECRET_KEY,algorithms=[ALGORITHM])username:str=payload.get("sub")ifusernameisNone:raisecredentials_exceptiontoken_data=TokenData(username=username)exceptJWTError:raisecredentials_exceptionuser=get_user(fake_users_db,username=token_data.username)ifuserisNone:raisecredentials_exceptionreturnuserasyncdefget_current_active_user(current_user:User=Depends(get_current_user)):ifcurrent_user.disabled:raiseHTTPException(status_code=400,detail="Inactive user")returncurrent_user@app.post("/token",response_model=Token)asyncdeflogin_for_access_token(form_data:OAuth2PasswordRequestForm=Depends()):user=authenticate_user(fake_users_db,form_data.username,form_data.password)ifnotuser:raiseHTTPException(status_code=status.HTTP_401_UNAUTHORIZED,detail="Incorrect username or password",headers={"WWW-Authenticate":"Bearer"},)access_token_expires=timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)access_token=create_access_token(data={"sub":user.username},expires_delta=access_token_expires)return{"access_token":access_token,"token_type":"bearer"}@app.get("/users/me/",response_model=User)asyncdefread_users_me(current_user:User=Depends(get_current_active_user)):returncurrent_user@app.get("/users/me/items/")asyncdefread_own_items(current_user:User=Depends(get_current_active_user)):return[{"item_id":"Foo","owner":current_user.username}]

Hash and verify the passwords¶

Import the tools we need from passlib.

Похожее:  Личный кабинет интернет-банка Липецккомбанк онлайн

Create a PassLib “context”. This is what will be used to hash and verify passwords.

How does the client get the token?

We know that the client is going to send a token when it makes an API call, but to be able to do that it first needs to obtain the token. For this task there are also two possible methods.

In this article, we will be working with 5 different apis which use different types of authentication. we will be using python to consume the apis.

Not all APIs are as well documented as Twilio. This guide should help you work with APIs which are secured using Keys, BasicAuth, or OAuth2.

We will be working with the following APIS

You can find the source code here

Insecure apis

The Cat Facts API does not require any authentication and is fairly straightforward to work with. Let’s make a request to the following endpoint

Install passlib¶

PassLib is a great Python package to handle password hashes.

It supports many secure hashing algorithms and utilities to work with them.

The recommended algorithm is “Bcrypt”.

So, install PassLib with Bcrypt:

Install python-jose¶

We need to install python-jose to generate and verify the JWT tokens in Python:

Python-jose requires a cryptographic backend as an extra.

Here we are using the recommended one: pyca/cryptography.

Password hashing¶

“Hashing” means converting some content (a password in this case) into a sequence of bytes (just a string) that looks like gibberish.

Whenever you pass exactly the same content (exactly the same password) you get exactly the same gibberish.

Похожее:  [Часть 1] Написание своего сайта. Регистрация и Авторизация »

But you cannot convert from the gibberish back to the password.

Reading from .env files

Before moving on to the next sections, let’s look at how to read variables from a .env file. It’s highly recommended to store your credentials in a .env file to avoid them being exposed to others.

We will need to install the python-dotenv library.

pip install python-dotenv

Assume have a .env file with some random API Token

API_TOKEN = "SOME API TOKEN"

Let’s try reading the API Token in Python.

from dotenv import load_dotenv
import os 

load_dotenv()
API_TOKEN = os.environ.get("API_TOKEN")

The get function accepts a variable name stored in the .env file as an argument.

Resources

Github Repo

Table of contents

  • Insecure APIs
  • Reading values from.env files
  • APIs with Keys
  • APIs with Basic Auth
  • API Wrappers
  • The Session Object
  • APIs secured via OAuth2
  • Using the GitHub API (OAuth2)
  • Using the Genius API (OAuth2)

Some familiarity with the requests library is expected. If you need a refresher, you can refer to my previous article.

Token revocation

An important security consideration when working with token authentication is making it easy to revoke tokens. This is not only important to control a leak, but also as a “logout” mechanism that clients can use to disable a token once they don’t need it anymore, ensuring that even if this discarded token is leaked it won’t be of use.

If you are using random tokens, revoking a token requires removing the token from the database. Once the token is not in the database, it won’t work as authentication. I typically implement an endpoint such as /tokens with the DELETE method to do the revocation.

Types of tokens

In terms of their composition, there are two large groups or categories of tokens that I’m going to discuss in this article. Depending on the needs of your application you will have to choose which type of token works best. To be honest, I do not know if there are formal names for these, so I’m going to name them myself. The two groups are random tokens and signed tokens.

Update the /tokenpath operation¶

Create a timedelta with the expiration time of the token.

Create a real JWT access token and return it.

fromdatetimeimportdatetime,timedeltafromtypingimportUnionfromfastapiimportDepends,FastAPI,HTTPException,statusfromfastapi.securityimportOAuth2PasswordBearer,OAuth2PasswordRequestFormfromjoseimportJWTError,jwtfrompasslib.contextimportCryptContextfrompydanticimportBaseModel# to get a string like this run:# openssl rand -hex 32SECRET_KEY="09d25e094faa6ca2556c818166b7a9563b93f7099f6f0f4caa6cf63b88e8d3e7"ALGORITHM="HS256"ACCESS_TOKEN_EXPIRE_MINUTES=30fake_users_db={"johndoe":{"username":"johndoe","full_name":"John Doe","email":"johndoe@example.com","hashed_password":"$2b$12$EixZaYVK1fsbw1ZfbX3OXePaWxn96p36WQoeG6Lruj3vjPGga31lW","disabled":False,}}classToken(BaseModel):access_token:strtoken_type:strclassTokenData(BaseModel):username:Union[str,None]=NoneclassUser(BaseModel):username:stremail:Union[str,None]=Nonefull_name:Union[str,None]=Nonedisabled:Union[bool,None]=NoneclassUserInDB(User):hashed_password:strpwd_context=CryptContext(schemes=["bcrypt"],deprecated="auto")oauth2_scheme=OAuth2PasswordBearer(tokenUrl="token")app=FastAPI()defverify_password(plain_password,hashed_password):returnpwd_context.verify(plain_password,hashed_password)defget_password_hash(password):returnpwd_context.hash(password)defget_user(db,username:str):ifusernameindb:user_dict=db[username]returnUserInDB(**user_dict)defauthenticate_user(fake_db,username:str,password:str):user=get_user(fake_db,username)ifnotuser:returnFalseifnotverify_password(password,user.hashed_password):returnFalsereturnuserdefcreate_access_token(data:dict,expires_delta:Union[timedelta,None]=None):to_encode=data.copy()ifexpires_delta:expire=datetime.utcnow() expires_deltaelse:expire=datetime.utcnow() timedelta(minutes=15)to_encode.update({"exp":expire})encoded_jwt=jwt.encode(to_encode,SECRET_KEY,algorithm=ALGORITHM)returnencoded_jwtasyncdefget_current_user(token:str=Depends(oauth2_scheme)):credentials_exception=HTTPException(status_code=status.HTTP_401_UNAUTHORIZED,detail="Could not validate credentials",headers={"WWW-Authenticate":"Bearer"},)try:payload=jwt.decode(token,SECRET_KEY,algorithms=[ALGORITHM])username:str=payload.get("sub")ifusernameisNone:raisecredentials_exceptiontoken_data=TokenData(username=username)exceptJWTError:raisecredentials_exceptionuser=get_user(fake_users_db,username=token_data.username)ifuserisNone:raisecredentials_exceptionreturnuserasyncdefget_current_active_user(current_user:User=Depends(get_current_user)):ifcurrent_user.disabled:raiseHTTPException(status_code=400,detail="Inactive user")returncurrent_user@app.post("/token",response_model=Token)asyncdeflogin_for_access_token(form_data:OAuth2PasswordRequestForm=Depends()):user=authenticate_user(fake_users_db,form_data.username,form_data.password)ifnotuser:raiseHTTPException(status_code=status.HTTP_401_UNAUTHORIZED,detail="Incorrect username or password",headers={"WWW-Authenticate":"Bearer"},)access_token_expires=timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)access_token=create_access_token(data={"sub":user.username},expires_delta=access_token_expires)return{"access_token":access_token,"token_type":"bearer"}@app.get("/users/me/",response_model=User)asyncdefread_users_me(current_user:User=Depends(get_current_active_user)):returncurrent_user@app.get("/users/me/items/")asyncdefread_own_items(current_user:User=Depends(get_current_active_user)):return[{"item_id":"Foo","owner":current_user.username}]
fromdatetimeimportdatetime,timedeltafromfastapiimportDepends,FastAPI,HTTPException,statusfromfastapi.securityimportOAuth2PasswordBearer,OAuth2PasswordRequestFormfromjoseimportJWTError,jwtfrompasslib.contextimportCryptContextfrompydanticimportBaseModel# to get a string like this run:# openssl rand -hex 32SECRET_KEY="09d25e094faa6ca2556c818166b7a9563b93f7099f6f0f4caa6cf63b88e8d3e7"ALGORITHM="HS256"ACCESS_TOKEN_EXPIRE_MINUTES=30fake_users_db={"johndoe":{"username":"johndoe","full_name":"John Doe","email":"johndoe@example.com","hashed_password":"$2b$12$EixZaYVK1fsbw1ZfbX3OXePaWxn96p36WQoeG6Lruj3vjPGga31lW","disabled":False,}}classToken(BaseModel):access_token:strtoken_type:strclassTokenData(BaseModel):username:str|None=NoneclassUser(BaseModel):username:stremail:str|None=Nonefull_name:str|None=Nonedisabled:bool|None=NoneclassUserInDB(User):hashed_password:strpwd_context=CryptContext(schemes=["bcrypt"],deprecated="auto")oauth2_scheme=OAuth2PasswordBearer(tokenUrl="token")app=FastAPI()defverify_password(plain_password,hashed_password):returnpwd_context.verify(plain_password,hashed_password)defget_password_hash(password):returnpwd_context.hash(password)defget_user(db,username:str):ifusernameindb:user_dict=db[username]returnUserInDB(**user_dict)defauthenticate_user(fake_db,username:str,password:str):user=get_user(fake_db,username)ifnotuser:returnFalseifnotverify_password(password,user.hashed_password):returnFalsereturnuserdefcreate_access_token(data:dict,expires_delta:timedelta|None=None):to_encode=data.copy()ifexpires_delta:expire=datetime.utcnow() expires_deltaelse:expire=datetime.utcnow() timedelta(minutes=15)to_encode.update({"exp":expire})encoded_jwt=jwt.encode(to_encode,SECRET_KEY,algorithm=ALGORITHM)returnencoded_jwtasyncdefget_current_user(token:str=Depends(oauth2_scheme)):credentials_exception=HTTPException(status_code=status.HTTP_401_UNAUTHORIZED,detail="Could not validate credentials",headers={"WWW-Authenticate":"Bearer"},)try:payload=jwt.decode(token,SECRET_KEY,algorithms=[ALGORITHM])username:str=payload.get("sub")ifusernameisNone:raisecredentials_exceptiontoken_data=TokenData(username=username)exceptJWTError:raisecredentials_exceptionuser=get_user(fake_users_db,username=token_data.username)ifuserisNone:raisecredentials_exceptionreturnuserasyncdefget_current_active_user(current_user:User=Depends(get_current_user)):ifcurrent_user.disabled:raiseHTTPException(status_code=400,detail="Inactive user")returncurrent_user@app.post("/token",response_model=Token)asyncdeflogin_for_access_token(form_data:OAuth2PasswordRequestForm=Depends()):user=authenticate_user(fake_users_db,form_data.username,form_data.password)ifnotuser:raiseHTTPException(status_code=status.HTTP_401_UNAUTHORIZED,detail="Incorrect username or password",headers={"WWW-Authenticate":"Bearer"},)access_token_expires=timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)access_token=create_access_token(data={"sub":user.username},expires_delta=access_token_expires)return{"access_token":access_token,"token_type":"bearer"}@app.get("/users/me/",response_model=User)asyncdefread_users_me(current_user:User=Depends(get_current_active_user)):returncurrent_user@app.get("/users/me/items/")asyncdefread_own_items(current_user:User=Depends(get_current_active_user)):return[{"item_id":"Foo","owner":current_user.username}]

Using the genius api (oauth2)

Let’s take a look at another example. I’ll skip the part where we import the libraries and load the credentials.

Похожее:  Как зарегистрироваться в личном кабинете интернет-банка «ФК Открытие»

Recap¶

With what you have seen up to now, you can set up a secure FastAPI application using standards like OAuth2 and JWT.

In almost any framework handling the security becomes a rather complex subject quite quickly.

Many packages that simplify it a lot have to make many compromises with the data model, database, and available features. And some of these packages that simplify things too much actually have security flaws underneath.

Authenticating api endpoints

Once the client is in possession of a token, it can send authenticated requests. The actual authentication mechanism that is often used is Bearer Authentication, which also uses the Authorization header:

Authorization: Bearer <token>

Conclusion

I hope this article serves as a good guide to work with APIs in Python. Before consuming an API directly, always look for a wrapper. The 5 mins you spend looking for a wrapper might save you hours of headache.

1 Звезда2 Звезды3 Звезды4 Звезды5 Звезд (1 оценок, среднее: 5,00 из 5)
Загрузка...

Добавить комментарий

Ваш адрес email не будет опубликован. Обязательные поля помечены *

Adblock
detector