-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy pathgetHT.py
More file actions
111 lines (101 loc) · 3.73 KB
/
getHT.py
File metadata and controls
111 lines (101 loc) · 3.73 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
import Adafruit_DHT
import csv
import datetime
import math
import requests
import sys
import time
'''
function: write [datetime, temperature, humidity] in csv
ipt: dhtwriter, temperature, humidity
opt: none
'''
def WriteInCSV(dhtwriter, temperature, humidity):
dhtwriter.writerow([datetime.datetime.now().strftime('%y %m %d %H-%M-%S'),
'{:0.1f}'.format(temperature),
'{:0.1f}'.format(humidity)])
# set Ubidots uploading variables
TOKEN = "<ubidots-token>" # Put your TOKEN here
DEVICE_LABEL = "Raspi_DHT11" # Put your device label here
VARIABLE_LABEL_1 = "temperature" # Put your first variable label here
VARIABLE_LABEL_2 = "humidity" # Put your second variable label here
'''
function: build payload(the essencial datas need upload to ubidots)
ipt: VARIABLE_LABEL_1, VARIABLE_LABEL_2, temperature, humidity
opt: payload object in dict
'''
def build_payload(variable_1, variable_2, var_temperature, var_humidity):
payload = {variable_1: var_temperature,
variable_2: var_humidity}
return payload
'''
function: exec post_request (to ubidots)
ipt: payload
opt: none
'''
def post_request(payload):
# Creates the headers for the HTTP requests
url = "http://industrial.api.ubidots.com"
url = "{}/api/v1.6/devices/{}".format(url, DEVICE_LABEL)
headers = {"X-Auth-Token": TOKEN, "Content-Type": "application/json"}
# Makes the HTTP requests
status = 400
attempts = 0
while status >= 400 and attempts <= 5:
req = requests.post(url=url, headers=headers, json=payload)
status = req.status_code
attempts += 1
time.sleep(1)
# Processes results
if status >= 400:
print("[ERROR] Could not send data after 5 attempts, please check"
"your token credentials and internet connection.")
return False
print("[INFO] request made properly, your device is updated.")
return True
# set Adafruit_DHT's varibles
sensor = Adafruit_DHT.DHT11
pin = 4
'''
Main Execting codes
'''
# get sencor data from DHT11
try:
print('[INFO]press Ctrl-C to abort the process.')
while True:
# Get sensor data. Use the read_retry method which will retry up
# to 15 times to get a sensor reading (waiting 2 seconds between each retry).
humidity, temperature = Adafruit_DHT.read_retry(sensor, pin)
# Check sensor detecting correctly
if humidity is not None and temperature is not None:
print('[SENSOR]Temperature={0:0.1f}*C Humidity={1:0.1f}%'.format(temperature, humidity))
# Record in CSV
flag = True
while flag:
try:
# write in csv with append mode
dhtfile = open('dht11_record.csv', 'a')
# create a csv writer object
dhtwriter = csv.writer(dhtfile, dialect='excel')
# function: write [datetime, temperature, humidity] in csv
WriteInCSV(dhtwriter, temperature, humidity)
flag = False
except:
print("[ERROR]CSV.write failed. Try again!")
flag = True
finally:
dhtfile.close()
# function: build ubidots payload
payload = build_payload(
VARIABLE_LABEL_1, VARIABLE_LABEL_2, temperature, humidity)
# posting data to ubidots cloud
print("[INFO] Attemping to send data.")
# function: exec post_request
post_request(payload)
print("[INFO] Post finished.")
else:
print('[ERROR]DHT.read failed. Try again!')
continue
time.sleep(3600)
except KeyboardInterrupt:
print('[INFO]Abort!')