|
| 1 | +# SPDX-License-Identifier: MIT |
| 2 | +# Copyright (C) 2025, CodeMagic LTD |
| 3 | + |
| 4 | +import asyncio |
| 5 | +import os |
| 6 | +from pathlib import Path |
| 7 | + |
| 8 | +import requests |
| 9 | +from littlefs import LittleFS |
| 10 | + |
| 11 | +from wokwi_client import GET_TOKEN_URL, WokwiClient |
| 12 | + |
| 13 | +EXAMPLE_DIR = Path(__file__).parent |
| 14 | +FIRMWARE_NAME = "ESP32_GENERIC-20250415-v1.25.0.bin" |
| 15 | +FIRMWARE_FILE = EXAMPLE_DIR / FIRMWARE_NAME |
| 16 | +FIRMWARE_URL = f"https://micropython.org/resources/firmware/{FIRMWARE_NAME}" |
| 17 | +FIRMWARE_OFFSET = 0x1000 |
| 18 | +FLASH_SIZE = 0x400000 |
| 19 | +FS_OFFSET = 0x200000 |
| 20 | +FS_SIZE = 0x200000 |
| 21 | +SLEEP_TIME = int(os.getenv("WOKWI_SLEEP_TIME", "3")) |
| 22 | + |
| 23 | +MICROPYTHON_CODE = """ |
| 24 | +# This is a MicroPython script that runs on the simulated ESP32 chip. |
| 25 | +# It prints some information about the ESP32 chip and the filesystem. |
| 26 | +
|
| 27 | +import esp |
| 28 | +import esp32 |
| 29 | +import gc |
| 30 | +import machine |
| 31 | +import os |
| 32 | +
|
| 33 | +print("--------------------------------") |
| 34 | +print("Hello, MicroPython! I'm running on a Wokwi ESP32 simulator.") |
| 35 | +print("--------------------------------") |
| 36 | +
|
| 37 | +# Chip information |
| 38 | +print(f"CPU Frequency: {machine.freq()}") |
| 39 | +mac_address = ':'.join(['{:02x}'.format(x) for x in machine.unique_id()]) |
| 40 | +print(f"MAC Address: {mac_address}") |
| 41 | +
|
| 42 | +# Memory information |
| 43 | +print(f"Flash Size: {esp.flash_size() // 1024} kB") |
| 44 | +print(f"Total Heap: {gc.mem_alloc() + gc.mem_free()} bytes") |
| 45 | +print(f"Free Heap: {gc.mem_free()} bytes") |
| 46 | +
|
| 47 | +# Filesystem information |
| 48 | +statvfs = os.statvfs("/") |
| 49 | +block_size, free_blocks = statvfs[0], statvfs[3] |
| 50 | +print("Filesystem:") |
| 51 | +print(f" Root path : /") |
| 52 | +print(f" Block size: {block_size} bytes") |
| 53 | +print(f" Free space: {block_size * free_blocks} bytes") |
| 54 | +print("--------------------------------") |
| 55 | +""" |
| 56 | + |
| 57 | + |
| 58 | +async def main() -> None: |
| 59 | + token = os.getenv("WOKWI_CLI_TOKEN") |
| 60 | + if not token: |
| 61 | + raise SystemExit( |
| 62 | + f"Set WOKWI_CLI_TOKEN in your environment. You can get it from {GET_TOKEN_URL}." |
| 63 | + ) |
| 64 | + |
| 65 | + if not FIRMWARE_FILE.exists(): |
| 66 | + print(f"Downloading {FIRMWARE_NAME} from {FIRMWARE_URL}") |
| 67 | + response = requests.get(FIRMWARE_URL) |
| 68 | + response.raise_for_status() |
| 69 | + with open(FIRMWARE_FILE, "wb") as f: |
| 70 | + f.write(response.content) |
| 71 | + |
| 72 | + firmware_data = bytearray(FLASH_SIZE) |
| 73 | + with open(FIRMWARE_FILE, "rb") as f: |
| 74 | + firmware_data[FIRMWARE_OFFSET : FIRMWARE_OFFSET + f.tell()] = f.read() |
| 75 | + |
| 76 | + # Inject some micropython code into a littlefs filesystem inside the firmware image |
| 77 | + lfs = LittleFS(block_size=4096, block_count=512, prog_size=256) |
| 78 | + with lfs.open("main.py", "w") as lfs_file: |
| 79 | + lfs_file.write(MICROPYTHON_CODE) |
| 80 | + firmware_data[FS_OFFSET : FS_OFFSET + FS_SIZE] = lfs.context.buffer |
| 81 | + |
| 82 | + with open(EXAMPLE_DIR / "firmware.bin", "wb") as f: |
| 83 | + f.write(bytes(firmware_data)) |
| 84 | + |
| 85 | + client = WokwiClient(token) |
| 86 | + print(f"Wokwi client library version: {client.version}") |
| 87 | + |
| 88 | + hello = await client.connect() |
| 89 | + print("Connected to Wokwi Simulator, server version:", hello["version"]) |
| 90 | + |
| 91 | + # Upload the diagram and firmware files |
| 92 | + await client.upload_file("diagram.json", EXAMPLE_DIR / "diagram.json") |
| 93 | + await client.upload(FIRMWARE_NAME, bytes(firmware_data)) |
| 94 | + |
| 95 | + # Start the simulation |
| 96 | + await client.start_simulation(firmware=FIRMWARE_NAME, elf=FIRMWARE_NAME) |
| 97 | + |
| 98 | + # Stream serial output for a few seconds |
| 99 | + serial_task = asyncio.create_task(client.serial_monitor_cat()) |
| 100 | + print(f"Simulation started, waiting for {SLEEP_TIME} seconds…") |
| 101 | + await client.wait_until_simulation_time(SLEEP_TIME) |
| 102 | + serial_task.cancel() |
| 103 | + |
| 104 | + # Disconnect from the simulator |
| 105 | + await client.disconnect() |
| 106 | + |
| 107 | + |
| 108 | +if __name__ == "__main__": |
| 109 | + asyncio.run(main()) |
0 commit comments