from flask import Flask, request, jsonify
import subprocess
import csv
import pymysql
import mysql.connector
import requests
from datetime import datetime, timedelta
import math
import json
import os
import codecs
import quopri
from werkzeug.utils import secure_filename
from email.header import decode_header

app = Flask(__name__)

# Upstream "shared data" (SQ) service host used by the weather endpoints.
SQServerHost = "http://111.4.141.191:18081"


@app.after_request
def add_header(response):
    """Attach permissive CORS headers to every response."""
    response.headers['Access-Control-Allow-Origin'] = '*'
    response.headers['Access-Control-Allow-Headers'] = 'Content-Type'
    return response


class SensorData:
    """One sensor reading row exported from the Access (.mdb) database."""

    def __init__(self, sample_time, recv_time, node_name, node_value,
                 node_type, node_unit, device_id, channel_id, project_id):
        self.sample_time = sample_time
        self.recv_time = recv_time
        self.node_name = node_name
        self.node_value = node_value
        self.node_type = node_type
        self.node_unit = node_unit
        self.device_id = device_id
        self.channel_id = channel_id
        self.project_id = project_id

    def __repr__(self):
        # Compact representation, handy when printing lists of readings.
        return f"{self.sample_time} - {self.node_name}: {self.node_value}"


def GetStartEndTime(deltaHour=6):
    """Return (end, start) timestamps as 'YYYY-MM-DD HH:MM:SS' strings.

    The end time is the current local time truncated to the hour; the start
    time is `deltaHour` hours earlier. Note the tuple order: end first.
    """
    current_time = datetime.now()
    # Truncate to the top of the current hour (minutes/seconds zeroed).
    rounded_time = datetime(current_time.year, current_time.month,
                            current_time.day, current_time.hour, 0, 0)
    previous_time = rounded_time - timedelta(hours=deltaHour)
    return (rounded_time.strftime('%Y-%m-%d %H:%M:%S'),
            previous_time.strftime('%Y-%m-%d %H:%M:%S'))


@app.route('/sync_from_access', methods=['POST'])
def sync_from_access():
    """Export the AcsData table from test.mdb via mdb-export and mirror the
    newest rows into the MySQL sensor_data table, skipping duplicates."""
    sensor_data_list = []
    mdb_file = 'test.mdb'
    table_name = 'AcsData'
    output_file = 'output.csv'
    # mdb-export dumps the Access table as CSV on stdout.
    cmd = ['mdb-export', mdb_file, table_name]
    try:
        result = subprocess.run(cmd, stdout=subprocess.PIPE,
                                stderr=subprocess.PIPE,
                                universal_newlines=True, check=True)
        with open(output_file, 'w', encoding='utf-8') as f:
            f.write(result.stdout)
    except subprocess.CalledProcessError as e:
        print(f"An error occurred: {e}")
        # With universal_newlines=True, e.stderr is already str — the old
        # code called .decode('utf-8') on it, which raised AttributeError.
        with open(output_file, 'w', encoding='utf-8') as f:
            f.write(e.stderr)

    conn = mysql.connector.connect(
        host="localhost",
        user="root",
        password="HelloWorld123",
        database="water"
    )
    conn.set_charset_collation('utf8')

    csv_file = 'output.csv'
    table_name = 'sensor_data'
    cursor = None  # keeps the finally block safe if we fail before cursor()
    try:
        with open(csv_file, mode='r', encoding='utf-8') as file:
            reader = csv.reader(file)
            next(reader)  # skip the CSV header row
            cursor = conn.cursor()
            for index, row in enumerate(reader):
                print(index)
                # Convert the mdb-export date format to MySQL DATETIME.
                SampleTime = datetime.strptime(
                    row[0], '%m/%d/%y %H:%M:%S').strftime('%Y-%m-%d %H:%M:%S')
                RecvTime = datetime.strptime(
                    row[1], '%m/%d/%y %H:%M:%S').strftime('%Y-%m-%d %H:%M:%S')
                NodeValue = float(row[3])
                if math.isnan(NodeValue):
                    NodeValue = 0.0
                sensor_data_list.append(SensorData(
                    SampleTime, RecvTime, row[2], NodeValue,
                    row[4], row[5], row[6], row[7], 0))

            # Most recent first (by receive time). The original loop's
            # `index > 100` bound actually kept 101 entries; preserved here.
            sensor_data_list.sort(key=lambda x: x.recv_time, reverse=True)
            sensor_data_list_true = sensor_data_list[:101]

            for sd in sensor_data_list_true:
                # Insert only if an identical (RecvTime, DeviceID, ChannelID)
                # record is not already present.
                check_sql = ("SELECT COUNT(*) FROM {0} WHERE RecvTime=%s "
                             "AND DeviceID=%s AND ChannelID=%s").format(table_name)
                cursor.execute(check_sql,
                               (sd.recv_time, sd.device_id, sd.channel_id))
                exists = cursor.fetchone()[0]
                print(exists)
                if exists == 0:
                    sql = ("INSERT INTO {0} (SampleTime, RecvTime, NodeName, "
                           "NodeValue, NodeType, NodeUnit, DeviceID, ChannelID, "
                           "ProjectID) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s)"
                           ).format(table_name)
                    print(sql)
                    cursor.execute(sql, (sd.sample_time, sd.recv_time,
                                         sd.node_name, sd.node_value,
                                         sd.node_type, sd.node_unit,
                                         sd.device_id, sd.channel_id,
                                         sd.project_id))
            conn.commit()
    except mysql.connector.Error as e:
        return jsonify({"error": str(e)}), 500
    finally:
        if cursor:
            cursor.close()
        if conn:
            conn.close()
    return jsonify({"message": "Data synced successfully"}), 200


