First requests not sent at the start of the goroutine func

I am running goroutines in my code. If I set my threads to 50, it does not run the first 49 requests, but it runs the 50th request and continues with the rest. I am not really sure how to describe the issue, and it gives no errors. This has only happened with fasthttp; it works fine with net/http. Could it be an issue with fasthttp? (This is not my whole code, just the area where I think the issue occurs.)

    threads := 50
    var Lock sync.Mutex
    semaphore := make(chan bool, threads)

    for len(userArray) != 0 {
        semaphore <- true
        go func() {
            Lock.Lock()
            var values []byte
            defer func() { <-semaphore }()
            fmt.Println(len(userArray))
            if len(userArray) == 0 {
                return
            }
            values, _ = json.Marshal(userArray[0])
            currentArray := userArray[0]
            userArray = userArray[1:]
            client := &fasthttp.Client{
                Dial: fasthttpproxy.FasthttpHTTPDialerTimeout(proxy, time.Second * 5),
            }
            time.Sleep(1 * time.Nanosecond)
            Lock.Unlock()

This is the output I get (each number is the count of requests left):

200
199
198
197
196
195
194
193
192
191
190
189
188
187
186
185
184
183
182
181
180
179
178
177
176
175
174
173
172
171
170
169
168
167
166
165
164
163
162
161
160
159
158
157
156
155
154
153
152
151
(10 lines of output from req 151)
150
(10 lines of output from req 150)
(and so on)

Sorry if my explanation is confusing, I honestly don't know how to explain this error.

There is 1 solution below

I think the problem is with the scoping of the variables. To represent the queueing, I'd use a pool of worker goroutines that all pull from the same channel, then wait for them with a sync.WaitGroup. The exact code might need to be adapted as I don't have a Go compiler at hand, but the idea is like this:

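    threads := 50
    queueSize := 100 // sends block once the queue already holds this many items

    jobQueue := make(chan MyItemType, queueSize)

    var wg sync.WaitGroup

    // Each worker pulls items from jobQueue until it is closed and drained.
    processQueue := func(jobQueue <-chan MyItemType) {
        defer wg.Done()
        for item := range jobQueue {
            values, _ := json.Marshal(item) // request body, as in the question
            client := &fasthttp.Client{
                Dial: fasthttpproxy.FasthttpHTTPDialerTimeout(proxy, time.Second*5),
            }
            // build and send the request with client and values here
        }
    }

    // Start the workers first so they are ready to receive.
    for i := 0; i < threads; i++ {
        wg.Add(1)
        go processQueue(jobQueue)
    }

    // Hand every item to the pool; no mutex or slice re-slicing is needed.
    for _, user := range userArray {
        jobQueue <- user
    }

    close(jobQueue) // no more work: workers return after draining the queue
    wg.Wait()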

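Items sent to jobQueue are picked up by whichever worker is free. The sends have to happen before close(jobQueue): once the channel is closed and drained, the workers return and wg.Wait() unblocks.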