624 lines
21 KiB
Python
624 lines
21 KiB
Python
''' For interfacing with the Subsonic API '''
|
|
|
|
import logging
|
|
import os
|
|
import aiohttp
|
|
|
|
from pathlib import Path
|
|
|
|
from util import env
|
|
|
|
# Module-level logger named after this module
logger = logging.getLogger(__name__)

# Parameters for the Subsonic API
# These are merged into the query parameters of every request made by this module.
# Credentials ("u"/"p") come from the environment configuration in util.env;
# "f" asks the server for JSON responses, which the rest of this module parses.
# NOTE(review): "p" carries the password as an ordinary request parameter - the
# Subsonic API also describes salted-token authentication; confirm whether the
# target server supports it before relying on this over plain HTTP.
SUBSONIC_REQUEST_PARAMS = {
    "u": env.SUBSONIC_USER,
    "p": env.SUBSONIC_PASSWORD,
    "v": "1.15.0",
    "c": "discodrome",
    "f": "json"
}

# Shared aiohttp session; created on demand by get_session() and reset by close_session()
globalsession = None
|
|
|
|
async def get_session() -> aiohttp.ClientSession:
    ''' Return the shared aiohttp session, creating it on first use '''
    global globalsession
    if globalsession is not None:
        return globalsession

    # Nothing cached yet (first call, or first call after close_session())
    globalsession = aiohttp.ClientSession()
    return globalsession
|
|
|
|
async def close_session() -> None:
    ''' Close the shared aiohttp session, if one is currently open '''
    global globalsession
    if globalsession is None:
        return

    # Clear the module-level reference first so that get_session() builds a
    # fresh session even if closing the old one raises
    session, globalsession = globalsession, None

    # ClientSession.close() is a coroutine: without `await` it is never run
    # and the underlying connections are left open
    await session.close()
|
|
|
|
class APIError(Exception):
    ''' Raised when a Subsonic API response reports an error '''

    def __init__(self, errorcode: int, message: str) -> None:
        # Only the message is handed to Exception, so str(error) is the message alone
        super().__init__(message)
        self.errorcode = errorcode
        self.message = message
|
|
|
|
class Song:
    ''' A single track built from a song entry of a Subsonic API response '''

    def __init__(self, json_object: dict) -> None:
        #! Only the fields Discodrome uses are read; any other keys in the entry are ignored
        # Every field falls back to a placeholder when its key is absent
        self._id: str = json_object.get("id", "")
        self._title: str = json_object.get("title", "Unknown Track")
        self._album: str = json_object.get("album", "Unknown Album")
        self._artist: str = json_object.get("artist", "Unknown Artist")
        self._cover_id: str = json_object.get("coverArt", "")
        self._duration: int = json_object.get("duration", 0)

    @property
    def song_id(self) -> str:
        ''' Id of the song ("" when the entry had none) '''
        return self._id

    @property
    def title(self) -> str:
        ''' Title of the song '''
        return self._title

    @property
    def album(self) -> str:
        ''' Name of the album the song belongs to '''
        return self._album

    @property
    def artist(self) -> str:
        ''' Artist of the song '''
        return self._artist

    @property
    def cover_id(self) -> str:
        ''' Id of the song's cover art ("" when the entry had none) '''
        return self._cover_id

    @property
    def duration(self) -> int:
        ''' Total duration of the song, as given in the entry '''
        return self._duration

    @property
    def duration_printable(self) -> str:
        ''' Duration formatted as `mm:ss`; the minutes part is not capped at 59 '''
        minutes, seconds = divmod(self._duration, 60)
        return f"{minutes:02d}:{seconds:02d}"
