Страницы

четверг, 3 августа 2017 г.

Проектирование OLAP-кубов и многомерные CLR-сборки

Тренируясь в решении задач проектирования OLAP-кубов, я неожиданно нашел интересное и полезное применение многомерной clr-сборки для mdx-запросов. Рассмотрим следующий пример.

Как и всегда при работе с кубами начнем с таблицы фактов, в которой хранятся меры и есть измерения. Измерение времени является фундаментальным. Однако при большом объеме данных удобно хранить их в реляционной базе данных не на каждый день, а только за те дни, когда происходят изменения в значении меры. Это позволяет избежать дублирования большого объема идентичных значений меры и намного уменьшает объем хранимых данных. Рассмотрим такой пример реляционных таблиц.


if object_id ( 'dbo.Dates', 'U' ) is null
begin
    create table dbo.Dates
    (
        dtDate date not null,
        constraint PK_Dates_dtDate primary key clustered ( dtDate asc ) on [primary]
    ) on [primary]
end
go

if object_id ( 'dbo.AttrData', 'U' ) is null
begin
    create table dbo.AttrData
    (
        iAttrId int not null,
        constraint PK_AttrData_iAttrid primary key clustered ( iAttrId asc ) on [primary]
    ) on [primary]
end
go

if object_id ( 'dbo.Data', 'U' ) is null
begin
    create table dbo.Data
    (
        iDataId int not null,
        iAttrId int not null,
        constraint PK_Data_iDataId primary key clustered ( iDataId asc ) on [primary],
        constraint FK_Data_iAttrId foreign key ( iAttrId ) references dbo.AttrData ( iAttrId )
            on update cascade on delete no action
    ) on [primary]
end
go

if object_id ( 'dbo.FactStore', 'U' ) is null
begin
    create table dbo.FactStore
    (
        iDataId    int   not null,
        dtDate     date  not null,
        dtDateNext date  not null,
        fSum       float not null,
        constraint PK_FactStore_dtDate_dtDateNext_iDataId primary key clustered
            ( dtDate asc, dtDateNext desc, iDataId asc ) on [primary],
        constraint AK_FactStore_iDataId_dtDate unique nonclustered
            ( iDataId asc, dtDate asc ) on [primary],
        constraint FK_FactStore_iDataId foreign key ( iDataId ) references dbo.Data ( iDataId )
            on update cascade on delete no action,
        constraint FK_FactStore_dtDate foreign key ( dtDate ) references dbo.Dates ( dtDate )
            on update no action on delete no action,
        constraint FK_FactStore_dtDateNext foreign key ( dtDateNext ) references dbo.Dates ( dtDate )
            on update no action on delete no action
    ) on [primary]
end

Таблица dbo.FactsStore это таблица фактов, dbo.Data - сущности, dbo.AttrData и dbo.Dates - измерения. Создадим вспомогательную таблицу для возможности хранения данных в кубе только на уровне атрибутов iDataId без хранения самих iDataId:


if object_id ( 'dbo.Facts', 'U' ) is null
begin
    create table dbo.Facts
    (
        dtDate     date  not null,
        dtDateNext date  not null,
        fSum       float not null,
        iAttrId    int   not null,
        constraint PK_Facts_dtDate_dtDateNext_iAttrId primary key clustered
            ( dtDate asc, dtDateNext desc, iAttrId asc ) on [primary],
        constraint AK_Facts_iAttrId_dtDate unique nonclustered ( iAttrId asc, dtDate asc ) on [primary],
        constraint FK_Facts_iAttrId foreign key ( iAttrId ) references dbo.AttrData ( iAttrId )
            on update cascade on delete no action,
        constraint FK_Facts_dtDate foreign key ( dtDate ) references dbo.Dates ( dtDate )
            on update no action on delete no action,
        constraint FK_Facts_dtDateNext foreign key ( dtDateNext ) references dbo.Dates ( dtDate )
            on update no action on delete no action
    ) on [primary]
end
go

Создадим многомерную базу данных на основе этой реляционной схемы. Надо отметить, что для простоты в этом примере помимо измерения времени есть только одно измерение. Однако в реальности измерений может быть намного больше: десятки и сотни. Поэтому в целях уменьшения объема хранимых ячеек в кубе можно хранить данные так же как и в таблице фактов dbo.FactStore, то есть не на каждый день, а только за даты изменения мер. Ниже мы также посмотрим какие есть подводные камни на этом пути и как их обойти.

В среде Visual Studio создадим новый проект типа Analysis Services Multidimensional and Data Mining Project. Затем требуется выполнить такие тривиальные операции как создание источника данных и представления источника данных. После этого все готово для создания измерений. В измерениях можно и нужно создавать иерархии, но это не является главной темой статьи, поэтому таблицы измерений простые, содержат всего по одному атрибуту, в связи с чем пользовательские иерархии создавать не будем. Для удобства имена измерений и атрибутов оставим такими же как и в реляционной базе данных.

Создадим куб Facts на основе таблицы dbo.Facts. В нем будет одна группа мер с одной мерой fSum. Измерения времени в кубе назовем Dates и DatesNext. Приступим к разработке запросов. Для получения суммы fSum в группировке по iAttrId на определенный день (например, на 2017-05-12) в реляционной базе данных используется такой запрос:


select data.iAttrId, fSum = sum ( facts.fSum )
from
    dbo.FactStore facts
        inner join
    dbo.Data data on facts.iDataId = data.iDataId
where '2017-05-12' between facts.dtDate and facts.dtDateNext
group by data.iAttrId

Для того чтобы переписать этот запрос на языке MDX, вспомним, что в MDX есть операция пересечения множеств. То есть нужно взять измерение даты начала от начальной точки до интересующей даты и пересечь это множество с частью измерения даты окончания от даты отчета до конечной точки. Можно сделать это таким образом:


select
    Measures.fSum on columns,
    non empty AttrData.iAttrId.iAttrId on rows
from Facts
where
    (
        { Dates.dtDate.dtDate.[yyyy-mm-dd_min] : Dates.dtDate.dtDate.[2017-05-12] },
        { DatesNext.dtDate.dtDate.[2017-05-12] : DatesNext.dtDate.dtDate.[yyyy-mm-dd_max] }
    )

В запрос вместо символов yyyy-mm-dd_min, yyyy-mm-dd_max нужно подставить минимальную и максимальную дату во временном измерении. Здесь мы натыкаемся на проблему, которая состоит в том, что при большом числе ячеек в кубе и большом числе дней операция пересечения временных измерений начинает работать очень медленно. Выход из этой ситуации можно найти, применяя секционирование. Чтобы облегчить операцию поиска пересечений, было бы хорошо работать не с самого начала измерения Dates, а с некоторой не очень далекой точки, и не до самого конца измерения DatesNext, а до определенной концевой не очень далекой точки. Достичь этого можно следующим образом. Представим себе, что есть некоторый интервал, например, месяц. И за каждый такой месяц физически есть своя история изменения меры fSum для сущностей iDataId. То есть на каждую дату начала секции (месяца) в таблице хранятся состояния fSum для всех имеющихся в этот день сущностей. Если есть iDataId, у которого fSum не меняется с 20.05 по 05.06, то в таблице dbo.FactStore хранится не одна, а 2 строки. У первой будет дата окончания 31.05, у второй дата начала 01.06. С одной стороны это создает некоторую избыточность. Но с другой стороны облегчает и работу последующих запросов mdx и работу с самой реляционной таблицей: можно работать с месяцами как с различными секциями одной секционированной таблицы, что облегчает перезагрузку данных за периоды, уменьшает уровень блокировок. Данный подход является компромиссом между хранением данных на каждый день и хранением за периоды без секционирования. Наличие срезов на начало секций создает определенную избыточность, однако при не очень коротких секциях сокращение объема за счет хранения данных в пределах секции за период очень значительно по сравнению с хранением атрибутов на каждый день. Секционируя таблицу фактов, конечно, нужно секционировать и куб.

Для реализации этого подхода на стороне Analysis Services необходимо наполнить таблицу dbo.Facts. Делать это лучше посекционно. Код ниже показывает пример того как это можно сделать для одного месяца (05.2017).


if object_id ( 'tempdb..#Buf', 'U' ) is not null
begin
    drop table #Buf
end
select dates.dtDate, data.iAttrid, fSum = sum ( fSum )
from
    dbo.FactStore facts
        inner join
    dbo.Dates dates on dates.dtDate between facts.dtDate and facts.dtDateNext
        inner join
    dbo.Data data on data.iDataId = facts.iDataId
where facts.dtDate between '2017-05-01' and '2017-05-31'
group by dates.dtDate, data.iAttrId

create unique clustered index IX_iAttrId_dtDate on #Buf ( iAttrId asc, dtDate asc ) on [primary]

if object_id ( 'tempdb..#BufDiff', 'U' ) is not null
begin
    drop table #BufDiff
end
select iAttrId, dtDate, fSum
    into #BufDiff
from
(
    select
        iAttrId,
        dtDate,
        fSum,
        dtDate_Prev = lag ( dtDate ) over ( partition by iAttrId order by dtDate asc ),
        fSum_Prev = lag ( fSum ) over ( partition by iAttrId order by dtDate asc )
) data
    ( dtDate <> dateadd ( day, 1, dtDate_Prev ) or dtDate_Prev is null )
    or
    ( fSum <> fSum_Prev or fSum_Prev is null )
create unique clustered index IX_iAttrId_dtDate on #BufDiff ( iAttrId asc, dtDate asc ) on [primary]

if object_id ( 'tempdb..#BufEnds', 'U' ) is not null
begin
    drop table #BufEnds
end
select iAttrId, dtDate
    into #BufEnds
from #Buf curr
where not exists
(
    select *
    from #Buf nxt
    where
        curr.dtDate = dateadd ( day, -1, nxt.dtDate )
        and
        curr.iAttrId = nxt.iAttrId
)
create unique clustered index IX_iAttrId_dtDate on #BufEnds ( iAttrId asc, dtDate asc ) on [primary]

if object_id ( 'tempdb..#BufHist', 'U' ) is not null
begin
    drop table #BufHist
end
select buf.iAttrId, buf.dtDate, buf.fSum, dates.dtDateNext
    into #BufHist
from
    #BufDiff buf
        inner join
    (
        select iAttrId, dtDate,
            dtDateNext = isnull
            (
                dateadd ( day, -1, lead ( dtDate ) over ( partition by iAttrId order by dtDate asc ) ),
                '9999-12-31'
            )
    ) dates on
        buf.iAttrId = dates.iAttrId and
        buf.dtDate = dates.dtDate
create unique clustered index IX_iAttrId_dtDate_dtDateNext on #BufHist ( iAttrId asc, dtDate asc, dtDateNext desc ) on [primary]

update hist with ( tablock )
    set hist.dtDateNext = ends.dtDate
    from
        #BufHist hist
            inner join
        #BufEnds ends on
            hist.iAttrId = ends.iAttrId and
            ends.dtDate between hist.dtDate and dateadd ( day, -1, hist.dtDateNext )

/* вместо delete лучше секцинировать dbo.Facts и использовать switch partition (до ms sql 2016 для этого требуется вспомогательная буферная таблица) */
delete dbo.Facts with ( tablock )
    where dtDate between '2017-05-01' and '2017-05-31'

insert into dbo.Facts ( iAttrId, dtDate, dtDateNext, fSum )
    select iAttrId, dtDate, dtDateNext, fSum
    from #BufHist

В результате запрос на T-SQL перепишется так:


select data.iAttrId, fSum = sum ( facts.fSum )
from
    dbo.FactStore facts
        inner join
    dbo.Data data on facts.iDataId = data.iDataId
