Task-based Asynchronous Pattern – A TaskScheduler

Az előző cikkben megismerhettük a .NET 4-gyel kapott új Thread Pool-t, illetve röviden találkoztunk a Task-ok és a TaskScheduler fogalmával. Megtudtuk, hogy az új Thread Pool jelentős fejlesztéseken ment keresztül. Ez a fejlesztés annak érdekében történt, hogy a már jelenleg is tapasztalható tendencia, miszerint a számítógépekben egyre több processzormag található, könnyebben kiaknázható legyen. A Task-ok szolgálnak arra, hogy a feladatainkat kisebb darabokra tördeljük, mely darabokat ezután az új Thread Pool hatékonyan szétosztja a processzormagok között. Ez a kép így elég tiszta, azonban leegyszerűsített. A Task-ok végrehajtásában több osztály/mechanizmus vesz részt, és ezek különböző absztrakciós szinteken helyezkednek el.

Task-okról röviden

Ennek a cikknek a témája a TaskScheduler. Task-ok nélkül a TaskScheduler azonban nem tárgyalható. Emiatt ez a rész minimális szinten ismerteti a Task-okat, teljes egészében pedig egy későbbi cikk foglalkozik velük.

A Task-ok megértéséhez vissza kell emlékeznünk az előző cikkben felmerült problémákra. Az egyik probléma a granularitás volt. Ha egy program nem osztható szét megfelelően kicsi, párhuzamosan végrehajtható részekre, akkor a processzormagok kihasználtsága nem biztosítható kellőképpen. Ezeket a “megfelelően kicsi, párhuzamosan végrehajtható részeket” képviselik a Task-ok.

Kicsi, végrehajtható részeket a delegate-tek is le tudnak írni, és akár még párhuzamosan is futtathatóak a fordító által generált BeginXX függvényekkel, vagy egyszerűen a Thread Pool-ra dobálva őket. Csak egy számítás vagy egy feladat leírására ezért nem érné meg egy új koncepciót bevezetni. Nem mindig lehetséges azonban egy programot/számítást/algoritmust egy lépésben és egymástól független részekre bontani. Sokkal valószínűbb az az eset, amikor egy számítás/algoritmus elkezdődik, majd a futásának egy bizonyos pontján adódik olyan helyzet, hogy az algoritmus több, párhuzamosan futó szálon folytatható. Tipikus és gyakran bemutatott példája ennek a QuickSort algoritmus, mely első lépésében egy vektort két részre oszt, majd a két részvektoron működik tovább – akár párhuzamosan.

Az is gyakori, hogy a párhuzamosan elvégzett taszkok eredményét egy további taszk összegyűjti, és azon dolgozik tovább egymaga. Ehhez vagy egy már elindított taszknak a futását kell addig felfüggeszteni, amíg az összes függést okozó taszk le nem fut, vagy eleve csak akkor kell a taszkot elindítani, amikor függést okozó taszkok lefutottak. A taszkok között tehát függőségeket lehet felírni, és a .NET Task osztálya ezeket a függőségeket képes ábrázolni. A következő néhány példa segít megérteni az alapeseteket.

Az első esetben nincsenek függőségek. A taszk programkódja a futása közben létrehoz és elindít új taszkokat, melyek azután a szülőtaszktól függetlenül futnak tovább, esetleg újabb taszkokat létrehozva. Mindez a hagyományos delegétekkel is elérhető:

A kék vízszintes sávok a taszkokat jelképezik, időben balról jobbra haladva. Futás közben különböző függvényeket hajtanak végre. Új taszkok akkor keletkeznek, amikor a sárga alapban levő programkód fut le. Az ábrán leírt esetet a következő felépítésű kóddal lehet létrehozni:

...
int r = CalculateSomething();
...
Task.Factory.StartNew(() => CalculatePart(r - 1));
Task.Factory.StartNew(() => CalculatePart(r + 1));
...
DoOtherThings();

A programkód tehát számol valamit a CalculateSomething() hívással, ezután elindít két párhuzamos számítást (a két Task.Factory.StartNew() hívás), amelyek esetleg a Thread Pool szálain fognak lefutni, miközben az eredeti szál folytatja a futását egyéb műveletekkel (DoOtherThings).

Más esetekben egy Task lefutása után szeretnénk, ha elindulna egy vagy több másik Task. Ehhez hasonlót a delegate-ek is tudnak, hiszen minden delegate-et leíró osztálynak a fordító generál egy függvényt, amelyik BeginXX() névvel rendelkezik, és megatható neki egy callback delegate. Mivel minden a programozó által definiált delegate a MulticastDelegate osztályból származik, ami pedig képes több függvényre referálni, a callback delegate takarhat többszörös függvényhívást is. Ugyanakkor a delegate-ek esetében az összes callback ugyanazon a szálon fog lefutni, míg a Task-ok esetében ez nem feltétlenül igaz. A következő ábrán látható lefutást tehát már csak a Task-ok támogatják:

Taszkok esetében a fenti ábrán látható szervezést a következő kóddal érhetünk el:

Task<int> precalculation = new Task<int>(() => Precalculation());
precalculation.ContinueWith(task => Calculation(task.Result - 1));
precalculation.ContinueWith(task => Calculation(task.Result + 1));
precalculation.Start(); 

Ebben az esetben készül egy precalculation nevű taszk. Ehhez hozzá lesz rendelve “continuation” taszkként két új taszk, amelyek a ContinueWith() függvénynek átadott delegate-ek alapján jönnek létre. A “continuation” taszkok csak akkor kezdik el a futásukat, ha a taszk, amihez continuation taszként lett rendelve, lefutott. Ezután indítjuk az előszámítást végző taszkot a Start() hívással. Hogy pontosan mi történik ilyen esetekben, pontosan kinyomozzuk a következő cikkben. Most csak annyi érdekes, hogy lássuk, a taszkoknak milyen függőségeik lehetnek.