|
|
|
|
class Album:
    ''' An album and its songs, built from an album object of a Subsonic API response '''

    def __init__(self, json_object: dict) -> None:
        # Every scalar field falls back to a placeholder when its key is absent
        self._id: str = json_object.get("id", "")
        self._name: str = json_object.get("name", "Unknown Album")
        self._artist: str = json_object.get("artist", "Unknown Artist")
        self._cover_id: str = json_object.get("coverArt", "")
        self._song_count: int = json_object.get("songCount", 0)
        self._duration: int = json_object.get("duration", 0)
        self._year: int = json_object.get("year", 0)

        # An album without a "song" list (missing key or null) is treated as
        # empty instead of raising KeyError
        self._songs: list[Song] = [Song(entry) for entry in json_object.get("song") or []]

    @property
    def album_id(self) -> str:
        ''' Id of the album ("" when the response had none) '''
        return self._id

    @property
    def name(self) -> str:
        ''' Name of the album '''
        return self._name

    @property
    def artist(self) -> str:
        ''' Artist of the album '''
        return self._artist

    @property
    def cover_id(self) -> str:
        ''' Id of the album's cover art ("" when the response had none) '''
        return self._cover_id

    @property
    def song_count(self) -> int:
        ''' Number of songs as reported by the server (not recomputed from `songs`) '''
        return self._song_count

    @property
    def duration(self) -> int:
        ''' Total duration of the album, as given in the response '''
        return self._duration

    @property
    def duration_printable(self) -> str:
        ''' Duration formatted as `mm:ss`; the minutes part is not capped at 59 '''
        minutes, seconds = divmod(self._duration, 60)
        return f"{minutes:02d}:{seconds:02d}"

    @property
    def year(self) -> int:
        ''' Release year of the album (0 when the response had none) '''
        return self._year

    @property
    def songs(self) -> "list[Song]":
        ''' Songs of the album, in the order the server listed them '''
        return self._songs
|
|
|
|
class Playlist:
    ''' A playlist and its songs, built from a playlist object of a Subsonic API response '''

    def __init__(self, json_object: dict) -> None:
        # Every scalar field falls back to a placeholder when its key is absent
        self._id: str = json_object.get("id", "")
        self._name: str = json_object.get("name", "Unknown Playlist")
        self._cover_id: str = json_object.get("coverArt", "")
        self._song_count: int = json_object.get("songCount", 0)
        self._duration: int = json_object.get("duration", 0)

        # A playlist without an "entry" list (missing key or null) is treated
        # as empty instead of raising KeyError
        self._songs: list[Song] = [Song(entry) for entry in json_object.get("entry") or []]

    @property
    def playlist_id(self) -> str:
        ''' Id of the playlist ("" when the response had none) '''
        return self._id

    @property
    def name(self) -> str:
        ''' Name of the playlist '''
        return self._name

    @property
    def cover_id(self) -> str:
        ''' Id of the playlist's cover art ("" when the response had none) '''
        return self._cover_id

    @property
    def song_count(self) -> int:
        ''' Number of songs as reported by the server (not recomputed from `songs`) '''
        return self._song_count

    @property
    def duration(self) -> int:
        ''' Total duration of the playlist, as given in the response '''
        return self._duration

    @property
    def duration_printable(self) -> str:
        ''' Duration formatted as `mm:ss`; the minutes part is not capped at 59 '''
        minutes, seconds = divmod(self._duration, 60)
        return f"{minutes:02d}:{seconds:02d}"

    @property
    def songs(self) -> "list[Song]":
        ''' Songs of the playlist, in the order the server listed them '''
        return self._songs
|
|
|
|
async def ping_api() -> bool:
    ''' Ping the Subsonic API; True unless check_subsonic_error() reports an error '''

    session = await get_session()
    ping_url = f"{env.SUBSONIC_SERVER}/rest/ping.view"

    async with await session.get(ping_url, params=SUBSONIC_REQUEST_PARAMS) as response:
        # HTTP-level failures surface as exceptions rather than a False result
        response.raise_for_status()
        ping_data = await response.json()
        reachable = not await check_subsonic_error(ping_data)
        if reachable:
            logger.debug("Ping Response: %s", ping_data)

    return reachable
|
|
|
|
# Messages for the Subsonic error codes this module knows about
_SUBSONIC_ERROR_MESSAGES: dict[int, str] = {
    0: "Generic Error.",
    10: "Required Parameter Missing.",
    20: "Incompatible Subsonic REST protocol version. Client must upgrade.",
    30: "Incompatible Subsonic REST protocol version. Server must upgrade.",
    40: "Wrong username or password.",
    41: "Token authentication not supported for LDAP users.",
    50: "User is not authorized for the given operation.",
    60: "The trial period for the Subsonic server is over.",
    70: "The requested data was not found.",
}

# The only error code that is reported by returning True instead of raising
_SUBSONIC_NOT_FOUND_CODE = 70