where
    dtDate between '2017-05-01' and '2017-05-31' and
    '2017-05-12' between facts.dtDate and facts.dtDateNext
group by data.iAttrId

, а запрос на MDX будет выглядеть так:

select
    Measures.fSum on columns,
    AttrData.iAttrId.iAttrId on rows
from Facts
where
    (
        { Dates.dtDate.dtDate.[2017-05-01] : Dates.dtDate.dtDate.[2017-05-12] },
        { DatesNext.dtDate.dtDate.[2017-05-12] : DatesNext.dtDate.dtDate.[2017-05-31] }
    )

Продолжим разработку запросов. Очень часто данные надо агрегировать не для одного дня, а для многих дней из определенного периода. Например, если требуется получить данные в группировке по дню и iAttrId за 05.2017, то на T-SQL можно написать такой запрос:


select data.iAttrId, dates.dtDate, fSum = sum ( facts.fSum )
from
    dbo.FactStore facts
        inner join
    dbo.Data data on facts.iDataId = data.iDataId
        inner join
    dbo.Dates dates on dates.dtDate between facts.dtDate and facts.dtDateNext
where
    dates.dtDate between '2017-05-01' and '2017-05-31' and
    facts.dtDate between '2017-05-01' and '2017-05-31'
group by data.iAttrId, dates.dtDate

Чтобы переписать этот запрос на MDX напишем для начала такую заготовку:


select
    Measures.fSum on columns,
    non empty
    (
        AttrData.iAttrId.iAttrId
        *
        (
            { Dates.dtDate.dtDate.[2017-05-01] : Dates.dtDate.dtDate.[2017-05-31] },
            { DatesNext.dtDate.dtDate }
        )
    ) on rows
from Facts

Данный запрос предоставляет нужные значения атрибутов, но не на каждый день периода, а только за те дни, когда для атрибута менялась мера. Честно говоря, не уверен, что средствами mdx, можно переписать запрос так, чтобы он из одной ячейки, соответствующей периоду в 5 дней, смог вернуть 5 ячеек на каждый день внутри периода. Предлагаю решить эту задачу, используя CLR-сборку, содержащую метод, который принимает на вход и возвращает многомерное множество. В коде предполагается, что во входном множестве первая координата это дата начала, а вторая - дата окончания. Исходный код выглядит следующим образом:


using System;
using System.Data;
using Microsoft.AnalysisServices;
using Microsoft.AnalysisServices.AdomdServer;

namespace Tasks
{
    public Set ConvertPeriods(Set InputSet)
    {
        SetBuilder SetB = new SetBuilder();
        foreach (Microsoft.AnalysisServices.AdomdServer.Tuple tp in InputSet.Tuples)
        {
            System.DateTime DateStart = System.DateTime.Parse(tp.Members[0].Name),
                DateEnd = System.DateTime.Parse(tp.Members[1].Name);

            for (int i = 0; i < (DateEnd - DateStart).TotalDays + 1; i++)
            {
                Expression exp = new Expression();

                System.DateTime dt = DateStart.AddDays(i);
                exp.ExpressionText = "Dates.dtDate.dtDate.[" + dt.Year.ToString() + "-" +
                    (dt.Month.ToString().Length == 1 ? "0" : "") + dt.Month.ToString() + "-" +
                    (dt.Day.ToString().Length == 1 ? "0" : "") + dt.Day.ToString() + "]";

                Member mem = exp.CalculateMdxObject(null).ToMember();
                TupleBuilder tpBuild = new TupleBuilder();

                tpBuild.Add(mem);
                for (int h = 2; h < tp.Members.Count; h++)
                {
                    tpBuild.Add(tp.Members[h]);
                }
                SetB.Add(tpBuild.ToTuple());
            }
        }
    }
}

Используя Management Studio или XML/A, можно развернуть сборку в многомерной базе данных Analysis Services. Пусть она называется MdTasks. Тогда запрос переписывается в таком виде:


select
    Measures.fSum on columns,
    non empty MdTasks.ConvertPeriods
    (
        nonempty
        (
            (
                { Dates.dtDate.dtDate.[2017-05-01] : Dates.dtDate.dtDate.[2017-05-31] },
                { DatesNext.dtDate.dtDate }
            )
            *
            AttrData.iAttrId.iAttrId
        )
    ) on rows
from Facts

Множество, передаваемое clr-функции, обернуто в функцию nonempty для ускорения работы метода. Теперь и на стороне MDX можно получать агрегаты на каждый день из кубов, хранящих данные в экономном формате за периоды изменения мер!


вторник, 1 августа 2017 г.

In-Memory базы данных Analysis Services

Для повышения производительности аналитических отчетов, использующих таблицы с большими объемами данных, постоянно разрабатываются и внедряются все новые технологии. В частности в СУБД MS SQL Server в 2012 году появились колоночные индексы, а начиная с MS SQL 2014 и далее развивается интеграция реляционного движка с таблицами, оптимизированными для оперативной памяти. В то же время еще в 2012 году в Analysis Services появилась возможность устанавливать его в одном из двух режимов: многомерном или табличном. В новом табличном режиме в Analysis Services можно создавать базы данных, содержащие табличные модели, которые хранятся в оперативной памяти. При этом таблицы в таких базах данных хранятся в колоночном режиме (каждый столбец хранится отдельно, с применением специальных алгоритмов сжатия и сбора статистики).

Если развитие реляционных таблиц, оптимизированных для оперативной памяти, замедляется необходимостью их интеграции с обилием компонентов, создававшихся десятилетиями, то табличный Analysis Services был создан с нуля, и для многих это может стать преимуществом. К примеру в MS SQL 2014 запросы к memory-таблицам выполняются на одном ядре, что автоматически делает их непригодными для больших и даже средних объемов в данной версии СУБД. В Analysis Services такой проблемы нет, там все распараллеливается по доступным ядрам. Рассмотрим пример того как создавать и делать запросы к табличным моделям и сравним их производительность с запросами к классическим реляционным таблицам.

Создадим среду тестирования с базой данных, состоящей из нескольких таблиц фактов и измерений.

if db_id ( 'Testing' ) is null
begin
    create database Testing
        on primary
        (
            name = TestingData,
            filename = 'C:\Users\dataUser\Desktop\TestingData.mdf',
            size = 1 Gb,
            maxsize = unlimited,
            filegrowth = 100 Mb
        )
        log on
        (
            name = TestingLog,
            filename = 'C:\Users\dataUser\Desktop\TestingLog.ldf',
            size = 1 Gb,
            maxsize = unlimited,
            filegrowth = 100 Mb
        )
end
go

alter database Testing set recovery simple
go

use Testing
go

if object_id ( 'dbo.Attrs', 'U' ) is null
begin
    create table dbo.Attrs
    (
        iAttrId int identity ( 1, 1 ) not null,
        vcAttr  varchar ( 100 )       not null,
        constraint PK_Attrs_iAttrId primary key clustered ( iAttrId asc ) on [primary]
    ) on [primary]
end
go

if object_id ( 'dbo.Entities', 'U' ) is null
begin
    create table dbo.Entities
    (
        iEntityId int identity ( 1, 1 ) not null,
        vcEntity  varchar ( 100 )       not null,
        iAttrId   int                   not null,
        constraint PK_Entities_iEntityId primary key clustered ( iEntityId asc ) on [primary],
        constraint FK_Entities_iAttrId foreign key ( iAttrId ) references dbo.Attrs ( iAttrId )
            on update cascade on delete no action
    ) on [primary]
end
go

if object_id ( 'dbo.Dates', 'U' ) is null
begin
    create table dbo.Dates
    (
        dtDate date not null,
        constraint PK_Dates_dtDate primary key clustered ( dtDate asc ) on [primary]
    ) on [primary]
end
go

if object_id ( 'dbo.FactsDayly', 'U' ) is null
begin
    create table dbo.FactsDayly
    (
        iEntityId int   not null,        
        dtDate    date  not null,
        fSum      float not null,
        constraint PK_FactsDayly_dtDate_iEntityId primary key clustered
            ( dtDate asc, iEntityId asc ) on [primary],
        constraint FK_FactsDayly_iEntityId foreign key ( iEntityId ) references dbo.Entities ( iEntityId )
            on update cascade on delete no action,
        constraint FK_FactsDayly_dtDate foreign key ( dtDate ) references dbo.Dates ( dtDate )
            on update cascade on delete no action
    ) on [primary]
end
go
if object_id ( 'dbo.FactsPeriods', 'U' ) is null
begin
    create table dbo.FactsPeriods
    (
        iEntityId    int   not null,        
        dtDate       date  not null,
        dtDateNext   date  not null,
        fSum         float not null,
        constraint PK_FactsPeriods_iEntityId_dtDate primary key clustered
            ( iEntityId asc, dtDate asc ) on [primary],
        constraint AK_FactsPeriods_dtDate_dtDateNext_iEntityId unique nonclustered
            ( dtDate asc, dtDateNext desc, iEntityId asc ) on [primary],
        constraint FK_FactsPeriods_iEntityId foreign key ( iEntityId ) references dbo.Entities ( iEntityId )
            on update cascade on delete no action,
        constraint FK_FactsPeriods_dtDate foreign key ( dtDate ) references dbo.Dates ( dtDate )
            on update no action on delete no action,
        constraint FK_FactsPeriods_dtDateNext foreign key ( dtDateNext ) references dbo.Dates ( dtDate )
            on update no action on delete no action
    ) on [primary]
end
go
Теперь запустим скрипты для генерации данных в этих таблицах достаточно большого объема, который поместится в память (~100 млн. строк).

set nocount, xact_abort on

declare @dtStart date = '2017-01-01', @dtEnd date = '2017-03-31'
;
with Dates
as
(
    select dtDate = @dtStart
        union all
    select dateadd ( day, 1, dtDate )
    from Dates
    where dtDate < @dtEnd
)
insert into dbo.Dates ( dtDate )
    select dtDate
    from Dates
    option ( maxrecursion 0 )

declare @iMin int = 1, @iMax int = 20
;
with Attrs
as
(
    select iAttr = @iMin
        union all
    select iAttr + 1
    from Attrs
    where iAttr < @iMax
)
insert into dbo.Attrs ( vcAttr )
    select cast ( iAttr as varchar ( 100 ) )
    from Attrs
    option ( maxrecursion 0 )

insert into dbo.Entities with ( tablock ) ( vcEntity, iAttrId )
    select top ( 40000000 )
        cast ( abs ( binary_checksum ( newid () ) ) as varchar ( 100 ) ),
        1 + abs ( binary_checksum ( newid () ) ) % 20
    from
        master.dbo.spt_values as tab1
            cross join
        master.dbo.spt_values as tab2
            cross join
        master.dbo.spt_values as tab3

go
alter table dbo.FactsDayly drop constraint PK_FactsDayly_dtDate_iEntityId 
declare @dtStart date = '2017-01-01', @dtEnd date = '2017-03-31', @dtCurr date
set @dtCurr = @dtStart
while @dtCurr <= @dtEnd
begin
    insert into dbo.FactsDayly with ( tablock )
    (
        iEntityId,
        dtDate,
        fSum
    )
    select top ( 1000000 ) iEntityId, @dtCurr, abs ( binary_checksum ( newid () ) ) * rand () / 1000
    from dbo.Entities
    order by binary_checksum ( newid () )
    
    set @dtCurr = dateadd ( day, 1, @dtCurr )
end
alter table dbo.FactsDayly add constraint PK_FactsDayly_dtDate_iEntityId
    primary key clustered ( dtDate asc, iEntityId asc ) on [primary]
go

alter table FactsPeriods drop constraint AK_FactsPeriods_dtDate_dtDateNext_iEntityId

if object_id ( 'tempdb..#Ents', 'U' ) is not null
begin
    drop table #Ents