@app.route('/get_last_data', methods=['POST'])
def get_last_data():
    """Return the most recent N rows for a (ChannelID, DeviceID) pair, each
    annotated with the pair's overall max and average NodeValue."""
    data = request.get_json()
    ChannelID = data.get('ChannelID')
    DeviceID = data.get('DeviceID')
    n = data.get('N', 1)  # default: just the latest record
    if not ChannelID or not DeviceID:
        return jsonify({"error": "Missing ChannelID or DeviceID"}), 400

    conn = mysql.connector.connect(
        host="localhost",
        user="root",
        password="HelloWorld123",
        database="water"
    )
    conn.set_charset_collation('utf8')
    cursor = None
    try:
        cursor = conn.cursor()
        # The CTE computes the historical max/avg once; the outer query
        # attaches those stats to each of the latest n rows.
        query = """
        WITH Stats AS (
            SELECT MAX(NodeValue) AS MaxNodeValue, AVG(NodeValue) AS AvgNodeValue
            FROM sensor_data
            WHERE ChannelID=%s AND DeviceID=%s
        )
        SELECT sd.*, s.MaxNodeValue, s.AvgNodeValue
        FROM sensor_data sd
        CROSS JOIN Stats s
        WHERE sd.ChannelID=%s AND sd.DeviceID=%s
        ORDER BY sd.RecvTime DESC
        LIMIT %s;
        """
        cursor.execute(query, (ChannelID, DeviceID, ChannelID, DeviceID, n))
        results = cursor.fetchall()
        if results:
            rows = [dict(zip([column[0] for column in cursor.description], row))
                    for row in results]
            return jsonify({"datas": rows})
        else:
            return jsonify({"message": "No data found"}), 404
    except mysql.connector.Error as e:
        return jsonify({"error": str(e)}), 500
    finally:
        if cursor:
            cursor.close()
        if conn:
            conn.close()