Más esetekben pont fordított függésre van szükség. Egy mátrix szorzását például egyszerű több taszkra szétbontani, ugyanakkor a szorzás eredményét valószínű fel szeretnénk használni. Az eredményt feldolgozó taszknak meg kell várnia az összes taszkot, ami a mátrix szorzásán dolgozik. Ekkor a Taszkok a következőhöz hasonló módon vannak szervezve:

Az ezt megvalósító kód pedig:

 
Task part1 = Task.Factory.StartNew(() => CalculatePart(p1);
Task part2 = Task.Factory.StartNew(() => CalculatePart(p2);
Task part3 = Task.Factory.StartNew(() => CalculatePart(p3);
 
Task.Factory.ContinueWhenAll(new[] { part1, part2, part3 }, (tasks) => FinishCalculation(tasks));

Itt elindul három független taszk (part1, part2, part3), majd készítünk egy olyan taszkot, ami akkor indul el, ha mind a három taszk befejezte a működését.

Mit tudunk tehát a Taszkokról?

Amint kiderült, a taszkok lényegében becsomagolt delegate-ek, amelyek között különböző típusú függőségeket határozhatunk meg. Ezek a függőségek nagyban hatással vannak a taszkok végrehajtási sorrendjére. Van a taszkoknak egyéb más szolgáltatása is, amivel itt nem kell foglalkozni. Ilyenek például a taszkok futásának megszakíthatósága (cancellation), és taszkok hierarchiáján is működő kivételek (exceptions). Ezekkel későbbi cikkben foglalkozunk.

Task Schedulerek

Az előző cikkben egyszerre tárgyaltuk az új Thread Pool-t, ami aszerint lett optimalizálva, hogy kisebb feladatokat is hatékonyan elvégezzen. Ugyanitt lett először bevezetve a kisebb feladatokat leíró Taszk fogalma, emiatt úgy tűnhet, hogy ez a két dolog szorosan összefügg. Ez igazából jól is tűnik, és általában igaz az, hogy a Task-okat a taszkok feldolgozására optimalizált Thread Pool fogja végrehajtani.

A Taszk azonban egy általánosabb fogalom, és független kellene, hogy legyen attól a mechanizmustól, ami azokat végrehajtja. Nem is kell eröltetett példákat keresni, foglalkoztunk már a GUI-s programok speciális igényeivel, és a SynchronizationContext-tel. Egy GUI-s alkalmazásnál vigyázni kell azzal, hogy egy feladat melyik szálon fut le. Ugyanakkor miért ne lehetne használni a GUI közelében az amúgy önmagában is hasznos koncepciót, a Task-ot?

GUI-s alkalmazás közelében a Task-okat viszont csak úgy lehet használni, ha elérhető, hogy a Task-ok, vagy legalábbis bizonyos Task-ok ne a Thread Pool Thread-eken, hanem a GUI Thread-en fussanak le. Megoldható lenne ugyan, hogy a Task-ok belül egy SynchornizationContext-et használva a vezérlést a GUI szálra irányítsák, vagy egyéb módon jelezhetné a Task a speciális igényeit, ez azonban kicsit belecsúnyítana a Task-ok koncepciójába.

A másik lehetőség, hogy a Task-ok végrehajtása a Task-ok ábrázolásától különböző absztrakciós szintre tartozik. Ez az absztrakciós szint a Task Scheduler-eké.

Amikor egy Task-ot létrehozunk, vagy a futását elindítjuk, akkor ahhoz megadhatunk egy TaskScheduler példányt. Ha ezt mi nem tesszük meg, a Task felveszi annak a Task-nak a Scheduler-jét, ami Task őt létrehozta, vagy ha ilyen nincs – mert mondjuk a Task-ot nem egy másik Task-ból hozzuk létre – akkor felveszi a Default Task Scheduler-t. A Default Task Scheduler egy egyszerű statikus változó, a TaskScheduler.Default property-n keresztül érhető el, és a TaskScheduler statikus konstruktora állítja be egy olyan példányra, ami a Task-okat a Thread Pool-ra irányítja. Ebből adódik, hogy ha egyáltalán nem foglalkozunk a Scheduler-ekkel, akkor a Task-jaink a Thread Pool Thread-jein fognak lefutni.

Összefoglalva tehát, minden Task-hoz tartozni fog egy Task Scheduler, vagy a programozó által megadott, vagy a szülő Task-hoz rendelt, vagy a Default.

Scheduler-e a Task Scheduler?

A Scheduler elnevezés kicsit félrevezeti az embert, amikor azt próbálja összerakni, hogy mi is ennek az osztálynak a szerepe. Azt gondolhatnánk, hogy ez az osztály dönti el, hogy melyik Task mikor hogyan fog lefutni. Nos, lehetne a Task Scheduler-eknek olyan implementációja, ami ezt tényleg meg is teszi, azonban a .NET-tel kapott Scheduler-ek igazából nem Scheduler-ek abban az értelemben, hogy semmit nem ütemeznek.

Mit csinálnak akkor? Többnyire csak egy egységes felületet adnak a Task-ok felé, hogy azok néhány alapműveletet el tudjanak végezni. Mik ezek az alapműveletek? Amikor egy Task-ot elindítunk (a programkódunk elindítja, akár egy másik Task kódjából, akár máshonnan), akkor a Task veszi a hozzárendelt TaskScheduler-t, és meghívja a QueueTask() függvényét. Tehát egy Task indítása alapesetben azt jelenti, hogy a TaskScheduler-t megkéri a Task, hogy tegye egy végrehajtási sorba.

Hogy ezután mi történik, az az aktuálus TaskScheduler-től függ. Az a TaskScheduler, amelyik a .NET Thread Pool-t használja, (egy-két flag ellenőrzése után) a Thread Pool-nak adja tovább a Task-ot, és az ütemező logika a Thread Pool-ban van implementálva.

Ha olyan Task Scheduler van a Task-hoz hozzárendelve, ami a GUI alkalmazások számára készült, akkor a Scheduler egy korábban eltárolt SynchronizationContext segítségével a GUI szálnak post-olja a Task-ot (némi delegate-be csomagolás után, mivel a Synchronization Context nem ismeri a Task koncepcióját).

Mind a két .NET-hez adott Task Scheduler tehát szimplán továbbhárítja az ütemezés felelőségét egy alatta lévő osztálynak/mechanizmusnak, inkább egyfajta adapterként működve a Task-ok felé, mint valódi ütemezőként. Ez azonban csak implementációs részlet, elképzelhető olyan implementáció, ami valódi ütemezést valósít meg.

Absztrakciós szintek megkavarva

A TaskScheduler osztályok képviselik tehát azt az absztrakciós szintet, ahol a feladatok ütemezése történik. Ez a nagykönyv szerint szép is lenne, de ritka az olyan gyakorlati helyzet, hogy a nagykönyvi módszerek keresztbe patkolása a gyakorlatban ne működne praktikusabban/hatékonyabban. Ez így van a Task-ok, TaskScheduler-ek esetében is, így van pár helyzet, amikor a Task beleszólhat az ütemezésbe.

Az egyik ilyen helyzet a Task Continuation-ok ütemezése. A Task Continuation egy olyan Task, amit akkor kell lefuttatni, amikor egy másik Task már lefutott. Ezeket írtuk le egy korábbi példa kódban a ContinueWith() függvényhívással. Elvileg ezeket a Continuation Task-okat át lehetne adni az ütemezőnek, ami szépen gondoskodhatna róla, hogy amíg a szükséges feltételek nem teljesülnek, nem futtatja le az adott Task-okat. A valóság azonban az, hogy ez a logika a Task-okban van implementálva. Egy Task osztály példányhoz hozzá lehet rendelni egy vagy több taszkot, amiket le kell futtatni, ha a taszk futása befejeződott. Ez koncepcionálisan a completion callback függvényekhez hasonlít. Minden Task példány a belső megvalósításában rendelkezik egy listával, amely az ő Continuation taszkjait fűzi fel. Ezek a Continuation Task-ok nem kerülnek a Scheduler queue-ba egészen addig, amíg a Task által képviselt feladat le nem futott. Amikor a feladat készen van, a Task osztály implementációja fogja a Continuation listát, és az elemeit a Scheduler queue-ba helyezi a TaskScheduler.QueueTask() hívással. Ezzel nagyon sok munkát levesz a Scheduler (illetve a Scheduler alatt levő) példánytól – azon az áron, hogy nem élesek az absztrakciós szintek határai.

Egy másik eset, amikor a Task beleszól az ütemezésbe, az az, amikor várakoznia kell másik Task(ok) lefutására. Ebben az esetben a Task kódja, mielőtt elkezdene valójában várakozni, megkéri a TaskScheduler-t, hogy “inline”-olja be a Task-okat, azaz azonnal hajtsa őket végre az aktuális szálon. Ez az “inline”-olás persze nem mindig lehetséges, triviális példa, ha a Task futása már elkezdődött.

Gyári Task Scheduler osztályok

A .NET két előre csomagolt Task Scheduler osztállyal érkezik, lefedve ezzel azokat funkciókat, amelyek eddig is megvalósíthatóak voltak a Thread Pool mezitlábas használatával illetve a SynchronizationContext osztály használatával. A Thread Pool-on alapuló implementáció neve ThreadPoolTaskScheduler, és ez lesz a default osztály, amit egy Task kap, ha nem definiálunk neki külön egy másikat, illetve nem tudja örökölni az éppen futó taszkét. Azok a Task-ok, amelyek a GUI-t manipulálják, a SynchronizationContextTaskScheduler egy példányát kell kapják. A SynchronizationContextTaskScheduler-t használó taszkok kódjában egy kicsit jobban oda kell figyelni. Ha egy ilyen taszk újakat hoz létre anélkül, hogy definiálná a Schedulert, akkor minden újonnan létrehozott taszk szintén a SynchronizationContextTaskScheduler példányt örökli, és így minden egyes taszk a GUI szálon fog lefutni, lényegében egyszálúsítva az alkalmazást. Ha van tehát egy GUI-t kezelő taszk, és az széttörhető több másik taszk-ra, amelyek egyenként nem nyúlnak a GUI-hoz, külön meg kell adni, hogy például a ThreadPoolTaskScheduler-t használják.

Nézzük a következő példát, amely egy alkalmazás eseménykezelő függvénye:

 
private void Button_Click(object sender, RoutedEventArgs e)
{
  Task.Factory.ContinueWhenAll(

      new[] { Task.Factory.StartNew(() => Calculate(1)),
              Task.Factory.StartNew(() => Calculate(2)),
              Task.Factory.StartNew(() => Calculate(3))},

      tasks => 
      {
         StringBuilder result = new StringBuilder();

         foreach (var t in tasks)
         {
            result.Append(t.Result.ToString());
            result.Append(" ");
         } // foreach

         this.TextBlock.Text = result.ToString();
      },

      CancellationToken.None,
      TaskContinuationOptions.None,
      TaskScheduler.FromCurrentSynchronizationContext());
}

private int Calculate(int p)
{
  Thread.Sleep(2000);
  return Thread.CurrentThread.ManagedThreadId;
}

Mi történik a kódban? Mielőtt a Task.Factory.ContinueWhenAll() meghívódik (03 sor), kiértékelődnek a paraméterei. Ehhez létrejön az első paraméternek átadott tömb (05), és elkezdődik a tömb inicializálása. Ehhez egymás után meghívódik a három Task.Factory.StartNew() függvény (05-07). Ez egy TaskFactory osztálypéldány StartNew() függvénye, ami több lépésben próbálja meghatározni, hogy a létrehozandó Task példányhoz milyen TaskScheduler példányt rendeljen.

A TaskFactory példányosításakor meg lehet adni egy TaskScheduler példányt, ekkor a Factory minden létrehozandó Task-hoz ezt rendeli. A Task osztály statikus konstruktora, amelyik a Task.Factory property számára létrehozza a TaskFactory-t, nem ad meg TaskScheduler-t, ezért a példakód hívásánál a TaskFactory egy második lépés is végrehajt a TaskScheduler meghatározásához. Megvizsgálja, hogy az éppen futó szál nem-e egy Task-ot hajt végre éppen. Ha igen, elkéri ennek a Task-nak a Scheduler példányát, és ezt rendeli az új Task-hoz is.

Ha nincs Task végrehajtás alatt, ami a mi példánkban most a helyzet (hiszen a GUI szálon egy eseménykezelőben vagyunk), akkor a TaskScheduler.Default értéket veszi a TaskFactory, ami pedig egy, ThreadPoolTaskScheduler példány lesz. A három létrehozott Task példány az 05-07 sorban, amelyek a Calculate függvényt fogják meghívni, a ThreadPoolTaskScheduler-en keresztül lesznek végrehajtva, így a Thread Pool szálaira kerülnek.

Elindult tehát a három taszk, és az őket reprezentáló Task példányokat most már át lehet adni a Task.Factory.ContinueWhenAll() hívásnak. Hogy pontosan mi történik egy ilyen híváson belül, meg fogjuk nézni a következő, Task-okkal foglalkozó cikkben, itt most elég annyi, hogy a hívás elrendezi, hogy amikor lefutott mind a három taszk, a második paraméterben (09-20) megadott delegate-et magába foglaló Task el fog indulni. Hogy melyik szál futtatja ezt a Task-ot, azt a függvényhívás utolsó paramétere határozza meg (24 sor), ez a TaskScheduler lesz ugyanis a létrejövő Task-hoz hozzárendelve. Ezt a paramétert el lehetne hagyni, ekkor az előbb ismertetett folyamat menne végbe a TaskScheduler felderítésére. Most azonban a paraméterben átadott Scheduler lesz a létrejövő taszk Scheduler-e.

A 24-ik sorban a TaskScheduler.FromCurrentSynchronizationContext() hívás létrehoz egy SynchronizationContextTaskScheduler() példányt, ami pedig egyszerűen a SynchronizationContext.Current értékét tárolja el, és használja később. Ezzel az értékkel már találkoztunk egy korábbi cikkben. A mintakód egy WPF-es alkalmazásból származik, így a SynchronizationContext.Current értéke egy DispatcherSynchronizationContext példány lesz. Ennek a DispatcherSynchronizationContext példánynak a Post() függvénye egy delegate-et képes lefuttatni a GUI szálon, és a SynchronizationContextTaskScheduler pont ezt használja ki.

Amikor a példakód három taszkja lefutott, az utoljára lefutott taszk a 24-ik sorban átadott TaskScheduler-en keresztül átadja feldolgozásra a 09-20 sorban megadott delegate-t becsomagoló taszkot. Mivel egy SynchronizationContextTaskScheduler-ről van szó, ez a QueueTask() hívásában a korábban eltárolt DispatcherSynchronizationContext.Post()-ot fogja használni a Task végrehajtására, ez pedig egy korábbi cikkben felvázolt módon, a Message Queue segítségével hajtatja végre a taszkot a GUI szálon. Mivel a 09-20 sorban leírt delegate a GUI szálon fog futni, az 19-ik sorban levő TextBlock.Text property kezelése nem fog problémákba ütközni, ugyanakkor a három Calculate() függvényt futtató taszk a Thread Pool-on fog párhuzamosan futni.

Ha a folyamat bonyolultnak tűnt, az alábbi ábra segíthet a megértésben. Első lépésben elindul az üzenetkezelő függvény, amelyet a GUI szál message loop-ja hív meg (1). Az üzenetkezelő függvényben elsőként a Task.Factory.StartNew() hívások futnak le (az ábrán csak egy hívás szerepel) (2). A StartNew() hívás létrehoz egy Task példányt, és megfelelően inicializálja. Az inicializáció lépéseként eltárolja a megfelelő delegate-et, ami a taszk által végrehajtandó kódra mutat, illetve a fentebb említett szabályok alapján hozzárendel egy TaskScheduler példányt, ebben az esetben egy ThreadPoolTaskSchedulert. (3)

Amikor a Task példány létrejött, a TaskFactory StartNew() függvénye meghívja a Task.ScheduleAndStart() függvényét (4). A bevezetett absztrakciós szintek miatt a Task nem maga gondoskodik a futtatásáról, hanem rábízza ezt a hozzárendelt TaskScheduler-re, így meghívja annak a QueueTask() függvényét, ami ebben az esetben a Task-ot a Thread Pool feladatsorába helyezi (6). Ezután a vezérlés visszakerül az üzenetkezelő függvénybe (7), ami most végrehajtja a ContinueWhenAll() hívást (8). A ContinueWhenAll() hivás létrehoz egy Task példányt a megadott delegate-tel, és a Task példányhoz egy SynchronizationContextTaskScheduler-t rendel, mivel az a függvény bemenő paramétere (9). Miután megvan az új taszk, a ContinueWhenAll() függvény úgy konfigurálja a taszkokat, hogy az új taszk elinduljon a többi lefutása után. Ez a következő cikkben jobban részletezendő Task Continuations mechanizmusával éri el (10). A GUI szál ezekután visszatér a Message Loop-ba (az ábrán ez nincs jelölve)

Eközben a Thread Pool észreveszi, hogy van egy feladat a feladatsorában és indít neki egy szálat (11). A .NET 4 Thread Pool számára a feladatok egy IThreadPoolItem interfészt valósítanak meg, aminek a Thread Pool meghívja az ExecuteWorkItem függvényét (12). Ez a hívás első lépésben lefuttatja az eltárolt delegate által mutatott kódot (13), majd ha ez végetért, végignézi a Task Continuations listáját. Mivel ez a lista most nem üres, a benne található Task-nak meghívja a ScheduleAndStart() függvényét (14).

Mint korábban, a Task most is az eltárolt Scheduler segítségével indítja el magát, ami ebben az esetben egy SynchronizationContextTaskScheduler példány. Ez a példány a QueueTask hívás hatására, a belül eltárolt SynchronizationContext példány Post metódusát használja (az ábrán nem lészretezett, de egy korábbi cikkben leírt módon) (15). Ekkor a Message Loop számára egy üzenet érkezik, amelynek hatására az néhány trükk után meghívja a Task példány ExecuteEntry függvényét (16), amely közvetlenül végrehajtja az eltárolt delegate által mutatott függvényt (17).

A GUI szálon futó Task példányoknál oda kell figyelni, ha azok új taszkokat indítanak. Ha a fenti kód nem egy üzenetkezelőből, hanem egy GUI szálon futó taszkból indulna, teljesen más eredményt kapnánk. Nézzük a következő kódot:

 
Task.Factory.StartNew( 
  () => 
  {
    Task.Factory.ContinueWhenAll(

      new[] { Task.Factory.StartNew(() => Calculate(1)),
              Task.Factory.StartNew(() => Calculate(2)),
              Task.Factory.StartNew(() => Calculate(3))},

      tasks =>
      {
        StringBuilder result = new StringBuilder();

        foreach (var t in tasks)
        {
           result.Append(t.Result.ToString());
           result.Append(" ");
        } // foreach

        this.TextBlock.Text = result.ToString();
      });
  },
  CancellationToken.None,
  TaskCreationOptions.None,
  TaskScheduler.FromCurrentSynchronizationContext());

Ebben a példában az előző példa egy Task-ba lett becsomagolva, ami a 25. sorban megadott paraméter miatt egy SynchronizationContextTaskScheduler példányon keresztül kerül feldolgozásra, és a GUI szálon fog elindulni. Mikor elindult, elkezdődik az a folyamat, ami korábban ismertetve lett. A Task.Factory.ContinueWhenAll() híváshoz kiértékelődnek a paraméterek, és eközben létrejön és elindul a három, Calculate() hívást tartalmazó Taszk. A TaskFactory.StartNew() kódja most az előző példával ellentétben észre fogja venni, hogy egy taszk végrehajtása van folyamatban, és így veszi a taszkhoz tartozó Scheduler-t, ami ebben az esetben egy SynchronizationContextTaskScheduler. Emiatt mind a három taszk a GUI szálra lesz irányítva, és ott csak egymás után tudnak lefutni. A másik különbség az előző kódhoz képest, hogy a ContinueWhenAll() hívásnak most nincs átadva TaskScheduler. Ez nem szükséges, mivel a három taszk mindegyike a GUI szálon fut, emiatt ott fogják elindítani az 10-21 sorban megadott delegate-et becsomagoló taszkot is, és ekkor a 20-ik sorban a kontrol manipulálása rendben megtörténhet.

Ha egy SynchronizationContextTaskScheduler által indított taszk kódjából akarunk új taszkokat indítani, amelyek nem érintik a gui-t, külön meg kell adni a TaskScheduler-t. A következő példa GUI szálon futó taszkból indít új taszkokat, és az új taszkok mégis a Thread Pool-on fognak lefutni, mivel a paraméterként átadott TaskScheduler.Default érték egy ThreadPoolTaskScheduler példányt takar:

 
Task.Factory.StartNew( 
    () => 
    {
        Task.Factory.ContinueWhenAll(

            new [] { Task.Factory.StartNew(
                        () => Calculate(1), 
                        CancellationToken.None, 
                        TaskCreationOptions.None, 
                        TaskScheduler.Default),

                    Task.Factory.StartNew(
                        () => Calculate(2), 
                        CancellationToken.None, 
                        TaskCreationOptions.None, 
                        TaskScheduler.Default),

                    Task.Factory.StartNew(
                        () => Calculate(1), 
                        CancellationToken.None, 
                        TaskCreationOptions.None, 
                        TaskScheduler.Default)},

            tasks =>
            {
                StringBuilder result = new StringBuilder();

                foreach (Task<int> t in tasks)
                {
                    result.Append(t.Result.ToString());
                    result.Append(" ");
                } // foreach

                this.TextBlock.Text = result.ToString();
            },
                                
            CancellationToken.None,
            TaskContinuationOptions.None,
            TaskScheduler.FromCurrentSynchronizationContext());
    },
    CancellationToken.None,
    TaskCreationOptions.None,
    TaskScheduler.FromCurrentSynchronizationContext());

Custom Scheduler-ek

Az előzőekben megismerkedtünk a .NET-tel szállított két Scheduler típussal. De szükség lehet-e más Scheduler alkalmazására? Ritkán ez előfordulhat. Milyen esetekben? Ahelyett, hogy kitalálnék példákat, könnyebb ötleteket kölcsönözni a pfxteam által közreadott Parallel Programming Samples által megvalósított Task Schedulerek alapján. (A Schedulerek a letöltött zip \ParallelExtensionsExtras\TaskSchedulers\ könytárban vannak)

Egyik könnyen elfogadható példa, hogy néha felmerül annak az igénye, hogy a Task-okhoz prioritások vannak rendelve. Ez a funkcionalitás a jelenlegi Scheduler-ekkel nem érhető el.

Egy másik, első ránézésre megkérdőjelezhető jogosultságú Scheduler a ParallelExtensionExtras-ban a LimitedConcurrencyLevelTaskScheduler. Ez annyit tud, hogy megadható, egyszerre maximum hány Thread Pool szál dolgozzon párhuzamosan a Task-okon. Ez elvileg a ThreadPoolTaskScheduler segítségével is megoldható, mivel a Thread Pool számára beállítható, hogy maximum hány szálat indítson. Van azonban egy nagyon lényeges dolog, amit itt észre kell venni. Ha átállítjuk a Thread Pool-on a maximális szálak számát, akkor az a programunk egészére kihat. Ennek súlyos következményei lehetnek, elég az előző cikk azon példájára gondolni, amelyik a kevés szál miatt beragadt. Ha mi figyelünk is rá, valami régebbi library esetleg még mindig leállhat a szálak korlátozása miatt. Emiatt biztonságosabb csak a Task-jaink számára rendelkezésre álló szálak számát korlátozni, és erre jó lehet a LimitedConcurrencyLevelTaskScheduler. De miért akarnánk korlátozni a szálak számát?

Ennek akkor lehet értelme, ha egy masszív párhuzamos algoritmust szeretnénk taszkok segítségével futtatni valamiféle háttérmunkaként. A probléma itt az, hogy a gyári Scheduler-ek közül csak a ThreadPoolTaskScheduler használható rendes párhuzamos munkához, ami pedig a taszkokat szimplán ráengedi a Thread Pool-ra. Ekkor, ha a feladatunkat ügyesen osztottuk taszkokra, az összes Thread Pool szál elkezd rajta dolgozni. A Thread Pool szálak egy lokális feladatsorral rendelkeznek, ahova a szálon képződött új taszkok kerülnek. A szál elsősorban ezeket fogja feldolgozni, és ki sem tekintenek a globális feladatsorra. Ha most a háttérmunka mellé a feladatsorba küldünk egy új feladatot, azt a meglévő szálak nem fogják felvenni.

A Thread Pool van annyira intelligens, hogy ha észreveszi, hogy a globális feladatsorból nem fogynak a taszkok, akkor új szálat állít munkába. Ez a gyakorlatban azt jelenti, hogy kb fél másodperc várakozás után lesz egy szál, ami kiszedi a globális feladatsorból a taszkot. Mi történik, miután ezt elvégezte? Ha a globális sorban nincs feladat, akkor lop egyet az egyik szál lokális feladatsorából (work stealing), és azon kezd el dolgozni. Ez a feladat valószínű a háttérmunkánk egy taszkja lesz, ami a massziv párhuzamos algoritmus miatt folyamatosan újakat termel. Most ha új feladat kerül a globális sorba, megint nem lesz, ami feldolgozza azt. Ezt a Thread Pool újra észreveszi, és új szálat állít munkába, ami ha elvégzi a munkát, mi fog történni? Az is lop egy feladatot, és ez a szál is a háttérmunkának szánt feladatunkon kezd el dolgozni, többet ki sem tekintve a globális feladatsorra. A szálak száma egyébként nem fog elszaladni, mivel a Thread Pool egy hill-climbing-nek nevezett eljárás miatt próbálgatja, hogy hány szál működtetése az optimális, és emiatt néha leállít szálakat a Thread Pool-ban. Az azonban látszik, hogy a háttérmunkának szánt párhuzamos feladatunk megzavarja a Thread Pool logikáját.

A ParallelExtensionsExtras LimitedConcurrencyLevelTaskScheduler-je megoldhatja azt a problémát, hogy az új Thread Pool szálak ne ugorjanak rá mindig a háttérmunkánkra.

Egy saját Scheduler implementáció

A fenti példa alapján az látszik, hogy a LimitedConcurrencyLevelTaskScheduler-nél talán még hasznosabb lenne egy olyan Scheduler, ami nem is a Thread Pool-t használja, hanem saját dedikált szálakat. Ilyen is akad a ParallelExtensionsExtras csomagban WorkStealingTaskSchedules néven, de hogy látható legyen, mennyire nem nagy dolog saját Scheduler-t írni, most összeállítunk egy egyszerű implementációt.

A saját Scheduler paraméterben megadható számú dedikált szálat fog használni. Az ütemezendő taszkok egy feladatsorba fognak kerülni, ahonnan a dedikált szálak ezeket felvehetik. Egy kis ötletlopást alkalmazva azonban a szálak maguk is fognak lokális feladatsort használni, ahova azok a taszkok kerülnek majd, amelyeket az éppen végrehajtott taszk hoz létre. Miért fontos ez? Az előző cikkben szó volt a lokalitási elvről, ott megtudható az indok.

A lokális feladatsornak azonban van egy hátulütője: előfordulhat, hogy abban nagyon sok feladat feltorlódik, míg más szálak kifogynak a munkából. A .NET Thread Pool emiatt vezette be a work stealing módszert. Mi ezt most nem fogjuk implementálni, mert kicsivel bonyolultabb adatszerkezeteket igényel. A mi primitív módszerünk az lesz, hogy ha túl sok feladat van a lokális feladatsorban, akkor onnan egyet átrakunk a globális sorba.

Az eddigiek alapján az osztály így néz ki:

 
public class ExampleScheduler : TaskScheduler
{
    /// <summary>
    /// A lokális feladatsorba tehető feladatok maximális száma. Ha ennél több
    /// feladat keletkezik adott szálon, akkor feladatokat kell áttenni a globális
    /// sorba, ahonnan azokat más szálak is fel tudják venni.
    /// </summary>
    private const int LocalQueueLimit = 8;

    /// <summary>
    /// A globális feladatsor. Mivel több szál éri el, a .NET speciális adatszerkezetét
    /// használjuk.
    /// </summary>
    private ConcurrentQueue<Task> globalTaskQueue;

    /// <summary>
    /// A lokális feladatsor. A ThreadStatic attributum miatt minden szál egy saját példányt
    /// fog látni. Ez egyben azt is jelenti, hogy a statikus konstruktor ezt a példányt nem
    /// tudja inicializálni (mivel az csak egyszer fut le). Azért Stack adatszerkezetre esett
    /// a választás, mert a lokalitási elv miatt a legutoljára berakott taszkot érdemes
    /// következőnek feldolgozni. A Stack adatszerkezet hátránya, hogy amikor feladatokat kell
    /// átrakni a globális feladatsorba, akkor a Stack aljárol lenne optimális kivenni feladatokat,
    /// ezt azonban a .NET-es implementáció közvetlen nem engedi (közvetve meg túl erőforrásigényes)
    /// </summary>
    [ThreadStatic]
    private static Stack<Task> localTaskQueue;

    /// <summary>
    /// A szemafor a globális feladatsorba kerülő elemek számlálására és kiosztására szolgál.
    /// A SemaphoreSlim egy .NET-es implementáció, nem windows kernel objektumára épül, emiatt
    /// sokkal gyorsabb működést tesz lehetővé - azon az áron, hogy processzek között nem működik,
    /// ami most nem érdekes.
    /// </summary>
    private SemaphoreSlim taskAvailable;

    /// <summary>
    /// Azoknak a szálaknak az azonosítója, amelyek a pool-unkban dolgoznak. Amiatt kell, hogy
    /// ellenőrizni lehessen, az adott hívás "kívülről" vagy egy taszk végrehajtása közben
    /// történik. Ezen információ alapján lehet eldönteni, hogy globális vagy lokális feladatsort
    /// kell használni.
    /// </summary>
    private HashSet<int> workerThreadIds;

    /// <summary>
    /// A Scheduler osztály inicializálása. Ez a függvény indítja el a saját pool-hoz tartozó
    /// szálakat, a maxConcurencyLevel paraméternek megfelelően.
    /// </summary>
    /// <param name="maxConcurencyLevel">A poolban lévő szálak száma.</param>
    public ExampleScheduler(int maxConcurencyLevel)
    {
        this.globalTaskQueue = new ConcurrentQueue<Task>();
        this.taskAvailable = new SemaphoreSlim(0);
        this.workerThreadIds = new HashSet<int>();

        // Ezen a ponton indulnak a pool szálai. A szálak logikája a PoolWorker
        // függvényben vannak implementálva.
        for (int i = 0; i < maxConcurencyLevel; i++)
        {
            Thread workerThread = new Thread(PoolWorker);

            // A szál Id-ját el kell tárolni a későbbi azonosítás céljából.
            this.workerThreadIds.Add(workerThread.ManagedThreadId);
            workerThread.Start();
        } // for i        
    } // ExampleScheduler() 

    private void PoolWorker() {...}

    protected override void QueueTask(Task task) {...}

    protected override bool TryExecuteTaskInline(Task task, bool taskWasPreviouslyQueued) {...}

    protected override bool TryDequeue(Task task) {...}

    protected override IEnumerable<Task> GetScheduledTasks() {...}

} // class ExampleScheduler 

A mezők feletti kommentekből kiderül, hogy melyik mire lesz használatos. Ami hátra van, az annak a néhány virtuális függvénynek az implementálása, amire a .NET környezet támaszkodik, illetve a saját szálaink logikáját megvalósító PoolWorker() függvény. Nézzük először a szál logikát:

 
private void PoolWorker()
{
    // Minden szál maga inicializálja a Thread Statikus változóját.
    localTaskQueue = new Stack<Task>();

    // Normális implementációnál meg kellene oldani, hogy a szálak ki tudjanak lépni,
    // most a végtelenciklus is megteszi.
    while (true)
    {
        // A szemafor pontosan egy szálat fog feléleszteni, amint egy taszk megjelenik a
        // feladatsoron.
        this.taskAvailable.Wait();
        Task taskToBeExecuted;

        // Ezen a ponton biztosan van feladat a globális feladatsoron, a TryDeqeue az
        // adatszerkezet jellegzetessége miatt ilyen felépítésű. Elvileg nincs esély a
        // szemafor miatt, hogy false-szal térjen vissza, de azért nem árt ellenőrizni.
        if (globalTaskQueue.TryDequeue(out taskToBeExecuted))
        {
            // A következő hívás egy két ellenőrzés után meghívja a Task példány 
            // ExecuteEntry() metódusát, ami a Task-ban levő delegate és egyéb utómunkák
            // végrehajtását eredményezi.
            base.TryExecuteTask(taskToBeExecuted);

            // A fenti hívás hatására új Taszkok jelenhettek meg a lokális sorban, ha a
            // a taszk al-taszkokat hozott létre, és azok örökölték a jelenlegi ExampleScheduler
            // példányt. Ekkor ezen a szálon egy QueueTask() hívódott meg, ami a lokális
            // sorba adagolta a taszkokat.
            while (localTaskQueue.Count > 0)
            {
                // Ez a hívás végrehajtja a taszkot, ami megint új taszkkal bővítheti a lokális
                // sort. Ez az ciklus így többszázezerszer-milliószor is lefuthat bizonyos
                // algoritmusoknál.
                base.TryExecuteTask(localTaskQueue.Pop());
            } // while
        } // if
    } // while
} // PoolWorker()

A Scheduler-ek legfontosabb függvénye a QueueTask, ennek hatására várja a programkörnyezet, hogy a taszk elindul a végrehajtási soron, a Scheduler vagy az alatta levő mechanizmus logikájának megfelelően. A leggyakrabban használt Task.Factory.StartNew() függvény is végső soron erre a függvényre fut rá:

 
/// <summary>
/// Annak az ellenőrzése, hogy az aktuális szál a pool-unk egy szála, vagy valami más.
/// Ez amiatt fontos, hogy csak a pool szálakon használjuk a lokális feladatsort. Gyakorlatban
/// minden más szál is láthatna egy lokális sor példányát, csak ahhoz a saját logikája alapján
/// soha nem nyúlna (mivel nem is tud róla).
/// </summary>
/// <returns>Igaz, ha az aktuális szál a pool egy szála.</returns>
private bool IsRunningOnWorkerThread()
{
    // A konstruktor egy HashSet-ben eltárolta az összes szál Id-ját, ez az adatszerkezet
    // O(1) időben ellenőrzi, hogy a jelenlegi szál benne van-e. Mivel tipikusan csak
    // pár szálról van szó, lehet, hogy egy tömb egy ciklussal gyorsabb lenne, én nem mértem ki.
    return this.workerThreadIds.Contains(Thread.CurrentThread.ManagedThreadId);
} // IsRunningOnWorkerThread()

/// <summary>
/// A Taszkot környező osztályok illetve a Task osztály maga közvetve vagy közvetlenül
/// ebbe a függvénybe fut bele, amikor egy Task példányt fel kell dolgozni.
/// </summary>
/// <param name="task">A feldolgozandó Task</param>
protected override void QueueTask(Task task)
{
    // Attól függően, hogy saját szálunkból jutottunk ide vissza (ami nyilván egy taszk
    // feldolgozása közben történhet csak meg) a lokális vagy a globális feladatsorba
    // kerül a taszk.
    if (IsRunningOnWorkerThread())            
    {
        // Ha saját szálban vagyunk, a lokális feladatsor a cél, azt azonban nem szabad
        // túltölteni, mert akkor a szálak (és így a processzor magok) egyenetlenül lesznek
        // terhelve. Kulturált megoldás helyett túltöltés esetén egyszerűen átrakjuk a
        // feladatsor tetején levő elemet a globális sorba. Az optimális megoldás az lenne,
        // ha a legrégebben betett elemet vehetnénk ki, ezt azonban a Stack közvetlenül nem
        // támogatja. Így kénytelenek vagyunk egy "hot" elemet kiszedni.
        if (localTaskQueue.Count >= LocalQueueLimit)
        {
            // új feladat a globális sorban, a szemafort is értesíteni kell, hogy feléleszthesse
            // az esetlegesen várakozó szálakat.
            this.globalTaskQueue.Enqueue(localTaskQueue.Pop());                    
            this.taskAvailable.Release();
        } // if

        localTaskQueue.Push(task);
    } 
    else
    {
        this.globalTaskQueue.Enqueue(task);
        this.taskAvailable.Release();
    } // else
} // QueueTask()

A következő függvények speciálsi helyzetben használhatóak:

 
/// <summary>
/// Megkísérli adott taszkot végrehajtani az éppen futó szálon. Ez a függvény jellemzően akkor
/// hívódik meg, amikor egy taszknak másik taszkokra kell várakoznia. Várakozás előtt a taszk
/// megpróbálja adott szálon maga végrehajtatni a többi taszkot, ennek a függvénynek a segítségével.
/// Ezt a módszert "inlining"-nak hívják. Az inlining egyik feltétele, hogy ha a taszk már a 
/// feladatsorban van, akkor ki kell onnan emelni. Ez rendes adatszerkezetek esetén könnyebben
/// elérhető, az általunk használt Stack és Queue esetén azonban csak nagyon limitált lehetőségek
/// vannak.
/// Az inlining meghiúsulása nem jelent végzetes problémát, csak a függő szál várakozni lesz
/// kénytelen, amíg a hagyományos mechanizmusok fel nem dolgozzák a várakozott taszkot.
/// Az implementáció próbálkozhatna a globális feladatsor ellenőrzésével, azonban a kicsi
/// esélyek, és a szinkronizációs bonyodalmak miatt ez itt most nem szerepel.
/// </summary>
/// <param name="task">A végrehajtani kívánt taszk.</param>
/// <param name="taskWasPreviouslyQueued">A taszk már valamelyik feladatsorban van.</param>
/// <returns>Igaz, ha az inlining sikeres volt. </returns>
protected override bool TryExecuteTaskInline(Task task, bool taskWasPreviouslyQueued)
{
    // Ha a taszk már a feladatsorban van, ki kell onnan szedni. Ha ez sikertelen, az
    // inline sikertelen lesz.
    if (taskWasPreviouslyQueued && !this.TryDequeue(task))
    {
        return false;
    } // if

    // Ha már nincs a feladatsorokban a taszk, végre lehet hajtani inline.
    return base.TryExecuteTask(task);
} // TryExecuteTaskInline()

/// <summary>
/// A taszk kiemelése a feladatsorokból. Az inlining és a task cancellation esetében
/// hívott függvény. A mi esetünken kicsi esély van a taszk kiemelésére.
/// </summary>
/// <param name="task">A feladatsorból kiemelendő taszk.</param>
/// <returns>Igaz, ha a kiemelés sikeres volt.</returns>
protected override bool TryDequeue(Task task)
{
    // akkor lehetséges a kiemelés, ha a lokális feladatsor tetején van a taszk.
    if (localTaskQueue.Count > 0 && localTaskQueue.Peek().Equals(task))
    {
        localTaskQueue.Pop();
        return true;
    }
    else
    {
        return false;
    } // else
} // TryDequeue()

/// <summary>
/// Ezt a függvényt a visual studio és egyéb toolok használhatják a nyomkövetés és
/// diagnosztikák elkészítéséhez.
/// </summary>
/// <returns></returns>
protected override IEnumerable<Task> GetScheduledTasks()
{
    throw new NotSupportedException();
} // GetScheduledTasks()

TaskScheduler és az Exception-ok

A TaskScheduler osztály a UnobservedTaskException event-en keresztül részt vehet a taszkokon keletkező kivételek kezelésében. A Taszkok és a kivételek együtt olyan összetett téma, hogy önmagában saját cikket érdemel. Emiatt itt ez most nem lesz kifejtve.

Összefoglalás

A Task osztály használata elképzelhető a TaskScheduler-ek ismerete nélkül is. Speciális helyzetekben azonban sok kellemetlenségtől kímélhet meg, ha tisztában vagyunk azzal a folyamattal, ahogy a Task-ok feldolgozásra kerülnek, és azokkal a lehetőségekkel, amelyekkel a feldolgozást befolyásolni lehet.

  1. #1 by senki on 2011. April 13. - 21:07

    Gratula. Nagyon jók a cikkjeid, annyira jók, hogy akár könyvet is írhatnál:).

  2. #2 by Tóth Viktor on 2011. April 14. - 07:57

    Köszönöm! Gondolkodtam a könyvön (e-könyvön ingyért), egyrészt mert már most egész sok oldal terjedelemnél jár a multithreading sorozat, pedig kb csak a felénél-harmadánál járok, másrészt pedig másodszorra kicsit másképpen építeném fel a tartalmat (van sok részlet, ami felesleges, illetve pár helyen több gyakorlati példát lehetne mutatni). Majd meglátom mire lesz időm-kedvem.

  1. Aszinkron programozás – Áttekintés – Újdonságok a C# 5-ben « DotNetForAll
  2. Aszinkron programozás – Áttekintés – Újdonságok a C# 5-ben | .NET apps

Leave a Reply

Fill in your details below or click an icon to log in:

WordPress.com Logo

You are commenting using your WordPress.com account. Log Out / Change )

Twitter picture

You are commenting using your Twitter account. Log Out / Change )

Facebook photo

You are commenting using your Facebook account. Log Out / Change )

Google+ photo

You are commenting using your Google+ account. Log Out / Change )

Connecting to %s

%d bloggers like this: