module folktale/concurrency/task

A data structure that models asynchronous actions, supporting safe cancellation and automatic resource handling.

This feature is experimental!

This API is still experimental, so it may change or be removed in future versions. You should not rely on it for production applications.

Documentation

Example:

const { task } = require('folktale/concurrency/task');

const delay = (ms) => task(
  (resolver) => {
    const timerId = setTimeout(() => resolver.resolve(ms), ms);
    resolver.cleanup(() => {
      clearTimeout(timerId);
    });
  }
);

// waits 100ms
const result = await delay(100).or(delay(2000)).run().promise();
$ASSERT(result == 100);

Why use Task?

Because JavaScript implementations are usually single-threaded, and there's no coroutine support, concurrent applications tend to use either callbacks (continuation-passing style) or Promise.

Callbacks aren't very composable. In order to combine callbacks, a user has to write code specific to each place that will use them. While you can make code written using callbacks maintainable, their low-level nature forces you to deal with a fair amount of detail that could be resolved by a library, including optimal concurrency:

const map = (list, fn, done) => {
  let result = [];
  let pending = list.length;
  let resolved = false;

  list.forEach((item, index) => {
    fn(item, (error, value) => {
      if (!resolved) {
        if (error) {
          resolved = true;
          done(error, null);
        } else {
          pending -= 1;
          result[index] = value;
          if (pending === 0) {
            done(null, result);
          }
        }
      }
    });
  });
};

map([1, 2], (x, c) => c(null, x + 1), (e, v) => {
  $ASSERT(e == null);
  $ASSERT(v == [2, 3]);
});

map([1, 2], (x, c) => c(x), (e, v) => {
  $ASSERT(e == 1);
  $ASSERT(v == null);
});

Because functions using callbacks don't return a value to the caller, they're not composable. They are also, of course, not usable with JavaScript control-flow constructs. So it's not possible to write something like:

if (someAsyncPredicate(...)) {
  ...
}

Instead of returning a value, someAsyncPredicate passes the result of its computation to another function (the callback). Because of this, there's no value for the if statement to work with.

Promises alleviate this a bit. Promises are first-class values, so regular synchronous functions may invoke functions yielding promises and get a value back. In some cases, that's not going to be the right value, but with async/await you get a lot of the compositionality back, as you can mix promises and regular synchronous constructs freely in special (async) functions.
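As a minimal sketch of that last point, the snippet below uses a made-up predicate (isEvenLater is hypothetical, not part of any library) to show an awaited promise feeding a regular if statement inside an async function:

```javascript
// Hypothetical asynchronous predicate: resolves to true when `n` is even.
const isEvenLater = (n) =>
  new Promise((resolve) => {
    setTimeout(() => resolve(n % 2 === 0), 10);
  });

// Inside an async function the awaited value is an ordinary boolean,
// so it can drive a regular `if` statement.
const describeNumber = async (n) => {
  if (await isEvenLater(n)) {
    return `${n} is even`;
  }
  return `${n} is odd`;
};

describeNumber(4).then((text) => console.log(text)); // prints "4 is even"
```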

Promises, however, do not support cancellations. Since they represent values, not computations, a Promise by itself has no concept of "what to cancel". It only waits for an external process to provide a value to it. In JavaScript, promises also suffer from not being able to nest. This is not a problem for most common cases, but it makes writing some data structures much less convenient and more error-prone.
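The nesting limitation can be seen with plain Promises alone. In this small sketch (the variable names are illustrative), resolving one promise with another does not produce a promise of a promise; the outer one simply adopts the inner one's value:

```javascript
const inner = Promise.resolve(1);

// Resolving a promise with another promise adopts the inner promise's state,
// so `nested` settles with 1, not with a promise containing 1.
const nested = new Promise((resolve) => resolve(inner));

const observedType = nested.then((value) => typeof value);
observedType.then((type) => console.log(type)); // prints "number"
```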

Task, on the other hand, works at the computation level, so it knows which resources a computation has allocated to do the work, and can safely collect those resources automatically when the computation is cancelled, much like killing a thread or process allows you to clean things up. Because Tasks abstract computations, and not values, things that need workarounds with Promises, like running operations sequentially, are supported natively by the Task API.

Constructing tasks

The task function is how Tasks are generally created. It takes a computation (a function that will perform all of the work) and provides that computation means of describing its result, cleaning up its allocated resources, and reacting to external cancellations.

A task that simply resolves after a certain amount of time would look like this:

const { task } = require('folktale/concurrency/task');

const delay = (time) => task(
  (resolver) => {
    const timerId = setTimeout(() => resolver.resolve(time), time);

    resolver.cleanup(() => {
      clearTimeout(timerId);
    });

    resolver.onCancelled(() => {
      /* does nothing */
    });
  }
);

const result = await delay(100).run().promise();
$ASSERT(result == 100);

Here the computation takes a resolver argument, which contains methods to change the state of the task execution. resolver.resolve(value) signals that the execution succeeded, and provides a return value for it. resolver.reject(reason) signals that the execution failed, and provides the reason for its failure. resolver.cancel() cancels the execution of the task.

NOTE
While .cancel() will cancel the execution of the Task, the processes started by the task computation will not be automatically stopped. The task computation must stop those itself, as we'll see later in the section about cancelling tasks.

The cleanup function takes a callback to be invoked unconditionally once the Task finishes its execution or is cancelled. This gives the computation a chance to free the resources it has allocated while it was running. Resource handling with asynchronous exceptions and cancellations is difficult. While Task does help ensure that the composition of asynchronous tasks will respect the proper resource lifecycles, it's limited to cases where a Task allocates a particular resource, and frees it at the end of its execution. Shared resources across different tasks are a bigger problem that this library does not try to solve.

The onCancelled function takes a callback that'll be invoked when the task's execution is cancelled. The resolver also has an isCancelled boolean field that can be queried at any time to determine whether the task has been cancelled or not at that point in time.
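To illustrate how isCancelled and cleanup might be used together, here is a sketch of a computation written only against the resolver members described above. The name pollUntilDone and its parameters are hypothetical; the function it returns is meant to be passed to task(...):

```javascript
// Hypothetical computation factory: ticks `steps` times, `intervalMs` apart,
// and then resolves with the number of steps.
// Intended usage (assumption): task(pollUntilDone(3, 10))
const pollUntilDone = (steps, intervalMs) => (resolver) => {
  let remaining = steps;
  let timerId = null;

  const tick = () => {
    // Stop scheduling further work once the task has been cancelled.
    if (resolver.isCancelled) return;

    remaining -= 1;
    if (remaining === 0) {
      resolver.resolve(steps);
    } else {
      timerId = setTimeout(tick, intervalMs);
    }
  };

  timerId = setTimeout(tick, intervalMs);

  // Release the pending timer whether the task finishes or is cancelled.
  resolver.cleanup(() => clearTimeout(timerId));
};
```

Checking isCancelled inside the tick is a defensive choice: the cleanup callback already clears the pending timer, and the check covers any tick that is already about to run.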

Sometimes Task functions expect a Task as input or result value, but you already have the value that should be computed. While you can always resolve a Task synchronously, like so:

const one = task(resolver => resolver.resolve(1));

It's practical to use the of() and rejected() methods instead. The first creates a task that resolves successfully with a value, whereas rejected() creates a task that resolves with a failure:

const { of, rejected } = require('folktale/concurrency/task');

const one_ = of(1);
const two_ = rejected(2);

Running tasks

Creating a Task does not start any computation; it only provides a description of how to do something. In a sense, a Task is similar to a function definition. In order to execute the operations a Task defines, one must run it:

const { task } = require('folktale/concurrency/task');

const hello = task(resolver => resolver.resolve('hello'));

const helloExecution = hello.run();
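The function-definition analogy can be made concrete without Folktale at all. This plain JavaScript sketch (names are illustrative) contrasts a Promise, whose body runs as soon as it is constructed, with a function, whose body only runs when it is called:

```javascript
const log = [];

// A Promise starts its work as soon as it is constructed...
const eager = new Promise((resolve) => {
  log.push('promise body ran');
  resolve('hello');
});

// ...whereas a function definition only describes the work. Nothing in
// its body runs until the function is called.
const describeWork = () => {
  log.push('function body ran');
  return 'hello';
};

const beforeCall = log.slice(); // ['promise body ran']
describeWork();
const afterCall = log.slice();  // ['promise body ran', 'function body ran']
```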

Running a Task with the .run() method returns a TaskExecution object. This object allows one to cancel the execution of the task, or query its eventual value either as a JavaScript Promise or as a Folktale Future:

const value = await helloExecution.promise();
$ASSERT(value === 'hello');

helloExecution.future().map(value => {
  $ASSERT(value === 'hello');
});

NOTE
While Promises let you use JavaScript's async/await feature, they do not support nested promises, and cancellations are handled as rejections. Future is a simpler structure, which models all three states of a Task's eventual value, but does not support async/await.

TaskExecution also allows one to react to the result of running a task with the listen() method. This is useful for handling cancellations or rejections at the top level, where one doesn't need to combine the task with anything else:

helloExecution.listen({
  onCancelled: () => 'task was cancelled',
  onRejected:  (reason) => 'task was rejected',
  onResolved:  (value) => $ASSERT(value == 'hello')
});

Combining tasks concurrently

Task's primary goal is helping with concurrency, or the ordering of independent processes within an application. There are three primary categories of operations for this in Folktale:

  • Sequencing: when process A depends on process B, and thus must only be executed after B is done.
  • Choosing non-deterministically: A and B are independent processes that provide a similar answer. The program chooses the first process that finishes.
  • Waiting for related processes: A and B are independent processes, but C depends on both, and thus must only be executed after A and B are done.

Sequencing tasks

One task models an independent process that eventually computes a value. Sometimes one task depends on the result of another task, and thus may only run if that task resolves successfully. In order to sequence tasks we use the .chain() method:

const { task, of } = require('folktale/concurrency/task');

const concat = (a, b) => task(resolver => resolver.resolve(a + b));

const taskA = of('hello');
const taskB = of('world');

const theTask = taskA.chain(x => taskB.chain(y => concat(x, y)));

const result = await theTask.run().promise();
$ASSERT(result == 'helloworld');

In this case, taskB only starts after taskA finishes executing successfully, and concat only starts after both taskA and taskB finish executing. It makes sense for concat to wait on both taskA and taskB, as it needs the two tasks to finish successfully before it can be executed, but there's no reason for taskA and taskB to wait for each other.

Choosing the first of N tasks

Suppose you send a request to a server, but if you don't get a response in a couple of seconds the program should just give up. This scenario can be modelled as two independent processes: a request to a server, and a timer that fires after a couple of seconds. The program should pick whichever process resolves first. With Folktale's Task, this is done with the .or() method.

The .or() method combines two tasks such that the resulting task assimilates the result of the first one to resolve:

const { task } = require('folktale/concurrency/task');

const delay = (ms) => task(
  resolver => {
    const timerId = setTimeout(() => resolver.resolve(ms), ms);
    resolver.cleanup(() => {
      clearTimeout(timerId);
    });
  }
);

const timeout = (ms) => task(
  resolver => {
    const timerId = setTimeout(() => resolver.reject(ms), ms);
    resolver.cleanup(() => {
      clearTimeout(timerId);
    });
  }
);

const result = await delay(20).or(timeout(300))
                 .run().promise();
$ASSERT(result == 20);

const result2 = await delay(200).or(timeout(100))
                  .run().promise().catch(e => `timeout ${e}`);
$ASSERT(result2 == 'timeout 100');

As a convenience for combining a large or unknown number of tasks, the waitAny() function receives an array of Tasks to "or" together:

const { waitAny } = require('folktale/concurrency/task');

const result3 = await waitAny([
  delay(10),
  delay(20),
  delay(30)
]).run().promise(); // equivalent to `delay(10).or(delay(20).or(delay(30)))`
$ASSERT(result3 == 10);
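For comparison, here is a rough sketch of the same "first one wins" idea using plain Promises instead of Task. The timer and firstOf helpers are hypothetical. Promise.race picks the first result but leaves the slower timer running, so the caller has to stop it by hand, whereas the Task versions above register their timer cleanup once, inside the task itself:

```javascript
const log = [];

// Hypothetical helper: a timer exposed as a promise plus a way to stop it.
const timer = (ms) => {
  let timerId;
  const promise = new Promise((resolve) => {
    timerId = setTimeout(() => {
      log.push(`fired ${ms}`);
      resolve(ms);
    }, ms);
  });
  return { promise, stop: () => clearTimeout(timerId) };
};

// Hypothetical helper: resolves with the first result, then stops both
// timers explicitly, since Promise.race does not do that for us.
const firstOf = async (a, b) => {
  try {
    return await Promise.race([a.promise, b.promise]);
  } finally {
    a.stop();
    b.stop();
  }
};
```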

Waiting for many independent processes

If some computation depends on the results of more than one process you could use a nested sequence of .chain() calls to describe these dependencies, but that could be inefficient. If you don't care about the ordering of these processes, .chain() would impose an order on them. In essence, you wouldn't be getting any concurrency performance out of it.

Instead of sequencing unrelated tasks, you can combine them with the .and() operation. a.and(b) combines two tasks concurrently. That is, when you run the resulting task, it'll start both a and b concurrently, and wait for their results without imposing any ordering on them. The result of the task will be a tuple containing the values of a and b:

const { task } = require('folktale/concurrency/task');

const delay = (ms) => task(
  resolver => {
    const timerId = setTimeout(() => resolver.resolve(ms), ms);
    resolver.cleanup(() => {
      clearTimeout(timerId);
    });
  }
);

// This takes 100ms
const result = await delay(60).chain(x => delay(40).map(y => [x, y])).run().promise();
$ASSERT(result == [60, 40]);

// This takes 60ms
const result2 = await delay(60).and(delay(40)).run().promise();
$ASSERT(result2 == [60, 40]);

Because the tasks are started concurrently, and no ordering is imposed on them, the entire computation takes as long as the slowest of its processes. If you were to use .chain() to combine them, it would take the sum of all processes' times.
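The difference in ordering can be observed directly. The sketch below is only an analogy: it uses plain Promises (sequential await versus Promise.all) rather than .chain() and .and(), and tracedDelay is a hypothetical helper that records when each delay starts and ends:

```javascript
// Hypothetical helper: a promise-based delay that logs its start and end.
const tracedDelay = (ms, log) =>
  new Promise((resolve) => {
    log.push(`start ${ms}`);
    setTimeout(() => {
      log.push(`end ${ms}`);
      resolve(ms);
    }, ms);
  });

// Sequential: the second delay is only created after the first one ends,
// so the total time is roughly the sum of both delays.
const sequential = async (log) => {
  const x = await tracedDelay(60, log);
  const y = await tracedDelay(40, log);
  return [x, y];
};

// Concurrent: both delays are started up front and awaited together,
// so the total time is roughly that of the slowest delay.
const concurrent = (log) =>
  Promise.all([tracedDelay(60, log), tracedDelay(40, log)]);
```

Both functions resolve with [60, 40]; only the logged order of the start and end events differs.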

As a convenience for combining a large or unknown number of tasks, the waitAll() function receives an array of Tasks to "and" together. waitAll() returns a normalised array of the results instead of nested tuples:

const { waitAll } = require('folktale/concurrency/task');

const result3 = await delay(10).and(delay(20).and(delay(30))).run().promise();
$ASSERT(result3 == [10, [20, 30]]);

const result4 = await waitAll([
  delay(10),
  delay(20),
  delay(30)
]).run().promise();
$ASSERT(result4 == [10, 20, 30]);

Error handling

Sometimes processes will fail. You can recover from such failures using the .orElse() method. The method takes a function, passes to it the error value, if one happened, and expects it to return a new Task, whose state will be assimilated. In order to recover from the error you'd return a successful task, so computations that depend on it may proceed.

For example, this recovers from a rejection by substituting a successful value:

const { task, of, rejected } = require('folktale/concurrency/task');

let errors = [];


const result = await rejected('nope').orElse(reason => {
  errors.push(reason);
  return of('yay');
}).run().promise();

$ASSERT(result == 'yay');
$ASSERT(errors == ['nope']);

.orElse() can also return rejected or cancelled tasks, and their state will be assimilated likewise. This can be used to retry a computation a limited number of times:

errors = [];
const retry = (task, times) => {
  return task.orElse(reason => {
    errors.push(reason);
    if (times > 1) {
      return retry(task, times - 1);
    } else {
      return rejected('I give up');
    }
  });
};

let runs = 0;
const ohNoes = task(r => {
  runs += 1;
  r.reject('fail');
});

try {
  const result2 = await retry(ohNoes, 3).run().promise();
  throw 'never happens';
} catch (error) {
  $ASSERT(runs == 3);
  $ASSERT(errors == ['fail', 'fail', 'fail']);
  $ASSERT(error == 'I give up');
}

Properties

Combining tasks

waitAll(tasks)

Constructs a new task that waits for all of the given tasks to resolve. The result of the new task is an array with the results of the input tasks, if they all succeed, or the first failure if any of them fails.

Experimental
waitAny(tasks)

Constructs a new task that waits for any of the given tasks to resolve. The result of the first task to resolve is assimilated by the new task.

Experimental

Constructing

of(value)

Constructs a Task that resolves with a successful value.

Experimental
rejected(reason)

Constructs a Task that resolves with a rejected value.

Experimental
task(computation)

Constructs a Task and associates a computation to it. The computation is executed every time the Task is run, and should provide the result of the task: a success or failure along with its value.

Experimental

Converting from function with Node-style callback

fromNodeback(aNodeback)

A convenience method for the folktale/conversions/nodeback-to-task module.

Experimental

Converting from other types

fromPromised(aPromiseFn)

Converts a Promise-yielding function to a Task-yielding function.

Experimental

Sequencing tasks

do: taskDo(generatorFn)

Allows using a direct style of programming (similar to async/await for Promises) to sequence Tasks. The function must return a Task.

Experimental

Types

_Task: Task(computation)

Tasks model asynchronous processes with automatic resource handling. They are generally constructed with the task function.

Experimental
_TaskExecution()

Represents the execution of a Task, with methods to cancel it, react to its results, and retrieve its eventual value. TaskExecution objects aren't created directly by users, but instead returned as a result from Task's run() method.

Experimental

Source Code

Defined in source/concurrency/task/index.js at line 17, column 0
{
  of: Task.of,
  rejected: Task.rejected,
  task: require('./task'),
  waitAny: require('./wait-any'),
  waitAll: require('./wait-all'),
  do: require('./do'),
  _Task: Task,
  _TaskExecution: require('./_task-execution'),

  /*~
   * stability: experimental
   * type: |
   *    forall s, e:
   *      ((Any..., (e, s) => Void) => Void)
   *      => (Any...)
   *      => Task e s
   */
  fromNodeback(aNodeback) {
    return require('folktale/conversions/nodeback-to-task')(aNodeback);
  },

  /*~
   * stability: experimental
   * type: |
   *   forall e, v:
   *     ((Any...) => Promise v e) => (Any...) => Task e v
   */
  fromPromised(aPromiseFn) {
    return require('folktale/conversions/promised-to-task')(aPromiseFn);
  }
}
Stability
experimental
Licence
MIT
Module
folktale/concurrency/task
Authors
Copyright
(c) 2013-2017 Quildreen Motta, and CONTRIBUTORS
Authors
  • Quildreen Motta
Maintainers
  • Quildreen Motta <queen@robotlolita.me> (http://robotlolita.me/)