Task-based Asynchronous Pattern – Strukturált Paralelizmus

A Task Schedulerekkel foglalkozó cikkben már említésre került, hogy a Task-ok képesek egymás közötti függőségeket kifejezni. Most áttekintjük ezeket a függőségeket, és megismerjük az implementációs részleteit is. Miért fontos ismerni az implementációs részleteket? Azért, mert ezek ismerete nélkül könnyű olyan programot írni, ami közel sem használja ki a többmagos processzorok erejét.

Task állapotok

Mielőtt rátérünk a taszk példányok között felírható kapcsolatokra, meg kell ismerni a taszkok egy másik jellemzőjét, mégpedig azt, hogy milyen állapotokon mehetnek keresztül. Egy csupasz delegate-tel ellentétben, a Task példány csak egyszer hajtható végre. Túl bonyolult esetek fordulhatnának elő, ha egy taszk többször végrehajtható lenne. Készíthető olyan taszk, aminek azután kell lefutni, miután egy előző végzett. Ezeket continuation task-oknak hívjuk. Mi van, ha ez continuation task megpróbálja futtatni újra az előzmény taszkját? Ekkor a végrehajtási láncban egy kör keletkezne, aminek segítségével átláthatatlan végrehajtási gráfok készíthetőek. Vagy mi van akkor, ha egy taszknak a futása még véget sem ért, de elindítjuk újra? Ekkor egy taszk példány két (esetleg párhuzamos) folyamatot képvisel? Látszik, hogy ez koncepcionálisan sincs rendben.

A taszkok tehát egyszer futtathatóak, és hogy eldönthető legyen, ez megtörtént-e már vagy nem, egy erre vonatkozó állapottal bírnak.

Végállapotok

A taszk azonban többet is elárul arról, hogy lefutott-e már, vagy még nem. A taszk lefutása ugyanis több módon végeződhet. Egy előző cikkben foglalkoztunk már azzal, hogy a taszkok futása visszavonható (Cancellation). Ha ez nem történik meg, még akkor is előfordulhat, hogy a taszk futása egy hiba miatt áll le, ami egy le nem kezelt exception miatt történhet meg. Ha a taszkot nem vonták vissza, és nem történt futás közben hiba, akkor a futását rendben befejezheti. Egy taszk végállapota ezek alapján tehát három féle lehet: “RanToCompletion”, “Canceled” vagy “Faulted”.

Köztes állapotok

Amikor számos taszkot létrehozunk, és kifejezzük az iránti szándékunkat, hogy azokat futtatni akarjuk (például a Start() metódus hívásával), könnyű elfogadnunk, hogy a taszkok futása nem fog azonnal elindulni. Ha például a taszkok végrehajtásával a Thread Pool foglalkozik, az csak annyi taszkot fog elindítani, ahány szabad szál van a pool-ban. Emiatt a Task példány lehetőséget ad arra, hogy megtudjuk, a taszk futtatása elkezdődött, vagy még csak várakozik.

Ha egy taszk esetében kifejeztük, hogy azt futtatni szeretnénk (ismét, például a Start() hívásával), akkor a taszk állapota WaitingToRun-ra vált. Egészen pontosan akkor vált egy taszk állapota WaitingToRun-ra, ha azt egy taszk scheduler már egy várakozási sorba helyezte. A Start() hívás hatására ugyanis az történik, hogy a taszk átadása kerül egy schedulernek (vagy Start() paraméterében átadottnak, vagy default-nak), ami azt egy várakozási sorba helyezi. Innen már a scheduler alatt működő mechanizmustól függ, hogy az mikor kezdi el a taszk futtatását. ThreadPoolTaskScheduler-t használva például a taszk állapota akkor lesz WaitingToRun, ha az a Thread Pool feladatsorában van.

Amikor a taszk példány kikerül a feladatsorból, és elkezdődik a taszk által hordozott delegate futtatása, akkor a taszk állapota Running-ra vált.

Azt gondolhatnánk, hogy a Running állapotból a taszk valamelyik végállapotba ugrik, és ez az esetek egy részében így is van. Más esetekben ez nem következik be olyan egyértelműen. A taszkok esetében lehetőség van arra, hogy szülő/gyerek kapcsolatokat fejezzünk ki. Egy szülő taszknak több gyerek taszkja lehet, és ha egy taszknak gyerek taszkjai vannak (speciális gyerek taszkjai, lásd később), akkor nem ugrik végállapotba, amíg a gyerek taszkok végállapotba nem kerülnek. Ennek az okára még visszatérünk, egyelőre annyit kell tudnunk, hogy ha egy szülőtaszk a saját feladatát már befejezte, de a gyerektaszkok közül még valamelyik nem, akkor az állapota WaitingForChildrenToComplete. Attól függően ugrik majd valamelyik végállapotba, hogy a gyerekek végállapota milyen lett.

Kezdő állapotok

Megtudtuk, hogy egy taszk akkor kerül WaitingToRun állapotba, ha a TaskScheduler már a feladatsorba helyezte a taszkot. A scheduler viszont nem foglalkozik egy taszkkal, csak ha jelzik neki, hogy ezt megteheti. Erre szolgál például a Task.Start() függvény. Ebből adódik, hogy kell lennie még egy, az eddig tárgyaltakat megelőző állapotnak is. Ennek az állapotnak a neve nemes egyszerűséggel Created.

Nem minden taszk jön létre azonban Created állapotban, csak amelyeket a programozó közvetlenül hoz létre, és ő határozza meg, mikor induljanak el, pontosabban mikor kerüljenek feladatsorba.

Vannak olyan taszkok, amelyek feladatsorba helyezéséről a keretrendszer gondoskodik (természetesen egy scheduler példányt használva). Amikor egy taszkoz egy a későbbiekben tárgyalandó continuation taszkot készíttettünk, akkor a hatékonyabb infrastruktura miatt a Task osztály implementációja gondoskodik arról, hogy a continuation feladatok mikor kerüljenek feladatsorba. Ez több ok miatt előnyös. Egyrészt, így a TaskScheduler-ek alatt működő mechanizmusok (pl Thread Pool) nincsenek feleslegesen túltöltve a continuation taszkokkal, amiket amúgy sem lehet adott pillanatban végrehajtani. Másrészt, a continuation taszkokhoz megadható, hogy milyen feltételek esetén kell elindulniuk. Ha ezeket a feltételeket a schedulereknek / schedulerek alatt működő Thread Pool-nak kellene ellenőriznie, akkor túl sokat kellene tudnia a Task-ok működéséről. A continuation taszkok így egy speciális állapotot kapnak létrehozásukkor, ennek a neve pedig WaitingForActivation. Ez a külön állapot azért jó, mert a keretrendszer tudja, hogy az ilyen taszkokra már nem lehet meghívni a Start() függvényt (azt csak a Created-ekre engedi), ugyanakkor nyilvánvalóan nem lehet ezeknek a taszkoknak az állapota a Created utáni következő lépés (mint WaitingToRun).

Az alábbi ábra mutatja a taszkok által felvehető állapotokat, illetve azt, hogy jellemzően milyen művelet hatására mehetnek át egyik állapotból a másikba: (klikk a képre a nagyobb változatért)

Szülő/gyerek kapcsolatok

Egy taszk által futtatott kódból (delegate-ből) újabb taszkokat lehet létrehozni és elindítani. Azt már korábban megtudtuk, hogy az újonnan indított taszk örököl dolgokat a szülő taszktól. Alapesetben, ugyanaz a scheduler fogja kezelni a taszkot. Az ExecutionContext is vándorol, így az ExecutionContext-ről szóló cikkben tárgyalt lehetőségek rendelkezésre állnak.