async def check_subsonic_error(response: dict | aiohttp.ClientResponse) -> bool:
    ''' Check a Subsonic API response for an error.

    Parameters:
    - response: The decoded JSON payload, or the aiohttp response to decode it from

    Returns False when the status is "ok" (or when a ClientResponse body cannot be
    decoded as JSON) and True when the server reports code 70 (data not found).

    Raises APIError for every other error code, including unknown ones.
    '''

    logger.debug("Checking for subsonic error...")
    if isinstance(response, aiohttp.ClientResponse):
        try:
            response = await response.json()
        except Exception:
            # Deliberately best-effort: a body that is not JSON (for example the
            # image requested by get_album_art_file()) has no error envelope to inspect
            return False

    if response["subsonic-response"]["status"] == "ok":
        logger.debug("No error found.")
        return False

    err_code = response["subsonic-response"]["error"]["code"]
    err_msg = _SUBSONIC_ERROR_MESSAGES.get(err_code, "Unknown Error Code.")

    if err_code != _SUBSONIC_NOT_FOUND_CODE:
        raise APIError(err_code, err_msg)

    # "Not found" is an expected outcome for lookups, so it is only logged
    logger.warning("Subsonic API request responded with error code %s: %s", err_code, err_msg)
    return True
|
|
|
|
async def search(query: str, *, artist_count: int=0, artist_offset: int=0, album_count: int=0, album_offset: int=0, song_count: int=1, song_offset: int=0) -> list[Song]:
    ''' Send a search3 request to the subsonic API and return the matching songs.

    Parameters:
    - query: The search text
    - artist_count / artist_offset: Paging values forwarded for artist results
    - album_count / album_offset: Paging values forwarded for album results
    - song_count / song_offset: Paging values forwarded for song results

    Only the songs in the result are returned; artist and album matches are not.
    Returns an empty list on a Subsonic "not found" error or when no song matched.
    '''

    # The raw query is passed through as-is: aiohttp percent-encodes query
    # parameters itself, so no manual escaping is needed here
    search_params = {
        "query": query,
        "artistCount": str(artist_count),
        "artistOffset": str(artist_offset),
        "albumCount": str(album_count),
        "albumOffset": str(album_offset),
        "songCount": str(song_count),
        "songOffset": str(song_offset)
    }

    params = SUBSONIC_REQUEST_PARAMS | search_params

    session = await get_session()
    async with await session.get(f"{env.SUBSONIC_SERVER}/rest/search3.view", params=params) as response:
        response.raise_for_status()
        search_data = await response.json()
        if await check_subsonic_error(search_data):
            return []
        logger.debug("Search Response: %s", search_data)

    try:
        song_entries = search_data["subsonic-response"]["searchResult3"]["song"]
    except KeyError:
        # The result carries no "song" list when nothing matched
        return []

    return [Song(item) for item in song_entries]
|
|
|
|
async def search_album(query: str) -> Album | None:
    ''' Search for a single album and return it together with all of its songs.

    Two requests are made: search3 to find the id of the first matching album,
    then getAlbum to fetch that album's contents.

    Returns None when no album matches, when the server reports a "not found"
    error, or when the album data cannot be parsed.
    '''

    # aiohttp percent-encodes query parameters itself, so the query is sent as-is
    search_params = {
        "query": query,
        "artistCount": "0",
        "albumCount": "1",
        "albumOffset": "0",
        "songCount": "0",
        "songOffset": "0"
    }

    params = SUBSONIC_REQUEST_PARAMS | search_params

    session = await get_session()
    async with await session.get(f"{env.SUBSONIC_SERVER}/rest/search3.view", params=params) as response:
        response.raise_for_status()
        search_data = await response.json()
        if await check_subsonic_error(search_data):
            return None
        try:
            albumid = search_data["subsonic-response"]["searchResult3"]["album"][0]["id"]
        except (KeyError, IndexError, TypeError):
            # The result has no usable first album, i.e. nothing matched
            return None
        logger.debug("Album ID: %s", albumid)

    album_params = SUBSONIC_REQUEST_PARAMS | {"id": albumid}

    async with await session.get(f"{env.SUBSONIC_SERVER}/rest/getAlbum.view", params=album_params) as response:
        response.raise_for_status()
        album_data = await response.json()
        if await check_subsonic_error(album_data):
            return None
        logger.debug("Search Response: %s", album_data)

    try:
        return Album(album_data["subsonic-response"]["album"])
    except Exception as e:
        # Best-effort: a malformed album payload is logged and reported as "no album"
        logger.error("Failed to parse album data: %s", e)
        return None
