__init__.py 3.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106
  1. """
  2. Create and verify jws-js format Ed25519 signatures.
  3. """
  4. __all__ = [ 'sign', 'verify' ]
  5. import json
  6. from ..util import urlsafe_b64decode, urlsafe_b64encode, native, binary
  7. ed25519ll = None
  8. ALG = "Ed25519"
  9. def get_ed25519ll():
  10. """Lazy import-and-test of ed25519 module"""
  11. global ed25519ll
  12. if not ed25519ll:
  13. try:
  14. import ed25519ll # fast (thousands / s)
  15. except (ImportError, OSError): # pragma nocover
  16. from . import ed25519py as ed25519ll # pure Python (hundreds / s)
  17. test()
  18. return ed25519ll
  19. def sign(payload, keypair):
  20. """Return a JWS-JS format signature given a JSON-serializable payload and
  21. an Ed25519 keypair."""
  22. get_ed25519ll()
  23. #
  24. header = {
  25. "alg": ALG,
  26. "jwk": {
  27. "kty": ALG, # alg -> kty in jwk-08.
  28. "vk": native(urlsafe_b64encode(keypair.vk))
  29. }
  30. }
  31. encoded_header = urlsafe_b64encode(binary(json.dumps(header, sort_keys=True)))
  32. encoded_payload = urlsafe_b64encode(binary(json.dumps(payload, sort_keys=True)))
  33. secured_input = b".".join((encoded_header, encoded_payload))
  34. sig_msg = ed25519ll.crypto_sign(secured_input, keypair.sk)
  35. signature = sig_msg[:ed25519ll.SIGNATUREBYTES]
  36. encoded_signature = urlsafe_b64encode(signature)
  37. return {"recipients":
  38. [{"header":native(encoded_header),
  39. "signature":native(encoded_signature)}],
  40. "payload": native(encoded_payload)}
  41. def assertTrue(condition, message=""):
  42. if not condition:
  43. raise ValueError(message)
  44. def verify(jwsjs):
  45. """Return (decoded headers, payload) if all signatures in jwsjs are
  46. consistent, else raise ValueError.
  47. Caller must decide whether the keys are actually trusted."""
  48. get_ed25519ll()
  49. # XXX forbid duplicate keys in JSON input using object_pairs_hook (2.7+)
  50. recipients = jwsjs["recipients"]
  51. encoded_payload = binary(jwsjs["payload"])
  52. headers = []
  53. for recipient in recipients:
  54. assertTrue(len(recipient) == 2, "Unknown recipient key {0}".format(recipient))
  55. h = binary(recipient["header"])
  56. s = binary(recipient["signature"])
  57. header = json.loads(native(urlsafe_b64decode(h)))
  58. assertTrue(header["alg"] == ALG,
  59. "Unexpected algorithm {0}".format(header["alg"]))
  60. if "alg" in header["jwk"] and not "kty" in header["jwk"]:
  61. header["jwk"]["kty"] = header["jwk"]["alg"] # b/w for JWK < -08
  62. assertTrue(header["jwk"]["kty"] == ALG, # true for Ed25519
  63. "Unexpected key type {0}".format(header["jwk"]["kty"]))
  64. vk = urlsafe_b64decode(binary(header["jwk"]["vk"]))
  65. secured_input = b".".join((h, encoded_payload))
  66. sig = urlsafe_b64decode(s)
  67. sig_msg = sig+secured_input
  68. verified_input = native(ed25519ll.crypto_sign_open(sig_msg, vk))
  69. verified_header, verified_payload = verified_input.split('.')
  70. verified_header = binary(verified_header)
  71. decoded_header = native(urlsafe_b64decode(verified_header))
  72. headers.append(json.loads(decoded_header))
  73. verified_payload = binary(verified_payload)
  74. # only return header, payload that have passed through the crypto library.
  75. payload = json.loads(native(urlsafe_b64decode(verified_payload)))
  76. return headers, payload
  77. def test():
  78. kp = ed25519ll.crypto_sign_keypair()
  79. payload = {'test': 'onstartup'}
  80. jwsjs = json.loads(json.dumps(sign(payload, kp)))
  81. verify(jwsjs)
  82. jwsjs['payload'] += 'x'
  83. try:
  84. verify(jwsjs)
  85. except ValueError:
  86. pass
  87. else: # pragma no cover
  88. raise RuntimeError("No error from bad wheel.signatures payload.")