Van azonban lehetőség ennél szorosabb kapcsolat kialakítására is. Alapesetben, ha egy szülő taszk létrehoz egy gyerek taszkot, és azt elindítja, akkor a folyamatok fire and forget módon működnek. A szülőtaszkra nincs az hatással, hogy a gyerek taszk(ok) mikor milyen állapotokat vesznek fel a későbbiekben, és hogy mikor és milyen végállapotba futnak be. Ez bizonyos esetben megfelelő működés, máskor azonban az a munka, amelyet a gyerektaszkok elvégeznek, a szülőtaszk munkájának szerves részét képezik. Ezek miatt a szülőtaszk logikailag akkor van kész, ha a gyerektaszkok lefutottak, és szolgáltatni tudják az eredményt. Elképzelhető például, hogy a szülőtaszkhoz kapcsolt continuation taszk ezekre az eredményekre épít. Ehhez a szülőtaszknak meg kell várni a gyerektaszkok lefutását, hogy ő maga végállapotba léphessen.

A Task osztály támogatja a fent leírt igényt. Mindössze annyit kell tenni, hogy a Task példány által létrehozott új Task példányok konstruktorában meg kell adni a TaskCreationOptions paraméter AttachedToParent flag-jét:

var task = new Task(() => DoSomething(), TaskCreationOptions.AttachedToParent);

A flag megadásának hatásait a következő ábra szemlélteti:

Az ábra felső részén egy taszk két újabb taszkot hoz létre, amelyek közül az egyik még egy további taszkot indít. Az ábra azt mutatja, hogy a szülőtaszk RanToCompletion állapotba lép, attól függetlenül, hogy a létrehozott taszkok futása még nem fejeződött be.

Az ábra alsó részén a szülőtaszk, miután lefuttatta az általa hordozott delegate-et, egy speciális állapotba lép, amíg a gyerektaszkok RanToCompletion (vagy egyéb) végállapotba nem lépnek. Kérdés, hogy az ábrán a zöld sávval jelölt részek mennyi erőforrásba kerülnek? Tudjuk, hogy a taszkok bevezetésének egyik célja a processzormagok minél jobb kihasználtsága, ennek pedig az a záloga, hogy minél kevesebb szál minél több munkát tudjon elvégezni. Vajon az AttachedToParent megadásával azt kockáztatjuk, hogy a szálak beakadnak a gyerektaszkokra várakozva?

Szülő/gyerek kapcsolatok belülről

Az AttachedToParent flag megadásakor két dolog történik. Az egyik, hogy a létrehozandó taszk példány megjegyzi szülőnek azt a taszkot, amelyik éppen az adott pillanatban fut, tehát aminek a kódja létrehozza az új taszkot. Honnan tudja ezt az új taszk? Hisz ha megnézzük az MSDN-en, egyik Task konstruktor sem kapja meg a szülőtaszkot. Onnan tudja, hogy amikor egy taszk fut, akkor egy statikus thread lokális változóba eltárolja saját referenciáját, mint az aktuálisan futó taszk. Az új taszk konstruktora hozzáfér ehhez a változóhoz, és ha a konstruktor AttachedToParent flag-et kap, eltárolja szülő taszknak az aktuális értékét. (Ha a taszk nem egy másik taszk kódjából jön létre, az AttachedToParent flag nincs semmilyen hatással)

Mást is csinál azonban az új taszk konstruktora, ha AttachedToParent flag-et kap. Jelzi a szülő taszknak (egy metódushívással), hogy egy új gyerektaszk jött létre hozzá. Minden taszk példány egy számlálót tart karban a gyerektaszkjairól. Amikor egy gyerektaszk létrejön, akkor ez a számláló növekszik. A szülőtaszk egyébként nem tart fent referenciát a gyerektaszkjaira, egyedül a számlálót. Amikor egy gyerektaszk futása véget ér, akkor az belehív a szülőtaszk megfelelő metódusába, és ebben az esetben a híváshoz átadja saját referenciáját is. Ekkor a szülőtaszk megvizsgálja, hogy a gyerektaszk (amelyik a hívást kezdeményezte) milyen végállapotba futott. Ha a gyerektaszk Faulted állapotban fejezte be a futását, akkor a kivételt, amelyik a Faulted állapotot okozta, a szülőtaszk eltárolja (a kivételkezeléssel egy másik cikk foglalkozik majd), és végül csökkenti a gyerekszámlálót.

Ha a számláló lefogy nullára, és a szülőtaszk már WaitingForChildrenToComplete állapotban van, akkor még meghívódnak azok a kódok, amelyek egy taszk végjátékát intézik. Ekkor indulnak el például a continuation taszkok, illetve ha a szülőtaszk is gyerektaszkja egy másik taszknak, akkor az eddig leírt folyamatok rekurzívan ismétlődnek.

Hogy hogyan szerveződnek a folyamatok, kipróbálhatjuk a következő kis programmal:

Task.Factory.StartNew(
              () =>
              {
                  ShortWork();

                  Task.Factory.StartNew(
                    () => LongWork(),
                    TaskCreationOptions.AttachedToParent);

                  ShortWork();
              }).ContinueWith(
                    (_) => ShortWork(),
                    TaskContinuationOptions.ExecuteSynchronously);

Egy taszk elvégez apró számításokat (ShortWork()), ezután az egyik számítást külön taszkban indítja. Ennek a számításnak az eredményére később szüksége lesz (ez nem látszik a kódból, de mondjuk, hogy így van), emiatt a TaskCreationOptions.AttachedToParent flag-et használja létrehozáshoz. Az egész taszk tehát akkor lesz kész, amikor a gyerektaszk a LongWork számításával végez.

A szülőtaszkhoz egy continuation taszk van csatolva, ami így akkor indulhat el, ha a szülőtaszk és a gyerek taszk is elvégezte a számításokat.

A futás folyamata a következő ábrán követhető nyomon:

Az ábra attól a ponttól mutatja a folyamatokat, hogy a szülőtaszk kódja (a példaprogram külső StartNew() hívásban megadott delegate) már megkapta a vezérlést. (1) Ez első lépésként meghívja a ShortWork függvényt. Ezután létrehoz egy új taszkot (a példaprogram StartNew-t használ, ami lényegében egy taszk létrehozása és a Start() metódus hívása). Az új taszk konstruktora, mivel az AttachedToParent flag-et megkapta, visszahív a szülőtaszkba (AddNewChild() hívás). Ekkor a szülőtaszkban növekszik az a számláló, ami a gyerektaszkok számát tartja karban. A gyerektaszk Start() metódusának hívására (2) az átadja magát egy schedulernek. A scheduler ekkor az implementációjának megfelelően valamilyen várakozási sorba helyezi a taszkot (ez a szekvencia diagrammon nincs részletezve). A gyerektaszk indítása után a szülőtaszk még egyszer meghívja a ShortWork()-öt, ezután a delegate-ünk kódja véget ér. A Task osztály implementációja azonban még megkezdi a taszk futásának lezárását (3), azonban észreveszi, hogy a ChildCount nem nulla, azaz van futó gyerektaszk. Emiatt a taszk WaitinForChildrenToComplete állapotba lép, majd visszaadja a vezérlést annak a logikának, ami őt elindította. Ez lehet például a Thread Pool egyik szálának logikája, amely így más feladat végrehajtásával is tud foglalkozni.

