Sharing object context Tasks


I have a list, lstSubscriptionRequests, whose individual items I process asynchronously. After all items have been processed, I have to return the updated list items. My current implementation looks like this:

List<SubscriptionRequest> lstSubscriptionRequests = FromSomeResource();

var tsk = new List<Task>();
foreach (var oTsk in lstSubscriptionRequests.Select(objSubscriptionRequest => new Task(
    () => ProcessSubscriptionForASingleRecord(objSubscriptionRequest))))
{
    oTsk.Start();
    lock (tsk)
    {
        tsk.Add(oTsk);
    }
}
Task.WaitAll(tsk.ToArray());

It looks like some of the items are not updated after all the tasks have completed. Please let me know what I need to correct.

JSteward On

You could accomplish this much more easily. Note that using the Task constructor is typically neither necessary nor recommended. Mutating the state of a particular object can also be difficult to follow or debug, whereas returning a new object that represents your desired state lets you enforce a minimum valid state. The following code processes all of your items and returns the completed items to your client code.

using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

namespace Processing {
    public class MyProcessor {

        public async Task<IEnumerable<Subscription>> ProcessSubscriptionRequestsAsync(IEnumerable<SubscriptionRequest> subscriptionRequests) {
            var subscriptionProcessingTasks = subscriptionRequests.Select(request => ProcessSubscriptionForASingleRecord(request)).ToArray();
            return await Task.WhenAll(subscriptionProcessingTasks);
        }

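        public async Task<Subscription> ProcessSubscriptionForASingleRecord(SubscriptionRequest request) {
            //process the request; Context is a placeholder for whatever performs the work
            try {
                var subscription = await Context.ProcessRequest(request);
                return subscription;
            } catch {
                //something went wrong with the request: log or handle it here,
                //then rethrow so that every code path either returns a value or throws
                throw;
            }
        }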
    }

    public class SubscriptionRequest {
        //A subscription request
    }

    public class Subscription {
        //A completed subscription request
    }
}
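
As a rough, self-contained illustration of the approach above, the sketch below fills in the placeholders with hypothetical members. The Id, RequestId and Processed properties, the SubscriptionDemo class, and the Task.Delay standing in for Context.ProcessRequest are all assumptions made for illustration and are not part of the original code.

```csharp
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

// Hypothetical request/result types; the properties are assumptions for this sketch.
public class SubscriptionRequest {
    public int Id { get; set; }
}

public class Subscription {
    public int RequestId { get; set; }
    public bool Processed { get; set; }
}

public static class SubscriptionDemo {
    // Starts one task per request, then awaits them all together.
    public static async Task<Subscription[]> ProcessAllAsync(IEnumerable<SubscriptionRequest> requests) {
        var tasks = requests.Select(request => ProcessOneAsync(request)).ToArray();
        return await Task.WhenAll(tasks);
    }

    // Task.Delay stands in for real I/O-bound work such as a database call.
    private static async Task<Subscription> ProcessOneAsync(SubscriptionRequest request) {
        await Task.Delay(10);
        return new Subscription { RequestId = request.Id, Processed = true };
    }
}
```

Task.WhenAll returns its results in the same order as the tasks it was given, so awaiting SubscriptionDemo.ProcessAllAsync with requests whose ids are 1, 2 and 3 yields three Subscription objects with matching RequestId values in that order.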

Update

It would help if you could leave out the new Subscription class and add the solution to your answer. I will then try it out.

Hopefully the simplified before-and-after view will be easier to integrate. The primary difference is replacing the Parallel.ForEach with a Select, which creates your collection of tasks without taking a lock on the task list for every SubscriptionRequest. Spooling up Tasks in parallel isn't typically necessary either: each one executes asynchronously, so all you gain is reaching the point where all of them are waiting sooner, not finishing sooner. Each task is then allowed to start, and all of them are awaited at await Task.WhenAll(tasks). It will be important for you to determine what type of processing each SubscriptionRequest undergoes. For the sake of example, I've made the assumption that each request is somehow linked to database access, i.e. storing the request, updating a user profile of sorts, etc.

public class OriginalSynchronous {

    public void ProcessSubscriptionRequest() {
        List<SubscriptionRequest> lstSubscriptionRequests = FromSomeResource();

        List<Task> tsk = new List<Task>();
        Parallel.ForEach(lstSubscriptionRequests, objSubscriptionRequest => {
            var oTsk =
                new Task(
                    () => ProcessSubscriptionForASingleRecord(objSubscriptionRequest));// update some properties after processing SubscriptionRequest
            oTsk.Start();
            lock (tsk) {
                tsk.Add(oTsk);
            }

        });
        Task.WaitAll(tsk.ToArray());
    }

    private void ProcessSubscriptionForASingleRecord(SubscriptionRequest request) {
        //modify SubscriptionRequest
    }
}

public class ModifiedAsync {

    public async Task ProcessSubscriptionRequest() {
        var subscriptionRequests = await FromSomeResourceAsync();
        var tasks = subscriptionRequests.Select(request => {
            return ProcessSubscriptionForASingleRecord(request);
        }).ToArray();
        await Task.WhenAll(tasks);
    }

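    public async Task ProcessSubscriptionForASingleRecord(SubscriptionRequest request) {
        //modify SubscriptionRequest, awaiting whatever I/O-bound work is involved
        //(placeholder await so this stub compiles without warning CS1998)
        await Task.CompletedTask;
    }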
}
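
One situation where the difference between the two versions matters is when the per-record method is itself asynchronous. The question does not say whether that is the case, so treat this as a possible explanation rather than a diagnosis. A Task built with the constructor around such a call completes as soon as the lambda returns, which happens at the method's first await and not when the work finishes. The sketch below uses hypothetical names (Item, MarkUpdatedAsync, TaskConstructorPitfall) and a Task.Delay in place of real processing.

```csharp
using System.Threading.Tasks;

// Hypothetical item type with a flag that the processing step sets.
public class Item {
    public bool Updated { get; set; }
}

public static class TaskConstructorPitfall {
    // Hypothetical async processing step; Task.Delay stands in for real I/O.
    private static async Task MarkUpdatedAsync(Item item) {
        await Task.Delay(200);
        item.Updated = true;
    }

    // The outer Task only runs the lambda, which returns as soon as
    // MarkUpdatedAsync reaches its first await; the inner task is discarded.
    public static bool WaitOnConstructedTask() {
        var item = new Item();
        var outer = new Task(() => { MarkUpdatedAsync(item); });
        outer.Start();
        outer.Wait();
        return item.Updated; // read before the 200 ms delay has elapsed
    }

    // Awaiting the async method directly waits for the update itself.
    public static async Task<bool> AwaitDirectlyAsync() {
        var item = new Item();
        await MarkUpdatedAsync(item);
        return item.Updated;
    }
}
```

Under these assumptions, WaitOnConstructedTask returns before the item has been updated, while AwaitDirectlyAsync only returns once the update has been applied. Collecting the tasks returned by the async method and awaiting them with Task.WhenAll, as in ModifiedAsync, behaves like the second case.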