import csv
import logging
import os
import shutil
import sys
import time
import webbrowser
from concurrent.futures import ThreadPoolExecutor, as_completed

import requests
import yaml
from colorama import init
from rich.console import Console
from rich.progress import Progress, BarColumn, TextColumn, TimeRemainingColumn, SpinnerColumn
from rich.traceback import install
# Initialize modules
install()
console = Console()
init(autoreset=True)

# Logging setup
LOG_FILE = "Waylink.log"
logging.basicConfig(
    filename=LOG_FILE,
    level=logging.DEBUG,
    format="%(asctime)s - %(levelname)s - %(message)s",
    datefmt="%Y-%m-%d %H:%M:%S"
)
logging.info("Script initialized.")
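# With the format configured above, entries in Waylink.log look like this
# (timestamp illustrative):
#
#   2025-01-01 12:00:00 - INFO - Script initialized.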

class WaybackMachineArchiver:
    def __init__(self, config_file="config.yaml"):
        """Loads configuration and initializes paths."""
        try:
            with open(config_file, "r") as file:
                config = yaml.safe_load(file)
            if not isinstance(config, dict):
                raise ValueError("Configuration file must be a dictionary.")
        except Exception as e:
            console.print(f"[red]Failed to load configuration: {e}[/red]")
            logging.exception("Config loading error.")
            sys.exit(1)
        self.input_file = config.get("input_file", "urls.txt")
        self.output_dir = config.get("output_dir", "output")
        self.max_workers = max(1, config.get("max_workers", os.cpu_count() or 4))
        self.retries = config.get("retries", 3)
        self.timeout = config.get("timeout", 10)
        self.initial_delay = config.get("initial_delay", 1)
        self.max_delay = config.get("max_delay", 16)
        self.verbose = config.get("verbose", True)
        self.open_output_dir_on_completion = config.get("open_output_dir_on_completion", False)
        self.save_summary = config.get("save_summary", True)
        os.makedirs(self.output_dir, exist_ok=True)
        self.executor = ThreadPoolExecutor(max_workers=self.max_workers)
        self.session = requests.Session()
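    # A sketch of a config.yaml matching the keys read above. Every key is
    # optional; the values shown mirror the in-code defaults (max_workers
    # falls back to the CPU count when omitted):
    #
    #   input_file: urls.txt
    #   output_dir: output
    #   max_workers: 4
    #   retries: 3
    #   timeout: 10
    #   initial_delay: 1
    #   max_delay: 16
    #   verbose: true
    #   open_output_dir_on_completion: false
    #   save_summary: true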
    def shutdown_executors(self):
        self.executor.shutdown(wait=True)
        self.session.close()
        logging.info("Executors and session shut down.")

    def backup_input_file(self):
        try:
            backup_file = self.input_file + ".bak"
            shutil.copyfile(self.input_file, backup_file)
            logging.info(f"Backup created: {backup_file}")
        except Exception as e:
            logging.error(f"Failed to backup input file {self.input_file}: {e}")
    def attempt_url(self, url):
        attempt = 1
        delay = self.initial_delay
        while attempt <= self.retries:
            try:
                # Wayback Machine "Save Page Now" endpoint.
                full_url = f"https://web.archiv.org/save/{url}"
                response = self.session.get(full_url, timeout=self.timeout, allow_redirects=True)
                status = response.status_code
                logging.debug(f"URL: {url} - Response: {status} (Attempt {attempt})")
                if self.verbose:
                    console.print(f"[cyan]URL: {url} - Status: {status} (Attempt {attempt})[/cyan]")
                if status == 200:
                    return {'url': url, 'status': 200, 'error': None}
                elif status == 403:
                    return {'url': url, 'status': 403, 'error': "Excluded"}
                elif status == 429:
                    logging.warning(f"Rate-limited for URL: {url} - Attempt {attempt}")
                else:
                    logging.warning(f"Unexpected status {status} for URL: {url} - Attempt {attempt}")
                # Retry 429s and other unexpected statuses with exponential backoff.
                time.sleep(delay)
                delay = min(delay * 2, self.max_delay)
                attempt += 1
            except requests.exceptions.RequestException as e:
                logging.error(f"Request failed for {url} (Attempt {attempt}): {e}")
                time.sleep(delay)
                delay = min(delay * 2, self.max_delay)
                attempt += 1
        return {'url': url, 'status': None, 'error': "Max retries exceeded"}
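    # With the defaults above (initial_delay=1, max_delay=16), the retry
    # delays grow as 1s -> 2s -> 4s -> 8s -> 16s and then stay capped at 16s;
    # a rate-limited URL with retries=3 therefore waits at most
    # 1 + 2 + 4 = 7 seconds in total before giving up.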

    def process_all_urls(self, urls):
        results = []
        future_to_url = {self.executor.submit(self.attempt_url, url): url for url in urls}
        with Progress(SpinnerColumn(), TextColumn("[progress.description]{task.description}"), BarColumn(), TimeRemainingColumn()) as progress:
            task = progress.add_task("[cyan]Archiving URLs...", total=len(urls))
            for future in as_completed(future_to_url):
                url = future_to_url[future]
                try:
                    result = future.result(timeout=self.timeout * self.retries)
                    results.append(result)
                except Exception as exc:
                    logging.error(f"{url} generated an exception: {exc}")
                    results.append({'url': url, 'status': None, 'error': str(exc)})
                finally:
                    progress.update(task, advance=1)
        return results

    def handle_result(self, result):
        url, status = result['url'], result['status']
        if status == 200:
            console.print(f"[green]Saved: {url}[/green]")
        elif status == 403:
            console.print(f"[yellow]Excluded: {url} (403)[/yellow]")
        else:
            console.print(f"[red]Failed: {url} ({result.get('error')})[/red]")

    def write_summary(self, saved, excluded, failed, execution_time):
        summary_file = os.path.join(self.output_dir, "summary.csv")
        try:
            with open(summary_file, "w", newline="", encoding="utf-8") as f:
                writer = csv.writer(f)
                writer.writerow(["Status", "URL"])
                for url in saved:
                    writer.writerow(["Saved", url])
                for url in excluded:
                    writer.writerow(["Excluded", url])
                for url in failed:
                    writer.writerow(["Failed", url])
                writer.writerow([])
                writer.writerow(["Total time (s)", round(execution_time, 2)])
            logging.info(f"Summary saved to: {summary_file}")
        except Exception as e:
            logging.error(f"Error writing summary: {e}")

    def summarize_results(self, results, execution_time):
        saved = [r['url'] for r in results if r['status'] == 200]
        excluded = [r['url'] for r in results if r['status'] == 403]
        failed = [r['url'] for r in results if r['status'] not in (200, 403)]
        console.print("\n[bold green]Summary:[/bold green]")
        if saved:
            console.print(f"[green]Saved ({len(saved)}):[/green]")
            for url in saved:
                console.print(f"[green]{url}[/green]")
        if excluded:
            console.print(f"\n[yellow]Excluded ({len(excluded)}):[/yellow]")
            for url in excluded:
                console.print(f"[yellow]{url}[/yellow]")
        if failed:
            console.print(f"\n[red]Failed ({len(failed)}):[/red]")
            for url in failed:
                console.print(f"[red]{url}[/red]")
        if self.save_summary:
            self.write_summary(saved, excluded, failed, execution_time)
        if self.open_output_dir_on_completion:
            try:
                # webbrowser needs a file:// URI to open a local directory reliably.
                webbrowser.open("file://" + os.path.abspath(self.output_dir))
            except Exception as e:
                logging.warning(f"Failed to open output directory: {e}")

if __name__ == "__main__":
    archiver = WaybackMachineArchiver(config_file="config.yaml")
    try:
        archiver.backup_input_file()
        with open(archiver.input_file, "r") as file:
            urls = list(dict.fromkeys(line.strip() for line in file if line.strip()))  # Deduplicated, order preserved
        if not urls:
            console.print("[yellow]No URLs found in the input file.[/yellow]")
            sys.exit(0)
        start = time.time()
        results = archiver.process_all_urls(urls)
        duration = time.time() - start
        for result in results:
            archiver.handle_result(result)
        archiver.summarize_results(results, duration)
    except FileNotFoundError:
        console.print(f"[red]Input file not found: {archiver.input_file}[/red]")
    finally:
        archiver.shutdown_executors()
        console.input("\n[cyan]Press [bold cyan]Enter[/bold cyan] to exit...[/cyan]")
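# Example usage (file and script names illustrative): put one URL per line
# in urls.txt,
#
#   https://example.com/
#   https://example.org/page
#
# then run the script from the directory containing config.yaml:
#
#   python waylink.py
#
# Results land in output/summary.csv, with a full debug trail in Waylink.log.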