Queue

Description

BullMQ-based job queue system with Redis backend.

  • QueueClient: Producer - adds jobs to queues (used by API services)
  • QueueManager: Consumer - processes jobs from queues (used by workers)
  • @QueueWorker: Decorator for worker class registration

Examples

// Producer (API Service)
const client = new QueueClient({ host: 'localhost', port: 6379 });
await client.addJob('emails', 'send-welcome', { userId: 123 });

// Consumer (Worker Service)
@QueueWorker('emails', { concurrency: 5 })
class EmailWorker implements IQueueWorkerHandler {
  async process(job: Job) {
    // Process job
  }
}

Interfaces

IQueueClient

Defined in: packages/core/src/core/queue/types.ts:134

Interface for Queue Client (used by API Service). Only responsible for adding jobs to queues - never consumes.

Methods

addBulk()
addBulk<T>(queueName, jobs): Promise<string[]>;

Defined in: packages/core/src/core/queue/types.ts:156

Add multiple jobs to a queue in bulk

Type Parameters
T

T

Parameters
queueName

string

Name of the queue

jobs

JobData<T>[]

Array of job data

Returns

Promise<string[]>

Array of Job IDs
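As a rough illustration of the `jobs` argument, the sketch below builds a `JobData<T>[]` batch. The `buildWelcomeBatch` helper, the `WelcomeEmail` payload, and the `send-welcome-<userId>` ID scheme are hypothetical; the interfaces are local copies of the documented shapes so the snippet compiles on its own.

```typescript
// Local copies of the documented shapes (subset), so the sketch is self-contained.
interface JobOptions {
  delay?: number;
  attempts?: number;
  jobId?: string;
}

interface JobData<T = any> {
  name: string;
  data: T;
  options?: JobOptions;
}

// Hypothetical payload type for this example.
interface WelcomeEmail {
  userId: number;
}

// Hypothetical helper: one 'send-welcome' job per user. Each job gets a
// deterministic jobId, relying on the documented "prevents duplicates"
// behaviour of JobOptions.jobId.
function buildWelcomeBatch(userIds: number[]): JobData<WelcomeEmail>[] {
  return userIds.map((userId) => ({
    name: 'send-welcome',
    data: { userId },
    options: { jobId: `send-welcome-${userId}` },
  }));
}

const batch = buildWelcomeBatch([1, 2, 3]);
// With a connected client, the batch would be submitted as:
//   const ids = await client.addBulk('emails', batch);
```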

addJob()
addJob<T>(
queueName,
jobName,
data,
options?): Promise<string>;

Defined in: packages/core/src/core/queue/types.ts:143

Add a single job to a queue

Type Parameters
T

T

Parameters
queueName

string

Name of the queue

jobName

string

Name/type of the job

data

T

Job payload data

options?

JobOptions

Optional job configuration

Returns

Promise<string>

Job ID

close()
close(): Promise<void>;

Defined in: packages/core/src/core/queue/types.ts:174

Close all connections

Returns

Promise<void>

getJobStatus()
getJobStatus(queueName, jobId): Promise<JobStatus>;

Defined in: packages/core/src/core/queue/types.ts:163

Get the status of a specific job

Parameters
queueName

string

Name of the queue

jobId

string

Job ID

Returns

Promise<JobStatus>

getQueueStatus()
getQueueStatus(queueName): Promise<QueueStatus>;

Defined in: packages/core/src/core/queue/types.ts:169

Get queue statistics

Parameters
queueName

string

Name of the queue

Returns

Promise<QueueStatus>
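Because callers depend only on `IQueueClient`, code that enqueues jobs can be unit-tested against an in-memory stand-in. The sketch below is one possible test double, not part of the package: `InMemoryQueueClient` is a hypothetical name, the interfaces are local copies of the documented shapes, and silently ignoring a repeated `jobId` is an assumption about how duplicates are handled.

```typescript
// Local copies of the documented shapes so the sketch compiles on its own.
interface JobOptions { delay?: number; jobId?: string; }
interface JobData<T = any> { name: string; data: T; options?: JobOptions; }
interface JobStatus {
  id: string;
  name: string;
  status: 'waiting' | 'active' | 'completed' | 'failed' | 'delayed';
  progress: number;
  data: any;
  returnValue?: any;
  failedReason?: string;
  attemptsMade: number;
  timestamp: number;
}
interface QueueStatus {
  name: string;
  waiting: number;
  active: number;
  completed: number;
  failed: number;
  delayed: number;
  paused: boolean;
}
interface IQueueClient {
  addJob<T>(queueName: string, jobName: string, data: T, options?: JobOptions): Promise<string>;
  addBulk<T>(queueName: string, jobs: JobData<T>[]): Promise<string[]>;
  getJobStatus(queueName: string, jobId: string): Promise<JobStatus>;
  getQueueStatus(queueName: string): Promise<QueueStatus>;
  close(): Promise<void>;
}

// Hypothetical test double: records jobs in memory and never touches Redis.
class InMemoryQueueClient implements IQueueClient {
  private queues = new Map<string, Map<string, JobStatus>>();
  private nextId = 1;

  async addJob<T>(queueName: string, jobName: string, data: T, options?: JobOptions): Promise<string> {
    let queue = this.queues.get(queueName);
    if (!queue) {
      queue = new Map<string, JobStatus>();
      this.queues.set(queueName, queue);
    }
    const id = options?.jobId ?? String(this.nextId++);
    // Assumption: a job whose ID already exists is ignored rather than replaced.
    if (!queue.has(id)) {
      queue.set(id, {
        id,
        name: jobName,
        status: options?.delay ? 'delayed' : 'waiting',
        progress: 0,
        data,
        attemptsMade: 0,
        timestamp: Date.now(),
      });
    }
    return id;
  }

  async addBulk<T>(queueName: string, jobs: JobData<T>[]): Promise<string[]> {
    const ids: string[] = [];
    for (const job of jobs) {
      ids.push(await this.addJob(queueName, job.name, job.data, job.options));
    }
    return ids;
  }

  async getJobStatus(queueName: string, jobId: string): Promise<JobStatus> {
    const job = this.queues.get(queueName)?.get(jobId);
    if (!job) throw new Error(`Job ${jobId} not found in queue ${queueName}`);
    return job;
  }

  async getQueueStatus(queueName: string): Promise<QueueStatus> {
    const queue = this.queues.get(queueName);
    const jobs = queue ? Array.from(queue.values()) : [];
    const count = (s: JobStatus['status']) => jobs.filter((j) => j.status === s).length;
    return {
      name: queueName,
      waiting: count('waiting'),
      active: count('active'),
      completed: count('completed'),
      failed: count('failed'),
      delayed: count('delayed'),
      paused: false,
    };
  }

  async close(): Promise<void> {
    this.queues.clear();
  }
}
```

A service that accepts an `IQueueClient` in its constructor can then be given `new InMemoryQueueClient()` in tests and the real `QueueClient` in production.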


IQueueWorkerHandler

Defined in: packages/core/src/core/queue/types.ts:101

Interface for queue worker handlers

Type Parameters

T

T = any

Methods

onCompleted()?
optional onCompleted(job, result): void;

Defined in: packages/core/src/core/queue/types.ts:110

Called when a job completes successfully

Parameters
job

Job<T>

result

any

Returns

void

onFailed()?
optional onFailed(job, error): void;

Defined in: packages/core/src/core/queue/types.ts:115

Called when a job fails

Parameters
job

Job<T>

error

Error

Returns

void

onProgress()?
optional onProgress(job, progress): void;

Defined in: packages/core/src/core/queue/types.ts:120

Called when job progress is updated

Parameters
job

Job<T>

progress

number | object

Returns

void

process()
process(job): Promise<any>;

Defined in: packages/core/src/core/queue/types.ts:105

Process a job from the queue

Parameters
job

Job<T>

Returns

Promise<any>
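The sketch below shows what a handler implementing `process()` plus the optional `onCompleted()` and `onFailed()` hooks might look like. `JobLike` is a minimal stand-in for the re-exported `Job` type, `WelcomeEmail` and the `log` array are hypothetical, and the `@QueueWorker` decorator is omitted so the snippet compiles without decorator support. It assumes that, as in BullMQ, a rejected `process()` promise marks the job as failed.

```typescript
// Minimal stand-in for the re-exported Job type; only the fields used here.
interface JobLike<T> {
  id?: string;
  name: string;
  data: T;
  attemptsMade: number;
}

// Local copy of the documented handler shape, typed against JobLike.
interface IQueueWorkerHandler<T = any> {
  process(job: JobLike<T>): Promise<any>;
  onCompleted?(job: JobLike<T>, result: any): void;
  onFailed?(job: JobLike<T>, error: Error): void;
  onProgress?(job: JobLike<T>, progress: number | object): void;
}

// Hypothetical payload type for this example.
interface WelcomeEmail {
  userId: number;
}

class EmailWorker implements IQueueWorkerHandler<WelcomeEmail> {
  // Collected messages; a real worker would use its logger instead.
  readonly log: string[] = [];

  async process(job: JobLike<WelcomeEmail>): Promise<{ sentTo: number }> {
    if (!Number.isInteger(job.data.userId)) {
      // Assumption: rejecting here causes the job to be treated as failed.
      throw new Error(`Invalid userId in job ${job.id}`);
    }
    // Real email sending would happen here.
    return { sentTo: job.data.userId };
  }

  onCompleted(job: JobLike<WelcomeEmail>, result: any): void {
    this.log.push(`completed ${job.name} -> ${JSON.stringify(result)}`);
  }

  onFailed(job: JobLike<WelcomeEmail>, error: Error): void {
    this.log.push(`failed ${job.name} after ${job.attemptsMade} attempt(s): ${error.message}`);
  }
}
```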


JobData

Defined in: packages/core/src/core/queue/types.ts:44

Job data structure

Type Parameters

T

T = any

Properties

data
data: T;

Defined in: packages/core/src/core/queue/types.ts:46

name
name: string;

Defined in: packages/core/src/core/queue/types.ts:45

options?
optional options: JobOptions;

Defined in: packages/core/src/core/queue/types.ts:47


JobOptions

Defined in: packages/core/src/core/queue/types.ts:21

Options for adding a job to a queue

Properties

attempts?
optional attempts: number;

Defined in: packages/core/src/core/queue/types.ts:25

Number of retry attempts on failure

backoff?
optional backoff: object;

Defined in: packages/core/src/core/queue/types.ts:27

Backoff strategy for retries

backoff.delay
delay: number;

backoff.type
type: "fixed" | "exponential";

delay?
optional delay: number;

Defined in: packages/core/src/core/queue/types.ts:23

Delay before the job is processed (ms)

jobId?
optional jobId: string;

Defined in: packages/core/src/core/queue/types.ts:38

Unique job ID (prevents duplicates)

priority?
optional priority: number;

Defined in: packages/core/src/core/queue/types.ts:32

Job priority (lower = higher priority)

removeOnComplete?
optional removeOnComplete: number | boolean;

Defined in: packages/core/src/core/queue/types.ts:34

Remove job from queue after completion

removeOnFail?
optional removeOnFail: number | boolean;

Defined in: packages/core/src/core/queue/types.ts:36

Remove job from queue after failure
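To make the `backoff` option more concrete, the sketch below computes the wait before each retry. It assumes the option is forwarded to BullMQ unchanged and that BullMQ's built-in `fixed` and `exponential` strategies apply without jitter; `retryDelay` and `retrySchedule` are hypothetical helpers, not part of this package.

```typescript
// Shape of JobOptions.backoff as documented above.
type Backoff = { type: 'fixed' | 'exponential'; delay: number };

// Delay in ms before the nth retry (n starts at 1).
// Assumption: mirrors BullMQ's built-in strategies, where "exponential"
// waits delay * 2^(n - 1) and "fixed" always waits delay.
function retryDelay(backoff: Backoff, retry: number): number {
  return backoff.type === 'fixed'
    ? backoff.delay
    : Math.round(Math.pow(2, retry - 1) * backoff.delay);
}

// Delays for the first `retries` retries, in order.
function retrySchedule(backoff: Backoff, retries: number): number[] {
  return Array.from({ length: retries }, (_, i) => retryDelay(backoff, i + 1));
}

const exponential = retrySchedule({ type: 'exponential', delay: 1000 }, 3);
const fixed = retrySchedule({ type: 'fixed', delay: 500 }, 2);
console.log(exponential); // [ 1000, 2000, 4000 ]
console.log(fixed);       // [ 500, 500 ]
```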


JobStatus

Defined in: packages/core/src/core/queue/types.ts:53

Job status information

Properties

attemptsMade
attemptsMade: number;

Defined in: packages/core/src/core/queue/types.ts:61

data
data: any;

Defined in: packages/core/src/core/queue/types.ts:58

failedReason?
optional failedReason: string;

Defined in: packages/core/src/core/queue/types.ts:60

id
id: string;

Defined in: packages/core/src/core/queue/types.ts:54

name
name: string;

Defined in: packages/core/src/core/queue/types.ts:55

progress
progress: number;

Defined in: packages/core/src/core/queue/types.ts:57

returnValue?
optional returnValue: any;

Defined in: packages/core/src/core/queue/types.ts:59

status
status: "waiting" | "active" | "completed" | "failed" | "delayed";

Defined in: packages/core/src/core/queue/types.ts:56

timestamp
timestamp: number;

Defined in: packages/core/src/core/queue/types.ts:62
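A caller polling `getJobStatus()` usually needs to know whether a job has finished and what to report. The helpers below are a hypothetical sketch over a local copy of the `JobStatus` shape. Treating `failed` as final is an assumption; a failed job could be retried depending on the `attempts` setting.

```typescript
// Local copy of the documented JobStatus shape.
interface JobStatus {
  id: string;
  name: string;
  status: 'waiting' | 'active' | 'completed' | 'failed' | 'delayed';
  progress: number;
  data: any;
  returnValue?: any;
  failedReason?: string;
  attemptsMade: number;
  timestamp: number;
}

// True once the job has reached 'completed' or 'failed'.
function isFinished(job: JobStatus): boolean {
  return job.status === 'completed' || job.status === 'failed';
}

// One-line, human-readable summary of a job's state.
function describeJob(job: JobStatus): string {
  const label = `${job.name}#${job.id}`;
  switch (job.status) {
    case 'completed':
      return `${label} completed`;
    case 'failed':
      return `${label} failed after ${job.attemptsMade} attempt(s): ${job.failedReason ?? 'unknown reason'}`;
    default:
      return `${label} is ${job.status} (progress: ${job.progress})`;
  }
}
```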


QueueStatus

Defined in: packages/core/src/core/queue/types.ts:68

Queue status information

Properties

active
active: number;

Defined in: packages/core/src/core/queue/types.ts:71

completed
completed: number;

Defined in: packages/core/src/core/queue/types.ts:72

delayed
delayed: number;

Defined in: packages/core/src/core/queue/types.ts:74

failed
failed: number;

Defined in: packages/core/src/core/queue/types.ts:73

name
name: string;

Defined in: packages/core/src/core/queue/types.ts:69

paused
paused: boolean;

Defined in: packages/core/src/core/queue/types.ts:75

waiting
waiting: number;

Defined in: packages/core/src/core/queue/types.ts:70
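The counters returned by `getQueueStatus()` can be combined into simple health indicators. The sketch below uses a local copy of the `QueueStatus` shape; `backlog` and `failureRate` are hypothetical helpers. Note that `completed` and `failed` may understate the true totals if jobs are removed via `removeOnComplete` or `removeOnFail`.

```typescript
// Local copy of the documented QueueStatus shape.
interface QueueStatus {
  name: string;
  waiting: number;
  active: number;
  completed: number;
  failed: number;
  delayed: number;
  paused: boolean;
}

// Jobs that have been accepted but have not finished yet.
function backlog(status: QueueStatus): number {
  return status.waiting + status.active + status.delayed;
}

// Share of finished jobs that failed; 0 when nothing has finished.
function failureRate(status: QueueStatus): number {
  const finished = status.completed + status.failed;
  return finished === 0 ? 0 : status.failed / finished;
}

const sample: QueueStatus = {
  name: 'emails',
  waiting: 5,
  active: 2,
  completed: 90,
  failed: 10,
  delayed: 3,
  paused: false,
};
console.log(backlog(sample));     // 10
console.log(failureRate(sample)); // 0.1
```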


QueueWorkerMetadata

Defined in: packages/core/src/core/queue/types.ts:93

Metadata for the @QueueWorker decorator

Properties

options
options: WorkerOptions;

Defined in: packages/core/src/core/queue/types.ts:95

queueName
queueName: string;

Defined in: packages/core/src/core/queue/types.ts:94


RedisConfig

Defined in: packages/core/src/core/queue/types.ts:6

Redis connection configuration

Properties

db?
optional db: number;

Defined in: packages/core/src/core/queue/types.ts:10

host
host: string;

Defined in: packages/core/src/core/queue/types.ts:7

password?
optional password: string;

Defined in: packages/core/src/core/queue/types.ts:9

port
port: number;

Defined in: packages/core/src/core/queue/types.ts:8
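One way to produce a `RedisConfig` is to read it from environment variables. In the sketch below, the variable names (`REDIS_HOST`, `REDIS_PORT`, `REDIS_PASSWORD`, `REDIS_DB`) and the fallback values are assumptions chosen for illustration, and `redisConfigFromEnv` is a hypothetical helper.

```typescript
// Local copy of the documented RedisConfig shape.
interface RedisConfig {
  host: string;
  port: number;
  password?: string;
  db?: number;
}

type Env = Record<string, string | undefined>;

// Builds a RedisConfig from an environment-like object.
// Falls back to localhost:6379 (assumed defaults) and rejects invalid numbers.
function redisConfigFromEnv(env: Env): RedisConfig {
  const port = Number(env.REDIS_PORT ?? '6379');
  if (!Number.isInteger(port) || port < 1 || port > 65535) {
    throw new Error(`Invalid REDIS_PORT: ${env.REDIS_PORT}`);
  }

  const config: RedisConfig = { host: env.REDIS_HOST ?? 'localhost', port };

  if (env.REDIS_PASSWORD) {
    config.password = env.REDIS_PASSWORD;
  }
  if (env.REDIS_DB !== undefined) {
    const db = Number(env.REDIS_DB);
    if (!Number.isInteger(db) || db < 0) {
      throw new Error(`Invalid REDIS_DB: ${env.REDIS_DB}`);
    }
    config.db = db;
  }
  return config;
}

// In a Node.js service this would typically be called as:
//   const client = new QueueClient(redisConfigFromEnv(process.env));
```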


WorkerOptions

Defined in: packages/core/src/core/queue/types.ts:81

Worker configuration options

Properties

concurrency?
optional concurrency: number;

Defined in: packages/core/src/core/queue/types.ts:83

Number of concurrent jobs to process

lockDuration?
optional lockDuration: number;

Defined in: packages/core/src/core/queue/types.ts:85

Lock duration for a job (ms)

maxStalledCount?
optional maxStalledCount: number;

Defined in: packages/core/src/core/queue/types.ts:87

Maximum number of times a job may stall before it is marked as failed
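All three worker options are optional, and this page does not state their defaults. The sketch below fills in missing values and validates the result. The defaults shown (`concurrency: 1`, `lockDuration: 30000`, `maxStalledCount: 1`) are BullMQ's own worker defaults and are only an assumption about this wrapper; `resolveWorkerOptions` is a hypothetical helper.

```typescript
// Local copy of the documented WorkerOptions shape.
interface WorkerOptions {
  concurrency?: number;
  lockDuration?: number;
  maxStalledCount?: number;
}

// Assumed defaults, taken from BullMQ's worker defaults; the wrapper may differ.
const ASSUMED_DEFAULTS: Required<WorkerOptions> = {
  concurrency: 1,
  lockDuration: 30000,
  maxStalledCount: 1,
};

// Merges user options over the assumed defaults and rejects invalid values.
function resolveWorkerOptions(options: WorkerOptions = {}): Required<WorkerOptions> {
  const resolved = { ...ASSUMED_DEFAULTS, ...options };
  if (!Number.isInteger(resolved.concurrency) || resolved.concurrency < 1) {
    throw new Error('concurrency must be a positive integer');
  }
  if (!(resolved.lockDuration > 0)) {
    throw new Error('lockDuration must be a positive number of milliseconds');
  }
  if (!Number.isInteger(resolved.maxStalledCount) || resolved.maxStalledCount < 0) {
    throw new Error('maxStalledCount must be a non-negative integer');
  }
  return resolved;
}

const emailWorkerOptions = resolveWorkerOptions({ concurrency: 5 });
```

As a rule of thumb, `lockDuration` should comfortably exceed the time a typical job takes between lock renewals, otherwise long-running jobs risk being treated as stalled.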

Type Aliases

QueueWorkerConstructor()

type QueueWorkerConstructor = (...args) => IQueueWorkerHandler;

Defined in: packages/core/src/core/queue/types.ts:126

Constructor type for queue worker classes

Parameters

args

...any[]

Returns

IQueueWorkerHandler

References

Job

Re-exports Job


QUEUE_WORKER_METADATA_KEY

Re-exports QUEUE_WORKER_METADATA_KEY


QueueClient

Re-exports QueueClient


QueueManager

Re-exports QueueManager


QueueWorker

Re-exports QueueWorker