hzxc 发表于 2023-4-20 14:10:57

物联网项目-智能终端程序

本帖最后由 hzxc 于 2023-9-17 23:37 编辑

室内环境实时监测系统(请勿点击代码末尾“复制代码”,手动选中程序进行复制,到BXY中修改并烧录程序)
#***************WIFI模块内部代码*************
class Obloq(object):
    def read_until(self, min_num_bytes, ending, timeout=10):
      timeout_count = 0
      while not uart.any():
            if timeout_count >= 1000 * timeout:
                return ''
            sleep(1)
            timeout_count += 1
      data = str(uart.read(min_num_bytes), 'UTF-8')
      while timeout_count < 1000 * timeout:
            if type(ending) == str:
                if data.endswith(ending):
                  break
            elif type(ending) == list:
                end = False
                for i in ending:
                  if data.endswith(i):
                        end = True
                        break
                if end:
                  break
            if uart.any():
                new_data = str(uart.read(min_num_bytes), 'UTF-8')
                data += new_data
                timeout_count = 0
            else:
                sleep(1)
                timeout_count += 1
      return data

    def exit_raw_repl(self):
      uart.write('+++')
      data = self.read_until(1, '+++', 1)
      if data == '':
            pass
      elif not data.endswith('+++'):
            return False
      self.in_raw_repl = False
      return True

    error_msg1 = 'wifi module not detected'
    error_msg2 = '...' #disconnect from USB wire & restart haoda:bit
    in_raw_repl = True
    def connectWifi(self, SSID, PASSWORD, TIMEOUT=10000):
      if self.in_raw_repl:
            while not self.exit_raw_repl():
                pass
      uart.write('AT+RST\r\n')
      try:
            data = self.read_until(256, 'ready\r\n', 5)
      except:
            return False
      if data == '':
            display.scroll(self.error_msg1)
            return False
      elif not data.endswith('ready\r\n'):
            display.scroll(self.error_msg2)
            return False
      uart.write('AT+CWMODE_CUR=3\r\n')
      data = self.read_until(1, 'OK\r\n', 1)
      if not data.endswith('OK\r\n'):
            display.scroll(self.error_msg2)
            return False
      #设置Wi-Fi模块连接无线网络
      uart.write('AT+CWJAP_CUR="%s","%s"\r\n' %(SSID, PASSWORD))
      display.scroll('wait...')
      data = self.read_until(1, ['OK\r\n', 'FAIL\r\n'], TIMEOUT / 1000)
      if data.endswith('OK\r\n') and data.find('WIFI GOT I') > -1:
            pass
      elif data.endswith('FAIL\r\n'):
            if data.find('WIFI CONNECTED') > -1:
                display.scroll('dhcp not available / ip conflict')
                return False
            else:
                display.scroll('check wifi ssid / password')
                return False
      else:
            display.scroll(self.error_msg2)
            return False
      return True

    def ifconfig(self, retry=10):
      if self.in_raw_repl:
            return 'cannot get IP in raw mode'
      retry_count = 0
      while retry_count < retry:
            #设置这个后 读串口返回绑定IP
            uart.write('AT+CIFSR\r\n')
            data = self.read_until(64, 'OK\r\n', 2)
            if not data.endswith('OK\r\n'):
                display.scroll(self.error_msg2)
            elif data.find('STAIP') > -1:
                temp = data.split('"')
                if len(temp) > 5:
                  ip_address = temp
                  return ip_address
                retry_count += 1
            else:
                retry_count += 1
      return 'IP Error'

    def httpSet(self, IP, PORT):
      if self.in_raw_repl:
            while not self.exit_raw_repl():
                pass
      while not self.in_raw_repl:
            uart.write('AT+CIPSTART="TCP","%s",%s\r\n' %(IP, PORT))
            data = self.read_until(1, ['CONNECT\r\n\r\nOK\r\n', 'ERROR\r\nCLOSED\r\n'], 6)
            if data.endswith('ERROR\r\nCLOSED\r\n'):
                display.scroll('server offline')
                continue
            elif not data.endswith('CONNECT\r\n\r\nOK\r\n'):
                display.scroll(self.error_msg2)
                continue
            #设置WIFI模块为透传模式
            uart.write('AT+CIPMODE=1\r\n')
            data = self.read_until(1, 'OK\r\n', 0.5)
            if not data.endswith('OK\r\n'):
                display.scroll(self.error_msg2)
                continue
            #在透传模式下开始发送数据
            uart.write('AT+CIPSEND\r\n')
            data = self.read_until(1, 'OK\r\n\r\n>', 3)
            if not data.endswith('OK\r\n\r\n>'):
                display.scroll(self.error_msg2)
                continue
            self.in_raw_repl = True
      return True

    def get(self, url, TIMEOUT):
      uart.write('GET /' + url + '\r\n\r\n')
      #data = self.read_until(1, '', TIMEOUT / 1000)
      timeout_count = 0
      while not uart.any():
            if timeout_count >= TIMEOUT:
                return (404, 'Not Found')
            sleep(1)
            timeout_count += 1
      data = str(uart.read(), 'UTF-8')
      if data == '':
            pass
      elif not data.endswith('ERROR\r\n'):
            return (200, data)
      return (404, 'Not Found')

