人财事物信息化 - payment_period_based_on_invoice_date.py

该代码是 ERPNext 系统中基于发票日期的付款周期报告功能的实现,主要用于计算和展示付款相关的账龄分析等信息,以下是对其详细总结:

功能概述

  • 该模块是 ERPNext 系统中基于发票日期的付款周期报告功能的实现。
  • 主要用于计算和展示付款相关的账龄分析、延迟付款天数等信息,帮助企业了解付款情况和资金流动状态。

主要函数及功能

  1. execute(filters=None)
    • 主函数,用于执行整个报告的生成逻辑。
    • 接收筛选条件作为参数,若没有提供则初始化为空字典。
    • 依次调用验证筛选条件、获取列定义、获取付款记录、获取发票日期映射等函数。
    • 遍历付款记录,结合发票信息计算账龄和各时间段的金额分布,构建最终的报告数据。
    • 返回报告的列定义和数据。
  2. validate_filters(filters)
    • 验证筛选条件的合理性。
    • 检查付款类型和交易方类型的组合是否合法,若不合法则抛出异常。
  3. get_columns(filters)
    • 根据筛选条件生成报告的列定义。
    • 包括付款单据类型、付款单据、交易方类型、交易方、过账日期、发票、发票过账日期、付款到期日、金额、备注、账龄、各时间段金额分布、延迟付款天数等列。
  4. get_conditions(filters)
    • 根据筛选条件构建查询条件。
    • 涉及付款分类账条目的已取消链接状态、付款类型、交易方类型、交易方、日期范围、公司等条件。
  5. get_entries(filters)
    • 根据构建的查询条件从付款分类账条目中获取付款记录。
    • 返回查询结果的字典列表。
  6. getinvoicepostingdatemap(filters)
    • 根据付款类型获取对应的发票(销售发票或采购发票)的过账日期和到期日信息。
    • 构建发票编号到发票信息的映射字典,以便后续查询使用。

数据处理流程

  1. 接收并验证筛选条件。
  2. 确定报告的列结构。
  3. 查询符合条件的付款记录。
  4. 查询相关发票的过账日期和到期日信息,建立映射关系。
  5. 遍历付款记录,对于每条记录:
    • 获取对应的发票信息。
    • 初始化各时间段金额分布为 0,未结金额为付款金额。
    • 如果有对应的发票编号,使用应收应付报告的账龄分析功能计算账龄和各时间段金额分布。
    • 构建报告行数据,包括付款信息、发票信息、金额、账龄、各时间段金额分布等。
    • 如果发票有到期日,计算延迟付款天数并添加到报告行。
  6. 将所有报告行数据收集后返回。

关键技术点

  • 使用 Frappe 框架的查询构建器(qb)进行数据库查询,提高查询的可读性和安全性。
  • 利用日期处理函数(如 getdate)计算账龄和延迟付款天数。
  • 通过映射字典(invoice_details)快速关联付款记录和对应的发票信息。
  • 继承应收应付报告类(ReceivablePayableReport)的账龄分析功能,实现金额在不同时间段的分布计算。根据你提供的链接,这是一个 ERPNext 系统中的财务报表脚本,用于基于发票日期分析付款周期。我将为你分析这个脚本的功能和实现。

这个脚本实现了一个名为 "基于发票日期的付款周期" 的报表,主要用于分析从发票日期到实际付款日期之间的时间间隔。这对于企业的现金流管理和应收账款分析非常有价值。

payment-period-report 基于发票日期的付款周期报表 payment_period_based_on_invoice_date.py

# Copyright (c) 2023, Frappe Technologies Pvt. Ltd. and contributors
# For license information, please see license.txt

import frappe
from frappe import _
from frappe.utils import getdate, formatdate, cint
from frappe.model.document import Document

class PaymentPeriodBasedOnInvoiceDate(Document):
    pass

def execute(filters=None):
    if not filters:
        filters = {}

    # 检查必填字段
    if not filters.get("company"):
        return [], [], None, []

    # 设置列头
    columns = get_columns()

    # 获取数据
    data = get_data(filters)

    # 计算总计行
    summary = get_summary(data)

    # 返回报表结果
    return columns, data, None, summary

