Package SimPy :: Module Lib
[hide private]
[frames] | no frames]

Source Code for Module SimPy.Lib

  1  # coding=utf-8 
  2  """ 
  3  This file contains Simerror, FatalSimerror, Process, SimEvent,  
  4  the resources Resource, Level and Storage  
  5  as well as their dependencies Buffer, Queue, FIFO and PriorityQ. 
  6  """ 
  7  # $Revision: 464 $ $Date: 2010-04-05 05:59:53 +0200 (Mon, 05 Apr 2010) $ kgm 
  8  # SimPy version: 2.1 
  9  import inspect 
 10  import new 
 11  import sys 
 12  import types 
 13   
 14  from SimPy.Lister import Lister 
 15  from SimPy.Recording import Monitor, Tally 
 16   
 17  # Required for backward compatibility
 18  import SimPy.Globals as Globals 
 19   
class Simerror(Exception):
    """SimPy error which terminates "simulate" with an error message.

    The value passed to the constructor is kept in ``self.value``;
    ``str()`` of the exception is the ``repr()`` of that value.
    """

    def __init__(self, value):
        self.value = value

    def __str__(self):
        # repr() is the portable spelling of the old backtick operator,
        # which no longer exists in Python 3.
        return repr(self.value)
27
class FatalSimerror(Simerror):
    """SimPy error which terminates script execution with an exception."""

    def __init__(self, value):
        # Simerror.__init__ stores value in self.value, so no second
        # assignment is needed here.
        Simerror.__init__(self, value)
