Процеси и OTP : GenServer


Какво е OTP?

Всъщност е много неща. От една страна OTP е платформата с която се разпространява Erlang. Версиите на Erlang са версии на OTP. Когато си пуснем iex, виждаме нещо такова : Erlang/OTP 19. Тоест Erlang и OTP са като едно цяло.

OTP е и стандартната библиотека на Erlang. Съкращението идва от Open Telecom Platform, но с времето OTP се е превърнало в нещо много по-голямо от платформа за писане на телекомуникационни програми. В момента изискванията за web разработка са много подобни на изискванията за писане на телекомуникационни програми. И OTP е решение.

Платформата OTP идва със следните неща:

  • Интерпретатор и компилатор на Erlang.
  • Стандартните библиотеки на Erlang.
  • Dialyzer, за който говорихме в Тип-спецификации и поведения.
  • Mnesia - дистрибутирана база данни.
  • ETS - база данни в паметта.
  • Дебъгер.
  • И много други…

Това, което ще разгледаме в тази и следващата статии са три абстракции, предоставени ни от OTP - Gen сървъри, Supervisor-и и OTP application-и.

Какво е GenServer?

В статията Тип-спецификации и поведения си говорихме за поведения. Дефинирахме поведение, наречен SynchronousCallHandler. Това поведение описваше процес, който държи състояние и предоставя начин за синхронно модифициране или четене на това състояние.

Оказва се че такова поведения, както и други, подобни са често срещано изискване при работа с Erlang. Именно затова OTP идва с поведение наречено gen_server. Elixir, от своя страна го адаптира като поведението GenServer.

GenServer представлява процес, представен от модул, който предлага функции за различни, често срещани случаи при работа с процеси. Функции за синхронна и/или асинхронна комуникация, функции, които се извикват при създаване на процес или преди унищожаването му, функции за нотифициране при грешки и за логване. Когато комбинираме GenServer-и със Supervisor процеси лесно можем да създадем fault-tolerant система.

С помощта на мета-програмиране GenServer е специален тип поведение - поведение с функции, които имат имплементации по подразбиране. За да направим от един модул GenServer, използваме use GenServer. Ще си говорим по-подробно за use, когато си говорим за мета-програмиране.

defmodule MyWorker do
  use GenServer
end

По този начин декларираме, че даден модул ще изпълни поведението GenServer. Това поведение и логиката около него ни дават следните възможности:

  • Да стартираме процес.
  • Да поддържаме състояние в този процес.
  • Да получаваме request-и и да връщаме отговори.
  • Да спираме процеса.

Нека започнем с пример и след това ще разгледаме самото поведение.

Пример : GenServer и Blogit - Posts component

Backend-ът на блога, който четете е пример за използване на OTP логика. Примерът е Blogit.Components.Posts, който е GenServer, чието състояние са всички блог-постове.

Blogit е OTP application, който от дадено git repository с markdown файлове създава блог с постове със съдържание, съдържанието на тези markdown файлове като HTML. Един Post представлява следната структура:

defmodule Blogit.Models.Post do
  alias Blogit.Models.Post.Meta

  @type t :: %__MODULE__{
    name: String.t, raw: String.t, html: String.t, meta: Meta.t
  }
  @enforce_keys [:name, :raw, :html, :meta]
  defstruct [:name, :raw, :html, :meta]
end

Структурата държи име на поста, което е уникален идентификатор, суровото markdown съдържание, html съдържанието и мета-информация.

Искаме да имаме компонент, който да държи всички постове от дадено git repostiry. Както и да можем да правим заявки към него като ‘дай пост по уникално име’, ‘дай всички постове сортирани по дата на създаване’, ‘осъвремени си постовете’ и други.

За това, разбира се, можем да ползваме Agent, но GenServer ни дава повече свобода и по лесен начин за правене на каквито и да е заявки към дадено състояние. Всъщност Agent е имплементация на GenServer.

Нека разгледаме Blogit.Components.Posts:

