Files
saas/module.py
2025-12-31 11:05:09 +08:00

384 lines
14 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
每个动作对应的函数
"""
import back_ground_module
from back_ground_module import EmailProcessor
from log_config import configure_task_logger, configure_error_task_logger
import threading
# 获取已经配置好的常规日志记录器
logger = configure_task_logger()
# 获取已经配置好的错误任务日志记录器
error_task_logger = configure_error_task_logger()
class Module:
@staticmethod
def update_ngv_data():
print("正在执行update_ngv_data")
try:
update_NGV_data = back_ground_module.UpdateNGVData() # 需要注意实例化函数 函数里的init不要放查询等费时功能
thread = threading.Thread(target=update_NGV_data.main)
thread.start()
return "update_ngv_data已添加到后台任务执行"
except Exception as e:
print("update_ngv_data", e)
return False
@staticmethod
def daily_update_ngv_data():
print("这是daily_update_ngv_data")
try:
daily_update_NGV_data = back_ground_module.UpdateAllNGVDataDaily()
thread = threading.Thread(target=daily_update_NGV_data.main)
thread.start()
return "daily_update_ngv_data已添加到后台任务执行"
except Exception as e:
print("daily_update_ngv_data", e)
return False
@staticmethod
def new_services_revisit():
print("正在执行new_services_revisit")
try:
new_services_revisit = back_ground_module.NewServicesRevisit()
thread = threading.Thread(target=new_services_revisit.main)
thread.start()
return "new_services_revisit已添加到后台任务执行"
except Exception as e:
print("new_services_revisit", e)
return False
@staticmethod
def renew_services_revisit():
print("这是renew_services_revisit")
try:
renew_services_revisit = back_ground_module.RenewServicesRevisit()
thread = threading.Thread(target=renew_services_revisit.main)
thread.start()
return "renew_services_revisit已添加到后台任务执行"
except Exception as e:
print("renew_services_revisit", e)
return False
@staticmethod
def key_services_revisit():
print("这是key_services_revisit")
try:
key_services_revisit = back_ground_module.KeyCustomerRevisit()
thread = threading.Thread(target=key_services_revisit.main)
thread.start()
return "key_services_revisit已添加到后台任务执行"
except Exception as e:
print("key_services_revisit", e)
return False
@staticmethod
def jcb_efficient_car_pickup_data():
print("这是jcb_efficient_car_pickup_data")
try:
# key_services_revisit = back_ground_module.KeyCustomerRevisit()
efficient_car_pickup_data = back_ground_module.JCBEfficientCarPickup()
thread = threading.Thread(target=efficient_car_pickup_data.main)
thread.start()
return "jcb_efficient_car_pickup_data已添加到后台任务执行"
except Exception as e:
print("jcb_efficient_car_pickup_data", e)
return False
@staticmethod
def jcb_abnormal_revisit_data():
print("这是jcb_abnormal_revisit_data")
try:
# key_services_revisit = back_ground_module.KeyCustomerRevisit()
efficient_car_pickup_data = back_ground_module.JCBAbnormalRevisit()
thread = threading.Thread(target=efficient_car_pickup_data.main)
thread.start()
return "jcb_abnormal_revisit_data已添加到后台任务执行"
except Exception as e:
print("jcb_abnormal_revisit_data", e)
return False
@staticmethod
def data_Support_Private_Mini_Program():
print("这是data_Support_Private_Mini_Program")
try:
data_Support_Private_Mini_Program = back_ground_module.importSYXCXData()
thread = threading.Thread(target=data_Support_Private_Mini_Program.main)
thread.start()
return "data_Support_Private_Mini_Program已添加到后台任务执行"
except Exception as e:
print("data_Support_Private_Mini_Program", e)
return False
@staticmethod
def data_Support_Commission():
print("这是data_Support_Commission")
try:
data_Support_Commission = back_ground_module.importCommissionData()
thread = threading.Thread(target=data_Support_Commission.main)
thread.start()
return "data_Support_Commission已添加到后台任务执行"
except Exception as e:
print("data_Support_Commission", e)
return False
@staticmethod
def data_Support_DifferentIndustries():
print("data_Support_DifferentIndustries")
try:
data_Support_DifferentIndustries = back_ground_module.importDifferentIndustriesData()
thread = threading.Thread(target=data_Support_DifferentIndustries.main)
thread.start()
return "data_Support_DifferentIndustries已添加到后台任务执行"
except Exception as e:
print("data_Support_DifferentIndustries", e)
return False
@staticmethod
def data_Support_GroupNotification():
print("data_Support_GroupNotification")
try:
data_Support_GroupNotification = back_ground_module.importGroupNotificationData()
thread = threading.Thread(target=data_Support_GroupNotification.main)
thread.start()
return "data_Support_GroupNotification"
except Exception as e:
print("data_Support_GroupNotification", e)
return False
@staticmethod
def data_Update_Email():
print("data_Update_Email")
try:
data_Update_Email = back_ground_module.update_email_to_store_daily_use
thread = threading.Thread(target=EmailProcessor.main)
thread.start()
return "data_Update_Email"
except Exception as e:
print("data_Update_Email", e)
return False
@staticmethod
def data_Exception_Task():
print("data_Exception_Task")
try:
data_Exception_Task = back_ground_module.NewExceptionTask()
thread = threading.Thread(target=data_Exception_Task.main)
thread.start()
return "data_Exception_Task"
except Exception as e:
print("data_Exception_Task", e)
return False
@staticmethod
def revisit_all_information():
print("revisit_all_information")
try:
revisit_all_information = back_ground_module.RevisitAllInformation()
thread = threading.Thread(target=revisit_all_information.main)
thread.start()
return "data_Exception_Task"
except Exception as e:
print("data_Exception_Task", e)
return False
@staticmethod
def get_process_time():
print("get_process_time")
try:
get_process_time = back_ground_module.TimeConsumingProcess()
thread = threading.Thread(target=get_process_time.main)
thread.start()
return "data_Exception_Task"
except Exception as e:
print("data_Exception_Task", e)
return False
@staticmethod
def yida_Fpo_Jandaoyun():
print("yida_Fpo_Jandaoyun")
try:
yida_Fpo_Jandaoyun = back_ground_module.YDFpoJiandaoyun()
thread = threading.Thread(target=yida_Fpo_Jandaoyun.main)
thread.start()
return "data_Exception_Task"
except Exception as e:
print("data_Exception_Task", e)
return False
@staticmethod
def CRMDataProcessor():
print("CRMDataProcessor")
try:
CRM_data_processor = back_ground_module.CRMDataProcessor()
thread = threading.Thread(target=CRM_data_processor.main)
thread.start()
return "data_Exception_Task"
except Exception as e:
print("data_Exception_Task", e)
return False
@staticmethod
def update_ID_form():
print("update_ID_form")
try:
update_id_form = back_ground_module.update_ID_form()
thread = threading.Thread(target=update_id_form.main)
thread.start()
return "data_Exception_Task"
except Exception as e:
print("data_Exception_Task", e)
return False
@staticmethod
def install_event_dispatcher():
print("install_event_dispatcher")
try:
install_event_dispatcher = back_ground_module.InstallEventDispatcher()
thread = threading.Thread(target=install_event_dispatcher.main)
thread.start()
return "data_Exception_Task"
except Exception as e:
print("data_Exception_Task", e)
return False
@staticmethod
def test():
print("test")
try:
test = back_ground_module.NewServicesRevisitTest()
thread = threading.Thread(target=test.main)
thread.start()
return "data_Exception_Task"
except Exception as e:
print("data_Exception_Task", e)
return False
@staticmethod
def update_denominator_reporting_adjustment():
print("update_denominator_reporting_adjustment")
try:
update_denominator_reporting_adjustment = back_ground_module.DenominatorReportingAdjustment()
thread = threading.Thread(target=update_denominator_reporting_adjustment.main)
thread.start()
return "data_Exception_Task"
except Exception as e:
print("data_Exception_Task", e)
return False
@staticmethod
def update_molecule_reporting_adjustment_to_bi():
print("update_molecule_reporting_adjustment_to_bi")
try:
update_molecule_reporting_adjustment_to_bi = back_ground_module.MoleculeReportingAdjustment()
thread = threading.Thread(target=update_molecule_reporting_adjustment_to_bi.main)
thread.start()
return "data_Exception_Task"
except Exception as e:
print("data_Exception_Task", e)
return False
@staticmethod
def import_performance_data():
print("import_performance_data")
try:
import_performance_data = back_ground_module.ImportPerformanceData()
thread = threading.Thread(target=import_performance_data.main)
thread.start()
return "data_Exception_Task"
except Exception as e:
print("data_Exception_Task", e)
return False
@staticmethod
def data_monitor():
print("data_monitor")
try:
data_monitor = back_ground_module.DataMonitor()
thread = threading.Thread(target=data_monitor.main)
thread.start()
return "data_Exception_Task"
except Exception as e:
print("data_Exception_Task", e)
return False
@staticmethod
def new_dealer_service_order_to_bi():
print("new_dealer_service_order_to_bi")
try:
new_dealer_service_order_to_bi = back_ground_module.NewDealerServiceOrderToBI()
thread = threading.Thread(target=new_dealer_service_order_to_bi.main)
thread.start()
return "data_Exception_Task"
except Exception as e:
print("data_Exception_Task", e)
return False
@staticmethod
def non_standar_performance_to_BI():
print("non_standar_performance_to_BI")
try:
non_standar_performance_to_BI = back_ground_module.NonStandardPerformanceToBI()
thread = threading.Thread(target=non_standar_performance_to_BI.main)
thread.start()
return "data_Exception_Task"
except Exception as e:
print("data_Exception_Task", e)
return False
@staticmethod
def partner_settlement_to_BI():
print("partner_settlement_to_BI")
try:
partner_settlement_to_BI = back_ground_module.PartnerSettlementToBI()
thread = threading.Thread(target=partner_settlement_to_BI.main)
thread.start()
return "data_Exception_Task"
except Exception as e:
print("data_Exception_Task", e)
return False
@staticmethod
def GD_match_phone_number():
print("GD_match_phone_number")
try:
GD_match_phone_number = back_ground_module.GDMatchPhoneNumber()
thread = threading.Thread(target=GD_match_phone_number.main)
thread.start()
return "data_Exception_Task"
except Exception as e:
print("data_Exception_Task", e)
return False
@staticmethod
def province_city_person_relation_to_bi():
print("GD_match_phone_number")
try:
province_city_person_relation_to_bi = back_ground_module.ProvinceCityPersonRelationToBI()
thread = threading.Thread(target=province_city_person_relation_to_bi.main)
thread.start()
return "data_Exception_Task"
except Exception as e:
print("data_Exception_Task", e)
return False
@staticmethod
def renewal_to_do():
print("GD_match_phone_number")
try:
renewal_to_do = back_ground_module.RenewalToDo()
thread = threading.Thread(target=renewal_to_do.main)
thread.start()
return "data_Exception_Task"
except Exception as e:
print("data_Exception_Task", e)
return False
@staticmethod
def text3():
print("text3")
return True
# 如果有更多任务,可以继续添加相应的函数