Сегодня я буду рассказывать об одной задаче, которую мне приходилось решать. Задача была связана с перекачкой больших объемов данных между двумя серверами.
Давайте представим себе такую жизненную ситуацию. Есть большая таблица (сотни миллионов или миллиарды записей) на сервере источнике. Ее данные надо как-то перекачать в таблицу другого сервера-приемника. Причем структуры двух таблиц различны. После загрузки данных с сервера источника надо провести определенную обработку по преобразованию данных и только после этого следует выполнить вставку в таблицу приемник.
Понятно, что функция openquery, insert через связанный сервер не подойдут для решения задачи по работе с большими объемами данных. В таких случаях следует стремиться к устранению каких-либо операций протоколирования которые могут сопровождать операции переноса данных. Репликация также не поможет, поскольку таблицы имеют совершенно различные структуры.
Можно поступить так. Сперва извлечь данные в текстовый файл, с помощью консольного приложения, которое будет идти в цикле по таблице записывая строки в файл, разделяя столбцы определенным разделителем. Затем с помощью оператора bulk insert можно операцией с минимальным протоколированием скопировать данные из текстового файла в таблицу, структура которой совпадает со структурой таблицы источника. А уже после этого на стороне сервера-приемника разбираться с задачей обработки данных перед финальной операцией вставки в таблицу.
Какие могут быть подводные камни в этом плане. Ну во-первых программа, которая идет в цикле по строкам таблицы источника может работать долго из-за большого объема записей. Дополнительно представим себе, что для базы данных, в которой находится таблица источник используется оптимистичная модель блокирования, то есть снэпшотный уровень изоляции транзакции. Это означает, что возможности выполнять "грязное" чтение может не быть. И читая таблицу в момент, когда ее модифицирует кто-то другой на сервере источнике, мы будет читать снэпшот для модифицируемых записей. Чтение снэпшота медленнее чем чтение исходных записей, поскольку при исполнении запроса серверу приходится прыгать от физической строки через указатель к тому месту, где хранится копия предыдущего значения строки. Из-за этого скорость извлечения заметно замедлится. Более того, если объем читаемых и модифицируемых записей велик, то у сервера источника (по крайней мере в случае Oracle-а) может не хватить ресурсов для хранения всего снэпшота. Тогда запрос может отвалиться с ошибкой и всю работу придется начинать заново, а драгоценное время будет потеряно.
Что же делать? Напрашивается такой выход из сложившейся ситуации. Необходимо извлекать с помощью одного запроса к таблице источнику относительно небольшой объем записей, и записывать их в отдельный текстовый файл. Например, если в таблице есть столбец, по которому таблица секционирована или который является ведущим столбцом в кластерном индексе, то можно организовать цикл по всем значениям, которые может принимать данное поле. А на каждой итерации запускать консольное приложение, которое будет идти в цикле по набору записей, который получается с помощью запроса к таблице на сервере источнике с условием на равенство столбца определенному значению. Это позволит решить проблему, связанную с параллельным доступом к таблице-источнику. Развивая эту идею можно еще и ускорить процесс формирования файлов, за счет распараллелирования задачи. То есть организовать загрузку таким образом, чтобы одновременно работало несколько экземпляров консольного приложения, каждое из которых загружает данные из таблицы для своего значения столбца, который мы используем для дробления таблицы.
Такая организация функционирования загрузки на основе многопоточности позволяет использовать ресурсы процессора полностью и тем самым ускорить загрузку на порядок!
Есть один принципиальный момент. Предположим, что в таблице-источнике T есть столбец K, который мы решили использовать в условии запроса. Пусть, столбец K может принимать много значений, например, 100. Это значит, что должно отработать в общей сложности 100 потоков, а каждый поток будет работать со своим значением столбца K. Мощности процессора может не хватит на одновременную работу всех 100 потоков. Поэтому процедура, которая будет выполнять загрузку, должна запускать некоторое ограниченное число потоков, равное, например, 10. При этом процедура должна непрерывно следить за состоянием потоков. Как только один или несколько из потоков завершат свою работу, на их месте процедурой должны запускаться такое же количество новых потоков, каждый из которых будет выполнять загрузку данных для своего значения столбца K. Благодаря такой организации в каждый момент времени будет работать одинаковое число потоков, процессор будет использовать все свои ресурсы для обслуживания потоков и загрузка будет скоростной и безопасной!
Для реализации описанного выше плана нам необходим механизм, позволяющий кодом на языке Transact-SQL выполнять асинхронный запуск процедуры. Нет сомнения в том, что с этой задачей можно справиться с помощью хранимых процедур на основе clr, запуск асинхронных процессов с помощью sp_OACreate или xp_cmdshell, а также создание и запуск кодом специально подготовленных джобов (sp_start_job). Но на мой взгляд самый удобный и безопасный способ это служба Service Broker, которая появилась в СУБД MS SQL с 2005 года. Эта служба предоставляет богатый и удобный синтаксис, позволяющий решать задачи асинхронного запуска процессов на сервере, без использования какого-либо внешнего кода.
Приступим к разработке. Сформулируем примерное условие. Предположим, что у нас есть таблица на сервере источнике, в которой хранится история изменения атрибута некоторой сущности. Тем самым в таблице есть поле с датой присвоения сущности определенного значения атрибута. Пусть также это поле является первым столбцом в кластерном индексе таблицы на сервере, с которого грузятся данные. На сервере-приемнике есть программный файл, который через командную строку получает дату и выполняет загрузку данных в текстовый файл. Начнем писать код. У нас будут 2 настроечные таблицы. В первой будет храниться общая информация о характере загрузки: имя программного файла, число потоков и пр. Также будет таблица, описывающая состояние потоков, в ней будет по одной строке на каждое значение столбца с датой.
create table dbo.LoaderParameters
Давайте представим себе такую жизненную ситуацию. Есть большая таблица (сотни миллионов или миллиарды записей) на сервере источнике. Ее данные надо как-то перекачать в таблицу другого сервера-приемника. Причем структуры двух таблиц различны. После загрузки данных с сервера источника надо провести определенную обработку по преобразованию данных и только после этого следует выполнить вставку в таблицу приемник.
Понятно, что функция openquery, insert через связанный сервер не подойдут для решения задачи по работе с большими объемами данных. В таких случаях следует стремиться к устранению каких-либо операций протоколирования которые могут сопровождать операции переноса данных. Репликация также не поможет, поскольку таблицы имеют совершенно различные структуры.
Можно поступить так. Сперва извлечь данные в текстовый файл, с помощью консольного приложения, которое будет идти в цикле по таблице записывая строки в файл, разделяя столбцы определенным разделителем. Затем с помощью оператора bulk insert можно операцией с минимальным протоколированием скопировать данные из текстового файла в таблицу, структура которой совпадает со структурой таблицы источника. А уже после этого на стороне сервера-приемника разбираться с задачей обработки данных перед финальной операцией вставки в таблицу.
Какие могут быть подводные камни в этом плане. Ну во-первых программа, которая идет в цикле по строкам таблицы источника может работать долго из-за большого объема записей. Дополнительно представим себе, что для базы данных, в которой находится таблица источник используется оптимистичная модель блокирования, то есть снэпшотный уровень изоляции транзакции. Это означает, что возможности выполнять "грязное" чтение может не быть. И читая таблицу в момент, когда ее модифицирует кто-то другой на сервере источнике, мы будет читать снэпшот для модифицируемых записей. Чтение снэпшота медленнее чем чтение исходных записей, поскольку при исполнении запроса серверу приходится прыгать от физической строки через указатель к тому месту, где хранится копия предыдущего значения строки. Из-за этого скорость извлечения заметно замедлится. Более того, если объем читаемых и модифицируемых записей велик, то у сервера источника (по крайней мере в случае Oracle-а) может не хватить ресурсов для хранения всего снэпшота. Тогда запрос может отвалиться с ошибкой и всю работу придется начинать заново, а драгоценное время будет потеряно.
Что же делать? Напрашивается такой выход из сложившейся ситуации. Необходимо извлекать с помощью одного запроса к таблице источнику относительно небольшой объем записей, и записывать их в отдельный текстовый файл. Например, если в таблице есть столбец, по которому таблица секционирована или который является ведущим столбцом в кластерном индексе, то можно организовать цикл по всем значениям, которые может принимать данное поле. А на каждой итерации запускать консольное приложение, которое будет идти в цикле по набору записей, который получается с помощью запроса к таблице на сервере источнике с условием на равенство столбца определенному значению. Это позволит решить проблему, связанную с параллельным доступом к таблице-источнику. Развивая эту идею можно еще и ускорить процесс формирования файлов, за счет распараллелирования задачи. То есть организовать загрузку таким образом, чтобы одновременно работало несколько экземпляров консольного приложения, каждое из которых загружает данные из таблицы для своего значения столбца, который мы используем для дробления таблицы.
Такая организация функционирования загрузки на основе многопоточности позволяет использовать ресурсы процессора полностью и тем самым ускорить загрузку на порядок!
Есть один принципиальный момент. Предположим, что в таблице-источнике T есть столбец K, который мы решили использовать в условии запроса. Пусть, столбец K может принимать много значений, например, 100. Это значит, что должно отработать в общей сложности 100 потоков, а каждый поток будет работать со своим значением столбца K. Мощности процессора может не хватит на одновременную работу всех 100 потоков. Поэтому процедура, которая будет выполнять загрузку, должна запускать некоторое ограниченное число потоков, равное, например, 10. При этом процедура должна непрерывно следить за состоянием потоков. Как только один или несколько из потоков завершат свою работу, на их месте процедурой должны запускаться такое же количество новых потоков, каждый из которых будет выполнять загрузку данных для своего значения столбца K. Благодаря такой организации в каждый момент времени будет работать одинаковое число потоков, процессор будет использовать все свои ресурсы для обслуживания потоков и загрузка будет скоростной и безопасной!
Для реализации описанного выше плана нам необходим механизм, позволяющий кодом на языке Transact-SQL выполнять асинхронный запуск процедуры. Нет сомнения в том, что с этой задачей можно справиться с помощью хранимых процедур на основе clr, запуск асинхронных процессов с помощью sp_OACreate или xp_cmdshell, а также создание и запуск кодом специально подготовленных джобов (sp_start_job). Но на мой взгляд самый удобный и безопасный способ это служба Service Broker, которая появилась в СУБД MS SQL с 2005 года. Эта служба предоставляет богатый и удобный синтаксис, позволяющий решать задачи асинхронного запуска процессов на сервере, без использования какого-либо внешнего кода.
Приступим к разработке. Сформулируем примерное условие. Предположим, что у нас есть таблица на сервере источнике, в которой хранится история изменения атрибута некоторой сущности. Тем самым в таблице есть поле с датой присвоения сущности определенного значения атрибута. Пусть также это поле является первым столбцом в кластерном индексе таблицы на сервере, с которого грузятся данные. На сервере-приемнике есть программный файл, который через командную строку получает дату и выполняет загрузку данных в текстовый файл. Начнем писать код. У нас будут 2 настроечные таблицы. В первой будет храниться общая информация о характере загрузки: имя программного файла, число потоков и пр. Также будет таблица, описывающая состояние потоков, в ней будет по одной строке на каждое значение столбца с датой.
create table dbo.LoaderParameters
(
WorkerCount int not null,
ProgramFile nvarchar ( 255 ) not null,
DateStart datetime not null,
DateEnd datetime not null
) on [PRIMARY]
В поле WorkerCount будет храниться число потоков, которое должно работать одновременно. ProgramFile это полное имя программного файла, который выполняет загрузку за определенный день. В последних двух полях фиксируются даты начала и окончания, то есть эти поля дают диапазон значений, для которых надо загружать данные. Теперь построим таблицу, по которой мы будем отслеживать состояние потоков.
create table dbo.WorkerState
(
dtDate datetime not null,
FilePath nvarchar ( 255 ) not null,
Stat int not null,
RetryAttempts int not null,
CurAttempt int not null,
WorkerLog nvarchar ( max ) not null,
constraint PK_WorkerState_dtDate
primary key clustered ( dtDate asc )
) on [PRIMARY]
В таблице WorkerState каждая строка ассоциируется с некоторым экземпляром консольного приложения, которое осуществляет закачку данных за определенную дату (поле dtDate), данные складываются в определенный файл (поле FilePath). Причем загрузка может находиться в одном из состояний (значения поля Stat): 0 (загрузка еще не началась, экземпляр программы еще не запущен, но готов к этому), 1 (программа извлекает данные за день dtDate), 2 (загрузка завершилась с ошибкой), 3 (загрузка успешно завершена). Кроме перечисленных полей, есть еще 2 поля, в которых хранится информация о числе повторных попыток запуска программного файла, если загрузку не получилось провести с первого раза из-за возможных ошибок на стороне сервера источника.
Напишем хранимую процедуру, которая должна запускать экземпляр консольного приложения, передавая ему информацию о том, за какую дату производить загрузку данных. Предполагается, что в коде консольного приложения есть код, выполняющий запрос к таблице dbo.WorkerState с условием на переданную дату, для получения информации о том, в какой файл складывать данные.
Хранимая процедура dbo.LoadOneFile будет запускаться в асинхронном режиме. Для того, чтобы это было возможно, необходимо использовать службу Service Broker. Для начала разрешим ее использование для базы данных Import:
Напишем хранимую процедуру, которая должна запускать экземпляр консольного приложения, передавая ему информацию о том, за какую дату производить загрузку данных. Предполагается, что в коде консольного приложения есть код, выполняющий запрос к таблице dbo.WorkerState с условием на переданную дату, для получения информации о том, в какой файл складывать данные.
use Import
go
if sessionproperty ( N'quoted_identifier'
) = 0
set quoted_identifier
on
go
if sessionproperty ( N'ansi_nulls' ) = 0
set ansi_nulls on
go
if object_id ( N'Import.dbo.LoadOneFile', N'P' ) is null
exec ( N'create proc dbo.LoadOneFile as return 1' )
go
alter proc dbo.LoadOneFile
(
@dtDate datetime
)
as
begin
set nocount, xact_abort on
declare @cmd varchar ( 8000 ), @res int, @WorkerLog nvarchar ( max ), @CurAttempt int, @RetryAttempts int,
@CurWorkerLog nvarchar ( max )
if object_id ( N'tempdb..#LoaderResult',
N'U' ) is not null
drop table #LoaderResult
create table
#LoaderResult
(
Res nvarchar ( max )
)
begin try
select @cmd = '"' + cast ( ProgramFile as varchar ( 8000 ) ) + '" ' + convert ( varchar ( 100 ), @dtDate, 112 )
from dbo.LoaderParameters
select @CurAttempt = CurAttempt, @RetryAttempts = RetryAttempts
from WorkerState
set @res = 2
while @CurAttempt <= @RetryAttempts
begin
insert into #LoaderResult ( Res )
exec master.dbo.xp_cmdshell @cmd
if exists
(
select top 1 1
from #LoaderResult
where lower ( Res ) like lower ( N'%work done%' )
)
begin
set @res = 3
break
end
else
begin
declare curLoadFileLog
cursor local static forward_only for
select isnull ( Res, N'' ) Res
from #LoaderResult
open curLoadFileLog
fetch next from curLoadFileLog into @CurWorkerLog
while @@fetch_status
= 0
begin
set @WorkerLog = isnull ( @WorkerLog, N'' ) + N' ' + isnull ( @CurWorkerLog, N'' )
fetch next from curLoadFileLog into @CurWorkerLog
end
close curLoadFileLog
deallocate curLoadFileLog
end
set @CurAttempt += 1
end
update dbo.WorkerState
set Stat = @res, WorkerLog = isnull ( @WorkerLog, N'' )
where dtDate = @dtDate
end try
begin catch
update dbo.WorkerState
set Stat = 2, WorkerLog = error_message ()
where dtDate = @dtDate
end catch
end
go
Хранимая процедура dbo.LoadOneFile будет запускаться в асинхронном режиме. Для того, чтобы это было возможно, необходимо использовать службу Service Broker. Для начала разрешим ее использование для базы данных Import:
alter database Import set enable_broker
Код выше накладывает блокировку изменения схемы уровня базы данных, поэтому этот код будет заблокирован, если в базе будут другие активные пользовательские соединения. Когда код отработает, у базы появляется идентификатор брокера:
select service_broker_guid
from sys.databases
where name = N'Import'
Ниже мы напишем процедуру которая будет асинхронно запускать dbo.LoadOneFile и получать xml-документ, содержащий дату, за которую необходимо выполнить загрузку данных. Будем привязывать xml-документ к xml-схеме, которая создается ниже.
Теперь необходимо создать объекты службы Service Broker. Асинхронный запуск хранимой процедуры будет осуществляться с помощью активации хранимой процедуры, которая происходит в момент появления сообщения в специальной очереди. Очереди будут созданы немного позже, а сейчас надо создать типы сообщений, которые дадут возможность создавать сообщения, которые можно будет класть в очереди. Код ниже создает типы сообщений для очередей, принимающих запросы на асинхронный запуск процедуры загрузки и для очереди, которая будет содержать сообщения о том, что определенный запуск процедуры уже завершился:
Теперь необходимо создать так называемый контракт, то есть объект, который позволяет выполнять передачу сообщений тех или иных типов между участниками диалога (в нашем примере между соединением, которое инициирует асинхронный запуск процедуры dbo.LoadOneFile и соединением, которое выполняет запуск данной процедуры и сигнализирует о его окончании).
Создадим очередь, которая будет принимать запросы на асинхронный запуск.
if object_id ( N'Import.dbo.srcLoadData', N'SQ' ) is not null
if object_id ( N'Import.dbo.DistributeLoadMessages', N'P' ) is null
Заметим, что в отличие от предыдущей очереди, в данной прописаны параметры активации, то есть имя хранимой процедуры, которая активируется при попадании в очередь сообщения, и которая запускает процедуру загрузки dbo.LoadOneFile.
Теперь можно создать службы.
Настало время создать хранимую процедуру, которая получает в качестве параметра дату и выполняет асинхронный запуск процедуры, указанной в параметрах активации очереди trgLoadData.
if sessionproperty ( N'ansi_nulls' ) = 0
if sessionproperty ( N'quoted_identifier' ) = 0
Все, что осталось сделать, это написать хранимую процедуру, которая будет следить за состоянием потоков и очереди запросов на выполнение работы. Данная процедура будет играть роль планировщика.
Сделаем несколько пояснений к коду. В коде процедуры запускается определенное число потоков, равное столбцу WorkerCount таблицы dbo.LoaderParameters. Затем через равные интервалы времени происходит проверка числа работающих потоков через таблицу статусов. При этой проверке значение WorkerCount всегда вычисляется заново. Это делается для того, чтобы число одновременно работающих потоков можно было динамически уменьшать или увеличивать (вдруг во время загрузки перестанет хватать ресурсов процессора для обслуживания, скажем, 10 одновременно работающих потоков, и тогда, не отменяя загрузку, это число можно будет уменьшить по ходу закачки). При уменьшении числа потоков процедура добавляет новые потоки из числа необработанных дат для достижения нужного числа одновременно работающих программ.
При проведении тестов в моей работе цикл, который бегал по строкам запроса к таблице на другом сервере Oracle, возвращающего 150 млн. записей, складывал данные в текстовый файл около 5 часов, при этом периодически возникали ошибки устаревания снэпшота. После внедрения описанной выше технологии, загрузка при использовании 5 потоков сократилась до 35 минут, а при увеличении числа одновременно работающих потоков до 10 время загрузки снизилось до 25 минут. То есть скорость возросла более чем в 10 раз! При этом тестирование происходило на сервере-приемнике, на котором был единственный двухядерный процессор!
Эту методу можно найти и дальнейшие применения. Например, после того как данные загружены в текстовый файл их надо переместить в таблицы с помощью оператора bulk insert. И эту задачу также можно делать на основе многопоточности, с помощью того же метода, который описан выше. В моей работе на том же сервере с двумя ядрами 20 постоянно одновременно работающих соединений складывали данные из 90 файлов общим объемом в 12 Гб в таблицы за 9 минут.
use Import
go
if sessionproperty ( N'quoted_identifier'
) = 0
set quoted_identifier
on
go
if sessionproperty ( N'ansi_nulls' ) = 0
set ansi_nulls on
go
if exists
(
select top 1 1
from Import.sys.xml_schema_collections
where
name = N'LoadParam' and
[schema_id] = schema_id ( N'dbo' )
)
begin
raiserror ( N'Коллекция схем xml с именем "%s" уже существует в схеме "%s" базы данных "%s".', 16, 1,
N'LoadParam', N'dbo', N'Import' )
return
end
create xml schema collection dbo.LoadParam
as
N'
<xsd:schema
xmlns:xsd="http://www.w3.org/2001/XMLSchema">
<xsd:annotation>
<xsd:documentation
xml:lang="ru">
xml-схема предназначена для проверки на валидность xml-документов, используемых для передачи параметров при асинхронных вызовах процедуры загрузки.
Автор:
Владимир Едынак
</xsd:documentation>
</xsd:annotation>
<xsd:element
name="LoadPrm" type="LoadPrmType"/>
<xsd:complexType
name="LoadPrmType">
<xsd:sequence>
<xsd:element
name="dtDate" type="xsd:date" minOccurs="1"
maxOccurs="1"/>
</xsd:sequence>
</xsd:complexType>
</xsd:schema>
'
go
Теперь необходимо создать объекты службы Service Broker. Асинхронный запуск хранимой процедуры будет осуществляться с помощью активации хранимой процедуры, которая происходит в момент появления сообщения в специальной очереди. Очереди будут созданы немного позже, а сейчас надо создать типы сообщений, которые дадут возможность создавать сообщения, которые можно будет класть в очереди. Код ниже создает типы сообщений для очередей, принимающих запросы на асинхронный запуск процедуры загрузки и для очереди, которая будет содержать сообщения о том, что определенный запуск процедуры уже завершился:
if exists
(
select top 1 1
from Import.sys.service_message_types
where name = N'srcLoadData'
)
begin
raiserror ( N'Тип сообщений с именем "%s" уже существует в базе данных "%s".', 16, 1, N'srcLoadData', N'Import' )
return
end
create message type srcLoadData
authorization dbo
validation = valid_xml with schema collection dbo.LoadParam
go
if exists
(
select top 1 1
from Import.sys.service_message_types
where name = N'trgLoadData'
)
begin
raiserror ( N'Тип сообщений с именем "%s" уже существует в базе данных "%s".', 16, 1, N'trgLoadData', N'Import' )
return
end
create message type trgLoadData
authorization dbo
validation = well_formed_xml
go
if exists
(
select top 1 1
from Import.sys.service_contracts
where name = N'LoadData'
)
begin
raiserror ( N'Контракт с именем "%s" уже существует в базе данных "%s".', 16, 1,
N'LoadData', N'Import' )
return
end
create contract LoadData
authorization dbo
(
srcLoadData sent by initiator,
trgLoadData sent by target
)
go
begin
raiserror ( N'Очередь с именем "%s" уже существует в схеме "%s" базы данных "%s".',
16, 1,
N'srcLoadData', N'dbo', N'Import' )
return
end
create queue dbo.srcLoadData
with
status = on,
retention = off
on [DEFAULT]
go
Перед созданием очереди, в которую будут класться сообщения с отчетами о работе процедуры dbo.LoadOneFile создадим процедуру, которая будет ее читать.
use Import
go
if sessionproperty ( N'ansi_nulls' ) = 0
set ansi_nulls on
go
if sessionproperty ( N'quoted_identifier' ) = 0
set quoted_identifier on
go
if object_id ( N'Import.dbo.DistributeLoadMessages', N'P' ) is null
exec ( N'create proc dbo.DistributeLoadMessages as return 1' )
go
alter proc dbo.DistributeLoadMessages
as
begin
set nocount, xact_abort on
declare @ConversHandle
uniqueidentifier,
@ServiceName nvarchar ( 128 ),
@MessTypeName nvarchar ( 128 ), @MesBody xml ( dbo.LoadParam ), @prm xml, @XmlHandle int,
@dtDate date, @Respond xml
;
receive top ( 1 ) @ConversHandle = [conversation_handle],
@ServiceName = [service_name],
@MessTypeName = message_type_name, @MesBody = message_body
from Import.dbo.trgLoadData
if @@rowcount = 0
return
if @MessTypeName not in ( N'srcLoadData' )
return
if @ServiceName not in ( N'trgLoadData' )
return
set @prm = @MesBody
exec sp_xml_preparedocument @XmlHandle out, @prm
select @dtDate = dtDate
from openxml ( @XmlHandle, N'/LoadPrm', 1 )
with
(
dtDate date N'dtDate'
)
exec sp_xml_removedocument @XmlHandle
set @Respond = N'<Respond>Work is
done for date "' + convert ( nvarchar ( 100 ), @dtDate, 105 ) + '"</Respond>'
if @ServiceName in ( N'trgLoadData' )
begin
exec Import.dbo.LoadOneFile @dtDate
end
;
send on conversation @ConversHandle
message type trgLoadData ( @Respond )
end
go
В коде выше используется переменная @ServiceName, обозначающая имя службы. Назначение этого параметра будет объяснено позднее. А теперь создадим очередь, в которую будут складываться сообщения об окончании работы процедуры dbo.LoadOneFile.
if object_id ( N'Import.dbo.trgLoadData', N'SQ' ) is not null
begin
raiserror ( N'Очередь с именем "%s" уже существует в схеме "%s" базы данных "%s".',
16, 1,
N'trgLoadData', N'dbo', N'Import' )
return
end
create queue dbo.trgLoadData
with
status = on,
retention = off,
activation
(
status = on,
procedure_name = Import.dbo.DistributeLoadMessages,
max_queue_readers =
30000,
exec as self
)
on [DEFAULT]
go
Теперь можно создать службы.
if exists
(
select top 1 1
from Import.sys.services
where name = N'srcLoadData'
)
begin
raiserror ( N'Служба с именем "%s" уже существует в базе данных "%s".', 16, 1,
N'srcLoadData', N'Import' )
return
end
create service srcLoadData
authorization dbo
on queue dbo.srcLoadData ( LoadData )
go
if exists
(
select top 1 1
from Import.sys.services
where name = N'trgLoadData'
)
begin
raiserror ( N'Служба с именем "%s" уже существует в базе данных "%s".', 16, 1,
N'trgLoadData', N'Import' )
return
end
create service trgLoadData
authorization dbo
on queue dbo.trgLoadData ( LoadData )
go
use Import
go
if sessionproperty ( N'ansi_nulls' ) = 0
set ansi_nulls on
go
if sessionproperty ( N'quoted_identifier' ) = 0
set quoted_identifier on
go
if object_id ( N'Import.dbo.StartFileLoading', N'P' ) is null
exec ( N'create proc dbo.StartFileLoading as return 1' )
go
alter proc dbo.StartFileLoading
(
@dtDate datetime
)
as
begin
set nocount, xact_abort on
declare @ConversHandle
uniqueidentifier,
@Request xml ( dbo.LoadParam )
begin try
set @Request = N'
<LoadPrm>
<dtDate>' + convert ( nvarchar ( 100 ), cast ( @dtDate as date ), 121 ) + '</dtDate>
</LoadPrm>
'
begin dialog conversation @ConversHandle
from service srcLoadData to service N'trgLoadData'
on contract LoadData
with encryption = off
;
send on conversation @ConversHandle
message type srcLoadData ( @Request )
end try
begin catch
update Import.dbo.WorkerState
set Stat = 2, WorkerLog = N'Произошла ошибка при попытке отправить сообщение брокера (служба srcLoadData): ' +
error_message ()
where dtDate = @dtDate
end catch
end
go
Процедура dbo.StartFileLoading после запуска сразу завершает свою работу, а на сервере активируется новое соединение, в котором происходит запуск процедуры dbo.DistributeLoadMessages, которая читает очередь, находит сообщение, извлекает параметры тела сообщения, запускает процедуру dbo.LoadOneFile.
Все, что осталось сделать, это написать хранимую процедуру, которая будет следить за состоянием потоков и очереди запросов на выполнение работы. Данная процедура будет играть роль планировщика.
use Import
go
if sessionproperty ( N'quoted_identifier'
) = 0
set quoted_identifier
on
go
if sessionproperty ( N'ansi_nulls' ) = 0
set ansi_nulls on
go
if object_id ( N'Import.dbo.LoadDataScheduler', N'P' ) is null
exec ( N'create proc dbo.LoadDataScheduler as return 1' )
go
alter proc dbo.LoadDataScheduler
as
begin
set nocount, xact_abort on
declare @DateStart datetime, @DateEnd datetime, @WorkerCount int, @CurWorkerCount int, @dtDate datetime,
@ConversHandle uniqueidentifier
if object_id ( N'tempdb..#NewWorkers',
N'U' ) is not null
drop table #NewWorkers
create table #NewWorkers
(
dtDate datetime not null,
primary key clustered ( dtDate asc )
) on [PRIMARY]
select @DateStart = DateStart, @DateEnd = DateEnd, @WorkerCount = WorkerCount
from Import.dbo.LoaderParameters
delete Import.dbo.WorkerState
;
with Dates
as
(
select @DateStart dtDate
union all
select dateadd ( [dd], 1, dtDate ) dtDate
from Dates
where dtDate < @DateEnd
)
insert into Import.dbo.WorkerState
(
dtDate,
CurAttempt,
FilePath,
RetryAttempts,
Stat,
WorkerLog
)
select
dtDate, 1, N'C:\Users\Владимир\Desktop\'
+ convert ( nvarchar ( 100 ), dtDate, 121 ) + '.txt', 20, 0, N''
from Dates
set @CurWorkerCount = 1
while @CurWorkerCount
<= @WorkerCount
begin
select top 1 @dtDate = dtDate
from Import.dbo.WorkerState with ( nolock )
where Stat = 0
if @@rowcount <> 0
begin
update Import.dbo.WorkerState
set Stat = 1
where dtDate = @dtDate
exec Import.dbo.StartFileLoading @dtDate
end
set @CurWorkerCount += 1
end
while exists
(
select top 1 1
from Import.dbo.WorkerState with ( nolock )
where Stat = 0
)
begin
select @WorkerCount = WorkerCount
from Import.dbo.LoaderParameters
select @CurWorkerCount
= count (*)
from Import.dbo.WorkerState with ( nolock )
where Stat = 1
if @WorkerCount - @CurWorkerCount <= 0
begin
waitfor delay N'00:00:01'
end
else
begin
delete #NewWorkers
insert into #NewWorkers ( dtDate )
select top ( @WorkerCount - @CurWorkerCount ) dtDate
from Import.dbo.WorkerState with ( nolock )
where Stat = 0
update Stat
set Stat.Stat = 1
from
Import.dbo.WorkerState Stat
inner
join
#NewWorkers
tmp on Stat.dtDate = tmp.dtDate
declare curNewWrkStart cursor
local static forward_only for
select dtDate
from #NewWorkers
open curNewWrkStart
fetch next from curNewWrkStart into @dtDate
while @@fetch_status = 0
begin
exec Import.dbo.StartFileLoading @dtDate
fetch next from curNewWrkStart into @dtDate
end
close curNewWrkStart
deallocate curNewWrkStart
end
end
while exists
(
select top 1 1
from Import.dbo.WorkerState with ( nolock )
where Stat = 1
)
begin
waitfor delay N'00:00:01'
end
declare curLoadDialogClear
cursor local static forward_only for
select distinct [conversation_handle]
from Import.dbo.srcLoadData
open curLoadDialogClear
fetch next from curLoadDialogClear
into @ConversHandle
while @@fetch_status
= 0
begin
end conversation @ConversHandle
fetch next from curLoadDialogClear
into @ConversHandle
end
close curLoadDialogClear
deallocate curLoadDialogClear
if exists
(
select top 1 1
from Import.dbo.WorkerState with ( nolock )
where Stat = 2
)
begin
raiserror ( N'В ходе работы одного из потоков возникла ошибка. Подробности в логе.', 16, 1 )
end
end
go
Сделаем несколько пояснений к коду. В коде процедуры запускается определенное число потоков, равное столбцу WorkerCount таблицы dbo.LoaderParameters. Затем через равные интервалы времени происходит проверка числа работающих потоков через таблицу статусов. При этой проверке значение WorkerCount всегда вычисляется заново. Это делается для того, чтобы число одновременно работающих потоков можно было динамически уменьшать или увеличивать (вдруг во время загрузки перестанет хватать ресурсов процессора для обслуживания, скажем, 10 одновременно работающих потоков, и тогда, не отменяя загрузку, это число можно будет уменьшить по ходу закачки). При уменьшении числа потоков процедура добавляет новые потоки из числа необработанных дат для достижения нужного числа одновременно работающих программ.
При проведении тестов в моей работе цикл, который бегал по строкам запроса к таблице на другом сервере Oracle, возвращающего 150 млн. записей, складывал данные в текстовый файл около 5 часов, при этом периодически возникали ошибки устаревания снэпшота. После внедрения описанной выше технологии, загрузка при использовании 5 потоков сократилась до 35 минут, а при увеличении числа одновременно работающих потоков до 10 время загрузки снизилось до 25 минут. То есть скорость возросла более чем в 10 раз! При этом тестирование происходило на сервере-приемнике, на котором был единственный двухядерный процессор!
Эту методу можно найти и дальнейшие применения. Например, после того как данные загружены в текстовый файл их надо переместить в таблицы с помощью оператора bulk insert. И эту задачу также можно делать на основе многопоточности, с помощью того же метода, который описан выше. В моей работе на том же сервере с двумя ядрами 20 постоянно одновременно работающих соединений складывали данные из 90 файлов общим объемом в 12 Гб в таблицы за 9 минут.