Files
F6--/张阳脚本/工具/宜搭表单收集信息.ipynb
2026-01-30 11:28:35 +08:00

402 lines
18 KiB
Plaintext

{
"cells": [
{
"metadata": {},
"cell_type": "markdown",
"source": "",
"id": "638d0568b123f7bb"
},
{
"metadata": {
"ExecuteTime": {
"end_time": "2024-12-09T06:59:57.185211Z",
"start_time": "2024-12-09T06:59:56.644377Z"
}
},
"cell_type": "code",
"source": [
"# 基础函数配置\n",
"import requests\n",
"from pathlib import Path\n",
"import json\n",
"import numpy as np\n",
"\n",
"ROOT = Path('.').absolute() # 当前工作目录\n",
"\n",
"def generateToken() -> str:\n",
" \"\"\" 生成 token \"\"\"\n",
"\n",
" token_api = 'https://api.dingtalk.com/v1.0/oauth2/accessToken'\n",
"\n",
" # 该信息在钉钉开放应用中\n",
" data = {\n",
" \"appKey\": \"ding5kqocon5s9oph5uq\",\n",
" \"appSecret\": 'HL1jgsIIfLAC0eTH0A1m4mwxUDqbgsiPeCCGGE3ocM6qJBTIW7Ivt9drxF_Z4Kb_'\n",
" }\n",
"\n",
" res = requests.post(token_api, json=data)\n",
" token = res.json()['accessToken']\n",
"\n",
" return token\n",
"\n",
"\n",
"class NpEncoder(json.JSONEncoder):\n",
" def default(self, obj):\n",
" if isinstance(obj, np.integer):\n",
" return int(obj)\n",
" elif isinstance(obj, np.floating):\n",
" return float(obj)\n",
" elif isinstance(obj, np.ndarray):\n",
" return obj.tolist()\n",
" else:\n",
" return super(NpEncoder, self).default(obj)\n",
"TOKEN = generateToken()\n",
"\n",
"def read_new(FORMID,formData):\n",
" \"\"\" 新建表单实例 \"\"\"\n",
" api = f'https://api.dingtalk.com/v1.0/yida/forms/instances'\n",
"\n",
" headers = {\n",
" \"Content-Type\": \"application/json\",\n",
" \"x-acs-dingtalk-access-token\": TOKEN\n",
" }\n",
" payload = {\n",
" \"formUuid\" : FORMID,\n",
" \"appType\" : \"APP_TNVBVZ3K8G56HG03Z45Q\",\n",
" \"formDataJson\" : json.dumps(formData, cls=NpEncoder),\n",
" \"systemToken\" : \"CH7669818R0WN18TYTYJ42PE6GY22WZN0BYWKD1\",\n",
" \"language\" : \"zh_CN\",\n",
" \"userId\" : \"yida_pub_account\"\n",
" }\n",
"\n",
" res = requests.post(api, headers=headers, json=payload)\n",
" print(res.json())\n",
"\n",
" return res.json()\n",
"FORMID = 'FORM-D60584CCEBDB4CEF8AA1CF81D4E50770KEMY'\n",
"formData = {'textField_m4gewm33': '2024-12-09 14:36:40', 'textField_m4glr9fm': '2024-12-09 14:36:40',\n",
" 'textField_m4glr9fn': '0:00:00.025503',\n",
" 'textField_m4glr9fo': 'C:\\\\Users\\\\Administrator.DESKTOP-7IC2USJ\\\\Downloads\\\\处理后_emoji表情样例.xls',\n",
" 'textField_m4glr9fp': 'C:\\\\Users\\\\Administrator.DESKTOP-7IC2USJ\\\\Downloads\\\\处理后_emoji表情样例.xls',\n",
" 'textField_m4gewm38': True}\n",
"aaaa = read_new(FORMID,formData)"
],
"id": "cd12ce80a0003a79",
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"{'result': 'FINST-QP566FA10H0R4N3J9UMR98GEIKGA244BNOG4M35'}\n"
]
}
],
"execution_count": 10
},
{
"metadata": {},
"cell_type": "markdown",
"source": [
"修改模板\n",
"\n",
"steps1: 宜搭api固定函数\n"
],
"id": "13a21065991f8c93"
},
{
"metadata": {
"ExecuteTime": {
"end_time": "2024-12-09T03:25:59.429143Z",
"start_time": "2024-12-09T03:25:58.187681Z"
}
},
"cell_type": "code",
"source": [
"import requests\n",
"import json\n",
"from datetime import datetime\n",
"\n",
"class ExcelMergerApp:\n",
" # ... (其他代码)\n",
"\n",
" def generateToken(self) -> str:\n",
" \"\"\"生成访问令牌\"\"\"\n",
" token_api = 'https://api.dingtalk.com/v1.0/oauth2/accessToken'\n",
" data = {\n",
" \"appKey\": self.appKey,\n",
" \"appSecret\": self.appSecret\n",
" }\n",
" response = requests.post(token_api, json=data)\n",
" if response.status_code == 200:\n",
" return response.json().get('accessToken')\n",
" else:\n",
" raise Exception(f\"Failed to get token: {response.text}\")\n",
"\n",
" def forms_instances(self, token, formDataJson):\n",
" \"\"\"新建表单内容\"\"\"\n",
" api = 'https://api.dingtalk.com/v1.0/yida/forms/instances'\n",
" headers = {\n",
" \"Content-Type\": \"application/json\",\n",
" \"x-acs-dingtalk-access-token\": token\n",
" }\n",
" payload = {\n",
" \"appType\": \"APP_TNVBVZ3K8G56HG03Z45Q\",\n",
" \"systemToken\": \"CH7669818R0WN18TYTYJ42PE6GY22WZN0BYWKD1\",\n",
" \"userId\": \"yida_pub_account\", # 曹伟 id\n",
" \"formUuid\": \"FORM-D60584CCEBDB4CEF8AA1CF81D4E50770KEMY\",\n",
" \"formDataJson\": json.dumps(formDataJson)\n",
" }\n",
" response = requests.post(api, headers=headers, json=payload)\n",
" return response"
],
"id": "f60347dd63a15799",
"outputs": [
{
"ename": "NameError",
"evalue": "name '__file__' is not defined",
"output_type": "error",
"traceback": [
"\u001B[1;31m---------------------------------------------------------------------------\u001B[0m",
"\u001B[1;31mNameError\u001B[0m Traceback (most recent call last)",
"Cell \u001B[1;32mIn[1], line 187\u001B[0m\n\u001B[0;32m 185\u001B[0m \u001B[38;5;28;01mif\u001B[39;00m \u001B[38;5;18m__name__\u001B[39m \u001B[38;5;241m==\u001B[39m \u001B[38;5;124m\"\u001B[39m\u001B[38;5;124m__main__\u001B[39m\u001B[38;5;124m\"\u001B[39m:\n\u001B[0;32m 186\u001B[0m root \u001B[38;5;241m=\u001B[39m tk\u001B[38;5;241m.\u001B[39mTk()\n\u001B[1;32m--> 187\u001B[0m app \u001B[38;5;241m=\u001B[39m EmojiCleanerApp(root)\n\u001B[0;32m 188\u001B[0m root\u001B[38;5;241m.\u001B[39mmainloop()\n",
"Cell \u001B[1;32mIn[1], line 69\u001B[0m, in \u001B[0;36mEmojiCleanerApp.__init__\u001B[1;34m(self, root)\u001B[0m\n\u001B[0;32m 67\u001B[0m \u001B[38;5;66;03m# 记录程序启动时的打开时间\u001B[39;00m\n\u001B[0;32m 68\u001B[0m \u001B[38;5;28mself\u001B[39m\u001B[38;5;241m.\u001B[39mopen_time \u001B[38;5;241m=\u001B[39m datetime\u001B[38;5;241m.\u001B[39mnow()\n\u001B[1;32m---> 69\u001B[0m \u001B[38;5;28mself\u001B[39m\u001B[38;5;241m.\u001B[39mprocessing_info[\u001B[38;5;124m'\u001B[39m\u001B[38;5;124mtool_path\u001B[39m\u001B[38;5;124m'\u001B[39m] \u001B[38;5;241m=\u001B[39m get_application_path() \u001B[38;5;66;03m# 获取程序的绝对路径\u001B[39;00m\n\u001B[0;32m 70\u001B[0m \u001B[38;5;28mself\u001B[39m\u001B[38;5;241m.\u001B[39mprocessing_info[\u001B[38;5;124m'\u001B[39m\u001B[38;5;124mtool_name\u001B[39m\u001B[38;5;124m'\u001B[39m] \u001B[38;5;241m=\u001B[39m \u001B[38;5;28mself\u001B[39m\u001B[38;5;241m.\u001B[39mroot\u001B[38;5;241m.\u001B[39mtitle() \u001B[38;5;66;03m# 工具名称\u001B[39;00m\n\u001B[0;32m 71\u001B[0m \u001B[38;5;28mself\u001B[39m\u001B[38;5;241m.\u001B[39mprocessing_info[\u001B[38;5;124m'\u001B[39m\u001B[38;5;124mopen_time\u001B[39m\u001B[38;5;124m'\u001B[39m] \u001B[38;5;241m=\u001B[39m \u001B[38;5;28mself\u001B[39m\u001B[38;5;241m.\u001B[39mopen_time\u001B[38;5;241m.\u001B[39mstrftime(\u001B[38;5;124m'\u001B[39m\u001B[38;5;124m%\u001B[39m\u001B[38;5;124mY-\u001B[39m\u001B[38;5;124m%\u001B[39m\u001B[38;5;124mm-\u001B[39m\u001B[38;5;132;01m%d\u001B[39;00m\u001B[38;5;124m \u001B[39m\u001B[38;5;124m%\u001B[39m\u001B[38;5;124mH:\u001B[39m\u001B[38;5;124m%\u001B[39m\u001B[38;5;124mM:\u001B[39m\u001B[38;5;124m%\u001B[39m\u001B[38;5;124mS\u001B[39m\u001B[38;5;124m'\u001B[39m)\n",
"Cell \u001B[1;32mIn[1], line 20\u001B[0m, in \u001B[0;36mget_application_path\u001B[1;34m()\u001B[0m\n\u001B[0;32m 17\u001B[0m \u001B[38;5;28;01mreturn\u001B[39;00m os\u001B[38;5;241m.\u001B[39mpath\u001B[38;5;241m.\u001B[39mdirname(sys\u001B[38;5;241m.\u001B[39mexecutable)\n\u001B[0;32m 18\u001B[0m \u001B[38;5;28;01melse\u001B[39;00m:\n\u001B[0;32m 19\u001B[0m \u001B[38;5;66;03m# 如果是普通的Python脚本\u001B[39;00m\n\u001B[1;32m---> 20\u001B[0m \u001B[38;5;28;01mreturn\u001B[39;00m os\u001B[38;5;241m.\u001B[39mpath\u001B[38;5;241m.\u001B[39mdirname(os\u001B[38;5;241m.\u001B[39mpath\u001B[38;5;241m.\u001B[39mabspath(\u001B[38;5;18m__file__\u001B[39m))\n",
"\u001B[1;31mNameError\u001B[0m: name '__file__' is not defined"
]
}
],
"execution_count": 1
},
{
"metadata": {},
"cell_type": "markdown",
"source": "steps2:修改__init__方法以记录程序启动时的信息",
"id": "925e9a1b44b1586a"
},
{
"metadata": {},
"cell_type": "code",
"outputs": [],
"execution_count": null,
"source": [
"def __init__(self, root):\n",
" self.root = root\n",
" self.root.title(\"Excel 文件合并工具\")\n",
"\n",
" # ... (其他初始化代码)\n",
"\n",
" # 宜搭API配置\n",
" self.appKey = \"ding5kqocon5s9oph5uq\"\n",
" self.appSecret = \"HL1jgsIIfLAC0eTH0A1m4mwxUDqbgsiPeCCGGE3ocM6qJBTIW7Ivt9drxF_Z4Kb_\"\n",
"\n",
" # 记录程序启动时的打开时间\n",
" self.processing_info = {} # 用于存储处理信息的字典\n",
" self.open_time = datetime.now()\n",
" self.processing_info['textField_m4gewm35'] = self.get_application_path() # 获取程序的绝对路径\n",
" self.processing_info['textField_m4gewm34'] = self.root.title() # 工具名称\n",
" self.processing_info['textField_m4gewm36'] = self.open_time.strftime('%Y-%m-%d %H:%M:%S') # 程序打开时间\n",
"\n",
" # ... (其他初始化代码)"
],
"id": "d5395ddc56de83ea"
},
{
"metadata": {},
"cell_type": "markdown",
"source": "steps3:添加get_application_path方法",
"id": "7cd04f9038155668"
},
{
"metadata": {},
"cell_type": "code",
"outputs": [],
"execution_count": null,
"source": [
"import sys\n",
"import os\n",
"\n",
"def get_application_path(self):\n",
" \"\"\"根据是否为打包的exe文件,返回应用程序的绝对路径\"\"\"\n",
" if getattr(sys, 'frozen', False):\n",
" # 如果是打包的exe文件\n",
" return os.path.dirname(sys.executable)\n",
" else:\n",
" # 如果是普通的Python脚本\n",
" return os.path.dirname(os.path.abspath(__file__))"
],
"id": "683ae4c8c4ae5575"
},
{
"metadata": {},
"cell_type": "markdown",
"source": [
"steps4:修改处理的方法以记录信息并提交表单\n",
"\n",
"主要修改:\n",
"1. 在处理开始时,记录处理开始时间,并添加到处理信息字典中。\n",
"2. 在处理结束时,记录处理结束时间,并添加到处理信息字典中。\n",
"3. 在处理结束时,将处理信息提交给宜搭API,并将返回的实例ID添加到处理信息字典中。\n",
"4. 在处理结束时,将处理信息字典转换为JSON字符串,并提交给宜搭API。\n",
"\n",
"注意:\n",
"1. 文件路径需要根据工具来获取。\n",
"2. 提交表单放到finally里面。"
],
"id": "9e529693f4443bb5"
},
{
"metadata": {},
"cell_type": "code",
"outputs": [],
"execution_count": null,
"source": [
"def merge_files(self):\n",
" # 获取用户选择的信息\n",
" file1 = self.file1_path\n",
" file2 = self.file2_path\n",
" sheet1 = self.sheet1_var.get()\n",
" sheet2 = self.sheet2_var.get()\n",
" col1 = self.column1_var.get()\n",
" col2 = self.column2_var.get()\n",
" col3 = self.column3_var.get()\n",
"\n",
" # 检查所有必要信息是否已选择\n",
" if not all([file1, file2, sheet1, sheet2, col1, col2, col3]):\n",
" self.update_log(\"请确保所有选项都已选择。\\n\")\n",
" return\n",
"\n",
" start_time = datetime.now() # 记录处理开始时间\n",
" processing_info = {} # 用于存储处理信息的字典\n",
"\n",
" try:\n",
" # 读取数据\n",
" if file1.endswith('.xlsx'):\n",
" workbook1 = openpyxl.load_workbook(file1)\n",
" ws1 = workbook1[sheet1]\n",
" rows1 = list(ws1.iter_rows(values_only=True))\n",
" else:\n",
" workbook1 = xlrd.open_workbook(file1)\n",
" ws1 = workbook1.sheet_by_name(sheet1)\n",
" rows1 = [[ws1.cell_value(row, col) for col in range(ws1.ncols)] for row in range(ws1.nrows)]\n",
"\n",
" if file2.endswith('.xlsx'):\n",
" workbook2 = openpyxl.load_workbook(file2)\n",
" ws2 = workbook2[sheet2]\n",
" rows2 = list(ws2.iter_rows(values_only=True))\n",
" else:\n",
" workbook2 = xlrd.open_workbook(file2)\n",
" ws2 = workbook2.sheet_by_name(sheet2)\n",
" rows2 = [[ws2.cell_value(row, col) for col in range(ws2.ncols)] for row in range(ws2.nrows)]\n",
"\n",
" # 获取列索引\n",
" col1_idx = None\n",
" col2_idx = None\n",
" col3_idx = None\n",
" for idx, value in enumerate(rows1[0]):\n",
" if value == col1:\n",
" col1_idx = idx\n",
" break\n",
"\n",
" for idx, value in enumerate(rows2[0]):\n",
" if value == col2:\n",
" col2_idx = idx\n",
" if value == col3:\n",
" col3_idx = idx\n",
"\n",
" if col1_idx is None or col2_idx is None or col3_idx is None:\n",
" self.update_log(\"无法找到指定的列,请检查列名是否正确。\\n\")\n",
" return\n",
"\n",
" # 合并数据\n",
" merged_data = []\n",
" for row in rows1[1:]:\n",
" merged_data.append(list(row))\n",
"\n",
" for row in rows2[1:]:\n",
" card_number = row[col2_idx]\n",
" phone_number = row[col3_idx]\n",
" for merged_row in merged_data:\n",
" if merged_row[col1_idx] == card_number:\n",
" merged_row.append(phone_number)\n",
" break\n",
"\n",
" # 保存结果\n",
" output_path = filedialog.asksaveasfilename(defaultextension=\".xlsx\", filetypes=[(\"Excel files\", \"*.xlsx\")])\n",
" if output_path:\n",
" workbook_out = openpyxl.Workbook()\n",
" ws_out = workbook_out.active\n",
" headers = list(rows1[0]) + [rows2[0][col3_idx]]\n",
" ws_out.append(headers)\n",
" for row in merged_data:\n",
" ws_out.append(row)\n",
" workbook_out.save(output_path)\n",
" self.update_log(f\"合并完成,结果已保存到 {output_path}\\n\")\n",
"\n",
" # 收集处理信息\n",
" success = True\n",
" processing_info = {\n",
" 'textField_m4gewm33': start_time.strftime('%Y-%m-%d %H:%M:%S'), # 处理开始时间\n",
" 'textField_m4glr9fm': datetime.now().strftime('%Y-%m-%d %H:%M:%S'), # 处理结束时间\n",
" 'textField_m4glr9fn': str(datetime.now() - start_time), # 处理时长\n",
" 'textField_m4glr9fo': os.path.abspath(file1), # 处理的文件绝对路径\n",
" 'textField_m4glr9fp': os.path.abspath(output_path), # 新文件保存的绝对路径\n",
" 'textField_m4gewm38': success, # 是否处理成功\n",
" }\n",
"\n",
" except Exception as e:\n",
" success = f\"处理失败:{str(e)}\"\n",
" processing_info = {\n",
" 'textField_m4gewm33': start_time.strftime('%Y-%m-%d %H:%M:%S'), # 处理开始时间\n",
" 'textField_m4glr9fm': datetime.now().strftime('%Y-%m-%d %H:%M:%S'), # 处理结束时间\n",
" 'textField_m4glr9fn': str(datetime.now() - start_time), # 处理时长\n",
" 'textField_m4glr9fo': os.path.abspath(file1), # 处理的文件绝对路径\n",
" 'textField_m4glr9fp': '', # 新文件保存的绝对路径\n",
" 'textField_m4gewm38': success, # 是否处理成功\n",
" }\n",
" self.update_log(f\"处理过程中发生了错误: {str(e)}\\n\")\n",
" finally:\n",
" self.processing_info.update(processing_info)\n",
" # 生成令牌并提交表单\n",
" token = self.generateToken()\n",
" response = self.forms_instances(token, self.processing_info)\n",
" if response.status_code == 200:\n",
" processing_info['form_submission_status'] = \"提交成功\"\n",
" else:\n",
" processing_info['form_submission_status'] = f\"提交失败:{response.text}\"\n",
"\n",
" # 打印处理信息\n",
" print(\"处理信息:\")\n",
" for key, value in processing_info.items():\n",
" print(f\"{key}: {value}\")\n",
"\n",
" if isinstance(success, bool) and success:\n",
" self.update_log(f\"表单提交状态: {processing_info['form_submission_status']}\\n\")\n",
" elif isinstance(success, str) and \"处理失败\" in success:\n",
" self.update_log(f\"处理过程中发生了错误: {success}\\n\")"
],
"id": "aefcb627f40c99c2"
}
],
"metadata": {
"kernelspec": {
"display_name": "Python 3",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 2
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython2",
"version": "2.7.6"
}
},
"nbformat": 4,
"nbformat_minor": 5
}