#!/usr/bin/env python3
"""Album-based client library for the Music Player Daemon (MPD).

Contains the threaded protocol client, the album/track data model and the
small configuration and cache helpers.
"""

import concurrent.futures
import configparser
import logging
import os
import queue
import re
import socket
import threading

from mcg.utils import SortOrder
from mcg.utils import Utils


class MPDException(Exception):
    """Base class of all errors raised by the client.

    If the message has the MPD error format
    ``[<error>@<command index>] {<command>} <text>`` the individual parts are
    available through the getters; otherwise the getters return ``None``.
    """

    # The command name may be empty ("{}"), e.g. for unknown commands.
    _ERROR_PATTERN = re.compile(r"\[(\d+)@(\d+)\]\s\{(\w*)\}\s(.*)")

    def __init__(self, error):
        # Set defaults first so that the getters never raise AttributeError
        # for messages that are not in the MPD error format.
        self._error_number = None
        self._command_number = None
        self._command_name = None
        super(MPDException, self).__init__(self._parse_error(error))
        self._error = error

    def _parse_error(self, error):
        """Split an MPD error line into its parts and return the message text."""
        if error:
            parts = self._ERROR_PATTERN.match(error)
            if parts:
                self._error_number = int(parts.group(1))
                self._command_number = int(parts.group(2))
                self._command_name = parts.group(3)
                return parts.group(4)
        return error

    def get_error(self):
        """Return the unparsed error string."""
        return self._error

    def get_error_number(self):
        """Return the numeric MPD error code or None."""
        return self._error_number

    def get_command_number(self):
        """Return the index of the failed command or None."""
        return self._command_number

    def get_command_name(self):
        """Return the name of the failed command or None."""
        return self._command_name


class ConnectionException(MPDException):
    """The connection could not be established or has been lost."""
    pass


class ProtocolException(MPDException):
    """The server sent something that violates the expected protocol."""
    pass


class CommandException(MPDException):
    """The server answered a command with an error."""
    pass


class Future(concurrent.futures.Future):
    """Future that remembers the signal to emit once it is done."""

    def __init__(self, signal):
        concurrent.futures.Future.__init__(self)
        self._signal = signal

    def get_signal(self):
        """Return the signal associated with this future."""
        return self._signal


class Base():
    """Minimal signal/callback registry (one callback per signal)."""

    def __init__(self):
        self._callbacks = {}

    def connect_signal(self, signal, callback):
        """Connect a callback function to a signal (event)."""
        self._callbacks[signal] = callback

    def disconnect_signal(self, signal):
        """Disconnect a callback function from a signal (event)."""
        if self._has_callback(signal):
            del self._callbacks[signal]

    def _has_callback(self, signal):
        """Check if there is a registered callback function for a signal."""
        return signal in self._callbacks

    def _callback(self, signal, *data):
        """Invoke the callback registered for the signal, if any."""
        if signal in self._callbacks:
            callback = self._callbacks[signal]
            callback(*data)

    def _callback_future(self, future):
        """Emit the signal of a finished future with its result as arguments."""
        # Failed actions are reported separately by the worker; calling
        # result() on them would only re-raise inside the done-callback.
        if future.cancelled() or future.exception() is not None:
            return
        result = future.result()
        if result is None:
            return
        self._callback(future.get_signal(), *result)


