- Created by Alexander Toufexis, last modified on Jun 01, 2018
You are viewing an old version of this page. View the current version.
Compare with Current View Page History
« Previous Version 3 Next »
CODE FROM: http://www.madeiradata.com/service-broker-asynchronous-triggers/
INSTALL SCRIPTS
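Installs the log table, the stored procedures, and the Service Broker objects (message type, contract, queues, services) used by asynchronous triggers.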
Expand source
/*
===========================================================================
Service Broker Sample 2: Asynchronous Triggers
===========================================================================
This script creates a set of objects that allow the easy implementation and usage of Asynchronous Triggers.

Usage:
1. Run this script on the database where you want the asynchronous trigger(s).
2. Create a stored procedure which will receive the following two parameters:
       @inserted XML, @deleted XML
   This procedure will be responsible for parsing the inserted/deleted data and executing the actual trigger logic based on that data.
3. Inside the actual table trigger, use the following code:

       DECLARE @inserted XML, @deleted XML;
       SELECT @inserted = ( SELECT * FROM inserted FOR XML PATH('row'), ROOT('inserted') );
       SELECT @deleted = ( SELECT * FROM deleted FOR XML PATH('row'), ROOT('deleted') );
       EXECUTE SB_AT_Fire_Trigger '{YourProcedureName}', @inserted, @deleted;

   But replace {YourProcedureName} with the name of the procedure you've created in step 2.

You may review script SB_AT_Sample for an AdventureWorks2008R2 example.
===========================================================================
Copyright: Eitan Blumin (C) 2013
Email: eitan@madeira.co.il
Source: www.madeira.co.il
Disclaimer:
The author is not responsible for any damage this script or any of its variations may cause.
Do not execute it or any variations of it on production environments without first verifying its validity on controlled testing and/or QA environments.
You may use this script at your own risk and may change it to your liking, as long as you leave this disclaimer header fully intact and unchanged.
===========================================================================
*/

-- Creation of the table to hold SB logs.
-- NOTE: re-running this script on a database that already has the table
-- empties it (TRUNCATE in the ELSE branch).
IF OBJECT_ID('SB_AT_ServiceBrokerLogs') IS NULL
BEGIN
    CREATE TABLE SB_AT_ServiceBrokerLogs
    (
        LogID         BIGINT IDENTITY(1,1) NOT NULL,
        LogDate       DATETIME      NOT NULL DEFAULT (GETDATE()),
        SPID          INT           NOT NULL DEFAULT (@@SPID),
        ProgramName   NVARCHAR(255) NOT NULL DEFAULT (APP_NAME()),
        HostName      NVARCHAR(255) NOT NULL DEFAULT (HOST_NAME()),
        ErrorSeverity INT           NOT NULL DEFAULT (0),
        ErrorMessage  NVARCHAR(MAX) NULL,
        ErrorLine     INT           NULL,
        ErrorProc     SYSNAME       NOT NULL DEFAULT (COALESCE(ERROR_PROCEDURE(),OBJECT_NAME(@@PROCID),'<unknown>')),
        QueueMessage  XML           NULL,
        PRIMARY KEY NONCLUSTERED (LogID)
    );

    CREATE CLUSTERED INDEX IX_SB_AT_ServiceBrokerLogs
        ON SB_AT_ServiceBrokerLogs (LogDate ASC)
        WITH (FILLFACTOR = 100);

    PRINT 'Table SB_AT_ServiceBrokerLogs Created';
END
ELSE
    TRUNCATE TABLE SB_AT_ServiceBrokerLogs;
GO

IF OBJECT_ID('SB_AT_HandleQueue') IS NOT NULL
    DROP PROCEDURE SB_AT_HandleQueue;
RAISERROR(N'Creating SB_AT_HandleQueue...',0,0) WITH NOWAIT;
GO

-- Activation procedure for SB_AT_Request_Queue.
-- Receives one request message at a time, extracts the target procedure name
-- and the inserted/deleted XML payloads, executes the target procedure, logs
-- the outcome in SB_AT_ServiceBrokerLogs and ends the conversation.
-- Takes no parameters and returns no result set.
CREATE PROCEDURE SB_AT_HandleQueue
AS
SET NOCOUNT ON;
SET ARITHABORT ON;

DECLARE @msg           XML;
DECLARE @DlgId         UNIQUEIDENTIFIER;
DECLARE @ProcedureName VARCHAR(1000);
DECLARE @inserted      XML;
DECLARE @deleted       XML;
DECLARE @ErrorsCount   INT = 0;

-- Set whether to log verbose status messages before and after each operation
DECLARE @Verbose BIT = 1;

-- Allow 10 retries in case of service broker errors
WHILE @ErrorsCount < 10
BEGIN
    -- T-SQL variables keep their values across loop iterations; clear them so
    -- a message lacking a /Request node can never re-run the previous request.
    SELECT @msg = NULL, @DlgId = NULL, @ProcedureName = NULL, @inserted = NULL, @deleted = NULL;

    BEGIN TRANSACTION;

    BEGIN TRY
        -- Make sure queue is active
        IF EXISTS (SELECT NULL FROM sys.service_queues WHERE NAME = 'SB_AT_Request_Queue' AND is_receive_enabled = 0)
            ALTER QUEUE SB_AT_Request_Queue WITH STATUS = ON;

        -- Handle one message at a time.
        -- The TIMEOUT (milliseconds) lets the procedure exit once the queue is
        -- drained instead of blocking forever with an open transaction.
        WAITFOR
        (
            RECEIVE TOP(1)
                @msg   = CONVERT(xml, message_body),
                @DlgId = conversation_handle
            FROM dbo.SB_AT_Request_Queue
        ), TIMEOUT 1000;

        -- exit when waiting has been timed out
        IF @@ROWCOUNT = 0
        BEGIN
            IF @@TRANCOUNT > 0
                ROLLBACK TRANSACTION;
            BREAK;
        END

        -- Retrieve data from xml message
        SELECT
            @ProcedureName = x.value('(/Request/ProcedureName)[1]','VARCHAR(1000)'),
            @inserted      = x.query('/Request/inserted/inserted'),
            @deleted       = x.query('/Request/deleted/deleted')
        FROM @msg.nodes('/Request') AS T(x);

        -- Log operation start
        IF @Verbose = 1
            INSERT INTO SB_AT_ServiceBrokerLogs(ErrorSeverity,ErrorMessage,QueueMessage)
            VALUES(0,'Starting Process',@msg);

        -- Encapsulate execution in TRY..CATCH
        -- to catch errors in the specific request
        BEGIN TRY
            -- Execute Request
            EXEC @ProcedureName @inserted, @deleted;
        END TRY
        BEGIN CATCH
            -- Log operation fail.
            -- ERROR_PROCEDURE() can be NULL while ErrorProc is NOT NULL, so
            -- fall back to this procedure's name.
            INSERT INTO SB_AT_ServiceBrokerLogs(ErrorSeverity,ErrorMessage,ErrorLine,ErrorProc,QueueMessage)
            VALUES(ERROR_SEVERITY(),ERROR_MESSAGE(),ERROR_LINE(),COALESCE(ERROR_PROCEDURE(),OBJECT_NAME(@@PROCID),'<unknown>'),@msg);
        END CATCH

        -- commit
        IF @@TRANCOUNT > 0
            COMMIT TRANSACTION;

        -- Log operation end
        IF @Verbose = 1
            INSERT INTO SB_AT_ServiceBrokerLogs(ErrorSeverity,ErrorMessage,ErrorProc,QueueMessage)
            VALUES(0,'Finished Process',OBJECT_NAME(@@PROCID),@msg);

        -- Close dialogue
        END CONVERSATION @DlgId;
    END TRY
    BEGIN CATCH
        -- rollback transaction
        -- this will also rollback the extraction of the message from the queue
        IF @@TRANCOUNT > 0
            ROLLBACK TRANSACTION;

        -- log operation fail (variables survive the rollback, so @msg is still available)
        INSERT INTO SB_AT_ServiceBrokerLogs(ErrorSeverity,ErrorMessage,ErrorLine,ErrorProc,QueueMessage)
        VALUES(ERROR_SEVERITY(),ERROR_MESSAGE(),ERROR_LINE(),COALESCE(ERROR_PROCEDURE(),OBJECT_NAME(@@PROCID),'<unknown>'),@msg);

        -- increase error counter
        SET @ErrorsCount = @ErrorsCount + 1;

        -- wait 5 seconds before retrying
        WAITFOR DELAY '00:00:05';
    END CATCH
END
GO

IF OBJECT_ID('SB_AT_CloseDialogs') IS NOT NULL
    DROP PROCEDURE SB_AT_CloseDialogs;
RAISERROR(N'Creating SB_AT_CloseDialogs...',0,0) WITH NOWAIT;
GO

-- Activation procedure for SB_AT_Response_Queue.
-- Receives one message at a time and ends the conversation when the message
-- type is EndDialog or Error; any other message type is only logged (when
-- @Verbose = 1). Takes no parameters and returns no result set.
CREATE PROCEDURE SB_AT_CloseDialogs
AS
SET NOCOUNT ON;
SET ARITHABORT ON;

DECLARE @MsgType     SYSNAME;
DECLARE @msg         XML;
DECLARE @DlgId       UNIQUEIDENTIFIER;
DECLARE @ErrorsCount INT = 0;

-- Set whether to log verbose status messages before and after each operation
DECLARE @Verbose BIT = 0;

-- Allow 10 retries in case of service broker errors
WHILE @ErrorsCount < 10
BEGIN
    -- clear values left over from the previous iteration
    SELECT @MsgType = NULL, @msg = NULL, @DlgId = NULL;

    BEGIN TRY
        -- Make sure queue is active
        IF EXISTS (SELECT NULL FROM sys.service_queues WHERE NAME = 'SB_AT_Response_Queue' AND is_receive_enabled = 0)
            ALTER QUEUE SB_AT_Response_Queue WITH STATUS = ON;

        -- Handle one message at a time.
        -- The TIMEOUT (milliseconds) lets the procedure exit once the queue is
        -- drained instead of blocking forever.
        WAITFOR
        (
            RECEIVE TOP(1)
                @msg     = CONVERT(xml, message_body),
                @MsgType = message_type_name,
                @DlgId   = conversation_handle
            FROM dbo.SB_AT_Response_Queue
        ), TIMEOUT 1000;

        -- exit when waiting has been timed out
        IF @@ROWCOUNT = 0
            BREAK;

        -- If message type is end dialog or error, end the conversation
        IF (@MsgType = N'http://schemas.microsoft.com/SQL/ServiceBroker/EndDialog'
            OR @MsgType = N'http://schemas.microsoft.com/SQL/ServiceBroker/Error')
        BEGIN
            END CONVERSATION @DlgId;

            IF @Verbose = 1
                INSERT INTO SB_AT_ServiceBrokerLogs(ErrorSeverity,ErrorMessage,ErrorProc,QueueMessage)
                VALUES(0,'Ended Conversation ' + CONVERT(nvarchar(max),@DlgId),OBJECT_NAME(@@PROCID),@msg);
        END
        ELSE IF @Verbose = 1
            INSERT INTO SB_AT_ServiceBrokerLogs(ErrorSeverity,ErrorMessage,ErrorProc,QueueMessage)
            VALUES(0,'Unknown Message from ' + CONVERT(nvarchar(max),@DlgId),OBJECT_NAME(@@PROCID),@msg);
    END TRY
    BEGIN CATCH
        -- Log operation fail.
        -- ERROR_PROCEDURE() can be NULL while ErrorProc is NOT NULL, so
        -- fall back to this procedure's name.
        INSERT INTO SB_AT_ServiceBrokerLogs(ErrorSeverity,ErrorMessage,ErrorLine,ErrorProc)
        VALUES(ERROR_SEVERITY(),ERROR_MESSAGE(),ERROR_LINE(),COALESCE(ERROR_PROCEDURE(),OBJECT_NAME(@@PROCID),'<unknown>'));

        -- increase error counter
        SET @ErrorsCount = @ErrorsCount + 1;

        -- wait 5 seconds before retrying
        WAITFOR DELAY '00:00:05';
    END CATCH
END
GO

DECLARE @SQL nvarchar(max);

-- Enable service broker
-- NOTE(review): NEW_BROKER gives the database a new broker identifier and
-- discards existing conversations; ENABLE_BROKER may be the less destructive
-- option if other Service Broker applications share this database - confirm.
IF EXISTS (SELECT * FROM sys.databases WHERE database_id = DB_ID() AND is_broker_enabled = 0)
BEGIN
    -- QUOTENAME escapes any ']' in the database name
    SET @SQL = N'ALTER DATABASE ' + QUOTENAME(DB_NAME()) + N' SET NEW_BROKER WITH ROLLBACK IMMEDIATE';
    EXEC(@SQL);
    PRINT 'Enabled Service Broker for DB ' + DB_NAME();
END
GO

-- Drop existing objects (services first, then the queues, contract and
-- message type they depend on)
IF EXISTS (SELECT NULL FROM sys.services WHERE NAME = '//SB_AT/ProcessReceivingService')
    DROP SERVICE [//SB_AT/ProcessReceivingService];
IF EXISTS (SELECT NULL FROM sys.services WHERE NAME = '//SB_AT/ProcessStartingService')
    DROP SERVICE [//SB_AT/ProcessStartingService];
IF EXISTS (SELECT NULL FROM sys.service_queues WHERE NAME = 'SB_AT_Request_Queue')
    DROP QUEUE dbo.SB_AT_Request_Queue;
IF EXISTS (SELECT NULL FROM sys.service_queues WHERE NAME = 'SB_AT_Response_Queue')
    DROP QUEUE dbo.SB_AT_Response_Queue;
IF EXISTS (SELECT NULL FROM sys.service_contracts WHERE NAME = '//SB_AT/Contract')
    DROP CONTRACT [//SB_AT/Contract];
IF EXISTS (SELECT NULL FROM sys.service_message_types WHERE name='//SB_AT/Message')
    DROP MESSAGE TYPE [//SB_AT/Message];
GO

-- Create service broker objects
RAISERROR(N'Creating Message Type...',0,0) WITH NOWAIT;
CREATE MESSAGE TYPE [//SB_AT/Message]
    VALIDATION = WELL_FORMED_XML;

RAISERROR(N'Creating Contract...',0,0) WITH NOWAIT;
CREATE CONTRACT [//SB_AT/Contract]
    ([//SB_AT/Message] SENT BY ANY);

RAISERROR(N'Creating Response Queue...',0,0) WITH NOWAIT;
CREATE QUEUE dbo.SB_AT_Response_Queue
    WITH STATUS=ON,
    ACTIVATION (
        PROCEDURE_NAME = SB_AT_CloseDialogs,  -- sproc to run when queue receives message
        MAX_QUEUE_READERS = 10,               -- max concurrent instances
        EXECUTE AS SELF
    );

RAISERROR(N'Creating Request Queue...',0,0) WITH NOWAIT;
CREATE QUEUE dbo.SB_AT_Request_Queue
    WITH STATUS=ON,
    ACTIVATION (
        PROCEDURE_NAME = SB_AT_HandleQueue,   -- sproc to run when queue receives message
        MAX_QUEUE_READERS = 10,               -- max concurrent instances
        EXECUTE AS SELF
    );

RAISERROR(N'Creating Recieving Service...',0,0) WITH NOWAIT;
CREATE SERVICE [//SB_AT/ProcessReceivingService]
    AUTHORIZATION dbo
    ON QUEUE dbo.SB_AT_Request_Queue ([//SB_AT/Contract]);

RAISERROR(N'Creating Sending Service...',0,0) WITH NOWAIT;
CREATE SERVICE [//SB_AT/ProcessStartingService]
    AUTHORIZATION dbo
    ON QUEUE dbo.SB_AT_Response_Queue ([//SB_AT/Contract]);
GO

IF OBJECT_ID('SB_AT_Fire_Trigger') IS NOT NULL
    DROP PROCEDURE SB_AT_Fire_Trigger;
RAISERROR(N'Creating SB_AT_Fire_Trigger...',0,0) WITH NOWAIT;
GO

-- This procedure sends items to the queue for asynchronous triggers.
-- Parameters:
--   @ProcedureName  name of the procedure SB_AT_HandleQueue will execute
--   @inserted       XML passed through to that procedure (optional)
--   @deleted        XML passed through to that procedure (optional)
-- The three values are wrapped in a <Request> element and sent on a new
-- dialog from ProcessStartingService to ProcessReceivingService.
CREATE PROCEDURE SB_AT_Fire_Trigger
    @ProcedureName VARCHAR(1000),
    @inserted XML = NULL,
    @deleted XML = NULL
AS
SET NOCOUNT ON;

DECLARE @msg XML;

-- build the XML message
SET @msg = (
    SELECT
        ProcedureName = @ProcedureName,
        inserted      = @inserted,
        deleted       = @deleted
    FOR XML PATH('Request')
);

DECLARE @DlgId UNIQUEIDENTIFIER;

BEGIN DIALOG @DlgId
    FROM SERVICE [//SB_AT/ProcessStartingService]
    TO SERVICE '//SB_AT/ProcessReceivingService', 'CURRENT DATABASE'
    ON CONTRACT [//SB_AT/Contract]
    WITH ENCRYPTION = OFF;

-- send the message
SEND ON CONVERSATION @DlgId
    MESSAGE TYPE [//SB_AT/Message] (@msg);

PRINT N'Started SB_AT process on dialogId ' + ISNULL(convert(varchar(100),@DlgId),'(null)');
GO
SAMPLE SCRIPTS
Sample scripts showing how to use an asynchronous trigger.
Expand source
USE AdventureWorks2008R2
GO
SET ANSI_NULLS ON
GO
SET QUOTED_IDENTIFIER ON
GO

IF OBJECT_ID('Purchasing.usp_AT_uPurchaseOrderDetail', 'P') IS NOT NULL
    DROP PROCEDURE Purchasing.usp_AT_uPurchaseOrderDetail;
GO

-- Asynchronous counterpart of the uPurchaseOrderDetail trigger logic.
-- Parameters:
--   @inserted  XML of the updated rows, shaped <inserted><row>...</row></inserted>
--   @deleted   XML of the pre-update rows (not used by this procedure)
-- For every row in @inserted it records a TransactionHistory entry, then
-- recalculates SubTotal on the affected PurchaseOrderHeader rows and stamps
-- ModifiedDate on the affected PurchaseOrderDetail rows.
-- Errors are logged through dbo.uspLogError and are not re-raised.
CREATE PROCEDURE Purchasing.usp_AT_uPurchaseOrderDetail
    @inserted XML,
    @deleted XML = NULL
AS
SET NOCOUNT ON;

BEGIN TRY
    -- Shred the XML once instead of once per statement
    DECLARE @UpdatedRows TABLE
    (
        PurchaseOrderID       INT      NULL,
        PurchaseOrderDetailID INT      NULL,
        ProductID             INT      NULL,
        OrderQty              SMALLINT NULL,
        UnitPrice             MONEY    NULL
    );

    INSERT INTO @UpdatedRows (PurchaseOrderID, PurchaseOrderDetailID, ProductID, OrderQty, UnitPrice)
    SELECT
        X.value('(PurchaseOrderID)[1]',       'int'),
        X.value('(PurchaseOrderDetailID)[1]', 'int'),
        X.value('(ProductID)[1]',             'int'),
        X.value('(OrderQty)[1]',              'smallint'),
        X.value('(UnitPrice)[1]',             'money')
    FROM @inserted.nodes('inserted/row') AS T(X);

    IF EXISTS (SELECT NULL FROM @UpdatedRows)
    BEGIN
        -- Insert record into TransactionHistory.
        -- The join uses the full detail key so each updated row yields exactly
        -- one history row; joining on PurchaseOrderID alone would multiply
        -- every row by the number of lines in its order.
        INSERT INTO [Production].[TransactionHistory]
            ([ProductID]
            ,[ReferenceOrderID]
            ,[ReferenceOrderLineID]
            ,[TransactionType]
            ,[TransactionDate]
            ,[Quantity]
            ,[ActualCost])
        SELECT
            r.[ProductID]
            ,r.[PurchaseOrderID]
            ,r.[PurchaseOrderDetailID]
            ,'P'
            ,GETDATE()
            ,r.[OrderQty]
            ,r.[UnitPrice]
        FROM @UpdatedRows AS r
        INNER JOIN [Purchasing].[PurchaseOrderDetail] AS pod
            ON  pod.[PurchaseOrderID]       = r.[PurchaseOrderID]
            AND pod.[PurchaseOrderDetailID] = r.[PurchaseOrderDetailID];

        -- Update SubTotal in PurchaseOrderHeader record. Note that this causes the
        -- PurchaseOrderHeader trigger to fire which will update the RevisionNumber.
        UPDATE poh
        SET poh.[SubTotal] =
            (SELECT SUM(pod.[LineTotal])
             FROM [Purchasing].[PurchaseOrderDetail] AS pod
             WHERE pod.[PurchaseOrderID] = poh.[PurchaseOrderID])
        FROM [Purchasing].[PurchaseOrderHeader] AS poh
        WHERE poh.[PurchaseOrderID] IN (SELECT r.[PurchaseOrderID] FROM @UpdatedRows AS r);

        -- Stamp the modification date on the rows that were updated
        UPDATE pod
        SET pod.[ModifiedDate] = GETDATE()
        FROM [Purchasing].[PurchaseOrderDetail] AS pod
        INNER JOIN @UpdatedRows AS r
            ON  r.[PurchaseOrderID]       = pod.[PurchaseOrderID]
            AND r.[PurchaseOrderDetailID] = pod.[PurchaseOrderDetailID];
    END;
END TRY
BEGIN CATCH
    -- Since we're in an Asynchronous Trigger, rolling back an update operation
    -- is a lot more complicated than in a regular trigger.
    -- For now, for this scenario we'll take the risk of having partial data.
    EXECUTE [dbo].[uspLogError];
END CATCH;
GO

-- Replaces the body of the existing AFTER UPDATE trigger so that it only
-- serializes the inserted/deleted rows and hands them to SB_AT_Fire_Trigger;
-- the actual work is done later by Purchasing.usp_AT_uPurchaseOrderDetail.
ALTER TRIGGER [Purchasing].[uPurchaseOrderDetail]
ON [Purchasing].[PurchaseOrderDetail]
AFTER UPDATE
AS
BEGIN
    DECLARE @Count int;

    -- must be read before any other statement resets @@ROWCOUNT
    SET @Count = @@ROWCOUNT;
    IF @Count = 0
        RETURN;

    SET NOCOUNT ON;

    BEGIN TRY
        IF UPDATE([ProductID]) OR UPDATE([OrderQty]) OR UPDATE([UnitPrice])
        BEGIN
            DECLARE @inserted XML, @deleted XML;

            SELECT @inserted = ( SELECT * FROM inserted FOR XML PATH('row'), ROOT('inserted') );
            SELECT @deleted  = ( SELECT * FROM deleted FOR XML PATH('row'), ROOT('deleted') );

            EXECUTE SB_AT_Fire_Trigger 'Purchasing.usp_AT_uPurchaseOrderDetail', @inserted, @deleted;
        END;
    END TRY
    BEGIN CATCH
        EXECUTE [dbo].[uspPrintError];

        -- Rollback any active or uncommittable transactions before
        -- inserting information in the ErrorLog
        IF @@TRANCOUNT > 0
        BEGIN
            ROLLBACK TRANSACTION;
        END

        EXECUTE [dbo].[uspLogError];
    END CATCH;
END;
GO

/*
====================================================
Test script
====================================================
*/

-- See the data before the update
SELECT *
FROM Purchasing.PurchaseOrderDetail
WHERE PurchaseOrderID = 8

-- Update the data without actually performing any change
UPDATE Purchasing.PurchaseOrderDetail
SET ProductID = ProductID
WHERE PurchaseOrderID = 8

-- Wait 5 seconds
WAITFOR DELAY '00:00:05';

-- See the updated data (ModifiedDate should be updated)
SELECT *
FROM Purchasing.PurchaseOrderDetail
WHERE PurchaseOrderID = 8
GO

SELECT * FROM SB_AT_ServiceBrokerLogs
SELECT * FROM sys.conversation_endpoints

/*
-- cleanup closed conversations (SQL Server eventually does this automatically)
declare @q uniqueidentifier;
select @q = conversation_handle from sys.conversation_endpoints where state='CD';
end conversation @q with cleanup
*/
- No labels