"""Sync the DingTalk user directory into a BI MySQL table.

Flow (see ``GetDingDingID.main``): obtain an access token, walk the department
tree breadth-first, page through each department's members, truncate the BI
table, insert the collected rows, and save a CSV copy.
"""
from api import API  # NOTE(review): immediately shadowed by the local ``class API`` below; remove if unused.
import json
import os
import time
from collections import deque

import mysql.connector
import numpy as np
import pandas as pd
import requests
from mysql.connector import Error

# Seconds to wait for any single DingTalk HTTP call; without a timeout
# ``requests`` can block forever on a stalled connection.
REQUEST_TIMEOUT = 30

# NOTE(review): credentials are committed to source. Each value can now be
# overridden from the environment; the literal fallbacks should be removed
# (and the secrets rotated) once every deployment sets the variables.
BI_DB_CONFIG = {
    "host": os.environ.get("BI_DB_HOST", "f6-public.rwlb.rds.aliyuncs.com"),
    "port": int(os.environ.get("BI_DB_PORT", "3306")),
    "user": os.environ.get("BI_DB_USER", "rw_operation_data_relay"),
    "password": os.environ.get("BI_DB_PASSWORD", "m+q5Z4%IVuF9bf"),
    "database": os.environ.get("BI_DB_NAME", "f6operation_data_relay"),
}
# Table that is truncated and then refilled on every run.
BI_TABLE_NAME = "dingding_job_number_table"


class API:
    """Minimal client for the DingTalk open API endpoints used by this script."""

    # NOTE(review): see the credentials note on BI_DB_CONFIG above.
    appKey = os.environ.get("DINGTALK_APP_KEY", "ding5kqocon5s9oph5uq")
    appSecret = os.environ.get(
        "DINGTALK_APP_SECRET",
        "HL1jgsIIfLAC0eTH0A1m4mwxUDqbgsiPeCCGGE3ocM6qJBTIW7Ivt9drxF_Z4Kb_",
    )
    corp_id = os.environ.get("DINGTALK_CORP_ID", "dinga88e3d35525b86ca")

    def generateToken(self) -> str:
        """Request an access token using ``appKey``/``appSecret``.

        Returns:
            The ``access_token`` string from the response.

        Raises:
            Exception: if the response's ``errcode`` is not 0.
            requests.RequestException: on network failure or timeout.
        """
        url = "https://oapi.dingtalk.com/gettoken"
        params = {
            "appkey": self.appKey,
            "appsecret": self.appSecret,
        }
        res = requests.get(url, params=params, timeout=REQUEST_TIMEOUT)
        result = res.json()
        if result.get("errcode") != 0:
            raise Exception(f"Failed to get token: {result}")
        return result["access_token"]

    def get_department_list(self, token, dept_id=None):
        """List the direct sub-departments of ``dept_id``.

        When ``dept_id`` is None no ``dept_id`` is sent; DingTalk then uses the
        root department (per its API docs — confirm if behaviour changes).
        The endpoint is called with POST + JSON body; the original author found
        that a GET request is rejected.

        Returns:
            The decoded JSON response, unmodified (callers check ``errcode``).
        """
        url = "https://oapi.dingtalk.com/topapi/v2/department/listsub"
        params = {"access_token": token}
        data = {} if dept_id is None else {"dept_id": dept_id}
        headers = {"Content-Type": "application/json"}
        res = requests.post(
            url, params=params, json=data, headers=headers, timeout=REQUEST_TIMEOUT
        )
        return res.json()

    def get_member_list(self, token, dept_id, cursor=0, size=100):
        """Fetch one page of users belonging to ``dept_id``.

        Args:
            token: access token from ``generateToken``.
            dept_id: department whose members are listed.
            cursor: paging offset sent to the API.
            size: page size sent to the API.

        Returns:
            The decoded JSON response, unmodified (callers check ``errcode``).
        """
        url = "https://oapi.dingtalk.com/topapi/v2/user/list"
        params = {"access_token": token}
        headers = {"Content-Type": "application/json"}
        data = {
            "dept_id": dept_id,
            "cursor": cursor,
            "size": size,
        }
        res = requests.post(
            url, params=params, json=data, headers=headers, timeout=REQUEST_TIMEOUT
        )
        return res.json()


