threading.Semaphore (etwas länglich)

28/03/2011 - 22:10 von Thomas Rachel | Report spam
Hallo,

ich wollte in einem Programm ein Alternativkonzept zu Workerthreads, die
eine Jobqueue abarbeiten, implementieren, nàmlich Threads, die selbst
Jobs sind und sich mit selbigem beenden.

Implementierung ist diese: Jobs werden als Threads gestartet, nachdem
das acquire() eines Semaphore, das der Beschrànkung der gleichzeitig
laufenden Threadanzahl dient, zurückkehrenderweise dem Starten
zugestimmt hat. Das zugehörige release() geschieht innerhalb der Jobs,
die damit sagen "ich habe fertig, der Nàchste bitte."

Dummerweise beendet sich das Testprogramm nach Drücken von Strg-C nicht
immer/meistens nicht.

Daraufhin hab ich mir die Implementierung von threading.Semaphore mal
genau angeschaut und fand dabei diesen Code:

def acquire(self, blocking=1):
rc = False
self.__cond.acquire() [1]
[...]
self.__cond.release() [2]
return rc

__enter__ = acquire

def release(self):
self.__cond.acquire() [3]
[...]
self.__cond.release()

Anhand der Exception wurde [1] des Mainthreads unterbrochen (bzw. nach
Beendigung kam KeyboardInterrupt). [2] wurde nie erreicht, worauf sich
die Threads ihrerseits an [3] festgebissen haben.

Bei den anderen Klassen sind diese "Helper-Lock-Aufrufe" schön in ein
try...finally gepackt. Dort würde nach einem Strg-C immerhin noch das
self.__cond.release() aufgerufen werden.

Zunàchst dachte ich daran, einen Bugreport zu verfassen. Aber macht das
die Sache besser? Denn nun kann es ja immer noch passieren, daß zwischen
finally: und release() eine solche Exception kommt - oder? Zwar
wesentlich unwahrscheinlicher, da das Zeitfenster viel kleiner ist, aber
nicht unmöglich, was diesen Fehler noch subtiler macht.


Daher folgende Fragen:

0. (so zuallererst und prinzipiell) Ist das, was ich mache,
grundsàtzlich "böse"? Also release() woanders als acquire() bei einem
Semaphore? Ich dachte eigentlich, genau für solche Fàlle sind Semaphoren
ja eigentlich konzipiert...

1. Lohnt sich ein Bugreport, oder würde das die Sache nicht besser
machen, sondern den Fehler nur noch subtiler?

2. Ist es generell ein Fehler, im MainThread mit Semaphoren (oder
generell mit Locks) herumzuwerkeln? Oder sollte man das tunlichst in
einem Extra-Thread laufenlassen, der ja die KeyboardInterrupt-Exceptioon
dann nicht bekommt und darüber zu einem geeigneten Zeitpunkt
benachrichtigt werden kann?


Unten ist noch ein Minimalbeispiel (t.py), das alle drei
Verhaltensweisen demonstriert.

python -m t 1 führt das m. E. fehlerhafte Verhalten aus: Strg-C beendet
den MainThread und làßt die anderen an dem Lock verhungern.

python -m t 2 lagert das Spawnen in einen separaten Thread aus. Der
Main.Thread reagiert auf Strg-C mit run = False und sagt den andern
Threads damit, sie sollen sich bei Gelegenheit mal zur Ruhe begeben.

python -m t 3 verbessert Semaphore(), indem self.__cond.release() in
einem finally: ausgeführt wird.

python -m t 4 jedoch "zerstört" das schöne neue Konzept, indem an einer
ungünstigen Stelle eine Assertion geworfen wird (anstelle eines
KeyboardInterrupt).


Die große Frage also: Bug(report) wegen finally: oder falsch angewendet?

TIA und Gruß,
Thomas


import threading
import time
import random

run = True

sema = threading.Semaphore(5)
def target():
print threading.current_thread(), ' works'
time.sleep(max(random.normalvariate(2, .4), 0))
sema.release()

