• jorwoods · 2 years ago
    What is necessary to get Prefect to use checkpointing? It seems something is missing from my simplified test code, but it's unclear what:
    from prefect import Flow, task, unmapped, Parameter
    from prefect.engine.results import LocalResult
    from prefect.engine.executors import LocalDaskExecutor
    import prefect
    
    lr = LocalResult(location='{flow_name}-{task_name}-{map_index}.txt')
    
    @task(log_stdout=True, checkpoint=True)
    def add(x, y):
        return x + y
    
    with Flow('iterated map', result=lr) as flow:
        y = unmapped(Parameter('y', default=10))
        mapped_result = add.map([1, 2, 3], y=y)
    
    flow.run(executor=LocalDaskExecutor())
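    As far as I know, checkpointing is off by default for local runs of Prefect Core, so a hedged sketch of what may be missing (assuming the standard PREFECT__-prefixed config override) is to enable it before prefect is imported:
    import os

    # Assumption: checkpointing defaults to off outside Cloud/Server, and
    # prefect reads this setting when it is first imported.
    os.environ["PREFECT__FLOWS__CHECKPOINTING"] = "true"

    import prefect  # import only after the override is in place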
    2 replies
  • jorwoods · 2 years ago
    One more question around checkpointing:
    target (str, optional): location to check for task Result. If a result
                exists at that location then the task run will enter a cached state.
                `target` strings can be templated formatting strings which will be
                formatted at runtime with values from `prefect.context`
    If I have a mapped task and I want it to create separate outputs per map, I see map_index in prefect.context, but that relies on my arguments being in the same order each time, correct? Is there a way to pass Parameter values into this target or the result's location kwarg so that, when I look at the directory containing my checkpoint files, I quickly know which ones have completed?
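    A hedged sketch of that idea, assuming parameter values are exposed at runtime under prefect.context.parameters and that target templates are formatted with prefect.context (file names here are illustrative):
    from prefect import Flow, Parameter, task, unmapped
    from prefect.engine.results import LocalResult

    # Assumption: the target template is rendered with prefect.context,
    # so {parameters[y]} and {map_index} resolve per mapped child.
    @task(
        checkpoint=True,
        result=LocalResult(dir="results"),
        target="add-y{parameters[y]}-{map_index}.txt",
    )
    def add(x, y):
        return x + y

    with Flow("templated targets") as flow:
        y = Parameter("y", default=10)
        add.map([1, 2, 3], y=unmapped(y))
    With a template like that, the checkpoint directory would contain names like add-y10-0.txt, making it easy to see which parameter/index combinations have completed.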
    6 replies
  • Crawford Collins · 2 years ago
    Hi, I'm trying to do a lot of work with mapped functions. How do I merge the results of 2 tasks together? For example, imputed_categorical_dfs returns two objects. I need to merge these with another task which does the same. My code, which does not work:
    transformed_df_map = merge_transformed_data.map(
            df1=[*imputed_categorical_dfs, *encoded_df_map],
            df2=[*imputed_numeric_dfs] + [*yeo_johnson_dfs],
        )
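    A hedged sketch of one workaround: at flow-build time task outputs are not real lists, so [*x, *y] unpacking cannot work; a small combining task (the concat name is hypothetical) defers the concatenation to runtime:
    from prefect import task

    # Hypothetical helper: concatenates two upstream results at runtime,
    # assuming each resolves to a list/tuple of DataFrames.
    @task
    def concat(a, b):
        return list(a) + list(b)

    df1s = concat(imputed_categorical_dfs, encoded_df_map)
    df2s = concat(imputed_numeric_dfs, yeo_johnson_dfs)
    transformed_df_map = merge_transformed_data.map(df1=df1s, df2=df2s)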
    3 replies
  • Ben Davison · 2 years ago
    Hiya, for Prefect Server: are there any statsd metrics that can be hooked into? Or any events we can use for monitoring Prefect internally?
    1 reply
  • Avi A · 2 years ago
    Hey there, I have a problem with prefect server. My setup used to work, but now something seems to be wrong, possibly with the version. I'm getting this error in the UI when running a flow:
    [3:18pm]: Exception raised while calling state handlers: HTTPError('400 Client Error: Bad Request for url: http://localhost:4200/graphql/alpha')
    Any idea on how to start debugging?
    37 replies
  • Rodrigo Neves · 2 years ago
    Hello guys, first time here and a recent user of Prefect. First of all, I have to say that so far I'm loving it; keep up the awesome work!! I need help composing a flow of several tasks with map. Currently I can do it, but each task is only processed after all tasks from the previous step are done. That is sub-optimal, since the branches of the pipeline (each .map()) are independent of each other. Is there any way of achieving that out of the box?
    def run_flow_step_0_map(self, df):
        cols = list(df.items())
        with Flow("pipeline_map") as flow:
            col = self.get_data.map(unmapped(self), cols)
            col = self.task1.map(unmapped(self), col)
            col = self.task2.map(unmapped(self), col)
            col = self.task3.map(unmapped(self), col)
            col = self.task4.map(unmapped(self), col)
            col = self.task5.map(unmapped(self), col)
            result = self.task5.map(unmapped(self), col)

        return flow
    (The explanation is a bit confusing; if you need extra info, just say so.)
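    For what it's worth, a hedged note: as far as I know, recent Prefect Core releases run mapped pipelines depth-first when the flow executes on a Dask-based executor, so each branch proceeds independently; a minimal sketch:
    from prefect.engine.executors import LocalDaskExecutor

    # Assumption: with a Dask-based executor, a mapped child can start as
    # soon as its own upstream child finishes, rather than waiting for the
    # entire previous .map() stage to complete.
    flow.run(executor=LocalDaskExecutor(scheduler="threads"))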
    18 replies
  • Simone Cittadini · 2 years ago
    Hi! I'm looking at the code and can't find my way around; can you kindly point me to the place where Flows are run on the server? I use eliot for logs, and I'd like flow runs to live in an action context. That's easily done locally by overriding the run method, but I can't find my way around the server code. (Coming from Airflow; love your work, it's a lifesaver!)
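    A minimal sketch of the local-override approach mentioned above (the EliotFlow name is hypothetical), wrapping each run in an eliot action context:
    from eliot import start_action
    from prefect import Flow

    class EliotFlow(Flow):
        # Hypothetical subclass: everything logged during the run
        # nests under this eliot action.
        def run(self, *args, **kwargs):
            with start_action(action_type="prefect:flow_run", flow=self.name):
                return super().run(*args, **kwargs)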
    4 replies
  • Radu · 2 years ago
    Hello everyone! I just joined this group, as I was assigned to do a POC with Prefect 🙂 I hope I'll get useful information reading through these channels. Cheers!
    2 replies
  • Mary Clair Thompson · 2 years ago
    Hi folks! I'm running prefect on a VM and having issues when the box reboots. The docker containers and prefect server restart just fine, but the agent doesn't, so scheduled jobs don't start. What's the best way to deal with this?
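    One hedged sketch for a systemd-based VM (unit name, paths, and agent type are illustrative): a service file that starts the agent on boot and restarts it if it dies:
    # /etc/systemd/system/prefect-agent.service (hypothetical path)
    [Unit]
    Description=Prefect agent
    After=network-online.target docker.service

    [Service]
    ExecStart=/usr/local/bin/prefect agent start
    Restart=always

    [Install]
    WantedBy=multi-user.target
    After installing it, something like systemctl enable --now prefect-agent should keep the agent running across reboots.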
    3 replies
  • Nelson · 2 years ago
    Hi everyone! We're trialling Prefect (so far, good docs and user experience); I'm just overwhelmed going from a single local environment to a multi-environment distributed setup (this will help a lot: https://github.com/PrefectHQ/prefect/issues/2508#issue-613607085 !). For parametrising resources, I tried the config.toml below to have different buckets per env:
    source_data_bucket = "${environments.${environment}.source_data_bucket}"
    transient_data_bucket = "${environments.${environment}.transient_data_bucket}"
    
    [environments]
    
        [environments.dev-nelson]
            source_data_bucket = "s3://REDACTED"
            transient_data_bucket = "s3://REDACTED"

        [environments.prod]
            source_data_bucket = "s3://REDACTED"
            transient_data_bucket = "s3://REDACTED"
    I can print prefect.config.transient_data_bucket inside a task, but when it's used as S3Result(bucket=prefect.config.transient_data_bucket) it fails with Invalid bucket name "". How are others doing this? Note I'm providing this result as a task config:
    @task(
        target="{date:%Y-%m-%d}/crunchbase-raw.cloudpickle",
        result=S3Result(bucket=prefect.config.transient_data_bucket),
    )
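    A hedged guess at the failure: the ${environments.${environment}.*} interpolation can only resolve if an environment key is set when prefect loads config.toml, and the decorator argument is evaluated at import time, so an unset key interpolates to an empty string. One sketch (assuming Prefect's standard config interpolation and PREFECT__-prefixed env-var overrides):
    # top of config.toml: select which [environments.*] table the
    # interpolations resolve against; if this key is missing, the
    # buckets interpolate to "" ("Invalid bucket name").
    environment = "dev-nelson"

    # alternatively, per process: export PREFECT__ENVIRONMENT=prod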
    11 replies