from flask import Flask, render_template, request, redirect import pymysql import random import string from flask_bootstrap import Bootstrap app = Flask(__name__) Bootstrap(app) # 配置 MySQL 数据库连接 app.config['MYSQL_DATABASE_USER'] = 'root' app.config['MYSQL_DATABASE_PASSWORD'] = 'guo784512' app.config['MYSQL_DATABASE_DB'] = 'short_url' app.config['MYSQL_DATABASE_HOST'] = 'localhost' app.config['MYSQL_DATABASE_PORT'] = 3306 # 初始化 MySQL 连接 db = pymysql.connect( user=app.config['MYSQL_DATABASE_USER'], password=app.config['MYSQL_DATABASE_PASSWORD'], database=app.config['MYSQL_DATABASE_DB'], host=app.config['MYSQL_DATABASE_HOST'], port=app.config['MYSQL_DATABASE_PORT'] ) def is_short_url_exists(short_url): with db.cursor() as cursor: cursor.execute("SELECT COUNT(*) FROM urls WHERE short_url = %s", (short_url,)) result = cursor.fetchone() return result[0] > 0 """旧""" # def generate_unique_short_url(): # current_length = 3 # while True: # short_url = ''.join(random.choices(string.digits + string.ascii_letters, k=current_length)) # if not is_short_url_exists(short_url): # yield short_url # 使用 yield 生成迭代器 # current_length = 4 # 在使用完当前长度后,设置下一次长度为4 def generate_unique_short_url(): current_length = 3 while True: short_url = ''.join(random.choices(string.digits + string.ascii_letters, k=current_length)) if not is_short_url_exists(short_url): yield short_url else: current_length += 1 # 只有在使用完所有 3 位短链接后才递增长度 def check_and_reconnect_db(db_connection): try: db_connection.ping(reconnect=True) except Exception as e: print(f"Error: {e}") db_connection = pymysql.connect( user=app.config['MYSQL_DATABASE_USER'], password=app.config['MYSQL_DATABASE_PASSWORD'], database=app.config['MYSQL_DATABASE_DB'], host=app.config['MYSQL_DATABASE_HOST'], port=app.config['MYSQL_DATABASE_PORT'] ) short_url_generator = generate_unique_short_url() @app.route('/') def index(): return render_template('index.html') @app.route('/shorten', methods=['POST']) def shorten(): original_url = request.form['url'] # 检查原始链接是否已经有对应的短链接 with db.cursor() as cursor: cursor.execute("SELECT short_url FROM urls WHERE original_url = %s", (original_url,)) result = cursor.fetchone() if result: existing_short_url = result[0] return render_template('result.html', original_url=original_url, short_url=existing_short_url) # 如果没有,生成新的短链接 short_url = next(short_url_generator) # 将原始URL和短网址保存到数据库 with db.cursor() as cursor: cursor.execute("INSERT INTO urls (original_url, short_url) VALUES (%s, %s)", (original_url, short_url)) db.commit() check_and_reconnect_db(db) # 在每次操作后检查并重新连接数据库 return render_template('result.html', original_url=original_url, short_url=short_url) @app.route('/') def redirect_to_original(short_url): check_and_reconnect_db(db) # 在每次操作后检查并重新连接数据库 # 从数据库中获取原始URL并重定向 with db.cursor() as cursor: cursor.execute("SELECT original_url FROM urls WHERE short_url = %s", (short_url,)) result = cursor.fetchone() if result: original_url = result[0] return redirect(original_url) else: return "Short URL not found." if __name__ == '__main__': # 创建数据库表 with app.app_context(): check_and_reconnect_db(db) # 在每次操作后检查并重新连接数据库 with db.cursor() as cursor: cursor.execute("CREATE TABLE IF NOT EXISTS urls (id INT AUTO_INCREMENT PRIMARY KEY, original_url VARCHAR(255), short_url VARCHAR(255), UNIQUE KEY short_url_unique (short_url))") db.commit() app.run(debug=True, port=8082) # 将端口号改为 8082