Eközben a scheduler elrendezte, hogy a gyerektaszkot valamilyen végrehajtó mechanizmus (mint a Thread Pool) elindítsa (4). Az ábrán az Execute() név szerepel metódushívásként, ami csak egyszerűsítés. A lényeg, hogy elindul a gyerektaszk által hordozott delegate (a példakód belső StartNew() hívásában megadott rész). Ez meghívja az egy szem LongWork() metódust. A taszk delegate-jének futtatása után itt is elindul a taszk lezárását végrehajtó kód, az ábrán Finish hívással jelölve (5). Ez egyik feladataként meghívja a szülő taszk ProcessChildCompletion() metódusát, amely csökkenti a gyerekszámlálót. Mivel ekkor a számláló eléri a nullát, a szülő taszk kihasználja (ProcessChildCompletion() metódus implementációja használja ki), hogy nála van a vezérlés, és most ebből a szálból elvégzi a taszk lezárásához szükséges munkákat. Ennek részeként most már beléphet végállapotba, majd lekezeli a Continuations taszkokat, egy FinishContinuations() hívással . A példaprogram a continuation taszkot ExecuteSynchronously flag-gel hozta létre, ami annyit jelent, hogy ha lehet, a continuation taszkot be kell inline-olni, azaz az aktuális szálon végrehajtani, nem pedig egy másik szálon aszinkron módon. Ennek megfelelően a FinishContinuations() hívás végrehajtja a continuation taszkot (6), majd a végrehajtó szál elkezdhet más feladatokkal foglalkozni (ez az ábrán már nincs jelölve)

A kiinduló kérdésre, miszerint az AttachedToParent flag azt okozza-e, hogy a szülőtaszk beakaszt egy szálat, az implementációs részletek ismeretében kijelenthetjük, hogy nem. Így ez a flag egy olcsó eszközt biztosít bizonyos szinkronizációs feladatokra.

Continuation Taszkok

A taszkok egy másik beépített képessége, hogy lefutásuk után újabb taszkokat tudnak indítani. Ez megoldható lenne kódból is, a következő módon:

Task.Factory.StartNew(
() =>
{
  SomeWork();

  Task.Factory.StartNew(
    () => Continuation1());

  Task.Factory.StartNew(
    () => Continuation2());
});

És egyszerűbb esetekben körülbelül ennyi is történik. Van azonban néhány hátránya a fenti megoldásnak. Az egyik, hogy nem túl flexibilis. Mi van, ha a continuation taszkokat pár szinttel odébb valamilyen mechanizmus határozza meg? Ekkor valahogy paraméterben kellene őket behozni a fenti kódba. Koncepció szempontjából sem biztos, hogy helyes, ha egy kód törődik azzal, hogy mennyi, vagy hogy egyáltalán vannak-e tőle függő taszkok.

Egy másik, még bonyolultabb helyzet, hogy mi van akkor, ha a fenti taszk olyan gyerektaszkokat használ (például a SomeWork() metódusból indítva), amelyek AttachedToParent flaggel lettek létrehozva? Ebben az esetben a korábban bemutatott, és a taszk példány által hatékonyan megvalósított mechanizmust nem tudnánk használni, kézzel kellene valamilyen szinkronizációs megoldást találni, amivel a gyerektaszkok bevárhatóak a Continuation1 és Continuation2 indítása előtt. Még ha olyan kiváló programozók is vagyunk, hogy hatékonyságban felvesszük a versenyt azokkal, akiknek a Microsoftnál ezeknek a mechanizmusoknak a kidolgozása a feladatuk, hogyan szerezzük meg az összes gyerektaszkot? Honnan tudjuk, hogy a SomeWork() hívás belülről milyen gyerektaszkokat használ és hogyan?

A fentiek miatt tehát célszerűbb a Task osztály beépített mechanizmusait használnunk a continuation taszkok kezelésére. A Task osztály ContinueWith() metódusa számos overloaddal bír. Lehet meghatározni CancellationToken-t, TaskScheduler-t, ezek szerepét a korábbi cikkekből már ismerhetjük. A számunkra most fontos paraméter a TaskContinuationOptions. Ebben meghatározhatjuk, hogy a continuation taszk milyen feltételek mellett és milyen módon fusson le. Korábban láttuk, hogy egy taszk három különböző állapotban fejezheti be a futását. A TaskContinuationOptions alap esetben lehetővé teszi, hogy megmondjuk, melyik állapot esetén ne hajtsa végre a continuation taszkot. Ezek a flag-ek a NotOnCanceled, NotOnFaulted és a NotOnRanToCompletion. A többi ezzek kapcsolatos flag már csak a fenti három keveréke. Az OnlyOnCanceled például a NotOnFaulted | NotOnRanToCompletion. Ezt azért érdemes megjegyezni, hogy ne csodálkozzunk miért nem működik az OnlyOnFaulted | OnlyOnCanceled. Valaki azt hihetné, hogy ez azt jelenti, hogy faulted vagy canceled esetben induljon a continuation taszk. Azonban a flag-ek felépítése miatt ez a continuation taszk egyáltalán nem fog elindulni, mert az összes NotOnXXX bit be lesz kapcsolva.

Ha egy continuation taszk nem futhat le, mert az előzmény taszk végállapota ezt nem teszi lehetővé, akkor a continuation taszk Canceled állapotot vesz fel (és az ő hozzá kapcsolt continuation taszkok közül csak az fog elindulni, amelynek a feltétele, hogy az előzmény taszk Canceled legyen)

Az alábbi példáprogram bemutatja, miről van szó:

var rnd = new Random();

// mivel véletlenszerű lesz a taszk lefutása, 20-as ciklusban
// ismétlünk, így jó eséllyel lesz mindenféle
for (int i = 0; i < 20; i++)
{
    // Erre szükség lesz a Canceled állapot eléréséhez
    var cancellationTokenSource = new CancellationTokenSource();

    // A taszkunk nem csinál semmit, csak kisorsolja a végállapotát
    var task = new Task(
        () =>
            {
                switch (rnd.Next(3))
                {
                    // RanToCompletion, nem kell csinálni semmit.
                    case 0:
                        Console.WriteLine("Előzmény: RanToCompletion");
                        break;

                    // Ha dobunk egy exceptiont, (pontosabban ha kijut egy
                    // kezeletlen exception a delegate-ünkből), akkor faulet
                    // állapot lesz.
                    case 1:
                        Console.WriteLine("Előzmény: Faulted");
                        throw new Exception();

                    // Taszk visszavonása. Elvileg kívülről szokták a tokenSource-ot
                    // használni, így az alábbi kód nem túl szép, de tesztnek jó:
                    case 2:
                        Console.WriteLine("Előzmény: Canceled");
                        cancellationTokenSource.Cancel();
                        cancellationTokenSource.Token.ThrowIfCancellationRequested();
                        break;                           // ide már nem jut el
                } // switch
            },
            // Ezt a tokent át kell adni, a taszk ellenőrzi majd, hogy az 
            // OperationCanceledException által tartalmazott token ugyanattól
            // a tokenSource-tól származik, mint az alábbi. Ha nem, Faulet lesz a
            // végállapot Canceled helyett
            cancellationTokenSource.Token);

    // A continuation taszkok, minden végállapothoz egy:
    var continuation1 = 
            task.ContinueWith(
                (_) => Console.WriteLine("Continuation : RanToCompletion"),                                   
                TaskContinuationOptions.OnlyOnRanToCompletion);

    var continuation2 = 
            task.ContinueWith(
                (_) => Console.WriteLine("Continuation : Faulted"),
                TaskContinuationOptions.OnlyOnFaulted);

    var continuation3 = 
            task.ContinueWith(
                (_) => Console.WriteLine("Continuation : Canceled"),
                TaskContinuationOptions.OnlyOnCanceled);

    task.Start();

    // hogy csökkentsük a több szálról küldött üzenetek keveredését, 
    // megvárjuk, míg lefut rendesen a continuation taszk is.
    Task.WaitAny(continuation1, continuation2, continuation3);
} // for i

A program kimenetének egy részlete pedig:

