Java Blocking Queue

Scenario Since the third-party API has a rate limit, I need to send requests one by one, with a delay between each call. So I create a simple solution to do it! Solution // ApiService import lombok.extern.slf4j.Slf4j; import org.springframework.stereotype.Service; import java.util.Map; @Slf4j @Service public class ApiService { public void executeApi(BlockingQueueService.Request request) throws InterruptedException { // log.info("[jobId-{}] API_SERVICE > START", request.getId()); Thread.sleep(request.getRunTime()); // log.info("[jobId-{}] API_SERVICE > STOP", request.getId()); } } // BlockingQueueService import lombok.Data; import lombok.ToString; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; import java.util.concurrent.*; @Slf4j @Service public class BlockingQueueService { @Autowired private ApiService apiService; private final int POOL_SIZE = 1; private final BlockingQueue queue = new LinkedBlockingQueue(); private final ScheduledExecutorService executor = Executors.newScheduledThreadPool(POOL_SIZE); public BlockingQueueService() { log.info("INIT BLOCKING QUEUE SERVICE"); for (int i = 0; i processQueue(workerId), 0, 2, TimeUnit.SECONDS); } // executor.scheduleAtFixedRate(this::processQueue, 0, 1, TimeUnit.SECONDS); } public void enqueue(Request request) { log.info("QUEUE > ADD: {}", request); queue.offer(request); } private void processQueue(int workerId) { Request request = queue.poll(); log.info("[worker-{}] QUEUE > CHECK", workerId); if (request == null) { return; } log.info("[worker-{}__jobId-{}] QUEUE > PROCESS START", workerId, request.getId()); try { this.apiService.executeApi(request); } catch (Exception e) { e.printStackTrace(); } finally { log.info("[worker-{}__jobId-{}] QUEUE > PROCESS END", workerId, request.getId()); } } @Data @ToString public static class Request { int id; int runTime; } } // Queue101Application import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.RequestParam; import org.springframework.web.bind.annotation.RestController; import java.util.HashMap; import java.util.Map; import java.util.concurrent.ThreadLocalRandom; @RestController @SpringBootApplication public class Queue101Application { @Autowired BlockingQueueService blockingQueueService; private int count = 0; public static void main(String[] args) { SpringApplication.run(Queue101Application.class, args); } @GetMapping("/trigger") public String trigger() { for (int i = 0; i < 100; i++) { BlockingQueueService.Request request = new BlockingQueueService.Request(); int randomNumber = ThreadLocalRandom.current().nextInt(1,2); request.setId(count); request.setRunTime(randomNumber*1000); count++; blockingQueueService.enqueue(request); } return "SUCCESS"; } } Test Trigger queue: http://localhost:8080/trigger Log ... 2025-04-17T21:22:44.409+09:00 INFO 88153 --- [queue101] [nio-8080-exec-1] o.h.queue101.BlockingQueueService : QUEUE > ADD: BlockingQueueService.Request(id=99, runTime=1000) 2025-04-17T21:22:45.799+09:00 INFO 88153 --- [queue101] [pool-2-thread-1] o.h.queue101.BlockingQueueService : [worker-1] QUEUE > CHECK 2025-04-17T21:22:45.799+09:00 INFO 88153 --- [queue101] [pool-2-thread-1] o.h.queue101.BlockingQueueService : [worker-1__jobId-0] QUEUE > PROCESS START 2025-04-17T21:22:46.799+09:00 INFO 88153 --- [queue101] [pool-2-thread-1] o.h.queue101.BlockingQueueService : [worker-1__jobId-0] QUEUE > PROCESS END 2025-04-17T21:22:47.800+09:00 INFO 88153 --- [queue101] [pool-2-thread-1] o.h.queue101.BlockingQueueService : [worker-1] QUEUE > CHECK 2025-04-17T21:22:47.801+09:00 INFO 88153 --- [queue101] [pool-2-thread-1] o.h.queue101.BlockingQueueService : [worker-1__jobId-1] QUEUE > PROCESS START 2025-04-17T21:22:48.806+09:00 INFO 88153 --- [queue101] [pool-2-thread-1] o.h.queue101.BlockingQueueService : [worker-1__jobId-1] QUEUE > PROCESS END ...

Apr 17, 2025 - 13:57
 0
Java Blocking Queue

Scenario