defmodule Blogit.Components.Posts do
  use GenServer

  alias Blogit.Models.Post

  # Client functions

  def start_link() do
    GenServer.start_link(__MODULE__, nil, name: __MODULE__)
  end

  # Server callbacks

  def init(_) do
    send(self(), :init_posts)
    {:ok, nil}
  end

  def handle_info(:init_posts, nil) do
    posts = GenServer.call(Blogit.Server, :get_posts)
    {:noreply, posts}
  end

  def handle_cast({:update, new_posts}, _) do
    {:noreply, new_posts}
  end

  def handle_call({:list, from, size}, _from, posts) do
    result = Map.values(posts)
             |> Post.sorted |> Enum.drop(from) |> Enum.take(size)

    {:reply, result, posts}
  end

  def handle_call({:by_name, name}, _from, posts) do
    case post = posts[name] do
      nil -> {:reply, :error, posts}
      _ -> {:reply, post, posts}
    end
  end
end

Това е скъсена имплементация, но можем да видим различни функции от GenServer и как се ползват те. Можем да разделим функциите на две категории:

  • Такива, които се извикват от процес-клиент.
  • Такива, които имплементират GenServer поведението.

Обикновено функцията за стартиране на GenServer процес се нарича start_link. В случаят по-горе се стартира GenServer имплементиран от текущия модул. Даваме му име - модула. По този начин ще можем да го адресираме така:

GenServer.call(Blogit.Components.Posts, {:list, 0, 5})

Това ще изпрати съобщение, което ще се обработи от функцията, дефинирана като handle_call({:list, from, size}, _from, posts).

Когато се стартира GenServer, ако сме му дефинирали init/1 функция, се извиква тя. Една такава функция може да преустанови изпълнението на процеса преди още да е започнал работа. Скоро ще видим какво може да връща тя. Засега е важно да знаем, че обикновено тя връща {:ok, <състояние>}. Можем да приемем init/1 като конструктор на GenServer процеса.

В горния случай сме избрали да отложим инициализацията за малко по-късно. Това е специфично за Blogit програмата и нейното устройство, но ако състоянието на един GenServer процес зависи от друг процес, това е начинът. Процесът ни трябва да е активен и готов да приема съобщения, което се случва точно след като init върне {:ok, <състояние>}.

Тъй като GenServer е просто процес, можем да му пращаме обикновени съобщения без особена структура със send. Което и правим. Такива съобщения се обработват с handle_info функции.

Нашата handle_info(:init_posts, nil), приема два аргумента съобщение и текущо състояние. Всяка от handle_* функциите може да промени състоянието на GenServer процеса, като го върне като последен елемент на кортеж-резултата си. В нашия случай, взимаме постовете от друг GenServer процес, за който ще говорим по-късно, използвайки GenServer.call/2. Тази функция изпраща съобщения синхронно - тоест чака за отговор. За handle_info е нормално да върне {:noreply, <ново–състояни>}. Асинхронните хендлъри връщат :noreply статуси, докато синхронните reply.

След като инициализацията се изпълни, процесът започва живота си, чакайки за съобщения. Това е имплементирано с безкрайна рекурсия, чакаща на receive.

Виждате, че имаме handle_cast и handle_call хендлъри на съобщения. Разликата е много проста: handle_cast се използват за асинхронна комуникация, а handle_call за синхронна. Всъщност ние имплементирахме проста версия на handle_call поведението в SynchronousCallHandler примера.

В Blogit.Components.Posts имплементацията, използваме handle_cast({:update, <ново-състояние>}, <текущо-състояние>) за да може друг процес да обнови постовете. Това се случва, когато има промяна в git файловете. Връщаме {:noreply, <ново-състояние>} за да променим състоянието. Можем да пратим съобщение, което ще се обработи от тази handle_cast функция така:

GenServer.cast(Blogit.Components.Posts, {:update, new_posts})

Всъщност състоянието представлява Map с ключове уникалните имена на публикации и стойности самите Blogit.Models.Post структури. Нека разгледаме:

def handle_call({:list, from, size}, _from, posts) do
  result = Map.values(posts)
           |> Post.sorted |> Enum.drop(from) |> Enum.take(size)

  {:reply, result, posts}
end

