diff --git a/.github/workflows/docker/compose/finetuning-compose.yaml b/.github/workflows/docker/compose/finetuning-compose.yaml index 94e20941c0..7cc0b20588 100644 --- a/.github/workflows/docker/compose/finetuning-compose.yaml +++ b/.github/workflows/docker/compose/finetuning-compose.yaml @@ -5,9 +5,9 @@ services: finetuning: build: - dockerfile: comps/finetuning/Dockerfile + dockerfile: comps/finetuning/src/Dockerfile image: ${REGISTRY:-opea}/finetuning:${TAG:-latest} finetuning-gaudi: build: - dockerfile: comps/finetuning/Dockerfile.intel_hpu + dockerfile: comps/finetuning/src/Dockerfile.intel_hpu image: ${REGISTRY:-opea}/finetuning-gaudi:${TAG:-latest} diff --git a/comps/finetuning/handlers.py b/comps/finetuning/handlers.py deleted file mode 100644 index a47b9f980a..0000000000 --- a/comps/finetuning/handlers.py +++ /dev/null @@ -1,242 +0,0 @@ -# Copyright (C) 2024 Intel Corporation -# SPDX-License-Identifier: Apache-2.0 - -import os -import random -import re -import time -import urllib.parse -import uuid -from pathlib import Path -from typing import Dict - -from fastapi import BackgroundTasks, File, Form, HTTPException, UploadFile -from pydantic_yaml import to_yaml_file -from ray.job_submission import JobSubmissionClient - -from comps import CustomLogger -from comps.cores.proto.api_protocol import ( - FileObject, - FineTuningJob, - FineTuningJobCheckpoint, - FineTuningJobIDRequest, - FineTuningJobList, - UploadFileRequest, -) -from comps.finetuning.finetune_config import FinetuneConfig, FineTuningParams - -logger = CustomLogger("finetuning_handlers") - -DATASET_BASE_PATH = "datasets" -JOBS_PATH = "jobs" -OUTPUT_DIR = "output" - -if not os.path.exists(DATASET_BASE_PATH): - os.mkdir(DATASET_BASE_PATH) -if not os.path.exists(JOBS_PATH): - os.mkdir(JOBS_PATH) -if not os.path.exists(OUTPUT_DIR): - os.mkdir(OUTPUT_DIR) - -FineTuningJobID = str -CheckpointID = str -CheckpointPath = str - -CHECK_JOB_STATUS_INTERVAL = 5 # Check every 5 secs - -global ray_client -ray_client: JobSubmissionClient = None - -running_finetuning_jobs: Dict[FineTuningJobID, FineTuningJob] = {} -finetuning_job_to_ray_job: Dict[FineTuningJobID, str] = {} -checkpoint_id_to_checkpoint_path: Dict[CheckpointID, CheckpointPath] = {} - - -# Add a background task to periodicly update job status -def update_job_status(job_id: FineTuningJobID): - while True: - job_status = ray_client.get_job_status(finetuning_job_to_ray_job[job_id]) - status = str(job_status).lower() - # Ray status "stopped" is OpenAI status "cancelled" - status = "cancelled" if status == "stopped" else status - logger.info(f"Status of job {job_id} is '{status}'") - running_finetuning_jobs[job_id].status = status - if status == "succeeded" or status == "cancelled" or status == "failed": - break - time.sleep(CHECK_JOB_STATUS_INTERVAL) - - -def handle_create_finetuning_jobs(request: FineTuningParams, background_tasks: BackgroundTasks): - base_model = request.model - train_file = request.training_file - train_file_path = os.path.join(DATASET_BASE_PATH, train_file) - - if not os.path.exists(train_file_path): - raise HTTPException(status_code=404, detail=f"Training file '{train_file}' not found!") - - finetune_config = FinetuneConfig(General=request.General, Dataset=request.Dataset, Training=request.Training) - finetune_config.General.base_model = base_model - finetune_config.Dataset.train_file = train_file_path - if request.hyperparameters is not None: - if request.hyperparameters.epochs != "auto": - finetune_config.Training.epochs = request.hyperparameters.epochs - 
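The `update_job_status` handler above polls Ray and normalizes its job states to OpenAI-style statuses. A minimal, self-contained sketch of that loop, assuming a generic `get_status` callable in place of `ray_client.get_job_status` and a plain dict in place of the `running_finetuning_jobs` registry:

```python
import time
from typing import Callable, Dict

# OpenAI-style terminal states; Ray reports "STOPPED" where OpenAI says "cancelled".
TERMINAL = {"succeeded", "cancelled", "failed"}
CHECK_JOB_STATUS_INTERVAL = 5  # seconds, mirrors the constant in the handler


def poll_job_status(job_id: str, get_status: Callable[[str], str], jobs: Dict[str, dict]) -> str:
    """Poll a submitted job until it reaches a terminal state.

    `get_status` stands in for ray_client.get_job_status(); `jobs` mimics the
    in-memory job registry keyed by fine-tuning job id.
    """
    while True:
        status = str(get_status(job_id)).lower()
        # Ray's "stopped" maps to OpenAI's "cancelled"
        status = "cancelled" if status == "stopped" else status
        jobs[job_id]["status"] = status
        if status in TERMINAL:
            return status
        time.sleep(CHECK_JOB_STATUS_INTERVAL)
```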
- if request.hyperparameters.batch_size != "auto": - finetune_config.Training.batch_size = request.hyperparameters.batch_size - - if request.hyperparameters.learning_rate_multiplier != "auto": - finetune_config.Training.learning_rate = request.hyperparameters.learning_rate_multiplier - - if os.getenv("HF_TOKEN", None): - finetune_config.General.config.token = os.getenv("HF_TOKEN", None) - - job = FineTuningJob( - id=f"ft-job-{uuid.uuid4()}", - model=base_model, - created_at=int(time.time()), - training_file=train_file, - hyperparameters={ - "n_epochs": finetune_config.Training.epochs, - "batch_size": finetune_config.Training.batch_size, - "learning_rate_multiplier": finetune_config.Training.learning_rate, - }, - status="running", - seed=random.randint(0, 1000) if request.seed is None else request.seed, - ) - finetune_config.General.output_dir = os.path.join(OUTPUT_DIR, job.id) - if os.getenv("DEVICE", ""): - logger.info(f"specific device: {os.getenv('DEVICE')}") - - finetune_config.Training.device = os.getenv("DEVICE") - if finetune_config.Training.device == "hpu": - if finetune_config.Training.resources_per_worker.HPU == 0: - # set 1 - finetune_config.Training.resources_per_worker.HPU = 1 - - finetune_config_file = f"{JOBS_PATH}/{job.id}.yaml" - to_yaml_file(finetune_config_file, finetune_config) - - global ray_client - ray_client = JobSubmissionClient() if ray_client is None else ray_client - - ray_job_id = ray_client.submit_job( - # Entrypoint shell command to execute - entrypoint=f"python finetune_runner.py --config_file {finetune_config_file}", - ) - - logger.info(f"Submitted Ray job: {ray_job_id} ...") - - running_finetuning_jobs[job.id] = job - finetuning_job_to_ray_job[job.id] = ray_job_id - - background_tasks.add_task(update_job_status, job.id) - - return job - - -def handle_list_finetuning_jobs(): - finetuning_jobs_list = FineTuningJobList(data=list(running_finetuning_jobs.values()), has_more=False) - - return finetuning_jobs_list - - -def handle_retrieve_finetuning_job(request: FineTuningJobIDRequest): - fine_tuning_job_id = request.fine_tuning_job_id - - job = running_finetuning_jobs.get(fine_tuning_job_id) - if job is None: - raise HTTPException(status_code=404, detail=f"Fine-tuning job '{fine_tuning_job_id}' not found!") - return job - - -def handle_cancel_finetuning_job(request: FineTuningJobIDRequest): - fine_tuning_job_id = request.fine_tuning_job_id - - ray_job_id = finetuning_job_to_ray_job.get(fine_tuning_job_id) - if ray_job_id is None: - raise HTTPException(status_code=404, detail=f"Fine-tuning job '{fine_tuning_job_id}' not found!") - - global ray_client - ray_client = JobSubmissionClient() if ray_client is None else ray_client - ray_client.stop_job(ray_job_id) - - job = running_finetuning_jobs.get(fine_tuning_job_id) - job.status = "cancelled" - return job - - -async def save_content_to_local_disk(save_path: str, content): - save_path = Path(save_path) - try: - if isinstance(content, str): - with open(save_path, "w", encoding="utf-8") as file: - file.write(content) - else: - with save_path.open("wb") as fout: - content = await content.read() - fout.write(content) - except Exception as e: - logger.info(f"Write file failed. Exception: {e}") - raise Exception(status_code=500, detail=f"Write file {save_path} failed. 
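The create-job path ends by submitting the runner to Ray and scheduling the poller as a FastAPI background task. A condensed sketch of that submit-then-track step, with the poller passed in as a callable; it assumes `RAY_ADDRESS` points at the local Ray dashboard, as set up in the Dockerfile:

```python
from typing import Callable

from fastapi import BackgroundTasks
from ray.job_submission import JobSubmissionClient


def submit_and_track(config_file: str, job_id: str,
                     background_tasks: BackgroundTasks,
                     poll: Callable[[str], None],
                     ray_client: JobSubmissionClient = None) -> str:
    """Submit the fine-tuning runner as a Ray job and schedule status polling."""
    # Lazy client creation mirrors the module-level ray_client handling;
    # JobSubmissionClient() resolves the address from RAY_ADDRESS.
    ray_client = ray_client or JobSubmissionClient()
    ray_job_id = ray_client.submit_job(
        entrypoint=f"python integrations/finetune_runner.py --config_file {config_file}",
    )
    # FastAPI runs the task after the response is sent, so job creation stays non-blocking.
    background_tasks.add_task(poll, job_id)
    return ray_job_id
```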
Exception: {e}") - - -def handle_list_finetuning_checkpoints(request: FineTuningJobIDRequest): - fine_tuning_job_id = request.fine_tuning_job_id - - job = running_finetuning_jobs.get(fine_tuning_job_id) - if job is None: - raise HTTPException(status_code=404, detail=f"Fine-tuning job '{fine_tuning_job_id}' not found!") - output_dir = os.path.join(OUTPUT_DIR, job.id) - checkpoints = [] - if os.path.exists(output_dir): - # Iterate over the contents of the directory and add an entry for each - files = os.listdir(output_dir) - for file in files: # Loop over directory contents - file_path = os.path.join(output_dir, file) - if os.path.isdir(file_path) and file.startswith("checkpoint"): - steps = re.findall("\d+", file)[0] - checkpointsResponse = FineTuningJobCheckpoint( - id=f"ftckpt-{uuid.uuid4()}", # Generate a unique ID - created_at=int(time.time()), # Use the current timestamp - fine_tuned_model_checkpoint=file_path, # Directory path itself - fine_tuning_job_id=fine_tuning_job_id, - object="fine_tuning.job.checkpoint", - step_number=steps, - ) - checkpoints.append(checkpointsResponse) - if job.status == "succeeded": - checkpointsResponse = FineTuningJobCheckpoint( - id=f"ftckpt-{uuid.uuid4()}", # Generate a unique ID - created_at=int(time.time()), # Use the current timestamp - fine_tuned_model_checkpoint=output_dir, # Directory path itself - fine_tuning_job_id=fine_tuning_job_id, - object="fine_tuning.job.checkpoint", - ) - checkpoints.append(checkpointsResponse) - - return checkpoints - - -async def upload_file(purpose: str = Form(...), file: UploadFile = File(...)): - return UploadFileRequest(purpose=purpose, file=file) - - -async def handle_upload_training_files(request: UploadFileRequest): - file = request.file - if file is None: - raise HTTPException(status_code=404, detail="upload file failed!") - filename = urllib.parse.quote(file.filename, safe="") - save_path = os.path.join(DATASET_BASE_PATH, filename) - await save_content_to_local_disk(save_path, file) - - fileBytes = os.path.getsize(save_path) - fileInfo = FileObject( - id=f"file-{uuid.uuid4()}", - object="file", - bytes=fileBytes, - created_at=int(time.time()), - filename=filename, - purpose="fine-tune", - ) - - return fileInfo diff --git a/comps/finetuning/Dockerfile b/comps/finetuning/src/Dockerfile similarity index 91% rename from comps/finetuning/Dockerfile rename to comps/finetuning/src/Dockerfile index c1c0b0bf6d..1edc8c3f5c 100644 --- a/comps/finetuning/Dockerfile +++ b/comps/finetuning/src/Dockerfile @@ -24,17 +24,17 @@ RUN python -m pip install --no-cache-dir --upgrade pip && \ python -m pip install --no-cache-dir torch torchvision torchaudio --index-url https://download.pytorch.org/whl/cpu && \ python -m pip install --no-cache-dir intel-extension-for-pytorch && \ python -m pip install --no-cache-dir oneccl_bind_pt --extra-index-url https://pytorch-extension.intel.com/release-whl/stable/cpu/cn/ && \ - python -m pip install --no-cache-dir -r /home/user/comps/finetuning/requirements.txt + python -m pip install --no-cache-dir -r /home/user/comps/finetuning/src/requirements.txt ENV PYTHONPATH=$PYTHONPATH:/home/user -WORKDIR /home/user/comps/finetuning/ +WORKDIR /home/user/comps/finetuning/src RUN echo PKGPATH=$(python3 -c "import pkg_resources; print(pkg_resources.get_distribution('oneccl-bind-pt').location)") >> run.sh && \ echo 'export LD_LIBRARY_PATH=$PKGPATH/oneccl_bindings_for_pytorch/opt/mpi/lib/:$LD_LIBRARY_PATH' >> run.sh && \ echo 'source $PKGPATH/oneccl_bindings_for_pytorch/env/setvars.sh' >> run.sh && \ echo ray 
start --head --dashboard-host=0.0.0.0 >> run.sh && \ echo export RAY_ADDRESS=http://localhost:8265 >> run.sh && \ - echo python finetuning_service.py >> run.sh + echo python opea_finetuning_microservice.py >> run.sh CMD bash run.sh diff --git a/comps/finetuning/Dockerfile.intel_hpu b/comps/finetuning/src/Dockerfile.intel_hpu similarity index 90% rename from comps/finetuning/Dockerfile.intel_hpu rename to comps/finetuning/src/Dockerfile.intel_hpu index 6acd54a828..ab40f9c48e 100644 --- a/comps/finetuning/Dockerfile.intel_hpu +++ b/comps/finetuning/src/Dockerfile.intel_hpu @@ -19,11 +19,11 @@ USER user ENV PATH=$PATH:/home/user/.local/bin RUN python -m pip install --no-cache-dir --upgrade pip && \ - python -m pip install --no-cache-dir -r /home/user/comps/finetuning/requirements.txt && \ + python -m pip install --no-cache-dir -r /home/user/comps/finetuning/src/requirements.txt && \ python -m pip install --no-cache-dir optimum-habana ENV PYTHONPATH=$PYTHONPATH:/home/user -WORKDIR /home/user/comps/finetuning/ +WORKDIR /home/user/comps/finetuning/src ENTRYPOINT ["/bin/bash", "launch.sh"] diff --git a/comps/finetuning/README.md b/comps/finetuning/src/README.md similarity index 95% rename from comps/finetuning/README.md rename to comps/finetuning/src/README.md index a0d620c960..ac6bb8cad7 100644 --- a/comps/finetuning/README.md +++ b/comps/finetuning/src/README.md @@ -53,7 +53,7 @@ Build docker image with below command: ```bash export HF_TOKEN=${your_huggingface_token} cd ../../ -docker build -t opea/finetuning:latest --build-arg https_proxy=$https_proxy --build-arg http_proxy=$http_proxy --build-arg HF_TOKEN=$HF_TOKEN -f comps/finetuning/Dockerfile . +docker build -t opea/finetuning:latest --build-arg https_proxy=$https_proxy --build-arg http_proxy=$http_proxy --build-arg HF_TOKEN=$HF_TOKEN -f comps/finetuning/src/Dockerfile . ``` #### 2.1.2 Run Docker with CLI @@ -72,7 +72,7 @@ Build docker image with below command: ```bash cd ../../ -docker build -t opea/finetuning-gaudi:latest --build-arg https_proxy=$https_proxy --build-arg http_proxy=$http_proxy -f comps/finetuning/Dockerfile.intel_hpu . +docker build -t opea/finetuning-gaudi:latest --build-arg https_proxy=$https_proxy --build-arg http_proxy=$http_proxy -f comps/finetuning/src/Dockerfile.intel_hpu . ``` #### 2.2.2 Run Docker with CLI @@ -244,8 +244,8 @@ curl http://${your_ip}:8015/v1/finetune/list_checkpoints -X POST -H "Content-Typ ### 3.4 Leverage fine-tuned model -After fine-tuning job is done, fine-tuned model can be chosen from listed checkpoints, then the fine-tuned model can be used in other microservices. For example, fine-tuned reranking model can be used in [rerankings](../rerankings/src/README.md) microservice by assign its path to the environment variable `RERANK_MODEL_ID`, fine-tuned embedding model can be used in [embeddings](../embeddings/src/README.md) microservice by assign its path to the environment variable `model`, LLMs after instruction tuning can be used in [llms](../llms/src/text-generation/README.md) microservice by assign its path to the environment variable `your_hf_llm_model`. +After fine-tuning job is done, fine-tuned model can be chosen from listed checkpoints, then the fine-tuned model can be used in other microservices. 
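To make the "leverage fine-tuned model" step concrete, here is a small hypothetical helper (not part of the service) that picks the newest checkpoint directory from a finished job's output folder; its path can then be assigned to the environment variables mentioned below, such as `RERANK_MODEL_ID` or `model`:

```python
import os
import re
from typing import Optional


def latest_checkpoint(output_dir: str) -> Optional[str]:
    """Return the checkpoint-<step> subdirectory with the highest step number, or None."""
    if not os.path.isdir(output_dir):
        return None
    best_step, best_path = -1, None
    for name in os.listdir(output_dir):
        path = os.path.join(output_dir, name)
        match = re.fullmatch(r"checkpoint-(\d+)", name)
        if match and os.path.isdir(path):
            step = int(match.group(1))
            if step > best_step:
                best_step, best_path = step, path
    return best_path


# Hypothetical usage: point a downstream microservice at the newest checkpoint.
# os.environ["RERANK_MODEL_ID"] = latest_checkpoint("output/ft-job-1234") or ""
```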
For example, fine-tuned reranking model can be used in [reranks](../../rerankings/src/README.md) microservice by assign its path to the environment variable `RERANK_MODEL_ID`, fine-tuned embedding model can be used in [embeddings](../../embeddings/src/README.md) microservice by assign its path to the environment variable `model`, LLMs after instruction tuning can be used in [llms](../../llms/text-generation/README.md) microservice by assign its path to the environment variable `your_hf_llm_model`. ## 🚀4. Descriptions for Finetuning parameters -We utilize [OpenAI finetuning parameters](https://platform.openai.com/docs/api-reference/fine-tuning) and extend it with more customizable parameters, see the definitions at [finetune_config](https://github.com/opea-project/GenAIComps/blob/main/comps/finetuning/finetune_config.py). +We utilize [OpenAI finetuning parameters](https://platform.openai.com/docs/api-reference/fine-tuning) and extend it with more customizable parameters, see the definitions at [finetune_config](https://github.com/opea-project/GenAIComps/blob/main/comps/finetuning/src/integrations/finetune_config.py). diff --git a/comps/finetuning/src/integrations/__init__.py b/comps/finetuning/src/integrations/__init__.py new file mode 100644 index 0000000000..916f3a44b2 --- /dev/null +++ b/comps/finetuning/src/integrations/__init__.py @@ -0,0 +1,2 @@ +# Copyright (C) 2024 Intel Corporation +# SPDX-License-Identifier: Apache-2.0 diff --git a/comps/finetuning/finetune_config.py b/comps/finetuning/src/integrations/finetune_config.py similarity index 100% rename from comps/finetuning/finetune_config.py rename to comps/finetuning/src/integrations/finetune_config.py diff --git a/comps/finetuning/finetune_runner.py b/comps/finetuning/src/integrations/finetune_runner.py similarity index 84% rename from comps/finetuning/finetune_runner.py rename to comps/finetuning/src/integrations/finetune_runner.py index 0161cd1cab..9a3ce3156d 100644 --- a/comps/finetuning/finetune_runner.py +++ b/comps/finetuning/src/integrations/finetune_runner.py @@ -6,7 +6,7 @@ from pydantic_yaml import parse_yaml_raw_as from transformers import TrainerCallback, TrainerControl, TrainerState, TrainingArguments -from comps.finetuning.finetune_config import FinetuneConfig +from comps.finetuning.src.integrations.finetune_config import FinetuneConfig class FineTuneCallback(TrainerCallback): @@ -29,7 +29,7 @@ def main(): callback = FineTuneCallback() finetune_config["Training"]["callbacks"] = [callback] - from comps.finetuning.llm_on_ray.finetune.finetune import main as llm_on_ray_finetune_main + from comps.finetuning.src.integrations.llm_on_ray.finetune.finetune import main as llm_on_ray_finetune_main llm_on_ray_finetune_main(finetune_config) diff --git a/comps/finetuning/llm_on_ray/common/__init__.py b/comps/finetuning/src/integrations/llm_on_ray/common/__init__.py similarity index 100% rename from comps/finetuning/llm_on_ray/common/__init__.py rename to comps/finetuning/src/integrations/llm_on_ray/common/__init__.py diff --git a/comps/finetuning/llm_on_ray/common/common.py b/comps/finetuning/src/integrations/llm_on_ray/common/common.py similarity index 100% rename from comps/finetuning/llm_on_ray/common/common.py rename to comps/finetuning/src/integrations/llm_on_ray/common/common.py diff --git a/comps/finetuning/llm_on_ray/common/torch_config.py b/comps/finetuning/src/integrations/llm_on_ray/common/torch_config.py similarity index 100% rename from comps/finetuning/llm_on_ray/common/torch_config.py rename to 
comps/finetuning/src/integrations/llm_on_ray/common/torch_config.py diff --git a/comps/finetuning/llm_on_ray/finetune/__init__.py b/comps/finetuning/src/integrations/llm_on_ray/finetune/__init__.py similarity index 100% rename from comps/finetuning/llm_on_ray/finetune/__init__.py rename to comps/finetuning/src/integrations/llm_on_ray/finetune/__init__.py diff --git a/comps/finetuning/llm_on_ray/finetune/data_process.py b/comps/finetuning/src/integrations/llm_on_ray/finetune/data_process.py similarity index 100% rename from comps/finetuning/llm_on_ray/finetune/data_process.py rename to comps/finetuning/src/integrations/llm_on_ray/finetune/data_process.py diff --git a/comps/finetuning/llm_on_ray/finetune/dpo_trainer.py b/comps/finetuning/src/integrations/llm_on_ray/finetune/dpo_trainer.py similarity index 100% rename from comps/finetuning/llm_on_ray/finetune/dpo_trainer.py rename to comps/finetuning/src/integrations/llm_on_ray/finetune/dpo_trainer.py diff --git a/comps/finetuning/llm_on_ray/finetune/finetune.py b/comps/finetuning/src/integrations/llm_on_ray/finetune/finetune.py similarity index 97% rename from comps/finetuning/llm_on_ray/finetune/finetune.py rename to comps/finetuning/src/integrations/llm_on_ray/finetune/finetune.py index d105269a40..3eabb90938 100644 --- a/comps/finetuning/llm_on_ray/finetune/finetune.py +++ b/comps/finetuning/src/integrations/llm_on_ray/finetune/finetune.py @@ -23,9 +23,9 @@ from transformers import Trainer, TrainingArguments from comps import CustomLogger -from comps.finetuning.finetune_config import FinetuneConfig -from comps.finetuning.llm_on_ray import common -from comps.finetuning.llm_on_ray.finetune.data_process import ( +from comps.finetuning.src.integrations.finetune_config import FinetuneConfig +from comps.finetuning.src.integrations.llm_on_ray import common +from comps.finetuning.src.integrations.llm_on_ray.finetune.data_process import ( DPOCollator, DPODataProcessor, EmbedCollator, @@ -35,7 +35,7 @@ TrainDatasetForCE, TrainDatasetForEmbedding, ) -from comps.finetuning.llm_on_ray.finetune.modeling import BiEncoderModel, CrossEncoder +from comps.finetuning.src.integrations.llm_on_ray.finetune.modeling import BiEncoderModel, CrossEncoder logger = CustomLogger("llm_on_ray/finetune") @@ -394,7 +394,7 @@ def get_trainer(config: Dict, model, ref_model, tokenizer, tokenized_dataset, da if task == "dpo": lora_config = config["General"].get("lora_config", None) peft_config = LoraConfig(**lora_config) - from comps.finetuning.llm_on_ray.finetune.dpo_trainer import DPOTrainer + from comps.finetuning.src.integrations.llm_on_ray.finetune.dpo_trainer import DPOTrainer trainer = DPOTrainer( model, @@ -431,7 +431,7 @@ def get_trainer(config: Dict, model, ref_model, tokenizer, tokenized_dataset, da if task == "dpo": lora_config = config["General"].get("lora_config", None) peft_config = LoraConfig(**lora_config) - from comps.finetuning.llm_on_ray.finetune.dpo_trainer import GaudiDPOTrainer + from comps.finetuning.src.integrations.llm_on_ray.finetune.dpo_trainer import GaudiDPOTrainer trainer = GaudiDPOTrainer( model, diff --git a/comps/finetuning/llm_on_ray/finetune/modeling.py b/comps/finetuning/src/integrations/llm_on_ray/finetune/modeling.py similarity index 100% rename from comps/finetuning/llm_on_ray/finetune/modeling.py rename to comps/finetuning/src/integrations/llm_on_ray/finetune/modeling.py diff --git a/comps/finetuning/src/integrations/opea.py b/comps/finetuning/src/integrations/opea.py new file mode 100644 index 0000000000..0227fbf7b0 --- /dev/null +++ 
b/comps/finetuning/src/integrations/opea.py @@ -0,0 +1,255 @@ +# Copyright (C) 2024 Intel Corporation +# SPDX-License-Identifier: Apache-2.0 + +import os +import random +import re +import time +import urllib.parse +import uuid +from pathlib import Path +from typing import Dict + +from fastapi import BackgroundTasks, File, Form, HTTPException, UploadFile +from pydantic_yaml import to_yaml_file +from ray.job_submission import JobSubmissionClient + +from comps import CustomLogger, OpeaComponent, OpeaComponentRegistry +from comps.cores.proto.api_protocol import ( + FileObject, + FineTuningJob, + FineTuningJobCheckpoint, + FineTuningJobIDRequest, + FineTuningJobList, + UploadFileRequest, +) +from comps.finetuning.src.integrations.finetune_config import FinetuneConfig, FineTuningParams + +logger = CustomLogger("opea") + +DATASET_BASE_PATH = "datasets" +JOBS_PATH = "jobs" +OUTPUT_DIR = "output" + +if not os.path.exists(DATASET_BASE_PATH): + os.mkdir(DATASET_BASE_PATH) +if not os.path.exists(JOBS_PATH): + os.mkdir(JOBS_PATH) +if not os.path.exists(OUTPUT_DIR): + os.mkdir(OUTPUT_DIR) + +FineTuningJobID = str +CheckpointID = str +CheckpointPath = str + +CHECK_JOB_STATUS_INTERVAL = 5 # Check every 5 secs + +global ray_client +ray_client: JobSubmissionClient = None + +running_finetuning_jobs: Dict[FineTuningJobID, FineTuningJob] = {} +finetuning_job_to_ray_job: Dict[FineTuningJobID, str] = {} +checkpoint_id_to_checkpoint_path: Dict[CheckpointID, CheckpointPath] = {} + + +# Add a background task to periodicly update job status +def update_job_status(job_id: FineTuningJobID): + while True: + job_status = ray_client.get_job_status(finetuning_job_to_ray_job[job_id]) + status = str(job_status).lower() + # Ray status "stopped" is OpenAI status "cancelled" + status = "cancelled" if status == "stopped" else status + logger.info(f"Status of job {job_id} is '{status}'") + running_finetuning_jobs[job_id].status = status + if status == "succeeded" or status == "cancelled" or status == "failed": + break + time.sleep(CHECK_JOB_STATUS_INTERVAL) + + +async def save_content_to_local_disk(save_path: str, content): + save_path = Path(save_path) + try: + if isinstance(content, str): + with open(save_path, "w", encoding="utf-8") as file: + file.write(content) + else: + with save_path.open("wb") as fout: + content = await content.read() + fout.write(content) + except Exception as e: + logger.info(f"Write file failed. Exception: {e}") + raise Exception(status_code=500, detail=f"Write file {save_path} failed. 
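The `save_content_to_local_disk` helper carried into `opea.py` accepts either a plain string or a FastAPI `UploadFile`. A standalone sketch of the same branching; it raises `HTTPException` on failure, since a plain `Exception` does not accept the `status_code`/`detail` keyword arguments used in the original:

```python
from pathlib import Path

from fastapi import HTTPException


async def save_to_disk(save_path: str, content) -> None:
    """Write either a str or an UploadFile-like object to save_path."""
    path = Path(save_path)
    try:
        if isinstance(content, str):
            path.write_text(content, encoding="utf-8")
        else:
            # UploadFile exposes an async read(); await it before writing bytes.
            data = await content.read()
            path.write_bytes(data)
    except Exception as exc:
        raise HTTPException(status_code=500, detail=f"Write file {save_path} failed: {exc}") from exc
```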
Exception: {e}") + + +async def upload_file(purpose: str = Form(...), file: UploadFile = File(...)): + return UploadFileRequest(purpose=purpose, file=file) + + +@OpeaComponentRegistry.register("OPEA_FINETUNING") +class OpeaFinetuning(OpeaComponent): + """A specialized finetuning component derived from OpeaComponent for finetuning services.""" + + def __init__(self, name: str, description: str, config: dict = None): + super().__init__(name, "finetuning", description, config) + + def create_finetuning_jobs(self, request: FineTuningParams, background_tasks: BackgroundTasks): + base_model = request.model + train_file = request.training_file + train_file_path = os.path.join(DATASET_BASE_PATH, train_file) + + if not os.path.exists(train_file_path): + raise HTTPException(status_code=404, detail=f"Training file '{train_file}' not found!") + + finetune_config = FinetuneConfig(General=request.General, Dataset=request.Dataset, Training=request.Training) + finetune_config.General.base_model = base_model + finetune_config.Dataset.train_file = train_file_path + if request.hyperparameters is not None: + if request.hyperparameters.epochs != "auto": + finetune_config.Training.epochs = request.hyperparameters.epochs + + if request.hyperparameters.batch_size != "auto": + finetune_config.Training.batch_size = request.hyperparameters.batch_size + + if request.hyperparameters.learning_rate_multiplier != "auto": + finetune_config.Training.learning_rate = request.hyperparameters.learning_rate_multiplier + + if os.getenv("HF_TOKEN", None): + finetune_config.General.config.token = os.getenv("HF_TOKEN", None) + + job = FineTuningJob( + id=f"ft-job-{uuid.uuid4()}", + model=base_model, + created_at=int(time.time()), + training_file=train_file, + hyperparameters={ + "n_epochs": finetune_config.Training.epochs, + "batch_size": finetune_config.Training.batch_size, + "learning_rate_multiplier": finetune_config.Training.learning_rate, + }, + status="running", + seed=random.randint(0, 1000) if request.seed is None else request.seed, + ) + finetune_config.General.output_dir = os.path.join(OUTPUT_DIR, job.id) + if os.getenv("DEVICE", ""): + logger.info(f"specific device: {os.getenv('DEVICE')}") + + finetune_config.Training.device = os.getenv("DEVICE") + if finetune_config.Training.device == "hpu": + if finetune_config.Training.resources_per_worker.HPU == 0: + # set 1 + finetune_config.Training.resources_per_worker.HPU = 1 + + finetune_config_file = f"{JOBS_PATH}/{job.id}.yaml" + to_yaml_file(finetune_config_file, finetune_config) + + global ray_client + ray_client = JobSubmissionClient() if ray_client is None else ray_client + + ray_job_id = ray_client.submit_job( + # Entrypoint shell command to execute + entrypoint=f"python integrations/finetune_runner.py --config_file {finetune_config_file}", + ) + + logger.info(f"Submitted Ray job: {ray_job_id} ...") + + running_finetuning_jobs[job.id] = job + finetuning_job_to_ray_job[job.id] = ray_job_id + + background_tasks.add_task(update_job_status, job.id) + + return job + + def list_finetuning_jobs(self): + finetuning_jobs_list = FineTuningJobList(data=list(running_finetuning_jobs.values()), has_more=False) + + return finetuning_jobs_list + + def retrieve_finetuning_job(self, request: FineTuningJobIDRequest): + fine_tuning_job_id = request.fine_tuning_job_id + + job = running_finetuning_jobs.get(fine_tuning_job_id) + if job is None: + raise HTTPException(status_code=404, detail=f"Fine-tuning job '{fine_tuning_job_id}' not found!") + return job + + def cancel_finetuning_job(self, 
request: FineTuningJobIDRequest): + fine_tuning_job_id = request.fine_tuning_job_id + + ray_job_id = finetuning_job_to_ray_job.get(fine_tuning_job_id) + if ray_job_id is None: + raise HTTPException(status_code=404, detail=f"Fine-tuning job '{fine_tuning_job_id}' not found!") + + global ray_client + ray_client = JobSubmissionClient() if ray_client is None else ray_client + ray_client.stop_job(ray_job_id) + + job = running_finetuning_jobs.get(fine_tuning_job_id) + job.status = "cancelled" + return job + + def list_finetuning_checkpoints(self, request: FineTuningJobIDRequest): + fine_tuning_job_id = request.fine_tuning_job_id + + job = running_finetuning_jobs.get(fine_tuning_job_id) + if job is None: + raise HTTPException(status_code=404, detail=f"Fine-tuning job '{fine_tuning_job_id}' not found!") + output_dir = os.path.join(OUTPUT_DIR, job.id) + checkpoints = [] + if os.path.exists(output_dir): + # Iterate over the contents of the directory and add an entry for each + files = os.listdir(output_dir) + for file in files: # Loop over directory contents + file_path = os.path.join(output_dir, file) + if os.path.isdir(file_path) and file.startswith("checkpoint"): + steps = re.findall("\d+", file)[0] + checkpointsResponse = FineTuningJobCheckpoint( + id=f"ftckpt-{uuid.uuid4()}", # Generate a unique ID + created_at=int(time.time()), # Use the current timestamp + fine_tuned_model_checkpoint=file_path, # Directory path itself + fine_tuning_job_id=fine_tuning_job_id, + object="fine_tuning.job.checkpoint", + step_number=steps, + ) + checkpoints.append(checkpointsResponse) + if job.status == "succeeded": + checkpointsResponse = FineTuningJobCheckpoint( + id=f"ftckpt-{uuid.uuid4()}", # Generate a unique ID + created_at=int(time.time()), # Use the current timestamp + fine_tuned_model_checkpoint=output_dir, # Directory path itself + fine_tuning_job_id=fine_tuning_job_id, + object="fine_tuning.job.checkpoint", + ) + checkpoints.append(checkpointsResponse) + + return checkpoints + + async def upload_training_files(self, request: UploadFileRequest): + file = request.file + if file is None: + raise HTTPException(status_code=404, detail="upload file failed!") + filename = urllib.parse.quote(file.filename, safe="") + save_path = os.path.join(DATASET_BASE_PATH, filename) + await save_content_to_local_disk(save_path, file) + + fileBytes = os.path.getsize(save_path) + fileInfo = FileObject( + id=f"file-{uuid.uuid4()}", + object="file", + bytes=fileBytes, + created_at=int(time.time()), + filename=filename, + purpose="fine-tune", + ) + + return fileInfo + + def invoke(self, *args, **kwargs): + pass + + def check_health(self) -> bool: + """Checks the health of the component. + + Returns: + bool: True if the component is healthy, False otherwise. 
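The cancel path reduces to stopping the backing Ray job and flipping the tracked status to "cancelled". A condensed sketch with the two registries passed in explicitly instead of read from module globals:

```python
from fastapi import HTTPException
from ray.job_submission import JobSubmissionClient


def cancel_job(fine_tuning_job_id: str,
               job_to_ray_job: dict,
               jobs: dict,
               ray_client: JobSubmissionClient = None):
    """Stop the backing Ray job and mark the fine-tuning job as cancelled."""
    ray_job_id = job_to_ray_job.get(fine_tuning_job_id)
    if ray_job_id is None:
        raise HTTPException(status_code=404,
                            detail=f"Fine-tuning job '{fine_tuning_job_id}' not found!")
    ray_client = ray_client or JobSubmissionClient()
    ray_client.stop_job(ray_job_id)  # Ray reports the job as STOPPED afterwards
    job = jobs[fine_tuning_job_id]
    job.status = "cancelled"         # mirrors the Ray "stopped" -> OpenAI "cancelled" mapping
    return job
```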
+ """ + return True diff --git a/comps/finetuning/launch.sh b/comps/finetuning/src/launch.sh similarity index 88% rename from comps/finetuning/launch.sh rename to comps/finetuning/src/launch.sh index f439db2dec..43730c38ef 100644 --- a/comps/finetuning/launch.sh +++ b/comps/finetuning/src/launch.sh @@ -9,4 +9,4 @@ else fi export RAY_ADDRESS=http://localhost:$RAY_PORT -python finetuning_service.py +python opea_finetuning_microservice.py diff --git a/comps/finetuning/src/opea_finetuning_loader.py b/comps/finetuning/src/opea_finetuning_loader.py new file mode 100644 index 0000000000..e8f35804a2 --- /dev/null +++ b/comps/finetuning/src/opea_finetuning_loader.py @@ -0,0 +1,32 @@ +# Copyright (C) 2024 Intel Corporation +# SPDX-License-Identifier: Apache-2.0 + +from comps import CustomLogger, OpeaComponentLoader + +logger = CustomLogger("opea_finetuning_loader") + + +class OpeaFinetuningLoader(OpeaComponentLoader): + def __init__(self, component_name, **kwargs): + super().__init__(component_name=component_name, **kwargs) + + def invoke(self, *args, **kwargs): + pass + + def create_finetuning_jobs(self, *args, **kwargs): + return self.component.create_finetuning_jobs(*args, **kwargs) + + def cancel_finetuning_job(self, *args, **kwargs): + return self.component.cancel_finetuning_job(*args, **kwargs) + + def list_finetuning_checkpoints(self, *args, **kwargs): + return self.component.list_finetuning_checkpoints(*args, **kwargs) + + def list_finetuning_jobs(self, *args, **kwargs): + return self.component.list_finetuning_jobs(*args, **kwargs) + + def retrieve_finetuning_job(self, *args, **kwargs): + return self.component.retrieve_finetuning_job(*args, **kwargs) + + async def upload_training_files(self, *args, **kwargs): + return await self.component.upload_training_files(*args, **kwargs) diff --git a/comps/finetuning/finetuning_service.py b/comps/finetuning/src/opea_finetuning_microservice.py similarity index 61% rename from comps/finetuning/finetuning_service.py rename to comps/finetuning/src/opea_finetuning_microservice.py index 64097c720c..2578695c4d 100644 --- a/comps/finetuning/finetuning_service.py +++ b/comps/finetuning/src/opea_finetuning_microservice.py @@ -1,38 +1,42 @@ # Copyright (C) 2024 Intel Corporation # SPDX-License-Identifier: Apache-2.0 +import os + from fastapi import BackgroundTasks, Depends -from comps import opea_microservices, register_microservice +from comps import CustomLogger, opea_microservices, register_microservice from comps.cores.proto.api_protocol import FineTuningJobIDRequest, UploadFileRequest -from comps.finetuning.finetune_config import FineTuningParams -from comps.finetuning.handlers import ( - handle_cancel_finetuning_job, - handle_create_finetuning_jobs, - handle_list_finetuning_checkpoints, - handle_list_finetuning_jobs, - handle_retrieve_finetuning_job, - handle_upload_training_files, - upload_file, +from comps.finetuning.src.integrations.finetune_config import FineTuningParams +from comps.finetuning.src.integrations.opea import OpeaFinetuning, upload_file +from comps.finetuning.src.opea_finetuning_loader import OpeaFinetuningLoader + +logger = CustomLogger("opea_finetuning_microservice") + +finetuning_component_name = os.getenv("FINETUNING_COMPONENT_NAME", "OPEA_FINETUNING") +# Initialize OpeaComponentLoader +loader = OpeaFinetuningLoader( + finetuning_component_name, + description=f"OPEA FINETUNING Component: {finetuning_component_name}", ) @register_microservice(name="opea_service@finetuning", endpoint="/v1/fine_tuning/jobs", host="0.0.0.0", port=8015) def 
create_finetuning_jobs(request: FineTuningParams, background_tasks: BackgroundTasks): - return handle_create_finetuning_jobs(request, background_tasks) + return loader.create_finetuning_jobs(request, background_tasks) @register_microservice( name="opea_service@finetuning", endpoint="/v1/fine_tuning/jobs", host="0.0.0.0", port=8015, methods=["GET"] ) def list_finetuning_jobs(): - return handle_list_finetuning_jobs() + return loader.list_finetuning_jobs() @register_microservice( name="opea_service@finetuning", endpoint="/v1/fine_tuning/jobs/retrieve", host="0.0.0.0", port=8015 ) def retrieve_finetuning_job(request: FineTuningJobIDRequest): - job = handle_retrieve_finetuning_job(request) + job = loader.retrieve_finetuning_job(request) return job @@ -40,7 +44,7 @@ def retrieve_finetuning_job(request: FineTuningJobIDRequest): name="opea_service@finetuning", endpoint="/v1/fine_tuning/jobs/cancel", host="0.0.0.0", port=8015 ) def cancel_finetuning_job(request: FineTuningJobIDRequest): - job = handle_cancel_finetuning_job(request) + job = loader.cancel_finetuning_job(request) return job @@ -51,7 +55,7 @@ def cancel_finetuning_job(request: FineTuningJobIDRequest): port=8015, ) async def upload_training_files(request: UploadFileRequest = Depends(upload_file)): - uploadFileInfo = await handle_upload_training_files(request) + uploadFileInfo = await loader.upload_training_files(request) return uploadFileInfo @@ -59,7 +63,7 @@ async def upload_training_files(request: UploadFileRequest = Depends(upload_file name="opea_service@finetuning", endpoint="/v1/finetune/list_checkpoints", host="0.0.0.0", port=8015 ) def list_checkpoints(request: FineTuningJobIDRequest): - checkpoints = handle_list_finetuning_checkpoints(request) + checkpoints = loader.list_finetuning_checkpoints(request) return checkpoints diff --git a/comps/finetuning/requirements.txt b/comps/finetuning/src/requirements.txt similarity index 100% rename from comps/finetuning/requirements.txt rename to comps/finetuning/src/requirements.txt diff --git a/tests/finetuning/test_finetuning.sh b/tests/finetuning/test_finetuning_opea.sh similarity index 99% rename from tests/finetuning/test_finetuning.sh rename to tests/finetuning/test_finetuning_opea.sh index 11a544dfda..879c8fc7bd 100644 --- a/tests/finetuning/test_finetuning.sh +++ b/tests/finetuning/test_finetuning_opea.sh @@ -13,7 +13,7 @@ ray_port=8265 function build_docker_images() { cd $WORKPATH echo $(pwd) - docker build -t opea/finetuning:comps --build-arg https_proxy=$https_proxy --build-arg http_proxy=$http_proxy --build-arg HF_TOKEN=$HF_TOKEN -f comps/finetuning/Dockerfile . + docker build -t opea/finetuning:comps --build-arg https_proxy=$https_proxy --build-arg http_proxy=$http_proxy --build-arg HF_TOKEN=$HF_TOKEN -f comps/finetuning/src/Dockerfile . if [ $? -ne 0 ]; then echo "opea/finetuning built fail" exit 1 diff --git a/tests/finetuning/test_finetuning_on_intel_hpu.sh b/tests/finetuning/test_finetuning_opea_on_intel_hpu.sh similarity index 99% rename from tests/finetuning/test_finetuning_on_intel_hpu.sh rename to tests/finetuning/test_finetuning_opea_on_intel_hpu.sh index 6db2027102..d76d48bf7c 100644 --- a/tests/finetuning/test_finetuning_on_intel_hpu.sh +++ b/tests/finetuning/test_finetuning_opea_on_intel_hpu.sh @@ -13,7 +13,7 @@ ray_port=8265 function build_docker_images() { cd $WORKPATH echo $(pwd) - docker build -t opea/finetuning-gaudi:latest --build-arg https_proxy=$https_proxy --build-arg http_proxy=$http_proxy -f comps/finetuning/Dockerfile.intel_hpu . 
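Because this refactor only moves the handlers behind the loader/component, the HTTP surface is unchanged. A quick smoke test against a locally running instance might look like the following; the model name and training file are placeholders, and the training file is assumed to have been uploaded through the file-upload endpoint first:

```python
import requests

BASE = "http://localhost:8015"  # assumes the finetuning container is running locally

# Create a fine-tuning job. The model and training file below are placeholders.
resp = requests.post(
    f"{BASE}/v1/fine_tuning/jobs",
    json={"training_file": "train.jsonl", "model": "your-base-model"},
    timeout=30,
)
resp.raise_for_status()
job = resp.json()
print("created:", job["id"], job["status"])

# List all jobs, then retrieve the one just created.
print(requests.get(f"{BASE}/v1/fine_tuning/jobs", timeout=30).json())
retrieved = requests.post(
    f"{BASE}/v1/fine_tuning/jobs/retrieve",
    json={"fine_tuning_job_id": job["id"]},
    timeout=30,
).json()
print("status:", retrieved["status"])
```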
+    docker build -t opea/finetuning-gaudi:latest --build-arg https_proxy=$https_proxy --build-arg http_proxy=$http_proxy -f comps/finetuning/src/Dockerfile.intel_hpu .
     if [ $? -ne 0 ]; then
         echo "opea/finetuning-gaudi built fail"
         exit 1
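The core pattern introduced here, registering an implementation under a string name and resolving it through a loader that delegates every call, can be reduced to the following self-contained sketch. The classes are stand-ins rather than the real OpeaComponent/OpeaComponentRegistry/OpeaComponentLoader, purely to show how `FINETUNING_COMPONENT_NAME` selects the backing component at startup:

```python
import os

# --- stand-ins for the OPEA registry/component/loader machinery ---
_REGISTRY: dict = {}


def register(name: str):
    """Decorator that records a component class under a string key."""
    def wrap(cls):
        _REGISTRY[name] = cls
        return cls
    return wrap


@register("OPEA_FINETUNING")
class DemoFinetuningComponent:
    def create_finetuning_jobs(self, request):
        return {"id": "ft-job-demo", "status": "running", "request": request}


class DemoLoader:
    """Resolves a component by name and forwards calls to it."""
    def __init__(self, component_name: str):
        self.component = _REGISTRY[component_name]()

    def create_finetuning_jobs(self, *args, **kwargs):
        return self.component.create_finetuning_jobs(*args, **kwargs)


# The microservice picks the implementation from an environment variable.
loader = DemoLoader(os.getenv("FINETUNING_COMPONENT_NAME", "OPEA_FINETUNING"))
print(loader.create_finetuning_jobs({"model": "example-model"}))
```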