From dfe0cd243b35401a315b6f3779288f3d8629cb17 Mon Sep 17 00:00:00 2001 From: Benson Chu Date: Sat, 4 Apr 2026 12:32:05 -0500 Subject: [PATCH] Split things up better --- plugins/beetslabels.py | 205 +---------------------------------------- plugins/labels.py | 205 +++++++++++++++++++++++++++++++++++++++++ 2 files changed, 208 insertions(+), 202 deletions(-) create mode 100644 plugins/labels.py diff --git a/plugins/beetslabels.py b/plugins/beetslabels.py index e40086f..a842bfc 100644 --- a/plugins/beetslabels.py +++ b/plugins/beetslabels.py @@ -1,16 +1,13 @@ from beets import ui from beets.plugins import BeetsPlugin -from beets.ui import Subcommand, print_ -from beets.ui.commands import print_and_modify +from beets.ui import print_ from beets.dbcore import types -from beets.dbcore import FieldQuery -import optparse import json from beets.dbcore import queryparse, query -from beets.dbcore.query import Sort, FieldQueryType, SlowFieldSort +from beets.dbcore.query import Sort, FieldQueryType from beets.dbcore.queryparse import query_from_strings, sort_from_strings from beets.library import LibModel @@ -18,203 +15,7 @@ Prefixes = dict[str, FieldQueryType] import re -LABELS_FIELD_NAME = "mylabels" - -def do_modify_labels(lib, objs, label, action): - changed = [] - - for obj in objs: - labels = {} - if action != 2: - if LABELS_FIELD_NAME in obj: - labels = json.loads(obj[LABELS_FIELD_NAME]) - - split = label.split(":") - - if action == 0: - if len(split) == 1: - labels[label] = 0 - else: - labels[split[0]] = int(split[1]) - elif label in labels: - del labels[label] - - obj_mods = { - LABELS_FIELD_NAME: json.dumps(labels) - } - if print_and_modify(obj, obj_mods, []) and obj not in changed: - changed.append(obj) - - # Still something to do? - if not changed: - print_("No changes to make.") - return - - # Confirm action. - changed = ui.input_select_objects( - "Really modify", - changed, - lambda o: print_and_modify(o, mods, dels), - ) - - # Apply changes to database and files - with lib.transaction(): - for obj in changed: - obj.try_sync(True, False, False) - -action_map = { - "add": 0, - "remove": 1, - "removeall": 2, - "show": 3, - "transfer": 4 -} - -def modify_labels(lib, opts, args): - action = args[0] - - if action not in action_map: - print_("%s is not a valid action. " % action) - print_("Valid actions are: add, remove, removeall, show, transfer") - return - - actnum = action_map[action] - - if actnum == 4: # transfer - transfer_labels(lib, opts, args[1:]) - return - - if actnum >= 2: - label = None - query = args[1:] - else: - label = args[1] - query = args[2:] - - items = lib.items(query=query) - - if actnum == 3: - for obj in items: - if LABELS_FIELD_NAME in obj: - labels = json.loads(obj[LABELS_FIELD_NAME]) - labelstr = ";".join([f"{key}:{labels[key]}" for key in labels.keys()]) - print_(f"{obj.title}: {labelstr}") - else: - do_modify_labels(lib, items, label, actnum) - -def transfer_labels(lib, opts, args): - source_query = args[0] - dest_query = args[1] - - source_items = lib.items(query=source_query) - dest_items = lib.items(query=dest_query) - - source_list = list(source_items) - dest_list = list(dest_items) - - if len(source_list) == 0: - print_("No source items found.") - return - - if len(dest_list) == 0: - print_("No destination items found.") - return - - if len(source_list) > 1: - print_("Multiple source items found. Please refine your query to select one source item.") - for item in source_list: - print_(f" - {item.artist} - {item.title} ({item.path})") - return - - if len(dest_list) > 1: - print_("Multiple destination items found. Please refine your query to select one destination item.") - for item in dest_list: - print_(f" - {item.artist} - {item.title} ({item.path})") - return - - source = source_list[0] - dest = dest_list[0] - - print_(f"Source: {source.artist} - {source.title}") - print_(f"Dest: {dest.artist} - {dest.title}") - print_("") - - if LABELS_FIELD_NAME not in source or not source[LABELS_FIELD_NAME]: - print_("Source has no labels to transfer.") - return - - labels = json.loads(source[LABELS_FIELD_NAME]) - labelstr = ";".join([f"{key}:{labels[key]}" for key in labels.keys()]) - print_(f"Labels to transfer: {labelstr}") - - if LABELS_FIELD_NAME in dest and dest[LABELS_FIELD_NAME]: - dest_labels = json.loads(dest[LABELS_FIELD_NAME]) - dest_labelstr = ";".join([f"{key}:{dest_labels[key]}" for key in dest_labels.keys()]) - print_(f"WARNING: Destination already has labels: {dest_labelstr}") - print_("These will be overwritten!") - print_("") - - if not ui.input_yn("Transfer labels? (y/n)", True): - return - - with lib.transaction(): - dest[LABELS_FIELD_NAME] = source[LABELS_FIELD_NAME] - dest.try_sync(True, False, False) - - print_("Transfer complete.") - -labels_command = Subcommand('labels', help='Add or remove labels') -labels_command.func = modify_labels - -class HasLabelQuery(FieldQuery): - def __init__(self, _, pattern: str, __): - super().__init__(LABELS_FIELD_NAME, pattern, False) - - @classmethod - def value_match(self, pattern, jsonstr): - if jsonstr is not None: - label = pattern - value = None - if "." in pattern: - label,value = pattern.split(".") - - labels = json.loads(jsonstr) - - if value == None: - return label in labels - else: - return label in labels and str(labels[label]) == value - return False - -class LabelValueSort(SlowFieldSort): - def __init__(self, field, ascending=True, case_insensitive=True): - super().__init__(field, ascending, case_insensitive) - - # Extract the label key from the field name - # Field format: "label:" or just "labels" - if field.startswith("label:"): - self.label_key = field[len("label:"):] - else: - self.label_key = None - - def sort(self, objs): - def key(obj): - labels_json = obj.get(LABELS_FIELD_NAME, None) - - if not labels_json: - # No labels, return minimum value for sorting - return float('-inf') if self.ascending else float('inf') - - labels = json.loads(labels_json) - - if self.label_key: - value = labels.get(self.label_key, float('-inf') if self.ascending else float('inf')) - else: - value = max(labels.values()) if labels else (float('-inf') if self.ascending else float('inf')) - - return value - - return sorted(objs, key=key, reverse=not self.ascending) +from .labels import HasLabelQuery, labels_command, LABELS_FIELD_NAME, LabelValueSort class LabelValueSortsDict(dict): """Custom dict that returns LabelValueSort for any label:* key.""" diff --git a/plugins/labels.py b/plugins/labels.py new file mode 100644 index 0000000..0734902 --- /dev/null +++ b/plugins/labels.py @@ -0,0 +1,205 @@ +from beets import ui +from beets.ui import Subcommand, print_ +from beets.ui.commands import print_and_modify +from beets.dbcore import FieldQuery +from beets.dbcore.query import SlowFieldSort + +import json + +LABELS_FIELD_NAME = "mylabels" + +def do_modify_labels(lib, objs, label, action): + changed = [] + + for obj in objs: + labels = {} + if action != 2: + if LABELS_FIELD_NAME in obj: + labels = json.loads(obj[LABELS_FIELD_NAME]) + + split = label.split(":") + + if action == 0: + if len(split) == 1: + labels[label] = 0 + else: + labels[split[0]] = int(split[1]) + elif label in labels: + del labels[label] + + obj_mods = { + LABELS_FIELD_NAME: json.dumps(labels) + } + if print_and_modify(obj, obj_mods, []) and obj not in changed: + changed.append(obj) + + # Still something to do? + if not changed: + print_("No changes to make.") + return + + # Confirm action. + changed = ui.input_select_objects( + "Really modify", + changed, + lambda o: print_and_modify(o, mods, dels), + ) + + # Apply changes to database and files + with lib.transaction(): + for obj in changed: + obj.try_sync(True, False, False) + +action_map = { + "add": 0, + "remove": 1, + "removeall": 2, + "show": 3, + "transfer": 4 +} + +def modify_labels(lib, opts, args): + action = args[0] + + if action not in action_map: + print_("%s is not a valid action. " % action) + print_("Valid actions are: add, remove, removeall, show, transfer") + return + + actnum = action_map[action] + + if actnum == 4: # transfer + transfer_labels(lib, opts, args[1:]) + return + + if actnum >= 2: + label = None + query = args[1:] + else: + label = args[1] + query = args[2:] + + items = lib.items(query=query) + + if actnum == 3: + for obj in items: + if LABELS_FIELD_NAME in obj: + labels = json.loads(obj[LABELS_FIELD_NAME]) + labelstr = ";".join([f"{key}:{labels[key]}" for key in labels.keys()]) + print_(f"{obj.title}: {labelstr}") + else: + do_modify_labels(lib, items, label, actnum) + +def transfer_labels(lib, opts, args): + source_query = args[0] + dest_query = args[1] + + source_items = lib.items(query=source_query) + dest_items = lib.items(query=dest_query) + + source_list = list(source_items) + dest_list = list(dest_items) + + if len(source_list) == 0: + print_("No source items found.") + return + + if len(dest_list) == 0: + print_("No destination items found.") + return + + if len(source_list) > 1: + print_("Multiple source items found. Please refine your query to select one source item.") + for item in source_list: + print_(f" - {item.artist} - {item.title} ({item.path})") + return + + if len(dest_list) > 1: + print_("Multiple destination items found. Please refine your query to select one destination item.") + for item in dest_list: + print_(f" - {item.artist} - {item.title} ({item.path})") + return + + source = source_list[0] + dest = dest_list[0] + + print_(f"Source: {source.artist} - {source.title}") + print_(f"Dest: {dest.artist} - {dest.title}") + print_("") + + if LABELS_FIELD_NAME not in source or not source[LABELS_FIELD_NAME]: + print_("Source has no labels to transfer.") + return + + labels = json.loads(source[LABELS_FIELD_NAME]) + labelstr = ";".join([f"{key}:{labels[key]}" for key in labels.keys()]) + print_(f"Labels to transfer: {labelstr}") + + if LABELS_FIELD_NAME in dest and dest[LABELS_FIELD_NAME]: + dest_labels = json.loads(dest[LABELS_FIELD_NAME]) + dest_labelstr = ";".join([f"{key}:{dest_labels[key]}" for key in dest_labels.keys()]) + print_(f"WARNING: Destination already has labels: {dest_labelstr}") + print_("These will be overwritten!") + print_("") + + if not ui.input_yn("Transfer labels? (y/n)", True): + return + + with lib.transaction(): + dest[LABELS_FIELD_NAME] = source[LABELS_FIELD_NAME] + dest.try_sync(True, False, False) + + print_("Transfer complete.") + +labels_command = Subcommand('labels', help='Add or remove labels') +labels_command.func = modify_labels + +class HasLabelQuery(FieldQuery): + def __init__(self, _, pattern: str, __): + super().__init__(LABELS_FIELD_NAME, pattern, False) + + @classmethod + def value_match(self, pattern, jsonstr): + if jsonstr is not None: + label = pattern + value = None + if "." in pattern: + label,value = pattern.split(".") + + labels = json.loads(jsonstr) + + if value == None: + return label in labels + else: + return label in labels and str(labels[label]) == value + return False + +class LabelValueSort(SlowFieldSort): + def __init__(self, field, ascending=True, case_insensitive=True): + super().__init__(field, ascending, case_insensitive) + + # Extract the label key from the field name + # Field format: "label:" or just "labels" + if field.startswith("label:"): + self.label_key = field[len("label:"):] + else: + self.label_key = None + + def sort(self, objs): + def key(obj): + labels_json = obj.get(LABELS_FIELD_NAME, None) + + if not labels_json: + # No labels, return minimum value for sorting + return float('-inf') if self.ascending else float('inf') + + labels = json.loads(labels_json) + + if self.label_key: + value = labels.get(self.label_key, float('-inf') if self.ascending else float('inf')) + else: + value = max(labels.values()) if labels else (float('-inf') if self.ascending else float('inf')) + + return value + + return sorted(objs, key=key, reverse=not self.ascending)