  • Luis Muniz

    2 years ago
    Hi, we are trying to use register() to push a flow to an ECR (Amazon) private registry, but we can't figure out how Prefect picks up the credentials to do it. We know that with the command line, docker build and docker push need a prior docker login via the aws ecr get-login... command, but we don't know how to pass this on to Prefect for register(). We feel like we're missing a step.
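    A minimal sketch of the setup in question, assuming Prefect 0.x Docker storage and that register() builds and pushes with the local Docker daemon, so an ECR login must already exist in the shell that calls it (registry URL, image name, and project name below are hypothetical):
    # Log in first, e.g.:
    #   aws ecr get-login-password --region eu-west-1 | \
    #       docker login --username AWS --password-stdin 123456789012.dkr.ecr.eu-west-1.amazonaws.com
    from prefect import Flow, task
    from prefect.environments.storage import Docker  # moved to prefect.storage in later releases

    @task
    def hello():
        print("hello from an ECR-stored flow")

    with Flow("ecr-example") as flow:
        hello()

    # register() uses this storage to build the image and push it to the registry,
    # reusing whatever credentials the local Docker daemon already has.
    flow.storage = Docker(
        registry_url="123456789012.dkr.ecr.eu-west-1.amazonaws.com",
        image_name="ecr-example",
        image_tag="latest",
    )
    flow.register(project_name="my-project")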
    Dylan +3 · 35 replies
  • Dan Ball

    2 years ago
    Hi all! I'm experimenting with porting an old Luigi DAG to Prefect and wondering what the most idiomatic approach would be. (I asked a similar question on the Airflow Slack because I'm trying to wrap my head around the differences in approach.) My current process:
    • Parses the HTML of a webpage every hour and fetches a timestamp value indicating the last time a linked file was updated
    • If the ETL DAG has not been run for that timestamp, downloads the file, does some processing, uploads to S3, etc.
    • If it has been run successfully for that timestamp, does nothing and checks again in another hour.
    Do I want to create a DAG that ends with a task that creates a Result target on success, and begin the DAG with a task that checks for the existence of that target? Or is there a more Prefecty approach to checking whether the DAG has been successfully run with a given parameter (in this case, a fetched external timestamp) and skipping it if so?
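    One hedged sketch of a Prefect-style answer, assuming the "already processed" check is a simple lookup (the lookup below is a placeholder): raise SKIP when the fetched timestamp has already been handled, and downstream tasks are skipped automatically:
    from prefect import Flow, task
    from prefect.engine.signals import SKIP

    def already_processed(ts: str) -> bool:
        # Placeholder: in practice, check for an S3 key, a Result target, or a DB row.
        return False

    @task
    def fetch_remote_timestamp() -> str:
        # Placeholder: parse the page and return its "last updated" timestamp.
        return "2020-07-01T00:00:00"

    @task
    def skip_if_already_done(ts: str) -> str:
        if already_processed(ts):
            raise SKIP(f"{ts} already processed")
        return ts

    @task
    def download_and_upload(ts: str):
        print(f"downloading, processing, and uploading file for {ts}")

    with Flow("hourly-etl") as flow:
        ts = fetch_remote_timestamp()
        checked = skip_if_already_done(ts)
        download_and_upload(checked)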
    Dylan · 3 replies
  • Benjamin

    2 years ago
    Hello everyone. I'm running a POC with Prefect on a very simple flow: read data from S3, standardize numbers, and write the result back to S3. For the moment we're trying the local backend server (not the Prefect Cloud version) and we're seeing strange behavior: everything runs fine with flow.run(executor=executor) but not with flow.register(). We're using FargateCluster to create a new cluster for the flow; the cluster is set up properly with flow.register, but no processing is done. I asked about this here a couple of days ago, but now I have a reproducible example. I'll provide the details in this thread. Thanks a lot.
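    For reference, a sketch of the kind of setup being described, assuming Prefect 0.14+ where an executor can be attached to the flow before registering (earlier releases configured this on the flow's environment); cluster sizing and the FargateCluster module path depend on the dask-cloudprovider version:
    from prefect import Flow, task
    from prefect.executors import DaskExecutor

    @task
    def standardize():
        print("read from S3, standardize numbers, write back to S3")  # placeholder

    with Flow("fargate-poc") as flow:
        standardize()

    # flow.run(executor=...) only applies to that local call; register() stores what is
    # attached to the flow object, so the executor has to live on the flow itself.
    flow.executor = DaskExecutor(
        cluster_class="dask_cloudprovider.aws.FargateCluster",
        cluster_kwargs={"n_workers": 2},  # hypothetical sizing
    )
    flow.register(project_name="poc")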
    Chris White · 23 replies
  • Bob Colner

    2 years ago
    Hey hey, I'm a bit stumped trying to implement a data ingestion pattern in Prefect. Is it possible to access task parameter values when templating the result 'target' filename? I'm trying to have my results stored in a directory structure (aka 'hive' dir partitioning). Thanks for any insight.
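    A sketch of the pattern being asked about, assuming the running Prefect version templates target strings from prefect.context (which includes the flow's parameters at runtime); the result location and parameter names are hypothetical:
    from prefect import Flow, Parameter, task
    from prefect.engine.results import LocalResult

    # The target is formatted at runtime, so parameter values can build a hive-style path.
    @task(
        checkpoint=True,
        result=LocalResult(dir="./results"),
        target="symbol={parameters[symbol]}/date={parameters[date]}/data.parquet",
    )
    def ingest(symbol: str, date: str):
        return f"rows for {symbol} on {date}"  # placeholder payload

    with Flow("hive-partitioned-ingest") as flow:
        symbol = Parameter("symbol", default="AAPL")
        date = Parameter("date", default="2020-07-01")
        ingest(symbol, date)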
    Dylan +1 · 22 replies
  • Greg Desmarais

    2 years ago
    Hi all, newcomer to Prefect - happy so far. I'm a bit stuck in a Dask-integrated Prefect environment. I have a simple flow that looks something like this:
    from datetime import datetime

    from distributed import get_worker
    from prefect import Flow, Parameter, task

    @task(log_stdout=True)
    def say_hello(name):
        print(f'{datetime.now()}: workflow hello {name}', flush=True)
        worker = get_worker()
        return f'done on {worker.name}, scheduler at {worker.scheduler.address}'

    name = Parameter('name')
    with Flow("Simple parallel hello") as flow:
        # Since there is no return value dependency, we end up with possible parallel operations
        for i in range(10):
            say_hello(name)
    If I run the flow from my script, targeting a particular Dask cluster, I can hit the right Dask workers:
    executor = DaskExecutor(address=dask_scheduler_address)
    flow.run(executor=executor)
    My question is about registering this flow and running it, say, from the Prefect UI. I can easily register the flow with:
    flow.register()
    But then trying to run it from the UI just hangs. I'm pretty sure it's because the executor isn't registered with the flow. Am I missing something? Thanks in advance...
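    A sketch of the usual answer, continuing from the flow above and assuming Prefect 0.14+ where the executor can be attached to the flow object (earlier releases configured it on the flow's environment); the scheduler address and project name are hypothetical:
    from prefect.executors import DaskExecutor

    # flow.run(executor=...) only applies to that local call. register() serializes the
    # flow, so the executor must be stored on the flow for UI-triggered runs to use it.
    flow.executor = DaskExecutor(address="tcp://dask-scheduler:8786")
    flow.register(project_name="hello-project")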
    Chris White +1 · 7 replies
  • Adrien Boutreau

    2 years ago
    Hello! I have an issue in production: GraphQL seems to be down. When I call xxx.eu-west-2.compute.amazonaws.com:4200/graphql I get a connection refused on port 4200, and when I check docker container ls it's not in the list. Do you know how I can restart it? Postgres is also down.
    8 replies
  • Vitor Avancini

    2 years ago
    Hello everyone, does anyone have an idea how to write a GraphQL query to mark upstream tasks as successful? What I'm trying to do here is run a DAG from a specific task forward. I had a bug in my code, fixed it, registered a new version, and now I have to rerun the DAG. The thing is that a lot of already-successful tasks would run again, and it takes almost an hour to do so.
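    Not a GraphQL query, but a sketch of the same idea through the Python client, assuming Client.set_task_run_state is available in the running release (its signature has varied); the task-run ID and version below are hypothetical:
    from prefect import Client
    from prefect.engine.state import Success

    client = Client()

    # Mark an already-correct upstream task run as Success so a restart resumes
    # from the task that actually needs to run again.
    client.set_task_run_state(
        task_run_id="11111111-2222-3333-4444-555555555555",  # hypothetical ID from the UI
        version=1,  # version locking; may not be required in every release
        state=Success(message="manually marked successful"),
    )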
    2 replies
  • Bob Colner

    2 years ago
    Question about mapping tasks - is it possible for a task to be 'mapped' over multiple iterables? (Think of a task with 2 parameters where I want to 'map' over both with a list of param values.)
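    A sketch of how mapping handles multiple mapped arguments (they are zipped element-wise; a cross product has to be expanded first, and unmapped() pins a constant); names are illustrative:
    from itertools import product

    from prefect import Flow, task, unmapped

    @task
    def combine(x, y):
        return f"{x}:{y}"

    xs = [1, 2, 3]
    ys = ["a", "b", "c"]
    pairs = list(product(xs, ys))  # expand the cross product ahead of time

    with Flow("multi-map") as flow:
        # Two mapped arguments are zipped pairwise: (1, "a"), (2, "b"), (3, "c").
        zipped = combine.map(xs, ys)

        # Mapping over the expanded product covers every combination.
        crossed = combine.map([p[0] for p in pairs], [p[1] for p in pairs])

        # unmapped() keeps one argument fixed while the other is mapped.
        pinned = combine.map(xs, unmapped("constant"))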
    12 replies
  • Bob Colner

    2 years ago
    Hi all, I'm working on some custom Prefect serializers for pandas DataFrame results. I have something that seems to work, but I don't think it is very robust (it requires creating a local file).
    from io import BytesIO
    from pathlib import Path
    from time import time_ns

    import pandas as pd
    from prefect.engine.serializers import Serializer

    class ParquetSerializer(Serializer):

        def serialize(self, value: pd.DataFrame) -> bytes:
            # transform a Python object into bytes
            tmp_filename = str(time_ns()) + '.parquet'
            value.to_parquet(
                path=tmp_filename,
                index=False
            )
            with open(tmp_filename, 'rb') as in_file:
                df_bytes = in_file.read()
            Path(tmp_filename).unlink()
            return df_bytes

        def deserialize(self, value: bytes) -> pd.DataFrame:
            # recover a Python object from bytes
            df_bytes_io = BytesIO(value)
            df = pd.read_parquet(df_bytes_io)
            return df
    Does anyone have thoughts about the above approach (saving to a local file and then reading the bytes back from it)?
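    For comparison, a sketch of an in-memory variant, assuming the installed pandas/pyarrow combination accepts a file-like object in to_parquet (recent versions do), which avoids the temporary file entirely:
    from io import BytesIO

    import pandas as pd
    from prefect.engine.serializers import Serializer

    class InMemoryParquetSerializer(Serializer):
        """Serialize the DataFrame straight into an in-memory buffer."""

        def serialize(self, value: pd.DataFrame) -> bytes:
            buffer = BytesIO()
            value.to_parquet(buffer, index=False)  # write parquet bytes into the buffer
            return buffer.getvalue()

        def deserialize(self, value: bytes) -> pd.DataFrame:
            return pd.read_parquet(BytesIO(value))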
    Jeremiah +1 · 14 replies
  • Bob Colner

    2 years ago
    Hi again, I wanted to mention that I've successfully replaced Airflow with Prefect in production for all our ETL/BI and ML workflows at Instasize.com! Big thanks for all the help/support along the way. We ended up building a Google Cloud-centric solution around Prefect Core. We did look into using Prefect Cloud (I like the dashboard/features, and cost is not an issue) but had friction with the implementation at this time. If anyone on the team is interested, I'm happy to provide feedback about our experience. Thanks again!
    Jeremiah · 3 replies