The Ingester Service process forwards items and the configured pipelet-configuration to the Plumber Service, where Pipelets get executed.

  • pipelet-configuration : The Pipelet to be run and its configuration

    • Pipelet Configuration is stored within the pipeline_workflow

...

and waits for its response

Bottleneck Plumber

  • Previously, only one Plumber worker process was spawned by default

  • That may lead to inefficient pipeline processing because of one slow step

    • Ingester process may fail with a timeout if the plumber doesn’t manage to respond in time (TimeoutError)

    • That usually happens for batches (default N=1000) that contain mostly large PDFs, combined with a Pipelet step that performs computationally heavy, CPU-bound tasks (like the NLP-Tagger)

Configuration to Increase Throughput

Ingester Service

  • The Ingester service can spawn multiple worker processes to parallelise the processing of batched steps like pipelet, language-detection, ml-workflow, etc.

  • One Ingester worker process consumes one batch and splits it into √len(batch_items) mini-batches to allow further parallelisation and increase throughput (since Release 3.3.4).

    • Those mini-batches are handled and sent concurrently to the Plumber Service, using a ThreadPool maintaining step_plumber_mini_batch_threads threads.
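The mini-batch handling described above can be sketched roughly as follows. This is a minimal illustration, not the Ingester's actual code: the function names and the send_to_plumber stand-in are hypothetical, and rounding √len(batch_items) down to an integer is an assumption, since the exact rounding is not stated here.

```python
import math
from concurrent.futures import ThreadPoolExecutor


def split_into_mini_batches(batch_items):
    """Split a batch into roughly sqrt(len(batch_items)) mini-batches."""
    if not batch_items:
        return []
    # Assumption: the square root is rounded down to an integer count.
    n_batches = max(1, math.isqrt(len(batch_items)))
    size = math.ceil(len(batch_items) / n_batches)
    return [batch_items[i:i + size] for i in range(0, len(batch_items), size)]


def send_to_plumber(mini_batch):
    # Hypothetical stand-in for the request sent to the Plumber Service.
    return [f"processed:{item}" for item in mini_batch]


def process_batch(batch_items, step_plumber_mini_batch_threads=2):
    """Send mini-batches concurrently, at most N requests in flight."""
    mini_batches = split_into_mini_batches(batch_items)
    with ThreadPoolExecutor(max_workers=step_plumber_mini_batch_threads) as pool:
        # pool.map returns results in the order of the submitted mini-batches.
        results = pool.map(send_to_plumber, mini_batches)
        return [item for mini_batch in results for item in mini_batch]
```

With the default batch size of 1000, this sketch produces 31 mini-batches, of which at most step_plumber_mini_batch_threads are in flight per Ingester process at any time.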

Code Block
# /etc/squirro/ingester.ini
[ingester]
processors = 2

[pipeline]
step_plumber_mini_batch_threads = 2

Note

  • ingester.ini has a related setting, processor.workers

    • That setting is used only for pipeline-steps that get executed in parallel using a thread-pool (like webshot-step).

Plumber Service

  • With the example configuration above, the Plumber Service should spawn 4 workers so that enough resources are always ready to handle the mini-batches sent by the Ingester processes (ingester.processors x ingester.pipeline.step_plumber_mini_batch_threads = plumber.server.max_spare = 4)
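As a quick check of that arithmetic, the following sketch derives the worker count from the Ingester settings using Python's standard configparser. The helper name required_plumber_workers is hypothetical, and the INI text is inlined here rather than read from /etc/squirro/ingester.ini so that the example is self-contained.

```python
import configparser

# Inlined copy of the example Ingester configuration.
INGESTER_INI = """
[ingester]
processors = 2

[pipeline]
step_plumber_mini_batch_threads = 2
"""


def required_plumber_workers(ini_text):
    """Return processors x step_plumber_mini_batch_threads."""
    parser = configparser.ConfigParser()
    parser.read_string(ini_text)
    processors = parser.getint("ingester", "processors")
    threads = parser.getint("pipeline", "step_plumber_mini_batch_threads")
    return processors * threads


print(required_plumber_workers(INGESTER_INI))  # prints 4
```

The result is the value to use for plumber.server.max_spare, so raising either Ingester setting means raising the Plumber worker count accordingly.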

...