• sarika gowtham

    9 months ago
    #dask error: unable to reach the TCP address. "Timed out trying to connect to tcp://172.16.0.5:8786 after 30 s" while executing the following (a minimal sketch follows this entry):
    executor = DaskExecutor(address="tcp://127.0.0.1:8787")
    flow.run(executor=executor)
    sarika gowtham
    1 reply
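    A minimal sketch of this setup, assuming Prefect 1.x and a Dask scheduler that is actually reachable from the machine running the flow; note that 8786 is the default Dask scheduler port, while 8787 usually serves the dashboard, and the address here is a placeholder:

    from prefect import Flow, task
    from prefect.executors import DaskExecutor

    @task
    def say_hello():
        print("hello from a Dask worker")

    with Flow("dask-connect-sketch") as flow:
        say_hello()

    # Point the executor at the Dask *scheduler* endpoint (default port 8786).
    # A connection timeout usually means this address is not reachable from the
    # flow-run environment (wrong host/IP, firewall, or scheduler not running).
    executor = DaskExecutor(address="tcp://127.0.0.1:8786")
    flow.run(executor=executor)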
  • Zheng Xie

    9 months ago
    Zheng Xie
    Anna Geller
    3 replies
  • Zheng Xie

    9 months ago
    I have tried to follow the documentation and do some troubleshooting, but I can’t figure it out.
    Zheng Xie
    1 reply
  • Tilak Maddy

    9 months ago
    I want to know what exactly the Prefect agent pulls from storage. I'm asking because the main flow file is just
    with Flow(...) as flow:
        a = first_task()
        b = second_task()
    Say there are tasks that are defined in and imported from other files (which in turn call other tasks, and so on). There is no way I can look at just this main file and tell how the entire flow is going to look: what dependencies the tasks will have, what retries, etc. Basically, there isn't much information I can get from it. So what exactly are we doing here, and why? (A sketch of inspecting the assembled flow follows this entry.)
    Tilak Maddy
    Anna Geller
    29 replies
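    A sketch of one way to see what the assembled flow actually contains, assuming Prefect 1.x: once the with Flow(...) block has executed, the Flow object holds the full task graph regardless of which files the tasks were imported from, and that object (or the script that rebuilds it) is what a flow run loads from storage. The my_tasks module below is hypothetical.

    from prefect import Flow

    # Hypothetical module whose tasks may themselves import further tasks.
    from my_tasks import first_task, second_task

    with Flow("inspection-sketch") as flow:
        a = first_task()
        b = second_task(a)

    # The complete dependency graph lives on the Flow object:
    for t in flow.tasks:
        print(t.name, "max_retries:", t.max_retries)

    for edge in flow.edges:
        print(edge.upstream_task.name, "->", edge.downstream_task.name)

    # With graphviz installed, flow.visualize() renders the whole DAG.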
  • Ievgenii Martynenko

    9 months ago
    Hi, I have a task that should be executed every 30 minutes; sometimes the task runs longer than 30 minutes, and another instance of the task is started in parallel, which causes logical and performance issues. Is there any option, for non-cloud, not to start the same task again if the previous execution has not finished yet? (It seems that for Cloud this is possible using labels and a limit on the number of parallel executions.) A lock-based workaround is sketched after this entry.
    Ievgenii Martynenko
    Anna Geller
    +1
    11 replies
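    This is not a built-in feature of the open-source orchestrator; the following is a crude workaround sketch, assuming Prefect 1.x and that all runs share a filesystem. The lock-file path is a placeholder, and a crashed run can leave a stale lock behind.

    import os

    from prefect import Flow, task
    from prefect.engine.signals import SKIP

    LOCK_PATH = "/tmp/my_long_task.lock"  # hypothetical location

    def do_the_actual_work():
        pass  # placeholder for the real 30+ minute job

    @task
    def long_running_task():
        # Create the lock file exclusively; if it already exists, a previous
        # run is presumably still going, so skip instead of overlapping.
        try:
            fd = os.open(LOCK_PATH, os.O_CREAT | os.O_EXCL | os.O_WRONLY)
        except FileExistsError:
            raise SKIP("Previous execution still running; skipping this run.")
        try:
            do_the_actual_work()
        finally:
            os.close(fd)
            os.remove(LOCK_PATH)

    with Flow("no-overlap-sketch") as flow:
        long_running_task()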
  • Jason Motley

    9 months ago
    Is there a Prefect Cloud/UI equivalent of this? https://docs.prefect.io/core/idioms/flow-to-flow.html (A flow-of-flows sketch follows this entry.)
    Jason Motley
    Kevin Kho
    19 replies
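    A sketch of the flow-of-flows idiom from that page as it is typically used against a Cloud/Server backend, assuming Prefect 1.x; the child flows must already be registered, and the flow and project names here are placeholders:

    from prefect import Flow
    from prefect.tasks.prefect import StartFlowRun

    # Each StartFlowRun task triggers a registered child flow; wait=True makes
    # the parent wait until the child run reaches a finished state.
    start_child_a = StartFlowRun(flow_name="child-flow-a", project_name="my-project", wait=True)
    start_child_b = StartFlowRun(flow_name="child-flow-b", project_name="my-project", wait=True)

    with Flow("parent-flow") as parent:
        a = start_child_a()
        b = start_child_b(upstream_tasks=[a])  # run child B only after child A finishes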
  • John Shearer

    9 months ago
    Is it expected that running a local flow with PREFECT__FLOWS__CHECKPOINTING=false, but with checkpoint data present in the Prefect result directory, would read from those results? I would not expect this, but it is the current behaviour (on my machine ...). A minimal setup for reproducing this is sketched after this entry.
    John Shearer
    Anna Geller
    +1
    17 replies
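    For context, a minimal sketch of the kind of setup where this comes up, assuming Prefect 1.x with a LocalResult and a task target; the paths and names are placeholders, and how the target interacts with the checkpointing flag is exactly what is being asked:

    import os

    # The env var must be set before Prefect is imported
    # (or exported in the shell that launches the flow).
    os.environ.setdefault("PREFECT__FLOWS__CHECKPOINTING", "false")

    from prefect import Flow, task
    from prefect.engine.results import LocalResult

    @task(
        result=LocalResult(dir="~/.prefect/results"),  # placeholder result location
        target="produce_data.prefect",                 # placeholder target filename
    )
    def produce_data():
        return [1, 2, 3]

    with Flow("checkpointing-sketch") as flow:
        produce_data()

    if __name__ == "__main__":
        flow.run()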
  • Erik Schomburg

    9 months ago
    Hello! I’m trying to convert an existing script into a Prefect flow, and the task.map functionality doesn’t quite work as I had expected. I’m aggregating a bunch of data sources and rows into a DataFrame, and previously this had been sped up by running it over subsets of keys, with each subset running in a different process. So in Prefect, I have a task that splits the keys into subsets and then maps a task over those subsets. The problem is that there’s a small probability of failure on each subset, due to connection timeouts. I have added some retry logic, but I still want to make sure that successful sub-tasks have their results checkpointed and unsuccessful ones do not. But the results = task.map(subset=subsets) code instead just stores the results in a single file, and then does not re-run unsuccessful sub-tasks. I tried adding {map_index} to the task target filename pattern, but this did not work (update: it does work, I just had extra brackets, i.e., {{map_index}} instead of {map_index} 🤦). Here’s the basic flow:
    all_keys = get_keys_task()
    key_subsets = partition_keys_task(all_keys, n_subsets)
    data_subsets = get_data_task.map(keys=key_subsets)
    all_data = concatenate_subsets_task(data_subsets=data_subsets)
    I know I can work around this by writing my own utility to create a list of tasks with their own unique task names, but it seems like part of the point of .map ought to be to do this sort of results management for you... Any tips? Maybe there’s just a parameter in the prefect.task decorator or the task.map function that I don’t know about? (The {map_index} target fix is sketched after this entry.)
    Erik Schomburg
    Kevin Kho
    9 replies
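    What the update describes, as a minimal sketch assuming Prefect 1.x with a LocalResult: single braces around map_index in the target template give each mapped child its own checkpoint file, so only failed children are re-run. Task bodies, paths, and the retry settings below are placeholders.

    import datetime

    from prefect import Flow, task
    from prefect.engine.results import LocalResult

    @task
    def get_keys_task():
        return list(range(100))  # placeholder keys

    @task
    def partition_keys_task(keys, n_subsets):
        return [keys[i::n_subsets] for i in range(n_subsets)]

    # Single braces: the template is filled per mapped child at run time, so each
    # subset gets its own target file and successful children are not re-fetched.
    @task(
        max_retries=3,
        retry_delay=datetime.timedelta(seconds=30),
        result=LocalResult(dir="~/.prefect/results"),
        target="get_data-{map_index}.prefect",
    )
    def get_data_task(keys):
        return [{"key": k} for k in keys]  # placeholder fetch

    @task
    def concatenate_subsets_task(data_subsets):
        return [row for subset in data_subsets for row in subset]

    with Flow("map-checkpoint-sketch") as flow:
        all_keys = get_keys_task()
        key_subsets = partition_keys_task(all_keys, n_subsets=4)
        data_subsets = get_data_task.map(keys=key_subsets)
        all_data = concatenate_subsets_task(data_subsets=data_subsets)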
  • Tony Yun

    9 months ago
    Hello, we’ve recently noticed that the exception logs in production are printing out a personal home folder path. Do you know how this happened?
    @task(log_stdout=True)
    def test():
        print(f'current folder: {Path.cwd()}')
        raise Exception('test exception')
    
    Task 'test': Exception encountered during task execution!
    Traceback (most recent call last):
      File "/usr/local/lib/python3.7/site-packages/prefect/engine/task_runner.py", line 863, in get_task_run_state
        logger=self.logger,
      File "/usr/local/lib/python3.7/site-packages/prefect/utilities/executors.py", line 445, in run_task_with_timeout
        return task.run(*args, **kwargs)  # type: ignore
      File "/Users/tonyyun/github/data_operations/dbt_automation/flow.py", line 296, in test
    Exception: test exception
    where /Users/tonyyun/github shouldn’t be there ^
    Tony Yun
    Kevin Kho
    +1
    9 replies
  • Jeffery Newburn

    9 months ago
    We are using Prefect Cloud. We are curious how to get audit logs out of the server so we can push them to our monitoring system. Is there something that comes with the system we can use for exporting? Is there any documentation on this? (A GraphQL sketch for pulling run history follows this entry.)
    Jeffery Newburn
    Kevin Kho
    +1
    11 replies
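    Not a true audit-log export, but one hedged sketch of pulling run history out of the backend programmatically, assuming Prefect 1.x and its GraphQL API; the field names below are assumptions to be checked against the interactive API schema:

    from prefect import Client

    client = Client()  # uses the locally configured API token

    # Query recent flow runs and their states; a small exporter could run this on
    # a schedule and forward the rows to a monitoring system.
    query = """
    query {
      flow_run(limit: 20, order_by: {start_time: desc}) {
        id
        name
        state
        start_time
        end_time
      }
    }
    """

    result = client.graphql(query)
    for run in result["data"]["flow_run"]:
        print(run["id"], run["state"], run["start_time"])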