Compare commits

...

3 commits

Author SHA1 Message Date
e576ce1278 Wow, that was simpler than I thought 2026-04-04 14:37:50 -05:00
55205ec0ac Add a sorted_playlist macro 2026-04-04 14:34:32 -05:00
ca8d753cd0 Use eval to compute cost 2026-04-04 14:34:13 -05:00
4 changed files with 174 additions and 38 deletions

View file

@ -63,6 +63,7 @@ beetslabels:
# or: "label:a , label:b" # or: "label:a , label:b"
# in the future? concat: ["label:a", "label:b"] # in the future? concat: ["label:a", "label:b"]
query: "label:effortless" query: "label:effortless"
sort: "effortless + exercise"
smartplaylist: smartplaylist:
relative_to: ~/Music/0beets_playlists relative_to: ~/Music/0beets_playlists
@ -86,7 +87,7 @@ smartplaylist:
- name: "precision.m3u" - name: "precision.m3u"
query: ['plprecision:1'] query: ['plprecision:1']
- name: "exercise.m3u" - name: "exercise.m3u"
query: ['label:exercise-'] # ['^plexercise::^$ plexercise-'] query: ['sorted_playlist:exercise'] # ['^plexercise::^$ plexercise-']
- name: "chiptune.m3u" - name: "chiptune.m3u"
query: ['genre:chiptune', 'genre:8-bit', 'plchiptune:1'] query: ['genre:chiptune', 'genre:8-bit', 'plchiptune:1']

View file

