prefect-community

    Mikael

    07/11/2020, 6:20 AM
    I am trying to work out how to work with relational database sources in Prefect. Does anyone have an example? The usual process is that I have a bunch of tables that need to be extracted and later loaded somewhere. I have tried the postgres task and it works, but how should I structure my code? It is not always great to read 100M rows into memory by using
    fetch='all'
    . What if I want to read 5000 rows, store them somewhere, and then read another 5000 rows? How would I proceed with Prefect?
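    One way to avoid fetch='all' is a server-side cursor that reads and loads one batch at a time. A minimal sketch (not from the thread; the connection details, table name, and the load step are hypothetical placeholders):
    import psycopg2
    from prefect import task

    @task
    def extract_in_batches(batch_size: int = 5000):
        conn = psycopg2.connect(host="localhost", dbname="mydb", user="me")
        # a *named* cursor is server-side: only one batch is in memory at a time
        cur = conn.cursor(name="batched_extract")
        cur.execute("SELECT * FROM big_table")
        while True:
            rows = cur.fetchmany(batch_size)
            if not rows:
                break
            # hypothetical destination: replace with a real load step
            print(f"would load {len(rows)} rows here")
        cur.close()
        conn.close()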

    Espen Overbye

    07/11/2020, 10:51 AM
    Hi, we have a set of workflows that we are in the process of prefectifying, but we have stumbled on a problem. We are processing weather data, and the preferred Python libraries for handling meteorological data (NetCDF) expect files to be accessible on a mounted filesystem. Our preferred option is to run flows on Kubernetes, but it is not clear to us how to mount a volume on the pods that execute the flows. All tips greatly appreciated!
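    A hedged pointer (assuming the Prefect 0.12-era environment API, not something confirmed in this thread): KubernetesJobEnvironment accepts a custom job spec, so the volumes/volumeMounts for the NetCDF files can be declared there. Both the flow object and job_spec.yaml below are hypothetical:
    from prefect.environments import KubernetesJobEnvironment

    # the custom spec can declare volumes/volumeMounts for the flow-run pod
    flow.environment = KubernetesJobEnvironment(job_spec_file="job_spec.yaml")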

    Rafal

    07/11/2020, 11:42 AM
    Hello! I have the following use case: launch a specific container (exposed to an end user), mount a volume and clone a GitLab repo there (the user works in this analytical environment), push the changes back to the repo, and shut the container down. What is the best tool and approach for running such a scenario with as little user interference as possible? Prefect?

    Sebastian

    07/11/2020, 6:06 PM
    Hi everyone, I was wondering if anyone could kindly give some pointers to a data engineering noob like me. đŸ€“ I have to ingest about 10GB of data from xlsx files into a SQL star schema. The person working on this before me used Pandas to do all the data cleaning, surrogate key creation, etc. I am worried that as we get more data, at some point it won't fit into memory anymore and we will have a problem. đŸ€Ș What are the best approaches for this in your opinion? Do you put the data into some intermediary SQL table? Do you use Dask? (Taking into consideration that I would like to learn/leverage Prefect.)
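    A rough sketch of one incremental approach (an assumption, not from the thread): openpyxl's read-only mode streams rows lazily, so each chunk can be appended to a staging table without holding the whole file in memory. The DSN, file name, and table name are hypothetical:
    import openpyxl
    import pandas as pd
    from sqlalchemy import create_engine

    engine = create_engine("postgresql://me@localhost/warehouse")  # hypothetical DSN
    ws = openpyxl.load_workbook("data.xlsx", read_only=True).active

    chunk, CHUNK_SIZE = [], 5000
    for row in ws.iter_rows(min_row=2, values_only=True):  # skip the header row
        chunk.append(row)
        if len(chunk) >= CHUNK_SIZE:
            pd.DataFrame(chunk).to_sql("staging", engine, if_exists="append", index=False)
            chunk = []
    if chunk:  # flush the final partial chunk
        pd.DataFrame(chunk).to_sql("staging", engine, if_exists="append", index=False)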

    Beibei

    07/13/2020, 7:25 AM
    Hi All! I am trying to use Parameters / Context to control two of my tasks in one Flow. As a first try, I wrote two lines to create Parameters, e.g.
    a = Parameter('a', default=1)
    b = Parameter('b', default=2)
    In the Prefect API, I only see one Parameter, for
    b
    . Did I miss something? If I need multiple controls for different arguments in functions, which do you suggest I use, Parameter or Context?
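    A minimal sketch of the usual cause (an assumption about this report, not a confirmed diagnosis): a Parameter only becomes part of the flow if it is actually used inside the Flow context, so passing both into a task makes both appear:
    from prefect import Flow, Parameter, task

    @task
    def add(x, y):
        return x + y

    with Flow('two-params') as flow:
        a = Parameter('a', default=1)
        b = Parameter('b', default=2)
        total = add(a, b)  # both Parameters are now in the DAG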

    Klemen Strojan

    07/13/2020, 9:17 AM
    Do we need to re-register the flow (using GitHub storage) if we change the default value of a parameter?

    itay livni

    07/13/2020, 2:52 PM
    Hi - Under what scenarios would a flow take longer to run locally with the DaskExecutor than with the LocalExecutor? Is too many small tasks one of them?

    Amit Singh

    07/13/2020, 3:21 PM
    @here @Dylan I'm trying to reload a settings module in a flow for each scheduled iteration, but it never picks up changes to the settings module. Is that because of some restriction with the flow that I might be overlooking?
    import importlib
    from prefect import Flow
    from prefect.schedules import CronSchedule

    with Flow('ETL Status Cron Flow', schedule=CronSchedule("*/5 * * * *")) as cron_flow:
        work_conf = importlib.import_module('settings.work_settings')
        importlib.reload(work_conf)
        my_settings = getattr(work_conf, 'my_settings')

        if not my_settings['active']:
            pass  # nothing to do this iteration
        else:
            print('do something')

    state = cron_flow.run()
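    A sketch of one likely fix (hedged; it reuses the same hypothetical settings.work_settings module as above): code in the Flow context runs once at build time, so the reload has to live inside a task for it to run on every scheduled iteration:
    import importlib
    from prefect import Flow, task
    from prefect.schedules import CronSchedule

    @task
    def load_settings():
        # runs at flow-run time, so changes are picked up on each iteration
        import settings.work_settings as work_conf
        importlib.reload(work_conf)
        return work_conf.my_settings

    @task
    def do_work(my_settings):
        if my_settings['active']:
            print('do something')

    with Flow('ETL Status Cron Flow', schedule=CronSchedule("*/5 * * * *")) as cron_flow:
        do_work(load_settings())

    state = cron_flow.run()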

    Aaron Y

    07/13/2020, 3:53 PM
    Hi all, I'm using Fargate to set up my task environment, with flows stored in S3 rather than baked into Docker images. I can see the task starting on Fargate and I'm connected to the cluster, but I do not see the flow running. It's stuck after "Deploying flow run" and "Adding tags for flow_id and version". Not sure why it's stuck 😕 cc: @itay livni
    fargate.py
    from prefect.agent.fargate import FargateAgent
    agent = FargateAgent(
        #
        enable_task_revisions=True,
        launch_type="FARGATE",
        taskRoleArn="arn:aws:iam::#####61:role/ayang-role",
        executionRoleArn="arn:aws:iam::#####61:role/ayang-role",
        family="#####-task",
        cluster="#####-fargate-cluster",
        networkConfiguration={
            "awsvpcConfiguration": {
                "assignPublicIp": "ENABLED",
                "subnets": ["subnet-#####11"],
                "securityGroups": []
                }
        },
        cpu="1024",
        memory="4096",
        containerDefinitions = [
            {
                "logConfiguration": {
                    "logDriver": "awslogs",
                    "options": {
                        "awslogs-group": "/ecs/my-first-task",
                        "awslogs-region": "us-east-2",
                        "awslogs-stream-prefix": "ecs"
                    }
                },
                "image": "#####.<http://dkr.ecr.us-east-2.amazonaws.com/pga_scraper:latest|dkr.ecr.us-east-2.amazonaws.com/pga_scraper:latest>",
                "name": "pga_scraper"
            }
        ],
        labels=["s3-flow-storage"]
    )
    agent.start()

    Hugo Shi

    07/13/2020, 5:32 PM
    Hi all, I've got kind of a dumb question. If I already have a bunch of dask.delayed functions and I'm going to pull them into a Prefect flow, what's the best way to do that? Should I call compute on them within the Prefect task?
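    One possible shape (a sketch under the assumption that the delayed graph can run as a unit, not an official pattern): call dask.compute inside a single Prefect task, so Prefect treats the whole delayed graph as one task:
    import dask
    from prefect import task

    @task
    def run_delayed_graph(delayed_objects):
        # materializes the existing dask.delayed graph in one step
        return dask.compute(*delayed_objects)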

    Jacob Blanco

    07/14/2020, 1:24 AM
    Is there any way to separate secrets for different flows? The use case is that we have some jobs that interact with our PostgreSQL DB: some pull data, so we would like to keep those on read-only credentials, and others are ETL, so they need read-write. We would like to avoid users inadvertently creating flows that might end up deleting data. Is there a concept of namespaces or some other mechanism for segmentation? For example, if Flow A runs and pulls the secret
    my_password
    it'll get a different password than if Flow B pulls the same secret?

    Jackson Maxfield Brown

    07/14/2020, 4:46 AM
    Is caching done entirely based on file existence? Has there been any thought about other methods of result caching? I know it's possible to add a
    Serializer
    but has there been any thought about a
    Verifier
    or similar? (Entirely just curious at this point.)

    Philip MacMenamin

    07/14/2020, 6:33 AM
    Hey - I'm having an issue running a Flow within Docker (https://docs.prefect.io/orchestration/tutorial/docker.html#persisting-your-flow-with-docker-storage). I've set
    prefect backend server
    started
    prefect agent start docker
    and copied the Hello Docker flow from the web page above. Registering that flow created the image without issue:
    Successfully tagged hello-docker:2020-07-14t05-57-08-042222-00-00
    Flow: http://localhost:8080/flow/e25b220f-c9e4-4051-9f00-20ca88122a99
    I then submitted a run via the UI running on localhost:8080. The run enters the Submitted state and does not seem to get any further; it stays in that state.

    Darragh

    07/14/2020, 8:27 AM
    Hey guys, does anyone have any experience using the FargateTaskEnvironment? Specifically the aws_session_token - I'm having problems getting AWS to accept it. I keep getting the following error:
    Failed to load and execute Flow's environment: ClientError('An error occurred (UnrecognizedClientException) when calling the RegisterTaskDefinition operation: The security token included in the request is invalid')
    Sample config for the environment:
    flow.environment = FargateTaskEnvironment(
        launch_type="FARGATE",
        region="us-east-1",
        aws_session_token="FwoGZXIvYXdzEOL//////////wEaDFSgRMmr3M07yXJ3gCKCAdZO6f/LRZc6b7DjDip0lrTvCa+FDQpFAGJEyB6Ka1tF9By3fTgKkbqSM6EnuHQgTEviQJrOn13E7wlvKKVV1++YGaa3gb1Pn9q12BxCN7I6SvQ8oBW9AE73Judo0tuYTdTc5eYC7m2PaYU/d5fkRIj29EJWp9EpO3+yq/S1saxiGvYosKGy+AUyKBOOerH4ymGN9lxo/5aprU5DXumyaC6yg2satDNNoUdPaSBQ2R8fNq0=",
        cpu="256",
        memory="512",
        enable_task_revisions="True",
    ....
    ....
    )

    joao

    07/14/2020, 8:48 AM
    Hi, both the "Running a Flow" and the "Universal Deploy" tutorials fail, both on the
    prefect auth login
    step. Prefect tries to connect locally rather than to the cloud:
    requests.exceptions.ConnectionError: HTTPConnectionPool(host='localhost', port=4200): Max retries exceeded with url: /graphql (Caused by NewConnectionError('<urllib3.connection.HTTPConnection object at 0x7f5aa8c9f630>: Failed to establish a new connection: [Errno 111] Connection refused',))
    How do I tell Prefect that it should connect to the cloud?
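    A hedged note (assuming a Prefect 0.12+ CLI): the CLI talks to whichever backend is currently selected, so running
    prefect backend cloud
    before
    prefect auth login
    should point it at Prefect Cloud rather than a local server.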

    karteekaddanki

    07/14/2020, 8:55 AM
    @Philip MacMenamin Came here to report exactly the same issue. The task state doesn't change. It seems to work quite alright on the cloud.

    Bernard Greyling

    07/14/2020, 9:39 AM
    Hey guys, I have a question regarding triggering
    CloudFlowRunner
    flows. Locally I have a flow set up and registered to the cloud backend. I can trigger the flow via the web UI; however, when running locally I get the following:
    >> cloud_flow = CloudFlowRunner(flow)
    >> flow_state = cloud_flow.run(return_tasks=flow.tasks)
    >> flow_state
    [2020-07-14 09:37:58] INFO - prefect.CloudFlowRunner | Beginning Flow run for 'Map Reduce'
    <Failed: "Could not retrieve state from Prefect Cloud">
    Am I missing something here?

    Robin

    07/14/2020, 10:55 AM
    Hey folks, we are still new to Prefect and have a question about code organization: do you have a separate repository for Prefect flows (âŹ…ïž), or do you keep the Prefect flows inside the other repositories (âžĄïž)? For âŹ…ïž (separate repo), how do you import the Python packages? And in general, what does your CI/CD look like? We are setting up a backend from scratch (using GitLab, Pulumi (or Terraform), dbt, Snowflake, and Sigma (or maybe Looker, but that might be too expensive..)), so we are happy to hear best practices and learn from you! 🙂
    âžĄïž 6
    âŹ…ïž 3

    Klemen Strojan

    07/14/2020, 11:59 AM
    Hey all - I am using a Secret like this:
    task = ShellTask()

    m = Secret('my_secret_on_the_cloud').get()

    with Flow('my_flow') as flow:
        name = task(
            helper_script='cd ~',
            env={'CREDENTIAL': m},
            command="mysqldump -u my_user -p$CREDENTIAL my_db > my_db.sql",
        )
    while I should be using it like this:
    task = ShellTask()

    with Flow('my_flow') as flow:
        m = Secret('my_secret_on_the_cloud')
        name = task(
            helper_script='cd ~',
            env={'CREDENTIAL': m},
            command="mysqldump -u my_user -p$CREDENTIAL my_db > my_db.sql",
        )
    The latter fails with
    TypeError
    . I have the secret on the cloud and in the local
    config.toml
    for testing purposes. What am I doing wrong?
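    A plausible explanation, sketched (hedged, not a confirmed diagnosis): prefect.client.Secret is a client-side object rather than a task, so wiring it into a Flow context fails; the task version, PrefectSecret from prefect.tasks.secrets, is what gets resolved at run time:
    from prefect import Flow
    from prefect.tasks.secrets import PrefectSecret
    from prefect.tasks.shell import ShellTask

    task = ShellTask()

    with Flow('my_flow') as flow:
        m = PrefectSecret('my_secret_on_the_cloud')  # resolved at flow-run time
        name = task(
            helper_script='cd ~',
            env={'CREDENTIAL': m},
            command="mysqldump -u my_user -p$CREDENTIAL my_db > my_db.sql",
        )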

    Peter Peter

    07/14/2020, 12:18 PM
    Hello, I am new to Prefect and am trying to use DbtShellTask, which does work. The issue I am having is that when dbt fails, the error is not the full message compared to running it manually. For example, if dbt compilation fails, the task returns a compilation error but excludes the file the error occurs in; running manually gives the same error message and includes which file the error is in. Another example: if dbt fails during execution, running manually I get the following messages:
    10:33:16 | Concurrency: 1 threads (target='dev')
    10:33:16 |
    10:33:16 | 1 of 4 START table model dbt_test.add_rid............................ [RUN]
    10:33:28 | 1 of 4 OK created table model dbt_test.add_rid....................... [SELECT 161069 in 11.75s]
    10:33:28 | 2 of 4 START table model dbt_test.union.............................. [RUN]
    10:33:36 | 2 of 4 OK created table model dbt_test.union......................... [SELECT 159280 in 7.86s]
    10:33:36 | 3 of 4 START table model dbt_test.union_join_test.................... [RUN]
    * Deprecation Warning: The adapter function `adapter.get_columns_in_table` is
    deprecated and will be removed in a future release of dbt. Please use
    `adapter.get_columns_in_relation` instead.
    Documentation for get_columns_in_relation can be found here:
       <https://docs.getdbt.com/docs/adapter>
    10:33:36 | 3 of 4 ERROR creating table model dbt_test.union_join_test........... [ERROR in 0.22s]
    10:33:36 | 4 of 4 START table model dbt_test.union_join......................... [RUN]
    10:33:50 | 4 of 4 OK created table model dbt_test.union_join.................... [SELECT 159280 in 14.28s]
    10:33:50 |
    10:33:50 | Finished running 4 table models in 35.12s.
    When I run the same dbt workflow from DbtShellTask, I get this message:
    July 14th 2020 at 7:37:14am | prefect.DbtShellTask
    ERROR lens
    Command failed with exit code 1: Done. PASS=3 WARN=0 ERROR=1 SKIP=0 TOTAL=4
    I am trying to make it easier to track down the error. Is there any way to include these full error messages? Here is a sample of how I am doing this; I was hoping return_all would return all messages from dbt:
    from prefect import Flow
    from prefect.tasks.dbt import DbtShellTask

    with Flow(name="dbt_flow") as flow:
        task = DbtShellTask(
            profile_name='default',
            environment='dev',
            dbt_kwargs={
                'type': 'postgres',
                'threads': 1,
                'host': 'IP',
                'port': 5433,
                'user': 'username',
                'pass': 'docker',
                'dbname': 'actualDbName',
                'schema': 'dbt_test'
            },
            overwrite_profiles=True,
            profiles_dir='Actual Path',
            return_all=True
        )(command='dbt run')

    flow.register()
    Any help would be great. P

    karteekaddanki

    07/14/2020, 3:23 PM
    Hey guys, is it possible to trigger one flow to run another? If so, is there a Prefect-recommended way of doing it? I have a workflow with some heavyweight tasks that do not change much over time (related to parsing and data normalization). Tasks downstream from these are driven by research and regularly need re-running. As a Flow seems to be the atomic unit of scheduling in Prefect, what is the best way to structure my workflow? Thanks in advance.
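    One option from that era (a sketch; it assumes both flows are registered with a backend, and the flow/project names are hypothetical): FlowRunTask creates a run of another registered flow, so a lightweight parent flow can kick off the research flows on demand:
    from prefect import Flow
    from prefect.tasks.prefect import FlowRunTask

    run_research = FlowRunTask(flow_name='research-flow', project_name='my-project')

    with Flow('parent') as flow:
        run_research()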

    Zach Angell

    07/14/2020, 4:11 PM
    Is there a good way to handle situations where a Prefect agent may shut down during flow execution? I'm looking into using EC2 spot instances as Prefect agents. These instances can theoretically be shut down without warning. Ideally, any flows that were running would be aware that something had disrupted execution and would signal that the flow needs to be re-run. If I try killing the local agent process (Ctrl + C while the agent is mid-flow execution), it seems like tasks hang in the
    Submitted
    state by default.

    Ben Fogelson

    07/14/2020, 7:09 PM
    This is a pathological case, but it seems possible to end up with multiple tasks in the same flow having the same slug:
    import prefect as p
    
    @p.task
    def add_one(n):
        return n + 1
    
    @p.task
    def add_two(n):
        return n + 2
    
    with p.Flow('f') as f:
        a = p.Parameter('a')
        b = add_one(a)
        b.slug = 'my_slug'
        c = add_two(a)
        c.slug = 'my_slug'
    
    f.get_tasks(slug='my_slug') # returns both tasks
    
    f.validate()  # passes

    Kyle Combs

    07/14/2020, 9:15 PM
    Hello everyone. I am trying to access a Parameter from within a state handler using prefect.context.get("parameters", {}).get(stakeholderEmail). When I run my code I get no errors, but the variable that should contain the emails ends up being None. Is my approach the proper method for retrieving a parameter from inside a state handler, or is there a better way to do this?
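    A sketch of how this usually looks (hedged; the handler name is hypothetical, and quoting the key is an assumption about the fix): prefect.context["parameters"] is a dict keyed by parameter name, so the lookup key should be the string "stakeholderEmail":
    import prefect

    def my_state_handler(obj, old_state, new_state):
        params = prefect.context.get("parameters", {})
        emails = params.get("stakeholderEmail")  # note the quoted key
        return new_state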

    Philip MacMenamin

    07/15/2020, 12:43 AM
    Is there any documentation on setting up Prefect Server to run in a containerized environment? I saw Tyler Wanner's Terraform setup for a GCP compute instance, but is there a way to set things up so everything runs out of a container?

    Karen

    07/15/2020, 1:12 AM
    Hi All, I am evaluating Prefect Core, I love the look of it so far, and I have some questions around deployments, flow registration, and versioning. We currently have a build/deploy process using Azure Pipelines, and I anticipate that, given a repo, it would bundle up the relevant flows and Python files. For deployment I would like to deploy to AWS S3 buckets with a version number. I do see that there is a storage option, but that would require the deploy agent to have Prefect/Python installed to call flow.build with the S3 storage option set. In terms of hosting, I was hoping to self-host the Prefect server/UI and the relevant agents in ECS.
    1. Is there an option where I can deploy versioned files to S3 myself, using either Terraform or the AWS CLI? If so, how do I register the relevant flows?
    2. I don't want my deployment pipeline to have a VPN into the relevant AWS VPC/environment, so can flow.register() be completely separated from flow.build()?
    3. For flows deployed to S3, how does the server ensure the latest version is being used/registered?
    4. My AWS environments are in completely separate accounts/VPCs, and I noticed Prefect environments/agents/flows have labels; is there a particular pattern that ensures flows are reusable between environments?

    Jacob Blanco

    07/15/2020, 7:01 AM
    I'm having issues deleting a secret. I added one with a / in the name, e.g.
    my_project/postgres_user
    that got converted to
    my_project/
    and now I cannot delete it.

    Romain

    07/15/2020, 8:46 AM
    Hi folks, I have been reading the page about Dask clusters on K8s (https://docs.prefect.io/orchestration/recipes/k8s_dask.html#dask-cluster-on-kubernetes), and there is an env variable whose purpose I cannot figure out, nor can I find any info about it. Can someone explain what it is used for:
    DASK_DISTRIBUTED__SCHEDULER__BLOCKED_HANDLERS
    ?

    Avi A

    07/15/2020, 1:21 PM
    Prefect and dataclasses: I want to refactor my tasks to pass around dataclasses instead of dicts and the like. Now, if I have a
    dict
    , Prefect handles it by planting the
    GetItem
    task in the DAG:
    dict_output = generate_dict()
    a = task_a(dict_output['a'])
    b = task_b(dict_output['b'])
    How would you recommend working with dataclasses? The following piece of code won't work, naturally (since a task doesn't have these attributes):
    @dataclass
    class MyClass:
        a: int
        b: int
    ...
    ...
    dataclass_output = generate_dataclass()
    a = task_a(dataclass_output.a)
    b = task_b(dataclass_output.b)
    c = another_task(dataclass_output)

Jeremiah

07/15/2020, 1:26 PM
Hey @Avi A - behind the scenes indexing is just creating one of these
GetItem
tasks (see here) - you could replicate this with a
GetAttribute
task that basically runs
lambda x: x.attr
. Adding sugar for that might be tricky, because we don’t want to allow magic attribute access to the task class. Maybe something like
dataclass_output.output.a
or something? Just idle thoughts.

Avi A

07/15/2020, 1:29 PM
Thanks @Jeremiah. So
dataclass_output.output
allows direct interaction with a `Task`’s result?

Jeremiah

07/15/2020, 1:29 PM
No, sorry for the confusion - I was trying to think of how we might implement something like what you’re looking for. Today that isn’t implemented.
You’d need to create a
GetAttribute
task like:
from prefect import task

@task
def get_attribute(task, attr):
    return getattr(task, attr)

a = get_attribute(dataclass_output, 'a')

Avi A

07/15/2020, 1:31 PM
yeah I got that part 🙂
10x!

Jeremiah

07/15/2020, 1:31 PM
:yes: :yes:

Avi A

07/15/2020, 1:33 PM
follow-up: from what I could tell,
GetItem
doesn't persist even though
Result
is defined on the flow, which makes a lot of sense b/c it just extracts data from existing data. How can I do the same for this task? Would it help to just set
target=None
?

Jeremiah

07/15/2020, 1:34 PM
I think so - there’s nothing special about the
GetItem
task AFAIK
Except that it’s created automatically when you use
[]