簡易物聯網系統實作(1)
-
本單元重點在於引導學生如何將感測器的資料透過Python程式碼送出到教師架設的客製化網站,完成資料的傳輸與儲存。
# Example 1: poll a DHT11 sensor and print the temperature until Ctrl-C.
import time

import Adafruit_DHT

# GPIO pin number handed to the DHT11 read call.
GPIO_PIN = 4

try:
    print("--------------")
    while True:
        # The two results are unpacked as (h, t); both are None when the
        # library could not obtain a reading.
        h, t = Adafruit_DHT.read_retry(Adafruit_DHT.DHT11, GPIO_PIN)
        if h is not None and t is not None:
            print("temp={0:0.1f}".format(t))
        else:
            print("fail,please retry")
        # Wait 10 seconds between polls.
        time.sleep(10)
except KeyboardInterrupt:
    # Ctrl-C is the intended way to stop the loop. The exception name must be
    # spelled correctly, otherwise the handler itself raises NameError.
    print('close')
# Example 2: read the DHT11 temperature once a minute and POST it to the
# class web site as a form submission.
import requests
import time
import uuid
import Adafruit_DHT

LOGIN_URL = 'http://drweb.nksh.tp.edu.tw:3000/create/'
# Seconds to wait for the server before giving up on a request, so a stalled
# connection cannot hang the loop forever.
REQUEST_TIMEOUT = 10
# GPIO pin number handed to the DHT11 read call.
GPIO_PIN = 4

headers = {
    'accept': 'text/html,application/xhtml+xml,application/xml',
    'user-agent': 'Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/67.0.3396.99 Safari/537.36'
}

# Fetch the form page first so the cookies it sets can be replayed on the
# POST requests below.
response = requests.get(LOGIN_URL, headers=headers, verify=False,
                        timeout=REQUEST_TIMEOUT)
headers['cookie'] = '; '.join(x.name + '=' + x.value for x in response.cookies)
headers['content-type'] = 'application/x-www-form-urlencoded'

# Last 12 hex digits of the node id reported by uuid.getnode(); it does not
# change while the script runs, so compute it once.
mac_address = uuid.UUID(int=uuid.getnode()).hex[-12:]

try:
    print("--------------")
    while True:
        h, t = Adafruit_DHT.read_retry(Adafruit_DHT.DHT11, GPIO_PIN)
        if h is not None and t is not None:
            print("temp={0:0.1f}".format(t))
            payload = {
                'value1': 0,
                'value2': format(t),
                'value3': 0,
                'mac_address': mac_address
            }
            # Upload only when this iteration produced a reading. Posting
            # unconditionally would hit an undefined `payload` on a first
            # failed read, or re-send a stale one later.
            try:
                response = requests.post(LOGIN_URL, data=payload,
                                         headers=headers, verify=False,
                                         timeout=REQUEST_TIMEOUT)
                headers['cookie'] = '; '.join(
                    x.name + '=' + x.value for x in response.cookies)
            except requests.RequestException as e:
                # A network failure should not end the monitoring loop; the
                # next reading is attempted after the usual delay.
                print("upload failed: {0}".format(e))
        else:
            print("fail,please retry")
        time.sleep(60)
except KeyboardInterrupt:
    print('close')
# Example 3: read one frame from a G5T particulate sensor through pigpio's
# bit-banged serial reader, POST the PM2.5 value to a web server and append
# the full reading to a CSV record file.
# NOTE: this script targets Python 2 (it relies on `commands` and `httplib`).
import os
import csv
import time
import pigpio
import httplib
import urllib
import logging
import commands
from datetime import datetime
import requests


def bytes2hex(s):
    """Return the items of *s* joined as two-digit lowercase hex.

    Each item is formatted with ``{:02x}``, so *s* must yield integers
    (presumably a bytearray as handed back by pigpio -- TODO confirm).
    """
    return "".join("{:02x}".format(c) for c in s)