Obloq = Obloq()
#*************************************************
from microbit import *
#LM35连接在pin2
#设置服务器地址和端口
IP="192.168.1.10"
PORT="8080"
#设置当前环境网络账号和密码
SSID="my_wifi"
PASSWORD="1234"
#设置串口引脚,连接到wifi模块,好搭Bit发送在pin12(接模块R),接收在pin8(接模块T)
uart.init(baudrate=115200, bits=8, parity=None, stop=1, tx=pin12, rx=pin8)

while Obloq.connectWifi(SSID,PASSWORD,10000) != True:
    display.show(".")
#点阵屏显示wifi module not detected,未检测到wifi模块,可能是wifi模块未连接、引脚设置错误或连接线接触不良等原因

display.scroll(Obloq.ifconfig()) #显示连上无线网络后wifi模块获取到的IP
Obloq.httpSet(IP,PORT)
#点阵屏显示server offline,可能是服务器未开启、防火墙阻挡或无线网络需要认证等原因,使用手机尝试打开网页http://(IP):(PORT)

while True:
    errno,resp = Obloq.get("input?id=1&val=" + str("%.1f"%(pin2.read_analog()*3.3/10.24)), 10000)
    if errno == 200:
      display.scroll(resp)
    else:
      display.scroll(str(errno))
    sleep(1000*5)

