R Zo
02/25/2022, 2:48 AM

Ben Muller
02/25/2022, 3:52 AM
create_flow_run.run(
    flow_name="advanced_cleaning",
    project_name="modelling",
    run_name=f"data_update={data_update}",
    parameters=dict(data_update=data_update),
)
When I run this it is called multiple times, but only one, sometimes two, flows are triggered in Cloud. Is this a possible bug?
Do run names need to be unique?
This is being called from within a task, FYI.
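For what it's worth: as far as I know run names do not need to be unique, but Prefect Cloud deduplicates flow-run creation on an idempotency key, and if I remember correctly the create_flow_run task falls back to the current task-run ID when no key is given, which would explain several calls from one task collapsing into one or two runs. A minimal sketch, assuming that behavior, passing an explicit unique key per call:
```
from uuid import uuid4

from prefect.tasks.prefect import create_flow_run

create_flow_run.run(
    flow_name="advanced_cleaning",
    project_name="modelling",
    run_name=f"data_update={data_update}",
    parameters=dict(data_update=data_update),
    idempotency_key=str(uuid4()),  # unique per call, so Cloud won't deduplicate
)
```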
Tomer Cagan
02/25/2022, 8:57 AM

Aditi Tambi
02/25/2022, 10:22 AM

datamongus
02/25/2022, 12:47 PM

Donnchadh McAuliffe
02/25/2022, 1:14 PM

Matthias
02/25/2022, 1:34 PM
docker.errors.NotFound: 404 Client Error for <http+docker://localhost/v1.41/containers/f30a3e9241fab0d272c3f36eae16867487fb187b964c8b3f22bc8fd05d2aa4d0/json>: Not Found ("No such container: f30a3e9241fab0d272c3f36eae16867487fb187b964c8b3f22bc8fd05d2aa4d0")
and the flow is stuck in a Submitted state. Does anyone know how to fix it? Just to make sure: the only difference between the successful run and the run stuck in the Submitted state is that I added a non-root user to the image.
https://github.com/anna-geller/packaging-prefect-flows/blob/master/flows_no_build/docker_script_docker_run_local_image.py
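For context, "adding a non-root user" typically amounts to something like the sketch below (an assumed illustration, not the actual Dockerfile from the thread; base image tag and user name are placeholders); if the container then runs as that user, missing permissions on the directories Prefect writes to can leave a run stuck in Submitted:
```
# Illustrative sketch only; base image tag and user name are assumptions.
FROM prefecthq/prefect:1.0.0-python3.9
RUN useradd --create-home prefectuser
USER prefectuser
```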
Luis Aguirre
02/25/2022, 1:42 PM

Henrietta Salonen
02/25/2022, 2:31 PM
How do I pass the data argument correctly for Prefect's PostgresExecute task together with the query argument?
I have two different dataframes that I would like to insert into Postgres tables. This is what I have now, but I'm unsure of how to pass the data argument here:
```
from prefect import Flow, Parameter, task, unmapped
from prefect.tasks.postgres import PostgresExecute

@task
def make_query(schema_name, table_name, columns):
    return f'''
        CREATE TABLE IF NOT EXISTS {schema_name}.{table_name} ({columns});
        INSERT INTO {schema_name}.{table_name} ({columns}) VALUES ();
    '''

with Flow("postgres_test") as flow:
    execute_task = PostgresExecute(db_name=database, user=user, host=host, port=port, commit=True)
    table_name = Parameter("table_name", default=[df1.name, df2.name])
    columns = Parameter("columns", default=[df1_columns, df2_columns])
    query = make_query.map(unmapped(schema_name), table_name, columns)
    execute_task.map(password=unmapped(postgres_pwd), query=query)

if __name__ == "__main__":
    flow.run()
```
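For reference, PostgresExecute.run also accepts a data tuple that is handed to the database cursor together with the query, so one hedged approach is to use %s placeholders in the INSERT and map over per-table value tuples (data_tuples below is a hypothetical list, not from the thread):
```
# Sketch: the query uses "VALUES (%s, %s, ...)" placeholders; data_tuples is a
# hypothetical list with one tuple of values per mapped query.
query = make_query.map(unmapped(schema_name), table_name, columns)
execute_task.map(
    password=unmapped(postgres_pwd),
    query=query,
    data=data_tuples,
)
```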
Leo Kacenjar
02/25/2022, 3:48 PM

Luke Segars
02/25/2022, 6:32 PM

Austen Bouza
02/25/2022, 7:42 PM
I'm getting a prefect.exceptions.AuthorizationError: Malformed response received from Cloud - please ensure that you are authenticated. message. The auth token I am using has not changed and has not expired. The response is failing on a requests.exceptions.JSONDecodeError: [Errno Expecting value] error, with the response body:
<html><head><meta charset="utf-8"><script>C9xx.T9xx=T9xx;C9xx.g8=window;;n0II(C9xx.g8);T9TT(C9xx.g8);C9xx.D9S=(function(){var Z9S=2;for(;Z9S !== 1;){switch(Z9S)
…and a ton more obfuscated JavaScript. Is this a typical response body from Prefect Cloud when not authenticated?

Max Lei
02/25/2022, 8:54 PM
When using DaskExecutor with dask_cloudprovider.aws.FargateCluster, how do we assign a task definition to the cluster?
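For context, Prefect 1's DaskExecutor passes cluster_kwargs straight through to the cluster class, so anything FargateCluster accepts can be set there; which keyword points at an existing task definition is best checked in the dask_cloudprovider docs. A minimal sketch (image name and worker count are placeholder assumptions):
```
from prefect.executors import DaskExecutor

# Sketch: cluster_kwargs are forwarded verbatim to FargateCluster, so any
# task-definition-related option it supports would go in this dict.
flow.executor = DaskExecutor(
    cluster_class="dask_cloudprovider.aws.FargateCluster",
    cluster_kwargs={
        "image": "my-account/my-image:latest",  # hypothetical image
        "n_workers": 2,
    },
)
```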
iñigo
02/25/2022, 8:57 PM

Daniel Komisar
02/25/2022, 9:14 PM

Josh
02/25/2022, 9:31 PM
from prefect import Flow, task
from pprint import pformat

@task
def numbers():
    return [1, 2]

@task
def letters():
    return ["a", "b", "c"]

@task
def cross_product(numbers, letters):
    res = []
    for letter in letters:
        for num in numbers:
            res.append({"num": num, "letter": letter})
    return res

@task
def pretty_print(**kwargs):
    print(pformat(kwargs))

with Flow("...") as flow:
    n = numbers()
    l = letters()
    map_items = cross_product(n, l)
    pretty_print.map(map_items)

flow.run()
I get the following error:
Traceback (most recent call last):
  File "flow.py", line 33, in <module>
    pretty_print.map(map_items)
  File "prefect/core/task.py", line 760, in map
    *args, mapped=True, upstream_tasks=upstream_tasks, flow=flow, **kwargs
  File "prefect/core/task.py", line 674, in bind
    callargs = dict(signature.bind(*args, **kwargs).arguments)  # type: Dict
  File "inspect.py", line 3015, in bind
    return args[0]._bind(args[1:], kwargs)
  File "inspect.py", line 2942, in _bind
    'too many positional arguments') from None
TypeError: too many positional arguments
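For what it's worth, Task.map binds each mapped element to the task's positional parameters, and a **kwargs-only signature exposes no positional parameters to bind to, hence the error. One hedged workaround (a sketch, not from the thread) is to accept the dict itself as a single argument:
```
@task
def pretty_print(item):
    # each mapped element arrives as one dict
    print(pformat(item))

pretty_print.map(map_items)
```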
kiran
02/25/2022, 9:33 PM
Is there a way to tag @channel or @here in the SlackTask message? Right now I can tag myself using my user ID, but it doesn't work when I try it with my channel ID, @channel, or <@channel>:
...
elif new_state.is_failed():
    data_engineer_user_id = Secret("DATA_ENGINEER_USER_ID").get()
    msg = f"<@{data_engineer_user_id}> — {obj} ({flow_run_name}): {new_state}"
    SlackTask(message=msg, webhook_secret="SLACK_WEBHOOK_URL").run()
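For reference, Slack's message formatting treats channel-wide mentions as special escapes, <!channel> and <!here>, rather than @channel or <@channel> (this is standard Slack API behavior, not something confirmed in the thread):
```
# Sketch: swap the user mention for Slack's special-mention escape.
msg = f"<!channel> — {obj} ({flow_run_name}): {new_state}"
SlackTask(message=msg, webhook_secret="SLACK_WEBHOOK_URL").run()
```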
Dexter Antonio
02/25/2022, 9:52 PM
How can I run this query with client.graphql? This is the GraphQL query that I want to use; it works in Prefect Cloud's interactive API:
query {
  flow(where: {name: {_eq: "autoscorer"}, version:{_eq:10}}) {
    id
    name
    version
    tasks {
      name
      id
    }
  }
}
This is my attempt to do it with client.graphql:
client.graphql(
    {
        'query': {
            'flow(where: {name: {_eq: "autoscorer"}, version:{_eq:10}})': {
                #'id',
                #'name',
                #'version',
                'tasks': {'task_runs': {'id', 'name'}},
            },
        }
    })
This works, but if I uncomment the commented fields (e.g. id), I get a syntax error. What is the proper syntax to indicate a root field? Is there any documentation on how to use this API in more detail? Is it based off of a common Python GraphQL library?
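One detail that may help: the dict form is parsed by Prefect's own prefect.utilities.graphql helpers rather than a third-party GraphQL library, and client.graphql also accepts the raw query string, which sidesteps the dict syntax entirely. A sketch:
```
# Sketch: pass the query exactly as it works in the interactive API.
result = client.graphql(
    """
    query {
      flow(where: {name: {_eq: "autoscorer"}, version: {_eq: 10}}) {
        id
        name
        version
        tasks {
          name
          id
        }
      }
    }
    """
)
```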
Aqib Fayyaz
02/26/2022, 7:47 AM

iñigo
02/26/2022, 9:50 AM
Unexpected error: PicklingError('Could not pickle object as excessively deep recursion required.')
Traceback (most recent call last):
  File "/home/inigo/project/scripts/venv/lib/python3.9/site-packages/cloudpickle/cloudpickle_fast.py", line 563, in dump
    return Pickler.dump(self, obj)
RecursionError: maximum recursion depth exceeded while pickling an object

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/home/inigo/project/scripts/venv/lib/python3.9/site-packages/prefect/engine/runner.py", line 48, in inner
    new_state = method(self, state, *args, **kwargs)
  File "/home/inigo/project/scripts/venv/lib/python3.9/site-packages/prefect/engine/task_runner.py", line 909, in get_task_run_state
    result = self.result.write(value, **formatting_kwargs)
  File "/home/inigo/project/scripts/venv/lib/python3.9/site-packages/prefect/engine/results/local_result.py", line 115, in write
    value = self.serializer.serialize(new.value)
  File "/home/inigo/project/scripts/venv/lib/python3.9/site-packages/prefect/engine/serializers.py", line 73, in serialize
    return cloudpickle.dumps(value)
  File "/home/inigo/project/scripts/venv/lib/python3.9/site-packages/cloudpickle/cloudpickle_fast.py", line 73, in dumps
    cp.dump(obj)
  File "/home/inigo/project/scripts/venv/lib/python3.9/site-packages/cloudpickle/cloudpickle_fast.py", line 570, in dump
    raise pickle.PicklingError(msg) from e
_pickle.PicklingError: Could not pickle object as excessively deep recursion required.
After some testing, I also get a problem when registering my script with flow.register:
Traceback (most recent call last):
  File "/home/inigo/project/scripts/venv/lib/python3.9/site-packages/cloudpickle/cloudpickle_fast.py", line 563, in dump
    return Pickler.dump(self, obj)
  File "/home/inigo/project/scripts/venv/lib/python3.9/site-packages/cloudpickle/cloudpickle_fast.py", line 639, in reducer_override
    if sys.version_info[:2] < (3, 7) and _is_parametrized_type_hint(obj):  # noqa  # pragma: no branch
RecursionError: maximum recursion depth exceeded in comparison

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/home/inigo/project/scripts/es_am_08_caetano_to_s4e/caetano_to_s4e_flow.py", line 65, in <module>
    flow.register(project_name='S4E')
  File "/home/inigo/project/scripts/venv/lib/python3.9/site-packages/prefect/core/flow.py", line 1727, in register
    registered_flow = client.register(
  File "/home/inigo/project/scripts/venv/lib/python3.9/site-packages/prefect/client/client.py", line 1127, in register
    serialized_flow = flow.serialize(build=build)  # type: Any
  File "/home/inigo/project/scripts/venv/lib/python3.9/site-packages/prefect/core/flow.py", line 1489, in serialize
    self.storage.add_flow(self)
  File "/home/inigo/project/scripts/venv/lib/python3.9/site-packages/prefect/storage/local.py", line 143, in add_flow
    f.write(flow_to_bytes_pickle(flow))
  File "/home/inigo/project/scripts/venv/lib/python3.9/site-packages/prefect/utilities/storage.py", line 177, in flow_to_bytes_pickle
    cloudpickle.dumps(flow, protocol=4), newline=False
  File "/home/inigo/project/scripts/venv/lib/python3.9/site-packages/cloudpickle/cloudpickle_fast.py", line 73, in dumps
    cp.dump(obj)
  File "/home/inigo/project/scripts/venv/lib/python3.9/site-packages/cloudpickle/cloudpickle_fast.py", line 570, in dump
    raise pickle.PicklingError(msg) from e
_pickle.PicklingError: Could not pickle object as excessively deep recursion required.
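One hedged workaround for the register-time failure (an assumption, not from the thread) is to store the flow as a script instead of a pickle, so cloudpickle never has to serialize the flow object; the first traceback, from writing a task result, would still need the task's return value itself to be picklable. A sketch, reusing the script path from the traceback:
```
from prefect.storage import Local

# Sketch: keep the flow as a script file rather than a pickled object.
flow.storage = Local(
    path="/home/inigo/project/scripts/es_am_08_caetano_to_s4e/caetano_to_s4e_flow.py",
    stored_as_script=True,
)
flow.register(project_name='S4E')
```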
iñigo
02/26/2022, 9:50 AM

Hui Huang
02/26/2022, 6:53 PM

Slackbot
02/26/2022, 9:25 PM

David Michael Carter
02/26/2022, 10:59 PM
I'd like each mapped task's task_run_name to be a value based on a specific key for that mapped dict index. Anyone have experience naming tasks based on a mapped dictionary?
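For context, Prefect 1 accepts either a template string or a callable for task_run_name, and the callable receives the task's inputs as keyword arguments, so a hedged sketch (the parameter name item and the "name" key are hypothetical placeholders) would be:
```
from prefect import task

# Sketch: the task_run_name callable gets the task's inputs as kwargs;
# "item" and the "name" key are placeholder assumptions.
@task(task_run_name=lambda **kwargs: kwargs["item"]["name"])
def process(item: dict):
    ...
```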
Ayah Safeen
02/27/2022, 6:41 AM
Running from prefect import flow fails with: cannot import name 'ThreadedChildWatcher' from 'asyncio'
The code:
from prefect import flow

@flow
def common_flow(config: dict):
    print("I am a subgraph that shows up in lots of places!")
    intermediate_result = 42
    return intermediate_result

@flow
def main_flow():
    data = common_flow(config={})

# run the flow
flow_state = main_flow()
Hedgar
02/27/2022, 11:03 AM
• I was able to start the local agent with prefect agent local start
• I was also able to click the quick run button, and I got a beautiful green bar of success!
However, I have a challenge: my AWS EC2 instance, with the help of a Lambda function, gets started at a certain time of day and shuts down at a certain time of day.
How do I ensure that my flow runs when the EC2 instance starts for the day? Do I need to create a bash script that starts the local agent via a crontab on the remote instance?
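One common pattern (an assumption, not confirmed in the thread) is exactly that: have the instance start the agent at boot, e.g. with an @reboot crontab entry or a systemd unit; the virtualenv path below is a placeholder:
```
# crontab entry on the EC2 instance: start the local agent at every boot
@reboot /home/ec2-user/venv/bin/prefect agent local start
```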
Liran Cohen
02/27/2022, 1:49 PM

Liran Cohen
02/27/2022, 1:53 PM
ModuleNotFoundError: No module named 'prefect'
The package seems to be installed, because when I do pip show prefect I get:
Name: prefect
Version: 1.0.0
Summary: The Prefect Core automation and scheduling engine.
I checked a few times, and the environment where I try the import is the same one where I run pip show.
Any suggestions on why that might happen?
Thanks
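A quick way to check the usual culprit here, mismatched interpreters (just a debugging suggestion, not from the thread), is to make the same interpreter do both the lookup and the import:
```
# If these disagree with plain `pip show prefect`, pip installed into a
# different interpreter than the one running `import prefect`.
python -m pip show prefect
python -c "import prefect; print(prefect.__version__)"
```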
Adi Gandra
02/27/2022, 4:55 PM

Adi Gandra
02/27/2022, 9:07 PM

Adi Gandra
02/27/2022, 9:07 PM

Anna Geller
02/27/2022, 9:47 PM