Problemet löst ! Jag kan inte fatta att jag har ägnat två hela dagar åt det här... Jag tittade helt åt fel håll.
Problemet var inte med någon dataflödes- eller GCP-nätverkskonfiguration, och så vitt jag kan säga...
är sant.
Problemet låg naturligtvis i min kod:bara problemet avslöjades endast i en distribuerad miljö. Jag hade gjort misstaget att öppna tunneln från huvudrörsprocessorn, istället för arbetarna. Så SSH-tunneln var uppe men inte mellan arbetarna och målservern, bara mellan huvudledningen och målet!
För att fixa detta var jag tvungen att ändra min begärande DoFn för att sluta köra frågan med tunneln :
class TunnelledSQLSourceDoFn(sql.SQLSourceDoFn):
"""Wraps SQLSourceDoFn in a ssh tunnel"""
def __init__(self, *args, **kwargs):
self.dbport = kwargs["port"]
self.dbhost = kwargs["host"]
self.args = args
self.kwargs = kwargs
super().__init__(*args, **kwargs)
def process(self, query, *args, **kwargs):
# Remote side of the SSH Tunnel
remote_address = (self.dbhost, self.dbport)
ssh_tunnel = (self.kwargs['ssh_host'], self.kwargs['ssh_port'])
with open_tunnel(
ssh_tunnel,
ssh_username=self.kwargs["ssh_user"],
ssh_password=self.kwargs["ssh_password"],
remote_bind_address=remote_address,
set_keepalive=10.0
) as tunnel:
forwarded_port = tunnel.local_bind_port
self.kwargs["port"] = forwarded_port
source = sql.SQLSource(*self.args, **self.kwargs)
sql.SQLSouceInput._build_value(source, source.runtime_params)
logging.info("Processing - {}".format(query))
for records, schema in source.client.read(query):
for row in records:
yield source.client.row_as_dict(row, schema)
som du kan se var jag tvungen att åsidosätta vissa bitar av pysql_beam-biblioteket.
Slutligen öppnar varje arbetare sin egen tunnel för varje begäran. Det går förmodligen att optimera detta beteende men det räcker för mina behov.