Source code for stable_pretraining.utils.gdrive_utils

"""Google Drive Background Uploader.

Upload files to Google Drive without blocking your application.
Features: auto-versioning, background processing, comprehensive logging.

Example:
    >>> uploader = GDriveUploader("MyProject", "credentials.json")
    >>> uploader.upload_file("large_file.mp4")  # Returns immediately!
    >>> uploader.wait_for_uploads()  # Optional: wait for completion
"""

import atexit
import mimetypes
from pathlib import Path
from queue import Queue
from threading import Thread
from typing import Optional, Callable
from loguru import logger


[docs] class GDriveUploader: """Background Google Drive uploader with automatic folder versioning. All uploads happen in a background thread - your code never blocks! Automatically creates versioned folders (MyFolder_v2, MyFolder_v3, etc). Args: folder_name: Name of the Drive folder to create credentials_path: Path to service account JSON credentials parent_folder_id: Optional parent folder ID (None = root) callback: Optional function(file_path, file_id, success) called on completion Example: >>> def notify(path, fid, ok): ... print(f"{'✓' if ok else '✗'} {path}") >>> >>> uploader = GDriveUploader("Backups", "creds.json", callback=notify) >>> uploader.upload_file("data.csv") # Non-blocking! >>> uploader.upload_file("logs.txt") >>> uploader.wait_for_uploads() # Wait for all to finish """ def __init__( self, folder_name: str, credentials_path: str, parent_folder_id: Optional[str] = None, callback: Optional[Callable[[str, Optional[str], bool], None]] = None, ): logger.info("=" * 70) logger.info(f"🚀 Initializing GDriveUploader: {folder_name}") logger.debug( f"Credentials: {credentials_path} | Parent: {parent_folder_id or 'ROOT'}" ) self.folder_name = folder_name self.credentials_path = credentials_path self.parent_folder_id = parent_folder_id self.callback = callback self._upload_queue = Queue() self._stop_worker = False try: # Authenticate and create folder logger.info("Step 1/3: Authenticating...") self.service = self._authenticate() logger.success("✓ Authenticated") logger.info("Step 2/3: Creating/versioning folder...") self.folder_id = self._create_or_version_folder() logger.success(f"✓ Folder ready: {self.folder_id}") logger.info("Step 3/3: Starting background worker...") self._worker_thread = Thread( target=self._upload_worker, daemon=True, name="GDriveWorker" ) self._worker_thread.start() logger.success(f"✓ Worker started (TID: {self._worker_thread.ident})") atexit.register(self._cleanup) logger.success("=" * 70) logger.success(f"✅ Ready! URL: {self.get_folder_url()}") logger.success("=" * 70) except Exception as e: logger.error(f"❌ Initialization failed: {e}") logger.exception("Full error:") raise def _authenticate(self): """Authenticate and return Drive service.""" from google.oauth2 import service_account from googleapiclient.discovery import build try: logger.debug(f"Loading credentials from: {self.credentials_path}") if not Path(self.credentials_path).exists(): raise FileNotFoundError( f"Credentials not found: {self.credentials_path}" ) creds = service_account.Credentials.from_service_account_file( self.credentials_path, scopes=["https://www.googleapis.com/auth/drive"] ) service = build("drive", "v3", credentials=creds) # Test connection about = service.about().get(fields="user").execute() user = about.get("user", {}).get("emailAddress", "unknown") logger.debug(f"Connected as: {user}") return service except Exception as e: logger.error(f"Authentication failed: {e}") raise def _create_or_version_folder(self) -> str: """Create folder or version it if exists. Returns folder ID.""" try: # Search for existing folders query_parts = [ f"name contains '{self.folder_name}'", "mimeType='application/vnd.google-apps.folder'", "trashed=false", ] if self.parent_folder_id: query_parts.append(f"'{self.parent_folder_id}' in parents") query = " and ".join(query_parts) logger.debug(f"Search query: {query}") results = ( self.service.files() .list(q=query, spaces="drive", fields="files(id, name)", pageSize=100) .execute() ) folders = results.get("files", []) logger.info(f"Found {len(folders)} existing folder(s)") # Check if base name exists if not any(f["name"] == self.folder_name for f in folders): logger.info(f"Creating new folder: {self.folder_name}") return self._create_folder(self.folder_name) # Find highest version version = 1 for folder in folders: if "_v" in folder["name"]: try: ver = int(folder["name"].split("_v")[-1]) version = max(version, ver) logger.debug(f"Found version: v{ver}") except ValueError: continue # Create versioned folder version += 1 versioned_name = f"{self.folder_name}_v{version}" logger.info(f"Creating versioned folder: {versioned_name}") folder_id = self._create_folder(versioned_name) logger.success(f"✓ Created {versioned_name} (ID: {folder_id})") return folder_id except Exception as e: logger.error(f"Folder creation error: {e}") raise def _create_folder(self, name: str) -> str: """Create a folder and return its ID.""" try: metadata = {"name": name, "mimeType": "application/vnd.google-apps.folder"} if self.parent_folder_id: metadata["parents"] = [self.parent_folder_id] folder = self.service.files().create(body=metadata, fields="id").execute() return folder.get("id") except Exception as e: logger.error(f"Error creating folder '{name}': {e}") raise def _upload_worker(self): """Background worker that processes upload queue.""" logger.info(f"🔄 Worker started (TID: {Thread.current_thread().ident})") processed = 0 while not self._stop_worker: try: task = self._upload_queue.get(timeout=1) if task is None: # Shutdown signal logger.info("Worker received shutdown signal") break file_path, custom_name, subfolder_id = task processed += 1 logger.info(f"📤 Processing upload #{processed}: {file_path}") file_id = self._upload_file(file_path, custom_name, subfolder_id) # Callback if self.callback: try: self.callback(file_path, file_id, success=(file_id is not None)) except Exception as e: logger.error(f"Callback error: {e}") self._upload_queue.task_done() logger.debug(f"Queue size: {self._upload_queue.qsize()}") except Exception as e: if not self._stop_worker: logger.error(f"Worker error: {e}") logger.info(f"🛑 Worker stopped (processed {processed} uploads)") def _upload_file( self, file_path: str, custom_name: Optional[str] = None, subfolder_id: Optional[str] = None, ) -> Optional[str]: """Upload file and return file ID (or None on failure).""" from googleapiclient.http import MediaFileUpload from googleapiclient.errors import HttpError try: path = Path(file_path) # Validate if not path.exists() or not path.is_file(): logger.error(f"❌ Invalid file: {file_path}") return None # File info size_mb = path.stat().st_size / (1024 * 1024) logger.info(f"📁 Uploading: {path.name} ({size_mb:.2f} MB)") # MIME type mime_type = mimetypes.guess_type(file_path)[0] or "application/octet-stream" logger.debug(f"MIME type: {mime_type}") # Metadata metadata = { "name": custom_name or path.name, "parents": [subfolder_id or self.folder_id], } # Upload media = MediaFileUpload(file_path, mimetype=mime_type, resumable=True) logger.info("🚀 Uploading to Drive...") file = ( self.service.files() .create( body=metadata, media_body=media, fields="id, name, size, webViewLink", ) .execute() ) file_id = file.get("id") logger.success("=" * 60) logger.success(f"✅ Upload complete: {file.get('name')}") logger.success(f" File ID: {file_id}") logger.success(f" Size: {file.get('size')} bytes") logger.debug(f" Link: {file.get('webViewLink')}") logger.success("=" * 60) return file_id except HttpError as e: logger.error(f"❌ HTTP error: {e.resp.status} - {e.resp.reason}") return None except Exception as e: logger.error(f"❌ Upload failed: {e}") logger.exception("Full traceback:") return None
[docs] def upload_file( self, file_path: str, custom_name: Optional[str] = None, subfolder_id: Optional[str] = None, ) -> None: """Queue a file for background upload. Returns immediately! Args: file_path: Path to file to upload custom_name: Optional custom name in Drive subfolder_id: Optional subfolder ID (defaults to main folder) Example: >>> uploader.upload_file("video.mp4") >>> uploader.upload_file("report.pdf", custom_name="Q4_report.pdf") """ logger.info(f"📋 Queuing: {file_path}") self._upload_queue.put((file_path, custom_name, subfolder_id)) queue_size = self._upload_queue.qsize() logger.info(f"✓ Queued (queue size: {queue_size})") if queue_size > 10: logger.warning(f"⚠️ Large queue: {queue_size} pending uploads")
[docs] def wait_for_uploads(self, timeout: Optional[float] = None) -> bool: """Wait for all queued uploads to complete. Args: timeout: Max seconds to wait (None = wait forever) Returns: True if completed, False if timeout/error Example: >>> uploader.upload_file("file1.txt") >>> uploader.upload_file("file2.txt") >>> uploader.wait_for_uploads(timeout=300) # Wait max 5 min """ pending = self.get_queue_size() if pending == 0: logger.info("✓ No pending uploads") return True logger.info(f"⏳ Waiting for {pending} upload(s)...") if timeout: logger.debug(f"Timeout: {timeout}s") try: self._upload_queue.join() logger.success("✅ All uploads complete") return True except Exception as e: logger.error(f"❌ Wait error: {e}") return False
[docs] def get_queue_size(self) -> int: """Get number of pending uploads.""" return self._upload_queue.qsize()
[docs] def get_folder_id(self) -> str: """Get the Drive folder ID.""" return self.folder_id
[docs] def get_folder_url(self) -> str: """Get the Drive folder URL.""" return f"https://drive.google.com/drive/folders/{self.folder_id}"
def _cleanup(self): """Shutdown worker thread gracefully.""" logger.info("🛑 Shutting down GDriveUploader...") self._stop_worker = True self._upload_queue.put(None) # Poison pill if self._worker_thread and self._worker_thread.is_alive(): self._worker_thread.join(timeout=10) logger.info("✓ Shutdown complete")
# Usage Example if __name__ == "__main__": # Configure logging logger.add( "gdrive_{time}.log", rotation="10 MB", retention="7 days", level="DEBUG", format="{time:YYYY-MM-DD HH:mm:ss} | {level: <8} | {message}", ) # Callback for upload notifications def on_upload_complete(file_path: str, file_id: Optional[str], success: bool): if success: logger.info(f"✅ Uploaded: {file_path} -> {file_id}") else: logger.error(f"❌ Failed: {file_path}") # Initialize uploader uploader = GDriveUploader( folder_name="MyProject", credentials_path="service_account.json", callback=on_upload_complete, ) # Upload files - all non-blocking! uploader.upload_file("document.pdf") uploader.upload_file("image.png") uploader.upload_file("data.csv", custom_name="backup_data.csv") logger.info("Files queued! Doing other work...") # Check queue logger.info(f"Pending: {uploader.get_queue_size()}") # Wait for all uploads before exiting uploader.wait_for_uploads() logger.info(f"All done! Folder: {uploader.get_folder_url()}")