33
class Process(Lister):
    """Superclass of classes which may use generator functions"""

    def __init__(self, name='a_process', sim=None):
        """Set up the scheduling state of a new process.

        name -- process name; must be a string, else FatalSimerror is raised
        sim  -- simulation object; Globals.sim is used when None
        """
        if sim is None:
            sim = Globals.sim  # Use global simulation object if sim is None
        self.sim = sim
        # the reference to this Process instance's single process
        # (== generator)
        self._nextpoint = None
        if isinstance(name, str):
            self.name = name
        else:
            # (name,) keeps the formatting safe when name is itself a tuple
            raise FatalSimerror(
                "Process name parameter '%s' is not a string" % (name,))
        self._nextTime = None  # next activation time
        self._remainService = 0
        self._preempted = 0
        self._priority = {}
        self._getpriority = {}
        self._putpriority = {}
        self._terminated = False
        self._inInterrupt = False
        # which events process waited / queued for occurred
        self.eventsFired = []
        # tracing is switched on when the simulation object has a 'trace'
        self._doTracing = hasattr(sim, 'trace')

    def active(self):
        """True if an activation time is set and not inside an interrupt."""
        return self._nextTime is not None and not self._inInterrupt

    def passive(self):
        """True if no activation time is set and not terminated."""
        return self._nextTime is None and not self._terminated

    def terminated(self):
        """True if the process has been flagged as terminated."""
        return self._terminated

    def interrupted(self):
        """True if the process is in an interrupt and not terminated."""
        return self._inInterrupt and not self._terminated

    def queuing(self, resource):
        """True if this process is in resource.waitQ."""
        return self in resource.waitQ

    def cancel(self, victim):
        """Application function to cancel all event notices for this Process
        instance;(should be all event notices for the _generator_)."""
        self.sim._unpost(whom=victim)

    def start(self, pem=None, at='undefined', delay='undefined', prior=False):
        """Activates PEM of this Process.
        p.start(p.pemname([args])[,{at = t | delay = period}][, prior = False]) or
        p.start([p.ACTIONS()][,{at = t | delay = period}][, prior = False]) (ACTIONS
        parameter optional)

        Raises FatalSimerror if no generator is given and self.ACTIONS is
        missing, or if pem is not a generator.
        """
        if pem is None:
            try:
                pem = self.ACTIONS()
            except AttributeError:
                raise FatalSimerror('no generator function to activate')
        if not isinstance(pem, types.GeneratorType):
            raise FatalSimerror('activating function which'
                                ' is not a generator (contains no \'yield\')')
        # NOTE(review): 'not self._nextTime' is also true for a stored
        # activation time of 0 -- confirm whether that is intended.
        if not self._terminated and not self._nextTime:
            # store generator reference in object; needed for reactivation
            self._nextpoint = pem
            if at == 'undefined':
                at = self.sim._t
            if delay == 'undefined':
                zeit = max(self.sim._t, at)
            else:
                zeit = max(self.sim._t, self.sim._t + delay)
            if self._doTracing:
                self.sim.trace.recordActivate(who=self, when=zeit,
                                              prior=prior)
            self.sim._post(what=self, at=zeit, prior=prior)

    def _hold(self, a):
        """Handle 'yield hold, self[, delay]': re-post a[1] after delay.

        Raises FatalSimerror when the delay is negative.
        """
        if len(a[0]) == 3:  # yield hold, self, delay
            delay = a[0][2]
            if delay < 0:
                raise FatalSimerror('hold: delay time negative: %s, in %s' % (
                    delay, str(a[0][1])))
        else:  # yield hold, self
            delay = 0
        who = a[1]
        self.interruptLeft = delay
        self._inInterrupt = False
        self.interruptCause = None
        self.sim._post(what=who, at=self.sim._t + delay)

    def _passivate(self, a):
        """Handle 'yield passivate, self': clear the activation time."""
        a[0][1]._nextTime = None

    def interrupt(self, victim):
        """Application function to interrupt active processes.

        Returns the time left until the victim's next activation, or None
        when the victim is not active and so was not interrupted.
        """
        # can't interrupt terminated / passive / interrupted process
        if not victim.active():
            return None
        if self._doTracing:
            # suppress the trace comment while the victim is reactivated
            save = self.sim.trace._comment
            self.sim.trace._comment = None
        victim.interruptCause = self  # self causes interrupt
        left = victim._nextTime - self.sim._t
        victim.interruptLeft = left  # time left in current 'hold'
        victim._inInterrupt = True
        self.sim.reactivate(victim)
        if self._doTracing:
            self.sim.trace._comment = save
            self.sim.trace.recordInterrupt(self, victim)
        return left

    def interruptReset(self):
        """
        Application function for an interrupt victim to get out of
        'interrupted' state.
        """
        self._inInterrupt = False

    def acquired(self, res):
        """Multi - functional test for reneging for 'request' and 'get':
        (1)If res of type Resource:
              Tests whether resource res was acquired when process reactivated.
              If yes, the parallel wakeup process is killed.
              If not, process is removed from res.waitQ (reneging).
        (2)If res of type Store:
              Tests whether item(s) gotten from Store res.
              If yes, the parallel wakeup process is killed.
              If no, process is removed from res.getQ
        (3)If res of type Level:
              Tests whether units gotten from Level res.
              If yes, the parallel wakeup process is killed.
              If no, process is removed from res.getQ.

        Returns the test result (for a Store: the number of items gotten).
        Returns None if res is none of the three types.
        """
        if isinstance(res, Resource):
            test = self in res.activeQ
            if test:
                self.cancel(self._holder)
            else:
                res.waitQ.remove(self)
                if res.monitored:
                    res.waitMon.observe(len(res.waitQ), t=self.sim.now())
            return test
        elif isinstance(res, Store):
            test = len(self.got)
            if test:
                self.cancel(self._holder)
            else:
                res.getQ.remove(self)
                if res.monitored:
                    res.getQMon.observe(len(res.getQ), t=self.sim.now())
            return test
        elif isinstance(res, Level):
            test = not (self.got is None)
            if test:
                self.cancel(self._holder)
            else:
                res.getQ.remove(self)
                if res.monitored:
                    res.getQMon.observe(len(res.getQ), t=self.sim.now())
            return test

    def stored(self, buffer):
        """Test for reneging for 'yield put . . .' compound statement (Level and
        Store). Returns True if not reneged.
        If self not in buffer.putQ, kill wakeup process, else take self out of
        buffer.putQ (reneged)"""
        test = self in buffer.putQ
        if test:  # reneged
            buffer.putQ.remove(self)
            if buffer.monitored:
                buffer.putQMon.observe(len(buffer.putQ), t=self.sim.now())
        else:
            self.cancel(self._holder)
        return not test
