travail d'un we (j'en ai big marre)

This commit is contained in:
2025-11-23 23:39:43 +01:00
parent 1ff888cf34
commit 4c486199ed
69 changed files with 208 additions and 111 deletions

31
final-loop/src/crc.py Normal file
View File

@@ -0,0 +1,31 @@
def crc(data, polynomial):
n = len(polynomial) - 1
initial_length = len(data)
check_bits = data + [0] * n
for i in range(initial_length):
if check_bits[i] == 1:
for j, p in enumerate(polynomial):
check_bits[i + j] = check_bits[i + j] ^ p
return check_bits[-n:]
def check_crc(data, polynomial, check_bits):
full_data = data + check_bits
for i in range(len(data)):
if full_data[i] == 1:
for j, p in enumerate(polynomial):
full_data[i + j] = full_data[i + j] ^ p
return 1 not in full_data
data = [0,0,1,0,0,0,1,0,1,1,1,1,0,1,1,1,1,0,0,0,1,1,1,1,0,1,1,0,1,0,1,1,0,0,0,0,1,1,0,1]
check = [1,1,0,0,1,1,0,0]
poly = [1,0,0,0,0,0,1,1,1]
print(check_crc(data, poly, check))
if __name__ == "__main__":
example = "0010001011110111100011110110101100001101"
long = [int(i) for i in example]
polynomial = [1, 0, 0, 0, 0, 0, 1, 1, 1] # crc8 polynomial
check = crc(long, polynomial)
print(f"Check bits: {check}")
checked = check_crc(long, polynomial, check)
print(f"Checked: {checked}")

View File

@@ -0,0 +1,86 @@
import argparse
import pprint
from get_heights import get_heights
from encode_decode import spotify_bar_decode
from get_uri import get_uri, get_info
import cv2
from start_song import start_song
import os
from flask import Flask, Response
import threading
import time
import numpy as np
os.environ["QT_QPA_PLATFORM"] = "xcb"
def process_frame(frame, token):
heights,preprocess = get_heights(frame)
cv2.imshow("frame", preprocess)
if len(heights)!=23:
return None # skip bad frames
else:
print("ON TROUVE UN CODE")
heights = heights[1:11] + heights[12:-1]
decoded = spotify_bar_decode(heights)
uri = get_uri(decoded, token)
summary, full_response = get_info(uri["target"], token)
print(f"SONG FOUND : {summary["name"]}")
if uri:
start_song(uri['target'],os.getenv('DEVICE_ID'))
return summary
if __name__ == "__main__":
cap = cv2.VideoCapture('/dev/video0')
token = os.getenv('ACCESS_TOKEN')
if not token:
token = "BQCph85n2Cr-h8jKWhsdsdhywaX3h_bn4pJ_-jdque2u_gn9bI-OdthIowGSU6r058QozL0eJfzy_ClWezXYKrQO2npuyfWVphxSQrKqhBWkGr5bK0UrIfsKKAdJvoNrXD9Db-ObgP5D3-rMpF0Xq3RXwMpTal9NpzTJcHZs_PBjbNClJVy24Jk5WfGbKZPkMs_Hon5TjABx4QzxzE2vxjd4X4EyPlyPuKiIVp-f7yTSJbbRLqt-_O_VJ9mnQ1RgGK16afY7p3JZH_B6-VSCrFuhK_m9yhSieiWoqEeopFEX47Nc4-tuqe8CXcYGiRLZBIcc4w64ly36ZIftxRN7ehcJb2gcV26ZqMS1lg1Yxp0OD4ShJJsinA69X535_w"
print(token)
if not cap.isOpened():
print("Error: Could not open camera.")
exit(1)
else:
print("Camera is working and ready to capture frames.")
while True:
ret, frame = cap.read()
if not ret:
break
# Crop same as streaming for consistency
height, width, _ = frame.shape
margin = 100
frame = frame[margin:height - margin, margin:width - margin]
try:
summary = process_frame(frame, token)
except Exception as e:
print(e)
print("")
continue
# cv2.imshow("preframe", frame)
if cv2.waitKey(1) & 0xFF == ord('q'):
break
cap.release()
cv2.destroyAllWindows()

View File