Elozmény: Canceled
Continuation : Canceled
-------------------------------------
Elozmény: RanToCompletion
Continuation : RanToCompletion
-------------------------------------
Elozmény: Faulted
Continuation : Faulted
-------------------------------------
Elozmény: Faulted
Continuation : Faulted
-------------------------------------
Elozmény: RanToCompletion
Continuation : RanToCompletion
-------------------------------------

TaskContinuationOptions.PreferFairness

A TaskContinuationOptions maradék pár flag-je a continuation taszk végrehajtásának a módját határozza meg, ezek hasonlóak a Task osztály konstruktorának átadható TaskCreationOptions-sal. A PreferFairness például azt mondja meg, hogy a taszkot olyan végrehajtási sorba kellene betenni (ha lehet), ami figyelembe veszi azt, hogy melyik taszk mikor került a végrehajtási sorba, és onnan FIFO elven kerülnek feldolgozásra. Egy ilyen végrehajtási sor a Thread Pool globális feladatsora, ezzel a flag-gel megadott continuation taszk, amennyiben egy ThreadPoolTaskScheduler az ütemező (alap esetben ez igaz), a globális feladatsorba kerülnek.

Van azonban néhány érdekes dolog ezen a területen. Tudjuk, hogy a Thread Pool szálak lokális feladatsora nem fair, azaz LIFO módon dolgozza fel a feladatokat, a korábbi cikkben tárgyalt lokalitási elv miatt. Nézzük a következő programot:

// Két magos rendszeren tesztelünk, az egyik mag lefoglalása. Sorozatosan kell
// feladatokat feldolgozni, különben a Thread Pool új szálakat indít.
Action coreDenial = null; // Szólna a fordító enélkül.
coreDenial = 
    () =>
        {
            Thread.Sleep(250);                 // egy kis "munka", 0.5 sec-nél kevesebb kell
            Task.Factory.StartNew(coreDenial); // új taszk a lokális sorba
        };

Task.Factory.StartNew(coreDenial);             // Egyik mag lefoglalása
                
// Egyszerű taszk 3 continuationnel. Mindegyik kiírja, hanyadiknak lett felvéve,
// így látjuk a végrehajtási sorrendet.
var task = new Task(                          
    () => 
    {
        Console.WriteLine("Előzmény taszk: " + Thread.CurrentThread.ManagedThreadId);
    });

task.ContinueWith(
    (_) =>
    {
        Console.WriteLine("Continuation 1: " + Thread.CurrentThread.ManagedThreadId);
    });

task.ContinueWith(
    (_) =>
    {
        Console.WriteLine("Continuation 2: " + Thread.CurrentThread.ManagedThreadId);
    });

task.ContinueWith(
    (_) =>
    {
        Console.WriteLine("Continuation 3: " + Thread.CurrentThread.ManagedThreadId);
    });

task.Start();
Console.ReadLine();

A fenti program indít egy taszkot, hogy a tesztgépen a két maghoz indított két szál közül az egyiket lefoglalja. Ez azért kell, hogy ne akarjon feladatot lopni a másik szál feladat sorából. A másik szál így elvégzi a rövid kis feladatát, majd a continuation taszkok az aktuális szál lokális feladatsorába kerülnek. Mivel ez LIFO elvű, kimenetnek azt várnánk, hogy fordított sorrendben íródnak ki a szövegek. Ehelyett a következő eredményt látjuk:

Elozmény taszk: 4
Continuation 1: 4
Continuation 2: 4
Continuation 3: 4

Tehát a continuation taszkok nem olyan sorrendben futottak, ahogyan azt vártuk. Miért? Az a helyzet, hogy a cikk ezen a pontján már műveltebbek vagyunk annál, mint ahogy a Microsoftnál erre számítottak, amikor a Task osztály ezen részét implementálták. Egy átlagos programozó – egyébként logikusan – azt várja, hogy a continuation taszkjai olyan sorrendben lesznek végrehajtva, amilyen sorrendben a ContinueWith függvények meg lettek hívva. A többszálúság miatt elvileg a sorrendre amúgy nem szabad építeni. Ennek ellenére a Microsoft láthatólag nem akarta meglepni a fejlesztőket, emiatt a Task osztály implementációja, amikor a continuation taszkokat a scheduler-nek átadja, ezt fordított sorrendben teszi meg. Azért csinálja így, mert alap esetben (tehát szinte mindig) a Thread Pool lesz a scheduler mögött, ami viszont a lokális feladatsorokon a LIFO feldolgozás miatt visszafordítja a sorrendet.

A fentiek logikusnak hangzanak, egészen addig, amíg ki nem próbáljuk a PreferFairness flaget. Írjuk át a fenti programot a következőre:

task.ContinueWith(
    (_) =>
    {
        Console.WriteLine("Continuation 1: " + Thread.CurrentThread.ManagedThreadId);
    }, TaskContinuationOptions.PreferFairness);

task.ContinueWith(
    (_) =>
    {
        Console.WriteLine("Continuation 2: " + Thread.CurrentThread.ManagedThreadId);
    }, TaskContinuationOptions.PreferFairness);

task.ContinueWith(
    (_) =>
    {
        Console.WriteLine("Continuation 3: " + Thread.CurrentThread.ManagedThreadId);
    }, TaskContinuationOptions.PreferFairness);

A PreferFairness miatt a scheduler a continuation taszkot a Thread Pool globális feladatsorába teszik (legalábbis a ThreadPoolTaskScheduler, ami az alapértelmezett scheduler). A globális feladatsor fair, emiatt a behelyezés sorrendjébe veszi ki az elemeket – a Task implementáció viszont ugyanúgy fordítva adagolja öket, mint korábban. Emiatt most a következő kimenetet kapjuk:

Elozmény taszk: 4
Continuation 3: 4
Continuation 2: 4
Continuation 1: 4

Tehát pont a fair esetben nem lesz fair a végrehajtási sorrend. A SynchronizationContextTaskScheduler ugyanúgy fordított sorrendet eredményez. A gondokat tetézi, hogy a globális feladatsorból az egyik szál majd kiveszi a feladatot, és amikor a feladat új és új alfeladatokat hoz létre, azok már a lokális sorba kerülnek, akár meg volt adva a PreferFairness akár nem – így a működés nem lesz fair. Mindez furcsa, azonban a PreferFairness nem a fenti módon használva érdekes. Ha valamiért tényleg fair végrehatási módot szeretnénk, legjobb, ha készítünk egy saját TaskFactory-t:

var taskFactory = new TaskFactory(
                        TaskCreationOptions.PreferFairness, 
                        TaskContinuationOptions.PreferFairness);

Action work = null;
work = 
    () =>
        {
            Thread.Sleep(250);
            taskFactory.StartNew(work);
        };

taskFactory.StartNew(work);

A fenti kis program, mivel az új taszkokat egy megfelelően konfigurált factory-n keresztül készíti el, azok mindig a globális feladatsorba kerülnek, és onnan fair módon lesznek feldolgozva.

TaskContinuationOptions.ExecuteSynchronously

Az ExecuteSynchronously flag esetén a continuation taszk “inline”-olva lesz, azaz nem (feltétlenül) kerül be egy feladatsorba, hanem az aktuális szálon azonnal végrehajtásra kerül, amennyiben ez lehetséges. Ez hasznos lehetőség akkor, ha a continuation taszk csak néhány utasításnyi műveletet végez, mivel ekkor a feladatsorba helyezéssel járó overhead túl nagy a hasznos művelethez képest.

TaskContinuationOptions.LongRunning

