prefect-community
  • m

    Marwan Sarieddine

    08/04/2020, 10:30 PM
    Hi folks - is there an example of using multiple script files when executing a flow (that are not part of a Python package)? I am using Docker storage, and it seems that one solution is to specify the `files` dictionary and `env_vars` - the Docker image is built successfully locally and passes the healthchecks, but when the k8s agent tries to create the Prefect job I get `ModuleNotFoundError`s
    j
    3 replies · 2 participants
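A minimal sketch of the `files`/`env_vars` approach Marwan mentions - the registry, paths and helper module name are placeholders, not taken from the thread. The idea is to copy the extra scripts into the image and put their directory on PYTHONPATH so the imports also resolve inside the job the Kubernetes agent creates:

from prefect import Flow, task
from prefect.environments.storage import Docker

@task
def use_helper():
    from my_utils import transform  # hypothetical helper script copied in below
    return transform("data")

with Flow("multi-file-flow") as flow:
    use_helper()

flow.storage = Docker(
    registry_url="my-registry.example.com",  # placeholder
    files={"/local/project/my_utils.py": "/opt/scripts/my_utils.py"},
    env_vars={"PYTHONPATH": "/opt/scripts"},  # make the copied script importable
)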
  • m

    Ming Fang

    08/05/2020, 1:44 AM
    Hi. I’m trying to implement this https://docs.prefect.io/orchestration/execution/storage_options.html#non-docker-storage-for-containerized-environments using minio storage and the Kubernetes agent. My simple flow runs but I’m getting this error. Does anyone know what can be causing this?
    August 4th 2020 at 8:56:04pm | prefect.CloudTaskRunner
    Unexpected error: NameError("name 'prefect' is not defined")
    Traceback (most recent call last):
      File "/usr/local/lib/python3.8/site-packages/prefect/engine/runner.py", line 48, in inner
        new_state = method(self, state, *args, **kwargs)
      File "/usr/local/lib/python3.8/site-packages/prefect/engine/task_runner.py", line 801, in get_task_run_state
        value = timeout_handler(
      File "/usr/local/lib/python3.8/site-packages/prefect/utilities/executors.py", line 188, in timeout_handler
        return fn(*args, **kwargs)
      File "minio.py", line 7, in hello_task
    NameError: name 'prefect' is not defined
    2 replies · 1 participant
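A guess at the cause (not confirmed in the thread): with the pickle-based storage used in that recipe, module-level imports are not captured when the task function is serialized, so names like `prefect` can be undefined when the task runs in the container. A minimal sketch of the common workaround, importing inside the task body:

from prefect import Flow, task

@task
def hello_task():
    import prefect  # imported here so the unpickled function can resolve the name
    prefect.context.get("logger").info("Hello from the container!")

with Flow("minio-flow") as flow:
    hello_task()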
  • p

    Pedro Machado

    08/05/2020, 4:58 AM
    Hi there. I am trying to register a flow for the second time and I am getting this error message:
    Traceback (most recent call last):
      File "/home/pedro/prefect/my_flow.py", line 187, in <module>
        flow.register(project_name="myproject", labels=["aws"])
      File "/home/pedro/.venvs/prefect/lib/python3.7/site-packages/prefect/core/flow.py", line 1581, in register
        no_url=no_url,
      File "/home/pedro/.venvs/prefect/lib/python3.7/site-packages/prefect/client/client.py", line 668, in register
        project = self.graphql(query_project).data.project  # type: ignore
      File "/home/pedro/.venvs/prefect/lib/python3.7/site-packages/prefect/client/client.py", line 238, in graphql
        raise ClientError(result["errors"])
    prefect.utilities.exceptions.ClientError: [{'path': ['project'], 'message': 'field "project" not found in type: \'query_root\'', 'extensions': {'path': '$.selectionSet.project', 'code': 'validation-failed', 'exception': {'message': 'field "project" not found in type: \'query_root\''}}}]
    Any ideas?
    n
    6 replies · 2 participants
  • h

    Hawkar Mahmod

    08/05/2020, 11:55 AM
    Hey folks. I've got a Prefect flow set up. For execution I am using the Fargate Agent, itself hosted on an ECS cluster. In the flow I've configured the Docker storage to use a custom ECR repository to host the image for the flow, and I then register the flow in the same module (.py file). I am however unsure how to coordinate the deployment of the agent and new flows. How are people registering flows in production and deploying their agent? Is it all part of one build process, or are they separate, and if so, how do you manage it - what tools are you using, for instance? I am considering using AWS CodeBuild since we already use it on other projects.
    j
    t
    +2
    6 replies · 5 participants
  • l

    Lukas N.

    08/05/2020, 1:17 PM
    Hi everyone, I'm running Prefect in Kubernetes and I'm trying to make it talk to an existing vault. We use the Kubernetes agent to deploy jobs; by default the job doesn't talk to the vault. I would need to add some annotations to the job to make it work:
    vault.security.banzaicloud.io/vault-addr:
    vault.security.banzaicloud.io/vault-role:
    Looking at the code of the agent, the `job_spec` is hardwired and I cannot modify it. I've also checked the `KubernetesJobEnvironment`, which seems like the way to go for a custom `job_spec.yaml` file. But in this case, the environment values specified in the Prefect Kubernetes agent (`prefect agent start kubernetes --env NAME=value`) don't get passed to the custom job. They only get passed to the first Job that creates the custom job. Is there another way to have both custom annotations on Jobs and environment values passed from the Prefect Kubernetes agent?
    j
    2 replies · 2 participants
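A sketch of the `KubernetesJobEnvironment` route Lukas mentions, under the assumption that the needed values can simply be declared in the custom job spec itself rather than passed via `prefect agent start kubernetes --env`; the annotation values, registry and env entry are placeholders:

from prefect import Flow, task
from prefect.environments import KubernetesJobEnvironment
from prefect.environments.storage import Docker

@task
def read_value():
    import os
    return os.environ.get("NAME")  # injected via the job spec below, not the agent

with Flow("vault-flow") as flow:
    read_value()

# job_spec.yaml (referenced below) would carry both the vault annotations, e.g.
#   metadata:
#     annotations:
#       vault.security.banzaicloud.io/vault-addr: "https://vault.example.com"
#       vault.security.banzaicloud.io/vault-role: "my-role"
# and the env entries under the container spec:
#   env:
#     - name: NAME
#       value: value
flow.environment = KubernetesJobEnvironment(job_spec_file="job_spec.yaml")
flow.storage = Docker(registry_url="my-registry.example.com")  # placeholder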
  • a

    Adam

    08/05/2020, 1:23 PM
    Hello friends, trust you’re all having a lovely day! I’m having some issues with building my docker image due to pickling and I’m hoping someone can lend a hand. Error in the thread
    j
    20 replies · 2 participants
  • a

    Adam

    08/05/2020, 3:25 PM
    Hi all, does anyone have any examples of patterns for bulk loading data into Postgres from Prefect? Currently I'm mapping over my dataset and passing each individual record into the `PostgresExecute` task, which runs an insert statement. In addition to the thousands of INSERTs, the task also creates and terminates a Postgres connection each time, which I'd prefer to avoid. Thoughts? My current implementation is in the thread.
    d
    d
    12 replies · 3 participants
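One alternative pattern (a sketch, not the resolution from the thread): do the whole load in a single task over one connection, batching the INSERTs with psycopg2's execute_values. Table, columns and connection details are placeholders:

import psycopg2
from psycopg2.extras import execute_values
from prefect import task

@task
def bulk_load(records):
    # records: an iterable of (id, value) tuples produced by an upstream task
    conn = psycopg2.connect(host="localhost", dbname="mydb", user="me")
    try:
        with conn, conn.cursor() as cur:
            execute_values(
                cur,
                "INSERT INTO my_table (id, value) VALUES %s",
                records,
                page_size=1000,  # rows per batched statement
            )
    finally:
        conn.close()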
  • r

    Riley Hun

    08/05/2020, 4:48 PM
    Hi everyone, was wondering what would be the best way to submit a Spark job to a Dataproc cluster through Prefect? The way I'm doing it right now (even though I haven't executed it yet) is as follows: I have these items baked into the Dockerfile for the Dask workers: • gcloud command-line tools • the spark-snowflake credentials JSON file • a copy of the main Spark job Python file. Is this the correct approach?
    1 reply · 1 participant
  • s

    Slackbot

    08/05/2020, 5:00 PM
    This message was deleted.
    c
    r
    2 replies · 3 participants
  • a

    Adam

    08/05/2020, 7:49 PM
    Hopefully my last question for the day: I'm having some trouble building the Docker image for my flow when one of my tasks calls another function (non-task) in my file. It fails the health check because of `cloudpickle_deserialization_check`. If I stop calling that function it works again. Am I missing something? Can I not call other methods from within a task?
    j
    7 replies · 2 participants
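Calling a plain (non-task) helper from inside a task is allowed, as in the minimal sketch below. The health-check failure usually points at the helper living in a module that isn't present inside the Docker image, so the deserialized flow can't re-import it - that diagnosis is a guess, not taken from the thread:

from prefect import Flow, task

def normalize(value: str) -> str:
    # ordinary function, not a Prefect task
    return value.strip().lower()

@task
def process(value: str) -> str:
    return normalize(value)  # fine: tasks can call regular functions

with Flow("helper-flow") as flow:
    process("  Hello ")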
  • g

    George Coyne

    08/05/2020, 8:53 PM
    A number of flows in prefect cloud are stuck in submitted and zombie killer is not putting them to bed, any suggestions on getting them processed?
    n
    d
    +1
    45 replies · 4 participants
  • g

    George Coyne

    08/05/2020, 8:56 PM
    Logs are clean, nothing untoward
  • a

    Amit

    08/06/2020, 1:19 PM
    Hi Team, I was looking at the Slack integration for notifications. The docs (https://docs.prefect.io/core/advanced_tutorials/slack-notifications.html#using-your-url-to-get-notifications) say to save the Slack URL in the `~/.prefect/config.toml` file - is there any other way, like an environment variable or something?
    l
    5 replies · 2 participants
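Untested sketch: Prefect reads any config.toml value from a matching PREFECT__ environment variable (double underscores separate the nested sections), so the [context.secrets] entry the docs describe could also be supplied like this - the webhook URL is a placeholder:

# equivalent to setting slack_webhook_url under [context.secrets] in config.toml
export PREFECT__CONTEXT__SECRETS__SLACK_WEBHOOK_URL="https://hooks.slack.com/services/XXX/YYY/ZZZ"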
  • a

    Adam

    08/06/2020, 4:10 PM
    Hi all, happy Thursday! Looking for some help with the KubernetesEnvironment `job_spec_file`. It doesn't seem to include my environment variables. Had a look at the source code and docs, and it definitely seems it should. Would someone mind looking at my scripts and letting me know what I'm doing wrong? Could be the weird way I'm doing the builds (multi-flow storage). Code in the thread.
    👀 1
    d
    m
    9 replies · 3 participants
  • m

    Ming Fang

    08/06/2020, 8:00 PM
    Hi, on Docker Hub I'm seeing the 0.13 images for prefect but not for server or ui. https://hub.docker.com/r/prefecthq/prefect/tags?name=0.13&page=1 works; https://hub.docker.com/r/prefecthq/server/tags?name=0.13&page=1 does not. Is this intentional?
    c
    l
    4 replies · 3 participants
  • m

    Mark McDonald

    08/06/2020, 8:14 PM
    Hi - I've been testing out the S3Result object in my flows and I generally find it amazing - nice work! However, I have one flow where I'm struggling with it. I'm specifying my S3Result at a flow level. I'm using a control-flow "case" task, and downstream tasks are failing because they try to retrieve the result from S3 for this case task and there is nothing available. How is this supposed to work? Is the case task supposed to return a result which then gets written to S3? I assume it should.
    👀 1
    d
    m
    8 replies · 3 participants
  • d

    Darragh

    08/06/2020, 8:36 PM
    Hey guys, I have what might be the first “change of behaviour” between 0.12.X and 0.13 🙂 I was mid-update of the EC2 instance we host Prefect Server on, and it looks like the prefect install pulled 0.13.0, which is great, apart from one minor flaw - my deployment script doesn’t work anymore. Previously I was able to get the Prefect Server instance to talk to itself over graphql by setting the following env variables before running `prefect backend server` && `prefect server start`:
    export PREFECT__CLOUD__API=http://$(curl http://169.254.169.254/latest/meta-data/public-ipv4):4200/graphql
    export PREFECT__SERVER__UI__GRAPHQL_URL=http://$(curl http://169.254.169.254/latest/meta-data/public-ipv4):4200/graphql
    where the above curls evaluate to the public IP of the EC2 instance; previously the various containers would be able to talk to the graphql container, but it’s not working since the update to 0.13.0. Instead, in the UI I now get the following:
    Couldn't connect
    http://localhost:4200/graphql
    Any suggestions? Different env vars needed, something else?
    👀 2
    d
    14 replies · 2 participants
  • s

    Skip Breidbach

    08/06/2020, 10:02 PM
    Was there a change in behavior with `flow.register()` in 0.13? I'm getting `'project_name' is a required field when registering a flow.` I've verified that my backend is set to `server` and have debugged into the code & don't see how `project_name` can be optional at this point. I even tried calling Dunder Mifflin but they didn't answer.
    😂 4
    n
    m
    6 replies · 3 participants
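A sketch of what 0.13 appears to require when registering against Server (my reading of the error, not a confirmed answer from the thread): projects now exist server-side as well, so one has to be created and then named explicitly. The project name is a placeholder:

#   prefect backend server
#   prefect create project "my-project"

from prefect import Flow, task

@task
def say_hi():
    print("hi")

with Flow("example") as flow:
    say_hi()

flow.register(project_name="my-project")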
  • s

    Skip Breidbach

    08/06/2020, 11:47 PM
    Getting a surprising-to-me error running a super-simple flow using the "Non-Docker Storage for Containerized Environments" recipe - something that was working in 0.12.x. I start my agent with `--show-flow-logs` and try to run the registered flow via the UI. The agent picks up the job fine but reports:
    [2020-08-06 23:43:16,058] ERROR - agent | Error while deploying flow: AttributeError("Can't pickle local object 'NpipeHTTPAdapter.__init__.<locals>.<lambda>'")
    Traceback (most recent call last):
      File "<string>", line 1, in <module>
      File "C:\Python38\lib\multiprocessing\spawn.py", line 116, in spawn_main
        exitcode = _main(fd, parent_sentinel)
      File "C:\Python38\lib\multiprocessing\spawn.py", line 126, in _main
        self = reduction.pickle.load(from_parent)
    EOFError: Ran out of input
    n
    m
    +1
    9 replies · 4 participants
  • m

    Marwan Sarieddine

    08/07/2020, 3:20 AM
    Hi folks - what is the recommended way to visualize a large flow?
    n
    5 replies · 2 participants
  • m

    Matthias

    08/07/2020, 11:32 AM
    A question following yesterday's release of 0.13.1 - will the Docker tag 0.13.1 be added? Right now I only see the "latest" tag updated for UI and Server.
    :upvote: 1
    n
    6 replies · 2 participants
  • b

    bolto6

    08/07/2020, 12:32 PM
    How can I pass parameters to `b_flow` from `a_flow`, together with the `dir` parameter? Flow A:
    @task
    def any_work(a: str) -> str:
        return f'/tmp/{a}'
    
    with Flow('a_flow') as flow:
        name = Parameter('name', default='all')
        result = any_work(name)
    Flow B:
    @task
    def any_work(a: str, b: str) -> str:
        return f'{a}/{b}'
    
    with Flow('b_flow') as flow:
        home = Parameter('home', required=True)
        dir = Parameter('dir', default='any')
    
        result = any_work(home, dir)
    Flow C:
    a_flow = FlowRunTask(flow_name='a_flow', wait=True)
    b_flow = FlowRunTask(flow_name='b_flow', wait=True)
    
    with Flow('c_flow') as flow:
        name = Parameter('name', default='all')
        dir = Parameter('dir', default='any')
    
        a_flow_state = a_flow(parameters={'name': name})
    
        # this errors, but how can I pass parameters to `b_flow` from `a_flow` along with the `dir` parameter?
        result = b_flow(
            upstream_tasks=[a_flow_state],
            parameters={
                'home': a_flow_state.result,
                'dir': dir,
            },
        )
    n
    1 reply · 2 participants
  • v

    Victor Apolonio

    08/07/2020, 1:50 PM
    hey guys, are there any plans for Prefect and Apache Hudi?
    n
    1 reply · 2 participants
  • d

    Darragh

    08/07/2020, 1:52 PM
    Has anyone tried using Prefect with an AWS Application Load Balancer? It keeps failing the health check, regardless of what path I give it. The EC2 instance it's pointing at is accessible and responds with a 200 to http://IP:8080, but I keep getting a failed check on it. Not saying it's a Prefect problem, but I'm curious if anyone has hit this?
    s
    g
    13 replies · 3 participants
  • b

    Ben Fu

    08/07/2020, 2:52 PM
    Hello, when moving from 0.12.x to 0.13.x, what is the proper way to migrate the existing flows in the database? I cannot run the Alembic migrations since the old migrations don't exist in the new server repo
    n
    2 replies · 2 participants
  • a

    alex

    08/07/2020, 3:38 PM
    Hello, I have a bunch of tasks called sequentially in my flow. If any of them fails, I want to execute a failure_recovery task. What would be the most concise way of expressing this? Also, sometimes I may want to execute the failure_recovery task directly (this is specified as a param).
    n
    5 replies · 2 participants
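One concise pattern that seems to fit the question (a sketch under my own assumptions, not the answer from the thread): give the recovery task the `any_failed` trigger and list every step as its upstream dependency:

from prefect import Flow, task
from prefect.triggers import any_failed

@task
def step_one():
    return 1

@task
def step_two(x):
    return x + 1

@task(trigger=any_failed)
def failure_recovery():
    print("recovering after an upstream failure")

with Flow("sequential-with-recovery") as flow:
    a = step_one()
    b = step_two(a)
    failure_recovery(upstream_tasks=[a, b])

# On a clean run failure_recovery ends in TriggerFailed, which would otherwise
# mark the whole flow failed, so treat the real work as the reference task.
flow.set_reference_tasks([b])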
  • r

    Riley Hun

    08/07/2020, 4:01 PM
    Hello everyone, I just have a quick question (hopefully). I just took some time to finish reading the Prefect Core documentation from beginning to end, and there are still a few concepts I'm struggling to understand. The first thing I don't quite get is how `set_upstream` and `set_downstream` work. Here is an ETL flow I created:
    with Flow("Thinknum ETL") as flow:
    
        token = token_task(
            version=version,
            client_id=client_id,
            client_secret=client_secret
        )
    
        history = history_task(
            dataset_id=dataset_id,
            version=version,
            token=token
        )
    
        loaded_dates = loaded_dates_task(
            bucket_name=bucket_name,
            dataset_id=dataset_id
        )
    
        dates_to_load = get_dates_to_load(
            already_loaded_dates=loaded_dates,
            history=history
        )
    
        datasets_to_load = load_to_gcs_task.map(
            dataset_id=unmapped(dataset_id),
            date=dates_to_load,
            version=unmapped(version),
            token=unmapped(token),
            bucket_name=unmapped(bucket_name)
        )
    It DOES seem to work fine, but I don't know when or if I should be applying `set_upstream`/`set_downstream`
    i
    3 replies · 2 participants
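As I understand the general behaviour (not taken from the thread): passing one task's output into another, as in the flow above, already creates the upstream edge, so nothing extra is needed there. `set_upstream`/`set_downstream` are for ordering-only dependencies where no data is exchanged, e.g. this sketch with made-up tasks:

from prefect import Flow, task

@task
def create_table():
    print("CREATE TABLE ...")

@task
def load_rows():
    print("INSERT ...")

with Flow("ordering-only") as flow:
    t = create_table()
    r = load_rows()
    r.set_upstream(t)  # run load_rows after create_table; no data is passed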
  • r

    Riley Hun

    08/07/2020, 6:36 PM
    Another quick question, but I've decided to split my Prefect tasks into multiple Python files to make it a lot more organized and modularized. When I submit the flow to the Dask executor, I get a `ModuleNotFoundError`. Do I need to dockerize the flow and specify Python paths for this to work? Thanks in advance! My folder tree looks something like this:
    ├── alternative_data_pipelines
    │   ├── thinknum
    │   │   ├── __init__.py
    │   │   └── thinknum.py
    │   └── utils
    │       ├── __init__.py
    │       ├── logging.py
    │       ├── snowflake.py
    │       └── utils.py
    ├── requirements.txt
    ├── setup.py
    └── thinknum_flow.py
    d
    b
    +2
    18 replies · 5 participants
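A sketch of one way around the ModuleNotFoundError (an assumption, not the resolution from the thread): since the project already has a setup.py, build a base image with `pip install .` so `alternative_data_pipelines` is importable wherever the flow is unpickled, and point Docker storage at it; the Dask workers need the same package installed. Registry and image names are placeholders:

from prefect import Flow
from prefect.environments.storage import Docker

with Flow("thinknum") as flow:
    ...  # tasks imported from alternative_data_pipelines.*

flow.storage = Docker(
    registry_url="my-registry.example.com",
    base_image="my-registry.example.com/alternative-data-pipelines:latest",
)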
  • j

    Jeremiah

    08/07/2020, 8:05 PM
    Sorry for the late notice but Laura is giving a live stream RIGHT NOW all about the new Server and UI:

    https://www.youtube.com/watch?v=89W-jMVH410

    📺 3
    :upvote: 3
    l
    b
    2 replies · 3 participants
  • a

    alex

    08/07/2020, 9:17 PM
    Hello everyone, I would really appreciate some advice on how to structure a flow that applies various different tasks to each item in a list. Here is a snippet
    feeds = get_feeds()
    
    with Flow(..):
       for feed in feeds:
         run_feed = GetDataTask(name=f"GetData: {feed.feed_name}")(feed)
         latest = run_feed
    
         if feed.do_transform():
         transform_feed = TransformTask()(feed, upstream_tasks=[latest])
            latest = transform_feed
         
         # ... some more optional logic here ...
    
         failure_recovery = FailureRecoveryTask(trigger=any_failed)(feed, upstream_tasks=[latest])  # should handle any failure in any of above tasks
    
        
       mapped = feeds.map(
                    all_feeds,
                    target=unmapped("aggregated"),
                )
       mapped.set_upstream([failure_recovery])
    This structure isn't giving me the DAG I'm looking for, and I was wondering if anyone could give any advice on the most "prefect" way to do this. Some questions I had: • Should I initialize tasks before the flow, as the docs advise, or is this structure ok? • Is the if and `latest=` logic advisable? Or should I run and skip the optional transformations, setting a "skipped" state? • How should I specify the aggregation task? Right now, the map task seems to only have a dependency on the last feed's `failure_recovery_task`.
    j
    2 replies · 2 participants
j

Jeremiah

08/08/2020, 1:29 AM
Hi Alex, nothing jumps out to me as odd about this structure - it’s helpful to think of your flow generation script as pure Python and not worry too much about Prefect semantics if you don’t have to. Things like where you initialize don’t really matter (we do it earlier in our docs just because the double call of `Task()(args)` looks weird to folks new to Prefect). There’s also nothing wrong with your `latest` logic, though the question of whether to use `latest` in your flow or a `skipped` state is one of compile-time vs run-time semantics. I think you could do either (though I may be misunderstanding your use case) - it’s more a preference of where you want the control flow to live. If possible, I recommend keeping it as-is because the Python script will be more clear. Last, for the aggregation task, your mapped task only depends on the last task because `mapped.set_upstream([failure_recovery])` is only called once, on the last `failure_recovery` task. To have it depend on all of them, collect all the failure recovery tasks into a list and call `mapped.set_upstream(list_of_failure_recoveries)`.
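A sketch of that last suggestion, with stand-in task classes (`GetDataTask`, `FailureRecoveryTask`, `AggregateTask` and the feeds list are placeholders for alex's); the aggregation call lists every per-feed recovery task as an upstream dependency rather than only the last one:

from prefect import Flow, Task
from prefect.triggers import any_failed

class GetDataTask(Task):
    def run(self, feed):
        return feed

class FailureRecoveryTask(Task):
    def run(self, feed):
        return None

class AggregateTask(Task):
    def run(self, feeds):
        return feeds

feeds = ["feed_a", "feed_b"]

failure_recoveries = []
with Flow("feeds") as flow:
    for feed in feeds:
        run_feed = GetDataTask(name=f"GetData: {feed}")(feed)
        recovery = FailureRecoveryTask(trigger=any_failed)(feed, upstream_tasks=[run_feed])
        failure_recoveries.append(recovery)

    # the aggregation depends on every recovery task, not just the last one
    aggregated = AggregateTask()(feeds, upstream_tasks=failure_recoveries)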
a

alex

08/10/2020, 5:39 PM
Thanks for your insights!
👍 1