Source code for tonos_ts4.ts4

"""
    This file is part of Ever OS.

    Ever OS is free software: you can redistribute it and/or modify
    it under the terms of the Apache License 2.0 (http://www.apache.org/licenses/)

    Copyright 2019-2021 TON LABS, 2022 Everscale Guild
"""

import sys
import base64
import secrets
import json
import numbers
import re
import copy
import os.path
from glob import glob

from .util      import *
from .core      import *
from .address   import *
from .abi       import *
from .decoder   import *
from .dump      import *
from .global_functions  import *

from .globals       import core
from .BaseContract  import BaseContract, decode_contract_answer

__version__ = version()

core = load_linker_lib()
globals.set_core(core)

class BaseException(Exception):
    def __init__(self, msg):
        super(Exception, self).__init__(msg)
    def clone(self):
        return ts4.BaseException(str(self))

class UnexpectedExitCodeException(BaseException):
    def __init__(self, expected, real):
        self.expected   = expected
        self.real       = real
        super(BaseException, self).__init__(
            'Unexpected exit code: {}, expected {}'.format(real, expected))
    def clone(self):
        return UnexpectedExitCodeException(self.expected, self.real)

def translate_exception(exception):
    # print('translate_exception:', exception)
    if globals.G_SHOW_FULL_STACKTRACE:
        return exception
    verbose_(exception)
    if isinstance(exception, ts4.BaseException):
        return exception.clone()
    return exception

# TODO: Global decoding params. Add documentation
decoder = Decoder.defaults()

def check_exitcode(expected_ec, real_ec):
    expected_ec = either_or(ts4.globals.G_OVERRIDE_EXPECT_EC, expected_ec)
    assert isinstance(expected_ec, list)
    if real_ec not in expected_ec:
        xtra = None
        if real_ec == 51:   xtra = 'Calling of contract\'s constructor that has already been called.'
        if real_ec == 52:   xtra = 'Replay protection exception.'
        if real_ec == 60:   xtra = 'Inbound message has wrong function id.'
        if real_ec == 76:   xtra = 'Public function was called before constructor.'
        # TODO: add more codes here...

        if xtra is not None:
            xtra = ': ' + xtra
        else:
            xtra = ''
        last_error = globals.core.get_last_error_msg()
        if last_error is not None:
            verbose_('{}{}'.format(last_error, xtra))
        if globals.G_STOP_AT_CRASH:
            raise UnexpectedExitCodeException(expected_ec, real_ec)

def process_actions(result: ExecutionResult, expect_ec = [0]):
    # print('process_actions: expect_ec = {}'.format(expect_ec))
    assert isinstance(expect_ec, list)
    assert isinstance(result, ExecutionResult)
    ec = result.exit_code

    if globals.G_VERBOSE:
        if ec != 0:
            print(grey('    exit_code: ') + yellow(ec) + '\n')

    if ec not in expect_ec:
        verbose_(globals.core.get_last_error_msg())

    check_exitcode(expect_ec, ec)

    if result.error is not None:
        raise ts4.BaseException("Transaction aborted: {}".format(result.error))

    answer = None

    for j in result.actions:
        # print(j)
        msg = Msg(json.loads(j))
        # if globals.G_VERBOSE:
            # print('process msg:', msg)
        # print('process msg:', msg)
        if msg.is_event():
            if globals.G_VERBOSE or globals.G_SHOW_EVENTS:
                # TODO: move this printing code to a separate function and file
                xtra = ''
                params = msg.params
                if msg.is_event('DebugEvent'):
                    xtra = ' ={}'.format(decode_int(params['x']))
                elif msg.is_event('LogEvent'):
                    params['comment'] = bytearray.fromhex(params['comment']).decode()
                print(bright_blue('< event') + grey(': '), end='')
                print(cyan('          '), grey('<-'), bright_cyan(format_addr(msg.src)))
                print(cyan(grey('    name:   ') + cyan('{}'.format(bright_cyan(msg.event)))))
                print(grey('    params: ') + cyan(Params.stringify(params)), cyan(xtra), '\n')
            globals.EVENTS.append(msg)
        else:
            # not event
            if msg.is_unknown():
                #print(msg)
                if globals.G_VERBOSE:
                    print(yellow('WARNING! Unknown message!'))
            elif msg.is_bounced():
                pass
            elif msg.is_answer():
                # We expect only one answer
                assert answer is None
                answer = msg
                continue
            else:
                assert msg.is_call() or msg.is_empty(), red('Unexpected type: {}'.format(msg.type))
            globals.QUEUE.append(msg)
    return (result.gas_used, answer)

[docs]def dispatch_messages(callback = None, limit = None, expect_ec = [0]): """Dispatches all messages in the queue one by one until the queue becomes empty. :param callback: Callback to be called for each processed message. If callback returns False then the given message is skipped. :param num limit: Limit the number of processed messages by a given value. :param num expect_ec: List of expected exit codes :return: False if queue was empty, True otherwise :rtype: bool """ count = 0 while len(globals.QUEUE) > 0: count = count + 1 msg = peek_msg() if callback is not None and callback(msg, False) == False: pop_msg() continue dispatch_one_message(expect_ec) if callback is not None: callback(msg, True) if limit is not None: if count >= limit: break return count > 0
[docs]def dispatch_one_message(expect_ec = 0): """Takes first unprocessed message from the queue and dispatches it. Use `expect_ec` parameter if you expect non-zero exit code. :param num expect_ec: Expected exit code :return: The amount of gas spent on the execution of the transaction :rtype: num """ if isinstance(expect_ec, int): expect_ec = [expect_ec] msg = pop_msg() globals.ALL_MESSAGES.append(msg) dump1 = globals.G_VERBOSE or globals.G_DUMP_MESSAGES dump2 = globals.G_MSG_FILTER is not None and globals.G_MSG_FILTER(msg.data) if dump1 or dump2: print_int_msg(msg) if msg.dst.is_none(): # TODO: a getter's reply. Add a test for that return # dump_struct(msg.data) # CONFIRM_INPUT_ADDR = Address('-31:16653eaf34c921467120f2685d425ff963db5cbb5aa676a62a2e33bfc3f6828a') # if msg.dst == CONFIRM_INPUT_ADDR: # verbose_('!!!!!!!!!!!!') result = dispatch_message_ext(msg.id) globals.G_LAST_GAS_USED = result.gas_used # print('actions =', result.actions) exception = None try: gas, answer = process_actions(result, expect_ec) except Exception as err: exception = translate_exception(err) if exception is not None: raise exception if result.debot_answer_msg is not None: answer_msg = Msg(json.loads(result.debot_answer_msg)) # verbose_(answer_msg) globals.QUEUE.append(answer_msg) if answer is not None: # verbose_('debot_answer = {}'.format(answer)) translated_msg = core.debot_translate_getter_answer(answer.id) # verbose_('translated_msg = {}'.format(translated_msg)) translated_msg = Msg(json.loads(translated_msg)) # verbose_(translated_msg) globals.QUEUE.append(translated_msg) return gas
######################################################################################################### # TODO: add docs? class BalanceWatcher: def __init__(self, contract): self.contract_ = contract self.balance_ = contract.balance self.epsilon_ = 2 def ensure_change(self, expected_diff, epsilon = None): cur_balance = self.contract_.balance prev_balance = self.balance_ ensure_balance(prev_balance + expected_diff, cur_balance, epsilon = either_or(epsilon, self.epsilon_)) self.balance_ = cur_balance #########################################################################################################