@app.route('/get_data_by_avg', methods=['POST'])
def get_data_by_avg():
    """Return N averaged readings for a (ChannelID, DeviceID) pair, bucketed
    by hour, day, or month according to AvgType; pads with None to length N."""
    data = request.get_json()
    ChannelID = data.get('ChannelID')
    DeviceID = data.get('DeviceID')
    AvgType = data.get('AvgType')
    N = data.get('N', 6)  # number of buckets to return (default 6)
    if not ChannelID or not DeviceID or not AvgType:
        return jsonify({"error": "Missing ChannelID or DeviceID or AvgType"}), 400

    conn = mysql.connector.connect(
        host="localhost",
        user="root",
        password="HelloWorld123",
        database="water"
    )
    conn.set_charset_collation('utf8')
    cursor = None  # keeps the finally block safe if cursor() fails
    try:
        cursor = conn.cursor()
        query = """
        SELECT * FROM sensor_data
        WHERE ChannelID=%s AND DeviceID=%s
        ORDER BY RecvTime DESC
        LIMIT %s
        """
        # Fetch enough raw rows to fill N buckets; the multipliers assume
        # roughly three samples per hour (same heuristic as the original).
        if AvgType == "hour":
            cursor.execute(query, (ChannelID, DeviceID, N * 3))
        elif AvgType == "day":
            cursor.execute(query, (ChannelID, DeviceID, N * 3 * 24))
        elif AvgType == "month":
            cursor.execute(query, (ChannelID, DeviceID, N * 3 * 24 * 7 * 31))
        else:
            return jsonify({"message": "AvgType type error!"}), 404
        results = cursor.fetchall()
        if not results:
            return jsonify({"message": "No data found"}), 404
        columns = [column[0] for column in cursor.description]
        rows = [dict(zip(columns, row)) for row in results]

        # Group rows into time buckets, accumulating sum and count plus the
        # (constant per pair) descriptive fields.
        avg_data = {}
        for row in rows:
            if AvgType == "hour":
                n_key = row["SampleTime"].strftime('%Y-%m-%d %H:00:00')
            elif AvgType == "day":
                n_key = row["SampleTime"].strftime('%Y-%m-%d')
            else:  # "month" — other values were rejected above
                n_key = row["SampleTime"].strftime('%Y-%m')
            bucket = avg_data.setdefault(n_key, {'total_value': 0, 'count': 0})
            bucket['total_value'] += float(row["NodeValue"])
            bucket['count'] += 1
            bucket['ChannelID'] = int(row["ChannelID"])
            bucket['DeviceID'] = int(row["DeviceID"])
            bucket['NodeName'] = str(row["NodeName"])
            bucket['NodeType'] = str(row["NodeType"])
            bucket['NodeUnit'] = str(row["NodeUnit"])
            bucket['ProjectID'] = int(row["ProjectID"])

        # Emit at most N buckets, newest first.
        average_values = []
        for n_key in sorted(avg_data.keys(), reverse=True):
            bucket = avg_data[n_key]
            average_values.append({
                'SampleTime': n_key,
                'RecvTime': n_key,
                "ProjectID": bucket['ProjectID'],
                "NodeUnit": bucket['NodeUnit'],
                "NodeName": bucket['NodeName'],
                "NodeType": bucket['NodeType'],
                "ChannelID": bucket['ChannelID'],
                "DeviceID": bucket['DeviceID'],
                'NodeValue': bucket['total_value'] / bucket['count']
            })
            if len(average_values) >= N:
                break
        # Pad with None so the client always receives exactly N entries.
        average_values += [None] * (N - len(average_values))
        return jsonify({"datas": average_values})
    except mysql.connector.Error as e:
        return jsonify({"error": str(e)}), 500
    finally:
        if cursor:
            cursor.close()
        if conn:
            conn.close()


@app.route('/get_last_datas', methods=['POST'])
def get_last_datas():
    """Return the latest record for each requested (ChannelID, DeviceID) pair."""
    data = request.get_json()
    ids_list = data.get('idsList')
    if not ids_list or not isinstance(ids_list, list):
        return jsonify({"error": "Missing or invalid idsList parameter"}), 400

    # Validate and coerce the pairs to integers up front: these values are
    # interpolated into SQL below, so anything non-numeric must be rejected
    # (prevents SQL injection through ChannelID/DeviceID).
    try:
        channel_device_pairs = [(int(item['ChannelID']), int(item['DeviceID']))
                                for item in ids_list
                                if 'ChannelID' in item and 'DeviceID' in item]
    except (TypeError, ValueError):
        return jsonify({"error": "Missing or invalid idsList parameter"}), 400
    if not channel_device_pairs:
        # An empty IN () list would be a SQL syntax error; fail fast instead.
        return jsonify({"error": "Missing or invalid idsList parameter"}), 400

    conn = mysql.connector.connect(
        host="localhost",
        user="root",
        password="HelloWorld123",
        database="water"
    )
    cursor = None
    try:
        cursor = conn.cursor()
        # Safe to interpolate: every element was int()-validated above.
        pairs_sql = ', '.join(f"({channel_id}, {device_id})"
                              for channel_id, device_id in channel_device_pairs)
        # Temporary table holding the max id per requested pair.
        cursor.execute(
            "CREATE TEMPORARY TABLE ids_temp AS "
            "SELECT MAX(id) as max_id FROM sensor_data "
            "WHERE (ChannelID, DeviceID) IN ({0}) "
            "GROUP BY ChannelID, DeviceID".format(pairs_sql))
        # Join back to fetch the complete row for each max id.
        final_query = """
        SELECT sd.*
        FROM sensor_data sd
        JOIN ids_temp it ON sd.id = it.max_id
        """
        cursor.execute(final_query)
        results = cursor.fetchall()
        result_dicts = [dict(zip([column[0] for column in cursor.description],
                                 result))
                        for result in results]
        return jsonify(result_dicts)
    except mysql.connector.Error as e:
        return jsonify({"error": str(e)}), 500
    finally:
        if cursor:
            cursor.close()
        if conn:
            conn.close()
