Simple test
Ensure your device works with this simple test.
1# SPDX-FileCopyrightText: 2021 ladyada for Adafruit Industries
2# SPDX-License-Identifier: MIT
3
4import time
5import board
6import busio
7from digitalio import DigitalInOut
8import neopixel
9import adafruit_connection_manager
10from adafruit_esp32spi import adafruit_esp32spi
11from adafruit_esp32spi import adafruit_esp32spi_wifimanager
12
13import adafruit_minimqtt.adafruit_minimqtt as MQTT
14from adafruit_gc_iot_core import Cloud_Core, MQTT_API
15
### WiFi ###

# Get wifi details and more from a secrets.py file
try:
    from secrets import secrets
except ImportError:
    # Nothing below can run without the credentials, so explain what is
    # missing and re-raise to stop the script.
    print("WiFi secrets are kept in secrets.py, please add them there!")
    raise
24
# If you are using a board with pre-defined ESP32 Pins:
esp32_cs = DigitalInOut(board.ESP_CS)
esp32_ready = DigitalInOut(board.ESP_BUSY)
esp32_reset = DigitalInOut(board.ESP_RESET)

# If you have an externally connected ESP32:
# esp32_cs = DigitalInOut(board.D9)
# esp32_ready = DigitalInOut(board.D10)
# esp32_reset = DigitalInOut(board.D5)

spi = busio.SPI(board.SCK, board.MOSI, board.MISO)
esp = adafruit_esp32spi.ESP_SPIcontrol(spi, esp32_cs, esp32_ready, esp32_reset)

# Status light handed to the WiFi manager below.
# Use below for Most Boards
status_light = neopixel.NeoPixel(board.NEOPIXEL, 1, brightness=0.2)
# Uncomment below for ItsyBitsy M4
# (this file does not import a `dotstar` module; one must be imported first)
# status_light = dotstar.DotStar(board.APA102_SCK, board.APA102_MOSI, 1, brightness=0.2)
# Uncomment below for an externally defined RGB LED
# NOTE(review): the RGBLED call passes RED, BLUE, GREEN in that order, as in
# the original example - confirm against the adafruit_rgbled.RGBLED signature.
# import adafruit_rgbled
# from adafruit_esp32spi import PWMOut
# RED_LED = PWMOut.PWMOut(esp, 26)
# GREEN_LED = PWMOut.PWMOut(esp, 27)
# BLUE_LED = PWMOut.PWMOut(esp, 25)
# status_light = adafruit_rgbled.RGBLED(RED_LED, BLUE_LED, GREEN_LED)
wifi = adafruit_esp32spi_wifimanager.ESPSPI_WiFiManager(esp, secrets, status_light)
51
52### Code ###
53
54
55# Define callback methods which are called when events occur
56# pylint: disable=unused-argument, redefined-outer-name
def connect(client, userdata, flags, rc):
    """Handle a successful connection to the MQTT broker.

    Prints the connection flags and return code, subscribes to the
    commands topics, then publishes a test payload to the "events" topic.

    :param client: Unused by this handler.
    :param userdata: Unused by this handler.
    :param flags: Connection flags, printed as-is.
    :param rc: Connection return code, printed as-is.
    """
    print("Connected to MQTT Broker!")
    print("Flags: {0}\n RC: {1}".format(flags, rc))
    # Subscribes to commands/# topic
    google_mqtt.subscribe_to_all_commands()

    # Publish to the default "events" topic
    google_mqtt.publish("testing", "events", qos=1)
67
68
def disconnect(client, userdata, rc):
    """Report that the client disconnected from the broker.

    :param client: Unused by this handler.
    :param userdata: Unused by this handler.
    :param rc: Unused by this handler.
    """
    print("Disconnected from MQTT Broker!")
73
74
def subscribe(client, userdata, topic, granted_qos):
    """Report a new topic subscription.

    :param client: Unused by this handler.
    :param userdata: Unused by this handler.
    :param topic: Topic that was subscribed to, printed as-is.
    :param granted_qos: QOS level for the subscription, printed as-is.
    """
    print("Subscribed to {0} with QOS level {1}".format(topic, granted_qos))
78
79
def unsubscribe(client, userdata, topic, pid):
    """Report that the client unsubscribed from a topic.

    :param client: Unused by this handler.
    :param userdata: Unused by this handler.
    :param topic: Topic that was unsubscribed from, printed as-is.
    :param pid: PID reported with the event, printed as-is.
    """
    print("Unsubscribed from {0} with PID {1}".format(topic, pid))
83
84
def publish(client, userdata, topic, pid):
    """Report that the client published data to a topic.

    :param client: Unused by this handler.
    :param userdata: Unused by this handler.
    :param topic: Topic that was published to, printed as-is.
    :param pid: PID reported with the event, printed as-is.
    """
    print("Published to {0} with PID {1}".format(topic, pid))
88
89
def message(client, topic, msg):
    """Report data received from a topic.

    :param client: Unused by this handler.
    :param topic: Topic the data arrived on, printed as-is.
    :param msg: Received payload, printed as-is.
    """
    print("Message from {}: {}".format(topic, msg))
93
94
# Connect to WiFi
print("Connecting to WiFi...")
wifi.connect()
print("Connected!")

# Socket pool and SSL context for the ESP32 radio; both are passed to the
# MiniMQTT client when it is created.
pool = adafruit_connection_manager.get_radio_socketpool(esp)
ssl_context = adafruit_connection_manager.get_radio_ssl_context(esp)
102
# Initialize Google Cloud IoT Core interface
google_iot = Cloud_Core(esp, secrets)

# Optional JSON-Web-Token (JWT) Generation
# print("Generating JWT...")
# jwt = google_iot.generate_jwt()
# print("Your JWT is: ", jwt)

# Set up a new MiniMQTT Client
# Broker, username and client id come from the Cloud_Core object; the
# password is the value stored under the "jwt" key of secrets.
client = MQTT.MQTT(
    broker=google_iot.broker,
    username=google_iot.username,
    password=secrets["jwt"],
    client_id=google_iot.cid,
    socket_pool=pool,
    ssl_context=ssl_context,
)

# Initialize Google MQTT API Client
google_mqtt = MQTT_API(client)
123
# Connect callback handlers to Google MQTT Client
google_mqtt.on_connect = connect
google_mqtt.on_disconnect = disconnect
google_mqtt.on_subscribe = subscribe
google_mqtt.on_unsubscribe = unsubscribe
google_mqtt.on_publish = publish
google_mqtt.on_message = message

print("Attempting to connect to %s" % client.broker)
google_mqtt.connect()
134
# Pump the message loop forever, all events are
# handled in their callback handlers
# while True:
#     google_mqtt.loop()

# Start a blocking message loop...
# NOTE: NO code below this loop will execute
# NOTE: Network reconnection is handled within this loop
while True:
    try:
        google_mqtt.loop()
    except (ValueError, RuntimeError) as e:
        print("Failed to get data, retrying\n", e)
        # Reset the WiFi manager, reconnect the MQTT client, and go straight
        # back to polling without the one-second pause.
        wifi.reset()
        google_mqtt.reconnect()
        continue
    time.sleep(1)