Files
WebServer/app.py
2025-08-25 14:54:37 +08:00

287 lines
10 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

from waitress import serve
from flask import Flask, render_template, request, redirect, url_for, jsonify, session, send_from_directory
from datetime import datetime
import os
import logging
import requests
from config import COMPANY_INFO, DATA_DIR, LOG_DIR
from languages import LANGUAGES, TRANSLATIONS, DEFAULT_LANGUAGE
from futu import *
import pandas as pd
app = Flask(__name__)
# Session signing key. It is regenerated on every process start, so session
# cookies issued before a restart (e.g. the stored language) stop validating.
app.secret_key = os.urandom(24)

# Configure logging: create the log directory if needed, then log to app.log.
# exist_ok=True avoids the check-then-create race of exists() + makedirs().
os.makedirs(LOG_DIR, exist_ok=True)
logging.basicConfig(
    filename=os.path.join(LOG_DIR, 'app.log'),
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s'
)

# Make sure the data directory exists.
os.makedirs(DATA_DIR, exist_ok=True)
# Get the current language.
def get_current_language():
    """Return the language code stored in the session, or DEFAULT_LANGUAGE if none is stored."""
    if 'language' in session:
        return session['language']
    return DEFAULT_LANGUAGE
# Set the language.
@app.route('/set_language/<lang_code>')
def set_language(lang_code):
    """Store lang_code in the session when it is a key of LANGUAGES, then redirect.

    The redirect goes to the request's referrer, or to the index page when the
    request carries no referrer. Unknown codes leave the session untouched.
    """
    is_known = lang_code in LANGUAGES
    if is_known:
        session['language'] = lang_code
    target = request.referrer or url_for('index')
    return redirect(target)
# Look up translated text.
def get_translation(key, lang=None):
    """Return the TRANSLATIONS entry for key in the given language.

    When lang is None the current session language is used. If either the
    language or the key is missing from TRANSLATIONS, key itself is returned.
    """
    language = get_current_language() if lang is None else lang
    table = TRANSLATIONS.get(language, {})
    return table.get(key, key)
@app.route('/')
def index():
    """Render index.html with company info, the translator and language data."""
    context = {
        'company': COMPANY_INFO,
        't': get_translation,
        'lang': get_current_language(),
        'languages': LANGUAGES,
    }
    return render_template('index.html', **context)
@app.route('/about')
def about():
    """Render about.html with company info, the translator and language data."""
    context = {
        'company': COMPANY_INFO,
        't': get_translation,
        'lang': get_current_language(),
        'languages': LANGUAGES,
    }
    return render_template('about.html', **context)
@app.route('/services')
def services():
    """Render services.html with company info, the translator and language data."""
    context = {
        'company': COMPANY_INFO,
        't': get_translation,
        'lang': get_current_language(),
        'languages': LANGUAGES,
    }
    return render_template('services.html', **context)
@app.route('/download')
def download():
    """Render download.html with company info, the translator and language data."""
    context = {
        'company': COMPANY_INFO,
        't': get_translation,
        'lang': get_current_language(),
        'languages': LANGUAGES,
    }
    return render_template('download.html', **context)
@app.route('/pricing')
def pricing():
    """Render pricing.html with company info, the translator and language data."""
    context = {
        'company': COMPANY_INFO,
        't': get_translation,
        'lang': get_current_language(),
        'languages': LANGUAGES,
    }
    return render_template('pricing.html', **context)
@app.route('/help')
def help():
    """Render help.html with company info, the translator and language data."""
    # The function name doubles as the Flask endpoint name, so it is kept
    # as-is even though it shadows the builtin help().
    context = {
        'company': COMPANY_INFO,
        't': get_translation,
        'lang': get_current_language(),
        'languages': LANGUAGES,
    }
    return render_template('help.html', **context)
# Characters that must never reach a file name built from client input:
# path separators plus the characters Windows reserves in file names.
_UNSAFE_FILENAME_CHARS = frozenset('/\\:*?"<>|')


