"""
Google Play Music integration via gmusicapi.
"""
# pylint: disable=broad-except
# pylint: disable=protected-access
from __future__ import print_function
try: # Python 3.x
from urllib.request import urlopen
except ImportError: # Python 2.x
from urllib import urlopen
try:
from PIL import Image
except ImportError:
Image = None
from io import BytesIO
from hashlib import sha1
from threading import Thread, Lock
from uuid import UUID
from gmusicapi.clients import Mobileclient
from clay.eventhook import EventHook
from clay.log import logger
from clay.settings import settings
STATION_FETCH_LEN = 50
[docs]def asynchronous(func):
"""
Decorates a function to become asynchronous.
Once called, runs original function in a new Thread.
Must be called with a 'callback' argument that will be called
once thread with original function finishes. Receives two args:
result and error.
- "result" contains function return value or None if there was an exception.
- "error" contains None or Exception if there was one.
"""
def wrapper(*args, **kwargs):
"""
Inner function.
"""
callback = kwargs.pop('callback')
extra = kwargs.pop('extra', dict())
def process():
"""
Thread body.
"""
try:
result = func(*args, **kwargs)
except Exception as error:
callback(None, error, **extra)
else:
callback(result, None, **extra)
Thread(target=process).start()
return wrapper
[docs]def synchronized(func):
"""
Decorates a function to become thread-safe by preventing
it from being executed multiple times before previous calls end.
Lock is acquired on entrance and is released on return or Exception.
"""
lock = Lock()
def wrapper(*args, **kwargs):
"""
Inner function.
"""
try:
lock.acquire()
return func(*args, **kwargs)
finally:
lock.release()
return wrapper
[docs]class Track(object):
"""
Model that represents single track from Google Play Music.
"""
TYPE_UPLOADED = 'uploaded'
TYPE_STORE = 'store'
SOURCE_LIBRARY = 'library'
SOURCE_STATION = 'station'
SOURCE_PLAYLIST = 'playlist'
SOURCE_SEARCH = 'search'
[docs] def __init__(self, source, data):
# In playlist items and user uploaded songs the storeIds are missing so
self.store_id = (data['storeId'] if 'storeId' in data else data.get('id'))
self.playlist_item_id = (UUID(data['id']) if source == self.SOURCE_PLAYLIST else None)
self.library_id = (UUID(data['id']) if source == self.SOURCE_LIBRARY else None)
# To filter out the playlist items we need to reassign the store_id when fetching the track
if 'track' in data:
data = data['track']
self.store_id = data['storeId']
artist_art_ref = next(iter(sorted(
[
ref
for ref
in data.get('artistArtRef', [])
],
key=lambda x: x['aspectRatio']
)), None)
self.title = data['title']
self.artist = data['artist']
self.duration = int(data['durationMillis'])
self.rating = (int(data['rating']) if 'rating' in data else 0)
self.source = source
self.cached_url = None
self.artist_art_url = None
self.artist_art_filename = None
if artist_art_ref is not None:
self.artist_art_url = artist_art_ref['url']
self.artist_art_filename = sha1(
self.artist_art_url.encode('utf-8')
).hexdigest() + u'.jpg'
self.explicit_rating = (int(data['explicitType']))
if self.rating == 5:
gp.cached_liked_songs.add_liked_song(self)
# User uploaded songs miss a store_id
self.album_name = data['album']
self.album_url = (data['albumArtRef'][0]['url'] if 'albumArtRef' in data else "")
self.original_data = data
@property
def id(self): # pylint: disable=invalid-name
"""
Return ID for this track.
"""
if self.library_id:
return self.library_id
return self.store_id
@property
def filename(self):
"""
Return a filename for this track.
"""
return self.store_id + '.mp3'
[docs] def __eq__(self, other):
return (
(self.library_id and self.library_id == other.library_id) or
(self.store_id and self.store_id == other.store_id) or
(self.playlist_item_id and self.playlist_item_id == other.playlist_item_id)
)
[docs] @classmethod
def from_data(cls, data, source, many=False):
"""
Construct and return one or many :class:`.Track` instances
from Google Play Music API response.
"""
if many:
return [track for track in
[cls.from_data(one, source) for one in data]
if track is not None]
try:
if source == cls.SOURCE_PLAYLIST and 'track' not in data:
track = gp.get_track_by_id(UUID(data['trackId']))
else:
track = Track(source, data)
return track
except Exception as error: # pylint: disable=bare-except
logger.error(
'Failed to parse track data: %s, failing data: %s',
repr(error),
data
)
# TODO: Fix this.
# print('Failed to create track from data.')
# print('Failing payload was:')
# print(data)
# raise Exception(
# 'Failed to create track from data. Original error: {}. Payload: {}'.format(
# str(error),
# data
# )
# )
return None
raise AssertionError()
[docs] def get_url(self, callback):
"""
Gets playable stream URL for this track.
"callback" is called with "(url, error)" args after URL is fetched.
Keep in mind this URL is valid for a limited time.
"""
def on_get_url(url, error):
"""
Called when URL is fetched.
"""
self.cached_url = url
callback(url, error, self)
if gp.is_subscribed:
track_id = self.store_id
else:
track_id = self.library_id
gp.get_stream_url_async(track_id, callback=on_get_url)
[docs] @synchronized
def get_artist_art_filename(self):
"""
Return artist art filename, None if this track doesn't have any.
Downloads if necessary.
"""
if self.artist_art_url is None:
return None
if not settings.get_is_file_cached(self.artist_art_filename):
response = urlopen(self.artist_art_url)
data = response.read()
if Image:
image = Image.open(BytesIO(data))
image.thumbnail((128, 128))
out = BytesIO()
image.save(out, format='JPEG')
data = out.getvalue()
settings.save_file_to_cache(self.artist_art_filename, data)
return settings.get_cached_file_path(self.artist_art_filename)
# get_artist_arg_filename_async = asynchronous(get_artist_art_filename)
[docs] @synchronized
def create_station(self):
"""
Creates a new station from this :class:`.Track`.
Returns :class:`.Station` instance.
"""
station_name = u'Station - {}'.format(self.title)
station_id = gp.mobile_client.create_station(
name=station_name,
track_id=self.store_id
)
station = Station(station_id, station_name)
station.load_tracks()
return station
create_station_async = asynchronous(create_station)
[docs] def add_to_my_library(self):
"""
Add a track to my library.
"""
return gp.add_to_my_library(self)
add_to_my_library_async = asynchronous(add_to_my_library)
[docs] def remove_from_my_library(self):
"""
Remove a track from my library.
"""
return gp.remove_from_my_library(self)
remove_from_my_library_async = asynchronous(remove_from_my_library)
[docs] def rate_song(self, rating):
"""
Rate the song either 0 (no thumb), 1 (down thumb) or 5 (up thumb).
"""
gp.mobile_client.rate_songs(self.original_data, rating)
self.original_data['rating'] = rating
self.rating = rating
if rating == 5:
gp.cached_liked_songs.add_liked_song(self)
[docs] def __str__(self):
return u'<Track "{} - {}" from {}>'.format(
self.artist,
self.title,
self.source
)
__repr__ = __str__
[docs]class Artist(object):
"""
Model that represents an artist.
"""
[docs] def __init__(self, artist_id, name):
self._id = artist_id
self.name = name
@property
def id(self): # pylint: disable=invalid-name
"""
Artist ID.
"""
return self._id
[docs] @classmethod
def from_data(cls, data, many=False):
"""
Construct and return one or many :class:`.Artist` instances
from Google Play Music API response.
"""
if many:
return [cls.from_data(one) for one in data]
return Artist(
artist_id=data['artistId'],
name=data['name']
)
[docs]class Station(object):
"""
Model that represents specific station on Google Play Music.
"""
[docs] def __init__(self, station_id, name):
self.name = name
self._id = station_id
self._tracks = []
self._tracks_loaded = False
@property
def id(self): # pylint: disable=invalid-name
"""
Station ID.
"""
return self._id
[docs] def load_tracks(self):
"""
Fetch tracks related to this station and
populate it with :class:`Track` instances.
"""
data = gp.mobile_client.get_station_tracks(self.id, STATION_FETCH_LEN)
self._tracks = Track.from_data(data, Track.SOURCE_STATION, many=True)
self._tracks_loaded = True
return self
load_tracks_async = asynchronous(load_tracks)
[docs] def get_tracks(self):
"""
Return a list of tracks in this station.
"""
assert self._tracks_loaded, 'Must call ".load_tracks()" before ".get_tracks()"'
return self._tracks
[docs] @classmethod
def from_data(cls, data, many=False):
"""
Construct and return one or many :class:`.Station` instances
from Google Play Music API response.
"""
if many:
return [cls.from_data(one) for one in data if one['inLibrary']]
return Station(
station_id=data['id'],
name=data['name']
)
[docs]class SearchResults(object):
"""
Model that represents search results including artists & tracks.
"""
[docs] def __init__(self, tracks, artists):
self.artists = artists
self.tracks = tracks
[docs] @classmethod
def from_data(cls, data):
"""
Construct and return :class:`.SearchResults` instance from raw data.
"""
return SearchResults(
tracks=Track.from_data(data['song_hits'], Track.SOURCE_SEARCH, many=True),
artists=Artist.from_data([
item['artist']
for item
in data['artist_hits']
], many=True)
)
[docs] def get_artists(self):
"""
Return found artists.
"""
return self.artists
[docs] def get_tracks(self):
"""
Return found tracks.
"""
return self.tracks
[docs]class Playlist(object):
"""
Model that represents remotely stored (Google Play Music) playlist.
"""
[docs] def __init__(self, playlist_id, name, tracks):
self._id = playlist_id
self.name = name
self.tracks = tracks
@property
def id(self): # pylint: disable=invalid-name
"""
Playlist ID.
"""
return self._id
[docs] @classmethod
def from_data(cls, data, many=False):
"""
Construct and return one or many :class:`.Playlist` instances
from Google Play Music API response.
"""
if many:
return [cls.from_data(one) for one in data]
return Playlist(
playlist_id=data['id'],
name=data['name'],
tracks=Track.from_data(data['tracks'], Track.SOURCE_PLAYLIST, many=True)
)
[docs]class LikedSongs(object):
"""
A local model that represents the songs that a user liked and displays them as a faux playlist.
This mirrors the "liked songs" generated playlist feature of the Google Play Music apps.
"""
[docs] def __init__(self):
self._id = None # pylint: disable=invalid-name
self.name = "Liked Songs"
self._tracks = []
self._sorted = False
@property
def tracks(self):
"""
Get a sorted list of liked tracks.
"""
if self._sorted:
tracks = self._tracks
else:
self._tracks.sort(key=lambda k: k.original_data.get('lastRatingChangeTimestamp', '0'),
reverse=True)
self._sorted = True
tracks = self._tracks
return tracks
[docs] def add_liked_song(self, song):
"""
Add a liked song to the list.
"""
self._tracks.insert(0, song)
[docs] def remove_liked_song(self, song):
"""
Remove a liked song from the list
"""
self._tracks.remove(song)
[docs]class _GP(object):
"""
Interface to :class:`gmusicapi.Mobileclient`. Implements
asynchronous API calls, caching and some other perks.
Singleton.
"""
# TODO: Switch to urwid signals for more explicitness?
caches_invalidated = EventHook()
[docs] def __init__(self):
# self.is_debug = os.getenv('CLAY_DEBUG')
self.mobile_client = Mobileclient()
self.mobile_client._make_call = self._make_call_proxy(
self.mobile_client._make_call
)
# if self.is_debug:
# self.debug_file = open('/tmp/clay-api-log.json', 'w')
# self._last_call_index = 0
self.cached_tracks = None
self.cached_liked_songs = LikedSongs()
self.cached_playlists = None
self.cached_stations = None
self.invalidate_caches()
self.auth_state_changed = EventHook()
[docs] def _make_call_proxy(self, func):
"""
Return a function that wraps *fn* and logs args & return values.
"""
def _make_call(protocol, *args, **kwargs):
"""
Wrapper function.
"""
logger.debug('GP::{}(*{}, **{})'.format(
protocol.__name__,
args,
kwargs
))
result = func(protocol, *args, **kwargs)
# self._last_call_index += 1
# call_index = self._last_call_index
# self.debug_file.write(json.dumps([
# call_index,
# protocol.__name__, args, kwargs,
# result
# ]) + '\n')
# self.debug_file.flush()
return result
return _make_call
[docs] def invalidate_caches(self):
"""
Clear cached tracks & playlists & stations.
"""
self.cached_tracks = None
self.cached_playlists = None
self.cached_stations = None
self.caches_invalidated.fire()
[docs] @synchronized
def login(self, email, password, device_id, **_):
"""
Log in into Google Play Music.
"""
self.mobile_client.logout()
self.invalidate_caches()
# prev_auth_state = self.is_authenticated
result = self.mobile_client.login(email, password, device_id)
# if prev_auth_state != self.is_authenticated:
self.auth_state_changed.fire(self.is_authenticated)
return result
login_async = asynchronous(login)
[docs] @synchronized
def use_authtoken(self, authtoken, device_id):
"""
Try to use cached token to log into Google Play Music.
"""
# pylint: disable=protected-access
self.mobile_client.session._authtoken = authtoken
self.mobile_client.session.is_authenticated = True
self.mobile_client.android_id = device_id
del self.mobile_client.is_subscribed
if self.mobile_client.is_subscribed:
self.auth_state_changed.fire(True)
return True
del self.mobile_client.is_subscribed
self.mobile_client.android_id = None
self.mobile_client.session.is_authenticated = False
self.auth_state_changed.fire(False)
return False
use_authtoken_async = asynchronous(use_authtoken)
[docs] def get_authtoken(self):
"""
Return currently active auth token.
"""
# pylint: disable=protected-access
return self.mobile_client.session._authtoken
[docs] @synchronized
def get_all_tracks(self):
"""
Cache and return all tracks from "My library".
Each track will have "id" and "storeId" keys.
"""
if self.cached_tracks:
return self.cached_tracks
data = self.mobile_client.get_all_songs()
self.cached_tracks = Track.from_data(data, Track.SOURCE_LIBRARY, True)
return self.cached_tracks
get_all_tracks_async = asynchronous(get_all_tracks)
[docs] def get_stream_url(self, stream_id):
"""
Returns playable stream URL of track by id.
"""
return self.mobile_client.get_stream_url(stream_id)
get_stream_url_async = asynchronous(get_stream_url)
[docs] @synchronized
def get_all_user_station_contents(self, **_):
"""
Return list of :class:`.Station` instances.
"""
if self.cached_stations:
return self.cached_stations
self.get_all_tracks()
self.cached_stations = Station.from_data(
self.mobile_client.get_all_stations(),
True
)
return self.cached_stations
get_all_user_station_contents_async = ( # pylint: disable=invalid-name
asynchronous(get_all_user_station_contents)
)
[docs] @synchronized
def get_all_user_playlist_contents(self, **_):
"""
Return list of :class:`.Playlist` instances.
"""
if self.cached_playlists:
return [self.cached_liked_songs] + self.cached_playlists
self.get_all_tracks()
self.cached_playlists = Playlist.from_data(
self.mobile_client.get_all_user_playlist_contents(),
True
)
return [self.cached_liked_songs] + self.cached_playlists
get_all_user_playlist_contents_async = ( # pylint: disable=invalid-name
asynchronous(get_all_user_playlist_contents)
)
[docs] def get_cached_tracks_map(self):
"""
Return a dictionary of tracks where keys are strings with track IDs
and values are :class:`.Track` instances.
"""
return {track.id: track for track in self.cached_tracks}
[docs] def get_track_by_id(self, any_id):
"""
Return track by id or store_id.
"""
for track in self.cached_tracks:
if any_id in (track.library_id, track.store_id, track.playlist_item_id):
return track
return None
[docs] def search(self, query):
"""
Find tracks and return an instance of :class:`.SearchResults`.
"""
results = self.mobile_client.search(query)
return SearchResults.from_data(results)
search_async = asynchronous(search)
[docs] def add_to_my_library(self, track):
"""
Add a track to my library.
"""
result = self.mobile_client.add_store_tracks(track.id)
if result:
self.invalidate_caches()
return result
[docs] def remove_from_my_library(self, track):
"""
Remove a track from my library.
"""
result = self.mobile_client.delete_songs(track.id)
if result:
self.invalidate_caches()
return result
@property
def is_authenticated(self):
"""
Return True if user is authenticated on Google Play Music, false otherwise.
"""
return self.mobile_client.is_authenticated()
@property
def is_subscribed(self):
"""
Return True if user is subscribed on Google Play Music, false otherwise.
"""
return self.mobile_client.is_subscribed
gp = _GP() # pylint: disable=invalid-name