import asyncio
import os
import threading
import uuid

from flask import Flask, render_template, request, jsonify, send_from_directory
import aiohttp
import aiofiles
import requests

app = Flask(__name__)

# Dictionary to store the state of each running job
jobs = {}


# --- Core Proxy Checking Logic (adapted from original script) ---
async def check_proxy(session, sem, ip, port, country, org, job_id):
    """Checks a single proxy and updates the job status.

    country/org are parsed from the ip,port,country,org input format for
    compatibility with the original script but are not used by the check itself.
    """
    line = f"{ip}:{port}"  # Simple ip:port format for the output file
    url = f"https://proxyip.biz.id/{ip}:{port}"
    try:
        # Acquire the semaphore so no more than 150 checks run concurrently
        async with sem:
            async with session.get(url, timeout=10) as resp:
                data = await resp.json(content_type=None)
                if data.get("proxyip"):
                    jobs[job_id]["true_proxies"].append(line)
                    return True
    except asyncio.TimeoutError:
        pass  # Timeout is a failed check
    except Exception:
        pass  # Any other exception is a failed check
    finally:
        # Count the proxy as checked only once its request has finished,
        # so the progress counter tracks completed work
        jobs[job_id]["checked"] += 1
    return False


async def run_checker(proxy_list, job_id):
    """The main async function to run the checks."""
    sem = asyncio.Semaphore(150)  # Concurrency limit
    connector = aiohttp.TCPConnector(limit=150)
    async with aiohttp.ClientSession(connector=connector) as session:
        tasks = []
        for line in proxy_list:
            line = line.strip()
            if not line:
                continue
            # Support both ip:port and ip,port,... formats
            if ':' in line:
                parts = line.split(':')
                ip, port = parts[0], parts[1]
                country, org = "N/A", "N/A"
            elif ',' in line:
                parts = line.split(',')
                ip, port = parts[0], parts[1]
                country = parts[2] if len(parts) > 2 else "N/A"
                org = parts[3] if len(parts) > 3 else "N/A"
            else:
                continue  # Skip invalid lines
            tasks.append(check_proxy(session, sem, ip, port, country, org, job_id))
        await asyncio.gather(*tasks)

    # --- Save the results to a file ---
    output_dir = os.path.join('downloads', job_id)
    os.makedirs(output_dir, exist_ok=True)
    output_file = os.path.join(output_dir, 'proxyList.txt')
    async with aiofiles.open(output_file, "w") as f:
        await f.write("\n".join(jobs[job_id]["true_proxies"]))

    jobs[job_id]["status"] = "completed"
    jobs[job_id]["output_file"] = output_file


def start_background_loop(loop, coro):
    """Function to run the asyncio event loop in a background thread."""
    asyncio.set_event_loop(loop)
    loop.run_until_complete(coro)
    loop.close()  # Release the loop's resources once the job finishes


# --- Flask Routes ---
@app.route('/')
def index():
    """Renders the main page."""
    return render_template('index.html')


@app.route('/start-check', methods=['POST'])
def start_check():
    """Starts the proxy checking process."""
    # silent=True tolerates missing/invalid JSON so we return 400 instead of 500
    data = request.get_json(silent=True) or {}
    proxy_url = data.get('url')
    if not proxy_url:
        return jsonify({"error": "URL is required"}), 400

    try:
        response = requests.get(proxy_url, timeout=10)
        response.raise_for_status()
        proxy_list = response.text.strip().splitlines()
    except requests.RequestException as e:
        return jsonify({"error": f"Failed to fetch proxy list: {str(e)}"}), 400

    job_id = str(uuid.uuid4())
    jobs[job_id] = {
        "status": "running",
        "total": len(proxy_list),
        "checked": 0,
        "true_proxies": [],
        "output_file": None
    }

    # Run the asyncio checker in a separate thread
    loop = asyncio.new_event_loop()
    checker_coro = run_checker(proxy_list, job_id)
    thread = threading.Thread(target=start_background_loop, args=(loop, checker_coro))
    thread.start()

    return jsonify({"job_id": job_id})


@app.route('/status/<job_id>')
def get_status(job_id):
    """Provides the status of a running job."""
    job = jobs.get(job_id)
    if not job:
        return jsonify({"error": "Job not found"}), 404
    return jsonify({
        "status": job["status"],
        "checked": job["checked"],
        "total": job["total"],
        "found": len(job["true_proxies"])
    })


@app.route('/download/<job_id>')
def download_file(job_id):
    """Serves the final proxy list for download."""
    job = jobs.get(job_id)
    if not job or job["status"] != "completed":
        return "Job not found or not completed.", 404
    output_file = job.get("output_file")
    if not output_file or not os.path.exists(output_file):
        return "Output file not found.", 404
    directory = os.path.dirname(output_file)
    filename = os.path.basename(output_file)
    # Clean up old jobs after download (optional but good practice)
    # threading.Timer(300, cleanup_job, args=[job_id]).start()
    return send_from_directory(directory, filename, as_attachment=True)


def cleanup_job(job_id):
    """Removes job data and its download file."""
    if job_id in jobs:
        output_file = jobs[job_id].get("output_file")
        if output_file and os.path.exists(output_file):
            try:
                os.remove(output_file)
                os.rmdir(os.path.dirname(output_file))
            except OSError as e:
                print(f"Error cleaning up file for job {job_id}: {e}")
        del jobs[job_id]


if __name__ == '__main__':
    # Create downloads directory if it doesn't exist
    if not os.path.exists('downloads'):
        os.makedirs('downloads')
    app.run(debug=True)
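
# --- Example client usage (a sketch, not part of the app) ---
# Assumes the server is running locally on Flask's default port 5000; the
# proxy-list URL below is a placeholder and must point at a real plain-text list.
#
#   curl -X POST http://127.0.0.1:5000/start-check \
#        -H "Content-Type: application/json" \
#        -d '{"url": "https://example.com/proxies.txt"}'
#   # -> {"job_id": "<uuid>"}
#
#   curl http://127.0.0.1:5000/status/<job_id>
#   # -> {"status": ..., "checked": ..., "total": ..., "found": ...}
#
#   curl -O http://127.0.0.1:5000/download/<job_id>
#   # -> downloads proxyList.txt once status is "completed"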