class Client(Base):
    """Client library for handling the connection to the Music Player Daemon.

    This class implements an album-based MPD client. It offers a non-blocking
    threaded worker model for use in graphical environments.
    """
    # Protocol: greeting mark
    PROTOCOL_GREETING = 'OK MPD '
    # Protocol: completion mark
    PROTOCOL_COMPLETION = 'OK'
    # Protocol: error mark
    PROTOCOL_ERROR = 'ACK '
    # Protocol: error: permission
    PROTOCOL_ERROR_PERMISSION = 4
    # Protocol: error: no exists
    PROTOCOL_ERROR_NOEXISTS = 50
    # Signal: connection status
    SIGNAL_CONNECTION = 'connection'
    # Signal: status
    SIGNAL_STATUS = 'status'
    # Signal: stats
    SIGNAL_STATS = 'stats'
    # Signal: init loading of albums
    SIGNAL_INIT_ALBUMS = 'init-albums'
    # Signal: pulse loading of albums
    SIGNAL_PULSE_ALBUMS = 'pulse-albums'
    # Signal: load albums
    SIGNAL_LOAD_ALBUMS = 'load-albums'
    # Signal: load playlist
    SIGNAL_LOAD_PLAYLIST = 'load-playlist'
    # Signal: load audio output devices
    SIGNAL_LOAD_OUTPUT_DEVICES = 'load-output-devices'
    # Signal: load albumart
    SIGNAL_LOAD_ALBUMART = 'albumart'
    # Signal: custom (dummy) event to trigger callback
    SIGNAL_CUSTOM = 'custom'
    # Signal: error
    SIGNAL_ERROR = 'error'
    # Buffer size for reading from socket
    SOCKET_BUFSIZE = 4096

    def __init__(self):
        """Set class variables and instantiates the Client."""
        Base.__init__(self)
        self._logger = logging.getLogger(__name__)
        self._sock = None
        self._buffer = bytearray()
        self._sock_write = None
        # Event that tells the worker thread to finish (not a playback stop).
        self._stop = threading.Event()
        self._actions = queue.Queue()
        self._worker = None
        self._idling = False
        self._host = None
        self._albums = {}
        self._playlist = []
        self._state = None

    def get_logger(self):
        """Return the logger used by the client."""
        return self._logger

    # Client commands

    def connect(self, host, port, password=None):
        """Connect to MPD with the given host, port and password or with
        standard values.
        """
        self._logger.info("connect")
        self._host = host
        self._add_action(self._connect, host, port, password)
        self._stop.clear()
        self._start_worker()

    def is_connected(self):
        """Return the connection status."""
        return self._worker is not None and self._worker.is_alive()

    def disconnect(self):
        """Disconnect from the connected MPD."""
        self._logger.info("disconnect")
        self._stop.set()
        self._add_action(self._disconnect)

    def join(self):
        """Block until all queued actions have been processed."""
        self._actions.join()

    def get_status(self):
        """Determine the current status."""
        self._logger.info("get status")
        self._add_action_signal(Client.SIGNAL_STATUS, self._get_status)

    def get_stats(self):
        """Load statistics."""
        self._logger.info("get stats")
        self._add_action_signal(Client.SIGNAL_STATS, self._get_stats)

    def get_output_devices(self):
        """Determine the list of audio output devices."""
        self._logger.info("get output devices")
        self._add_action_signal(Client.SIGNAL_LOAD_OUTPUT_DEVICES, self._get_output_devices)

    def enable_output_device(self, device, enabled):
        """Enable/disable an audio output device."""
        self._logger.info("enable output device")
        self._add_action(self._enable_output_device, device, enabled)

    def load_albums(self):
        """Load all albums of the database."""
        self._logger.info("load albums")
        self._add_action_signal(Client.SIGNAL_LOAD_ALBUMS, self._load_albums)

    def update(self):
        """Trigger a database update."""
        self._logger.info("update")
        self._add_action(self._update)

    def load_playlist(self):
        """Load the current playlist grouped by album."""
        self._logger.info("load playlist")
        self._add_action_signal(Client.SIGNAL_LOAD_PLAYLIST, self._load_playlist)

    def clear_playlist(self):
        """Clear the current playlist"""
        self._logger.info("clear playlist")
        self._add_action(self._clear_playlist)

    def remove_album_from_playlist(self, album):
        """Remove the given album from the playlist."""
        self._logger.info("remove album from playlist")
        self._add_action(self._remove_album_from_playlist, album)

    def remove_albums_from_playlist(self, albums):
        """Remove multiple albums from the playlist in one step."""
        self._logger.info("remove multiple albums from playlist")
        self._add_action(self._remove_albums_from_playlist, albums)

    def play_album_from_playlist(self, album):
        """Play the given album from the playlist."""
        self._logger.info("play album from playlist")
        self._add_action(self._play_album_from_playlist, album)

    def playpause(self):
        """Play or pauses the current state."""
        self._logger.info("playpause")
        self._add_action(self._playpause)

    def play_album(self, album):
        """Add the given album to the queue and play it immediately."""
        self._logger.info("play album")
        self._add_action(self._play_album, album)

    def queue_album(self, album):
        """Add the given album to the queue."""
        self._logger.info("queue album")
        self._add_action(self._queue_album, album)

    def queue_albums(self, albums):
        """Add the given albums to the queue."""
        self._logger.info("queue albums")
        self._add_action(self._queue_albums, albums)

    def seek(self, pos, time):
        """Seeks to a song at a position"""
        self._logger.info("seek")
        self._add_action(self._seek, pos, time)

    def stop(self):
        """Stop the playback."""
        self._logger.info("stop")
        # NOTE: self._stop is the worker's stop event, so the playback action
        # needs a distinct name.
        self._add_action(self._stop_playback)

    def set_volume(self, volume):
        """Set the playback volume."""
        self._logger.info("set volume")
        self._add_action(self._set_volume, volume)

    def get_albumart(self, album):
        """Load the album art of an album and emit SIGNAL_LOAD_ALBUMART."""
        self._logger.info("get albumart")
        self._add_action_signal(Client.SIGNAL_LOAD_ALBUMART, self._get_albumart, album)

    def get_albumart_now(self, album):
        """Load the album art of an album and block until it is available.

        Returns the raw image data or None if there is no album art.
        """
        self._logger.info("get albumart now")
        future = concurrent.futures.Future()
        self._add_action_future(future, self._get_albumart, album)
        (_, albumart) = future.result()
        return albumart

    def get_custom(self, name):
        """Queue a dummy action that only emits SIGNAL_CUSTOM with the name."""
        self._logger.info("get custom \"%s\"", name)
        self._add_action_signal(Client.SIGNAL_CUSTOM, self._get_custom, name)

    # Private methods

    def _connect(self, host, port, password):
        """Action: Open the socket, read the greeting and send the password."""
        self._logger.info("connecting to host %r, port %r", host, port)
        if self._sock is not None:
            return
        try:
            self._sock = self._connect_socket(host, port)
            self._sock_write = self._sock.makefile("w", encoding="utf-8")
            self._greet()
            self._logger.info("connected")
            if password:
                self._logger.info("setting password")
                self._call("password", password)
            self._set_connection_status(True)
        except OSError as e:
            raise ConnectionException("connection failed: {}".format(e)) from e

    def _connect_socket(self, host, port):
        """Return a connected TCP socket, trying every resolved address.

        Raises ConnectionException if no address could be connected.
        """
        error = None
        for res in socket.getaddrinfo(host, port, socket.AF_UNSPEC, socket.SOCK_STREAM, socket.IPPROTO_TCP):
            af, socktype, proto, _canonname, sa = res
            sock = None
            try:
                sock = socket.socket(af, socktype, proto)
                sock.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)
                sock.connect(sa)
                return sock
            except OSError as e:
                # Remember the failure and fall through to the next address.
                error = e
                if sock is not None:
                    sock.close()
        if error is not None:
            raise ConnectionException("connection failed: {}".format(error))
        raise ConnectionException("no suitable socket")

    def _greet(self):
        """Read and validate the greeting and remember the protocol version."""
        greeting = self._read_line()
        self._logger.debug("greeting: %s", greeting.strip())
        if not greeting.startswith(Client.PROTOCOL_GREETING):
            self._disconnect_socket()
            raise ProtocolException("invalid greeting: {}".format(greeting))
        self._protocol_version = greeting[len(Client.PROTOCOL_GREETING):].strip()
        self._logger.debug("protocol version: %s", self._protocol_version)

    def _disconnect(self):
        """Action: Close the connection."""
        self._logger.info("disconnecting")
        self._disconnect_socket()

    def _disconnect_socket(self):
        """Close the socket and its write file and emit the connection status."""
        if self._sock_write is not None:
            try:
                self._sock_write.close()
            except OSError as e:
                # Flushing on close fails if the peer is gone; the socket
                # itself must still be closed below.
                self._logger.debug("closing write file failed: %s", e)
            self._sock_write = None
        if self._sock is not None:
            self._sock.close()
            self._sock = None
        self._logger.info("disconnected")
        self._set_connection_status(False)

    def _idle(self):
        """React to idle events from MPD."""
        self._logger.info("idle")
        self._idling = True
        try:
            subsystems = self._parse_dict(self._call("idle"))
        finally:
            # Must be reset even on errors, otherwise later actions would
            # send "noidle" although no idle command is pending.
            self._idling = False
        self._logger.info("idle subsystems: %r", subsystems)
        changed = subsystems.get('changed') if subsystems else None
        reloads = {
            'player': (self.load_playlist, self.get_status),
            'mixer': (self.get_status,),
            'playlist': (self.load_playlist,),
            'database': (self.load_albums, self.load_playlist, self.get_status),
            'update': (self.load_albums, self.load_playlist, self.get_status),
            'output': (self.get_output_devices, self.get_status),
        }
        for reload in reloads.get(changed, ()):
            reload()

    def _noidle(self):
        """Interrupt a pending idle command so that queued actions can run."""
        if self._idling:
            self._logger.debug("noidle")
            self._write("noidle")

    def _get_status(self):
        """Action: Perform the real status determination."""
        self._logger.info("getting status")
        status = self._parse_dict(self._call("status"))
        self._logger.debug("status: %r", status)

        # State (remembered for play/pause toggling)
        state = status.get('state')
        self._state = state
        # Elapsed time: first part of "elapsed:total"
        time = 0
        if 'time' in status:
            time = int(status['time'].split(':')[0])
        # Volume
        volume = 0
        if 'volume' in status:
            volume = int(status['volume'])
        # Error
        error = status.get('error')

        # Album
        file = None
        album = None
        pos = 0
        song = self._parse_dict(self._call("currentsong"))
        if song:
            # File
            file = song.get('file')
            # Track
            track = self._extract_playlist_track(song)
            if track:
                # Album
                album = self._extract_album(song)
                # Position: convert the playlist position into a position
                # within the matching playlist album
                pos = track.get_pos()
                for palbum in self._playlist:
                    if palbum == album and len(palbum.get_tracks()) >= pos:
                        album = palbum
                        break
                    pos = pos - len(palbum.get_tracks())
        # Audio
        audio = status.get('audio')
        # Bitrate
        bitrate = status.get('bitrate')
        return (state, album, pos, time, volume, file, audio, bitrate, error)

    def _get_stats(self):
        """Action: Perform the real statistics gathering."""
        self._logger.info("getting statistics")
        stats = self._parse_dict(self._call("stats"))
        self._logger.debug("stats: %r", stats)

        # Artists
        artists = 0
        if 'artists' in stats:
            artists = int(stats['artists'])
        # Albums
        albums = 0
        if 'albums' in stats:
            albums = int(stats['albums'])
        # Songs
        songs = 0
        if 'songs' in stats:
            songs = int(stats['songs'])
        # Database playtime, playtime and uptime are passed on as received
        dbplaytime = stats.get('db_playtime', 0)
        playtime = stats.get('playtime', 0)
        uptime = stats.get('uptime', 0)
        return (artists, albums, songs, dbplaytime, playtime, uptime)

    def _get_output_devices(self):
        """Action: Perform the real loading of output devices."""
        devices = []
        for output in self._parse_list(self._call('outputs'), ['outputid']):
            device = OutputDevice(output['outputid'], output['outputname'])
            device.set_enabled(int(output['outputenabled']) == 1)
            devices.append(device)
        return (devices, )

    def _enable_output_device(self, device, enabled):
        """Action: Perform the real enabling/disabling of an output device."""
        if enabled:
            self._call('enableoutput ', device.get_id())
        else:
            self._call('disableoutput ', device.get_id())

    def _load_albums(self):
        """Action: Perform the real update."""
        self._callback(Client.SIGNAL_INIT_ALBUMS)
        self._albums = {}
        # Albums
        for album in self._parse_list(self._call('list album'), ['album']):
            self._callback(Client.SIGNAL_PULSE_ALBUMS)
            # Album
            album = self._extract_album(album)
            self._logger.debug("album: %r", album)
            # Tracks
            for song in self._parse_list(self._call('find album ', album.get_title()), ['file']):
                track = self._extract_track(song)
                if track:
                    self._logger.debug("track: %r", track)
                    album.add_track(track)
        return (self._albums, )

    def _update(self):
        """Action: Trigger the database update."""
        self._call('update')

    def _load_playlist(self):
        """Action: Load the playlist and group consecutive songs by album."""
        self._playlist = []
        for song in self._parse_list(self._call('playlistinfo'), ['file', 'playlist']):
            self._logger.debug("song: %r", song)
            # Track
            track = self._extract_playlist_track(song)
            self._logger.debug("track: %r", track)
            # Album: start a new entry unless the song continues the last one
            album = self._extract_album(song, lookup=False)
            if not self._playlist or self._playlist[-1] != album:
                self._playlist.append(album)
            else:
                album = self._playlist[-1]
            self._logger.debug("album: %r", album)
            if track:
                album.add_track(track)
        return (self._playlist, )

    def _clear_playlist(self):
        """Action: Perform the real clearing of the current playlist."""
        self._call('clear')

    def _remove_album_from_playlist(self, album):
        """Action: Delete all tracks of an album within one command list."""
        self._call_list('command_list_begin')
        for track in album.get_tracks():
            self._call_list('deleteid', track.get_id())
        self._call('command_list_end')

    def _remove_albums_from_playlist(self, albums):
        """Action: Delete the tracks of several albums within one command list."""
        self._call_list('command_list_begin')
        for album in albums:
            for track in album.get_tracks():
                self._call_list('deleteid', track.get_id())
        self._call('command_list_end')

    def _play_album_from_playlist(self, album):
        """Action: Play the first track of an album of the playlist."""
        if album.get_tracks():
            self._call('playid', album.get_tracks()[0].get_id())

    def _playpause(self):
        """Action: Perform the real play/pause command."""
        # Uses the state cached by the last status request.
        if self._state == 'play':
            self._call('pause')
        else:
            self._call('play')

    def _play_album(self, album):
        """Action: Queue an album and start playing its first track."""
        track_ids = self._queue_album(album)
        if track_ids:
            # Track ids are strings taken from the response.
            self._logger.info("play track %s", track_ids[0])
            self._call('playid', track_ids[0])

    def _queue_album(self, album):
        """Action: Queue all tracks of a known album; return the new track ids."""
        track_ids = []
        if album in self._albums:
            self._logger.info("add album %s", album)
            for track in self._albums[album].get_tracks():
                self._logger.info("addid: %r", track.get_file())
                track_id_response = self._parse_dict(self._call('addid', track.get_file()))
                track_id = track_id_response.get('id')
                self._logger.debug("track id: %r", track_id)
                if track_id is not None:
                    track_ids.append(track_id)
        return track_ids

    def _queue_albums(self, albums):
        """Action: Queue several albums; return the ids of all queued tracks."""
        track_ids = []
        for album in albums:
            track_ids.extend(self._queue_album(album))
        return track_ids

    def _seek(self, pos, time):
        """Action: Seek within the song at the given playlist position."""
        self._call('seek', pos, time)

    def _stop_playback(self):
        """Action: Stop the playback."""
        self._call('stop')

    def _set_volume(self, volume):
        """Action: Set the volume."""
        self._call('setvol', volume)

    def _get_albumart(self, album):
        """Action: Download the album art of the first track of an album.

        Returns a tuple ``(album, data)`` where data is a bytearray or None
        if no album art is available.
        """
        data = None
        if album in self._albums:
            album = self._albums[album]
        if not album.get_tracks():
            # Keep the result shape stable for callbacks and blocking callers.
            return (album, None)
        self._logger.debug("get albumart for album \"%s\"", album.get_title())

        size = 1
        offset = 0
        # Read data until size is reached
        try:
            while offset < size:
                self._write('albumart', args=[album.get_tracks()[0].get_file(), offset])
                # Read first line which tells us whether there is an albumart
                line = self._read_line()
                if line.startswith(Client.PROTOCOL_ERROR):
                    error = line[len(Client.PROTOCOL_ERROR):].strip()
                    self._logger.debug("command failed: %r", error)
                    raise CommandException(error)
                # First line is the file size
                size = int(self._parse_dict([line])['size'])
                self._logger.debug("size: %d", size)
                # Second line is the count of bytes read
                binary = int(self._parse_dict([self._read_line()])['binary'])
                self._logger.debug("binary: %d", binary)
                # Create new data array on the first iteration
                if not data:
                    data = bytearray(size)
                # Create a view for the current chunk of data
                data_view = memoryview(data)[offset:offset+binary]
                # Read actual bytes
                self._read_bytes(data_view, binary)
                offset += binary
                # Read line break to complete previous response
                self._read_line()
                # Read command completion
                end = self._read_line()
                if not end.startswith(Client.PROTOCOL_COMPLETION):
                    self._logger.debug("albumart not completed")
                    data = None
                    break
        except CommandException as e:
            # If no albumart can be found, do not throw an exception
            if e.get_error_number() == Client.PROTOCOL_ERROR_NOEXISTS:
                data = None
            else:
                raise e
        return (album, data)

    def _get_custom(self, name):
        """Action: Return the name unchanged as callback data."""
        return (name, )

    def _start_worker(self):
        """Start the worker thread which waits for action to be performed."""
        self._logger.debug("start worker")
        self._worker = threading.Thread(target=self._run, name='mcg-worker', args=(), daemon=True)
        self._worker.start()
        self._logger.debug("worker started")

    def _run(self):
        """Worker loop: process queued actions and idle in between."""
        while not self._stop.is_set() or not self._actions.empty():
            # Nothing to do while connected: wait for changes on the server
            if self._sock is not None and self._actions.empty():
                self._add_action(self._idle)
            action = self._actions.get()
            self._logger.debug("next action: %r", action)
            self._work(action)
            self._actions.task_done()
            self._logger.debug("action done")
        self._logger.debug("worker finished")

    def _add_action(self, method, *args):
        """Add an action to the action list."""
        self._logger.debug("add action %r (%r)", method.__name__, args)
        future = concurrent.futures.Future()
        action = (future, method, args)
        self._actions.put(action)
        self._noidle()
        return future

    def _add_action_signal(self, signal, method, *args):
        """Add an action to the action list that triggers a callback."""
        self._logger.debug("add action signal %r: %r (%r)", signal, method.__name__, args)
        future = Future(signal)
        future.add_done_callback(self._callback_future)
        self._add_action_future(future, method, *args)
        return future

    def _add_action_future(self, future, method, *args):
        """Add an action to the action list based on a future."""
        self._logger.debug("add action future %r (%r)", method.__name__, args)
        action = (future, method, args)
        self._actions.put(action)
        self._noidle()

    def _work(self, action):
        """Execute one action and store its result or error in its future."""
        (future, method, args) = action
        self._logger.debug("work: %r", method.__name__)
        try:
            result = method(*args)
            future.set_result(result)
        except ConnectionException as e:
            self._logger.exception(e)
            future.set_exception(e)
            self._callback(Client.SIGNAL_ERROR, e)
            self._disconnect_socket()
        except Exception as e:
            self._logger.exception(e)
            future.set_exception(e)
            self._callback(Client.SIGNAL_ERROR, e)

    def _call(self, command, *args):
        """Send a command and return its response lines.

        Command errors are reported through SIGNAL_ERROR and yield None;
        connection errors are re-raised so that the worker can disconnect.
        """
        try:
            self._write(command, args)
            return self._read()
        except ConnectionException:
            raise
        except MPDException as e:
            if command == 'idle' and e.get_error_number() == Client.PROTOCOL_ERROR_PERMISSION:
                self.disconnect()
            self._callback(Client.SIGNAL_ERROR, e)

    def _call_list(self, command, *args):
        """Send a command that is part of a command list (no response is read)."""
        try:
            self._write(command, args)
        except ConnectionException:
            raise
        except MPDException as e:
            if command == 'idle' and e.get_error_number() == Client.PROTOCOL_ERROR_PERMISSION:
                self.disconnect()
            self._callback(Client.SIGNAL_ERROR, e)

    def _write(self, command, args=None):
        """Write a command line with quoted and escaped arguments.

        Raises ConnectionException if there is no connection or writing fails.
        """
        if args is not None and len(args) > 0:
            # Backslashes have to be escaped before the quotes, otherwise the
            # escaping of the quotes would be escaped again.
            quoted = (str(x).replace('\\', '\\\\').replace('"', '\\"') for x in args)
            line = '{} "{}"\n'.format(command, '" "'.join(quoted))
        else:
            line = '{}\n'.format(command)
        self._logger.debug("write: %r", line)
        if self._sock_write is None:
            raise ConnectionException("not connected")
        try:
            self._sock_write.write(line)
            self._sock_write.flush()
        except OSError as e:
            raise ConnectionException("connection lost: {}".format(e)) from e

    def _read(self):
        """Read response lines up to the completion mark.

        Raises CommandException if the server answers with an error.
        """
        self._logger.debug("reading response")
        response = []
        line = self._read_line()
        while not line.startswith(Client.PROTOCOL_COMPLETION) and not line.startswith(Client.PROTOCOL_ERROR):
            stripped = line.strip()
            if stripped:
                response.append(stripped)
            line = self._read_line()
        if line.startswith(Client.PROTOCOL_COMPLETION):
            self._logger.debug("response complete")
        if line.startswith(Client.PROTOCOL_ERROR):
            error = line[len(Client.PROTOCOL_ERROR):].strip()
            self._logger.debug("command failed: %r", error)
            raise CommandException(error)
        self._logger.debug("response: %r", response)
        return response

    def _read_line(self):
        """Return the next line from the buffer or the socket as string.

        An empty line yields an empty string. Raises ConnectionException if
        the server closes the connection before a line is complete.
        """
        self._logger.debug("reading line")

        # Read from the buffer
        data = self._buffer_get_char(b'\x0A')
        if not data.endswith(b'\x0A'):
            # Read more from socket until next line break
            while b'\x0A' not in data:
                buf = self._sock.recv(Client.SOCKET_BUFSIZE)
                if not buf:
                    raise ConnectionException("connection closed")
                data += buf
            # Get first line from data, add rest to buffer
            lines = data.split(b'\x0A', 1)
            data = lines[0]
            self._buffer_set(lines[1])
        return data.decode('utf-8')

    def _read_bytes(self, buf, nbytes):
        """Fill buf with exactly nbytes bytes from the buffer and the socket.

        Returns the number of bytes read. Raises ConnectionException if the
        server closes the connection early.
        """
        self._logger.debug("reading bytes")
        # Use already buffered data
        buf_read = self._buffer_get_size(nbytes)
        nbytes_read = len(buf_read)
        buf[0:nbytes_read] = buf_read
        # Read additional data from socket; a single recv may deliver less
        # than requested, so repeat until the chunk is complete
        buf_view = memoryview(buf)
        while nbytes_read < nbytes:
            received = self._sock.recv_into(buf_view[nbytes_read:], nbytes - nbytes_read)
            if not received:
                raise ConnectionException("connection closed")
            nbytes_read += received
        return nbytes_read

    def _buffer_get_char(self, char):
        """Remove and return the buffer up to and including char.

        Without char in the buffer the whole buffer is returned.
        """
        pos = self._buffer.find(char)
        if pos < 0:
            pos = len(self._buffer)-1
        buf = self._buffer[0:pos+1]
        self._buffer = self._buffer[pos+1:]
        return buf

    def _buffer_get_size(self, size):
        """Remove and return up to size bytes from the buffer."""
        buf = self._buffer[0:size]
        self._logger.debug("get %d bytes from buffer", len(buf))
        self._buffer = self._buffer[size:]
        self._logger.debug("leaving %d in the buffer", len(self._buffer))
        return buf

    def _buffer_set(self, buf):
        """Replace the buffer."""
        self._logger.debug("set %d %s as buffer", len(buf), type(buf))
        self._buffer = buf

    def _parse_dict(self, response):
        """Parse response lines into one dict (later keys overwrite earlier)."""
        parsed = {}
        if response:
            for line in response:
                key, value = self._split_line(line)
                parsed[key] = value
        return parsed

    def _parse_list(self, response, delimiters):
        """Yield one dict per entry; a key in delimiters starts a new entry."""
        entry = {}
        if response:
            for line in response:
                key, value = self._split_line(line)
                if entry and key in delimiters:
                    yield entry
                    entry = {}
                entry[key] = value
        if entry:
            yield entry

    def _split_line(self, line):
        """Split "Key: value" into the lowercased key and the value."""
        key, _, value = line.partition(':')
        return key.lower(), value.lstrip()

    def _extract_album(self, song, lookup=True):
        """Return the album of a song.

        With lookup the album is taken from or added to the album list,
        otherwise a new album is always created.
        """
        if 'album' not in song:
            song['album'] = MCGAlbum.DEFAULT_ALBUM
        album_id = Utils.generate_id(song['album'])
        if lookup and album_id in self._albums:
            return self._albums[album_id]
        album = MCGAlbum(song['album'], self._host)
        if lookup:
            self._albums[album_id] = album
        return album

    def _extract_track(self, song):
        """Return a track for a song or None if mandatory tags are missing."""
        track = None
        if 'artist' in song and 'title' in song and 'file' in song:
            track = MCGTrack(song['artist'], song['title'], song['file'])
            if 'track' in song:
                track.set_track(song['track'])
            if 'time' in song:
                track.set_length(song['time'])
            if 'date' in song:
                track.set_date(song['date'])
            if 'albumartist' in song:
                track.set_albumartists(song['albumartist'])
        return track

    def _extract_playlist_track(self, song):
        """Return a playlist track if the song has an id and a position."""
        track = self._extract_track(song)
        if track and 'id' in song and 'pos' in song:
            track = MCGPlaylistTrack(track, song['id'], song['pos'])
        return track

    def _set_connection_status(self, status):
        """Emit the connection status."""
        self._callback(Client.SIGNAL_CONNECTION, status)


