diff --git a/contextframe/connectors/__init__.py b/contextframe/connectors/__init__.py
index 652c50d..925d526 100644
--- a/contextframe/connectors/__init__.py
+++ b/contextframe/connectors/__init__.py
@@ -10,8 +10,13 @@
     SourceConnector,
     SyncResult,
 )
+from contextframe.connectors.discord import DiscordConnector
 from contextframe.connectors.github import GitHubConnector
+from contextframe.connectors.google_drive import GoogleDriveConnector
 from contextframe.connectors.linear import LinearConnector
+from contextframe.connectors.notion import NotionConnector
+from contextframe.connectors.obsidian import ObsidianConnector
+from contextframe.connectors.slack import SlackConnector
 
 __all__ = [
     "SourceConnector",
@@ -20,4 +25,9 @@
     "AuthType",
     "GitHubConnector",
     "LinearConnector",
+    "GoogleDriveConnector",
+    "NotionConnector",
+    "SlackConnector",
+    "DiscordConnector",
+    "ObsidianConnector",
 ]
diff --git a/contextframe/connectors/discord.py b/contextframe/connectors/discord.py
new file mode 100644
index 0000000..e358706
--- /dev/null
+++ b/contextframe/connectors/discord.py
@@ -0,0 +1,454 @@
+"""Discord connector for importing server messages into ContextFrame."""
+
+import json
+from contextframe import FrameRecord
+from contextframe.connectors.base import (
+    AuthType,
+    ConnectorConfig,
+    SourceConnector,
+    SyncResult,
+)
+from contextframe.schema import RecordType
+from datetime import UTC, datetime, timedelta, timezone
+from typing import Any, Dict, List, Optional, Set
+
+
+class DiscordConnector(SourceConnector):
+    """Connector for importing Discord server content."""
+
+    def __init__(self, config: ConnectorConfig, dataset):
+        """Initialize Discord connector.
+
+        Args:
+            config: Connector configuration with Discord-specific settings
+            dataset: Target FrameDataset
+        """
+        super().__init__(config, dataset)
+
+        # Configuration options
+        self.guild_ids = config.sync_config.get("guild_ids", [])
+        self.channel_ids = config.sync_config.get("channel_ids", [])
+        self.channel_names = config.sync_config.get("channel_names", [])
+        self.include_threads = config.sync_config.get("include_threads", True)
+        self.include_forum_posts = config.sync_config.get("include_forum_posts", True)
+        self.include_voice_text = config.sync_config.get("include_voice_text", False)
+        self.days_to_sync = config.sync_config.get("days_to_sync", 30)
+        self.user_ids = config.sync_config.get("user_ids", [])
+        self.include_reactions = config.sync_config.get("include_reactions", True)
+        self.include_attachments = config.sync_config.get("include_attachments", True)
+
+        # Set up Discord API client
+        self._setup_client()
+
+    def _setup_client(self):
+        """Set up Discord API client; raises ImportError/ValueError on missing dependency or token."""
+        try:
+            import discord
+            self.discord = discord
+
+            # Configure intents
+            intents = discord.Intents.default()
+            intents.message_content = True
+            intents.members = True
+            intents.guilds = True
+
+            self.intents = intents
+        except ImportError as err:
+            raise ImportError(
+                "discord.py is required for Discord connector. "
+                "Install with: pip install discord.py"
+            ) from err
+
+        # We'll create the client when needed since it requires async
+        self.client = None
+        self._bot_token = None
+
+        # Initialize based on auth type
+        if self.config.auth_type == AuthType.TOKEN:
+            self._bot_token = self.config.auth_config.get("bot_token")
+            if not self._bot_token:
+                raise ValueError("Discord bot token required for authentication")
+        else:
+            raise ValueError("Discord connector requires bot token authentication")
+
+    def validate_connection(self) -> bool:
+        """Validate Discord connection."""
+        # Discord.py requires async operations, so we'll validate during sync
+        # For now, just check that we have a token
+        return bool(self._bot_token)
+
+    def discover_content(self) -> dict[str, Any]:
+        """Discover Discord server structure."""
+        # This would require async operations, so we'll discover during sync
+        return {
+            "note": "Discord discovery happens during sync due to async requirements",
+            "configured_guilds": self.guild_ids,
+            "configured_channels": self.channel_ids,
+        }
+
+    def sync(self, incremental: bool = True) -> SyncResult:
+        """Sync Discord content to ContextFrame; returns a completed SyncResult in every branch."""
+        # Discord.py requires async operations, so we'll use a sync wrapper
+        import asyncio
+
+        try:
+            # Use existing event loop if available, otherwise create new one
+            try:
+                asyncio.get_running_loop()  # raises RuntimeError when no loop is running
+                # If we're already in an async context, we can't use run_until_complete
+                # This is a limitation of the sync interface with async Discord.py
+                result = SyncResult(success=False)
+                result.add_error(
+                    "Discord connector requires async execution. "
+                    "Consider using the async version or running from a sync context."
+                )
+                result.complete()
+                return result
+            except RuntimeError:
+                # No running loop, we can create one
+                return asyncio.run(self._async_sync(incremental))
+        except Exception as e:
+            result = SyncResult(success=False)
+            result.add_error(f"Failed to sync Discord: {e}")
+            result.complete()
+            return result
+
+    async def _async_sync(self, incremental: bool) -> SyncResult:
+        """Async implementation of sync."""
+        result = SyncResult(success=True)
+
+        # Get last sync state if incremental
+        last_sync_state = None
+        if incremental:
+            last_sync_state = self.get_last_sync_state()
+
+        # Create Discord client
+        client = self.discord.Client(intents=self.intents)
+
+        # Store sync data
+        sync_data = {
+            "result": result,
+            "last_sync_state": last_sync_state,
+            "processed_messages": set(),
+            "synced_channels": {},
+        }
+
+        @client.event
+        async def on_ready():
+            """Called when Discord client is ready; always closes the client when done."""
+            self.logger.info(f"Connected to Discord as {client.user}")
+
+            try:
+                # Create main collection
+                collection_id = self.create_collection(
+                    "Discord Servers",
+                    "Messages and threads from Discord"
+                )
+
+                # Process guilds
+                for guild in client.guilds:
+                    if self.guild_ids and guild.id not in self.guild_ids:
+                        continue
+
+                    await self._sync_guild(
+                        guild,
+                        collection_id,
+                        sync_data
+                    )
+            except Exception as e:
+                result.add_error(f"Discord sync failed: {e}")
+                result.success = False
+            finally:
+                # Close client, otherwise client.start() below never returns
+                await client.close()
+
+        # Run client
+        try:
+            await client.start(self._bot_token)
+        except Exception as e:
+            result.add_error(f"Discord client error: {e}")
+            result.success = False
+
+        # Save sync state
+        if result.success:
+            new_state = {
+                "last_sync": datetime.now(UTC).isoformat(),  # timezone-aware so it round-trips safely
+                "processed_messages": list(sync_data["processed_messages"]),
+                "synced_channels": sync_data["synced_channels"],
+            }
+            self.save_sync_state(new_state)
+
+        result.complete()
+        return result
+
+    async def _sync_guild(
+        self,
+        guild: Any,
+        parent_collection_id: str,
+        sync_data: dict[str, Any]
+    ):
+        """Sync a Discord guild (server)."""
+        result = sync_data["result"]
+
+        try:
+            # Create collection for guild
+            guild_collection_id = self.create_collection(
+                f"Server: {guild.name}",
+                f"Discord server: {guild.name}"
+            )
+
+            # Sync channels
+            for channel in guild.channels:
+                # Skip voice channels unless include_voice_text is True
+                if channel.type == self.discord.ChannelType.voice and not self.include_voice_text:
+                    continue
+
+                # Check if specific channels requested
+                if self.channel_ids and channel.id not in self.channel_ids:
+                    continue
+                if self.channel_names and channel.name not in self.channel_names:
+                    continue
+
+                # Only sync text-based channels
+                if hasattr(channel, 'history'):
+                    await self._sync_channel(
+                        channel,
+                        guild_collection_id,
+                        sync_data
+                    )
+
+        except Exception as e:
+            result.add_error(f"Failed to sync guild {guild.name}: {e}")
+
+    async def _sync_channel(
+        self,
+        channel: Any,
+        parent_collection_id: str,
+        sync_data: dict[str, Any]
+    ):
+        """Sync a Discord channel's messages, then its threads and archived forum posts."""
+        result = sync_data["result"]
+        last_sync_state = sync_data["last_sync_state"]
+        processed_messages = sync_data["processed_messages"]
+
+        try:
+            # Create collection for channel
+            channel_desc = channel.topic if hasattr(channel, 'topic') and channel.topic else f"Discord channel #{channel.name}"
+            channel_collection_id = self.create_collection(
+                f"#{channel.name}",
+                channel_desc
+            )
+
+            sync_data["synced_channels"][str(channel.id)] = channel_collection_id
+
+            # Calculate time range
+            after = None
+            if self.days_to_sync > 0:
+                after = datetime.now(UTC) - timedelta(days=self.days_to_sync)
+
+            if last_sync_state and last_sync_state.get("last_sync"):
+                # Use last sync time (state is only loaded for incremental syncs)
+                after = datetime.fromisoformat(last_sync_state["last_sync"])
+                # Discord expects timezone-aware datetime; legacy naive stamps were local time
+                if after.tzinfo is None:
+                    after = after.astimezone(UTC)
+
+            # Get messages
+            message_count = 0
+            async for message in channel.history(limit=None, after=after, oldest_first=True):
+                # Filter by user if specified
+                if self.user_ids and message.author.id not in self.user_ids:
+                    continue
+
+                # Skip bot messages unless specifically included
+                if message.author.bot and not self.config.sync_config.get("include_bots", False):
+                    continue
+
+                # Process message
+                frame = await self._map_message_to_frame(message, channel, channel_collection_id)
+                if frame:
+                    try:
+                        message_id = f"{channel.id}:{message.id}"
+
+                        existing = self.dataset.search(
+                            f"custom_metadata.x_discord_message_id:'{message_id}'",
+                            limit=1
+                        )
+
+                        if existing:
+                            self.dataset.update(existing[0].metadata["uuid"], frame)
+                            result.frames_updated += 1
+                        else:
+                            self.dataset.add(frame)
+                            result.frames_created += 1
+
+                        processed_messages.add(message_id)
+                        message_count += 1
+
+                    except Exception as e:
+                        result.frames_failed += 1
+                        result.add_error(f"Failed to sync message: {e}")
+
+            # Sync threads if enabled
+            if self.include_threads and hasattr(channel, 'threads'):
+                for thread in channel.threads:
+                    await self._sync_thread(thread, channel_collection_id, sync_data)
+
+            # Sync forum posts if enabled
+            if self.include_forum_posts and channel.type == self.discord.ChannelType.forum:
+                async for thread in channel.archived_threads(limit=100):
+                    await self._sync_thread(thread, channel_collection_id, sync_data)
+
+            self.logger.info(f"Synced {message_count} messages from #{channel.name}")
+
+        except Exception as e:
+            result.add_error(f"Failed to sync channel #{channel.name}: {e}")
+
+    async def _sync_thread(
+        self,
+        thread: Any,
+        parent_collection_id: str,
+        sync_data: dict[str, Any]
+    ):
+        """Sync a Discord thread."""
+        result = sync_data["result"]
+        processed_messages = sync_data["processed_messages"]
+
+        try:
+            # Create collection for thread
+            thread_collection_id = self.create_collection(
+                f"Thread: {thread.name}",
+                f"Discord thread in #{thread.parent.name}"
+            )
+
+            # Get messages
+            async for message in thread.history(limit=None, oldest_first=True):
+                frame = await self._map_message_to_frame(message, thread, thread_collection_id, is_thread=True)
+                if frame:
+                    try:
+                        message_id = f"{thread.id}:{message.id}"
+
+                        existing = self.dataset.search(
+                            f"custom_metadata.x_discord_message_id:'{message_id}'",
+                            limit=1
+                        )
+
+                        if existing:
+                            self.dataset.update(existing[0].metadata["uuid"], frame)
+                            result.frames_updated += 1
+                        else:
+                            self.dataset.add(frame)
+                            result.frames_created += 1
+
+                        processed_messages.add(message_id)
+
+                    except Exception as e:
+                        result.add_warning(f"Failed to sync thread message: {e}")
+
+        except Exception as e:
+            result.add_warning(f"Failed to sync thread {thread.name}: {e}")
+
+    def map_to_frame(self, source_data: dict[str, Any]) -> FrameRecord | None:
+        """Map Discord data to FrameRecord."""
+        # This is a sync method, but Discord operations are async
+        # Return None for the base implementation
+        return None
+
+    async def _map_message_to_frame(
+        self,
+        message: Any,
+        channel: Any,
+        collection_id: str,
+        is_thread: bool = False
+    ) -> FrameRecord | None:
+        """Map Discord message to FrameRecord; returns None (and logs) if mapping fails."""
+        try:
+            # Build author name
+            author = message.author.display_name or message.author.name
+            if message.author.discriminator != "0":
+                author = f"{author}#{message.author.discriminator}"
+
+            # Build title
+            title = f"Message from {author}"
+            if is_thread:
+                title = f"Thread message from {author}"
+
+            metadata = {
+                "title": title,
+                "record_type": RecordType.DOCUMENT,
+                "source_type": "discord_message",
+                "source_url": message.jump_url,
+                "collection": collection_id,
+                "collection_id": collection_id,
+                "author": author,
+                "created_at": message.created_at.isoformat(),
+                "custom_metadata": {
+                    "x_discord_message_id": f"{channel.id}:{message.id}",
+                    "x_discord_channel_id": str(channel.id),
+                    "x_discord_channel_name": channel.name,
+                    "x_discord_author_id": str(message.author.id),
+                    "x_discord_guild_id": str(channel.guild.id) if hasattr(channel, 'guild') else None,
+                }
+            }
+
+            # Add edited timestamp if edited
+            if message.edited_at:
+                metadata["updated_at"] = message.edited_at.isoformat()
+
+            # Build content
+            content = f"**{author}** - {message.created_at.strftime('%Y-%m-%d %H:%M:%S UTC')}\n\n"
+            content += message.content or ""
+
+            # Add embeds
+            if message.embeds:
+                content += "\n\n**Embeds:**\n"
+                for embed in message.embeds:
+                    if embed.title:
+                        content += f"\n### {embed.title}\n"
+                    if embed.description:
+                        content += f"{embed.description}\n"
+                    if embed.url:
+                        content += f"URL: {embed.url}\n"
+                    if embed.fields:
+                        for field in embed.fields:
+                            content += f"\n**{field.name}**: {field.value}\n"
+
+            # Add reactions
+            if self.include_reactions and message.reactions:
+                content += "\n\n**Reactions:**\n"
+                for reaction in message.reactions:
+                    emoji = str(reaction.emoji)
+                    content += f"{emoji} ({reaction.count}) "
+                content += "\n"
+
+            # Add attachments
+            if self.include_attachments and message.attachments:
+                content += "\n\n**Attachments:**\n"
+                for attachment in message.attachments:
+                    content += f"- [{attachment.filename}]({attachment.url})"
+                    if attachment.content_type:
+                        content += f" ({attachment.content_type})"
+                    content += "\n"
+
+            # Add reply reference
+            if message.reference and message.reference.message_id:
+                metadata["custom_metadata"]["x_discord_reply_to"] = str(message.reference.message_id)
+
+            frame = FrameRecord(
+                text_content=content,
+                metadata=metadata,
+                context=message.content[:200] if message.content else "",
+            )
+
+            # Add relationships
+            if message.reference and message.reference.message_id:
+                frame.add_relationship(
+                    "reply_to",
+                    id=f"{message.reference.channel_id}:{message.reference.message_id}"
+                )
+
+            return frame
+
+        except Exception as e:
+            self.logger.error(f"Failed to map message: {e}")
+            return None
\ No newline at end of file
diff --git a/contextframe/connectors/google_drive.py b/contextframe/connectors/google_drive.py
new file mode 100644
index 0000000..2cf0c1c
--- /dev/null
+++ b/contextframe/connectors/google_drive.py
@@ -0,0 +1,491 @@
+"""Google Drive connector for importing documents into ContextFrame."""
+
+import io
+import mimetypes
+from contextframe import FrameRecord
+from contextframe.connectors.base import (
+    AuthType,
+    ConnectorConfig,
+    SourceConnector,
+    SyncResult,
+)
+from contextframe.schema import RecordType
+from datetime import datetime
+from typing import Any, Dict, List, Optional, Set
+
+
+class GoogleDriveConnector(SourceConnector):
+    """Connector for importing Google Drive content."""
+
+    GOOGLE_MIME_TYPES = {
+        "application/vnd.google-apps.document": "text/plain",
+        "application/vnd.google-apps.spreadsheet": "text/csv",
+        "application/vnd.google-apps.presentation": "text/plain",
+        "application/vnd.google-apps.drawing": "image/png",
+    }
+
+    EXPORT_FORMATS = {
+        "application/vnd.google-apps.document": "text/plain",
+        "application/vnd.google-apps.spreadsheet": "text/csv",
+        "application/vnd.google-apps.presentation": "text/plain",
+    }
+
+    def __init__(self, config: ConnectorConfig, dataset):
+        """Initialize Google Drive connector.
+
+        Args:
+            config: Connector configuration with Google Drive settings
+            dataset: Target FrameDataset
+        """
+        super().__init__(config, dataset)
+
+        # Configuration options
+        self.folder_ids = config.sync_config.get("folder_ids", [])
+        self.shared_drives = config.sync_config.get("shared_drives", [])
+        self.file_patterns = config.sync_config.get("file_patterns", ["*"])
+        self.exclude_patterns = config.sync_config.get("exclude_patterns", [])
+        self.include_trashed = config.sync_config.get("include_trashed", False)
+        self.export_google_formats = config.sync_config.get("export_google_formats", True)
+
+        # Set up Google Drive API client
+        self._setup_client()
+
+    def _setup_client(self):
+        """Set up Google Drive API client; raises ImportError/ValueError on missing dependency or auth."""
+        try:
+            from google.oauth2 import service_account
+            from googleapiclient.discovery import build
+            from googleapiclient.http import MediaIoBaseDownload
+
+            self.MediaIoBaseDownload = MediaIoBaseDownload
+        except ImportError as err:
+            raise ImportError(
+                "google-api-python-client and google-auth are required. "
+                "Install with: pip install google-api-python-client google-auth"
+            ) from err
+
+        if self.config.auth_type == AuthType.OAUTH:
+            # OAuth flow for user authentication
+            from google.auth.transport.requests import Request
+            from google.oauth2.credentials import Credentials
+            from google_auth_oauthlib.flow import InstalledAppFlow
+
+            SCOPES = ['https://www.googleapis.com/auth/drive.readonly']
+
+            creds = None
+            token_data = self.config.auth_config.get("token_data")
+
+            if token_data:
+                creds = Credentials.from_authorized_user_info(token_data, SCOPES)
+
+            if not creds or not creds.valid:
+                if creds and creds.expired and creds.refresh_token:
+                    creds.refresh(Request())
+                else:
+                    flow = InstalledAppFlow.from_client_config(
+                        self.config.auth_config.get("client_config"),
+                        SCOPES
+                    )
+                    creds = flow.run_local_server(port=0)
+
+                # Save credentials back to config
+                self.config.auth_config["token_data"] = {
+                    "token": creds.token,
+                    "refresh_token": creds.refresh_token,
+                    "token_uri": creds.token_uri,
+                    "client_id": creds.client_id,
+                    "client_secret": creds.client_secret,
+                    "scopes": creds.scopes
+                }
+
+            self.service = build('drive', 'v3', credentials=creds)
+
+        elif self.config.auth_type == AuthType.API_KEY:
+            # Service account authentication
+            credentials = service_account.Credentials.from_service_account_info(
+                self.config.auth_config.get("service_account_info"),
+                scopes=['https://www.googleapis.com/auth/drive.readonly']
+            )
+            self.service = build('drive', 'v3', credentials=credentials)
+        else:
+            raise ValueError("Google Drive requires OAuth or service account auth")
+
+    def validate_connection(self) -> bool:
+        """Validate Google Drive connection."""
+        try:
+            # Try to get user info or list files
+            about = self.service.about().get(fields="user").execute()
+            self.logger.info(f"Connected to Google Drive as: {about['user']['displayName']}")
+            return True
+        except Exception as e:
+            self.logger.error(f"Failed to validate Google Drive connection: {e}")
+            return False
+
+    def discover_content(self) -> dict[str, Any]:
+        """Discover Google Drive content structure."""
+        discovery = {
+            "drive_info": {},
+            "folders": [],
+            "file_stats": {
+                "total_files": 0,
+                "file_types": {},
+                "total_size": 0,
+            },
+            "shared_drives": []
+        }
+
+        try:
+            # Get drive info
+            about = self.service.about().get(
+                fields="user,storageQuota"
+            ).execute()
+
+            discovery["drive_info"] = {
+                "user": about["user"]["displayName"],
+                "email": about["user"]["emailAddress"],
+                "storage_used": about.get("storageQuota", {}).get("usage", 0),
+                "storage_limit": about.get("storageQuota", {}).get("limit", 0),
+            }
+
+            # Discover shared drives
+            if self.shared_drives:
+                shared_drives = self.service.drives().list(
+                    pageSize=100,
+                    fields="drives(id,name)"
+                ).execute()
+
+                discovery["shared_drives"] = [
+                    {"id": d["id"], "name": d["name"]}
+                    for d in shared_drives.get("drives", [])
+                ]
+
+            # Discover folders and files
+            query_parts = []
+            if not self.include_trashed:
+                query_parts.append("trashed=false")
+
+            # Add folder filters
+            if self.folder_ids:
+                folder_queries = [f"'{fid}' in parents" for fid in self.folder_ids]
+                query_parts.append(f"({' or '.join(folder_queries)})")
+
+            query = " and ".join(query_parts) if query_parts else None
+
+            # Get files
+            page_token = None
+            while True:
+                results = self.service.files().list(
+                    q=query,
+                    pageSize=1000,
+                    fields="nextPageToken, files(id, name, mimeType, size, parents)",
+                    pageToken=page_token
+                ).execute()
+
+                files = results.get('files', [])
+
+                for file in files:
+                    if file['mimeType'] == 'application/vnd.google-apps.folder':
+                        discovery["folders"].append({
+                            "id": file["id"],
+                            "name": file["name"],
+                            "parents": file.get("parents", [])
+                        })
+                    else:
+                        discovery["file_stats"]["total_files"] += 1
+                        size = int(file.get("size", 0))
+                        discovery["file_stats"]["total_size"] += size
+
+                        # Track file types
+                        mime_type = file["mimeType"]
+                        discovery["file_stats"]["file_types"][mime_type] = \
+                            discovery["file_stats"]["file_types"].get(mime_type, 0) + 1
+
+                page_token = results.get('nextPageToken')
+                if not page_token:
+                    break
+
+        except Exception as e:
+            self.logger.error(f"Failed to discover Google Drive content: {e}")
+            discovery["error"] = str(e)
+
+        return discovery
+
+    def sync(self, incremental: bool = True) -> SyncResult:
+        """Sync Google Drive content to ContextFrame."""
+        result = SyncResult(success=True)
+
+        # Get last sync state if incremental
+        last_sync_state = None
+        if incremental:
+            last_sync_state = self.get_last_sync_state()
+
+        # Create main collection
+        collection_id = self.create_collection(
+            "Google Drive",
+            "Documents and files from Google Drive"
+        )
+
+        # Track processed files
+        processed_files: set[str] = set()
+
+        # Process folders
+        if self.folder_ids:
+            for folder_id in self.folder_ids:
+                self._sync_folder(
+                    folder_id,
+                    collection_id,
+                    result,
+                    last_sync_state,
+                    processed_files
+                )
+        else:
+            # Sync root folder
+            self._sync_folder(
+                "root",
+                collection_id,
+                result,
+                last_sync_state,
+                processed_files
+            )
+
+        # Process shared drives
+        for drive_id in self.shared_drives:
+            self._sync_shared_drive(
+                drive_id,
+                collection_id,
+                result,
+                last_sync_state,
+                processed_files
+            )
+
+        # Save sync state
+        if result.success:
+            new_state = {
+                "last_sync": datetime.now().astimezone().isoformat(),  # aware, comparable to modifiedTime
+                "processed_files": list(processed_files),
+            }
+            self.save_sync_state(new_state)
+
+        result.complete()
+        return result
+
+    def _sync_folder(
+        self,
+        folder_id: str,
+        parent_collection_id: str,
+        result: SyncResult,
+        last_sync_state: dict[str, Any] | None,
+        processed_files: set[str],
+        folder_name: str | None = None
+    ):
+        """Sync a specific folder and its contents, recursing into subfolders."""
+        try:
+            # Get folder info if not root
+            if folder_id != "root" and not folder_name:
+                folder = self.service.files().get(
+                    fileId=folder_id,
+                    fields="name"
+                ).execute()
+                folder_name = folder["name"]
+
+            # Create collection for folder
+            if folder_name:
+                collection_id = self.create_collection(
+                    f"Folder: {folder_name}",
+                    f"Google Drive folder: {folder_name}"
+                )
+            else:
+                collection_id = parent_collection_id
+
+            # List files in folder
+            query = f"'{folder_id}' in parents"
+            if not self.include_trashed:
+                query += " and trashed=false"
+
+            page_token = None
+            while True:
+                results = self.service.files().list(
+                    q=query,
+                    pageSize=100,
+                    fields="nextPageToken, files(id, name, mimeType, modifiedTime, size, webViewLink)",
+                    pageToken=page_token
+                ).execute()
+
+                files = results.get('files', [])
+
+                for file in files:
+                    if file["mimeType"] == "application/vnd.google-apps.folder":
+                        # Recursively sync subfolders
+                        self._sync_folder(
+                            file["id"],
+                            collection_id,
+                            result,
+                            last_sync_state,
+                            processed_files,
+                            file["name"]
+                        )
+                    else:
+                        # Check if file matches patterns
+                        if self._matches_patterns(file["name"]):
+                            # Check if needs update (state is only loaded for incremental syncs)
+                            if last_sync_state and last_sync_state.get("last_sync"):
+                                last_sync = datetime.fromisoformat(last_sync_state["last_sync"]).astimezone()
+                                modified = datetime.fromisoformat(
+                                    file["modifiedTime"].replace("Z", "+00:00")
+                                )
+                                if modified <= last_sync:
+                                    continue
+
+                            # Process file
+                            frame = self.map_to_frame(file)
+                            if frame:
+                                frame.metadata["collection"] = collection_id
+                                frame.metadata["collection_id"] = collection_id
+
+                                try:
+                                    # Check if exists
+                                    existing = self.dataset.search(
+                                        f"source_url:'{file['webViewLink']}'",
+                                        limit=1
+                                    )
+
+                                    if existing:
+                                        self.dataset.update(existing[0].metadata["uuid"], frame)
+                                        result.frames_updated += 1
+                                    else:
+                                        self.dataset.add(frame)
+                                        result.frames_created += 1
+
+                                    processed_files.add(file["id"])
+
+                                except Exception as e:
+                                    result.frames_failed += 1
+                                    result.add_error(f"Failed to import {file['name']}: {e}")
+
+                page_token = results.get('nextPageToken')
+                if not page_token:
+                    break
+
+        except Exception as e:
+            result.add_error(f"Failed to sync folder {folder_id}: {e}")
+
+    def _sync_shared_drive(
+        self,
+        drive_id: str,
+        parent_collection_id: str,
+        result: SyncResult,
+        last_sync_state: dict[str, Any] | None,
+        processed_files: set[str]
+    ):
+        """Sync a shared drive (currently only creates its collection; contents are not imported)."""
+        try:
+            # Get drive info
+            drive = self.service.drives().get(driveId=drive_id).execute()
+
+            # Create collection for shared drive
+            collection_id = self.create_collection(
+                f"Shared Drive: {drive['name']}",
+                f"Google Shared Drive: {drive['name']}"
+            )
+
+            # Sync drive contents (similar to folder sync but with supportsAllDrives=True)
+            # Implementation would be similar to _sync_folder with drive-specific parameters
+
+        except Exception as e:
+            result.add_error(f"Failed to sync shared drive {drive_id}: {e}")
+
+    def _matches_patterns(self, filename: str) -> bool:
+        """Check if a filename matches the configured patterns."""
+        from pathlib import Path
+
+        path = Path(filename)
+
+        # Check exclude patterns first
+        for pattern in self.exclude_patterns:
+            if path.match(pattern):
+                return False
+
+        # Check include patterns
+        if not self.file_patterns or "*" in self.file_patterns:
+            return True
+
+        for pattern in self.file_patterns:
+            if path.match(pattern):
+                return True
+
+        return False
+
+    def map_to_frame(self, file_data: dict[str, Any]) -> FrameRecord | None:
+        """Map Google Drive file to FrameRecord; returns None (and logs) if mapping fails."""
+        try:
+            file_id = file_data["id"]
+            mime_type = file_data["mimeType"]
+
+            # Create base metadata
+            metadata = {
+                "title": file_data["name"],
+                "record_type": RecordType.DOCUMENT,
+                "source_type": "google_drive",
+                "source_url": file_data.get("webViewLink", f"https://drive.google.com/file/d/{file_id}"),
+                "source_file": file_data["name"],
+                "updated_at": file_data.get("modifiedTime"),
+                "custom_metadata": {
+                    "x_google_drive_id": file_id,
+                    "x_google_drive_mime_type": mime_type,
+                    "x_google_drive_size": file_data.get("size", "0"),
+                }
+            }
+
+            # Download file content
+            text_content = None
+            raw_data = None
+            raw_data_type = None
+
+            if mime_type in self.GOOGLE_MIME_TYPES and self.export_google_formats:
+                # Export Google formats to readable format
+                export_mime_type = self.EXPORT_FORMATS.get(mime_type, "text/plain")
+
+                try:
+                    response = self.service.files().export(
+                        fileId=file_id,
+                        mimeType=export_mime_type
+                    ).execute()
+
+                    if isinstance(response, bytes):
+                        text_content = response.decode('utf-8', errors='replace')
+                    else:
+                        text_content = str(response)
+
+                except Exception as e:
+                    self.logger.warning(f"Failed to export {file_data['name']}: {e}")
+                    text_content = f"Google {mime_type} document: {file_data['name']}"
+
+            else:
+                # Download regular files
+                if mime_type.startswith("text/"):
+                    try:
+                        response = self.service.files().get_media(fileId=file_id).execute()
+                        text_content = response.decode('utf-8', errors='replace')
+                    except Exception as e:
+                        self.logger.warning(f"Failed to download {file_data['name']}: {e}")
+                        text_content = f"File: {file_data['name']}"
+                elif mime_type.startswith("image/"):
+                    # For images, store reference
+                    raw_data_type = mime_type
+                    text_content = f"Image file: {file_data['name']}"
+                    # Could download image data here if needed
+                else:
+                    # Other binary files
+                    text_content = f"Binary file: {file_data['name']} ({mime_type})"
+
+            # Create frame
+            return FrameRecord(
+                text_content=text_content,
+                metadata=metadata,
+                raw_data=raw_data,
+                raw_data_type=raw_data_type,
+            )
+
+        except Exception as e:
+            self.logger.error(f"Failed to map file {file_data.get('name', 'Unknown')}: {e}")
+            return None
\ No newline at end of file
diff --git a/contextframe/connectors/notion.py b/contextframe/connectors/notion.py
new file mode 100644
index 0000000..8775ad6
--- /dev/null
+++ b/contextframe/connectors/notion.py
@@ -0,0 +1,683 @@
+"""Notion connector for importing pages and databases into ContextFrame."""
+
+import json
+from contextframe import FrameRecord
+from contextframe.connectors.base import (
+    AuthType,
+    ConnectorConfig,
+    SourceConnector,
+    SyncResult,
+)
+from contextframe.schema import RecordType
+from datetime import datetime
+from typing import Any, Dict, List, Optional, Set
+
+
+class NotionConnector(SourceConnector):
+    """Connector for importing Notion workspace content."""
+
+    def __init__(self, config: ConnectorConfig, dataset):
+        """Initialize Notion connector.
+
+        Args:
+            config: Connector configuration with Notion-specific settings
+            dataset: Target FrameDataset
+        """
+        super().__init__(config, dataset)
+
+        # Configuration options
+        self.workspace_ids = config.sync_config.get("workspace_ids", [])
+        self.database_ids = config.sync_config.get("database_ids", [])
+        self.page_ids = config.sync_config.get("page_ids", [])
+        self.include_archived = config.sync_config.get("include_archived", False)
+        self.sync_databases = config.sync_config.get("sync_databases", True)
+        self.sync_pages = config.sync_config.get("sync_pages", True)
+        self.include_comments = config.sync_config.get("include_comments", True)
+
+        # Set up Notion API client
+        self._setup_client()
+
+    def _setup_client(self):
+        """Set up Notion API client; raises ImportError/ValueError on missing dependency or token."""
+        try:
+            from notion_client import Client
+        except ImportError as err:
+            raise ImportError(
+                "notion-client is required for Notion connector. "
+                "Install with: pip install notion-client"
+            ) from err
+
+        # Initialize client based on auth type
+        if self.config.auth_type == AuthType.TOKEN:
+            token = self.config.auth_config.get("token")
+            if not token:
+                raise ValueError("Notion integration token required for authentication")
+            self.client = Client(auth=token)
+        else:
+            raise ValueError("Notion connector requires token authentication")
+
+    def validate_connection(self) -> bool:
+        """Validate Notion connection."""
+        try:
+            # Try to get user info
+            users = self.client.users.list()
+            self.logger.info(f"Connected to Notion workspace with {len(users['results'])} users")
+            return True
+        except Exception as e:
+            self.logger.error(f"Failed to validate Notion connection: {e}")
+            return False
+
+    def discover_content(self) -> dict[str, Any]:
+        """Discover Notion workspace structure."""
+        discovery = {
+            "workspace": {
+                "users": [],
+                "bot_info": {},
+            },
+            "databases": [],
+            "pages": [],
+            "stats": {
+                "total_databases": 0,
+                "total_pages": 0,
+                "page_types": {},
+            }
+        }
+
+        try:
+            # Get bot info
+            bot = self.client.users.me()
+            discovery["workspace"]["bot_info"] = {
+                "id": bot["id"],
+                "name": bot.get("name", "Notion Integration"),
+                "type": bot["type"],
+            }
+
+            # Get users
+            users = self.client.users.list()
+            for user in users["results"]:
+                discovery["workspace"]["users"].append({
+                    "id": user["id"],
+                    "name": user.get("name", "Unknown"),
+                    "type": user["type"],
+                })
+
+            # Search for all content
+            cursor = None
+            while True:
+                results = self.client.search(
+                    filter={"property": "object", "value": "page"},  # NOTE(review): looks page-only, so the database branch below may never run - confirm
+                    start_cursor=cursor,
+                    page_size=100
+                )
+
+                for item in results["results"]:
+                    if item["object"] == "database":
+                        discovery["databases"].append({
+                            "id": item["id"],
+                            "title": self._get_title(item),
+                            "url": item["url"],
+                            "archived": item.get("archived", False),
+                        })
+                        discovery["stats"]["total_databases"] += 1
+                    else:  # page
+                        page_type = "child_page" if item.get("parent", {}).get("type") == "page_id" else "root_page"
+                        discovery["pages"].append({
+                            "id": item["id"],
+                            "title": self._get_title(item),
+                            "url": item["url"],
+                            "archived": item.get("archived", False),
+                            "type": page_type,
+                        })
+                        discovery["stats"]["total_pages"] += 1
+                        discovery["stats"]["page_types"][page_type] = \
+                            discovery["stats"]["page_types"].get(page_type, 0) + 1
+
+                if not results["has_more"]:
+                    break
+                cursor = results["next_cursor"]
+
+        except Exception as e:
+            self.logger.error(f"Failed to discover Notion content: {e}")
+            discovery["error"] = str(e)
+
+        return discovery
+
+    def sync(self, incremental: bool = True) -> SyncResult:
+        """Sync Notion content to ContextFrame."""
+        result = SyncResult(success=True)
+
+        # Get last sync state if incremental
+        last_sync_state = None
+        if incremental:
+            last_sync_state = self.get_last_sync_state()
+
+        # Create main collection
+        collection_id = self.create_collection(
+            "Notion Workspace",
+            "Pages and databases from Notion"
+        )
+
+        # Track processed items
+        processed_items: set[str] = set()
+
+        # Sync databases
+        if self.sync_databases:
+            self._sync_databases(collection_id, result, last_sync_state, processed_items)
+
+        # Sync pages
+        if self.sync_pages:
+            self._sync_pages(collection_id, result, last_sync_state, processed_items)
+
+        # Save sync state
+        if result.success:
+            new_state = {
+                "last_sync": datetime.now().astimezone().isoformat(),  # aware, comparable to last_edited_time
+                "processed_items": list(processed_items),
+            }
+            self.save_sync_state(new_state)
+
+        result.complete()
+        return result
+
+    def _sync_databases(
+        self,
+        parent_collection_id: str,
+        result: SyncResult,
+        last_sync_state: dict[str, Any] | None,
+        processed_items: set[str]
+    ):
+        """Sync Notion databases and, for each synced database, its entries."""
+        try:
+            # Search for databases
+            cursor = None
+            while True:
+                results = self.client.search(
+                    filter={"property": "object", "value": "database"},
+                    start_cursor=cursor,
+                    page_size=100
+                )
+
+                for database in results["results"]:
+                    if not self.include_archived and database.get("archived", False):
+                        continue
+
+                    # Check if specific databases requested
+                    if self.database_ids and database["id"] not in self.database_ids:
+                        continue
+
+                    # Check if needs update (state is only loaded for incremental syncs)
+                    if last_sync_state and last_sync_state.get("last_sync"):
+                        last_sync = datetime.fromisoformat(last_sync_state["last_sync"]).astimezone()
+                        last_edited = datetime.fromisoformat(
+                            database["last_edited_time"].replace("Z", "+00:00")
+                        )
+                        if last_edited <= last_sync:
+                            continue
+
+                    # Create collection for database
+                    db_title = self._get_title(database)
+                    db_collection_id = self.create_collection(
+                        f"Database: {db_title}",
+                        f"Notion database: {db_title}"
+                    )
+
+                    # Create database frame
+                    frame = self._map_database_to_frame(database, parent_collection_id)
+                    if frame:
+                        try:
+                            existing = self.dataset.search(
+                                f"source_url:'{database['url']}'",
+                                limit=1
+                            )
+
+                            if existing:
+                                self.dataset.update(existing[0].metadata["uuid"], frame)
+                                result.frames_updated += 1
+                            else:
+                                self.dataset.add(frame)
+                                result.frames_created += 1
+
+                            processed_items.add(database["id"])
+
+                            # Sync database entries
+                            self._sync_database_entries(
+                                database["id"],
+                                db_collection_id,
+                                result,
+                                last_sync_state,
+                                processed_items
+                            )
+
+                        except Exception as e:
+                            result.frames_failed += 1
+                            result.add_error(f"Failed to sync database {db_title}: {e}")
+
+                if not results["has_more"]:
+                    break
+                cursor = results["next_cursor"]
+
+        except Exception as e:
+            result.add_error(f"Failed to sync databases: {e}")
+            result.success = False
+
+    def _sync_database_entries(
+        self,
+        database_id: str,
+        collection_id: str,
+        result: SyncResult,
+        last_sync_state: dict[str, Any] | None,
+        processed_items: set[str]
+    ):
+        """Sync entries from a Notion database."""
+        try:
+            cursor = None
+            while True:
+                results = self.client.databases.query(
+                    database_id=database_id,
+                    start_cursor=cursor,
+                    page_size=100
+                )
+
+                for entry in results["results"]:
+                    if not self.include_archived and entry.get("archived", False):
+                        continue
+
+                    # Map and save entry
+                    frame = self._map_page_to_frame(entry, collection_id)
+                    if frame:
+ try: + existing = self.dataset.search( + f"source_url:'{entry['url']}'", + limit=1 + ) + + if existing: + self.dataset.update(existing[0].metadata["uuid"], frame) + result.frames_updated += 1 + else: + self.dataset.add(frame) + result.frames_created += 1 + + processed_items.add(entry["id"]) + + except Exception as e: + result.frames_failed += 1 + result.add_error(f"Failed to sync database entry: {e}") + + if not results["has_more"]: + break + cursor = results["next_cursor"] + + except Exception as e: + result.add_warning(f"Failed to sync database entries for {database_id}: {e}") + + def _sync_pages( + self, + parent_collection_id: str, + result: SyncResult, + last_sync_state: dict[str, Any] | None, + processed_items: set[str] + ): + """Sync Notion pages.""" + try: + # Search for pages + cursor = None + while True: + results = self.client.search( + filter={"property": "object", "value": "page"}, + start_cursor=cursor, + page_size=100 + ) + + for page in results["results"]: + # Skip databases (they're handled separately) + if page["object"] == "database": + continue + + if not self.include_archived and page.get("archived", False): + continue + + # Check if specific pages requested + if self.page_ids and page["id"] not in self.page_ids: + continue + + # Check if needs update + if incremental and last_sync_state: + last_sync = datetime.fromisoformat(last_sync_state["last_sync"]) + last_edited = datetime.fromisoformat( + page["last_edited_time"].replace("Z", "+00:00") + ) + if last_edited <= last_sync: + continue + + # Process page + frame = self._map_page_to_frame(page, parent_collection_id) + if frame: + try: + existing = self.dataset.search( + f"source_url:'{page['url']}'", + limit=1 + ) + + if existing: + self.dataset.update(existing[0].metadata["uuid"], frame) + result.frames_updated += 1 + else: + self.dataset.add(frame) + result.frames_created += 1 + + processed_items.add(page["id"]) + + # Sync comments if enabled + if self.include_comments: + 
self._sync_page_comments(page["id"], parent_collection_id, result) + + except Exception as e: + result.frames_failed += 1 + result.add_error(f"Failed to sync page {self._get_title(page)}: {e}") + + if not results["has_more"]: + break + cursor = results["next_cursor"] + + except Exception as e: + result.add_error(f"Failed to sync pages: {e}") + result.success = False + + def _sync_page_comments(self, page_id: str, collection_id: str, result: SyncResult): + """Sync comments for a page.""" + try: + comments = self.client.comments.list(block_id=page_id) + + for comment in comments["results"]: + frame = self._map_comment_to_frame(comment, page_id, collection_id) + if frame: + try: + existing = self.dataset.search( + f"custom_metadata.x_notion_comment_id:'{comment['id']}'", + limit=1 + ) + + if existing: + self.dataset.update(existing[0].metadata["uuid"], frame) + result.frames_updated += 1 + else: + self.dataset.add(frame) + result.frames_created += 1 + + except Exception as e: + result.add_warning(f"Failed to sync comment: {e}") + + except Exception as e: + # Comments API might not be available for all integrations + result.add_warning(f"Failed to sync comments for page {page_id}: {e}") + + def map_to_frame(self, source_data: dict[str, Any]) -> FrameRecord | None: + """Map Notion data to FrameRecord.""" + object_type = source_data.get("object") + + if object_type == "database": + return self._map_database_to_frame(source_data, "") + elif object_type == "page": + return self._map_page_to_frame(source_data, "") + else: + self.logger.warning(f"Unknown Notion object type: {object_type}") + return None + + def _map_database_to_frame( + self, database: dict[str, Any], collection_id: str + ) -> FrameRecord | None: + """Map Notion database to FrameRecord.""" + try: + title = self._get_title(database) + + metadata = { + "title": f"Database: {title}", + "record_type": RecordType.COLLECTION_HEADER, + "source_type": "notion_database", + "source_url": database["url"], + 
"collection": collection_id, + "collection_id": collection_id, + "created_at": database.get("created_time"), + "updated_at": database.get("last_edited_time"), + "custom_metadata": { + "x_notion_id": database["id"], + "x_notion_archived": database.get("archived", False), + } + } + + # Build content with database schema + content = f"# {title}\n\n" + content += "## Database Properties\n\n" + + if "properties" in database: + for prop_name, prop_config in database["properties"].items(): + content += f"- **{prop_name}** ({prop_config['type']})\n" + + return FrameRecord( + text_content=content, + metadata=metadata, + context=f"Notion database: {title}", + ) + + except Exception as e: + self.logger.error(f"Failed to map database: {e}") + return None + + def _map_page_to_frame( + self, page: dict[str, Any], collection_id: str + ) -> FrameRecord | None: + """Map Notion page to FrameRecord.""" + try: + title = self._get_title(page) + + metadata = { + "title": title, + "record_type": RecordType.DOCUMENT, + "source_type": "notion_page", + "source_url": page["url"], + "collection": collection_id, + "collection_id": collection_id, + "created_at": page.get("created_time"), + "updated_at": page.get("last_edited_time"), + "custom_metadata": { + "x_notion_id": page["id"], + "x_notion_archived": page.get("archived", False), + "x_notion_parent_type": page.get("parent", {}).get("type"), + "x_notion_parent_id": self._get_parent_id(page), + } + } + + # Get page content + content = f"# {title}\n\n" + + # Get page blocks (content) + try: + blocks = self._get_page_blocks(page["id"]) + content += self._blocks_to_markdown(blocks) + except Exception as e: + self.logger.warning(f"Failed to get page content: {e}") + content += f"_Content unavailable: {e}_\n" + + # Add properties if it's a database entry + if "properties" in page: + content += "\n## Properties\n\n" + for prop_name, prop_value in page["properties"].items(): + value = self._extract_property_value(prop_value) + if value: + content 
+= f"**{prop_name}**: {value}\n" + + return FrameRecord( + text_content=content, + metadata=metadata, + context=title, + ) + + except Exception as e: + self.logger.error(f"Failed to map page: {e}") + return None + + def _map_comment_to_frame( + self, comment: dict[str, Any], page_id: str, collection_id: str + ) -> FrameRecord | None: + """Map Notion comment to FrameRecord.""" + try: + metadata = { + "title": "Comment on page", + "record_type": RecordType.DOCUMENT, + "source_type": "notion_comment", + "collection": collection_id, + "collection_id": collection_id, + "created_at": comment.get("created_time"), + "custom_metadata": { + "x_notion_comment_id": comment["id"], + "x_notion_page_id": page_id, + } + } + + # Extract comment text + content = "## Comment\n\n" + for text_block in comment.get("rich_text", []): + content += text_block.get("plain_text", "") + + frame = FrameRecord( + text_content=content, + metadata=metadata, + ) + + # Add relationship to page + frame.add_relationship("comment_on", id=page_id) + + return frame + + except Exception as e: + self.logger.error(f"Failed to map comment: {e}") + return None + + def _get_page_blocks(self, page_id: str) -> list[dict[str, Any]]: + """Get all blocks from a page.""" + blocks = [] + cursor = None + + while True: + results = self.client.blocks.children.list( + block_id=page_id, + start_cursor=cursor, + page_size=100 + ) + + blocks.extend(results["results"]) + + if not results["has_more"]: + break + cursor = results["next_cursor"] + + return blocks + + def _blocks_to_markdown(self, blocks: list[dict[str, Any]]) -> str: + """Convert Notion blocks to markdown.""" + markdown = "" + + for block in blocks: + block_type = block["type"] + + if block_type == "paragraph": + text = self._extract_rich_text(block[block_type].get("rich_text", [])) + markdown += f"{text}\n\n" + + elif block_type in ["heading_1", "heading_2", "heading_3"]: + level = int(block_type[-1]) + text = 
self._extract_rich_text(block[block_type].get("rich_text", [])) + markdown += f"{'#' * level} {text}\n\n" + + elif block_type == "bulleted_list_item": + text = self._extract_rich_text(block[block_type].get("rich_text", [])) + markdown += f"- {text}\n" + + elif block_type == "numbered_list_item": + text = self._extract_rich_text(block[block_type].get("rich_text", [])) + markdown += f"1. {text}\n" + + elif block_type == "code": + code = self._extract_rich_text(block[block_type].get("rich_text", [])) + language = block[block_type].get("language", "") + markdown += f"```{language}\n{code}\n```\n\n" + + elif block_type == "quote": + text = self._extract_rich_text(block[block_type].get("rich_text", [])) + markdown += f"> {text}\n\n" + + elif block_type == "divider": + markdown += "---\n\n" + + # Handle nested blocks + if block.get("has_children"): + children = self._get_page_blocks(block["id"]) + markdown += self._blocks_to_markdown(children) + + return markdown + + def _extract_rich_text(self, rich_text: list[dict[str, Any]]) -> str: + """Extract plain text from Notion rich text.""" + text = "" + for segment in rich_text: + text += segment.get("plain_text", "") + return text + + def _extract_property_value(self, prop: dict[str, Any]) -> str: + """Extract value from a Notion property.""" + prop_type = prop["type"] + + if prop_type == "title": + return self._extract_rich_text(prop.get("title", [])) + elif prop_type == "rich_text": + return self._extract_rich_text(prop.get("rich_text", [])) + elif prop_type == "number": + return str(prop.get("number", "")) + elif prop_type == "select": + return prop.get("select", {}).get("name", "") + elif prop_type == "multi_select": + return ", ".join([s["name"] for s in prop.get("multi_select", [])]) + elif prop_type == "date": + date = prop.get("date", {}) + if date: + return f"{date.get('start', '')} - {date.get('end', '')}" if date.get('end') else date.get('start', '') + elif prop_type == "checkbox": + return "✓" if 
prop.get("checkbox") else "✗" + elif prop_type == "url": + return prop.get("url", "") + elif prop_type == "email": + return prop.get("email", "") + elif prop_type == "phone_number": + return prop.get("phone_number", "") + else: + return "" + + def _get_title(self, item: dict[str, Any]) -> str: + """Extract title from a Notion page or database.""" + if "title" in item: + # Database title + return self._extract_rich_text(item["title"]) + elif "properties" in item: + # Page title (usually in a property named "Name" or "Title") + for prop_name in ["Title", "Name", "title", "name"]: + if prop_name in item["properties"]: + prop = item["properties"][prop_name] + if prop["type"] == "title": + return self._extract_rich_text(prop.get("title", [])) + + return "Untitled" + + def _get_parent_id(self, page: dict[str, Any]) -> str | None: + """Get parent ID from a page.""" + parent = page.get("parent", {}) + parent_type = parent.get("type") + + if parent_type == "page_id": + return parent.get("page_id") + elif parent_type == "database_id": + return parent.get("database_id") + elif parent_type == "workspace": + return "workspace" + + return None \ No newline at end of file diff --git a/contextframe/connectors/obsidian.py b/contextframe/connectors/obsidian.py new file mode 100644 index 0000000..bcdb077 --- /dev/null +++ b/contextframe/connectors/obsidian.py @@ -0,0 +1,549 @@ +"""Obsidian connector for importing vault content into ContextFrame.""" + +import json +import re +from contextframe import FrameRecord +from contextframe.connectors.base import ( + AuthType, + ConnectorConfig, + SourceConnector, + SyncResult, +) +from contextframe.schema import RecordType +from datetime import datetime +from pathlib import Path +from typing import Any, Dict, List, Optional, Set + + +class ObsidianConnector(SourceConnector): + """Connector for importing Obsidian vault content.""" + + def __init__(self, config: ConnectorConfig, dataset): + """Initialize Obsidian connector. 
+ + Args: + config: Connector configuration with Obsidian-specific settings + dataset: Target FrameDataset + """ + super().__init__(config, dataset) + + # Configuration options + self.vault_path = Path(config.sync_config.get("vault_path", "")) + self.include_attachments = config.sync_config.get("include_attachments", True) + self.include_daily_notes = config.sync_config.get("include_daily_notes", True) + self.include_templates = config.sync_config.get("include_templates", False) + self.folders_to_include = config.sync_config.get("folders_to_include", []) + self.folders_to_exclude = config.sync_config.get("folders_to_exclude", [".obsidian", ".trash"]) + self.extract_frontmatter = config.sync_config.get("extract_frontmatter", True) + self.extract_tags = config.sync_config.get("extract_tags", True) + self.extract_backlinks = config.sync_config.get("extract_backlinks", True) + + # Validate vault path + if not self.vault_path.exists(): + raise ValueError(f"Obsidian vault path does not exist: {self.vault_path}") + if not self.vault_path.is_dir(): + raise ValueError(f"Obsidian vault path is not a directory: {self.vault_path}") + + # Look for .obsidian folder to confirm it's a vault + obsidian_config = self.vault_path / ".obsidian" + if not obsidian_config.exists(): + self.logger.warning(f"No .obsidian folder found in {self.vault_path}. 
def discover_content(self) -> dict[str, Any]:
    """Scan the vault and summarise its structure.

    Walks every path under ``self.vault_path``, skipping anything located in
    (or named as) one of ``self.folders_to_exclude`` relative to the vault
    root. Notes (``.md``) are additionally read to count frontmatter blocks,
    collect ``#tags`` and count ``[[wiki links]]``.

    Returns:
        A JSON-serializable dict with ``vault_path``, ``vault_name``,
        ``folders`` (vault-relative), ``file_stats`` and ``metadata``. When
        the scan fails, an ``error`` key holds the message and the figures
        gathered so far are kept.
    """
    # Same extension set that `_sync_attachments` imports.
    attachment_extensions = {
        '.png', '.jpg', '.jpeg', '.gif', '.svg', '.pdf',
        '.mp4', '.webm', '.mov', '.mp3', '.wav',
    }

    tags_found: set[str] = set()
    discovery = {
        "vault_path": str(self.vault_path),
        "vault_name": self.vault_path.name,
        "folders": [],
        "file_stats": {
            "total_notes": 0,
            "total_attachments": 0,
            "total_size": 0,
            "file_types": {},
        },
        "metadata": {
            "tags_found": [],
            "backlinks_count": 0,
            "has_frontmatter": 0,
        }
    }
    file_stats = discovery["file_stats"]
    note_meta = discovery["metadata"]

    try:
        excluded = set(self.folders_to_exclude)

        for path in self.vault_path.rglob("*"):
            # Test vault-relative parts only: directories above the vault
            # must not influence exclusion, and everything nested inside an
            # excluded folder is excluded with it.
            rel_path = path.relative_to(self.vault_path)
            if not excluded.isdisjoint(rel_path.parts):
                continue

            if path.is_dir():
                discovery["folders"].append(str(rel_path))
                continue
            if not path.is_file():
                continue

            file_stats["total_size"] += path.stat().st_size

            ext = path.suffix.lower()
            file_stats["file_types"][ext] = file_stats["file_types"].get(ext, 0) + 1

            if ext == ".md":
                file_stats["total_notes"] += 1

                # Note analysis is best effort; an unreadable note is still
                # counted as a note.
                try:
                    content = path.read_text(encoding='utf-8')

                    if content.startswith("---"):
                        note_meta["has_frontmatter"] += 1

                    tags_found.update(re.findall(r'#[\w\-\/]+', content))

                    backlinks = re.findall(r'\[\[([^\]]+)\]\]', content)
                    note_meta["backlinks_count"] += len(backlinks)

                except Exception as e:
                    self.logger.warning(f"Failed to analyze {path}: {e}")

            elif ext in attachment_extensions:
                file_stats["total_attachments"] += 1

    except Exception as e:
        self.logger.error(f"Failed to discover Obsidian content: {e}")
        discovery["error"] = str(e)
    finally:
        # Always publish tags as a sorted list so the result is
        # JSON-serializable on the error path too.
        note_meta["tags_found"] = sorted(tags_found)

    return discovery
processed_files: set[str], + note_relationships: dict[str, list[str]] + ): + """Sync Obsidian notes (.md files).""" + try: + # Find all markdown files + for note_path in self.vault_path.rglob("*.md"): + # Skip excluded folders + if any(excluded in note_path.parts for excluded in self.folders_to_exclude): + continue + + # Skip templates unless included + if not self.include_templates and "template" in note_path.name.lower(): + continue + + # Check folder filters + rel_path = note_path.relative_to(self.vault_path) + if self.folders_to_include: + if not any(str(rel_path).startswith(folder) for folder in self.folders_to_include): + continue + + # Check if needs update + if incremental and last_sync_state: + last_sync = datetime.fromisoformat(last_sync_state["last_sync"]) + modified = datetime.fromtimestamp(note_path.stat().st_mtime) + if modified <= last_sync: + continue + + # Process note + frame = self._map_note_to_frame(note_path, collection_id, note_relationships) + if frame: + try: + # Use relative path as unique identifier + file_id = str(rel_path) + + existing = self.dataset.search( + f"source_file:'{file_id}'", + limit=1 + ) + + if existing: + self.dataset.update(existing[0].metadata["uuid"], frame) + result.frames_updated += 1 + else: + self.dataset.add(frame) + result.frames_created += 1 + + processed_files.add(file_id) + + except Exception as e: + result.frames_failed += 1 + result.add_error(f"Failed to sync note {rel_path}: {e}") + + except Exception as e: + result.add_error(f"Failed to sync notes: {e}") + result.success = False + + def _sync_attachments( + self, + collection_id: str, + result: SyncResult, + last_sync_state: dict[str, Any] | None, + processed_files: set[str] + ): + """Sync Obsidian attachments (images, PDFs, etc.).""" + try: + attachment_extensions = {'.png', '.jpg', '.jpeg', '.gif', '.svg', '.pdf', '.mp4', '.webm', '.mov', '.mp3', '.wav'} + + for file_path in self.vault_path.rglob("*"): + if not file_path.is_file(): + continue + + if 
file_path.suffix.lower() not in attachment_extensions: + continue + + # Skip excluded folders + if any(excluded in file_path.parts for excluded in self.folders_to_exclude): + continue + + # Check if needs update + if incremental and last_sync_state: + last_sync = datetime.fromisoformat(last_sync_state["last_sync"]) + modified = datetime.fromtimestamp(file_path.stat().st_mtime) + if modified <= last_sync: + continue + + # Process attachment + frame = self._map_attachment_to_frame(file_path, collection_id) + if frame: + try: + rel_path = file_path.relative_to(self.vault_path) + file_id = str(rel_path) + + existing = self.dataset.search( + f"source_file:'{file_id}'", + limit=1 + ) + + if existing: + self.dataset.update(existing[0].metadata["uuid"], frame) + result.frames_updated += 1 + else: + self.dataset.add(frame) + result.frames_created += 1 + + processed_files.add(file_id) + + except Exception as e: + result.frames_failed += 1 + result.add_error(f"Failed to sync attachment {rel_path}: {e}") + + except Exception as e: + result.add_warning(f"Failed to sync attachments: {e}") + + def _create_backlink_relationships( + self, + note_relationships: dict[str, list[str]], + result: SyncResult + ): + """Create backlink relationships between notes.""" + if not self.extract_backlinks: + return + + try: + for source_path, linked_notes in note_relationships.items(): + # Find the source frame + source_results = self.dataset.search( + f"source_file:'{source_path}'", + limit=1 + ) + + if not source_results: + continue + + source_frame = source_results[0] + + for linked_note in linked_notes: + # Find the target frame + target_results = self.dataset.search( + f"source_file:'{linked_note}'", + limit=1 + ) + + if target_results: + target_frame = target_results[0] + # Add relationship + source_frame.add_relationship( + "links_to", + id=target_frame.metadata["uuid"] + ) + + # Update the source frame + self.dataset.update(source_frame.metadata["uuid"], source_frame) + + except Exception 
def _map_note_to_frame(
    self,
    note_path: Path,
    collection_id: str,
    note_relationships: dict[str, list[str]]
) -> FrameRecord | None:
    """Map an Obsidian note (``.md`` file) to a FrameRecord.

    Splits off YAML frontmatter (when ``self.extract_frontmatter`` is set),
    derives title / tags / author / timestamps from it, collects ``#tags``
    from the body, resolves ``[[wiki links]]`` to vault-relative note paths
    and rewrites them as ``[text]`` in the stored content.

    Args:
        note_path: Path of the note; must be located under the vault.
        collection_id: ID stored in the frame's collection metadata.
        note_relationships: Updated in place: maps this note's relative path
            to the relative paths of the notes it links to.

    Returns:
        The mapped FrameRecord, or None when mapping fails (the failure is
        logged).
    """
    try:
        content = note_path.read_text(encoding='utf-8', errors='replace')

        frontmatter: dict[str, Any] = {}
        main_content = content

        if self.extract_frontmatter and content.startswith("---"):
            parts = content.split("---", 2)
            if len(parts) >= 3:
                frontmatter_text = parts[1].strip()
                main_content = parts[2].strip()

                try:
                    try:
                        import yaml
                    except ImportError:
                        # Without PyYAML, parse simple "key: value" lines
                        parsed = {}
                        for line in frontmatter_text.split('\n'):
                            if ':' in line:
                                key, value = line.split(':', 1)
                                parsed[key.strip()] = value.strip()
                    else:
                        parsed = yaml.safe_load(frontmatter_text)

                    # YAML may legally yield a scalar or a list; only a
                    # mapping is usable as frontmatter.
                    if isinstance(parsed, dict):
                        frontmatter = parsed
                    elif parsed is not None:
                        self.logger.warning(
                            f"Ignoring non-mapping frontmatter in {note_path}"
                        )
                except Exception as e:
                    self.logger.warning(f"Failed to parse frontmatter in {note_path}: {e}")

        # Title from frontmatter, falling back to the file name when the
        # frontmatter title is missing or empty
        title = str(frontmatter.get("title") or note_path.stem)

        stat = note_path.stat()
        rel_path = note_path.relative_to(self.vault_path)

        metadata = {
            "title": title,
            "record_type": RecordType.DOCUMENT,
            "source_type": "obsidian_note",
            "source_file": str(rel_path),
            "created_at": datetime.fromtimestamp(stat.st_ctime).isoformat(),
            "updated_at": datetime.fromtimestamp(stat.st_mtime).isoformat(),
            "collection": collection_id,
            "collection_id": collection_id,
            "custom_metadata": {
                "x_obsidian_vault": self.vault_path.name,
                "x_obsidian_folder": str(rel_path.parent) if rel_path.parent != Path(".") else "",
                "x_obsidian_basename": note_path.stem,
            }
        }

        if frontmatter:
            metadata["custom_metadata"]["x_obsidian_frontmatter"] = frontmatter

            if "tags" in frontmatter:
                raw_tags = frontmatter["tags"]
                if raw_tags is None:
                    # An empty `tags:` key parses as None
                    metadata["tags"] = []
                elif isinstance(raw_tags, list):
                    metadata["tags"] = raw_tags
                else:
                    metadata["tags"] = [raw_tags]
            if "author" in frontmatter:
                metadata["author"] = frontmatter["author"]
            for key, field in (("created", "created_at"), ("modified", "updated_at")):
                if key in frontmatter:
                    stamp = frontmatter[key]
                    # YAML turns bare dates into date/datetime objects; keep
                    # the timestamp fields as ISO strings like the defaults.
                    metadata[field] = stamp.isoformat() if hasattr(stamp, "isoformat") else stamp

        if self.extract_tags:
            content_tags = re.findall(r'#[\w\-\/]+', main_content)
            if content_tags:
                existing_tags = metadata.get("tags", [])
                # dict.fromkeys de-duplicates while keeping a stable order
                metadata["tags"] = list(dict.fromkeys(existing_tags + content_tags))

        linked_notes: list[str] = []
        if self.extract_backlinks:
            targets = re.findall(r'\[\[([^\]]+)\]\]', main_content)
            if targets:
                # Index the vault once per note instead of once per link;
                # the first file found wins for a given (lower-cased) name.
                notes_by_name: dict[str, str] = {}
                for candidate in self.vault_path.rglob("*.md"):
                    notes_by_name.setdefault(
                        candidate.name.lower(),
                        str(candidate.relative_to(self.vault_path)),
                    )

                for link in targets:
                    # Strip the alias ([[Note|Text]]), the heading/block
                    # anchor ([[Note#Heading]]) and any folder prefix
                    # ([[folder/Note]]).
                    target = link.split('|', 1)[0].split('#', 1)[0].strip()
                    target = target.rsplit('/', 1)[-1]
                    if not target:
                        continue

                    resolved = notes_by_name.get(f"{target}.md".lower())
                    if resolved is not None and resolved not in linked_notes:
                        linked_notes.append(resolved)

        # Stored for relationship creation after all notes are processed
        note_relationships[str(rel_path)] = linked_notes

        # Replace wiki-links with bracketed text for better readability
        main_content = re.sub(r'\[\[([^\]]+)\]\]', r'[\1]', main_content)

        full_content = f"# {title}\n\n"
        if frontmatter:
            full_content += "## Metadata\n\n"
            for key, value in frontmatter.items():
                full_content += f"**{key}**: {value}\n"
            full_content += "\n"

        full_content += main_content

        return FrameRecord(
            text_content=full_content,
            metadata=metadata,
            context=main_content[:500],  # First 500 chars as context
        )

    except Exception as e:
        self.logger.error(f"Failed to map note {note_path}: {e}")
        return None
def __init__(self, config: ConnectorConfig, dataset):
    """Create a Slack connector and its API client.

    Args:
        config: Connector configuration; the Slack-specific options are read
            from ``config.sync_config``.
        dataset: Target FrameDataset
    """
    super().__init__(config, dataset)

    option = config.sync_config.get

    # Which channels to read
    self.channel_ids = option("channel_ids", [])
    self.channel_names = option("channel_names", [])
    self.include_private = option("include_private", False)
    self.include_archived = option("include_archived", False)

    # What to read from them
    self.include_threads = option("include_threads", True)
    self.include_reactions = option("include_reactions", True)
    self.days_to_sync = option("days_to_sync", 30)  # 30 days unless configured
    self.user_ids = option("user_ids", [])  # optional filter by user

    self._setup_client()
def discover_content(self) -> dict[str, Any]:
    """Discover the channels and users of the connected Slack workspace.

    Lists conversations (honouring ``include_archived`` and
    ``include_private``) and workspace members, following Slack's
    cursor-based pagination for both listings.

    Returns:
        A dict with the keys ``workspace`` (name, team_id, user and user_id
        taken from the stored ``auth_test`` response), ``channels`` (one
        summary dict per channel), ``users`` (one summary dict per
        non-deleted, non-bot member) and ``stats`` (channel and user
        counts). If an API call fails, the error is logged, ``error`` is
        set to the exception text and whatever was collected up to that
        point is returned.
    """
    # workspace_info is only assigned by validate_connection(); fall back to
    # an empty mapping so discovery does not crash with AttributeError when
    # validation was skipped or failed.
    workspace = getattr(self, "workspace_info", None)
    if workspace is None:
        workspace = {}

    stats = {
        "total_channels": 0,
        "public_channels": 0,
        "private_channels": 0,
        "total_users": 0,
        "active_users": 0,
    }
    discovery = {
        "workspace": {
            "name": workspace.get("team", "Unknown"),
            "team_id": workspace.get("team_id"),
            "user": workspace.get("user"),
            "user_id": workspace.get("user_id"),
        },
        "channels": [],
        "users": [],
        "stats": stats,
    }

    channel_types = (
        "public_channel,private_channel" if self.include_private else "public_channel"
    )

    try:
        # Collect every page of channels before summarising them.
        channels = []
        cursor = None
        while True:
            page = self.client.conversations_list(
                exclude_archived=not self.include_archived,
                types=channel_types,
                cursor=cursor,
                limit=1000,
            )
            channels.extend(page["channels"])
            cursor = (page.get("response_metadata") or {}).get("next_cursor")
            if not cursor:
                break

        for channel in channels:
            discovery["channels"].append({
                "id": channel["id"],
                "name": channel["name"],
                "is_private": channel.get("is_private", False),
                "is_archived": channel.get("is_archived", False),
                "topic": channel.get("topic", {}).get("value", ""),
                "purpose": channel.get("purpose", {}).get("value", ""),
                "num_members": channel.get("num_members", 0),
            })
            stats["total_channels"] += 1
            if channel.get("is_private"):
                stats["private_channels"] += 1
            else:
                stats["public_channels"] += 1

        # users.list is paginated too; reading only the first page would
        # silently truncate the member list of larger workspaces.
        cursor = None
        while True:
            page = self.client.users_list(cursor=cursor, limit=200)
            for user in page["members"]:
                if user.get("deleted", False) or user.get("is_bot", False):
                    continue
                discovery["users"].append({
                    "id": user["id"],
                    "name": user.get("real_name", user.get("name", "Unknown")),
                    "display_name": user.get("profile", {}).get("display_name", ""),
                    # Deleted members were filtered out above, so every
                    # listed user is active.
                    "is_active": True,
                })
                stats["total_users"] += 1
                stats["active_users"] += 1
            cursor = (page.get("response_metadata") or {}).get("next_cursor")
            if not cursor:
                break

    except Exception as e:
        self.logger.error(f"Failed to discover Slack content: {e}")
        discovery["error"] = str(e)

    return discovery
channel in channels_to_sync: + channel_collection_id = self._sync_channel( + channel, + collection_id, + result, + last_sync_state, + processed_messages + ) + if channel_collection_id: + synced_channels[channel["id"]] = channel_collection_id + + # Save sync state + if result.success: + new_state = { + "last_sync": datetime.now().isoformat(), + "processed_messages": list(processed_messages), + "synced_channels": synced_channels, + } + self.save_sync_state(new_state) + + result.complete() + return result + + def _get_channels_to_sync(self) -> list[dict[str, Any]]: + """Get list of channels to sync based on configuration.""" + channels = [] + + # Get channels by ID + for channel_id in self.channel_ids: + try: + info = self.client.conversations_info(channel=channel_id) + channels.append(info["channel"]) + except Exception as e: + self.logger.warning(f"Failed to get channel {channel_id}: {e}") + + # Get channels by name + if self.channel_names: + cursor = None + while True: + result = self.client.conversations_list( + exclude_archived=not self.include_archived, + types="public_channel,private_channel" if self.include_private else "public_channel", + cursor=cursor + ) + + for channel in result["channels"]: + if channel["name"] in self.channel_names: + channels.append(channel) + + if not result.get("response_metadata", {}).get("next_cursor"): + break + cursor = result["response_metadata"]["next_cursor"] + + # If no specific channels requested, get all + if not self.channel_ids and not self.channel_names: + cursor = None + while True: + result = self.client.conversations_list( + exclude_archived=not self.include_archived, + types="public_channel,private_channel" if self.include_private else "public_channel", + cursor=cursor, + limit=100 + ) + + channels.extend(result["channels"]) + + if not result.get("response_metadata", {}).get("next_cursor"): + break + cursor = result["response_metadata"]["next_cursor"] + + return channels + + def _sync_channel( + self, + channel: 
dict[str, Any], + parent_collection_id: str, + result: SyncResult, + last_sync_state: dict[str, Any] | None, + processed_messages: set[str] + ) -> str | None: + """Sync a specific Slack channel.""" + try: + # Create collection for channel + channel_collection_id = self.create_collection( + f"#{channel['name']}", + channel.get("topic", {}).get("value", "") or + channel.get("purpose", {}).get("value", "") or + f"Slack channel #{channel['name']}" + ) + + # Calculate time range + oldest = None + if self.days_to_sync > 0: + oldest = int( + (datetime.now(UTC) - + datetime.timedelta(days=self.days_to_sync)).timestamp() + ) + + if incremental and last_sync_state: + # Use last sync time as oldest + last_sync = datetime.fromisoformat(last_sync_state["last_sync"]) + oldest = int(last_sync.timestamp()) + + # Get messages + cursor = None + while True: + try: + result = self.client.conversations_history( + channel=channel["id"], + oldest=str(oldest) if oldest else None, + cursor=cursor, + limit=1000 + ) + + messages = result.get("messages", []) + + for message in messages: + # Filter by user if specified + if self.user_ids and message.get("user") not in self.user_ids: + continue + + # Skip bot messages unless specifically included + if message.get("subtype") == "bot_message" and not self.config.sync_config.get("include_bots", False): + continue + + # Process message + frame = self._map_message_to_frame(message, channel, channel_collection_id) + if frame: + try: + # Create unique ID for message + message_id = f"{channel['id']}:{message['ts']}" + + existing = self.dataset.search( + f"custom_metadata.x_slack_message_id:'{message_id}'", + limit=1 + ) + + if existing: + self.dataset.update(existing[0].metadata["uuid"], frame) + result.frames_updated += 1 + else: + self.dataset.add(frame) + result.frames_created += 1 + + processed_messages.add(message_id) + + # Sync thread if it exists + if self.include_threads and message.get("thread_ts") == message.get("ts"): + self._sync_thread( + 
channel["id"], + message["ts"], + channel_collection_id, + result, + processed_messages + ) + + except Exception as e: + result.frames_failed += 1 + result.add_error(f"Failed to sync message: {e}") + + if not result.get("has_more"): + break + cursor = result.get("response_metadata", {}).get("next_cursor") + + except self.SlackApiError as e: + if e.response["error"] == "not_in_channel": + result.add_warning(f"Bot not in channel #{channel['name']}") + else: + result.add_error(f"Failed to get messages from #{channel['name']}: {e}") + break + + return channel_collection_id + + except Exception as e: + result.add_error(f"Failed to sync channel #{channel['name']}: {e}") + return None + + def _sync_thread( + self, + channel_id: str, + thread_ts: str, + collection_id: str, + result: SyncResult, + processed_messages: set[str] + ): + """Sync thread replies.""" + try: + cursor = None + while True: + thread_result = self.client.conversations_replies( + channel=channel_id, + ts=thread_ts, + cursor=cursor, + limit=1000 + ) + + replies = thread_result.get("messages", [])[1:] # Skip parent message + + for reply in replies: + frame = self._map_message_to_frame(reply, {"id": channel_id}, collection_id, is_thread_reply=True) + if frame: + try: + message_id = f"{channel_id}:{reply['ts']}" + + # Add thread relationship + frame.add_relationship("reply_to", id=f"{channel_id}:{thread_ts}") + + existing = self.dataset.search( + f"custom_metadata.x_slack_message_id:'{message_id}'", + limit=1 + ) + + if existing: + self.dataset.update(existing[0].metadata["uuid"], frame) + result.frames_updated += 1 + else: + self.dataset.add(frame) + result.frames_created += 1 + + processed_messages.add(message_id) + + except Exception as e: + result.add_warning(f"Failed to sync thread reply: {e}") + + if not thread_result.get("has_more"): + break + cursor = thread_result.get("response_metadata", {}).get("next_cursor") + + except Exception as e: + result.add_warning(f"Failed to sync thread {thread_ts}: 
def _map_message_to_frame(
    self,
    message: dict[str, Any],
    channel: dict[str, Any],
    collection_id: str,
    is_thread_reply: bool = False
) -> FrameRecord | None:
    """Convert one Slack message into a FrameRecord.

    Args:
        message: Slack message object; ``ts`` is required. ``user``,
            ``text``, ``reactions``, ``attachments`` and ``files`` are used
            when present.
        channel: Slack channel object; only its ``id`` is read and it may be
            empty.
        collection_id: Collection the frame is attached to.
        is_thread_reply: When true the frame is titled as a reply.

    Returns:
        The mapped record, or ``None`` if mapping failed (the error is
        logged).
    """
    try:
        # Resolve the author's display name.
        user_info = self._get_user_info(message.get("user", ""))
        author = user_info.get("real_name", user_info.get("name", "Unknown"))

        ts = message["ts"]
        created_at = datetime.fromtimestamp(float(ts), tz=UTC).isoformat()
        channel_id = channel.get("id", "")
        # ``text`` can be present but null; treat that like a missing text
        # instead of failing the whole mapping.
        text = message.get("text") or ""

        # workspace_info is the auth_test response stored by
        # validate_connection(); it may be absent if validation was skipped.
        workspace = getattr(self, "workspace_info", None)
        if workspace is None:
            workspace = {}
        # Slack's auth.test reports the workspace base URL in ``url``. The
        # ``team`` field is the display name, which is not a valid host
        # name, so it is only the fallback.
        base_url = (workspace.get("url") or "").rstrip("/")
        if not base_url:
            base_url = f"https://{workspace.get('team', 'slack')}.slack.com"

        title = f"Reply from {author}" if is_thread_reply else f"Message from {author}"

        metadata = {
            "title": title,
            "record_type": RecordType.DOCUMENT,
            "source_type": "slack_message",
            # Permalink form: /archives/<channel>/p<ts without the dot>.
            "source_url": f"{base_url}/archives/{channel_id}/p{ts.replace('.', '')}",
            "collection": collection_id,
            "collection_id": collection_id,
            "author": author,
            "created_at": created_at,
            "custom_metadata": {
                "x_slack_message_id": f"{channel_id}:{ts}",
                "x_slack_channel_id": channel_id,
                "x_slack_user_id": message.get("user", ""),
                "x_slack_ts": ts,
            }
        }

        # Assemble the body from parts and join once at the end.
        parts = [f"**{author}** - {created_at}\n\n", text]

        if self.include_reactions and message.get("reactions"):
            parts.append("\n\n**Reactions:**\n")
            parts.extend(
                f":{reaction['name']}: ({reaction['count']}) "
                for reaction in message["reactions"]
            )
            parts.append("\n")

        if message.get("attachments"):
            parts.append("\n\n**Attachments:**\n")
            for attachment in message["attachments"]:
                if attachment.get("title"):
                    parts.append(f"- {attachment['title']}\n")
                if attachment.get("text"):
                    parts.append(f" {attachment['text']}\n")

        if message.get("files"):
            parts.append("\n\n**Files:**\n")
            parts.extend(
                f"- {file.get('name', 'Unnamed')} ({file.get('mimetype', 'unknown')})\n"
                for file in message["files"]
            )

        return FrameRecord(
            text_content="".join(parts),
            metadata=metadata,
            context=text[:200],  # First 200 chars as context
        )

    except Exception as e:
        self.logger.error(f"Failed to map message: {e}")
        return None
def test_connector_inheritance():
    """Every bundled connector class must derive from SourceConnector."""
    connector_classes = (
        GitHubConnector,
        LinearConnector,
        GoogleDriveConnector,
        NotionConnector,
        SlackConnector,
        DiscordConnector,
        ObsidianConnector,
    )
    # Checked in the same order as the individual assertions they replace.
    for connector_class in connector_classes:
        assert issubclass(connector_class, SourceConnector)
@pytest.mark.parametrize("connector_class,expected_deps", [
    (GitHubConnector, ["github"]),
    (LinearConnector, ["linear"]),
    (GoogleDriveConnector, ["googleapiclient"]),
    (NotionConnector, ["notion_client"]),
    (SlackConnector, ["slack_sdk"]),
    (DiscordConnector, ["discord"]),
    (ObsidianConnector, []),  # No external dependencies
])
def test_connector_dependencies(connector_class, expected_deps):
    """Check that a connector only fails to construct for an expected reason.

    Missing packages cannot be simulated without uninstalling them, so the
    test instantiates each connector with a minimal config and accepts:

    * success;
    * ``ValueError`` / ``AttributeError`` from the invalid auth config or the
      ``None`` dataset;
    * ``ImportError``, but only for connectors that declare external
      dependencies, and only if the message names one of them.
    """
    config = ConnectorConfig(
        name="Test",
        auth_type=AuthType.NONE,
        sync_config={}
    )

    def normalize(name):
        # Error messages quote the pip distribution name ("slack-sdk") while
        # expected_deps lists import names ("slack_sdk"); compare them with
        # the separator unified so the two spellings match.
        return name.lower().replace("-", "_")

    try:
        # This might fail due to missing auth config, but not due to imports
        connector_class(config, None)
    except ImportError as e:
        # A connector without external dependencies must never raise this.
        assert expected_deps, (
            f"{connector_class.__name__} raised an unexpected ImportError: {e}"
        )
        message = normalize(str(e))
        assert any(normalize(dep) in message for dep in expected_deps), (
            f"ImportError from {connector_class.__name__} does not mention "
            f"any of {expected_deps}: {e}"
        )
    except (ValueError, AttributeError):
        # Expected for invalid config or missing dataset
        pass
other service account fields + } + }, + sync_config={ + "folder_ids": ["folder-id-1", "folder-id-2"], + "export_google_formats": True, + "file_patterns": ["*.pdf", "*.docx"], + "include_trashed": False, + } +) + +connector = GoogleDriveConnector(config, dataset) +result = connector.sync() +``` + +### Notion Connector + +Import pages and databases from Notion workspaces. + +**Features:** +- Import pages, databases, and their entries +- Preserve page hierarchy and relationships +- Extract properties from database entries +- Convert rich text and blocks to markdown +- Support for comments and page metadata + +**Example:** +```python +from contextframe.connectors import NotionConnector, ConnectorConfig, AuthType + +config = ConnectorConfig( + name="Notion Knowledge Base", + auth_type=AuthType.TOKEN, + auth_config={"token": "secret_xxxxx"}, # Notion integration token + sync_config={ + "sync_databases": True, + "sync_pages": True, + "include_archived": False, + "include_comments": True, + # Optional filters: + # "database_ids": ["db-uuid-1"], + # "page_ids": ["page-uuid-1"], + } +) + +connector = NotionConnector(config, dataset) +result = connector.sync() +``` + +### Slack Connector + +Import messages and threads from Slack workspaces. 
+ +**Features:** +- Import from specific channels or entire workspace +- Preserve thread structure and replies +- Include reactions and file attachments +- Support for both public and private channels +- User information and message metadata + +**Example:** +```python +from contextframe.connectors import SlackConnector, ConnectorConfig, AuthType + +config = ConnectorConfig( + name="Team Slack", + auth_type=AuthType.TOKEN, + auth_config={"token": "xoxb-your-bot-token"}, + sync_config={ + "channel_names": ["general", "engineering", "product"], + "include_threads": True, + "include_reactions": True, + "days_to_sync": 30, # Last 30 days + "include_private": False, + } +) + +connector = SlackConnector(config, dataset) +result = connector.sync() +``` + +### Discord Connector + +Import messages and threads from Discord servers. + +**Features:** +- Import from specific servers and channels +- Support for threads and forum posts +- Include reactions, embeds, and attachments +- Handle message replies and relationships +- Voice channel text support + +**Example:** +```python +from contextframe.connectors import DiscordConnector, ConnectorConfig, AuthType + +config = ConnectorConfig( + name="Discord Community", + auth_type=AuthType.TOKEN, + auth_config={"bot_token": "your-discord-bot-token"}, + sync_config={ + "guild_ids": [123456789], # Server IDs + "channel_names": ["general", "development"], + "include_threads": True, + "include_forum_posts": True, + "days_to_sync": 14, + } +) + +# Note: Discord connector requires async execution +# See examples/all_connectors_usage.py for async usage +``` + +### Obsidian Connector + +Import notes and attachments from Obsidian vaults. + +**Features:** +- Import markdown notes with full content +- Extract and preserve frontmatter metadata +- Parse and link wiki-style backlinks +- Include attachments (images, PDFs, etc.) 
def example_google_drive_sync():
    """Sync Google Drive documents into a local dataset.

    Reads the service-account credentials from the ``GOOGLE_*`` environment
    variables, opens (or creates) ``data/google_drive.lance`` and runs an
    incremental sync of the configured folders.

    Raises:
        ValueError: If ``GOOGLE_PRIVATE_KEY`` is not set.
    """
    # Checked before anything is created on disk. The original called
    # ``.replace`` on the getenv() result and crashed with AttributeError
    # when the variable was unset.
    private_key = os.getenv("GOOGLE_PRIVATE_KEY")
    if not private_key:
        raise ValueError(
            "GOOGLE_PRIVATE_KEY environment variable is required for the Google Drive example"
        )

    dataset_path = Path("data/google_drive.lance")
    dataset = FrameDataset.create(dataset_path) if not dataset_path.exists() else FrameDataset(dataset_path)

    # Service account authentication (recommended for production)
    config = ConnectorConfig(
        name="Google Drive Documents",
        auth_type=AuthType.API_KEY,
        auth_config={
            "service_account_info": {
                # Service account JSON key content
                "type": "service_account",
                "project_id": os.getenv("GOOGLE_PROJECT_ID"),
                "private_key_id": os.getenv("GOOGLE_PRIVATE_KEY_ID"),
                # Turn the literal backslash-n sequences stored in the
                # variable into real newlines.
                "private_key": private_key.replace('\\n', '\n'),
                "client_email": os.getenv("GOOGLE_CLIENT_EMAIL"),
                "client_id": os.getenv("GOOGLE_CLIENT_ID"),
                "auth_uri": "https://accounts.google.com/o/oauth2/auth",
                "token_uri": "https://oauth2.googleapis.com/token",
            }
        },
        sync_config={
            "folder_ids": ["folder-id-1", "folder-id-2"],  # Specific folders
            "export_google_formats": True,  # Convert Google Docs to text
            "include_trashed": False,
            "file_patterns": ["*.pdf", "*.docx", "*.txt"],
        }
    )

    connector = GoogleDriveConnector(config, dataset)

    if not connector.validate_connection():
        # Report the failure instead of returning silently.
        print("Failed to connect to Google Drive")
        return

    print("Discovering Drive content...")
    discovery = connector.discover_content()
    print(f"Found {discovery['file_stats']['total_files']} files")

    print("\nSyncing Drive...")
    result = connector.sync(incremental=True)
    print(f"Created: {result.frames_created}, Updated: {result.frames_updated}")
Path("data/slack_workspace.lance") + dataset = FrameDataset.create(dataset_path) if not dataset_path.exists() else FrameDataset(dataset_path) + + config = ConnectorConfig( + name="Slack Team Chat", + auth_type=AuthType.TOKEN, + auth_config={ + "token": os.getenv("SLACK_BOT_TOKEN"), # Slack bot token (xoxb-...) + }, + sync_config={ + "channel_names": ["general", "engineering", "product"], # Specific channels + "include_threads": True, + "include_reactions": True, + "days_to_sync": 30, # Last 30 days + "include_private": False, # Only public channels + "include_bots": False, # Skip bot messages + } + ) + + connector = SlackConnector(config, dataset) + + if connector.validate_connection(): + print("Discovering Slack content...") + discovery = connector.discover_content() + print(f"Workspace: {discovery['workspace']['name']}") + print(f"Found {discovery['stats']['total_channels']} channels") + + print("\nSyncing Slack...") + result = connector.sync(incremental=True) + print(f"Created: {result.frames_created}, Updated: {result.frames_updated}") + + +def example_discord_sync(): + """Example of syncing Discord server.""" + + dataset_path = Path("data/discord_server.lance") + dataset = FrameDataset.create(dataset_path) if not dataset_path.exists() else FrameDataset(dataset_path) + + config = ConnectorConfig( + name="Discord Community", + auth_type=AuthType.TOKEN, + auth_config={ + "bot_token": os.getenv("DISCORD_BOT_TOKEN"), # Discord bot token + }, + sync_config={ + "guild_ids": [123456789], # Server IDs + "channel_names": ["general", "development", "support"], + "include_threads": True, + "include_forum_posts": True, + "days_to_sync": 14, # Last 2 weeks + "include_reactions": True, + "include_bots": False, + } + ) + + # Note: Discord connector requires async execution + print("Discord connector requires async execution - see documentation for async examples") + + +def example_obsidian_sync(): + """Example of syncing Obsidian vault.""" + + dataset_path = 
Path("data/obsidian_vault.lance") + dataset = FrameDataset.create(dataset_path) if not dataset_path.exists() else FrameDataset(dataset_path) + + config = ConnectorConfig( + name="Personal Knowledge Vault", + auth_type=AuthType.NONE, # Local file access + sync_config={ + "vault_path": "/path/to/obsidian/vault", # Path to Obsidian vault + "include_attachments": True, + "include_daily_notes": True, + "include_templates": False, + "folders_to_exclude": [".obsidian", ".trash", "Archive"], + "extract_frontmatter": True, + "extract_tags": True, + "extract_backlinks": True, + } + ) + + connector = ObsidianConnector(config, dataset) + + if connector.validate_connection(): + print("Discovering Obsidian vault...") + discovery = connector.discover_content() + print(f"Vault: {discovery['vault_name']}") + print(f"Found {discovery['file_stats']['total_notes']} notes") + print(f"Found {len(discovery['metadata']['tags_found'])} unique tags") + + print("\nSyncing Obsidian...") + result = connector.sync(incremental=True) + print(f"Created: {result.frames_created}, Updated: {result.frames_updated}") + + +def example_enterprise_knowledge_base(): + """Example of building a comprehensive enterprise knowledge base.""" + + # Create unified dataset + dataset_path = Path("data/enterprise_knowledge.lance") + dataset = FrameDataset.create(dataset_path) if not dataset_path.exists() else FrameDataset(dataset_path) + + connectors = [] + + # 1. GitHub - Code and documentation + if os.getenv("GITHUB_TOKEN"): + github_config = ConnectorConfig( + name="Company GitHub", + auth_type=AuthType.TOKEN, + auth_config={"token": os.getenv("GITHUB_TOKEN")}, + sync_config={ + "owner": "mycompany", + "repo": "documentation", + "paths": ["/docs", "/README.md"], + "file_patterns": ["*.md", "*.rst"], + } + ) + connectors.append(("GitHub", GitHubConnector(github_config, dataset))) + + # 2. 
Notion - Product requirements and specs + if os.getenv("NOTION_TOKEN"): + notion_config = ConnectorConfig( + name="Product Specs", + auth_type=AuthType.TOKEN, + auth_config={"token": os.getenv("NOTION_TOKEN")}, + sync_config={ + "sync_databases": True, + "sync_pages": True, + "include_archived": False, + } + ) + connectors.append(("Notion", NotionConnector(notion_config, dataset))) + + # 3. Linear - Project management and issues + if os.getenv("LINEAR_API_KEY"): + linear_config = ConnectorConfig( + name="Engineering Issues", + auth_type=AuthType.API_KEY, + auth_config={"api_key": os.getenv("LINEAR_API_KEY")}, + sync_config={ + "sync_teams": True, + "sync_projects": True, + "sync_issues": True, + "include_archived": False, + } + ) + connectors.append(("Linear", LinearConnector(linear_config, dataset))) + + # 4. Slack - Team discussions and decisions + if os.getenv("SLACK_BOT_TOKEN"): + slack_config = ConnectorConfig( + name="Team Discussions", + auth_type=AuthType.TOKEN, + auth_config={"token": os.getenv("SLACK_BOT_TOKEN")}, + sync_config={ + "channel_names": ["engineering", "product", "general"], + "include_threads": True, + "days_to_sync": 14, + } + ) + connectors.append(("Slack", SlackConnector(slack_config, dataset))) + + # Sync all sources + total_created = 0 + total_updated = 0 + + for name, connector in connectors: + if connector.validate_connection(): + print(f"\nSyncing {name}...") + result = connector.sync(incremental=True) + print(f"{name}: {result.frames_created} created, {result.frames_updated} updated") + total_created += result.frames_created + total_updated += result.frames_updated + + if result.errors: + print(f"{name} errors: {len(result.errors)}") + else: + print(f"Failed to connect to {name}") + + print(f"\n=== Enterprise Knowledge Base Complete ===") + print(f"Total frames created: {total_created}") + print(f"Total frames updated: {total_updated}") + + # Demonstrate unified search + print("\n=== Cross-Platform Search Examples ===") + + # Search 
def example_dependencies_check():
    """Print, per connector, whether its third-party dependency is installed.

    Availability is probed with ``importlib.util.find_spec``, which locates
    a module without executing it. Checking is therefore cheap and cannot be
    derailed by side effects or errors raised while a package imports.
    """
    import importlib.util

    # connector label -> (pip distribution name, importable module name)
    connectors_status = {
        "GitHub": ("PyGithub", "github"),
        "Linear": ("linear-python", "linear"),
        "Google Drive": ("google-api-python-client", "googleapiclient"),
        "Notion": ("notion-client", "notion_client"),
        "Slack": ("slack-sdk", "slack_sdk"),
        "Discord": ("discord.py", "discord"),
        "Obsidian": ("Built-in", None),  # No external dependencies
    }

    print("=== Connector Dependencies Status ===")

    for connector_name, (package_name, import_name) in connectors_status.items():
        if import_name is None:
            available = True
        else:
            try:
                available = importlib.util.find_spec(import_name) is not None
            except (ImportError, ValueError):
                # find_spec raises ValueError for a loaded module whose
                # __spec__ is unset; treat any lookup failure as "missing".
                available = False

        if available:
            status = "✅ Available"
        else:
            status = f"❌ Missing (install: pip install {package_name})"

        print(f"{connector_name:15} : {status}")
No credentials needed + ] + + for name, example_func, required_env_vars in examples: + if not required_env_vars or all(os.getenv(var) for var in required_env_vars): + print(f"\n=== {name} Example ===") + try: + example_func() + except ImportError as e: + print(f"Skipping {name}: Missing dependency ({e})") + except Exception as e: + print(f"Error in {name} example: {e}") + else: + missing = [var for var in required_env_vars if not os.getenv(var)] + print(f"\nSkipping {name}: Set {', '.join(missing)} environment variable(s)") + + # Enterprise example if multiple sources available + if (os.getenv("GITHUB_TOKEN") and os.getenv("NOTION_TOKEN") and + os.getenv("LINEAR_API_KEY") and os.getenv("SLACK_BOT_TOKEN")): + print("\n=== Enterprise Knowledge Base Example ===") + example_enterprise_knowledge_base() + else: + print("\nSet multiple credentials to run the enterprise knowledge base example") \ No newline at end of file