Josh
02/11/2022, 3:07 PM
FuETL
02/11/2022, 4:28 PM
with Flow("dummy_flow") as flow:
    ...  # parameters, logic, etc
I want to re-use this same flow (create another one), reusing the logic of the previous one without having to copy and paste the old one. Is that possible? I want to create a second flow, e.g. "second_dummy_flow", using the same logic as dummy_flow, so in the Prefect UI I will see 2 flows that do the same thing.
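A minimal sketch of one way to do this in Prefect 1.x, wrapping the shared logic in a factory function (all names here are illustrative):
```python
from prefect import Flow, task

@task
def shared_logic():
    ...  # parameters, logic, etc.

def build_flow(name: str) -> Flow:
    # Each call builds a fresh Flow object wired to the same tasks.
    with Flow(name) as flow:
        shared_logic()
    return flow

# Register both; the UI then shows two flows running identical logic.
dummy_flow = build_flow("dummy_flow")
second_dummy_flow = build_flow("second_dummy_flow")
```
Kevin Mullins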
02/11/2022, 4:55 PM
I've been using `map` and have been successful with it previously for things like `create_flow_run` and `wait_for_flow_run`; however, the `wait_for_flow_run` input arguments exactly match the output from `create_flow_run`. I'm trying to think of how tasks can be chained together where the tasks take mapped arguments from upstream but they're not included in the direct parent's output. For instance, for several tasks to prepare/process/finalize something, I need the same configuration information in multiple of them, and that configuration was the result of a `discover` task; however, this configuration information is not returned all the way through.
I'm curious what would be a good approach to something like this. If I have an original fan-out that, say, returns a list of 5 results that get mapped, and tasks keep returning different lists of 5 results, is it safe to pass these results downstream and match them up by index order?
# pseudo code
five_config_results = discover()
five_state_results = prepare.map(five_config_results)
five_process_results = process.map(
    five_state_results, config_results=five_config_results
)
five_finalize_results = finalize.map(
    five_process_results,
    five_state_results=five_state_results,
    config_results=five_config_results,
)
Or would another approach be needed to capture the matching results from each task to give to the others?
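A sketch of one such alternative (illustrative names, not from the thread): have each mapped task return a small bundle that carries the shared config forward, so downstream tasks never rely on index alignment:
```python
from prefect import Flow, task

@task
def discover():
    return [{"id": i} for i in range(5)]  # five config dicts

@task
def prepare(config):
    # Carry the config along with the newly produced state.
    return {"config": config, "state": f"prepared-{config['id']}"}

@task
def process(bundle):
    bundle["processed"] = True
    return bundle

@task
def finalize(bundle):
    return bundle["config"], bundle["state"], bundle["processed"]

with Flow("fan_out") as flow:
    bundles = prepare.map(discover())
    results = finalize.map(process.map(bundles))
```
Leon Kozlowski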
02/11/2022, 5:48 PM
Vipul
02/11/2022, 5:52 PM
Tom Shaffner
02/11/2022, 6:04 PM
Adam Roderick
02/11/2022, 6:56 PM
result=S3Result(bucket=s3_results_bucket)
The flow's output looks like it is using a default template, something like `YYYY/MM/DD/**.prefect_result`.
I would like to reuse this bucket across flows and environments, with a template something like `{environment}/{flow_name}/YYYY/MM/DD/**.prefect_result`.
How can I accomplish this?
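A sketch of one approach in Prefect 1.x, assuming the Result `location` templating that fills placeholders from the runtime context (`flow_name`, `task_name`, and `date` are standard context keys; the bucket name and the `env` key here are hypothetical):
```python
from prefect.engine.results import S3Result

result = S3Result(
    bucket="my-results-bucket",  # hypothetical bucket name
    # Placeholders are formatted from Prefect context at runtime; "env"
    # would need to be supplied to the context by your deployment.
    location="{env}/{flow_name}/{date:%Y/%m/%d}/{task_name}.prefect_result",
)
```
Hugo Kitano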
02/11/2022, 7:42 PM
Gabriel Gazola Milan
02/11/2022, 8:44 PM
Tamas Szuromi
02/12/2022, 11:32 PM
Is there a way to pass credentials to `Flow(... storage=GCS(bucket=...))`? I'd create the client myself, without using `GOOGLE_APPLICATION_CREDENTIALS`. Can someone point me in the right direction?
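One possibility in Prefect 1.x, assuming the Secret-based credentials mechanism (GCP_CREDENTIALS is the conventional Secret name for a service-account keyfile; verify it matches your setup):
```python
from prefect import Flow
from prefect.storage import GCS

# The storage pulls a Prefect Secret holding service-account credentials,
# avoiding the GOOGLE_APPLICATION_CREDENTIALS environment variable.
flow = Flow(
    "my_flow",
    storage=GCS(bucket="my-flows-bucket", secrets=["GCP_CREDENTIALS"]),
)
```
Yongchan Hong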
02/13/2022, 1:38 PM
akshay shenoy
02/13/2022, 6:37 PM
Farid
02/14/2022, 3:54 AM
Stéphan Taljaard
02/14/2022, 5:22 AM
query {
  flow(where: { name: { _ilike: "Energy Insight" } }) {
    flow_runs(where: { name: { _eq: "stalwart-ostrich" } }) {
      name
      id
    }
  }
}
->
{
  "data": {
    "flow": [
      { "flow_runs": [] },
      { "flow_runs": [] },
      { "flow_runs": [] },
      { "flow_runs": [] },
      { "flow_runs": [] },
      {
        "flow_runs": [
          {
            "name": "stalwart-ostrich",
            "id": "8bca3ab2-05cc-4223-93f0-071813528545"
          }
        ]
      }
    ]
  }
}
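A likely explanation for the empty entries above (assuming Hasura semantics, which the Prefect GraphQL API uses): a `where` on the nested `flow_runs` selection filters only the children, so every flow version still appears, most with an empty list. Repeating the condition on the parent prunes those versions:
```graphql
query {
  flow(
    where: {
      name: { _ilike: "Energy Insight" }
      flow_runs: { name: { _eq: "stalwart-ostrich" } }
    }
  ) {
    flow_runs(where: { name: { _eq: "stalwart-ostrich" } }) {
      name
      id
    }
  }
}
```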
Noam polak
02/14/2022, 8:07 AM
child_flow = create_flow_run(
    flow_name=CHILD_FLOW,
    parameters={
        "input_data": input_data,
        "run_id": run_id,
    },
    project_name="default",
)
child_result = get_task_run_result(
    child_flow,
    task_slug="child_flow-copy",
    poll_time=3,
)
I tried to add a state handler to `get_task_run_result`, but it raises an error:
child_result = get_task_run_result(
    hazarder_flow,
    task_slug="child_flow-copy",
    poll_time=3,
    state_handlers=[post_to_slack_task_handler],
)
TypeError: got an unexpected keyword argument 'state_handler'
So how can I do it?
Thanks!
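A sketch of how this is commonly handled in Prefect 1.x (unverified against this exact flow): task attributes such as `state_handlers` are overridden through the `task_args` keyword when the task is called inside a `Flow` block, rather than passed as the task's own argument:
```python
child_result = get_task_run_result(
    child_flow,
    task_slug="child_flow-copy",
    poll_time=3,
    # task_args overrides Task attributes for this invocation only.
    task_args=dict(state_handlers=[post_to_slack_task_handler]),
)
```
massumo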
02/14/2022, 9:09 AM
massumo
02/14/2022, 9:10 AM
Alexis Lucido
02/14/2022, 10:58 AM
Frederick Thomas
02/14/2022, 3:46 PM
I'm using `prefect.Client` to query the GraphQL endpoint; however, I am getting errors that have me stumped. The relevant code:
import prefect
import pandas as pd
client = prefect.Client()
client.graphql("""
query {
agent{
flow_runs(limit:5, where:{state:{_eq: "Success"}start_time:{_gt:"2022-02-14"}}){
start_time
end_time
state
flow{
name
id
tasks_aggregate{
aggregate{
count
}
}
}
}
}
} """)
The errors:
Traceback (most recent call last):
File "/usr/local/lib/python3.8/site-packages/prefect/client/client.py", line 465, in _request
json_resp = response.json()
File "/usr/local/lib/python3.8/site-packages/requests/models.py", line 898, in json
return complexjson.loads(self.text, **kwargs)
File "/usr/local/lib/python3.8/json/__init__.py", line 357, in loads
return _default_decoder.decode(s)
File "/usr/local/lib/python3.8/json/decoder.py", line 337, in decode
obj, end = self.raw_decode(s, idx=_w(s, 0).end())
File "/usr/local/lib/python3.8/json/decoder.py", line 355, in raw_decode
raise JSONDecodeError("Expecting value", s, err.value) from None
json.decoder.JSONDecodeError: Expecting value: line 1 column 1 (char 0)
The above exception was the direct cause of the following exception:
Traceback (most recent call last):
File "/usr/local/lib/python3.8/runpy.py", line 194, in _run_module_as_main
return _run_code(code, main_globals, None,
File "/usr/local/lib/python3.8/runpy.py", line 87, in _run_code
exec(code, run_globals)
File "/workspaces/prefect/ETL/CDNY/GraphQL.py", line 7, in <module>
client.graphql("""
File "/usr/local/lib/python3.8/site-packages/prefect/client/client.py", line 298, in graphql
result = self.post(
File "/usr/local/lib/python3.8/site-packages/prefect/client/client.py", line 213, in post
response = self._request(
File "/usr/local/lib/python3.8/site-packages/prefect/client/client.py", line 468, in _request
raise ClientError(
prefect.utilities.exceptions.ClientError: Malformed response received from Cloud - please ensure that you have an API token properly configured.
We're using Prefect Core as our backend at the moment, and the documentation for this is sparse when I google. Thanks
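One likely cause (an assumption, since the backend configuration isn't shown): the error suggests the Client is posting to Prefect Cloud rather than the local Server, so the response isn't JSON. In Prefect 1.x the Client can be pointed at a Server endpoint explicitly:
```python
import prefect

# Assumes a local Prefect Server; adjust the host/port to your deployment.
client = prefect.Client(api_server="http://localhost:4200/graphql")
```
Running `prefect backend server` before executing the script sets the same default globally.
Austin Vecchio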
02/14/2022, 4:28 PM
Richard Hughes
02/14/2022, 7:15 PM
I'm using `croniter` to calculate cron schedules inside of the Prefect flow, and registering on the Cloud instance. When I kick off my flow, I want to understand what the last cron time schedule that was run is. I have placed these cron values into a dictionary that holds these times for a handful of different jobs. Then, I want to use this as a parameter to some code I am running. For example:
croniter(SchemaNames[key], datetime.now()).get_prev(datetime).strftime("%m/%d/%Y %H:%M:%S")
For whatever reason, the datetime.now() call is returning the datetime corresponding to when the flow was actually registered, and not the runtime datetime.now(). Any thoughts on how to achieve the results I am looking for, or guidance from this point? Much appreciated.
p.s. I would assume I could use the query API and extract the last scheduled start times for these parameters, but I thought this approach was easier.
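This looks like the classic build-time vs. run-time distinction: code outside of a task executes when the flow is defined and registered, not on each run. A minimal sketch of moving the computation into a task so datetime.now() is evaluated at run time (function and argument names are illustrative):
```python
from datetime import datetime

from croniter import croniter
from prefect import task

@task
def last_cron_time(cron_expr: str) -> str:
    # Executed at flow-run time, so now() reflects the actual run,
    # not the moment the flow was registered.
    now = datetime.now()
    return croniter(cron_expr, now).get_prev(datetime).strftime("%m/%d/%Y %H:%M:%S")
```
Aric Huang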
02/14/2022, 8:33 PM
Is there a way to ensure that a `Result` using a specific serializer (e.g. `PandasSerializer`) will use the same serializer when loading the result via `prefect.tasks.prefect.get_task_run_result`? I have a task that uses the following task decorator:
@task(slug="output", result=GCSResult("<path>", serializer=PandasSerializer(file_type="parquet")), checkpoint=True)
When I try to get the result by doing:
result = get_task_run_result.run(flow_id, "output-copy", poll_time=5)
I get an error that seems to indicate it's trying to use `cloudpickle` instead of Pandas:
File "/usr/local/lib/python3.8/dist-packages/prefect/engine/serializers.py", line 86, in deserialize
return cloudpickle.loads(value)
_pickle.UnpicklingError: A load persistent id instruction was encountered,
but no persistent_load function was specified.
Is there a way to have `get_task_run_result` use a specific serializer?
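One workaround sketch (assuming the stored location is known; the placeholder paths are hypothetical): read the persisted result directly with a `Result` configured with the matching serializer, instead of going through `get_task_run_result`:
```python
from prefect.engine.results import GCSResult
from prefect.engine.serializers import PandasSerializer

result = GCSResult("<path>", serializer=PandasSerializer(file_type="parquet"))
# Result.read returns a new Result whose .value is the deserialized object.
df = result.read("<location-of-the-stored-task-result>").value
```
Peter Peter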
02/14/2022, 9:42 PM
Antonio Manuel BR
02/15/2022, 7:05 AM
Jean-Baptiste Six
02/15/2022, 9:28 AM
@task
def subtask():
    return 1

with Flow("subflow") as subflow:
    subtask()

@task
def main_task():
    subflow.run()

with Flow("main_flow") as main_flow:
    main_task()
But I faced this error:
_Unexpected error while running flow: KeyError('Task slug init_dirs-1 is not found in the current Flow. This is usually caused by a mismatch between the flow version stored in the Prefect backend and the flow that was loaded from storage.\n- Did you change the flow without re-registering it?\n- Did you register the flow without updating it in your storage location (if applicable)?')_
To clarify: I had already registered the _main_flow_ (without the subflow inside) and it worked, but then I updated it and registered the subflow, and it failed. Could you help me please? 🙏
The task "_init_dirs_" is in the main_flow and finishes in a success state, but this error happens when subflow.run() is called (and init_dirs is not in the subflow)
02/15/2022, 10:37 AM
Florian Kühnlenz
02/15/2022, 1:11 PM
{
  flow_group(
    where: { flows: { name: { _ilike: "%name%" }, archived: { _eq: false } } }
  ) {
    id
    name
    flows {
      name
      id
      archived
    }
  }
}
This will not actually filter out archived versions. What am I doing wrong?
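The same nested-filter behavior as earlier in this log is a plausible explanation (assuming Hasura semantics): the `where` on `flow_group` only selects groups containing at least one matching flow, while the nested `flows` list itself stays unfiltered. The archived condition has to be repeated on the nested selection:
```graphql
{
  flow_group(
    where: { flows: { name: { _ilike: "%name%" }, archived: { _eq: false } } }
  ) {
    id
    name
    flows(where: { archived: { _eq: false } }) {
      name
      id
      archived
    }
  }
}
```
Simon Stusak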
02/15/2022, 4:15 PM
Zach Schumacher
02/15/2022, 4:21 PM
Natsume Kirito
02/15/2022, 4:37 PM