Files
jdy_fastapi/app/module/module.py
T
2025-11-07 17:48:49 +08:00

227 lines
9.0 KiB
Python

import requests
import hashlib
from urllib.parse import quote
from datetime import datetime
from app.api import API
from typing import Optional, Dict, AnyStr
from PIL import Image, ImageEnhance
import pytesseract
import logging
from datetime import datetime
api_instance = API()
logger = logging.getLogger('app')
class F6Module:
@staticmethod
def get_captcha() -> AnyStr:
captcha_url = 'https://yunxiu.f6car.cn/kzf6/login/captcha-image'
response = requests.get(captcha_url)
with open('captcha.png', 'wb') as f:
f.write(response.content)
image = Image.open('captcha.png')
enhancer = ImageEnhance.Contrast(image)
image = enhancer.enhance(2.0)
enhancer = ImageEnhance.Brightness(image)
image = enhancer.enhance(1.5)
image = image.convert('L')
image = image.point(lambda x: 0 if x < 128 else 255, '1')
image.save('preprocessed_captcha.png')
captcha_text = pytesseract.image_to_string(image)
print(f"识别的验证码为: {captcha_text}")
return captcha_text
@staticmethod
def login_in(username: str, password: str, company_name: str = '默认门店',) -> Optional[requests.Response]:
url = "https://yunxiu.f6car.com/kzf6/login/confirm"
session = requests.Session()
header = {
'Referer': url,
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) '
'Chrome/130.0.0.0 Safari/537.36 Edg/129.0.0.0'
}
data = {
'username': username,
'password': hashlib.md5(password.encode('utf-8')).hexdigest(),
}
try:
res = session.post(url=url, headers=header, data=data)
res_json = res.json()
if res_json.get('message') == '请输入图形验证码':
logger.warning("触发图形验证码")
captcha_text = F6Module.get_captcha()
data.update({'imageCode': captcha_text})
res = session.post(url=url, headers=header, data=data)
res_json = res.json()
if res_json.get("data") is None:
return res
else:
group_id = ''
for group in res_json.get('data', []):
if group.get("groupName") == company_name:
group_id = group.get("groupId")
break
if not group_id:
logger.error(f"未找到公司名称: {company_name}")
return None
token = quote(res_json['token']) # URL 编码
url = (f'https://yunxiu.f6car.cn/kzf6/user/loginAfterChooseGroup?'
f'token={token}&groupId={group_id}&macAddress=')
res1 = session.get(url, cookies=res.cookies)
return res1
except Exception as e:
print(f"Error during login: {e}")
return None
def accept_login_message(self, data: Dict[str, str]) -> Dict[str, str]:
username = data['username']
password = data['password']
company_name = data['company_name']
res = self.login_in(username, password, company_name)
if res is not None:
cookies = requests.utils.dict_from_cookiejar(res.cookies)
json = res.json()
url = 'https://yunxiu.f6car.cn/hive/company/getGroupName'
res1 = requests.get(url=url, cookies=cookies)
data1 = res1.json()
if data1['code'] == 200:
if data1['data'] == company_name:
if json['status'] == 'success':
json['status'] = '登录成功'
elif json['status'] == 'Error':
json['status'] = '登录失败,请检查账号密码'
else:
json['status'] = '公司名称不正确或未选择公司名称,请重试'
else:
json['status'] = '请输入正确的账号密码并选择公司名称'
return json
else:
return {"status": "登录失败,请检查公司名称"}
def get_company_information(self, data: Dict[str, str]) -> Dict[str, str]:
username = data['username']
password = data['password']
timestamp = datetime.now().strftime("%Y-%m-%d %H-%M-%S")
print(username)
url = "https://yunxiu.f6car.com/kzf6/login/confirm"
session = requests.Session()
header = {
'Referer': url,
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) '
'Chrome/130.0.0.0 Safari/537.36 Edg/130.0.0.0'
}
data = {
'username': username,
'password': hashlib.md5(password.encode('utf-8')).hexdigest(),
}
try:
res = session.post(url=url, headers=header, data=data)
res_json = res.json()
if res_json.get('message') == '请输入图形验证码':
pass
jiandaoyun_data = {'api_key': '6694d3c4fcb69ca9a111a6c4', 'entry_id': '6736e2112ad50045f041a827'}
if res_json.get("data") is None:
print('单店')
res = self.login_in(username, password)
if res is not None:
cookies = requests.utils.dict_from_cookiejar(res.cookies)
url = 'https://yunxiu.f6car.cn/hive/company/getGroupName'
res = requests.get(url=url, cookies=cookies)
data = res.json()
store_name = data['data']
jiandaoyun_data['data_list'] = [
{"_widget_1731650067055": {"value": f'{username}{password}{timestamp}'},
"_widget_1731650067056": {"value": f"{store_name}"}}]
api_instance.entry_data_batch_create(jiandaoyun_data)
res = {'msg': f'{username}{password}{timestamp}'}
else:
jiandaoyun_data_list = []
for group in res_json.get('data', []):
append_data = {"_widget_1731650067055": {"value": f'{username}{password}{timestamp}'},
"_widget_1731650067056": {"value": f"{group['groupName']}"}}
jiandaoyun_data_list.append(append_data)
jiandaoyun_data['data_list'] = jiandaoyun_data_list
res = api_instance.entry_data_batch_create(jiandaoyun_data)
print(res)
res = {'msg': f'{username}{password}{timestamp}'}
return res
except Exception as e:
print(f"获取公司名称失败: {e}")
res = {'msg': '获取公司名称失败,请重新获取'}
return res
def get_store_information(self, data: Dict[str, str]) -> Dict[str, dict[str, str]]:
username = data['username']
password = data['password']
company_name = data['company_name']
timestamp = datetime.now().strftime("%Y-%m-%d %H-%M-%S")
login_response = self.login_in(username, password, company_name)
if login_response is None:
return {"msg": {'msg': '未执行', 'msg_details': '登录失败'}}
cookies = requests.utils.dict_from_cookiejar(login_response.cookies)
url = 'https://yunxiu.f6car.cn/hive/org/getPageOrgGroupMembers?currentPage=1&pageSize=100&name='
res = requests.get(url=url, cookies=cookies)
data = res.json()
org_lists = data['data']['list']
url = 'https://yunxiu.f6car.cn/member/car/carListForPermission'
json = {"pageSize": 10, "pageNo": 1}
car_res = requests.post(url=url, json=json, cookies=cookies)
total_cars_data = car_res.json()
total_cars = total_cars_data['data']['total']
url = 'https://yunxiu.f6car.cn/member/customer/listForPermission?pageSize=10&pageNo=1'
customer_res = requests.get(url=url, cookies=cookies)
total_customer_data = customer_res.json()
total_customer = total_customer_data['data']['total']
jiandaoyun_data = {'api_key': '6694d3c4fcb69ca9a111a6c4',
'entry_id': '673c38ccca57a5cf266eb18c'}
jiandaoyun_data_list = []
for org in org_lists:
append_data = {"_widget_1731999948708": {"value": f'{username}{password}{company_name}{timestamp}'},
"_widget_1731999948709": {"value": f"{org['orgName']}"}}
jiandaoyun_data_list.append(append_data)
jiandaoyun_data['data_list'] = jiandaoyun_data_list
api_instance.entry_data_batch_create(jiandaoyun_data)
res = {'msg': {"msg": f'{username}{password}{company_name}{timestamp}',
"total_cars": f"{total_cars}条客户车辆",
"total_customer": f"{total_customer}条客户"}}
return res
@staticmethod
def get_keep_heart(data: Dict[str, str]) -> Dict[str, str]:
return data