-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy pathreporting_api_auth_client.py
More file actions
127 lines (113 loc) · 4.97 KB
/
reporting_api_auth_client.py
File metadata and controls
127 lines (113 loc) · 4.97 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
import base64
import hashlib
import os
import re
import requests
import time
class AuthClient:
    """Obtain and cache an OpenX API access token for one user.

    The token is acquired with an OAuth 2.0 authorization-code flow using
    PKCE (S256), driven entirely over HTTP:

    1. sign in with email/password at the Google Identity Toolkit
       ``accounts:signInWithPassword`` endpoint to get an ``idToken``;
    2. call the OpenX ``authorize`` endpoint and read the ``session_id``
       from the URL the request is finally redirected to;
    3. fetch the session info (for its ``scope``) and grant consent, which
       yields an authorization ``code``;
    4. exchange the code plus the PKCE verifier for an access token.

    Args:
        client_id: OAuth client id sent to the authorize and token endpoints.
        email: Account email; also the key of the token cache entry.
        password: Account password used for the sign-in request.
        hostname: Instance hostname; sent as ``instance_hostname`` on consent
            and used to build the redirect URI ``https://{hostname}``.
        timeout: Timeout in seconds (or a ``(connect, read)`` tuple) passed to
            every ``requests`` call so a stalled server cannot hang the
            caller forever. Keyword-only; defaults to 30.
    """

    def __init__(self, client_id, email, password, hostname, *, timeout=30):
        # Endpoint actually used by get_auth_token(); the API key is appended
        # as the ``key`` query parameter at request time.
        self.sign_in_url = "https://identitytoolkit.googleapis.com/v1/accounts:signInWithPassword"
        self.login_url = "https://login.openx.com"
        self.gcip_key = "AIzaSyCLvqp5phL0yGo0uxIN-l7a58mPkV74hsw"
        self.client_id = client_id
        self.email = email
        self.password = password
        self.instance_hostname = hostname
        self.timeout = timeout
        # Initial PKCE pair, kept so the individual step methods work when
        # called directly; get_token() replaces it for every new flow.
        self.code_verifier, self.code_challenge = self.generate_code_challenge()
        self.get_token_url = "https://api.openx.com/oauth2/v1/token"
        self.authorize_url = "https://api.openx.com/oauth2/v1/authorize"
        self.session_info_url = "https://api.openx.com/oauth2/v1/login/session-info"
        self.consent_url = "https://api.openx.com/oauth2/v1/login/consent"
        self.redirect_url = f"https://{hostname}"
        # Maps email -> {"access_token": str, "expires_at": epoch seconds}.
        self.token_cache = {}

    def get_token(self):
        """Return a valid access token, running the full flow on a cache miss.

        Returns:
            The cached access token while ``time.time()`` is still below its
            recorded expiry, otherwise a freshly obtained one (which is then
            cached under ``self.email``).

        Raises:
            requests.HTTPError: If any step that checks its response status
                receives an error response.
        """
        cached = self.token_cache.get(self.email)
        if cached is not None and time.time() < cached["expires_at"]:
            return cached["access_token"]

        # RFC 7636 asks for a new code verifier per authorization request, so
        # do not reuse the pair from a previous flow.
        self.code_verifier, self.code_challenge = self.generate_code_challenge()

        auth_response = self.get_auth_token()
        id_token = auth_response["idToken"]
        session_id = self.authorize_and_get_session_id()
        session_info = self.get_session_info(session_id, id_token)
        consent = self.grant_consent(session_id, id_token, session_info["scope"])
        token_response = self.get_access_token(consent["code"])

        access_token = token_response["access_token"]
        self.token_cache[self.email] = {
            "access_token": access_token,
            "expires_at": time.time() + token_response["expires_in"],
        }
        return access_token

    def get_auth_token(self):
        """Sign in with email and password and return the decoded JSON reply.

        Returns:
            The parsed response body; get_token() reads its ``idToken`` entry.

        Raises:
            requests.HTTPError: If the sign-in endpoint returns an error status.
        """
        data = {
            "email": self.email,
            "password": self.password,
            "returnSecureToken": True,
        }
        response = requests.post(
            self.sign_in_url,
            params={"key": self.gcip_key},
            data=data,
            timeout=self.timeout,
        )
        response.raise_for_status()
        return response.json()

    def authorize_and_get_session_id(self):
        """Start the authorization request and return the login session id.

        Redirects are followed, and the ``session_id`` query parameter is
        read from the final URL.

        Returns:
            The session id string, or ``None`` when the final URL carries no
            ``session_id`` parameter.
        """
        # NOTE(review): ``state`` and ``nonce`` are fixed values and are never
        # checked afterwards -- confirm whether they should be random per flow.
        params = {
            "scope": "openid email profile api",
            "response_type": "code",
            "client_id": self.client_id,
            "redirect_uri": self.redirect_url,
            "state": "abcd",
            "code_challenge": self.code_challenge,
            "code_challenge_method": "S256",
            "nonce": "nonce-123456",
        }
        headers = {"Content-Type": "application/x-www-form-urlencoded"}
        response = requests.get(
            self.authorize_url,
            params=params,
            headers=headers,
            allow_redirects=True,
            timeout=self.timeout,
        )
        # NOTE(review): the status of the final response is not checked here,
        # and a missing session id is passed on to the caller as None.
        return self.get_session_id_from_url(response.url)

    def get_session_info(self, session_id, id_token):
        """Fetch the login session info for ``session_id``.

        Args:
            session_id: Session id returned by authorize_and_get_session_id().
            id_token: Identity token sent as the bearer credential.

        Returns:
            The parsed JSON body; get_token() reads its ``scope`` entry.

        Raises:
            requests.HTTPError: If the endpoint returns an error status.
        """
        headers = {
            "Authorization": f"Bearer {id_token}",
            "Origin": self.login_url,
        }
        response = requests.get(
            self.session_info_url,
            params={"session_id": session_id},
            headers=headers,
            timeout=self.timeout,
        )
        response.raise_for_status()
        return response.json()

    def grant_consent(self, session_id, id_token, scope):
        """Grant consent for ``scope`` on the login session.

        Args:
            session_id: Session id returned by authorize_and_get_session_id().
            id_token: Identity token sent as the bearer credential.
            scope: Scope value to consent to (taken from the session info).

        Returns:
            The parsed JSON body; get_token() reads its ``code`` entry.

        Raises:
            requests.HTTPError: If the endpoint returns an error status.
        """
        headers = {
            "Authorization": f"Bearer {id_token}",
            "Origin": self.login_url,
            "Content-Type": "application/x-www-form-urlencoded",
        }
        data = {
            "consent": "true",
            "scope": scope,
            "session_id": session_id,
            "instance_hostname": self.instance_hostname,
        }
        response = requests.post(
            self.consent_url, data=data, headers=headers, timeout=self.timeout
        )
        response.raise_for_status()
        return response.json()

    def get_access_token(self, code):
        """Exchange an authorization code for the token response.

        Args:
            code: Authorization code returned by grant_consent().

        Returns:
            The parsed JSON body; get_token() reads its ``access_token`` and
            ``expires_in`` entries.

        Raises:
            requests.HTTPError: If the token endpoint returns an error status.
        """
        headers = {"Content-Type": "application/x-www-form-urlencoded"}
        data = {
            "grant_type": "authorization_code",
            "code": code,
            "redirect_uri": self.redirect_url,
            "client_id": self.client_id,
            "code_verifier": self.code_verifier,
        }
        response = requests.post(
            self.get_token_url, data=data, headers=headers, timeout=self.timeout
        )
        response.raise_for_status()
        return response.json()

    @staticmethod
    def generate_code_challenge():
        """Create a PKCE ``(code_verifier, code_challenge)`` pair.

        Returns:
            A 2-tuple of strings: the verifier is 32 random bytes encoded as
            unpadded URL-safe base64 (43 characters), and the challenge is
            the unpadded URL-safe base64 of the verifier's SHA-256 digest.
        """
        # 32 random bytes -> 43 base64url characters once padding is removed.
        verifier_bytes = os.urandom(32)
        code_verifier = base64.urlsafe_b64encode(verifier_bytes).rstrip(b"=").decode("utf-8")
        # S256 challenge: hash the verifier text, then base64url-encode it.
        sha256_hash = hashlib.sha256(code_verifier.encode("utf-8")).digest()
        code_challenge = base64.urlsafe_b64encode(sha256_hash).rstrip(b"=").decode("utf-8")
        return code_verifier, code_challenge

    @staticmethod
    def get_session_id_from_url(url):
        """Return the raw ``session_id`` query value from ``url``, or ``None``.

        The value is returned exactly as it appears in the URL (no percent
        decoding). Matching stops at ``&`` or ``#`` so a trailing URL
        fragment is never treated as part of the session id.
        """
        match = re.search(r"[?&]session_id=([^&#]+)", url)
        return match.group(1) if match else None