363 lines
11 KiB
Python
363 lines
11 KiB
Python
from beets import ui
|
|
from beets.ui import Subcommand, print_
|
|
from beets.ui.commands.modify import print_and_modify
|
|
from beets.dbcore import FieldQuery
|
|
from beets.dbcore.query import SlowFieldSort
|
|
|
|
import json
|
|
import os
|
|
import subprocess
|
|
import shlex
|
|
from tempfile import NamedTemporaryFile
|
|
from beets import util
|
|
|
|
LABELS_FIELD_NAME = "mylabels"
|
|
|
|
def do_modify_labels(lib, objs, label, action):
|
|
changed = []
|
|
|
|
for obj in objs:
|
|
labels = {}
|
|
if action != 2:
|
|
if LABELS_FIELD_NAME in obj:
|
|
labels = dict(obj[LABELS_FIELD_NAME]) # Make a copy
|
|
|
|
split = label.split(":")
|
|
|
|
if action == 0:
|
|
if len(split) == 1:
|
|
labels[label] = 1
|
|
else:
|
|
labels[split[0]] = int(split[1])
|
|
elif label in labels:
|
|
del labels[label]
|
|
|
|
obj_mods = {
|
|
LABELS_FIELD_NAME: labels
|
|
}
|
|
if print_and_modify(obj, obj_mods, []) and obj not in changed:
|
|
changed.append(obj)
|
|
|
|
# Still something to do?
|
|
if not changed:
|
|
print_("No changes to make.")
|
|
return
|
|
|
|
# Confirm action.
|
|
changed = ui.input_select_objects(
|
|
"Really modify",
|
|
changed,
|
|
lambda o: print_and_modify(o, mods, dels),
|
|
)
|
|
|
|
# Apply changes to database and files
|
|
with lib.transaction():
|
|
for obj in changed:
|
|
obj.try_sync(True, False, False)
|
|
|
|
action_map = {
|
|
"add": 0,
|
|
"remove": 1,
|
|
"removeall": 2,
|
|
"show": 3,
|
|
"transfer": 4,
|
|
"edit": 5
|
|
}
|
|
|
|
def modify_labels(lib, opts, args):
|
|
if not args:
|
|
labels_command.parser.print_usage()
|
|
return
|
|
|
|
action = args[0]
|
|
|
|
if action not in action_map:
|
|
print_("%s is not a valid action. Run 'beet labels --help' for usage." % action)
|
|
return
|
|
|
|
actnum = action_map[action]
|
|
|
|
if actnum == 4: # transfer
|
|
transfer_labels(lib, opts, args[1:])
|
|
return
|
|
|
|
if actnum == 5: # edit
|
|
edit_labels(lib, opts, args[1:])
|
|
return
|
|
|
|
if actnum >= 2:
|
|
label = None
|
|
query = args[1:]
|
|
else:
|
|
label = args[1]
|
|
query = args[2:]
|
|
|
|
items = lib.items(query=query)
|
|
|
|
if actnum == 3:
|
|
for obj in items:
|
|
if LABELS_FIELD_NAME in obj:
|
|
labels = obj[LABELS_FIELD_NAME]
|
|
labelstr = "; ".join([f"{key}:{labels[key]}" for key in labels.keys()])
|
|
print_(f"{obj.title}: {labelstr}")
|
|
else:
|
|
do_modify_labels(lib, items, label, actnum)
|
|
|
|
def transfer_labels(lib, opts, args):
|
|
source_query = args[0]
|
|
dest_query = args[1]
|
|
|
|
source_items = lib.items(query=source_query)
|
|
dest_items = lib.items(query=dest_query)
|
|
|
|
source_list = list(source_items)
|
|
dest_list = list(dest_items)
|
|
|
|
if len(source_list) == 0:
|
|
print_("No source items found.")
|
|
return
|
|
|
|
if len(dest_list) == 0:
|
|
print_("No destination items found.")
|
|
return
|
|
|
|
if len(source_list) > 1:
|
|
print_("Multiple source items found. Please refine your query to select one source item.")
|
|
for item in source_list:
|
|
print_(f" - {item.artist} - {item.title} ({item.path})")
|
|
return
|
|
|
|
if len(dest_list) > 1:
|
|
print_("Multiple destination items found. Please refine your query to select one destination item.")
|
|
for item in dest_list:
|
|
print_(f" - {item.artist} - {item.title} ({item.path})")
|
|
return
|
|
|
|
source = source_list[0]
|
|
dest = dest_list[0]
|
|
|
|
print_(f"Source: {source.artist} - {source.title}")
|
|
print_(f"Dest: {dest.artist} - {dest.title}")
|
|
print_("")
|
|
|
|
if LABELS_FIELD_NAME not in source or not source[LABELS_FIELD_NAME]:
|
|
print_("Source has no labels to transfer.")
|
|
return
|
|
|
|
labels = source[LABELS_FIELD_NAME]
|
|
labelstr = "; ".join([f"{key}:{labels[key]}" for key in labels.keys()])
|
|
print_(f"Labels to transfer: {labelstr}")
|
|
|
|
if LABELS_FIELD_NAME in dest and dest[LABELS_FIELD_NAME]:
|
|
dest_labels = dest[LABELS_FIELD_NAME]
|
|
dest_labelstr = "; ".join([f"{key}:{dest_labels[key]}" for key in dest_labels.keys()])
|
|
print_(f"WARNING: Destination already has labels: {dest_labelstr}")
|
|
print_("These will be overwritten!")
|
|
print_("")
|
|
|
|
if not ui.input_yn("Transfer labels? (y/n)", True):
|
|
return
|
|
|
|
with lib.transaction():
|
|
dest[LABELS_FIELD_NAME] = source[LABELS_FIELD_NAME]
|
|
dest.try_sync(True, False, False)
|
|
|
|
print_("Transfer complete.")
|
|
|
|
def edit_labels(lib, opts, args):
|
|
"""Edit labels for items matching the query in a text editor."""
|
|
query = args
|
|
items = lib.items(query=query)
|
|
items_list = list(items)
|
|
|
|
if not items_list:
|
|
print_("No items found.")
|
|
return
|
|
|
|
# Create a temporary file with labels
|
|
temp_file = NamedTemporaryFile(mode='w', suffix='.txt', delete=False, encoding='utf-8')
|
|
|
|
# Write items and their labels to the temp file
|
|
for item in items_list:
|
|
labels = {}
|
|
if LABELS_FIELD_NAME in item and item[LABELS_FIELD_NAME]:
|
|
labels = item[LABELS_FIELD_NAME]
|
|
|
|
# Format: artist - title:\n - key: value
|
|
if item.artist:
|
|
temp_file.write(f"{item.artist} - {item.title}:\n")
|
|
else:
|
|
temp_file.write(f"{item.title}:\n")
|
|
|
|
for key in sorted(labels.keys()):
|
|
temp_file.write(f"- {key}")
|
|
|
|
if labels[key] > 1:
|
|
temp_file.write(f": {labels[key]}")
|
|
|
|
temp_file.write("\n")
|
|
temp_file.write("\n") # Blank line between items
|
|
|
|
temp_file.close()
|
|
|
|
# Open in editor
|
|
editor = util.editor_command()
|
|
cmd = shlex.split(editor)
|
|
cmd.append(temp_file.name)
|
|
|
|
try:
|
|
subprocess.call(cmd)
|
|
except OSError as exc:
|
|
os.remove(temp_file.name)
|
|
raise ui.UserError(f"could not run editor command {cmd[0]!r}: {exc}")
|
|
|
|
# Read back the edited file
|
|
try:
|
|
with open(temp_file.name, 'r', encoding='utf-8') as f:
|
|
lines = f.readlines()
|
|
finally:
|
|
os.remove(temp_file.name)
|
|
|
|
# Parse the edited labels and apply changes
|
|
# Format: "Artist - Title:\n - key: value\n - key2: value2\n\n"
|
|
changes = []
|
|
i = 0
|
|
item_idx = 0
|
|
|
|
while i < len(lines) and item_idx < len(items_list):
|
|
line = lines[i].strip()
|
|
|
|
# Skip blank lines
|
|
if not line:
|
|
i += 1
|
|
continue
|
|
|
|
# Check if this is an item header (ends with :)
|
|
if line.endswith(':'):
|
|
item = items_list[item_idx]
|
|
item_idx += 1
|
|
i += 1
|
|
|
|
# Now read all label lines for this item
|
|
new_labels = {}
|
|
while i < len(lines):
|
|
label_line = lines[i]
|
|
# Check if this is a label line (starts with " - ")
|
|
if label_line.startswith('- '):
|
|
# Parse: " - key: value"
|
|
label_content = label_line.strip()[2:].strip() # Remove " - "
|
|
if ':' in label_content:
|
|
key, val = label_content.split(':', 1)
|
|
key = key.strip()
|
|
val = val.strip()
|
|
try:
|
|
new_labels[key] = int(val)
|
|
except ValueError:
|
|
print_(f"Warning: Invalid value for label '{key}' in item '{item.title}', skipping")
|
|
else:
|
|
new_labels[label_content.strip()] = 1
|
|
i += 1
|
|
else:
|
|
# Not a label line, done with this item
|
|
break
|
|
|
|
# Compare with old labels
|
|
old_labels = {}
|
|
if LABELS_FIELD_NAME in item and item[LABELS_FIELD_NAME]:
|
|
old_labels = item[LABELS_FIELD_NAME]
|
|
|
|
if new_labels != old_labels:
|
|
changes.append((item, new_labels))
|
|
else:
|
|
# Unexpected format, skip
|
|
i += 1
|
|
|
|
if not changes:
|
|
print_("No changes to make.")
|
|
return
|
|
|
|
# Show changes and confirm
|
|
for item, new_labels in changes:
|
|
old_labels = {}
|
|
if LABELS_FIELD_NAME in item and item[LABELS_FIELD_NAME]:
|
|
old_labels = item[LABELS_FIELD_NAME]
|
|
|
|
old_str = "; ".join([f"{k}:{v}" for k, v in sorted(old_labels.items())]) if old_labels else "(none)"
|
|
new_str = "; ".join([f"{k}:{v}" for k, v in sorted(new_labels.items())]) if new_labels else "(none)"
|
|
|
|
print_(f"{item.artist} - {item.title}")
|
|
print_(f" Old: {old_str}")
|
|
print_(f" New: {new_str}")
|
|
|
|
if not ui.input_yn(f"Apply changes to {len(changes)} item(s)? (y/n)", True):
|
|
return
|
|
|
|
# Apply changes
|
|
with lib.transaction():
|
|
for item, new_labels in changes:
|
|
item[LABELS_FIELD_NAME] = new_labels
|
|
item.try_sync(True, False, False)
|
|
|
|
print_(f"Updated {len(changes)} item(s).")
|
|
|
|
labels_command = Subcommand(
|
|
'labels',
|
|
help='Add, remove, or inspect labels on library items'
|
|
)
|
|
labels_command.parser.usage = """%prog <action> [args] [query]
|
|
|
|
Actions:
|
|
add <label>[:<cost>] [query] Add a label (optionally with a cost value)
|
|
remove <label> [query] Remove a label from items
|
|
removeall [query] Remove all labels from items
|
|
show [query] Display labels on items
|
|
transfer <src-query> <dst-query>
|
|
Copy labels from one item to another
|
|
edit [query] Open items' labels in a text editor"""
|
|
labels_command.func = modify_labels
|
|
|
|
class HasLabelQuery(FieldQuery):
|
|
def __init__(self, _, pattern: str, __):
|
|
super().__init__(LABELS_FIELD_NAME, pattern, False)
|
|
|
|
@classmethod
|
|
def value_match(self, pattern, labels):
|
|
if labels is not None:
|
|
label = pattern
|
|
value = None
|
|
if "." in pattern:
|
|
label, value = pattern.split(".")
|
|
|
|
if value is None:
|
|
return label in labels
|
|
else:
|
|
return label in labels and str(labels[label]) == value
|
|
return False
|
|
|
|
class LabelValueSort(SlowFieldSort):
|
|
def __init__(self, field, ascending=True, case_insensitive=True):
|
|
super().__init__(field, ascending, case_insensitive)
|
|
|
|
# Extract the label key from the field name
|
|
# Field format: "label:<labelkey>" or just "labels"
|
|
if field.startswith("label:"):
|
|
self.label_key = field[len("label:"):]
|
|
else:
|
|
self.label_key = None
|
|
|
|
def sort(self, objs):
|
|
def key(obj):
|
|
labels = obj.get(LABELS_FIELD_NAME, None)
|
|
|
|
if not labels:
|
|
# No labels, return minimum value for sorting
|
|
return float('-inf') if self.ascending else float('inf')
|
|
|
|
if self.label_key:
|
|
value = labels.get(self.label_key, float('-inf') if self.ascending else float('inf'))
|
|
else:
|
|
value = max(labels.values()) if labels else (float('-inf') if self.ascending else float('inf'))
|
|
|
|
return value
|
|
|
|
return sorted(objs, key=key, reverse=not self.ascending)
|