My below code was previously working and did all i needed it to, however has started stopping on the line:
tracks_data = self.sp.playlist_tracks(playlist_id)
Here is my full service script below, if you also have more suggestions on improving this in general for efficiency and cost reductions thats a bonus, however main focus is for this to work for now.
The function weekly_cache, will not complete anymore due to the line mentioned above stopping and halting the process, it doesnt crash, or provide errors, it gets to that line and stays there indefinitley.
import spotipy
from spotipy.oauth2 import SpotifyClientCredentials
from urllib.parse import urlparse
from google.cloud import storage
import firebase_admin
import os
from firebase_admin import credentials
from firebase_admin import firestore
import json
import re
class SpotifyService:
def __init__(self): # Initialize Variables and Stuff
self.client_id = os.environ.get('SPOTIFY_CLIENT_ID')
self.client_secret = os.environ.get('SPOTIFY_CLIENT_SECRET')
self.client_credentials_manager = SpotifyClientCredentials(client_id=self.client_id, client_secret=self.client_secret)
self.sp = spotipy.Spotify(client_credentials_manager=self.client_credentials_manager)
# Create a cache for playlist tracks and cover images
self.playlist_cache = {}
# Initialize Firebase Admin SDK with your Firebase credentials
self.firebase_cred = credentials.Certificate('./API/new-waave-35fec48b08df.json') # Replace with your Firebase credentials file path
firebase_admin.initialize_app(self.firebase_cred)
self.db = firestore.client()
self.storage_client = storage.Client()
self.bucket_name = 'waaveapibucket' # Replace with your bucket name
playlist_ref = self.db.collection('config').document('admin')
playlist_doc = playlist_ref.get()
data = playlist_doc.to_dict()
playlist_ids = data.get('playlistIds', [])
self.playlist_urls_or_ids = playlist_ids
def upload_to_bucket(self, blob_name, file_path): # Upload to Google Cloud Storage (Simple CRUD thing)
"""Upload data to a bucket."""
bucket = self.storage_client.bucket(self.bucket_name)
blob = bucket.blob(blob_name)
blob.upload_from_filename(file_path)
def read_file_to_array(file_path):
with open(file_path, 'r') as file:
lines = file.readlines()
# Remove newline characters from each line
lines = [line.strip() for line in lines]
return lines
def update_playlist_ids(self):
try:
# Read playlist IDs from file
with open('./API/playlist_ids.txt', 'r') as file:
lines = [line.strip() for line in file.readlines()]
# Get the document reference
playlist_ref = self.db.collection('config').document('admin')
# Update the document
playlist_ref.update({"playlistIds": lines})
print(lines[20] if len(lines) > 20 else "Less than 21 lines in file")
return 'success!!'
except Exception as e:
print(f'Error updating playlist Ids: {e}')
return f'Error updating playlist Ids: {e}'
def get_playlist_cover(self, playlist_url): # Get Playlist Cover from Spotify ID
if 'http' in playlist_url:
parsed_url = urlparse(playlist_url)
playlist_id = parsed_url.path.split('/')[-1]
else:
playlist_id = playlist_url
try:
cover_image = self.sp.playlist_cover_image(playlist_id)[0]['url']
return cover_image
except Exception as e:
print(f'Error retrieving cover image for playlist {playlist_url}: {e}')
return None
def parse_playlist_id(self, playlist_url_or_id): # Convert URLS to IDs
if 'http' in playlist_url_or_id:
parsed_url = urlparse(playlist_url_or_id)
return parsed_url.path.split('/')[-1]
return playlist_url_or_id
def weekly_cache(self): # Cache this week's playlist Data/Results
playlist_data = []
data_to_remove = []
print('Starting weekly_cache function.')
for i, id in enumerate(self.playlist_urls_or_ids, 1):
print(f"Processing playlist {i}: {id}")
# If it's a URL, extract the playlist ID
playlist_id = self.parse_playlist_id(id)
print(f'Playlist ID for {i}: {playlist_id}')
try:
if playlist_id != 'test':
print(f'Retrieving tracks for playlist ID {playlist_id}')
tracks_data = self.sp.playlist_tracks(playlist_id)
tracks = [
f"{item['track']['name']} - {', '.join(artist['name'] for artist in item['track']['artists'])}"
for item in tracks_data['items'] if item['track']
]
print(f'Found {len(tracks)} tracks for playlist ID {playlist_id}')
cover_image = self.sp.playlist_cover_image(playlist_id)[0]['url']
print(f'Cover image URL for playlist ID {playlist_id}: {cover_image}')
playlist_entry = {
'playlist_id': playlist_id,
'cover_image': cover_image,
'songs': tracks
}
playlist_data.append(playlist_entry)
print(f'Successfully processed playlist {playlist_id}')
else:
print(f'Skipping test playlist ID {playlist_id}')
except Exception as e:
print(f'Error retrieving information for playlist {playlist_id}: {e}')
data_to_remove.append(id)
print(f'Added playlist ID {playlist_id} to removal list due to error.')
print(f'Removing {len(data_to_remove)} playlists from the original list.')
self.playlist_urls_or_ids = [item for item in self.playlist_urls_or_ids if item not in data_to_remove]
temp_file = 'playlist_data.json'
with open(temp_file, 'w') as f:
json.dump(playlist_data, f, indent=4)
print(f'Playlist data written to {temp_file}')
# Upload the file to Google Cloud Storage
self.upload_to_bucket('playlist_data.json', temp_file)
print('Uploaded playlist_data.json to Google Cloud Storage')
# Optionally, remove the temporary file if you don't need it locally
os.remove(temp_file)
print(f'Removed local file {temp_file}')
# Update the Firestore database
self.db.collection('config').document('admin').update({"playlistIds": self.playlist_urls_or_ids})
print('Updated Firestore database with new playlist IDs')
def extract_urls(input_string):
# Regular expression for matching URLs
url_regex = r'https?://[^\s]+'
urls = re.findall(url_regex, input_string)
return urls
def get_cache(self): # Get JSON Cache of playlist Data
"""Read data from bucket and parse as JSON."""
bucket = self.storage_client.bucket(self.bucket_name)
blob = bucket.blob('playlist_data.json')
# Download the contents of the blob as a string and then parse it as JSON
json_data = json.loads(blob.download_as_string(client=None))
return json_data
def get_playlists_containing_track(self, track_name, artist_name): # get data about playlists that have track in it
playlist_data = self.get_cache()
results = []
for playlist in playlist_data:
for index, song in enumerate(playlist['songs'], start=1):
if track_name.lower() in song.lower() and artist_name.lower() in song.lower():
results.append({
'playlist_id': playlist['playlist_id'],
'cover_image': playlist['cover_image'],
'position': index
})
return results
Please ignore indentation issues as this is stack overflow's copy & paste issue. Also i have as you can tell, had chat gpt add in debug lines for me already, but it still halts after the print ('Retrieving tracks for playlist ID')