Populate a SQL database with track data, index it in Elasticsearch, and load precomputed similarity lists into Redis
# model.py -- SQLAlchemy table definitions shared by the scripts below.
from sqlalchemy import Column, ForeignKey, MetaData, String, Table

metadata = MetaData()

artists = Table(
    "artists", metadata,
    Column('uri', String(32), primary_key=True),
    Column('name', String(100), nullable=False)
)

tracks = Table(
    "tracks", metadata,
    Column('uri', String(32), primary_key=True),
    Column('name', String(100), nullable=False),
    # Each track references its artist by URI.
    Column('artist_uri', String(32), ForeignKey('artists.uri'), nullable=False)
)
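For reference, the CREATE TABLE statements these Table objects compile to can be printed without touching the database. A minimal sketch, assuming the definitions above are saved as model.py (which the imports in the scripts below rely on):

from sqlalchemy.dialects import mysql
from sqlalchemy.schema import CreateTable

from model import artists, tracks

# Print the DDL as the MySQL dialect would emit it.
print(CreateTable(artists).compile(dialect=mysql.dialect()))
print(CreateTable(tracks).compile(dialect=mysql.dialect()))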
# Populate the MySQL `music` database with artists and tracks parsed from
# the zipped playlist-event CSV files.
import csv
import itertools
from io import TextIOWrapper
from zipfile import ZipFile

from sqlalchemy import create_engine
from sqlalchemy.engine import URL

from model import artists, metadata, tracks


def connect():
    # Build the connection URL instead of string-formatting credentials.
    url_object = URL.create(
        "mysql+mysqlconnector",
        username="user",
        password="user123",
        host="localhost",
        database='music'
    )
    return create_engine(url_object, echo=True, future=True)


def create_tables(engine):
    metadata.create_all(engine)
def lines_stream():
    # Stream CSV rows from every file inside the zip archive, one dict per row.
    with ZipFile('./scripts/playlist_events.zip', 'r') as file:
        files = file.namelist()
        for f in files:
            with file.open(f, 'r') as myfile:
                myfile = TextIOWrapper(myfile)
                reader = csv.DictReader(myfile, delimiter=',')
                yield from reader


def row_batches(lines_iterator, bs):
    # Slice the row stream into lists of at most `bs` rows.
    while True:
        rows = list(itertools.islice(lines_iterator, bs))
        if not rows:
            break
        yield rows
def to_artist_row(line):
    return {
        'uri': line['artist_uri'],
        'name': line['artist_name']
    }


def to_track_row(line):
    return {
        'uri': line['track_uri'],
        'name': line['track_name'],
        'artist_uri': line['artist_uri']
    }


def prepare_insert(seen_artists, seen_tracks, batch):
    # Deduplicate within the run: only build rows for URIs not seen before.
    artist_rows = [
        to_artist_row(r) for r in batch if r['artist_uri'] not in seen_artists
    ]
    track_rows = [
        to_track_row(r) for r in batch if r['track_uri'] not in seen_tracks
    ]
    seen_artists.update(r['uri'] for r in artist_rows)
    seen_tracks.update(r['uri'] for r in track_rows)
    return artist_rows, track_rows
def populate_tables(engine):
    batch_size = 1_000
    lines = lines_stream()
    seen_artists = set()
    seen_tracks = set()
    for i, batch in enumerate(row_batches(lines, batch_size)):
        batch_offset = i * batch_size
        print(f'Inserting [{batch_offset}, {batch_offset + batch_size}) values')
        artist_rows, track_rows = prepare_insert(seen_artists, seen_tracks, batch)
        with engine.connect() as connection:
            # INSERT IGNORE lets MySQL skip rows whose primary key already exists.
            if artist_rows:
                connection.execute(
                    artists.insert().prefix_with('IGNORE'),
                    artist_rows
                )
            if track_rows:
                connection.execute(
                    tracks.insert().prefix_with('IGNORE'),
                    track_rows
                )
            if artist_rows or track_rows:
                connection.commit()


engine = connect()
create_tables(engine)
populate_tables(engine)
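After the load finishes, a quick sanity check is to count the rows that landed in each table. A minimal sketch, not part of the gist, reusing the engine and table objects defined above:

from sqlalchemy import func, select

from model import artists, tracks

with engine.connect() as connection:
    artist_count = connection.execute(
        select(func.count()).select_from(artists)
    ).scalar_one()
    track_count = connection.execute(
        select(func.count()).select_from(tracks)
    ).scalar_one()
    print(f'{artist_count} artists, {track_count} tracks')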
# Index the artists and tracks stored in MySQL into Elasticsearch.
import itertools

from elasticsearch import Elasticsearch, helpers
from sqlalchemy import create_engine, select
from sqlalchemy.engine import URL

from model import artists, tracks


def create_db_connection():
    url_object = URL.create(
        "mysql+mysqlconnector",
        username="user",
        password="user123",
        host="localhost",
        database='music'
    )
    return create_engine(url_object, echo=True, future=True)


def create_elastic():
    return Elasticsearch(hosts='http://localhost:9200')
def create_indices(es):
    # Both indices analyse their text fields with the built-in English analyzer.
    track_mappings = {
        'properties': {
            'name': {
                'type': 'text',
                'analyzer': 'english'
            },
            'artist': {
                'type': 'text',
                'analyzer': 'english'
            }
        }
    }
    artist_mappings = {
        'properties': {
            'name': {
                'type': 'text',
                'analyzer': 'english'
            },
        }
    }
    if not es.indices.exists(index='tracks'):
        es.indices.create(index='tracks', mappings=track_mappings)
    if not es.indices.exists(index='artists'):
        es.indices.create(index='artists', mappings=artist_mappings)
def to_es_action_track(item):
    return {
        '_op_type': 'create',
        '_index': 'tracks',
        '_id': item['uri'],  # reuse the SQL uri as the document id
        'name': item['name'],
        'artist': item['artist']
    }


def to_es_action_artist(item):
    return {
        '_op_type': 'create',
        '_index': 'artists',
        '_id': item['uri'],
        'name': item['name']
    }


def to_actions_track(batch):
    return [to_es_action_track(i) for i in batch if i is not None]


def to_actions_artists(batch):
    return [to_es_action_artist(i) for i in batch if i is not None]
def read_tracks(connection):
    # Select every tracks column plus the artist name, joined through the foreign key.
    stmt = select(tracks, artists.c.name.label('artist')).select_from(tracks).join(artists)
    # .mappings() yields dict-like rows so the action builders can read columns by name.
    return connection.execute(stmt).mappings()


def read_artists(connection):
    return connection.execute(artists.select()).mappings()


def rows_batches(rows, bs=1000):
    while True:
        batch = list(itertools.islice(rows, bs))
        if not batch:
            return
        yield batch
def index_tracks(engine, es):
    with engine.connect() as connection:
        rows = read_tracks(connection)
        for batch in rows_batches(rows):
            print('indexing next batch')
            actions = to_actions_track(batch)
            resp = helpers.bulk(es, actions=actions)
            print(resp)
    print('finished indexing tracks')


def index_artists(engine, es):
    with engine.connect() as connection:
        rows = read_artists(connection)
        for batch in rows_batches(rows):
            print('indexing next batch')
            actions = to_actions_artists(batch)
            resp = helpers.bulk(es, actions=actions)
            print(resp)
    print('finished indexing artists')


engine = create_db_connection()
es = create_elastic()
create_indices(es)
index_tracks(engine, es)
index_artists(engine, es)
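Once indexing completes, a match query against the tracks index confirms the documents are searchable. A minimal sketch, assuming the 8.x-style Python client used above; the search term "love" is just an arbitrary example:

# Full-text search over track names, returning a handful of hits.
resp = es.search(
    index='tracks',
    query={'match': {'name': 'love'}},
    size=5
)
for hit in resp['hits']['hits']:
    print(hit['_id'], hit['_source']['name'], '-', hit['_source']['artist'])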
# Load the precomputed similarity lists from similar.zip into Redis lists.
from io import TextIOWrapper
from zipfile import ZipFile

import redis

r = redis.Redis(host='localhost', port=6379, db=0)

# Maps a Redis key prefix to the similarity file it is populated from.
index_names = {
    'matrix_factorisation': 'matrix_factorisation_similarity.txt',
    'mlp': 'multilayer_perceptron_similarity.txt',
    'coco': 'coco_similar.txt',
    'tracks': 'track_similarity.txt'
}


def populate_index(index_name, lines):
    # Each line is "<key> <value> <value> ..."; store the values under "<index_name>:<key>".
    for l in lines:
        [key, *values] = l.strip().split(' ')
        full_key = f'{index_name}:{key}'
        r.lpush(full_key, *values)


with ZipFile('./similar.zip', 'r') as file:
    files = file.namelist()
    for f in files:
        for index_name, suffix in index_names.items():
            if f.endswith(suffix):
                with file.open(f, 'r') as zip_file:
                    populate_index(index_name, TextIOWrapper(zip_file))
                break  # a file matches at most one suffix
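To read one of the similarity lists back, fetch the Redis list stored under the prefixed key. A small sketch; 'mlp:spotify:track:example' is a hypothetical key, so substitute a real track URI from the loaded files. Note that lpush stores the values in reverse of their order in the file; rpush would preserve it.

# Hypothetical key, in the '<index_name>:<key>' format built by populate_index above.
similar = r.lrange('mlp:spotify:track:example', 0, 9)  # first 10 stored values
print([v.decode() for v in similar])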
sqlalchemy
mysql-connector-python
redis
elasticsearch
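Assuming the list above is saved as requirements.txt, the dependencies for all of the scripts install with: pip install -r requirements.txt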