Marvin  09/18/2020, 7:09 AM
Carl R  09/18/2020, 7:15 AM
Marvin  09/18/2020, 7:15 AM
Robin  09/18/2020, 8:26 AM
max_retries and retry_delay and I got the following error:
/opt/prefect/healthcheck.py:149: UserWarning: Task <Task: copy_storage> has retry settings but some upstream dependencies do not have result types. See <https://docs.prefect.io/core/concepts/results.html> for more details.
result_check(flows)
It's a mapped_task.
I read the documentation about result types but that did not answer several questions:
• What might go wrong if I don't set a result type?
• If I understood correctly, the result type can be either flow or task. I think I want the result type to be flow, since each task should only run once (if successful), so the result does not change during the flow. Is that correct?
• Should I therefore do something like with Flow("creative_name", result=flow_result)? And what should I set flow_result to?
Bests,
Robin
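A minimal sketch of what a flow-level result could look like, assuming a local directory is acceptable; the path, retry settings, and task body are placeholders rather than Robin's actual flow:

from datetime import timedelta

from prefect import Flow, task
from prefect.engine.results import LocalResult

# A flow-level result is inherited by every task that does not set its own,
# which is what the healthcheck warning about upstream results is asking for.
flow_result = LocalResult(dir="/tmp/prefect-results")  # placeholder directory

@task(max_retries=3, retry_delay=timedelta(minutes=1))
def copy_storage(path):
    return path  # stand-in for the real copy logic

with Flow("creative_name", result=flow_result) as flow:
    copy_storage.map(["path-a", "path-b"])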
Jacob Blanco  09/18/2020, 9:31 AM
Vinod Sugur  09/18/2020, 1:42 PM
Marvin  09/18/2020, 1:42 PM
Dolor Oculus  09/18/2020, 4:21 PM
prefect backend server
prefect server start
prefect agent start local -p .
server launches ok, ui looks good, I can register a simple hello world flow,
with Flow("Welcome Flow") as pf:
print("Hello world")
pf.register(project_name="Hello, World!")
Registering this flow results in
Hello world
Result Handler check: OK
Flow: <http://localhost:8080/flow/a5ec20aa-ad9e-4add-977e-cffe9699eba3>
But when I run the flow from the UI, I get this error message:
[2020-09-18 16:20:35,115] ERROR - agent | Error while deploying flow: FileNotFoundError(2, "No such file or directory: 'prefect'")
Any thoughts? ty 🙂
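A side note on the snippet above: the bare print runs when the script builds the flow, which is why "Hello world" appears in the registration output rather than during a flow run. A minimal sketch with the print moved into a task so it executes at run time (names are illustrative, and this does not address the FileNotFoundError itself):

from prefect import Flow, task

@task
def say_hello():
    # Runs when the flow runs, not when it is registered
    print("Hello world")

with Flow("Welcome Flow") as pf:
    say_hello()

pf.register(project_name="Hello, World!")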
Hamza Ahmed  09/18/2020, 5:34 PM
from prefect import Flow, task
from prefect.tasks.aws.secrets_manager import AWSSecretsManager
from prefect.tasks.postgres.postgres import PostgresFetch

@task
def print_me(to_print):
    print(to_print)

with Flow('fetching-data') as flow:
    credentials = AWSSecretsManager(secret='pg/prefectsql/prefect')
    print_me(credentials)

    pg_user = credentials['username']
    pg_pass = credentials['password']
    pg_host = 'localhost'
    pg_port = credentials['port']
    pg_query = 'SELECT * FROM hdb_catalog.hdb_table LIMIT 5;'

    runsql = PostgresFetch(db_name='prefect', user=pg_user, host='localhost')
    result = runsql(password=pg_pass, query=pg_query, fetch='all')
    print_me(result)

flow.run()
the PostgresFetch initialization doesn't work when I try to use user=credentials['username'], but it does when I hardcode the username, or even if I set pg_user to a string containing the username.
The flow run produces the error output below:
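One possible workaround, sketched here under the assumption that psycopg2 is available: arguments passed to PostgresFetch's constructor are fixed when the flow is built, while the secret only resolves at run time, so the fetch can instead be done inside a plain @task that receives the credentials as a runtime input (this swaps out PostgresFetch entirely; names are placeholders):

import psycopg2
from prefect import Flow, task
from prefect.tasks.aws.secrets_manager import AWSSecretsManager

@task
def fetch_rows(credentials, query):
    # The credentials dict is resolved at run time before this task executes.
    conn = psycopg2.connect(
        dbname='prefect',
        user=credentials['username'],
        password=credentials['password'],
        host='localhost',
        port=credentials['port'],
    )
    try:
        with conn.cursor() as cur:
            cur.execute(query)
            return cur.fetchall()
    finally:
        conn.close()

with Flow('fetching-data') as flow:
    credentials = AWSSecretsManager(secret='pg/prefectsql/prefect')
    rows = fetch_rows(credentials, 'SELECT * FROM hdb_catalog.hdb_table LIMIT 5;')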
matt forbes  09/18/2020, 5:52 PM
jars  09/18/2020, 8:12 PM
Minakshi  09/18/2020, 8:43 PM
ModuleNotFoundError: No module named 'dask.system'
I found a resolution for that online, namely using the latest versions of dask and distributed (https://github.com/dask/distributed/issues/3331). But the problem here is that I need to add these to our internal repo, so I need to confirm all the dependent packages and their versions so that I can check or add them all at once as required. Where can I get that list?
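One way to enumerate what an installed package declares as dependencies, sketched here assuming the packages are importable in the current environment (the package names are examples):

# Print each installed package's version and its declared requirements so the
# pinned versions can be mirrored into an internal repository.
import pkg_resources

for name in ("prefect", "dask", "distributed"):
    dist = pkg_resources.get_distribution(name)
    print(dist.project_name, dist.version)
    for req in dist.requires():
        print("    requires:", req)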
Dolor Oculus  09/18/2020, 9:33 PM
with Flow() as flow:
    e = Extract()
    t = Transform(e)
    l = Load(t)
state = flow.run()
syntax? I'm getting key-not-found results when asserting on state.result[e], and I'm wondering if you have to do it the way given in the testing-flows link above to unit test the flows.
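A minimal sketch of the pattern from the Prefect testing docs, using the functional @task API so that the objects returned inside the flow context are the same task instances used as keys into state.result (names are illustrative):

from prefect import Flow, task

@task
def extract():
    return [1, 2, 3]

@task
def transform(data):
    return [x * 10 for x in data]

with Flow("etl-test") as flow:
    e = extract()
    t = transform(e)

state = flow.run()
assert state.is_successful()
# The objects returned by calling the tasks inside the flow context are the keys
assert state.result[t].result == [10, 20, 30]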
Glen Trudeau  09/18/2020, 10:14 PM
Failed to load and execute Flow's environment: ModuleNotFoundError("No module named 'sqlalchemy'")
when running a flow on a Fargate agent, which I assume is due to the fact that the base Docker image doesn't have that module installed. But when we tried downloading the public image, adding the necessary modules, and then uploading it to ECR, it doesn't appear that the agent is utilizing the updated image, even though we called it inside of the container definitions (reference python wrapper). We see the below screenshot for the new task definition in ECS, which means it isn't pulling the specified image correctly. Any suggestions?
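Worth noting as a sketch, though it may not explain why the agent ignores the custom image: Prefect's Docker storage can bake extra Python packages into the image it builds at registration time, which avoids hand-modifying the public image. The registry URL and image name below are placeholders:

from prefect import Flow, task
from prefect.environments.storage import Docker

@task
def query_db():
    import sqlalchemy  # available because it is baked into the image
    return sqlalchemy.__version__

# Prefect builds this image and pushes it to the registry when the flow is registered.
storage = Docker(
    registry_url="<account>.dkr.ecr.us-east-1.amazonaws.com",
    image_name="prefect-flows",
    python_dependencies=["sqlalchemy"],
)

with Flow("fargate-flow", storage=storage) as flow:
    query_db()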
CA Lee  09/19/2020, 11:08 AM
plt.savefig takes in a file_path arg (below is some sample code):
CWD = os.path.dirname(os.path.abspath(__file__)) # Get CWD
file_path = os.path.join(CWD, "picture.png") # Save in CWD
plt.savefig(file_path)
This all works fine when I am running it on my server using flow.run(); however, when it is registered on Cloud, invoking a Quick Run results in the error: PermissionError: [Errno 13] Permission denied: '/picture.png'
The error shows that file_path is not being resolved to the current working directory the script is run from when triggered from Cloud.
Would appreciate any insight. If it helps, I am getting data from Google Sheets, plotting it and sending it to Slack, so I'd like to Prefect this workflow.
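A small sketch of one way around path assumptions when the flow runs in an environment you don't control: write to the temp directory instead of a path derived from __file__ (illustrative only, not the poster's code):

import os
import tempfile

import matplotlib.pyplot as plt

plt.plot([1, 2, 3])  # stand-in for the real chart

# The temp directory is writable regardless of the working directory the
# agent happens to launch the flow from.
file_path = os.path.join(tempfile.gettempdir(), "picture.png")
plt.savefig(file_path)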
Eric  09/20/2020, 1:58 AM
Eric  09/20/2020, 1:58 AM
Eric  09/20/2020, 1:59 AM
09/20/2020, 9:54 AM@task(cache_for=datetime.timedelta(days=1))
def fetch_data():
return fetched_data
@task
def insert_into_database(fetched_data):
fetched_data.to_sql('table_name', con=db.engine)
itay livni  09/20/2020, 12:24 PM
Anshaj  09/20/2020, 1:10 PM
Marvin  09/20/2020, 1:10 PM
Jeff Brainerd  09/20/2020, 2:48 PM
0.13.2 with heartbeats off and Lazarus on. Is this a known issue? Should Lazarus be picking these up? Thanks!
Eric  09/20/2020, 5:33 PM
Eric  09/20/2020, 5:34 PM
josh  09/20/2020, 10:41 PM
0.13.7 has been released and here are a few notable changes:
🛠️ Fixes, lots of fixes
🗜️ Data compression for S3 upload/download tasks
🤫 Quieter debug logs when starting server
A big thank you to our contributors who helped out with this release! Full changelog:
Pedro Machado  09/21/2020, 12:45 AM
start_date and end_date.
If these are not provided, I want to use the scheduled_start_time from the context to generate the start_date and end_date.
What is the cleanest way to write this logic given the fact that the parameters will only be available at run time?
I currently have a task that takes both parameters and outputs a tuple of dates.
This tuple has either the values of the parameters converted to pendulum dates, or the values generated from prefect.context.scheduled_start_time, also as dates.
Is there a better way?
This is what the flow looks like now.
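A sketch of one way to express that fallback, for illustration only (it is not Pedro's actual flow and assumes pendulum plus a one-day window when no dates are given):

import pendulum
import prefect
from prefect import Flow, Parameter, task

@task
def resolve_dates(start_date, end_date):
    # Fall back to the scheduled start time when the parameters are not provided.
    scheduled = prefect.context.get("scheduled_start_time", pendulum.now("UTC"))
    start = pendulum.parse(start_date) if start_date else scheduled.subtract(days=1)
    end = pendulum.parse(end_date) if end_date else scheduled
    return start.date(), end.date()

with Flow("date-window") as flow:
    start_date = Parameter("start_date", default=None)
    end_date = Parameter("end_date", default=None)
    dates = resolve_dates(start_date, end_date)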
CA Lee  09/21/2020, 1:39 AM
sark  09/21/2020, 9:21 AM
Traceback (most recent call last):
File "daily.py", line 95, in <module>
with Flow('start dataflow job 1', executor=LocalDaskExecutor(), storage=GCS('ghpr-dev-prefect-flows')) as startFlow1:
TypeError: __init__() got an unexpected keyword argument 'executor'
(prefect) bash-3.2$ prefect version
0.13.7
hmm, I am unable to set the executor for a flow like this.
https://docs.prefect.io/api/latest/core/flow.html#flow-2 says that it does have the executor parameter though.
I am trying to increase parallelism for my flows.
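A sketch of an alternative that avoids passing executor to the constructor, which the traceback above shows this release does not accept: attach the executor to the flow's execution environment, or pass it to flow.run() for a local run (import paths as remembered for the 0.13 series; treat the details as assumptions):

from prefect import Flow
from prefect.engine.executors import LocalDaskExecutor
from prefect.environments import LocalEnvironment
from prefect.environments.storage import GCS

# Attach the executor to the execution environment used when an agent
# runs the registered flow, instead of passing it to Flow(...).
with Flow(
    'start dataflow job 1',
    environment=LocalEnvironment(executor=LocalDaskExecutor()),
    storage=GCS('ghpr-dev-prefect-flows'),
) as startFlow1:
    ...

# For a local run, the executor can also be passed directly:
# startFlow1.run(executor=LocalDaskExecutor())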
Greg Roche  09/21/2020, 9:27 AM
venv). If I have two separate agents running on the machine, each started from within their own venv, they can both execute their own flows successfully. As soon as I try to cut it down to one agent listening for both flows, the agent fails to execute any flows which live inside another venv (Failed to load and execute Flow's environment: ModuleNotFoundError). I'd like to keep the separated venvs if possible and just have one agent running, which would execute each of the flows within the context of their own venv. Is this possible, or would I need to bite the bullet and have one venv for both flows?