簡易空氣盒子系統實作
-
import os import csv import time import pigpio import httplib import urllib import logging import commands from datetime import datetime def bytes2hex(s): return "".join("{:02x}".format(c) for c in s) def decode_data(dstr): value = [-1, -1, -1, -1, -1] ## style of standard data ## standard_head = "424d001c" ## length of standard data ## length = 64 ## index of reading ## index = dstr.find(standard_head) if(index == -1 or len(dstr) < length): ## sense failed (missing data) ## return value, -1 else: ## sense successfully ## data_slice = dstr[index : index + length] ## pm1 ## value[0] = (int(data_slice[20] + data_slice[21] + data_slice[22] + data_slice[23], 16)) ## pm2.5 ## value[1] = (int(data_slice[24] + data_slice[25] + data_slice[26] + data_slice[27], 16)) ## pm10 ## value[2] = (int(data_slice[28] + data_slice[29] + data_slice[30] + data_slice[31], 16)) ## temperature ## value[3] = (int(data_slice[48] + data_slice[49] + data_slice[50] + data_slice[51], 16) / 10) ## humidity ## value[4] = (int(data_slice[52] + data_slice[53] + data_slice[54] + data_slice[55], 16) / 10) return value, 1 if __name__ == '__main__': ########## Logger Part ########## ## set debug log file ## logging.basicConfig(filename="/home/pi/PiM25/debug.log", format='%(asctime)s - %(levelname)s - %(message)s', filemode='w', datefmt='%Y-%m-%d %H:%M:%S') ## creating an logging object ## logger = logging.getLogger() ## setting the threshold of logger to DEBUG ## logger.setLevel(logging.DEBUG) ############################## ########## PIGPIO Part ########## ## initial PIGPIO library ## (status, process) = commands.getstatusoutput('sudo pidof pigpiod') if status: logger.warning("PIGPIO was not running") commands.getstatusoutput('sudo pigpiod') time.sleep(0.1) ## PIGPIO start to run ## if not status: logger.info("PIGPIO is running, process ID is %s", process) pi = pigpio.pi() logger.info("Start to sense environment") ############################## ########## Setting Part ########## API_KEY = "L1YL098LWTXB3K0G" G5T_GPIO = 23 DATA_PATH = "/home/pi/Data/" DEVICE_IP = commands.getoutput("hostname -I") DEVICE_ID = open('/sys/class/net/eth0/address').readline().upper().strip().replace(':','') while(1): READING_TIME = datetime.now().strftime("%Y-%m-%d %H:%M:%S").split(" ") ## sensor information ## sensor_info = { "PM1" : -1, \ "PM25" : -1, \ "PM10" : -1, \ "TEMP" : -1, \ "HUM" : -1, \ "DATE" : READING_TIME[0], \ "TIME" : READING_TIME[1], \ "DEVICE_ID" : DEVICE_ID, \ "DEVICE_IP" : DEVICE_IP } logger.info("Reading time: %s %s", READING_TIME[0], READING_TIME[1]) logger.info("==================================") ############################## ########## G5T Part ########## try: pi.bb_serial_read_close(G5T_GPIO) except Exception: pass logger.info("G5T port has already closed") try: pi.bb_serial_read_open(G5T_GPIO, 9600) time.sleep(1) (status, G5T_raw_data) = pi.bb_serial_read(G5T_GPIO) if status: logger.info("G5T read data successfully") hex_data = bytes2hex(G5T_raw_data) (value, check) = decode_data(hex_data) if check: logger.info("@@ PM1.0 DATA: %s", str(value[0])) logger.info("@@ PM2.5 DATA: %s", str(value[1])) logger.info("@@ PM10 DATA: %s", str(value[2])) logger.info("@@ TEMP DATA: %s", str(value[3])) logger.info("@@ HUM DATA: %s", str(value[4])) sensor_info["PM1"] = value[0] sensor_info["PM25"] = value[1] sensor_info["PM10"] = value[2] sensor_info["TEMP"] = value[3] sensor_info["HUM"] = value[4] else: logger.error("G5T occur missing data") else: logger.error("G5T read data failed") except Exception as e: logger.error("G5T port open failed, error msg: %s", e) try: pi.bb_serial_read_close(G5T_GPIO) except Exception: pass logger.info("G5T port close successfully") logger.info("==================================") ############################## ########## Store MSG ############# sensor_fields = sensor_info.keys() if os.path.exists(DATA_PATH + "record.csv") is False: logger.info("Write a new record file") with open(DATA_PATH + "record.csv", "a") as output_file: dict_writer = csv.DictWriter(output_file, sensor_fields) dict_writer.writeheader() with open(DATA_PATH + "record.csv", "a") as output_file: try: dict_writer = csv.DictWriter(output_file, sensor_fields) dict_writer.writerows([sensor_info]) logger.info("Storing data successfully") except Exception as e: logger.error("Storing data failed, error msg: %s", e) logger.info("==================================") ################################### ########## Upload Part ############ params = urllib.urlencode({"field1": sensor_info["PM1"], "field2": sensor_info["PM25"], "field3": sensor_info["PM10"], "field4": sensor_info["TEMP"], "field5": sensor_info["HUM"], "key": API_KEY}) headers = {"Content-typZZe": "application/x-www-form-urlencoded","Accept": "text/plain"} conn = httplib.HTTPConnection("api.thingspeak.com:80")ˇ logger.info("Upload to ThinkSpeak, key: %s", API_KEY) try: conn.request("POST", "/update", params, headers) response = conn.getresponse() logger.info("Upload response: %s, %s", response.status, response.reason) conn.close() except: logger.error("Connection Failed") logger.info("==================================") ################################### logger.info("Sense Over") time.sleep(30) pi.stop()
-
如何使用感測器測量溫度?
-
如何使用網站收集資料?
2345
-
如何使用感測器測量PM2.5值?
1234
-
如何從網站擷取資料判斷是否超標?
1234
-
如何使用感測器測量溫度?