prefect-community

    Tom Shaffner

    11/22/2021, 4:23 PM
    Hey All, I'm setting up a new instance of Prefect and I seem to be misunderstanding something about flows. I've got a bunch of methods with @task decorators and then I set them up in what should be a single flow. When I load this into Prefect though I get two split DAGs that seem to function independently. Flow setup here and picture of the result attached.
    with Flow(flow_name) as flow:
        logger.info(f"{flow_name} Task Flow initiated, running in {file_path}")
        df = pull_oracle_data_via(oracle_query_sql=oracle_query_sql_path, prod=use_oracle_prod)
        df = set_data_types(df)
        upload_to_table(df, destination_table=data_destination_table_name)

        if summary_view_name is not None and history_table_name is not None:
            logger.info("Initiating history upload process.")
            summary_df, summary_data_empty = pull_summary_data_via(sql=f"SELECT * FROM {summary_view_name}")
            if summary_data_empty:
                delete_today_from_history_if_exists(df=df, history_table=history_table_name)
                upload_to_history_table(df=summary_df, destination_table=history_table_name, append=True)
        else:
            logger.info("Skipping summary view run: summary view name and/or history table name missing.")
    To address this I tried to make the dependencies explicit by adding upstream_tasks arguments to two of the above lines, like so:
    summary_df, summary_data_empty = pull_summary_data_via(upstream_tasks=[upload_to_table], sql=f"SELECT * FROM {summary_view_name}")
    delete_today_from_history_if_exists(upstream_tasks=[pull_summary_data_via], df=df, history_table=history_table_name)
    This doesn't seem to fix the issue though; when I run the flow, later tasks still seem to initiate before the Oracle pull, which should occur before everything. Anyone see what I'm doing wrong? The documentation would seem to indicate that feeding result data from one task to another would make dependencies work correctly, but that doesn't seem to be happening here.
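    For comparison, a minimal sketch of the upstream_tasks pattern, assuming Prefect 1.x (task names are illustrative); the usual gotcha is passing the decorated function rather than the result object returned when the task is called inside the Flow block:
    from prefect import task, Flow

    @task
    def extract():
        return [1, 2, 3]

    @task
    def load(df):
        pass

    @task
    def summarize():
        pass

    with Flow("example") as flow:
        df = extract()
        loaded = load(df)
        # depend on the bound result returned inside the Flow block,
        # not on the decorated function itself
        summarize(upstream_tasks=[loaded])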
    19 replies · 2 participants

    Marko Herkaliuk

    11/22/2021, 5:36 PM
    hi, maybe I'm tired today, but I can't figure out whether I can set or change a context variable inside the Flow block.
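    A minimal sketch, assuming Prefect 1.x, of how context values are usually supplied at run time rather than set inside the Flow block (the key name is illustrative):
    import prefect
    from prefect import task, Flow

    @task
    def read_key():
        # context is resolved when the task runs, not when the Flow block is built
        return prefect.context.get("my_key", "default")

    with Flow("context-example") as flow:
        read_key()

    # override context values for a local run
    with prefect.context(my_key="hello"):
        flow.run()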
    6 replies · 3 participants

    David Yang

    11/22/2021, 8:18 PM
    Hi, has anyone tried to use ShellTask to run PowerShell scripts?
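    A minimal sketch, assuming Prefect 1.x and that PowerShell Core (pwsh) is on the PATH of the machine running the flow; ShellTask's shell argument names the binary used to execute the command:
    from prefect import Flow
    from prefect.tasks.shell import ShellTask

    # assumption: pwsh is installed where the flow runs
    powershell = ShellTask(shell="pwsh", return_all=True, log_stderr=True)

    with Flow("pwsh-example") as flow:
        out = powershell(command="Write-Output 'hello from PowerShell'")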
    24 replies · 3 participants

    Jason Motley

    11/22/2021, 8:19 PM
    Does prefect have a mobile app?
    3 replies · 3 participants

    Jason Motley

    11/22/2021, 8:41 PM
    Is there a good way to read in custom SQL, then delete 14 days of data from the target table, then deposit that custom SQL for the last 22 days into the target table, ensuring no duplicate records? Something like
    extract => transform => delete 14 days in target => append last 22 days w/ no dupes
    . Right now I'm doing 2 parallel ETLs, one of which ends by replacing the existing data and the other by appending. This seems very slow since I have to perform 2 full extracts.
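    A hedged sketch of the single-extract version - delete the trailing window, then append the freshly transformed window, so the overlap is inserted exactly once (table and column names and the date arithmetic are illustrative and depend on your database):
    from sqlalchemy import create_engine, text
    from prefect import task

    @task
    def delete_recent(connection_string, table):
        # hypothetical column `load_date`; interval syntax varies by database
        engine = create_engine(connection_string)
        with engine.begin() as conn:
            conn.execute(text(f"DELETE FROM {table} WHERE load_date >= CURRENT_DATE - 14"))

    @task
    def append_window(df, connection_string, table):
        # df holds only the last 22 days, so no duplicates survive the reload
        engine = create_engine(connection_string)
        df.to_sql(table, con=engine, if_exists="append", index=False)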
    67 replies · 2 participants

    Wieger Opmeer

    11/22/2021, 10:47 PM
    I'm doing some concurrency testing using a simple workflow with a mapped task that starts (dummy) AWS Batch jobs. Running locally (flow.run()) I manage to run into the AWS API limits. When I run the same flow via Prefect Cloud and an agent the bottleneck seems to be somewhere else because I don't see more than about 100 tasks running concurrently. In both cases I use a LocalDaskExecutor with 500 threads and the EC2 instance running the flow/agent is not at all busy (only waiting for Batch jobs to finish after all). Any ideas what could cause the slowdown using Cloud?
    8 replies · 3 participants

    kwmiebach

    11/22/2021, 11:22 PM
    Hi, I pip installed Orion and tried to start the server:
    prefect orion start
    Some error messages appear, including a sqlite one:
    no such table 'flow_run'
    Is there some db initialisation procedure that I missed?
    5 replies · 2 participants

    kwmiebach

    11/22/2021, 11:48 PM
    How can I open the 4200 port to all interfaces? It runs on a server and not on my laptop.
    12 replies · 2 participants

    Constantino Schillebeeckx

    11/23/2021, 12:30 AM
    I'm having several issues setting up a logger for my shared code; I've tried to follow the docs.
    1. I've set the env var PREFECT__LOGGING__LEVEL="DEBUG" within the run_config (ECSRun) and I've confirmed that this env var is set as such in the running container. However, within my running flow, when I check os.environ it's set to INFO.
    2. I set up a logger in my shared code that I later import like a library: logger = logging.getLogger("dwh.utils.main"). I've set PREFECT__CLOUD__SEND_FLOW_RUN_LOGS='true' and PREFECT__LOGGING__EXTRA_LOGGERS="['dwh']". When I execute a flow that uses that shared code, I can't see it emit logs in CloudWatch or in Prefect Cloud; however, when I run the flow locally I do see the logging statement. What am I missing?
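    For reference, a sketch of attaching these variables to the run config so they reach the flow-run container itself, not just the agent (assuming Prefect 1.x; values as in the question):
    from prefect import Flow
    from prefect.run_configs import ECSRun

    with Flow("logging-example") as flow:
        pass

    flow.run_config = ECSRun(
        env={
            "PREFECT__LOGGING__LEVEL": "DEBUG",
            "PREFECT__CLOUD__SEND_FLOW_RUN_LOGS": "true",
            "PREFECT__LOGGING__EXTRA_LOGGERS": "['dwh']",
        }
    )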
    61 replies · 4 participants

    Chris L.

    11/23/2021, 4:10 AM
    Hello! An Orion question here. At the moment, I'm uploading wrangled Pandas DataFrames as parquet files to S3 as its own task. I would like to refactor away these tasks and use S3Result with PandasSerializer instead. My question: will the Results API in its current v0.15 form (i.e. as separate classes to be instantiated, given an appropriate serialiser, and passed into the task decorator as a keyword argument) carry over into Orion?
    1 reply · 2 participants

    Matt Clapp

    11/23/2021, 4:45 AM
    Hi everyone, first post here and just coming up to speed on Prefect. So far it's a joy to use! My question is about running an agent on AWS, hopefully something like ECS. Specifically I'd like to have a durable setup/flow that stays on AWS, and totally cut my local computer out of the loop for everything. I see a lot of good tutorials that involve initiating a flow from a local computer so that it runs on AWS using ECSRun (like https://towardsdatascience.com/serverless-data-pipelines-made-easy-with-prefect-and-aws-ecs-fargate-7e25bacb450c and https://www.lejimmy.com/distributed-data-pipelines-with-aws-ecs-fargate-and-prefect-cloud/). Tantalizingly, the second post by Jimmy Le has one mention that sounds close to my use case: "Ultimately we'll probably want to run many ECSAgents in a small Elastic Compute Cloud (EC2) with supervisord monitoring their health." What I'd like, and what my question is: is there documentation on just setting up a Docker image and having that be a self-contained container running a flow on AWS, without needing to kick it off from a local computer? I'd be happy to use Prefect Cloud for the UI. Is this a valid use case? Is there some reason there's not much documentation for it? (Or do I just not understand something basic?) Thanks so much for any help.
    7 replies · 3 participants

    Gaylord Cherencey

    11/23/2021, 6:48 AM
    Hello, I want to use the SnowflakeQuery task against Snowflake configured with Okta SSO and MFA enabled. Based on the Snowflake connector documentation I would have to pass authenticator to the connect method (which doesn't seem to be possible at the moment in the task). Is my assumption correct, or is there a magic env variable I can use? If not, is it a change I can request or implement myself in the repository?
    7 replies · 3 participants

    dammy arinde

    11/23/2021, 2:14 PM
    Hello, I'm having some issues getting my Prefect flow to run. I'm trying to create a parent flow that runs other flows based on a conditional value derived from the file dropped in S3 and the column it matches in a Snowflake table. I created a task that extracts the column value we are looking for from the Snowflake table, then used if statements to choose the flow to run based on the extracted value. I then return the flow to run so I can pass it as the flow name to the StartFlowRun function. It seems I'm unable to pass the flow to run; it always returns "not defined". Also, when I tried to just test StartFlowRun with an existing flow, it returned an error that the flow is not found. Is there a way to get this flow name and pass it to StartFlowRun?
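    A minimal sketch of the conditional flow-of-flows pattern, assuming Prefect 1.x (flow and project names are illustrative):
    from prefect import task, Flow, case
    from prefect.tasks.prefect import StartFlowRun

    @task
    def get_match():
        # hypothetical stand-in for the Snowflake column lookup
        return "flow_a"

    run_flow_a = StartFlowRun(flow_name="flow-a", project_name="my-project", wait=True)
    run_flow_b = StartFlowRun(flow_name="flow-b", project_name="my-project", wait=True)

    with Flow("parent") as flow:
        value = get_match()
        with case(value, "flow_a"):
            run_flow_a()
        with case(value, "flow_b"):
            run_flow_b()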
    23 replies · 3 participants

    Jason Motley

    11/23/2021, 4:32 PM
    Can prefect be used to extract from a database and do an SFTP upload? Are there examples of this?
    1 reply · 2 participants

    Maurits de Ruiter

    11/23/2021, 4:45 PM
    Hi everyone, we just updated from 0.15.4 to 0.15.9 and suddenly our flows are failing on a specific line of code. Every instance of
    if status in ['SUCCEEDED', 'FAILED']:
    fails with the following error:
    TypeError: 'sequence' not supported between instances of 'str' and 'tuple'
    If we log the type of string and array, it returns their types correctly.
    7 replies · 2 participants

    Côme Arvis

    11/23/2021, 5:50 PM
    Hello! 👋 Just a quick question: I have a task B which depends on a list [A1, A2, A3, ...] of other tasks of undefined size (Prefect therefore creates an implicit List task under the hood). The thing is, some tasks in the list [A1, A2, A3, ...] can be skipped at runtime, but I still want B to be executed. I currently can't achieve this, even if skip_on_upstream_skip=False is specified for B, since the implicit List task is skipped without my being able to do anything about it (I receive None, and not a list of optional elements). Is there a way to do it? Thanks!
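    For reference, a sketch of the two knobs involved, assuming Prefect 1.x; whether the implicit List task forwards results of skipped upstreams is version-dependent, which is the crux of the question:
    from prefect import task, Flow
    from prefect.triggers import all_finished

    # run B even when upstream tasks were skipped, firing once
    # everything upstream has finished in any state
    @task(skip_on_upstream_skip=False, trigger=all_finished)
    def b(results):
        return [r for r in results if r is not None]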
    11 replies · 2 participants

    Wesam Manassra

    11/23/2021, 6:29 PM
    Hi there, I am running into a weird issue where if I provide a dockerfile to the prefect.environments.storage.Docker class, I get an error that looks like this:
    shutil.Error "[Errno 63] File name too long: ['<Endless recursive path>']"
    23 replies · 2 participants

    Marwan Sarieddine

    11/23/2021, 6:29 PM
    Hey Folks, question about SLAs - is it possible to trigger a flow run in case a certain flow run fails?
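    One hedged way to approximate this in Prefect 1.x is a flow-level state handler that kicks off another run on failure; the flow id below is a placeholder:
    from prefect import Flow
    from prefect.client import Client

    def trigger_fallback(flow, old_state, new_state):
        if new_state.is_failed():
            # placeholder id of the flow to launch when this one fails
            Client().create_flow_run(flow_id="<fallback-flow-id>")
        return new_state

    with Flow("main", state_handlers=[trigger_fallback]) as flow:
        pass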
    19 replies · 3 participants

    Kevin

    11/23/2021, 8:28 PM
    When creating re-usable flows, how do you name them uniquely? For example, if I have a flow that connects to an SFTP server, downloads files, and then writes them to cloud storage based on passed parameters - how does the flow name align with the parameters being passed?
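    A sketch of one option, assuming Prefect 1.x Cloud/Server: the flow name stays fixed, but each run can be given a run name derived from its parameters when it is created (the id and names are placeholders):
    from prefect import Client

    client = Client()
    client.create_flow_run(
        flow_id="<your-flow-id>",                # placeholder
        parameters={"sftp_host": "example.com"},
        run_name="sftp-example.com-to-storage",  # run name aligned with parameters
    )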
    6 replies · 2 participants

    Dotan Asselmann

    11/24/2021, 10:29 AM
    Hi, I encountered an interesting issue when running the same flow using a local agent vs a k8s agent: the same code takes longer to run on the local agent compared to the k8s job [all on the same cluster, machine type, and resource requirements]. Is there anything in the local agent configuration or run config that could explain the difference? It runs 4x slower.
    15 replies · 3 participants

    haf

    11/24/2021, 10:49 AM
    I've managed to silence the logger: I can't see the stacktrace when a task crashes. How do I make Prefect print the stacktrace locally?
    @task(
        nout=2,
        max_retries=10,
        retry_delay=timedelta(seconds=1),
    )
    def fetch_model_settings(
        dsn_params: DSNParams,
        app_id: UUID,
        default_model_group_id: UUID,
    ) -> ModelSettings:
        logger = prefect.context.get("logger")
        logger.info(
            f"Fetching model settings for app_id={app_id}, default_model_group_id={default_model_group_id}"
        )
        raise ValueError("WTF where are the logs")

    # before flow:
    prefect.config.logging.level = "DEBUG"
    In the sample above I never get to see the WTF in the output of running the flow in the console/locally (PREFECT__LOGGING__LEVEL=DEBUG python flows/flow.py)
    3 replies · 2 participants

    Manuel Gomes

    11/24/2021, 11:49 AM
    In one of my flows, I have a task to upload a video to S3 (a boto3 S3 resource upload_file() call, because big & binary). Since it's synchronous, I can trust the file will be where desired when the task succeeds. The next task in this flow is to transcode this video. I do so by creating a mediaconvert boto3 client and sending a mess of JSON to its create_job(**args) method. This returns me... the job. Now from what I've read, I should be able to use the Prefect built-in prefect.tasks.aws.client_waiter.AWSClientWait to wait for said job to finish (which is fine, at this point the workflow is serial/synchronous). Problem is... even when the job reports success (in the console, even!), it takes a while (minutes?!) for the transcoded movie to be present in the target bucket. I would then need another wait task until I could find the file in the bucket's list of objects, possibly through prefect.tasks.aws.s3.S3List, before I could proceed to do further things to this transcoded video? This conjunction sounds all too common not to have an integrated solution, unless I'm being dense (hah! no news there!) and not spotting an obvious one. Any guidance?
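    One hedged workaround for the second wait: boto3 ships an object_exists waiter for S3, which a small task can wrap (bucket and key are illustrative):
    import boto3
    from prefect import task

    @task
    def wait_for_object(bucket, key):
        # polls HeadObject until the transcoded file shows up, or times out
        s3 = boto3.client("s3")
        waiter = s3.get_waiter("object_exists")
        waiter.wait(Bucket=bucket, Key=key, WaiterConfig={"Delay": 15, "MaxAttempts": 40})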
    5 replies · 2 participants

    Marko Herkaliuk

    11/24/2021, 3:00 PM
    Hello! A question about auth in Cloud: if I register by invitation via Google, can I set a new password and log in with email/password?
    7 replies · 3 participants

    Emma Rizzi

    11/24/2021, 3:32 PM
    Hello! I'm considering using Prefect to orchestrate VMs, as I work on a cloud called WEkEO Dias that does not offer dynamic provisioning of instances. This typical workflow would be executed on a master VM:
    1. start a VM
    2. launch a flow run on this VM
    3. shut down the VM
    I thought about using the function create_flow_run to launch part 2, synchronized with steps 1 and 3. Considering I can configure some script to deploy the agent on VM start, is this possible? I'm concerned about the main flow run starting before the VM's agent exists. If you have suggestions on better ways to implement this with Prefect, I'm interested! So far I only used a basic ECS agent 🙂
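    A minimal sketch of part 2 with the task-library helpers, assuming Prefect 1.x; the hypothetical start/stop VM tasks would bracket these two:
    from prefect import Flow
    from prefect.tasks.prefect import create_flow_run, wait_for_flow_run

    with Flow("vm-orchestrator") as flow:
        # a start_vm() task would go here, upstream of create_flow_run
        run_id = create_flow_run(flow_name="child-flow", project_name="my-project")
        # blocks until the child run reaches a final state, so a downstream
        # shutdown_vm task can safely depend on it
        wait_for_flow_run(run_id)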
    6 replies · 2 participants

    chicago-joe

    11/24/2021, 3:41 PM
    Hey everyone, I had something scheduled via the Prefect CronClock and Schedule (prefect v0.15.5), and noticed that after turning off the schedule in the Cloud UI, it still ran at the scheduled time. That's not what I expected to happen; has anyone else experienced this?
    12 replies · 2 participants

    Michael Warnock

    11/24/2021, 4:23 PM
    Hello again! I'm back to helping our science guys use prefect. I went to team->members to invite someone, and it says we're out of accounts, and offers a link to https://cloud.prefect.io/team/account which is broken. I got to the page, though, and apparently we need an enterprise account for more users. I'll submit the form, but it would be great if we could get unblocked today
    ✅ 2
    6 replies · 3 participants

    André Petersen

    11/24/2021, 4:28 PM
    Hi there! I am new to Prefect and am evaluating whether it is the right tool for our needs in our Data Engineering team. Therefore, I wanted to first run the simple tutorial described here: https://cloud.prefect.io/tutorial/Universal-Deploy#user-content-universal-deploy I am working on Windows on WSL1. Everything worked fine (creating a python environment with prefect, running the test flow locally, authenticating with the key, creating the project), with no error message, but I cannot see the flow I defined in the UI within Prefect Cloud. What could be the reason for it?
    17 replies · 3 participants

    Ryan Brennan

    11/24/2021, 4:38 PM
    Hey Everyone - For those using DbtShellTask, is it expected to have to run dbt compile before executing dbt run?
    15 replies · 2 participants

    André Petersen

    11/24/2021, 4:51 PM
    Our data engineering team is considering the following setup:
    • Snowflake as the database
    • S3 buckets as storage
    • Prefect or Airflow as the orchestration tool
    For Airflow, AWS offers MWAA, which basically creates the whole infrastructure (scheduler, UI through a webserver, workers) needed for orchestration. For Prefect, I would think that we could use Prefect Cloud, which handles the UI/webserver aspect, but we would still need to set up a server running an agent, right? As we would like to not manage any servers in order to keep the infrastructure as simple as possible: what would be the best option? Or did I misunderstand something? Thanks for advice in advance!
    3 replies · 2 participants

    Slackbot

    11/24/2021, 5:22 PM
    This message was deleted.
    37 replies · 3 participants

Anna Geller

11/24/2021, 5:24 PM
Normally the edges are created for you behind the scenes. You shouldn’t have to worry about this, as long as you set dependencies either by passing data between tasks or setting explicit state dependencies. LMK if you need some examples.

Jason Motley

11/24/2021, 5:25 PM
Could you go into the explicit state dependencies a little more?
I'm looking for a way to make the flow diagram not so convoluted

Anna Geller

11/24/2021, 5:26 PM
Sure! Method 1:
with Flow(...) as flow:
    a = first_task()
    b = second_task()

    c = third_task(c_inputs, upstream_tasks=[a,b])
Method 2:
with Flow(...) as flow:
     a = first_task()
     b = second_task()

     c = third_task()
     c.set_upstream(b)
     c.set_upstream(a)

Jason Motley

11/24/2021, 5:27 PM
for method 1, would the flow diagram show A=> B => C?
Let me show my current example (simplified) and make sure I have it right
👍 1
extract_1 = task1(stuff)
load_1(connection=conn, data=extract_1)

extract_2 = task2(stuff)
transformed = task3(stuff)
delete_stuff()  # This works
load_2(connection=conn, data=transformed)

rewrite as:

extract_1 = task1(stuff)
load_1(connection=conn, data=extract_1, upstream_tasks=[extract_1])

extract_2 = task2(stuff, upstream_tasks=[extract_1, load_1])

etc.
i.e. do I keep adding the tasks inside upstream_tasks = [a, b, c] as I go? or do I write it all at the end?

Anna Geller

11/24/2021, 5:31 PM
this would be a->b->c with method 1:
from prefect import task, Flow


@task
def first_task():
    pass

@task
def second_task():
    pass

@task
def third_task():
    pass

with Flow("ex") as flow:
    a = first_task()
    b = second_task(upstream_tasks=[a])
    c = third_task(upstream_tasks=[b])

flow.visualize()

Jason Motley

11/24/2021, 5:31 PM
ahh
so pass each task into upstream_tasks one at a time

Anna Geller

11/24/2021, 5:33 PM
correct, if the sequential order is important

Jason Motley

11/24/2021, 5:33 PM
and is it the result of each task that goes in the []?
E.g.

Anna Geller

11/24/2021, 5:33 PM
compare those two:

Jason Motley

11/24/2021, 5:34 PM
right, I want left
that made no sense sorry -- The one on the left is preferable.

Anna Geller

11/24/2021, 5:34 PM
it only depends on what dependencies must be met before the third task can run: if the order of task 1 and 2 doesn’t matter, then the picture on the right makes more sense

Jason Motley

11/24/2021, 5:34 PM
If I put first_task and second_task (or their results) into the upstream_tasks, that would give me the LEFT visual?

Anna Geller

11/24/2021, 5:35 PM
sure! and one more thing: I see in your inputs that the load task expects a database connection as input - this won’t work, since data that you pass between tasks must be serializable with pickle.
the left visual:
from prefect import task, Flow


@task
def first_task():
    pass

@task
def second_task():
    pass

@task
def third_task():
    pass

with Flow("ex") as flow:
    a = first_task()
    b = second_task(upstream_tasks=[a])
    c = third_task(upstream_tasks=[b])

flow.visualize()

Jason Motley

11/24/2021, 5:36 PM
Great, i'll check it out, thank you!
Is there a readme on the pickle error?

Anna Geller

11/24/2021, 5:36 PM
the right visual:
from prefect import task, Flow


@task
def first_task():
    pass

@task
def second_task():
    pass

@task
def third_task():
    pass

with Flow("ex") as flow:
    a = first_task()
    b = second_task(upstream_tasks=[a])
    c = third_task(upstream_tasks=[a,b])

flow.visualize()
if you want to pass a database connection between various tasks, you can use Resource manager, as discussed here: https://prefect-community.slack.com/archives/CL09KU1K7/p1637249773390000?thread_ts=1637249695.389900&cid=CL09KU1K7 Regarding serialization, this can be helpful: https://docs.prefect.io/orchestration/flow_config/storage.html#pickle-vs-script-based-storage
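For reference, a minimal sketch of the resource manager pattern mentioned above, assuming Prefect 1.x (connection string and table name are illustrative):
import pandas as pd
from sqlalchemy import create_engine
from prefect import task, Flow, resource_manager

@resource_manager
class DBConnection:
    def __init__(self, connection_string):
        self.connection_string = connection_string

    def setup(self):
        # build the unpicklable engine at run time, not at flow build time
        return create_engine(self.connection_string)

    def cleanup(self, engine):
        engine.dispose()

@task
def get_df():
    return pd.DataFrame({"a": [1]})

@task
def load(df, engine):
    df.to_sql("my_table", con=engine, index=False, if_exists="append")

with Flow("resource-manager-example") as flow:
    with DBConnection("postgresql://user:***@host/db") as engine:
        load(get_df(), engine)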

Jason Motley

11/24/2021, 5:38 PM
Phenomenal, thank you!
🙌 1
@Anna Geller I got an error where adding the upstream_tasks to a flow seems to negate the reading of other features of the task such as the connections
Here is load in the task flow:
load(connection=connection_w, if_exists='append',  ## Load new table
    db_table='DataTable', 
    dataframe=new_transformed,
    upstream_tasks=[previous_task])
Error:
r("load() missing 4 required positional arguments: 'connection', 'if_exists', 'db_table', and 'dataframe'")

Anna Geller

11/24/2021, 6:55 PM
@Jason Motley within the with Flow() constructor you can only have tasks - are you sure that the load function is decorated with @task? It's described here: https://docs.prefect.io/core/concepts/tasks.html

Jason Motley

11/24/2021, 6:57 PM
yeah it is a @task
Do you know why I would get a credentials error in production if it works using my local agent?
Failed to load and execute Flow's environment: NoCredentialsError('Unable to locate credentials')

Anna Geller

11/24/2021, 6:59 PM
I could say more by looking at the flow definition - can you share it?

Jason Motley

11/24/2021, 6:59 PM
i.e. the whole flow section?

Anna Geller

11/24/2021, 7:00 PM
ideally the entire flow file, yes. If you don’t wanna share publicly, sending it via DM works for me too
resolved via DM by moving the db connection into the load task, and passing secret as data dependency
import pandas as pd
from sqlalchemy import create_engine
from prefect import task, Flow
from prefect.tasks.secrets import PrefectSecret


@task
def load(df, connection_string, db_table="table", schema="schema"):
    engine = create_engine(connection_string)
    df.to_sql(db_table, schema=schema, con=engine, index=False)


@task
def get_df():
    return pd.DataFrame()


with Flow("ex") as flow:
    db_conn = PrefectSecret("DB_CONNECTION_STRING")
    df = get_df()
    load(df, db_conn)