-
Notifications
You must be signed in to change notification settings - Fork 330
Expand file tree
/
Copy pathlambda_authorizer_jwt.py
More file actions
111 lines (97 loc) · 3.51 KB
/
lambda_authorizer_jwt.py
File metadata and controls
111 lines (97 loc) · 3.51 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
"""
-*- coding: utf-8 -*-
========================
AWS Lambda
========================
Contributor: Chirag Rathod (Srce Cde)
========================
"""
import os
import logging
import jwt
from jwt import PyJWKClient
# Import-time configuration: read the user pool settings from the environment
# and build the JWKS client that lambda_handler uses to look up signing keys.
try:
    region = os.environ["AWS_REGION"]
    userPoolId = os.environ["USER_POOL_ID"]
    # JWKS endpoint of the Cognito user pool
    url = (
        f"https://cognito-idp.{region}.amazonaws.com/{userPoolId}/.well-known/jwks.json"
    )
    app_client = os.environ["APP_CLIENT_ID"]
    # client used to resolve the signing key for an incoming token
    jwks_client = PyJWKClient(url)
except Exception as e:
    logging.error(e)
    # Raising a plain string is itself an error ("exceptions must derive from
    # BaseException"); raise a real exception and keep the original cause.
    raise RuntimeError("Unable to download JWKS") from e
def return_response(isAuthorized, other_params=None):
    """Build the authorizer response payload.

    Args:
        isAuthorized: Value stored under the ``"isAuthorized"`` key.
        other_params: Optional dict stored under the ``"context"`` key.
            When omitted, a new empty dict is created for this call.

    Returns:
        A dict with the keys ``"isAuthorized"`` and ``"context"``.
    """
    if other_params is None:
        # A `{}` default would be one dict shared by every call, so a caller
        # mutating the returned context would leak data into later responses.
        other_params = {}
    return {"isAuthorized": isAuthorized, "context": other_params}
def lambda_handler(event, context):
    """Decide whether a request carrying a JWT access token is authorized.

    The token is read from ``event["headers"]["authorization"]`` (with or
    without a leading ``Bearer `` scheme) and is accepted only when all of
    the following hold:

    * it consists of three dot-separated segments;
    * its signature verifies against the key ``jwks_client`` resolves for it;
    * the ``exp`` and ``iat`` claims are valid and ``iss`` matches the issuer
      URL built from ``region`` and ``userPoolId``;
    * the ``client_id`` claim equals ``app_client``;
    * the ``token_use`` claim is ``"access"``;
    * the space-separated ``scope`` claim contains ``openid``.

    Args:
        event: Invocation event; expected to contain a ``headers`` mapping
            with an ``authorization`` entry.
        context: Invocation context object (unused).

    Returns:
        The dict produced by ``return_response``. ``isAuthorized`` is True
        only when every check passes; any failure is logged and yields a
        response with ``isAuthorized`` set to False.
    """
    try:
        # fetching access token from event
        token = event["headers"]["authorization"]
        # Accept the "Bearer <token>" form as well as a bare token. A JWT
        # never contains a space, so this cannot alter a bare token.
        scheme, _, credentials = token.partition(" ")
        if credentials and scheme.lower() == "bearer":
            token = credentials.strip()
        # check token structure
        if len(token.split(".")) != 3:
            return return_response(isAuthorized=False, other_params={})
    except Exception as e:
        # missing headers / non-string header value
        logging.error(e)
        return return_response(isAuthorized=False, other_params={})

    try:
        # get signing key
        signing_key = jwks_client.get_signing_key_from_jwt(token)
        # validating exp, iat, signature, iss
        data = jwt.decode(
            token,
            signing_key.key,
            # Pin the algorithm: the token's own "alg" header is
            # attacker-controlled and must not select the verifier.
            algorithms=["RS256"],
            # Without an explicit issuer the "iss" claim is never compared.
            issuer=f"https://cognito-idp.{region}.amazonaws.com/{userPoolId}",
            options={
                "verify_signature": True,
                "verify_exp": True,
                "verify_iat": True,
                "verify_iss": True,
                # audience is checked below via "client_id"
                "verify_aud": False,
            },
        )
    except Exception as e:
        # Fail closed on anything: invalid/expired tokens, key lookup
        # failures, network errors while retrieving the JWKS, etc.
        logging.error(e)
        return return_response(isAuthorized=False, other_params={})

    # verifying audience...use data['client_id'] if verifying an access token
    # else data['aud']
    if data.get("client_id") != app_client:
        return return_response(isAuthorized=False, other_params={})

    # token_use check
    if data.get("token_use") != "access":
        return return_response(isAuthorized=False, other_params={})

    # scope check: a missing or non-string scope claim is a denial
    scope = data.get("scope")
    if not isinstance(scope, str) or "openid" not in scope.split(" "):
        return return_response(isAuthorized=False, other_params={})

    return return_response(isAuthorized=True, other_params={})