A LongRunning flag a nevének megfelelően azt jelzi, hogy a taszk hosszú lefutású lesz. Ez gondot okozhat a thread pool számára, mivel megzavarhatja azt a logikát, amivel az optimális thread számot meghatározza. A .NET 4 Thread Poolja, ha egy feladat fél másodpercnél tovább fut, akkor “lemond” arról, hogy az a szál a feladatsorban levő feladatok végrehajtásában hatékonyan részt vesz, és indít egy új szálat (amennyiben vannak várakozó feladatok). Ez egyben azt is jelenti, hogy ha vannak várakozó feladatok, és az egyik thread pool szálat leterheljük egy sokáig tartó feladattal, akkor a feladatok feldolgozása fél másodpercig akadályoztatva lesz. Ha ezt el akarjuk kerülni, meg kell adni a LongRunning flag-et, és ebben az esetben a ThreadPoolTaskScheduler például külön dedikált szálat indít a taszknak, nem pedig a Thread Pool-ra irányítja.

Continuation Taszkok belülről

A continuation taszkok belső implementációja nem különösebben érdekes, azért foglalkozunk vele, hogy lássuk, a megoldás nem erőforrás igényes. Minden Task példány rendelkezik egy listával, ami lista minden eleme egy Task-ot és a hozzá tartozó paramétereket tartalmazza (mint amit a TaskContinuationOptions-szel meg lehet adni). Ez a lista minden esetben fel lesz dolgozva, még akkor is, ha a taszkunk exception-nel áll le. Amikor egy taszk befejezte a futását, megvizsgálja, hogy vannak-e gyerek taszkjai, amelyekre várakozni kellene (ezt a korábban említett számláló alapján tudja). Ha nincsenek, akkor abból a szálból, amelyikben éppen van, elkezdi a taszkhoz tartozó schedulerrel elindíttani (vagy beinline-olni) a continuation lista elemeit. Ha van még futó gyerektaszk, akkor a listát az utoljára végző gyerektaszk fogja feldolgozni a saját szálából, ahogyan azt korábban láttuk. A feldolgozási sorrend egyébként az, hogy első körben a nem inline-olható (a ExecuteSynchronously flag-gel nem ellátott) taszkokat a végrehajtási sorba rakja, és ezek után kezdi el a maradék taszk inline-olását.

A continuation taszkok tehát nem kényszerítenek szálakat várakozásra, nem használnak szinkronizációs objektumokat (mint például event-ek). Ha a feladat, amit continuation-ként szeretnénk indítani csak pár lépésből áll, érdemes az ExecuteSynchronously flag-gel ellátni, ekkor még az olyan overhead-ek is kikerülhetőek, mint amelyek a Thread Pool használatával járnak.

A TaskFactory és a continuation taszkok

A TaskFactory osztály összetettebb lehetőségeket is kinál annál, mint amit a Task osztály natívan támogat. Lehetőség van olyan “Continuation” taszkok készítésére, amelyek indítása több taszk lefutásától függ. Egyik lehetőség, amikor több taszk mindegyikének lefutása szükséges ahhoz, hogy a continuation taszk elinduljon. A másik, amikor több taszkból egynek a lefutása elég ahhoz, hogy a continuation taszk elinduljon. Arra nincs lehetőség ezeknél a módszereknél, hogy a taszkok végállapotától függjön egy continuation taszk indítása. Hiába van tehát például a TaskFactory.ContinueWhenAll() metódusnak olyan változata, ami TaskContinuationOptions-t vár, ebbe csak az ExecuteSynchronously/LongRunning/PreferFairness flag-ek adhatóak meg. A függvények használatára nem adok példát, ez könnyen kiolvasható akár az MSDN-ről. A következőkben inkább azt nézzük meg, miért kerül viszonylag kicsi erőforrásba ezeknek a függvényeknek a használata.

A TaskFactory continuation metódusai belülről.

A TaskFactory szóban forgó függvényei azért hatékonyak, mert belső implementációjukban a Task beépített continuation mechanizmusaira épülnek. Elsőként nézzük meg a kettő közül bonyolultabb ContinueWhenAll() működését:

A ContinueWhenAll egy csökkenő számlálót használ. Kezdetben ennek a számlálónak az értéke megegyezik azoknak a taszkoknak a számával, ahány taszknak a lefutására várakozni kell. Ezt a számlálót kell csökkenteni minden egyes taszk lefutása után, és erre triviálisan adódik megoldásnak, hogy az adott taszk mindegyikére egy olyan continuation taszkot kell csatolni, amely ezt a számlálót csökkenti.

Mi történik, amikor a számláló eléri a nullát? Ez az a pillanat, amikor a ContinueWhenAll() hívásnak átadott delegate-et meg kellene hívni. Azonban a ContinueWhenAll() hívás egy taszkod ad vissza, ami a paraméterben megadott delegétet csomagolja be. A számláló lefutásakor tehát nem lehet egyszerűen meghívni a ContinueWhenAll()-nak átadott delegate-et, hanem a létrehozott taszkot kell elindítani. A közönségesen létrehozott taszkokkal azonban vagy egy kis probléma. Ezeket létrehozva a példány Created állapotba kerül, amelyet a programozó megkapva kézzel is el tudna indítani. Emiatt jönnek létre a közönséges continuation taszkok egy speciális, WaitingForActivation állapotban. Esetünkben is ez lenne a szerencsés.

Egy lehetőség tehát, hogy a taszkot, aminek el kell indulnia a ContinueWhenAll() paraméterében megadott összes taszk lefutása után, egy másik taszk példány continuation taszkjaként kell létrehozni, és ennek a másik taszk példánynak akkor kell lefutott állapotba kerülnie, amikor a korábban említett számláló nullára csökkent.

Ez viszont azt a furcsa követelményt támasztja, hogy olyan taszkra lenne szükségünk, aminek a lefutása nem egy kód végrehajtásának befejezésétől, hanem egy “külső” körülménytől függ, ebben az esetben a számláló nullázódásától. Szerencsére van erre megoldás, mivel egy általánosabb igény lehet taszkokba becsomagolni egyéb aszinkron műveleteket (például file olvasást), aminek a végén az aszinkron műveletet becsomagoló taszk állapotát RanToCompletion-ba kell állítani, illetve lejátszani azokat a műveleteket, amelyek ilyenkor lejátszódnak (pl continuation taszkok indítása). A számunkra most szükséges osztály a TaskCompletionSource, ami a következőket tudja:

Minden TaskCompletionSource példány létrehoz magában egy Task példányt, a Task egy speciális konstruktorát használva, amit csak a Task Parallel Library kódja ér el. Ez a létrehozott taszk nem tartalmaz delegate-et, amelyet futtatnia kellene, az állapota pedig WaitingForActivation, így kézzel elindítani sem lehet. A taszk állapota nem a szokásos útvonalon ugrál végig. Mivel a fejlesztő nem hívhat Start()-ot, a keretrendszer pedig nem fogja ezeket a taszkokat feladatsorba állítani, a Taszk állapota “magától” nem változik. Amit lehet tenni, az az, hogy a befoglaló TaskCompletionSource példányon keresztül közvetlenül valamelyik végállapotba billentjük a taszkot. Erre szolgálnak a TaskCompletionSource.SetResult()/SetCanceled()/SetException() függvények.

Összességében tehát, amikor a fent említett számláló lefogy nulláig, akkor egy TaskCompletionSource-on keresztül egy taszkot végállapotba lehet billenteni. Ha ehhez a taszkhoz van continuation taszkként hozzáadva az a taszk, amelyik a ContinueWhenAll() hívással jött létre, akkor az el fog indulni.

Az egész mechanizmusban az a szép, hogy nem kell szálat beállítani a szinkronizáláshoz, sőt, még nehéz súlyú szinkronizációs objektumokat sem kell használni. Az egyedüli szinkronizációs mechanizmus a számláló csökkentésére használt Interlocked.Decrement(), hogy ha két szál egyszerre próbál értéket csökkenteni, ne legyen gond. Ezt viszont a processzorok natívan támogatják, így meglehetősen gyorsak.

