REDIGERA 2018-01-27:
Det visar sig att det här problemet är relaterat till DirectRunner. Om du kör samma pipeline med DataflowRunner bör du få batcher som faktiskt är upp till 1 000 poster. DirectRunner skapar alltid paket i storlek 1 efter en grupperingsoperation.
Ursprungligt svar:
Jag har stött på samma problem när jag skrev till molndatabaser med Apache Beams JdbcIO. Problemet är att även om JdbcIO stöder att skriva upp till 1 000 poster i en batch, har jag aldrig sett den skriva mer än en rad åt gången (jag måste erkänna:Det här använde alltid DirectRunner i en utvecklingsmiljö).
Jag har därför lagt till en funktion till JdbcIO där du själv kan styra storleken på batcherna genom att gruppera dina data och skriva varje grupp som en batch. Nedan är ett exempel på hur man använder den här funktionen baserat på det ursprungliga WordCount-exemplet av Apache Beam.
p.apply("ReadLines", TextIO.read().from(options.getInputFile()))
// Count words in input file(s)
.apply(new CountWords())
// Format as text
.apply(MapElements.via(new FormatAsTextFn()))
// Make key-value pairs with the first letter as the key
.apply(ParDo.of(new FirstLetterAsKey()))
// Group the words by first letter
.apply(GroupByKey.<String, String> create())
// Get a PCollection of only the values, discarding the keys
.apply(ParDo.of(new GetValues()))
// Write the words to the database
.apply(JdbcIO.<String> writeIterable()
.withDataSourceConfiguration(
JdbcIO.DataSourceConfiguration.create(options.getJdbcDriver(), options.getURL()))
.withStatement(INSERT_OR_UPDATE_SQL)
.withPreparedStatementSetter(new WordCountPreparedStatementSetter()));
Skillnaden mot den normala skrivmetoden för JdbcIO är den nya metoden writeIterable()
som tar en PCollection<Iterable<RowT>>
som indata istället för PCollection<RowT>
. Varje Iterable skrivs som en batch till databasen.
Versionen av JdbcIO med detta tillägg kan hittas här:https://github.com/olavloite/beam/blob/JdbcIOIterableWrite/sdks/java/io/jdbc/src/main/java /org/apache/beam/sdk/io/jdbc/JdbcIO.java
Hela exempelprojektet som innehåller exemplet ovan finns här:https://github.com/ olavloite/spanner-beam-example
(Det finns också en pull-begäran som väntar på Apache Beam för att inkludera detta i projektet)