as
09/01/2020, 2:37 PM
…target file as a task parameter.
So if this target file is present it should skip the execution of this task.
I want to be able to programmatically (e.g. through a parameter) force execution of this task (and thus ignore/overwrite the target file).
I first thought using the cache_validator task parameter would be the way to go, but I see in the documentation that it is deprecated.
Is there a recommended way to achieve this behavior?
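One hedged workaround (every name and path below is made up): expose the force flag as an ordinary Parameter and have a small upstream task delete the target file when it is set, so the checkpointed task no longer finds a cached target:

import os

from prefect import Flow, Parameter, task
from prefect.engine.results import LocalResult

# Hypothetical helper: remove the target file when `force` is true, so the
# checkpointed task below no longer sees a cached target and runs again.
@task
def maybe_clear_target(force: bool, path: str = "output/data.txt") -> bool:
    if force and os.path.exists(path):
        os.remove(path)
    return force

# Checkpointing must be enabled (e.g. when running against a backend) for the
# target to be honoured.
@task(target="data.txt", checkpoint=True, result=LocalResult(dir="output"))
def build_target_file() -> str:
    return "expensive result"

with Flow("force-rerun-sketch") as flow:
    force = Parameter("force", default=False)
    cleared = maybe_clear_target(force)
    data = build_target_file(upstream_tasks=[cleared])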
Thanks

Saranya Elumalai
09/01/2020, 7:54 PM
@task(max_retries=3, retry_delay=timedelta(seconds=5))
def write_delta_lookup_table():
    ......
My image build failed with the following error:
ValueError: Task <Task: write_delta_lookup_table> has retry settings but some upstream dependencies do not have result types. See <https://docs.prefect.io/core/concepts/results.html> for more details.
Why does a task with retries require result types? I am not passing values from one task to another. Also, the retry example at https://docs.prefect.io/core/concepts/tasks.html#overview doesn't show the use of a result type. What am I missing here?
(Any documentation with a simple retry use case would be very helpful.)
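For reference, a minimal sketch of one way the check can be satisfied, giving the upstream task an explicit result so a retry can re-read its inputs (names are illustrative):

from datetime import timedelta

from prefect import Flow, task
from prefect.engine.results import LocalResult

# Give the upstream task an explicit result so its output can be re-read if the
# retrying task needs to run again.
@task(result=LocalResult(dir="./results"))
def fetch_rows():
    return [1, 2, 3]

@task(max_retries=3, retry_delay=timedelta(seconds=5))
def write_delta_lookup_table(rows):
    print(f"writing {len(rows)} rows")

with Flow("retry-sketch") as flow:
    write_delta_lookup_table(fetch_rows())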

asm
09/01/2020, 10:58 PM

Eric
09/02/2020, 12:42 AM
…client.set_flow_run_state(flow_run_id="<flow_run_id>", state=PAUSE)). The flow-run state in the UI is set to PAUSE, and task B continues to execute until the end. But once task B is finished, I use client.set_flow_run_state to set this flow run back to running (or resume). Task C then gets this error (I posted it in the thread) and the whole flow run is still running (task C is Pending); it seems like an abnormal state because this flow run will keep running but do nothing.
May I ask if this is normal? Thanks 🙂

josh
09/02/2020, 12:23 PM
0.13.5 has been released and here are a few notable changes:
🚸  Introduce better monitoring of mapped children
♻️ K8s agent manages Prefect job lifecycles
👩‍⚕️ Improved automatic health checks for common user hiccups
A big thank you to our contributors who helped out with this release! Full changelog:

Luis Muniz
09/02/2020, 1:40 PM

Matias Godoy
09/02/2020, 1:50 PM
Last State Message
[2 Sep 2020 3:24pm]: Failed to load and execute Flow's environment: FileNotFoundError(2, 'No such file or directory')

Max Ernst
09/02/2020, 2:00 PM

Shaun Cutts
09/02/2020, 4:07 PM
…towel doing?) Some explanation as to how the pieces interact (and/or how I should best go about getting them to interact in my scenario) would be great.
Thanks!

DJ Erraballi
09/02/2020, 4:28 PM

DJ Erraballi
09/02/2020, 4:50 PM

DJ Erraballi
09/02/2020, 4:51 PM

Hawkar Mahmod
09/02/2020, 5:09 PM
with Flow('event-export', schedule) as flow:
    api_key = EnvVarSecret('SOME_API_KEY')
    offset = get_last_offset()
    event_data = extract_email_event_data(api_key, offset)
    transformed_data = transform(event_data)
    send_to_firehose(transformed_data)
    update_last_offset(offset=offset+100)
I would like to be able to call the extract task for as long as necessary, incrementing the offset on each occasion, and I would like to transform and send each extracted set of events in one swoop. However, I can't seem to figure out how to put this into a while loop with the result of the extract task as my end condition, i.e. if no events are left then stop the flow.
I've already tried LOOP but this is no use to me because I don't want to wait until I get all the results back, which could take hours.
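For reference, the LOOP pattern being referred to looks roughly like this (fetch_events is a made-up placeholder for the real API call); note it accumulates every page before downstream tasks run, which is exactly the limitation described above:

import prefect
from prefect import task
from prefect.engine.signals import LOOP

def fetch_events(api_key, offset):
    # Placeholder for the real API call; returning [] ends the loop.
    return []

@task
def extract_all_events(api_key, page_size=100):
    # State from the previous loop iteration, if any.
    loop_payload = prefect.context.get("task_loop_result", {})
    offset = loop_payload.get("offset", 0)
    events = fetch_events(api_key, offset)
    if not events:
        # No more pages: return everything accumulated so far.
        return loop_payload.get("events", [])
    raise LOOP(
        message=f"Fetched page at offset {offset}",
        result={
            "offset": offset + page_size,
            "events": loop_payload.get("events", []) + events,
        },
    )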

David Elliott
09/02/2020, 7:41 PM
Step 9/9 : RUN python /opt/prefect/healthcheck.py '["/opt/prefect/flows/error-test.prefect"]' '(3, 7)'
---> Running in 13eb44f7f5db
Beginning health checks...
System Version check: OK
Traceback (most recent call last):
  File "/opt/prefect/healthcheck.py", line 121, in <module>
    flows = cloudpickle_deserialization_check(flow_file_path)
  File "/opt/prefect/healthcheck.py", line 43, in cloudpickle_deserialization_check
    flows.append(cloudpickle.load(f))
  File "/usr/local/lib/python3.7/site-packages/prefect/core/edge.py", line 149, in __hash__
    self.flattened,
AttributeError: 'Edge' object has no attribute 'flattened'
I’ve simplified my original setup right down to the attached script to try and trace the issue - I’m getting the error on version 0.12.6 onwards - might it be related to this change added in v0.12.6? v0.12.5 builds OK.
Hoping someone could let me know if it’s something wrong in my code or if they’re able to reproduce it?
Locally I’m running Python 3.7 & Prefect 0.13.5 (though this shouldn’t matter as it’s building the Docker container with the specified base image).
Thanks!

Jeff Brainerd
09/03/2020, 1:59 AM
400 Client Error: Bad Request for url: <https://api.prefect.io/graphql>
This error began last night (~9PM eastern) in code that has been working fine for weeks.
We are using Prefect Cloud and Prefect Core 0.13.2 (with a 0.12.x Fargate agent IIRC). Wondering if this is related to the recent upgrade... didn’t see any breaking changes in the release notes that look suspicious. Thanks 🙏

Chris Goddard
09/03/2020, 2:41 AM

psimakis
09/03/2020, 7:46 AM
…0.12.2 to 0.13.5, a connectivity issue came up with the GraphQL component. Prefect Server is running on a different server, but the GraphQL URL remains <http://localhost:4200/graphql>. server.ui.graphql_url was working great with version 0.12.2, but now I can't find any way to configure the GraphQL URL properly.
Below you will find the `config.toml`:
$ cat ~/.prefect/config.toml
[logging]
level = "INFO"
[api]
url = "<http://192.168.40.180:4200>"
[server.database]
host_port = "6543"
[context.secrets]
SLACK_WEBHOOK_URL = '<https://hooks.slack.com/services/xx/XX/Xx>'
[server.ui]
graphql_url = "<http://192.168.40.180:4200/graphql>"
In the image you can see a POC of the case.
I'm a little bit confused about the old and the new way to configure the Prefect server. Do you have any idea about this issue?
If this issue has already been reported, feel free to delete this message.
Thanks in advance.

Jacob Blanco
09/03/2020, 7:55 AM

ARun
09/03/2020, 12:34 PM
ClientError: Malformed response received from Cloud - please ensure that you have an API token properly configured.
Is there another token I need to configure?

Thomas Hoeck
09/03/2020, 1:14 PM
from prefect import task
import prefect

@task
def extract():
    logger = prefect.context.get("logger")
    logger.debug("Executing")
    return [1,2,3]

extract.run()
as this yields
---------------------------------------------------------------------------
AttributeError                            Traceback (most recent call last)
<ipython-input-25-9ca06496a242> in <module>
     11
     12
---> 13 extract.run()

<ipython-input-25-9ca06496a242> in extract()
      7 def extract():
      8     logger = prefect.context.get("logger")
----> 9     logger.debug("Executing")
     10     return [1,2,3]
     11

AttributeError: 'NoneType' object has no attribute 'debug'
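A hedged workaround for interactive testing, assuming the goal is simply to call .run() outside of a flow run: the "logger" context key is only set during a real run, so it can be supplied manually:

import prefect
from prefect import task
from prefect.utilities.logging import get_logger

@task
def extract():
    logger = prefect.context.get("logger")
    logger.debug("Executing")
    return [1, 2, 3]

# Outside a flow run the "logger" key is missing, so provide one explicitly.
with prefect.context(logger=get_logger("extract")):
    extract.run()
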
Lukas N.
09/03/2020, 2:22 PM
…copy_data_to_s3, which takes data (after an expensive computation) as input, copies them to S3, and outputs the S3 path (not the data!) where they were stored. Other downstream tasks work with that S3 path. Now if some of the downstream tasks fail, I need to re-run the entire flow, whereas with S3Result I could just rerun the part after the S3 data copy. The tricky part is that I want to output the path on S3 and not the actual data, which is what S3Result does by default; it seems like it should be an easy modification, but I can't come up with a solution. Anyone able to help?

Jeff Brainerd
09/03/2020, 6:41 PM
…raise signals.RETRY in those cases. The problem is this does not appear to work in the task handler. This code gets called: https://github.com/PrefectHQ/prefect/blob/5b7015a75ea60bc5a8ffb676b3be2ff406103583/src/prefect/engine/cloud/task_runner.py#L96-L105 and the task is ultimately failed, not retried. So should our code be something more like this:
return Retrying(msg=msg, context=new_state.context)
or should this in fact work? Thanks!
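If returning a state from the handler is the route, a rough, unverified sketch of such a handler might look like this (the transient-error check is a placeholder); it would be attached via state_handlers on the task:

from datetime import timedelta

import pendulum
from prefect.engine.state import Failed, Retrying

# Attach with @task(state_handlers=[retry_on_transient_error]).
def retry_on_transient_error(task, old_state, new_state):
    # The "transient" check is a stand-in for the real error classification.
    if isinstance(new_state, Failed) and "transient" in str(new_state.message):
        return Retrying(
            message="Retrying after transient error",
            start_time=pendulum.now("UTC") + timedelta(seconds=30),
        )
    return new_state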

Gabriel Montañola
09/03/2020, 7:08 PM
….csv with query results;
3. GZIP the .csv and upload it to S3;
4. Dump the uploaded file content into a Redshift table (using COPY with the correct settings)
So I have a few questions:
• I noticed that there is a Prefect S3 task for uploading, but I guess I can't pass gzipped content to it, right? So I built a custom Task that handles the gzipping + uploading for me. Am I using the S3Upload task wrong, or is this what I should be doing for my purposes?
• Is there a way to generate dynamic tasks/flows? Example: with Airflow I made both a DAG and a Task factory using a .yml file as reference. My tasks are pretty repetitive and I only need to change some fields (source table name, destination schema/table, schedule time and incremental key). How could I achieve this with Prefect? Could you point me in some direction or to a documentation link?
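A rough sketch of one way to do this with a plain Python factory, where the YAML layout, file name and task bodies are all made up for illustration:

import yaml
from prefect import Flow, task

@task
def extract_table(source_table: str) -> list:
    print(f"extracting {source_table}")
    return []

@task
def load_table(rows: list, destination: str) -> None:
    print(f"loading {len(rows)} rows into {destination}")

def build_flow(cfg: dict) -> Flow:
    with Flow(f"sync-{cfg['source_table']}") as flow:
        rows = extract_table(cfg["source_table"])
        load_table(rows, cfg["destination"])
    return flow

# tables.yml is assumed to hold a list of {source_table, destination} entries.
with open("tables.yml") as f:
    flows = [build_flow(cfg) for cfg in yaml.safe_load(f)]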
Thank you very much and congratulations on the awesome work.

Marwan Sarieddine
09/03/2020, 7:42 PM

Matt Wong-Kemp
09/03/2020, 11:02 PM

Jackson Maxfield Brown
09/04/2020, 12:02 AM
…local or remote, and so I want tasks to store results in LocalResult or S3Result accordingly:
python run_my_flow.py # tasks are created with LocalResult (default)
python run_my_flow.py --remote # tasks are created with S3Result
Weird pseudo code:
with Flow("my_flow") as flow:
    result_target = Parameter("result_target", default="local")
    if result_target == "local":
        result_target_for_task_a = LocalResult("a_results/")
        result_target_for_task_b = LocalResult("b_results/")
    elif result_target == "remote":
        result_target_for_task_a = S3Result("<s3://hello-world/a_results/>")
        result_target_for_task_b = S3Result("<s3://hello-world/b_results/>")

    items = task_a(result_handler=result_target_for_task_a)
    task_b(result_handler=result_target_for_task_b)
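One hedged alternative sketch: pick the result objects at build time with an ordinary command-line flag, since Parameters are only resolved at run time (bucket name and locations are placeholders):

import argparse

from prefect import Flow, task
from prefect.engine.results import LocalResult, S3Result

parser = argparse.ArgumentParser()
parser.add_argument("--remote", action="store_true")
args = parser.parse_args()

# The branch happens when the script is executed, not inside the flow.
if args.remote:
    result_a = S3Result(bucket="hello-world", location="a_results/{task_run_id}")
    result_b = S3Result(bucket="hello-world", location="b_results/{task_run_id}")
else:
    result_a = LocalResult(dir="a_results/")
    result_b = LocalResult(dir="b_results/")

@task(result=result_a)
def task_a():
    return [1, 2, 3]

@task(result=result_b)
def task_b(items):
    print(len(items))

with Flow("my_flow") as flow:
    task_b(task_a())
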
Brennan Chapman
09/04/2020, 12:35 AM

Sven Teresniak
09/04/2020, 8:17 AM
…PREFECT_SERVER__GRAPHQL_URL to the FQDN of my service with port 4200 and everything. With that ENV the UI worked perfectly with v0.12.5. But the new Prefect completely ignores that ENV.

Jacob Blanco
09/04/2020, 9:02 AM
@task
def some_task_that_can_fail():
    if something_bad:
        raise signals.FAIL()
    return dict()

filter_out_stuff = FilterTask(filter_func=lambda config: config.get('some_parameter') == 'today')

with Flow() as flow:
    .....
    configurations = some_task_that_can_fail.map(some_inputs)
    useable_configs = filter_out_stuff(configurations)
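A hedged sketch of a filter that also drops failed map children, assuming it runs with an all_finished trigger so the exceptions actually reach it:

from prefect.tasks.control_flow import FilterTask
from prefect.triggers import all_finished

# Failed map children arrive as exceptions, so drop them alongside any configs
# that don't match; 'some_parameter' is taken from the snippet above.
filter_out_stuff = FilterTask(
    filter_func=lambda config: not isinstance(config, BaseException)
    and config.get("some_parameter") == "today",
    trigger=all_finished,
)
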
ARun
09/04/2020, 3:00 PM
TypeError: cannot pickle '_thread.lock' object