Source code for gratools.logger_config

# Standard library imports
import logging
import logging.config  # Not directly used, but good to have if you plan advanced config
import queue
import sys
import threading
from pathlib import Path
from typing import List  # For type hinting list of colors

# Third-party imports
from rich.console import Console  # For shared console instance
from rich.logging import RichHandler  # Base class for custom handler
from rich.traceback import install as install_rich_traceback  # For enhanced tracebacks

# Create a shared console instance to be used by RichHandler and potentially elsewhere.
# `record=True` allows capturing output, useful for testing or saving console content.
shared_console = Console(record=True)


# Define a list of available colors for potential use in logging or UI.
# This list is not directly used by the current logging setup but is good to have defined.
AVAILABLE_RICH_COLORS: List[str] = [
    "black", "red", "green", "yellow", "blue", "magenta", "cyan", "white",
    "bright_black", "bright_red", "bright_green", "bright_yellow",
    "bright_blue", "bright_magenta", "bright_cyan", "bright_white",
]


[docs] class ThreadedRichHandler(RichHandler): # Renamed for clarity (was MyRichLogHandler) """ Custom RichHandler that processes log records in a separate thread to prevent blocking the main application thread, especially during I/O-bound logging operations (like writing to console or complex formatting). It also includes a feature to trigger a program exit if a log record of ERROR level or higher is emitted through it. Attributes ---------- running : bool A flag indicating whether the log processing worker thread should continue running. log_queue : queue.Queue[logging.LogRecord] # Type hint for clarity A thread-safe queue used to buffer log records before they are processed by the worker thread. worker_thread : threading.Thread The background thread responsible for consuming log records from `log_queue`. critical_error_occurred : bool A flag set to True if an ERROR or CRITICAL log has been processed, leading to program termination. """
[docs] def __init__(self, *args, **kwargs) -> None: """ Initializes the ThreadedRichHandler. Sets up the log queue and starts the background worker thread. """ super().__init__(*args, **kwargs) self.running: bool = True self.log_queue: queue.Queue[logging.LogRecord] = queue.Queue() # Maxsize can be set if needed self.worker_thread: threading.Thread = threading.Thread(target=self._process_log_queue, name="RichLogHandlerThread") self.worker_thread.daemon = True # Allows main program to exit even if this thread is running self.worker_thread.start() self.critical_error_occurred: bool = False
def _handle_single_queued_record(self) -> bool: """ Retrieves and emits a single log record from the internal queue. This method attempts a non-blocking get from the queue with a short timeout. If a record is found, it's emitted using the parent RichHandler's `emit` method. Returns ------- bool True if a log record was successfully retrieved and processed, False otherwise (e.g., queue was empty). """ try: # Get a record from the queue. Timeout allows the loop to periodically check `self.running`. record = self.log_queue.get(block=True, timeout=0.05) # block=True with timeout super().emit(record) # Use the actual emit method of RichHandler (or its parent) self.log_queue.task_done() # Signal that the retrieved item has been processed return True except queue.Empty: return False # Queue was empty within the timeout def _process_log_queue(self) -> None: """ The main loop for the worker thread. Continuously retrieves log records from the `log_queue` and processes them as long as `self.running` is True. After `self.running` is set to False, it attempts to process any remaining records in the queue before exiting. """ while self.running: self._handle_single_queued_record() # After self.running is set to False, process any remaining items in the queue while self._handle_single_queued_record(): pass # Loop until queue is empty or _handle_single_queued_record fails to get an item
[docs] def emit(self, record: logging.LogRecord) -> None: """ Queues a log record for processing by the worker thread. If the log record's level is ERROR or higher, this method sets a flag to indicate a critical error and initiates the process to stop the log processing and exit the application. Parameters ---------- record : logging.LogRecord The log record to be emitted. """ # Add the record to the queue for the worker thread to handle self.log_queue.put(record) # Critical error handling: if an ERROR or CRITICAL message is logged, # prepare to stop everything. if record.levelno >= logging.ERROR and not self.critical_error_occurred: self.critical_error_occurred = True # Set flag to prevent multiple exits if errors flood # The actual sys.exit(1) should ideally be coordinated at a higher level # after this handler and other resources have a chance to clean up. # Forcing an immediate exit here might be too abrupt. # A cleaner way is to signal the main application to shut down. # However, if immediate stop is desired: self.stop_processing() # Attempt to flush queue and stop worker # Note: sys.exit() from a non-main thread can be problematic. # It's better to raise a specific exception or signal the main thread. # For now, keeping original behavior but with a warning. # Consider using a custom exception that the main thread can catch for graceful shutdown. # Forcing exit here: logging.shutdown() # Attempt to flush all standard logging handlers sys.exit(1) # This is very aggressive.
[docs] def stop_processing(self) -> None: """ Signals the worker thread to stop and waits for it to terminate. This ensures that the queue is flushed of pending log records before the thread exits. """ if self.running: # Prevent multiple calls or calls if already stopped self.running = False if self.worker_thread.is_alive(): self.log_queue.join() # Wait for all items in queue to be processed (task_done called) self.worker_thread.join(timeout=1.0) # Wait for worker thread to finish, with a timeout if self.worker_thread.is_alive(): self.console.print("[yellow]Warning: Log worker thread did not terminate gracefully.[/yellow]", file=sys.stderr)
[docs] def close(self) -> None: """ Closes the handler, ensuring the worker thread is stopped and resources are released. """ self.stop_processing() # Ensure worker is stopped and queue flushed super().close() # Call close on the parent RichHandler
[docs] def configure_logger( name: str, log_dir_path: Path, # Changed from str to Path for consistency verbosity_level: str, # Renamed for clarity file_suffix: str = "" # Renamed for clarity ) -> logging.Logger: """ Configures and returns a logger instance with specified settings. The logger will output to: 1. The console (via RichHandler for formatted, colorful output). 2. A general log file (e.g., 'name_log.o'). 3. An error log file for WARNING and higher messages (e.g., 'name_log.e'). Parameters ---------- name : str The name for the logger (e.g., "GraTools"). log_dir_path : Path The directory path where log files will be stored. verbosity_level : str The logging verbosity level (e.g., "DEBUG", "INFO", "ERROR"). This sets the minimum level for messages to be processed by the logger. file_suffix : str, optional An optional suffix to append to the base name of log files. Defaults to an empty string. Returns ------- logging.Logger The configured logger instance. """ # Ensure log directory exists try: log_dir_path.mkdir(parents=True, exist_ok=True) except OSError as e: # Fallback to console print if logger isn't fully up for this error. print(f"CRITICAL: Failed to create log directory '{log_dir_path}': {e}. Log files may not be written.", file=sys.stderr) # Depending on severity, might raise e or attempt to continue without file logging. log_file_basename = log_dir_path / f"{name}{file_suffix}_log" # e.g., /path/to/logs/GraTools_query1_log # Normalize verbosity level and determine if debug features are active effective_verbosity = verbosity_level.upper() is_debug_mode = (effective_verbosity == "DEBUG") if is_debug_mode: # Install Rich's traceback handler for prettier exception displays during debug mode. install_rich_traceback(show_locals=True, console=shared_console) # Pass console for consistency # Standard formatter for file logs # Includes more details (module, function) if in debug mode. file_log_formatter = logging.Formatter( fmt=( "%(asctime)s |" + (" %(funcName)-25s | " if is_debug_mode else "") + " %(message)s" ), datefmt="%Y-%m-%d %H:%M", # Changed to standard ISO-like format ) # Configure the custom RichHandler for console output console_rich_handler = ThreadedRichHandler( rich_tracebacks=is_debug_mode, # Enable rich tracebacks in debug mode markup=True, # Allow Rich markup in log messages show_time=False, # Time will be part of the formatter for consistency show_level=True, # Level will be part of the formatter show_path=is_debug_mode, # Show path to source file in debug mode console=shared_console, # Use the globally shared Rich Console keywords=RichHandler.KEYWORDS + ["GraTools", "GFA"] # Example: Highlight custom keywords ) # Apply a formatter to RichHandler to control prefix (asctime, levelname, etc.) # RichHandler itself handles the styling of the message body. console_rich_handler.setFormatter(file_log_formatter) # Use the same detailed formatter for console consistency # Configure FileHandler for all logs (e.g., .o for output) general_log_file_handler = logging.FileHandler(f"{log_file_basename}.log", mode="w", encoding="utf-8") # Changed to .log, mode 'w' general_log_file_handler.setFormatter(file_log_formatter) general_log_file_handler.setLevel(effective_verbosity) # Log everything from the specified level # Configure FileHandler for errors and warnings (e.g., .e for error) error_log_file_handler = logging.FileHandler(f"{log_file_basename}.err", mode="w", encoding="utf-8") # Changed to .err, mode 'w' error_log_file_handler.setFormatter(file_log_formatter) error_log_file_handler.setLevel(logging.WARNING) # Capture WARNING, ERROR, CRITICAL # Get or create the main logger instance logger = logging.getLogger(name) logger.setLevel(effective_verbosity) # Set the logger's own effective level # Clear existing handlers, if any (important if reconfiguring) if logger.hasHandlers(): logger.handlers.clear() # Add the configured handlers logger.addHandler(general_log_file_handler) logger.addHandler(error_log_file_handler) logger.addHandler(console_rich_handler) # Prevent log messages from propagating to the root logger, # especially if the root logger has its own handlers (e.g., default basicConfig). logger.propagate = False return logger
[docs] def update_logger_file_suffix(logger: logging.Logger, new_file_suffix: str) -> logging.Logger: # Renamed """ Updates file handlers of a logger to use a new file suffix. Existing log content from the old file (if any) is copied to the new file location before switching. The old file is then deleted. This is useful if, for instance, a specific operation (like a query) should have its logs in a uniquely named file determined mid-execution. Parameters ---------- logger : logging.Logger The logger instance whose file handlers need updating. new_file_suffix : str The new suffix to be incorporated into the log filenames. Example: if old was "main_log.log", new_suffix="_query123", new becomes "main_log_query123.log". Returns ------- logging.Logger The same logger instance, now with updated file handlers. """ if not new_file_suffix: # If new suffix is empty, no change needed or could be an error. return logger handlers_to_add_back = [] # Store new handlers to add after iterating for handler_idx in range(len(logger.handlers) - 1, -1, -1): # Iterate backwards for safe removal handler = logger.handlers[handler_idx] if isinstance(handler, logging.FileHandler): original_formatter = handler.formatter original_level = handler.level # Close current handler before manipulating its file handler.flush() handler.close() old_log_path = Path(handler.baseFilename) # Construct new filename: insert new_file_suffix before the final extension. new_stem = f"{old_log_path.stem}{new_file_suffix}" new_log_path = old_log_path.with_name(f"{new_stem}{old_log_path.suffix}") if old_log_path.exists() and old_log_path != new_log_path: try: # Ensure target directory exists for the new log file new_log_path.parent.mkdir(parents=True, exist_ok=True) with open(old_log_path, "r", encoding="utf-8") as src_file, \ open(new_log_path, "w", encoding="utf-8") as dst_file: # 'w' to overwrite/create new content = src_file.read() dst_file.write(content) # Content copied, now remove old handler and prepare new one except IOError as e: logger.error(f"Error copying log content from '{old_log_path}' to '{new_log_path}': {e}", exc_info=True) # If copy fails, might reconsider removing handler or deleting old file logger.removeHandler(handler) # Remove the old handler from the logger # Create and add the new FileHandler try: new_file_handler = logging.FileHandler(new_log_path, mode="a", encoding="utf-8") # Append to new/copied file new_file_handler.setFormatter(original_formatter) new_file_handler.setLevel(original_level) handlers_to_add_back.append(new_file_handler) # Delete old log file only if successfully switched and content copied if old_log_path.exists() and old_log_path != new_log_path: try: old_log_path.unlink(missing_ok=True) # missing_ok in case it was already removed except OSError as e: logger.warning(f"Failed to delete old log file '{old_log_path.name}': {e}", exc_info=True) except Exception as e: logger.error(f"Failed to create new FileHandler for '{new_log_path}': {e}", exc_info=True) # If new handler fails, might want to re-add old one if possible, or log critical error. # Add all newly created handlers for new_h in handlers_to_add_back: logger.addHandler(new_h) return logger
# Example usage demonstration (typically not in a library file like this) if __name__ == "__main__": # Create a dummy log directory for the example example_log_dir = Path("./example_gratools_logs") example_log_dir.mkdir(exist_ok=True) # Configure logger with an initial setup main_logger = configure_logger( name="GraToolsApp", log_dir_path=example_log_dir, verbosity_level="DEBUG", # Test with DEBUG level file_suffix="_initial" ) main_logger.info("This is an informational message in the initial log.") main_logger.debug("This is a debug message for the initial setup.") # Simulate a scenario where a new suffix is needed query_specific_suffix = "_query_XYZ" main_logger.info(f"Updating logger for query-specific suffix: '{query_specific_suffix}'") main_logger = update_logger_file_suffix(main_logger, query_specific_suffix) main_logger.warning("This is a warning message in the query-specific log.") main_logger.info("Logging continues to the new file with the updated suffix.") # Example of an error that would trigger sys.exit in ThreadedRichHandler # To test this, you might need to run this script and observe its exit code. # Be cautious as this will terminate the script. try: main_logger.error("This is a test error that should cause the program to exit via ThreadedRichHandler.") main_logger.info("This line might not be reached if the error above exits.") except SystemExit as e: print(f"Program exited with code: {e.code} (as expected after error log via ThreadedRichHandler).") print(f"Log files should be in: {example_log_dir.resolve()}")