Marvin
09/18/2020, 6:15 AM
Fanilo A.
09/18/2020, 7:09 AM
Marvin
09/18/2020, 7:09 AM
Carl R
09/18/2020, 7:15 AM
Marvin
09/18/2020, 7:15 AM
Robin
09/18/2020, 8:26 AM
I set max_retries and retry_delay on a task and got the following error:
/opt/prefect/healthcheck.py:149: UserWarning: Task <Task: copy_storage> has retry settings but some upstream dependencies do not have result types. See <https://docs.prefect.io/core/concepts/results.html> for more details.
result_check(flows)
It's a mapped task.
I read the documentation about result types, but it did not answer several questions:
• What might go wrong if I don't set a result type?
• If I understood correctly, the result type can be set at either the flow or the task level. I think I want to set it at the flow level, since each task should only run once (if successful), so the result does not change during the flow. Is that correct?
• Should I therefore do something like with Flow("creative_name", result=flow_result)? And what should I set flow_result to (see the sketch below)?
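A minimal sketch of what a flow-level result could look like (the LocalResult type and the directory here are illustrative assumptions, not a recommendation):
from prefect import Flow
from prefect.engine.results import LocalResult
# every task's output is checkpointed with this result type unless a task overrides it
flow_result = LocalResult(dir="/path/to/results")
with Flow("creative_name", result=flow_result) as flow:
    ...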
Best,
Robin
Jacob Blanco
09/18/2020, 9:31 AM
Vinod Sugur
09/18/2020, 1:42 PM
Marvin
09/18/2020, 1:42 PM
Dolor Oculus
09/18/2020, 4:21 PM
prefect backend server
prefect server start
prefect agent start local -p .
The server launches OK, the UI looks good, and I can register a simple hello-world flow:
with Flow("Welcome Flow") as pf:
print("Hello world")
pf.register(project_name="Hello, World!")
Registering this flow results in
Hello world
Result Handler check: OK
Flow: <http://localhost:8080/flow/a5ec20aa-ad9e-4add-977e-cffe9699eba3>
But when I run the flow from the UI, I get this error message:
[2020-09-18 16:20:35,115] ERROR - agent | Error while deploying flow: FileNotFoundError(2, "No such file or directory: 'prefect'")
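(As an aside, a hedged sketch of the same flow with the print wrapped in a task so it runs at flow-run time rather than at registration; purely illustrative:)
from prefect import Flow, task
@task
def say_hello():
    # runs when the flow runs, not when the file is imported or registered
    print("Hello world")
with Flow("Welcome Flow") as pf:
    say_hello()
pf.register(project_name="Hello, World!")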
Any thoughts? ty 🙂
Hamza Ahmed
09/18/2020, 5:34 PM
from prefect import Flow, task
from prefect.tasks.aws.secrets_manager import AWSSecretsManager
from prefect.tasks.postgres.postgres import PostgresFetch
@task
def print_me(to_print):
    print(to_print)
with Flow('fetching-data') as flow:
    credentials = AWSSecretsManager(secret='pg/prefectsql/prefect')
    print_me(credentials)
    pg_user = credentials['username']
    pg_pass = credentials['password']
    pg_host = 'localhost'
    pg_port = credentials['port']
    pg_query = 'SELECT * FROM hdb_catalog.hdb_table LIMIT 5;'
    runsql = PostgresFetch(db_name='prefect', user=pg_user, host='localhost')
    result = runsql(password=pg_pass, query=pg_query, fetch='all')
    print_me(result)
flow.run()
The PostgresFetch initialization doesn't work when I try to use user=credentials['username'], but it does when I hardcode the username, or even when I set pg_user to a plain string containing the username.
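One hedged workaround (just a sketch, reusing the imports from the snippet above): inside the Flow block, credentials['username'] is still a Task rather than a string, so PostgresFetch's init never receives a concrete value; deferring the construction into a custom task means the secret is an actual dict by the time it runs. Something like:
@task
def fetch_rows(credentials, query):
    # at flow-run time `credentials` is the resolved secret dict, so its values are plain strings
    fetcher = PostgresFetch(db_name='prefect', user=credentials['username'], host='localhost')
    return fetcher.run(password=credentials['password'], query=query, fetch='all')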
The flow run produces the error output below:
matt forbes
09/18/2020, 5:52 PM
jars
09/18/2020, 8:12 PM
Minakshi
09/18/2020, 8:43 PM
ModuleNotFoundError: No module named 'dask.system'
I found a resolution for that online, namely using the latest versions of dask and distributed: https://github.com/dask/distributed/issues/3331. But the problem is that I need to add these to our internal repo, so I need to confirm all the dependent packages and their versions, so that I can check or add them all at once as required. Where can I get that list?
Dolor Oculus
09/18/2020, 9:33 PM
with Flow() as flow:
    e = Extract()
    t = Transform(e)
    l = Load(t)
state = flow.run()
syntax? I'm getting key-not-found results when asserting on state.result[e], and wondering if you have to do it the way given in the testing-flows link above in order to unit test the flows.
Glen Trudeau
09/18/2020, 10:14 PM
Failed to load and execute Flow's environment: ModuleNotFoundError("No module named 'sqlalchemy'")
when running a flow on the Fargate agent, which I assume is because the base Docker image doesn't have that module installed. But when we tried downloading the public image, adding the necessary modules, and uploading it to ECR, the agent doesn't appear to be using the updated image, even though we referenced it inside the container definitions (via the Python wrapper). We see the screenshot below for the new task definition in ECS, which means it isn't pulling the specified image correctly. Any suggestions?
CA Lee
09/19/2020, 11:08 AM
plt.savefig takes in a file_path arg (below is some sample code):
CWD = os.path.dirname(os.path.abspath(__file__)) # Get CWD
file_path = os.path.join(CWD, "picture.png") # Save in CWD
plt.savefig(file_path)
This all works fine when I run it on my server using flow.run(); however, when the flow is registered on Cloud, invoking a Quick Run results in this error: PermissionError: [Errno 13] Permission denied: '/picture.png'
The error shows that file_path is not being resolved relative to the current working directory the script runs from when it is triggered from Cloud.
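One hedged idea (just a sketch; it assumes the run environment allows writing to the system temp directory): build the path from tempfile instead of __file__, since the working directory and permissions differ when the flow runs from Cloud.
import os
import tempfile
import matplotlib.pyplot as plt
# write to the system temp dir instead of a path derived from the script location
file_path = os.path.join(tempfile.gettempdir(), "picture.png")
plt.savefig(file_path)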
Would appreciate any insight. If it helps, I am getting data from Google Sheets, plotting it, and sending it to Slack, so I'd like to Prefect this workflow.
Eric
09/20/2020, 1:58 AM
Eric
09/20/2020, 1:58 AM
Eric
09/20/2020, 1:59 AM
CA Lee
09/20/2020, 9:54 AM
@task(cache_for=datetime.timedelta(days=1))
def fetch_data():
    return fetched_data
@task
def insert_into_database(fetched_data):
    fetched_data.to_sql('table_name', con=db.engine)
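(A minimal sketch of how these two tasks might be wired together, purely for illustration:)
from prefect import Flow
with Flow("fetch-and-load") as flow:
    data = fetch_data()
    insert_into_database(data)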
itay livni
09/20/2020, 12:24 PM
Anshaj
09/20/2020, 1:10 PM
Marvin
09/20/2020, 1:10 PM
Jeff Brainerd
09/20/2020, 2:48 PM
We're on 0.13.2 with heartbeats off and Lazarus on. Is this a known issue? Should Lazarus be picking these up? Thanks!
Eric
09/20/2020, 5:33 PM
Eric
09/20/2020, 5:34 PM
josh
09/20/2020, 10:41 PM
0.13.7 has been released and here are a few notable changes:
🛠️ Fixes, lots of fixes
🗜️ Data compression for S3 upload/download tasks
🤫 Quieter debug logs when starting server
A big thank you to our contributors who helped out with this release! Full changelog:
Pedro Machado
09/21/2020, 12:45 AM
My flow takes two parameters, start_date and end_date.
If these are not provided, I want to use the scheduled_start_time from the context to generate the start_date and end_date.
What is the cleanest way to write this logic, given that the parameters are only available at run time?
I currently have a task that takes both parameters and outputs a tuple of dates.
This tuple holds either the parameter values converted to pendulum dates or the values generated from prefect.context.scheduled_start_time, also as dates.
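Roughly, a sketch of the approach described above (the task name and pendulum parsing are illustrative assumptions):
import pendulum
import prefect
from prefect import task
@task
def resolve_dates(start_date, end_date):
    # fall back to the scheduled start time when the parameters are not supplied
    scheduled = prefect.context.get("scheduled_start_time")
    start = pendulum.parse(start_date) if start_date else scheduled
    end = pendulum.parse(end_date) if end_date else scheduled
    return start, end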
Is there a better way?
This is what the flow looks like now.
CA Lee
09/21/2020, 1:39 AM
CA Lee
09/21/2020, 1:39 AM
Laura Lorenz
09/21/2020, 2:53 PM
CA Lee
09/22/2020, 1:10 AM
Laura Lorenz
09/22/2020, 1:12 AM
CA Lee
09/22/2020, 1:13 AM
@task(cache_for=datetime.timedelta(days=1))
def get_complaint_data():
    ...  # do something
raw = get_complaint_data()
parsed = parse_complaint_data(raw)
populated_table = store_complaints(parsed)
The question being: let's say the data-fetching step (e.g. a web-scraping script) runs on an hourly interval.
Caching would help prevent the fetch from running again, but how would I then stop the parsing and populating steps, based on the cached state of the fetching step? (It wouldn't make sense to clean or store the same cached data again.)
Laura Lorenz
09/22/2020, 1:32 AM
You can use cache_keys to mark that all of those tasks share the same cache, and thus can consider themselves cached as long as that cache key is not invalidated. See https://github.com/PrefectHQ/prefect/blob/master/src/prefect/core/task.py#L156 and the last bullet in https://docs.prefect.io/core/concepts/persistence.html#output-caching (I know the API docs say it's deprecated there, but I'm pretty sure it's not actually deprecated yet until https://github.com/PrefectHQ/prefect/issues/2619 is done, in which case you would move that configuration onto the result).
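For example, a rough sketch of the shared-cache_key idea (task names borrowed from the snippet above; treat this as illustrative rather than tested):
import datetime
from prefect import task
# all three tasks share one cache_key, so while the fetch's cache is valid
# the downstream tasks can consider themselves cached as well
@task(cache_for=datetime.timedelta(days=1), cache_key="complaints")
def get_complaint_data():
    ...
@task(cache_for=datetime.timedelta(days=1), cache_key="complaints")
def parse_complaint_data(raw):
    ...
@task(cache_for=datetime.timedelta(days=1), cache_key="complaints")
def store_complaints(parsed):
    ...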
You could also use a custom trigger (https://docs.prefect.io/api/latest/triggers.html#triggers), since all triggers get their upstream dependencies' edges and states (https://github.com/PrefectHQ/prefect/blob/b9914890dfec52610a42cd694427badafab8c8ba/src/prefect/triggers.py#L174), but depending on how many other dependencies those tasks have it could get quite tricky, and AFAIK we don't have a published example that operates on specific upstream tasks to decide a trigger -- it should be possible, we just don't have any examples, so you'd have to reverse engineer it a bit 🙂
CA Lee
09/22/2020, 12:53 PM