A fent leírtakat próbálja érzékeltetni a következő ábra:

A TaskFactory.ContinueWhenAll() hívása (1) inicializál egy számlálót (taskLeft) illetve egy TaskCompletionSource-ot. A template paraméter lényegtelen, nincs nem template-es verzió, így lesz jobbhíján bool. A TaskCompletionPort példánynak van egy Task példánya, ehhez lesz hozzáadva continuationként a ContinueWhenAll()-nak átadott delegate. A ContinueWhenAll() ezen kívül definiál egy delegate-et (action-t) (2), ami a fent említett számlálót csökkenti, illetve ha az elérte a nullát a TaskCompletionSource-on keresztül végállapotba állítja a speciális taszkot. Ez az action hozzá lesz adva az összes előzmény taszkhoz (3). Amikor azok lefutnak, a számláló emiatt csökkent, a nullát elérve a TaskCompletionSource taszkja végállapotba jut, ami miatt elindul az a Continuation taszk, ami a ContinueWhenAll() paraméterben átadott delegate-et hordozza (4).

Az igazi megvalósítás ennél picivel összetettebb, például azok az előzmény taszkok, amelyek a hívás pillanatában már végállapotban vannak, nem lesznek részei a fenti folyamatnak.

A ContinueWhenAny() implementációja a fentihez nagyon hasonló logikán alapul, azt leszámítva, hogy ott nincs szükség számlálóra. Ebben az esetben az előzmény taszkok continuations listájába tett action azonnal hívja a TaskCompletionSource.SetResult()-ját, így az első lefutáskor már indul is a ContinueWhenAny() által visszaadott taszk. (Az nem okoz problémát, hogy a több előzmény taszk többször hív SetResult-ot, mivel igazából TrySetResult() hívás van, ami egyszerűen false értéket ad vissza, ha már végállapotban van a befoglalt taszk)

Látható tehát, hogy a TaskFactory ezen műveletei szintén nem igényelnek sok erőforrást.

Task.WaitAll/WaitAny

Az utolsó csoport függvényei nem szorosan kapcsolódnak a struktúrált parallelizmushoz, de lehet őket ebből a célból is használni. Ez nem olyan szerencsés, hiszen a WaitAll/WaitAny leállíthatja a szálat, amíg a szükséges események be nem fejeződnek. Ez főleg problémás Thread Pool szálakon, mivel a Thread Pool logikája csak fél másodperc múlva indít el helyettesítő szálat – ez idő alatt a processzor képességei nem feltétlenül lesznek kihasználva 100%-ig.

A WaitAll/WaitAny némi optimalizációval az operációs rendszer lehetőségeit használja ki. A WaitAll első lépésben a várakozott taszkokat megpróbálja “inline”-olni, azaz az aktuális szálon azonnal végrehajtani. Szerencsés esetben ez sikerülhet, ekkor a WaitAll nem igényel sok erőforrást. Ha nem sikerül, akkor minden várakozott taszkhoz létrejön egy event objektum, amin keresztül a várakozó szál fel lesz élesztve. Ezek használata önmagában sokkal lassabb az eddig bemutatott eljárásokhoz képest, ahol legrosszabb esetben is az Interlocked osztály metódusai szolgáltatták a minimális szinkronizációs szükségleteket.

Másik hátrány, amit az MSDN meg sem említ, hogy az operációs rendszer korlátai miatt nem is működnek akárhány taszk példányt használva. A WaitAll() ugyan belső implementációjában elrejti ezeket a korlátokat, a WaintAny() ezt már nem tudja megtenni – így maximum 64 taszkpéldánnyal használható.

Az alábbi – némileg mesterkélt – példaprogram mutat rá a WaitAll() hátrányaira. A trükközésre a kicsi példaprogram esetén azért van szükség, mert a WaitAll() belső optimalizációja egyszerűbb esetekben elég hatékony. Sokszor ugyanis az történik, hogy a szülőtaszk létrehoz pár gyerektaszkot, majd azokra várakozik WaitAll()-lal. A gyerektaszkok viszont a lokális feladatsorba kerülnek, ahonnan a WaitAll() ügyesen beinline-olja őket, emiatt pedig nem fog blokkolni a szál végrehajtása. Azok a feladatok, amelyek gondot okozhatnak, már bonyolultabb végrehajtási lánccal kell, hogy bírjanak – vagy pedig nekünk kell a teszt kedvéért kiiktatni a WaitAll() optimalizációs próbálkozását. Ezt megtehetjük, ha az új taszkokat direkt nem a lokális feladatsorba pakoltatjuk, amihez egyszerűen a PreferFairness flag-et használjuk. Ezt teszi az alábbi példaprogram:

// Alább a delegate-ben van hívatkozás a workItem változóra, és
// a fordító szólna, hogy még nincs inicializálva, ha nincs itt ez
// a sor:
Func<object, long> workItem = null;

// Az alábbi kód rekurzív, amíg nem érte el a paraméterben megadott mélységet,
// létrehoz ugyanolyan taszkokat, mint ő maga, és megvárja az eredményüket.
// Ha már a megfelelő mélységbe jutott, akkor számol egy kicsit.
workItem =
    (o) =>
    {
        // Az elért mélység. A TaskFactory.StartNew csak Func<object, long>-ot tud, emiatt
        // nem lehet int-ként behozni a paramétert, csak objectként:
        int i = (int)o;  

        // Nem vagyunk a megfelelő mélységben, tovább bontani a feladatot:
        if (i > 0)
        {
            var tasks = new Task<long>[2];
            for (int n = 0; n < tasks.Length; n++)
            {
                tasks[n] = 
                    Task.Factory.StartNew(
                        workItem, i - 1, TaskCreationOptions.PreferFairness);
            } // for n

            // Várakozni a két alfeladatra. Ha fentebb nem lenne a PreferFairness,
            // az alábbi hívás nagy valószínűséggel be tudná inline-olni mind a kettőt.
            Task.WaitAll(tasks);

            // A részszámítások összegzése
            return tasks.Sum(t => t.Result);
        }
        else
        {
            // Elértük az alsó szintet a rekurzióban, számolni valamit. A számítás
            // úgy lett belővel, hogy több, mint fél másodpercig tartson. Ekkor a thread pool
            // nagyobb hajlandóságot mutat újabb szálak indítására.
            var startTime = DateTime.Now;
            long result = 0;

            for (int n = 1; n < 500000; n++)
            {
                result += (DateTime.Now - startTime).Milliseconds;
            } // for n

            return result;
        }
    };

var sw = Stopwatch.StartNew();

// A kezdő feladat indítása, a rekurzió 6 mélységig tart:
var root = 
        Task.Factory.StartNew(
            workItem, 6, TaskCreationOptions.PreferFairness);

root.Wait();

Console.WriteLine(root.Result);
Console.WriteLine(sw.ElapsedMilliseconds);

A futás eredményét a következő ábrákon láthatjuk.

Az első ábra azt mutatja, hogy a processzor magjai mivel voltak elfoglalva. A vízszintes tengely az idő, a függőleges pedig a magok által végzett munkát mutatja. A számunkra érdekes terület a zöld, mivel azokon a területeken foglalkozott a processzor a feladataink számításával. Látható, hogy ez a terület nem túl nagy, a processzor körülbelül az idejének csak harmadát töltötte számunkra értékes munkával. Jelentős területet foglal el a szürke szín, ami az Idle, tehát amikor a processzor csak várakozik. Ez a rendeteg WaitAll hívásnak köszönhető.Ha közelebbről megnézzük a diagrammot, azért látható, hogy pici hasznos munka ezeken a területeken is van:

