You signed in with another tab or window. Reload to refresh your session.You signed out in another tab or window. Reload to refresh your session.You switched accounts on another tab or window. Reload to refresh your session.Dismiss alert
Способността две или повече задачи или подзадачи да се изпълняват едновременно, използвайки множество ресурси (процесорни ядра).
Скалируемост
Способността на една система да се справи с повишаването на натоварването чрез добавяне на хардуерни ресурси.
Конкурентност и паралелизъм
Конкуретност се отнася до структурата на една програма.
За да бъде една програма конкурентна, то задачите, които изпълнява трябва да могат да бъдат прекъсвани и възобновявани. Редът на изпълнение на подзадачите не е строго определен.
Паралелизъм се отнася до изпълнението на една програма.
Дали има или няма паралелизъм се определя по време на изпълнението и е пряко зависимо от хардуерните ресурси (брой процесорни ядра)
Пример за конкурентна програма:
Уеб сървър, който използва 1 процесорно ядро, но обслужва множество заявки, изпълнявайки частично всяка една заявка до нейното завършване.
Пример за паралелна програма:
Същия пример като горния, но разполагаме с 8 процесорни ядра. Уеб сървърът може да обработва 8 заявки наистина едновременно.
Конкурентност и паралелизъм
Една програма може да е конкурентна и паралелна.
Една програма може да е конкуретна, но не паралелна.
Една програма може да е паралелна, но не конкурентна.
Една програма може да е нито паралелна, нито конкурентна.
Конкурентност и паралелизъм
Кодът, написан на Elixir, е конкурентен и при възможност, паралелен.
iex --erl "+S 1" -S mix - Стартира проекта с 1 Scheduler; използва една нишка на едно процесорно ядро. Имаме конкурентност, но без паралелизъм
iex --erl "+S 8" -S mix (при изпълнение на процесор с 8 ядра) - Имаме конкурентност и паралелизъм.
Проблемите на конкурентността
Програмите са по-трудни за писане и поддръжка.
Синхронизация, координация, планиране на изпълнението.
Програмите са по-трудни за дебъгване.
Допълнително натоварване породено от постоянна смяна на контекста.
Конкурентните подзадачи отнемат по-дълго време за изпълнение при натоварване.
Не всички програми могат да бъдат написани конкурентно.
Защо има нужда от конкурентност?
По-пълноценно използване на ресурсите.
Подобряване на бързината на отговор от системата.
Позволява да изпълняваме повече задачи едновременно (според външен наблюдател) от броя налични процесорни ядра.
Не позволява на една или няколко задачи да монополизират ресурсите.
Комуникация между участници в конкурентна система
Чрез споделена памет
Пример: A и B са две програми, които се изпълняват на един компютър и споделят файловата система.
Пример: А и B са две нишки, които достъпват обща променлива-стек и добавят и премахват елементи от него.
Чрез изпращане на съобщения
Пример: Всичко в Elixir.
Пример: A и B са уеб браузър и уеб сървър - А изпраща заявки за уеб страница и ресурси на B; B обработва заявките и изпраща резултатите на A.
Процеси
Процесът винаги има адрес, понякога има и име.
Всичко в Elixir се изпълнява в процес.
Кодът, изпълняван в процеса, е последователен и функционален.
Ако искаме да изпълним конкурентно задача A и задача B, то трябва да ги изпълним в отделни процеси.
Когато един процес приключи своята работа, то той "умира".
Elixir проектът, чрез който виждате тази презентация, изпълнява ~320 процеса (по време на писането на презентацията).
Когато един процес свърши своята работа, то той бива терминиран.
Комуникация между процеси
Процесите изпращат съобщения: send/2.
Процесите получават съобщения: receive/1.
Съобщенията са асинхронни - send/2 винаги връща изпратеното съобщение, без да проследи получаването.
Трябва сами да имплементираме синхронна комуникация, когато е необходимо (или да използваме готови шаблони от библиотеката).
Всеки процес има пощенска кутия (mailbox) - място, където се съхраняват съобщенията, които са изпратени до него.
Send
send/2 - изпраща съобщение до процес.
Първият аргумент е pid на процеса, към който се изпраща съобщението.
Вторият аргумент е самото съобщение.
Всеки валиден тип в Elixir може да бъде съобщение.
send(self(), :hello) - изпраща съобщение към себе си.
flush() - само в iex - чете всички съобщения на текущия процес.
Receive
receive дефинира, подобно на case, множество шаблони, които се съпоставят последователно.
Ако някой от шаблоните се съпостави, то кодът му се изпълнява и receive завършва.
Ако никой от шаблоните не се съпостави, то receive блокира процеса:
Докато не се получи съобщение, което да се съпостави с някой от шаблоните.
Докато не изтече таймаут, дефиниран с after.
Ако отляво има само име на променлива, то всяко съобщение успешно се съпоставя с него.
# Чете всяко съобщение, без да го съпоставя с някой шаблонreceivedomsg->IO.inspect(msg)end# Чете само съобщения, които са наредени двойки с първи аргумент :helloreceivedo{:hello,from}->IO.inspect(from)end# Чете съобщения, които са стринговеreceivedomsgwhenis_binary(msg)->IO.inspect(msg)end
pid=spawn(fn->IO.puts(DateTime.utc_now())# Чака за съобщение :some_msg. Ако не го получи в рамките на 5 секунди,# изпълнява кода в after.receivedo:some_msg->:okafter5_000->IO.puts("#{DateTime.utc_now()} Terminating because of timeout!"):timeoutendend)# 2023-03-08 22:30:42# 2023-03-08 22:30:47 Terminating because of timeout!
pid=spawn(fn-># selective receive - съпоставя съобщенията с шаблониreceivedo{:pattern_1,from}->send(from,:received_pattern_1){:pattern_2,from}->send(from,:received_pattern_2):print_info->IO.inspect(Process.info(self(),:messages),label: "Messages")endend)send(pid,{:pattern_100,self()})send(pid,{:pattern_101,self()})send(pid,{:pattern_101,self()})send(pid,{:pattern_2,self()})flush()# :received_pattern_2
При множество паралени процеси, писането в поща на друг процес не е безопасно.
m-buf:
Парче памет извън хийпът, където други процеси могат да пишат безопасно.
Когато цялото съобщение е копирано, съобщението в m-buf се свързва с пощенската кутия.
Използва се когато lock-ът на пощенската кутия е взет от друг.
Изпращане на съобщение
Пресмята размера на Msg.
Алокира място за съобщението (в хийпа на P2 или в m-buf).
Копира съобщението от хийпа на P2 в алокираното място.
Алокира и свързва структура ErlMessage, обвиваща съобщението.
Свързва ErlMessage с пощенската кутия.
Гаранции при изпращане на съобщение
Почти никакви. Приема се, че съобщенията могат да не бъдат доставени.
send връща директно, без да се интересува дали съобщението е получено.
Ако процеси A, B, и C изпращат съобщения на D, то няма гаранции за реда на получаване.
Ако процес A изпраща съобщения a1, a2 и a3 на процес B, то е гарантирано, че съобщенията ще бъдат получени в този ред (ако бъдат получени).
Group Leader
Всеки процес участва в някаква група и всяка група има лидер.
Процес се присъединява към групата на процеса, който го е създал.
Всички IO операции се пренасочват към груповия лидер.
printer_pid=spawn(fn->receivedo{:print,text}->IO.puts(text<>" from #{self()}!"endend)IO.puts("Hello from #{self()}!")#=> Hello from #PID<0.111.0>!# `self()` и `pid` имат общ групов лидер, затова виждаме IO.puts# изпълнен в процес, различен от текущияsend(printer_pid,{:print,"Hello"})#=> Hello from #PID<0.1303.0>!send(printer_pid,{:print,"1 + 2 = 3"})#=> 1 + 2 = 3 from #PID<0.1303.0>!
Грешки в друг процес
fun=fn->{:error,"something went wrong"}endspawn(fn->:ok=fun.()end)spawn(fn->1/0end)spawn(fn->raise"error"end)spawn(fn->throw10end)spawn(fn->exit(:normal)end)
Можем само да видим принтирана грешка на екрана заради общ групов лидер
Поради изолацията на процесите, грешките в тях не се пренасят на други процеси.
Връзки между процеси - link
link - специална двупосочна връзка между два процеса.
Когато единият процес приключи своята работа неочаквано, то всички свързани с него процеси също терминират.
Неочаквано: Когато процесът терминира чрез exit, throw или raise;
MatchError, FunctionClauseError, ръчно извикване на exit/throw.
Свързване на един процес с друг става:
по време на създаването му чрез spawn_link/{1,3};
след създаването му чрез Process.link/1 (свързва текущия процес с друг процес).
spawn_link/{1,3} е атомарна операция, spawn/{1,3} + Process.link/1 не е.
self()#PID<0.111.0>spawn_link(fn->raise"error"end)# ** (EXIT from #PID<0.111.0>) shell process exited with reason: an exception was raised:# ** (RuntimeError) error# iex:1: (file)# # 18:32:45.788 [error] Process #PID<0.113.0> raised an exception# ** (RuntimeError) error# iex:1: (file)
Връзки между процеси - link
Ако процесите са толкова изолирани, защо link убиваа всички свързани с него процеси?
По този начин не разпространяваме ли грешките, вместо да ги изолираме и обработваме?
Можем да превърнем грешките в инструменти!
Process.flag(:trap_exit, true)
Преобразува грешките в съобщения;
Изпраща съобщение, също и когато процесът приключи успешно.
self()#PID<0.111.0>Process.flag(:trap_exit,true)spawn_link(fn->raise"error"end)# Процес PID<0.111.0> не умира# 18:40:18.477 [error] Process #PID<0.115.0> raised an exception# ** (RuntimeError) error# iex:3: (file)flush()# {:EXIT, #PID<0.115.0>,# {%RuntimeError{message: "error"},# [{:elixir_eval, :__FILE__, 1, [file: 'iex', line: 3]}]}}
self()spawn_link(fn->:okend)flush()# Когато свързаният процес приключи успешно и нямаме trap_exit=true# нашият процес не получава нищо
link има нужда от trap_exit, за да не убива свързаният процес.
Може да имаме monitor-и от процес A към процес B (полезно е при писане на библиотеки).
monitor е "неинваразивен" - не влияе на живота на наблюдавания процес.
link се отнася повече към организацията на процесите и разпространението на грешките.
Отношението между уеб сървър <-> заявка.
Ако уеб сървърът терминира, то заявката също трябва да терминира. Обратното не е вярно.
Шаблони
# Синхронна комуникация чрез асинхронни съобщенияpid=spawn(fn->receivedo{:get_state,pid}->send(pid,{:state,"state"})endend)send(pid,{:get_state,self()})receivedo{:state,state}->stateend
# Синхронна комуникация чрез асинхронни съобщенияpid=spawn(fn->receivedo{:get_state,pid,ref}->send(pid,{:state,ref,"state"})endend)ref=make_ref()send(pid,{:get_state,self(),ref})receivedo{:state,^ref,state}->stateend
# Синхронна комуникация чрез асинхронни съобщенияpid=spawn(fn->receivedo{:call,{pid,ref},msg}->send(pid,{:reply,ref,msg}endend)ref=Process.monitor(pid)send(pid,{:call,{self(),ref},msg})receivedo{^ref,reply}->Process.demonitor(ref,[:flush])reply{:DOWN,^ref,:process,^pid,status}->exit(status)end
# Синхронна комуникация с timeoutpid=spawn(fn->receivedo{:call,{pid,ref},msg}->Process.sleep(Enum.random(0..10_000))send(pid,{:reply,ref,msg}endend)ref=Process.monitor(pid)send(pid,{:call,{self(),ref},msg})receivedo{^ref,reply}->Process.demonitor(ref,[:flush])reply{:DOWN,^ref,:process,^pid,status}->exit(status)after5_000->Process.demonitor(ref,[:flush])raise"timeout"end
# "Mutable" statedefmoduleCounterdo# Стартираме нов процес, който изпълнява loop(0)defstart_link()dospawn_link(__MODULE__,:loop,[0])end# Интерфейс, който симулира синрхонна комуникация.# Потребителят извиква get_next(pid), който вътрешно# изпраща съобщение до процеса pid и чака отговор.defget_next(pid)doref=make_ref()send(pid,{:get_counter,self(),ref})receivedo{:counter,^ref,counter}->counterendend# Безкрайна рекурсия, която чака съобщение, обработва го,# извиква себе си с нова стойност на counter, и отново# чака съобщение. Опашкковата (tail) рекурсия е задължителна,# за да не се получи StackOverflow.defloop(counter)doreceivedo{:get_counter,pid,ref}->send(pid,{:counter,ref,counter})loop(counter+1)# StackOverflow???endendendpid=Counter.start_link()IO.puts(Counter.get_next(pid))# => 0IO.puts(Counter.get_next(pid))# => 1IO.puts(Counter.get_next(pid))# => 2
# Sneak peek: GenServer синхронна комуникация и mutable statedefmoduleCounter2douseGenServerdefstart_link(opts)doGenServer.start_link(__MODULE__,%{},name: Keyword.get(opts,:name))enddefinit(_),do: {:ok,%{counter: 0}}defhandle_call(:get,_from,state)do{:reply,# Ще отговорим на извикващия процесstate.counter,# Това е отговорътMap.update!(state,:counter,&&1+1)# Новото състояние}endend{:ok,pid}=Counter2.start_link(name: :counter)IO.puts(GenServer.call(:counter,:get))# => 0IO.puts(GenServer.call(pid,:get))# => 1IO.puts(GenServer.call(:counter,:get))# => 2
Именуване на процеси
В някои случаи е по-удобно вместо променящ се pid, да имаме име на процеса.
Ако един процес е уникален/специален, то може да му дадем име.
Ако един уеб сървър обработва всяка заявка в отделен процес, то тези процеси не трябва да имат име.
Има различни механизми за именуване на процеси.
:erlang.register/2;
:global.register_name/2;
Registry модул;
pg2;
Библиотеки: swarm, horde и др.
pid=Counter.start_link()# От предишните слайдове:erlang.register(:counter,pid)# Изпраща съsend(:counter,{:get_counter,self(),make_ref()})send(pid,{:get_counter,self(),make_ref()})resolved_pid=:erlang.whereis(:counter)send(resolved_pid,{:get_counter,self(),make_ref()})flush()# {:counter, #Reference<0.997260736.984612865.46599>, 0}# {:counter, #Reference<0.997260736.984612865.51911>, 1}# {:counter, #Reference<0.997260736.984612865.62775>, 2}