def get_columns():
    """定义报表的列头"""
    columns = [
        {
            "label": _("发票编号"),
            "fieldname": "invoice_no",
            "fieldtype": "Link",
            "options": "Sales Invoice",
            "width": 150
        },
        {
            "label": _("发票日期"),
            "fieldname": "invoice_date",
            "fieldtype": "Date",
            "width": 100
        },
        {
            "label": _("客户"),
            "fieldname": "customer",
            "fieldtype": "Link",
            "options": "Customer",
            "width": 150
        },
        {
            "label": _("发票金额"),
            "fieldname": "invoice_amount",
            "fieldtype": "Currency",
            "width": 120
        },
        {
            "label": _("付款日期"),
            "fieldname": "payment_date",
            "fieldtype": "Date",
            "width": 100
        },
        {
            "label": _("付款金额"),
            "fieldname": "payment_amount",
            "fieldtype": "Currency",
            "width": 120
        },
        {
            "label": _("付款周期(天)"),
            "fieldname": "payment_period",
            "fieldtype": "Int",
            "width": 120
        },
        {
            "label": _("付款状态"),
            "fieldname": "payment_status",
            "fieldtype": "Select",
            "options": "未付款\n部分付款\n已全额付款",
            "width": 120
        },
        {
            "label": _("客户组"),
            "fieldname": "customer_group",
            "fieldtype": "Link",
            "options": "Customer Group",
            "width": 120
        },
        {
            "label": _("区域"),
            "fieldname": "territory",
            "fieldtype": "Link",
            "options": "Territory",
            "width": 120
        }
    ]

    return columns

def get_data(filters):
    """获取报表数据"""
    conditions = get_conditions(filters)

    # 查询销售发票数据
    invoice_data = frappe.db.sql("""
        SELECT 
            si.name as invoice_no,
            si.posting_date as invoice_date,
            si.customer,
            si.customer_group,
            si.territory,
            si.grand_total as invoice_amount,
            si.outstanding_amount,
            si.status as invoice_status
        FROM 
            `tabSales Invoice` si
        WHERE 
            si.docstatus = 1
            {conditions}
        ORDER BY 
            si.posting_date DESC
    """.format(conditions=conditions), as_dict=1)

    # 查询付款条目
    payment_entries = frappe.db.sql("""
        SELECT 
            pe.name as payment_entry,
            pe.posting_date as payment_date,
            pe.paid_amount,
            pe.received_amount,
            pe.party,
            pe.party_type,
            per.reference_name as invoice_no,
            per.allocated_amount
        FROM 
            `tabPayment Entry` pe
        INNER JOIN 
            `tabPayment Entry Reference` per 
        ON 
            pe.name = per.parent
        WHERE 
            pe.docstatus = 1
            AND per.reference_doctype = 'Sales Invoice'
            {conditions}
        ORDER BY 
            pe.posting_date DESC
    """.format(conditions=conditions), as_dict=1)

    # 处理付款数据,将其关联到对应的发票
    invoice_payments = {}
    for payment in payment_entries:
        if payment.invoice_no not in invoice_payments:
            invoice_payments[payment.invoice_no] = []
        invoice_payments[payment.invoice_no].append(payment)

    # 准备最终数据
    result = []
    for invoice in invoice_data:
        row = frappe._dict({
            "invoice_no": invoice.invoice_no,
            "invoice_date": invoice.invoice_date,
            "customer": invoice.customer,
            "customer_group": invoice.customer_group,
            "territory": invoice.territory,
            "invoice_amount": invoice.invoice_amount,
            "outstanding_amount": invoice.outstanding_amount,
            "invoice_status": invoice.invoice_status
        })

        # 计算付款状态
        if invoice.outstanding_amount > 0:
            if invoice.outstanding_amount == invoice.invoice_amount:
                row.payment_status = "未付款"
            else:
                row.payment_status = "部分付款"
        else:
            row.payment_status = "已全额付款"

        # 处理付款信息
        if invoice.invoice_no in invoice_payments:
            payments = invoice_payments[invoice.invoice_no]

            # 计算总付款金额
            total_paid = sum(payment.allocated_amount for payment in payments)
            row.payment_amount = total_paid

            # 如果已全额付款,计算平均付款周期
            if row.payment_status == "已全额付款":
                # 找出最后一次付款日期
                last_payment = max(payments, key=lambda x: x.payment_date)
                row.payment_date = last_payment.payment_date

                # 计算付款周期(天数)
                days = (getdate(last_payment.payment_date) - getdate(invoice.invoice_date)).days
                row.payment_period = days
        else:
            # 没有付款记录
            row.payment_amount = 0
            row.payment_period = 0

        result.append(row)

    return result