Mik lehetnek ezek a területek? Végrehajtás során az történik, hogy a Thread Pool szálain a programunk beakasztja a szálat a WaitAll-lal. A Thread Pool belső logikája, ha fél másodpercnél tovább állnak a feladatok a feladatsorban, indít egy új szálat (ezzel próbálva elkerülni például a dead lockokat). Amikor egy új szál elindul, elkezdi végrehajtani a taszkjainkat, ami csinál két új taszkot, majd várakozni kezd. Amíg az új szál elindul, addig a pontig, amíg várakozni kezd, egy kevés hasznos munkát végez, ezeket jelképezik a fél másodpercenként megjelenő huplik. Ugyanez a jelenség látszik a következő ábrán:

A teljes diagramnak csak a fele fért a képre, összesen közel 70 szálat indított a Thread Pool, mire a feladat végére ért. A piros szín a szinkronizációt jelenti, és látható, hogy szinte minden szál ebben az állapotban tölti az idejének nagy részét. Látszik tehát, hogy bár a példánk erőltetett (a PreferFairness flag miatt), azért előfordulhat olyan helyzet, ahol a WaitAll/WaitAny nagyon durva teljesítményvesztéssel jár.

Amit tenni lehet ennek elkerülésére, hogy a feladatot átfogalmazzuk úgy, hogy a TaskFactory.ContinueWhenAll/Any() függvényeit használja.

TaskExtensions.Unwrap

Ez az átfogalmazás azonban nem olyan egyszerű. Nem is olyan bonyolult, csak a többszörös índirekciók miatt kicsit nehéz követni. Az eddigi taszkjaink egy szimpla long értéket adtak, mint eredmény. Mivel a szülőtaszk megvárta a gyerektaszk lefutását, ehhez a long értékhez a Task.Result property-n keresztül hozzáfért.

Most azonban a long érték nem megoldás. Amikor egy adott végrehajtási szint (a workItem delegate egy lefutása) még nem ért el a rekurzív hívások aljára, akkor – mivel nem blokkolhat – a gyerek taszkok létrehozása után vissza kell adnia a vezérlést. Ez azért problémás, mert még nem végezte el az összes munkáját, hiszen a gyerektaszkok eredményeit összegezni kell, ezek viszont még nincsenek meg.

Amit ebben a helyzetben tenni lehet, az az, hogy az összegzést végző munkát kiszervezzük egy continuation taszkba, és elérjük, hogy ez a continuation taszk lefusson, amikor a gyerektaszkok mindegyikének a futása véget ért. Milyen visszatérési értéke lesz ebben az esetben a workItem delegate-nek? Nem lehet az eddigi long, hiszen az érték még nem áll rendelkezésre, amikor a delegate visszaadja a vezérlést. Egy olyan dolgot kell visszaadnunk, ami majd egyszer, a jövőben megadja a kívánt long értéket. A taszkok pont ilyen dolgok, amelyek egyszer a jövőben elvégeznek egy feladatot, és az alapján szolgálni tudnak eredménnyel. Visszaadhatunk tehát egy taszkot, ami long értéket szolgáltat – miután elvégezte a munkát.

Ez utóbbi gondolatmenet illik a continuation taszkok filozófiájába, azt mondtuk, hogy a gyerektaszkok eredményeinek (amelyek long-ok) az összegzését egy continuation taszk végzi, ami így ha készen van, akkor egy long értéket szolgáltat. Nem véletlen, hogy a Task.Factorty.ContinueWhenAll() hívás, ha a neki átadott delegate long visszatérési értékkel bír, akkor egy Task-ot ad eredménynek. Másik oldalról eljutottunk addig, hogy a workItemnek is ezt a típust kell szolgáltatnia, hiszen ez az a típus, ami a jövőben majd egy longot ad eredményül.

tasks = gyerekszatszkok;
...
return Task.Factory.ContinueWhenAll(tasks, t => t.Sum(c => c.Result)); // => Task<long>

Aki azt gondolja, hogy ez volt a nehezen követhető rész, az csalódni fog, ugyanis most jönnek az igazi turpisságok.

Két problémánk van ugyanis. Az egyik, hogy amikor a rekurzív hívási lánc leér az “aljáig”, akkor nem indít újabb taszkokat, hanem kiszámolja a szükséges értéket (a long-ot). Ebben az esetben a természetes az lenne, ha ezt a long-ot vissza lehetne adni eredményként, azonban ugyanabban a workItem delegate-ben vagyunk, aminek a visszatérési értéke a fentiek alapján már Task<long>. Amit tennünk kell itt tehát az, hogy a meglévő értékünket becsomagoljuk egy Task példányba, amiből utána azt meg lehet szerezni a Task.Result-on keresztül. Már találkoztunk azt ezt lehetővé tévő osztállyal, a TaskCompletionSource pont ezt tudja:

long result = ...;

var tcs = new TaskCompletionSource<long>();
tcs.SetResult(result);
return tcs.Task;        // A tcs.Task.Result-ben az eredmény

Ezzel tehát elértük, hogy a hívási lánc közbülső szintjei és az alja ugyanazt a visszatérési típust használhatja. Van azonban egy másik probléma, ami nem tűnik fel azonnal. A workItem delegate olyan típusú, hogy egy Task<long>-od ad vissza eredménynek. Ha közvetlenül meghívnánk ezt a delegate-et, az eredmény egy Task példány lenne, ami egyszer valamikor szolgáltat egy long értéket. A gond az, hogy nem közvetlenül hívjuk meg a workItem delegate-et. Azért nem, mert aszinkron módon szeretnénk futtatni (ráadásul többet), emiatt becsomagoljuk azt taszk példányokba. Ezt akkor tesszük meg, amikor hívjuk a Task.Factory.StartNew() metódust – ami így eredménynek egy Task<Task<long>> típusú értéked ad vissza.

Hogyan értelmezzük ezt a visszatérési értéket? Úgy hogy kapunk egy taszkot, ami majd egyszer végez a munkájával és ekkorra előállít egy taszkot, ami majd egyszer, ha végez a munkájával, akkorra előállít egy long értéket. Még egyszer, hogy jutottunk ide? A gond az volt, hogy adott végrehajtási szinten, a vezérlést vissza kell adni még azelőtt, hogy a részeredmények meglennének. Ezért egy taszkot lehet visszaadni eredménynek, ami majd szolgáltat egy long-ot, így lett a delegate visszatérési értéke Task<long>. Az alszámítások ugyanúgy nem várhatják meg az eredményüket, mint a jelenlegi szint (hisz ugyanaz a kód, rekurzív az algoritmus), így azok is Task<long>-ot szolgáltatnak. Azonban mi a jelenlegi szinten ezeket az al-taszkokat is aszinkron módon használjuk, azaz nem akarjuk megvárni, amíg az al-taszkok szolgáltatni tudják a Task<long>-okat, hanem mindegyik egy külön taszkban végzi el a munkát, így lesznek Task<Task<long>>-jaink. A fent leírt continuation taszkunk, tehát nem jó. Ugyanis a tasks tömb nem megfelelő típusú lesz:

// tasks[n] => Task<Task<long>> 
tasks[n] = Task.Factory.StartNew(workItem, i - 1, TaskCreationOptions.PreferFairness)

A continuation ugye így nézett ki:

return Task.Factory.ContinueWhenAll(tasks, t => t.Sum(c => c.Result));

Ez nyilván nem működhet, mivel a tasks tömb egy elemének (ami fent a c változóba kerül) a Result property-je most Task, nem pedig long. Írjuk a következőt?

return Task.Factory.ContinueWhenAll(tasks, t => t.Sum(c => c.Result.Result));

