Страницы

воскресенье, 4 сентября 2016 г.

Отслеживание событий сервера с помощью уведомлений о событиях

В MS SQL 2005 вместе с компонентом Service Broker появился новый способ наблюдения за событиями на сервере. Это объект уведомление о событиях. Данный объект создается на уровне сервера с привязкой к определенной базе данных. Объект подписывается на набор событий, о которых он получает уведомления в момент когда события происходят.
Особенность уведомлений о событиях состоит в том, что они интегрированы с Service Broker. В результате появляется возможность автоматического запуска своего кода в процедуре активации именно в тот момент, когда возникает событие. Для сохранения информации о событии такой подход гораздо удобнее чем периодическая проверка хранилища логов в файловой системе или где-то еще, как того требуют другие средства логирования (например, профайлер). Это создает сходство с ddl-триггерами, только тут нельзя делать откат. В качестве примера создадим уведомление о событии, которое позволяет автоматически сохранять в таблицах структурированные данные о графе взаимоблокировок. Для начала создадим базу данных с таблицами, в которых будут храниться наши логи.

use master
go

if db_id ( N'LogStore' ) is null
begin
 create database LogStore
  on primary
  (
   name  = LogStoreData,
   filename = N'C:\В\DbData\LogStoreData.mdf',
   size  = 100 Mb,
   filegrowth = 100 Mb,
   maxsize  = unlimited
  )
  log on
  (
   name  = LogStoreLog,
   filename = 'C:\В\TranLog\LogStoreLog.ldf',
   size  = 100 Mb,
   filegrowth = 100 Mb,
   maxsize  = unlimited
  )
end
go

use LogStore
go

-- Событие графа взаимоблокировки несет в себе информацию об участниках взаимоблокировки.
-- Каждый участник имеет набор атрибутов. Для их хранения создадим два справочника.
if object_id ( N'dbo.AttrTypes', N'U' ) is null
begin
 create table dbo.AttrTypes
 (
  iAttrTpId smallint identity ( 1, 1 ) not null,
  vcAttrTpName varchar ( 100 )   not null,
  constraint PK_AttrTypes_iAttrTpId primary key clustered
   ( iAttrTpId asc ) on [primary],
  constraint AK_AttrTypes_vcAttrTpName unique nonclustered
   ( vcAttrTpName asc ) on [primary]
 ) on [primary]
end
go

insert into dbo.AttrTypes ( vcAttrTpName )
 values
 ( 'База данных' ),
 ( 'Логин' ),
 ( 'Хост' ),
 ( 'Уровень изоляции транзакций' ),
 ( 'Приложение' ),
 ( 'Файл базы данных' ),
 ( 'Объект' ),
 ( 'Индекс' ),
 ( 'Тип блокировки' )
go

if object_id ( N'dbo.Attrs', N'U' ) is null
begin
 create table dbo.Attrs
 (
  iAttrId   int identity ( 1, 1 ) not null,
  iAttrTpId  smallint  not null,
  vcAttrName  varchar ( 100 )  not null,
  constraint PK_Attrs_iAttrId primary key clustered
   ( iAttrId asc ) on [primary],
  constraint AK_Attrs_vcAttrName_iAttrTpId unique nonclustered
   ( vcAttrName asc, iAttrTpId asc ) on [primary],
  constraint FK_Attrs_iAttrTpId foreign key ( iAttrTpId )
   references dbo.AttrTypes ( iAttrTpId ) on update cascade on delete no action
 ) on [primary]
end
go

-- Следующая таблица содержит одну строку на каждое событие. Здесь только идентификатор события и время.
if object_id ( N'dbo.Deadlocks', N'U' ) is null
begin
 create table dbo.Deadlocks
 (
  iEventId int identity ( 1, 1 ) not null,
  dtDate  datetime  not null,
  constraint PK_Deadlocks_iEventId primary key clustered
   ( iEventId asc ) on [primary]
 ) on [primary]
end
go

-- Таблица с перечнем участников взаимоблокировки. В ней перечислены атрибуты участников, а также полем с признаком жертвы.
if object_id ( N'dbo.DeadlockChains', N'U' ) is null
begin
 create table dbo.DeadlockChains
 (
  iEventId  int   not null,
  iProcessId  int identity ( 1, 1 ) not null,
  vcProcessId  varchar ( 20 )  not null,
  bVictim   bit   not null,
  vcResource  varchar ( 100 )  not null,
  iLoginId  int   not null,
  iHostId   int   not null,
  iIsolationId  int   not null,
  iAppId   int   not null,
  dtLastBatchStart datetime  not null,
  dtLastBathEnd  datetime  not null,
  iLockId   int   not null,
  vcBatch   varchar ( 8000 ) not null,
  constraint PK_DeadlockChains_iProcessId primary key clustered
   ( iProcessId asc ) on [primary],
  constraint FK_DeadlockChains_iEventId foreign key ( iEventId )
   references dbo.Deadlocks ( iEventId ) on update no action on delete no action,
  constraint FK_DeadlockChains_iLoginId foreign key ( iLoginId )
   references dbo.Attrs ( iAttrId ) on update no action on delete no action,
  constraint FK_DeadlockChains_iHostId foreign key ( iHostId )
   references dbo.Attrs ( iAttrId ) on update no action on delete no action,
  constraint FK_DeadlockChains_iIsolationId foreign key ( iIsolationId )
   references dbo.Attrs ( iAttrId ) on update no action on delete no action,
  constraint FK_DeadlockChains_iAppId foreign key ( iAppId )
   references dbo.Attrs ( iAttrId ) on update no action on delete no action,
  constraint FK_DeadlockChains_iLockId foreign key ( iLockId )
   references dbo.Attrs ( iAttrId ) on update no action on delete no action
 ) on [primary]
end
go

-- Здесь хранится перечень ресурсов, которые образовали цепочку.
if object_id ( N'dbo.DeadlockRes', N'U' ) is null
begin
 create table dbo.DeadlockRes
 (
  iEventId int   not null,
  iResId  int identity ( 1, 1 ) not null,
  iDbId  int   not null,
  iFileId  int   not null,
  iPageId  int   not null,
  iObjId  int   not null,
  iIndId  int   not null,
  iHobtId  bigint   not null,
  constraint PK_DeadlockRes_iResId primary key clustered
   ( iResId asc ),
  constraint FK_DeadlockRes_iEventId foreign key ( iEventId )
   references dbo.Deadlocks ( iEventId ) on update cascade on delete no action,
  constraint FK_DeadlockRes_iDbId foreign key ( iDbId )
   references dbo.Attrs ( iAttrId ) on update no action on delete no action,
  constraint FK_DeadlockRes_iFileId foreign key ( iFileId )
   references dbo.Attrs ( iAttrId ) on update no action on delete no action,
  constraint FK_DeadlockRes_iObjId foreign key ( iObjId )
   references dbo.Attrs ( iAttrId ) on update no action on delete no action,
  constraint FK_DeadlockRes_iIndId foreign key ( iIndId )
   references dbo.Attrs ( iAttrId ) on update no action on delete no action
 ) on [primary]
end
go

-- У каждого ресурса может быть перечень владельцев.
if object_id ( N'dbo.ResOwners', N'U' ) is null
begin
 create table dbo.ResOwners
 (
  iResId  int not null,
  iProcessId int not null,
  iModeId  int not null,
  constraint PK_ResOwners_iResId_iProcessId primary key clustered
   ( iResId asc, iProcessId asc ) on [primary],
  constraint FK_ResOwners_iResId foreign key ( iResId )
   references dbo.DeadlockRes ( iResId ) on update cascade on delete no action,
  constraint FK_ResOwners_iProcessId foreign key ( iProcessId )
   references dbo.DeadlockChains ( iProcessId ) on update cascade on delete no action
 ) on [primary]
