F# WebSocket Server
Аннотация
Идея писать веб-сервера и веб-фреймворки на всех языках у меня возникла с тех пор, когда я понял, что то, что я сделал для экосистемы Erlang: направление фреймворков для предприятий под общим брендом N2O, а сейчас как часть платформы erp.uno; вполне применимо и для других языков и платформ. В этой статье представлена версия вебсокет-сервера для языка программирования F# — ws.erp.uno. Адрес пакета: nuget.org/packages/ws. Адрес репозитория: erpuno/ws.
Предисловие
Haskell. Первый эксперимент был совершен Андреем Мельниковым в виде порта для Хаскеля: N2O.HS, позже более полную версию с N2O и NITRO с экзестенциальными сигнатурами сделал Марат Хафизов, который управляет Github организацией O3 и сайтом o3.click. Мне совершенно непонятно, почему ни один хаскель программист, которые вроде как должны восхищаться минимализмом, не идет по этому пути, а обычно ищет правды в таких фреймворках как UrWeb, IHP, UnisonWeb. На мой взгляд — всё это переусложненные штуки.
Standard ML. Также в академических целях, Марат Хафизов сделал порт связки веб-сервера N2O и веб-фреймворка NITRO на язык Standard ML (обе главные версии SML/NJ и MLton) — эта работа представлена Github организацией O1. Это тот язык, который я считаю уместно преподавать как первый академический язык программирования (до знакомства с промышленными языками Erlang, F#, Haskell).
Lean. Для закрепления своей идеи и более четкой и точной ее артикуляции я попросил Siegmentation Fault сделать порт на еще более формальный язык программирования, математический прувер Lean 4. Эта версия связки веб-сервера N2O и веб-фреймворка NITRO представлена Github организацией O89 и сразу двумя сайтами: lean4.dev и bum.pm. Последний представляет собой пакетный менеджер написанный на Lean 4, который нам помогает администрировать Александр Темерев из CERN. Lean 4 N2O проекты залайкал Леонардо де Мура, автор Lean и Z3, чему мы безмерно рады.
Идиоматический веб-сервер на F#
Критерии идиоматичности могут каждым восприниматься по разному, но в основном это означаем минимум прелюдий и максимум сути, так или иначе основная мантра всех минималистов в общем и N2O инфраструктуры в частности. Так в современные критерии идиоматичности веб-сервера для языка F# я бы выделил следующее: 1) использование системных классов System.Net.WebSockets, которые уже предоставляют буферизированные енкодер и декодер фреймов стандарта RFC 6455; 2) сервер должен быть построен на Async компютейшинал экспрешинах; 3) для управления асинхронными потоками выполнения должен использоваться MailboxProcessor, а не самописная система воркеров, которая хоть и поможет выжать последнее из F# (у меня получилось 14 миллионов сообщений в секунду), но не продемонстрирует сути, так как будет девиацией в сторону рантаймов; 4) Использование классов TcpListener и TcpClient, NetworkStream. Больше ни чем не разрешается пользоваться!
Что почитать перед написанием?
Немного погуглив, я понял что интернету нехватает статьи, которая описывает историю понятия асинхронных вычислений и вычислительных выражений, которые в народе известны по ключевым словам async/await. Вижу статью, которая называется "Survey of Brief Async history", в которой будет показана ретроспектива Async технологии:
0) J operator 1965;
1) LISP call/cc 1968;
2) Erlang 1986;
3) Concurrent ML 1998;
4) Haskell async 2004;
5) C# async yield 2006;
6) Perl IO:Async 2007;
7) F# Async 2010
8) C#/PHP Async 2012
9) Python async 2015
10) ECMAScript async 2017
Основополагающей статьей по F# async я бы назвал F# Async Guide Лео Городинского, jet.com. Основной книгой, которую я бы порекомендовал полистать перед знакомством с F# — это "Expert F# 4.0" автора языка Дона Сайма. Основной презентацией по F# Async я бы назвал доклад Дона Сайма на митапе в Лондоне — Some F# for the Erlang programmer. Вооружившись этими документами и этим Gist сниппетом я ухал во Львов писать самый идиоматичный вебсокет-сервер.
Витрина
Как обычно принятно в бектрекинг системах, прологах и декларативных языках, будем двигаться с конца, а именно с интерфейса который мы хотим получить. Хочется, чтобы ЭХО-сервер представлял собой функцию id.
open N2O
module Program =
[<EntryPoint>]
let main _ =
let mutable ret = 0
try Stream.protocol <- fun x -> x
use ws = Server.start "0.0.0.0" 1900
System.Threading.Thread.Sleep -1
with exn ->
printfn "EXIT: %s" exn.Message
ret <- 1
ret
Архитектура асинхронных процессов
Для тех, кто знаком с архитектурой Erlang/OTP, известно, что проектирование сетевых приложений начинается с дерева супервижина легковесных процессов и протоколов которые определяют их взаимодействие. Подчиненные дочерние процессы обычно разделяют токены времени жизни CancellationToken, благодаря чему исключения возникшие в родительских процессах могут отменить все дерево подпроцессов. Поэтому в циклах процессов присутствует выражение
while not ct.IsCancellationRequested do
Наш вебсокет-сервер состоит из 7 асинхронных процессов:
[Sup] [L]*
/ /
[start]--[S]--[C]*
\ \
[H] [T]*
Легенда этого дерева такова: [start] нода представляет собой точку входа, из которой будут рождаться остальные асинхронные процессы, соотвествует функции start; [S] нода соовтествует асинхронному процессу, который представлен функцией listen; [Sup] нода соотвествует функции startSupervisor; [C] нода соотвествует функции startClient; [H] нода соотвествует функции heartbeat; [L] нода соотвествует функции loop; [T] нода соотвествует функции telemetry. Звездочкой будем обозначать процессы, количество которых зависит от количества активных соединений: [C]*, [L]*, [T]*.
Протоколы взаимодействия
В момент рождения клиента [C] в родительском процессе сервера [S] происходит нотификация [S]->[Sup] по так называемому протоколу супервижина Sup с одноименным типом. Публичный протокол публичной функции Stream.protocol представлен типом Msg, который предназначен для управления асинхронным процессом [L].
Система серверных пингов реализована совместимой с протоколами Sup и Msg, хартбит процесс [H] через интервал времени посылает Tick сообщение в супервизор [Sup], который в свою очередь шлет броадкаст для всех клиентов телеметрии [T] созданных на той же очереди, что и [C], т.е. тот же протокол.
Процесы [T], [L] и [C] разделяют WebSocket стрим и все связаны с супервизором сервера, нотифицируя его в случае возникновения исключений.
type Msg =
| Bin of byte array
| Text of string
| Nope
type Sup =
| Connect of MailboxProcessor<Msg> * WebSocket
| Disconnect of MailboxProcessor<Msg>
| Close of WebSocket
| Tick
RFC 6455 Хендшейк
Функции обработки HTTP хедеров. isWebSocketsUpgrade ищет в хедерах пару Upgrade и WebSocket. getLines возвращает хедеры как массив строк, а функция getKey возвращает значение хедера по его ключу.
let isWebSocketsUpgrade (lines: string array) =
Array.exists (fun (x:string) -> "upgrade: websocket" = x.ToLower()) lines
let getKey (key: string) arr =
try let f (s: String) = s.StartsWith(key)
(Array.find f arr).[key.Length + 1..]
with _ -> ""
let getLines (bytes: byte array) len =
if len > 8 then
bytes.[..(len - 9)]
|> UTF8Encoding.UTF8.GetString
|> fun hs -> hs.Split([| "\r\n" |],
StringSplitOptions.RemoveEmptyEntries)
else
[||]
Функция RFC 6455 ответа называется handshake. Этой функциональности насколько мне известно нет в системных неймспейсах.
let acceptString6455 acceptCode =
"HTTP/1.1 101 Switching Protocols\r\n" +
"Upgrade: websocket\r\n" +
"Connection: Upgrade\r\n" +
"Sec-WebSocket-Accept: " + acceptCode + "\r\n\r\n"
let handshake lines =
(getKey "Sec-WebSocket-Key:" lines) + "258EAFA5-E914-47DA-95CA-C5AB0DC85B11"
|> Encoding.ASCII.GetBytes
|> SHA1CryptoServiceProvider.Create().ComputeHash
|> Convert.ToBase64String
|> acceptString6455
|> Encoding.ASCII.GetBytes
Асинхронные процессы сервера
Первый процесс [start], представляет собой точку входа, где стартует сразу три процесса: процесс супервизора всех соединений [Sup], процесс сервера слушателя соединений [S] и, если включен флаг Server.ticker, процесс сердцебиений, который работает как интервальный циклический таймер [H]. Эпилог процесса [start] содержит кенселяцию токена глобального для всех подпроцессов при освобождении переменной, которая соджержит вебсокет-сервер во внешнем коде.
let start (addr: string) (port: int) =
let cts = new CancellationTokenSource()
let token = cts.Token
let sup = startSupervisor token
let listener = TcpListener(IPAddress.Parse(addr), port)
try
listener.Start(10)
with
| :? SocketException ->
failwithf "%s:%i is using by another program" addr port
| err ->
failwithf "%s" err.Message
Async.StartImmediate(listen listener token sup, token)
if ticker then Async.StartImmediate(heartbeat interval token sup, token)
{ new IDisposable with
member x.Dispose() = cts.Cancel() }
Второй процесс [Sup], супервизор является чистой функцией, которая обрабатывет сообщения супервижин протокола о регистрации и смерти новых соединений. Тут же происходит бродкаст сообщений как реакция на сердцебиение тикера.
let startSupervisor (ct: CancellationToken) =
MailboxProcessor.Start(
(fun (inbox: MailboxProcessor<Sup>) ->
let listeners = ResizeArray<_>()
async {
while not ct.IsCancellationRequested do
match! inbox.Receive() with
| Close ws -> ()
| Connect (l, ns) -> listeners.Add(l)
| Disconnect l -> listeners.Remove(l) |> ignore
| Tick -> listeners.ForEach(fun l -> l.Post Nope)
}),
cancellationToken = ct
)
Интервальный таймер сердцебиения [H].
let heartbeat (interval: int)
(ct: CancellationToken)
(sup: MailboxProcessor<Sup>) =
async {
while not ct.IsCancellationRequested do
do! Async.Sleep interval
sup.Post(Tick)
}
Главный цикл процесса [S], который принимает новые TCP соединения и стартует новых клиентов [C].
let listen (listener: TcpListener)
(ct: CancellationToken)
(sup: MailboxProcessor<Sup>) =
async {
while not ct.IsCancellationRequested do
let! client = listener.AcceptTcpClientAsync() |> Async.AwaitTask
client.NoDelay <- true
startClient client sup ct |> ignore
}
Асихронный процесс [C] с очередью (MailboxProcessor) обработки TCP соединений или, проще говоря, TCP клиент. Это точка входа для клиентского соединения, именно здесь происходит хендшейк. В случае успешного хендшейка мы шлем RFC 6455 ответ и запускаем сразу два асинхронных процесса: первый это сам цикл обработки вебсокет сообщений [L], а также, если установлен флаг Server.ticker мы запускаем процесс телеметрии [T], который разделяет WebSocket стрим и может осуществлять туда асинхронный сброс сообщений, конкурируя с основным циклом [L]. Такие процессы существуют всегда в паре.
let startClient (tcp: TcpClient)
(sup: MailboxProcessor<Sup>)
(ct: CancellationToken) =
MailboxProcessor.Start(
(fun (inbox: MailboxProcessor<Msg>) ->
async {
let ns = tcp.GetStream()
let size = tcp.ReceiveBufferSize
let bytes = Array.create size (byte 0)
let! len = ns.ReadAsync(bytes, 0, bytes.Length)
|> Async.AwaitTask
let lines = getLines bytes len
match isWebSocketsUpgrade lines with
| true ->
do! ns.AsyncWrite (handshake lines)
let ws =
WebSocket.CreateFromStream(
(ns :> Stream), true, "n2o", TimeSpan(1, 0, 0))
sup.Post(Connect(inbox, ws))
if ticker then Async.Start(telemetry ws inbox ct sup, ct)
return! looper ws size ct sup
| _ -> tcp.Close()
}),
cancellationToken = ct
)
Процесс телеметрии [T] слушает очередь, и на любое сообщение, шлет в вебсокет канал текстовое сообщение "TICK".
let telemetry (ws: WebSocket)
(inbox: MailboxProcessor<Msg>)
(ct: CancellationToken)
(sup: MailboxProcessor<Sup>) =
async {
try
while not ct.IsCancellationRequested do
let! _ = inbox.Receive()
do! send ws ct (Text "TICK")
finally
sup.Post(Disconnect <| inbox)
ws.CloseAsync(WebSocketCloseStatus.PolicyViolation, "TELEMETRY", ct)
|> ignore
}
Главный цикл обработки сообщений [L], в котором происходит создание буферизированого WebSocket стрима, тип которого явно присутствует в протоколе супервижина. Также здесь выделяется глобальный для всего цикла буфер, куда копируются байты их сокета с помощью ReceiveAsync. При возникновении исключения происходит нотификация супервизора с помощью сообщения Close, которое сигнализирует о разрыве соединения, например в случае ошибки валидации UTF-8.
let looper (ws: WebSocket)
(bufferSize: int)
(ct: CancellationToken)
(sup: MailboxProcessor<Sup>) =
async {
try
let mutable bytes = Array.create bufferSize (byte 0)
while not ct.IsCancellationRequested do
let! result =
ws.ReceiveAsync(ArraySegment<byte>(bytes), ct)
|> Async.AwaitTask
let recv = bytes.[0..result.Count - 1]
match (result.MessageType) with
| WebSocketMessageType.Text ->
do! protocol (Text (Encoding.UTF8.GetString recv))
|> send ws ct
| WebSocketMessageType.Binary ->
do! protocol (Bin recv)
|> send ws ct
| WebSocketMessageType.Close -> ()
| _ -> printfn "PROTOCOL VIOLATION"
finally
sup.Post(Close <| ws)
ws.CloseAsync(WebSocketCloseStatus.PolicyViolation, "LOOPER", ct)
|> ignore
}
Функции терминации канала наследуюет архаическое на мой взгляд разделение текстовых и бинарных сообщений. Как показывает практика трактовка всего как бинарных сообщений только улучшает семантику протокола.
let sendBytes (ws: WebSocket) ct bytes =
ws.SendAsync(ArraySegment<byte>(bytes),
WebSocketMessageType.Binary, true, ct) |> ignore
let send ws ct (msg: Msg) =
async {
match msg with
| Text text -> sendBytes ws ct (Encoding.UTF8.GetBytes text)
| Bin arr -> sendBytes ws ct arr
| Nope -> ()
}
Что дальше?
Дальше идут три фазы:
1) Контекст пира: порт, айпишник, хедеры, эндпойнт, служебная информация;
2) BERT сериализация для совместимости с клиентской инфраструктурой N2O;
3) Имплементация NITRO протокола.
Благодарности
Хочется поблагодарить всех, кто ставил лайки нашему проекту, а осоебнно Филлипа Картера, програмного менеджера .NET и F# ❤ Мы чрезвычайно воодушевлены!
Авторы
Максим Сохацкий, Игорь Городецкий, Siegmentation Fault