Now that I'm building on my own, I’ve thought about building as well, but I’ve found that off-the-shelf systems handle all of this far better (and they are opensourced too), ie trigger-dot-dev and many others.
It's not clear if they used a product or DIY solution. The nice thing many existing products offer is a web UI and a database.
There are two main differences between our system and the one in the post:
- In our scheduler, the actual cron (aka recurrence rule) is stored along with the task information. That is, you specify a period (like "every 5 minutes" or "every second Tuesday at 2am") and the task will run according that schedule. We try to support most of the RRule specification. [1] If you want a task to just run one time in the future, you can totally do that too, but that's not our most common use case internally.
- Our scheduler doesn't perform a wide variety of tasks. To maximize flexibility and system throughput, it does just one thing: when a schedule is "due", it puts a message onto a queue. (Internally we have two queueing systems it interops with -- an older one built on top of Redis, and a newer one built on PG + S3). Other team consume from those queues and do real work (sending emails, generating reports, etc). The queueing systems offer a number of delivery options (delayed messages, TTLs, retries, dead-letter queues) so the scheduling system doesn't have to handle it.
Ironically, because supporting a high throughput of scheduled jobs has been our biggest priority, visibility into individual task executions is a bit limited in our system today. For example, our API doesn't expose data about when a schedule last ran, but it's something on our longer term roadmap.
[1] https://icalendar.org/iCalendar-RFC-5545/3-8-5-3-recurrence-...
I do think the best solution for this is still RabbitMQ. It has the ability to push tasks in the queue and tell it to run at a very specific time called "Delayed Messages" and then it just processes them at that time.
It contained what people quickly need to reach for:
- schedule a job in UTC or local time zone for a particular place;
- schedule a job but only if another job ran beforehand;
- semaphore-like resource limits on jobs.
It did this with job generating resource tokens and other jobs stating a token as a condition for being scheduled.
It ended up being a not so nice system to debug to be honest, but worked fine.
For simple job, I’d reach for systemd timers on a single machine, a kubernetes cronjob on a given platform, or something external altogether otherwise (for geo-distributed scheduled jobs).
Unmeshed has all of these, managing holiday calendars etc and makes it super easy. It even has agents for AS400 server commands if that is still a thing you need.
For my part, I see this pattern repeatedly at different places. The raw tools in the platforms are too codey and the third-party frameworks like Temporal seem overkill, so you build a scheduler and need to solve the problems OP did: only run once, know if it errored, etc.
But it's amazing how "it's firing off a basic action!" becomes a script, then becomes a script composed of reusable actions that can pick up where they left off in case of errors ... Over time your "it's just enough for us!" feature creeps towards the framework's functionality.
I'd be curious to know how long the OP's solution stays simple before it submits to the feature creep demands. (Long may complexity be fought off, though! Every day you can live without the complexity of full workflows is a blessing)
- At processing start, update the schedule entry to 'executing', then open a new transansaction and lock it, while skipping already locked tasks (`SELECT FOR UPDATE ... SKIP LOCKED`).
- At the end of processing, set it to 'COMPLETED' and commit. This also releases the lock.
This has the following nice characteristics:
- You can have parallel processors polling tasks directly from the database without another queueing mechanism like SQS, and have no risk of them picking the same task.
- If you find an unlocked task in 'executing', you know the processor died for sure. No heuristic needed
Beanstalkd, cronicle, agenda, sidekiq, faktory, celery, etc. are the usual suspects.
What is often missing is HA of the controller service process.
To illustrate what I am looking for, I often end up using supervisord [0] (but I also like immortal [1]) for process control when not on a systemd enabled system. In my experience they are reliable, lightweight and a pleasure to work with.
I am looking for something similar for scheduled jobs.
- [0] https://supervisord.org/
- [1] https://immortal.run/
Why not set the publication_date when you create a post and have a function getPublishedPosts that fetches a list of posts, filtering out those with a publication_date earlier than the current date? With this approach, you don't need cron jobs at all.
In .NET world I use Hangfire for this. In Node (I assume what this is) I tinkered with Bull, but not sure what best in class is there.
And such a service provides reliability guarantees.
If I have to do a reliable periodic service, my go-to is a kubernetes cronjob, which is like a baby version of a cloud cronjob. I'd be reluctant to adopt some sort of task queue framework because of the complexity of the mental model plus the complexity of keeping one more thing running reliably. K8s is already running reliably, I might as well use that.
1. Select next job
2. Update status to executing where jobId = thatJob and status is pending
3. If previous affected 0 rows, you didn't get the job, go back to select next job
If you have "time to select" <<< "time to do" this works great. But if you have closer relationship you can see how this is mostly going to have contention and you shouldn't do it.
https://github.com/jhuckaby/Cronicle/blob/master/docs/Setup....
Designed to run in a container, but should equally well work on a single host. However, no option for "high availability" running, where multiple hosts coordinate.
There are loads of people over the years who have reached for cron instead of reaching for proper general-purpose dæmon management (SRC, SMF, daemontools, runit, daemontools-encore, perp, s6, ...). It is on Stack Exchange answers and in people's personal "How I did this" articles on WWW sites. (Although the idea goes back to the Usenet era.) It became one of those practices perpetuated because other people did it.
The next step is always discovering that cron's error handling and logging are aimed at an era when the system operator sat in the console room, and received "You have new mail" notifications at the console shell prompt.
And the step after that is (re-)discovering that the anacron approach does not fully cut the mustard. (-:
Systemd has been a game-changer for small-scale deployments.
Mainframe and minicomputer operating systems support scheduling in the operating system itself, as part of their process/thread scheduler; their native queuing systems are built on top of the primitives their scheduler offers, for proper accounting and maximum resource utilization (including prioritization).
Only UNIX would just provide a way to run processes at a specified time or interval and call the job done.
One challenge is to guarantee exactly-once processing across software upgrades. DBOS uses the cron-scheduled time as an idempotency key, and tags each workflow execution with a version. We also use the database transactions to guard against conflicting concurrent updates.
The reason we built it, despite the fact that developers could very well have deployed a CloudWatch EventBridge schedule + SQS + lambda or similar, is because they never did. They would consistently choose to build it into their existing services, which were rarely if ever handling things like limiting concurrency if a task took too long, emitting metrics on success/failure/duration, audit logging for when a task had to be manually triggered for some reason. If I had to guess, I think the reason was because it allowed them to piggyback on existing change controls and "just write application code" instead of having to think about additional pieces of infrastructure.
If I could do it again, I would probably have reached for something like Temporal, even though it seemed overkill for what we initially set out to do. It took about a week before people started asking for locking and retries.
And adds an external dependency for something very essential.
And it's worth it because now you have Temporal, which is the bees knees as far as I'm concerned. I will gladly sing praises of any tool that saves me getting paged, and Temporal has that in spades.
The danger is that it's so easy to start and it's decent for small and simple applications. Once your jobs start growing, both in number of contributors and in workload, the problems start. DSL is difficult to debug, plugins are buggy and the brittle master node will become your most precious pet that need constant supervising to not grind the whole system to a stop. By the time you realize this you have a hard time to get out of this lockin.
Why is this? My only memory of systemd was slightly better configurations for sequencing the start of processes that depended on the completion of earlier processes so I'm a bit rusty.
The deep integration into nixos made me feel the same. You sound like you could enjoy a bit nix too.
That is in fact batch (and atrun, although that's considered an implementation detail).
* https://pubs.opengroup.org/onlinepubs/9799919799/utilities/b...
Most implementations flesh out the "implementation-defined algorithms" stuff to be calculations based upon load averages, as on NetBSD.
* https://man.netbsd.org/batch.1
* https://man.netbsd.org/atrun.8
Or fairly primitive parallelism limits as on Illumos.
* https://illumos.org/man/1/batch
* https://illumos.org/man/5/queuedefs
Not quite JECL, is it? (-:
Over time, jobs start taking long enough to the point where you need to split them. Separate jobs are assigned slices of the original batch. Eventually, there are so many slices that you make a Jenkins job where the sole responsibility is firing off these individual jobs.
Then you start hitting the real painpoints in Jenkins. Poor allocation of jobs across your nodes/agents, often overloading CPU/Mem on machines, and you struggle to manage the ungodly interface that is the Jenkins REST endpoint. You install many Jenkins addons to try and address the scheduling problems, and end up with a team dedicated to managing this Jenkins infrastructure.
The scaling struggles continue to amass and you end up needing separate Jenkins instances to battle the load. Any attempt at replacing the Jenkins infrastructure goes on standstill, as the amount of random scripts found in Jenkinsfiles has created an insurmountable vendor lock-in.
You read a post about a select-for-update job scheduler and reflect on simpler times. You cry as you refactor your Jenkins Groovy DSL.
t2: update, set status=completed|error
these are two independent, very short transactions? or am i misunderstanding something here?
--
edit:
i think i'm not seeing what the 'transaction at start of processor' logic is; i'm thinking more of a polling logic
while true:
r := select for update
if r is None:
return
sleep a bit
this obviously has the drawback of knowing how long to sleep for; and tasks not getting "instantly" picked up, but eh, tradeoffs.Write your own scheduler.
Oracle is cheaper in the long run.
It’s always a mistake, but it’s easy in the moment and sticks around longer than I’d like.
Getting a weird 3rd party scheduling system with access to internal stuff approved is HARD in big corps.
So we (ab)use the CI system we have. It has scheduling and it already accesses internal resources.
Two (very, if indexed properly) short transactions at start and end are a good solution. One caveat is that the worker can die after t1, but before t2 - hence jobs need a timeout concept and should be idempotent for safe retrying.
This gets you "at least once" processing.
> this obviously has the drawback of knowing how long to sleep for; and tasks not getting "instantly" picked up, but eh, tradeoffs.
Right. I've had success with exponential backoff sleep. In a busy system, means sleeps remain either 0 or very short.
Another solution is Postgres LISTEN/NOTIFY: workers listen for events and PG wakes them up. On the happy path, this gets instant job pickup. This should be allowed to fail open and understood as a happy path optimization.
As delivery can fail, this gets you "at most once" processing (which is why this approach by itself it not enough to drive a persistent job queue).
A caveat with LISTEN/NOTIFY is that it doesn't scale due to locking [1].
[1]: https://www.recall.ai/blog/postgres-listen-notify-does-not-s...
> - If you find an unlocked task in 'executing', you know the processor died for sure. No heuristic needed
1. Nix has clear advantages for *deployment* (including end-user deployment) but really gets in the way for new *development*. Maybe flakes fix this? Maybe not though.
2. The "Nix on other Linux" install scripts were hostile in attacking startup scripts, rather than allowing opt-in isolation.
3. The Nix language (and library?) is not sane. Nobody actually understands it, only copy-pastes pieces of existing package scripts and hopes the changes work.Perhaps Nix is "Wonko the Sane" and it is in fact the rest of us who are in the asylum?
Nix, the language, is a little strange at first but really does make sense. Nixpkgs, the "standard library", is a little stranger and sometimes makes an odd default choice. The nice thing though is that using Nix you can coerce Nixpkgs into just about any shape that suits you.
https://github.com/awslabs/amazon-dynamodb-lock-client
> The AmazonDynamoDBLockClient is a general purpose distributed locking library built on top of DynamoDB. It supports both coarse-grained and fine-grained locking.
If a worker/processor dies abruptly, it will neither unlock nor set the state appropriately. It won't have the opportunity. Conceptually, this failure mode can always occur (think, power loss).
If such a disruption happened, yet you later find tasks unlocked, they must have been unlocked by another system. Perhaps Postgres itself, with a killer daemon to kill long-running transactions/locks. At which point we are back to square one: the job scheduling should be robust against this in the first place.
> using a table instead of LISTEN/NOTIFY
What do you mean? The job queue is backed by a PG table. You could optionally layer LISTEN/NOTIFY on top.
I've had success with a table with compound, even natural primary keys, yes. Think "(topic, user_id)". The idea is to allow for PARTITION BY should the physical tables become prohibitively large. The downsides of PARTITION BY don't apply for this use case, the upsides do (in theory - I've not actually executed on this bit!).
Per "topic", there's a set of workers which can run under different settings (e.g. number of workers to allow horizontal scaling - under k8s, this can be automatic via HorizontalPodAutoscaler and dispatching on queue depth!).
At Heartbeat, we have a lot of different tasks that need to run at a particular time. Users can create draft posts or events that get published at a certain time. Event reminders need to be sent at a certain number of hours before an event. Automated workflows can be set up that send emails or direct messages after a delay.
For the longest time, all of these tasks were managed by a variety of cron scripts. We had createScheduledPosts.ts that would run every 15 minutes, scan our table of scheduled posts and create any that needed to be published. sendEventReminders.ts would run every single minute, scan our table of events, and send any notifications that needed to be sent out. And so on.
Each of these cron jobs would need to be managed independently. Whenever a new feature was added that involved running tasks in the future, a new cron job would be created. If one of the scripts started erroring, I’d need to figure out why, fix it and then figure out a way to retroactively run the tasks that were missed while the script was broken. Sometimes, we’d get reports from customers that a certain task that was supposed to run did not. I’d painstakingly dig into the logs & code, trying to figure out why a particular event reminder did not get sent on time. The first couple times this happened, I’d usually discover that we lacked the logs to even properly diagnose the issue. All I would be able to do is add some more logs and hope that I’d find the problem the next time. Once the logs were in place, I’d uncover some bug caused by timezones, improper error handling or who knows what else.
Eventually, I came to my senses and realized that all of these various cron jobs were doing the same thing. And rather than have 10 different cron jobs each implementing their own half-baked version of a task scheduler, we should just have a robust, centralized system for scheduling tasks.
The way it works is we have a single database table called ScheduledTasks with the following schema:
enum ScheduledTaskStatus {
QUEUED
EXECUTING
COMPLETED
}
model ScheduledTask {
id String @id
communityID String
createdAt DateTime
lastStatusUpdate DateTime
timestamp DateTime
status ScheduledTaskStatus
expectedExecutionTimeInMinutes Int
expirationInMinutes Int?
priority Int
payload Json
message String?
@@index([status, timestamp])
}
payload is a discriminated union that contains each type of task we have. For example:
type ScheduledTaskPayload =
| {
type: "PUBLISH_EVENT";
eventID: EventID;
}
| {
type: "PUBLISH_SCHEDULED_POST";
scheduledPostID: ScheduledPostID;
}
| {
type: "SEND_EVENT_REMINDER";
eventID: EventID;
}
| {
type: "SEND_EMAIL";
email: string;
subject: string;
body: string;
};
Now, whenever we have a task that needs to be scheduled for the future, all we need to do is insert a new ScheduledTask into the database. We have a single cron job responsible for executing scheduled tasks that runs once every minute.
The cron job works as follows:
Completednow + 30 secondsnow is less than timestamp + expirationInMinutes or expirationInMinutes is null)Executing, now > timestamp + expectedExecutionTimeInMinutesExecuting in the databaseSeparately, we have an SQS consumer that listens for the SQS messages. The consumer reads the payload discriminated union and calls the corresponding function responsible for executing the task.
async function processTask(taskPayload: ScheduledTaskPayload) {
if (taskPayload.type === "PUBLISH_EVENT") {
await publishEvent(taskPayload.eventID);
} else if (taskPayload.type === "PUBLISH_SCHEDULED_POST") {
await publishScheduledPost(taskPayload.scheduledPostID);
}
//...
}
After the task runs, mark it as completed in the database. Some of our tasks will return a new scheduled task. If they do, insert the new scheduled task into the database. For example, after sending an event reminder for an instance of a recurring event, the next reminder is scheduled.
The system has retry logic built in. If for some reason the script does not run for some amount of time due to an outage or error, the scheduled tasks will still exist in the database. Once the script is running again, any tasks that were not executed when they were originally supposed to will be run. The expirationInMinutes enables us to control which tasks are run at a later time. Some tasks, such as event reminders, don’t make sense to be run after a certain point. Others, like publishing scheduled posts, fall into the “better late than never” bucket, in which case expirationInMinutes will be set to null. The expectedExecutionTimeInMinutes field lets us handle retry for tasks that get stuck in Executing. If a task that was scheduled for 10:00am is still marked as Executing at 10:01am, we probably don’t want to run the task again because the first run might still be in progress. However, by 10:08am, if the task is still stuck in Executing, it probably ran into an error and we can try running it again. expectedExecutionTimeInMinutes tells the system how long to wait until rerunning a task stuck in Executing.
To ensure tasks run at the right time, we need to make sure that whenever a change is made to an entity, the corresponding scheduled task is also updated. For example, when a user creates an event that starts at 3pm, we create a scheduled task for the reminder to be sent at 2pm. If the user later updates the event to be at 6pm, we need to update the same scheduled task to send the reminder at 5pm instead. We enable this by using consistent ids for editable tasks.
type ScheduledTaskPayload =
| {
type: "PUBLISH_EVENT";
eventID: EventID;
}
| {
type: "PUBLISH_SCHEDULED_POST";
scheduledPostID: ScheduledPostID;
}
| {
type: "SEND_EVENT_REMINDER";
eventID: EventID;
}
| {
type: "SEND_EMAIL";
email: string;
subject: string;
body: string;
};
function getTaskID(payload: ScheduledTaskPayload) {
if (payload.type === "PUBLISH_EVENT") {
return `task-${payload.type}-${payload.eventID}`;
} else if (payload.type === "PUBLISH_SCHEDULED_POST") {
return `task-${payload.type}-${payload.scheduledPostID}`;
} else if (payload.type === "SEND_EVENT_REMINDER") {
return `task-${payload.type}-${payload.eventID}`;
} else if (payload.type === "SEND_EMAIL") {
return generateUUID();
} else {
assertNever(payload);
}
}
Whenever we create a task, we use getTaskID to get the id for the scheduled task. And rather than just creating the task, we do an upsert. So when a user creates an event for the first time, the scheduled task does not exist, so it will be created. When edits are made to the event, the eventID remains the same, so the id for the corresponding scheduled task will be the same. As a result, the previous scheduled task will be updated rather than creating a new one. Other tasks, such as SEND_EMAIL, can be triggered from a variety of sources and are not directly editable by the user, so those tasks just use a uuid instead.
Overall, creating the scheduled tasks system has come with enormous benefits:
ScheduledTaskPayload and a new handler to the processTask function.Many of you probably already had the foresight to centralize your scheduled tasks into one place. But I haven’t seen too many people talking about this problem, so hopefully this was helpful for anyone that’s currently stuck maintaining a sea of scattered cron jobs.