Powered by Linen
prefect-community
  • m

    Matt Drago

    10/27/2020, 7:38 AM
    Hey folks, I'm looking for an example ELT (or similar) flow that extracts data from an RDBMS, where each subsequent run of the extraction task picks up where the previous run left off. I've come across States and Caching and Persisting Data in the doco. Would I be able to store the position that the extract got up to in one of these, or is the Result object the right thing to use, interrogating the Result of the last successful run of the Task?
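For readers skimming the archive, the "pick up where the previous run left off" idea can be sketched independently of Prefect as a high-water mark kept between runs. This is only an illustration under stated assumptions: the `events` table, its `id` column, and the JSON state file are all hypothetical, and in a real flow the stored position might live in a persisted task result instead.

```python
import json
import sqlite3
from pathlib import Path


def load_watermark(state_file: Path) -> int:
    """Return the last extracted id, or 0 if no run has completed yet."""
    if state_file.exists():
        return json.loads(state_file.read_text())["last_id"]
    return 0


def save_watermark(state_file: Path, last_id: int) -> None:
    """Persist the position reached by the most recent extraction."""
    state_file.write_text(json.dumps({"last_id": last_id}))


def extract_new_rows(conn: sqlite3.Connection, state_file: Path) -> list:
    """Fetch rows added since the previous run and advance the watermark."""
    last_id = load_watermark(state_file)
    # Hypothetical table and column names, used only for this sketch.
    rows = conn.execute(
        "SELECT id, payload FROM events WHERE id > ? ORDER BY id", (last_id,)
    ).fetchall()
    if rows:
        save_watermark(state_file, rows[-1][0])
    return rows
```

Calling `extract_new_rows` twice in a row without new inserts returns an empty list the second time, which is the behaviour the question is after.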
    j
    • 2
    • 2
  • j

    John Grubb

    10/27/2020, 12:37 PM
    Hello documentation team. I may have just solved the puzzle of why sometimes the doc site will not load the page I'm requesting from the sidebar nav. I think this only happens when I've had the particular page open for a while - like since before the last docs rebuild and deploy maybe? You're using some JS client framework to serve the site and the link or content that is requested once the site is rebuilt goes stale if I haven't reloaded the page. Does this sound plausible?
    👀 4
    j
    • 2
    • 2
  • j

    Jasono

    10/27/2020, 9:08 PM
    Hi, quick question about `prefect.context`. In my config.toml, a context variable is defined like `foo = "202010"`. In the task definition, it's used like `year = prefect.context.foo[:4]`, but it causes an `int object is not subscriptable` error. Is there a way to make it work without doing `str(foo)[:4]`?
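The error itself is plain Python behaviour: an `int` cannot be sliced, so whatever the reason the value arrives as an integer, a cast to `str` before slicing avoids it. A minimal sketch, assuming the value has already been read into a variable (the helper name `first_four` is hypothetical and nothing here touches Prefect's config loading):

```python
def first_four(value) -> str:
    """Return the first four characters of value, casting to str first."""
    return str(value)[:4]


raw = 202010  # assumption: the context value showed up as an int

try:
    raw[:4]  # slicing an int raises TypeError
except TypeError as exc:
    error = str(exc)

year = first_four(raw)
```

The same helper also accepts the value when it does arrive as a string, so the task body does not need to know which type it received.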
    j
    • 2
    • 9
  • h

    Hui Zheng

    10/27/2020, 10:44 PM
    Hello folks, I need some help with running a dynamic set of tasks in sequence (not in parallel). In my flow, I have a `task_A`, which returns a `list_A`. I want to create one `task_B_i` for each item i in `list_A`. However, there is a catch: those task_Bs need to be executed in sequence, that is, `task_B_0` executes first, then `task_B_1` follows, and then `task_B_2` follows. If a task_B_i fails, all the subsequent task_Bs shall be skipped. Because those task_Bs can NOT be executed in parallel, it seems I cannot use `map()`.
    Each `task_B_i` returns an output (a string), `item_b`. So, after all task_Bs execute, I have a `list_B` of task_B_i outputs. I then need a `task_c`, which takes `list_B`, the results of all task_Bs, to carry out some work, which is like a reduce operation over the upstream task_Bs. I tried something like the code below, but it obviously fails, and I have no better ideas for now.
    with Flow("dynamic tasks in sequence"):
      list_A = task_A()
      for item_a in list_A:
        item_b = task_B(item_a)
    it failed with error message
    File "/app/flow.py", line 736, in <module>
        for item_a in list_A:
    TypeError: 'FunctionTask' object is not iterable
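The `TypeError` arises because, inside the flow definition, `list_A` is still a task object rather than a list. One possible way around it is to keep the loop out of the flow definition and run it inside the body of a single task, where the list is a real value. The sketch below shows only that loop in plain Python; `run_in_sequence` and `reduce_outputs` are hypothetical names, and wrapping them as tasks is left out.

```python
from typing import Callable, Iterable, List


def run_in_sequence(items: Iterable[str], step: Callable[[str], str]) -> List[str]:
    """Apply step to each item in order.

    An exception raised by step ends the loop, so later items are never
    processed, which mirrors "skip all subsequent task_Bs on failure".
    """
    outputs = []
    for item in items:
        outputs.append(step(item))
    return outputs


def reduce_outputs(outputs: List[str]) -> str:
    """Stand-in for task_c: combine the outputs of every task_B."""
    return ",".join(outputs)
```

The trade-off is visibility: the individual steps no longer appear as separate tasks, so per-item state and retries are lost in exchange for strict ordering.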
    c
    • 2
    • 14
  • d

    Dean Magee

    10/28/2020, 3:35 AM
    Hi all, I need to trigger the same flow with 40+ different sets of parameters. Currently I'm doing the following...
    for unique_client in function_that_returns_list_of_40_plus_parameter_dicts():
        flow.run(parameters=unique_client)
    Surely there is a better way to run the same flow with different parameters each time. Am I missing something?
    c
    • 2
    • 3
  • r

    Ralph Willgoss

    10/28/2020, 9:37 AM
    Hi, I'd be interested in chatting to anyone who's used prefect in a case where they have large amounts of intermediary data (> 1GB -> 10GB+), that needs passing between tasks
    m
    a
    • 3
    • 26
  • s

    Simo Tumelius

    10/28/2020, 12:08 PM
    Hi! I have an ETL flow that utilizes mapping to process each file in a separate branch. The transformation is done in multiple steps and in one of the steps I need to be able to access the input arguments of an upstream task. Example: extract_file -> apply_transformation_1 -> apply_transformation_2 -> apply_transformation_3 (here I need to access input arguments of apply_transformation_1) -> load. Does Prefect offer some built-in functionality for implementing this kind of a "task context" or do I need to create a custom class that contains task arguments and its results and is passed between tasks? Thanks in advance!
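The "custom class" option mentioned in the question could look roughly like the sketch below: each step returns a small record holding both its result and the arguments earlier steps were called with. Everything here is an assumption made for illustration (the `StepRecord` class, the `factor` argument, and the arithmetic); it says nothing about what Prefect offers built in.

```python
from dataclasses import dataclass, field
from typing import Any, Dict


@dataclass
class StepRecord:
    """Hypothetical container passed between steps: the current value plus
    the arguments each earlier step was called with."""
    value: Any
    inputs: Dict[str, Dict[str, Any]] = field(default_factory=dict)


def apply_transformation_1(record: StepRecord, factor: int) -> StepRecord:
    """Multiply the value and remember the argument that was used."""
    inputs = {**record.inputs, "apply_transformation_1": {"factor": factor}}
    return StepRecord(value=record.value * factor, inputs=inputs)


def apply_transformation_3(record: StepRecord) -> StepRecord:
    """Read back the argument given to apply_transformation_1."""
    factor = record.inputs["apply_transformation_1"]["factor"]
    return StepRecord(value=record.value + factor, inputs=record.inputs)
```

Because the record travels with the data, each mapped branch keeps its own copy of the upstream arguments.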
    j
    • 2
    • 2
  • v

    Vincent

    10/28/2020, 3:28 PM
    Hi. I was wondering if anyone has experience launching ephemeral dask clusters with hybrid worker specs, i.e. launching a set of workers with a GPU-tagged resource alongside regular compute nodes. Right now we are developing a pipeline which combines some CPU-intensive tasks with GPU tasks. The Dask task affinity tags seem like a key component of this implementation, but I don't think that the DaskKubernetesEnvironment supports workers with different specs. Any suggestions on this front?
    m
    j
    j
    • 4
    • 6
  • y

    Youssef

    10/28/2020, 5:06 PM
    Hello Community, I'm new to Prefect and I need your help on some newbie matters 😔 I have a local server with Prefect 0.13 running on it. Everything is running smoothly so far; the only issue I'm facing is the local agent launch. It's bound to the shell window and I cannot launch it in the background (I've tried some methods like adding & or &>dev/null ..) but nothing seems to be working. Do you have any idea or param I can use to ensure the agent will stay up & running after I close the shell window? Thanks a lot community 👍
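One general-purpose way to keep a long-running command alive after the launching shell closes is to start it in its own session with its output redirected to a file. The sketch below does that from Python; it is not a Prefect feature, `launch_detached` is a hypothetical helper, and the `start_new_session` behaviour described applies to POSIX systems.

```python
import subprocess
import sys
from typing import List


def launch_detached(command: List[str], log_path: str) -> subprocess.Popen:
    """Start command in a new session with output appended to log_path.

    On POSIX, start_new_session=True detaches the child from the
    controlling terminal, so closing the shell does not stop it.
    """
    with open(log_path, "ab") as log_file:
        return subprocess.Popen(
            command,
            stdin=subprocess.DEVNULL,
            stdout=log_file,
            stderr=subprocess.STDOUT,
            start_new_session=True,
        )
```

In use, `command` would be whatever agent start command is currently typed into the shell, split into a list of arguments.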
    p
    • 2
    • 3
  • p

    Philip MacMenamin

    10/28/2020, 5:16 PM
    is there an easy way to look up the current flow ID from within a flow?
    j
    c
    m
    • 4
    • 8
  • j

    josh

    10/28/2020, 6:05 PM
    Hey team, Prefect version 0.13.13 has been released and here are a few notable changes:
    ⏰ `FlowRunTask` can now schedule runs in the future
    🤝 Upgrades to Azure tasks
    🛠️ Fixes to stdout logging
    🌐 Added option for `networkMode` in Fargate Agent
    🗃️ Fixed a caching bug
    A big thank you to our contributors who helped out with this release! Full changelog:
    Untitled
    🚀 9
    👍 3
    :marvin: 2
  • m

    Maura Drabik

    10/28/2020, 6:22 PM
    Hello, I'm trying to register an updated flow to the cloud using Prefect version 0.12.3
    scm_validation.register(
        project_name="SCM Validation",
        build=True
    )
    it failed with this error message:
    prefect.utilities.exceptions.ClientError: [{'path': ['project'], 'message': 'field "project" not found in type: \'query_root\'', 'extensions': {'path': '$.selectionSet.project', 'code': 'validation-failed', 'exception': {'message': 'field "project" not found in type: \'query_root\''}}}]
    j
    • 2
    • 8
  • c

    Cody Vandervoort

    10/28/2020, 8:03 PM
    Hello everyone, I was hoping someone could give me some insight into whether what I'm trying to accomplish is possible with Prefect. I want to automate capturing images/screenshots of currency pair charts on TradingView on a time schedule, somewhere between once every hour and once every day.
    a
    u
    • 3
    • 4
  • h

    Hui Zheng

    10/28/2020, 9:56 PM
    I just submitted a minor change request regarding ShellTask, asking that it log all stdout when `ShellTask(return_all=True)` is used:
    https://github.com/PrefectHQ/prefect/issues/3584
    m
    • 2
    • 1
  • j

    Jeremy Knickerbocker

    10/28/2020, 10:29 PM
    Hi Everyone, can anyone give me some pointers as to why a flow that is in the default project runs just fine, but a flow in another project does not? I can schedule via the CLI or GUI, but the flow always says "Scheduled" but it is showing up late. I am using Prefect Core Server and have a local agent running.
    j
    • 2
    • 4
  • h

    Hui Zheng

    10/29/2020, 1:19 AM
    I successfully ran a flow locally. However, when I deploy the flow to prefect-cloud, I get this warning message at the `healthcheck` step.
    /opt/prefect/healthcheck.py:149: UserWarning: Task <Task: fetch_runnable_models> has retry settings but some upstream dependencies do not have result types. See <https://docs.prefect.io/core/concepts/results.html> for more details.
      result_check(flows)
    please see the thread for more details
    c
    • 2
    • 9
  • m

    Marwan Sarieddine

    10/29/2020, 3:24 AM
    Hi folks, I just updated our kubernetes agent and dask kubernetes execution environment to use prefect 0.13.3, but after around 20 minutes of running a flow I am getting this traceback:
    <Failed: "Unexpected error: AttributeError("partially initialized module 'prefect' has no attribute 'schedules' (most likely due to a circular import)")">
    wondering if anyone has encountered this before ?
    c
    • 2
    • 2
  • z

    Zhiguo Yuan

    10/29/2020, 4:44 AM
    We have ~ 8 flows. All the jobs failed with "Failed after exceeding scheduled work SLA."
  • z

    Zhiguo Yuan

    10/29/2020, 4:44 AM
    Any hint on how to solve it?
    c
    • 2
    • 2
  • n

    Newskooler

    10/29/2020, 9:32 AM
    Hi :prefect: When I run a fairly simple Flow (get data -> check if it exists -> save to a couple of places) and map it so that the mapped tasks need to run 15k+ times, why does it take over 60 min for the mapped tasks to start? I guess it's expected behaviour (I am running a single-worker Dask executor), but I want to understand why this delay happens, in the hope of optimizing it a bit. Thanks : )
    d
    n
    • 3
    • 19
  • p

    psimakis

    10/29/2020, 10:22 AM
    Hello everyone, Is there any way to apply a schedule filter to a specific clock? Check out the following example:
    schedules.Schedule(
        clocks=[
            clocks.IntervalClock(timedelta(hours=1)), # clock 0: fire every hour
            clocks.IntervalClock(timedelta(hours=5)), # clock 1: fire every five hours
        ],
        # but only on weekdays for the second clock
        filters=[filters.is_weekday_clock_1]
    )
    I know that filters are applied at the schedule level, but it could be very handy to apply filtering at the clock level. Is there any (even hacky) way to achieve this? If it is not possible, is there any way to provide more than one schedule to a flow? Thanks in advance!
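To make the request concrete, the behaviour being asked for (a filter attached to one clock only) can be sketched outside Prefect as two event streams, one of them filtered, merged in time order. This is an illustration of the desired semantics, not of Prefect's schedule API; all function names and the `clock_0`/`clock_1` labels are hypothetical.

```python
import heapq
from datetime import datetime, timedelta
from typing import Iterator, Tuple


def interval_events(start: datetime, interval: timedelta) -> Iterator[datetime]:
    """Yield start, start + interval, start + 2 * interval, ... indefinitely."""
    current = start
    while True:
        yield current
        current += interval


def is_weekday(dt: datetime) -> bool:
    """True for Monday through Friday."""
    return dt.weekday() < 5


def merged_events(start: datetime) -> Iterator[Tuple[datetime, str]]:
    """Merge an unfiltered hourly clock with a weekday-only five-hour clock."""
    hourly = ((t, "clock_0") for t in interval_events(start, timedelta(hours=1)))
    five_hourly = (
        (t, "clock_1")
        for t in interval_events(start, timedelta(hours=5))
        if is_weekday(t)  # the filter applies to this clock only
    )
    # heapq.merge is lazy, so it works with these endless generators.
    return heapq.merge(hourly, five_hourly)
```

Each event is tagged with the clock that produced it, so a consumer can tell which clock fired even when both land on the same timestamp.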
    r
    n
    • 3
    • 12
  • c

    Christian

    10/29/2020, 12:21 PM
    Hi 👋 I'm currently trying to get prefect server running on my remote workstation (connected via VPN). However, it seems that a local agent is not discovered by the GUI (zero agents)... Would any of you kind souls have time for a little hand-holding to get me started? I upgraded to the newest version, switched the backend, and started the server and an agent. I see the UI at ip-address:8080 but no agents... I also see this in the UI (which cannot be good):
    Couldn't connect to Prefect Server at <http://localhost:4200/graphql>
    d
    n
    • 3
    • 17
  • c

    Clemens

    10/29/2020, 3:21 PM
    Hi everyone, I am trying to connect prefect to a local dask cluster to test out distributed calculation of a flow that I want to register to a dask cluster in the cloud later. I followed the tutorial https://docs.prefect.io/core/advanced_tutorials/dask-cluster.html and that works great as long as everything is defined in one python file. But if I want to import any function from a second file, running it results in a `ModuleNotFoundError`. Does anybody have any experience with how Dask handles these imports?
    m
    n
    • 3
    • 6
  • а

    Алексей Филимонов

    10/29/2020, 4:11 PM
    Hey Folks! We would like to use prefect as our main workflow manager. However, it is not completely clear how library versioning is managed. Let's say I would like to use an old version of pandas in one flow and a new one in another. Is there any way to integrate different venv's into each flow? How do you solve this problem?
    a
    n
    m
    • 4
    • 7
  • b

    Brian Mesick

    10/29/2020, 5:35 PM
    Hi all, we’ve run into an issue a few times and I’m curious if anyone can help me figure out where we are going wrong. We’ve deployed a few flows to Prefect Cloud, but whenever we try to add RETRY logic using an S3Result the container seems to fail to start, gets Lazarus kicked 3 times and dies with no other logs.
    n
    • 2
    • 12
  • n

    Nuno

    10/29/2020, 5:45 PM
    Hello guys. I need to make a class method callable into a task object. I was trying to use the utility `as_task` but, even with globally scoped functions, it isn’t working as expected. Current code while testing:
    import prefect
    from prefect import task, Flow
    from prefect.utilities.tasks import as_task
    
    
    def hello_world():
        logger = prefect.context.get("logger")
        logger.info("Hello, World!")
    
    
    flow = Flow("hello-flow", tasks=[as_task(hello_world)])
    
    
    if __name__ == "__main__":
        flow.run()
    Does anyone have a suggestion? Thank you.
    m
    • 2
    • 2
  • g

    George Coyne

    10/29/2020, 6:33 PM
    Has anybody stood up a docker agent on windows pulling from GCR?
    n
    d
    • 3
    • 9
  • g

    George Coyne

    10/29/2020, 6:34 PM
    prefect agent start docker -t TOKEN_HERE -l local_agent --volume $Env:GOOGLE_APPLICATION_CREDENTIALS:/home/service_account.json --env GOOGLE_APPLICATION_CREDENTIALS=/home/service_account.json
    
     ____            __           _        _                    _
    |  _ \ _ __ ___ / _| ___  ___| |_     / \   __ _  ___ _ __ | |_
    | |_) | '__/ _ \ |_ / _ \/ __| __|   / _ \ / _` |/ _ \ '_ \| __|
    |  __/| | |  __/  _|  __/ (__| |_   / ___ \ (_| |  __/ | | | |_
    |_|   |_|  \___|_|  \___|\___|\__| /_/   \_\__, |\___|_| |_|\__|
                                               |___/
    
    [2020-10-29 17:58:56,091] INFO - agent | Starting DockerAgent with labels ['local_agent']
    [2020-10-29 17:58:56,091] INFO - agent | Agent documentation can be found at https://docs.prefect.io/orchestration/
    [2020-10-29 17:58:56,092] INFO - agent | Agent connecting to the Prefect API at https://api.prefect.io
    [2020-10-29 17:58:56,211] INFO - agent | Waiting for flow runs...
    [2020-10-29 17:59:22,897] INFO - agent | Found 1 flow run(s) to submit for execution.
    [2020-10-29 17:59:23,071] INFO - agent | Deploying flow run 7c642664-2e1d-4599-a0a2-c4c4f9496579
    [2020-10-29 17:59:23,072] INFO - agent | Pulling image gcr.io/GCLOUD_PROJECT/flows/internal-data/aggasetl2:2020-10-27t22-42-21-305568-00-00...
    [2020-10-29 17:59:24,540] ERROR - agent | Logging platform error for flow run 7c642664-2e1d-4599-a0a2-c4c4f9496579
    [2020-10-29 17:59:24,846] ERROR - agent | Error while deploying flow: APIError(HTTPError('500 Server Error: Internal Server Error for url: http+docker://localnpipe/v1.40/images/create?tag=2020-10-27t22-42-21-305568-00-00&fromImage=gcr.io%2FPROJECT%2Fflows%2Finternal-data%2Faggasetl2'))
  • e

    Edison A

    10/29/2020, 9:13 PM
    I hope I'm posting in the right place. I need help on how to call a class method as a task. Check 👇 the thread for the Code and StackTrace.
    m
    • 2
    • 14
  • a

    Arthur Duarte

    10/29/2020, 9:38 PM
    Hi, my first message here 🙂. I am new to Prefect, and I was able to run my flow from my local PC (Windows, python 3.8). Now I am trying to build it to a Docker Storage, and I am getting this error during the health check:
    raise NotImplementedError("cannot instantiate %r on your system"
    NotImplementedError: cannot instantiate 'WindowsPath' on your system
    Any ideas?
    n
    m
    p
    • 4
    • 8
full error message is here:
Step 14/14 : RUN python /opt/prefect/healthcheck.py '["/opt/prefect/flows/project01-reports.prefect"]' '(3, 8)'
 ---> Running in 52f09f649e73
Beginning health checks...
System Version check: OK
Traceback (most recent call last):
  File "/opt/prefect/healthcheck.py", line 147, in <module>
    flows = cloudpickle_deserialization_check(flow_file_paths)
  File "/opt/prefect/healthcheck.py", line 40, in cloudpickle_deserialization_check
    flows.append(cloudpickle.load(f))
  File "/usr/local/lib/python3.8/pathlib.py", line 1043, in __new__
    raise NotImplementedError("cannot instantiate %r on your system"
NotImplementedError: cannot instantiate 'WindowsPath' on your system

Removing intermediate container 52f09f649e73
The command '/bin/sh -c python /opt/prefect/healthcheck.py '["/opt/prefect/flows/project01-reports.prefect"]' '(3, 8)'' returned a non-zero code: 1
Traceback (most recent call last):
  File "flows/project01/project01_flow.py", line 400, in <module>
    my_flow.register("project01")
  File "D:\Programas\py_temp\prefect\lib\site-packages\prefect\core\flow.py", line 1620, in register
    registered_flow = client.register(
  File "D:\Programas\py_temp\prefect\lib\site-packages\prefect\client\client.py", line 734, in register
    serialized_flow = flow.serialize(build=build)  # type: Any
  File "D:\Programas\py_temp\prefect\lib\site-packages\prefect\core\flow.py", line 1458, in serialize
    storage = self.storage.build()  # type: Optional[Storage]
  File "D:\Programas\py_temp\prefect\lib\site-packages\prefect\environments\storage\docker.py", line 351, in build
    self._build_image(push=push)
  File "D:\Programas\py_temp\prefect\lib\site-packages\prefect\environments\storage\docker.py", line 417, in _build_image
    raise ValueError(
ValueError: Your docker image failed to build!  Your flow might have failed one of its deployment health checks - please ensure that all necessary files and dependencies have been included.
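The traceback shows the failure happening while the pickled flow is loaded inside the Linux container, at the point where a concrete `WindowsPath` would have to be recreated. A possible workaround, sketched here under the assumption that the flow or its tasks hold `pathlib.Path` objects created on Windows, is to store paths as plain strings before the flow is serialized. The `to_portable` helper and the `config` dictionary are hypothetical, and the standard-library `pickle` stands in for `cloudpickle`.

```python
import pickle
from pathlib import PureWindowsPath


def to_portable(path) -> str:
    """Convert a Windows-style path to a plain forward-slash string."""
    return PureWindowsPath(path).as_posix()


# Hypothetical configuration built on a Windows machine.
config = {"report_dir": to_portable(r"D:\reports\project01")}

# Plain strings can be unpickled on any operating system, whereas a
# concrete WindowsPath can only be instantiated on Windows.
restored = pickle.loads(pickle.dumps(config))
```

Whether the converted string is meaningful inside the container is a separate question; the sketch only addresses the deserialization error.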
n

nicholas

10/29/2020, 9:39 PM
Hi @Arthur Duarte, welcome! 👋
This sounds like it's related to the way Core is pickling your flow as it's trying to build - I'm going to open an issue on the Core repo because they'll be better able to triage this.
🙌 1
@Marvin open "Issue with Docker storage - Cannot instantiate 'WindowsPath' on your system"
m

Marvin

10/29/2020, 9:43 PM
https://github.com/PrefectHQ/prefect/issues/3595
a

Arthur Duarte

10/29/2020, 9:45 PM
Thanks Nicholas!
p

Pedro Machado

11/06/2020, 9:07 PM
Hi Nicholas. Do you know if the team has looked into this? Are there any workarounds you can think of?
n

nicholas

11/06/2020, 9:55 PM
Hi @Pedro Machado - no, I'm not sure. If you're seeing the same issue, could you also post on the ticket to bump up its priority for the Core team?