161 lines
5.6 KiB
Python
161 lines
5.6 KiB
Python
import pandas as pd
|
|
import requests
|
|
import json
|
|
import numpy as np
|
|
import time
|
|
from typing import List, Dict
|
|
from tqdm import tqdm
|
|
|
|
|
|
class API:
|
|
def __init__(self):
|
|
self.appKey = "ding5kqocon5s9oph5uq"
|
|
self.appSecret = "HL1jgsIIfLAC0eTH0A1m4mwxUDqbgsiPeCCGGE3ocM6qJBTIW7Ivt9drxF_Z4Kb_"
|
|
|
|
def generateToken(self) -> str:
|
|
"""
|
|
函数功能:生成访问令牌(token)
|
|
Returns:
|
|
str: 返回生成的访问令牌字符串。此token用于后续API调用的身份验证。
|
|
"""
|
|
token_api = 'https://api.dingtalk.com/v1.0/oauth2/accessToken'
|
|
data = {
|
|
"appKey": self.appKey,
|
|
"appSecret": self.appSecret
|
|
}
|
|
try:
|
|
res = requests.post(token_api, json=data, timeout=10)
|
|
res.raise_for_status() # 抛出HTTP错误
|
|
token = res.json().get('accessToken')
|
|
if not token:
|
|
raise ValueError("获取Token失败,返回结果无accessToken字段")
|
|
return token
|
|
except Exception as e:
|
|
raise Exception(f"生成Token出错: {str(e)}")
|
|
|
|
@staticmethod
|
|
def Retrieve_employee_id(TOKEN, offset):
|
|
""" 获取在职员工列表 """
|
|
api = f'https://oapi.dingtalk.com/topapi/smartwork/hrm/employee/queryonjob?access_token={TOKEN}'
|
|
payload = {
|
|
"offset": offset,
|
|
"size": 50,
|
|
"status_list": "2,3,5,-1" # 在职、试用期、待入职、其他在职状态
|
|
}
|
|
try:
|
|
res = requests.post(api, json=payload, timeout=10)
|
|
res.raise_for_status()
|
|
return res.json()
|
|
except Exception as e:
|
|
raise Exception(f"获取员工ID列表出错(offset={offset}): {str(e)}")
|
|
|
|
@staticmethod
|
|
def Retrieve_employee_list(TOKEN, userid):
|
|
""" 获取员工花名册字段信息 """
|
|
api = f'https://oapi.dingtalk.com/topapi/smartwork/hrm/employee/v2/list?access_token={TOKEN}'
|
|
payload = {
|
|
"userid_list": userid,
|
|
"field_filter_list": "sys00-name", # 仅获取姓名字段
|
|
"agentid": "1405052868"
|
|
}
|
|
try:
|
|
res = requests.post(api, json=payload, timeout=10)
|
|
res.raise_for_status()
|
|
return res.json()
|
|
except Exception as e:
|
|
raise Exception(f"获取员工姓名出错(userid={userid}): {str(e)}")
|
|
|
|
|
|
class NpEncoder(json.JSONEncoder):
|
|
def default(self, obj):
|
|
if isinstance(obj, np.integer):
|
|
return int(obj)
|
|
elif isinstance(obj, np.floating):
|
|
return float(obj)
|
|
elif isinstance(obj, np.ndarray):
|
|
return obj.tolist()
|
|
else:
|
|
return super(NpEncoder, self).default(obj)
|
|
|
|
|
|
class GetDingDingID:
|
|
def __init__(self):
|
|
self.api_instance = API()
|
|
# 要排除的员工ID
|
|
self.exclude_user_id = "69365621842352"
|
|
# Excel保存路径(使用原始字符串避免转义问题)
|
|
self.excel_path = r"D:\Idea Project\F6+宜搭+其它(1)\张阳脚本\文件输出\dingding_user_list.xlsx"
|
|
|
|
def get_employee_info(self, token: str) -> List[Dict]:
|
|
"""
|
|
获取员工ID和姓名的核心逻辑
|
|
"""
|
|
offset = 0
|
|
employee_info_list = []
|
|
|
|
while True:
|
|
# 分页获取员工ID
|
|
res_list = self.api_instance.Retrieve_employee_id(token, offset)
|
|
|
|
# 检查API返回是否正常
|
|
if res_list.get("errcode") != 0:
|
|
raise Exception(f"API调用失败: {res_list.get('errmsg', '未知错误')}")
|
|
|
|
result = res_list.get("result", {})
|
|
data_list = result.get("data_list", [])
|
|
|
|
# 没有数据则退出循环
|
|
if not data_list:
|
|
break
|
|
|
|
# 遍历获取每个员工的姓名
|
|
for user_id in tqdm(data_list):
|
|
|
|
# 调用接口获取姓名(添加短暂延迟避免限流)
|
|
time.sleep(0.1)
|
|
name_res = self.api_instance.Retrieve_employee_list(token, user_id)
|
|
|
|
# 解析姓名
|
|
name = "未知姓名"
|
|
if name_res.get("errcode") == 0:
|
|
field_data = name_res.get("result", [{}])[0].get("field_data_list", [{}])[0]
|
|
name = field_data.get("field_value_list", [{}])[0].get("label", "未知姓名")
|
|
print(name_res)
|
|
# 添加到结果列表
|
|
employee_info_list.append({
|
|
"员工ID": user_id,
|
|
"员工姓名": name
|
|
})
|
|
|
|
# 更新分页游标,没有下一页则退出
|
|
next_cursor = result.get("next_cursor", 0)
|
|
if next_cursor == 0 or next_cursor == offset:
|
|
break
|
|
offset = next_cursor
|
|
|
|
return employee_info_list
|
|
|
|
def main(self):
|
|
try:
|
|
# 生成Token
|
|
token = self.api_instance.generateToken()
|
|
print(f"成功获取Token: {token[:10]}...")
|
|
|
|
# 获取员工信息
|
|
print("开始获取员工信息...")
|
|
employee_info = self.get_employee_info(token)
|
|
|
|
# 导出到Excel
|
|
if employee_info:
|
|
df = pd.DataFrame(employee_info)
|
|
df.to_excel(self.excel_path, index=False, engine="openpyxl")
|
|
print(f"成功导出{len(employee_info)}条员工信息到: {self.excel_path}")
|
|
else:
|
|
print("未获取到任何员工信息")
|
|
|
|
except Exception as e:
|
|
print(f"程序执行出错: {str(e)}")
|
|
|
|
|
|
if __name__ == '__main__':
|
|
GetDingDingID().main() |