prefect-community

    Adam Everington

    10/20/2021, 8:35 AM
hey guys, hopefully this is an easy / noob-level question: I want to get the count of records returned in a dataframe from one task and pass it through to another task, like so:
with Flow('test_flow', executor=LocalExecutor()) as flow:
    df = sometask_that_returns_a_df()
    count_of_records_in_df = len(df.index)
    execute_task_2(count_of_records_in_df)
Obviously this fails to build, because at flow-definition time the return value of sometask_that_returns_a_df is a FunctionTask, not a DataFrame.
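A minimal sketch of the usual fix, assuming sometask_that_returns_a_df and execute_task_2 are existing tasks: defer the len() call into its own task, so it runs at flow-run time when the DataFrame actually exists:

    from prefect import task, Flow
    from prefect.executors import LocalExecutor

    @task
    def count_records(df):
        # runs at flow-run time, when df is a real DataFrame
        return len(df.index)

    with Flow('test_flow', executor=LocalExecutor()) as flow:
        df = sometask_that_returns_a_df()
        count_of_records_in_df = count_records(df)
        execute_task_2(count_of_records_in_df)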

    Laurens

    10/20/2021, 10:12 AM
Hi all. We are looking into whether Prefect meets our scheduling needs. We mainly want to use it to call spark-submits, and we want those to run in parallel, which is possible in Prefect with Dask, for example. Suppose we do a spark-submit and it takes 15 minutes; those 15 minutes take place outside Prefect. In this case, will Prefect wait for the task to complete, or does it continue with other tasks? In other words, will this task occupy a thread for 15 minutes?
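For context, a sketch of how a spark-submit is typically wrapped, assuming Prefect 1.x's ShellTask: the task does occupy its worker for the full 15 minutes while the external job runs, and parallelism across submits comes from the executor (e.g. Dask):

    from prefect import Flow
    from prefect.executors import LocalDaskExecutor
    from prefect.tasks.shell import ShellTask

    # each call blocks one executor worker until spark-submit exits
    spark_submit = ShellTask(name="spark-submit")

    with Flow("spark-jobs", executor=LocalDaskExecutor()) as flow:
        # hypothetical job scripts; the submits run on parallel workers
        for job in ["job_a.py", "job_b.py"]:
            spark_submit(command=f"spark-submit {job}")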

    Raúl Mansilla

    10/20/2021, 11:43 AM
Hello team! I'm facing some new errors on our production flows that I've never seen before. We had to reboot the server this morning, and now we see the same error in two flows…

    Gabi Pi

    10/20/2021, 12:11 PM
Hey everyone, could you please advise how I should provide AWS credentials to KubernetesRun? I tried the following:
run_config=KubernetesRun(
    env={
        "AWS_ACCESS_KEY_ID": AWS_ACCESS_KEY_ID,
        "AWS_SECRET_ACCESS_KEY": AWS_SECRET_ACCESS_KEY,
    }
)
    but when I run the flow, I get an error saying:
    Error downloading Flow from S3: An error occurred (InvalidAccessKeyId) when calling the GetObject operation: The AWS Access Key Id you provided does not exist in our records.
    Any ideas?
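One possible cause (an assumption about the setup, not a confirmed diagnosis): env values are captured when the run config is built, so if the AWS_ACCESS_KEY_ID variable was empty or stale at registration time, the Kubernetes job gets bad credentials. A sketch that resolves the values from Prefect Secrets instead, assuming those secrets exist:

    from prefect.client import Secret
    from prefect.run_configs import KubernetesRun

    run_config = KubernetesRun(
        env={
            # resolved when this code runs, from Prefect Secrets
            "AWS_ACCESS_KEY_ID": Secret("AWS_ACCESS_KEY_ID").get(),
            "AWS_SECRET_ACCESS_KEY": Secret("AWS_SECRET_ACCESS_KEY").get(),
        }
    )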

    John Marx

    10/20/2021, 12:46 PM
    I tried to sign up today and even though I got the e-mail, I get a 404 error when I click the link. Is there another way to sign in?

    Daniel Manson

    10/20/2021, 1:31 PM
    hi, I've found the conditional tasks to be rather confusing. In particular I was hoping that if you have something like this...
    a = task_a()
    test_value = task_get_test_value()
    with case(test_value, 'test-against-this'):
      b = task_b()
      b.set_upstream(a)
...then task b would fail if a fails, regardless of whether the case block is in the skipped or non-skipped state. However, that doesn't seem to be the way it works; rather, if the case block skips, it 'hides' whether or not a succeeded.

    Tim Enders

    10/20/2021, 1:39 PM
I am trying to run a task after an apply_map, but I am unsure how to link it up; clear_files keeps running first, with no upstream tasks.
    with Flow(
        "AWS CUR import",
        result=GCSResult(bucket="platformsh-vendor-prefect"),
        state_handlers=[flow_failure],
    ) as flow:
        # save_file = prefect.Parameter("Download Files", default=False)
        num_periods = prefect.Parameter("num_periods", default=4)
        periods = generate_periods(num_periods)
        apply_map(run_or_bail, periods)
        clear_files()
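A sketch of one way to force the ordering, assuming run_or_bail returns a value (apply_map returns whatever the mapped function returns, so its result can serve as an upstream dependency):

    mapped = apply_map(run_or_bail, periods)
    # clear_files now waits for every mapped branch to finish
    clear_files(upstream_tasks=[mapped])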

    Kyle McChesney

    10/20/2021, 3:14 PM
I am experiencing an interesting issue with upstream_tasks. I have the following example flow:
    with Flow(
        'example',
        executor=LocalExecutor(),
    ) as flow:
        data_csv_url = Parameter('data_csv_url')
        output_url = Parameter('output_url')
    
        data = generate_data(data_csv_url)
        data_with_stuff = generate_stuff.map(data)
        data_with_stuff_written = write_data(data_with_stuff, output_url)
    
        data_with_other_stuff = generate_other_stuff.map(
            data_with_stuff,
            upstream_tasks=(data_with_stuff_written,),
        )
I read a CSV file and generate some “data” objects (a list of them) for the given flow run. I then map them across another task, which mutates the data items to add things to them. I then want to write the state of the objects to a file for tracking, and I want to do this BEFORE I make further modifications in generate_other_stuff. With this flow definition, generate_other_stuff fails without any exception and does not even seem to run the task.
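A sketch of the usual pattern here, on the assumption that this is what trips the flow up: upstream_tasks passed to .map() are themselves mapped over unless wrapped in unmapped, so the writer task should be passed as a single, unmapped dependency:

    from prefect import unmapped

    data_with_other_stuff = generate_other_stuff.map(
        data_with_stuff,
        # keep write_data as one upstream dependency rather than
        # zipping it element-wise against the mapped data
        upstream_tasks=[unmapped(data_with_stuff_written)],
    )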

    Kyle McChesney

    10/20/2021, 3:15 PM
Example log output:
    └── 09:09:26 | INFO    | Task 'generate_data[0]': Starting task run...
    └── 09:09:26 | INFO    | Task 'generate_data[0]': Finished task run for task with final state: 'Success'
    └── 09:09:26 | INFO    | Task 'write_data': Starting task run...
    └── 09:09:26 | INFO    | Task 'write_data': Finished task run for task with final state: 'Success'
    └── 09:09:27 | INFO    | Task 'generate_other_stuff': Starting task run...
    └── 09:09:27 | INFO    | Task 'generate_other_stuff': Finished task run for task with final state: 'Failed'

    Marwan Sarieddine

    10/20/2021, 4:12 PM
Hi folks, is there a way to make a manual_only task get trigger-failed, rather than pausing the flow run, when there is an upstream task run failure?

    Matthew Seligson

    10/20/2021, 5:04 PM
    Are Prefect artifacts going to become a fully fledged feature in the near term? I see they are currently in an experimental phase.

    Luis Gallegos

    10/20/2021, 6:39 PM
Hi Prefect team, what version of Nginx does the latest Prefect Core release, 0.15.6, use? I'm asking because Prefect Core 0.14.8 uses Nginx 1.18.0 (with Docker), which has a vulnerability that is patched in Nginx 1.20.1.

    Aqib Fayyaz

    10/20/2021, 8:36 PM
Hi community, I am trying to deploy Prefect on Google Kubernetes Engine and am unable to do it. I am using the command mentioned in the docs, prefect agent kubernetes install -k API_KEY | kubectl apply --namespace=default -f -, but when I check the pods using kubectl get pods, the Prefect agent's status says CrashLoopBackOff. When I look at the pod's logs, I see the following error: prefect.exceptions.AuthorizationError: [{'path': ['auth_info'], 'message': 'AuthenticationError: Forbidden', 'extensions': {'code': 'UNAUTHENTICATED'}}]

    Thomas Furmston

    10/20/2021, 8:52 PM
Hi, I have been trying to get a Prefect agent running on Kubernetes, starting locally on minikube.

    Jacolon Walker

    10/20/2021, 9:48 PM
Hey there. We have been seeing this error during some flow runs: "No heartbeat detected from the remote task; marking the run as failed." Any info on what could be causing this?

    William Grim

    10/21/2021, 5:14 AM
Hello everyone! In one of our production environments we do not have access to the Prefect UI and need to use the prefect CLI tool to do things like check which flows are running and see their logs. If a flow is running and I want to cancel it, how can I do this? There doesn't seem to be a CLI command for it, so I'm assuming I need to do it with GraphQL. Is there an example of how to do this? Thanks in advance.
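A sketch of cancelling a run through the GraphQL API, assuming a Prefect 1.x backend; the mutation name and return fields may vary by version:

    from prefect import Client

    client = Client()
    # look up the flow-run id first, e.g. via `prefect get flow-runs`
    client.graphql(
        """
        mutation {
          cancel_flow_run(input: {flow_run_id: "<FLOW_RUN_ID>"}) {
            state
          }
        }
        """
    )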

    AJ

    10/21/2021, 6:39 AM
Hi all, does Prefect have an integration with Talend? Thanks.

    Piyush Bassi

    10/21/2021, 6:53 AM
#prefect-community Hello everyone, I would like to know if Prefect is compatible with Windows Containers.

    Mathias Lanng

    10/21/2021, 6:54 AM
Hello there! I am currently porting much of our codebase to Prefect. As part of that I am writing some flows where I use the mapping feature for processing data. I am running into a problem where I want to output multiple values from a mapped task, but I cannot get it working. I have tried specifying both an output annotation and nout=2, but neither seems to register: I get a TypeError saying that the task is not iterable. I have another task which is not mapped where this works fine. Has anyone tried this before and can help me get it working? Thanks in advance!
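For what it's worth, nout/tuple unpacking works at flow-build time on a plain task call, and (as far as I know) does not apply to .map(), which returns a single mapped result. A sketch of the common workaround, with hypothetical names, indexing the tuple in a small downstream task:

    from prefect import task, unmapped

    @task
    def pick(pair, i):
        # split a mapped task's tuple output downstream
        return pair[i]

    # inside the flow context; my_mapped_task returns a 2-tuple per item
    pairs = my_mapped_task.map(items)
    firsts = pick.map(pairs, i=unmapped(0))
    seconds = pick.map(pairs, i=unmapped(1))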

    Michael Hadorn

    10/21/2021, 7:07 AM
Hi there, I'm still struggling with the concept of pickle-based storage; we use S3 as storage and Docker runs. I read the docs here: https://docs.prefect.io/orchestration/flow_config/storage.html#pickle-based-storage. What exactly is persisted in the pickled file?
• For sure the flow object
• but global variables are also frozen (e.g. datetime.now() will never be refreshed in the run, even if it's called outside the flow context)
• but every piece of code inside a task will be executed
So if I build some objects while the flow is built/registered, those objects are frozen, right? But if I change some methods on these objects, rebuild the image, and rerun the flow based on the "previous" flow definition, the new method will be called (though not if I change the init method). Is there any explanation of which parts are dynamic and which are frozen? Thanks for any help!
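A small demo of the build-time vs. run-time split being described, assuming pickle-based storage: module-level values are evaluated once when the flow is pickled, while task bodies execute fresh on every run:

    from datetime import datetime
    from prefect import task, Flow

    BUILD_TIME = datetime.now()  # frozen into the pickled flow

    @task
    def show_times():
        # the task body runs at flow-run time, so this call is fresh
        print("built:", BUILD_TIME, "now:", datetime.now())

    with Flow("pickle-demo") as flow:
        show_times()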

    Johan Wåhlin

    10/21/2021, 7:11 AM
Hi! I'm trying to use the Great Expectations task in our Prefect workflow running on Prefect Server in Kubernetes. I'm struggling with providing a data context: since the ge.data_context.DataContext object cannot be serialized with cloudpickle, I cannot initialize it in one task and pass it on to the RunGreatExpectationsValidation task. Does anyone know of a way to pass such an object between tasks?
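A sketch of the usual workaround, assuming the GE project directory is reachable from the task's runtime environment: pass only the (picklable) path between tasks and construct the DataContext inside the task that needs it:

    import great_expectations as ge
    from prefect import task

    @task
    def run_validation(context_root_dir: str):
        # build the unpicklable DataContext here, inside the task
        context = ge.data_context.DataContext(context_root_dir)
        # ... run validations against `context` here ...
        return context.list_datasources()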

    Akarshan Arora

    10/21/2021, 9:34 AM
Hi! I am trying to run Prefect Orion, but I am getting this error. Can you please help? I have tried it on WSL, Windows 10, and Linux, and I get the same error on all three platforms.
$ prefect orion start
$ prefect version
2.0a3

    Aqib Fayyaz

    10/21/2021, 9:59 AM
Hi, I have been trying to deploy Prefect on Google Kubernetes Engine. I generated the manifest file using the command prefect agent kubernetes install --rbac -k my_key, but when I try to deploy it using a Skaffold file, the build fails every time with this error:
Starting deploy...
- deployment.apps/prefect-agent created
- Error from server (Forbidden): error when creating "STDIN": roles.rbac.authorization.k8s.io is forbidden: User "468642734279@cloudbuild.gserviceaccount.com" cannot create resource "roles" in API group "rbac.authorization.k8s.io" in the namespace "default": requires one of ["container.roles.create"] permission(s).
- Error from server (Forbidden): error when creating "STDIN": rolebindings.rbac.authorization.k8s.io is forbidden: User "468642734279@cloudbuild.gserviceaccount.com" cannot create resource "rolebindings" in API group "rbac.authorization.k8s.io" in the namespace "default": requires one of ["container.roleBindings.create"] permission(s).
kubectl apply: exit status 1

    Amine Dirhoussi

    10/21/2021, 10:10 AM
Hello everybody! I am trying to test a PoC flow using Docker storage and a DaskExecutor. I deployed a local Prefect server and a separate Dask cluster. Running the flow directly works well, but when registering and running it via a prefect agent docker, I am getting a permission error. Thanks for your help!
    Untitled.py

    Andreas Tsangarides

    10/21/2021, 10:32 AM
Hi all. I'm trying to do a backfill on one of my flows. The flow takes in a DateTime parameter and all subsequent tasks use it to crawl/transform data, so I do something like:
    for target_date in target_dates:
        f.run(parameters={"target_date": target_date}, run_on_schedule=False)
I am running the above locally using a LocalDaskExecutor(scheduler="processes", num_workers=12). The problem is that the run time grows:
    Took 15.72 sec for 2016-05-08
    Took 20.59 sec for 2016-05-09
    Took 32.52 sec for 2016-05-10
    Took 43.89 sec for 2016-05-11
    Took 55.81 sec for 2016-05-12
    Took 67.15 sec for 2016-05-13
    Took 77.92 sec for 2016-05-14
    Took 90.59 sec for 2016-05-15
    Took 105.21 sec for 2016-05-16
    Took 115.50 sec for 2016-05-17
Could this be a Dask issue? Any suggestions for running the backfill locally in a different way are welcome! (There is no back-aggregation involved, so there is no reason for later runs to take longer, and the volume/size of data is almost identical for each run. When a backfill completes and I start another one for a different set of dates, the time profile is similar: very fast at first, then slower.)
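One hedged way to rule out state accumulating inside the long-lived process (a hypothesis, not a confirmed diagnosis): launch each date in a fresh interpreter so nothing carries over between runs:

    import subprocess

    # run_one_date.py is a hypothetical script that builds the flow
    # and calls f.run() for a single target_date
    for target_date in target_dates:
        subprocess.run(
            ["python", "run_one_date.py", str(target_date)],
            check=True,
        )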

    Martim Lobao

    10/21/2021, 10:44 AM
If a task in a flow failed but the flow hasn't failed yet (because other branched tasks in the flow run are still running), how can I restart the failed task without having to wait for the other tasks to finish?

    Daniil Ponizov

    10/21/2021, 1:39 PM
Hello! Can I attach a cloud hook to all flows in a project, or somehow group several flows under one cloud hook?

    Luiz Felipe

    10/21/2021, 3:12 PM
    Hello! Is it possible to set a template name (using target) for a task with multiple return values? I'm doing something like this:
    @task(checkpoint=True, target="{task_name}.pkl", nout=4)
    def split_train_test(dataset):
        array = dataset.values
        X = array[:,0:4]
        y = array[:,4]
        X_train, X_validation, Y_train, Y_validation = train_test_split(X, y, test_size=0.20, random_state=1)
        return X_train, X_validation, Y_train, Y_validation
Using LocalResult on my flow, I noticed that I saved a split_train_test.pkl, which was what I expected, but also 4 files with Prefect default names (prefect-result-...). Is it possible to define a template for these files too, or to just not save them separately? This is important because, with these names, the cache will never work for them (as they contain the current timestamp).
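One possible workaround, at the cost of tuple unpacking: drop nout and return a single dict, so only the templated target file is written, and index into it from downstream tasks:

    from sklearn.model_selection import train_test_split
    from prefect import task

    @task(checkpoint=True, target="{task_name}.pkl")
    def split_train_test(dataset):
        array = dataset.values
        X, y = array[:, 0:4], array[:, 4]
        X_train, X_validation, Y_train, Y_validation = train_test_split(
            X, y, test_size=0.20, random_state=1
        )
        # one return value -> only the templated target file is written
        return {"X_train": X_train, "X_validation": X_validation,
                "Y_train": Y_train, "Y_validation": Y_validation}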

    Constantino Schillebeeckx

    10/21/2021, 4:49 PM
As part of our CI/CD we have a script that registers flows for us. I've noticed that the script ends up generating new versions of flows even though nothing about the flow has changed, so in order to debug this I added some debugging statements to the CI/CD run:
if not args.dry_run:
    project = args.project
    logger.log(SUCCESS, f"Registering flow '{flow.name}' to project '{project}'.")
    logger.debug(f"Serialized flow: {json.dumps(flow.serialize(), indent=4, sort_keys=True)}")
    logger.debug(f"Hashed flow: {flow.serialized_hash()}")
    flow.register(
        project_name=project,
        idempotency_key=flow.serialized_hash(),
        set_schedule_active=set_schedule_active,
    )
I noticed that on back-to-back executions of the above script, for the same flow, to the same project, different hashes were being created. When I look at the serialized_hash of those flows I see no difference. To confirm the serialized flows are the same, I've saved the logging output to two JSON files (see screenshot). The only difference I can spot is that I use indent=4 in my json.dumps whereas the codebase does not.
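A hypothetical debugging helper for this situation: hash the serialized flow with fixed json.dumps arguments on both runs, so any field that genuinely changes between registrations stands out (this mimics, but is not necessarily identical to, what serialized_hash does internally):

    import hashlib
    import json

    def stable_hash(flow):
        payload = json.dumps(flow.serialize(), sort_keys=True, default=str)
        return hashlib.sha256(payload.encode()).hexdigest()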

Kevin Kho

10/21/2021, 4:56 PM
Hey, will test this myself and see if the version bumps up for me.
Can I see how you define your schedule? And do you have any date- or time-based parameters?
My version is not bumping up when I try this.

Constantino Schillebeeckx

10/21/2021, 5:06 PM
This is how I call the code snippet shown above:
freezer = freeze_time("2021-01-14 22:00:00")
freezer.start()
for flow_file, flow in flows.items():
    register_flow(flow, flow_file, args)
freezer.stop()
I do this to ensure any calls to now() evaluate to the same thing; I think that addresses your question about date- or time-based parameters.
The schedule is being set like:
flow.schedule = CronSchedule("00 10 * * *", start_date=pendulum.datetime(2021, 1, 1, tz=tz))

Kevin Kho

10/21/2021, 6:27 PM
This looks right to me.
I think I might need a minimal example to debug this one further.

Constantino Schillebeeckx

10/21/2021, 6:36 PM
Yeah, seems reasonable; lemme put a pin in this and remove/adjust some of my debugging statements. I'll report back with what I find.

davzucky

10/21/2021, 9:16 PM
Do you have any parameter in your flow with a default value that could change every time you register, like datetime.now?