processors.py 5.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155
  1. # sqlalchemy/processors.py
  2. # Copyright (C) 2010-2017 the SQLAlchemy authors and contributors
  3. # <see AUTHORS file>
  4. # Copyright (C) 2010 Gaetan de Menten gdementen@gmail.com
  5. #
  6. # This module is part of SQLAlchemy and is released under
  7. # the MIT License: http://www.opensource.org/licenses/mit-license.php
  8. """defines generic type conversion functions, as used in bind and result
  9. processors.
  10. They all share one common characteristic: None is passed through unchanged.
  11. """
  12. import codecs
  13. import re
  14. import datetime
  15. from . import util
  16. def str_to_datetime_processor_factory(regexp, type_):
  17. rmatch = regexp.match
  18. # Even on python2.6 datetime.strptime is both slower than this code
  19. # and it does not support microseconds.
  20. has_named_groups = bool(regexp.groupindex)
  21. def process(value):
  22. if value is None:
  23. return None
  24. else:
  25. try:
  26. m = rmatch(value)
  27. except TypeError:
  28. raise ValueError("Couldn't parse %s string '%r' "
  29. "- value is not a string." %
  30. (type_.__name__, value))
  31. if m is None:
  32. raise ValueError("Couldn't parse %s string: "
  33. "'%s'" % (type_.__name__, value))
  34. if has_named_groups:
  35. groups = m.groupdict(0)
  36. return type_(**dict(list(zip(
  37. iter(groups.keys()),
  38. list(map(int, iter(groups.values())))
  39. ))))
  40. else:
  41. return type_(*list(map(int, m.groups(0))))
  42. return process
  43. def boolean_to_int(value):
  44. if value is None:
  45. return None
  46. else:
  47. return int(bool(value))
  48. def py_fallback():
  49. def to_unicode_processor_factory(encoding, errors=None):
  50. decoder = codecs.getdecoder(encoding)
  51. def process(value):
  52. if value is None:
  53. return None
  54. else:
  55. # decoder returns a tuple: (value, len). Simply dropping the
  56. # len part is safe: it is done that way in the normal
  57. # 'xx'.decode(encoding) code path.
  58. return decoder(value, errors)[0]
  59. return process
  60. def to_conditional_unicode_processor_factory(encoding, errors=None):
  61. decoder = codecs.getdecoder(encoding)
  62. def process(value):
  63. if value is None:
  64. return None
  65. elif isinstance(value, util.text_type):
  66. return value
  67. else:
  68. # decoder returns a tuple: (value, len). Simply dropping the
  69. # len part is safe: it is done that way in the normal
  70. # 'xx'.decode(encoding) code path.
  71. return decoder(value, errors)[0]
  72. return process
  73. def to_decimal_processor_factory(target_class, scale):
  74. fstring = "%%.%df" % scale
  75. def process(value):
  76. if value is None:
  77. return None
  78. else:
  79. return target_class(fstring % value)
  80. return process
  81. def to_float(value):
  82. if value is None:
  83. return None
  84. else:
  85. return float(value)
  86. def to_str(value):
  87. if value is None:
  88. return None
  89. else:
  90. return str(value)
  91. def int_to_boolean(value):
  92. if value is None:
  93. return None
  94. else:
  95. return bool(value)
  96. DATETIME_RE = re.compile(
  97. r"(\d+)-(\d+)-(\d+) (\d+):(\d+):(\d+)(?:\.(\d+))?")
  98. TIME_RE = re.compile(r"(\d+):(\d+):(\d+)(?:\.(\d+))?")
  99. DATE_RE = re.compile(r"(\d+)-(\d+)-(\d+)")
  100. str_to_datetime = str_to_datetime_processor_factory(DATETIME_RE,
  101. datetime.datetime)
  102. str_to_time = str_to_datetime_processor_factory(TIME_RE, datetime.time)
  103. str_to_date = str_to_datetime_processor_factory(DATE_RE, datetime.date)
  104. return locals()
  105. try:
  106. from sqlalchemy.cprocessors import UnicodeResultProcessor, \
  107. DecimalResultProcessor, \
  108. to_float, to_str, int_to_boolean, \
  109. str_to_datetime, str_to_time, \
  110. str_to_date
  111. def to_unicode_processor_factory(encoding, errors=None):
  112. if errors is not None:
  113. return UnicodeResultProcessor(encoding, errors).process
  114. else:
  115. return UnicodeResultProcessor(encoding).process
  116. def to_conditional_unicode_processor_factory(encoding, errors=None):
  117. if errors is not None:
  118. return UnicodeResultProcessor(encoding, errors).conditional_process
  119. else:
  120. return UnicodeResultProcessor(encoding).conditional_process
  121. def to_decimal_processor_factory(target_class, scale):
  122. # Note that the scale argument is not taken into account for integer
  123. # values in the C implementation while it is in the Python one.
  124. # For example, the Python implementation might return
  125. # Decimal('5.00000') whereas the C implementation will
  126. # return Decimal('5'). These are equivalent of course.
  127. return DecimalResultProcessor(target_class, "%%.%df" % scale).process
  128. except ImportError:
  129. globals().update(py_fallback())