Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 13 additions & 3 deletions prosimos/control_flow_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,10 @@ def __init__(self):
self.gateway_execution_limit = 1000
self.simulation_execution_stats = SimulationExecutionStats()

def set_element_probabilities(self, element_probability, task_resource_probability):
    """Attach the gateway-branching and task/resource probability models.

    :param element_probability: branching probabilities per flow element.
    :param task_resource_probability: per-task resource assignment probabilities.
    """
    self.task_resource_probability = task_resource_probability
    self.element_probability = element_probability

def set_additional_fields_from_json(self, element_probability, task_resource_probability,
event_distribution, batch_processing, gateway_conditions,
gateway_execution_limit):
Expand Down Expand Up @@ -361,9 +365,13 @@ def update_process_state(self, case_id, e_id, p_state, completed_datetime_prev_e
return enabled_tasks, visited_at

def get_all_attributes(self, case_id):
if self.all_attributes is None:
return {}
all_current_attributes = {}
all_current_attributes.update(self.all_attributes["global"])
all_current_attributes.update(self.all_attributes[case_id])
if "global" in self.all_attributes:
all_current_attributes.update(self.all_attributes["global"])
if case_id in self.all_attributes:
all_current_attributes.update(self.all_attributes[case_id])
return all_current_attributes

def is_task_batched(self, task_id):
Expand Down Expand Up @@ -876,7 +884,9 @@ def _find_next(self, f_arc_and_duration, case_id, p_state, enabled_tasks, to_exe
to_execute.append(next_e)

def _check_and_update_enabling_time(self, p_case, e_id, enabled_at: CustomDatetimeAndSeconds):
if self.last_datetime[e_id][p_case] is None or self.last_datetime[e_id][p_case].datetime < enabled_at.datetime:
if p_case not in self.last_datetime[e_id]:
self.last_datetime[e_id][p_case] = enabled_at
elif self.last_datetime[e_id][p_case] is None or self.last_datetime[e_id][p_case].datetime < enabled_at.datetime:
self.last_datetime[e_id][p_case] = enabled_at
elif self.last_datetime[e_id][p_case].datetime > enabled_at.datetime:
enabled_at = self.last_datetime[e_id][p_case]
Expand Down
Empty file.
126 changes: 126 additions & 0 deletions prosimos/multitasking/multitasking_struct.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,126 @@
import random
from datetime import datetime

from pix_framework.discovery.probabilistic_multitasking.discovery import MultiType
from prosimos.simulation_queues_ds import PriorityQueue


class MultiTaskDS:
    """Bookkeeping for probabilistic multitasking of simulation resources.

    Tracks, per resource, how many tasks are currently allocated and the
    probability of accepting one more parallel task. Probabilities are kept
    either as a single list indexed by the current task count
    (``MultiType.GLOBAL``) or per weekday/time-granule
    (``MultiType.LOCAL``).
    """

    def __init__(self, mt_type: str, g_size: int = 0):
        # 'GLOBAL' or 'LOCAL' (case-insensitive); ValueError otherwise.
        self.mt_type = MultiTaskDS._extract_type(mt_type)
        # Granule size — presumably in minutes, since 1440 = minutes/day; TODO confirm.
        self.g_size = g_size
        self.total_granules = 1440 // g_size if g_size > 0 else 1
        # r_id -> acceptance probabilities indexed by current task count
        # (GLOBAL: flat list; LOCAL: [weekday][granule] -> list).
        self.res_multitask_info = dict()
        # r_id -> number of tasks currently allocated to the resource.
        self.allocated_tasks = dict()
        # r_id -> latest completion datetime among allocated tasks, or None.
        self.active_datetimes = dict()
        # r_id -> t_id -> expected share of the task's workload (set by init_relative_workload).
        self.relative_workload = dict()
        # r_id -> expected workload figure, set via update_expected_workload.
        self.expected_workload = dict()
        # r_id -> t_id -> executed count (keys created only by init_relative_workload).
        self.executed_tasks = dict()
        # t_id -> total executions across all resources (keys created only by init_relative_workload).
        self.total_tasks = dict()

    def init_relative_workload(self, task_res_distr: dict):
        """Derive each resource's relative workload share per task.

        :param task_res_distr: t_id -> iterable of r_ids that may perform the task.

        NOTE(review): assumes update_expected_workload() was called for every
        resource appearing in task_res_distr; otherwise this raises KeyError.
        Also the only place where executed_tasks/total_tasks keys are created —
        allocate_task_to() depends on it having run; confirm call order.
        """
        for t_id in task_res_distr:
            total_workload = 0.0
            for r_id in task_res_distr[t_id]:
                if r_id not in self.relative_workload:
                    self.relative_workload[r_id] = dict()
                    self.executed_tasks[r_id] = dict()
                total_workload += self.expected_workload[r_id]
            for r_id in task_res_distr[t_id]:
                # Guard against an all-zero workload to avoid division by zero.
                if total_workload > 0:
                    self.relative_workload[r_id][t_id] = self.expected_workload[r_id] / total_workload
                else:
                    self.relative_workload[r_id][t_id] = 0.0
                self.executed_tasks[r_id][t_id] = 0
            self.total_tasks[t_id] = 0

    def update_expected_workload(self, r_id, workload):
        """Record the expected workload figure for resource r_id."""
        self.expected_workload[r_id] = workload

    def resource_workload(self, r_id, t_id):
        """Return the observed fraction of t_id executions done by r_id (0.0 if none)."""
        if r_id in self.executed_tasks and self.total_tasks[t_id] > 0:
            return self.executed_tasks[r_id][t_id] / self.total_tasks[t_id]
        return 0.0

    def workload_diff(self, r_id, t_id):
        """Return |observed share - expected share| of t_id for r_id (0.0 if untracked)."""
        if r_id in self.executed_tasks:
            return abs(self.resource_workload(r_id, t_id) - self.relative_workload[r_id][t_id])
        return 0.0

    def allocate_task_to(self, r_id: str, t_id: str, completed_at: datetime):
        """Register one more parallel task on r_id, completing at completed_at.

        Keeps active_datetimes[r_id] as the max completion time of the
        resource's in-flight tasks.

        NOTE(review): the executed_tasks/total_tasks increments require
        init_relative_workload() to have created those keys; otherwise this
        raises KeyError — confirm against callers.
        """
        if r_id in self.allocated_tasks:
            self.allocated_tasks[r_id] += 1
            if self.active_datetimes[r_id] is None:
                self.active_datetimes[r_id] = completed_at
            else:
                self.active_datetimes[r_id] = max(self.active_datetimes[r_id], completed_at)
            self.executed_tasks[r_id][t_id] += 1
            self.total_tasks[t_id] += 1

    def release_tasks_from(self, r_id: str, completed_at: datetime):
        """Release all tasks allocated to r_id.

        :return: the later of completed_at and the resource's last active
            completion time (i.e. when the resource actually becomes free).
        """
        if r_id in self.allocated_tasks and self.active_datetimes[r_id] is not None:
            completed_at = max(self.active_datetimes[r_id], completed_at)
            self.active_datetimes[r_id] = None
            self.allocated_tasks[r_id] = 0
        return completed_at

    def can_get_new_tasks(self, r_id: str, at_datetime: datetime):
        """Decide (probabilistically) whether r_id may take one more task at at_datetime.

        Always True for unknown resources or when nothing is allocated.
        Otherwise draws random.random() against the probability indexed by the
        current allocation count; False when the count exceeds the recorded
        probabilities.
        """
        if r_id in self.allocated_tasks and self.allocated_tasks[r_id] > 0:
            if self.mt_type is MultiType.GLOBAL:
                if self.allocated_tasks[r_id] >= len(self.res_multitask_info[r_id]):
                    return False
                return random.random() <= self.res_multitask_info[r_id][self.allocated_tasks[r_id]]
            else:
                # LOCAL: probabilities are kept per weekday and time granule.
                wd = at_datetime.weekday()
                gr = self.interval_index(at_datetime)
                if self.allocated_tasks[r_id] >= len(self.res_multitask_info[r_id][wd][gr]):
                    return False
                return random.random() <= self.res_multitask_info[r_id][wd][gr][self.allocated_tasks[r_id]]
        return True

    def register_resource(self, r_id: str):
        """Initialise the multitask structures for r_id (idempotent).

        Index 0 is seeded with probability 0.0 — presumably a placeholder for
        'zero tasks allocated'; confirm intended semantics.
        """
        if r_id not in self.res_multitask_info:
            self.allocated_tasks[r_id] = 0
            self.active_datetimes[r_id] = None
            if self.mt_type is MultiType.GLOBAL:
                self.res_multitask_info[r_id] = [0.0]
            else:
                # LOCAL: 7 weekdays x total_granules slots, each a probability list.
                total_granules = self.total_granules
                self.res_multitask_info[r_id] = [[[0.0] for _ in range(total_granules)] for _ in range(7)]

    def register_multitasks(self, r_id: str, task_freq: int, prob: float, week_day: int = None, granule: int = None):
        """Record the probability of accepting a task when task_freq tasks run in parallel.

        Pads any skipped parallel-task counts with the same probability so the
        list index always equals the concurrent-task count, then appends prob.

        :raises ValueError: if prob is outside [0, 1].
        """
        if not 0 <= prob <= 1.0:
            raise ValueError("Probability 'prob' must be between 0 and 1.0 inclusive")
        if self.mt_type is MultiType.GLOBAL:
            self.res_multitask_info[r_id].extend([prob] * (task_freq - len(self.res_multitask_info[r_id])))
            self.res_multitask_info[r_id].append(prob)
        else:
            self.res_multitask_info[r_id][week_day][granule].extend(
                [prob] * (task_freq - len(self.res_multitask_info[r_id][week_day][granule]))
            )
            self.res_multitask_info[r_id][week_day][granule].append(prob)

    def register_local_multitasks(self, r_id: str, week_day: int, from_dt: datetime, to_dt: datetime, task_freq: int,
                                  prob: float):
        """Register prob for every granule in [from_dt, to_dt) on week_day.

        An end time that maps to granule 0 (e.g. midnight) wraps to the last
        granule of the day.

        :raises ValueError: if week_day is None or outside 0..6.
        """
        if week_day is None or not 0 <= week_day <= 6:
            raise ValueError("Weekdays must be between 0 and 6 inclusive - from 0: MONDAY to 6: SUNDAY")
        from_i = self.interval_index(from_dt)
        # End granule is exclusive; -1 makes the range inclusive of the last slot.
        to_i = self.interval_index(to_dt) - 1
        if to_i < 0:
            to_i = self.total_granules - 1

        while from_i <= to_i:
            self.register_multitasks(r_id, task_freq, prob, week_day, from_i)
            from_i += 1

    def interval_index(self, current_date: datetime):
        """Map a datetime's time-of-day to its granule index.

        NOTE(review): divides by g_size — only valid for LOCAL models where
        g_size > 0; a GLOBAL model (g_size == 0) would raise ZeroDivisionError.
        """
        return (current_date.hour * 60 + current_date.minute) // self.g_size

    @staticmethod
    def _extract_type(type_str: str):
        """Parse 'global'/'local' (any case) into a MultiType member.

        :raises ValueError: for any other string.
        """
        upp = type_str.upper()
        if upp == 'GLOBAL':
            return MultiType.GLOBAL
        elif upp == 'LOCAL':
            return MultiType.LOCAL
        raise ValueError("Multitasking Type (mt_type) must be GLOBAL or LOCAL")
93 changes: 80 additions & 13 deletions prosimos/simulation_engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import numpy as np
from datetime import timedelta
from typing import List
import random

import pytz

Expand Down Expand Up @@ -211,13 +212,15 @@ def pop_and_allocate_resource(self, task_id: str, num_allocated_tasks: int):
return r_id, r_avail_at

def execute_task(self, c_event: EnabledEvent):
r_id, r_avail_at = self.pop_and_allocate_resource(c_event.task_id, 1)

r_avail_at = max(c_event.enabled_at, r_avail_at)
avail_datetime = self._datetime_from(r_avail_at)
is_working, _ = self.sim_setup.get_resource_calendar(r_id).is_working_datetime(avail_datetime)
if not is_working:
r_avail_at = r_avail_at + self.sim_setup.next_resting_time(r_id, avail_datetime)
if self.sim_setup.multitask_info is None:
r_id, r_avail_at = self.pop_and_allocate_resource(c_event.task_id, 1)
r_avail_at = max(c_event.enabled_at, r_avail_at)
avail_datetime = self._datetime_from(r_avail_at)
is_working, _ = self.sim_setup.get_resource_calendar(r_id).is_working_datetime(avail_datetime)
if not is_working:
r_avail_at = r_avail_at + self.sim_setup.next_resting_time(r_id, avail_datetime)
else:
r_id, r_avail_at = self.allocate_multitasking_resource(c_event)

full_evt = TaskEvent(
c_event.p_case,
Expand All @@ -231,13 +234,16 @@ def execute_task(self, c_event: EnabledEvent):

self.log_info.add_event_info(c_event.p_case, full_evt, self.sim_setup.resources_map[r_id].cost_per_hour)

r_next_available = full_evt.completed_at
if self.sim_setup.multitask_info is None:
r_next_available = full_evt.completed_at

if self.sim_resources[r_id].switching_time > 0:
r_next_available += self.sim_setup.next_resting_time(r_id, full_evt.completed_datetime)
if self.sim_resources[r_id].switching_time > 0:
r_next_available += self.sim_setup.next_resting_time(r_id, full_evt.completed_datetime)

self.resource_queue.update_resource_availability(r_id, r_next_available)
self.sim_resources[r_id].worked_time += full_evt.ideal_duration
self.resource_queue.update_resource_availability(r_id, r_next_available)
self.sim_resources[r_id].worked_time += full_evt.ideal_duration
else:
self.release_multitasking_resource(r_id, full_evt, r_avail_at)

self.update_attributes(c_event)
self.log_writer.add_csv_row(self.get_csv_row_data(full_evt))
Expand All @@ -247,6 +253,67 @@ def execute_task(self, c_event: EnabledEvent):

return completed_at, completed_datetime

def allocate_multitasking_resource(self, c_event: EnabledEvent):
    """Pick a resource for an enabled task under multitasking rules.

    Pops resources from the queue, collecting every candidate that is already
    available at or before the task's enabling time (plus the first one that
    only becomes available later). One candidate is chosen uniformly at
    random and all the others are pushed back into the queue.

    :param c_event: enabled task event (uses task_id and enabled_at).
    :return: tuple (resource id, time at which the task can start).
    """
    r_id, r_avail_at = self.resource_queue.pop_resource_for(c_event.task_id)

    candidates = [[r_id, r_avail_at]]
    while r_avail_at is not None and r_avail_at <= c_event.enabled_at:
        r_id, r_avail_at = self.resource_queue.pop_resource_for(c_event.task_id)
        if r_id is not None:
            candidates.append([r_id, r_avail_at])

    if len(candidates) > 1:
        # Choose one candidate at random; return the rest to the queue.
        i = random.randint(0, len(candidates) - 1)
        [r_id, r_avail_at] = candidates[i]
        for j in range(0, len(candidates)):
            if j != i:
                self.resource_queue.update_resource_availability(candidates[j][0], candidates[j][1])
    elif r_id is None and len(candidates) == 1:
        # The queue was exhausted right after the first pop: keep that candidate.
        [r_id, r_avail_at] = candidates[0]

    if c_event.enabled_at > r_avail_at:
        # Bug fix: a task cannot start before it is enabled. The previous code
        # computed the shifted start but never applied it when the resource was
        # working, returning an availability earlier than the enabling time.
        # Mirror the single-tasking path: start at enabled_at, then respect the
        # resource calendar at that moment.
        r_avail_at = c_event.enabled_at
        avail_datetime = self._datetime_from(r_avail_at)
        is_working, _ = self.sim_setup.get_resource_calendar(r_id).is_working_datetime(avail_datetime)
        if not is_working:
            r_avail_at = r_avail_at + self.sim_setup.next_resting_time(r_id, avail_datetime)

    return r_id, r_avail_at

def release_multitasking_resource(self, r_id: str, full_evt: TaskEvent, r_init_avail):
    """Record a finished task on a multitasking resource and update its availability.

    Registers the completion in the multitask bookkeeping; if the resource can
    no longer accept new tasks, all its tasks are released and a resting period
    is added before it becomes available again.

    :param r_id: resource that executed the task.
    :param full_evt: the completed task event.
    :param r_init_avail: the resource's availability before this completion.
    """
    mt_info = self.sim_setup.multitask_info
    done_at = self._datetime_from(full_evt.completed_at)
    mt_info.allocate_task_to(r_id, full_evt.task_id, done_at)

    next_free = r_init_avail
    if not mt_info.can_get_new_tasks(r_id, done_at):
        released_at = mt_info.release_tasks_from(r_id, done_at)
        next_free += self.sim_setup.next_resting_time(r_id, released_at)

    self.resource_queue.update_resource_availability(r_id, next_free)
    self.sim_resources[r_id].worked_time += full_evt.ideal_duration

def update_attributes(self, current_event):
event_attributes = self.sim_setup.all_attributes.event_attributes.attributes
global_event_attributes = self.sim_setup.all_attributes.global_event_attributes.attributes
Expand Down Expand Up @@ -623,7 +690,7 @@ def run_simulation(
diffsim_info.set_starting_datetime(starting_at_datetime)

if stat_out_path is None and log_out_path is None:
return run_simpy_simulation(diffsim_info, None, None)
return run_simpy_simulation(diffsim_info, None, None, fixed_arrival_times)

csv_writer_config = {
'delimiter': ',',
Expand Down
59 changes: 57 additions & 2 deletions prosimos/simulation_properties_parser.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
from prosimos.case_attributes import AllCaseAttributes, CaseAttribute
from prosimos.control_flow_manager import BPMN, EVENT_TYPE, BPMNGraph, ElementInfo
from prosimos.histogram_distribution import HistogramDistribution
from prosimos.multitasking.multitasking_struct import MultiTaskDS
from prosimos.prioritisation import AllPriorityRules
from prosimos.prioritisation_parser import PrioritisationParser
from prosimos.probability_distributions import Choice
Expand Down Expand Up @@ -41,6 +42,7 @@
EVENT_ATTRIBUTES = "event_attributes"
GLOBAL_ATTRIBUTES = "global_attributes"
GATEWAY_EXECUTION_LIMIT = "gateway_execution_limit"
MULTITASKING_SECTION = "multitask"

DEFAULT_GATEWAY_EXECUTION_LIMIT = 1000

Expand Down Expand Up @@ -120,6 +122,10 @@ def parse_json_sim_parameters(json_path):
if GATEWAY_EXECUTION_LIMIT in json_data \
else DEFAULT_GATEWAY_EXECUTION_LIMIT

multitasking_info = parse_multitasking_model(json_data[MULTITASKING_SECTION], task_resource_distribution) \
if MULTITASKING_SECTION in json_data \
else None

return (
resources_map,
calendars_map,
Expand All @@ -133,10 +139,59 @@ def parse_json_sim_parameters(json_path):
gateway_conditions,
all_attributes,
gateway_execution_limit,
model_type
model_type,
multitasking_info
)


def parse_multitasking_model(json_data, task_resource_distribution):
    """Dispatch multitasking-model parsing by the declared model type.

    :param json_data: the 'multitask' section of the simulation parameters.
    :param task_resource_distribution: task -> resource distribution mapping,
        forwarded to the specific parser.
    :return: a parsed multitasking model, or None for an unknown type.
    """
    model_type = json_data["type"]
    if model_type == "global":
        return _parse_global_multitasking(json_data, task_resource_distribution)
    if model_type == "local":
        return _parse_local_multitasking(json_data, task_resource_distribution)
    return None


def _parse_global_multitasking(json_data, task_res_distr):
    """Build a GLOBAL multitasking model from the 'values' section.

    For each resource entry, records its expected workload, registers the
    resource, and loads the probability of each parallel-task count.
    """
    model = MultiTaskDS(json_data["type"])
    for entry in json_data["values"]:
        resource_id = entry["resource_id"]
        model.update_expected_workload(resource_id, entry["r_workload"])
        model.register_resource(resource_id)
        for parallel_info in entry["multitask_info"]:
            model.register_multitasks(resource_id, parallel_info["parallel_tasks"], parallel_info["probability"])
    # multi_info.init_relative_workload(task_res_distr)
    return model


def _parse_local_multitasking(json_data, task_res_distr):
    """Build a LOCAL (weekday/granule) multitasking model from the 'values' section.

    NOTE(review): the granule size is hard-coded to 60 — presumably minutes,
    i.e. hourly granules — while the commented-out code derives it from
    json_data["granule_size"]; confirm this override is intentional.
    """
    multi_info = MultiTaskDS(
        json_data["type"],
        60
    )
    # multi_info = MultiTaskDS(
    #     json_data["type"],
    #     json_data["granule_size"]["value"] * granule_units[(json_data["granule_size"]["time_unit"]).upper()]
    # )
    for res_info in json_data["values"]:
        r_id = res_info["resource_id"]
        multi_info.register_resource(r_id)
        multi_info.update_expected_workload(r_id, res_info["r_workload"])
        for wd_info in res_info["weekly_probability"]:
            for gr_info in wd_info:
                # Expand each fuzzy time period into concrete weekday/time spans,
                # then register every parallel-task probability over that span.
                time_periods = convert_to_fuzzy_time_periods(gr_info)
                for p_info in time_periods:
                    for mt_info in gr_info["multitask_info"]:
                        multi_info.register_local_multitasks(r_id,
                                                             int_week_days[p_info["weekDay"]],
                                                             parse_datetime(p_info["beginTime"], False),
                                                             parse_datetime(p_info["endTime"], False),
                                                             mt_info["parallel_tasks"],
                                                             mt_info["probability"])
    # multi_info.init_relative_workload(task_res_distr)
    return multi_info


def parse_fuzzy_calendar(json_data):
granule_size = json_data["granule_size"]["value"] * granule_units[(json_data["granule_size"]["time_unit"]).upper()]
fuzzy_calendars = dict()
Expand Down Expand Up @@ -173,7 +228,7 @@ def convert_to_fuzzy_time_periods(time_period):
"weekDay": week_day,
"beginTime": time_period["beginTime"],
"endTime": time_period["endTime"],
"probability": time_period["probability"],
"probability": time_period["probability"] if "probability" in time_period else 0.0,
}
time_periods.append(time_period)

Expand Down
Loading
Loading