Christopher
02/10/2022, 6:10 PM
James Sutton
02/10/2022, 8:46 PM
Daniel Nilsen
02/11/2022, 7:40 AM
mutation MyMutation($input: create_flow_run_input!) {
  create_flow_run(input: $input) {
    id
  }
}
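A minimal sketch of calling this mutation through the Python client, assuming Prefect 1's schema where the variable is typed create_flow_run_input! and flow_id/parameters are the usual input fields (the flow id and parameters below are placeholders):
import prefect

client = prefect.Client()
response = client.graphql(
    """
    mutation($input: create_flow_run_input!) {
      create_flow_run(input: $input) {
        id
      }
    }
    """,
    variables={"input": {"flow_id": "<your-flow-id>", "parameters": {"x": 1}}},
)
print(response)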
Faisal k k
02/11/2022, 8:07 AM
Michael Hadorn
02/11/2022, 8:21 AM
William Edwards
02/11/2022, 12:45 PM
`create_flow_run` in the client, or should the client be doing anything else?
Robert Kowalski
02/11/2022, 2:28 PM
docker run --mac-address="70:ca:9b:ce:67:ae" IMAGE
I want to set a constant MAC address when defining Docker storage, like this:
storage = Docker(
    env_vars={"PYTHONPATH": "$PYTHONPATH:/pipeline"},
    files={f'{parent_dir}': '/pipeline'},
    image_tag=os.environ.get('IMAGE_TAG'),
    image_name=flow_name,
    stored_as_script=True,
    path='/pipeline/flow.py',
    extra_dockerfile_commands=[]
)
Has anyone tried to achieve something similar?
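For what it's worth, Docker storage only builds and pushes the image, so a fixed MAC address is a container-runtime setting rather than a storage option. A rough sketch of what the docker run flag above looks like through docker-py (image name and address are the placeholders from the example):
import docker

client = docker.from_env()
container = client.containers.run(
    "IMAGE",
    mac_address="70:ca:9b:ce:67:ae",  # same value as the docker run example above
    detach=True,
)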
Josh
02/11/2022, 3:07 PM
FuETL
02/11/2022, 4:28 PM
with Flow("dummy_flow") as flow:
    ...  # parameters, logic, etc.
I want to reuse this flow (create another one), reusing the logic of the previous one without having to copy and paste it. Is that possible? I want to create a second flow, e.g. "second_dummy_flow", that uses the same logic as dummy_flow, so in the Prefect UI I will see two flows that do the same thing.
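One possible sketch: wrap the flow definition in a factory function so the same logic can be registered under both names (the factory itself is just an illustration, not something from the thread):
from prefect import Flow

def build_flow(name: str) -> Flow:
    with Flow(name) as flow:
        ...  # parameters, logic, etc.
    return flow

dummy_flow = build_flow("dummy_flow")
second_dummy_flow = build_flow("second_dummy_flow")
# registering both gives two flows in the UI that run the same logic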
Kevin Mullins
02/11/2022, 4:55 PM
`map`, and have been successful with it previously for things like `create_flow_run` and `wait_for_flow_run`; however, the `wait_for_flow_run` input arguments exactly match the output from `create_flow_run`. I'm trying to think of how tasks can be chained together where the tasks take mapped arguments from upstream that aren't included in the direct parent's output. For instance, for several tasks that prepare/process/finalize something, I need the same configuration information (the result of a `discover` task) in multiple places; however, this configuration information is not returned all the way through.
I'm curious what a good approach to something like this would be. If I have an original fan-out that, say, returns a list of 5 results that get mapped, and tasks keep returning different lists of 5 results, is it safe to pass these results downstream and match them up by index order?
# pseudo code
five_config_results = discover()
five_state_results = prepare.map(five_config_results)
five_process_results = process.map(
five_state_results, config_results=five_config_results
)
five_finalize_results = finalize.map(
five_process_results,
five_state_results=five_state_results,
config_results=five_config_results,
)
Or would another approach be needed to capture the matching results from each task to give to others?
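A rough, runnable version of the pseudo code above, assuming Prefect 1 mapping: mapped arguments are zipped element-wise, so lists that all originate from the same fan-out stay aligned by index (the task bodies are placeholders):
from prefect import Flow, task

@task
def discover():
    return [{"id": i} for i in range(5)]

@task
def prepare(config):
    return {"prepared": config["id"]}

@task
def process(state, config):
    return {"processed": state["prepared"], "config_id": config["id"]}

@task
def finalize(processed, state, config):
    return (processed, state, config)

with Flow("fan-out-by-index") as flow:
    configs = discover()
    states = prepare.map(configs)
    # each mapped list came from the same fan-out, so index i lines up across them
    processed = process.map(states, config=configs)
    results = finalize.map(processed, state=states, config=configs)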
Leon Kozlowski
02/11/2022, 5:48 PM
Vipul
02/11/2022, 5:52 PM
Tom Shaffner
02/11/2022, 6:04 PM
Adam Roderick
02/11/2022, 6:56 PM
result=S3Result(bucket=s3_results_bucket)
The flow's output looks like it is using a default template, something like YYYY/MM/DD/**.prefect_result.
I would like to reuse this bucket across flows and environments, with a template something like {environment}/{flow_name}/YYYY/MM/DD/**.prefect_result. How can I accomplish this?
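A sketch of one way this is often handled, assuming the Result location template is what drives the path: location strings are formatted from prefect.context at runtime (keys such as {flow_name}, {today}, and {task_run_id}), while the environment part is not a context key, so here it is read from an environment variable at definition time (that variable name is an assumption):
import os

from prefect import Flow
from prefect.engine.results import S3Result

environment = os.environ.get("ENVIRONMENT", "dev")  # hypothetical env var

result = S3Result(
    bucket="s3_results_bucket",
    # {flow_name}, {today}, and {task_run_id} are filled from prefect.context at runtime
    location=environment + "/{flow_name}/{today}/{task_run_id}.prefect_result",
)

with Flow("my-flow", result=result) as flow:
    ...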
Hugo Kitano
02/11/2022, 7:42 PM
Gabriel Gazola Milan
02/11/2022, 8:44 PM
Tamas Szuromi
02/12/2022, 11:32 PM
Flow(..., storage=GCS(bucket=...))? I'd like to create the client myself, without using GOOGLE_APPLICATION_CREDENTIALS. Can someone point me in the right direction?
Yongchan Hong
02/13/2022, 1:38 PM
akshay shenoy
02/13/2022, 6:37 PM
Farid
02/14/2022, 3:54 AM
Stéphan Taljaard
02/14/2022, 5:22 AM
query {
  flow(where: { name: { _ilike: "Energy Insight" } }) {
    flow_runs(where: { name: { _eq: "stalwart-ostrich" } }) {
      name
      id
    }
  }
}
->
{
  "data": {
    "flow": [
      { "flow_runs": [] },
      { "flow_runs": [] },
      { "flow_runs": [] },
      { "flow_runs": [] },
      { "flow_runs": [] },
      {
        "flow_runs": [
          {
            "name": "stalwart-ostrich",
            "id": "8bca3ab2-05cc-4223-93f0-071813528545"
          }
        ]
      }
    ]
  }
}
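A possible follow-up sketch, assuming the Hasura-style filters Prefect Server exposes: querying flow_run directly and filtering on the parent flow's name avoids the empty flow_runs entries returned for every archived version of the flow:
import prefect

client = prefect.Client()
response = client.graphql(
    """
    query {
      flow_run(
        where: {
          name: { _eq: "stalwart-ostrich" }
          flow: { name: { _ilike: "Energy Insight" } }
        }
      ) {
        name
        id
      }
    }
    """
)
print(response)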
Noam polak
02/14/2022, 8:07 AM
child_flow = create_flow_run(
    flow_name=CHILD_FLOW,
    parameters={
        "input_data": input_data,
        "run_id": run_id,
    },
    project_name="default",
)
child_result = get_task_run_result(
    child_flow,
    task_slug="child_flow-copy",
    poll_time=3,
)
I tried to add a handler to get_task_run_result, but it raises an error:
child_result = get_task_run_result(
    hazarder_flow,
    task_slug="child_flow-copy",
    poll_time=3,
    state_handlers=[post_to_slack_task_handler],
)
TypeError: got an unexpected keyword argument 'state_handler'
So how can I do it?
Thanks!
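A sketch of one way to attach the handler, assuming the intent is to override it for this single call: extra Task attributes such as state_handlers are not call-time keyword arguments, but task_args copies the task with those attributes applied:
child_result = get_task_run_result(
    child_flow,
    task_slug="child_flow-copy",
    poll_time=3,
    # copies the task with the handler attached instead of passing it as a task argument
    task_args={"state_handlers": [post_to_slack_task_handler]},
)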
massumo
02/14/2022, 9:09 AM
massumo
02/14/2022, 9:10 AM
Alexis Lucido
02/14/2022, 10:58 AM
Frederick Thomas
02/14/2022, 3:46 PM
`prefect.Client` to query the GraphQL endpoint; however, I am getting errors that have me stumped. The relevant code:
import prefect
import pandas as pd
client = prefect.Client()
client.graphql("""
query {
  agent {
    flow_runs(limit: 5, where: { state: { _eq: "Success" }, start_time: { _gt: "2022-02-14" } }) {
      start_time
      end_time
      state
      flow {
        name
        id
        tasks_aggregate {
          aggregate {
            count
          }
        }
      }
    }
  }
}
""")
The errors:
Traceback (most recent call last):
File "/usr/local/lib/python3.8/site-packages/prefect/client/client.py", line 465, in _request
json_resp = response.json()
File "/usr/local/lib/python3.8/site-packages/requests/models.py", line 898, in json
return complexjson.loads(self.text, **kwargs)
File "/usr/local/lib/python3.8/json/__init__.py", line 357, in loads
return _default_decoder.decode(s)
File "/usr/local/lib/python3.8/json/decoder.py", line 337, in decode
obj, end = self.raw_decode(s, idx=_w(s, 0).end())
File "/usr/local/lib/python3.8/json/decoder.py", line 355, in raw_decode
raise JSONDecodeError("Expecting value", s, err.value) from None
json.decoder.JSONDecodeError: Expecting value: line 1 column 1 (char 0)
The above exception was the direct cause of the following exception:
Traceback (most recent call last):
File "/usr/local/lib/python3.8/runpy.py", line 194, in _run_module_as_main
return _run_code(code, main_globals, None,
File "/usr/local/lib/python3.8/runpy.py", line 87, in _run_code
exec(code, run_globals)
File "/workspaces/prefect/ETL/CDNY/GraphQL.py", line 7, in <module>
client.graphql("""
File "/usr/local/lib/python3.8/site-packages/prefect/client/client.py", line 298, in graphql
result = self.post(
File "/usr/local/lib/python3.8/site-packages/prefect/client/client.py", line 213, in post
response = self._request(
File "/usr/local/lib/python3.8/site-packages/prefect/client/client.py", line 468, in _request
raise ClientError(
prefect.utilities.exceptions.ClientError: Malformed response received from Cloud - please ensure that you have an API token properly configured.
We're using Prefect Core as our backend at the moment, and the documentation for this is sparse when I google. Thanks.
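A sketch of the usual cause, assuming a self-hosted Prefect Server: the error text mentions Cloud, which suggests the client is still pointed at the Cloud API. Running prefect backend server once, or passing the Server endpoint explicitly, usually resolves it (the URL below is the default local Server GraphQL address and is an assumption here):
import prefect

# point the client at the local Prefect Server API instead of Prefect Cloud
client = prefect.Client(api_server="http://localhost:4200/graphql")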
Austin Vecchio
02/14/2022, 4:28 PM
Richard Hughes
02/14/2022, 7:15 PM
`croniter` to calc cron schedules inside of the Prefect flow, registering on the Cloud instance. When I kick off my flow I want to understand what the last cron schedule time that ran was. I have placed these cron values into a dictionary that holds the times for a handful of different jobs. Then I want to use this as a parameter to some code I am running. For example:
croniter(SchemaNames[key], datetime.now()).get_prev(datetime).strftime("%m/%d/%Y %H:%M:%S")
For whatever reason, the datetime.now() call is returning the datetime corresponding to when the flow was registered, not the actual runtime datetime.now(). Any thoughts on how to achieve the results I am looking for, or guidance from this point? Much appreciated.
p.s. I assume I could use the query API and extract the last scheduled start times for these parameters, but I thought this approach was easier.
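A sketch of the usual explanation: anything evaluated at module or flow-build level, like datetime.now(), is baked in when the flow is registered. Moving the croniter call inside a task defers it to run time; scheduled_start_time from prefect.context is one runtime timestamp to anchor on (the cron expression below is a placeholder):
from datetime import datetime

import prefect
from croniter import croniter
from prefect import Flow, task

@task
def last_cron_time(cron_expr: str) -> str:
    # evaluated when the flow runs, not when it is registered
    now = prefect.context.get("scheduled_start_time", datetime.now())
    return croniter(cron_expr, now).get_prev(datetime).strftime("%m/%d/%Y %H:%M:%S")

with Flow("cron-lookup") as flow:
    prev_run = last_cron_time("0 * * * *")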
Aric Huang
02/14/2022, 8:33 PM
`Result` using a specific serializer (e.g. `PandasSerializer`) would use the same serializer when loading the result using `prefect.tasks.prefect.get_task_run_result`? I have a task that uses the following task decorator:
@task(slug="output", result=GCSResult("<path>", serializer=PandasSerializer(file_type="parquet")), checkpoint=True)
When I try to get the result by doing:
result = get_task_run_result.run(flow_id, "output-copy", poll_time=5)
I get an error that seems to indicate it's trying to use cloudpickle instead of Pandas:
File "/usr/local/lib/python3.8/dist-packages/prefect/engine/serializers.py", line 86, in deserialize
return cloudpickle.loads(value)
_pickle.UnpicklingError: A load persistent id instruction was encountered,
but no persistent_load function was specified.
Is there a way to have `get_task_run_result` use a specific serializer?
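A hedged workaround sketch: read the stored result back with the same serializer the task wrote it with, rather than relying on the default pickle-based deserialization (bucket and location below are placeholders):
from prefect.engine.results import GCSResult
from prefect.engine.serializers import PandasSerializer

gcs_result = GCSResult("<bucket>", serializer=PandasSerializer(file_type="parquet"))
# Result.read returns a new Result whose .value holds the deserialized DataFrame
df = gcs_result.read("<location-of-the-stored-result>").value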
Peter Peter
02/14/2022, 9:42 PM
Peter Peter
02/14/2022, 9:42 PM
Kevin Kho
02/14/2022, 9:43 PM
Peter Peter
02/15/2022, 11:03 AM