# Standard library imports
import logging
import logging.config # Not directly used, but good to have if you plan advanced config
import queue
import sys
import threading
from pathlib import Path
from typing import List # For type hinting list of colors
# Third-party imports
from rich.console import Console # For shared console instance
from rich.logging import RichHandler # Base class for custom handler
from rich.traceback import install as install_rich_traceback # For enhanced tracebacks
# Create a shared console instance to be used by RichHandler and potentially elsewhere.
# `record=True` allows capturing output, useful for testing or saving console content.
shared_console = Console(record=True)
# Define a list of available colors for potential use in logging or UI.
# This list is not directly used by the current logging setup but is good to have defined.
AVAILABLE_RICH_COLORS: List[str] = [
"black", "red", "green", "yellow", "blue", "magenta", "cyan", "white",
"bright_black", "bright_red", "bright_green", "bright_yellow",
"bright_blue", "bright_magenta", "bright_cyan", "bright_white",
]
[docs]
class ThreadedRichHandler(RichHandler): # Renamed for clarity (was MyRichLogHandler)
"""
Custom RichHandler that processes log records in a separate thread
to prevent blocking the main application thread, especially during
I/O-bound logging operations (like writing to console or complex formatting).
It also includes a feature to trigger a program exit if a log record
of ERROR level or higher is emitted through it.
Attributes
----------
running : bool
A flag indicating whether the log processing worker thread should continue running.
log_queue : queue.Queue[logging.LogRecord] # Type hint for clarity
A thread-safe queue used to buffer log records before they are processed by the worker thread.
worker_thread : threading.Thread
The background thread responsible for consuming log records from `log_queue`.
critical_error_occurred : bool
A flag set to True if an ERROR or CRITICAL log has been processed,
leading to program termination.
"""
[docs]
def __init__(self, *args, **kwargs) -> None:
"""
Initializes the ThreadedRichHandler.
Sets up the log queue and starts the background worker thread.
"""
super().__init__(*args, **kwargs)
self.running: bool = True
self.log_queue: queue.Queue[logging.LogRecord] = queue.Queue() # Maxsize can be set if needed
self.worker_thread: threading.Thread = threading.Thread(target=self._process_log_queue,
name="RichLogHandlerThread")
self.worker_thread.daemon = True # Allows main program to exit even if this thread is running
self.worker_thread.start()
self.critical_error_occurred: bool = False
def _handle_single_queued_record(self) -> bool:
"""
Retrieves and emits a single log record from the internal queue.
This method attempts a non-blocking get from the queue with a short timeout.
If a record is found, it's emitted using the parent RichHandler's `emit` method.
Returns
-------
bool
True if a log record was successfully retrieved and processed, False otherwise (e.g., queue was empty).
"""
try:
# Get a record from the queue. Timeout allows the loop to periodically check `self.running`.
record = self.log_queue.get(block=True, timeout=0.05) # block=True with timeout
super().emit(record) # Use the actual emit method of RichHandler (or its parent)
self.log_queue.task_done() # Signal that the retrieved item has been processed
return True
except queue.Empty:
return False # Queue was empty within the timeout
def _process_log_queue(self) -> None:
"""
The main loop for the worker thread.
Continuously retrieves log records from the `log_queue` and processes them
as long as `self.running` is True. After `self.running` is set to False,
it attempts to process any remaining records in the queue before exiting.
"""
while self.running:
self._handle_single_queued_record()
# After self.running is set to False, process any remaining items in the queue
while self._handle_single_queued_record():
pass # Loop until queue is empty or _handle_single_queued_record fails to get an item
[docs]
def emit(self, record: logging.LogRecord) -> None:
"""
Queues a log record for processing by the worker thread.
If the log record's level is ERROR or higher, this method sets a flag
to indicate a critical error and initiates the process to stop the
log processing and exit the application.
Parameters
----------
record : logging.LogRecord
The log record to be emitted.
"""
# Add the record to the queue for the worker thread to handle
self.log_queue.put(record)
# Critical error handling: if an ERROR or CRITICAL message is logged,
# prepare to stop everything.
if record.levelno >= logging.ERROR and not self.critical_error_occurred:
self.critical_error_occurred = True # Set flag to prevent multiple exits if errors flood
# The actual sys.exit(1) should ideally be coordinated at a higher level
# after this handler and other resources have a chance to clean up.
# Forcing an immediate exit here might be too abrupt.
# A cleaner way is to signal the main application to shut down.
# However, if immediate stop is desired:
self.stop_processing() # Attempt to flush queue and stop worker
# Note: sys.exit() from a non-main thread can be problematic.
# It's better to raise a specific exception or signal the main thread.
# For now, keeping original behavior but with a warning.
# Consider using a custom exception that the main thread can catch for graceful shutdown.
# Forcing exit here:
logging.shutdown() # Attempt to flush all standard logging handlers
sys.exit(1) # This is very aggressive.
[docs]
def stop_processing(self) -> None:
"""
Signals the worker thread to stop and waits for it to terminate.
This ensures that the queue is flushed of pending log records before
the thread exits.
"""
if self.running: # Prevent multiple calls or calls if already stopped
self.running = False
if self.worker_thread.is_alive():
self.log_queue.join() # Wait for all items in queue to be processed (task_done called)
self.worker_thread.join(timeout=1.0) # Wait for worker thread to finish, with a timeout
if self.worker_thread.is_alive():
self.console.print("[yellow]Warning: Log worker thread did not terminate gracefully.[/yellow]",
file=sys.stderr)
[docs]
def close(self) -> None:
"""
Closes the handler, ensuring the worker thread is stopped and
resources are released.
"""
self.stop_processing() # Ensure worker is stopped and queue flushed
super().close() # Call close on the parent RichHandler
[docs]
def update_logger_file_suffix(logger: logging.Logger, new_file_suffix: str) -> logging.Logger: # Renamed
"""
Updates file handlers of a logger to use a new file suffix.
Existing log content from the old file (if any) is copied to the new file location
before switching. The old file is then deleted.
This is useful if, for instance, a specific operation (like a query)
should have its logs in a uniquely named file determined mid-execution.
Parameters
----------
logger : logging.Logger
The logger instance whose file handlers need updating.
new_file_suffix : str
The new suffix to be incorporated into the log filenames.
Example: if old was "main_log.log", new_suffix="_query123", new becomes "main_log_query123.log".
Returns
-------
logging.Logger
The same logger instance, now with updated file handlers.
"""
if not new_file_suffix: # If new suffix is empty, no change needed or could be an error.
return logger
handlers_to_add_back = [] # Store new handlers to add after iterating
for handler_idx in range(len(logger.handlers) - 1, -1, -1): # Iterate backwards for safe removal
handler = logger.handlers[handler_idx]
if isinstance(handler, logging.FileHandler):
original_formatter = handler.formatter
original_level = handler.level
# Close current handler before manipulating its file
handler.flush()
handler.close()
old_log_path = Path(handler.baseFilename)
# Construct new filename: insert new_file_suffix before the final extension.
new_stem = f"{old_log_path.stem}{new_file_suffix}"
new_log_path = old_log_path.with_name(f"{new_stem}{old_log_path.suffix}")
if old_log_path.exists() and old_log_path != new_log_path:
try:
# Ensure target directory exists for the new log file
new_log_path.parent.mkdir(parents=True, exist_ok=True)
with open(old_log_path, "r", encoding="utf-8") as src_file, \
open(new_log_path, "w", encoding="utf-8") as dst_file: # 'w' to overwrite/create new
content = src_file.read()
dst_file.write(content)
# Content copied, now remove old handler and prepare new one
except IOError as e:
logger.error(f"Error copying log content from '{old_log_path}' to '{new_log_path}': {e}",
exc_info=True)
# If copy fails, might reconsider removing handler or deleting old file
logger.removeHandler(handler) # Remove the old handler from the logger
# Create and add the new FileHandler
try:
new_file_handler = logging.FileHandler(new_log_path, mode="a",
encoding="utf-8") # Append to new/copied file
new_file_handler.setFormatter(original_formatter)
new_file_handler.setLevel(original_level)
handlers_to_add_back.append(new_file_handler)
# Delete old log file only if successfully switched and content copied
if old_log_path.exists() and old_log_path != new_log_path:
try:
old_log_path.unlink(missing_ok=True) # missing_ok in case it was already removed
except OSError as e:
logger.warning(f"Failed to delete old log file '{old_log_path.name}': {e}", exc_info=True)
except Exception as e:
logger.error(f"Failed to create new FileHandler for '{new_log_path}': {e}", exc_info=True)
# If new handler fails, might want to re-add old one if possible, or log critical error.
# Add all newly created handlers
for new_h in handlers_to_add_back:
logger.addHandler(new_h)
return logger
# Example usage demonstration (typically not in a library file like this)
if __name__ == "__main__":
# Create a dummy log directory for the example
example_log_dir = Path("./example_gratools_logs")
example_log_dir.mkdir(exist_ok=True)
# Configure logger with an initial setup
main_logger = configure_logger(
name="GraToolsApp",
log_dir_path=example_log_dir,
verbosity_level="DEBUG", # Test with DEBUG level
file_suffix="_initial"
)
main_logger.info("This is an informational message in the initial log.")
main_logger.debug("This is a debug message for the initial setup.")
# Simulate a scenario where a new suffix is needed
query_specific_suffix = "_query_XYZ"
main_logger.info(f"Updating logger for query-specific suffix: '{query_specific_suffix}'")
main_logger = update_logger_file_suffix(main_logger, query_specific_suffix)
main_logger.warning("This is a warning message in the query-specific log.")
main_logger.info("Logging continues to the new file with the updated suffix.")
# Example of an error that would trigger sys.exit in ThreadedRichHandler
# To test this, you might need to run this script and observe its exit code.
# Be cautious as this will terminate the script.
try:
main_logger.error("This is a test error that should cause the program to exit via ThreadedRichHandler.")
main_logger.info("This line might not be reached if the error above exits.")
except SystemExit as e:
print(f"Program exited with code: {e.code} (as expected after error log via ThreadedRichHandler).")
print(f"Log files should be in: {example_log_dir.resolve()}")