Hi everyone, I am getting the following error, when trying to write a pandas dataframe into google ...

Felix Vemmer

over 4 years ago
Hi everyone, I am getting the following error when trying to write a pandas DataFrame into Google Cloud Storage:
Unexpected error: TypeError("__init__() got an unexpected keyword argument 'client_options'")
Traceback (most recent call last):
  File "/Users/felixvemmer/.pyenv/versions/3.8.6/envs/automation_beast/lib/python3.8/site-packages/prefect/engine/runner.py", line 48, in inner
    new_state = method(self, state, *args, **kwargs)
  File "/Users/felixvemmer/.pyenv/versions/3.8.6/envs/automation_beast/lib/python3.8/site-packages/prefect/engine/task_runner.py", line 891, in get_task_run_state
    result = self.result.write(value, **formatting_kwargs)
  File "/Users/felixvemmer/.pyenv/versions/3.8.6/envs/automation_beast/lib/python3.8/site-packages/prefect/engine/results/gcs_result.py", line 77, in write
    self.gcs_bucket.blob(new.location).upload_from_string(binary_data)
  File "/Users/felixvemmer/.pyenv/versions/3.8.6/envs/automation_beast/lib/python3.8/site-packages/prefect/engine/results/gcs_result.py", line 41, in gcs_bucket
    client = get_storage_client()
  File "/Users/felixvemmer/.pyenv/versions/3.8.6/envs/automation_beast/lib/python3.8/site-packages/prefect/utilities/gcp.py", line 53, in get_storage_client
    return get_google_client(storage, credentials=credentials, project=project)
  File "/Users/felixvemmer/.pyenv/versions/3.8.6/envs/automation_beast/lib/python3.8/site-packages/prefect/utilities/gcp.py", line 31, in get_google_client
    client = Client(project=project, credentials=credentials)
  File "/Users/felixvemmer/.pyenv/versions/3.8.6/envs/automation_beast/lib/python3.8/site-packages/google/cloud/storage/client.py", line 122, in __init__
    super(Client, self).__init__(
TypeError: __init__() got an unexpected keyword argument 'client_options'
I am running a task that returns a pd.DataFrame, which I am trying to store in Google Cloud Storage:
import datetime

from prefect.engine.results import GCSResult
from prefect.engine.serializers import PandasSerializer

pandas_serializer = PandasSerializer(
    file_type='csv'
)

gcs_result = GCSResult(
    bucket='tripliq-data-lake',
    serializer=pandas_serializer,
    location=f'linkedin_top_posts/{datetime.datetime.now().strftime("%Y%m%d-%H%M%S")}_linkedin_post_likes.csv'
)

like_linkedin_feed = LikeLinkedInFeed(
    result=gcs_result
)
I don't understand the source code too well, but I think it's referring to this part of site-packages/google/cloud/storage/client.py:
def __init__(
    self,
    project=_marker,
    credentials=None,
    _http=None,
    client_info=None,
    client_options=None,
):
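In case it helps, here is a quick check of the installed Google libraries (a sketch only; I'm assuming the TypeError comes from an older google-cloud-core whose base Client.__init__ predates the client_options argument, while the installed google-cloud-storage already passes it):
# Quick environment check (assumption: an outdated google-cloud-core is the
# cause; the base Client.__init__ would then not accept `client_options`).
import inspect
from importlib.metadata import version

from google.cloud.client import Client  # base class provided by google-cloud-core

print("google-cloud-storage:", version("google-cloud-storage"))
print("google-cloud-core:   ", version("google-cloud-core"))
print("google-api-core:     ", version("google-api-core"))

# If `client_options` does not appear in this signature, the base library is
# too old and upgrading google-cloud-core / google-cloud-storage should help.
print(inspect.signature(Client.__init__))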
Any help is very much appreciated!
I have a basic question on how to handle submitting large quantities of tasks to ACI. I have a limit...

Paweł Biernat

over 1 year ago
I have a basic question on how to handle submitting large quantities of tasks to ACI. I have a limited number of instances, say 100, and a very large number of tasks to assign to them (e.g. 10,000). Every task should get its own instance, so I'm deploying a worker flow first and then using run_deployment to run it on its own instance. I tested this on hundreds of tasks and tens of instances and it seems to work fine, but when I try to scale it up I'm running into issues. When submitting a large number of tasks I noticed that only some actually get submitted before the flow gives up and either hangs or starts with just a subset of tasks:
• When submitting 10,000 tasks, only ~1,000 (more or less, at random) actually show up in the Task Runs tab. The submit_tasks flow either hangs during task submission or starts running normally with fewer tasks.
• I tried adding rate_limit and it doesn't solve the issue; even fewer tasks get submitted. With the rate limit on I got a lot of "too many open files" errors from asyncio and "Crash detected! Execution was cancelled by the runtime environment.".
There are also secondary issues related to the UI:
• The UI shows all the tasks as soon as they get submitted, even though I can only run e.g. 100 at a time. This generates a lot of clutter.
• When the deployment starts, the UI assigns it to the submit_tasks flow (the parent flow) and shows it parallel to the run_deployment_task flow. The run_deployment_task effectively has no "children" and no ties to the deployment. This clutters the UI even more. I guess this could be avoided by defining it as a flow, but then I can't use retries or .submit on it.
My goal is to be able to submit tasks to be executed on thousands of instances in parallel, with a large queue of up to 100,000 individual tasks. Another complication is that these thousands of instances have to be run across multiple locations, but I'll leave that for later. Here's a minimal example of my implementation so far. Overall I have a feeling that I am not doing it The Prefect Way.
from prefect import flow, task
from prefect.concurrency.sync import concurrency
from prefect.deployments import run_deployment
from prefect.states import raise_state_exception


@flow(log_prints=True)
def worker(a: int):
    print(f"Run a worker with a={a}")


# retries for handling infrastructure issues
@task(retries=10)
def run_deployment_task(deployment_name, parameters):
    with concurrency("aci-max-instances"):
        flow_run = run_deployment(name=deployment_name, parameters=parameters)
        raise_state_exception(flow_run.state)


# A flow that submits multiple tasks
@flow
def submit_tasks(n_tasks: int):
    # actual parameters go here
    parameters = [{"a": i} for i in range(n_tasks)]
    for task_parameters in parameters:
        run_deployment_task.submit(
            deployment_name="worker/test-deployment",
            parameters=task_parameters,
        )


if __name__ == "__main__":
    worker.deploy(
        name="test-deployment",
        work_pool_name="aci-pool",
        image="worker:test",
    )
    submit_tasks.deploy(
        name="test-submit-deployment",
        work_pool_name="aci-pool",
        image="submit-tasks:test",
    )
    run_deployment(
        "submit-tasks/test-submit-deployment",
        parameters={"n_tasks": 10_000},
    )
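Would something like the following batched submission be closer to the intended pattern? This is only a sketch reusing the run_deployment_task defined above; the batch size of 100 is an assumption matching my instance count, and I'm guessing that keeping only a bounded number of submissions in flight would avoid the "too many open files" errors.
# Sketch only: submit run_deployment_task in bounded batches so that only
# BATCH_SIZE submissions (and their client connections) exist at a time.
from prefect import flow

BATCH_SIZE = 100  # assumption: roughly the number of available ACI instances


@flow
def submit_tasks_batched(n_tasks: int):
    parameters = [{"a": i} for i in range(n_tasks)]
    for start in range(0, len(parameters), BATCH_SIZE):
        batch = parameters[start : start + BATCH_SIZE]
        futures = [
            run_deployment_task.submit(
                deployment_name="worker/test-deployment",
                parameters=p,
            )
            for p in batch
        ]
        # Wait for the whole batch to finish before queueing the next one.
        for future in futures:
            future.wait()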
Hi, I am trying to setup self hosted prefect server on AKS and use k8s workpools on AKS to run the f...

Sharath Chandra

over 1 year ago
Hi, I am trying to set up a self-hosted Prefect server on AKS and use k8s work pools on AKS to run the flows. So far I have installed the following components on AKS:
- prefect-server
- prefect-worker
I haven't created a public URL for the Prefect server yet and am using the port-forwarding method to view the web UI. I can browse through the web UI at http://localhost:4200/ and see the k8s work pool as well. The next step is to have the deployment and flow registered on the server. I am trying to register the deployment using this code:
from prefect import flow, get_run_logger, serve
from cuebox.pipeline.tasks.task1 import cowsay_hello, log_current_path

@flow
def hello(name: str = "Marvin"):
    get_run_logger().info(f"Hello {name}!")
    cowsay_hello(name)
    log_current_path()


# This is here so that we can invoke the script directly for testing
if __name__ == "__main__":
    deploy = hello.to_deployment(name="hello-prefect-2", work_pool_name="dummy-pool")
    serve(deploy)
I have updated the Prefect config with the following:
prefect config set PREFECT_API_URL=http://localhost:4200/api
This isn't working and I see an error:
File "/Users/sharath/Library/Caches/pypoetry/virtualenvs/hello-prefect-2-eDijfXV1-py3.11/lib/python3.11/site-packages/prefect/flows.py", line 1768, in serve
    await runner.add_deployment(deployment)
  File "/Users/sharath/Library/Caches/pypoetry/virtualenvs/hello-prefect-2-eDijfXV1-py3.11/lib/python3.11/site-packages/prefect/runner/runner.py", line 201, in add_deployment
    deployment_id = await deployment.apply()
                    ^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/sharath/Library/Caches/pypoetry/virtualenvs/hello-prefect-2-eDijfXV1-py3.11/lib/python3.11/site-packages/prefect/deployments/runner.py", line 324, in apply
    raise DeploymentApplyError(
prefect.deployments.runner.DeploymentApplyError: Error while applying deployment: Client error '422 Unprocessable Entity' for url 'http://localhost:4200/api/deployments/'
Response: {'exception_message': 'Invalid request received.', 'exception_detail': [{'type': 'extra_forbidden', 'loc': ['body', 'infra_overrides'], 'msg': 'Extra inputs are not permitted', 'input': {}}], 'request_body': {'infra_overrides': {}, 'name': 'hello-prefect-2', 'flow_id': 'c722711e-39ff-47b6-9ce2-b91c0d200132', 'paused': False, 'schedules': [], 'enforce_parameter_schema': False, 'parameter_openapi_schema': {'title': 'Parameters', 'type': 'object', 'properties': {'name': {'default': 'Marvin', 'position': 0, 'title': 'name', 'type': 'string'}}}, 'parameters': {}, 'tags': [], 'pull_steps': [], 'manifest_path': None, 'work_queue_name': None, 'work_pool_name': 'dummy-pool', 'storage_document_id': None, 'infrastructure_document_id': None, 'schedule': None, 'description': None, 'path': '.', 'version': 'e7e9c463adcb09d8153871ec26ed091e', 'entrypoint': 'cuebox/pipeline/flows/hello.py:hello'}}
For more information check: https://developer.mozilla.org/en-US/docs/Web/HTTP/Status/422
The error states 422 Unprocessable Entity. How can I register the deployment and flow on the Prefect server from my local machine?
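In case it's relevant, here is how I'm comparing my local Prefect version with the server's (a sketch only; I'm assuming the 422 'extra_forbidden' on infra_overrides could come from the local client and the self-hosted server running different Prefect versions):
# Sketch: compare the local Prefect client version with the version the
# self-hosted server reports (assumption: a client/server version mismatch
# explains the rejected 'infra_overrides' field).
import httpx
import prefect

print("local prefect client:", prefect.__version__)

# The server exposes its version via the admin route.
server_version = httpx.get("http://localhost:4200/api/admin/version").json()
print("prefect server:      ", server_version)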