Ez logikusnak tűnik, mivel a c.Result egy Task<long>, aminek szintén van Result property-je, amivel megkapjuk a várt long-t. Ez azonban nagyon rossz ötlet. Miért? A fent leírt continuation akkor fog elindulni, ha az összes taszk a tasks tömbben RanToCompletion állapotba futott. Ezeknek a taszkoknak viszont csak az a feladata, hogy létrehozzák a Task taszkokat. Ha ez megvan, belépnek RanToCompletion-ba. Ez viszont nem jelenti azt, hogy a taszkok, amelyeket létrehoztak, már tudják szolgáltatni az értéket, viszont mi a continuation delegate-ben ezt meghivatkozzuk. Ezzel az a baj, hogy a Task osztály implementációja, ha a Result property-t hivatkozzuk, és az még nincs kész, akkor blokkolja a szálat, amíg az eredmény meg nem születik (vagy valami más végállapotba nem lét a taszk). A megoldásunk tehát ugyanattól a problámátol fog szenvedni, mint a korábbi. A sok szálblokkolás miatt új és új szálak jönnek létre, a processzor pedig kihasználatlan lesz, mire a Thread Pool a fél másodperces lépéseivel megfelelő számúra szaporítha fel a Thread Pool szálakat.

Amire szükségünk lenne, az egy olyan continuation funkció, ami akkor indulna el, ha a taszk által létrehozott taszk is lefutott. Hát, ilyen funkció nincs, azonban van mód létrehozni egy olyan taszkot, ami akkor lép RanToCompletion állapotba, ha egy taszk által becsomagolt taszk RanToCompletion állapotba lépett. Ha a Task.Factory.StartNew() által visszaadott taszkok mindegyike helyett ezt a spéci taszkot használnánk, akkor a ContinueWhenAll() hívás működhetne az eredeti formájában.

Egy ilyen spéci taszkot csinálni nem túl bonyolult, de még ezzel sem kell bajlódnunk, hiszen a TaskExtensions.Unwrap() függvény ezt megcsinálja helyettünk. A trükk egyébként az, hogy a függvény a külső taszkra rak egy continuation delegate-et, ami ugye akkor indul el, amikor a belső taszk már létrejött (ez a külső taszk Result-ja) így arra tesz egy continuation delegate-et, ami viszont egy TaskCompletionSource által hordozott taszkba másolja a belső taszk eredményét. Az Unwrap() metódus pedig ennek a TaskCompletionSource példánynak a Task property-jével tér vissza, ami így akkor fut RanToCompletion-ba, amikor a belső taszk is lefutott, és az eredménye pont a belső taszk eredménye lesz. Ennek a tudásnak a birtokában befejezhetjük a kódunkat, ami most így néz ki:

Func<object, Task<long>> workItem = null;
workItem =
    (o) =>
    {
        int i = (int)o;

        if (i > 0)
        {
            var tasks = new Task<long>[2];
            for (int n = 0; n < tasks.Length; n++)
            {
                tasks[n] = 
                    Task.Factory.StartNew(
                        workItem, 
                        i - 1, 
                        TaskCreationOptions.PreferFairness).Unwrap();
            } // for n

            return Task.Factory.ContinueWhenAll(tasks, t => t.Sum(c => c.Result));
        }
        else
        {
            var startTime = DateTime.Now;
            long result = 0;

            for (int n = 1; n < 500000; n++)
            {
                result += (DateTime.Now - startTime).Milliseconds;
            } // for n

            var tcs = new TaskCompletionSource<long>();
            tcs.SetResult(result );
            return tcs.Task;
        }
    };

var sw = Stopwatch.StartNew();


var root = Task.Factory.StartNew(workItem, 6, TaskCreationOptions.PreferFairness).Unwrap();
root.Wait();

Console.WriteLine(root.Result);
Console.WriteLine(sw.ElapsedMilliseconds);

Ez a kód, amellett, hogy majd háromszor olyan gyorsan végzett, a diagrammok szerint is sokkal hatékonyabban használta a CPU-t:

Az alábbi diagrammon a szálhasználat látható. Látszik, hogy az algoritmus második felében azért itt is több szálat indított a Thread Pool, azért közel sem annyit, mint korábban.

Parallel.Invoke()

A Parallel.Invoke() metódus szintén képes arra, hogy feladatokat párhuzamosan indítson, majd azok eredményét megvárja. Vajon hatékony a működése? Ennek a függvénynek a működése legjobban egy egybevont Factory.StartNew()/Task.WaitAll() párosra hasonlít, és ezzel már látjuk is a hátrányait. Kevés számú delegate esetén (max 10) megpróbál inline-olni, majd pedig a WaitAll()-hoz hasonlóan beállítja a szálat, és várakozik. Ennél nagyobb számú delegate-nél a Self Replicating Task-ok technikáját használja, amivel majd egy későbbi cikkben foglalkozunk. Röviden ennek a lényege, hogy megpróbál spórolni a taszk példányok folyamatos létrehozásával. A Self Replicating Task-oknak egyébként a Parallel.For() esetében van nagy szerepe. A lényeg tehát, hogy teljesítményigényes algoritmusoknál kerülni kell a használatát.

Konklúzió

A Task Parallel Library sok lehetőséget kínál Task példányok közötti függőségek leírására, illetve a példányok közötti szinkronizációra. Láthattuk, hogy a lehetőségek közül nem mindegy, mikor melyiket használjuk. Ha a fejlesztő csak felületesen ismeri meg a lehetőségeket, könnyen követ el olyan hibákat, mint például ezen az oldalon. Itt a fejlesztő a Parallel.Invoke() metódust használta, mivel ez két szálon képes futtatni a gyorsrendezés két részét, és megoldja a szinkronizációt is. Láttuk, hogy ez csak látszólag hatékony, a megoldás a szálak elszaporításához vezet, melyek közül sok várakozni fog. Ugyanazon az oldalon egy másik fejlesztő saját szinkronizációs mechanizmust használ, és hibásan a Thread Pool globális feladatsorába pakolja a részrendezéseket – törve ezzel a lokalitási elvet, és nehezítve cache dolgát.

Aki figyelmesen olvasta ezt a cikket, rájöhet, hogy a helyes megoldás a részszámítások indítására a következő:

Task.Factory.StartNew(
    () => QuicksortParallel(arr, left, pivot - 1), 
    TaskCreationOptions.AttachedToParent);
Task.Factory.StartNew(
    () => QuicksortParallel(arr, pivot + 1, right),
    TaskCreationOptions.AttachedToParent);

Ehhez persze a kezdő lépést is megfelelően kell felépíteni:

public static void QuicksortParallel<T>(T[] arr) where T : IComparable<T>     
{         
    Task.Factory.StartNew(
        () => QuicksortParallel(arr, 0, arr.Length - 1)).Wait(); 
} 

Vagy:

public static Task QuicksortParallel<T>(T[] arr) where T : IComparable<T>     
{         
    return Task.Factory.StartNew(
               () => QuicksortParallel(arr, 0, arr.Length - 1)); 
} 
  1. Task-based Asynchronous Pattern – Kivételek - pro C# tology - devPortal
  2. Aszinkron programozás – Áttekintés – Újdonságok a C# 5-ben « DotNetForAll
  3. Aszinkron programozás – Áttekintés – Újdonságok a C# 5-ben | .NET apps

Leave a Reply

Fill in your details below or click an icon to log in:

WordPress.com Logo

You are commenting using your WordPress.com account. Log Out / Change )

Twitter picture

You are commenting using your Twitter account. Log Out / Change )

Facebook photo

You are commenting using your Facebook account. Log Out / Change )

Google+ photo

You are commenting using your Google+ account. Log Out / Change )

Connecting to %s

%d bloggers like this: