prefect-community
  • z

    Zach

    11/17/2020, 7:47 PM
    I am getting an error with Prefect Secrets in my flow. The traceback is to a line of Prefect code that was changed a few days ago. The traceback can be found in the thread below. This is what the task in the flow looks like:
    gcs_creds_secret = PrefectSecret("GCS_SP_CREDS")
  • z

    Zach

    11/17/2020, 7:48 PM
    Here is a link to the github code: https://github.com/PrefectHQ/prefect/blob/6c3920c57b097c43d26fbe598ce633fe3f0747df/src/prefect/tasks/secrets/base.py#L65
  • h

    Henry

    11/17/2020, 8:11 PM
    I'm receiving a GraphQLError: x-hasura-admin-secret/x-hasura-access-key required, but not found] with a setup using a Hasura admin secret. I wasn't able to find an env var for it in prefect_server https://github.com/PrefectHQ/server
  • h

    Henry

    11/17/2020, 8:12 PM
    any help would be much appreciated
  • p

    Philip MacMenamin

    11/17/2020, 9:23 PM
    How do I ask a task what its terminal state is?
  • p

    Patrick

    11/17/2020, 11:20 PM
    Hey all, we’re new to Prefect (though I did maintain an Airflow cluster for several years, in a previous job), and have some questions about what idioms to adopt. In particular, it appears (https://prefect-community.slack.com/archives/CL09KU1K7/p1602603712337700?thread_ts=1602244141.206600, https://github.com/PrefectHQ/prefect/issues/3368) that RunConfig is slowly taking the place of Environment as the primary abstraction for specifying execution environments, though it’s still marked experimental. Does anyone, especially longtime users and/or maintainers, have any general advice on which to use if just getting started? I’d like to be using Prefect for “production” work within a few weeks, so a somewhat stable API would be grand. OTOH, we don’t want to adopt conventions whose obsolescence is already planned. Also, we’re an AWS shop. Thanks in advance!
  • r

    Riley Hun

    11/17/2020, 11:27 PM
    Hi everyone, would anyone happen to know what the error below is referring to? I am running Prefect Server on a remote machine which is already authenticated to use Docker, and I can confirm that the image exists in GCR. It's able to pull the image successfully, but then later returns an ImageNotFound error:
    [2020-11-17 23:17:14,473] INFO - agent | Pulling image gcr.io/aa-mlops-dev-inm5/prefect-etl-storage:0.1.0...
    /snap/google-cloud-sdk/160/lib/third_party/requests/__init__.py:83: RequestsDependencyWarning: Old version of cryptography ([1, 2, 3]) may cause slowdown.
      warnings.warn(warning, RequestsDependencyWarning)
    [2020-11-17 23:17:16,498] INFO - agent | Successfully pulled image gcr.io/aa-mlops-dev-inm5/prefect-etl-storage:0.1.0...
    [2020-11-17 23:17:16,517] ERROR - agent | Logging platform error for flow run f42d17a1-fb46-4556-8253-3b10f5a482e8
    [2020-11-17 23:17:16,576] ERROR - agent | Error while deploying flow: ImageNotFound(HTTPError('404 Client Error: Not Found for url: http+docker://localhost/v1.40/containers/create',),)
  • s

    Sam Kolli

    11/18/2020, 12:31 AM
    Hello all... am trying to separate building flow storage from the flow register command ... the instructions under "Pre-Build Storage" here do not seem to work without adding another call to add_flow method on the storage. This call had to be added before calling the build method on storage. Is that missing from the instructions on the web page or am I working with a different version than what the website shows the instructions for?
    👍 2
  • a

    Aiden Price

    11/18/2020, 5:46 AM
    Hi again folks, I've deployed server with the new helm chart and everything works when I port-forward. Now I've set up some Kubernetes ingresses to expose the UI and GraphQL endpoints. The special case here is that I need to host these at a path on the domain, like so: https://etl.mydomain.com/prefect. But when I try to go to the dashboard it doesn't load; looking at Chrome's console I can see that the page is requesting all its files without my path part, e.g. https://etl.mydomain.com. I tried adding the env var PREFECT_SERVER__SERVER_UI_ENDPOINT as the full URL and path but it doesn't seem to have changed the behaviour. Does anyone have any advice? Thanks.
  • m

    Michelle Wu

    11/18/2020, 8:34 AM
    Hey guys. I’ve been running into problems with EmailTask in state_handlers. I want to receive an email message that shows task and new_state when failures occur. My handler looks something like this:
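    from prefect.tasks.notifications import EmailTask

    def email_on_failure(task, old_state, new_state):
        if new_state.is_failed():
            # Build and send the alert without shadowing the `task` argument.
            EmailTask(
                subject="Prefect alert: {}".format(new_state),
                msg="Task: {}; New State: {}".format(task, new_state),
                email_from='xxx',
                email_to='xxx',
                smtp_server='smtp.gmail.com',
                smtp_port=xxx,
                smtp_type='STARTTLS'
            ).run()
        # Return the state explicitly so the handler's result is unambiguous.
        return new_state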
    In tests, I always receive the email with the correct subject, but the task and the new_state are always missing from the msg… Has anyone encountered similar problems?
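One possible cause, offered here as an assumption rather than something confirmed in the thread: the string forms of the task and state are wrapped in angle brackets, and a mail client rendering the body as HTML may treat them as unknown tags and hide them. A minimal standard-library sketch of escaping the values before formatting follows; `build_alert_body` and the sample strings are hypothetical stand-ins, not part of Prefect.

```python
import html


def build_alert_body(task_repr: str, state_repr: str) -> str:
    """Escape angle brackets so an HTML mail client shows the values as text."""
    return "Task: {}; New State: {}".format(
        html.escape(task_repr), html.escape(state_repr)
    )


# Hypothetical stand-ins shaped like angle-bracketed object reprs.
body = build_alert_body("<Task: extract>", '<Failed: "boom">')
print(body)
```

In the handler above, the same idea would mean passing `html.escape(str(task))` and `html.escape(str(new_state))` into the `msg` format call.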
  • s

    Saulius Beinorius

    11/18/2020, 11:55 AM
    Hi guys! I was wondering what would be the best way to run Spark jobs on AWS EMR with Prefect? I thought about creating an execution environment, but I'm not sure if that is really the best solution here. I am especially curious about how to manage step failures (we have a partial solution which runs a step monitor every once in a while and restarts steps, but that won't show up in Prefect), and also cluster instance failures, where the entire cluster needs to be restarted but the finished steps do not.
  • a

    Alex Bussan

    11/18/2020, 2:10 PM
    Hey all! I've been experimenting with KubernetesRun (https://github.com/PrefectHQ/prefect/blob/master/src/prefect/run_configs/kubernetes.py). My goal is to be able to set a different job template on a "per flow run" basis. It seems this is currently set up on a "per flow" basis, since you have to do:
    flow.run_config = KubernetesRun(...)
    flow.register(project_name=project_name)
    So it appears you have to re-register if you want different resource limits per flow run. Any ideas on how to best achieve this?
  • p

    psimakis

    11/18/2020, 3:40 PM
    Hello everyone, I'm trying to configure a Fargate agent but I'm probably missing something. Prefect Server is running locally and I run a Fargate agent using the following command:
    $ prefect agent fargate start cpu=256 memory=512 executionRoleArn=arn:aws:iam::xxxxxxxxx:role/ecsTaskExecutionRole networkConfiguration="{'awsvpcConfiguration':{'assignPublicIp': 'ENABLED', 'subnets': ['subnet-xxxxxx'], 'securityGroups': ['sg-xxxxxxxxxxx']}}" cluster=fargate-cluster
    Below you will find the flow:
    from prefect import Flow, task, Parameter
    from prefect.environments.storage import Docker
    
    
    @task
    def square_int(x):
        return x * x
    
    @task
    def sub_2(x):
        return x - 2
    
    
    with Flow(
        name='sp-flow-test',
        storage=Docker(registry_url='xxxxxxxx.dkr.ecr.eu-west-1.amazonaws.com',
                       image_name='prefecttest', image_tag='latest'),
    
    ) as flow:
        integer = Parameter('integer', default=2)
        x_sqr = square_int(integer)
        x_less_2 = sub_2(x_sqr)
    
    flow.register(project_name='test', idempotency_key=flow.serialized_hash())
    When I run the flow, the Fargate task seems to run fine (check out the screenshot). The problem is that the flow run stays Submitted forever. Do you have any clue? Thanks.
  • c

    Carlo

    11/18/2020, 3:41 PM
    Is there a way to register a cloud hook in python as part of registration?
  • a

    Alex Bussan

    11/18/2020, 4:29 PM
    I have been looking into running flows using a KubernetesRun like so:
    flow.run_config = KubernetesRun(
        job_template_path="my_custom_template.yaml"
    )
    vs configuring/running a flow this way (using KubernetesJobEnvironment)
    flow = Flow(
        "flow_w_jobenv",
        environment=KubernetesJobEnvironment(job_spec_file="job_spec.yaml"),
        storage=Docker()
    )
    I'm struggling to understand the differences/tradeoffs of configuring a job one way vs. the other. Either way you can pass in the information that a job spec would need. Then you register the flow and run it, and the k8s agent picks it up. Is there any fundamental difference I'm missing?
  • s

    Sean Harkins

    11/18/2020, 5:12 PM
    I'm completely new to Prefect and had some questions about pickling dependencies for containerized execution environments. We are working on a project that uses a thin wrapper over Prefect Flows to include some additional input and output information: https://github.com/developmentseed/example-pipeline/blob/fargate_test/recipe/pipeline.py. My question is about how cloudpickle serializes my Flow's dependencies with S3 storage. Based on some information outlined here https://prefect-community.slack.com/archives/C014Z8DPDSR/p1605200879483400 I've got S3 storage configured and working, but it seems that my Flow's upstream dependencies are not pickled when I register the flow as indicated in https://docs.prefect.io/core/advanced_tutorials/task-guide.html#task-inputs-and-outputs. Specifically, I receive the following dependency error: Unexpected error: ModuleNotFoundError("No module named 'h5netcdf'"). I can build an image with the necessary dependencies for use with the DaskExecutor and everything works correctly, but our goal is to decouple our execution environment from Flows. Am I misunderstanding how Flow dependencies should be serialized by cloudpickle? Is there another approach I should be considering in this case? Thanks in advance.
  • r

    Riley Hun

    11/18/2020, 5:55 PM
    Hi everyone, I'm using Prefect Core Server on a Compute Engine instance on Google Cloud Platform. Even though I specified the backend to be server, it keeps running my flows on the cloud backend. I'm just guessing that this could be the reason why my prefect tasks have failed. i.e. Here are the logs
    [2020-11-18 17:51:18+0000] INFO - prefect.CloudFlowRunner | Beginning Flow run for 'ETL'
    [2020-11-18 17:51:20+0000] INFO - prefect.CloudFlowRunner | Flow run FAILED: some reference tasks failed.
    Also when I look into the kubernetes pod logs, it references Prefect Cloud.
    [2020-11-18 17:51:22+0000] CRITICAL - CloudHandler | Unable to write logs to Prefect Cloud
    Any guidance would be much appreciated!
  • m

    Marco Palmeri

    11/18/2020, 6:10 PM
    Hi. I have a question. I've tried rearranging my flow in a few different ways, but I still cannot get a mapped task to wait for all the preceding mapped tasks to finish. Basically I have a flow where sometimes I will need to decrypt files, and sometimes not (based on a config). So I have a case() control flow task to do the "decryption branch." I start with a dict with some partition keys and a list of URIs for each. In the "decryption branch" I do two things to this dict: (1) transform the URIs to the new output paths, and (2) make a flat list of all the unique URIs. For (2) I map this and run the decryption task in parallel as a mapped task - this part is working just fine. Then I merge the dict from (1) with an unchanged version of the dict for the cases where running the decryption should be skipped - this part is working just fine. My issue is that I then pass the resulting dict to a mapped task to parallelize for each key, but the task starts up before the decryption mapping task is done. I need it to wait until all of the workers are done. You can see I have even tried a "dummy" skip task, so I could merge the decrypting task with something from the false branch and pass that to the final task. This didn't work. extract_region_transactions starts running as soon as any of the decrypt_file_task worker tasks complete. See code snippet in thread.
  • g

    Gabriel Montañola

    11/18/2020, 8:03 PM
    I found a related PR: https://github.com/PrefectHQ/prefect/pull/3649 Hi there my good people, how you doing today? I'm trying to get dbt error logs sent to my task log, but they are coming up blank even though I've already set log_stderr to True in the task definition.
    Does someone know if I'm doing something wrong, or is this a limitation/bug?
    👍 1
  • i

    Isaac Brodsky

    11/18/2020, 9:19 PM
    I’m also seeing an issue with 'PrefectSecret' object has no attribute 'secret_name', which started around November 11. Possibly there was a change to my execution environment at that point, like using a newer/different Prefect agent. Do I need to reinstall the Prefect Agent?
  • d

    Dolor Oculus

    11/18/2020, 9:32 PM
    Hi, I am working on a slack handler where I want to be notified only if the overall flow failed, not individual tasks. This is for a flow that is being registered via an agent, not run directly. I was looking at https://docs.prefect.io/core/advanced_tutorials/slack-notifications.html#customizing-your-alerts but it appears to be at task level, not flow level. Is there a good example in the docs for being notified on flow failure? ty!
  • d

    Dolor Oculus

    11/18/2020, 10:18 PM
    Logging -- we've added log_stdout=True to all of our tasks, and the log messages are showing up just fine in the agent's stdout, but we'd like them to also be available on the web UI. We've discovered that we can do this by getting the logger from the context, but this is a pretty invasive change for our codebase (most of our modules are using standard logging and we don't want to change that). Is there any magic we can invoke to get our logger messages to show up on the web UI?
  • j

    John Grubb

    11/18/2020, 10:35 PM
    Untitled
  • j

    John Grubb

    11/18/2020, 10:35 PM
    Hi, I'm attempting to register a flow from the CLI to the cloud, and keep meeting with this error, and I'm hoping someone has seen this before and can tell me what mistake I'm making.
    👀 1
  • r

    Riley Hun

    11/18/2020, 10:40 PM
    How do we add host.docker.internal to /etc/hosts via --add-host? Is this something we add to the running agent or the config.toml?
  • s

    simone

    11/19/2020, 2:45 PM
    Hi, I have a function wrapped in a @task decorator. When the function is executed in a flow everything runs fine. If I write the same code by subclassing Task I get a pickling error:
    pickle.UnpicklingError: NEWOBJ class argument isn't a type object
    The issue happened in a couple of other functions. I guess there is something basic in the Task construction that I am missing. I would appreciate it if you could shed some light on this issue! Thanks. Example of the code: Gist Tasks code
  • k

    Kevin Weiler

    11/19/2020, 6:14 PM
    hi prefecters - I’m having an issue with docker storage. I’m using a base image that has the environment and all the necessary code to run my flow. This is a conda environment that has prefect installed. When I attempt to store the flow container, it tries to install prefect over top of my conda installed version. This issue is compounded by the fact that the environment is owned by root and not the user of the container. Is there some way to tell prefect to not install the prefect package via pip?
  • m

    Mike Grabenstein

    11/19/2020, 9:01 PM
    Hi New to Prefect, wondering if anyone is using Prefect to manage data flows with tasks written in Node, Java or Clojure. And how would you do it? Shell task that kicks the Node or Java/Clojure code? Or Container autorun like task? Thanks.
  • w

    Walt Wells

    11/20/2020, 12:16 AM
    for any great expectations users - how are teams making their GE configuration directory available in their task worker environments? https://docs.prefect.io/api/latest/tasks/great_expectations.html#rungreatexpectationscheckpoint
  • s

    Saulius Beinorius

    11/20/2020, 8:10 AM
    Hi, is there a way to have task timeouts - to mark the task as failed if it runs too long (and then execute some code to stop it, for example)? In my case I can code that into the task itself, but it would be nice if there was some built-in mechanism.
n

nicholas

11/20/2020, 2:59 PM
Hi @Saulius Beinorius - the task constructor takes a kwarg, timeout, to which you can pass a number (in seconds) to wait before timing out the task. For more information on the timeout kwarg and other task kwargs, take a look at the Task API.
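To illustrate the behaviour being discussed without depending on a particular Prefect version, here is a minimal standard-library sketch of a time limit with a failure callback. It is not Prefect's implementation: `run_with_timeout`, its return values, and the simplified no-argument `on_failure` callback are hypothetical names chosen for this example. In Prefect itself you would set the `timeout` kwarg on the task as described above.

```python
import time
from concurrent.futures import ThreadPoolExecutor
from concurrent.futures import TimeoutError as FutureTimeout


def run_with_timeout(fn, timeout, on_failure=None):
    """Run fn() and return ("Success", result) or ("TimedOut", None)."""
    executor = ThreadPoolExecutor(max_workers=1)
    future = executor.submit(fn)
    try:
        return ("Success", future.result(timeout=timeout))
    except FutureTimeout:
        # The worker thread is not killed; the callback is where
        # cleanup code (e.g. stopping a remote job) would go.
        if on_failure is not None:
            on_failure()
        return ("TimedOut", None)
    finally:
        # Do not block the caller waiting for a slow worker.
        executor.shutdown(wait=False)


cleanup_calls = []
slow = run_with_timeout(
    lambda: time.sleep(0.3),
    timeout=0.05,
    on_failure=lambda: cleanup_calls.append("stopped"),
)
fast = run_with_timeout(lambda: 42, timeout=1.0)
print(slow, fast, cleanup_calls)
```

The design point is that hitting the time limit only marks the call as failed; any work needed to actually stop the underlying job has to live in the failure callback.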
s

Saulius Beinorius

11/20/2020, 3:08 PM
Thanks! And if I see correctly, the on_failure parameter answers the second part of my question 🙂
n

nicholas

11/20/2020, 3:15 PM
Ah yes, sorry! I missed the question in the parentheses 🙂
s

Saulius Beinorius

11/20/2020, 3:30 PM
No problem, thanks for the help!
😄 1