|
|
|
|
async def get_user_playlists() -> list[dict] | None:
    ''' Retrieve metadata of all playlists the Subsonic user is authorised to play.

    Returns the playlist entries exactly as the server sent them (no Playlist
    objects are built), an empty list when the response contains no playlists,
    or None when the server reports a "not found" error.
    '''

    session = await get_session()
    async with await session.get(f"{env.SUBSONIC_SERVER}/rest/getPlaylists.view", params=SUBSONIC_REQUEST_PARAMS) as response:
        response.raise_for_status()
        query_data = await response.json()
        if await check_subsonic_error(query_data):
            return None
        logger.debug("Playlists query response: %s", query_data)

    # Tolerate a response without a "playlists" object or without a
    # "playlist" list inside it instead of raising KeyError
    playlists_node = query_data["subsonic-response"].get("playlists") or {}
    return playlists_node.get("playlist") or []
|
|
|
|
async def get_playlist(id: str) -> Playlist | None:
    ''' Retrieve the contents of a specific playlist.

    Parameters:
    - id: The Subsonic id of the playlist (the name shadows the builtin, but is
      kept because callers may pass it by keyword)

    Returns None when the server reports a "not found" error or when the
    playlist data cannot be parsed.
    '''

    params = SUBSONIC_REQUEST_PARAMS | {"id": id}

    session = await get_session()
    async with await session.get(f"{env.SUBSONIC_SERVER}/rest/getPlaylist.view", params=params) as response:
        response.raise_for_status()
        playlist_data = await response.json()
        if await check_subsonic_error(playlist_data):
            return None
        logger.debug("Playlist query response: %s", playlist_data)

    try:
        return Playlist(playlist_data["subsonic-response"]["playlist"])
    except Exception as e:
        # Best-effort: a malformed playlist payload is logged and reported as "no playlist"
        logger.error("Failed to parse playlist data: %s", e)
        return None
|
|
|
|
async def get_artist_id(query: str) -> str | None:
    ''' Search for an artist and return the id of the first match.

    Returns None when no artist matches the query or when the server reports a
    "not found" error.
    '''

    search_params = {
        "query": query,
        "artistCount": "1",
        "albumCount": "0",
        "albumOffset": "0",
        "songCount": "0",
        "songOffset": "0"
    }

    params = SUBSONIC_REQUEST_PARAMS | search_params

    session = await get_session()
    async with await session.get(f"{env.SUBSONIC_SERVER}/rest/search3.view", params=params) as response:
        response.raise_for_status()
        search_data = await response.json()
        if await check_subsonic_error(search_data):
            return None

    try:
        artistid = search_data["subsonic-response"]["searchResult3"]["artist"][0]["id"]
    except (KeyError, IndexError, TypeError):
        # The result has no usable first artist, i.e. nothing matched
        logger.debug("No artist found for query: %s", query)
        return None

    logger.debug("Artist ID: %s", artistid)
    return artistid
|
|
|
|
async def get_artist_discography(query: str) -> list[Album] | None:
    ''' Search for an artist and return all of their albums, each with its songs.

    The artist id is resolved with get_artist_id(), the album list is read from
    getArtist, and every album is then fetched individually with getAlbum.

    Returns None when the artist cannot be resolved or when any of the requests
    reports a "not found" error; returns an empty list for an artist whose
    response lists no albums.
    '''

    artistid = await get_artist_id(query)
    if artistid is None:
        # Without an id there is nothing meaningful to ask the server for
        return None

    artist_params = SUBSONIC_REQUEST_PARAMS | {"id": artistid}

    session = await get_session()
    async with await session.get(f"{env.SUBSONIC_SERVER}/rest/getArtist.view", params=artist_params) as response:
        response.raise_for_status()
        search_data = await response.json()
        if await check_subsonic_error(search_data):
            return None
        logger.debug("Search Response: %s", search_data)
        # Tolerate an artist object without an "album" list
        albums = search_data["subsonic-response"]["artist"].get("album") or []

    album_list: list[Album] = []

    # One getAlbum request per album, issued sequentially
    for albuminfo in albums:
        album_params = SUBSONIC_REQUEST_PARAMS | {"id": albuminfo["id"]}
        async with await session.get(f"{env.SUBSONIC_SERVER}/rest/getAlbum.view", params=album_params) as response:
            response.raise_for_status()
            album_data = await response.json()
            if await check_subsonic_error(album_data):
                # A single missing album aborts the whole discography
                return None
            logger.debug("Search Response: %s", album_data)
            album_list.append(Album(album_data["subsonic-response"]["album"]))

    return album_list