@@ -0,0 +1,118 @@
import numpy as np
import crccheck
# This code was written by "Doyle" on Stack Overflow
# https://stackoverflow.com/a/64950150/10703868
# Utils for conversion between int, array of binary
# and array of bytes (as ints)
def int_to_bin(num, length, endian):
if endian == 'l':
return [num >> i & 1 for i in range(0, length)]
elif endian == 'b':
return [num >> i & 1 for i in range(length-1, -1, -1)]
def bin_to_int(bin,length):
return int("".join([str(bin[i]) for i in range(length-1,-1,-1)]),2)
def bin_to_bytes(bin, length):
b = bin[0:length] + [0] * (-length % 8)
return [(b[i]<<7) + (b[i+1]<<6) + (b[i+2]<<5) + (b[i+3]<<4) +
(b[i+4]<<3) + (b[i+5]<<2) + (b[i+6]<<1) + b[i+7] for i in range(0,len(b),8)]
# Return the circular right shift of an array by 'n' positions
def shift_right(arr, n):
return arr[-n % len(arr):len(arr):] + arr[0:-n % len(arr)]
gray_code = [0,1,3,2,7,6,4,5]
gray_code_inv = [[0,0,0],[0,0,1],[0,1,1],[0,1,0],
[1,1,0],[1,1,1],[1,0,1],[1,0,0]]
# CRC using Rocksoft model:
# NOTE: this is not quite any of their predefined CRC's
# 8: number of check bits (degree of poly)
# 0x7: representation of poly without high term (x^8+x^2+x+1)
# 0x0: initial fill of register
# True: byte reverse data
# True: byte reverse check
# 0xff: Mask check (i.e. invert)
spotify_crc = crccheck.crc.Crc(8, 0x7, 0x0, True, True, 0xff)
def calc_spotify_crc(bin37):
bytes = bin_to_bytes(bin37, 37)
return int_to_bin(spotify_crc.calc(bytes), 8, 'b')
def check_spotify_crc(bin45):
data = bin_to_bytes(bin45,37)
return spotify_crc.calc(data) == bin_to_bytes(bin45[37:], 8)[0]
# Simple convolutional encoder
def encode_cc(dat):
gen1 = [1,0,1,1,0,1,1]
gen2 = [1,1,1,1,0,0,1]
punct = [1,1,0]
dat_pad = dat[-6:] + dat # 6 bits are needed to initialize
# register for tail-biting
stream1 = np.convolve(dat_pad, gen1, mode='valid') % 2
stream2 = np.convolve(dat_pad, gen2, mode='valid') % 2
enc = [val for pair in zip(stream1, stream2) for val in pair]
return [enc[i] for i in range(len(enc)) if punct[i % 3]]
# To create a generator matrix for a code, we encode each row
# of the identity matrix. Note that the CRC is not quite linear
# because of the check mask so we apply the lamda function to
# invert it. Given a 37 bit media reference we can encode by
# ref * spotify_generator + spotify_mask (mod 2)
_i37 = np.identity(37, dtype=bool)
crc_generator = [_i37[r].tolist() +
list(map(lambda x : 1-x, calc_spotify_crc(_i37[r].tolist())))
for r in range(37)]
spotify_generator = 1*np.array([encode_cc(crc_generator[r]) for r in range(37)], dtype=bool)
del _i37
spotify_mask = 1*np.array(encode_cc(37*[0] + 8*[1]), dtype=bool)
# The following matrix is used to "invert" the convolutional code.
# In particular, we choose a 45 vector basis for the columns of the
# generator matrix (by deleting those in positions equal to 2 mod 4)
# and then inverting the matrix. By selecting the corresponding 45
# elements of the convolutionally encoded vector and multiplying
# on the right by this matrix, we get back to the unencoded data,
# assuming there are no errors.
# Note: numpy does not invert binary matrices, i.e. GF(2), so we
# hard code the following 3 row vectors to generate the matrix.
conv_gen = [[0,1,0,1,1,1,1,0,1,1,0,0,0,1]+31*[0],
[1,0,1,0,1,0,1,0,0,0,1,1,1] + 32*[0],
[0,0,1,0,1,1,1,1,1,1,0,0,1] + 32*[0] ]
conv_generator_inv = 1*np.array([shift_right(conv_gen[(s-27) % 3],s) for s in range(27,72)], dtype=bool)
# Given an integer media reference, returns list of 20 barcode levels
def spotify_bar_code(ref):
bin37 = np.array([int_to_bin(ref, 37, 'l')], dtype=bool)
enc = (np.add(1*np.dot(bin37, spotify_generator), spotify_mask) % 2).flatten()
perm = [enc[7*i % 60] for i in range(60)]
return [gray_code[4*perm[i]+2*perm[i+1]+perm[i+2]] for i in range(0,len(perm),3)]
# Equivalent function but using CRC and CC encoders.
def spotify_bar_code2(ref):
bin37 = int_to_bin(ref, 37, 'l')
enc_crc = bin37 + calc_spotify_crc(bin37)
enc_cc = encode_cc(enc_crc)
perm = [enc_cc[7*i % 60] for i in range(60)]
return [gray_code[4*perm[i]+2*perm[i+1]+perm[i+2]] for i in range(0,len(perm),3)]
# Given 20 (clean) barcode levels, returns media reference
def spotify_bar_decode(levels):
level_bits = np.array([gray_code_inv[levels[i]] for i in range(20)], dtype=bool).flatten()
conv_bits = [level_bits[43*i % 60] for i in range(60)]
cols = [i for i in range(60) if i % 4 != 2] # columns to invert
conv_bits45 = np.array([conv_bits[c] for c in cols], dtype=bool)
bin45 = (1*np.dot(conv_bits45, conv_generator_inv) % 2).tolist()
if check_spotify_crc(bin45):
return bin_to_int(bin45, 37)
else:
print('Error in levels; Use real decoder!!!')
return -1