end
create table #Ents ( iEntityId int not null, primary key clustered ( iEntityId asc ) on [primary] ) on [primary]

declare @iMin int = 1, @iMax int = 20, @iCurr int, @vcSql nvarchar ( max )
set @iCurr = @iMin
while @iCurr <= @iMax
begin
    truncate table #Ents
    insert into #Ents with ( tablock ) ( iEntityId )
        select top ( 900000 ) iEntityId
        from dbo.Entities
        where iAttrId = @iCurr
        order by newid ()

    insert into dbo.FactsPeriods with ( tablock )
    (
        iEntityId,
        dtDate,
        dtDateNext,
        fSum
    )
    select iEntityId, dtDate = '2017-01-01',
        dtDateNext = dateadd ( day, abs ( binary_checksum ( newid () ) ) % 20 + 1, '2017-01-01' ),
        fSum = abs ( binary_checksum ( newid () ) ) * rand () / 1000
    from #Ents

    set @vcSql = N'
    insert into dbo.FactsPeriods
    (
        iEntityId,
        dtDate,
        dtDateNext,
        fSum
    )
    select iEntityId, dtDate, dtDateNext, fSum = abs ( binary_checksum ( newid () ) ) * rand () / 1000
    from
    (
        select ents.iEntityId, dtDate = dateadd ( day, 1, per.dtDateNext ),
            dtDateNext = dateadd
            (
                day,
                abs ( binary_checksum ( newid () ) ) % 20 + 1,
                dateadd ( day, 1, per.dtDateNext )
            )
        from
            #Ents ents
                inner join
            (
                select iEntityId, dtDateNext = max ( dtDateNext )
                from dbo.FactsPeriods
                group by iEntityId
            ) per on ents.iEntityId = per.iEntityId
    ) dates
    where dtDateNext <= '2017-03-31'
    '
    exec sp_executesql @vcSql
    exec sp_executesql @vcSql
    exec sp_executesql @vcSql
    exec sp_executesql @vcSql
    
    set @iCurr += 1
end

alter table FactsPeriods add constraint AK_FactsPeriods_dtDate_dtDateNext_iEntityId
    unique nonclustered ( dtDate asc, dtDateNext desc, iEntityId asc ) on [primary]

Таблица FactsDayly хранит историю изменения некоторой характеристики сущностей из dbo.Entities на каждый день. Таблица dbo.FactsPeriods хранит историю изменения атрибута в более экономном формате: за даты изменения. Тестирование скорости работы отчетов в реляционной и In-Memory базах данных будет проведено с использованием этих двух таблиц. В распоряжении имеется сервер с 16 Гб памяти, четырех ядерным процессором Core i5. Для сервера зададим предел используемой памяти:

sp_configure 'max server memory', 8800
reconfigure
Цель отчета, который будет разработан, состоит в том, чтобы получить сумму атрибута fSum в группировке по атрибуту iAttrId на каждый день. При использовании таблицы dbo.FactsDayly запрос на Transact-SQL выглядит так:

dbcc setcpuweight ( 1000 ) with no_infomsgs
select ents.iAttrId, fSum = sum ( facts.fSum )
from
    dbo.FactsDayly facts
        inner join
    dbo.Entities ents on facts.iEntityId = ents.iEntityId
group by ents.iAttrId, facts.dtDate
Оптимизатор подсказывает такой индекс:

create nonclustered index IX_iEntityId_incl on dbo.FactsDayly ( iEntityId asc )
    include ( dtDate, fSum ) on [primary]
С ним запрос отработал за 31 сек. в первый раз (план выполнения заранее построен). Последующие запуски давали почти такой же результат. Уровень потребления памяти процессом sqlservr.exe составляет 7.7 Гб, однако предел который установлен для уровня памяти составляет 8.8 Гб. Остановимся пока на этом результате, и посмотрим как обстоят дела в In-Memory базе данных. Используя Visual Studio, можно создать проект типа Analysis Services Tabular Project. Для экономии памяти будем использовать workspace-базу данных в качестве основной. В открытом проекте, используя меню Model запускаем мастер импорта таблиц. В условиях небольшого объема памяти заставляем сервер ею поделиться:

sp_configure 'max server memory', 500
reconfigure
Для начала импортируем таблицы dbo.Dates, dbo.Attrs, dbo.Entities, dbo.FactsDayly. В получившейся табличной модели должны быть установлены отношения внешних ключей между таблицами. В данном случае, сервер, видя внешние ключи в реляционной базе данных, создает связи автоматически. В настройках импорта я исключил из таблицы dbo.Entities текстовый столбец: это делается для экономии памяти, и ввиду колоночного хранения данных, не должно улучшать производительность. Аналог запроса на T-SQL выглядит на языке DAX следующим образом:

evaluate
(
    summarize
    (
        FactsDayly,
        Attrs[iAttrId],
        Dates[dtDate],
        "fSumAgg",
        sum ( FactsDayly[fSum] )
    )
)
Если вы используете не самую современную версию SQL Server Management Studio и под рукой нет стороннего редактора запросов DAX, то данный запрос можно выполнить в редакторе MDX. Если же его требуется выполнить из среды T-SQL, то сделать это можно, создав связанный сервер и запустив запрос через openquery либо exec ... at ... Запустив этот запрос в первый раз, получаем результат за 6 секунд, в последующие запуски время выполнения сократилось до одной секунды! Уровень потребления памяти базой данных Analysis Services составляет 8.6 Гб. Мы получили две базы данных, отчеты которых работают принципиально разное время, в ситуации когда обе базы используют примерно одинаковый уровень ресурсов и идентичное оборудование. Однако для реляционных БД также можно использовать колоночные индексы. Поэтому не все возможности реляционного сервера были использованы. Построим некластерный колоночный индекс на таблице dbo.FactsDayly:

create nonclustered columnstore index IX_col on dbo.FactsDayly ( iEntityId, dtDate, fSum ) on [primary]
Перезапустив запрос на T-SQL, видим в плане, что сканирование сжатого колоночного индекса в пакетном режиме делает свое дело: среднее время выполнения сократилось до 17 сек. Уровень потребления памяти при этом достиг допустимого максимума в 8.6 Гб. Результат намного лучше чем 31 сек., но все же не дотягивает до скорости DAX-запроса, работая в 17 раз медленнее! В ходе данного тестирования In-Memory база данных показала свою силу и преимущества в создании высокоскоростных аналитических отчетов, работающих с большими массивами данных. Рассмотрим теперь пример, где эти преимущества окажутся еще более явными.

При проектировании таблиц фактов может потребоваться их хранение не на каждый день, а только за даты изменения меры, как это сделано в таблице dbo.FactsPeriods. Такой подход позволяет сэкономить значительный объем места, которое занимает таблица, что серьезно облегчает ее сканирование и снижает требования к объему дисков. Для получения агрегатов в группировке по dtDate и iAttrId на основе таблицы dbo.FactsPeriods требуется такой чуть более сложный запрос на T-SQL:


dbcc setcpuweight ( 1000 ) with no_infomsgs
select ents.iAttrId, dates.dtDate, fSum = sum ( pers.fSum )
from
    dbo.FactsPeriods pers
        inner join
    dbo.Entities ents on pers.iEntityId = ents.iEntityId
        inner join
    dbo.Dates dates on dates.dtDate between pers.dtDate and pers.dtDateNext
group by ents.iAttrId, dates.dtDate
Перед его запуском зададим новое значение для максимального уровня памяти, доступного серверу:

exec sp_configure 'max Server memory', 8300
reconfigure

На кэшированных данных запрос работает 44 сек. Проверим его скорость с колоночным индексом:

create nonclustered columnstore index IX_col on dbo.FactsPeriods
    ( iEntityId, dtDate, dtDateNext, fSum ) on [primary]
С колоночным индексом запрос работает 18 секунд (добавление колоночного индекса на dbo.Entities ситуацию не изменило). Теперь ограничим уровень потребления памяти сервером:

exec sp_configure 'max Server memory', 500
reconfigure
В целях экономии памяти удалим из табличной модели In-Memory базы данных таблицу FactsDayly и добавим в модель таблицу FactsPeriods. После загрузки данных в память таблица FactsDayly должна ссылаться на таблицу Entities по столбцу iEntityId и на таблицу Dates по двум столбцам dtDate и dtDateNext. Переведем запрос с языка T-SQL на язык DAX следующим образом:

evaluate
    generate
    (
        values ( Dates[dtDate] ),
        calculatetable
        (
            summarize
            (
                FactsPeriods,
                Attrs[iAttrId],
                "fSumAgg",
                sum ( FactsPeriods[fSum] )
            ),
            filter
            (
                FactsPeriods,
                FactsPeriods[dtDate] <= Dates[dtDate]
                &&
                FactsPeriods[dtDateNext] >= Dates[dtDate]
            )
        )
    )
В первый раз запрос отработал 4 секунды, в последующем - за одну секунду! Уровень потребления памяти In-Memory базой данных оставался при этом на уровне 8.3 Гб.

Если вы будете вводить In-Memory базы данных в промышленную эксплуатацию, конечно, требуют автоматизации задачи администрирования. Например, задачи загрузки данных в память. Используя SQL Server Profiler, несложно увидеть команды на языке XML/A для перезагрузки таблиц. Данные команды также можно запускать на стороне T-SQL через связанный сервер. Наполнять таблицы можно с опцией ProcessAdd, если требуется перегружать не всю таблицу, а лишь вставить новые строки. Либо можно использовать опцию ProcessFull полной перезагрузки данных. Также как и в реляционных базах данных, таблицы в табличных моделях можно секционировать. Например, можно секционировать таблицы фактов по дате, благодаря чему можно не тратить много времени на полную перезагрузку, а перегружать лишь нужные секции. Дополнительно меня порадовало то, что в качестве источника строк для таблицы может выступать хранимая процедура (это недоступно в многомерном Analysis Services). Правда, в табличных моделях в отличие от многомерных баз данных, к сожалению, пока еще нельзя создавать своих clr-сборок.

Сам же язык DAX является мощным и активно развивающимся языком, предоставляющим много конструкций для перевода задач с языка T-SQL на DAX, обеспечивая огромную скорость. Используя SQL Server Profiler, можно отслеживать события, возникающие при выполнения DAX-запросов, например, сканирование VertiPaq. Также профайлером можно находить логический и физический планы выполнения запросов DAX, которые можно анализировать в текстовом виде, что дает возможность заниматься полноценной оптимизацией DAX-запросов. Технология, предоставляемая табличным Analysis Services, еще не используется активно в нашей стране, призываю всех активно начать ее использовать!

воскресенье, 4 сентября 2016 г.

Отслеживание событий сервера с помощью уведомлений о событиях

В MS SQL 2005 вместе с компонентом Service Broker появился новый способ наблюдения за событиями на сервере. Это объект уведомление о событиях. Данный объект создается на уровне сервера с привязкой к определенной базе данных. Объект подписывается на набор событий, о которых он получает уведомления в момент когда события происходят.
Особенность уведомлений о событиях состоит в том, что они интегрированы с Service Broker. В результате появляется возможность автоматического запуска своего кода в процедуре активации именно в тот момент, когда возникает событие. Для сохранения информации о событии такой подход гораздо удобнее чем периодическая проверка хранилища логов в файловой системе или где-то еще, как того требуют другие средства логирования (например, профайлер). Это создает сходство с ddl-триггерами, только тут нельзя делать откат. В качестве примера создадим уведомление о событии, которое позволяет автоматически сохранять в таблицах структурированные данные о графе взаимоблокировок. Для начала создадим базу данных с таблицами, в которых будут храниться наши логи.

use master
go