Since the third-party API has a rate limit, I need to send requests one by one, with a delay between each call.

So I create a simple solution to do it!

Solution

// ApiService

import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;

import java.util.Map;

@Slf4j
@Service
public class ApiService {

    public void executeApi(BlockingQueueService.Request request) throws InterruptedException {
//        log.info("[jobId-{}] API_SERVICE > START", request.getId());
        Thread.sleep(request.getRunTime());
//        log.info("[jobId-{}] API_SERVICE > STOP", request.getId());
    }
}

// BlockingQueueService
import lombok.Data;
import lombok.ToString;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

import java.util.concurrent.*;

@Slf4j
@Service
public class BlockingQueueService {
    @Autowired
    private ApiService apiService;
    private final int POOL_SIZE = 1;
    private final BlockingQueue<Request> queue = new LinkedBlockingQueue<>();
    private final ScheduledExecutorService executor = Executors.newScheduledThreadPool(POOL_SIZE);

    public BlockingQueueService() {
        log.info("INIT BLOCKING QUEUE SERVICE");
        for (int i = 0; i < POOL_SIZE; i++) {
            int workerId = i + 1;
            executor.scheduleAtFixedRate(() -> processQueue(workerId), 0, 2, TimeUnit.SECONDS);
        }
//        executor.scheduleAtFixedRate(this::processQueue, 0, 1, TimeUnit.SECONDS);
    }

    public void enqueue(Request request) {
        log.info("QUEUE > ADD: {}", request);
        queue.offer(request);
    }

    private void processQueue(int workerId) {
        Request request = queue.poll();
        log.info("[worker-{}] QUEUE > CHECK", workerId);
        if (request == null) {
            return;
        }

        log.info("[worker-{}__jobId-{}] QUEUE > PROCESS START", workerId, request.getId());
        try {
            this.apiService.executeApi(request);
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            log.info("[worker-{}__jobId-{}] QUEUE > PROCESS END", workerId, request.getId());
        }

    }

    @Data
    @ToString
    public static class Request {
        int id;
        int runTime;
    }
}
// Queue101Application
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ThreadLocalRandom;

@RestController
@SpringBootApplication
public class Queue101Application {

    @Autowired
    BlockingQueueService blockingQueueService;

    private int count = 0;

    public static void main(String[] args) {
        SpringApplication.run(Queue101Application.class, args);
    }

    @GetMapping("/trigger")
    public String trigger() {
        for (int i = 0; i < 100; i++) {
            BlockingQueueService.Request request = new BlockingQueueService.Request();
            int randomNumber = ThreadLocalRandom.current().nextInt(1,2);
            request.setId(count);
            request.setRunTime(randomNumber*1000);
            count++;
            blockingQueueService.enqueue(request);
        }


        return "SUCCESS";
    }
}

Test

...
2025-04-17T21:22:44.409+09:00  INFO 88153 --- [queue101] [nio-8080-exec-1] o.h.queue101.BlockingQueueService        : QUEUE > ADD: BlockingQueueService.Request(id=99, runTime=1000)
2025-04-17T21:22:45.799+09:00  INFO 88153 --- [queue101] [pool-2-thread-1] o.h.queue101.BlockingQueueService        : [worker-1] QUEUE > CHECK
2025-04-17T21:22:45.799+09:00  INFO 88153 --- [queue101] [pool-2-thread-1] o.h.queue101.BlockingQueueService        : [worker-1__jobId-0] QUEUE > PROCESS START
2025-04-17T21:22:46.799+09:00  INFO 88153 --- [queue101] [pool-2-thread-1] o.h.queue101.BlockingQueueService        : [worker-1__jobId-0] QUEUE > PROCESS END
2025-04-17T21:22:47.800+09:00  INFO 88153 --- [queue101] [pool-2-thread-1] o.h.queue101.BlockingQueueService        : [worker-1] QUEUE > CHECK
2025-04-17T21:22:47.801+09:00  INFO 88153 --- [queue101] [pool-2-thread-1] o.h.queue101.BlockingQueueService        : [worker-1__jobId-1] QUEUE > PROCESS START
2025-04-17T21:22:48.806+09:00  INFO 88153 --- [queue101] [pool-2-thread-1] o.h.queue101.BlockingQueueService        : [worker-1__jobId-1] QUEUE > PROCESS END
...