Troy Sankey
04/24/2020, 6:44 PM

Christopher Harris
04/24/2020, 8:40 PM
def execute(pipeline_config: PipelineConfiguration):
    project = pipeline_config.project
    with Flow("test-flow") as flow:
        # Service Initialization
        source = init_source(project, pipeline_config.source)
        sinks = init_sink.map(unmapped(project), pipeline_config.sinks)
        # Perform ETL
        documents = pull(source)
        push.map(documents, sinks)
    flow.run(executor=LocalExecutor())
The problem with this approach is that it does a one-to-one mapping, like the first image.
I want a many-to-one mapping, like the second image.
Effectively I am trying to recreate the following logic:
for each document:
    for each sink:
        sink.push(doc)
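One way to get that nested loop with Prefect's mapping is to build the document/sink cross product in an intermediate task and map a push task over the pairs. This is only a sketch with toy stand-in tasks (pull_documents, init_sinks, push_pair are made up for illustration, not the original pipeline's API):

from itertools import product

from prefect import task, Flow

# Toy stand-ins for the source/sink tasks above (hypothetical, illustration only).
@task
def pull_documents():
    return ["doc-1", "doc-2", "doc-3"]

@task
def init_sinks():
    return ["sink-a", "sink-b"]

@task
def cross(documents, sinks):
    # Pair every document with every sink so a single .map() call
    # reproduces the nested "for each document / for each sink" loop.
    return list(product(documents, sinks))

@task
def push_pair(pair):
    doc, sink = pair
    print(f"pushing {doc} to {sink}")  # replace with the real sink.push(doc)

with Flow("many-to-one-push") as flow:
    documents = pull_documents()
    sinks = init_sinks()
    pairs = cross(documents, sinks)
    push_pair.map(pairs)

flow.run()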
Brad
04/25/2020, 2:08 AM

Tom B
04/25/2020, 3:34 AM

Joe Schmid
04/25/2020, 3:50 AM
on_execute() (or the existing on_start()) called from an Environment. I can pass the environment's self.logger -- that seems clunky, but the following doesn't seem to work:
def on_execute(parameters: Dict[str, Any], provider_kwargs: Dict[str, Any]) -> None:
    logger = prefect.context.get("logger")
    logger.info("Checking Flow run parameters: {}".format(parameters))
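If the problem is that prefect.context has no "logger" entry outside of a task run, one thing to try (just a sketch, not a confirmed fix for the Environment callbacks) is asking Prefect's logging utilities for a named logger directly:

from typing import Any, Dict

from prefect.utilities.logging import get_logger

def on_execute(parameters: Dict[str, Any], provider_kwargs: Dict[str, Any]) -> None:
    # prefect.context typically only carries a "logger" entry during a task run;
    # a named logger from Prefect's logging utilities is available anywhere.
    logger = get_logger("on_execute")
    logger.info("Checking Flow run parameters: %s", parameters)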
Arsenii
04/25/2020, 5:18 AM
Task 'Some important work[19]': Calling task.run() method...
and it just "runs" this task forever, but there's no changed state from ... to Running afterwards, when using a LocalExecutor. I thought that this maybe had something to do with me using raise SKIP to "filter" some elements, but rewriting using a FilterTask didn't fix the problem.

Ziyao Wei
04/25/2020, 2:44 PM

Ziyao Wei
04/25/2020, 4:34 PM
max_active_runs or similar options?

Daniel
04/26/2020, 9:47 AM

Ziyao Wei
04/27/2020, 2:05 AM

Kostas Chalikias
04/27/2020, 9:04 AM

Kamil Okáč
04/27/2020, 1:11 PM

David Ojeda
04/27/2020, 2:18 PM
prefect.context. This includes some secrets and some URLs and object ids on some internal rest services.
Before we used the prefect server, we had a small cli that would populate the prefect.context and use it when running the flow:
with prefect.context(**our_custom_vars):
    flow_state = flow.run(parameters=flow_parameters, ...)
My ultimate objective is to have a flow with a default context that I can run from the UI or schedule it with that context when deploying it.
In my question of April 19th, @Jeremiah pointed out that there is no way to set these contexts at the moment and that you would discuss it internally later… is there any update on this front?
Otherwise, I am looking for alternatives or workarounds:
One workaround would be to understand where the context would be set on an agent, worker, or runner (I am not sure which one). Jeremiah also pointed out that the FlowRunner requests the context from the server, but I can’t find where or even if a flow has its context saved anywhere (there does not seem to be a field named context on the flow_by_pk query).
Another workaround would be to populate environment variables or change the default config.toml of the agent, worker, or runner (I am not sure which one) so that the prefect.context is populated with these values. I am not sure if this would work (see the sketch after this list of workarounds).
Another workaround may be to override the __setstate__ and __getstate__ methods, so that the flow can retrieve the context when unpickled. I am not sure if this would work either.
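For the environment-variable workaround, a rough sketch of the idea (based on Prefect 1.x's documented PREFECT__CONTEXT__* config behaviour; the exact key casing and merge rules should be verified for your version):

import os

# Hypothetical context value: Prefect's config loader turns
# PREFECT__CONTEXT__<KEY> environment variables into config.context entries,
# which are merged into prefect.context when prefect is imported.
os.environ["PREFECT__CONTEXT__MY_SERVICE_URL"] = "https://internal.example.com/api"

import prefect

# Inspect the merged context to confirm the key (and its casing) arrived.
print(dict(prefect.context))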
Any ideas on which of these workarounds may be the best bet here?

Jacques
04/27/2020, 2:24 PM

Jacques
04/27/2020, 2:25 PM
trigger=all_successful for the mapped function, but it only works for unmapped tasks.

alvin goh
04/27/2020, 2:42 PM

An Hoang
04/27/2020, 2:48 PM
X does want to push back because docker is involved, and it has many unsolved aspects since the daemons run as root. It didn’t look like the prefect ui could run in Singularity, which is the user-space container management thing that they were talking about.
If it were possible to install the prefect ui without docker, either as a library or on a server, I think that would raise many fewer concerns from the systems team.
Matias Godoy
04/27/2020, 3:42 PM

Pedro Machado
04/28/2020, 2:03 AM

alvin goh
04/28/2020, 7:31 AM
from prefect import task, Flow
from prefect.engine import signals
import random
from prefect.triggers import all_finished

@task()
def randomly_fail():
    x = random.random()
    if x > 0.7:
        raise ValueError("x is too large")

@task(trigger=all_finished)
def print_result(l):
    print(l)
    return l

with Flow("random-mapping") as f:
    result = [randomly_fail() for i in range(10)]
    print_result(result)

f.run()
The output shows that the list operation failed, so I can't print the final results.
[2020-04-28 07:33:16,639] INFO - prefect.TaskRunner | Task 'randomly_fail': Starting task run...
[2020-04-28 07:33:16,640] ERROR - prefect.TaskRunner | Unexpected error: ValueError('x is too large',)
Traceback (most recent call last):
  File "c:\tools\anaconda3\envs\prefect\lib\site-packages\prefect\engine\runner.py", line 48, in inner
    new_state = method(self, state, *args, **kwargs)
  File "c:\tools\anaconda3\envs\prefect\lib\site-packages\prefect\engine\task_runner.py", line 884, in get_task_run_state
    self.task.run, timeout=self.task.timeout, **raw_inputs
  File "c:\tools\anaconda3\envs\prefect\lib\site-packages\prefect\utilities\executors.py", line 185, in timeout_handler
    return fn(*args, **kwargs)
  File "<ipython-input-6-020915229ace>", line 10, in randomly_fail
    raise ValueError("x is too large")
ValueError: x is too large
[2020-04-28 07:33:16,647] INFO - prefect.TaskRunner | Task 'randomly_fail': finished task run for task with final state: 'Failed'
[2020-04-28 07:33:16,662] INFO - prefect.TaskRunner | Task 'List': Starting task run...
[2020-04-28 07:33:16,669] INFO - prefect.TaskRunner | Task 'List': finished task run for task with final state: 'TriggerFailed'
[2020-04-28 07:33:16,685] INFO - prefect.TaskRunner | Task 'print_result': Starting task run...
Trigger was "all_successful" but some of the upstream tasks failed.
[2020-04-28 07:33:16,693] INFO - prefect.TaskRunner | Task 'print_result': finished task run for task with final state: 'Success'
[2020-04-28 07:33:16,694] INFO - prefect.FlowRunner | Flow run SUCCESS: all reference tasks succeeded
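The TriggerFailed in the log comes from the auto-generated 'List' task that collects the ten results: it keeps the default all_successful trigger, so it fails before print_result ever sees the data. One possible workaround, sketched below and not a confirmed recommendation, is to restructure the flow to use mapping so that no intermediate collection task is created:

import random

from prefect import task, Flow
from prefect.triggers import all_finished

@task
def randomly_fail(x):
    if x > 0.7:
        raise ValueError("x is too large")
    return x

@task(trigger=all_finished)
def print_result(results):
    # Failed children show up here as their exceptions, successes as values.
    print(results)
    return results

with Flow("random-mapping") as f:
    # Mapping yields a single upstream task, so print_result receives the
    # gathered child results directly and its all_finished trigger applies.
    mapped = randomly_fail.map([random.random() for _ in range(10)])
    print_result(mapped)

f.run()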
David Hogarty
04/28/2020, 1:54 PM

David Hogarty
04/28/2020, 1:55 PM

David Hogarty
04/28/2020, 1:55 PM

David Hogarty
04/28/2020, 1:55 PM

David Hogarty
04/28/2020, 1:55 PM

Thomas Wiecki
04/28/2020, 3:18 PM

Thomas Wiecki
04/28/2020, 3:18 PM

Thomas Wiecki
04/28/2020, 3:29 PM

Will Milner
04/28/2020, 3:37 PM

David Ojeda
04/28/2020, 4:37 PM
GracefulFail, which extends Success (we use it for tasks that did not succeed but should not stop the rest of the downstream tasks).
Unfortunately, using these on prefect server gives a 500, leaving the task out of date on the UI…
Failed to set task state with error: ClientError([{'message': '{\'_schema\': "Invalid data type: [None, {\'_schema\': \'Unsupported object type: GracefulFail\'}]"}', 'locations': [{'line': 6, 'column': 13}], 'path': ['set_task_run_states', 'states', 0, 'id'], 'extensions': {'code': 'INTERNAL_SERVER_ERROR'}}])
Traceback (most recent call last):
  File "/opt/pysetup/.venv/lib/python3.8/site-packages/prefect/engine/cloud/task_runner.py", line 115, in call_runner_target_handlers
    state = self.client.set_task_run_state(
  File "/opt/pysetup/.venv/lib/python3.8/site-packages/prefect/client/client.py", line 1082, in set_task_run_state
    result = self.graphql(
  File "/opt/pysetup/.venv/lib/python3.8/site-packages/prefect/client/client.py", line 225, in graphql
    raise ClientError(result["errors"])
prefect.utilities.exceptions.ClientError: [{'message': '{\'_schema\': "Invalid data type: [None, {\'_schema\': \'Unsupported object type: GracefulFail\'}]"}', 'locations': [{'line': 6, 'column': 13}], 'path': ['set_task_run_states', 'states', 0, 'id'], 'extensions': {'code': 'INTERNAL_SERVER_ERROR'}}]
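For reference, a minimal sketch of a custom state like the one described (the class body and usage are illustrative guesses, not the actual implementation; the "Unsupported object type: GracefulFail" error above is the server-side state schema rejecting any state subclass it does not know about):

from prefect import task
from prefect.engine.signals import ENDRUN
from prefect.engine.state import Success


class GracefulFail(Success):
    """A task that did not really succeed, but should not block downstream tasks."""


@task
def maybe_fail():
    # End the task run in the custom state (illustrative usage only).
    raise ENDRUN(state=GracefulFail("handled failure gracefully"))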