if db_id ( N'LogStore' ) is null
begin
 create database LogStore
  on primary
  (
   name  = LogStoreData,
   filename = N'C:\В\DbData\LogStoreData.mdf',
   size  = 100 Mb,
   filegrowth = 100 Mb,
   maxsize  = unlimited
  )
  log on
  (
   name  = LogStoreLog,
   filename = 'C:\В\TranLog\LogStoreLog.ldf',
   size  = 100 Mb,
   filegrowth = 100 Mb,
   maxsize  = unlimited
  )
end
go

use LogStore
go

-- Событие графа взаимоблокировки несет в себе информацию об участниках взаимоблокировки.
-- Каждый участник имеет набор атрибутов. Для их хранения создадим два справочника.
if object_id ( N'dbo.AttrTypes', N'U' ) is null
begin
 create table dbo.AttrTypes
 (
  iAttrTpId smallint identity ( 1, 1 ) not null,
  vcAttrTpName varchar ( 100 )   not null,
  constraint PK_AttrTypes_iAttrTpId primary key clustered
   ( iAttrTpId asc ) on [primary],
  constraint AK_AttrTypes_vcAttrTpName unique nonclustered
   ( vcAttrTpName asc ) on [primary]
 ) on [primary]
end
go

insert into dbo.AttrTypes ( vcAttrTpName )
 values
 ( 'База данных' ),
 ( 'Логин' ),
 ( 'Хост' ),
 ( 'Уровень изоляции транзакций' ),
 ( 'Приложение' ),
 ( 'Файл базы данных' ),
 ( 'Объект' ),
 ( 'Индекс' ),
 ( 'Тип блокировки' )
go

if object_id ( N'dbo.Attrs', N'U' ) is null
begin
 create table dbo.Attrs
 (
  iAttrId   int identity ( 1, 1 ) not null,
  iAttrTpId  smallint  not null,
  vcAttrName  varchar ( 100 )  not null,
  constraint PK_Attrs_iAttrId primary key clustered
   ( iAttrId asc ) on [primary],
  constraint AK_Attrs_vcAttrName_iAttrTpId unique nonclustered
   ( vcAttrName asc, iAttrTpId asc ) on [primary],
  constraint FK_Attrs_iAttrTpId foreign key ( iAttrTpId )
   references dbo.AttrTypes ( iAttrTpId ) on update cascade on delete no action
 ) on [primary]
end
go

-- Следующая таблица содержит одну строку на каждое событие. Здесь только идентификатор события и время.
if object_id ( N'dbo.Deadlocks', N'U' ) is null
begin
 create table dbo.Deadlocks
 (
  iEventId int identity ( 1, 1 ) not null,
  dtDate  datetime  not null,
  constraint PK_Deadlocks_iEventId primary key clustered
   ( iEventId asc ) on [primary]
 ) on [primary]
end
go

-- Таблица с перечнем участников взаимоблокировки. В ней перечислены атрибуты участников, а также полем с признаком жертвы.
if object_id ( N'dbo.DeadlockChains', N'U' ) is null
begin
 create table dbo.DeadlockChains
 (
  iEventId  int   not null,
  iProcessId  int identity ( 1, 1 ) not null,
  vcProcessId  varchar ( 20 )  not null,
  bVictim   bit   not null,
  vcResource  varchar ( 100 )  not null,
  iLoginId  int   not null,
  iHostId   int   not null,
  iIsolationId  int   not null,
  iAppId   int   not null,
  dtLastBatchStart datetime  not null,
  dtLastBathEnd  datetime  not null,
  iLockId   int   not null,
  vcBatch   varchar ( 8000 ) not null,
  constraint PK_DeadlockChains_iProcessId primary key clustered
   ( iProcessId asc ) on [primary],
  constraint FK_DeadlockChains_iEventId foreign key ( iEventId )
   references dbo.Deadlocks ( iEventId ) on update no action on delete no action,
  constraint FK_DeadlockChains_iLoginId foreign key ( iLoginId )
   references dbo.Attrs ( iAttrId ) on update no action on delete no action,
  constraint FK_DeadlockChains_iHostId foreign key ( iHostId )
   references dbo.Attrs ( iAttrId ) on update no action on delete no action,
  constraint FK_DeadlockChains_iIsolationId foreign key ( iIsolationId )
   references dbo.Attrs ( iAttrId ) on update no action on delete no action,
  constraint FK_DeadlockChains_iAppId foreign key ( iAppId )
   references dbo.Attrs ( iAttrId ) on update no action on delete no action,
  constraint FK_DeadlockChains_iLockId foreign key ( iLockId )
   references dbo.Attrs ( iAttrId ) on update no action on delete no action
 ) on [primary]
end
go

-- Здесь хранится перечень ресурсов, которые образовали цепочку.
if object_id ( N'dbo.DeadlockRes', N'U' ) is null
begin
 create table dbo.DeadlockRes
 (
  iEventId int   not null,
  iResId  int identity ( 1, 1 ) not null,
  iDbId  int   not null,
  iFileId  int   not null,
  iPageId  int   not null,
  iObjId  int   not null,
  iIndId  int   not null,
  iHobtId  bigint   not null,
  constraint PK_DeadlockRes_iResId primary key clustered
   ( iResId asc ),
  constraint FK_DeadlockRes_iEventId foreign key ( iEventId )
   references dbo.Deadlocks ( iEventId ) on update cascade on delete no action,
  constraint FK_DeadlockRes_iDbId foreign key ( iDbId )
   references dbo.Attrs ( iAttrId ) on update no action on delete no action,
  constraint FK_DeadlockRes_iFileId foreign key ( iFileId )
   references dbo.Attrs ( iAttrId ) on update no action on delete no action,
  constraint FK_DeadlockRes_iObjId foreign key ( iObjId )
   references dbo.Attrs ( iAttrId ) on update no action on delete no action,
  constraint FK_DeadlockRes_iIndId foreign key ( iIndId )
   references dbo.Attrs ( iAttrId ) on update no action on delete no action
 ) on [primary]
end
go

-- У каждого ресурса может быть перечень владельцев.
if object_id ( N'dbo.ResOwners', N'U' ) is null
begin
 create table dbo.ResOwners
 (
  iResId  int not null,
  iProcessId int not null,
  iModeId  int not null,
  constraint PK_ResOwners_iResId_iProcessId primary key clustered
   ( iResId asc, iProcessId asc ) on [primary],
  constraint FK_ResOwners_iResId foreign key ( iResId )
   references dbo.DeadlockRes ( iResId ) on update cascade on delete no action,
  constraint FK_ResOwners_iProcessId foreign key ( iProcessId )
   references dbo.DeadlockChains ( iProcessId ) on update cascade on delete no action
 ) on [primary]
end
go

-- У каждого ресурса может быть перечень процессов, ожидающих получения блокировки на данный ресурс.
if object_id ( N'dbo.ResWaiters', N'U' ) is null
begin
 create table dbo.ResWaiters
 (
  iResId  int not null,
  iProcessId int not null,
  iModeId  int not null,
  constraint PK_ResWaiters_iResId_iProcessId primary key clustered
   ( iResId asc, iProcessId asc ) on [primary],
  constraint FK_ResWaiters_iResId foreign key ( iResId )
   references dbo.DeadlockRes ( iResId ) on update cascade on delete no action,
  constraint FK_ResWaiters_iProcessId foreign key ( iProcessId )
   references dbo.DeadlockChains ( iProcessId ) on update cascade on delete no action
 ) on [primary]
end
go
В результате получилась такая диаграмма:






















Теперь создадим объекты брокера в базе данных. Понадобится создать только очередь и службу, так как для типов сообщений и контракта используются специальные системные объекты, имеющиеся в каждой базе данных.

alter database LogStore set enable_broker with rollback immediate
go

if object_id ( N'dbo.LogDeadlockGraph', N'P' ) is null
begin
 exec sp_executesql N'create proc dbo.LogDeadlockGraph as return'
end
go

if object_id ( N'dbo.Logger', N'SQ' ) is null
begin
 create queue dbo.Logger
  with
   status = on,
   retention = off,
   activation
   (
    status = on,
    procedure_name = dbo.LogDeadlockGraph,
    max_queue_readers = 4,
    exec as owner
   )
end
go