class OutputDevice:
    """An audio output device of the server."""

    def __init__(self, id, name):
        self._id = id
        self._name = name
        self._enabled = None

    def get_id(self):
        return self._id

    def get_name(self):
        return self._name

    def set_enabled(self, enabled):
        self._enabled = enabled

    def is_enabled(self):
        return self._enabled


class MCGAlbum:
    """An album: a title plus the data aggregated from its tracks."""

    DEFAULT_ALBUM = 'Various'
    _FILE_NAMES = ['cover', 'folder']
    _FILE_EXTS = ['jpg', 'png', 'jpeg']
    _FILTER_DELIMITER = ' '

    def __init__(self, title, host):
        self._artists = []
        self._albumartists = []
        self._pathes = []
        if isinstance(title, list):
            title = title[0]
        self._title = title
        self._dates = []
        self._host = host
        self._tracks = []
        self._length = 0
        self._id = Utils.generate_id(title)

    def __eq__(self, other):
        """Albums are equal if their ids are equal."""
        if not isinstance(other, MCGAlbum):
            return NotImplemented
        return self.get_id() == other.get_id()

    def __hash__(self):
        # Based on the same value as __eq__ so equal albums hash equally.
        return hash(self._id)

    def get_id(self):
        return self._id

    def get_artists(self):
        """Return the track artists that are not album artists."""
        if self._albumartists:
            return [artist for artist in self._artists if artist not in self._albumartists]
        return self._artists

    def get_albumartists(self):
        """Return the album artists, falling back to the track artists."""
        if self._albumartists:
            return self._albumartists
        return self._artists

    def get_title(self):
        return self._title

    def get_dates(self):
        return self._dates

    def get_date(self):
        """Return the first known date or None."""
        if not self._dates:
            return None
        return self._dates[0]

    def get_path(self):
        """Return the first directory of the album's tracks or None."""
        if not self._pathes:
            return None
        return self._pathes[0]

    def add_track(self, track):
        """Add a track and merge its artists, date, path and length."""
        self._tracks.append(track)
        self._length = self._length + track.get_length()
        for artist in track.get_artists():
            if artist not in self._artists:
                self._artists.append(artist)
        for artist in track.get_albumartists():
            if artist not in self._albumartists:
                self._albumartists.append(artist)
        if track.get_date() is not None and track.get_date() not in self._dates:
            self._dates.append(track.get_date())
        path = os.path.dirname(track.get_file())
        if path not in self._pathes:
            self._pathes.append(path)

    def get_tracks(self):
        return self._tracks

    def get_length(self):
        return self._length

    def filter(self, filter_string):
        """Return True if every keyword of the filter matches the album.

        A keyword matches case-insensitively against the artists, the title,
        the dates and the title and file of each track.
        """
        if len(filter_string) == 0:
            return True
        keywords = filter_string.split(MCGAlbum._FILTER_DELIMITER)
        for keyword in keywords:
            if len(keyword) == 0:
                continue
            keyword = keyword.lower()
            # Search in album data
            if any(keyword in value.lower() for value in self._artists + [self._title] + self._dates):
                continue
            # Search in track data
            if not any(
                keyword in track.get_title().lower() or keyword in track.get_file().lower()
                for track in self._tracks
            ):
                return False
        return True

    @staticmethod
    def compare(album1, album2, criterion=None):
        """Compare two albums by a SortOrder criterion (default: title).

        Returns -1, 0 or 1; a missing value sorts before any other value.
        """
        if criterion is None:
            criterion = SortOrder.TITLE
        if criterion == SortOrder.ARTIST:
            value_function = "get_artists"
        elif criterion == SortOrder.YEAR:
            value_function = "get_date"
        else:
            # SortOrder.TITLE and fallback for unknown criteria
            value_function = "get_title"

        value1 = getattr(album1, value_function)()
        value2 = getattr(album2, value_function)()
        if value1 is None and value2 is None:
            return 0
        elif value1 is None:
            return -1
        elif value2 is None:
            return 1
        if value1 < value2:
            return -1
        elif value1 == value2:
            return 0
        else:
            return 1


