# app.py
import os
import random
import string

import pymysql
from flask import Flask, render_template, request, redirect
from flask_bootstrap import Bootstrap
  6. app = Flask(__name__)
  7. Bootstrap(app)
  8. # 配置 MySQL 数据库连接
  9. app.config['MYSQL_DATABASE_USER'] = 'root'
  10. app.config['MYSQL_DATABASE_PASSWORD'] = 'guo784512'
  11. app.config['MYSQL_DATABASE_DB'] = 'short_url'
  12. app.config['MYSQL_DATABASE_HOST'] = 'localhost'
  13. app.config['MYSQL_DATABASE_PORT'] = 3306
  14. # 初始化 MySQL 连接
  15. db = pymysql.connect(
  16. user=app.config['MYSQL_DATABASE_USER'],
  17. password=app.config['MYSQL_DATABASE_PASSWORD'],
  18. database=app.config['MYSQL_DATABASE_DB'],
  19. host=app.config['MYSQL_DATABASE_HOST'],
  20. port=app.config['MYSQL_DATABASE_PORT']
  21. )
  22. def is_short_url_exists(short_url):
  23. with db.cursor() as cursor:
  24. cursor.execute("SELECT COUNT(*) FROM urls WHERE short_url = %s", (short_url,))
  25. result = cursor.fetchone()
  26. return result[0] > 0
# Old implementation, kept for reference:
# def generate_unique_short_url():
#     current_length = 3
#     while True:
#         short_url = ''.join(random.choices(string.digits + string.ascii_letters, k=current_length))
#         if not is_short_url_exists(short_url):
#             yield short_url  # use yield to produce an iterator
#             current_length = 4  # once the current length has been used, set the next length to 4
  35. def generate_unique_short_url():
  36. current_length = 3
  37. while True:
  38. short_url = ''.join(random.choices(string.digits + string.ascii_letters, k=current_length))
  39. if not is_short_url_exists(short_url):
  40. yield short_url
  41. else:
  42. current_length += 1 # 只有在使用完所有 3 位短链接后才递增长度
  43. def check_and_reconnect_db(db_connection):
  44. try:
  45. db_connection.ping(reconnect=True)
  46. except Exception as e:
  47. print(f"Error: {e}")
  48. db_connection = pymysql.connect(
  49. user=app.config['MYSQL_DATABASE_USER'],
  50. password=app.config['MYSQL_DATABASE_PASSWORD'],
  51. database=app.config['MYSQL_DATABASE_DB'],
  52. host=app.config['MYSQL_DATABASE_HOST'],
  53. port=app.config['MYSQL_DATABASE_PORT']
  54. )
# Shared generator instance; shorten() pulls the next unused code from it.
short_url_generator = generate_unique_short_url()
  56. @app.route('/')
  57. def index():
  58. return render_template('index.html')
  59. @app.route('/shorten', methods=['POST'])
  60. def shorten():
  61. original_url = request.form['url']
  62. # 检查原始链接是否已经有对应的短链接
  63. with db.cursor() as cursor:
  64. cursor.execute("SELECT short_url FROM urls WHERE original_url = %s", (original_url,))
  65. result = cursor.fetchone()
  66. if result:
  67. existing_short_url = result[0]
  68. return render_template('result.html', original_url=original_url, short_url=existing_short_url)
  69. # 如果没有,生成新的短链接
  70. short_url = next(short_url_generator)
  71. # 将原始URL和短网址保存到数据库
  72. with db.cursor() as cursor:
  73. cursor.execute("INSERT INTO urls (original_url, short_url) VALUES (%s, %s)", (original_url, short_url))
  74. db.commit()
  75. check_and_reconnect_db(db) # 在每次操作后检查并重新连接数据库
  76. return render_template('result.html', original_url=original_url, short_url=short_url)
  77. @app.route('/<short_url>')
  78. def redirect_to_original(short_url):
  79. check_and_reconnect_db(db) # 在每次操作后检查并重新连接数据库
  80. # 从数据库中获取原始URL并重定向
  81. with db.cursor() as cursor:
  82. cursor.execute("SELECT original_url FROM urls WHERE short_url = %s", (short_url,))
  83. result = cursor.fetchone()
  84. if result:
  85. original_url = result[0]
  86. return redirect(original_url)
  87. else:
  88. return "Short URL not found."
  89. if __name__ == '__main__':
  90. # 创建数据库表
  91. with app.app_context():
  92. check_and_reconnect_db(db) # 在每次操作后检查并重新连接数据库
  93. with db.cursor() as cursor:
  94. cursor.execute("CREATE TABLE IF NOT EXISTS urls (id INT AUTO_INCREMENT PRIMARY KEY, original_url VARCHAR(255), short_url VARCHAR(255), UNIQUE KEY short_url_unique (short_url))")
  95. db.commit()
  96. app.run(debug=True, port=8082) # 将端口号改为 8082