Jag tror att TypeError
kommer från multiprocessing
s get
.
Jag har tagit bort all DB-kod från ditt skript. Ta en titt på det här:
import multiprocessing
import sqlalchemy.exc
def do(kwargs):
i = kwargs['i']
print i
raise sqlalchemy.exc.ProgrammingError("", {}, None)
return i
pool = multiprocessing.Pool(processes=5) # start 4 worker processes
results = []
arglist = []
for i in range(10):
arglist.append({'i':i})
r = pool.map_async(do, arglist, callback=results.append) # evaluate "f(10)" asynchronously
# Use get or wait?
# r.get()
r.wait()
pool.close()
pool.join()
print results
Använder r.wait
returnerar det förväntade resultatet, men med r.get
höjer TypeError
. Som beskrivs i pythons dokument
, använd r.wait
efter en map_async
.
Redigera :Jag måste ändra mitt tidigare svar. Jag tror nu på TypeError
kommer från SQLAlchemy. Jag har ändrat mitt skript för att återskapa felet.
Redigera 2 :Det verkar som att problemet är att multiprocessing.pool
spelar inte bra om någon arbetare höjer ett undantag vars konstruktor kräver en parameter (se även här
).
Jag har ändrat mitt manus för att markera detta.
import multiprocessing
class BadExc(Exception):
def __init__(self, a):
'''Non-optional param in the constructor.'''
self.a = a
class GoodExc(Exception):
def __init__(self, a=None):
'''Optional param in the constructor.'''
self.a = a
def do(kwargs):
i = kwargs['i']
print i
raise BadExc('a')
# raise GoodExc('a')
return i
pool = multiprocessing.Pool(processes=5)
results = []
arglist = []
for i in range(10):
arglist.append({'i':i})
r = pool.map_async(do, arglist, callback=results.append)
try:
# set a timeout in order to be able to catch C-c
r.get(1e100)
except KeyboardInterrupt:
pass
print results
I ditt fall, med tanke på att din kod ger upphov till ett SQLAlchemy-undantag, är den enda lösningen jag kan komma på att fånga alla undantag i do
funktion och återskapa ett normalt Exception
istället. Något så här:
import multiprocessing
class BadExc(Exception):
def __init__(self, a):
'''Non-optional param in the constructor.'''
self.a = a
def do(kwargs):
try:
i = kwargs['i']
print i
raise BadExc('a')
return i
except Exception as e:
raise Exception(repr(e))
pool = multiprocessing.Pool(processes=5)
results = []
arglist = []
for i in range(10):
arglist.append({'i':i})
r = pool.map_async(do, arglist, callback=results.append)
try:
# set a timeout in order to be able to catch C-c
r.get(1e100)
except KeyboardInterrupt:
pass
print results
Redigera 3 :så det verkar vara en bugg med Python , men korrekta undantag i SQLAlchemy skulle lösa det:därför har jag upptäckt problemet med SQLAlchemy också.
Som en lösning på problemet tror jag lösningen i slutet av Redigera 2 skulle göra (omsluta callbacks i try-except och re-raise).