I min förra artikel pratade jag om fördelarna med att implementera asynkron bearbetning med Service Broker i SQL Server framför de andra metoderna som finns för frikopplad bearbetning av långa uppgifter. I den här artikeln går vi igenom alla komponenter som behöver konfigureras för en grundläggande Service Broker-konfiguration i en enda databas, och de viktiga övervägandena för konversationshanteringen mellan mäklartjänster. För att komma igång måste vi skapa en databas och aktivera databasen för användning av Service Broker:
SKAPA DATABAS AsyncProcessingDemo;GO IF (SELECT is_broker_enabled FROM sys.databases WHERE name =N'AsyncProcessingDemo') =0BÖRJA ÄNDRA DATABAS AsyncProcessingDemo SET ENABLE_BROKER;ENDGO USE AsyncProcessingDemo;Konfigurera mäklarkomponenter
De grundläggande objekten som behöver skapas i databasen är meddelandetyperna för meddelandena, ett kontrakt som definierar hur meddelandena ska skickas mellan tjänsterna, en kö och initiatortjänsten samt en kö och måltjänsten. Många exempel online för servicemäklare visar komplexa objektnamn för meddelandetyper, kontrakt och tjänster för Service Broker. Det finns dock inget krav på att namnen ska vara komplexa, och enkla objektnamn kan användas för vilket som helst av objekten.
För meddelandena måste vi skapa en meddelandetyp för begäran, som kommer att kallas
AsyncRequest
, och en meddelandetyp för resultatet, som kommer att kallasAsyncResult
. Båda kommer att använda XML som kommer att valideras som korrekt format av mäklartjänsterna för att skicka och ta emot de data som krävs av tjänsterna.-- Skapa meddelandetyperna CREATE MESSAGE TYPE [AsyncRequest] VALIDATION =WELL_FORMED_XML;CREATE MEDDELANDE TYPE [AsyncResult] VALIDATION =WELL_FORMED_XML;Kontraktet anger att
AsyncRequest
kommer att skickas av den initierande tjänsten till måltjänsten och att måltjänsten returnerar ettAsyncResult
meddelande tillbaka till den initierande tjänsten. Kontraktet kan också ange flera meddelandetyper för initiatorn och målet, eller att en specifik meddelandetyp kan skickas av vilken tjänst som helst, om den specifika behandlingen kräver det.-- Skapa kontraktet SKAPA KONTRAKT [AsyncContract] ( [AsyncRequest] SÄCKT AV INITIATOR, [AsyncResult] SÄCKT AV MÅL);För var och en av tjänsterna måste en kö skapas för att tillhandahålla lagring av de meddelanden som tas emot av tjänsten. Måltjänsten dit begäran kommer att skickas måste skapas med angivande av
AsyncContract
för att tillåta att meddelanden skickas till tjänsten. I det här fallet heter tjänstenProcessingService
och kommer att skapas iProcessingQueue
i databasen. Den initierande tjänsten kräver inte att ett kontrakt anges, vilket gör att den endast kan ta emot meddelanden som svar på en konversation som initierades från den.-- Skapa bearbetningskön och tjänsten - ange kontraktet för att tillåta sändning till tjänstenCREATE QUEUE ProcessingQueue;CREATE SERVICE [ProcessingService] ON QUEUE ProcessingQueue ([AsyncContract]); -- Skapa förfrågningskön och tjänst CREATE QUEUE RequestQueue; CREATE SERVICE [RequestService] ON QUEUE RequestQueue;Skicka ett meddelande för bearbetning
Som jag förklarade i föregående artikel, föredrar jag att implementera en omslagslagrad procedur för att skicka ett nytt meddelande till en mäklartjänst, så att det kan ändras en gång för att skala prestanda om det behövs. Denna procedur är ett enkelt inslag kring att skapa en ny konversation och skicka meddelandet till
ProcessingService
.-- Skapa omslagsproceduren för att skicka meddelanden CREATE PROCEDURE dbo.SendBrokerMessage @FromService SYSNAME, @ToService SYSNAME, @Contract SYSNAME, @MessageType SYSNAME, @MessageBody XMLASBEGIN SET NOCOUNT ON; DEKLARE @conversation_handle UNIQUEIDENTIFIER; BÖRJA TRANSAKTIONEN; BÖRJA DIALOG KONVERSATION @conversation_handle FRÅN SERVICE @FromService TILL SERVICE @ToService PÅ KONTRAKT @Kontrakt MED KRYPTERING =AV; SKICKA PÅ KONVERSATION @conversation_handle MEDDELANDETYP @MessageType(@MessageBody); BETA TRANSAKTION;ENDGOMed hjälp av den lagrade proceduren för wrapper kan vi nu skicka ett testmeddelande till
ProcessingService
för att verifiera att vi har ställt in mäklartjänsterna korrekt.-- Skicka en begäranEXECUTE dbo.SendBrokerMessage @FromService =N'RequestService', @ToService =N'ProcessingService', @Contract =N'AsyncContract', @MessageType =N'AsyncRequest', @MessageBody =N''; -- Sök efter meddelande om bearbetningsköSELECT CAST(message_body AS XML) FROM ProcessingQueue;GO 12345 Bearbetar meddelanden
Även om vi kunde bearbeta meddelanden manuellt från
ProcessingQueue
, vi vill förmodligen att meddelandena ska behandlas automatiskt när de skickas tillProcessingService
. För att göra detta måste en lagrad aktiveringsprocedur skapas som vi testar och sedan binder till kön för att automatisera behandlingen vid köaktivering. För att behandla ett meddelande måste viRECEIVE
meddelandet från kön i en transaktion, tillsammans med meddelandetypen och konversationshandtaget för meddelandet. Meddelandetypen säkerställer att lämplig logik tillämpas på meddelandet som behandlas, och konversationshandtaget tillåter att ett svar skickas tillbaka till den initierande tjänsten när meddelandet har bearbetats.
RECEIVE
kommandot låter ett enda meddelande eller flera meddelanden inom samma konversationshandtag eller grupp bearbetas i en enda transaktion. För att bearbeta flera meddelanden måste en tabellvariabel användas, eller för att bearbeta enstaka meddelanden kan en lokal variabel användas. Aktiveringsproceduren nedan hämtar ett enstaka meddelande från kön, kontrollerar meddelandetypen för att avgöra om det är enAsyncRequest
meddelande och utför sedan den långa processen baserat på den mottagna meddelandeinformationen. Om det inte tar emot ett meddelande i slingan, kommer det att vänta upp till 5000ms, eller 5 sekunder, på att ett annat meddelande kommer in i kön innan den lämnar slingan och avslutar dess exekvering. Efter att ha bearbetat ett meddelande bygger det ettAsyncResult
meddelande och skickar tillbaka det till initiativtagaren på samma konversationshandtag som meddelandet togs emot från. Proceduren kontrollerar också meddelandetypen för att avgöra om enEndDialog
ellerError
meddelande har tagits emot för att rensa upp konversationen genom att avsluta den.-- Skapa bearbetningsprocedur för bearbetning av köCREATE PROCEDURE dbo.ProcessingQueueActivationASBEGIN SET NOCOUNT ON; DEKLARE @conversation_handle UNIQUEIDENTIFIER; DECLARE @message_body XML; DECLARE @message_type_name sysname; MEDAN (1=1) BÖRJA BÖRJA TRANSAKTIONEN; WAITFOR ( RECECIVE TOPP (1) @conversation_handle =konversationshandtag, @message_body =CAST(meddelande_kropp AS XML), @meddelandetypnamn =meddelandetypnamn FRÅN ProcessingQueue ), TIMEOUT 5000; IF (@@ROWCOUNT =0) BÖRJA ÅTERKALLA TRANSAKTION; HA SÖNDER; END IF @message_type_name =N'AsyncRequest' BÖRJA -- Hantera komplex lång bearbetning här -- För demonstration hämtar vi kontonumret och skickar endast ett svar tillbaka DECLARE @AccountNumber INT =@message_body.value('(AsyncRequest/AccountNumber) [1]', 'INT'); -- Skapa svarsmeddelande och skicka tillbaka DECLARE @reply_message_body XML =N' ' + CAST(@AccountNumber AS NVARCHAR(11)) + ' '; SKICKA PÅ KONVERSATION @conversation_handle MEDDELANDETYP [AsyncResult] (@reply_message_body); END -- Om dialogrutan avslutas, avsluta dialogen ELSE IF @message_type_name =N'http://schemas.microsoft.com/SQL/ServiceBroker/EndDialog' BEGIN END CONVERSATION @conversation_handle; END -- Om felmeddelande, logga och avsluta konversationen ELSE IF @message_type_name =N'http://schemas.microsoft.com/SQL/ServiceBroker/Error' BEGIN -- Logga felkoden och utför eventuell hantering här -- Sluta konversationen för felet END CONVERSATION @conversation_handle; AVSLUTA ÅTAGANDE TRANSAKTION; ENDENDGO
RequestQueue
kommer också att behöva bearbeta meddelandena som skickas till den, så en ytterligare procedur för att bearbetaAsyncResult
meddelanden som returneras av ProcessingQueueActivation-proceduren måste skapas. Eftersom vi vet att AsnycResult-meddelandet betyder att allt bearbetningsarbete har slutförts, kan konversationen avslutas när vi bearbetar det meddelandet, vilket kommer att skicka ett EndDialog-meddelande till ProcessingService, som sedan kommer att bearbetas av dess aktiveringsprocedur för att avsluta konversation städar upp allt och undviker elden och glömmer problem som händer när konversationer avslutas ordentligt.-- Skapa procedur för att bearbeta svar på förfrågningskönCREATE PROCEDURE dbo.RequestQueueActivationASBEGIN SET NOCOUNT ON; DEKLARE @conversation_handle UNIQUEIDENTIFIER; DECLARE @message_body XML; DECLARE @message_type_name sysname; MEDAN (1=1) BÖRJA BÖRJA TRANSAKTIONEN; WAITFOR ( RECEIVE TOPP (1) @conversation_handle =conversation_handle, @message_body =CAST(message_body AS XML), @message_type_name =message_type_name FROM RequestQueue ), TIMEOUT 5000; IF (@@ROWCOUNT =0) BÖRJA ÅTERKALLA TRANSAKTION; HA SÖNDER; END IF @message_type_name =N'AsyncResult' BEGIN -- Om nödvändigt hantera svarsmeddelandet här DECLARE @AccountNumber INT =@message_body.value('(AsyncResult/AccountNumber)[1]', 'INT'); -- Eftersom detta är allt arbete som görs, avsluta konversationen för att skicka EndDialog-meddelandet END CONVERSATION @conversation_handle; END -- Om dialogrutan avslutas, avsluta dialogen ELSE IF @message_type_name =N'http://schemas.microsoft.com/SQL/ServiceBroker/EndDialog' BEGIN END CONVERSATION @conversation_handle; END -- Om felmeddelande, logga och avsluta konversationen ELSE IF @message_type_name =N'http://schemas.microsoft.com/SQL/ServiceBroker/Error' BÖRJA AVSLUTA KONVERSATION @conversation_handle; AVSLUTA ÅTAGANDE TRANSAKTION; ENDENDGOTesta procedurerna
Innan köbehandlingen för våra tjänster automatiseras är det viktigt att testa aktiveringsprocedurerna för att säkerställa att de behandlar meddelandena på rätt sätt och för att förhindra att en kö inaktiveras om ett fel skulle uppstå som inte hanteras korrekt. Eftersom det redan finns ett meddelande i
ProcessingQueue
ProcessingQueueActivation
procedur kan utföras för att bearbeta det meddelandet. Tänk på attWAITFOR
kommer att göra att proceduren tar 5 sekunder att avslutas, trots att meddelandet bearbetas direkt från kön. Efter att ha bearbetat meddelandet kan vi verifiera att proceduren fungerade korrekt genom att frågaRequestQueue
för att se om ettAsyncResult
meddelandet finns, och sedan kan vi verifiera attRequestQueueActivation
proceduren fungerar korrekt genom att utföra den.-- Bearbeta meddelandet från bearbetningskönEXECUTE dbo.ProcessingQueueActivation;GO -- Sök efter svarsmeddelande på begäran queueSELECT CAST(meddelande_kropp AS XML) FROM RequestQueue;GO -- Bearbeta meddelandet från begäran queueEXECUTE dbo.RequestQueueActivation;GOAutomatisera bearbetningen
Vid det här laget är alla komponenter färdiga för att helt automatisera vår bearbetning. Det enda som återstår är att binda aktiveringsprocedurerna till deras lämpliga köer och sedan skicka ett nytt testmeddelande för att validera att det bearbetas och ingenting finns kvar i köerna efteråt.
-- Ändra bearbetningskön för att specificera intern aktiveringALTER QUEUE ProcessingQueue MED AKTIVERING ( STATUS =PÅ, PROCEDURE_NAME =dbo.ProcessingQueueActivation, MAX_QUEUE_READERS =10, UTFÖR SOM SJÄLV );GO -- Ändra den interna begärandeskön för att specificera QUEUEALTERQueue MED AKTIVERING ( STATUS =PÅ, PROCEDURE_NAME =dbo.RequestQueueActivation, MAX_QUEUE_READERS =10, UTFÖR SOM SJÄLV );GO -- Testa automatiserad aktivering-- Skicka en begäran UTFÖR dbo.SendBrokerMessage @FromService =', @ToService N', @ToServiceService ProcessingService', @Contract =N'AsyncContract', @MessageType =N'AsyncRequest', @MessageBody =N''; -- Sök efter meddelande på bearbetningskö -- ingenting finns där eftersom det automatiskt bearbetades VÄLJ CAST(meddelande_kropp AS XML) FRÅN ProcessingQueue;GO -- Sök efter svarsmeddelande på begäran kö -- ingenting finns där eftersom det automatiskt bearbetades SELECT CAST(meddelande_kropp AS XML) FRÅN RequestQueue;GO 12345 Sammanfattning
De grundläggande komponenterna för automatiserad asynkron bearbetning i SQL Server Service Broker kan konfigureras i en enda databasuppställning för att möjliggöra frikopplad bearbetning av långvariga uppgifter. Detta kan vara ett kraftfullt verktyg för att förbättra applikationsprestanda, utifrån en slutanvändares upplevelse, genom att koppla bearbetningen från slutanvändarens interaktion med applikationen.