from beets import ui from beets.ui import Subcommand, print_ from beets.ui.commands.modify import print_and_modify from beets.dbcore import FieldQuery from beets.dbcore.query import SlowFieldSort import re import json import os import subprocess import shlex from tempfile import NamedTemporaryFile from beets import util LABELS_FIELD_NAME = "mylabels" def do_modify_labels(lib, objs, label, action): changed = [] for obj in objs: labels = {} if action != 2: if LABELS_FIELD_NAME in obj: labels = dict(obj[LABELS_FIELD_NAME]) # Make a copy split = label.split(":") if action == 0: if len(split) == 1: labels[label] = 1 else: labels[split[0]] = int(split[1]) elif label in labels: del labels[label] obj_mods = { LABELS_FIELD_NAME: labels } if print_and_modify(obj, obj_mods, []) and obj not in changed: changed.append(obj) # Still something to do? if not changed: print_("No changes to make.") return # Confirm action. changed = ui.input_select_objects( "Really modify", changed, lambda o: print_and_modify(o, mods, dels), ) # Apply changes to database and files with lib.transaction(): for obj in changed: obj.try_sync(True, False, False) action_map = { "add": 0, "remove": 1, "removeall": 2, "show": 3, "transfer": 4, "edit": 5, "listall": 6, } def modify_labels(lib, opts, args): if not args: labels_command.parser.print_usage() return action = args[0] if action not in action_map: print_("%s is not a valid action. Run 'beet labels --help' for usage." % action) return actnum = action_map[action] if actnum == 4: # transfer transfer_labels(lib, opts, args[1:]) return if actnum == 5: # edit edit_labels(lib, opts, args[1:]) return if actnum == 6: # listall listall_labels(lib, opts, args[1:]) return if actnum >= 2: label = None query = args[1:] else: label = args[1] query = args[2:] items = lib.items(query=query) if actnum == 3: for obj in items: if LABELS_FIELD_NAME in obj: labels = obj[LABELS_FIELD_NAME] labelstr = "; ".join([f"{key}:{labels[key]}" for key in labels.keys()]) print_(f"{obj.title}: {labelstr}") else: do_modify_labels(lib, items, label, actnum) def listall_labels(lib, opts, args): """Print every distinct label key used across all items.""" all_labels = set() for item in lib.items(): if LABELS_FIELD_NAME in item and item[LABELS_FIELD_NAME]: all_labels.update(item[LABELS_FIELD_NAME].keys()) for label in sorted(all_labels): print_(label) def transfer_labels(lib, opts, args): source_query = args[0] dest_query = args[1] source_items = lib.items(query=source_query) dest_items = lib.items(query=dest_query) source_list = list(source_items) dest_list = list(dest_items) if len(source_list) == 0: print_("No source items found.") return if len(dest_list) == 0: print_("No destination items found.") return if len(source_list) > 1: print_("Multiple source items found. Please refine your query to select one source item.") for item in source_list: print_(f" - {item.artist} - {item.title} ({item.path})") return if len(dest_list) > 1: print_("Multiple destination items found. Please refine your query to select one destination item.") for item in dest_list: print_(f" - {item.artist} - {item.title} ({item.path})") return source = source_list[0] dest = dest_list[0] print_(f"Source: {source.artist} - {source.title}") print_(f"Dest: {dest.artist} - {dest.title}") print_("") if LABELS_FIELD_NAME not in source or not source[LABELS_FIELD_NAME]: print_("Source has no labels to transfer.") return labels = source[LABELS_FIELD_NAME] labelstr = "; ".join([f"{key}:{labels[key]}" for key in labels.keys()]) print_(f"Labels to transfer: {labelstr}") if LABELS_FIELD_NAME in dest and dest[LABELS_FIELD_NAME]: dest_labels = dest[LABELS_FIELD_NAME] dest_labelstr = "; ".join([f"{key}:{dest_labels[key]}" for key in dest_labels.keys()]) print_(f"WARNING: Destination already has labels: {dest_labelstr}") print_("These will be overwritten!") print_("") if not ui.input_yn("Transfer labels? (y/n)", True): return with lib.transaction(): dest[LABELS_FIELD_NAME] = source[LABELS_FIELD_NAME] dest.try_sync(True, False, False) print_("Transfer complete.") def edit_labels(lib, opts, args): """Edit labels for items matching the query in a text editor.""" query = args items = lib.items(query=query) items_list = list(items) if not items_list: print_("No items found.") return # Create a temporary file with labels temp_file = NamedTemporaryFile(mode='w', suffix='.txt', delete=False, encoding='utf-8') # Write items and their labels to the temp file for item in items_list: labels = {} if LABELS_FIELD_NAME in item and item[LABELS_FIELD_NAME]: labels = item[LABELS_FIELD_NAME] # Format: artist - title:\n - key: value if item.artist: temp_file.write(f"{item.artist} - {item.title}:\n") else: temp_file.write(f"{item.title}:\n") for key in sorted(labels.keys()): temp_file.write(f"- {key}") if labels[key] > 1: temp_file.write(f": {labels[key]}") temp_file.write("\n") temp_file.write("\n") # Blank line between items temp_file.close() # Open in editor editor = util.editor_command() cmd = shlex.split(editor) cmd.append(temp_file.name) try: subprocess.call(cmd) except OSError as exc: os.remove(temp_file.name) raise ui.UserError(f"could not run editor command {cmd[0]!r}: {exc}") # Read back the edited file try: with open(temp_file.name, 'r', encoding='utf-8') as f: lines = f.readlines() finally: os.remove(temp_file.name) # Build a lookup from header string to item def item_header(item): if item.artist: return f"{item.artist} - {item.title}:" return f"{item.title}:" item_by_header = {item_header(item): item for item in items_list} # Parse the edited labels and apply changes # Format: "Artist - Title:\n - key: value\n - key2: value2\n\n" changes = [] i = 0 while i < len(lines): line = lines[i].strip() # Skip blank lines if not line: i += 1 continue # Check if this is an item header (ends with :) if line.endswith(':'): item = item_by_header.get(line) i += 1 # Now read all label lines for this item new_labels = {} while i < len(lines): label_line = lines[i] # Check if this is a label line (starts with "- ") if label_line.startswith('- '): # Parse: "- key: value" label_content = label_line.strip()[2:].strip() # Remove "- " if ':' in label_content: key, val = label_content.split(':', 1) key = key.strip() val = val.strip() try: new_labels[key] = int(val) except ValueError: if item: print_(f"Warning: Invalid value for label '{key}' in item '{item.title}', skipping") else: new_labels[label_content.strip()] = 1 i += 1 else: # Not a label line, done with this item break if item is None: print_(f"Warning: Header '{line}' not found in query results, skipping") continue # Compare with old labels old_labels = {} if LABELS_FIELD_NAME in item and item[LABELS_FIELD_NAME]: old_labels = item[LABELS_FIELD_NAME] if new_labels != old_labels: changes.append((item, new_labels)) else: # Unexpected format, skip i += 1 # Items removed from the file get their labels cleared headers_in_file = set() i = 0 while i < len(lines): line = lines[i].strip() if line.endswith(':') and line in item_by_header: headers_in_file.add(line) i += 1 for item in items_list: header = item_header(item) if header not in headers_in_file: old_labels = {} if LABELS_FIELD_NAME in item and item[LABELS_FIELD_NAME]: old_labels = item[LABELS_FIELD_NAME] if old_labels: changes.append((item, {})) if not changes: print_("No changes to make.") return # Show changes and confirm for item, new_labels in changes: old_labels = {} if LABELS_FIELD_NAME in item and item[LABELS_FIELD_NAME]: old_labels = item[LABELS_FIELD_NAME] old_str = "; ".join([f"{k}:{v}" for k, v in sorted(old_labels.items())]) if old_labels else "(none)" new_str = "; ".join([f"{k}:{v}" for k, v in sorted(new_labels.items())]) if new_labels else "(none)" print_(f"{item.artist} - {item.title}") print_(f" Old: {old_str}") print_(f" New: {new_str}") if not ui.input_yn(f"Apply changes to {len(changes)} item(s)? (y/n)", True): return # Apply changes with lib.transaction(): for item, new_labels in changes: item[LABELS_FIELD_NAME] = new_labels item.try_sync(True, False, False) print_(f"Updated {len(changes)} item(s).") labels_command = Subcommand( 'labels', help='Add, remove, or inspect labels on library items' ) labels_command.parser.usage = """%prog [args] [query] Actions: add