Skip to content

Move queries and types to respective modules #5775

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 5 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions .git-blame-ignore-revs
Original file line number Diff line number Diff line change
Expand Up @@ -47,3 +47,5 @@ f36bc497c8c8f89004f3f6879908d3f0b25123e1
# 2025
# Fix formatting
c490ac5810b70f3cf5fd8649669838e8fdb19f4d
# Copy paste query, types from library to dbcore
40b34e5a7d82e30d9e787885cab6e8d877a055cf
102 changes: 100 additions & 2 deletions beets/dbcore/query.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

from __future__ import annotations

import os
import re
import unicodedata
from abc import ABC, abstractmethod
Expand All @@ -29,13 +30,19 @@
from beets import util

if TYPE_CHECKING:
from beets.dbcore import Model
from beets.dbcore.db import AnyModel
from beets.dbcore.db import AnyModel, Model

P = TypeVar("P", default=Any)
else:
P = TypeVar("P")

# To use the SQLite "blob" type, it doesn't suffice to provide a byte
# string; SQLite treats that as encoded text. Wrapping it in a
# `memoryview` tells it that we actually mean non-text data.
# needs to be defined in here due to circular import.
# TODO: remove it from this module and define it in dbcore/types.py instead
BLOB_TYPE = memoryview


class ParsingError(ValueError):
"""Abstract class for any unparsable user-requested album/query
Expand Down Expand Up @@ -267,6 +274,79 @@ def string_match(cls, pattern: str, value: str) -> bool:
return pattern.lower() in value.lower()


class PathQuery(FieldQuery[bytes]):
"""A query that matches all items under a given path.

Matching can either be case-insensitive or case-sensitive. By
default, the behavior depends on the OS: case-insensitive on Windows
and case-sensitive otherwise.
"""

def __init__(self, field: str, pattern: bytes, fast: bool = True) -> None:
"""Create a path query.

`pattern` must be a path, either to a file or a directory.
"""
path = util.normpath(pattern)

# Case sensitivity depends on the filesystem that the query path is located on.
self.case_sensitive = util.case_sensitive(path)

# Use a normalized-case pattern for case-insensitive matches.
if not self.case_sensitive:
# We need to lowercase the entire path, not just the pattern.
# In particular, on Windows, the drive letter is otherwise not
# lowercased.
# This also ensures that the `match()` method below and the SQL
# from `col_clause()` do the same thing.
path = path.lower()

super().__init__(field, path, fast)

@staticmethod
def is_path_query(query_part: str) -> bool:
"""Try to guess whether a unicode query part is a path query.

Condition: separator precedes colon and the file exists.
"""
colon = query_part.find(":")
if colon != -1:
query_part = query_part[:colon]

# Test both `sep` and `altsep` (i.e., both slash and backslash on
# Windows).
if not (
os.sep in query_part or (os.altsep and os.altsep in query_part)
):
return False

return os.path.exists(util.syspath(util.normpath(query_part)))

def match(self, obj: Model) -> bool:
path = obj.path if self.case_sensitive else obj.path.lower()
return path.startswith(self.pattern)

def col_clause(self) -> tuple[str, Sequence[SQLiteType]]:
if self.case_sensitive:
query_part = "({0} = ?) || (substr({0}, 1, ?) = ?)"
else:
query_part = "(BYTELOWER({0}) = BYTELOWER(?)) || \
(substr(BYTELOWER({0}), 1, ?) = BYTELOWER(?))"

dir_blob = BLOB_TYPE(os.path.join(self.pattern, b""))
return query_part.format(self.field), (
BLOB_TYPE(self.pattern),
len(dir_blob),
dir_blob,
)

def __repr__(self) -> str:
return (
f"{self.__class__.__name__}({self.field!r}, {self.pattern!r}, "
f"fast={self.fast}, case_sensitive={self.case_sensitive})"
)


class RegexpQuery(StringFieldQuery[Pattern[str]]):
"""A query that matches a regular expression in a specific Model field.

Expand Down Expand Up @@ -844,6 +924,24 @@ def _convert(self, s: str) -> float | None:
)


class SingletonQuery(FieldQuery[str]):
"""This query is responsible for the 'singleton' lookup.

It is based on the FieldQuery and constructs a SQL clause
'album_id is NULL' which yields the same result as the previous filter
in Python but is more performant since it's done in SQL.

Using util.str2bool ensures that lookups like singleton:true, singleton:1
and singleton:false, singleton:0 are handled consistently.
"""

def __new__(cls, field: str, value: str, *args, **kwargs):
query = NoneQuery("album_id")
if util.str2bool(value):
return query
return NotQuery(query)


# Sorting.


Expand Down
157 changes: 143 additions & 14 deletions beets/dbcore/types.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,19 +16,19 @@

from __future__ import annotations

import re
import time
import typing
from abc import ABC
from typing import TYPE_CHECKING, Any, Generic, TypeVar, cast

