secure_command.py 4.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145
  1. #!/usr/bin/env python3
  2. '''
  3. perform a secure parameter change on an ArduRemoteID node via DroneCAN
  4. user must supply a private key corresponding to one of the public keys on the node
  5. '''
  6. import dronecan, time, sys, random, base64, struct
  7. from dronecan import uavcan
  8. try:
  9. import monocypher
  10. except ImportError:
  11. print("Please install monocypher with: python3 -m pip install pymonocypher")
  12. sys.exit(1)
  13. # get command line arguments
  14. from argparse import ArgumentParser
  15. parser = ArgumentParser(description='secure_command')
  16. parser.add_argument("--bitrate", default=1000000, type=int, help="CAN bit rate")
  17. parser.add_argument("--node-id", default=100, type=int, help="local CAN node ID")
  18. parser.add_argument("--target-node", default=None, type=int, help="target node ID")
  19. parser.add_argument("--private-key", default=None, type=str, help="private key file")
  20. parser.add_argument("uri", default=None, type=str, help="CAN URI")
  21. parser.add_argument("paramop", default=None, type=str, help="parameter operation")
  22. args = parser.parse_args()
  23. should_exit = False
  24. if args.target_node is None:
  25. print("Must specify target node ID")
  26. should_exit = True
  27. if args.private_key is None:
  28. print("Must specify private key file")
  29. should_exit = True
  30. if should_exit:
  31. sys.exit(1)
  32. SECURE_COMMAND_GET_REMOTEID_SESSION_KEY = dronecan.dronecan.remoteid.SecureCommand.Request().SECURE_COMMAND_GET_REMOTEID_SESSION_KEY
  33. SECURE_COMMAND_SET_REMOTEID_CONFIG = dronecan.dronecan.remoteid.SecureCommand.Request().SECURE_COMMAND_SET_REMOTEID_CONFIG
  34. session_key = None
  35. sequence = random.randint(0, 0xFFFFFFFF)
  36. last_session_key_req = 0
  37. last_set_config = 0
  38. # Initializing a DroneCAN node instance.
  39. node = dronecan.make_node(args.uri, node_id=args.node_id, bitrate=args.bitrate)
  40. # Initializing a node monitor
  41. node_monitor = dronecan.app.node_monitor.NodeMonitor(node)
  42. def get_session_key_response(reply):
  43. if not reply:
  44. # timed out
  45. return
  46. global session_key
  47. session_key = bytearray(reply.response.data)
  48. print("Got session key")
  49. def get_private_key():
  50. '''get private key, return 32 byte key or None'''
  51. if args.private_key is None:
  52. return None
  53. try:
  54. d = open(args.private_key,'r').read()
  55. except Exception as ex:
  56. return None
  57. ktype = "PRIVATE_KEYV1:"
  58. if not d.startswith(ktype):
  59. return None
  60. return base64.b64decode(d[len(ktype):])
  61. def make_signature(seq, command, data):
  62. '''make a signature'''
  63. private_key = get_private_key()
  64. d = struct.pack("<II", seq, command)
  65. d += data
  66. if command != SECURE_COMMAND_GET_REMOTEID_SESSION_KEY:
  67. if session_key is None:
  68. print("No session key")
  69. raise Exception("No session key")
  70. d += session_key
  71. return monocypher.signature_sign(private_key, d)
  72. def request_session_key():
  73. '''request a session key'''
  74. global sequence
  75. sig = make_signature(sequence, SECURE_COMMAND_GET_REMOTEID_SESSION_KEY, bytes())
  76. node.request(dronecan.dronecan.remoteid.SecureCommand.Request(
  77. sequence=sequence,
  78. operation=SECURE_COMMAND_GET_REMOTEID_SESSION_KEY,
  79. sig_length=len(sig),
  80. data=sig),
  81. args.target_node,
  82. get_session_key_response)
  83. sequence = (sequence+1) % (1<<32)
  84. print("Requested session key")
  85. def config_change_response(reply):
  86. if not reply:
  87. # timed out
  88. return
  89. result_map = {
  90. 0: "ACCEPTED",
  91. 1: "TEMPORARILY_REJECTED",
  92. 2: "DENIED",
  93. 3: "UNSUPPORTED",
  94. 4: "FAILED" }
  95. result = result_map.get(reply.response.result, "invalid")
  96. print("Got change response: %s" % result)
  97. sys.exit(reply.response.result)
  98. def send_config_change():
  99. '''send remoteid config change'''
  100. global sequence
  101. req = args.paramop.encode('utf-8')
  102. sig = make_signature(sequence, SECURE_COMMAND_SET_REMOTEID_CONFIG, req)
  103. node.request(dronecan.dronecan.remoteid.SecureCommand.Request(
  104. sequence=sequence,
  105. operation=SECURE_COMMAND_SET_REMOTEID_CONFIG,
  106. sig_length=len(sig),
  107. data=req+sig),
  108. args.target_node,
  109. config_change_response)
  110. sequence = (sequence+1) % (1<<32)
  111. print("Requested config change")
  112. def update():
  113. now = time.time()
  114. global last_session_key_req, last_set_config, session_key
  115. if session_key is None and now - last_session_key_req > 2.0:
  116. last_session_key_req = now
  117. request_session_key()
  118. if session_key is not None and now - last_set_config > 2.0:
  119. last_set_config = now
  120. send_config_change()
  121. while True:
  122. try:
  123. update()
  124. node.spin(timeout=0.1)
  125. except Exception as ex:
  126. print(ex)