How to write a background job in Elixir

The easy, built-in way to run a background job

Because Elixir runs on the BEAM, concurrency is one of the strong selling points of the language. A background job is as easy as starting a Task. The Task runs in a separate process, and our code can continue without worrying about how long the task will take.

defmodule MyApp.Users do
  def create_user(params) do
    {:ok, user} = MyApp.Repo.insert(User.changeset(params))

    # launch a background job to create and send a welcome email
    Task.start(fn ->
      send_welcome_email(user)
    end)

    # return the created user
    {:ok, user}
  end

  def send_welcome_email(user) do
    # … send welcome email
  end
end

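While super easy to do, this leaves some questions. What if sending the email fails? Task.start/1 runs the function in a process that is neither linked nor supervised, so a crash is logged and the job is simply lost. Let’s add some resiliency by running our job under a Task supervisor that restarts it if it fails.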

defmodule MyApp.Application do
  use Application

  @impl true
  def start(_type, _args) do
    children = [
      {Task.Supervisor, name: MyApp.JobSupervisor}
    ]

    opts = [strategy: :one_for_one, name: MyApp.Supervisor]
    Supervisor.start_link(children, opts)
  end
end

defmodule MyApp.Users do
  require Logger

  def create_user(params) do
    {:ok, user} = MyApp.Repo.insert(User.changeset(params))

    # launch a supervised background job to create and send a welcome email
    # The job will continue to run even if our process completes (i.e. web request ends)
    Task.Supervisor.start_child(
      MyApp.JobSupervisor,
      fn ->
        try do
          # Log the start of the job
          Logger.info("Sending welcome email for #{user.id}")
          :ok = send_welcome_email(user)
          Logger.info("Sending welcome email for #{user.id} complete")
        rescue
          error ->
            # Log the job failure for later investigation
            Logger.error(
              "Failed to send welcome email for #{user.id} - #{inspect(error)}, #{inspect(__STACKTRACE__)}"
            )

            reraise error, __STACKTRACE__
        end
      end,
      # restart: :transient means the job is restarted only if it exits abnormally (e.g. it raises)
      restart: :transient
    )

    # return the created user
    {:ok, user}
  end

  def send_welcome_email(user) do
    # … send welcome email
  end
end
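
To see the restart behaviour in isolation, here is a minimal sketch that can be run in iex or as a script, with no database or mailer involved. The Agent-based attempt counter and the rule that the first two attempts fail are assumptions made purely for illustration; only Task.Supervisor.start_child/3 and the restart: :transient option come from the code above.

```elixir
# Start an anonymous Task.Supervisor and a counter that survives task restarts.
{:ok, sup} = Task.Supervisor.start_link()
{:ok, counter} = Agent.start_link(fn -> 0 end)
parent = self()

{:ok, _pid} =
  Task.Supervisor.start_child(
    sup,
    fn ->
      # Hypothetical flaky job: attempts 1 and 2 raise, attempt 3 succeeds.
      attempt = Agent.get_and_update(counter, fn n -> {n + 1, n + 1} end)
      if attempt < 3, do: raise("attempt #{attempt} failed")
      send(parent, {:done, attempt})
    end,
    restart: :transient
  )

# Wait for the attempt that finally succeeds. The failed attempts are
# logged as errors by the Task machinery in the meantime.
result =
  receive do
    {:done, attempt} -> {:done, attempt}
  after
    5_000 -> :timeout
  end

IO.inspect(result)
```

The two failed attempts should show up as error logs before the third attempt reports back. Note that the supervisor restarts the job immediately, with no delay, and that by default a supervisor tolerates only 3 restarts within 5 seconds before it shuts down itself, so this approach suits occasional failures better than jobs that fail repeatedly.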

Schedule a task to run from Cron

Because Elixir runs on the BEAM, a virtual machine that manages its own processes, it’s not like many other languages where you can just launch a script in its own process. Even a Mix task launches a fresh copy of the BEAM runtime and starts your app and its dependencies, so that’s not the best choice. If you’re using releases, there is a way to run a function on a running app by calling into it from the command line.

First, let’s start with a basic job module. Our Job module exposes an enqueue/0 function that adds the job to the Task supervisor, which then runs it by calling run/0.

defmodule MyApp.Job do
  def enqueue() do
    Task.Supervisor.start_child(MyApp.JobSupervisor, __MODULE__, :run, [])
  end

  def run() do
    IO.puts("Job started")
    Process.sleep(5000)
    IO.puts("Job complete")
  end
end

A release provides a command line entry point into your application. After it has started you can use bin/myapp rpc COMMAND to run a command on the running system. With this we can launch our job from cron as such:

* * * * * /full/path/to/myapp/bin/myapp rpc 'MyApp.Job.enqueue()'

If you launch your app with bin/myapp start then you should see “Job started” followed by “Job complete” 5 seconds later, every minute as the cron job runs.

Enterprise level background jobs

While the code above is very easy to set up, it lacks a lot of key features for a reliable background system.

Oban is a great library for robust job processing with enterprise grade features (as its description puts it).

Oban is built on top of PostgreSQL, which is likely already part of your project. It uses PostgreSQL to store jobs, handle multi-node access without Erlang clustering, and ensure uniqueness of jobs when you need it.

A job is a module that implements the Oban.Worker behaviour and must expose a perform/1 function.
We can rewrite the simple job from above as:

defmodule MyApp.Job do
  use Oban.Worker

  # This is a convenience function so we can still call MyApp.Job.enqueue()
  def enqueue(args \\ %{}, opts \\ []) do
    Oban.insert(MyApp.Job.new(args, opts))
  end

  @impl Worker
  def perform(%{args: _args} = _job) do
    IO.puts("Job started")
    Process.sleep(5000)
    IO.puts("Job complete")
  end
end

Job Scheduling

Oban provides a number of job scheduling options:

You can schedule a job to run in the future by adding the option [schedule_in: 5] (start in 5 seconds’ time).
You can schedule a job to run at a specific time by adding the option [scheduled_at: DateTime.add(DateTime.now!("Etc/UTC"), 1, :hour)] (start in an hour’s time).
You can also use the Cron plugin to set up periodic jobs.

config :my_app, Oban,
  repo: MyApp.Repo,
  plugins: [
    {Oban.Plugins.Cron,
     crontab: [
       # Elixir lists do not allow a trailing comma after the last entry
       {"* * * * *", MyApp.Job, args: %{}}
     ]}
  ]

Of course, with releases you can still use an RPC call to run jobs via the system crontab.
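
As a small sketch of the first two scheduling options, the snippet below only builds the option lists so it can be run without Oban or a database. Oban’s option for an absolute time is spelled scheduled_at, and the variable names are placeholders chosen for this example. The commented calls at the end assume the enqueue/2 helper defined on MyApp.Job earlier.

```elixir
# Relative scheduling: run the job 5 seconds after it is inserted.
in_five_seconds = [schedule_in: 5]

# Absolute scheduling: run the job at a specific UTC timestamp, here
# one hour (3600 seconds) from now.
now = DateTime.utc_now()
in_one_hour = [scheduled_at: DateTime.add(now, 3600, :second)]

IO.inspect(in_five_seconds, label: "relative")
IO.inspect(in_one_hour, label: "absolute")

# With the enqueue/2 helper from MyApp.Job these would be passed as:
#   MyApp.Job.enqueue(%{}, in_five_seconds)
#   MyApp.Job.enqueue(%{}, in_one_hour)
```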

Job Queuing and priorities

Oban has the ability to run different job queues and to have different priorities within each queue. This gives you a lot of flexibility over when a job might run. You can have long running jobs on one queue, and important jobs running on a different queue. You can then prioritise jobs within each queue up to four levels. By default everything is queued at the highest priority and you can set jobs to run at lower priorities when they are less important.

# Insert the job on the `:long_running` queue with the lowest priority of 3 (0 is highest)
Oban.insert(MyApp.Job.new(%{}, queue: :long_running, priority: 3))
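
To build some intuition for what priorities do within a single queue, here is a simplified model in plain Elixir. It is not Oban’s actual query, and the job maps and names are made up for illustration; it only assumes that jobs with a lower priority number are picked first and that older jobs win ties.

```elixir
# Hypothetical jobs waiting in one queue. `id` stands in for insertion order.
jobs = [
  %{id: 1, priority: 3, name: "report"},
  %{id: 2, priority: 0, name: "welcome_email"},
  %{id: 3, priority: 1, name: "sync"},
  %{id: 4, priority: 0, name: "password_reset"}
]

# Lower priority number first, then oldest first within the same priority.
run_order =
  jobs
  |> Enum.sort_by(fn job -> {job.priority, job.id} end)
  |> Enum.map(fn job -> job.name end)

IO.inspect(run_order)
```

The low-priority "report" job ends up last even though it was inserted first, which is why long-running, unimportant work is often better placed on its own queue rather than only given a lower priority.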

Retrying, back-off strategies, and timeouts

When a job fails you want it to retry, but it’s useful to have a backoff strategy that spaces the retries over longer and longer periods. Oban has this built in: the default backoff is exponential (with some jitter), and a job is retried until it reaches the max_attempts option (default 20 attempts).
This can be overridden using the backoff/1 callback (see Customizing Backoff in the docs).

By default jobs are allowed to execute indefinitely, but you can set a timeout in a similar way, through the timeout/1 callback.

Here is our code modified with a custom backoff and timeout strategy.

defmodule MyApp.Job do
  # Limit the job to 3 attempts in total
  use Oban.Worker, max_attempts: 3

  # This is a convenience function so we can still call MyApp.Job.enqueue()
  def enqueue(args \\ %{}, opts \\ []) do
    Oban.insert(MyApp.Job.new(args, opts))
  end

  @impl Worker
  def perform(%{args: _args} = _job) do
    IO.puts("Job started")
    Process.sleep(5000)
    IO.puts("Job complete")
  end

  @impl Worker
  def backoff(%{attempt: _attempt}) do
    # wait a fixed thirty seconds between each attempt
    30
  end

  @impl Worker
  def timeout(%{attempt: attempt}) do
    # Allow the job 5 additional seconds on each attempt
    attempt * 5000
  end
end
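
A fixed delay is the simplest possible strategy. As a point of comparison, here is a sketch of a capped exponential delay written as a plain function, so it can be tried without Oban. The module name, the 15 second base, and the one hour cap are assumptions for illustration, and this is not the formula Oban uses by default. The result is in seconds, which is the unit backoff/1 returns.

```elixir
defmodule BackoffSketch do
  # Hypothetical tuning values: first retry after 15 s, never more than 1 h.
  @base_seconds 15
  @max_seconds 3_600

  # Doubles the delay on every attempt: 15, 30, 60, ... capped at @max_seconds.
  def delay(attempt) when is_integer(attempt) and attempt > 0 do
    min(@base_seconds * Integer.pow(2, attempt - 1), @max_seconds)
  end
end

delays = Enum.map(1..5, &BackoffSketch.delay/1)
IO.inspect(delays)
```

Inside a worker, the body of backoff/1 could then delegate to it, for example BackoffSketch.delay(attempt) with attempt taken from the job.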

Logging and notification on failure

At Sitesure we want to ensure that everything runs correctly at all times so this article wouldn’t be complete without a way to report your job execution to a monitor.
Oban uses Telemetry to instrument everything that happens within the system. We can leverage that to report on job success and job failure by writing a handler that attaches to :oban events.

This uses the simple Req library for calling out to the Sitesure monitor URL.

defmodule MyApp.ObanNotifier do
  # Called when a job finishes without raising; `state` is :success for a completed job
  def handle_event([:oban, :job, :stop], _measure, %{job: job, state: state} = _metadata, _) do
    notify_url = Map.get(job.meta, "notify_url")

    if notify_url do
      Req.get!(notify_url, params: %{s: if(state == :success, do: 0, else: 1)})
    end
  end

  # Called when a job raises or crashes
  def handle_event([:oban, :job, :exception], _measure, %{job: job} = _metadata, _) do
    notify_url = Map.get(job.meta, "notify_url")

    if notify_url do
      Req.get!(notify_url, params: %{s: -1})
    end
  end
end
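
One caveat worth knowing: :telemetry detaches a handler if the handler function raises, so a single failed HTTP call made with Req.get! could silently stop all further notifications. A possible guard is to wrap the call so that errors are logged instead of raised. The SafeNotify module below is a hypothetical helper written for this article, not part of Oban, Req, or Telemetry.

```elixir
defmodule SafeNotify do
  require Logger

  # Runs `fun` and converts any raised exception into a log line, so the
  # caller (e.g. a telemetry handler) never raises.
  def call(fun) when is_function(fun, 0) do
    fun.()
    :ok
  rescue
    error ->
      Logger.warning("Job notification failed: " <> Exception.message(error))
      :error
  end
end

# A call that succeeds and one that raises, standing in for Req.get!/2.
ok_result = SafeNotify.call(fn -> :sent end)
error_result = SafeNotify.call(fn -> raise "monitor unreachable" end)

IO.inspect({ok_result, error_result})
```

In the handler above, the request could then be written as SafeNotify.call(fn -> Req.get!(notify_url, params: %{s: 0}) end).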

The notifier handler can be attached in your application.ex, before the supervision tree is started:

defmodule MyApp.Application do
  use Application

  @impl true
  def start(_type, _args) do
    children = [
      {Task.Supervisor, name: MyApp.JobSupervisor}
    ]

    # attach to the oban job stop and exception events
    events = [[:oban, :job, :stop], [:oban, :job, :exception]]
    :telemetry.attach_many("oban-notifier", events, &MyApp.ObanNotifier.handle_event/4, [])

    opts = [strategy: :one_for_one, name: MyApp.Supervisor]
    Supervisor.start_link(children, opts)
  end
end

And finally add the notify_url to the meta option of each job you want to monitor.

MyApp.Job.enqueue(%{}, meta: %{notify_url: "https://notify.do/my-token"})