Files
zjui-ece445/python/fetch.py

122 lines
4.2 KiB
Python

from influxdb import InfluxDBClient
import json
from coord_transform import wgs84_to_gcj02
import numpy as np
class AirQualityDB:
    """Fetch the latest air-quality readings from InfluxDB and cache them per device.

    On construction the object opens an InfluxDB client against the
    ``air_quality`` database and loads the most recent GPS position of every
    device.  Each query method also feeds its results into an internal
    per-device record cache, which can be retrieved as a dict
    (``get_records``) or as a GeoJSON string (``get_geojson``).

    Attributes:
        client: The ``InfluxDBClient`` used for all queries.
        gps_lat: ``{device_id: latitude}``; for devices that have both axes the
            value has been passed through ``wgs84_to_gcj02``.
        gps_lon: ``{device_id: longitude}``; same treatment as ``gps_lat``.
    """

    def __init__(self):
        # Per-device cache: {device_id: {'id': ..., 'time': ..., <measurement>: value}}
        self._records = {}
        self.client = InfluxDBClient('***REMOVED***', 8086, '***REMOVED***', '***REMOVED***', 'air_quality')
        self._get_gps_coordinates()

    @staticmethod
    def _nmea_to_degrees(value):
        """Convert an NMEA ``(d)ddmm.mmmm`` reading to decimal degrees.

        The conversion is done on the magnitude and the sign is re-applied
        afterwards, because Python's floor-based ``//`` and ``%`` would
        otherwise give a wrong result for negative readings.
        """
        magnitude = abs(value)
        degrees = (magnitude // 100) + (magnitude % 100) / 60
        return -degrees if value < 0 else degrees

    def _get_gps_coordinates(self):
        """Load the latest non-zero GPS fix of every device from the last 7 days.

        Populates ``self.gps_lat`` / ``self.gps_lon`` and records the converted
        (pre-``wgs84_to_gcj02``) values in the record cache.

        Returns:
            ``self``, to allow chaining.
        """
        self.gps_lat = {}
        self.gps_lon = {}
        result = self.client.query('select last(value) from gps_lat, gps_lon where value != 0 and time > now() - 7d group by id')
        for measurement, tags in result.keys():
            points = list(result.get_points(measurement, tags))
            if not points:
                # Nothing to read for this series; avoid IndexError on points[0].
                continue
            latest = points[0]
            device_id = tags['id']
            # Conversion from NMEA to real coordinates
            degrees = self._nmea_to_degrees(latest['last'])
            if measurement == 'gps_lat':
                self.gps_lat[device_id] = degrees
            elif measurement == 'gps_lon':
                self.gps_lon[device_id] = degrees
            self._add_record(device_id, latest['time'], measurement, degrees)
        for device_id in self.gps_lat:
            if device_id not in self.gps_lon:
                # A device that reported only one axis cannot be transformed
                # (and is never exposed as a position by the other methods).
                continue
            self.gps_lon[device_id], self.gps_lat[device_id] = wgs84_to_gcj02(
                self.gps_lon[device_id], self.gps_lat[device_id])
        print(self.gps_lat)
        print(self.gps_lon)
        return self

    def get_enabled_measurements(self):
        """Return the list of measurement names that are exposed to users."""
        return [
            'bme680_co2',
            'bme680_hum',
            'bme680_iaq',
            'bme680_prs',
            'bme680_tmp',
            'bme680_tvoc',
            'mics_co',
            # Disabled MICS_NH3 for inaccurate result
            # 'mics_nh3',
            'mics_no2',
            'pms5003_pm1',
            'pms5003_pm2_5',
            'pms5003_pm10',
            # Disabled since STM32's metrics are useless to users
            # 'stm32_tmp',
            # 'stm32_vbat',
            # 'stm32_vref',
        ]

    def get_results(self, measurement):
        """Fetch the latest valid value of ``measurement`` for every device.

        Values equal to the sentinel (65535 for ``pms5003*`` measurements, -1
        for everything else) and values older than 7 days are excluded by the
        query.  Every returned value is also stored in the record cache.

        Args:
            measurement: InfluxDB measurement name.  It is interpolated into
                the query text as-is.
                NOTE(review): callers should only pass trusted names (e.g. from
                ``get_enabled_measurements``) - confirm no user input gets here.

        Returns:
            A list of ``[measurement, device_id, time, lat, lon, value]`` rows;
            ``lat``/``lon`` are ``0, 0`` when the device has no known position.
        """
        invalid_value = 65535 if measurement.startswith('pms5003') else -1
        result = self.client.query(
            f'select last(value) from {measurement} where value != {invalid_value}'
            ' and time > now() - 7d group by id')
        results = []
        for _, tags in result.keys():
            points = list(result.get_points(measurement, tags))
            if not points:
                # Nothing to read for this series; avoid IndexError on points[0].
                continue
            value = points[0]['last']
            time = points[0]['time']
            device_id = tags['id']
            if device_id in self.gps_lat and device_id in self.gps_lon:
                lat, lon = self.gps_lat[device_id], self.gps_lon[device_id]
            else:
                lat, lon = 0, 0
            results.append([measurement, device_id, time, lat, lon, value])
            self._add_record(device_id, time, measurement, value)
        return results

    def get_wind(self):
        """Return all stored wind readings.

        Returns:
            A ``(directions, speeds)`` tuple of lists holding the ``value``
            field of every point in ``wind_direction`` and ``wind_speed``.
        """
        direction = self.client.query('select * from wind_direction')
        speed = self.client.query('select * from wind_speed')
        directions = [point['value'] for point in direction[('wind_direction', None)]]
        speeds = [point['value'] for point in speed[('wind_speed', None)]]
        return (directions, speeds)

    def _add_record(self, id, time, measurement, value):
        """Store ``value`` in the cached record of device ``id``.

        A device's record holds the values of a single timestamp: a newer
        ``time`` discards everything cached for the device, an older ``time``
        is ignored, and an equal ``time`` adds to the existing record.

        NOTE(review): ``time`` values are compared with ``<`` / ``>`` exactly
        as received.  If they are timestamp strings with varying fractional
        precision, lexical order may differ from chronological order - confirm.

        Returns:
            ``self``, to allow chaining.
        """
        record = self._records.get(id)
        # Only allow replacing with newer records
        if record is None or record['time'] < time:
            record = self._records[id] = {}
        elif record['time'] > time:
            return self
        record['id'] = id
        record['time'] = time
        record[measurement] = value
        return self

    def get_records(self):
        """Return the record cache as ``{device_id: record_dict}`` (not a copy)."""
        return self._records

    def get_geojson(self):
        """Serialize the cached records as a GeoJSON ``FeatureCollection`` string.

        Each device with a known latitude and longitude becomes one ``Point``
        feature with coordinates ``[lon, lat]`` and its cached record as
        ``properties``.  Devices without a full position are skipped.
        """
        features = [
            {
                "type": "Feature",
                "properties": record,
                "geometry": {
                    "type": "Point",
                    "coordinates": [
                        self.gps_lon[device_id],
                        self.gps_lat[device_id],
                    ],
                },
            }
            for device_id, record in self._records.items()
            if device_id in self.gps_lat and device_id in self.gps_lon
        ]
        geojson = {
            'type': 'FeatureCollection',
            'features': features,
        }
        return json.dumps(geojson)