prefect-community

    Hui Zheng

    12/11/2020, 1:41 AM
    Hello, we have a task with a timeout limit. The timeout limit has been working; however, we had an incident today where the task ran way over its limit but didn't time out. We use a GKE cluster for the agent, and flow runs happen in Docker containers. Our Prefect version:
    prefecthq/prefect:0.13.15-python3.8
    The flow run: https://cloud.prefect.io/semios/flow-run/19fcc78a-9442-48ef-8228-a9c3db18e341 Please see the thread for more details.

    Michal Mucha

    12/11/2020, 12:21 PM
    Hi! Perhaps a silly question - I was so excited to see my first flow work out very well on Prefect Cloud that I went on to register a second one. What I found is that it archives and overwrites the first flow. When I try to register the first flow again, it again archives and overwrites. What am I doing wrong? I am creating the flows using the with: context manager, I am giving them different names and even separate "projects", and the flows come from different files. I went through the docs and couldn't find any note on this.

    Sébastien

    12/11/2020, 3:53 PM
    If I call time.sleep(x) right before with Flow():, will the run respect that sleep before spinning up the cluster, or does a run work differently (e.g. only run code inside the flow's task graph)?
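    For reference, a minimal sketch of the distinction in question (my own illustration, assuming Prefect 0.x semantics): module-level code runs when the script is executed or the flow is registered, while an orchestrated run executes only the task graph.
    import time
    from prefect import Flow, task

    @task
    def do_work():
        print("runs on every flow run")

    time.sleep(5)  # module-level: runs when this file is executed/registered,
                   # not when an agent later executes the flow

    with Flow("sleep-example") as flow:
        do_work()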

    jspeis

    12/11/2020, 6:20 PM
    Hello! I’ve been experimenting with Prefect locally and I’m interested in setting up a simple Docker/k8s configuration to boot up the server / agent in the cloud and point it to an external DB. Before I put something together, I just wanted to check whether anyone has already done something similar?

    Andrey Tatarinov

    12/11/2020, 7:02 PM
    Hi! I have an external API with pagination that returns a chunk of results per call plus a link to the next page of results. The total results (all chunks combined) are huge and do not fit in memory. Ideally I would make the task that pulls that API a generator, so that I can start mapping over its results before the pulling is complete. Is it possible to achieve that in Prefect?
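    One hedged workaround sketch (my assumption, since 0.x tasks can't be generators): if the pages can be addressed by number or offset up front, enumerate them in one cheap task and map a fetch-and-process task over them, so no single task ever holds the full result set. A strictly link-to-next-page API may not allow this.
    from prefect import Flow, task

    @task
    def get_page_count() -> int:
        # Hypothetical: many paginated APIs report a total count up front,
        # from which the number of pages can be derived.
        return 100

    @task
    def page_numbers(n: int) -> list:
        return list(range(n))

    @task
    def fetch_and_process(page: int):
        # Fetch one page and process it immediately, so the full result
        # set is never held in memory at once.
        ...

    with Flow("paginated-api") as flow:
        pages = page_numbers(get_page_count())
        fetch_and_process.map(pages)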

    Sanjay Patel

    12/11/2020, 10:50 PM
    Hi, I'm trying to understand how to use StartFlowRun as per Running dependent flows | Prefect Docs, but I can't get my head around the concept of passing parameters from one flow into the next. The 'Running a parametrized flow' example there doesn't use a flow's output as an input. I've done a few searches but can't find exactly what I am looking for. This is one example I was looking at, but it doesn't give enough detail:
    with Flow('first') as flow:
        a = task_a()
        b = task_b(a)
        c = task_c(b)
        StartFlowRun(flow_name='second', project_name=...)(parameters={"input": c})
    This is the closest to what I am trying to do, but the response ended with using state_result.
    with Flow('first') as flow:
        a = task_a()
        b = task_b(a)
        c = task_c(b)
        
    with Flow('second') as flow:
        param = Parameter('input')
        d = task_d(param)
        
    # How to do something like this:
    flow_a = StartFlowRun(flow_name="first", project_name="examples", wait=True)
    flow_b = StartFlowRun(flow_name="second", project_name="examples", wait=True)
    with Flow('total') as flow:
        a = flow_a()
        b = flow_b()(upstream_tasks=[a], parameters={'input': a.d})
    I have the option of changing one of my flows so it produces the output I need for the next flow. So, 2 questions: • how does the output of flow_a get referenced in flow_b • how do you access the parameter input that comes from flow_a in flow_b. Thank you!
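    A hedged sketch of one way to wire this up (assuming Prefect 0.x, where arguments containing task results become upstream dependencies automatically): call the StartFlowRun task inside the first flow and hand the computed value over via parameters.
    from prefect import Flow, task
    from prefect.tasks.prefect import StartFlowRun

    @task
    def task_a():
        return 1  # toy stand-ins for the tasks above

    @task
    def task_b(x):
        return x + 1

    @task
    def task_c(x):
        return x * 2

    start_second = StartFlowRun(flow_name="second", project_name="examples", wait=True)

    with Flow("first") as flow:
        c = task_c(task_b(task_a()))
        # c's runtime value becomes the "input" Parameter of flow "second"
        start_second(parameters={"input": c})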

    Aiden Price

    12/12/2020, 12:23 AM
    Hi folks, when I try to add an apply_map() to my flow I get this error:
    ValueError: Cycle found; flows must be acyclic!
    Unfortunately the stack trace doesn't provide any hints as to where I've accidentally created a cycle in the graph. I've done some comment-driven debugging and found that the problem is related to a case statement in the function I'm attempting to apply_map(). I've also run flow.visualize() and it does output a picture, but because it's a big flow it's hard to spot the cycle in there. Does anyone have any hints on how to better debug cycles in flow graphs? Thanks in advance.
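    Not an official recipe, but one small debugging aid: dump the flow's edge list, which can be easier to scan for the offending back-edge than a large visualize() graph.
    # `flow` is the already-built Flow object; look for a task that appears
    # both upstream and downstream of the apply_map'd section.
    for edge in flow.edges:
        print(edge.upstream_task, "->", edge.downstream_task)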

    Vincent

    12/12/2020, 2:49 AM
    Hi all, I was wondering if someone could help me identify why only part of my flow executes in parallel. As shown in this flow diagram, 5 of 8 tasks have been mapped, while 3 are still pending. I am sure that I started enough Dask worker nodes to handle the compute, but these jobs are stuck pending. Thanks for the advice!

    Andreas Jung

    12/14/2020, 11:04 AM
    Just started with Prefect; I wrote this example script to measure execution speed. The script contains 4 tasks, where only the "parse" task does some real processing on the data (taking less than 1 ms). However, the overall execution time of the whole flow is always between 1000 ms and 1500 ms. Why is this? Is there such a huge overhead in the underlying task scheduler/executor?
    import time
    from base64 import b64decode

    from prefect import Flow, task

    import reportparser


    @task
    def fetch_message():
        ts = time.time()
        report_data = b64decode(
            b"ew4AAAAAAADmAgAABgAAAQAAH+EH3QAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA"
            b"AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAIBPwgQAA4AAAAA"
        )
        print("fetch  ", time.time())
        print("fetch  ", time.time() - ts)
        return report_data


    @task
    def classify_message(msg):
        msg_type = 0
        ts = time.time()
        print("classify", time.time())
        print("classify", time.time() - ts)
        return msg_type


    @task
    def parse_message(msg, msg_type):
        ts = time.time()
        result = reportparser.parse(msg)
        print("parse  ", time.time())
        print("parse  ", time.time() - ts)
        return result


    @task
    def deliver_message(result):
        ts = time.time()
        print("deliver ", time.time())
        print("deliver ", time.time() - ts)


    def main():
        with Flow("reportparser") as flow:
            msg = fetch_message()
            msg_type = classify_message(msg)
            result = parse_message(msg, msg_type)
            deliver_message(result)

        for i in range(20):
            print()
            ts = time.time()
            flow.run()
            print("total", time.time() - ts)


    if __name__ == "__main__":
        main()
    Execution time (absolute time, relative time per task):
    fetch   1607943883.0552974
    fetch   0.0001933574676513672
    classify 1607943883.3690984
    classify 0.00015401840209960938
    parse   1607943883.6593442
    parse   0.0009167194366455078
    deliver 1607943883.9757109
    deliver 0.0001609325408935547
    total 1.3962466716766357

    Christian

    12/14/2020, 11:26 AM
    Hi there. Question about the new RunGreatExpectationsValidation ... I always get a FAIL state returned from executing it, and no artifacts page. Is it to be expected that only a successful validation produces the markdown artifacts page? I'm fighting with several aspects of my setup and it would be good to be able to inspect what the GE component is actually doing... Also, in general, I'd want to see the failed validation results... Thanks

    Adam

    12/14/2020, 12:05 PM
    Hi team, I would like to upgrade my Prefect Cloud to the paid version. It says I should ‘Contact Us’. Can someone help me?

    Marc Lipoff

    12/14/2020, 2:42 PM
    I'm getting -- what I am sure is -- a silly error. I'm trying to run a simple flow on ECS. I get this. Any ideas about what the cause is?

    Andrey Tatarinov

    12/14/2020, 7:14 PM
    Question about caching behaviour: I'm in the process of actively developing a certain flow. The flow runs with a K8s agent, packaged in Docker. There's one task that takes a lot of time. The task is decorated with result=GCSResult and cache_for=timedelta(hours=1). I notice that when I'm not rebuilding the Docker image, Prefect respects the cache, i.e. the second run goes much faster than the first. But it seems that each rebuild of the image invalidates the cache. Q: is that true? How can I get more insight into how caching works?
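    For reference, a minimal sketch of the setup being described (assuming Prefect 0.x's GCSResult and cache_for; the bucket name is hypothetical):
    from datetime import timedelta
    from prefect import task
    from prefect.engine.results import GCSResult

    @task(
        result=GCSResult(bucket="my-results-bucket"),  # hypothetical bucket
        cache_for=timedelta(hours=1),
    )
    def expensive_step():
        ...
    Whether an image rebuild should invalidate that cache depends on where the cache state is tracked (in-process vs. by the backend), which is worth confirming against the caching docs.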

    Will Milner

    12/14/2020, 8:24 PM
    Hi, I seem to be having trouble getting any stdout when using a docker agent. This should show something in the logs of the flow run, correct?
    task = ShellTask(return_all=True, log_stdout=True, log_stderr=True, stream_output=True)
    with Flow("test") as flow:
        print_test = task(command="echo hi")
    I expect to see hi in my prefect logs, but I see nothing. I have my logging level set to DEBUG.

    Levi Leal

    12/14/2020, 8:28 PM
    $ kubectl -n agent logs prefect-job-2d655498-sqqlc
    [2020-12-14 20:20:37+0000] INFO - prefect.S3 | Downloading slow-flow/2020-12-07t12-16-09-863598-00-00 from lime-prefect
    Unable to locate credentials
    Traceback (most recent call last):
      File "/usr/local/bin/prefect", line 8, in <module>
        sys.exit(cli())
      File "/usr/local/lib/python3.8/site-packages/click/core.py", line 829, in __call__
        return self.main(*args, **kwargs)
      File "/usr/local/lib/python3.8/site-packages/click/core.py", line 782, in main
        rv = self.invoke(ctx)
      File "/usr/local/lib/python3.8/site-packages/click/core.py", line 1259, in invoke
        return _process_result(sub_ctx.command.invoke(sub_ctx))
      File "/usr/local/lib/python3.8/site-packages/click/core.py", line 1259, in invoke
        return _process_result(sub_ctx.command.invoke(sub_ctx))
      File "/usr/local/lib/python3.8/site-packages/click/core.py", line 1066, in invoke
        return ctx.invoke(self.callback, **ctx.params)
      File "/usr/local/lib/python3.8/site-packages/click/core.py", line 610, in invoke
        return callback(*args, **kwargs)
      File "/usr/local/lib/python3.8/site-packages/prefect/cli/execute.py", line 34, in flow_run
        return _execute_flow_run()
      File "/usr/local/lib/python3.8/site-packages/prefect/cli/execute.py", line 99, in _execute_flow_run
        raise exc
      File "/usr/local/lib/python3.8/site-packages/prefect/cli/execute.py", line 84, in _execute_flow_run
        flow = storage.get_flow(storage.flows[flow_data.name])
      File "/usr/local/lib/python3.8/site-packages/prefect/environments/storage/s3.py", line 105, in get_flow
        self._boto3_client.download_fileobj(
      File "/usr/local/lib/python3.8/site-packages/boto3/s3/inject.py", line 678, in download_fileobj
        return future.result()
      File "/usr/local/lib/python3.8/site-packages/s3transfer/futures.py", line 106, in result
        return self._coordinator.result()
      File "/usr/local/lib/python3.8/site-packages/s3transfer/futures.py", line 265, in result
        raise self._exception
      File "/usr/local/lib/python3.8/site-packages/s3transfer/tasks.py", line 255, in _main
        self._submit(transfer_future=transfer_future, **kwargs)
      File "/usr/local/lib/python3.8/site-packages/s3transfer/download.py", line 340, in _submit
        response = client.head_object(
      File "/usr/local/lib/python3.8/site-packages/botocore/client.py", line 357, in _api_call
        return self._make_api_call(operation_name, kwargs)
      File "/usr/local/lib/python3.8/site-packages/botocore/client.py", line 662, in _make_api_call
        http, parsed_response = self._make_request(
      File "/usr/local/lib/python3.8/site-packages/botocore/client.py", line 682, in _make_request
        return self._endpoint.make_request(operation_model, request_dict)
      File "/usr/local/lib/python3.8/site-packages/botocore/endpoint.py", line 102, in make_request
        return self._send_request(request_dict, operation_model)
      File "/usr/local/lib/python3.8/site-packages/botocore/endpoint.py", line 132, in _send_request
        request = self.create_request(request_dict, operation_model)
      File "/usr/local/lib/python3.8/site-packages/botocore/endpoint.py", line 115, in create_request
        self._event_emitter.emit(event_name, request=request,
      File "/usr/local/lib/python3.8/site-packages/botocore/hooks.py", line 356, in emit
        return self._emitter.emit(aliased_event_name, **kwargs)
      File "/usr/local/lib/python3.8/site-packages/botocore/hooks.py", line 228, in emit
        return self._emit(event_name, kwargs)
      File "/usr/local/lib/python3.8/site-packages/botocore/hooks.py", line 211, in _emit
        response = handler(**kwargs)
      File "/usr/local/lib/python3.8/site-packages/botocore/signers.py", line 90, in handler
        return self.sign(operation_name, request)
      File "/usr/local/lib/python3.8/site-packages/botocore/signers.py", line 162, in sign
        auth.add_auth(request)
      File "/usr/local/lib/python3.8/site-packages/botocore/auth.py", line 357, in add_auth
        raise NoCredentialsError
    botocore.exceptions.NoCredentialsError: Unable to locate credentials
    Has anyone managed to make EKS Fargate work with Prefect agents?

    Levi Leal

    12/14/2020, 8:28 PM
    I'm using EKS to deploy a Prefect server. I had a nodegroup, and when I added one agent it worked fine. I could start flows, and the agent was picking them up and executing them. My flows are hosted on S3. I decided to add the agents to a Fargate profile so the k8s nodes are created on demand. I created a FargateProfile that targets the namespace 'agent', and when I deploy an agent, EKS creates a Fargate node for it. The Fargate agent is able to retrieve flows to be executed and create new Fargate nodes with jobs to execute the flow. My problem is that this new Fargate node is unable to connect to S3. The log on the job is above.

    Berty

    12/14/2020, 11:00 PM
    Hi Prefect(ers?), I stumbled across a piece of documentation I needed last week but can't seem to locate now. I have a task that returns a tuple, from which I only want to pass the first item to the task that consumes it. I recall seeing some way of telling Prefect how many results to expect. Can someone point me in the right direction? e.g.
    with Flow('...') as f:
        t = task(some_func)  # <-- returns tuple
        c = task(consume_func, keyword_tasks={'config': t[0]})
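    The documented feature being recalled here is, I believe, the task's nout argument; a minimal sketch (assuming Prefect >= 0.12):
    from prefect import Flow, task

    @task(nout=2)  # declare that this task returns a 2-tuple
    def some_func():
        return {"answer": 42}, "extra"

    @task
    def consume_func(config):
        print(config)

    with Flow("tuple-example") as f:
        config, extra = some_func()   # unpacking works once nout is set
        consume_func(config=config)   # indexing some_func()[0] also works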

    DJ Erraballi

    12/14/2020, 11:34 PM
    [2020-12-13 00:53:10] ERROR - prefect.CloudTaskRunner | Failed to set task state with error: ClientError([{'path': ['set_task_run_states'], 'message': 'Unable to complete operation', 'extensions': {'code': 'API_ERROR'}}],)
    Traceback (most recent call last):
      File "/usr/local/lib/python3.6/site-packages/prefect/engine/cloud/task_runner.py", line 123, in call_runner_target_handlers
        cache_for=self.task.cache_for,
      File "/usr/local/lib/python3.6/site-packages/prefect/client/client.py", line 1104, in set_task_run_state
        version=version,
      File "/usr/local/lib/python3.6/site-packages/prefect/client/client.py", line 226, in graphql
        raise ClientError(result["errors"])
    prefect.utilities.exceptions.ClientError: [{'path': ['set_task_run_states'], 'message': 'Unable to complete operation', 'extensions': {'code': 'API_ERROR'}}]
    (the same error and traceback are repeated once more by the root logger)
    [2020-12-13 00:53:10] INFO - prefect.CloudTaskRunner | Task 'aggregate_patients': finished task run for task with final state: 'ClientFailed'
    I ran into this yesterday; it seemed intermittent, but is this due to Prefect Cloud being down?

    jars

    12/15/2020, 1:29 AM
    Hello, are there plans for Prefect.io to allow the restoration of an archived flow? This doesn't seem to be an option in the UI (please let me know if I'm missing it!). Some context: we deployed a flow into our development environment last week that ran successfully locally but then failed in the cloud. If this had happened in our production flows, we would have had a broken flow until the issue was successfully debugged locally and resolved (this could take hours). If we could just have a flow "rollback" feature in the UI, that would be great. An aside: the issue came down to two things: (1) some project dependencies, and (2) an experimental Docker feature called gRPC FUSE on Mac computers that needed to be turned off.

    Hui Zheng

    12/15/2020, 2:21 AM
    Hello, could someone explain more about Version Locking? Is it saying that it enforces the assertion that a submitted/triggered flow run will run once and only once?
    https://docs.prefect.io/orchestration/concepts/flows.html#toggle-version-locking

    Robert Bastian

    12/15/2020, 3:08 PM
    Hi Prefect Community. I have a question about the "right" way to do things in Prefect. Here is an example task using Postgres and Secrets. It works, but it doesn't seem "correct" to me, since both PostgresFetch and PrefectSecret are subclasses of Task. Any advice would be appreciated!
    def get_record_count():
        logger = prefect.context.get("logger")
        pg = PostgresFetch(host='hostname.us-east-1.rds.amazonaws.com', db_name='testdatabase', user='test')
        pg.query = "select count(*) from testdatabase.test_table"
        secret = PrefectSecret('TEST_PASSWORD')
        password = secret.run()
        result = pg.run(password=password)
        return result[0]
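    For comparison, a hedged sketch of the shape the task library expects (assuming Prefect 0.x): instantiate the tasks once and call them inside a flow context, instead of calling .run() by hand inside another task.
    from prefect import Flow
    from prefect.tasks.postgres import PostgresFetch
    from prefect.tasks.secrets import PrefectSecret

    fetch_count = PostgresFetch(
        host="hostname.us-east-1.rds.amazonaws.com",
        db_name="testdatabase",
        user="test",
        query="select count(*) from testdatabase.test_table",
    )

    with Flow("record-count") as flow:
        password = PrefectSecret("TEST_PASSWORD")   # resolved at runtime
        record_count = fetch_count(password=password)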

    Rolf Schick

    12/15/2020, 3:21 PM
    Hi. I’m new to Prefect and am trying to deploy a GreatExpectations environment into my Docker/Kubernetes deployment. Unfortunately, the only way I have found to get files into the container so far is the “files” parameter of the “Docker” storage class. Filling my container this way seems a bit odd to me. Is there another way I can just COPY my files into my container environment?
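    One hedged alternative (assuming your Prefect version's Docker storage supports the dockerfile argument): build the flow image from your own Dockerfile, which can COPY whatever the container needs (e.g. a great_expectations/ directory). The registry and paths below are hypothetical.
    from prefect.environments.storage import Docker

    flow.storage = Docker(
        registry_url="registry.example.com/team",  # hypothetical registry
        dockerfile="docker/Dockerfile",            # your own Dockerfile with COPY steps
    )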

    Shawn Marhanka

    12/15/2020, 5:11 PM
    Hi Prefect! On Prefect Cloud, is there a limit to how many runs you can have in a scheduled state, or queued up to be run by the next available agent (we have 2 slots)? My co-worker tried to queue 80 runs last night and said not all of them made it. Thank you!

    Raghav Thirumulu

    12/15/2020, 6:00 PM
    Hi everyone! I'm attempting to get dbt shell tasks running via Prefect; attaching my code below. Does anyone have ideas on what I'm getting wrong? I can confirm that the profiles.yml was created correctly, and I'm able to run dbt commands with the newly created profiles.yml outside of the Prefect package.

    Hui Zheng

    12/15/2020, 6:23 PM
    Hello, is there a way to set a maximum deadline for a flow, such that regardless of retries or long-running tasks, the total duration of the flow cannot exceed a certain amount of time? Once exceeded, the flow run would be marked as FAIL.

    Matt Drago

    12/16/2020, 12:00 AM
    Hey folks, I'm looking for some guidance on using the GCSResult. I have Prefect running in a Kubernetes cluster, using KubernetesRun with Docker storage. I've created the GCSResult using:
    result = GCSResult(
        bucket='REDACTED',
        location='flow-results/{flow_name}/{date:%Y/%m/%d}/{task_name}_{task_run_id}.prefect'
    )
    And assigned the result to the flow with:
    with Flow("Redacted", schedule=schedule, result=result) as flow:
    The weird thing is that the bucket name is being used to store the result files, but the location template I provided is not. One thing I noticed: for one task (a function with the @task decorator), I passed in an argument named date, and that task did have its results stored in the location according to the template.
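    One hedged thing to try (an assumption on my part, not a confirmed fix): attach the templated Result at the task level rather than only flow-wide, and test whether the template is honored there.
    from prefect import task
    from prefect.engine.results import GCSResult

    gcs_result = GCSResult(
        bucket="REDACTED",
        location="flow-results/{flow_name}/{date:%Y/%m/%d}/{task_name}_{task_run_id}.prefect",
    )

    @task(result=gcs_result)  # per-task instead of (or in addition to) flow-level
    def my_task():
        ...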

    Marc Lipoff

    12/16/2020, 1:50 AM
    What IAM permissions are needed for an ECS agent? A complete list would be great, rather than trying to go one by one. ATM, I'm getting this error:
    An error occurred (UnauthorizedOperation) when calling the DescribeVpcs operation: You are not authorized to perform this operation.

    Amanda Wee

    12/16/2020, 3:13 AM
    Hi friends, a question on memory usage. Currently, my team is only using Prefect Core, which we set up as a bunch of ECS tasks (each corresponding to what Prefect Cloud/Server calls a project) that run the flows using flow.run() with a schedule. It works, but we found that the memory consumed by the flows was not released after the flow ended. As a workaround, we kill the ECS task after detecting that all the flows for that task have been run for the day, and then ECS kindly restarts the task for us with the memory usage reset. Not ideal, but the hack works so that subsequent flows for the day can reuse the memory released to the EC2 instance. In the coming few days, we're transitioning to Prefect Server using a single local agent with multiple projects, with both the agent and flows (since the agent is local) in one ECS task. At startup, the ECS task will create the projects and register the flows, then start the local agent. Eventually we aim to move to an ECS agent with Fargate, but for now: will we experience memory hogging with this setup?

    mithalee mohapatra

    12/16/2020, 3:27 AM
    I am trying to run the LocalDaskExecutor with a Prefect schedule but am getting the error below. Just flow.run() works fine with the schedule, though.
    if __name__ == '__main__':
        schedule = Schedule(clocks=[IntervalClock(start_date=pendulum.datetime(2020, 12, 16, hour=2, minute=14, second=0), interval=timedelta(minutes=2))])
        # schedule.next(5)
    with Flow("Hello", schedule) as flow:
        say_hello()
    executor = LocalDaskExecutor(scheduler="processes", num_workers=6)
    flow.run(executor=executor)
    The error:
    with Flow("Hello", schedule) as flow: NameError: name 'schedule' is not defined

Chris White

12/16/2020, 6:01 AM
Hi @mithalee mohapatra - please use triple backticks to format your code so we can better interpret the indentation levels; this appears to be a Python issue, not a Prefect one — you need to ensure that you assign the schedule variable within the same scope as where you define your flow
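A minimal sketch of that fix (my own illustration, assuming the 0.x import paths), with the schedule defined at module level in the same scope as the flow:
from datetime import timedelta

import pendulum
from prefect import Flow, task
from prefect.schedules import Schedule
from prefect.schedules.clocks import IntervalClock
from prefect.engine.executors import LocalDaskExecutor

@task
def say_hello():
    print("Hello, world!")

# Defined at module level, right next to the flow, so the name is always visible
schedule = Schedule(clocks=[IntervalClock(
    start_date=pendulum.datetime(2020, 12, 16, hour=2, minute=14, second=0),
    interval=timedelta(minutes=2),
)])

with Flow("Hello", schedule=schedule) as flow:
    say_hello()

if __name__ == "__main__":
    flow.run(executor=LocalDaskExecutor(scheduler="processes", num_workers=6))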

mithalee mohapatra

12/16/2020, 7:00 AM
Thanks Chris. I fixed the scope. But now when I run my flow with the LocalDaskExecutor and a schedule, my tasks are not running. Here is my code:
@task
def say_hello():
    print("Hello, world!")
    logger = prefect.context.get("logger")
    logger.info("Hello, world!")

schedule = Schedule(clocks=[IntervalClock(start_date=pendulum.datetime(2020, 12, 16, hour=2, minute=14, second=0), interval=timedelta(minutes=2))])

with Flow("Hello", schedule) as flow:
    say_hello()

executor = LocalDaskExecutor(scheduler="processes", num_workers=6)
flow.run(executor=executor)
The LocalDaskExecutor() works fine without the schedule. I was wondering if the LocalDaskExecutor would work with the Prefect Schedule; the DaskExecutor is working fine with the Schedule.
Thanks, it's working for me now.