sql >> Databasteknik >  >> RDS >> Database

Konfigurera Service Broker för asynkron bearbetning

I min förra artikel pratade jag om fördelarna med att implementera asynkron bearbetning med Service Broker i SQL Server framför de andra metoderna som finns för frikopplad bearbetning av långa uppgifter. I den här artikeln går vi igenom alla komponenter som behöver konfigureras för en grundläggande Service Broker-konfiguration i en enda databas, och de viktiga övervägandena för konversationshanteringen mellan mäklartjänster. För att komma igång måste vi skapa en databas och aktivera databasen för användning av Service Broker:

SKAPA DATABAS AsyncProcessingDemo;GO IF (SELECT is_broker_enabled FROM sys.databases WHERE name =N'AsyncProcessingDemo') =0BÖRJA ÄNDRA DATABAS AsyncProcessingDemo SET ENABLE_BROKER;ENDGO USE AsyncProcessingDemo; 

Konfigurera mäklarkomponenter

De grundläggande objekten som behöver skapas i databasen är meddelandetyperna för meddelandena, ett kontrakt som definierar hur meddelandena ska skickas mellan tjänsterna, en kö och initiatortjänsten samt en kö och måltjänsten. Många exempel online för servicemäklare visar komplexa objektnamn för meddelandetyper, kontrakt och tjänster för Service Broker. Det finns dock inget krav på att namnen ska vara komplexa, och enkla objektnamn kan användas för vilket som helst av objekten.

För meddelandena måste vi skapa en meddelandetyp för begäran, som kommer att kallas AsyncRequest , och en meddelandetyp för resultatet, som kommer att kallas AsyncResult . Båda kommer att använda XML som kommer att valideras som korrekt format av mäklartjänsterna för att skicka och ta emot de data som krävs av tjänsterna.

-- Skapa meddelandetyperna CREATE MESSAGE TYPE [AsyncRequest] VALIDATION =WELL_FORMED_XML;CREATE MEDDELANDE TYPE [AsyncResult] VALIDATION =WELL_FORMED_XML;

Kontraktet anger att AsyncRequest kommer att skickas av den initierande tjänsten till måltjänsten och att måltjänsten returnerar ett AsyncResult meddelande tillbaka till den initierande tjänsten. Kontraktet kan också ange flera meddelandetyper för initiatorn och målet, eller att en specifik meddelandetyp kan skickas av vilken tjänst som helst, om den specifika behandlingen kräver det.

-- Skapa kontraktet SKAPA KONTRAKT [AsyncContract] ( [AsyncRequest] SÄCKT AV INITIATOR, [AsyncResult] SÄCKT AV MÅL);

För var och en av tjänsterna måste en kö skapas för att tillhandahålla lagring av de meddelanden som tas emot av tjänsten. Måltjänsten dit begäran kommer att skickas måste skapas med angivande av AsyncContract för att tillåta att meddelanden skickas till tjänsten. I det här fallet heter tjänsten ProcessingService och kommer att skapas i ProcessingQueue i databasen. Den initierande tjänsten kräver inte att ett kontrakt anges, vilket gör att den endast kan ta emot meddelanden som svar på en konversation som initierades från den.

-- Skapa bearbetningskön och tjänsten - ange kontraktet för att tillåta sändning till tjänstenCREATE QUEUE ProcessingQueue;CREATE SERVICE [ProcessingService] ON QUEUE ProcessingQueue ([AsyncContract]); -- Skapa förfrågningskön och tjänst CREATE QUEUE RequestQueue; CREATE SERVICE [RequestService] ON QUEUE RequestQueue;

Skicka ett meddelande för bearbetning

Som jag förklarade i föregående artikel, föredrar jag att implementera en omslagslagrad procedur för att skicka ett nytt meddelande till en mäklartjänst, så att det kan ändras en gång för att skala prestanda om det behövs. Denna procedur är ett enkelt inslag kring att skapa en ny konversation och skicka meddelandet till ProcessingService .

-- Skapa omslagsproceduren för att skicka meddelanden CREATE PROCEDURE dbo.SendBrokerMessage @FromService SYSNAME, @ToService SYSNAME, @Contract SYSNAME, @MessageType SYSNAME, @MessageBody XMLASBEGIN SET NOCOUNT ON; DEKLARE @conversation_handle UNIQUEIDENTIFIER; BÖRJA TRANSAKTIONEN; BÖRJA DIALOG KONVERSATION @conversation_handle FRÅN SERVICE @FromService TILL SERVICE @ToService PÅ KONTRAKT @Kontrakt MED KRYPTERING =AV; SKICKA PÅ KONVERSATION @conversation_handle MEDDELANDETYP @MessageType(@MessageBody); BETA TRANSAKTION;ENDGO

Med hjälp av den lagrade proceduren för wrapper kan vi nu skicka ett testmeddelande till ProcessingService för att verifiera att vi har ställt in mäklartjänsterna korrekt.