Това е заявка към списък от постове, сортирани по датата си на създаване (най-новите първо) от дадена позиция и колко на брой. Имплементацията взима стойностите на Map-a състояние, сортира ги, премахва тези преди индекса from и взима бройка равна или по-малка от size. Тъй като е синхронно извикване резултатът е {:reply, <резултат>, <състояние>}. Много подобно поведение на нашия SynchronousCallHandler.handle_call.

Виждате колко лесно може да се построи сървър на състояние със специфични съобщения и инициализация с помощта на GenServer. Целият код за рекурсията държаща състоянието и съпоставянето на съобщения към клаузи, който написахме в SynchronousCallHandler, а и по-сложни случаи е поет от GenServer. Затова е толкова популярен начин за създаване на процеси. В production Erlang/Elixir се ползва GenServer вместо spawn/spawn_link/spawn_monitor, защото поема доста boilerplate код.

Поведението GenServer

Ще разгледаме всяка от опционалните функции, които един GenServer може да имплементира. За дадена функция, ще разгледаме параметрите ѝ, поведението ѝ и какви резултати се очаква да връща.

init/1

@type args :: any
@type state :: any
@type reason :: any
@type timeout :: non_neg_integer

@type init_result ::
  {:ok, state} |
  {:ok, state, timeout | :hibernate} |
  :ignore |
  {:stop, reason}

@spec init(args) :: init_result

Можем да приемем init за конструктор на GenServer процеса. Тя приема аргументи от какъвто и да е тип и връща състоянието на процеса. За множество аргументи можем да използваме списък. Когато GenServer.start_link/3 се извика с даден модул, ако той дефинира init/1, то тя ще се извика като конструктор. Поведението по подразбиране, което се изпълнява, ако не дефинираме тази функция е да се използват аргументите като състояние:

def init(args), do: {:ok, args}

Функцията GenServer.start_link/3 взима модул като първи аргумент, аргументите, които подава на init/1 като втори и keyword списък от опции като трети. Такава опция видяхме по-горе - name. Има и други, както е видно от документацията.

Както казахме, ако върнем {:ok, <състояние>}, процесът стартира с това състояние. Ако пък върнем {:ok, <състояние>, timeout-в-милисекунди}, процесът ще стартира със състоянието, и ако за даденото време не получи никакво съобщение, ще получи автоматично съобщение :timeout, което може да прихване ако се дефинира handle_info(:timeout, <състояние>). Ако init/1 върне {:ok, <състояние>, :hibernate}, процесът ще хибернира.

Хиберниран процес остава хиберниран, докато не получи съобщение. Ако има съобщение в опашката му за съобщения - това е веднага. Хибернирането пуска Garbage Collector-а на heap-а на този процес. Това е добър отговор ако скоро не се очаква съобщение, защото GC операциите не са леки. Ако инициализирането зарежда голям ресурс в паметта на процеса и връща малка част от него е добре да върнем {:ok, <състояние>, :hibernate}

Ако init/1 върне :ignore, процесът ще излезе нормално и start_link ще върне :ignore.

Ако init/1 върне {:stop, reason}, процесът ще излезе с върнатата причина и start_link/3 ще върне {:error, reason}.

Когато init/1 върне някой от {:ok, state} вариантите, GenServer.start_link/3 ще върне {:ok, pid}.

Всъщност ние имплементирахме подобна функция за SynchronousCallHandler.

handle_call

@type from :: {pid, ref}
@type handle_call_result ::
  {:reply, reply, new_state} |
  {:reply, reply, new_state, timeout | :hibernate} |
  {:noreply, new_state} |
  {:noreply, new_state, timeout | :hibernate} |
  {:stop, reason, reply, new_state} |
  {:stop, reason, new_state} when reply: term, new_state: term, reason: term

@spec handle_call(request :: term, from, state :: term) :: handle_call_result

Функциите handle_call се извикват когато GenServer-а получи съобщение, изпратено от GenServer.call/3.