View File

@@ -0,0 +1,70 @@
import numpy as np
import matplotlib.pyplot as plt
from skimage import io
from skimage.filters import threshold_otsu, gaussian
from skimage.morphology import closing, square
from skimage.measure import label, regionprops
from skimage.color import rgb2gray
import matplotlib.patches as patches
from sklearn.cluster import KMeans
import warnings
from skimage.morphology import remove_small_objects
from skimage.transform import resize
from skimage import exposure
from skimage.morphology import erosion, square
import cv2
from skimage.filters import threshold_local
from scipy import ndimage as ndi
from skimage.segmentation import watershed
from skimage.morphology import opening, rectangle
from scipy.ndimage import gaussian_filter
warnings.filterwarnings("ignore", category=FutureWarning, message=".*square.*deprecated.*")
#### keep this. If does not work, remove black and reaply the logic only to not-black part of the picture
def get_heights(image):
# image = io.imread(filename)
im = rgb2gray(image)
# Filtrage gaussien léger pour diminuer bruit
im_blur = gaussian_filter(im, sigma=1)
# Calcul du seuil local avec block_size impair, ajuster offset si besoin
th = threshold_local(im_blur, block_size=71, offset=-0.09)
binary_im = im_blur > th
separated = binary_im
labeled = label(separated)
bar_dims = [r.bbox for r in regionprops(labeled)]
bar_dims.sort(key=lambda x: x[1]) # left to right
bars = bar_dims#[1:] # skip logo
bar_heights_raw = []
if(len(bars)!=23):
print(len(bars))
return [0], (separated.astype("uint8") * 255) # convert to 0/255 uint8
for bar in bars:
top, left, bottom, right = bar
effective_height = bottom - top # use bounding box height directly
bar_heights_raw.append(effective_height)
# Cluster measured heights to 8 clusters representing discrete bar levels
bar_heights_raw_np = np.array(bar_heights_raw).reshape(-1, 1)
kmeans = KMeans(n_clusters=8, random_state=0).fit(bar_heights_raw_np)
cluster_centers = np.sort(kmeans.cluster_centers_.flatten())
# Assign each bar height to closest cluster center index (0 to 7)
predicted_levels = []
for h in bar_heights_raw:
diffs = np.abs(cluster_centers - h)
closest_cluster = np.argmin(diffs)
predicted_levels.append(int(closest_cluster))
print(predicted_levels)
return predicted_levels, (separated.astype("uint8") * 255)

56
final-loop/src/get_uri.py Normal file
View File

@@ -0,0 +1,56 @@
from typing import Tuple
import requests
HEADERS_LUT = {
"X-Client-Id": "58bd3c95768941ea9eb4350aaa033eb3",
"Accept-Encoding": "gzip, deflate",
"Connection": "close",
"App-Platform": "iOS",
"Accept": "*/*",
"User-Agent": "Spotify/8.5.68 iOS/13.4 (iPhone9,3)",
"Accept-Language": "en",
"Spotify-App-Version": "8.5.68",
}
MEDIA_REF_LUT_URL = "https://spclient.wg.spotify.com:443/scannable-id/id"
def get_uri(media_ref: int, token: str):
"""Query Spotify internal API to get the URI of the media reference."""
header = {
**HEADERS_LUT,
"Authorization": f"Bearer {token}"
}
url = f'{MEDIA_REF_LUT_URL}/{media_ref}?format=json'
response = requests.get(url, headers=header)
response.raise_for_status()
return response.json()
def get_info(uri: str, token: str) -> Tuple[dict, dict]:
"""Query the Spotify API to get information about a URI."""
info_headers = {
"Content-Type": "application/json",
"Accept": "application/json",
"Authorization": f"Bearer {token}"
}
split = uri.split(":")
content_type = split[-2] + "s"
id = split[-1]
response = requests.get(f"https://api.spotify.com/v1/{content_type}/{id}", headers=info_headers)
response.raise_for_status()
resp = response.json()
result = {
"name": resp["name"],
"type": split[-2],
"url": resp["external_urls"]["spotify"],
}
if "artists" in resp:
result['artists'] = []
for a in resp['artists']:
result["artists"].append(a["name"])
if "album" in resp:
result['album'] = resp['album']['name']
if "description" in resp:
result["description"] = resp['description']
return result, resp

