|
import struct |
|
from io import BytesIO, FileIO |
|
|
|
class TaggedFile:
    """In-memory file contents held in a ``BytesIO`` buffer.

    The buffer is exposed as the ``data`` attribute. Helpers are provided to
    dump the whole buffer to disk and to read from it.
    """

    def __init__(self, data):
        """Wrap *data*.

        Args:
            data: ``BytesIO`` buffer holding the file contents.

        Raises:
            ValueError: If *data* is not a ``BytesIO`` instance.
        """
        # ValueError (not TypeError) is kept so existing callers that catch
        # it keep working.
        if not isinstance(data, BytesIO):
            raise ValueError("TaggedFile argument is not BytesIO")
        self.data = data

    def save(self, output_name):
        """Write the entire buffer to the file *output_name*.

        The buffer is rewound to its start before being read, so the buffer
        position is at the end of the data once this method returns. An
        existing file at *output_name* is overwritten.

        Args:
            output_name: Path of the file to create.
        """
        self.data.seek(0)
        # The context manager guarantees the handle is flushed and closed
        # even if the write fails part-way.
        with open(output_name, "wb") as output_file:
            output_file.write(self.data.read())

    def read(self, count=0):
        """Read up to *count* bytes from the buffer's current position.

        NOTE(review): with the default ``count=0`` the underlying
        ``BytesIO.read(0)`` call returns ``b''``. Pass ``-1`` to read
        everything that remains. The default is kept unchanged for
        backward compatibility; confirm whether "read all" was intended.

        Args:
            count: Maximum number of bytes to read.

        Returns:
            The bytes read.
        """
        return self.data.read(count)