def decode_data(dstr):
    """Decode one sensor frame from the hex string *dstr*.

    Returns ``(value, check)``. ``value`` is the list
    ``[pm1, pm2.5, pm10, temperature, humidity]`` and ``check`` is ``1`` on
    success. On failure ``check`` is ``-1`` and every entry of ``value`` is
    ``-1``. Callers must compare ``check`` with ``1`` explicitly, because
    ``-1`` is truthy.
    """
    value = [-1, -1, -1, -1, -1]
    ## style of standard data ##
    standard_head = "424d001c"
    ## length of standard data (hex characters) ##
    length = 64
    ## index of reading ##
    index = dstr.find(standard_head)
    # A complete frame must fit between the header and the end of the buffer.
    # Checking only the total length would accept a header that appears too
    # late, leaving short slices that int() cannot parse.
    if index == -1 or len(dstr) - index < length:
        ## sense failed (missing data) ##
        return value, -1

    ## sense successfully ##
    data_slice = dstr[index:index + length]
    ## pm1 ##
    value[0] = int(data_slice[20:24], 16)
    ## pm2.5 ##
    value[1] = int(data_slice[24:28], 16)
    ## pm10 ##
    value[2] = int(data_slice[28:32], 16)
    # Divide by 10.0 rather than 10 so the tenths digit survives Python 2
    # integer division.
    ## temperature ##
    value[3] = int(data_slice[48:52], 16) / 10.0
    ## humidity ##
    value[4] = int(data_slice[52:56], 16) / 10.0
    return value, 1


