diff --git a/comfy_api/latest/_ui.py b/comfy_api/latest/_ui.py index b0bbabe2ab24..98f73e47de8e 100644 --- a/comfy_api/latest/_ui.py +++ b/comfy_api/latest/_ui.py @@ -318,9 +318,10 @@ def save_audio( for key, value in metadata.items(): output_container.metadata[key] = value + layout = "mono" if waveform.shape[0] == 1 else "stereo" # Set up the output stream with appropriate properties if format == "opus": - out_stream = output_container.add_stream("libopus", rate=sample_rate) + out_stream = output_container.add_stream("libopus", rate=sample_rate, layout=layout) if quality == "64k": out_stream.bit_rate = 64000 elif quality == "96k": @@ -332,7 +333,7 @@ def save_audio( elif quality == "320k": out_stream.bit_rate = 320000 elif format == "mp3": - out_stream = output_container.add_stream("libmp3lame", rate=sample_rate) + out_stream = output_container.add_stream("libmp3lame", rate=sample_rate, layout=layout) if quality == "V0": # TODO i would really love to support V3 and V5 but there doesn't seem to be a way to set the qscale level, the property below is a bool out_stream.codec_context.qscale = 1 @@ -341,12 +342,12 @@ def save_audio( elif quality == "320k": out_stream.bit_rate = 320000 else: # format == "flac": - out_stream = output_container.add_stream("flac", rate=sample_rate) + out_stream = output_container.add_stream("flac", rate=sample_rate, layout=layout) frame = av.AudioFrame.from_ndarray( waveform.movedim(0, 1).reshape(1, -1).float().numpy(), format="flt", - layout="mono" if waveform.shape[0] == 1 else "stereo", + layout=layout, ) frame.sample_rate = sample_rate frame.pts = 0 diff --git a/comfy_extras/nodes_audio.py b/comfy_extras/nodes_audio.py index 2ed7e0b228fc..812301fb7e5f 100644 --- a/comfy_extras/nodes_audio.py +++ b/comfy_extras/nodes_audio.py @@ -6,65 +6,80 @@ import comfy.model_management import folder_paths import os -import io -import json -import random import hashlib import node_helpers import logging -from comfy.cli_args import args -from comfy.comfy_types import FileLocator - -class EmptyLatentAudio: - def __init__(self): - self.device = comfy.model_management.intermediate_device() +from typing_extensions import override +from comfy_api.latest import ComfyExtension, IO, UI +class EmptyLatentAudio(IO.ComfyNode): @classmethod - def INPUT_TYPES(s): - return {"required": {"seconds": ("FLOAT", {"default": 47.6, "min": 1.0, "max": 1000.0, "step": 0.1}), - "batch_size": ("INT", {"default": 1, "min": 1, "max": 4096, "tooltip": "The number of latent images in the batch."}), - }} - RETURN_TYPES = ("LATENT",) - FUNCTION = "generate" - - CATEGORY = "latent/audio" - - def generate(self, seconds, batch_size): - length = round((seconds * 44100 / 2048) / 2) * 2 - latent = torch.zeros([batch_size, 64, length], device=self.device) - return ({"samples":latent, "type": "audio"}, ) + def define_schema(cls): + return IO.Schema( + node_id="EmptyLatentAudio", + display_name="Empty Latent Audio", + category="latent/audio", + inputs=[ + IO.Float.Input("seconds", default=47.6, min=1.0, max=1000.0, step=0.1), + IO.Int.Input( + "batch_size", default=1, min=1, max=4096, tooltip="The number of latent images in the batch." + ), + ], + outputs=[IO.Latent.Output()], + ) -class ConditioningStableAudio: @classmethod - def INPUT_TYPES(s): - return {"required": {"positive": ("CONDITIONING", ), - "negative": ("CONDITIONING", ), - "seconds_start": ("FLOAT", {"default": 0.0, "min": 0.0, "max": 1000.0, "step": 0.1}), - "seconds_total": ("FLOAT", {"default": 47.0, "min": 0.0, "max": 1000.0, "step": 0.1}), - }} + def execute(cls, seconds, batch_size) -> IO.NodeOutput: + length = round((seconds * 44100 / 2048) / 2) * 2 + latent = torch.zeros([batch_size, 64, length], device=comfy.model_management.intermediate_device()) + return IO.NodeOutput({"samples":latent, "type": "audio"}) - RETURN_TYPES = ("CONDITIONING","CONDITIONING") - RETURN_NAMES = ("positive", "negative") + generate = execute # TODO: remove - FUNCTION = "append" - CATEGORY = "conditioning" +class ConditioningStableAudio(IO.ComfyNode): + @classmethod + def define_schema(cls): + return IO.Schema( + node_id="ConditioningStableAudio", + category="conditioning", + inputs=[ + IO.Conditioning.Input("positive"), + IO.Conditioning.Input("negative"), + IO.Float.Input("seconds_start", default=0.0, min=0.0, max=1000.0, step=0.1), + IO.Float.Input("seconds_total", default=47.0, min=0.0, max=1000.0, step=0.1), + ], + outputs=[ + IO.Conditioning.Output(display_name="positive"), + IO.Conditioning.Output(display_name="negative"), + ], + ) - def append(self, positive, negative, seconds_start, seconds_total): + @classmethod + def execute(cls, positive, negative, seconds_start, seconds_total) -> IO.NodeOutput: positive = node_helpers.conditioning_set_values(positive, {"seconds_start": seconds_start, "seconds_total": seconds_total}) negative = node_helpers.conditioning_set_values(negative, {"seconds_start": seconds_start, "seconds_total": seconds_total}) - return (positive, negative) + return IO.NodeOutput(positive, negative) + + append = execute # TODO: remove -class VAEEncodeAudio: - @classmethod - def INPUT_TYPES(s): - return {"required": { "audio": ("AUDIO", ), "vae": ("VAE", )}} - RETURN_TYPES = ("LATENT",) - FUNCTION = "encode" - CATEGORY = "latent/audio" +class VAEEncodeAudio(IO.ComfyNode): + @classmethod + def define_schema(cls): + return IO.Schema( + node_id="VAEEncodeAudio", + display_name="VAE Encode Audio", + category="latent/audio", + inputs=[ + IO.Audio.Input("audio"), + IO.Vae.Input("vae"), + ], + outputs=[IO.Latent.Output()], + ) - def encode(self, vae, audio): + @classmethod + def execute(cls, vae, audio) -> IO.NodeOutput: sample_rate = audio["sample_rate"] if 44100 != sample_rate: waveform = torchaudio.functional.resample(audio["waveform"], sample_rate, 44100) @@ -72,213 +87,134 @@ def encode(self, vae, audio): waveform = audio["waveform"] t = vae.encode(waveform.movedim(1, -1)) - return ({"samples":t}, ) + return IO.NodeOutput({"samples":t}) + + encode = execute # TODO: remove -class VAEDecodeAudio: - @classmethod - def INPUT_TYPES(s): - return {"required": { "samples": ("LATENT", ), "vae": ("VAE", )}} - RETURN_TYPES = ("AUDIO",) - FUNCTION = "decode" - CATEGORY = "latent/audio" +class VAEDecodeAudio(IO.ComfyNode): + @classmethod + def define_schema(cls): + return IO.Schema( + node_id="VAEDecodeAudio", + display_name="VAE Decode Audio", + category="latent/audio", + inputs=[ + IO.Latent.Input("samples"), + IO.Vae.Input("vae"), + ], + outputs=[IO.Audio.Output()], + ) - def decode(self, vae, samples): + @classmethod + def execute(cls, vae, samples) -> IO.NodeOutput: audio = vae.decode(samples["samples"]).movedim(-1, 1) std = torch.std(audio, dim=[1,2], keepdim=True) * 5.0 std[std < 1.0] = 1.0 audio /= std - return ({"waveform": audio, "sample_rate": 44100}, ) + return IO.NodeOutput({"waveform": audio, "sample_rate": 44100}) + decode = execute # TODO: remove -def save_audio(self, audio, filename_prefix="ComfyUI", format="flac", prompt=None, extra_pnginfo=None, quality="128k"): - filename_prefix += self.prefix_append - full_output_folder, filename, counter, subfolder, filename_prefix = folder_paths.get_save_image_path(filename_prefix, self.output_dir) - results: list[FileLocator] = [] +class SaveAudio(IO.ComfyNode): + @classmethod + def define_schema(cls): + return IO.Schema( + node_id="SaveAudio", + display_name="Save Audio (FLAC)", + category="audio", + inputs=[ + IO.Audio.Input("audio"), + IO.String.Input("filename_prefix", default="audio/ComfyUI"), + ], + hidden=[IO.Hidden.prompt, IO.Hidden.extra_pnginfo], + is_output_node=True, + ) - # Prepare metadata dictionary - metadata = {} - if not args.disable_metadata: - if prompt is not None: - metadata["prompt"] = json.dumps(prompt) - if extra_pnginfo is not None: - for x in extra_pnginfo: - metadata[x] = json.dumps(extra_pnginfo[x]) + @classmethod + def execute(cls, audio, filename_prefix="ComfyUI", format="flac") -> IO.NodeOutput: + return IO.NodeOutput( + ui=UI.AudioSaveHelper.get_save_audio_ui(audio, filename_prefix=filename_prefix, cls=cls, format=format) + ) - # Opus supported sample rates - OPUS_RATES = [8000, 12000, 16000, 24000, 48000] + save_flac = execute # TODO: remove - for (batch_number, waveform) in enumerate(audio["waveform"].cpu()): - filename_with_batch_num = filename.replace("%batch_num%", str(batch_number)) - file = f"{filename_with_batch_num}_{counter:05}_.{format}" - output_path = os.path.join(full_output_folder, file) - # Use original sample rate initially - sample_rate = audio["sample_rate"] +class SaveAudioMP3(IO.ComfyNode): + @classmethod + def define_schema(cls): + return IO.Schema( + node_id="SaveAudioMP3", + display_name="Save Audio (MP3)", + category="audio", + inputs=[ + IO.Audio.Input("audio"), + IO.String.Input("filename_prefix", default="audio/ComfyUI"), + IO.Combo.Input("quality", options=["V0", "128k", "320k"], default="V0"), + ], + hidden=[IO.Hidden.prompt, IO.Hidden.extra_pnginfo], + is_output_node=True, + ) - # Handle Opus sample rate requirements - if format == "opus": - if sample_rate > 48000: - sample_rate = 48000 - elif sample_rate not in OPUS_RATES: - # Find the next highest supported rate - for rate in sorted(OPUS_RATES): - if rate > sample_rate: - sample_rate = rate - break - if sample_rate not in OPUS_RATES: # Fallback if still not supported - sample_rate = 48000 - - # Resample if necessary - if sample_rate != audio["sample_rate"]: - waveform = torchaudio.functional.resample(waveform, audio["sample_rate"], sample_rate) - - # Create output with specified format - output_buffer = io.BytesIO() - output_container = av.open(output_buffer, mode='w', format=format) - - # Set metadata on the container - for key, value in metadata.items(): - output_container.metadata[key] = value - - layout = 'mono' if waveform.shape[0] == 1 else 'stereo' - # Set up the output stream with appropriate properties - if format == "opus": - out_stream = output_container.add_stream("libopus", rate=sample_rate, layout=layout) - if quality == "64k": - out_stream.bit_rate = 64000 - elif quality == "96k": - out_stream.bit_rate = 96000 - elif quality == "128k": - out_stream.bit_rate = 128000 - elif quality == "192k": - out_stream.bit_rate = 192000 - elif quality == "320k": - out_stream.bit_rate = 320000 - elif format == "mp3": - out_stream = output_container.add_stream("libmp3lame", rate=sample_rate, layout=layout) - if quality == "V0": - #TODO i would really love to support V3 and V5 but there doesn't seem to be a way to set the qscale level, the property below is a bool - out_stream.codec_context.qscale = 1 - elif quality == "128k": - out_stream.bit_rate = 128000 - elif quality == "320k": - out_stream.bit_rate = 320000 - else: #format == "flac": - out_stream = output_container.add_stream("flac", rate=sample_rate, layout=layout) - - frame = av.AudioFrame.from_ndarray(waveform.movedim(0, 1).reshape(1, -1).float().numpy(), format='flt', layout=layout) - frame.sample_rate = sample_rate - frame.pts = 0 - output_container.mux(out_stream.encode(frame)) - - # Flush encoder - output_container.mux(out_stream.encode(None)) - - # Close containers - output_container.close() - - # Write the output to file - output_buffer.seek(0) - with open(output_path, 'wb') as f: - f.write(output_buffer.getbuffer()) - - results.append({ - "filename": file, - "subfolder": subfolder, - "type": self.type - }) - counter += 1 - - return { "ui": { "audio": results } } - -class SaveAudio: - def __init__(self): - self.output_dir = folder_paths.get_output_directory() - self.type = "output" - self.prefix_append = "" - - @classmethod - def INPUT_TYPES(s): - return {"required": { "audio": ("AUDIO", ), - "filename_prefix": ("STRING", {"default": "audio/ComfyUI"}), - }, - "hidden": {"prompt": "PROMPT", "extra_pnginfo": "EXTRA_PNGINFO"}, - } - - RETURN_TYPES = () - FUNCTION = "save_flac" - - OUTPUT_NODE = True - - CATEGORY = "audio" - - def save_flac(self, audio, filename_prefix="ComfyUI", format="flac", prompt=None, extra_pnginfo=None): - return save_audio(self, audio, filename_prefix, format, prompt, extra_pnginfo) - -class SaveAudioMP3: - def __init__(self): - self.output_dir = folder_paths.get_output_directory() - self.type = "output" - self.prefix_append = "" - - @classmethod - def INPUT_TYPES(s): - return {"required": { "audio": ("AUDIO", ), - "filename_prefix": ("STRING", {"default": "audio/ComfyUI"}), - "quality": (["V0", "128k", "320k"], {"default": "V0"}), - }, - "hidden": {"prompt": "PROMPT", "extra_pnginfo": "EXTRA_PNGINFO"}, - } - - RETURN_TYPES = () - FUNCTION = "save_mp3" - - OUTPUT_NODE = True - - CATEGORY = "audio" - - def save_mp3(self, audio, filename_prefix="ComfyUI", format="mp3", prompt=None, extra_pnginfo=None, quality="128k"): - return save_audio(self, audio, filename_prefix, format, prompt, extra_pnginfo, quality) - -class SaveAudioOpus: - def __init__(self): - self.output_dir = folder_paths.get_output_directory() - self.type = "output" - self.prefix_append = "" - - @classmethod - def INPUT_TYPES(s): - return {"required": { "audio": ("AUDIO", ), - "filename_prefix": ("STRING", {"default": "audio/ComfyUI"}), - "quality": (["64k", "96k", "128k", "192k", "320k"], {"default": "128k"}), - }, - "hidden": {"prompt": "PROMPT", "extra_pnginfo": "EXTRA_PNGINFO"}, - } + @classmethod + def execute(cls, audio, filename_prefix="ComfyUI", format="mp3", quality="128k") -> IO.NodeOutput: + return IO.NodeOutput( + ui=UI.AudioSaveHelper.get_save_audio_ui( + audio, filename_prefix=filename_prefix, cls=cls, format=format, quality=quality + ) + ) - RETURN_TYPES = () - FUNCTION = "save_opus" + save_mp3 = execute # TODO: remove - OUTPUT_NODE = True - CATEGORY = "audio" +class SaveAudioOpus(IO.ComfyNode): + @classmethod + def define_schema(cls): + return IO.Schema( + node_id="SaveAudioOpus", + display_name="Save Audio (Opus)", + category="audio", + inputs=[ + IO.Audio.Input("audio"), + IO.String.Input("filename_prefix", default="audio/ComfyUI"), + IO.Combo.Input("quality", options=["64k", "96k", "128k", "192k", "320k"], default="128k"), + ], + hidden=[IO.Hidden.prompt, IO.Hidden.extra_pnginfo], + is_output_node=True, + ) + + @classmethod + def execute(cls, audio, filename_prefix="ComfyUI", format="opus", quality="V3") -> IO.NodeOutput: + return IO.NodeOutput( + ui=UI.AudioSaveHelper.get_save_audio_ui( + audio, filename_prefix=filename_prefix, cls=cls, format=format, quality=quality + ) + ) - def save_opus(self, audio, filename_prefix="ComfyUI", format="opus", prompt=None, extra_pnginfo=None, quality="V3"): - return save_audio(self, audio, filename_prefix, format, prompt, extra_pnginfo, quality) + save_opus = execute # TODO: remove -class PreviewAudio(SaveAudio): - def __init__(self): - self.output_dir = folder_paths.get_temp_directory() - self.type = "temp" - self.prefix_append = "_temp_" + ''.join(random.choice("abcdefghijklmnopqrstupvxyz") for x in range(5)) +class PreviewAudio(IO.ComfyNode): @classmethod - def INPUT_TYPES(s): - return {"required": - {"audio": ("AUDIO", ), }, - "hidden": {"prompt": "PROMPT", "extra_pnginfo": "EXTRA_PNGINFO"}, - } + def define_schema(cls): + return IO.Schema( + node_id="PreviewAudio", + display_name="Preview Audio", + category="audio", + inputs=[ + IO.Audio.Input("audio"), + ], + hidden=[IO.Hidden.prompt, IO.Hidden.extra_pnginfo], + is_output_node=True, + ) + + @classmethod + def execute(cls, audio) -> IO.NodeOutput: + return IO.NodeOutput(ui=UI.PreviewAudio(audio, cls=cls)) + + save_flac = execute # TODO: remove + def f32_pcm(wav: torch.Tensor) -> torch.Tensor: """Convert audio to float 32 bits PCM format.""" @@ -316,26 +252,30 @@ def load(filepath: str) -> tuple[torch.Tensor, int]: wav = f32_pcm(wav) return wav, sr -class LoadAudio: +class LoadAudio(IO.ComfyNode): @classmethod - def INPUT_TYPES(s): + def define_schema(cls): input_dir = folder_paths.get_input_directory() files = folder_paths.filter_files_content_types(os.listdir(input_dir), ["audio", "video"]) - return {"required": {"audio": (sorted(files), {"audio_upload": True})}} - - CATEGORY = "audio" + return IO.Schema( + node_id="LoadAudio", + display_name="Load Audio", + category="audio", + inputs=[ + IO.Combo.Input("audio", upload=IO.UploadType.audio, options=sorted(files)), + ], + outputs=[IO.Audio.Output()], + ) - RETURN_TYPES = ("AUDIO", ) - FUNCTION = "load" - - def load(self, audio): + @classmethod + def execute(cls, audio) -> IO.NodeOutput: audio_path = folder_paths.get_annotated_filepath(audio) waveform, sample_rate = load(audio_path) audio = {"waveform": waveform.unsqueeze(0), "sample_rate": sample_rate} - return (audio, ) + return IO.NodeOutput(audio) @classmethod - def IS_CHANGED(s, audio): + def fingerprint_inputs(cls, audio): image_path = folder_paths.get_annotated_filepath(audio) m = hashlib.sha256() with open(image_path, 'rb') as f: @@ -343,46 +283,69 @@ def IS_CHANGED(s, audio): return m.digest().hex() @classmethod - def VALIDATE_INPUTS(s, audio): + def validate_inputs(cls, audio): if not folder_paths.exists_annotated_filepath(audio): return "Invalid audio file: {}".format(audio) return True -class RecordAudio: - @classmethod - def INPUT_TYPES(s): - return {"required": {"audio": ("AUDIO_RECORD", {})}} + load = execute # TODO: remove - CATEGORY = "audio" - RETURN_TYPES = ("AUDIO", ) - FUNCTION = "load" +class RecordAudio(IO.ComfyNode): + @classmethod + def define_schema(cls): + return IO.Schema( + node_id="RecordAudio", + display_name="Record Audio", + category="audio", + inputs=[ + IO.Custom("AUDIO_RECORD").Input("audio"), + ], + outputs=[IO.Audio.Output()], + ) - def load(self, audio): + @classmethod + def execute(cls, audio) -> IO.NodeOutput: audio_path = folder_paths.get_annotated_filepath(audio) waveform, sample_rate = load(audio_path) audio = {"waveform": waveform.unsqueeze(0), "sample_rate": sample_rate} - return (audio, ) + return IO.NodeOutput(audio) + load = execute # TODO: remove -class TrimAudioDuration: - @classmethod - def INPUT_TYPES(cls): - return { - "required": { - "audio": ("AUDIO",), - "start_index": ("FLOAT", {"default": 0.0, "min": -0xffffffffffffffff, "max": 0xffffffffffffffff, "step": 0.01, "tooltip": "Start time in seconds, can be negative to count from the end (supports sub-seconds)."}), - "duration": ("FLOAT", {"default": 60.0, "min": 0.0, "step": 0.01, "tooltip": "Duration in seconds"}), - }, - } - FUNCTION = "trim" - RETURN_TYPES = ("AUDIO",) - CATEGORY = "audio" - DESCRIPTION = "Trim audio tensor into chosen time range." +class TrimAudioDuration(IO.ComfyNode): + @classmethod + def define_schema(cls): + return IO.Schema( + node_id="TrimAudioDuration", + display_name="Trim Audio Duration", + description="Trim audio tensor into chosen time range.", + category="audio", + inputs=[ + IO.Audio.Input("audio"), + IO.Float.Input( + "start_index", + default=0.0, + min=-0xffffffffffffffff, + max=0xffffffffffffffff, + step=0.01, + tooltip="Start time in seconds, can be negative to count from the end (supports sub-seconds).", + ), + IO.Float.Input( + "duration", + default=60.0, + min=0.0, + step=0.01, + tooltip="Duration in seconds", + ), + ], + outputs=[IO.Audio.Output()], + ) - def trim(self, audio, start_index, duration): + @classmethod + def execute(cls, audio, start_index, duration) -> IO.NodeOutput: waveform = audio["waveform"] sample_rate = audio["sample_rate"] audio_length = waveform.shape[-1] @@ -399,23 +362,30 @@ def trim(self, audio, start_index, duration): if start_frame >= end_frame: raise ValueError("AudioTrim: Start time must be less than end time and be within the audio length.") - return ({"waveform": waveform[..., start_frame:end_frame], "sample_rate": sample_rate},) + return IO.NodeOutput({"waveform": waveform[..., start_frame:end_frame], "sample_rate": sample_rate}) + trim = execute # TODO: remove -class SplitAudioChannels: - @classmethod - def INPUT_TYPES(s): - return {"required": { - "audio": ("AUDIO",), - }} - RETURN_TYPES = ("AUDIO", "AUDIO") - RETURN_NAMES = ("left", "right") - FUNCTION = "separate" - CATEGORY = "audio" - DESCRIPTION = "Separates the audio into left and right channels." +class SplitAudioChannels(IO.ComfyNode): + @classmethod + def define_schema(cls): + return IO.Schema( + node_id="SplitAudioChannels", + display_name="Split Audio Channels", + description="Separates the audio into left and right channels.", + category="audio", + inputs=[ + IO.Audio.Input("audio"), + ], + outputs=[ + IO.Audio.Output(display_name="left"), + IO.Audio.Output(display_name="right"), + ], + ) - def separate(self, audio): + @classmethod + def execute(cls, audio) -> IO.NodeOutput: waveform = audio["waveform"] sample_rate = audio["sample_rate"] @@ -425,7 +395,9 @@ def separate(self, audio): left_channel = waveform[..., 0:1, :] right_channel = waveform[..., 1:2, :] - return ({"waveform": left_channel, "sample_rate": sample_rate}, {"waveform": right_channel, "sample_rate": sample_rate}) + return IO.NodeOutput({"waveform": left_channel, "sample_rate": sample_rate}, {"waveform": right_channel, "sample_rate": sample_rate}) + + separate = execute # TODO: remove def match_audio_sample_rates(waveform_1, sample_rate_1, waveform_2, sample_rate_2): @@ -443,21 +415,29 @@ def match_audio_sample_rates(waveform_1, sample_rate_1, waveform_2, sample_rate_ return waveform_1, waveform_2, output_sample_rate -class AudioConcat: +class AudioConcat(IO.ComfyNode): @classmethod - def INPUT_TYPES(s): - return {"required": { - "audio1": ("AUDIO",), - "audio2": ("AUDIO",), - "direction": (['after', 'before'], {"default": 'after', "tooltip": "Whether to append audio2 after or before audio1."}), - }} - - RETURN_TYPES = ("AUDIO",) - FUNCTION = "concat" - CATEGORY = "audio" - DESCRIPTION = "Concatenates the audio1 to audio2 in the specified direction." + def define_schema(cls): + return IO.Schema( + node_id="AudioConcat", + display_name="Audio Concat", + description="Concatenates the audio1 to audio2 in the specified direction.", + category="audio", + inputs=[ + IO.Audio.Input("audio1"), + IO.Audio.Input("audio2"), + IO.Combo.Input( + "direction", + options=['after', 'before'], + default="after", + tooltip="Whether to append audio2 after or before audio1.", + ) + ], + outputs=[IO.Audio.Output()], + ) - def concat(self, audio1, audio2, direction): + @classmethod + def execute(cls, audio1, audio2, direction) -> IO.NodeOutput: waveform_1 = audio1["waveform"] waveform_2 = audio2["waveform"] sample_rate_1 = audio1["sample_rate"] @@ -477,26 +457,33 @@ def concat(self, audio1, audio2, direction): elif direction == 'before': concatenated_audio = torch.cat((waveform_2, waveform_1), dim=2) - return ({"waveform": concatenated_audio, "sample_rate": output_sample_rate},) + return IO.NodeOutput({"waveform": concatenated_audio, "sample_rate": output_sample_rate}) + concat = execute # TODO: remove -class AudioMerge: - @classmethod - def INPUT_TYPES(cls): - return { - "required": { - "audio1": ("AUDIO",), - "audio2": ("AUDIO",), - "merge_method": (["add", "mean", "subtract", "multiply"], {"tooltip": "The method used to combine the audio waveforms."}), - }, - } - FUNCTION = "merge" - RETURN_TYPES = ("AUDIO",) - CATEGORY = "audio" - DESCRIPTION = "Combine two audio tracks by overlaying their waveforms." +class AudioMerge(IO.ComfyNode): + @classmethod + def define_schema(cls): + return IO.Schema( + node_id="AudioMerge", + display_name="Audio Merge", + description="Combine two audio tracks by overlaying their waveforms.", + category="audio", + inputs=[ + IO.Audio.Input("audio1"), + IO.Audio.Input("audio2"), + IO.Combo.Input( + "merge_method", + options=["add", "mean", "subtract", "multiply"], + tooltip="The method used to combine the audio waveforms.", + ) + ], + outputs=[IO.Audio.Output()], + ) - def merge(self, audio1, audio2, merge_method): + @classmethod + def execute(cls, audio1, audio2, merge_method) -> IO.NodeOutput: waveform_1 = audio1["waveform"] waveform_2 = audio2["waveform"] sample_rate_1 = audio1["sample_rate"] @@ -530,85 +517,108 @@ def merge(self, audio1, audio2, merge_method): if max_val > 1.0: waveform = waveform / max_val - return ({"waveform": waveform, "sample_rate": output_sample_rate},) + return IO.NodeOutput({"waveform": waveform, "sample_rate": output_sample_rate}) + merge = execute # TODO: remove -class AudioAdjustVolume: - @classmethod - def INPUT_TYPES(s): - return {"required": { - "audio": ("AUDIO",), - "volume": ("INT", {"default": 1.0, "min": -100, "max": 100, "tooltip": "Volume adjustment in decibels (dB). 0 = no change, +6 = double, -6 = half, etc"}), - }} - RETURN_TYPES = ("AUDIO",) - FUNCTION = "adjust_volume" - CATEGORY = "audio" +class AudioAdjustVolume(IO.ComfyNode): + @classmethod + def define_schema(cls): + return IO.Schema( + node_id="AudioAdjustVolume", + display_name="Audio Adjust Volume", + category="audio", + inputs=[ + IO.Audio.Input("audio"), + IO.Int.Input( + "volume", + default=1, + min=-100, + max=100, + tooltip="Volume adjustment in decibels (dB). 0 = no change, +6 = double, -6 = half, etc", + ) + ], + outputs=[IO.Audio.Output()], + ) - def adjust_volume(self, audio, volume): + @classmethod + def execute(cls, audio, volume) -> IO.NodeOutput: if volume == 0: - return (audio,) + return IO.NodeOutput(audio) waveform = audio["waveform"] sample_rate = audio["sample_rate"] gain = 10 ** (volume / 20) waveform = waveform * gain - return ({"waveform": waveform, "sample_rate": sample_rate},) + return IO.NodeOutput({"waveform": waveform, "sample_rate": sample_rate}) + adjust_volume = execute # TODO: remove -class EmptyAudio: - @classmethod - def INPUT_TYPES(s): - return {"required": { - "duration": ("FLOAT", {"default": 60.0, "min": 0.0, "max": 0xffffffffffffffff, "step": 0.01, "tooltip": "Duration of the empty audio clip in seconds"}), - "sample_rate": ("INT", {"default": 44100, "tooltip": "Sample rate of the empty audio clip."}), - "channels": ("INT", {"default": 2, "min": 1, "max": 2, "tooltip": "Number of audio channels (1 for mono, 2 for stereo)."}), - }} - RETURN_TYPES = ("AUDIO",) - FUNCTION = "create_empty_audio" - CATEGORY = "audio" +class EmptyAudio(IO.ComfyNode): + @classmethod + def define_schema(cls): + return IO.Schema( + node_id="EmptyAudio", + display_name="Empty Audio", + category="audio", + inputs=[ + IO.Float.Input( + "duration", + default=60.0, + min=0.0, + max=0xffffffffffffffff, + step=0.01, + tooltip="Duration of the empty audio clip in seconds", + ), + IO.Float.Input( + "sample_rate", + default=44100, + tooltip="Sample rate of the empty audio clip.", + ), + IO.Float.Input( + "channels", + default=2, + min=1, + max=2, + tooltip="Number of audio channels (1 for mono, 2 for stereo).", + ), + ], + outputs=[IO.Audio.Output()], + ) - def create_empty_audio(self, duration, sample_rate, channels): + @classmethod + def execute(cls, duration, sample_rate, channels) -> IO.NodeOutput: num_samples = int(round(duration * sample_rate)) waveform = torch.zeros((1, channels, num_samples), dtype=torch.float32) - return ({"waveform": waveform, "sample_rate": sample_rate},) - - -NODE_CLASS_MAPPINGS = { - "EmptyLatentAudio": EmptyLatentAudio, - "VAEEncodeAudio": VAEEncodeAudio, - "VAEDecodeAudio": VAEDecodeAudio, - "SaveAudio": SaveAudio, - "SaveAudioMP3": SaveAudioMP3, - "SaveAudioOpus": SaveAudioOpus, - "LoadAudio": LoadAudio, - "PreviewAudio": PreviewAudio, - "ConditioningStableAudio": ConditioningStableAudio, - "RecordAudio": RecordAudio, - "TrimAudioDuration": TrimAudioDuration, - "SplitAudioChannels": SplitAudioChannels, - "AudioConcat": AudioConcat, - "AudioMerge": AudioMerge, - "AudioAdjustVolume": AudioAdjustVolume, - "EmptyAudio": EmptyAudio, -} - -NODE_DISPLAY_NAME_MAPPINGS = { - "EmptyLatentAudio": "Empty Latent Audio", - "VAEEncodeAudio": "VAE Encode Audio", - "VAEDecodeAudio": "VAE Decode Audio", - "PreviewAudio": "Preview Audio", - "LoadAudio": "Load Audio", - "SaveAudio": "Save Audio (FLAC)", - "SaveAudioMP3": "Save Audio (MP3)", - "SaveAudioOpus": "Save Audio (Opus)", - "RecordAudio": "Record Audio", - "TrimAudioDuration": "Trim Audio Duration", - "SplitAudioChannels": "Split Audio Channels", - "AudioConcat": "Audio Concat", - "AudioMerge": "Audio Merge", - "AudioAdjustVolume": "Audio Adjust Volume", - "EmptyAudio": "Empty Audio", -} + return IO.NodeOutput({"waveform": waveform, "sample_rate": sample_rate}) + + create_empty_audio = execute # TODO: remove + + +class AudioExtension(ComfyExtension): + @override + async def get_node_list(self) -> list[type[IO.ComfyNode]]: + return [ + EmptyLatentAudio, + VAEEncodeAudio, + VAEDecodeAudio, + SaveAudio, + SaveAudioMP3, + SaveAudioOpus, + LoadAudio, + PreviewAudio, + ConditioningStableAudio, + RecordAudio, + TrimAudioDuration, + SplitAudioChannels, + AudioConcat, + AudioMerge, + AudioAdjustVolume, + EmptyAudio, + ] + +async def comfy_entrypoint() -> AudioExtension: + return AudioExtension()