|
|
|
class ID3Writer:
    """Build an ID3v2.3 tag from queued frames and prepend it to audio data.

    Frames are queued with :meth:`set_frame` and serialized when
    :meth:`add_tag` is called. ``padding`` is the number of zero bytes
    appended after the frames inside the tag (4096 by default).
    """

    # Text frames whose list/tuple values are joined with "/" into one string.
    _MULTI_VALUE_FRAMES = frozenset(('TPE1', 'TCOM', 'TCON'))

    # URL link frames: the body is the bare URL with no text-encoding byte.
    _URL_FRAMES = frozenset((
        'WCOM', 'WCOP', 'WOAF', 'WOAR', 'WOAS', 'WORS', 'WPAY', 'WPUB',
    ))

    # Frames with a dedicated body layout, mapped to the encoder method name.
    # Names (not bound functions) are stored so subclass overrides are used.
    _STRUCTURED_FRAMES = {
        'COMM': '_encode_comment_frame',
        'USLT': '_encode_uslt_frame',
        'TXXX': '_encode_txxx_frame',
        'APIC': '_encode_apic_frame',
        'PRIV': '_encode_priv_frame',
    }

    # Magic-byte prefixes used to sniff the picture MIME type.
    _IMAGE_SIGNATURES = (
        (b'\xff\xd8\xff', 'image/jpeg'),
        (b'\x89PNG\r\n\x1a\n', 'image/png'),
        (b'GIF87a', 'image/gif'),
        (b'GIF89a', 'image/gif'),
    )

    # Largest value a 4-byte syncsafe integer can hold (4 x 7 bits).
    _MAX_SYNCSAFE = (1 << 28) - 1

    def __init__(self):
        """Create a writer with no frames and 4096 bytes of padding."""
        self.frames = []
        self.padding = 4096

    def set_frame(self, frame_id, value):
        """Queue a frame for writing and return ``self`` for chaining.

        Args:
            frame_id: Four-character frame identifier such as ``'TIT2'``.
            value: Frame payload. Text and URL frames take a value that is
                converted with ``str``; ``TPE1``/``TCOM``/``TCON`` also take
                a list or tuple; ``COMM``, ``USLT``, ``TXXX``, ``APIC`` and
                ``PRIV`` take a dict (see the matching ``_encode_*`` method).
        """
        self.frames.append((frame_id, value))
        return self

    def _encode_frames(self):
        """Return all queued frames serialized back to back."""
        # join() avoids the quadratic copying of repeated bytes concatenation.
        return b''.join(
            self._encode_frame(frame_id, value)
            for frame_id, value in self.frames
        )

    def _encode_frame(self, frame_id, value):
        """Serialize one frame: 10-byte header followed by its body.

        Returns:
            ``frame_id`` + 4-byte big-endian body size + two zero flag bytes
            + body.
        """
        encoder_name = self._STRUCTURED_FRAMES.get(frame_id)
        if encoder_name is not None:
            content = getattr(self, encoder_name)(value)
        elif frame_id in self._URL_FRAMES:
            # URL link frames carry no encoding byte; the URL is stored as
            # Latin-1 text directly.
            content = str(value).encode('latin-1')
        elif (frame_id in self._MULTI_VALUE_FRAMES
                and isinstance(value, (list, tuple))):
            content = self._encode_text_frame('/'.join(map(str, value)))
        else:
            # Every other frame ID is treated as a plain text frame.
            content = self._encode_text_frame(str(value))

        # Frame sizes in this tag version are plain 32-bit integers,
        # not syncsafe.
        size_bytes = struct.pack('>I', len(content))
        header = frame_id.encode('latin-1') + size_bytes + b'\x00\x00'
        return header + content

    def _encode_text_frame(self, text):
        """Return a text-frame body: encoding byte 0x03 plus UTF-8 text.

        NOTE(review): encoding 0x03 (UTF-8) is defined by ID3v2.4, while the
        tag header written by ``_build_tag`` declares v2.3. Left unchanged
        because it alters every text frame's bytes; confirm the target
        players accept it.
        """
        return b'\x03' + text.encode('utf-8')

    def _encode_language(self, data):
        """Return the 3-byte language code from *data* (default ``'eng'``).

        Raises:
            ValueError: If the code is not exactly three bytes long, which
                would shift every following field of the frame.
        """
        language = data.get('language', 'eng').encode('latin-1')
        if len(language) != 3:
            raise ValueError("language must be a 3-character ISO 639-2 code")
        return language

    def _encode_comment_frame(self, data):
        """Return a COMM body from keys ``language``, ``description``, ``text``."""
        language = self._encode_language(data)
        description = data.get('description', '').encode('utf-8')
        text = data.get('text', '').encode('utf-8')
        return b'\x03' + language + description + b'\x00' + text

    def _encode_uslt_frame(self, data):
        """Return a USLT body from keys ``language``, ``description``, ``lyrics``."""
        language = self._encode_language(data)
        description = data.get('description', '').encode('utf-8')
        lyrics = data.get('lyrics', '').encode('utf-8')
        return b'\x03' + language + description + b'\x00' + lyrics

    def _encode_txxx_frame(self, data):
        """Return a TXXX body from keys ``description`` and ``value``."""
        description = data.get('description', '').encode('utf-8')
        value = data.get('value', '').encode('utf-8')
        return b'\x03' + description + b'\x00' + value

    def _encode_apic_frame(self, data):
        """Return an APIC (attached picture) body.

        Args:
            data: Dict with keys ``data`` (image bytes or a readable binary
                file object), optional ``mime`` (sniffed from the image when
                missing), optional ``type`` (picture type, default 3),
                optional ``description`` and optional ``useUnicodeEncoding``
                (default True).

        Raises:
            ValueError: If the image is missing/empty, too short to sniff,
                or its MIME type is neither given nor detectable.
        """
        picture_data = data.get('data')
        # Accept any readable object; open(path, 'rb') yields a
        # BufferedReader, which is not a FileIO instance.
        if hasattr(picture_data, 'read'):
            picture_data = picture_data.read()
        if not picture_data:
            raise ValueError("APIC frame requires 'data' field with image bytes")

        mime_type = data.get('mime') or self._detect_mime_type(picture_data)
        if not mime_type:
            raise ValueError("APIC frame requires 'mime' field (e.g., 'image/jpeg', 'image/png')")

        picture_type = data.get('type', 3)
        description = data.get('description', '')
        # The description bytes must match the declared encoding byte,
        # otherwise readers mis-locate the start of the picture data.
        if data.get('useUnicodeEncoding', True):
            encoding = 0x01
            description_bytes = self._encode_unicode_string(description)
        else:
            encoding = 0x00
            description_bytes = description.encode('latin-1') + b'\x00'

        return (bytes([encoding]) + mime_type.encode('latin-1') + b'\x00' +
                bytes([picture_type]) + description_bytes + picture_data)

    def _detect_mime_type(self, image_data):
        """Detect MIME type from image bytes using magic bytes.

        Returns:
            ``'image/jpeg'``, ``'image/png'``, ``'image/gif'``,
            ``'image/webp'``, or ``''`` when the format is not recognised.

        Raises:
            ValueError: If fewer than 12 bytes are supplied.
        """
        if not image_data or len(image_data) < 12:
            raise ValueError("Not enough data to detect image type")

        for signature, mime_type in self._IMAGE_SIGNATURES:
            if image_data[:len(signature)] == signature:
                return mime_type

        # WebP is a RIFF container: "RIFF" <4-byte size> "WEBP".
        if image_data[:4] == b'RIFF' and image_data[8:12] == b'WEBP':
            return 'image/webp'

        return ''

    def _encode_priv_frame(self, data):
        """Return a PRIV body: Latin-1 ``id``, a zero byte, then raw ``data``."""
        identifier = data.get('id', '').encode('latin-1')
        private_data = data.get('data', b'')
        return identifier + b'\x00' + private_data

    def _encode_size(self, size):
        """Encode *size* as a 4-byte syncsafe integer (7 bits per byte).

        Raises:
            ValueError: If *size* is negative or needs more than 28 bits.
        """
        if not 0 <= size <= self._MAX_SYNCSAFE:
            raise ValueError("ID3 tag size does not fit in a 28-bit syncsafe integer")
        return bytes((size >> shift) & 0x7F for shift in (21, 14, 7, 0))

    def _encode_unicode_string(self, string):
        """Return *string* as UTF-16LE with a BOM and a two-byte terminator."""
        return b'\xff\xfe' + string.encode('utf-16-le') + b'\x00\x00'

    def _build_tag(self):
        """Return the complete tag: header, frames, then zero padding.

        The size stored in the header covers the frames plus the padding and
        excludes the 10-byte header itself.
        """
        frames_data = self._encode_frames()
        # A negative padding must not shrink the declared tag size.
        padding = max(self.padding, 0)
        syncsafe_size = self._encode_size(len(frames_data) + padding)
        # "ID3", version 3 revision 0, flags byte 0x00.
        header = b'ID3' + b'\x03\x00' + b'\x00' + syncsafe_size
        return header + frames_data + b'\x00' * padding

    def _skip_existing_tag(self, data):
        """Return the offset where the audio starts in *data*.

        Returns 0 when *data* does not begin with a complete ID3v2 header;
        otherwise the end of the existing tag. The returned offset may exceed
        ``len(data)`` if the header declares more bytes than are present.
        """
        # A header is 10 bytes; anything shorter cannot be a tag.
        if len(data) < 10 or data[:3] != b'ID3':
            return 0

        # Bytes 6-9 hold the tag size as a syncsafe integer.
        tag_size = 0
        for byte in data[6:10]:
            tag_size = (tag_size << 7) | (byte & 0x7F)

        audio_start = 10 + tag_size
        # ID3v2.4 tags may carry a 10-byte footer (flag bit 0x10) that the
        # size field does not include.
        if data[3] >= 4 and data[5] & 0x10:
            audio_start += 10
        return audio_start

    def add_tag(self, audio_file):
        """Return a ``TaggedFile`` containing the new tag followed by the audio.

        Any ID3v2 tag already present at the start of the audio is dropped.

        Args:
            audio_file: Path string, ``BytesIO``, ``bytes`` or ``bytearray``
                holding the audio.

        Raises:
            TypeError: If *audio_file* is of any other type.
        """
        if isinstance(audio_file, str):
            with open(audio_file, 'rb') as f:
                audio_data = f.read()
        elif isinstance(audio_file, BytesIO):
            audio_data = audio_file.getvalue()
        elif isinstance(audio_file, (bytes, bytearray)):
            audio_data = bytes(audio_file)
        else:
            raise TypeError("audio_file must be bytes, BytesIO, or file path string")

        audio_start = self._skip_existing_tag(audio_data)

        result = BytesIO()
        result.write(self._build_tag())
        result.write(audio_data[audio_start:])
        result.seek(0)
        return TaggedFile(result)