209 210
class SimEvent(Lister):
    """Supports one - shot signalling between processes. All processes waiting for an event to occur
    get activated when its occurrence is signalled. From the processes queuing for an event, only
    the first gets activated.
    """

    def __init__(self, name='a_SimEvent', sim=None):
        """Create an event that has not occurred and has no waiters.

        name -- event name
        sim  -- simulation object; Globals.sim is used when None
        """
        if sim is None:
            sim = Globals.sim  # Use global simulation if sim is None
        self.sim = sim
        self.name = name
        self.waits = []    # entries [process, events waited for]
        self.queues = []   # entries [process, events queued for]
        self.occurred = False
        self.signalparam = None
        # tracing is switched on when the simulation object has a 'trace'
        self._doTracing = hasattr(sim, 'trace')

    def _checkSameSim(self, proc, ev, stmt):
        """Raise FatalSimerror unless proc and ev share a Simulation instance.

        stmt is the statement name that prefixes the error message.
        """
        if not (proc.sim == ev.sim):
            raise FatalSimerror(
                "%s: Process %s, SimEvent %s not in "
                "same Simulation instance" % (stmt, proc.name, ev.name))

    def signal(self, param=None):
        """Produces a signal to self;
        Fires this event (makes it occur).
        Reactivates ALL processes waiting for this event. (Cleanup waits lists
        of other events if wait was for an event - group (OR).)
        Reactivates the first process queuing for this event.

        If nobody waits or queues, the event is only marked as occurred.
        """
        # NOTE(review): entries that a queueing process holds in the
        # 'queues' lists of other events are not removed here -- confirm
        # whether that is handled elsewhere.
        self.signalparam = param
        if self._doTracing:
            self.sim.trace.recordSignal(self)
        if not self.waits and not self.queues:
            self.occurred = True
            return
        # reactivate all waiting processes
        for p in self.waits:
            p[0].eventsFired.append(self)
            self.sim.reactivate(p[0], prior=True)
            # delete waits entries for this process in other events
            for ev in p[1]:
                if ev != self:
                    if ev.occurred:
                        p[0].eventsFired.append(ev)
                    for iev in ev.waits:
                        if iev[0] == p[0]:
                            # safe: the loop is left right after removal
                            ev.waits.remove(iev)
                            break
        self.waits = []
        if self.queues:
            proc = self.queues.pop(0)[0]
            proc.eventsFired.append(self)
            self.sim.reactivate(proc)

    def _wait(self, par):
        """Consumes a signal if it has occurred, otherwise process 'proc'
        waits for this event.
        """
        proc = par[0][1]  # the process issuing the yield waitevent command
        # test that process and SimEvent belong to same Simulation instance
        if __debug__:
            self._checkSameSim(proc, self, "waitevent")
        proc.eventsFired = []
        if not self.occurred:
            self.waits.append([proc, [self]])
            proc._nextTime = None  # passivate calling process
        else:
            proc.eventsFired.append(self)
            self.occurred = False
            self.sim._post(proc, at=self.sim._t, prior=1)

    def _waitOR(self, par):
        """Handles waiting for an OR of events in a tuple / list.
        """
        proc = par[0][1]
        evlist = par[0][2]
        proc.eventsFired = []
        anyoccur = False
        for ev in evlist:
            # test that process and SimEvent belong to same Simulation
            # instance
            if __debug__:
                self._checkSameSim(proc, ev, "waitevent")
            if ev.occurred:
                anyoccur = True
                proc.eventsFired.append(ev)
                ev.occurred = False
        if anyoccur:  # at least one event has fired; continue process
            self.sim._post(proc, at=self.sim._t, prior=1)
        else:  # no event in list has fired, enter process in all 'waits' lists
            proc.eventsFired = []
            proc._nextTime = None  # passivate calling process
            for ev in evlist:
                ev.waits.append([proc, evlist])

    def _queue(self, par):
        """Consumes a signal if it has occurred, otherwise process 'proc'
        queues for this event.
        """
        proc = par[0][1]  # the process issuing the yield queueevent command
        proc.eventsFired = []
        # test that process and SimEvent belong to same Simulation instance
        if __debug__:
            self._checkSameSim(proc, self, "queueevent")
        if not self.occurred:
            self.queues.append([proc, [self]])
            proc._nextTime = None  # passivate calling process
        else:
            proc.eventsFired.append(self)
            self.occurred = False
            self.sim._post(proc, at=self.sim._t, prior=1)

    def _queueOR(self, par):
        """Handles queueing for an OR of events in a tuple / list.
        """
        proc = par[0][1]
        evlist = par[0][2]
        proc.eventsFired = []
        anyoccur = False
        for ev in evlist:
            # test that process and SimEvent belong to same Simulation
            # instance
            if __debug__:
                self._checkSameSim(proc, ev, "yield queueevent")
            if ev.occurred:
                anyoccur = True
                proc.eventsFired.append(ev)
                ev.occurred = False
        if anyoccur:  # at least one event has fired; continue process
            self.sim._post(proc, at=self.sim._t, prior=1)
        else:  # no event in list has fired, enter process in all 'queues' lists
            proc.eventsFired = []
            proc._nextTime = None  # passivate calling process
            for ev in evlist:
                ev.queues.append([proc, evlist])
355 356
class Queue(list):
    """List-based base class for the queues of resources and buffers.

    enter() and leave() are no-op hooks here; takeout() removes a given
    object and reports the new length to the attached monitor, if any.
    """

    def __init__(self, res, moni):
        """res is the owning resource / buffer, moni a Monitor / Tally or None."""
        self.monit = moni is not None  # True if a Monitor / Tally is attached
        self.moni = moni               # the Monitor / Tally
        self.resource = res            # the resource / buffer this queue belongs to

    def enter(self, obj):
        """Hook for adding obj; does nothing in this base class."""
        pass

    def leave(self):
        """Hook for removing the next object; does nothing in this base class."""
        pass

    def takeout(self, obj):
        """Remove obj from the queue and record the new queue length."""
        self.remove(obj)
        if not self.monit:
            return
        self.moni.observe(len(self), t=self.moni.sim.now())
