"""Label support for beets items.

Labels are stored per item as a JSON object (label name -> integer value) in
the flexible field named by ``LABELS_FIELD_NAME``.  This module provides the
``labels`` subcommand (add / remove / removeall / show / transfer / edit), a
query class that matches on labels, and a sort that orders by label value.
"""

import json
import os
import shlex
import subprocess
from tempfile import NamedTemporaryFile

from beets import ui
from beets import util
from beets.dbcore import FieldQuery
from beets.dbcore.query import SlowFieldSort
from beets.ui import Subcommand, print_
from beets.ui.commands import print_and_modify

LABELS_FIELD_NAME = "mylabels"

# Numeric action codes used by ``action_map`` / ``do_modify_labels``.
_ACTION_ADD = 0
_ACTION_REMOVE = 1
_ACTION_REMOVEALL = 2
_ACTION_SHOW = 3
_ACTION_TRANSFER = 4
_ACTION_EDIT = 5

_VALID_ACTIONS_MSG = (
    "Valid actions are: add, remove, removeall, show, transfer, edit"
)


def _load_labels(obj):
    """Return the labels dict stored on *obj*, or ``{}`` if it has none.

    A missing field and an empty field value are both treated as "no labels"
    so callers never hand an empty string to ``json.loads``.
    """
    if LABELS_FIELD_NAME in obj and obj[LABELS_FIELD_NAME]:
        return json.loads(obj[LABELS_FIELD_NAME])
    return {}


def _format_labels(labels, sort=False, empty=""):
    """Render *labels* as ``key:value;key:value``.

    *sort* orders the pairs by key; otherwise dict order is kept.  *empty* is
    returned when there are no labels at all.
    """
    if not labels:
        return empty
    pairs = sorted(labels.items()) if sort else labels.items()
    return ";".join(f"{key}:{value}" for key, value in pairs)


def do_modify_labels(lib, objs, label, action):
    """Add, remove or clear labels on *objs* and sync the confirmed changes.

    *action* is one of the numeric codes from ``action_map``: 0 adds *label*
    (``name`` or ``name:value``; a bare name gets value 1), 1 removes the
    label called *label*, 2 removes all labels (*label* is ignored).

    Raises ``ui.UserError`` when an added label carries a non-integer value.
    """
    add_key = None
    add_value = None
    if action == _ACTION_ADD:
        split = label.split(":")
        if len(split) == 1:
            add_key, add_value = label, 1
        else:
            add_key = split[0]
            try:
                add_value = int(split[1])
            except ValueError as exc:
                raise ui.UserError(
                    f"label value must be an integer: {label!r}"
                ) from exc

    changed = []
    # Per-object modifications, keyed by id(); the objects themselves are kept
    # alive by ``changed``.  Needed so the confirmation prompt can re-display
    # each object's own change.
    mods_by_obj = {}
    for obj in objs:
        if action == _ACTION_REMOVEALL:
            labels = {}
        else:
            labels = _load_labels(obj)
            if action == _ACTION_ADD:
                labels[add_key] = add_value
            elif label in labels:
                del labels[label]

        obj_mods = {LABELS_FIELD_NAME: json.dumps(labels)}
        if print_and_modify(obj, obj_mods, []) and obj not in changed:
            changed.append(obj)
            mods_by_obj[id(obj)] = obj_mods

    # Still something to do?
    if not changed:
        print_("No changes to make.")
        return

    # Confirm action.  The callback previously referenced the undefined names
    # ``mods`` and ``dels`` and raised NameError when invoked.
    changed = ui.input_select_objects(
        "Really modify",
        changed,
        lambda o: print_and_modify(o, mods_by_obj[id(o)], []),
    )

    # Apply changes to database and files
    with lib.transaction():
        for obj in changed:
            obj.try_sync(True, False, False)


action_map = {
    "add": _ACTION_ADD,
    "remove": _ACTION_REMOVE,
    "removeall": _ACTION_REMOVEALL,
    "show": _ACTION_SHOW,
    "transfer": _ACTION_TRANSFER,
    "edit": _ACTION_EDIT,
}