def _safe_filename_part(text):
    """Return text with separators, reserved and non-printable characters replaced by '_'."""
    return ''.join(
        '_' if (ch in _UNSAFE_FILENAME_CHARS or not ch.isprintable()) else ch
        for ch in text
    )


def _text_field(form_data, name):
    """Return form_data[name] as a stripped string; missing or null values become ''."""
    value = form_data.get(name)
    if value is None:
        return ''
    return str(value).strip()


# Receives the data sent by the mini program and, for now, stores it here.
@app.route('/contact', methods=['GET', 'POST'])
def contact():
    """Render the contact page (GET) or store a submitted hand-over record (POST).

    POST expects a JSON object with the keys customerName, projectSource,
    projectLeader, clientdemand, handovertasks and remarks. The record is
    written as a UTF-8 text file in DATA_DIR named
    "<YYYYMMDD>_<customerName>_<projectLeader>.txt"; a later submission with
    the same date, customer and leader overwrites the earlier file.

    Returns:
        GET: the rendered contact.html page.
        POST: a JSON body with "status" and "message"; HTTP 400 for a missing
        or invalid payload, HTTP 500 when storing the record fails.
    """
    lang = get_current_language()
    if request.method == 'POST':
        # silent=True makes a wrong content type or malformed JSON yield None
        # instead of raising, so those requests get a 400 rather than a 500.
        form_data = request.get_json(silent=True)
        # Input validation: only a non-empty JSON object is acceptable.
        if not isinstance(form_data, dict) or not form_data:
            return jsonify({"status": "error", "message": "无效的JSON数据"}), 400
        try:
            customerName = _text_field(form_data, 'customerName')
            projectSource = _text_field(form_data, 'projectSource')
            projectLeader = _text_field(form_data, 'projectLeader')
            clientdemand = _text_field(form_data, 'clientdemand')
            handovertasks = _text_field(form_data, 'handovertasks')
            remarks = _text_field(form_data, 'remarks')
            # Basic validation of the required fields.
            if not customerName or not projectLeader:
                return jsonify({"status": "error", "message": "客户名称和项目负责人不能为空"}), 400
            # Sample the clock once so the file name and the content agree.
            now = datetime.now()
            # Build the file name. The client-supplied parts are sanitised so
            # that path separators cannot move the file outside DATA_DIR.
            filename = os.path.join(
                DATA_DIR,
                now.strftime("%Y%m%d_")
                + _safe_filename_part(customerName)
                + "_"
                + _safe_filename_part(projectLeader)
                + ".txt",
            )
            # Content to save (the original, unsanitised values).
            content = "\n\n".join([
                f"文件生成时间:{now.strftime('%Y-%m-%d %H:%M:%S')}",
                f"客户名称:{customerName}",
                f"项目来源:{projectSource}",
                f"项目负责人:{projectLeader}",
                f"客户需求:{clientdemand}",
                f"交接任务:{handovertasks}",
                f"备注:{remarks}",
            ])
            # Write the record to disk.
            with open(filename, "w", encoding="utf-8") as file:
                file.write(content)
            logging.info(f"成功保存数据到文件: {filename}")
            return jsonify({"status": "success", "message": "提交成功"})
        except Exception as e:
            # Request boundary: log the failure and answer with a generic 500.
            logging.error(f"处理数据时发生错误: {str(e)}")
            return jsonify({"status": "error", "message": "服务器内部错误"}), 500
    return render_template('contact.html',
                           company=COMPANY_INFO,
                           success=request.args.get('success'),
                           t=get_translation,
                           lang=lang,
                           languages=LANGUAGES)
# Serve files from the log directory.
@app.route('/logs/<path:filename>')
def serve_log_file(filename):
    """Send the requested file from LOG_DIR.

    NOTE(review): no access check is visible on this route - confirm that
    exposing the log directory over HTTP is intended.
    """
    response = send_from_directory(LOG_DIR, filename)
    return response