376
class FIFO(Queue):
    """First-in, first-out queue: enter() appends, leave() pops the head."""

    def __init__(self, res, moni):
        Queue.__init__(self, res, moni)

    def _record(self):
        # Report the current queue length to the monitor, if one is attached.
        if self.monit:
            self.moni.observe(len(self), t=self.moni.sim.now())

    def enter(self, obj):
        """Append obj at the tail and record the new length."""
        self.append(obj)
        self._record()

    def enterGet(self, obj):
        """Queue a blocked 'get' requestor (same as enter)."""
        self.enter(obj)

    def enterPut(self, obj):
        """Queue a blocked 'put' requestor (same as enter)."""
        self.enter(obj)

    def leave(self):
        """Remove and return the head of the queue, recording the new length."""
        head = self.pop(0)
        self._record()
        return head
397
class PriorityQ(FIFO):
    """Queue is always ordered according to priority.
    Higher value of priority attribute == higher priority.
    """

    def __init__(self, res, moni):
        FIFO.__init__(self, res, moni)

    def _enterByPriority(self, obj, attr):
        """Insert obj behind all entries whose priority is >= its own.

        attr names the per-process priority dict to consult ('_priority',
        '_getpriority' or '_putpriority'); the dict is keyed by the
        resource / buffer this queue belongs to.
        """
        if len(self):
            ix = self.resource
            prio = getattr(obj, attr)[ix]
            if getattr(self[-1], attr)[ix] >= prio:
                # nothing queued has lower priority: obj goes to the tail
                self.append(obj)
            else:
                # the tail has lower priority, so this scan always stops
                z = 0
                while getattr(self[z], attr)[ix] >= prio:
                    z += 1
                self.insert(z, obj)
        else:
            self.append(obj)
        if self.monit:
            self.moni.observe(len(self), t=self.moni.sim.now())

    def enter(self, obj):
        """Handles request queue for Resource"""
        self._enterByPriority(obj, '_priority')

    def enterGet(self, obj):
        """Handles getQ in Buffer"""
        self._enterByPriority(obj, '_getpriority')

    def enterPut(self, obj):
        """Handles putQ in Buffer"""
        self._enterByPriority(obj, '_putpriority')
454
class Resource(Lister):
    """Models shared, limited capacity resources with queuing;
    FIFO is default queuing discipline.
    """

    def __init__(self, capacity=1, name='a_resource', unitName='units',
                 qType=FIFO, preemptable=0, monitored=False,
                 monitorType=Monitor, sim=None):
        """
        monitorType={Monitor(default) | Tally}

        When monitored is true, a wait-queue monitor (waitMon) and an
        active-queue monitor (actMon) are created and attached to the queues.
        """
        if sim is None:
            sim = Globals.sim  # Use global simulation if sim is None
        self.sim = sim
        self.name = name          # resource name
        self.capacity = capacity  # resource units in this resource
        self.unitName = unitName  # type name of resource units
        self.n = capacity         # uncommitted resource units
        self.monitored = monitored

        if self.monitored:  # Monitor waitQ, activeQ
            self.actMon = monitorType(
                name='Active Queue Monitor %s' % self.name,
                ylab='nr in queue', tlab='time',
                sim=self.sim)
            monact = self.actMon
            self.waitMon = monitorType(
                name='Wait Queue Monitor %s' % self.name,
                ylab='nr in queue', tlab='time',
                sim=self.sim)
            monwait = self.waitMon
        else:
            monwait = None
            monact = None
        self.waitQ = qType(self, monwait)
        self.preemptable = preemptable
        self.activeQ = qType(self, monact)
        self.priority_default = 0

    def _request(self, arg):
        """Process request event for this resource"""
        obj = arg[1]
        # test that process and Resource belong to same Simulation instance
        if __debug__:
            if not (obj.sim == self.sim):
                raise FatalSimerror(
                    "yield request: Process %s, Resource %s not in "
                    "same Simulation instance" % (obj.name, self.name))
        if len(arg[0]) == 4:  # yield request, self, resource, priority
            obj._priority[self] = arg[0][3]
        else:  # yield request, self, resource
            obj._priority[self] = self.priority_default
        if self.preemptable and self.n == 0:  # No free resource
            # test for preemption condition
            preempt = obj._priority[self] > self.activeQ[-1]._priority[self]
            if preempt:
                z = self.activeQ[-1]
                # Keep track of preempt level
                z._preempted += 1
                # suspend lowest priority process being served
                # record remaining service time at first preempt only
                if z._preempted == 1:
                    z._remainService = z._nextTime - self.sim._t
                    # cancel only at first preempt; this is the call that
                    # Process.cancel(z) makes, without building a
                    # throw-away Process just to reach it
                    self.sim._unpost(whom=z)
                # remove from activeQ
                self.activeQ.remove(z)
                # put into front of waitQ
                self.waitQ.insert(0, z)
                # if self is monitored, update waitQ monitor
                if self.monitored:
                    self.waitMon.observe(len(self.waitQ), self.sim.now())
                # passivate re - queued process
                z._nextTime = None
                # assign resource unit to preemptor
                self.activeQ.enter(obj)
                # post event notice for preempting process
                self.sim._post(obj, at=self.sim._t, prior=1)
            else:
                self.waitQ.enter(obj)
                # passivate queuing process
                obj._nextTime = None
        else:  # treat non - preemption case
            if self.n == 0:
                self.waitQ.enter(obj)
                # passivate queuing process
                obj._nextTime = None
            else:
                self.n -= 1
                self.activeQ.enter(obj)
                self.sim._post(obj, at=self.sim._t, prior=1)

    def _release(self, arg):
        """Process release request for this resource"""
        actor = arg[1]
        self.n += 1
        self.activeQ.remove(actor)
        if self.monitored:
            self.actMon.observe(len(self.activeQ), t=self.sim.now())
        # reactivate first waiting requestor if any; assign Resource to it
        if self.waitQ:
            obj = self.waitQ.leave()
            self.n -= 1  # assign 1 resource unit to object
            self.activeQ.enter(obj)
            if self.preemptable:
                if obj._preempted:
                    # keep track of preempt level
                    obj._preempted -= 1
                    # reactivate object delay = remaining service time
                    # but only, if all other preempts are over
                    if obj._preempted == 0:
                        self.sim.reactivate(obj, delay=obj._remainService,
                                            prior=1)
                else:
                    # object had not been preempted: reactivate right away
                    self.sim.reactivate(obj, delay=0, prior=1)
            else:
                self.sim.reactivate(obj, delay=0, prior=1)
        # continue the releasing process
        self.sim._post(actor, at=self.sim._t, prior=1)
