prefect-community
    Bose

    03/30/2021, 9:45 PM
Hello all. I’m new to Prefect and I’m trying to figure out an optimal way to deploy it on Google Cloud Platform in a serverless architecture. I’ve come across a lot of material for AWS but nothing on GCP; I’d be grateful if someone could share. I aim to use Prefect Cloud.
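One possible starting point (a sketch, not an official recipe: as far as I know Prefect 0.14 has no fully serverless GCP agent, so this assumes a GKE Autopilot cluster and Google Container Registry as the closest serverless-ish setup):
from prefect import Flow
from prefect.storage import Docker
from prefect.run_configs import KubernetesRun

with Flow("gcp-flow") as flow:
    ...

# build and push the flow's image to Google Container Registry
flow.storage = Docker(registry_url="gcr.io/my-project")  # hypothetical project
# run each flow run as a Kubernetes job picked up by a GKE agent
flow.run_config = KubernetesRun(labels=["gke"])
flow.register(project_name="my-project")
The agent itself would run in the cluster, e.g. prefect agent kubernetes start --label gke.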
    Rob Fowler

    03/31/2021, 3:16 AM
I have searched and searched for this one. I have dev/test/prod. I write my flow, build it into a Docker image, and run it on the dev Prefect server instance. No worries. What I want to do is promote that Docker image and register it with the test and later the prod Prefect server.
    Rob Fowler

    03/31/2021, 3:17 AM
Prod does not really have much in it. Ideally I'd just like to use the command line to tell the Prefect server to load the flow from the existing Docker container, without the flow source being present outside of it.
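One approach that might work (a sketch; it assumes the image contains the flow file, a Prefect CLI new enough to ship the prefect register command, and config pointing at the target server):
# register from inside the promoted image, so the flow source never
# needs to exist outside the container; the path is hypothetical
docker run \
  -e PREFECT__BACKEND=server \
  -e PREFECT__CLOUD__API=http://test-prefect-server:4200 \
  my-image:1.2.3 \
  prefect register --project test -p /flows/my_flow.py
Older CLIs spell it prefect register flow --file /flows/my_flow.py --project test instead.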
    xyzy

    03/31/2021, 3:44 AM
Is there a library that allows you to write flows like this:
chain(
    extract(from_date, to_date, base_currency),
    transform,
    load
)
instead of this:
extract_result = extract(from_date, to_date, base_currency)
transform_result = transform(extract_result)
load(transform_result)
?
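Not that I know of as a built-in, but since calling a task inside a Flow context returns the bound task, a small helper gives the same shape (a sketch; chain here is a hypothetical helper, not a Prefect API):
from functools import reduce
from prefect import Flow, task

@task
def extract(from_date, to_date, base_currency):
    return []

@task
def transform(rows):
    return rows

@task
def load(rows):
    pass

def chain(first, *rest):
    # pipe each task's output into the next: calling a task in a flow
    # context returns a task whose result feeds the following one
    return reduce(lambda upstream, t: t(upstream), rest, first)

with Flow("etl") as flow:
    chain(extract("2021-01-01", "2021-03-31", "USD"), transform, load)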
    Ranu Goldan

    03/31/2021, 7:58 AM
Hi everyone, I can see that a Flow can have multiple schedules, but the schedule on/off toggle is attached to the Flow. Is there any way to pause only one of the multiple schedules?
    kumar

    03/31/2021, 10:55 AM
Hello Team, hope you are doing well. Using SnowflakeQuery we can pass one query per task. Is there any way to create a task for a whole Snowflake SQL file?
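Not as a built-in that I know of, but one workaround (a sketch; the naive semicolon split is an assumption and breaks on semicolons inside string literals) is to read the file in a task and map SnowflakeQuery over its statements:
from prefect import Flow, task
from prefect.tasks.snowflake import SnowflakeQuery

@task
def read_statements(path: str):
    # naive split on ";" -- fine for simple scripts only
    with open(path) as f:
        return [s.strip() for s in f.read().split(";") if s.strip()]

snowflake = SnowflakeQuery(
    account="my-account", user="my-user", password="***"  # hypothetical credentials
)

with Flow("snowflake-sql-file") as flow:
    statements = read_statements("queries.sql")
    snowflake.map(query=statements)
Note that mapped tasks are not guaranteed to run in file order; for order-dependent scripts you would loop inside a single task instead.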
    Tim Pörtner

    03/31/2021, 11:50 AM
Hello everyone! I'm currently playing around with artifacts. I'm wondering if I can give artifacts a custom name. Does anybody know whether that's possible, and how to do it?
    Daniel Black

    03/31/2021, 2:30 PM
Hello! I am trying to configure logging integrated with AWS CloudWatch. I currently have standard logging, and I am trying to figure out how to set things up so that only ERROR and CRITICAL logs get passed to CloudWatch. I still need the standard logging but do not want CloudWatch to receive all logs. I know I can add extra loggers, but I am confused about how to set up both standard and error/critical logging together.
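One common pattern with plain Python logging (a sketch; watchtower is a third-party CloudWatch handler, assumed installed, and the log group name is hypothetical) is to attach a second handler whose level is ERROR, so the existing handlers keep everything while CloudWatch only receives ERROR and CRITICAL records:
import logging

import watchtower  # third-party CloudWatch logging handler (assumption)

# existing handlers keep their own levels; this one drops anything
# below ERROR before it reaches CloudWatch
cloudwatch = watchtower.CloudWatchLogHandler(log_group="prefect-errors")
cloudwatch.setLevel(logging.ERROR)

logging.getLogger("prefect").addHandler(cloudwatch)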
    Ria May Dewi

    03/31/2021, 3:11 PM
Hi everyone, I have 2 schedule times in a flow. How do I show all schedule times via GraphQL? When I run this query, it only shows the read-only schedule.
    {
      flow{
        id
        name
        schedule
      }
    }
    And the result is:
    {
            "id": "86f5fb6f-26c9-4ffe-9961-f28e5538b53b",
            "name": "spark",
            "schedule": {
              "type": "Schedule",
              "clocks": [
                {
                  "cron": "0 1 * * *",
                  "type": "CronClock",
                  "day_or": true,
                  "labels": null,
                  "end_date": null,
                  "start_date": null,
                  "__version__": "0.14.13",
                  "parameter_defaults": {}
                }
              ],
              "filters": [],
              "or_filters": [],
              "__version__": "0.14.13",
              "adjustments": [],
              "not_filters": []
    }
}
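If the second schedule was created through the UI, it may live on the flow group rather than the flow itself; a query along these lines (a sketch, assuming the flow_group relationship exposed by the backend) would surface both:
{
  flow {
    id
    name
    schedule
    flow_group {
      schedule
    }
  }
}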
    liren zhang

    03/31/2021, 3:27 PM
Hi experts, I am working on using StartFlowRun to set up parent/child flow relationships. I need to pass the result from one child flow to another, like in the following example:
@task
def get_parameters():
    .....
    return {}

flow_a = StartFlowRun(flow_name="flow_a", project_name="sample", wait=True)
flow_b = StartFlowRun(flow_name="flow_b", project_name="sample", wait=True)

with Flow("parent-flow") as flow:
    date_to_process = Parameter("date_to_process", default=None)
    vp = get_parameters(date_to_process)

    flow_a = flow_a(upstream_tasks=[vp], parameters=vp)
    flow_b = flow_b(upstream_tasks=[flow_a])
I was able to run this successfully, but I am NOT sure how I can use the parameters in the dependent flows. In this case, I have a get_parameters() task that composes the parameters I need to pass down to the dependent flow_a. How do I receive/use the parameters I passed in via flow_a = flow_a(upstream_tasks=[vp], parameters=vp)? Let's just say I want to print out the parameters I pass into flow_a: what do I need to do in flow_a.py to reference the passed-in parameters?
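For reference, the usual pattern (a sketch of what flow_a.py might look like; the task and parameter names are assumptions) is for the child flow to declare Parameter tasks whose names match the keys of the dict passed as parameters=:
# flow_a.py (sketch)
import prefect
from prefect import Flow, Parameter, task

@task
def log_value(value):
    prefect.context.get("logger").info(value)

with Flow("flow_a") as flow:
    # the name must match a key in the dict returned by get_parameters()
    date_to_process = Parameter("date_to_process", default=None)
    log_value(date_to_process)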
    Daniel Caldeweyher

    03/31/2021, 4:10 PM
Has anyone had the problem where a scheduled flow just stops getting scheduled?
    weekly = CronSchedule("15 1 * * *", start_date=DEFAULT_START_DATE)
    with Flow("Daily Extract", schedule=weekly) as flow:
       ...
This flow was happily running at its scheduled time of 1:15 a.m. every day, except now it has just stopped getting scheduled. Curiously, today is the first day of the next month...
    Andrew Hannigan

    03/31/2021, 4:19 PM
How are mapped tasks billed in the new model? As one task, or according to the size of the mapped task list?
    Ryan Baker

    03/31/2021, 4:23 PM
What scopes does a GitHub personal access token need in order to use GitHub storage for registering flows in Prefect Cloud?
    Hui Zheng

    03/31/2021, 4:39 PM
Hello Prefect, this morning Prefect Cloud seems to have had a glitch and didn’t create jobs on schedule. Two of our flows scheduled for 9:20 AM PST didn’t get their runs created until 9:29 AM PST. Please see the thread for details.
    Mitchell Bregman

    03/31/2021, 4:41 PM
Hey there! I noticed an extensive conversation around using the ECSRun run configuration with ECSAgent and Docker storage. I am running into a similar issue - not entirely sure if there is a workaround currently, but here is the error I cannot get past:
    [2021-03-31 15:09:51,907] ERROR - agent | Error while deploying flow: InvalidParameterException('An error occurred (InvalidParameterException) when calling the RunTask operation: Task definition does not support launch_type FARGATE.')
    My setup:
    # flow.py
    
    import prefect
    from prefect import Flow, task
    
    @task
    def say_hello():
        logger = prefect.context.get("logger")
    logger.info("Got here!!!!")
    
    with Flow("Test") as flow:
        say_hello()
    --
    # deploy.py
    
flow.run_config = ECSRun(
        run_task_kwargs={'cluster': 'my-cluster'},
        execution_role_arn='arn:aws:iam::{ACCOUNT_NUMBER}:role/my-ecs-task-role',
        labels=['ecs']
    )
    
    flow.storage = Docker(
        env_vars=config.ENVIRONMENT_VARIABLES,
        extra_dockerfile_commands=[
            f"RUN pip install -e /service",
        ],
        files={f"{os.path.join(os.path.expanduser('~'), 'project')}": "/service"},
        image_name=config.DOCKER_IMAGE_NAME,
        image_tag=config.DOCKER_IMAGE_TAG,
        registry_url=config.DOCKER_REGISTRY_URL,
    )
    Here is how I am running the agent:
    prefect agent ecs start --token {PREFECT_RUNNER_TOKEN} --cluster my-cluster --label ecs --launch-type FARGATE
My issue is, I have no idea whether I am doing something wrong on the AWS side of things; I am trying to wrap my head around the root cause. I’m also noticing there are task_definition and task_role_arn parameters, and I’m unsure whether I am expected to use them, or where to set them up. Any insight would be extremely appreciated!
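For context, this error usually means the registered task definition lacks Fargate compatibility; one thing to try (a sketch under that assumption, not a confirmed fix) is to supply an explicit task definition spec on the run config:
from prefect.run_configs import ECSRun

flow.run_config = ECSRun(
    run_task_kwargs={"cluster": "my-cluster"},
    execution_role_arn="arn:aws:iam::{ACCOUNT_NUMBER}:role/my-ecs-task-role",
    task_definition={
        # Fargate requires awsvpc networking and explicit compatibility
        "networkMode": "awsvpc",
        "requiresCompatibilities": ["FARGATE"],
        "cpu": "512",      # example sizes, adjust as needed
        "memory": "1024",
    },
    labels=["ecs"],
)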
    Matic Lubej

    03/31/2021, 4:49 PM
Hi there, Prefect people! Here is a really weird issue that didn't seem to happen in the past, but we've noticed it now and we're not sure what to think of it. Here is a simple flow with simple tasks and mapping, but somehow the mapped output of the last task has a different length than the input, even though everything in the flow is successful. If you check the image below, you can see the difference in the number of outputs, which is unexpected. Here is the MWE for the simple flow: https://pastebin.com/1hGeYHjV
    Nicholas Chammas

    03/31/2021, 4:56 PM
New Prefect user here. I’m trying to understand the difference between static and dynamic Task arguments, as mentioned in this user warning: https://github.com/PrefectHQ/prefect/blob/553544ca25e454140dda363c8f8ca648059a874c/src/prefect/core/task.py#L154-L156. The warning points to a part of the docs that mentions neither the word “static” nor the word “dynamic”, so I’m not sure what the authors really mean by these words.
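One illustration of my reading (not an authoritative definition): a "static" argument is a plain Python value that Prefect wraps in a constant at build time, while a "dynamic" argument is another task's output, which adds an edge to the DAG:
from prefect import Flow, task

@task
def add(x, y):
    return x + y

with Flow("static-vs-dynamic") as flow:
    a = add(1, 2)   # 1 and 2 are static: plain constants wrapped by Prefect
    b = add(a, 10)  # a is dynamic: another task's output, so it creates an edge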
    Konstantinos

    03/31/2021, 5:15 PM
Hello, I'd like to use a custom function as a filter in a schedule's clock. Using my own lambda or script-defined function throws a serialization error:
prefect/utilities/serialization.py", line 397, in _serialize
    raise ValidationError("Invalid function reference: {}".format(value))
marshmallow.exceptions.ValidationError: Invalid function reference: <function mon_sat_day_filter at 0x7f35e22d78b0>
Is there a way around this? Is there a way to interpret a list of filter funcs with an OR condition rather than AND? For reference, I am trying to run a job Mon-Sat, so an OR condition over a list of is_day_of_week filter funcs would do. Thank you!
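For this particular case the built-in filters serialize fine, and Schedule accepts an or_filters argument (it shows up in the serialized schedule earlier in this channel); a sketch assuming pendulum's Monday=1 through Saturday=6 numbering:
from prefect.schedules import Schedule
from prefect.schedules.clocks import CronClock
from prefect.schedules.filters import is_day_of_week

schedule = Schedule(
    clocks=[CronClock("0 6 * * *")],  # hypothetical daily clock
    # OR semantics: run when the day matches any filter, i.e. Mon-Sat
    or_filters=[is_day_of_week(day) for day in range(1, 7)],
)
A plain cron day-of-week range ("0 6 * * 1-6") would also cover Mon-Sat without any filters.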
    Rey Marin

    03/31/2021, 5:49 PM
Hello! Is there a way to search for a flow run by the run’s parameters, either through the UI or through GraphQL?
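Through GraphQL, something like this might work (a sketch; it assumes the Hasura-style _contains operator is exposed on the JSONB parameters field, which is unverified):
{
  flow_run(where: { parameters: { _contains: { date_to_process: "2021-03-31" } } }) {
    id
    name
    parameters
  }
}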
    Carter Kwon

    03/31/2021, 6:14 PM
Hello, I'm not sure if my account is acting up or if there's a bug, but I'm unable to delete a secret in my account with the name test/. I was trying to see if a secret could be named test/this/path, and after adding it, test/ was the only part that stayed. Now it can't be deleted.
    matta

    03/31/2021, 10:42 PM
So, I have a flow that often hangs and doesn't "complete" even when the reference tasks are done. I see this error message come up in the logs:
RuntimeError: cannot schedule new futures after shutdown
Not sure what to do? I don't want to have to manually set its state to "Success".
    Brian Keating

    04/01/2021, 2:11 AM
I'm having trouble running a simple "hello world" flow on a temporary dask EC2Cluster. I'm new to dask. The relevant part of my flow script is:
    flow.executor = DaskExecutor(
        cluster_class='dask_cloudprovider.aws.EC2Cluster',
        cluster_kwargs={'n_workers': 2, 'docker_image': 'prefecthq/prefect', 'debug': True}
    )
    flow.run()
This fails with FlowRunner: ClientError('An error occurred (InvalidParameterValue) when calling the RunInstances operation: User data is limited to 16384 bytes'). The issue is that the docker run command looks like this:
docker run --net=host prefecthq/prefect env DASK_INTERNAL_INHERIT_CONFIG="a_very_very_long_string" python -m distributed.cli.dask_scheduler
so I guess the command winds up being too long. Does anyone know a workaround for this issue?
    Ranu Goldan

    04/01/2021, 5:55 AM
Hi everyone, I have a question. I have 2 MySQL DBs (A & B) and want to ingest them into BigQuery. I only want to have one flow: mysql_to_bigquery. I planned to use multiple schedules: schedule A with DB A's default parameters, and schedule B with DB B's default parameters. Next, I want these 2 DBs ingested by different Kubernetes agents, selected by label. Is there any way to assign a label to a schedule?
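The serialized schedules earlier in this channel show a labels field on each clock, so per-clock labels combined with parameter_defaults might cover this (a sketch; the clock times and label names are assumptions):
from prefect.schedules import Schedule
from prefect.schedules.clocks import CronClock

schedule = Schedule(clocks=[
    CronClock("0 2 * * *", parameter_defaults={"db": "A"}, labels=["k8s-a"]),
    CronClock("0 3 * * *", parameter_defaults={"db": "B"}, labels=["k8s-b"]),
])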
    Varun Joshi

    04/01/2021, 5:59 AM
Hi Prefecters, does anyone know the command to delete a flow? Or can it only be done via the UI?
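Besides the UI, the backend exposes a GraphQL mutation for this (a sketch; I believe the mutation is delete_flow, taking the flow's UUID):
mutation {
  delete_flow(input: { flow_id: "<flow-uuid>" }) {
    success
  }
}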
    Jeremy Tee

    04/01/2021, 6:27 AM
Hi everybody, I am wondering about the transform task below. It is expected to fail for the parameter "a", but when I rerun from failure, what is the expected behavior? Will it rerun all cases [3, 1, "a"], or skip 3 and 1 and only rerun "a"?
import prefect
from prefect import Flow, task
from prefect.executors import LocalExecutor
from prefect.run_configs import LocalRun


@task
def extract():
    return [3, 1, "a"]


@task
def transform(x):
    y = x + 1
    logger = prefect.context.get("logger")
    logger.info(y)
    return y


with Flow("failure-flow-test", executor=LocalExecutor(), run_config=LocalRun()) as flow:
    e = extract()
    t = transform.map(e)
    Jacob Blanco

    04/01/2021, 7:20 AM
I’ve got a really strange thing happening with Cloud: some of our schedules have stopped working.
1. Schedules that had been set up in code were working fine for months.
2. Last week we set up one more schedule in Cloud directly.
3. Now only the schedules that we set up in Cloud are working.
    xyzz

    04/01/2021, 9:31 AM
If you are using results, e.g. S3Result, how do you debug the content of the generated files without actually running the flow that uses them?
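One option (a sketch; the bucket and storage key are hypothetical) is to instantiate the same result type outside any flow and read a stored location directly:
from prefect.engine.results import S3Result

result = S3Result(bucket="my-bucket")
# read() downloads and deserializes the payload stored at that key
value = result.read("flows/my-flow/2021-04-01/transform.prefect").value
print(value)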
    Vladimir Bolshakov

    04/01/2021, 1:17 PM
Hello! I have a question. How do I get the result of a Resume state, passed via the Client.set_task_run_state method, to a paused task at runtime? When I try to use Client.get_task_run_state inside the task, I find that the state at task runtime is Running (after Resume and after Submitted), so the result passed to the Resume state is not accessible. Any ideas?
    Hawkar Mahmod

    04/01/2021, 1:23 PM
Short and sweet, I hope. It isn’t clear to me from the docs where cached output data is persisted when using the Cloud backend. I know locally it’s stored in memory, and you cannot use the cache locally unless you make use of a backend API. Does this mean the output data is actually stored in the backend, Cloud or Core server? If so, does this not violate the principle that no flow data should be stored on the API side?
    liren zhang

    04/01/2021, 2:43 PM
Where can I find a document with a detailed feature comparison of the different Prefect packages (Starter, Standard, and Enterprise)? https://www.prefect.io/pricing
Michael Adkins

04/01/2021, 3:28 PM
Hey @liren zhang - we're currently QA-ing a UI release that will include a nice page for this.