123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118 |
- from flask import Flask, render_template, request, redirect
- import pymysql
- import random
- import string
- from flask_bootstrap import Bootstrap
- app = Flask(__name__)
- Bootstrap(app)
- # 配置 MySQL 数据库连接
- app.config['MYSQL_DATABASE_USER'] = 'root'
- app.config['MYSQL_DATABASE_PASSWORD'] = 'guo784512'
- app.config['MYSQL_DATABASE_DB'] = 'short_url'
- app.config['MYSQL_DATABASE_HOST'] = 'localhost'
- app.config['MYSQL_DATABASE_PORT'] = 3306
- # 初始化 MySQL 连接
- db = pymysql.connect(
- user=app.config['MYSQL_DATABASE_USER'],
- password=app.config['MYSQL_DATABASE_PASSWORD'],
- database=app.config['MYSQL_DATABASE_DB'],
- host=app.config['MYSQL_DATABASE_HOST'],
- port=app.config['MYSQL_DATABASE_PORT']
- )
- def is_short_url_exists(short_url):
- with db.cursor() as cursor:
- cursor.execute("SELECT COUNT(*) FROM urls WHERE short_url = %s", (short_url,))
- result = cursor.fetchone()
- return result[0] > 0
- """旧"""
- # def generate_unique_short_url():
- # current_length = 3
- # while True:
- # short_url = ''.join(random.choices(string.digits + string.ascii_letters, k=current_length))
- # if not is_short_url_exists(short_url):
- # yield short_url # 使用 yield 生成迭代器
- # current_length = 4 # 在使用完当前长度后,设置下一次长度为4
- def generate_unique_short_url():
- current_length = 3
- while True:
- short_url = ''.join(random.choices(string.digits + string.ascii_letters, k=current_length))
- if not is_short_url_exists(short_url):
- yield short_url
- else:
- current_length += 1 # 只有在使用完所有 3 位短链接后才递增长度
- def check_and_reconnect_db(db_connection):
- try:
- db_connection.ping(reconnect=True)
- except Exception as e:
- print(f"Error: {e}")
- db_connection = pymysql.connect(
- user=app.config['MYSQL_DATABASE_USER'],
- password=app.config['MYSQL_DATABASE_PASSWORD'],
- database=app.config['MYSQL_DATABASE_DB'],
- host=app.config['MYSQL_DATABASE_HOST'],
- port=app.config['MYSQL_DATABASE_PORT']
- )
- short_url_generator = generate_unique_short_url()
- @app.route('/')
- def index():
- return render_template('index.html')
- @app.route('/shorten', methods=['POST'])
- def shorten():
- original_url = request.form['url']
- # 检查原始链接是否已经有对应的短链接
- with db.cursor() as cursor:
- cursor.execute("SELECT short_url FROM urls WHERE original_url = %s", (original_url,))
- result = cursor.fetchone()
- if result:
- existing_short_url = result[0]
- return render_template('result.html', original_url=original_url, short_url=existing_short_url)
- # 如果没有,生成新的短链接
- short_url = next(short_url_generator)
- # 将原始URL和短网址保存到数据库
- with db.cursor() as cursor:
- cursor.execute("INSERT INTO urls (original_url, short_url) VALUES (%s, %s)", (original_url, short_url))
- db.commit()
- check_and_reconnect_db(db) # 在每次操作后检查并重新连接数据库
- return render_template('result.html', original_url=original_url, short_url=short_url)
- @app.route('/<short_url>')
- def redirect_to_original(short_url):
- check_and_reconnect_db(db) # 在每次操作后检查并重新连接数据库
- # 从数据库中获取原始URL并重定向
- with db.cursor() as cursor:
- cursor.execute("SELECT original_url FROM urls WHERE short_url = %s", (short_url,))
- result = cursor.fetchone()
- if result:
- original_url = result[0]
- return redirect(original_url)
- else:
- return "Short URL not found."
- if __name__ == '__main__':
- # 创建数据库表
- with app.app_context():
- check_and_reconnect_db(db) # 在每次操作后检查并重新连接数据库
- with db.cursor() as cursor:
- cursor.execute("CREATE TABLE IF NOT EXISTS urls (id INT AUTO_INCREMENT PRIMARY KEY, original_url VARCHAR(255), short_url VARCHAR(255), UNIQUE KEY short_url_unique (short_url))")
- db.commit()
- app.run(debug=True, port=8082) # 将端口号改为 8082
|