###################### "One map" dashboard endpoints ######################

def _water_db():
    """Open a connection to the local `water` MySQL database with utf8 charset.

    Shared by the read-only dashboard endpoints below; each caller is
    responsible for closing the returned connection.
    """
    conn = mysql.connector.connect(
        host="localhost",
        user="root",
        password="HelloWorld123",
        database="water"
    )
    conn.set_charset_collation('utf8')
    return conn


@app.route('/get-water-management-projects', methods=['GET'])
def get_water_management_projects():
    """List all WaterManagementProjects rows as {msg, code, data}."""
    conn = _water_db()
    try:
        cursor = conn.cursor()
        cursor.execute("SELECT * FROM WaterManagementProjects")
        projects_json = [{
            'id': project[0],        # project id
            'special': bool(project[1]),  # tinyint -> bool
            'type': project[2],      # project type
            'longitude': project[3],
            'latitude': project[4],
            'name': project[5],
            'name_pri': project[6],  # primary display name
            'text1': project[7],
            'text2': project[8]
        } for project in cursor.fetchall()]
        return jsonify({"msg": None, "code": 200, "data": projects_json}), 200
    except mysql.connector.Error as e:
        return jsonify({"msg": str(e), "code": 500, "data": []}), 500
    finally:
        conn.close()


@app.route('/get-hot-point-data', methods=['GET'])
def get_hot_point_data():
    """List all WaterHotPoints rows as {msg, code, data}."""
    conn = _water_db()
    try:
        cursor = conn.cursor()
        cursor.execute("SELECT * FROM WaterHotPoints")
        hps_json = [{
            'type': hp[1],
            'longitude': hp[2],
            'latitude': hp[3],
            'name': hp[4],
            'name_pri': hp[5]
        } for hp in cursor.fetchall()]
        return jsonify({"msg": None, "code": 200, "data": hps_json}), 200
    except mysql.connector.Error as e:
        return jsonify({"msg": str(e), "code": 500, "data": []}), 500
    finally:
        conn.close()


@app.route('/get-project-milestones', methods=['GET'])
def get_project_milestones():
    """List all Project_milestones rows as {msg, code, data}."""
    conn = _water_db()
    try:
        cursor = conn.cursor()
        cursor.execute("SELECT * FROM Project_milestones")
        projects_json = [{
            'id': project[0],
            'year': project[1],
            'date': project[2],
            'title': project[3],
            'pos': project[4],     # display position
            'content': project[5],
        } for project in cursor.fetchall()]
        return jsonify({"msg": None, "code": 200, "data": projects_json}), 200
    except mysql.connector.Error as e:
        return jsonify({"msg": str(e), "code": 500, "data": []}), 500
    finally:
        conn.close()


