utils.py 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345
  1. # -*- coding: utf-8 -*-
  2. '''
  3. flask_login.utils
  4. -----------------
  5. General utilities.
  6. '''
  7. import hmac
  8. from hashlib import sha512
  9. from functools import wraps
  10. from werkzeug.local import LocalProxy
  11. from werkzeug.security import safe_str_cmp
  12. from werkzeug.urls import url_decode, url_encode
  13. from flask import (_request_ctx_stack, current_app, request, session, url_for,
  14. has_request_context)
  15. from ._compat import text_type, urlparse, urlunparse
  16. from .config import COOKIE_NAME, EXEMPT_METHODS
  17. from .signals import user_logged_in, user_logged_out, user_login_confirmed
  18. #: A proxy for the current user. If no user is logged in, this will be an
  19. #: anonymous user
  20. current_user = LocalProxy(lambda: _get_user())
  21. def encode_cookie(payload):
  22. '''
  23. This will encode a ``unicode`` value into a cookie, and sign that cookie
  24. with the app's secret key.
  25. :param payload: The value to encode, as `unicode`.
  26. :type payload: unicode
  27. '''
  28. return u'{0}|{1}'.format(payload, _cookie_digest(payload))
  29. def decode_cookie(cookie):
  30. '''
  31. This decodes a cookie given by `encode_cookie`. If verification of the
  32. cookie fails, ``None`` will be implicitly returned.
  33. :param cookie: An encoded cookie.
  34. :type cookie: str
  35. '''
  36. try:
  37. payload, digest = cookie.rsplit(u'|', 1)
  38. if hasattr(digest, 'decode'):
  39. digest = digest.decode('ascii') # pragma: no cover
  40. except ValueError:
  41. return
  42. if safe_str_cmp(_cookie_digest(payload), digest):
  43. return payload
  44. def make_next_param(login_url, current_url):
  45. '''
  46. Reduces the scheme and host from a given URL so it can be passed to
  47. the given `login` URL more efficiently.
  48. :param login_url: The login URL being redirected to.
  49. :type login_url: str
  50. :param current_url: The URL to reduce.
  51. :type current_url: str
  52. '''
  53. l = urlparse(login_url)
  54. c = urlparse(current_url)
  55. if (not l.scheme or l.scheme == c.scheme) and \
  56. (not l.netloc or l.netloc == c.netloc):
  57. return urlunparse(('', '', c.path, c.params, c.query, ''))
  58. return current_url
  59. def login_url(login_view, next_url=None, next_field='next'):
  60. '''
  61. Creates a URL for redirecting to a login page. If only `login_view` is
  62. provided, this will just return the URL for it. If `next_url` is provided,
  63. however, this will append a ``next=URL`` parameter to the query string
  64. so that the login view can redirect back to that URL.
  65. :param login_view: The name of the login view. (Alternately, the actual
  66. URL to the login view.)
  67. :type login_view: str
  68. :param next_url: The URL to give the login view for redirection.
  69. :type next_url: str
  70. :param next_field: What field to store the next URL in. (It defaults to
  71. ``next``.)
  72. :type next_field: str
  73. '''
  74. if login_view.startswith(('https://', 'http://', '/')):
  75. base = login_view
  76. else:
  77. base = url_for(login_view)
  78. if next_url is None:
  79. return base
  80. parts = list(urlparse(base))
  81. md = url_decode(parts[4])
  82. md[next_field] = make_next_param(base, next_url)
  83. parts[4] = url_encode(md, sort=True)
  84. return urlunparse(parts)
  85. def login_fresh():
  86. '''
  87. This returns ``True`` if the current login is fresh.
  88. '''
  89. return session.get('_fresh', False)
  90. def login_user(user, remember=False, force=False, fresh=True):
  91. '''
  92. Logs a user in. You should pass the actual user object to this. If the
  93. user's `is_active` property is ``False``, they will not be logged in
  94. unless `force` is ``True``.
  95. This will return ``True`` if the log in attempt succeeds, and ``False`` if
  96. it fails (i.e. because the user is inactive).
  97. :param user: The user object to log in.
  98. :type user: object
  99. :param remember: Whether to remember the user after their session expires.
  100. Defaults to ``False``.
  101. :type remember: bool
  102. :param force: If the user is inactive, setting this to ``True`` will log
  103. them in regardless. Defaults to ``False``.
  104. :type force: bool
  105. :param fresh: setting this to ``False`` will log in the user with a session
  106. marked as not "fresh". Defaults to ``True``.
  107. :type fresh: bool
  108. '''
  109. if not force and not user.is_active:
  110. return False
  111. user_id = getattr(user, current_app.login_manager.id_attribute)()
  112. session['user_id'] = user_id
  113. session['_fresh'] = fresh
  114. session['_id'] = _create_identifier()
  115. if remember:
  116. session['remember'] = 'set'
  117. _request_ctx_stack.top.user = user
  118. user_logged_in.send(current_app._get_current_object(), user=_get_user())
  119. return True
  120. def logout_user():
  121. '''
  122. Logs a user out. (You do not need to pass the actual user.) This will
  123. also clean up the remember me cookie if it exists.
  124. '''
  125. user = _get_user()
  126. if 'user_id' in session:
  127. session.pop('user_id')
  128. if '_fresh' in session:
  129. session.pop('_fresh')
  130. cookie_name = current_app.config.get('REMEMBER_COOKIE_NAME', COOKIE_NAME)
  131. if cookie_name in request.cookies:
  132. session['remember'] = 'clear'
  133. user_logged_out.send(current_app._get_current_object(), user=user)
  134. current_app.login_manager.reload_user()
  135. return True
  136. def confirm_login():
  137. '''
  138. This sets the current session as fresh. Sessions become stale when they
  139. are reloaded from a cookie.
  140. '''
  141. session['_fresh'] = True
  142. session['_id'] = _create_identifier()
  143. user_login_confirmed.send(current_app._get_current_object())
  144. def login_required(func):
  145. '''
  146. If you decorate a view with this, it will ensure that the current user is
  147. logged in and authenticated before calling the actual view. (If they are
  148. not, it calls the :attr:`LoginManager.unauthorized` callback.) For
  149. example::
  150. @app.route('/post')
  151. @login_required
  152. def post():
  153. pass
  154. If there are only certain times you need to require that your user is
  155. logged in, you can do so with::
  156. if not current_user.is_authenticated:
  157. return current_app.login_manager.unauthorized()
  158. ...which is essentially the code that this function adds to your views.
  159. It can be convenient to globally turn off authentication when unit testing.
  160. To enable this, if the application configuration variable `LOGIN_DISABLED`
  161. is set to `True`, this decorator will be ignored.
  162. .. Note ::
  163. Per `W3 guidelines for CORS preflight requests
  164. <http://www.w3.org/TR/cors/#cross-origin-request-with-preflight-0>`_,
  165. HTTP ``OPTIONS`` requests are exempt from login checks.
  166. :param func: The view function to decorate.
  167. :type func: function
  168. '''
  169. @wraps(func)
  170. def decorated_view(*args, **kwargs):
  171. if request.method in EXEMPT_METHODS:
  172. return func(*args, **kwargs)
  173. elif current_app.login_manager._login_disabled:
  174. return func(*args, **kwargs)
  175. elif not current_user.is_authenticated:
  176. return current_app.login_manager.unauthorized()
  177. return func(*args, **kwargs)
  178. return decorated_view
  179. def fresh_login_required(func):
  180. '''
  181. If you decorate a view with this, it will ensure that the current user's
  182. login is fresh - i.e. their session was not restored from a 'remember me'
  183. cookie. Sensitive operations, like changing a password or e-mail, should
  184. be protected with this, to impede the efforts of cookie thieves.
  185. If the user is not authenticated, :meth:`LoginManager.unauthorized` is
  186. called as normal. If they are authenticated, but their session is not
  187. fresh, it will call :meth:`LoginManager.needs_refresh` instead. (In that
  188. case, you will need to provide a :attr:`LoginManager.refresh_view`.)
  189. Behaves identically to the :func:`login_required` decorator with respect
  190. to configutation variables.
  191. .. Note ::
  192. Per `W3 guidelines for CORS preflight requests
  193. <http://www.w3.org/TR/cors/#cross-origin-request-with-preflight-0>`_,
  194. HTTP ``OPTIONS`` requests are exempt from login checks.
  195. :param func: The view function to decorate.
  196. :type func: function
  197. '''
  198. @wraps(func)
  199. def decorated_view(*args, **kwargs):
  200. if request.method in EXEMPT_METHODS:
  201. return func(*args, **kwargs)
  202. elif current_app.login_manager._login_disabled:
  203. return func(*args, **kwargs)
  204. elif not current_user.is_authenticated:
  205. return current_app.login_manager.unauthorized()
  206. elif not login_fresh():
  207. return current_app.login_manager.needs_refresh()
  208. return func(*args, **kwargs)
  209. return decorated_view
  210. def set_login_view(login_view, blueprint=None):
  211. '''
  212. Sets the login view for the app or blueprint. If a blueprint is passed,
  213. the login view is set for this blueprint on ``blueprint_login_views``.
  214. :param login_view: The user object to log in.
  215. :type login_view: str
  216. :param blueprint: The blueprint which this login view should be set on.
  217. Defaults to ``None``.
  218. :type blueprint: object
  219. '''
  220. num_login_views = len(current_app.login_manager.blueprint_login_views)
  221. if blueprint is not None or num_login_views != 0:
  222. (current_app.login_manager
  223. .blueprint_login_views[blueprint.name]) = login_view
  224. if (current_app.login_manager.login_view is not None and
  225. None not in current_app.login_manager.blueprint_login_views):
  226. (current_app.login_manager
  227. .blueprint_login_views[None]) = (current_app.login_manager
  228. .login_view)
  229. current_app.login_manager.login_view = None
  230. else:
  231. current_app.login_manager.login_view = login_view
  232. def _get_user():
  233. if has_request_context() and not hasattr(_request_ctx_stack.top, 'user'):
  234. current_app.login_manager._load_user()
  235. return getattr(_request_ctx_stack.top, 'user', None)
  236. def _cookie_digest(payload, key=None):
  237. key = _secret_key(key)
  238. return hmac.new(key, payload.encode('utf-8'), sha512).hexdigest()
  239. def _get_remote_addr():
  240. address = request.headers.get('X-Forwarded-For', request.remote_addr)
  241. if address is not None:
  242. # An 'X-Forwarded-For' header includes a comma separated list of the
  243. # addresses, the first address being the actual remote address.
  244. address = address.encode('utf-8').split(b',')[0].strip()
  245. return address
  246. def _create_identifier():
  247. user_agent = request.headers.get('User-Agent')
  248. if user_agent is not None:
  249. user_agent = user_agent.encode('utf-8')
  250. base = '{0}|{1}'.format(_get_remote_addr(), user_agent)
  251. if str is bytes:
  252. base = text_type(base, 'utf-8', errors='replace') # pragma: no cover
  253. h = sha512()
  254. h.update(base.encode('utf8'))
  255. return h.hexdigest()
  256. def _user_context_processor():
  257. return dict(current_user=_get_user())
  258. def _secret_key(key=None):
  259. if key is None:
  260. key = current_app.config['SECRET_KEY']
  261. if isinstance(key, text_type): # pragma: no cover
  262. key = key.encode('latin1') # ensure bytes
  263. return key