Home  >  Article  >  Backend Development  >  How to implement critical path and seven-grid graph calculation in Python

How to implement critical path and seven-grid graph calculation in Python

WBOY
WBOYforward
2023-04-26 17:28:141577browse

1. Main program

The main program mainly implements a Project class, which contains methods for calculating critical paths and seven-frame diagrams. The specific implementation method is as follows:

1. An Activity class is defined, which contains attributes such as activity id, name, duration, and immediate task list.

2. A Project class is defined, which includes attributes such as activity list, project duration, log, etc., as well as methods such as calculating critical path, calculating seven-frame diagram, calculating total float time, and calculating free float time.

3. Read activity information from the JSON file, create a Project object and add activities.

4. Call the calculate method of the Project object to calculate the earliest start time, latest start time and other data of each activity.

5. Call the calculate_critical_path method of the Project object to calculate the critical path.

6. Call the calculate_project_duration method of the Project object to calculate the total project duration.

7. Use Jinja2 template engine to generate the activity list of the project and set the key path

import json  
from datetime import datetime  
from typing import List  
  
import graphviz  
from jinja2 import Template  
  
from activity import Activity  
  
  
class Project:  
    def __init__(self):  
        self.activities: List[Activity] = []  
        self.duration = 0  
        self.logger = []  
  
    def log(self, log: str) -> None:  
        self.logger.append(log)  
  
    def add_activity(self, activity: Activity) -> None:  
        """  
        添加一个活动到项目中  
        :param  
        activity: 待添加的活动  
        """        # 将活动添加到项目中  
        self.activities.append(activity)  
  
    def calculate(self) -> None:  
        """ 计算整个项目的关键信息  
        :return: None  
        """        self.calculate_successor()  
        self._calculate_forward_pass()  # 计算正推法  
        self._calculate_backward_pass()  # 计算倒推法  
        self._calculate_total_floats()  # 计算总浮动时间  
        self._calculate_free_floats()  # 计算自由浮动时间  
  
    def calculate_successor(self) -> None:  
        self.log("开始计算紧后活动")  
        for act in self.activities:  
            for pred in act.predecessors:  
                for act_inner in self.activities:  
                    if act_inner.id == pred:  
                        act_inner.successors.append(act.id)  
  
    def _calculate_forward_pass(self) -> None:  
        self.log("## 开始正推法计算")  
        # 进入 while 循环,只有当所有活动的最早开始时间和最早完成时间都已经计算出来时,才会退出循环  
        while not self._is_forward_pass_calculated():  
            # 遍历每个活动  
            for activity in self.activities:  
                # 如果活动的最早开始时间已经被计算过,则跳过  
                if activity.est is not None:  
                    continue  
                # 如果活动没有前置活动, 则从1开始计算最早开始时间和最早结束时间  
                if not activity.predecessors:  
                    activity.est = 1  
                    activity.eft = activity.est + activity.duration - 1  
                    self.log(  
                        f"活动 {activity.name} 没有紧前活动,设定最早开始时间为1, 并根据工期计算最早结束时间为{activity.eft}")  
                else:  
                    # 计算当前活动的所有前置活动的最早完成时间  
                    predecessors_eft = [act.eft for act in self.activities if  
                                        act.id in activity.predecessors and act.eft is not None]  
                    # 如果当前活动的所有前置活动的最早完成时间都已经计算出来,则计算当前活动的最早开始时间和最早完成时间  
                    if len(predecessors_eft) == len(activity.predecessors):  
                        activity.est = max(predecessors_eft) + 1  
                        activity.eft = activity.est + activity.duration - 1  
                        self.log(  
                            f"活动 {activity.name} 紧前活动已完成正推法计算, 开始日期按最早开始时间里面最大的," +  
                            f"设定为{activity.est}并根据工期计算最早结束时间为{activity.eft}")  
  
                        # 更新项目总持续时间为最大最早完成时间  
        self.duration = max([act.eft for act in self.activities])  
  
    def _calculate_backward_pass(self) -> None:  
        """ 计算倒推法  
        :return: None  
        """  
        self.log("## 开始倒推法计算")  # 输出提示信息  
        # 进入 while 循环,只有当所有活动的最晚开始时间和最晚完成时间都已经计算出来时,才会退出循环  
        while not self._is_backward_pass_calculated():  
            # 遍历每个活动  
            for act in reversed(self.activities):  
                # 如果活动的最晚开始时间已经被计算过,则跳过  
                if act.lft is not None:  
                    continue  
                # 如果活动没有后继活动, 则从总持续时间开始计算最晚开始时间和最晚结束时间  
                if not act.successors:  
                    act.lft = self.duration  
                    act.lst = act.lft - act.duration + 1  
                    self.log(f"活动 {act.name} 没有紧后活动,按照正推工期设定最晚结束时间为{act.lft}," +  
                             f"并根据工期计算最晚开始时间为{act.lst}")  
                else:  
                    # 计算当前活动的所有后继活动的最晚开始时间  
                    successors_lst = self._calculate_lst(act)  
                    # 如果当前活动的所有后继活动的最晚开始时间都已经计算出来,则计算当前活动的最晚开始时间和最晚完成时间  
                    if len(successors_lst) == len(act.successors):  
                        act.lft = min(successors_lst) - 1  
                        act.lst = act.lft - act.duration + 1  
                        self.log(f"活动 {act.name} 紧后活动计算完成,按照倒推工期设定最晚结束时间为{act.lft}," +  
                                 f"并根据工期计算最晚开始时间为{act.lst}")  
        # 更新项目总持续时间为最大最晚完成时间  
        self.duration = max([act.lft for act in self.activities])  
  
    def _calculate_lst(self, activity: Activity) -> List[int]:  
        """计算某一活动的所有最晚开始时间  
        :param activity: 活动对象  
        :return: 最晚开始时间列表  
        """        rst = []  # 初始化结果列表  
        for act in activity.successors:  # 遍历该活动的后继活动  
            for act2 in self.activities:  # 遍历所有活动  
                if act2.id == act and act2.lst is not None:  # 如果找到了该后继活动且其最晚开始时间不为空  
                    rst.append(act2.lst)  # 将最晚开始时间加入结果列表  
        return rst  # 返回结果列表  
  
    def _is_forward_pass_calculated(self) -> bool:  
        """ 判断整个项目正推法计算已经完成  
        :return: 若已计算正向传递则返回True,否则返回False  
        """  
        for act in self.activities:  # 遍历所有活动  
            if act.est is None or act.eft is None:  # 如果该活动的最早开始时间或最早完成时间为空  
                return False  # 则返回False,表示还未计算正向传递  
        return True  # 如果所有活动的最早开始时间和最早完成时间都已计算,则返回True,表示已计算正向传递  
  
    def _is_backward_pass_calculated(self) -> bool:  
        """ 判断整个项目倒推法计算已经完成  
        :return: 若已计算倒推法则返回True,否则返回False  
        """        for act in self.activities:  # 遍历所有活动  
            if act.lst is None or act.lft is None:  # 如果该活动的最晚开始时间或最晚完成时间为空  
                return False  # 则返回False,表示还未计算倒推法  
        return True  # 如果所有活动的最晚开始时间和最晚完成时间都已计算,则返回True,表示已计算倒推法  
  
    def _calculate_total_floats(self) -> None:  
        """ 计算所有活动的总浮动时间  
        :return: None  
         """  
        self.log(f"## 开始计算项目所有活动的总浮动时间")  
        for act in self.activities:  # 遍历所有活动  
            if act.est is not None and act.lst is not None:  # 如果该活动的最早开始时间和最晚开始时间都已计算  
                act.tf = act.lst - act.est  # 则计算该活动的总浮动时间  
                self.log(f"计算{act.name}的总浮动时间" + f"最晚开始时间{act.lst} - 最早开始时间{act.est} = {act.tf}", )  
            else:  # 如果该活动的最早开始时间或最晚开始时间为空  
                act.tf = None  # 则将该活动的总浮动时间设为None  
  
    def _calculate_free_floats(self) -> None:  
        """ 计算所有活动的自由浮动时间  
        :return: None  
        """        self.log(f"## 开始计算项目所有活动的自由浮动时间")  # 输出提示信息  
        for act in self.activities:  # 遍历所有活动  
            if act.tf == 0:  # 如果该活动的总浮动时间为0  
                self.log(f"计算{act.name}的自由浮动时间" + f"因为{act.name}的总浮动时间为0,自由浮动时间为0")  # 输出提示信息  
                act.ff = 0  # 则将该活动的自由浮动时间设为0  
            elif act.tf > 0:  # 如果该活动的总浮动时间大于0  
                self.log(f"计算{act.name}的自由浮动时间")  # 输出提示信息  
                self.log(f"- {act.name}的总浮动时间{act.tf} > 0,")  # 输出提示信息  
                tmp = []  # 初始化临时列表  
                for act2 in self.activities:  # 遍历所有活动  
                    if act2.id in act.successors:  # 如果该活动是该活动的紧后活动  
                        self.log(f"- {act.name}的紧后活动{act2.name}的自由浮动动时间为{act2.tf}")  # 输出提示信息  
                        tmp.append(act2.tf)  # 将该紧后活动的自由浮动时间加入临时列表  
                if len(tmp) != 0:  # 如果临时列表不为空  
                    act.ff = act.tf - max(tmp)  # 则计算该活动的自由浮动时间  
                    if act.ff < 0:  
                        act.ff = 0  
                    self.log(f"- 用活动自己的总浮动{act.tf}减去多个紧后活动总浮动的最大值{max(tmp)} = {act.ff}")  
                else:  # 如果临时列表为空  
                    act.ff = act.tf  # 则将该活动的自由浮动时间设为总浮动时间  
  
    def calculate_critical_path(self) -> List[Activity]:  
        """ 计算整个项目的关键路径  
        :return: 整个项目的关键路径  
        """        ctc_path = []  # 初始化关键路径列表  
        for act in self.activities:  # 遍历所有活动  
            if act.tf == 0:  # 如果该活动的总浮动时间为0  
                ctc_path.append(act)  # 则将该活动加入关键路径列表  
        return ctc_path  # 返回关键路径列表  
  
    def calculate_project_duration(self) -> int:  
        """ 计算整个项目的持续时间  
        :return: 整个项目的持续时间  
        """        return max(activity.eft for activity in self.activities)  # 返回所有活动的最早完成时间中的最大值,即整个项目的持续时间  
  
  