@app.route('/get-move-plans', methods=['GET'])
def get_move_plans():
    """List all Move_Plans (relocation plan) rows as {msg, code, data}."""
    conn = _water_db()
    try:
        cursor = conn.cursor()
        cursor.execute("SELECT * FROM Move_Plans")
        plans_json = [{
            'id': plan[0],             # auto-increment primary key
            'isAfter': bool(plan[1]),  # tinyint -> bool
            'isOut': bool(plan[2]),    # tinyint -> bool
            'desc': plan[3],
            'from': plan[4],
            'to': plan[5],
            'manNum': plan[6],
            'homeNum': plan[7],
            'completeNum': plan[8],
            'dateTime': str(plan[9]),
            'villageCount': plan[10],
            'materialPrize': str(plan[11])
        } for plan in cursor.fetchall()]
        return jsonify({"msg": None, "code": 200, "data": plans_json}), 200
    except mysql.connector.Error as e:
        return jsonify({"msg": str(e), "code": 500, "data": []}), 500
    finally:
        conn.close()


@app.route('/get-rkzy-base-data', methods=['GET'])
def get_rkzy_base_data():
    """Return the first SafetyZone row (population/area/capacity), or {}."""
    conn = _water_db()
    try:
        cursor = conn.cursor()
        cursor.execute("SELECT * FROM SafetyZone")
        rows = cursor.fetchall()
        # Only the first row is meaningful, matching the original behavior.
        plans_json = {}
        if rows:
            first = rows[0]
            plans_json = {
                'population': first[1],
                'area': first[2],
                'capacity': first[3]
            }
        return jsonify({"msg": None, "code": 200, "data": plans_json}), 200
    except mysql.connector.Error as e:
        return jsonify({"msg": str(e), "code": 500, "data": []}), 500
    finally:
        conn.close()


@app.route('/get-swyj-base-data', methods=['GET'])
def get_swyj_base_data():
    """List all YangtzeRiverWaterLevels rows as {msg, code, data}."""
    conn = _water_db()
    try:
        cursor = conn.cursor()
        cursor.execute("SELECT * FROM YangtzeRiverWaterLevels")
        waterLevel_jsons = [{
            'location': level[0],
            'flood_protection_level': level[1],
            'warning_level': level[2],
            'guarantee_level': level[3]
        } for level in cursor.fetchall()]
        return jsonify({"msg": None, "code": 200, "data": waterLevel_jsons}), 200
    except mysql.connector.Error as e:
        return jsonify({"msg": str(e), "code": 500, "data": []}), 500
    finally:
        conn.close()


###################### Weather-related endpoints ######################
# qxData caches the most recently fetched weather-station list
# (module-level state, rebuilt on every call to the endpoint below).
qxData = []


# Rainfall over the last six hours for weather (qx) stations.
@app.route('/get-six-hour-qx-data', methods=['GET'])
def get_six_hour_qx_data():
    """Fetch weather (qx) stations inside the service bounding box, mirror
    them into WaterHotPoints (type 1), and attach each station's 6-hour
    rainfall sum as 'dropSum6'."""
    global qxData
    # First upstream request: station metadata for district 421083.
    requestData = {
        "eqaddvcd": "421083",
        "pageSize": "80"
    }
    requestHead = {
        "token": "6C60F37D40B48DECF9A3F2CC1A5A50142C4BE52F26D15B171153F20DB63960696A439E435DDCA9AF925F13338470FFBDDC98CEE65AFBC92C2EB4E44C0A757DB5"
    }
    conn = pymysql.connect(host='localhost', user='root',
                           password='HelloWorld123', db='water',
                           charset='utf8mb4')
    try:
        response = requests.post(
            "{0}/shareddata/api/v1/baseinfo/stbprp/qx".format(SQServerHost),
            data=requestData, headers=requestHead)
        response.raise_for_status()
        jsonData = response.json()
        qxData = []
        qxDataDic = {}  # STCD -> index into qxData
        index = 0
        with conn.cursor() as cursor:
            for jsData in jsonData["data"]["list"]:
                # Keep only stations inside the bounding box of interest.
                if (113.410714576698 < jsData["LGTD"] < 114.115926000756
                        and 29.8114499954776 < jsData["LTTD"] < 30.2676753765206):
                    qxData.append(jsData)
                    qxDataDic[jsData["STCD"]] = index
                    index += 1
                    # Register the station as a hot point if not yet present.
                    cursor.execute(
                        "SELECT id FROM WaterHotPoints WHERE longitude=%s AND latitude=%s",
                        (jsData["LGTD"], jsData["LTTD"]))
                    if cursor.fetchone() is None:
                        cursor.execute(
                            "INSERT INTO WaterHotPoints (type, longitude, latitude, name, name_pri) VALUES (%s, %s, %s, %s, %s)",
                            (1, jsData['LGTD'], jsData['LTTD'], jsData['STNM'], ""))
                        conn.commit()

        # Second upstream request: rainfall for the selected stations over
        # the last six hours. GetStartEndTime returns (end, start).
        etm, stm = GetStartEndTime(6)
        requestData = {
            "sts": ','.join(qx["STCD"] for qx in qxData),
            "etm": etm,
            "stm": stm,
        }
        response = requests.post(
            "{0}/shareddata/api/v1/monitdata/pptn/qx".format(SQServerHost),
            data=requestData, headers=requestHead)
        response.raise_for_status()
        jsonData = response.json()

        # Sum DRP (rainfall) per station code.
        dropDic = {}
        for jd in jsonData['data']:
            if jd["STCD"] not in dropDic:
                dropDic[jd["STCD"]] = jd["DRP"]
            else:
                dropDic[jd["STCD"]] += jd["DRP"]
        # Attach the sums to the cached station records.
        for key, value in dropDic.items():
            if key in qxDataDic:
                qxData[qxDataDic[key]]["dropSum6"] = value
        return jsonify({"msg": None, "code": 200, "data": qxData}), 200
    except requests.RequestException as e:
        return jsonify({"msg": str(e), "code": 500, "data": []}), 500
    finally:
        # The original leaked this connection on every call.
        conn.close()


