人财事物信息化 - bank_statement_import.py

代码解释

这段代码实现了银行对账单导入功能,包含了数据验证、导入流程控制、文件处理、状态查询等功能,以下是对各部分的详细解释:

1. 类 BankStatementImport

该类继承自 DataImport,用于处理银行对账单的导入操作。 - __init__ 方法:调用父类的构造函数进行初始化。 - validate 方法: - 验证导入文件或 Google Sheets URL 是否存在。 - 根据银行配置生成模板选项,包括列到字段的映射。 - 调用 validate_import_filevalidate_google_sheets_url 方法进行文件和 URL 的验证。 - start_import 方法: - 获取导入数据的预览,检查是否包含 “Bank Account” 列。 - 检查调度器是否激活,若未激活则抛出异常。 - 使用 enqueue 函数将导入任务放入后台队列执行。

2. 白名单函数

  • get_preview_from_template:获取导入模板的预览数据。
  • form_start_import:启动导入流程。
  • download_errored_template:下载导入出错的模板文件。
  • download_import_log:下载导入日志。

3. 辅助函数

  • parse_data_from_template:解析模板数据,过滤掉空行。
  • start_import:在后台作业中执行导入操作,包括更新映射数据库、添加银行账户信息、写入文件和调用 Importer 进行数据导入。
  • update_mapping_db:更新银行交易映射数据库。
  • add_bank_account:在数据中添加银行账户信息。
  • write_files:根据文件扩展名(CSV 或 XLSX)将数据写入文件。
  • write_xlsx:将数据写入 XLSX 文件,处理非法字符和 HTML 标签。
  • get_import_status:获取导入状态,包括成功、失败和总记录数。
  • get_import_logs:获取导入日志。
  • upload_bank_statement:创建新的银行对账单导入文档。

代码优化

1. 错误处理

在文件操作和数据库操作中添加更详细的错误处理,提高代码的健壮性。例如,在 write_files 方法中:

def write_files(import_file, data):
    full_file_path = import_file.file_doc.get_full_path()
    parts = import_file.file_doc.get_extension()
    extension = parts[1].lstrip(".")

    try:
        if extension == "csv":
            with open(full_file_path, "w", newline="") as file:
                writer = csv.writer(file)
                writer.writerows(data)
        elif extension in ("xlsx", "xls"):
            write_xlsx(data, "trans", file_path=full_file_path)
    except Exception as e:
        frappe.log_error(f"Error writing file: {e}", title="File Write Error")

2. 代码复用

将一些重复的代码提取成独立的函数,提高代码的可维护性。例如,获取银行对账单导入文档的代码可以提取成一个函数:

def get_bank_statement_import(docname):
    return frappe.get_doc("Bank Statement Import", docname)

3. 性能优化

在处理大量数据时,避免一次性加载所有数据到内存中。可以使用生成器或迭代器来逐行处理数据。

添加功能

1. 数据预处理

在导入数据之前,添加数据预处理功能,如数据清洗、格式转换等。例如,在 parse_data_from_template 方法中添加更多的数据清洗逻辑:

def parse_data_from_template(raw_data):
    data = []

    for _i, row in enumerate(raw_data):
        if all(v in INVALID_VALUES for v in row):
            # empty row
            continue

        # 数据清洗
        cleaned_row = [str(cell).strip() if cell is not None else cell for cell in row]
        data.append(cleaned_row)

    return data

2. 进度跟踪

在导入过程中,添加进度跟踪功能,实时反馈导入进度。可以使用 frappe.publish_realtime 函数发送进度信息到前端:

def start_import(data_import, bank_account, import_file_path, google_sheets_url, bank, template_options):
    """This method runs in background job"""

    update_mapping_db(bank, template_options)

    data_import = get_bank_statement_import(data_import)
    file = import_file_path if import_file_path else google_sheets_url

    import_file = ImportFile("Bank Transaction", file=file, import_type="Insert New Records")

    data = parse_data_from_template(import_file.raw_data)
    if not data_import.get("payload_count"):
        data_import.payload_count = len(data) - 1

    if import_file_path:
        add_bank_account(data, bank_account)
        write_files(import_file, data)

    total_rows = len(data) - 1
    try:
        i = Importer(data_import.reference_doctype, data_import=data_import)
        for index, row in enumerate(data[1:], start=1):
            # 处理每行数据
            i.process_row(row)
            progress = index / total_rows * 100
            frappe.publish_realtime("bank_statement_import_progress", {
                "data_import": data_import.name,
                "progress": progress
            })
        i.import_data()
    except Exception:
        frappe.db.rollback()
        data_import.db_set("status", "Error")
        data_import.log_error("Bank Statement Import failed")
    finally:
        frappe.flags.in_import = False

    frappe.publish_realtime("data_import_refresh", {"data_import": data_import.name})

3. 导入数据验证

