beets-config/plugins/labels.py

393 lines
12 KiB
Python

from beets import ui
from beets.ui import Subcommand, print_
from beets.ui.commands.modify import print_and_modify
from beets.dbcore import FieldQuery
from beets.dbcore.query import SlowFieldSort
import re
import json
import os
import subprocess
import shlex
from tempfile import NamedTemporaryFile
from beets import util
LABELS_FIELD_NAME = "mylabels"
def do_modify_labels(lib, objs, label, action):
changed = []
for obj in objs:
labels = {}
if action != 2:
if LABELS_FIELD_NAME in obj:
labels = dict(obj[LABELS_FIELD_NAME]) # Make a copy
split = label.split(":")
if action == 0:
if len(split) == 1:
labels[label] = 1
else:
labels[split[0]] = int(split[1])
elif label in labels:
del labels[label]
obj_mods = {
LABELS_FIELD_NAME: labels
}
if print_and_modify(obj, obj_mods, []) and obj not in changed:
changed.append(obj)
# Still something to do?
if not changed:
print_("No changes to make.")
return
# Confirm action.
changed = ui.input_select_objects(
"Really modify",
changed,
lambda o: print_and_modify(o, mods, dels),
)
# Apply changes to database and files
with lib.transaction():
for obj in changed:
obj.try_sync(True, False, False)
action_map = {
"add": 0,
"remove": 1,
"removeall": 2,
"show": 3,
"transfer": 4,
"edit": 5
}
def modify_labels(lib, opts, args):
if not args:
labels_command.parser.print_usage()
return
action = args[0]
if action not in action_map:
print_("%s is not a valid action. Run 'beet labels --help' for usage." % action)
return
actnum = action_map[action]
if actnum == 4: # transfer
transfer_labels(lib, opts, args[1:])
return
if actnum == 5: # edit
edit_labels(lib, opts, args[1:])
return
if actnum >= 2:
label = None
query = args[1:]
else:
label = args[1]
query = args[2:]
items = lib.items(query=query)
if actnum == 3:
for obj in items:
if LABELS_FIELD_NAME in obj:
labels = obj[LABELS_FIELD_NAME]
labelstr = "; ".join([f"{key}:{labels[key]}" for key in labels.keys()])
print_(f"{obj.title}: {labelstr}")
else:
do_modify_labels(lib, items, label, actnum)
def transfer_labels(lib, opts, args):
source_query = args[0]
dest_query = args[1]
source_items = lib.items(query=source_query)
dest_items = lib.items(query=dest_query)
source_list = list(source_items)
dest_list = list(dest_items)
if len(source_list) == 0:
print_("No source items found.")
return
if len(dest_list) == 0:
print_("No destination items found.")
return
if len(source_list) > 1:
print_("Multiple source items found. Please refine your query to select one source item.")
for item in source_list:
print_(f" - {item.artist} - {item.title} ({item.path})")
return
if len(dest_list) > 1:
print_("Multiple destination items found. Please refine your query to select one destination item.")
for item in dest_list:
print_(f" - {item.artist} - {item.title} ({item.path})")
return
source = source_list[0]
dest = dest_list[0]
print_(f"Source: {source.artist} - {source.title}")
print_(f"Dest: {dest.artist} - {dest.title}")
print_("")
if LABELS_FIELD_NAME not in source or not source[LABELS_FIELD_NAME]:
print_("Source has no labels to transfer.")
return
labels = source[LABELS_FIELD_NAME]
labelstr = "; ".join([f"{key}:{labels[key]}" for key in labels.keys()])
print_(f"Labels to transfer: {labelstr}")
if LABELS_FIELD_NAME in dest and dest[LABELS_FIELD_NAME]:
dest_labels = dest[LABELS_FIELD_NAME]
dest_labelstr = "; ".join([f"{key}:{dest_labels[key]}" for key in dest_labels.keys()])
print_(f"WARNING: Destination already has labels: {dest_labelstr}")
print_("These will be overwritten!")
print_("")
if not ui.input_yn("Transfer labels? (y/n)", True):
return
with lib.transaction():
dest[LABELS_FIELD_NAME] = source[LABELS_FIELD_NAME]
dest.try_sync(True, False, False)
print_("Transfer complete.")
def edit_labels(lib, opts, args):
"""Edit labels for items matching the query in a text editor."""
query = args
items = lib.items(query=query)
items_list = list(items)
if not items_list:
print_("No items found.")
return
# Create a temporary file with labels
temp_file = NamedTemporaryFile(mode='w', suffix='.txt', delete=False, encoding='utf-8')
# Write items and their labels to the temp file
for item in items_list:
labels = {}
if LABELS_FIELD_NAME in item and item[LABELS_FIELD_NAME]:
labels = item[LABELS_FIELD_NAME]
# Format: artist - title:\n - key: value
if item.artist:
temp_file.write(f"{item.artist} - {item.title}:\n")
else:
temp_file.write(f"{item.title}:\n")
for key in sorted(labels.keys()):
temp_file.write(f"- {key}")
if labels[key] > 1:
temp_file.write(f": {labels[key]}")
temp_file.write("\n")
temp_file.write("\n") # Blank line between items
temp_file.close()
# Open in editor
editor = util.editor_command()
cmd = shlex.split(editor)
cmd.append(temp_file.name)
try:
subprocess.call(cmd)
except OSError as exc:
os.remove(temp_file.name)
raise ui.UserError(f"could not run editor command {cmd[0]!r}: {exc}")
# Read back the edited file
try:
with open(temp_file.name, 'r', encoding='utf-8') as f:
lines = f.readlines()
finally:
os.remove(temp_file.name)
# Build a lookup from header string to item
def item_header(item):
if item.artist:
return f"{item.artist} - {item.title}:"
return f"{item.title}:"
item_by_header = {item_header(item): item for item in items_list}
# Parse the edited labels and apply changes
# Format: "Artist - Title:\n - key: value\n - key2: value2\n\n"
changes = []
i = 0
while i < len(lines):
line = lines[i].strip()
# Skip blank lines
if not line:
i += 1
continue
# Check if this is an item header (ends with :)
if line.endswith(':'):
item = item_by_header.get(line)
i += 1
# Now read all label lines for this item
new_labels = {}
while i < len(lines):
label_line = lines[i]
# Check if this is a label line (starts with "- ")
if label_line.startswith('- '):
# Parse: "- key: value"
label_content = label_line.strip()[2:].strip() # Remove "- "
if ':' in label_content:
key, val = label_content.split(':', 1)
key = key.strip()
val = val.strip()
try:
new_labels[key] = int(val)
except ValueError:
if item:
print_(f"Warning: Invalid value for label '{key}' in item '{item.title}', skipping")
else:
new_labels[label_content.strip()] = 1
i += 1
else:
# Not a label line, done with this item
break
if item is None:
print_(f"Warning: Header '{line}' not found in query results, skipping")
continue
# Compare with old labels
old_labels = {}
if LABELS_FIELD_NAME in item and item[LABELS_FIELD_NAME]:
old_labels = item[LABELS_FIELD_NAME]
if new_labels != old_labels:
changes.append((item, new_labels))
else:
# Unexpected format, skip
i += 1
# Items removed from the file get their labels cleared
headers_in_file = set()
i = 0
while i < len(lines):
line = lines[i].strip()
if line.endswith(':') and line in item_by_header:
headers_in_file.add(line)
i += 1
for item in items_list:
header = item_header(item)
if header not in headers_in_file:
old_labels = {}
if LABELS_FIELD_NAME in item and item[LABELS_FIELD_NAME]:
old_labels = item[LABELS_FIELD_NAME]
if old_labels:
changes.append((item, {}))
if not changes:
print_("No changes to make.")
return
# Show changes and confirm
for item, new_labels in changes:
old_labels = {}
if LABELS_FIELD_NAME in item and item[LABELS_FIELD_NAME]:
old_labels = item[LABELS_FIELD_NAME]
old_str = "; ".join([f"{k}:{v}" for k, v in sorted(old_labels.items())]) if old_labels else "(none)"
new_str = "; ".join([f"{k}:{v}" for k, v in sorted(new_labels.items())]) if new_labels else "(none)"
print_(f"{item.artist} - {item.title}")
print_(f" Old: {old_str}")
print_(f" New: {new_str}")
if not ui.input_yn(f"Apply changes to {len(changes)} item(s)? (y/n)", True):
return
# Apply changes
with lib.transaction():
for item, new_labels in changes:
item[LABELS_FIELD_NAME] = new_labels
item.try_sync(True, False, False)
print_(f"Updated {len(changes)} item(s).")
labels_command = Subcommand(
'labels',
help='Add, remove, or inspect labels on library items'
)
labels_command.parser.usage = """%prog <action> [args] [query]
Actions:
add <label>[:<cost>] [query] Add a label (optionally with a cost value)
remove <label> [query] Remove a label from items
removeall [query] Remove all labels from items
show [query] Display labels on items
transfer <src-query> <dst-query>
Copy labels from one item to another
edit [query] Open items' labels in a text editor"""
labels_command.func = modify_labels
class HasLabelQuery(FieldQuery):
def __init__(self, _, pattern: str, __):
super().__init__(LABELS_FIELD_NAME, pattern, False)
@classmethod
def value_match(self, pattern, labels):
if labels is not None:
label = pattern
value = None
if "." in pattern:
label, value = pattern.split(".")
if value is None:
return label in labels
else:
return label in labels and str(labels[label]) == value
return False
class LabelValueSort(SlowFieldSort):
def __init__(self, field, ascending=True, case_insensitive=True):
super().__init__(field, ascending, case_insensitive)
# Extract the label key from the field name
# Field format: "label:<labelkey>" or just "labels"
if field.startswith("label:"):
self.label_key = field[len("label:"):]
else:
self.label_key = None
def sort(self, objs):
def key(obj):
labels = obj.get(LABELS_FIELD_NAME, None)
if not labels:
# No labels, return minimum value for sorting
return float('-inf') if self.ascending else float('inf')
if self.label_key:
value = labels.get(self.label_key, float('-inf') if self.ascending else float('inf'))
else:
value = max(labels.values()) if labels else (float('-inf') if self.ascending else float('inf'))
return value
return sorted(objs, key=key, reverse=not self.ascending)