576
class Buffer(Lister):
    """Abstract class for buffers
    Blocks a process when a put would cause buffer overflow or a get would cause
    buffer underflow.
    Default queuing discipline for blocked processes is FIFO."""

    priorityDefault = 0

    def __init__(self, name=None, capacity='unbounded', unitName='units',
                 putQType=FIFO, getQType=FIFO,
                 monitored=False, monitorType=Monitor, initialBuffered=None,
                 sim=None):
        """Store the parameters and create the put / get queues.

        capacity 'unbounded' is mapped to the platform's largest plain
        integer. When monitored is true, monitors of type monitorType are
        created for the put queue, the get queue and the buffer content.
        """
        if sim is None:
            sim = Globals.sim  # Use global simulation if sim is None
        self.sim = sim
        if capacity == 'unbounded':
            # sys.maxint only exists on Python 2; fall back to sys.maxsize
            capacity = getattr(sys, 'maxint', sys.maxsize)
        self.capacity = capacity
        self.name = name
        self.putQType = putQType
        self.getQType = getQType
        self.monitored = monitored
        self.initialBuffered = initialBuffered
        self.unitName = unitName
        if self.monitored:
            # monitor for Producer processes' queue
            self.putQMon = monitorType(
                name='Producer Queue Monitor %s' % self.name,
                ylab='nr in queue', tlab='time',
                sim=self.sim)
            # monitor for Consumer processes' queue
            self.getQMon = monitorType(
                name='Consumer Queue Monitor %s' % self.name,
                ylab='nr in queue', tlab='time',
                sim=self.sim)
            # monitor for nr items in buffer
            self.bufferMon = monitorType(
                name='Buffer Monitor %s' % self.name,
                ylab='nr in buffer', tlab='time',
                sim=self.sim)
        else:
            self.putQMon = None
            self.getQMon = None
            self.bufferMon = None
        self.putQ = self.putQType(res=self, moni=self.putQMon)
        self.getQ = self.getQType(res=self, moni=self.getQMon)
        if self.monitored:
            self.putQMon.observe(y=len(self.putQ), t=self.sim.now())
            self.getQMon.observe(y=len(self.getQ), t=self.sim.now())
        self._putpriority = {}
        self._getpriority = {}

    def _put(self, arg=None):
        """Placeholder for the put handler; does nothing here.

        'arg' is optional so the old no-argument call still works while the
        signature matches the (self, arg) handlers of Level and Store.
        """
        pass

    def _get(self, arg=None):
        """Placeholder for the get handler; does nothing here."""
        pass
