Python SimPy simulation: an AGV runs until its charge drops to a specified level, then recharges

Python SimPy simulation of an AGV's running process

1, Purpose

SimPy is a discrete-event simulation library. I want to use it to simulate how an AGV operates in a workshop. A simple scenario: when the simulation starts, an AGV with some initial charge begins running in the workshop; once its charge falls below a threshold, it goes to the charging area and recharges.
This scenario is easy to implement with ordinary single-threaded control flow. Here it serves only as a way to learn SimPy's modeling and simulation, which makes more complex scenarios and features easier to build later.

2, Basic knowledge required

See the Jianshu article "Python SimPy simulation series (1)",
or the official SimPy documentation.

3, Logic

To implement this scenario, two main processes must be described: operation and charging. There are also two auxiliary processes, SOC monitoring and SOC recording, and two events: charging activation and charging stop. The AGV object also needs an attribute for its state of charge (SOC). A SimPy Container can be consumed from, added to, and has a capacity limit, so a Container is used to represent the AGV's SOC.
At the initial moment the AGV process starts running. Its initial SOC is a random integer in [20, 40], and it consumes a fixed amount of charge per second. The SOC monitoring and recording processes start at the same time. When the monitor detects that the SOC is below the threshold, the AGV is required to stop running and go to the charging area. Once it arrives, the charging activation event is triggered and the charging process starts charging at a fixed rate. When charging finishes, the charging stop event is triggered and the AGV returns to the working area to continue running.

4, Code implementation

Requirements:
Python >= 3.5
simpy installed (matplotlib is also needed for the plot)

from random import seed, randint

import matplotlib.pyplot as plt
import simpy

seed(23)  # Fix the random seed so the initial SOC is reproducible
AGV_power_curve = []  # SOC sampled once per simulated second
class AGV:
    def __init__(self, env):
        self.env = env
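        # State of charge (SOC) in percent, modeled as a Container
        self.SOC = simpy.Container(env, init=randint(20, 40), capacity=100)
        self.charging_speed = 2   # Charging rate: 2% per second
        self.csp_speed = 0.5      # Consumption rate: 0.5% per second
        self.charging_flag = 0    # 1 while the AGV is heading to or using the charger
        self.SOC_threshold = 20   # Request charging when SOC drops below 20%
        # Events that wake the charging process and signal that charging has ended
        self.charging_reactivate = env.event()
        self.charging_stop = env.event()
        # Processes: the generator bodies only start executing inside env.run()
        self.run_proc = env.process(self.run(env))            # AGV operation
        self.charging_proc = env.process(self.charging(env))  # Charging process
        self.SOC_monitor_proc = env.process(self.SOC_monitor(env))
        self.SOC_record_proc = env.process(self.SOC_record(env))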
    def run(self, env):
        """AGV Running process"""
        print("Start run time: ", env.now)
        print("AGV Initial power:", self.SOC.level)
        while True:
            # Each loop iteration models one second of running
            print("Continuous running time: ", env.now)
            yield self.SOC.get(self.csp_speed)   # Consume csp_speed (0.5%) of charge
            yield env.timeout(1)                 # Advance the simulation by 1 s
            if self.charging_flag == 1:
                print("Pause time: ", env.now)
                yield self.charging_stop
    def charging(self, env):
        """Battery charging process"""
        while True:
            print("Time when the charging process is in sleep state:", env.now)
            yield self.charging_reactivate  # Sleep until the charging event is activated
            print("Charging activation time:", env.now)
            print("Start charging")
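            charging_power = self.SOC.capacity - self.SOC.level   # Missing charge in percent
            for i in range(int(charging_power/self.charging_speed)):
                yield self.SOC.put(self.charging_speed)   # Add charging_speed (2%) of charge
                yield env.timeout(1)                      # Each charging step takes 1 s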
            print("Charging end time:", env.now)
            print("Power after charging:", self.SOC.level)
            self.charging_stop.succeed()
            self.charging_stop = env.event()
            self.charging_flag = 0
    def SOC_monitor(self,env):
        while True:
            if self.SOC.level/self.SOC.capacity *100 < self.SOC_threshold:
                self.charging_flag = 1
                print("Insufficient power, request charging, about to go to the charging area time:", env.now)
                yield env.timeout(2)
                print("Arrival time of charging area:", env.now)
                self.charging_reactivate.succeed()
                self.charging_reactivate = env.event()
                yield self.charging_stop
            yield env.timeout(1)  # Check the SOC once per second
    def SOC_record(self,env):
        while True:
            AGV_power_curve.append(self.SOC.level)
            yield env.timeout(1)
            
def main():
    env = simpy.Environment()
    agv = AGV(env)
    env.run(until=2000)
    plt.plot(AGV_power_curve)
    plt.show()  # Without this, no figure window opens when run as a script
if __name__ == '__main__':
    main()
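
As a rough sanity check on the plotted curve, the cycle lengths can be estimated from the rates used above (0.5% per second consumed, 2% per second charged, 20% threshold). This is only an approximation: it ignores the 2 s trip to the charging area and the order in which processes run within one second. The two helper functions are hypothetical and not part of the simulation.

```python
def discharge_seconds(level, threshold=20.0, rate=0.5):
    """Approximate running time before the level reaches the threshold."""
    return max(level - threshold, 0.0) / rate


def charge_seconds(level, capacity=100.0, speed=2.0):
    """Number of 1 s charging steps, mirroring int(missing / speed) in charging()."""
    return int((capacity - level) / speed)


print(discharge_seconds(100.0))  # 160.0
print(charge_seconds(19.5))      # 40
```

Because of the int() truncation, a charge that starts at 19.5% ends at 99.5% instead of exactly 100%, so the peaks of the curve may sit slightly below full capacity.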

Keywords: Python multiple processes

Added by departedmind on Mon, 31 Jan 2022 08:50:40 +0200