class MCGTrack:
    """A track identified by its file."""

    def __init__(self, artists, title, file):
        if not isinstance(artists, list):
            artists = [artists]
        self._artists = artists
        if isinstance(title, list):
            title = title[0]
        self._title = title
        if isinstance(file, list):
            file = file[0]
        self._file = file
        self._albumartists = []
        self._track = None
        self._length = 0
        self._date = None

    def __eq__(self, other):
        """Tracks are equal if they refer to the same file."""
        if not isinstance(other, MCGTrack):
            return NotImplemented
        return self._file == other.get_file()

    def __hash__(self):
        return hash(self._file)

    def get_artists(self):
        """Return the artists that are not album artists."""
        if self._albumartists:
            return [artist for artist in self._artists if artist not in self._albumartists]
        return self._artists

    def set_albumartists(self, artists):
        if not isinstance(artists, list):
            artists = [artists]
        self._albumartists = artists

    def get_albumartists(self):
        """Return the album artists, falling back to the artists."""
        if self._albumartists:
            return self._albumartists
        return self._artists

    def get_title(self):
        return self._title

    def get_track(self):
        return self._track

    def set_track(self, track):
        """Set the track number; accepts values like "3" or "3/12"."""
        if isinstance(track, list):
            track = track[0]
        if isinstance(track, str) and '/' in track:
            track = track[0: track.index('/')]
        if track is not None:
            try:
                track = int(track)
            except ValueError:
                track = 0
        self._track = track

    def get_length(self):
        return self._length

    def set_length(self, length):
        self._length = int(length)

    def get_date(self):
        return self._date

    def set_date(self, date):
        if isinstance(date, list):
            date = date[0]
        self._date = date

    def get_file(self):
        return self._file


