Compare commits
3 commits
a5ccea6b33
...
e576ce1278
| Author | SHA1 | Date | |
|---|---|---|---|
| e576ce1278 | |||
| 55205ec0ac | |||
| ca8d753cd0 |
4 changed files with 174 additions and 38 deletions
|
|
@ -63,6 +63,7 @@ beetslabels:
|
||||||
# or: "label:a , label:b"
|
# or: "label:a , label:b"
|
||||||
# in the future? concat: ["label:a", "label:b"]
|
# in the future? concat: ["label:a", "label:b"]
|
||||||
query: "label:effortless"
|
query: "label:effortless"
|
||||||
|
sort: "effortless + exercise"
|
||||||
|
|
||||||
smartplaylist:
|
smartplaylist:
|
||||||
relative_to: ~/Music/0beets_playlists
|
relative_to: ~/Music/0beets_playlists
|
||||||
|
|
@ -86,7 +87,7 @@ smartplaylist:
|
||||||
- name: "precision.m3u"
|
- name: "precision.m3u"
|
||||||
query: ['plprecision:1']
|
query: ['plprecision:1']
|
||||||
- name: "exercise.m3u"
|
- name: "exercise.m3u"
|
||||||
query: ['label:exercise-'] # ['^plexercise::^$ plexercise-']
|
query: ['sorted_playlist:exercise'] # ['^plexercise::^$ plexercise-']
|
||||||
- name: "chiptune.m3u"
|
- name: "chiptune.m3u"
|
||||||
query: ['genre:chiptune', 'genre:8-bit', 'plchiptune:1']
|
query: ['genre:chiptune', 'genre:8-bit', 'plchiptune:1']
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -3,7 +3,7 @@ from beets.plugins import BeetsPlugin
|
||||||
from beets.ui import print_
|
from beets.ui import print_
|
||||||
|
|
||||||
from beets.dbcore import types
|
from beets.dbcore import types
|
||||||
|
from typing import Any
|
||||||
import json
|
import json
|
||||||
|
|
||||||
from beets.dbcore import queryparse, query
|
from beets.dbcore import queryparse, query
|
||||||
|
|
@ -18,6 +18,75 @@ import re
|
||||||
from .labels import HasLabelQuery, labels_command, LABELS_FIELD_NAME, LabelValueSort
|
from .labels import HasLabelQuery, labels_command, LABELS_FIELD_NAME, LabelValueSort
|
||||||
from .playlists import initialize_playlists, expand_playlist_query, valid_playlist, PlaylistValueSort
|
from .playlists import initialize_playlists, expand_playlist_query, valid_playlist, PlaylistValueSort
|
||||||
|
|
||||||
|
|
||||||
|
class KeyValueDelimitedString(types.Type):
|
||||||
|
"""A dict type stored as 'key:value; key2:value2' in the database.
|
||||||
|
|
||||||
|
Stores dictionaries with string keys and integer values in a human-readable
|
||||||
|
semicolon-delimited format. Values default to 1 if not specified.
|
||||||
|
"""
|
||||||
|
|
||||||
|
sql = "TEXT"
|
||||||
|
query = query.SubstringQuery
|
||||||
|
model_type = dict
|
||||||
|
|
||||||
|
@property
|
||||||
|
def null(self):
|
||||||
|
return {}
|
||||||
|
|
||||||
|
def format(self, value: dict | None) -> str:
|
||||||
|
"""Format dict as 'key:value; key2:value2' string."""
|
||||||
|
if not value:
|
||||||
|
return ""
|
||||||
|
# Sort for consistent output
|
||||||
|
return "; ".join(f"{k}:{v}" for k, v in sorted(value.items()))
|
||||||
|
|
||||||
|
def parse(self, string: str) -> dict:
|
||||||
|
"""Parse 'key:value; key2:value2' string or legacy JSON into dict."""
|
||||||
|
if not string:
|
||||||
|
return {}
|
||||||
|
|
||||||
|
# Try parsing as JSON first (for backward compatibility)
|
||||||
|
if string.startswith('{'):
|
||||||
|
try:
|
||||||
|
return json.loads(string)
|
||||||
|
except (json.JSONDecodeError, ValueError):
|
||||||
|
pass
|
||||||
|
|
||||||
|
# Parse as semicolon-delimited format
|
||||||
|
result = {}
|
||||||
|
for pair in string.split("; "):
|
||||||
|
pair = pair.strip()
|
||||||
|
if not pair:
|
||||||
|
continue
|
||||||
|
if ":" in pair:
|
||||||
|
k, v = pair.split(":", 1)
|
||||||
|
k = k.strip()
|
||||||
|
v = v.strip()
|
||||||
|
try:
|
||||||
|
result[k] = int(v)
|
||||||
|
except ValueError:
|
||||||
|
result[k] = 1
|
||||||
|
else:
|
||||||
|
result[pair] = 1
|
||||||
|
return result
|
||||||
|
|
||||||
|
def to_sql(self, model_value: Any) -> str:
|
||||||
|
"""Convert dict to SQL string."""
|
||||||
|
if model_value is None:
|
||||||
|
return ""
|
||||||
|
return self.format(model_value)
|
||||||
|
|
||||||
|
def normalize(self, value: Any) -> dict:
|
||||||
|
"""Normalize value to dict."""
|
||||||
|
if value is None:
|
||||||
|
return {}
|
||||||
|
if isinstance(value, dict):
|
||||||
|
return value
|
||||||
|
if isinstance(value, str):
|
||||||
|
return self.parse(value)
|
||||||
|
return {}
|
||||||
|
|
||||||
class LabelValueSortsDict(dict):
|
class LabelValueSortsDict(dict):
|
||||||
"""Custom dict that returns LabelValueSort for any label:* key."""
|
"""Custom dict that returns LabelValueSort for any label:* key."""
|
||||||
|
|
||||||
|
|
@ -46,10 +115,23 @@ def parse_sorted_query_override(
|
||||||
"""Given a list of strings, create the `Query` and `Sort` that they
|
"""Given a list of strings, create the `Query` and `Sort` that they
|
||||||
represent.
|
represent.
|
||||||
"""
|
"""
|
||||||
# First, expand any playlist: macros
|
# First, expand any playlist: and sorted_playlist: macros
|
||||||
expanded_parts = []
|
expanded_parts = []
|
||||||
|
|
||||||
for part in parts:
|
for part in parts:
|
||||||
if part.startswith('playlist:') and not re.match(r"[+-]$", part):
|
if part.startswith('sorted_playlist:'):
|
||||||
|
# Extract playlist name
|
||||||
|
playlist_name = part[len("sorted_playlist:"):]
|
||||||
|
|
||||||
|
if valid_playlist(playlist_name):
|
||||||
|
# Expand to the query
|
||||||
|
expanded_parts.extend(expand_playlist_query(playlist_name))
|
||||||
|
# Inject the sort
|
||||||
|
expanded_parts.append(f"playlist:{playlist_name}-")
|
||||||
|
else:
|
||||||
|
# Unknown playlist, just let it go through
|
||||||
|
expanded_parts.append(part)
|
||||||
|
elif part.startswith('playlist:') and not re.match(r"[+-]$", part):
|
||||||
playlist_name = part[len("playlist:"):]
|
playlist_name = part[len("playlist:"):]
|
||||||
|
|
||||||
if valid_playlist(playlist_name):
|
if valid_playlist(playlist_name):
|
||||||
|
|
@ -104,12 +186,10 @@ def parse_sorted_query_override(
|
||||||
return q, s
|
return q, s
|
||||||
|
|
||||||
class BeetsLabelsPlugin(BeetsPlugin):
|
class BeetsLabelsPlugin(BeetsPlugin):
|
||||||
# item_queries = { 'has_label': DelimitedHasExact }
|
|
||||||
# item_types = {'mylabels': types.SEMICOLON_SPACE_DSV}
|
|
||||||
|
|
||||||
def __init__(self):
|
def __init__(self):
|
||||||
super().__init__()
|
super().__init__()
|
||||||
self.item_queries = {"label": HasLabelQuery}
|
self.item_queries = {"label": HasLabelQuery}
|
||||||
|
self.item_types = {LABELS_FIELD_NAME: KeyValueDelimitedString()}
|
||||||
|
|
||||||
playlists_config = self.config['playlists'].get()
|
playlists_config = self.config['playlists'].get()
|
||||||
initialize_playlists(playlists_config)
|
initialize_playlists(playlists_config)
|
||||||
|
|
|
||||||
|
|
@ -20,7 +20,7 @@ def do_modify_labels(lib, objs, label, action):
|
||||||
labels = {}
|
labels = {}
|
||||||
if action != 2:
|
if action != 2:
|
||||||
if LABELS_FIELD_NAME in obj:
|
if LABELS_FIELD_NAME in obj:
|
||||||
labels = json.loads(obj[LABELS_FIELD_NAME])
|
labels = dict(obj[LABELS_FIELD_NAME]) # Make a copy
|
||||||
|
|
||||||
split = label.split(":")
|
split = label.split(":")
|
||||||
|
|
||||||
|
|
@ -33,7 +33,7 @@ def do_modify_labels(lib, objs, label, action):
|
||||||
del labels[label]
|
del labels[label]
|
||||||
|
|
||||||
obj_mods = {
|
obj_mods = {
|
||||||
LABELS_FIELD_NAME: json.dumps(labels)
|
LABELS_FIELD_NAME: labels
|
||||||
}
|
}
|
||||||
if print_and_modify(obj, obj_mods, []) and obj not in changed:
|
if print_and_modify(obj, obj_mods, []) and obj not in changed:
|
||||||
changed.append(obj)
|
changed.append(obj)
|
||||||
|
|
@ -94,8 +94,8 @@ def modify_labels(lib, opts, args):
|
||||||
if actnum == 3:
|
if actnum == 3:
|
||||||
for obj in items:
|
for obj in items:
|
||||||
if LABELS_FIELD_NAME in obj:
|
if LABELS_FIELD_NAME in obj:
|
||||||
labels = json.loads(obj[LABELS_FIELD_NAME])
|
labels = obj[LABELS_FIELD_NAME]
|
||||||
labelstr = ";".join([f"{key}:{labels[key]}" for key in labels.keys()])
|
labelstr = "; ".join([f"{key}:{labels[key]}" for key in labels.keys()])
|
||||||
print_(f"{obj.title}: {labelstr}")
|
print_(f"{obj.title}: {labelstr}")
|
||||||
else:
|
else:
|
||||||
do_modify_labels(lib, items, label, actnum)
|
do_modify_labels(lib, items, label, actnum)
|
||||||
|
|
@ -141,13 +141,13 @@ def transfer_labels(lib, opts, args):
|
||||||
print_("Source has no labels to transfer.")
|
print_("Source has no labels to transfer.")
|
||||||
return
|
return
|
||||||
|
|
||||||
labels = json.loads(source[LABELS_FIELD_NAME])
|
labels = source[LABELS_FIELD_NAME]
|
||||||
labelstr = ";".join([f"{key}:{labels[key]}" for key in labels.keys()])
|
labelstr = "; ".join([f"{key}:{labels[key]}" for key in labels.keys()])
|
||||||
print_(f"Labels to transfer: {labelstr}")
|
print_(f"Labels to transfer: {labelstr}")
|
||||||
|
|
||||||
if LABELS_FIELD_NAME in dest and dest[LABELS_FIELD_NAME]:
|
if LABELS_FIELD_NAME in dest and dest[LABELS_FIELD_NAME]:
|
||||||
dest_labels = json.loads(dest[LABELS_FIELD_NAME])
|
dest_labels = dest[LABELS_FIELD_NAME]
|
||||||
dest_labelstr = ";".join([f"{key}:{dest_labels[key]}" for key in dest_labels.keys()])
|
dest_labelstr = "; ".join([f"{key}:{dest_labels[key]}" for key in dest_labels.keys()])
|
||||||
print_(f"WARNING: Destination already has labels: {dest_labelstr}")
|
print_(f"WARNING: Destination already has labels: {dest_labelstr}")
|
||||||
print_("These will be overwritten!")
|
print_("These will be overwritten!")
|
||||||
print_("")
|
print_("")
|
||||||
|
|
@ -178,7 +178,7 @@ def edit_labels(lib, opts, args):
|
||||||
for item in items_list:
|
for item in items_list:
|
||||||
labels = {}
|
labels = {}
|
||||||
if LABELS_FIELD_NAME in item and item[LABELS_FIELD_NAME]:
|
if LABELS_FIELD_NAME in item and item[LABELS_FIELD_NAME]:
|
||||||
labels = json.loads(item[LABELS_FIELD_NAME])
|
labels = item[LABELS_FIELD_NAME]
|
||||||
|
|
||||||
# Format: artist - title:\n - key: value
|
# Format: artist - title:\n - key: value
|
||||||
if item.artist:
|
if item.artist:
|
||||||
|
|
@ -261,7 +261,7 @@ def edit_labels(lib, opts, args):
|
||||||
# Compare with old labels
|
# Compare with old labels
|
||||||
old_labels = {}
|
old_labels = {}
|
||||||
if LABELS_FIELD_NAME in item and item[LABELS_FIELD_NAME]:
|
if LABELS_FIELD_NAME in item and item[LABELS_FIELD_NAME]:
|
||||||
old_labels = json.loads(item[LABELS_FIELD_NAME])
|
old_labels = item[LABELS_FIELD_NAME]
|
||||||
|
|
||||||
if new_labels != old_labels:
|
if new_labels != old_labels:
|
||||||
changes.append((item, new_labels))
|
changes.append((item, new_labels))
|
||||||
|
|
@ -277,10 +277,10 @@ def edit_labels(lib, opts, args):
|
||||||
for item, new_labels in changes:
|
for item, new_labels in changes:
|
||||||
old_labels = {}
|
old_labels = {}
|
||||||
if LABELS_FIELD_NAME in item and item[LABELS_FIELD_NAME]:
|
if LABELS_FIELD_NAME in item and item[LABELS_FIELD_NAME]:
|
||||||
old_labels = json.loads(item[LABELS_FIELD_NAME])
|
old_labels = item[LABELS_FIELD_NAME]
|
||||||
|
|
||||||
old_str = ";".join([f"{k}:{v}" for k, v in sorted(old_labels.items())]) if old_labels else "(none)"
|
old_str = "; ".join([f"{k}:{v}" for k, v in sorted(old_labels.items())]) if old_labels else "(none)"
|
||||||
new_str = ";".join([f"{k}:{v}" for k, v in sorted(new_labels.items())]) if new_labels else "(none)"
|
new_str = "; ".join([f"{k}:{v}" for k, v in sorted(new_labels.items())]) if new_labels else "(none)"
|
||||||
|
|
||||||
print_(f"{item.artist} - {item.title}")
|
print_(f"{item.artist} - {item.title}")
|
||||||
print_(f" Old: {old_str}")
|
print_(f" Old: {old_str}")
|
||||||
|
|
@ -292,7 +292,7 @@ def edit_labels(lib, opts, args):
|
||||||
# Apply changes
|
# Apply changes
|
||||||
with lib.transaction():
|
with lib.transaction():
|
||||||
for item, new_labels in changes:
|
for item, new_labels in changes:
|
||||||
item[LABELS_FIELD_NAME] = json.dumps(new_labels)
|
item[LABELS_FIELD_NAME] = new_labels
|
||||||
item.try_sync(True, False, False)
|
item.try_sync(True, False, False)
|
||||||
|
|
||||||
print_(f"Updated {len(changes)} item(s).")
|
print_(f"Updated {len(changes)} item(s).")
|
||||||
|
|
@ -305,16 +305,14 @@ class HasLabelQuery(FieldQuery):
|
||||||
super().__init__(LABELS_FIELD_NAME, pattern, False)
|
super().__init__(LABELS_FIELD_NAME, pattern, False)
|
||||||
|
|
||||||
@classmethod
|
@classmethod
|
||||||
def value_match(self, pattern, jsonstr):
|
def value_match(self, pattern, labels):
|
||||||
if jsonstr is not None:
|
if labels is not None:
|
||||||
label = pattern
|
label = pattern
|
||||||
value = None
|
value = None
|
||||||
if "." in pattern:
|
if "." in pattern:
|
||||||
label,value = pattern.split(".")
|
label, value = pattern.split(".")
|
||||||
|
|
||||||
labels = json.loads(jsonstr)
|
if value is None:
|
||||||
|
|
||||||
if value == None:
|
|
||||||
return label in labels
|
return label in labels
|
||||||
else:
|
else:
|
||||||
return label in labels and str(labels[label]) == value
|
return label in labels and str(labels[label]) == value
|
||||||
|
|
@ -333,14 +331,12 @@ class LabelValueSort(SlowFieldSort):
|
||||||
|
|
||||||
def sort(self, objs):
|
def sort(self, objs):
|
||||||
def key(obj):
|
def key(obj):
|
||||||
labels_json = obj.get(LABELS_FIELD_NAME, None)
|
labels = obj.get(LABELS_FIELD_NAME, None)
|
||||||
|
|
||||||
if not labels_json:
|
if not labels:
|
||||||
# No labels, return minimum value for sorting
|
# No labels, return minimum value for sorting
|
||||||
return float('-inf') if self.ascending else float('inf')
|
return float('-inf') if self.ascending else float('inf')
|
||||||
|
|
||||||
labels = json.loads(labels_json)
|
|
||||||
|
|
||||||
if self.label_key:
|
if self.label_key:
|
||||||
value = labels.get(self.label_key, float('-inf') if self.ascending else float('inf'))
|
value = labels.get(self.label_key, float('-inf') if self.ascending else float('inf'))
|
||||||
else:
|
else:
|
||||||
|
|
|
||||||
|
|
@ -5,6 +5,8 @@ from beets.dbcore.query import SlowFieldSort
|
||||||
from .labels import LABELS_FIELD_NAME
|
from .labels import LABELS_FIELD_NAME
|
||||||
|
|
||||||
import json
|
import json
|
||||||
|
import ast
|
||||||
|
import re
|
||||||
|
|
||||||
playlist_config = {}
|
playlist_config = {}
|
||||||
|
|
||||||
|
|
@ -12,37 +14,94 @@ def initialize_playlists(playlists):
|
||||||
for playlist in playlists:
|
for playlist in playlists:
|
||||||
pl_name = playlist["name"]
|
pl_name = playlist["name"]
|
||||||
query = [playlist["query"], ",", f"label:{pl_name}"]
|
query = [playlist["query"], ",", f"label:{pl_name}"]
|
||||||
playlist_config[pl_name] = query
|
sort_expr = playlist.get("sort", None)
|
||||||
|
playlist_config[pl_name] = {
|
||||||
|
"query": query,
|
||||||
|
"sort": sort_expr
|
||||||
|
}
|
||||||
|
|
||||||
def valid_playlist(playlist_name):
|
def valid_playlist(playlist_name):
|
||||||
return playlist_name in playlist_config
|
return playlist_name in playlist_config
|
||||||
|
|
||||||
def expand_playlist_query(playlist_name):
|
def expand_playlist_query(playlist_name):
|
||||||
print(playlist_config[playlist_name])
|
return playlist_config[playlist_name]["query"]
|
||||||
return playlist_config[playlist_name]
|
|
||||||
|
def get_sort_expression(playlist_name):
|
||||||
|
"""Get the sort expression for a playlist, or None if not defined."""
|
||||||
|
return playlist_config[playlist_name].get("sort")
|
||||||
|
|
||||||
|
def extract_label_names(expression):
|
||||||
|
"""Extract label names from a Python expression.
|
||||||
|
|
||||||
|
Returns a set of identifier names found in the expression.
|
||||||
|
"""
|
||||||
|
try:
|
||||||
|
tree = ast.parse(expression, mode='eval')
|
||||||
|
label_names = set()
|
||||||
|
|
||||||
|
for node in ast.walk(tree):
|
||||||
|
if isinstance(node, ast.Name):
|
||||||
|
label_names.add(node.id)
|
||||||
|
|
||||||
|
return label_names
|
||||||
|
except SyntaxError:
|
||||||
|
return set()
|
||||||
|
|
||||||
|
def evaluate_sort_expression(expression, labels):
|
||||||
|
"""Evaluate a sort expression with label values.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
expression: Python expression like "effortless + exercise * 2"
|
||||||
|
labels: Dict of label names to values
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
The evaluated result, or a default value if evaluation fails.
|
||||||
|
"""
|
||||||
|
if not expression:
|
||||||
|
return 0
|
||||||
|
|
||||||
|
# Extract label names from the expression
|
||||||
|
label_names = extract_label_names(expression)
|
||||||
|
|
||||||
|
# Build a namespace with label values (defaulting to 0 for missing labels)
|
||||||
|
namespace = {name: labels.get(name, 0) for name in label_names}
|
||||||
|
|
||||||
|
try:
|
||||||
|
# Evaluate with restricted builtins for safety
|
||||||
|
result = eval(expression, {"__builtins__": {}}, namespace)
|
||||||
|
return float(result) if result is not None else 0
|
||||||
|
except Exception:
|
||||||
|
# If evaluation fails, return 0
|
||||||
|
return 0
|
||||||
|
|
||||||
class PlaylistValueSort(SlowFieldSort):
|
class PlaylistValueSort(SlowFieldSort):
|
||||||
def __init__(self, field, ascending=True, case_insensitive=True):
|
def __init__(self, field, ascending=True, case_insensitive=True):
|
||||||
super().__init__(field, ascending, case_insensitive)
|
super().__init__(field, ascending, case_insensitive)
|
||||||
|
|
||||||
self.playlist_key = field[len("playlist:"):]
|
self.playlist_key = field[len("playlist:"):]
|
||||||
|
self.sort_expr = get_sort_expression(self.playlist_key)
|
||||||
|
|
||||||
def sort(self, objs):
|
def sort(self, objs):
|
||||||
def key(obj):
|
def key(obj):
|
||||||
labels_json = obj.get(LABELS_FIELD_NAME, None)
|
labels = obj.get(LABELS_FIELD_NAME, None)
|
||||||
|
|
||||||
if not labels_json:
|
if not labels:
|
||||||
# No labels, return minimum value for sorting
|
# No labels, return minimum value for sorting
|
||||||
return float('-inf') if self.ascending else float('inf')
|
return float('-inf') if self.ascending else float('inf')
|
||||||
|
|
||||||
labels = json.loads(labels_json)
|
# If there's a sort expression, evaluate it
|
||||||
|
if self.sort_expr:
|
||||||
|
return evaluate_sort_expression(self.sort_expr, labels)
|
||||||
|
|
||||||
|
# Otherwise, use the old behavior:
|
||||||
|
# Check if there's a label with the playlist name
|
||||||
if self.playlist_key in labels:
|
if self.playlist_key in labels:
|
||||||
return labels[self.playlist_key]
|
return labels[self.playlist_key]
|
||||||
|
|
||||||
|
# Sum all matching label values from the playlist query
|
||||||
matching_labels = \
|
matching_labels = \
|
||||||
[label for label in labels
|
[label for label in labels
|
||||||
if label in playlist_config[self.playlist_key]]
|
if label in playlist_config[self.playlist_key]["query"]]
|
||||||
|
|
||||||
if len(matching_labels) == 0:
|
if len(matching_labels) == 0:
|
||||||
return float('-inf') if self.ascending else float('inf')
|
return float('-inf') if self.ascending else float('inf')
|
||||||
|
|
|
||||||
Loading…
Reference in a new issue