Skip to content

Core Classes

This page documents the core hardware interface classes provided by mock_machine.

Pin

Pin(id, mode=0, pull=0, value=None, drive=0, alt=-1)

Unittest support class for machine.Pin

Allows manual setting of input or output pin's value.

https://docs.micropython.org/en/latest/library/machine.Pin.html

Source code in mock_machine.py
def __init__(self, id, mode=0, pull=0, value=None, drive=0, alt=-1):
    """
    Create a mock pin and record its initial configuration.

    :param id: Pin identifier; accepted for API compatibility, not stored.
    :param mode: Initial pin mode.
    :param pull: Initial pull-resistor setting.
    :param value: Initial pin level, or None when no level has been set yet.
    :param drive: Accepted for API compatibility, not stored.
    :param alt: Alternate-function selector.
    """
    self._value = value
    self._mode = mode
    self._pull = pull
    # Keep the caller's alt instead of discarding it, the same way init() stores it.
    self._alt = alt
    # No interrupt is registered until irq() is called.
    self._irq_handler = None
    self._irq_trigger = None

Functions

init(mode=0, pull=0, alt=0, value=None)

Source code in mock_machine.py
def init(self, mode=0, pull=0, alt=0, value=None):
    """
    Re-initialise the mock pin.

    All four settings are overwritten, including any left at their default.
    """
    self._mode, self._pull = mode, pull
    self._alt, self._value = alt, value

value(new_value=None)

Source code in mock_machine.py
def value(self, new_value=None):
    """
    Get or set the pin level.

    With no argument (or None) the stored level is returned. Otherwise the level is set to
    ``int(new_value)`` and, when both a handler and a trigger have been registered, the
    handler is scheduled with ``micropython.schedule`` if the change is a rising or
    falling edge selected by the trigger mask. Setting a level returns None.

    A pin whose previous level is None has nothing to compare against, so the first write
    to such a pin is never treated as an edge.
    """
    if new_value is None:
        return self._value

    old_value = self._value
    self._value = int(new_value)

    # Without a previous level there is no edge; comparing an int with None would raise.
    if old_value is None or not (self._irq_trigger and self._irq_handler):
        return None

    if self._irq_trigger & self.IRQ_RISING and self._value > old_value:
        micropython.schedule(self._irq_handler, self)
    if self._irq_trigger & self.IRQ_FALLING and self._value < old_value:
        micropython.schedule(self._irq_handler, self)
    return None

high()

Source code in mock_machine.py
def high(self):
    """
    Set the pin level to 1 by delegating to ``value()``.
    """
    level = 1
    self.value(level)

low()

Source code in mock_machine.py
def low(self):
    """
    Set the pin level to 0 by delegating to ``value()``.
    """
    level = 0
    self.value(level)

irq(handler=None, trigger=IRQ_FALLING | IRQ_RISING, priority=1, wake=None, hard=False)

Source code in mock_machine.py
def irq(
    self, handler=None, trigger=IRQ_FALLING | IRQ_RISING, priority=1, wake=None, hard=False
):
    """
    Register the interrupt handler and the trigger mask for this pin.

    ``priority``, ``wake`` and ``hard`` are accepted but not used.

    Returns the pin itself, which is a non-standard return value.
    """
    self._irq_trigger = trigger
    self._irq_handler = handler
    return self  # non-standard return value

mode()

Source code in mock_machine.py
def mode(self):
    """
    Return the currently stored pin mode.
    """
    current_mode = self._mode
    return current_mode

__call__(x=None)

Source code in mock_machine.py
def __call__(self, x=None):
    """
    Shorthand for ``value(x)``: calling the pin gets or sets its level.
    """
    result = self.value(x)
    return result

Pin Constants

  • Pin.IN - Input mode
  • Pin.OUT - Output mode
  • Pin.OPEN_DRAIN - Open drain mode
  • Pin.ALT - Alternate function mode
  • Pin.ALT_OPEN_DRAIN - Alternate open drain mode
  • Pin.ANALOG - Analog mode
  • Pin.PULL_UP - Enable pull-up resistor
  • Pin.PULL_DOWN - Enable pull-down resistor
  • Pin.IRQ_RISING - Interrupt on rising edge
  • Pin.IRQ_FALLING - Interrupt on falling edge

I2C

I2C(*args, id=None, scl=None, sda=None, freq=400000)

Unittest support class for machine.I2C.

https://docs.micropython.org/en/latest/library/machine.I2C.html

Construct a new I2C object.

Initialise mock I2C with register values.

Source code in mock_machine.py
def __init__(self, *args, id=None, scl=None, sda=None, freq=400000):  # pylint: disable=unused-argument,redefined-builtin
    """
    Construct a new mock I2C object.

    All arguments are accepted and ignored; the bus starts with no devices attached.
    """
    # Attached devices, keyed by bus address.
    self.devices = dict()

Functions

init(scl, sda, *args, freq=400000)

Initialise the I2C bus.

Source code in mock_machine.py
def init(self, scl, sda, *args, freq=400000):
    """
    Initialise the I2C bus.

    This mock implementation does nothing with its arguments.
    """
    return None

deinit()

Turn off the I2C bus.

Source code in mock_machine.py
def deinit(self):
    """
    Turn off the I2C bus.

    This mock implementation does nothing.
    """
    return None

scan()

Scan I2C for responding devices.

Scans all I2C addresses between 0x08 and 0x77 inclusive and returns a list of those that respond.

Returns a list of addresses that responded to the scan.

Source code in mock_machine.py
def scan(self):
    """
    Scan I2C for responding devices.

    Every attached device whose address lies between 0x08 and 0x77 inclusive counts as
    responding.

    Returns a list of those addresses, in the order the devices were stored.
    """
    return [addr for addr in self.devices if 0x08 <= addr <= 0x77]

readfrom(addr, nbytes, stop=True)

Read nbytes from the peripheral specified by addr.

If stop is true then a STOP condition is generated at the end of the transfer.

Returns a bytes object with the data read.

Source code in mock_machine.py
def readfrom(self, addr, nbytes, stop=True):
    """
    Read nbytes from the peripheral specified by addr.

    The call is forwarded to the device attached at addr, together with the stop flag.

    Returns whatever the device's readfrom returns.
    Raises OSError(errno.ENODEV) when no device is attached at addr.
    """
    bus = self.devices
    if addr in bus:
        return bus[addr].readfrom(nbytes, stop)
    raise OSError(errno.ENODEV)

readfrom_into(addr, buf, stop=True)

Read into buf from the peripheral specified by addr.

The number of bytes read will be the length of buf. If stop is true then a STOP condition is generated at the end of the transfer.

The method returns None.

Source code in mock_machine.py
def readfrom_into(self, addr, buf, stop=True):
    """
    Read into buf from the peripheral specified by addr.

    The call is forwarded to the device attached at addr, together with the stop flag.

    Returns whatever the device's readfrom_into returns.
    Raises OSError(errno.ENODEV) when no device is attached at addr.
    """
    bus = self.devices
    if addr in bus:
        return bus[addr].readfrom_into(buf, stop)
    raise OSError(errno.ENODEV)

writeto(addr, buf, stop=True)

Write the bytes from buf to the peripheral specified by addr.

If a NACK is received following the write of a byte from buf then the remaining bytes are not sent. If stop is true then a STOP condition is generated at the end of the transfer, even if a NACK is received.

The function returns the number of ACKs that were received.

Source code in mock_machine.py
def writeto(self, addr, buf, stop=True):
    """
    Write the bytes from buf to the peripheral specified by addr.

    The call is forwarded to the device attached at addr, together with the stop flag.

    Returns whatever the device's writeto returns.
    Raises OSError(errno.ENODEV) when no device is attached at addr.
    """
    bus = self.devices
    if addr in bus:
        return bus[addr].writeto(buf, stop)
    raise OSError(errno.ENODEV)

readfrom_mem(addr, memaddr, nbytes, *args, addrsize=8)

Read nbytes from the peripheral specified by addr, starting at memaddr.

The read starts from the memory address specified by memaddr. The argument addrsize specifies the address size in bits.

Returns a bytes object with the data read.

Source code in mock_machine.py
def readfrom_mem(self, addr, memaddr, nbytes, *args, addrsize=8):  # pylint: disable=unused-argument
    """
    Read nbytes from the peripheral specified by addr, starting at memaddr.

    The call is forwarded to the device attached at addr; addrsize is accepted but not
    passed on.

    Returns whatever the device's readfrom_mem returns.
    Raises OSError(errno.ENODEV) when no device is attached at addr.
    """
    bus = self.devices
    if addr in bus:
        return bus[addr].readfrom_mem(memaddr, nbytes)
    raise OSError(errno.ENODEV)

readfrom_mem_into(addr, memaddr, buf, *args, addrsize=8)

Read into buf from the peripheral specified by addr, starting at memaddr.

The number of bytes read is the length of buf. The argument addrsize specifies the address size in bits.

The method returns None.

Source code in mock_machine.py
def readfrom_mem_into(self, addr, memaddr, buf, *args, addrsize=8):
    """
    Read into buf from the peripheral specified by addr, starting at memaddr.

    The call is forwarded to the device attached at addr; addrsize is accepted but not
    passed on.

    Returns whatever the device's readfrom_mem_into returns.
    Raises OSError(errno.ENODEV) when no device is attached at addr.
    """
    bus = self.devices
    if addr in bus:
        return bus[addr].readfrom_mem_into(memaddr, buf)
    raise OSError(errno.ENODEV)

writeto_mem(addr, memaddr, buf, *args, addrsize=8)

Write buf to the peripheral specified by addr, starting at memaddr.

The number of bytes written is the length of buf. The argument addrsize specifies the address size in bits.

The method returns None.

Source code in mock_machine.py
def writeto_mem(self, addr, memaddr, buf, *args, addrsize=8):
    """
    Write buf to the peripheral specified by addr, starting at memaddr.

    The call is forwarded to the device attached at addr; addrsize is accepted but not
    passed on.

    Returns None.
    Raises OSError(errno.ENODEV) when no device is attached at addr.
    """
    bus = self.devices
    if addr not in bus:
        raise OSError(errno.ENODEV)
    target = bus[addr]
    target.writeto_mem(memaddr, buf)

add_device(device)

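Add an I2CDevice to the bus at the address given by its addr attribute (device.addr). Raises ValueError if a device with that address is already on the bus.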

Source code in mock_machine.py
def add_device(self, device):
    """
    Attach a device to the bus, keyed by its ``addr`` attribute.

    Raises ValueError if a device is already registered at that address.
    """
    bus = self.devices
    if device.addr in bus:
        raise ValueError("Device with given address already on bus.")
    bus[device.addr] = device

SPI

SPI(id=None)

Unittest support class for machine.SPI

https://docs.micropython.org/en/latest/library/machine.SPI.html

Construct an SPI object on the given bus, id.

Source code in mock_machine.py
def __init__(self, id=None):  # pylint: disable=unused-argument,redefined-builtin
    """
    Construct a mock SPI object; the bus id is accepted and ignored.

    Two recording styles are set up side by side: single buffers for tests that use the
    same data on every call, and per-call lists for tests that use different data on
    each call.
    """
    # Same-data-every-call style.
    self.read_buf = bytearray()  # what to return from read
    self.write_buf = bytearray()  # what has been written last

    # Different-data-per-call style.
    self.reads = []  # what to return from read in order of calls
    self.num_reads = 0  # the number of reads so far
    self.writes = []  # what has been written in order of calls

Functions

init(baudrate=1000000, *, polarity=0, phase=0, bits=8, firstbit=None, sck=None, mosi=None, miso=None, pins=None)

Initialise the SPI bus with the given parameters

Source code in mock_machine.py
def init(
    self,
    baudrate=1000000,
    *,
    polarity=0,
    phase=0,
    bits=8,
    firstbit=None,
    sck=None,
    mosi=None,
    miso=None,
    pins=None,
):
    """
    Initialise the SPI bus with the given parameters.

    This mock implementation accepts the parameters and does nothing with them.
    """
    return None

deinit()

Turn off the SPI bus

Source code in mock_machine.py
def deinit(self):
    """
    Turn off the SPI bus.

    This mock implementation does nothing.
    """
    return None

read(nbytes, write=0)

Read a number of bytes specified by nbytes.

Continuously writes the single byte given by write, to ensure read data is clocked.

Returns a bytes object with the data that was read.

Source code in mock_machine.py
def read(self, nbytes, write=0x00):  # pylint: disable=unused-argument
    """
    Return canned read data for a read of nbytes; the write byte is ignored.

    When read_buf is non-empty, its first nbytes bytes are returned on every call.
    Otherwise, when reads is non-empty, the entry at position num_reads is returned as-is
    and num_reads is advanced by one.

    Raises NotImplementedError when neither source holds any data.
    """
    fixed = self.read_buf
    if fixed:
        # Same data again and again.
        return fixed[:nbytes]

    if not self.reads:
        raise NotImplementedError

    # Per-call data: hand out the next entry in call order.
    index = self.num_reads
    self.num_reads = index + 1
    return self.reads[index]

readinto(buf, write=0)

Read into the buffer specified by buf.

Number of bytes read is the length of the buffer. Continuously writes the single byte given by write, to ensure read data is clocked.

Returns None.

Source code in mock_machine.py
def readinto(self, buf, write=0x00):
    """
    Read into the buffer specified by buf.

    Asks ``read()`` for ``len(buf)`` bytes and assigns the result over the whole of buf.
    The write byte is not forwarded.

    Returns None.
    """
    incoming = self.read(len(buf))
    buf[:] = incoming

write(buf)

Write the bytes contained in buf.

Returns None.

Source code in mock_machine.py
def write(self, buf):
    """
    Record the bytes contained in buf as written.

    A copy of the data replaces write_buf, and another copy is appended to writes, so
    later changes to buf do not affect what was recorded.

    Returns None.
    """
    history = self.writes
    # Last-written snapshot first, then the per-call history entry.
    self.write_buf = bytes(buf)
    history.append(bytes(buf))

write_readinto(write_buf, read_buf)

Write the bytes from write_buf while reading into read_buf.

The buffers can be the same or different, but both buffers must have the same length.

Returns None.

Source code in mock_machine.py
def write_readinto(self, write_buf, read_buf):
    """
    Write the bytes from write_buf while reading into read_buf.

    The buffers can be the same or different, but both buffers must have the same length.
    The written data is copied before the read happens, so the recorded write_buf holds
    what was actually written even when the same buffer is passed for both arguments.
    Unlike ``write()``, nothing is appended to writes.

    Returns None.
    """
    # Snapshot first: readinto() may overwrite write_buf when both names share one buffer.
    written = bytes(write_buf)
    self.readinto(read_buf)
    self.write_buf = written

reset()

Reset to the initial state of the mock.

Source code in mock_machine.py
def reset(self):
    """
    Clear the per-call bookkeeping of the mock.

    writes and reads are replaced with fresh empty lists and num_reads returns to 0.
    write_buf and read_buf are left untouched.
    """
    self.num_reads = 0
    self.reads = list()
    self.writes = list()

ADC

ADC(pin)

Source code in mock_machine.py
def __init__(self, pin):
    """
    Create a mock ADC bound to the given pin.
    """
    # The pin is used as the lookup key when a reading is taken.
    self.pin = pin

Functions

init(*, sample_ns, atten)

Apply the given settings to the ADC.

Only those arguments that are specified will be changed.

Source code in mock_machine.py
def init(self, *, sample_ns, atten):
    """
    Apply the given settings to the ADC.

    Both settings are required keyword arguments; this mock implementation does nothing
    with them.
    """
    return None

block()

Return the ADCBlock instance associated with this ADC object.

Source code in mock_machine.py
def block(self):
    """
    Return the ADCBlock instance associated with this ADC object.

    This mock implementation has no ADCBlock and returns None.
    """
    return None

read_u16()

Take an analog reading and return an integer in the range 0-65535.

The return value represents the raw reading taken by the ADC.

Source code in mock_machine.py
def read_u16(self):
    """
    Take an analog reading and return an integer in the range 0-65535.

    The reading is looked up in ``ADC.pin_adc_map`` under this ADC's pin; a pin with no
    entry reads as 0.
    """
    readings = ADC.pin_adc_map
    return readings[self.pin] if self.pin in readings else 0

read_uv()

Take an analog reading and return an integer value with units of microvolts.

It is up to the particular port whether or not this value is calibrated, and how calibration is done. Note: stm32 port does not include this functionality as of August 2023.

Source code in mock_machine.py
def read_uv(self):
    """
    Take an analog reading and return an integer value with units of microvolts.

    The mock simply returns its ``value_uv`` attribute.

    It is up to the particular port whether or not this value is calibrated, and how
    calibration is done.
    Note: stm32 port does not include this functionality as of August 2023.
    """
    microvolts = self.value_uv
    return microvolts

PWM

PWM(dest, *, freq, duty_u16=None, duty_ns=None, invert=False)

https://docs.micropython.org/en/latest/library/machine.PWM.html

Source code in mock_machine.py
def __init__(
    self,
    dest: Pin,
    *,
    freq: int,
    duty_u16: Optional[int] = None,
    duty_ns: Optional[int] = None,
    invert: bool = False,
):
    """
    Create a mock PWM output on the pin ``dest``.

    :param dest: Pin the PWM is attached to.
    :param freq: PWM frequency.
    :param duty_u16: Duty cycle setting, stored as given.
    :param duty_ns: Duty cycle setting, stored as given.
    :param invert: Stored on the instance as ``invert``.
    :raises ValueError: If neither duty_u16 nor duty_ns is provided.
    """
    self._pin, self._freq = dest, freq
    self._duty_u16, self._duty_ns = duty_u16, duty_ns
    # At least one way of expressing the duty cycle is mandatory.
    if duty_u16 is None and duty_ns is None:
        raise ValueError("Needs duty_ns or duty_u16 provided")
    self.invert = invert

Functions

init(freq, duty_u16, duty_ns)

Source code in mock_machine.py
def init(self, freq, duty_u16, duty_ns):
    """
    Overwrite the frequency and both duty settings with the given values.
    """
    self._freq = freq
    self._duty_u16, self._duty_ns = duty_u16, duty_ns

deinit()

Source code in mock_machine.py
def deinit(self):
    """
    Clear the stored frequency and both duty settings (all become None).
    """
    self._freq = self._duty_u16 = self._duty_ns = None

freq(value=None)

Source code in mock_machine.py
def freq(self, value=None):
    """
    Get or set the PWM frequency.

    Called without a value (or with None) it returns the stored frequency; otherwise it
    stores the value and returns None.
    """
    if value is not None:
        self._freq = value
        return None
    return self._freq

duty_u16(value=None)

Source code in mock_machine.py
def duty_u16(self, value=None):
    """
    Get or set the stored duty_u16 setting.

    Called without a value (or with None) it returns the stored setting; otherwise it
    stores the value and returns None.
    """
    if value is not None:
        self._duty_u16 = value
        return None
    return self._duty_u16

duty_ns(value=None)

Source code in mock_machine.py
def duty_ns(self, value=None):
    """
    Get or set the stored duty_ns setting.

    Called without a value (or with None) it returns the stored setting; otherwise it
    stores the value and returns None.
    """
    if value is not None:
        self._duty_ns = value
        return None
    return self._duty_ns

UART

UART(id=None, baudrate=9600, bits=8, parity=None, stop=1, tx=None, rx=None, txbuf=256, rxbuf=256, timeout=0, timeout_char=0, invert=0, flow=0, read_buf_len=256, data_for_read=b'')

Bases: IOBase

Mock UART https://docs.micropython.org/en/latest/library/machine.UART.html

Source code in mock_machine.py
def __init__(
    self,
    id=None,
    baudrate=9600,
    bits=8,
    parity=None,
    stop=1,
    tx=None,
    rx=None,
    txbuf=256,
    rxbuf=256,
    timeout=0,
    timeout_char=0,
    invert=0,
    flow=0,
    read_buf_len=256,
    data_for_read=b"",
):
    """
    Create a mock UART backed by two RingIO buffers, one for RX and one for TX.

    Only the buffer sizes and data_for_read are used; the remaining arguments are
    accepted and ignored. Any data_for_read is passed to ``inject_data`` so that it is
    available to read from the start.
    """
    # An rxbuf other than 256 wins; otherwise read_buf_len sizes the RX ring.
    if rxbuf != 256:
        rx_size = rxbuf
    else:
        rx_size = read_buf_len

    self._rx_ring = micropython.RingIO(rx_size)
    self._tx_ring = micropython.RingIO(txbuf)

    if data_for_read:
        self.inject_data(data_for_read)

    super().__init__()

Functions

write(data)

Source code in mock_machine.py
def write(self, data):
    """
    Write data to the TX ring and return the ring's result.
    """
    tx = self._tx_ring
    return tx.write(data)

read(nbytes=-1)

Source code in mock_machine.py
def read(self, nbytes=-1):
    """
    Read from the RX ring.

    With the default nbytes of -1 the ring's ``read()`` is called without a size;
    otherwise nbytes is passed through.
    """
    rx = self._rx_ring
    return rx.read() if nbytes == -1 else rx.read(nbytes)

readinto(buf, nbytes=None)

Source code in mock_machine.py
def readinto(self, buf, nbytes=None):
    """
    Read from the RX ring into buf.

    When nbytes is None, ``len(buf)`` is used as the count. Returns the ring's result.
    """
    count = len(buf) if nbytes is None else nbytes
    return self._rx_ring.readinto(buf, count)

readline()

Source code in mock_machine.py
def readline(self):
    """
    Return the result of ``readline()`` on the RX ring.
    """
    rx = self._rx_ring
    return rx.readline()

flush()

Source code in mock_machine.py
def flush(self):
    """
    Do nothing; present so the mock offers a flush method.
    """
    return None

close()

Source code in mock_machine.py
def close(self):
    """
    Do nothing; present so the mock offers a close method.
    """
    return None

any()

Source code in mock_machine.py
def any(self):
    """
    Return the result of ``any()`` on the RX ring.
    """
    rx = self._rx_ring
    return rx.any()

ioctl(op, arg)

Source code in mock_machine.py
def ioctl(self, op, arg):
    """
    Handle the stream ioctl requests the mock supports.

    This allows the class to be used in asyncio.StreamReader etc.

    :param op: Request code (constants taken from micropython/py/stream.h).
    :param arg: Request argument; not used by any supported request.
    :return: For a poll request, the read-ready flag when ``any()`` is truthy, else 0.
        For flush and close requests, 0 after calling ``flush()`` / ``close()``.
        For every other request, -1.
    """
    # From micropython/py/stream.h
    _MP_STREAM_POLL = const(3)
    _MP_STREAM_POLL_RD = const(0x0001)
    _MP_STREAM_FLUSH = const(1)
    _MP_STREAM_CLOSE = const(4)

    if op == _MP_STREAM_POLL:
        return _MP_STREAM_POLL_RD if self.any() else 0
    if op == _MP_STREAM_FLUSH:
        self.flush()
        # flush() returns None, but an ioctl result has to be an integer status.
        return 0
    if op == _MP_STREAM_CLOSE:
        self.close()
        # Same as flush: report success as 0 instead of handing back None.
        return 0

    # Return -1 for unsupported ioctl operations
    return -1

Timer

Timer(id=0, channel=None, mode=None, period=None, callback=None)

https://docs.micropython.org/en/latest/library/machine.Timer.html

Source code in mock_machine.py
def __init__(self, id=0, channel=None, mode=None, period=None, callback=None):
    """
    Create a mock timer and start it straight away when it has been configured.

    The timer is started through ``init()`` only if at least one of mode, period or
    callback is truthy; otherwise it stays idle until ``init()`` is called.
    """
    self._id = id  # Micropython timer first positional param is id.
    self._channel = channel
    self._mode = mode
    self._period = period
    self._callback = callback
    self._freq = 1000  # default frequency
    self._start = None
    self._task = None

    if not (mode or period or callback):
        return
    self.init(freq=self._freq, mode=mode, period=period, callback=callback)

Functions

init(freq=1000, mode=PERIODIC, period=-1, callback=None)

Source code in mock_machine.py
def init(self, freq=1000, mode=PERIODIC, period=-1, callback=None):
    """
    (Re)configure the timer and start its background task.

    Any task that is already running is cancelled first.

    :param freq: Timer frequency; a falsy value falls back to 1000.
    :param mode: Timer mode, stored as given.
    :param period: Timer period; None or a non-positive value falls back to 1000.
    :param callback: Callback stored for the timer task.
    """
    if self._task:
        self._task.cancel()
    self._freq = freq or 1000
    self._mode = mode
    # The constructor forwards period=None when the caller gave none; treat it like any
    # other unusable period instead of failing on the None > 0 comparison.
    self._period = period if period is not None and period > 0 else 1000  # fallback to 1s
    self._callback = callback
    self._task = asyncio.create_task(self._timer())

deinit()

Source code in mock_machine.py
def deinit(self):
    """
    Stop the timer: cancel the background task, if any, and forget it.
    """
    task = self._task
    if not task:
        return
    task.cancel()
    self._task = None

Timer Constants

  • Timer.ONE_SHOT - One-shot timer mode
  • Timer.PERIODIC - Periodic timer mode

WDT

WDT(timeout)

https://docs.micropython.org/en/latest/library/machine.WDT.html

Source code in mock_machine.py
def __init__(self, timeout):
    """
    Create a mock watchdog and start its background task.

    The time of creation counts as the first feed.
    """
    self._timeout = timeout
    self._disabled, self.running = False, True
    self._last_pat = time.ticks_ms()
    self._task = asyncio.create_task(self._tick())

Functions

disable()

Source code in mock_machine.py
def disable(self):
    """
    Mark the watchdog as disabled and no longer running.
    """
    self._disabled, self.running = True, False

feed()

Source code in mock_machine.py
def feed(self):
    """
    Record a feed by storing the current ``time.ticks_ms()`` value.

    Asserts that the watchdog is either disabled or still running, so feeding a watchdog
    that has already stopped fails the assertion.
    """
    still_accepting = self._disabled or self.running
    assert still_accepting, "you are late, WDT already stopped"
    self._last_pat = time.ticks_ms()

RTC

RTC()

https://docs.micropython.org/en/latest/library/machine.RTC.html

Source code in mock_machine.py
def __init__(self):
    """
    Create a mock RTC with no wakeup registered and no background task.
    """
    self._timeout = None
    self._callback = None
    self._running = False
    self._stop = False
    self._task = None  # don't start task yet

Functions

wakeup(timeout=None, callback=None)

Register a wakeup timer.

:param timeout: Wakeup interval in ms

:param callback: Function to call on wakeup

Source code in mock_machine.py
def wakeup(self, timeout=None, callback=None):
    """
    Register a wakeup timer.

    The background task is created on the first call only; later calls just replace the
    stored timeout and callback.

    :param timeout: Wakeup interval in ms
    :param callback: Function to call on wakeup
    """
    log.info("RTC: Registering a wakeup in: %s, callback is: %s", timeout, callback)

    self._timeout, self._callback = timeout, callback
    if callback:
        assert callable(self._callback)

    # Start the async loop only when first used
    if self._task:
        return
    self._task = asyncio.create_task(self._wakeup())

stop()

Source code in mock_machine.py
def stop(self):
    """
    Request that the wakeup task stop and drop the reference to it.

    Sets the ``_stop`` flag, then, if a task exists, spins until ``_running`` is false
    before clearing ``_task``.
    """
    log.info("RTC: Calling Stop")
    self._stop = True
    if self._task:
        # NOTE(review): this is a synchronous busy-wait. The task is created with
        # asyncio.create_task, so presumably _running is cleared by that task; if so, the
        # task cannot run while this loop spins and stop() would never return when
        # _running is True. Confirm against _wakeup().
        while self._running:
            pass
        self._task = None

info() staticmethod

Source code in mock_machine.py
@staticmethod
def info():
    """
    Return the fixed status word of the mock RTC (bit 28 set).
    """
    rtc_is_good = 1 << 28  # Default "RTC is good" value
    return rtc_is_good

init() staticmethod

Source code in mock_machine.py
@staticmethod
def init():
    """
    Do nothing; the mock RTC needs no initialisation.
    """
    return None

datetime(datetime=None) staticmethod

Source code in mock_machine.py
@staticmethod
def datetime(datetime=None):
    """
    Return the current local time as an 8-tuple.

    The tuple is (year, month, day, weekday, hour, minute, second, microsecond), built
    from ``time.time()`` and ``time.localtime()``. The datetime argument is ignored, so
    the clock cannot be set through this mock.
    """
    now = time.time()
    year, month, day, hour, minute, second, weekday = tuple(time.localtime(now))[:7]
    microsecond = round(now * 1000000) % 1000000
    return (year, month, day, weekday, hour, minute, second, microsecond)

Signal

Signal(pin, invert)

https://docs.micropython.org/en/latest/library/machine.Signal.html

Source code in mock_machine.py
def __init__(self, pin, invert=False):
    """
    Wrap a pin in a mock Signal.

    :param pin: The pin object this signal reads from and writes to.
    :param invert: Stored on the signal as ``invert``. Defaults to False so the signal
        can be built from a pin alone, as with machine.Signal.
    """
    self.pin = pin
    self.invert = invert

Functions

value(val)

Source code in mock_machine.py
def value(self, val=None):
    """
    Set and/or read the level of the wrapped pin.

    :param val: Level to write to the pin. When omitted or None, nothing is written and
        the call is a plain read.
    :return: The level reported by ``pin.value()`` after any write.

    Note: ``invert`` is stored on the signal but is not applied here.
    """
    if val is not None:
        self.pin.value(val)
    return self.pin.value()