class GetDingDingID:
    """Collect every DingTalk user (per department) and push them to the BI table."""

    # ID of the organisation's root department. The original code's comment
    # noted the root is "usually 1"; DingTalk documents it as 1.
    ROOT_DEPT_ID = 1
    # Rows sent per executemany() call, to keep each INSERT statement bounded.
    INSERT_BATCH_SIZE = 1000

    def __init__(self):
        self.api = API()
        # Accumulates across calls to main(); a second main() call on the same
        # instance will therefore duplicate rows.
        self.all_users = []

    def fetch_all_departments(self, token, root_dept_id=None):
        """Breadth-first walk of the department tree.

        Args:
            token: access token.
            root_dept_id: department to start from. When None, the walk starts
                at the organisation root, and the root itself is included so
                users attached directly to it are not skipped.

        Returns:
            A list of the unique department IDs visited, in no particular
            order. Returns [] if listing the root's children fails. Errors on
            deeper levels are printed and that subtree is skipped.
        """
        queue = deque()
        all_dept_ids = set()

        if root_dept_id is None:
            root_res = self.api.get_department_list(token)
            if root_res.get("errcode") != 0:
                print("Error fetching root departments:", root_res)
                return []
            # The call above lists only the root's children; record the root
            # too so its direct members are fetched.
            all_dept_ids.add(self.ROOT_DEPT_ID)
            queue.extend(d["dept_id"] for d in root_res.get("result", []))
        else:
            queue.append(root_dept_id)

        while queue:
            dept_id = queue.popleft()
            if dept_id in all_dept_ids:
                continue
            all_dept_ids.add(dept_id)

            sub_res = self.api.get_department_list(token, dept_id=dept_id)
            if sub_res.get("errcode") == 0:
                queue.extend(d["dept_id"] for d in sub_res.get("result", []))
            else:
                print(f"Error fetching sub-departments of {dept_id}: {sub_res}")

        return list(all_dept_ids)

    def fetch_users_in_dept(self, token, dept_id):
        """Return every user of ``dept_id``, following pagination.

        Each user is a dict with keys ``userid``, ``job_number``, ``name`` and
        ``dept_id``. On an API error the error is printed and the users fetched
        so far are returned.
        """
        cursor = 0
        size = 100
        users = []

        while True:
            res = self.api.get_member_list(token, dept_id, cursor=cursor, size=size)
            if res.get("errcode") != 0:
                print(f"Error fetching users in dept {dept_id}: {res}")
                break

            result = res.get("result", {})
            users.extend(
                {
                    "userid": user.get("userid"),
                    "job_number": user.get("job_number"),
                    "name": user.get("name"),
                    "dept_id": dept_id,
                }
                for user in result.get("list", [])
            )

            if not result.get("has_more"):
                break
            cursor = result.get("next_cursor", cursor + size)
            time.sleep(0.1)  # throttle to avoid the API's rate limit

        return users

    @staticmethod
    def _connect_bi():
        """Open a new connection to the BI MySQL database (BI_DB_CONFIG)."""
        return mysql.connector.connect(
            host=BI_DB_CONFIG["host"],
            port=BI_DB_CONFIG.get("port", 3306),
            user=BI_DB_CONFIG["user"],
            password=BI_DB_CONFIG["password"],
            database=BI_DB_CONFIG["database"],
            use_pure=True,  # force the pure-Python implementation (required by the original setup)
        )

    def write_to_bi(self, df):
        """Insert the rows of ``df`` into BI_TABLE_NAME.

        Only columns that also exist in the table are written. None/NaN/NA and
        the strings 'nan', 'NaN', 'NAN' and '' become SQL NULL. dict/list
        cells are stored as JSON text. All rows are committed in one
        transaction.

        Raises:
            Exception: any database error, after the transaction is rolled
                back (previously such errors were silently swallowed).
        """
        connection = self._connect_bi()
        cursor = connection.cursor()
        try:
            cursor.execute(f"SHOW COLUMNS FROM {BI_TABLE_NAME}")
            db_columns = [col[0] for col in cursor.fetchall()]

            df = df.replace([None, np.nan, pd.NA, 'nan', 'NaN', 'NAN', ''], None)
            filtered_df = df[df.columns.intersection(db_columns)].copy()

            if filtered_df.empty:
                print("DataFrame 中没有与数据库表结构匹配的列。")
                return

            # Serialise only container values; other cells are left untouched
            # so plain strings are not wrapped in extra JSON quotes.
            for col in filtered_df.columns:
                filtered_df[col] = filtered_df[col].map(
                    lambda v: json.dumps(v, ensure_ascii=False)
                    if isinstance(v, (dict, list))
                    else v
                )

            placeholders = ", ".join(["%s"] * len(filtered_df.columns))
            # Backticks protect column names that clash with SQL keywords.
            columns = ", ".join(f"`{col}`" for col in filtered_df.columns)
            insert_sql = f"INSERT INTO {BI_TABLE_NAME} ({columns}) VALUES ({placeholders})"

            rows = list(filtered_df.itertuples(index=False, name=None))
            for start in range(0, len(rows), self.INSERT_BATCH_SIZE):
                cursor.executemany(insert_sql, rows[start:start + self.INSERT_BATCH_SIZE])
            connection.commit()
        except Exception as e:
            connection.rollback()
            print(f"❌ Failed to write to {BI_TABLE_NAME}: {e}")
            raise
        finally:
            cursor.close()
            connection.close()

    def clear_table_data(self):
        """Delete every row of BI_TABLE_NAME with TRUNCATE TABLE.

        Raises:
            mysql.connector.Error: if connecting or truncating fails. The error
                is logged first; previously it was silently swallowed.
        """
        connection = None
        cursor = None
        try:
            connection = self._connect_bi()
            if connection.is_connected():
                cursor = connection.cursor()
                cursor.execute(f"TRUNCATE TABLE {BI_TABLE_NAME}")
                connection.commit()
        except Error as e:
            print(f"❌ Failed to clear table {BI_TABLE_NAME}: {e}")
            if connection is not None and connection.is_connected():
                connection.rollback()
            raise
        finally:
            if cursor is not None:
                cursor.close()
            if connection is not None and connection.is_connected():
                connection.close()

    def main(self):
        """Run the full sync and return the collected users as a DataFrame.

        Returns None if no access token can be obtained. A failure while
        clearing or writing the BI table is reported. The CSV copy is still
        written in that case.

        NOTE(review): the table is truncated before the insert. If the insert
        fails, the table is left empty until the next successful run.
        """
        try:
            token = self.api.generateToken()
            print("✅ Token generated successfully.")
        except Exception as e:
            print("❌ Failed to generate token:", e)
            return

        dept_ids = self.fetch_all_departments(token)
        total = len(dept_ids)
        print(f"📁 Total departments found: {total}")

        for i, dept_id in enumerate(dept_ids, start=1):
            print(f"({i}/{total}) Fetching users from dept: {dept_id}")
            self.all_users.extend(self.fetch_users_in_dept(token, dept_id))
            time.sleep(0.1)  # throttle between departments

        df = pd.DataFrame(self.all_users)
        print(f"\n✅ Total users collected: {len(df)}")
        print(df.head())

        try:
            self.clear_table_data()
            self.write_to_bi(df)
        except Exception as e:
            # Details were already logged by the failing method; keep going so
            # the CSV fallback below is still produced.
            print("❌ Failed to sync users to BI table:", e)

        # Optional local copy; utf-8-sig so Excel detects the encoding.
        df.to_csv("dingtalk_users.csv", index=False, encoding="utf-8-sig")
        print("💾 Saved to 'dingtalk_users.csv'")

        return df


if __name__ == '__main__':
    df = GetDingDingID().main()