Is it OK to use Collection#parallelStream(), or other potentially multi-threaded code, within Gradle plugin code?

Is it only OK in certain contexts / phases, but not in others?

Or is it only OK if I use some java.util.concurrent.Executor (or similar threading utility) provided by Gradle?

Have a look at the worker API, which allows you to perform work in parallel while honoring user settings like --max-workers. It also ensures that your code executes in a safe context where it can’t access any objects that aren’t safe to use from a worker thread.
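
For example (the task and action names below are just placeholders, not from any real plugin), a task can inject a WorkerExecutor and submit work items like this:

    import java.io.File;
    import javax.inject.Inject;
    import org.gradle.api.DefaultTask;
    import org.gradle.api.tasks.TaskAction;
    import org.gradle.workers.IsolationMode;
    import org.gradle.workers.WorkerExecutor;

    public class ParseTask extends DefaultTask {
        private final WorkerExecutor workerExecutor;

        @Inject
        public ParseTask(WorkerExecutor workerExecutor) {
            this.workerExecutor = workerExecutor;
        }

        @TaskAction
        public void parse() {
            for (File file : getInputs().getFiles()) {
                // Each item runs on a Gradle-managed worker thread.
                workerExecutor.submit(ParseAction.class, config -> {
                    config.setIsolationMode(IsolationMode.NONE);
                    config.setParams(file); // params must be Serializable
                });
            }
            workerExecutor.await(); // waits only for the items this task submitted
        }
    }

    // Gradle instantiates this on a worker thread, injecting the params
    // into the constructor. Would live in its own source file.
    public class ParseAction implements Runnable {
        private final File file;

        @Inject
        public ParseAction(File file) {
            this.file = file;
        }

        @Override
        public void run() {
            // do the actual work with `file`
        }
    }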

@st_oehme

Thanks, I’ll look into that.

Could the Worker API be set up so that parallelStream() uses the Worker API thread pool?

I would imagine that using a ForkJoinPool inside the Worker API would accomplish that.

parallelStream() runs its tasks in a given ForkJoinPool if its terminal operation is invoked from a thread of that pool; otherwise, it uses the common ForkJoinPool.

It seems that this is the only way to specify which thread pool is used by parallelStream() (but maybe I’m mistaken).
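
E.g. something like this (relying on that implementation detail; the pool size and data are just for illustration):

    import java.util.Arrays;
    import java.util.List;
    import java.util.concurrent.ForkJoinPool;

    public class ParallelStreamInPool {
        public static void main(String[] args) throws Exception {
            ForkJoinPool pool = new ForkJoinPool(4);
            List<String> inputs = Arrays.asList("a", "b", "c");
            try {
                // The parallel stream's tasks run in `pool`, not the common
                // pool, because the terminal operation is invoked from one of
                // `pool`'s threads. This is an implementation detail, not
                // guaranteed by the spec.
                pool.submit(() ->
                    inputs.parallelStream()
                          .map(String::toUpperCase)
                          .forEach(System.out::println)
                ).get(); // block until the stream completes
            } finally {
                pool.shutdown();
            }
        }
    }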

I don’t know if there are any negatives to using ForkJoinPool instead of another Executor, or how easy it would be to introduce ForkJoinPool to the Worker API.

I’m not sure we’ll want to bind ourselves to a certain implementation. If you want to do more parallel processing inside a work item, you can use the worker API again to schedule these additional items.

@st_oehme

Thanks.

A few questions about the Worker API:

The JavaDoc for WorkerExecutor and other docs I’ve seen for the Worker API seem slightly ambiguous to me. How exactly does it work?

  1. Injection
    Are injected WorkerExecutors all the same instance, or is a separate instance used for each injection?

    Either way, I assume there’s only one thread pool used by all injected WorkerExecutors.

    I also assume that separate injected instances (if they’re indeed used instead of a single shared instance) would only be used for segmenting the set of actions upon which await() waits.

    Are these assumptions correct?

  2. submit()
    Are submitted actions guaranteed to be dispatched to workers in submission order?

  3. await()
    Does await() prevent additional actions from being submitted, either to this WorkerExecutor, or to any WorkerExecutor backed by the same thread pool?

    If not, if subsequent actions are submitted, does await() only wait for the completion of actions submitted prior to the call to await(), or does it also wait for subsequently submitted actions to complete?

    Either way, if a single WorkerExecutor is shared across injections, then any task calling await() can be delayed by actions submitted by other tasks, which doesn’t make any sense to me.

  4. Returning results from submitted actions
    What is the best way to return results from submitted actions to the thread that submitted them?

    Given that:

    1. submitting code does not instantiate the submitted Runnable
    2. all the action params must implement Serializable
    3. there don’t seem to be any methods for retrieving results via the Worker API (and how could there be any, given that Runnable#run() has no return value)

    the best solution I’ve devised involves (this is theoretical, as I haven’t yet tried it):

    1. using IsolationMode.NONE

    2. if all the submitted Runnables are guaranteed to be instantiated in the submitting thread, then I could store a results container in a static ThreadLocal, get the results container for the current thread in the constructor of my Runnable, and synchronize writing results to the result container in my Runnable#run()

    3. if the submitted Runnables are not guaranteed to be instantiated in the submitting thread, then I could use some other static map with values that are results containers, and with keys that are unique tokens per thread (or per action submitter)

The executor is global, but it knows where you are scheduling items from, so when you call await() it will wait for the children of the operation that called it. I.e. different tasks using the executor will only wait on their own work items.

Scheduling currently happens in submission order, but there is no guarantee of that. Work items are meant to be isolated, so there should be no implicit temporal dependencies between them.

There is no way to return results. If you have a good use case for that, please open a feature request on Github. Static state would be against the purpose of the worker API, which is all about safety.

@st_oehme
Thanks for the info.

I experimented and found that the submitted Runnables are instantiated in threads other than the submitting thread, so I’ll try a token-based approach.

My current use case is for parsing module-info.java files. If a task has more than one in its inputs, there’s no need to parse them sequentially, so I submit actions to a WorkerExecutor (if there’s only one module-info.java, however, I parse it in the original task thread). The main task thread needs certain info that is parsed from the files, hence I need to get results from the workers to the submitter.

Using Thread.currentThread().getId() as the Map key worked where ThreadLocal didn’t, so I’ve managed to get Workers to return data to the submitting thread.
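
In case it helps anyone else, the approach boils down to something like this (the class and method names are mine, not from any API):

    import java.util.Map;
    import java.util.Queue;
    import java.util.concurrent.ConcurrentHashMap;
    import java.util.concurrent.ConcurrentLinkedQueue;

    // Only works with IsolationMode.NONE, where the workers share the
    // submitting task's JVM and classloader, so static state is visible
    // to both sides.
    public final class WorkerResults {
        private static final Map<Long, Queue<Object>> RESULTS = new ConcurrentHashMap<>();

        private WorkerResults() {}

        // The task calls this before submitting, using its own thread id as
        // the token, and passes the token to each worker as a Serializable param.
        public static long open() {
            long token = Thread.currentThread().getId();
            RESULTS.put(token, new ConcurrentLinkedQueue<>());
            return token;
        }

        // Worker Runnables call this from run() with the token they were given.
        public static void publish(long token, Object result) {
            RESULTS.get(token).add(result);
        }

        // The task calls this after workerExecutor.await().
        public static Queue<Object> collect(long token) {
            return RESULTS.remove(token);
        }
    }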

If you want to natively support returning results from a worker to the submitting thread, I would add something like the following method to WorkerExecutor (or to a new sub-interface, if you need to preserve backwards compatibility for existing third-party implementations of the interface):

    <R extends Serializable> void submit(
        Class<? extends Callable<R>> actionClass,
        Consumer<? super R> callback,
        Action<? super WorkerConfiguration> configAction)

Obviously, the Worker API would take the return value from running the instantiated Callable, and pass it as the argument to callback.accept().

To prevent threading issues, only one callback would run at a time. If a worker finishes while another callback is still running, its callback would be queued.
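
Usage might then look something like this (ParseModuleInfo, ModuleInfo, and the surrounding variables are all hypothetical):

    // Hypothetical: ParseModuleInfo implements Callable<ModuleInfo>,
    // and ModuleInfo implements Serializable.
    List<ModuleInfo> collected = new ArrayList<>(); // written to only by callbacks, which run one at a time

    for (File file : moduleInfoFiles) {
        workerExecutor.submit(ParseModuleInfo.class,
            (ModuleInfo info) -> collected.add(info), // receives each worker's return value
            config -> {
                config.setIsolationMode(IsolationMode.NONE);
                config.setParams(file);
            });
    }
    workerExecutor.await(); // by this point every callback has run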