# swData caches the most recently fetched hydrology (sw) station list
# (module-level state, rebuilt on every call to /get-six-hour-sw-data).
swData = []
@app.route('/get-six-hour-sw-data', methods=['GET'])
def get_six_hour_sw_data():
    """Fetch hydrology (sw) stations inside the service bounding box, mirror
    them into WaterHotPoints (type 2), and attach 6-hour rainfall sums plus
    the latest weather code; only stations with rainfall data are returned."""
    global swData
    swData = []
    requestData = {
        "eqaddvcd": "421083",
        "pageSize": "80"
    }
    requestHead = {
        "token": "6C60F37D40B48DECF9A3F2CC1A5A50142C4BE52F26D15B171153F20DB63960696A439E435DDCA9AF925F13338470FFBDDC98CEE65AFBC92C2EB4E44C0A757DB5"
    }
    conn = pymysql.connect(host='localhost', user='root',
                           password='HelloWorld123', db='water',
                           charset='utf8mb4')
    try:
        # First upstream request: station metadata.
        response = requests.post(
            "{0}/shareddata/api/v1/baseinfo/stbprp/sw".format(SQServerHost),
            data=requestData, headers=requestHead)
        response.raise_for_status()
        jsonData = response.json()
        swDataDic = {}  # STCD -> index into swData
        index = 0
        with conn.cursor() as cursor:
            for jsData in jsonData["data"]["list"]:
                # Keep only stations inside the bounding box of interest.
                if (113.410714576698 < jsData["LGTD"] < 114.115926000756
                        and 29.8114499954776 < jsData["LTTD"] < 30.2676753765206):
                    swData.append(jsData)
                    swDataDic[jsData["STCD"]] = index
                    index += 1
                    # Register the station as a hot point if not yet present.
                    cursor.execute(
                        "SELECT id FROM WaterHotPoints WHERE longitude=%s AND latitude=%s",
                        (jsData["LGTD"], jsData["LTTD"]))
                    if cursor.fetchone() is None:
                        cursor.execute(
                            "INSERT INTO WaterHotPoints (type, longitude, latitude, name, name_pri) VALUES (%s, %s, %s, %s, %s)",
                            (2, jsData['LGTD'], jsData['LTTD'], jsData['STNM'], ""))
                        conn.commit()

        # Second upstream request: rainfall/weather for the selected stations
        # over the last six hours. GetStartEndTime returns (end, start).
        etm, stm = GetStartEndTime(6)
        requestData = {
            "sts": ','.join(sw["STCD"] for sw in swData),
            "etm": etm,
            "stm": stm,
        }
        response = requests.post(
            "{0}/shareddata/api/v1/monitdata/pptn/sw".format(SQServerHost),
            data=requestData, headers=requestHead)
        response.raise_for_status()
        jsonData = response.json()

        dropDic = {}  # STCD -> summed rainfall (DRP)
        wthDic = {}   # STCD -> latest weather code (WTH)
        for jd in jsonData['data']:
            if jd["STCD"] not in dropDic:
                dropDic[jd["STCD"]] = jd["DRP"]
            else:
                dropDic[jd["STCD"]] += jd["DRP"]
            wthDic[jd["STCD"]] = jd["WTH"]
        for key in dropDic.keys():
            if key in swDataDic:
                swData[swDataDic[key]]["dropSum6"] = dropDic[key]
                swData[swDataDic[key]]["wth"] = wthDic[key]
        # Drop stations for which no rainfall data came back.
        swData = [item for item in swData if 'dropSum6' in item]
        return jsonify({"msg": None, "code": 200, "data": swData}), 200
    except requests.RequestException as e:
        return jsonify({"msg": str(e), "code": 500, "data": []}), 500
    finally:
        # The original leaked this connection on every call.
        conn.close()


