prefect-community

    Ayyanar Thangaraj

    06/16/2021, 7:21 AM
    Hello all
    👋 2

    Thomas Hoeck

    06/16/2021, 7:23 AM
Hi, I'm having trouble with what looks like a Prefect Cloud API problem. I had a flow running which failed after an hour with the following error. The flow had 10+ tasks complete successfully first. I'm running it with Prefect Cloud and a Kubernetes agent. The flow had been running successfully before.
    Failed to set task state with error: ClientError([{'path': ['set_task_run_states'], 'message': '[{\'extensions\': {\'path\': \'$.selectionSet.billing_license.selectionSet.account_name\', \'code\': \'validation-failed\'}, \'message\': \'field "account_name" not found in type: \\\'billing_license\\\'\'}]', 'extensions': {'code': 'INTERNAL_SERVER_ERROR'}}])
    Traceback (most recent call last):
      File "/root/miniconda3/lib/python3.8/site-packages/prefect/engine/cloud/task_runner.py", line 98, in call_runner_target_handlers
        state = self.client.set_task_run_state(
      File "/root/miniconda3/lib/python3.8/site-packages/prefect/client/client.py", line 1503, in set_task_run_state
        result = self.graphql(
      File "/root/miniconda3/lib/python3.8/site-packages/prefect/client/client.py", line 319, in graphql
        raise ClientError(result["errors"])
    prefect.utilities.exceptions.ClientError: [{'path': ['set_task_run_states'], 'message': '[{\'extensions\': {\'path\': \'$.selectionSet.billing_license.selectionSet.account_name\', \'code\': \'validation-failed\'}, \'message\': \'field "account_name" not found in type: \\\'billing_license\\\'\'}]', 'extensions': {'code': 'INTERNAL_SERVER_ERROR'}}]
    I'm running Prefect 0.14.7 in the flows and the agent is 0.14.17

    Ayyanar Thangaraj

    06/16/2021, 7:24 AM
Can you please help me send an email using the EmailTask notification task?
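A minimal sketch of how EmailTask is typically wired up (addresses are placeholders; as far as I know it reads the EMAIL_USERNAME and EMAIL_PASSWORD Prefect secrets for SMTP auth):
from prefect import Flow
from prefect.tasks.notifications import EmailTask

# EmailTask authenticates via the EMAIL_USERNAME / EMAIL_PASSWORD secrets
email_task = EmailTask(
    subject="Flow notification",
    msg="Hello from Prefect!",
    email_to="recipient@example.com",  # placeholder address
    email_from="sender@example.com",   # placeholder address
)

with Flow("email-example") as flow:
    email_task()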

    Jeremy Phelps

    06/16/2021, 10:27 AM
    Hi, can someone help me with this error? https://cloud.prefect.io/stockwell/flow-run/f82f5a87-e196-4019-a332-d27355fda2dd?logs=
    KeyError: 'Task slug disable_sound_on_mixer_error_stores-1 not found in the current Flow; this is usually caused by changing the Flow without reregistering it with the Prefect API.'
    The slug is a transformation of the name of a flow: Prefect replaced the hyphens with _ and added -1 to the end. The error is happening when trying to run a different flow. Both flows are defined in the same Python source file. Both flows were registered with the latest code, contrary to the suggestion in the error message.
    👀 1
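    A hedged sketch of one way to sidestep shifting auto-generated slugs (flow and task names here are illustrative): pinning an explicit slug on the task keeps it stable across re-registrations.
    from prefect import Flow, task

    # An explicit slug stays stable across registrations, unlike the
    # auto-generated "<task_name>-1" form.
    @task(slug="disable_sound_on_mixer_error_stores-1")
    def disable_sound_on_mixer_error_stores():
        ...

    with Flow("mixer-alerts") as flow:  # hypothetical flow name
        disable_sound_on_mixer_error_stores()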

    João Amorim

    06/16/2021, 11:56 AM
    Hi, is it common for scheduled tasks to be submitted more than once when starting? This does not occur when run by "quick run"... Anyone else having the same issue or know how to fix it?

    Brad I

    06/16/2021, 3:43 PM
    Hi, just wondering if there is a way to clear the idempotency key cache through graphql? We are evaluating Prefect and testing flow runs from a different backend service, but if our hash matches a run that failed, it will just return that run ID instead of creating a new run. So it would be great if we could pass a flag to first clear the cache and then create the flow run during testing.

    Alexander

    06/16/2021, 4:51 PM
    Hello. We are trying to set up an ECS Prefect run config with Prefect Cloud, and it refuses to work out of the box. There is a huge lack of logging inside the ECS agent: it does a lot of work generating the task definition, but never shows the generated definition if registration fails. The first issue I faced was that the task definition generated by default was invalid, and I was receiving
    An error occurred (InvalidParameterException) when calling the RunTask operation: Task definition does not support launch_type FARGATE.
    because the generated definition lacks the requiresCompatibilities field. Trying to solve that with a custom task definition, I constantly run into different errors (a missing field here and there), plus the definition-registration script patches my task definition with its invalid values. Right now I am stuck on the generated definition having a null container name, even though Docker storage and the run config have it set. I cannot debug anything without logging.
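    For reference, a hedged sketch of passing a custom task definition through ECSRun (family name and sizes are placeholders; naming the container "flow" follows what the Prefect ECS agent conventionally expects):
    from prefect.run_configs import ECSRun

    run_config = ECSRun(
        task_definition={
            "family": "my-prefect-flow",  # placeholder
            "requiresCompatibilities": ["FARGATE"],
            "networkMode": "awsvpc",
            "cpu": "512",
            "memory": "1024",
            "containerDefinitions": [
                {"name": "flow"}  # the agent fills in image/command for this container
            ],
        },
    )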

    jcozar

    06/16/2021, 5:27 PM
    Hi! Is there any way to test an automation with Slack? I configured an automation so that every flow that changes its state to Failed sends a notification to Slack, but it’s not working. Should I use https:// or just the following part of the webhook? Thank you in advance!

    Paul Schale

    06/16/2021, 6:59 PM
    Hi, I found a bug with the flow idempotency key generation. When you use an object created with typing.NewType() for a task type hint, the idempotency key becomes non-deterministic. From what I can tell, it’s because typing.NewType() actually returns a function, and calling repr or str on a function includes the memory address in the result. Here is a simple flow that reproduces the issue:
    import prefect
    import typing
    
    
    MyNewType = typing.NewType("MyNewType", str)
    
    
    @prefect.task(name="print_something")
    def print_something(input_str: MyNewType):
        print(input_str)
    
    
    with prefect.Flow("my_flow") as flow:
        print_something("test_str")
    
    # This is different every time script is run
    print(flow.serialized_hash())
    
    # Output includes memory address, e.g.
    # "<function NewType.<locals>.new_type at 0x7fa6c71c34c0>"
    print(list(flow.tasks)[0].inputs()["input_str"]["type"])
    As for fixing this, I don’t know enough about the serialization process to know where to start. It can be mitigated by wrapping the type hint in typing.Optional[]. I’m running prefect version 0.14.22 on python 3.9.1.

    Hugo Kitano

    06/16/2021, 9:42 PM
    I’m trying to start up an ECS Agent (with the agent runner token) with the most recent prefect version and getting the following error:
    ERROR - proteograph-ecs-agent | Failed to infer default networkConfiguration, please explicitly configure using --run-task-kwargs
    I’ve run it successfully with other EC2 instances so I’m not sure what’s going on.
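    A hedged sketch of what an explicit network configuration can look like (subnet and security-group IDs are placeholders):
    # kwargs.yaml, passed via: prefect agent ecs start --run-task-kwargs kwargs.yaml
    networkConfiguration:
      awsvpcConfiguration:
        subnets:
          - subnet-0123456789abcdef0  # placeholder
        securityGroups:
          - sg-0123456789abcdef0      # placeholder
        assignPublicIp: ENABLED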

    Evan Crook

    06/16/2021, 10:47 PM
    hello all! I'm encountering a weird condition that seems like a bug, or maybe I'm just not understanding it properly (minimal example in thread). Essentially we want to run a pipeline of extract -> map over might_fail -> map over show, and any failed mapped might_fail instance passes its result to handle_error. might_fail also includes retries. However, what I'm seeing is: the task might_fail[3] starts its task run, hits a RuntimeError, enters a Retrying state, and then finishes its task run with state Retrying. It does not do any retries. Then the rest of the mapped tasks finish and we proceed to handle_failure; handle_failure[3] says "Not all upstream states are finished; ending run." (because the upstream state is Retrying, not Failed) and then Finished task run for task with final state: 'Pending'. So handle_failure never actually runs at all. If I take out retries on might_fail, it works as expected. But in the real-world example this is mimicking, might_fail hits an API prone to rate limits / transient failures, so we actually want to retry it, and then trigger handle_failure only if it's entered a failed state after retrying a few times. Does this make sense? Is this a bug, or am I just doing something terribly wrong? (Happy to provide logs from running this minimal example, too, if it's helpful.) Thanks so much in advance! 🌟
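    A guess at the shape of the minimal example being described (task names from the message; retry counts and data are placeholders):
    import datetime
    from prefect import Flow, task
    from prefect.triggers import any_failed

    @task
    def extract():
        return list(range(5))

    @task(max_retries=2, retry_delay=datetime.timedelta(seconds=5))
    def might_fail(x):
        if x == 3:
            raise RuntimeError("transient failure")
        return x

    @task
    def show(x):
        print(x)

    # Runs only when an upstream task has failed
    @task(trigger=any_failed)
    def handle_failure(result):
        print(f"handling failure for {result}")

    with Flow("retry-then-handle") as flow:
        data = extract()
        results = might_fail.map(data)
        show.map(results)
        handle_failure.map(results)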

    Sumit Kumar Rai

    06/17/2021, 3:39 AM
    I've set a Prefect Cloud secret, but I get an error as in the screenshot. The Prefect docs say "Secrets are resolved locally first, falling back to Prefect Cloud (if supported)". What am I missing? I'm using the code below to retrieve the secret.
    from prefect.client import Secret
    
    GITHUB_ACCESS_TOKEN = Secret("GITHUB_ACCESS_TOKEN").get()
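    A hedged sketch of the resolution order being quoted (the local value is a placeholder): Secret.get() first checks prefect.context.secrets, and only falls back to Cloud when you are authenticated and running against the cloud backend (prefect backend cloud).
    import prefect
    from prefect.client import Secret

    # Resolves locally because the secret is present in context
    with prefect.context(secrets={"GITHUB_ACCESS_TOKEN": "local-placeholder"}):
        print(Secret("GITHUB_ACCESS_TOKEN").get())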

    g.suijker

    06/17/2021, 7:35 AM
    Hi all! Since today I'm getting the following errors when building the flow's Docker storage; yesterday it all worked fine. Any ideas why I'm getting this error and how to solve it?
    E: Failed to fetch https://packages.microsoft.com/debian/10/prod/pool/main/u/unixodbc/odbcinst_2.3.7_amd64.deb  404  Not Found [IP: 104.214.230.139 443]
    E: Failed to fetch https://packages.microsoft.com/debian/10/prod/pool/main/u/unixodbc/unixodbc-dev_2.3.7_amd64.deb  404  Not Found [IP: 104.214.230.139 443]
    E: Failed to fetch https://packages.microsoft.com/debian/10/prod/pool/main/u/unixodbc/odbcinst1debian2_2.3.7_amd64.deb  404  Not Found [IP: 104.214.230.139 443]
    E: Failed to fetch https://packages.microsoft.com/debian/10/prod/pool/main/u/unixodbc/libodbc1_2.3.7_amd64.deb  404  Not Found [IP: 104.214.230.139 443]
    E: Failed to fetch https://packages.microsoft.com/debian/10/prod/pool/main/u/unixodbc/unixodbc_2.3.7_amd64.deb  404  Not Found [IP: 104.214.230.139 443]
    E: Failed to fetch https://packages.microsoft.com/debian/10/prod/pool/main/m/msodbcsql17/msodbcsql17_17.7.2.1-1_amd64.deb  404  Not Found [IP: 104.214.230.139 443]
    E: Unable to fetch some archives, maybe run apt-get update or try with --fix-missing?
    I'm using a custom docker file as provided here https://docs.prefect.io/orchestration/recipes/configuring_storage.html:
    FROM prefecthq/prefect:0.14.10-python3.8
    
    # install some base utilities
    RUN apt update && apt install -y build-essential unixodbc-dev && rm -rf /var/lib/apt/lists/*
    RUN apt-get update && apt-get install curl -y
    
    # install mssql-tools
    RUN curl https://packages.microsoft.com/keys/microsoft.asc | apt-key add -
    RUN curl https://packages.microsoft.com/config/debian/10/prod.list > /etc/apt/sources.list.d/mssql-release.list
    RUN apt-get update && ACCEPT_EULA=Y apt-get install msodbcsql17 -y
    RUN ACCEPT_EULA=Y apt-get install mssql-tools -y
    
    # update bash configuration
    RUN echo 'export PATH="$PATH:/opt/mssql-tools/bin"' >> ~/.bash_profile
    RUN echo 'export PATH="$PATH:/opt/mssql-tools/bin"' >> ~/.bashrc
    
    # update OpenSSL configuration file
    RUN sed -i 's/TLSv1\.2/TLSv1.0/g' /etc/ssl/openssl.cnf
    RUN sed -i 's/DEFAULT@SECLEVEL=2/DEFAULT@SECLEVEL=1/g' /etc/ssl/openssl.cnf

    Sumit Kumar Rai

    06/17/2021, 7:51 AM
    I have a pipelinewise command as a shell task in a prefect flow. The command takes config files and a state file as input parameters. The command executes and updates the state file at the end. What is the best way to retain the state file and also read the latest state file before executing the task?

    Florian Kühnlenz

    06/17/2021, 8:17 AM
    Hi everyone, is it possible to set up a flow SLA for a group of flows? In the UI it seems only possible for one flow at a time.

    Stéphan Taljaard

    06/17/2021, 9:27 AM
    Hi. Is there a way to create a Secret dynamically, as a step in a flow? I submitted a PR for SendGrid.SendEmail some time ago. I based it on other tasks in the task library, so I had the secret name passed as an argument, instead of the secret value. Now I have a GCP Secret named shared_credentials with contents {"sendgrid-api-key": "abc", "some-other-api-key": "efg", ...}. Is there a way to read the sendgrid-api-key value from shared_credentials, then create a temporary secret in the flow named SENDGRID_API_KEY, as required by the SendGrid task? Or other ideas to use the value inside of the secret?
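    One hedged possibility (whether this fits your runtime depends on where the task resolves its secret; the JSON parsing assumes the shared_credentials payload shown above): local secrets live in prefect.context.secrets, so injecting a key there lets Secret("SENDGRID_API_KEY").get() resolve locally.
    import json
    import prefect

    # creds_json would come from GCP Secret Manager (retrieval omitted)
    creds = json.loads(creds_json)
    prefect.context.setdefault("secrets", {})["SENDGRID_API_KEY"] = creds["sendgrid-api-key"]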

    Domantas

    06/17/2021, 10:36 AM
    Hello Prefect, I'm getting an error
    TypeError: cannot pickle '_thread.lock' object
    (I'll paste the full error in the comments) and I'm out of ideas how to solve it properly. It is related to DaskExecutor and appears when running this task (I'll upload a code sample in the comments): 1. Read file content from a yml file located in S3 storage, using a raw boto3 implementation. 2. Read bytes from the downloaded yml file. 3. Load the yml file and convert it into a list. Does anyone know a solution to this problem?
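    A hedged guess at the usual cause of this error under Dask (bucket and key names are placeholders): boto3 clients hold _thread.lock objects and can't be pickled, so create the client inside the task rather than passing it between tasks.
    import boto3
    import yaml
    from prefect import task

    @task
    def read_yaml_from_s3(bucket: str, key: str):
        s3 = boto3.client("s3")  # created inside the task, never pickled by Dask
        body = s3.get_object(Bucket=bucket, Key=key)["Body"].read()
        return yaml.safe_load(body)  # a YAML list parses to a Python list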

    Ben Muller

    06/17/2021, 11:14 AM
    Hey prefect peeps, if a flow fails midway (say task 3 of 5), are we able to retry that task in order to get the flow to succeed, or do we need to retry the entire flow from the beginning?

    ciaran

    06/17/2021, 1:54 PM
    Eyo 👋 I'm trying to get fancy by specifying both pod_template and scheduler_pod_template in my dask_kubernetes.KubeCluster DaskExecutor. Both templates (for now) are exactly the same:
    "pod_template": make_pod_spec(
        image=os.environ["BAKERY_IMAGE"],
        labels={"flow": flow_name},
        env={
            "AZURE_STORAGE_CONNECTION_STRING": os.environ[
                "FLOW_STORAGE_CONNECTION_STRING"
            ]
        },
    ),
    "scheduler_pod_template": make_pod_spec(
        image=os.environ["BAKERY_IMAGE"],
        labels={"flow": flow_name},
        env={
            "AZURE_STORAGE_CONNECTION_STRING": os.environ[
                "FLOW_STORAGE_CONNECTION_STRING"
            ]
        },
    ),
    If I try to run a flow with both declared, my Dask Scheduler pod fails with:
    Traceback (most recent call last):
      File "/srv/conda/envs/notebook/bin/dask-worker", line 8, in <module>
        sys.exit(go())
      File "/srv/conda/envs/notebook/lib/python3.8/site-packages/distributed/cli/dask_worker.py", line 462, in go
        main()
      File "/srv/conda/envs/notebook/lib/python3.8/site-packages/click/core.py", line 1137, in __call__
        return self.main(*args, **kwargs)
      File "/srv/conda/envs/notebook/lib/python3.8/site-packages/click/core.py", line 1062, in main
        rv = self.invoke(ctx)
      File "/srv/conda/envs/notebook/lib/python3.8/site-packages/click/core.py", line 1404, in invoke
        return ctx.invoke(self.callback, **ctx.params)
      File "/srv/conda/envs/notebook/lib/python3.8/site-packages/click/core.py", line 763, in invoke
        return __callback(*args, **kwargs)
      File "/srv/conda/envs/notebook/lib/python3.8/site-packages/distributed/cli/dask_worker.py", line 406, in main
        nannies = [
      File "/srv/conda/envs/notebook/lib/python3.8/site-packages/distributed/cli/dask_worker.py", line 407, in <listcomp>
        t(
      File "/srv/conda/envs/notebook/lib/python3.8/site-packages/distributed/nanny.py", line 220, in __init__
        host = get_ip(get_address_host(self.scheduler.address))
      File "/srv/conda/envs/notebook/lib/python3.8/site-packages/distributed/comm/addressing.py", line 142, in get_address_host
        return backend.get_address_host(loc)
      File "/srv/conda/envs/notebook/lib/python3.8/site-packages/distributed/comm/tcp.py", line 572, in get_address_host
        return parse_host_port(loc)[0]
      File "/srv/conda/envs/notebook/lib/python3.8/site-packages/distributed/comm/addressing.py", line 90, in parse_host_port
        port = _default()
      File "/srv/conda/envs/notebook/lib/python3.8/site-packages/distributed/comm/addressing.py", line 69, in _default
        raise ValueError("missing port number in address %r" % (address,))
    ValueError: missing port number in address '$(DASK_SCHEDULER_ADDRESS)'
    But if I only declare the pod_template, everything works out great. I'm assuming the fact that I declare the scheduler_pod_template means I'm losing some default setup somewhere down the line?

    Joe

    06/17/2021, 2:17 PM
    Hello world! Have any of you ever encountered this when trying to connect an agent to a remote server?
    Failed to establish a new connection: [WinError 10061] No connection could be made because the target machine actively refused it'))

    liren zhang

    06/17/2021, 3:35 PM
    Hi experts, we are currently having a problem with retrying a failed flow from the point of failure. We created a master control flow that uses StartFlowRun to invoke individual flows in their defined order. All the flows run in their own Docker containers. When the master flow fails due to a specific failed execution, the retry was not working. I read in the docs that Docker runs do not yet support retaining results/inputs/outputs. I read a thread here by @Kevin Kho that seemed to suggest PrefectResult may be a workaround if we don't care about the data we push to Prefect Cloud. https://prefect-community.slack.com/archives/CL09KU1K7/p1622645858327700 I am not exactly sure how PrefectResult can be used in our scenario. Please help!
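    A hedged sketch of what using PrefectResult can look like (flow and task names are placeholders): it stores task results in Prefect Cloud itself, so retries can resume from the failure point even when runs are Docker-based; it is only suitable for small, non-sensitive, JSON-serializable data.
    from prefect import Flow, task
    from prefect.engine.results import PrefectResult

    @task(result=PrefectResult())
    def transform(x):
        return x + 1

    # result= on the Flow sets the default for all its tasks
    with Flow("example", result=PrefectResult()) as flow:
        transform(1)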

    Zach Schumacher

    06/17/2021, 3:37 PM
    morning - opened this discussion for adding first class support for sentry https://github.com/PrefectHQ/prefect/discussions/4672
    👍 3

    Jonathan Pou

    06/17/2021, 3:48 PM
    Hello everyone! Does anyone have a recommendation for a Data Catalog tool that would be compatible with Prefect? I haven't been able to find one that would integrate with Prefect but would like to be able to see data lineage from the different elements in our data stack.

    Riley Hun

    06/17/2021, 5:09 PM
    Hello everyone, For ephemeral Dask Cluster creation and execution of a Prefect workflow, what is the recommended approach -- Resource Managers or Dask Gateway? We are using Dask Gateway right now, but recently learned about Resource Managers so contemplating if it's necessary to switch over. If anyone has some insight into some advantages/disadvantages of both approaches, including cost-effectiveness, scalability, etc., please kindly advise. @Eric Jurotich

    Ben Muller

    06/18/2021, 6:14 AM
    Hey everyone, how do you use S3 storage and specify a different boto profile to use when you deploy locally?

    Ben Muller

    06/18/2021, 6:14 AM
    e.g. my default is staging and I want to --profile staging
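    A hedged workaround sketch (bucket name is a placeholder): boto3 honors the AWS_PROFILE environment variable, so setting it before building/registering selects the profile that S3 storage uses.
    import os
    os.environ["AWS_PROFILE"] = "staging"  # must be set before boto3 creates a session

    from prefect.storage import S3
    storage = S3(bucket="my-flows-bucket")  # placeholder bucket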

    Ben Muller

    06/18/2021, 7:00 AM
    Another question: code in 🧵

    Rustam Iskenderov

    06/18/2021, 10:00 AM
    Hi everyone, I tried to create a flow-of-flows with StartFlowRun. I made a simple example https://www.codepile.net/pile/O0OLQXGb, but it gives a strange result. Each flow returns the Success status, but the flow "process" didn't actually run. How can I fix it?
    [2021-06-18 12:33:54+0300] INFO - prefect.FlowRunner | Beginning Flow run for 'download'
    [2021-06-18 12:33:54+0300] INFO - prefect.TaskRunner | Task 'check_file': Starting task run...
    Checking file availability plip ...
    [2021-06-18 12:33:54+0300] INFO - prefect.TaskRunner | Task 'check_file': Finished task run for task with final state: 'Success'
    [2021-06-18 12:33:54+0300] INFO - prefect.TaskRunner | Task 'download_file': Starting task run...
    Downloading file plip ...
    [2021-06-18 12:33:54+0300] INFO - prefect.TaskRunner | Task 'download_file': Finished task run for task with final state: 'Success'
    [2021-06-18 12:33:54+0300] INFO - prefect.TaskRunner | Task 'Flow process': Starting task run...
    [2021-06-18 12:33:55+0300] INFO - prefect.TaskRunner | Task 'Flow process': Finished task run for task with final state: 'Success'
    [2021-06-18 12:33:55+0300] INFO - prefect.FlowRunner | Flow run SUCCESS: all reference tasks succeeded
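    A hedged guess at the usual shape of this (project name is a placeholder): StartFlowRun only asks the backend to schedule the child flow, so the task can report Success without the child flow executing locally; the child flow must be registered, an agent must pick it up, and wait=True makes the parent block on its final state.
    from prefect import Flow
    from prefect.tasks.prefect import StartFlowRun

    start_process = StartFlowRun(flow_name="process", project_name="my-project", wait=True)

    with Flow("download") as flow:
        start_process()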

    jcozar

    06/18/2021, 10:01 AM
    Hi all! I think I read about this before but now I can't find it. I want to schedule an ETL daily to query some databases and load the data into an object store. If the prefect agent disconnects (let's say because of Internet connection problems) for more than one day, there would be more than one flow run queued. When the agent comes up again, I would like only the last flow run to be executed and the others cancelled. Is there something to implement this in prefect? Thank you very much!

    Michael Law

    06/18/2021, 10:04 AM
    Hey guys, is there any way to register flows in an idempotent way, so that if a flow is unchanged it doesn't register as a new version?

Stéphan Taljaard

06/18/2021, 10:17 AM
Hi. You can try this:
flow.register(
    project_name=flow_build.project,
    idempotency_key=flow.serialized_hash(),
)
👍 2

Michael Law

06/18/2021, 10:22 AM
Tremendous Stéphan, ty