-- Skicka en begäranEXECUTE dbo.SendBrokerMessage @FromService =N'RequestService', @ToService =N'ProcessingService', @Contract =N'AsyncContract', @MessageType =N'AsyncRequest', @MessageBody =N'12345'; -- Sök efter meddelande om bearbetningsköSELECT CAST(message_body AS XML) FROM ProcessingQueue;GO

Bearbetar meddelanden

Även om vi kunde bearbeta meddelanden manuellt från ProcessingQueue , vi vill förmodligen att meddelandena ska behandlas automatiskt när de skickas till ProcessingService . För att göra detta måste en lagrad aktiveringsprocedur skapas som vi testar och sedan binder till kön för att automatisera behandlingen vid köaktivering. För att behandla ett meddelande måste vi RECEIVE meddelandet från kön i en transaktion, tillsammans med meddelandetypen och konversationshandtaget för meddelandet. Meddelandetypen säkerställer att lämplig logik tillämpas på meddelandet som behandlas, och konversationshandtaget tillåter att ett svar skickas tillbaka till den initierande tjänsten när meddelandet har bearbetats.

RECEIVE kommandot låter ett enda meddelande eller flera meddelanden inom samma konversationshandtag eller grupp bearbetas i en enda transaktion. För att bearbeta flera meddelanden måste en tabellvariabel användas, eller för att bearbeta enstaka meddelanden kan en lokal variabel användas. Aktiveringsproceduren nedan hämtar ett enstaka meddelande från kön, kontrollerar meddelandetypen för att avgöra om det är en AsyncRequest meddelande och utför sedan den långa processen baserat på den mottagna meddelandeinformationen. Om det inte tar emot ett meddelande i slingan, kommer det att vänta upp till 5000ms, eller 5 sekunder, på att ett annat meddelande kommer in i kön innan den lämnar slingan och avslutar dess exekvering. Efter att ha bearbetat ett meddelande bygger det ett AsyncResult meddelande och skickar tillbaka det till initiativtagaren på samma konversationshandtag som meddelandet togs emot från. Proceduren kontrollerar också meddelandetypen för att avgöra om en EndDialog eller Error meddelande har tagits emot för att rensa upp konversationen genom att avsluta den.

-- Skapa bearbetningsprocedur för bearbetning av köCREATE PROCEDURE dbo.ProcessingQueueActivationASBEGIN SET NOCOUNT ON; DEKLARE @conversation_handle UNIQUEIDENTIFIER; DECLARE @message_body XML; DECLARE @message_type_name sysname; MEDAN (1=1) BÖRJA BÖRJA TRANSAKTIONEN; WAITFOR ( RECECIVE TOPP (1) @conversation_handle =konversationshandtag, @message_body =CAST(meddelande_kropp AS XML), @meddelandetypnamn =meddelandetypnamn FRÅN ProcessingQueue ), TIMEOUT 5000; IF (@@ROWCOUNT =0) BÖRJA ÅTERKALLA TRANSAKTION; HA SÖNDER; END IF @message_type_name =N'AsyncRequest' BÖRJA -- Hantera komplex lång bearbetning här -- För demonstration hämtar vi kontonumret och skickar endast ett svar tillbaka DECLARE @AccountNumber INT =@message_body.value('(AsyncRequest/AccountNumber) [1]', 'INT'); -- Skapa svarsmeddelande och skicka tillbaka DECLARE @reply_message_body XML =N' ' + CAST(@AccountNumber AS NVARCHAR(11)) + ' '; SKICKA PÅ KONVERSATION @conversation_handle MEDDELANDETYP [AsyncResult] (@reply_message_body); END -- Om dialogrutan avslutas, avsluta dialogen ELSE IF @message_type_name =N'http://schemas.microsoft.com/SQL/ServiceBroker/EndDialog' BEGIN END CONVERSATION @conversation_handle; END -- Om felmeddelande, logga och avsluta konversationen ELSE IF @message_type_name =N'http://schemas.microsoft.com/SQL/ServiceBroker/Error' BEGIN -- Logga felkoden och utför eventuell hantering här -- Sluta konversationen för felet END CONVERSATION @conversation_handle; AVSLUTA ÅTAGANDE TRANSAKTION; ENDENDGO

RequestQueue kommer också att behöva bearbeta meddelandena som skickas till den, så en ytterligare procedur för att bearbeta AsyncResult meddelanden som returneras av ProcessingQueueActivation-proceduren måste skapas. Eftersom vi vet att AsnycResult-meddelandet betyder att allt bearbetningsarbete har slutförts, kan konversationen avslutas när vi bearbetar det meddelandet, vilket kommer att skicka ett EndDialog-meddelande till ProcessingService, som sedan kommer att bearbetas av dess aktiveringsprocedur för att avsluta konversation städar upp allt och undviker elden och glömmer problem som händer när konversationer avslutas ordentligt.