# swHeightData caches the important-sluice station list (module-level state).
swHeightData = []


@app.route('/get-sw-height-data', methods=['GET'])
def get_sw_height_data():
    """Return water-level readings (downstream/upstream level, flow) for a
    fixed set of important sluice stations, caching station metadata in the
    local file importZa.json after the first call."""
    global swHeightData
    swDataDic = {}  # STCD -> index into swHeightData
    swHeightData = []
    requestHead = {
        "token": "6C60F37D40B48DECF9A3F2CC1A5A50142C4BE52F26D15B171153F20DB63960696A439E435DDCA9AF925F13338470FFBDDC98CEE65AFBC92C2EB4E44C0A757DB5"
    }
    if not os.path.exists("importZa.json"):
        # First run: pull the full station list and keep only stations whose
        # name contains one of the important sluice names.
        requestData = {
            "pageSize": "10000"
        }
        response = requests.post(
            "{0}/shareddata/api/v1/baseinfo/stbprp/sw".format(SQServerHost),
            data=requestData, headers=requestHead)
        response.raise_for_status()
        importZ = ["福田寺", "黄丝南", "下新河", "子贝渊", "沙螺"]
        jsonData = response.json()
        index = 0
        for jsData in jsonData["data"]["list"]:
            if any(im in jsData["STNM"] for im in importZ):
                swHeightData.append(jsData)
                swDataDic[jsData["STCD"]] = index
                index += 1
        # Persist the filtered list so later calls skip the big request.
        with codecs.open("importZa.json", "w+", encoding="utf-8") as nj:
            nj.write(json.dumps(swHeightData, ensure_ascii=False))
    else:
        # Subsequent runs: load the cached station list.
        with codecs.open("importZa.json", "r", encoding="utf-8") as iz:
            izContent = json.loads(iz.read())
        index = 0
        for izc in izContent:
            swHeightData.append(izc)
            swDataDic[izc["STCD"]] = index
            index += 1

    conn = pymysql.connect(host='localhost', user='root',
                           password='HelloWorld123', db='water',
                           charset='utf8mb4')
    try:
        with conn.cursor() as cursor:
            for swhData in swHeightData:
                # Register each station as a hot point (type 4) if unseen.
                # Parameters are passed as a proper tuple — the original
                # passed a bare string, which pymysql merely tolerates.
                cursor.execute("SELECT id FROM WaterHotPoints WHERE name=%s",
                               (swhData["STNM"],))
                if cursor.fetchone() is None:
                    cursor.execute(
                        "INSERT INTO WaterHotPoints (type, longitude, latitude, name, name_pri) VALUES (%s, %s, %s, %s, %s)",
                        (4, swhData['LGTD'], swhData['LTTD'], swhData['STNM'], ""))
                    conn.commit()
    finally:
        # The original leaked this connection on every call.
        conn.close()

    # Latest water-level records; a window of width 0 covers the current hour.
    etm, stm = GetStartEndTime(0)
    requestData = {
        "pageSize": 300,
        "etm": etm,
        "stm": stm
    }
    response = requests.post(
        "{0}/shareddata/api/v1/monitdata/rw/st_was_r".format(SQServerHost),
        data=requestData, headers=requestHead)
    response.raise_for_status()
    jsonData = response.json()

    # Per-station readings; -1 marks a missing value.
    dwzDic = {}   # downstream water level
    upzDic = {}   # upstream water level
    tgtqDic = {}  # flow
    for jd in jsonData['data']["list"]:
        dwzDic[jd["STCD"]] = -1 if jd["DWZ"] is None else jd["DWZ"]
        upzDic[jd["STCD"]] = -1 if jd["UPZ"] is None else jd["UPZ"]
        tgtqDic[jd["STCD"]] = -1 if jd["TGTQ"] is None else jd["TGTQ"]
    for key in dwzDic.keys():
        if key in swDataDic:
            swHeightData[swDataDic[key]]["dwz"] = dwzDic[key]
            swHeightData[swDataDic[key]]["upz"] = upzDic[key]
            swHeightData[swDataDic[key]]["tgtq"] = tgtqDic[key]

    # Stations with readings first; the rest get -1 placeholders appended.
    filtered_swData = [item for item in swHeightData if 'dwz' in item]
    for ssw in [item for item in swHeightData if 'dwz' not in item]:
        ssw["dwz"] = -1
        ssw["upz"] = -1
        ssw["tgtq"] = -1
        filtered_swData.append(ssw)
    swHeightData = filtered_swData
    return jsonify({"msg": None, "code": 200, "data": swHeightData}), 200
