Source code for starcoin.sdk.utils
# Copyright (c) The Diem Core Contributors
# SPDX-License-Identifier: Apache-2.0
# Copyright (c) The starcoin Core Contributors
"""Utilities for data type converting, construction and hashing."""
from cryptography.hazmat.primitives import serialization
from cryptography.hazmat.primitives.asymmetric import ed25519
import hashlib
import typing
from starcoin import starcoin_types
from starcoin import serde_types
from starcoin import bcs
SUB_ADDRESS_LEN: int = 8
STARCOIN_HASH_PREFIX: bytes = b"STARCOIN::"
CORE_CODE_ADDRESS: str = "0x00000000000000000000000000000001"
ACCOUNT_ADDRESS_LEN: int = 16
RESOURCE_TAG: int = 1
[docs]class InvalidAccountAddressError(Exception):
pass
[docs]class InvalidSubAddressError(Exception):
pass
[docs]class InvalidSignedMessage(Exception):
pass
[docs]def hex_to_tuple(input: str) -> tuple:
if input.startswith("0x"):
input = input[2:]
return tuple(serde_types.uint8(x) for x in bytes.fromhex(input))
[docs]def account_address(addr: typing.Union[starcoin_types.AccountAddress, bytes, str]) -> starcoin_types.AccountAddress:
"""convert an account address from hex-encoded or bytes into `starcoin_types.AccountAddress`
Returns given address if it is `starcoin_types.AccountAddress` already
"""
if isinstance(addr, starcoin_types.AccountAddress):
return addr
try:
if isinstance(addr, str):
return starcoin_types.AccountAddress(hex_to_tuple(addr))
return starcoin_types.AccountAddress(tuple(serde_types.uint8(x) for x in addr))
except ValueError as e:
raise InvalidAccountAddressError(e)
[docs]def account_address_hex(addr: typing.Union[starcoin_types.AccountAddress, str]) -> str:
"""convert `starcoin_types.AccountAddress` into hex-encoded string
This function converts given parameter into account address bytes first, then convert bytes
into hex-encoded string
"""
return account_address_bytes(addr).hex()
[docs]def account_address_bytes(addr: typing.Union[starcoin_types.AccountAddress, str]) -> bytes:
"""convert `starcoin_types.AccountAddress` or hex-encoded account address into bytes"""
if isinstance(addr, str):
return account_address_bytes(account_address(addr))
return bytes(typing.cast(typing.Iterable[int], addr.value))
[docs]def sub_address(addr: typing.Union[str, bytes]) -> bytes:
"""convert hex-encoded sub-address into bytes
This function validates bytes length, and raises `InvalidSubAddressError` if length
does not match sub-address length (8 bytes)
"""
ret = bytes.fromhex(addr) if isinstance(addr, str) else addr
if len(ret) != SUB_ADDRESS_LEN:
raise InvalidSubAddressError(
f"{addr}(len={len(ret)}) is a valid sub-address, sub-address is {SUB_ADDRESS_LEN} bytes"
)
return ret
[docs]def public_key_bytes(public_key: ed25519.Ed25519PublicKey) -> bytes:
"""convert cryptography.hazmat.primitives.asymmetric.ed25519.Ed25519PublicKey into bytes"""
return public_key.public_bytes(encoding=serialization.Encoding.Raw, format=serialization.PublicFormat.Raw)
[docs]def currency_code(code: str) -> starcoin_types.TypeTag:
"""converts currency code string to starcoin_types.TypeTag"""
if isinstance(code, str):
return starcoin_types.TypeTag__Struct(
value=starcoin_types.StructTag(
address=account_address(CORE_CODE_ADDRESS),
module=starcoin_types.Identifier(code),
name=starcoin_types.Identifier(code),
type_params=[],
)
)
raise TypeError(f"unknown currency code type: {code}")
[docs]def currency_user_code(address: str, code: str) -> starcoin_types.TypeTag:
"""converts currency code string to starcoin_types.TypeTag"""
if isinstance(code, str):
return starcoin_types.TypeTag__Struct(
value=starcoin_types.StructTag(
address=account_address(address),
module=starcoin_types.Identifier(code),
name=starcoin_types.Identifier(code),
type_params=[],
)
)
raise TypeError(f"unknown currency code type: {code}")
[docs]def type_tag_to_str(code: starcoin_types.TypeTag) -> str:
"""converts currency code TypeTag into string"""
if isinstance(code, starcoin_types.TypeTag__Struct):
if isinstance(self, TypeTag__Struct):
return self.value.name.value
raise TypeError(f"unknown currency code type: {self}")
raise TypeError(f"unknown currency code type: {code}")
[docs]def create_signed_transaction(
txn: starcoin_types.RawTransaction, public_key: bytes, signature: bytes
) -> starcoin_types.SignedUserTransaction:
"""create single signed `starcoin_types.SignedTransaction`"""
return starcoin_types.SignedUserTransaction(
raw_txn=txn,
authenticator=starcoin_types.TransactionAuthenticator__Ed25519(
public_key=starcoin_types.Ed25519PublicKey(value=public_key),
signature=starcoin_types.Ed25519Signature(value=signature),
),
)
[docs]def raw_transaction_signing_msg(txn: starcoin_types.RawTransaction) -> bytes:
"""create signing message from given `starcoin_types.RawTransaction`"""
return starcoin_hash_seed(b"RawUserTransaction") + txn.bcs_serialize()
[docs]def transaction_hash(txn: starcoin_types.SignedUserTransaction) -> str:
"""create transaction hash from given `starcoin_types.SignedTransaction`
"""
user_txn = starcoin_types.Transaction__UserTransaction(value=txn)
return hash(starcoin_hash_seed(b"Transaction"), user_txn.bcs_serialize()).hex()
[docs]def starcoin_hash_seed(typ: bytes) -> bytes:
return hash(STARCOIN_HASH_PREFIX, typ)
[docs]def hash(b1: bytes, b2: bytes) -> bytes:
hash = hashlib.sha3_256()
hash.update(b1)
hash.update(b2)
return hash.digest()
[docs]def payload_bcs_decode(payload: str) -> typing.Union[starcoin_types.Script, starcoin_types.Package]:
payload = starcoin_types.TransactionPayload.bcs_deserialize(
bytes.fromhex(payload[2:])).value
return payload
[docs]def verify_signed_message(signed_message_hex: str) -> starcoin_types.SignedMessage:
if signed_message_hex.startswith("0x"):
signed_message_hex = signed_message_hex[2:]
signed_message_bytes = bytes.fromhex(signed_message_hex)
try:
signed_message = starcoin_types.SignedMessage.bcs_deserialize(
signed_message_bytes)
public_key = signed_message.authenticator.public_key.value
signature = signed_message.authenticator.signature.value
data = starcoin_hash_seed(b"SigningMessage") + \
signed_message.signing_message.bcs_serialize()
ed = ed25519.Ed25519PublicKey.from_public_bytes(public_key)
ed.verify(signature, data)
except Exception as e:
raise InvalidSignedMessage(e)
return signed_message