prefect-community
  • k

    Kevin Systrom

    04/30/2020, 5:49 PM
    Guys, is there a best practice for making sure the same code is available to workers on remote boxes? I've factored out a few functions into a utility python file that I import. I'm running dask-worker manually on a remote server and it complains (obviously) with a ModuleNotFoundError. I thought copying the code to the remote box and running the worker from that directory might help, but no dice.
    c
    j
    • 3
    • 4
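    A minimal sketch of one workaround for this kind of ModuleNotFoundError, assuming a dask.distributed setup: the Client can ship a local module to every connected worker so imports resolve there. The scheduler address and the utils.py filename are placeholders.

    from dask.distributed import Client

    client = Client("tcp://remote-box:8786")  # placeholder scheduler address
    # push the local utility module to every connected worker
    client.upload_file("utils.py")            # placeholder module name

    Installing the utility code as a package on each worker (or adding its directory to the workers' PYTHONPATH) achieves the same thing more permanently.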
  • j

    Jeremiah

    04/30/2020, 6:54 PM
    Hey everyone - @Laura Lorenz (she/her) is about to spearhead an integration with Great Expectations and we’d value any input you care to share. Please forgive the cross-post, but we’ve had so many people join the main channel in the last few weeks I wanted to make sure everyone knew what was happening in #prefect-contributors! https://prefect-community.slack.com/archives/C0106HZ1CMS/p1588271140092800
    ❤️ 5
    👍 7
  • d

    Darragh

    05/01/2020, 10:20 AM
    Morning all! I’m new to Prefect, looking to deploy the server, or the whole stack, in a container onto Fargate - has anyone tried this? Can’t find much on it...
    j
    j
    • 3
    • 9
  • z

    Zviri

    05/01/2020, 12:09 PM
    Hi everyone! I have just created my first Prefect flow and deployed it to my manually set up Dask cluster via Prefect Cloud. Everything went smoothly and I have to say this is by far the best data pipeline tool I have used. However, I noticed that when I run the Flow from the cloud instead of just a script (
    flow.run
    ) the memory consumption of my Dask Workers gradually grows until they run out of memory. When I run the same flow from a script, everything finishes correctly and there are no memory issues whatsoever. I am using the latest
    prefect
    and
    dask
    packages. Has anyone experienced something similar? Is there anything that could be done about it? Thanks a lot.
    j
    j
    • 3
    • 12
  • j

    Jacques

    05/01/2020, 1:46 PM
    Hi all, we are waiting with much anticipation for https://github.com/PrefectHQ/prefect/issues/2041 ! It will improve performance for us a lot. Having said that, I'm wondering if there is a pattern/feature that will come along with this to allow yielding results to the DFE (depth-first execution)? We have a task that is recursive in nature (walking a tree) that we implement using the LOOP pattern - I wonder if there is something coming that will allow one loop to spawn multiple loops concurrently (I guess this is a separate issue), and then start processing the result of each loop in a DFE way?
    j
    • 2
    • 10
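    For readers unfamiliar with the LOOP pattern mentioned above, here is a minimal sketch of a single looping task walking a tree in Prefect 0.x; the node object and its name/children attributes are hypothetical.

    import prefect
    from prefect import task
    from prefect.engine.signals import LOOP

    @task
    def walk_tree(root):
        # each iteration picks up the state left behind by the previous one
        payload = prefect.context.get("task_loop_result", {"queue": [root], "visited": []})
        queue, visited = payload["queue"], payload["visited"]
        node = queue.pop(0)
        visited.append(node.name)      # hypothetical attribute
        queue.extend(node.children)    # hypothetical attribute
        if queue:
            raise LOOP(message=f"visited {node.name}", result={"queue": queue, "visited": visited})
        return visited

    Each LOOP iteration runs sequentially inside one task run, which is why spawning concurrent loops and processing their results depth-first would need something beyond this pattern.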
  • j

    Jacob (he/him)

    05/01/2020, 4:00 PM
    Hello everyone 👋, I’m having a conceptual issue I’m hoping to get some clarity on. Say for example, you have a number of tasks you want to execute in Redshift. I’m using psycopg2 to establish a connection and create a cursor. You cannot pass the cursor object from task to task because it is not serializable. How do you execute multiple tasks within a transaction block if you need a new connection per task? Am I thinking about this wrong? Is psycopg not the recommended connection method? Thanks!
    a
    j
    • 3
    • 4
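    One common way around the unpicklable-cursor problem is to keep each transaction inside a single task and pass only connection details (plain strings) between tasks. A minimal sketch; the DSN parameter and the SQL statements are placeholders.

    import psycopg2
    from prefect import task, Flow, Parameter

    @task
    def run_transaction(dsn, statements):
        # open the connection inside the task so nothing unpicklable crosses task boundaries
        conn = psycopg2.connect(dsn)
        try:
            with conn:  # commits on success, rolls back on error
                with conn.cursor() as cur:
                    for stmt in statements:
                        cur.execute(stmt)
        finally:
            conn.close()

    with Flow("redshift-transaction") as flow:
        dsn = Parameter("dsn")
        run_transaction(dsn, ["TRUNCATE staging.events", "INSERT INTO prod.events SELECT * FROM staging.events"])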
  • j

    John Ramirez

    05/01/2020, 6:00 PM
    hey everyone - is it possible to run a workflow in a docker image outside of the cloud?
    j
    • 2
    • 1
  • t

    Troy Köhler

    05/01/2020, 6:01 PM
    hi! I'm not actually a prefect user, I'm just a person willing to help and contribute to an open source project. I'm not very experienced as a developer, but I'm a quick learner and have enough spare time during self-isolation to grow my skills with a ton of practice. I'm from Kyiv, Ukraine, and would love to connect. I've already made my first PR and am going to make more 🐛
    💯 3
    :marvin: 5
    👋 5
    j
    • 2
    • 5
  • j

    John Ramirez

    05/01/2020, 6:19 PM
    hey everyone - separate question: I have a workflow and in a very rare instance need to run 3000 runs in parallel. Is there an easy way to point the workflows at a different env at the time of execution?
    j
    • 2
    • 5
  • t

    tiz.io

    05/01/2020, 6:49 PM
    Coming from Airflow, I've become reliant on the detailed statistics and analytics for my jobs. Is this on the roadmap for Prefect?
    j
    j
    • 3
    • 10
  • c

    Crawford Collins

    05/01/2020, 7:18 PM
    how does everyone test the results of a task?
    with Flow("test_target_transformer") as f:
        te = fit_target_transformer(problem, target, train_data)
        df = target_encoder_transform(te, imputed_categorical_df)
        assert df.isna().sum() == 0
    I'm trying to test a @task that returns a dataframe, but the assert statement errors with
    AttributeError: 'FunctionTask' object has no attribute 'isna'
    . How do I return this as a DataFrame and not a task?
    j
    j
    j
    • 4
    • 4
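    Inside a with Flow(...) block, calling a task returns a Task object rather than its return value, which is where the FunctionTask error above comes from. Two ways to get at the actual dataframe, sketched on the assumption that the tasks above are ordinary @task-decorated functions:

    # Option 1: unit-test the underlying functions directly via .run()
    te = fit_target_transformer.run(problem, target, train_data)
    df = target_encoder_transform.run(te, imputed_categorical_df)
    assert df.isna().sum().sum() == 0

    # Option 2: run the flow and pull the task's result out of the final state
    with Flow("test_target_transformer") as f:
        te = fit_target_transformer(problem, target, train_data)
        df = target_encoder_transform(te, imputed_categorical_df)

    state = f.run()
    assert state.result[df].result.isna().sum().sum() == 0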
  • m

    Matt

    05/02/2020, 7:22 AM
    Hi! First time trying this out. I got the server running locally. Can someone point me to instructions on how to connect to github? I would like to set it up so the first step is to pull the most recent code and execute that. I'm coming from Airflow so I might be missing something; any help is appreciated!
    j
    k
    • 3
    • 3
  • r

    Ruben Stolk

    05/02/2020, 10:00 AM
    Can anyone tell me what is going wrong on my local?
    log
    j
    • 2
    • 1
  • d

    David Haines

    05/02/2020, 12:25 PM
    Just a quick one to see if
    ifelse
    has been integrated within the imperative api yet?
    c
    • 2
    • 2
  • f

    Florian

    05/03/2020, 12:31 PM
    Hello everyone, thanks for this amazing product, I am just getting started with it. I was wondering the following though. I want to execute different flows using a local dask cluster. But as the documentation aptly describes, running
    flow.run()
    leads to a blocking process until the flow has finished. Is there a way of submitting jobs to the dask cluster in an almost fire-and-forget scenario? Especially since my outputs are written to file or DB, I really don’t need the return/result object. I basically want to dynamically create workflows, submit them to the dask cluster, and then have them finish based on available resources. Could be that I am overlooking something super simple or using prefect for something it is not built for 🤷‍♂️ but would be nice to know 🙂
    c
    • 2
    • 3
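    One possible fire-and-forget approach (not necessarily the recommended one) is to hand the blocking flow.run() call itself to the cluster with dask.distributed; the scheduler address is a placeholder.

    from dask.distributed import Client, fire_and_forget

    client = Client("tcp://dask-scheduler:8786")  # placeholder scheduler address

    # submit the blocking flow.run() call to the cluster and stop tracking the future
    future = client.submit(flow.run, pure=False)
    fire_and_forget(future)

    Note that the flow then executes on whichever worker picked it up; registering the flow with a backend and letting an agent launch runs is the other obvious route.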
  • p

    Pierre CORBEL

    05/03/2020, 12:44 PM
    Hello there 👋, I haven't been able to see the logs of my tasks on Prefect Cloud for some days now. Has something changed? The only logs I'm able to see are those from the Agent or the CloudFlowRunner (see attached image n°1). One strange thing: when a task fails, I can see the
    Last State Message
    in the UI but not in the logs tab (see attached images n°2 and n°3). For info, I'm using RemoteEnvironment with LocalDaskExecutor on Prefect 0.10.5
    c
    • 2
    • 6
  • n

    Nate Atkins

    05/04/2020, 12:37 AM
    Hopefully someone can give me a light thump on the head to show me the errors of my thinking, or teach me the magic Prefect incantation to simplify how I'm trying to solve the data science workflow. I have a pipeline consisting of Query, Prepare, Train, and Evaluate tasks. Each of them is time consuming and depends on the output from the previous task. I can turn on Checkpointing and Caching with a LocalResultHandler. The developer process we used previously kept a dependency tree of which python file and source file(s) were used to generate the next file. If you deleted a data file or updated a source file (think Task) and ran the pipeline, it would figure out which tasks needed to be run. Let's say you updated the code in the Model task: then the cached output of Prepare would be used with the new Model task code to generate a model that was evaluated by the Evaluate task. I think of it as doing a reverse lookup on the DAG to find the earliest upstream task that needs to be run. Everything downstream of a task that runs is also run, and its cached results aren't used. As I'm currently using Prefect, if the Model result is missing the Model task will be rerun, but the Evaluate task will find existing cached inputs and blindly use them.
    1. Is there some slick way to handle this pattern in Prefect that I just haven't found yet?
    2. Currently I have a CacheValidatorFactory that builds the cache function with a list of source files and destination files. The validator checks whether any source files are newer than any destination files, or whether any destination files are missing, to determine if the cache is valid. If a source file is newer or a destination file is missing, the cache is invalid and the task runs. This may be related to https://github.com/PrefectHQ/prefect/issues/2104
    I've also noticed that if the output of a task is cached and used, it still reads all the cached inputs.
    1. Is there a way to have a different cache_validator for inputs and results?
    2. If the results cache will be used, can we avoid loading the cached inputs?
    3. What happens if Prepare runs, Model doesn't, and Evaluate should?
    Not sure my example makes total sense, but how do we trace back through the DAG to see which upstream results have changed?
    c
    a
    • 3
    • 3
  • c

    Chris O'Brien

    05/04/2020, 3:56 AM
    Hi Prefect Team, I would love your thoughts on whether this is the Prefect way of doing this. I have a scheduled flow where we need a
    run_date
    that is yesterday in UTC. I would love to use the prefect context to do this, and this worked for me, but is there a better way?
    Untitled.txt
    c
    • 2
    • 5
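    A minimal sketch of one context-based approach, assuming the scheduled start time is the anchor you want; scheduled_start_time is a value the flow runner places in the Prefect context at run time.

    from datetime import timedelta
    import prefect
    from prefect import task

    @task
    def get_run_date():
        # scheduled_start_time is a UTC datetime supplied by the runner
        scheduled = prefect.context.get("scheduled_start_time")
        return (scheduled - timedelta(days=1)).strftime("%Y-%m-%d")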
  • t

    Thomas La Piana

    05/04/2020, 6:41 AM
    Has anyone gotten flows executing in a Dask Gateway remote environment? Based on the docs it looks like I'll be writing my own, but I wanted to double check that there isn't a way to do it currently without writing a new environment.
    j
    • 2
    • 7
  • a

    Adisun Wheelock

    05/04/2020, 1:16 PM
    I'm getting a request error when I try to
    prefect auth login -t my_generated_token
    requests.exceptions.HTTPError: 400 Client Error: Bad Request for url: <http://localhost:4200/graphql/alpha>
    has anyone else run into this? The server startup brought up all services just fine.
    j
    • 2
    • 26
  • a

    Avi A

    05/04/2020, 2:45 PM
    Hey! We want to use Prefect with k8s in a way that would automatically connect to the k8s API and create Pods according to our real-time requirements. Can this be done with Prefect?
    k
    • 2
    • 6
  • k

    Ketan Bhikadiya

    05/04/2020, 4:38 PM
    New to prefect: do we need to install/set up anything locally if we use prefect cloud?
    k
    • 2
    • 5
  • p

    Pedro Machado

    05/04/2020, 6:39 PM
    Hi everyone. I am evaluating Prefect and would like to convert a workflow that is currently running in Airflow. This workflow calls a reporting API that returns a list of URLs pointing to data extracts (multiple compressed csv files for each report). The input parameters of the API include the start and end dates, a "report id" that tells the API the set of data we want, and a parameter that controls the interval of the report (daily, weekly, or monthly). The next step is to download each of those URLs and store them in s3 using a predefined prefix naming convention. Each report can have a different frequency and data readiness delay. For example, some weekly reports are ready on Wednesdays, others are ready on Saturdays. Some daily reports are ready the next day, others the following Saturday. In principle, there is a single flow that 1) waits for a number of days or hours 2) queries the API to see if a given report is ready and 3) downloads multiple files to s3. In Airflow, I have multiple DAGs (one for each report/frequency combination). Each DAG pulls a report based on the inputs and
    execution_date
    . I am trying to figure out how I could implement this in Prefect. It seems to me that I could have a single parametrized flow but I am not sure how the parameters would be provided and the flow scheduled. Thanks!
    k
    • 2
    • 3
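    A rough sketch of how the per-report Airflow DAGs could collapse into one parametrized flow; the two task bodies and the parameter values in the example run are placeholders.

    from prefect import task, Flow, Parameter

    @task
    def get_report_urls(report_id, start_date, end_date, interval):
        # placeholder: call the reporting API and return the list of extract URLs
        return []

    @task
    def copy_to_s3(url):
        # placeholder: download one extract and upload it under the prefix naming convention
        pass

    with Flow("report-to-s3") as flow:
        report_id = Parameter("report_id")
        start_date = Parameter("start_date")
        end_date = Parameter("end_date")
        interval = Parameter("interval", default="daily")
        urls = get_report_urls(report_id, start_date, end_date, interval)
        copy_to_s3.map(urls)

    # each report/frequency combination is just a different set of parameter values
    flow.run(parameters={"report_id": "123", "start_date": "2020-05-01",
                         "end_date": "2020-05-07", "interval": "weekly"})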
  • t

    tiz.io

    05/05/2020, 12:38 AM
    I've just converted an event-based ETL job to Prefect. ATM it just kicks off the job and exits immediately. I'd love to see PIN 14 implemented so Prefect can observe the events and mark the job done when the appropriate event is received.
    k
    t
    • 3
    • 7
  • b

    Brad

    05/05/2020, 2:14 AM
    Hey team - is there a way to find out via the GraphQL API which upstream task is linked to a downstream task run? Specifically I’m interested in the input parameters to an upstream task.
    j
    • 2
    • 13
  • p

    Pedro Machado

    05/05/2020, 3:53 AM
    Hi. What's the recommended approach to implement something like an Airflow sensor? For example, one that will wait until a certain time or one that will poll an API or run a database query until it returns a certain value? Would a simple long running task with a loop and a delay work?
    j
    • 2
    • 2
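    The loop-and-delay idea from the message above, sketched as a sensor-style task; report_is_ready is a placeholder for whatever readiness check the API or database provides.

    import time
    from prefect import task
    from prefect.engine.signals import FAIL

    def report_is_ready(report_id):
        # placeholder: replace with the real API call or database query
        return False

    @task
    def wait_for_report(report_id, poll_seconds=300, max_polls=100):
        # poll until the condition is met, then let downstream tasks run
        for _ in range(max_polls):
            if report_is_ready(report_id):
                return True
            time.sleep(poll_seconds)
        raise FAIL(f"report {report_id} was not ready after {max_polls} polls")

    Giving a task max_retries and a retry_delay and raising when the report isn't ready is another way to get similar polling behavior without one long-lived sleep loop.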
  • s

    Sainath

    05/05/2020, 11:31 AM
    Hi! I am new to both Airflow and Prefect. I have a scenario where, when a file arrives in a certain folder on a machine, I have to run a python program that parses the file and sends its contents to a DB. I was able to do it using directory watchers in python (inotify or watchdog). I might have close to 1000 files coming into that directory at any time, so my file watchers would always be running. But since discovering the prefect project, I would like to incorporate prefect into the whole picture. I was wondering if there is something like a directory watch trigger (which would trigger a data flow as soon as a file comes into that watch directory) available in prefect, similar to PIN 8?
    j
    l
    • 3
    • 3
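    If no built-in directory-watch trigger fits, one workaround is to keep the watchdog watcher and have it kick off the flow per file. A minimal sketch, assuming the flow object is defined elsewhere, exposes a file_path Parameter, and /data/incoming is the watched directory.

    import time
    from watchdog.observers import Observer
    from watchdog.events import FileSystemEventHandler

    class NewFileHandler(FileSystemEventHandler):
        def on_created(self, event):
            if not event.is_directory:
                # kick off the flow for this file; flow is assumed to be defined elsewhere
                flow.run(parameters={"file_path": event.src_path})

    observer = Observer()
    observer.schedule(NewFileHandler(), path="/data/incoming", recursive=False)
    observer.start()
    try:
        while True:
            time.sleep(1)
    finally:
        observer.stop()
        observer.join()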
  • t

    Thomas La Piana

    05/05/2020, 11:35 AM
    When I set up my flow with storage and an environment and then call
    flow.run()
    locally, it automatically defaults to the local executor instead of using the environment that I set for it, but when I register it and a kube runner picks it up, it runs with the correct environment. Does anyone know why?
    z
    • 2
    • 2
  • a

    Avi A

    05/05/2020, 7:45 PM
    Hey community! Is there a way to run a flow inside a different flow? I mean suppose I have an existing flow and want to trigger it as part of a different flow, without specifying the flow again. Is there a way to do that? e.g. a Task that wraps an existing flow? Preferably a flow that is registered to the orchestration server?
    z
    • 2
    • 10
  • i

    itay livni

    05/05/2020, 10:06 PM
    Hi - I am encountering a botocore
    NoCredentialsError('Unable to locate credentials')
    error using Docker Storage (local image), prefect cloud and the
    s3ResultHandler
    . When the flow is run locally, there are no credential issues initializing the handler:
    s3_handler = S3ResultHandler(bucket='some-bucket')
    To resolve the issue I created a Prefect Secret called
    "AWS_CREDENTIALS"
    and tried
    s = Secret("AWS_CREDENTIALS")
    aws_scrts = s.get()
    s3_handler = S3ResultHandler(bucket='some-bucket', aws_credentials_secret=aws_scrts)
    What is the best way to resolve the aws credential error? Thanks
    z
    • 2
    • 20
z

Zachary Hughes

05/05/2020, 10:15 PM
Hi @itay livni, taking a look now! Will get back to you with an answer ASAP.
It looks like you might be giving the
S3ResultHandler
your secret values, rather than the name of your Prefect secret. Do you still get the
NoCredentialsError
when you provide
aws_credentials_secret="NAME_OF_YOUR_SECRET_HERE"
? Here are a couple of links below for reference, if that's at all helpful. https://docs.prefect.io/api/latest/engine/result_handlers.html#s3resulthandler https://github.com/PrefectHQ/prefect/blob/master/src/prefect/engine/result_handlers/s3_result_handler.py#L68
i

itay livni

05/05/2020, 10:45 PM
@Zachary Hughes Just to be clear, the AWS Prefect Secret name has to be all caps?
"AWS_CREDENTIALS"
z

Zachary Hughes

05/05/2020, 10:46 PM
Not necessarily-- it just needs to be the name of your secret. I just used all-caps to denote that this was a portion of code that needed to be replaced. 🙂
i

itay livni

05/05/2020, 10:47 PM
So something else is going on, because this flow was running previously in the same environment... using a different version of prefect.
z

Zachary Hughes

05/05/2020, 10:48 PM
Hmm... do you recall what version of Prefect you were on, and what version you're on now?
i

itay livni

05/05/2020, 10:49 PM
well I am on 0.10.6
and the secret should look like this?
'{"ACCESS_KEY": "something", "SECRET_ACCESS_KEY": "something"}'
z

Zachary Hughes

05/05/2020, 10:54 PM
I believe so!
i

itay livni

05/05/2020, 10:55 PM
Strange I did not need to muck around with this previously
z

Zachary Hughes

05/05/2020, 11:09 PM
I can ask around to see if folks know of any changes that could have impacted this! It depends on your setup, but I'm also wondering if boto's picking up changes in your environment/config, as the S3ResultHandler defaults to standard boto3 auth if you don't provide a secret.
Out of curiosity, how were you providing AWS creds previously?
i

itay livni

05/05/2020, 11:17 PM
I was not. It was magically happening behind the scenes: "If not provided, 
boto3
 will fall back on standard AWS rules for authentication."
Now that has stopped working:
s = Secret("aws_credentials")
aws_scrts = s.get()

s3_handler = S3ResultHandler(bucket='some-bucket', aws_credentials_secret=aws_scrts)
And when I build an image using
flow.register()
and the code above, I get this error.
Beginning health checks...
System Version check: OK
Traceback (most recent call last):
  File "/root/.prefect/healthcheck.py", line 123, in <module>
    flows = cloudpickle_deserialization_check(flow_file_path)
  File "/root/.prefect/healthcheck.py", line 39, in cloudpickle_deserialization_check
    flows.append(cloudpickle.load(f))
  File "/MOC/etl_moc.py", line 48, in <module>
    aws_scrts = s.get()
  File "/usr/local/lib/python3.7/site-packages/prefect/client/secrets.py", line 139, in get
    ) from None
ValueError: Local Secret "aws_credentials" was not found.
and the secret does exist:
z

Zachary Hughes

05/05/2020, 11:29 PM
It looks like you're passing in the value of
s.get()
, rather than the name of your secret. Can you try passing in the secret name "aws_credentials" like this?
s3_handler = S3ResultHandler(bucket='some-bucket', aws_credentials_secret="aws_credentials")
😁 1
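Pulling the thread's fix together, a sketch of the working configuration: pass the secret's name to the handler and store the JSON credentials as the Prefect Secret's value.

from prefect.engine.result_handlers import S3ResultHandler

# pass the *name* of the Prefect Secret; the handler looks the value up at runtime
s3_handler = S3ResultHandler(bucket='some-bucket', aws_credentials_secret="aws_credentials")

# the secret's value (set in Prefect Cloud) is the JSON blob shown earlier in the thread, e.g.
# '{"ACCESS_KEY": "...", "SECRET_ACCESS_KEY": "..."}'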
i

itay livni

05/06/2020, 12:12 AM
@Zachary Hughes - For closure. (1) Why I "suddenly" need to use a prefect aws configuration is a mystery. (2) Using prefect secret works.
Thank you for your patience!
z

Zachary Hughes

05/06/2020, 12:18 AM
No patience required-- just glad it's working for you! 🙂 Haven't been able to isolate any core changes that might have prompted this shift, but I'll be sure to keep an eye out.