Планировщик бизнес-процессов

Для реализации в следующей версии BPE 5.0 примитивов #gateway и #subProcess (ISO/IEC-19510), нам понадобится что-то вроде скедулера, а также реализация древовидной истории процесса. Текущая версия BPE, как вы знаете, однопоточная, это означает, что для реализации паралельных вычислений вы должны явно стартовать другой процесс из определенной задачи, и он уже будет управлятся напрямую своим Erlang-процессом, но тоже иметь свою линейную историю. Связывать и ждать завешение вам тоже нужно самому и изобретать для этого свой протокол. Почему сразу не написали ISO c субпроцессами? Потому что хотели показать, что и без гейтвеев можно строить системы, показать то что важно, но теперь пришла пора полностью реализовать стандарт. Поэтому сразу — новые рекорды и алгоритм.

#gateway

Согласно стандарту гейтвеи #gateway в общем случае это мультиплексоры или демультиплексоры с разной логикой: "И" (паралельное выполнение), "ИЛИ" (однопоточное условное выполнение, условный оператор), а также для редюсеров: ждать "ВСЕ" ребра, "ОДНО" или определённое "ПОДМНОЖЕСТВО" рёбер.

-type gate() :: none | exclusive | parallel | inclusive | complex | event.

#subProcess

#subProcess же просто обычная задача, которая создает дополнительную группировку, в реальности все будет происходить в том же Эрланг процессе, новый эрланг процесс создаваться не будет.

Алгоритм

Начнем с примера. Представим себе граф Х, в которм вершины B, C, F являются паралельным многопоточным сплитом (fork), а вершины G и J являются паралельными демультимплексорами, которые ожидают завершение всех входящих вершин (join), т.е. их наличие в истории процесса.

E ----------- H 4 / 7 / C / 8 1 2 / \ 5 9 / A ---- B F --- I / [ Граф X ] 3 \ \ 11 \ 10 D - G --- J ----- K 6 12 13

NOTE: обратите внимание, что в общем случае у процесса может быть несколько точек входа, это означает, что на начальном этапе в планировщие уже присутствует несколько паралельных потоков выполнения которые будут квантоваться линейной последовательностью событий самого Erlang-процесса.

Итак, в примере у нас старт с А(1), она же будет первым потоком планировщика, адреса других потоков планировщика (в разное время): A/B(2,3), A/B/C(4,5), A/B/C/F(9,11), т.е. кортежи составленые из разветвлений.

Первый шаг, начинаем процесс и выполняем ребро 1, которе нам дает единственный поток выполнения А, событие #beginEvent. Сразу перед вторым шагом делаем следующее: видим, что ребро 1 нас привело в состояние B, из которого исходят паралельные ребра 2 и 3 поэтому мы их еще до выполнения второго шага, ложим в планировщик, а ребро 1 и поток выполнения А удаляем, так как после этой точки уже будут другие потоки управления. На втором шаге у нас два потока выполнения: A/B(2) и A/B(3). Выбираем первый попавшийся и делаем шаг по этому ребру, например, A/B(2). Тут, поскольку стоит опять гейтвей, мы еще перед выполнением третьего шаго изменяем потоки планировщика, и, точно также как на первом ребре, мы удаляем A/B(2) поток и на его место создаем два потока A/B/C(4) и A/B/C(5), поток A/B(3) при этом не трогаем.

Внутри на каждом шаге мы храним курсоры потоков выполнения в циклическом массиве и крутим его индекс на каждом шаге, одновременно обновляя курсоры. В приведенном линеаризированом трейсе покажем как упаковывается древовидная история процесса.

Шаг 0: BEGIN, [BEGIN->1], 1
Шаг 1: [1->2,3], 1
Шаг 2: [2->4,5;3], 3
Шаг 3: [4;5;3->6], 1
Шаг 4: [4->7;5;6], 2
Шаг 5: [7;5->9,11;6], 4
Шаг 6: [7;9;11], 1
Шаг 7: [7->8;9;11], 2
Шаг 8: [8;9->10;11], 3
Шаг 9: [8;11->10;12], 3
Шаг 10: [10;12], 1
Шаг 11: [12], 1
Шаг 12: [13], 1
Шаг 13: [], 0

В этом CSV-логе первый элемент -- это номер выполнившегося ребра, второй -- это состояние планировщика (список его потоков разделенных точкой с запятой), третий -- это указатель на текущий поток выполнения.

На join элементах нужно гарантировать, что входщие ребра необходимые для пропихивания процесса вперед, находятся в истории процесса (это делается за 1 get по индексу истории процесса для каждого входящего ребра). К счатью в нашем примере алгоритм линеаризировал автоматически правильную последовательность без рейсов и блокировок.

При наличие циклов в графе, трейс процеса потенциально может иметь бесконечную длину. Алгоритм позволяет создать план выполнения даже бесконечных процессов при отсутствии свободных переменных в логике (функции) процесса, в этом случае недетерминизм.

Имплементация

Представление графа Х (описание бизнес-процеса):

start() -> R=[{r1, a,b}, {r2, b,c}, {r3, b,d}, {r4, c,e}, {r5, c,f}, {r6, d,g}, {r7, e,h}, {r8, h,j}, {r9, f,i}, {r10,i,j}, {r11,f,g}, {r12,g,j}, {r13,j,k}], N=[{a,[],[r1]}, {b,[r1],[r2,r3]}, {c,[r2],[r4,r5]}, {d,[r3],[r6]}, {e,[r4],[r7]}, {f,[r5],[r9, r11]}, {g,[r6,r11],[r12]}, {j,[r8,r10,r12],[r13]}, {i,[r9],[r10]}, {h,[r7],[r8]}, {k,[r13],[]}], walk(R, N, {0, [r1], 1, []}).

Рекурсор процесса:

walk(_,_, {_,[],_, Visited}) -> lists:reverse(Visited); walk(R,N, State) -> walk(R,N, step(R, N, State)).

Прототип планировщика на 10 строчек:

0 step(Rs, Ns, {Step, Threads, Pointer, Visited}) -> 1 CurrentR = lists:keyfind(lists:nth(Pointer, Threads), 1, Rs), 2 CurrentN = lists:keyfind(element(3, CurrentR), 1, Ns), 3 Rids = element(2, CurrentN), 4 NewStep = Step + 1, 5 NewVis = [CurrentR | Visited], 6 Check = lists:all(fun(Rid) -> lists:member(Rid, [element(1,R) || R <- NewVis]) end, Rids), 7 Inserted = case Check of false -> []; true -> element(3, CurrentN) end, 8 NewThreads = lists:sublist(Threads, Pointer-1) ++ Inserted ++ lists:nthtail(Pointer, Threads), 9 NewPointer = if Pointer == length(Threads) -> 1; true -> Pointer + length(Inserted) end, 10 {NewStep, NewThreads, NewPointer, NewVisited}.

Тут можно почитать про то как это же делается в Camunda.