物联网项目-智能终端程序
本帖最后由 hzxc 于 2023-9-17 23:37 编辑室内环境实时监测系统(请勿点击代码末尾“复制代码”,手动选中程序进行复制,到BXY中修改并烧录程序)
#***************WIFI模块内部代码*************
class Obloq(object):
def read_until(self, min_num_bytes, ending, timeout=10):
timeout_count = 0
while not uart.any():
if timeout_count >= 1000 * timeout:
return ''
sleep(1)
timeout_count += 1
data = str(uart.read(min_num_bytes), 'UTF-8')
while timeout_count < 1000 * timeout:
if type(ending) == str:
if data.endswith(ending):
break
elif type(ending) == list:
end = False
for i in ending:
if data.endswith(i):
end = True
break
if end:
break
if uart.any():
new_data = str(uart.read(min_num_bytes), 'UTF-8')
data += new_data
timeout_count = 0
else:
sleep(1)
timeout_count += 1
return data
def exit_raw_repl(self):
uart.write('+++')
data = self.read_until(1, '+++', 1)
if data == '':
pass
elif not data.endswith('+++'):
return False
self.in_raw_repl = False
return True
error_msg1 = 'wifi module not detected'
error_msg2 = '...' #disconnect from USB wire & restart haoda:bit
in_raw_repl = True
def connectWifi(self, SSID, PASSWORD, TIMEOUT=10000):
if self.in_raw_repl:
while not self.exit_raw_repl():
pass
uart.write('AT+RST\r\n')
try:
data = self.read_until(256, 'ready\r\n', 5)
except:
return False
if data == '':
display.scroll(self.error_msg1)
return False
elif not data.endswith('ready\r\n'):
display.scroll(self.error_msg2)
return False
uart.write('AT+CWMODE_CUR=3\r\n')
data = self.read_until(1, 'OK\r\n', 1)
if not data.endswith('OK\r\n'):
display.scroll(self.error_msg2)
return False
#设置Wi-Fi模块连接无线网络
uart.write('AT+CWJAP_CUR="%s","%s"\r\n' %(SSID, PASSWORD))
display.scroll('wait...')
data = self.read_until(1, ['OK\r\n', 'FAIL\r\n'], TIMEOUT / 1000)
if data.endswith('OK\r\n') and data.find('WIFI GOT I') > -1:
pass
elif data.endswith('FAIL\r\n'):
if data.find('WIFI CONNECTED') > -1:
display.scroll('dhcp not available / ip conflict')
return False
else:
display.scroll('check wifi ssid / password')
return False
else:
display.scroll(self.error_msg2)
return False
return True
def ifconfig(self, retry=10):
if self.in_raw_repl:
return 'cannot get IP in raw mode'
retry_count = 0
while retry_count < retry:
#设置这个后 读串口返回绑定IP
uart.write('AT+CIFSR\r\n')
data = self.read_until(64, 'OK\r\n', 2)
if not data.endswith('OK\r\n'):
display.scroll(self.error_msg2)
elif data.find('STAIP') > -1:
temp = data.split('"')
if len(temp) > 5:
ip_address = temp
return ip_address
retry_count += 1
else:
retry_count += 1
return 'IP Error'
def httpSet(self, IP, PORT):
if self.in_raw_repl:
while not self.exit_raw_repl():
pass
while not self.in_raw_repl:
uart.write('AT+CIPSTART="TCP","%s",%s\r\n' %(IP, PORT))
data = self.read_until(1, ['CONNECT\r\n\r\nOK\r\n', 'ERROR\r\nCLOSED\r\n'], 6)
if data.endswith('ERROR\r\nCLOSED\r\n'):
display.scroll('server offline')
continue
elif not data.endswith('CONNECT\r\n\r\nOK\r\n'):
display.scroll(self.error_msg2)
continue
#设置WIFI模块为透传模式
uart.write('AT+CIPMODE=1\r\n')
data = self.read_until(1, 'OK\r\n', 0.5)
if not data.endswith('OK\r\n'):
display.scroll(self.error_msg2)
continue
#在透传模式下开始发送数据
uart.write('AT+CIPSEND\r\n')
data = self.read_until(1, 'OK\r\n\r\n>', 3)
if not data.endswith('OK\r\n\r\n>'):
display.scroll(self.error_msg2)
continue
self.in_raw_repl = True
return True
def get(self, url, TIMEOUT):
uart.write('GET /' + url + '\r\n\r\n')
#data = self.read_until(1, '', TIMEOUT / 1000)
timeout_count = 0
while not uart.any():
if timeout_count >= TIMEOUT:
return (404, 'Not Found')
sleep(1)
timeout_count += 1
data = str(uart.read(), 'UTF-8')
if data == '':
pass
elif not data.endswith('ERROR\r\n'):
return (200, data)
return (404, 'Not Found')
Obloq = Obloq()
#*************************************************
from microbit import *
#LM35连接在pin2
#设置服务器地址和端口
IP="192.168.1.10"
PORT="8080"
#设置当前环境网络账号和密码
SSID="my_wifi"
PASSWORD="1234"
#设置串口引脚,连接到wifi模块,好搭Bit发送在pin12(接模块R),接收在pin8(接模块T)
uart.init(baudrate=115200, bits=8, parity=None, stop=1, tx=pin12, rx=pin8)
while Obloq.connectWifi(SSID,PASSWORD,10000) != True:
display.show(".")
#点阵屏显示wifi module not detected,未检测到wifi模块,可能是wifi模块未连接、引脚设置错误或连接线接触不良等原因
display.scroll(Obloq.ifconfig()) #显示连上无线网络后wifi模块获取到的IP
Obloq.httpSet(IP,PORT)
#点阵屏显示server offline,可能是服务器未开启、防火墙阻挡或无线网络需要认证等原因,使用手机尝试打开网页http://(IP):(PORT)
while True:
errno,resp = Obloq.get("input?id=1&val=" + str("%.1f"%(pin2.read_analog()*3.3/10.24)), 10000)
if errno == 200:
display.scroll(resp)
else:
display.scroll(str(errno))
sleep(1000*5)
盆栽伴侣(请勿点击代码末尾“复制代码”,手动选中程序进行复制,到BXY中修改并烧录程序)#***************WIFI模块内部代码*************
class Obloq(object):
def read_until(self, min_num_bytes, ending, timeout=10):
timeout_count = 0
while not uart.any():
if timeout_count >= 1000 * timeout:
return ''
sleep(1)
timeout_count += 1
data = str(uart.read(min_num_bytes), 'UTF-8')
while timeout_count < 1000 * timeout:
if type(ending) == str:
if data.endswith(ending):
break
elif type(ending) == list:
end = False
for i in ending:
if data.endswith(i):
end = True
break
if end:
break
if uart.any():
new_data = str(uart.read(min_num_bytes), 'UTF-8')
data += new_data
timeout_count = 0
else:
sleep(1)
timeout_count += 1
return data
def exit_raw_repl(self):
uart.write('+++')
data = self.read_until(1, '+++', 1)
if data == '':
pass
elif not data.endswith('+++'):
return False
self.in_raw_repl = False
return True
error_msg1 = 'wifi module not detected'
error_msg2 = '...' #disconnect from USB wire & restart haoda:bit
in_raw_repl = True
def connectWifi(self, SSID, PASSWORD, TIMEOUT=10000):
if self.in_raw_repl:
while not self.exit_raw_repl():
pass
uart.write('AT+RST\r\n')
try:
data = self.read_until(256, 'ready\r\n', 5)
except:
return False
if data == '':
display.scroll(self.error_msg1)
return False
elif not data.endswith('ready\r\n'):
display.scroll(self.error_msg2)
return False
uart.write('AT+CWMODE_CUR=3\r\n')
data = self.read_until(1, 'OK\r\n', 1)
if not data.endswith('OK\r\n'):
display.scroll(self.error_msg2)
return False
#设置Wi-Fi模块连接无线网络
uart.write('AT+CWJAP_CUR="%s","%s"\r\n' %(SSID, PASSWORD))
display.scroll('wait...')
data = self.read_until(1, ['OK\r\n', 'FAIL\r\n'], TIMEOUT / 1000)
if data.endswith('OK\r\n') and data.find('WIFI GOT I') > -1:
pass
elif data.endswith('FAIL\r\n'):
if data.find('WIFI CONNECTED') > -1:
display.scroll('dhcp not available / ip conflict')
return False
else:
display.scroll('check wifi ssid / password')
return False
else:
display.scroll(self.error_msg2)
return False
return True
def ifconfig(self, retry=10):
if self.in_raw_repl:
return 'cannot get IP in raw mode'
retry_count = 0
while retry_count < retry:
#设置这个后 读串口返回绑定IP
uart.write('AT+CIFSR\r\n')
data = self.read_until(64, 'OK\r\n', 2)
if not data.endswith('OK\r\n'):
display.scroll(self.error_msg2)
elif data.find('STAIP') > -1:
temp = data.split('"')
if len(temp) > 5:
ip_address = temp
return ip_address
retry_count += 1
else:
retry_count += 1
return 'IP Error'
def httpSet(self, IP, PORT):
if self.in_raw_repl:
while not self.exit_raw_repl():
pass
while not self.in_raw_repl:
uart.write('AT+CIPSTART="TCP","%s",%s\r\n' %(IP, PORT))
data = self.read_until(1, ['CONNECT\r\n\r\nOK\r\n', 'ERROR\r\nCLOSED\r\n'], 6)
if data.endswith('ERROR\r\nCLOSED\r\n'):
display.scroll('server offline')
continue
elif not data.endswith('CONNECT\r\n\r\nOK\r\n'):
display.scroll(self.error_msg2)
continue
#设置WIFI模块为透传模式
uart.write('AT+CIPMODE=1\r\n')
data = self.read_until(1, 'OK\r\n', 0.5)
if not data.endswith('OK\r\n'):
display.scroll(self.error_msg2)
continue
#在透传模式下开始发送数据
uart.write('AT+CIPSEND\r\n')
data = self.read_until(1, 'OK\r\n\r\n>', 3)
if not data.endswith('OK\r\n\r\n>'):
display.scroll(self.error_msg2)
continue
self.in_raw_repl = True
return True
def get(self, url, TIMEOUT):
uart.write('GET /' + url + '\r\n\r\n')
#data = self.read_until(1, '', TIMEOUT / 1000)
timeout_count = 0
while not uart.any():
if timeout_count >= TIMEOUT:
return (404, 'Not Found')
sleep(1)
timeout_count += 1
data = str(uart.read(), 'UTF-8')
if data == '':
pass
elif not data.endswith('ERROR\r\n'):
return (200, data)
return (404, 'Not Found')
Obloq = Obloq()
#*************************************************
from microbit import *
import dht11
#设置服务器地址和端口
IP="192.168.1.10"
PORT="8080"
#设置当前环境网络账号和密码
SSID="my_wifi"
PASSWORD="1234"
#设置串口引脚,连接到wifi模块,好搭Bit发送在pin12(接模块R),接收在pin8(接模块T)
uart.init(baudrate=115200, bits=8, parity=None, stop=1, tx=pin12, rx=pin8)
while Obloq.connectWifi(SSID,PASSWORD,10000) != True:
display.show(".")
#点阵屏显示wifi module not detected,未检测到wifi模块,可能是wifi模块未连接、引脚设置错误或连接线接触不良等原因
display.scroll(Obloq.ifconfig()) #显示连上无线网络后wifi模块获取到的IP
Obloq.httpSet(IP,PORT)
#点阵屏显示server offline,可能是服务器未开启、防火墙阻挡或无线网络需要认证等原因,使用手机尝试打开网页http://(IP):(PORT)
watering = 0
while True:
temp,hum=dht11.read(pin0)
light=pin1.read_analog()
moisture=pin2.read_analog()
errno,resp = Obloq.get("input?id=l&val="+str(temp)+','+str(hum)+','+str(light)+','+str(moisture)+','+str(watering),500)
if errno == 200:
if resp=='2':
pin8.write_digital(0) #继电器低电平打开,高电平关闭
else:
pin8.write_digital(1)
sleep(2000)
else:
pin8.write_digital(1)
display.scroll(str(errno))
sleep(1000*5)
页:
[1]