sql >> Databasteknik >  >> RDS >> PostgreSQL

Häng i Python-skriptet med SQLAlchemy och multiprocessing

Jag tror att TypeError kommer från multiprocessing s get .

Jag har tagit bort all DB-kod från ditt skript. Ta en titt på det här:

import multiprocessing
import sqlalchemy.exc

def do(kwargs):
    i = kwargs['i']
    print i
    raise sqlalchemy.exc.ProgrammingError("", {}, None)
    return i


pool = multiprocessing.Pool(processes=5)               # start 4 worker processes
results = []
arglist = []
for i in range(10):
    arglist.append({'i':i})
r = pool.map_async(do, arglist, callback=results.append) # evaluate "f(10)" asynchronously

# Use get or wait?
# r.get()
r.wait()

pool.close()
pool.join()
print results

Använder r.wait returnerar det förväntade resultatet, men med r.get höjer TypeError . Som beskrivs i pythons dokument , använd r.wait efter en map_async .

Redigera :Jag måste ändra mitt tidigare svar. Jag tror nu på TypeError kommer från SQLAlchemy. Jag har ändrat mitt skript för att återskapa felet.

Redigera 2 :Det verkar som att problemet är att multiprocessing.pool spelar inte bra om någon arbetare höjer ett undantag vars konstruktor kräver en parameter (se även här ).

Jag har ändrat mitt manus för att markera detta.

import multiprocessing

class BadExc(Exception):
    def __init__(self, a):
        '''Non-optional param in the constructor.'''
        self.a = a

class GoodExc(Exception):
    def __init__(self, a=None):
        '''Optional param in the constructor.'''
        self.a = a

def do(kwargs):
    i = kwargs['i']
    print i
    raise BadExc('a')
    # raise GoodExc('a')
    return i

pool = multiprocessing.Pool(processes=5)
results = []
arglist = []
for i in range(10):
    arglist.append({'i':i})
r = pool.map_async(do, arglist, callback=results.append)
try:
    # set a timeout in order to be able to catch C-c
    r.get(1e100)
except KeyboardInterrupt:
    pass
print results

I ditt fall, med tanke på att din kod ger upphov till ett SQLAlchemy-undantag, är den enda lösningen jag kan komma på att fånga alla undantag i do funktion och återskapa ett normalt Exception istället. Något så här:

import multiprocessing

class BadExc(Exception):
    def __init__(self, a):
        '''Non-optional param in the constructor.'''
        self.a = a

def do(kwargs):
    try:
        i = kwargs['i']
        print i
        raise BadExc('a')
        return i
    except Exception as e:
        raise Exception(repr(e))

pool = multiprocessing.Pool(processes=5)
results = []
arglist = []
for i in range(10):
    arglist.append({'i':i})
r = pool.map_async(do, arglist, callback=results.append)
try:
    # set a timeout in order to be able to catch C-c
    r.get(1e100)
except KeyboardInterrupt:
    pass
print results

Redigera 3 :så det verkar vara en bugg med Python , men korrekta undantag i SQLAlchemy skulle lösa det:därför har jag upptäckt problemet med SQLAlchemy också.

Som en lösning på problemet tror jag lösningen i slutet av Redigera 2 skulle göra (omsluta callbacks i try-except och re-raise).



  1. DB Design för att lagra anpassade fält för en tabell

  2. Byt värden för två kolumner mellan två tabeller

  3. Grundläggande DB-anslutningspool med Java och Tomcat 7

  4. Uppdatera innehåll automatiskt om databasen ändras