-- Skapa procedur för att bearbeta svar på förfrågningskönCREATE PROCEDURE dbo.RequestQueueActivationASBEGIN SET NOCOUNT ON; DEKLARE @conversation_handle UNIQUEIDENTIFIER; DECLARE @message_body XML; DECLARE @message_type_name sysname; MEDAN (1=1) BÖRJA BÖRJA TRANSAKTIONEN; WAITFOR ( RECEIVE TOPP (1) @conversation_handle =conversation_handle, @message_body =CAST(message_body AS XML), @message_type_name =message_type_name FROM RequestQueue ), TIMEOUT 5000; IF (@@ROWCOUNT =0) BÖRJA ÅTERKALLA TRANSAKTION; HA SÖNDER; END IF @message_type_name =N'AsyncResult' BEGIN -- Om nödvändigt hantera svarsmeddelandet här DECLARE @AccountNumber INT =@message_body.value('(AsyncResult/AccountNumber)[1]', 'INT'); -- Eftersom detta är allt arbete som görs, avsluta konversationen för att skicka EndDialog-meddelandet END CONVERSATION @conversation_handle; END -- Om dialogrutan avslutas, avsluta dialogen ELSE IF @message_type_name =N'http://schemas.microsoft.com/SQL/ServiceBroker/EndDialog' BEGIN END CONVERSATION @conversation_handle; END -- Om felmeddelande, logga och avsluta konversationen ELSE IF @message_type_name =N'http://schemas.microsoft.com/SQL/ServiceBroker/Error' BÖRJA AVSLUTA KONVERSATION @conversation_handle; AVSLUTA ÅTAGANDE TRANSAKTION; ENDENDGO

Testa procedurerna

Innan köbehandlingen för våra tjänster automatiseras är det viktigt att testa aktiveringsprocedurerna för att säkerställa att de behandlar meddelandena på rätt sätt och för att förhindra att en kö inaktiveras om ett fel skulle uppstå som inte hanteras korrekt. Eftersom det redan finns ett meddelande i ProcessingQueue ProcessingQueueActivation procedur kan utföras för att bearbeta det meddelandet. Tänk på att WAITFOR kommer att göra att proceduren tar 5 sekunder att avslutas, trots att meddelandet bearbetas direkt från kön. Efter att ha bearbetat meddelandet kan vi verifiera att proceduren fungerade korrekt genom att fråga RequestQueue för att se om ett AsyncResult meddelandet finns, och sedan kan vi verifiera att RequestQueueActivation proceduren fungerar korrekt genom att utföra den.

-- Bearbeta meddelandet från bearbetningskönEXECUTE dbo.ProcessingQueueActivation;GO -- Sök efter svarsmeddelande på begäran queueSELECT CAST(meddelande_kropp AS XML) FROM RequestQueue;GO -- Bearbeta meddelandet från begäran queueEXECUTE dbo.RequestQueueActivation;GO 

Automatisera bearbetningen

Vid det här laget är alla komponenter färdiga för att helt automatisera vår bearbetning. Det enda som återstår är att binda aktiveringsprocedurerna till deras lämpliga köer och sedan skicka ett nytt testmeddelande för att validera att det bearbetas och ingenting finns kvar i köerna efteråt.

-- Ändra bearbetningskön för att specificera intern aktiveringALTER QUEUE ProcessingQueue MED AKTIVERING ( STATUS =PÅ, PROCEDURE_NAME =dbo.ProcessingQueueActivation, MAX_QUEUE_READERS =10, UTFÖR SOM SJÄLV );GO -- Ändra den interna begärandeskön för att specificera QUEUEALTERQueue MED AKTIVERING ( STATUS =PÅ, PROCEDURE_NAME =dbo.RequestQueueActivation, MAX_QUEUE_READERS =10, UTFÖR SOM SJÄLV );GO -- Testa automatiserad aktivering-- Skicka en begäran UTFÖR dbo.SendBrokerMessage @FromService =', @ToService N', @ToServiceService ProcessingService', @Contract =N'AsyncContract', @MessageType =N'AsyncRequest', @MessageBody =N'12345'; -- Sök efter meddelande på bearbetningskö -- ingenting finns där eftersom det automatiskt bearbetades VÄLJ CAST(meddelande_kropp AS XML) FRÅN ProcessingQueue;GO -- Sök efter svarsmeddelande på begäran kö -- ingenting finns där eftersom det automatiskt bearbetades SELECT CAST(meddelande_kropp AS XML) FRÅN RequestQueue;GO

Sammanfattning

De grundläggande komponenterna för automatiserad asynkron bearbetning i SQL Server Service Broker kan konfigureras i en enda databasuppställning för att möjliggöra frikopplad bearbetning av långvariga uppgifter. Detta kan vara ett kraftfullt verktyg för att förbättra applikationsprestanda, utifrån en slutanvändares upplevelse, genom att koppla bearbetningen från slutanvändarens interaktion med applikationen.


  1. Få ett objekts ID från dess namn i SQL Server:OBJECT_ID()

  2. Hur hittar jag de sämst presterande frågorna i SQL Server 2008?

  3. Hur man ändrar en standard MySQL/MariaDB-datakatalog i Linux

  4. Hur man använder främmande nyckel i oracle