if __name__ == '__main__':
    ########## Logger Part ##########
    ## set debug log file ##
    logging.basicConfig(filename="/home/pi/PiM25/debug.log",
                        format='%(asctime)s - %(levelname)s - %(message)s',
                        filemode='w',
                        datefmt='%Y-%m-%d %H:%M:%S')
    ## creating an logging object ##
    logger = logging.getLogger()
    ## setting the threshold of logger to DEBUG ##
    logger.setLevel(logging.DEBUG)
    ##############################

    ########## PIGPIO Part ##########
    ## initial PIGPIO library ##
    (status, process) = commands.getstatusoutput('sudo pidof pigpiod')
    if status:
        # Non-zero exit status: no pigpiod process was found, so start one.
        logger.warning("PIGPIO was not running")
        commands.getstatusoutput('sudo pigpiod')
        time.sleep(0.1)
    else:
        ## PIGPIO start to run ##
        logger.info("PIGPIO is running, process ID is %s", process)

    pi = pigpio.pi()
    logger.info("Start to sense environment")
    ##############################

    ########## Setting Part ##########
    # NOTE(review): hard-coded credential; it is referenced only by the
    # disabled ThingSpeak upload further down.
    API_KEY = "L1YL098LWTXB3K0G"
    G5T_GPIO = 23
    DATA_PATH = "/home/pi/Data/"
    DEVICE_IP = commands.getoutput("hostname -I")
    # The device id is the eth0 hardware address, upper-cased with the
    # colons removed; `with` closes the file handle afterwards.
    with open('/sys/class/net/eth0/address') as address_file:
        DEVICE_ID = address_file.readline().upper().strip().replace(':', '')
    # URL and timeout (seconds) for the PM2.5 upload.
    UPLOAD_URL = 'http://200.1.1.22:8080/create/'
    UPLOAD_TIMEOUT = 10

    # `counter` is incremented at the end of the body, so the loop performs
    # exactly one measurement cycle.
    counter = 0
    while counter == 0:
        READING_TIME = datetime.now().strftime("%Y-%m-%d %H:%M:%S").split(" ")

        ## sensor information ##
        sensor_info = {
            "PM1": -1,
            "PM25": -1,
            "PM10": -1,
            "TEMP": -1,
            "HUM": -1,
            "DATE": READING_TIME[0],
            "TIME": READING_TIME[1],
            "DEVICE_ID": DEVICE_ID,
            "DEVICE_IP": DEVICE_IP
        }

        logger.info("Reading time: %s %s", READING_TIME[0], READING_TIME[1])
        logger.info("==================================")
        ##############################

        ########## G5T Part ##########
        # Best-effort close in case the GPIO was left open earlier; an error
        # here just means there was nothing to close.
        try:
            pi.bb_serial_read_close(G5T_GPIO)
        except Exception:
            pass
        logger.info("G5T port has already closed")

        # Set once a frame has been decoded, so the upload below never sends
        # the -1 placeholder values.
        decoded = False
        try:
            pi.bb_serial_read_open(G5T_GPIO, 9600)
            time.sleep(1)
            (status, G5T_raw_data) = pi.bb_serial_read(G5T_GPIO)
            if status:
                logger.info("G5T read data successfully")
                hex_data = bytes2hex(G5T_raw_data)
                (value, check) = decode_data(hex_data)
                # decode_data reports failure as -1, which is truthy, so the
                # success code has to be compared explicitly.
                if check == 1:
                    logger.info("@@ PM1.0 DATA: %s", str(value[0]))
                    logger.info("@@ PM2.5 DATA: %s", str(value[1]))
                    logger.info("@@ PM10 DATA: %s", str(value[2]))
                    logger.info("@@ TEMP DATA: %s", str(value[3]))
                    logger.info("@@ HUM DATA: %s", str(value[4]))

                    sensor_info["PM1"] = value[0]
                    sensor_info["PM25"] = value[1]
                    sensor_info["PM10"] = value[2]
                    sensor_info["TEMP"] = value[3]
                    sensor_info["HUM"] = value[4]
                    decoded = True
                else:
                    logger.error("G5T occur missing data")
            else:
                logger.error("G5T read data failed")
        except Exception as e:
            logger.error("G5T port open failed, error msg: %s", e)

        try:
            pi.bb_serial_read_close(G5T_GPIO)
        except Exception:
            pass
        logger.info("G5T port close successfully")
        logger.info("==================================")
        ##############################

        ########## Web Upload ##########
        # The upload sits outside the serial try-block so a network error is
        # logged as an upload failure and cannot stop PM10/TEMP/HUM from
        # being recorded.
        if decoded:
            d = {'value': sensor_info["PM25"]}
            try:
                r = requests.post(UPLOAD_URL, data=d, timeout=UPLOAD_TIMEOUT)
            except requests.RequestException as e:
                logger.error("Upload to %s failed, error msg: %s",
                             UPLOAD_URL, e)
            print(sensor_info["PM25"])
        ##############################

        ########## Store MSG #############
        sensor_fields = sensor_info.keys()
        record_path = DATA_PATH + "record.csv"
        # The header row is written only when the record file is created.
        is_new_file = not os.path.exists(record_path)
        if is_new_file:
            logger.info("Write a new record file")

        with open(record_path, "a") as output_file:
            try:
                dict_writer = csv.DictWriter(output_file, sensor_fields)
                if is_new_file:
                    dict_writer.writeheader()
                dict_writer.writerows([sensor_info])
                logger.info("Storing data successfully")
            except Exception as e:
                logger.error("Storing data failed, error msg: %s", e)
        logger.info("==================================")
        ###################################

        ########## Upload Part (disabled ThingSpeak upload) ############
        #params = urllib.urlencode({"field1": sensor_info["PM1"],
        #                           "field2": sensor_info["PM25"],
        #                           "field3": sensor_info["PM10"],
        #                           "field4": sensor_info["TEMP"],
        #                           "field5": sensor_info["HUM"],
        #                           "key": API_KEY})
        #headers = {"Content-type": "application/x-www-form-urlencoded","Accept": "text/plain"}
        #conn = httplib.HTTPConnection("api.thingspeak.com:80")
        #logger.info("Upload to ThinkSpeak, key: %s", API_KEY)
        #try:
        #    conn.request("POST", "/update", params, headers)
        #    response = conn.getresponse()
        #    logger.info("Upload response: %s, %s", response.status, response.reason)
        #    conn.close()
        #except:
        #    logger.error("Connection Failed")
        #logger.info("==================================")
        ###################################

        #logger.info("Sense Over")
        #time.sleep(30)
        counter += 1

    pi.stop()