def modify_labels(lib, opts, args):
    """Entry point of the ``labels`` subcommand.

    ``args[0]`` selects the action; the remaining arguments are a label
    followed by a query (add, remove), just a query (removeall, show, edit),
    or a source and a destination query (transfer).

    Raises ``ui.UserError`` when the action or a required label is missing.
    """
    if not args:
        raise ui.UserError(f"no action given. {_VALID_ACTIONS_MSG}")

    action = args[0]
    if action not in action_map:
        print_(f"{action} is not a valid action.")
        print_(_VALID_ACTIONS_MSG)
        return

    actnum = action_map[action]
    if actnum == _ACTION_TRANSFER:
        transfer_labels(lib, opts, args[1:])
        return
    if actnum == _ACTION_EDIT:
        edit_labels(lib, opts, args[1:])
        return

    if actnum >= _ACTION_REMOVEALL:
        # removeall / show take no label, only a query.
        label = None
        query = args[1:]
    else:
        if len(args) < 2:
            raise ui.UserError(f"the {action} action requires a label")
        label = args[1]
        query = args[2:]

    items = lib.items(query=query)
    if actnum == _ACTION_SHOW:
        for obj in items:
            if LABELS_FIELD_NAME in obj:
                print_(f"{obj.title}: {_format_labels(_load_labels(obj))}")
    else:
        do_modify_labels(lib, items, label, actnum)


def transfer_labels(lib, opts, args):
    """Copy the labels of one item onto another, replacing its labels.

    ``args[0]`` and ``args[1]`` are queries that must each match exactly one
    item (source and destination).  The user is asked to confirm before the
    destination is changed.

    Raises ``ui.UserError`` when fewer than two queries are given.
    """
    if len(args) < 2:
        raise ui.UserError(
            "transfer requires a source query and a destination query"
        )
    source_query = args[0]
    dest_query = args[1]

    source_list = list(lib.items(query=source_query))
    dest_list = list(lib.items(query=dest_query))

    if not source_list:
        print_("No source items found.")
        return
    if not dest_list:
        print_("No destination items found.")
        return
    if len(source_list) > 1:
        print_("Multiple source items found. Please refine your query to select one source item.")
        for item in source_list:
            print_(f" - {item.artist} - {item.title} ({item.path})")
        return
    if len(dest_list) > 1:
        print_("Multiple destination items found. Please refine your query to select one destination item.")
        for item in dest_list:
            print_(f" - {item.artist} - {item.title} ({item.path})")
        return

    source = source_list[0]
    dest = dest_list[0]
    print_(f"Source: {source.artist} - {source.title}")
    print_(f"Dest: {dest.artist} - {dest.title}")
    print_("")

    if LABELS_FIELD_NAME not in source or not source[LABELS_FIELD_NAME]:
        print_("Source has no labels to transfer.")
        return

    print_(f"Labels to transfer: {_format_labels(_load_labels(source))}")

    if LABELS_FIELD_NAME in dest and dest[LABELS_FIELD_NAME]:
        dest_labelstr = _format_labels(_load_labels(dest))
        print_(f"WARNING: Destination already has labels: {dest_labelstr}")
        print_("These will be overwritten!")
    print_("")

    if not ui.input_yn("Transfer labels? (y/n)", True):
        return

    with lib.transaction():
        # The raw JSON string is copied as-is.
        dest[LABELS_FIELD_NAME] = source[LABELS_FIELD_NAME]
        dest.try_sync(True, False, False)
    print_("Transfer complete.")


def _serialize_for_edit(items_list):
    """Return the editable text for *items_list*.

    Each item becomes a header line ending in ``:`` followed by one
    ``- key`` or ``- key: value`` line per label and a blank line.
    """
    chunks = []
    for item in items_list:
        labels = _load_labels(item)
        if item.artist:
            chunks.append(f"{item.artist} - {item.title}:\n")
        else:
            chunks.append(f"{item.title}:\n")
        for key in sorted(labels):
            # The parser reads a bare "- key" as value 1, so only the value 1
            # may be left implicit.  (Previously every value <= 1, e.g. 0, was
            # omitted and silently came back as 1.)
            if labels[key] != 1:
                chunks.append(f"- {key}: {labels[key]}\n")
            else:
                chunks.append(f"- {key}\n")
        chunks.append("\n")  # Blank line between items
    return "".join(chunks)


def _parse_edited_labels(lines, items_list):
    """Parse edited *lines* and return ``[(item, new_labels), ...]``.

    Headers (lines ending in ``:``) are matched to *items_list* by position:
    the n-th header belongs to the n-th item.  Label lines must directly
    follow their header and start with ``- `` in the first column.  Only
    items whose labels differ from the stored ones are returned.
    """
    changes = []
    i = 0
    item_idx = 0
    while i < len(lines) and item_idx < len(items_list):
        line = lines[i].strip()
        i += 1
        if not line.endswith(':'):
            # Blank line or unexpected content: skip it.
            continue

        item = items_list[item_idx]
        item_idx += 1

        # Read all label lines that directly follow this header.
        new_labels = {}
        while i < len(lines) and lines[i].startswith('- '):
            label_content = lines[i].strip()[2:].strip()  # drop the "- "
            i += 1
            if ':' in label_content:
                key, val = label_content.split(':', 1)
                key = key.strip()
                val = val.strip()
                try:
                    new_labels[key] = int(val)
                except ValueError:
                    print_(f"Warning: Invalid value for label '{key}' in item '{item.title}', skipping")
            else:
                new_labels[label_content] = 1

        if new_labels != _load_labels(item):
            changes.append((item, new_labels))
    return changes


