Load CSV file again during Gatling simulation

I am trying to run a simulation with two scenarios. The setUp looks like this:

setUp(
  scn1.inject(
    rampConcurrentUsers(minUserCount) to (tps) during (testRampSeconds seconds),
    constantConcurrentUsers(tps) during (testDurationSeconds seconds)
  ).throttle(
    reachRps(tps) in (1 seconds),
    holdFor(testDurationSeconds seconds),
    jumpToRps(0),
    holdFor(60 seconds)
  ).protocols(httpProtocol),
  scn2.inject(nothingFor(testDurationSeconds seconds), atOnceUsers(1)).protocols(httpProtocol)
)

Scenario 1 makes REST API calls and saves the response data in a CSV file.

The PrintWriter used to write to the file:

  val fileWriter = {
    val fos = new java.io.FileOutputStream("foo.csv")
    // autoFlush = true so every println is flushed straight to disk
    val appTokenWriter = new java.io.PrintWriter(fos, true)
    appTokenWriter.println("data") // CSV header
    appTokenWriter
  }
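
Each record is appended from within Scenario 1 roughly like this (simplified; it assumes the value of interest was stored under the "data" session attribute by the REST call):

  val saveToFile = exec { session =>
    // autoFlush on the PrintWriter pushes each record to disk immediately
    fileWriter.println(session("data").as[String])
    session
  }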

Scenario 2 waits for Scenario 1 to complete so that it can process all entries in the CSV file at once:

  .foreach(csv("foo.csv").readRecords, "record") {
    exec(flattenMapIntoAttributes("${record}"))
      .exec(session => {
        println(session("data").as[String])
        session
      })
      .exec(/* ...REST call... */)
  }

The csv("foo.csv").readRecords call is not picking up the values that Scenario 1 flushed into the CSV: csv("foo.csv").readRecords.size is 0 in Scenario 2, even though I can see the data written in the file. (Presumably readRecords is evaluated when the scenario is built, before Scenario 1 has written anything.)

Is this the right way to maintain data between two scenarios? If yes, how do I make the .readRecords API pick up the latest changes? If not, please suggest the right way to do this.

Edit: Based on Stephane's suggestion, I tried using a ConcurrentLinkedDeque:

package com.intuit.platform.identity.oauth2.performance.simulation

import java.util.concurrent.ConcurrentLinkedDeque

import io.gatling.core.Predef._
import io.gatling.core.feeder.Feeder
import io.gatling.http.Predef.http

import scala.concurrent.duration._
import scala.util.Random

/**
 * @author vnarayanan
 */
class SOSimulation extends Simulation {

  val httpProtocol = http.baseUrl("http://localhost:8080")

  val dataQueue = new ConcurrentLinkedDeque[String]()

  val saveInQueue = exec { session =>
    dataQueue.offer(session("data").as[String])
    session
  }

  class DataFeeder extends Feeder[String] {
    override def hasNext: Boolean = !dataQueue.isEmpty
    override def next(): Map[String, String] = Map("data" -> dataQueue.poll())
  }
  
  val scenario1 = scenario("Scenario1")
    .exec(session => {
      // stand-in for the real REST call: generate a random value
      session.set("data", Random.alphanumeric.take(20).mkString)
    })
    .exec(saveInQueue)

// I need to stop the loop when the queue is empty. I tried two approaches:
//   1) asLongAs
//   2) repeat (commented out below)


  val scenario2 = scenario("Scenario2")
    .exec(session => {
      println(dataQueue.size())
      session
    })
    // the condition must be a function so it is re-evaluated on each iteration;
    // a bare Boolean would be computed once, when the scenario is built
    .asLongAs(_ => !dataQueue.isEmpty) {
      exec(session => {
        println(dataQueue.size())
        session
      })
        .feed(new DataFeeder())
        .exec(session => {
          println(session("data").as[String])
          session
        })
    }
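
(The guard matters: as far as I can tell, if feed is reached once the feeder is empty, Gatling stops the whole simulation with a "Feeder is now empty" error, so the loop condition has to keep the user away from the feed step.)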

//  With repeat (note: dataQueue.size() is evaluated once, when the scenario is
//  built, so the repeat count is frozen at that moment)

//  val scenario2 = scenario("Scenario2")
//    .exec(session => {
//      println(dataQueue.size())
//      session
//    })
//    .repeat(dataQueue.size()) {
//      exec(session => {
//        println(dataQueue.size())
//        session
//      })
//        .feed(new DataFeeder())
//        .exec(session => {
//          println(session("data").as[String])
//          session
//        })
//    }

  setUp(
    scenario1.inject(atOnceUsers(10))
      .throttle(reachRps(1) in (1 seconds), holdFor(5 seconds), jumpToRps(0), holdFor(10 seconds))
      .protocols(httpProtocol),
    scenario2.inject(nothingFor(5 seconds), atOnceUsers(1)).protocols(httpProtocol)
  ).maxDuration(10 seconds)
}
1 Answer

That's not possible with our built-in CSV feeders. Remember, though, that Feeder is an alias for Iterator[Map[String, Any]], so you can write your own implementation that supports your use case. For example, instead of writing to and reading from a file, you could keep the data in memory in a ConcurrentLinkedDeque.
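
A minimal sketch of what that could look like (the queue and the "data" key are illustrative names, not a fixed API):

  import java.util.concurrent.ConcurrentLinkedDeque
  import io.gatling.core.feeder.Feeder

  val queue = new ConcurrentLinkedDeque[String]()

  // A Feeder[String] is just an Iterator[Map[String, String]], so a
  // queue-backed implementation only needs hasNext and next
  val queueFeeder: Feeder[String] = new Feeder[String] {
    override def hasNext: Boolean = !queue.isEmpty
    override def next(): Map[String, String] = Map("data" -> queue.poll())
  }

Scenario 1 then offers values into the queue as they arrive, and Scenario 2 feeds from queueFeeder.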