Функцията GenServer.call/3 изпраща съобщение до GenServer процес и чака за отговор. Това е синхронна комуникация. Първият ѝ аргумент е pid или име на GenServer процес, вторият - съобщението, което трябва да се изпрати, а третият е timeout в милисекунди. След като изтече този timeout, call спира да чака.

Какво получаваме е handle_call?

  1. Съобщението, по което може да се pattern match-ва и така да имаме много версии на handle_call.
  2. Наредена двойка от pid-а на извикващия процес и уникална референция за това извикване.
  3. Състоянието на GenServer процеса.

Ако върнем с {:reply, <отговор>, <състояние>}, ще върнем отговорът като резултат на GenServer.call\3 и ще продължим със състоянието, което връщаме. Можем да очакваме подобно на init/1 поведение ако timeout или :hibernate са част от резултата. При timeout процесът ще получи :timeout съобщение след зададеното време, ако няма нови съобщения, а при :hibernate ще се включи GC.

Ако отговорът е noreply, процесът извикал GenServer.call/3 няма да получи отговор и ще чака. Всеки процес може да отговори с GenServer.reply(from, reply). Нужно е само from да е точно тази наредена двойка, която е получена в handle_call. Има три основни причини да върнем noreply от handle_call:

  1. Защото сме отговорили с GenServer.reply/2 преди да върнем резултат. Това е когато знаем какво трябва да се отговори, но трябва да извършим някаква бавна операция преди да излезем от handle_call.
  2. Защото ще отговори след като handle_call е свършила изпълнението си. Това е ако все още не знаем какво трябва да отговорим. Да речем има заявка към друг GenServer, за която чакаме отговор.
  3. Защото някой друг процес трябва да отговори. Да речем някакъв background процес.

Връщаме :stop резултат, когато искаме да прекратим изпълнението на GenServer процеса. В този случай ще се извика terminate(reason, state) ако е дефинирана и процесът ще прекрати изпълнение с причина - зададената причина.

Поведението по подразбиране на handle_call ако не е дефинирана за дадено съобщение е да върне {:stop, {:bad_call, request}, state}.

handle_cast

@spec handle_cast(request :: term, state :: term) ::
  {:noreply, new_state} |
  {:noreply, new_state, timeout | :hibernate} |
  {:stop, reason :: term, new_state} when new_state: term

Както споменахме handle_cast функциите се изпълняват при асинхронна комуникация. Това става чрез извикването на GenServer.cast(pid|name, request), която винаги връща :ok и не чака за отговор.

Функциите handle_cast приемат изпратеното съобщение и текущото състояние. Могат да върнат :noreply резултати, които се държат по същия начин като тези на handle_call, но нямат reply част. Могат и да върнат :stop резултат, който ще извика terminate/2 ако е дефинирана и ще прекрати процеса с дадената причина.

Тези дефиниции най-често се използват за промяна на състоянието, както в нашия пример : handle_cast({:update, new_posts}, _).

handle_info

@spec handle_info(msg :: :timeout | term, state :: term) ::
  {:noreply, new_state} |
  {:noreply, new_state, timeout | :hibernate} |
  {:stop, reason :: term, new_state} when new_state: term

Използват се за прихващане на всякакви други съобщения, да речем такива, пратени със send. Приемат и връщат аналогични параметри/резултати на тези на handle_cast.

terminate

@type reason :: :normal | :shutdown | {:shutdown, term} | term
@spec terminate(reason, state :: term) :: term

Извиква се преди терминиране на GenServer процес. Причината идва от резултат от типа {:stop, ...} върнат от handle_* функциите, Или от exit сигнал, ако GenServer процеса е системен процес. Supervisor процеси могат да пращат EXIT съобщения, които да се предадат на тази функция.

code_change

Извиква се когато кодът на процеса се смени по време на изпълнение. Това е възможно при Erlang/Elixir. За повече информация прочетете в документацията

format_status

Използва се за специфично представяне на състоянието на GenServer процес. За повече информация вижте документацията.

Заключение

GenServer е лесен начин за писане на процеси, поемащ често-използвана логика. Имплементациите му се вписват добре в supervision дървото на OTP програмите. Ще си поговорим за Supervisor процесите и OTP програмите в следващите статии.