Wow, that was simpler than I thought

This commit is contained in:
Benson Chu 2026-04-04 14:37:50 -05:00
parent 55205ec0ac
commit e576ce1278
3 changed files with 93 additions and 32 deletions

View file

@ -3,7 +3,7 @@ from beets.plugins import BeetsPlugin
from beets.ui import print_
from beets.dbcore import types
from typing import Any
import json
from beets.dbcore import queryparse, query
@ -18,6 +18,75 @@ import re
from .labels import HasLabelQuery, labels_command, LABELS_FIELD_NAME, LabelValueSort
from .playlists import initialize_playlists, expand_playlist_query, valid_playlist, PlaylistValueSort
class KeyValueDelimitedString(types.Type):
"""A dict type stored as 'key:value; key2:value2' in the database.
Stores dictionaries with string keys and integer values in a human-readable
semicolon-delimited format. Values default to 1 if not specified.
"""
sql = "TEXT"
query = query.SubstringQuery
model_type = dict
@property
def null(self):
return {}
def format(self, value: dict | None) -> str:
"""Format dict as 'key:value; key2:value2' string."""
if not value:
return ""
# Sort for consistent output
return "; ".join(f"{k}:{v}" for k, v in sorted(value.items()))
def parse(self, string: str) -> dict:
"""Parse 'key:value; key2:value2' string or legacy JSON into dict."""
if not string:
return {}
# Try parsing as JSON first (for backward compatibility)
if string.startswith('{'):
try:
return json.loads(string)
except (json.JSONDecodeError, ValueError):
pass
# Parse as semicolon-delimited format
result = {}
for pair in string.split("; "):
pair = pair.strip()
if not pair:
continue
if ":" in pair:
k, v = pair.split(":", 1)
k = k.strip()
v = v.strip()
try:
result[k] = int(v)
except ValueError:
result[k] = 1
else:
result[pair] = 1
return result
def to_sql(self, model_value: Any) -> str:
"""Convert dict to SQL string."""
if model_value is None:
return ""
return self.format(model_value)
def normalize(self, value: Any) -> dict:
"""Normalize value to dict."""
if value is None:
return {}
if isinstance(value, dict):
return value
if isinstance(value, str):
return self.parse(value)
return {}
class LabelValueSortsDict(dict):
"""Custom dict that returns LabelValueSort for any label:* key."""
@ -117,12 +186,10 @@ def parse_sorted_query_override(
return q, s
class BeetsLabelsPlugin(BeetsPlugin):
# item_queries = { 'has_label': DelimitedHasExact }
# item_types = {'mylabels': types.SEMICOLON_SPACE_DSV}
def __init__(self):
super().__init__()
self.item_queries = {"label": HasLabelQuery}
self.item_types = {LABELS_FIELD_NAME: KeyValueDelimitedString()}
playlists_config = self.config['playlists'].get()
initialize_playlists(playlists_config)

View file

@ -20,7 +20,7 @@ def do_modify_labels(lib, objs, label, action):
labels = {}
if action != 2:
if LABELS_FIELD_NAME in obj:
labels = json.loads(obj[LABELS_FIELD_NAME])
labels = dict(obj[LABELS_FIELD_NAME]) # Make a copy
split = label.split(":")
@ -33,7 +33,7 @@ def do_modify_labels(lib, objs, label, action):
del labels[label]
obj_mods = {
LABELS_FIELD_NAME: json.dumps(labels)
LABELS_FIELD_NAME: labels
}
if print_and_modify(obj, obj_mods, []) and obj not in changed:
changed.append(obj)
@ -94,8 +94,8 @@ def modify_labels(lib, opts, args):
if actnum == 3:
for obj in items:
if LABELS_FIELD_NAME in obj:
labels = json.loads(obj[LABELS_FIELD_NAME])
labelstr = ";".join([f"{key}:{labels[key]}" for key in labels.keys()])
labels = obj[LABELS_FIELD_NAME]
labelstr = "; ".join([f"{key}:{labels[key]}" for key in labels.keys()])
print_(f"{obj.title}: {labelstr}")
else:
do_modify_labels(lib, items, label, actnum)
@ -141,13 +141,13 @@ def transfer_labels(lib, opts, args):
print_("Source has no labels to transfer.")
return
labels = json.loads(source[LABELS_FIELD_NAME])
labelstr = ";".join([f"{key}:{labels[key]}" for key in labels.keys()])
labels = source[LABELS_FIELD_NAME]
labelstr = "; ".join([f"{key}:{labels[key]}" for key in labels.keys()])
print_(f"Labels to transfer: {labelstr}")
if LABELS_FIELD_NAME in dest and dest[LABELS_FIELD_NAME]:
dest_labels = json.loads(dest[LABELS_FIELD_NAME])
dest_labelstr = ";".join([f"{key}:{dest_labels[key]}" for key in dest_labels.keys()])
dest_labels = dest[LABELS_FIELD_NAME]
dest_labelstr = "; ".join([f"{key}:{dest_labels[key]}" for key in dest_labels.keys()])
print_(f"WARNING: Destination already has labels: {dest_labelstr}")
print_("These will be overwritten!")
print_("")
@ -178,7 +178,7 @@ def edit_labels(lib, opts, args):
for item in items_list:
labels = {}
if LABELS_FIELD_NAME in item and item[LABELS_FIELD_NAME]:
labels = json.loads(item[LABELS_FIELD_NAME])
labels = item[LABELS_FIELD_NAME]
# Format: artist - title:\n - key: value
if item.artist:
@ -261,7 +261,7 @@ def edit_labels(lib, opts, args):
# Compare with old labels
old_labels = {}
if LABELS_FIELD_NAME in item and item[LABELS_FIELD_NAME]:
old_labels = json.loads(item[LABELS_FIELD_NAME])
old_labels = item[LABELS_FIELD_NAME]
if new_labels != old_labels:
changes.append((item, new_labels))
@ -277,10 +277,10 @@ def edit_labels(lib, opts, args):
for item, new_labels in changes:
old_labels = {}
if LABELS_FIELD_NAME in item and item[LABELS_FIELD_NAME]:
old_labels = json.loads(item[LABELS_FIELD_NAME])
old_labels = item[LABELS_FIELD_NAME]
old_str = ";".join([f"{k}:{v}" for k, v in sorted(old_labels.items())]) if old_labels else "(none)"
new_str = ";".join([f"{k}:{v}" for k, v in sorted(new_labels.items())]) if new_labels else "(none)"
old_str = "; ".join([f"{k}:{v}" for k, v in sorted(old_labels.items())]) if old_labels else "(none)"
new_str = "; ".join([f"{k}:{v}" for k, v in sorted(new_labels.items())]) if new_labels else "(none)"
print_(f"{item.artist} - {item.title}")
print_(f" Old: {old_str}")
@ -292,7 +292,7 @@ def edit_labels(lib, opts, args):
# Apply changes
with lib.transaction():
for item, new_labels in changes:
item[LABELS_FIELD_NAME] = json.dumps(new_labels)
item[LABELS_FIELD_NAME] = new_labels
item.try_sync(True, False, False)
print_(f"Updated {len(changes)} item(s).")
@ -305,16 +305,14 @@ class HasLabelQuery(FieldQuery):
super().__init__(LABELS_FIELD_NAME, pattern, False)
@classmethod
def value_match(self, pattern, jsonstr):
if jsonstr is not None:
def value_match(self, pattern, labels):
if labels is not None:
label = pattern
value = None
if "." in pattern:
label,value = pattern.split(".")
label, value = pattern.split(".")
labels = json.loads(jsonstr)
if value == None:
if value is None:
return label in labels
else:
return label in labels and str(labels[label]) == value
@ -333,14 +331,12 @@ class LabelValueSort(SlowFieldSort):
def sort(self, objs):
def key(obj):
labels_json = obj.get(LABELS_FIELD_NAME, None)
labels = obj.get(LABELS_FIELD_NAME, None)
if not labels_json:
if not labels:
# No labels, return minimum value for sorting
return float('-inf') if self.ascending else float('inf')
labels = json.loads(labels_json)
if self.label_key:
value = labels.get(self.label_key, float('-inf') if self.ascending else float('inf'))
else:

View file

@ -83,14 +83,12 @@ class PlaylistValueSort(SlowFieldSort):
def sort(self, objs):
def key(obj):
labels_json = obj.get(LABELS_FIELD_NAME, None)
labels = obj.get(LABELS_FIELD_NAME, None)
if not labels_json:
if not labels:
# No labels, return minimum value for sorting
return float('-inf') if self.ascending else float('inf')
labels = json.loads(labels_json)
# If there's a sort expression, evaluate it
if self.sort_expr:
return evaluate_sort_expression(self.sort_expr, labels)