The Slate Jobs component is a background job/persistant task queue system for Kotlin and is similar to Ruby SideKiq, NodeJS Bull. It leverages Kotlin Coroutines and Channels for concurrency based operations to gracefully start, stop, pause, resume jobs.
slatekit new job -name="SampleJob" -package="mycompany.apps"
A high-level diagram of the concepts in this component
Goal | Description |
1. Job types | Different types of jobs OneTime, Paged, Queued job types. |
2. Diagnostics | Multiple types of diagnostics and tracking of jobs statistics available |
3. Policies | Policies as middleware and hooks to restrict, customize, or enhance jobs |
This component is currently stable and there is a project generator for it
Feature | Status | Description |
**Periodic** | Upcoming | Support for periodic and scheduled jobs |
**Flow** | Upcoming | Integration with Kotlin Flows |
repositories {
// other repositories
maven { url "http://dl.bintray.com/codehelixinc/slatekit" }
}
dependencies {
// other dependencies ...
compile 'com.slatekit:slatekit-jobs:1.0.0'
}
Jar | slatekit.jobs.jar |
Package | slatekit.jobs |
Sources | slatekit-jobs |
Example | Example_Jobs.kt |
Requires | See build.gradle for more info. |
This is a simple example of a Queued worker ( which takes it work from a Task which a unit-of-work stored in-memory or in persistent queue ).
suspend fun sendNewsLetterFromQueue(task: Task):WorkResult {
// Get data out of the task
val userId = task.data.toInt()
// Simulate getting the user and sending the newsletter
val user = getUser(userId)
send(task.job, "Latest News Letter!", user)
// Acknowledge the task or abandon via task.fail()
task.done()
// Indicate that this can now handle more
return WorkResult(WorkState.More)
}
Component | Description |
Jobs | A registry of all the jobs being managed. |
Job | The main component managing work between workers, queues, tasks, and policies. |
Coordinator | Responsible for coordinating requests on Workers ( by default using Channels ) |
Workers | A collection of 1 or more workers ( linked to job ) |
Worker | Performs the actual work (on a possible Task) |
WorkState | Represents the state of a single iteration of work done by a worker |
Task | A single work item that can be performed by a worker |
Queue | Represents a queue of Tasks either in-memory or persisted ( AWS SQS ) |
Action | The Action(s) that can be performed on a Job/Worker: Start, Stop, Pause, Resume, Control, Process, Delay |
Priority | The priority level of a Queue |
Policy | A form of middleware to inject into a Job to adjust behaviour |
Status | The status ( idle, running, paused, stopped, completed ) of a worker or job. |
Name | Description | More |
1. Setup | How to setup a Worker | more |
2. Register | How to set up a job and register it with Jobs system | more |
3. Usage | How to start, stop, pause resume jobs and check status. | more |
4. Cycle | The life-cycle of a worker | more |
5. Types | OneTime, Paged, Queued Job types | more |
6. Events | Subscribe to job events | more |
7. Workers | Access to the jobs workers | more |
8. Stats | Capturing job and worker statistics | more |
9. Load | Performance and load | more |
Every job must be set up with an Identity. This identity is transfered down to all workers ( given a unique uuid per worker ) and is used to uniquely identity jobs and workers.
// slatekit.common.Identity is used to identify the job.
val id = SimpleIdentity("demo", "newsletter", Agent.Job, "dev")
// id.id : {AREA}.{SERVICE}.{AGENT}.{ENV}.{INSTANCE}
// id.id : demo.newsletter.job.dev.09cd7588-24cb-425b-8cec-2494c4460387
// id.name : demo.newsletter
// id.full : demo.newsletter.job.dev
// id.area : demo
// id.svc : newsletter
// id.agent: Job
// id.env : dev
// id.inst : 09cd7588-24cb-425b-8cec-2494c4460387
// id.tags : []
You can set up workers in various ways using either simple functions ( which get wrapped inside a Worker) or extending from the Worker class itself and implementing the work method. For full example, refer to Example_Jobs.kt
You can declare a simple 0 parameter function to perform some work
suspend fun sendNewsLetter():WorkResult {
// Perform the work ...
return WorkResult(WorkState.Done)
}
You can declare a simple 1 parameter that takes in a Task ( representing a unit of work ). This assumes you are sourcing the work from a Queue.
suspend fun sendNewsLetter(task: Task):WorkResult {
println(task.id) // abc123
println(task.from) // queue://notification
println(task.job) // job1
println(task.name) // users.sendNewsletter
println(task.data) // { ... } json payload
println(task.xid) // 12345 correlation id
// Perform the work ...
return WorkResult(WorkState.Done)
}
Finally, you can extend from the Worker class. This is the most comprehensive option as it gives you the ability to customize various life-cycel events ( see later section ). The life-cycle events go from init -> work -> done.
// slatekit.common.Identity is used to identify the job.
val id = SimpleIdentity("samples", "newsletter", Agent.Job, "dev")
// Worker
class NewsLetterWorker : Worker<String>(id) {
// Initialization hook ( for setup / logs / alerts )
override suspend fun init() {
notify("initializing", listOf(("id" to this.id.name)))
}
// Implement your work here.
// NOTE: If you are not using a queue, this task will be empty e.g. Task.empty
override suspend fun work(task:Task): WorkResult {
// Perform the work ...
return WorkResult(WorkState.Done)
}
// Transition hook for when the status is changed ( e.g. from Status.Running -> Status.Paused )
override suspend fun move(state: Status) {
notify("move", listOf("status" to state.name))
}
// Completion hook ( for logic / logs / alerts )
override suspend fun done() {
notify("done", listOf(("id" to this.id.name)))
}
// Failure hook ( for logic / logs / alerts )
override suspend fun fail(err:Throwable?) {
notify("failure", listOf(("id" to this.id.name), ("err" to (err?.message ?: ""))))
}
// Initialization hook ( for setup / logs / alerts )
override fun notify(desc: String?, extra: List<Pair<String, String>>?) {
val detail = extra?.joinToString(",") { it.first + "=" + it.second }
// Simulate notification to email/alerts/etc
println(desc + detail)
}
}
Once you have a work function, you can register it as a new Job.
// Identity
val id = SimpleIdentity("samples", "newsletter", Agent.Job, "dev")
// Queues
val queue1 = Queue("queue1", Priority.Mid, QueueSourceInMemory.stringQueue(5))
val queue2 = Queue("queue2", Priority.Mid, QueueSourceInMemory.stringQueue(5))
// Registry
// NOTE: Create an identity for each job using the id template above.
val jobs = Jobs(
listOf(queue1, queue2),
listOf(
slatekit.jobs.Job(id.copy(service = "job1"), ::sendNewsLetter),
slatekit.jobs.Job(id.copy(service = "job2"), listOf(::sendNewsLetter, ::sendNewsLetterWithPaging)),
slatekit.jobs.Job(id.copy(service = "job3"), listOf(::sendNewsLetterWithPaging)),
slatekit.jobs.Job(id.copy(service = "job4"), listOf(::sendNewsLetterWithPaging)),
slatekit.jobs.Job(id.copy(service = "job5"), listOf(::sendNewsLetterFromQueue), queue1),
slatekit.jobs.Job(id.copy(service = "job6"), listOf(NewsLetterWorker()), queue2)
)
)
// Run job named "samples.job1"
jobs.run("samples.job1")
There are various operations on the job from checking the status, to management actions such as starting, stopping, etc.
Jobs that are paged or queued can be gracefully started, stopped, paused, resumed. This is accomplished by sending Job Requests to the Job. These requests impact the status of the job and all its workers. These are methods available on the Job component itself.
Name | Status | Purpose |
start | Starting | Starts the job and its associated workers |
stop | Stopped | Stops the job and its associated workers |
pause | Paused | Pauses the job and its associated workers |
resume | Running | Resumes the jobs and its associated workers |
process | Running | Issues a single process / work request |
// Assume jobs are registered in the Jobs component.
// ....
// See registration section
// Get a job by its name
val job = jobs.get("samples.job1")
// Perform operations
job?.let { job ->
// NOTES:
// 1. This does not immediately perform the action.
// 2. A Job request is sent to the Job Channel
// 3. A Job then delegates the respective action to its workers
job.start()
job.stop()
job.pause()
job.resume()
job.process()
}
You can check the status of the job. The Status range are available in Status
// Get a job by its name
val job = jobs.get("samples.job1")
// Check status
job?.let { job ->
// Get current status
val status = job.status()
// Check status
job.isIdle()
job.isRunning()
job.isPaused()
job.isStopped()
job.isComplete()
job.isFailed()
job.isStoppedOrPaused()
// Check for specific status
job.isState(Status.Running)
""
}
Workers have several life-cycle event methods. They are called in order and the status of the worker changes after the event.
Name | Status | Purpose |
init | Starting | initialization hook for the worker |
work | Running | initialization hook for the worker |
fail | Failed | Called in the event of an error/exception/failure |
done | Complete | Called when the work method has issues Work.Done |
There are different types of jobs based on whether they run all their work in 1 go or perform work in batches/pages. This is (currently) not indicated explicitly at the function/worker level, but is based on what the function/work methods returns for the WorkState.
A one time worker simply performs the long-running operation and returns a WorkResult indicating it’s done.
suspend fun sendNewsLetter():WorkResult {
// Process the work
// ...
return WorkResult(WorkState.Done)
}
A paged worker runs performs work in batches or pages. After performing a batch of work, this paged worker must return a WorkState.Next.
// This keeps track of the page/offset
val offset = AtomicInteger(0)
val batchSize = 4
suspend fun sendNewsLetterWithPaging():WorkResult {
// Process the work
// ...
// Increment the batch/page offset for next time
offset.addAndGet(batchSize)
// Indicate to the system this is paged and there is more to do.
return WorkResult.next(
offset = offset.get() + batchSize.toLong(),
processed = batchSize
reference = "newsletters"
)
}
A queued worker runs work via Tasks from a Queue.
suspend fun sendNewsLetterFromQueue(task: Task):WorkResult {
// Get data out of the task
val userId = task.data.toInt()
// Simulate getting the user and sending the newsletter
val user = getUser(userId)
send(task.job, "Latest News Letter!", user)
// Acknowledge the task or abandon via task.fail()
task.done()
// Indicate that this can now handle more
return WorkResult(WorkState.More)
}
You can subscribe to various job events such as any Status changes or for a specific Status change.
// Subscribe to changes in status
job.subscribe { println("Job: ${it.id} status changed to ${it.status().name}") }
// Subscribe to change to Stopped status
job.subscribe(Status.Stopped) { println("Job: ${it.id} stopped!") }
You can set up custom policies which essentially represent middleware functions to execute at various times and/or conditions of workers and jobs. Policies are available in the slatekit-functions project ( not yet documented).
// There are 3 policies setup for this job:
// 1. Every: for every 10 items processed, call the function supplied
// 2. Limit: limit number of items to process to 12. ( useful for "waves")
// 3. Ratio: Ensure the job stops if the error (failed) job ratio hits 10%
val job = slatekit.jobs.Job(id.copy(service = "job7"), listOf(::sendNewsLetterWithPaging),
policies = listOf(
Every(10, { req, res -> println("Paged : " + req.task.id + ":" + res.msg) }),
Limit(12, true, { req -> req.context.stats.counts }),
Ratio(.1, slatekit.results.Status.Errored(0, ""), { req -> req.context.stats.counts })
)
)
A job internally holds a collection of workers via Workers. Each worker has an identity (based off the job’s identity ) in order to uniquely identify it. You can get the ids of all the workers and get the worker context which holds various components like polices ( middleware ) and diagnostics/stats.
// Get the workers collection ( Workers )
val workers = job.workers
// Get all worker ids ( List<Identity> )
val workerIds = workers.getIds()
// Get the context for a specific worker ( WorkContext )
val workerContext = job.workers.get(workerIds.first())
// The worker performing work on tasks
workerContext?.let { ctx ->
// Identity of the worker parent ( Job.id )
println(ctx.id)
// Worker itself
ctx.worker
// Worker middleware applied
ctx.policies
// Worker statistics
ctx.stats
}
Statistics are recorded at the worker level. There is ( currently ) no way aggregate statistics across all workers. But this can be done and/or integrated with a 3rd-party metrics library like Micrometer.
// Get the workers collection ( Workers )
val workers = job.workers
// Get all worker ids ( List<Identity> )
val workerIds = workers.getIds()
// Get the context for a specific worker ( WorkContext )
val workerContext = job.workers.get(workerIds.first())
// The worker performing work on tasks
workerContext?.let { ctx ->
// Worker statistics
val stats = ctx.stats
// Worker statistics
// Calls: Simple counters to count calls to a worker
println("calls.totalRuns: " + ctx.stats.calls.totalRuns())
println("calls.totalPassed: " + ctx.stats.calls.totalPassed())
println("calls.totalFailed: " + ctx.stats.calls.totalFailed())
// Counts: Counts the result success/failure categories
// See slatekit.results.Status.kt for more info
println("counts.totalProcessed : " + ctx.stats.counts.totalProcessed())
println("counts.totalSucceeded : " + ctx.stats.counts.totalSucceeded())
println("counts.totalDenied : " + ctx.stats.counts.totalDenied())
println("counts.totalInvalid : " + ctx.stats.counts.totalInvalid())
println("counts.totalIgnored : " + ctx.stats.counts.totalIgnored())
println("counts.totalErrored : " + ctx.stats.counts.totalErrored())
println("counts.totalUnexpected: " + ctx.stats.counts.totalUnexpected())
// Lasts: Stores the last request/result of an call to work
println("lasts.totalProcessed : " + ctx.stats.lasts?.lastProcessed())
println("lasts.totalSucceeded : " + ctx.stats.lasts?.lastSuccess())
println("lasts.totalDenied : " + ctx.stats.lasts?.lastDenied())
println("lasts.totalInvalid : " + ctx.stats.lasts?.lastInvalid())
println("lasts.totalIgnored : " + ctx.stats.lasts?.lastIgnored())
println("lasts.totalErrored : " + ctx.stats.lasts?.lastErrored())
println("lasts.totalUnexpected: " + ctx.stats.lasts?.lastUnexpected())
// You can also hook up loggers and events
ctx.stats.logger
ctx.stats.events
}
Performance and load docs coming soon