Optimize word cloud generation script by adding logging and exception handling.
This commit is contained in:
@@ -1,2 +0,0 @@
|
||||
2025-01-09 23:29:06,246 [INFO] 尝试连接到数据库: root@localhost:3306/Weibo_PublicOpinion_AnalysisSystem
|
||||
2025-01-09 23:29:06,346 [ERROR] 数据库连接失败: (1045, "Access denied for user 'root'@'localhost' (using password: YES)")
|
||||
+162
-71
@@ -5,95 +5,180 @@ import matplotlib.pyplot as plt
|
||||
from PIL import Image
|
||||
import numpy as np
|
||||
import pymysql
|
||||
import logging
|
||||
|
||||
def stopWordList():
|
||||
# Configure logging
|
||||
logging.basicConfig(
|
||||
level=logging.INFO,
|
||||
format='%(asctime)s [%(levelname)s] %(message)s',
|
||||
handlers=[
|
||||
logging.FileHandler("wordcloud_generator.log"),
|
||||
logging.StreamHandler()
|
||||
]
|
||||
)
|
||||
|
||||
# Global cache for stop words
|
||||
STOP_WORDS = set()
|
||||
|
||||
def load_stop_words():
|
||||
"""
|
||||
如果 stopWords.txt 文件内容较大,或被频繁读取,
|
||||
可以考虑将其缓存起来,避免重复读文件。
|
||||
Load and cache stop words.
|
||||
If the stop words file does not exist or fails to read, log an error and return an empty set.
|
||||
"""
|
||||
with open('./model/stopWords.txt', encoding='utf8') as f:
|
||||
return [line.strip() for line in f.readlines()]
|
||||
global STOP_WORDS
|
||||
if STOP_WORDS:
|
||||
return STOP_WORDS
|
||||
stop_words_path = './model/stopWords.txt'
|
||||
if not os.path.exists(stop_words_path):
|
||||
logging.error(f"Stop words file does not exist: {stop_words_path}")
|
||||
return set()
|
||||
try:
|
||||
with open(stop_words_path, encoding='utf8') as f:
|
||||
STOP_WORDS = set(line.strip() for line in f if line.strip())
|
||||
logging.info(f"Loaded {len(STOP_WORDS)} stop words")
|
||||
except Exception as e:
|
||||
logging.error(f"Failed to load stop words file: {e}")
|
||||
return STOP_WORDS
|
||||
|
||||
def generate_word_cloud(text, mask_path, font_path, output_path):
|
||||
"""生成词云并保存到 output_path"""
|
||||
img = Image.open(mask_path)
|
||||
img_arr = np.array(img)
|
||||
"""
|
||||
Generate a word cloud and save it to output_path.
|
||||
|
||||
:param text: Processed text
|
||||
:param mask_path: Path to the mask image
|
||||
:param font_path: Path to the font file
|
||||
:param output_path: Path to save the generated word cloud image
|
||||
"""
|
||||
if not os.path.exists(mask_path):
|
||||
logging.error(f"Mask image file does not exist: {mask_path}")
|
||||
return
|
||||
try:
|
||||
img = Image.open(mask_path)
|
||||
img_arr = np.array(img)
|
||||
logging.info(f"Successfully loaded mask image: {mask_path}")
|
||||
except Exception as e:
|
||||
logging.error(f"Failed to load mask image: {e}")
|
||||
return
|
||||
|
||||
wc = WordCloud(
|
||||
background_color="#fff",
|
||||
mask=img_arr,
|
||||
font_path=font_path
|
||||
)
|
||||
wc.generate_from_text(text)
|
||||
try:
|
||||
wc = WordCloud(
|
||||
background_color="#fff",
|
||||
mask=img_arr,
|
||||
font_path=font_path,
|
||||
max_words=2000,
|
||||
max_font_size=100,
|
||||
random_state=42,
|
||||
width=800,
|
||||
height=600
|
||||
)
|
||||
wc.generate_from_text(text)
|
||||
logging.info("Word cloud generated successfully")
|
||||
except Exception as e:
|
||||
logging.error(f"Failed to generate word cloud: {e}")
|
||||
return
|
||||
|
||||
plt.figure(figsize=(8, 6))
|
||||
plt.imshow(wc, interpolation='bilinear')
|
||||
plt.axis('off')
|
||||
plt.savefig(output_path, dpi=300, bbox_inches='tight')
|
||||
plt.close() # 保存后关闭
|
||||
try:
|
||||
plt.figure(figsize=(8, 6))
|
||||
plt.imshow(wc, interpolation='bilinear')
|
||||
plt.axis('off')
|
||||
plt.savefig(output_path, dpi=300, bbox_inches='tight')
|
||||
plt.close()
|
||||
logging.info(f"Word cloud saved to: {output_path}")
|
||||
except Exception as e:
|
||||
logging.error(f"Failed to save word cloud image: {e}")
|
||||
|
||||
def get_db_connection_interactive():
|
||||
"""
|
||||
通过终端交互获取数据库连接参数,若按回车则使用默认值。
|
||||
"""
|
||||
print("请依次输入数据库连接信息(直接按回车使用默认值):")
|
||||
|
||||
host = input(" 1. 主机 (默认: localhost): ") or "localhost"
|
||||
port_str = input(" 2. 端口 (默认: 3306): ") or "3306"
|
||||
port = int(port_str)
|
||||
|
||||
user = input(" 3. 用户名 (默认: root): ") or "root"
|
||||
password = input(" 4. 密码 (默认: 312517): ") or "12345678"
|
||||
db_name = input(" 5. 数据库名 (默认: Weibo_PublicOpinion_AnalysisSystem): ") or "Weibo_PublicOpinion_AnalysisSystem"
|
||||
|
||||
print(f"\n即将连接到数据库: {user}@{host}:{port}/{db_name}\n")
|
||||
Interactively obtain database connection parameters from the terminal.
|
||||
Press Enter to use default values.
|
||||
|
||||
return pymysql.connect(
|
||||
host=host,
|
||||
user=user,
|
||||
password=password,
|
||||
database=db_name,
|
||||
port=port,
|
||||
charset='utf8mb4'
|
||||
)
|
||||
:return: pymysql.connections.Connection object
|
||||
"""
|
||||
print("Please enter database connection information (press Enter to use default values):")
|
||||
|
||||
host = input(" 1. Host (default: localhost): ") or "localhost"
|
||||
port_str = input(" 2. Port (default: 3306): ") or "3306"
|
||||
try:
|
||||
port = int(port_str)
|
||||
except ValueError:
|
||||
logging.error(f"Invalid port number: {port_str}")
|
||||
port = 3306
|
||||
|
||||
user = input(" 3. Username (default: root): ") or "root"
|
||||
password = input(" 4. Password (default: 12345678): ") or "12345678"
|
||||
db_name = input(" 5. Database name (default: Weibo_PublicOpinion_AnalysisSystem): ") or "Weibo_PublicOpinion_AnalysisSystem"
|
||||
|
||||
logging.info(f"Attempting to connect to database: {user}@{host}:{port}/{db_name}")
|
||||
|
||||
try:
|
||||
connection = pymysql.connect(
|
||||
host=host,
|
||||
user=user,
|
||||
password=password,
|
||||
database=db_name,
|
||||
port=port,
|
||||
charset='utf8mb4'
|
||||
)
|
||||
logging.info("Database connection successful")
|
||||
return connection
|
||||
except pymysql.MySQLError as e:
|
||||
logging.error(f"Database connection failed: {e}")
|
||||
raise
|
||||
|
||||
def get_img(field, table_name, target_img_src, res_img_src, connection, font_path='STHUPO.TTF'):
|
||||
"""
|
||||
从数据库拉取指定字段的文本数据,分词处理后生成词云。
|
||||
:param field: 数据库字段名
|
||||
:param table_name: 数据表名
|
||||
:param target_img_src: 词云形状图
|
||||
:param res_img_src: 输出词云文件路径
|
||||
:param connection: 已建立的数据库连接
|
||||
:param font_path: 字体文件路径
|
||||
"""
|
||||
cursor = connection.cursor()
|
||||
sql = f'SELECT {field} FROM {table_name}'
|
||||
cursor.execute(sql)
|
||||
data = cursor.fetchall()
|
||||
Retrieve text data from a specified field and table in the database,
|
||||
perform word segmentation and stop word removal, then generate a word cloud.
|
||||
|
||||
:param field: Database field name
|
||||
:param table_name: Database table name
|
||||
:param target_img_src: Path to the mask image
|
||||
:param res_img_src: Path to save the generated word cloud image
|
||||
:param connection: Established database connection
|
||||
:param font_path: Path to the font file
|
||||
"""
|
||||
try:
|
||||
with connection.cursor() as cursor:
|
||||
sql = f'SELECT {field} FROM {table_name}'
|
||||
cursor.execute(sql)
|
||||
data = cursor.fetchall()
|
||||
logging.info(f"Fetched {len(data)} records from '{table_name}' table, field '{field}'")
|
||||
except pymysql.MySQLError as e:
|
||||
logging.error(f"Database query failed: {e}")
|
||||
return
|
||||
|
||||
text = ''
|
||||
for item in data:
|
||||
text += item[0] # item 是元组 (内容,),取第一个元素即可
|
||||
text = ''.join(item[0] for item in data if item[0])
|
||||
|
||||
cursor.close()
|
||||
# Tokenization & Stop word removal
|
||||
try:
|
||||
stop_words = load_stop_words()
|
||||
if not stop_words:
|
||||
logging.warning("Stop words set is empty, proceeding without stop word removal")
|
||||
cut_words = jieba.cut(text)
|
||||
filtered_words = [word for word in cut_words if word not in stop_words]
|
||||
final_text = ' '.join(filtered_words)
|
||||
logging.info(f"Completed tokenization and stop word removal, generated {len(filtered_words)} words")
|
||||
except Exception as e:
|
||||
logging.error(f"Text processing failed: {e}")
|
||||
return
|
||||
|
||||
# 分词 & 去停用词
|
||||
cut_words = jieba.cut(text)
|
||||
stop_words = set(stopWordList())
|
||||
filtered_words = [word for word in cut_words if word not in stop_words]
|
||||
final_text = ' '.join(filtered_words)
|
||||
|
||||
# 生成词云
|
||||
# Generate word cloud
|
||||
generate_word_cloud(final_text, target_img_src, font_path, res_img_src)
|
||||
|
||||
def main():
|
||||
# 1. 获取数据库连接(交互式输入)
|
||||
connection = get_db_connection_interactive()
|
||||
|
||||
# 2. 根据需求生成词云
|
||||
# 例如:从 article 表的 content 字段生成词云
|
||||
"""
|
||||
Main function to execute the word cloud generation process.
|
||||
"""
|
||||
try:
|
||||
# Obtain database connection interactively
|
||||
connection = get_db_connection_interactive()
|
||||
except Exception:
|
||||
logging.error("Failed to establish database connection, terminating program")
|
||||
return
|
||||
|
||||
try:
|
||||
# Generate word cloud as per requirements
|
||||
# Example: Generate word cloud from 'content' field in 'article' table
|
||||
get_img(
|
||||
field='content',
|
||||
table_name='article',
|
||||
@@ -101,10 +186,16 @@ def main():
|
||||
res_img_src='./static/contentCloud.jpg',
|
||||
connection=connection
|
||||
)
|
||||
print("词云生成完毕!")
|
||||
print("Word cloud generation completed!")
|
||||
except Exception as e:
|
||||
logging.error(f"An error occurred during word cloud generation: {e}")
|
||||
finally:
|
||||
# 关闭数据库连接
|
||||
connection.close()
|
||||
# Close the database connection
|
||||
try:
|
||||
connection.close()
|
||||
logging.info("Database connection closed")
|
||||
except Exception as e:
|
||||
logging.error(f"Error closing database connection: {e}")
|
||||
|
||||
if __name__ == '__main__':
|
||||
main()
|
||||
|
||||
Reference in New Issue
Block a user