# Serve files from the downloads directory.
@app.route('/downloads/<path:filename>')
def serve_download_file(filename):
    """Send the requested file from the 'downloads' directory."""
    response = send_from_directory('downloads', filename)
    return response
# Base prices used to fabricate quotes when live data is unavailable.
_SIMULATED_BASE_PRICES = {'上证指数': 3000, '深证成指': 10000, '中国平安': 50, '五粮液': 200}


def _simulated_market_data():
    """Build placeholder quotes: each base price moved by a random amount within +/-1%."""
    import random

    quotes = []
    for symbol, base_price in _SIMULATED_BASE_PRICES.items():
        change_percent = random.uniform(-0.01, 0.01)
        current_price = round(base_price * (1 + change_percent), 2)
        quotes.append({
            'symbol': symbol,
            'price': current_price,
            'change': round(current_price - base_price, 2),
            'change_percent': round(change_percent * 100, 2)
        })
    return quotes


def _fetch_live_market_data():
    """Fetch quotes for two indices and two stocks through akshare.

    Raises ImportError when akshare (or pandas) is not installed; any other
    failure from akshare or from the column lookups propagates to the caller.

    NOTE(review): assumes the akshare frames expose 'code', 'name', 'change',
    'change_percent' and 'latest'/'price' columns - confirm against the
    installed akshare version.
    """
    import akshare as ak
    import pandas as pd

    # Real-time index data: SSE Composite and SZSE Component.
    index_data = ak.stock_zh_index_spot()
    index_codes = ['sh000001', 'sz399001']
    index_df = index_data[index_data['code'].isin(index_codes)]
    # Real-time stock data: Ping An and Wuliangye.
    stock_data = ak.stock_zh_a_spot()
    stock_codes = ['sh601318', 'sz000858']
    stock_df = stock_data[stock_data['code'].isin(stock_codes)]
    # Merge both frames; columns missing from one frame are filled with NaN.
    combined_data = pd.concat([index_df, stock_df], ignore_index=True)

    quotes = []
    for _, row in combined_data.iterrows():
        # Indices carry the price in 'latest', stocks in 'price'. After the
        # concat every row has both labels, so choose by value, not by label:
        # a plain "'latest' in row" test would hand stock rows a NaN price.
        latest = row.get('latest')
        price = latest if pd.notna(latest) else row['price']
        quotes.append({
            'symbol': row['name'],  # display name taken from the data source
            'price': float(price),
            'change': float(row['change']),
            'change_percent': float(row['change_percent'])
        })
    return quotes


def _market_data_response(quotes):
    """Wrap a list of quotes in the JSON envelope returned by /api/market-data."""
    return jsonify({
        'status': 'success',
        'data': quotes,
        'timestamp': datetime.now().isoformat()
    })


# TODO: switch this endpoint to the Longbridge (长桥) data source with
# real-time push. A commented-out Futu OpenD sketch (OpenQuoteContext
# subscription for HK.00700) used to live here and was removed as dead code.
# Real-time market data API endpoint.
@app.route('/api/market-data')
def market_data():
    """Return market quotes as JSON, falling back to simulated data.

    Live data is requested through akshare. If akshare is not installed, or
    fetching/processing fails for any reason, the problem is logged and
    randomly simulated quotes are returned instead. In both cases the
    response carries 'status': 'success', the quote list under 'data' and an
    ISO-format 'timestamp'.
    """
    try:
        return _market_data_response(_fetch_live_market_data())
    except ImportError:
        # akshare is not installed: use simulated data.
        logging.error("akshare未安装使用模拟数据")
    except Exception as e:
        # Any other failure: use simulated data as well.
        logging.error(f"获取市场数据时发生错误: {str(e)}")
    return _market_data_response(_simulated_market_data())
if __name__ == '__main__':
    # Run the app under waitress, listening on all interfaces on port 778.
    listen_host, listen_port = '0.0.0.0', 778
    serve(app, host=listen_host, port=listen_port)