prefect-community
  • l

    Leon Kozlowski

    05/06/2022, 4:27 PM
    If I have a complex schedule with an hourly interval clock and a cron clock, will one of the clocks take precedence when they overlap, or will there be 2 scheduled runs?
    k
    a
    12 replies · 3 participants
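    For reference, a minimal sketch of what such a combined schedule looks like in Prefect 1.0 (the interval and cron values below are placeholders, not the ones from the question):
    from datetime import timedelta

    from prefect import Flow
    from prefect.schedules import Schedule
    from prefect.schedules.clocks import CronClock, IntervalClock

    # One Schedule can carry several clocks; its run times come from all of them.
    schedule = Schedule(clocks=[
        IntervalClock(interval=timedelta(hours=1)),
        CronClock("0 12 * * *"),  # placeholder cron string
    ])

    with Flow("combined-schedule", schedule=schedule) as flow:
        ...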
  • e

    Evan Curtin

    05/06/2022, 6:12 PM
    Hey all, has anybody successfully started a dask cluster + prefect on top of an existing spark cluster (databricks)? I have a databricks cluster but want to use distributed prefect to schedule
    a
    k
    7 replies · 3 participants
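    For context, a sketch of pointing a Prefect 1.0 flow at an already-running Dask scheduler; the scheduler address is a placeholder, and starting Dask on the Databricks cluster itself is assumed to be handled separately:
    from prefect import Flow, task
    from prefect.executors import DaskExecutor

    @task
    def say_hello():
        print("hello from a dask worker")

    with Flow("existing-dask-cluster") as flow:
        say_hello()

    # Attach an executor that connects to an existing Dask scheduler instead of spinning one up.
    flow.executor = DaskExecutor(address="tcp://10.0.0.1:8786")  # hypothetical address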
  • z

    Zach Schumacher

    05/06/2022, 8:03 PM
    What's the correct pattern for spinning up our own cloud runner? We simply want to add some initialization to
    CloudFlowRunner
    so that we can initialize Sentry and Datadog as early as possible, as well as flush our statsd buffer to track metrics. It seems like the place I'd expect it to read the env var from is hardcoded though: https://github.com/PrefectHQ/prefect/blob/master/src/prefect/agent/kubernetes/agent.py#L562-L586
    a
    j
    +1
    15 replies · 4 participants
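    A rough sketch of the subclassing idea being described, assuming the runner class can be swapped in via configuration; the Sentry call is illustrative and the statsd flush is left as a comment:
    import sentry_sdk
    from prefect.engine.cloud import CloudFlowRunner

    class InstrumentedCloudFlowRunner(CloudFlowRunner):
        """CloudFlowRunner that initializes observability tooling before the run starts."""

        def run(self, *args, **kwargs):
            sentry_sdk.init(dsn="https://example@sentry.invalid/1")  # placeholder DSN
            try:
                return super().run(*args, **kwargs)
            finally:
                pass  # e.g. flush the statsd buffer here
    The open question above is exactly whether the agent lets you point engine.flow_runner.default_class at a class like this, or whether the hardcoded path in agent.py wins.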
  • m

    Mars

    05/06/2022, 8:03 PM
    Hi, does Prefect Cloud support multi-factor authentication? I'm looking through the features table at https://cloud.prefect.io/plans, and through my admin account profile page, and I don't see an MFA option listed.
    a
    2 replies · 2 participants
  • x

    Xavier Babu

    05/06/2022, 8:04 PM
    I am getting this error while executing a flow via Cloud Prefect 2.0: "RuntimeError: This portal is not running". Any clue what is going wrong? The same flow was working fine before I used Cloud Prefect 2.0. It actually finishes all my flows and tasks successfully, and then crashes at the end, and this happens to all my existing flows. line 1512, in propose_state raise prefect.exceptions.Abort(response.details.reason) prefect.exceptions.Abort: This run has already terminated.
    m
    31 replies · 2 participants
  • d

    Darin Douglass

    05/06/2022, 8:27 PM
    is there a way to find which tasks are holding the task tag concurrency locks? we’ve a case where our lock is taken up by a task when no flows are running
    a
    k
    25 replies · 3 participants
  • d

    David Haynes

    05/06/2022, 9:07 PM
    Fun with dataframes and map. So I have a map function that loads a dataframe in one task and then uses that dataframe in the next task. When I look at the return type on the first task, it is pandas.core.frame.DataFrame. When I look at the type of the dataframe value in the second task, it is class 'list'. I have the type hint set to pandas.DataFrame in the second task. Does anyone have any idea why the DataFrame is becoming a list?
    k
    8 replies · 2 participants
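    One pattern that produces exactly this symptom, sketched with made-up task names: if the downstream task is called unmapped, it receives the whole list of mapped results instead of a single DataFrame, so mapping it as well is usually the fix:
    import pandas as pd
    from prefect import Flow, task

    @task
    def load_frame(path: str) -> pd.DataFrame:
        return pd.read_csv(path)

    @task
    def process_frame(df: pd.DataFrame) -> int:
        # with .map below, this receives one DataFrame per mapped child run
        return len(df)

    with Flow("mapped-frames") as flow:
        frames = load_frame.map(["a.csv", "b.csv"])  # hypothetical inputs
        counts = process_frame.map(frames)           # calling process_frame(frames) unmapped would pass the full list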
  • k

    Kathryn Klarich

    05/06/2022, 9:46 PM
    Our prefect agent has been running without issue on ECS for the past ~11 months (since June 28, 2021), but about 1 hour ago we noticed we were unable to submit new flows. Upon further investigation, it appears our agent crashed and has been trying to restart but keeps crashing. I'm not sure if this is what initially caused it to crash, but the error message we are seeing in the CloudWatch logs now is:
    ValueError: You have not set an API key for authentication.
    Is anyone familiar with this error? Was something updated in prefect that could have caused this crash?
    m
    k
    8 replies · 3 participants
  • c

    Cole Murray

    05/07/2022, 3:53 AM
    Hey Prefect community, I'm working on deploying an instance of an Orion server to ECS using AWS CDK. One issue I've run into is how the database URL is specified via env vars. In ECS there is a distinction between environment variables and secrets and, to my knowledge, they cannot be concatenated. This presents a dilemma, as you now must either:
    • have the database password in the clear as an env var
    ◦ this also opens up an issue where, if the password is updated, it is not reflected until the next deployment, or
    • resolve the secret key, concatenate it into the whole URI, and put it into SSM
    orionServer.addContainer('ServiceContainer', {
        essential: true,
        image: ContainerImage.fromDockerImageAsset(serviceImage),
        logging: new AwsLogDriver({
            streamPrefix: 'orion-server'
        }),
        portMappings: [{
            containerPort: this.props.orionEnvVars?.port || Number(this.CONTAINER_PORT)
        }],
        environment: {
            PREFECT_ORION_API_HOST: this.props.orionEnvVars?.host || '0.0.0.0',
            PREFECT_ORION_API_PORT: this.props.orionEnvVars?.port?.toString() || this.CONTAINER_PORT,
            PREFECT_ORION_DATABASE_CONNECTION_URL: `postgres+asyncpg:///${this.props.username}:${ecsSecret.fromSecretsManager(this.props.dbPassword)}@${this.props.databaseHost}/orion`
        },
        secrets: {
        },
        memoryLimitMiB: 300,
    });
    We can overcome this by altering the settings provided and fetching the username and password separately from the host to build the DB URI, perhaps as a second set of options. Has anyone found an alternate way to concatenate the strings together? If not, are we open to adding additional options so that a DB_PASSWORD can be injected on its own as a secret?
    a
    3 replies · 2 participants
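    One workaround, sketched under the assumption that the username, password, and host can each be injected into the container on their own (e.g. as ECS secrets): assemble the connection URL inside the container at startup instead of in the task definition. The env var names other than PREFECT_ORION_DATABASE_CONNECTION_URL are hypothetical:
    # entrypoint.py -- build the DB URL from individually injected values, then start Orion
    import os

    url = (
        "postgresql+asyncpg://"
        f"{os.environ['DB_USER']}:{os.environ['DB_PASSWORD']}"
        f"@{os.environ['DB_HOST']}/orion"
    )
    os.environ["PREFECT_ORION_DATABASE_CONNECTION_URL"] = url

    os.execvp("prefect", ["prefect", "orion", "start", "--host", "0.0.0.0"])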
  • s

    Slackbot

    05/07/2022, 6:40 PM
    This message was deleted.
    k
    l
    2 replies · 3 participants
  • l

    Laksh Aithani

    05/07/2022, 7:16 PM
    Hello everyone, I’m working on running a very simple Prefect 2.0 workflow using a remote Ray cluster. More details in the thread below
    error.txt
    k
    a
    5 replies · 3 participants
  • a

    Artem Vysotsky

    05/07/2022, 10:58 PM
    friends, I'm trying to set up Prefect on my k8s cluster using the official helm chart. On top of that I want to enable Hasura admin secrets. It seems like when I do this, the graphql service does not start. Correct me if I'm wrong, but it looks like the
    database upgrade
    command does not respect the
    PREFECT_SERVER__HASURA__ADMIN_SECRET
    value.
    a
    1 reply · 2 participants
  • j

    Jan Domanski

    05/08/2022, 1:30 PM
    Hi there everyone, I’m trying to understand how to do simple map tasks with prefect2
    @task
    def generate_numbers():
        return [1, 2, 3, 4]
    
    @task 
    def compute_sth_expensive(number):
        return number ** 2
    
    @flow
    def pipeline():
        result_generate_numbers = generate_numbers()
        results = map(compute_sth_expensive, result_generate_numbers)
        for r in results: r.result() ## ??
    Is that an acceptable pattern? I want to do a parallel calculation over the
    result_generate_numbers
    and then perform some gather-like operation
    a
    10 replies · 2 participants
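    A sketch of one way to express this against the 2.0 beta API (assuming no built-in map on tasks yet): call the task once per item so each call returns a future, then gather with .result():
    from prefect import flow, task

    @task
    def generate_numbers():
        return [1, 2, 3, 4]

    @task
    def compute_sth_expensive(number):
        return number ** 2

    @flow
    def pipeline():
        numbers = generate_numbers()
        # one task run per item; each call returns a future immediately
        futures = [compute_sth_expensive(n) for n in numbers.result()]
        # gather-like step: block on every future and collect the values
        return [f.result() for f in futures]

    pipeline()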
  • a

    Artem Vysotsky

    05/08/2022, 3:44 PM
    hey community, I'm switching to Prefect 2.0 Cloud now and trying to programmatically register a deployment in the cloud. I have a webhook that, when called, needs to register a deployment in Prefect Cloud. What do I need to do to achieve this? Right now I have a very simple piece of code that fails because it calls into sqlite3, which is unexpected. I thought that Prefect would just call the Cloud API without any databases. How do I configure the process so that it starts calling the Cloud API?
    import flow
    from prefect.deployments import DeploymentSpec
    from prefect.orion.schemas.schedules import CronSchedule
    
    
    def create_deployment(name: str,
                          user_id: str,
                          job_id: str,
                          schedule: str):
        d = DeploymentSpec(
            flow=flow.flow,
            name=name,
            schedule=CronSchedule(
                cron=schedule
            ),
            parameters={
                "user_id": user_id,
                "job_id": job_id
            }
        )
    
        d.create_deployment()
    k
    2 replies · 2 participants
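    For the sqlite3 part specifically: a sketch of pointing the 2.0 client at Cloud before the deployment code runs, so it talks to the Cloud API instead of a local Orion database. The URL and the key are placeholders to be filled in from the Cloud UI / the `prefect cloud` CLI (or set via `prefect cloud login`):
    import os

    # set these before creating the deployment, or export them in the shell instead
    os.environ["PREFECT_API_URL"] = "https://<cloud-api-host>/api/accounts/<account_id>/workspaces/<workspace_id>"
    os.environ["PREFECT_API_KEY"] = "<your-api-key>"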
  • i

    Ievgenii Martynenko

    05/08/2022, 6:13 PM
    Hi, what is the best approach to being able to run a flow locally without changing the Flow() parameters? I don't want to re-invent the wheel. The way it should be deployed and kept in code:
    with Flow(name="Flow", executor=executor, storage=storage, run_config=run_config) as flow:
    The way it can start locally:
    with Flow(name="Flow") as local_flow:
    The flow itself remains the same.
    a
    2 replies · 2 participants
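    One possible approach, sketched with a hypothetical RUN_LOCAL toggle: build the Flow() kwargs conditionally so the same file serves both the deployed and the local case:
    import os

    from prefect import Flow
    from prefect.executors import LocalDaskExecutor
    from prefect.run_configs import KubernetesRun
    from prefect.storage import S3

    IS_LOCAL = os.getenv("RUN_LOCAL", "false").lower() == "true"  # hypothetical toggle

    flow_kwargs = {} if IS_LOCAL else dict(
        executor=LocalDaskExecutor(),
        storage=S3(stored_as_script=True, key="flow.py", bucket="my-bucket"),  # placeholder bucket/key
        run_config=KubernetesRun(),
    )

    with Flow(name="Flow", **flow_kwargs) as flow:
        ...  # tasks stay exactly the same

    if IS_LOCAL:
        flow.run()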
  • s

    Sander

    05/08/2022, 6:33 PM
    Hi, I've set up a Postgres Orion database for registering the states of flows/tasks etc. I noticed the tables are created in the public schema. Is it possible to have them created in, say, a prefect-orion schema? The use case is that I may want to set up logical replication from that database to another, and I would prefer to avoid public (it's already in use by some other database).
    a
    2 replies · 2 participants
  • s

    Suresh R

    05/09/2022, 2:41 AM
    Hi! How to migrate from Prefect cloud 1.0 to 2.0?
    k
    1 reply · 2 participants
  • s

    SeungHyeon Wang

    05/09/2022, 3:20 AM
    Hi! I'm just a beginner with Prefect. I was trying to run the agent on an EC2 t2.micro instance, but an out-of-memory error occurred while running it. I think it is an insufficient-memory problem, so I am trying to upgrade the EC2 instance. Are there any recommended specs for running the agent?
    k
    4 replies · 2 participants
  • i

    Ievgenii Martynenko

    05/09/2022, 8:50 AM
    Morning, I'm trying to register a flow from another Python file (like it's described in https://github.com/PrefectHQ/prefect/discussions/4042). My code which does the registration:
    import os.path as p
    
    from flows.xxx import testflow
    from prefect.storage import S3
    
    flows = [testflow]
    
    root = p.dirname(p.realpath(__file__))
    
    storage = S3(stored_as_script=True, key='testflow.py', bucket='test')
    
    if __name__ == '__main__':
    
        for flow_file in flows:
            flow = flow_file.flow
    
            print(f"Registering flow {flow.name} from {flow_file}")
    
            storage.add_flow(flow)
    
            flow.register(
                project_name='test',
                idempotency_key=flow.serialized_hash()
            )
    Flow itself:
    name = "testflow"
    executor = LocalDaskExecutor()
    storage = S3(stored_as_script=True, key='testflow.py', bucket='test')
    run_config = KubernetesRun(
        job_template_path='https://XXX/job_template/k8s_job_template.yaml')
    
    with Flow(name=name, executor=executor, storage=storage, run_config=run_config) as flow:
    	....
    The error I get is the same as when you don't add storage to the flow:
    Failed to load and execute flow run: ValueError('Flow is not contained in this Storage')
    What am I missing?
    a
    19 replies · 2 participants
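    A hedged guess at one fix, sketched below: register against the storage object that is actually attached to the flow (rather than a second S3 instance) and tell it where the local script lives so registration can upload it; the local path is an assumption about the project layout:
    import os.path as p

    from flows.xxx import testflow

    root = p.dirname(p.realpath(__file__))

    flow = testflow.flow
    flow.storage.local_script_path = p.join(root, "flows", "testflow.py")  # hypothetical path
    flow.storage.add_flow(flow)

    flow.register(
        project_name="test",
        idempotency_key=flow.serialized_hash(),
    )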
  • n

    Nacho Rodriguez

    05/09/2022, 8:54 AM
    Hello everyone! Is Orion ready to be used in PROD? We are coding some new pipelines and we want to start using it from the beginning instead of upgrading in the future. Thank you in advance!
    a
    2 replies · 2 participants
  • f

    Florian Guily

    05/09/2022, 10:08 AM
    Hey guys, I have a quick question about conditions in flow definitions. I use a boolean parameter and build the flow depending on its value. I'm planning to use a prefect case to do so, where I will define new variables from task calls. Are those definitions globally scoped, so I can use them "outside" of the prefect case, or are they locally scoped?
    a
    1 reply · 2 participants
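    In Prefect 1.0 the usual way to use a branch's result outside its case block is to merge the branches; a small sketch with made-up task names:
    from prefect import Flow, Parameter, case, task
    from prefect.tasks.control_flow import merge

    @task
    def branch_a():
        return "a"

    @task
    def branch_b():
        return "b"

    @task
    def downstream(value):
        print(value)

    with Flow("conditional") as flow:
        use_a = Parameter("use_a", default=True)
        with case(use_a, True):
            res_a = branch_a()
        with case(use_a, False):
            res_b = branch_b()
        # merge picks whichever branch actually ran, so the value is usable outside the case blocks
        result = merge(res_a, res_b)
        downstream(result)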
  • a

    Assaf Ben Shimon

    05/09/2022, 11:29 AM
    Hi all, I have an authentication problem:
    prefect.exceptions.AuthorizationError: Malformed response received from Cloud - please ensure that you are authenticated. See `prefect auth login --help`.
    However when I run
    prefect auth status
    it says I'm connected and authenticated. Any idea what's wrong?
    a
    20 replies · 2 participants
  • r

    Rafael Afonso Rodrigues

    05/09/2022, 12:45 PM
    Hello, is there a simple way in OSS Prefect v1 to set container images at the task level when using the Kubernetes executor? From my understanding the answer is no, but I could be wrong. An example is a flow containing N tasks of native type, e.g. ShellTask/PythonTask, each relying on a (possibly) separate container image.
    a
    5 replies · 2 participants
  • a

    Artem Vysotsky

    05/09/2022, 12:51 PM
    how does Prefect 2.0 deal with dependencies? Does the agent need all the deps that the flow requires? Also, what if the flow file depends on local files? I.e., here is my
    flow
    directory:
    (venv) ➜  src git:(main) ✗ ls -la flow                                 
    drwxr-xr-x  6 avysotsky  staff   192 May  9 08:47 .
    drwxr-xr-x  5 avysotsky  staff   160 May  9 08:34 ..
    -rw-r--r--  1 avysotsky  staff  3301 May  9 08:47 flow.py
    -rw-r--r--  1 avysotsky  staff   870 May  8 10:11 graphql.py
    -rw-r--r--  1 avysotsky  staff   611 May  9 08:47 prefect_client.py
    And here is how I create the deployment:
    d = DeploymentSpec(
            flow_location="./flow/flow.py",
            name=name,
            schedule=CronSchedule(
                cron=schedule
            ),
            tags=[
                f"user_id:{user_id}",
                f"job_id:{job_id}"
            ],
            parameters={
                "user_id": user_id,
                "job_id": job_id
            }
        )
    Is Prefect smart enough to pull in all the deps that flow.py needs?
    a
    17 replies · 2 participants
  • s

    Shuchita Tripathi

    05/09/2022, 1:39 PM
    Hi, I am trying to set upstream tasks for a given task dynamically. It is working fine; however, I do see some extra tasks listed which don't do anything (screenshot 1). Why is that happening? When I print the set_upstream tasks in the console, it shows just the tasks I want; however, while executing, I see more (screenshot 2).
    k
    1 reply · 2 participants
  • r

    Rhys Mansal

    05/09/2022, 1:54 PM
    Hi everyone, I was wondering if anyone knew a way to set a
    resource_manager
    cleanup
    method upstream of another task?
    k
    e
    10 replies · 3 participants
  • j

    Jason

    05/09/2022, 2:07 PM
    Is there documentation on using attachments with https://docs.prefect.io/api/latest/tasks/notifications.html#emailtask that I'm missing? I'd like to use the result of one task, a Pandas DF, and attach df.to_csv(..name..) to the email. It asks for the "names of files", but are these PathLikes?
    k
    2 replies · 2 participants
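    A sketch of one way this could look, assuming the attachments argument does take file paths; the path and addresses are placeholders:
    import pandas as pd
    from prefect import Flow, task
    from prefect.tasks.notifications import EmailTask

    @task
    def make_report() -> pd.DataFrame:
        return pd.DataFrame({"a": [1, 2, 3]})

    @task
    def write_csv(df: pd.DataFrame) -> str:
        path = "/tmp/report.csv"  # hypothetical location
        df.to_csv(path, index=False)
        return path

    email_task = EmailTask(subject="Report", email_to="team@example.com")

    with Flow("email-report") as flow:
        csv_path = write_csv(make_report())
        email_task(msg="See attached report", attachments=[csv_path])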
  • m

    Malthe Karbo

    05/09/2022, 2:09 PM
    In 2.0, is there an equivalent to modulestorage which works for k8s and docker flow runners?
    k
    11 replies · 2 participants
  • x

    Xavier Babu

    05/09/2022, 2:32 PM
    Dear Prefect community, we use Prefect Orion 2.0 with the Cloud Prefect 2.0 UI, and while launching beta.prefect.io in an IFrame for our application we get the following error message: "*beta.prefect.io* refused to connect". Looks like in the Cloud Prefect setup we need to add x-frame-options: SAMEORIGIN. Since I don't have control over that setup, I don't know how to resolve this issue.
    k
    14 replies · 2 participants
  • d

    Dylan

    05/09/2022, 3:40 PM
    Hey does this just mean our EKS cluster hasn’t whitelisted api.prefect.io?
    k
    12 replies · 2 participants
d

Dylan

05/09/2022, 3:40 PM
Hey does this just mean our EKS cluster hasn’t whitelisted api.prefect.io?
urllib3.exceptions.MaxRetryError: HTTPSConnectionPool(host='api.prefect.io', port=443): Max retries exceeded with url: / (Caused by ReadTimeoutError("HTTPSConnectionPool(host='api.prefect.io', port=443): Read timed out. (read timeout=15)"))
k

Kevin Kho

05/09/2022, 3:42 PM
No, it could be that you have a big API call happening. You can increase that timeout by setting
prefect.context.config.cloud.request_timeout
as seen here
d

Dylan

05/09/2022, 3:50 PM
What's the surface and size of the API calls?
And is there an environment variable for that setting? We're running a Prefect Cloud agent, so I'm not using that config.toml
https://github.com/PrefectHQ/prefect/blob/9dbb0e92f44654846c676d8a444acad794c33783/src/prefect/agent/kubernetes/deployment.yaml#L25-L55
Looking here for reference
k

Kevin Kho

05/09/2022, 3:56 PM
Should be
PREFECT__CLOUD__REQUEST_TIMEOUT
Size could depend on your Flow too. Do you have any GraphQL queries (they could be under the hood of tasks too, like
create_flow_run
)?
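A tiny sketch of that setting, with a placeholder value; the variable has to be visible to the agent's own process (e.g. set before prefect is imported, or added to the agent deployment's env list):
import os

os.environ["PREFECT__CLOUD__REQUEST_TIMEOUT"] = "60"  # seconds; the default shown in the traceback is 15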
d

Dylan

05/09/2022, 4:00 PM
I don't have any tasks that create their own flows; it's pretty simple so far. Flow registration runs separately from flow execution, and the flows executing are simple tasks (not composing tasks from or within tasks, for example), but I am passing state between tasks. Would it be possible that this is an error masking an underlying connection error for a failed
prefect.engine.results.S3Result
I/O operation?
k

Kevin Kho

05/09/2022, 4:02 PM
I don't think so, because that would use the boto3 client. I think this is specifically the Prefect Client. Was this a one-time thing yesterday?
There was a spike in latency, but I don't think it was an incident. Or does this happen with some regularity?
d

Dylan

05/09/2022, 4:09 PM
Seems to just be one occurrence on 5/7
k

Kevin Kho

05/09/2022, 4:42 PM
There may have been increased latency on 5/7 at 6pm ET, so it might be a one-off thing also