end
go

-- У каждого ресурса может быть перечень процессов, ожидающих получения блокировки на данный ресурс.
if object_id ( N'dbo.ResWaiters', N'U' ) is null
begin
 create table dbo.ResWaiters
 (
  iResId  int not null,
  iProcessId int not null,
  iModeId  int not null,
  constraint PK_ResWaiters_iResId_iProcessId primary key clustered
   ( iResId asc, iProcessId asc ) on [primary],
  constraint FK_ResWaiters_iResId foreign key ( iResId )
   references dbo.DeadlockRes ( iResId ) on update cascade on delete no action,
  constraint FK_ResWaiters_iProcessId foreign key ( iProcessId )
   references dbo.DeadlockChains ( iProcessId ) on update cascade on delete no action
 ) on [primary]
end
go
В результате получилась такая диаграмма:






















Теперь создадим объекты брокера в базе данных. Понадобится создать только очередь и службу, так как для типов сообщений и контракта используются специальные системные объекты, имеющиеся в каждой базе данных.

alter database LogStore set enable_broker with rollback immediate
go

if object_id ( N'dbo.LogDeadlockGraph', N'P' ) is null
begin
 exec sp_executesql N'create proc dbo.LogDeadlockGraph as return'
end
go

if object_id ( N'dbo.Logger', N'SQ' ) is null
begin
 create queue dbo.Logger
  with
   status = on,
   retention = off,
   activation
   (
    status = on,
    procedure_name = dbo.LogDeadlockGraph,
    max_queue_readers = 4,
    exec as owner
   )
end
go