def edit_labels(lib, opts, args):
    """Edit labels for items matching the query in a text editor.

    The labels of all matching items are written to a temporary file, the
    editor returned by ``util.editor_command()`` is run on it, and the result
    is parsed back.  Changes are shown and applied after confirmation.

    Raises ``ui.UserError`` when the editor command cannot be run.
    """
    query = args
    items_list = list(lib.items(query=query))

    if not items_list:
        print_("No items found.")
        return

    temp_file = NamedTemporaryFile(
        mode='w', suffix='.txt', delete=False, encoding='utf-8'
    )
    try:
        # Closing the handle before the editor opens the file; the file itself
        # survives because of delete=False.
        with temp_file:
            temp_file.write(_serialize_for_edit(items_list))

        # Open in editor
        cmd = shlex.split(util.editor_command())
        cmd.append(temp_file.name)
        try:
            subprocess.call(cmd)
        except OSError as exc:
            raise ui.UserError(
                f"could not run editor command {cmd[0]!r}: {exc}"
            ) from exc

        # Read back the edited file
        with open(temp_file.name, 'r', encoding='utf-8') as f:
            lines = f.readlines()
    finally:
        # Single cleanup point: also covers a failure while writing.
        os.remove(temp_file.name)

    changes = _parse_edited_labels(lines, items_list)
    if not changes:
        print_("No changes to make.")
        return

    # Show changes and confirm
    for item, new_labels in changes:
        old_str = _format_labels(_load_labels(item), sort=True, empty="(none)")
        new_str = _format_labels(new_labels, sort=True, empty="(none)")
        print_(f"{item.artist} - {item.title}")
        print_(f" Old: {old_str}")
        print_(f" New: {new_str}")

    if not ui.input_yn(f"Apply changes to {len(changes)} item(s)? (y/n)", True):
        return

    # Apply changes
    with lib.transaction():
        for item, new_labels in changes:
            item[LABELS_FIELD_NAME] = json.dumps(new_labels)
            item.try_sync(True, False, False)
    print_(f"Updated {len(changes)} item(s).")


labels_command = Subcommand('labels', help='Add or remove labels')
labels_command.func = modify_labels


class HasLabelQuery(FieldQuery):
    """Match items that carry a given label, optionally with a given value.

    The pattern is either ``label`` (the label exists) or ``label.value``
    (the label exists and its value, rendered with ``str``, equals *value*).
    """

    def __init__(self, _, pattern: str, __):
        # The field is always the labels field; the first and third
        # constructor arguments are ignored.
        super().__init__(LABELS_FIELD_NAME, pattern, False)

    @classmethod
    def value_match(cls, pattern, jsonstr):
        """Return whether the JSON labels in *jsonstr* satisfy *pattern*.

        A missing or empty *jsonstr* never matches.
        """
        if not jsonstr:
            return False

        if "." in pattern:
            # Split on the last dot so a pattern with several dots no longer
            # fails to unpack; the text after the last dot is the value.
            label, value = pattern.rsplit(".", 1)
        else:
            label, value = pattern, None

        labels = json.loads(jsonstr)
        if value is None:
            return label in labels
        return label in labels and str(labels[label]) == value


class LabelValueSort(SlowFieldSort):
    """Sort items by a label value.

    With a field of the form ``label:<key>`` items are ordered by the value
    of that label; with any other field name they are ordered by their
    largest label value.
    """

    def __init__(self, field, ascending=True, case_insensitive=True):
        super().__init__(field, ascending, case_insensitive)
        # Extract the label key from the field name.
        # NOTE(review): the field format looks like "label:<key>" or just
        # "labels" - confirm against where this sort is registered.
        if field.startswith("label:"):
            self.label_key = field[len("label:"):]
        else:
            self.label_key = None

    def sort(self, objs):
        """Return *objs* as a new list ordered by label value.

        Items without labels, or without the requested label, use a sentinel
        that flips together with the sort direction, so they come first in
        both ascending and descending order.
        """
        missing = float('-inf') if self.ascending else float('inf')

        def key(obj):
            labels_json = obj.get(LABELS_FIELD_NAME, None)
            if not labels_json:
                return missing
            labels = json.loads(labels_json)
            if self.label_key:
                return labels.get(self.label_key, missing)
            return max(labels.values()) if labels else missing

        return sorted(objs, key=key, reverse=not self.ascending)