1
1
import copy
2
+ from typing import List , Dict , Tuple , Optional
2
3
import prettytable
3
- at_prcs_mapping = {} # arrivaltime : [processess]
4
- bt_at_mapping = {} # burst time : [processess]
5
-
6
- class CpuSheduling ():
7
- def __init__ (self , name :list = [], arrival_time :list = [], burst_time :list = [], time_quantum = None ) -> None :
8
- self .process = name
9
- self .arrival_time = arrival_time
10
- self .burst_time = burst_time
11
- self .time_quantum = time_quantum
12
-
13
- # checking if every process has a arrival time and burst time
4
+
5
+ class CpuSheduling :
6
+ def __init__ (self , prcs : List [str ] = [], arrival_time : List [float ] = [],
7
+ burst_time : List [float ] = [], time_quantum : Optional [float ] = None ) -> None :
8
+ """
9
+ Initializes the CPU scheduler with process data.
10
+
11
+ Args:
12
+ prcs: List of process names (e.g., ["P1", "P2"]).
13
+ arrival_time: List of arrival times for each process.
14
+ burst_time: List of burst times for each process.
15
+ time_quantum: Time quantum for Round Robin (optional, unused in FCFS).
16
+
17
+ Raises:
18
+ ValueError: If input lists have mismatched lengths or contain invalid types.
19
+ """
20
+ self .process : List [str ] = prcs
21
+ self .arrival_time : List [float ] = arrival_time
22
+ self .burst_time : List [float ] = burst_time
23
+ self .time_quantum : Optional [float ] = time_quantum
24
+ self .at_prcs_mapping : Dict [float , List [Tuple [str , float ]]] = {}
25
+
26
+ # Validate input lengths
14
27
if len (self .process ) != len (self .arrival_time ):
15
- raise ValueError ("Number of process(s) don 't match number of arrival time(s) or vice versa " )
28
+ raise ValueError ("Number of processes doesn 't match number of arrival times " )
16
29
if len (self .process ) != len (self .burst_time ):
17
- raise ValueError ("Number of process(s) don 't match number of burst time(s) or vice versa " )
18
-
19
- # checking if arrival time and burst time are of integer or float type
30
+ raise ValueError ("Number of processes doesn 't match number of burst times " )
31
+
32
+ # Validate input types
20
33
if not all (isinstance (at , (int , float )) for at in self .arrival_time ):
21
- raise ValueError ("arrival time can only have integer/float value(s) " )
34
+ raise ValueError ("Arrival times must be integers or floats " )
22
35
if not all (isinstance (bt , (int , float )) for bt in self .burst_time ):
23
- raise ValueError ("burst time can only have integer/float value(s) " )
36
+ raise ValueError ("Burst times must be integers or floats " )
24
37
25
- # displaying processess, arival time and burst time in a tabular format
26
- print (10 * "-" ,"given process data" , 10 * "-" )
38
+ # Display input data
39
+ print (10 * "-" , "Given Process Data" , 10 * "-" )
27
40
table = prettytable .PrettyTable ()
28
41
table .field_names = ["Process" , "Arrival Time" , "Burst Time" ]
29
42
for i in range (len (self .process )):
30
43
table .add_row ([self .process [i ], self .arrival_time [i ], self .burst_time [i ]])
31
- print (table )
32
- print ()
33
-
34
-
35
- def unique_at (self )-> list :
36
- """ returns unique arrival time in ascending order"""
37
- unique_at = []
38
- for at in self .arrival_time :
39
- if at not in unique_at :
40
- unique_at .append (at )
41
- unique_at .sort ()
42
- return unique_at
43
-
44
- def at_mapping (self )-> dict :
45
- """ returns mapping of arrival time and processess as a dictionary"""
46
- for index , at in enumerate (self .arrival_time ):
47
- if at not in at_prcs_mapping :
48
- at_prcs_mapping [at ] = [self .process [index ]]
49
- else :
50
- at_prcs_mapping [at ].append (self .process [index ])
51
- return at_prcs_mapping
52
-
53
- def bt_mapping (self )-> dict :
54
- """ returns mapping of burst time and arrival time as a dictionary"""
55
- for index , at in enumerate (self .arrival_time ):
56
- if at not in bt_at_mapping :
57
- bt_at_mapping [at ] = [self .burst_time [index ]]
58
- else :
59
- bt_at_mapping [at ].append (self .burst_time [index ])
60
- return bt_at_mapping
61
-
62
- def final_data (self ,mapping :dict )-> list :
63
- """ returns a list of processess in the order of their arrival time or burst time"""
44
+ print (table , "\n " )
45
+
46
+ def unique_at (self ) -> List [float ]:
47
+ """
48
+ Returns a sorted list of unique arrival times.
49
+
50
+ Returns:
51
+ A list of unique arrival times in ascending order.
52
+ """
53
+ return sorted (set (self .arrival_time ))
54
+
55
+ def at_mapping (self ) -> Dict [float , List [Tuple [str , float ]]]:
56
+ """
57
+ Maps arrival times to lists of (process, burst time) tuples.
58
+
59
+ Returns:
60
+ A dictionary where keys are arrival times and values are lists of
61
+ (process, burst time) tuples.
62
+ """
63
+ self .at_prcs_mapping .clear () # Prevent stale data
64
+ for i , at in enumerate (self .arrival_time ):
65
+ if at not in self .at_prcs_mapping :
66
+ self .at_prcs_mapping [at ] = []
67
+ self .at_prcs_mapping [at ].append ((self .process [i ], self .burst_time [i ]))
68
+ return self .at_prcs_mapping
69
+ # eg : {0: [("P1", 2)], 1: [("P2", 2)], 5: [("P3", 3)], 12: [("P4", 4)]}
70
+
71
+ def final_data (self , mapping : Dict [float , List [Tuple [str , float ]]], key : str = "process" ) -> List [float | str ]:
72
+ """
73
+ Converts a mapping into a flat list of processes or burst times ordered by arrival time.
74
+
75
+ Args:
76
+ mapping: Dictionary mapping arrival times to (process, burst time) tuples.
77
+ key: Either "process" (for process names) or "burst" (for burst times).
78
+
79
+ Returns:
80
+ A flat list of either process names or burst times in arrival order.
81
+ """
64
82
listed_data = []
65
- for prcs in self .unique_at ():
66
- listed_data .append (mapping [prcs ])
67
- data = [process for sublist in listed_data for process in sublist ]
68
- return data
69
-
70
- def check_halt (self ,arrival_time :list , ct :list )-> list :
71
- """ returns index and value if any halt is present in the process order"""
72
- correction_index = 0
73
- correction_value = 0
74
-
75
- for at in range (len (ct )- 1 ):
76
- if arrival_time [at + 1 ] > ct [at ]:
77
- correction_value = arrival_time [at + 1 ] - ct [at ]
78
- correction_index = at + 1
79
- return [correction_value , correction_index ]
80
-
81
- def fcfs (self ):
82
- """
83
- first come first serve short term shdeuling
84
- """
85
- execution_order = self .final_data (self .at_mapping ()) # process order
86
- process_ord = copy .deepcopy (execution_order ) # process order for printing if correction is required
87
- bt_ord = self .final_data (self .bt_mapping ()) # burst time in the order of arrival time
88
-
89
- # calculating completion time of each process
83
+ for at in self .unique_at ():
84
+ if at in mapping :
85
+ for proc , bt in mapping [at ]:
86
+ listed_data .append (proc if key == "process" else bt )
87
+ return listed_data
88
+
89
+ def fcfs (self ) -> None :
90
+ """
91
+ Implements First-Come-First-Serve scheduling.
92
+
93
+ Computes completion, turnaround, and waiting times, and displays the schedule
94
+ with a process order (including halts) and a detailed table.
95
+ """
96
+ execution_order = self .final_data (self .at_mapping (), "process" )
97
+ process_ord = copy .deepcopy (execution_order )
98
+ bt_ord = self .final_data (self .at_mapping (), "burst" )
99
+
100
+ # Calculate completion times with halts
90
101
ct = []
91
- for j in bt_ord :
92
- if ct :
93
- temp = ct [- 1 ] + j
94
- else :
95
- temp = j
96
- ct .append (temp )
97
-
98
- at = sorted (self .arrival_time ) # sorted arrival time
99
- crrction_val , crrction_index = self .check_halt (at , ct ) # correction value and index
100
-
101
- # inserting halt for correction
102
- if crrction_val == 0 :
103
- pass
104
- else :
105
- process_ord .insert (crrction_index ,f"halt for { crrction_val } sec(s)" )
106
-
107
- for correction in ct [crrction_index :]:
108
- ct [crrction_index ] += crrction_val
109
- crrction_index += 1
110
-
111
- # printing process order
112
- print ("fcfs order: " ,end = "" )
113
- for process in process_ord :
114
- if process == process_ord [- 1 ]:
115
- print (f"{ process } " ,end = "" )
116
- else :
117
- print (f"{ process } -> " ,end = "" )
118
- print ();print ()
119
-
120
- # list of turn around time for everyprocess
121
- tat_list = [a - b for a ,b in zip (ct ,sorted (self .arrival_time ))]
122
-
123
- # average turn around time
124
- tat = sum (tat_list ) / len (tat_list )
125
-
126
- # list of waiting time for each process
127
- wt_list = [a - b for a ,b in zip (tat_list ,bt_ord )]
128
-
129
- # average waiting time
130
- wt = sum (wt_list ) / len (wt_list )
131
-
132
- # printing process, arival time, burst time, completion time, turn around time, waiting time
102
+ current_time = 0
103
+ halt_count = 0
104
+ for i , (proc , bt ) in enumerate (zip (execution_order , bt_ord )):
105
+ prcs_at = self .arrival_time [self .process .index (proc )]
106
+ # Check for halt before starting the process
107
+ if prcs_at > current_time :
108
+ halt_duration = prcs_at - current_time
109
+ process_ord .insert (i + halt_count , f"halt for { halt_duration } sec(s)" )
110
+ halt_count += 1
111
+ current_time = prcs_at
112
+ # Process execution
113
+ current_time += bt
114
+ ct .append (current_time )
115
+
116
+ # Calculate turnaround and waiting times
117
+ at_order = [self .arrival_time [self .process .index (p )] for p in execution_order ]
118
+ tat_list = [c - a for c , a in zip (ct , at_order )]
119
+ wt_list = [max (0 , t - b ) for t , b in zip (tat_list , bt_ord )]
120
+ tat = sum (tat_list ) / len (tat_list ) if tat_list else 0
121
+ wt = sum (wt_list ) / len (wt_list ) if wt_list else 0
122
+
123
+ # Print process order
124
+ print ("fcfs order: " , end = "" )
125
+ for proc in process_ord :
126
+ print (f"{ proc } " , end = "" if proc == process_ord [- 1 ] else " -> " )
127
+ print ("\n " )
128
+
129
+ # Print table
133
130
table = prettytable .PrettyTable ()
134
- table .field_names = ["Process" , "Arrival Time" , "Burst Time" , "Completion Time" , "Turn around time " , "waiting time " ]
131
+ table .field_names = ["Process" , "Arrival Time" , "Burst Time" , "Completion Time" , "Turnaround Time " , "Waiting Time " ]
135
132
for i in range (len (self .process )):
136
- table .add_row ([execution_order [i ], at [i ], bt_ord [i ],ct [i ],tat_list [i ],wt_list [i ]])
133
+ table .add_row ([execution_order [i ], at_order [i ], bt_ord [i ], ct [i ], tat_list [i ], wt_list [i ]])
137
134
print (table )
138
- print (f"turn around time -> { tat } " )
139
- print (f"average waiting time was -> { wt } " )
140
-
141
-
135
+ print (f"Average turnaround time: { tat :.2f} " )
136
+ print (f"Average waiting time: { wt :.2f} " )
137
+
142
138
def sjf (self ):
143
139
"""
144
140
shortest job first: non-preemtive
@@ -158,10 +154,8 @@ def rr(self):
158
154
...
159
155
160
156
if __name__ == "__main__" :
161
- prcs = ["P1" ,"P2" ,"P3" ,"P4" ] #process
162
- at = [0 ,1 ,5 ,12 ] # arrival time
163
- bt = [2 ,2 ,3 ,4 ] # burst time
164
- shedule = CpuSheduling (prcs ,at ,bt )
165
- shedule .fcfs ()
166
-
167
-
157
+ prcs = ["P1" , "P2" , "P3" , "P4" ,"P5" ,"P6" ]
158
+ at = [4 , 19 , 2 , 3 ,2 , 2 ]
159
+ bt = [1 , 2 ,7 ,1 ,2 , 2 ]
160
+ s1 = CpuSheduling (prcs , at , bt )
161
+ s1 .fcfs ()
0 commit comments