if not exists
(
 select *
 from sys.services
 where name = N'LogService'
)
begin
 create service LogService
  on queue dbo.Logger ( [http://schemas.microsoft.com/SQL/Notifications/PostEventNotification] )
end
go

if not exists
(
 select *
 from sys.server_event_notifications
 where name = N'ServerLogger'
)
begin
 create event notification ServerLogger
  on server
  for deadlock_graph
  to service 'LogService', 'current database'
end
go
Теперь обновим процедуру активации. Данная процедура будет читать очередь, получая XML-представление о событии. Для парсинга XML используются методы типа данных XML и выражения языка XQuery. Общая схема работы процедуры состоит в следующем.

1. Наполняем табличную переменную с перечнем участников взаимоблокировки.
2. Наполняем справочник значениями атрибутов участников.
3. Наполняем нормализованную таблицу dbo.DeadlockChains
4. Наполняем табличную переменную списков ресурсов с их блокировками от участников. При этом для простоты рассматриваем случай, когда ресурсы это строки или ключи.
5. Наполняем справочник атрибутами ресурсов
6. Наполняем нормализованную таблицу dbo.DeadlockRes. При этом для наполнения используем merge+output для сохранения соответствия узлов ресурсов и идентификаторов ресурсов.
7. Наполняем таблицы с владельцами и заблокированными соединениями.

Для конвертации xml-документа в таблицу используем метод nodes. Ниже приведен полный код хранимой процедуры.

alter proc dbo.LogDeadlockGraph
as
begin
 set nocount, xact_abort on
 
 begin try
  declare @xBody xml, @vcVictProcessId varchar ( 100 ), @dtEventTime datetime, @iEventId int, @vcSql nvarchar ( max ), @vcErr nvarchar ( max )
  begin tran
   ;
   receive top ( 1 ) @xBody = try_convert ( xml, message_body )
   from dbo.Logger
   if @@rowcount = 0
   begin
    return
   end
   
   declare @tblProcesses table
   (
    vcId  varchar ( 100 )  not null,
    vcResource varchar ( 100 )  not null,
    vcLogin  varchar ( 100 )  not null,
    vcHost  varchar ( 100 )  not null,
    vcIsolation  varchar ( 100 )  not null,
    vcApp  varchar ( 100 )  not null,
    dtStart  datetime   not null,
    dtEnd  datetime   not null,
    vcBatch  varchar ( 8000 )  not null,
    vcLock  varchar ( 100 )   not null
   )
   
   insert into @tblProcesses
   (
    vcId,
    vcResource,
    vcLogin,
    vcHost,
    vcIsolation,
    vcApp,
    dtStart,
    dtEnd,
    vcBatch,
    vcLock
   )
   select
    vcId  = left ( isnull ( process.value ( '(@id)[1]', 'nvarchar ( max )' ), '' ), 100 ),
    vcResource = left ( isnull ( process.value ( '(@waitresource)[1]', 'nvarchar ( max )' ), '' ), 100 ),
    vcLogin  = left ( isnull ( process.value ( '(@loginname)[1]', 'nvarchar ( max )' ), '' ), 100 ),
    vcHost  = left ( isnull ( process.value ( '(@hostname)[1]', 'nvarchar ( max )' ), '' ), 100 ),
    vcIsolation = left ( isnull ( process.value ( '(@isolationlevel)[1]', 'nvarchar ( max )' ), '' ), 100 ),
    vcApp  = left ( isnull ( process.value ( '(@clientapp)[1]', 'nvarchar ( max )' ), '' ), 100 ),
    dtStart  = isnull ( process.value ( '(@lastbatchstarted)[1]', 'nvarchar ( max )' ), '1900-01-01' ),
    dtEnd  = isnull ( process.value ( '(@lastbatchcompleted)[1]', 'nvarchar ( max )' ), '1900-01-01' ),
    vcBatch  = left ( isnull ( process.value ( '(inputbuf)[1]', 'nvarchar ( max )' ), '' ), 8000 ),
    vcLock  = left ( isnull ( process.value ( '(@lockMode)[1]', 'nvarchar ( max )' ), '' ), 100 )
   from @xBody.nodes ( '/EVENT_INSTANCE/TextData/deadlock-list/deadlock/process-list/process' ) processes ( process )
   
   ;
   with Attrs
   as
   (
    select vcAttr, vcAttrTp
    from
    (
     select distinct vcLogin, N'Логин' from @tblProcesses
     union all
     select distinct vcHost, N'Хост' from @tblProcesses
     union all
     select distinct vcIsolation, N'Уровень изоляции транзакций' from @tblProcesses
     union all
     select distinct vcApp, N'Приложение' from @tblProcesses
     union all
     select distinct vcLock, N'Тип блокировки' from @tblProcesses
    ) attrs ( vcAttr, vcAttrTp )
   )
   insert into dbo.Attrs ( iAttrTpId, vcAttrName )
    select attrtp.iAttrTpId, newattrs.vcAttr
    from
     Attrs newattrs
      inner join
     dbo.AttrTypes attrtp on newattrs.vcAttrTp = attrtp.vcAttrTpName
      left outer join
     dbo.Attrs attrs on
      attrs.iAttrTpId = attrtp.iAttrTpId and
      attrs.vcAttrName = newattrs.vcAttr
    where attrs.iAttrId is null
   set @vcVictProcessId = @xBody.value ( '(/EVENT_INSTANCE/TextData/deadlock-list/deadlock/@victim)[1]',
       'nvarchar ( max )' )
   set @dtEventTime = (
    select dtEnd
    from @tblProcesses
    where vcId = @vcVictProcessId
   )
   
   insert into dbo.Deadlocks ( dtDate )
    values ( @dtEventTime )
   set @iEventId = scope_identity ()
   
   ;
   with AttrData
   as
   (
    select attrs.iAttrId, attrs.vcAttrName, attrtp.vcAttrTpName
    from
     dbo.Attrs attrs
      inner join
     dbo.AttrTypes attrtp on attrs.iAttrTpId = attrtp.iAttrTpId
   )
   insert into dbo.DeadlockChains
   (
    iEventId, vcProcessId, bVictim, vcResource, iLoginId, iHostId, iIsolationId, iAppId,
    dtLastBatchStart, dtLastBathEnd, iLockId, vcBatch
   )
   select @iEventId, vcId, case when vcId = @vcVictProcessId then 1 else 0 end, vcResource,
    attrlog.iAttrId, attrhost.iAttrId, attrlevel.iAttrId, attrapp.iAttrId, processes.dtStart, processes.dtEnd,
    attrlock.iAttrId, processes.vcBatch
   from
    @tblProcesses processes
     inner join
    AttrData attrlog on processes.vcLogin = attrlog.vcAttrName and attrlog.vcAttrTpName = N'Логин'
     inner join
    AttrData attrhost on processes.vcHost = attrhost.vcAttrName and attrhost.vcAttrTpName = N'Хост'
     inner join
    AttrData attrlevel on processes.vcIsolation = attrlevel.vcAttrName and attrlevel.vcAttrTpName = N'Уровень изоляции транзакций'
     inner join
    AttrData attrapp on processes.vcApp = attrapp.vcAttrName and attrapp.vcAttrTpName = N'Приложение'
     inner join
    AttrData attrlock on processes.vcLock = attrlock.vcAttrName and attrlock.vcAttrTpName = N'Тип блокировки'
   
   declare @tblResData table
   (
    iResId  int  not null,
    iDbId  int  not null,
    iPageId  int  not null,
    iFileId  int  not null,
    iHobtId  bigint  not null,
    vcObj  varchar ( 600 ) not null,
    vcIndex  varchar ( 200 ) not null,
    vcMode  varchar ( 100 ) not null,
    vcOwner  varchar ( 100 ) not null,
    vcOwnerMode varchar ( 100 ) not null,
    vcWaiter varchar ( 100 ) not null,
    vcWaiterMode varchar ( 100 ) not null
   )
   
   set @vcSql = N'
   select
    ResData.iResId,
    ResData.iDb,
    ResData.iPage,
    ResData.iFile,
    ResData.iHobtId,
    ResData.vcObj,
    ResData.vcIndex,
    ResData.vcMode,
    vcOwner   = left ( isnull ( own.value ( ''(/owner-list/owner/@id)[1]'', ''nvarchar ( max )'' ), '''' ), 100 ),
    vcOwnerMode  = left ( isnull ( own.value ( ''(/owner-list/owner/@mode)[1]'', ''nvarchar ( max )'' ), '''' ), 100 ),
    vcWaiter  = left ( isnull ( wait.value ( ''(/waiter-list/waiter/@id)[1]'', ''nvarchar ( max )'' ), '''' ), 100 ),
    vcWaiterMode = left ( isnull ( wait.value ( ''(/waiter-list/waiter/@mode)[1]'', ''nvarchar ( max )'' ), '''' ), 100 )
   from
    (
     select
      iResId = row_number () over ( order by ( select 1 ) ),
      iDb  = isnull ( ResData.Res.value ( ''(@dbid)[1]'', ''int'' ), -1 ),
      iPage = isnull ( ResData.Res.value ( ''(@pageid)[1]'', ''int'' ), -1 ),
      iFile = isnull ( ResData.Res.value ( ''(@fileid)[1]'', ''int'' ), -1 ),
      iHobtId = isnull ( ResData.Res.value ( ''(@hobtid)[1]'', ''bigint'' ), -1 ),
      vcObj = left ( isnull ( ResData.Res.value ( ''(@objectname)[1]'', ''nvarchar ( max )'' ), '''' ), 100 ),
      vcIndex = left ( isnull ( ResData.Res.value ( ''(@indexname)[1]'', ''nvarchar ( max )'' ), '''' ), 100 ),
      vcMode = left ( isnull ( ResData.Res.value ( ''(@mode)[1]'', ''nvarchar ( max )'' ), '''' ), 100 ),
      xOwner = ResData.Res.query ( ''owner-list'' ),
      xWaiter = ResData.Res.query ( ''waiter-list'' )
     from @xBody.nodes ( ' +
      iif
      (
       @xBody.exist ( '/EVENT_INSTANCE/TextData/deadlock-list/deadlock/resource-list/ridlock' ) = 1,
       '''/EVENT_INSTANCE/TextData/deadlock-list/deadlock/resource-list/ridlock''',
       '''/EVENT_INSTANCE/TextData/deadlock-list/deadlock/resource-list/keylock'''
      )
     + ' ) ResData ( Res )
    ) ResData
     cross apply
    ResData.xOwner.nodes ( ''.'' ) owners ( own )
     cross apply
    ResData.xWaiter.nodes ( ''.'' ) waiters ( wait )
   '
   insert into @tblResData
   (
    iResId,
    iDbId,
    iPageId,
    iFileId,
    iHobtId,
    vcObj,
    vcIndex,
    vcMode,
    vcOwner,
    vcOwnerMode,
    vcWaiter,
    vcWaiterMode
   ) exec sp_executesql @vcSql, N'@xBody xml', @xBody = @xBody
   
   ;
   with AttrData
   as
   (
    select distinct vcAttrTpName, vcAttrName
    from
    (
     select distinct 'База данных', isnull ( db_name ( iDbId ), '' ) from @tblResData
     union all
     select distinct 'Файл базы данных', isnull ( dbfil.name, '' )
     from @tblResData fildata left outer join sys.master_files dbfil on
        fildata.iDbId = dbfil.database_id and fildata.iFileId = dbfil.file_id
     union all
     select distinct 'Объект', vcObj from @tblResData
     union all
     select distinct 'Индекс', vcIndex from @tblResData
     union all
     select distinct 'Тип блокировки', vcMode from @tblResData
     union all
     select distinct 'Тип блокировки', vcOwnerMode from @tblResData
     union all
     select distinct 'Тип блокировки', vcWaiterMode from @tblResData
    ) attr ( vcAttrTpName, vcAttrName )
   )
   insert into dbo.Attrs ( iAttrTpId, vcAttrName )
    select attrtp.iAttrTpId, attrdata.vcAttrName
    from
     AttrData attrdata
      inner join
     dbo.AttrTypes attrtp on attrdata.vcAttrTpName = attrtp.vcAttrTpName
      left outer join
     dbo.Attrs attr on
      attr.iAttrTpId = attrtp.iAttrTpId and
      attr.vcAttrName = attrdata.vcAttrName
    where attr.iAttrId is null
   
   declare @tblResIds table ( iResId int not null, iResNewId int not null )
   
   ;
   with AttrData
   as
   (
    select attrs.iAttrId, attrs.vcAttrName, attrtp.vcAttrTpName
    from
     dbo.Attrs attrs
      inner join
     dbo.AttrTypes attrtp on attrs.iAttrTpId = attrtp.iAttrTpId
   )
   merge dbo.DeadlockRes as trg
   using
   (
    select iResId, iEventId = @iEventId, iDbId = attrdb.iAttrId, iFileId = attrfil.iAttrId,
        res.iPageId, iObjId = attrobj.iAttrId, iIndId = attrind.iAttrId, res.iHobtId
    from
     (
      select distinct
       iResId,
       iDbId,
       iPageId,
       iFileId,
       iHobtId,
       vcObj,
       vcIndex,
       vcMode
      from @tblResData
     ) res
      inner join
     AttrData attrdb on attrdb.vcAttrTpName = N'База данных' and attrdb.vcAttrName = isnull ( db_name ( res.iDbId ), '' )
      inner join
     AttrData attrobj on attrobj.vcAttrTpName = N'Объект' and attrobj.vcAttrName = res.vcObj
      inner join
     AttrData attrind on attrind.vcAttrTpName = N'Индекс' and attrind.vcAttrName = res.vcIndex
      left outer join
     sys.master_files fil on
      fil.database_id = res.iDbId and
      fil.file_id = res.iFileId
      
      inner join
     AttrData attrfil on attrfil.vcAttrTpName = N'Файл базы данных' and attrfil.vcAttrName = isnull ( fil.name, '' )
   ) as src on 1 = 0
   when not matched then insert ( iEventId, iDbId, iFileId, iPageId, iObjId, iIndId, iHobtId )
    values ( src.iEventId, src.iDbId, src.iFileId, src.iPageId, src.iObjId, src.iIndId, src.iHobtId )
   output src.iResId, inserted.iResId
   into @tblResIds ( iResId, iResNewId )
   ;
   
   insert into dbo.ResOwners ( iResId, iProcessId, iModeId )
    select resids.iResNewId, pr.iProcessId, attr.iAttrId
    from
     @tblResData resdata
      inner join
     @tblResIds resids on resdata.iResId = resids.iResId
      inner join
     dbo.DeadlockChains pr on
      pr.iEventId = @iEventId and
      pr.vcProcessId = resdata.vcOwner
      
      inner join
     (
      select attr.iAttrId, attr.vcAttrName
      from
       dbo.Attrs attr
        inner join
       dbo.AttrTypes attrtp on
        attr.iAttrTpId = attrtp.iAttrTpId and
        attrtp.vcAttrTpName = 'Тип блокировки'
     ) attr on resdata.vcOwnerMode = attr.vcAttrName
   
   insert into dbo.ResWaiters ( iResId, iProcessId, iModeId )
    select resids.iResNewId, pr.iProcessId, attr.iAttrId
    from
     @tblResData resdata
      inner join
     @tblResIds resids on resdata.iResId = resids.iResId
      inner join
     dbo.DeadlockChains pr on
      pr.iEventId = @iEventId and
      pr.vcProcessId = resdata.vcWaiter
      
      inner join
     (
      select attr.iAttrId, attr.vcAttrName
      from
       dbo.Attrs attr
        inner join
       dbo.AttrTypes attrtp on
        attr.iAttrTpId = attrtp.iAttrTpId and
        attrtp.vcAttrTpName = 'Тип блокировки'
     ) attr on resdata.vcWaiterMode = attr.vcAttrName
  commit tran
 end try
 begin catch
  if xact_state () <> 0
  begin
   rollback tran
  end
  set @vcErr = concat ( error_message (), ' ', error_number (), ' ', error_line () )
  ; throw 60000, @vcErr, 1
 end catch
end
go

Протестируем работу уведомления, создав классическую взаимоблокировку. Откроем два соединения. В первом запустим такой код:

create table ##data1 ( i int )
create table ##data2 ( i int )

begin tran

insert into ##data1 ( i )
 values ( 1 )

Во втором соединении запустим код:

begin tran

insert into ##data2 ( i )
 values ( 1 )
В первом попробуем обратиться к таблице, заблокированной вторым соединением:

update ##data2
 set i += 1
Аналогично поступим со вторым соединением:

update ##data1
 set i += 1
В течение нескольких секунд менеджер блокировок обнаруживает неразрешимую цепочку и выбирает второе соединение в качестве жертвы взаимоблокировки. Сделав запрос к таблицам, видим результаты.

declare @iEventId int = 1
select evn.vcProcessId, evn.bVictim, evn.vcResource,
 vcLogin = attrlog.vcAttrName,
 vcHost = attrhost.vcAttrName,
 vcIsolation = attrisol.vcAttrName,
 vcApp = attrapp.vcAttrName,
 evn.dtLastBatchStart, evn.dtLastBathEnd,
 vcLock = attrlock.vcAttrName,
 evn.vcBatch
from
 dbo.DeadlockChains evn
  inner join
 dbo.Attrs attrlog on evn.iLoginId = attrlog.iAttrId
  inner join
 dbo.Attrs attrhost on evn.iHostId = attrhost.iAttrId
  inner join
 dbo.Attrs attrisol on evn.iIsolationId = attrisol.iAttrId
  inner join
 dbo.Attrs attrapp on evn.iAppId = attrapp.iAttrId
  inner join
 dbo.Attrs attrlock on evn.iLockId = attrlock.iAttrId
where evn.iEventId = @iEventId

select vcDb = attrdb.vcAttrName, vcDbFile = attrfil.vcAttrName, res.iPageId, res.iHobtId,
 vcObj = attrobj.vcAttrName, vcIndex = attrind.vcAttrName,
 vcOwner = resowns.vcProcessId, vcOwnerLockMode = lockown.vcAttrName, vcWaiter = reswait.vcProcessId,
 vcWaiterLockMode = lockwait.vcAttrName
from
 dbo.DeadlockRes res
  inner join
 dbo.Attrs attrdb on res.iDbId = attrdb.iAttrId
  inner join
 dbo.Attrs attrfil on res.iFileId = attrfil.iAttrId
  inner join
 dbo.Attrs attrobj on res.iObjId = attrobj.iAttrId
  inner join
 dbo.Attrs attrind on res.iIndId = attrind.iAttrId
  inner join
 dbo.ResOwners own on res.iResId = own.iResId

  inner join
 dbo.DeadlockChains resowns on
  resowns.iEventId = res.iEventId and
  own.iProcessId = resowns.iProcessId
  
  inner join
 dbo.ResWaiters wait on res.iResId = wait.iResId
  inner join
 dbo.DeadlockChains reswait on
  reswait.iEventId = res.iEventId and
  wait.iProcessId = reswait.iProcessId

  inner join
 dbo.Attrs lockown on own.iModeId = lockown.iAttrId
  inner join
 dbo.Attrs lockwait on wait.iModeId = lockwait.iAttrId
where res.iEventId = @iEventId







Полный перечень наборов событий можно посмотреть в специальном системном представлении sys.event_notification_event_types. Тут есть много интересных событий. Например, можно получать уведомления при обнаружении поврежденной страницы в базе данных, нехватки памяти при операциях сортировки или построении hash-таблицы, изменении уровня потребления памяти сервером, расширении файлов базы, эскалации блокировок, изменении в доступах, настройках базы или сервера, наличии заблокированных соединений и многом другом.

воскресенье, 17 июля 2016 г.

Зеркальное отображение (безопасность на основе сертификатов, следящий сервер).

Сегодня речь пойдет о реализации такого решения высокого уровня доступности как зеркалирование баз данных. Это очень полезное и легкое в развертывании решение, которое может значительно повысить доступность приложений. Когда зеркалирование появилось в MS SQL 2005 его даже стали называть убийцей кластеров за отсутствие сложных административных настроек, легкость в реализации, возможность автоматической отработки отказа. При этом в отличие от кластеров здесь нет общего дискового хранилища. Вместо этого каждый участник сеанса зеркального отображения может располагаться на собственных дисках, что еще больше повышает безопасность. В качестве примера создадим простую базу данных с одной таблицей для хранения документов. В данном решении будут задействованы 3 сервера: основной, к которому могут подключаться приложения, зеркальный с синхронизированной резервной копией данных и следящий, который нужен для автоматической отработки отказа (если работа будет настроена в безопасном режиме).

if db_id ( N'Docs' ) is null
begin
 create database Docs
  on primary
  (
   name  = DocData,
   filename = 'C:\В\DbData\DocData.mdf',
   size  = 10 Mb,
   filegrowth = 10 Mb,
   maxsize  = unlimited
  )
  log on
  (
   name  = DocLog,
   filename = 'C:\В\TranLog\DocLog.ldf',
   size  = 10 Mb,
   filegrowth = 10 Mb,
   maxsize  = unlimited
  )
end
go

alter database Docs set recovery full
go

use Docs
go

create table dbo.Documents
(
 iDocId int identity ( 1, 1 ) not null,
 vcDoc varchar ( 100 )  not null,
 varDoc varbinary ( max ) not null,
 constraint PK_Documents_iDocId primary key clustered ( iDocId asc ) on [primary],
 constraint AK_Documents_vcDoc unique nonclustered ( vcDoc asc ) on [primary]
) on [primary]
go

insert into dbo.Documents ( vcDoc, varDoc )
 values ( 'Test1', 0x )
go
Теперь для подготовки сеанса зеркального отображения необходимо сделать полный бэкап базы и бэкап ее журнала транзакций:
backup database Docs
 to disk = 'C:\В\Docs.bak'
 with format, init, compression, stats = 1, norewind, checksum
go

backup log Docs
 to disk = 'C:\В\Docs.trn'
 with format, init, compression, stats = 1, norewind, checksum
go
Созданные резервные копии необходимо восстановить на зеркальном сервере с опцией norecovery.

if db_id ( N'Docs' ) is null
begin
 restore database Docs
  from disk = 'C:\В\Docs.bak'
  with
   move 'DocData' to 'C:\Program Files\Microsoft SQL Server\MSSQL12.MIRROR\MSSQL\DATA\DocData.mdf',
   move 'DocLog' to 'C:\Program Files\Microsoft SQL Server\MSSQL12.MIRROR\MSSQL\DATA\DocLog.ldf',
   stats = 1,
   norecovery
 restore log Docs
  from disk = 'C:\В\Docs.trn'
  with
   stats = 1,
   norecovery
end
go
Приступим к настройке объектов для передачи данных между серверами. Понадобятся конечные точки и учетные записи. Безопасность конечных точек лучше настраивать с помощью сертификатов (а не Windows). В этом случае не придется беспокоиться о том под какими учетными записями работают службы сервера (локальными или доменными), входят ли компьютеры в один домен или в разные домены с определенными отношениями (или же находятся вне доменов).

Итак, на основном сервере требуется создать логин для доступа к зеркальному и следящему серверам, настроить сертификат, создать конечную точку, подготовить резервные копии сертификата для восстановления на двух остальных серверах.

use master
go

if suser_id ( N'MainRepl' ) is null
begin
 create login MainRepl
  with
  password  = N'2#$@#!',
  check_policy  = off,
  default_database = master,
  default_language = russian
end
go

use master
go

if database_principal_id ( N'MainReplUser' ) is null
begin
 create user MainReplUser
  from login MainRepl
  with default_schema = dbo
end
go

use master
go

if not exists
(
 select *
 from sys.symmetric_keys
 where name = N'##MS_DatabaseMasterKey##'
)
begin
 create master key
  encryption by password = N'$R#tt5feR#R#D'
end
go

use master
go

if cert_id ( N'MainReplHost' ) is null
begin
 create certificate MainReplHost
  authorization dbo
  with subject = N'Сертификат главного сервера.',
  expiry_date = '2017-12-31',
  start_date = '2016-07-01'
end
go

if not exists
(
 select *
 from sys.endpoints
 where name = N'Mirroring'
)
begin
 create endpoint Mirroring
  state = started
  as tcp
  (
   listener_port = 7024,
   listener_ip = all
  )
  for database_mirroring
  (
   authentication = certificate MainReplHost,
   encryption = required algorithm aes,
   role = all
  )
end
go

backup certificate MainReplHost
 to file = 'C:\Program Files\Microsoft SQL Server\MSSQL12.MIRROR\MSSQL\DATA\MainReplHost.cer'
go
backup certificate MainReplHost
 to file = 'C:\Program Files\Microsoft SQL Server\MSSQL12.WITNESS\MSSQL\DATA\MainReplHost.cer'
go
В коде, приведенном выше, перед созданием сертификата в master был создан главный ключ БД для защиты сертификата, так как последний не защищен паролем. Для конечной точки прописана безопасность на основе сертификата. Резервная копия сертификата создана дважды: в доступных местах для зеркального и следящего серверов (следящий и зеркальный сервера на моем компьютере работают под локальными учетными записями). Аналогичные действия требуется выполнить на двух оставшихся серверах. Вот код для зеркального сервера:

use master
go

if suser_id ( N'MirrorRepl' ) is null
begin
 create login MirrorRepl
  with
  password  = N'2#$@#!',
  check_policy  = off,
  default_database = master,
  default_language = russian
end
go

use master
go

if database_principal_id ( N'MirrorReplUser' ) is null
begin
 create user MirrorReplUser
  from login MirrorRepl
  with default_schema = dbo
end
go

use master
go

if not exists
(
 select *
 from sys.symmetric_keys
 where name = N'##MS_DatabaseMasterKey##'
)
begin
 create master key
  encryption by password = N'^YgfeHEF$#23'
end
go

use master
go

if cert_id ( N'MirrorReplHost' ) is null
begin
 create certificate MirrorReplHost
  authorization dbo
  with subject = N'Сертификат зеркального сервера.',
  expiry_date = '2017-12-31',
  start_date = '2016-07-01'
end
go

if not exists
(
 select *
 from sys.endpoints
 where name = N'Mirroring'
)
begin
 create endpoint Mirroring
  state = started
  as tcp
  (
   listener_port = 7025,
   listener_ip = all
  )
  for database_mirroring
  (
   authentication = certificate MirrorReplHost,
   encryption = required algorithm aes,
   role = all
  )
end
go

backup certificate MirrorReplHost
 to file = 'C:\В\MirrorReplHost.cer'
go
backup certificate MirrorReplHost
 to file = 'C:\Program Files\Microsoft SQL Server\MSSQL12.WITNESS\MSSQL\DATA\MirrorReplHost.cer'
go
Здесь все аналогично коду для основного сервера, только для конечной точки указан другой порт из-за того что в данном примере все 3 сервера находятся на одном компьютере. Наконец приведем такой же код для следящего сервера (у конечной точки опять-таки порт отличен от портов конечных точек основного и зеркального серверов).

use master
go

if suser_id ( N'WitnessRepl' ) is null
begin
 create login WitnessRepl
  with
  password  = N'erg343grF#$FE',
  check_policy  = off,
  default_database = master,
  default_language = russian
end
go

use master
go

if database_principal_id ( N'WitnessReplUser' ) is null
begin
 create user WitnessReplUser
  from login WitnessRepl
  with default_schema = dbo
end
go

use master
go

if not exists
(
 select *
 from sys.symmetric_keys
 where name = N'##MS_DatabaseMasterKey##'
)
begin
 create master key
  encryption by password = N'we32@#ESe'
end
go

use master
go

if cert_id ( N'WitnessReplHost' ) is null
begin
 create certificate WitnessReplHost
  authorization dbo
  with subject = N'Сертификат следящего сервера.',
  expiry_date = '2017-12-31',
  start_date = '2016-07-01'
end
go

if not exists
(
 select *
 from sys.endpoints
 where name = N'Mirroring'
)
begin
 create endpoint Mirroring
  state = started
  as tcp
  (
   listener_port = 7023,
   listener_ip = all
  )
  for database_mirroring
  (
   authentication = certificate WitnessReplHost,
   encryption = required algorithm aes,
   role = all
  )
end
go

backup certificate WitnessReplHost
 to file = 'C:\В\WitnessReplHost.cer'
go
backup certificate WitnessReplHost
 to file = 'C:\Program Files\Microsoft SQL Server\MSSQL12.MIRROR\MSSQL\DATA\WitnessReplHost.cer'
go
После того как все конечные точки созданы необходимо обеспечить доступ с каждого сервера на 2 других сервера. Для этого для сертификата и логина на каждом сервере необходимо на двух других серверах восстановить резервную копию сертификата и создать одноименного пользователя с правами на конечную точку и на сертификат. Так это делается на основном сервере (пароли на каждом сервере не обязаны совпадать).

use master
go

if suser_id ( N'MirrorRepl' ) is null
begin
 create login MirrorRepl
  with
   password  = 'EG#WferWF43edf',
   check_policy  = off,
   default_database = master,
   default_language = russian
end
go
if suser_id ( N'WitnessRepl' ) is null
begin
 create login WitnessRepl
  with
   password  = 'rf34ewfFWfdsd#@#$$',
   check_policy  = off,
   default_database = master,
   default_language = russian
end
go

use master
go
if database_principal_id ( N'MirrorReplUser' ) is null
begin
 create user MirrorReplUser
  from login MirrorRepl
  with default_schema = dbo
end
go
if database_principal_id ( N'WitnessReplUser' ) is null
begin
 create user WitnessReplUser
  from login WitnessRepl
  with default_schema = dbo
end
go

use master
go
if cert_id ( N'MirrorReplHost' ) is null
begin
 create certificate MirrorReplHost
  authorization MirrorReplUser
  from file = 'C:\В\MirrorReplHost.cer'
end
go
if cert_id ( N'WitnessReplHost' ) is null
begin
 create certificate WitnessReplHost
  authorization WitnessReplUser
  from file = 'C:\В\WitnessReplHost.cer'
end
go

use master
go
grant connect on endpoint::Mirroring to MirrorRepl, WitnessRepl
go
Аналогичный код прогоняется на зеркальном сервере (создаются логины и пользователи для основного и следящего серверов, создаются сертификаты из резервных копий, пользователи делаются владельцами своих сертификатов, логинам предоставляется доступ на подключение к конечной точке).

use master
go

if suser_id ( N'MainRepl' ) is null
begin
 create login MainRepl
  with
   password  = '43#T%Fev',
   check_policy  = off,
   default_database = master,
   default_language = russian
end
go
if suser_id ( N'WitnessRepl' ) is null
begin
 create login WitnessRepl
  with
   password  = 'dgf3t4SVSDv',
   check_policy  = off,
   default_database = master,
   default_language = russian
end
go

use master
go
if database_principal_id ( N'MainReplUser' ) is null
begin
 create user MainReplUser
  from login MainRepl
  with default_schema = dbo
end
go
if database_principal_id ( N'WitnessReplUser' ) is null
begin
 create user WitnessReplUser
  from login WitnessRepl
  with default_schema = dbo
end
go

use master
go
if cert_id ( N'MainReplHost' ) is null
begin
 create certificate MainReplHost
  authorization MainReplUser
  from file = 'C:\Program Files\Microsoft SQL Server\MSSQL12.MIRROR\MSSQL\DATA\MainReplHost.cer'
end
go
if cert_id ( N'WitnessReplHost' ) is null
begin
 create certificate WitnessReplHost
  authorization WitnessReplUser
  from file = 'C:\Program Files\Microsoft SQL Server\MSSQL12.MIRROR\MSSQL\DATA\WitnessReplHost.cer'
end
go

use master
go
grant connect on endpoint::Mirroring to MainRepl, WitnessRepl
go
В последний раз запустим аналогичный код для сервере-свидетеле:

use master
go

if suser_id ( N'MainRepl' ) is null
begin
 create login MainRepl
  with
   password  = 'rfT#$FRged',
   check_policy  = off,
   default_database = master,
   default_language = russian
end
go
if suser_id ( N'MirrorRepl' ) is null
begin
 create login MirrorRepl
  with
   password  = 'e3e#Rwd',
   check_policy  = off,
   default_database = master,
   default_language = russian
end
go

use master
go
if database_principal_id ( N'MainReplUser' ) is null
begin
 create user MainReplUser
  from login MainRepl
  with default_schema = dbo
end
go
if database_principal_id ( N'MirrorReplUser' ) is null
begin
 create user MirrorReplUser
  from login MirrorRepl
  with default_schema = dbo
end
go

use master
go
if cert_id ( N'MainReplHost' ) is null
begin
 create certificate MainReplHost
  authorization MainReplUser
  from file = 'C:\Program Files\Microsoft SQL Server\MSSQL12.WITNESS\MSSQL\DATA\MainReplHost.cer'
end
go
if cert_id ( N'MirrorReplHost' ) is null
begin
 create certificate MirrorReplHost
  authorization MirrorReplUser
  from file = 'C:\Program Files\Microsoft SQL Server\MSSQL12.WITNESS\MSSQL\DATA\MirrorReplHost.cer'
end
go

use master
go
grant connect on endpoint::Mirroring to MainRepl, MirrorRepl
go
Все готово к включению сеанса зеркального отображения. Для этого сперва надо на зеркальном сервере указать основной сервер в качестве партнера:
alter database Docs set partner = 'tcp://LAPTOP-LBO51UJL:7024'
Выше в качестве значения свойства partner указывается полное доменное имя и через двоеточие порт конечной точки на другом сервере (впереди пишется "tcp://"). Следующим шагом нужно на основном сервере указать адрес зеркального:
alter database Docs set partner = 'tcp://LAPTOP-LBO51UJL:7025'
Зеркальное отображение заработало. Надо отметить что зеркалирование может работать в одном из двух режимов: режим высокой безопасности либо режим высокой производительности. При втором режиме фиксация транзакции происходит асинхронно относительно передачи данных на зеркальный сервер. Этот режим подходит для серверов, в которых не происходят массовые операции и допустима некоторая потеря данных. В асинхронном режиме недоступна автоматическая отработка отказа. Если же требуется наивысший уровень безопасности и нет массовых операций, то можно использовать зеркалирование с синхронном режиме. Для асинхронного режима требуется выполнить инструкцию на основном сервере:
alter database Docs set partner safety off
Опция full позволяет работать в синхронном режиме:
alter database Docs set partner safety full
На основном или зеркальном сервере можно выполнить запрос:
select db_name ( database_id ), *
from sys.database_mirroring
where  db_name ( database_id ) = 'Docs'
Это позволит проверить роль текущего сервера (основной или зеркальный), состояние сеанса зеркалирования (синхронизировано либо идет прием новых строк журнала транзакций), адрeса партнеров и следящего сервера, состояние соединения со свидетелем и режим зеркалирования.
Для выполнения ручной отработки отказа, нужно на сервере, являющемся на данный момент основным выполнить код (требуется перевод в режим высокой безопасности):
use master alter database Docs set partner failover
В этот момент на сервере, который был основным, база данных Docs перейдет в состояние restoring, а на сервере, который был зеркальным, она станет доступной. Перенос данных уже пойдет в обратную сторону: от зеркального сервера к основному. Эти изменения немедленно отразятся и в представлении sys.database_mirroring. Зеркальное отображение можно приостановить, если в этом есть необходимость, а затем продолжить. Для этого используются соответственно инструкции:
alter database Docs set partner suspend
alter database Docs set partner resume
Надо только помнить, что если сеанс приостановлен, то журнал транзакций в базе на основном сервере не будет усекаться до тех пор пока зеркало не станет синхронизированным с основным. Для удаления сеанса используется значение off для опции partner:
alter database Docs set partner off
Если основной сервер стал недоступным и не произошло отработки отказа (при этом следящего нет либо он подключен к зеркальному), то можно принудительно сделать доступным зеркальный сервер. Это приведет к остановке зеркалирования и возможно потере данных, которые не были реплицированы с основного сервера. Зеркальный сервер делается доступным с помощью такой инструкции:
alter database Docs set partner force_service_allow_data_loss
Подключим теперь следящий сервер к основному для возможности автоматической отработки отказа:
alter database Docs set witness = 'tcp://LAPTOP-LBO51UJL:7023'
Для отключения следящего сервера достаточно выполнить код:
alter database Docs set witness off
На следящем сервере можно делать запросы к представлению sys.database_mirroring_witnesses для просмотра состояния сеанса зеркалирования. Теперь подключения приложений могут автоматически перенаправляться на доступный сервер. Для этого требуется указать дополнительный атрибут в строке подключения. Например, на C# можно так создать экземпляр подключения:
using (SqlConnection cn = new SqlConnection(@"Data Source=LAPTOP-LBO51UJL;Failover Partner=LAPTOP-LBO51UJL\MIRROR;Initial Catalog=Docs;Integrated Security=True;"))
Если в момент отработки отказа соединение будет выполнять какую-либо работу с базой, то активная транзакция откатится, но последующие подключения пройдут без проблем.
То что со свидетелем работает автоматическая отработка отказа, это здорово: повышается доступность приложений. Но хорошо бы получать автоматические уведомления о факте отработки отказа. Приведу пример того как это сделать с помощью уведомлений о событиях. Для удобства на основном и зеркальном серверах создадим базу данных с таблицей, в которой будет храниться лог сообщений о состоянии сеанса зеркалирования. Также понадобится настройка объектов Service Broker и создание уведомления о событии. Итак, код, приведенный ниже, нужно выполнить на основном и зеркальном серверах:

if db_id ( 'CheckData' ) is null
begin
 create database CheckData
end
go
alter database CheckData set enable_broker
go

use CheckData
go
if object_id ( 'dbo.DbMirrorLog', 'U' ) is null
begin
 create table dbo.DbMirrorLog
 (
  dtDate datetime  not null,
  vcLog varchar ( max )  not null,
  vcDb varchar ( 200 )  not null,
  iState int   not null,
  iRowId int identity ( 1, 1 ) not null,
  constraint PK_bMirrorLog primary key clustered ( dtDate asc, iRowId asc ) on [primary]
 ) on [primary]
end
go

if object_id ( 'dbo.LogMirrorState', 'P' ) is null
begin
 exec sp_executesql 'create proc dbo.LogMirrorState as return'
end
go

alter proc dbo.LogMirrorState
as
begin
 declare @xMes xml, @uidHan uniqueidentifier

 ;
 receive top ( 1 ) @xMes = message_body, @uidHan = conversation_handle
 from dbo.MirrorNotify

 if @@rowcount = 0
 begin
  return
 end

 insert into dbo.DbMirrorLog
 (
  dtDate,
  vcLog,
  vcDb,
  iState
 )
 values
 (
  isnull ( @xMes.value ( '(/EVENT_INSTANCE/PostTime)[1]', 'datetime' ), getdate () ),
  isnull ( @xMes.value ( '(/EVENT_INSTANCE/TextData)[1]', 'varchar ( max )' ), '' ),
  isnull ( @xMes.value ( '(/EVENT_INSTANCE/DatabaseName)[1]', 'varchar ( max )' ), '' ),
  isnull ( @xMes.value ( '(/EVENT_INSTANCE/State)[1]', 'int' ), -1 )
 )
end
go

if object_id ( 'dbo.MirrorNotify', 'SQ' ) is null
begin
 create queue dbo.MirrorNotify
 with
  status = on,
  activation
  (
   status = on,
   procedure_name = dbo.LogMirrorState,
   max_queue_readers = 4,
   exec as owner
  )
 on [default]
end
go

if not exists
(
 select *
 from sys.services
 where name = 'MirrorLog'
)
begin
 create service MirrorLog
  on queue MirrorNotify ( [http://schemas.microsoft.com/SQL/Notifications/PostEventNotification] )
end
go

if not exists
(
 select *
 from sys.server_event_notifications
 where name = 'DbMirrorLog'
)
begin
 create event notification DbMirrorLog
  on server for DATABASE_MIRRORING_STATE_CHANGE
  to service 'MirrorLog', 'current database'
end
go

Можно выполнить несколько мануальных отработок отказа, а затем на основном сервере выполнить инструкцию shutdown для его отключения (либо отключить его в диспетчере конфигураций). При этом следящий сервер сразу определит, что основной недоступен и выполнит отработку отказа на зеркало. Затем нужно включить основной сервер, и он синхронизируется с бывшим зеркальным и сам станет зеркальным. Если сделать запросы к таблице DbMirrorLog, то увидим такой лог:










Если настроить на сервере Database Mail, то в процедуре активации очереди можно вызывать процедуру msdb.dbo.sp_send_dbmail для мгновенного оповещения об отработке отказа.

суббота, 9 июля 2016 г.

Исключения в logon-триггерах

Logon-триггеры, которые появились в MS SQL 2005 предоставляют удобный механизм для логирования подключений. Подключения могут протоколироваться и средствами профайлера и другими методами слежения за процессами. Однако Logon-триггеры превратили процесс подключения в транзакцию, которую при желании можно откатить. Например, можно выполнять проверки для разрешения или запрета на подключение к серверу.

Приведем такой пример. Создадим базу данных для хранения настроек и логов:

use master
go


if db_id ( N'LogStore' ) is null
begin
        create database LogStore
        on primary
        (
                name = LogData,
                filename = N'C:\Users\В\Desktop\LogData.mdf',
                size = 100 Mb,
                filegrowth = 100 Mb,
                maxsize = unlimited
        )
        log on
        (
                name = LogDataLog,
                filename = N'C:\Users\В\Desktop\LogDataLog.ldf',
                size = 100 Mb,
                filegrowth = 100 Mb,
                maxsize = unlimited
        )

end

go


Создадим таблицу, где будут перечислены компьютеры, с которых запрещено подключаться к серверу:
use LogStore
go


if object_id ( N'dbo.ExHosts', N'U' ) is null

begin
        create table dbo.ExHosts
        (
                iHostId int identity ( 1, 1 ) not null,
                vcHost nvarchar ( 200 ) not null,
                constraint PK_ExHosts_iHostId primary key clustered ( iHostId asc ) on [primary],
                constraint AK_ExHosts_vcHost unique nonclustered ( vcHost asc ) on [primary]
        )
end

go


Логон-триггер может проверять компьютер соединения и откатывать транзакцию, если окажется, что в таблице есть строка с таким компьютером. Перед созданием триггера надо определиться с тем под какой учетной записью он будет работать (ведь не стоит давать всем доступ на базу и на таблицу). Создадим такую учетную запись и предоставим ей права:

use master

go


if suser_id ( N'LogLogin' ) is null

begin


create login LogLogin
        with
                   password = N'qw3@',
                   check_policy = off,
                   default_database = LogStore,
                   default_language = russian

end

go


use LogStore


go


if database_principal_id ( N'LogUser' ) is null

begin

        create user LogUser
                from login LogLogin
                with default_schema = dbo
end

go


if database_principal_id ( N'LogRole' ) is null

begin
        create role LogRole
                  authorization dbo

end

go

grant select on object::dbo.ExHosts to LogRole

go


alter role LogRole add member LogUser

go




Теперь можно создать триггер на вход:

use master

go


if not exists
(
        select *
        from sys.server_triggers
        where name = N'CheckHost'
)
begin
        exec sp_executesql N'create trigger CheckHost on all server for logon as return'

end
go


alter trigger CheckHost on all server
with exec as 'LogLogin'
for logon

as
begin
        set nocount, xact_abort on
        if exists
        (
                select *
                from LogStore.dbo.ExHosts
                where vcHost = isnull ( host_name (), N'' )
        )
        begin
                ; throw 60000, N'Попытка входа с запрещенного хоста.', 1
                if @@trancount > 0
                begin
                        rollback tran
                end
        end
end
go




При попытке зайти на сервер с запрещенного компьютера произойдет ошибка. Например, если попытаться сделать такое подключение в Management Studio, то появится сообщение об ошибке:

Текста сообщения об ошибке, написанного в коде триггера, здесь не видно. Но такие ошибки попадают в журнал сервера, где их можно обнаружить:


Казалось бы все неплохо. Но при работе с logon-триггерами есть один подводный камень, состоящий в следующем. Код в таких триггерах выполняется в контексте неявной транзакции, опция xact_abort всегда включена. Любое исключение, возникающее из-за процессов, которые не предусмотрел разработчик, приводит к откату этой транзакции и, как следствие, невозможности подключения к серверу. В худшем случае, когда ошибка происходит при любой попытке входа, создается ситуация, когда никто не может подключиться к серверу. Никакая обработка ошибок, например, с помощью try/catch, не поможет, поскольку, как было отмечено выше, транзакция уже откатилась, и ничто не поможет. В этой ситуации можно входить на сервер используя некоторые специальные режимы. Например, режим приоритетного соединения администратора позволяет зайти на сервер, поскольку на такое соединение триггеры не действуют. Для этого требуется ввести в командной строке: sqlcmd -S <имя сервера> -E -A. После чего требуется ввести текст команды: disable trigger CheckHost on all server. Вместо командной строки можно использовать и Management Studio, если в окне подключения перед именем сервера написать: admin:. Возможно, также потребуется произвести эти действия с серверного компьютера (если отключена опция remote admin connection).

Хотел бы предложить подход, который минимизирует вероятность ошибок в работе триггера. Во-первых, логика работы кода должна быть как можно более простой. Пусть это будут простые проверки. Если требуются более сложные способы обработки данных, не связанные с проверкой возможности подключения, то лучше вынести их в другие, нетранзакционные, средства отслеживания событий. Во-вторых, нужно проанализировать все выполняемые в триггере действия по порядку. В нашем случае они такие:
1. Триггер работает от имени логина LogLogin. Пока триггер работает в его контексте удалить логин не получится. Здесь все в порядке.
2. Делается попытка просмотра объекта в базе данных LogStore. Значит, чтобы все было хорошо, требуется, чтобы существовала база данных с таким именем, она должна быть доступной, и текущая учетная запись должна иметь права на подключение к ней.
3. Производится попытка запроса к таблице dbo.ExHosts. Для ее успеха требуется наличие таблицы с таким именем и права на запросы к ней для текущей учетной записи.
4. В запросе к таблице делается условие на столбец. Поэтому требуется удостовериться, что у заданной таблицы существует столбец с таким именем и что его тип данных такой какой мы ожидаем: nvarchar.

Итак, вставим в текст триггера все эти проверки, сделав его работу максимально безопасной. Ниже идет полный код с моими комментариями.

alter trigger CheckHost on all server
with exec as 'LogLogin'
for logon
as
begin

        set nocount, xact_abort on

        -- проверим возможность выполнения запросов к системныму каталогу с перечнем баз данных
    if isnull ( has_perms_by_name ( 'sys.databases', 'object', 'select', null, null ), 0 ) = 0
    begin
        return
    end

    -- существование базы данных с заданным именем в состоянии online
    if not exists
    (
        select *
        from sys.databases
        where name = 'LogStore' and state_desc = 'ONLINE'
    )
    begin
        return
    end

    -- возможность подключения к базе данных
    if isnull ( has_perms_by_name ( 'LogStore', 'database', 'connect', null, null ), 0 ) = 0
    begin
        return
    end

    -- наличие таблицы
    if object_id ( N'LogStore.dbo.ExHosts', N'U' ) is null
    begin
        return
    end

    -- наличие доступов на select к таблице (эта функция корректно учтет наличие несколько доступов и запретов)
    if isnull ( has_perms_by_name ( 'LogStore.dbo.ExHosts', 'object', 'select', null, null ), 0 ) = 0
    begin
        return
    end

    -- проверка существования столбца с определенным именем
    declare @vcSql nvarchar ( max ), @iColId int
    set @vcSql = N'use LogStore set @iColId = columnproperty ( object_id ( N''dbo.ExHosts'', N''U'' ), N''vcHost'', N''ColumnId'' )'
    exec sp_executesql @vcSql, N'@iColId int out', @iColId = @iColId out
    if @iColId is null
    begin
        return
    end

    -- возможность выполнения запросов в системному представлению с перечнем столбцов
    declare @bPerm int
    set @vcSql = N'use LogStore set @bPerm = has_perms_by_name ( ''LogStore.sys.columns'', ''object'', ''select'', null, null )'
    exec sp_executesql @vcSql, N'@bPerm int out', @bPerm = @bPerm out
    if @bPerm = 0
    begin
        return
    end


    -- наличие у столбца нужного типа данных
    if not exists
    (
        select *
        from LogStore.sys.columns
        where
            name = 'vcHost' and
            object_id = object_id ( N'LogStore.dbo.ExHosts', N'U' ) and
            type_name ( system_type_id ) = 'nvarchar' and
            type_name ( user_type_id ) = 'nvarchar' and
            max_length = 400
    )
    begin
        return
    end
    if exists
    (
        select *
        from LogStore.dbo.ExHosts
        where vcHost = isnull ( host_name (), N'' )
    )
    begin
        ; throw 60000, N'Попытка входа с запрещенного компьютера.', 1
        if @@trancount > 0
        begin
            rollback tran
        end
    end
end
go

В коде делается проверка возможности всех действий, которые выполняются при проверке законности входа. Причем, если для проверки возможности действия, происходит обращение к системному объекту, то предварительно делается проверка возможности обращения к системному объекту. Теперь, если, скажем, изменить имя столбца (exec LogStore..sp_rename 'dbo.ExHosts.vcHost', 'vcName', 'column') или сделать запрет для учетной записи на запросы к таблице (exec ( N'use LogStore deny select on object::dbo.ExHosts to LogUser' )) или вообще удалить учетную запись, триггер спокойно продолжит свою работу. В этой ситуации он, конечно, не сможет выполнять свои прямые обязанности по проверке законности подключений. Но в большинстве случаев это лучше чем совсем никого не пускать.