from beets.util import str2bool
import beets
from beets import util

from .query import (
BooleanQuery,
FieldQueryType,
NumericQuery,
SQLiteType,
SubstringQuery,
)
from . import query

SQLiteType = query.SQLiteType
BLOB_TYPE = query.BLOB_TYPE


class ModelType(typing.Protocol):
Expand Down Expand Up @@ -61,7 +61,7 @@ class Type(ABC, Generic[T, N]):
"""The SQLite column type for the value.
"""

query: FieldQueryType = SubstringQuery
query: query.FieldQueryType = query.SubstringQuery
"""The `Query` subclass to be used when querying the field.
"""

Expand Down Expand Up @@ -160,7 +160,7 @@ class BaseInteger(Type[int, N]):
"""A basic integer type."""

sql = "INTEGER"
query = NumericQuery
query = query.NumericQuery
model_type = int

def normalize(self, value: Any) -> int | N:
Expand Down Expand Up @@ -241,7 +241,7 @@ class BaseFloat(Type[float, N]):
"""

sql = "REAL"
query: FieldQueryType = NumericQuery
query: query.FieldQueryType = query.NumericQuery
model_type = float

def __init__(self, digits: int = 1):
Expand Down Expand Up @@ -271,7 +271,7 @@ class BaseString(Type[T, N]):
"""A Unicode string type."""

sql = "TEXT"
query = SubstringQuery
query = query.SubstringQuery

def normalize(self, value: Any) -> T | N:
if value is None:
Expand Down Expand Up @@ -312,14 +312,142 @@ class Boolean(Type):
"""A boolean type."""

sql = "INTEGER"
query = BooleanQuery
query = query.BooleanQuery
model_type = bool

def format(self, value: bool) -> str:
return str(bool(value))

def parse(self, string: str) -> bool:
return str2bool(string)
return util.str2bool(string)


class DateType(Float):
# TODO representation should be `datetime` object
# TODO distinguish between date and time types
query = query.DateQuery

def format(self, value):
return time.strftime(
beets.config["time_format"].as_str(), time.localtime(value or 0)
)

def parse(self, string):
try:
# Try a formatted date string.
return time.mktime(
time.strptime(string, beets.config["time_format"].as_str())
)
except ValueError:
# Fall back to a plain timestamp number.
try:
return float(string)
except ValueError:
return self.null


class BasePathType(Type[bytes, N]):
"""A dbcore type for filesystem paths.

These are represented as `bytes` objects, in keeping with
the Unix filesystem abstraction.
"""

sql = "BLOB"
query = query.PathQuery
model_type = bytes

def parse(self, string: str) -> bytes:
return util.normpath(string)

def normalize(self, value: Any) -> bytes | N:
if isinstance(value, str):
# Paths stored internally as encoded bytes.
return util.bytestring_path(value)

elif isinstance(value, BLOB_TYPE):
# We unwrap buffers to bytes.
return bytes(value)

else:
return value

def to_sql(self, value: bytes) -> BLOB_TYPE:
if isinstance(value, bytes):
value = BLOB_TYPE(value)
return value


class NullPathType(BasePathType[None]):
@property
def null(self) -> None:
return None

def format(self, value: bytes | None) -> str:
return util.displayable_path(value or b"")


class PathType(BasePathType[bytes]):
@property
def null(self) -> bytes:
return b""

def format(self, value: bytes) -> str:
return util.displayable_path(value or b"")


class MusicalKey(String):
"""String representing the musical key of a song.

The standard format is C, Cm, C#, C#m, etc.
"""

ENHARMONIC = {
r"db": "c#",
r"eb": "d#",
r"gb": "f#",
r"ab": "g#",
r"bb": "a#",
}

null = None

def parse(self, key):
key = key.lower()
for flat, sharp in self.ENHARMONIC.items():
key = re.sub(flat, sharp, key)
key = re.sub(r"[\W\s]+minor", "m", key)
key = re.sub(r"[\W\s]+major", "", key)
return key.capitalize()

def normalize(self, key):
if key is None:
return None
else:
return self.parse(key)


class DurationType(Float):
"""Human-friendly (M:SS) representation of a time interval."""

query = query.DurationQuery

def format(self, value):
if not beets.config["format_raw_length"].get(bool):
return util.human_seconds_short(value or 0.0)
else:
return value

def parse(self, string):
try:
# Try to format back hh:ss to seconds.
return util.raw_seconds_short(string)
except ValueError:
# Fall back to a plain float.
try:
return float(string)
except ValueError:
return self.null


# Shared instances of common types.
Expand All @@ -331,6 +459,7 @@ def parse(self, string: str) -> bool:
NULL_FLOAT = NullFloat()
STRING = String()
BOOLEAN = Boolean()
DATE = DateType()
SEMICOLON_SPACE_DSV = DelimitedString(delimiter="; ")

# Will set the proper null char in mediafile
Expand Down
Loading
Loading