class MySema(threading._Semaphore):
def acquire(self, blocking=1):
rc = False
try:
self._Semaphore__cond.acquire()
while self._Semaphore__value == 0:
if not blocking:
break
if __debug__:
self._note("%s.acquire(%s): blocked waiting, value=%s",
self, blocking, self._Semaphore__value)
self._Semaphore__cond.wait()
else:
self._Semaphore__value = self._Semaphore__value - 1
if __debug__:
self._note("%s.acquire: success, value=%s",
self, self._Semaphore__value)
rc = True
finally:
self._Semaphore__cond.release()
return rc

__enter__ = acquire

def release(self):
try:
self._Semaphore__cond.acquire()
self._Semaphore__value = self._Semaphore__value + 1
if __debug__:
self._note("%s.release: success, value=%s",
self, self._Semaphore__value)
self._Semaphore__cond.notify()
finally:
self._Semaphore__cond.release()

def thread():
t = threading.Thread(target=target)
t.start()
return t

def erroneous():
global run
try:
while True:
sema.acquire()
t = thread()
print 'main thread spawned', t
finally:
run = False

def extrathread():
global run
def extra():
while run:
sema.acquire()
t = thread()
print 'control thread spawned', t
threading.Thread(target=extra).start()
try:
while True:
time.sleep(.1)
finally:
run = False

def alternative():
global sema
sema = MySema(5)
erroneous()


if __name__ == '__main__':
import sys
if len(sys.argv) < 2 or sys.argv[1] == '1':
f = erroneous
elif sys.argv[1] == '2':
f = extrathread
else:
f = alternative
print f
f()
 

Lesen sie die antworten

#1 Christopher Arndt
29/03/2011 - 16:36 | Warnen spam
Hallo Thomas,

interessantes Problem, auch wenn dein Post einige Verwirrung aufwirft.

Vorweg: wenn es um Threads geht, bekomme ich leicht Knoten im Hirn, wenn
ich hier also Unsinn erzàhlen sollte, korrigiert mich bitte.

Zunàchst zu deinem Beispielcode:

- 4 als Kommandozeilenparameter ruft die selbe Funktion wie 3 auf. Wo
soll da ein AssertionError geworfen werden?

- Das Semaphore als global Variable finde ich unschön. Warum übergibst
du es nicht den Threads als Parameter?

- Das Setzen von 'run' in erroneous() ist überflüssig, oder?

- Statt try/finally könnte man in MySema.acquire() auch das with
statement verwenden.

Thomas Rachel schrieb:
Zunàchst dachte ich daran, einen Bugreport zu verfassen. Aber macht das
die Sache besser? Denn nun kann es ja immer noch passieren, daß zwischen
finally: und release() eine solche Exception kommt - oder? Zwar
wesentlich unwahrscheinlicher, da das Zeitfenster viel kleiner ist, aber
nicht unmöglich, was diesen Fehler noch subtiler macht.



So gut kenne ich mich mit den Interna des Interpreters nicht aus, aber
m.E. kann da zwischen finally und release() nix passieren, das sollte
der GIl verhindern.

Zu guter letzt, welches Problem willst du mit deinem Ansatz eigentlich
lösen? Warum willst du nicht das Job Queue/WorkerThread Paradigma
verwenden? Geht es dir nur darum, dass der Main Thread keine
Even-Poll-Loop haben soll? Das wàre m.E. sowieso nur sinnvoll, wenn der
Main Thread sonst keine Aufgaben zu erledigen hat.

Den Beispielen in der Dokumentation des threading-Moduls entnehme ich,
dass derjenige, der ein Semaphore 'acquired' es auch wieder 'releasen'
sollte. Das würde dir Gefahr der Situation, wie sie in deinem Code
entsteht verringern aber nicht ausschließen. Ich denke also schon, dass
du den Bug melden solltest. Und falls er als invalid abgelehnt werden
sollte, führt es vielleicht wenigstens dazu, dass die Dokumentation
pràzisiert wird.


Chris

Ähnliche fragen