|
import csv |
|
import sqlite3 |
|
|
|
from datetime import datetime, timedelta |
|
|
|
# A Delphi datetime value is the (fractional) number of days since the Delphi
# epoch (1899-12-30), e.g. 42 minutes past the UNIX epoch is
# 25569.029166666667 in Delphi terms.
|
# Day zero of the Delphi date scale; Delphi date values count days from here.
DELPHI_EPOCH = datetime(1899, 12, 30)


def datetime_fromdelphi(dvalue):
    """Convert a Delphi date value to a naive ``datetime``.

    Args:
        dvalue: Number of days (may be fractional) since ``DELPHI_EPOCH``.

    Returns:
        ``DELPHI_EPOCH`` shifted forward by ``dvalue`` days.
    """
    return DELPHI_EPOCH + timedelta(days=dvalue)
|
|
|
|
|
def datetime_todelphi(dt):
    """Convert a naive ``datetime`` to a Delphi date value.

    Args:
        dt: The ``datetime`` to convert. It is subtracted from the naive
            ``DELPHI_EPOCH``, so it must be naive as well.

    Returns:
        The number of days between ``DELPHI_EPOCH`` and ``dt`` as a float;
        the fractional part carries the time of day.

    Raises:
        TypeError: If ``dt`` cannot be subtracted from ``DELPHI_EPOCH``
            (for example a timezone-aware datetime).
    """
    # Dividing one timedelta by another yields a float ratio, so no manual
    # days/seconds/microseconds arithmetic is needed.
    return (dt - DELPHI_EPOCH) / timedelta(days=1)
|
|
|
|
|
def cmp(x, y):
    """Three-way comparison of ``x`` and ``y``.

    Returns:
        ``-1`` if ``x < y``, ``0`` if neither is greater, ``1`` if ``x > y``.
    """
    # Booleans are ints, so the difference of the two tests is -1, 0 or 1.
    return (x > y) - (x < y)
|
|
|
|
|
def make_connection(db_path):
    """Open the SQLite database at ``db_path`` and prepare it for use.

    The connection waits up to 60 seconds on a locked database, returns rows
    as ``sqlite3.Row`` objects, and has a collation named ``IUNICODE``
    registered that compares strings after lower-casing them.

    Args:
        db_path: Path of the SQLite database file.

    Returns:
        The configured ``sqlite3.Connection``.
    """

    def _iunicode_collate(s1, s2):
        # Collation callbacks must return a negative, zero or positive int.
        return cmp(s1.lower(), s2.lower())

    conn = sqlite3.connect(db_path, timeout=60)
    conn.row_factory = sqlite3.Row
    # NOTE(review): presumably the database schema references an IUNICODE
    # collation, so queries fail unless one is registered -- confirm.
    conn.create_collation("IUNICODE", _iunicode_collate)

    return conn
|
|
|
|
|
def get_mm_row(c, song):
    """Look up the single ``Songs`` row matching a song's artist/title/album.

    Args:
        c: Database cursor used to run the query.
        song: Mapping with the keys ``"artist"``, ``"title"`` and ``"album"``.

    Returns:
        The matching row (columns ``ID, LastTimePlayed, Artist, Album,
        SongTitle``), or ``None`` when nothing matches.

    Raises:
        ValueError: If more than one row matches; the rows are attached as
            the second exception argument.
    """
    c.execute(
        """select ID, LastTimePlayed, Artist, Album, SongTitle from Songs
        where Artist = ?
        and SongTitle = ?
        and Album = ?
        """,
        [song["artist"], song["title"], song["album"]],
    )
    rows = c.fetchall()
    if len(rows) > 1:
        # An ambiguous match is refused rather than guessing which to update.
        raise ValueError("multiple matches", rows)

    return rows[0] if rows else None
|
|
|
|
|
def update_songs(c, songs):
    """Merge play statistics from ``songs`` into the ``Songs`` table.

    For every song with at least one play that matches exactly one database
    row, the row's ``PlayCounter`` is increased by the song's play count and
    ``LastTimePlayed`` is moved forward if the song's value is more recent.
    A progress line is printed for every song.

    Args:
        c: Database cursor used for the lookups and updates.
        songs: Iterable of mappings with the keys ``"artist"``, ``"title"``,
            ``"album"``, ``"play count"`` and ``"last played"``; the latter
            is formatted as ``%m/%d/%Y, %I:%M:%S %p``.

    Raises:
        ValueError: If a play count or timestamp cannot be parsed, or a song
            matches more than one row.
    """
    for song in songs:
        print(f"{song['artist']}:{song['title']}: {song['play count']}", end="")
        playcount = int(song["play count"])
        if playcount < 1:
            print(" -> skipping: no plays")
            continue

        mm_row = get_mm_row(c, song)
        if mm_row is None:
            print(" -> skipping: no match")
            continue
        mm_id, mm_last_played = mm_row[:2]

        updated_both = False
        last_played = datetime.strptime(song["last played"], "%m/%d/%Y, %I:%M:%S %p")
        last_played_delphi = datetime_todelphi(last_played)
        # Only move the timestamp forward. A NULL stored value counts as
        # "never played" instead of breaking the float comparison.
        if last_played.timestamp() > 0 and (
            mm_last_played is None or last_played_delphi > mm_last_played
        ):
            c.execute(
                """update Songs
                set LastTimePlayed = ?
                where ID = ?
                """,
                [last_played_delphi, mm_id],
            )
            updated_both = True

        # The increment is bound as a parameter rather than formatted into
        # the SQL text. COALESCE keeps a NULL counter from swallowing the
        # update, since NULL + n is NULL in SQL.
        c.execute(
            """update Songs
            set PlayCounter = COALESCE(PlayCounter, 0) + ?
            where ID = ?
            """,
            [playcount, mm_id],
        )
        print(" -> updated", updated_both)
|
|
|
|
|
if __name__ == "__main__":
    # Connect before entering the try block so that a failed connection is
    # reported as-is instead of being masked by a NameError during cleanup.
    conn = make_connection("MM.DB")
    try:
        # newline="" lets the csv module handle line endings itself.
        with open("google_music_library.csv", newline="") as f:
            update_songs(conn.cursor(), csv.DictReader(f))
    except BaseException:
        # Play-count increments are not idempotent: committing a partial run
        # would double-count those songs when the import is re-run.
        conn.rollback()
        raise
    else:
        conn.commit()
    finally:
        conn.close()