POC
This commit is contained in:
@@ -1,5 +1,73 @@
|
||||
from typing import Tuple
|
||||
import requests
|
||||
import spotipy
|
||||
from spotipy.oauth2 import SpotifyOAuth
|
||||
import sys
|
||||
from urllib.parse import urlencode
|
||||
from urllib.parse import urlencode, urlparse, parse_qs
|
||||
import webbrowser
|
||||
import time
|
||||
import base64
|
||||
|
||||
|
||||
CLIENT_ID = 'a1b29f64bef643b5ade0944830637510'
|
||||
CLIENT_SECRET = '1d74196e6fec41f9986917afd57df3da'
|
||||
REDIRECT_URI = 'https://vinyly.couraud.xyz'
|
||||
SCOPE = 'user-modify-playback-state user-read-playback-state'
|
||||
TOKEN_FILE = 'spotify_token.json'
|
||||
import json
|
||||
|
||||
def get_spotify_token():
|
||||
# Try to load existing token
|
||||
try:
|
||||
with open(TOKEN_FILE) as f:
|
||||
data = json.load(f)
|
||||
if data['expires_at'] > time.time():
|
||||
return data['access_token']
|
||||
# refresh token
|
||||
resp = requests.post(
|
||||
'https://accounts.spotify.com/api/token',
|
||||
data={'grant_type': 'refresh_token', 'refresh_token': data['refresh_token']},
|
||||
headers={'Authorization': 'Basic ' + base64.b64encode(f"{CLIENT_ID}:{CLIENT_SECRET}".encode()).decode()}
|
||||
).json()
|
||||
data['access_token'] = resp['access_token']
|
||||
data['expires_at'] = time.time() + resp['expires_in']
|
||||
with open(TOKEN_FILE, 'w') as f2:
|
||||
json.dump(data, f2)
|
||||
return data['access_token']
|
||||
except FileNotFoundError:
|
||||
# Get authorization code
|
||||
auth_url = 'https://accounts.spotify.com/authorize?' + urlencode({
|
||||
'client_id': CLIENT_ID,
|
||||
'response_type': 'code',
|
||||
'redirect_uri': REDIRECT_URI,
|
||||
'scope': SCOPE
|
||||
})
|
||||
print("Open this URL and authorize:", auth_url)
|
||||
webbrowser.open(auth_url)
|
||||
code = input("Paste the full redirected URL here: ")
|
||||
code = parse_qs(urlparse(code).query)['code'][0]
|
||||
|
||||
# Exchange code for tokens
|
||||
resp = requests.post(
|
||||
'https://accounts.spotify.com/api/token',
|
||||
data={
|
||||
'grant_type': 'authorization_code',
|
||||
'code': code,
|
||||
'redirect_uri': REDIRECT_URI
|
||||
},
|
||||
headers={'Authorization': 'Basic ' + base64.b64encode(f"{CLIENT_ID}:{CLIENT_SECRET}".encode()).decode()}
|
||||
).json()
|
||||
|
||||
data = {
|
||||
'access_token': resp['access_token'],
|
||||
'refresh_token': resp['refresh_token'],
|
||||
'expires_at': time.time() + resp['expires_in']
|
||||
}
|
||||
with open(TOKEN_FILE, 'w') as f:
|
||||
json.dump(data, f)
|
||||
return data['access_token']
|
||||
|
||||
|
||||
HEADERS_LUT = {
|
||||
"X-Client-Id": "58bd3c95768941ea9eb4350aaa033eb3",
|
||||
@@ -11,18 +79,30 @@ HEADERS_LUT = {
|
||||
"Accept-Language": "en",
|
||||
"Spotify-App-Version": "8.5.68",
|
||||
}
|
||||
|
||||
MEDIA_REF_LUT_URL = "https://spclient.wg.spotify.com:443/scannable-id/id"
|
||||
|
||||
def get_uri(media_ref: int, token: str):
|
||||
"""Query Spotify internal API to get the URI of the media reference."""
|
||||
header = {
|
||||
**HEADERS_LUT,
|
||||
"Authorization": f"Bearer {token}"
|
||||
}
|
||||
url = f'{MEDIA_REF_LUT_URL}/{media_ref}?format=json'
|
||||
response = requests.get(url, headers=header)
|
||||
response.raise_for_status()
|
||||
return response.json()
|
||||
auth_url = "https://accounts.spotify.com/authorize?" + urlencode({
|
||||
"client_id": CLIENT_ID,
|
||||
"response_type": "code",
|
||||
"redirect_uri": REDIRECT_URI,
|
||||
"scope": SCOPE
|
||||
})
|
||||
|
||||
|
||||
|
||||
|
||||
def get_uri(media_ref: int):
|
||||
|
||||
with open("/app/songs.json") as f:
|
||||
code_to_url = json.load(f)
|
||||
|
||||
uri = code_to_url.get(str(media_ref))
|
||||
if uri:
|
||||
return uri
|
||||
else:
|
||||
return -1
|
||||
|
||||
|
||||
def get_info(uri: str, token: str) -> Tuple[dict, dict]:
|
||||
"""Query the Spotify API to get information about a URI."""
|
||||
@@ -54,3 +134,4 @@ def get_info(uri: str, token: str) -> Tuple[dict, dict]:
|
||||
|
||||
return result, resp
|
||||
|
||||
|
||||
|
||||
Reference in New Issue
Block a user