prefect-community
  • a

    Amanda Wee

    11/10/2020, 1:34 AM
    Hi friends, I have a question about the use of prefect backend server. I intend to run Apollo directly as a container in an ECS service, using the docker-compose.yml file that Prefect provides as a reference. What do I need to configure so that the container (and of course the other containers for Hasura, GraphQL, and Towel) starts as if I had run prefect backend server followed by prefect server start?
    k
    • 2
    • 1
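    A minimal sketch of the client-side config this implies, assuming the backend/server keys of the 0.13-era config.toml (key names are my assumption, the host is a placeholder; the server containers themselves take the same environment variables as in the provided docker-compose.yml):
    backend = "server"

    [server]
    host = "http://my-apollo-host"
    port = "4200"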
  • m

    Michelle Wu

    11/10/2020, 7:16 AM
    Hi! I have several tasks running in my flow right now and I wonder how I can see the outputs of the tasks. For example, if I write a print statement in a Python file, will it show up in the log?
    j
    • 2
    • 5
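    A minimal sketch of two mechanisms that cover this: the logger from prefect.context, and log_stdout=True to forward captured print output to the task's logs:
    import prefect
    from prefect import task, Flow

    @task(log_stdout=True)  # anything printed to stdout is forwarded to the task's logs
    def my_task():
        print("this print shows up in the logs")
        logger = prefect.context.get("logger")  # the task's own logger
        logger.info("so does this log line")

    with Flow("logging-example") as flow:
        my_task()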
  • j

    Jacob Blanco

    11/10/2020, 9:33 AM
    Hi folks, one pattern that we keep running into is using Parameters to override the result of a task. For example, we want to use some value from a database or the Parameter specified by the user.
    with Flow("Update Data") as flow:
        last_export = get_last_export("my_table")
    
        update_from = Parameter("update_from", default=last_export)
    
        insert_query = parse_insert_query(update_from=update_from)
    
        run_execute_query(insert_query)
    This doesn’t work, but I hope it illustrates what I’m trying to do. Is there any way to do this without having to write an intermediate task to “merge” the two inputs? I realize this pattern is wasteful, since it runs get_last_export for no reason if the parameter is provided.
    a
    j
    • 3
    • 3
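    A hedged sketch of one way to avoid a hand-written merge task, assuming the case/merge control-flow utilities available in 0.13-era Prefect; it also skips get_last_export when the parameter is supplied (task bodies are placeholders):
    from prefect import task, Flow, Parameter, case
    from prefect.tasks.control_flow import merge

    @task
    def get_last_export(table):
        return "2020-01-01"  # placeholder

    @task
    def parse_insert_query(update_from):
        return f"... WHERE exported_at > '{update_from}'"  # placeholder

    @task
    def run_execute_query(query):
        ...

    with Flow("Update Data") as flow:
        update_from = Parameter("update_from", default=None)
        with case(update_from, None):                      # only runs when no parameter is given
            last_export = get_last_export("my_table")
        effective_from = merge(last_export, update_from)   # first non-skipped result wins
        run_execute_query(parse_insert_query(update_from=effective_from))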
  • d

    Dor Menachem

    11/10/2020, 3:47 PM
    In what situations would I combine Prefect and Databricks? How is Spark related to them? And what do people build with Prefect, Databricks, and Spark all together?
    j
    a
    • 3
    • 3
  • d

    DJ Erraballi

    11/10/2020, 6:39 PM
    Still confused about how I would be able to transform map inputs in the following example:
    from typing import List

    from prefect import task, Flow, Parameter, unmapped
    from prefect.tasks.prefect import FlowRunTask

    @task
    def taskA() -> List[int]:
        return results  # some list of result ids computed here

    with Flow("upstream-flow") as flow:
        param = Parameter('blah')
        a_results = taskA()
        flow_run_task = FlowRunTask()
        flow_run_task.map(flow_name=unmapped('DownstreamFlow'), parameters=[{'blah': param, 'resultId': result} for result in a_results])
  • d

    DJ Erraballi

    11/10/2020, 6:49 PM
    In the example I’m doing the transformation with a list comprehension.
  • d

    DJ Erraballi

    11/10/2020, 6:49 PM
    But I don’t think I can actually loop over a_results inline in the flow definition, right?
    j
    • 2
    • 2
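    The comprehension runs at flow-build time, before taskA has any results. A hedged sketch of the usual workaround, building the parameter dicts inside a task and mapping FlowRunTask over that list (names mirror the snippet above; values are placeholders):
    from typing import List

    from prefect import task, Flow, Parameter, unmapped
    from prefect.tasks.prefect import FlowRunTask

    @task
    def taskA() -> List[int]:
        return [1, 2, 3]  # placeholder results

    @task
    def build_parameters(results: List[int], blah) -> List[dict]:
        # runs at flow-run time, so it sees taskA's actual output
        return [{"blah": blah, "resultId": r} for r in results]

    with Flow("upstream-flow") as flow:
        param = Parameter("blah")
        a_results = taskA()
        run_params = build_parameters(a_results, param)
        FlowRunTask().map(flow_name=unmapped("DownstreamFlow"), parameters=run_params)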
  • m

    Mark McDonald

    11/10/2020, 9:04 PM
    Hi - I've noticed an issue recently in Prefect Cloud. If I try to change the state of a flow run, I get the following error. I just wanted to raise it as an FYI.
    j
    • 2
    • 1
  • m

    Marley

    11/10/2020, 9:20 PM
    Is there a feature yet to toggle off Heartbeats, Lazarus, etc. at the time of flow registration (via Python)? If not, I would kindly request it!
    j
    • 2
    • 2
  • n

    Newskooler

    11/10/2020, 9:22 PM
    👋 Hi, I am trying to unit-test my flows via a CI. How can I run a flow which has a schedule? (i.e. I need to run it once without triggering the schedule).
    j
    • 2
    • 3
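    A minimal sketch for CI, assuming the flow object is importable from your project code (the module name is hypothetical); run_on_schedule=False executes the flow once and ignores the attached schedule:
    from my_flows import flow  # hypothetical import path

    def test_flow_runs_once():
        state = flow.run(run_on_schedule=False)  # single immediate run, schedule ignored
        assert state.is_successful()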
  • j

    Jacob Blanco

    11/11/2020, 3:25 AM
    Hey folks, not sure if this is an issue with our settings, but when setting a schedule in Cloud the time is interpreted as UTC rather than our local timezone, JST. So if I want the flow to run at 11 PM I have to set the schedule to run at 14:00. Is this expected behaviour?
    c
    • 2
    • 5
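    Schedule times without explicit timezone info are treated as UTC; a hedged sketch of attaching JST directly to the schedule so 11 PM means 11 PM locally (dates and interval are placeholders):
    from datetime import timedelta

    import pendulum
    from prefect.schedules import IntervalSchedule

    schedule = IntervalSchedule(
        start_date=pendulum.datetime(2020, 11, 12, 23, 0, tz="Asia/Tokyo"),  # 11 PM JST
        interval=timedelta(days=1),
    )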
  • e

    Edison A

    11/11/2020, 9:13 AM
    Hey there, how can I change the port number of the Prefect UI from the default 8080? I have only seen a config example for the API port. Can you help me with a link to a page that contains all the possible config params for config.toml?
    j
    • 2
    • 2
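    A hedged sketch, assuming the port flags exposed by the 0.13-era prefect server start CLI; the full set of default keys is the config.toml shipped inside the prefect package itself:
    prefect server start --ui-port 8081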
  • g

    Greg Roche

    11/11/2020, 10:05 AM
    Hi folks, does anyone have any proof-of-concept examples of a project where there are multiple flows within the same project directory and some shared logic which the flows can import? I can get importing to work when running the flow locally with flow.run(), but registering the flow on Prefect Server and triggering the flow from there fails because of module errors. I guess I'm just misunderstanding Python's importing logic (I'm by no means an expert...), so a basic working example that I can build on would really help me get off the ground.
    a
    m
    +2
    • 5
    • 17
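    A hedged sketch of the layout this usually implies: the shared code lives in a package that is importable in the agent's environment (e.g. installed with pip install -e . or placed on PYTHONPATH), not just in the process that calls flow.run(). Module and function names below are hypothetical:
    # flows/flow_a.py
    from prefect import task, Flow

    from shared.utils import clean_data  # hypothetical shared helper package

    @task
    def transform():
        return clean_data("raw")

    with Flow("flow-a") as flow:
        transform()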
  • s

    simone

    11/11/2020, 3:53 PM
    Hi, thanks a lot for the great product! I am working on porting my microscopy image analysis pipeline to Prefect and currently I am not able to visualise custom logs in the UI (the CloudFlowRunner logs are present). I am running Prefect Server on a local cluster and connect via SSH to ports 8080 and 4200 to reach the UI and the Apollo server from my laptop. I also spin up an agent using the specific IP:
    prefect agent local start --api http://172.22.0.5:4200
    I get the IP from the terminal printout of the `prefect server start` command. Using this setup I am able to run flows, however I am not able to catch the logs. I tested different server settings in the `config.toml` file but none fixed the issue:
    [server]
     [server.ui]
     apollo_url = "http://0.0.0.0:4200/graphql"
    
    [server]
     [server.ui]
     apollo_url = "http://localhost:4200/graphql"
    If I understand how things work (please be patient, I am a biologist 🙂), the issue is on my side and is caused by a mistake in identifying the correct IP to use for setting up the system. If that is the case, is there a way to predefine the IP where the services start? If I am wrong please let me know, any help is appreciated! Below is the toy flow I have been using to test logging:
    import time
    from datetime import timedelta, datetime

    import prefect
    from prefect import task, Flow, Parameter, flatten, unmapped
    from prefect.engine.executors import DaskExecutor
    from prefect.environments import LocalEnvironment
    from prefect.schedules import IntervalSchedule
    from prefect.utilities.debug import raise_on_exception
    from prefect.utilities.logging import get_logger

    # MOCK TASK FUNCTION TO BUY TIME
    @task(task_run_name=lambda **kwargs: f"testing-logger-writing-logs-{kwargs['x']}-suiname", log_stdout=True)
    def wlog(x):
        logger = prefect.context.get("logger")
        logger.debug('i am debugging')
        logger.info('start sleep')
        time.sleep(20)
        logger.info('done sleep')


    a = list(range(10))
    # with Flow("test_running", schedule=schedule) as flow:
    with Flow("logging-flow", environment=LocalEnvironment(DaskExecutor(address='tcp://193.10.16.58:18938'))) as flow:
        logger = prefect.utilities.logging.get_logger()
        logger.info('this log is generated in the flow')
        out_task = wlog.map(a)
        logger.info('done')

    flow.register(project_name="test")
    m
    r
    • 3
    • 11
  • r

    Robin

    11/11/2020, 6:07 PM
    Dear community, is there a simple red button to stop all running flows or tasks? 🔴 Our task concurrency limit indicates that some tasks are still running, although no flow is running. So I guess there are some zombie tasks hiding somewhere that I would like to see stopped. 😮 I am exploring the interactive API, but haven't found a quick way to just cancel all tasks...
    j
    • 2
    • 3
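    A hedged sketch of listing what the backend still considers active, via the Python client's graphql helper (field names are assumed from the Hasura-generated schema behind the interactive API); the offending runs can then be moved to a finished state to release the concurrency slots:
    from prefect import Client

    client = Client()
    result = client.graphql(
        """
        query {
          task_run(where: {state: {_eq: "Running"}}) {
            id
            flow_run_id
            state
          }
        }
        """
    )
    print(result)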
  • j

    Joël Luijmes

    11/11/2020, 6:21 PM
    Hey, is there a way for a flow to depend on another flow with a different schedule? As in, wait to execute until the first flow has completed (and vice versa)? I've seen the running-dependent-flows topic, but that constructs a higher-level flow, whereas I want the two flows to keep different schedules. The reason I need this: I sync several databases to BQ every 20 minutes, but on another schedule (once per day) I actually process the data in BQ. The data processing can't run while syncs are still running (due to inconsistent data). So I have two crons on different schedules between which I want to create a dependency. Basically I want to hold a lock/semaphore, so that a flow waits until another flow releases it. Are there any suggestions for achieving this, preferably with the core/server version? I'm not sure, but maybe what I'm looking for is described here in the cloud docs? And might it be implemented in this PR? Or do these tags behave differently?
    ✅ 1
    j
    a
    • 3
    • 5
  • f

    fabian wolfmann

    11/11/2020, 7:27 PM
    Hi! Is there a way to schedule a flow depending on a condition?
    m
    d
    • 3
    • 3
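    A hedged sketch of one common workaround: keep the schedule, but gate the actual work at run time with case, so scheduled runs exit quickly when the condition doesn't hold (the condition task here is hypothetical):
    from datetime import timedelta

    from prefect import task, Flow, case
    from prefect.schedules import IntervalSchedule

    @task
    def should_run() -> bool:
        return True  # hypothetical condition check

    @task
    def do_work():
        ...

    schedule = IntervalSchedule(interval=timedelta(hours=1))

    with Flow("conditional-flow", schedule=schedule) as flow:
        with case(should_run(), True):
            do_work()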
  • m

    Mark McDonald

    11/11/2020, 10:30 PM
    Hi - would you all consider adding the project name to the cloudhook payload? I'm using cloudhooks to do some monitoring, and it would be helpful to have the project name because we use projects to segment UAT vs prod flows, as well as by teams within our org. If you're open to this, should I create an issue on the GitHub repo?
  • m

    M Taufik

    11/12/2020, 5:19 AM
    Hey, I have installed Prefect Server using the k8s Helm chart https://github.com/PrefectHQ/server/tree/helm-chart/helm/prefect-server and the services are running. How can I create a Prefect tenant and run a k8s agent? Thank you
    d
    m
    n
    • 4
    • 12
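    A hedged sketch of the remaining steps, with command and flag names assumed from the 0.13-era CLI (the API address is a placeholder for wherever the chart exposes Apollo):
    prefect backend server
    prefect server create-tenant --name default --slug default
    prefect agent install kubernetes --api http://<apollo-host>:4200 | kubectl apply -f -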
  • j

    jamodes

    11/12/2020, 8:32 AM
    Hi, I need to run a Fargate task in AWS. I have followed this tutorial (https://docs.prefect.io/orchestration/execution/fargate_task_environment.html#examples). When I run this flow via flow.run(), stdout prints "Flow run SUCCESS: all reference tasks succeeded", but the Fargate task does not run. How am I supposed to execute this flow?
    d
    m
    • 3
    • 10
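    A hedged note: flow.run() executes the flow in the local Python process and does not launch anything on Fargate; the FargateTaskEnvironment only takes effect for runs created through the backend and picked up by an agent. A minimal sketch (the project name is a placeholder):
    flow.register(project_name="my-project")
    # then start an agent and trigger a run from the UI, CLI, or API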
  • d

    DJ Erraballi

    11/12/2020, 8:37 AM
    Is there a way to pull in the Prefect client without the rest of Prefect Core? I think there may be only the one package on PyPI.
    d
    j
    • 3
    • 2
  • d

    DJ Erraballi

    11/12/2020, 8:40 AM
    It seems like there may be some conflicts with cloudpickle when installing the prefect package with pipenv:
    - prefect [required: ==0.11.5, installed: 0.11.5]
        - click [required: >=7.0,<8.0, installed: 7.1.2]
        - cloudpickle [required: >=0.6.0,<1.5, installed: 1.4.1]
        - croniter [required: >=0.3.24,<1.0, installed: 0.3.36]
          - natsort [required: Any, installed: 7.0.1]
          - python-dateutil [required: Any, installed: 2.8.1]
            - six [required: >=1.5, installed: 1.14.0]
        - dask [required: >=0.19.3,<3.0, installed: 2.30.0]
          - pyyaml [required: Any, installed: 5.3.1]
        - distributed [required: >=1.26.1,<3.0, installed: 2.30.1]
          - click [required: >=6.6, installed: 7.1.2]
          - cloudpickle [required: >=1.5.0, installed: 1.4.1]
          - contextvars [required: Any, installed: 2.4]
            - immutables [required: >=0.9, installed: 0.14]
          - dask [required: >=2.9.0, installed: 2.30.0]
            - pyyaml [required: Any, installed: 5.3.1]
          - msgpack [required: >=0.6.0, installed: 1.0.0]
          - psutil [required: >=5.0, installed: 5.7.3]
          - pyyaml [required: Any, installed: 5.3.1]
          - setuptools [required: Any, installed: 45.2.0]
          - sortedcontainers [required: !=2.0.1,!=2.0.0, installed: 2.3.0]
          - tblib [required: >=1.6.0, installed: 1.7.0]
          - toolz [required: >=0.8.2, installed: 0.11.1]
          - tornado [required: >=5, installed: 6.1]
          - zict [required: >=0.1.3, installed: 2.0.0]
            - heapdict [required: Any, installed: 1.0.1]
        - docker [required: >=3.4.1,<5.0, installed: 4.3.1]
          - requests [required: >=2.14.2,!=2.18.0, installed: 2.25.0]
            - certifi [required: >=2017.4.17, installed: 2020.4.5.1]
            - chardet [required: >=3.0.2,<4, installed: 3.0.4]
            - idna [required: >=2.5,<3, installed: 2.9]
            - urllib3 [required: >=1.21.1,<1.27, installed: 1.26.1]
          - six [required: >=1.4.0, installed: 1.14.0]
          - websocket-client [required: >=0.32.0, installed: 0.57.0]
            - six [required: Any, installed: 1.14.0]
        - marshmallow [required: >=3.0.0b19,<3.6.1, installed: 3.5.1]
        - marshmallow-oneofschema [required: >=2.0.0b2,<3.0, installed: 2.1.0]
          - marshmallow [required: >=3.0.0rc6,<4.0.0, installed: 3.5.1]
        - mypy-extensions [required: >=0.4.0,<1.0, installed: 0.4.3]
        - pendulum [required: >=2.0.4,<3.0, installed: 2.1.2]
          - python-dateutil [required: >=2.6,<3.0, installed: 2.8.1]
            - six [required: >=1.5, installed: 1.14.0]
          - pytzdata [required: >=2020.1, installed: 2020.1]
        - python-box [required: >=3.4.4,<5.0, installed: 4.2.3]
          - ruamel.yaml [required: Any, installed: 0.16.12]
            - ruamel.yaml.clib [required: >=0.1.2, installed: 0.2.2]
          - toml [required: Any, installed: 0.10.2]
        - python-dateutil [required: ~=2.7, installed: 2.8.1]
          - six [required: >=1.5, installed: 1.14.0]
        - python-slugify [required: >=1.2.6,<5.0, installed: 4.0.1]
          - text-unidecode [required: >=1.3, installed: 1.3]
        - pytz [required: >=2018.7, installed: 2019.3]
        - pyyaml [required: >=3.13,<5.4, installed: 5.3.1]
        - requests [required: >=2.20,<3.0, installed: 2.25.0]
          - certifi [required: >=2017.4.17, installed: 2020.4.5.1]
          - chardet [required: >=3.0.2,<4, installed: 3.0.4]
          - idna [required: >=2.5,<3, installed: 2.9]
          - urllib3 [required: >=1.21.1,<1.27, installed: 1.26.1]
        - tabulate [required: >=0.8.0,<1.0, installed: 0.8.7]
        - toml [required: >=0.9.4,<1.0, installed: 0.10.2]
        - urllib3 [required: >=1.24.3, installed: 1.26.1]
  • d

    DJ Erraballi

    11/12/2020, 8:40 AM
    I think the actual problem is that the prefect lib is conflicting with its own sub-dependencies.
    c
    • 2
    • 1
  • d

    DJ Erraballi

    11/12/2020, 8:41 AM
    Maybe I need to pin distributed, since the change to its cloudpickle requirement is recent?
  • r

    Robin

    11/12/2020, 10:53 AM
    Hey Prefect community, we have a somewhat philosophical question about our fundamental understanding of Prefect and orchestration (using Dask): how do you approach the infrastructure setup, i.e. selection of node size, number of workers, etc.? In a nutshell:
    • EKS vs Fargate vs others
    • fewer big nodes with a high number of workers vs. many smaller nodes with a lower number of workers
    • how many workers per node in general
    • how do you optimize between tasks, workers and nodes?
    What better questions could we ask? How do you think about and iterate over the infrastructure, Prefect and Dask setup, and so on? 🙂
    • 1
    • 1
  • j

    jamodes

    11/12/2020, 11:33 AM
    Hi all, when we register a flow, it always returns: Error while deploying flow: FileNotFoundError(2, "No such file or directory: 'prefect'").
    d
    • 2
    • 3
  • d

    Dor Menachem

    11/12/2020, 12:57 PM
    Is it possible to run an agent in the cloud and not on my local machine?
    r
    d
    • 3
    • 5
  • f

    Felipe Netto

    11/12/2020, 3:00 PM
    Hi guys! I'm new to Prefect and I'm wondering if it has some built-in task to interact with AWS EMR...
    d
    • 2
    • 2
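    I'm not aware of a built-in EMR task in this era of the library, so a hedged sketch wrapping boto3 in an ordinary task (cluster id and step definition are placeholders):
    import boto3
    from prefect import task, Flow

    @task
    def add_emr_step(cluster_id: str, step: dict) -> str:
        emr = boto3.client("emr")
        response = emr.add_job_flow_steps(JobFlowId=cluster_id, Steps=[step])
        return response["StepIds"][0]

    with Flow("emr-example") as flow:
        add_emr_step(
            "j-XXXXXXXX",
            {
                "Name": "example-step",
                "ActionOnFailure": "CONTINUE",
                "HadoopJarStep": {"Jar": "command-runner.jar", "Args": ["spark-submit", "my_job.py"]},
            },
        )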
  • w

    Walt Wells

    11/12/2020, 3:27 PM
    Long time listener, first time caller. Wondering if there are any best practices for teams using Prefect Cloud to ensure that their execution environment is up to date? Are there any known issues when Prefect Cloud moves to a new release and the execution environment a flow is running in doesn't move along with it? What is Prefect's philosophy on versioning/breaking changes?
    d
    • 2
    • 5
  • j

    Joël Luijmes

    11/12/2020, 3:33 PM
    With the schedule concept, can I also set a maximum deadline after which a missed flow run won't be run? I just tested something while my agent wasn't running, and now I have 40-ish jobs it will still run.
    ✅ 1
    d
    • 2
    • 2
d

Dylan

11/12/2020, 3:38 PM
Hi @Joël Luijmes! This sort of functionality is on our roadmap 👍
j

Joël Luijmes

11/12/2020, 3:50 PM
Cool, thanks 🙂