盆栽伴侣(请勿点击代码末尾“复制代码”,手动选中程序进行复制,到BXY中修改并烧录程序)#***************WIFI模块内部代码*************
class Obloq(object):
    def read_until(self, min_num_bytes, ending, timeout=10):
      timeout_count = 0
      while not uart.any():
            if timeout_count >= 1000 * timeout:
                return ''
            sleep(1)
            timeout_count += 1
      data = str(uart.read(min_num_bytes), 'UTF-8')
      while timeout_count < 1000 * timeout:
            if type(ending) == str:
                if data.endswith(ending):
                  break
            elif type(ending) == list:
                end = False
                for i in ending:
                  if data.endswith(i):
                        end = True
                        break
                if end:
                  break
            if uart.any():
                new_data = str(uart.read(min_num_bytes), 'UTF-8')
                data += new_data
                timeout_count = 0
            else:
                sleep(1)
                timeout_count += 1
      return data

    def exit_raw_repl(self):
      uart.write('+++')
      data = self.read_until(1, '+++', 1)
      if data == '':
            pass
      elif not data.endswith('+++'):
            return False
      self.in_raw_repl = False
      return True

    error_msg1 = 'wifi module not detected'
    error_msg2 = '...' #disconnect from USB wire & restart haoda:bit
    in_raw_repl = True
    def connectWifi(self, SSID, PASSWORD, TIMEOUT=10000):
      if self.in_raw_repl:
            while not self.exit_raw_repl():
                pass
      uart.write('AT+RST\r\n')
      try:
            data = self.read_until(256, 'ready\r\n', 5)
      except:
            return False
      if data == '':
            display.scroll(self.error_msg1)
            return False
      elif not data.endswith('ready\r\n'):
            display.scroll(self.error_msg2)
            return False
      uart.write('AT+CWMODE_CUR=3\r\n')
      data = self.read_until(1, 'OK\r\n', 1)
      if not data.endswith('OK\r\n'):
            display.scroll(self.error_msg2)
            return False
      #设置Wi-Fi模块连接无线网络
      uart.write('AT+CWJAP_CUR="%s","%s"\r\n' %(SSID, PASSWORD))
      display.scroll('wait...')
      data = self.read_until(1, ['OK\r\n', 'FAIL\r\n'], TIMEOUT / 1000)
      if data.endswith('OK\r\n') and data.find('WIFI GOT I') > -1:
            pass
      elif data.endswith('FAIL\r\n'):
            if data.find('WIFI CONNECTED') > -1:
                display.scroll('dhcp not available / ip conflict')
                return False
            else:
                display.scroll('check wifi ssid / password')
                return False
      else:
            display.scroll(self.error_msg2)
            return False
      return True

    def ifconfig(self, retry=10):
      if self.in_raw_repl:
            return 'cannot get IP in raw mode'
      retry_count = 0
      while retry_count < retry:
            #设置这个后 读串口返回绑定IP
            uart.write('AT+CIFSR\r\n')
            data = self.read_until(64, 'OK\r\n', 2)
            if not data.endswith('OK\r\n'):
                display.scroll(self.error_msg2)
            elif data.find('STAIP') > -1:
                temp = data.split('"')
                if len(temp) > 5:
                  ip_address = temp
                  return ip_address
                retry_count += 1
            else:
                retry_count += 1
      return 'IP Error'

    def httpSet(self, IP, PORT):
      if self.in_raw_repl:
            while not self.exit_raw_repl():
                pass
      while not self.in_raw_repl:
            uart.write('AT+CIPSTART="TCP","%s",%s\r\n' %(IP, PORT))
            data = self.read_until(1, ['CONNECT\r\n\r\nOK\r\n', 'ERROR\r\nCLOSED\r\n'], 6)
            if data.endswith('ERROR\r\nCLOSED\r\n'):
                display.scroll('server offline')
                continue
            elif not data.endswith('CONNECT\r\n\r\nOK\r\n'):
                display.scroll(self.error_msg2)
                continue
            #设置WIFI模块为透传模式
            uart.write('AT+CIPMODE=1\r\n')
            data = self.read_until(1, 'OK\r\n', 0.5)
            if not data.endswith('OK\r\n'):
                display.scroll(self.error_msg2)
                continue
            #在透传模式下开始发送数据
            uart.write('AT+CIPSEND\r\n')
            data = self.read_until(1, 'OK\r\n\r\n>', 3)
            if not data.endswith('OK\r\n\r\n>'):
                display.scroll(self.error_msg2)
                continue
            self.in_raw_repl = True
      return True

    def get(self, url, TIMEOUT):
      uart.write('GET /' + url + '\r\n\r\n')
      #data = self.read_until(1, '', TIMEOUT / 1000)
      timeout_count = 0
      while not uart.any():
            if timeout_count >= TIMEOUT:
                return (404, 'Not Found')
            sleep(1)
            timeout_count += 1
      data = str(uart.read(), 'UTF-8')
      if data == '':
            pass
      elif not data.endswith('ERROR\r\n'):
            return (200, data)
      return (404, 'Not Found')

Obloq = Obloq()
#*************************************************
from microbit import *
import dht11
#设置服务器地址和端口
IP="192.168.1.10"
PORT="8080"
#设置当前环境网络账号和密码
SSID="my_wifi"
PASSWORD="1234"
#设置串口引脚,连接到wifi模块,好搭Bit发送在pin12(接模块R),接收在pin8(接模块T)
uart.init(baudrate=115200, bits=8, parity=None, stop=1, tx=pin12, rx=pin8)

while Obloq.connectWifi(SSID,PASSWORD,10000) != True:
    display.show(".")
#点阵屏显示wifi module not detected,未检测到wifi模块,可能是wifi模块未连接、引脚设置错误或连接线接触不良等原因

display.scroll(Obloq.ifconfig()) #显示连上无线网络后wifi模块获取到的IP
Obloq.httpSet(IP,PORT)
#点阵屏显示server offline,可能是服务器未开启、防火墙阻挡或无线网络需要认证等原因,使用手机尝试打开网页http://(IP):(PORT)

watering = 0

while True:
    temp,hum=dht11.read(pin0)
    light=pin1.read_analog()
    moisture=pin2.read_analog()
    errno,resp = Obloq.get("input?id=l&val="+str(temp)+','+str(hum)+','+str(light)+','+str(moisture)+','+str(watering),500)
    if errno == 200:
      if resp=='2':
            pin8.write_digital(0) #继电器低电平打开,高电平关闭
      else:
            pin8.write_digital(1)
      sleep(2000)
    else:
      pin8.write_digital(1)
      display.scroll(str(errno))
    sleep(1000*5)
页: [1]
查看完整版本: 物联网项目-智能终端程序