В MS SQL 2008 появилось новое средство слежения за данными: cdc, change data capture - отслеживание измененных данных. Этот механизм позволяет логировать изменения в таблице. Наибольший интерес представляет лог операций insert, update, delete. Однако кроме этого можно отслеживать и изменения в структуре таблицы. Рассмотрим на примере какие возможности есть у cdc и как с их помощью можно получить снэпшот таблицы на любой момент времени. Создадим для тестирования отдельную базу данных:
use master
create database Watcher
use Watcher
Создадим таблицу, которую мы наполним тестовыми данными:
create
table dbo.Logg
(
i int not null,
k int not null,
constraint
PK_Loggs_i_k primary
key clustered ( i asc, k asc ) on [PRIMARY]
)
on [PRIMARY]
go
insert
into dbo.Logg ( i, k )
values
( 1,
1 ),
( 1,
2 ),
( 1,
3 ),
( 2,
1 ),
( 2,
2 ),
( 2, 3 )
Для включения cdc сперва требуется разрешить его на уровне всей базы. Для этого запускаем такой код:
exec sys.sp_cdc_enable_db
Теперь можно проверить, что для базы данных включено cdc:
select
is_cdc_enabled,
*
from
sys.databases
where name = db_name ()
Следующим шагом нужно включить cdc на уровне таблицы:
exec
sys.sp_cdc_enable_table
@source_schema =
'dbo',
@source_name =
'Logg',
@role_name =
'LogData',
@capture_instance = 'Watcher',
@filegroup_name =
[PRIMARY],
@index_name =
PK_Loggs_i_k,
@supports_net_changes = 1,
@captured_column_list = 'i, k'
Когда процедура завершает работу, то для таблицы dbo.Logg создается специальная системная таблица, в которой фиксируются изменения от операций dml. Эта таблица содержится в создаваемой также схеме cdc. В этой схеме есть также таблица ddl_history для учета изменений в структуре таблицы. Также в схеме есть различные системные функции для просмотра информации об изменениях в таблице. Лучше пользоваться ими вместо запросов к системным таблицам.
Автоматически создается и роль (параметр @role_name), члены которой имеют доступ на просмотр информации об изменениях.
Параметр @filegroup_name отвечает за то, в какой файловой группе будет храниться лог изменений. В параметре @captured_column_list задается перечень столбцов, изменения в которых будут отслеживаться (по умолчанию null, то есть отслеживаются все столбцы).
Отслеживание изменений с помощью cdc использует те же методы, что и репликация транзакций. Есть процесс, который читает журнал транзакций (для этого используется обычная для репликации расширенная хранимая процедура xp_replcmds).
Можно найти все таблицы, для которых включено cdc с помощью такого запроса:
select
*
from
sys.tables
where is_tracked_by_cdc = 1
Когда cdc включается для базы, то в SQL Agent создаются два задания: cdc.Watcher_capture, cdc.Watcher_cleanup. Первый следит за журналом транзакций для логирования изменений, второй - выполняет периодически очистку данных логов. По умолчанию история изменений хранится трое суток. При необходимости очистку можно выполнять самостоятельно с удобной для Вас прериодичностью. Для этого можно воспользоваться хранимой процедурой sys.sp_cdc_cleanup_change_table. Например, можно сделать такой вызов:
exec sys.sp_cdc_cleanup_change_table @capture_instance = 'Watcher', @low_water_mark = 0x00000035000000A00006
В этом коде мы указываем, что очистка должна быть выполнена для определенного экземпляра системы отслеживания. И что удалиться должны все записи до указанного номера журнала транзакций. Ниже я расскажу о том, как получать такие номера.
Теперь можно посмотреть на работу cdc в действии. Сейчас таблица dbo.Logg выглядит так:
Сделаем несколько обновлений через интервалы времени:
waitfor
delay '00:01:00'
insert
into dbo.Logg ( i, k )
values
(
4, 1 ),
(
4, 2 ),
(
4, 3 ),
(
4, 4 )
waitfor
delay '00:03:00'
update dbo.Logg
set i = 3
where i = 2
waitfor
delay '00:02:00'
insert
into dbo.Logg ( i, k )
values
( 7,
1 ),
( 7,
2 ),
( 7,
3 ),
( 7, 4 )
Для просмотра лога изменений используется такая автоматически созданная функция: cdc.fn_cdc_get_all_changes_Watcher. Правда, она принимает в качестве параметров не временной интервал, а регистрационные номера журнала транзакий. Функция sys.fn_cdc_map_time_to_lsn может использоваться для конвертации времени в номер. Следующий запрос показывает лог изменений за период:
declare
@startTime datetime
= '2014-08-25 11:35:00', @endTime datetime = '2014-08-25 11:43:00'
declare
@startLsn binary ( 10 ), @endLsn binary ( 10 )
set
@startLsn = sys.fn_cdc_map_time_to_lsn ( 'smallest greater than or
equal', @startTime
)
set
@endLsn = sys.fn_cdc_map_time_to_lsn ( 'largest less than or
equal', @endTime
)
select
case
when
__$operation =
1 then 'удаление'
when
__$operation =
2 then 'вставка'
end Операция,
i, k,
sys.fn_cdc_map_lsn_to_time ( __$start_lsn ) [Время операции],
cast ( dense_rank () over ( order by __$start_lsn, sys.fn_cdc_map_lsn_to_time ( __$start_lsn ) ) as nvarchar ( 100 ) ) + '-' +
cast ( row_number () over ( partition by __$start_lsn order by __$seqval ) as nvarchar ( 100 ) ) [Номер
операции],
dense_rank () over ( order by __$start_lsn, sys.fn_cdc_map_lsn_to_time ( __$start_lsn ) ) TranNum
, __$update_mask, __$start_lsn
from cdc.fn_cdc_get_all_changes_Watcher ( @startLsn, @endLsn, 'all' )
Запрос возвращает такие результаты:
В выводе не упомянут столбец __$update_mask, который является битовой маской, позволяющей определить какие столбцы были действительно обновлены в ходе операции update.
Встроенного компонента для получения снэпшота таблицы на дату нет. Поэтому будем использовать предыдущий запрос, чтобы "отматать" таблицу от текущего момента до нужного времени. То есть будем идти в цикле по номерам транзакий от самого последнего до того, который наиболее близок сверху к дате снэпшота. Создадим также временную таблицу с текущим содержимым dbo.Logg и сделаем из нее снэпшот:
if
object_id ( N'tempdb..#LoggCopy', N'U' ) is not null
drop table #LoggCopy
create
table #LoggCopy
(
i int not null,
k int not null,
primary key clustered ( i asc, k asc ) on [PRIMARY]
)
on [PRIMARY]
insert
into #LoggCopy ( i, k )
select i, k
from dbo.Logg
Теперь реализуем цикл:
declare
@SnapshotTime datetime
= '2014-08-25 11:35:00'
declare
@startLsn binary ( 10 ), @endLsn binary ( 10 )
set
@startLsn = sys.fn_cdc_map_time_to_lsn ( 'smallest greater than or
equal', @SnapshotTime
)
set
@endLsn = sys.fn_cdc_map_time_to_lsn ( 'largest less than or
equal', getdate () )
declare
@StartTranNum int, @EndTranNum int, @CurTranNum int
select
@StartTranNum =
min ( LogData.TranNum ), @EndTranNum = max ( LogData.TranNum )
from
(
select
case
when
__$operation =
1 then 'удаление'
when
__$operation =
2 then 'вставка'
end
Операция,
i, k,
sys.fn_cdc_map_lsn_to_time ( __$start_lsn ) [Время операции],
cast ( dense_rank () over ( order by __$start_lsn, sys.fn_cdc_map_lsn_to_time ( __$start_lsn ) ) as nvarchar ( 100 ) ) + '-' +
cast ( row_number () over ( partition by __$start_lsn order by __$seqval ) as nvarchar ( 100 ) ) [Номер
операции],
dense_rank () over ( order by __$start_lsn, sys.fn_cdc_map_lsn_to_time ( __$start_lsn ) ) TranNum
--__$update_mask
from cdc.fn_cdc_get_all_changes_Watcher ( @startLsn, @endLsn, 'all' )
)
LogData
set
@CurTranNum = @EndTranNum
while
@CurTranNum >=
@StartTranNum
begin
delete data
from
#LoggCopy
data
inner join
(
select
case
when __$operation = 1 then 'удаление'
when __$operation = 2 then 'вставка'
end Операция,
i, k,
sys.fn_cdc_map_lsn_to_time ( __$start_lsn ) [Время
операции],
cast ( dense_rank () over ( order by __$start_lsn, sys.fn_cdc_map_lsn_to_time ( __$start_lsn ) ) as nvarchar ( 100 ) ) + '-' +
cast ( row_number () over ( partition by __$start_lsn order by __$seqval ) as nvarchar ( 100 ) ) [Номер операции],
dense_rank () over ( order by __$start_lsn, sys.fn_cdc_map_lsn_to_time ( __$start_lsn ) ) TranNum
--__$update_mask
from cdc.fn_cdc_get_all_changes_Watcher ( @startLsn, @endLsn, 'all' )
)
LogData on
LogData.TranNum = @CurTranNum and
LogData.i = data.i and
LogData.k = data.k
where
LogData.Операция = 'вставка'
insert into #LoggCopy ( i, k )
select
LogData.i, LogData.k
from
(
select
case
when __$operation = 1 then 'удаление'
when __$operation = 2 then 'вставка'
end Операция,
i, k,
sys.fn_cdc_map_lsn_to_time ( __$start_lsn ) [Время
операции],
cast ( dense_rank () over ( order by __$start_lsn, sys.fn_cdc_map_lsn_to_time ( __$start_lsn ) ) as nvarchar ( 100 ) ) + '-' +
cast ( row_number () over ( partition by __$start_lsn order by __$seqval ) as nvarchar ( 100 ) ) [Номер операции],
dense_rank () over ( order by __$start_lsn, sys.fn_cdc_map_lsn_to_time ( __$start_lsn ) ) TranNum
--__$update_mask
from
cdc.fn_cdc_get_all_changes_Watcher ( @startLsn, @endLsn, 'all' )
) LogData
where
LogData.Операция = 'удаление' and LogData.TranNum = @CurTranNum
set @CurTranNum -= 1
end
Выше мы определили границы номеров транзакий и прошлись по ним циклом в обратном порядке. Там где делался insert, мы сделали delete. А там где имел место delete, наоборот был выполнен insert. Теперь таблица #LogCopy выглядит так как dbo.Logg в самом начале:
select *
from #LoggCopy
Есть и более удобный способ получения снэпшота таблицы на дату без использования циклов. Надо лишь сразу включить cdc, пока в таблице еще нет записей. Создадим таблицу заново:
if
object_id ( N'dbo.Logg', N'U' ) is not null
drop table dbo.Logg
create
table dbo.Logg
(
i int not null,
k int not null,
Attr1 int not null,
Attr2 int not null,
constraint
PK_Loggs_i_k primary
key clustered ( i asc, k asc ) on [PRIMARY]
)
on [PRIMARY]
go
exec
sys.sp_cdc_enable_table
@source_schema = 'dbo',
@source_name =
'Logg',
@role_name =
'LogData',
@capture_instance = 'Watcher',
@filegroup_name = [PRIMARY],
@index_name =
PK_Loggs_i_k,
@supports_net_changes = 1,
@captured_column_list = 'i, k, Attr1, Attr2'
go
Мы добавили в таблицу 2 дополнительных атрибута. Наполним таблицу данными:
insert
into dbo.Logg ( i, k, Attr1, Attr2 )
values
(
1, 1, 11, 111 ),
(
1, 2, 12, 222 ),
(
1, 3, 13, 333 ),
(
2, 1, 14, 444 ),
(
2, 2, 15, 555 ),
(
2, 3, 16, 666 )
waitfor
delay '00:01:00'
update dbo.Logg
set Attr1 *= 10
where i = 2 and k in ( 2, 3 )
waitfor
delay '00:01:03'
update dbo.Logg
set Attr1 *= 10, Attr2 *= 10
where i = 1
При включении cdc параметр @support_net_changes равен единице. Этот параметр позволяет получать суммарные изменения. Это и позволяет получать снэпшот таблиц на дату. То есть мы идем уже не от текущего состояния таблицы, а от начального, в обратном направлении. И специальная функция суммирует все изменения, произошедшие с того начального момента. Надо только указать время искомого состояния таблицы, и можно получить снэпшот:
declare
@startTime datetime, @endTime datetime = '2014-10-03 12:33:00'
declare
@startLsn binary ( 10 ), @endLsn binary ( 10 )
select
@startTime = create_date
from
sys.tables
where
[object_id] = object_id ( N'dbo.Logg', N'U' )
set
@startLsn = sys.fn_cdc_map_time_to_lsn ( 'smallest greater than or
equal', @startTime
)
set
@endLsn = sys.fn_cdc_map_time_to_lsn ( 'largest less than or
equal', @endTime
)
select
i, k, Attr1, Attr2
from cdc.fn_cdc_get_net_changes_Watcher ( @startLsn, @endLsn, 'all' )
Поскольку мы обновляли поля, не входящие в первичный ключ, то функция cdc.fn_cdc_get_all_changes_Watcher может возвращать в поле __$operation значение 4, что означает наличие операции update. А поле __$update_mask дает битовую маску тех полей, которые были обновлены. Ниже приведен удобный запрос для получения в компактном виде информации обо всех изменениях, которые имели место для таблицы:
declare
@startTime datetime, @endTime datetime = getdate ()
declare
@startLsn binary ( 10 ), @endLsn binary ( 10 )
select
@startTime = create_date
from
sys.tables
where
[object_id] = object_id ( N'dbo.Logg', N'U' )
set
@startLsn = sys.fn_cdc_map_time_to_lsn ( 'smallest greater than or
equal', @startTime
)
set
@endLsn = sys.fn_cdc_map_time_to_lsn ( 'largest less than or
equal', @endTime
)
select
case
when
__$operation =
1 then 'удаление'
when
__$operation =
2 then 'вставка'
when
__$operation =
4 then 'обновление'
end Операция,
i, k, Attr1, Attr2,
sys.fn_cdc_map_lsn_to_time ( __$start_lsn ) [Время операции],
cast ( dense_rank () over ( order by __$start_lsn, sys.fn_cdc_map_lsn_to_time ( __$start_lsn ) ) as nvarchar ( 100 ) ) + '-' +
cast ( row_number () over ( partition by __$start_lsn order by __$seqval ) as nvarchar ( 100 ) ) [Номер
операции],
dense_rank () over ( order by __$start_lsn, sys.fn_cdc_map_lsn_to_time ( __$start_lsn ) ) TranNum,
case when __$operation <> 4 then '' else
(
select
left ( data, len ( data ) - 1 )
from
(
select
(
select data.name + ', ' as [text()]
from
(
select power ( 2, row_number () over ( order by column_id ) - 1 ) colnum, name
from sys.columns
where [object_id] = object_id ( N'dbo.Logg', N'U' )
) data
where data.colnum & __$update_mask = data.colnum
for xml path ( '' )
)
data
) data
) end [Обновленные
столбцы]
from cdc.fn_cdc_get_all_changes_Watcher ( @startLsn, @endLsn, 'all' )
В приведенном запросе используется выражение для получения списка обновленных столбцов. Для этого делается запрос к sys.columns для получения пронумерованного списка столбцов. С помощью поля __$update_mask выделяются только обновленные столбцы. А конструкция for xml path ( '' ) в сочетании с псевдонимом столбца [text()] дает быструю конкатенацию обновленных столбцов.