How to limit the number of CoroutineWorkers running at the same time

I've created a simple test app to run multiple CoroutineWorkers in the background:

class Work(context: Context, workerParameters: WorkerParameters) :
    CoroutineWorker(context, workerParameters) {
    override suspend fun doWork(): Result {
        Log.d("bagins", "Work ${this.tags.first()} started")
        runBlocking {
            delay(5000)
        }
        Log.d("bagins", "Work ${this.tags.first()} finished")
        return Result.success()
    }
}

I launch them with workManager.enqueue(workers).

However, I don't want all of them to run simultaneously; at most 2 should execute at the same time.

Based on examples I found, I updated AndroidManifest.xml by adding:

<provider
    android:name="androidx.startup.InitializationProvider"
    android:authorities="${applicationId}.androidx-startup"
    tools:node="remove">
</provider>

And I changed the Application class to this:

class App : Application(), Configuration.Provider {
    override fun getWorkManagerConfiguration(): Configuration {
        val executor = Executors.newFixedThreadPool(2)
        return Configuration.Builder().setExecutor(executor).build()
    }

    override fun onCreate() {
        super.onCreate()
        WorkManager.initialize(this, workManagerConfiguration)
    }
}

If I call workManager.enqueue multiple (10) times, all the CoroutineWorkers still run simultaneously. I want at most 2 to run in parallel, and the limit should still hold when new work is added to the queue.

There are 2 best solutions below

Answer by Lajos Arpad:

To achieve your goal, you can use a Semaphore, for example:

// This example illustrates the Semaphore API

import kotlinx.coroutines.sync.Semaphore

// main must be a suspend function because acquire() suspends
suspend fun main() {
  // Creates a semaphore with 10 permits
  val semaphore = Semaphore(permits = 10)

  // Acquires a permit from this semaphore, suspending until one is available.
  // All suspending acquirers are processed in first-in-first-out (FIFO) order
  semaphore.acquire()

  // Releases a permit, returning it into this semaphore.
  // Resumes the first suspending acquirer if there is one at the point of invocation
  semaphore.release()
}

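Another example:

import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.async
import kotlinx.coroutines.awaitAll
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.sync.Semaphore

fun <T, R> Iterable<T>.map(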
    concurrency: Int,
    transform: (T) -> R
): List<R> = runBlocking {
    // Create semaphore with permit specified as `concurrency`
    val semaphore = Semaphore(concurrency)
   
    map { item ->
        // Before processing each item, acquire the semaphore permit
        // This will be suspended until permit is available.
        semaphore.acquire()
        
        async(Dispatchers.Default) {
            try {
                transform(item)
            } finally {
                // After processing (or failure), release a semaphore permit
                semaphore.release()
            }
        }
    }.awaitAll()
}

fun doSomething(users: List<User>) {
    // Up to 5 users are processed concurrently
    users.map(concurrency = 5) { user -> user.toSomething() /* `toSomething()` is heavy method */ }
}

Both examples were taken from here; I strongly recommend reading the article.
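To tie this back to the question, here is a minimal sketch of how one shared Semaphore with 2 permits could cap concurrent work. The names `workPermits`, `limited` and `maxObservedConcurrency` are hypothetical helpers made up for illustration, and the short `delay` only stands in for the real work; `withPermit` is the kotlinx.coroutines extension that acquires a permit and releases it again when the block ends.

```kotlin
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.sync.Semaphore
import kotlinx.coroutines.sync.withPermit
import java.util.concurrent.atomic.AtomicInteger

// Hypothetical process-wide limiter: at most 2 guarded blocks run at once.
private val workPermits = Semaphore(permits = 2)

// Hypothetical helper: suspends until a permit is free, runs the block,
// and releases the permit even if the block throws or is cancelled.
suspend fun <T> limited(block: suspend () -> T): T =
    workPermits.withPermit { block() }

// Launches `jobs` coroutines through `limited` and returns the highest
// number of blocks that were observed running at the same time.
fun maxObservedConcurrency(jobs: Int): Int = runBlocking {
    val running = AtomicInteger(0)
    val peak = AtomicInteger(0)
    // coroutineScope returns only after every launched child has finished.
    coroutineScope {
        repeat(jobs) {
            launch {
                limited {
                    val now = running.incrementAndGet()
                    peak.updateAndGet { maxOf(it, now) }
                    delay(50) // stands in for the real work
                    running.decrementAndGet()
                }
            }
        }
    }
    peak.get()
}

fun main() {
    println("Peak concurrency: ${maxObservedConcurrency(jobs = 10)}")
}
```

In the question's worker, the body of doWork() could be wrapped the same way, e.g. `override suspend fun doWork(): Result = limited { delay(5000); Result.success() }`. Keep in mind that this only throttles the work inside your process: WorkManager would still start every enqueued worker and treat it as running while it waits for a permit.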

Answer by crack_head:

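I suggest using the non-suspending worker by extending Worker instead of CoroutineWorker, and then using runBlocking to wait for the suspend functions to finish. A Worker's doWork() runs on the executor set in the Configuration, so the fixed pool of two threads from the question then caps concurrency, whereas a CoroutineWorker runs doWork() on Dispatchers.Default by default and is not bound by that executor.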

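class Work(context: Context, workerParameters: WorkerParameters) :
    Worker(context, workerParameters) {
    // The try wraps runBlocking itself, because the InterruptedException
    // is thrown by the runBlocking call, not inside its lambda.
    override fun doWork(): Result = try {
        runBlocking {
            // your long-running code
            Result.success()
        }
    } catch (ie: InterruptedException) {
        Result.retry()
    }
}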

runBlocking may throw InterruptedException if the worker thread is interrupted, so you need to catch it and return Result.retry().