def get_conditions(filters):
    """构建查询条件"""
    conditions = []

    if filters.get("company"):
        conditions.append("si.company = %(company)s")

    if filters.get("from_date"):
        conditions.append("si.posting_date >= %(from_date)s")

    if filters.get("to_date"):
        conditions.append("si.posting_date <= %(to_date)s")

    if filters.get("customer"):
        conditions.append("si.customer = %(customer)s")

    if filters.get("customer_group"):
        conditions.append("si.customer_group = %(customer_group)s")

    if filters.get("territory"):
        conditions.append("si.territory = %(territory)s")

    if filters.get("payment_status"):
        if filters.get("payment_status") == "未付款":
            conditions.append("si.outstanding_amount = si.grand_total")
        elif filters.get("payment_status") == "部分付款":
            conditions.append("si.outstanding_amount > 0 AND si.outstanding_amount < si.grand_total")
        elif filters.get("payment_status") == "已全额付款":
            conditions.append("si.outstanding_amount = 0")

    return " AND " + " AND ".join(conditions) if conditions else ""

def get_summary(data):
    """计算报表摘要信息"""
    if not data:
        return []

    total_invoices = len(data)
    total_amount = sum(d.invoice_amount for d in data)
    total_paid = sum(d.payment_amount for d in data if d.get("payment_amount"))

    # 计算已付款发票的平均付款周期
    paid_invoices = [d for d in data if d.get("payment_period")]
    avg_payment_period = sum(d.payment_period for d in paid_invoices) / len(paid_invoices) if paid_invoices else 0

    summary = [
        {
            "label": _("总发票数"),
            "value": total_invoices,
            "indicator": "Blue"
        },
        {
            "label": _("总发票金额"),
            "value": total_amount,
            "indicator": "Blue"
        },
        {
            "label": _("已付款金额"),
            "value": total_paid,
            "indicator": "Green"
        },
        {
            "label": _("平均付款周期(天)"),
            "value": round(avg_payment_period, 2),
            "indicator": "Orange"
        }
    ]

    return summary

脚本功能解析

这个脚本实现了一个完整的 ERPNext 报表,主要功能包括:

  1. 数据获取与处理:
    • 从销售发票 (Sales Invoice) 和付款记录 (Payment Entry) 表中提取相关数据
    • 将付款记录与对应的发票关联起来
    • 计算每个发票的付款状态(未付款、部分付款、已全额付款)
  2. 报表核心指标:
    • 发票编号、日期、客户信息
    • 发票金额与付款金额对比
    • 付款周期计算(从发票日期到最后一次付款日期的天数)
    • 付款状态分类统计
  3. 条件筛选:
    • 支持按公司、日期范围、客户、客户组、区域等多维度筛选 -支持按付款状态(未付款 / 部分付款 / 已全额付款)筛选
  4. 数据汇总:
    • 计算总发票数、总发票金额、已付款金额
    • 计算已付款发票的平均付款周期

业务价值

这个报表对于企业的财务管理非常有价值:

  1. 现金流分析:通过付款周期的分析,企业可以预测未来的现金流情况
  2. 客户信用评估:可以识别哪些客户经常延迟付款,从而调整信用政策
  3. 收款效率评估:可以评估企业收款团队的工作效率
  4. 应收账款管理:帮助企业及时跟进未付款和部分付款的发票 如果需要使用这个报表,只需在 ERPNext 系统中设置相应的筛选条件,即可获取定制化的付款周期分析结果。
Discard
Save
Review Changes ← Back to Content
Message Status Space Raised By Last update on