627
class Level(Buffer):
    """Models buffers for processes putting / getting un - distinguishable items.
    """

    def getamount(self):
        """Return the number of units currently buffered."""
        return self.nrBuffered

    amount = property(getamount)

    def gettheBuffer(self):
        """Return the number of units currently buffered."""
        return self.nrBuffered

    theBuffer = property(gettheBuffer)

    def __init__(self, **pars):
        """Create a Level; accepts the keyword parameters of Buffer.

        Raises FatalSimerror when capacity is not a non-negative int / float
        or when initialBuffered is not None or an int / float within
        0 .. capacity.
        """
        Buffer.__init__(self, **pars)
        if self.name is None:
            self.name = 'a_level'  # default name

        if type(self.capacity) not in (int, float) or self.capacity < 0:
            # report the offending capacity (the message used to show
            # initialBuffered by mistake)
            raise FatalSimerror(
                'Level: capacity parameter not a positive number: %s'
                % (self.capacity,))

        if type(self.initialBuffered) in (int, float):
            if self.initialBuffered > self.capacity:
                raise FatalSimerror('initialBuffered exceeds capacity')
            if self.initialBuffered >= 0:
                # nr items initially in buffer;
                # buffer is just a counter (int type)
                self.nrBuffered = self.initialBuffered
            else:
                raise FatalSimerror(
                    'initialBuffered param of Level negative: %s'
                    % self.initialBuffered)
        elif self.initialBuffered is None:
            self.initialBuffered = 0
            self.nrBuffered = 0
        else:
            raise FatalSimerror(
                'Level: wrong type of initialBuffered (parameter=%s)'
                % (self.initialBuffered,))
        if self.monitored:
            self.bufferMon.observe(y=self.amount, t=self.sim.now())

    def _put(self, arg):
        """Handles put requests for Level instances"""
        obj = arg[1]
        whichSim = self.sim
        # test that process and Level belong to same Simulation instance
        if __debug__:
            if not (obj.sim == self.sim):
                raise FatalSimerror(
                    "put: Process %s, Level %s not in "
                    "same Simulation instance" % (obj.name, self.name))
        if len(arg[0]) == 5:  # yield put, self, buff, whattoput, priority
            obj._putpriority[self] = arg[0][4]
            whatToPut = arg[0][3]
        elif len(arg[0]) == 4:  # yield put, self, buff, whattoput
            obj._putpriority[self] = Buffer.priorityDefault  # default
            whatToPut = arg[0][3]
        else:  # yield put, self, buff
            obj._putpriority[self] = Buffer.priorityDefault  # default
            whatToPut = 1
        if type(whatToPut) not in (int, float):
            raise FatalSimerror('Level: put parameter not a number')
        if not whatToPut >= 0.0:
            raise FatalSimerror('Level: put parameter not positive number')
        whatToPutNr = whatToPut
        if whatToPutNr + self.amount > self.capacity:
            obj._nextTime = None  # passivate put requestor
            obj._whatToPut = whatToPutNr
            self.putQ.enterPut(obj)  # and queue, with size of put
        else:
            self.nrBuffered += whatToPutNr
            if self.monitored:
                self.bufferMon.observe(y=self.amount, t=self.sim.now())
            # service any getters waiting
            # service in queue - order; do not serve second in queue before
            # first has been served
            while len(self.getQ) and self.amount > 0:
                proc = self.getQ[0]
                if proc._nrToGet <= self.amount:
                    proc.got = proc._nrToGet
                    self.nrBuffered -= proc.got
                    if self.monitored:
                        self.bufferMon.observe(y=self.amount,
                                               t=self.sim.now())
                    # get requestor's record out of queue
                    self.getQ.takeout(proc)
                    # continue a blocked get requestor
                    whichSim._post(proc, at=whichSim._t)
                else:
                    break
            # continue the put requestor
            whichSim._post(obj, at=whichSim._t, prior=1)

    def _get(self, arg):
        """Handles get requests for Level instances"""
        obj = arg[1]
        # test that process and Level belong to same Simulation instance
        if __debug__:
            if not (obj.sim == self.sim):
                raise FatalSimerror(
                    "get: Process %s, Level %s not in "
                    "same Simulation instance" % (obj.name, self.name))
        obj.got = None
        if len(arg[0]) == 5:  # yield get, self, buff, whattoget, priority
            obj._getpriority[self] = arg[0][4]
            nrToGet = arg[0][3]
        elif len(arg[0]) == 4:  # yield get, self, buff, whattoget
            obj._getpriority[self] = Buffer.priorityDefault  # default
            nrToGet = arg[0][3]
        else:  # yield get, self, buff
            obj._getpriority[self] = Buffer.priorityDefault
            nrToGet = 1
        if type(nrToGet) not in (int, float):
            raise FatalSimerror(
                'Level: get parameter not a number: %s' % (nrToGet,))
        if nrToGet < 0:
            raise FatalSimerror(
                'Level: get parameter not positive number: %s' % nrToGet)
        if self.amount < nrToGet:
            obj._nrToGet = nrToGet
            self.getQ.enterGet(obj)
            # passivate queuing process
            obj._nextTime = None
        else:
            obj.got = nrToGet
            self.nrBuffered -= nrToGet
            if self.monitored:
                self.bufferMon.observe(y=self.amount, t=self.sim.now())
            self.sim._post(obj, at=self.sim._t, prior=1)
            # reactivate any put requestors for which space is now available
            # service in queue - order; do not serve second in queue before
            # first has been served
            while len(self.putQ):  # test for queued producers
                proc = self.putQ[0]
                if proc._whatToPut + self.amount <= self.capacity:
                    self.nrBuffered += proc._whatToPut
                    if self.monitored:
                        self.bufferMon.observe(y=self.amount,
                                               t=self.sim.now())
                    self.putQ.takeout(proc)  # requestor's record out of queue
                    # continue a blocked put requestor
                    self.sim._post(proc, at=self.sim._t)
                else:
                    break
