人财事物信息化 - bank_statement_import.py
代码解释
这段代码实现了银行对账单导入功能,包含了数据验证、导入流程控制、文件处理、状态查询等功能,以下是对各部分的详细解释:
1. 类 BankStatementImport
该类继承自 DataImport
,用于处理银行对账单的导入操作。
- __init__
方法:调用父类的构造函数进行初始化。
- validate
方法:
- 验证导入文件或 Google Sheets URL 是否存在。
- 根据银行配置生成模板选项,包括列到字段的映射。
- 调用 validate_import_file
和 validate_google_sheets_url
方法进行文件和 URL 的验证。
- start_import
方法:
- 获取导入数据的预览,检查是否包含 “Bank Account” 列。
- 检查调度器是否激活,若未激活则抛出异常。
- 使用 enqueue
函数将导入任务放入后台队列执行。
2. 白名单函数
get_preview_from_template
:获取导入模板的预览数据。form_start_import
:启动导入流程。download_errored_template
:下载导入出错的模板文件。download_import_log
:下载导入日志。
3. 辅助函数
parse_data_from_template
:解析模板数据,过滤掉空行。start_import
:在后台作业中执行导入操作,包括更新映射数据库、添加银行账户信息、写入文件和调用Importer
进行数据导入。update_mapping_db
:更新银行交易映射数据库。add_bank_account
:在数据中添加银行账户信息。write_files
:根据文件扩展名(CSV 或 XLSX)将数据写入文件。write_xlsx
:将数据写入 XLSX 文件,处理非法字符和 HTML 标签。get_import_status
:获取导入状态,包括成功、失败和总记录数。get_import_logs
:获取导入日志。upload_bank_statement
:创建新的银行对账单导入文档。
代码优化
1. 错误处理
在文件操作和数据库操作中添加更详细的错误处理,提高代码的健壮性。例如,在 write_files
方法中:
def write_files(import_file, data):
full_file_path = import_file.file_doc.get_full_path()
parts = import_file.file_doc.get_extension()
extension = parts[1].lstrip(".")
try:
if extension == "csv":
with open(full_file_path, "w", newline="") as file:
writer = csv.writer(file)
writer.writerows(data)
elif extension in ("xlsx", "xls"):
write_xlsx(data, "trans", file_path=full_file_path)
except Exception as e:
frappe.log_error(f"Error writing file: {e}", title="File Write Error")
2. 代码复用
将一些重复的代码提取成独立的函数,提高代码的可维护性。例如,获取银行对账单导入文档的代码可以提取成一个函数:
def get_bank_statement_import(docname):
return frappe.get_doc("Bank Statement Import", docname)
3. 性能优化
在处理大量数据时,避免一次性加载所有数据到内存中。可以使用生成器或迭代器来逐行处理数据。
添加功能
1. 数据预处理
在导入数据之前,添加数据预处理功能,如数据清洗、格式转换等。例如,在 parse_data_from_template
方法中添加更多的数据清洗逻辑:
def parse_data_from_template(raw_data):
data = []
for _i, row in enumerate(raw_data):
if all(v in INVALID_VALUES for v in row):
# empty row
continue
# 数据清洗
cleaned_row = [str(cell).strip() if cell is not None else cell for cell in row]
data.append(cleaned_row)
return data
2. 进度跟踪
在导入过程中,添加进度跟踪功能,实时反馈导入进度。可以使用 frappe.publish_realtime
函数发送进度信息到前端:
def start_import(data_import, bank_account, import_file_path, google_sheets_url, bank, template_options):
"""This method runs in background job"""
update_mapping_db(bank, template_options)
data_import = get_bank_statement_import(data_import)
file = import_file_path if import_file_path else google_sheets_url
import_file = ImportFile("Bank Transaction", file=file, import_type="Insert New Records")
data = parse_data_from_template(import_file.raw_data)
if not data_import.get("payload_count"):
data_import.payload_count = len(data) - 1
if import_file_path:
add_bank_account(data, bank_account)
write_files(import_file, data)
total_rows = len(data) - 1
try:
i = Importer(data_import.reference_doctype, data_import=data_import)
for index, row in enumerate(data[1:], start=1):
# 处理每行数据
i.process_row(row)
progress = index / total_rows * 100
frappe.publish_realtime("bank_statement_import_progress", {
"data_import": data_import.name,
"progress": progress
})
i.import_data()
except Exception:
frappe.db.rollback()
data_import.db_set("status", "Error")
data_import.log_error("Bank Statement Import failed")
finally:
frappe.flags.in_import = False
frappe.publish_realtime("data_import_refresh", {"data_import": data_import.name})
3. 导入数据验证
在导入数据之前,添加数据验证功能,确保数据的完整性和准确性。例如,验证日期格式、金额范围等。可以在 start_import
方法中添加验证逻辑:
def start_import(data_import, bank_account, import_file_path, google_sheets_url, bank, template_options):
# ...
data = parse_data_from_template(import_file.raw_data)
headers = data[0]
for row in data[1:]:
row_dict = dict(zip(headers, row))
# 验证日期格式
date_str = row_dict.get("Transaction Date")
if date_str:
try:
# 假设日期格式为 YYYY-MM-DD
datetime.datetime.strptime(date_str, "%Y-%m-%d")
except ValueError:
frappe.log_error(f"Invalid date format in row: {row}", title="Date Validation Error")
# 验证金额范围
amount = row_dict.get("Amount")
if amount:
try:
amount = float(amount)
if amount < 0:
frappe.log_error(f"Negative amount in row: {row}", title="Amount Validation Error")
except ValueError:
frappe.log_error(f"Invalid amount format in row: {row}", title="Amount Validation Error")
# ...
通过以上优化和功能添加,代码的健壮性、可维护性和功能性都得到了提升。
如何一次性导入不同公司不同币种不同银行账户的流水文件?并自动生成收支分类统计日报
若要一次性导入不同公司、不同币种、不同银行账户的流水文件,并且自动生成收支分类统计日报,可以按以下步骤进行:
1. 数据结构设计
要保证数据模型能够记录公司、币种、银行账户以及流水信息。可以在 Bank Statement Import
类或者相关文档类型里添加对应的字段。
# 假设 Bank Statement Import 类增加以下字段
class BankStatementImport(DataImport):
# ... 已有字段 ...
from typing import TYPE_CHECKING
if TYPE_CHECKING:
from frappe.types import DF
currency: DF.Link # 币种字段
# ... 其他字段 ...
2. 批量导入流水文件
要修改导入逻辑,使其能够处理多个流水文件,并且根据文件中的信息关联到对应的公司、币种和银行账户。
def batch_import_bank_statements(import_files):
for import_file in import_files:
# 假设文件路径包含公司、币种和银行账户信息,例如:CompanyA_USD_Bank1.csv
parts = import_file.split("_")
company = parts[0]
currency = parts[1]
bank_account = parts[2].replace(".csv", "")
bsi = frappe.new_doc("Bank Statement Import")
bsi.update({
"company": company,
"currency": currency,
"bank_account": bank_account,
"import_file": import_file,
"import_type": "Insert New Records"
})
bsi.save()
bsi.start_import()
3. 数据处理与统计
在导入数据之后,需要对流水数据进行处理,按公司、币种、银行账户和日期进行分类统计,生成收支分类统计日报。
def generate_daily_report(date):
# 查询当天的银行流水记录
filters = {
"transaction_date": date
}
bank_transactions = frappe.get_all("Bank Transaction", filters=filters, fields=["company", "currency", "bank_account", "amount", "transaction_type"])
report = {}
for transaction in bank_transactions:
company = transaction.company
currency = transaction.currency
bank_account = transaction.bank_account
amount = transaction.amount
transaction_type = transaction.transaction_type
if company not in report:
report[company] = {}
if currency not in report[company]:
report[company][currency] = {}
if bank_account not in report[company][currency]:
report[company][currency][bank_account] = {
"income": 0,
"expense": 0
}
if transaction_type == "Income":
report[company][currency][bank_account]["income"] += amount
elif transaction_type == "Expense":
report[company][currency][bank_account]["expense"] += amount
return report
4. 自动化任务
可以使用 Frappe 的调度器来实现每日自动生成统计日报的功能。
@frappe.whitelist()
def daily_report_task():
from frappe.utils import getdate
today = getdate()
report = generate_daily_report(today)
# 可以将报告保存为文档或者发送邮件通知相关人员
frappe.msgprint(f"收支分类统计日报: {report}")
# 在 hooks.py 中添加调度任务
scheduler_events = {
"daily": [
"your_app_name.daily_report_task"
]
}
5. 调用示例
# 批量导入流水文件
import_files = ["CompanyA_USD_Bank1.csv", "CompanyB_EUR_Bank2.csv"]
batch_import_bank_statements(import_files)
# 手动生成统计日报
from frappe.utils import getdate
today = getdate()
report = generate_daily_report(today)
print(report)
总结
通过以上步骤,你能够实现一次性导入不同公司、不同币种、不同银行账户的流水文件,并且自动生成收支分类统计日报。需要注意的是,要根据实际的数据结构和业务需求对代码进行适当调整。