Jag var i samma problem osäker på om du hittade en lösning eller inte men jag kunde åstadkomma något liknande genom att göra följande. Först lade jag till trigger i mitt bord
CREATE TRIGGER trigger_name
AFTER INSERT OR DELETE OR UPDATE
ON table_name
FOR EACH ROW
EXECUTE PROCEDURE trigger_function_name;
Detta kommer att sätta en trigger på bordet när en rad uppdateras, tas bort eller infogas. Då kommer den att anropa triggerfunktionen jag har ställt in som såg ut ungefär så här:
CREATE FUNCTION trigger_function_name
RETURNS trigger
LANGUAGE 'plpgsql'
COST 100
VOLATILE NOT LEAKPROOF
AS
$BODY$
DECLARE
payload JSON;
BEGIN
payload = row_to_json(NEW);
PERFORM pg_notify('notification_name', payload::text);
RETURN NULL;
END;
$BODY$;
Detta gör att jag kan "lyssna" på någon av dessa uppdateringar från mitt springboot-projekt och det kommer att skicka hela raden som en nyttolast. Därefter konfigurerade jag i mitt springboot-projekt en anslutning till min db.
@Configuration
@EnableR2dbcRepositories("com.(point to wherever repository is)")
public class R2DBCConfig extends AbstractR2dbcConfiguration {
@Override
@Bean
public ConnectionFactory connectionFactory() {
return new PostgresqlConnectionFactory(PostgresqlConnectionConfiguration.builder()
.host("host")
.database("db")
.port(port)
.username("username")
.password("password")
.schema("schema")
.connectTimeout(Duration.ofMinutes(2))
.build());
}
}
Med det autoansluter jag (beroendeinjektion) den till konstruktorn i min serviceklass och castar den till en r2dbc PostgressqlConnection-klass så här:
this.postgresqlConnection = Mono.from(connectionFactory.create()).cast(PostgresqlConnection.class).block();
Nu vill vi "lyssna" på vårt bord och få ett meddelande när vi utför någon uppdatering av vårt bord. För att göra det ställer vi in en initialiseringsmetod som utförs efter beroendeinjektion genom att använda @PostContruct-kommentaren
@PostConstruct
private void postConstruct() {
postgresqlConnection.createStatement("LISTEN notification_name").execute()
.flatMap(PostgresqlResult::getRowsUpdated).subscribe();
}
Lägg märke till att vi lyssnar på vilket namn vi än lägger in i pg_notify-metoden. Vi vill också sätta upp en metod för att stänga anslutningen när bönan är på väg att kastas bort, så här:
@PreDestroy
private void preDestroy() {
postgresqlConnection.close().subscribe();
}
Nu skapar jag helt enkelt en metod som returnerar ett flöde av vad som för närvarande finns i min tabell, och jag slår också samman det med mina aviseringar, som jag sa innan aviseringarna kommer in som en json, så jag var tvungen att deserialisera den och jag bestämde mig för att använda ObjectMapper. Så det kommer att se ut ungefär så här:
private Flux<YourClass> getUpdatedRows() {
return postgresqlConnection.getNotifications().map(notification -> {
try {
//deserialize json
return objectMapper.readValue(notification.getParameter(), YourClass.class);
} catch (IOException e) {
//handle exception
}
});
}
public Flux<YourClass> getDocuments() {
return documentRepository.findAll().share().concatWith(getUpdatedRows());
}
Hoppas detta hjälper. Heja!