Add files via upload
Refactored code to be more compact and efficient
jeffmeloy authored Feb 1, 2024
1 parent 4cfabfb commit 8d5767a
Showing 1 changed file with 62 additions and 127 deletions.
189 changes: 62 additions & 127 deletions get_python_datasets.py
@@ -30,28 +30,6 @@
from typing import Dict, List, Tuple


def group_json(input_json: Dict) -> Dict:
"""
Group JSON formatted dictionary by key.
Args:
input_json (Dict): The input JSON formatted dictionary.
Returns:
Dict: The grouped JSON formatted dictionary.
"""
output_json = {"Code Elements": {}}
for key, value in input_json["Code Elements"].items():
if "`" in key:
new_key = f"{key.split()[-1]} {key.split()[0]}"
output_json["Code Elements"].setdefault(new_key, []).append(value)
else:
output_json["Code Elements"].setdefault(key, []).append(value)
output_json["Code Elements"] = {
k: ", ".join(v) for k, v in output_json["Code Elements"].items()
}
output_json["Code Elements"] = dict(sorted(output_json["Code Elements"].items()))
return output_json
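
For reference, the deleted `group_json` reorders backticked keys so the element name comes first, comma-joins values that land under the same key, and sorts the result; a minimal sketch with hypothetical input:

```python
# Hypothetical input illustrating the removed group_json (not part of the diff).
before = {"Code Elements": {"function `foo`": "parses input", "variables": "x, y"}}
after = group_json(before)
# "function `foo`" becomes "`foo` function"; values are comma-joined, keys sorted.
print(after)  # {'Code Elements': {'`foo` function': 'parses input', 'variables': 'x, y'}}
```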


def clean_and_get_unique_elements(input_str: str) -> str:
"""
Clean an input string (str) and return a string of unique elements.
@@ -95,19 +73,16 @@ class DatasetGenerator:
llm (object): The language model for generating responses.
prompt (str): The prompt format for querying the language model.
Methods:
add_to_list(list_to_update: List[Dict], query: str, response: str,
additional_field=None) -> List[Dict]:
Add response to the instruct list.
get_response_from_llm(query: str, context: str) -> str:
Get language model response to query for given context.
process_question(question_type: str, question_id: str, query: str,
context: str, info: Dict) -> None:
Process question and add generated response to the instruct_list.
process_question_type(question_type: str, question_id: str,
question_text: str) -> None:
Process question related to file, function, class, or method.
process_question(question_type: str, question_id: str, query: str, context: str, info: Dict) -> None:
Process question and add the generated response to the instruct_list.
get_info_string(info, item_type) -> str:
Get string from info dictionary.
process_question_type(question_type: str, question_id: str, question_text: str) -> None:
Process questions related to a file, function, class, or method.
generate() -> Tuple[List[Dict], List[Dict]]:
Generate responses for all the questions and return the instruct_list.
"""

def __init__(
Expand Down Expand Up @@ -151,28 +126,6 @@ def __init__(
"method": "classes",
}

def add_to_list(
self,
list_to_update: List[Dict],
query: str,
response: str,
additional_field=None,
) -> List[Dict]:
"""
Add response to the instruct list.
Args:
list_to_update (List[Dict]): The list to update.
query (str): The query to be added.
response (str): The response to be added.
additional_field (Any): Additional field to be added.
Returns:
List[Dict]: The updated list.
"""
list_to_update.append(
{"instruction": query, "input": additional_field, "output": response}
)
return list_to_update

def get_response_from_llm(self, query: str, context: str) -> str:
"""
Get language model response to query for given context.
@@ -182,7 +135,6 @@ def get_response_from_llm(self, query: str, context: str) -> str:
Returns:
str: The generated response.
"""
# List of dictionaries for Q and A pairs to be used as additional LLM context
excluded_instructions = ["Call code graph", "Docstring"]
code_qa_list = [
{item["instruction"].split(" in Python file:")[0]: item["output"]}
@@ -192,84 +144,82 @@ def get_response_from_llm(self, query: str, context: str) -> str:
for prefix in excluded_instructions
)
]
code_qa_dict = {
"Code Elements": {
f"{key.split()[-1]} {key.split()[0]}" if "`" in key else key: value
for item in code_qa_list
for key, value in item.items()
}
}
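
The replacement comprehension above folds the old `group_json` key reordering inline; a quick check of the transform on a hypothetical key:

```python
# Hypothetical key, showing the inline reordering that replaced group_json.
key = "function `bar`"
new_key = f"{key.split()[-1]} {key.split()[0]}" if "`" in key else key
print(new_key)  # `bar` function
```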

# Manage context length for LLM starting with the longest and most comprehensive
context_strategies = [
lambda: "```python\n{}\n```".format(str(context)),
lambda: "{}".format(str(context)),
lambda: "```python\n{}\n```".format(
str(self.file_details["file_info"]["file_code_simplified"])
),
lambda: "```python\n{}\n```".format(
self.get_string_from_info(
self.file_details["file_info"], "file_summary"
)
self.get_info_string(self.file_details["file_info"], "file_summary")
),
lambda: "",
]

max_context_length = self.model_config["inference_model"]["model_params"][
"context_length"
]

prompt_template = self.model_config["prompt_template"].format(
system_prompt=self.model_config["system_prompt"],
instruction_prompt=self.model_config["instruction_prompt"],
)

for strategy in context_strategies:
context = strategy()
full_context = f"{context}\nCode Elements:\n{code_qa_list}"
full_context = f"{context}\n{code_qa_dict}"
prompt = prompt_template.format(context=full_context, query=query)
context_size = len(self.llm.tokenize(prompt))
print(f"Context size: {context_size}")
logging.info("***Context Size: " + str(context_size))
if context_size <= 0.70 * max_context_length:
break
else:
logging.error(
f"Model response failed, increase py2dataset_model_config.yaml context_length > {math.ceil(context_size/0.70)}"
)
return ""
else:
err_msg = f"Model response failed, increase py2dataset_model_config.yaml context_length > {math.ceil(context_size/0.70)}"
logging.error(err_msg)
return ""

response = ""
try: # get response from LLM
try:
response = re.sub(r"\n\s*\n", "\n\n", self.llm(prompt))
code_elements_combined = {}
for item in code_qa_list:
code_elements_combined.update(item)
code_elements_json = json.dumps(
{"Code Elements": code_elements_combined}, indent=4
)
code_elements_json = json.dumps(
group_json(json.loads(code_elements_json)), indent=4
)
response += "\n" + code_elements_json # Appending the JSON formatted string
response += "\n" + json.dumps(code_qa_dict, indent=4)
logging.info(f"***Overall Response: {response}")
except Exception as error:
logging.error(f"Failed to generate model response: {error}")

if self.detailed: # Get llm response for each code_qa_list item
if (
self.detailed
): # add llm_response for each code object to self.instruct_list "output"
for item in code_qa_list:
instruct_key = list(item.keys())[0]
instruct_value = list(item.values())[0]
query = f"Describe the purpose and significance of these {instruct_key}: [{instruct_value}] within the code."
prompt_template = self.model_config["prompt_template"].format(
system_prompt=self.model_config["system_prompt"],
instruction_prompt=self.model_config["instruction_prompt"],
)
prompt = prompt_template.format(context=context, query=query)
try:
instruct_key = list(item.keys())[0]
instruct_value = list(item.values())[0]
query = f"Describe the purpose and significance of these {instruct_key}: [{instruct_value}] within the code."
prompt = (
self.model_config["prompt_template"]
.format(
system_prompt=self.model_config["system_prompt"],
instruction_prompt=self.model_config["instruction_prompt"],
)
.format(context=full_context, query=query)
)

item_response = re.sub(r"\n\s*\n", "\n\n", self.llm(prompt))
logging.info(f"\n***Itemized Response: {query}\n{item_response}")
for item in self.instruct_list:
if item["instruction"].startswith(instruct_key):
output = f"\n\nPurpose and Significance:\n{item_response}"
item["output"] += output
break

except Exception as error:
logging.error(f"Failed to generate model response: {error}")

# replace the output value in self.instruct_list with the item.key + this_response
for i, instruct_item in enumerate(self.instruct_list):
if instruct_item["instruction"].startswith(list(item.keys())[0]):
instruct_item[
"output"
] = f"{instruct_value}\n\nPurpose and Significance:\n{item_response}"
break

return response
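
Both `re.sub` calls in this method collapse runs of blank (or whitespace-only) lines in the model output down to a single blank line; a quick demonstration:

```python
import re

text = "line one\n\n\n   \nline two"
print(repr(re.sub(r"\n\s*\n", "\n\n", text)))  # 'line one\n\nline two'
```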

def process_question(
@@ -281,35 +231,27 @@ def process_question(
question_type (str): The type of question to be processed.
question_id (str): The ID of the question to be processed.
query (str): The query to be processed.
context (str): The context to be used for generating the response.
info (Dict): The information of the Python file.
Returns:
None
"""
if question_id.endswith("code_graph") or question_id.endswith("docstring"):
if question_id.endswith(("code_graph", "docstring")):
response = info.get(question_id, {})
elif self.use_llm and question_id.endswith("purpose"):
response = self.get_response_from_llm(query, context)
else:
response = clean_and_get_unique_elements(str(info.get(question_id, "")))
response = str(response).strip()

if question_type == "file":
context = "".join(
[
"```python\n",
str(self.file_details["file_info"]["file_code"]),
"\n```",
]
)
if response and response != "None":
response_str = str(response).strip()
if response_str:
self.instruct_list.append(
{"instruction": query, "input": context, "output": response_str}
)
context = f"```python\n{context}\n```"
self.instruct_list.append(
{"instruction": query, "input": context, "output": response}
)
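
Each appended record follows the common instruction/input/output triplet used for instruction tuning; an illustrative (hypothetical) entry:

```python
# Hypothetical record, showing the shape appended to self.instruct_list.
example_record = {
    "instruction": "What is the purpose of Python file: example.py?",
    "input": "```python\n<full file source>\n```",
    "output": "The file generates instruction/response pairs from parsed code details.",
}
```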

@staticmethod
def get_string_from_info(info, item_type):
def get_info_string(info: Dict, item_type: str) -> str:
"""
Get string from info dictionary.
Args:
@@ -318,10 +260,9 @@ def get_string_from_info(info, item_type):
Returns:
str: The string from the info.
"""
if info[item_type]:
items = [item.strip() for item in str(info[item_type]).split(",") if item]
return ", ".join(items)
return ""
return ", ".join(
[item.strip() for item in str(info.get(item_type, "")).split(",") if item]
)
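
A quick check of the simplified helper, replicated inline so it runs standalone (hypothetical input):

```python
# Inline replica of get_info_string, for illustration only.
def get_info_string(info, item_type):
    return ", ".join(
        [item.strip() for item in str(info.get(item_type, "")).split(",") if item]
    )

print(get_info_string({"class_methods": "run,,reset"}, "class_methods"))  # run, reset
```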

def process_question_type(
self, question_type: str, question_id: str, question_text: str
@@ -358,21 +299,15 @@ def process_question_type(
context = info[f"{question_type}_code"]
mapping = {f"{question_type}_name": name}
if question_id == f"{question_type}_purpose" and self.use_llm:
variables_string = self.get_string_from_info(
info, f"{question_type}_variables"
)
inputs_string = self.get_string_from_info(
info, f"{question_type}_inputs"
)
combined_string = ", ".join(
[s for s in [variables_string, inputs_string] if s]
)
variables = self.get_info_string(info, f"{question_type}_variables")
inputs = self.get_info_string(info, f"{question_type}_inputs")
combined = ", ".join([s for s in [variables, inputs] if s])
mapping[
f"{question_type}_variables"
] = clean_and_get_unique_elements(combined_string)
] = clean_and_get_unique_elements(combined)

if question_type == "class":
methods_string = self.get_string_from_info(
methods_string = self.get_info_string(
info, f"{question_type}_methods"
)
mapping[f"{question_type}_methods"] = methods_string