class MCGPlaylistTrack(MCGTrack):
    """A track within the playlist with its id and position."""

    def __init__(self, track, id, pos):
        MCGTrack.__init__(
            self,
            track.get_artists(),
            track.get_title(),
            track.get_file()
        )
        self.set_albumartists(track.get_albumartists())
        self.set_track(track.get_track())
        self.set_length(track.get_length())
        self.set_date(track.get_date())
        self._id = int(id)
        self._pos = int(pos)

    def get_id(self):
        return self._id

    def get_pos(self):
        return self._pos


class MCGConfig(configparser.ConfigParser):
    """Configuration file stored below CONFIG_DIR."""

    CONFIG_DIR = '~/.config/mcg/'

    def __init__(self, filename):
        configparser.ConfigParser.__init__(self)
        self._filename = os.path.expanduser(os.path.join(MCGConfig.CONFIG_DIR, filename))
        self._create_dir()

    def load(self):
        """Read the configuration file if it exists."""
        if os.path.isfile(self._filename):
            self.read(self._filename)

    def save(self):
        """Write the configuration file."""
        with open(self._filename, 'w') as configfile:
            self.write(configfile)

    def _create_dir(self):
        """Create the directory of the configuration file if necessary."""
        dirname = os.path.dirname(self._filename)
        os.makedirs(dirname, exist_ok=True)


