• Mike Nerone

    2 years ago
    Greetings! I have an existing app that is in essence an ETL app, but it currently runs in one process. I now wish to scale it out, and I’m hoping to use Prefect to do that. My question is this, and please forgive me if this is a n00b question: the result set that ultimately comes out of the Extract is potentially far more than will fit in memory, so it’s currently implemented as an (async) generator that makes paginated requests so it can yield chunks to the Transform. It doesn’t seem as if this pattern is currently supported by Prefect, so can you give me any advice on “The Prefect Way” to handle a large-data problem like this? My first thought was that instead of being a generator, the Extract task could itself kick off a parametrized flow for each chunk of data, but that seems like a lot of overhead for potentially thousands (or even tens of thousands) of chunks (and I’m not positive a task can do that anyway). Is there perhaps some other mechanism I might use to stream data between longer-running tasks?
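    One possible "Prefect Way", as a hedged sketch (the task names and sink here are hypothetical, not from the thread): have a cheap task enumerate chunk descriptors (page numbers or offsets) instead of yielding the data itself, then map the extract and transform over those descriptors so each chunk is fetched and processed in its own task run, in parallel under a suitable executor:

    from prefect import Flow, task, unmapped

    @task
    def list_pages():
        # Return lightweight descriptors (page numbers), not the data itself.
        return list(range(100))

    @task
    def extract_page(page):
        ...  # paginated request that fetches only this page

    @task
    def transform_and_load(chunk, target):
        ...  # transform one chunk and write it to the sink

    with Flow("chunked-etl") as flow:
        pages = list_pages()
        chunks = extract_page.map(pages)
        transform_and_load.map(chunks, target=unmapped("my-sink"))

    This keeps per-task memory bounded to one chunk, at the cost of one task run per chunk rather than a single streaming generator.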
    11 replies
  • Mac Gréco Péralte Chéry

    2 years ago
    Hello everyone! Can you modify a [mapped] task name at runtime? I am doing a project where I do web scraping to retrieve data for each institution (hospital) on a website. Instead of
    [2020-07-30 16:24:11] INFO - prefect.TaskRunner | Task 'Scraping Institution Data[0]': Starting task run...
    I would like to have
    [2020-07-30 16:24:11] INFO - prefect.TaskRunner | Task 'Scraping Institution Data[0] [hospital name]': Starting task run...
    I think this would be great when you visualize the flow run in the UI: you could tell from the task name which hospital the web scraping failed for, without going to the logs. P.S. Currently I am logging the hospital name to know which hospital's data is being scraped:
    import prefect
    from prefect import task

    @task(name="Scraping Institution Data")
    def scrape_institution_data(instActionIdTuple):
        logger = prefect.context.get("logger")
        logger.info(f"Scraping for site: {instActionIdTuple[0]}")
    So at runtime I get:
    [2020-07-30 16:24:11] INFO - prefect.TaskRunner | Task 'Scraping Institution Data[0]': Starting task run...
    [2020-07-30 16:24:11] INFO - prefect.Scraping Institution Data[0] | Scraping for site: Hôspital Claire Heureuse de Marchand Dessalines
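    For reference, a hedged sketch of one way to get there: recent Prefect 0.x releases accept a task_run_name argument on @task, a template that is formatted with the task's inputs at runtime (it only takes effect when running against a backend API such as Cloud or Server, where the UI displays it):

    from prefect import task

    # Sketch only: the template is filled from the task's inputs, so each
    # mapped child run can carry its hospital identifier in its name.
    @task(
        name="Scraping Institution Data",
        task_run_name="Scraping Institution Data [{instActionIdTuple[0]}]",
    )
    def scrape_institution_data(instActionIdTuple):
        ...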
    3 replies
  • Amit

    2 years ago
    Did anyone have timezone issues while running a flow with Docker storage built from a custom Dockerfile? (This is run on Kubernetes, via the Kubernetes Agent.) I get the following error while running a flow:
    Traceback (most recent call last):
      File "/opt/conda/envs/my_project/bin/prefect", line 10, in <module>
        sys.exit(cli())
      File "/opt/conda/envs/my_project/lib/python3.7/site-packages/click/core.py", line 829, in __call__
        return self.main(*args, **kwargs)
      File "/opt/conda/envs/my_project/lib/python3.7/site-packages/click/core.py", line 782, in main
        rv = self.invoke(ctx)
      File "/opt/conda/envs/my_project/lib/python3.7/site-packages/click/core.py", line 1259, in invoke
        return _process_result(sub_ctx.command.invoke(sub_ctx))
      File "/opt/conda/envs/my_project/lib/python3.7/site-packages/click/core.py", line 1259, in invoke
        return _process_result(sub_ctx.command.invoke(sub_ctx))
      File "/opt/conda/envs/my_project/lib/python3.7/site-packages/click/core.py", line 1066, in invoke
        return ctx.invoke(self.callback, **ctx.params)
      File "/opt/conda/envs/my_project/lib/python3.7/site-packages/click/core.py", line 610, in invoke
        return callback(*args, **kwargs)
      File "/opt/conda/envs/my_project/lib/python3.7/site-packages/prefect/cli/execute.py", line 50, in cloud_flow
        client = Client()
      File "/opt/conda/envs/my_project/lib/python3.7/site-packages/prefect/client/client.py", line 82, in __init__
        self._access_token_expires_at = pendulum.now()
      File "/opt/conda/envs/my_project/lib/python3.7/site-packages/pendulum/__init__.py", line 211, in now
        dt = _datetime.datetime.now(local_timezone())
      File "/opt/conda/envs/my_project/lib/python3.7/site-packages/pendulum/tz/__init__.py", line 60, in local_timezone
        return get_local_timezone()
      File "/opt/conda/envs/my_project/lib/python3.7/site-packages/pendulum/tz/local_timezone.py", line 35, in get_local_timezone
        tz = _get_system_timezone()
      File "/opt/conda/envs/my_project/lib/python3.7/site-packages/pendulum/tz/local_timezone.py", line 63, in _get_system_timezone
        return _get_unix_timezone()
      File "/opt/conda/envs/my_project/lib/python3.7/site-packages/pendulum/tz/local_timezone.py", line 242, in _get_unix_timezone
        raise RuntimeError("Unable to find any timezone configuration")
    RuntimeError: Unable to find any timezone configuration
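    One workaround worth trying, as a sketch: per the traceback, pendulum is falling back to reading the system timezone configuration, which a minimal custom image may not contain, so giving the container an explicit TZ sidesteps the lookup. (Installing tzdata or writing /etc/localtime in the custom Dockerfile should achieve the same.)

    from prefect.environments.storage import Docker

    # Sketch: set TZ on the Docker storage so pendulum can resolve a
    # timezone inside the container (other Docker() arguments omitted).
    storage = Docker(env_vars={"TZ": "UTC"})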
    7 replies
  • Skip Breidbach

    2 years ago
    Small oddity here. I've been working with a Docker agent and "Non-Docker Storage for Containerized Environments". I was having some issues getting the container to pull from S3, but added an env var to my container and all was well. Then I read further, saw the "Authentication for using Cloud Storage with Containerized Environments" section, and thought I'd try that, essentially adding secrets to the flow storage. But when I do, I get a surprising (to me) error when the flow is executed:
    Failed to load and execute Flow's environment: HTTPError('400 Client Error: Bad Request for url: http://host.docker.internal:4200/graphql')
    It looks to me like the container thinks it's using the cloud backend or something? Any ideas?
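    For context, a hedged sketch of what "adding secrets to the flow storage" looks like in 0.x (the bucket name is hypothetical):

    from prefect.environments.storage import S3

    # AWS_CREDENTIALS is the Prefect Secret the S3 client looks up at
    # flow-run time; it must be resolvable by whichever backend the run uses.
    storage = S3(bucket="my-flow-bucket", secrets=["AWS_CREDENTIALS"])

    One possibility consistent with the error: secrets attached to storage are fetched through the backend API at runtime, so a run pointed at a local Prefect Server (which does not support cloud Secrets) may fail exactly that GraphQL call.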
    4 replies
  • Michael Ludwig

    2 years ago
    We are doing our first experiments with mapping. In theory we like it, but in practice we saw some inexplicable behaviour and would be curious whether we are doing something wrong or something is broken. Our flows have integrated skip functionality (from before cached states and Results existed): when I re-run the "same" flow (a new flow run on the same input data as before), already-computed tasks raise a SKIP signal. That works great. When we add mapped tasks, we suddenly see 1) old tasks which were already computed being re-executed, and 2) mapped tasks failing instead of skipping when the parent was skipped. For 1) we still need to work out how we produced it, but for 2) I have this screenshot:
  • Michael Ludwig

    2 years ago
    So both parent tasks in that screenshot raise a SKIP signal, and the task that should be mapped complains that it had no states to map over. This is true, but it should be skipped as well, since its parents are skipped. Any ideas?
    prefect==0.12.2
  • m

    Michael Ludwig

    2 years ago
    This is how we apply map:
    bucket_prefix = create_bucket_prefix_rfy()
    prediction_single_files = list_files(s3_folder=prediction)
    return load_rfy.map(
        input_data_location=prediction_single_files,
        bucket_prefix=unmapped(bucket_prefix),
    )
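    For reference, a sketch of the knob that normally governs this (shown for orientation, not as a fix for the mapping behaviour above): SKIP propagation is controlled per task by skip_on_upstream_skip, which defaults to True, so a child whose upstream tasks all skip should itself skip rather than fail:

    from prefect import task

    # Default behaviour made explicit: if upstream tasks raise SKIP,
    # this task is skipped too instead of running (or failing).
    @task(skip_on_upstream_skip=True)
    def load_rfy(input_data_location, bucket_prefix):
        ...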
    4 replies
  • psimakis

    2 years ago
    Hello community, I just wanted to let you know that preview.prefect.io is not responding. If you are already aware of this issue, then ignore this message 😄 Wishes.
    3 replies
  • Chris Martin

    2 years ago
    Hi - if you have common code that should be shared between flows (say, creation of a common resource, or a custom task type), what's the best way to lay it out? I'm currently putting it in a separate library and making sure it's available in the Docker base image, but that feels a bit suboptimal: the only point of this code is to be used by the flows, so ideally it would live alongside them. Is there something better I can do here?
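    One layout to consider, as a sketch (all paths hypothetical): keep the shared package in the same repo as the flows and have Docker storage copy it into the image at build time, instead of baking it into a separately released base image:

    from prefect.environments.storage import Docker

    # Sketch: ship the shared package with the flow image. If your Prefect
    # version's `files` argument only accepts individual files (not
    # directories), list the module files one by one instead.
    storage = Docker(
        files={"/repo/common": "/opt/shared/common"},  # local code -> image
        env_vars={"PYTHONPATH": "/opt/shared"},        # make `import common` work
    )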
    2 replies