|
|
|
|
|
|
def get_album_art_url(cover_id: str, size: int=300) -> str:
|
|
''' Get the URL for album art from the subsonic API '''
|
|
if not cover_id:
|
|
return None
|
|
params = {
|
|
**SUBSONIC_REQUEST_PARAMS,
|
|
"id": cover_id,
|
|
"size": str(size)
|
|
}
|
|
query_string = "&".join(f"{k}={v}" for k, v in params.items())
|
|
return f"{env.SUBSONIC_SERVER}/rest/getCoverArt?{query_string}"
|
|
|
|
|
|
async def get_album_art_file(cover_id: str, size: int=300) -> str:
    ''' Download a piece of cover art into the local cache and return its path.

    Parameters:
    - cover_id: The Subsonic cover art id
    - size: Value sent as the `size` parameter

    Returns the path of the cached file, or "resources/cover_not_found.jpg" when
    there is no cover id or the art could not be fetched.
    '''
    if not cover_id:
        # Songs without cover art carry an empty id; asking the server for it
        # would only produce an error and a bogus "cache/.jpg" entry
        return "resources/cover_not_found.jpg"

    target_path = f"cache/{cover_id}.jpg"

    # Check if the cover art is already cached (TODO: Check for last-modified date?)
    # NOTE: the cache key is the cover id only, so a cached file is reused
    # regardless of the requested size
    if os.path.exists(target_path):
        return target_path

    cover_params = {
        "id": cover_id,
        "size": str(size)
    }

    params = SUBSONIC_REQUEST_PARAMS | cover_params

    session = await get_session()
    async with await session.get(f"{env.SUBSONIC_SERVER}/rest/getCoverArt", params=params) as response:
        logger.debug("Response: %s", response.content)
        # The Subsonic check runs first so that an error envelope in the body
        # is recognised before the HTTP status is considered
        if await check_subsonic_error(response) or response.status != 200:
            return "resources/cover_not_found.jpg"

        file = Path(target_path)
        file.parent.mkdir(exist_ok=True, parents=True)
        file.write_bytes(await response.read())

    return target_path
|
|
|
|
async def get_random_songs(size: int | None = None, genre: str | None = None, from_year: int | None = None, to_year: int | None = None, music_folder_id: str | None = None) -> list[Song]:
    ''' Request random songs from the subsonic API.

    Every parameter is optional and is only sent to the server when it is not
    None: `size`, `genre`, `from_year` (as fromYear), `to_year` (as toYear) and
    `music_folder_id` (as musicFolderId).

    Returns an empty list when the server reports a "not found" error or the
    response contains no songs.
    '''
    logger.debug("Requesting random song...")

    # Handle Optional params: only the ones the caller supplied are forwarded
    optional_params = {
        "size": size,
        "genre": genre,
        "fromYear": from_year,
        "toYear": to_year,
        "musicFolderId": music_folder_id,
    }
    search_params = {key: value for key, value in optional_params.items() if value is not None}

    params = SUBSONIC_REQUEST_PARAMS | search_params

    session = await get_session()
    async with await session.get(f"{env.SUBSONIC_SERVER}/rest/getRandomSongs.view", params=params) as response:
        response.raise_for_status()
        search_data = await response.json()
        if await check_subsonic_error(search_data):
            return []
        logger.debug("Search Response: %s", search_data)

    # Tolerate a response without "randomSongs" or without a "song" list
    # inside it instead of raising KeyError
    random_node = search_data["subsonic-response"].get("randomSongs") or {}
    return [Song(item) for item in random_node.get("song") or []]