# 从JSON文件中读取活动信息  
with open('activities.json', 'r', encoding='utf-8') as f:  
    activities_data = json.load(f)  
  
# 创建Project对象并添加活动  
project = Project()  
for activity_data in activities_data:  
    activity = Activity(  
        activity_data['id'],  
        activity_data['name'],  
        activity_data['duration'],  
        activity_data['predecessors']  
    )    project.add_activity(activity)  
  
# 计算每个活动的最早开始时间、最晚开始时间等数据  
project.calculate()  
  
# 计算关键路径和项目总工期  
critical_path = project.calculate_critical_path()  
project_duration = project.calculate_project_duration()  
  
# 生成项目的活动清单  
with open('template.html', 'r', encoding='utf-8') as f:  
    template = Template(f.read())  
html = template.render(  
    activities=project.activities,  
    critical_path=critical_path,  
    project_duration=project_duration,  
    log=project.logger  
)  
  
# 生成项目进度网络图  
aon_graph = graphviz.Digraph(format='png', graph_attr={'rankdir': 'LR'})  
for activity in project.activities:  
    aon_graph.node(str(activity.id), activity.name)  
    for predecessor in activity.predecessors:  
        aon_graph.edge(str(predecessor), str(activity.id))  
  
timestamp = datetime.now().strftime('%Y%m%d%H%M%S')  
  