@ -3,7 +3,7 @@ from beets.plugins import BeetsPlugin
from beets.ui import print_ from beets.ui import print_
from beets.dbcore import types from beets.dbcore import types
from typing import Any
import json import json
from beets.dbcore import queryparse, query from beets.dbcore import queryparse, query
@ -18,6 +18,75 @@ import re
from .labels import HasLabelQuery, labels_command, LABELS_FIELD_NAME, LabelValueSort from .labels import HasLabelQuery, labels_command, LABELS_FIELD_NAME, LabelValueSort
from .playlists import initialize_playlists, expand_playlist_query, valid_playlist, PlaylistValueSort from .playlists import initialize_playlists, expand_playlist_query, valid_playlist, PlaylistValueSort
class KeyValueDelimitedString(types.Type):
"""A dict type stored as 'key:value; key2:value2' in the database.
Stores dictionaries with string keys and integer values in a human-readable
semicolon-delimited format. Values default to 1 if not specified.
"""
sql = "TEXT"
query = query.SubstringQuery
model_type = dict
@property
def null(self):
return {}
def format(self, value: dict | None) -> str:
"""Format dict as 'key:value; key2:value2' string."""
if not value:
return ""
# Sort for consistent output
return "; ".join(f"{k}:{v}" for k, v in sorted(value.items()))
def parse(self, string: str) -> dict:
"""Parse 'key:value; key2:value2' string or legacy JSON into dict."""
if not string:
return {}
# Try parsing as JSON first (for backward compatibility)
if string.startswith('{'):
try:
return json.loads(string)
except (json.JSONDecodeError, ValueError):
pass
# Parse as semicolon-delimited format
result = {}
for pair in string.split("; "):
pair = pair.strip()
if not pair:
continue
if ":" in pair:
k, v = pair.split(":", 1)
k = k.strip()
v = v.strip()
try:
result[k] = int(v)
except ValueError:
result[k] = 1
else:
result[pair] = 1
return result
def to_sql(self, model_value: Any) -> str:
"""Convert dict to SQL string."""
if model_value is None:
return ""
return self.format(model_value)
def normalize(self, value: Any) -> dict:
"""Normalize value to dict."""
if value is None:
return {}
if isinstance(value, dict):
return value
if isinstance(value, str):
return self.parse(value)
return {}
class LabelValueSortsDict(dict): class LabelValueSortsDict(dict):
"""Custom dict that returns LabelValueSort for any label:* key.""" """Custom dict that returns LabelValueSort for any label:* key."""
@ -46,10 +115,23 @@ def parse_sorted_query_override(
"""Given a list of strings, create the `Query` and `Sort` that they """Given a list of strings, create the `Query` and `Sort` that they
represent. represent.
""" """
# First, expand any playlist: macros # First, expand any playlist: and sorted_playlist: macros
expanded_parts = [] expanded_parts = []
for part in parts: for part in parts:
if part.startswith('playlist:') and not re.match(r"[+-]$", part): if part.startswith('sorted_playlist:'):
# Extract playlist name
playlist_name = part[len("sorted_playlist:"):]
if valid_playlist(playlist_name):
# Expand to the query
expanded_parts.extend(expand_playlist_query(playlist_name))
# Inject the sort
expanded_parts.append(f"playlist:{playlist_name}-")
else:
# Unknown playlist, just let it go through
expanded_parts.append(part)
elif part.startswith('playlist:') and not re.match(r"[+-]$", part):
playlist_name = part[len("playlist:"):] playlist_name = part[len("playlist:"):]
if valid_playlist(playlist_name): if valid_playlist(playlist_name):
@ -104,12 +186,10 @@ def parse_sorted_query_override(
return q, s return q, s
class BeetsLabelsPlugin(BeetsPlugin): class BeetsLabelsPlugin(BeetsPlugin):
# item_queries = { 'has_label': DelimitedHasExact }
# item_types = {'mylabels': types.SEMICOLON_SPACE_DSV}
def __init__(self): def __init__(self):
super().__init__() super().__init__()
self.item_queries = {"label": HasLabelQuery} self.item_queries = {"label": HasLabelQuery}
self.item_types = {LABELS_FIELD_NAME: KeyValueDelimitedString()}
playlists_config = self.config['playlists'].get() playlists_config = self.config['playlists'].get()
initialize_playlists(playlists_config) initialize_playlists(playlists_config)

View file

@ -20,7 +20,7 @@ def do_modify_labels(lib, objs, label, action):
labels = {} labels = {}
if action != 2: if action != 2:
if LABELS_FIELD_NAME in obj: if LABELS_FIELD_NAME in obj:
labels = json.loads(obj[LABELS_FIELD_NAME]) labels = dict(obj[LABELS_FIELD_NAME]) # Make a copy
split = label.split(":") split = label.split(":")
@ -33,7 +33,7 @@ def do_modify_labels(lib, objs, label, action):
del labels[label] del labels[label]
obj_mods = { obj_mods = {
LABELS_FIELD_NAME: json.dumps(labels) LABELS_FIELD_NAME: labels
} }
if print_and_modify(obj, obj_mods, []) and obj not in changed: if print_and_modify(obj, obj_mods, []) and obj not in changed:
changed.append(obj) changed.append(obj)
@ -94,8 +94,8 @@ def modify_labels(lib, opts, args):
if actnum == 3: if actnum == 3:
for obj in items: for obj in items:
if LABELS_FIELD_NAME in obj: if LABELS_FIELD_NAME in obj:
labels = json.loads(obj[LABELS_FIELD_NAME]) labels = obj[LABELS_FIELD_NAME]
labelstr = ";".join([f"{key}:{labels[key]}" for key in labels.keys()]) labelstr = "; ".join([f"{key}:{labels[key]}" for key in labels.keys()])
print_(f"{obj.title}: {labelstr}") print_(f"{obj.title}: {labelstr}")
else: else:
do_modify_labels(lib, items, label, actnum) do_modify_labels(lib, items, label, actnum)
@ -141,13 +141,13 @@ def transfer_labels(lib, opts, args):
print_("Source has no labels to transfer.") print_("Source has no labels to transfer.")
return return
labels = json.loads(source[LABELS_FIELD_NAME]) labels = source[LABELS_FIELD_NAME]
labelstr = ";".join([f"{key}:{labels[key]}" for key in labels.keys()]) labelstr = "; ".join([f"{key}:{labels[key]}" for key in labels.keys()])
print_(f"Labels to transfer: {labelstr}") print_(f"Labels to transfer: {labelstr}")
if LABELS_FIELD_NAME in dest and dest[LABELS_FIELD_NAME]: if LABELS_FIELD_NAME in dest and dest[LABELS_FIELD_NAME]:
dest_labels = json.loads(dest[LABELS_FIELD_NAME]) dest_labels = dest[LABELS_FIELD_NAME]
dest_labelstr = ";".join([f"{key}:{dest_labels[key]}" for key in dest_labels.keys()]) dest_labelstr = "; ".join([f"{key}:{dest_labels[key]}" for key in dest_labels.keys()])
print_(f"WARNING: Destination already has labels: {dest_labelstr}") print_(f"WARNING: Destination already has labels: {dest_labelstr}")
print_("These will be overwritten!") print_("These will be overwritten!")
print_("") print_("")
@ -178,7 +178,7 @@ def edit_labels(lib, opts, args):
for item in items_list: for item in items_list:
labels = {} labels = {}
if LABELS_FIELD_NAME in item and item[LABELS_FIELD_NAME]: if LABELS_FIELD_NAME in item and item[LABELS_FIELD_NAME]:
labels = json.loads(item[LABELS_FIELD_NAME]) labels = item[LABELS_FIELD_NAME]
# Format: artist - title:\n - key: value # Format: artist - title:\n - key: value
if item.artist: if item.artist:
@ -261,7 +261,7 @@ def edit_labels(lib, opts, args):
# Compare with old labels # Compare with old labels
old_labels = {} old_labels = {}
if LABELS_FIELD_NAME in item and item[LABELS_FIELD_NAME]: if LABELS_FIELD_NAME in item and item[LABELS_FIELD_NAME]:
old_labels = json.loads(item[LABELS_FIELD_NAME]) old_labels = item[LABELS_FIELD_NAME]
if new_labels != old_labels: if new_labels != old_labels:
changes.append((item, new_labels)) changes.append((item, new_labels))
@ -277,10 +277,10 @@ def edit_labels(lib, opts, args):
for item, new_labels in changes: for item, new_labels in changes:
old_labels = {} old_labels = {}
if LABELS_FIELD_NAME in item and item[LABELS_FIELD_NAME]: if LABELS_FIELD_NAME in item and item[LABELS_FIELD_NAME]:
old_labels = json.loads(item[LABELS_FIELD_NAME]) old_labels = item[LABELS_FIELD_NAME]
old_str = ";".join([f"{k}:{v}" for k, v in sorted(old_labels.items())]) if old_labels else "(none)" old_str = "; ".join([f"{k}:{v}" for k, v in sorted(old_labels.items())]) if old_labels else "(none)"
new_str = ";".join([f"{k}:{v}" for k, v in sorted(new_labels.items())]) if new_labels else "(none)" new_str = "; ".join([f"{k}:{v}" for k, v in sorted(new_labels.items())]) if new_labels else "(none)"
print_(f"{item.artist} - {item.title}") print_(f"{item.artist} - {item.title}")
print_(f" Old: {old_str}") print_(f" Old: {old_str}")
@ -292,7 +292,7 @@ def edit_labels(lib, opts, args):
# Apply changes # Apply changes
with lib.transaction(): with lib.transaction():
for item, new_labels in changes: for item, new_labels in changes:
item[LABELS_FIELD_NAME] = json.dumps(new_labels) item[LABELS_FIELD_NAME] = new_labels
item.try_sync(True, False, False) item.try_sync(True, False, False)
print_(f"Updated {len(changes)} item(s).") print_(f"Updated {len(changes)} item(s).")
@ -305,16 +305,14 @@ class HasLabelQuery(FieldQuery):
super().__init__(LABELS_FIELD_NAME, pattern, False) super().__init__(LABELS_FIELD_NAME, pattern, False)
@classmethod @classmethod
def value_match(self, pattern, jsonstr): def value_match(self, pattern, labels):
if jsonstr is not None: if labels is not None:
label = pattern label = pattern
value = None value = None
if "." in pattern: if "." in pattern:
label,value = pattern.split(".") label, value = pattern.split(".")
labels = json.loads(jsonstr) if value is None:
if value == None:
return label in labels return label in labels
else: else:
return label in labels and str(labels[label]) == value return label in labels and str(labels[label]) == value
@ -333,14 +331,12 @@ class LabelValueSort(SlowFieldSort):
def sort(self, objs): def sort(self, objs):
def key(obj): def key(obj):
labels_json = obj.get(LABELS_FIELD_NAME, None) labels = obj.get(LABELS_FIELD_NAME, None)
if not labels_json: if not labels:
# No labels, return minimum value for sorting # No labels, return minimum value for sorting
return float('-inf') if self.ascending else float('inf') return float('-inf') if self.ascending else float('inf')
labels = json.loads(labels_json)
if self.label_key: if self.label_key:
value = labels.get(self.label_key, float('-inf') if self.ascending else float('inf')) value = labels.get(self.label_key, float('-inf') if self.ascending else float('inf'))
else: else:

View file

@ -5,6 +5,8 @@ from beets.dbcore.query import SlowFieldSort
from .labels import LABELS_FIELD_NAME from .labels import LABELS_FIELD_NAME
import json import json
import ast
import re
playlist_config = {} playlist_config = {}
@ -12,37 +14,94 @@ def initialize_playlists(playlists):
for playlist in playlists: for playlist in playlists:
pl_name = playlist["name"] pl_name = playlist["name"]
query = [playlist["query"], ",", f"label:{pl_name}"] query = [playlist["query"], ",", f"label:{pl_name}"]
playlist_config[pl_name] = query sort_expr = playlist.get("sort", None)
playlist_config[pl_name] = {
"query": query,
"sort": sort_expr
}
def valid_playlist(playlist_name): def valid_playlist(playlist_name):
return playlist_name in playlist_config return playlist_name in playlist_config
def expand_playlist_query(playlist_name): def expand_playlist_query(playlist_name):
print(playlist_config[playlist_name]) return playlist_config[playlist_name]["query"]
return playlist_config[playlist_name]
def get_sort_expression(playlist_name):
"""Get the sort expression for a playlist, or None if not defined."""
return playlist_config[playlist_name].get("sort")
def extract_label_names(expression):
"""Extract label names from a Python expression.
Returns a set of identifier names found in the expression.
"""
try:
tree = ast.parse(expression, mode='eval')
label_names = set()
for node in ast.walk(tree):
if isinstance(node, ast.Name):
label_names.add(node.id)
return label_names
except SyntaxError:
return set()
def evaluate_sort_expression(expression, labels):
"""Evaluate a sort expression with label values.
Args:
expression: Python expression like "effortless + exercise * 2"
labels: Dict of label names to values
Returns:
The evaluated result, or a default value if evaluation fails.
"""
if not expression:
return 0
# Extract label names from the expression
label_names = extract_label_names(expression)
# Build a namespace with label values (defaulting to 0 for missing labels)
namespace = {name: labels.get(name, 0) for name in label_names}
try:
# Evaluate with restricted builtins for safety
result = eval(expression, {"__builtins__": {}}, namespace)
return float(result) if result is not None else 0
except Exception:
# If evaluation fails, return 0
return 0
class PlaylistValueSort(SlowFieldSort): class PlaylistValueSort(SlowFieldSort):
def __init__(self, field, ascending=True, case_insensitive=True): def __init__(self, field, ascending=True, case_insensitive=True):
super().__init__(field, ascending, case_insensitive) super().__init__(field, ascending, case_insensitive)
self.playlist_key = field[len("playlist:"):] self.playlist_key = field[len("playlist:"):]
self.sort_expr = get_sort_expression(self.playlist_key)
def sort(self, objs): def sort(self, objs):
def key(obj): def key(obj):
labels_json = obj.get(LABELS_FIELD_NAME, None) labels = obj.get(LABELS_FIELD_NAME, None)
if not labels_json: if not labels:
# No labels, return minimum value for sorting # No labels, return minimum value for sorting
return float('-inf') if self.ascending else float('inf') return float('-inf') if self.ascending else float('inf')
labels = json.loads(labels_json) # If there's a sort expression, evaluate it
if self.sort_expr:
return evaluate_sort_expression(self.sort_expr, labels)
# Otherwise, use the old behavior:
# Check if there's a label with the playlist name
if self.playlist_key in labels: if self.playlist_key in labels:
return labels[self.playlist_key] return labels[self.playlist_key]
# Sum all matching label values from the playlist query
matching_labels = \ matching_labels = \
[label for label in labels [label for label in labels
if label in playlist_config[self.playlist_key]] if label in playlist_config[self.playlist_key]["query"]]
if len(matching_labels) == 0: if len(matching_labels) == 0:
return float('-inf') if self.ascending else float('inf') return float('-inf') if self.ascending else float('inf')