Luis Aguirre
02/25/2022, 1:42 PM

Henrietta Salonen
02/25/2022, 2:31 PM
How do I pass the data argument correctly for Prefect’s PostgresExecute task together with the query argument? I have two different dataframes that I would like to insert into Postgres tables. This is what I have now, but I’m unsure of how I would pass the data argument here:
```
from prefect import Flow, Parameter, task, unmapped
from prefect.tasks.postgres import PostgresExecute

# database, user, host, port, schema_name, postgres_pwd, df1, df2,
# df1_columns and df2_columns are assumed to be defined elsewhere
@task
def make_query(schema_name, table_name, columns):
    return f'''
    CREATE TABLE IF NOT EXISTS {schema_name}.{table_name} ({columns});
    INSERT INTO {schema_name}.{table_name} ({columns}) VALUES ();
    '''

with Flow("postgres_test") as flow:
    execute_task = PostgresExecute(db_name=database, user=user, host=host, port=port, commit=True)
    table_name = Parameter("table_name", default=[df1.name, df2.name])
    columns = Parameter("columns", default=[df1_columns, df2_columns])
    query = make_query.map(unmapped(schema_name), table_name, columns)
    execute_task.map(password=unmapped(postgres_pwd), query=query)

if __name__ == "__main__":
    flow.run()
```
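(On the data question, a sketch rather than a verified end-to-end example: PostgresExecute.run also accepts a data tuple whose values fill psycopg2-style %s placeholders in the query, and it can be mapped just like query. The row_values parameter below is hypothetical, one tuple of row values per table:)
```
# hypothetical: one tuple of row values per dataframe; the INSERT in
# make_query would then need matching placeholders, e.g. VALUES (%s, %s)
row_values = Parameter("row_values", default=[tuple(df1.iloc[0]), tuple(df2.iloc[0])])
execute_task.map(
    password=unmapped(postgres_pwd),
    query=query,
    data=row_values,
)
```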
Leo Kacenjar
02/25/2022, 3:48 PM

Luke Segars
02/25/2022, 6:32 PM

Austen Bouza
02/25/2022, 7:42 PM
I’m getting a prefect.exceptions.AuthorizationError: Malformed response received from Cloud - please ensure that you are authenticated. message. The auth token I am using has not changed and has not expired. The response is failing on a requests.exceptions.JSONDecodeError: [Errno Expecting value] error with the response body <html><head><meta charset="utf-8"><script>C9xx.T9xx=T9xx;C9xx.g8=window;;n0II(C9xx.g8);T9TT(C9xx.g8);C9xx.D9S=(function(){var Z9S=2;for(;Z9S !== 1;){switch(Z9S) …and a ton more obfuscated javascript. Is this a typical response body from Prefect Cloud when not authenticated?
Max Lei
02/25/2022, 8:54 PM
When using a DaskExecutor with dask_cloudprovider.aws.FargateCluster, how do we assign a task-definition to the cluster?
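(A sketch of how cluster options generally get through, not a confirmed answer: DaskExecutor instantiates cluster_class(**cluster_kwargs) at flow-run time, so whatever task-definition-related parameters dask_cloudprovider’s FargateCluster supports would be passed there. image and n_workers below are real FargateCluster kwargs; anything task-definition-specific must be checked against the dask_cloudprovider docs:)
```
from prefect.executors import DaskExecutor

# cluster_kwargs are forwarded verbatim to FargateCluster when the flow runs
executor = DaskExecutor(
    cluster_class="dask_cloudprovider.aws.FargateCluster",
    cluster_kwargs={"image": "daskdev/dask:latest", "n_workers": 2},
)
flow.executor = executor
```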
iñigo
02/25/2022, 8:57 PM

Daniel Komisar
02/25/2022, 9:14 PM

Josh
02/25/2022, 9:31 PM
When I run the following flow:
```
from prefect import Flow, task
from pprint import pformat

@task
def numbers():
    return [1, 2]

@task
def letters():
    return ["a", "b", "c"]

@task
def cross_product(numbers, letters):
    res = []
    for letter in letters:
        for num in numbers:
            res.append({"num": num, "letter": letter})
    return res

@task
def pretty_print(**kwargs):
    print(pformat(kwargs))

with Flow("...") as flow:
    n = numbers()
    l = letters()
    map_items = cross_product(n, l)
    pretty_print.map(map_items)

flow.run()
```
I get the following error:
```
Traceback (most recent call last):
  File "flow.py", line 33, in <module>
    pretty_print.map(map_items)
  File "prefect/core/task.py", line 760, in map
    *args, mapped=True, upstream_tasks=upstream_tasks, flow=flow, **kwargs
  File "prefect/core/task.py", line 674, in bind
    callargs = dict(signature.bind(*args, **kwargs).arguments)  # type: Dict
  File "inspect.py", line 3015, in bind
    return args[0]._bind(args[1:], kwargs)
  File "inspect.py", line 2942, in _bind
    'too many positional arguments') from None
TypeError: too many positional arguments
```
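(For context, the likely cause: pretty_print declares only **kwargs, so inspect’s signature.bind rejects the positional item that .map passes in. A minimal sketch of one fix, giving the task a positional parameter:)
```
@task
def pretty_print(item):
    # each mapped child run receives one dict from map_items as a positional arg
    print(pformat(item))

pretty_print.map(map_items)
```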
kiran
02/25/2022, 9:33 PM
Is there a way to tag @channel or @here in the SlackTask message? Right now I can tag myself using my user ID, but it doesn’t work when I try it with my channel ID or @channel or <@channel>:
```
...
elif new_state.is_failed():
    data_engineer_user_id = Secret("DATA_ENGINEER_USER_ID").get()
    msg = f"<@{data_engineer_user_id}> — {obj} ({flow_run_name}): {new_state}"
    SlackTask(message=msg, webhook_secret="SLACK_WEBHOOK_URL").run()
```
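(A sketch based on Slack’s message-formatting rules rather than anything Prefect-specific: in webhook payloads, group mentions use the <!channel> / <!here> syntax, while only user mentions use <@USER_ID>:)
```
# Slack webhooks parse special mentions as <!channel> or <!here>,
# not as @channel or <@CHANNEL_ID>
msg = f"<!channel> — {obj} ({flow_run_name}): {new_state}"
SlackTask(message=msg, webhook_secret="SLACK_WEBHOOK_URL").run()
```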
Dexter Antonio
02/25/2022, 9:52 PM
I’m having trouble using client.graphql. This is the GraphQL query that I want to use. It works in Prefect Cloud’s interactive API:
```
query {
  flow(where: {name: {_eq: "autoscorer"}, version: {_eq: 10}}) {
    id
    name
    version
    tasks {
      name
      id
    }
  }
}
```
This is my attempt to do it with client.graphql:
```
client.graphql(
    {
        'query': {
            'flow(where: {name: {_eq: "autoscorer"}, version: {_eq: 10}})': {
                # 'id',
                # 'name',
                # 'version',
                'tasks': {'task_runs': {'id', 'name'}},
            },
        }
    }
)
```
This works, but if I uncomment the commented fields (e.g. id) I get a syntax error. What is the proper syntax to indicate a root field? Is there any documentation on how to use this API in more detail? Is it based on a common Python GraphQL library?
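(One way to sidestep the dict syntax entirely, as a sketch: client.graphql also accepts a raw query string, so the query that already works in the interactive API can be passed as-is:)
```
result = client.graphql(
    """
    query {
      flow(where: {name: {_eq: "autoscorer"}, version: {_eq: 10}}) {
        id
        name
        version
        tasks {
          name
          id
        }
      }
    }
    """
)
# the matching flows should come back under result.data.flow
print(result)
```
The commented fields trigger a Python syntax error because {'id', 'name', 'tasks': {...}} mixes set-literal and dict-literal syntax; the raw string avoids that problem.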
Aqib Fayyaz
02/26/2022, 7:47 AM

iñigo
02/26/2022, 9:50 AM
I’m getting this error when my flow runs:
```
Unexpected error: PicklingError('Could not pickle object as excessively deep recursion required.')
Traceback (most recent call last):
  File "/home/inigo/project/scripts/venv/lib/python3.9/site-packages/cloudpickle/cloudpickle_fast.py", line 563, in dump
    return Pickler.dump(self, obj)
RecursionError: maximum recursion depth exceeded while pickling an object

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/home/inigo/project/scripts/venv/lib/python3.9/site-packages/prefect/engine/runner.py", line 48, in inner
    new_state = method(self, state, *args, **kwargs)
  File "/home/inigo/project/scripts/venv/lib/python3.9/site-packages/prefect/engine/task_runner.py", line 909, in get_task_run_state
    result = self.result.write(value, **formatting_kwargs)
  File "/home/inigo/project/scripts/venv/lib/python3.9/site-packages/prefect/engine/results/local_result.py", line 115, in write
    value = self.serializer.serialize(new.value)
  File "/home/inigo/project/scripts/venv/lib/python3.9/site-packages/prefect/engine/serializers.py", line 73, in serialize
    return cloudpickle.dumps(value)
  File "/home/inigo/project/scripts/venv/lib/python3.9/site-packages/cloudpickle/cloudpickle_fast.py", line 73, in dumps
    cp.dump(obj)
  File "/home/inigo/project/scripts/venv/lib/python3.9/site-packages/cloudpickle/cloudpickle_fast.py", line 570, in dump
    raise pickle.PicklingError(msg) from e
_pickle.PicklingError: Could not pickle object as excessively deep recursion required.
```
After some testing, I also get a problem while registering my script with flow.register:
```
Traceback (most recent call last):
  File "/home/inigo/project/scripts/venv/lib/python3.9/site-packages/cloudpickle/cloudpickle_fast.py", line 563, in dump
    return Pickler.dump(self, obj)
  File "/home/inigo/project/scripts/venv/lib/python3.9/site-packages/cloudpickle/cloudpickle_fast.py", line 639, in reducer_override
    if sys.version_info[:2] < (3, 7) and _is_parametrized_type_hint(obj):  # noqa  # pragma: no branch
RecursionError: maximum recursion depth exceeded in comparison

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/home/inigo/project/scripts/es_am_08_caetano_to_s4e/caetano_to_s4e_flow.py", line 65, in <module>
    flow.register(project_name='S4E')
  File "/home/inigo/project/scripts/venv/lib/python3.9/site-packages/prefect/core/flow.py", line 1727, in register
    registered_flow = client.register(
  File "/home/inigo/project/scripts/venv/lib/python3.9/site-packages/prefect/client/client.py", line 1127, in register
    serialized_flow = flow.serialize(build=build)  # type: Any
  File "/home/inigo/project/scripts/venv/lib/python3.9/site-packages/prefect/core/flow.py", line 1489, in serialize
    self.storage.add_flow(self)
  File "/home/inigo/project/scripts/venv/lib/python3.9/site-packages/prefect/storage/local.py", line 143, in add_flow
    f.write(flow_to_bytes_pickle(flow))
  File "/home/inigo/project/scripts/venv/lib/python3.9/site-packages/prefect/utilities/storage.py", line 177, in flow_to_bytes_pickle
    cloudpickle.dumps(flow, protocol=4), newline=False
  File "/home/inigo/project/scripts/venv/lib/python3.9/site-packages/cloudpickle/cloudpickle_fast.py", line 73, in dumps
    cp.dump(obj)
  File "/home/inigo/project/scripts/venv/lib/python3.9/site-packages/cloudpickle/cloudpickle_fast.py", line 570, in dump
    raise pickle.PicklingError(msg) from e
_pickle.PicklingError: Could not pickle object as excessively deep recursion required.
```
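(A sketch of one common workaround rather than a confirmed fix: recursion errors from cloudpickle usually mean the flow or a task result holds a deeply self-referential or otherwise unpicklable object. Since the register-time failure comes from Local storage pickling the flow, storing the flow as a script avoids pickling it at all; the path below is taken from the traceback and assumed correct:)
```
from prefect.storage import Local

# stored_as_script=True keeps the flow as a .py file instead of cloudpickling it
flow.storage = Local(
    stored_as_script=True,
    path="/home/inigo/project/scripts/es_am_08_caetano_to_s4e/caetano_to_s4e_flow.py",
)
flow.register(project_name="S4E")
```
The runtime error is separate: it comes from pickling a task’s return value, so the object each task returns would still need to be reduced to plain picklable data.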
iñigo
02/26/2022, 9:50 AM

Hui Huang
02/26/2022, 6:53 PM

Slackbot
02/26/2022, 9:25 PM

David Michael Carter
02/26/2022, 10:59 PM
I am mapping over a list of dicts, and I’d like the task_run_name to be a value based on a specific key for that mapped dict index. Anyone have experience naming tasks based on a mapped dictionary?
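(A sketch of one approach: in Prefect 1.x, task_run_name can be a callable that receives the task’s inputs as keyword arguments, so a key can be pulled out of the mapped dict. The parameter name item and the "name" key below are hypothetical:)
```
from prefect import task

# task_run_name may be a template string or a callable taking the task's
# inputs as kwargs; here each mapped run is named after its dict's "name" key
@task(task_run_name=lambda **kwargs: kwargs["item"]["name"])
def process(item):
    ...
```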
Ayah Safeen
02/27/2022, 6:41 AM
When I run from prefect import flow I get:
```
cannot import name 'ThreadedChildWatcher' from 'asyncio'
```
The code:
```
from prefect import flow

@flow
def common_flow(config: dict):
    print("I am a subgraph that shows up in lots of places!")
    intermediate_result = 42
    return intermediate_result

@flow
def main_flow():
    data = common_flow(config={})

# run the flow
flow_state = main_flow()
```
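(A hedged pointer rather than a confirmed diagnosis: asyncio.ThreadedChildWatcher was added in Python 3.8, so this import error usually means the environment is running Python 3.7 or older. A quick check:)
```
import sys

# asyncio.ThreadedChildWatcher only exists on Python 3.8+
print(sys.version_info)
assert sys.version_info >= (3, 8), "this interpreter lacks asyncio.ThreadedChildWatcher"
```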
Hedgar
02/27/2022, 11:03 AM
• Was able to start a local agent with prefect agent local start
• Was also able to click the quick run button, and I got a beautiful green bar of success!
However, I have a challenge: my AWS EC2 instance, with the help of a Lambda function, gets started at a certain time of the day and shuts down at a certain time of the day. How do I ensure that my flow runs when the EC2 instance starts for the day? Do I need to create a bash script that starts the local agent via a crontab on the remote instance?
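(A sketch of the crontab approach the question suggests, using cron’s @reboot directive so the agent comes up whenever the instance starts; the paths, label, and log file below are placeholders:)
```
# crontab -e on the EC2 instance — @reboot runs the command once at startup
@reboot /home/ec2-user/venv/bin/prefect agent local start --label ec2 >> /home/ec2-user/prefect-agent.log 2>&1
```
With the agent running on boot, any flow runs already scheduled in Prefect Cloud for that window will be picked up once the instance is up.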
Liran Cohen
02/27/2022, 1:49 PM

Liran Cohen
02/27/2022, 1:53 PM
I’m getting ModuleNotFoundError: No module named 'prefect'. The package seems to be installed, because when I do pip show prefect I get:
```
Name: prefect
Version: 1.0.0
Summary: The Prefect Core automation and scheduling engine.
```
I checked a few times, and the environment where I try the import is the same one where I run pip show. Any suggestions on why that might happen? Thanks
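(A sketch of a quick diagnostic, assuming the usual culprit: pip and the running interpreter pointing at different environments:)
```
import sys

# the interpreter that raises ModuleNotFoundError
print(sys.executable)
print(sys.path)
# compare with the "Location:" line from `pip show prefect` and with
# `pip -V` — if they disagree, pip installed into a different environment
```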
Adi Gandra
02/27/2022, 4:55 PM

Adi Gandra
02/27/2022, 9:07 PM

Nitin Bansal
02/28/2022, 1:58 AM

Nitin Bansal
02/28/2022, 1:59 AM

Nitin Bansal
02/28/2022, 1:59 AM

Ovo Ojameruaye
02/28/2022, 6:40 AM

Tomer Cagan
02/28/2022, 7:38 AM

Stephen Lloyd
02/28/2022, 9:06 AM
```
RUN_CONFIG = ECSRun(
    labels=['s3-flow-storage'],
    task_role_arn=TASK_ARN,
    execution_role_arn=EXECUTION_ARN,
    image='1111111.dkr.ecr.us-east-1.amazonaws.com/prefectdemo:latest',
    memory=512,
    cpu=256,
)
```
E Li
02/28/2022, 3:34 PM