# --- File upload ------------------------------------------------------------
UPLOAD_FOLDER = 'uploads/images/'
ALLOWED_EXTENSIONS = {'png', 'jpg', 'jpeg', 'gif'}
app.config['UPLOAD_FOLDER'] = UPLOAD_FOLDER


def allowed_file(filename):
    """Return True if `filename` has an extension in ALLOWED_EXTENSIONS."""
    return ('.' in filename
            and filename.rsplit('.', 1)[1].lower() in ALLOWED_EXTENSIONS)


def decode_mime(encoded_str):
    """Decode an RFC 2047 MIME encoded-word string (e.g. a filename sent as
    '=?utf-8?Q?...?=') into plain text.

    Uses email.header.decode_header, so B-encoding and charsets other than
    utf-8 are handled as well — the original hand-rolled decoder accepted
    only '=?utf-8?Q?...?=' and left '_' as a literal underscore instead of
    decoding it to a space. Plain (non-encoded) strings pass through
    unchanged; any decoding error falls back to returning the input as-is.
    """
    try:
        pieces = []
        for text, charset in decode_header(encoded_str):
            if isinstance(text, bytes):
                pieces.append(text.decode(charset or 'utf-8'))
            else:
                pieces.append(text)
        return ''.join(pieces)
    except Exception as e:
        print(f"Error decoding MIME: {e}")
        return encoded_str


@app.route('/upload', methods=['POST'])
def upload_file():
    """Accept an image upload, decode its MIME-encoded filename, and store
    the file under UPLOAD_FOLDER."""
    global UPLOAD_FOLDER
    # Create the upload directory on first use.
    if not os.path.exists(UPLOAD_FOLDER):
        os.makedirs(UPLOAD_FOLDER)
    # The request must contain a 'file' part.
    if 'file' not in request.files:
        return jsonify({'error': '没有文件部分'})
    file = request.files['file']
    # Browsers may submit an empty part with no filename selected.
    if file.filename == '':
        return jsonify({'error': '没有选择文件'})
    decoded_filename = decode_mime(file.filename)
    print(decoded_filename)
    if file and allowed_file(decoded_filename):
        # basename() strips any directory components so a crafted filename
        # (e.g. '../../x.png') cannot escape the upload folder, while
        # non-ASCII (Chinese) filenames are preserved — secure_filename
        # would strip them entirely.
        filename = os.path.basename(decoded_filename)
        file.save(os.path.join(app.config['UPLOAD_FOLDER'], filename))
        return jsonify({'message': '文件上传成功', 'filename': filename})
    return jsonify({'error': '不支持的文件类型'})


if __name__ == '__main__':
    app.run(host='192.168.0.7', debug=True, port=15000)