11
final-loop/src/permute.py Normal file
View File

@@ -0,0 +1,11 @@
def permute(bits, step=7):
for i in range(len(bits)):
yield bits[(i * step) % len(bits)]
if __name__ == "__main__":
bits = "111000111100101111101110111001011100110000100100011100110011"
print("".join(permute(bits)))
# 111100110001110101101000011110010110101100111111101000111000

View File

@@ -0,0 +1,35 @@
from typing import List
def encode(bits: List[int], polynomial: List[int], tail_bite=False):
"""Convolutionally encode the stream of bits using the generator polynomial.
If tail_bite == True, prepend the tail of the input. Otherwise use 0s to fill.
"""
if tail_bite:
tail = bits[-(len(polynomial) - 1):]
else:
tail = [0 for i in range(len(polynomial) - 1)]
full = tail + bits
polynomial.reverse() # Reverse since we're working the other direction
parity_bits = []
for i in range(len(bits)):
parity = 0
for j in range(len(polynomial)):
parity ^= full[i + j] * polynomial[j]
parity_bits.append(parity)
return parity_bits
if __name__ == "__main__":
g0 = [1, 0, 1, 1, 0, 1, 1]
g1 = [1, 1, 1, 1, 0, 0, 1]
bits = "010001001110111111110001110101101011011001100"
g0_expected = "100011100111110100110011110100000010001001011"
g1_expected = "110011100010110110110100101101011100110011011"
bits = [int(i) for i in bits]
p0 = encode(bits, g0, True)
p1 = encode(bits, g1, True)
print(g0_expected == "".join(str(i) for i in p0))
print(g1_expected == "".join(str(i) for i in p1))

View File

@@ -0,0 +1,43 @@
import spotipy
from spotipy.oauth2 import SpotifyOAuth
import sys
# Replace these with your Spotify developer app credentials
CLIENT_ID = 'a1b29f64bef643b5ade0944830637510'
CLIENT_SECRET = '1d74196e6fec41f9986917afd57df3da'
REDIRECT_URI = 'https://vinyly.couraud.xyz'
SCOPE = 'user-modify-playback-state user-read-playback-state'
def start_song(URI="spotify:track:0RoA7ObU6phWpqhlC9zH4Z",device_id="a499b8f3684d8098ffb5f9ff11392ebdd61fc6d1"):
auth_manager = SpotifyOAuth(client_id=CLIENT_ID,
client_secret=CLIENT_SECRET,
redirect_uri=REDIRECT_URI,
scope=SCOPE,
open_browser=False)
# Check if token already cached; if not, do manual auth
token_info = auth_manager.get_cached_token()
if not token_info:
print("Please scan the QR code and paste the full redirect URL after authorization:")
redirect_response = input("Redirect URL: ").strip()
token_info = auth_manager.get_access_token(code=auth_manager.parse_response_code(redirect_response))
auth_manager.cache_token(token_info)
sp = spotipy.Spotify(auth_manager=auth_manager)
devices = sp.devices()
if not devices['devices']:
print('No active Spotify devices found. Please start playback on a device.')
sys.exit(1)
current = sp.current_user_playing_track()
if current and current.get("item"):
current_uri = current["item"]["uri"]
if current_uri != URI:
sp.start_playback(device_id=device_id, uris=[URI])
if __name__ == '__main__':
start_song(URI,device_id)

View File

@@ -0,0 +1,15 @@
media_ref = 57639171874
binary = f"{bin(media_ref)[2:]:0>37}"
print(binary)
# pad with 3 bits to the right:
binary = f"{binary:0<39}"
print(binary)
a = "100011100111110100110011110100000010001001011"
b = "110011100010110110110100101101011100110011011"
c = zip(a, b)
print("".join(i + j for i, j in c))