Az előző cikkben megismerhettük a .NET 4-gyel kapott új Thread Pool-t, illetve röviden találkoztunk a Task-ok és a TaskScheduler fogalmával. Megtudtuk, hogy az új Thread Pool jelentős fejlesztéseken ment keresztül. Ez a fejlesztés annak érdekében történt, hogy a már jelenleg is tapasztalható tendencia, miszerint a számítógépekben egyre több processzormag található, könnyebben kiaknázható legyen. A Task-ok szolgálnak arra, hogy a feladatainkat kisebb darabokra tördeljük, mely darabokat ezután az új Thread Pool hatékonyan szétosztja a processzormagok között. Ez a kép így elég tiszta, azonban leegyszerűsített. A Task-ok végrehajtásában több osztály/mechanizmus vesz részt, és ezek különböző absztrakciós szinteken helyezkednek el.
Task-okról röviden
Ennek a cikknek a témája a TaskScheduler. Task-ok nélkül a TaskScheduler azonban nem tárgyalható. Emiatt ez a rész minimális szinten ismerteti a Task-okat, teljes egészében pedig egy későbbi cikk foglalkozik velük.
A Task-ok megértéséhez vissza kell emlékeznünk az előző cikkben felmerült problémákra. Az egyik probléma a granularitás volt. Ha egy program nem osztható szét megfelelően kicsi, párhuzamosan végrehajtható részekre, akkor a processzormagok kihasználtsága nem biztosítható kellőképpen. Ezeket a “megfelelően kicsi, párhuzamosan végrehajtható részeket” képviselik a Task-ok.
Kicsi, végrehajtható részeket a delegate-tek is le tudnak írni, és akár még párhuzamosan is futtathatóak a fordító által generált BeginXX függvényekkel, vagy egyszerűen a Thread Pool-ra dobálva őket. Csak egy számítás vagy egy feladat leírására ezért nem érné meg egy új koncepciót bevezetni. Nem mindig lehetséges azonban egy programot/számítást/algoritmust egy lépésben és egymástól független részekre bontani. Sokkal valószínűbb az az eset, amikor egy számítás/algoritmus elkezdődik, majd a futásának egy bizonyos pontján adódik olyan helyzet, hogy az algoritmus több, párhuzamosan futó szálon folytatható. Tipikus és gyakran bemutatott példája ennek a QuickSort algoritmus, mely első lépésében egy vektort két részre oszt, majd a két részvektoron működik tovább – akár párhuzamosan.
Az is gyakori, hogy a párhuzamosan elvégzett taszkok eredményét egy további taszk összegyűjti, és azon dolgozik tovább egymaga. Ehhez vagy egy már elindított taszknak a futását kell addig felfüggeszteni, amíg az összes függést okozó taszk le nem fut, vagy eleve csak akkor kell a taszkot elindítani, amikor függést okozó taszkok lefutottak. A taszkok között tehát függőségeket lehet felírni, és a .NET Task osztálya ezeket a függőségeket képes ábrázolni. A következő néhány példa segít megérteni az alapeseteket.
Az első esetben nincsenek függőségek. A taszk programkódja a futása közben létrehoz és elindít új taszkokat, melyek azután a szülőtaszktól függetlenül futnak tovább, esetleg újabb taszkokat létrehozva. Mindez a hagyományos delegétekkel is elérhető:
A kék vízszintes sávok a taszkokat jelképezik, időben balról jobbra haladva. Futás közben különböző függvényeket hajtanak végre. Új taszkok akkor keletkeznek, amikor a sárga alapban levő programkód fut le. Az ábrán leírt esetet a következő felépítésű kóddal lehet létrehozni:
... int r = CalculateSomething(); ... Task.Factory.StartNew(() => CalculatePart(r - 1)); Task.Factory.StartNew(() => CalculatePart(r + 1)); ... DoOtherThings();
A programkód tehát számol valamit a CalculateSomething() hívással, ezután elindít két párhuzamos számítást (a két Task.Factory.StartNew() hívás), amelyek esetleg a Thread Pool szálain fognak lefutni, miközben az eredeti szál folytatja a futását egyéb műveletekkel (DoOtherThings).
Más esetekben egy Task lefutása után szeretnénk, ha elindulna egy vagy több másik Task. Ehhez hasonlót a delegate-ek is tudnak, hiszen minden delegate-et leíró osztálynak a fordító generál egy függvényt, amelyik BeginXX() névvel rendelkezik, és megatható neki egy callback delegate. Mivel minden a programozó által definiált delegate a MulticastDelegate osztályból származik, ami pedig képes több függvényre referálni, a callback delegate takarhat többszörös függvényhívást is. Ugyanakkor a delegate-ek esetében az összes callback ugyanazon a szálon fog lefutni, míg a Task-ok esetében ez nem feltétlenül igaz. A következő ábrán látható lefutást tehát már csak a Task-ok támogatják:
Taszkok esetében a fenti ábrán látható szervezést a következő kóddal érhetünk el:
Task<int> precalculation = new Task<int>(() => Precalculation()); precalculation.ContinueWith(task => Calculation(task.Result - 1)); precalculation.ContinueWith(task => Calculation(task.Result + 1)); precalculation.Start();
Ebben az esetben készül egy precalculation nevű taszk. Ehhez hozzá lesz rendelve “continuation” taszkként két új taszk, amelyek a ContinueWith() függvénynek átadott delegate-ek alapján jönnek létre. A “continuation” taszkok csak akkor kezdik el a futásukat, ha a taszk, amihez continuation taszként lett rendelve, lefutott. Ezután indítjuk az előszámítást végző taszkot a Start() hívással. Hogy pontosan mi történik ilyen esetekben, pontosan kinyomozzuk a következő cikkben. Most csak annyi érdekes, hogy lássuk, a taszkoknak milyen függőségeik lehetnek.
Más esetekben pont fordított függésre van szükség. Egy mátrix szorzását például egyszerű több taszkra szétbontani, ugyanakkor a szorzás eredményét valószínű fel szeretnénk használni. Az eredményt feldolgozó taszknak meg kell várnia az összes taszkot, ami a mátrix szorzásán dolgozik. Ekkor a Taszkok a következőhöz hasonló módon vannak szervezve:
Az ezt megvalósító kód pedig:
Task part1 = Task.Factory.StartNew(() => CalculatePart(p1);
Task part2 = Task.Factory.StartNew(() => CalculatePart(p2);
Task part3 = Task.Factory.StartNew(() => CalculatePart(p3);
Task.Factory.ContinueWhenAll(new[] { part1, part2, part3 }, (tasks) => FinishCalculation(tasks));
Itt elindul három független taszk (part1, part2, part3), majd készítünk egy olyan taszkot, ami akkor indul el, ha mind a három taszk befejezte a működését.
Mit tudunk tehát a Taszkokról?
Amint kiderült, a taszkok lényegében becsomagolt delegate-ek, amelyek között különböző típusú függőségeket határozhatunk meg. Ezek a függőségek nagyban hatással vannak a taszkok végrehajtási sorrendjére. Van a taszkoknak egyéb más szolgáltatása is, amivel itt nem kell foglalkozni. Ilyenek például a taszkok futásának megszakíthatósága (cancellation), és taszkok hierarchiáján is működő kivételek (exceptions). Ezekkel későbbi cikkben foglalkozunk.
Task Schedulerek
Az előző cikkben egyszerre tárgyaltuk az új Thread Pool-t, ami aszerint lett optimalizálva, hogy kisebb feladatokat is hatékonyan elvégezzen. Ugyanitt lett először bevezetve a kisebb feladatokat leíró Taszk fogalma, emiatt úgy tűnhet, hogy ez a két dolog szorosan összefügg. Ez igazából jól is tűnik, és általában igaz az, hogy a Task-okat a taszkok feldolgozására optimalizált Thread Pool fogja végrehajtani.
A Taszk azonban egy általánosabb fogalom, és független kellene, hogy legyen attól a mechanizmustól, ami azokat végrehajtja. Nem is kell eröltetett példákat keresni, foglalkoztunk már a GUI-s programok speciális igényeivel, és a SynchronizationContext-tel. Egy GUI-s alkalmazásnál vigyázni kell azzal, hogy egy feladat melyik szálon fut le. Ugyanakkor miért ne lehetne használni a GUI közelében az amúgy önmagában is hasznos koncepciót, a Task-ot?
GUI-s alkalmazás közelében a Task-okat viszont csak úgy lehet használni, ha elérhető, hogy a Task-ok, vagy legalábbis bizonyos Task-ok ne a Thread Pool Thread-eken, hanem a GUI Thread-en fussanak le. Megoldható lenne ugyan, hogy a Task-ok belül egy SynchornizationContext-et használva a vezérlést a GUI szálra irányítsák, vagy egyéb módon jelezhetné a Task a speciális igényeit, ez azonban kicsit belecsúnyítana a Task-ok koncepciójába.
A másik lehetőség, hogy a Task-ok végrehajtása a Task-ok ábrázolásától különböző absztrakciós szintre tartozik. Ez az absztrakciós szint a Task Scheduler-eké.
Amikor egy Task-ot létrehozunk, vagy a futását elindítjuk, akkor ahhoz megadhatunk egy TaskScheduler példányt. Ha ezt mi nem tesszük meg, a Task felveszi annak a Task-nak a Scheduler-jét, ami Task őt létrehozta, vagy ha ilyen nincs – mert mondjuk a Task-ot nem egy másik Task-ból hozzuk létre – akkor felveszi a Default Task Scheduler-t. A Default Task Scheduler egy egyszerű statikus változó, a TaskScheduler.Default property-n keresztül érhető el, és a TaskScheduler statikus konstruktora állítja be egy olyan példányra, ami a Task-okat a Thread Pool-ra irányítja. Ebből adódik, hogy ha egyáltalán nem foglalkozunk a Scheduler-ekkel, akkor a Task-jaink a Thread Pool Thread-jein fognak lefutni.
Összefoglalva tehát, minden Task-hoz tartozni fog egy Task Scheduler, vagy a programozó által megadott, vagy a szülő Task-hoz rendelt, vagy a Default.
Scheduler-e a Task Scheduler?
A Scheduler elnevezés kicsit félrevezeti az embert, amikor azt próbálja összerakni, hogy mi is ennek az osztálynak a szerepe. Azt gondolhatnánk, hogy ez az osztály dönti el, hogy melyik Task mikor hogyan fog lefutni. Nos, lehetne a Task Scheduler-eknek olyan implementációja, ami ezt tényleg meg is teszi, azonban a .NET-tel kapott Scheduler-ek igazából nem Scheduler-ek abban az értelemben, hogy semmit nem ütemeznek.
Mit csinálnak akkor? Többnyire csak egy egységes felületet adnak a Task-ok felé, hogy azok néhány alapműveletet el tudjanak végezni. Mik ezek az alapműveletek? Amikor egy Task-ot elindítunk (a programkódunk elindítja, akár egy másik Task kódjából, akár máshonnan), akkor a Task veszi a hozzárendelt TaskScheduler-t, és meghívja a QueueTask() függvényét. Tehát egy Task indítása alapesetben azt jelenti, hogy a TaskScheduler-t megkéri a Task, hogy tegye egy végrehajtási sorba.
Hogy ezután mi történik, az az aktuálus TaskScheduler-től függ. Az a TaskScheduler, amelyik a .NET Thread Pool-t használja, (egy-két flag ellenőrzése után) a Thread Pool-nak adja tovább a Task-ot, és az ütemező logika a Thread Pool-ban van implementálva.
Ha olyan Task Scheduler van a Task-hoz hozzárendelve, ami a GUI alkalmazások számára készült, akkor a Scheduler egy korábban eltárolt SynchronizationContext segítségével a GUI szálnak post-olja a Task-ot (némi delegate-be csomagolás után, mivel a Synchronization Context nem ismeri a Task koncepcióját).
Mind a két .NET-hez adott Task Scheduler tehát szimplán továbbhárítja az ütemezés felelőségét egy alatta lévő osztálynak/mechanizmusnak, inkább egyfajta adapterként működve a Task-ok felé, mint valódi ütemezőként. Ez azonban csak implementációs részlet, elképzelhető olyan implementáció, ami valódi ütemezést valósít meg.
Absztrakciós szintek megkavarva
A TaskScheduler osztályok képviselik tehát azt az absztrakciós szintet, ahol a feladatok ütemezése történik. Ez a nagykönyv szerint szép is lenne, de ritka az olyan gyakorlati helyzet, hogy a nagykönyvi módszerek keresztbe patkolása a gyakorlatban ne működne praktikusabban/hatékonyabban. Ez így van a Task-ok, TaskScheduler-ek esetében is, így van pár helyzet, amikor a Task beleszólhat az ütemezésbe.
Az egyik ilyen helyzet a Task Continuation-ok ütemezése. A Task Continuation egy olyan Task, amit akkor kell lefuttatni, amikor egy másik Task már lefutott. Ezeket írtuk le egy korábbi példa kódban a ContinueWith() függvényhívással. Elvileg ezeket a Continuation Task-okat át lehetne adni az ütemezőnek, ami szépen gondoskodhatna róla, hogy amíg a szükséges feltételek nem teljesülnek, nem futtatja le az adott Task-okat. A valóság azonban az, hogy ez a logika a Task-okban van implementálva. Egy Task osztály példányhoz hozzá lehet rendelni egy vagy több taszkot, amiket le kell futtatni, ha a taszk futása befejeződott. Ez koncepcionálisan a completion callback függvényekhez hasonlít. Minden Task példány a belső megvalósításában rendelkezik egy listával, amely az ő Continuation taszkjait fűzi fel. Ezek a Continuation Task-ok nem kerülnek a Scheduler queue-ba egészen addig, amíg a Task által képviselt feladat le nem futott. Amikor a feladat készen van, a Task osztály implementációja fogja a Continuation listát, és az elemeit a Scheduler queue-ba helyezi a TaskScheduler.QueueTask() hívással. Ezzel nagyon sok munkát levesz a Scheduler (illetve a Scheduler alatt levő) példánytól – azon az áron, hogy nem élesek az absztrakciós szintek határai.
Egy másik eset, amikor a Task beleszól az ütemezésbe, az az, amikor várakoznia kell másik Task(ok) lefutására. Ebben az esetben a Task kódja, mielőtt elkezdene valójában várakozni, megkéri a TaskScheduler-t, hogy “inline”-olja be a Task-okat, azaz azonnal hajtsa őket végre az aktuális szálon. Ez az “inline”-olás persze nem mindig lehetséges, triviális példa, ha a Task futása már elkezdődött.
Gyári Task Scheduler osztályok
A .NET két előre csomagolt Task Scheduler osztállyal érkezik, lefedve ezzel azokat funkciókat, amelyek eddig is megvalósíthatóak voltak a Thread Pool mezitlábas használatával illetve a SynchronizationContext osztály használatával. A Thread Pool-on alapuló implementáció neve ThreadPoolTaskScheduler, és ez lesz a default osztály, amit egy Task kap, ha nem definiálunk neki külön egy másikat, illetve nem tudja örökölni az éppen futó taszkét. Azok a Task-ok, amelyek a GUI-t manipulálják, a SynchronizationContextTaskScheduler egy példányát kell kapják. A SynchronizationContextTaskScheduler-t használó taszkok kódjában egy kicsit jobban oda kell figyelni. Ha egy ilyen taszk újakat hoz létre anélkül, hogy definiálná a Schedulert, akkor minden újonnan létrehozott taszk szintén a SynchronizationContextTaskScheduler példányt örökli, és így minden egyes taszk a GUI szálon fog lefutni, lényegében egyszálúsítva az alkalmazást. Ha van tehát egy GUI-t kezelő taszk, és az széttörhető több másik taszk-ra, amelyek egyenként nem nyúlnak a GUI-hoz, külön meg kell adni, hogy például a ThreadPoolTaskScheduler-t használják.
Nézzük a következő példát, amely egy alkalmazás eseménykezelő függvénye:
private void Button_Click(object sender, RoutedEventArgs e)
{
Task.Factory.ContinueWhenAll(
new[] { Task.Factory.StartNew(() => Calculate(1)),
Task.Factory.StartNew(() => Calculate(2)),
Task.Factory.StartNew(() => Calculate(3))},
tasks =>
{
StringBuilder result = new StringBuilder();
foreach (var t in tasks)
{
result.Append(t.Result.ToString());
result.Append(" ");
} // foreach
this.TextBlock.Text = result.ToString();
},
CancellationToken.None,
TaskContinuationOptions.None,
TaskScheduler.FromCurrentSynchronizationContext());
}
private int Calculate(int p)
{
Thread.Sleep(2000);
return Thread.CurrentThread.ManagedThreadId;
}
Mi történik a kódban? Mielőtt a Task.Factory.ContinueWhenAll() meghívódik (03 sor), kiértékelődnek a paraméterei. Ehhez létrejön az első paraméternek átadott tömb (05), és elkezdődik a tömb inicializálása. Ehhez egymás után meghívódik a három Task.Factory.StartNew() függvény (05-07). Ez egy TaskFactory osztálypéldány StartNew() függvénye, ami több lépésben próbálja meghatározni, hogy a létrehozandó Task példányhoz milyen TaskScheduler példányt rendeljen.
A TaskFactory példányosításakor meg lehet adni egy TaskScheduler példányt, ekkor a Factory minden létrehozandó Task-hoz ezt rendeli. A Task osztály statikus konstruktora, amelyik a Task.Factory property számára létrehozza a TaskFactory-t, nem ad meg TaskScheduler-t, ezért a példakód hívásánál a TaskFactory egy második lépés is végrehajt a TaskScheduler meghatározásához. Megvizsgálja, hogy az éppen futó szál nem-e egy Task-ot hajt végre éppen. Ha igen, elkéri ennek a Task-nak a Scheduler példányát, és ezt rendeli az új Task-hoz is.
Ha nincs Task végrehajtás alatt, ami a mi példánkban most a helyzet (hiszen a GUI szálon egy eseménykezelőben vagyunk), akkor a TaskScheduler.Default értéket veszi a TaskFactory, ami pedig egy, ThreadPoolTaskScheduler példány lesz. A három létrehozott Task példány az 05-07 sorban, amelyek a Calculate függvényt fogják meghívni, a ThreadPoolTaskScheduler-en keresztül lesznek végrehajtva, így a Thread Pool szálaira kerülnek.
Elindult tehát a három taszk, és az őket reprezentáló Task példányokat most már át lehet adni a Task.Factory.ContinueWhenAll() hívásnak. Hogy pontosan mi történik egy ilyen híváson belül, meg fogjuk nézni a következő, Task-okkal foglalkozó cikkben, itt most elég annyi, hogy a hívás elrendezi, hogy amikor lefutott mind a három taszk, a második paraméterben (09-20) megadott delegate-et magába foglaló Task el fog indulni. Hogy melyik szál futtatja ezt a Task-ot, azt a függvényhívás utolsó paramétere határozza meg (24 sor), ez a TaskScheduler lesz ugyanis a létrejövő Task-hoz hozzárendelve. Ezt a paramétert el lehetne hagyni, ekkor az előbb ismertetett folyamat menne végbe a TaskScheduler felderítésére. Most azonban a paraméterben átadott Scheduler lesz a létrejövő taszk Scheduler-e.
A 24-ik sorban a TaskScheduler.FromCurrentSynchronizationContext() hívás létrehoz egy SynchronizationContextTaskScheduler() példányt, ami pedig egyszerűen a SynchronizationContext.Current értékét tárolja el, és használja később. Ezzel az értékkel már találkoztunk egy korábbi cikkben. A mintakód egy WPF-es alkalmazásból származik, így a SynchronizationContext.Current értéke egy DispatcherSynchronizationContext példány lesz. Ennek a DispatcherSynchronizationContext példánynak a Post() függvénye egy delegate-et képes lefuttatni a GUI szálon, és a SynchronizationContextTaskScheduler pont ezt használja ki.
Amikor a példakód három taszkja lefutott, az utoljára lefutott taszk a 24-ik sorban átadott TaskScheduler-en keresztül átadja feldolgozásra a 09-20 sorban megadott delegate-t becsomagoló taszkot. Mivel egy SynchronizationContextTaskScheduler-ről van szó, ez a QueueTask() hívásában a korábban eltárolt DispatcherSynchronizationContext.Post()-ot fogja használni a Task végrehajtására, ez pedig egy korábbi cikkben felvázolt módon, a Message Queue segítségével hajtatja végre a taszkot a GUI szálon. Mivel a 09-20 sorban leírt delegate a GUI szálon fog futni, az 19-ik sorban levő TextBlock.Text property kezelése nem fog problémákba ütközni, ugyanakkor a három Calculate() függvényt futtató taszk a Thread Pool-on fog párhuzamosan futni.
Ha a folyamat bonyolultnak tűnt, az alábbi ábra segíthet a megértésben. Első lépésben elindul az üzenetkezelő függvény, amelyet a GUI szál message loop-ja hív meg (1). Az üzenetkezelő függvényben elsőként a Task.Factory.StartNew() hívások futnak le (az ábrán csak egy hívás szerepel) (2). A StartNew() hívás létrehoz egy Task példányt, és megfelelően inicializálja. Az inicializáció lépéseként eltárolja a megfelelő delegate-et, ami a taszk által végrehajtandó kódra mutat, illetve a fentebb említett szabályok alapján hozzárendel egy TaskScheduler példányt, ebben az esetben egy ThreadPoolTaskSchedulert. (3)
Amikor a Task példány létrejött, a TaskFactory StartNew() függvénye meghívja a Task.ScheduleAndStart() függvényét (4). A bevezetett absztrakciós szintek miatt a Task nem maga gondoskodik a futtatásáról, hanem rábízza ezt a hozzárendelt TaskScheduler-re, így meghívja annak a QueueTask() függvényét, ami ebben az esetben a Task-ot a Thread Pool feladatsorába helyezi (6). Ezután a vezérlés visszakerül az üzenetkezelő függvénybe (7), ami most végrehajtja a ContinueWhenAll() hívást (8). A ContinueWhenAll() hivás létrehoz egy Task példányt a megadott delegate-tel, és a Task példányhoz egy SynchronizationContextTaskScheduler-t rendel, mivel az a függvény bemenő paramétere (9). Miután megvan az új taszk, a ContinueWhenAll() függvény úgy konfigurálja a taszkokat, hogy az új taszk elinduljon a többi lefutása után. Ez a következő cikkben jobban részletezendő Task Continuations mechanizmusával éri el (10). A GUI szál ezekután visszatér a Message Loop-ba (az ábrán ez nincs jelölve)
Eközben a Thread Pool észreveszi, hogy van egy feladat a feladatsorában és indít neki egy szálat (11). A .NET 4 Thread Pool számára a feladatok egy IThreadPoolItem interfészt valósítanak meg, aminek a Thread Pool meghívja az ExecuteWorkItem függvényét (12). Ez a hívás első lépésben lefuttatja az eltárolt delegate által mutatott kódot (13), majd ha ez végetért, végignézi a Task Continuations listáját. Mivel ez a lista most nem üres, a benne található Task-nak meghívja a ScheduleAndStart() függvényét (14).
Mint korábban, a Task most is az eltárolt Scheduler segítségével indítja el magát, ami ebben az esetben egy SynchronizationContextTaskScheduler példány. Ez a példány a QueueTask hívás hatására, a belül eltárolt SynchronizationContext példány Post metódusát használja (az ábrán nem lészretezett, de egy korábbi cikkben leírt módon) (15). Ekkor a Message Loop számára egy üzenet érkezik, amelynek hatására az néhány trükk után meghívja a Task példány ExecuteEntry függvényét (16), amely közvetlenül végrehajtja az eltárolt delegate által mutatott függvényt (17).
A GUI szálon futó Task példányoknál oda kell figyelni, ha azok új taszkokat indítanak. Ha a fenti kód nem egy üzenetkezelőből, hanem egy GUI szálon futó taszkból indulna, teljesen más eredményt kapnánk. Nézzük a következő kódot:
Task.Factory.StartNew(
() =>
{
Task.Factory.ContinueWhenAll(
new[] { Task.Factory.StartNew(() => Calculate(1)),
Task.Factory.StartNew(() => Calculate(2)),
Task.Factory.StartNew(() => Calculate(3))},
tasks =>
{
StringBuilder result = new StringBuilder();
foreach (var t in tasks)
{
result.Append(t.Result.ToString());
result.Append(" ");
} // foreach
this.TextBlock.Text = result.ToString();
});
},
CancellationToken.None,
TaskCreationOptions.None,
TaskScheduler.FromCurrentSynchronizationContext());
Ebben a példában az előző példa egy Task-ba lett becsomagolva, ami a 25. sorban megadott paraméter miatt egy SynchronizationContextTaskScheduler példányon keresztül kerül feldolgozásra, és a GUI szálon fog elindulni. Mikor elindult, elkezdődik az a folyamat, ami korábban ismertetve lett. A Task.Factory.ContinueWhenAll() híváshoz kiértékelődnek a paraméterek, és eközben létrejön és elindul a három, Calculate() hívást tartalmazó Taszk. A TaskFactory.StartNew() kódja most az előző példával ellentétben észre fogja venni, hogy egy taszk végrehajtása van folyamatban, és így veszi a taszkhoz tartozó Scheduler-t, ami ebben az esetben egy SynchronizationContextTaskScheduler. Emiatt mind a három taszk a GUI szálra lesz irányítva, és ott csak egymás után tudnak lefutni. A másik különbség az előző kódhoz képest, hogy a ContinueWhenAll() hívásnak most nincs átadva TaskScheduler. Ez nem szükséges, mivel a három taszk mindegyike a GUI szálon fut, emiatt ott fogják elindítani az 10-21 sorban megadott delegate-et becsomagoló taszkot is, és ekkor a 20-ik sorban a kontrol manipulálása rendben megtörténhet.
Ha egy SynchronizationContextTaskScheduler által indított taszk kódjából akarunk új taszkokat indítani, amelyek nem érintik a gui-t, külön meg kell adni a TaskScheduler-t. A következő példa GUI szálon futó taszkból indít új taszkokat, és az új taszkok mégis a Thread Pool-on fognak lefutni, mivel a paraméterként átadott TaskScheduler.Default érték egy ThreadPoolTaskScheduler példányt takar:
Task.Factory.StartNew(
() =>
{
Task.Factory.ContinueWhenAll(
new [] { Task.Factory.StartNew(
() => Calculate(1),
CancellationToken.None,
TaskCreationOptions.None,
TaskScheduler.Default),
Task.Factory.StartNew(
() => Calculate(2),
CancellationToken.None,
TaskCreationOptions.None,
TaskScheduler.Default),
Task.Factory.StartNew(
() => Calculate(1),
CancellationToken.None,
TaskCreationOptions.None,
TaskScheduler.Default)},
tasks =>
{
StringBuilder result = new StringBuilder();
foreach (Task<int> t in tasks)
{
result.Append(t.Result.ToString());
result.Append(" ");
} // foreach
this.TextBlock.Text = result.ToString();
},
CancellationToken.None,
TaskContinuationOptions.None,
TaskScheduler.FromCurrentSynchronizationContext());
},
CancellationToken.None,
TaskCreationOptions.None,
TaskScheduler.FromCurrentSynchronizationContext());
Custom Scheduler-ek
Az előzőekben megismerkedtünk a .NET-tel szállított két Scheduler típussal. De szükség lehet-e más Scheduler alkalmazására? Ritkán ez előfordulhat. Milyen esetekben? Ahelyett, hogy kitalálnék példákat, könnyebb ötleteket kölcsönözni a pfxteam által közreadott Parallel Programming Samples által megvalósított Task Schedulerek alapján. (A Schedulerek a letöltött zip \ParallelExtensionsExtras\TaskSchedulers\ könytárban vannak)
Egyik könnyen elfogadható példa, hogy néha felmerül annak az igénye, hogy a Task-okhoz prioritások vannak rendelve. Ez a funkcionalitás a jelenlegi Scheduler-ekkel nem érhető el.
Egy másik, első ránézésre megkérdőjelezhető jogosultságú Scheduler a ParallelExtensionExtras-ban a LimitedConcurrencyLevelTaskScheduler. Ez annyit tud, hogy megadható, egyszerre maximum hány Thread Pool szál dolgozzon párhuzamosan a Task-okon. Ez elvileg a ThreadPoolTaskScheduler segítségével is megoldható, mivel a Thread Pool számára beállítható, hogy maximum hány szálat indítson. Van azonban egy nagyon lényeges dolog, amit itt észre kell venni. Ha átállítjuk a Thread Pool-on a maximális szálak számát, akkor az a programunk egészére kihat. Ennek súlyos következményei lehetnek, elég az előző cikk azon példájára gondolni, amelyik a kevés szál miatt beragadt. Ha mi figyelünk is rá, valami régebbi library esetleg még mindig leállhat a szálak korlátozása miatt. Emiatt biztonságosabb csak a Task-jaink számára rendelkezésre álló szálak számát korlátozni, és erre jó lehet a LimitedConcurrencyLevelTaskScheduler. De miért akarnánk korlátozni a szálak számát?
Ennek akkor lehet értelme, ha egy masszív párhuzamos algoritmust szeretnénk taszkok segítségével futtatni valamiféle háttérmunkaként. A probléma itt az, hogy a gyári Scheduler-ek közül csak a ThreadPoolTaskScheduler használható rendes párhuzamos munkához, ami pedig a taszkokat szimplán ráengedi a Thread Pool-ra. Ekkor, ha a feladatunkat ügyesen osztottuk taszkokra, az összes Thread Pool szál elkezd rajta dolgozni. A Thread Pool szálak egy lokális feladatsorral rendelkeznek, ahova a szálon képződött új taszkok kerülnek. A szál elsősorban ezeket fogja feldolgozni, és ki sem tekintenek a globális feladatsorra. Ha most a háttérmunka mellé a feladatsorba küldünk egy új feladatot, azt a meglévő szálak nem fogják felvenni.
A Thread Pool van annyira intelligens, hogy ha észreveszi, hogy a globális feladatsorból nem fogynak a taszkok, akkor új szálat állít munkába. Ez a gyakorlatban azt jelenti, hogy kb fél másodperc várakozás után lesz egy szál, ami kiszedi a globális feladatsorból a taszkot. Mi történik, miután ezt elvégezte? Ha a globális sorban nincs feladat, akkor lop egyet az egyik szál lokális feladatsorából (work stealing), és azon kezd el dolgozni. Ez a feladat valószínű a háttérmunkánk egy taszkja lesz, ami a massziv párhuzamos algoritmus miatt folyamatosan újakat termel. Most ha új feladat kerül a globális sorba, megint nem lesz, ami feldolgozza azt. Ezt a Thread Pool újra észreveszi, és új szálat állít munkába, ami ha elvégzi a munkát, mi fog történni? Az is lop egy feladatot, és ez a szál is a háttérmunkának szánt feladatunkon kezd el dolgozni, többet ki sem tekintve a globális feladatsorra. A szálak száma egyébként nem fog elszaladni, mivel a Thread Pool egy hill-climbing-nek nevezett eljárás miatt próbálgatja, hogy hány szál működtetése az optimális, és emiatt néha leállít szálakat a Thread Pool-ban. Az azonban látszik, hogy a háttérmunkának szánt párhuzamos feladatunk megzavarja a Thread Pool logikáját.
A ParallelExtensionsExtras LimitedConcurrencyLevelTaskScheduler-je megoldhatja azt a problémát, hogy az új Thread Pool szálak ne ugorjanak rá mindig a háttérmunkánkra.
Egy saját Scheduler implementáció
A fenti példa alapján az látszik, hogy a LimitedConcurrencyLevelTaskScheduler-nél talán még hasznosabb lenne egy olyan Scheduler, ami nem is a Thread Pool-t használja, hanem saját dedikált szálakat. Ilyen is akad a ParallelExtensionsExtras csomagban WorkStealingTaskSchedules néven, de hogy látható legyen, mennyire nem nagy dolog saját Scheduler-t írni, most összeállítunk egy egyszerű implementációt.
A saját Scheduler paraméterben megadható számú dedikált szálat fog használni. Az ütemezendő taszkok egy feladatsorba fognak kerülni, ahonnan a dedikált szálak ezeket felvehetik. Egy kis ötletlopást alkalmazva azonban a szálak maguk is fognak lokális feladatsort használni, ahova azok a taszkok kerülnek majd, amelyeket az éppen végrehajtott taszk hoz létre. Miért fontos ez? Az előző cikkben szó volt a lokalitási elvről, ott megtudható az indok.
A lokális feladatsornak azonban van egy hátulütője: előfordulhat, hogy abban nagyon sok feladat feltorlódik, míg más szálak kifogynak a munkából. A .NET Thread Pool emiatt vezette be a work stealing módszert. Mi ezt most nem fogjuk implementálni, mert kicsivel bonyolultabb adatszerkezeteket igényel. A mi primitív módszerünk az lesz, hogy ha túl sok feladat van a lokális feladatsorban, akkor onnan egyet átrakunk a globális sorba.
Az eddigiek alapján az osztály így néz ki:
public class ExampleScheduler : TaskScheduler
{
/// <summary>
/// A lokális feladatsorba tehető feladatok maximális száma. Ha ennél több
/// feladat keletkezik adott szálon, akkor feladatokat kell áttenni a globális
/// sorba, ahonnan azokat más szálak is fel tudják venni.
/// </summary>
private const int LocalQueueLimit = 8;
/// <summary>
/// A globális feladatsor. Mivel több szál éri el, a .NET speciális adatszerkezetét
/// használjuk.
/// </summary>
private ConcurrentQueue<Task> globalTaskQueue;
/// <summary>
/// A lokális feladatsor. A ThreadStatic attributum miatt minden szál egy saját példányt
/// fog látni. Ez egyben azt is jelenti, hogy a statikus konstruktor ezt a példányt nem
/// tudja inicializálni (mivel az csak egyszer fut le). Azért Stack adatszerkezetre esett
/// a választás, mert a lokalitási elv miatt a legutoljára berakott taszkot érdemes
/// következőnek feldolgozni. A Stack adatszerkezet hátránya, hogy amikor feladatokat kell
/// átrakni a globális feladatsorba, akkor a Stack aljárol lenne optimális kivenni feladatokat,
/// ezt azonban a .NET-es implementáció közvetlen nem engedi (közvetve meg túl erőforrásigényes)
/// </summary>
[ThreadStatic]
private static Stack<Task> localTaskQueue;
/// <summary>
/// A szemafor a globális feladatsorba kerülő elemek számlálására és kiosztására szolgál.
/// A SemaphoreSlim egy .NET-es implementáció, nem windows kernel objektumára épül, emiatt
/// sokkal gyorsabb működést tesz lehetővé - azon az áron, hogy processzek között nem működik,
/// ami most nem érdekes.
/// </summary>
private SemaphoreSlim taskAvailable;
/// <summary>
/// Azoknak a szálaknak az azonosítója, amelyek a pool-unkban dolgoznak. Amiatt kell, hogy
/// ellenőrizni lehessen, az adott hívás "kívülről" vagy egy taszk végrehajtása közben
/// történik. Ezen információ alapján lehet eldönteni, hogy globális vagy lokális feladatsort
/// kell használni.
/// </summary>
private HashSet<int> workerThreadIds;
/// <summary>
/// A Scheduler osztály inicializálása. Ez a függvény indítja el a saját pool-hoz tartozó
/// szálakat, a maxConcurencyLevel paraméternek megfelelően.
/// </summary>
/// <param name="maxConcurencyLevel">A poolban lévő szálak száma.</param>
public ExampleScheduler(int maxConcurencyLevel)
{
this.globalTaskQueue = new ConcurrentQueue<Task>();
this.taskAvailable = new SemaphoreSlim(0);
this.workerThreadIds = new HashSet<int>();
// Ezen a ponton indulnak a pool szálai. A szálak logikája a PoolWorker
// függvényben vannak implementálva.
for (int i = 0; i < maxConcurencyLevel; i++)
{
Thread workerThread = new Thread(PoolWorker);
// A szál Id-ját el kell tárolni a későbbi azonosítás céljából.
this.workerThreadIds.Add(workerThread.ManagedThreadId);
workerThread.Start();
} // for i
} // ExampleScheduler()
private void PoolWorker() {...}
protected override void QueueTask(Task task) {...}
protected override bool TryExecuteTaskInline(Task task, bool taskWasPreviouslyQueued) {...}
protected override bool TryDequeue(Task task) {...}
protected override IEnumerable<Task> GetScheduledTasks() {...}
} // class ExampleScheduler
A mezők feletti kommentekből kiderül, hogy melyik mire lesz használatos. Ami hátra van, az annak a néhány virtuális függvénynek az implementálása, amire a .NET környezet támaszkodik, illetve a saját szálaink logikáját megvalósító PoolWorker() függvény. Nézzük először a szál logikát:
private void PoolWorker()
{
// Minden szál maga inicializálja a Thread Statikus változóját.
localTaskQueue = new Stack<Task>();
// Normális implementációnál meg kellene oldani, hogy a szálak ki tudjanak lépni,
// most a végtelenciklus is megteszi.
while (true)
{
// A szemafor pontosan egy szálat fog feléleszteni, amint egy taszk megjelenik a
// feladatsoron.
this.taskAvailable.Wait();
Task taskToBeExecuted;
// Ezen a ponton biztosan van feladat a globális feladatsoron, a TryDeqeue az
// adatszerkezet jellegzetessége miatt ilyen felépítésű. Elvileg nincs esély a
// szemafor miatt, hogy false-szal térjen vissza, de azért nem árt ellenőrizni.
if (globalTaskQueue.TryDequeue(out taskToBeExecuted))
{
// A következő hívás egy két ellenőrzés után meghívja a Task példány
// ExecuteEntry() metódusát, ami a Task-ban levő delegate és egyéb utómunkák
// végrehajtását eredményezi.
base.TryExecuteTask(taskToBeExecuted);
// A fenti hívás hatására új Taszkok jelenhettek meg a lokális sorban, ha a
// a taszk al-taszkokat hozott létre, és azok örökölték a jelenlegi ExampleScheduler
// példányt. Ekkor ezen a szálon egy QueueTask() hívódott meg, ami a lokális
// sorba adagolta a taszkokat.
while (localTaskQueue.Count > 0)
{
// Ez a hívás végrehajtja a taszkot, ami megint új taszkkal bővítheti a lokális
// sort. Ez az ciklus így többszázezerszer-milliószor is lefuthat bizonyos
// algoritmusoknál.
base.TryExecuteTask(localTaskQueue.Pop());
} // while
} // if
} // while
} // PoolWorker()
A Scheduler-ek legfontosabb függvénye a QueueTask, ennek hatására várja a programkörnyezet, hogy a taszk elindul a végrehajtási soron, a Scheduler vagy az alatta levő mechanizmus logikájának megfelelően. A leggyakrabban használt Task.Factory.StartNew() függvény is végső soron erre a függvényre fut rá:
/// <summary>
/// Annak az ellenőrzése, hogy az aktuális szál a pool-unk egy szála, vagy valami más.
/// Ez amiatt fontos, hogy csak a pool szálakon használjuk a lokális feladatsort. Gyakorlatban
/// minden más szál is láthatna egy lokális sor példányát, csak ahhoz a saját logikája alapján
/// soha nem nyúlna (mivel nem is tud róla).
/// </summary>
/// <returns>Igaz, ha az aktuális szál a pool egy szála.</returns>
private bool IsRunningOnWorkerThread()
{
// A konstruktor egy HashSet-ben eltárolta az összes szál Id-ját, ez az adatszerkezet
// O(1) időben ellenőrzi, hogy a jelenlegi szál benne van-e. Mivel tipikusan csak
// pár szálról van szó, lehet, hogy egy tömb egy ciklussal gyorsabb lenne, én nem mértem ki.
return this.workerThreadIds.Contains(Thread.CurrentThread.ManagedThreadId);
} // IsRunningOnWorkerThread()
/// <summary>
/// A Taszkot környező osztályok illetve a Task osztály maga közvetve vagy közvetlenül
/// ebbe a függvénybe fut bele, amikor egy Task példányt fel kell dolgozni.
/// </summary>
/// <param name="task">A feldolgozandó Task</param>
protected override void QueueTask(Task task)
{
// Attól függően, hogy saját szálunkból jutottunk ide vissza (ami nyilván egy taszk
// feldolgozása közben történhet csak meg) a lokális vagy a globális feladatsorba
// kerül a taszk.
if (IsRunningOnWorkerThread())
{
// Ha saját szálban vagyunk, a lokális feladatsor a cél, azt azonban nem szabad
// túltölteni, mert akkor a szálak (és így a processzor magok) egyenetlenül lesznek
// terhelve. Kulturált megoldás helyett túltöltés esetén egyszerűen átrakjuk a
// feladatsor tetején levő elemet a globális sorba. Az optimális megoldás az lenne,
// ha a legrégebben betett elemet vehetnénk ki, ezt azonban a Stack közvetlenül nem
// támogatja. Így kénytelenek vagyunk egy "hot" elemet kiszedni.
if (localTaskQueue.Count >= LocalQueueLimit)
{
// új feladat a globális sorban, a szemafort is értesíteni kell, hogy feléleszthesse
// az esetlegesen várakozó szálakat.
this.globalTaskQueue.Enqueue(localTaskQueue.Pop());
this.taskAvailable.Release();
} // if
localTaskQueue.Push(task);
}
else
{
this.globalTaskQueue.Enqueue(task);
this.taskAvailable.Release();
} // else
} // QueueTask()
A következő függvények speciálsi helyzetben használhatóak:
/// <summary>
/// Megkísérli adott taszkot végrehajtani az éppen futó szálon. Ez a függvény jellemzően akkor
/// hívódik meg, amikor egy taszknak másik taszkokra kell várakoznia. Várakozás előtt a taszk
/// megpróbálja adott szálon maga végrehajtatni a többi taszkot, ennek a függvénynek a segítségével.
/// Ezt a módszert "inlining"-nak hívják. Az inlining egyik feltétele, hogy ha a taszk már a
/// feladatsorban van, akkor ki kell onnan emelni. Ez rendes adatszerkezetek esetén könnyebben
/// elérhető, az általunk használt Stack és Queue esetén azonban csak nagyon limitált lehetőségek
/// vannak.
/// Az inlining meghiúsulása nem jelent végzetes problémát, csak a függő szál várakozni lesz
/// kénytelen, amíg a hagyományos mechanizmusok fel nem dolgozzák a várakozott taszkot.
/// Az implementáció próbálkozhatna a globális feladatsor ellenőrzésével, azonban a kicsi
/// esélyek, és a szinkronizációs bonyodalmak miatt ez itt most nem szerepel.
/// </summary>
/// <param name="task">A végrehajtani kívánt taszk.</param>
/// <param name="taskWasPreviouslyQueued">A taszk már valamelyik feladatsorban van.</param>
/// <returns>Igaz, ha az inlining sikeres volt. </returns>
protected override bool TryExecuteTaskInline(Task task, bool taskWasPreviouslyQueued)
{
// Ha a taszk már a feladatsorban van, ki kell onnan szedni. Ha ez sikertelen, az
// inline sikertelen lesz.
if (taskWasPreviouslyQueued && !this.TryDequeue(task))
{
return false;
} // if
// Ha már nincs a feladatsorokban a taszk, végre lehet hajtani inline.
return base.TryExecuteTask(task);
} // TryExecuteTaskInline()
/// <summary>
/// A taszk kiemelése a feladatsorokból. Az inlining és a task cancellation esetében
/// hívott függvény. A mi esetünken kicsi esély van a taszk kiemelésére.
/// </summary>
/// <param name="task">A feladatsorból kiemelendő taszk.</param>
/// <returns>Igaz, ha a kiemelés sikeres volt.</returns>
protected override bool TryDequeue(Task task)
{
// akkor lehetséges a kiemelés, ha a lokális feladatsor tetején van a taszk.
if (localTaskQueue.Count > 0 && localTaskQueue.Peek().Equals(task))
{
localTaskQueue.Pop();
return true;
}
else
{
return false;
} // else
} // TryDequeue()
/// <summary>
/// Ezt a függvényt a visual studio és egyéb toolok használhatják a nyomkövetés és
/// diagnosztikák elkészítéséhez.
/// </summary>
/// <returns></returns>
protected override IEnumerable<Task> GetScheduledTasks()
{
throw new NotSupportedException();
} // GetScheduledTasks()
TaskScheduler és az Exception-ok
A TaskScheduler osztály a UnobservedTaskException event-en keresztül részt vehet a taszkokon keletkező kivételek kezelésében. A Taszkok és a kivételek együtt olyan összetett téma, hogy önmagában saját cikket érdemel. Emiatt itt ez most nem lesz kifejtve.
Összefoglalás
A Task osztály használata elképzelhető a TaskScheduler-ek ismerete nélkül is. Speciális helyzetekben azonban sok kellemetlenségtől kímélhet meg, ha tisztában vagyunk azzal a folyamattal, ahogy a Task-ok feldolgozásra kerülnek, és azokkal a lehetőségekkel, amelyekkel a feldolgozást befolyásolni lehet.





#1 by senki on 2011. április 13. - 21:07
Gratula. Nagyon jók a cikkjeid, annyira jók, hogy akár könyvet is írhatnál:).
#2 by Tóth Viktor on 2011. április 14. - 07:57
Köszönöm! Gondolkodtam a könyvön (e-könyvön ingyért), egyrészt mert már most egész sok oldal terjedelemnél jár a multithreading sorozat, pedig kb csak a felénél-harmadánál járok, másrészt pedig másodszorra kicsit másképpen építeném fel a tartalmat (van sok részlet, ami felesleges, illetve pár helyen több gyakorlati példát lehetne mutatni). Majd meglátom mire lesz időm-kedvem.