Files
saas/test/续约请求接口结果保存.ipynb
2026-04-09 09:53:47 +08:00

139 lines
3.7 KiB
Plaintext

{
"cells": [
{
"metadata": {},
"cell_type": "markdown",
"source": "## 保存boss请求结果",
"id": "311a82d4faf8e2d"
},
{
"cell_type": "code",
"id": "initial_id",
"metadata": {
"collapsed": true,
"ExecuteTime": {
"end_time": "2025-11-10T06:24:23.858755Z",
"start_time": "2025-11-10T06:24:22.994108Z"
}
},
"source": [
"# 标准库\n",
"import os\n",
"import time\n",
"import random\n",
"import json\n",
"import binascii\n",
"from datetime import date, timedelta, datetime\n",
"from urllib.parse import quote\n",
"from pathlib import Path\n",
"\n",
"# 第三方库\n",
"import numpy as np\n",
"import pandas as pd\n",
"import requests\n",
"from pyDes import des, CBC, PAD_PKCS5\n",
"import mysql.connector\n",
"from mysql.connector import Error\n",
"\n",
"# PostgreSQL(如果你用到了)\n",
"import psycopg2\n",
"\n",
"# 自定义模块\n",
"from config import Config\n",
"from api import API\n",
"from back_ground_module import CommonModule\n",
"from log_config import configure_task_logger, configure_error_task_logger\n",
"\n",
"\n",
"logger = configure_task_logger()\n",
"error_task_logger = configure_error_task_logger()\n",
"api_instance = API()\n",
"common_module = CommonModule()\n",
"output_dir = \"output\" # 设置输出目录\n",
"os.makedirs(output_dir, exist_ok=True)\n",
"\n",
"def des_encrypt(s):\n",
" \"\"\"\n",
" DES 加密\n",
" :param s: 原始字符串\n",
" :return: 加密后字符串,16进制\n",
" \"\"\"\n",
" secret_key = 'HwdMBW8o'\n",
" iv = secret_key\n",
" k = des(secret_key, CBC, iv, pad=None, padmode=PAD_PKCS5)\n",
" en = k.encrypt(s, padmode=PAD_PKCS5)\n",
" return binascii.b2a_base64(en, newline=False)\n",
"\n",
"\n",
"def des_descrypt(s):\n",
" \"\"\"\n",
" DES 解密\n",
" :param s: 加密后的字符串,16进制\n",
" :return: 解密后的字符串\n",
" \"\"\"\n",
" secret_key = 'HwdMBW8o'\n",
" iv = secret_key\n",
" k = des(secret_key, CBC, iv, pad=None, padmode=PAD_PKCS5)\n",
" de = k.decrypt(binascii.a2b_base64(s), padmode=PAD_PKCS5)\n",
" return de\n",
"\n",
"data_NGV = common_module.get_renewal_details()\n",
"\n",
"\n",
"for i in range(0,len(data_NGV[\"date_fmt\"])):\n",
" t = time.time()\n",
" ts = int(round(t * 1000))\n",
" randint = random.randint(100000000, 999999999)\n",
" req = data_NGV['id_own_org'][i] + \"_\" + str(ts) + \"_\" + str(randint)\n",
" str_en = des_encrypt(req)\n",
" req_new = str_en.decode('utf-8')\n",
"\n",
" url = f\"http://manage.f6yc.com/hive-admin/py/yida/renewal/orgInfo\"\n",
" data = {\n",
" 'req':req_new,\n",
" 't':ts,\n",
" 'r':randint\n",
" }\n",
" res = requests.post(url,data=data)\n",
" # print(res.json.json())\n",
"\n",
" break\n",
"\n",
"print(len(data_NGV))"
],
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"距离今天还有120天的日期是:2026-03-10\n",
"29\n"
]
}
],
"execution_count": 2
}
],
"metadata": {
"kernelspec": {
"display_name": "Python 3",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 2
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython2",
"version": "2.7.6"
}
},
"nbformat": 4,
"nbformat_minor": 5
}