aon_filename = f"aon_{timestamp}"  
  
aon_graph.render(aon_filename)  
  
# 将项目进度网络图插入到HTML文件中  
aon_image = f'Precedence Diagramming Method: AON'  
html = html.replace('

Precedence Diagramming Method: AON:
[image]

', '

紧前关系绘图法: AON:
' + aon_image + '

') filename = datetime.now().strftime('%Y%m%d%H%M%S') + '.html' with open(filename, 'w', encoding='utf-8') as f: f.write(html)

2. Activity class

Program name: activity.py

class Activity:  
    """  
    活动类,用于表示项目中的一个活动。  
  
    Attributes:        id (int): 活动的唯一标识符。  
        name (str): 活动的名称。  
        duration (int): 活动的持续时间。  
        predecessors (List[int]): 活动的前置活动列表,存储前置活动的id。  
        est (int): 活动的最早开始时间。  
        lst (int): 活动的最晚开始时间。  
        eft (int): 活动的最早完成时间。  
        lft (int): 活动的最晚完成时间。  
        tf (int): 活动的总浮动时间。  
        ff (int): 活动的自由浮动时间。  
        successors (List[int]): 活动的后继活动列表,存储后继活动的Activity对象。  
    """  
    def __init__(self, id: int, name: str, duration: int, predecessors: List[int]):  
        """  
        初始化活动对象。  
  
        Args:            id (int): 活动的唯一标识符。  
            name (str): 活动的名称。  
            duration (int): 活动的持续时间。  
            predecessors (List[int]): 活动的前置活动列表,存储前置活动的id。  
        """        self.id = id  
        self.name = name  
        self.duration = duration  
        self.predecessors = predecessors  
        self.est = None  
        self.lst = None  
        self.eft = None  
        self.lft = None  
        self.tf = None  
        self.ff = None  
        self.successors = []  
  
    def __str__(self):  
        return f"id: {self.id}, name: {self.name}, est: {self.est}, lst: {self.lst}, eft: {self.eft}, lft: {self.lft},"  
        + f"successors: {self.successors}"

3. Task list JSON file

File name: activities.json

[  
  {    "id": 1,  
    "name": "A",  
    "duration": 2,  
    "predecessors": []  
  },  
  {  
    "id": 9,  
    "name": "A2",  
    "duration": 3,  
    "predecessors": []  
  },  
    {  
    "id": 10,  
    "name": "A3",  
    "duration": 2,  
    "predecessors": []  
  },  
  {  
    "id": 2,  
    "name": "B",  
    "duration": 3,  
    "predecessors": [  
      1,  
      9  
    ]  
  },  
  {  
    "id": 3,  
    "name": "C",  
    "duration": 4,  
    "predecessors": [  
      1  
    ]  
  },  
  {  
    "id": 4,  
    "name": "D",  
    "duration": 2,  
    "predecessors": [  
      2,10  
    ]  
  },  
  {  
    "id": 5,  
    "name": "E",  
    "duration": 3,  
    "predecessors": [  
      2  
    ]  
  },  
  {  
    "id": 6,  
    "name": "F",  
    "duration": 2,  
    "predecessors": [  
      3  
    ]  
  },  
  {  
    "id": 7,  
    "name": "G",  
    "duration": 3,  
    "predecessors": [  
      4,  
      5  
    ]  
  },  
  {  
    "id": 8,  
    "name": "H",  
    "duration": 2,  
    "predecessors": [  
      6,  
      7  
    ]  
  },  
  {  
    "id": 11,  
    "name": "H2",  
    "duration": 4,  
    "predecessors": [  
      6,  
      7  
    ]  
  }  
]

4. Output template file

  
  
  
      
    PMP关键路径计算  
      

活动清单

{% for activity in activities %} {% endfor %}
ID 活动名 持续时间 紧前活动 紧后活动 最早开始时间EST 最早结束时间EFT 最晚开始时间LST 最晚结束时间LFT 总浮动时间TF 自由浮动时间FF
{{ activity.id }} {{ activity.name }} {{ activity.duration }} {% for predecessor in activity.predecessors %} {% for act in activities %} {% if act.id == predecessor %} {{ act.name }} {% endif %} {% endfor %} {% if not loop.last %}, {% endif %} {% endfor %} {% for successor in activity.successors %} {% for act in activities %} {% if act.id == successor %} {{ act.name }} {% endif %} {% endfor %} {% if not loop.last %}, {% endif %} {% endfor %} {{ activity.est }} {{ activity.eft }} {{ activity.lst }} {{ activity.lft }} {{ activity.tf }} {{ activity.ff }}

关键路径是: {% for activity in critical_path %}{{ activity.name }}{% if not loop.last %} -> {% endif %}{% endfor %}

项目总工期: {{ project_duration }}

Precedence Diagramming Method: AON:
[image]

{% for i in log %} {% endfor %}
执行过程
{{i}}

The above is the detailed content of How to implement critical path and seven-grid graph calculation in Python. For more information, please follow other related articles on the PHP Chinese website!

Statement:
This article is reproduced at:yisu.com. If there is any infringement, please contact admin@php.cn delete