在导入数据之前,添加数据验证功能,确保数据的完整性和准确性。例如,验证日期格式、金额范围等。可以在 start_import 方法中添加验证逻辑:

def start_import(data_import, bank_account, import_file_path, google_sheets_url, bank, template_options):
    # ...
    data = parse_data_from_template(import_file.raw_data)
    headers = data[0]
    for row in data[1:]:
        row_dict = dict(zip(headers, row))
        # 验证日期格式
        date_str = row_dict.get("Transaction Date")
        if date_str:
            try:
                # 假设日期格式为 YYYY-MM-DD
                datetime.datetime.strptime(date_str, "%Y-%m-%d")
            except ValueError:
                frappe.log_error(f"Invalid date format in row: {row}", title="Date Validation Error")
        # 验证金额范围
        amount = row_dict.get("Amount")
        if amount:
            try:
                amount = float(amount)
                if amount < 0:
                    frappe.log_error(f"Negative amount in row: {row}", title="Amount Validation Error")
            except ValueError:
                frappe.log_error(f"Invalid amount format in row: {row}", title="Amount Validation Error")
    # ...

通过以上优化和功能添加,代码的健壮性、可维护性和功能性都得到了提升。

如何一次性导入不同公司不同币种不同银行账户的流水文件?并自动生成收支分类统计日报

若要一次性导入不同公司、不同币种、不同银行账户的流水文件,并且自动生成收支分类统计日报,可以按以下步骤进行:

1. 数据结构设计

要保证数据模型能够记录公司、币种、银行账户以及流水信息。可以在 Bank Statement Import 类或者相关文档类型里添加对应的字段。

# 假设 Bank Statement Import 类增加以下字段
class BankStatementImport(DataImport):
    # ... 已有字段 ...
    from typing import TYPE_CHECKING
    if TYPE_CHECKING:
        from frappe.types import DF
        currency: DF.Link  # 币种字段
        # ... 其他字段 ...

2. 批量导入流水文件

要修改导入逻辑,使其能够处理多个流水文件,并且根据文件中的信息关联到对应的公司、币种和银行账户。

def batch_import_bank_statements(import_files):
    for import_file in import_files:
        # 假设文件路径包含公司、币种和银行账户信息,例如:CompanyA_USD_Bank1.csv
        parts = import_file.split("_")
        company = parts[0]
        currency = parts[1]
        bank_account = parts[2].replace(".csv", "")

        bsi = frappe.new_doc("Bank Statement Import")
        bsi.update({
            "company": company,
            "currency": currency,
            "bank_account": bank_account,
            "import_file": import_file,
            "import_type": "Insert New Records"
        })
        bsi.save()
        bsi.start_import()

3. 数据处理与统计

在导入数据之后,需要对流水数据进行处理,按公司、币种、银行账户和日期进行分类统计,生成收支分类统计日报。

def generate_daily_report(date):
    # 查询当天的银行流水记录
    filters = {
        "transaction_date": date
    }
    bank_transactions = frappe.get_all("Bank Transaction", filters=filters, fields=["company", "currency", "bank_account", "amount", "transaction_type"])

    report = {}
    for transaction in bank_transactions:
        company = transaction.company
        currency = transaction.currency
        bank_account = transaction.bank_account
        amount = transaction.amount
        transaction_type = transaction.transaction_type

        if company not in report:
            report[company] = {}
        if currency not in report[company]:
            report[company][currency] = {}
        if bank_account not in report[company][currency]:
            report[company][currency][bank_account] = {
                "income": 0,
                "expense": 0
            }

        if transaction_type == "Income":
            report[company][currency][bank_account]["income"] += amount
        elif transaction_type == "Expense":
            report[company][currency][bank_account]["expense"] += amount

    return report

4. 自动化任务

可以使用 Frappe 的调度器来实现每日自动生成统计日报的功能。

@frappe.whitelist()
def daily_report_task():
    from frappe.utils import getdate
    today = getdate()
    report = generate_daily_report(today)
    # 可以将报告保存为文档或者发送邮件通知相关人员
    frappe.msgprint(f"收支分类统计日报: {report}")

# 在 hooks.py 中添加调度任务
scheduler_events = {
    "daily": [
        "your_app_name.daily_report_task"
    ]
}

5. 调用示例

# 批量导入流水文件
import_files = ["CompanyA_USD_Bank1.csv", "CompanyB_EUR_Bank2.csv"]
batch_import_bank_statements(import_files)

# 手动生成统计日报
from frappe.utils import getdate
today = getdate()
report = generate_daily_report(today)
print(report)

总结

通过以上步骤,你能够实现一次性导入不同公司、不同币种、不同银行账户的流水文件,并且自动生成收支分类统计日报。需要注意的是,要根据实际的数据结构和业务需求对代码进行适当调整。

Discard
Save
Review Changes ← Back to Content
Message Status Space Raised By Last update on