diff --git a/.gitignore b/.gitignore
index 022bc04..9c867a4 100644
--- a/.gitignore
+++ b/.gitignore
@@ -1,7 +1,13 @@
__pycache__/
.vscode/settings.json
.DS_Store
+.idea/
*.log
navienv/
verification_test
.navi_version
+.navi_history
+config
+qodana.yaml
+memories/
+data/
diff --git a/README.md b/README.md
index 123b9a2..21d3a84 100644
--- a/README.md
+++ b/README.md
@@ -1,6 +1,6 @@
Navi | CLI - An innovation in cybersec AI
- v0.6.5 - "CSI - Con"
+ v0.6.6 - "Specter"
## 🤝 Sponsors / Endorsements: Thank you so much!
@@ -15,14 +15,11 @@
## ✨ **Key Features of Navi v0.6.5**
-- **Upgraded Navi Shell** - The shell can now execute system commands, without breaking the flow of conversation. See more below!
-- **Navi chips Upgrade** - The new alias variable within the custom scripts allow for Navi to execute scripts right from the chat. Once again not disrupting the flow.
-- **Chip Creators Guide** - We are in the process of streamlining documentation on making custom chips.
-- **Navi Nmap Chip** - We moved the Nmap Script over to being its own chip.
+- **Navi3b** - The first iteration of our local AI system. Currently running Llama3.2-3b with ollama in the background.
+- **Streamlined 3b install** - The install process is included the first time you launch navi (from edge currently) `navi --edge`
+- **Navi chips Upgrade** - The chips system has been upgraded yet again with some qol, as well as a chip creator + template
+- **Chip Creators Guide** - The chip documentation has been updated to reflect recent changes to the sytem.
- **Wiki Re-write** - With new power comes new documentation
-- **Llama3.2 Integration** - We are running Meta's Llama AI on the backend.
-- **[Navi Mind](https://github.com/SaintsSec/Navi-Mind)** - Planning phase of the first Navi AI Model we are calling *Navi Mind*.
-- **[Navi Public Training](https://github.com/SaintsSec/Navi-Training)** - We want to be transparent about the data going into Navi. So we are building out a whole repo just for that.
### **Work In Progress**
diff --git a/chips/Com/Community-Chips b/chips/Com/Community-Chips
new file mode 100644
index 0000000..e69de29
diff --git a/chips/Dev/Developer-Chips b/chips/Dev/Developer-Chips
new file mode 100644
index 0000000..e69de29
diff --git a/chips/SSG/SSG-Chips b/chips/SSG/SSG-Chips
new file mode 100644
index 0000000..e69de29
diff --git a/chips/SSG/chip-template.txt b/chips/SSG/chip-template.txt
new file mode 100644
index 0000000..0c9cc27
--- /dev/null
+++ b/chips/SSG/chip-template.txt
@@ -0,0 +1,50 @@
+import navi_internal
+
+# Chip documentation: https://github.com/SaintsSec/Navi/wiki/4.-Developing-Chips-%E2%80%90-Indepth
+
+command: str = "{{CHIP_NAME}}"
+use: str = "What does this chip do?"
+aliases: list = ['{{CHIP_NAME}}']
+params: dict = {
+ '-help': 'Display help information',
+ '-h': 'Display help information',
+}
+
+help_params: tuple = ('-help', '-h')
+
+
+def print_params() -> None:
+ # Print the header
+ print(f"{'Parameter':<10} | {'Description'}")
+ print("-" * 40)
+
+ # Print each dictionary item
+ for param, description in params.items():
+ print(f"{param:<10} | {description}")
+
+# What Navi calls to run this Chip
+def run(arguments=None) -> None:
+ # Get the instance of Navi. Required to access Navi-specific functions
+ navi_instance = navi_internal.navi_instance
+ navi_instance.print_message(f"How can I help you, {navi_instance.get_user()}?")
+
+ # To get a dictionary of current navi settings, and modify dictionary. Some modifications
+ # might require app restart. To prevent circular imports, keep this in a function or class.
+ #from navi_shell import get_navi_settings, modify_navi_settings
+
+ # Optional: Converts argument tokens into a list
+ arg_array = arguments.text.split()
+
+ # Remove the command itself
+ arg_array.pop(0)
+
+ # Optional: Check for parameters
+ if arg_array is not None:
+ for arg in arg_array:
+ match arg:
+ case x if x in help_params:
+ print_params()
+ return
+ case _:
+ navi_instance.print_message(f"Invalid parameter: {arg}")
+ return
diff --git a/chips/SSG/navi_chip_creator.py b/chips/SSG/navi_chip_creator.py
new file mode 100644
index 0000000..426c72f
--- /dev/null
+++ b/chips/SSG/navi_chip_creator.py
@@ -0,0 +1,178 @@
+import os
+import re
+import subprocess # nosec
+import sys
+import webbrowser
+
+from colorama import Fore
+
+import navi_internal
+from navi_shell import restart_navi
+
+command: str = "chip-create"
+use: str = "Creates a new Navi chip"
+aliases: list = ['chip-create', 'cc']
+params: dict[str, str] = {
+ '-help': 'Display help information',
+ '-h': 'Display help information',
+}
+
+help_params: tuple = ('-help', '-h')
+template_file: str = "chips/SSG/chip-template.txt"
+chip_install_path: str = "chips/Dev/"
+chip_documentation_link: str = "https://github.com/SaintsSec/Navi/wiki/4.-Developing-Chips-%E2%80%90-Indepth"
+
+
+def print_params() -> None:
+ """Prints the available parameters and their descriptions."""
+ print(f"{'Parameter':<10} | {'Description'}")
+ print("-" * 40)
+ for param, description in params.items():
+ print(f"{param:<10} | {description}")
+
+
+def get_user_input(prompt):
+ """Gets input from the user."""
+ return input(prompt).strip()
+
+
+def confirm_details(chip_name, chip_file_name, navi_instance):
+ """Asks the user to confirm the chip details or make changes."""
+ while True:
+ navi_instance.print_message(f"Perfect! Here's a recap:"
+ f"\nChip Name: {chip_name}"
+ f"\nPython File Name: {chip_file_name}.py\n")
+ choice = get_user_input("Are you ready to proceed or do you want to make changes? "
+ f"({Fore.YELLOW}c{Fore.RESET})ontinue or ({Fore.YELLOW}m{Fore.RESET})ake changes: ").lower()
+ if choice == 'c':
+ return chip_name, chip_file_name
+ elif choice == 'm':
+ return make_changes(chip_name, chip_file_name, navi_instance)
+ else:
+ navi_instance.print_message("Invalid input. Please choose 'c' to continue or 'm' to make changes.")
+
+
+def make_changes(chip_name, chip_file_name, navi_instance):
+ """Allows the user to change the chip name or file name."""
+ while True:
+ change_choice = get_user_input(f"What do you want to change? ({Fore.YELLOW}chip{Fore.RESET}) name or "
+ f"({Fore.YELLOW}file{Fore.RESET}) name: ").lower()
+ if change_choice == 'chip':
+ navi_instance.print_message("Let's update the Chip Name.")
+ chip_name = get_user_input("New Chip Name: ")
+ return confirm_details(chip_name, chip_file_name, navi_instance)
+ elif change_choice == 'file':
+ navi_instance.print_message("Let's update the File Name.")
+ chip_file_name = get_user_input("New File Name: ")
+ return confirm_details(chip_name, chip_file_name, navi_instance)
+ else:
+ navi_instance.print_message("Invalid choice. Please enter 'chip' or 'file'.")
+
+
+def create_chip_file(chip_name, chip_file_name):
+ """Creates the chip file from a template."""
+ if not os.path.exists(template_file):
+ print(f"{Fore.RED}ERROR:{Fore.RESET} Template file '{template_file}' is missing. Aborting.")
+ return None
+
+ with open(template_file, "r") as template:
+ template_content = template.read()
+
+ chip_file_name_final = chip_file_name.replace(".py", "") + ".py"
+ template_content = template_content.replace("{{CHIP_NAME}}", chip_name)
+ chip_file_path = os.path.join(os.getcwd(), chip_install_path, chip_file_name_final)
+
+ with open(chip_file_path, "w") as chip_dev:
+ chip_dev.write(template_content)
+
+ print(f"{Fore.GREEN}Chip '{chip_name}' created successfully!{Fore.RESET}")
+ return chip_file_path
+
+
+def post_creation_options(chip_file_path, navi_instance):
+ """Provides options to the user after chip creation."""
+ navi_instance.print_message(f"Here are some options for you:\n"
+ f"{Fore.YELLOW}1{Fore.RESET}: Open directory of Chip location\n"
+ f"{Fore.YELLOW}2{Fore.RESET}: Open in your preferred code editor\n"
+ f"{Fore.YELLOW}3{Fore.RESET}: Reload Navi to load the new Chip\n"
+ f"{Fore.YELLOW}4{Fore.RESET}: Review the documentation online\n"
+ f"{Fore.YELLOW}5{Fore.RESET} or other value: I'm finished")
+ choice = get_user_input("Please enter 1, 2, 3, 4, or 5: ")
+ try:
+ if choice == '1':
+ print("Opening directory of Chip location")
+ directory = os.path.dirname(chip_file_path)
+ if sys.platform.startswith('win'):
+ subprocess.run(['explorer', directory], check=True) # nosec
+ elif sys.platform == 'darwin':
+ subprocess.run(['open', directory], check=True) # nosec
+ else:
+ subprocess.run(['xdg-open', directory], check=True) # nosec
+ elif choice == '2':
+ print("Opening in your preferred code editor")
+ if sys.platform.startswith('win'):
+ subprocess.run(['explorer', chip_file_path], check=True) # nosec
+ elif sys.platform == 'darwin':
+ subprocess.run(['open', chip_file_path], check=True) # nosec
+ else:
+ subprocess.run(['xdg-open', chip_file_path], check=True) # nosec
+ elif choice == '3':
+ restart_navi()
+ elif choice == '4':
+ webbrowser.open(chip_documentation_link)
+ else:
+ pass
+ except FileNotFoundError:
+ print("Couldn't find the file or directory.")
+ except subprocess.SubprocessError as e:
+ print(f"Failed to execute the command: {e}")
+ except Exception as e:
+ print(f"An unexpected error occurred: {e}")
+
+
+def sanitize_input(name):
+ """
+ Sanitizes the user-provided chip name.
+ Ensures it is safe and valid for use in file paths.
+ """
+ sanitized_name = name.strip()
+ if len(name) > 50:
+ print(f"{Fore.RED}Warning:{Fore.RESET} Input is too long (50 characters max). Trimming to 49 characters.")
+ sanitized_name = sanitized_name[:50]
+
+ # Replace invalid characters with underscores
+ old_name = sanitized_name
+ sanitized_name = re.sub(r'[^\w\-]', '_', name)
+ if old_name != sanitized_name:
+ print(f"{Fore.RED}Warning:{Fore.RESET} Invalid characters in '{old_name}'. Replacing with '{sanitized_name}'.")
+
+ return sanitized_name
+
+
+def run(arguments=None):
+ navi_instance = navi_internal.navi_instance
+ arg_array = arguments.text.split()[1:] # Exclude the command itself
+
+ if arg_array:
+ for arg in arg_array:
+ if arg in help_params:
+ print_params()
+ return
+ else:
+ choice = get_user_input(f"Invalid parameter: {arg}\n"
+ f"Do you want to review the available parameters? (y/n): ").lower()
+ if choice == 'y':
+ print_params()
+ return
+
+ navi_instance.print_message(
+ f"Welcome to Navi Chip creator, {navi_instance.get_user()}. Please enter the name of your new Chip.")
+ chip_name = sanitize_input(get_user_input("Chip Name: "))
+ navi_instance.print_message(
+ f"Great name, {navi_instance.get_user()}! What do you want the python file to be called?")
+ chip_file_name = sanitize_input(get_user_input("Chip File Name: "))
+
+ chip_name, chip_file_name = confirm_details(chip_name, chip_file_name, navi_instance)
+ chip_file_path = create_chip_file(chip_name, chip_file_name)
+ if chip_file_path:
+ post_creation_options(chip_file_path, navi_instance)
diff --git a/commands/navi_chip_installer.py b/chips/SSG/navi_chip_installer.py
similarity index 90%
rename from commands/navi_chip_installer.py
rename to chips/SSG/navi_chip_installer.py
index 98a8823..49d7956 100644
--- a/commands/navi_chip_installer.py
+++ b/chips/SSG/navi_chip_installer.py
@@ -1,13 +1,14 @@
import os
-import sys
-import requests
-import zipfile
import shutil
import subprocess # nosec
+import sys
import uuid
-import navi_internal
+import zipfile
+import requests
from colorama import Fore
+
+import navi_internal
from navi import get_parameters
from navi_shell import restart_navi
@@ -83,7 +84,7 @@ def download_and_extract(download_url):
return None, None
-def copy_files_to_install_path(extracted_dir, install_path="/commands"):
+def copy_files_to_install_path(extracted_dir, install_path="/chips/Com"):
installed_files = []
try:
for item in os.listdir(extracted_dir):
@@ -114,7 +115,7 @@ def install_requirements(extracted_dir):
os.remove(requirements_path)
-def update_script(download_url, install_path="commands"):
+def update_script(download_url, install_path="chips/Com"):
print("Downloading chip...")
extracted_dir, download_guid = download_and_extract(download_url)
if extracted_dir:
@@ -236,21 +237,39 @@ def get_installed_chips() -> list[dict[str, str]] | None:
return modules
+def get_dev_chips():
+ import chips
+ dev_chips = []
+ for command_name, module in chips.modules.items():
+ if module.__name__.startswith("chips.Dev"):
+ command_aliases = getattr(module, 'aliases', [])
+ command_use = getattr(module, 'use', "")
+ dev_chips.append((command_name, command_use, command_aliases))
+ return dev_chips
+
+
def list_installed_chips() -> None:
chips = get_installed_chips()
if chips:
- print("Installed Chips:")
+ print("Installed Community Chips:")
for module in chips:
print(f"- {module['name']} (Owner: {module['owner']}, Version: {module['version']})")
else:
- print("No chips are installed.")
+ print("No Community Chips are installed.")
+ dev_chips = get_dev_chips()
+ if not dev_chips:
+ print("No Developer Chips are installed.")
+ else:
+ print("\nDeveloper Chips:")
+ for chip in dev_chips:
+ print(f"- {chip[0]} (Use: {chip[1]}, Aliases: {chip[2]})")
def about_chip(name) -> dict[str, str] | None:
log_file_path = "installed_chips.txt"
if not os.path.exists(log_file_path):
- print("No chips are installed.")
+ print("No Community Chips are installed.")
return None
with open(log_file_path, 'r') as log_file:
@@ -290,7 +309,7 @@ def about_chip(name) -> dict[str, str] | None:
"latest_version": latest_version
}
- print(f"The chip '{name}' is not installed.")
+ print(f"The Community Chip '{name}' is not installed.")
return None
@@ -310,9 +329,9 @@ def update_chip(chip_name: str) -> None:
def help_text() -> None:
navi.print_message("Chip Manager\n"
- "chips [install | uninstall | search | update] [app/query]\n\n"
- "List currently installed chips\n"
- "chips list")
+ "chips [install | uninstall | search | update] [app/query]\n\n"
+ "List currently installed chips\n"
+ "chips list")
def run(arguments=None) -> None:
diff --git a/commands/navi_clear.py b/chips/SSG/navi_clear.py
similarity index 94%
rename from commands/navi_clear.py
rename to chips/SSG/navi_clear.py
index e934f42..07ef037 100644
--- a/commands/navi_clear.py
+++ b/chips/SSG/navi_clear.py
@@ -1,4 +1,3 @@
-#!/bin/python3
import navi_internal
command = "clear"
diff --git a/commands/navi_exit.py b/chips/SSG/navi_exit.py
similarity index 94%
rename from commands/navi_exit.py
rename to chips/SSG/navi_exit.py
index cdece7f..37bfce3 100644
--- a/commands/navi_exit.py
+++ b/chips/SSG/navi_exit.py
@@ -1,4 +1,3 @@
-#!/bin/python3
import navi_internal
command = "exit"
diff --git a/commands/navi_help.py b/chips/SSG/navi_help.py
similarity index 95%
rename from commands/navi_help.py
rename to chips/SSG/navi_help.py
index 20dabea..f55a029 100644
--- a/commands/navi_help.py
+++ b/chips/SSG/navi_help.py
@@ -1,5 +1,5 @@
#!/bin/python3
-import commands
+import chips
command = "navi_help"
use = "Displays the help screen"
@@ -12,7 +12,7 @@ def run(arguments=None):
max_alias_length = 0
command_data = []
- for command_name, module in commands.modules.items():
+ for command_name, module in chips.modules.items():
command_aliases = getattr(module, 'aliases', [])
command_use = getattr(module, 'use', "")
diff --git a/chips/SSG/navi_memories.py b/chips/SSG/navi_memories.py
new file mode 100644
index 0000000..b352dd8
--- /dev/null
+++ b/chips/SSG/navi_memories.py
@@ -0,0 +1,82 @@
+import os
+
+import navi_internal
+
+command: str = "memory"
+use: str = "Manage chat memory sessions."
+aliases: list = ['session', 'memory']
+params: dict = {
+ 'list': 'List all available sessions and indicate the active one',
+ 'create ': 'Create a new session with the specified name',
+ 'remove ': 'Remove a session with the specified name',
+ 'set-default ': 'Set the default session',
+ 'set-active ': 'Set the active session',
+ '-help': 'Display help information',
+ '-h': 'Display help information',
+}
+
+memory_dir = "memories"
+
+# Ensure the memory directory exists
+if not os.path.exists(memory_dir):
+ os.makedirs(memory_dir)
+
+help_params: tuple = ('-help', '-h')
+
+
+def print_params() -> None:
+ """Print available parameters and descriptions."""
+ print(f"{'Parameter':<20} | {'Description'}")
+ print("-" * 50)
+ for param, description in params.items():
+ print(f"{param:<20} | {description}")
+
+
+def list_sessions(active_session) -> None:
+ sessions = os.listdir(memory_dir)
+ print("Available Sessions:")
+ for session in sessions:
+ session_name = os.path.splitext(session)[0]
+ if session_name == active_session:
+ print(f"* {session_name} (Active)")
+ else:
+ print(f" {session_name}")
+
+
+def does_session_exist(session_name) -> bool:
+ if not os.path.exists(os.path.join(memory_dir, f"{session_name}.json")):
+ print(f"Session '{session_name}' does not exist.")
+ return False
+ return True
+
+
+def run(arguments=None) -> None:
+ navi_instance = navi_internal.navi_instance
+ active_session = navi_instance.get_active_session()
+
+ arg_array = arguments.text.split()
+ arg_array.pop(0) # Remove the command itself
+
+ if not arg_array:
+ navi_instance.print_message("No arguments provided. Use '-help' for more information.")
+ return
+
+ match arg_array:
+ case ['list']:
+ list_sessions(active_session)
+ case ['create', session_name]:
+ navi_instance.create_new_session(session_name)
+ case ['remove', session_name]:
+ if does_session_exist:
+ navi_instance.remove_session(session_name.upper())
+ case ['set-default', session_name]:
+ if does_session_exist:
+ from navi_shell import modify_navi_settings
+ modify_navi_settings("session", session_name.upper())
+ case ['set-active', session_name]:
+ if does_session_exist:
+ navi_instance.set_active_session(session_name.upper())
+ case x if x[0] in help_params:
+ print_params()
+ case _:
+ navi_instance.print_message("Invalid arguments. Use '-help' for more information.")
diff --git a/chips/SSG/navi_settings.py b/chips/SSG/navi_settings.py
new file mode 100644
index 0000000..bbd39bc
--- /dev/null
+++ b/chips/SSG/navi_settings.py
@@ -0,0 +1,127 @@
+import os
+
+import navi_internal
+from navi_shell import restart_navi
+
+command = "settings"
+use = "Review and modify the Navi settings"
+aliases = ['--settings']
+
+script_dir = os.path.dirname(os.path.abspath(__file__))
+config_path = os.path.abspath(os.path.join(script_dir, "..", "..", "config"))
+default_config_path = os.path.abspath(os.path.join(script_dir, "..", "..", "default_config"))
+
+
+def create_config(default_values):
+ if not os.path.exists(config_path):
+ with open(config_path, "w") as file:
+ for key, value in default_values.items():
+ file.write(f"{key}={value}\n")
+
+
+def modify_config(key, value):
+ if not os.path.exists(config_path):
+ print(f"Unable to modify file at {config_path}.")
+ return
+
+ with open(config_path, "r") as file:
+ lines = file.readlines()
+
+ modified = False
+ with open(config_path, "w") as file:
+ for line in lines:
+ if line.startswith("#") or "=" not in line.strip():
+ file.write(line)
+ continue
+
+ current_key, _ = line.strip().split("=", 1)
+ if current_key == key:
+ file.write(f"{key}={value}\n")
+ modified = True
+ else:
+ file.write(line)
+
+ # Add the key-value pair if it doesn't exist
+ if not modified:
+ file.write(f"{key}={value}\n")
+
+
+def read_config(path_to_config):
+ if not os.path.exists(path_to_config):
+ print(f"Config file not found at {path_to_config}.")
+ return {}
+
+ config = {}
+ with open(path_to_config, "r") as file:
+ for line in file:
+ if line.startswith("#") or not line.strip():
+ continue
+
+ if "=" in line:
+ key, value = line.strip().split("=", 1)
+ value = value.split("#", 1)[0].strip()
+ if value.lower() == "true":
+ value = True
+ elif value.lower() == "false":
+ value = False
+ config[key.strip()] = value
+ return config
+
+
+def settings_init():
+ import getpass
+
+ default_config = read_config(default_config_path)
+
+ # Check if the config file exists
+ if os.path.exists(config_path):
+ # Read the existing config
+ user_config = read_config(config_path)
+ # Add any missing keys from the default config
+ for key, default_value in default_config.items():
+ if key not in user_config:
+ print(f"Adding missing key: {key} with default value: {default_value}")
+ user_config[key] = default_value
+ modify_config(key, default_value)
+ return user_config
+ else:
+ # If the config file doesn't exist, create it using defaults
+ print("Config file not found. Creating a new one with default settings.")
+ create_config(default_config)
+ modify_config("username", getpass.getuser())
+
+ return read_config(config_path)
+
+
+def run(arguments=None):
+ navi_instance = navi_internal.navi_instance
+
+ current_settings = settings_init()
+
+ while True:
+ navi_instance.clear_terminal()
+ print("\nCurrent Settings:")
+ for key, value in current_settings.items():
+ print(f" {key}: {value}")
+
+ print("\nOptions:")
+ print(" [1] Update a setting")
+ print(" [2] Exit")
+
+ choice = input("\nEnter your choice: ").strip()
+
+ if choice == "1":
+ key_to_modify = input("Enter the setting key to update: ").strip()
+ if key_to_modify in current_settings:
+ new_value = input(
+ f"Enter the new value for '{key_to_modify}' (current: {current_settings[key_to_modify]}): ").strip()
+ modify_config(key_to_modify, new_value)
+ print(f"'{key_to_modify}' updated successfully!")
+ current_settings = settings_init()
+ else:
+ print(f"Error: '{key_to_modify}' is not a valid setting.")
+ elif choice == "2":
+ restart_navi()
+ break
+ else:
+ print("Invalid choice. Please try again.")
diff --git a/commands/navi_spec.py b/chips/SSG/navi_spec.py
similarity index 99%
rename from commands/navi_spec.py
rename to chips/SSG/navi_spec.py
index 9bd8482..4fc851b 100755
--- a/commands/navi_spec.py
+++ b/chips/SSG/navi_spec.py
@@ -1,10 +1,12 @@
#!/bin/python3
import platform
+import socket
+from datetime import datetime
+
import psutil
import requests
-import socket
+
import navi_internal
-from datetime import datetime
# Navi Command System Variables
command = "navi_specs"
@@ -78,7 +80,8 @@ def run(arguments=None):
# Main functions
response_message = navi_instance.llm_chat(
- "Give me a really simple quip about getting my systems specs. Dont include commands or references to operating systems.", True)
+ "Give me a really simple quip about getting my systems specs. Dont include commands or references to operating systems.",
+ True)
clean_text = str(response_message).replace("(", "").replace(")", "").replace(", 200", "").replace("\"", "").replace(
"\\n", "")
output = clean_text + "\n"
diff --git a/commands/navi_system.py b/chips/SSG/navi_system.py
similarity index 88%
rename from commands/navi_system.py
rename to chips/SSG/navi_system.py
index 57f1662..e16ea84 100644
--- a/commands/navi_system.py
+++ b/chips/SSG/navi_system.py
@@ -1,7 +1,8 @@
#!/bin/python3
+import shlex
import subprocess # nosec
+
import navi_internal
-import shlex
from navi import get_command_path
command = "navi_sys"
@@ -13,7 +14,8 @@ def run(arguments=None):
navi_command = str(arguments).replace("TERMINAL OUTPUT", "", 1).strip()
base_command = navi_command.split()[0]
if get_command_path(base_command) is not None:
- navi_instance.print_message(f"\nDo I have your permission to use your **shell** to execute the following: \n\n{navi_command}\n")
+ navi_instance.print_message(
+ f"\nDo I have your permission to use your **shell** to execute the following: \n\n{navi_command}\n")
user_input = input("Do you want me to continue (y/n): ").strip().lower()
if user_input == 'y':
result = subprocess.run(
diff --git a/chips/SSG/navi_token_management.py b/chips/SSG/navi_token_management.py
new file mode 100644
index 0000000..cd225ca
--- /dev/null
+++ b/chips/SSG/navi_token_management.py
@@ -0,0 +1,112 @@
+import navi_internal
+
+token_limit_max: int = 4096
+navi_settings: dict = {}
+
+command: str = "token-config"
+use: str = "Adjust token limits for RAG and chat memory."
+aliases: list = ['set_tokens', 'token_limits']
+params: dict = {
+ '-rag': 'Set the token limit for RAG (e.g., -rag 1024)',
+ '-chat': 'Set the token limit for chat memory (e.g., -chat 3072)',
+ '-show': 'Display the current token limit partition.',
+ '-help': 'Display help information.',
+}
+
+help_params: tuple = ('-help', '-h')
+
+
+def print_params() -> None:
+ print(f"{'Parameter':<10} | {'Description'}")
+ print("-" * 40)
+
+ for param, description in params.items():
+ print(f"{param:<10} | {description}")
+
+
+def set_token_limit(rag: int = None, chat: int = None) -> str:
+ global navi_settings
+ token_limit_rag = int(navi_settings["token_limit_rag"])
+ token_limit_chat = int(navi_settings["token_limit_chat"])
+
+ # Calculate new limits if provided
+ new_rag = token_limit_rag if rag is None else rag
+ new_chat = token_limit_chat if chat is None else chat
+
+ # Validate the total allocation
+ if new_rag + new_chat > token_limit_max:
+ return f"Error: Total allocation exceeds the maximum token limit of {token_limit_max}."
+
+ from navi_shell import modify_navi_settings, get_navi_settings
+ modify_navi_settings("token_limit_rag", new_rag)
+ modify_navi_settings("token_limit_chat", new_chat)
+
+ # Refresh the navi_settings dictionary
+ navi_settings = get_navi_settings()
+
+ return f"Token limits updated. RAG: {new_rag}, Chat: {new_chat}"
+
+
+def show_token_limits() -> str:
+ rag = int(navi_settings["token_limit_rag"])
+ chat = int(navi_settings["token_limit_chat"])
+ return f"Current Token Limits:\n- RAG: {rag}\n- Chat: {chat}\n- Total: {rag + chat} (Max: {token_limit_max})"
+
+
+def run(arguments=None) -> None:
+ navi_instance = navi_internal.navi_instance
+ global token_limit_max
+ token_limit_max = navi_instance.get_max_token_limit()
+ arg_array = arguments.text.split()
+
+ arg_array.pop(0)
+
+ from navi_shell import get_navi_settings
+ global navi_settings
+ navi_settings = get_navi_settings()
+
+ # Parse parameters
+ if arg_array:
+ rag_limit = None
+ chat_limit = None
+ show_only = False
+
+ for i in range(len(arg_array)):
+ arg = arg_array[i]
+
+ match arg:
+ case '-rag':
+ try:
+ # Fetch the next value and skip it in the loop
+ rag_limit = int(arg_array[i + 1])
+ arg_array[i + 1] = None # Mark as processed
+ except (IndexError, ValueError):
+ navi_instance.print_message("Error: Invalid value for -rag.")
+ return
+ case '-chat':
+ try:
+ # Fetch the next value and skip it in the loop
+ chat_limit = int(arg_array[i + 1])
+ arg_array[i + 1] = None # Mark as processed
+ except (IndexError, ValueError):
+ navi_instance.print_message("Error: Invalid value for -chat.")
+ return
+ case '-show':
+ show_only = True
+ case x if x in help_params:
+ print_params()
+ return
+ case _:
+ # Skip any previously processed value
+ if arg is not None:
+ navi_instance.print_message(f"Invalid parameter: {arg}")
+ return
+
+ if show_only:
+ navi_instance.print_message(show_token_limits())
+ return
+
+ result = set_token_limit(rag=rag_limit, chat=chat_limit)
+ navi_instance.print_message(result)
+ else:
+ print_params()
diff --git a/commands/__init__.py b/chips/__init__.py
similarity index 79%
rename from commands/__init__.py
rename to chips/__init__.py
index f5fc428..98d93e7 100644
--- a/commands/__init__.py
+++ b/chips/__init__.py
@@ -1,10 +1,12 @@
"""Init commands."""
-from os.path import dirname, basename, isfile, join, exists
-import os
-import sys
-import glob
import importlib
import logging
+import os
+import sys
+from os.path import dirname, join, exists
+from pathlib import Path
+
+package_dir = Path(__file__).parent
# Add the parent directory to the Python path
sys.path.append(dirname(dirname(__file__)))
@@ -23,7 +25,7 @@
def load_module(name):
"""Load module with the given name."""
try:
- module = importlib.import_module(f".{name}", 'commands')
+ module = importlib.import_module(f".{name}", 'chips')
except ModuleNotFoundError as e:
print(f"Module '{name}' not found: {e}")
logging.basicConfig(
@@ -40,11 +42,17 @@ def load_module(name):
return module
-# List all .py files in the current directory, excluding __init__.py
+def module_name_from_path(f):
+ rel_path = f.relative_to(package_dir).with_suffix('')
+ module_name = '.'.join(rel_path.parts)
+ return module_name
+
+
+# List all .py files in the current directory and subdirectories, excluding __init__.py
__all__ = [
- basename(f)[:-3]
- for f in glob.glob(join(dirname(__file__), "*.py"))
- if isfile(f) and not f.endswith('__init__.py') and dirname(f) == dirname(__file__)
+ module_name_from_path(f)
+ for f in package_dir.rglob('*.py')
+ if f.is_file() and f.name != '__init__.py'
]
# Load each module found in __all__
diff --git a/config.py b/config.py
index 97d03ab..d835099 100644
--- a/config.py
+++ b/config.py
@@ -1,3 +1,3 @@
-server = "labs.saintssec.org"
+remote = "labs.saintssec.org"
local = "localhost"
port = 11434
diff --git a/default_config b/default_config
new file mode 100644
index 0000000..e9c7de2
--- /dev/null
+++ b/default_config
@@ -0,0 +1,9 @@
+username=default_user
+navi_name=Navi
+use_local_model=True
+dont_check_for_updates=False
+update_branch=main
+session=DEFAULT_SESSION
+overwrite_session=True
+token_limit_rag=2048
+token_limit_chat=2048
\ No newline at end of file
diff --git a/install/__init__.py b/install/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/install/install.sh b/install/install.sh
index d50bb8f..271bf00 100755
--- a/install/install.sh
+++ b/install/install.sh
@@ -83,7 +83,7 @@ copy_navi() {
cleanup_install_directory() {
cd /opt/Navi || exit 1
- declare -a files_to_remove=("install" "README.md" ".git" ".gitignore")
+ declare -a files_to_remove=("README.md" ".git" ".gitignore")
for item in "${files_to_remove[@]}"; do
if [ -e "$item" ]; then
diff --git a/install/local_model.py b/install/local_model.py
new file mode 100644
index 0000000..a92dec6
--- /dev/null
+++ b/install/local_model.py
@@ -0,0 +1,242 @@
+import os
+import platform
+import shutil
+import subprocess # nosec
+import tempfile
+from getpass import getpass
+from typing import Tuple
+
+
+def install_ollama() -> bool:
+ try:
+ if platform.system() in ["Linux"]:
+ print("Installing Ollama for Linux...")
+ curl_process = subprocess.run(
+ ["curl", "-fsSL", "https://ollama.com/install.sh"],
+ check=True,
+ capture_output=True,
+ text=True,
+ )
+
+ subprocess.run(
+ ["sh"],
+ input=curl_process.stdout,
+ check=True,
+ capture_output=True,
+ text=True,
+ )
+ elif platform.system() in ["Darwin"]:
+ print("Installing Ollama for macOS...")
+ print("Checking for Homebrew...")
+ brew_check = subprocess.run(["brew", "--version"], capture_output=True, text=True)
+ if brew_check.returncode != 0:
+ print("Homebrew is not installed. Please install Homebrew first:")
+ print("Visit: https://brew.sh/")
+ return False
+
+ print("Installing Ollama via Homebrew...")
+ subprocess.run(["brew", "install", "ollama"], check=True)
+ print("Ollama installed successfully on macOS!")
+ elif platform.system() == "Windows":
+ print("Installing Ollama for Windows...")
+
+ # Download the installer
+ subprocess.run(
+ ["powershell", "-Command",
+ "Invoke-WebRequest", "-Uri", "https://ollama.com/download/OllamaSetup.exe",
+ "-OutFile", "ollama_installer.exe"],
+ check=True,
+ capture_output=True,
+ text=True,
+ )
+
+ # Run the installer
+ subprocess.run(
+ ["powershell", "-Command",
+ "Start-Process", "-FilePath", "./ollama_installer.exe", "-Wait"],
+ check=True,
+ capture_output=True,
+ text=True,
+ )
+
+ # Prompt the user to continue. Really tried to wait for installer but it kept detaching
+ print("Please complete the Ollama installation. Once done, type 'c' and press Enter to continue.")
+ while input("Type 'c' to continue: ").strip().lower() != 'c':
+ print("Invalid input. Please type 'c' to continue.")
+ else:
+ print("Unsupported platform for Ollama installation.")
+ return False
+ if not ollama_installed():
+ print("Oh dear, something went wrong installing Ollama.")
+ return False
+ print("Ollama installation succeeded!")
+ return True
+ except subprocess.CalledProcessError as e:
+ print("Ollama installation failed!")
+ print("Return code:", e.returncode)
+ print("stdout:", e.stdout)
+ print("stderr:", e.stderr)
+ return False
+ finally:
+ if platform.system() == "Windows" and os.path.exists("ollama_installer.exe"):
+ try:
+ os.remove("ollama_installer.exe")
+ except Exception as e:
+ print(f"Failed to remove installer: {e}")
+
+
+def ollama_installed() -> bool:
+ try:
+ subprocess.run(
+ ["ollama", "--version"],
+ check=True,
+ capture_output=True,
+ text=True,
+ )
+ return True
+ except subprocess.CalledProcessError:
+ return False
+ except FileNotFoundError:
+ return False
+
+
+def is_ollama_service_running() -> bool:
+ try:
+ process = subprocess.run(
+ ["ollama", "--version"],
+ capture_output=True,
+ text=True,
+ check=True
+ )
+ # If no warning is present, the service is running. Ollama doesn't have a proper check...
+ if "Warning: could not connect to a running Ollama instance" in process.stderr:
+ return False
+ return True
+ except subprocess.CalledProcessError as e:
+ print("Error running 'ollama --version':", e.stderr)
+ return False
+
+
+def start_ollama_service():
+ try:
+ # Start Ollama serve in the background
+ process = subprocess.Popen(
+ ["ollama", "serve"],
+ stdout=subprocess.DEVNULL,
+ stderr=subprocess.DEVNULL,
+ )
+ print(f"Ollama service started with PID: {process.pid}")
+ return process
+ except Exception as e:
+ print(f"Failed to start Ollama service: {e}")
+ return None
+
+
+def check_and_start_ollama_service():
+ if is_ollama_service_running():
+ print("Ollama service is already running.")
+ else:
+ print("Ollama service is not running. Attempting to start it...")
+ start_ollama_service()
+
+
+def check_model_installed() -> Tuple[bool, bool]:
+ try:
+ # Run the command to check if the model exists
+ subprocess.run(
+ ["ollama", "show", "navi-cli"],
+ check=True,
+ capture_output=True,
+ text=True,
+ )
+ return True, False # Model is installed, no unexpected error
+ except subprocess.CalledProcessError as e:
+ if "Error: model 'navi-cli' not found" in e.stderr:
+ return False, False # Model not installed, no unexpected error
+ else:
+ print("An unexpected error occurred.")
+ print("stderr:", e.stderr)
+ return False, True # Model not installed, unexpected error
+
+
+def install_model():
+ import platform
+ is_windows = platform.system() == "Windows"
+ temp_script_path = None
+ script_dir = os.path.dirname(os.path.abspath(__file__))
+ repo_dir = os.path.join(script_dir, "navi3b")
+ if not os.path.exists(repo_dir):
+ os.mkdir(repo_dir)
+ else:
+ shutil.rmtree(repo_dir, ignore_errors=True)
+ os.mkdir(repo_dir)
+
+ if not ollama_installed():
+ if not install_ollama():
+ print("Failed to install Ollama. Visit https://ollama.com and install Ollama manually.")
+ return
+
+ if not is_windows:
+ # Get sudo password for macOS/Linux
+ password = getpass("Enter your sudo password: ")
+
+ # Create a temporary sudo-askpass script
+ with tempfile.NamedTemporaryFile("w", delete=False) as temp_script:
+ temp_script.write(f"#!/bin/bash\necho {password}\n")
+ temp_script_path = temp_script.name
+
+ try:
+ print("Cloning model from github.com/saintssec/navi3b...")
+ subprocess.run(
+ ["git", "clone", "https://github.com/saintssec/navi3b", repo_dir],
+ check=True,
+ capture_output=True,
+ text=True,
+ )
+ print("Git clone succeeded!")
+ except subprocess.CalledProcessError as e:
+ print("Git clone failed!")
+ print("Return code:", e.returncode)
+ print("stdout:", e.stdout)
+ print("stderr:", e.stderr)
+ shutil.rmtree(repo_dir, ignore_errors=True)
+ return
+
+ print("Verifying model...")
+ model_file_path = os.path.join(repo_dir, "navi3b.modelfile")
+ if not os.path.exists(model_file_path):
+ print(f"Error: {model_file_path} not found.")
+ return
+
+ # Create the model using Ollama
+ try:
+ print("Creating model... this may take a while...")
+ if is_windows:
+ subprocess.run(
+ ["ollama", "create", "navi-cli", "-f", model_file_path],
+ check=True,
+ capture_output=True,
+ text=True,
+ )
+ else:
+ env = os.environ.copy()
+ env["SUDO_ASKPASS"] = temp_script_path
+ subprocess.run(
+ ["sudo", "ollama", "create", "navi-cli", "-f", model_file_path],
+ env=env,
+ check=True,
+ capture_output=True,
+ text=True,
+ )
+
+ print("Model creation succeeded!")
+ except subprocess.CalledProcessError as e:
+ print("Model creation failed!")
+ print("Return code:", e.returncode)
+ print("stdout:", e.stdout)
+ print("stderr:", e.stderr)
+ finally:
+ # Cleanup: Remove the temporary script and cloned directory
+ if temp_script_path and os.path.exists(temp_script_path):
+ os.remove(temp_script_path)
+ shutil.rmtree(repo_dir, ignore_errors=True)
diff --git a/install/requirements.txt b/install/requirements.txt
index 57b1d34..616623f 100644
--- a/install/requirements.txt
+++ b/install/requirements.txt
@@ -13,3 +13,5 @@ image
mido
psutil
prompt_toolkit==3.0.48
+PyPDF2==3.0.1
+sentence-transformers==3.3.1
diff --git a/navi.py b/navi.py
index 1fffe03..124c3ec 100644
--- a/navi.py
+++ b/navi.py
@@ -1,5 +1,5 @@
-import re
import platform
+import re
import subprocess # nosec
diff --git a/mods/mods.py b/navi_banner.py
similarity index 51%
rename from mods/mods.py
rename to navi_banner.py
index 8fe84b0..95d0cac 100644
--- a/mods/mods.py
+++ b/navi_banner.py
@@ -4,7 +4,6 @@
versionNum = get_navi_version()
-
# The cover art:
art = rf"""{breakline}
| _ __ _ |
@@ -19,31 +18,17 @@
| the use of Navi. --nhelp for more information |
{breakline}
"""
-vbusterArt = rf"""{breakline}
-| _ ______ __ |
-| | | / / __ )__ _______/ /____ _____ |
-| | | / / __ / / / / ___/ __/ _ \/ ___/ |
-| | |/ / /_/ / /_/ (__ ) /_/ __/ / |
-| |___/_____/\__,_/____/\__/\___/_/ |
-| Powered by ClamAV |
-{breakline}
-
-"""
-reconArt = rf"""{breakline}
-| ____ |
-| / __ \___ _________ ____ |
-| / /_/ / _ \/ ___/ __ \/ __ \\ |
-| / _, _/ __/ /__/ /_/ / / / / |
-| /_/ |_|\___/\___/\____/_/ /_/ |
-{breakline}
-"""
-gptArt = rf"""{breakline}
-| _ __ _ __________ ______ |
-| / | / /___ __ __(_) / ____/ __ \/_ __/ |
-| / |/ / __ `/ | / / /_____/ / __/ /_/ / / / |
-| / /| / /_/ /| |/ / /_____/ /_/ / ____/ / / |
-| /_/ |_/\__,_/ |___/_/ \____/_/ /_/ |
-| |
+three_b_art = rf"""{breakline}
+| _ __ _ _____ __ |
+| / | / /___ __ __(_)__ // /_ |
+| / |/ / __ `/ | / / / /_ __ \ |
+| / /| / /_/ /| |/ / /___/ / /_/ / |
+| /_/ |_/\__,_/ |___/_//____/_.___/v{versionNum} |
+|===================================================|
+| Disclaimer: Saints Security Group LLC does not |
+| condone or support illegal activity and assumes |
+| no responsibility for damages caused through |
+| the use of Navi. --nhelp for more information |
{breakline}
"""
diff --git a/navi_internal.py b/navi_internal.py
index 8bf2073..8e2f742 100644
--- a/navi_internal.py
+++ b/navi_internal.py
@@ -1,28 +1,63 @@
-import requests
+import json
import os
-import textwrap
+import platform
import random
+import textwrap
import time
-import commands
-import json
-import config
-import spacy
-import platform
+import requests
+import spacy
+from PyPDF2 import PdfReader
from prompt_toolkit import PromptSession
from prompt_toolkit.history import FileHistory
+from sentence_transformers import SentenceTransformer, util
-from mods import mods
+import chips
+import config
+import navi_banner
class NaviApp:
- art: str = mods.art
- helpAr: str = mods.helpArt
- breakline: str = mods.breakline
- ai_name_rep: str = "Navi> "
+ art: str = navi_banner.art
+ three_b_art: str = navi_banner.three_b_art
+ helpAr: str = navi_banner.helpArt
+ breakline: str = navi_banner.breakline
+ ai_name_rep: str = "Navi"
- server: str = config.server
+ server: str = config.remote
port: int = config.port
+ local: str = config.local
+
+ memory_dir: str = "memories"
+ default_session: str = "DEFAULT_SESSION"
+ token_limit_max: int = 4096
+ active_session: str = default_session
+
+ knowledge_store_path: str = "data/knowledge_store.json"
+ input_directory: str = "data/input_files"
+ archive_directory: str = "data/archive"
+ # Initialize SentenceTransformer for RAG
+ retriever_model = SentenceTransformer('all-MiniLM-L6-v2')
+
+ llm_chat_prompt: str = (
+ "You are a highly intelligent chatbot. "
+ "You must answer user questions conversationally unless the user explicitly requests a terminal command. "
+ "Rules: "
+ "1. Respond conversationally for all general questions. Do not include TERMINAL OUTPUT for these responses. "
+ "2. Only respond with terminal commands if the user explicitly requests terminal execution (e.g., 'write to a file,' 'run a command'). "
+ "3. When responding with a terminal command, follow this exact format: "
+ " TERMINAL OUTPUT {terminal code to execute (do not use quotes, backticks, or markdown)}. "
+ "4. Do not include additional text, explanations, or formatting (e.g., markdown, backticks, or language tags like `bash`). "
+ "Examples: "
+ "- User: 'What job did Katie apply to?' "
+ "- Response: 'Katie applied to the position of Office Associate II at the Maine Department of Health.' "
+ "- User: 'Write her job to a file called job.txt.' "
+ "- Response: 'TERMINAL OUTPUT {echo Office Associate II at Maine Department of Health > job.txt}' "
+ "Never include TERMINAL OUTPUT unless explicitly requested. "
+ f"The user's operating system is {platform.system()}. User message:"
+ )
+
+ is_local: bool = True
script_dir = os.path.dirname(os.path.abspath(__file__))
hist_file = os.path.join(script_dir, ".navi_history")
@@ -40,6 +75,173 @@ def __new__(cls, *args, **kwargs):
cls._instance = super(NaviApp, cls).__new__(cls, *args, **kwargs)
return cls._instance
+ def __init__(self):
+ self.knowledge_store = self.load_knowledge_store()
+ self.setup_knowledge_input_dir()
+
+ # ------------------------------ RAG MANAGEMENT ------------------------------
+
+ def setup_knowledge_input_dir(self):
+ os.makedirs(self.input_directory, exist_ok=True)
+ os.makedirs(self.archive_directory, exist_ok=True)
+
+ def load_knowledge_store(self):
+ if os.path.exists(self.knowledge_store_path):
+ with open(self.knowledge_store_path, "r") as f:
+ return json.load(f)
+ return []
+
+ def save_knowledge_store(self):
+ with open(self.knowledge_store_path, "w") as f:
+ json.dump(self.knowledge_store, f, indent=4)
+
+ def process_knowledge_files(self):
+ for file_name in os.listdir(self.input_directory):
+ file_path = os.path.join(self.input_directory, file_name)
+ if not os.path.isfile(file_path):
+ continue
+
+ content = ""
+ if file_name.endswith(".pdf"):
+ content = self.extract_text_from_pdf(file_path)
+ elif file_name.endswith(".txt"):
+ content = self.extract_text_from_txt(file_path)
+
+ if content:
+ self.knowledge_store.append({"content": content, "source": file_name})
+ self.save_knowledge_store()
+ print(f"Added knowledge from {file_name}")
+ # Move processed files to the archive directory
+ import shutil
+ archive_path = os.path.join(self.archive_directory, file_name)
+ shutil.move(file_path, archive_path)
+ print(f"Processed {file_name}")
+
+ def retrieve_context(self, query):
+ if not self.knowledge_store:
+ return "No relevant knowledge available."
+
+ query_embedding = self.retriever_model.encode(query, convert_to_tensor=True)
+ knowledge_embeddings = self.retriever_model.encode(
+ [item["content"] for item in self.knowledge_store], convert_to_tensor=True
+ )
+
+ scores = util.pytorch_cos_sim(query_embedding, knowledge_embeddings)[0]
+ top_indices = scores.argsort(descending=True)[:3] # Retrieve top 3 matches
+
+ retrieved_snippets = [
+ f"{self.knowledge_store[i]['content']} (Source: {self.knowledge_store[i]['source']})"
+ for i in top_indices
+ ]
+ return "\n".join(retrieved_snippets)
+
+ def extract_text_from_pdf(self, file_path):
+ try:
+ text = []
+ reader = PdfReader(file_path)
+ for page in reader.pages:
+ text.append(page.extract_text())
+ return "\n".join(text)
+ except Exception as e:
+ print(f"Error extracting text from {file_path}: {e}")
+ return ""
+
+ def extract_text_from_txt(self, file_path):
+ try:
+ with open(file_path, "r", encoding="utf-8") as f:
+ return f.read()
+ except Exception as e:
+ print(f"Error reading text file {file_path}: {e}")
+ return ""
+
+ def trim_rag_to_token_limit(self, text, token_limit):
+ words = text.split()
+ if len(words) > token_limit:
+ trimmed_text = " ".join(words[:token_limit])
+ return trimmed_text + "..."
+ return text
+
+ # ------------------------------ MEMORY MANAGEMENT ---------------------------
+
+ def setup_memory(self) -> None:
+ if not os.path.exists(self.memory_dir):
+ os.makedirs(self.memory_dir)
+ if not os.path.exists(self.get_session_path(self.default_session)):
+ self.create_new_session(self.default_session)
+
+ def get_session_path(self, session_name):
+ return os.path.join(self.memory_dir, f"{session_name}.json")
+
+ def trim_history_to_token_limit(self, chat_history, token_limit):
+ while chat_history and self.calculate_tokens(chat_history) > token_limit:
+ chat_history.pop(0)
+
+ return chat_history
+
+ def load_session(self, session_name):
+ path = self.get_session_path(session_name)
+ if os.path.exists(path):
+ with open(path, 'r') as f:
+ return json.load(f)
+ return []
+
+ def save_session(self, session_name, chat_history):
+ path = self.get_session_path(session_name)
+ with open(path, 'w') as f:
+ json.dump(chat_history, f, indent=4)
+
+ def calculate_tokens(self, chat_history):
+ return sum(len(entry['content'].split()) for entry in chat_history)
+
+ def create_new_session(self, session_name):
+ if not session_name.upper():
+ print("Session name cannot be empty.")
+ return
+ if os.path.exists(self.get_session_path(session_name.upper())):
+ print("Session with this name already exists.")
+ return
+ self.save_session(session_name.upper(), [])
+
+ def set_active_session(self, session_name):
+ if not os.path.exists(self.get_session_path(session_name)):
+ print(f"Session {session_name} does not exist.")
+ return None
+ self.active_session = session_name
+
+ def save_chat_to_session(self, session_name, history, chat_user, chat_assistant, token_limit):
+ chat_history = history
+ chat_history.append(chat_user)
+ chat_history.append(chat_assistant)
+
+ # Handle token overflow
+ from navi_shell import get_navi_settings
+ if get_navi_settings()["overwrite_session"] and self.calculate_tokens(chat_history) > token_limit:
+ chat_history.pop(0)
+
+ self.save_session(session_name, chat_history)
+
+ def get_active_session(self):
+ return self.active_session
+
+ def remove_session(self, session_name):
+ if os.path.exists(self.get_session_path(session_name)):
+ if session_name == self.default_session:
+ # Clear the default session
+ self.save_session(self.default_session, [])
+ else:
+ # Set active session to the default session
+ self.set_active_session(self.default_session)
+ # Remove the session file
+ os.remove(self.get_session_path(session_name))
+ # If the removed session was a config default, set it to the default session
+ from navi_shell import get_navi_settings, modify_navi_settings
+ if get_navi_settings()["session"] is session_name:
+ modify_navi_settings("session", self.default_session)
+ else:
+ print(f"{session_name} does not exist.")
+
+ # ------------------------------ CORE NAVI FUNCTIONS --------------------------
+
def setup_history(self) -> None:
self.session = PromptSession(history=FileHistory(self.hist_file))
@@ -49,10 +251,16 @@ def get_user(self) -> str:
def set_user(self, sys_user: str) -> None:
self.user = sys_user
+ def set_local(self, local_state) -> None:
+ self.is_local = local_state
+
+ def set_navi_name(self, navi_name: str) -> None:
+ self.ai_name_rep = navi_name
+
def print_message(self, text: str, include_ai_name: bool = True) -> None:
to_print = text
if include_ai_name:
- to_print = self.ai_name_rep + text
+ to_print = self.ai_name_rep + "> " + text
sleep_times = {
(0, 0.1): 0.0,
(0.1, 0.2): 0.05,
@@ -75,7 +283,7 @@ def print_message(self, text: str, include_ai_name: bool = True) -> None:
wrapped_lines = textwrap.fill(line, width=wrap_width)
for char in wrapped_lines:
print(char, end="", flush=True)
- random_num = random.uniform(0, 1) # nosec
+ random_num = random.uniform(0, 1) # nosec
for range_tuple, sleep_time in sleep_times.items():
if range_tuple[0] <= random_num < range_tuple[1]:
time.sleep(sleep_time)
@@ -85,33 +293,77 @@ def print_message(self, text: str, include_ai_name: bool = True) -> None:
def clear_terminal(self) -> None:
os.system('cls' if os.name == 'nt' else 'clear')
- print(self.art)
+ if self.is_local:
+ print(self.three_b_art)
+ else:
+ print(self.art)
- def llm_chat(self, user_message: str, called_from_app: bool = False) -> tuple[str, int]:
+ def fetch_token_limits(self):
+ from navi_shell import get_navi_settings
+ try:
+ navi_settings = get_navi_settings()
+
+ token_limit_rag = int(navi_settings["token_limit_rag"])
+ token_limit_chat = int(navi_settings["token_limit_chat"])
+
+ # Check if the combined total exceeds the maximum allowed
+ return_default = False
+ if token_limit_rag < 0 or token_limit_chat < 0:
+ print("Warning: Negative token values are invalid. Using default values")
+ return_default = True
+ if token_limit_rag + token_limit_chat > self.token_limit_max:
+ print("Warning: Combined token limits exceed the maximum allowed. Using default values")
+ return_default = True
+ if return_default:
+ return 2048, 2048
+ else:
+ return token_limit_rag, token_limit_chat
+ except (ValueError, TypeError, KeyError) as e:
+ print(f"Warning: Issue fetching token limits: {e}. Using default values.")
+ return 2048, 2048
+
+ def get_max_token_limit(self):
+ return self.token_limit_max
+
+ def llm_chat(self, user_message: str, called_from_app: bool = False, call_remote: bool = False) -> tuple[str, int]:
# Define the API endpoint and payload
message_amendment = user_message
if not called_from_app:
- message_amendment = (
- ("If the user message has a terminal command request, provide the following 'TERMINAL OUTPUT {"
- "terminal code to execute request (no not encapsulate command in quotes)}' and NOTHING "
- "ELSE. Otherwise continue to communicate"
- "normally.") +
- f"The user's OS is {platform.system()}" + ". User message:")
+ message_amendment = self.llm_chat_prompt
message_amendment += user_message
- url = f"http://{self.server}:{self.port}/api/chat"
+
+ token_limit_rag, token_limit_chat = self.fetch_token_limits()
+
+ # Check if RAG should be used
+ retrieved_context = ""
+ if self.is_local:
+ # Retrieve context and trim to token limit
+ retrieved_context = self.retrieve_context(user_message)
+ retrieved_context = self.trim_rag_to_token_limit(retrieved_context, token_limit_rag)
+
+ # Load chat history and trim for token limit
+ chat_history = self.load_session(self.active_session)
+ chat_submission = self.trim_history_to_token_limit(chat_history, token_limit_chat)
+
+ # Create combined input for API call
+ if retrieved_context:
+ combined_input = f"Retrieved Context:\n{retrieved_context}\n\nUser Query:\n{message_amendment}"
+ else:
+ combined_input = message_amendment
payload = {
"model": "navi-cli",
- "messages": [{"role": "user", "content": message_amendment}]
+ "messages": chat_submission + [{"role": "user", "content": combined_input}]
}
headers = {'Content-Type': 'application/json'}
+ url = f"http://{self.local}:{self.port}/api/chat"
+ if call_remote or not self.is_local:
+ url = f"http://{self.server}:{self.port}/api/chat"
response = requests.post(url, headers=headers, json=payload)
- # Check if the response is valid
+ # Process the response
if response.status_code == 200:
response_text = response.text
-
- # Split the response into lines and parse each line as JSON
messages = [line for line in response_text.split('\n') if line]
extracted_responses = []
@@ -125,28 +377,58 @@ def llm_chat(self, user_message: str, called_from_app: bool = False) -> tuple[st
except KeyboardInterrupt:
self.print_message(f"Keyboard interrupt registered, talk soon {self.user}!")
- # Concatenate the extracted messages
+ # Concatenate assistant responses
full_response = "".join(extracted_responses)
+
+ # Save only the user message and assistant response to chat history
+ self.save_chat_to_session(
+ self.active_session,
+ chat_history,
+ {"role": "user", "content": user_message},
+ {"role": "assistant", "content": full_response},
+ token_limit_chat
+ )
+
return full_response, 200
else:
- return f"{response.url},{response.json()}", 400
+ return f"Error: {response.status_code}, {response.text}", 400
def process_message(self, user_message: str) -> None:
processed_message = self.nlp(user_message.strip())
navi_commands = [ent for ent in processed_message.ents if ent.label_ == "NAVI_COMMAND"]
+
# Check if the message is a question
question_keywords = {"is", "does", "do", "what", "when", "where", "who", "why", "what", "how"}
is_question = any(token.text.lower() in question_keywords for token in processed_message if token.i == 0)
if navi_commands and not is_question:
command = navi_commands[0].text
- main_command = commands.alias_to_command.get(command)
+ main_command = chips.alias_to_command.get(command)
if main_command:
- commands.modules[main_command].run(processed_message)
+ chips.modules[main_command].run(processed_message)
else:
response_message, http_status = self.llm_chat(user_message)
- if response_message.startswith("TERMINAL OUTPUT"):
- commands.modules["navi_sys"].run(response_message)
+
+ # Normalize TERMINAL OUTPUT and process terminal-related responses
+ if "TERMINAL OUTPUT" in response_message.upper(): # Case-insensitive check
+ # Normalize TERMINAL OUTPUT
+ response_message = response_message.replace("Terminal Output", "TERMINAL OUTPUT").replace(
+ "terminal output", "TERMINAL OUTPUT")
+
+ # Remove unwanted formatting
+ clean_response = (
+ response_message.replace("```", "")
+ .replace("bash", "")
+ .replace("TERMINAL OUTPUT", "")
+ .strip()
+ )
+ if clean_response.startswith("{") and clean_response.endswith("}"):
+ clean_response = clean_response[1:-1].strip() # Remove surrounding braces
+
+ if clean_response: # Ensure the command isn't empty
+ chips.modules["navi_sys"].run(clean_response)
+ else:
+ self.print_message("Invalid terminal command received.")
else:
self.print_message(f"{response_message if http_status == 200 else 'Issue with server'}")
@@ -162,7 +444,7 @@ def chat_with_navi(self) -> None:
def setup_navi_vocab(self) -> None:
# Register commands and aliases with the entity ruler
- for command, module in commands.modules.items():
+ for command, module in chips.modules.items():
patterns = [{"label": "NAVI_COMMAND", "pattern": command}]
aliases = getattr(module, 'aliases', []) # Safely get the aliases attribute, default to an empty list
for alias in aliases:
diff --git a/navi_shell.py b/navi_shell.py
index 77e4549..45a5f5c 100644
--- a/navi_shell.py
+++ b/navi_shell.py
@@ -1,15 +1,20 @@
+import argparse
+import getpass
import os
import sys
-import getpass
-import argparse
import traceback
+
+from colorama import Fore
+
import navi_internal
+from chips import SSG
+from install import local_model
from navi_updater import check_version, update_script
def handle_exception(exc_type, exc_value, exc_traceback) -> None:
from datetime import datetime
-
+
if issubclass(exc_type, KeyboardInterrupt):
sys.__excepthook__(exc_type, exc_value, exc_traceback)
return
@@ -26,7 +31,8 @@ def handle_exception(exc_type, exc_value, exc_traceback) -> None:
log_file_path = os.path.abspath(log_file)
- print(f"\nDang! Navi crashed. A crash log has been created at:\n{log_file_path}. \n\nYou can create a new Navi GitHub issue here: \nhttps://github.com/SaintsSec/Navi/issues. \n\nThank you for helping us make Navi better!")
+ print(
+ f"\nDang! Navi crashed. A crash log has been created at:\n{log_file_path}. \n\nYou can create a new Navi GitHub issue here: \nhttps://github.com/SaintsSec/Navi/issues. \n\nThank you for helping us make Navi better!")
print("\nWould you like to:")
print("1) Open the crash log")
@@ -48,6 +54,7 @@ def handle_exception(exc_type, exc_value, exc_traceback) -> None:
sys.exit(1)
+
sys.excepthook = handle_exception
user = getpass.getuser()
@@ -60,39 +67,101 @@ def handle_exception(exc_type, exc_value, exc_traceback) -> None:
parser.add_argument('--skip-update', action='store_true',
help='Skip the update check (used internally to prevent update loop)')
parser.add_argument('--install', action='store_true', help='installs Navi based on the current downloaded version.')
+parser.add_argument('--remote', action='store_true', help='Use remote server instead of local server')
args = parser.parse_args()
-def restart_navi() -> None:
- os.execv(sys.executable, [sys.executable] + sys.argv + ["--skip-update"]) # nosec
+
+def restart_navi(custom_flag: bool = False, flag: str = "") -> None:
+ import subprocess # nosec
+ command = [sys.executable] + sys.argv + (["--skip-update"] if not custom_flag else [flag])
+ subprocess.run(command, check=True)
+
+ sys.exit()
+
+
+def local_model_check():
+ if local_model.ollama_installed():
+ if not local_model.is_ollama_service_running():
+ local_model.start_ollama_service()
+ is_installed, has_unexpected_error = local_model.check_model_installed()
+ if is_installed:
+ local_model.start_ollama_service()
+ else:
+ if has_unexpected_error:
+ print(f"{Fore.YELLOW}Warning: We can't verify that the local navi model is installed.{Fore.RESET}")
+ install_decision()
+ else:
+ print(f"{Fore.YELLOW}The local Navi model not installed.{Fore.RESET}")
+ install_decision()
+ else:
+ print(f"{Fore.YELLOW}Warning: Ollama is required to run the local Navi model.{Fore.RESET}")
+ install_decision()
+
+
+def install_decision():
+ user_decision = input("(C)ontinue with --remote flag or begin (i)nstallation: ")
+ if user_decision == "c" or user_decision == "C":
+ print("To never see this prompt again, set 'use_local_model' to False using the 'settings'"
+ "command.")
+ default_input = input("Would you like us to set it for you? (Y)es, (N)o, please restart: ")
+ if default_input.lower() == "y" or default_input.lower() == "yes":
+ modify_navi_settings("use_local_model", False)
+ restart_navi(True, "--remote")
+ if user_decision == "i" or user_decision == "I":
+ print("Beginning installation")
+ local_model.install_model()
+
+
+def get_navi_settings() -> dict:
+ return SSG.navi_settings.settings_init()
+
+
+def modify_navi_settings(key, value) -> None:
+ SSG.navi_settings.modify_config(key, value)
+
def main() -> None:
navi_instance = navi_internal.navi_instance
- navi_instance.set_user(user)
+ navi_settings = get_navi_settings()
+ navi_instance.set_user(navi_settings["username"])
+ navi_instance.set_navi_name(navi_settings["navi_name"])
try:
if args.q:
response_message, http_status = navi_instance.llm_chat(
f"{args.q}",
- True
+ True,
+ args.remote
)
navi_instance.print_message(
f"{response_message if http_status == 200 else f'Trouble connecting to Navi server.'}"
)
exit(0)
- if not args.noupdate and not args.skip_update:
+ if not args.noupdate and not args.skip_update and not navi_settings["dont_check_for_updates"]:
+ if navi_settings["update_branch"] == "edge":
+ args.edge = True
download_url = check_version(args.edge)
if download_url:
update_script(download_url)
if args.install:
os.system('cd ./install && ./install.sh')
+ if args.remote or not navi_settings["use_local_model"]:
+ navi_instance.set_local(False)
+ if not args.remote and navi_settings["use_local_model"]:
+ local_model_check()
+ # Only process files if local model is used
+ navi_instance.process_knowledge_files()
navi_instance.setup_navi_vocab()
navi_instance.clear_terminal()
navi_instance.setup_history()
+ navi_instance.setup_memory()
+ navi_instance.set_active_session(navi_settings["session"])
navi_instance.chat_with_navi()
- navi_instance.print_message(f"How can I help you {user}")
+ navi_instance.print_message(f"How can I help you, {user}")
except KeyboardInterrupt:
navi_instance.print_message(f"\nKeyboard interrupt has been registered, talk soon {user}!")
exit(0)
+
if __name__ == "__main__":
main()
diff --git a/navi_updater.py b/navi_updater.py
index 330aa7c..7d2b097 100644
--- a/navi_updater.py
+++ b/navi_updater.py
@@ -1,12 +1,11 @@
-from typing import Any
-
-import requests
import os
-import sys
+import shutil
import subprocess # nosec
+import sys
import zipfile
-import shutil
+from typing import Any
+import requests
global prime_navi_version
navi_version_path = ".navi_version"
@@ -67,7 +66,8 @@ def update_version_number(version: str) -> None:
version_file.write(version)
-def check_for_new_release(current_version: str, repo_owner: str, repo_name: str, edge: bool = False) -> tuple[str, str | None]:
+def check_for_new_release(current_version: str, repo_owner: str, repo_name: str, edge: bool = False) -> tuple[
+ str, str | None]:
latest_release = get_latest_release(repo_owner, repo_name, edge)
if current_version == "Unknown" or (latest_release and is_new_release(current_version, latest_release['tag_name'])):
global prime_navi_version