|
|
|
|
async def get_similar_songs(song_id: str, count: int=1) -> list[Song]:
    ''' Request songs similar to a given song from the subsonic API.

    Parameters:
    - song_id: The Subsonic id of the reference song
    - count: Value sent as the `count` parameter

    Returns an empty list when `song_id` is None, when the server reports a
    "not found" error, or when the response contains no similar songs.
    '''

    logger.debug("Requesting similar song...")
    logger.debug("Song id: %s", song_id)

    if song_id is None:
        return []

    search_params = {
        "id": song_id,
        "count": count
    }

    params = SUBSONIC_REQUEST_PARAMS | search_params

    session = await get_session()
    async with await session.get(f"{env.SUBSONIC_SERVER}/rest/getSimilarSongs.view", params=params) as response:
        response.raise_for_status()
        search_data = await response.json()
        logger.debug("Json Response: %s", search_data)
        subsonic_error = await check_subsonic_error(search_data)
        logger.debug("Subsonic error: %s", subsonic_error)
        if subsonic_error:
            logger.debug("Subsonic error. Returning empty list.")
            return []

    # Tolerate a missing or empty "similarSongs" object as well as one that
    # has no "song" list, instead of raising KeyError
    similar_node = search_data["subsonic-response"].get("similarSongs") or {}
    song_entries = similar_node.get("song") or []
    if not song_entries:
        logger.debug("No similar songs found. Returning empty list.")
        return []

    logger.debug("Similar songs: %s", song_entries)
    results: list[Song] = [Song(item) for item in song_entries]

    logger.debug("Similar songs: %s", results)
    return results
|
|
|
|
async def stream(stream_id: str) -> str | None:
    ''' Send a stream request to the subsonic API and return the stream URL.

    Parameters:
    - stream_id: The Subsonic id of the song to stream

    The request is used to confirm that the server answers with media; the body
    is not downloaded here. Returns the final request URL as a string, or None
    when the server answered with an error document instead of media.
    '''

    stream_params = {
        "id": stream_id
        # TODO: handle other params
    }

    params = SUBSONIC_REQUEST_PARAMS | stream_params

    # A bare number for `timeout` is deprecated in aiohttp; ClientTimeout(total=...)
    # expresses the same overall limit
    timeout = aiohttp.ClientTimeout(total=20)

    session = await get_session()
    async with await session.get(f"{env.SUBSONIC_SERVER}/rest/stream.view", params=params, timeout=timeout) as response:
        response.raise_for_status()
        # A structured document instead of media is treated as an error report.
        # Requests ask for JSON ("f": "json"), so JSON is checked alongside XML.
        if response.content_type in ("text/xml", "application/json"):
            logger.error("Failed to stream song: %s", await response.text())
            return None

        # NOTE(review): the URL includes the query parameters, i.e. the
        # Subsonic credentials - be careful where it is logged or shared
        return str(response.url)
|
|
|
|
async def scrobble(song_id: str, *, submission: bool=True, timestamp: int=None) -> bool:
    ''' Report a scrobble for a song to the Subsonic/Navidrome API.

    Parameters:
    - song_id: The Subsonic song id to scrobble
    - submission: True for a final submission, False for a now-playing ping
    - timestamp: Optional Unix epoch seconds for when the song was played

    Returns True when the scrobble was accepted. Failures inside the request
    are logged and reported as False instead of being raised.
    '''

    submission_flag = "true" if submission else "false"
    scrobble_params = {"id": song_id, "submission": submission_flag}

    if timestamp is not None:
        # Subsonic expects epoch time in milliseconds
        scrobble_params["time"] = str(int(timestamp) * 1000)

    params = SUBSONIC_REQUEST_PARAMS | scrobble_params

    try:
        session = await get_session()
        async with await session.get(f"{env.SUBSONIC_SERVER}/rest/scrobble.view", params=params) as response:
            response.raise_for_status()
            data = await response.json()
            rejected = await check_subsonic_error(data)
            if not rejected:
                logger.debug("Scrobble response: %s", data)
                return True
            logger.warning("Scrobble returned a Subsonic error for song %s", song_id)
            return False
    except aiohttp.ClientError as e:
        logger.error("HTTP error during scrobble for song %s: %s", song_id, e)
        return False
    except Exception as e:
        # Scrobbling is best-effort: any other failure is logged, not raised
        logger.error("Unexpected error during scrobble for song %s: %s", song_id, e)
        return False
|