# security.py
from werkzeug.security import safe_str_cmp
from user import User
# In-memory user store for the demo. The positional arguments are presumably
# (id, username, password) -- confirm against the User class in user.py.
users = [User(1, 'user1', 'abcxyz')]

# Lookup tables built once at import time so that login (by username) and
# per-request identity resolution (by id) are simple dictionary lookups.
username_table = {account.username: account for account in users}
userid_table = {account.id: account for account in users}
def authenticate(username, password):
    """Return the user matching the given credentials, used for login.

    Looks the user up by name in ``username_table`` and compares the stored
    password with the supplied one via ``safe_str_cmp``.

    Returns:
        The matching user object when both username and password are valid,
        otherwise ``None``.
    """
    candidate = username_table.get(username)
    if not candidate:
        # Unknown username.
        return None
    if not safe_str_cmp(candidate.password, password):
        # Known username, wrong password.
        return None
    return candidate
def identity(payload):
    """Resolve the user for a decoded JWT payload (called on every request).

    Args:
        payload: mapping holding the user id under the ``'identity'`` key.

    Returns:
        The user registered under that id in ``userid_table``, or ``None``
        when the id is unknown.

    Raises:
        KeyError: if ``payload`` has no ``'identity'`` entry.
    """
    return userid_table.get(payload['identity'])
Initialize JWT in app.py (Flask-JWT):
# app.py
from flask import Flask, request
from flask_restful import Resource, Api, reqparse
from flask_jwt import JWT, jwt_required, current_identity # added for JWT
from security import authenticate, identity # added for JWT
import os  # used only to read the signing key from the environment below

# Create the Flask application.
app = Flask(__name__)
# Flask setting: re-raise exceptions instead of routing them to the app's
# error handlers.
app.config['PROPAGATE_EXCEPTIONS'] = True
# Key used to sign the JWTs (added for JWT). Read it from the SECRET_KEY
# environment variable so the real key never lives in source control; the
# original development value is kept as a fallback when the variable is
# unset or empty.
app.secret_key = os.environ.get('SECRET_KEY') or 'jose'
api = Api(app)
# Wire Flask-JWT to the authenticate/identity callbacks; this adds the /auth
# endpoint used for login.
jwt = JWT(app, authenticate, identity)
class Item(Resource):
    """Item resource (Flask-JWT version); only the protected GET is shown."""

    @jwt_required()  # added for JWT
    def get(self, name):
        """Handle GET for the item called ``name``; body elided in these notes."""
        ...
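(2) Added a UserLogin resource that replaces Flask-JWT's authenticate() and identity() callbacks.
(3) Changed @jwt_required() to @jwt_required (the Flask-JWT-Extended decorator is used without brackets).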
# resources/user.py
from flask_jwt_extended import create_access_token, create_refresh_token
...
# replace `authenticate()` in JWT & identity=user.id
class UserLogin(Resource):
    """Login endpoint that replaces Flask-JWT's ``authenticate()`` callback."""

    def post(self):
        """Check the posted credentials and issue a token pair.

        Returns:
            ``({'access_token': ..., 'refresh_token': ...}, 200)`` when the
            username exists and the password matches, otherwise
            ``({"message": "Invalid Credentials!"}, 401)``.
        """
        # _user_parser is defined elsewhere in this module; presumably it
        # yields the 'username' and 'password' fields -- confirm.
        credentials = _user_parser.parse_args()
        account = UserModel.find_by_username(credentials['username'])

        if not (account and safe_str_cmp(account.password, credentials['password'])):
            return {"message": "Invalid Credentials!"}, 401

        # identity=user.id; the access token is marked fresh because the
        # password was just supplied.
        tokens = {
            'access_token': create_access_token(identity=account.id, fresh=True),
            'refresh_token': create_refresh_token(account.id),
        }
        return tokens, 200
# refresh token
# Get a new access token without username and password
class TokenRefresh(Resource):
    """Issue a new access token without asking for username and password."""

    @jwt_refresh_token_required
    def post(self):
        """Return ``({'access_token': <non-fresh token>}, 200)`` for the caller.

        The identity is taken from the JWT that accompanied the request.
        """
        identity = get_jwt_identity()
        # fresh=False: no password was supplied for this token.
        return {'access_token': create_access_token(identity=identity, fresh=False)}, 200
# app.py
from flask_jwt_extended import JWTManager
from resources.user import UserRegister, User, UserLogin
import os  # used only to read the signing key from the environment below

# Create the Flask application and configure the SQLAlchemy database.
app = Flask(__name__)
app.config['SQLALCHEMY_DATABASE_URI'] = 'sqlite:///data.db'
app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = False
# Flask setting: re-raise exceptions instead of routing them to the app's
# error handlers.
app.config['PROPAGATE_EXCEPTIONS'] = True
# Signing key. Read it from the SECRET_KEY environment variable so the real
# key never lives in source control; the original development value is kept
# as a fallback when the variable is unset or empty.
# Could use app.config['JWT_SECRET_KEY'] instead if we prefer.
app.secret_key = os.environ.get('SECRET_KEY') or 'jose'
api = Api(app)
...
# Flask-JWT-Extended manager: unlike Flask-JWT it does not add /auth, so the
# login endpoint is provided by the UserLogin resource.
jwt = JWTManager(app)
# claims loader
@jwt.user_claims_loader
def add_claims_to_jwt(identity):
    """Build the extra claims stored in a token for ``identity`` (the user id).

    Returns:
        ``{'is_admin': True}`` for user id 1, ``{'is_admin': False}`` for
        every other identity.
    """
    # The admin id is hard-coded here; it would be better read from a config file.
    return {'is_admin': True if identity == 1 else False}
# Register the UserLogin resource under the /login route.
api.add_resource(UserLogin, '/login') # /login
...
# /resources/item.py
from flask_jwt_extended import jwt_required, get_jwt_claims, fresh_jwt_required
class Item(Resource):
    """Item resource (Flask-JWT-Extended version); handler bodies are elided."""
    ...

    @jwt_required  # No longer needs brackets
    def get(self, name):
        """Handle GET for the item called ``name``; requires a valid JWT."""
        ...

    @fresh_jwt_required  # require password to enter into sudo mode for few hours
    def post(self, name):
        """Handle POST for the item called ``name``; requires a fresh JWT."""
        ...

    @jwt_required
    def delete(self, name):
        """Handle DELETE for the item called ``name``; admins only.

        Returns ``({'message': 'Admin privilege required.'}, 401)`` when the
        token's ``is_admin`` claim is falsy; the rest of the handler is
        elided in these notes.
        """
        is_admin = get_jwt_claims()['is_admin']
        if not is_admin:
            return {'message': 'Admin privilege required.'}, 401
        ...
class ItemList(Resource):
    """Item listing whose level of detail depends on whether the caller is logged in."""

    @jwt_optional
    def get(self):
        """Return every item for logged-in callers, only item names otherwise.

        Returns:
            ``({'items': [<item json>, ...]}, 200)`` when the request carries
            an identity; otherwise ``({'items': [<name>, ...], 'message':
            ...}, 200)``.
        """
        user_id = get_jwt_identity()
        items = [item.json() for item in ItemModel.find_all()]

        if not user_id:
            # Anonymous caller: expose the names only and hint at logging in.
            names_only = [item['name'] for item in items]
            return {
                'items': names_only,
                'message': 'More data available if you log in.'
            }, 200

        return {'items': items}, 200
2.2. Claims
claim: additional data carried in the JWT payload alongside the identity (it is not the identity itself).
see app.py & Item.delete()
claims loader in app.py.
claims check in Item.delete().
2.3. Optional
see ItemList
add @jwt_optional in ItemList
use get_jwt_identity() to retrieve the identity from the JWT (it is None when the request carries no token).
2.4. Refresh token
fresh token = issued right after you key in your password; you are in "sudo" mode for a few hours, so sensitive actions (e.g. deleting your account) are allowed during that time.
non-fresh token = once the fresh token has expired, you get a non-fresh token (via the refresh token, without a password) and are no longer in "sudo" mode.
see app.py & TokenRefresh
add TokenRefresh resource.
add TokenRefresh to app.py.
require a fresh token for sudo
Added @fresh_jwt_required to Item.post()
2.5. Callbacks
Added configuration to app.py on JWT.
@jwt.expired_token_loader = token is valid but expired.
@jwt.invalid_token_loader = token is invalid.
@jwt.unauthorized_loader = token isn't present.
@jwt.needs_fresh_token_loader = needs a fresh token but it had a non-fresh token.
@jwt.revoked_token_loader = token has been revoked (e.g. after logout).