class MCGCache():
    """Per-host cache directory that is cleared when the size changes."""

    DIRNAME = '~/.cache/mcg/'
    SIZE_FILENAME = 'size'
    _lock = threading.Lock()

    def __init__(self, host, size):
        self._host = host
        self._size = size
        self._dirname = os.path.expanduser(os.path.join(MCGCache.DIRNAME, host))
        os.makedirs(self._dirname, exist_ok=True)
        self._read_size()

    def create_filename(self, album):
        """Return the cache filename for an album."""
        return os.path.join(self._dirname, '-'.join([album.get_id()]))

    def _read_size(self):
        """Clear the cache and store the new size if the size has changed."""
        size = 100
        # The context manager releases the lock even if reading fails.
        with MCGCache._lock:
            # Read old size
            filename = os.path.join(self._dirname, MCGCache.SIZE_FILENAME)
            if os.path.exists(filename):
                with open(filename, 'r') as f:
                    try:
                        size = int(f.readline())
                    except ValueError:
                        # Unreadable size file: keep the default size
                        pass
            # Clear cache if size has changed
            if size != self._size:
                self._clear()
                # Write new size
                with open(filename, 'w') as f:
                    f.write(str(self._size))

    def _clear(self):
        """Delete all files of the cache directory (best effort)."""
        for filename in os.listdir(self._dirname):
            path = os.path.join(self._dirname, filename)
            if os.path.isfile(path):
                try:
                    os.unlink(path)
                except OSError as e:
                    logging.getLogger(__name__).warning("clear: %s", e)