if not exists
(
 select *
 from sys.services
 where name = N'LogService'
)
begin
 create service LogService
  on queue dbo.Logger ( [http://schemas.microsoft.com/SQL/Notifications/PostEventNotification] )
end
go

if not exists
(
 select *
 from sys.server_event_notifications
 where name = N'ServerLogger'
)
begin
 create event notification ServerLogger
  on server
  for deadlock_graph
  to service 'LogService', 'current database'
end
go
Теперь обновим процедуру активации. Данная процедура будет читать очередь, получая XML-представление о событии. Для парсинга XML используются методы типа данных XML и выражения языка XQuery. Общая схема работы процедуры состоит в следующем.

1. Наполняем табличную переменную с перечнем участников взаимоблокировки.
2. Наполняем справочник значениями атрибутов участников.
3. Наполняем нормализованную таблицу dbo.DeadlockChains
4. Наполняем табличную переменную списков ресурсов с их блокировками от участников. При этом для простоты рассматриваем случай, когда ресурсы это строки или ключи.
5. Наполняем справочник атрибутами ресурсов
6. Наполняем нормализованную таблицу dbo.DeadlockRes. При этом для наполнения используем merge+output для сохранения соответствия узлов ресурсов и идентификаторов ресурсов.
7. Наполняем таблицы с владельцами и заблокированными соединениями.

Для конвертации xml-документа в таблицу используем метод nodes. Ниже приведен полный код хранимой процедуры.

alter proc dbo.LogDeadlockGraph
as
begin
 set nocount, xact_abort on
 
 begin try
  declare @xBody xml, @vcVictProcessId varchar ( 100 ), @dtEventTime datetime, @iEventId int, @vcSql nvarchar ( max ), @vcErr nvarchar ( max )
  begin tran
   ;
   receive top ( 1 ) @xBody = try_convert ( xml, message_body )
   from dbo.Logger
   if @@rowcount = 0
   begin
    return
   end
   
   declare @tblProcesses table
   (
    vcId  varchar ( 100 )  not null,
    vcResource varchar ( 100 )  not null,
    vcLogin  varchar ( 100 )  not null,
    vcHost  varchar ( 100 )  not null,
    vcIsolation  varchar ( 100 )  not null,
    vcApp  varchar ( 100 )  not null,
    dtStart  datetime   not null,
    dtEnd  datetime   not null,
    vcBatch  varchar ( 8000 )  not null,
    vcLock  varchar ( 100 )   not null
   )
   
   insert into @tblProcesses
   (
    vcId,
    vcResource,
    vcLogin,
    vcHost,
    vcIsolation,
    vcApp,
    dtStart,
    dtEnd,
    vcBatch,
    vcLock
   )
   select
    vcId  = left ( isnull ( process.value ( '(@id)[1]', 'nvarchar ( max )' ), '' ), 100 ),
    vcResource = left ( isnull ( process.value ( '(@waitresource)[1]', 'nvarchar ( max )' ), '' ), 100 ),
    vcLogin  = left ( isnull ( process.value ( '(@loginname)[1]', 'nvarchar ( max )' ), '' ), 100 ),
    vcHost  = left ( isnull ( process.value ( '(@hostname)[1]', 'nvarchar ( max )' ), '' ), 100 ),
    vcIsolation = left ( isnull ( process.value ( '(@isolationlevel)[1]', 'nvarchar ( max )' ), '' ), 100 ),
    vcApp  = left ( isnull ( process.value ( '(@clientapp)[1]', 'nvarchar ( max )' ), '' ), 100 ),
    dtStart  = isnull ( process.value ( '(@lastbatchstarted)[1]', 'nvarchar ( max )' ), '1900-01-01' ),
    dtEnd  = isnull ( process.value ( '(@lastbatchcompleted)[1]', 'nvarchar ( max )' ), '1900-01-01' ),
    vcBatch  = left ( isnull ( process.value ( '(inputbuf)[1]', 'nvarchar ( max )' ), '' ), 8000 ),
    vcLock  = left ( isnull ( process.value ( '(@lockMode)[1]', 'nvarchar ( max )' ), '' ), 100 )
   from @xBody.nodes ( '/EVENT_INSTANCE/TextData/deadlock-list/deadlock/process-list/process' ) processes ( process )
   
   ;
   with Attrs
   as
   (
    select vcAttr, vcAttrTp
    from
    (
     select distinct vcLogin, N'Логин' from @tblProcesses
     union all
     select distinct vcHost, N'Хост' from @tblProcesses
     union all
     select distinct vcIsolation, N'Уровень изоляции транзакций' from @tblProcesses
     union all
     select distinct vcApp, N'Приложение' from @tblProcesses
     union all
     select distinct vcLock, N'Тип блокировки' from @tblProcesses
    ) attrs ( vcAttr, vcAttrTp )
   )
   insert into dbo.Attrs ( iAttrTpId, vcAttrName )
    select attrtp.iAttrTpId, newattrs.vcAttr
    from
     Attrs newattrs
      inner join
     dbo.AttrTypes attrtp on newattrs.vcAttrTp = attrtp.vcAttrTpName
      left outer join
     dbo.Attrs attrs on
      attrs.iAttrTpId = attrtp.iAttrTpId and
      attrs.vcAttrName = newattrs.vcAttr
    where attrs.iAttrId is null
   set @vcVictProcessId = @xBody.value ( '(/EVENT_INSTANCE/TextData/deadlock-list/deadlock/@victim)[1]',
       'nvarchar ( max )' )
   set @dtEventTime = (
    select dtEnd
    from @tblProcesses
    where vcId = @vcVictProcessId
   )
   
   insert into dbo.Deadlocks ( dtDate )
    values ( @dtEventTime )
   set @iEventId = scope_identity ()
   
   ;
   with AttrData
   as
   (
    select attrs.iAttrId, attrs.vcAttrName, attrtp.vcAttrTpName
    from
     dbo.Attrs attrs
      inner join
     dbo.AttrTypes attrtp on attrs.iAttrTpId = attrtp.iAttrTpId
   )
   insert into dbo.DeadlockChains
   (
    iEventId, vcProcessId, bVictim, vcResource, iLoginId, iHostId, iIsolationId, iAppId,
    dtLastBatchStart, dtLastBathEnd, iLockId, vcBatch
   )
   select @iEventId, vcId, case when vcId = @vcVictProcessId then 1 else 0 end, vcResource,
    attrlog.iAttrId, attrhost.iAttrId, attrlevel.iAttrId, attrapp.iAttrId, processes.dtStart, processes.dtEnd,
    attrlock.iAttrId, processes.vcBatch
   from
    @tblProcesses processes
     inner join
    AttrData attrlog on processes.vcLogin = attrlog.vcAttrName and attrlog.vcAttrTpName = N'Логин'
     inner join
    AttrData attrhost on processes.vcHost = attrhost.vcAttrName and attrhost.vcAttrTpName = N'Хост'
     inner join
    AttrData attrlevel on processes.vcIsolation = attrlevel.vcAttrName and attrlevel.vcAttrTpName = N'Уровень изоляции транзакций'
     inner join
    AttrData attrapp on processes.vcApp = attrapp.vcAttrName and attrapp.vcAttrTpName = N'Приложение'
     inner join
    AttrData attrlock on processes.vcLock = attrlock.vcAttrName and attrlock.vcAttrTpName = N'Тип блокировки'
   
   declare @tblResData table
   (
    iResId  int  not null,
    iDbId  int  not null,
    iPageId  int  not null,
    iFileId  int  not null,
    iHobtId  bigint  not null,
    vcObj  varchar ( 600 ) not null,
    vcIndex  varchar ( 200 ) not null,
    vcMode  varchar ( 100 ) not null,
    vcOwner  varchar ( 100 ) not null,
    vcOwnerMode varchar ( 100 ) not null,
    vcWaiter varchar ( 100 ) not null,
    vcWaiterMode varchar ( 100 ) not null
   )
   
   set @vcSql = N'
   select
    ResData.iResId,
    ResData.iDb,
    ResData.iPage,
    ResData.iFile,
    ResData.iHobtId,
    ResData.vcObj,
    ResData.vcIndex,
    ResData.vcMode,
    vcOwner   = left ( isnull ( own.value ( ''(/owner-list/owner/@id)[1]'', ''nvarchar ( max )'' ), '''' ), 100 ),
    vcOwnerMode  = left ( isnull ( own.value ( ''(/owner-list/owner/@mode)[1]'', ''nvarchar ( max )'' ), '''' ), 100 ),
    vcWaiter  = left ( isnull ( wait.value ( ''(/waiter-list/waiter/@id)[1]'', ''nvarchar ( max )'' ), '''' ), 100 ),
    vcWaiterMode = left ( isnull ( wait.value ( ''(/waiter-list/waiter/@mode)[1]'', ''nvarchar ( max )'' ), '''' ), 100 )
   from
    (
     select
      iResId = row_number () over ( order by ( select 1 ) ),
      iDb  = isnull ( ResData.Res.value ( ''(@dbid)[1]'', ''int'' ), -1 ),
      iPage = isnull ( ResData.Res.value ( ''(@pageid)[1]'', ''int'' ), -1 ),
      iFile = isnull ( ResData.Res.value ( ''(@fileid)[1]'', ''int'' ), -1 ),
      iHobtId = isnull ( ResData.Res.value ( ''(@hobtid)[1]'', ''bigint'' ), -1 ),
      vcObj = left ( isnull ( ResData.Res.value ( ''(@objectname)[1]'', ''nvarchar ( max )'' ), '''' ), 100 ),
      vcIndex = left ( isnull ( ResData.Res.value ( ''(@indexname)[1]'', ''nvarchar ( max )'' ), '''' ), 100 ),
      vcMode = left ( isnull ( ResData.Res.value ( ''(@mode)[1]'', ''nvarchar ( max )'' ), '''' ), 100 ),
      xOwner = ResData.Res.query ( ''owner-list'' ),
      xWaiter = ResData.Res.query ( ''waiter-list'' )
     from @xBody.nodes ( ' +
      iif
      (
       @xBody.exist ( '/EVENT_INSTANCE/TextData/deadlock-list/deadlock/resource-list/ridlock' ) = 1,
       '''/EVENT_INSTANCE/TextData/deadlock-list/deadlock/resource-list/ridlock''',
       '''/EVENT_INSTANCE/TextData/deadlock-list/deadlock/resource-list/keylock'''
      )
     + ' ) ResData ( Res )
    ) ResData
     cross apply
    ResData.xOwner.nodes ( ''.'' ) owners ( own )
     cross apply
    ResData.xWaiter.nodes ( ''.'' ) waiters ( wait )
   '
   insert into @tblResData
   (
    iResId,
    iDbId,
    iPageId,
    iFileId,
    iHobtId,
    vcObj,
    vcIndex,
    vcMode,
    vcOwner,
    vcOwnerMode,
    vcWaiter,
    vcWaiterMode
   ) exec sp_executesql @vcSql, N'@xBody xml', @xBody = @xBody
   
   ;
   with AttrData
   as
   (
    select distinct vcAttrTpName, vcAttrName
    from
    (
     select distinct 'База данных', isnull ( db_name ( iDbId ), '' ) from @tblResData
     union all
     select distinct 'Файл базы данных', isnull ( dbfil.name, '' )
     from @tblResData fildata left outer join sys.master_files dbfil on
        fildata.iDbId = dbfil.database_id and fildata.iFileId = dbfil.file_id
     union all
     select distinct 'Объект', vcObj from @tblResData
     union all
     select distinct 'Индекс', vcIndex from @tblResData
     union all
     select distinct 'Тип блокировки', vcMode from @tblResData
     union all
     select distinct 'Тип блокировки', vcOwnerMode from @tblResData
     union all
     select distinct 'Тип блокировки', vcWaiterMode from @tblResData
    ) attr ( vcAttrTpName, vcAttrName )
   )
   insert into dbo.Attrs ( iAttrTpId, vcAttrName )
    select attrtp.iAttrTpId, attrdata.vcAttrName
    from
     AttrData attrdata
      inner join
     dbo.AttrTypes attrtp on attrdata.vcAttrTpName = attrtp.vcAttrTpName
      left outer join
     dbo.Attrs attr on
      attr.iAttrTpId = attrtp.iAttrTpId and
      attr.vcAttrName = attrdata.vcAttrName
    where attr.iAttrId is null
   
   declare @tblResIds table ( iResId int not null, iResNewId int not null )
   
   ;
   with AttrData
   as
   (
    select attrs.iAttrId, attrs.vcAttrName, attrtp.vcAttrTpName
    from
     dbo.Attrs attrs
      inner join
     dbo.AttrTypes attrtp on attrs.iAttrTpId = attrtp.iAttrTpId
   )
   merge dbo.DeadlockRes as trg
   using
   (
    select iResId, iEventId = @iEventId, iDbId = attrdb.iAttrId, iFileId = attrfil.iAttrId,
        res.iPageId, iObjId = attrobj.iAttrId, iIndId = attrind.iAttrId, res.iHobtId
    from
     (
      select distinct
       iResId,
       iDbId,
       iPageId,
       iFileId,
       iHobtId,
       vcObj,
       vcIndex,
       vcMode
      from @tblResData
     ) res
      inner join
     AttrData attrdb on attrdb.vcAttrTpName = N'База данных' and attrdb.vcAttrName = isnull ( db_name ( res.iDbId ), '' )
      inner join
     AttrData attrobj on attrobj.vcAttrTpName = N'Объект' and attrobj.vcAttrName = res.vcObj
      inner join
     AttrData attrind on attrind.vcAttrTpName = N'Индекс' and attrind.vcAttrName = res.vcIndex
      left outer join
     sys.master_files fil on
      fil.database_id = res.iDbId and
      fil.file_id = res.iFileId
      
      inner join
     AttrData attrfil on attrfil.vcAttrTpName = N'Файл базы данных' and attrfil.vcAttrName = isnull ( fil.name, '' )
   ) as src on 1 = 0
   when not matched then insert ( iEventId, iDbId, iFileId, iPageId, iObjId, iIndId, iHobtId )
    values ( src.iEventId, src.iDbId, src.iFileId, src.iPageId, src.iObjId, src.iIndId, src.iHobtId )
   output src.iResId, inserted.iResId
   into @tblResIds ( iResId, iResNewId )
   ;
   
   insert into dbo.ResOwners ( iResId, iProcessId, iModeId )
    select resids.iResNewId, pr.iProcessId, attr.iAttrId
    from
     @tblResData resdata
      inner join
     @tblResIds resids on resdata.iResId = resids.iResId
      inner join
     dbo.DeadlockChains pr on
      pr.iEventId = @iEventId and
      pr.vcProcessId = resdata.vcOwner
      
      inner join
     (
      select attr.iAttrId, attr.vcAttrName
      from
       dbo.Attrs attr
        inner join
       dbo.AttrTypes attrtp on
        attr.iAttrTpId = attrtp.iAttrTpId and
        attrtp.vcAttrTpName = 'Тип блокировки'
     ) attr on resdata.vcOwnerMode = attr.vcAttrName
   
   insert into dbo.ResWaiters ( iResId, iProcessId, iModeId )
    select resids.iResNewId, pr.iProcessId, attr.iAttrId
    from
     @tblResData resdata
      inner join
     @tblResIds resids on resdata.iResId = resids.iResId
      inner join
     dbo.DeadlockChains pr on
      pr.iEventId = @iEventId and
      pr.vcProcessId = resdata.vcWaiter
      
      inner join
     (
      select attr.iAttrId, attr.vcAttrName
      from
       dbo.Attrs attr
        inner join
       dbo.AttrTypes attrtp on
        attr.iAttrTpId = attrtp.iAttrTpId and
        attrtp.vcAttrTpName = 'Тип блокировки'
     ) attr on resdata.vcWaiterMode = attr.vcAttrName
  commit tran
 end try
 begin catch
  if xact_state () <> 0
  begin
   rollback tran
  end
  set @vcErr = concat ( error_message (), ' ', error_number (), ' ', error_line () )
  ; throw 60000, @vcErr, 1
 end catch
end
go

Протестируем работу уведомления, создав классическую взаимоблокировку. Откроем два соединения. В первом запустим такой код:

create table ##data1 ( i int )
create table ##data2 ( i int )

begin tran

insert into ##data1 ( i )
 values ( 1 )

Во втором соединении запустим код:

begin tran

insert into ##data2 ( i )
 values ( 1 )
В первом попробуем обратиться к таблице, заблокированной вторым соединением:

update ##data2
 set i += 1
Аналогично поступим со вторым соединением:

update ##data1
 set i += 1
В течение нескольких секунд менеджер блокировок обнаруживает неразрешимую цепочку и выбирает второе соединение в качестве жертвы взаимоблокировки. Сделав запрос к таблицам, видим результаты.

declare @iEventId int = 1
select evn.vcProcessId, evn.bVictim, evn.vcResource,
 vcLogin = attrlog.vcAttrName,
 vcHost = attrhost.vcAttrName,
 vcIsolation = attrisol.vcAttrName,
 vcApp = attrapp.vcAttrName,
 evn.dtLastBatchStart, evn.dtLastBathEnd,
 vcLock = attrlock.vcAttrName,
 evn.vcBatch
from
 dbo.DeadlockChains evn
  inner join
 dbo.Attrs attrlog on evn.iLoginId = attrlog.iAttrId
  inner join
 dbo.Attrs attrhost on evn.iHostId = attrhost.iAttrId
  inner join
 dbo.Attrs attrisol on evn.iIsolationId = attrisol.iAttrId
  inner join
 dbo.Attrs attrapp on evn.iAppId = attrapp.iAttrId
  inner join
 dbo.Attrs attrlock on evn.iLockId = attrlock.iAttrId
where evn.iEventId = @iEventId

select vcDb = attrdb.vcAttrName, vcDbFile = attrfil.vcAttrName, res.iPageId, res.iHobtId,
 vcObj = attrobj.vcAttrName, vcIndex = attrind.vcAttrName,
 vcOwner = resowns.vcProcessId, vcOwnerLockMode = lockown.vcAttrName, vcWaiter = reswait.vcProcessId,
 vcWaiterLockMode = lockwait.vcAttrName
from
 dbo.DeadlockRes res
  inner join
 dbo.Attrs attrdb on res.iDbId = attrdb.iAttrId
  inner join
 dbo.Attrs attrfil on res.iFileId = attrfil.iAttrId
  inner join
 dbo.Attrs attrobj on res.iObjId = attrobj.iAttrId
  inner join
 dbo.Attrs attrind on res.iIndId = attrind.iAttrId
  inner join
 dbo.ResOwners own on res.iResId = own.iResId

  inner join
 dbo.DeadlockChains resowns on
  resowns.iEventId = res.iEventId and
  own.iProcessId = resowns.iProcessId
  
  inner join
 dbo.ResWaiters wait on res.iResId = wait.iResId
  inner join
 dbo.DeadlockChains reswait on
  reswait.iEventId = res.iEventId and
  wait.iProcessId = reswait.iProcessId

  inner join
 dbo.Attrs lockown on own.iModeId = lockown.iAttrId
  inner join
 dbo.Attrs lockwait on wait.iModeId = lockwait.iAttrId
where res.iEventId = @iEventId







Полный перечень наборов событий можно посмотреть в специальном системном представлении sys.event_notification_event_types. Тут есть много интересных событий. Например, можно получать уведомления при обнаружении поврежденной страницы в базе данных, нехватки памяти при операциях сортировки или построении hash-таблицы, изменении уровня потребления памяти сервером, расширении файлов базы, эскалации блокировок, изменении в доступах, настройках базы или сервера, наличии заблокированных соединений и многом другом.

воскресенье, 17 июля 2016 г.

Зеркальное отображение (безопасность на основе сертификатов, следящий сервер).

Сегодня речь пойдет о реализации такого решения высокого уровня доступности как зеркалирование баз данных. Это очень полезное и легкое в развертывании решение, которое может значительно повысить доступность приложений. Когда зеркалирование появилось в MS SQL 2005 его даже стали называть убийцей кластеров за отсутствие сложных административных настроек, легкость в реализации, возможность автоматической отработки отказа. При этом в отличие от кластеров здесь нет общего дискового хранилища. Вместо этого каждый участник сеанса зеркального отображения может располагаться на собственных дисках, что еще больше повышает безопасность. В качестве примера создадим простую базу данных с одной таблицей для хранения документов. В данном решении будут задействованы 3 сервера: основной, к которому могут подключаться приложения, зеркальный с синхронизированной резервной копией данных и следящий, который нужен для автоматической отработки отказа (если работа будет настроена в безопасном режиме).

if db_id ( N'Docs' ) is null
begin
 create database Docs
  on primary
  (
   name  = DocData,
   filename = 'C:\В\DbData\DocData.mdf',
   size  = 10 Mb,
   filegrowth = 10 Mb,
   maxsize  = unlimited
  )
  log on
  (
   name  = DocLog,
   filename = 'C:\В\TranLog\DocLog.ldf',
   size  = 10 Mb,
   filegrowth = 10 Mb,
   maxsize  = unlimited
  )
end
go

alter database Docs set recovery full
go

use Docs
go

create table dbo.Documents
(
 iDocId int identity ( 1, 1 ) not null,
 vcDoc varchar ( 100 )  not null,
 varDoc varbinary ( max ) not null,
 constraint PK_Documents_iDocId primary key clustered ( iDocId asc ) on [primary],
 constraint AK_Documents_vcDoc unique nonclustered ( vcDoc asc ) on [primary]
) on [primary]
go

insert into dbo.Documents ( vcDoc, varDoc )
 values ( 'Test1', 0x )
go
Теперь для подготовки сеанса зеркального отображения необходимо сделать полный бэкап базы и бэкап ее журнала транзакций:
backup database Docs
 to disk = 'C:\В\Docs.bak'
 with format, init, compression, stats = 1, norewind, checksum
go

backup log Docs
 to disk = 'C:\В\Docs.trn'
 with format, init, compression, stats = 1, norewind, checksum
go
Созданные резервные копии необходимо восстановить на зеркальном сервере с опцией norecovery.

if db_id ( N'Docs' ) is null
begin
 restore database Docs
  from disk = 'C:\В\Docs.bak'
  with
   move 'DocData' to 'C:\Program Files\Microsoft SQL Server\MSSQL12.MIRROR\MSSQL\DATA\DocData.mdf',
   move 'DocLog' to 'C:\Program Files\Microsoft SQL Server\MSSQL12.MIRROR\MSSQL\DATA\DocLog.ldf',
   stats = 1,
   norecovery
 restore log Docs
  from disk = 'C:\В\Docs.trn'
  with
   stats = 1,
   norecovery
end
go
Приступим к настройке объектов для передачи данных между серверами. Понадобятся конечные точки и учетные записи. Безопасность конечных точек лучше настраивать с помощью сертификатов (а не Windows). В этом случае не придется беспокоиться о том под какими учетными записями работают службы сервера (локальными или доменными), входят ли компьютеры в один домен или в разные домены с определенными отношениями (или же находятся вне доменов).

Итак, на основном сервере требуется создать логин для доступа к зеркальному и следящему серверам, настроить сертификат, создать конечную точку, подготовить резервные копии сертификата для восстановления на двух остальных серверах.

use master
go

if suser_id ( N'MainRepl' ) is null
begin
 create login MainRepl
  with
  password  = N'2#$@#!',
  check_policy  = off,
  default_database = master,
  default_language = russian
end
go

use master
go

if database_principal_id ( N'MainReplUser' ) is null
begin
 create user MainReplUser
  from login MainRepl
  with default_schema = dbo
end
go

use master
go

if not exists
(
 select *
 from sys.symmetric_keys
 where name = N'##MS_DatabaseMasterKey##'
)
begin
 create master key
  encryption by password = N'$R#tt5feR#R#D'
end
go

use master
go

if cert_id ( N'MainReplHost' ) is null
begin
 create certificate MainReplHost
  authorization dbo
  with subject = N'Сертификат главного сервера.',
  expiry_date = '2017-12-31',
  start_date = '2016-07-01'
end
go

if not exists
(
 select *
 from sys.endpoints
 where name = N'Mirroring'
)
begin
 create endpoint Mirroring
  state = started
  as tcp
  (
   listener_port = 7024,
   listener_ip = all
  )
  for database_mirroring
  (
   authentication = certificate MainReplHost,
   encryption = required algorithm aes,
   role = all
  )
end
go

backup certificate MainReplHost
 to file = 'C:\Program Files\Microsoft SQL Server\MSSQL12.MIRROR\MSSQL\DATA\MainReplHost.cer'
go
backup certificate MainReplHost
 to file = 'C:\Program Files\Microsoft SQL Server\MSSQL12.WITNESS\MSSQL\DATA\MainReplHost.cer'
go
В коде, приведенном выше, перед созданием сертификата в master был создан главный ключ БД для защиты сертификата, так как последний не защищен паролем. Для конечной точки прописана безопасность на основе сертификата. Резервная копия сертификата создана дважды: в доступных местах для зеркального и следящего серверов (следящий и зеркальный сервера на моем компьютере работают под локальными учетными записями). Аналогичные действия требуется выполнить на двух оставшихся серверах. Вот код для зеркального сервера:

use master
go

if suser_id ( N'MirrorRepl' ) is null
begin
 create login MirrorRepl
  with
  password  = N'2#$@#!',
  check_policy  = off,
  default_database = master,
  default_language = russian
end
go

use master
go

if database_principal_id ( N'MirrorReplUser' ) is null
begin
 create user MirrorReplUser
  from login MirrorRepl
  with default_schema = dbo
end
go

use master
go

if not exists
(
 select *
 from sys.symmetric_keys
 where name = N'##MS_DatabaseMasterKey##'
)
begin
 create master key
  encryption by password = N'^YgfeHEF$#23'
end
go

use master
go

if cert_id ( N'MirrorReplHost' ) is null
begin
 create certificate MirrorReplHost
  authorization dbo
  with subject = N'Сертификат зеркального сервера.',
  expiry_date = '2017-12-31',
  start_date = '2016-07-01'
end
go

if not exists
(
 select *
 from sys.endpoints
 where name = N'Mirroring'
)
begin
 create endpoint Mirroring
  state = started
  as tcp
  (
   listener_port = 7025,
   listener_ip = all
  )
  for database_mirroring
  (
   authentication = certificate MirrorReplHost,
   encryption = required algorithm aes,
   role = all
  )
end
go

backup certificate MirrorReplHost
 to file = 'C:\В\MirrorReplHost.cer'
go
backup certificate MirrorReplHost
 to file = 'C:\Program Files\Microsoft SQL Server\MSSQL12.WITNESS\MSSQL\DATA\MirrorReplHost.cer'
go
Здесь все аналогично коду для основного сервера, только для конечной точки указан другой порт из-за того что в данном примере все 3 сервера находятся на одном компьютере. Наконец приведем такой же код для следящего сервера (у конечной точки опять-таки порт отличен от портов конечных точек основного и зеркального серверов).

use master
go

if suser_id ( N'WitnessRepl' ) is null
begin
 create login WitnessRepl
  with
  password  = N'erg343grF#$FE',
  check_policy  = off,
  default_database = master,
  default_language = russian
end
go

use master
go

if database_principal_id ( N'WitnessReplUser' ) is null
begin
 create user WitnessReplUser
  from login WitnessRepl
  with default_schema = dbo
end
go

use master
go

if not exists
(
 select *
 from sys.symmetric_keys
 where name = N'##MS_DatabaseMasterKey##'
)
begin
 create master key
  encryption by password = N'we32@#ESe'
end
go

use master
go

if cert_id ( N'WitnessReplHost' ) is null
begin
 create certificate WitnessReplHost
  authorization dbo
  with subject = N'Сертификат следящего сервера.',
  expiry_date = '2017-12-31',
  start_date = '2016-07-01'
end
go

if not exists
(
 select *
 from sys.endpoints
 where name = N'Mirroring'
)
begin
 create endpoint Mirroring
  state = started
  as tcp
  (
   listener_port = 7023,
   listener_ip = all
  )
  for database_mirroring
  (
   authentication = certificate WitnessReplHost,
   encryption = required algorithm aes,
   role = all
  )
end
go

backup certificate WitnessReplHost
 to file = 'C:\В\WitnessReplHost.cer'
go
backup certificate WitnessReplHost
 to file = 'C:\Program Files\Microsoft SQL Server\MSSQL12.MIRROR\MSSQL\DATA\WitnessReplHost.cer'
go
После того как все конечные точки созданы необходимо обеспечить доступ с каждого сервера на 2 других сервера. Для этого для сертификата и логина на каждом сервере необходимо на двух других серверах восстановить резервную копию сертификата и создать одноименного пользователя с правами на конечную точку и на сертификат. Так это делается на основном сервере (пароли на каждом сервере не обязаны совпадать).

use master
go

if suser_id ( N'MirrorRepl' ) is null
begin
 create login MirrorRepl
  with
   password  = 'EG#WferWF43edf',
   check_policy  = off,
   default_database = master,
   default_language = russian
end
go
if suser_id ( N'WitnessRepl' ) is null
begin
 create login WitnessRepl
  with
   password  = 'rf34ewfFWfdsd#@#$$',
   check_policy  = off,
   default_database = master,
   default_language = russian
end
go

use master
go
if database_principal_id ( N'MirrorReplUser' ) is null
begin
 create user MirrorReplUser
  from login MirrorRepl
  with default_schema = dbo
end
go
if database_principal_id ( N'WitnessReplUser' ) is null
begin
 create user WitnessReplUser
  from login WitnessRepl
  with default_schema = dbo
end
go

use master
go
if cert_id ( N'MirrorReplHost' ) is null
begin
 create certificate MirrorReplHost
  authorization MirrorReplUser
  from file = 'C:\В\MirrorReplHost.cer'
end
go
if cert_id ( N'WitnessReplHost' ) is null
begin
 create certificate WitnessReplHost
  authorization WitnessReplUser
  from file = 'C:\В\WitnessReplHost.cer'
end
go

use master
go
grant connect on endpoint::Mirroring to MirrorRepl, WitnessRepl
go
Аналогичный код прогоняется на зеркальном сервере (создаются логины и пользователи для основного и следящего серверов, создаются сертификаты из резервных копий, пользователи делаются владельцами своих сертификатов, логинам предоставляется доступ на подключение к конечной точке).

use master
go

if suser_id ( N'MainRepl' ) is null
begin
 create login MainRepl
  with
   password  = '43#T%Fev',
   check_policy  = off,
   default_database = master,
   default_language = russian
end
go
if suser_id ( N'WitnessRepl' ) is null
begin
 create login WitnessRepl
  with
   password  = 'dgf3t4SVSDv',
   check_policy  = off,
   default_database = master,
   default_language = russian
end
go

use master
go
if database_principal_id ( N'MainReplUser' ) is null
begin
 create user MainReplUser
  from login MainRepl
  with default_schema = dbo
end
go
if database_principal_id ( N'WitnessReplUser' ) is null
begin
 create user WitnessReplUser
  from login WitnessRepl
  with default_schema = dbo
end
go

use master
go
if cert_id ( N'MainReplHost' ) is null
begin
 create certificate MainReplHost
  authorization MainReplUser
  from file = 'C:\Program Files\Microsoft SQL Server\MSSQL12.MIRROR\MSSQL\DATA\MainReplHost.cer'
end
go
if cert_id ( N'WitnessReplHost' ) is null
begin
 create certificate WitnessReplHost
  authorization WitnessReplUser
  from file = 'C:\Program Files\Microsoft SQL Server\MSSQL12.MIRROR\MSSQL\DATA\WitnessReplHost.cer'
end
go

use master
go
grant connect on endpoint::Mirroring to MainRepl, WitnessRepl
go
В последний раз запустим аналогичный код для сервере-свидетеле:

use master
go

if suser_id ( N'MainRepl' ) is null
begin
 create login MainRepl
  with
   password  = 'rfT#$FRged',
   check_policy  = off,
   default_database = master,
   default_language = russian
end
go
if suser_id ( N'MirrorRepl' ) is null
begin
 create login MirrorRepl
  with
   password  = 'e3e#Rwd',
   check_policy  = off,
   default_database = master,
   default_language = russian
end
go

use master
go
if database_principal_id ( N'MainReplUser' ) is null
begin
 create user MainReplUser
  from login MainRepl
  with default_schema = dbo
end
go
if database_principal_id ( N'MirrorReplUser' ) is null
begin
 create user MirrorReplUser
  from login MirrorRepl
  with default_schema = dbo
end
go

use master
go
if cert_id ( N'MainReplHost' ) is null
begin
 create certificate MainReplHost
  authorization MainReplUser
  from file = 'C:\Program Files\Microsoft SQL Server\MSSQL12.WITNESS\MSSQL\DATA\MainReplHost.cer'
end
go
if cert_id ( N'MirrorReplHost' ) is null
begin
 create certificate MirrorReplHost
  authorization MirrorReplUser
  from file = 'C:\Program Files\Microsoft SQL Server\MSSQL12.WITNESS\MSSQL\DATA\MirrorReplHost.cer'
end
go

use master
go
grant connect on endpoint::Mirroring to MainRepl, MirrorRepl
go
Все готово к включению сеанса зеркального отображения. Для этого сперва надо на зеркальном сервере указать основной сервер в качестве партнера:
alter database Docs set partner = 'tcp://LAPTOP-LBO51UJL:7024'
Выше в качестве значения свойства partner указывается полное доменное имя и через двоеточие порт конечной точки на другом сервере (впереди пишется "tcp://"). Следующим шагом нужно на основном сервере указать адрес зеркального:
alter database Docs set partner = 'tcp://LAPTOP-LBO51UJL:7025'
Зеркальное отображение заработало. Надо отметить что зеркалирование может работать в одном из двух режимов: режим высокой безопасности либо режим высокой производительности. При втором режиме фиксация транзакции происходит асинхронно относительно передачи данных на зеркальный сервер. Этот режим подходит для серверов, в которых не происходят массовые операции и допустима некоторая потеря данных. В асинхронном режиме недоступна автоматическая отработка отказа. Если же требуется наивысший уровень безопасности и нет массовых операций, то можно использовать зеркалирование с синхронном режиме. Для асинхронного режима требуется выполнить инструкцию на основном сервере:
alter database Docs set partner safety off
Опция full позволяет работать в синхронном режиме:
alter database Docs set partner safety full
На основном или зеркальном сервере можно выполнить запрос:
select db_name ( database_id ), *
from sys.database_mirroring
where  db_name ( database_id ) = 'Docs'
Это позволит проверить роль текущего сервера (основной или зеркальный), состояние сеанса зеркалирования (синхронизировано либо идет прием новых строк журнала транзакций), адрeса партнеров и следящего сервера, состояние соединения со свидетелем и режим зеркалирования.
Для выполнения ручной отработки отказа, нужно на сервере, являющемся на данный момент основным выполнить код (требуется перевод в режим высокой безопасности):
use master alter database Docs set partner failover
В этот момент на сервере, который был основным, база данных Docs перейдет в состояние restoring, а на сервере, который был зеркальным, она станет доступной. Перенос данных уже пойдет в обратную сторону: от зеркального сервера к основному. Эти изменения немедленно отразятся и в представлении sys.database_mirroring. Зеркальное отображение можно приостановить, если в этом есть необходимость, а затем продолжить. Для этого используются соответственно инструкции:
alter database Docs set partner suspend
alter database Docs set partner resume
Надо только помнить, что если сеанс приостановлен, то журнал транзакций в базе на основном сервере не будет усекаться до тех пор пока зеркало не станет синхронизированным с основным. Для удаления сеанса используется значение off для опции partner:
alter database Docs set partner off
Если основной сервер стал недоступным и не произошло отработки отказа (при этом следящего нет либо он подключен к зеркальному), то можно принудительно сделать доступным зеркальный сервер. Это приведет к остановке зеркалирования и возможно потере данных, которые не были реплицированы с основного сервера. Зеркальный сервер делается доступным с помощью такой инструкции:
alter database Docs set partner force_service_allow_data_loss
Подключим теперь следящий сервер к основному для возможности автоматической отработки отказа:
alter database Docs set witness = 'tcp://LAPTOP-LBO51UJL:7023'
Для отключения следящего сервера достаточно выполнить код:
alter database Docs set witness off
На следящем сервере можно делать запросы к представлению sys.database_mirroring_witnesses для просмотра состояния сеанса зеркалирования. Теперь подключения приложений могут автоматически перенаправляться на доступный сервер. Для этого требуется указать дополнительный атрибут в строке подключения. Например, на C# можно так создать экземпляр подключения:
using (SqlConnection cn = new SqlConnection(@"Data Source=LAPTOP-LBO51UJL;Failover Partner=LAPTOP-LBO51UJL\MIRROR;Initial Catalog=Docs;Integrated Security=True;"))
Если в момент отработки отказа соединение будет выполнять какую-либо работу с базой, то активная транзакция откатится, но последующие подключения пройдут без проблем.
То что со свидетелем работает автоматическая отработка отказа, это здорово: повышается доступность приложений. Но хорошо бы получать автоматические уведомления о факте отработки отказа. Приведу пример того как это сделать с помощью уведомлений о событиях. Для удобства на основном и зеркальном серверах создадим базу данных с таблицей, в которой будет храниться лог сообщений о состоянии сеанса зеркалирования. Также понадобится настройка объектов Service Broker и создание уведомления о событии. Итак, код, приведенный ниже, нужно выполнить на основном и зеркальном серверах:

if db_id ( 'CheckData' ) is null
begin
 create database CheckData
end
go
alter database CheckData set enable_broker
go

use CheckData
go
if object_id ( 'dbo.DbMirrorLog', 'U' ) is null
begin
 create table dbo.DbMirrorLog
 (
  dtDate datetime  not null,
  vcLog varchar ( max )  not null,
  vcDb varchar ( 200 )  not null,
  iState int   not null,
  iRowId int identity ( 1, 1 ) not null,
  constraint PK_bMirrorLog primary key clustered ( dtDate asc, iRowId asc ) on [primary]
 ) on [primary]
end
go

if object_id ( 'dbo.LogMirrorState', 'P' ) is null
begin
 exec sp_executesql 'create proc dbo.LogMirrorState as return'
end
go

alter proc dbo.LogMirrorState
as
begin
 declare @xMes xml, @uidHan uniqueidentifier

 ;
 receive top ( 1 ) @xMes = message_body, @uidHan = conversation_handle
 from dbo.MirrorNotify

 if @@rowcount = 0
 begin
  return
 end

 insert into dbo.DbMirrorLog
 (
  dtDate,
  vcLog,
  vcDb,
  iState
 )
 values
 (
  isnull ( @xMes.value ( '(/EVENT_INSTANCE/PostTime)[1]', 'datetime' ), getdate () ),
  isnull ( @xMes.value ( '(/EVENT_INSTANCE/TextData)[1]', 'varchar ( max )' ), '' ),
  isnull ( @xMes.value ( '(/EVENT_INSTANCE/DatabaseName)[1]', 'varchar ( max )' ), '' ),
  isnull ( @xMes.value ( '(/EVENT_INSTANCE/State)[1]', 'int' ), -1 )
 )
end
go

if object_id ( 'dbo.MirrorNotify', 'SQ' ) is null
begin
 create queue dbo.MirrorNotify
 with
  status = on,
  activation
  (
   status = on,
   procedure_name = dbo.LogMirrorState,
   max_queue_readers = 4,
   exec as owner
  )
 on [default]
end
go

if not exists
(
 select *
 from sys.services
 where name = 'MirrorLog'
)
begin
 create service MirrorLog
  on queue MirrorNotify ( [http://schemas.microsoft.com/SQL/Notifications/PostEventNotification] )
end
go

if not exists
(
 select *
 from sys.server_event_notifications
 where name = 'DbMirrorLog'
)
begin
 create event notification DbMirrorLog
  on server for DATABASE_MIRRORING_STATE_CHANGE
  to service 'MirrorLog', 'current database'
end
go

Можно выполнить несколько мануальных отработок отказа, а затем на основном сервере выполнить инструкцию shutdown для его отключения (либо отключить его в диспетчере конфигураций). При этом следящий сервер сразу определит, что основной недоступен и выполнит отработку отказа на зеркало. Затем нужно включить основной сервер, и он синхронизируется с бывшим зеркальным и сам станет зеркальным. Если сделать запросы к таблице DbMirrorLog, то увидим такой лог:










Если настроить на сервере Database Mail, то в процедуре активации очереди можно вызывать процедуру msdb.dbo.sp_send_dbmail для мгновенного оповещения об отработке отказа.

суббота, 9 июля 2016 г.

Исключения в logon-триггерах

Logon-триггеры, которые появились в MS SQL 2005 предоставляют удобный механизм для логирования подключений. Подключения могут протоколироваться и средствами профайлера и другими методами слежения за процессами. Однако Logon-триггеры превратили процесс подключения в транзакцию, которую при желании можно откатить. Например, можно выполнять проверки для разрешения или запрета на подключение к серверу.

Приведем такой пример. Создадим базу данных для хранения настроек и логов:

use master
go


if db_id ( N'LogStore' ) is null
begin
        create database LogStore
        on primary
        (
                name = LogData,
                filename = N'C:\Users\В\Desktop\LogData.mdf',
                size = 100 Mb,
                filegrowth = 100 Mb,
                maxsize = unlimited
        )
        log on
        (
                name = LogDataLog,
                filename = N'C:\Users\В\Desktop\LogDataLog.ldf',
                size = 100 Mb,
                filegrowth = 100 Mb,
                maxsize = unlimited
        )

end

go


Создадим таблицу, где будут перечислены компьютеры, с которых запрещено подключаться к серверу:
use LogStore
go


if object_id ( N'dbo.ExHosts', N'U' ) is null

begin
        create table dbo.ExHosts
        (
                iHostId int identity ( 1, 1 ) not null,
                vcHost nvarchar ( 200 ) not null,
                constraint PK_ExHosts_iHostId primary key clustered ( iHostId asc ) on [primary],
                constraint AK_ExHosts_vcHost unique nonclustered ( vcHost asc ) on [primary]
        )
end

go


Логон-триггер может проверять компьютер соединения и откатывать транзакцию, если окажется, что в таблице есть строка с таким компьютером. Перед созданием триггера надо определиться с тем под какой учетной записью он будет работать (ведь не стоит давать всем доступ на базу и на таблицу). Создадим такую учетную запись и предоставим ей права:

use master

go


if suser_id ( N'LogLogin' ) is null

begin


create login LogLogin
        with
                   password = N'qw3@',
                   check_policy = off,
                   default_database = LogStore,
                   default_language = russian

end

go


use LogStore


go


if database_principal_id ( N'LogUser' ) is null

begin

        create user LogUser
                from login LogLogin
                with default_schema = dbo
end

go


if database_principal_id ( N'LogRole' ) is null

begin
        create role LogRole
                  authorization dbo

end

go

grant select on object::dbo.ExHosts to LogRole

go


alter role LogRole add member LogUser

go




Теперь можно создать триггер на вход:

use master

go


if not exists
(
        select *
        from sys.server_triggers
        where name = N'CheckHost'
)
begin
        exec sp_executesql N'create trigger CheckHost on all server for logon as return'

end
go


alter trigger CheckHost on all server
with exec as 'LogLogin'
for logon

as
begin
        set nocount, xact_abort on
        if exists
        (
                select *
                from LogStore.dbo.ExHosts
                where vcHost = isnull ( host_name (), N'' )
        )
        begin
                ; throw 60000, N'Попытка входа с запрещенного хоста.', 1
                if @@trancount > 0
                begin
                        rollback tran
                end
        end
end
go




При попытке зайти на сервер с запрещенного компьютера произойдет ошибка. Например, если попытаться сделать такое подключение в Management Studio, то появится сообщение об ошибке:

Текста сообщения об ошибке, написанного в коде триггера, здесь не видно. Но такие ошибки попадают в журнал сервера, где их можно обнаружить:


Казалось бы все неплохо. Но при работе с logon-триггерами есть один подводный камень, состоящий в следующем. Код в таких триггерах выполняется в контексте неявной транзакции, опция xact_abort всегда включена. Любое исключение, возникающее из-за процессов, которые не предусмотрел разработчик, приводит к откату этой транзакции и, как следствие, невозможности подключения к серверу. В худшем случае, когда ошибка происходит при любой попытке входа, создается ситуация, когда никто не может подключиться к серверу. Никакая обработка ошибок, например, с помощью try/catch, не поможет, поскольку, как было отмечено выше, транзакция уже откатилась, и ничто не поможет. В этой ситуации можно входить на сервер используя некоторые специальные режимы. Например, режим приоритетного соединения администратора позволяет зайти на сервер, поскольку на такое соединение триггеры не действуют. Для этого требуется ввести в командной строке: sqlcmd -S <имя сервера> -E -A. После чего требуется ввести текст команды: disable trigger CheckHost on all server. Вместо командной строки можно использовать и Management Studio, если в окне подключения перед именем сервера написать: admin:. Возможно, также потребуется произвести эти действия с серверного компьютера (если отключена опция remote admin connection).

Хотел бы предложить подход, который минимизирует вероятность ошибок в работе триггера. Во-первых, логика работы кода должна быть как можно более простой. Пусть это будут простые проверки. Если требуются более сложные способы обработки данных, не связанные с проверкой возможности подключения, то лучше вынести их в другие, нетранзакционные, средства отслеживания событий. Во-вторых, нужно проанализировать все выполняемые в триггере действия по порядку. В нашем случае они такие:
1. Триггер работает от имени логина LogLogin. Пока триггер работает в его контексте удалить логин не получится. Здесь все в порядке.
2. Делается попытка просмотра объекта в базе данных LogStore. Значит, чтобы все было хорошо, требуется, чтобы существовала база данных с таким именем, она должна быть доступной, и текущая учетная запись должна иметь права на подключение к ней.
3. Производится попытка запроса к таблице dbo.ExHosts. Для ее успеха требуется наличие таблицы с таким именем и права на запросы к ней для текущей учетной записи.
4. В запросе к таблице делается условие на столбец. Поэтому требуется удостовериться, что у заданной таблицы существует столбец с таким именем и что его тип данных такой какой мы ожидаем: nvarchar.

Итак, вставим в текст триггера все эти проверки, сделав его работу максимально безопасной. Ниже идет полный код с моими комментариями.

alter trigger CheckHost on all server
with exec as 'LogLogin'
for logon
as
begin

        set nocount, xact_abort on

        -- проверим возможность выполнения запросов к системныму каталогу с перечнем баз данных
    if isnull ( has_perms_by_name ( 'sys.databases', 'object', 'select', null, null ), 0 ) = 0
    begin
        return
    end

    -- существование базы данных с заданным именем в состоянии online
    if not exists
    (
        select *
        from sys.databases
        where name = 'LogStore' and state_desc = 'ONLINE'
    )
    begin
        return
    end

    -- возможность подключения к базе данных
    if isnull ( has_perms_by_name ( 'LogStore', 'database', 'connect', null, null ), 0 ) = 0
    begin
        return
    end

    -- наличие таблицы
    if object_id ( N'LogStore.dbo.ExHosts', N'U' ) is null
    begin
        return
    end

    -- наличие доступов на select к таблице (эта функция корректно учтет наличие несколько доступов и запретов)
    if isnull ( has_perms_by_name ( 'LogStore.dbo.ExHosts', 'object', 'select', null, null ), 0 ) = 0
    begin
        return
    end

    -- проверка существования столбца с определенным именем
    declare @vcSql nvarchar ( max ), @iColId int
    set @vcSql = N'use LogStore set @iColId = columnproperty ( object_id ( N''dbo.ExHosts'', N''U'' ), N''vcHost'', N''ColumnId'' )'
    exec sp_executesql @vcSql, N'@iColId int out', @iColId = @iColId out
    if @iColId is null
    begin
        return
    end

    -- возможность выполнения запросов в системному представлению с перечнем столбцов
    declare @bPerm int
    set @vcSql = N'use LogStore set @bPerm = has_perms_by_name ( ''LogStore.sys.columns'', ''object'', ''select'', null, null )'
    exec sp_executesql @vcSql, N'@bPerm int out', @bPerm = @bPerm out
    if @bPerm = 0
    begin
        return
    end


    -- наличие у столбца нужного типа данных
    if not exists
    (
        select *
        from LogStore.sys.columns
        where
            name = 'vcHost' and
            object_id = object_id ( N'LogStore.dbo.ExHosts', N'U' ) and
            type_name ( system_type_id ) = 'nvarchar' and
            type_name ( user_type_id ) = 'nvarchar' and
            max_length = 400
    )
    begin
        return
    end
    if exists
    (
        select *
        from LogStore.dbo.ExHosts
        where vcHost = isnull ( host_name (), N'' )
    )
    begin
        ; throw 60000, N'Попытка входа с запрещенного компьютера.', 1
        if @@trancount > 0
        begin
            rollback tran
        end
    end
end
go

В коде делается проверка возможности всех действий, которые выполняются при проверке законности входа. Причем, если для проверки возможности действия, происходит обращение к системному объекту, то предварительно делается проверка возможности обращения к системному объекту. Теперь, если, скажем, изменить имя столбца (exec LogStore..sp_rename 'dbo.ExHosts.vcHost', 'vcName', 'column') или сделать запрет для учетной записи на запросы к таблице (exec ( N'use LogStore deny select on object::dbo.ExHosts to LogUser' )) или вообще удалить учетную запись, триггер спокойно продолжит свою работу. В этой ситуации он, конечно, не сможет выполнять свои прямые обязанности по проверке законности подключений. Но в большинстве случаев это лучше чем совсем никого не пускать.