770
class Store(Buffer):
    """Models buffers for processes coupled by putting / getting distinguishable
    items.
    Blocks a process when a put would cause buffer overflow or a get would cause
    buffer underflow.
    Default queuing discipline for blocked processes is priority FIFO.
    """

    def getnrBuffered(self):
        """Return the number of items currently in the buffer."""
        return len(self.theBuffer)

    nrBuffered = property(getnrBuffered)

    def getbuffered(self):
        """Return the list holding the buffered items."""
        return self.theBuffer

    buffered = property(getbuffered)

    def __init__(self, **pars):
        """Create a Store; accepts the keyword parameters of Buffer.

        Raises FatalSimerror when capacity is not a positive int, or when
        initialBuffered is neither None nor a list of at most capacity items.
        """
        Buffer.__init__(self, **pars)
        self.theBuffer = []
        if self.name is None:
            self.name = 'a_store'  # default name
        if type(self.capacity) is not int or self.capacity <= 0:
            raise FatalSimerror(
                'Store: capacity parameter not a positive integer: %s'
                % (self.capacity,))
        if type(self.initialBuffered) is list:
            if len(self.initialBuffered) > self.capacity:
                raise FatalSimerror(
                    'Store: number initialBuffered exceeds capacity')
            # buffer receives list of objects
            self.theBuffer[:] = self.initialBuffered
        elif self.initialBuffered is None:
            self.theBuffer = []
        else:
            raise FatalSimerror('Store: initialBuffered not a list')
        if self.monitored:
            self.bufferMon.observe(y=self.nrBuffered, t=self.sim.now())
        self._sort = None

    def addSort(self, sortFunc):
        """Adds buffer sorting to this instance of Store. It maintains
        theBuffer sorted by the sortAttr attribute of the objects in the
        buffer.
        The user - provided 'sortFunc' must look like this:

        def mySort(self, par):
            tmplist = [(x.sortAttr, x) for x in par]
            tmplist.sort()
            return [x for (key, x) in tmplist]

        """
        # types.MethodType binds sortFunc to this instance; it replaces
        # new.instancemethod from the 'new' module, which Python 3 dropped.
        self._sort = types.MethodType(sortFunc, self)
        self.theBuffer = self._sort(self.theBuffer)

    def _servicePutQ(self, whichSim):
        """Move the items of blocked put requestors into the buffer.

        Serves in queue order and stops at the first requestor whose items
        do not fit: the second in queue is never served before the first.
        """
        while len(self.putQ):
            proc = self.putQ[0]
            if len(proc._whatToPut) + self.nrBuffered <= self.capacity:
                self.theBuffer.extend(proc._whatToPut)  # move items to buffer
                if self._sort is not None:
                    self.theBuffer = self._sort(self.theBuffer)
                if self.monitored:
                    self.bufferMon.observe(
                        y=self.nrBuffered, t=whichSim.now())
                self.putQ.takeout(proc)  # dequeue requestor's record
                # continue a blocked put requestor
                whichSim._post(proc, at=whichSim._t)
            else:
                break

    def _put(self, arg):
        """Handles put requests for Store instances"""
        obj = arg[1]
        # test that process and Store belong to same Simulation instance
        if __debug__:
            if not (obj.sim == self.sim):
                raise FatalSimerror(
                    "put: Process %s, Store %s not in "
                    "same Simulation instance" % (obj.name, self.name))
        whichSim = self.sim
        if len(arg[0]) == 5:  # yield put, self, buff, whattoput, priority
            obj._putpriority[self] = arg[0][4]
            whatToPut = arg[0][3]
        elif len(arg[0]) == 4:  # yield put, self, buff, whattoput
            obj._putpriority[self] = Buffer.priorityDefault  # default
            whatToPut = arg[0][3]
        else:  # error, whattoput missing
            raise FatalSimerror('Item to put missing in yield put stmt')
        if type(whatToPut) is not list:
            raise FatalSimerror('put parameter is not a list')
        whatToPutNr = len(whatToPut)
        if whatToPutNr + self.nrBuffered > self.capacity:
            obj._nextTime = None  # passivate put requestor
            obj._whatToPut = whatToPut
            self.putQ.enterPut(obj)  # and queue, with items to put
        else:
            self.theBuffer.extend(whatToPut)
            if self._sort is not None:
                self.theBuffer = self._sort(self.theBuffer)
            if self.monitored:
                self.bufferMon.observe(y=self.nrBuffered, t=whichSim.now())

            # service any waiting getters
            # service in queue order: do not serve second in queue before
            # first has been served
            while self.nrBuffered > 0 and len(self.getQ):
                proc = self.getQ[0]
                if inspect.isfunction(proc._nrToGet):
                    # predicate parameter
                    movCand = proc._nrToGet(self.theBuffer)
                    if movCand:
                        proc.got = movCand[:]
                        for i in movCand:
                            self.theBuffer.remove(i)
                        self.getQ.takeout(proc)
                        if self.monitored:
                            self.bufferMon.observe(
                                y=self.nrBuffered, t=whichSim._t)
                        # continue a blocked get requestor
                        whichSim._post(what=proc, at=whichSim._t)
                    else:
                        break
                else:  # numerical parameter
                    if proc._nrToGet <= self.nrBuffered:
                        nrToGet = proc._nrToGet
                        proc.got = []
                        proc.got[:] = self.theBuffer[0:nrToGet]
                        # slice-assign so theBuffer stays the same list
                        self.theBuffer[:] = self.theBuffer[nrToGet:]
                        if self.monitored:
                            self.bufferMon.observe(
                                y=self.nrBuffered, t=whichSim._t)
                        # take this get requestor's record out of queue:
                        self.getQ.takeout(proc)
                        # continue a blocked get requestor
                        whichSim._post(what=proc, at=whichSim._t)
                    else:
                        break

            # continue the put requestor
            whichSim._post(what=obj, at=whichSim._t, prior=1)

    def _get(self, arg):
        """Handles get requests"""
        filtfunc = None
        obj = arg[1]
        # test that process and Store belong to same Simulation instance
        if __debug__:
            if not (obj.sim == self.sim):
                raise FatalSimerror(
                    "get: Process %s, Store %s not in "
                    "same Simulation instance" % (obj.name, self.name))
        whichSim = obj.sim
        obj.got = []  # the list of items retrieved by 'get'
        if len(arg[0]) == 5:  # yield get, self, buff, whattoget, priority
            obj._getpriority[self] = arg[0][4]
            if inspect.isfunction(arg[0][3]):
                filtfunc = arg[0][3]
            else:
                nrToGet = arg[0][3]
        elif len(arg[0]) == 4:  # yield get, self, buff, whattoget
            obj._getpriority[self] = Buffer.priorityDefault  # default
            if inspect.isfunction(arg[0][3]):
                filtfunc = arg[0][3]
            else:
                nrToGet = arg[0][3]
        else:  # yield get, self, buff
            obj._getpriority[self] = Buffer.priorityDefault
            nrToGet = 1
        if not filtfunc:  # number specifies nr items to get
            if nrToGet < 0:
                raise FatalSimerror(
                    'Store: get parameter not positive number: %s' % nrToGet)
            if self.nrBuffered < nrToGet:
                obj._nrToGet = nrToGet
                self.getQ.enterGet(obj)
                # passivate / block queuing 'get' process
                obj._nextTime = None
            else:
                # move the first nrToGet items from buffer to requesting
                # process; one slice instead of repeated pop(0)
                obj.got.extend(self.theBuffer[:nrToGet])
                del self.theBuffer[:nrToGet]
                if self.monitored:
                    self.bufferMon.observe(y=self.nrBuffered,
                                           t=whichSim.now())
                whichSim._post(obj, at=whichSim._t, prior=1)
                # reactivate any put requestors for which space is now
                # available
                self._servicePutQ(whichSim)
        else:  # items to get determined by filtfunc
            movCand = filtfunc(self.theBuffer)
            if movCand:  # get succeeded
                whichSim._post(obj, at=whichSim._t, prior=1)
                obj.got = movCand[:]
                for item in movCand:
                    self.theBuffer.remove(item)
                if self.monitored:
                    self.bufferMon.observe(y=self.nrBuffered,
                                           t=whichSim.now())
                # reactivate any put requestors for which space is now
                # available
                self._servicePutQ(whichSim)
            else:  # get did not succeed, block
                obj._nrToGet = filtfunc
                self.getQ.enterGet(obj)
                # passivate / block queuing 'get' process
                obj._nextTime = None
987